WIP compaction & segments
CorentinB committed Jul 13, 2024
1 parent 61b7b9c commit 5a00379
Showing 6 changed files with 349 additions and 59 deletions.
143 changes: 143 additions & 0 deletions internal/pkg/queue/compact.go
@@ -0,0 +1,143 @@
package queue

import (
	"encoding/gob"
	"fmt"
	"io"
	"os"
	"path"
	"time"

	"github.com/sirupsen/logrus"
)

func (q *PersistentGroupedQueue) compact() {
	q.mutex.Lock()
	defer q.mutex.Unlock()

	// Create a new segment for compacted data
	newSegmentName := fmt.Sprintf("segment_%d", time.Now().UnixNano())
	newSegmentPath := path.Join(q.segmentDir, newSegmentName)
	newSegment, err := os.Create(newSegmentPath)
	if err != nil {
		q.logError("Failed to create new segment for compaction", err)
		return
	}
	defer newSegment.Close()

	newEncoder := gob.NewEncoder(newSegment)
	newHostIndex := make(map[string][]uint64)

	// Iterate through all active segments
	for _, segmentPath := range q.activeSegments {
		file, err := os.Open(segmentPath)
		if err != nil {
			q.logError("Failed to open segment for compaction", err)
			continue
		}
		defer file.Close()

		decoder := gob.NewDecoder(file)

		for {
			var item Item
			err := decoder.Decode(&item)
			if err == io.EOF {
				break
			}
			if err != nil {
				q.logError("Failed to decode item during compaction", err)
				continue
			}

			// Check if this item is still in the queue
			positions, exists := q.hostIndex[item.Host]
			if !exists {
				continue
			}

			itemPosition, err := file.Seek(0, io.SeekCurrent)
			if err != nil {
				q.logError("Failed to get current position in segment", err)
				continue
			}

			if !containsUint64(positions, uint64(itemPosition)) {
				continue
			}

			// Item is still in queue, write it to new segment
			newPosition, err := newSegment.Seek(0, io.SeekCurrent)
			if err != nil {
				q.logError("Failed to get position in new segment", err)
				continue
			}

			err = newEncoder.Encode(&item)
			if err != nil {
				q.logError("Failed to encode item to new segment", err)
				continue
			}

			newHostIndex[item.Host] = append(newHostIndex[item.Host], uint64(newPosition))
		}
	}

	// Replace old segments with new one
	for _, segmentPath := range q.activeSegments {
		err := os.Remove(segmentPath)
		if err != nil {
			q.logError("Failed to remove old segment", err)
		}
	}

	// Update queue state
	q.activeSegments = []string{newSegmentPath}
	q.currentSegment.Close()
	q.currentSegment, err = os.OpenFile(newSegmentPath, os.O_RDWR, 0644)
	if err != nil {
		q.logError("Failed to open new segment after compaction", err)
		return
	}
	q.queueEncoder = gob.NewEncoder(q.currentSegment)
	q.queueDecoder = gob.NewDecoder(q.currentSegment)
	q.hostIndex = newHostIndex

	// Save updated metadata
	err = q.saveMetadata()
	if err != nil {
		q.logError("Failed to save metadata after compaction", err)
	}

	q.logInfo("Compaction completed successfully")
}

// Helper function to check if a uint64 slice contains a value
func containsUint64(slice []uint64, value uint64) bool {
	for _, v := range slice {
		if v == value {
			return true
		}
	}
	return false
}

// Helper function for logging errors
func (q *PersistentGroupedQueue) logError(message string, err error) {
	if q.LoggingChan != nil {
		q.LoggingChan <- &LogMessage{
			Level:   logrus.ErrorLevel,
			Message: fmt.Sprintf("%s: %v", message, err),
		}
	}
}

// Helper function for logging info
func (q *PersistentGroupedQueue) logInfo(message string) {
	if q.LoggingChan != nil {
		q.LoggingChan <- &LogMessage{
			Level:   logrus.InfoLevel,
			Message: message,
		}
	}
}
48 changes: 44 additions & 4 deletions internal/pkg/queue/dequeue.go
@@ -1,8 +1,10 @@
package queue

import (
	"encoding/gob"
	"fmt"
	"io"
	"os"
)

// Dequeue removes and returns the next item from the queue
@@ -17,6 +19,9 @@ func (q *PersistentGroupedQueue) Dequeue() (*Item, error) {

	for len(q.hostOrder) == 0 {
		q.cond.Wait() // This unlocks the mutex while waiting
		if q.closed {
			return nil, ErrQueueClosed
		}
	}

	// Loop through hosts until we find one with items or we've checked all hosts
@@ -42,13 +47,38 @@ func (q *PersistentGroupedQueue) Dequeue() (*Item, error) {
		position := positions[0]
		q.hostIndex[host] = positions[1:]

		// Find the segment containing this position
		var segment *os.File
		var segmentOffset uint64
		for _, segmentPath := range q.activeSegments {
			info, err := os.Stat(segmentPath)
			if err != nil {
				return nil, fmt.Errorf("failed to get segment info: %w", err)
			}
			if uint64(info.Size()) > position {
				segment, err = os.OpenFile(segmentPath, os.O_RDWR, 0644)
				if err != nil {
					return nil, fmt.Errorf("failed to open segment: %w", err)
				}
				break
			}
			segmentOffset += uint64(info.Size())
		}

		if segment == nil {
			return nil, fmt.Errorf("failed to find segment for position %d", position)
		}
		defer segment.Close()

		// Seek to position and decode item
		_, err := q.queueFile.Seek(int64(position), io.SeekStart)
		_, err := segment.Seek(int64(position-segmentOffset), io.SeekStart)
		if err != nil {
			return nil, fmt.Errorf("failed to seek to item position: %w", err)
		}

		decoder := gob.NewDecoder(segment)
		var item Item
		err = q.queueDecoder.Decode(&item)
		err = decoder.Decode(&item)
		if err != nil {
			return nil, fmt.Errorf("failed to decode item: %w", err)
		}
@@ -59,6 +89,16 @@ func (q *PersistentGroupedQueue) Dequeue() (*Item, error) {
		// Update stats
		updateDequeueStats(q, host)

		// Check if compaction is needed
		totalItems := 0
		for _, pos := range q.hostIndex {
			totalItems += len(pos)
		}
		utilization := float64(totalItems) / float64(q.segmentSize*int64(len(q.activeSegments)))
		if utilization < q.compactThreshold {
			go q.compact() // Run compaction in background
		}

		err = q.saveMetadata()
		if err != nil {
			return nil, fmt.Errorf("failed to save metadata: %w", err)
@@ -67,6 +107,6 @@ func (q *PersistentGroupedQueue) Dequeue() (*Item, error) {
		return &item, nil
	}

	// If we've checked all hosts and found no items, loop back to wait again
	return q.Dequeue()
	// If we've checked all hosts and found no items, return queue empty error
	return nil, ErrQueueEmpty
}
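
The segment lookup above resolves an item's recorded position against the list of active segments and then seeks to position - segmentOffset inside the chosen file, i.e. positions are treated as offsets into the logical concatenation of all segments. A standalone sketch of that position-to-(segment, local offset) mapping, with hypothetical segment sizes, to illustrate the idea rather than the committed code:

package main

import "fmt"

// locateSegment maps a global position (an offset into the logical
// concatenation of all segments) to the index of the segment that contains
// it and the local offset inside that segment. Sizes are the segment file
// sizes, in the same order as activeSegments. Illustration only.
func locateSegment(sizes []uint64, position uint64) (index int, localOffset uint64, ok bool) {
	var accumulated uint64
	for i, size := range sizes {
		if position < accumulated+size {
			return i, position - accumulated, true
		}
		accumulated += size
	}
	return 0, 0, false // position lies past the end of the last segment
}

func main() {
	// Hypothetical segment sizes in bytes.
	sizes := []uint64{4096, 4096, 1024}
	idx, off, ok := locateSegment(sizes, 5000)
	fmt.Println(idx, off, ok) // 1 904 true
}
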
16 changes: 14 additions & 2 deletions internal/pkg/queue/enqueue.go
@@ -13,10 +13,22 @@ func (q *PersistentGroupedQueue) Enqueue(item *Item) error {
	q.mutex.Lock()
	defer q.mutex.Unlock()

	// Check if current segment is full
	info, err := q.currentSegment.Stat()
	if err != nil {
		return fmt.Errorf("failed to get segment info: %w", err)
	}

	if info.Size() >= q.segmentSize {
		if err := q.createNewSegment(); err != nil {
			return fmt.Errorf("failed to create new segment: %w", err)
		}
	}

	// Find free position
	startPos, err := q.queueFile.Seek(0, io.SeekEnd)
	startPos, err := q.currentSegment.Seek(0, io.SeekEnd)
	if err != nil {
		return fmt.Errorf("failed to seek to end of file: %w", err)
		return fmt.Errorf("failed to seek to end of segment: %w", err)
	}

	// Encode and write item
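
Enqueue rolls over to a fresh segment via createNewSegment, which lives in one of the changed files not rendered above. A rough sketch of what such a helper could look like, assuming the fields used elsewhere in this commit (segmentDir, activeSegments, currentSegment, queueEncoder); this is an illustration, not the commit's actual implementation:

// createNewSegment is sketched here under the assumption that it closes the
// current segment file, creates a new one under segmentDir, registers it in
// activeSegments, and points the gob encoder at the new file. The helper in
// the actual commit may differ.
func (q *PersistentGroupedQueue) createNewSegment() error {
	name := fmt.Sprintf("segment_%d", time.Now().UnixNano())
	segmentPath := path.Join(q.segmentDir, name)

	file, err := os.Create(segmentPath)
	if err != nil {
		return fmt.Errorf("failed to create segment: %w", err)
	}

	// Close the previous segment before switching over.
	if q.currentSegment != nil {
		if err := q.currentSegment.Close(); err != nil {
			return fmt.Errorf("failed to close current segment: %w", err)
		}
	}

	q.currentSegment = file
	q.queueEncoder = gob.NewEncoder(file)
	q.activeSegments = append(q.activeSegments, segmentPath)

	return nil
}
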
92 changes: 54 additions & 38 deletions internal/pkg/queue/metadata.go
@@ -1,27 +1,28 @@
package queue

import (
	"encoding/gob"
	"fmt"
	"io"
	"io/ioutil"
	"os"
	"path"
)

func (q *PersistentGroupedQueue) loadMetadata() error {
	q.mutex.Lock()
	defer q.mutex.Unlock()

	_, err := q.metadataFile.Seek(0, 0)
	if err != nil {
		return fmt.Errorf("failed to seek to start of metadata file: %w", err)
	}
	decoder := gob.NewDecoder(q.metadataFile)

	var metadata struct {
		HostIndex   map[string][]uint64
		HostOrder   []string
		CurrentHost int
		Stats       QueueStats
		ActiveSegments   []string
		HostIndex        map[string][]uint64
		HostOrder        []string
		CurrentHost      int
		SegmentSize      int64
		CompactThreshold float64
		Stats            QueueStats
	}

	err = q.metadataDecoder.Decode(&metadata)
	err := decoder.Decode(&metadata)
	if err != nil {
		if err == io.EOF {
			// No metadata yet, this might be a new queue
@@ -30,52 +31,67 @@ return fmt.Errorf("failed to decode metadata: %w", err)
return fmt.Errorf("failed to decode metadata: %w", err)
}

q.activeSegments = metadata.ActiveSegments
q.hostIndex = metadata.HostIndex
q.hostOrder = metadata.HostOrder
q.currentHost = metadata.CurrentHost
q.segmentSize = metadata.SegmentSize
q.compactThreshold = metadata.CompactThreshold
q.stats = metadata.Stats

// Reinitialize maps if they're nil
if q.stats.ElementsPerHost == nil {
q.stats.ElementsPerHost = make(map[string]int)
}
if q.stats.HostDistribution == nil {
q.stats.HostDistribution = make(map[string]float64)
}

return nil
}

func (q *PersistentGroupedQueue) saveMetadata() error {
	q.statsMutex.RLock()
	defer q.statsMutex.RUnlock()

	_, err := q.metadataFile.Seek(0, 0)
	// Create a temporary file for writing metadata
	tempFile, err := ioutil.TempFile(path.Dir(q.metadataFile.Name()), "metadata_temp_")
	if err != nil {
		return fmt.Errorf("failed to seek to start of metadata file: %w", err)
		return fmt.Errorf("failed to create temporary metadata file: %w", err)
	}
	defer os.Remove(tempFile.Name())
	defer tempFile.Close()

	err = q.metadataFile.Truncate(0)
	if err != nil {
		return fmt.Errorf("failed to truncate metadata file: %w", err)
	}
	encoder := gob.NewEncoder(tempFile)

	// Create a metadata struct to hold all the necessary information
	metadata := struct {
		HostIndex   map[string][]uint64
		HostOrder   []string
		CurrentHost int
		Stats       QueueStats
		ActiveSegments   []string
		HostIndex        map[string][]uint64
		HostOrder        []string
		CurrentHost      int
		SegmentSize      int64
		CompactThreshold float64
		Stats            QueueStats
	}{
		HostIndex:   q.hostIndex,
		HostOrder:   q.hostOrder,
		CurrentHost: q.currentHost,
		Stats:       q.stats,
		ActiveSegments:   q.activeSegments,
		HostIndex:        q.hostIndex,
		HostOrder:        q.hostOrder,
		CurrentHost:      q.currentHost,
		SegmentSize:      q.segmentSize,
		CompactThreshold: q.compactThreshold,
		Stats:            q.stats,
	}

	err = q.metadataEncoder.Encode(metadata)
	// Encode the metadata
	err = encoder.Encode(metadata)
	if err != nil {
		return fmt.Errorf("failed to encode metadata: %w", err)
	}

	// Ensure all data is written to the temporary file
	err = tempFile.Sync()
	if err != nil {
		return fmt.Errorf("failed to sync temporary metadata file: %w", err)
	}

	// Close the temporary file before renaming
	tempFile.Close()

	// Atomically replace the old metadata file with the new one
	err = os.Rename(tempFile.Name(), q.metadataFile.Name())
	if err != nil {
		return fmt.Errorf("failed to rename temporary metadata file: %w", err)
	}

	return nil
}
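
saveMetadata now uses the classic write-to-a-temp-file-then-rename pattern, so a crash mid-write can never leave a truncated metadata file behind: readers only ever observe the old or the new complete contents. The same pattern in isolation, as a generic sketch that is not part of this commit:

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// writeFileAtomic writes data to a temporary file next to target, syncs it,
// and renames it over target. Sketch only; not part of this commit.
func writeFileAtomic(target string, data []byte) error {
	tmp, err := os.CreateTemp(filepath.Dir(target), "tmp_*")
	if err != nil {
		return fmt.Errorf("create temp file: %w", err)
	}
	defer os.Remove(tmp.Name()) // harmless if the rename below already happened

	if _, err := tmp.Write(data); err != nil {
		tmp.Close()
		return fmt.Errorf("write temp file: %w", err)
	}
	if err := tmp.Sync(); err != nil {
		tmp.Close()
		return fmt.Errorf("sync temp file: %w", err)
	}
	if err := tmp.Close(); err != nil {
		return fmt.Errorf("close temp file: %w", err)
	}

	return os.Rename(tmp.Name(), target)
}

func main() {
	// Hypothetical target path, for illustration only.
	if err := writeFileAtomic("metadata.gob", []byte("example")); err != nil {
		fmt.Println("write failed:", err)
	}
}
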