Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AmqpMessageConverter cleanup and dispose of message #19822

Closed
wants to merge 2 commits into from

Conversation

danielmarbach
Copy link
Contributor

Additional split of #19683

All SDK Contribution checklist:

This checklist is used to make sure that common guidelines for a pull request are followed.

  • Please open PR in Draft mode if it is:
    • Work in progress or not intended to be merged.
    • Encountering multiple pipeline failures and working on fixes.
  • If an SDK is being regenerated based on a new swagger spec, a link to the pull request containing these swagger spec changes has been included above.
  • I have read the contribution guidelines.
  • The pull request does not introduce breaking changes.

General Guidelines and Best Practices

  • Title of the pull request is clear and informative.
  • There are a small number of commits, each of which have an informative message. This means that previously merged commits do not appear in the history of the PR. For more information on cleaning up the commits in your PR, see this page.

Testing Guidelines

  • Pull request includes test coverage for the included changes.

SDK Generation Guidelines

  • The generate.cmd file for the SDK has been updated with the version of AutoRest, as well as the commitid of your swagger spec or link to the swagger spec, used to generate the code. (Track 2 only)
  • The *.csproj and AssemblyInfo.cs files have been updated with the new version of the SDK. Please double check nuget.org current release version.

Additional management plane SDK specific contribution checklist:

Note: Only applies to Microsoft.Azure.Management.[RP] or Azure.ResourceManager.[RP]

  • Include updated management metadata.
  • Update AzureRP.props to add/remove version info to maintain up to date API versions.

Management plane SDK Troubleshooting

  • If this is very first SDK for a services and you are adding new service folders directly under /SDK, please add new service label and/or contact assigned reviewer.
  • If the check fails at the Verify Code Generation step, please ensure:
    • Do not modify any code in generated folders.
    • Do not selectively include/remove generated files in the PR.
    • Do use generate.ps1/cmd to generate this PR instead of calling autorest directly.
      Please pay attention to the @microsoft.csharp version output after running generate.ps1. If it is lower than current released version (2.3.82), please run it again as it should pull down the latest version,

Old outstanding PR cleanup

Please note:
If PRs (including draft) has been out for more than 60 days and there are no responses from our query or followups, they will be closed to maintain a concise list for our reviewers.

@ghost ghost added Service Bus customer-reported Issues that are reported by GitHub users external to the Azure organization. labels Mar 25, 2021
@ghost
Copy link

ghost commented Mar 25, 2021

Thank you for your contribution @danielmarbach! We will review the pull request and get back to you soon.

@ghost ghost added the Community Contribution Community members are working on the issue label Mar 25, 2021
@@ -29,14 +28,14 @@ internal static class AmqpMessageConverter
/// <summary>The size, in bytes, to use as a buffer for stream operations.</summary>
private const int StreamBufferSizeInBytes = 512;

public static AmqpMessage BatchSBMessagesAsAmqpMessage(IEnumerable<SBMessage> source, bool forceBatch = false)
public static AmqpMessage BatchSBMessagesAsAmqpMessage(IEnumerable<ServiceBusMessage> source, bool forceBatch = false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for killing the alias. I find it much more readable without it. 😄

@@ -92,8 +91,12 @@ private static AmqpMessage BuildAmqpBatchFromMessage(IEnumerable<SBMessage> sour
batchEnvelope = AmqpMessage.Create(batchMessages.Select(message =>
{
message.Batchable = true;
using var messageStream = message.ToStream();
return new Data { Value = ReadStreamToArraySegment(messageStream) };
// once the data is packaged the temporary message is no longer required
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that I'm fully comfortable with the flow of control here; BuildAmqpBatchFromMessages assumes that it now has ownership of the batchMessages that were passed to it and should manage their life span. It is safe in this instance because we have a single call site and we're doing a short-lived transform when calling this method, but it definitely isn't a clear contract to callers.

I'm not opposed, as we definitely should have been disposing them and I can't think of a clean way to do this inline within the Select of the call site. The alternative would likely be refactoring the call site to use a temporary set and then enumerate it again after the call here just to walk through the set and dispose all of the messages.

If we decide to stick with this approach, can we please add a <remark> to BuildAmqpBatchFromMessages making it clear that ownership of the batchMessages is transferred and the life span will be managed internally?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it bothered me as well but I wanted to be mindful to not make too many changes. What about this then?

        private static AmqpMessage BuildAmqpBatchFromMessage(IEnumerable<ServiceBusMessage> source, bool forceBatch)
        {
            AmqpMessage firstAmqpMessage = null;
            ServiceBusMessage firstMessage = null;
            List<AmqpMessage> amqpMessagesForBatching = new List<AmqpMessage>();
            foreach (var sbMessage in source)
            {
                if (firstAmqpMessage == null)
                {
                    firstAmqpMessage = SBMessageToAmqpMessage(sbMessage);
                    firstMessage = sbMessage;
                    amqpMessagesForBatching.Add(firstAmqpMessage);
                }
                else
                {
                    amqpMessagesForBatching.Add(SBMessageToAmqpMessage(sbMessage));
                }
            }

            var amqpMessageBatch = BuildAmqpBatchFromMessages(amqpMessagesForBatching, firstMessage, forceBatch);
            amqpMessagesForBatching.ForEach(m => m.Dispose());
            return amqpMessageBatch;
        }

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's a reasonable alternative; it's a trade-off between the extra allocation and enumeration versus having the ownership transfer. I'm on the fence, honestly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well I can add more branches to it if needed but I thought since the original one also always does a ToList we also have the list allocation in the empty case don't we?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair point, we do. The more I think on this, the more I think it makes sense to go with your current approach and just comment the method docs and at the call site. It's a more efficient path and I don't see this as an area that is going to have high code churn.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was trivial to write a kind of a benchmark.

BenchmarkDotNet=v0.12.1, OS=Windows 10.0.19042
AMD Ryzen Threadripper 1920X, 1 CPU, 24 logical and 12 physical cores
.NET Core SDK=5.0.200
  [Host]     : .NET Core 3.1.12 (CoreCLR 4.700.21.6504, CoreFX 4.700.21.6905), X64 RyuJIT
  Job-RECHSF : .NET Core 3.1.12 (CoreCLR 4.700.21.6504, CoreFX 4.700.21.6905), X64 RyuJIT

InvocationCount=320000  
Method Elements Mean Error StdDev Median Ratio RatioSD Gen 0 Gen 1 Gen 2 Allocated
Linq 0 132.69 ns 2.516 ns 4.064 ns 132.53 ns 1.00 0.00 0.0625 - - 264 B
NoLinq 0 83.53 ns 1.652 ns 1.545 ns 83.57 ns 0.63 0.02 0.0375 - - 168 B
Linq 2 242.37 ns 4.734 ns 3.954 ns 243.15 ns 1.00 0.00 0.1500 - - 632 B
NoLinq 2 233.91 ns 6.289 ns 18.543 ns 239.50 ns 0.94 0.09 0.1063 - - 456 B
Linq 4 330.08 ns 6.388 ns 7.604 ns 327.87 ns 1.00 0.00 0.1906 - - 808 B
NoLinq 4 360.18 ns 7.960 ns 22.710 ns 364.13 ns 1.00 0.05 0.1469 - - 616 B
Linq 8 441.22 ns 5.757 ns 5.104 ns 441.21 ns 1.00 0.00 0.2750 - - 1160 B
NoLinq 8 516.35 ns 8.582 ns 7.608 ns 516.58 ns 1.17 0.02 0.2438 - - 1024 B
Linq 16 855.97 ns 17.332 ns 50.558 ns 869.16 ns 1.00 0.00 0.4438 - - 1864 B
NoLinq 16 973.34 ns 19.425 ns 48.376 ns 985.88 ns 1.15 0.04 0.4313 - - 1816 B

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using System;
using System.Collections.Generic;
using System.Linq;
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Configs;
using BenchmarkDotNet.Diagnosers;
using BenchmarkDotNet.Engines;
using BenchmarkDotNet.Exporters;
using BenchmarkDotNet.Jobs;

namespace MicroBenchmarks.ServiceBus
{
    [Config(typeof(Config))]
    public class BatchMessagesLinqVsNoLinq
    {
        private IEnumerable<object> data;
        private Consumer consumer;

        private class Config : ManualConfig
        {
            public Config()
            {
                AddExporter(MarkdownExporter.GitHub);
                AddDiagnoser(MemoryDiagnoser.Default);
                AddJob(Job.Default.WithInvocationCount(320000));
            }
        }

        [Params(0, 2, 4, 8, 16)]
        public int Elements { get; set; }

        [IterationSetup]
        public void Setup()
        {
            consumer = new Consumer();
            data = Enumerable.Range(0, Elements)
                .Select(i => new object());
        }

        [Benchmark(Baseline = true)]
        public object Linq()
        {
            return BuildAmqpBatchFromMessageLinq(data, true);
        }

        object BuildAmqpBatchFromMessageLinq(IEnumerable<object> source, bool forceBatch)
        {
            IDisposable firstAmqpMessage = null;
            object firstMessage = null;

            return BuildAmqpBatchFromMessagesLinq(
                source.Select(sbMessage =>
                {
                    if (firstAmqpMessage == null)
                    {
                        firstAmqpMessage = new SomeDisposable();
                        firstMessage = sbMessage;
                        return firstAmqpMessage;
                    }
                    else
                    {
                        return new SomeDisposable();
                    }
                }).ToList(), firstMessage, forceBatch);
        }

        object BuildAmqpBatchFromMessagesLinq(
            IList<IDisposable> batchMessages,
            object firstMessage,
            bool forceBatch)
        {
            object batchEnvelope;

            if (batchMessages.Count == 1 && !forceBatch)
            {
                batchEnvelope = batchMessages[0];
            }
            else
            {
                batchEnvelope = batchMessages.Select(m =>
                {
                    using (m)
                    {
                        consumer.Consume<object>(m);
                        return new object();
                    }
                }).ToArray();
            }

            return batchEnvelope;
        }

        class SomeDisposable : IDisposable
        {
            public void Dispose()
            {
            }
        }

        [Benchmark]
        public object NoLinq()
        {
            return BuildAmqpBatchFromMessageNoLinq(data, true);
        }

        object BuildAmqpBatchFromMessageNoLinq(IEnumerable<object> source, bool forceBatch)
        {
            IDisposable firstAmqpMessage = null;
            object firstMessage = null;
            List<IDisposable> amqpMessagesForBatching = new List<IDisposable>();
            foreach (var sbMessage in source)
            {
                if (firstAmqpMessage == null)
                {
                    firstAmqpMessage = new SomeDisposable();
                    firstMessage = sbMessage;
                    amqpMessagesForBatching.Add(firstAmqpMessage);
                }
                else
                {
                    amqpMessagesForBatching.Add(new SomeDisposable());
                }
            }

            var amqpMessageBatch = BuildAmqpBatchFromMessagesNoLinq(amqpMessagesForBatching, firstMessage, forceBatch);
            amqpMessagesForBatching.ForEach(m => m.Dispose());
            return amqpMessageBatch;
        }

        object BuildAmqpBatchFromMessagesNoLinq(
            IReadOnlyList<IDisposable> batchMessages,
            object firstMessage,
            bool forceBatch)
        {
            object batchEnvelope;

            if (batchMessages.Count == 1 && !forceBatch)
            {
                batchEnvelope = batchMessages[0];
            }
            else
            {
                batchEnvelope = batchMessages.Select(m =>
                {
                    consumer.Consume<object>(m);
                    return new object();
                }).ToArray();
            }

            return batchEnvelope;
        }
    }
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for doing that; I should have gone down that path rather than making the assumption. Things seem to be close enough to be reasonably equivalent, which makes the argument that we should take the more readable path with fewer surprises, I'd think. Your proposed alternative looks good. We may want to consider shifting the Dispose calls to a finally block just to be safe in the face of exceptions.

Thoughts?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is interesting that for larger number of messages (4, 8, 16) it actually becomes a bit slower with nolinq. The memory usage is still lower though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jsquire I pushed the finally stuff

@danielmarbach
Copy link
Contributor Author

Anything you want me to address here before this is ready?

@JoshLove-msft
Copy link
Member

/azp run net - servicebus - tests

@azure-pipelines
Copy link

Azure Pipelines successfully started running 1 pipeline(s).

}
finally
{
foreach (var amqpMessage in amqpMessagesForBatching)
Copy link
Member

@JoshLove-msft JoshLove-msft Mar 30, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can dispose these since the underlying buffer gets used in the AmqpMessage that is sent. The live tests are failing with these errors:

  Error Message:
   System.ObjectDisposedException : Cannot access a disposed object.
Object name: 'AmqpDataMessage'.
  Stack Trace:
     at Microsoft.Azure.Amqp.AmqpMessage.ThrowIfDisposed()
   at Microsoft.Azure.Amqp.SendingAmqpLink.BeginSendMessage(AmqpMessage message, ArraySegment`1 deliveryTag, ArraySegment`1 txnId, TimeSpan timeout, AsyncCallback callback, Object state)
   at Microsoft.Azure.Amqp.SendingAmqpLink.<>c__DisplayClass11_0.<SendMessageAsync>b__0(AsyncCallback c, Object s)
   at System.Threading.Tasks.TaskFactory`1.FromAsyncImpl(Func`3 beginMethod, Func`2 endFunction, Action`1 endAction, Object state, TaskCreationOptions creationOptions)
   at Microsoft.Azure.Amqp.SendingAmqpLink.SendMessageAsync(AmqpMessage message, ArraySegment`1 deliveryTag, ArraySegment`1 txnId, TimeSpan timeout)
   at Azure.Messaging.ServiceBus.Amqp.AmqpSender.SendBatchInternalAsync(IEnumerable`1 messages, TimeSpan timeout, CancellationToken cancellationToken) in /_/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs:line 270
   at Azure.Messaging.ServiceBus.Amqp.AmqpSender.SendBatchInternalAsync(IEnumerable`1 messages, TimeSpan timeout, CancellationToken cancellationToken) in /_/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs:line 286
   at Azure.Messaging.ServiceBus.Amqp.AmqpSender.<>c.<<SendAsync>b__20_0>d.MoveNext() in /_/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs:line 313
--- End of stack trace from previous location where exception was thrown ---

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When adding messages to a ServiceBusMessageBatch, we create an envelope message and dispose that - https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageBatch.cs#L133

I think that this would handle properly disposing the buffer used by the actual message that is added to the envelope.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I missed that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As did I. Nice catch, Josh.

@danielmarbach danielmarbach mentioned this pull request Mar 30, 2021
11 tasks
@danielmarbach
Copy link
Contributor Author

Moved out the alias cleanup into a dedicated PR

#19938

@danielmarbach danielmarbach deleted the extensions-cleanup branch March 30, 2021 10:31
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Community Contribution Community members are working on the issue customer-reported Issues that are reported by GitHub users external to the Azure organization. Service Bus
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants