Skip to content

Commit

Permalink
Merge pull request #2 from go-pkgz/perf
Browse files Browse the repository at this point in the history
Performance optimization
  • Loading branch information
umputun authored Feb 14, 2025
2 parents f8b1cd3 + 80b956e commit 6a04124
Show file tree
Hide file tree
Showing 7 changed files with 1,066 additions and 397 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
*.prof
pool.test
1 change: 0 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ linters:
- gofmt
- goimports
- intrange
- mnd
- nilerr
- predeclared
- testifylint
Expand Down
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -421,11 +421,11 @@ p := pool.New[string](2, worker).
```

Available options:
- `WithBatchSize(size int)` - enables batch processing, accumulating items before sending to workers
- `WithWorkerChanSize(size int)` - sets buffer size for worker channels
- `WithChunkFn(fn func(T) string)` - controls work distribution by key
- `WithContinueOnError()` - continues processing on errors
- `WithCompleteFn(fn func(ctx, id, worker))` - called on worker completion
- `WithBatchSize(size int)` - enables batch processing, accumulating items before sending to workers (default: 10)
- `WithWorkerChanSize(size int)` - sets buffer size for worker channels (default: 1)
- `WithChunkFn(fn func(T) string)` - controls work distribution by key (default: none, random distribution)
- `WithContinueOnError()` - continues processing on errors (default: false)
- `WithCompleteFn(fn func(ctx, id, worker))` - called on worker completion (default: none)

### Alternative pool implementations

Expand Down
321 changes: 321 additions & 0 deletions bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,321 @@
package pool

import (
"context"
"os"
"runtime/pprof"
"strconv"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

// benchTask is a somewhat realistic task that combines CPU work with memory allocation
func benchTask(size int) []int { //nolint:unparam // size is used in the benchmark
task := func(n int) int { // simulate some CPU work
sum := 0
for i := 0; i < n; i++ {
sum += i
}
return sum
}
res := make([]int, 0, size)
for i := 0; i < size; i++ {
res = append(res, task(1))
}
return res
}

// TestPoolPerf pushes the same workload through a plain errgroup baseline and
// several pool configurations, asserting each pool variant is not slower than
// the baseline. The errgroup subtest must run first because it records the
// baseline duration the later subtests compare against. Timing assertions are
// inherently environment-sensitive and may be flaky on heavily loaded machines.
func TestPoolPerf(t *testing.T) {
	const total = 1000000 // number of items each subtest submits

	n := 1000 // per-item workload size, see benchTask
	ctx := context.Background()

	// egDuration is the errgroup baseline captured by the first subtest
	var egDuration time.Duration
	t.Run("errgroup", func(t *testing.T) {
		var count2 int32
		st := time.Now()
		defer func() {
			egDuration = time.Since(st)
			t.Logf("elapsed errgroup: %v", time.Since(st))
		}()
		g, _ := errgroup.WithContext(ctx)
		g.SetLimit(8)
		for i := 0; i < total; i++ {
			g.Go(func() error {
				benchTask(n)
				atomic.AddInt32(&count2, 1)
				return nil
			})
		}
		require.NoError(t, g.Wait())
		assert.Equal(t, int32(total), atomic.LoadInt32(&count2))
	})

	t.Run("pool default", func(t *testing.T) {
		// pool with 8 workers, default options
		var count1 int32
		worker := WorkerFunc[int](func(context.Context, int) error {
			benchTask(n)
			atomic.AddInt32(&count1, 1)
			return nil
		})

		st := time.Now()
		p := New[int](8, worker)
		require.NoError(t, p.Go(ctx))
		go func() {
			for i := 0; i < total; i++ {
				p.Submit(i)
			}
			// assert (not require) — require must not be used in goroutines
			assert.NoError(t, p.Close(ctx))
		}()
		require.NoError(t, p.Wait(ctx))
		assert.Equal(t, int32(total), atomic.LoadInt32(&count1))
		t.Logf("elapsed pool: %v", time.Since(st))
		assert.Less(t, time.Since(st), egDuration)
	})

	t.Run("pool with 100 chan size", func(t *testing.T) {
		// pool with 8 workers and buffered worker channels
		var count1 int32
		worker := WorkerFunc[int](func(context.Context, int) error {
			benchTask(n)
			atomic.AddInt32(&count1, 1)
			return nil
		})

		st := time.Now()
		p := New[int](8, worker).WithWorkerChanSize(100)
		require.NoError(t, p.Go(ctx))
		go func() {
			for i := 0; i < total; i++ {
				p.Submit(i)
			}
			// assert (not require) — require must not be used in goroutines
			assert.NoError(t, p.Close(ctx))
		}()
		require.NoError(t, p.Wait(ctx))
		assert.Equal(t, int32(total), atomic.LoadInt32(&count1))
		t.Logf("elapsed pool: %v", time.Since(st))
		assert.Less(t, time.Since(st), egDuration)
	})

	t.Run("pool with 100 chan size and 100 batch size", func(t *testing.T) {
		// pool with 8 workers, buffered channels and batching
		var count1 int32
		worker := WorkerFunc[int](func(context.Context, int) error {
			benchTask(n)
			atomic.AddInt32(&count1, 1)
			return nil
		})

		st := time.Now()
		p := New[int](8, worker).WithWorkerChanSize(100).WithBatchSize(100)
		require.NoError(t, p.Go(ctx))
		go func() {
			for i := 0; i < total; i++ {
				p.Submit(i)
			}
			// assert (not require) — require must not be used in goroutines
			assert.NoError(t, p.Close(ctx))
		}()
		require.NoError(t, p.Wait(ctx))
		assert.Equal(t, int32(total), atomic.LoadInt32(&count1))
		t.Logf("elapsed pool: %v", time.Since(st))
		assert.Less(t, time.Since(st), egDuration)
	})

	t.Run("pool with 100 chan size and 100 batch size and chunking", func(t *testing.T) {
		// pool with 8 workers, buffered channels, batching and keyed distribution
		var count1 int32
		worker := WorkerFunc[int](func(context.Context, int) error {
			benchTask(n)
			atomic.AddInt32(&count1, 1)
			return nil
		})

		st := time.Now()
		p := New[int](8, worker).WithWorkerChanSize(100).WithBatchSize(100).WithChunkFn(func(v int) string {
			return strconv.Itoa(v % 8) // distribute by modulo
		})
		require.NoError(t, p.Go(ctx))
		go func() {
			for i := 0; i < total; i++ {
				p.Submit(i)
			}
			// assert (not require) — require must not be used in goroutines
			assert.NoError(t, p.Close(ctx))
		}()
		require.NoError(t, p.Wait(ctx))
		assert.Equal(t, int32(total), atomic.LoadInt32(&count1))
		t.Logf("elapsed pool: %v", time.Since(st))
		assert.Less(t, time.Since(st), egDuration)
	})
}

// BenchmarkPoolCompare benchmarks the pool implementation against a plain
// errgroup and across pool configurations (default, buffered channels,
// batching, batching+chunking). Each iteration runs the full workload of
// `iterations` tasks, each performing benchTask(n) worth of work.
func BenchmarkPoolCompare(b *testing.B) {
	ctx := context.Background()
	iterations := 10000 // tasks per benchmark iteration
	workers := 8        // concurrency limit for all variants
	n := 1000           // per-task workload size, see benchTask

	b.Run("errgroup", func(b *testing.B) {
		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			var count int32
			g, _ := errgroup.WithContext(ctx)
			g.SetLimit(workers)

			for j := 0; j < iterations; j++ {
				g.Go(func() error {
					benchTask(n)
					atomic.AddInt32(&count, 1)
					return nil
				})
			}
			require.NoError(b, g.Wait())
			require.Equal(b, int32(iterations), atomic.LoadInt32(&count))
		}
	})

	b.Run("pool default", func(b *testing.B) {
		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			var count int32
			p := New[int](workers, WorkerFunc[int](func(context.Context, int) error {
				benchTask(n)
				atomic.AddInt32(&count, 1)
				return nil
			}))

			require.NoError(b, p.Go(ctx))
			// buffered channel carries the Close error back to the benchmark
			// goroutine so it can be checked with require (which must not be
			// called from other goroutines)
			closeErr := make(chan error, 1)
			go func() {
				for j := 0; j < iterations; j++ {
					p.Submit(j)
				}
				closeErr <- p.Close(ctx)
			}()
			require.NoError(b, p.Wait(ctx))
			require.NoError(b, <-closeErr)
			require.Equal(b, int32(iterations), atomic.LoadInt32(&count))
		}
	})

	b.Run("pool with chan=100", func(b *testing.B) {
		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			var count int32
			p := New[int](workers, WorkerFunc[int](func(context.Context, int) error {
				benchTask(n)
				atomic.AddInt32(&count, 1)
				return nil
			})).WithWorkerChanSize(100)

			require.NoError(b, p.Go(ctx))
			// see "pool default" — Close error is checked on the benchmark goroutine
			closeErr := make(chan error, 1)
			go func() {
				for j := 0; j < iterations; j++ {
					p.Submit(j)
				}
				closeErr <- p.Close(ctx)
			}()
			require.NoError(b, p.Wait(ctx))
			require.NoError(b, <-closeErr)
			require.Equal(b, int32(iterations), atomic.LoadInt32(&count))
		}
	})

	b.Run("pool with batching", func(b *testing.B) {
		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			var count int32
			p := New[int](workers, WorkerFunc[int](func(context.Context, int) error {
				benchTask(n)
				atomic.AddInt32(&count, 1)
				return nil
			})).WithWorkerChanSize(100).WithBatchSize(100)

			require.NoError(b, p.Go(ctx))
			// see "pool default" — Close error is checked on the benchmark goroutine
			closeErr := make(chan error, 1)
			go func() {
				for j := 0; j < iterations; j++ {
					p.Submit(j)
				}
				closeErr <- p.Close(ctx)
			}()
			require.NoError(b, p.Wait(ctx))
			require.NoError(b, <-closeErr)
			require.Equal(b, int32(iterations), atomic.LoadInt32(&count))
		}
	})

	b.Run("pool with batching and chunking", func(b *testing.B) {
		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			var count int32
			p := New[int](workers, WorkerFunc[int](func(context.Context, int) error {
				benchTask(n)
				atomic.AddInt32(&count, 1)
				return nil
			})).WithWorkerChanSize(100).WithBatchSize(100).WithChunkFn(func(v int) string {
				return strconv.Itoa(v % workers)
			})

			require.NoError(b, p.Go(ctx))
			// see "pool default" — Close error is checked on the benchmark goroutine
			closeErr := make(chan error, 1)
			go func() {
				for j := 0; j < iterations; j++ {
					p.Submit(j)
				}
				closeErr <- p.Close(ctx)
			}()
			require.NoError(b, p.Wait(ctx))
			require.NoError(b, <-closeErr)
			require.Equal(b, int32(iterations), atomic.LoadInt32(&count))
		}
	})
}

// TestPoolWithProfiling runs a heavy pool workload under CPU and memory
// profiling, writing cpu.prof and mem.prof for inspection with `go tool pprof`.
// It is opt-in: set the PROFILING environment variable to enable it.
func TestPoolWithProfiling(t *testing.T) {
	// run only if env PROFILING is set
	if os.Getenv("PROFILING") == "" {
		t.Skip("skipping profiling test; set PROFILING to run")
	}

	// start CPU profile
	cpuFile, err := os.Create("cpu.prof")
	require.NoError(t, err)
	defer cpuFile.Close()
	require.NoError(t, pprof.StartCPUProfile(cpuFile))
	defer pprof.StopCPUProfile()

	// create memory profile
	memFile, err := os.Create("mem.prof")
	require.NoError(t, err)
	defer memFile.Close()

	// run pool test
	iterations := 100000
	ctx := context.Background()
	worker := WorkerFunc[int](func(context.Context, int) error {
		benchTask(30000)
		return nil
	})

	// test pool implementation
	p := New[int](4, worker).WithWorkerChanSize(100)
	require.NoError(t, p.Go(ctx))

	// buffered channel carries the Close error back to the test goroutine;
	// receiving from it also signals that submission completed
	done := make(chan error, 1)
	go func() {
		for i := 0; i < iterations; i++ {
			p.Submit(i)
		}
		done <- p.Close(ctx)
	}()

	select {
	case err := <-done:
		require.NoError(t, err)
	case <-time.After(5 * time.Second):
		t.Fatal("timeout")
	}

	// create memory profile after test
	require.NoError(t, pprof.WriteHeapProfile(memFile))
}
Loading

0 comments on commit 6a04124

Please sign in to comment.