diff --git a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt index 779fcbf2d5..fc9c98161b 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt @@ -3,11 +3,10 @@ const RabbitMQ.Client.AmqpTcpEndpoint.DefaultAmqpSslPort = 5671 -> int const RabbitMQ.Client.AmqpTcpEndpoint.UseDefaultPort = -1 -> int const RabbitMQ.Client.ConnectionFactory.DefaultChannelMax = 2047 -> ushort const RabbitMQ.Client.ConnectionFactory.DefaultFrameMax = 0 -> uint -const RabbitMQ.Client.ConnectionFactory.DefaultMaxMessageSize = 134217728 -> uint +const RabbitMQ.Client.ConnectionFactory.DefaultMaxInboundMessageBodySize = 67108864 -> uint const RabbitMQ.Client.ConnectionFactory.DefaultPass = "guest" -> string const RabbitMQ.Client.ConnectionFactory.DefaultUser = "guest" -> string const RabbitMQ.Client.ConnectionFactory.DefaultVHost = "/" -> string -const RabbitMQ.Client.ConnectionFactory.MaximumMaxMessageSize = 536870912 -> uint const RabbitMQ.Client.Constants.AccessRefused = 403 -> int const RabbitMQ.Client.Constants.ChannelError = 504 -> int const RabbitMQ.Client.Constants.CommandInvalid = 503 -> int @@ -82,14 +81,13 @@ RabbitMQ.Client.AmqpTcpEndpoint.AddressFamily.set -> void RabbitMQ.Client.AmqpTcpEndpoint.AmqpTcpEndpoint() -> void RabbitMQ.Client.AmqpTcpEndpoint.AmqpTcpEndpoint(string hostName, int portOrMinusOne = -1) -> void RabbitMQ.Client.AmqpTcpEndpoint.AmqpTcpEndpoint(string hostName, int portOrMinusOne, RabbitMQ.Client.SslOption ssl) -> void -RabbitMQ.Client.AmqpTcpEndpoint.AmqpTcpEndpoint(string hostName, int portOrMinusOne, RabbitMQ.Client.SslOption ssl, uint maxMessageSize) -> void RabbitMQ.Client.AmqpTcpEndpoint.AmqpTcpEndpoint(System.Uri uri) -> void RabbitMQ.Client.AmqpTcpEndpoint.AmqpTcpEndpoint(System.Uri uri, RabbitMQ.Client.SslOption ssl) -> void RabbitMQ.Client.AmqpTcpEndpoint.Clone() -> object RabbitMQ.Client.AmqpTcpEndpoint.CloneWithHostname(string hostname) -> RabbitMQ.Client.AmqpTcpEndpoint RabbitMQ.Client.AmqpTcpEndpoint.HostName.get -> string RabbitMQ.Client.AmqpTcpEndpoint.HostName.set -> void -RabbitMQ.Client.AmqpTcpEndpoint.MaxMessageSize.get -> uint +RabbitMQ.Client.AmqpTcpEndpoint.MaxInboundMessageBodySize.get -> uint RabbitMQ.Client.AmqpTcpEndpoint.Port.get -> int RabbitMQ.Client.AmqpTcpEndpoint.Port.set -> void RabbitMQ.Client.AmqpTcpEndpoint.Protocol.get -> RabbitMQ.Client.IProtocol @@ -225,8 +223,8 @@ RabbitMQ.Client.ConnectionFactory.HandshakeContinuationTimeout.get -> System.Tim RabbitMQ.Client.ConnectionFactory.HandshakeContinuationTimeout.set -> void RabbitMQ.Client.ConnectionFactory.HostName.get -> string RabbitMQ.Client.ConnectionFactory.HostName.set -> void -RabbitMQ.Client.ConnectionFactory.MaxMessageSize.get -> uint -RabbitMQ.Client.ConnectionFactory.MaxMessageSize.set -> void +RabbitMQ.Client.ConnectionFactory.MaxInboundMessageBodySize.get -> uint +RabbitMQ.Client.ConnectionFactory.MaxInboundMessageBodySize.set -> void RabbitMQ.Client.ConnectionFactory.NetworkRecoveryInterval.get -> System.TimeSpan RabbitMQ.Client.ConnectionFactory.NetworkRecoveryInterval.set -> void RabbitMQ.Client.ConnectionFactory.Password.get -> string @@ -787,6 +785,7 @@ readonly RabbitMQ.Client.ConnectionConfig.HandshakeContinuationTimeout -> System readonly RabbitMQ.Client.ConnectionConfig.HeartbeatInterval -> System.TimeSpan readonly RabbitMQ.Client.ConnectionConfig.MaxChannelCount -> ushort readonly RabbitMQ.Client.ConnectionConfig.MaxFrameSize -> uint +readonly RabbitMQ.Client.ConnectionConfig.MaxInboundMessageBodySize -> uint readonly RabbitMQ.Client.ConnectionConfig.NetworkRecoveryInterval -> System.TimeSpan readonly RabbitMQ.Client.ConnectionConfig.Password -> string readonly RabbitMQ.Client.ConnectionConfig.RequestedConnectionTimeout -> System.TimeSpan @@ -884,6 +883,7 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void ~const RabbitMQ.Client.RabbitMQActivitySource.PublisherSourceName = "RabbitMQ.Client.Publisher" -> string ~const RabbitMQ.Client.RabbitMQActivitySource.SubscriberSourceName = "RabbitMQ.Client.Subscriber" -> string ~override RabbitMQ.Client.Events.EventingBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.ReadOnlyBasicProperties properties, System.ReadOnlyMemory body) -> System.Threading.Tasks.Task +~RabbitMQ.Client.AmqpTcpEndpoint.AmqpTcpEndpoint(string hostName, int portOrMinusOne, RabbitMQ.Client.SslOption ssl, uint maxInboundMessageBodySize) -> void ~RabbitMQ.Client.ConnectionFactory.CreateConnectionAsync(RabbitMQ.Client.IEndpointResolver endpointResolver, string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~RabbitMQ.Client.ConnectionFactory.CreateConnectionAsync(string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~RabbitMQ.Client.ConnectionFactory.CreateConnectionAsync(System.Collections.Generic.IEnumerable endpoints, string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task diff --git a/projects/RabbitMQ.Client/client/api/AmqpTcpEndpoint.cs b/projects/RabbitMQ.Client/client/api/AmqpTcpEndpoint.cs index 42c3a49137..0e8b5934a2 100644 --- a/projects/RabbitMQ.Client/client/api/AmqpTcpEndpoint.cs +++ b/projects/RabbitMQ.Client/client/api/AmqpTcpEndpoint.cs @@ -62,7 +62,7 @@ public class AmqpTcpEndpoint private int _port; - private readonly uint _maxMessageSize; + private readonly uint _maxInboundMessageBodySize; /// /// Creates a new instance of the . @@ -70,14 +70,15 @@ public class AmqpTcpEndpoint /// Hostname. /// Port number. If the port number is -1, the default port number will be used. /// Ssl option. - /// Maximum message size from RabbitMQ. . It defaults to - /// MaximumMaxMessageSize if the parameter is greater than MaximumMaxMessageSize. - public AmqpTcpEndpoint(string hostName, int portOrMinusOne, SslOption ssl, uint maxMessageSize) + /// Maximum message size from RabbitMQ. + public AmqpTcpEndpoint(string hostName, int portOrMinusOne, SslOption ssl, + uint maxInboundMessageBodySize) { HostName = hostName; _port = portOrMinusOne; Ssl = ssl; - _maxMessageSize = Math.Min(maxMessageSize, ConnectionFactory.MaximumMaxMessageSize); + _maxInboundMessageBodySize = Math.Min(maxInboundMessageBodySize, + InternalConstants.DefaultRabbitMqMaxInboundMessageBodySize); } /// @@ -87,7 +88,7 @@ public AmqpTcpEndpoint(string hostName, int portOrMinusOne, SslOption ssl, uint /// Port number. If the port number is -1, the default port number will be used. /// Ssl option. public AmqpTcpEndpoint(string hostName, int portOrMinusOne, SslOption ssl) : - this(hostName, portOrMinusOne, ssl, ConnectionFactory.DefaultMaxMessageSize) + this(hostName, portOrMinusOne, ssl, ConnectionFactory.DefaultMaxInboundMessageBodySize) { } @@ -134,7 +135,7 @@ public AmqpTcpEndpoint(Uri uri) : this(uri.Host, uri.Port) /// A copy with the same hostname, port, and TLS settings public object Clone() { - return new AmqpTcpEndpoint(HostName, _port, Ssl, _maxMessageSize); + return new AmqpTcpEndpoint(HostName, _port, Ssl, _maxInboundMessageBodySize); } /// @@ -144,7 +145,7 @@ public object Clone() /// A copy with the provided hostname and port/TLS settings of this endpoint public AmqpTcpEndpoint CloneWithHostname(string hostname) { - return new AmqpTcpEndpoint(hostname, _port, Ssl, _maxMessageSize); + return new AmqpTcpEndpoint(hostname, _port, Ssl, _maxInboundMessageBodySize); } /// @@ -195,11 +196,11 @@ public IProtocol Protocol /// /// Get the maximum size for a message in bytes. - /// The default value is defined in . + /// The default value is defined in . /// - public uint MaxMessageSize + public uint MaxInboundMessageBodySize { - get { return _maxMessageSize; } + get { return _maxInboundMessageBodySize; } } /// diff --git a/projects/RabbitMQ.Client/client/api/ConnectionConfig.cs b/projects/RabbitMQ.Client/client/api/ConnectionConfig.cs index 3aba1f9ab2..7aeb12ce7d 100644 --- a/projects/RabbitMQ.Client/client/api/ConnectionConfig.cs +++ b/projects/RabbitMQ.Client/client/api/ConnectionConfig.cs @@ -90,6 +90,11 @@ public sealed class ConnectionConfig /// public readonly uint MaxFrameSize; + /// + /// Maximum body size of a message (in bytes). + /// + public readonly uint MaxInboundMessageBodySize; + /// /// Set to false to make automatic connection recovery not recover topology (exchanges, queues, bindings, etc). /// @@ -149,7 +154,7 @@ internal ConnectionConfig(string virtualHost, string userName, string password, ICredentialsProvider credentialsProvider, ICredentialsRefresher credentialsRefresher, IEnumerable authMechanisms, IDictionary clientProperties, string? clientProvidedName, - ushort maxChannelCount, uint maxFrameSize, bool topologyRecoveryEnabled, + ushort maxChannelCount, uint maxFrameSize, uint maxInboundMessageBodySize, bool topologyRecoveryEnabled, TopologyRecoveryFilter topologyRecoveryFilter, TopologyRecoveryExceptionHandler topologyRecoveryExceptionHandler, TimeSpan networkRecoveryInterval, TimeSpan heartbeatInterval, TimeSpan continuationTimeout, TimeSpan handshakeContinuationTimeout, TimeSpan requestedConnectionTimeout, bool dispatchConsumersAsync, int dispatchConsumerConcurrency, @@ -165,6 +170,7 @@ internal ConnectionConfig(string virtualHost, string userName, string password, ClientProvidedName = clientProvidedName; MaxChannelCount = maxChannelCount; MaxFrameSize = maxFrameSize; + MaxInboundMessageBodySize = maxInboundMessageBodySize; TopologyRecoveryEnabled = topologyRecoveryEnabled; TopologyRecoveryFilter = topologyRecoveryFilter; TopologyRecoveryExceptionHandler = topologyRecoveryExceptionHandler; diff --git a/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs b/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs index 11d26869fa..1ca5b9c408 100644 --- a/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs +++ b/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs @@ -59,7 +59,7 @@ namespace RabbitMQ.Client /// factory.VirtualHost = ConnectionFactory.DefaultVHost; /// factory.HostName = hostName; /// factory.Port = AmqpTcpEndpoint.UseDefaultPort; - /// factory.MaxMessageSize = 512 * 1024 * 1024; + /// factory.MaxInboundMessageBodySize = 512 * 1024 * 1024; /// // /// IConnection conn = factory.CreateConnection(); /// // @@ -107,15 +107,9 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactor public const uint DefaultFrameMax = 0; /// - /// Default value for ConnectionFactory's MaxMessageSize. + /// Default value for ConnectionFactory's MaxInboundMessageBodySize. /// - public const uint DefaultMaxMessageSize = 134217728; - /// - /// Largest message size, in bytes, allowed in RabbitMQ. - /// Note: rabbit.max_message_size setting (https://www.rabbitmq.com/configure.html) - /// configures the largest message size which should be lower than this maximum of 536 Mbs. - /// - public const uint MaximumMaxMessageSize = 536870912; + public const uint DefaultMaxInboundMessageBodySize = 1_048_576 * 64; /// /// Default value for desired heartbeat interval. Default is 60 seconds, @@ -291,13 +285,13 @@ public ConnectionFactory() /// public AmqpTcpEndpoint Endpoint { - get { return new AmqpTcpEndpoint(HostName, Port, Ssl, MaxMessageSize); } + get { return new AmqpTcpEndpoint(HostName, Port, Ssl, MaxInboundMessageBodySize); } set { Port = value.Port; HostName = value.HostName; Ssl = value.Ssl; - MaxMessageSize = value.MaxMessageSize; + MaxInboundMessageBodySize = value.MaxInboundMessageBodySize; } } @@ -359,7 +353,7 @@ public AmqpTcpEndpoint Endpoint /// Maximum allowed message size, in bytes, from RabbitMQ. /// Corresponds to the ConnectionFactory.DefaultMaxMessageSize setting. /// - public uint MaxMessageSize { get; set; } = DefaultMaxMessageSize; + public uint MaxInboundMessageBodySize { get; set; } = DefaultMaxInboundMessageBodySize; /// /// The uri to use for the connection. @@ -484,7 +478,7 @@ public Task CreateConnectionAsync(IEnumerable hostnames, public Task CreateConnectionAsync(IEnumerable hostnames, string clientProvidedName, CancellationToken cancellationToken = default) { - IEnumerable endpoints = hostnames.Select(h => new AmqpTcpEndpoint(h, Port, Ssl, MaxMessageSize)); + IEnumerable endpoints = hostnames.Select(h => new AmqpTcpEndpoint(h, Port, Ssl, MaxInboundMessageBodySize)); return CreateConnectionAsync(EndpointResolverFactory(endpoints), clientProvidedName, cancellationToken); } @@ -602,6 +596,7 @@ private ConnectionConfig CreateConfig(string clientProvidedName) clientProvidedName, RequestedChannelMax, RequestedFrameMax, + MaxInboundMessageBodySize, TopologyRecoveryEnabled, TopologyRecoveryFilter, TopologyRecoveryExceptionHandler, diff --git a/projects/RabbitMQ.Client/client/api/InternalConstants.cs b/projects/RabbitMQ.Client/client/api/InternalConstants.cs index 8ee85ae173..058d686389 100644 --- a/projects/RabbitMQ.Client/client/api/InternalConstants.cs +++ b/projects/RabbitMQ.Client/client/api/InternalConstants.cs @@ -37,5 +37,12 @@ internal static class InternalConstants { internal static readonly TimeSpan DefaultConnectionAbortTimeout = TimeSpan.FromSeconds(5); internal static readonly TimeSpan DefaultConnectionCloseTimeout = TimeSpan.FromSeconds(30); + + /// + /// Largest message size, in bytes, allowed in RabbitMQ. + /// Note: rabbit.max_message_size setting (https://www.rabbitmq.com/configure.html) + /// configures the largest message size which should be lower than this maximum of 128MiB. + /// + internal const uint DefaultRabbitMqMaxInboundMessageBodySize = 1_048_576 * 128; } } diff --git a/projects/RabbitMQ.Client/client/impl/CommandAssembler.cs b/projects/RabbitMQ.Client/client/impl/CommandAssembler.cs index f63e87b9a5..7a9cb9c519 100644 --- a/projects/RabbitMQ.Client/client/impl/CommandAssembler.cs +++ b/projects/RabbitMQ.Client/client/impl/CommandAssembler.cs @@ -55,8 +55,11 @@ internal sealed class CommandAssembler private int _offset; private AssemblyState _state; - public CommandAssembler() + private readonly uint _maxBodyLength; + + public CommandAssembler(uint maxBodyLength) { + _maxBodyLength = maxBodyLength; Reset(); } @@ -152,6 +155,13 @@ private bool ParseHeaderFrame(in InboundFrame frame) { throw new UnexpectedFrameException(frame.Type); } + + if (totalBodyBytes > _maxBodyLength) + { + string msg = $"Frame body size '{totalBodyBytes}' exceeds maximum of '{_maxBodyLength}' bytes"; + throw new MalformedFrameException(message: msg, canShutdownCleanly: false); + } + _rentedHeaderArray = totalBodyBytes != 0 ? frame.TakeoverPayload() : Array.Empty(); _headerMemory = frame.Payload.Slice(12); diff --git a/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs b/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs index c0ab30ac65..fb940ff863 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs @@ -171,7 +171,7 @@ await FinishCloseAsync(cancellationToken) } ushort channelMax = (ushort)NegotiatedMaxValue(_config.MaxChannelCount, connectionTune.m_channelMax); - _sessionManager = new SessionManager(this, channelMax); + _sessionManager = new SessionManager(this, channelMax, _config.MaxInboundMessageBodySize); uint frameMax = NegotiatedMaxValue(_config.MaxFrameSize, connectionTune.m_frameMax); FrameMax = frameMax; diff --git a/projects/RabbitMQ.Client/client/impl/Connection.cs b/projects/RabbitMQ.Client/client/impl/Connection.cs index 9e3d12f6d0..7536a1d339 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.cs @@ -71,8 +71,8 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler) _connectionUnblockedWrapper = new EventingWrapper("OnConnectionUnblocked", onException); _connectionShutdownWrapper = new EventingWrapper("OnShutdown", onException); - _sessionManager = new SessionManager(this, 0); - _session0 = new MainSession(this); + _sessionManager = new SessionManager(this, 0, config.MaxInboundMessageBodySize); + _session0 = new MainSession(this, config.MaxInboundMessageBodySize); _channel0 = new Channel(_config, _session0); ; ClientProperties = new Dictionary(_config.ClientProperties) diff --git a/projects/RabbitMQ.Client/client/impl/Frame.cs b/projects/RabbitMQ.Client/client/impl/Frame.cs index 5c25223481..2b252127e4 100644 --- a/projects/RabbitMQ.Client/client/impl/Frame.cs +++ b/projects/RabbitMQ.Client/client/impl/Frame.cs @@ -254,7 +254,8 @@ private static void ProcessProtocolHeader(ReadOnlySequence buffer) } } - internal static async ValueTask ReadFromPipeAsync(PipeReader reader, uint maxMessageSize, + internal static async ValueTask ReadFromPipeAsync(PipeReader reader, + uint maxInboundMessageBodySize, CancellationToken mainLoopCancellationToken) { ReadResult result = await reader.ReadAsync(mainLoopCancellationToken) @@ -266,7 +267,7 @@ internal static async ValueTask ReadFromPipeAsync(PipeReader reade InboundFrame frame; // Loop until we have enough data to read an entire frame, or until the pipe is completed. - while (!TryReadFrame(ref buffer, maxMessageSize, out frame)) + while (!TryReadFrame(ref buffer, maxInboundMessageBodySize, out frame)) { reader.AdvanceTo(buffer.Start, buffer.End); @@ -283,7 +284,8 @@ internal static async ValueTask ReadFromPipeAsync(PipeReader reade return frame; } - internal static bool TryReadFrameFromPipe(PipeReader reader, uint maxMessageSize, out InboundFrame frame) + internal static bool TryReadFrameFromPipe(PipeReader reader, + uint maxInboundMessageBodySize, out InboundFrame frame) { if (reader.TryRead(out ReadResult result)) { @@ -291,7 +293,7 @@ internal static bool TryReadFrameFromPipe(PipeReader reader, uint maxMessageSize MaybeThrowEndOfStream(result, buffer); - if (TryReadFrame(ref buffer, maxMessageSize, out frame)) + if (TryReadFrame(ref buffer, maxInboundMessageBodySize, out frame)) { reader.AdvanceTo(buffer.Start); return true; @@ -306,7 +308,8 @@ internal static bool TryReadFrameFromPipe(PipeReader reader, uint maxMessageSize return false; } - internal static bool TryReadFrame(ref ReadOnlySequence buffer, uint maxMessageSize, out InboundFrame frame) + internal static bool TryReadFrame(ref ReadOnlySequence buffer, + uint maxInboundMessageBodySize, out InboundFrame frame) { if (buffer.Length < 7) { @@ -332,9 +335,9 @@ internal static bool TryReadFrame(ref ReadOnlySequence buffer, uint maxMes FrameType type = (FrameType)firstByte; int channel = NetworkOrderDeserializer.ReadUInt16(buffer.Slice(1)); int payloadSize = NetworkOrderDeserializer.ReadInt32(buffer.Slice(3)); - if ((maxMessageSize > 0) && (payloadSize > maxMessageSize)) + if ((maxInboundMessageBodySize > 0) && (payloadSize > maxInboundMessageBodySize)) { - string msg = $"Frame payload size '{payloadSize}' exceeds maximum of '{maxMessageSize}' bytes"; + string msg = $"Frame payload size '{payloadSize}' exceeds maximum of '{maxInboundMessageBodySize}' bytes"; throw new MalformedFrameException(message: msg, canShutdownCleanly: false); } diff --git a/projects/RabbitMQ.Client/client/impl/MainSession.cs b/projects/RabbitMQ.Client/client/impl/MainSession.cs index c64b2ca60c..f18c86d14b 100644 --- a/projects/RabbitMQ.Client/client/impl/MainSession.cs +++ b/projects/RabbitMQ.Client/client/impl/MainSession.cs @@ -49,7 +49,8 @@ internal sealed class MainSession : Session, IDisposable private volatile bool _closing; private readonly SemaphoreSlim _closingSemaphore = new SemaphoreSlim(1, 1); - public MainSession(Connection connection) : base(connection, 0) + public MainSession(Connection connection, uint maxBodyLength) + : base(connection, 0, maxBodyLength) { } diff --git a/projects/RabbitMQ.Client/client/impl/Session.cs b/projects/RabbitMQ.Client/client/impl/Session.cs index 1fdd285c87..4f32a81097 100644 --- a/projects/RabbitMQ.Client/client/impl/Session.cs +++ b/projects/RabbitMQ.Client/client/impl/Session.cs @@ -40,9 +40,10 @@ internal class Session : SessionBase { private readonly CommandAssembler _assembler; - public Session(Connection connection, ushort channelNumber) : base(connection, channelNumber) + public Session(Connection connection, ushort channelNumber, uint maxBodyLength) + : base(connection, channelNumber) { - _assembler = new CommandAssembler(); + _assembler = new CommandAssembler(maxBodyLength); } public override async Task HandleFrameAsync(InboundFrame frame, CancellationToken cancellationToken) diff --git a/projects/RabbitMQ.Client/client/impl/SessionManager.cs b/projects/RabbitMQ.Client/client/impl/SessionManager.cs index a3071e472a..8181f9641a 100644 --- a/projects/RabbitMQ.Client/client/impl/SessionManager.cs +++ b/projects/RabbitMQ.Client/client/impl/SessionManager.cs @@ -39,14 +39,18 @@ namespace RabbitMQ.Client.Impl internal class SessionManager { public readonly ushort ChannelMax; + + private readonly uint _maxInboundMessageBodySize; private readonly IntAllocator _ints; private readonly Connection _connection; private readonly Dictionary _sessionMap = new Dictionary(); - public SessionManager(Connection connection, ushort channelMax) + public SessionManager(Connection connection, + ushort channelMax, uint maxInboundMessageBodySize) { _connection = connection; ChannelMax = (channelMax == 0) ? ushort.MaxValue : channelMax; + _maxInboundMessageBodySize = maxInboundMessageBodySize; _ints = new IntAllocator(1, ChannelMax); } @@ -71,7 +75,8 @@ public ISession Create() throw new ChannelAllocationException(); } - ISession session = new Session(_connection, (ushort)channelNumber); + ISession session = new Session(_connection, + (ushort)channelNumber, _maxInboundMessageBodySize); session.SessionShutdown += HandleSessionShutdown; _sessionMap[channelNumber] = session; return session; diff --git a/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs b/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs index f221a294bd..6ae1681071 100644 --- a/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs +++ b/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs @@ -280,12 +280,14 @@ await _pipeReader.CompleteAsync() public ValueTask ReadFrameAsync(CancellationToken mainLoopCancellationToken) { - return InboundFrame.ReadFromPipeAsync(_pipeReader, _amqpTcpEndpoint.MaxMessageSize, mainLoopCancellationToken); + return InboundFrame.ReadFromPipeAsync(_pipeReader, + _amqpTcpEndpoint.MaxInboundMessageBodySize, mainLoopCancellationToken); } public bool TryReadFrame(out InboundFrame frame) { - return InboundFrame.TryReadFrameFromPipe(_pipeReader, _amqpTcpEndpoint.MaxMessageSize, out frame); + return InboundFrame.TryReadFrameFromPipe(_pipeReader, + _amqpTcpEndpoint.MaxInboundMessageBodySize, out frame); } public async Task SendProtocolHeaderAsync(CancellationToken cancellationToken) diff --git a/projects/Test/Integration/Integration.csproj b/projects/Test/Integration/Integration.csproj index 1d532b9ff2..7322f4b3d4 100644 --- a/projects/Test/Integration/Integration.csproj +++ b/projects/Test/Integration/Integration.csproj @@ -16,7 +16,7 @@ ../../rabbit.snk true true - 7.3 + 8.0 diff --git a/projects/Test/Integration/TestBasicPublish.cs b/projects/Test/Integration/TestBasicPublish.cs index 7bab6cfb41..1578a9f277 100644 --- a/projects/Test/Integration/TestBasicPublish.cs +++ b/projects/Test/Integration/TestBasicPublish.cs @@ -174,19 +174,22 @@ public async Task CanNotModifyPayloadAfterPublish() } [Fact] - public async Task TestMaxMessageSize() + public async Task TestMaxInboundMessageBodySize() { - var re = new ManualResetEventSlim(); - const ushort maxMsgSize = 1024; + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + using var cts = new CancellationTokenSource(WaitSpan); + using CancellationTokenRegistration ctr = cts.Token.Register(() => tcs.SetCanceled()); + + const ushort maxMsgSize = 8192; int count = 0; byte[] msg0 = _encoding.GetBytes("hi"); byte[] msg1 = GetRandomBody(maxMsgSize * 2); - var cf = CreateConnectionFactory(); + ConnectionFactory cf = CreateConnectionFactory(); cf.AutomaticRecoveryEnabled = false; cf.TopologyRecoveryEnabled = false; - cf.MaxMessageSize = maxMsgSize; + cf.MaxInboundMessageBodySize = maxMsgSize; bool sawConnectionShutdown = false; bool sawChannelShutdown = false; @@ -200,9 +203,9 @@ public async Task TestMaxMessageSize() sawConnectionShutdown = true; }; - Assert.Equal(maxMsgSize, cf.MaxMessageSize); - Assert.Equal(maxMsgSize, cf.Endpoint.MaxMessageSize); - Assert.Equal(maxMsgSize, c.Endpoint.MaxMessageSize); + Assert.Equal(maxMsgSize, cf.MaxInboundMessageBodySize); + Assert.Equal(maxMsgSize, cf.Endpoint.MaxInboundMessageBodySize); + Assert.Equal(maxMsgSize, c.Endpoint.MaxInboundMessageBodySize); using (IChannel channel = await c.CreateChannelAsync()) { @@ -222,7 +225,7 @@ public async Task TestMaxMessageSize() consumer.Shutdown += (o, a) => { - re.Set(); + tcs.SetResult(true); }; consumer.Registered += (o, a) => @@ -249,7 +252,7 @@ public async Task TestMaxMessageSize() await channel.BasicPublishAsync("", q.QueueName, msg0); await channel.BasicPublishAsync("", q.QueueName, msg1); - Assert.True(re.Wait(TimeSpan.FromSeconds(5))); + Assert.True(await tcs.Task); Assert.Equal(1, count); Assert.True(sawConnectionShutdown); diff --git a/projects/Test/Integration/TestConnectionFactory.cs b/projects/Test/Integration/TestConnectionFactory.cs index 660517040e..8dfed9650c 100644 --- a/projects/Test/Integration/TestConnectionFactory.cs +++ b/projects/Test/Integration/TestConnectionFactory.cs @@ -65,7 +65,7 @@ public void TestProperties() string v = "vhost"; string h = "192.168.0.1"; int p = 5674; - uint mms = 512 * 1024 * 1024; + uint mms = 64 * 1024 * 1024; var cf = new ConnectionFactory { @@ -74,19 +74,19 @@ public void TestProperties() VirtualHost = v, HostName = h, Port = p, - MaxMessageSize = mms + MaxInboundMessageBodySize = mms }; - Assert.Equal(cf.UserName, u); - Assert.Equal(cf.Password, pw); - Assert.Equal(cf.VirtualHost, v); - Assert.Equal(cf.HostName, h); - Assert.Equal(cf.Port, p); - Assert.Equal(cf.MaxMessageSize, mms); + Assert.Equal(u, cf.UserName); + Assert.Equal(pw, cf.Password); + Assert.Equal(v, cf.VirtualHost); + Assert.Equal(h, cf.HostName); + Assert.Equal(p, cf.Port); + Assert.Equal(mms, cf.MaxInboundMessageBodySize); - Assert.Equal(cf.Endpoint.HostName, h); - Assert.Equal(cf.Endpoint.Port, p); - Assert.Equal(cf.Endpoint.MaxMessageSize, mms); + Assert.Equal(h, cf.Endpoint.HostName); + Assert.Equal(p, cf.Endpoint.Port); + Assert.Equal(mms, cf.Endpoint.MaxInboundMessageBodySize); } [Fact] @@ -234,12 +234,12 @@ public async Task TestCreateConnectionUsesDefaultMaxMessageSize() cf.AutomaticRecoveryEnabled = true; cf.HostName = "localhost"; - Assert.Equal(ConnectionFactory.DefaultMaxMessageSize, cf.MaxMessageSize); - Assert.Equal(ConnectionFactory.DefaultMaxMessageSize, cf.Endpoint.MaxMessageSize); + Assert.Equal(ConnectionFactory.DefaultMaxInboundMessageBodySize, cf.MaxInboundMessageBodySize); + Assert.Equal(ConnectionFactory.DefaultMaxInboundMessageBodySize, cf.Endpoint.MaxInboundMessageBodySize); using (IConnection conn = await cf.CreateConnectionAsync()) { - Assert.Equal(ConnectionFactory.DefaultMaxMessageSize, conn.Endpoint.MaxMessageSize); + Assert.Equal(ConnectionFactory.DefaultMaxInboundMessageBodySize, conn.Endpoint.MaxInboundMessageBodySize); await conn.CloseAsync(); } } @@ -339,17 +339,19 @@ public async Task TestCreateConnectionUsesValidEndpointWhenMultipleSupplied() [Fact] public void TestCreateAmqpTCPEndPointOverridesMaxMessageSizeWhenGreaterThanMaximumAllowed() { - _ = new AmqpTcpEndpoint("localhost", -1, new SslOption(), ConnectionFactory.MaximumMaxMessageSize); + var ep = new AmqpTcpEndpoint("localhost", -1, new SslOption(), + 2 * InternalConstants.DefaultRabbitMqMaxInboundMessageBodySize); + Assert.Equal(InternalConstants.DefaultRabbitMqMaxInboundMessageBodySize, ep.MaxInboundMessageBodySize); } [Fact] public async Task TestCreateConnectionUsesConfiguredMaxMessageSize() { ConnectionFactory cf = CreateConnectionFactory(); - cf.MaxMessageSize = 1500; + cf.MaxInboundMessageBodySize = 1500; using (IConnection conn = await cf.CreateConnectionAsync()) { - Assert.Equal(cf.MaxMessageSize, conn.Endpoint.MaxMessageSize); + Assert.Equal(cf.MaxInboundMessageBodySize, conn.Endpoint.MaxInboundMessageBodySize); await conn.CloseAsync(); } } @@ -357,12 +359,12 @@ public async Task TestCreateConnectionUsesConfiguredMaxMessageSize() public async Task TestCreateConnectionWithAmqpEndpointListUsesAmqpTcpEndpointMaxMessageSize() { ConnectionFactory cf = CreateConnectionFactory(); - cf.MaxMessageSize = 1500; + cf.MaxInboundMessageBodySize = 1500; var ep = new AmqpTcpEndpoint("localhost"); - Assert.Equal(ConnectionFactory.DefaultMaxMessageSize, ep.MaxMessageSize); + Assert.Equal(ConnectionFactory.DefaultMaxInboundMessageBodySize, ep.MaxInboundMessageBodySize); using (IConnection conn = await cf.CreateConnectionAsync(new List { ep })) { - Assert.Equal(ConnectionFactory.DefaultMaxMessageSize, conn.Endpoint.MaxMessageSize); + Assert.Equal(ConnectionFactory.DefaultMaxInboundMessageBodySize, conn.Endpoint.MaxInboundMessageBodySize); await conn.CloseAsync(); } } @@ -371,11 +373,11 @@ public async Task TestCreateConnectionWithAmqpEndpointListUsesAmqpTcpEndpointMax public async Task TestCreateConnectionWithAmqpEndpointResolverUsesAmqpTcpEndpointMaxMessageSize() { ConnectionFactory cf = CreateConnectionFactory(); - cf.MaxMessageSize = 1500; + cf.MaxInboundMessageBodySize = 1500; var ep = new AmqpTcpEndpoint("localhost", -1, new SslOption(), 1200); using (IConnection conn = await cf.CreateConnectionAsync(new List { ep })) { - Assert.Equal(ep.MaxMessageSize, conn.Endpoint.MaxMessageSize); + Assert.Equal(ep.MaxInboundMessageBodySize, conn.Endpoint.MaxInboundMessageBodySize); await conn.CloseAsync(); } } @@ -384,10 +386,10 @@ public async Task TestCreateConnectionWithAmqpEndpointResolverUsesAmqpTcpEndpoin public async Task TestCreateConnectionWithHostnameListUsesConnectionFactoryMaxMessageSize() { ConnectionFactory cf = CreateConnectionFactory(); - cf.MaxMessageSize = 1500; + cf.MaxInboundMessageBodySize = 1500; using (IConnection conn = await cf.CreateConnectionAsync(new List { "localhost" })) { - Assert.Equal(cf.MaxMessageSize, conn.Endpoint.MaxMessageSize); + Assert.Equal(cf.MaxInboundMessageBodySize, conn.Endpoint.MaxInboundMessageBodySize); await conn.CloseAsync(); } }