-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdoc.go
175 lines (175 loc) · 4.9 KB
/
doc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
// Package pool provides a simple worker pool implementation with a single stage only.
// It allows submitting tasks to be processed in parallel by a number of workers.
//
// The package supports both stateless and stateful workers through two distinct constructors:
//   - New - for pools with a single shared worker instance
//   - NewStateful - for pools where each goroutine gets its own worker instance
//
// Worker Types:
//
// The package provides a simple Worker interface that can be implemented in two ways:
//
//	type Worker[T any] interface {
//		Do(ctx context.Context, v T) error
//	}
//
// 1. Direct implementation for complex stateful workers:
//
//	type dbWorker struct {
//		conn *sql.DB
//	}
//
//	func (w *dbWorker) Do(ctx context.Context, v string) error {
//		// ExecContext returns (sql.Result, error); only the error is relevant here
//		_, err := w.conn.ExecContext(ctx, "INSERT INTO items (value) VALUES (?)", v)
//		return err
//	}
//
// 2. Function adapter for simple stateless workers:
//
//	worker := pool.WorkerFunc[string](func(ctx context.Context, v string) error {
//		// process the value
//		return nil
//	})
//
// Basic Usage:
//
// For stateless operations (like HTTP requests, parsing operations, etc.):
//
//	worker := pool.WorkerFunc[string](func(ctx context.Context, v string) error {
//		resp, err := http.Get(v)
//		if err != nil {
//			return err
//		}
//		defer resp.Body.Close()
//		return nil
//	})
//
//	p := pool.New[string](2, worker)
//	if err := p.Go(context.Background()); err != nil {
//		return err
//	}
//
//	// submit work
//	p.Submit("task1")
//	p.Submit("task2")
//
//	if err := p.Close(context.Background()); err != nil {
//		return err
//	}
//
// For stateful operations (like database connections, file handles, etc.):
//
//	maker := func() pool.Worker[string] {
//		return &dbWorker{
//			conn: openConnection(),
//		}
//	}
//	p := pool.NewStateful[string](2, maker)
//
// Features:
//
//   - Generic worker pool implementation supporting any data type
//   - Configurable number of workers running in parallel
//   - Support for both stateless shared workers and per-worker instances
//   - Batching capability for processing multiple items at once
//   - Customizable work distribution through chunk functions
//   - Built-in metrics collection including processing times and counts
//   - Error handling with options to continue or stop on errors
//   - Context-based cancellation and timeouts
//   - Optional completion callbacks
//
// Advanced Features:
//
// Batching:
//
//	p := pool.New[string](2, worker).WithBatchSize(10)
//
// Chunked distribution:
//
//	p := pool.New[string](2, worker).WithChunkFn(func(v string) string {
//		return v // items with same hash go to same worker
//	})
//
// Error handling:
//
//	p := pool.New[string](2, worker).WithContinueOnError()
//
// Metrics:
//
// The pool automatically tracks standard stats metrics (processed counts, errors, timings).
// Workers can also record additional custom metrics:
//
//	m := metrics.Get(ctx)
//	m.Inc("custom-counter")
//
// Access metrics:
//
//	m := p.Metrics()
//	value := m.Get("custom-counter")
//
// Statistical metrics including:
//
//   - Number of processed items
//   - Number of errors
//   - Number of dropped items
//   - Processing time
//   - Wait time
//   - Initialization time
//   - Total time
//
// Access stats:
//
//	m := p.Metrics()
//	stats := m.GetStats()
//	fmt.Printf("processed: %d, errors: %d", stats.Processed, stats.Errors)
//
// Data Collection:
//
// For collecting results from workers, use the Collector:
//
//	collector := pool.NewCollector[Result](ctx, 10)
//	worker := pool.WorkerFunc[Input](func(ctx context.Context, v Input) error {
//		result := process(v)
//		collector.Submit(result)
//		return nil
//	})
//
// Results can be retrieved either through iteration:
//
//	for v, err := range collector.Iter() {
//		if err != nil {
//			return err
//		}
//		// use v
//	}
//
// Or by collecting all at once:
//
//	results, err := collector.All()
//
// Middleware Support:
//
// The pool supports middleware pattern similar to HTTP middleware in Go. Middleware can be used
// to add functionality like retries, timeouts, metrics, or error handling:
//
//	// retry middleware
//	retryMiddleware := func(next pool.Worker[string]) pool.Worker[string] {
//		return pool.WorkerFunc[string](func(ctx context.Context, v string) error {
//			var lastErr error
//			for i := 0; i < 3; i++ {
//				if err := next.Do(ctx, v); err == nil {
//					return nil
//				} else {
//					lastErr = err
//				}
//				time.Sleep(time.Second * time.Duration(i))
//			}
//			return fmt.Errorf("failed after 3 attempts: %w", lastErr)
//		})
//	}
//
//	p := pool.New[string](2, worker).Use(retryMiddleware)
//
// Multiple middleware can be chained, and they execute in the same order as provided:
//
//	p.Use(logging, metrics, retry) // executes: logging -> metrics -> retry -> worker
package pool