diff --git a/exporter/exporterhelper/internal/bounded_memory_queue_test.go b/exporter/exporterhelper/internal/bounded_memory_queue_test.go index 176484ec034..86ad732e2dd 100644 --- a/exporter/exporterhelper/internal/bounded_memory_queue_test.go +++ b/exporter/exporterhelper/internal/bounded_memory_queue_test.go @@ -7,10 +7,11 @@ package internal import ( "context" + "errors" "fmt" "strconv" + "sync" "testing" - "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -98,61 +99,80 @@ func TestShutdownWhileNotEmpty(t *testing.T) { } func Benchmark_QueueUsage_10000_1_50000(b *testing.B) { - queueUsage(b, 10000, 1, 50000) + benchmarkQueueUsage(b, 10000, 1, 50000) } func Benchmark_QueueUsage_10000_2_50000(b *testing.B) { - queueUsage(b, 10000, 2, 50000) + benchmarkQueueUsage(b, 10000, 2, 50000) } func Benchmark_QueueUsage_10000_5_50000(b *testing.B) { - queueUsage(b, 10000, 5, 50000) + benchmarkQueueUsage(b, 10000, 5, 50000) } func Benchmark_QueueUsage_10000_10_50000(b *testing.B) { - queueUsage(b, 10000, 10, 50000) + benchmarkQueueUsage(b, 10000, 10, 50000) } func Benchmark_QueueUsage_50000_1_50000(b *testing.B) { - queueUsage(b, 50000, 1, 50000) + benchmarkQueueUsage(b, 50000, 1, 50000) } func Benchmark_QueueUsage_50000_2_50000(b *testing.B) { - queueUsage(b, 50000, 2, 50000) + benchmarkQueueUsage(b, 50000, 2, 50000) } func Benchmark_QueueUsage_50000_5_50000(b *testing.B) { - queueUsage(b, 50000, 5, 50000) + benchmarkQueueUsage(b, 50000, 5, 50000) } func Benchmark_QueueUsage_50000_10_50000(b *testing.B) { - queueUsage(b, 50000, 10, 50000) + benchmarkQueueUsage(b, 50000, 10, 50000) } func Benchmark_QueueUsage_10000_1_250000(b *testing.B) { - queueUsage(b, 10000, 1, 250000) + benchmarkQueueUsage(b, 10000, 1, 250000) } func Benchmark_QueueUsage_10000_2_250000(b *testing.B) { - queueUsage(b, 10000, 2, 250000) + benchmarkQueueUsage(b, 10000, 2, 250000) } func Benchmark_QueueUsage_10000_5_250000(b *testing.B) { - queueUsage(b, 10000, 5, 250000) + benchmarkQueueUsage(b, 10000, 5, 250000) } func Benchmark_QueueUsage_10000_10_250000(b *testing.B) { - queueUsage(b, 10000, 10, 250000) + benchmarkQueueUsage(b, 10000, 10, 250000) } -func queueUsage(b *testing.B, capacity int, numConsumers int, numberOfItems int) { +func TestQueueUsage(t *testing.T) { + t.Run("with enough workers", func(t *testing.T) { + queueUsage(t, 10000, 5, 1000) + }) + t.Run("past capacity", func(t *testing.T) { + queueUsage(t, 10000, 2, 50000) + }) +} + +func benchmarkQueueUsage(b *testing.B, capacity int, numConsumers int, numberOfItems int) { b.ReportAllocs() for i := 0; i < b.N; i++ { - q := NewBoundedMemoryQueue[string](capacity) - consumers := NewQueueConsumers(q, numConsumers, func(context.Context, string) error { - time.Sleep(1 * time.Millisecond) - return nil - }) - require.NoError(b, consumers.Start(context.Background(), componenttest.NewNopHost())) - for j := 0; j < numberOfItems; j++ { - _ = q.Offer(context.Background(), fmt.Sprintf("%d", j)) + queueUsage(b, capacity, numConsumers, numberOfItems) + } +} + +func queueUsage(tb testing.TB, capacity int, numConsumers int, numberOfItems int) { + var wg sync.WaitGroup + wg.Add(numberOfItems) + q := NewBoundedMemoryQueue[string](capacity) + consumers := NewQueueConsumers(q, numConsumers, func(context.Context, string) error { + wg.Done() + return nil + }) + require.NoError(tb, consumers.Start(context.Background(), componenttest.NewNopHost())) + for j := 0; j < numberOfItems; j++ { + if err := q.Offer(context.Background(), fmt.Sprintf("%d", j)); errors.Is(err, ErrQueueIsFull) { + wg.Done() } - assert.NoError(b, consumers.Shutdown(context.Background())) } + assert.NoError(tb, consumers.Shutdown(context.Background())) + + wg.Wait() } func TestZeroSizeNoConsumers(t *testing.T) {