Skip to content

Commit

Permalink
[exporter][chore] Exporter queue batcher flushes current batch on shu…
Browse files Browse the repository at this point in the history
…tdown (open-telemetry#11666)

#### Description

This PR changes exporter queue batcher to flush the current batch on
shutdown.

#### Link to tracking issue

open-telemetry#10368
open-telemetry#8122
  • Loading branch information
sfc-gh-sili authored and djaglowski committed Nov 21, 2024
1 parent 010a72f commit 6e9bf8d
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 18 deletions.
6 changes: 0 additions & 6 deletions exporter/internal/queue/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,3 @@ func (qb *BaseBatcher) flushAsync(batchToFlush batch) {
qb.workerPool <- true
}()
}

// Shutdown ensures that queue and all Batcher are stopped.
func (qb *BaseBatcher) Shutdown(_ context.Context) error {
qb.stopWG.Wait()
return nil
}
35 changes: 23 additions & 12 deletions exporter/internal/queue/default_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,18 +124,7 @@ func (qb *DefaultBatcher) startTimeBasedFlushingGoroutine() {
case <-qb.shutdownCh:
return
case <-qb.timer.C:
qb.currentBatchMu.Lock()
if qb.currentBatch == nil || qb.currentBatch.req == nil {
qb.currentBatchMu.Unlock()
continue
}
batchToFlush := *qb.currentBatch
qb.currentBatch = nil
qb.currentBatchMu.Unlock()

// flushAsync() blocks until successfully started a goroutine for flushing.
qb.flushAsync(batchToFlush)
qb.resetTimer()
qb.flushCurrentBatchIfNecessary()
}
}
}()
Expand All @@ -155,6 +144,28 @@ func (qb *DefaultBatcher) Start(_ context.Context, _ component.Host) error {

qb.startReadingFlushingGoroutine()
qb.startTimeBasedFlushingGoroutine()
return nil
}

// flushCurrentBatchIfNecessary sends out the current request batch if it is not nil
func (qb *DefaultBatcher) flushCurrentBatchIfNecessary() {
qb.currentBatchMu.Lock()
if qb.currentBatch == nil || qb.currentBatch.req == nil {
qb.currentBatchMu.Unlock()
return
}
batchToFlush := *qb.currentBatch
qb.currentBatch = nil
qb.currentBatchMu.Unlock()

// flushAsync() blocks until successfully started a goroutine for flushing.
qb.flushAsync(batchToFlush)
qb.resetTimer()
}

// Shutdown ensures that queue and all Batcher are stopped.
func (qb *DefaultBatcher) Shutdown(_ context.Context) error {
qb.flushCurrentBatchIfNecessary()
qb.stopWG.Wait()
return nil
}
37 changes: 37 additions & 0 deletions exporter/internal/queue/default_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,3 +277,40 @@ func TestDefaultBatcher_Split_TimeoutDisabled(t *testing.T) {
})
}
}

func TestDefaultBatcher_Shutdown(t *testing.T) {
batchCfg := exporterbatcher.NewDefaultConfig()
batchCfg.MinSizeItems = 10
batchCfg.FlushTimeout = 100 * time.Second

q := NewBoundedMemoryQueue[internal.Request](
MemoryQueueSettings[internal.Request]{
Sizer: &RequestSizer[internal.Request]{},
Capacity: 10,
})

ba, err := NewBatcher(batchCfg, q,
func(ctx context.Context, req internal.Request) error { return req.Export(ctx) },
2)
require.NoError(t, err)

require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))

sink := newFakeRequestSink()

require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 1, sink: sink}))
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 2, sink: sink}))

// Give the batcher some time to read from queue
time.Sleep(100 * time.Millisecond)

assert.Equal(t, int64(0), sink.requestsCount.Load())
assert.Equal(t, int64(0), sink.itemsCount.Load())

require.NoError(t, q.Shutdown(context.Background()))
require.NoError(t, ba.Shutdown(context.Background()))

assert.Equal(t, int64(1), sink.requestsCount.Load())
assert.Equal(t, int64(3), sink.itemsCount.Load())
}
6 changes: 6 additions & 0 deletions exporter/internal/queue/disabled_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,9 @@ func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error {
}()
return nil
}

// Shutdown ensures that queue and all Batcher are stopped.
func (qb *DisabledBatcher) Shutdown(_ context.Context) error {
qb.stopWG.Wait()
return nil
}

0 comments on commit 6e9bf8d

Please sign in to comment.