Skip to content

Commit 7899ebe

Browse files
[azservicebus] Fixing a memory leak when doing cross receiver settlement. (#22368)
Fixing a memory leak when doing cross receiver settlement. go-amqp holds onto some tracking data that won't get cleared if we don't try to settle through the original Receiver. Our fix in here, combined with go-amqp's changes to route to the original receiver, should seal that up. Benchmark added that also doubles as a stress test. Fixes #22318
1 parent e62d2c1 commit 7899ebe

21 files changed

+470
-160
lines changed

sdk/messaging/azservicebus/amqp_message.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,6 @@ type AMQPAnnotatedMessage struct {
5757
// Properties corresponds to the properties section of an AMQP message.
5858
Properties *AMQPAnnotatedMessageProperties
5959

60-
linkName string
61-
6260
// inner is the AMQP message we originally received, which contains some hidden
6361
// data that's needed to settle with go-amqp. We strip out most of the underlying
6462
// data so it's fairly minimal.
@@ -273,7 +271,6 @@ func newAMQPAnnotatedMessage(goAMQPMessage *amqp.Message, receivingLinkName stri
273271
DeliveryTag: goAMQPMessage.DeliveryTag,
274272
Footer: footer,
275273
Header: header,
276-
linkName: receivingLinkName,
277274
Properties: properties,
278275
inner: goAMQPMessage,
279276
}

sdk/messaging/azservicebus/client_test.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,12 @@ func TestNewClientWithAzureIdentity(t *testing.T) {
7676

7777
receiver, err := client.NewReceiverForQueue(queue, nil)
7878
require.NoError(t, err)
79-
actualSettler, _ := receiver.settler.(*messageSettler)
80-
actualSettler.onlyDoBackupSettlement = true // this'll also exercise the management link
8179

8280
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
8381
require.NoError(t, err)
8482

8583
require.EqualValues(t, []string{"hello - authenticating with a TokenCredential"}, getSortedBodies(messages))
84+
forceManagementSettlement(t, messages)
8685

8786
for _, m := range messages {
8887
err = receiver.CompleteMessage(context.TODO(), m, nil)
@@ -550,7 +549,7 @@ func TestNewClientUnitTests(t *testing.T) {
550549
MaxRetryDelay: 12 * time.Hour,
551550
}, receiver.retryOptions)
552551

553-
actualSettler := receiver.settler.(*messageSettler)
552+
actualSettler := receiver.settler
554553

555554
require.Equal(t, RetryOptions{
556555
MaxRetries: 101,
@@ -580,3 +579,9 @@ func assertRPCNotFound(t *testing.T, err error) {
580579
require.ErrorAs(t, err, &rpcError)
581580
require.Equal(t, http.StatusNotFound, rpcError.RPCCode())
582581
}
582+
583+
func forceManagementSettlement(t *testing.T, messages []*ReceivedMessage) {
584+
for _, m := range messages {
585+
m.settleOnMgmtLink = true
586+
}
587+
}

sdk/messaging/azservicebus/internal/mgmt.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -374,10 +374,10 @@ func SetSessionState(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName str
374374
return nil
375375
}
376376

377-
// SendDisposition allows you settle a message using the management link, rather than via your
377+
// SettleOnMgmtLink allows you settle a message using the management link, rather than via your
378378
// *amqp.Receiver. Use this if the receiver has been closed/lost or if the message isn't associated
379379
// with a link (ex: deferred messages).
380-
func SendDisposition(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName string, lockToken *amqp.UUID, state Disposition, propertiesToModify map[string]any) error {
380+
func SettleOnMgmtLink(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName string, lockToken *amqp.UUID, state Disposition, propertiesToModify map[string]any) error {
381381
if lockToken == nil {
382382
err := errors.New("lock token on the message is not set, thus cannot send disposition")
383383
return err

sdk/messaging/azservicebus/internal/mock/emulation/events.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ const (
6868
DispositionTypeAccept DispositionType = "accept"
6969
DispositionTypeReject DispositionType = "reject"
7070
DispositionTypeRelease DispositionType = "release"
71+
DispositionTypeModify DispositionType = "modify" // used for abandoning a message
7172
)
7273

7374
type DispositionEvent struct {
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
dependencies:
22
- name: stress-test-addons
33
repository: https://stresstestcharts.blob.core.windows.net/helm/
4-
version: 0.3.0
5-
digest: sha256:3e21a7fdf5d6b37e871a6dd9f755888166fbb24802aa517f51d1d9223b47656e
6-
generated: "2023-09-26T11:43:56.706771668-07:00"
4+
version: 0.3.1
5+
digest: sha256:28e374f8db5c46447b2a1491d4361ceb126536c425cbe54be49017120fe7b27d
6+
generated: "2024-02-05T17:21:31.510400504-08:00"

sdk/messaging/azservicebus/internal/stress/Dockerfile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ ENV CGO_ENABLED=0
66
ADD . /src
77
WORKDIR /src/internal/stress
88
RUN go build -o stress .
9+
WORKDIR /src/internal/stress/tests/benchmarks
10+
RUN go test -c
911

1012
# The first container is just for building the artifacts, and contains all the source, etc...
1113
# That container instance only ever lives on your local machine (or the build machine).
@@ -15,5 +17,6 @@ RUN go build -o stress .
1517
FROM mcr.microsoft.com/cbl-mariner/base/core:2.0
1618
WORKDIR /app
1719
COPY --from=build /src/internal/stress/stress /app/stress
20+
COPY --from=build /src/internal/stress/tests/benchmarks/benchmarks.test /app/benchmarks.test
1821
RUN yum update -y && yum install -y ca-certificates
1922
ENTRYPOINT ["/bin/bash"]

sdk/messaging/azservicebus/internal/stress/scenarios-matrix.yaml

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
displayNames:
2-
# this makes it so these don't show up in the scenario names,
2+
# this makes it so these don't show up in the scenario names,
33
# since they're just clutter.
44
1.5Gi": ""
55
4Gi": ""
@@ -23,7 +23,7 @@ matrix:
2323
testTarget: finitePeeks
2424
memory: "0.5Gi"
2525
finiteSendAndReceive:
26-
testTarget: finiteSendAndReceive
26+
testTarget: finiteSendAndReceive
2727
memory: "0.5Gi"
2828
finiteSessions:
2929
testTarget: finiteSessions
@@ -52,10 +52,14 @@ matrix:
5252
memory: "0.5Gi"
5353
rapidOpenClose:
5454
testTarget: rapidOpenClose
55-
memory: "0.5Gi"
55+
memory: "0.5Gi"
5656
receiveCancellation:
5757
testTarget: receiveCancellation
5858
memory: "0.5Gi"
5959
sendAndReceiveDrain:
6060
testTarget: sendAndReceiveDrain
6161
memory: "0.5Gi"
62+
benchmarkBackupSettlementLeak:
63+
benchmark: true
64+
testTarget: "BenchmarkBackupSettlementLeakWhileOldReceiverStillAlive"
65+
memory: "1.0Gi"

sdk/messaging/azservicebus/internal/stress/shared/streaming_batch.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,22 +34,25 @@ func (sw *senderWrapper) NewMessageBatch(ctx context.Context, options *azservice
3434
return sw.inner.NewMessageBatch(ctx, options)
3535
}
3636

37-
func NewStreamingMessageBatch(ctx context.Context, sender internalBatchSender) (*StreamingMessageBatch, error) {
37+
func NewStreamingMessageBatch(ctx context.Context, sender internalBatchSender, expectedTotal int) (*StreamingMessageBatch, error) {
3838
batch, err := sender.NewMessageBatch(ctx, nil)
3939

4040
if err != nil {
4141
return nil, err
4242
}
4343

4444
return &StreamingMessageBatch{
45-
sender: sender,
46-
currentBatch: batch,
45+
sender: sender,
46+
currentBatch: batch,
47+
expectedTotal: expectedTotal,
4748
}, nil
4849
}
4950

5051
type StreamingMessageBatch struct {
51-
sender internalBatchSender
52-
currentBatch internalBatch
52+
sender internalBatchSender
53+
currentBatch internalBatch
54+
expectedTotal int
55+
total int
5356
}
5457

5558
// Add appends to the current batch. If it's full it'll send it, allocate a new one.
@@ -65,11 +68,13 @@ func (sb *StreamingMessageBatch) Add(ctx context.Context, msg *azservicebus.Mess
6568
return err
6669
}
6770

68-
log.Printf("Sending message batch (%d messages)", sb.currentBatch.NumMessages())
71+
log.Printf("Sending message batch with %d messages. %d/%d messages sent so far.", sb.currentBatch.NumMessages(), sb.total, sb.expectedTotal)
6972
if err := sb.sender.SendMessageBatch(ctx, sb.currentBatch); err != nil {
7073
return err
7174
}
7275

76+
sb.total += int(sb.currentBatch.NumMessages())
77+
7378
// throttle a teeny bit.
7479
time.Sleep(time.Second)
7580

sdk/messaging/azservicebus/internal/stress/shared/utils.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func MustGenerateMessages(sc *StressContext, sender *TrackingSender, messageLimi
3030

3131
log.Printf("Sending %d messages", messageLimit)
3232

33-
streamingBatch, err := NewStreamingMessageBatch(ctx, &senderWrapper{inner: sender})
33+
streamingBatch, err := NewStreamingMessageBatch(ctx, &senderWrapper{inner: sender}, messageLimit)
3434
sc.PanicOnError("failed to create streaming batch", err)
3535

3636
extraBytes := make([]byte, numExtraBytes)

sdk/messaging/azservicebus/internal/stress/templates/stress-test-job.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,11 @@ spec:
1717
- >
1818
set -ex;
1919
mkdir -p "$DEBUG_SHARE";
20+
{{ if ne .Stress.benchmark true }}
2021
/app/stress tests "{{ .Stress.testTarget }}" 2>&1 | tee -a "${DEBUG_SHARE}/{{ .Stress.Scenario }}-`date +%s`.log";
22+
{{ else }}
23+
/app/benchmarks.test -test.timeout 24h -test.bench {{ .Stress.testTarget }} 2>&1 | tee -a "${DEBUG_SHARE}/{{ .Stress.Scenario }}-`date +%s`.log";
24+
{{ end }}
2125
# Pulls the image on pod start, always. We tend to push to the same image and tag over and over again
2226
# when iterating, so this is a must.
2327
imagePullPolicy: Always

0 commit comments

Comments
 (0)