Overview ¶
Package stage provides a framework for creating stages.
Stage ¶
## Overview
The `stage` package is a building block of the ETL project: it defines and executes the individual stages of a pipeline. A stage is a single data processing step made of a converter and one or more processors, which together transform and enrich the data flowing through the pipeline.
## What's a Stage?
A stage is a self-contained unit of data processing within a pipeline. It encapsulates a converter and a set of processors that operate on the data sequentially. The converter is responsible for transforming the data from one format to another, while the processors perform specific operations on the data, such as filtering, aggregating, or enriching.
Stages are highly modular and reusable, allowing them to be easily combined and composed to create complex data processing workflows.
## How It Works
At the core of the `stage` package is the `IStage` interface, which defines the contract for a stage. The `Stage` struct implements this interface, providing the necessary functionality for creating and running stages.
To create a stage, you instantiate a new `Stage` using the `New` factory function, specifying the stage name, description, converter, and a variadic list of processors. The converter is defined using the `converter.IConverter` interface, while the processors are defined using the `processor.IProcessor` interface.
When the `Run` method is called on a stage, it executes the following steps:
1. It iterates through the processors sequentially, passing the output of each processor as the input to the next one. Because each processor sees the previous processor's output, ordering and data dependencies between processors are preserved.
2. After all the processors have finished executing, the stage applies the converter to transform the processed data into the desired output format. The converter operates concurrently on the data using the `concurrentloop` package, enabling efficient parallel processing.
3. Finally, the stage returns the converted data as a `task.Task`, which encapsulates both the processed and converted data.
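The three steps above can be sketched in plain Go. This is a minimal illustration, not the package's implementation: `runStage`, `trimAll`, and `upperAll` are hypothetical names, processors and the converter are modeled as plain functions, and a `sync.WaitGroup` stands in for the `concurrentloop` package.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// runStage is a hypothetical helper that mirrors the documented flow:
// sequential processors, then concurrent conversion, then the result.
func runStage[P, C any](
	data []P,
	convert func(P) (C, error),
	processors ...func([]P) ([]P, error),
) ([]C, error) {
	// Step 1: run processors in order; each one receives the previous output.
	for i, p := range processors {
		out, err := p(data)
		if err != nil {
			return nil, fmt.Errorf("processor %d: %w", i, err)
		}
		data = out
	}

	// Step 2: convert all items concurrently. Each goroutine writes to its
	// own index, so the input order is kept and no locking is needed.
	converted := make([]C, len(data))
	errs := make([]error, len(data))

	var wg sync.WaitGroup
	for i, item := range data {
		wg.Add(1)
		go func(i int, item P) {
			defer wg.Done()
			converted[i], errs[i] = convert(item)
		}(i, item)
	}
	wg.Wait()

	for i, err := range errs {
		if err != nil {
			return nil, fmt.Errorf("convert item %d: %w", i, err)
		}
	}

	// Step 3: return the converted data.
	return converted, nil
}

// trimAll is an example processor that strips surrounding whitespace.
func trimAll(in []string) ([]string, error) {
	out := make([]string, len(in))
	for i, s := range in {
		out[i] = strings.TrimSpace(s)
	}
	return out, nil
}

// upperAll is an example processor that upper-cases every item.
func upperAll(in []string) ([]string, error) {
	out := make([]string, len(in))
	for i, s := range in {
		out[i] = strings.ToUpper(s)
	}
	return out, nil
}

func main() {
	out, err := runStage(
		[]string{" john ", "peter "},
		func(s string) (string, error) { return "user:" + s, nil },
		trimAll, upperAll,
	)
	if err != nil {
		fmt.Println("stage failed:", err)
		return
	}
	fmt.Println(out) // [user:JOHN user:PETER]
}
```

Writing results by index is one simple way to keep the converted items in input order while converting in parallel; whether the package preserves order the same way is not stated in its documentation.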
Throughout the execution, the stage maintains comprehensive observability, including metrics, logging, and tracing, to monitor and debug the stage's performance and behavior.
## Features
1. **Modularity and Reusability**: Stages are designed to be modular and reusable, allowing for easy composition and combination to create complex data processing workflows.
2. **Sequential Processing**: The stage executes processors sequentially, ensuring that data dependencies and ordering are maintained. This is particularly useful when the output of one processor depends on the output of a previous processor.
3. **Concurrent Conversion**: The stage applies the converter concurrently to the processed data, leveraging the `concurrentloop` package for efficient parallel processing. This improves the overall performance of the stage.
4. **Observability**: The stage package provides comprehensive observability features, including metrics, logging, and tracing, to monitor and debug the stage's execution.
5. **Metrics**: Stage metrics are exposed using the `expvar` package, allowing for easy integration with monitoring systems. Metrics include counters for created, running, failed, and done stages, as well as duration, progress, and progress percentage.
6. **Logging**: The package utilizes the `sypl` library for structured logging, providing rich context and consistent log levels throughout the codebase. Log messages include relevant information such as stage status, counters, duration, and progress.
7. **Tracing**: Tracing is implemented using the `customapm` package, which integrates with Elastic APM (Application Performance Monitoring) under the hood. This enables distributed tracing of the stage's execution, allowing developers to gain deep insights into the performance and behavior of their stages.
8. **Error Handling**: The stage package includes robust error handling mechanisms, with detailed error messages and proper propagation of errors throughout the stage's execution.
9. **Progress Tracking**: The package provides progress tracking capabilities, including absolute progress and percentage completion, enabling real-time monitoring of the stage's execution.
10. **Flexible Configuration**: A stage is configured through `New` (name, description, converter, and processors) and can be given an on-finished callback via `SetOnFinished` or the `WithOnFinished` functional option.
11. **Thorough Testing**: The codebase includes comprehensive unit tests, ensuring the reliability and correctness of the stage functionality. The tests cover various scenarios, including the usage of different processors and converters.
12. **Well-Documented**: The code is thoroughly documented, with clear comments explaining the purpose and functionality of each component. The package also includes usage examples and test cases.
13. **Idiomatic Go**: The codebase follows idiomatic Go practices, leveraging the language's features and conventions for clean and efficient code.
14. **Customizable**: The stage package provides a high level of customization through the use of interfaces and generic types. Developers can easily create custom converters and processors to meet their specific data processing requirements.
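To make the metrics and progress-tracking items in the list above more concrete, here is a small sketch built on the standard `expvar` package. The `stageMetrics` type and its `track` method are hypothetical and only loosely modeled on the fields of `Stage`; the percentage format is also an assumption.

```go
package main

import (
	"expvar"
	"fmt"
)

// stageMetrics is a hypothetical, reduced set of stage metrics.
type stageMetrics struct {
	Done            *expvar.Int
	Failed          *expvar.Int
	Progress        *expvar.Int
	ProgressPercent *expvar.String
}

// newStageMetrics creates unpublished variables. Registering them with
// expvar.Publish is what would make them visible to expvar's HTTP handler.
func newStageMetrics() *stageMetrics {
	return &stageMetrics{
		Done:            new(expvar.Int),
		Failed:          new(expvar.Int),
		Progress:        new(expvar.Int),
		ProgressPercent: new(expvar.String),
	}
}

// track records one processed item and refreshes the percentage string.
func (m *stageMetrics) track(total int64) {
	m.Progress.Add(1)

	percent := 0.0
	if total > 0 {
		percent = float64(m.Progress.Value()) * 100 / float64(total)
	}
	m.ProgressPercent.Set(fmt.Sprintf("%.0f%%", percent))
}

func main() {
	m := newStageMetrics()

	const total = 4
	for i := 0; i < total; i++ {
		m.track(total)
	}
	m.Done.Add(1)

	fmt.Println(m.Progress.Value(), m.ProgressPercent.Value(), m.Done.Value()) // 4 100% 1
}
```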
## Architectural Modularity and Flexibility
The stage package is designed with architectural modularity and flexibility in mind. It leverages Go's interfaces and generic types to provide a highly extensible and customizable stage framework.
The `IStage` interface defines the contract for a stage, allowing for easy integration of custom stage implementations. The `converter.IConverter` and `processor.IProcessor` interfaces enable the creation of custom converters and processors, respectively.
The use of generic types for `ProcessingData` and `ConvertedData` allows stages to handle various data types, making the package adaptable to different data processing scenarios.
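As a rough illustration of how interfaces and type parameters combine, the sketch below defines a custom processor and a custom converter for a made-up `reading` type. The `Converter` and `Processor` interfaces here are simplified, hypothetical stand-ins; the real `converter.IConverter` and `processor.IProcessor` have their own method sets, which this page does not list.

```go
package main

import (
	"fmt"
	"strconv"
)

// Converter is a hypothetical, single-method stand-in for a converter.
type Converter[In, Out any] interface {
	Convert(in In) (Out, error)
}

// Processor is a hypothetical, single-method stand-in for a processor.
type Processor[T any] interface {
	Process(in []T) ([]T, error)
}

// reading is example processing data.
type reading struct {
	Sensor string
	Raw    string
}

// dropEmpty is a custom processor that removes readings with no raw value.
type dropEmpty struct{}

func (dropEmpty) Process(in []reading) ([]reading, error) {
	out := make([]reading, 0, len(in))
	for _, r := range in {
		if r.Raw != "" {
			out = append(out, r)
		}
	}
	return out, nil
}

// toFloat is a custom converter from reading to float64.
type toFloat struct{}

func (toFloat) Convert(in reading) (float64, error) {
	return strconv.ParseFloat(in.Raw, 64)
}

// Compile-time checks that the custom types satisfy the interfaces.
var (
	_ Processor[reading]          = dropEmpty{}
	_ Converter[reading, float64] = toFloat{}
)

func main() {
	var p Processor[reading] = dropEmpty{}
	var c Converter[reading, float64] = toFloat{}

	kept, err := p.Process([]reading{{Sensor: "a", Raw: "21.5"}, {Sensor: "b"}})
	if err != nil {
		fmt.Println("process failed:", err)
		return
	}

	for _, r := range kept {
		v, err := c.Convert(r)
		if err != nil {
			fmt.Println("convert failed:", err)
			continue
		}
		fmt.Println(r.Sensor, v) // a 21.5
	}
}
```

The processing type (`reading`) and the converted type (`float64`) are independent type parameters, which is the same split the package expresses with `ProcessingData` and `ConvertedData`.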
The functional options pattern, exposed through the `Func` type and the `WithOnFinished` helper, provides a clean and flexible way to customize stage behavior without modifying the core `Stage` struct.
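The shape of a generic functional option can be sketched as follows. All names are hypothetical: `option` mirrors the form of `Func` (a function that takes a stage and returns it), `onFinished` is a reduced version of the package's `OnFinished` callback, and `newStage` accepting options is an assumption made for the sketch, since this page does not show how the package applies a `Func`.

```go
package main

import "fmt"

// onFinished is a reduced, hypothetical callback. The package's OnFinished
// also receives a context, the stage, and the input and output tasks.
type onFinished[P, C any] func(in []P, out []C)

// stage is a hypothetical stand-in for the package's Stage struct.
type stage[P, C any] struct {
	name       string
	onFinished onFinished[P, C]
}

// option takes a stage and returns it modified, like the documented Func.
type option[P, C any] func(s *stage[P, C]) *stage[P, C]

// withOnFinished returns an option that sets the callback.
func withOnFinished[P, C any](fn onFinished[P, C]) option[P, C] {
	return func(s *stage[P, C]) *stage[P, C] {
		s.onFinished = fn
		return s
	}
}

// newStage builds a stage and applies the options in order (an assumption).
func newStage[P, C any](name string, opts ...option[P, C]) *stage[P, C] {
	s := &stage[P, C]{name: name}
	for _, opt := range opts {
		s = opt(s)
	}
	return s
}

func main() {
	s := newStage("stage-1", withOnFinished[string, int](
		func(in []string, out []int) {
			fmt.Printf("finished: %d in, %d out\n", len(in), len(out))
		},
	))

	// Simulate the end of a run.
	if s.onFinished != nil {
		s.onFinished([]string{"a", "b"}, []int{1, 2}) // finished: 2 in, 2 out
	}
}
```

Because options are ordinary values, new behavior can be added by writing another `with...` function rather than by widening the constructor's parameter list.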
## Applied Best Practices
The stage package adheres to best practices and idiomatic Go programming principles:
- **Interface-Driven Design**: The package heavily relies on interfaces, such as `IStage`, `converter.IConverter`, and `processor.IProcessor`, to provide abstraction and extensibility. This allows for easy integration of custom implementations and facilitates testing.
- **Functional Options**: The package utilizes the functional options pattern for stage configuration, providing a clean and flexible way to customize stage behavior.
- **Error Handling**: The package follows Go's error handling conventions, returning errors from functions and methods when necessary. Errors are propagated and handled appropriately throughout the stage's execution.
- **Testing**: The package includes comprehensive unit tests, covering various scenarios and edge cases. The tests ensure the correctness and reliability of the stage functionality.
- **Naming Conventions**: The codebase follows Go's naming conventions, using descriptive and meaningful names for variables, functions, and types.
- **Code Organization**: The package is well-organized, with separate files for different components and concerns. This promotes code readability and maintainability.
By applying these best practices, the stage package maintains a high level of code quality, reliability, and ease of use.
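The error-handling convention mentioned in the list above (return errors, add context, propagate) can be illustrated with standard-library wrapping. This is a sketch under assumptions: `runProcessors`, `validate`, and `errEmptyInput` are hypothetical, and the package's actual error types and messages may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// errEmptyInput is a hypothetical sentinel error used only in this sketch.
var errEmptyInput = errors.New("empty input")

// validate is an example processor that rejects empty batches.
func validate(in []string) ([]string, error) {
	if len(in) == 0 {
		return nil, errEmptyInput
	}
	return in, nil
}

// runProcessors stops at the first failing processor and wraps its error
// with the stage name and processor index, keeping the cause inspectable.
func runProcessors(
	stageName string,
	in []string,
	processors ...func([]string) ([]string, error),
) ([]string, error) {
	for i, p := range processors {
		out, err := p(in)
		if err != nil {
			return nil, fmt.Errorf("stage %q, processor %d: %w", stageName, i, err)
		}
		in = out
	}
	return in, nil
}

func main() {
	_, err := runProcessors("stage-1", nil, validate)

	fmt.Println(err)                           // stage "stage-1", processor 0: empty input
	fmt.Println(errors.Is(err, errEmptyInput)) // true
}
```

Wrapping with `%w` lets callers match the underlying cause with `errors.Is` while still seeing where in the stage the failure happened.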
Index ¶
- Constants
- type Func
- type IStage
- type OnFinished
- type Stage
- func (s *Stage[ProcessingData, ConvertedData]) GetCounterCreated() *expvar.Int
- func (s *Stage[ProcessingData, ConvertedData]) GetCounterDone() *expvar.Int
- func (s *Stage[ProcessingData, ConvertedData]) GetCounterFailed() *expvar.Int
- func (s *Stage[ProcessingData, ConvertedData]) GetCounterRunning() *expvar.Int
- func (s *Stage[ProcessingData, ConvertedData]) GetCreatedAt() time.Time
- func (s *Stage[ProcessingData, ConvertedData]) GetDescription() string
- func (s *Stage[ProcessingData, ConvertedData]) GetDuration() *expvar.Int
- func (s *Stage[ProcessingData, ConvertedData]) GetLogger() sypl.ISypl
- func (s *Stage[ProcessingData, ConvertedData]) GetMetrics() map[string]string
- func (s *Stage[ProcessingData, ConvertedData]) GetName() string
- func (s *Stage[ProcessingData, ConvertedData]) GetOnFinished() OnFinished[ProcessingData, ConvertedData]
- func (s *Stage[ProcessingData, ConvertedData]) GetProgress() *expvar.Int
- func (s *Stage[ProcessingData, ConvertedData]) GetProgressPercent() *expvar.String
- func (s *Stage[ProcessingData, ConvertedData]) GetStatus() *expvar.String
- func (s *Stage[ProcessingData, ConvertedData]) GetType() string
- func (s *Stage[ProcessingData, ConvertedData]) Run(ctx context.Context, tsk task.Task[ProcessingData, ConvertedData]) (task.Task[ProcessingData, ConvertedData], error)
- func (s *Stage[ProcessingData, ConvertedData]) SetOnFinished(onFinished OnFinished[ProcessingData, ConvertedData])
- func (s *Stage[ProcessingData, ConvertedData]) SetProgressPercent()
Examples ¶
- New (Storage_processor)
Constants ¶
const Type = "stage"
Type of the entity.
Types ¶
type Func ¶
type Func[ProcessedData, ConvertedOut any] func(p IStage[ProcessedData, ConvertedOut]) IStage[ProcessedData, ConvertedOut]
Func allows specifying a stage's options.
func WithOnFinished ¶
func WithOnFinished[ProcessedData, ConvertedOut any](onFinished OnFinished[ProcessedData, ConvertedOut]) Func[ProcessedData, ConvertedOut]
WithOnFinished sets the OnFinished function.
type IStage ¶
type IStage[ProcessedData, ConvertedOut any] interface {
	shared.IMeta
	shared.IMetrics

	// GetProgress returns the `Progress` of the stage.
	GetProgress() *expvar.Int

	// GetProgressPercent returns the `ProgressPercent` of the stage.
	GetProgressPercent() *expvar.String

	// SetProgressPercent sets the `ProgressPercent` of the stage.
	SetProgressPercent()

	// GetOnFinished returns the `OnFinished` function.
	GetOnFinished() OnFinished[ProcessedData, ConvertedOut]

	// SetOnFinished sets the `OnFinished` function.
	SetOnFinished(onFinished OnFinished[ProcessedData, ConvertedOut])

	// Run the stage function.
	Run(
		ctx context.Context,
		task task.Task[ProcessedData, ConvertedOut],
	) (task.Task[ProcessedData, ConvertedOut], error)
}
IStage defines what a `Stage` must do.
func New ¶
func New[ProcessingData, ConvertedData any](
	name string,
	description string,
	conversor converter.IConverter[ProcessingData, ConvertedData],
	processors ...processor.IProcessor[ProcessingData],
) (IStage[ProcessingData, ConvertedData], error)
New returns a new stage.
Example (Storage_processor) ¶
Demonstrates the usage of a stage with the storage processor (memory storage).
// Sample data.
tests := []Test{
	{
		ID:   "1",
		Name: "John",
	},
	{
		ID:   "2",
		Name: "Peter",
	},
}

// Memory storage from DAL.
memoryStorage, err := memory.New(context.Background())
if err != nil {
	log.Fatalln(err)
}

// Storage processor, concurrency set to 1.
s, err := storage.New[Test](memoryStorage, 1, "example-")
if err != nil {
	log.Fatalln(err)
}

// Stage with the storage processor.
stg1, err := New(
	"stage-1",
	"main stage",

	// Converter: pass-through, does nothing.
	passthru.Must[Test](),

	// Processors: add as many as you want.
	s,
)
if err != nil {
	log.Fatalln(err)
}

// Run the stage passing the processing data as a task.
tasksOut, err := stg1.Run(context.Background(), task.Task[Test, Test]{
	ProcessingData: tests,
})
if err != nil {
	log.Fatalln(err)
}

// String builder to contain the output of the stage and the memory storage.
var buf strings.Builder

// Iterate over tasksOut and write to the buffer.
for _, v := range tasksOut.ConvertedData {
	buf.WriteString(fmt.Sprintf("%s %s\n", v.ID, v.Name))
}

// Get a list from the memory storage.
//
// NOTE: The usage of memory.ResponseList[Test] wrapper.
var fromMemory memory.ResponseList[Test]
if err := memoryStorage.List(context.Background(), "etl", &fromMemory, &list.List{}); err != nil {
	log.Fatalln(err)
}

// Iterate over fromMemory so we can add to the buffer.
for _, v := range fromMemory.Items {
	buf.WriteString(fmt.Sprintf("%s %s\n", v.ID, v.Name))
}

// Get the content of the buffer.
bufferContent := buf.String()

// Check that the names John and Peter each appear 2 times in the buffer:
// once from the stage output and once from the memory storage.
if strings.Count(bufferContent, "John") == 2 {
	fmt.Println("John appears 2 times")
}

if strings.Count(bufferContent, "Peter") == 2 {
	fmt.Println("Peter appears 2 times")
}
Output:
John appears 2 times
Peter appears 2 times
type OnFinished ¶
type OnFinished[ProcessedData, ConvertedOut any] func(ctx context.Context, s IStage[ProcessedData, ConvertedOut], tskIn task.Task[ProcessedData, ConvertedOut], tskOut task.Task[ProcessedData, ConvertedOut])
OnFinished is the function that is called when a stage finishes its execution.
type Stage ¶
type Stage[ProcessingData, ConvertedData any] struct {
	// Description of the stage.
	Description string `json:"description"`

	// Conversor to be used in the stage.
	Conversor converter.IConverter[ProcessingData, ConvertedData] `json:"-" validate:"required"`

	// Logger is the internal logger.
	Logger sypl.ISypl `json:"-" validate:"required"`

	// Name of the stage.
	Name string `json:"name" validate:"required"`

	// OnFinished is the function that is called when a stage finishes its
	// execution.
	OnFinished OnFinished[ProcessingData, ConvertedData] `json:"-"`

	// Processors to be run in the stage.
	Processors []processor.IProcessor[ProcessingData] `json:"processors" validate:"required,gt=0"`

	// Metrics.
	CounterCreated  *expvar.Int    `json:"counterCreated"`
	CounterDone     *expvar.Int    `json:"counterDone"`
	CounterFailed   *expvar.Int    `json:"counterFailed"`
	CounterRunning  *expvar.Int    `json:"counterRunning"`
	CreatedAt       time.Time      `json:"createdAt"`
	Duration        *expvar.Int    `json:"duration"`
	Progress        *expvar.Int    `json:"progress"`
	ProgressPercent *expvar.String `json:"progressPercent"`
	Status          *expvar.String `json:"status"`
}
Stage definition.
func (*Stage[ProcessingData, ConvertedData]) GetCounterCreated ¶
GetCounterCreated returns the `CounterCreated` of the stage.
func (*Stage[ProcessingData, ConvertedData]) GetCounterDone ¶
GetCounterDone returns the `CounterDone` of the stage.
func (*Stage[ProcessingData, ConvertedData]) GetCounterFailed ¶
GetCounterFailed returns the `CounterFailed` of the stage.
func (*Stage[ProcessingData, ConvertedData]) GetCounterRunning ¶
GetCounterRunning returns the `CounterRunning` of the stage.
func (*Stage[ProcessingData, ConvertedData]) GetCreatedAt ¶ added in v2.0.6
GetCreatedAt returns the created at time.
func (*Stage[ProcessingData, ConvertedData]) GetDescription ¶
GetDescription returns the `Description` of the stage.
func (*Stage[ProcessingData, ConvertedData]) GetDuration ¶ added in v2.0.6
GetDuration returns the `Duration` of the stage.
func (*Stage[ProcessingData, ConvertedData]) GetLogger ¶
GetLogger returns the `Logger` of the stage.
func (*Stage[ProcessingData, ConvertedData]) GetMetrics ¶ added in v2.0.7
GetMetrics returns the stage's metrics.
func (*Stage[ProcessingData, ConvertedData]) GetOnFinished ¶
func (s *Stage[ProcessingData, ConvertedData]) GetOnFinished() OnFinished[ProcessingData, ConvertedData]
GetOnFinished returns the `OnFinished` function.
func (*Stage[ProcessingData, ConvertedData]) GetProgress ¶ added in v2.0.6
GetProgress returns the `Progress` of the stage.
func (*Stage[ProcessingData, ConvertedData]) GetProgressPercent ¶ added in v2.0.6
GetProgressPercent returns the `ProgressPercent` of the stage.
func (*Stage[ProcessingData, ConvertedData]) Run ¶
func (s *Stage[ProcessingData, ConvertedData]) Run(ctx context.Context, tsk task.Task[ProcessingData, ConvertedData]) (task.Task[ProcessingData, ConvertedData], error)
Run executes the stage: it runs the processors, applies the converter, and returns the resulting task.
func (*Stage[ProcessingData, ConvertedData]) SetOnFinished ¶
func (s *Stage[ProcessingData, ConvertedData]) SetOnFinished(onFinished OnFinished[ProcessingData, ConvertedData])
SetOnFinished sets the `OnFinished` function.
func (*Stage[ProcessingData, ConvertedData]) SetProgressPercent ¶ added in v2.0.6
func (s *Stage[ProcessingData, ConvertedData]) SetProgressPercent()
SetProgressPercent sets the `ProgressPercent` of the stage.