diff --git a/bingo/bingo-elastic/java/src/main/java/com/epam/indigo/elastic/ElasticStream.java b/bingo/bingo-elastic/java/src/main/java/com/epam/indigo/elastic/ElasticStream.java index 25e6eced50..dee40aeeb4 100644 --- a/bingo/bingo-elastic/java/src/main/java/com/epam/indigo/elastic/ElasticStream.java +++ b/bingo/bingo-elastic/java/src/main/java/com/epam/indigo/elastic/ElasticStream.java @@ -4,7 +4,12 @@ import com.epam.indigo.model.Helpers; import com.epam.indigo.model.IndigoRecord; import com.epam.indigo.model.NamingConstants; -import com.epam.indigo.predicate.*; +import com.epam.indigo.predicate.BaseMatch; +import com.epam.indigo.predicate.ExactMatch; +import com.epam.indigo.predicate.FilterPredicate; +import com.epam.indigo.predicate.IndigoPredicate; +import com.epam.indigo.predicate.SubstructureMatch; +import com.epam.indigo.sort.IndigoComparator; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RequestOptions; @@ -15,13 +20,29 @@ import org.elasticsearch.script.Script; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.FieldSortBuilder; +import org.elasticsearch.search.sort.SortOrder; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Spliterator; import java.util.function.*; -import java.util.stream.*; +import java.util.stream.Collector; +import java.util.stream.DoubleStream; +import java.util.stream.IntStream; +import java.util.stream.LongStream; +import java.util.stream.Stream; -import static com.epam.indigo.model.NamingConstants.*; +import static com.epam.indigo.model.NamingConstants.SIM_FINGERPRINT; +import static com.epam.indigo.model.NamingConstants.SIM_FINGERPRINT_LEN; +import static com.epam.indigo.model.NamingConstants.SUB_FINGERPRINT; +import static com.epam.indigo.model.NamingConstants.SUB_FINGERPRINT_LEN; /** * Implementation of JDK Stream API @@ -32,8 +53,10 @@ public class ElasticStream implements Stream { private final RestHighLevelClient elasticClient; private final List> predicates = new ArrayList<>(); private final String indexName; - private int size = 10; - private final int MAX_ALLOWED_SIZE = 1000; + private int limit = Integer.MAX_VALUE; + private final List> comparators = new ArrayList<>(); + + private static final int BATCH_SIZE = 10000; public ElasticStream(RestHighLevelClient elasticClient, String indexName) { this.elasticClient = elasticClient; @@ -51,9 +74,9 @@ public Stream filter(Predicate predicate) { @Override public Stream limit(long maxSize) { - if (maxSize > MAX_ALLOWED_SIZE) - throw new IllegalArgumentException(String.format("Bingo Elastic max page size should be less than or equal to %1", MAX_ALLOWED_SIZE)); - this.size = (int) maxSize; + if (maxSize > Integer.MAX_VALUE) + throw new IllegalArgumentException(String.format("Bingo Elastic max page size should be less than or equal to %1$d", Integer.MAX_VALUE)); + this.limit = (int) maxSize; return this; } @@ -65,26 +88,51 @@ public boolean isParallel() { @Override public R collect(Collector collector) { A container = collector.supplier().get(); - SearchRequest searchRequest = compileRequest(); - SearchHit[] hits; - try { - SearchResponse searchResponse = this.elasticClient.search(searchRequest, RequestOptions.DEFAULT); - hits = searchResponse.getHits().getHits(); - if (NamingConstants.BINGO_REACTIONS.equals(this.indexName)) { - for (SearchHit hit : hits) { - collector.accumulator().accept(container, (T) Helpers.reactionFromElastic(hit.getId(), hit.getSourceAsMap(), hit.getScore())); - } - } else if (NamingConstants.BINGO_MOLECULES.equals(this.indexName)) { - for (SearchHit hit : hits) { - collector.accumulator().accept(container, (T) Helpers.moleculeFromElastic(hit.getId(), hit.getSourceAsMap(), hit.getScore())); + Object[] searchAfterParameters = null; + + long processedRecords = 0; + boolean continueSearch = true; + + while (continueSearch) { + int currentBatchSize = (int) Math.min(BATCH_SIZE, limit - processedRecords); + SearchRequest searchRequest = compileRequest(searchAfterParameters, currentBatchSize); + SearchResponse searchResponse; + try { + searchResponse = elasticClient.search(searchRequest, RequestOptions.DEFAULT); + } catch (IOException e) { + throw new BingoElasticException("Couldn't complete search in Elasticsearch", e); + } + + SearchHit[] hits = searchResponse.getHits().getHits(); + if (hits.length == 0) { + break; + } + + for (SearchHit hit : hits) { + if (processedRecords >= limit) { + break; } - } else { - throw new BingoElasticException("Unsupported index " + this.indexName); + T record = convertHitToRecord(hit); + collector.accumulator().accept(container, record); + processedRecords++; } - } catch (IOException e) { - throw new BingoElasticException("Couldn't complete search in Elasticsearch", e.getCause()); + + searchAfterParameters = hits[hits.length - 1].getSortValues(); + continueSearch = !this.comparators.isEmpty() && hits.length == BATCH_SIZE; + } + + return collector.finisher().apply(container); + } + + + private T convertHitToRecord(SearchHit hit) { + if (NamingConstants.BINGO_REACTIONS.equals(this.indexName)) { + return (T) Helpers.reactionFromElastic(hit.getId(), hit.getSourceAsMap(), hit.getScore()); + } else if (NamingConstants.BINGO_MOLECULES.equals(this.indexName)) { + return (T) Helpers.moleculeFromElastic(hit.getId(), hit.getSourceAsMap(), hit.getScore()); + } else { + throw new BingoElasticException("Unsupported index " + this.indexName); } - return (R) container; } private QueryBuilder[] generateClauses(List fingerprint, String field) { @@ -95,15 +143,26 @@ private QueryBuilder[] generateClauses(List fingerprint, String field) return bits; } - private SearchRequest compileRequest() { + private SearchRequest compileRequest(Object[] searchAfterParameters, int batchSize) { SearchRequest searchRequest = new SearchRequest(this.indexName); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + boolean similarityRequested = false; boolean isEmptyFingerprint = false; + + searchSourceBuilder.size(batchSize); + + if (!comparators.isEmpty()) { + comparators.stream().map(IndigoComparator::toSortBuilder).forEach(searchSourceBuilder::sort); + searchSourceBuilder.sort(new FieldSortBuilder("_doc").order(SortOrder.ASC)); + } + + if (this.predicates.isEmpty()) { searchSourceBuilder.query(QueryBuilders.matchAllQuery()); } else { BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); + Script script = null; float threshold = 0.0f; for (IndigoPredicate predicate : this.predicates) { @@ -141,13 +200,26 @@ private SearchRequest compileRequest() { } searchSourceBuilder.fetchSource(new String[]{"*"}, new String[]{SIM_FINGERPRINT, SIM_FINGERPRINT_LEN, SUB_FINGERPRINT_LEN, SUB_FINGERPRINT}); searchSourceBuilder.minScore(threshold); - searchSourceBuilder.size(this.size); searchSourceBuilder.query(QueryBuilders.scriptScoreQuery(boolQueryBuilder, script)); } + + if (searchAfterParameters != null) { + searchSourceBuilder.searchAfter(searchAfterParameters); + } + searchRequest.source(searchSourceBuilder); return searchRequest; } + @Override + public ElasticStream sorted(Comparator comparator) { + if (!(comparator instanceof IndigoComparator)) { + throw new IllegalArgumentException("Comparator used isn't an IndigoComparator"); + } + comparators.add((IndigoComparator) comparator); + return this; + } + private Script generateIdentityScore() { Map map = new HashMap<>(); map.put("source", "_score"); @@ -255,11 +327,6 @@ public Stream sorted() { throw new BingoElasticException("sorted() operation on this stream isn't implemented"); } - @Override - public Stream sorted(Comparator comparator) { - throw new BingoElasticException("sorted() operation on this stream isn't implemented"); - } - @Override public Stream peek(Consumer action) { throw new BingoElasticException("peek() operation on this stream isn't implemented"); diff --git a/bingo/bingo-elastic/java/src/main/java/com/epam/indigo/sort/FieldComparator.java b/bingo/bingo-elastic/java/src/main/java/com/epam/indigo/sort/FieldComparator.java new file mode 100644 index 0000000000..6edc4118f8 --- /dev/null +++ b/bingo/bingo-elastic/java/src/main/java/com/epam/indigo/sort/FieldComparator.java @@ -0,0 +1,27 @@ +package com.epam.indigo.sort; + +import com.epam.indigo.model.IndigoRecord; +import org.elasticsearch.search.sort.FieldSortBuilder; +import org.elasticsearch.search.sort.SortBuilder; +import org.elasticsearch.search.sort.SortOrder; + +public class FieldComparator extends IndigoComparator { + + protected String fieldName; + + public FieldComparator(final String fieldName, final SortOrder sortOrder) { + super(sortOrder); + this.fieldName = fieldName; + } + + @Override + public int compare(final T o1, final T o2) { + // does not expect to be called + return 0; + } + + @Override + public SortBuilder toSortBuilder() { + return new FieldSortBuilder(this.fieldName).order(this.sortOrder); + } +} diff --git a/bingo/bingo-elastic/java/src/main/java/com/epam/indigo/sort/IndigoComparator.java b/bingo/bingo-elastic/java/src/main/java/com/epam/indigo/sort/IndigoComparator.java new file mode 100644 index 0000000000..c0ee0fd106 --- /dev/null +++ b/bingo/bingo-elastic/java/src/main/java/com/epam/indigo/sort/IndigoComparator.java @@ -0,0 +1,19 @@ +package com.epam.indigo.sort; + +import com.epam.indigo.model.IndigoRecord; +import org.elasticsearch.search.sort.SortBuilder; +import org.elasticsearch.search.sort.SortOrder; + +import java.util.Comparator; + + +public abstract class IndigoComparator implements Comparator { + protected SortOrder sortOrder; + + public IndigoComparator(SortOrder sortOrder) { + this.sortOrder = sortOrder; + } + + public abstract SortBuilder toSortBuilder(); + +} diff --git a/bingo/bingo-elastic/java/src/main/java/com/epam/indigo/sort/ScoreComparator.java b/bingo/bingo-elastic/java/src/main/java/com/epam/indigo/sort/ScoreComparator.java new file mode 100644 index 0000000000..aa315264b7 --- /dev/null +++ b/bingo/bingo-elastic/java/src/main/java/com/epam/indigo/sort/ScoreComparator.java @@ -0,0 +1,28 @@ +package com.epam.indigo.sort; + +import com.epam.indigo.model.IndigoRecord; +import org.elasticsearch.search.sort.ScoreSortBuilder; +import org.elasticsearch.search.sort.SortBuilder; +import org.elasticsearch.search.sort.SortOrder; + +public class ScoreComparator extends IndigoComparator { + + public ScoreComparator() { + super(SortOrder.DESC); + } + + public ScoreComparator(SortOrder sortOrder) { + super(sortOrder); + } + + @Override + public SortBuilder toSortBuilder() { + return new ScoreSortBuilder().order(sortOrder); + } + + @Override + public int compare(final T o1, final T o2) { + // does not expect to be called + return 0; + } +} diff --git a/bingo/bingo-elastic/java/src/test/java/com/epam/indigo/elastic/FullUsageMoleculeTest.java b/bingo/bingo-elastic/java/src/test/java/com/epam/indigo/elastic/FullUsageMoleculeTest.java index f7bbf6cded..72156e3f46 100644 --- a/bingo/bingo-elastic/java/src/test/java/com/epam/indigo/elastic/FullUsageMoleculeTest.java +++ b/bingo/bingo-elastic/java/src/test/java/com/epam/indigo/elastic/FullUsageMoleculeTest.java @@ -187,7 +187,7 @@ public void rangeQueryWithTanimoto() { .filter(new RangeQuery<>(fieldName, 10, 100)) .collect(Collectors.toList()); - assertEquals(Math.min(10, cnt), similarRecords.size()); + assertEquals(cnt, similarRecords.size()); } catch (Exception exception) { Assertions.fail("Exception happened during test " + exception.getMessage()); } @@ -272,11 +272,11 @@ public void reactionTanimoto() { } @Test - @DisplayName("Page size of 2000 should throw exception") + @DisplayName("Page size of Integer.MAX_VALUE should throw exception") public void pageSizeOverLimit() { assertThrows(IllegalArgumentException.class, () -> repository.stream() .filter(new KeywordQuery<>("test", "test")) - .limit(2000) + .limit((long) Integer.MAX_VALUE + 1) .collect(Collectors.toList())); } } diff --git a/bingo/bingo-elastic/java/src/test/java/com/epam/indigo/elastic/SaveMoleculeFromIndigoRecordTest.java b/bingo/bingo-elastic/java/src/test/java/com/epam/indigo/elastic/SaveMoleculeFromIndigoRecordTest.java index c338bc426a..692d464da0 100644 --- a/bingo/bingo-elastic/java/src/test/java/com/epam/indigo/elastic/SaveMoleculeFromIndigoRecordTest.java +++ b/bingo/bingo-elastic/java/src/test/java/com/epam/indigo/elastic/SaveMoleculeFromIndigoRecordTest.java @@ -77,8 +77,10 @@ public void saveFromSdfFile() { Helpers.iterateSdf("src/test/resources/rand_queries_small.sdf").forEach(indigoRecordList::add); repository.indexRecords(indigoRecordList, indigoRecordList.size()); TimeUnit.SECONDS.sleep(5); - List collect = repository.stream().collect(Collectors.toList()); - assertEquals(10, collect.size()); + List fullCollection = repository.stream().collect(Collectors.toList()); + List limitCollection = repository.stream().limit(20).collect(Collectors.toList()); + assertEquals(20, limitCollection.size()); + assertEquals(371, fullCollection.size()); } catch (Exception exception) { Assertions.fail(exception); } @@ -93,7 +95,7 @@ public void saveFromCmlFile() { repository.indexRecords(indigoRecordList, indigoRecordList.size()); TimeUnit.SECONDS.sleep(5); List collect = repository.stream().collect(Collectors.toList()); - assertEquals(10, collect.size()); + assertEquals(163, collect.size()); } catch (Exception exception) { Assertions.fail(exception); }