diff --git a/build.go b/build.go index 53fd34d1..cbbd2abc 100644 --- a/build.go +++ b/build.go @@ -171,6 +171,7 @@ func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32, numDocs uint64 docValueOffset: 0, // docValueOffsets identified automatically by the section fieldFSTs: make(map[uint16]*vellum.FST), vecIndexCache: newVectorIndexCache(), + synIndexCache: newSynonymIndexCache(), // following fields gets populated by loadFieldsNew fieldsMap: make(map[string]uint16), dictLocs: make([]uint64, 0), diff --git a/cmd/zap/cmd/synonym.go b/cmd/zap/cmd/synonym.go new file mode 100644 index 00000000..fbc579b4 --- /dev/null +++ b/cmd/zap/cmd/synonym.go @@ -0,0 +1,140 @@ +// Copyright (c) 2024 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cmd + +import ( + "bytes" + "encoding/binary" + "fmt" + + "github.com/RoaringBitmap/roaring/roaring64" + "github.com/blevesearch/vellum" + "github.com/spf13/cobra" +) + +var thesaurusCmd = &cobra.Command{ + Use: "thesaurus [path] [name]", + Short: "thesaurus prints the thesaurus with the specified name", + Long: `The thesaurus command lets you print the thesaurus with the specified name.`, + RunE: func(cmd *cobra.Command, args []string) error { + pos := segment.FieldsIndexOffset() + if pos == 0 { + // this is the case only for older file formats + return fmt.Errorf("file format not supported") + } + if len(args) < 2 { + return fmt.Errorf("must specify thesaurus name") + } + + pos, err := segment.ThesaurusAddr(args[1]) + if err != nil { + return fmt.Errorf("error determining address: %v", err) + } + fmt.Printf("thesaurus with name %s starts at %d (%x)\n", args[1], pos, pos) + + data := segment.Data() + vellumLen, read := binary.Uvarint(data[pos : pos+binary.MaxVarintLen64]) + pos += uint64(read) + fmt.Printf("vellum length: %d\n", vellumLen) + + fstBytes := data[pos : pos+vellumLen] + pos += vellumLen + fst, err := vellum.Load(fstBytes) + if err != nil { + return fmt.Errorf("thesaurus name %s vellum err: %v", args[1], err) + } + fmt.Printf("raw vellum data:\n % x\n", fstBytes) + + numSyns, n := binary.Uvarint(data[pos : pos+binary.MaxVarintLen64]) + pos += uint64(n) + if numSyns == 0 { + return fmt.Errorf("no synonyms found") + } + synTermMap := make(map[uint32][]byte, numSyns) + for i := 0; i < int(numSyns); i++ { + synID, n := binary.Uvarint(data[pos : pos+binary.MaxVarintLen64]) + pos += uint64(n) + termLen, n := binary.Uvarint(data[pos : pos+binary.MaxVarintLen64]) + pos += uint64(n) + if termLen == 0 { + return fmt.Errorf("term length is 0") + } + term := data[pos : pos+uint64(termLen)] + pos += uint64(termLen) + synTermMap[uint32(synID)] = term + } + + fmt.Printf("termID to term mapping:\n") + fmt.Printf(" termID\tterm\n") + for k, v := range synTermMap { + fmt.Printf(" %d\t%s\n", k, string(v)) + } + fmt.Printf("thesaurus (term -> [{termID|docNum},...]):\n") + var totalTerms int + itr, err := fst.Iterator(nil, nil) + for err == nil { + var sl *roaring64.Bitmap + currTerm, currVal := itr.Current() + sl, err = 
readSynonymsList(currVal, data) + if err != nil { + return err + } + sitr := sl.Iterator() + printStr := fmt.Sprintf(" %s -> [", currTerm) + for sitr.HasNext() { + encodedVal := sitr.Next() + tID, docNum := decodeSynonym(encodedVal) + str := fmt.Sprintf("{%d|%d},", tID, docNum) + printStr += str + } + printStr = printStr[:len(printStr)-1] + "]" + fmt.Printf("%s\n", printStr) + totalTerms++ + err = itr.Next() + } + fmt.Printf("Total terms in thesaurus : %d\n", totalTerms) + if err != nil && err != vellum.ErrIteratorDone { + return fmt.Errorf("error iterating thesaurus: %v", err) + } + return nil + }, +} + +func readSynonymsList(postingsOffset uint64, data []byte) (*roaring64.Bitmap, error) { + var n uint64 + var read int + + var postingsLen uint64 + postingsLen, read = binary.Uvarint(data[postingsOffset : postingsOffset+binary.MaxVarintLen64]) + n += uint64(read) + + buf := bytes.NewReader(data[postingsOffset+n : postingsOffset+n+postingsLen]) + r := roaring64.NewBitmap() + + _, err := r.ReadFrom(buf) + if err != nil { + return nil, fmt.Errorf("error loading roaring bitmap: %v", err) + } + + return r, nil +} + +func decodeSynonym(synonymCode uint64) (synonymID uint32, docID uint32) { + return uint32(synonymCode >> 32), uint32(synonymCode) +} + +func init() { + RootCmd.AddCommand(thesaurusCmd) +} diff --git a/doc_test.go b/doc_test.go index 85e227b2..476a4672 100644 --- a/doc_test.go +++ b/doc_test.go @@ -176,3 +176,157 @@ func (s *stubField) NumPlainTextBytes() uint64 { func (s *stubField) Compose(field string, length int, freq index.TokenFrequencies) { } + +// ----------------------------------------------------------------------------- +type stubSynonymField struct { + name string + analyzer string + input []string + synonyms []string + + synonymMap map[string][]string +} + +func (s *stubSynonymField) Name() string { + return s.name +} + +func (s *stubSynonymField) Value() []byte { + return nil +} + +func (s *stubSynonymField) ArrayPositions() []uint64 { + return nil +} + +func (s *stubSynonymField) EncodedFieldType() byte { + return 0 +} + +func (s *stubSynonymField) Analyze() { + var analyzedInput []string + if len(s.input) > 0 { + analyzedInput = make([]string, 0, len(s.input)) + for _, term := range s.input { + analyzedInput = append(analyzedInput, analyzeStubTerm(term, s.analyzer)) + } + } + analyzedSynonyms := make([]string, 0, len(s.synonyms)) + for _, syn := range s.synonyms { + analyzedSynonyms = append(analyzedSynonyms, analyzeStubTerm(syn, s.analyzer)) + } + s.synonymMap = processSynonymData(analyzedInput, analyzedSynonyms) +} + +func (s *stubSynonymField) Options() index.FieldIndexingOptions { + return 0 +} + +func (s *stubSynonymField) AnalyzedLength() int { + return 0 +} + +func (s *stubSynonymField) AnalyzedTokenFrequencies() index.TokenFrequencies { + return nil +} + +func (s *stubSynonymField) NumPlainTextBytes() uint64 { + return 0 +} + +func (sf *stubSynonymField) IterateSynonyms(visitor func(term string, synonyms []string)) { + for term, synonyms := range sf.synonymMap { + visitor(term, synonyms) + } +} + +func processSynonymData(input []string, synonyms []string) map[string][]string { + var synonymMap map[string][]string + if len(input) > 0 { + // Map each term to the same list of synonyms. + synonymMap = make(map[string][]string, len(input)) + for _, term := range input { + synonymMap[term] = append([]string(nil), synonyms...) // Avoid sharing slices. 
+ } + } else { + synonymMap = make(map[string][]string, len(synonyms)) + // Precompute a map where each synonym points to all other synonyms. + for i, elem := range synonyms { + synonymMap[elem] = make([]string, 0, len(synonyms)-1) + for j, otherElem := range synonyms { + if i != j { + synonymMap[elem] = append(synonymMap[elem], otherElem) + } + } + } + } + return synonymMap +} + +func analyzeStubTerm(term string, analyzer string) string { + lowerCaseTerm := strings.ToLower(term) + return lowerCaseTerm +} + +func newStubSynonymField(name string, analyzer string, input []string, synonyms []string) index.SynonymField { + return &stubSynonymField{ + name: name, + analyzer: analyzer, + input: input, + synonyms: synonyms, + } +} + +// ----------------------------------------------------------------------------- +type stubSynonymDocument struct { + id string + fields []index.Field +} + +func (s *stubSynonymDocument) ID() string { + return s.id +} + +func (s *stubSynonymDocument) Size() int { + return 0 +} + +func (s *stubSynonymDocument) VisitFields(visitor index.FieldVisitor) { + for _, f := range s.fields { + visitor(f) + } +} + +func (s *stubSynonymDocument) HasComposite() bool { + return false +} + +func (s *stubSynonymDocument) VisitComposite(visitor index.CompositeFieldVisitor) { +} + +func (s *stubSynonymDocument) NumPlainTextBytes() uint64 { + return 0 +} +func (s *stubSynonymDocument) StoredFieldsBytes() uint64 { + return 0 +} + +func (s *stubSynonymDocument) AddIDField() { + s.fields = append(s.fields, newStubFieldSplitString("_id", nil, s.id, true, false, false)) +} + +func (s *stubSynonymDocument) VisitSynonymFields(visitor index.SynonymFieldVisitor) { + for _, f := range s.fields { + if sf, ok := f.(index.SynonymField); ok { + visitor(sf) + } + } +} + +func newStubSynonymDocument(id string, synonymField index.SynonymField) index.SynonymDocument { + rv := &stubSynonymDocument{ + id: id, + fields: []index.Field{synonymField}, + } + return rv +} diff --git a/faiss_vector_posting.go b/faiss_vector_posting.go index 9c53c074..46dafbbe 100644 --- a/faiss_vector_posting.go +++ b/faiss_vector_posting.go @@ -87,7 +87,9 @@ var emptyVecPostingsIterator = &VecPostingsIterator{} var emptyVecPostingsList = &VecPostingsList{} func (vpl *VecPostingsList) Iterator(prealloc segment.VecPostingsIterator) segment.VecPostingsIterator { - + if vpl.postings == nil { + return emptyVecPostingsIterator + } // tbd: do we check the cardinality of postings and scores? 
var preallocPI *VecPostingsIterator pi, ok := prealloc.(*VecPostingsIterator) diff --git a/go.mod b/go.mod index 6b6438cd..8ba99cb8 100644 --- a/go.mod +++ b/go.mod @@ -4,11 +4,11 @@ go 1.21 require ( github.com/RoaringBitmap/roaring v1.9.3 - github.com/blevesearch/bleve_index_api v1.1.13 + github.com/blevesearch/bleve_index_api v1.2.0 github.com/blevesearch/go-faiss v1.0.24 github.com/blevesearch/mmap-go v1.0.4 - github.com/blevesearch/scorch_segment_api/v2 v2.2.16 - github.com/blevesearch/vellum v1.0.11 + github.com/blevesearch/scorch_segment_api/v2 v2.3.0 + github.com/blevesearch/vellum v1.1.0 github.com/golang/snappy v0.0.4 github.com/spf13/cobra v1.7.0 ) diff --git a/go.sum b/go.sum index fa076a92..0c4e0622 100644 --- a/go.sum +++ b/go.sum @@ -2,16 +2,16 @@ github.com/RoaringBitmap/roaring v1.9.3 h1:t4EbC5qQwnisr5PrP9nt0IRhRTb9gMUgQF4t4 github.com/RoaringBitmap/roaring v1.9.3/go.mod h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90= github.com/bits-and-blooms/bitset v1.12.0 h1:U/q1fAF7xXRhFCrhROzIfffYnu+dlS38vCZtmFVPHmA= github.com/bits-and-blooms/bitset v1.12.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= -github.com/blevesearch/bleve_index_api v1.1.13 h1:+nrA6oRJr85aCPyqaeZtsruObwKojutfonHJin/BP48= -github.com/blevesearch/bleve_index_api v1.1.13/go.mod h1:PbcwjIcRmjhGbkS/lJCpfgVSMROV6TRubGGAODaK1W8= +github.com/blevesearch/bleve_index_api v1.2.0 h1:/DXMMWBwx/UmGKM1xDhTwDoJI5yQrG6rqRWPFcOgUVo= +github.com/blevesearch/bleve_index_api v1.2.0/go.mod h1:PbcwjIcRmjhGbkS/lJCpfgVSMROV6TRubGGAODaK1W8= github.com/blevesearch/go-faiss v1.0.24 h1:K79IvKjoKHdi7FdiXEsAhxpMuns0x4fM0BO93bW5jLI= github.com/blevesearch/go-faiss v1.0.24/go.mod h1:OMGQwOaRRYxrmeNdMrXJPvVx8gBnvE5RYrr0BahNnkk= github.com/blevesearch/mmap-go v1.0.4 h1:OVhDhT5B/M1HNPpYPBKIEJaD0F3Si+CrEKULGCDPWmc= github.com/blevesearch/mmap-go v1.0.4/go.mod h1:EWmEAOmdAS9z/pi/+Toxu99DnsbhG1TIxUoRmJw/pSs= -github.com/blevesearch/scorch_segment_api/v2 v2.2.16 h1:uGvKVvG7zvSxCwcm4/ehBa9cCEuZVE+/zvrSl57QUVY= -github.com/blevesearch/scorch_segment_api/v2 v2.2.16/go.mod h1:VF5oHVbIFTu+znY1v30GjSpT5+9YFs9dV2hjvuh34F0= -github.com/blevesearch/vellum v1.0.11 h1:SJI97toEFTtA9WsDZxkyGTaBWFdWl1n2LEDCXLCq/AU= -github.com/blevesearch/vellum v1.0.11/go.mod h1:QgwWryE8ThtNPxtgWJof5ndPfx0/YMBh+W2weHKPw8Y= +github.com/blevesearch/scorch_segment_api/v2 v2.3.0 h1:vxCjbXAkkEBSb4AB3Iqgr/EJcPyYRsiGxpcvsS8E1Dw= +github.com/blevesearch/scorch_segment_api/v2 v2.3.0/go.mod h1:5y+TgXYSx+xJGaCwSlvy9G/UJBIY5wzvIkhvhBm2ATc= +github.com/blevesearch/vellum v1.1.0 h1:CinkGyIsgVlYf8Y2LUQHvdelgXr6PYuvoDIajq6yR9w= +github.com/blevesearch/vellum v1.1.0/go.mod h1:QgwWryE8ThtNPxtgWJof5ndPfx0/YMBh+W2weHKPw8Y= github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= diff --git a/new.go b/new.go index f0d37c43..c99b933d 100644 --- a/new.go +++ b/new.go @@ -174,23 +174,6 @@ func (s *interim) convert() (uint64, uint64, error) { s.FieldsMap = map[string]uint16{} } - args := map[string]interface{}{ - "results": s.results, - "chunkMode": s.chunkMode, - } - if s.opaque == nil { - s.opaque = map[int]resetable{} - for i, x := range segmentSections { - s.opaque[int(i)] = x.InitOpaque(args) - } - } else { - for k, v := range args { - for _, op := range s.opaque { - op.Set(k, v) - } - } - } - s.getOrDefineField("_id") // _id field is fieldID 0 for _, result := range s.results { @@ 
-208,6 +191,25 @@ func (s *interim) convert() (uint64, uint64, error) { s.FieldsMap[fieldName] = uint16(fieldID + 1) } + args := map[string]interface{}{ + "results": s.results, + "chunkMode": s.chunkMode, + "fieldsMap": s.FieldsMap, + "fieldsInv": s.FieldsInv, + } + if s.opaque == nil { + s.opaque = map[int]resetable{} + for i, x := range segmentSections { + s.opaque[int(i)] = x.InitOpaque(args) + } + } else { + for k, v := range args { + for _, op := range s.opaque { + op.Set(k, v) + } + } + } + s.processDocuments() storedIndexOffset, err := s.writeStoredFields() diff --git a/section.go b/section.go index 1ace25e3..13d8f57b 100644 --- a/section.go +++ b/section.go @@ -58,6 +58,7 @@ type resetable interface { const ( SectionInvertedTextIndex = iota SectionFaissVectorIndex + SectionSynonymIndex ) // ----------------------------------------------------------------------------- diff --git a/section_faiss_vector_index.go b/section_faiss_vector_index.go index 1c9f91a0..6242d2b4 100644 --- a/section_faiss_vector_index.go +++ b/section_faiss_vector_index.go @@ -36,10 +36,10 @@ const defaultFaissOMPThreads = 1 func init() { rand.Seed(time.Now().UTC().UnixNano()) registerSegmentSection(SectionFaissVectorIndex, &faissVectorIndexSection{}) - isFieldNotApplicableToInvertedTextSection = func(field index.Field) bool { + invertedTextIndexSectionExclusionChecks = append(invertedTextIndexSectionExclusionChecks, func(field index.Field) bool { _, ok := field.(index.VectorField) return ok - } + }) faiss.SetOMPThreads(defaultFaissOMPThreads) } diff --git a/section_inverted_text_index.go b/section_inverted_text_index.go index ea8722e4..db7b1a91 100644 --- a/section_inverted_text_index.go +++ b/section_inverted_text_index.go @@ -34,16 +34,35 @@ func init() { type invertedTextIndexSection struct { } -// this function is something that tells the inverted index section whether to -// process a particular field or not - since it might be processed by another -// section this function helps in avoiding unnecessary work. -// (only used by faiss vector section currently, will need a separate API for every -// section we introduce in the future or a better way forward - TODO) -var isFieldNotApplicableToInvertedTextSection func(field index.Field) bool +// This function checks whether the inverted text index section should avoid processing +// a particular field, preventing unnecessary work if another section will handle it. +// +// NOTE: The exclusion check is applicable only to the InvertedTextIndexSection +// because it serves as a catch-all section. This section processes every field +// unless explicitly excluded, similar to a "default" case in a switch statement. +// Other sections, such as VectorSection and SynonymSection, rely on inclusion +// checks to process only specific field types (e.g., index.VectorField or +// index.SynonymField). Any new section added in the future must define its +// special field type and inclusion logic explicitly. 
+var isFieldExcludedFromInvertedTextIndexSection = func(field index.Field) bool {
+	for _, excludeField := range invertedTextIndexSectionExclusionChecks {
+		if excludeField(field) {
+			// at least one section has agreed to exclude this field
+			// from inverted text index section processing and will
+			// process it independently
+			return true
+		}
+	}
+	// no section has excluded this field from inverted index processing,
+	// so it should be processed by the inverted index section
+	return false
+}
+
+// List of checks to determine if a field is excluded from the inverted text index section
+var invertedTextIndexSectionExclusionChecks = make([]func(field index.Field) bool, 0)
 
 func (i *invertedTextIndexSection) Process(opaque map[int]resetable, docNum uint32, field index.Field, fieldID uint16) {
-	if isFieldNotApplicableToInvertedTextSection == nil ||
-		!isFieldNotApplicableToInvertedTextSection(field) {
+	if !isFieldExcludedFromInvertedTextIndexSection(field) {
 		invIndexOpaque := i.getInvertedIndexOpaque(opaque)
 		invIndexOpaque.process(field, fieldID, docNum)
 	}
 }
diff --git a/section_synonym_index.go b/section_synonym_index.go
new file mode 100644
index 00000000..9bfd804d
--- /dev/null
+++ b/section_synonym_index.go
@@ -0,0 +1,786 @@
+// Copyright (c) 2024 Couchbase, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License. 
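+
+// For illustration only (not part of this patch): any new section follows the
+// same pattern as the vector and synonym sections to opt its special field
+// type out of the catch-all inverted text index section. A minimal sketch,
+// where fancyField, fancyIndexSection, and SectionFancyIndex are hypothetical
+// names:
+//
+//	type fancyField interface {
+//		index.Field
+//		EncodedFancyData() []byte // hypothetical accessor
+//	}
+//
+//	func init() {
+//		registerSegmentSection(SectionFancyIndex, &fancyIndexSection{})
+//		invertedTextIndexSectionExclusionChecks = append(
+//			invertedTextIndexSectionExclusionChecks,
+//			func(field index.Field) bool {
+//				// exclude fancy fields here; the fancy section
+//				// will process them independently
+//				_, ok := field.(fancyField)
+//				return ok
+//			})
+//	}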
+
+package zap
+
+import (
+	"bytes"
+	"encoding/binary"
+	"fmt"
+	"math"
+	"sort"
+
+	"github.com/RoaringBitmap/roaring"
+	"github.com/RoaringBitmap/roaring/roaring64"
+	index "github.com/blevesearch/bleve_index_api"
+	seg "github.com/blevesearch/scorch_segment_api/v2"
+	"github.com/blevesearch/vellum"
+)
+
+func init() {
+	registerSegmentSection(SectionSynonymIndex, &synonymIndexSection{})
+	invertedTextIndexSectionExclusionChecks = append(invertedTextIndexSectionExclusionChecks, func(field index.Field) bool {
+		_, ok := field.(index.SynonymField)
+		return ok
+	})
+}
+
+// -----------------------------------------------------------------------------
+
+type synonymIndexOpaque struct {
+	results []index.Document
+
+	// indicates whether the following fields are initialized
+	init bool
+
+	// FieldsMap maps field name to field id and must be set in
+	// the index opaque using the key "fieldsMap"
+	// used for ensuring accurate mapping between fieldID and
+	// thesaurusID
+	// name -> field id + 1
+	FieldsMap map[string]uint16
+
+	// ThesaurusMap adds 1 to thesaurus id to avoid zero value issues
+	// name -> thesaurus id + 1
+	ThesaurusMap map[string]uint16
+
+	// ThesaurusInv is the inverse of ThesaurusMap
+	// thesaurus id -> name
+	ThesaurusInv []string
+
+	// Thesaurus for each thesaurus ID
+	// thesaurus id -> LHS term -> synonym postings list id + 1
+	Thesauri []map[string]uint64
+
+	// LHS Terms for each thesaurus ID, where terms are sorted ascending
+	// thesaurus id -> []term
+	ThesaurusKeys [][]string
+
+	// FieldIDtoThesaurusID maps the field id to the thesaurus id
+	// field id -> thesaurus id
+	FieldIDtoThesaurusID map[uint16]int
+
+	// SynonymTermToID maps term to synonym id for each thesaurus
+	// thesaurus id -> term -> synonym id
+	SynonymTermToID []map[string]uint32
+
+	// SynonymIDtoTerm maps synonym id to term for each thesaurus
+	// thesaurus id -> synonym id -> term
+	// this is the inverse of SynonymTermToID for each thesaurus
+	SynonymIDtoTerm []map[uint32]string
+
+	// synonym postings list id -> synonym bitmap
+	Synonyms []*roaring64.Bitmap
+
+	// A reusable vellum FST builder that is stored in the synonym opaque
+	// and reused across multiple document batches during the persist phase
+	// of the synonym index section. The builder is used to build the FST
+	// for each thesaurus, which maps terms to their corresponding synonym bitmaps.
+	builder *vellum.Builder
+
+	// A reusable buffer for the vellum FST builder. It streams data written
+	// into the builder into a byte slice. The final byte slice represents
+	// the serialized vellum FST, which will be written to disk
+	builderBuf bytes.Buffer
+
+	// A reusable buffer for temporary use within the synonym index opaque
+	tmp0 []byte
+
+	// A map linking thesaurus IDs to their corresponding thesaurus' file offsets
+	thesaurusAddrs map[int]int
+}
+
+// Set the fieldsMap and results in the synonym index opaque before the section processes a synonym field.
+func (so *synonymIndexOpaque) Set(key string, value interface{}) {
+	switch key {
+	case "results":
+		so.results = value.([]index.Document)
+	case "fieldsMap":
+		so.FieldsMap = value.(map[string]uint16)
+	}
+}
+
+// Reset the synonym index opaque after a batch of documents has been processed into a segment. 
+func (so *synonymIndexOpaque) Reset() (err error) { + // cleanup stuff over here + so.results = nil + so.init = false + so.ThesaurusMap = nil + so.ThesaurusInv = so.ThesaurusInv[:0] + for i := range so.Thesauri { + so.Thesauri[i] = nil + } + so.Thesauri = so.Thesauri[:0] + for i := range so.ThesaurusKeys { + so.ThesaurusKeys[i] = so.ThesaurusKeys[i][:0] + } + so.ThesaurusKeys = so.ThesaurusKeys[:0] + for _, idn := range so.Synonyms { + idn.Clear() + } + so.Synonyms = so.Synonyms[:0] + so.builderBuf.Reset() + if so.builder != nil { + err = so.builder.Reset(&so.builderBuf) + } + so.FieldIDtoThesaurusID = nil + so.SynonymTermToID = so.SynonymTermToID[:0] + so.SynonymIDtoTerm = so.SynonymIDtoTerm[:0] + + so.tmp0 = so.tmp0[:0] + return err +} + +func (so *synonymIndexOpaque) process(field index.SynonymField, fieldID uint16, docNum uint32) { + // if this is the first time we are processing a synonym field in this batch + // we need to allocate memory for the thesauri and related data structures + if !so.init { + so.realloc() + so.init = true + } + + // get the thesaurus id for this field + tid := so.FieldIDtoThesaurusID[fieldID] + + // get the thesaurus for this field + thesaurus := so.Thesauri[tid] + + termSynMap := so.SynonymTermToID[tid] + + field.IterateSynonyms(func(term string, synonyms []string) { + pid := thesaurus[term] - 1 + + bs := so.Synonyms[pid] + + for _, syn := range synonyms { + code := encodeSynonym(termSynMap[syn], docNum) + bs.Add(code) + } + }) +} + +// a one-time call to allocate memory for the thesauri and synonyms which takes +// all the documents in the result batch and the fieldsMap and predetermines the +// size of the data structures in the synonymIndexOpaque +func (so *synonymIndexOpaque) realloc() { + var pidNext int + var sidNext uint32 + so.ThesaurusMap = map[string]uint16{} + so.FieldIDtoThesaurusID = map[uint16]int{} + + // count the number of unique thesauri from the batch of documents + for _, result := range so.results { + if synDoc, ok := result.(index.SynonymDocument); ok { + synDoc.VisitSynonymFields(func(synField index.SynonymField) { + fieldIDPlus1 := so.FieldsMap[synField.Name()] + so.getOrDefineThesaurus(fieldIDPlus1-1, synField.Name()) + }) + } + } + + for _, result := range so.results { + if synDoc, ok := result.(index.SynonymDocument); ok { + synDoc.VisitSynonymFields(func(synField index.SynonymField) { + fieldIDPlus1 := so.FieldsMap[synField.Name()] + thesaurusID := so.getOrDefineThesaurus(fieldIDPlus1-1, synField.Name()) + + thesaurus := so.Thesauri[thesaurusID] + thesaurusKeys := so.ThesaurusKeys[thesaurusID] + + synTermMap := so.SynonymIDtoTerm[thesaurusID] + + termSynMap := so.SynonymTermToID[thesaurusID] + + // iterate over all the term-synonyms pair from the field + synField.IterateSynonyms(func(term string, synonyms []string) { + _, exists := thesaurus[term] + if !exists { + pidNext++ + pidPlus1 := uint64(pidNext) + + thesaurus[term] = pidPlus1 + thesaurusKeys = append(thesaurusKeys, term) + } + for _, syn := range synonyms { + _, exists := termSynMap[syn] + if !exists { + termSynMap[syn] = sidNext + synTermMap[sidNext] = syn + sidNext++ + } + } + }) + so.ThesaurusKeys[thesaurusID] = thesaurusKeys + }) + } + } + + numSynonymsLists := pidNext + + if cap(so.Synonyms) >= numSynonymsLists { + so.Synonyms = so.Synonyms[:numSynonymsLists] + } else { + synonyms := make([]*roaring64.Bitmap, numSynonymsLists) + copy(synonyms, so.Synonyms[:cap(so.Synonyms)]) + for i := 0; i < numSynonymsLists; i++ { + if synonyms[i] == nil { + synonyms[i] = 
roaring64.New() + } + } + so.Synonyms = synonyms + } + + for _, thes := range so.ThesaurusKeys { + sort.Strings(thes) + } +} + +// getOrDefineThesaurus returns the thesaurus id for the given field id and thesaurus name. +func (so *synonymIndexOpaque) getOrDefineThesaurus(fieldID uint16, thesaurusName string) int { + thesaurusIDPlus1, exists := so.ThesaurusMap[thesaurusName] + if !exists { + // need to create a new thesaurusID for this thesaurusName and + thesaurusIDPlus1 = uint16(len(so.ThesaurusInv) + 1) + so.ThesaurusMap[thesaurusName] = thesaurusIDPlus1 + so.ThesaurusInv = append(so.ThesaurusInv, thesaurusName) + + so.Thesauri = append(so.Thesauri, make(map[string]uint64)) + + so.SynonymIDtoTerm = append(so.SynonymIDtoTerm, make(map[uint32]string)) + + so.SynonymTermToID = append(so.SynonymTermToID, make(map[string]uint32)) + + // map the fieldID to the thesaurusID + so.FieldIDtoThesaurusID[fieldID] = int(thesaurusIDPlus1 - 1) + + n := len(so.ThesaurusKeys) + if n < cap(so.ThesaurusKeys) { + so.ThesaurusKeys = so.ThesaurusKeys[:n+1] + so.ThesaurusKeys[n] = so.ThesaurusKeys[n][:0] + } else { + so.ThesaurusKeys = append(so.ThesaurusKeys, []string(nil)) + } + } + + return int(thesaurusIDPlus1 - 1) +} + +// grabBuf returns a reusable buffer of the given size from the synonymIndexOpaque. +func (so *synonymIndexOpaque) grabBuf(size int) []byte { + buf := so.tmp0 + if cap(buf) < size { + buf = make([]byte, size) + so.tmp0 = buf + } + return buf[:size] +} + +func (so *synonymIndexOpaque) writeThesauri(w *CountHashWriter) (thesOffsets []uint64, err error) { + + if so.results == nil || len(so.results) == 0 { + return nil, nil + } + + thesOffsets = make([]uint64, len(so.ThesaurusInv)) + + buf := so.grabBuf(binary.MaxVarintLen64) + + if so.builder == nil { + so.builder, err = vellum.New(&so.builderBuf, nil) + if err != nil { + return nil, err + } + } + + for thesaurusID, terms := range so.ThesaurusKeys { + thes := so.Thesauri[thesaurusID] + for _, term := range terms { // terms are already sorted + pid := thes[term] - 1 + postingsBS := so.Synonyms[pid] + postingsOffset, err := writeSynonyms(postingsBS, w, buf) + if err != nil { + return nil, err + } + + if postingsOffset > uint64(0) { + err = so.builder.Insert([]byte(term), postingsOffset) + if err != nil { + return nil, err + } + } + } + + err = so.builder.Close() + if err != nil { + return nil, err + } + + thesOffsets[thesaurusID] = uint64(w.Count()) + + vellumData := so.builderBuf.Bytes() + + // write out the length of the vellum data + n := binary.PutUvarint(buf, uint64(len(vellumData))) + _, err = w.Write(buf[:n]) + if err != nil { + return nil, err + } + + // write this vellum to disk + _, err = w.Write(vellumData) + if err != nil { + return nil, err + } + + // reset vellum for reuse + so.builderBuf.Reset() + + err = so.builder.Reset(&so.builderBuf) + if err != nil { + return nil, err + } + + // write out the synTermMap for this thesaurus + err := writeSynTermMap(so.SynonymIDtoTerm[thesaurusID], w, buf) + if err != nil { + return nil, err + } + + thesaurusStart := w.Count() + + n = binary.PutUvarint(buf, fieldNotUninverted) + _, err = w.Write(buf[:n]) + if err != nil { + return nil, err + } + + n = binary.PutUvarint(buf, fieldNotUninverted) + _, err = w.Write(buf[:n]) + if err != nil { + return nil, err + } + + n = binary.PutUvarint(buf, thesOffsets[thesaurusID]) + _, err = w.Write(buf[:n]) + if err != nil { + return nil, err + } + so.thesaurusAddrs[thesaurusID] = thesaurusStart + } + return thesOffsets, nil +} + +// 
----------------------------------------------------------------------------- + +type synonymIndexSection struct { +} + +func (s *synonymIndexSection) getSynonymIndexOpaque(opaque map[int]resetable) *synonymIndexOpaque { + if _, ok := opaque[SectionSynonymIndex]; !ok { + opaque[SectionSynonymIndex] = s.InitOpaque(nil) + } + return opaque[SectionSynonymIndex].(*synonymIndexOpaque) +} + +// Implementations of the Section interface for the synonym index section. +// InitOpaque initializes the synonym index opaque, which sets the FieldsMap and +// results in the opaque before the section processes a synonym field. +func (s *synonymIndexSection) InitOpaque(args map[string]interface{}) resetable { + rv := &synonymIndexOpaque{ + thesaurusAddrs: map[int]int{}, + } + for k, v := range args { + rv.Set(k, v) + } + + return rv +} + +// Process processes a synonym field by adding the synonyms to the thesaurus +// pointed to by the fieldID, implements the Process API for the synonym index section. +func (s *synonymIndexSection) Process(opaque map[int]resetable, docNum uint32, field index.Field, fieldID uint16) { + if fieldID == math.MaxUint16 { + return + } + if sf, ok := field.(index.SynonymField); ok { + so := s.getSynonymIndexOpaque(opaque) + so.process(sf, fieldID, docNum) + } +} + +// Persist serializes and writes the thesauri processed to the writer, along +// with the synonym postings lists, and the synonym term map. Implements the +// Persist API for the synonym index section. +func (s *synonymIndexSection) Persist(opaque map[int]resetable, w *CountHashWriter) (n int64, err error) { + synIndexOpaque := s.getSynonymIndexOpaque(opaque) + _, err = synIndexOpaque.writeThesauri(w) + return 0, err +} + +// AddrForField returns the file offset of the thesaurus for the given fieldID, +// it uses the FieldIDtoThesaurusID map to translate the fieldID to the thesaurusID, +// and returns the corresponding thesaurus offset from the thesaurusAddrs map. +// Implements the AddrForField API for the synonym index section. +func (s *synonymIndexSection) AddrForField(opaque map[int]resetable, fieldID int) int { + synIndexOpaque := s.getSynonymIndexOpaque(opaque) + if synIndexOpaque == nil || synIndexOpaque.FieldIDtoThesaurusID == nil { + return 0 + } + tid, exists := synIndexOpaque.FieldIDtoThesaurusID[uint16(fieldID)] + if !exists { + return 0 + } + return synIndexOpaque.thesaurusAddrs[tid] +} + +// Merge merges the thesauri, synonym postings lists and synonym term maps from +// the segments into a single thesaurus and serializes and writes the merged +// thesaurus and associated data to the writer. Implements the Merge API for the +// synonym index section. +func (s *synonymIndexSection) Merge(opaque map[int]resetable, segments []*SegmentBase, + drops []*roaring.Bitmap, fieldsInv []string, newDocNumsIn [][]uint64, + w *CountHashWriter, closeCh chan struct{}) error { + so := s.getSynonymIndexOpaque(opaque) + thesaurusAddrs, fieldIDtoThesaurusID, err := mergeAndPersistSynonymSection(segments, drops, fieldsInv, newDocNumsIn, w, closeCh) + if err != nil { + return err + } + + so.thesaurusAddrs = thesaurusAddrs + so.FieldIDtoThesaurusID = fieldIDtoThesaurusID + return nil +} + +// ----------------------------------------------------------------------------- + +// encodeSynonym encodes a synonymID and a docID into a single uint64 value. 
+// The encoding format splits the 64 bits as follows:
+//
+//	 63        32 31         0
+//	+-----------+-----------+
+//	| synonymID |  docNum   |
+//	+-----------+-----------+
+//
+// The upper 32 bits (63-32) store the synonymID, and the lower 32 bits (31-0) store the docID.
+//
+// Parameters:
+//
+//	synonymID - A 32-bit unsigned integer representing the ID of the synonym.
+//	docID - A 32-bit unsigned integer representing the document ID.
+//
+// Returns:
+//
+//	A 64-bit unsigned integer that combines the synonymID and docID.
+func encodeSynonym(synonymID uint32, docID uint32) uint64 {
+	return uint64(synonymID)<<32 | uint64(docID)
+}
+
+// writeSynonyms serializes and writes the synonym postings list to the writer, by first
+// serializing the postings list to a byte slice and then writing the length
+// of the byte slice followed by the byte slice itself.
+func writeSynonyms(postings *roaring64.Bitmap, w *CountHashWriter, bufMaxVarintLen64 []byte) (
+	offset uint64, err error) {
+	termCardinality := postings.GetCardinality()
+	if termCardinality <= 0 {
+		return 0, nil
+	}
+
+	postingsOffset := uint64(w.Count())
+
+	buf, err := postings.ToBytes()
+	if err != nil {
+		return 0, err
+	}
+	// write out the length
+	n := binary.PutUvarint(bufMaxVarintLen64, uint64(len(buf)))
+	_, err = w.Write(bufMaxVarintLen64[:n])
+	if err != nil {
+		return 0, err
+	}
+	// write out the roaring bytes
+	_, err = w.Write(buf)
+	if err != nil {
+		return 0, err
+	}
+
+	return postingsOffset, nil
+}
+
+// writeSynTermMap serializes and writes the synonym term map to the writer, by first
+// writing the length of the map followed by the map entries, where each entry
+// consists of the synonym ID, the length of the term, and the term itself.
+func writeSynTermMap(synTermMap map[uint32]string, w *CountHashWriter, bufMaxVarintLen64 []byte) error {
+	if len(synTermMap) == 0 {
+		return nil
+	}
+	n := binary.PutUvarint(bufMaxVarintLen64, uint64(len(synTermMap)))
+	_, err := w.Write(bufMaxVarintLen64[:n])
+	if err != nil {
+		return err
+	}
+
+	for sid, term := range synTermMap {
+		n = binary.PutUvarint(bufMaxVarintLen64, uint64(sid))
+		_, err = w.Write(bufMaxVarintLen64[:n])
+		if err != nil {
+			return err
+		}
+
+		n = binary.PutUvarint(bufMaxVarintLen64, uint64(len(term)))
+		_, err = w.Write(bufMaxVarintLen64[:n])
+		if err != nil {
+			return err
+		}
+
+		_, err = w.Write([]byte(term))
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func mergeAndPersistSynonymSection(segments []*SegmentBase, dropsIn []*roaring.Bitmap,
+	fieldsInv []string, newDocNumsIn [][]uint64, w *CountHashWriter,
+	closeCh chan struct{}) (map[int]int, map[uint16]int, error) {
+
+	var bufMaxVarintLen64 []byte = make([]byte, binary.MaxVarintLen64)
+
+	var synonyms *SynonymsList
+	var synItr *SynonymsIterator
+
+	thesaurusAddrs := make(map[int]int)
+
+	var vellumBuf bytes.Buffer
+	newVellum, err := vellum.New(&vellumBuf, nil)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	newRoaring := roaring64.NewBitmap()
+
+	newDocNums := make([][]uint64, 0, len(segments))
+
+	drops := make([]*roaring.Bitmap, 0, len(segments))
+
+	thesauri := make([]*Thesaurus, 0, len(segments))
+
+	itrs := make([]vellum.Iterator, 0, len(segments))
+
+	fieldIDtoThesaurusID := make(map[uint16]int)
+
+	var thesaurusID int
+	var newSynonymID uint32
+
+	// for each field
+	for fieldID, fieldName := range fieldsInv {
+		// collect FST iterators from all active segments for this field
+		newDocNums = newDocNums[:0]
+		drops = drops[:0]
+		thesauri = thesauri[:0]
+		itrs = itrs[:0]
+		newSynonymID 
= 0 + synTermMap := make(map[uint32]string) + termSynMap := make(map[string]uint32) + + for segmentI, segment := range segments { + // check for the closure in meantime + if isClosed(closeCh) { + return nil, nil, seg.ErrClosed + } + + thes, err2 := segment.thesaurus(fieldName) + if err2 != nil { + return nil, nil, err2 + } + if thes != nil && thes.fst != nil { + itr, err2 := thes.fst.Iterator(nil, nil) + if err2 != nil && err2 != vellum.ErrIteratorDone { + return nil, nil, err2 + } + if itr != nil { + newDocNums = append(newDocNums, newDocNumsIn[segmentI]) + if dropsIn[segmentI] != nil && !dropsIn[segmentI].IsEmpty() { + drops = append(drops, dropsIn[segmentI]) + } else { + drops = append(drops, nil) + } + thesauri = append(thesauri, thes) + itrs = append(itrs, itr) + } + } + } + + // if no iterators, skip this field + if len(itrs) == 0 { + continue + } + + var prevTerm []byte + + newRoaring.Clear() + + finishTerm := func(term []byte) error { + postingsOffset, err := writeSynonyms(newRoaring, w, bufMaxVarintLen64) + if err != nil { + return err + } + if postingsOffset > 0 { + err = newVellum.Insert(term, postingsOffset) + if err != nil { + return err + } + } + newRoaring.Clear() + return nil + } + + enumerator, err := newEnumerator(itrs) + + for err == nil { + term, itrI, postingsOffset := enumerator.Current() + + if prevTerm != nil && !bytes.Equal(prevTerm, term) { + // check for the closure in meantime + if isClosed(closeCh) { + return nil, nil, seg.ErrClosed + } + + // if the term changed, write out the info collected + // for the previous term + err = finishTerm(prevTerm) + if err != nil { + return nil, nil, err + } + } + + synonyms, err = thesauri[itrI].synonymsListFromOffset( + postingsOffset, drops[itrI], synonyms) + if err != nil { + return nil, nil, err + } + synItr = synonyms.iterator(synItr) + + var next seg.Synonym + next, err = synItr.Next() + for next != nil && err == nil { + synNewDocNum := newDocNums[itrI][next.Number()] + if synNewDocNum == docDropped { + return nil, nil, fmt.Errorf("see hit with dropped docNum") + } + nextTerm := next.Term() + var synNewID uint32 + if synID, ok := termSynMap[nextTerm]; ok { + synNewID = synID + } else { + synNewID = newSynonymID + termSynMap[nextTerm] = newSynonymID + synTermMap[newSynonymID] = nextTerm + newSynonymID++ + } + synNewCode := encodeSynonym(synNewID, uint32(synNewDocNum)) + newRoaring.Add(synNewCode) + next, err = synItr.Next() + } + if err != nil { + return nil, nil, err + } + + prevTerm = prevTerm[:0] // copy to prevTerm in case Next() reuses term mem + prevTerm = append(prevTerm, term...) 
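+			// An illustrative trace of the remapping above (example values,
+			// not from this patch): if segment 0 stored code {synID=3|docNum=7}
+			// for this term, and newDocNums[0][7] == 42, then the synonym's
+			// term (resolved via segment 0's synIDTermMap) gets a fresh merged
+			// ID from termSynMap, say 5, and encodeSynonym(5, 42) is added to
+			// newRoaring, which finishTerm persists for the merged segment.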
+ err = enumerator.Next() + } + if err != vellum.ErrIteratorDone { + return nil, nil, err + } + // close the enumerator to free the underlying iterators + err = enumerator.Close() + if err != nil { + return nil, nil, err + } + + if prevTerm != nil { + err = finishTerm(prevTerm) + if err != nil { + return nil, nil, err + } + } + + err = newVellum.Close() + if err != nil { + return nil, nil, err + } + vellumData := vellumBuf.Bytes() + + thesOffset := uint64(w.Count()) + + // write out the length of the vellum data + n := binary.PutUvarint(bufMaxVarintLen64, uint64(len(vellumData))) + _, err = w.Write(bufMaxVarintLen64[:n]) + if err != nil { + return nil, nil, err + } + + // write this vellum to disk + _, err = w.Write(vellumData) + if err != nil { + return nil, nil, err + } + + // reset vellum buffer and vellum builder + vellumBuf.Reset() + err = newVellum.Reset(&vellumBuf) + if err != nil { + return nil, nil, err + } + + // write out the synTermMap for this thesaurus + err = writeSynTermMap(synTermMap, w, bufMaxVarintLen64) + if err != nil { + return nil, nil, err + } + + thesStart := w.Count() + + // the synonym index section does not have any doc value data + // so we write two special entries to indicate that + // the field is not uninverted and the thesaurus offset + n = binary.PutUvarint(bufMaxVarintLen64, fieldNotUninverted) + _, err = w.Write(bufMaxVarintLen64[:n]) + if err != nil { + return nil, nil, err + } + + n = binary.PutUvarint(bufMaxVarintLen64, fieldNotUninverted) + _, err = w.Write(bufMaxVarintLen64[:n]) + if err != nil { + return nil, nil, err + } + + // write out the thesaurus offset from which the vellum data starts + n = binary.PutUvarint(bufMaxVarintLen64, thesOffset) + _, err = w.Write(bufMaxVarintLen64[:n]) + if err != nil { + return nil, nil, err + } + + // if we have a new thesaurus, add it to the thesaurus map + fieldIDtoThesaurusID[uint16(fieldID)] = thesaurusID + thesaurusAddrs[thesaurusID] = thesStart + thesaurusID++ + } + + return thesaurusAddrs, fieldIDtoThesaurusID, nil +} diff --git a/segment.go b/segment.go index 8780ead1..41abde25 100644 --- a/segment.go +++ b/segment.go @@ -56,6 +56,7 @@ func (*ZapPlugin) Open(path string) (segment.Segment, error) { fieldsMap: make(map[string]uint16), fieldFSTs: make(map[uint16]*vellum.FST), vecIndexCache: newVectorIndexCache(), + synIndexCache: newSynonymIndexCache(), fieldDvReaders: make([]map[uint16]*docValueReader, len(segmentSections)), }, f: f, @@ -113,6 +114,7 @@ type SegmentBase struct { // this cache comes into play when vectors are supported in builds. vecIndexCache *vectorIndexCache + synIndexCache *synonymIndexCache } func (sb *SegmentBase) Size() int { @@ -149,7 +151,11 @@ func (sb *SegmentBase) updateSize() { func (sb *SegmentBase) AddRef() {} func (sb *SegmentBase) DecRef() (err error) { return nil } -func (sb *SegmentBase) Close() (err error) { sb.vecIndexCache.Clear(); return nil } +func (sb *SegmentBase) Close() (err error) { + sb.vecIndexCache.Clear() + sb.synIndexCache.Clear() + return nil +} // Segment implements a persisted segment.Segment interface, by // embedding an mmap()'ed SegmentBase. @@ -472,6 +478,48 @@ func (sb *SegmentBase) dictionary(field string) (rv *Dictionary, err error) { return rv, nil } +// Thesaurus returns the thesaurus with the specified name, or an empty thesaurus if not found. 
+func (s *SegmentBase) Thesaurus(name string) (segment.Thesaurus, error) { + thesaurus, err := s.thesaurus(name) + if err == nil && thesaurus == nil { + return emptyThesaurus, nil + } + return thesaurus, err +} + +func (sb *SegmentBase) thesaurus(name string) (rv *Thesaurus, err error) { + fieldIDPlus1 := sb.fieldsMap[name] + if fieldIDPlus1 == 0 { + return nil, nil + } + pos := sb.fieldsSectionsMap[fieldIDPlus1-1][SectionSynonymIndex] + if pos > 0 { + rv = &Thesaurus{ + sb: sb, + name: name, + fieldID: fieldIDPlus1 - 1, + } + // skip the doc value offsets as doc values are not supported in thesaurus + for i := 0; i < 2; i++ { + _, n := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) + pos += uint64(n) + } + thesLoc, n := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64]) + pos += uint64(n) + fst, synTermMap, err := sb.synIndexCache.loadOrCreate(rv.fieldID, sb.mem[thesLoc:]) + if err != nil { + return nil, fmt.Errorf("thesaurus name %s err: %v", name, err) + } + rv.fst = fst + rv.synIDTermMap = synTermMap + rv.fstReader, err = rv.fst.Reader() + if err != nil { + return nil, fmt.Errorf("thesaurus name %s vellum reader err: %v", name, err) + } + } + return rv, nil +} + // visitDocumentCtx holds data structures that are reusable across // multiple VisitDocument() calls to avoid memory allocations type visitDocumentCtx struct { @@ -648,8 +696,9 @@ func (s *Segment) Close() (err error) { } func (s *Segment) closeActual() (err error) { - // clear contents from the vector index cache before un-mmapping + // clear contents from the vector and synonym index cache before un-mmapping s.vecIndexCache.Clear() + s.synIndexCache.Clear() if s.mm != nil { err = s.mm.Unmap() @@ -719,6 +768,25 @@ func (s *Segment) DictAddr(field string) (uint64, error) { return s.dictLocs[fieldIDPlus1-1], nil } +// ThesaurusAddr is a helper function to compute the file offset where the +// thesaurus is stored with the specified name. +func (s *Segment) ThesaurusAddr(name string) (uint64, error) { + fieldIDPlus1, ok := s.fieldsMap[name] + if !ok { + return 0, fmt.Errorf("no such thesaurus '%s'", name) + } + thesaurusStart := s.fieldsSectionsMap[fieldIDPlus1-1][SectionSynonymIndex] + if thesaurusStart == 0 { + return 0, fmt.Errorf("no such thesaurus '%s'", name) + } + for i := 0; i < 2; i++ { + _, n := binary.Uvarint(s.mem[thesaurusStart : thesaurusStart+binary.MaxVarintLen64]) + thesaurusStart += uint64(n) + } + thesLoc, _ := binary.Uvarint(s.mem[thesaurusStart : thesaurusStart+binary.MaxVarintLen64]) + return thesLoc, nil +} + func (s *Segment) getSectionDvOffsets(fieldID int, secID uint16) (uint64, uint64, uint64, error) { // Version is gonna be 16 var fieldLocStart uint64 = fieldNotUninverted diff --git a/synonym_cache.go b/synonym_cache.go new file mode 100644 index 00000000..0b8d56c2 --- /dev/null +++ b/synonym_cache.go @@ -0,0 +1,126 @@ +// Copyright (c) 2024 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
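+
+// For orientation, a sketch of the per-thesaurus on-disk layout that this
+// cache parses, as serialized by writeThesauri and
+// mergeAndPersistSynonymSection (an illustration derived from those writers,
+// not a normative format spec):
+//
+//	per LHS term:     uvarint postings len | roaring64 bitmap bytes
+//	thesLoc:          uvarint vellum len   | vellum FST bytes
+//	                                         (term -> postings offset)
+//	                  uvarint numSyns, then numSyns entries of
+//	                  { uvarint synID | uvarint termLen | term bytes }
+//	section address:  uvarint fieldNotUninverted (twice, no doc values)
+//	                  uvarint thesLoc
+//
+// SegmentBase.thesaurus skips the two doc value entries at the section
+// address, reads thesLoc, and hands mem[thesLoc:] to loadOrCreate below.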
+
+package zap
+
+import (
+	"encoding/binary"
+	"fmt"
+	"sync"
+
+	"github.com/blevesearch/vellum"
+)
+
+func newSynonymIndexCache() *synonymIndexCache {
+	return &synonymIndexCache{
+		cache: make(map[uint16]*synonymCacheEntry),
+	}
+}
+
+type synonymIndexCache struct {
+	m sync.RWMutex
+
+	cache map[uint16]*synonymCacheEntry
+}
+
+// Clear clears the synonym cache, which means that the termID to term map will no longer be available.
+func (sc *synonymIndexCache) Clear() {
+	sc.m.Lock()
+	sc.cache = nil
+	sc.m.Unlock()
+}
+
+// loadOrCreate loads the synonym index cache for the specified fieldID if it is already present,
+// or creates it if not. The synonym index cache for a fieldID consists of a tuple:
+//   - A Vellum FST (Finite State Transducer) representing the thesaurus.
+//   - A map associating synonym IDs to their corresponding terms.
+//
+// This function returns the loaded or newly created tuple (FST and map).
+func (sc *synonymIndexCache) loadOrCreate(fieldID uint16, mem []byte) (*vellum.FST, map[uint32][]byte, error) {
+	sc.m.RLock()
+	entry, ok := sc.cache[fieldID]
+	if ok {
+		sc.m.RUnlock()
+		return entry.load()
+	}
+
+	sc.m.RUnlock()
+
+	sc.m.Lock()
+	defer sc.m.Unlock()
+
+	entry, ok = sc.cache[fieldID]
+	if ok {
+		return entry.load()
+	}
+
+	return sc.createAndCacheLOCKED(fieldID, mem)
+}
+
+// createAndCacheLOCKED creates the synonym index cache for the specified fieldID and caches it.
+func (sc *synonymIndexCache) createAndCacheLOCKED(fieldID uint16, mem []byte) (*vellum.FST, map[uint32][]byte, error) {
+	var pos uint64
+	vellumLen, read := binary.Uvarint(mem[pos : pos+binary.MaxVarintLen64])
+	if vellumLen == 0 || read <= 0 {
+		return nil, nil, fmt.Errorf("vellum length is 0")
+	}
+	pos += uint64(read)
+	fstBytes := mem[pos : pos+vellumLen]
+	fst, err := vellum.Load(fstBytes)
+	if err != nil {
+		return nil, nil, fmt.Errorf("vellum err: %v", err)
+	}
+	pos += vellumLen
+	numSyns, n := binary.Uvarint(mem[pos : pos+binary.MaxVarintLen64])
+	pos += uint64(n)
+	if numSyns == 0 {
+		return nil, nil, fmt.Errorf("no synonyms found")
+	}
+	synTermMap := make(map[uint32][]byte, numSyns)
+	for i := 0; i < int(numSyns); i++ {
+		synID, n := binary.Uvarint(mem[pos : pos+binary.MaxVarintLen64])
+		pos += uint64(n)
+		termLen, n := binary.Uvarint(mem[pos : pos+binary.MaxVarintLen64])
+		pos += uint64(n)
+		if termLen == 0 {
+			return nil, nil, fmt.Errorf("term length is 0")
+		}
+		term := mem[pos : pos+uint64(termLen)]
+		pos += uint64(termLen)
+		synTermMap[uint32(synID)] = term
+	}
+	sc.insertLOCKED(fieldID, fst, synTermMap)
+	return fst, synTermMap, nil
+}
+
+// insertLOCKED inserts the vellum FST and the map of synonymID to term into the cache for the specified fieldID.
+func (sc *synonymIndexCache) insertLOCKED(fieldID uint16, fst *vellum.FST, synTermMap map[uint32][]byte) {
+	_, ok := sc.cache[fieldID]
+	if !ok {
+		sc.cache[fieldID] = &synonymCacheEntry{
+			fst:        fst,
+			synTermMap: synTermMap,
+		}
+	}
+}
+
+// synonymCacheEntry is a tuple of the vellum FST and the map of synonymID to term,
+// and is the value stored in the synonym cache, for a given fieldID.
+type synonymCacheEntry struct {
+	fst        *vellum.FST
+	synTermMap map[uint32][]byte
+}
+
+func (ce *synonymCacheEntry) load() (*vellum.FST, map[uint32][]byte, error) {
+	return ce.fst, ce.synTermMap, nil
+}
diff --git a/synonym_posting.go b/synonym_posting.go
new file mode 100644
index 00000000..a9e5755b
--- /dev/null
+++ b/synonym_posting.go
@@ -0,0 +1,239 @@
+// Copyright (c) 2024 Couchbase, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package zap + +import ( + "bytes" + "encoding/binary" + "fmt" + "reflect" + + "github.com/RoaringBitmap/roaring" + "github.com/RoaringBitmap/roaring/roaring64" + segment "github.com/blevesearch/scorch_segment_api/v2" +) + +var reflectStaticSizeSynonymsList int +var reflectStaticSizeSynonymsIterator int +var reflectStaticSizeSynonym int + +func init() { + var sl SynonymsList + reflectStaticSizeSynonymsList = int(reflect.TypeOf(sl).Size()) + var si SynonymsIterator + reflectStaticSizeSynonymsIterator = int(reflect.TypeOf(si).Size()) + var s Synonym + reflectStaticSizeSynonym = int(reflect.TypeOf(s).Size()) +} + +// SynonymsList represents a list of synonyms for a term, stored in a Roaring64 bitmap. +type SynonymsList struct { + sb *SegmentBase + synonymsOffset uint64 + synonyms *roaring64.Bitmap + except *roaring.Bitmap + + synIDTermMap map[uint32][]byte + + buffer *bytes.Reader +} + +// immutable, empty synonyms list +var emptySynonymsList = &SynonymsList{} + +func (p *SynonymsList) Size() int { + sizeInBytes := reflectStaticSizeSynonymsList + SizeOfPtr + + if p.except != nil { + sizeInBytes += int(p.except.GetSizeInBytes()) + } + + return sizeInBytes +} + +// Iterator creates and returns a SynonymsIterator for the SynonymsList. +// If the synonyms bitmap is nil, it returns an empty iterator. +func (s *SynonymsList) Iterator(prealloc segment.SynonymsIterator) segment.SynonymsIterator { + if s.synonyms == nil { + return emptySynonymsIterator + } + + var preallocSI *SynonymsIterator + pi, ok := prealloc.(*SynonymsIterator) + if ok && pi != nil { + preallocSI = pi + } + if preallocSI == emptySynonymsIterator { + preallocSI = nil + } + + return s.iterator(preallocSI) +} + +// iterator initializes a SynonymsIterator for the SynonymsList and returns it. +// If a preallocated iterator is provided, it resets and reuses it; otherwise, it creates a new one. +func (s *SynonymsList) iterator(rv *SynonymsIterator) *SynonymsIterator { + if rv == nil { + rv = &SynonymsIterator{} + } else { + *rv = SynonymsIterator{} // clear the struct + } + rv.synonyms = s + rv.except = s.except + rv.Actual = s.synonyms.Iterator() + rv.ActualBM = s.synonyms + rv.synIDTermMap = s.synIDTermMap + return rv +} + +// read initializes a SynonymsList by reading data from the given synonymsOffset in the Thesaurus. +// It reads and parses the Roaring64 bitmap that represents the synonyms. 
+func (rv *SynonymsList) read(synonymsOffset uint64, t *Thesaurus) error { + rv.synonymsOffset = synonymsOffset + + var n uint64 + var read int + + var synonymsLen uint64 + synonymsLen, read = binary.Uvarint(t.sb.mem[synonymsOffset+n : synonymsOffset+n+binary.MaxVarintLen64]) + n += uint64(read) + + roaringBytes := t.sb.mem[synonymsOffset+n : synonymsOffset+n+synonymsLen] + + if rv.synonyms == nil { + rv.synonyms = roaring64.NewBitmap() + } + + rv.buffer.Reset(roaringBytes) + + _, err := rv.synonyms.ReadFrom(rv.buffer) + if err != nil { + return fmt.Errorf("error loading roaring bitmap: %v", err) + } + + return nil +} + +// ----------------------------------------------------------------------------- + +// SynonymsIterator provides a way to iterate through the synonyms list. +type SynonymsIterator struct { + synonyms *SynonymsList + except *roaring.Bitmap + + Actual roaring64.IntPeekable64 + ActualBM *roaring64.Bitmap + + synIDTermMap map[uint32][]byte + nextSyn Synonym +} + +// immutable, empty synonyms iterator +var emptySynonymsIterator = &SynonymsIterator{} + +func (i *SynonymsIterator) Size() int { + sizeInBytes := reflectStaticSizeSynonymsIterator + SizeOfPtr + + i.nextSyn.Size() + + return sizeInBytes +} + +// Next returns the next Synonym in the iteration or an error if the end is reached. +func (i *SynonymsIterator) Next() (segment.Synonym, error) { + return i.next() +} + +// next retrieves the next synonym from the iterator, populates the nextSyn field, +// and returns it. If no valid synonym is found, it returns an error. +func (i *SynonymsIterator) next() (segment.Synonym, error) { + synID, docNum, exists, err := i.nextSynonym() + if err != nil || !exists { + return nil, err + } + + if i.synIDTermMap == nil { + return nil, fmt.Errorf("synIDTermMap is nil") + } + + // If the synonymID is not found in the map, return an error + term, exists := i.synIDTermMap[synID] + if !exists { + return nil, fmt.Errorf("synonymID %d not found in map", synID) + } + + i.nextSyn = Synonym{} // clear the struct + rv := &i.nextSyn + rv.term = string(term) + rv.docNum = docNum + + return rv, nil +} + +// nextSynonym decodes the next synonym from the roaring bitmap iterator, +// ensuring it is not in the "except" set. Returns the synonymID, docNum, +// and a flag indicating success. +func (i *SynonymsIterator) nextSynonym() (uint32, uint32, bool, error) { + // If no synonyms are available, return early + if i.Actual == nil || i.synonyms == nil || i.synonyms == emptySynonymsList { + return 0, 0, false, nil + } + + var code uint64 + var docNum uint32 + var synID uint32 + + // Loop to find the next valid docNum, checking against the except + for i.Actual.HasNext() { + code = i.Actual.Next() + synID, docNum = decodeSynonym(code) + + // If docNum is not in the 'except' set, it's a valid result + if i.except == nil || !i.except.Contains(docNum) { + return synID, docNum, true, nil + } + } + + // If no valid docNum is found, return false + return 0, 0, false, nil +} + +// Synonym represents a single synonym, containing the term, synonymID, and document number. +type Synonym struct { + term string + docNum uint32 +} + +// Size returns the memory size of the Synonym, including the length of the term string. +func (p *Synonym) Size() int { + sizeInBytes := reflectStaticSizeSynonym + SizeOfPtr + + len(p.term) + + return sizeInBytes +} + +// Term returns the term of the Synonym. +func (s *Synonym) Term() string { + return s.term +} + +// Number returns the document number of the Synonym. 
+func (s *Synonym) Number() uint32 {
+	return s.docNum
+}
+
+// decodeSynonym decodes a synonymCode into its synonymID and document ID components.
+func decodeSynonym(synonymCode uint64) (synonymID uint32, docID uint32) {
+	return uint32(synonymCode >> 32), uint32(synonymCode)
+}
diff --git a/thesaurus.go b/thesaurus.go
new file mode 100644
index 00000000..f4f0d002
--- /dev/null
+++ b/thesaurus.go
@@ -0,0 +1,159 @@
+// Copyright (c) 2024 Couchbase, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package zap
+
+import (
+	"bytes"
+	"fmt"
+
+	"github.com/RoaringBitmap/roaring"
+	index "github.com/blevesearch/bleve_index_api"
+	segment "github.com/blevesearch/scorch_segment_api/v2"
+	"github.com/blevesearch/vellum"
+)
+
+// Thesaurus is the zap representation of a Thesaurus
+type Thesaurus struct {
+	sb           *SegmentBase
+	name         string
+	fieldID      uint16
+	synIDTermMap map[uint32][]byte
+	fst          *vellum.FST
+
+	fstReader *vellum.Reader
+}
+
+// represents an immutable, empty Thesaurus
+var emptyThesaurus = &Thesaurus{}
+
+// SynonymsList returns the synonyms list for the specified term
+func (t *Thesaurus) SynonymsList(term []byte, except *roaring.Bitmap, prealloc segment.SynonymsList) (segment.SynonymsList, error) {
+	var preallocSL *SynonymsList
+	sl, ok := prealloc.(*SynonymsList)
+	if ok && sl != nil {
+		preallocSL = sl
+	}
+	return t.synonymsList(term, except, preallocSL)
+}
+
+func (t *Thesaurus) synonymsList(term []byte, except *roaring.Bitmap, rv *SynonymsList) (*SynonymsList, error) {
+	if t.fstReader == nil {
+		if rv == nil || rv == emptySynonymsList {
+			return emptySynonymsList, nil
+		}
+		return t.synonymsListInit(rv, except), nil
+	}
+
+	synonymsOffset, exists, err := t.fstReader.Get(term)
+
+	if err != nil {
+		return nil, fmt.Errorf("vellum err: %v", err)
+	}
+	if !exists {
+		if rv == nil || rv == emptySynonymsList {
+			return emptySynonymsList, nil
+		}
+		return t.synonymsListInit(rv, except), nil
+	}
+
+	return t.synonymsListFromOffset(synonymsOffset, except, rv)
+}
+
+func (t *Thesaurus) synonymsListFromOffset(synonymsOffset uint64, except *roaring.Bitmap, rv *SynonymsList) (*SynonymsList, error) {
+	rv = t.synonymsListInit(rv, except)
+
+	err := rv.read(synonymsOffset, t)
+	if err != nil {
+		return nil, err
+	}
+
+	return rv, nil
+}
+
+func (t *Thesaurus) synonymsListInit(rv *SynonymsList, except *roaring.Bitmap) *SynonymsList {
+	if rv == nil || rv == emptySynonymsList {
+		rv = &SynonymsList{}
+		rv.buffer = bytes.NewReader(nil)
+	} else {
+		synonyms := rv.synonyms
+		buf := rv.buffer
+		if synonyms != nil {
+			synonyms.Clear()
+		}
+		if buf != nil {
+			buf.Reset(nil)
+		}
+
+		*rv = SynonymsList{} // clear the struct
+
+		rv.synonyms = synonyms
+		rv.buffer = buf
+	}
+	rv.sb = t.sb
+	rv.except = except
+	rv.synIDTermMap = t.synIDTermMap
+	return rv
+}
+
+func (t *Thesaurus) Contains(key []byte) (bool, error) {
+	if t.fst != nil {
+		return t.fst.Contains(key)
+	}
+	return false, nil
+}
+
+// AutomatonIterator returns an iterator which only visits terms
+// matching the vellum 
automaton, within the start/end key range
+func (t *Thesaurus) AutomatonIterator(a segment.Automaton,
+	startKeyInclusive, endKeyExclusive []byte) segment.ThesaurusIterator {
+	if t.fst != nil {
+		rv := &ThesaurusIterator{
+			t: t,
+		}
+
+		itr, err := t.fst.Search(a, startKeyInclusive, endKeyExclusive)
+		if err == nil {
+			rv.itr = itr
+		} else if err != vellum.ErrIteratorDone {
+			rv.err = err
+		}
+
+		return rv
+	}
+	return emptyThesaurusIterator
+}
+
+// immutable, empty thesaurus iterator
+var emptyThesaurusIterator = &ThesaurusIterator{}
+
+// ThesaurusIterator is an iterator over the terms in a thesaurus
+type ThesaurusIterator struct {
+	t     *Thesaurus
+	itr   vellum.Iterator
+	err   error
+	entry index.ThesaurusEntry
+}
+
+// Next returns the next entry in the thesaurus, or nil once the iterator is exhausted
+func (i *ThesaurusIterator) Next() (*index.ThesaurusEntry, error) {
+	if i.err != nil && i.err != vellum.ErrIteratorDone {
+		return nil, i.err
+	} else if i.itr == nil || i.err == vellum.ErrIteratorDone {
+		return nil, nil
+	}
+	term, _ := i.itr.Current()
+	i.entry.Term = string(term)
+	i.err = i.itr.Next()
+	return &i.entry, nil
+}
diff --git a/thesaurus_test.go b/thesaurus_test.go
new file mode 100644
index 00000000..8f205e75
--- /dev/null
+++ b/thesaurus_test.go
@@ -0,0 +1,375 @@
+// Copyright (c) 2024 Couchbase, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// 		http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package zap
+
+import (
+	"errors"
+	"os"
+	"sort"
+	"strconv"
+	"testing"
+
+	"github.com/RoaringBitmap/roaring"
+	index "github.com/blevesearch/bleve_index_api"
+	segment "github.com/blevesearch/scorch_segment_api/v2"
+)
+
+func buildTestSynonymDocument(id string, synonymSource string, terms []string, synonyms []string) index.Document {
+	// Create the synonym document using stubs.
+	stubAnalyzer := "standard"
+	// Create the synonym field.
+	synField := newStubSynonymField(synonymSource, stubAnalyzer, terms, synonyms)
+	// Analyze the synonyms.
+	synField.Analyze()
+	// Create the synonym document.
+	synDoc := newStubSynonymDocument(id, synField)
+	synDoc.AddIDField()
+	return synDoc
+}
+
+func buildTestSegmentForThesaurus(results []index.Document) (*SegmentBase, error) {
+	seg, _, err := zapPlugin.newWithChunkMode(results, DefaultChunkMode)
+	return seg.(*SegmentBase), err
+}
+
+// extractSynonymsForTermFromThesaurus looks up term in the thesaurus and
+// returns the terms of all synonyms found, after applying the except bitmap.
+func extractSynonymsForTermFromThesaurus(thes segment.Thesaurus, term string, except *roaring.Bitmap) ([]string, error) {
+	list, err := thes.SynonymsList([]byte(term), except, nil)
+	if err != nil {
+		return nil, err
+	}
+	if list == nil {
+		return nil, errors.New("expected synonyms list")
+	}
+	listItr := list.Iterator(nil)
+	if listItr == nil {
+		return nil, errors.New("expected non-nil iterator")
+	}
+	var synonyms []string
+	for {
+		next, err := listItr.Next()
+		if err != nil {
+			return nil, err
+		}
+		if next == nil {
+			break
+		}
+		synonyms = append(synonyms, next.Term())
+	}
+	return synonyms, nil
+}
+
+// checkWithDeletes verifies that, with the docs in except excluded, the
+// thesaurus for collectionName yields exactly the synonyms in testSynonymMap.
+func checkWithDeletes(except *roaring.Bitmap, collectionName string, testSynonymMap map[string][]string, seg segment.Segment) error {
+	dict, err := seg.Dictionary(collectionName)
+	if err != nil {
+		return err
+	}
+	// the synonym collection is not a regular field, so any dictionary for
+	// it must yield empty postings
+	if dict != emptyDictionary {
+		pl, err := dict.PostingsList([]byte{'a'}, nil, nil)
+		if err != nil {
+			return err
+		}
+		if pl != emptyPostingsList {
+			return errors.New("expected empty postings list")
+		}
+	}
+	synSeg, ok := seg.(segment.ThesaurusSegment)
+	if !ok {
+		return errors.New("expected synonym segment")
+	}
+	thes, err := synSeg.Thesaurus(collectionName)
+	if err != nil {
+		return err
+	}
+	if thes == emptyThesaurus {
+		return errors.New("expected a thesaurus")
+	}
+	for term, expectedSynonyms := range testSynonymMap {
+		synonyms, err := extractSynonymsForTermFromThesaurus(thes, term, except)
+		if err != nil {
+			return err
+		}
+		if len(synonyms) != len(expectedSynonyms) {
+			return errors.New("unexpected number of synonyms, expected: " +
+				strconv.Itoa(len(expectedSynonyms)) + " got: " +
+				strconv.Itoa(len(synonyms)) + " for term: " + term)
+		}
+		sort.Strings(synonyms)
+		sort.Strings(expectedSynonyms)
+		for i, synonym := range synonyms {
+			if synonym != expectedSynonyms[i] {
+				return errors.New("unexpected synonym " + synonym + " for term: " + term)
+			}
+		}
+	}
+	return nil
+}
+
+func testSegmentSynonymAccuracy(testSynonymDefinitions []testSynonymDefinition, seg segment.Segment) error {
+	collSynMap := make(map[string][]testSynonymDefinition)
+	for _, testSynonymDefinition := range testSynonymDefinitions {
+		collSynMap[testSynonymDefinition.collectionName] = append(collSynMap[testSynonymDefinition.collectionName], testSynonymDefinition)
+	}
+	for collectionName, testSynonymMap := range collSynMap {
+		expectedSynonymMap := createExpectedSynonymMap(testSynonymMap)
+		err := checkWithDeletes(nil, collectionName, expectedSynonymMap, seg)
+		if err != nil {
+			return err
+		}
+		for i := 0; i < len(testSynonymMap); i++ {
+			except := roaring.New()
+			except.Add(uint32(i))
+			modifiedSynonymMap := append([]testSynonymDefinition{}, testSynonymMap[:i]...)
+			modifiedSynonymMap = append(modifiedSynonymMap, testSynonymMap[i+1:]...)
+			expectedSynonymMap = createExpectedSynonymMap(modifiedSynonymMap)
+			err = checkWithDeletes(except, collectionName, expectedSynonymMap, seg)
+			if err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+type testSynonymDefinition struct {
+	collectionName string
+	terms          []string
+	synonyms       []string
+}
+
+// createEquivalentSynonymMap maps each term in input to every other term in
+// input, modeling an equivalence class of synonyms.
+func createEquivalentSynonymMap(input []string, resultMap map[string][]string) map[string][]string {
+	if resultMap == nil {
+		resultMap = make(map[string][]string)
+	}
+	for _, elem := range input {
+		for _, otherElem := range input {
+			if elem != otherElem {
+				resultMap[elem] = append(resultMap[elem], otherElem)
+			}
+		}
+	}
+	return resultMap
+}
+
+func createExpectedSynonymMap(input []testSynonymDefinition) map[string][]string {
+	rv := make(map[string][]string)
+	for _, testSynonymDefinition := range input {
+		if testSynonymDefinition.terms == nil {
+			rv = createEquivalentSynonymMap(testSynonymDefinition.synonyms, rv)
+		} else {
+			for _, term := range testSynonymDefinition.terms {
+				rv[term] = append(rv[term], testSynonymDefinition.synonyms...)
+			}
+		}
+	}
+	return rv
+}
+
+func buildSegment(testSynonymDefinitions []testSynonymDefinition) (segment.Segment, string, error) {
+	var testSynonymDocuments []index.Document
+	for i, testSynonymDefinition := range testSynonymDefinitions {
+		testSynonymDocuments = append(testSynonymDocuments, buildTestSynonymDocument(
+			strconv.Itoa(i),
+			testSynonymDefinition.collectionName,
+			testSynonymDefinition.terms,
+			testSynonymDefinition.synonyms,
+		))
+	}
+	sb, err := buildTestSegmentForThesaurus(testSynonymDocuments)
+	if err != nil {
+		return nil, "", err
+	}
+	tmpDir, err := os.MkdirTemp("", "zap-")
+	if err != nil {
+		return nil, "", err
+	}
+	// remove the directory so that PersistSegmentBase can create it
+	err = os.RemoveAll(tmpDir)
+	if err != nil {
+		return nil, "", err
+	}
+	err = PersistSegmentBase(sb, tmpDir)
+	if err != nil {
+		return nil, "", err
+	}
+	seg, err := zapPlugin.Open(tmpDir)
+	if err != nil {
+		return nil, "", err
+	}
+	err = testSegmentSynonymAccuracy(testSynonymDefinitions, seg)
+	if err != nil {
+		return nil, "", err
+	}
+	return seg, tmpDir, nil
+}
+
+func mergeSegments(segs []segment.Segment, drops []*roaring.Bitmap, testSynonymDefinitions []testSynonymDefinition) (string, error) {
+	tmpDir, err := os.MkdirTemp("", "mergedzap-")
+	if err != nil {
+		return "", err
+	}
+	err = os.RemoveAll(tmpDir)
+	if err != nil {
+		return "", err
+	}
+	// Test merging of multiple segments.
+	_, _, err = zapPlugin.Merge(segs, drops, tmpDir, nil, nil)
+	if err != nil {
+		return "", err
+	}
+
+	seg, err := zapPlugin.Open(tmpDir)
+	if err != nil {
+		return "", err
+	}
+	err = testSegmentSynonymAccuracy(testSynonymDefinitions, seg)
+	if err != nil {
+		return "", err
+	}
+	cerr := seg.Close()
+	if cerr != nil {
+		return "", cerr
+	}
+	return tmpDir, nil
+}
+
+func TestSynonymSegment(t *testing.T) {
+	synonymSourceOneName := "coll0"
+	synonymSourceTwoName := "coll1"
+	testSynonymDefinitions := []testSynonymDefinition{
+		{
+			collectionName: synonymSourceOneName,
+			terms:          nil,
+			synonyms: []string{
+				"adeptness",
+				"aptitude",
+				"facility",
+				"faculty",
+				"capacity",
+				"power",
+				"knack",
+				"proficiency",
+				"ability",
+			},
+		},
+		{
+			collectionName: synonymSourceOneName,
+			terms:          []string{"afflict"},
+			synonyms: []string{
+				"affect",
+				"bother",
+				"distress",
+				"oppress",
+				"trouble",
+				"torment",
+			},
+		},
+		{
+			collectionName: synonymSourceOneName,
+			terms:          []string{"capacity"},
+			synonyms: []string{
+				"volume",
+				"content",
+				"size",
+				"dimensions",
+				"measure",
+			},
+		},
+		{
+			collectionName: 
synonymSourceTwoName,
+			synonyms: []string{
+				"absolutely",
+				"unqualifiedly",
+				"unconditionally",
+				"unreservedly",
+				"unexceptionally",
+				"unequivocally",
+			},
+		},
+		{
+			collectionName: synonymSourceTwoName,
+			terms:          []string{"abrupt"},
+			synonyms: []string{
+				"sudden",
+				"hasty",
+				"quick",
+				"precipitate",
+				"snappy",
+			},
+		},
+	}
+	// single segment test
+	seg1, dir, err := buildSegment(testSynonymDefinitions)
+	if err != nil {
+		t.Fatalf("error building segment: %v", err)
+	}
+	defer func() {
+		cerr := seg1.Close()
+		if cerr != nil {
+			t.Fatalf("error closing seg: %v", cerr)
+		}
+		err := os.RemoveAll(dir)
+		if err != nil {
+			t.Fatalf("error removing dir: %v", err)
+		}
+	}()
+
+	// multiple segment test
+	numSegs := 3
+	segData := make([][]testSynonymDefinition, numSegs)
+	segData[0] = testSynonymDefinitions[:2]  // 2 docs
+	segData[1] = testSynonymDefinitions[2:4] // 2 docs
+	segData[2] = testSynonymDefinitions[4:]  // 1 doc
+
+	segs := make([]segment.Segment, numSegs)
+	dirs := make([]string, numSegs)
+	for i, data := range segData {
+		seg, dir, err := buildSegment(data)
+		if err != nil {
+			t.Fatalf("error building segment: %v", err)
+		}
+		segs[i] = seg
+		dirs[i] = dir
+	}
+	// one (empty) drops bitmap per segment being merged
+	drops := make([]*roaring.Bitmap, numSegs)
+	for i := 0; i < numSegs; i++ {
+		drops[i] = roaring.New()
+	}
+	mergeDir, err := mergeSegments(segs, drops, testSynonymDefinitions)
+	if err != nil {
+		t.Fatalf("error merging segments: %v", err)
+	}
+	for i := 0; i < numSegs; i++ {
+		cerr := segs[i].Close()
+		if cerr != nil {
+			t.Fatalf("error closing seg: %v", cerr)
+		}
+		err := os.RemoveAll(dirs[i])
+		if err != nil {
+			t.Fatalf("error removing dir: %v", err)
+		}
+	}
+	err = os.RemoveAll(mergeDir)
+	if err != nil {
+		t.Fatalf("error removing dir: %v", err)
+	}
+}
diff --git a/zap.md b/zap.md
index 675ac56c..8bafac62 100644
--- a/zap.md
+++ b/zap.md
@@ -28,9 +28,9 @@
 ### Chunked data
 
-    [--------]
-    [        ]
-    [--------]
+    [--------]
+    [        ]
+    [--------]
 
 ## Overview
 
@@ -65,7 +65,6 @@ Footer section describes the configuration of particular ZAP file. The format of
 V. Version.
 CC. CRC32.
-
 ## Stored Fields
 
 Stored Fields Index is `D#` consecutive 64-bit unsigned integers - offsets, where relevant Stored Fields Data records are located.
@@ -114,10 +113,9 @@ Sections Index is a set of NF uint64 addresses (0 through F# - 1) each of which
 NS. Number of index sections
 Sn. nth index section
-
 ## Inverted Text Index Section
 
-Each fields has its own types of indexes in separate sections as indicated above. This can be a vector index or inverted text index.
+Each field has its own types of indexes in separate sections as indicated above. This can be a vector index or inverted text index.
 
 In case of inverted text index, the dictionary is encoded in [Vellum](https://github.com/couchbase/vellum) format. Dictionary consists of pairs `(term, offset)`, where `offset` indicates the position of postings (list of documents) for this particular term.
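To make the `(term, offset)` dictionary pattern concrete before the synonym-specific layout below, here is a minimal, self-contained Go sketch. It is illustrative only: the vellum calls are real library API, but the terms and offsets are invented, and this is not zap's actual dictionary writer.

package main

import (
	"bytes"
	"fmt"

	"github.com/blevesearch/vellum"
)

func main() {
	// Build a tiny dictionary mapping term -> offset. Vellum requires
	// keys to be inserted in lexicographic order.
	var buf bytes.Buffer
	b, err := vellum.New(&buf, nil)
	if err != nil {
		panic(err)
	}
	for _, pair := range []struct {
		term   string
		offset uint64
	}{{"apple", 0}, {"banana", 42}, {"cherry", 97}} {
		if err := b.Insert([]byte(pair.term), pair.offset); err != nil {
			panic(err)
		}
	}
	if err := b.Close(); err != nil {
		panic(err)
	}

	// Load the FST and resolve a term to the offset of its postings.
	fst, err := vellum.Load(buf.Bytes())
	if err != nil {
		panic(err)
	}
	offset, exists, err := fst.Get([]byte("banana"))
	if err != nil {
		panic(err)
	}
	fmt.Println(exists, offset) // true 42
}

The Synonym Index Section described next reuses the same pattern, except that the offset points at a synonyms postings list rather than a documents postings list.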
@@ -151,6 +149,8 @@ In case of inverted text index, the dictionary is encoded in [Vellum](https://gi | | | | |================================================================+- Vector Index Section | | | + | +================================================================+- Synonym Index Section + | | | | |================================================================+- Sections Info +-----------------------------+ | | | | @@ -162,22 +162,76 @@ In case of inverted text index, the dictionary is encoded in [Vellum](https://gi ITI - Inverted Text Index +## Synonym Index Section -## Doc Values +In a synonyms index, the relationship between a term and its synonyms is represented using a Thesaurus. The Thesaurus is encoded in the [Vellum](https://github.com/couchbase/vellum) format and consists of pairs in the form `(term, offset)`. Here, the offset specifies the position of the postings list containing the synonyms for the given term. The postings list is stored as a Roaring64 bitmap, with each entry representing an encoded synonym for the term. -DocValue start and end offsets are stored within the section content of each field. This allows each field having its own type of index to choose whether to store the doc values or not. For example, it may not make sense to store doc values for vector indexing and so, the offsets can be invalid ones for it whereas the fields having text indexing may have valid doc values offsets. + |================================================================+- Inverted Text Index Section + | | + |================================================================+- Vector Index Section + | | + +================================================================+- Synonym Index Section + | | + | (Offset) +~~~~~+----------+...+---+ | + | +--------->| RL | ROARING64 BITMAP | | + | | +~~~~~+----------+...+---+ +-------------------+ + | |(Term -> Offset) | + | +--------+ | + | | Term ID to Term map (NST Entries) | + | +~~~~+~~~~+~~~~~[{~~~~~+~~~~+~~~~~~}{~~~~~+~~~~+~~~~~~}...{~~~~~+~~~~+~~~~~~}] | + | +->| VL | VD | NST || TID | TL | Term || TID | TL | Term | | TID | TL | Term | | + | | +~~~~+~~~~+~~~~~[{~~~~~+~~~~+~~~~~~}{~~~~~+~~~~+~~~~~~}...{~~~~~+~~~~+~~~~~~}] | + | | | + | +----------------------------+ | + | | | + | +~~~~~~~~~~+~~~~~~~~+~~~~~~~~~~~~~~~~~+ | + +-----> DV Start | DV End | ThesaurusOffset | | + | | +~~~~~~~~~~+~~~~~~~~+~~~~~~~~~~~~~~~~~+ +-------------------+ + | | | + | | | + | |================================================================+- Sections Info + +-----------------------------+ | + | | | + | +-------+-----+-----+------+~~~~~~~~+~~~~~~~~+--+...+--+ | + | | ... 
| SI | SI ADDR | NS | Length | Name | |
+ | +-------+-----+------------+~~~~~~~~+~~~~~~~~+--+...+--+ |
+ +================================================================+
+
+ SI  - Synonym Index
+ VL  - Vellum Length
+ VD  - Vellum Data (Term -> Offset)
+ RL  - Roaring64 Length
+ NST - Number of entries in the term ID to term map
+ TID - Term ID (32-bit)
+ TL  - Term Length
+
+### Synonym Encoding
+
+ ROARING64 BITMAP
- +================================================================+
- | +------...--+ |
- | +->+ DocValues +<-+ |
- | | +------...--+ | |
- |==|=================|===========================================+- Inverted Text
- ++~+~~~~~~~~~+~~~~~~~+~~+~~~~~~~~+-----------------------...--+ | Index Section
- || DV START | DV END | LENGTH | VELLUM DATA: TERM -> OFFSET| |
- ++~~~~~~~~~~~+~~~~~~~~~~+~~~~~~~~+-----------------------...--+ |
- +================================================================+
+ Each 64-bit entry consists of two parts: the first 32 bits represent the Term ID (TID),
+ and the next 32 bits represent the Document Number (DN).
+
+ [{~~~~~+~~~~}{~~~~~+~~~~}...{~~~~~+~~~~}]
+ | TID | DN || TID | DN | | TID | DN |
+ [{~~~~~+~~~~}{~~~~~+~~~~}...{~~~~~+~~~~}]
+
+ TID - Term ID (32-bit)
+ DN  - Document Number (32-bit)
+
+## Doc Values
+
+DocValue start and end offsets are stored within the section content of each field. Since every field carries its own type of index, each field can choose whether or not to store doc values. For example, it may not make sense to store doc values for a vector index, so its offsets may be left invalid, whereas fields with a text index may record valid doc values offsets.
+
+ +================================================================+
+ | +------...--+ |
+ | +->+ DocValues +<-+ |
+ | | +------...--+ | |
+ |==|=================|===========================================+- Inverted Text
+ ++~+~~~~~~~~~+~~~~~~~+~~+~~~~~~~~+-----------------------...--+ | Index Section
+ || DV START | DV END | LENGTH | VELLUM DATA: TERM -> OFFSET| |
+ ++~~~~~~~~~~~+~~~~~~~~~~+~~~~~~~~+-----------------------...--+ |
+ +================================================================+
 
 DocValues is chunked Snappy-compressed values for each document and field.
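As a worked example of the synonym encoding described above, the following self-contained Go sketch packs a few (TID, DN) pairs into a Roaring64 bitmap, serializes it behind the uvarint length prefix (RL), and decodes it back. It mirrors only the documented layout, not zap's internal writer; the term IDs and document numbers are invented for illustration.

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"

	"github.com/RoaringBitmap/roaring/roaring64"
)

// encodeSynonym packs a term ID into the high 32 bits and a document
// number into the low 32 bits, matching the layout described above.
func encodeSynonym(termID, docNum uint32) uint64 {
	return uint64(termID)<<32 | uint64(docNum)
}

func main() {
	// Postings list for one thesaurus term: two synonyms in doc 0,
	// one in doc 3. Term IDs 1 and 2 are hypothetical.
	bm := roaring64.NewBitmap()
	bm.Add(encodeSynonym(1, 0))
	bm.Add(encodeSynonym(2, 0))
	bm.Add(encodeSynonym(2, 3))

	// Serialize as: RL (uvarint length) followed by the bitmap bytes.
	var bmBuf bytes.Buffer
	if _, err := bm.WriteTo(&bmBuf); err != nil {
		panic(err)
	}
	var out bytes.Buffer
	var lenBuf [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(lenBuf[:], uint64(bmBuf.Len()))
	out.Write(lenBuf[:n])
	out.Write(bmBuf.Bytes())

	// Decode: read RL, then the bitmap, then unpack each 64-bit code.
	data := out.Bytes()
	rl, read := binary.Uvarint(data)
	got := roaring64.NewBitmap()
	if _, err := got.ReadFrom(bytes.NewReader(data[uint64(read) : uint64(read)+rl])); err != nil {
		panic(err)
	}
	it := got.Iterator()
	for it.HasNext() {
		code := it.Next()
		fmt.Printf("termID=%d docNum=%d\n", uint32(code>>32), uint32(code))
	}
}

The unpacking in the final loop is the same split performed by the segment's decodeSynonym helper: the high 32 bits yield the term ID, the low 32 bits the document number.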