Skip to content

Commit

Permalink
add tests for metrics and pool finalization
Browse files Browse the repository at this point in the history
  • Loading branch information
umputun committed Feb 7, 2025
1 parent 3631f01 commit 31ca9f7
Show file tree
Hide file tree
Showing 2 changed files with 253 additions and 0 deletions.
132 changes: 132 additions & 0 deletions metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,135 @@ func TestMetricsStats_Timing(t *testing.T) {
"total time (%v) should be >= processing time (%v)",
stats.TotalTime, stats.ProcessingTime)
}

func TestMetrics_ContextValues(t *testing.T) {
t.Run("worker id from context", func(t *testing.T) {
t.Run("valid worker id", func(t *testing.T) {
ctx := WithWorkerID(context.Background(), 123)
assert.Equal(t, 123, WorkerID(ctx))
})

t.Run("missing worker id", func(t *testing.T) {
ctx := context.Background()
assert.Equal(t, 0, WorkerID(ctx))
})

t.Run("invalid worker id type", func(t *testing.T) {
ctx := context.WithValue(context.Background(), widContextKey, "not an int")
assert.Equal(t, 0, WorkerID(ctx))
})
})

t.Run("metrics from context", func(t *testing.T) {
t.Run("valid metrics", func(t *testing.T) {
ctx := Make(context.Background())
m := Get(ctx)
assert.NotNil(t, m)

// verify it's a working metrics instance
m.Inc("test")
assert.Equal(t, 1, m.Get("test"))
})

t.Run("missing metrics", func(t *testing.T) {
ctx := context.Background()
m := Get(ctx)
assert.NotNil(t, m, "should return new metrics instance")

// verify it's a working metrics instance
m.Inc("test")
assert.Equal(t, 1, m.Get("test"))
})

t.Run("invalid metrics type", func(t *testing.T) {
ctx := context.WithValue(context.Background(), metricsContextKey, "not metrics")
m := Get(ctx)
assert.NotNil(t, m, "should return new metrics instance")

// verify it's a working metrics instance
m.Inc("test")
assert.Equal(t, 1, m.Get("test"))
})

t.Run("metrics values isolated", func(t *testing.T) {
ctx1 := Make(context.Background())
ctx2 := Make(context.Background())

m1 := Get(ctx1)
m2 := Get(ctx2)

m1.Inc("test")
assert.Equal(t, 1, m1.Get("test"))
assert.Equal(t, 0, m2.Get("test"), "metrics should be isolated")
})
})

t.Run("combined worker id and metrics", func(t *testing.T) {
ctx := Make(context.Background())
ctx = WithWorkerID(ctx, 42)

assert.Equal(t, 42, WorkerID(ctx))
m := Get(ctx)
assert.NotNil(t, m)

// verify metrics working
m.Inc("test")
assert.Equal(t, 1, m.Get("test"))
})
}

func TestMetrics_Stats(t *testing.T) {
t.Run("all durations", func(t *testing.T) {
m := New()
m.AddDuration(DurationProc, time.Second)
m.AddDuration(DurationWait, 2*time.Second)
m.AddDuration(DurationInit, 3*time.Second)
m.AddDuration(DurationWrap, 4*time.Second)

stats := m.Stats()
assert.Equal(t, time.Second, stats.ProcessingTime)
assert.Equal(t, 2*time.Second, stats.WaitTime)
assert.Equal(t, 3*time.Second, stats.InitTime)
assert.Equal(t, 4*time.Second, stats.WrapTime)

// total time should be max of time.Since(startTime) and sum of all durations
expectedTotal := 10 * time.Second // sum of all durations
assert.GreaterOrEqual(t, stats.TotalTime, expectedTotal)
})

t.Run("all counters", func(t *testing.T) {
m := New()
m.Inc(CountProcessed)
m.Inc(CountProcessed)
m.Inc(CountErrors)
m.Inc(CountDropped)
m.Inc(CountDropped)
m.Inc(CountDropped)

stats := m.Stats()
assert.Equal(t, 2, stats.Processed)
assert.Equal(t, 1, stats.Errors)
assert.Equal(t, 3, stats.Dropped)
})

t.Run("total time calculation", func(t *testing.T) {
m := New()

// make sure some time passes
time.Sleep(time.Millisecond * 10)

// add durations less than elapsed time
m.AddDuration(DurationProc, time.Millisecond)
m.AddDuration(DurationWait, time.Millisecond)

stats := m.Stats()
// total time should be time.Since(startTime) as it's greater
assert.Greater(t, stats.TotalTime, 2*time.Millisecond)

// now add duration greater than elapsed
m.AddDuration(DurationProc, time.Second)
stats = m.Stats()
// total time should be sum of durations as it's greater
assert.Greater(t, stats.TotalTime, time.Second)
})
}
121 changes: 121 additions & 0 deletions pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,3 +484,124 @@ func TestPool_MetricsAsStruct(t *testing.T) {
// verify actual vs reported processing
assert.Equal(t, int(processed), stats.Processed, "processed count mismatch")
}

func TestPool_FinalizeWorker(t *testing.T) {
t.Run("batch processing with errors", func(t *testing.T) {
var processed []string
worker := WorkerFunc[string](func(_ context.Context, v string) error {
if v == "error" {
return fmt.Errorf("test error")
}
processed = append(processed, v)
return nil
})

p, err := New[string](1, worker)
require.NoError(t, err)
require.NoError(t, p.Go(context.Background()))

// fill batch buffer with items including error
p.buf[0] = []string{"ok1", "error", "ok2"}

// should process until error
err = p.finalizeWorker(context.Background(), 0, worker)
require.Error(t, err)
require.Contains(t, err.Error(), "test error")
assert.Equal(t, []string{"ok1"}, processed)
})

t.Run("batch processing continues on error", func(t *testing.T) {
var processed []string
worker := WorkerFunc[string](func(_ context.Context, v string) error {
if v == "error" {
return fmt.Errorf("test error")
}
processed = append(processed, v)
return nil
})

p, err := New[string](1, worker,
Options[string]().WithContinueOnError(),
)
require.NoError(t, err)
require.NoError(t, p.Go(context.Background()))

// fill batch buffer with items including error
p.buf[0] = []string{"ok1", "error", "ok2"}

// should process all items
err = p.finalizeWorker(context.Background(), 0, worker)
require.NoError(t, err)
assert.Equal(t, []string{"ok1", "ok2"}, processed)
})

t.Run("completeFn error", func(t *testing.T) {
worker := WorkerFunc[string](func(_ context.Context, v string) error {
return nil
})

completeFnError := fmt.Errorf("complete error")
p, err := New[string](1, worker,
Options[string]().WithCompleteFn(func(context.Context, int, Worker[string]) error {
return completeFnError
}),
)
require.NoError(t, err)
require.NoError(t, p.Go(context.Background()))

err = p.finalizeWorker(context.Background(), 0, worker)
require.Error(t, err)
require.ErrorIs(t, err, completeFnError)
})

t.Run("batch error prevents completeFn", func(t *testing.T) {
var completeFnCalled bool
worker := WorkerFunc[string](func(_ context.Context, v string) error {
return fmt.Errorf("batch error")
})

p, err := New[string](1, worker,
Options[string]().WithCompleteFn(func(context.Context, int, Worker[string]) error {
completeFnCalled = true
return fmt.Errorf("complete error")
}),
)
require.NoError(t, err)
require.NoError(t, p.Go(context.Background()))

// fill batch buffer with an item
p.buf[0] = []string{"task"}

err = p.finalizeWorker(context.Background(), 0, worker)
require.Error(t, err)
require.Contains(t, err.Error(), "batch error")
assert.False(t, completeFnCalled, "completeFn should not be called after batch error")
})

t.Run("context cancellation", func(t *testing.T) {
processed := make(chan string, 1)
worker := WorkerFunc[string](func(ctx context.Context, v string) error {
select {
case <-ctx.Done():
return ctx.Err()
case processed <- v:
return nil
}
})

ctx, cancel := context.WithCancel(context.Background())
p, err := New[string](1, worker)
require.NoError(t, err)
require.NoError(t, p.Go(ctx))

// fill batch buffer
p.buf[0] = []string{"task1", "task2"}

// cancel context before finalization
cancel()

err = p.finalizeWorker(ctx, 0, worker)
require.Error(t, err)
require.ErrorIs(t, err, context.Canceled)
})
}

0 comments on commit 31ca9f7

Please sign in to comment.