Skip to content

Commit

Permalink
add fallback to filter Exp for incorrect secondary index
Browse files Browse the repository at this point in the history
  • Loading branch information
agrgr committed Jan 5, 2025
1 parent 992c194 commit b57a3cb
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1547,10 +1547,12 @@ public Filter sIndexFilter(Map<QualifierKey, Object> qualifierMap) {
/**
* FilterOperations that require both sIndexFilter and FilterExpression
*/
protected static final List<FilterOperation> dualFilterOperations = Arrays.asList(
MAP_VAL_EQ_BY_KEY, MAP_VAL_GT_BY_KEY, MAP_VAL_GTEQ_BY_KEY, MAP_VAL_LT_BY_KEY, MAP_VAL_LTEQ_BY_KEY,
MAP_VAL_BETWEEN_BY_KEY, MAP_KEYS_BETWEEN, MAP_VAL_BETWEEN
);
protected static final List<FilterOperation> dualFilterOperations =
// Arrays.asList(
// MAP_VAL_EQ_BY_KEY, MAP_VAL_GT_BY_KEY, MAP_VAL_GTEQ_BY_KEY, MAP_VAL_LT_BY_KEY, MAP_VAL_LTEQ_BY_KEY,
// MAP_VAL_BETWEEN_BY_KEY, MAP_KEYS_BETWEEN, MAP_VAL_BETWEEN
// );
Arrays.stream(FilterOperation.values()).toList();

@SuppressWarnings("unchecked")
private static Exp processMetadataFieldInOrNot(Map<QualifierKey, Object> qualifierMap, boolean notIn) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.springframework.data.aerospike.query;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
Expand All @@ -32,6 +33,14 @@
import org.springframework.data.aerospike.repository.query.Query;
import org.springframework.lang.Nullable;

import java.util.List;

import static com.aerospike.client.ResultCode.INDEX_GENERIC;
import static com.aerospike.client.ResultCode.INDEX_MAXCOUNT;
import static com.aerospike.client.ResultCode.INDEX_NAME_MAXLEN;
import static com.aerospike.client.ResultCode.INDEX_NOTFOUND;
import static com.aerospike.client.ResultCode.INDEX_NOTREADABLE;
import static com.aerospike.client.ResultCode.INDEX_OOM;
import static org.springframework.data.aerospike.query.QualifierUtils.queryCriteriaIsNotNull;

/**
Expand All @@ -47,6 +56,8 @@ public class QueryEngine {
"Query without a filter will initiate a scan. Since scans are potentially dangerous operations, they are " +
"disabled by default in spring-data-aerospike. " +
"If you still need to use them, enable them via `scans-enabled` property.";
public static final List<Integer> SEC_INDEX_ERROR_RESULT_CODES = List.of(
INDEX_NOTFOUND, INDEX_OOM, INDEX_NOTREADABLE, INDEX_GENERIC, INDEX_NAME_MAXLEN, INDEX_MAXCOUNT);
private final IAerospikeClient client;
@Getter
private final StatementBuilder statementBuilder;
Expand Down Expand Up @@ -110,8 +121,18 @@ public KeyRecordIterator select(String namespace, String set, String[] binNames,
throw new IllegalStateException(SCANS_DISABLED_MESSAGE);
}

RecordSet rs = client.query(localQueryPolicy, statement);
return new KeyRecordIterator(namespace, rs);
try {
RecordSet rs = client.query(localQueryPolicy, statement);
return new KeyRecordIterator(namespace, rs);
} catch (AerospikeException e) {
if (statement.getFilter() != null && SEC_INDEX_ERROR_RESULT_CODES.contains(e.getResultCode())) {
// retry without sIndex filter
statement.setFilter(null);
RecordSet rs = client.query(localQueryPolicy, statement);
return new KeyRecordIterator(namespace, rs);
}
throw e;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.springframework.data.aerospike.query;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.Key;
import com.aerospike.client.policy.Policy;
import com.aerospike.client.policy.QueryPolicy;
Expand All @@ -32,6 +33,7 @@
import reactor.core.publisher.Mono;

import static org.springframework.data.aerospike.query.QualifierUtils.queryCriteriaIsNotNull;
import static org.springframework.data.aerospike.query.QueryEngine.SEC_INDEX_ERROR_RESULT_CODES;

/**
* This class provides a multi-filter reactive query engine that augments the query capability in Aerospike.
Expand Down Expand Up @@ -103,7 +105,19 @@ public Flux<KeyRecord> select(String namespace, String set, String[] binNames, @
return Flux.error(new IllegalStateException(QueryEngine.SCANS_DISABLED_MESSAGE));
}

return client.query(localQueryPolicy, statement);
return client.query(localQueryPolicy, statement)
.onErrorResume(throwable -> {
if (throwable instanceof AerospikeException ae
&& statement.getFilter() != null
&& SEC_INDEX_ERROR_RESULT_CODES.contains(ae.getResultCode()))
{
// retry without sIndex filter
statement.setFilter(null);
return client.query(localQueryPolicy, statement);
}
// for other exceptions
return Mono.error(throwable);
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.springframework.lang.Nullable;
import org.springframework.util.StringUtils;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -131,19 +132,20 @@ private void setFilterFromSingleQualifier(Statement stmt, Qualifier qualifier) {
}

private boolean isIndexedBin(Statement stmt, Qualifier qualifier) {
boolean hasIndex = false, hasField = false;
List<Index> indexesForField = new ArrayList<>();
boolean hasField = false;
if (StringUtils.hasLength(qualifier.getBinName())) {
hasField = true;
hasIndex = indexesCache.hasIndexFor(
indexesForField = indexesCache.getAllIndexesForField(
new IndexedField(stmt.getNamespace(), stmt.getSetName(), qualifier.getBinName())
);
}

if (log.isDebugEnabled() && hasField) {
log.debug("Qualifier #{}, bin {}.{}.{} has secondary index: {}",
qualifier.hashCode(), stmt.getNamespace(), stmt.getSetName(), qualifier.getBinName(), hasIndex);
log.debug("Qualifier #{}, bin {}.{}.{} has {} secondary index(es)",
qualifier.hashCode(), stmt.getNamespace(), stmt.getSetName(), qualifier.getBinName(), indexesForField.size());
}
return hasIndex;
return !indexesForField.isEmpty();
}

private int getMinBinValuesRatioForQualifier(Statement stmt, Qualifier qualifier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ protected Qualifier(IQualifierBuilder builder) {
}
}

protected Qualifier(Qualifier qualifier) {
public Qualifier(Qualifier qualifier) {
if (!qualifier.getImmutableMap().isEmpty()) {
internalMap.putAll(qualifier.getImmutableMap());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.springframework.data.aerospike.core.reactive;

import org.junit.jupiter.api.BeforeAll;
import com.aerospike.client.query.IndexType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
Expand Down Expand Up @@ -31,12 +31,6 @@
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class ReactiveAerospikeTemplateFindByQueryTests extends BaseReactiveIntegrationTests {

@BeforeAll
public void beforeAllSetUp() {
additionalAerospikeTestOperations.deleteAllAndVerify(Person.class);
additionalAerospikeTestOperations.deleteAllAndVerify(Person.class, OVERRIDE_SET_NAME);
}

@Override
@BeforeEach
public void setUp() {
Expand All @@ -45,6 +39,19 @@ public void setUp() {
super.setUp();
}

@Test
public void findWithFilterEqual_String_fallbackToFilterExp() {
reactiveTemplate.createIndex(Person.class, "person_first_name_index_numeric", "firstName",
IndexType.NUMERIC).block(); // incompatible secondary index (should be STRING) causes "index not found" exception
Query query = QueryUtils.createQueryForMethodWithArgs(serverVersionSupport, "findByFirstName", "Dave");
reactiveTemplate.insert(new Person(nextId(), "Dave", "Matthews")).block();
// after getting index exception there is a fallback to filter exp only
List<Person> result = reactiveTemplate.find(query, Person.class).collectList().block();
assertThat(Objects.requireNonNull(result).stream().map(Person::getFirstName).collect(Collectors.toList()))
.containsExactly("Dave");
reactiveTemplate.deleteIndex(Person.class, "person_first_name_index_numeric").block(); // incompatible secondary index (should be STRING) causes "index not found" exception
}

@Test
public void findAll_OrderByFirstName() {
List<Person> persons = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public void afterAll() {
template.deleteAll(allPersons);
template.deleteAll(allPersons, OVERRIDE_SET_NAME);
additionalAerospikeTestOperations.dropIndex(Person.class, "person_first_name_index");
additionalAerospikeTestOperations.dropIndex(Person.class, "person_byte_array_index");
}

@Test
Expand All @@ -112,6 +113,17 @@ public void findWithFilterEqual_String() {
assertThat(result).containsOnly(dave);
}

@Test
public void findWithFilterEqual_String_fallbackToFilterExp() {
additionalAerospikeTestOperations.createIndex(Person.class, "person_first_name_index_numeric", "firstName",
IndexType.NUMERIC); // incompatible secondary index (should be STRING) causes "index not found" exception
Query query = QueryUtils.createQueryForMethodWithArgs(serverVersionSupport, "findByFirstName", "Dave");
// after getting index exception there is a fallback to filter exp only
Stream<Person> result = template.find(query, Person.class);
assertThat(result).containsOnly(dave);
additionalAerospikeTestOperations.dropIndex(Person.class, "person_first_name_index_numeric");
}

@Test
public void findWithFilterEqual_ByteArray() {
if (serverVersionSupport.isServerVersionGtOrEq7()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ void binIsIndexed() {

// 3 events: Created query, Bin has secondary index, Secondary index filter is not set
assertThat(memoryAppender.countEventsForLogger(LOGGER_NAME)).isEqualTo(3);
String msg = "bin TEST.testSet.testField has secondary index: false";
String msg = "bin TEST.testSet.testField has 0 secondary index(es)";
assertThat(memoryAppender.search(msg, Level.DEBUG).size()).isEqualTo(1);
assertThat(memoryAppender.contains(msg, Level.INFO)).isFalse();
}
Expand Down

0 comments on commit b57a3cb

Please sign in to comment.