Skip to content

Commit

Permalink
MB-59102: Merging kNN hits from a distributed index with exact search…
Browse files Browse the repository at this point in the history
… hits (#1936)

- Reverts commit #1910, which
was an earlier attempt to address this issue.
- Implements the PreSearch Construct in Bleve alias search, enabling a
preliminary query to collect metadata from all alias indexes before
executing the main search query in MultiSearch. PreSearch gathers KNN
results from all alias indexes, selecting the top K results. This
allows the main Bleve query to operate within the context of
documents that matched the KNN query, ensuring seamless functionality of
existing Bleve constructs such as Faceting, Sorting, Pagination,
SearchAfter, and SearchBefore.
- Introduces the KNN Collector construct to merge and obtain accurate
Top K results from multiple Zap Segments' KNN results.
- Enhances KNN Unit Tests for greater generality.
- Addresses an issue where errors generated within the Top N Document
handler were being discarded.
- Resolves an issue where document matches failing to meet the
SearchAfter clause weren't being returned to the pool.
  • Loading branch information
CascadingRadium authored Dec 15, 2023
1 parent 2962697 commit 138fec5
Show file tree
Hide file tree
Showing 19 changed files with 1,254 additions and 1,130 deletions.
2 changes: 2 additions & 0 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const (
ErrorUnknownIndexType
ErrorEmptyID
ErrorIndexReadInconsistency
ErrorPreSearchFailed
)

// Error represents a more strongly typed bleve error for detecting
Expand All @@ -47,4 +48,5 @@ var errorMessages = map[Error]string{
ErrorUnknownIndexType: "unknown index type",
ErrorEmptyID: "document ID cannot be empty",
ErrorIndexReadInconsistency: "index read inconsistency detected",
ErrorPreSearchFailed: "index pre-search failed",
}
195 changes: 177 additions & 18 deletions index_alias_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/blevesearch/bleve/v2/mapping"
"github.com/blevesearch/bleve/v2/search"
"github.com/blevesearch/bleve/v2/search/query"
index "github.com/blevesearch/bleve_index_api"
)

Expand Down Expand Up @@ -160,12 +161,58 @@ func (i *indexAliasImpl) SearchInContext(ctx context.Context, req *SearchRequest
if len(i.indexes) < 1 {
return nil, ErrorAliasEmpty
}
if _, ok := ctx.Value(search.PreSearchKey).(bool); ok {
// since presearchKey is set, it means that the request
// is being executed as part of a presearch, which
// indicates that this index alias is set as an Index
// in another alias, so we need to do a presearch search
// and NOT a real search
return PreSearchDataSearch(ctx, req, i.indexes...)
}

// at this point we know we are doing a real search
// either after a presearch is done, or directly
// on the alias

// check if request has preSearchData which would indicate that the
// request has already been preSearched and we can skip the
// preSearch step now, we call an optional function to
// redistribute the preSearchData to the individual indexes
// if necessary
var preSearchData []map[string]interface{}
if req.PreSearchData != nil {
if requestHasKNN(req) {
preSearchData = make([]map[string]interface{}, len(i.indexes))
for i := 0; i < len(preSearchData); i++ {
preSearchData[i] = make(map[string]interface{})
}
redistributeKNNPreSearchData(req, preSearchData)
}
}

// short circuit the simple case
if len(i.indexes) == 1 {
if preSearchData != nil {
req.PreSearchData = preSearchData[0]
}
return i.indexes[0].SearchInContext(ctx, req)
}
return MultiSearch(ctx, req, i.indexes...)

// at this stage we know we have multiple indexes
// check if preSearchData needs to be gathered from all indexes
// before executing the query
var err error
// only perform presearch if
// - the request does not already have preSearchData
// - the request requires presearch
if req.PreSearchData == nil && PreSearchRequired(ctx, req) {
preSearchData, err = PreSearch(ctx, req, i.indexes...)
if err != nil {
return nil, err
}
}

return MultiSearch(ctx, req, preSearchData, i.indexes...)
}

func (i *indexAliasImpl) Fields() ([]string, error) {
Expand Down Expand Up @@ -428,19 +475,136 @@ func (i *indexAliasImpl) Swap(in, out []Index) {
// the actual final results.
// Perhaps that part needs to be optional,
// could be slower in remote usages.
func createChildSearchRequest(req *SearchRequest) *SearchRequest {
return copySearchRequest(req)
func createChildSearchRequest(req *SearchRequest, preSearchData map[string]interface{}) *SearchRequest {
return copySearchRequest(req, preSearchData)
}

type asyncSearchResult struct {
Name string
Result *SearchResult
Err error
Name string
IndexNum int
Result *SearchResult
Err error
}

// PreSearchRequired reports whether req needs a presearch phase before the
// main search is executed. At present that is exactly the case when
// requestHasKNN(req) is true; ctx is accepted but not consulted.
func PreSearchRequired(ctx context.Context, req *SearchRequest) bool {
	needsPreSearch := requestHasKNN(req)
	return needsPreSearch
}

// PreSearch runs the presearch phase for req against indexes and returns one
// preSearchData map per index (in the same order as indexes).
//
// It issues a placeholder match-none request, carrying over the KNN portion
// of req when there is one, under a context tagged with search.PreSearchKey,
// and then converts the merged result into per-index data with
// mergePreSearchData. Any error from PreSearchDataSearch or
// mergePreSearchData is returned with a nil slice.
func PreSearch(ctx context.Context, req *SearchRequest, indexes ...Index) ([]map[string]interface{}, error) {
	// tag the context so that an index alias receiving this request can tell
	// it is part of a presearch rather than a real search
	preSearchCtx := context.WithValue(ctx, search.PreSearchKey, true)

	// only the preSearchData matters here, so the query itself matches nothing
	placeholder := &SearchRequest{
		Query: query.NewMatchNoneQuery(),
	}
	if requestHasKNN(req) {
		addKnnToDummyRequest(placeholder, req)
	}

	result, err := PreSearchDataSearch(preSearchCtx, placeholder, indexes...)
	if err != nil {
		return nil, err
	}

	perIndexData, mergeErr := mergePreSearchData(req, result, len(indexes))
	if mergeErr != nil {
		return nil, mergeErr
	}
	return perIndexData, nil
}

// tagHitsWithIndexNum appends indexNum to the IndexId of every hit in sr,
// recording which index (by position) the hit came from.
func tagHitsWithIndexNum(sr *SearchResult, indexNum int) {
	for pos := range sr.Hits {
		match := sr.Hits[pos]
		match.IndexId = append(match.IndexId, indexNum)
	}
}

// mergePreSearchData builds the per-index preSearchData from a merged
// presearch result. It returns a slice of numIndexes non-nil maps; when req
// has KNN, mergeKNNDocumentMatches is given res.Hits and the slice to work
// with. The returned error is always nil.
func mergePreSearchData(req *SearchRequest, res *SearchResult, numIndexes int) ([]map[string]interface{}, error) {
	perIndex := make([]map[string]interface{}, numIndexes)
	for slot := range perIndex {
		perIndex[slot] = map[string]interface{}{}
	}
	if !requestHasKNN(req) {
		return perIndex, nil
	}
	mergeKNNDocumentMatches(req, res.Hits, perIndex)
	return perIndex, nil
}

// PreSearchDataSearch sends a copy of req to every index in indexes
// concurrently and merges the successful results into one SearchResult.
//
// Each hit is tagged with the position of the index it came from before
// merging. Per-index failures are not returned as an error: they are recorded
// in the result's Status.Errors (keyed by index name) and counted in
// Status.Total and Status.Failed. The returned error is always nil.
func PreSearchDataSearch(ctx context.Context, req *SearchRequest, indexes ...Index) (*SearchResult, error) {
	startedAt := time.Now()

	// buffered to len(indexes) so no worker ever blocks on send
	resultsCh := make(chan *asyncSearchResult, len(indexes))

	// one goroutine per index; the child request is built on the calling
	// goroutine before the worker starts
	var workers sync.WaitGroup
	workers.Add(len(indexes))
	for pos, child := range indexes {
		go func(in Index, childReq *SearchRequest, idx int) {
			out := asyncSearchResult{Name: in.Name(), IndexNum: idx}
			out.Result, out.Err = in.SearchInContext(ctx, childReq)
			resultsCh <- &out
			workers.Done()
		}(child, createChildSearchRequest(req, nil), pos)
	}

	// close the channel once every worker has reported, ending the loop below
	go func() {
		workers.Wait()
		close(resultsCh)
	}()

	var merged *SearchResult
	failures := make(map[string]error)

	for partial := range resultsCh {
		if partial.Err != nil {
			failures[partial.Name] = partial.Err
			continue
		}
		tagHitsWithIndexNum(partial.Result, partial.IndexNum)
		if merged == nil {
			// first successful result becomes the accumulator
			merged = partial.Result
			continue
		}
		merged.Merge(partial.Result)
	}

	// no index succeeded: start from an empty result so errors can be attached
	if merged == nil {
		merged = &SearchResult{
			Status: &SearchStatus{
				Errors: make(map[string]error),
			},
		}
	}

	merged.Took = time.Since(startedAt)

	// record each failed index in the status
	if len(failures) > 0 {
		if merged.Status.Errors == nil {
			merged.Status.Errors = make(map[string]error)
		}
		for name, failure := range failures {
			merged.Status.Errors[name] = failure
			merged.Status.Total++
			merged.Status.Failed++
		}
	}

	return merged, nil
}

// MultiSearch executes a SearchRequest across multiple Index objects,
// then merges the results. The indexes must honor any ctx deadline.
func MultiSearch(ctx context.Context, req *SearchRequest, indexes ...Index) (*SearchResult, error) {
func MultiSearch(ctx context.Context, req *SearchRequest, preSearchData []map[string]interface{}, indexes ...Index) (*SearchResult, error) {

searchStart := time.Now()
asyncResults := make(chan *asyncSearchResult, len(indexes))
Expand All @@ -452,10 +616,6 @@ func MultiSearch(ctx context.Context, req *SearchRequest, indexes ...Index) (*Se
req.SearchAfter = req.SearchBefore
req.SearchBefore = nil
}
originalSize := req.Size
if len(indexes) > 1 {
req.Size = adjustRequestSizeForKNN(req, len(indexes))
}

// run search on each index in separate go routine
var waitGroup sync.WaitGroup
Expand All @@ -468,8 +628,12 @@ func MultiSearch(ctx context.Context, req *SearchRequest, indexes ...Index) (*Se
}

waitGroup.Add(len(indexes))
for _, in := range indexes {
go searchChildIndex(in, createChildSearchRequest(req))
for idx, in := range indexes {
var md map[string]interface{}
if preSearchData != nil {
md = preSearchData[idx]
}
go searchChildIndex(in, createChildSearchRequest(req, md))
}

// on another go routine, close after finished
Expand All @@ -494,7 +658,6 @@ func MultiSearch(ctx context.Context, req *SearchRequest, indexes ...Index) (*Se
indexErrors[asr.Name] = asr.Err
}
}
req.Size = originalSize

// merge just concatenated all the hits
// now lets clean it up
Expand All @@ -508,10 +671,6 @@ func MultiSearch(ctx context.Context, req *SearchRequest, indexes ...Index) (*Se
}
}

if len(indexes) > 1 {
mergeKNNResults(req, sr)
}

sortFunc := req.SortFunc()
// sort all hits with the requested order
if len(req.Sort) > 0 {
Expand Down
18 changes: 9 additions & 9 deletions index_alias_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ func TestMultiSearchNoError(t *testing.T) {
MaxScore: 2.0,
}

results, err := MultiSearch(context.Background(), sr, ei1, ei2)
results, err := MultiSearch(context.Background(), sr, nil, ei1, ei2)
if err != nil {
t.Error(err)
}
Expand Down Expand Up @@ -625,7 +625,7 @@ func TestMultiSearchSomeError(t *testing.T) {
}}
ei2 := &stubIndex{name: "ei2", err: fmt.Errorf("deliberate error")}
sr := NewSearchRequest(NewTermQuery("test"))
res, err := MultiSearch(context.Background(), sr, ei1, ei2)
res, err := MultiSearch(context.Background(), sr, nil, ei1, ei2)
if err != nil {
t.Errorf("expected no error, got %v", err)
}
Expand All @@ -652,7 +652,7 @@ func TestMultiSearchAllError(t *testing.T) {
ei1 := &stubIndex{name: "ei1", err: fmt.Errorf("deliberate error")}
ei2 := &stubIndex{name: "ei2", err: fmt.Errorf("deliberate error")}
sr := NewSearchRequest(NewTermQuery("test"))
res, err := MultiSearch(context.Background(), sr, ei1, ei2)
res, err := MultiSearch(context.Background(), sr, nil, ei1, ei2)
if err != nil {
t.Errorf("expected no error, got %v", err)
}
Expand Down Expand Up @@ -708,7 +708,7 @@ func TestMultiSearchSecondPage(t *testing.T) {
checkRequest: checkRequest,
}
sr := NewSearchRequestOptions(NewTermQuery("test"), 10, 10, false)
_, err := MultiSearch(context.Background(), sr, ei1, ei2)
_, err := MultiSearch(context.Background(), sr, nil, ei1, ei2)
if err != nil {
t.Errorf("unexpected error %v", err)
}
Expand Down Expand Up @@ -786,7 +786,7 @@ func TestMultiSearchTimeout(t *testing.T) {
defer cancel()
query := NewTermQuery("test")
sr := NewSearchRequest(query)
res, err := MultiSearch(ctx, sr, ei1, ei2)
res, err := MultiSearch(ctx, sr, nil, ei1, ei2)
if err != nil {
t.Errorf("expected no error, got %v", err)
}
Expand All @@ -806,7 +806,7 @@ func TestMultiSearchTimeout(t *testing.T) {
// now run a search again with an absurdly low timeout (should timeout)
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Microsecond)
defer cancel()
res, err = MultiSearch(ctx, sr, ei1, ei2)
res, err = MultiSearch(ctx, sr, nil, ei1, ei2)
if err != nil {
t.Errorf("expected no error, got %v", err)
}
Expand All @@ -833,7 +833,7 @@ func TestMultiSearchTimeout(t *testing.T) {
// now run a search again with a normal timeout, but cancel it first
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
cancel()
res, err = MultiSearch(ctx, sr, ei1, ei2)
res, err = MultiSearch(ctx, sr, nil, ei1, ei2)
if err != nil {
t.Errorf("expected no error, got %v", err)
}
Expand Down Expand Up @@ -968,7 +968,7 @@ func TestMultiSearchTimeoutPartial(t *testing.T) {
MaxScore: 2.0,
}

res, err := MultiSearch(ctx, sr, ei1, ei2, ei3)
res, err := MultiSearch(ctx, sr, nil, ei1, ei2, ei3)
if err != nil {
t.Fatalf("expected no err, got %v", err)
}
Expand Down Expand Up @@ -1222,7 +1222,7 @@ func TestMultiSearchCustomSort(t *testing.T) {
MaxScore: 3.0,
}

results, err := MultiSearch(context.Background(), sr, ei1, ei2)
results, err := MultiSearch(context.Background(), sr, nil, ei1, ei2)
if err != nil {
t.Error(err)
}
Expand Down
Loading

0 comments on commit 138fec5

Please sign in to comment.