Skip to content

Commit

Permalink
[exporter] exporter queue Read() (open-telemetry#11396)
Browse files Browse the repository at this point in the history
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description

This PR adds a public function `GetNextItem` to queue (both persistent
queue and bounded memory queue)

Why this change?
Instead of blocking until consumption of the item is done, we would like
to separate the API for reading and committing consumption.

Before:
`Consume(consumeFunc)`

After:
`idx, item = Read()`
`OnProcessingFinished(idx)`

<!-- Issue number if applicable -->
#### Link to tracking issue

open-telemetry#8122
open-telemetry#10368

<!--Describe what testing was performed and which tests were added.-->
#### Testing

<!--Describe the documentation added.-->
#### Documentation

<!--Please delete paragraphs that you did not use before submitting.-->
  • Loading branch information
sfc-gh-sili authored and HongChenTW committed Dec 19, 2024
1 parent 6c512fe commit b6b7cc5
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 74 deletions.
12 changes: 2 additions & 10 deletions exporter/internal/queue/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,9 @@ func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error {
return q.sizedChannel.push(memQueueEl[T]{ctx: ctx, req: req}, q.sizer.Sizeof(req), nil)
}

// Consume applies the provided function on the head of queue.
// The call blocks until there is an item available or the queue is stopped.
// The function returns true when an item is consumed or false if the queue is stopped and emptied.
func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool {
func (q *boundedMemoryQueue[T]) Read(_ context.Context) (uint64, context.Context, T, bool) {
item, ok := q.sizedChannel.pop(func(el memQueueEl[T]) int64 { return q.sizer.Sizeof(el.req) })
if !ok {
return false
}
// the memory queue doesn't handle consume errors
_ = consumeFunc(item.ctx, item.req)
return true
return 0, item.ctx, item.req, ok
}

// Should be called to remove the item of the given index from the queue once processing is finished.
Expand Down
12 changes: 6 additions & 6 deletions exporter/internal/queue/bounded_memory_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestBoundedQueue(t *testing.T) {
require.NoError(t, q.Offer(context.Background(), "a"))

numConsumed := 0
assert.True(t, q.Consume(func(_ context.Context, item string) error {
assert.True(t, consume(q, func(_ context.Context, item string) error {
assert.Equal(t, "a", item)
numConsumed++
return nil
Expand All @@ -42,7 +42,7 @@ func TestBoundedQueue(t *testing.T) {
require.ErrorIs(t, q.Offer(context.Background(), "c"), ErrQueueIsFull)
assert.Equal(t, 1, q.Size())

assert.True(t, q.Consume(func(_ context.Context, item string) error {
assert.True(t, consume(q, func(_ context.Context, item string) error {
assert.Equal(t, "b", item)
numConsumed++
return nil
Expand All @@ -51,15 +51,15 @@ func TestBoundedQueue(t *testing.T) {

for _, toAddItem := range []string{"d", "e", "f"} {
require.NoError(t, q.Offer(context.Background(), toAddItem))
assert.True(t, q.Consume(func(_ context.Context, item string) error {
assert.True(t, consume(q, func(_ context.Context, item string) error {
assert.Equal(t, toAddItem, item)
numConsumed++
return nil
}))
}
assert.Equal(t, 5, numConsumed)
require.NoError(t, q.Shutdown(context.Background()))
assert.False(t, q.Consume(func(_ context.Context, item string) error {
assert.False(t, consume(q, func(_ context.Context, item string) error {
panic(item)
}))
}
Expand All @@ -82,7 +82,7 @@ func TestShutdownWhileNotEmpty(t *testing.T) {
assert.Equal(t, 10, q.Size())
numConsumed := 0
for i := 0; i < 10; i++ {
assert.True(t, q.Consume(func(_ context.Context, item string) error {
assert.True(t, consume(q, func(_ context.Context, item string) error {
assert.Equal(t, strconv.FormatInt(int64(i), 10), item)
numConsumed++
return nil
Expand All @@ -91,7 +91,7 @@ func TestShutdownWhileNotEmpty(t *testing.T) {
assert.Equal(t, 10, numConsumed)
assert.Equal(t, 0, q.Size())

assert.False(t, q.Consume(func(_ context.Context, item string) error {
assert.False(t, consume(q, func(_ context.Context, item string) error {
panic(item)
}))
}
Expand Down
5 changes: 4 additions & 1 deletion exporter/internal/queue/consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@ func (qc *Consumers[T]) Start(ctx context.Context, host component.Host) error {
startWG.Done()
defer qc.stopWG.Done()
for {
if !qc.queue.Consume(qc.consumeFunc) {
index, ctx, req, ok := qc.queue.Read(context.Background())
if !ok {
return
}
consumeErr := qc.consumeFunc(ctx, req)
qc.queue.OnProcessingFinished(index, consumeErr)
}
}()
}
Expand Down
58 changes: 27 additions & 31 deletions exporter/internal/queue/persistent_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,37 +189,6 @@ func (pq *persistentQueue[T]) restoreQueueSizeFromStorage(ctx context.Context) (
return bytesToItemIndex(val)
}

// Consume applies the provided function on the head of queue.
// The call blocks until there is an item available or the queue is stopped.
// The function returns true when an item is consumed or false if the queue is stopped.
func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool {
for {
var (
index uint64
req T
consumed bool
)

// If we are stopped we still process all the other events in the channel before, but we
// return fast in the `getNextItem`, so we will free the channel fast and get to the stop.
_, ok := pq.sizedChannel.pop(func(permanentQueueEl) int64 {
index, req, consumed = pq.getNextItem(context.Background())
if !consumed {
return 0
}
return pq.set.Sizer.Sizeof(req)
})
if !ok {
return false
}
if consumed {
consumeErr := consumeFunc(context.Background(), req)
pq.OnProcessingFinished(index, consumeErr)
return true
}
}
}

func (pq *persistentQueue[T]) Shutdown(ctx context.Context) error {
// If the queue is not initialized, there is nothing to shut down.
if pq.client == nil {
Expand Down Expand Up @@ -304,6 +273,33 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
return nil
}

func (pq *persistentQueue[T]) Read(ctx context.Context) (uint64, context.Context, T, bool) {
for {
var (
index uint64
req T
consumed bool
)
_, ok := pq.sizedChannel.pop(func(permanentQueueEl) int64 {
size := int64(0)
index, req, consumed = pq.getNextItem(ctx)
if consumed {
size = pq.set.Sizer.Sizeof(req)
}
return size
})
if !ok {
return 0, nil, req, false
}
if consumed {
return index, context.TODO(), req, true
}

// If ok && !consumed, it means we are stopped. In this case, we still process all the other events
// in the channel before, so we will free the channel fast and get to the stop.
}
}

// getNextItem pulls the next available item from the persistent storage along with its index. Once processing is
// finished, the index should be called with OnProcessingFinished to clean up the storage. If no new item is available,
// returns false.
Expand Down
44 changes: 22 additions & 22 deletions exporter/internal/queue/persistent_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ func TestPersistentQueue_CorruptedData(t *testing.T) {
require.NoError(t, err)
}
assert.Equal(t, 3, ps.Size())
require.True(t, ps.Consume(func(context.Context, tracesRequest) error {
require.True(t, consume(ps, func(context.Context, tracesRequest) error {
return experr.NewShutdownErr(nil)
}))
assert.Equal(t, 2, ps.Size())
Expand Down Expand Up @@ -486,7 +486,7 @@ func TestPersistentQueue_CurrentlyProcessedItems(t *testing.T) {

// We should be able to pull all remaining items now
for i := 0; i < 4; i++ {
newPs.Consume(func(_ context.Context, traces tracesRequest) error {
consume(newPs, func(_ context.Context, traces tracesRequest) error {
assert.Equal(t, req, traces)
return nil
})
Expand Down Expand Up @@ -520,7 +520,7 @@ func TestPersistentQueueStartWithNonDispatched(t *testing.T) {
require.NoError(t, err)
}

require.True(t, ps.Consume(func(context.Context, tracesRequest) error {
require.True(t, consume(ps, func(context.Context, tracesRequest) error {
// put one more item in
require.NoError(t, ps.Offer(context.Background(), req))
require.Equal(t, 5, ps.Size())
Expand Down Expand Up @@ -564,7 +564,7 @@ func TestPersistentQueueStartWithNonDispatchedConcurrent(t *testing.T) {
go func() {
defer conWg.Done()
for i := 0; i < 10; i++ {
assert.True(t, pq.Consume(func(context.Context, tracesRequest) error { return nil }))
assert.True(t, consume(pq, func(context.Context, tracesRequest) error { return nil }))
}
}()
}
Expand Down Expand Up @@ -615,13 +615,13 @@ func TestPersistentQueue_PutCloseReadClose(t *testing.T) {
require.Equal(t, 2, newPs.Size())

// Let's read both of the elements we put
newPs.Consume(func(_ context.Context, traces tracesRequest) error {
consume(newPs, func(_ context.Context, traces tracesRequest) error {
require.Equal(t, req, traces)
return nil
})
assert.Equal(t, 1, newPs.Size())

newPs.Consume(func(_ context.Context, traces tracesRequest) error {
consume(newPs, func(_ context.Context, traces tracesRequest) error {
require.Equal(t, req, traces)
return nil
})
Expand Down Expand Up @@ -663,7 +663,7 @@ func BenchmarkPersistentQueue_TraceSpans(b *testing.B) {
}

for i := 0; i < bb.N; i++ {
require.True(bb, ps.Consume(func(context.Context, tracesRequest) error { return nil }))
require.True(bb, consume(ps, func(context.Context, tracesRequest) error { return nil }))
}
require.NoError(b, ext.Shutdown(context.Background()))
})
Expand Down Expand Up @@ -781,7 +781,7 @@ func TestPersistentQueue_StorageFull(t *testing.T) {
// Subsequent items succeed, as deleting the first item frees enough space for the state update
reqCount--
for i := reqCount; i > 0; i-- {
require.True(t, ps.Consume(func(context.Context, tracesRequest) error { return nil }))
require.True(t, consume(ps, func(context.Context, tracesRequest) error { return nil }))
}

// We should be able to put a new item in
Expand Down Expand Up @@ -856,7 +856,7 @@ func TestPersistentQueue_ItemsCapacityUsageRestoredOnShutdown(t *testing.T) {
require.ErrorIs(t, pq.Offer(context.Background(), newTracesRequest(5, 5)), ErrQueueIsFull)
assert.Equal(t, 100, pq.Size())

assert.True(t, pq.Consume(func(_ context.Context, traces tracesRequest) error {
assert.True(t, consume(pq, func(_ context.Context, traces tracesRequest) error {
assert.Equal(t, 40, traces.traces.SpanCount())
return nil
}))
Expand All @@ -874,13 +874,13 @@ func TestPersistentQueue_ItemsCapacityUsageRestoredOnShutdown(t *testing.T) {
// Check the combined queue size.
assert.Equal(t, 70, newPQ.Size())

assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error {
assert.True(t, consume(newPQ, func(_ context.Context, traces tracesRequest) error {
assert.Equal(t, 40, traces.traces.SpanCount())
return nil
}))
assert.Equal(t, 30, newPQ.Size())

assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error {
assert.True(t, consume(newPQ, func(_ context.Context, traces tracesRequest) error {
assert.Equal(t, 20, traces.traces.SpanCount())
return nil
}))
Expand All @@ -901,7 +901,7 @@ func TestPersistentQueue_ItemsCapacityUsageIsNotPreserved(t *testing.T) {
assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(5, 5)))
assert.Equal(t, 3, pq.Size())

assert.True(t, pq.Consume(func(_ context.Context, traces tracesRequest) error {
assert.True(t, consume(pq, func(_ context.Context, traces tracesRequest) error {
assert.Equal(t, 40, traces.traces.SpanCount())
return nil
}))
Expand All @@ -920,14 +920,14 @@ func TestPersistentQueue_ItemsCapacityUsageIsNotPreserved(t *testing.T) {
assert.Equal(t, 12, newPQ.Size())

// Consuming a restored request should reduce the restored size by 20 but it should not go to below zero
assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error {
assert.True(t, consume(newPQ, func(_ context.Context, traces tracesRequest) error {
assert.Equal(t, 20, traces.traces.SpanCount())
return nil
}))
assert.Equal(t, 0, newPQ.Size())

// Consuming another restored request should not affect the restored size since it's already dropped to 0.
assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error {
assert.True(t, consume(newPQ, func(_ context.Context, traces tracesRequest) error {
assert.Equal(t, 25, traces.traces.SpanCount())
return nil
}))
Expand All @@ -937,7 +937,7 @@ func TestPersistentQueue_ItemsCapacityUsageIsNotPreserved(t *testing.T) {
require.NoError(t, newPQ.Offer(context.Background(), newTracesRequest(5, 5)))
assert.Equal(t, 25, newPQ.Size())

assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error {
assert.True(t, consume(newPQ, func(_ context.Context, traces tracesRequest) error {
assert.Equal(t, 10, traces.traces.SpanCount())
return nil
}))
Expand All @@ -961,7 +961,7 @@ func TestPersistentQueue_RequestCapacityLessAfterRestart(t *testing.T) {

// Read the first request just to populate the read index in the storage.
// Otherwise, the write index won't be restored either.
assert.True(t, pq.Consume(func(_ context.Context, traces tracesRequest) error {
assert.True(t, consume(pq, func(_ context.Context, traces tracesRequest) error {
assert.Equal(t, 40, traces.traces.SpanCount())
return nil
}))
Expand All @@ -979,7 +979,7 @@ func TestPersistentQueue_RequestCapacityLessAfterRestart(t *testing.T) {
// Queue is full
require.Error(t, newPQ.Offer(context.Background(), newTracesRequest(2, 5)))

assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error {
assert.True(t, consume(newPQ, func(_ context.Context, traces tracesRequest) error {
assert.Equal(t, 20, traces.traces.SpanCount())
return nil
}))
Expand All @@ -988,7 +988,7 @@ func TestPersistentQueue_RequestCapacityLessAfterRestart(t *testing.T) {
// Still full
require.Error(t, newPQ.Offer(context.Background(), newTracesRequest(2, 5)))

assert.True(t, newPQ.Consume(func(_ context.Context, traces tracesRequest) error {
assert.True(t, consume(newPQ, func(_ context.Context, traces tracesRequest) error {
assert.Equal(t, 25, traces.traces.SpanCount())
return nil
}))
Expand All @@ -1015,7 +1015,7 @@ func TestPersistentQueue_RestoredUsedSizeIsCorrectedOnDrain(t *testing.T) {

// Consume 30 items
for i := 0; i < 3; i++ {
assert.True(t, pq.Consume(func(context.Context, tracesRequest) error { return nil }))
assert.True(t, consume(pq, func(context.Context, tracesRequest) error { return nil }))
}
// The used size is now 30, but the snapshot should have 50, because it's taken every 5 read/writes.
assert.Equal(t, 30, pq.Size())
Expand All @@ -1027,12 +1027,12 @@ func TestPersistentQueue_RestoredUsedSizeIsCorrectedOnDrain(t *testing.T) {
// In reality the size should be 30. Once the queue is drained, it will be updated to the correct size.
assert.Equal(t, 50, newPQ.Size())

assert.True(t, newPQ.Consume(func(context.Context, tracesRequest) error { return nil }))
assert.True(t, newPQ.Consume(func(context.Context, tracesRequest) error { return nil }))
assert.True(t, consume(newPQ, func(context.Context, tracesRequest) error { return nil }))
assert.True(t, consume(newPQ, func(context.Context, tracesRequest) error { return nil }))
assert.Equal(t, 30, newPQ.Size())

// Now the size must be correctly reflected
assert.True(t, newPQ.Consume(func(context.Context, tracesRequest) error { return nil }))
assert.True(t, consume(newPQ, func(context.Context, tracesRequest) error { return nil }))
assert.Equal(t, 0, newPQ.Size())

assert.NoError(t, newPQ.Shutdown(context.Background()))
Expand Down
9 changes: 5 additions & 4 deletions exporter/internal/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ type Queue[T any] interface {
// without violating capacity restrictions. If success returns no error.
// It returns ErrQueueIsFull if no space is currently available.
Offer(ctx context.Context, item T) error
// Consume applies the provided function on the head of queue.
// The call blocks until there is an item available or the queue is stopped.
// The function returns true when an item is consumed or false if the queue is stopped.
Consume(func(ctx context.Context, item T) error) bool
// Size returns the current Size of the queue
Size() int
// Capacity returns the capacity of the queue.
Capacity() int
// Read pulls the next available item from the queue along with its index. Once processing is
// finished, the index should be called with OnProcessingFinished to clean up the storage.
// The function blocks until an item is available or if the queue is stopped.
// Returns false if reading failed or if the queue is stopped.
Read(context.Context) (uint64, context.Context, T, bool)
// Should be called to remove the item of the given index from the queue once processing is finished.
OnProcessingFinished(index uint64, consumeErr error)
}
Expand Down
18 changes: 18 additions & 0 deletions exporter/internal/queue/queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package queue

import (
"context"
)

func consume[T any](q Queue[T], consumeFunc func(context.Context, T) error) bool {
index, ctx, req, ok := q.Read(context.Background())
if !ok {
return false
}
consumeErr := consumeFunc(ctx, req)
q.OnProcessingFinished(index, consumeErr)
return true
}

0 comments on commit b6b7cc5

Please sign in to comment.