Skip to content

Commit

Permalink
MB-57888: WIP: Updated Merge Process to Support Index Update
Browse files Browse the repository at this point in the history
  • Loading branch information
Likith101 committed Oct 30, 2024
1 parent ef2b41d commit 5290ba3
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 17 deletions.
2 changes: 2 additions & 0 deletions build.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"math"
"os"

index "github.com/blevesearch/bleve_index_api"
"github.com/blevesearch/vellum"
)

Expand Down Expand Up @@ -169,6 +170,7 @@ func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32, numDocs uint64
sectionsIndexOffset: sectionsIndexOffset,
fieldDvReaders: make([]map[uint16]*docValueReader, len(segmentSections)),
docValueOffset: 0, // docValueOffsets identified automatically by the section
updatedFields: make(map[string]index.FieldInfo),

Check failure on line 173 in build.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 173 in build.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 173 in build.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 173 in build.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 173 in build.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 173 in build.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest)

undefined: index.FieldInfo
fieldFSTs: make(map[uint16]*vellum.FST),
vecIndexCache: newVectorIndexCache(),
// following fields gets populated by loadFieldsNew
Expand Down
61 changes: 54 additions & 7 deletions merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sort"

"github.com/RoaringBitmap/roaring"
index "github.com/blevesearch/bleve_index_api"
seg "github.com/blevesearch/scorch_segment_api/v2"
"github.com/golang/snappy"
)
Expand All @@ -48,6 +49,9 @@ func (*ZapPlugin) Merge(segments []seg.Segment, drops []*roaring.Bitmap, path st
default:
panic(fmt.Sprintf("oops, unexpected segment type: %T", segment))
}
if s, ok := segment.(seg.UpdatableSegment); ok {

Check failure on line 52 in merge.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, ubuntu-latest)

undefined: seg.UpdatableSegment

Check failure on line 52 in merge.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, ubuntu-latest)

undefined: seg.UpdatableSegment

Check failure on line 52 in merge.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, macos-latest)

undefined: seg.UpdatableSegment

Check failure on line 52 in merge.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, macos-latest)

undefined: seg.UpdatableSegment

Check failure on line 52 in merge.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, macos-latest)

undefined: seg.UpdatableSegment

Check failure on line 52 in merge.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest)

undefined: seg.UpdatableSegment
segmentBases[segmenti].updatedFields = s.UpdatedFields()
}
}
return mergeSegmentBases(segmentBases, drops, path, DefaultChunkMode, closeCh, s)
}
Expand Down Expand Up @@ -109,6 +113,19 @@ func mergeSegmentBases(segmentBases []*SegmentBase, drops []*roaring.Bitmap, pat
return newDocNums, uint64(cr.Count()), nil
}

func filterFields(fieldsInv []string, fieldInfo map[string]*index.FieldInfo) []string {

Check failure on line 116 in merge.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 116 in merge.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 116 in merge.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 116 in merge.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 116 in merge.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 116 in merge.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest)

undefined: index.FieldInfo
rv := make([]string, 0)
for _, field := range fieldsInv {
if val, ok := fieldInfo[field]; ok {
if val.All {
continue
}
}
rv = append(rv, field)
}
return rv
}

func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
chunkMode uint32, cr *CountHashWriter, closeCh chan struct{}) (
newDocNums [][]uint64, numDocs, storedIndexOffset uint64,
Expand All @@ -117,6 +134,8 @@ func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,

var fieldsSame bool
fieldsSame, fieldsInv = mergeFields(segments)
updatedFields := mergeUpdatedFields(segments)
fieldsInv = filterFields(fieldsInv, updatedFields)
fieldsMap = mapFields(fieldsInv)

numDocs = computeNewDocCount(segments, drops)
Expand All @@ -130,15 +149,16 @@ func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
// offsets in the fields section index of the file (the final merged file).
mergeOpaque := map[int]resetable{}
args := map[string]interface{}{
"chunkMode": chunkMode,
"fieldsSame": fieldsSame,
"fieldsMap": fieldsMap,
"numDocs": numDocs,
"chunkMode": chunkMode,
"fieldsSame": fieldsSame,
"fieldsMap": fieldsMap,
"numDocs": numDocs,
"updatedFields": updatedFields,
}

if numDocs > 0 {
storedIndexOffset, newDocNums, err = mergeStoredAndRemap(segments, drops,
fieldsMap, fieldsInv, fieldsSame, numDocs, cr, closeCh)
fieldsMap, fieldsInv, fieldsSame, numDocs, cr, closeCh, updatedFields)
if err != nil {
return nil, 0, 0, nil, nil, 0, err
}
Expand Down Expand Up @@ -358,7 +378,7 @@ type varintEncoder func(uint64) (int, error)

func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap,
fieldsMap map[string]uint16, fieldsInv []string, fieldsSame bool, newSegDocCount uint64,
w *CountHashWriter, closeCh chan struct{}) (uint64, [][]uint64, error) {
w *CountHashWriter, closeCh chan struct{}, updatedFields map[string]*index.FieldInfo) (uint64, [][]uint64, error) {

Check failure on line 381 in merge.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 381 in merge.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 381 in merge.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 381 in merge.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 381 in merge.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 381 in merge.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest)

undefined: index.FieldInfo
var rv [][]uint64 // The remapped or newDocNums for each segment.

var newDocNum uint64
Expand Down Expand Up @@ -397,7 +417,7 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap,
// optimize when the field mapping is the same across all
// segments and there are no deletions, via byte-copying
// of stored docs bytes directly to the writer
if fieldsSame && (dropsI == nil || dropsI.GetCardinality() == 0) {
if fieldsSame && (dropsI == nil || dropsI.GetCardinality() == 0) && len(updatedFields) == 0 {
err := segment.copyStoredDocs(newDocNum, docNumOffsets, w)
if err != nil {
return 0, nil, err
Expand Down Expand Up @@ -471,6 +491,11 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap,

// now walk the non-"_id" fields in order
for fieldID := 1; fieldID < len(fieldsInv); fieldID++ {
if val, ok := updatedFields[fieldsInv[fieldID]]; ok {
if val.Store {
continue
}
}
storedFieldValues := vals[fieldID]

stf := typs[fieldID]
Expand Down Expand Up @@ -606,6 +631,28 @@ func mergeFields(segments []*SegmentBase) (bool, []string) {
return fieldsSame, rv
}

func mergeUpdatedFields(segments []*SegmentBase) map[string]*index.FieldInfo {

Check failure on line 634 in merge.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 634 in merge.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 634 in merge.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 634 in merge.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 634 in merge.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 634 in merge.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest)

undefined: index.FieldInfo
fieldInfo := make(map[string]*index.FieldInfo)

Check failure on line 635 in merge.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 635 in merge.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 635 in merge.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 635 in merge.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 635 in merge.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 635 in merge.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest)

undefined: index.FieldInfo

for _, segment := range segments {
for field, info := range segment.updatedFields {
if _, ok := fieldInfo[field]; !ok {
fieldInfo[field] = &index.FieldInfo{
All: info.All,
Index: info.Index,
Store: info.Store,
}
} else {
fieldInfo[field].All = fieldInfo[field].All || info.All
fieldInfo[field].Index = fieldInfo[field].Index || info.Index
fieldInfo[field].Store = fieldInfo[field].Store || info.Store
}
}

}
return fieldInfo
}

func isClosed(closeCh chan struct{}) bool {
select {
case <-closeCh:
Expand Down
16 changes: 13 additions & 3 deletions section_faiss_vector_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*Se
if _, ok := sb.fieldsMap[fieldName]; !ok {
continue
}
if vo.updatedFields[fieldName].Index {
continue
}

// check if the section address is a valid one for "fieldName" in the
// segment sb. the local fieldID (fetched by the fieldsMap of the sb)
Expand Down Expand Up @@ -689,9 +692,10 @@ func (v *faissVectorIndexSection) getvectorIndexOpaque(opaque map[int]resetable)

func (v *faissVectorIndexSection) InitOpaque(args map[string]interface{}) resetable {
rv := &vectorIndexOpaque{
fieldAddrs: make(map[uint16]int),
vecIDMap: make(map[int64]*vecInfo),
vecFieldMap: make(map[uint16]*indexContent),
fieldAddrs: make(map[uint16]int),
vecIDMap: make(map[int64]*vecInfo),
vecFieldMap: make(map[uint16]*indexContent),
updatedFields: make(map[string]*index.FieldInfo),
}
for k, v := range args {
rv.Set(k, v)
Expand Down Expand Up @@ -730,6 +734,8 @@ type vectorIndexOpaque struct {
// index to be build.
vecFieldMap map[uint16]*indexContent

updatedFields map[string]*index.FieldInfo

tmp0 []byte
}

Expand Down Expand Up @@ -776,4 +782,8 @@ func (v *vectorIndexOpaque) Reset() (err error) {
}

func (v *vectorIndexOpaque) Set(key string, val interface{}) {
switch key {
case "updatedFields":
v.updatedFields = val.(map[string]*index.FieldInfo)
}
}
27 changes: 20 additions & 7 deletions section_inverted_text_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ func (i *invertedTextIndexSection) AddrForField(opaque map[int]resetable, fieldI
func mergeAndPersistInvertedSection(segments []*SegmentBase, dropsIn []*roaring.Bitmap,
fieldsInv []string, fieldsMap map[string]uint16, fieldsSame bool,
newDocNumsIn [][]uint64, newSegDocCount uint64, chunkMode uint32,
w *CountHashWriter, closeCh chan struct{}) (map[int]int, uint64, error) {
updatedFields map[string]*index.FieldInfo, w *CountHashWriter,

Check failure on line 66 in section_inverted_text_index.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 66 in section_inverted_text_index.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 66 in section_inverted_text_index.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 66 in section_inverted_text_index.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 66 in section_inverted_text_index.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 66 in section_inverted_text_index.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest)

undefined: index.FieldInfo
closeCh chan struct{}) (map[int]int, uint64, error) {
var bufMaxVarintLen64 []byte = make([]byte, binary.MaxVarintLen64)
var bufLoc []uint64

Expand Down Expand Up @@ -107,9 +108,15 @@ func mergeAndPersistInvertedSection(segments []*SegmentBase, dropsIn []*roaring.
return nil, 0, seg.ErrClosed
}

dict, err2 := segment.dictionary(fieldName)
if err2 != nil {
return nil, 0, err2
var dict *Dictionary
var err2 error
if !updatedFields[fieldName].Index {
dict, err2 = segment.dictionary(fieldName)
if err2 != nil {
return nil, 0, err2
}
} else {
dict = nil
}
if dict != nil && dict.fst != nil {
itr, err2 := dict.fst.Iterator(nil, nil)
Expand Down Expand Up @@ -225,7 +232,7 @@ func mergeAndPersistInvertedSection(segments []*SegmentBase, dropsIn []*roaring.

postItr = postings.iterator(true, true, true, postItr)

if fieldsSame {
if fieldsSame && len(updatedFields) == 0 {
// can optimize by copying freq/norm/loc bytes directly
lastDocNum, lastFreq, lastNorm, err = mergeTermFreqNormLocsByCopying(
term, postItr, newDocNums[itrI], newRoaring,
Expand Down Expand Up @@ -298,7 +305,9 @@ func mergeAndPersistInvertedSection(segments []*SegmentBase, dropsIn []*roaring.
if isClosed(closeCh) {
return nil, 0, seg.ErrClosed
}

if updatedFields[fieldName].DocValues {
continue
}
fieldIDPlus1 := uint16(segment.fieldsMap[fieldName])
if dvIter, exists := segment.fieldDvReaders[SectionInvertedTextIndex][fieldIDPlus1-1]; exists &&
dvIter != nil {
Expand Down Expand Up @@ -379,7 +388,7 @@ func (i *invertedTextIndexSection) Merge(opaque map[int]resetable, segments []*S
w *CountHashWriter, closeCh chan struct{}) error {
io := i.getInvertedIndexOpaque(opaque)
fieldAddrs, _, err := mergeAndPersistInvertedSection(segments, drops, fieldsInv,
io.FieldsMap, io.fieldsSame, newDocNumsIn, io.numDocs, io.chunkMode, w, closeCh)
io.FieldsMap, io.fieldsSame, newDocNumsIn, io.numDocs, io.chunkMode, io.updatedFields, w, closeCh)
if err != nil {
return err
}
Expand Down Expand Up @@ -948,6 +957,8 @@ type invertedIndexOpaque struct {

fieldAddrs map[int]int

updatedFields map[string]*index.FieldInfo

Check failure on line 960 in section_inverted_text_index.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 960 in section_inverted_text_index.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 960 in section_inverted_text_index.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 960 in section_inverted_text_index.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 960 in section_inverted_text_index.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 960 in section_inverted_text_index.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest)

undefined: index.FieldInfo

bytesWritten uint64
fieldsSame bool
numDocs uint64
Expand Down Expand Up @@ -1015,5 +1026,7 @@ func (i *invertedIndexOpaque) Set(key string, val interface{}) {
i.FieldsMap = val.(map[string]uint16)
case "numDocs":
i.numDocs = val.(uint64)
case "updatedFields":
i.updatedFields = val.(map[string]*index.FieldInfo)
}
}
7 changes: 7 additions & 0 deletions segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"unsafe"

"github.com/RoaringBitmap/roaring"
index "github.com/blevesearch/bleve_index_api"
mmap "github.com/blevesearch/mmap-go"
segment "github.com/blevesearch/scorch_segment_api/v2"
"github.com/blevesearch/vellum"
Expand Down Expand Up @@ -108,6 +109,8 @@ type SegmentBase struct {
bytesRead uint64
bytesWritten uint64

updatedFields map[string]index.FieldInfo

Check failure on line 112 in segment.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 112 in segment.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 112 in segment.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 112 in segment.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 112 in segment.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 112 in segment.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest)

undefined: index.FieldInfo

m sync.Mutex
fieldFSTs map[uint16]*vellum.FST

Expand Down Expand Up @@ -884,3 +887,7 @@ func (s *SegmentBase) loadDvReaders() error {

return nil
}

func (s *SegmentBase) UpdatedFields() map[string]index.FieldInfo {

Check failure on line 891 in segment.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 891 in segment.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 891 in segment.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 891 in segment.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 891 in segment.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 891 in segment.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest)

undefined: index.FieldInfo
return s.updatedFields
}

0 comments on commit 5290ba3

Please sign in to comment.