AmqpMessageConverter cleanup and dispose of message #19822
Conversation
Thank you for your contribution @danielmarbach! We will review the pull request and get back to you soon.
@@ -29,14 +28,14 @@ internal static class AmqpMessageConverter
     /// <summary>The size, in bytes, to use as a buffer for stream operations.</summary>
     private const int StreamBufferSizeInBytes = 512;

-    public static AmqpMessage BatchSBMessagesAsAmqpMessage(IEnumerable<SBMessage> source, bool forceBatch = false)
+    public static AmqpMessage BatchSBMessagesAsAmqpMessage(IEnumerable<ServiceBusMessage> source, bool forceBatch = false)
Thank you for killing the alias. I find it much more readable without it. 😄
@@ -92,8 +91,12 @@ private static AmqpMessage BuildAmqpBatchFromMessage(IEnumerable<SBMessage> source
    batchEnvelope = AmqpMessage.Create(batchMessages.Select(message =>
    {
        message.Batchable = true;
        using var messageStream = message.ToStream();
        return new Data { Value = ReadStreamToArraySegment(messageStream) };
        // once the data is packaged the temporary message is no longer required
I'm not sure that I'm fully comfortable with the flow of control here; BuildAmqpBatchFromMessages assumes that it now has ownership of the batchMessages that were passed to it and should manage their life span. It is safe in this instance because we have a single call site and we're doing a short-lived transform when calling this method, but it definitely isn't a clear contract to callers.

I'm not opposed, as we definitely should have been disposing them, and I can't think of a clean way to do this inline within the Select of the call site. The alternative would likely be refactoring the call site to use a temporary set and then enumerating it again after the call here just to walk through the set and dispose all of the messages.

If we decide to stick with this approach, can we please add a <remarks> to BuildAmqpBatchFromMessages making it clear that ownership of the batchMessages is transferred and their life span will be managed internally?
Yeah, it bothered me as well, but I wanted to be mindful not to make too many changes. What about this then?
private static AmqpMessage BuildAmqpBatchFromMessage(IEnumerable<ServiceBusMessage> source, bool forceBatch)
{
AmqpMessage firstAmqpMessage = null;
ServiceBusMessage firstMessage = null;
List<AmqpMessage> amqpMessagesForBatching = new List<AmqpMessage>();
foreach (var sbMessage in source)
{
if (firstAmqpMessage == null)
{
firstAmqpMessage = SBMessageToAmqpMessage(sbMessage);
firstMessage = sbMessage;
amqpMessagesForBatching.Add(firstAmqpMessage);
}
else
{
amqpMessagesForBatching.Add(SBMessageToAmqpMessage(sbMessage));
}
}
var amqpMessageBatch = BuildAmqpBatchFromMessages(amqpMessagesForBatching, firstMessage, forceBatch);
amqpMessagesForBatching.ForEach(m => m.Dispose());
return amqpMessageBatch;
}
Yeah, that's a reasonable alternative; it's a trade-off between the extra allocation and enumeration versus having the ownership transfer. I'm on the fence, honestly.
Well, I can add more branches to it if needed, but since the original one also always does a ToList, we have the list allocation in the empty case too, don't we?
Fair point, we do. The more I think on this, the more I think it makes sense to go with your current approach and just add comments to the method docs and at the call site. It's a more efficient path, and I don't see this as an area that is going to have high code churn.
It was trivial to write a quick benchmark.
BenchmarkDotNet=v0.12.1, OS=Windows 10.0.19042
AMD Ryzen Threadripper 1920X, 1 CPU, 24 logical and 12 physical cores
.NET Core SDK=5.0.200
[Host] : .NET Core 3.1.12 (CoreCLR 4.700.21.6504, CoreFX 4.700.21.6905), X64 RyuJIT
Job-RECHSF : .NET Core 3.1.12 (CoreCLR 4.700.21.6504, CoreFX 4.700.21.6905), X64 RyuJIT
InvocationCount=320000
Method | Elements | Mean | Error | StdDev | Median | Ratio | RatioSD | Gen 0 | Gen 1 | Gen 2 | Allocated |
---|---|---|---|---|---|---|---|---|---|---|---|
Linq | 0 | 132.69 ns | 2.516 ns | 4.064 ns | 132.53 ns | 1.00 | 0.00 | 0.0625 | - | - | 264 B |
NoLinq | 0 | 83.53 ns | 1.652 ns | 1.545 ns | 83.57 ns | 0.63 | 0.02 | 0.0375 | - | - | 168 B |
Linq | 2 | 242.37 ns | 4.734 ns | 3.954 ns | 243.15 ns | 1.00 | 0.00 | 0.1500 | - | - | 632 B |
NoLinq | 2 | 233.91 ns | 6.289 ns | 18.543 ns | 239.50 ns | 0.94 | 0.09 | 0.1063 | - | - | 456 B |
Linq | 4 | 330.08 ns | 6.388 ns | 7.604 ns | 327.87 ns | 1.00 | 0.00 | 0.1906 | - | - | 808 B |
NoLinq | 4 | 360.18 ns | 7.960 ns | 22.710 ns | 364.13 ns | 1.00 | 0.05 | 0.1469 | - | - | 616 B |
Linq | 8 | 441.22 ns | 5.757 ns | 5.104 ns | 441.21 ns | 1.00 | 0.00 | 0.2750 | - | - | 1160 B |
NoLinq | 8 | 516.35 ns | 8.582 ns | 7.608 ns | 516.58 ns | 1.17 | 0.02 | 0.2438 | - | - | 1024 B |
Linq | 16 | 855.97 ns | 17.332 ns | 50.558 ns | 869.16 ns | 1.00 | 0.00 | 0.4438 | - | - | 1864 B |
NoLinq | 16 | 973.34 ns | 19.425 ns | 48.376 ns | 985.88 ns | 1.15 | 0.04 | 0.4313 | - | - | 1816 B |
using System;
using System.Collections.Generic;
using System.Linq;
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Configs;
using BenchmarkDotNet.Diagnosers;
using BenchmarkDotNet.Engines;
using BenchmarkDotNet.Exporters;
using BenchmarkDotNet.Jobs;
namespace MicroBenchmarks.ServiceBus
{
[Config(typeof(Config))]
public class BatchMessagesLinqVsNoLinq
{
private IEnumerable<object> data;
private Consumer consumer;
private class Config : ManualConfig
{
public Config()
{
AddExporter(MarkdownExporter.GitHub);
AddDiagnoser(MemoryDiagnoser.Default);
AddJob(Job.Default.WithInvocationCount(320000));
}
}
[Params(0, 2, 4, 8, 16)]
public int Elements { get; set; }
[IterationSetup]
public void Setup()
{
consumer = new Consumer();
data = Enumerable.Range(0, Elements)
.Select(i => new object());
}
[Benchmark(Baseline = true)]
public object Linq()
{
return BuildAmqpBatchFromMessageLinq(data, true);
}
object BuildAmqpBatchFromMessageLinq(IEnumerable<object> source, bool forceBatch)
{
IDisposable firstAmqpMessage = null;
object firstMessage = null;
return BuildAmqpBatchFromMessagesLinq(
source.Select(sbMessage =>
{
if (firstAmqpMessage == null)
{
firstAmqpMessage = new SomeDisposable();
firstMessage = sbMessage;
return firstAmqpMessage;
}
else
{
return new SomeDisposable();
}
}).ToList(), firstMessage, forceBatch);
}
object BuildAmqpBatchFromMessagesLinq(
IList<IDisposable> batchMessages,
object firstMessage,
bool forceBatch)
{
object batchEnvelope;
if (batchMessages.Count == 1 && !forceBatch)
{
batchEnvelope = batchMessages[0];
}
else
{
batchEnvelope = batchMessages.Select(m =>
{
using (m)
{
consumer.Consume<object>(m);
return new object();
}
}).ToArray();
}
return batchEnvelope;
}
class SomeDisposable : IDisposable
{
public void Dispose()
{
}
}
[Benchmark]
public object NoLinq()
{
return BuildAmqpBatchFromMessageNoLinq(data, true);
}
object BuildAmqpBatchFromMessageNoLinq(IEnumerable<object> source, bool forceBatch)
{
IDisposable firstAmqpMessage = null;
object firstMessage = null;
List<IDisposable> amqpMessagesForBatching = new List<IDisposable>();
foreach (var sbMessage in source)
{
if (firstAmqpMessage == null)
{
firstAmqpMessage = new SomeDisposable();
firstMessage = sbMessage;
amqpMessagesForBatching.Add(firstAmqpMessage);
}
else
{
amqpMessagesForBatching.Add(new SomeDisposable());
}
}
var amqpMessageBatch = BuildAmqpBatchFromMessagesNoLinq(amqpMessagesForBatching, firstMessage, forceBatch);
amqpMessagesForBatching.ForEach(m => m.Dispose());
return amqpMessageBatch;
}
object BuildAmqpBatchFromMessagesNoLinq(
IReadOnlyList<IDisposable> batchMessages,
object firstMessage,
bool forceBatch)
{
object batchEnvelope;
if (batchMessages.Count == 1 && !forceBatch)
{
batchEnvelope = batchMessages[0];
}
else
{
batchEnvelope = batchMessages.Select(m =>
{
consumer.Consume<object>(m);
return new object();
}).ToArray();
}
return batchEnvelope;
}
}
}
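As a usage note (assumed console entry point, not part of the PR): a BenchmarkDotNet class like this is typically executed through the library's runner, which is what produces the environment header and results table shown above.

using BenchmarkDotNet.Running;
using MicroBenchmarks.ServiceBus;

public static class Program
{
    public static void Main()
    {
        // Runs the Linq vs NoLinq comparison and prints the summary table.
        BenchmarkRunner.Run<BatchMessagesLinqVsNoLinq>();
    }
}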
Thanks for doing that; I should have gone down that path rather than making the assumption. Things seem to be close enough to be reasonably equivalent, which makes the argument that we should take the more readable path with fewer surprises, I'd think. Your proposed alternative looks good. We may want to consider shifting the Dispose calls to a finally block just to be safe in the face of exceptions.

Thoughts?
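A sketch of that suggestion applied to the proposed method above, consistent with the finally block that appears in the later diff, though the exact pushed code may differ:

private static AmqpMessage BuildAmqpBatchFromMessage(IEnumerable<ServiceBusMessage> source, bool forceBatch)
{
    AmqpMessage firstAmqpMessage = null;
    ServiceBusMessage firstMessage = null;
    List<AmqpMessage> amqpMessagesForBatching = new List<AmqpMessage>();
    try
    {
        foreach (var sbMessage in source)
        {
            if (firstAmqpMessage == null)
            {
                firstAmqpMessage = SBMessageToAmqpMessage(sbMessage);
                firstMessage = sbMessage;
                amqpMessagesForBatching.Add(firstAmqpMessage);
            }
            else
            {
                amqpMessagesForBatching.Add(SBMessageToAmqpMessage(sbMessage));
            }
        }

        return BuildAmqpBatchFromMessages(amqpMessagesForBatching, firstMessage, forceBatch);
    }
    finally
    {
        // Dispose the intermediate AMQP messages even if conversion or batching throws.
        foreach (var amqpMessage in amqpMessagesForBatching)
        {
            amqpMessage.Dispose();
        }
    }
}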
It is interesting that for larger numbers of messages (4, 8, 16) the no-LINQ version actually becomes a bit slower. The memory usage is still lower, though.
@jsquire I pushed the finally stuff.

Anything you want me to address here before this is ready?
/azp run net - servicebus - tests

Azure Pipelines successfully started running 1 pipeline(s).
            }
            finally
            {
                foreach (var amqpMessage in amqpMessagesForBatching)
I don't think we can dispose these since the underlying buffer gets used in the AmqpMessage that is sent. The live tests are failing with these errors:
Error Message:
System.ObjectDisposedException : Cannot access a disposed object.
Object name: 'AmqpDataMessage'.
Stack Trace:
at Microsoft.Azure.Amqp.AmqpMessage.ThrowIfDisposed()
at Microsoft.Azure.Amqp.SendingAmqpLink.BeginSendMessage(AmqpMessage message, ArraySegment`1 deliveryTag, ArraySegment`1 txnId, TimeSpan timeout, AsyncCallback callback, Object state)
at Microsoft.Azure.Amqp.SendingAmqpLink.<>c__DisplayClass11_0.<SendMessageAsync>b__0(AsyncCallback c, Object s)
at System.Threading.Tasks.TaskFactory`1.FromAsyncImpl(Func`3 beginMethod, Func`2 endFunction, Action`1 endAction, Object state, TaskCreationOptions creationOptions)
at Microsoft.Azure.Amqp.SendingAmqpLink.SendMessageAsync(AmqpMessage message, ArraySegment`1 deliveryTag, ArraySegment`1 txnId, TimeSpan timeout)
at Azure.Messaging.ServiceBus.Amqp.AmqpSender.SendBatchInternalAsync(IEnumerable`1 messages, TimeSpan timeout, CancellationToken cancellationToken) in /_/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs:line 270
at Azure.Messaging.ServiceBus.Amqp.AmqpSender.SendBatchInternalAsync(IEnumerable`1 messages, TimeSpan timeout, CancellationToken cancellationToken) in /_/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs:line 286
at Azure.Messaging.ServiceBus.Amqp.AmqpSender.<>c.<<SendAsync>b__20_0>d.MoveNext() in /_/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs:line 313
--- End of stack trace from previous location where exception was thrown ---
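A hedged illustration of one way a disposed object can end up on the send path, using placeholder types rather than the real Microsoft.Azure.Amqp classes and the single-message shortcut from the simplified benchmark code above (the buffer sharing described in the comment above is the other way the batched path can be affected):

using System;
using System.Collections.Generic;

// Placeholder stand-in for an AMQP message; not the Microsoft.Azure.Amqp type.
internal sealed class FakeAmqpMessage : IDisposable
{
    public bool IsDisposed { get; private set; }

    public void Dispose() => IsDisposed = true;

    public void Send()
    {
        if (IsDisposed)
        {
            throw new ObjectDisposedException(nameof(FakeAmqpMessage));
        }
    }
}

internal static class DisposeDemo
{
    // Mirrors the single-message shortcut from the simplified benchmark code:
    // the "envelope" is the first intermediate message, not a new object.
    private static FakeAmqpMessage BuildBatch(IReadOnlyList<FakeAmqpMessage> batchMessages, bool forceBatch) =>
        batchMessages.Count == 1 && !forceBatch
            ? batchMessages[0]
            : new FakeAmqpMessage();

    public static void Main()
    {
        var intermediates = new List<FakeAmqpMessage> { new FakeAmqpMessage() };
        FakeAmqpMessage envelope;
        try
        {
            envelope = BuildBatch(intermediates, forceBatch: false);
        }
        finally
        {
            // Disposing the intermediates also disposes the returned envelope in this path.
            intermediates.ForEach(m => m.Dispose());
        }

        envelope.Send(); // throws ObjectDisposedException, mirroring the live test failure
    }
}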
When adding messages to a ServiceBusMessageBatch, we create an envelope message and dispose that - https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageBatch.cs#L133

I think that this handles properly disposing the buffer used by the actual message that is added to the envelope.
Yeah I missed that.
As did I. Nice catch, Josh.
Moved the alias cleanup out into a dedicated PR.
Additional split of #19683