Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Closable ¶
type Closable interface {
Close(ctx StreamContext) error
}
type DefaultSourceTuple ¶
type DefaultSourceTuple struct {
// contains filtered or unexported fields
}
func NewDefaultSourceTuple ¶
func NewDefaultSourceTuple(message map[string]interface{}, meta map[string]interface{}) *DefaultSourceTuple
func (*DefaultSourceTuple) Message ¶
func (t *DefaultSourceTuple) Message() map[string]interface{}
func (*DefaultSourceTuple) Meta ¶
func (t *DefaultSourceTuple) Meta() map[string]interface{}
type Function ¶
type Function interface { //The argument is a list of xsql.Expr Validate(args []interface{}) error //Execute the function, return the result and if execution is successful. //If execution fails, return the error and false. Exec(args []interface{}, ctx FunctionContext) (interface{}, bool) //If this function is an aggregate function. Each parameter of an aggregate function will be a slice IsAggregate() bool }
type FunctionContext ¶
type FunctionContext interface { StreamContext GetFuncId() int }
type Logger ¶
type Logger interface { Debug(args ...interface{}) Info(args ...interface{}) Warn(args ...interface{}) Error(args ...interface{}) Debugln(args ...interface{}) Infoln(args ...interface{}) Warnln(args ...interface{}) Errorln(args ...interface{}) Debugf(format string, args ...interface{}) Infof(format string, args ...interface{}) Warnf(format string, args ...interface{}) Errorf(format string, args ...interface{}) }
type Operator ¶
type Operator interface { Emitter Collector Exec(StreamContext, chan<- error) GetName() string GetMetrics() [][]interface{} }
type Rewindable ¶
type Rule ¶
type Rule struct { Triggered bool `json:"triggered"` Id string `json:"id"` Sql string `json:"sql"` Actions []map[string]interface{} `json:"actions"` Options *RuleOption `json:"options"` }
type RuleOption ¶
type RuleOption struct { IsEventTime bool `json:"isEventTime" yaml:"isEventTime"` LateTol int64 `json:"lateTolerance" yaml:"lateTolerance"` Concurrency int `json:"concurrency" yaml:"concurrency"` BufferLength int `json:"bufferLength" yaml:"bufferLength"` SendMetaToSink bool `json:"sendMetaToSink" yaml:"sendMetaToSink"` Qos Qos `json:"qos" yaml:"qos"` CheckpointInterval int `json:"checkpointInterval" yaml:"checkpointInterval"` }
type Sink ¶
type Sink interface { //Should be sync function for normal case. The container will run it in go func Open(ctx StreamContext) error //Called during initialization. Configure the sink with the properties from rule action definition Configure(props map[string]interface{}) error //Called when each row of data has transferred to this sink Collect(ctx StreamContext, data interface{}) error Closable }
type Source ¶
type Source interface { //Should be sync function for normal case. The container will run it in go func Open(ctx StreamContext, consumer chan<- SourceTuple, errCh chan<- error) //Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties //read from the yaml Configure(datasource string, jsonConfig string, props map[string]interface{}) error Closable }
type SourceTuple ¶
type StreamContext ¶
type StreamContext interface { context.Context GetLogger() Logger GetRuleId() string GetOpId() string GetInstanceId() int WithMeta(ruleId string, opId string, store Store) StreamContext WithInstance(instanceId int) StreamContext WithCancel() (StreamContext, context.CancelFunc) SetError(e error) //State handling IncrCounter(key string, amount int) error GetCounter(key string) (int, error) PutState(key string, value interface{}) error GetState(key string) (interface{}, error) DeleteState(key string) error }
Click to show internal directories.
Click to hide internal directories.