Skip to content

Commit

Permalink
Add middleware support to pool package (#1)
Browse files Browse the repository at this point in the history
* Add middleware support to pool package

Introduce middleware functionality to the pool package, enabling the addition of cross-cutting concerns like retries, timeouts, panic recovery, validation, metrics, and logging. Update README documentation with examples of built-in and custom middleware usage. Include comprehensive test cases

* lint: minor warns

* docs: update README to include middleware features
  • Loading branch information
umputun authored Feb 13, 2025
1 parent f9be306 commit 3cecbde
Show file tree
Hide file tree
Showing 11 changed files with 987 additions and 0 deletions.
47 changes: 47 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
- Error handling with continue/stop options
- Context-based cancellation and timeouts
- Optional completion callbacks
- Extensible middleware system for custom functionality
- Built-in middlewares for common tasks
- No external dependencies except for the testing framework

## Quick Start
Expand Down Expand Up @@ -217,6 +219,51 @@ When to use custom distribution:
- Ensure exclusive access to resources
- Process data consistently

## Middleware Support

The package supports middleware pattern similar to HTTP middleware in Go. Middleware can be used to add cross-cutting concerns like:
- Retries with backoff
- Timeouts
- Panic recovery
- Metrics and logging
- Error handling

Built-in middleware:
```go
// Add retry with exponential backoff
p.Use(middleware.Retry[string](3, time.Second))

// Add timeout per operation
p.Use(middleware.Timeout[string](5 * time.Second))

// Add panic recovery
p.Use(middleware.Recovery[string](func(p interface{}) {
log.Printf("recovered from panic: %v", p)
}))

// Add validation before processing
p.Use(middleware.Validate([string]validator))
```

Custom middleware:
```go
logging := func(next pool.Worker[string]) pool.Worker[string] {
return pool.WorkerFunc[string](func(ctx context.Context, v string) error {
log.Printf("processing: %v", v)
err := next.Do(ctx, v)
log.Printf("completed: %v, err: %v", v, err)
return err
})
}

p.Use(logging)
```

Multiple middleware execute in the same order as provided:
```go
p.Use(logging, metrics, retry) // order: logging -> metrics -> retry -> worker
```

## Install and update

```bash
Expand Down
27 changes: 27 additions & 0 deletions doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,31 @@
// Or by collecting all at once:
//
// results, err := collector.All()
//
// Middleware Support:
//
// The pool supports middleware pattern similar to HTTP middleware in Go. Middleware can be used
// to add functionality like retries, timeouts, metrics, or error handling:
//
// // retry middleware
// retryMiddleware := func(next Worker[string]) Worker[string] {
// return WorkerFunc[string](func(ctx context.Context, v string) error {
// var lastErr error
// for i := 0; i < 3; i++ {
// if err := next.Do(ctx, v); err == nil {
// return nil
// } else {
// lastErr = err
// }
// time.Sleep(time.Second * time.Duration(i))
// }
// return fmt.Errorf("failed after 3 attempts: %w", lastErr)
// })
// }
//
// p := New[string](2, worker).Use(retryMiddleware)
//
// Multiple middleware can be chained, and they execute in the same order as provided:
//
// p.Use(logging, metrics, retry) // executes: logging -> metrics -> retry -> worker
package pool
140 changes: 140 additions & 0 deletions exmples/middleware/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# Task Processor with Middleware - Example

This example demonstrates how to use middleware in [go-pkgz/pool](https://github.com/go-pkgz/pool) package to build a robust task processing system. It shows both built-in middleware usage and custom middleware creation, emphasizing how middleware can add cross-cutting functionality without modifying the core processing logic.

## What Makes it Special?

1. Middleware composition:
- Shows how multiple middleware work together
- Demonstrates middleware execution order
- Combines both built-in and custom middleware

2. Cross-cutting concerns:
- Input validation before processing
- Automatic retries for failed tasks
- Panic recovery for robustness
- Structured logging for observability

3. Real-world patterns:
- Configuration management
- Error handling
- Metrics collection
- Structured logging with slog

## Features

- Task validation before processing
- Automatic retries with exponential backoff
- Panic recovery with custom handler
- Structured JSON logging
- Performance metrics collection
- Configurable worker count and retry attempts

## Installation

```bash
go build
```

## Usage

```bash
go run main.go [options]
```

Options:
- `-workers` - number of worker goroutines (default: 2)
- `-retries` - number of retries for failed tasks (default: 3)

Example:
```bash
go run main.go -workers 4 -retries 5
```

## Implementation Details

The implementation demonstrates several key concepts:

1. Middleware creation:
```go
func makeStructuredLogger(logger *slog.Logger) pool.Middleware[Task] {
return func(next pool.Worker[Task]) pool.Worker[Task] {
return pool.WorkerFunc[Task](func(ctx context.Context, task Task) error {
// pre-processing logging
err := next.Do(ctx, task)
// post-processing logging
return err
})
}
}
```

2. Middleware composition:
```go
pool.New[Task](workers, makeWorker()).Use(
middleware.Validate(validator), // validate first
middleware.Retry[Task](retries), // then retry on failure
middleware.Recovery[Task](handler), // recover from panics
customLogger, // log everything
)
```

3. Task processing:
```go
type Task struct {
ID string `json:"id"`
Priority int `json:"priority"`
Payload string `json:"payload"`
}
```

## Output Example

```json
{
"time": "2025-02-12T10:00:00Z",
"level": "DEBUG",
"msg": "processing task",
"task_id": "1",
"priority": 1,
"payload": {"id":"1","priority":1,"payload":"normal task"}
}
{
"time": "2025-02-12T10:00:00Z",
"level": "INFO",
"msg": "task completed",
"task_id": "1",
"duration_ms": 100
}
{
"time": "2025-02-12T10:00:00Z",
"level": "ERROR",
"msg": "task failed",
"task_id": "2",
"duration_ms": 100,
"error": "failed to process task 2"
}
```

## Architecture

The program is structured in several logical components:

```
main
├── setupConfig - configuration and logger setup
├── makeWorker - core worker implementation
├── makeValidator - input validation rules
├── makePool - pool creation with middleware
└── runPool - execution and task submission
```

Each component is isolated and has a single responsibility, making the code easy to maintain and test.

## Notes

- Middleware executes in the order it's added to Use()
- The first middleware wraps the outermost layer
- Built-in middleware handles common patterns
- Custom middleware can add any functionality
- Structured logging as an example of cross-cutting concern
9 changes: 9 additions & 0 deletions exmples/middleware/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
module examples/middleware

go 1.23.6

require github.com/go-pkgz/pool v0.3.2

require golang.org/x/sync v0.11.0 // indirect

replace github.com/go-pkgz/pool => ../..
10 changes: 10 additions & 0 deletions exmples/middleware/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w=
golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Loading

0 comments on commit 3cecbde

Please sign in to comment.