Skip to content

Commit

Permalink
[Messaging] Update message size on link creation (#44328)
Browse files Browse the repository at this point in the history
The focus of these changes is to invalidate the
cached maximum message size when the link is created,
and populate with an updated value from the new link.

This is necessary for large message support, where
the maximum message size for entities can be
reconfigred adjusted on the fly.  Because the client
had cached the value, it would not be aware of the
change and would enforce the wrong size for batch
creation.
  • Loading branch information
jsquire authored and benbp committed Jun 5, 2024
1 parent b5fc321 commit 0e9da53
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 26 deletions.
2 changes: 2 additions & 0 deletions sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

### Other Changes

- The client will now refresh the maximum message size each time a new AMQP link is opened; this is necessary for large message support, where the maximum message size for entities can be reconfigureed adjusted on the fly. Because the client had cached the value, it would not be aware of the change and would enforce the wrong size for batch creation.

## 5.12.0-beta.1 (2024-05-17)

### Features Added
Expand Down
29 changes: 16 additions & 13 deletions sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -592,19 +592,22 @@ protected virtual async Task<SendingAmqpLink> CreateLinkAndEnsureProducerStateAs
{
link = await ConnectionScope.OpenProducerLinkAsync(partitionId, ActiveFeatures, partitionOptions, operationTimeout, timeout, producerIdentifier, cancellationToken).ConfigureAwait(false);

if (!MaximumMessageSize.HasValue)
{
// This delay is necessary to prevent the link from causing issues for subsequent
// operations after creating a batch. Without it, operations using the link consistently
// timeout. The length of the delay does not appear significant, just the act of introducing
// an asynchronous delay.
//
// For consistency the value used by the legacy Event Hubs client has been brought forward and
// used here.

await Task.Delay(15, cancellationToken).ConfigureAwait(false);
MaximumMessageSize = (long)link.Settings.MaxMessageSize;
}
// Update the known maximum message size each time a link is opened, as the
// configuration can be changed on-the-fly and may not match the previously cached value.
//
// This delay is necessary to prevent the link from causing issues for subsequent
// operations after creating a batch. Without it, operations using the link consistently
// timeout. The length of the delay does not appear significant, just the act of introducing
// an asynchronous delay.
//
// For consistency the value used by the legacy Event Hubs client has been brought forward and
// used here.

await Task.Delay(15, cancellationToken).ConfigureAwait(false);
MaximumMessageSize = (long)link.Settings.MaxMessageSize;

// Unlike the maximum message size, the publishing properties will not change arbitrarily, so
// there is no need to update them each time a link is opened.

if (InitializedPartitionProperties == null)
{
Expand Down
2 changes: 2 additions & 0 deletions sdk/servicebus/Azure.Messaging.ServiceBus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

### Other Changes

- The client will now refresh the maximum message size each time a new AMQP link is opened; this is necessary for large message support, where the maximum message size for entities can be reconfigureed adjusted on the fly. Because the client had cached the value, it would not be aware of the change and would enforce the wrong size for batch creation.

- Updated the `Microsoft.Azure.Amqp` dependency to 2.6.7, which contains a fix for decoding messages with a null format code as the body.

## 7.18.0-beta.1 (2024-05-08)
Expand Down
27 changes: 14 additions & 13 deletions sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -602,19 +602,20 @@ protected virtual async Task<SendingAmqpLink> CreateLinkAndEnsureSenderStateAsyn
timeout: timeout,
cancellationToken: cancellationToken).ConfigureAwait(false);

if (!MaxMessageSize.HasValue)
{
// This delay is necessary to prevent the link from causing issues for subsequent
// operations after creating a batch. Without it, operations using the link consistently
// timeout. The length of the delay does not appear significant, just the act of introducing
// an asynchronous delay.
//
// For consistency the value used by the legacy Service Bus client has been brought forward and
// used here.

await Task.Delay(15, cancellationToken).ConfigureAwait(false);
MaxMessageSize = (long)link.Settings.MaxMessageSize;
}
// Update the known maximum message size each time a link is opened, as the
// configuration can be changed on-the-fly and may not match the previously cached value.
//
// This delay is necessary to prevent the link from causing issues for subsequent
// operations after creating a batch. Without it, operations using the link consistently
// timeout. The length of the delay does not appear significant, just the act of introducing
// an asynchronous delay.
//
// For consistency the value used by the legacy Service Bus client has been brought forward and
// used here.

await Task.Delay(15, cancellationToken).ConfigureAwait(false);
MaxMessageSize = (long)link.Settings.MaxMessageSize;

ServiceBusEventSource.Log.CreateSendLinkComplete(Identifier);
link.Closed += OnSenderLinkClosed;
return link;
Expand Down

0 comments on commit 0e9da53

Please sign in to comment.