shared

package
v0.0.0-...-45adee8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 1, 2024 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const (
	AttrAMQPDeliveryState string = "amqp.delivery_state"
	AttrAMQPStatusCode    string = "amqp.status_code"

	// TODO: I made these up entirely
	AttrMessageCount string = "amqp.message_count"
)
View Source
const (
	MetricStressSuccessfulCancels = "stress.cancels"
)

these metrics are specific to stress tests and wouldn't be in customer code.

Variables

This section is empty.

Functions

func AddAuthFlags

func AddAuthFlags(fs *flag.FlagSet) func() (*azservicebus.Client, *admin.Client, error)

AddAuthFlags adds the flags needed for authenticating to Service Bus. Returns a function that can be called after the flags have been parsed, which will create the an *azservicebus.Client.

func ConstantlyUpdateQueue

func ConstantlyUpdateQueue(ctx context.Context, adminClient *admin.Client, queue string, updateInterval time.Duration) error

ConstantlyUpdateQueue updates queue, changing the MaxDeliveryCount properly between 11 and 10, every `updateInterval` This will cause Service Bus to issue force-detaches to our links, allowing us to exercise our recovery logic.

func ForceQueueDetach

func ForceQueueDetach(ctx context.Context, adminClient *admin.Client, queue string) error

func LoadEnvironment

func LoadEnvironment() error

LoadEnvironment loads an .env file. If the env var `ENV_FILE` exists, we assume the value is a path to an .env file Otherwise we fall back to loading from the current directory.

func MustCreateAutoDeletingQueue

func MustCreateAutoDeletingQueue(sc *StressContext, queueName string, qp *admin.QueueProperties) *admin.Client

MustCreateAutoDeletingQueue creates a queue that will auto-delete 10 minutes after activity has ceased.

func MustCreateSubscriptions

func MustCreateSubscriptions(sc *StressContext, topicName string, subscriptionNames []string, options *MustCreateSubscriptionsOptions) func()

func MustGenerateMessages

func MustGenerateMessages(sc *StressContext, sender *TrackingSender, messageLimit int, numExtraBytes int)

func NewCtrlCContext

func NewCtrlCContext() (context.Context, context.CancelFunc)

NewCtrlCContext creates a context that cancels if the user hits ctrl+c.

func TrackDuration

func TrackDuration(ctx context.Context, tc *TelemetryClientWrapper, name Metric) func(map[string]string)

TrackDuration tracks durations (as a metric), using the initial call to TrackDuration as the start. The duration is ended when you call the returned function. TrackDuration respects any included baggage in the context.

func TrackError

func TrackError(ctx context.Context, tc *TelemetryClientWrapper, err error)

TrackError tracks an error (using the AppInsights exceptions table). TrackError respects any included baggage in the context.

NOTE: this function does not consider context cancellations/deadlines as errors.

func TrackMetric

func TrackMetric(ctx context.Context, tc *TelemetryClientWrapper, name Metric, value float64, attrs map[string]string)

TrackMetric tracks metric and respects any included baggage in the context.

func UpdateBaggage

func UpdateBaggage(ctx context.Context, baggage map[string]string) map[string]string

func WithBaggage

func WithBaggage(ctx context.Context, baggage map[string]string) context.Context

Types

type BaseTelemetry

type BaseTelemetry struct {
	Properties map[string]string
}

type EventTelemetry

type EventTelemetry struct {
	Properties map[string]string
}

func NewEventTelemetry

func NewEventTelemetry(name string) *EventTelemetry

type ExceptionTelemetry

type ExceptionTelemetry struct {
	BaseTelemetry BaseTelemetry
}

func NewExceptionTelemetry

func NewExceptionTelemetry(err error) *ExceptionTelemetry

type Metric

type Metric string
const (
	MetricConnectionLost Metric = "messaging.servicebus.connectionlost"
	MetricMessagesSent   Metric = "messaging.servicebus.messages.sent"

	// metrics related to Service Bus sessions (NOT amqp sessions)
	MetricSessionAccept    Metric = "messaging.servicebus.session.accept"
	MetricSessionTimeoutMS Metric = "messaging.servicebus.session.timeout"

	MetricSettlementRequestDuration Metric = "messaging.servicebus.settlement.request.duration"
	MetricReceiveLag                Metric = "messaging.servicebus.receiver.lag"

	MetricAMQPSendDuration Metric = "messaging.az.amqp.producer.send.duration"

	MetricAMQPMgmtRequestDuration Metric = "messaging.az.amqp.management.request.duration"

	MetricAMQPSettlementRequestDuration Metric = "messaging.servicebus.settlement.request.duration"
	MetricAMQPSettlementSequenceNum     Metric = "messaging.servicebus.settlement.sequence_number"

	// TODO: I've made these up entirely.
	MetricMessageReceived Metric = "messaging.servicebus.messages.received"
	MetricMessagePeeked   Metric = "messaging.servicebus.messages.peeked"
	MetricCloseDuration   Metric = "messaging.servicebus.close.duration"
	MetricLockRenew       Metric = "messaging.servicebus.lockrenew.duration" // TODO: separate for session vs message lock?
)

These names are modeled off of the metrics from Java https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/instrumentation/ServiceBusMeter.java

and from our standard for attributes: https://gist.github.com/lmolkova/e4215c0f44a49ef824983382762e6b92

type MetricTelemetry

type MetricTelemetry struct {
	Name          string
	Value         any
	BaseTelemetry BaseTelemetry
}

type MustCreateSubscriptionsOptions

type MustCreateSubscriptionsOptions struct {
	Topic        *admin.CreateTopicOptions
	Subscription *admin.CreateSubscriptionOptions
}

type StreamingMessageBatch

type StreamingMessageBatch struct {
	// contains filtered or unexported fields
}

func NewStreamingMessageBatch

func NewStreamingMessageBatch(ctx context.Context, sender internalBatchSender, expectedTotal int) (*StreamingMessageBatch, error)

func (*StreamingMessageBatch) Add

Add appends to the current batch. If it's full it'll send it, allocate a new one.

func (*StreamingMessageBatch) Close

func (sb *StreamingMessageBatch) Close(ctx context.Context) error

Close sends any messages currently held in our batch.

type StressContext

type StressContext struct {
	TC      *TelemetryClientWrapper
	Context context.Context

	// TestRunID represents the test run and can be used to tie into other container metrics generated within the test cluster.
	TestRunID string

	// Nano is the nanoseconds start time for the stress test run
	Nano string

	// Endpoint is the value from SERVICEBUS_ENDPOINT
	Endpoint string

	Cred azcore.TokenCredential
	// contains filtered or unexported fields
}

StressContext holds onto some common useful state for stress tests, including some simple stats tracking, a telemetry client and a context that represents the lifetime of the test itself (and will be cancelled if the user quits out of the stress)

func MustCreateStressContext

func MustCreateStressContext(testName string, options *StressContextOptions) *StressContext

func (*StressContext) Assert

func (tracker *StressContext) Assert(condition bool, message string)

func (*StressContext) End

func (sc *StressContext) End()

func (*StressContext) Equal

func (tracker *StressContext) Equal(val1 any, val2 any)

func (*StressContext) Failf

func (tracker *StressContext) Failf(format string, args ...any)

func (*StressContext) LogIfFailed

func (sc *StressContext) LogIfFailed(message string, err error)

func (*StressContext) Nil

func (tracker *StressContext) Nil(val1 any)

func (*StressContext) NoError

func (tracker *StressContext) NoError(err error)

func (*StressContext) NoErrorf

func (tracker *StressContext) NoErrorf(err error, format string, args ...any)

func (*StressContext) PanicOnError

func (tracker *StressContext) PanicOnError(message string, err error)

PanicOnError logs, sends telemetry and then closes on error

func (*StressContext) Start

func (sc *StressContext) Start(entityName string, attributes map[string]string)

type StressContextOptions

type StressContextOptions struct {
	// Duration is the amount of time the stress test should run before
	// the StressContext.Context expires.
	Duration time.Duration

	// CommonBaggage will be added as part of the telemetry client, and will be included in each
	// metric/event/error that's reported.
	CommonBaggage map[string]string

	// EmitStartEvent enables the automatic sending of the "Start" event for our test to telemetry.
	EmitStartEvent bool
}

type TelemetryClientWrapper

type TelemetryClientWrapper struct {
	// contains filtered or unexported fields
}

TelemetryClientWrapper is a wrapper for telemetry client, once we get that phased back in.

func (*TelemetryClientWrapper) Context

func (*TelemetryClientWrapper) Flush

func (tc *TelemetryClientWrapper) Flush()

func (*TelemetryClientWrapper) Track

func (tc *TelemetryClientWrapper) Track(evt any)

func (*TelemetryClientWrapper) TrackEvent

func (tc *TelemetryClientWrapper) TrackEvent(name string)

func (*TelemetryClientWrapper) TrackException

func (tc *TelemetryClientWrapper) TrackException(err error)

type TelemetryClientWrapperContext

type TelemetryClientWrapperContext struct {
	CommonProperties map[string]string
}

type TestContext

type TestContext struct {
	*StressContext
	Client *azservicebus.Client
}

type TrackingReceiver

type TrackingReceiver struct {
	// contains filtered or unexported fields
}

TrackingReceiver reports metrics and errors automatically for its methods.

func NewTrackingReceiverForQueue

func NewTrackingReceiverForQueue(tc *TelemetryClientWrapper, client *azservicebus.Client, queueName string, options *azservicebus.ReceiverOptions) (*TrackingReceiver, error)

func NewTrackingReceiverForSubscription

func NewTrackingReceiverForSubscription(tc *TelemetryClientWrapper, client *azservicebus.Client, topicName string, subscriptionName string, options *azservicebus.ReceiverOptions) (*TrackingReceiver, error)

func (*TrackingReceiver) AbandonMessage

func (*TrackingReceiver) Close

func (tr *TrackingReceiver) Close(ctx context.Context) error

func (*TrackingReceiver) CompleteMessage

func (*TrackingReceiver) PeekMessages

func (tr *TrackingReceiver) PeekMessages(ctx context.Context, maxMessageCount int, options *azservicebus.PeekMessagesOptions) ([]*azservicebus.ReceivedMessage, error)

func (*TrackingReceiver) ReceiveMessages

func (tr *TrackingReceiver) ReceiveMessages(ctx context.Context, maxMessages int, options *azservicebus.ReceiveMessagesOptions) ([]*azservicebus.ReceivedMessage, error)

func (*TrackingReceiver) RenewMessageLock

type TrackingSender

type TrackingSender struct {
	// contains filtered or unexported fields
}

TrackingSender reports metrics and errors automatically for its methods.

func NewTrackingSender

func NewTrackingSender(tc *TelemetryClientWrapper, client *azservicebus.Client, queueOrTopic string, options *azservicebus.NewSenderOptions) (*TrackingSender, error)

func (*TrackingSender) Close

func (ts *TrackingSender) Close(ctx context.Context) error

func (*TrackingSender) NewMessageBatch

func (*TrackingSender) SendMessage

func (ts *TrackingSender) SendMessage(ctx context.Context, message *azservicebus.Message, options *azservicebus.SendMessageOptions) error

func (*TrackingSender) SendMessageBatch

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL