pool

v0.7.0
Published: Feb 19, 2025 License: MIT Imports: 11 Imported by: 0

README

pool

pool is a Go package that provides a generic, efficient worker pool implementation for parallel task processing. Built for Go 1.21+, it offers a flexible API with features like batching, work distribution strategies, and comprehensive metrics collection.

Features

  • Generic implementation supporting any data type
  • Configurable number of parallel workers
  • Support for both stateless shared workers and per-worker instances
  • Batching capability for processing multiple items at once
  • Customizable work distribution through chunk functions
  • Built-in metrics collection (processing times, counts, etc.)
  • Error handling with continue/stop options
  • Context-based cancellation and timeouts
  • Optional completion callbacks
  • Extensible middleware system for custom functionality
  • Built-in middlewares for common tasks
  • No external dependencies except for the testing framework

Quick Start

Here's a practical example showing how to process a list of URLs in parallel:

func main() {
    // create a worker that fetches URLs
    worker := pool.WorkerFunc[string](func(ctx context.Context, url string) error {
        resp, err := http.Get(url)
        if err != nil {
            return fmt.Errorf("failed to fetch %s: %w", url, err)
        }
        defer resp.Body.Close()
        
        if resp.StatusCode != http.StatusOK {
            return fmt.Errorf("bad status code from %s: %d", url, resp.StatusCode)
        }
        return nil
    })
	
    // create a pool with 5 workers 
    p := pool.New[string](5, worker).WithContinueOnError() // don't stop on errors

    // start the pool
    if err := p.Go(context.Background()); err != nil {
        log.Fatal(err)
    }

    // submit URLs for processing
    urls := []string{
        "https://example.com",
        "https://example.org",
        "https://example.net",
    }
    
    go func() {
        // submit URLs and signal when done
        defer p.Close(context.Background())
        for _, url := range urls {
            p.Submit(url)
        }
    }()

    // wait for all URLs to be processed
    if err := p.Wait(context.Background()); err != nil {
        log.Printf("some URLs failed: %v", err)
    }

    // get metrics
    metrics := p.Metrics()
    stats := metrics.GetStats()
    fmt.Printf("Processed: %d, Errors: %d, Time taken: %v\n",
        stats.Processed, stats.Errors, stats.TotalTime)
}

For more examples, see the examples directory.

Motivation

While Go provides excellent primitives for concurrent programming with goroutines, channels, and sync primitives, building production-ready concurrent data processing systems often requires more sophisticated patterns. This package emerged from real-world needs encountered in various projects where basic concurrency primitives weren't enough.

Common challenges this package addresses:

  1. Stateful Processing

    • Need to maintain worker-specific state (counters, caches, connections)
    • Each worker requires its own resources (database connections, file handles)
    • State needs to be isolated to avoid synchronization
  2. Controlled Work Distribution

    • Ensuring related items are processed by the same worker
    • Maintaining processing order for specific groups of items
    • Optimizing cache usage by routing similar items together
  3. Resource Management

    • Limiting number of goroutines in large-scale processing
    • Managing cleanup of worker resources
    • Handling graceful shutdown
  4. Performance Optimization

    • Batching items to reduce channel communication overhead
    • Balancing worker load with different distribution strategies
    • Buffering to handle uneven processing speeds
  5. Operational Visibility

    • Need for detailed metrics about processing
    • Understanding bottlenecks and performance issues
    • Monitoring system health

Core Concepts

Worker Types

The pool supports three ways to implement and manage workers:

  1. Core Interface:

    // Worker is the interface that wraps the Do method
    type Worker[T any] interface {
        Do(ctx context.Context, v T) error
    }
    
    // WorkerFunc is an adapter to allow using ordinary functions as Workers
    type WorkerFunc[T any] func(ctx context.Context, v T) error
    
    func (f WorkerFunc[T]) Do(ctx context.Context, v T) error { return f(ctx, v) }
    
  2. Stateless Shared Workers:

    // single worker instance shared between all goroutines
    worker := pool.WorkerFunc[string](func(ctx context.Context, v string) error {
        // process v
        return nil
    })
    
    p := pool.New[string](5, worker)
    
    • One worker instance serves all goroutines
    • Good for stateless operations
    • More memory efficient
  3. Per-Worker Instances:

    type dbWorker struct {
        conn *sql.DB
        processed int
    }
    
    func (w *dbWorker) Do(ctx context.Context, v string) error {
        w.processed++
        // ExecContext returns (sql.Result, error); only the error is needed here
        _, err := w.conn.ExecContext(ctx, "INSERT INTO items (value) VALUES (?)", v)
        return err
    }
    
    // create new instance for each goroutine
    maker := func() pool.Worker[string] {
        w := &dbWorker{
            conn: openConnection(), // each worker gets own connection
        }
        return w
    }
    
    p := pool.NewStateful[string](5, maker)
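
Why per-worker instances avoid synchronization can be sketched with the standard library alone. This is an illustrative sketch, not the package's implementation: `counter`, `runStateful`, and the channel layout are hypothetical names and choices made for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// counter is per-goroutine state; it needs no locking because only the
// goroutine that created it ever touches it. Hypothetical type.
type counter struct{ processed int }

// runStateful starts n goroutines, calls maker once per goroutine, and
// returns how many items each worker handled. Hypothetical helper.
func runStateful(n int, items []string, maker func() *counter) []int {
	ch := make(chan string)
	counts := make([]int, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			w := maker() // each goroutine gets its own instance
			for range ch {
				w.processed++
			}
			counts[id] = w.processed // distinct index per goroutine, so no race
		}(i)
	}
	for _, it := range items {
		ch <- it
	}
	close(ch)
	wg.Wait()
	return counts
}

func main() {
	counts := runStateful(3, []string{"a", "b", "c", "d", "e"}, func() *counter {
		return &counter{}
	})
	total := 0
	for _, c := range counts {
		total += c
	}
	fmt.Println("total processed:", total) // total processed: 5
}
```

How the five items split across the three workers varies from run to run; only the total is deterministic.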
    
Batch Processing

Batching reduces channel communication overhead by processing multiple items at once:

// worker receives items one by one
worker := pool.WorkerFunc[string](func(ctx context.Context, v string) error {
    // v is one item from the batch
    return nil
})

// process items in batches of 10
p := pool.New[string](2, worker).WithBatchSize(10)

How batching works:

  1. Pool accumulates submitted items internally until batch size is reached
  2. Full batch is sent to worker as a single channel operation
  3. Worker processes each item in the batch sequentially
  4. Last batch may be smaller if items don't divide evenly
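
The steps above can be sketched with plain channels. This is a minimal illustration under stated assumptions, not the package's internals: `batch` is a hypothetical helper, and the real pool accumulates items incrementally as they are submitted rather than from a ready-made slice.

```go
package main

import "fmt"

// batch groups items into slices of at most size elements.
// Hypothetical helper for illustration; not part of the pool package.
func batch[T any](items []T, size int) [][]T {
	if size <= 0 {
		size = 1 // treat a non-positive size as one item per batch
	}
	var out [][]T
	buf := make([]T, 0, size) // accumulator pre-allocated to the batch size
	for _, it := range items {
		buf = append(buf, it)
		if len(buf) == size {
			out = append(out, buf)
			buf = make([]T, 0, size) // start a fresh accumulator
		}
	}
	if len(buf) > 0 {
		out = append(out, buf) // last batch may be smaller
	}
	return out
}

func main() {
	ch := make(chan []int, 1)
	go func() {
		defer close(ch)
		for _, b := range batch([]int{1, 2, 3, 4, 5, 6, 7}, 3) {
			ch <- b // one channel operation per batch, not per item
		}
	}()
	for b := range ch {
		for _, v := range b { // the worker handles batch items sequentially
			fmt.Println("processing", v)
		}
	}
}
```

With seven items and a batch size of three, the channel carries three sends instead of seven.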

When to use batching:

  • High-volume processing where channel operations are a bottleneck
  • When processing overhead per item is low compared to channel communication

Work Distribution

Control how work is distributed among workers using chunk functions:

// distribute by first character of string (assumes v is non-empty; v[:1] panics on "")
p := pool.New[string](3, worker).WithChunkFn(func(v string) string {
	return v[:1] // same first char goes to same worker
})

// distribute by user ID to ensure user's tasks go to same worker
p := pool.New[Task](3, worker).WithChunkFn(func(t Task) string {
	return strconv.Itoa(t.UserID)
})

How distribution works:

  1. Without chunk function:

    • Items are distributed randomly among workers
    • Good for independent tasks
  2. With chunk function:

    • Function returns string key for each item
    • Items with the same key always go to the same worker
    • Uses consistent hashing to map keys to workers
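
The key-to-worker mapping can be sketched as follows. A simple hash-and-modulo scheme is swapped in here for illustration; the package's actual hashing may differ, and `workerIndex` is a hypothetical name.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// workerIndex maps a chunk key to a worker slot by hashing the key and
// taking the remainder. Hypothetical helper; workers must be positive.
func workerIndex(key string, workers int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(workers))
}

func main() {
	for _, key := range []string{"alice", "bob", "alice", "carol", "bob"} {
		fmt.Printf("key %q -> worker %d\n", key, workerIndex(key, 3))
	}
}
```

Because the index depends only on the key and the worker count, repeated keys always land on the same worker, which is what preserves per-key ordering.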

When to use custom distribution:

  • Maintain ordering for related items
  • Optimize cache usage by worker
  • Ensure exclusive access to resources
  • Process data consistently

Middleware Support

The package supports a middleware pattern similar to HTTP middleware in Go. Middleware can be used to add cross-cutting concerns like:

  • Retries with backoff
  • Timeouts
  • Panic recovery
  • Metrics and logging
  • Error handling

Built-in middleware:

// Add retry with exponential backoff
p.Use(middleware.Retry[string](3, time.Second))

// Add timeout per operation
p.Use(middleware.Timeout[string](5 * time.Second))

// Add panic recovery
p.Use(middleware.Recovery[string](func(p interface{}) {
    log.Printf("recovered from panic: %v", p)
}))

// Add validation before processing
p.Use(middleware.Validate[string](validator))

Custom middleware:

logging := func(next pool.Worker[string]) pool.Worker[string] {
    return pool.WorkerFunc[string](func(ctx context.Context, v string) error {
        log.Printf("processing: %v", v)
        err := next.Do(ctx, v)
        log.Printf("completed: %v, err: %v", v, err)
        return err
    })
}

p.Use(logging)

Multiple middleware execute in the same order as provided:

p.Use(logging, metrics, retry)  // order: logging -> metrics -> retry -> worker
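
One way to get this ordering is to wrap the worker from the last middleware to the first, so that the first one ends up outermost. The sketch below re-declares `Worker`, `WorkerFunc`, and `Middleware` locally to stay self-contained; `chain`, `tag`, and `run` are hypothetical helpers, and this is not necessarily how `Use` is implemented.

```go
package main

import (
	"context"
	"fmt"
)

// Local copies of the types described in this README, so the sketch compiles alone.
type Worker[T any] interface {
	Do(ctx context.Context, v T) error
}

type WorkerFunc[T any] func(ctx context.Context, v T) error

func (f WorkerFunc[T]) Do(ctx context.Context, v T) error { return f(ctx, v) }

type Middleware[T any] func(Worker[T]) Worker[T]

// chain wraps w so that the first middleware in mws is the outermost one.
func chain[T any](w Worker[T], mws ...Middleware[T]) Worker[T] {
	for i := len(mws) - 1; i >= 0; i-- {
		w = mws[i](w)
	}
	return w
}

// tag returns a middleware that records its name before calling next.
func tag(name string, trace *[]string) Middleware[string] {
	return func(next Worker[string]) Worker[string] {
		return WorkerFunc[string](func(ctx context.Context, v string) error {
			*trace = append(*trace, name)
			return next.Do(ctx, v)
		})
	}
}

// run executes a three-middleware chain once and returns the call order.
func run() []string {
	var trace []string
	base := WorkerFunc[string](func(_ context.Context, _ string) error {
		trace = append(trace, "worker")
		return nil
	})
	w := chain[string](base, tag("logging", &trace), tag("metrics", &trace), tag("retry", &trace))
	_ = w.Do(context.Background(), "item")
	return trace
}

func main() {
	fmt.Println(run()) // [logging metrics retry worker]
}
```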

Install and update

go get -u github.com/go-pkgz/pool

Usage Examples

Basic Example
func main() {
    // create a worker function processing strings
    worker := pool.WorkerFunc[string](func(ctx context.Context, v string) error {
        fmt.Printf("processing: %s\n", v)
        return nil
    })

    // create a pool with 2 workers
    p := pool.New[string](2, worker)

    // start the pool
    if err := p.Go(context.Background()); err != nil {
        log.Fatal(err)
    }

    // submit work
    p.Submit("task1")
    p.Submit("task2")
    p.Submit("task3")

    // close the pool and wait for completion
    if err := p.Close(context.Background()); err != nil {
        log.Fatal(err)
    }
}
Error Handling
worker := pool.WorkerFunc[string](func(ctx context.Context, v string) error {
    if strings.Contains(v, "error") {
        return fmt.Errorf("failed to process %s", v)
    }
    return nil
})

// continue processing on errors
p := pool.New[string](2, worker).WithContinueOnError()
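
The difference between stopping on the first error and continuing can be sketched sequentially. This is a simplified, single-goroutine illustration with hypothetical names (`processAll`, `nonZero`); the pool itself runs workers concurrently, and its exact error wording may differ.

```go
package main

import (
	"errors"
	"fmt"
)

var errZero = errors.New("zero value not allowed")

// nonZero is a sample worker function that rejects zero.
func nonZero(v int) error {
	if v == 0 {
		return errZero
	}
	return nil
}

// processAll applies fn to each item. With continueOnError it keeps going and
// reports the error count plus the last error; otherwise it stops at the
// first failure. Hypothetical helper for illustration.
func processAll(items []int, fn func(int) error, continueOnError bool) (int, error) {
	var processed, errCount int
	var lastErr error
	for _, it := range items {
		if err := fn(it); err != nil {
			if !continueOnError {
				return processed, err
			}
			errCount++
			lastErr = err
			continue
		}
		processed++
	}
	if errCount > 0 {
		return processed, fmt.Errorf("total errors: %d, last error: %w", errCount, lastErr)
	}
	return processed, nil
}

func main() {
	n, err := processAll([]int{1, 0, 2}, nonZero, true)
	fmt.Println(n, err) // 2 total errors: 1, last error: zero value not allowed

	n, err = processAll([]int{1, 0, 2}, nonZero, false)
	fmt.Println(n, err) // 1 zero value not allowed
}
```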
Collecting Results
// create a collector for results
collector := pool.NewCollector[Result](ctx, 10)

// worker that produces results
worker := pool.WorkerFunc[Input](func(ctx context.Context, v Input) error {
    result := process(v)
    collector.Submit(result)
    return nil
})

p := pool.New[Input](2, worker)

// get results through iteration
for v, err := range collector.Iter() {
    if err != nil {
        return err
    }
    // use v
}

// or collect all at once
results, err := collector.All()
Metrics and Monitoring
// create worker with metrics tracking
worker := pool.WorkerFunc[string](func(ctx context.Context, v string) error {
    m := metrics.Get(ctx)
    if strings.HasPrefix(v, "important") {
        m.Inc("important-tasks")
    }
    return process(v)
})

// create and run pool
p := pool.New[string](2, worker)
p.Go(context.Background())

// process work
p.Submit("task1")
p.Submit("important-task2")
p.Close(context.Background())

// get metrics
metrics := p.Metrics()
stats := metrics.GetStats()
fmt.Printf("Processed: %d\n", stats.Processed)
fmt.Printf("Errors: %d\n", stats.Errors)
fmt.Printf("Processing time: %v\n", stats.ProcessingTime)
fmt.Printf("Wait time: %v\n", stats.WaitTime)
fmt.Printf("Total time: %v\n", stats.TotalTime)

// get custom metrics
fmt.Printf("Important tasks: %d\n", metrics.Get("important-tasks"))

Flow Control

The package provides several methods for flow control and completion:

// Submit adds items to the pool. Not safe for concurrent use.
// Used by the producer (sender) of data.
p.Submit(item)

// Send safely adds items to the pool from multiple goroutines.
// Used when submitting from worker to another pool, or when multiple goroutines send data.
p.Send(item)

// Close tells workers no more data will be submitted.
// Used by the producer (sender) of data.
p.Close(ctx)  

// Wait blocks until all processing is done.
// Used by the consumer (receiver) of results.
p.Wait(ctx)   

Common usage patterns:

// 1. Single producer submitting items
go func() {
    defer p.Close(ctx) // signal no more data
    for _, task := range tasks {
        p.Submit(task) // Submit is safe here - single goroutine
    }
}()

// 2. Workers submitting to next stage
p1 := pool.New[int](5, pool.WorkerFunc[int](func(ctx context.Context, v int) error {
    result := process(v)
    p2.Send(result) // Send is safe for concurrent calls from workers
    return nil
}))

// 3. Consumer waiting for completion
if err := p.Wait(ctx); err != nil {
    // handle error
}

Pool completion callback allows executing code when all workers are done:

p := pool.New[string](5, worker).
    WithPoolCompleteFn(func(ctx context.Context) error {
        // called once after all workers complete
        log.Println("all workers finished")
        return nil
    })

The completion callback executes when:

  • All workers have completed processing
  • Errors occurred but pool continued (WithContinueOnError())

It does not execute on context cancellation.

Important notes:

  • Use Submit when sending items from a single goroutine
  • Use Send when workers need to submit items to another pool
  • Pool completion callback helps coordinate multi-stage processing
  • Errors in completion callback are included in pool's error result

Optional parameters

Configure pool behavior using With methods:

p := pool.New[string](2, worker).  // pool with 2 workers
    WithBatchSize(10).             // process items in batches
    WithWorkerChanSize(5).         // set worker channel buffer size
    WithChunkFn(chunkFn).          // control work distribution
    WithContinueOnError().         // don't stop on errors
    WithWorkerCompleteFn(completeFn) // called when a worker finishes

Available options:

  • WithBatchSize(size int) - enables batch processing, accumulating items before sending to workers (default: 10)
  • WithWorkerChanSize(size int) - sets buffer size for worker channels (default: 1)
  • WithChunkFn(fn func(T) string) - controls work distribution by key (default: none, random distribution)
  • WithContinueOnError() - continues processing on errors (default: false)
  • WithWorkerCompleteFn(fn func(ctx, id, worker)) - called on worker completion (default: none)
  • WithPoolCompleteFn(fn func(ctx)) - called on pool completion, i.e., when all workers have completed (default: none)

Collector

The Collector helps manage asynchronous results from pool workers in a synchronous way. It's particularly useful when you need to gather and process the results produced by workers. The Collector uses Go generics and is compatible with any result type.

Features
  • Generic implementation supporting any result type
  • Context awareness with graceful cancellation
  • Buffered collection with configurable size
  • Built-in iterator pattern
  • Ability to collect all results at once
Example Usage
// create a collector for results with buffer of 10
collector := pool.NewCollector[string](ctx, 10)

// worker submits results to collector
worker := pool.WorkerFunc[int](func(ctx context.Context, v int) error {
    result := process(v)
    collector.Submit(result)
    return nil
})

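// create and run pool
p := pool.New[int](5, worker)
if err := p.Go(ctx); err != nil {
    return err
}

// submit items asynchronously so results can be consumed while workers run
go func() {
    for i := 0; i < 100; i++ {
        p.Submit(i)
    }
    p.Close(ctx)
    collector.Close() // no more results; lets Iter and All finish
}()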

// Option 1: process results as they arrive with iterator
for result, err := range collector.Iter() {
    if err != nil {
        return err // context cancelled or other error
    }
    // process result
}

// Option 2: get all results at once
results, err := collector.All()
if err != nil {
    return err
}
// use results slice
API Reference
// create new collector
collector := pool.NewCollector[ResultType](ctx, bufferSize)

// submit result to collector
collector.Submit(result)

// close collector when done submitting
collector.Close()

// iterate over results
for result, err := range collector.Iter() {
    // process result
}

// get all results
results, err := collector.All()
Best Practices
  1. Buffer Size: Choose based on expected throughput and memory constraints

    • Too small: may block workers
    • Too large: may use excessive memory
  2. Error Handling: Always check error from iterator

    for result, err := range collector.Iter() {
        if err != nil {
            // handle context cancellation
            return err
        }
    }
    
  3. Context Usage: Pass context that matches pool's lifecycle

    collector := pool.NewCollector[Result](poolCtx, size)
    
  4. Cleanup: Close collector when done submitting

    defer collector.Close()
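
The practices above follow from how a buffered, context-aware collector behaves. The sketch below is a simplified stand-in built on a buffered channel; `collector`, `newCollector`, and `collectSquares` are hypothetical names, and the package's Collector may be implemented differently.

```go
package main

import (
	"context"
	"fmt"
)

// collector is a simplified stand-in: a buffered channel plus the context
// that bounds its lifetime. Hypothetical type for illustration.
type collector[V any] struct {
	ch  chan V
	ctx context.Context
}

func newCollector[V any](ctx context.Context, size int) *collector[V] {
	return &collector[V]{ch: make(chan V, size), ctx: ctx}
}

// Submit blocks when the buffer is full and nothing is reading.
func (c *collector[V]) Submit(v V) { c.ch <- v }

// Close signals that no more results will be submitted.
func (c *collector[V]) Close() { close(c.ch) }

// All drains results until the channel is closed or the context is cancelled.
func (c *collector[V]) All() ([]V, error) {
	var res []V
	for {
		select {
		case v, ok := <-c.ch:
			if !ok {
				return res, nil // producer called Close
			}
			res = append(res, v)
		case <-c.ctx.Done():
			return res, c.ctx.Err()
		}
	}
}

// collectSquares submits n squares from one goroutine and gathers them.
func collectSquares(n int) ([]int, error) {
	c := newCollector[int](context.Background(), 2) // small buffer on purpose
	go func() {
		defer c.Close() // without Close, All would block forever
		for i := 1; i <= n; i++ {
			c.Submit(i * i)
		}
	}()
	return c.All()
}

func main() {
	res, err := collectSquares(5)
	fmt.Println(res, err) // [1 4 9 16 25] <nil>
}
```

The buffer of 2 is smaller than the number of results, so the producer blocks at times; it still finishes because All is draining concurrently.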
    

Performance

The pool package is designed for high performance and efficiency. Benchmarks show that it consistently outperforms both the standard errgroup-based approach and traditional goroutine patterns with shared channels.

Benchmark Results

Tests running 1,000,000 tasks with 8 workers on Apple M4 Max:

errgroup:                                     1.878s
pool (default):                               1.213s (~35% faster)
pool (chan size=100):                         1.199s
pool (chan size=100, batch size=100):         1.105s (~41% faster)
pool (with chunking):                         1.113s
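
The percentages above read as reductions in wall-clock time relative to errgroup. A small sketch of that arithmetic, using the figures from the table (`reduction` is a hypothetical helper name):

```go
package main

import "fmt"

// reduction returns the percentage by which t is lower than base.
func reduction(base, t float64) float64 {
	return (base - t) / base * 100
}

func main() {
	fmt.Printf("pool (default): %.1f%%\n", reduction(1.878, 1.213))            // 35.4%
	fmt.Printf("pool (chan + batch size=100): %.1f%%\n", reduction(1.878, 1.105)) // 41.2%
}
```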

Detailed benchmark comparison (lower is better):

errgroup:                                     18.56ms/op
pool (default):                               12.29ms/op
pool (chan size=100):                         12.35ms/op
pool (batch size=100):                        11.22ms/op
pool (with batching and chunking):            11.43ms/op
Why Pool is Faster
  1. Efficient Channel Usage

    • The pool uses dedicated channels per worker when chunking is enabled
    • Default channel buffer size is optimized for common use cases
    • Minimizes channel contention compared to shared channel approaches
  2. Smart Batching

    • Reduces channel communication overhead by processing multiple items at once
    • Default batch size of 10 provides good balance between latency and throughput
    • Accumulators pre-allocated with capacity to minimize memory allocations
  3. Work Distribution

    • Optional chunking ensures related tasks go to the same worker
    • Improves cache locality and reduces cross-worker coordination
    • Hash-based distribution provides good load balancing
  4. Resource Management

    • Workers are pre-initialized and reused
    • No per-task goroutine creation overhead
    • Efficient cleanup and resource handling
Configuration Impact
  • Default Settings: Out of the box, the pool is ~35% faster than errgroup
  • Channel Buffering: Increasing channel size can help with bursty workloads
  • Batching: Adding batching improves performance by another ~6%
  • Chunking: Optional chunking has minimal overhead when enabled
When to Use What
  1. Default Settings - Good for most use cases

    p := pool.New[string](5, worker)
    
  2. High-Throughput - For heavy workloads with many items

    p := pool.New[string](5, worker).
        WithWorkerChanSize(100).
        WithBatchSize(100)
    
  3. Related Items - When items need to be processed by the same worker

    p := pool.New[string](5, worker).
        WithChunkFn(func(v string) string {
            return v[:1] // group by first character
        })
    
Alternative pool implementations
  • pond - pond is a minimalistic and high-performance Go library designed to elegantly manage concurrent tasks.
  • goworker - goworker is a Resque-compatible, Go-based background worker. It allows you to push jobs into a queue using an expressive language like Ruby while harnessing the efficiency and concurrency of Go to minimize job latency and cost.
  • gowp - golang worker pool
  • conc - better structured concurrency for go
  • for more see awesome-go goroutines list

Contributing

Contributions to pool are welcome! Please submit a pull request or open an issue for any bugs or feature requests.

License

pool is available under the MIT license. See the LICENSE file for more info.

Documentation

Overview

Package pool provides a simple worker pool implementation with a single stage only. It allows submitting tasks to be processed in parallel by a number of workers.

The package supports both stateless and stateful workers through two distinct constructors:

  • New - for pools with a single shared worker instance
  • NewStateful - for pools where each goroutine gets its own worker instance

Worker Types:

The package provides a simple Worker interface that can be implemented in two ways:

type Worker[T any] interface {
    Do(ctx context.Context, v T) error
}

1. Direct implementation for complex stateful workers:

type dbWorker struct {
    conn *sql.DB
}

func (w *dbWorker) Do(ctx context.Context, v string) error {
    // ExecContext returns (sql.Result, error); only the error is needed here
    _, err := w.conn.ExecContext(ctx, "INSERT INTO items (value) VALUES (?)", v)
    return err
}

2. Function adapter for simple stateless workers:

worker := pool.WorkerFunc[string](func(ctx context.Context, v string) error {
    // process the value
    return nil
})

Basic Usage:

For stateless operations (like HTTP requests, parsing operations, etc.):

worker := pool.WorkerFunc[string](func(ctx context.Context, v string) error {
    resp, err := http.Get(v)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    return nil
})

p := pool.New[string](2, worker)
if err := p.Go(context.Background()); err != nil {
    return err
}

// submit work
p.Submit("task1")
p.Submit("task2")

if err := p.Close(context.Background()); err != nil {
    return err
}

For stateful operations (like database connections, file handles, etc.):

maker := func() pool.Worker[string] {
    return &dbWorker{
        conn: openConnection(),
    }
}
p := pool.NewStateful[string](2, maker)

Features:

  • Generic worker pool implementation supporting any data type
  • Configurable number of workers running in parallel
  • Support for both stateless shared workers and per-worker instances
  • Batching capability for processing multiple items at once
  • Customizable work distribution through chunk functions
  • Built-in metrics collection including processing times and counts
  • Error handling with options to continue or stop on errors
  • Context-based cancellation and timeouts
  • Optional completion callbacks

Advanced Features:

Batching:

p := New[string](2, worker).WithBatchSize(10)

Chunked distribution:

p := New[string](2, worker).WithChunkFn(func(v string) string {
    return v // items with same hash go to same worker
})

Error handling:

p := New[string](2, worker).WithContinueOnError()

Metrics:

The pool automatically tracks standard stats metrics (processed counts, errors, timings). Workers can also record additional custom metrics:

m := metrics.Get(ctx)
m.Inc("custom-counter")

Access metrics:

metrics := p.Metrics()
value := metrics.Get("custom-counter")

Standard statistics include:

  • Number of processed items
  • Number of errors
  • Number of dropped items
  • Processing time
  • Wait time
  • Initialization time
  • Total time

Access stats:

metrics := p.Metrics()
stats := metrics.GetStats()
fmt.Printf("processed: %d, errors: %d", stats.Processed, stats.Errors)

Data Collection:

For collecting results from workers, use the Collector:

collector := pool.NewCollector[Result](ctx, 10)
worker := pool.WorkerFunc[Input](func(ctx context.Context, v Input) error {
    result := process(v)
    collector.Submit(result)
    return nil
})

Results can be retrieved either through iteration:

for v, err := range collector.Iter() {
    if err != nil {
        return err
    }
    // use v
}

Or by collecting all at once:

results, err := collector.All()

Middleware Support:

The pool supports a middleware pattern similar to HTTP middleware in Go. Middleware can be used to add functionality like retries, timeouts, metrics, or error handling:

// retry middleware
retryMiddleware := func(next Worker[string]) Worker[string] {
    return WorkerFunc[string](func(ctx context.Context, v string) error {
        var lastErr error
        for i := 0; i < 3; i++ {
            if err := next.Do(ctx, v); err == nil {
                return nil
            } else {
                lastErr = err
            }
            time.Sleep(time.Second * time.Duration(i))
        }
        return fmt.Errorf("failed after 3 attempts: %w", lastErr)
    })
}

p := New[string](2, worker).Use(retryMiddleware)

Multiple middleware can be chained, and they execute in the same order as provided:

p.Use(logging, metrics, retry)  // executes: logging -> metrics -> retry -> worker
Example (Basic)
// collect output
var out []string
var mu sync.Mutex

worker := WorkerFunc[int](func(_ context.Context, v int) error {
	mu.Lock()
	out = append(out, fmt.Sprintf("processed: %d", v))
	mu.Unlock()
	return nil
})

p := New[int](2, worker)
if err := p.Go(context.Background()); err != nil {
	panic(err) // handle error, don't panic in real code
}

// submit work
p.Submit(1)
p.Submit(2)
p.Submit(3)

_ = p.Close(context.Background())

// print collected output in sorted order
sort.Strings(out)
for _, s := range out {
	fmt.Println(s)
}
Output:

processed: 1
processed: 2
processed: 3
Example (ChainedCalculation)
// stage 1: calculate fibonacci numbers in parallel
type FibResult struct {
	n   int
	fib uint64
}
stage1Collector := NewCollector[FibResult](context.Background(), 10)

fibWorker := WorkerFunc[int](func(_ context.Context, n int) error {
	var a, b uint64 = 0, 1
	for i := 0; i < n; i++ {
		a, b = b, a+b
	}
	stage1Collector.Submit(FibResult{n: n, fib: a})
	return nil
})

// stage 2: calculate factors for each fibonacci number
type FactorsResult struct {
	n       uint64
	factors []uint64
}
stage2Collector := NewCollector[FactorsResult](context.Background(), 10)

factorsWorker := WorkerFunc[FibResult](func(_ context.Context, res FibResult) error {
	if res.fib <= 1 {
		stage2Collector.Submit(FactorsResult{n: res.fib, factors: []uint64{res.fib}})
		return nil
	}

	var factors []uint64
	n := res.fib
	for i := uint64(2); i*i <= n; i++ {
		for n%i == 0 {
			factors = append(factors, i)
			n /= i
		}
	}
	if n > 1 {
		factors = append(factors, n)
	}

	stage2Collector.Submit(FactorsResult{n: res.fib, factors: factors})
	return nil
})

// create and start both pools
pool1 := New[int](3, fibWorker)
pool1.Go(context.Background())

pool2 := NewStateful[FibResult](2, func() Worker[FibResult] {
	return factorsWorker
})
pool2.Go(context.Background())

// submit numbers to calculate
numbers := []int{5, 7, 10}
for _, n := range numbers {
	pool1.Submit(n)
}

// close pools and collectors in order
pool1.Close(context.Background())
stage1Collector.Close()

// process stage 1 results in stage 2
for fibRes, err := range stage1Collector.Iter() {
	if err != nil {
		fmt.Printf("stage 1 error: %v\n", err)
		continue
	}
	pool2.Submit(fibRes)
}

pool2.Close(context.Background())
stage2Collector.Close()

// collect and sort final results to ensure deterministic output order
results, _ := stage2Collector.All()
sort.Slice(results, func(i, j int) bool {
	return results[i].n < results[j].n
})

// print results in sorted order
for _, res := range results {
	fmt.Printf("number %d has factors %v\n", res.n, res.factors)
}
Output:

number 5 has factors [5]
number 13 has factors [13]
number 55 has factors [5 11]
Example (FibCalculator)
// FibResult type to store both input and calculated Fibonacci number
type FibResult struct {
	n   int
	fib uint64
}

// create collector for results
collector := NewCollector[FibResult](context.Background(), 10)

// worker calculating fibonacci numbers
worker := WorkerFunc[int](func(_ context.Context, n int) error {
	if n <= 0 {
		return fmt.Errorf("invalid input: %d", n)
	}

	// calculate fibonacci number
	var a, b uint64 = 0, 1
	for i := 0; i < n; i++ {
		a, b = b, a+b
	}

	collector.Submit(FibResult{n: n, fib: a})
	return nil
})

// create pool with 3 workers
p := New[int](3, worker)
p.Go(context.Background())

// submit numbers to calculate asynchronously
go func() {
	numbers := []int{5, 7, 10, 3, 8}
	for _, n := range numbers {
		p.Submit(n)
	}
	p.Close(context.Background())
	collector.Close()
}()

// collect results and sort them by input number for consistent output
results, _ := collector.All()
sort.Slice(results, func(i, j int) bool {
	return results[i].n < results[j].n
})

// print results
for _, res := range results {
	fmt.Printf("fib(%d) = %d\n", res.n, res.fib)
}
Output:

fib(3) = 2
fib(5) = 5
fib(7) = 13
fib(8) = 21
fib(10) = 55
Example (Middleware)
// Create a worker that sometimes fails
worker := WorkerFunc[string](func(_ context.Context, v string) error {
	if v == "fail" {
		return errors.New("simulated failure")
	}
	fmt.Printf("processed: %s\n", v)
	return nil
})

// Create logging middleware
logging := func(next Worker[string]) Worker[string] {
	return WorkerFunc[string](func(ctx context.Context, v string) error {
		fmt.Printf("starting: %s\n", v)
		err := next.Do(ctx, v)
		fmt.Printf("completed: %s, err: %v\n", v, err)
		return err
	})
}

// Create retry middleware
retry := func(attempts int) Middleware[string] {
	return func(next Worker[string]) Worker[string] {
		return WorkerFunc[string](func(ctx context.Context, v string) error {
			var lastErr error
			for i := 0; i < attempts; i++ {
				var err error
				if err = next.Do(ctx, v); err == nil {
					return nil
				}
				lastErr = err
				fmt.Printf("attempt %d failed: %v\n", i+1, err)
			}
			return fmt.Errorf("failed after %d attempts: %w", attempts, lastErr)
		})
	}
}

// Create pool with both middleware - retry is listed first, so it is outermost and logging wraps each attempt
p := New[string](1, worker).Use(retry(2), logging)
p.Go(context.Background())

// Process items
p.Submit("ok")   // should succeed first time
p.Submit("fail") // should fail after retries
p.Close(context.Background())
Output:

starting: ok
processed: ok
completed: ok, err: <nil>
starting: fail
completed: fail, err: simulated failure
attempt 1 failed: simulated failure
starting: fail
completed: fail, err: simulated failure
attempt 2 failed: simulated failure
Example (WithCollector)
type Item struct {
	val   int
	label string
}

// create collector for results with buffer size 10
collector := NewCollector[Item](context.Background(), 10)

// create worker that processes numbers and sends results to collector
worker := WorkerFunc[int](func(_ context.Context, v int) error {
	result := Item{
		val:   v * 2,  // double the value
		label: "proc", // add label
	}
	collector.Submit(result)
	return nil
})

// create and start pool
p := New[int](2, worker)
p.Go(context.Background())

// submit items asynchronously
go func() {
	for i := 1; i <= 3; i++ {
		p.Submit(i)
	}
	p.Close(context.Background())
	collector.Close() // close collector after pool is done
}()

// collect results and sort them for deterministic output
results, _ := collector.All()
sort.Slice(results, func(i, j int) bool {
	return results[i].val < results[j].val
})

// print sorted results
for _, res := range results {
	fmt.Printf("got result: %d (%s)\n", res.val, res.label)
}
Output:

got result: 2 (proc)
got result: 4 (proc)
got result: 6 (proc)
Example (WithCollectorIterator)
collector := NewCollector[string](context.Background(), 5)

worker := WorkerFunc[int](func(_ context.Context, v int) error {
	collector.Submit(fmt.Sprintf("value %d", v))
	return nil
})

p := New[int](2, worker)
p.Go(context.Background())

// submit items asynchronously
go func() {
	for i := 1; i <= 3; i++ {
		p.Submit(i)
	}
	p.Close(context.Background())
	collector.Close()
}()

// collect all values first
var values []string
for val, err := range collector.Iter() {
	if err != nil {
		fmt.Printf("error: %v\n", err)
		continue
	}
	values = append(values, val)
}

// sort and print values for deterministic output
sort.Strings(values)
for _, val := range values {
	fmt.Printf("processed: %s\n", val)
}
Output:

processed: value 1
processed: value 2
processed: value 3
Example (WithContext)
started := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

worker := WorkerFunc[int](func(ctx context.Context, v int) error {
	close(started) // signal that worker started
	<-ctx.Done()   // wait for cancellation
	return ctx.Err()
})

p := New[int](1, worker).WithBatchSize(0) // disable batching
p.Go(ctx)
p.Submit(1)

<-started // wait for worker to start
cancel()  // cancel context
err := p.Close(context.Background())
fmt.Printf("got error: %v\n", err != nil)
Output:

got error: true
Example (WithError)
// collect output to ensure deterministic order
var out []string
var mu sync.Mutex

worker := WorkerFunc[int](func(_ context.Context, v int) error {
	if v == 0 {
		return fmt.Errorf("zero value not allowed")
	}
	mu.Lock()
	out = append(out, fmt.Sprintf("processed: %d", v))
	mu.Unlock()
	return nil
})

p := New[int](1, worker).WithContinueOnError() // don't stop on errors
p.Go(context.Background())

p.Submit(1)
p.Submit(0) // this will fail but processing continues
p.Submit(2)

err := p.Close(context.Background())
if err != nil {
	mu.Lock()
	out = append(out, fmt.Sprintf("finished with error: %v", err))
	mu.Unlock()
}

// print collected output in sorted order
sort.Strings(out)
for _, s := range out {
	fmt.Println(s)
}
Output:

finished with error: total errors: 1, last error: worker 0 failed: zero value not allowed
processed: 1
processed: 2
Example (WithRouting)
// collect output with sync.Map for thread safety
var out sync.Map

worker := WorkerFunc[int](func(ctx context.Context, v int) error {
	out.Store(v, fmt.Sprintf("worker %d got %d", metrics.WorkerID(ctx), v))
	return nil
})

// create pool with chunk function that routes based on even/odd
p := New[int](2, worker).WithChunkFn(func(v int) string {
	if v%2 == 0 {
		return "even"
	}
	return "odd"
},
)
p.Go(context.Background())

// Submit all numbers
for i := 1; i <= 4; i++ {
	p.Submit(i)
}

p.Close(context.Background())

// print in order to ensure deterministic output
for i := 1; i <= 4; i++ {
	if v, ok := out.Load(i); ok {
		fmt.Println(v)
	}
}
Output:

worker 0 got 1
worker 1 got 2
worker 0 got 3
worker 1 got 4
Example (WorkerTypes)
// These two workers are functionally equivalent:
// 1. Implementing Worker interface explicitly
// 2. Using WorkerFunc adapter - same thing, just shorter
workerFn := WorkerFunc[string](func(_ context.Context, v string) error {
	fmt.Printf("processed: %s\n", v)
	return nil
})

// Run first pool to completion
p1 := New[string](1, &processingWorker{})
p1.Go(context.Background())
p1.Submit("task1")
p1.Close(context.Background())

// Then run second pool
p2 := New[string](1, workerFn)
p2.Go(context.Background())
p2.Submit("task2")
p2.Close(context.Background())
Output:

processed: task1
processed: task2

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Collector

type Collector[V any] struct {
	// contains filtered or unexported fields
}

Collector provides synchronous access to async data from pool's response channel

func NewCollector

func NewCollector[V any](ctx context.Context, size int) *Collector[V]

NewCollector creates a new collector with a given context and buffer size for the channel

func (*Collector[V]) All

func (c *Collector[V]) All() (res []V, err error)

All gets all data from the collector

func (*Collector[V]) Close

func (c *Collector[V]) Close()

Close closes the collector

func (*Collector[V]) Iter

func (c *Collector[V]) Iter() iter.Seq2[V, error]

Iter returns an iterator over collector values

func (*Collector[V]) Submit

func (c *Collector[V]) Submit(v V)

Submit sends a value to the collector
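
The Collector methods above suggest a channel-backed design. The following is a minimal sketch of that idea, not the package's implementation: the lowercase `collector` type, its methods, and the `squares` helper are hypothetical names, and it assumes values are buffered in a channel that `all` drains until the channel is closed or the context is cancelled.

```go
package main

import (
	"context"
	"fmt"
)

// collector is a simplified, hypothetical stand-in for the documented Collector type.
type collector[V any] struct {
	ctx context.Context
	ch  chan V
}

func newCollector[V any](ctx context.Context, size int) *collector[V] {
	return &collector[V]{ctx: ctx, ch: make(chan V, size)}
}

// submit blocks when the buffer is full, so a slow reader slows producers down.
func (c *collector[V]) submit(v V) { c.ch <- v }

// close signals that no more values will arrive.
func (c *collector[V]) close() { close(c.ch) }

// all drains the channel until it is closed or the context is cancelled.
func (c *collector[V]) all() ([]V, error) {
	var res []V
	for {
		select {
		case v, ok := <-c.ch:
			if !ok {
				return res, nil
			}
			res = append(res, v)
		case <-c.ctx.Done():
			return res, c.ctx.Err()
		}
	}
}

// squares submits n squares from a single producer goroutine and collects them.
func squares(n int) ([]int, error) {
	c := newCollector[int](context.Background(), 2)
	go func() {
		for i := 1; i <= n; i++ {
			c.submit(i * i)
		}
		c.close()
	}()
	return c.all()
}

func main() {
	res, err := squares(4)
	fmt.Println(res, err) // [1 4 9 16] <nil>
}
```

As in the examples above, the producer side closes the collector only after all values have been submitted; otherwise the reader would never see the end of the stream.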

type GroupCompleteFn added in v0.6.0

type GroupCompleteFn[T any] func(ctx context.Context) error

GroupCompleteFn is called once when all workers are done

type Middleware added in v0.4.0

type Middleware[T any] func(Worker[T]) Worker[T]

Middleware wraps worker and adds functionality

type Send

type Send[T any] func(val T) error

Send func called by worker code to publish results

type Worker

type Worker[T any] interface {
	Do(ctx context.Context, v T) error
}

Worker is the interface that wraps the Do method.

type WorkerCompleteFn added in v0.6.0

type WorkerCompleteFn[T any] func(ctx context.Context, id int, worker Worker[T]) error

WorkerCompleteFn is called on worker completion

type WorkerFunc

type WorkerFunc[T any] func(ctx context.Context, v T) error

WorkerFunc is an adapter to allow the use of ordinary functions as Workers.

func (WorkerFunc[T]) Do

func (f WorkerFunc[T]) Do(ctx context.Context, v T) error

Do calls f(ctx, v).

type WorkerGroup

type WorkerGroup[T any] struct {
	// contains filtered or unexported fields
}

WorkerGroup represents a pool of workers processing items in parallel. Supports both direct item processing and batching modes.

func New

func New[T any](size int, worker Worker[T]) *WorkerGroup[T]

New creates a worker pool with a shared worker instance. All goroutines share the same worker, suitable for stateless processing.

func NewStateful added in v0.2.0

func NewStateful[T any](size int, maker func() Worker[T]) *WorkerGroup[T]

NewStateful creates a worker pool where each goroutine gets its own worker instance. Suitable for operations requiring state (e.g., database connections).
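
To illustrate why a maker function matters for stateful workers, here is a rough sketch using only the standard library. It is not the package's code: `runStateful`, `counter`, and `totalSeen` are hypothetical names, and the `Worker` interface is a local copy of the one documented above. Each goroutine receives its own worker from the maker, so per-worker state needs no locking.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Worker is a local copy of the documented interface, declared so the sketch compiles alone.
type Worker[T any] interface {
	Do(ctx context.Context, v T) error
}

// counter keeps per-instance state, so one instance must not be shared between goroutines.
type counter struct{ seen int }

func (c *counter) Do(_ context.Context, _ int) error {
	c.seen++
	return nil
}

// runStateful starts size goroutines, each with its own worker created by maker,
// feeds them items, and returns the workers once all items are processed.
func runStateful(size int, maker func() Worker[int], items []int) []Worker[int] {
	in := make(chan int)
	workers := make([]Worker[int], size)
	var wg sync.WaitGroup
	for i := range workers {
		workers[i] = maker()
		wg.Add(1)
		go func(w Worker[int]) {
			defer wg.Done()
			for v := range in {
				_ = w.Do(context.Background(), v)
			}
		}(workers[i])
	}
	for _, v := range items {
		in <- v
	}
	close(in)
	wg.Wait()
	return workers
}

// totalSeen sums the per-worker counters after n items went through size workers.
func totalSeen(size, n int) int {
	ws := runStateful(size, func() Worker[int] { return &counter{} }, make([]int, n))
	total := 0
	for _, w := range ws {
		total += w.(*counter).seen
	}
	return total
}

func main() {
	fmt.Println(totalSeen(3, 100)) // 100
}
```

Sharing a single `counter` across the goroutines instead would be a data race, which is the case a shared worker created with New has to avoid by staying stateless.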

func (*WorkerGroup[T]) Close

func (p *WorkerGroup[T]) Close(ctx context.Context) error

Close closes the pool. The consumer has to call it to indicate that all records have been submitted. The call blocks until workers have completed all processing. After this call the pool can't be reused. Returns an error if any happened during the run.

func (*WorkerGroup[T]) Go

func (p *WorkerGroup[T]) Go(ctx context.Context) error

Go activates the pool and starts worker goroutines. Must be called before submitting items.

func (*WorkerGroup[T]) Metrics

func (p *WorkerGroup[T]) Metrics() *metrics.Value

Metrics returns combined metrics from all workers

func (*WorkerGroup[T]) Send added in v0.6.0

func (p *WorkerGroup[T]) Send(v T)

Send adds an item to the pool for processing. Safe for concurrent use, intended for worker-to-pool submissions or for use by multiple concurrent producers.

func (*WorkerGroup[T]) Submit

func (p *WorkerGroup[T]) Submit(v T)

Submit adds an item to the pool for processing. May block if worker channels are full. Not thread-safe, intended for use by the main thread or a single producer's thread.

func (*WorkerGroup[T]) Use added in v0.4.0

func (p *WorkerGroup[T]) Use(middlewares ...Middleware[T]) *WorkerGroup[T]

Use applies middlewares to the worker group's worker. Middlewares are applied in the same order as they are provided, matching the HTTP middleware pattern in Go. The first middleware is the outermost wrapper, and the last middleware is the innermost wrapper closest to the original worker.
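
The ordering rule can be sketched with a small standalone chain. This is an illustration under assumptions, not the package's implementation: `chain`, `tag`, and `callOrder` are hypothetical helpers, and `Worker`, `WorkerFunc`, and `Middleware` are local copies of the documented signatures.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Local copies of the documented types, declared so the sketch compiles on its own.
type Worker[T any] interface {
	Do(ctx context.Context, v T) error
}

type WorkerFunc[T any] func(ctx context.Context, v T) error

func (f WorkerFunc[T]) Do(ctx context.Context, v T) error { return f(ctx, v) }

type Middleware[T any] func(Worker[T]) Worker[T]

// chain wraps w so that mws[0] ends up as the outermost wrapper.
// Wrapping from the last middleware to the first achieves that.
func chain[T any](w Worker[T], mws ...Middleware[T]) Worker[T] {
	for i := len(mws) - 1; i >= 0; i-- {
		w = mws[i](w)
	}
	return w
}

// tag returns a middleware that records when it is entered and left.
func tag(name string, trace *[]string) Middleware[string] {
	return func(next Worker[string]) Worker[string] {
		return WorkerFunc[string](func(ctx context.Context, v string) error {
			*trace = append(*trace, name+" in")
			err := next.Do(ctx, v)
			*trace = append(*trace, name+" out")
			return err
		})
	}
}

// callOrder runs one item through two middlewares and returns the call trace.
func callOrder() string {
	var trace []string
	base := WorkerFunc[string](func(_ context.Context, v string) error {
		trace = append(trace, "worker "+v)
		return nil
	})
	w := chain[string](base, tag("first", &trace), tag("second", &trace))
	_ = w.Do(context.Background(), "item")
	return strings.Join(trace, ", ")
}

func main() {
	fmt.Println(callOrder())
	// first in, second in, worker item, second out, first out
}
```

The trace shows the first middleware entered first and left last, which is what "outermost wrapper" means in practice.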

func (*WorkerGroup[T]) Wait

func (p *WorkerGroup[T]) Wait(ctx context.Context) error

Wait blocks until all workers have completed and the result channel is closed.

func (*WorkerGroup[T]) WithBatchSize added in v0.3.0

func (p *WorkerGroup[T]) WithBatchSize(size int) *WorkerGroup[T]

WithBatchSize enables item batching with specified size. Items are accumulated until batch is full before processing. Set to 0 to disable batching. Default: 10
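
As a rough illustration of the accumulate-then-process behavior, the sketch below buffers items and flushes them when the batch is full. The `batcher` type and `batchSizes` helper are hypothetical, and flushing the remainder at the end is an assumption about what a pool would need to do on close.

```go
package main

import "fmt"

// batcher accumulates items and hands them to flush once size items are buffered.
type batcher[T any] struct {
	size  int
	buf   []T
	flush func([]T)
}

// add buffers v; with size 0 batching is off and v is flushed immediately.
func (b *batcher[T]) add(v T) {
	if b.size <= 0 {
		b.flush([]T{v})
		return
	}
	b.buf = append(b.buf, v)
	if len(b.buf) >= b.size {
		b.done()
	}
}

// done flushes whatever is buffered, including a final partial batch.
func (b *batcher[T]) done() {
	if len(b.buf) == 0 {
		return
	}
	b.flush(b.buf)
	b.buf = nil
}

// batchSizes feeds n items through a batcher and reports the size of each flush.
func batchSizes(n, size int) []int {
	var sizes []int
	b := &batcher[int]{size: size, flush: func(items []int) {
		sizes = append(sizes, len(items))
	}}
	for i := 0; i < n; i++ {
		b.add(i)
	}
	b.done()
	return sizes
}

func main() {
	fmt.Println(batchSizes(25, 10)) // [10 10 5]
	fmt.Println(batchSizes(3, 0))   // [1 1 1]
}
```

Larger batches mean fewer hand-offs per item, at the cost of items waiting in the buffer until the batch fills.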

func (*WorkerGroup[T]) WithChunkFn added in v0.3.0

func (p *WorkerGroup[T]) WithChunkFn(fn func(T) string) *WorkerGroup[T]

WithChunkFn enables predictable item distribution. Items with the same key (returned by fn) are processed by the same worker. Useful for maintaining order within groups of related items. Default: none (random distribution)
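
One common way to get this kind of key affinity is to hash the key and take the remainder by the number of workers. The sketch below shows that technique; the use of FNV-1a and the `workerFor` and `route` helpers are assumptions made for illustration, and the package may select workers differently.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// workerFor maps a chunk key to a worker index in [0, workers).
// FNV-1a plus modulo is an assumption of this sketch, not a documented detail.
func workerFor(key string, workers int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(workers))
}

// route groups items by the worker index their key maps to.
func route[T any](items []T, workers int, chunkFn func(T) string) map[int][]T {
	out := make(map[int][]T)
	for _, it := range items {
		id := workerFor(chunkFn(it), workers)
		out[id] = append(out[id], it)
	}
	return out
}

func main() {
	parity := func(v int) string {
		if v%2 == 0 {
			return "even"
		}
		return "odd"
	}
	// items with the same key always land in the same group
	for id, items := range route([]int{1, 2, 3, 4, 5, 6}, 4, parity) {
		fmt.Printf("worker %d: %v\n", id, items)
	}
}
```

Because the mapping depends only on the key, items sharing a key keep their relative order within one worker, while different keys may still collide on the same worker.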

func (*WorkerGroup[T]) WithContinueOnError added in v0.3.0

func (p *WorkerGroup[T]) WithContinueOnError() *WorkerGroup[T]

WithContinueOnError makes the pool continue processing after a worker returns an error instead of stopping. Default: false (stop on error)

func (*WorkerGroup[T]) WithPoolCompleteFn added in v0.6.0

func (p *WorkerGroup[T]) WithPoolCompleteFn(fn GroupCompleteFn[T]) *WorkerGroup[T]

WithPoolCompleteFn sets callback executed once when all workers are done

func (*WorkerGroup[T]) WithWorkerChanSize added in v0.3.0

func (p *WorkerGroup[T]) WithWorkerChanSize(size int) *WorkerGroup[T]

WithWorkerChanSize sets channel buffer size for each worker. Larger sizes can help with bursty workloads but increase memory usage. Default: 1

func (*WorkerGroup[T]) WithWorkerCompleteFn added in v0.6.0

func (p *WorkerGroup[T]) WithWorkerCompleteFn(fn WorkerCompleteFn[T]) *WorkerGroup[T]

WithWorkerCompleteFn sets callback executed on worker completion. Useful for cleanup or finalization of worker resources. Default: none (disabled)

type WorkerMaker

type WorkerMaker[T any] func() Worker[T]

WorkerMaker is a function that returns a new Worker

Directories

Path	Synopsis
metrics	Package metrics provides a way to collect metrics in a thread-safe way
middleware	Package middleware provides common middleware implementations for the pool package.
