Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handling a case to avoid bringing faiss index to memory unnecessarily #279

Merged
merged 2 commits into from
Oct 30, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 41 additions & 27 deletions section_faiss_vector_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,14 @@ func (v *faissVectorIndexSection) AddrForField(opaque map[int]resetable, fieldID
return vo.fieldAddrs[uint16(fieldID)]
}

// metadata corresponding to a serialized vector index
type vecIndexMeta struct {
// information specific to a vector index (its metadata along with
// the index pointer itself)
type vecIndexInfo struct {
startOffset int
indexSize uint64
vecIds []int64
indexOptimizedFor string
index *faiss.IndexImpl
}

// keep in mind with respect to update and delete operations with respect to vectors
Expand All @@ -87,7 +89,7 @@ func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*Se
// in the segment this will help by avoiding multiple allocation
// calls.
vecSegs := make([]*SegmentBase, 0, len(segments))
indexes := make([]*vecIndexMeta, 0, len(segments))
indexes := make([]*vecIndexInfo, 0, len(segments))

for fieldID, fieldName := range fieldsInv {
indexes = indexes[:0] // resizing the slices
Expand Down Expand Up @@ -128,7 +130,7 @@ func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*Se
pos += n

vecSegs = append(vecSegs, sb)
indexes = append(indexes, &vecIndexMeta{
indexes = append(indexes, &vecIndexInfo{
vecIds: make([]int64, 0, numVecs),
indexOptimizedFor: index.VectorIndexOptimizationsReverseLookup[int(indexOptimizationTypeInt)],
})
Expand Down Expand Up @@ -182,7 +184,7 @@ func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*Se
}

func (v *vectorIndexOpaque) flushSectionMetadata(fieldID int, w *CountHashWriter,
vecToDocID map[int64]uint64, indexes []*vecIndexMeta) error {
vecToDocID map[int64]uint64, indexes []*vecIndexInfo) error {
tempBuf := v.grabBuf(binary.MaxVarintLen64)

// early exit if there are absolutely no valid vectors present in the segment
Expand Down Expand Up @@ -275,9 +277,14 @@ func calculateNprobe(nlist int, indexOptimizedFor string) int32 {
// todo: naive implementation; keep the perf implications in mind and improve on this.
// perhaps parallelized merging can help speed things up here.
func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase,
indexes []*vecIndexMeta, w *CountHashWriter, closeCh chan struct{}) error {
vecIndexes []*vecIndexInfo, w *CountHashWriter, closeCh chan struct{}) error {

vecIndexes := make([]*faiss.IndexImpl, 0, len(sbs))
// safe to assume that all the indexes share the same config values, given
// that they are extracted from the field mapping info.
var dims, metric int
var indexOptimizedFor string

var validMerge bool
var finalVecIDCap, indexDataCap, reconsCap int
for segI, segBase := range sbs {
// Considering merge operations on vector indexes are expensive, it is
Expand All @@ -287,26 +294,37 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase,
freeReconstructedIndexes(vecIndexes)
return seg.ErrClosed
}
if len(vecIndexes[segI].vecIds) == 0 {
// no valid vectors for this index, don't bring it into memory
continue
}

// read the index bytes. todo: parallelize this
indexBytes := segBase.mem[indexes[segI].startOffset : indexes[segI].startOffset+int(indexes[segI].indexSize)]
indexBytes := segBase.mem[vecIndexes[segI].startOffset : vecIndexes[segI].startOffset+int(vecIndexes[segI].indexSize)]
index, err := faiss.ReadIndexFromBuffer(indexBytes, faissIOFlags)
if err != nil {
freeReconstructedIndexes(vecIndexes)
return err
}
if len(indexes[segI].vecIds) > 0 {
indexReconsLen := len(indexes[segI].vecIds) * index.D()
if len(vecIndexes[segI].vecIds) > 0 {
indexReconsLen := len(vecIndexes[segI].vecIds) * index.D()
if indexReconsLen > reconsCap {
reconsCap = indexReconsLen
}
indexDataCap += indexReconsLen
finalVecIDCap += len(indexes[segI].vecIds)
finalVecIDCap += len(vecIndexes[segI].vecIds)
}
vecIndexes = append(vecIndexes, index)
vecIndexes[segI].index = index

validMerge = true
// set the dims and metric values from the constructed index.
dims = index.D()
metric = int(index.MetricType())
indexOptimizedFor = vecIndexes[segI].indexOptimizedFor
}

// no vector indexes to merge
if len(vecIndexes) == 0 {
// not a valid merge operation as there are no valid indexes to merge.
if !validMerge {
return nil
}

Expand All @@ -326,18 +344,18 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase,

// reconstruct the vectors only if present; it could be that
// some of the indexes had all of their vectors updated/deleted.
if len(indexes[i].vecIds) > 0 {
neededReconsLen := len(indexes[i].vecIds) * vecIndexes[i].D()
if len(vecIndexes[i].vecIds) > 0 {
neededReconsLen := len(vecIndexes[i].vecIds) * vecIndexes[i].index.D()
recons = recons[:neededReconsLen]
// todo: parallelize reconstruction
recons, err = vecIndexes[i].ReconstructBatch(indexes[i].vecIds, recons)
recons, err = vecIndexes[i].index.ReconstructBatch(vecIndexes[i].vecIds, recons)
if err != nil {
freeReconstructedIndexes(vecIndexes)
return err
}
indexData = append(indexData, recons...)
// Adding vector IDs in the same order as the vectors
finalVecIDs = append(finalVecIDs, indexes[i].vecIds...)
finalVecIDs = append(finalVecIDs, vecIndexes[i].vecIds...)
}
}

Expand All @@ -351,12 +369,6 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase,

nvecs := len(finalVecIDs)

// safe to assume that all the indexes share the same config values, given
// that they are extracted from the field mapping info.
dims := vecIndexes[0].D()
metric := vecIndexes[0].MetricType()
indexOptimizedFor := indexes[0].indexOptimizedFor

// index type to be created after merge based on the number of vectors
// in indexData added into the index.
nlist := determineCentroids(nvecs)
Expand Down Expand Up @@ -419,9 +431,11 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase,
}

// todo: can be parallelized.
func freeReconstructedIndexes(indexes []*faiss.IndexImpl) {
for _, index := range indexes {
index.Close()
func freeReconstructedIndexes(indexes []*vecIndexInfo) {
for _, entry := range indexes {
if entry.index != nil {
entry.index.Close()
}
}
}

Expand Down
Loading