Merge pull request #831 from aaxelb/fix/denorm-followup
[ENG-6189] denorm fixes/followup
aaxelb authored Nov 13, 2024
2 parents d6d5774 + f5b2169 commit adb4dfe
Showing 8 changed files with 235 additions and 82 deletions.
4 changes: 3 additions & 1 deletion share/search/index_strategy/_base.py
@@ -1,4 +1,5 @@
 import abc
+import functools
 import logging
 import typing

@@ -63,8 +64,9 @@ def indexname_prefix(self):
     def indexname_wildcard(self):
         return f'{self.indexname_prefix}*'

-    @property
+    @functools.cached_property
     def current_indexname(self):
+        self.assert_strategy_is_current()
         return ''.join((
             self.indexname_prefix,
             self.CURRENT_STRATEGY_CHECKSUM.hexdigest,
         ))
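An aside on the decorator swap above: functools.cached_property evaluates the getter once per instance and memoizes the result, so the new assert_strategy_is_current() check runs only on first access. A minimal standalone sketch (hypothetical Example class, not from this repo):

    import functools

    class Example:
        @functools.cached_property
        def value(self) -> str:
            print('computed')  # happens once per instance
            return 'expensive result'

    e = Example()
    e.value  # prints 'computed'
    e.value  # served from the per-instance cache; nothing printed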
10 changes: 9 additions & 1 deletion share/search/index_strategy/elastic8.py
@@ -69,7 +69,14 @@ def before_chunk(
         messages_chunk: messages.MessagesChunk,
         indexnames: typing.Iterable[str],
     ) -> None:
-        pass # implement when needed
+        ... # implement when needed
+
+    def after_chunk(
+        self,
+        messages_chunk: messages.MessagesChunk,
+        indexnames: typing.Iterable[str],
+    ) -> None:
+        ... # implement when needed

     ###
     # helper methods for subclasses to use (or override)

@@ -154,6 +161,7 @@ def pls_handle_messages_chunk(self, messages_chunk):
                     status_code=_status,
                     error_text=str(_response_body),
                 )
+        self.after_chunk(messages_chunk, _indexnames)
         # yield successes after the whole chunk completes
         # (since one message may involve several actions)
         for _messageid in _action_tracker.all_done_messages():
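before_chunk and after_chunk now form a matched pair of per-chunk hooks around the bulk indexing work, which subclasses override as needed. A hypothetical subclass for illustration (mirroring the real override in trovesearch_denorm.py below):

    from share.search.index_strategy.elastic8 import Elastic8IndexStrategy

    class MyStrategy(Elastic8IndexStrategy):  # hypothetical subclass
        def after_chunk(self, messages_chunk, indexnames):
            # runs once per message chunk, after all bulk actions complete
            self.es8_client.indices.refresh(index=','.join(indexnames))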
24 changes: 13 additions & 11 deletions share/search/index_strategy/trovesearch_denorm.py
@@ -55,7 +55,7 @@ class TrovesearchDenormIndexStrategy(Elastic8IndexStrategy):
     CURRENT_STRATEGY_CHECKSUM = ChecksumIri(
         checksumalgorithm_name='sha-256',
         salt='TrovesearchDenormIndexStrategy',
-        hexdigest='fa8fe6459f658877f84620412dcab5e2e70d0c949d8977354c586dca99ff2f28',
+        hexdigest='e538bbc5966a6a289da9e5ba51ecde5ff29528bf07e940716ef8a888d6601916',
     )

     # abstract method from IndexStrategy
@@ -83,6 +83,7 @@ def index_mappings(self):
             'properties': {
                 'card': {'properties': self._card_mappings()},
                 'iri_value': {'properties': self._iri_value_mappings()},
+                'chunk_timestamp': {'type': 'unsigned_long'},
             },
         }

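A sizing note: chunk_timestamp holds time.time_ns() values (see messages.py below), which need a 64-bit field; elasticsearch's unsigned_long (range 0 to 2**64-1) covers them comfortably. A quick check:

    import time

    ns = time.time_ns()  # nanoseconds since the epoch, roughly 1.7e18 in 2024
    assert ns < 2**63    # well within unsigned_long's range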
@@ -149,24 +150,24 @@ def _paths_and_values_mappings(self):
         }

     # override method from Elastic8IndexStrategy
-    def before_chunk(self, messages_chunk: messages.MessagesChunk, indexnames: Iterable[str]):
-        # delete all per-value docs (to account for missing values)
+    def after_chunk(self, messages_chunk: messages.MessagesChunk, indexnames: Iterable[str]):
+        # refresh to avoid delete-by-query conflicts
+        self.es8_client.indices.refresh(index=','.join(indexnames))
+        # delete any docs that belong to cards in this chunk but weren't touched by indexing
         self.es8_client.delete_by_query(
             index=list(indexnames),
             query={'bool': {'must': [
                 {'terms': {'card.card_pk': messages_chunk.target_ids_chunk}},
-                {'exists': {'field': 'iri_value.value_iri'}},
+                {'range': {'chunk_timestamp': {'lt': messages_chunk.timestamp}}},
             ]}},
         )
-        # (possible optimization: instead, hold onto doc_ids and (in `after_chunk`?)
-        # delete_by_query excluding those)

     # abstract method from Elastic8IndexStrategy
     def build_elastic_actions(self, messages_chunk: messages.MessagesChunk):
         _indexcard_rdf_qs = ts.latest_rdf_for_indexcard_pks(messages_chunk.target_ids_chunk)
         _remaining_indexcard_pks = set(messages_chunk.target_ids_chunk)
         for _indexcard_rdf in _indexcard_rdf_qs:
-            _docbuilder = self._SourcedocBuilder(_indexcard_rdf)
+            _docbuilder = self._SourcedocBuilder(_indexcard_rdf, messages_chunk.timestamp)
             if not _docbuilder.should_skip(): # if skipped, will be deleted
                 _indexcard_pk = _indexcard_rdf.indexcard_id
                 for _doc_id, _doc in _docbuilder.build_docs():
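The refresh-then-delete ordering matters: elasticsearch's delete_by_query reports version conflicts for docs modified since the last refresh (hence the "refresh to avoid delete-by-query conflicts" comment), and the range clause makes the cleanup self-limiting, since docs (re)written by this chunk carry the chunk's own timestamp and only leftovers from earlier chunks match. A toy illustration of that invariant (hypothetical doc ids, not repo code):

    # per-value docs for one card, mapping doc id -> chunk_timestamp
    chunk_ts = 2
    docs = {'card1__iriA': 1, 'card1__iriB': 1}  # both written by an earlier chunk
    docs['card1__iriA'] = chunk_ts               # reindexed now; iriB's value disappeared
    stale = [doc_id for doc_id, ts in docs.items() if ts < chunk_ts]
    assert stale == ['card1__iriB']              # exactly what delete_by_query removes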
@@ -254,6 +255,7 @@ class _SourcedocBuilder:
         '''build elasticsearch sourcedocs for an rdf document
         '''
         indexcard_rdf: trove_db.IndexcardRdf
+        chunk_timestamp: int
         indexcard: trove_db.Indexcard = dataclasses.field(init=False)
         focus_iri: str = dataclasses.field(init=False)
         rdfdoc: rdf.RdfTripleDictionary = dataclasses.field(init=False)
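Since _SourcedocBuilder is a dataclass, the new chunk_timestamp field slots in by declaration order (before the init=False fields), which is what makes the two-argument construction in build_elastic_actions work. A hedged minimal sketch (hypothetical Builder class):

    import dataclasses

    @dataclasses.dataclass
    class Builder:  # stand-in for _SourcedocBuilder
        indexcard_rdf: object
        chunk_timestamp: int
        focus_iri: str = dataclasses.field(init=False, default='')  # derived later

    b = Builder('some-rdf', 123)  # positional args bind in declaration order
    assert b.chunk_timestamp == 123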
@@ -279,6 +281,7 @@ def build_docs(self) -> Iterator[tuple[str, dict]]:
             yield self._doc_id(_iri), {
                 'card': self._card_subdoc,
                 'iri_value': self._iri_value_subdoc(_iri),
+                'chunk_timestamp': self.chunk_timestamp,
             }

         def _doc_id(self, value_iri=None) -> str:
@@ -487,13 +490,12 @@ def _cardsearch_response(
             PropertypathUsage(property_path=_path, usage_count=0)
             for _path in cardsearch_params.related_property_paths
         )
-        _relatedproperty_by_path = {
-            _result.property_path: _result
+        _relatedproperty_by_pathkey = {
+            ts.propertypath_as_keyword(_result.property_path): _result
             for _result in _relatedproperty_list
         }
         for _bucket in es8_response['aggregations']['agg_related_propertypath_usage']['buckets']:
-            _path = tuple(json.loads(_bucket['key']))
-            _relatedproperty_by_path[_path].usage_count += _bucket['doc_count']
+            _relatedproperty_by_pathkey[_bucket['key']].usage_count += _bucket['doc_count']
         return CardsearchResponse(
             cursor=cursor,
             search_result_page=_results,
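The lookup fix above keys the dict by the same serialized keyword the aggregation buckets use, instead of round-tripping each bucket key through json.loads back into a tuple; matching on the exact string sidesteps any serialization mismatch. A hedged sketch of the idea (path_as_keyword is a hypothetical stand-in for ts.propertypath_as_keyword, with an assumed JSON serialization):

    import json

    def path_as_keyword(path: tuple[str, ...]) -> str:
        return json.dumps(list(path))  # assumed serialization

    usage_counts = {path_as_keyword(('dcterms:creator',)): 0}
    bucket_key = json.dumps(['dcterms:creator'])  # key as the terms agg returns it
    usage_counts[bucket_key] += 1                 # exact string match, no tuple round-trip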
6 changes: 6 additions & 0 deletions share/search/messages.py
@@ -1,7 +1,9 @@
 import abc
 import dataclasses
 import enum
+import functools
 import logging
+import time
 import typing

 from share.search import exceptions

@@ -83,6 +85,10 @@ def as_tuples(self):
                 target_id=target_id,
             )

+    @functools.cached_property # cached so it's constant (and unique-enough) for an instance
+    def timestamp(self) -> int:
+        return time.time_ns()
+
     @classmethod
     def stream_chunks(
         cls,
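Because timestamp is a cached property, every access on a given MessagesChunk returns the same nanosecond value: all docs built for the chunk share one chunk_timestamp, and after_chunk's range filter compares against that same number. A quick sketch (hypothetical Chunk class):

    import functools
    import time

    class Chunk:  # stand-in for messages.MessagesChunk
        @functools.cached_property
        def timestamp(self) -> int:
            return time.time_ns()

    c = Chunk()
    assert c.timestamp == c.timestamp   # constant for the instance
    # a later chunk gets a later (unique-enough, per the source comment) stamp:
    assert Chunk().timestamp >= c.timestamp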
8 changes: 0 additions & 8 deletions share/tasks/__init__.py
@@ -79,14 +79,6 @@ def schedule_index_backfill(self, index_backfill_pk):
             .exclude(source_config__source__is_deleted=True)
             .values_list('id', flat=True)
         )
-    elif _messagetype == MessageType.BACKFILL_IDENTIFIER:
-        _targetid_queryset = (
-            trove_db.ResourceIdentifier.objects
-            .exclude(suid_set__source_config__disabled=True)
-            .exclude(suid_set__source_config__source__is_deleted=True)
-            .values_list('id', flat=True)
-            .distinct()
-        )
     else:
         raise ValueError(f'unknown backfill messagetype {_messagetype}')
     _chunk_size = settings.ELASTICSEARCH['CHUNK_SIZE']