[chore] [exporterhelper] Add an option for items based queue sizing
Introduce an option to limit the queue size by the number of items instead of the number of requests. This is a preliminary step toward exporter helper v2, where a batcher sender is placed after the queue sender. Otherwise, it would be hard for users to estimate the queue size based on the number of requests without a batch processor in front of it.

This change doesn't affect existing functionality, and the items-based queue limiting cannot be utilized yet.
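
For context, the sketch below reconstructs the capacity-limiter contract from the call sites in the diff (Capacity, claim, release, size, sizeOf); the actual QueueCapacityLimiter interface in exporterhelper/internal may differ in naming and exact signatures.

package internal

// QueueCapacityLimiter is a sketch inferred from this commit's call sites,
// not the literal upstream definition.
type QueueCapacityLimiter[T any] interface {
	// Capacity returns the configured limit: a number of requests for the
	// request-based limiter, or a number of items for the items-based one.
	Capacity() int
	// claim reserves space for req and reports false when the queue is full.
	claim(req T) bool
	// release frees the space previously claimed for req.
	release(req T)
	// size reports how much of the capacity is currently claimed.
	size() uint64
	// sizeOf reports how much capacity a single request consumes
	// (1 for a request-count limiter, the item count for an items-based one).
	sizeOf(req T) uint64
}

Existing callers keep the request-count behavior by passing the no-op limiter, e.g. NewBoundedMemoryQueue[string](NewNopCapacityLimiter[string](100)), as in the updated tests.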
dmitryax committed Dec 19, 2023
1 parent a7dc838 commit 6da46cd
Showing 6 changed files with 507 additions and 105 deletions.
19 changes: 12 additions & 7 deletions exporter/exporterhelper/internal/bounded_memory_queue.go
@@ -16,19 +16,24 @@ import (
// the producer are dropped.
type boundedMemoryQueue[T any] struct {
component.StartFunc
QueueCapacityLimiter[T]
items chan queueRequest[T]
}

// NewBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional
// callback for dropped items (e.g. useful to emit metrics).
func NewBoundedMemoryQueue[T any](capacity int) Queue[T] {
func NewBoundedMemoryQueue[T any](capacityLimiter QueueCapacityLimiter[T]) Queue[T] {
return &boundedMemoryQueue[T]{
items: make(chan queueRequest[T], capacity),
QueueCapacityLimiter: capacityLimiter,
items: make(chan queueRequest[T], capacityLimiter.Capacity()),
}
}

// Offer is used by the producer to submit new item to the queue. Calling this method on a stopped queue will panic.
func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error {
if !q.QueueCapacityLimiter.claim(req) {
return ErrQueueIsFull
}
select {
case q.items <- queueRequest[T]{ctx: ctx, req: req}:
return nil
@@ -45,6 +50,7 @@ func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) err
if !ok {
return false
}
q.QueueCapacityLimiter.release(item.req)
// the memory queue doesn't handle consume errors
_ = consumeFunc(item.ctx, item.req)
return true
@@ -58,11 +64,10 @@ func (q *boundedMemoryQueue[T]) Shutdown(context.Context) error {

// Size returns the current size of the queue
func (q *boundedMemoryQueue[T]) Size() int {
return len(q.items)
}

func (q *boundedMemoryQueue[T]) Capacity() int {
return cap(q.items)
if _, ok := q.QueueCapacityLimiter.(*nopCapacityLimiter[T]); ok {
return len(q.items)
}
return int(q.QueueCapacityLimiter.size())
}

type queueRequest[T any] struct {
8 changes: 4 additions & 4 deletions exporter/exporterhelper/internal/bounded_memory_queue_test.go
@@ -23,7 +23,7 @@ import (
// We want to test the overflow behavior, so we block the consumer
// by holding a startLock before submitting items to the queue.
func TestBoundedQueue(t *testing.T) {
q := NewBoundedMemoryQueue[string](1)
q := NewBoundedMemoryQueue[string](NewNopCapacityLimiter[string](1))

assert.NoError(t, q.Offer(context.Background(), "a"))

@@ -73,7 +73,7 @@ func TestBoundedQueue(t *testing.T) {
// only after Stop will mean the consumers are still locked while
// trying to perform the final consumptions.
func TestShutdownWhileNotEmpty(t *testing.T) {
q := NewBoundedMemoryQueue[string](1000)
q := NewBoundedMemoryQueue[string](NewNopCapacityLimiter[string](1000))

assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
for i := 0; i < 10; i++ {
@@ -159,7 +159,7 @@ func benchmarkQueueUsage(b *testing.B, capacity int, numConsumers int, numberOfI
func queueUsage(tb testing.TB, capacity int, numConsumers int, numberOfItems int) {
var wg sync.WaitGroup
wg.Add(numberOfItems)
q := NewBoundedMemoryQueue[string](capacity)
q := NewBoundedMemoryQueue[string](NewNopCapacityLimiter[string](capacity))
consumers := NewQueueConsumers(q, numConsumers, func(context.Context, string) error {
wg.Done()
return nil
@@ -176,7 +176,7 @@ func queueUsage(tb testing.TB, capacity int, numConsumers int, numberOfItems int
}

func TestZeroSizeNoConsumers(t *testing.T) {
q := NewBoundedMemoryQueue[string](0)
q := NewBoundedMemoryQueue[string](NewNopCapacityLimiter[string](0))

err := q.Start(context.Background(), componenttest.NewNopHost())
assert.NoError(t, err)
139 changes: 120 additions & 19 deletions exporter/exporterhelper/internal/persistent_queue.go
@@ -11,6 +11,7 @@ import (
"strconv"
"sync"

"go.uber.org/multierr"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
@@ -41,6 +42,8 @@ import (
// index index x
// xxxx deleted
type persistentQueue[T any] struct {
QueueCapacityLimiter[T]

set exporter.CreateSettings
storageID component.ID
dataType component.DataType
@@ -55,6 +58,8 @@ type persistentQueue[T any] struct {
mu sync.Mutex
readIndex uint64
writeIndex uint64
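// initIndexSize and initQueueSize hold the queue state recovered from storage at startup:
// the number of restored requests and their (possibly approximate) total size. Both are
// decremented in releaseCapacity as the restored requests are consumed.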
initIndexSize uint64
initQueueSize uint64
currentlyDispatchedItems []uint64
refClient int64
stopped bool
@@ -68,6 +73,7 @@ const (
readIndexKey = "ri"
writeIndexKey = "wi"
currentlyDispatchedItemsKey = "di"
currentlyQueueSizeKey = "si"
)

var (
@@ -78,15 +84,17 @@ var (
)

// NewPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage
func NewPersistentQueue[T any](capacity int, dataType component.DataType, storageID component.ID, marshaler func(req T) ([]byte, error), unmarshaler func([]byte) (T, error), set exporter.CreateSettings) Queue[T] {
func NewPersistentQueue[T any](cl QueueCapacityLimiter[T], dataType component.DataType, storageID component.ID,
marshaler func(req T) ([]byte, error), unmarshaler func([]byte) (T, error), set exporter.CreateSettings) Queue[T] {
return &persistentQueue[T]{
set: set,
storageID: storageID,
dataType: dataType,
unmarshaler: unmarshaler,
marshaler: marshaler,
capacity: uint64(capacity),
putChan: make(chan struct{}, capacity),
QueueCapacityLimiter: cl,
set: set,
storageID: storageID,
dataType: dataType,
unmarshaler: unmarshaler,
marshaler: marshaler,
capacity: uint64(cl.Capacity()),
putChan: make(chan struct{}, cl.Capacity()),
}
}

@@ -110,11 +118,17 @@ func (pq *persistentQueue[T]) initClient(ctx context.Context, client storage.Cli

// Ensure the communication channel has the same size as the queue
// We might already have items here from requeueing non-dispatched requests
for len(pq.putChan) < int(pq.size()) {
for len(pq.putChan) < int(pq.indexSize()) {
pq.putChan <- struct{}{}
}
}

// capacityLimiterDisabled returns true if the queue capacity is limited by the number of requests and handled by the queue itself.
func (pq *persistentQueue[T]) capacityLimiterDisabled() bool {
_, ok := pq.QueueCapacityLimiter.(*nopCapacityLimiter[T])
return ok
}

func (pq *persistentQueue[T]) initPersistentContiguousStorage(ctx context.Context) {
riOp := storage.GetOperation(readIndexKey)
wiOp := storage.GetOperation(writeIndexKey)
@@ -137,6 +151,28 @@ func (pq *persistentQueue[T]) initPersistentContiguousStorage(ctx context.Contex
pq.readIndex = 0
pq.writeIndex = 0
}

// Read a snapshot of the queue size from storage. It's not a problem if the value cannot be fetched
// or isn't accurate; the queue size will be corrected once the recovered queue is drained.
if !pq.capacityLimiterDisabled() {
pq.initIndexSize = pq.writeIndex - pq.readIndex
if pq.initIndexSize > 0 {
res, err := pq.client.Get(ctx, currentlyQueueSizeKey)
if err == nil {
pq.initQueueSize, err = bytesToItemIndex(res)
}
if err != nil {
if errors.Is(err, errValueNotSet) {
pq.set.Logger.Warn("Cannot read the queue size snapshot from storage. "+
"The reported queue size will be inaccurate until the initial queue is drained. "+
"It's expected when the items sized queue enabled for the first time", zap.Error(err))
} else {
pq.set.Logger.Error("Failed to read the queue size snapshot from storage. "+
"The reported queue size will be inaccurate until the initial queue is drained.", zap.Error(err))
}
}
}
}
}

// Consume applies the provided function on the head of queue.
@@ -159,15 +195,19 @@ func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error
}
}

func (pq *persistentQueue[T]) size() uint64 {
return pq.writeIndex - pq.readIndex
// Size returns the current size of the queue.
func (pq *persistentQueue[T]) Size() int {
if pq.capacityLimiterDisabled() {
pq.mu.Lock()
defer pq.mu.Unlock()
return int(pq.indexSize())
}
return int(pq.initQueueSize + pq.QueueCapacityLimiter.size())
}

// Size returns the number of currently available items, which were not picked by consumers yet
func (pq *persistentQueue[T]) Size() int {
pq.mu.Lock()
defer pq.mu.Unlock()
return int(pq.size())
// indexSize returns the number of requests in the queue. The caller must hold the mutex.
func (pq *persistentQueue[T]) indexSize() uint64 {
return pq.writeIndex - pq.readIndex
}

// Capacity returns the number of currently available items, which were not picked by consumers yet
@@ -181,7 +221,17 @@ func (pq *persistentQueue[T]) Shutdown(ctx context.Context) error {
defer pq.mu.Unlock()
// Mark this queue as stopped, so consumer don't start any more work.
pq.stopped = true
return pq.unrefClient(ctx)
return multierr.Combine(
pq.writeQueueSize(ctx),
pq.unrefClient(ctx),
)
}

// writeQueueSize persists a snapshot of the current queue size so it can be recovered
// after a restart. It is a no-op when the items-based capacity limiter is disabled.
func (pq *persistentQueue[T]) writeQueueSize(ctx context.Context) error {
if pq.capacityLimiterDisabled() {
return nil
}
return pq.client.Set(ctx, currentlyQueueSizeKey, itemIndexToBytes(pq.QueueCapacityLimiter.size()))
}

// unrefClient unrefs the client, and closes if no more references. Callers MUST hold the mutex.
@@ -205,8 +255,7 @@ func (pq *persistentQueue[T]) Offer(ctx context.Context, req T) error {

// putInternal is the internal version that requires caller to hold the mutex lock.
func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
if pq.size() >= pq.capacity {
pq.set.Logger.Warn("Maximum queue capacity reached")
if !pq.claim(req) {
return ErrQueueIsFull
}

Expand All @@ -231,9 +280,26 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
// Inform the loop that there's some data to process
pq.putChan <- struct{}{}

// Flush the queue size to storage on every 10 writes. The flushed value is used to recover the queue size
// in case the collector is killed. The recovered queue size is allowed to be inaccurate.
if (pq.writeIndex % 10) == 0 {
if err := pq.writeQueueSize(ctx); err != nil {
pq.set.Logger.Error("Error writing queue size to storage", zap.Error(err))
}
}

return nil
}

// claim reports whether the queue has capacity for the request and, when the items-based
// limiter is enabled, reserves that capacity.
func (pq *persistentQueue[T]) claim(req T) bool {
if pq.capacityLimiterDisabled() {
return pq.indexSize() < pq.capacity
}
return pq.QueueCapacityLimiter.claim(req)
}

// getNextItem pulls the next available item from the persistent storage along with a callback function that should be
// called after the item is processed to clean up the storage. If no new item is available, returns false.
func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error), bool) {
@@ -274,6 +340,16 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error),
return request, nil, false
}

pq.releaseCapacity(request)

// Flush the queue size to storage on every 10 reads. The flushed value is used to recover the queue size
// in case the collector is killed. The recovered queue size is allowed to be inaccurate.
if (pq.readIndex % 10) == 0 {
if qsErr := pq.writeQueueSize(ctx); qsErr != nil {
pq.set.Logger.Error("Error writing queue size to storage", zap.Error(qsErr))
}
}

// Increase the reference count, so the client is not closed while the request is being processed.
// The client cannot be closed because we hold the lock since last we checked `stopped`.
pq.refClient++
@@ -301,6 +377,31 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error),
}, true
}

// releaseCapacity returns the capacity claimed for req, draining any queue contents
// recovered from storage at startup before adjusting the live limiter.
func (pq *persistentQueue[T]) releaseCapacity(req T) {
if pq.capacityLimiterDisabled() {
return
}

// If the recovered queue size is not emptied yet, decrease it first.
if pq.initIndexSize > 0 {
pq.initIndexSize--
if pq.initIndexSize == 0 {
pq.initQueueSize = 0
return
}
reqSize := pq.QueueCapacityLimiter.sizeOf(req)
if pq.initQueueSize < reqSize {
pq.initQueueSize = 0
return
}
pq.initQueueSize -= reqSize
return
}

// Otherwise, decrease the current queue size.
pq.QueueCapacityLimiter.release(req)
}

// retrieveAndEnqueueNotDispatchedReqs gets the items for which sending was not finished, cleans the storage
// and moves the items at the back of the queue.
func (pq *persistentQueue[T]) retrieveAndEnqueueNotDispatchedReqs(ctx context.Context) {