Documentation
Overview ¶
Package cogroup provides an elegant goroutine group with context controls. It is designed to meet the following requirements:
- Tasks can be executed in any order
- The group `Wait` call closes write access to the task queue
- An upstream context can cancel the task queue
- Once the context is canceled, the tasks remaining in the queue are no longer consumed
- A panic in a single task execution is recovered
- Only the specified number of goroutines is spawned to consume the tasks
- `Wait` blocks until the tasks are finished or canceled, then returns the remaining queue length
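The requirements above can be illustrated with a minimal, standard-library-only sketch. It is not cogroup's implementation: `runGroup`, `safeRun`, `demo`, and `demoCanceled` are hypothetical names introduced here, and the sketch only assumes the task signature `func(context.Context) error` shown in the index below.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// task mirrors the task signature used by the package.
type task func(context.Context) error

// safeRun executes one task and converts a panic into an error, so a single
// failing task cannot take down its worker goroutine.
func safeRun(ctx context.Context, t task) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("task panicked: %v", r)
		}
	}()
	return t(ctx)
}

// runGroup (hypothetical) queues the tasks, starts n workers, and returns
// the number of tasks left unconsumed once the workers have stopped.
func runGroup(ctx context.Context, n int, tasks []task) int {
	queue := make(chan task, len(tasks))
	for _, t := range tasks {
		queue <- t
	}
	close(queue) // no more writes are accepted after this point

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				// Check cancellation first so a canceled context always wins.
				select {
				case <-ctx.Done():
					return
				default:
				}
				select {
				case <-ctx.Done():
					return
				case t, ok := <-queue:
					if !ok {
						return // queue closed and drained
					}
					_ = safeRun(ctx, t)
				}
			}
		}()
	}
	wg.Wait()
	return len(queue) // tasks still buffered were never consumed
}

// demo runs five tasks, one of which panics, on two workers.
func demo() (ran, left int) {
	done := make(chan struct{}, 5)
	tasks := make([]task, 0, 5)
	for i := 0; i < 5; i++ {
		i := i
		tasks = append(tasks, func(context.Context) error {
			done <- struct{}{}
			if i == 2 {
				panic("boom")
			}
			return nil
		})
	}
	left = runGroup(context.Background(), 2, tasks)
	return len(done), left
}

// demoCanceled queues three tasks under an already canceled context.
func demoCanceled() int {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	noop := func(context.Context) error { return nil }
	return runGroup(ctx, 2, []task{noop, noop, noop})
}

func main() {
	ran, left := demo()
	fmt.Println("ran:", ran, "left:", left)               // ran: 5 left: 0
	fmt.Println("left after cancel:", demoCanceled()) // left after cancel: 3
}
```

In this sketch the panicking task still counts as run, and a context canceled before the workers start leaves every queued task unconsumed, which is the behavior the list describes for `Wait`.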
Index ¶
- func GetWorkerID(ctx context.Context) int
- type CoGroup
- func (g *CoGroup) Add(f func(context.Context) error)
- func (g *CoGroup) GetWorkers() int
- func (g *CoGroup) Insert(f func(context.Context) error) (success bool)
- func (g *CoGroup) Reset()
- func (g *CoGroup) Size() int
- func (g *CoGroup) StartWithWorker(worker Worker)
- func (g *CoGroup) TryInsert(f func(context.Context) error) (success bool)
- func (g *CoGroup) Wait() int
- type Worker
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GetWorkerID ¶ added in v0.1.6
func GetWorkerID(ctx context.Context) int
GetWorkerID returns the worker ID stored in the context.
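A common way to carry a per-worker value in a context is `context.WithValue` with an unexported key type. The sketch below shows that general pattern only; how cogroup stores the ID internally is not documented here. `workerIDKey`, `withWorkerID`, `workerIDFrom`, and the `-1` fallback are all assumptions made for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// workerIDKey is an unexported key type, which prevents collisions with
// context keys defined by other packages.
type workerIDKey struct{}

// withWorkerID (hypothetical) returns a child context carrying the ID.
func withWorkerID(ctx context.Context, id int) context.Context {
	return context.WithValue(ctx, workerIDKey{}, id)
}

// workerIDFrom (hypothetical) reads the ID back. Returning -1 when no ID is
// present is a choice made for this sketch, not documented cogroup behavior.
func workerIDFrom(ctx context.Context) int {
	if id, ok := ctx.Value(workerIDKey{}).(int); ok {
		return id
	}
	return -1
}

// demoWorkerID looks up an ID in a context that has one and in one that does not.
func demoWorkerID() (found, missing int) {
	found = workerIDFrom(withWorkerID(context.Background(), 3))
	missing = workerIDFrom(context.Background())
	return found, missing
}

func main() {
	found, missing := demoWorkerID()
	fmt.Println(found, missing) // 3 -1
}
```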
Types ¶
type CoGroup ¶
type CoGroup struct {
// contains filtered or unexported fields
}
CoGroup is the coroutine group struct. It holds the group state: the task queue, the context, and the signals.
func Start ¶
Start will initialize a cogroup and start the group goroutines.
Parameter `n` specifies the number of goroutines to start as workers to consume the task queue.
Parameter `m` specifies the size of the task queue buffer. If the buffer is full, the `Insert` method blocks until there is more room or a cancel signal is received.
Parameter `sink` specifies whether to pass the group context to the task.
Example ¶
package main

import (
	"context"
	"time"

	"github.com/devfans/cogroup"
)

func main() {
	f := func(context.Context) error {
		<-time.After(time.Second)
		return nil
	}

	g := cogroup.Start(context.Background(), 2, 10, false)
	for i := 0; i < 10; i++ {
		g.Add(f)
	}
	g.Wait()
}
Output:
Example (StartWithCancelContext) ¶
package main

import (
	"context"
	"time"

	"github.com/devfans/cogroup"
)

func main() {
	f := func(ctx context.Context) error {
		<-time.After(time.Second)
		workerID := cogroup.GetWorkerID(ctx)
		println(workerID, " did one task")
		return nil
	}

	ctx, cancel := context.WithCancel(context.Background())
	g := cogroup.Start(ctx, 2, 10, false)
	go func() {
		<-time.After(1 * time.Second)
		cancel()
	}()

	for i := 0; i < 100; i++ {
		g.Add(f)
	}
	println("Tasks left:", g.Wait())
}
Output:
func (*CoGroup) GetWorkers ¶ added in v0.1.4
func (g *CoGroup) GetWorkers() int
GetWorkers returns the total number of group workers.
func (*CoGroup) Insert ¶
func (g *CoGroup) Insert(f func(context.Context) error) (success bool)
Insert adds a task to the task queue, blocking while the task queue buffer is full. If the group context has already been canceled, it aborts and returns false.
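The blocking behavior described for `Insert` can be sketched with a `select` over a buffered channel and the context's `Done` channel. This is an illustration of the pattern under stated assumptions, not the library's code: `insert` and `demoInsert` are hypothetical names, and the queue here is a plain channel rather than a `CoGroup`.

```go
package main

import (
	"context"
	"fmt"
)

// task mirrors the task signature used by the package.
type task func(context.Context) error

// insert (hypothetical) blocks until the queue has room or ctx is canceled.
// It reports whether the task was queued.
func insert(ctx context.Context, queue chan<- task, t task) bool {
	// Refuse immediately if the context is already canceled.
	select {
	case <-ctx.Done():
		return false
	default:
	}
	// Otherwise wait for room in the buffer or for cancellation.
	select {
	case queue <- t:
		return true
	case <-ctx.Done():
		return false
	}
}

// demoInsert queues one task, cancels the context, then tries again.
func demoInsert() (first, second bool) {
	ctx, cancel := context.WithCancel(context.Background())
	queue := make(chan task, 1)
	noop := func(context.Context) error { return nil }

	first = insert(ctx, queue, noop) // buffer has room
	cancel()
	second = insert(ctx, queue, noop) // context is canceled
	return first, second
}

func main() {
	first, second := demoInsert()
	fmt.Println(first, second) // true false
}
```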
func (*CoGroup) Reset ¶
func (g *CoGroup) Reset()
Reset resets the cogroup. It calls the group `Wait` first and then performs an internal reset.
func (*CoGroup) StartWithWorker ¶ added in v1.0.1
func (g *CoGroup) StartWithWorker(worker Worker)
StartWithWorker registers a customized worker and starts the group goroutines.
If worker is `nil`, the default plain worker is used.