Skip to content

Commit

Permalink
[chore] [exporterhelper] Add an option for items based queue sizing
Browse files Browse the repository at this point in the history
Introduce an option to limit the queue size by the number of items instead of number of requests. This is preliminary step for having the exporter helper v2 with a batcher sender placed after the queue sender. Otherwise, it'll be hard for the users to estimate the queue size based on the number of requests without batch processor in front of it.

This change doesn't effect the existing functionality and the items based queue limiting cannot be utilized yet.
  • Loading branch information
dmitryax committed Dec 20, 2023
1 parent 44fbb84 commit 7af07ae
Show file tree
Hide file tree
Showing 9 changed files with 572 additions and 135 deletions.
32 changes: 14 additions & 18 deletions exporter/exporterhelper/internal/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,55 +16,51 @@ import (
// the producer are dropped.
type boundedMemoryQueue[T any] struct {
component.StartFunc
items chan queueRequest[T]
QueueCapacityLimiter[T]
inCh chan<- queueRequest[T]
outCh <-chan queueRequest[T]
}

// NewBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional
// callback for dropped items (e.g. useful to emit metrics).
func NewBoundedMemoryQueue[T any](capacity int) Queue[T] {
func NewBoundedMemoryQueue[T any](capacityLimiter QueueCapacityLimiter[T]) Queue[T] {
inCh, outCh := newUnboundedChannel[queueRequest[T]]()
return &boundedMemoryQueue[T]{
items: make(chan queueRequest[T], capacity),
QueueCapacityLimiter: capacityLimiter,
inCh: inCh,
outCh: outCh,
}
}

// Offer is used by the producer to submit new item to the queue. Calling this method on a stopped queue will panic.
func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error {
select {
case q.items <- queueRequest[T]{ctx: ctx, req: req}:
return nil
default:
if !q.QueueCapacityLimiter.claim(req) {
return ErrQueueIsFull
}
q.inCh <- queueRequest[T]{ctx: ctx, req: req}
return nil
}

// Consume applies the provided function on the head of queue.
// The call blocks until there is an item available or the queue is stopped.
// The function returns true when an item is consumed or false if the queue is stopped and emptied.
func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool {
item, ok := <-q.items
item, ok := <-q.outCh
if !ok {
return false
}
q.QueueCapacityLimiter.release(item.req)
// the memory queue doesn't handle consume errors
_ = consumeFunc(item.ctx, item.req)
return true
}

// Shutdown closes the queue channel to initiate draining of the queue.
func (q *boundedMemoryQueue[T]) Shutdown(context.Context) error {
close(q.items)
close(q.inCh)
return nil
}

// Size returns the current size of the queue
func (q *boundedMemoryQueue[T]) Size() int {
return len(q.items)
}

func (q *boundedMemoryQueue[T]) Capacity() int {
return cap(q.items)
}

type queueRequest[T any] struct {
req T
ctx context.Context
Expand Down
8 changes: 4 additions & 4 deletions exporter/exporterhelper/internal/bounded_memory_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
// We want to test the overflow behavior, so we block the consumer
// by holding a startLock before submitting items to the queue.
func TestBoundedQueue(t *testing.T) {
q := NewBoundedMemoryQueue[string](1)
q := NewBoundedMemoryQueue[string](NewRequestsCapacityLimiter[string](1))

assert.NoError(t, q.Offer(context.Background(), "a"))

Expand Down Expand Up @@ -73,7 +73,7 @@ func TestBoundedQueue(t *testing.T) {
// only after Stop will mean the consumers are still locked while
// trying to perform the final consumptions.
func TestShutdownWhileNotEmpty(t *testing.T) {
q := NewBoundedMemoryQueue[string](1000)
q := NewBoundedMemoryQueue[string](NewRequestsCapacityLimiter[string](1000))

assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
for i := 0; i < 10; i++ {
Expand Down Expand Up @@ -159,7 +159,7 @@ func benchmarkQueueUsage(b *testing.B, capacity int, numConsumers int, numberOfI
func queueUsage(tb testing.TB, capacity int, numConsumers int, numberOfItems int) {
var wg sync.WaitGroup
wg.Add(numberOfItems)
q := NewBoundedMemoryQueue[string](capacity)
q := NewBoundedMemoryQueue[string](NewRequestsCapacityLimiter[string](capacity))
consumers := NewQueueConsumers(q, numConsumers, func(context.Context, string) error {
wg.Done()
return nil
Expand All @@ -176,7 +176,7 @@ func queueUsage(tb testing.TB, capacity int, numConsumers int, numberOfItems int
}

func TestZeroSizeNoConsumers(t *testing.T) {
q := NewBoundedMemoryQueue[string](0)
q := NewBoundedMemoryQueue[string](NewRequestsCapacityLimiter[string](0))

err := q.Start(context.Background(), componenttest.NewNopHost())
assert.NoError(t, err)
Expand Down
150 changes: 116 additions & 34 deletions exporter/exporterhelper/internal/persistent_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"fmt"
"strconv"
"sync"
"sync/atomic"

"go.uber.org/multierr"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -41,20 +43,24 @@ import (
// index index x
// xxxx deleted
type persistentQueue[T any] struct {
QueueCapacityLimiter[T]

set exporter.CreateSettings
storageID component.ID
dataType component.DataType
client storage.Client
unmarshaler func(data []byte) (T, error)
marshaler func(req T) ([]byte, error)

putChan chan struct{}
capacity uint64
inCh chan<- struct{}
outCh <-chan struct{}

// mu guards everything declared below.
mu sync.Mutex
readIndex uint64
writeIndex uint64
initIndexSize uint64
initQueueSize *atomic.Uint64
currentlyDispatchedItems []uint64
refClient int64
stopped bool
Expand All @@ -68,6 +74,7 @@ const (
readIndexKey = "ri"
writeIndexKey = "wi"
currentlyDispatchedItemsKey = "di"
queueSizeKey = "si"
)

var (
Expand All @@ -78,15 +85,19 @@ var (
)

// NewPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage
func NewPersistentQueue[T any](capacity int, dataType component.DataType, storageID component.ID, marshaler func(req T) ([]byte, error), unmarshaler func([]byte) (T, error), set exporter.CreateSettings) Queue[T] {
func NewPersistentQueue[T any](cl QueueCapacityLimiter[T], dataType component.DataType, storageID component.ID,
marshaler func(req T) ([]byte, error), unmarshaler func([]byte) (T, error), set exporter.CreateSettings) Queue[T] {
inCh, outCh := newUnboundedChannel[struct{}]()
return &persistentQueue[T]{
set: set,
storageID: storageID,
dataType: dataType,
unmarshaler: unmarshaler,
marshaler: marshaler,
capacity: uint64(capacity),
putChan: make(chan struct{}, capacity),
QueueCapacityLimiter: cl,
set: set,
storageID: storageID,
dataType: dataType,
unmarshaler: unmarshaler,
marshaler: marshaler,
initQueueSize: &atomic.Uint64{},
inCh: inCh,
outCh: outCh,
}
}

Expand All @@ -107,12 +118,6 @@ func (pq *persistentQueue[T]) initClient(ctx context.Context, client storage.Cli
pq.initPersistentContiguousStorage(ctx)
// Make sure the leftover requests are handled
pq.retrieveAndEnqueueNotDispatchedReqs(ctx)

// Ensure the communication channel has the same size as the queue
// We might already have items here from requeueing non-dispatched requests
for len(pq.putChan) < int(pq.size()) {
pq.putChan <- struct{}{}
}
}

func (pq *persistentQueue[T]) initPersistentContiguousStorage(ctx context.Context) {
Expand All @@ -137,6 +142,39 @@ func (pq *persistentQueue[T]) initPersistentContiguousStorage(ctx context.Contex
pq.readIndex = 0
pq.writeIndex = 0
}
pq.initIndexSize = pq.writeIndex - pq.readIndex

// Ensure the communication channel has the same size as the queue
for i := 0; i < int(pq.initIndexSize); i++ {
pq.inCh <- struct{}{}
}

// Read snapshot of the queue size from storage. It's not a problem if the value cannot be fetched,
// or it's not accurate. The queue size will be corrected once the recovered queue is drained.
if pq.initIndexSize > 0 {
// If the queue is sized by the number of requests, no need to read the queue size from storage.
if _, ok := pq.QueueCapacityLimiter.(*requestsCapacityLimiter[T]); ok {
pq.initQueueSize.Store(pq.initIndexSize)
return
}

res, err := pq.client.Get(ctx, queueSizeKey)
if err == nil {
var restoredQueueSize uint64
restoredQueueSize, err = bytesToItemIndex(res)
pq.initQueueSize.Store(restoredQueueSize)
}
if err != nil {
if errors.Is(err, errValueNotSet) {
pq.set.Logger.Warn("Cannot read the queue size snapshot from storage. "+
"The reported queue size will be inaccurate until the initial queue is drained. "+
"It's expected when the items sized queue enabled for the first time", zap.Error(err))
} else {
pq.set.Logger.Error("Failed to read the queue size snapshot from storage. "+
"The reported queue size will be inaccurate until the initial queue is drained.", zap.Error(err))
}

Check warning on line 175 in exporter/exporterhelper/internal/persistent_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/internal/persistent_queue.go#L173-L175

Added lines #L173 - L175 were not covered by tests
}
}
}

// Consume applies the provided function on the head of queue.
Expand All @@ -146,7 +184,7 @@ func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error
for {
// If we are stopped we still process all the other events in the channel before, but we
// return fast in the `getNextItem`, so we will free the channel fast and get to the stop.
_, ok := <-pq.putChan
_, ok := <-pq.outCh
if !ok {
return false
}
Expand All @@ -159,29 +197,32 @@ func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error
}
}

func (pq *persistentQueue[T]) size() uint64 {
return pq.writeIndex - pq.readIndex
}

// Size returns the number of currently available items, which were not picked by consumers yet
// Size returns the current size of the queue.
func (pq *persistentQueue[T]) Size() int {
pq.mu.Lock()
defer pq.mu.Unlock()
return int(pq.size())
}

// Capacity returns the number of currently available items, which were not picked by consumers yet
func (pq *persistentQueue[T]) Capacity() int {
return int(pq.capacity)
return int(pq.initQueueSize.Load()) + pq.QueueCapacityLimiter.Size()
}

func (pq *persistentQueue[T]) Shutdown(ctx context.Context) error {
close(pq.putChan)
close(pq.inCh)
pq.mu.Lock()
defer pq.mu.Unlock()
// Mark this queue as stopped, so consumer don't start any more work.
pq.stopped = true
return pq.unrefClient(ctx)
return multierr.Combine(
pq.backupQueueSize(ctx),
pq.unrefClient(ctx),
)
}

// backupQueueSize writes the current queue size to storage. The value is used to recover the queue size
// in case if the collector is killed.
func (pq *persistentQueue[T]) backupQueueSize(ctx context.Context) error {
// No need to write the queue size if the queue is sized by the number of requests.
// That information is already stored as difference between read and write indexes.
if _, ok := pq.QueueCapacityLimiter.(*requestsCapacityLimiter[T]); ok {
return nil
}
return pq.client.Set(ctx, queueSizeKey, itemIndexToBytes(uint64(pq.Size())))
}

// unrefClient unrefs the client, and closes if no more references. Callers MUST hold the mutex.
Expand All @@ -205,7 +246,7 @@ func (pq *persistentQueue[T]) Offer(ctx context.Context, req T) error {

// putInternal is the internal version that requires caller to hold the mutex lock.
func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
if pq.size() >= pq.capacity {
if !pq.QueueCapacityLimiter.claim(req) {
pq.set.Logger.Warn("Maximum queue capacity reached")
return ErrQueueIsFull
}
Expand All @@ -224,12 +265,21 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
storage.SetOperation(itemKey, reqBuf),
}
if storageErr := pq.client.Batch(ctx, ops...); storageErr != nil {
pq.QueueCapacityLimiter.release(req)
return storageErr
}

pq.writeIndex = newIndex
// Inform the loop that there's some data to process
pq.putChan <- struct{}{}
pq.inCh <- struct{}{}

// Back up the queue size to storage every 10 writes. The stored value is used to recover the queue size
// in case if the collector is killed. The recovered queue size is allowed to be inaccurate.
if (pq.writeIndex % 10) == 5 {
if err := pq.backupQueueSize(ctx); err != nil {
pq.set.Logger.Error("Error writing queue size to storage", zap.Error(err))
}

Check warning on line 281 in exporter/exporterhelper/internal/persistent_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/internal/persistent_queue.go#L280-L281

Added lines #L280 - L281 were not covered by tests
}

return nil
}
Expand Down Expand Up @@ -274,6 +324,16 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error),
return request, nil, false
}

pq.releaseCapacity(request)

// Back up the queue size to storage on every 10 reads. The stored value is used to recover the queue size
// in case if the collector is killed. The recovered queue size is allowed to be inaccurate.
if (pq.writeIndex % 10) == 0 {
if qsErr := pq.backupQueueSize(ctx); qsErr != nil {
pq.set.Logger.Error("Error writing queue size to storage", zap.Error(err))
}

Check warning on line 334 in exporter/exporterhelper/internal/persistent_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/internal/persistent_queue.go#L333-L334

Added lines #L333 - L334 were not covered by tests
}

// Increase the reference count, so the client is not closed while the request is being processed.
// The client cannot be closed because we hold the lock since last we checked `stopped`.
pq.refClient++
Expand Down Expand Up @@ -301,6 +361,28 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error),
}, true
}

// releaseCapacity releases the capacity of the queue. The caller must hold the mutex.
func (pq *persistentQueue[T]) releaseCapacity(req T) {
// If the recovered queue size is not emptied yet, decrease it first.
if pq.initIndexSize > 0 {
pq.initIndexSize--
if pq.initIndexSize == 0 {
pq.initQueueSize.Store(0)
return
}
reqSize := pq.QueueCapacityLimiter.sizeOf(req)
if pq.initQueueSize.Load() < reqSize {
pq.initQueueSize.Store(0)
return
}
pq.initQueueSize.Add(^(reqSize - 1))
return
}

// Otherwise, decrease the current queue size.
pq.QueueCapacityLimiter.release(req)
}

// retrieveAndEnqueueNotDispatchedReqs gets the items for which sending was not finished, cleans the storage
// and moves the items at the back of the queue.
func (pq *persistentQueue[T]) retrieveAndEnqueueNotDispatchedReqs(ctx context.Context) {
Expand Down
Loading

0 comments on commit 7af07ae

Please sign in to comment.