Skip to content

Commit

Permalink
[chore] [exporterhelper] Add an option for items based queue sizing
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitryax committed Dec 20, 2023
1 parent 44fbb84 commit 5d5ac37
Show file tree
Hide file tree
Showing 7 changed files with 536 additions and 163 deletions.
23 changes: 8 additions & 15 deletions exporter/exporterhelper/internal/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,26 @@ import (
// the producer are dropped.
type boundedMemoryQueue[T any] struct {
component.StartFunc
QueueCapacityLimiter[T]
items chan queueRequest[T]
}

// NewBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional
// callback for dropped items (e.g. useful to emit metrics).
func NewBoundedMemoryQueue[T any](capacity int) Queue[T] {
func NewBoundedMemoryQueue[T any](capacityLimiter QueueCapacityLimiter[T]) Queue[T] {
return &boundedMemoryQueue[T]{
items: make(chan queueRequest[T], capacity),
QueueCapacityLimiter: capacityLimiter,
items: make(chan queueRequest[T], capacityLimiter.Capacity()),
}
}

// Offer is used by the producer to submit new item to the queue. Calling this method on a stopped queue will panic.
func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error {
select {
case q.items <- queueRequest[T]{ctx: ctx, req: req}:
return nil
default:
if !q.QueueCapacityLimiter.claim(req) {
return ErrQueueIsFull
}
q.items <- queueRequest[T]{ctx: ctx, req: req}
return nil
}

// Consume applies the provided function on the head of queue.
Expand All @@ -45,6 +46,7 @@ func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) err
if !ok {
return false
}
q.QueueCapacityLimiter.release(item.req)
// the memory queue doesn't handle consume errors
_ = consumeFunc(item.ctx, item.req)
return true
Expand All @@ -56,15 +58,6 @@ func (q *boundedMemoryQueue[T]) Shutdown(context.Context) error {
return nil
}

// Size returns the current size of the queue
func (q *boundedMemoryQueue[T]) Size() int {
return len(q.items)
}

func (q *boundedMemoryQueue[T]) Capacity() int {
return cap(q.items)
}

type queueRequest[T any] struct {
req T
ctx context.Context
Expand Down
71 changes: 32 additions & 39 deletions exporter/exporterhelper/internal/bounded_memory_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package internal
import (
"context"
"errors"
"fmt"
"strconv"
"sync"
"testing"
Expand All @@ -23,7 +22,7 @@ import (
// We want to test the overflow behavior, so we block the consumer
// by holding a startLock before submitting items to the queue.
func TestBoundedQueue(t *testing.T) {
q := NewBoundedMemoryQueue[string](1)
q := NewBoundedMemoryQueue[string](NewRequestsCapacityLimiter[string](1))

assert.NoError(t, q.Offer(context.Background(), "a"))

Expand Down Expand Up @@ -73,7 +72,7 @@ func TestBoundedQueue(t *testing.T) {
// only after Stop will mean the consumers are still locked while
// trying to perform the final consumptions.
func TestShutdownWhileNotEmpty(t *testing.T) {
q := NewBoundedMemoryQueue[string](1000)
q := NewBoundedMemoryQueue[string](NewRequestsCapacityLimiter[string](1000))

assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
for i := 0; i < 10; i++ {
Expand All @@ -98,75 +97,69 @@ func TestShutdownWhileNotEmpty(t *testing.T) {
}))
}

func Benchmark_QueueUsage_10000_1_50000(b *testing.B) {
benchmarkQueueUsage(b, 10000, 1, 50000)
func Benchmark_QueueUsage_10000_requests_1_50000(b *testing.B) {
benchmarkQueueUsage(b, NewRequestsCapacityLimiter[fakeReq](10000), 1, 50000)
}

func Benchmark_QueueUsage_10000_2_50000(b *testing.B) {
benchmarkQueueUsage(b, 10000, 2, 50000)
}
func Benchmark_QueueUsage_10000_5_50000(b *testing.B) {
benchmarkQueueUsage(b, 10000, 5, 50000)
}
func Benchmark_QueueUsage_10000_10_50000(b *testing.B) {
benchmarkQueueUsage(b, 10000, 10, 50000)
func Benchmark_QueueUsage_10000_requests_10_50000(b *testing.B) {
benchmarkQueueUsage(b, NewRequestsCapacityLimiter[fakeReq](10000), 10, 50000)
}

func Benchmark_QueueUsage_50000_1_50000(b *testing.B) {
benchmarkQueueUsage(b, 50000, 1, 50000)
func Benchmark_QueueUsage_50000_requests_1_50000(b *testing.B) {
benchmarkQueueUsage(b, NewRequestsCapacityLimiter[fakeReq](50000), 1, 50000)
}

func Benchmark_QueueUsage_50000_2_50000(b *testing.B) {
benchmarkQueueUsage(b, 50000, 2, 50000)
}
func Benchmark_QueueUsage_50000_5_50000(b *testing.B) {
benchmarkQueueUsage(b, 50000, 5, 50000)
func Benchmark_QueueUsage_50000_requests_10_50000(b *testing.B) {
benchmarkQueueUsage(b, NewRequestsCapacityLimiter[fakeReq](50000), 10, 50000)
}
func Benchmark_QueueUsage_50000_10_50000(b *testing.B) {
benchmarkQueueUsage(b, 50000, 10, 50000)

func Benchmark_QueueUsage_10000_requests_1_250000(b *testing.B) {
benchmarkQueueUsage(b, NewRequestsCapacityLimiter[fakeReq](10000), 1, 250000)
}

func Benchmark_QueueUsage_10000_1_250000(b *testing.B) {
benchmarkQueueUsage(b, 10000, 1, 250000)
func Benchmark_QueueUsage_10000_requests_10_250000(b *testing.B) {
benchmarkQueueUsage(b, NewRequestsCapacityLimiter[fakeReq](10000), 10, 250000)
}

func Benchmark_QueueUsage_10000_2_250000(b *testing.B) {
benchmarkQueueUsage(b, 10000, 2, 250000)
func Benchmark_QueueUsage_1M_items_10_250k(b *testing.B) {
benchmarkQueueUsage(b, NewItemsCapacityLimiter[fakeReq](1000000), 10, 250000)
}
func Benchmark_QueueUsage_10000_5_250000(b *testing.B) {
benchmarkQueueUsage(b, 10000, 5, 250000)

func Benchmark_QueueUsage_1M_items_10_1M(b *testing.B) {
benchmarkQueueUsage(b, NewItemsCapacityLimiter[fakeReq](1000000), 10, 1000000)
}
func Benchmark_QueueUsage_10000_10_250000(b *testing.B) {
benchmarkQueueUsage(b, 10000, 10, 250000)

func Benchmark_QueueUsage_100M_items_10_10M(b *testing.B) {
benchmarkQueueUsage(b, NewItemsCapacityLimiter[fakeReq](100000000), 10, 10000000)
}

func TestQueueUsage(t *testing.T) {
t.Run("with enough workers", func(t *testing.T) {
queueUsage(t, 10000, 5, 1000)
queueUsage(t, NewRequestsCapacityLimiter[fakeReq](10000), 5, 1000)
})
t.Run("past capacity", func(t *testing.T) {
queueUsage(t, 10000, 2, 50000)
queueUsage(t, NewRequestsCapacityLimiter[fakeReq](10000), 2, 50000)
})
}

func benchmarkQueueUsage(b *testing.B, capacity int, numConsumers int, numberOfItems int) {
func benchmarkQueueUsage(b *testing.B, capacityLimiter QueueCapacityLimiter[fakeReq], numConsumers int, numberOfItems int) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
queueUsage(b, capacity, numConsumers, numberOfItems)
queueUsage(b, capacityLimiter, numConsumers, numberOfItems)
}
}

func queueUsage(tb testing.TB, capacity int, numConsumers int, numberOfItems int) {
func queueUsage(tb testing.TB, capacityLimiter QueueCapacityLimiter[fakeReq], numConsumers int, numberOfItems int) {
var wg sync.WaitGroup
wg.Add(numberOfItems)
q := NewBoundedMemoryQueue[string](capacity)
consumers := NewQueueConsumers(q, numConsumers, func(context.Context, string) error {
q := NewBoundedMemoryQueue[fakeReq](capacityLimiter)
consumers := NewQueueConsumers(q, numConsumers, func(context.Context, fakeReq) error {
wg.Done()
return nil
})
require.NoError(tb, consumers.Start(context.Background(), componenttest.NewNopHost()))
for j := 0; j < numberOfItems; j++ {
if err := q.Offer(context.Background(), fmt.Sprintf("%d", j)); errors.Is(err, ErrQueueIsFull) {
if err := q.Offer(context.Background(), fakeReq{10}); errors.Is(err, ErrQueueIsFull) {
wg.Done()
}
}
Expand All @@ -176,7 +169,7 @@ func queueUsage(tb testing.TB, capacity int, numConsumers int, numberOfItems int
}

func TestZeroSizeNoConsumers(t *testing.T) {
q := NewBoundedMemoryQueue[string](0)
q := NewBoundedMemoryQueue[string](NewRequestsCapacityLimiter[string](0))

err := q.Start(context.Background(), componenttest.NewNopHost())
assert.NoError(t, err)
Expand Down
Loading

0 comments on commit 5d5ac37

Please sign in to comment.