
package module
v5.1.1-rc1 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: May 28, 2024 License: Apache-2.0 Imports: 51 Imported by: 38


The Golang Implementation of Apache RocketMQ Client


Here is the golang implementation of the client for Apache RocketMQ.


We build the following protocols described in rocketmq-apis on top of gRPC-go, utilizing Protocol buffers to serialize and deserialize data in transmission.

Quick Start


With Go modules(Go 1.11+), simply add the following import to your code, and then go [build|run|test] will automatically fetch the necessary dependencies.

import "github.com/apache/rocketmq-clients/golang"

Otherwise, to install the golang package, run the following command:

go get -u github.com/apache/rocketmq-clients/golang/v5




View Source
const (
	CLIENT_LOG_ROOT     = "rocketmq.client.logRoot"
	CLIENT_LOG_MAXINDEX = "rocketmq.client.logFileMaxIndex"
	CLIENT_LOG_FILESIZE = "rocketmq.client.logFileMaxSize"
	CLIENT_LOG_LEVEL    = "rocketmq.client.logLevel"
	// CLIENT_LOG_ADDITIVE        = "rocketmq.client.log.additive"
	CLIENT_LOG_FILENAME = "rocketmq.client.logFileName"
	// CLIENT_LOG_ASYNC_QUEUESIZE = "rocketmq.client.logAsyncQueueSize"
	ENABLE_CONSOLE_APPENDER = "mq.consoleAppender.enabled"
View Source
const (
	MESSAGE_ID_VERSION_V0             string = "00"
	MESSAGE_ID_VERSION_V1             string = "01"
View Source
const (
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION           = "messaging.rocketmq.operation"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE           = "messaging.rocketmq.namespace"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_TAG                 = "messaging.rocketmq.message_tag"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS                = "messaging.rocketmq.message_keys"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_ID           = "messaging.rocketmq.client_id"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_MESSAGE_TYPE        = "messaging.rocketmq.message_type"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_GROUP        = "messaging.rocketmq.client_group"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT             = "messaging.rocketmq.attempt"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_BATCH_SIZE          = "messaging.rocketmq.batch_size"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_DELIVERY_TIMESTAMP  = "messaging.rocketmq.delivery_timestamp"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP = "messaging.rocketmq.available_timestamp"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_ACCESS_KEY          = "messaging.rocketmq.access_key"




	// Messaging span attribute name list
	SPAN_ATTRIBUTE_KEY_MESSAGING_SYSTEM             = "messaging.system"
	SPAN_ATTRIBUTE_KEY_MESSAGING_DESTINATION        = "messaging.destination"
	SPAN_ATTRIBUTE_KEY_MESSAGING_PROTOCOL           = "messaging.protocol"
	SPAN_ATTRIBUTE_KEY_MESSAGING_URL                = "messaging.url"
	SPAN_ATTRIBUTE_KEY_MESSAGING_ID                 = "messaging.message_id"
	SPAN_ATTRIBUTE_KEY_MESSAGING_PAYLOAD_SIZE_BYTES = "messaging.message_payload_size_bytes"
	SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION          = "messaging.operation"



	// Span annotation
	SPAN_ANNOTATION_MESSAGE_KEYS      = "__message_keys"

RocketMQ span attribute name list

View Source
const (


View Source
var (
	MLatencyMs = stats.Int64("publish_latency", "Publish latency in milliseconds", "ms")

	PublishLatencyView = view.View{
		Name:        "rocketmq_send_cost_time",
		Description: "Publish latency",
		Measure:     MLatencyMs,
		Aggregation: view.Distribution(1, 5, 10, 20, 50, 200, 500),
		TagKeys:     []tag.Key{topicTag, clientIdTag, invocationStatusTag},
View Source
var (
	ErrNoAvailableBrokers = errors.New("rocketmq: no available brokers")
View Source
var (
	ErrNoAvailableEndpoints = errors.New("rocketmq: no available endpoints")
View Source
var NewClient = func(config *Config, opts ...ClientOption) (Client, error) {
	endpoints, err := utils.ParseTarget(config.Endpoint)
	if err != nil {
		return nil, err
	cli := &defaultClient{
		config:                        config,
		opts:                          defaultNSOptions,
		clientID:                      utils.GenClientID(),
		accessPoint:                   endpoints,
		messageInterceptors:           make([]MessageInterceptor, 0),
		endpointsTelemetryClientTable: make(map[string]*defaultClientSession),
		on:                            *atomic.NewBool(true),
	cli.log = sugarBaseLogger.With("client_id", cli.clientID)
	for _, opt := range opts {
	cli.done = make(chan struct{}, 1)
	cli.clientMeterProvider = NewDefaultClientMeterProvider(cli)
	return cli, nil
View Source
var NewClientConcrete = func(config *Config, opts ...ClientOption) (*defaultClient, error) {
	endpoints, err := utils.ParseTarget(config.Endpoint)
	if err != nil {
		return nil, err
	cli := &defaultClient{
		config:                        config,
		opts:                          defaultNSOptions,
		clientID:                      utils.GenClientID(),
		accessPoint:                   endpoints,
		messageInterceptors:           make([]MessageInterceptor, 0),
		endpointsTelemetryClientTable: make(map[string]*defaultClientSession),
		on:                            *atomic.NewBool(true),
		clientManager:                 &MockClientManager{},
	cli.log = sugarBaseLogger.With("client_id", cli.clientID)
	for _, opt := range opts {
	cli.done = make(chan struct{}, 1)
	cli.clientMeterProvider = NewDefaultClientMeterProvider(cli)
	return cli, nil
View Source
var NewClientConn = func(endpoint string, opts ...ConnOption) (ClientConn, error) {
	client := &clientConn{
		opts:     defaultConnOptions,
		validate: validator.New(),
	if len(endpoint) == 0 {
		return nil, ErrNoAvailableEndpoints
	for _, opt := range opts {

	baseCtx := context.TODO()
	if client.opts.Context != nil {
		baseCtx = client.opts.Context

	ctx, cancel := context.WithCancel(baseCtx)

	client.ctx = ctx
	client.cancel = cancel
	client.creds = credentials.NewTLS(client.opts.TLS)

	if client.opts.MaxCallSendMsgSize > 0 || client.opts.MaxCallRecvMsgSize > 0 {
		if client.opts.MaxCallRecvMsgSize > 0 && client.opts.MaxCallSendMsgSize > client.opts.MaxCallRecvMsgSize {
			return nil, fmt.Errorf("gRPC message recv limit (%d bytes) must be greater than send limit (%d bytes)", client.opts.MaxCallRecvMsgSize, client.opts.MaxCallSendMsgSize)
		if client.opts.MaxCallSendMsgSize > 0 {
			client.callOpts = append(client.callOpts, grpc.MaxCallSendMsgSize(client.opts.MaxCallSendMsgSize))
		if client.opts.MaxCallRecvMsgSize > 0 {
			client.callOpts = append(client.callOpts, grpc.MaxCallRecvMsgSize(client.opts.MaxCallRecvMsgSize))

	conn, err := client.dial(endpoint)
	if err != nil {
		return nil, err
	client.conn = conn

	return client, nil
View Source
var NewDefaultClientManager = func() *defaultClientManager {
	return &defaultClientManager{
		rpcClientTable: make(map[string]RpcClient),
		done:           make(chan struct{}),
		opts:           defaultClientManagerOptions,
View Source
var NewDefaultClientMeter = func(exporter view.Exporter, on bool, endpoints *v2.Endpoints, clientID string) *defaultClientMeter {
	return &defaultClientMeter{
		enabled:     *atomic.NewBool(on),
		endpoints:   endpoints,
		ocaExporter: exporter,
View Source
var NewDefaultClientMeterProvider = func(client *defaultClient) ClientMeterProvider {
	cmp := &defaultClientMeterProvider{
		client:      client,
		clientMeter: NewDefaultClientMeter(nil, false, nil, "nil"),
	return cmp
View Source
var NewDefaultMessageMeterInterceptor = func(clientMeterProvider ClientMeterProvider) *defaultMessageMeterInterceptor {
	return &defaultMessageMeterInterceptor{
		clientMeterProvider: clientMeterProvider,
View Source
var NewFilterExpression = func(expression string) *FilterExpression {
	return &FilterExpression{
		expression:     expression,
		expressionType: TAG,
View Source
var NewFilterExpressionWithType = func(expression string, expressionType FilterExpressionType) *FilterExpression {
	return &FilterExpression{
		expression:     expression,
		expressionType: expressionType,
View Source
var NewProducer = func(config *Config, opts ...ProducerOption) (Producer, error) {
	copyOpt := defaultProducerOptions
	po := &copyOpt
	for _, opt := range opts {
	cli, err := po.clientFunc(config)
	if err != nil {
		return nil, err
	p := &defaultProducer{
		po:      *po,
		cli:     cli.(*defaultClient),
		checker: po.checker,
	p.cli.initTopics = po.topics
	endpoints, err := utils.ParseTarget(config.Endpoint)
	if err != nil {
		return nil, err
	p.pSetting = &producerSettings{
		clientId:   p.cli.GetClientID(),
		endpoints:  endpoints,
		clientType: v2.ClientType_PRODUCER,
		retryPolicy: &v2.RetryPolicy{
			MaxAttempts: po.maxAttempts,
			Strategy: &v2.RetryPolicy_ExponentialBackoff{
				ExponentialBackoff: &v2.ExponentialBackoff{
					Max:        durationpb.New(time.Duration(0)),
					Initial:    durationpb.New(time.Duration(0)),
					Multiplier: 1,
		requestTimeout:      p.cli.opts.timeout,
		validateMessageType: *atomic.NewBool(true),
		maxBodySizeBytes:    *atomic.NewInt32(4 * 1024 * 1024),
	for _, topic := range po.topics {
		topicResource := &v2.Resource{
			Name:              topic,
			ResourceNamespace: config.NameSpace,
		p.pSetting.topics.Store(topic, topicResource)
	p.cli.settings = p.pSetting
	p.cli.clientImpl = p
	return p, nil
View Source
var NewPublishingLoadBalancer = func(messageQueues []*v2.MessageQueue) (PublishingLoadBalancer, error) {
	plb := &publishingLoadBalancer{
		messageQueues: messageQueues,
	return plb, nil
View Source
var NewPublishingMessage = func(msg *Message, namespace string, settings *producerSettings, txEnabled bool) (*PublishingMessage, error) {
	if msg == nil {
		return nil, fmt.Errorf("message is nil")
	pMsg := &PublishingMessage{
		msg: msg,

	maxBodySizeBytes := int(settings.maxBodySizeBytes.Load())

	length := len(msg.Body)
	if length > maxBodySizeBytes {
		return nil, fmt.Errorf("message body size exceeds the threshold, max size=%d bytes", maxBodySizeBytes)

	pMsg.encoding = v2.Encoding_IDENTITY

	pMsg.namespace = namespace

	pMsg.messageId = GetMessageIdCodecInstance().NextMessageId().String()

	if msg.GetMessageGroup() == nil && msg.GetDeliveryTimestamp() == nil && !txEnabled {
		pMsg.messageType = v2.MessageType_NORMAL
		return pMsg, nil

	if msg.GetMessageGroup() != nil && !txEnabled {
		pMsg.messageType = v2.MessageType_FIFO
		return pMsg, nil

	if msg.GetDeliveryTimestamp() != nil && !txEnabled {
		pMsg.messageType = v2.MessageType_DELAY
		return pMsg, nil

	if msg.GetMessageGroup() == nil && msg.GetDeliveryTimestamp() == nil && txEnabled {
		pMsg.messageType = v2.MessageType_TRANSACTION
		return pMsg, nil

	return nil, fmt.Errorf("transactional message should not set messageGroup or deliveryTimestamp")
View Source
var NewRpcClient = func(target string, opts ...RpcClientOption) (RpcClient, error) {
	rc := &rpcClient{
		target: target,
		opts:   defaultRpcClientOptions,
	for _, opt := range opts {
	conn, err := rc.opts.clientConnFunc(target, rc.opts.connOptions...)
	if err != nil {
		return nil, fmt.Errorf("create grpc conn failed, err=%w", err)
	rc.conn = conn
	rc.msc = v2.NewMessagingServiceClient(conn.Conn())
	rc.activityNanoTime = time.Now()
	sugarBaseLogger.Infof("create rpc client success, target=%v", target)
	return rc, nil
View Source
var NewSimpleConsumer = func(config *Config, opts ...SimpleConsumerOption) (SimpleConsumer, error) {
	copyOpt := defaultSimpleConsumerOptions
	scOpts := &copyOpt
	for _, opt := range opts {
	if len(config.ConsumerGroup) == 0 {
		return nil, fmt.Errorf("consumerGroup could not be nil")
	cli, err := scOpts.clientFunc(config)
	if err != nil {
		return nil, err
	sc := &defaultSimpleConsumer{
		scOpts:    *scOpts,
		cli:       cli.(*defaultClient),
		groupName: config.ConsumerGroup,

		awaitDuration:           scOpts.awaitDuration,
		subscriptionExpressions: scOpts.subscriptionExpressions,
	if sc.subscriptionExpressions == nil {
		sc.subscriptionExpressions = make(map[string]*FilterExpression)
	sc.cli.initTopics = make([]string, 0)
	for topic := range scOpts.subscriptionExpressions {
		sc.cli.initTopics = append(sc.cli.initTopics, topic)
	endpoints, err := utils.ParseTarget(config.Endpoint)
	if err != nil {
		return nil, err
	sc.scSettings = &simpleConsumerSettings{
		clientId:       sc.cli.GetClientID(),
		endpoints:      endpoints,
		clientType:     v2.ClientType_SIMPLE_CONSUMER,
		requestTimeout: sc.cli.opts.timeout,

		groupName: &v2.Resource{
			Name:              sc.groupName,
			ResourceNamespace: config.NameSpace,
		longPollingTimeout:      scOpts.awaitDuration,
		subscriptionExpressions: scOpts.subscriptionExpressions,
	sc.cli.settings = sc.scSettings
	sc.cli.clientImpl = sc
	return sc, nil
View Source
var NewSubscriptionLoadBalancer = func(messageQueues []*v2.MessageQueue) (SubscriptionLoadBalancer, error) {
	slb := &subscriptionLoadBalancer{
		messageQueues: messageQueues,
	return slb, nil
View Source
var NewTransactionImpl = func(producerImpl Producer) *transactionImpl {
	return &transactionImpl{
		producerImpl: producerImpl,
		messages:     make(map[string]*PublishingMessage),
View Source
var SUB_ALL = NewFilterExpression("*")


func InitLogger

func InitLogger()

func NewDefaultClientSession

func NewDefaultClientSession(target string, cli *defaultClient) (*defaultClientSession, error)

func ResetLogger

func ResetLogger()


type Client

type Client interface {
	GetClientID() string
	Sign(ctx context.Context) context.Context
	GracefulStop() error

type ClientConn

type ClientConn interface {
	Conn() *grpc.ClientConn
	Close() error

type ClientConnFunc

type ClientConnFunc func(string, ...ConnOption) (ClientConn, error)

type ClientManager

type ClientManager interface {
	RegisterClient(client Client)
	UnRegisterClient(client Client)
	QueryRoute(ctx context.Context, endpoints *v2.Endpoints, request *v2.QueryRouteRequest, duration time.Duration) (*v2.QueryRouteResponse, error)
	HeartBeat(ctx context.Context, endpoints *v2.Endpoints, request *v2.HeartbeatRequest, duration time.Duration) (*v2.HeartbeatResponse, error)
	SendMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.SendMessageRequest, duration time.Duration) (*v2.SendMessageResponse, error)
	Telemetry(ctx context.Context, endpoints *v2.Endpoints, duration time.Duration) (v2.MessagingService_TelemetryClient, error)
	EndTransaction(ctx context.Context, endpoints *v2.Endpoints, request *v2.EndTransactionRequest, duration time.Duration) (*v2.EndTransactionResponse, error)
	NotifyClientTermination(ctx context.Context, endpoints *v2.Endpoints, request *v2.NotifyClientTerminationRequest, duration time.Duration) (*v2.NotifyClientTerminationResponse, error)
	ReceiveMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.ReceiveMessageRequest) (v2.MessagingService_ReceiveMessageClient, error)
	AckMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.AckMessageRequest, duration time.Duration) (*v2.AckMessageResponse, error)
	ChangeInvisibleDuration(ctx context.Context, endpoints *v2.Endpoints, request *v2.ChangeInvisibleDurationRequest, duration time.Duration) (*v2.ChangeInvisibleDurationResponse, error)

type ClientMeterProvider

type ClientMeterProvider interface {
	Reset(metric *v2.Metric)
	// contains filtered or unexported methods

type ClientOption

type ClientOption interface {
	// contains filtered or unexported methods

A ClientOption sets options such as timeout, etc.

func WithClientConnFunc

func WithClientConnFunc(f ClientConnFunc) ClientOption

WithClientConnFunc returns a Option that sets ClientConnFunc for nameserver. Default is NewClientConn.

func WithConnOptions

func WithConnOptions(opts ...ConnOption) ClientOption

WithConnOptions returns a Option that sets ConnOption for grpc ClientConn.

func WithQueryRouteTimeout

func WithQueryRouteTimeout(d time.Duration) ClientOption

WithQueryRouteTimeout returns a Option that sets timeout duration for nameserver. Default is 3s.

func WithRpcClientOptions

func WithRpcClientOptions(opts ...RpcClientOption) ClientOption

WithRpcClientOptions returns a Option that sets RpcClientOption for grpc ClientConn.

type ClientSettings

type ClientSettings interface {
	GetClientID() string
	GetClientType() v2.ClientType
	GetAccessPoint() *v2.Endpoints
	GetRetryPolicy() *v2.RetryPolicy
	GetRequestTimeout() time.Duration
	// contains filtered or unexported methods

type Config

type Config struct {
	Endpoint      string `validate:"required"`
	NameSpace     string
	ConsumerGroup string
	Credentials   *credentials.SessionCredentials `validate:"required"`

type ConnOption

type ConnOption interface {
	// contains filtered or unexported methods

A ConnOption sets options such as tls.Config, etc.

func WithContext

func WithContext(ctx context.Context) ConnOption

WithContext is the default client context; it can be used to cancel grpc dial out and other operations that do not have an explicit context.

func WithDialOptions

func WithDialOptions(dialOptions ...grpc.DialOption) ConnOption

WithDialOptions returns a ConnOption that sets grpc.DialOption for grpc.DialContext.

func WithDialTimeout

func WithDialTimeout(dur time.Duration) ConnOption

WithDialTimeout returns a ConnOption that sets DialTimeout for grpc.DialContext. Default it is 5 second.

func WithMaxCallRecvMsgSize

func WithMaxCallRecvMsgSize(size int) ConnOption

WithMaxCallRecvMsgSize returns a ConnOption that sets client-side request send limit in bytes for grpc.DialContext.

func WithMaxCallSendMsgSize

func WithMaxCallSendMsgSize(size int) ConnOption

WithMaxCallSendMsgSize returns a ConnOption that sets the client-side response receive limit. If 0, it defaults to "math.MaxInt32", because range response can easily exceed request send limits. Make sure that "MaxCallRecvMsgSize" >= server-side default send/recv limit.

func WithTLSConfig

func WithTLSConfig(tc *tls.Config) ConnOption

WithTLSConfig returns a ConnOption that sets tls.Config for grpc.DialContext. Default it is x509 insecure tls.Config.

func WithZapLogger

func WithZapLogger(logger *zap.Logger) ConnOption

type Consumer

type Consumer interface {
	GetGroupName() string
	// contains filtered or unexported methods

type ErrRpcStatus

type ErrRpcStatus struct {
	Code    int32
	Message string

func AsErrRpcStatus

func AsErrRpcStatus(err error) (*ErrRpcStatus, bool)

func (*ErrRpcStatus) Error

func (err *ErrRpcStatus) Error() string

func (*ErrRpcStatus) GetCode

func (err *ErrRpcStatus) GetCode() int32

func (*ErrRpcStatus) GetMessage

func (err *ErrRpcStatus) GetMessage() string

type FilterExpression

type FilterExpression struct {
	// contains filtered or unexported fields

type FilterExpressionType

type FilterExpressionType int32
const (
	SQL92 FilterExpressionType = iota

type InvocationStatus

type InvocationStatus string
const (
	InvocationStatus_SUCCESS InvocationStatus = "success"
	InvocationStatus_FAILURE InvocationStatus = "failure"

type Message

type Message struct {
	Topic string
	Body  []byte
	Tag   *string
	// contains filtered or unexported fields

func (*Message) AddProperty

func (msg *Message) AddProperty(key, value string)

func (*Message) GetDeliveryTimestamp

func (msg *Message) GetDeliveryTimestamp() *time.Time

func (*Message) GetKeys

func (msg *Message) GetKeys() []string

func (*Message) GetMessageCommon

func (msg *Message) GetMessageCommon() *MessageCommon

func (*Message) GetMessageGroup

func (msg *Message) GetMessageGroup() *string

func (*Message) GetProperties

func (msg *Message) GetProperties() map[string]string

func (*Message) GetTag

func (msg *Message) GetTag() *string

func (*Message) SetDelayTimestamp

func (msg *Message) SetDelayTimestamp(deliveryTimestamp time.Time)

func (*Message) SetKeys

func (msg *Message) SetKeys(keys ...string)

func (*Message) SetMessageGroup

func (msg *Message) SetMessageGroup(messageGroup string)

func (*Message) SetTag

func (msg *Message) SetTag(tag string)

type MessageCommon

type MessageCommon struct {
	// contains filtered or unexported fields

type MessageHookPoints

type MessageHookPoints int32
const (
	MessageHookPoints_SEND MessageHookPoints = iota

type MessageHookPointsStatus

type MessageHookPointsStatus int32
const (
	MessageHookPointsStatus_UNSET MessageHookPointsStatus = iota

type MessageId

type MessageId interface {
	// GetVersion Get the version of the messageId
	GetVersion() string
	// String string-formed string id
	String() string

MessageId Abstract message id

func NewMessageId

func NewMessageId(version, suffix string) MessageId

type MessageIdCodec

type MessageIdCodec interface {
	NextMessageId() MessageId
	Decode(messageId string) MessageId

* The codec for the message-id.

Codec here provides the following two functions:

1. Provide decoding function of message-id of all versions above v0.

2. Provide a generator of message-id of v1 version.

The message-id of versions above V1 consists of 17 bytes in total. The first two bytes represent the version number. For V1, these two bytes are 0x0001.

V1 message id example


V1 version message id generation rules

process id(lower 2bytes)

mac address(lower 6bytes) │ sequence number(big endian)

             ▲        │          ▲ (4bytes)
             │        │          │
       ┌─────┴─────┐ ┌┴┐ ┌───┐ ┌─┴─┐
0x01+  │     6     │ │2│ │ 4 │ │ 4 │
       └───────────┘ └─┘ └─┬─┘ └───┘
    seconds since 2021-01-01 00:00:00(UTC+0)
                  (lower 4bytes)

func GetMessageIdCodecInstance

func GetMessageIdCodecInstance() MessageIdCodec

type MessageInterceptor

type MessageInterceptor interface {
	// contains filtered or unexported methods

type MessageMeterInterceptor

type MessageMeterInterceptor interface {

type MessageView

type MessageView struct {
	ReceiptHandle string
	// contains filtered or unexported fields

func (*MessageView) GetBody

func (msg *MessageView) GetBody() []byte

func (*MessageView) GetBornHost

func (msg *MessageView) GetBornHost() *string

func (*MessageView) GetBornTimestamp

func (msg *MessageView) GetBornTimestamp() *time.Time

func (*MessageView) GetDeliveryAttempt

func (msg *MessageView) GetDeliveryAttempt() int32

func (*MessageView) GetDeliveryTimestamp

func (msg *MessageView) GetDeliveryTimestamp() *time.Time

func (*MessageView) GetKeys

func (msg *MessageView) GetKeys() []string

func (*MessageView) GetMessageCommon

func (msg *MessageView) GetMessageCommon() *MessageCommon

func (*MessageView) GetMessageGroup

func (msg *MessageView) GetMessageGroup() *string

func (*MessageView) GetMessageId

func (msg *MessageView) GetMessageId() string

func (*MessageView) GetOffset

func (msg *MessageView) GetOffset() int64

func (*MessageView) GetProperties

func (msg *MessageView) GetProperties() map[string]string

func (*MessageView) GetReceiptHandle

func (msg *MessageView) GetReceiptHandle() string

func (*MessageView) GetTag

func (msg *MessageView) GetTag() *string

func (*MessageView) GetTopic

func (msg *MessageView) GetTopic() string

func (*MessageView) GetTraceContext

func (msg *MessageView) GetTraceContext() *string

func (*MessageView) SetDelayTimeLevel

func (msg *MessageView) SetDelayTimeLevel(deliveryTimestamp time.Time)

func (*MessageView) SetKeys

func (msg *MessageView) SetKeys(keys ...string)

func (*MessageView) SetMessageGroup

func (msg *MessageView) SetMessageGroup(messageGroup string)

func (*MessageView) SetTag

func (msg *MessageView) SetTag(tag string)

type MockClient

type MockClient struct {
	// contains filtered or unexported fields

MockClient is a mock of Client interface.

func NewMockClient

func NewMockClient(ctrl *gomock.Controller) *MockClient

NewMockClient creates a new mock instance.

func (*MockClient) EXPECT

func (m *MockClient) EXPECT() *MockClientMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockClient) GetClientID

func (m *MockClient) GetClientID() string

GetClientID mocks base method.

func (*MockClient) GracefulStop

func (m *MockClient) GracefulStop() error

GracefulStop mocks base method.

func (*MockClient) Sign

func (m *MockClient) Sign(ctx context.Context) context.Context

Sign mocks base method.

type MockClientManager

type MockClientManager struct {
	// contains filtered or unexported fields

MockClientManager is a mock of ClientManager interface.

func NewMockClientManager

func NewMockClientManager(ctrl *gomock.Controller) *MockClientManager

NewMockClientManager creates a new mock instance.

func (*MockClientManager) AckMessage

func (m *MockClientManager) AckMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.AckMessageRequest, duration time.Duration) (*v2.AckMessageResponse, error)

AckMessage mocks base method.

func (*MockClientManager) ChangeInvisibleDuration

func (m *MockClientManager) ChangeInvisibleDuration(ctx context.Context, endpoints *v2.Endpoints, request *v2.ChangeInvisibleDurationRequest, duration time.Duration) (*v2.ChangeInvisibleDurationResponse, error)

ChangeInvisibleDuration mocks base method.

func (*MockClientManager) EXPECT

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockClientManager) EndTransaction

func (m *MockClientManager) EndTransaction(ctx context.Context, endpoints *v2.Endpoints, request *v2.EndTransactionRequest, duration time.Duration) (*v2.EndTransactionResponse, error)

EndTransaction mocks base method.

func (*MockClientManager) HeartBeat

func (m *MockClientManager) HeartBeat(ctx context.Context, endpoints *v2.Endpoints, request *v2.HeartbeatRequest, duration time.Duration) (*v2.HeartbeatResponse, error)

HeartBeat mocks base method.

func (*MockClientManager) NotifyClientTermination

func (m *MockClientManager) NotifyClientTermination(ctx context.Context, endpoints *v2.Endpoints, request *v2.NotifyClientTerminationRequest, duration time.Duration) (*v2.NotifyClientTerminationResponse, error)

NotifyClientTermination mocks base method.

func (*MockClientManager) QueryRoute

func (m *MockClientManager) QueryRoute(ctx context.Context, endpoints *v2.Endpoints, request *v2.QueryRouteRequest, duration time.Duration) (*v2.QueryRouteResponse, error)

QueryRoute mocks base method.

func (*MockClientManager) ReceiveMessage

ReceiveMessage mocks base method.

func (*MockClientManager) RegisterClient

func (m *MockClientManager) RegisterClient(client Client)

RegisterClient mocks base method.

func (*MockClientManager) SendMessage

func (m *MockClientManager) SendMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.SendMessageRequest, duration time.Duration) (*v2.SendMessageResponse, error)

SendMessage mocks base method.

func (*MockClientManager) Telemetry

Telemetry mocks base method.

func (*MockClientManager) UnRegisterClient

func (m *MockClientManager) UnRegisterClient(client Client)

UnRegisterClient mocks base method.

type MockClientManagerMockRecorder

type MockClientManagerMockRecorder struct {
	// contains filtered or unexported fields

MockClientManagerMockRecorder is the mock recorder for MockClientManager.

func (*MockClientManagerMockRecorder) AckMessage

func (mr *MockClientManagerMockRecorder) AckMessage(ctx, endpoints, request, duration interface{}) *gomock.Call

AckMessage indicates an expected call of AckMessage.

func (*MockClientManagerMockRecorder) ChangeInvisibleDuration

func (mr *MockClientManagerMockRecorder) ChangeInvisibleDuration(ctx, endpoints, request, duration interface{}) *gomock.Call

ChangeInvisibleDuration indicates an expected call of ChangeInvisibleDuration.

func (*MockClientManagerMockRecorder) EndTransaction

func (mr *MockClientManagerMockRecorder) EndTransaction(ctx, endpoints, request, duration interface{}) *gomock.Call

EndTransaction indicates an expected call of EndTransaction.

func (*MockClientManagerMockRecorder) HeartBeat

func (mr *MockClientManagerMockRecorder) HeartBeat(ctx, endpoints, request, duration interface{}) *gomock.Call

HeartBeat indicates an expected call of HeartBeat.

func (*MockClientManagerMockRecorder) NotifyClientTermination

func (mr *MockClientManagerMockRecorder) NotifyClientTermination(ctx, endpoints, request, duration interface{}) *gomock.Call

NotifyClientTermination indicates an expected call of NotifyClientTermination.

func (*MockClientManagerMockRecorder) QueryRoute

func (mr *MockClientManagerMockRecorder) QueryRoute(ctx, endpoints, request, duration interface{}) *gomock.Call

QueryRoute indicates an expected call of QueryRoute.

func (*MockClientManagerMockRecorder) ReceiveMessage

func (mr *MockClientManagerMockRecorder) ReceiveMessage(ctx, endpoints, request interface{}) *gomock.Call

ReceiveMessage indicates an expected call of ReceiveMessage.

func (*MockClientManagerMockRecorder) RegisterClient

func (mr *MockClientManagerMockRecorder) RegisterClient(client interface{}) *gomock.Call

RegisterClient indicates an expected call of RegisterClient.

func (*MockClientManagerMockRecorder) SendMessage

func (mr *MockClientManagerMockRecorder) SendMessage(ctx, endpoints, request, duration interface{}) *gomock.Call

SendMessage indicates an expected call of SendMessage.

func (*MockClientManagerMockRecorder) Telemetry

func (mr *MockClientManagerMockRecorder) Telemetry(ctx, endpoints, duration interface{}) *gomock.Call

Telemetry indicates an expected call of Telemetry.

func (*MockClientManagerMockRecorder) UnRegisterClient

func (mr *MockClientManagerMockRecorder) UnRegisterClient(client interface{}) *gomock.Call

UnRegisterClient indicates an expected call of UnRegisterClient.

type MockClientMockRecorder

type MockClientMockRecorder struct {
	// contains filtered or unexported fields

MockClientMockRecorder is the mock recorder for MockClient.

func (*MockClientMockRecorder) GetClientID

func (mr *MockClientMockRecorder) GetClientID() *gomock.Call

GetClientID indicates an expected call of GetClientID.

func (*MockClientMockRecorder) GracefulStop

func (mr *MockClientMockRecorder) GracefulStop() *gomock.Call

GracefulStop indicates an expected call of GracefulStop.

func (*MockClientMockRecorder) Sign

func (mr *MockClientMockRecorder) Sign(ctx interface{}) *gomock.Call

Sign indicates an expected call of Sign.

type MockRpcClient

type MockRpcClient struct {
	// contains filtered or unexported fields

MockRpcClient is a mock of RpcClient interface.

func NewMockRpcClient

func NewMockRpcClient(ctrl *gomock.Controller) *MockRpcClient

NewMockRpcClient creates a new mock instance.

func (*MockRpcClient) AckMessage

func (m *MockRpcClient) AckMessage(ctx context.Context, request *v2.AckMessageRequest) (*v2.AckMessageResponse, error)

AckMessage mocks base method.

func (*MockRpcClient) ChangeInvisibleDuration

ChangeInvisibleDuration mocks base method.

func (*MockRpcClient) EXPECT

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockRpcClient) EndTransaction

func (m *MockRpcClient) EndTransaction(ctx context.Context, request *v2.EndTransactionRequest) (*v2.EndTransactionResponse, error)

EndTransaction mocks base method.

func (*MockRpcClient) GetTarget

func (m *MockRpcClient) GetTarget() string

GetTarget mocks base method.

func (*MockRpcClient) GracefulStop

func (m *MockRpcClient) GracefulStop() error

GracefulStop mocks base method.

func (*MockRpcClient) HeartBeat

func (m *MockRpcClient) HeartBeat(ctx context.Context, request *v2.HeartbeatRequest) (*v2.HeartbeatResponse, error)

HeartBeat mocks base method.

func (*MockRpcClient) NotifyClientTermination

NotifyClientTermination mocks base method.

func (*MockRpcClient) QueryRoute

func (m *MockRpcClient) QueryRoute(ctx context.Context, request *v2.QueryRouteRequest) (*v2.QueryRouteResponse, error)

QueryRoute mocks base method.

func (*MockRpcClient) ReceiveMessage

ReceiveMessage mocks base method.

func (*MockRpcClient) SendMessage

func (m *MockRpcClient) SendMessage(ctx context.Context, request *v2.SendMessageRequest) (*v2.SendMessageResponse, error)

SendMessage mocks base method.

func (*MockRpcClient) Telemetry

Telemetry mocks base method.

type MockRpcClientMockRecorder

type MockRpcClientMockRecorder struct {
	// contains filtered or unexported fields

MockRpcClientMockRecorder is the mock recorder for MockRpcClient.

func (*MockRpcClientMockRecorder) AckMessage

func (mr *MockRpcClientMockRecorder) AckMessage(ctx, request interface{}) *gomock.Call

AckMessage indicates an expected call of AckMessage.

func (*MockRpcClientMockRecorder) ChangeInvisibleDuration

func (mr *MockRpcClientMockRecorder) ChangeInvisibleDuration(ctx, request interface{}) *gomock.Call

ChangeInvisibleDuration indicates an expected call of ChangeInvisibleDuration.

func (*MockRpcClientMockRecorder) EndTransaction

func (mr *MockRpcClientMockRecorder) EndTransaction(ctx, request interface{}) *gomock.Call

EndTransaction indicates an expected call of EndTransaction.

func (*MockRpcClientMockRecorder) GetTarget

func (mr *MockRpcClientMockRecorder) GetTarget() *gomock.Call

GetTarget indicates an expected call of GetTarget.

func (*MockRpcClientMockRecorder) GracefulStop

func (mr *MockRpcClientMockRecorder) GracefulStop() *gomock.Call

GracefulStop indicates an expected call of GracefulStop.

func (*MockRpcClientMockRecorder) HeartBeat

func (mr *MockRpcClientMockRecorder) HeartBeat(ctx, request interface{}) *gomock.Call

HeartBeat indicates an expected call of HeartBeat.

func (*MockRpcClientMockRecorder) NotifyClientTermination

func (mr *MockRpcClientMockRecorder) NotifyClientTermination(ctx, request interface{}) *gomock.Call

NotifyClientTermination indicates an expected call of NotifyClientTermination.

func (*MockRpcClientMockRecorder) QueryRoute

func (mr *MockRpcClientMockRecorder) QueryRoute(ctx, request interface{}) *gomock.Call

QueryRoute indicates an expected call of QueryRoute.

func (*MockRpcClientMockRecorder) ReceiveMessage

func (mr *MockRpcClientMockRecorder) ReceiveMessage(ctx, request interface{}) *gomock.Call

ReceiveMessage indicates an expected call of ReceiveMessage.

func (*MockRpcClientMockRecorder) SendMessage

func (mr *MockRpcClientMockRecorder) SendMessage(ctx, request interface{}) *gomock.Call

SendMessage indicates an expected call of SendMessage.

func (*MockRpcClientMockRecorder) Telemetry

func (mr *MockRpcClientMockRecorder) Telemetry(ctx interface{}) *gomock.Call

Telemetry indicates an expected call of Telemetry.

type MockisClient

type MockisClient struct {
	// contains filtered or unexported fields

MockisClient is a mock of isClient interface.

func NewMockisClient

func NewMockisClient(ctrl *gomock.Controller) *MockisClient

NewMockisClient creates a new mock instance.

func (*MockisClient) EXPECT

EXPECT returns an object that allows the caller to indicate expected use.

type MockisClientMockRecorder

type MockisClientMockRecorder struct {
	// contains filtered or unexported fields

MockisClientMockRecorder is the mock recorder for MockisClient.

type NewClientFunc

type NewClientFunc func(*Config, ...ClientOption) (Client, error)

type Producer

type Producer interface {
	Send(context.Context, *Message) ([]*SendReceipt, error)
	SendWithTransaction(context.Context, *Message, Transaction) ([]*SendReceipt, error)
	SendAsync(context.Context, *Message, func(context.Context, []*SendReceipt, error))
	BeginTransaction() Transaction
	Start() error
	GracefulStop() error
	// contains filtered or unexported methods

type ProducerOption

type ProducerOption interface {
	// contains filtered or unexported methods

A ProducerOption sets options such as tls.Config, etc.

func WithClientFunc

func WithClientFunc(f NewClientFunc) ProducerOption

WithClientFunc returns a ProducerOption that sets ClientFunc for producer. Default is nameserver.New.

func WithMaxAttempts

func WithMaxAttempts(m int32) ProducerOption

func WithTopics

func WithTopics(t ...string) ProducerOption

func WithTransactionChecker

func WithTransactionChecker(checker *TransactionChecker) ProducerOption

type PublishingLoadBalancer

type PublishingLoadBalancer interface {
	TakeMessageQueueByMessageGroup(messageGroup *string) ([]*v2.MessageQueue, error)
	TakeMessageQueues(excluded sync.Map, count int) ([]*v2.MessageQueue, error)
	CopyAndUpdate([]*v2.MessageQueue) PublishingLoadBalancer

type PublishingMessage

type PublishingMessage struct {
	// contains filtered or unexported fields

type RpcClient

type RpcClient interface {
	GracefulStop() error
	HeartBeat(ctx context.Context, request *v2.HeartbeatRequest) (*v2.HeartbeatResponse, error)
	QueryRoute(ctx context.Context, request *v2.QueryRouteRequest) (*v2.QueryRouteResponse, error)
	SendMessage(ctx context.Context, request *v2.SendMessageRequest) (*v2.SendMessageResponse, error)
	Telemetry(ctx context.Context) (v2.MessagingService_TelemetryClient, error)
	EndTransaction(ctx context.Context, request *v2.EndTransactionRequest) (*v2.EndTransactionResponse, error)
	NotifyClientTermination(ctx context.Context, request *v2.NotifyClientTerminationRequest) (*v2.NotifyClientTerminationResponse, error)
	ReceiveMessage(ctx context.Context, request *v2.ReceiveMessageRequest) (v2.MessagingService_ReceiveMessageClient, error)
	AckMessage(ctx context.Context, request *v2.AckMessageRequest) (*v2.AckMessageResponse, error)
	ChangeInvisibleDuration(ctx context.Context, request *v2.ChangeInvisibleDurationRequest) (*v2.ChangeInvisibleDurationResponse, error)

	GetTarget() string
	// contains filtered or unexported methods

type RpcClientOption

type RpcClientOption interface {
	// contains filtered or unexported methods

A RpcClientOption sets options such as tls.Config, etc.

func WithHealthCheckDuration

func WithHealthCheckDuration(d time.Duration) RpcClientOption

WithHealthCheckDuration returns a RpcClientOption that sets healthCheckDuration for RpcClient. Default is 15s.

func WithHeartbeatDuration

func WithHeartbeatDuration(d time.Duration) RpcClientOption

WithHeartbeatDuration returns a RpcClientOption that sets heartbeatDuration for RpcClient. Default is 10s.

func WithRpcClientClientConnFunc

func WithRpcClientClientConnFunc(f ClientConnFunc) RpcClientOption

WithRpcClientClientConnFunc returns a RpcClientOption that sets ClientConnFunc for RpcClient. Default is NewClientConn.

func WithRpcClientConnOption

func WithRpcClientConnOption(opts ...ConnOption) RpcClientOption

WithRpcClientConnOption returns a RpcClientOption that sets ConnOption for RpcClient.

func WithRpcClientTimeout

func WithRpcClientTimeout(d time.Duration) RpcClientOption

WithRpcClientTimeout returns a RpcClientOption that sets time for RpcClient when heartbeat and health check. Default is 5s.

type SendReceipt

type SendReceipt struct {
	MessageID     string
	TransactionId string
	Offset        int64
	Endpoints     *v2.Endpoints

type SimpleConsumer

type SimpleConsumer interface {

	Start() error
	GracefulStop() error

	Subscribe(topic string, filterExpression *FilterExpression) error
	Unsubscribe(topic string) error
	Ack(ctx context.Context, messageView *MessageView) error
	Receive(ctx context.Context, maxMessageNum int32, invisibleDuration time.Duration) ([]*MessageView, error)
	ChangeInvisibleDuration(messageView *MessageView, invisibleDuration time.Duration) error
	ChangeInvisibleDurationAsync(messageView *MessageView, invisibleDuration time.Duration)

type SimpleConsumerOption

type SimpleConsumerOption interface {
	// contains filtered or unexported methods

A ConsumerOption sets options such as tag, etc.

func WithAwaitDuration

func WithAwaitDuration(awaitDuration time.Duration) SimpleConsumerOption

func WithSubscriptionExpressions

func WithSubscriptionExpressions(subscriptionExpressions map[string]*FilterExpression) SimpleConsumerOption

WithTag returns a consumerOption that sets tag for consumer. Note: Default it uses *.

type SubscriptionLoadBalancer

type SubscriptionLoadBalancer interface {
	TakeMessageQueue() (*v2.MessageQueue, error)
	CopyAndUpdate([]*v2.MessageQueue) SubscriptionLoadBalancer

type Transaction

type Transaction interface {
	Commit() error
	RollBack() error

type TransactionChecker

type TransactionChecker struct {
	Check func(msg *MessageView) TransactionResolution

type TransactionResolution

type TransactionResolution int32
const (
	UNKNOWN TransactionResolution = iota // 开始生成枚举值, 默认为0

type UnifiedMessage

type UnifiedMessage struct {
	// contains filtered or unexported fields

func (*UnifiedMessage) GetMessage

func (uMsg *UnifiedMessage) GetMessage() *Message

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL