Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Presearch Code Refactor #2112

Merged
merged 6 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 94 additions & 18 deletions index_alias_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ type indexAliasImpl struct {
indexes []Index
mutex sync.RWMutex
open bool
// if all the indexes in the alias have the same mapping
// then the user can set the mapping here to avoid
// checking the mapping of each index in the alias
mapping mapping.IndexMapping
}

// NewIndexAlias creates a new IndexAlias over the provided
Expand Down Expand Up @@ -168,7 +172,10 @@ func (i *indexAliasImpl) SearchInContext(ctx context.Context, req *SearchRequest
// indicates that this index alias is set as an Index
// in another alias, so we need to do a preSearch search
// and NOT a real search
return preSearchDataSearch(ctx, req, i.indexes...)
flags := &preSearchFlags{
knn: requestHasKNN(req), // set knn flag if the request has KNN
}
return preSearchDataSearch(ctx, req, flags, i.indexes...)
}

// at this point we know we are doing a real search
Expand All @@ -184,7 +191,7 @@ func (i *indexAliasImpl) SearchInContext(ctx context.Context, req *SearchRequest
if req.PreSearchData != nil {
if requestHasKNN(req) {
CascadingRadium marked this conversation as resolved.
Show resolved Hide resolved
var err error
preSearchData, err = redistributeKNNPreSearchData(req, i.indexes)
preSearchData, err = redistributePreSearchData(req, i.indexes)
if err != nil {
return nil, err
}
Expand All @@ -208,9 +215,10 @@ func (i *indexAliasImpl) SearchInContext(ctx context.Context, req *SearchRequest
// - the request requires preSearch
var preSearchDuration time.Duration
var sr *SearchResult
if req.PreSearchData == nil && preSearchRequired(req) {
flags := preSearchRequired(req, i.mapping)
if req.PreSearchData == nil && flags != nil {
searchStart := time.Now()
preSearchResult, err := preSearch(ctx, req, i.indexes...)
preSearchResult, err := preSearch(ctx, req, flags, i.indexes...)
if err != nil {
return nil, err
}
Expand All @@ -221,17 +229,17 @@ func (i *indexAliasImpl) SearchInContext(ctx context.Context, req *SearchRequest
return preSearchResult, nil
}
// finalize the preSearch result now
finalizePreSearchResult(req, preSearchResult)
finalizePreSearchResult(req, flags, preSearchResult)

// if there are no errors, then merge the data in the preSearch result
// and construct the preSearchData to be used in the actual search
// if the request is satisfied by the preSearch result, then we can
// directly return the preSearch result as the final result
if requestSatisfiedByPreSearch(req) {
if requestSatisfiedByPreSearch(req, flags) {
sr = finalizeSearchResult(req, preSearchResult)
// no need to run the 2nd phase MultiSearch(..)
} else {
preSearchData, err = constructPreSearchData(req, preSearchResult, i.indexes)
preSearchData, err = constructPreSearchData(req, flags, preSearchResult, i.indexes)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -352,6 +360,20 @@ func (i *indexAliasImpl) Close() error {
return nil
}

// SetIndexMapping records m as the mapping shared by the whole alias.
// It must be used ONLY when every index in the alias has the same
// mapping; its purpose is to avoid checking the mapping of each index
// in the alias when executing a search request.
//
// It returns ErrorIndexClosed, leaving the stored mapping untouched,
// when the alias is not open.
func (i *indexAliasImpl) SetIndexMapping(m mapping.IndexMapping) error {
	i.mutex.Lock()
	defer i.mutex.Unlock()

	if i.open {
		i.mapping = m
		return nil
	}
	return ErrorIndexClosed
}

func (i *indexAliasImpl) Mapping() mapping.IndexMapping {
i.mutex.RLock()
defer i.mutex.RUnlock()
Expand All @@ -360,6 +382,11 @@ func (i *indexAliasImpl) Mapping() mapping.IndexMapping {
return nil
}

// if the mapping is already set, return it
if i.mapping != nil {
return i.mapping
}

err := i.isAliasToSingleIndex()
if err != nil {
return nil
Expand Down Expand Up @@ -520,21 +547,35 @@ type asyncSearchResult struct {
Err error
}

func preSearchRequired(req *SearchRequest) bool {
return requestHasKNN(req)
// preSearchFlags is a struct to hold flags indicating why preSearch is required
type preSearchFlags struct {
// knn is true when the preSearch is needed because the request has KNN
knn bool
}

// preSearchRequired reports whether req needs a preSearch phase.
// It returns a preSearchFlags value naming which preSearch is required,
// or nil when none is. Only KNN is checked here; m is not consulted.
func preSearchRequired(req *SearchRequest, m mapping.IndexMapping) *preSearchFlags {
	if !requestHasKNN(req) {
		// nothing in the request calls for a preSearch
		return nil
	}
	return &preSearchFlags{knn: true}
}

func preSearch(ctx context.Context, req *SearchRequest, indexes ...Index) (*SearchResult, error) {
func preSearch(ctx context.Context, req *SearchRequest, flags *preSearchFlags, indexes ...Index) (*SearchResult, error) {
// create a dummy request with a match none query
// since we only care about the preSearchData in PreSearch
dummyRequest := &SearchRequest{
Query: query.NewMatchNoneQuery(),
}
newCtx := context.WithValue(ctx, search.PreSearchKey, true)
if requestHasKNN(req) {
if flags.knn {
addKnnToDummyRequest(dummyRequest, req)
}
return preSearchDataSearch(newCtx, dummyRequest, indexes...)
return preSearchDataSearch(newCtx, dummyRequest, flags, indexes...)
}

// if the request is satisfied by just the preSearch result,
Expand Down Expand Up @@ -585,20 +626,20 @@ func finalizeSearchResult(req *SearchRequest, preSearchResult *SearchResult) *Se
return preSearchResult
}

func requestSatisfiedByPreSearch(req *SearchRequest) bool {
if requestHasKNN(req) && isKNNrequestSatisfiedByPreSearch(req) {
func requestSatisfiedByPreSearch(req *SearchRequest, flags *preSearchFlags) bool {
CascadingRadium marked this conversation as resolved.
Show resolved Hide resolved
if flags.knn && isKNNrequestSatisfiedByPreSearch(req) {
return true
}
return false
}

func constructPreSearchData(req *SearchRequest, preSearchResult *SearchResult, indexes []Index) (map[string]map[string]interface{}, error) {
func constructPreSearchData(req *SearchRequest, flags *preSearchFlags, preSearchResult *SearchResult, indexes []Index) (map[string]map[string]interface{}, error) {
mergedOut := make(map[string]map[string]interface{}, len(indexes))
for _, index := range indexes {
mergedOut[index.Name()] = make(map[string]interface{})
}
var err error
if requestHasKNN(req) {
if flags.knn {
mergedOut, err = constructKnnPreSearchData(mergedOut, preSearchResult, indexes)
if err != nil {
return nil, err
Expand All @@ -607,7 +648,7 @@ func constructPreSearchData(req *SearchRequest, preSearchResult *SearchResult, i
return mergedOut, nil
}

func preSearchDataSearch(ctx context.Context, req *SearchRequest, indexes ...Index) (*SearchResult, error) {
func preSearchDataSearch(ctx context.Context, req *SearchRequest, flags *preSearchFlags, indexes ...Index) (*SearchResult, error) {
asyncResults := make(chan *asyncSearchResult, len(indexes))
// run search on each index in separate go routine
var waitGroup sync.WaitGroup
Expand Down Expand Up @@ -638,7 +679,7 @@ func preSearchDataSearch(ctx context.Context, req *SearchRequest, indexes ...Ind
if prp == nil {
// first valid preSearch result
// create a new preSearch result processor
prp = createPreSearchResultProcessor(req)
prp = createPreSearchResultProcessor(req, flags)
}
prp.add(asr.Result, asr.Name)
if sr == nil {
Expand Down Expand Up @@ -684,6 +725,41 @@ func preSearchDataSearch(ctx context.Context, req *SearchRequest, indexes ...Ind
return sr, nil
}

// redistributePreSearchData splits the preSearchData carried by req among
// the given indexes. This happens in an alias tree: the preSearchData is
// specific to each final index at the leaves, so at every level of the tree
// it has to be handed down to the indexes/aliases of that level.
//
// The result always has one (possibly empty) entry per index name. An error
// is returned only when validateAndDistributeKNNHits rejects the KNN hits.
func redistributePreSearchData(req *SearchRequest, indexes []Index) (map[string]map[string]interface{}, error) {
	out := make(map[string]map[string]interface{})
	for _, idx := range indexes {
		out[idx.Name()] = make(map[string]interface{})
	}

	// The KNN preSearchData is a list of DocumentMatch objects; when it is
	// absent (or of another type) there is nothing to redistribute.
	knnHits, hasKNN := req.PreSearchData[search.KnnPreSearchDataKey].([]*search.DocumentMatch)
	if !hasKNN {
		return out, nil
	}

	// Route every hit to the index it belongs to at this level.
	perIndexHits, err := validateAndDistributeKNNHits(knnHits, indexes)
	if err != nil {
		return nil, err
	}
	for _, idx := range indexes {
		name := idx.Name()
		out[name][search.KnnPreSearchDataKey] = perIndexHits[name]
	}
	return out, nil
}

// finalizePreSearchResult applies, in place, the finalization steps that
// match the preSearch flags. With the knn flag set, the hits of
// preSearchResult are replaced by the output of finalizeKNNResults;
// otherwise preSearchResult is left unchanged.
func finalizePreSearchResult(req *SearchRequest, flags *preSearchFlags, preSearchResult *SearchResult) {
	if !flags.knn {
		return
	}
	preSearchResult.Hits = finalizeKNNResults(req, preSearchResult.Hits)
}

// hitsInCurrentPage returns the hits in the current page
// using the From and Size parameters in the request
func hitsInCurrentPage(req *SearchRequest, hits []*search.DocumentMatch) []*search.DocumentMatch {
Expand Down
45 changes: 38 additions & 7 deletions pre_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type preSearchResultProcessor interface {
finalize(*SearchResult)
}

// -----------------------------------------------------------------------------
// KNN preSearchResultProcessor for handling KNN presearch results
type knnPreSearchResultProcessor struct {
addFn func(sr *SearchResult, indexName string)
finalizeFn func(sr *SearchResult)
Expand All @@ -44,16 +46,45 @@ func (k *knnPreSearchResultProcessor) finalize(sr *SearchResult) {
}

// -----------------------------------------------------------------------------
// Master struct that can hold any number of presearch result processors
type compositePreSearchResultProcessor struct {
// presearchResultProcessors are the processors that add and finalize
// calls are forwarded to, in slice order
presearchResultProcessors []preSearchResultProcessor
}

func finalizePreSearchResult(req *SearchRequest, preSearchResult *SearchResult) {
if requestHasKNN(req) {
preSearchResult.Hits = finalizeKNNResults(req, preSearchResult.Hits)
// add hands sr and indexName to the add method of every wrapped
// processor, in slice order.
func (m *compositePreSearchResultProcessor) add(sr *SearchResult, indexName string) {
	procs := m.presearchResultProcessors
	for idx := 0; idx < len(procs); idx++ {
		procs[idx].add(sr, indexName)
	}
}

func createPreSearchResultProcessor(req *SearchRequest) preSearchResultProcessor {
if requestHasKNN(req) {
return newKnnPreSearchResultProcessor(req)
// finalize hands sr to the finalize method of every wrapped processor,
// in slice order.
func (m *compositePreSearchResultProcessor) finalize(sr *SearchResult) {
	procs := m.presearchResultProcessors
	for idx := 0; idx < len(procs); idx++ {
		procs[idx].finalize(sr)
	}
}

// -----------------------------------------------------------------------------
// createPreSearchResultProcessor builds the preSearchResultProcessor for the
// preSearch kinds enabled in flags.
//
// It returns nil when no processor applies, the processor itself when exactly
// one applies, and a compositePreSearchResultProcessor wrapping all of them
// otherwise.
func createPreSearchResultProcessor(req *SearchRequest, flags *preSearchFlags) preSearchResultProcessor {
	var processors []preSearchResultProcessor
	// Add KNN processor if the request has KNN
	if flags.knn {
		if knnProcessor := newKnnPreSearchResultProcessor(req); knnProcessor != nil {
			processors = append(processors, knnProcessor)
		}
	}
	// Return based on the number of processors, optimizing for the common case of 1 processor.
	// Every case returns, so no statement is needed after the switch.
	switch len(processors) {
	case 0:
		// a literal nil keeps the returned interface value itself nil
		return nil
	case 1:
		return processors[0]
	default:
		return &compositePreSearchResultProcessor{
			presearchResultProcessors: processors,
		}
	}
}
5 changes: 5 additions & 0 deletions search.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,3 +589,8 @@ func (r *SearchRequest) SortFunc() func(data sort.Interface) {

return sort.Sort
}

func isMatchNoneQuery(q query.Query) bool {
CascadingRadium marked this conversation as resolved.
Show resolved Hide resolved
_, ok := q.(*query.MatchNoneQuery)
return ok
}
39 changes: 2 additions & 37 deletions search_knn.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func (i *indexImpl) runKnnCollector(ctx context.Context, req *SearchRequest, rea
continue
}

if _, ok := filterQ.(*query.MatchNoneQuery); ok {
if isMatchNoneQuery(filterQ) {
// Filtering required since no hits are eligible.
requiresFiltering[idx] = true
// a match none query just means none of the documents are eligible
Expand Down Expand Up @@ -559,7 +559,7 @@ func requestHasKNN(req *SearchRequest) bool {
func isKNNrequestSatisfiedByPreSearch(req *SearchRequest) bool {
// if req.Query is not match_none => then we need to go to phase 2
// to perform the actual query.
if _, ok := req.Query.(*query.MatchNoneQuery); !ok {
if !isMatchNoneQuery(req.Query) {
return false
}
// req.Query is a match_none query
Expand Down Expand Up @@ -598,41 +598,6 @@ func addKnnToDummyRequest(dummyReq *SearchRequest, realReq *SearchRequest) {
dummyReq.Sort = realReq.Sort
}

// redistributeKNNPreSearchData splits req.PreSearchData among indexes.
//
// The preSearchData for KNN is a list of DocumentMatch objects that need to
// be redistributed to the right index. This is used only in the case of an
// alias tree, where the indexes are at the leaves of the tree and the master
// alias is at the root. At each level of the tree the preSearchData needs to
// be redistributed to the indexes/aliases at that level, because the
// preSearchData is specific to each final index at the leaf.
//
// Every key other than the KNN key is copied unchanged into each index's
// entry. An error is returned when the request carries no KNN hit list or
// when validateAndDistributeKNNHits fails.
func redistributeKNNPreSearchData(req *SearchRequest, indexes []Index) (map[string]map[string]interface{}, error) {
	knnHits, ok := req.PreSearchData[search.KnnPreSearchDataKey].([]*search.DocumentMatch)
	if !ok {
		return nil, fmt.Errorf("request does not have knn preSearchData for redistribution")
	}
	segregatedKnnHits, err := validateAndDistributeKNNHits(knnHits, indexes)
	if err != nil {
		return nil, err
	}

	out := make(map[string]map[string]interface{})
	for _, idx := range indexes {
		name := idx.Name()
		perIndex := make(map[string]interface{})
		for key, val := range req.PreSearchData {
			if key == search.KnnPreSearchDataKey {
				// only this index's share of the KNN hits
				perIndex[key] = segregatedKnnHits[name]
			} else {
				perIndex[key] = val
			}
		}
		out[name] = perIndex
	}
	return out, nil
}

func newKnnPreSearchResultProcessor(req *SearchRequest) *knnPreSearchResultProcessor {
kArray := make([]int64, len(req.KNN))
for i, knnReq := range req.KNN {
Expand Down
2 changes: 1 addition & 1 deletion search_no_knn.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func requestHasKNN(req *SearchRequest) bool {
// addKnnToDummyRequest has an empty body in this file: it leaves dummyReq
// untouched and ignores realReq.
// NOTE(review): this appears to be the stub used by the no-KNN build
// (search_no_knn.go) — confirm against the file's build constraints.
func addKnnToDummyRequest(dummyReq *SearchRequest, realReq *SearchRequest) {
}

func redistributeKNNPreSearchData(req *SearchRequest, indexes []Index) (map[string]map[string]interface{}, error) {
func validateAndDistributeKNNHits(knnHits []*search.DocumentMatch, indexes []Index) (map[string][]*search.DocumentMatch, error) {
return nil, nil
}

Expand Down
Loading