Skip to content

Commit

Permalink
[exporter][chore] Change queue batcher to use exportFunc instead of r…
Browse files Browse the repository at this point in the history
…equest.export() (open-telemetry#11636)

#### Description

This PR changes queue batcher to use `exportFunc` instead of
`request.export()`. This makes testing easier and avoid passing
unnecessary detail to the exporter batcher.

#### Link to tracking issue

open-telemetry#8122
open-telemetry#10368
  • Loading branch information
sfc-gh-sili authored and djaglowski committed Nov 21, 2024
1 parent 93af6c2 commit 18fe3ba
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 7 deletions.
10 changes: 8 additions & 2 deletions exporter/internal/queue/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,21 @@ type BaseBatcher struct {
queue Queue[internal.Request]
maxWorkers int
workerPool chan bool
exportFunc func(ctx context.Context, req internal.Request) error
stopWG sync.WaitGroup
}

func NewBatcher(batchCfg exporterbatcher.Config, queue Queue[internal.Request], maxWorkers int) (Batcher, error) {
func NewBatcher(batchCfg exporterbatcher.Config,
queue Queue[internal.Request],
exportFunc func(ctx context.Context, req internal.Request) error,
maxWorkers int) (Batcher, error) {
if !batchCfg.Enabled {
return &DisabledBatcher{
BaseBatcher{
batchCfg: batchCfg,
queue: queue,
maxWorkers: maxWorkers,
exportFunc: exportFunc,
stopWG: sync.WaitGroup{},
},
}, nil
Expand All @@ -48,6 +53,7 @@ func NewBatcher(batchCfg exporterbatcher.Config, queue Queue[internal.Request],
batchCfg: batchCfg,
queue: queue,
maxWorkers: maxWorkers,
exportFunc: exportFunc,
stopWG: sync.WaitGroup{},
},
}, nil
Expand All @@ -65,7 +71,7 @@ func (qb *BaseBatcher) startWorkerPool() {

// flush exports the incoming batch synchronously.
func (qb *BaseBatcher) flush(batchToFlush batch) {
err := batchToFlush.req.Export(batchToFlush.ctx)
err := qb.exportFunc(batchToFlush.ctx, batchToFlush.req)
for _, idx := range batchToFlush.idxList {
qb.queue.OnProcessingFinished(idx, err)
}
Expand Down
16 changes: 12 additions & 4 deletions exporter/internal/queue/default_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ func TestDefaultBatcher_NoSplit_MinThresholdZero_TimeoutDisabled(t *testing.T) {
Capacity: 10,
})

ba, err := NewBatcher(cfg, q, tt.maxWorkers)
ba, err := NewBatcher(cfg, q,
func(ctx context.Context, req internal.Request) error { return req.Export(ctx) },
tt.maxWorkers)
require.NoError(t, err)

require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -108,7 +110,9 @@ func TestDefaultBatcher_NoSplit_TimeoutDisabled(t *testing.T) {
Capacity: 10,
})

ba, err := NewBatcher(cfg, q, tt.maxWorkers)
ba, err := NewBatcher(cfg, q,
func(ctx context.Context, req internal.Request) error { return req.Export(ctx) },
tt.maxWorkers)
require.NoError(t, err)

require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -172,7 +176,9 @@ func TestDefaultBatcher_NoSplit_WithTimeout(t *testing.T) {
Capacity: 10,
})

ba, err := NewBatcher(cfg, q, tt.maxWorkers)
ba, err := NewBatcher(cfg, q,
func(ctx context.Context, req internal.Request) error { return req.Export(ctx) },
tt.maxWorkers)
require.NoError(t, err)

require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -236,7 +242,9 @@ func TestDefaultBatcher_Split_TimeoutDisabled(t *testing.T) {
Capacity: 10,
})

ba, err := NewBatcher(cfg, q, tt.maxWorkers)
ba, err := NewBatcher(cfg, q,
func(ctx context.Context, req internal.Request) error { return req.Export(ctx) },
tt.maxWorkers)
require.NoError(t, err)

require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
Expand Down
4 changes: 3 additions & 1 deletion exporter/internal/queue/disabled_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ func TestDisabledBatcher_Basic(t *testing.T) {
Capacity: 10,
})

ba, err := NewBatcher(cfg, q, tt.maxWorkers)
ba, err := NewBatcher(cfg, q,
func(ctx context.Context, req internal.Request) error { return req.Export(ctx) },
tt.maxWorkers)
require.NoError(t, err)

require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
Expand Down

0 comments on commit 18fe3ba

Please sign in to comment.