[chore] [exporterhelper] Items based queue sizing with bounded channel (#9164)

Introduce an option to limit the queue size by the number of items instead of the number of requests. This is a preliminary step toward exporter helper v2, which places a batcher sender after the queue sender. Without a batch processor in front of the queue, it would otherwise be hard for users to estimate the queue size in terms of requests.

This change doesn't affect existing functionality, and the items-based queue limiting cannot be used yet.

Updates #8122

Alternative to #9147
dmitryax authored Dec 22, 2023
1 parent e67cd8b commit fe2c989
Showing 7 changed files with 525 additions and 167 deletions.
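
Of the seven changed files, only two are expanded below; the new Sizer abstraction they rely on is introduced in files not shown on this page. As a rough, non-authoritative sketch, inferred purely from the call sites visible in the diff (&RequestSizer[fakeReq]{}, &ItemsSizer[fakeReq]{}, newQueueCapacityLimiter[T](sizer, capacity)), it could look like the following; the method name SizeOf and the itemsCounter constraint are assumptions, not confirmed by this page:

// Hypothetical sketch, not the committed code.
// A Sizer measures a queue element in whatever unit the queue
// capacity is expressed in: requests or items.
type Sizer[T any] interface {
	SizeOf(el T) uint64
}

// RequestSizer counts every element as one request, preserving the
// existing requests-based behavior.
type RequestSizer[T any] struct{}

func (rs *RequestSizer[T]) SizeOf(T) uint64 { return 1 }

// itemsCounter is an assumed constraint letting ItemsSizer ask an element
// how many telemetry items (spans, data points, log records) it carries.
type itemsCounter interface {
	ItemsCount() int
}

// ItemsSizer counts an element by the number of items it contains.
type ItemsSizer[T itemsCounter] struct{}

func (is *ItemsSizer[T]) SizeOf(el T) uint64 { return uint64(el.ItemsCount()) }

With such a split, a caller keeps today's behavior by passing a RequestSizer and opts into items-based limiting by passing an ItemsSizer once the rest of the pipeline supports it.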
23 changes: 8 additions & 15 deletions exporter/exporterhelper/internal/bounded_memory_queue.go
@@ -16,25 +16,26 @@ import (
 // the producer are dropped.
 type boundedMemoryQueue[T any] struct {
 	component.StartFunc
+	*queueCapacityLimiter[T]
 	items chan queueRequest[T]
 }
 
 // NewBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional
 // callback for dropped items (e.g. useful to emit metrics).
-func NewBoundedMemoryQueue[T any](capacity int) Queue[T] {
+func NewBoundedMemoryQueue[T any](sizer Sizer[T], capacity int) Queue[T] {
 	return &boundedMemoryQueue[T]{
-		items: make(chan queueRequest[T], capacity),
+		queueCapacityLimiter: newQueueCapacityLimiter[T](sizer, capacity),
+		items:                make(chan queueRequest[T], capacity),
 	}
 }
 
 // Offer is used by the producer to submit new item to the queue. Calling this method on a stopped queue will panic.
 func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error {
-	select {
-	case q.items <- queueRequest[T]{ctx: ctx, req: req}:
-		return nil
-	default:
+	if !q.queueCapacityLimiter.claim(req) {
 		return ErrQueueIsFull
 	}
+	q.items <- queueRequest[T]{ctx: ctx, req: req}
+	return nil
 }
 
 // Consume applies the provided function on the head of queue.
@@ -45,6 +46,7 @@ func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) err
 	if !ok {
 		return false
 	}
+	q.queueCapacityLimiter.release(item.req)
 	// the memory queue doesn't handle consume errors
 	_ = consumeFunc(item.ctx, item.req)
 	return true
@@ -56,15 +58,6 @@ func (q *boundedMemoryQueue[T]) Shutdown(context.Context) error {
 	return nil
 }
 
-// Size returns the current size of the queue
-func (q *boundedMemoryQueue[T]) Size() int {
-	return len(q.items)
-}
-
-func (q *boundedMemoryQueue[T]) Capacity() int {
-	return cap(q.items)
-}
-
 type queueRequest[T any] struct {
 	req T
 	ctx context.Context
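
The queueCapacityLimiter embedded above is added by this commit in a file that is not expanded on this page. A minimal sketch of how such a limiter could work, inferred from the claim/release call sites and from the Size/Capacity methods that moved off the queue; the field names and the atomic counter are assumptions (requires "sync/atomic"):

// Hypothetical sketch, not the committed code.
type queueCapacityLimiter[T any] struct {
	sizer Sizer[T]
	cap   uint64
	size  *atomic.Uint64
}

func newQueueCapacityLimiter[T any](sizer Sizer[T], capacity int) *queueCapacityLimiter[T] {
	return &queueCapacityLimiter[T]{sizer: sizer, cap: uint64(capacity), size: &atomic.Uint64{}}
}

// claim reserves room for el before it is enqueued; it reports false and
// rolls the reservation back when el would push the size past capacity.
func (l *queueCapacityLimiter[T]) claim(el T) bool {
	sz := l.sizer.SizeOf(el)
	if l.size.Add(sz) > l.cap {
		l.size.Add(^(sz - 1)) // atomic subtract: undo the reservation
		return false
	}
	return true
}

// release frees the room claimed for el after it has been consumed.
func (l *queueCapacityLimiter[T]) release(el T) {
	l.size.Add(^(l.sizer.SizeOf(el) - 1)) // atomic subtract
}

// Size and Capacity stand in for the channel-based len/cap removed above.
func (l *queueCapacityLimiter[T]) Size() int     { return int(l.size.Load()) }
func (l *queueCapacityLimiter[T]) Capacity() int { return int(l.cap) }

This also explains why Offer now claims before sending rather than relying on the channel's buffer: the channel still has capacity slots, but with an items-based sizer the limiter may refuse an element well before the channel is full. Assuming every element measures at least one unit, a successful claim guarantees a free slot, so the send never blocks.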
72 changes: 33 additions & 39 deletions exporter/exporterhelper/internal/bounded_memory_queue_test.go
@@ -8,7 +8,6 @@ package internal
 import (
 	"context"
 	"errors"
-	"fmt"
 	"strconv"
 	"sync"
 	"testing"
@@ -23,7 +22,7 @@ import (
 // We want to test the overflow behavior, so we block the consumer
 // by holding a startLock before submitting items to the queue.
 func TestBoundedQueue(t *testing.T) {
-	q := NewBoundedMemoryQueue[string](1)
+	q := NewBoundedMemoryQueue[string](&RequestSizer[string]{}, 1)
 
 	assert.NoError(t, q.Offer(context.Background(), "a"))
 
@@ -73,7 +72,7 @@ func TestBoundedQueue(t *testing.T) {
 // only after Stop will mean the consumers are still locked while
 // trying to perform the final consumptions.
 func TestShutdownWhileNotEmpty(t *testing.T) {
-	q := NewBoundedMemoryQueue[string](1000)
+	q := NewBoundedMemoryQueue[string](&RequestSizer[string]{}, 1000)
 
 	assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
 	for i := 0; i < 10; i++ {
@@ -98,75 +97,70 @@ func TestShutdownWhileNotEmpty(t *testing.T) {
 	}))
 }
 
-func Benchmark_QueueUsage_10000_1_50000(b *testing.B) {
-	benchmarkQueueUsage(b, 10000, 1, 50000)
+func Benchmark_QueueUsage_10000_requests_1_50000(b *testing.B) {
+	benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 10000, 1, 50000)
 }
 
-func Benchmark_QueueUsage_10000_2_50000(b *testing.B) {
-	benchmarkQueueUsage(b, 10000, 2, 50000)
-}
-func Benchmark_QueueUsage_10000_5_50000(b *testing.B) {
-	benchmarkQueueUsage(b, 10000, 5, 50000)
-}
-func Benchmark_QueueUsage_10000_10_50000(b *testing.B) {
-	benchmarkQueueUsage(b, 10000, 10, 50000)
+func Benchmark_QueueUsage_10000_requests_10_50000(b *testing.B) {
+	benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 10000, 10, 50000)
 }
 
-func Benchmark_QueueUsage_50000_1_50000(b *testing.B) {
-	benchmarkQueueUsage(b, 50000, 1, 50000)
+func Benchmark_QueueUsage_50000_requests_1_50000(b *testing.B) {
+	benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 50000, 1, 50000)
}
 
-func Benchmark_QueueUsage_50000_2_50000(b *testing.B) {
-	benchmarkQueueUsage(b, 50000, 2, 50000)
-}
-func Benchmark_QueueUsage_50000_5_50000(b *testing.B) {
-	benchmarkQueueUsage(b, 50000, 5, 50000)
+func Benchmark_QueueUsage_50000_requests_10_50000(b *testing.B) {
+	benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 50000, 10, 50000)
 }
-func Benchmark_QueueUsage_50000_10_50000(b *testing.B) {
-	benchmarkQueueUsage(b, 50000, 10, 50000)
+
+func Benchmark_QueueUsage_10000_requests_1_250000(b *testing.B) {
+	benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 10000, 1, 250000)
 }
 
-func Benchmark_QueueUsage_10000_1_250000(b *testing.B) {
-	benchmarkQueueUsage(b, 10000, 1, 250000)
+func Benchmark_QueueUsage_10000_requests_10_250000(b *testing.B) {
+	benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 10000, 10, 250000)
 }
 
-func Benchmark_QueueUsage_10000_2_250000(b *testing.B) {
-	benchmarkQueueUsage(b, 10000, 2, 250000)
+func Benchmark_QueueUsage_1M_items_10_250k(b *testing.B) {
+	benchmarkQueueUsage(b, &ItemsSizer[fakeReq]{}, 1000000, 10, 250000)
 }
-func Benchmark_QueueUsage_10000_5_250000(b *testing.B) {
-	benchmarkQueueUsage(b, 10000, 5, 250000)
+
+func Benchmark_QueueUsage_1M_items_10_1M(b *testing.B) {
+	benchmarkQueueUsage(b, &ItemsSizer[fakeReq]{}, 1000000, 10, 1000000)
 }
-func Benchmark_QueueUsage_10000_10_250000(b *testing.B) {
-	benchmarkQueueUsage(b, 10000, 10, 250000)
+
+func Benchmark_QueueUsage_100M_items_10_10M(b *testing.B) {
+	benchmarkQueueUsage(b, &ItemsSizer[fakeReq]{}, 100000000, 10, 10000000)
 }
 
 func TestQueueUsage(t *testing.T) {
 	t.Run("with enough workers", func(t *testing.T) {
-		queueUsage(t, 10000, 5, 1000)
+		queueUsage(t, &RequestSizer[fakeReq]{}, 10000, 5, 1000)
 	})
 	t.Run("past capacity", func(t *testing.T) {
-		queueUsage(t, 10000, 2, 50000)
+		queueUsage(t, &RequestSizer[fakeReq]{}, 10000, 2, 50000)
 	})
 }
 
-func benchmarkQueueUsage(b *testing.B, capacity int, numConsumers int, numberOfItems int) {
+func benchmarkQueueUsage(b *testing.B, sizer Sizer[fakeReq], capacity int, numConsumers int,
+	numberOfItems int) {
 	b.ReportAllocs()
 	for i := 0; i < b.N; i++ {
-		queueUsage(b, capacity, numConsumers, numberOfItems)
+		queueUsage(b, sizer, capacity, numConsumers, numberOfItems)
 	}
 }
 
-func queueUsage(tb testing.TB, capacity int, numConsumers int, numberOfItems int) {
+func queueUsage(tb testing.TB, sizer Sizer[fakeReq], capacity int, numConsumers int, numberOfItems int) {
 	var wg sync.WaitGroup
 	wg.Add(numberOfItems)
-	q := NewBoundedMemoryQueue[string](capacity)
-	consumers := NewQueueConsumers(q, numConsumers, func(context.Context, string) error {
+	q := NewBoundedMemoryQueue[fakeReq](sizer, capacity)
+	consumers := NewQueueConsumers(q, numConsumers, func(context.Context, fakeReq) error {
 		wg.Done()
 		return nil
 	})
 	require.NoError(tb, consumers.Start(context.Background(), componenttest.NewNopHost()))
 	for j := 0; j < numberOfItems; j++ {
-		if err := q.Offer(context.Background(), fmt.Sprintf("%d", j)); errors.Is(err, ErrQueueIsFull) {
+		if err := q.Offer(context.Background(), fakeReq{10}); errors.Is(err, ErrQueueIsFull) {
 			wg.Done()
 		}
 	}
@@ -176,7 +170,7 @@ func queueUsage(tb testing.TB, capacity int, numConsumers int, numberOfItems int
 }
 
 func TestZeroSizeNoConsumers(t *testing.T) {
-	q := NewBoundedMemoryQueue[string](0)
+	q := NewBoundedMemoryQueue[string](&RequestSizer[string]{}, 0)
 
 	err := q.Start(context.Background(), componenttest.NewNopHost())
 	assert.NoError(t, err)
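
fakeReq, which the updated tests enqueue as fakeReq{10}, is defined in test code not expanded on this page. Given that it is accepted by ItemsSizer[fakeReq], a plausible minimal shape would be the following; the field name is a guess:

// Hypothetical test helper: a request that reports how many items it carries.
type fakeReq struct {
	itemsCount int
}

func (r fakeReq) ItemsCount() int { return r.itemsCount }

Under an items-based sizer, each fakeReq{10} then occupies 10 units of queue capacity, so the 1M-item benchmarks admit at most 100,000 requests in flight at a time, which keeps them roughly comparable to the requests-based runs with smaller capacities.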