Skip to content

Commit

Permalink
remove unnecessary bytes written
Browse files Browse the repository at this point in the history
  • Loading branch information
Thejas-bhat committed Oct 25, 2024
1 parent a72794c commit 704bbef
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 24 deletions.
5 changes: 2 additions & 3 deletions contentcoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"encoding/binary"
"io"
"reflect"
"sync/atomic"

"github.com/golang/snappy"
)
Expand Down Expand Up @@ -112,11 +111,11 @@ func (c *chunkedContentCoder) Close() error {
}

func (c *chunkedContentCoder) incrementBytesWritten(val uint64) {
atomic.AddUint64(&c.bytesWritten, val)
c.bytesWritten += val
}

func (c *chunkedContentCoder) getBytesWritten() uint64 {
return atomic.LoadUint64(&c.bytesWritten)
return c.bytesWritten
}

func (c *chunkedContentCoder) flushContents() error {
Expand Down
5 changes: 2 additions & 3 deletions intcoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"bytes"
"encoding/binary"
"io"
"sync/atomic"
)

// We can safely use 0 to represent termNotEncoded since 0
Expand Down Expand Up @@ -79,11 +78,11 @@ func (c *chunkedIntCoder) SetChunkSize(chunkSize uint64, maxDocNum uint64) {
}

func (c *chunkedIntCoder) incrementBytesWritten(val uint64) {
atomic.AddUint64(&c.bytesWritten, val)
c.bytesWritten += val
}

func (c *chunkedIntCoder) getBytesWritten() uint64 {
return atomic.LoadUint64(&c.bytesWritten)
return c.bytesWritten
}

// Add encodes the provided integers into the correct chunk for the provided
Expand Down
19 changes: 17 additions & 2 deletions merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,28 @@ import (
"github.com/golang/snappy"
)

var DefaultFileMergerBufferSize = 1024 * 1024
var DefaultFileMergerBufferSize = 10 * 1024 * 1024

const docDropped = math.MaxUint64 // sentinel docNum to represent a deleted doc

// Merge takes a slice of segments and bit masks describing which
// documents may be dropped, and creates a new segment containing the
// remaining data. This new segment is built at the specified path.
func (*ZapPlugin) Merge(segments []seg.Segment, drops []*roaring.Bitmap, path string,
func (z *ZapPlugin) Merge(segments []seg.Segment, drops []*roaring.Bitmap, path string,
closeCh chan struct{}, s seg.StatsReporter) (
[][]uint64, uint64, error) {
return z.merge(segments, drops, path, closeCh, s, nil)
}

// MergeEx is the config-aware counterpart of Merge: it accepts the same
// arguments plus a config map and forwards all of them unchanged to z.merge,
// returning whatever z.merge returns.
// NOTE(review): which keys merge reads from config is not visible here —
// confirm against merge's implementation.
func (z *ZapPlugin) MergeEx(segments []seg.Segment, drops []*roaring.Bitmap, path string,
closeCh chan struct{}, s seg.StatsReporter, config map[string]interface{}) (
[][]uint64, uint64, error) {
return z.merge(segments, drops, path, closeCh, s, config)
}

func (*ZapPlugin) merge(segments []seg.Segment, drops []*roaring.Bitmap, path string,
closeCh chan struct{}, s seg.StatsReporter, config map[string]interface{}) (
[][]uint64, uint64, error) {
segmentBases := make([]*SegmentBase, len(segments))
for segmenti, segment := range segments {
switch segmentx := segment.(type) {
Expand Down Expand Up @@ -88,6 +100,7 @@ func mergeSegmentBases(segmentBases []*SegmentBase, drops []*roaring.Bitmap, pat
return nil, 0, err
}

fmt.Println("size of buffer before flush is", br.Size(), br.Buffered())
err = br.Flush()
if err != nil {
cleanup()
Expand Down Expand Up @@ -373,6 +386,8 @@ func persistMergedRest(segments []*SegmentBase, dropsIn []*roaring.Bitmap,
}
vellumData := vellumBuf.Bytes()

fmt.Println("the length of vellum data is ", len(vellumData))

// write out the length of the vellum data
n := binary.PutUvarint(bufMaxVarintLen64, uint64(len(vellumData)))
_, err = w.Write(bufMaxVarintLen64[:n])
Expand Down
12 changes: 8 additions & 4 deletions new.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,15 @@ var ValidateDocFields = func(field index.Field) error {
// New creates an in-memory zap-encoded SegmentBase from a set of Documents
func (z *ZapPlugin) New(results []index.Document) (
segment.Segment, uint64, error) {
return z.newWithChunkMode(results, DefaultChunkMode)
return z.newWithChunkMode(results, DefaultChunkMode, nil)
}

// NewEx builds a segment from results using DefaultChunkMode and passes the
// caller-supplied config map through to newWithChunkMode.
// NOTE(review): how newWithChunkMode uses config is not visible here —
// confirm which keys it expects.
func (z *ZapPlugin) NewEx(results []index.Document, config map[string]interface{}) (
segment.Segment, uint64, error) {
return z.newWithChunkMode(results, DefaultChunkMode, config)
}
func (*ZapPlugin) newWithChunkMode(results []index.Document,
chunkMode uint32) (segment.Segment, uint64, error) {
chunkMode uint32, config map[string]interface{}) (segment.Segment, uint64, error) {
s := interimPool.Get().(*interim)

var br bytes.Buffer
Expand Down Expand Up @@ -497,11 +501,11 @@ func (s *interim) processDocument(docNum uint64,
}

func (s *interim) getBytesWritten() uint64 {
return atomic.LoadUint64(&s.bytesWritten)
return s.bytesWritten
}

func (s *interim) incrementBytesWritten(val uint64) {
atomic.AddUint64(&s.bytesWritten, val)
s.bytesWritten += val
}

func (s *interim) writeStoredFields() (
Expand Down
57 changes: 45 additions & 12 deletions segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,39 @@ var reflectStaticSizeSegmentBase int
// init stores the static size of a SegmentBase value (unsafe.Sizeof) in
// reflectStaticSizeSegmentBase, then prints the byte offset of the listed
// SegmentBase fields via fmt.Println.
// NOTE(review): these prints run at package initialization for every program
// that imports this package and look like temporary struct-layout debugging
// output — confirm they are removed before this change is merged.
func init() {
var sb SegmentBase
reflectStaticSizeSegmentBase = int(unsafe.Sizeof(sb))

// Dump each field's offset within SegmentBase (layout/padding inspection).
fmt.Println("segment offset mem", unsafe.Offsetof(sb.mem))
fmt.Println("segment offset dictLocs", unsafe.Offsetof(sb.dictLocs))
fmt.Println("segment offset fieldDvNames", unsafe.Offsetof(sb.fieldDvNames))
fmt.Println("segment offset fieldsInv", unsafe.Offsetof(sb.fieldsInv))
fmt.Println("segment offset fieldsMap", unsafe.Offsetof(sb.fieldsMap))
fmt.Println("segment offset fieldDvReaders", unsafe.Offsetof(sb.fieldDvReaders))
fmt.Println("segment offset sb.m", unsafe.Offsetof(sb.m))
fmt.Println("segment offset fieldFSTs", unsafe.Offsetof(sb.fieldFSTs))
fmt.Println("segment offset numDocs", unsafe.Offsetof(sb.numDocs))
fmt.Println("segment offset storedIndexOffset", unsafe.Offsetof(sb.storedIndexOffset))
fmt.Println("segment offset fieldsIndexOffset", unsafe.Offsetof(sb.fieldsIndexOffset))
fmt.Println("segment offset docValueOffset", unsafe.Offsetof(sb.docValueOffset))
fmt.Println("segment offset size", unsafe.Offsetof(sb.size))
fmt.Println("segment offset bytesRead", unsafe.Offsetof(sb.bytesRead))
fmt.Println("segment offset bytesWritten", unsafe.Offsetof(sb.bytesWritten))
fmt.Println("segment offset memCRC", unsafe.Offsetof(sb.memCRC))
fmt.Println("segment offset chunkMode", unsafe.Offsetof(sb.chunkMode))

}

// OpenEx returns a zap impl of a segment along with keeping some config values
// in mind during the segment's lifetime. It forwards path and config to z.open.
func (z *ZapPlugin) OpenEx(path string, config map[string]interface{}) (segment.Segment, error) {
return z.open(path, config)
}

// Open returns a zap impl of a segment
func (*ZapPlugin) Open(path string) (segment.Segment, error) {
func (z *ZapPlugin) Open(path string) (segment.Segment, error) {
return z.open(path, nil)
}

func (*ZapPlugin) open(path string, config map[string]interface{}) (segment.Segment, error) {
f, err := os.Open(path)
if err != nil {
return nil, err
Expand Down Expand Up @@ -89,26 +118,30 @@ func (*ZapPlugin) Open(path string) (segment.Segment, error) {
// SegmentBase is a memory only, read-only implementation of the
// segment.Segment interface, using zap's data representation.
type SegmentBase struct {
mem []byte
memCRC uint32
chunkMode uint32
fieldsMap map[string]uint16 // fieldName -> fieldID+1
fieldsInv []string // fieldID -> fieldName
mem []byte
dictLocs []uint64
fieldDvNames []string // field names cached in fieldDvReaders
fieldsInv []string // fieldID -> fieldName

fieldsMap map[string]uint16 // fieldName -> fieldID+1
fieldDvReaders map[uint16]*docValueReader // naive chunk cache per field

m sync.Mutex
fieldFSTs map[uint16]*vellum.FST

numDocs uint64
storedIndexOffset uint64
fieldsIndexOffset uint64
docValueOffset uint64
dictLocs []uint64
fieldDvReaders map[uint16]*docValueReader // naive chunk cache per field
fieldDvNames []string // field names cached in fieldDvReaders
size uint64

size uint64

// atomic access to these variables
bytesRead uint64
bytesWritten uint64

m sync.Mutex
fieldFSTs map[uint16]*vellum.FST
memCRC uint32
chunkMode uint32
}

func (sb *SegmentBase) Size() int {
Expand Down

0 comments on commit 704bbef

Please sign in to comment.