Skip to content

Commit

Permalink
[chore] Remove impossible infinite workers use-case for Batcher
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Dec 28, 2024
1 parent ffcef93 commit 574c86e
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 67 deletions.
53 changes: 19 additions & 34 deletions exporter/internal/queue/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ type Batcher interface {
type BaseBatcher struct {
batchCfg exporterbatcher.Config
queue Queue[internal.Request]
maxWorkers int
workerPool chan bool
exportFunc func(ctx context.Context, req internal.Request) error
stopWG sync.WaitGroup
Expand All @@ -38,35 +37,26 @@ func NewBatcher(batchCfg exporterbatcher.Config,
maxWorkers int,
) (Batcher, error) {
if !batchCfg.Enabled {
return &DisabledBatcher{
BaseBatcher{
batchCfg: batchCfg,
queue: queue,
maxWorkers: maxWorkers,
exportFunc: exportFunc,
stopWG: sync.WaitGroup{},
},
}, nil
return &DisabledBatcher{BaseBatcher: newBaseBatcher(batchCfg, queue, exportFunc, maxWorkers)}, nil
}

return &DefaultBatcher{
BaseBatcher: BaseBatcher{
batchCfg: batchCfg,
queue: queue,
maxWorkers: maxWorkers,
exportFunc: exportFunc,
stopWG: sync.WaitGroup{},
},
}, nil
return &DefaultBatcher{BaseBatcher: newBaseBatcher(batchCfg, queue, exportFunc, maxWorkers)}, nil
}

func (qb *BaseBatcher) startWorkerPool() {
if qb.maxWorkers == 0 {
return
func newBaseBatcher(batchCfg exporterbatcher.Config,
queue Queue[internal.Request],
exportFunc func(ctx context.Context, req internal.Request) error,
maxWorkers int,
) BaseBatcher {
workerPool := make(chan bool, maxWorkers)
for i := 0; i < maxWorkers; i++ {
workerPool <- true
}
qb.workerPool = make(chan bool, qb.maxWorkers)
for i := 0; i < qb.maxWorkers; i++ {
qb.workerPool <- true
return BaseBatcher{
batchCfg: batchCfg,
queue: queue,
workerPool: workerPool,
exportFunc: exportFunc,
}
}

Expand All @@ -81,17 +71,12 @@ func (qb *BaseBatcher) flush(batchToFlush batch) {
// flushAsync starts a goroutine that calls flushIfNecessary. It blocks until a worker is available.
func (qb *BaseBatcher) flushAsync(batchToFlush batch) {
qb.stopWG.Add(1)
if qb.maxWorkers == 0 {
go func() {
defer qb.stopWG.Done()
qb.flush(batchToFlush)
}()
return
}
<-qb.workerPool
go func() {
defer qb.stopWG.Done()
defer func() {
qb.workerPool <- true
qb.stopWG.Done()
}()
qb.flush(batchToFlush)
qb.workerPool <- true
}()
}
6 changes: 0 additions & 6 deletions exporter/internal/queue/default_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,6 @@ func (qb *DefaultBatcher) startTimeBasedFlushingGoroutine() {

// Start starts the goroutine that reads from the queue and flushes asynchronously.
func (qb *DefaultBatcher) Start(_ context.Context, _ component.Host) error {
// maxWorker being -1 means batcher is disabled. This is for testing queue sender metrics.
if qb.maxWorkers == -1 {
return nil
}

qb.startWorkerPool()
qb.shutdownCh = make(chan bool, 1)

if qb.batchCfg.FlushTimeout == 0 {
Expand Down
16 changes: 0 additions & 16 deletions exporter/internal/queue/default_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ func TestDefaultBatcher_NoSplit_MinThresholdZero_TimeoutDisabled(t *testing.T) {
name string
maxWorkers int
}{
{
name: "infinate_workers",
maxWorkers: 0,
},
{
name: "one_worker",
maxWorkers: 1,
Expand Down Expand Up @@ -83,10 +79,6 @@ func TestDefaultBatcher_NoSplit_TimeoutDisabled(t *testing.T) {
name string
maxWorkers int
}{
{
name: "infinate_workers",
maxWorkers: 0,
},
{
name: "one_worker",
maxWorkers: 1,
Expand Down Expand Up @@ -153,10 +145,6 @@ func TestDefaultBatcher_NoSplit_WithTimeout(t *testing.T) {
name string
maxWorkers int
}{
{
name: "infinate_workers",
maxWorkers: 0,
},
{
name: "one_worker",
maxWorkers: 1,
Expand Down Expand Up @@ -220,10 +208,6 @@ func TestDefaultBatcher_Split_TimeoutDisabled(t *testing.T) {
name string
maxWorkers int
}{
{
name: "infinate_workers",
maxWorkers: 0,
},
{
name: "one_worker",
maxWorkers: 1,
Expand Down
7 changes: 0 additions & 7 deletions exporter/internal/queue/disabled_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,6 @@ type DisabledBatcher struct {

// Start starts the goroutine that reads from the queue and flushes asynchronously.
func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error {
// maxWorker being -1 means batcher is disabled. This is for testing queue sender metrics.
if qb.maxWorkers == -1 {
return nil
}

qb.startWorkerPool()

// This goroutine reads and then flushes.
// 1. Reading from the queue is blocked until the queue is non-empty or until the queue is stopped.
// 2. flushAsync() blocks until there are idle workers in the worker pool.
Expand Down
4 changes: 0 additions & 4 deletions exporter/internal/queue/disabled_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ func TestDisabledBatcher_Basic(t *testing.T) {
name string
maxWorkers int
}{
{
name: "infinate_workers",
maxWorkers: 0,
},
{
name: "one_worker",
maxWorkers: 1,
Expand Down

0 comments on commit 574c86e

Please sign in to comment.