diff --git a/CHANGELOG.md b/CHANGELOG.md index b72ea58beb..faa07fd1a2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,60 @@ +# Indigo 1.12.0 +Released 2023-07-09 + +## What's Changed + +## Bugfixes +* #965 MDL Molfile v3000: when opening files containing 'Salts an Solvents', names are truncated and abbreviation is expanded +* #1036 SMILES import: general chiral specification labels (TH, AL, SP, TB, OH ) don't work +* #1051 Opening file with a superatom label saved in RXN v3000 format only the first part of the label is displayed +* #1114 Atoms of Benzene ring become Monoradicals when opened from file saved in Daylight SMARTS +* #1132 SMILES loader uninitialized heap fix +* #1102 When pasting Extended SMILES structure with stereochemistry there are two &1 centers instead of an ABS and an &1 +* #1135 C library macro - va_end() is missing before return statement. +* #1126 Segfault when iterating CDX file from USPTO downloads +* #1144 Unable to save the structure after clicking 'Save', an error appears + +## Improvements +* #1098 api: add method for copying RGroups + +**Full Changelog**: https://github.com/epam/Indigo/compare/indigo-1.11.0...indigo-1.12.0 + + +# Indigo 1.11.0 +Released 2023-06-07 + +## What's Changed + +## Features +* #1053 Split publish job in "Indigo CI" GitHub Action +* #310 Support stereo CIP calculation in Ket format +* #957 Support of Korean, Chinese and Japanese characters in Standalone. +* #995 Automated memory leaks testing + +## Bugfixes +* #1044 SVG/PNG: Reaction arrows are not visible without structures at preview and in saved files +* #932 Reagents: When opening Daylight SMILES and Extended SMILES files with reagent the original structure is distorted +* #1084 Can't open mol v3000 files with 'S-Group Properties Type = Generic' and 'S-Group Properties Type = Multiple' +* #1083 Indigo Service: enable of using Indigo Options +* #910 MDL Molfile v3000 encoding: Automatic selection of MDL Molfile v3000 encoding doesn't work if the number of atoms (or bonds) exceeds 999 +* #956 Copy Image: When inversion type is chosen in the atom's properties, it is not saved +* #955 Copy Image: Saved bonds does not have Reacting Center marks +* #1052 Set "Indigo Docker images preparation" GItHub Action to start manually only add version tag to Docker images +* #1064 Keep implicit hydrogens information in KET-format +* #1048 Memory leak in 3rd party library +* #1056 RXN2000/3000 should not serialize INDIGO_DESC fields for s-groups +* #1050 Memory leak in StringPool code +* #1031 Calculate CIP: Hovering over the label R/S displays Indigo system information +* #1049 Memory leak in the SMILES loader code +* #973 Daylight SMARTS: Error when save file in SMART format with reaction arrow and reagent +* #1017 imagoVersions is undefined +* #899 Add restrictions on size to be less than 1000 +* #1015 Cannot test CDX export with certain files +* #944 CDX import: Greek letters, Celsius and Fahrenheit signs are replaced with question marks +* #1093 python binding memory leak from 1.8.0 (and still present in 1.10.0) + +**Full Changelog**: https://github.com/epam/Indigo/compare/indigo-1.10.0...indigo-1.11.0 + # Indigo 1.10.0 Released 2023-03-22 @@ -43,7 +100,7 @@ Released 2023-01-31 * Improve ssl bingo elastic by @MysterionRise in #901 * bingo: postgres: add support for Postgres 15, drop support for Postgres 10 by @mkviatkovskii in #903 * #521: core: replace MultiMap in MoleculeRGroupsComposition class by @loimu in #917 -* #521: core: replace MultiMap in MolfileLoader class by @loimu in #911  +* #521: core: replace MultiMap in MolfileLoader class by @loimu in #911  * #929: fix auto-saving to CTAB v3000 by @mkviatkovskii in #931 **Full Changelog**: https://github.com/epam/Indigo/compare/indigo-1.8.0...indigo-1.9.0 diff --git a/bingo/bingo-elastic/python/bingo_elastic/elastic.py b/bingo/bingo-elastic/python/bingo_elastic/elastic.py index 61cf4ebcb1..2a332b7f93 100644 --- a/bingo/bingo-elastic/python/bingo_elastic/elastic.py +++ b/bingo/bingo-elastic/python/bingo_elastic/elastic.py @@ -1,7 +1,8 @@ +from __future__ import annotations + from enum import Enum from typing import ( Any, - AsyncGenerator, Dict, Generator, List, @@ -9,7 +10,7 @@ Tuple, Type, TypeVar, - Union, + Union, Awaitable, Iterable, Iterator, ) from elasticsearch import Elasticsearch @@ -34,67 +35,286 @@ ElasticRepositoryT = TypeVar("ElasticRepositoryT") -MAX_ALLOWED_SIZE = 1000 +class BingoElasticPageCriteria: + """ + Captures the criteria to make a paged query in Bingo + """ + _pit_id: Optional[str] + _page_size: int + _pit_stay_alive_minutes: int + _sort: Optional[List[Dict[str, str]]] + _search_after: Optional[List[Any]] + _query: Optional[Dict[str, Any]] + _next_page_search_after: List[Any] + + def to_json(self) -> Dict[str, Any]: + """ + Provide ability to serialize this page criteria into a JSON for REST API clients. + """ + return { + "pit_id": self._pit_id, + "page_size": self._page_size, + "stay_alive_minutes": self._pit_stay_alive_minutes, + "sort": self._sort, + "search_after": self._search_after, + "query": self._query + } -class IndexName(Enum): - def __init__(self, value): - self._value_ = value + @staticmethod + def from_json(json_dct: Dict[str, Any]) -> BingoElasticPageCriteria: + """ + Provide deserialization ability from page criteria to a JSON REST API client. + """ + _pit_id: Optional[str] = json_dct.get("pit_id") + _page_size: int = json_dct.get("page_size") + _pit_stay_alive_minutes: int = json_dct.get("stay_alive_minutes") + _sort: List[Dict[str, str]] = json_dct.get("sort") + _search_after: List[Any] = json_dct.get("search_after") + _query: Dict[str, Any] = json_dct.get("query") + return BingoElasticPageCriteria(page_size=_page_size, pit_id=_pit_id, + sort=_sort, pit_stay_alive_minutes=_pit_stay_alive_minutes, + search_after=_search_after, query=_query) + + @property + def query(self) -> Optional[Dict[str, Any]]: + """ + Get the precompiled query, which will be stored in the following pages of first page for performance. + """ + return self._query + + @property + def pit_id(self) -> Optional[str]: + """ + Get the Point In Time (PIT) query identifier. The identifier must either be blank, or must be non-expired. + """ + return self._pit_id + + @property + def page_size(self) -> int: + """ + The page size of the query total. + Cannot exceed maximum 999 (1 extra is canary for testing next page availability). + """ + return self._page_size + @property + def pit_stay_alive_minutes(self) -> int: + """ + Get the Point In Time query stay alive minutes, which will be refreshed if there is another paged query again. + Note the elasticsearch does not support floating point values. + """ + return self._pit_stay_alive_minutes + + @property + def sort_criteria(self) -> Optional[List[Dict[str, str]]]: + """ + By default, the query will be sorted by score followed by PIT shard ID as tie-breaker implicitly. + If an alternative sort order is desired, enter it here. + """ + return self._sort + + @property + def search_after(self) -> Optional[List[Any]]: + """ + The cursor of the page we are retrieving of the previous record of the first record of this page. + If this is the first page. This will be None. + """ + return self._search_after + + def __init__(self, page_size: int = 10, + pit_id: Optional[str] = None, + sort: Optional[List[Dict[str, str]]] = None, + pit_stay_alive_minutes: int = 30, + search_after: Optional[List[Any]] = None, + query: Optional[Dict[str, Any]] = None): + """ + Create custom page criteria to query any particular page with particular number of records to skip. + Note: in order to continue the query, the sort order must not be changed. + https://www.elastic.co/guide/en/elasticsearch/reference/current/paginate-search-results.html + :param page_size: The page size must not exceed the elastic page limit. The elastic page limit is adjustable + by admin configuration and is usually default to 999 (with canary). + Note: there will be one extra hit to be used for canary so this should be 1 less than the limit. + :param pit_id: The PIT identifier for the current query. A fresh query with have this as None. + :param sort: The optional sort order of the paged query. If none provided, this will be sort by relevance score. + :param pit_stay_alive_minutes: the number of minutes the PIT (point in time) query will stay alive for continued + browsing. + :param search_after To continue querying, obtain the last record's sort result and append it in this parameter. + """ + if not sort: + sort = [ + {"_score": "desc"} + ] + # shard_doc in sort is implicit + self._page_size = page_size + self._pit_id = pit_id + self._sort = sort + self._pit_stay_alive_minutes = pit_stay_alive_minutes + self._search_after = search_after + self._query = query + + +class BingoElasticPageResult(Awaitable, Iterable): + """ + Result of a single page query in Bingo elastic. + """ + + _records_of_page: List[Optional[IndigoRecord]] + _current_page_criteria: BingoElasticPageCriteria + _num_hits_in_elastic: int + _num_actual_hits: int + _last_hit_sort_object: Optional[List[Any]] + _gen: Generator[IndigoRecord, None, None] + _completed_processing: bool + + def get_records(self, filter_false_positives: bool = True) -> Tuple[Optional[IndigoRecord], ...]: + """ + Get records in this page. + :param filter_false_positives: If true, the hits in elastic that are filtered out by post-processor will + not be returned. If false, null object will be returned in position where false positive had occurred. + """ + if not filter_false_positives: + return tuple(self._records_of_page) + return tuple([x for x in self._records_of_page if x is not None]) + + @property + def current_page(self) -> BingoElasticPageCriteria: + """ + Get the current page criteria for the query. + """ + return self._current_page_criteria + + @property + def num_hits_in_elastic(self) -> int: + """ + Get number of hits in this page that's in elastic but may be false-positives. + """ + return self._num_hits_in_elastic + + @property + def num_actual_hits(self) -> int: + """ + Get the number of actual hits after post-processing filter. + Note: this can be size 1 greater than original page size if the canary for next page is hit. + """ + return self._num_actual_hits + + @property + def has_next_page(self) -> bool: + """ + Use the canary to decide whether the next page is available or not. + """ + # If there isn't any hit in ELASTIC page at all for some reason (i.e. first page no result) then no next page. + if not self._completed_processing: + raise AssertionError("Cannot test next page availability using async I/O " + "without fully retrieving current page result first..") + if not self._last_hit_sort_object: + return False + return self._num_hits_in_elastic >= self._current_page_criteria.page_size + 1 + + @property + def next_page_criteria(self) -> Optional[BingoElasticPageCriteria]: + if not self.has_next_page: + return None + cur = self.current_page + return BingoElasticPageCriteria(page_size=cur.page_size, pit_id=cur.pit_id, + sort=cur.sort_criteria, pit_stay_alive_minutes=cur.pit_stay_alive_minutes, + query=cur.query, + search_after=self._last_hit_sort_object) + + def synchronized(self) -> None: + """ + Synchronize the object so we finish the processing completely + """ + if self._completed_processing: + return + for record in self.__await__(): + pass + + def __init__(self, gen: Generator[IndigoRecord, None, None], + current_page_criteria: BingoElasticPageCriteria): + self._current_page_criteria = current_page_criteria + self._gen = gen + self._records_of_page = list() + self._num_hits_in_elastic = 0 + self._num_actual_hits = 0 + self._completed_processing = False + self._last_hit_sort_object = None + + def __iter__(self) -> Iterator[Optional[IndigoRecord]]: + """ + Backward compatibility method to obtain iterator of indigo records. + We track the iterator, so it never goes back like before, to mimic its behavior. + """ + self.synchronized() + return self.get_records(filter_false_positives=False).__iter__() + + def __await__(self) -> Generator[IndigoRecord, None, None]: + for record in self._gen: + self._num_hits_in_elastic += 1 + # Avoid returning the canary in the page. + if self._num_hits_in_elastic > self._current_page_criteria.page_size: + break + # make sure we get canary of the last hit of actual last page instead of canary (must be after break) + # noinspection PyProtectedMember + self._last_hit_sort_object = self._current_page_criteria._next_page_search_after + self._records_of_page.append(record) + # If post-processing filtered it out then it's not an actual hit. + if record is not None: + self._num_actual_hits += 1 + yield record + self._completed_processing = True + + +class IndexType(Enum): BINGO_MOLECULE = "bingo-molecules" BINGO_REACTION = "bingo-reactions" - BINGO_CUSTOM = "custom-index" - - def set_value(self, new_value): - self._value_ = new_value -def get_index_name(record: IndigoRecord) -> IndexName: +def get_index_type(record: IndigoRecord) -> IndexType: if isinstance(record, IndigoRecordMolecule): - return IndexName.BINGO_MOLECULE + return IndexType.BINGO_MOLECULE if isinstance(record, IndigoRecordReaction): - return IndexName.BINGO_REACTION - if isinstance(record, str): - return IndexName.BINGO_CUSTOM + return IndexType.BINGO_REACTION raise AttributeError(f"Unknown IndigoRecord type {record}") def get_record_by_index( - response: Dict, index: str + response: Dict, index_type: IndexType ) -> Union[IndigoRecordMolecule, IndigoRecordReaction]: - if index == IndexName.BINGO_MOLECULE.value: + if index_type == IndexType.BINGO_MOLECULE: return IndigoRecordMolecule(elastic_response=response) - if index == IndexName.BINGO_REACTION.value: + if index_type == IndexType.BINGO_REACTION: return IndigoRecordReaction(elastic_response=response) - if index == IndexName.BINGO_CUSTOM.value: - return IndigoRecordMolecule(elastic_response=response) - raise AttributeError(f"Unknown index {index}") + raise AttributeError(f"Unknown index {str(index_type)}") -def elastic_repository_molecule(*args, **kwargs): - return ElasticRepository(IndexName.BINGO_MOLECULE, *args, **kwargs) +def elastic_repository_molecule(index_name: str, *args, **kwargs): + return ElasticRepository(IndexType.BINGO_MOLECULE, index_name, *args, **kwargs) -def elastic_repository_reaction(*args, **kwargs): - return ElasticRepository(IndexName.BINGO_REACTION, *args, **kwargs) +def elastic_repository_reaction(index_name: str, *args, **kwargs): + return ElasticRepository(IndexType.BINGO_REACTION, index_name, *args, **kwargs) def get_client( - *, - client_type: Type[ElasticRepositoryT], - host: Union[str, List[str]] = "localhost", - port: int = 9200, - scheme: str = "", - http_auth: Optional[Tuple[str]] = None, - ssl_context: Any = None, - request_timeout: int = 60, - retry_on_timeout: bool = True, + *, + client_type: Type[ElasticRepositoryT], + host: Union[str, List[str]] = "localhost", + port: int = 9200, + scheme: str = "", + http_auth: Optional[List[str]] = None, + ssl_context: Any = None, + request_timeout: int = 60, + timeout: int = 60, + retry_on_timeout: bool = True, ) -> ElasticRepositoryT: arguments = { "port": port, "scheme": "https" if scheme == "https" else "http", "request_timeout": request_timeout, "retry_on_timeout": retry_on_timeout, + "timeout": timeout } if isinstance(host, str): arguments["host"] = host @@ -130,8 +350,8 @@ def check_index_exception(err_: RequestError) -> None: raise err_ cause = err_.info.get("error", {}).get("root_cause", []) if ( - len(cause) == 1 - and cause[0].get("type") == "resource_already_exists_exception" + len(cause) == 1 + and cause[0].get("type") == "resource_already_exists_exception" ): return raise err_ @@ -145,7 +365,7 @@ def create_index(index_name: str, el_client: Elasticsearch) -> None: async def a_create_index( - index_name: str, el_client: "AsyncElasticsearch" + index_name: str, el_client: "AsyncElasticsearch" ) -> None: try: await el_client.indices.create(index=index_name, body=index_body) @@ -154,48 +374,55 @@ async def a_create_index( def prepare( - records: Generator[IndigoRecord, None, None] + index_type: IndexType, records: Generator[IndigoRecord, None, None] ) -> Generator[Dict, None, None]: for record in records: - # if get_index_name(record).value != index_name: - # raise ValueError( - # f"Index {index_name} doesn't support store value " - # f"of type {type(record)}" - # ) + if index_type != get_index_type(record): + raise ValueError( + f"Index {str(index_type)} doesn't support store value " + f"of type {type(record)}" + ) yield record.as_dict() -def response_to_records( - res: dict, - index_name: str, - postprocess_actions: Optional[PostprocessType] = None, - indigo_session: Optional[Indigo] = None, - options: str = "", -) -> Generator[IndigoRecord, None, None]: - for el_response in res.get("hits", {}).get("hits", []): - record = get_record_by_index(el_response, index_name) - for action_fn in postprocess_actions: # type: ignore - record = action_fn(record, indigo_session, options) # type: ignore - if not record: - continue - yield record +def get_page_result( + res: dict, + index_type: IndexType, + page_criteria: BingoElasticPageCriteria, + postprocess_actions: PostprocessType = None, + indigo_session: Indigo = None, + options: str = "", +) -> BingoElasticPageResult: + def page_result_gen() -> Generator[IndigoRecord, None, None]: + for el_response in res.get("hits", {}).get("hits", []): + record = get_record_by_index(el_response, index_type) + for action_fn in postprocess_actions: # type: ignore + record = action_fn(record, indigo_session, options) # type: ignore + if not record: + continue + yield record + + return BingoElasticPageResult(gen=page_result_gen(), current_page_criteria=page_criteria) class AsyncElasticRepository: def __init__( - self, - index_name: IndexName, - *, - host: Union[str, List[str]] = "localhost", - port: int = 9200, - scheme: str = "", - http_auth: Optional[Tuple[str]] = None, - ssl_context: Any = None, - request_timeout: int = 60, - retry_on_timeout: bool = True, + self, + index_type: IndexType, + index_name: str, + *, + host: Union[str, List[str]] = "localhost", + port: int = 9200, + scheme: str = "", + http_auth: Optional[List[str]] = None, + ssl_context: Any = None, + request_timeout: int = 60, + timeout: int = 60, + retry_on_timeout: bool = True, ) -> None: """ - :param index_name: use function get_index_name for setting this argument + :param index_type: use function get_index_name for setting this argument + :param index_name: the name of the index :param host: host or list of hosts :param port: :param scheme: http or https @@ -204,7 +431,10 @@ def __init__( :param timeout: :param retry_on_timeout: """ - self.index_name = index_name.value + self.index_type = index_type + self.index_name = index_type.value + if index_name: + self.index_name += "-" + index_name self.el_client = get_client( client_type=AsyncElasticsearch, @@ -214,6 +444,7 @@ def __init__( http_auth=http_auth, ssl_context=ssl_context, request_timeout=request_timeout, + timeout=timeout, retry_on_timeout=retry_on_timeout, ) @@ -227,39 +458,57 @@ async def index_records(self, records: Generator, chunk_size: int = 500): await a_create_index(self.index_name, self.el_client) # pylint: disable=unused-variable async for is_ok, action in async_streaming_bulk( - self.el_client, - prepare(records), - index=self.index_name, - chunk_size=chunk_size, + self.el_client, + prepare(self.index_type, records), + index=self.index_name, + chunk_size=chunk_size, ): pass async def filter( - self, - query_subject: Union[BaseMatch, IndigoObject, IndigoRecord] = None, - indigo_session: Indigo = None, - limit: int = 10, - options: str = "", - **kwargs, - ) -> AsyncGenerator[IndigoRecord, None]: - if limit > MAX_ALLOWED_SIZE: - raise ValueError( - f"limit should less or equal to {MAX_ALLOWED_SIZE}" - ) + self, + query_subject: Union[BaseMatch, IndigoObject, IndigoRecord] = None, + indigo_session: Indigo = None, + page_criteria: Optional[BingoElasticPageCriteria] = None, + options: str = "", + **kwargs, + ) -> BingoElasticPageResult: + """ + Return async page result without waiting for page's full post-processing to complete. + The client is expected to consume the Awaitable object returned by using "await object" syntax + or consume its generator directly (depending on the type of parallelism desired). + """ + # actions needed to be called on elastic_search result postprocess_actions: PostprocessType = [] - query = compile_query( + page_criteria = self.compile_query( query_subject=query_subject, - limit=limit, + page_criteria=page_criteria, postprocess_actions=postprocess_actions, **kwargs, ) - res = await self.el_client.search(index=self.index_name, body=query) - for record in response_to_records( - res, self.index_name, postprocess_actions, indigo_session, options - ): - yield record + # We must NOT specify an index name as this is inherited by PIT. + res = await self.el_client.search(body=page_criteria.query) + ret = get_page_result( + res, self.index_type, page_criteria, postprocess_actions, indigo_session, options + ) + return ret + + async def delete(self, query_subject: Union[BaseMatch, IndigoObject, IndigoRecord] = None, + limit: int = 1000, **kwargs, ) -> Dict[str, Any]: + """ + Delete documents in index by a query filter. + """ + if not self.el_client.indices.exists(index=self.index_name): + return dict() + page_criteria = self.compile_query( + query_subject=query_subject, + page_criteria=BingoElasticPageCriteria(page_size=limit-1), + is_delete_query=True, + **kwargs, + ) + return await self.el_client.delete_by_query(index=self.index_name, body=page_criteria.query, slices="auto") async def close(self) -> None: await self.el_client.close() @@ -270,22 +519,32 @@ async def __aenter__(self, *args, **kwargs) -> "AsyncElasticRepository": async def __aexit__(self, *args, **kwargs) -> None: await self.close() + def compile_query(self, query_subject: Union[BaseMatch, IndigoObject, IndigoRecord] = None, + page_criteria: Optional[BingoElasticPageCriteria] = None, + postprocess_actions: PostprocessType = None, + is_delete_query: bool = True, **kwargs, ) -> BingoElasticPageCriteria: + return _compile_query(self.index_name, self.el_client, + query_subject, page_criteria, postprocess_actions, is_delete_query, **kwargs) + class ElasticRepository: def __init__( - self, - index_name: IndexName, - *, - host: Union[str, List[str]] = "localhost", - port: int = 9200, - scheme: str = "", - http_auth: Optional[Tuple[str]] = None, - ssl_context: Any = None, - request_timeout: int = 60, - retry_on_timeout: bool = True, + self, + index_type: IndexType, + index_name: str, + *, + host: Union[str, List[str]] = "localhost", + port: int = 9200, + scheme: str = "", + http_auth: Optional[List[str]] = None, + ssl_context: Any = None, + request_timeout: int = 60, + timeout: int = 60, + retry_on_timeout: bool = True, ) -> None: """ - :param index_name: use function get_index_name for setting this argument + :param index_type: use function get_index_name for setting this argument + :param index_name: the name of this index after index type. :param host: host or list of hosts :param port: :param scheme: http or https @@ -294,7 +553,10 @@ def __init__( :param timeout: :param retry_on_timeout: """ - self.index_name = index_name.value + self.index_type = index_type + self.index_name = index_type.value + if index_name: + self.index_name += "-" + index_name self.el_client = get_client( client_type=Elasticsearch, @@ -304,6 +566,7 @@ def __init__( http_auth=http_auth, ssl_context=ssl_context, request_timeout=request_timeout, + timeout=timeout, retry_on_timeout=retry_on_timeout, ) @@ -317,10 +580,10 @@ def index_records(self, records: Generator, chunk_size: int = 500): create_index(self.index_name, self.el_client) # pylint: disable=unused-variable for is_ok, action in streaming_bulk( - self.el_client, - prepare(records), - index=self.index_name, - chunk_size=chunk_size, + self.el_client, + prepare(self.index_type, records), + index=self.index_name, + chunk_size=chunk_size, ): pass @@ -331,62 +594,123 @@ def delete_all_records(self): pass def filter( - self, - query_subject: Union[BaseMatch, IndigoObject, IndigoRecord] = None, - indigo_session: Indigo = None, - limit: int = 10, - options: str = "", - **kwargs, - ) -> Generator[IndigoRecord, None, None]: - if limit > MAX_ALLOWED_SIZE: - raise ValueError( - f"limit should less or equal to {MAX_ALLOWED_SIZE}" - ) + self, + query_subject: Union[BaseMatch, IndigoObject, IndigoRecord] = None, + indigo_session: Indigo = None, + page_criteria: Optional[BingoElasticPageCriteria] = None, + options: str = "", + **kwargs, + ) -> BingoElasticPageResult: # actions needed to be called on elastic_search result postprocess_actions: PostprocessType = [] - query = compile_query( + page_criteria = self.compile_query( query_subject=query_subject, - limit=limit, + page_criteria=page_criteria, postprocess_actions=postprocess_actions, **kwargs, ) - res = self.el_client.search(index=self.index_name, body=query) - yield from response_to_records( - res, self.index_name, postprocess_actions, indigo_session, options + # We must NOT specify an index name as this is inherited by PIT. + res = self.el_client.search(body=page_criteria.query) + ret = get_page_result( + res, self.index_type, page_criteria, postprocess_actions, indigo_session, options ) + ret.synchronized() + return ret - -def compile_query( - query_subject: Union[BaseMatch, IndigoObject, IndigoRecord] = None, - limit: int = 10, - postprocess_actions: Optional[PostprocessType] = None, - **kwargs, -) -> Dict: - query = { - "size": limit, - "_source": { - "includes": ["*"], - "excludes": [ - "sim_fingerprint", - "sim_fingerprint_len", - "sub_fingerprint_len", - "sub_fingerprint", - ], - }, - } - - if isinstance(query_subject, BaseMatch): - query_subject.compile(query, postprocess_actions) - elif isinstance(query_subject, IndigoRecord): - query_factory("exact", query_subject).compile( - query, postprocess_actions - ) - elif isinstance(query_subject, IndigoObject): - query_factory("substructure", query_subject).compile( - query, postprocess_actions + def delete(self, + query_subject: Union[BaseMatch, IndigoObject, IndigoRecord] = None, + limit: int = 1000, + **kwargs, ) -> Dict[str, Any]: + """ + Delete documents in index by a query filter. + """ + if not self.el_client.indices.exists(index=self.index_name): + return dict() + page_criteria = self.compile_query( + query_subject=query_subject, + page_criteria=BingoElasticPageCriteria(page_size=limit-1), + is_delete_query=True, + **kwargs, ) + return self.el_client.delete_by_query(index=self.index_name, body=page_criteria.query, + slices="auto") + + def compile_query(self, query_subject: Union[BaseMatch, IndigoObject, IndigoRecord] = None, + page_criteria: Optional[BingoElasticPageCriteria] = None, + postprocess_actions: PostprocessType = None, + is_delete_query: bool = False, **kwargs, ) -> BingoElasticPageCriteria: + return _compile_query(self.index_name, self.el_client, + query_subject, page_criteria, postprocess_actions, is_delete_query, **kwargs) + + +def _compile_query(index_name: str, el_client: ElasticRepositoryT, + query_subject: Union[BaseMatch, IndigoObject, IndigoRecord] = None, + page_criteria: Optional[BingoElasticPageCriteria] = None, + postprocess_actions: PostprocessType = None, + is_delete_query: bool = False, + **kwargs, + ) -> BingoElasticPageCriteria: + # record last elastic hit's sort object, regardless of its post-process filtering status. + if postprocess_actions is None: + postprocess_actions = [] + if page_criteria is None: + page_criteria = BingoElasticPageCriteria() + if not page_criteria.pit_id and not is_delete_query: + pit_result = el_client.open_point_in_time(index=index_name, + keep_alive=str(page_criteria.pit_stay_alive_minutes) + "m") + pit_id: str = pit_result["id"] + page_criteria._pit_id = pit_id + + def page_processing_routine(record: IndigoRecord, indigo: Indigo, options: str) -> Optional[IndigoRecord]: + # This is the first post-processing action always, so it shouldn't return None + assert record is not None + page_criteria._next_page_search_after = record.sort + return record + + if not is_delete_query: + postprocess_actions.insert(0, page_processing_routine) + + query: Dict[str, Any] + if not page_criteria.query: + query = { + "size": page_criteria.page_size + 1, + "_source": { + "includes": ["*"], + "excludes": [ + "sim_fingerprint", + "sim_fingerprint_len", + "sub_fingerprint_len", + "sub_fingerprint", + ], + }, + # Sort is necessary for paging. + "sort": page_criteria.sort_criteria + } + if not is_delete_query: + query["pit"] = { + "id": page_criteria.pit_id, + "keep_alive": str(page_criteria.pit_stay_alive_minutes) + "m" + } + + if isinstance(query_subject, BaseMatch): + query_subject.compile(query, postprocess_actions) + elif isinstance(query_subject, IndigoRecord): + query_factory("exact", query_subject).compile( + query, postprocess_actions + ) + elif isinstance(query_subject, IndigoObject): + query_subject.aromatize() + query_factory("substructure", query_subject).compile( + query, postprocess_actions + ) - for key, value in kwargs.items(): - query_factory(key, value).compile(query) - - return query + for key, value in kwargs.items(): + query_factory(key, value).compile(query) + else: + # We only bother to compile the query if this is the first page. Otherwise, we use the same query as before. + query = page_criteria.query + # But regardless of which page, we will overwrite search_after criteria if specified. + if page_criteria.search_after: + query["search_after"] = page_criteria.search_after + page_criteria._query = query + return page_criteria diff --git a/bingo/bingo-elastic/python/bingo_elastic/model/record.py b/bingo/bingo-elastic/python/bingo_elastic/model/record.py index f720b204aa..11396f5d28 100644 --- a/bingo/bingo-elastic/python/bingo_elastic/model/record.py +++ b/bingo/bingo-elastic/python/bingo_elastic/model/record.py @@ -28,6 +28,7 @@ def __set__(self, instance: IndigoRecord, value: Dict): el_src = value["_source"] for arg, val in el_src.items(): setattr(instance, arg, val) + setattr(instance, "_sort", value.get("sort")) class WithIndigoObject: @@ -108,6 +109,12 @@ class IndigoRecord: elastic_response = WithElasticResponse() record_id: Optional[str] = None error_handler: Optional[Callable[[object, BaseException], None]] = None + # Sort for page cursor + _sort: list + + @property + def sort(self) -> list: + return self._sort def __init__(self, **kwargs) -> None: """ diff --git a/bingo/bingo-elastic/python/bingo_elastic/queries.py b/bingo/bingo-elastic/python/bingo_elastic/queries.py index 7da47cd063..580c86a0a1 100644 --- a/bingo/bingo-elastic/python/bingo_elastic/queries.py +++ b/bingo/bingo-elastic/python/bingo_elastic/queries.py @@ -47,23 +47,60 @@ def compile( class KeywordQuery(CompilableQuery): - def __init__(self, value: str): + is_substructure: bool + + def __init__(self, value: str, is_substructure: bool = False): self._value = value + self.is_substructure = is_substructure def compile( self, query: Dict, postprocess_actions: Optional[PostprocessType] = None, ) -> None: - bool_head = head_by_path( - query, ("query", "script_score", "query", "bool") - ) - if not bool_head.get("must"): - bool_head["must"] = [] - bool_head["must"].append( + if self.is_substructure: + bool_head = head_by_path(query, ("query", "bool")) + else: + bool_head = head_by_path(query, ("query", "script_score", "query", "bool")) + parent_term = "must" + if not bool_head.get(parent_term): + bool_head[parent_term] = [] + bool_head[parent_term].append( {"match": {self.field: {"query": self._value, "boost": 0}}} ) - default_script_score(query) + if not self.is_substructure: + default_script_score(query) + + +class TermQuery(CompilableQuery): + """ + Result must match at least one result in the querying field. + """ + def __init__(self, value_list: List[str], is_substructure: bool = False): + self._value = value_list + self.is_substructure = is_substructure + + def compile( + self, query: Dict, postprocess_actions: PostprocessType = None + ) -> None: + if self.is_substructure: + bool_head = head_by_path(query, ("query", "bool")) + else: + bool_head = head_by_path(query, ("query", "script_score", "query", "bool")) + parent_term = "filter" + field_name = str(self.field) + ".keyword" + if not bool_head.get(parent_term): + bool_head[parent_term] = [] + if len(self._value) > 1: + bool_head[parent_term].append({ + "terms": {field_name: self._value} + }) + else: + bool_head[parent_term].append({ + "term": {field_name: self._value[0]} + }) + if not self.is_substructure: + default_script_score(query) class SubstructureQuery(CompilableQuery): @@ -79,6 +116,7 @@ def postprocess( return None mol = record.as_indigo_object(indigo) + mol.aromatize() matcher = indigo.substructureMatcher(mol, options) if matcher.match(self._value): diff --git a/bingo/bingo-elastic/python/bingo_elastic/utils.py b/bingo/bingo-elastic/python/bingo_elastic/utils.py index 6e77bb002f..22ab2f67b0 100644 --- a/bingo/bingo-elastic/python/bingo_elastic/utils.py +++ b/bingo/bingo-elastic/python/bingo_elastic/utils.py @@ -5,7 +5,7 @@ from bingo_elastic.model.record import IndigoRecord PostprocessType = List[ - Callable[[IndigoRecord, Indigo], Optional[IndigoRecord]] + Callable[[IndigoRecord, Indigo, str], Optional[IndigoRecord]] ]