Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENG-6189] denorm fixes/followup #831

Merged
merged 5 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion share/search/index_strategy/_base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import abc
import functools
import logging
import typing

Expand Down Expand Up @@ -63,8 +64,9 @@ def indexname_prefix(self):
def indexname_wildcard(self):
return f'{self.indexname_prefix}*'

@property
@functools.cached_property
def current_indexname(self):
self.assert_strategy_is_current()
return ''.join((
self.indexname_prefix,
self.CURRENT_STRATEGY_CHECKSUM.hexdigest,
Expand Down
10 changes: 9 additions & 1 deletion share/search/index_strategy/elastic8.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,14 @@ def before_chunk(
messages_chunk: messages.MessagesChunk,
indexnames: typing.Iterable[str],
) -> None:
pass # implement when needed
... # implement when needed

def after_chunk(
self,
messages_chunk: messages.MessagesChunk,
indexnames: typing.Iterable[str],
) -> None:
... # implement when needed

###
# helper methods for subclasses to use (or override)
Expand Down Expand Up @@ -154,6 +161,7 @@ def pls_handle_messages_chunk(self, messages_chunk):
status_code=_status,
error_text=str(_response_body),
)
self.after_chunk(messages_chunk, _indexnames)
# yield successes after the whole chunk completes
# (since one message may involve several actions)
for _messageid in _action_tracker.all_done_messages():
Expand Down
24 changes: 13 additions & 11 deletions share/search/index_strategy/trovesearch_denorm.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class TrovesearchDenormIndexStrategy(Elastic8IndexStrategy):
CURRENT_STRATEGY_CHECKSUM = ChecksumIri(
checksumalgorithm_name='sha-256',
salt='TrovesearchDenormIndexStrategy',
hexdigest='fa8fe6459f658877f84620412dcab5e2e70d0c949d8977354c586dca99ff2f28',
hexdigest='e538bbc5966a6a289da9e5ba51ecde5ff29528bf07e940716ef8a888d6601916',
)

# abstract method from IndexStrategy
Expand Down Expand Up @@ -83,6 +83,7 @@ def index_mappings(self):
'properties': {
'card': {'properties': self._card_mappings()},
'iri_value': {'properties': self._iri_value_mappings()},
'chunk_timestamp': {'type': 'unsigned_long'},
},
}

Expand Down Expand Up @@ -149,24 +150,24 @@ def _paths_and_values_mappings(self):
}

# override method from Elastic8IndexStrategy
def before_chunk(self, messages_chunk: messages.MessagesChunk, indexnames: Iterable[str]):
# delete all per-value docs (to account for missing values)
def after_chunk(self, messages_chunk: messages.MessagesChunk, indexnames: Iterable[str]):
# refresh to avoid delete-by-query conflicts
self.es8_client.indices.refresh(index=','.join(indexnames))
# delete any docs that belong to cards in this chunk but weren't touched by indexing
self.es8_client.delete_by_query(
index=list(indexnames),
query={'bool': {'must': [
{'terms': {'card.card_pk': messages_chunk.target_ids_chunk}},
{'exists': {'field': 'iri_value.value_iri'}},
{'range': {'chunk_timestamp': {'lt': messages_chunk.timestamp}}},
]}},
)
# (possible optimization: instead, hold onto doc_ids and (in `after_chunk`?)
# delete_by_query excluding those)

# abstract method from Elastic8IndexStrategy
def build_elastic_actions(self, messages_chunk: messages.MessagesChunk):
_indexcard_rdf_qs = ts.latest_rdf_for_indexcard_pks(messages_chunk.target_ids_chunk)
_remaining_indexcard_pks = set(messages_chunk.target_ids_chunk)
for _indexcard_rdf in _indexcard_rdf_qs:
_docbuilder = self._SourcedocBuilder(_indexcard_rdf)
_docbuilder = self._SourcedocBuilder(_indexcard_rdf, messages_chunk.timestamp)
if not _docbuilder.should_skip(): # if skipped, will be deleted
_indexcard_pk = _indexcard_rdf.indexcard_id
for _doc_id, _doc in _docbuilder.build_docs():
Expand Down Expand Up @@ -254,6 +255,7 @@ class _SourcedocBuilder:
'''build elasticsearch sourcedocs for an rdf document
'''
indexcard_rdf: trove_db.IndexcardRdf
chunk_timestamp: int
indexcard: trove_db.Indexcard = dataclasses.field(init=False)
focus_iri: str = dataclasses.field(init=False)
rdfdoc: rdf.RdfTripleDictionary = dataclasses.field(init=False)
Expand All @@ -279,6 +281,7 @@ def build_docs(self) -> Iterator[tuple[str, dict]]:
yield self._doc_id(_iri), {
'card': self._card_subdoc,
'iri_value': self._iri_value_subdoc(_iri),
'chunk_timestamp': self.chunk_timestamp,
}

def _doc_id(self, value_iri=None) -> str:
Expand Down Expand Up @@ -487,13 +490,12 @@ def _cardsearch_response(
PropertypathUsage(property_path=_path, usage_count=0)
for _path in cardsearch_params.related_property_paths
)
_relatedproperty_by_path = {
_result.property_path: _result
_relatedproperty_by_pathkey = {
ts.propertypath_as_keyword(_result.property_path): _result
for _result in _relatedproperty_list
}
for _bucket in es8_response['aggregations']['agg_related_propertypath_usage']['buckets']:
_path = tuple(json.loads(_bucket['key']))
_relatedproperty_by_path[_path].usage_count += _bucket['doc_count']
_relatedproperty_by_pathkey[_bucket['key']].usage_count += _bucket['doc_count']
return CardsearchResponse(
cursor=cursor,
search_result_page=_results,
Expand Down
6 changes: 6 additions & 0 deletions share/search/messages.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import abc
import dataclasses
import enum
import functools
import logging
import time
import typing

from share.search import exceptions
Expand Down Expand Up @@ -83,6 +85,10 @@ def as_tuples(self):
target_id=target_id,
)

@functools.cached_property # cached so it's constant (and unique-enough) for an instance
def timestamp(self) -> int:
return time.time_ns()

@classmethod
def stream_chunks(
cls,
Expand Down
8 changes: 0 additions & 8 deletions share/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,6 @@ def schedule_index_backfill(self, index_backfill_pk):
.exclude(source_config__source__is_deleted=True)
.values_list('id', flat=True)
)
elif _messagetype == MessageType.BACKFILL_IDENTIFIER:
_targetid_queryset = (
trove_db.ResourceIdentifier.objects
.exclude(suid_set__source_config__disabled=True)
.exclude(suid_set__source_config__source__is_deleted=True)
.values_list('id', flat=True)
.distinct()
)
else:
raise ValueError(f'unknown backfill messagetype {_messagetype}')
_chunk_size = settings.ELASTICSEARCH['CHUNK_SIZE']
Expand Down
Loading
Loading