[ENG-6654] fixfix: correct misunderstanding, handle conflicts #836

Merged
4 changes: 3 additions & 1 deletion docker-compose.yml
@@ -184,7 +184,9 @@ services:

worker:
image: quay.io/centerforopenscience/share:develop
command: /usr/local/bin/celery --app project worker --uid daemon -l INFO
command:
chown -R daemon:daemon /elastic8_certs/ &&
/usr/local/bin/celery --app project worker --uid daemon -l INFO
depends_on:
- postgres
- rabbitmq
31 changes: 3 additions & 28 deletions share/search/daemon.py
@@ -8,7 +8,6 @@
import threading
import time

import amqp.exceptions
from django.conf import settings
import kombu
from kombu.mixins import ConsumerMixin
@@ -61,8 +60,6 @@ def start_daemonthreads_for_strategy(self, index_strategy):
index_strategy=index_strategy,
message_callback=_daemon.on_message,
)
# give the daemon a more robust callback for ack-ing
_daemon.ack_callback = _consumer.ensure_ack
# spin up daemonthreads, ready for messages
self._daemonthreads.extend(_daemon.start())
# start a thread to consume messages from this strategy's queues
@@ -82,7 +79,7 @@ def stop_daemonthreads(self, *, wait=False):


class KombuMessageConsumer(ConsumerMixin):
PREFETCH_COUNT = 7500
PREFETCH_COUNT = settings.ELASTICSEARCH['CHUNK_SIZE']

should_stop: bool # (from ConsumerMixin)

@@ -130,28 +127,9 @@ def consume(self, *args, **kwargs):
consume = self.connection.ensure(self.connection, super().consume)
return consume(*args, **kwargs)

def ensure_ack(self, daemon_message: messages.DaemonMessage):
# if the connection the message came thru is no longer usable,
# use `kombu.Connection.autoretry` to revive it for an ack
try:
daemon_message.ack()
except (ConnectionError, amqp.exceptions.ConnectionError):
@self.connection.autoretry
def _do_ack(*, channel):
try:
channel.basic_ack(daemon_message.kombu_message.delivery_tag)
finally:
channel.close()
_do_ack()


def _default_ack_callback(daemon_message: messages.DaemonMessage) -> None:
daemon_message.ack()


class IndexerDaemon:
MAX_LOCAL_QUEUE_SIZE = 5000
ack_callback: Callable[[messages.DaemonMessage], None]
MAX_LOCAL_QUEUE_SIZE = settings.ELASTICSEARCH['CHUNK_SIZE']

def __init__(self, index_strategy, *, stop_event=None, daemonthread_context=None):
self.stop_event = (
@@ -163,7 +141,6 @@ def __init__(self, index_strategy, *, stop_event=None, daemonthread_context=None):
self.__daemonthread_context = daemonthread_context or contextlib.nullcontext
self.__local_message_queues = {}
self.__started = False
self.ack_callback = _default_ack_callback

def start(self) -> list[threading.Thread]:
if self.__started:
@@ -192,7 +169,6 @@ def start_typed_loop_and_queue(self, message_type) -> threading.Thread:
local_message_queue=_queue_from_rabbit_to_daemon,
log_prefix=f'{repr(self)} MessageHandlingLoop: ',
daemonthread_context=self.__daemonthread_context,
ack_callback=self.ack_callback,
)
return _handling_loop.start_thread()

@@ -226,7 +202,6 @@ class MessageHandlingLoop:
local_message_queue: queue.Queue
log_prefix: str
daemonthread_context: Callable[[], contextlib.AbstractContextManager]
ack_callback: Callable[[messages.DaemonMessage], None]
_leftover_daemon_messages_by_target_id = None

def __post_init__(self):
@@ -310,7 +285,7 @@ def _handle_some_messages(self):
sentry_sdk.capture_message('error handling message', extras={'message_response': message_response})
target_id = message_response.index_message.target_id
for daemon_message in daemon_messages_by_target_id.pop(target_id, ()):
self.ack_callback(daemon_message)
daemon_message.ack() # finally set it free
if daemon_messages_by_target_id: # should be empty by now
logger.error('%sUnhandled messages?? %s', self.log_prefix, len(daemon_messages_by_target_id))
sentry_sdk.capture_message(
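With this change, both PREFETCH_COUNT and MAX_LOCAL_QUEUE_SIZE follow settings.ELASTICSEARCH['CHUNK_SIZE'], so the daemon prefetches from rabbitmq and buffers locally about one bulk-indexing chunk's worth of messages at a time. A minimal sketch of how that setting might be defined -- the env var name and default below are assumptions, since the settings module is not part of this diff:

import os

ELASTICSEARCH = {
    # size of each bulk-indexing chunk; with this PR it also bounds the
    # rabbitmq prefetch count and the indexer daemon's local queues
    'CHUNK_SIZE': int(os.environ.get('ELASTICSEARCH_CHUNK_SIZE', 2000)),
    # ...other elasticsearch settings unchanged
}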
69 changes: 47 additions & 22 deletions share/search/index_strategy/elastic8.py
@@ -60,8 +60,11 @@ def index_mappings(self):
raise NotImplementedError

@abc.abstractmethod
def build_elastic_actions(self, messages_chunk: messages.MessagesChunk) -> typing.Iterable[tuple[int, dict]]:
# yield (message_target_id, elastic_action) pairs
def build_elastic_actions(
self,
messages_chunk: messages.MessagesChunk,
) -> typing.Iterable[tuple[int, dict | typing.Iterable[dict]]]:
# yield (message_target_id, [elastic_action, ...]) pairs
raise NotImplementedError

def before_chunk(
@@ -148,10 +151,17 @@ def pls_handle_messages_chunk(self, messages_chunk):
_indexname = _response_body['_index']
_is_done = _ok or (_op_type == 'delete' and _status == 404)
if _is_done:
_action_tracker.action_done(_indexname, _docid)
_finished_message_id = _action_tracker.action_done(_indexname, _docid)
if _finished_message_id is not None:
yield messages.IndexMessageResponse(
is_done=True,
index_message=messages.IndexMessage(messages_chunk.message_type, _finished_message_id),
status_code=HTTPStatus.OK.value,
error_text=None,
)
_action_tracker.forget_message(_finished_message_id)
else:
_action_tracker.action_errored(_indexname, _docid)
# yield error responses immediately
yield messages.IndexMessageResponse(
is_done=False,
index_message=messages.IndexMessage(
@@ -161,16 +171,14 @@ def pls_handle_messages_chunk(self, messages_chunk):
status_code=_status,
error_text=str(_response_body),
)
self.after_chunk(messages_chunk, _indexnames)
# yield successes after the whole chunk completes
# (since one message may involve several actions)
for _messageid in _action_tracker.all_done_messages():
for _message_id in _action_tracker.remaining_done_messages():
yield messages.IndexMessageResponse(
is_done=True,
index_message=messages.IndexMessage(messages_chunk.message_type, _messageid),
index_message=messages.IndexMessage(messages_chunk.message_type, _message_id),
status_code=HTTPStatus.OK.value,
error_text=None,
)
self.after_chunk(messages_chunk, _indexnames)

# abstract method from IndexStrategy
def pls_make_default_for_searching(self, specific_index: IndexStrategy.SpecificIndex):
@@ -202,14 +210,18 @@ def _alias_for_keeping_live(self):
def _elastic_actions_with_index(self, messages_chunk, indexnames, action_tracker: _ActionTracker):
if not indexnames:
raise ValueError('cannot index to no indexes')
for _message_target_id, _elastic_action in self.build_elastic_actions(messages_chunk):
_docid = _elastic_action['_id']
for _indexname in indexnames:
action_tracker.add_action(_message_target_id, _indexname, _docid)
yield {
**_elastic_action,
'_index': _indexname,
}
for _message_target_id, _elastic_actions in self.build_elastic_actions(messages_chunk):
if isinstance(_elastic_actions, dict): # allow a single action
_elastic_actions = [_elastic_actions]
for _elastic_action in _elastic_actions:
_docid = _elastic_action['_id']
for _indexname in indexnames:
action_tracker.add_action(_message_target_id, _indexname, _docid)
yield {
**_elastic_action,
'_index': _indexname,
}
action_tracker.done_scheduling(_message_target_id)

def _get_indexnames_for_alias(self, alias_name) -> set[str]:
try:
@@ -371,24 +383,37 @@ class _ActionTracker:
default_factory=lambda: collections.defaultdict(set),
)
errored_messageids: set[int] = dataclasses.field(default_factory=set)
fully_scheduled_messageids: set[int] = dataclasses.field(default_factory=set)

def add_action(self, message_id: int, index_name: str, doc_id: str):
self.messageid_by_docid[doc_id] = message_id
self.actions_by_messageid[message_id].add((index_name, doc_id))

def action_done(self, index_name: str, doc_id: str):
_messageid = self.messageid_by_docid[doc_id]
_message_actions = self.actions_by_messageid[_messageid]
_message_actions.discard((index_name, doc_id))
def action_done(self, index_name: str, doc_id: str) -> int | None:
_messageid = self.get_message_id(doc_id)
_remaining_message_actions = self.actions_by_messageid[_messageid]
_remaining_message_actions.discard((index_name, doc_id))
# return the message id only if this was the last action for that message
return (
None
if _remaining_message_actions or (_messageid not in self.fully_scheduled_messageids)
else _messageid
)

def action_errored(self, index_name: str, doc_id: str):
_messageid = self.messageid_by_docid[doc_id]
self.errored_messageids.add(_messageid)

def done_scheduling(self, message_id: int):
self.fully_scheduled_messageids.add(message_id)

def forget_message(self, message_id: int):
del self.actions_by_messageid[message_id]

def get_message_id(self, doc_id: str):
return self.messageid_by_docid[doc_id]

def all_done_messages(self):
def remaining_done_messages(self):
for _messageid, _actions in self.actions_by_messageid.items():
if _messageid not in self.errored_messageids:
assert not _actions
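The reworked _ActionTracker is what allows one daemon message to fan out into several elastic actions: a message is reported finished only after done_scheduling() has been called for it and its last remaining action completes. A small sketch of the intended bookkeeping, using the methods from this diff (the setup itself is illustrative, not taken from the repo's tests):

_tracker = _ActionTracker()
# one message (target id 7) fans out to two docs in the same index
_tracker.add_action(7, 'index-a', 'doc-1')
_tracker.add_action(7, 'index-a', 'doc-2')
_tracker.done_scheduling(7)

assert _tracker.action_done('index-a', 'doc-1') is None  # another action still pending
assert _tracker.action_done('index-a', 'doc-2') == 7     # last action done -- message 7 is finished
_tracker.forget_message(7)  # drop its bookkeeping once the success response is yielded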
73 changes: 60 additions & 13 deletions share/search/index_strategy/trovesearch_denorm.py
@@ -11,6 +11,7 @@
Literal,
)

import celery
from django.conf import settings
import elasticsearch8
from primitive_metadata import primitive_rdf as rdf
@@ -154,15 +155,14 @@ def _paths_and_values_mappings(self):

# override method from Elastic8IndexStrategy
def after_chunk(self, messages_chunk: messages.MessagesChunk, indexnames: Iterable[str]):
# refresh to avoid delete-by-query conflicts
self.es8_client.indices.refresh(index=','.join(indexnames))
# delete any docs that belong to cards in this chunk but weren't touched by indexing
self.es8_client.delete_by_query(
index=list(indexnames),
query={'bool': {'must': [
{'terms': {'card.card_pk': messages_chunk.target_ids_chunk}},
{'range': {'chunk_timestamp': {'lt': messages_chunk.timestamp}}},
]}},
task__delete_iri_value_scraps.apply_async(
kwargs={
'index_strategy_name': self.name,
'indexnames': list(indexnames),
'card_pks': messages_chunk.target_ids_chunk,
'timestamp': messages_chunk.timestamp,
},
countdown=3, # TODO: config?
Review comment (Member):

Minor: Should probably be configurable, yes.
Something like
settings.ELASTICSEARCH['COUNTDOWN_DELAY'] = int(os.environ.get('ELASTICSEARCH_COUNTDOWN_DELAY', 3))
would probably be reasonable

Author reply (Contributor):

went with ELASTICSEARCH_POST_INDEX_DELAY

)
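Per the exchange above, the hard-coded countdown=3 is meant to become configurable; the author's reply names ELASTICSEARCH_POST_INDEX_DELAY, but the final wiring is not shown in this diff. A hedged sketch following the reviewer's suggested pattern (the settings key and default are assumptions):

import os

ELASTICSEARCH = {
    # ...existing entries...
    'POST_INDEX_DELAY': int(os.environ.get('ELASTICSEARCH_POST_INDEX_DELAY', 3)),
}

# at the call site above, the literal countdown would then become:
#     countdown=settings.ELASTICSEARCH['POST_INDEX_DELAY'],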

# abstract method from Elastic8IndexStrategy
@@ -173,12 +173,13 @@ def build_elastic_actions(self, messages_chunk: messages.MessagesChunk):
_docbuilder = self._SourcedocBuilder(_indexcard_rdf, messages_chunk.timestamp)
if not _docbuilder.should_skip(): # if skipped, will be deleted
_indexcard_pk = _indexcard_rdf.indexcard_id
for _doc_id, _doc in _docbuilder.build_docs():
_index_action = self.build_index_action(
yield _indexcard_pk, (
self.build_index_action(
doc_id=_doc_id,
doc_source=_doc,
)
yield _indexcard_pk, _index_action
for _doc_id, _doc in _docbuilder.build_docs()
)
_remaining_indexcard_pks.discard(_indexcard_pk)
# delete any that were skipped for any reason
for _indexcard_pk in _remaining_indexcard_pks:
@@ -279,7 +280,10 @@ def should_skip(self) -> bool:

def build_docs(self) -> Iterator[tuple[str, dict]]:
# index once without `iri_value`
yield self._doc_id(), {'card': self._card_subdoc}
yield self._doc_id(), {
'card': self._card_subdoc,
'chunk_timestamp': self.chunk_timestamp,
}
for _iri in self._fullwalk.paths_by_iri:
yield self._doc_id(_iri), {
'card': self._card_subdoc,
@@ -888,3 +892,46 @@ def _any_query(queries: abc.Collection[dict]):
(_query,) = queries
return _query
return {'bool': {'should': list(queries), 'minimum_should_match': 1}}


@celery.shared_task(
name='share.search.index_strategy.trovesearch_denorm.task__delete_iri_value_scraps',
max_retries=None, # retries only on delete_by_query conflicts -- should work eventually!
retry_backoff=True,
bind=True, # for explicit retry
)
def task__delete_iri_value_scraps(
task: celery.Task,
index_strategy_name: str,
card_pks: list[int],
indexnames: list[str],
timestamp: int,
):
'''followup task to delete value-docs no longer present

each time an index-card is updated, value-docs are created (or updated) for each iri value
present in the card's contents -- if some values are absent from a later update, the
corresponding docs will remain untouched

this task deletes those untouched value-docs after the index has refreshed at its own pace
(allowing a slightly longer delay for items to _stop_ matching queries for removed values)
'''
from share.search.index_strategy import get_index_strategy
_index_strategy = get_index_strategy(index_strategy_name)
assert isinstance(_index_strategy, Elastic8IndexStrategy)
# delete any docs that belong to cards in this chunk but weren't touched by indexing
_delete_resp = _index_strategy.es8_client.delete_by_query(
index=indexnames,
query={'bool': {'must': [
{'terms': {'card.card_pk': card_pks}},
{'range': {'chunk_timestamp': {'lt': timestamp}}},
]}},
params={
'slices': 'auto',
'conflicts': 'proceed', # count conflicts instead of halting
'request_cache': False,
},
)
_conflict_count = _delete_resp.get('version_conflicts', 0)
if _conflict_count > 0:
raise task.retry()
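With max_retries=None and retry_backoff=True, the task keeps retrying with exponential backoff until delete_by_query reports no version conflicts. As a plain-python paraphrase of the filter it applies -- illustrative only, the real matching happens inside elasticsearch:

def _is_stale_value_doc(doc: dict, card_pks: list[int], timestamp: int) -> bool:
    # a value-doc is a leftover scrap if it belongs to a card from this chunk
    # but was last written by an earlier chunk (this chunk's indexing didn't touch it)
    return (
        doc['card']['card_pk'] in card_pks
        and doc['chunk_timestamp'] < timestamp
    )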
2 changes: 1 addition & 1 deletion share/search/messages.py
@@ -142,7 +142,7 @@ def __init__(self, *, kombu_message=None):
def ack(self):
if self.kombu_message is None:
raise exceptions.DaemonMessageError('ack! called DaemonMessage.ack() but there is nothing to ack')
return self.kombu_message.ack()
self.kombu_message.ack()

def requeue(self):
if self.kombu_message is None:
10 changes: 10 additions & 0 deletions tests/share/search/index_strategy/_with_real_services.py
@@ -24,6 +24,16 @@ def setUp(self):
super().setUp()
self.enterContext(mock.patch('share.models.core._setup_user_token_and_groups'))
self.index_strategy = self.get_index_strategy()

def _fake_get_index_strategy(name):
if self.index_strategy.name == name:
return self.index_strategy
raise ValueError(f'unknown index strategy in test: {name}')

self.enterContext(mock.patch(
'share.search.index_strategy.get_index_strategy',
new=_fake_get_index_strategy,
))
self.index_messenger = IndexMessenger(
celery_app=celery_app,
index_strategys=[self.index_strategy],
24 changes: 22 additions & 2 deletions tests/share/search/index_strategy/test_trovesearch_denorm.py
@@ -1,9 +1,29 @@
from share.search.index_strategy.trovesearch_denorm import TrovesearchDenormIndexStrategy
from unittest import mock

from share.search.index_strategy.trovesearch_denorm import (
TrovesearchDenormIndexStrategy,
task__delete_iri_value_scraps,
)

from . import _common_trovesearch_tests


class TestTroveIndexcardFlats(_common_trovesearch_tests.CommonTrovesearchTests):
class TestTrovesearchDenorm(_common_trovesearch_tests.CommonTrovesearchTests):
def setUp(self):
super().setUp()

# make the followup delete task eager
def _fake_apply_async(*args, **kwargs):
kwargs['countdown'] = 0 # don't wait
task__delete_iri_value_scraps.apply(*args, **kwargs)
self.enterContext(
mock.patch.object(
task__delete_iri_value_scraps,
'apply_async',
new=_fake_apply_async,
)
)

# for RealElasticTestCase
def get_index_strategy(self):
return TrovesearchDenormIndexStrategy('test_trovesearch_denorm')