diff --git a/exporter/internal/queue/bounded_memory_queue.go b/exporter/internal/queue/bounded_memory_queue.go index ae62704ff5b..43b6baf7d02 100644 --- a/exporter/internal/queue/bounded_memory_queue.go +++ b/exporter/internal/queue/bounded_memory_queue.go @@ -40,17 +40,9 @@ func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error { return q.sizedChannel.push(memQueueEl[T]{ctx: ctx, req: req}, q.sizer.Sizeof(req), nil) } -// Consume applies the provided function on the head of queue. -// The call blocks until there is an item available or the queue is stopped. -// The function returns true when an item is consumed or false if the queue is stopped and emptied. -func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool { +func (q *boundedMemoryQueue[T]) Read(_ context.Context) (uint64, context.Context, T, bool) { item, ok := q.sizedChannel.pop(func(el memQueueEl[T]) int64 { return q.sizer.Sizeof(el.req) }) - if !ok { - return false - } - // the memory queue doesn't handle consume errors - _ = consumeFunc(item.ctx, item.req) - return true + return 0, item.ctx, item.req, ok } // Should be called to remove the item of the given index from the queue once processing is finished. diff --git a/exporter/internal/queue/bounded_memory_queue_test.go b/exporter/internal/queue/bounded_memory_queue_test.go index 3884a50cc84..4ffbf0c9738 100644 --- a/exporter/internal/queue/bounded_memory_queue_test.go +++ b/exporter/internal/queue/bounded_memory_queue_test.go @@ -26,7 +26,7 @@ func TestBoundedQueue(t *testing.T) { require.NoError(t, q.Offer(context.Background(), "a")) numConsumed := 0 - assert.True(t, q.Consume(func(_ context.Context, item string) error { + assert.True(t, consume(q, func(_ context.Context, item string) error { assert.Equal(t, "a", item) numConsumed++ return nil @@ -42,7 +42,7 @@ func TestBoundedQueue(t *testing.T) { require.ErrorIs(t, q.Offer(context.Background(), "c"), ErrQueueIsFull) assert.Equal(t, 1, q.Size()) - assert.True(t, q.Consume(func(_ context.Context, item string) error { + assert.True(t, consume(q, func(_ context.Context, item string) error { assert.Equal(t, "b", item) numConsumed++ return nil @@ -51,7 +51,7 @@ func TestBoundedQueue(t *testing.T) { for _, toAddItem := range []string{"d", "e", "f"} { require.NoError(t, q.Offer(context.Background(), toAddItem)) - assert.True(t, q.Consume(func(_ context.Context, item string) error { + assert.True(t, consume(q, func(_ context.Context, item string) error { assert.Equal(t, toAddItem, item) numConsumed++ return nil @@ -59,7 +59,7 @@ func TestBoundedQueue(t *testing.T) { } assert.Equal(t, 5, numConsumed) require.NoError(t, q.Shutdown(context.Background())) - assert.False(t, q.Consume(func(_ context.Context, item string) error { + assert.False(t, consume(q, func(_ context.Context, item string) error { panic(item) })) } @@ -82,7 +82,7 @@ func TestShutdownWhileNotEmpty(t *testing.T) { assert.Equal(t, 10, q.Size()) numConsumed := 0 for i := 0; i < 10; i++ { - assert.True(t, q.Consume(func(_ context.Context, item string) error { + assert.True(t, consume(q, func(_ context.Context, item string) error { assert.Equal(t, strconv.FormatInt(int64(i), 10), item) numConsumed++ return nil @@ -91,7 +91,7 @@ func TestShutdownWhileNotEmpty(t *testing.T) { assert.Equal(t, 10, numConsumed) assert.Equal(t, 0, q.Size()) - assert.False(t, q.Consume(func(_ context.Context, item string) error { + assert.False(t, consume(q, func(_ context.Context, item string) error { panic(item) })) } diff --git a/exporter/internal/queue/consumers.go b/exporter/internal/queue/consumers.go index 7c57fea9620..8eab2db6bbe 100644 --- a/exporter/internal/queue/consumers.go +++ b/exporter/internal/queue/consumers.go @@ -40,9 +40,12 @@ func (qc *Consumers[T]) Start(ctx context.Context, host component.Host) error { startWG.Done() defer qc.stopWG.Done() for { - if !qc.queue.Consume(qc.consumeFunc) { + index, ctx, req, ok := qc.queue.Read(context.Background()) + if !ok { return } + consumeErr := qc.consumeFunc(ctx, req) + qc.queue.OnProcessingFinished(index, consumeErr) } }() } diff --git a/exporter/internal/queue/persistent_queue.go b/exporter/internal/queue/persistent_queue.go index 31133b4b2d6..e378e41699c 100644 --- a/exporter/internal/queue/persistent_queue.go +++ b/exporter/internal/queue/persistent_queue.go @@ -189,37 +189,6 @@ func (pq *persistentQueue[T]) restoreQueueSizeFromStorage(ctx context.Context) ( return bytesToItemIndex(val) } -// Consume applies the provided function on the head of queue. -// The call blocks until there is an item available or the queue is stopped. -// The function returns true when an item is consumed or false if the queue is stopped. -func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool { - for { - var ( - index uint64 - req T - consumed bool - ) - - // If we are stopped we still process all the other events in the channel before, but we - // return fast in the `getNextItem`, so we will free the channel fast and get to the stop. - _, ok := pq.sizedChannel.pop(func(permanentQueueEl) int64 { - index, req, consumed = pq.getNextItem(context.Background()) - if !consumed { - return 0 - } - return pq.set.Sizer.Sizeof(req) - }) - if !ok { - return false - } - if consumed { - consumeErr := consumeFunc(context.Background(), req) - pq.OnProcessingFinished(index, consumeErr) - return true - } - } -} - func (pq *persistentQueue[T]) Shutdown(ctx context.Context) error { // If the queue is not initialized, there is nothing to shut down. if pq.client == nil { @@ -304,6 +273,33 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error { return nil } +func (pq *persistentQueue[T]) Read(ctx context.Context) (uint64, context.Context, T, bool) { + for { + var ( + index uint64 + req T + consumed bool + ) + _, ok := pq.sizedChannel.pop(func(permanentQueueEl) int64 { + size := int64(0) + index, req, consumed = pq.getNextItem(ctx) + if consumed { + size = pq.set.Sizer.Sizeof(req) + } + return size + }) + if !ok { + return 0, nil, req, false + } + if consumed { + return index, context.TODO(), req, true + } + + // If ok && !consumed, it means we are stopped. In this case, we still process all the other events + // in the channel before, so we will free the channel fast and get to the stop. + } +} + // getNextItem pulls the next available item from the persistent storage along with its index. Once processing is // finished, the index should be called with OnProcessingFinished to clean up the storage. If no new item is available, // returns false. diff --git a/exporter/internal/queue/persistent_queue_test.go b/exporter/internal/queue/persistent_queue_test.go index 61c9285d40b..81945d295c9 100644 --- a/exporter/internal/queue/persistent_queue_test.go +++ b/exporter/internal/queue/persistent_queue_test.go @@ -413,7 +413,7 @@ func TestPersistentQueue_CorruptedData(t *testing.T) { require.NoError(t, err) } assert.Equal(t, 3, ps.Size()) - require.True(t, ps.Consume(func(context.Context, tracesRequest) error { + require.True(t, consume(ps, func(context.Context, tracesRequest) error { return experr.NewShutdownErr(nil) })) assert.Equal(t, 2, ps.Size()) @@ -486,7 +486,7 @@ func TestPersistentQueue_CurrentlyProcessedItems(t *testing.T) { // We should be able to pull all remaining items now for i := 0; i < 4; i++ { - newPs.Consume(func(_ context.Context, traces tracesRequest) error { + consume(newPs, func(_ context.Context, traces tracesRequest) error { assert.Equal(t, req, traces) return nil }) @@ -520,7 +520,7 @@ func TestPersistentQueueStartWithNonDispatched(t *testing.T) { require.NoError(t, err) } - require.True(t, ps.Consume(func(context.Context, tracesRequest) error { + require.True(t, consume(ps, func(context.Context, tracesRequest) error { // put one more item in require.NoError(t, ps.Offer(context.Background(), req)) require.Equal(t, 5, ps.Size()) @@ -564,7 +564,7 @@ func TestPersistentQueueStartWithNonDispatchedConcurrent(t *testing.T) { go func() { defer conWg.Done() for i := 0; i < 10; i++ { - assert.True(t, pq.Consume(func(context.Context, tracesRequest) error { return nil })) + assert.True(t, consume(pq, func(context.Context, tracesRequest) error { return nil })) } }() } @@ -615,13 +615,13 @@ func TestPersistentQueue_PutCloseReadClose(t *testing.T) { require.Equal(t, 2, newPs.Size()) // Let's read both of the elements we put - newPs.Consume(func(_ context.Context, traces tracesRequest) error { + consume(newPs, func(_ context.Context, traces tracesRequest) error { require.Equal(t, req, traces) return nil }) assert.Equal(t, 1, newPs.Size()) - newPs.Consume(func(_ context.Context, traces tracesRequest) error { + consume(newPs, func(_ context.Context, traces tracesRequest) error { require.Equal(t, req, traces) return nil }) @@ -663,7 +663,7 @@ func BenchmarkPersistentQueue_TraceSpans(b *testing.B) { } for i := 0; i < bb.N; i++ { - require.True(bb, ps.Consume(func(context.Context, tracesRequest) error { return nil })) + require.True(bb, consume(ps, func(context.Context, tracesRequest) error { return nil })) } require.NoError(b, ext.Shutdown(context.Background())) }) @@ -781,7 +781,7 @@ func TestPersistentQueue_StorageFull(t *testing.T) { // Subsequent items succeed, as deleting the first item frees enough space for the state update reqCount-- for i := reqCount; i > 0; i-- { - require.True(t, ps.Consume(func(context.Context, tracesRequest) error { return nil })) + require.True(t, consume(ps, func(context.Context, tracesRequest) error { return nil })) } // We should be able to put a new item in @@ -856,7 +856,7 @@ func TestPersistentQueue_ItemsCapacityUsageRestoredOnShutdown(t *testing.T) { require.ErrorIs(t, pq.Offer(context.Background(), newTracesRequest(5, 5)), ErrQueueIsFull) assert.Equal(t, 100, pq.Size()) - assert.True(t, pq.Consume(func(_ context.Context, traces tracesRequest) error { + assert.True(t, consume(pq, func(_ context.Context, traces tracesRequest) error { assert.Equal(t, 40, traces.traces.SpanCount()) return nil })) @@ -874,13 +874,13 @@ func TestPersistentQueue_ItemsCapacityUsageRestoredOnShutdown(t *testing.T) { // Check the combined queue size. assert.Equal(t, 70, newPQ.Size()) - assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error { + assert.True(t, consume(newPQ, func(_ context.Context, traces tracesRequest) error { assert.Equal(t, 40, traces.traces.SpanCount()) return nil })) assert.Equal(t, 30, newPQ.Size()) - assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error { + assert.True(t, consume(newPQ, func(_ context.Context, traces tracesRequest) error { assert.Equal(t, 20, traces.traces.SpanCount()) return nil })) @@ -901,7 +901,7 @@ func TestPersistentQueue_ItemsCapacityUsageIsNotPreserved(t *testing.T) { assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(5, 5))) assert.Equal(t, 3, pq.Size()) - assert.True(t, pq.Consume(func(_ context.Context, traces tracesRequest) error { + assert.True(t, consume(pq, func(_ context.Context, traces tracesRequest) error { assert.Equal(t, 40, traces.traces.SpanCount()) return nil })) @@ -920,14 +920,14 @@ func TestPersistentQueue_ItemsCapacityUsageIsNotPreserved(t *testing.T) { assert.Equal(t, 12, newPQ.Size()) // Consuming a restored request should reduce the restored size by 20 but it should not go to below zero - assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error { + assert.True(t, consume(newPQ, func(_ context.Context, traces tracesRequest) error { assert.Equal(t, 20, traces.traces.SpanCount()) return nil })) assert.Equal(t, 0, newPQ.Size()) // Consuming another restored request should not affect the restored size since it's already dropped to 0. - assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error { + assert.True(t, consume(newPQ, func(_ context.Context, traces tracesRequest) error { assert.Equal(t, 25, traces.traces.SpanCount()) return nil })) @@ -937,7 +937,7 @@ func TestPersistentQueue_ItemsCapacityUsageIsNotPreserved(t *testing.T) { require.NoError(t, newPQ.Offer(context.Background(), newTracesRequest(5, 5))) assert.Equal(t, 25, newPQ.Size()) - assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error { + assert.True(t, consume(newPQ, func(_ context.Context, traces tracesRequest) error { assert.Equal(t, 10, traces.traces.SpanCount()) return nil })) @@ -961,7 +961,7 @@ func TestPersistentQueue_RequestCapacityLessAfterRestart(t *testing.T) { // Read the first request just to populate the read index in the storage. // Otherwise, the write index won't be restored either. - assert.True(t, pq.Consume(func(_ context.Context, traces tracesRequest) error { + assert.True(t, consume(pq, func(_ context.Context, traces tracesRequest) error { assert.Equal(t, 40, traces.traces.SpanCount()) return nil })) @@ -979,7 +979,7 @@ func TestPersistentQueue_RequestCapacityLessAfterRestart(t *testing.T) { // Queue is full require.Error(t, newPQ.Offer(context.Background(), newTracesRequest(2, 5))) - assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error { + assert.True(t, consume(newPQ, func(_ context.Context, traces tracesRequest) error { assert.Equal(t, 20, traces.traces.SpanCount()) return nil })) @@ -988,7 +988,7 @@ func TestPersistentQueue_RequestCapacityLessAfterRestart(t *testing.T) { // Still full require.Error(t, newPQ.Offer(context.Background(), newTracesRequest(2, 5))) - assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error { + assert.True(t, consume(newPQ, func(_ context.Context, traces tracesRequest) error { assert.Equal(t, 25, traces.traces.SpanCount()) return nil })) @@ -1015,7 +1015,7 @@ func TestPersistentQueue_RestoredUsedSizeIsCorrectedOnDrain(t *testing.T) { // Consume 30 items for i := 0; i < 3; i++ { - assert.True(t, pq.Consume(func(context.Context, tracesRequest) error { return nil })) + assert.True(t, consume(pq, func(context.Context, tracesRequest) error { return nil })) } // The used size is now 30, but the snapshot should have 50, because it's taken every 5 read/writes. assert.Equal(t, 30, pq.Size()) @@ -1027,12 +1027,12 @@ func TestPersistentQueue_RestoredUsedSizeIsCorrectedOnDrain(t *testing.T) { // In reality the size should be 30. Once the queue is drained, it will be updated to the correct size. assert.Equal(t, 50, newPQ.Size()) - assert.True(t, newPQ.Consume(func(context.Context, tracesRequest) error { return nil })) - assert.True(t, newPQ.Consume(func(context.Context, tracesRequest) error { return nil })) + assert.True(t, consume(newPQ, func(context.Context, tracesRequest) error { return nil })) + assert.True(t, consume(newPQ, func(context.Context, tracesRequest) error { return nil })) assert.Equal(t, 30, newPQ.Size()) // Now the size must be correctly reflected - assert.True(t, newPQ.Consume(func(context.Context, tracesRequest) error { return nil })) + assert.True(t, consume(newPQ, func(context.Context, tracesRequest) error { return nil })) assert.Equal(t, 0, newPQ.Size()) assert.NoError(t, newPQ.Shutdown(context.Background())) diff --git a/exporter/internal/queue/queue.go b/exporter/internal/queue/queue.go index 54e8346a575..ee79408361a 100644 --- a/exporter/internal/queue/queue.go +++ b/exporter/internal/queue/queue.go @@ -25,14 +25,15 @@ type Queue[T any] interface { // without violating capacity restrictions. If success returns no error. // It returns ErrQueueIsFull if no space is currently available. Offer(ctx context.Context, item T) error - // Consume applies the provided function on the head of queue. - // The call blocks until there is an item available or the queue is stopped. - // The function returns true when an item is consumed or false if the queue is stopped. - Consume(func(ctx context.Context, item T) error) bool // Size returns the current Size of the queue Size() int // Capacity returns the capacity of the queue. Capacity() int + // Read pulls the next available item from the queue along with its index. Once processing is + // finished, the index should be called with OnProcessingFinished to clean up the storage. + // The function blocks until an item is available or if the queue is stopped. + // Returns false if reading failed or if the queue is stopped. + Read(context.Context) (uint64, context.Context, T, bool) // Should be called to remove the item of the given index from the queue once processing is finished. OnProcessingFinished(index uint64, consumeErr error) } diff --git a/exporter/internal/queue/queue_test.go b/exporter/internal/queue/queue_test.go new file mode 100644 index 00000000000..88276a82db9 --- /dev/null +++ b/exporter/internal/queue/queue_test.go @@ -0,0 +1,18 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package queue + +import ( + "context" +) + +func consume[T any](q Queue[T], consumeFunc func(context.Context, T) error) bool { + index, ctx, req, ok := q.Read(context.Background()) + if !ok { + return false + } + consumeErr := consumeFunc(ctx, req) + q.OnProcessingFinished(index, consumeErr) + return true +}