Documentation

Overview
Package pool provides a simple worker pool implementation with a single stage only. It allows submitting tasks to be processed in parallel by a number of workers.
The package supports both stateless and stateful workers through two distinct constructors:
- New - for pools with a single shared worker instance
- NewStateful - for pools where each goroutine gets its own worker instance
Worker Types:
The package provides a simple Worker interface that can be implemented in two ways:
    type Worker[T any] interface {
        Do(ctx context.Context, v T) error
    }
1. Direct implementation for complex stateful workers:
    type dbWorker struct {
        conn *sql.DB
    }

    func (w *dbWorker) Do(ctx context.Context, v string) error {
        // ExecContext returns (sql.Result, error); only the error is needed here
        _, err := w.conn.ExecContext(ctx, "INSERT INTO items (value) VALUES (?)", v)
        return err
    }
2. Function adapter for simple stateless workers:
    worker := pool.WorkerFunc[string](func(ctx context.Context, v string) error {
        // process the value
        return nil
    })
Basic Usage:
For stateless operations (like HTTP requests, parsing operations, etc.):
    worker := pool.WorkerFunc[string](func(ctx context.Context, v string) error {
        resp, err := http.Get(v)
        if err != nil {
            return err
        }
        defer resp.Body.Close()
        return nil
    })

    p := pool.New[string](2, worker)
    if err := p.Go(context.Background()); err != nil {
        return err
    }

    // submit work
    p.Submit("task1")
    p.Submit("task2")

    if err := p.Close(context.Background()); err != nil {
        return err
    }
For stateful operations (like database connections, file handles, etc.):
    maker := func() pool.Worker[string] {
        return &dbWorker{
            conn: openConnection(),
        }
    }

    p := pool.NewStateful[string](2, maker)
Features:
- Generic worker pool implementation supporting any data type
- Configurable number of workers running in parallel
- Support for both stateless shared workers and per-worker instances
- Batching capability for processing multiple items at once
- Customizable work distribution through chunk functions
- Built-in metrics collection including processing times and counts
- Error handling with options to continue or stop on errors
- Context-based cancellation and timeouts
- Optional completion callbacks
Advanced Features:
Batching:
    p := New[string](2, worker).WithBatchSize(10)
Chunked distribution:
    p := New[string](2, worker).WithChunkFn(func(v string) string {
        return v // items with the same key go to the same worker
    })
Error handling:
    p := New[string](2, worker).WithContinueOnError()
Metrics:
The pool automatically tracks standard stats metrics (processed counts, errors, timings). Workers can also record additional custom metrics:
    m := metrics.Get(ctx)
    m.Inc("custom-counter")
Access metrics:
    metrics := p.Metrics()
    value := metrics.Get("custom-counter")
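A minimal sketch tying the two snippets above together; the counter names and inputs are arbitrary illustrations:

    worker := pool.WorkerFunc[string](func(ctx context.Context, v string) error {
        m := metrics.Get(ctx) // worker-side metrics, available through the context
        if v == "" {
            m.Inc("skipped") // custom counter, name chosen for illustration
            return nil
        }
        m.Inc("handled")
        return nil
    })

    p := pool.New[string](2, worker)
    _ = p.Go(context.Background())
    p.Submit("task")
    p.Submit("")
    _ = p.Close(context.Background())

    fmt.Println(p.Metrics().Get("handled")) // prints 1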
Statistical metrics include:
- Number of processed items
- Number of errors
- Number of dropped items
- Processing time
- Wait time
- Initialization time
- Total time
Access stats:
    metrics := p.Metrics()
    stats := metrics.GetStats()
    fmt.Printf("processed: %d, errors: %d", stats.Processed, stats.Errors)
Data Collection:
For collecting results from workers, use the Collector:
    collector := pool.NewCollector[Result](ctx, 10)

    worker := pool.WorkerFunc[Input](func(ctx context.Context, v Input) error {
        result := process(v)
        collector.Submit(result)
        return nil
    })
Results can be retrieved either through iteration:
    for v, err := range collector.Iter() {
        if err != nil {
            return err
        }
        // use v
    }
Or by collecting all at once:
    results, err := collector.All()
Middleware Support:
The pool supports middleware pattern similar to HTTP middleware in Go. Middleware can be used to add functionality like retries, timeouts, metrics, or error handling:
    // retry middleware
    retryMiddleware := func(next Worker[string]) Worker[string] {
        return WorkerFunc[string](func(ctx context.Context, v string) error {
            var lastErr error
            for i := 0; i < 3; i++ {
                if err := next.Do(ctx, v); err == nil {
                    return nil
                } else {
                    lastErr = err
                }
                time.Sleep(time.Second * time.Duration(i))
            }
            return fmt.Errorf("failed after 3 attempts: %w", lastErr)
        })
    }

    p := New[string](2, worker).Use(retryMiddleware)
Multiple middleware can be chained, and they execute in the same order as provided:
    p.Use(logging, metrics, retry) // executes: logging -> metrics -> retry -> worker
Example (Basic)

    // collect output
    var out []string
    var mu sync.Mutex

    worker := WorkerFunc[int](func(_ context.Context, v int) error {
        mu.Lock()
        out = append(out, fmt.Sprintf("processed: %d", v))
        mu.Unlock()
        return nil
    })

    p := New[int](2, worker)
    if err := p.Go(context.Background()); err != nil {
        panic(err) // handle error, don't panic in real code
    }

    // submit work
    p.Submit(1)
    p.Submit(2)
    p.Submit(3)

    _ = p.Close(context.Background())

    // print collected output in sorted order
    sort.Strings(out)
    for _, s := range out {
        fmt.Println(s)
    }

Output:

    processed: 1
    processed: 2
    processed: 3
Example (ChainedCalculation)

    // stage 1: calculate fibonacci numbers in parallel
    type FibResult struct {
        n   int
        fib uint64
    }
    stage1Collector := NewCollector[FibResult](context.Background(), 10)
    fibWorker := WorkerFunc[int](func(_ context.Context, n int) error {
        var a, b uint64 = 0, 1
        for i := 0; i < n; i++ {
            a, b = b, a+b
        }
        stage1Collector.Submit(FibResult{n: n, fib: a})
        return nil
    })

    // stage 2: calculate factors for each fibonacci number
    type FactorsResult struct {
        n       uint64
        factors []uint64
    }
    stage2Collector := NewCollector[FactorsResult](context.Background(), 10)
    factorsWorker := WorkerFunc[FibResult](func(_ context.Context, res FibResult) error {
        if res.fib <= 1 {
            stage2Collector.Submit(FactorsResult{n: res.fib, factors: []uint64{res.fib}})
            return nil
        }
        var factors []uint64
        n := res.fib
        for i := uint64(2); i*i <= n; i++ {
            for n%i == 0 {
                factors = append(factors, i)
                n /= i
            }
        }
        if n > 1 {
            factors = append(factors, n)
        }
        stage2Collector.Submit(FactorsResult{n: res.fib, factors: factors})
        return nil
    })

    // create and start both pools
    pool1 := New[int](3, fibWorker)
    pool1.Go(context.Background())
    pool2 := NewStateful[FibResult](2, func() Worker[FibResult] {
        return factorsWorker
    })
    pool2.Go(context.Background())

    // submit numbers to calculate
    numbers := []int{5, 7, 10}
    for _, n := range numbers {
        pool1.Submit(n)
    }

    // close pools and collectors in order
    pool1.Close(context.Background())
    stage1Collector.Close()

    // process stage 1 results in stage 2
    for fibRes, err := range stage1Collector.Iter() {
        if err != nil {
            fmt.Printf("stage 1 error: %v\n", err)
            continue
        }
        pool2.Submit(fibRes)
    }
    pool2.Close(context.Background())
    stage2Collector.Close()

    // collect and sort final results to ensure deterministic output order
    results, _ := stage2Collector.All()
    sort.Slice(results, func(i, j int) bool { return results[i].n < results[j].n })

    // print results in sorted order
    for _, res := range results {
        fmt.Printf("number %d has factors %v\n", res.n, res.factors)
    }

Output:

    number 5 has factors [5]
    number 13 has factors [13]
    number 55 has factors [5 11]
Example (FibCalculator)

    // FibResult type to store both input and calculated Fibonacci number
    type FibResult struct {
        n   int
        fib uint64
    }

    // create collector for results
    collector := NewCollector[FibResult](context.Background(), 10)

    // worker calculating fibonacci numbers
    worker := WorkerFunc[int](func(_ context.Context, n int) error {
        if n <= 0 {
            return fmt.Errorf("invalid input: %d", n)
        }
        // calculate fibonacci number
        var a, b uint64 = 0, 1
        for i := 0; i < n; i++ {
            a, b = b, a+b
        }
        collector.Submit(FibResult{n: n, fib: a})
        return nil
    })

    // create pool with 3 workers
    p := New[int](3, worker)
    p.Go(context.Background())

    // submit numbers to calculate asynchronously
    go func() {
        numbers := []int{5, 7, 10, 3, 8}
        for _, n := range numbers {
            p.Submit(n)
        }
        p.Close(context.Background())
        collector.Close()
    }()

    // collect results and sort them by input number for consistent output
    results, _ := collector.All()
    sort.Slice(results, func(i, j int) bool { return results[i].n < results[j].n })

    // print results
    for _, res := range results {
        fmt.Printf("fib(%d) = %d\n", res.n, res.fib)
    }

Output:

    fib(3) = 2
    fib(5) = 5
    fib(7) = 13
    fib(8) = 21
    fib(10) = 55
Example (Middleware)

    // Create a worker that sometimes fails
    worker := WorkerFunc[string](func(_ context.Context, v string) error {
        if v == "fail" {
            return errors.New("simulated failure")
        }
        fmt.Printf("processed: %s\n", v)
        return nil
    })

    // Create logging middleware
    logging := func(next Worker[string]) Worker[string] {
        return WorkerFunc[string](func(ctx context.Context, v string) error {
            fmt.Printf("starting: %s\n", v)
            err := next.Do(ctx, v)
            fmt.Printf("completed: %s, err: %v\n", v, err)
            return err
        })
    }

    // Create retry middleware
    retry := func(attempts int) Middleware[string] {
        return func(next Worker[string]) Worker[string] {
            return WorkerFunc[string](func(ctx context.Context, v string) error {
                var lastErr error
                for i := 0; i < attempts; i++ {
                    var err error
                    if err = next.Do(ctx, v); err == nil {
                        return nil
                    }
                    lastErr = err
                    fmt.Printf("attempt %d failed: %v\n", i+1, err)
                }
                return fmt.Errorf("failed after %d attempts: %w", attempts, lastErr)
            })
        }
    }

    // Create pool with both middleware - retry(2) comes first, making it the
    // outermost wrapper, so logging runs inside each retry attempt
    p := New[string](1, worker).Use(retry(2), logging)
    p.Go(context.Background())

    // Process items
    p.Submit("ok")   // should succeed first time
    p.Submit("fail") // should fail after retries
    p.Close(context.Background())

Output:

    starting: ok
    processed: ok
    completed: ok, err: <nil>
    starting: fail
    completed: fail, err: simulated failure
    attempt 1 failed: simulated failure
    starting: fail
    completed: fail, err: simulated failure
    attempt 2 failed: simulated failure
Example (WithCollector)

    type Item struct {
        val   int
        label string
    }

    // create collector for results with buffer size 10
    collector := NewCollector[Item](context.Background(), 10)

    // create worker that processes numbers and sends results to collector
    worker := WorkerFunc[int](func(_ context.Context, v int) error {
        result := Item{
            val:   v * 2,  // double the value
            label: "proc", // add label
        }
        collector.Submit(result)
        return nil
    })

    // create and start pool
    p := New[int](2, worker)
    p.Go(context.Background())

    // submit items asynchronously
    go func() {
        for i := 1; i <= 3; i++ {
            p.Submit(i)
        }
        p.Close(context.Background())
        collector.Close() // close collector after pool is done
    }()

    // collect results and sort them for deterministic output
    results, _ := collector.All()
    sort.Slice(results, func(i, j int) bool { return results[i].val < results[j].val })

    // print sorted results
    for _, res := range results {
        fmt.Printf("got result: %d (%s)\n", res.val, res.label)
    }

Output:

    got result: 2 (proc)
    got result: 4 (proc)
    got result: 6 (proc)
Example (WithCollectorIterator)

    collector := NewCollector[string](context.Background(), 5)

    worker := WorkerFunc[int](func(_ context.Context, v int) error {
        collector.Submit(fmt.Sprintf("value %d", v))
        return nil
    })

    p := New[int](2, worker)
    p.Go(context.Background())

    // submit items asynchronously
    go func() {
        for i := 1; i <= 3; i++ {
            p.Submit(i)
        }
        p.Close(context.Background())
        collector.Close()
    }()

    // collect all values first
    var values []string
    for val, err := range collector.Iter() {
        if err != nil {
            fmt.Printf("error: %v\n", err)
            continue
        }
        values = append(values, val)
    }

    // sort and print values for deterministic output
    sort.Strings(values)
    for _, val := range values {
        fmt.Printf("processed: %s\n", val)
    }

Output:

    processed: value 1
    processed: value 2
    processed: value 3
Example (WithContext)

    started := make(chan struct{})
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    worker := WorkerFunc[int](func(ctx context.Context, v int) error {
        close(started) // signal that worker started
        <-ctx.Done()   // wait for cancellation
        return ctx.Err()
    })

    p := New[int](1, worker).WithBatchSize(0) // disable batching
    p.Go(ctx)
    p.Submit(1)

    <-started // wait for worker to start
    cancel()  // cancel context

    err := p.Close(context.Background())
    fmt.Printf("got error: %v\n", err != nil)

Output:

    got error: true
Example (WithError)

    // collect output to ensure deterministic order
    var out []string
    var mu sync.Mutex

    worker := WorkerFunc[int](func(_ context.Context, v int) error {
        if v == 0 {
            return fmt.Errorf("zero value not allowed")
        }
        mu.Lock()
        out = append(out, fmt.Sprintf("processed: %d", v))
        mu.Unlock()
        return nil
    })

    p := New[int](1, worker).WithContinueOnError() // don't stop on errors
    p.Go(context.Background())

    p.Submit(1)
    p.Submit(0) // this will fail but processing continues
    p.Submit(2)

    err := p.Close(context.Background())
    if err != nil {
        mu.Lock()
        out = append(out, fmt.Sprintf("finished with error: %v", err))
        mu.Unlock()
    }

    // print collected output in sorted order
    sort.Strings(out)
    for _, s := range out {
        fmt.Println(s)
    }

Output:

    finished with error: total errors: 1, last error: worker 0 failed: zero value not allowed
    processed: 1
    processed: 2
Example (WithRouting)

    // collect output with sync.Map for thread safety
    var out sync.Map

    worker := WorkerFunc[int](func(ctx context.Context, v int) error {
        out.Store(v, fmt.Sprintf("worker %d got %d", metrics.WorkerID(ctx), v))
        return nil
    })

    // create pool with chunk function that routes based on even/odd
    p := New[int](2, worker).WithChunkFn(func(v int) string {
        if v%2 == 0 {
            return "even"
        }
        return "odd"
    })

    p.Go(context.Background())

    // Submit all numbers
    for i := 1; i <= 4; i++ {
        p.Submit(i)
    }
    p.Close(context.Background())

    // print in order to ensure deterministic output
    for i := 1; i <= 4; i++ {
        if v, ok := out.Load(i); ok {
            fmt.Println(v)
        }
    }

Output:

    worker 0 got 1
    worker 1 got 2
    worker 0 got 3
    worker 1 got 4
Example (WorkerTypes)

    // These two workers are functionally equivalent:
    // 1. Implementing Worker interface explicitly (processingWorker, defined
    //    elsewhere in the example file)
    // 2. Using WorkerFunc adapter - same thing, just shorter
    workerFn := WorkerFunc[string](func(_ context.Context, v string) error {
        fmt.Printf("processed: %s\n", v)
        return nil
    })

    // Run first pool to completion
    p1 := New[string](1, &processingWorker{})
    p1.Go(context.Background())
    p1.Submit("task1")
    p1.Close(context.Background())

    // Then run second pool
    p2 := New[string](1, workerFn)
    p2.Go(context.Background())
    p2.Submit("task2")
    p2.Close(context.Background())

Output:

    processed: task1
    processed: task2
Index
- type Collector
- type GroupCompleteFn
- type Middleware
- type Send
- type Worker
- type WorkerCompleteFn
- type WorkerFunc
- type WorkerGroup
- func (p *WorkerGroup[T]) Close(ctx context.Context) error
- func (p *WorkerGroup[T]) Go(ctx context.Context) error
- func (p *WorkerGroup[T]) Metrics() *metrics.Value
- func (p *WorkerGroup[T]) Send(v T)
- func (p *WorkerGroup[T]) Submit(v T)
- func (p *WorkerGroup[T]) Use(middlewares ...Middleware[T]) *WorkerGroup[T]
- func (p *WorkerGroup[T]) Wait(ctx context.Context) error
- func (p *WorkerGroup[T]) WithBatchSize(size int) *WorkerGroup[T]
- func (p *WorkerGroup[T]) WithChunkFn(fn func(T) string) *WorkerGroup[T]
- func (p *WorkerGroup[T]) WithContinueOnError() *WorkerGroup[T]
- func (p *WorkerGroup[T]) WithPoolCompleteFn(fn GroupCompleteFn[T]) *WorkerGroup[T]
- func (p *WorkerGroup[T]) WithWorkerChanSize(size int) *WorkerGroup[T]
- func (p *WorkerGroup[T]) WithWorkerCompleteFn(fn WorkerCompleteFn[T]) *WorkerGroup[T]
- type WorkerMaker
Examples

- Package (Basic)
- Package (ChainedCalculation)
- Package (FibCalculator)
- Package (Middleware)
- Package (WithCollector)
- Package (WithCollectorIterator)
- Package (WithContext)
- Package (WithError)
- Package (WithRouting)
- Package (WorkerTypes)
Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.
Types

type Collector

type Collector[V any] struct {
    // contains filtered or unexported fields
}

Collector provides synchronous access to async data from the pool's response channel.
func NewCollector

func NewCollector[V any](ctx context.Context, size int) *Collector[V]

NewCollector creates a new collector with a given context and buffer size for the channel.
type GroupCompleteFn (added in v0.6.0)
GroupCompleteFn called once when all workers are done
type Middleware (added in v0.4.0)

type Middleware[T any] func(Worker[T]) Worker[T]

Middleware wraps a worker and adds functionality.
type WorkerCompleteFn (added in v0.6.0)
WorkerCompleteFn called on worker completion
type WorkerFunc

type WorkerFunc[T any] func(ctx context.Context, v T) error

WorkerFunc is an adapter to allow the use of ordinary functions as Workers.
type WorkerGroup

type WorkerGroup[T any] struct {
    // contains filtered or unexported fields
}

WorkerGroup represents a pool of workers processing items in parallel. Supports both direct item processing and batching modes.
func New
func New[T any](size int, worker Worker[T]) *WorkerGroup[T]
New creates a worker pool with a shared worker instance. All goroutines share the same worker, suitable for stateless processing.
func NewStateful (added in v0.2.0)
func NewStateful[T any](size int, maker func() Worker[T]) *WorkerGroup[T]
NewStateful creates a worker pool where each goroutine gets its own worker instance. Suitable for operations requiring state (e.g., database connections).
func (*WorkerGroup[T]) Close

func (p *WorkerGroup[T]) Close(ctx context.Context) error

Close closes the pool. It has to be called by the consumer to indicate that all records have been submitted. The call blocks until workers complete all processing. After this call the pool can't be reused. Returns an error if any occurred during the run.
func (*WorkerGroup[T]) Go
func (p *WorkerGroup[T]) Go(ctx context.Context) error
Go activates the pool and starts worker goroutines. Must be called before submitting items.
func (*WorkerGroup[T]) Metrics
func (p *WorkerGroup[T]) Metrics() *metrics.Value
Metrics returns combined metrics from all workers
func (*WorkerGroup[T]) Send (added in v0.6.0)
func (p *WorkerGroup[T]) Send(v T)
Send adds an item to the pool for processing. Safe for concurrent use, intended for worker-to-pool submissions or for use by multiple concurrent producers.
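A minimal sketch of several concurrent producers feeding one pool via Send; the worker and task values are placeholders:

    worker := pool.WorkerFunc[int](func(_ context.Context, v int) error {
        fmt.Println("got", v)
        return nil
    })
    p := pool.New[int](4, worker)
    _ = p.Go(context.Background())

    var wg sync.WaitGroup
    for i := 0; i < 3; i++ { // three concurrent producers
        wg.Add(1)
        go func(base int) {
            defer wg.Done()
            for v := base; v < base+10; v++ {
                p.Send(v) // Send, unlike Submit, is safe to call concurrently
            }
        }(i * 10)
    }
    wg.Wait()

    _ = p.Close(context.Background())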
func (*WorkerGroup[T]) Submit

func (p *WorkerGroup[T]) Submit(v T)

Submit adds an item to the pool for processing. May block if worker channels are full. Not thread-safe; intended for use by the main goroutine or a single producer goroutine.
func (*WorkerGroup[T]) Use (added in v0.4.0)
func (p *WorkerGroup[T]) Use(middlewares ...Middleware[T]) *WorkerGroup[T]
Use applies middlewares to the worker group's worker. Middlewares are applied in the same order as they are provided, matching the HTTP middleware pattern in Go. The first middleware is the outermost wrapper, and the last middleware is the innermost wrapper closest to the original worker.
func (*WorkerGroup[T]) Wait

func (p *WorkerGroup[T]) Wait(ctx context.Context) error

Wait blocks until all workers have completed and the result channel is closed.
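A sketch of one way Wait can be used, assuming submission happens on a separate goroutine that calls Close when done:

    worker := pool.WorkerFunc[string](func(_ context.Context, v string) error { return nil })
    p := pool.New[string](2, worker)
    _ = p.Go(context.Background())

    go func() {
        defer p.Close(context.Background()) // producer signals "no more work"
        for _, task := range []string{"a", "b", "c"} {
            p.Submit(task)
        }
    }()

    // block until all workers are done and the result channel is closed
    if err := p.Wait(context.Background()); err != nil {
        fmt.Printf("pool finished with error: %v\n", err)
    }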
func (*WorkerGroup[T]) WithBatchSize (added in v0.3.0)

func (p *WorkerGroup[T]) WithBatchSize(size int) *WorkerGroup[T]

WithBatchSize enables item batching with the specified size. Items are accumulated until the batch is full before processing. Set to 0 to disable batching. Default: 10
func (*WorkerGroup[T]) WithChunkFn (added in v0.3.0)
func (p *WorkerGroup[T]) WithChunkFn(fn func(T) string) *WorkerGroup[T]
WithChunkFn enables predictable item distribution. Items with the same key (returned by fn) are processed by the same worker. Useful for maintaining order within groups of related items. Default: none (random distribution)
func (*WorkerGroup[T]) WithContinueOnError (added in v0.3.0)
func (p *WorkerGroup[T]) WithContinueOnError() *WorkerGroup[T]
WithContinueOnError sets whether the pool should continue on error. Default: false
func (*WorkerGroup[T]) WithPoolCompleteFn (added in v0.6.0)
func (p *WorkerGroup[T]) WithPoolCompleteFn(fn GroupCompleteFn[T]) *WorkerGroup[T]
WithPoolCompleteFn sets callback executed once when all workers are done
func (*WorkerGroup[T]) WithWorkerChanSize (added in v0.3.0)
func (p *WorkerGroup[T]) WithWorkerChanSize(size int) *WorkerGroup[T]
WithWorkerChanSize sets channel buffer size for each worker. Larger sizes can help with bursty workloads but increase memory usage. Default: 1
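For example, a bursty producer may trade memory for fewer Submit stalls (the size 100 is an arbitrary illustration):

    p := pool.New[string](4, worker).WithWorkerChanSize(100)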
func (*WorkerGroup[T]) WithWorkerCompleteFn (added in v0.6.0)
func (p *WorkerGroup[T]) WithWorkerCompleteFn(fn WorkerCompleteFn[T]) *WorkerGroup[T]
WithWorkerCompleteFn sets callback executed on worker completion. Useful for cleanup or finalization of worker resources. Default: none (disabled)
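A sketch wiring both completion callbacks. The callback forms below, (ctx, worker id, worker) for WorkerCompleteFn and (ctx, pool) for GroupCompleteFn, are assumptions; check the WorkerCompleteFn and GroupCompleteFn type definitions for the authoritative signatures:

    p := pool.New[string](2, worker).
        WithWorkerCompleteFn(func(ctx context.Context, id int, w pool.Worker[string]) error {
            fmt.Printf("worker %d done\n", id) // assumed signature: per-worker finalization
            return nil
        }).
        WithPoolCompleteFn(func(ctx context.Context, grp *pool.WorkerGroup[string]) error {
            fmt.Println("all workers done") // assumed signature: runs once after the last worker
            return nil
        })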
type WorkerMaker

type WorkerMaker[T any] func() Worker[T]

WorkerMaker is a function that returns a new Worker.
Directories

Path | Synopsis
---|---
metrics | Package metrics provides a way to collect metrics in a thread-safe way
middleware | Package middleware provides common middleware implementations for the pool package.