Skip to content

Commit

Permalink
Presearch Code Refactor (#2112)
Browse files Browse the repository at this point in the history
- Refactor the presearch code path to make it more generic and
extensible.
- Add an ExtractFields API for obtaining the set of fields applicable to
a generic query.
- Add support for a new API to set the index mapping for an alias.
  This is useful when an alias contains partitions of the same index, as
  the index mapping would be consistent across all indexes and can be
  inferred directly from the alias.
  • Loading branch information
CascadingRadium authored Dec 17, 2024
1 parent be61628 commit ff6ba91
Show file tree
Hide file tree
Showing 7 changed files with 467 additions and 68 deletions.
131 changes: 108 additions & 23 deletions index_alias_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package bleve

import (
"context"
"fmt"
"sync"
"time"

Expand All @@ -31,6 +32,10 @@ type indexAliasImpl struct {
indexes []Index
mutex sync.RWMutex
open bool
// if all the indexes in the alias have the same mapping
// then the user can set the mapping here to avoid
// checking the mapping of each index in the alias
mapping mapping.IndexMapping
}

// NewIndexAlias creates a new IndexAlias over the provided
Expand Down Expand Up @@ -168,7 +173,10 @@ func (i *indexAliasImpl) SearchInContext(ctx context.Context, req *SearchRequest
// indicates that this index alias is set as an Index
// in another alias, so we need to do a preSearch search
// and NOT a real search
return preSearchDataSearch(ctx, req, i.indexes...)
flags := &preSearchFlags{
knn: requestHasKNN(req), // set knn flag if the request has KNN
}
return preSearchDataSearch(ctx, req, flags, i.indexes...)
}

// at this point we know we are doing a real search
Expand All @@ -182,12 +190,10 @@ func (i *indexAliasImpl) SearchInContext(ctx context.Context, req *SearchRequest
// if necessary
var preSearchData map[string]map[string]interface{}
if req.PreSearchData != nil {
if requestHasKNN(req) {
var err error
preSearchData, err = redistributeKNNPreSearchData(req, i.indexes)
if err != nil {
return nil, err
}
var err error
preSearchData, err = redistributePreSearchData(req, i.indexes)
if err != nil {
return nil, err
}
}

Expand All @@ -208,9 +214,10 @@ func (i *indexAliasImpl) SearchInContext(ctx context.Context, req *SearchRequest
// - the request requires preSearch
var preSearchDuration time.Duration
var sr *SearchResult
if req.PreSearchData == nil && preSearchRequired(req) {
flags := preSearchRequired(req, i.mapping)
if req.PreSearchData == nil && flags != nil {
searchStart := time.Now()
preSearchResult, err := preSearch(ctx, req, i.indexes...)
preSearchResult, err := preSearch(ctx, req, flags, i.indexes...)
if err != nil {
return nil, err
}
Expand All @@ -221,17 +228,17 @@ func (i *indexAliasImpl) SearchInContext(ctx context.Context, req *SearchRequest
return preSearchResult, nil
}
// finalize the preSearch result now
finalizePreSearchResult(req, preSearchResult)
finalizePreSearchResult(req, flags, preSearchResult)

// if there are no errors, then merge the data in the preSearch result
// and construct the preSearchData to be used in the actual search
// if the request is satisfied by the preSearch result, then we can
// directly return the preSearch result as the final result
if requestSatisfiedByPreSearch(req) {
if requestSatisfiedByPreSearch(req, flags) {
sr = finalizeSearchResult(req, preSearchResult)
// no need to run the 2nd phase MultiSearch(..)
} else {
preSearchData, err = constructPreSearchData(req, preSearchResult, i.indexes)
preSearchData, err = constructPreSearchData(req, flags, preSearchResult, i.indexes)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -352,6 +359,20 @@ func (i *indexAliasImpl) Close() error {
return nil
}

// SetIndexMapping stores m as the mapping of the alias. It must be used
// ONLY when all the indexes in the alias have the same mapping, so that the
// mapping of each individual index does not have to be checked when
// executing a search request.
//
// The write is performed under the alias write lock. ErrorIndexClosed is
// returned, and the mapping is left untouched, if the alias is not open.
func (i *indexAliasImpl) SetIndexMapping(m mapping.IndexMapping) error {
	i.mutex.Lock()
	defer i.mutex.Unlock()

	if i.open {
		i.mapping = m
		return nil
	}
	return ErrorIndexClosed
}

func (i *indexAliasImpl) Mapping() mapping.IndexMapping {
i.mutex.RLock()
defer i.mutex.RUnlock()
Expand All @@ -360,6 +381,11 @@ func (i *indexAliasImpl) Mapping() mapping.IndexMapping {
return nil
}

// if the mapping is already set, return it
if i.mapping != nil {
return i.mapping
}

err := i.isAliasToSingleIndex()
if err != nil {
return nil
Expand Down Expand Up @@ -520,21 +546,35 @@ type asyncSearchResult struct {
Err error
}

func preSearchRequired(req *SearchRequest) bool {
return requestHasKNN(req)
// preSearchFlags records which kinds of preSearch a request needs, i.e. why
// a preSearch is required. A nil *preSearchFlags is used by the helpers in
// this file to mean that no preSearch is required.
type preSearchFlags struct {
// knn is set when the request contains a KNN query (see requestHasKNN).
knn bool
}

// preSearchRequired reports whether a preSearch phase is needed for req.
// It returns nil when no preSearch is required; otherwise it returns a
// preSearchFlags value whose fields indicate which preSearch is required.
//
// Currently the only trigger is a KNN query in the request. The index
// mapping m is accepted but not consulted by this implementation.
func preSearchRequired(req *SearchRequest, m mapping.IndexMapping) *preSearchFlags {
	if !requestHasKNN(req) {
		// nothing in the request needs a preSearch
		return nil
	}
	return &preSearchFlags{knn: true}
}

func preSearch(ctx context.Context, req *SearchRequest, indexes ...Index) (*SearchResult, error) {
func preSearch(ctx context.Context, req *SearchRequest, flags *preSearchFlags, indexes ...Index) (*SearchResult, error) {
// create a dummy request with a match none query
// since we only care about the preSearchData in PreSearch
dummyRequest := &SearchRequest{
Query: query.NewMatchNoneQuery(),
}
newCtx := context.WithValue(ctx, search.PreSearchKey, true)
if requestHasKNN(req) {
if flags.knn {
addKnnToDummyRequest(dummyRequest, req)
}
return preSearchDataSearch(newCtx, dummyRequest, indexes...)
return preSearchDataSearch(newCtx, dummyRequest, flags, indexes...)
}

// if the request is satisfied by just the preSearch result,
Expand Down Expand Up @@ -585,20 +625,26 @@ func finalizeSearchResult(req *SearchRequest, preSearchResult *SearchResult) *Se
return preSearchResult
}

func requestSatisfiedByPreSearch(req *SearchRequest) bool {
if requestHasKNN(req) && isKNNrequestSatisfiedByPreSearch(req) {
// requestSatisfiedByPreSearch reports whether the preSearch result alone
// answers req, so that the second search phase can be skipped.
// It is false when flags is nil; otherwise it is true only when the KNN flag
// is set and isKNNrequestSatisfiedByPreSearch agrees for this request.
func requestSatisfiedByPreSearch(req *SearchRequest, flags *preSearchFlags) bool {
	return flags != nil && flags.knn && isKNNrequestSatisfiedByPreSearch(req)
}

func constructPreSearchData(req *SearchRequest, preSearchResult *SearchResult, indexes []Index) (map[string]map[string]interface{}, error) {
func constructPreSearchData(req *SearchRequest, flags *preSearchFlags, preSearchResult *SearchResult, indexes []Index) (map[string]map[string]interface{}, error) {
if flags == nil || preSearchResult == nil {
return nil, fmt.Errorf("invalid input, flags: %v, preSearchResult: %v", flags, preSearchResult)
}
mergedOut := make(map[string]map[string]interface{}, len(indexes))
for _, index := range indexes {
mergedOut[index.Name()] = make(map[string]interface{})
}
var err error
if requestHasKNN(req) {
if flags.knn {
mergedOut, err = constructKnnPreSearchData(mergedOut, preSearchResult, indexes)
if err != nil {
return nil, err
Expand All @@ -607,7 +653,7 @@ func constructPreSearchData(req *SearchRequest, preSearchResult *SearchResult, i
return mergedOut, nil
}

func preSearchDataSearch(ctx context.Context, req *SearchRequest, indexes ...Index) (*SearchResult, error) {
func preSearchDataSearch(ctx context.Context, req *SearchRequest, flags *preSearchFlags, indexes ...Index) (*SearchResult, error) {
asyncResults := make(chan *asyncSearchResult, len(indexes))
// run search on each index in separate go routine
var waitGroup sync.WaitGroup
Expand Down Expand Up @@ -638,7 +684,7 @@ func preSearchDataSearch(ctx context.Context, req *SearchRequest, indexes ...Ind
if prp == nil {
// first valid preSearch result
// create a new preSearch result processor
prp = createPreSearchResultProcessor(req)
prp = createPreSearchResultProcessor(req, flags)
}
prp.add(asr.Result, asr.Name)
if sr == nil {
Expand Down Expand Up @@ -684,6 +730,45 @@ func preSearchDataSearch(ctx context.Context, req *SearchRequest, indexes ...Ind
return sr, nil
}

// redistributePreSearchData splits the preSearchData carried by a search
// request sent to an index alias into one entry per index in indexes.
// This happens in an alias tree: the preSearchData is specific to each final
// index at the leaves, so at every level of the tree it has to be
// redistributed to the indexes/aliases at that level.
//
// The returned map always contains an (initially empty) entry for every index
// name. If the request carries KNN preSearchData as a list of DocumentMatch
// objects, those hits are validated and segregated per index and stored under
// search.KnnPreSearchDataKey. An error from that validation is returned as is,
// with a nil map.
func redistributePreSearchData(req *SearchRequest, indexes []Index) (map[string]map[string]interface{}, error) {
	redistributed := make(map[string]map[string]interface{})
	for _, idx := range indexes {
		redistributed[idx.Name()] = make(map[string]interface{})
	}

	knnHits, hasKNNHits := req.PreSearchData[search.KnnPreSearchDataKey].([]*search.DocumentMatch)
	if !hasKNNHits {
		// no KNN hits to hand out; every index keeps its empty entry
		return redistributed, nil
	}

	perIndexHits, err := validateAndDistributeKNNHits(knnHits, indexes)
	if err != nil {
		return nil, err
	}
	for _, idx := range indexes {
		name := idx.Name()
		redistributed[name][search.KnnPreSearchDataKey] = perIndexHits[name]
	}
	return redistributed, nil
}

// finalizePreSearchResult applies, in place, the finalization steps selected
// by the preSearch flags to preSearchResult.
//
// It is a no-op when flags is nil or when preSearchResult is nil. When
// flags.knn is set, the hits of the result are replaced with the output of
// finalizeKNNResults for this request.
func finalizePreSearchResult(req *SearchRequest, flags *preSearchFlags, preSearchResult *SearchResult) {
	// Guard both pointers: a nil preSearchResult would panic on the field
	// access below, and constructPreSearchData rejects the same nil inputs.
	if flags == nil || preSearchResult == nil {
		return
	}
	if flags.knn {
		preSearchResult.Hits = finalizeKNNResults(req, preSearchResult.Hits)
	}
}

// hitsInCurrentPage returns the hits in the current page
// using the From and Size parameters in the request
func hitsInCurrentPage(req *SearchRequest, hits []*search.DocumentMatch) []*search.DocumentMatch {
Expand Down
49 changes: 42 additions & 7 deletions pre_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type preSearchResultProcessor interface {
finalize(*SearchResult)
}

// -----------------------------------------------------------------------------
// KNN preSearchResultProcessor for handling KNN presearch results
type knnPreSearchResultProcessor struct {
addFn func(sr *SearchResult, indexName string)
finalizeFn func(sr *SearchResult)
Expand All @@ -44,16 +46,49 @@ func (k *knnPreSearchResultProcessor) finalize(sr *SearchResult) {
}

// -----------------------------------------------------------------------------
// compositePreSearchResultProcessor is a master struct that can hold any
// number of presearch result processors. Its add and finalize methods forward
// each call to every held processor, in slice order.
type compositePreSearchResultProcessor struct {
// presearchResultProcessors are the wrapped processors that receive each call.
presearchResultProcessors []preSearchResultProcessor
}

// add forwards the per-index search result sr, together with the name of the
// index it came from, to every internal processor in order.
func (m *compositePreSearchResultProcessor) add(sr *SearchResult, indexName string) {
	processors := m.presearchResultProcessors
	for idx := 0; idx < len(processors); idx++ {
		processors[idx].add(sr, indexName)
	}
}

func finalizePreSearchResult(req *SearchRequest, preSearchResult *SearchResult) {
if requestHasKNN(req) {
preSearchResult.Hits = finalizeKNNResults(req, preSearchResult.Hits)
// finalize forwards the search result sr to the finalize method of every
// internal processor in order.
func (m *compositePreSearchResultProcessor) finalize(sr *SearchResult) {
	processors := m.presearchResultProcessors
	for idx := 0; idx < len(processors); idx++ {
		processors[idx].finalize(sr)
	}
}

func createPreSearchResultProcessor(req *SearchRequest) preSearchResultProcessor {
if requestHasKNN(req) {
return newKnnPreSearchResultProcessor(req)
// -----------------------------------------------------------------------------
// createPreSearchResultProcessor creates the preSearchResultProcessor(s)
// appropriate for the given request and preSearch flags.
//
// It returns nil for invalid input (nil req or nil flags) and when no
// processor applies. When exactly one processor applies it is returned
// directly, optimizing for the common case; when several apply they are
// wrapped in a compositePreSearchResultProcessor.
func createPreSearchResultProcessor(req *SearchRequest, flags *preSearchFlags) preSearchResultProcessor {
	// return nil for invalid input
	if flags == nil || req == nil {
		return nil
	}
	var processors []preSearchResultProcessor
	// Add KNN processor if the request has KNN
	if flags.knn {
		if knnProcessor := newKnnPreSearchResultProcessor(req); knnProcessor != nil {
			processors = append(processors, knnProcessor)
		}
	}
	// Every case returns, so no statement may follow this switch: the
	// previous trailing fallback return was unreachable and has been removed.
	switch len(processors) {
	case 0:
		return nil
	case 1:
		return processors[0]
	default:
		return &compositePreSearchResultProcessor{
			presearchResultProcessors: processors,
		}
	}
}
5 changes: 5 additions & 0 deletions search.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,3 +589,8 @@ func (r *SearchRequest) SortFunc() func(data sort.Interface) {

return sort.Sort
}

// isMatchNoneQuery reports whether q has the dynamic type *query.MatchNoneQuery.
func isMatchNoneQuery(q query.Query) bool {
	switch q.(type) {
	case *query.MatchNoneQuery:
		return true
	default:
		return false
	}
}
54 changes: 54 additions & 0 deletions search/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,3 +423,57 @@ func DumpQuery(m mapping.IndexMapping, query Query) (string, error) {
data, err := json.MarshalIndent(q, "", " ")
return string(data), err
}

// FieldSet represents a set of queried fields. It is a map keyed by field
// name with empty-struct values, so membership is expressed by key presence.
type FieldSet map[string]struct{}

// ExtractFields returns a set of fields referenced by the query.
//
// Fields are accumulated into fs, which may be nil; a set is allocated only
// once a first field is found. The returned set may therefore be nil if the
// query does not explicitly reference any field and the DefaultSearchField is
// unset in the index mapping. If q or m is nil, fs is returned unchanged.
//
// Fieldable queries contribute their own field, or the mapping's default
// search field when theirs is empty. Query string queries are expanded first,
// and boolean, conjunction and disjunction queries are walked recursively.
// Any other query type contributes nothing. On error, the set accumulated so
// far is returned together with the error.
func ExtractFields(q Query, m mapping.IndexMapping, fs FieldSet) (FieldSet, error) {
	if q == nil || m == nil {
		return fs, nil
	}

	// collect folds the fields of each sub-query into fs, stopping at the
	// first error.
	collect := func(subQueries []Query) error {
		for _, sub := range subQueries {
			var err error
			if fs, err = ExtractFields(sub, m, fs); err != nil {
				return err
			}
		}
		return nil
	}

	switch typed := q.(type) {
	case FieldableQuery:
		name := typed.Field()
		if name == "" {
			name = m.DefaultSearchField()
		}
		if name == "" {
			return fs, nil
		}
		if fs == nil {
			fs = make(FieldSet)
		}
		fs[name] = struct{}{}
		return fs, nil
	case *QueryStringQuery:
		expanded, err := expandQuery(m, typed)
		if err != nil {
			return fs, err
		}
		return ExtractFields(expanded, m, fs)
	case *BooleanQuery:
		// run collect before reading fs, since collect updates it
		err := collect([]Query{typed.Must, typed.Should, typed.MustNot})
		return fs, err
	case *ConjunctionQuery:
		err := collect(typed.Conjuncts)
		return fs, err
	case *DisjunctionQuery:
		err := collect(typed.Disjuncts)
		return fs, err
	default:
		return fs, nil
	}
}
Loading

0 comments on commit ff6ba91

Please sign in to comment.