
Commit 7899ebe

[azservicebus] Fixing a memory leak when doing cross receiver settlement. (#22368)

Fixing a memory leak when doing cross receiver settlement.

go-amqp holds onto some tracking data that won't get cleared if we don't settle through the original Receiver. The fix here, combined with go-amqp's change to route settlement back to the original receiver, should seal that up.

A benchmark has been added that also doubles as a stress test.

Fixes #22318
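For context, here is a minimal sketch of the cross-receiver settlement pattern this fix targets, written against the public azservicebus API; the connection string and queue name are caller-supplied placeholders:

```go
package example

import (
	"context"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

// crossReceiverSettle is illustrative only: messages are received on one
// Receiver but settled on another while both are still alive. Before this fix,
// that settlement took the backup (management link) path, and the per-message
// tracking state held by the first receiver's go-amqp link was never released.
func crossReceiverSettle(ctx context.Context, connStr, queueName string) error {
	client, err := azservicebus.NewClientFromConnectionString(connStr, nil)
	if err != nil {
		return err
	}
	defer client.Close(ctx)

	receiving, err := client.NewReceiverForQueue(queueName, nil)
	if err != nil {
		return err
	}

	settling, err := client.NewReceiverForQueue(queueName, nil)
	if err != nil {
		return err
	}

	messages, err := receiving.ReceiveMessages(ctx, 10, nil)
	if err != nil {
		return err
	}

	for _, m := range messages {
		// Settle on a different receiver than the one that delivered the message.
		if err := settling.CompleteMessage(ctx, m, nil); err != nil {
			return err
		}
	}

	return nil
}
```

With this change and the corresponding go-amqp update, settlement is routed back to the receiver that originally received the message, so go-amqp can release its tracking state.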
richardpark-msft committed Feb 14, 2024
1 parent e62d2c1 commit 7899ebe
Showing 21 changed files with 470 additions and 160 deletions.
3 changes: 0 additions & 3 deletions sdk/messaging/azservicebus/amqp_message.go
@@ -57,8 +57,6 @@ type AMQPAnnotatedMessage struct {
// Properties corresponds to the properties section of an AMQP message.
Properties *AMQPAnnotatedMessageProperties

linkName string

// inner is the AMQP message we originally received, which contains some hidden
// data that's needed to settle with go-amqp. We strip out most of the underlying
// data so it's fairly minimal.
@@ -273,7 +271,6 @@ func newAMQPAnnotatedMessage(goAMQPMessage *amqp.Message, receivingLinkName stri
DeliveryTag: goAMQPMessage.DeliveryTag,
Footer: footer,
Header: header,
linkName: receivingLinkName,
Properties: properties,
inner: goAMQPMessage,
}
11 changes: 8 additions & 3 deletions sdk/messaging/azservicebus/client_test.go
@@ -76,13 +76,12 @@ func TestNewClientWithAzureIdentity(t *testing.T) {

receiver, err := client.NewReceiverForQueue(queue, nil)
require.NoError(t, err)
actualSettler, _ := receiver.settler.(*messageSettler)
actualSettler.onlyDoBackupSettlement = true // this'll also exercise the management link

messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
require.NoError(t, err)

require.EqualValues(t, []string{"hello - authenticating with a TokenCredential"}, getSortedBodies(messages))
forceManagementSettlement(t, messages)

for _, m := range messages {
err = receiver.CompleteMessage(context.TODO(), m, nil)
@@ -550,7 +549,7 @@ func TestNewClientUnitTests(t *testing.T) {
MaxRetryDelay: 12 * time.Hour,
}, receiver.retryOptions)

actualSettler := receiver.settler.(*messageSettler)
actualSettler := receiver.settler

require.Equal(t, RetryOptions{
MaxRetries: 101,
@@ -580,3 +579,9 @@ func assertRPCNotFound(t *testing.T, err error) {
require.ErrorAs(t, err, &rpcError)
require.Equal(t, http.StatusNotFound, rpcError.RPCCode())
}

func forceManagementSettlement(t *testing.T, messages []*ReceivedMessage) {
for _, m := range messages {
m.settleOnMgmtLink = true
}
}
4 changes: 2 additions & 2 deletions sdk/messaging/azservicebus/internal/mgmt.go
@@ -374,10 +374,10 @@ func SetSessionState(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName str
return nil
}

// SendDisposition allows you settle a message using the management link, rather than via your
// SettleOnMgmtLink allows you to settle a message using the management link, rather than via your
// *amqp.Receiver. Use this if the receiver has been closed/lost or if the message isn't associated
// with a link (ex: deferred messages).
func SendDisposition(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName string, lockToken *amqp.UUID, state Disposition, propertiesToModify map[string]any) error {
func SettleOnMgmtLink(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName string, lockToken *amqp.UUID, state Disposition, propertiesToModify map[string]any) error {
if lockToken == nil {
err := errors.New("lock token on the message is not set, thus cannot send disposition")
return err
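For reference, the most common public path that ends up in SettleOnMgmtLink is settling a deferred message. A hedged sketch using only the public azservicebus API (the client and queue name are caller-supplied; assumes the azservicebus import):

```go
// deferThenComplete is illustrative only: a deferred message isn't associated
// with a receiving AMQP link, so completing it is settled via the management
// link (the code path above).
func deferThenComplete(ctx context.Context, client *azservicebus.Client, queueName string) error {
	receiver, err := client.NewReceiverForQueue(queueName, nil)
	if err != nil {
		return err
	}
	defer receiver.Close(ctx)

	messages, err := receiver.ReceiveMessages(ctx, 1, nil)
	if err != nil {
		return err
	}

	var seqNums []int64

	for _, m := range messages {
		if err := receiver.DeferMessage(ctx, m, nil); err != nil {
			return err
		}
		seqNums = append(seqNums, *m.SequenceNumber)
	}

	deferred, err := receiver.ReceiveDeferredMessages(ctx, seqNums, nil)
	if err != nil {
		return err
	}

	for _, m := range deferred {
		if err := receiver.CompleteMessage(ctx, m, nil); err != nil {
			return err
		}
	}

	return nil
}
```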
@@ -68,6 +68,7 @@ const (
DispositionTypeAccept DispositionType = "accept"
DispositionTypeReject DispositionType = "reject"
DispositionTypeRelease DispositionType = "release"
DispositionTypeModify DispositionType = "modify" // used for abandoning a message
)

type DispositionEvent struct {
6 changes: 3 additions & 3 deletions sdk/messaging/azservicebus/internal/stress/Chart.lock
@@ -1,6 +1,6 @@
dependencies:
- name: stress-test-addons
repository: https://stresstestcharts.blob.core.windows.net/helm/
version: 0.3.0
digest: sha256:3e21a7fdf5d6b37e871a6dd9f755888166fbb24802aa517f51d1d9223b47656e
generated: "2023-09-26T11:43:56.706771668-07:00"
version: 0.3.1
digest: sha256:28e374f8db5c46447b2a1491d4361ceb126536c425cbe54be49017120fe7b27d
generated: "2024-02-05T17:21:31.510400504-08:00"
3 changes: 3 additions & 0 deletions sdk/messaging/azservicebus/internal/stress/Dockerfile
@@ -6,6 +6,8 @@ ENV CGO_ENABLED=0
ADD . /src
WORKDIR /src/internal/stress
RUN go build -o stress .
WORKDIR /src/internal/stress/tests/benchmarks
RUN go test -c

# The first container is just for building the artifacts, and contains all the source, etc...
# That container instance only ever lives on your local machine (or the build machine).
@@ -15,5 +17,6 @@ RUN go build -o stress .
FROM mcr.microsoft.com/cbl-mariner/base/core:2.0
WORKDIR /app
COPY --from=build /src/internal/stress/stress /app/stress
COPY --from=build /src/internal/stress/tests/benchmarks/benchmarks.test /app/benchmarks.test
RUN yum update -y && yum install -y ca-certificates
ENTRYPOINT ["/bin/bash"]
10 changes: 7 additions & 3 deletions sdk/messaging/azservicebus/internal/stress/scenarios-matrix.yaml
@@ -1,5 +1,5 @@
displayNames:
# this makes it so these don't show up in the scenario names,
# this makes it so these don't show up in the scenario names,
# since they're just clutter.
"1.5Gi": ""
"4Gi": ""
@@ -23,7 +23,7 @@ matrix:
testTarget: finitePeeks
memory: "0.5Gi"
finiteSendAndReceive:
testTarget: finiteSendAndReceive
testTarget: finiteSendAndReceive
memory: "0.5Gi"
finiteSessions:
testTarget: finiteSessions
@@ -52,10 +52,14 @@ matrix:
memory: "0.5Gi"
rapidOpenClose:
testTarget: rapidOpenClose
memory: "0.5Gi"
memory: "0.5Gi"
receiveCancellation:
testTarget: receiveCancellation
memory: "0.5Gi"
sendAndReceiveDrain:
testTarget: sendAndReceiveDrain
memory: "0.5Gi"
benchmarkBackupSettlementLeak:
benchmark: true
testTarget: "BenchmarkBackupSettlementLeakWhileOldReceiverStillAlive"
memory: "1.0Gi"
@@ -34,22 +34,25 @@ func (sw *senderWrapper) NewMessageBatch(ctx context.Context, options *azservice
return sw.inner.NewMessageBatch(ctx, options)
}

func NewStreamingMessageBatch(ctx context.Context, sender internalBatchSender) (*StreamingMessageBatch, error) {
func NewStreamingMessageBatch(ctx context.Context, sender internalBatchSender, expectedTotal int) (*StreamingMessageBatch, error) {
batch, err := sender.NewMessageBatch(ctx, nil)

if err != nil {
return nil, err
}

return &StreamingMessageBatch{
sender: sender,
currentBatch: batch,
sender: sender,
currentBatch: batch,
expectedTotal: expectedTotal,
}, nil
}

type StreamingMessageBatch struct {
sender internalBatchSender
currentBatch internalBatch
sender internalBatchSender
currentBatch internalBatch
expectedTotal int
total int
}

// Add appends to the current batch. If it's full, it'll send it and allocate a new one.
@@ -65,11 +68,13 @@ func (sb *StreamingMessageBatch) Add(ctx context.Context, msg *azservicebus.Mess
return err
}

log.Printf("Sending message batch (%d messages)", sb.currentBatch.NumMessages())
log.Printf("Sending message batch with %d messages. %d/%d messages sent so far.", sb.currentBatch.NumMessages(), sb.total, sb.expectedTotal)
if err := sb.sender.SendMessageBatch(ctx, sb.currentBatch); err != nil {
return err
}

sb.total += int(sb.currentBatch.NumMessages())

// throttle a teeny bit.
time.Sleep(time.Second)

2 changes: 1 addition & 1 deletion sdk/messaging/azservicebus/internal/stress/shared/utils.go
@@ -30,7 +30,7 @@ func MustGenerateMessages(sc *StressContext, sender *TrackingSender, messageLimi

log.Printf("Sending %d messages", messageLimit)

streamingBatch, err := NewStreamingMessageBatch(ctx, &senderWrapper{inner: sender})
streamingBatch, err := NewStreamingMessageBatch(ctx, &senderWrapper{inner: sender}, messageLimit)
sc.PanicOnError("failed to create streaming batch", err)

extraBytes := make([]byte, numExtraBytes)
@@ -17,7 +17,11 @@ spec:
- >
set -ex;
mkdir -p "$DEBUG_SHARE";
{{ if ne .Stress.benchmark true }}
/app/stress tests "{{ .Stress.testTarget }}" 2>&1 | tee -a "${DEBUG_SHARE}/{{ .Stress.Scenario }}-`date +%s`.log";
{{ else }}
/app/benchmarks.test -test.timeout 24h -test.bench {{ .Stress.testTarget }} 2>&1 | tee -a "${DEBUG_SHARE}/{{ .Stress.Scenario }}-`date +%s`.log";
{{ end }}
# Pulls the image on pod start, always. We tend to push to the same image and tag over and over again
# when iterating, so this is a must.
imagePullPolicy: Always
@@ -0,0 +1,115 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package benchmarks

import (
"context"
"fmt"
"log"
"math"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/stress/shared"
)

// BackupSettlementLeak checks that, when we use backup settlement, we're not
// leaking memory. This came up in a couple of issues for a customer:
// - https://github.com/Azure/azure-sdk-for-go/issues/22318
// - https://github.com/Azure/azure-sdk-for-go/issues/22157
//
// The use case for backup settlement is when the original link we've received
// on has gone offline, so we need to settle via the $management link instead. However,
// the underlying go-amqp link is tracking several bits of state for the message which
// will never get cleared. Since that receiver was dead it was going to get garbage
// collected anyway, so this was a non-issue.
//
// This customer's use case was slightly different - they were completing on a separate
// receiver even when the original receiving link was still alive. This means the leaked
// tracking state just accumulates and never gets garbage collected, since there's no
// trigger to know when to clear out the tracking state for the message.
func BenchmarkBackupSettlementLeakWhileOldReceiverStillAlive(b *testing.B) {
b.StopTimer()

sc := shared.MustCreateStressContext("BenchmarkBackupSettlementLeak", nil)
defer sc.End()

sent := int64(100000)

client, queueName := mustInitBenchmarkBackupSettlementLeak(sc, b, int(sent))

oldReceiver, err := client.NewReceiverForQueue(queueName, nil)
sc.NoError(err)

newReceiver, err := client.NewReceiverForQueue(queueName, nil)
sc.NoError(err)

b.StartTimer()

var completed int64
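// Each message is abandoned on every delivery, so the service redelivers it until it
// hits the queue's max delivery count - maxDeliveryCount deliveries per sent message.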
expected := maxDeliveryCount * int64(sent)

for completed < expected {
// receive from the old receiver and...
receiveCtx, cancel := context.WithTimeout(context.Background(), time.Minute)

messages, err := oldReceiver.ReceiveMessages(receiveCtx, int(math.Min(float64(expected-completed), 5000)), &azservicebus.ReceiveMessagesOptions{
// not super scientific - mostly just want to get slightly fuller batches
TimeAfterFirstMessage: 30 * time.Second,
})
cancel()
sc.NoError(err)

wg := sync.WaitGroup{}
wg.Add(len(messages))

// ...settle it on another receiver
for _, m := range messages {
m := m

go func() {
defer wg.Done()

// abandon it so we see the message multiple times (until it's dead-lettered after maxDeliveryCount tries)
err := newReceiver.AbandonMessage(context.Background(), m, nil)
sc.NoError(err)
atomic.AddInt64(&completed, 1)
}()
}

wg.Wait()

b.Logf("Settled %d/%d", completed, expected)
}

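// Force a GC and keep the process alive briefly so a heap profile captured now reflects
// retained memory (the leak, if any) rather than garbage that just hasn't been collected yet.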
log.Printf("Forcing garbage collection\n")
runtime.GC()
log.Printf("Done with collection\n")
time.Sleep(1 * time.Minute)
}

func mustInitBenchmarkBackupSettlementLeak(sc *shared.StressContext, b *testing.B, numToSend int) (*azservicebus.Client, string) {
queueName := fmt.Sprintf("backup-settlement-tester-%s", sc.Nano)
shared.MustCreateAutoDeletingQueue(sc, queueName, &admin.QueueProperties{
MaxDeliveryCount: to.Ptr[int32](maxDeliveryCount),
})

client, err := azservicebus.NewClientFromConnectionString(sc.ConnectionString, nil)
sc.PanicOnError("failed to create client", err)

sender, err := shared.NewTrackingSender(sc.TC, client, queueName, nil)
sc.PanicOnError("create a sender", err)

shared.MustGenerateMessages(sc, sender, numToSend, 0)

return client, queueName
}

const maxDeliveryCount = 20
@@ -0,0 +1,27 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package benchmarks

import (
"log"
"os"
"testing"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/stress/shared"
)

func TestMain(m *testing.M) {
if os.Getenv("ENV_FILE") == "" {
os.Setenv("ENV_FILE", "../../../../.env")
}

err := shared.LoadEnvironment()

if err != nil {
log.Printf("Failed to load env file, benchmarks will not run: %s", err)
return
}

os.Exit(m.Run())
}
@@ -0,0 +1,9 @@
## Generating

`go test -memprofile mem.out -bench .`

Rename `mem.out` after each run you want to compare (for example, `mem.out.before_fix` and `mem.out.after_fix`).

## Visualizing

Run:
* `sudo apt install graphviz`
* `go tool pprof -http localhost:8000 -base mem.out.before_fix mem.out.after_fix`
13 changes: 7 additions & 6 deletions sdk/messaging/azservicebus/message.go
@@ -9,6 +9,7 @@ import (

"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/internal/log"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap"
"github.com/Azure/go-amqp"
)

@@ -126,10 +127,9 @@ type ReceivedMessage struct {
// and Header fields.
RawAMQPMessage *AMQPAnnotatedMessage

// deferred indicates we received it using ReceiveDeferredMessages. These messages
// will still go through the normal Receiver.Settle functions but internally will
// always be settled with the management link.
deferred bool
linkName string // used when we call into the management link. It counts towards the link not being considered idle.

settleOnMgmtLink bool // used for cases like when a message is received that was deferred. It can only be settled on the management link.
}

// Message creates a shallow copy of the fields from this message to an instance of
@@ -310,10 +310,11 @@ func (m *Message) toAMQPMessage() *amqp.Message {
// newReceivedMessage creates a received message from an AMQP message.
// NOTE: this converter assumes that the Body of this message will be the first
// serialized byte array in the Data section of the message.
func newReceivedMessage(amqpMsg *amqp.Message, receivingLinkName string) *ReceivedMessage {
func newReceivedMessage(amqpMsg *amqp.Message, receiver amqpwrap.AMQPReceiver) *ReceivedMessage {
msg := &ReceivedMessage{
RawAMQPMessage: newAMQPAnnotatedMessage(amqpMsg, receivingLinkName),
RawAMQPMessage: newAMQPAnnotatedMessage(amqpMsg, receiver.LinkName()),
State: MessageStateActive,
linkName: receiver.LinkName(),
}

if len(msg.RawAMQPMessage.Body.Data) == 1 {
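To tie the new fields together, here is an illustrative sketch (not the SDK's actual settler code) of the routing decision they enable, written as if it lived in the azservicebus package; the settleOnLink and settleOnMgmt callbacks are hypothetical stand-ins for link settlement and management-link settlement:

```go
// settle is illustrative only: prefer the link of the receiver that originally
// received the message, and fall back to the management link only when the
// message requires it (settleOnMgmtLink) or that link is no longer available.
func settle(
	ctx context.Context,
	m *ReceivedMessage,
	originalLinkAlive bool,
	settleOnLink func(context.Context, *ReceivedMessage) error,
	settleOnMgmt func(context.Context, string, *ReceivedMessage) error,
) error {
	if m.settleOnMgmtLink || !originalLinkAlive {
		// Deferred messages, or the receiving link has been closed/lost.
		return settleOnMgmt(ctx, m.linkName, m)
	}

	// Settling through the original receiver lets go-amqp clear its
	// per-message tracking state - the leak this commit addresses.
	return settleOnLink(ctx, m)
}
```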