-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Sender should only copy the payload when really needed #19683
Conversation
Thank you for your contribution @danielmarbach! We will review the pull request and get back to you soon. |
stream.CopyTo(memStream, StreamBufferSizeInBytes); | ||
|
||
return new ArraySegment<byte>(memStream.ToArray()); | ||
switch (stream) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BenchmarkDotNet=v0.12.1, OS=Windows 10.0.19042
AMD Ryzen 9 3950X, 1 CPU, 32 logical and 16 physical cores
.NET Core SDK=5.0.201
[Host] : .NET Core 3.1.13 (CoreCLR 4.700.21.11102, CoreFX 4.700.21.11602), X64 RyuJIT
Job-AYYPIA : .NET Core 3.1.13 (CoreCLR 4.700.21.11102, CoreFX 4.700.21.11602), X64 RyuJIT
InvocationCount=800000
Method | Mean | Error | StdDev | Ratio | RatioSD | Gen 0 | Gen 1 | Gen 2 | Allocated |
---|---|---|---|---|---|---|---|---|---|
ConvertViaMemoryStream | 306.6 ns | 6.13 ns | 15.94 ns | 1.00 | 0.00 | 0.1100 | - | - | 928 B |
PatternMatch | 132.8 ns | 1.13 ns | 0.88 ns | 0.48 | 0.04 | 0.0325 | - | - | 280 B |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
using System;
using System.IO;
using System.Text;
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Configs;
using BenchmarkDotNet.Diagnosers;
using BenchmarkDotNet.Exporters;
using BenchmarkDotNet.Jobs;
using Microsoft.Azure.Amqp;
[Config(typeof(Config))]
public class BufferListStreamDirectly
{
private BufferListStream stream;
private AmqpMessage amqpMessage;
private class Config : ManualConfig
{
public Config()
{
AddExporter(MarkdownExporter.GitHub);
AddDiagnoser(MemoryDiagnoser.Default);
AddJob(Job.Default.WithInvocationCount(800000));
}
}
[IterationSetup]
public void Setup()
{
stream = new BufferListStream(new[] {new ArraySegment<byte>(Encoding.UTF8.GetBytes("Hello"))});
amqpMessage = AmqpMessage.Create(stream, false);
}
[IterationCleanup]
public void Cleanup()
{
amqpMessage.Dispose();
}
private const int StreamBufferSizeInBytes = 512;
[Benchmark(Baseline = true)]
public ArraySegment<byte> ConvertViaMemoryStream()
{
using var messageStream = amqpMessage.ToStream();
using var memStream = new MemoryStream(StreamBufferSizeInBytes);
messageStream.CopyTo(memStream, StreamBufferSizeInBytes);
return new ArraySegment<byte>(memStream.ToArray());
}
[Benchmark]
public ArraySegment<byte> PatternMatch()
{
using var messageStream = amqpMessage.ToStream();
return messageStream switch
{
BufferListStream bufferListStream => bufferListStream.ReadBytes((int) stream.Length),
_ => throw new InvalidOperationException()
};
}
}
yield return new Data | ||
{ | ||
Value = new ArraySegment<byte>(data.IsEmpty ? Array.Empty<byte>() : data.ToArray()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removes the need for ToArray
and copying the memory entirely
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BenchmarkDotNet=v0.12.1, OS=Windows 10.0.19042
AMD Ryzen 9 3950X, 1 CPU, 32 logical and 16 physical cores
.NET Core SDK=5.0.201
[Host] : .NET Core 3.1.13 (CoreCLR 4.700.21.11102, CoreFX 4.700.21.11602), X64 RyuJIT
Job-QXDBJK : .NET Core 3.1.13 (CoreCLR 4.700.21.11102, CoreFX 4.700.21.11602), X64 RyuJIT
InvocationCount=320000
Method | Elements | Mean | Error | StdDev | Ratio | RatioSD | Gen 0 | Gen 1 | Gen 2 | Allocated |
---|---|---|---|---|---|---|---|---|---|---|
ToArray | 1 | 225.0 ns | 3.10 ns | 2.90 ns | 1.00 | 0.00 | 0.0406 | - | - | 344 B |
Marshal | 1 | 218.9 ns | 2.25 ns | 2.10 ns | 0.97 | 0.02 | 0.0344 | - | - | 304 B |
ToArray | 2 | 386.2 ns | 2.70 ns | 2.39 ns | 1.00 | 0.00 | 0.0594 | - | - | 512 B |
Marshal | 2 | 396.9 ns | 3.45 ns | 3.23 ns | 1.03 | 0.01 | 0.0500 | - | - | 432 B |
ToArray | 4 | 749.5 ns | 14.98 ns | 24.19 ns | 1.00 | 0.00 | 0.1000 | - | - | 848 B |
Marshal | 4 | 771.4 ns | 15.45 ns | 20.08 ns | 1.04 | 0.02 | 0.0813 | - | - | 688 B |
ToArray | 8 | 1,515.2 ns | 18.95 ns | 16.80 ns | 1.00 | 0.00 | 0.1813 | - | - | 1520 B |
Marshal | 8 | 1,508.8 ns | 29.63 ns | 27.72 ns | 1.00 | 0.02 | 0.1406 | - | - | 1200 B |
ToArray | 16 | 2,992.2 ns | 40.45 ns | 37.84 ns | 1.00 | 0.00 | 0.3469 | - | - | 2912 B |
Marshal | 16 | 2,990.0 ns | 37.82 ns | 35.38 ns | 1.00 | 0.01 | 0.2688 | - | - | 2272 B |
ToArray | 32 | 5,852.9 ns | 57.16 ns | 53.47 ns | 1.00 | 0.00 | 0.6844 | 0.0094 | - | 5728 B |
Marshal | 32 | 5,877.4 ns | 64.63 ns | 57.29 ns | 1.00 | 0.01 | 0.5313 | 0.0063 | - | 4448 B |
ToArray | 64 | 11,681.2 ns | 91.30 ns | 85.40 ns | 1.00 | 0.00 | 1.3563 | 0.0344 | - | 11360 B |
Marshal | 64 | 11,401.9 ns | 74.13 ns | 69.34 ns | 0.98 | 0.01 | 1.0500 | 0.0281 | - | 8800 B |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices;
using System.Text;
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Configs;
using BenchmarkDotNet.Diagnosers;
using BenchmarkDotNet.Exporters;
using BenchmarkDotNet.Jobs;
using Microsoft.Azure.Amqp;
[Config(typeof(Config))]
public class DataConversion
{
private IEnumerable<ReadOnlyMemory<byte>> data;
private class Config : ManualConfig
{
public Config()
{
AddExporter(MarkdownExporter.GitHub);
AddDiagnoser(MemoryDiagnoser.Default);
AddJob(Job.Default.WithInvocationCount(320000));
}
}
[Params(1, 2, 4, 8, 16, 32, 64)]
public int Elements { get; set; }
[IterationSetup]
public void Setup()
{
data = Enumerable.Range(0, Elements)
.Select(i => (ReadOnlyMemory<byte>) Encoding.UTF8.GetBytes($"Hello World {i}"));
}
[Benchmark(Baseline = true)]
public ArraySegment<byte>[] ToArray()
{
return ConvertBefore(data);
}
static ArraySegment<byte>[] ConvertBefore(IEnumerable<ReadOnlyMemory<byte>> allData)
{
return allData.Select(data => new ArraySegment<byte>(data.IsEmpty ? Array.Empty<byte>() : data.ToArray())).ToArray();
}
[Benchmark]
public ArraySegment<byte>[] Marshal()
{
return ConvertAfter(data);
}
static ArraySegment<byte>[] ConvertAfter(IEnumerable<ReadOnlyMemory<byte>> allData)
{
return allData.Select(data =>
{
ArraySegment<byte> segment;
if (!data.IsEmpty)
{
if (!MemoryMarshal.TryGetArray(data, out segment))
{
segment = new ArraySegment<byte>(data.ToArray());
}
}
else
{
segment = new ArraySegment<byte>(Array.Empty<byte>());
}
return segment;
}).ToArray();
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removes the need for
ToArray
and copying the memory entirely
This change LGTM.
{ | ||
case BufferListStream bufferListStream: | ||
return bufferListStream.ReadBytes((int)stream.Length); | ||
default: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did not have enough time to check whether we could even eliminate this branch. If we can always assume BufferListStream
we can even squeeze more out of this due to eliminated branching. So if anyone of the maintainers can answer me the question and save me a bunch of investigation I would love that :D
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like it currently is a BufferListStream, but the AmqpMessage.ToStream() method returns a Stream
, so I don't think we can take a dependency on that.
{ | ||
if (!MemoryMarshal.TryGetArray(data, out segment)) | ||
{ | ||
segment = new ArraySegment<byte>(data.ToArray()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is also for safety reasons so that we could fall back on the more allocation heavy path if needed
return new ArraySegment<byte>(memStream.ToArray()); | ||
switch (stream) | ||
{ | ||
case BufferListStream bufferListStream: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a bit uneasy about this as it isn't exposed as part of ToStream. Right now the code would always hit this branch, but if the implementation were to change in the AMQP lib, it would start hitting the default case which hasn't been fully tested. I think I would prefer if we didn't make this particular change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't fully understand. The default case is the existing code. Amqp lib in place uses BufferedListStream alll over the place. This lib also uses it internally. So if the lib switches away from it it is anyway a major breaking change. So the default case is fully tested. Can you elaborate?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm saying that if we make this change, the "default" branch will no longer be hit - we will only be hitting the BufferListStream case. I agree that it is currently being tested, but with this change it will no longer be tested going forward. If we just removed the default case, then changes to the AMQP library to return a different stream (which in theory wouldn't necessarily be breaking since the API says that it returns a Stream) would cause the code to break. In other words, I don't agree that the lib changing the ToStream method to return a different stream implementation would be considered a breaking change. Of course, removing the type or making another breaking change to it, would be considered a breaking change.
The places where we use BufferListStream are in APIs that actually require a BufferListStream (there also are a couple places where we use it internally, but we wouldn't be broken unless the BufferListStream made a breaking change). I don't think there are any places where we use it when the API expects or returns a Stream, and we just use the fact that we know it is actually implemented with a BufferListStream.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then the best way to move forward would be to throw on default. Since we take a more or less exact dependency to a specific version of MS Amqp and release new versions of the SDK when that dependency is updated any potential problem would be caught anyway.
I know this code is a liskov violator but I think that is a reasonable tradeoff given the constraints this lib operates on
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd agree that throwing on default would be better than just leaving in default, but I also think it would be even better to just not do this change 😄
I think it would be somewhat of a dangerous precedent to take for us to start peeking into the implementation of APIs to make optimizations (though maybe we can think about making an exception here?).
I'd like to hear what others think about this change as well.
/cc @jsquire @pakrym
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can see if the AmqpLib can add a ToBufferStream method..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hehe. I'm not going to fight for it 🤣 I can easily isolate the other change into a dedicated PR if needed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that would be great! We can discuss this specific change further, but the other one should be good to go.
All SDK Contribution checklist:
This checklist is used to make sure that common guidelines for a pull request are followed.
Draft
mode if it is:General Guidelines and Best Practices
Testing Guidelines
SDK Generation Guidelines
*.csproj
andAssemblyInfo.cs
files have been updated with the new version of the SDK. Please double check nuget.org current release version.Additional management plane SDK specific contribution checklist:
Note: Only applies to
Microsoft.Azure.Management.[RP]
orAzure.ResourceManager.[RP]
Management plane SDK Troubleshooting
new service
label and/or contact assigned reviewer.Verify Code Generation
step, please ensure:generate.ps1/cmd
to generate this PR instead of callingautorest
directly.Please pay attention to the @microsoft.csharp version output after running generate.ps1. If it is lower than current released version (2.3.82), please run it again as it should pull down the latest version,
Old outstanding PR cleanup
Please note:
If PRs (including draft) has been out for more than 60 days and there are no responses from our query or followups, they will be closed to maintain a concise list for our reviewers.