From 8cbe4d2aba9983435e186f2aa9f29386ebe19f19 Mon Sep 17 00:00:00 2001 From: Sindy Li Date: Fri, 11 Oct 2024 10:17:38 -0700 Subject: [PATCH] [exporter] exporter queue Read() (#11396) #### Description This PR adds a public function `GetNextItem` to queue (both persistent queue and bounded memory queue) Why this change? Instead of blocking until consumption of the item is done, we would like to separate the API for reading and committing consumption. Before: `Consume(consumeFunc)` After: `idx, item = Read()` `OnProcessingFinished(idx)` #### Link to tracking issue https://github.com/open-telemetry/opentelemetry-collector/issues/8122 https://github.com/open-telemetry/opentelemetry-collector/issues/10368 #### Testing #### Documentation --- .../internal/queue/bounded_memory_queue.go | 12 +--- .../queue/bounded_memory_queue_test.go | 12 ++-- exporter/internal/queue/consumers.go | 5 +- exporter/internal/queue/persistent_queue.go | 58 +++++++++---------- .../internal/queue/persistent_queue_test.go | 44 +++++++------- exporter/internal/queue/queue.go | 9 +-- exporter/internal/queue/queue_test.go | 18 ++++++ 7 files changed, 84 insertions(+), 74 deletions(-) create mode 100644 exporter/internal/queue/queue_test.go diff --git a/exporter/internal/queue/bounded_memory_queue.go b/exporter/internal/queue/bounded_memory_queue.go index ae62704ff5b..43b6baf7d02 100644 --- a/exporter/internal/queue/bounded_memory_queue.go +++ b/exporter/internal/queue/bounded_memory_queue.go @@ -40,17 +40,9 @@ func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error { return q.sizedChannel.push(memQueueEl[T]{ctx: ctx, req: req}, q.sizer.Sizeof(req), nil) } -// Consume applies the provided function on the head of queue. -// The call blocks until there is an item available or the queue is stopped. -// The function returns true when an item is consumed or false if the queue is stopped and emptied. -func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool { +func (q *boundedMemoryQueue[T]) Read(_ context.Context) (uint64, context.Context, T, bool) { item, ok := q.sizedChannel.pop(func(el memQueueEl[T]) int64 { return q.sizer.Sizeof(el.req) }) - if !ok { - return false - } - // the memory queue doesn't handle consume errors - _ = consumeFunc(item.ctx, item.req) - return true + return 0, item.ctx, item.req, ok } // Should be called to remove the item of the given index from the queue once processing is finished. diff --git a/exporter/internal/queue/bounded_memory_queue_test.go b/exporter/internal/queue/bounded_memory_queue_test.go index 3884a50cc84..4ffbf0c9738 100644 --- a/exporter/internal/queue/bounded_memory_queue_test.go +++ b/exporter/internal/queue/bounded_memory_queue_test.go @@ -26,7 +26,7 @@ func TestBoundedQueue(t *testing.T) { require.NoError(t, q.Offer(context.Background(), "a")) numConsumed := 0 - assert.True(t, q.Consume(func(_ context.Context, item string) error { + assert.True(t, consume(q, func(_ context.Context, item string) error { assert.Equal(t, "a", item) numConsumed++ return nil @@ -42,7 +42,7 @@ func TestBoundedQueue(t *testing.T) { require.ErrorIs(t, q.Offer(context.Background(), "c"), ErrQueueIsFull) assert.Equal(t, 1, q.Size()) - assert.True(t, q.Consume(func(_ context.Context, item string) error { + assert.True(t, consume(q, func(_ context.Context, item string) error { assert.Equal(t, "b", item) numConsumed++ return nil @@ -51,7 +51,7 @@ func TestBoundedQueue(t *testing.T) { for _, toAddItem := range []string{"d", "e", "f"} { require.NoError(t, q.Offer(context.Background(), toAddItem)) - assert.True(t, q.Consume(func(_ context.Context, item string) error { + assert.True(t, consume(q, func(_ context.Context, item string) error { assert.Equal(t, toAddItem, item) numConsumed++ return nil @@ -59,7 +59,7 @@ func TestBoundedQueue(t *testing.T) { } assert.Equal(t, 5, numConsumed) require.NoError(t, q.Shutdown(context.Background())) - assert.False(t, q.Consume(func(_ context.Context, item string) error { + assert.False(t, consume(q, func(_ context.Context, item string) error { panic(item) })) } @@ -82,7 +82,7 @@ func TestShutdownWhileNotEmpty(t *testing.T) { assert.Equal(t, 10, q.Size()) numConsumed := 0 for i := 0; i < 10; i++ { - assert.True(t, q.Consume(func(_ context.Context, item string) error { + assert.True(t, consume(q, func(_ context.Context, item string) error { assert.Equal(t, strconv.FormatInt(int64(i), 10), item) numConsumed++ return nil @@ -91,7 +91,7 @@ func TestShutdownWhileNotEmpty(t *testing.T) { assert.Equal(t, 10, numConsumed) assert.Equal(t, 0, q.Size()) - assert.False(t, q.Consume(func(_ context.Context, item string) error { + assert.False(t, consume(q, func(_ context.Context, item string) error { panic(item) })) } diff --git a/exporter/internal/queue/consumers.go b/exporter/internal/queue/consumers.go index 7c57fea9620..8eab2db6bbe 100644 --- a/exporter/internal/queue/consumers.go +++ b/exporter/internal/queue/consumers.go @@ -40,9 +40,12 @@ func (qc *Consumers[T]) Start(ctx context.Context, host component.Host) error { startWG.Done() defer qc.stopWG.Done() for { - if !qc.queue.Consume(qc.consumeFunc) { + index, ctx, req, ok := qc.queue.Read(context.Background()) + if !ok { return } + consumeErr := qc.consumeFunc(ctx, req) + qc.queue.OnProcessingFinished(index, consumeErr) } }() } diff --git a/exporter/internal/queue/persistent_queue.go b/exporter/internal/queue/persistent_queue.go index 31133b4b2d6..e378e41699c 100644 --- a/exporter/internal/queue/persistent_queue.go +++ b/exporter/internal/queue/persistent_queue.go @@ -189,37 +189,6 @@ func (pq *persistentQueue[T]) restoreQueueSizeFromStorage(ctx context.Context) ( return bytesToItemIndex(val) } -// Consume applies the provided function on the head of queue. -// The call blocks until there is an item available or the queue is stopped. -// The function returns true when an item is consumed or false if the queue is stopped. -func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool { - for { - var ( - index uint64 - req T - consumed bool - ) - - // If we are stopped we still process all the other events in the channel before, but we - // return fast in the `getNextItem`, so we will free the channel fast and get to the stop. - _, ok := pq.sizedChannel.pop(func(permanentQueueEl) int64 { - index, req, consumed = pq.getNextItem(context.Background()) - if !consumed { - return 0 - } - return pq.set.Sizer.Sizeof(req) - }) - if !ok { - return false - } - if consumed { - consumeErr := consumeFunc(context.Background(), req) - pq.OnProcessingFinished(index, consumeErr) - return true - } - } -} - func (pq *persistentQueue[T]) Shutdown(ctx context.Context) error { // If the queue is not initialized, there is nothing to shut down. if pq.client == nil { @@ -304,6 +273,33 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error { return nil } +func (pq *persistentQueue[T]) Read(ctx context.Context) (uint64, context.Context, T, bool) { + for { + var ( + index uint64 + req T + consumed bool + ) + _, ok := pq.sizedChannel.pop(func(permanentQueueEl) int64 { + size := int64(0) + index, req, consumed = pq.getNextItem(ctx) + if consumed { + size = pq.set.Sizer.Sizeof(req) + } + return size + }) + if !ok { + return 0, nil, req, false + } + if consumed { + return index, context.TODO(), req, true + } + + // If ok && !consumed, it means we are stopped. In this case, we still process all the other events + // in the channel before, so we will free the channel fast and get to the stop. + } +} + // getNextItem pulls the next available item from the persistent storage along with its index. Once processing is // finished, the index should be called with OnProcessingFinished to clean up the storage. If no new item is available, // returns false. diff --git a/exporter/internal/queue/persistent_queue_test.go b/exporter/internal/queue/persistent_queue_test.go index 61c9285d40b..81945d295c9 100644 --- a/exporter/internal/queue/persistent_queue_test.go +++ b/exporter/internal/queue/persistent_queue_test.go @@ -413,7 +413,7 @@ func TestPersistentQueue_CorruptedData(t *testing.T) { require.NoError(t, err) } assert.Equal(t, 3, ps.Size()) - require.True(t, ps.Consume(func(context.Context, tracesRequest) error { + require.True(t, consume(ps, func(context.Context, tracesRequest) error { return experr.NewShutdownErr(nil) })) assert.Equal(t, 2, ps.Size()) @@ -486,7 +486,7 @@ func TestPersistentQueue_CurrentlyProcessedItems(t *testing.T) { // We should be able to pull all remaining items now for i := 0; i < 4; i++ { - newPs.Consume(func(_ context.Context, traces tracesRequest) error { + consume(newPs, func(_ context.Context, traces tracesRequest) error { assert.Equal(t, req, traces) return nil }) @@ -520,7 +520,7 @@ func TestPersistentQueueStartWithNonDispatched(t *testing.T) { require.NoError(t, err) } - require.True(t, ps.Consume(func(context.Context, tracesRequest) error { + require.True(t, consume(ps, func(context.Context, tracesRequest) error { // put one more item in require.NoError(t, ps.Offer(context.Background(), req)) require.Equal(t, 5, ps.Size()) @@ -564,7 +564,7 @@ func TestPersistentQueueStartWithNonDispatchedConcurrent(t *testing.T) { go func() { defer conWg.Done() for i := 0; i < 10; i++ { - assert.True(t, pq.Consume(func(context.Context, tracesRequest) error { return nil })) + assert.True(t, consume(pq, func(context.Context, tracesRequest) error { return nil })) } }() } @@ -615,13 +615,13 @@ func TestPersistentQueue_PutCloseReadClose(t *testing.T) { require.Equal(t, 2, newPs.Size()) // Let's read both of the elements we put - newPs.Consume(func(_ context.Context, traces tracesRequest) error { + consume(newPs, func(_ context.Context, traces tracesRequest) error { require.Equal(t, req, traces) return nil }) assert.Equal(t, 1, newPs.Size()) - newPs.Consume(func(_ context.Context, traces tracesRequest) error { + consume(newPs, func(_ context.Context, traces tracesRequest) error { require.Equal(t, req, traces) return nil }) @@ -663,7 +663,7 @@ func BenchmarkPersistentQueue_TraceSpans(b *testing.B) { } for i := 0; i < bb.N; i++ { - require.True(bb, ps.Consume(func(context.Context, tracesRequest) error { return nil })) + require.True(bb, consume(ps, func(context.Context, tracesRequest) error { return nil })) } require.NoError(b, ext.Shutdown(context.Background())) }) @@ -781,7 +781,7 @@ func TestPersistentQueue_StorageFull(t *testing.T) { // Subsequent items succeed, as deleting the first item frees enough space for the state update reqCount-- for i := reqCount; i > 0; i-- { - require.True(t, ps.Consume(func(context.Context, tracesRequest) error { return nil })) + require.True(t, consume(ps, func(context.Context, tracesRequest) error { return nil })) } // We should be able to put a new item in @@ -856,7 +856,7 @@ func TestPersistentQueue_ItemsCapacityUsageRestoredOnShutdown(t *testing.T) { require.ErrorIs(t, pq.Offer(context.Background(), newTracesRequest(5, 5)), ErrQueueIsFull) assert.Equal(t, 100, pq.Size()) - assert.True(t, pq.Consume(func(_ context.Context, traces tracesRequest) error { + assert.True(t, consume(pq, func(_ context.Context, traces tracesRequest) error { assert.Equal(t, 40, traces.traces.SpanCount()) return nil })) @@ -874,13 +874,13 @@ func TestPersistentQueue_ItemsCapacityUsageRestoredOnShutdown(t *testing.T) { // Check the combined queue size. assert.Equal(t, 70, newPQ.Size()) - assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error { + assert.True(t, consume(newPQ, func(_ context.Context, traces tracesRequest) error { assert.Equal(t, 40, traces.traces.SpanCount()) return nil })) assert.Equal(t, 30, newPQ.Size()) - assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error { + assert.True(t, consume(newPQ, func(_ context.Context, traces tracesRequest) error { assert.Equal(t, 20, traces.traces.SpanCount()) return nil })) @@ -901,7 +901,7 @@ func TestPersistentQueue_ItemsCapacityUsageIsNotPreserved(t *testing.T) { assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(5, 5))) assert.Equal(t, 3, pq.Size()) - assert.True(t, pq.Consume(func(_ context.Context, traces tracesRequest) error { + assert.True(t, consume(pq, func(_ context.Context, traces tracesRequest) error { assert.Equal(t, 40, traces.traces.SpanCount()) return nil })) @@ -920,14 +920,14 @@ func TestPersistentQueue_ItemsCapacityUsageIsNotPreserved(t *testing.T) { assert.Equal(t, 12, newPQ.Size()) // Consuming a restored request should reduce the restored size by 20 but it should not go to below zero - assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error { + assert.True(t, consume(newPQ, func(_ context.Context, traces tracesRequest) error { assert.Equal(t, 20, traces.traces.SpanCount()) return nil })) assert.Equal(t, 0, newPQ.Size()) // Consuming another restored request should not affect the restored size since it's already dropped to 0. - assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error { + assert.True(t, consume(newPQ, func(_ context.Context, traces tracesRequest) error { assert.Equal(t, 25, traces.traces.SpanCount()) return nil })) @@ -937,7 +937,7 @@ func TestPersistentQueue_ItemsCapacityUsageIsNotPreserved(t *testing.T) { require.NoError(t, newPQ.Offer(context.Background(), newTracesRequest(5, 5))) assert.Equal(t, 25, newPQ.Size()) - assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error { + assert.True(t, consume(newPQ, func(_ context.Context, traces tracesRequest) error { assert.Equal(t, 10, traces.traces.SpanCount()) return nil })) @@ -961,7 +961,7 @@ func TestPersistentQueue_RequestCapacityLessAfterRestart(t *testing.T) { // Read the first request just to populate the read index in the storage. // Otherwise, the write index won't be restored either. - assert.True(t, pq.Consume(func(_ context.Context, traces tracesRequest) error { + assert.True(t, consume(pq, func(_ context.Context, traces tracesRequest) error { assert.Equal(t, 40, traces.traces.SpanCount()) return nil })) @@ -979,7 +979,7 @@ func TestPersistentQueue_RequestCapacityLessAfterRestart(t *testing.T) { // Queue is full require.Error(t, newPQ.Offer(context.Background(), newTracesRequest(2, 5))) - assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error { + assert.True(t, consume(newPQ, func(_ context.Context, traces tracesRequest) error { assert.Equal(t, 20, traces.traces.SpanCount()) return nil })) @@ -988,7 +988,7 @@ func TestPersistentQueue_RequestCapacityLessAfterRestart(t *testing.T) { // Still full require.Error(t, newPQ.Offer(context.Background(), newTracesRequest(2, 5))) - assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error { + assert.True(t, consume(newPQ, func(_ context.Context, traces tracesRequest) error { assert.Equal(t, 25, traces.traces.SpanCount()) return nil })) @@ -1015,7 +1015,7 @@ func TestPersistentQueue_RestoredUsedSizeIsCorrectedOnDrain(t *testing.T) { // Consume 30 items for i := 0; i < 3; i++ { - assert.True(t, pq.Consume(func(context.Context, tracesRequest) error { return nil })) + assert.True(t, consume(pq, func(context.Context, tracesRequest) error { return nil })) } // The used size is now 30, but the snapshot should have 50, because it's taken every 5 read/writes. assert.Equal(t, 30, pq.Size()) @@ -1027,12 +1027,12 @@ func TestPersistentQueue_RestoredUsedSizeIsCorrectedOnDrain(t *testing.T) { // In reality the size should be 30. Once the queue is drained, it will be updated to the correct size. assert.Equal(t, 50, newPQ.Size()) - assert.True(t, newPQ.Consume(func(context.Context, tracesRequest) error { return nil })) - assert.True(t, newPQ.Consume(func(context.Context, tracesRequest) error { return nil })) + assert.True(t, consume(newPQ, func(context.Context, tracesRequest) error { return nil })) + assert.True(t, consume(newPQ, func(context.Context, tracesRequest) error { return nil })) assert.Equal(t, 30, newPQ.Size()) // Now the size must be correctly reflected - assert.True(t, newPQ.Consume(func(context.Context, tracesRequest) error { return nil })) + assert.True(t, consume(newPQ, func(context.Context, tracesRequest) error { return nil })) assert.Equal(t, 0, newPQ.Size()) assert.NoError(t, newPQ.Shutdown(context.Background())) diff --git a/exporter/internal/queue/queue.go b/exporter/internal/queue/queue.go index 54e8346a575..ee79408361a 100644 --- a/exporter/internal/queue/queue.go +++ b/exporter/internal/queue/queue.go @@ -25,14 +25,15 @@ type Queue[T any] interface { // without violating capacity restrictions. If success returns no error. // It returns ErrQueueIsFull if no space is currently available. Offer(ctx context.Context, item T) error - // Consume applies the provided function on the head of queue. - // The call blocks until there is an item available or the queue is stopped. - // The function returns true when an item is consumed or false if the queue is stopped. - Consume(func(ctx context.Context, item T) error) bool // Size returns the current Size of the queue Size() int // Capacity returns the capacity of the queue. Capacity() int + // Read pulls the next available item from the queue along with its index. Once processing is + // finished, the index should be called with OnProcessingFinished to clean up the storage. + // The function blocks until an item is available or if the queue is stopped. + // Returns false if reading failed or if the queue is stopped. + Read(context.Context) (uint64, context.Context, T, bool) // Should be called to remove the item of the given index from the queue once processing is finished. OnProcessingFinished(index uint64, consumeErr error) } diff --git a/exporter/internal/queue/queue_test.go b/exporter/internal/queue/queue_test.go new file mode 100644 index 00000000000..88276a82db9 --- /dev/null +++ b/exporter/internal/queue/queue_test.go @@ -0,0 +1,18 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package queue + +import ( + "context" +) + +func consume[T any](q Queue[T], consumeFunc func(context.Context, T) error) bool { + index, ctx, req, ok := q.Read(context.Background()) + if !ok { + return false + } + consumeErr := consumeFunc(ctx, req) + q.OnProcessingFinished(index, consumeErr) + return true +}