From 8ac8c6d4616216f5e99517a832757e6619def9bb Mon Sep 17 00:00:00 2001 From: Richard Park <51494936+richardpark-msft@users.noreply.github.com> Date: Mon, 1 May 2023 11:00:48 -0700 Subject: [PATCH] [azservicebus, azeventhubs] Stress test and logging improvement (#20710) Logging improvements: * Updating the logging to print more tracing information (per-link) in prep for the bigger release coming up. * Trimming out some of the verbose logging, seeing if I can get it a bit more reasonable. Stress tests: * Add a timestamp to the log name we generate and also default to append, not overwrite. * Use 0.5 cores, 0.5GB as our baseline. Some pods use more and I'll tune them more later. --- .../azeventhubs/internal/eh/stress/deploy.ps1 | 2 +- .../eh/stress/templates/stress-test-job.yaml | 6 +- sdk/messaging/azservicebus/client_test.go | 12 +- .../azservicebus/internal/amqpLinks.go | 161 ++++++++++-------- .../azservicebus/internal/amqp_test_utils.go | 9 + .../internal/amqplinks_unit_test.go | 20 +-- sdk/messaging/azservicebus/internal/rpc.go | 3 - .../azservicebus/internal/rpc_test.go | 21 +-- .../azservicebus/internal/stress/.gitignore | 2 +- .../azservicebus/internal/stress/.helmignore | 1 + .../azservicebus/internal/stress/deploy.ps1 | 23 +++ .../internal/stress/scenarios-matrix.yaml | 24 +-- .../stress/templates/stress-test-job.yaml | 4 +- .../internal/stress/tests/finite_peeks.go | 14 +- .../stress/tests/mostly_idle_receiver.go | 2 +- .../internal/test/test_helpers.go | 25 +++ .../azservicebus/internal/utils/logger.go | 37 ++++ .../azservicebus/internal/utils/retrier.go | 10 +- .../internal/utils/retrier_test.go | 10 +- sdk/messaging/azservicebus/receiver.go | 20 +-- .../azservicebus/receiver_simulated_test.go | 6 +- sdk/messaging/azservicebus/receiver_test.go | 18 +- .../azservicebus/receiver_unit_test.go | 8 +- 23 files changed, 272 insertions(+), 166 deletions(-) create mode 100644 sdk/messaging/azservicebus/internal/utils/logger.go diff --git a/sdk/messaging/azeventhubs/internal/eh/stress/deploy.ps1 b/sdk/messaging/azeventhubs/internal/eh/stress/deploy.ps1 index 2abdb8722ec9..79cd01e79509 100644 --- a/sdk/messaging/azeventhubs/internal/eh/stress/deploy.ps1 +++ b/sdk/messaging/azeventhubs/internal/eh/stress/deploy.ps1 @@ -4,7 +4,7 @@ Set-Location $PSScriptRoot function deployUsingLocalAddons() { $azureSDKToolsRoot="" $stressTestAddonsFolder = "$azureSDKToolsRoot/tools/stress-cluster/cluster/kubernetes/stress-test-addons" - $clusterResourceGroup = " set -ex; mkdir -p "$DEBUG_SHARE"; - /app/stress "{{.Stress.testTarget}}" "-rounds" "{{.Stress.rounds}}" "-prefetch" "{{.Stress.prefetch}}" "{{.Stress.verbose}}" "-sleepAfter" "{{.Stress.sleepAfter}}" | tee "${DEBUG_SHARE}/{{ .Stress.Scenario }}.log"; + /app/stress "{{.Stress.testTarget}}" "-rounds" "{{.Stress.rounds}}" "-prefetch" "{{.Stress.prefetch}}" "{{.Stress.verbose}}" "-sleepAfter" "{{.Stress.sleepAfter}}" | tee -a "${DEBUG_SHARE}/{{ .Stress.Scenario }}-`date +%s`.log"; # Pulls the image on pod start, always. We tend to push to the same image and tag over and over again # when iterating, so this is a must. imagePullPolicy: Always @@ -33,8 +33,8 @@ spec: # just uses 'limits' for both. resources: limits: - memory: "1.5Gi" - cpu: "1" + memory: "0.5Gi" + cpu: "0.5" {{- include "stress-test-addons.container-env" . 
| nindent 6 }} {{- end -}} diff --git a/sdk/messaging/azservicebus/client_test.go b/sdk/messaging/azservicebus/client_test.go index fd7e7bf37732..e4eef534558a 100644 --- a/sdk/messaging/azservicebus/client_test.go +++ b/sdk/messaging/azservicebus/client_test.go @@ -442,14 +442,16 @@ func TestClientUnauthorizedCreds(t *testing.T) { }) t.Run("invalid identity creds", func(t *testing.T) { - tenantID := os.Getenv("AZURE_TENANT_ID") - clientID := os.Getenv("AZURE_CLIENT_ID") - endpoint := os.Getenv("SERVICEBUS_ENDPOINT") + identityVars := test.GetIdentityVars(t) - cliCred, err := azidentity.NewClientSecretCredential(tenantID, clientID, "bogus-client-secret", nil) + if identityVars == nil { + return + } + + cliCred, err := azidentity.NewClientSecretCredential(identityVars.TenantID, identityVars.ClientID, "bogus-client-secret", nil) require.NoError(t, err) - client, err := NewClient(endpoint, cliCred, nil) + client, err := NewClient(identityVars.Endpoint, cliCred, nil) require.NoError(t, err) defer test.RequireClose(t, client) diff --git a/sdk/messaging/azservicebus/internal/amqpLinks.go b/sdk/messaging/azservicebus/internal/amqpLinks.go index c187eacaf51b..a7d0590fc669 100644 --- a/sdk/messaging/azservicebus/internal/amqpLinks.go +++ b/sdk/messaging/azservicebus/internal/amqpLinks.go @@ -59,6 +59,13 @@ type AMQPLinks interface { // ClosedPermanently is true if AMQPLinks.Close(ctx, true) has been called. ClosedPermanently() bool + + // Writef logs a message, with a prefix that represents the AMQPLinks instance + // for better traceability. + Writef(evt azlog.Event, format string, args ...any) + + // Prefix is the current logging prefix, usable for logging and continuity. + Prefix() string } // AMQPLinksImpl manages the set of AMQP links (and detritus) typically needed to work @@ -107,7 +114,7 @@ type AMQPLinksImpl struct { ns NamespaceForAMQPLinks - name string + utils.Logger } // CreateLinkFunc creates the links, using the given session. Typically you'll only create either an @@ -132,6 +139,7 @@ func NewAMQPLinks(args NewAMQPLinksArgs) AMQPLinks { closedPermanently: false, getRecoveryKindFunc: args.GetRecoveryKindFunc, ns: args.NS, + Logger: utils.NewLogger(), } return l @@ -145,7 +153,7 @@ func (links *AMQPLinksImpl) ManagementPath() string { // recoverLink will recycle all associated links (mgmt, receiver, sender and session) // and recreate them using the link.linkCreator function. 
func (links *AMQPLinksImpl) recoverLink(ctx context.Context, theirLinkRevision LinkID) error { - log.Writef(exported.EventConn, "Recovering link only") + links.Writef(exported.EventConn, "Recovering link only") links.mu.RLock() closedPermanently := links.closedPermanently @@ -190,40 +198,44 @@ func (links *AMQPLinksImpl) RecoverIfNeeded(ctx context.Context, theirID LinkID, return nil } - log.Writef(exported.EventConn, "[%s] Recovering link for error %s", links.name, origErr.Error()) + links.Writef(exported.EventConn, "Recovering link for error %s", origErr.Error()) rk := links.getRecoveryKindFunc(origErr) if rk == RecoveryKindLink { + oldPrefix := links.Prefix() + if err := links.recoverLink(ctx, theirID); err != nil { - azlog.Writef(exported.EventConn, "[%s] Error when recovering link for recovery: %s", links.name, err) + links.Writef(exported.EventConn, "Error when recovering link for recovery: %s", err) return err } - log.Writef(exported.EventConn, "[%s] Recovered links", links.name) + links.Writef(exported.EventConn, "Recovered links (old: %s)", oldPrefix) return nil } else if rk == RecoveryKindConn { + oldPrefix := links.Prefix() + if err := links.recoverConnection(ctx, theirID); err != nil { - log.Writef(exported.EventConn, "[%s] failed to recreate connection: %s", links.name, err.Error()) + links.Writef(exported.EventConn, "failed to recreate connection: %s", err.Error()) return err } - log.Writef(exported.EventConn, "[%s] Recovered connection and links", links.name) + links.Writef(exported.EventConn, "Recovered connection and links (old: %s)", oldPrefix) return nil } - log.Writef(exported.EventConn, "[%s] Recovered, no action needed", links.name) + links.Writef(exported.EventConn, "Recovered, no action needed") return nil } func (links *AMQPLinksImpl) recoverConnection(ctx context.Context, theirID LinkID) error { - log.Writef(exported.EventConn, "Recovering connection (and links)") + links.Writef(exported.EventConn, "Recovering connection (and links)") links.mu.Lock() defer links.mu.Unlock() if theirID.Link == links.id.Link { - log.Writef(exported.EventConn, "closing old link: current:%v, old:%v", links.id, theirID) + links.Writef(exported.EventConn, "closing old link: current:%v, old:%v", links.id, theirID) // we're clearing out this link because the connection is about to get recreated. So we can // safely ignore any problems here, we're just trying to make sure the state is reset. @@ -233,7 +245,7 @@ func (links *AMQPLinksImpl) recoverConnection(ctx context.Context, theirID LinkI created, err := links.ns.Recover(ctx, uint64(theirID.Conn)) if err != nil { - log.Writef(exported.EventConn, "Recover connection failure: %s", err) + links.Writef(exported.EventConn, "Recover connection failure: %s", err) return err } @@ -243,7 +255,7 @@ func (links *AMQPLinksImpl) recoverConnection(ctx context.Context, theirID LinkI // (if it wasn't the same then we've already recovered and created a new link, // so no recovery would be needed) if created || theirID.Link == links.id.Link { - log.Writef(exported.EventConn, "recreating link: c: %v, current:%v, old:%v", created, links.id, theirID) + links.Writef(exported.EventConn, "recreating link: c: %v, current:%v, old:%v", created, links.id, theirID) // best effort close, the connection these were built on is gone. 
_ = links.closeWithoutLocking(ctx, false) @@ -303,21 +315,21 @@ func (l *AMQPLinksImpl) Get(ctx context.Context) (*LinksWithID, error) { }, nil } -func (l *AMQPLinksImpl) Retry(ctx context.Context, eventName log.Event, operation string, fn RetryWithLinksFn, o exported.RetryOptions) error { +func (links *AMQPLinksImpl) Retry(ctx context.Context, eventName log.Event, operation string, fn RetryWithLinksFn, o exported.RetryOptions) error { var lastID LinkID didQuickRetry := false isFatalErrorFunc := func(err error) bool { - return l.getRecoveryKindFunc(err) == RecoveryKindFatal + return links.getRecoveryKindFunc(err) == RecoveryKindFatal } - return utils.Retry(ctx, eventName, operation, func(ctx context.Context, args *utils.RetryFnArgs) error { - if err := l.RecoverIfNeeded(ctx, lastID, args.LastErr); err != nil { + return utils.Retry(ctx, eventName, links.Prefix()+"("+operation+")", func(ctx context.Context, args *utils.RetryFnArgs) error { + if err := links.RecoverIfNeeded(ctx, lastID, args.LastErr); err != nil { return err } - linksWithVersion, err := l.Get(ctx) + linksWithVersion, err := links.Get(ctx) if err != nil { return err @@ -345,7 +357,7 @@ func (l *AMQPLinksImpl) Retry(ctx context.Context, eventName log.Event, operatio // Whereas normally you'd do (for non-detach errors): // 0th attempt // (actual retries) - log.Writef(exported.EventConn, "(%s) Link was previously detached. Attempting quick reconnect to recover from error: %s", operation, err.Error()) + links.Writef(exported.EventConn, "(%s) Link was previously detached. Attempting quick reconnect to recover from error: %s", operation, err.Error()) didQuickRetry = true args.ResetAttempts() } @@ -387,29 +399,29 @@ func (l *AMQPLinksImpl) Close(ctx context.Context, permanent bool) error { // eats the cost of recovery, instead of doing it immediately. This is useful // if you're trying to exit out of a function quickly but still need to react // to a returned error. -func (l *AMQPLinksImpl) CloseIfNeeded(ctx context.Context, err error) RecoveryKind { - l.mu.Lock() - defer l.mu.Unlock() +func (links *AMQPLinksImpl) CloseIfNeeded(ctx context.Context, err error) RecoveryKind { + links.mu.Lock() + defer links.mu.Unlock() if IsCancelError(err) { - log.Writef(exported.EventConn, "[%s] No close needed for cancellation", l.name) + links.Writef(exported.EventConn, "No close needed for cancellation") return RecoveryKindNone } - rk := l.getRecoveryKindFunc(err) + rk := links.getRecoveryKindFunc(err) switch rk { case RecoveryKindLink: - log.Writef(exported.EventConn, "[%s] Closing links for error %s", l.name, err.Error()) - _ = l.closeWithoutLocking(ctx, false) + links.Writef(exported.EventConn, "Closing links for error %s", err.Error()) + _ = links.closeWithoutLocking(ctx, false) return rk case RecoveryKindFatal: - log.Writef(exported.EventConn, "[%s] Fatal error cleanup", l.name) + links.Writef(exported.EventConn, "Fatal error cleanup") fallthrough case RecoveryKindConn: - log.Writef(exported.EventConn, "[%s] Closing connection AND links for error %s", l.name, err.Error()) - _ = l.closeWithoutLocking(ctx, false) - _ = l.ns.Close(false) + links.Writef(exported.EventConn, "Closing connection AND links for error %s", err.Error()) + _ = links.closeWithoutLocking(ctx, false) + _ = links.ns.Close(false) return rk case RecoveryKindNone: return rk @@ -419,46 +431,46 @@ func (l *AMQPLinksImpl) CloseIfNeeded(ctx context.Context, err error) RecoveryKi } // initWithoutLocking will create a new link, unconditionally. 
-func (l *AMQPLinksImpl) initWithoutLocking(ctx context.Context) error { - tmpCancelAuthRefreshLink, _, err := l.ns.NegotiateClaim(ctx, l.entityPath) +func (links *AMQPLinksImpl) initWithoutLocking(ctx context.Context) error { + tmpCancelAuthRefreshLink, _, err := links.ns.NegotiateClaim(ctx, links.entityPath) if err != nil { - if err := l.closeWithoutLocking(ctx, false); err != nil { - log.Writef(exported.EventConn, "Failure during link cleanup after negotiateClaim: %s", err.Error()) + if err := links.closeWithoutLocking(ctx, false); err != nil { + links.Writef(exported.EventConn, "Failure during link cleanup after negotiateClaim: %s", err.Error()) } return err } - l.cancelAuthRefreshLink = tmpCancelAuthRefreshLink + links.cancelAuthRefreshLink = tmpCancelAuthRefreshLink - tmpCancelAuthRefreshMgmtLink, _, err := l.ns.NegotiateClaim(ctx, l.managementPath) + tmpCancelAuthRefreshMgmtLink, _, err := links.ns.NegotiateClaim(ctx, links.managementPath) if err != nil { - if err := l.closeWithoutLocking(ctx, false); err != nil { - log.Writef(exported.EventConn, "Failure during link cleanup after negotiate claim for mgmt link: %s", err.Error()) + if err := links.closeWithoutLocking(ctx, false); err != nil { + links.Writef(exported.EventConn, "Failure during link cleanup after negotiate claim for mgmt link: %s", err.Error()) } return err } - l.cancelAuthRefreshMgmtLink = tmpCancelAuthRefreshMgmtLink + links.cancelAuthRefreshMgmtLink = tmpCancelAuthRefreshMgmtLink - tmpSession, cr, err := l.ns.NewAMQPSession(ctx) + tmpSession, cr, err := links.ns.NewAMQPSession(ctx) if err != nil { - if err := l.closeWithoutLocking(ctx, false); err != nil { - log.Writef(exported.EventConn, "Failure during link cleanup after creating AMQP session: %s", err.Error()) + if err := links.closeWithoutLocking(ctx, false); err != nil { + links.Writef(exported.EventConn, "Failure during link cleanup after creating AMQP session: %s", err.Error()) } return err } - l.session = tmpSession - l.id.Conn = cr + links.session = tmpSession + links.id.Conn = cr - tmpSender, tmpReceiver, err := l.createLink(ctx, l.session) + tmpSender, tmpReceiver, err := links.createLink(ctx, links.session) if err != nil { - if err := l.closeWithoutLocking(ctx, false); err != nil { - log.Writef(exported.EventConn, "Failure during link cleanup after creating link: %s", err.Error()) + if err := links.closeWithoutLocking(ctx, false); err != nil { + links.Writef(exported.EventConn, "Failure during link cleanup after creating link: %s", err.Error()) } return err } @@ -467,28 +479,29 @@ func (l *AMQPLinksImpl) initWithoutLocking(ctx context.Context) error { panic("Both tmpReceiver and tmpSender are nil") } - l.Sender, l.Receiver = tmpSender, tmpReceiver + links.Sender, links.Receiver = tmpSender, tmpReceiver - tmpRPCLink, err := l.ns.NewRPCLink(ctx, l.ManagementPath()) + tmpRPCLink, err := links.ns.NewRPCLink(ctx, links.ManagementPath()) if err != nil { - if err := l.closeWithoutLocking(ctx, false); err != nil { - log.Writef("Failure during link cleanup after creating mgmt client: %s", err.Error()) + if err := links.closeWithoutLocking(ctx, false); err != nil { + links.Writef(exported.EventConn, "Failure during link cleanup after creating mgmt client: %s", err.Error()) } return err } - l.RPCLink = tmpRPCLink - l.id.Link++ + links.RPCLink = tmpRPCLink + links.id.Link++ - if l.Sender != nil { - linkName := l.Sender.LinkName() - l.name = fmt.Sprintf("c:%d, l:%d, s:name:%s", l.id.Conn, l.id.Link, linkName) - } else if l.Receiver != nil { - l.name = 
fmt.Sprintf("c:%d, l:%d, r:name:%s", l.id.Conn, l.id.Link, l.Receiver.LinkName()) + if links.Sender != nil { + linkName := links.Sender.LinkName() + links.SetPrefix("c:%d, l:%d, s:name:%0.6s", links.id.Conn, links.id.Link, linkName) + } else if links.Receiver != nil { + linkName := links.Receiver.LinkName() + links.SetPrefix("c:%d, l:%d, r:name:%0.6s", links.id.Conn, links.id.Link, linkName) } - log.Writef(exported.EventConn, "[%s] Links created", l.name) + links.Writef(exported.EventConn, "Links created") return nil } @@ -502,39 +515,39 @@ func (l *AMQPLinksImpl) initWithoutLocking(ctx context.Context) error { // Regardless of cancellation or Close() call failures, all local state will be cleaned up. // // NOTE: No locking is done in this function, call `Close` if you require locking. -func (l *AMQPLinksImpl) closeWithoutLocking(ctx context.Context, permanent bool) error { - if l.closedPermanently { +func (links *AMQPLinksImpl) closeWithoutLocking(ctx context.Context, permanent bool) error { + if links.closedPermanently { return nil } - log.Writef(exported.EventConn, "[%s] Links closing (permanent: %v)", l.name, permanent) + links.Writef(exported.EventConn, "Links closing (permanent: %v)", permanent) defer func() { if permanent { - l.closedPermanently = true + links.closedPermanently = true } }() var messages []string - if l.cancelAuthRefreshLink != nil { - l.cancelAuthRefreshLink() - l.cancelAuthRefreshLink = nil + if links.cancelAuthRefreshLink != nil { + links.cancelAuthRefreshLink() + links.cancelAuthRefreshLink = nil } - if l.cancelAuthRefreshMgmtLink != nil { - l.cancelAuthRefreshMgmtLink() - l.cancelAuthRefreshMgmtLink = nil + if links.cancelAuthRefreshMgmtLink != nil { + links.cancelAuthRefreshMgmtLink() + links.cancelAuthRefreshMgmtLink = nil } closeables := []struct { name string instance amqpwrap.Closeable }{ - {"Sender", l.Sender}, - {"Receiver", l.Receiver}, - {"Session", l.session}, - {"RPC", l.RPCLink}, + {"Sender", links.Sender}, + {"Receiver", links.Receiver}, + {"Session", links.session}, + {"RPC", links.RPCLink}, } wasCancelled := false @@ -546,6 +559,8 @@ func (l *AMQPLinksImpl) closeWithoutLocking(ctx context.Context, permanent bool) continue } + links.Writef(exported.EventConn, "Closing %s", c.name) + if err := c.instance.Close(ctx); err != nil { if IsCancelError(err) { wasCancelled = true @@ -555,7 +570,7 @@ func (l *AMQPLinksImpl) closeWithoutLocking(ctx context.Context, permanent bool) } } - l.Sender, l.Receiver, l.session, l.RPCLink = nil, nil, nil, nil + links.Sender, links.Receiver, links.session, links.RPCLink = nil, nil, nil, nil if wasCancelled { return ctx.Err() diff --git a/sdk/messaging/azservicebus/internal/amqp_test_utils.go b/sdk/messaging/azservicebus/internal/amqp_test_utils.go index 5b4680769ae2..ef336e3f2650 100644 --- a/sdk/messaging/azservicebus/internal/amqp_test_utils.go +++ b/sdk/messaging/azservicebus/internal/amqp_test_utils.go @@ -8,6 +8,7 @@ import ( "fmt" "github.com/Azure/azure-sdk-for-go/sdk/internal/log" + azlog "github.com/Azure/azure-sdk-for-go/sdk/internal/log" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/exported" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/go-amqp" @@ -207,6 +208,14 @@ func (l *FakeAMQPLinks) Retry(ctx context.Context, eventName log.Event, operatio return fn(ctx, lwr, &utils.RetryFnArgs{}) } +func (l *FakeAMQPLinks) Writef(evt azlog.Event, format string, args ...any) { + 
log.Writef(evt, "[prefix] "+format, args...) +} + +func (l *FakeAMQPLinks) Prefix() string { + return "prefix" +} + func (l *FakeAMQPLinks) Close(ctx context.Context, permanently bool) error { if permanently { l.permanently = true diff --git a/sdk/messaging/azservicebus/internal/amqplinks_unit_test.go b/sdk/messaging/azservicebus/internal/amqplinks_unit_test.go index 5fdd1f3b894a..70e06658f287 100644 --- a/sdk/messaging/azservicebus/internal/amqplinks_unit_test.go +++ b/sdk/messaging/azservicebus/internal/amqplinks_unit_test.go @@ -88,7 +88,7 @@ func TestAMQPLinksRetriesUnit(t *testing.T) { logMessages := endLogging() if testData.ExpectReset { - require.Contains(t, logMessages, fmt.Sprintf("[azsb.Conn] (OverallOperation) Link was previously detached. Attempting quick reconnect to recover from error: %s", err.Error())) + require.Contains(t, logMessages, fmt.Sprintf("[azsb.Conn] [c:100, l:1, s:name:sender] (OverallOperation) Link was previously detached. Attempting quick reconnect to recover from error: %s", err.Error())) } else { for _, msg := range logMessages { require.NotContains(t, msg, "Link was previously detached") @@ -126,11 +126,11 @@ func TestAMQPLinks_Logging(t *testing.T) { actualLogs := endCapture() expectedLogs := []string{ - "[azsb.Conn] [] Recovering link for error amqp: link closed", + "[azsb.Conn] Recovering link for error amqp: link closed", "[azsb.Conn] Recovering link only", - "[azsb.Conn] [] Links closing (permanent: false)", - "[azsb.Conn] [c:100, l:1, r:name:fakelink] Links created", - "[azsb.Conn] [c:100, l:1, r:name:fakelink] Recovered links"} + "[azsb.Conn] Links closing (permanent: false)", + "[azsb.Conn] [c:100, l:1, r:name:fakeli] Links created", + "[azsb.Conn] [c:100, l:1, r:name:fakeli] Recovered links (old: )"} require.Equal(t, expectedLogs, actualLogs) }) @@ -161,14 +161,14 @@ func TestAMQPLinks_Logging(t *testing.T) { actualLogs := endCapture() expectedLogs := []string{ - "[azsb.Conn] [] Recovering link for error amqp: connection closed", + "[azsb.Conn] Recovering link for error amqp: connection closed", "[azsb.Conn] Recovering connection (and links)", "[azsb.Conn] closing old link: current:{0 0}, old:{0 0}", - "[azsb.Conn] [] Links closing (permanent: false)", + "[azsb.Conn] Links closing (permanent: false)", "[azsb.Conn] recreating link: c: true, current:{0 0}, old:{0 0}", - "[azsb.Conn] [] Links closing (permanent: false)", - "[azsb.Conn] [c:101, l:1, r:name:fakelink] Links created", - "[azsb.Conn] [c:101, l:1, r:name:fakelink] Recovered connection and links"} + "[azsb.Conn] Links closing (permanent: false)", + "[azsb.Conn] [c:101, l:1, r:name:fakeli] Links created", + "[azsb.Conn] [c:101, l:1, r:name:fakeli] Recovered connection and links (old: )"} require.Equal(t, expectedLogs, actualLogs) }) diff --git a/sdk/messaging/azservicebus/internal/rpc.go b/sdk/messaging/azservicebus/internal/rpc.go index a69c37c4a248..2dbb180b54a1 100644 --- a/sdk/messaging/azservicebus/internal/rpc.go +++ b/sdk/messaging/azservicebus/internal/rpc.go @@ -148,13 +148,10 @@ func NewRPCLink(ctx context.Context, args RPCLinkArgs) (amqpwrap.RPCLink, error) return link, nil } -const responseRouterShutdownMessage = "Response router has shut down" - // responseRouter is responsible for taking any messages received on the 'response' // link and forwarding it to the proper channel. The channel is being select'd by the // original `RPC` call. 
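The rpc.go and rpc_test.go hunks below lean on a plain close-channel handshake: responseRouter already defers close(l.responseRouterClosed), so the test can block on that channel rather than sleeping and scanning captured logs for a shutdown message (which is why the shutdown log line and its constant go away). A generic sketch of the pattern, with hypothetical names:

package main

import "fmt"

// worker mimics the shape of rpcLink's response router: done is closed
// exactly once when run() returns, whatever the exit reason.
type worker struct {
	done chan struct{}
}

func (w *worker) run() {
	defer close(w.done) // deterministic shutdown signal; no log line required
	// ... pump responses until the underlying link is closed ...
}

func main() {
	w := &worker{done: make(chan struct{})}
	go w.run()

	<-w.done // callers and tests wait here instead of time.Sleep + log scanning
	fmt.Println("router shut down")
}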
 func (l *rpcLink) responseRouter() {
-	defer azlog.Writef(l.logEvent, responseRouterShutdownMessage)
 	defer close(l.responseRouterClosed)
 
 	for {
diff --git a/sdk/messaging/azservicebus/internal/rpc_test.go b/sdk/messaging/azservicebus/internal/rpc_test.go
index 4bf5a2865278..6ca286c85d9a 100644
--- a/sdk/messaging/azservicebus/internal/rpc_test.go
+++ b/sdk/messaging/azservicebus/internal/rpc_test.go
@@ -8,7 +8,6 @@ import (
 	"errors"
 	"net"
 	"testing"
-	"time"
 
 	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap"
 	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/go-amqp"
@@ -36,10 +35,6 @@ func TestRPCLinkNonErrorRequiresRecovery(t *testing.T) {
 	defer func() { require.NoError(t, link.Close(context.Background())) }()
 
-	messagesCh := make(chan string, 10000)
-	endCapture := test.CaptureLogsForTestWithChannel(messagesCh, false)
-	defer endCapture()
-
 	responses := []*rpcTestResp{
 		// this error requires recovery (in this case, connection but there's no
 		// distinction between types in RPCLink)
@@ -53,23 +48,11 @@
 	})
 	require.Nil(t, resp)
 
-	// (give the response router a teeny bit to shut down)
-	time.Sleep(500 * time.Millisecond)
+	linkImpl := link.(*rpcLink)
+	<-linkImpl.responseRouterClosed
 
 	var netOpError net.Error
 	require.ErrorAs(t, err, &netOpError)
-
-LogLoop:
-	for {
-		select {
-		case msg := <-messagesCh:
-			if msg == "[rpctesting] "+responseRouterShutdownMessage {
-				break LogLoop
-			}
-		default:
-			require.Fail(t, "RPC router never shut down")
-		}
-	}
 }
 
 func TestRPCLinkNonErrorRequiresNoRecovery(t *testing.T) {
diff --git a/sdk/messaging/azservicebus/internal/stress/.gitignore b/sdk/messaging/azservicebus/internal/stress/.gitignore
index fcef718cad3c..099d255e217d 100644
--- a/sdk/messaging/azservicebus/internal/stress/.gitignore
+++ b/sdk/messaging/azservicebus/internal/stress/.gitignore
@@ -3,4 +3,4 @@ stress.exe
 logs
 generatedValues.yaml
 deploy-test-*.ps1
-
+*.log
diff --git a/sdk/messaging/azservicebus/internal/stress/.helmignore b/sdk/messaging/azservicebus/internal/stress/.helmignore
index 43294410b6b0..d7c1ed9789e0 100644
--- a/sdk/messaging/azservicebus/internal/stress/.helmignore
+++ b/sdk/messaging/azservicebus/internal/stress/.helmignore
@@ -3,3 +3,4 @@ stress.exe
 .env
 Dockerfile
 *.go
+*.log
diff --git a/sdk/messaging/azservicebus/internal/stress/deploy.ps1 b/sdk/messaging/azservicebus/internal/stress/deploy.ps1
index 8369fb669503..e1026202066f 100644
--- a/sdk/messaging/azservicebus/internal/stress/deploy.ps1
+++ b/sdk/messaging/azservicebus/internal/stress/deploy.ps1
@@ -1,2 +1,25 @@
 Set-Location $PSScriptRoot
+
+function deployUsingLocalAddons() {
+    $azureSDKToolsRoot=""
+    $stressTestAddonsFolder = "$azureSDKToolsRoot/tools/stress-cluster/cluster/kubernetes/stress-test-addons"
+    $clusterResourceGroup = ""
+    $clusterSubscription = ""
+    $helmEnv = "pg2"
+
+    if (-not (Get-ChildItem $stressTestAddonsFolder)) {
+        Write-Host "Can't find the new stress test addons folder at $stressTestAddonsFolder"
+        return
+    }
+
+    pwsh "$azureSDKToolsRoot/eng/common/scripts/stress-testing/deploy-stress-tests.ps1" `
+        -LocalAddonsPath "$stressTestAddonsFolder" `
+        -clusterGroup "$clusterResourceGroup" `
+        -subscription "$clusterSubscription" `
+        -Environment $helmEnv `
+        -Login `
+        -PushImages
+}
+
+#deployUsingLocalAddons
 pwsh "../../../../../eng/common/scripts/stress-testing/deploy-stress-tests.ps1" -Login -PushImages @args
diff --git 
a/sdk/messaging/azservicebus/internal/stress/scenarios-matrix.yaml b/sdk/messaging/azservicebus/internal/stress/scenarios-matrix.yaml index bfca4a7af5f7..4c095d870e01 100644 --- a/sdk/messaging/azservicebus/internal/stress/scenarios-matrix.yaml +++ b/sdk/messaging/azservicebus/internal/stress/scenarios-matrix.yaml @@ -12,46 +12,46 @@ matrix: scenarios: constantDetach: testTarget: constantDetach - memory: "1.5Gi" + memory: "0.5Gi" constantDetachmentSender: testTarget: constantDetachmentSender - memory: "1.5Gi" + memory: "0.5Gi" emptySessions: testTarget: emptySessions memory: "1.0Gi" finitePeeks: testTarget: finitePeeks - memory: "1.5Gi" + memory: "0.5Gi" finiteSendAndReceive: testTarget: finiteSendAndReceive - memory: "1.5Gi" + memory: "0.5Gi" finiteSessions: testTarget: finiteSessions memory: "4Gi" idleFastReconnect: testTarget: idleFastReconnect - memory: "1.5Gi" + memory: "0.5Gi" infiniteSendAndReceive: testTarget: infiniteSendAndReceive - memory: "1.5Gi" + memory: "0.5Gi" infiniteSendAndReceiveWithChaos: testTarget: infiniteSendAndReceive # this value is injected as a label value in templates/deploy-job.yaml # this'll activate our standard chaos policy, which is at the bottom of that file. chaos: "true" - memory: "1.5Gi" + memory: "0.5Gi" longRunningRenewLock: testTarget: longRunningRenewLock - memory: "1.5Gi" + memory: "0.5Gi" mostlyIdleReceiver: testTarget: mostlyIdleReceiver - memory: "1.5Gi" + memory: "0.5Gi" rapidOpenClose: testTarget: rapidOpenClose - memory: "1.5Gi" + memory: "0.5Gi" receiveCancellation: testTarget: receiveCancellation - memory: "1.5Gi" + memory: "0.5Gi" sendAndReceiveDrain: testTarget: sendAndReceiveDrain - memory: "1.5Gi" + memory: "0.5Gi" diff --git a/sdk/messaging/azservicebus/internal/stress/templates/stress-test-job.yaml b/sdk/messaging/azservicebus/internal/stress/templates/stress-test-job.yaml index a2d1c74d746d..b183e545ffd8 100644 --- a/sdk/messaging/azservicebus/internal/stress/templates/stress-test-job.yaml +++ b/sdk/messaging/azservicebus/internal/stress/templates/stress-test-job.yaml @@ -17,7 +17,7 @@ spec: - > set -ex; mkdir -p "$DEBUG_SHARE"; - /app/stress tests "{{ .Stress.testTarget }}" | tee "${DEBUG_SHARE}/{{ .Stress.Scenario }}.log"; + /app/stress tests "{{ .Stress.testTarget }}" | tee -a "${DEBUG_SHARE}/{{ .Stress.Scenario }}-`date +%s`.log"; # Pulls the image on pod start, always. We tend to push to the same image and tag over and over again # when iterating, so this is a must. imagePullPolicy: Always @@ -29,7 +29,7 @@ spec: resources: limits: memory: {{.Stress.memory }} - cpu: "1" + cpu: "0.5" {{- include "stress-test-addons.container-env" . 
| nindent 6 }} {{- end -}} diff --git a/sdk/messaging/azservicebus/internal/stress/tests/finite_peeks.go b/sdk/messaging/azservicebus/internal/stress/tests/finite_peeks.go index ca0dc64ae33c..ba8735bf536c 100644 --- a/sdk/messaging/azservicebus/internal/stress/tests/finite_peeks.go +++ b/sdk/messaging/azservicebus/internal/stress/tests/finite_peeks.go @@ -28,11 +28,13 @@ func FinitePeeks(remainingArgs []string) { sender, err := client.NewSender(queueName, nil) sc.PanicOnError("failed to create sender", err) + log.Printf("Sending a single message") err = sender.SendMessage(sc.Context, &azservicebus.Message{ Body: []byte("peekable message"), }, nil) sc.PanicOnError("failed to send message", err) + log.Printf("Closing sender") _ = sender.Close(sc.Context) receiver, err := client.NewReceiverForQueue(queueName, nil) @@ -51,9 +53,13 @@ func FinitePeeks(remainingArgs []string) { sc.PanicOnError("failed to abandon message", receiver.AbandonMessage(sc.Context, tmp[0], nil)) - for i := 0; i < 10000; i++ { - log.Printf("Sleeping for 1 second before iteration %d", i) - time.Sleep(time.Second) + const maxPeeks = 10000 + const peekSleep = 500 * time.Millisecond + + log.Printf("Now peeking %d times, every %dms", maxPeeks, peekSleep/time.Millisecond) + + for i := 1; i <= maxPeeks; i++ { + time.Sleep(peekSleep) seqNum := int64(0) @@ -65,4 +71,6 @@ func FinitePeeks(remainingArgs []string) { receiverStats.AddReceived(int32(1)) } + + log.Printf("Done, peeked %d times", maxPeeks) } diff --git a/sdk/messaging/azservicebus/internal/stress/tests/mostly_idle_receiver.go b/sdk/messaging/azservicebus/internal/stress/tests/mostly_idle_receiver.go index d707f1edd747..1d9b40c23c1d 100644 --- a/sdk/messaging/azservicebus/internal/stress/tests/mostly_idle_receiver.go +++ b/sdk/messaging/azservicebus/internal/stress/tests/mostly_idle_receiver.go @@ -83,7 +83,7 @@ func MostlyIdleReceiver(remainingArgs []string) { messages, err := receiver.ReceiveMessages(sc.Context, 1, nil) sc.PanicOnError(fmt.Sprintf("failed receiving messages for duration %s", duration), err) - log.Printf("Received messages %#v", messages) + log.Printf("Received %d messages", len(messages)) stats.AddReceived(int32(len(messages))) for _, msg := range messages { diff --git a/sdk/messaging/azservicebus/internal/test/test_helpers.go b/sdk/messaging/azservicebus/internal/test/test_helpers.go index 7a1fb62812cc..49636ec457ed 100644 --- a/sdk/messaging/azservicebus/internal/test/test_helpers.go +++ b/sdk/messaging/azservicebus/internal/test/test_helpers.go @@ -67,6 +67,31 @@ func GetConnectionStringListenOnly(t *testing.T) string { return getEnvOrSkipTest(t, "SERVICEBUS_CONNECTION_STRING_LISTEN_ONLY") } +func GetIdentityVars(t *testing.T) *struct { + TenantID string + ClientID string + Secret string + Endpoint string +} { + runningLiveTest := GetConnectionString(t) != "" + + if !runningLiveTest { + return nil + } + + return &struct { + TenantID string + ClientID string + Secret string + Endpoint string + }{ + TenantID: getEnvOrSkipTest(t, "AZURE_TENANT_ID"), + ClientID: getEnvOrSkipTest(t, "AZURE_CLIENT_ID"), + Endpoint: getEnvOrSkipTest(t, "SERVICEBUS_ENDPOINT"), + Secret: getEnvOrSkipTest(t, "AZURE_CLIENT_SECRET"), + } +} + func getEnvOrSkipTest(t *testing.T, name string) string { cs := os.Getenv(name) diff --git a/sdk/messaging/azservicebus/internal/utils/logger.go b/sdk/messaging/azservicebus/internal/utils/logger.go new file mode 100644 index 000000000000..7298427e0251 --- /dev/null +++ b/sdk/messaging/azservicebus/internal/utils/logger.go @@ -0,0 +1,37 @@ 
+// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package utils + +import ( + "fmt" + "sync/atomic" + + azlog "github.com/Azure/azure-sdk-for-go/sdk/internal/log" +) + +type Logger struct { + prefix *atomic.Value +} + +func NewLogger() Logger { + value := &atomic.Value{} + value.Store("") + + return Logger{ + prefix: value, + } +} + +func (l *Logger) SetPrefix(format string, args ...any) { + l.prefix.Store(fmt.Sprintf("["+format+"] ", args...)) +} + +func (l *Logger) Prefix() string { + return l.prefix.Load().(string) +} + +func (l *Logger) Writef(evt azlog.Event, format string, args ...any) { + prefix := l.prefix.Load().(string) + azlog.Writef(evt, prefix+format, args...) +} diff --git a/sdk/messaging/azservicebus/internal/utils/retrier.go b/sdk/messaging/azservicebus/internal/utils/retrier.go index 53d03a68202e..5fe3434d7842 100644 --- a/sdk/messaging/azservicebus/internal/utils/retrier.go +++ b/sdk/messaging/azservicebus/internal/utils/retrier.go @@ -50,7 +50,7 @@ func Retry(ctx context.Context, eventName log.Event, operation string, fn func(c for i := int32(0); i <= ro.MaxRetries; i++ { if i > 0 { sleep := calcDelay(ro, i) - log.Writef(eventName, "(%s) Retry attempt %d sleeping for %s", operation, i, sleep) + log.Writef(eventName, "%s Retry attempt %d sleeping for %s", operation, i, sleep) select { case <-ctx.Done(): @@ -66,7 +66,7 @@ func Retry(ctx context.Context, eventName log.Event, operation string, fn func(c err = fn(ctx, &args) if args.resetAttempts { - log.Writef(eventName, "(%s) Resetting retry attempts", operation) + log.Writef(eventName, "%s Resetting retry attempts", operation) // it looks weird, but we're doing -1 here because the post-increment // will set it back to 0, which is what we want - go back to the 0th @@ -79,13 +79,13 @@ func Retry(ctx context.Context, eventName log.Event, operation string, fn func(c if err != nil { if isFatalFn(err) { if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - log.Writef(eventName, "(%s) Retry attempt %d was cancelled, stopping: %s", operation, i, err.Error()) + log.Writef(eventName, "%s Retry attempt %d was cancelled, stopping: %s", operation, i, err.Error()) } else { - log.Writef(eventName, "(%s) Retry attempt %d returned non-retryable error: %s", operation, i, err.Error()) + log.Writef(eventName, "%s Retry attempt %d returned non-retryable error: %s", operation, i, err.Error()) } return err } else { - log.Writef(eventName, "(%s) Retry attempt %d returned retryable error: %s", operation, i, err.Error()) + log.Writef(eventName, "%s Retry attempt %d returned retryable error: %s", operation, i, err.Error()) } continue diff --git a/sdk/messaging/azservicebus/internal/utils/retrier_test.go b/sdk/messaging/azservicebus/internal/utils/retrier_test.go index c316fcdf23a1..4c7282a5e897 100644 --- a/sdk/messaging/azservicebus/internal/utils/retrier_test.go +++ b/sdk/messaging/azservicebus/internal/utils/retrier_test.go @@ -294,7 +294,7 @@ func TestRetryLogging(t *testing.T) { t.Run("normal error", func(t *testing.T) { logsFn := test.CaptureLogsForTest(false) - err := Retry(context.Background(), testLogEvent, "my_operation", func(ctx context.Context, args *RetryFnArgs) error { + err := Retry(context.Background(), testLogEvent, "(my_operation)", func(ctx context.Context, args *RetryFnArgs) error { azlog.Writef("TestFunc", "Attempt %d, within test func, returning error hello", args.I) return errors.New("hello") }, func(err error) bool { @@ -325,7 +325,7 @@ func 
TestRetryLogging(t *testing.T) { t.Run("normal error2", func(t *testing.T) { test.EnableStdoutLogging(t) - err := Retry(context.Background(), testLogEvent, "my_operation", func(ctx context.Context, args *RetryFnArgs) error { + err := Retry(context.Background(), testLogEvent, "(my_operation)", func(ctx context.Context, args *RetryFnArgs) error { azlog.Writef("TestFunc", "Attempt %d, within test func, returning error hello", args.I) return errors.New("hello") }, func(err error) bool { @@ -339,7 +339,7 @@ func TestRetryLogging(t *testing.T) { t.Run("cancellation error", func(t *testing.T) { logsFn := test.CaptureLogsForTest(false) - err := Retry(context.Background(), testLogEvent, "test_operation", func(ctx context.Context, args *RetryFnArgs) error { + err := Retry(context.Background(), testLogEvent, "(test_operation)", func(ctx context.Context, args *RetryFnArgs) error { azlog.Writef("TestFunc", "Attempt %d, within test func", args.I) return context.Canceled @@ -359,7 +359,7 @@ func TestRetryLogging(t *testing.T) { t.Run("custom fatal error", func(t *testing.T) { logsFn := test.CaptureLogsForTest(false) - err := Retry(context.Background(), testLogEvent, "test_operation", func(ctx context.Context, args *RetryFnArgs) error { + err := Retry(context.Background(), testLogEvent, "(test_operation)", func(ctx context.Context, args *RetryFnArgs) error { azlog.Writef("TestFunc", "Attempt %d, within test func", args.I) return errors.New("custom fatal error") @@ -380,7 +380,7 @@ func TestRetryLogging(t *testing.T) { logsFn := test.CaptureLogsForTest(false) reset := false - err := Retry(context.Background(), testLogEvent, "test_operation", func(ctx context.Context, args *RetryFnArgs) error { + err := Retry(context.Background(), testLogEvent, "(test_operation)", func(ctx context.Context, args *RetryFnArgs) error { azlog.Writef("TestFunc", "Attempt %d, within test func", args.I) if !reset { diff --git a/sdk/messaging/azservicebus/receiver.go b/sdk/messaging/azservicebus/receiver.go index 59b5b48db698..240fd4b2924b 100644 --- a/sdk/messaging/azservicebus/receiver.go +++ b/sdk/messaging/azservicebus/receiver.go @@ -11,7 +11,6 @@ import ( "sync/atomic" "time" - "github.com/Azure/azure-sdk-for-go/sdk/internal/log" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/exported" @@ -321,7 +320,7 @@ func (r *Receiver) RenewMessageLock(ctx context.Context, msg *ReceivedMessage, o func (r *Receiver) Close(ctx context.Context) error { cancelReleaser := r.cancelReleaser.Swap(emptyCancelFn).(func() string) releaserID := cancelReleaser() - log.Writef(EventReceiver, "Stopped message releaser with ID '%s'", releaserID) + r.amqpLinks.Writef(EventReceiver, "Stopped message releaser with ID '%s'", releaserID) r.cleanupOnClose() return r.amqpLinks.Close(ctx, true) @@ -388,21 +387,20 @@ func (r *Receiver) receiveMessagesImpl(ctx context.Context, maxMessages int, opt // might have exited before all credits were used up. 
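As the comment above notes, and as the hunk continuing below implements, the receiver only tops up AMQP link credit by the shortfall between what the caller asked for and what is still outstanding on the link; the updated simulated test exercises both branches (5000 outstanding → nothing issued, 4999 outstanding → issue 1). A standalone sketch of that arithmetic, with illustrative numbers:

package main

import "fmt"

// topUpCredits mirrors the credit logic: issue only the shortfall, never more,
// so total outstanding credit never exceeds maxMessages.
func topUpCredits(maxMessages, currentCredits int64) int64 {
	creditsToIssue := maxMessages - currentCredits

	if creditsToIssue > 0 {
		fmt.Printf("Issuing %d credits, have %d\n", creditsToIssue, currentCredits)
		return creditsToIssue
	}

	fmt.Printf("Have %d credits, no new credits needed\n", currentCredits)
	return 0
}

func main() {
	topUpCredits(5000, 0)    // fresh link: issue the full 5000
	topUpCredits(5000, 5000) // all credits still outstanding: issue nothing
	topUpCredits(5000, 4999) // one message consumed a credit: top up by 1
}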
currentReceiverCredits := int64(linksWithID.Receiver.Credits()) creditsToIssue := int64(maxMessages) - currentReceiverCredits - log.Writef(EventReceiver, "Asking for %d credits", maxMessages) if creditsToIssue > 0 { - log.Writef(EventReceiver, "Only need to issue %d additional credits", creditsToIssue) + r.amqpLinks.Writef(EventReceiver, "Issuing %d credits, have %d", creditsToIssue, currentReceiverCredits) if err := linksWithID.Receiver.IssueCredit(uint32(creditsToIssue)); err != nil { return nil, err } } else { - log.Writef(EventReceiver, "No additional credits needed, still have %d credits active", currentReceiverCredits) + r.amqpLinks.Writef(EventReceiver, "Have %d credits, no new credits needed", currentReceiverCredits) } result := r.fetchMessages(ctx, linksWithID.Receiver, maxMessages, r.defaultTimeAfterFirstMsg) - log.Writef(EventReceiver, "Received %d/%d messages", len(result.Messages), maxMessages) + r.amqpLinks.Writef(EventReceiver, "Received %d/%d messages", len(result.Messages), maxMessages) // this'll only close anything if the error indicates that the link/connection is bad. // it's safe to call with cancellation errors. @@ -417,7 +415,7 @@ func (r *Receiver) receiveMessagesImpl(ctx context.Context, maxMessages int, opt releaserFunc := r.newReleaserFunc(linksWithID.Receiver) go releaserFunc() } else { - log.Writef(EventReceiver, "Failure when receiving messages: %s", result.Error) + r.amqpLinks.Writef(EventReceiver, "Failure when receiving messages: %s", result.Error) } // If the user does get some messages we ignore 'error' and return only the messages. @@ -616,8 +614,6 @@ func (r *Receiver) newReleaserFunc(receiver amqpwrap.AMQPReceiver) func() { return func() { defer close(done) - log.Writef(EventReceiver, "[%s] Message releaser starting...", receiver.LinkName()) - for { // we might not have all the messages we need here. msg, err := receiver.Receive(ctx, nil) @@ -631,10 +627,12 @@ func (r *Receiver) newReleaserFunc(receiver amqpwrap.AMQPReceiver) func() { } if internal.IsCancelError(err) { - log.Writef(exported.EventReceiver, "[%s] Message releaser pausing. Released %d messages", receiver.LinkName(), released) + if released > 0 { + r.amqpLinks.Writef(exported.EventReceiver, "Message releaser pausing. Released %d messages", released) + } break } else if internal.GetRecoveryKind(err) != internal.RecoveryKindNone { - log.Writef(exported.EventReceiver, "[%s] Message releaser stopping because of link failure. Released %d messages. Will start again after next receive: %s", receiver.LinkName(), released, err) + r.amqpLinks.Writef(exported.EventReceiver, "Message releaser stopping because of link failure. Released %d messages. 
Will start again after next receive: %s", released, err) break } } diff --git a/sdk/messaging/azservicebus/receiver_simulated_test.go b/sdk/messaging/azservicebus/receiver_simulated_test.go index f61cd95878b5..18f7210aacda 100644 --- a/sdk/messaging/azservicebus/receiver_simulated_test.go +++ b/sdk/messaging/azservicebus/receiver_simulated_test.go @@ -632,7 +632,9 @@ func TestReceiver_CreditsDontExceedMax(t *testing.T) { messages, err = receiver.ReceiveMessages(baseReceiveCtx, 5000, nil) require.NoError(t, err) require.Equal(t, []string{"hello world"}, getSortedBodies(messages)) - require.Contains(t, logsFn(), "[azsb.Receiver] No additional credits needed, still have 5000 credits active") + logs := logsFn() + + require.Contains(t, logs, "[azsb.Receiver] [c:1, l:1, r:name:c:001|] Have 5000 credits, no new credits needed") ctx, cancel = context.WithTimeout(baseReceiveCtx, time.Second) defer cancel() @@ -644,7 +646,7 @@ func TestReceiver_CreditsDontExceedMax(t *testing.T) { messages, err = receiver.ReceiveMessages(ctx, 5000, nil) require.ErrorIs(t, err, context.DeadlineExceeded) require.Empty(t, messages) - require.Contains(t, logsFn(), "[azsb.Receiver] Only need to issue 1 additional credits") + require.Contains(t, logsFn(), "[azsb.Receiver] [c:1, l:1, r:name:c:001|] Issuing 1 credits, have 4999") require.Equal(t, 1, len(md.Events.GetOpenConns())) require.Equal(t, 3+3, len(md.Events.GetOpenLinks()), "Sender and Receiver each own 3 links apiece ($mgmt, actual link)") diff --git a/sdk/messaging/azservicebus/receiver_test.go b/sdk/messaging/azservicebus/receiver_test.go index 640a6abe651e..0d691447c9cc 100644 --- a/sdk/messaging/azservicebus/receiver_test.go +++ b/sdk/messaging/azservicebus/receiver_test.go @@ -6,7 +6,7 @@ package azservicebus import ( "context" "fmt" - "os" + "regexp" "sort" "strings" "testing" @@ -569,8 +569,11 @@ func TestReceiver_RenewMessageLock(t *testing.T) { logMessages := endCaptureFn() failedOnFirstTry := false + + re := regexp.MustCompile(`^\[azsb.Receiver\] \[c:1, l:1, r:name:[^\]]+\] \(renewMessageLock\) Retry attempt 0 returned non-retryable error`) + for _, msg := range logMessages { - if strings.HasPrefix(msg, "[azsb.Receiver] (renewMessageLock) Retry attempt 0 returned non-retryable error") { + if re.MatchString(msg) { failedOnFirstTry = true } } @@ -866,14 +869,15 @@ func TestReceiverUnauthorizedCreds(t *testing.T) { }) t.Run("invalid identity creds", func(t *testing.T) { - tenantID := os.Getenv("AZURE_TENANT_ID") - clientID := os.Getenv("AZURE_CLIENT_ID") - endpoint := os.Getenv("SERVICEBUS_ENDPOINT") + identityVars := test.GetIdentityVars(t) + if identityVars == nil { + return + } - cliCred, err := azidentity.NewClientSecretCredential(tenantID, clientID, "bogus-client-secret", nil) + cliCred, err := azidentity.NewClientSecretCredential(identityVars.TenantID, identityVars.ClientID, "bogus-client-secret", nil) require.NoError(t, err) - client, err := NewClient(endpoint, cliCred, nil) + client, err := NewClient(identityVars.Endpoint, cliCred, nil) require.NoError(t, err) defer test.RequireClose(t, client) diff --git a/sdk/messaging/azservicebus/receiver_unit_test.go b/sdk/messaging/azservicebus/receiver_unit_test.go index d06f86f5ddf4..50bf141d70f0 100644 --- a/sdk/messaging/azservicebus/receiver_unit_test.go +++ b/sdk/messaging/azservicebus/receiver_unit_test.go @@ -183,9 +183,11 @@ func TestReceiver_releaserFunc(t *testing.T) { <-receiverClosed t.Logf("Receiver has closed") + logs := logsFn() + require.Contains(t, - logsFn(), - fmt.Sprintf("[azsb.Receiver] 
[fakelink] Message releaser pausing. Released %d messages", successfulReleases), + logs, + fmt.Sprintf("[azsb.Receiver] [prefix] Message releaser pausing. Released %d messages", successfulReleases), ) } @@ -224,7 +226,7 @@ func TestReceiver_releaserFunc_errorOnFirstMessage(t *testing.T) { require.Contains(t, logsFn(), - fmt.Sprintf("[azsb.Receiver] [fakelink] Message releaser stopping because of link failure. Released 0 messages. Will start again after next receive: %s", &amqp.LinkError{})) + fmt.Sprintf("[azsb.Receiver] Message releaser stopping because of link failure. Released 0 messages. Will start again after next receive: %s", &amqp.LinkError{})) } func TestReceiver_releaserFunc_receiveAndDeleteIsNoop(t *testing.T) {
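For reference, the new test.GetIdentityVars helper shown earlier centralizes the identity-related environment variables consumed by the updated client and receiver tests. A sketch of a live test built on it (the test name is hypothetical, and the usual azservicebus test imports — azidentity, require, internal/test — are assumed):

func TestLiveClientWithIdentity(t *testing.T) {
	identityVars := test.GetIdentityVars(t)

	if identityVars == nil {
		// No live-test configuration present; mirror the early return used in
		// the updated TestClientUnauthorizedCreds / TestReceiverUnauthorizedCreds.
		return
	}

	cred, err := azidentity.NewClientSecretCredential(identityVars.TenantID, identityVars.ClientID, identityVars.Secret, nil)
	require.NoError(t, err)

	client, err := NewClient(identityVars.Endpoint, cred, nil)
	require.NoError(t, err)
	defer test.RequireClose(t, client)
}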