diff --git a/exporter/internal/queue/batcher.go b/exporter/internal/queue/batcher.go index 9149dda6613..2f728b07366 100644 --- a/exporter/internal/queue/batcher.go +++ b/exporter/internal/queue/batcher.go @@ -28,14 +28,11 @@ type BaseBatcher struct { batchCfg exporterbatcher.Config queue Queue[internal.Request] maxWorkers int + workerPool chan bool stopWG sync.WaitGroup } func NewBatcher(batchCfg exporterbatcher.Config, queue Queue[internal.Request], maxWorkers int) (Batcher, error) { - if maxWorkers != 0 { - return nil, errors.ErrUnsupported - } - if batchCfg.Enabled { return nil, errors.ErrUnsupported } @@ -50,6 +47,16 @@ func NewBatcher(batchCfg exporterbatcher.Config, queue Queue[internal.Request], }, nil } +func (qb *BaseBatcher) startWorkerPool() { + if qb.maxWorkers == 0 { + return + } + qb.workerPool = make(chan bool, qb.maxWorkers) + for i := 0; i < qb.maxWorkers; i++ { + qb.workerPool <- true + } +} + // flush exports the incoming batch synchronously. func (qb *BaseBatcher) flush(batchToFlush batch) { err := batchToFlush.req.Export(batchToFlush.ctx) @@ -61,9 +68,18 @@ func (qb *BaseBatcher) flush(batchToFlush batch) { // flushAsync starts a goroutine that calls flushIfNecessary. It blocks until a worker is available. func (qb *BaseBatcher) flushAsync(batchToFlush batch) { qb.stopWG.Add(1) + if qb.maxWorkers == 0 { + go func() { + defer qb.stopWG.Done() + qb.flush(batchToFlush) + }() + return + } + <-qb.workerPool go func() { defer qb.stopWG.Done() qb.flush(batchToFlush) + qb.workerPool <- true }() } diff --git a/exporter/internal/queue/disabled_batcher.go b/exporter/internal/queue/disabled_batcher.go index c5078885538..6eb1df1dace 100644 --- a/exporter/internal/queue/disabled_batcher.go +++ b/exporter/internal/queue/disabled_batcher.go @@ -17,6 +17,8 @@ type DisabledBatcher struct { // Start starts the goroutine that reads from the queue and flushes asynchronously. func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error { + qb.startWorkerPool() + // This goroutine reads and then flushes. // 1. Reading from the queue is blocked until the queue is non-empty or until the queue is stopped. // 2. flushAsync() blocks until there are idle workers in the worker pool. diff --git a/exporter/internal/queue/disabled_batcher_test.go b/exporter/internal/queue/disabled_batcher_test.go index e031ce83870..c9c69a61f05 100644 --- a/exporter/internal/queue/disabled_batcher_test.go +++ b/exporter/internal/queue/disabled_batcher_test.go @@ -17,60 +17,58 @@ import ( "go.opentelemetry.io/collector/exporter/internal" ) -func TestDisabledBatcher_InfiniteWorkerPool(t *testing.T) { - cfg := exporterbatcher.NewDefaultConfig() - cfg.Enabled = false - - q := NewBoundedMemoryQueue[internal.Request]( - MemoryQueueSettings[internal.Request]{ - Sizer: &RequestSizer[internal.Request]{}, - Capacity: 10, - }) - - maxWorkers := 0 - ba, err := NewBatcher(cfg, q, maxWorkers) - require.NoError(t, err) - - require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) - require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - require.NoError(t, q.Shutdown(context.Background())) - require.NoError(t, ba.Shutdown(context.Background())) - }) - - sink := newFakeRequestSink() - - require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, sink: sink})) - require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, exportErr: errors.New("transient error"), sink: sink})) - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 8 - }, 30*time.Millisecond, 10*time.Millisecond) - - require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 17, sink: sink})) - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 25 - }, 30*time.Millisecond, 10*time.Millisecond) - - require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 13, sink: sink})) - - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 38 - }, 30*time.Millisecond, 10*time.Millisecond) -} - -func TestDisabledBatcher_LimitedWorkerNotImplemented(t *testing.T) { - cfg := exporterbatcher.NewDefaultConfig() - cfg.Enabled = false - maxWorkers := 1 - - q := NewBoundedMemoryQueue[internal.Request]( - MemoryQueueSettings[internal.Request]{ - Sizer: &RequestSizer[internal.Request]{}, - Capacity: 10, +func TestDisabledBatcher_Basic(t *testing.T) { + tests := []struct { + name string + maxWorkers int + }{ + { + name: "infinate_workers", + maxWorkers: 0, + }, + { + name: "one_worker", + maxWorkers: 1, + }, + { + name: "three_workers", + maxWorkers: 3, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := exporterbatcher.NewDefaultConfig() + cfg.Enabled = false + + q := NewBoundedMemoryQueue[internal.Request]( + MemoryQueueSettings[internal.Request]{ + Sizer: &RequestSizer[internal.Request]{}, + Capacity: 10, + }) + + ba, err := NewBatcher(cfg, q, tt.maxWorkers) + require.NoError(t, err) + + require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + require.NoError(t, q.Shutdown(context.Background())) + require.NoError(t, ba.Shutdown(context.Background())) + }) + + sink := newFakeRequestSink() + + require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, sink: sink})) + require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, exportErr: errors.New("transient error"), sink: sink})) + require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 17, sink: sink})) + require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 13, sink: sink})) + require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 35, sink: sink})) + require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 2, sink: sink})) + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 5 && sink.itemsCount.Load() == 75 + }, 30*time.Millisecond, 10*time.Millisecond) }) - - _, err := NewBatcher(cfg, q, maxWorkers) - require.Error(t, err) + } } func TestDisabledBatcher_BatchingNotImplemented(t *testing.T) {