diff --git a/projects/RabbitMQ.Client/client/api/IChannel.cs b/projects/RabbitMQ.Client/client/api/IChannel.cs index 3f7f4693e2..4597269ccf 100644 --- a/projects/RabbitMQ.Client/client/api/IChannel.cs +++ b/projects/RabbitMQ.Client/client/api/IChannel.cs @@ -276,6 +276,16 @@ ValueTask BasicPublishAsync(CachedString exchange, CachedString rou /// void ExchangeBind(string destination, string source, string routingKey, IDictionary arguments); + /// + /// Asynchronously binds an exchange to an exchange. + /// + /// + /// + /// Routing key must be shorter than 255 bytes. + /// + /// + ValueTask ExchangeBindAsync(string destination, string source, string routingKey, IDictionary arguments); + /// /// Like ExchangeBind but sets nowait to true. /// @@ -289,10 +299,17 @@ ValueTask BasicPublishAsync(CachedString exchange, CachedString rou /// Declare an exchange. /// /// The exchange is declared non-passive and non-internal. - /// The "nowait" option is not exercised. + /// The "nowait" option is not used. /// void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary arguments); + /// Asynchronously declare an exchange. + /// + /// The exchange is declared non-passive and non-internal. + /// The "nowait" option is not exercised. + /// + ValueTask ExchangeDeclareAsync(string exchange, string type, bool durable, bool autoDelete, IDictionary arguments); + /// /// Same as ExchangeDeclare but sets nowait to true and returns void (as there /// will be no response from the server). @@ -315,6 +332,19 @@ ValueTask BasicPublishAsync(CachedString exchange, CachedString rou /// void ExchangeDelete(string exchange, bool ifUnused); + /* + * TODO LRB rabbitmq/rabbitmq-dotnet-client#1347 + /// + /// Asynchronously delete an exchange. + /// + ValueTask ExchangeDeleteAsync(string exchange, bool ifUnused); + */ + + /// + /// Asynchronously delete an exchange. + /// + ValueTask ExchangeDeleteAsync(string exchange, bool ifUnused); + /// /// Like ExchangeDelete but sets nowait to true. /// @@ -411,17 +441,30 @@ ValueTask BasicPublishAsync(CachedString exchange, CachedString rou uint ConsumerCount(string queue); /// - /// Delete a queue. + /// Deletes a queue. See the Queues guide to learn more. + /// + /// The name of the queue. + /// Only delete the queue if it is unused. + /// Only delete the queue if it is empty. + /// Returns the number of messages purged during deletion. + uint QueueDelete(string queue, bool ifUnused, bool ifEmpty); + + /// + /// Asynchronously deletes a queue. See the Queues guide to learn more. /// /// ///Returns the number of messages purged during queue deletion. /// - uint QueueDelete(string queue, bool ifUnused, bool ifEmpty); + ValueTask QueueDeleteAsync(string queue, bool ifUnused, bool ifEmpty); /// ///Same as QueueDelete but sets nowait parameter to true ///and returns void (as there will be no response from the server) /// + /// The name of the queue. + /// Only delete the queue if it is unused. + /// Only delete the queue if it is empty. + /// Returns the number of messages purged during deletion. void QueueDeleteNoWait(string queue, bool ifUnused, bool ifEmpty); /// diff --git a/projects/RabbitMQ.Client/client/api/QueueDeleteOk.cs b/projects/RabbitMQ.Client/client/api/QueueDeleteOk.cs new file mode 100644 index 0000000000..49578a48b8 --- /dev/null +++ b/projects/RabbitMQ.Client/client/api/QueueDeleteOk.cs @@ -0,0 +1,55 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2020 VMware, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2020 VMware, Inc. All rights reserved. +//--------------------------------------------------------------------------- + +namespace RabbitMQ.Client +{ + /// + /// Represents Queue deletion information. + /// + public class QueueDeleteOk + { + private readonly uint _messageCount; + + /// + /// Creates a new instance of . + /// + /// Message count. + public QueueDeleteOk(uint messageCount) + { + _messageCount = messageCount; + } + + /// + /// Count of messages purged when queue was deleted. + /// + public uint MessageCount => _messageCount; + } +} diff --git a/projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs b/projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs new file mode 100644 index 0000000000..01ac486ad9 --- /dev/null +++ b/projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs @@ -0,0 +1,226 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2020 VMware, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2020 VMware, Inc. All rights reserved. +//--------------------------------------------------------------------------- + +using System; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; +using RabbitMQ.Client.client.framing; +using RabbitMQ.Client.Exceptions; +using RabbitMQ.Client.Framing.Impl; + +namespace RabbitMQ.Client.Impl +{ + internal abstract class AsyncRpcContinuation : IRpcContinuation, IDisposable + { + private readonly CancellationTokenSource _ct; + + protected readonly TaskCompletionSource _tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + private bool _disposedValue; + + public AsyncRpcContinuation(TimeSpan continuationTimeout) + { + _ct = new CancellationTokenSource(continuationTimeout); + _ct.Token.Register(() => + { + if (_tcs.TrySetCanceled()) + { + // TODO LRB #1347 + // Cancellation was successful, does this mean we should set a TimeoutException + // in the same manner as BlockingCell? + } + }, useSynchronizationContext: false); + } + + public TaskAwaiter GetAwaiter() => _tcs.Task.GetAwaiter(); + + // TODO LRB #1347 + // What to do if setting a result fails? + public abstract void HandleCommand(in IncomingCommand cmd); + + public void HandleChannelShutdown(ShutdownEventArgs reason) => _tcs.SetException(new OperationInterruptedException(reason)); + + protected virtual void Dispose(bool disposing) + { + if (!_disposedValue) + { + if (disposing) + { + _ct.Dispose(); + } + + _disposedValue = true; + } + } + + public void Dispose() + { + Dispose(disposing: true); + GC.SuppressFinalize(this); + } + } + + internal class ConnectionSecureOrTuneContinuation : AsyncRpcContinuation + { + public ConnectionSecureOrTuneContinuation(TimeSpan continuationTimeout) : base(continuationTimeout) + { + } + + public override void HandleCommand(in IncomingCommand cmd) + { + try + { + if (cmd.CommandId == ProtocolCommandId.ConnectionSecure) + { + var secure = new ConnectionSecure(cmd.MethodBytes.Span); + _tcs.TrySetResult(new ConnectionSecureOrTune { m_challenge = secure._challenge }); + } + else if (cmd.CommandId == ProtocolCommandId.ConnectionTune) + { + var tune = new ConnectionTune(cmd.MethodBytes.Span); + // TODO LRB #1347 + // What to do if setting a result fails? + _tcs.TrySetResult(new ConnectionSecureOrTune + { + m_tuneDetails = new() { m_channelMax = tune._channelMax, m_frameMax = tune._frameMax, m_heartbeatInSeconds = tune._heartbeat } + }); + } + else + { + _tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!")); + } + } + finally + { + cmd.ReturnMethodBuffer(); + } + } + } + + internal class SimpleAsyncRpcContinuation : AsyncRpcContinuation + { + private readonly ProtocolCommandId _expectedCommandId; + + public SimpleAsyncRpcContinuation(ProtocolCommandId expectedCommandId, TimeSpan continuationTimeout) : base(continuationTimeout) + { + _expectedCommandId = expectedCommandId; + } + + public override void HandleCommand(in IncomingCommand cmd) + { + try + { + if (cmd.CommandId == _expectedCommandId) + { + _tcs.TrySetResult(true); + } + else + { + _tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!")); + } + } + finally + { + cmd.ReturnMethodBuffer(); + } + } + } + + internal class ExchangeDeclareAsyncRpcContinuation : SimpleAsyncRpcContinuation + { + public ExchangeDeclareAsyncRpcContinuation(TimeSpan continuationTimeout) : base(ProtocolCommandId.ExchangeDeclareOk, continuationTimeout) + { + } + } + + internal class ExchangeDeleteAsyncRpcContinuation : SimpleAsyncRpcContinuation + { + public ExchangeDeleteAsyncRpcContinuation(TimeSpan continuationTimeout) : base(ProtocolCommandId.ExchangeDeleteOk, continuationTimeout) + { + } + } + + internal class QueueDeclareAsyncRpcContinuation : AsyncRpcContinuation + { + public QueueDeclareAsyncRpcContinuation(TimeSpan continuationTimeout) : base(continuationTimeout) + { + } + + public override void HandleCommand(in IncomingCommand cmd) + { + try + { + var method = new Client.Framing.Impl.QueueDeclareOk(cmd.MethodBytes.Span); + var result = new QueueDeclareOk(method._queue, method._messageCount, method._consumerCount); + if (cmd.CommandId == ProtocolCommandId.QueueDeclareOk) + { + _tcs.TrySetResult(result); + } + else + { + _tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!")); + } + } + finally + { + cmd.ReturnMethodBuffer(); + } + } + } + + internal class QueueDeleteAsyncRpcContinuation : AsyncRpcContinuation + { + public QueueDeleteAsyncRpcContinuation(TimeSpan continuationTimeout) : base(continuationTimeout) + { + } + + public override void HandleCommand(in IncomingCommand cmd) + { + try + { + var result = new Client.Framing.Impl.QueueDeleteOk(cmd.MethodBytes.Span); + if (cmd.CommandId == ProtocolCommandId.QueueDeleteOk) + { + _tcs.TrySetResult(result); + } + else + { + _tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!")); + } + } + finally + { + cmd.ReturnMethodBuffer(); + } + } + } +} diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs index ed3a60efc9..38f7d71695 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs @@ -372,6 +372,13 @@ public void ExchangeDeclare(string exchange, string type, bool durable, bool aut _connection.RecordExchange(new RecordedExchange(exchange, type, durable, autoDelete, arguments)); } + public async ValueTask ExchangeDeclareAsync(string exchange, string type, bool durable, bool autoDelete, IDictionary arguments) + { + ThrowIfDisposed(); + await _innerChannel.ExchangeDeclareAsync(exchange, type, durable, autoDelete, arguments); + _connection.RecordExchange(new RecordedExchange(exchange, type, durable, autoDelete, arguments)); + } + public void ExchangeDeclareNoWait(string exchange, string type, bool durable, bool autoDelete, IDictionary arguments) { ThrowIfDisposed(); @@ -388,6 +395,12 @@ public void ExchangeDelete(string exchange, bool ifUnused) _connection.DeleteRecordedExchange(exchange); } + public async ValueTask ExchangeDeleteAsync(string exchange, bool ifUnused) + { + await InnerChannel.ExchangeDeleteAsync(exchange, ifUnused); + _connection.DeleteRecordedExchange(exchange); + } + public void ExchangeDeleteNoWait(string exchange, bool ifUnused) { InnerChannel.ExchangeDeleteNoWait(exchange, ifUnused); @@ -455,12 +468,28 @@ public uint QueueDelete(string queue, bool ifUnused, bool ifEmpty) return result; } + public async ValueTask QueueDeleteAsync(string queue, bool ifUnused, bool ifEmpty) + { + ThrowIfDisposed(); + uint result = await _innerChannel.QueueDeleteAsync(queue, ifUnused, ifEmpty); + _connection.DeleteRecordedQueue(queue); + return result; + } + public void QueueDeleteNoWait(string queue, bool ifUnused, bool ifEmpty) { InnerChannel.QueueDeleteNoWait(queue, ifUnused, ifEmpty); _connection.DeleteRecordedQueue(queue); } + public async ValueTask QueueDeleteAsync(string queue, bool ifUnused, bool ifEmpty) + { + ThrowIfDisposed(); + uint result = await _innerChannel.QueueDeleteAsync(queue, ifUnused, ifEmpty); + _connection.DeleteRecordedQueue(queue); + return result; + } + public uint QueuePurge(string queue) => InnerChannel.QueuePurge(queue); diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index decf6ac84f..aa90c1073d 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -32,6 +32,7 @@ using System; using System.Buffers; using System.Collections.Generic; +using System.Diagnostics; using System.IO; using System.Runtime.CompilerServices; using System.Text; @@ -198,6 +199,7 @@ public void Close(ushort replyCode, string replyText, bool abort) private async Task CloseAsync(ShutdownEventArgs reason, bool abort) { + // TODO LRB #1347 use async continuation var k = new ShutdownContinuation(); ChannelShutdown += k.OnConnectionShutdown; @@ -246,7 +248,7 @@ internal async ValueTask ConnectionOpenAsync(string virtualHost) internal async ValueTask ConnectionSecureOkAsync(byte[] response) { - var k = new ConnectionSecureOrTuneContinuation(); + using var k = new ConnectionSecureOrTuneContinuation(ContinuationTimeout); await _rpcSemaphore.WaitAsync().ConfigureAwait(false); Enqueue(k); try @@ -273,7 +275,7 @@ internal async ValueTask ConnectionSecureOkAsync(byte[] internal async ValueTask ConnectionStartOkAsync(IDictionary clientProperties, string mechanism, byte[] response, string locale) { - var k = new ConnectionSecureOrTuneContinuation(); + using var k = new ConnectionSecureOrTuneContinuation(ContinuationTimeout); await _rpcSemaphore.WaitAsync().ConfigureAwait(false); Enqueue(k); try @@ -468,7 +470,7 @@ private void OnSessionShutdown(object sender, ShutdownEventArgs reason) internal bool SetCloseReason(ShutdownEventArgs reason) { - return System.Threading.Interlocked.CompareExchange(ref _closeReason, reason, null) is null; + return Interlocked.CompareExchange(ref _closeReason, reason, null) is null; } public override string ToString() @@ -485,6 +487,7 @@ protected virtual void Dispose(bool disposing) { // dispose managed resources this.Abort(); + _rpcSemaphore.Dispose(); } // dispose unmanaged resources @@ -755,7 +758,7 @@ protected void HandleConnectionClose(in IncomingCommand cmd) protected void HandleConnectionSecure(in IncomingCommand cmd) { - var k = (ConnectionSecureOrTuneContinuation)_continuationQueue.Next(); + using var k = (ConnectionSecureOrTuneContinuation)_continuationQueue.Next(); k.HandleCommand(IncomingCommand.Empty); // release the continuation. } @@ -783,7 +786,7 @@ protected void HandleConnectionStart(in IncomingCommand cmd) protected void HandleConnectionTune(in IncomingCommand cmd) { - var k = (ConnectionSecureOrTuneContinuation)_continuationQueue.Next(); + using var k = (ConnectionSecureOrTuneContinuation)_continuationQueue.Next(); k.HandleCommand(cmd); // release the continuation. } @@ -1059,6 +1062,11 @@ public void ExchangeDeclare(string exchange, string type, bool durable, bool aut _Private_ExchangeDeclare(exchange, type, false, durable, autoDelete, false, false, arguments); } + public ValueTask ExchangeDeclareAsync(string exchange, string type, bool durable, bool autoDelete, IDictionary arguments) + { + return DoExchangeDeclareAsync(exchange, type, false, durable, autoDelete, arguments); + } + public void ExchangeDeclareNoWait(string exchange, string type, bool durable, bool autoDelete, IDictionary arguments) { _Private_ExchangeDeclare(exchange, type, false, durable, autoDelete, false, true, arguments); @@ -1074,6 +1082,11 @@ public void ExchangeDelete(string exchange, bool ifUnused) _Private_ExchangeDelete(exchange, ifUnused, false); } + public ValueTask ExchangeDeleteAsync(string exchange, bool ifUnused) + { + return DoExchangeDeleteAsync(exchange, ifUnused); + } + public void ExchangeDeleteNoWait(string exchange, bool ifUnused) { _Private_ExchangeDelete(exchange, ifUnused, true); @@ -1101,12 +1114,12 @@ public void QueueBindNoWait(string queue, string exchange, string routingKey, ID public QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary arguments) { - return QueueDeclare(queue, false, durable, exclusive, autoDelete, arguments); + return DoQueueDeclare(queue, false, durable, exclusive, autoDelete, arguments); } public ValueTask QueueDeclareAsync(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary arguments) { - return QueueDeclareAsync(queue, false, durable, exclusive, autoDelete, arguments); + return DoQueueDeclareAsync(queue, false, durable, exclusive, autoDelete, arguments); } public void QueueDeclareNoWait(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary arguments) @@ -1116,7 +1129,7 @@ public void QueueDeclareNoWait(string queue, bool durable, bool exclusive, bool public QueueDeclareOk QueueDeclarePassive(string queue) { - return QueueDeclare(queue, true, false, false, false, null); + return DoQueueDeclare(queue, true, false, false, false, null); } public uint MessageCount(string queue) @@ -1136,11 +1149,36 @@ public uint QueueDelete(string queue, bool ifUnused, bool ifEmpty) return _Private_QueueDelete(queue, ifUnused, ifEmpty, false); } + public async ValueTask QueueDeleteAsync(string queue, bool ifUnused, bool ifEmpty) + { + var k = new QueueDeleteAsyncRpcContinuation(); + await _rpcSemaphore.WaitAsync().ConfigureAwait(false); + try + { + Enqueue(k); + + var method = new QueueDelete(queue, ifUnused, ifEmpty, false); + await ModelSendAsync(method).ConfigureAwait(false); + + QueueDeleteOk result = await k; + return result.MessageCount; + } + finally + { + _rpcSemaphore.Release(); + } + } + public void QueueDeleteNoWait(string queue, bool ifUnused, bool ifEmpty) { _Private_QueueDelete(queue, ifUnused, ifEmpty, true); } + public ValueTask QueueDeleteAsync(string queue, bool ifUnused, bool ifEmpty) + { + return DoQueueDeleteAsync(queue, ifUnused, ifEmpty); + } + public uint QueuePurge(string queue) { return _Private_QueuePurge(queue, false); @@ -1237,7 +1275,49 @@ await CloseAsync(new ShutdownEventArgs(ShutdownInitiator.Library, } } - private QueueDeclareOk QueueDeclare(string queue, bool passive, bool durable, bool exclusive, bool autoDelete, IDictionary arguments) + private async ValueTask DoExchangeDeclareAsync(string exchange, string type, bool passive, bool durable, bool autoDelete, IDictionary arguments) + { + using var k = new ExchangeDeclareAsyncRpcContinuation(ContinuationTimeout); + await _rpcSemaphore.WaitAsync().ConfigureAwait(false); + try + { + Enqueue(k); + + var method = new ExchangeDeclare(exchange, type, passive, durable, autoDelete, false, false, arguments); + await ModelSendAsync(method).ConfigureAwait(false); + + bool result = await k; + Debug.Assert(result); + return; + } + finally + { + _rpcSemaphore.Release(); + } + } + + private async ValueTask DoExchangeDeleteAsync(string exchange, bool ifUnused) + { + using var k = new ExchangeDeleteAsyncRpcContinuation(ContinuationTimeout); + await _rpcSemaphore.WaitAsync().ConfigureAwait(false); + try + { + Enqueue(k); + + var method = new ExchangeDelete(exchange, ifUnused, Nowait: false); + await ModelSendAsync(method).ConfigureAwait(false); + + bool result = await k; + Debug.Assert(result); + return; + } + finally + { + _rpcSemaphore.Release(); + } + } + + private QueueDeclareOk DoQueueDeclare(string queue, bool passive, bool durable, bool exclusive, bool autoDelete, IDictionary arguments) { var k = new QueueDeclareRpcContinuation(); @@ -1258,9 +1338,9 @@ private QueueDeclareOk QueueDeclare(string queue, bool passive, bool durable, bo return result; } - private async ValueTask QueueDeclareAsync(string queue, bool passive, bool durable, bool exclusive, bool autoDelete, IDictionary arguments) + private async ValueTask DoQueueDeclareAsync(string queue, bool passive, bool durable, bool exclusive, bool autoDelete, IDictionary arguments) { - var k = new QueueDeclareAsyncRpcContinuation(); + using var k = new QueueDeclareAsyncRpcContinuation(ContinuationTimeout); await _rpcSemaphore.WaitAsync().ConfigureAwait(false); try { @@ -1279,48 +1359,23 @@ private async ValueTask QueueDeclareAsync(string queue, bool pas } } - public class BasicConsumerRpcContinuation : SimpleBlockingRpcContinuation - { - public IBasicConsumer m_consumer; - public string m_consumerTag; - } - - public class BasicGetRpcContinuation : SimpleBlockingRpcContinuation + private async ValueTask DoQueueDeleteAsync(string queue, bool ifUnused, bool ifEmpty) { - public BasicGetResult m_result; - } - - public class ConnectionStartRpcContinuation : SimpleBlockingRpcContinuation - { - public ConnectionSecureOrTune m_result; - } + using var k = new QueueDeleteAsyncRpcContinuation(ContinuationTimeout); + await _rpcSemaphore.WaitAsync().ConfigureAwait(false); + try + { + Enqueue(k); - public class QueueDeclareRpcContinuation : SimpleBlockingRpcContinuation - { - public QueueDeclareOk m_result; - } + var method = new QueueDelete(queue, ifUnused, ifEmpty, Nowait: false); + await ModelSendAsync(method).ConfigureAwait(false); - public class QueueDeclareAsyncRpcContinuation : AsyncRpcContinuation - { - public override void HandleCommand(in IncomingCommand cmd) + QueueDeleteOk result = await k; + return result._messageCount; + } + finally { - try - { - var method = new Client.Framing.Impl.QueueDeclareOk(cmd.MethodBytes.Span); - var result = new QueueDeclareOk(method._queue, method._messageCount, method._consumerCount); - if (cmd.CommandId == ProtocolCommandId.QueueDeclareOk) - { - _tcs.TrySetResult(result); - } - else - { - _tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!")); - } - } - finally - { - cmd.ReturnMethodBuffer(); - } + _rpcSemaphore.Release(); } } } diff --git a/projects/RabbitMQ.Client/client/impl/SimpleBlockingRpcContinuation.cs b/projects/RabbitMQ.Client/client/impl/RpcContinuations.cs similarity index 62% rename from projects/RabbitMQ.Client/client/impl/SimpleBlockingRpcContinuation.cs rename to projects/RabbitMQ.Client/client/impl/RpcContinuations.cs index bb40f6be82..5db9654d22 100644 --- a/projects/RabbitMQ.Client/client/impl/SimpleBlockingRpcContinuation.cs +++ b/projects/RabbitMQ.Client/client/impl/RpcContinuations.cs @@ -30,57 +30,11 @@ //--------------------------------------------------------------------------- using System; -using System.Runtime.CompilerServices; -using System.Threading.Tasks; -using RabbitMQ.Client.client.framing; using RabbitMQ.Client.Exceptions; -using RabbitMQ.Client.Framing.Impl; using RabbitMQ.Util; namespace RabbitMQ.Client.Impl { - internal abstract class AsyncRpcContinuation : IRpcContinuation - { - protected readonly TaskCompletionSource _tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - - public TaskAwaiter GetAwaiter() => _tcs.Task.GetAwaiter(); - - public abstract void HandleCommand(in IncomingCommand cmd); - - public void HandleChannelShutdown(ShutdownEventArgs reason) => _tcs.SetException(new OperationInterruptedException(reason)); - } - - internal class ConnectionSecureOrTuneContinuation : AsyncRpcContinuation - { - public override void HandleCommand(in IncomingCommand cmd) - { - try - { - if (cmd.CommandId == ProtocolCommandId.ConnectionSecure) - { - var secure = new ConnectionSecure(cmd.MethodBytes.Span); - _tcs.TrySetResult(new ConnectionSecureOrTune { m_challenge = secure._challenge }); - } - else if (cmd.CommandId == ProtocolCommandId.ConnectionTune) - { - var tune = new ConnectionTune(cmd.MethodBytes.Span); - _tcs.TrySetResult(new ConnectionSecureOrTune - { - m_tuneDetails = new() { m_channelMax = tune._channelMax, m_frameMax = tune._frameMax, m_heartbeatInSeconds = tune._heartbeat } - }); - } - else - { - _tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!")); - } - } - finally - { - cmd.ReturnMethodBuffer(); - } - } - } - internal class SimpleBlockingRpcContinuation : IRpcContinuation { private readonly BlockingCell> m_cell = new BlockingCell>(); @@ -121,4 +75,20 @@ public void HandleChannelShutdown(ShutdownEventArgs reason) m_cell.ContinueWithValue(Either.Right(reason)); } } + + internal class BasicConsumerRpcContinuation : SimpleBlockingRpcContinuation + { + public IBasicConsumer m_consumer; + public string m_consumerTag; + } + + internal class BasicGetRpcContinuation : SimpleBlockingRpcContinuation + { + public BasicGetResult m_result; + } + + internal class QueueDeclareRpcContinuation : SimpleBlockingRpcContinuation + { + public QueueDeclareOk m_result; + } } diff --git a/projects/Unit/APIApproval.Approve.verified.txt b/projects/Unit/APIApproval.Approve.verified.txt index 8df115691b..a9e0824c64 100644 --- a/projects/Unit/APIApproval.Approve.verified.txt +++ b/projects/Unit/APIApproval.Approve.verified.txt @@ -432,9 +432,11 @@ namespace RabbitMQ.Client void ExchangeBind(string destination, string source, string routingKey, System.Collections.Generic.IDictionary arguments); void ExchangeBindNoWait(string destination, string source, string routingKey, System.Collections.Generic.IDictionary arguments); void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, System.Collections.Generic.IDictionary arguments); + System.Threading.Tasks.ValueTask ExchangeDeclareAsync(string exchange, string type, bool durable, bool autoDelete, System.Collections.Generic.IDictionary arguments); void ExchangeDeclareNoWait(string exchange, string type, bool durable, bool autoDelete, System.Collections.Generic.IDictionary arguments); void ExchangeDeclarePassive(string exchange); void ExchangeDelete(string exchange, bool ifUnused); + System.Threading.Tasks.ValueTask ExchangeDeleteAsync(string exchange, bool ifUnused); void ExchangeDeleteNoWait(string exchange, bool ifUnused); void ExchangeUnbind(string destination, string source, string routingKey, System.Collections.Generic.IDictionary arguments); void ExchangeUnbindNoWait(string destination, string source, string routingKey, System.Collections.Generic.IDictionary arguments); @@ -446,6 +448,7 @@ namespace RabbitMQ.Client void QueueDeclareNoWait(string queue, bool durable, bool exclusive, bool autoDelete, System.Collections.Generic.IDictionary arguments); RabbitMQ.Client.QueueDeclareOk QueueDeclarePassive(string queue); uint QueueDelete(string queue, bool ifUnused, bool ifEmpty); + System.Threading.Tasks.ValueTask QueueDeleteAsync(string queue, bool ifUnused, bool ifEmpty); void QueueDeleteNoWait(string queue, bool ifUnused, bool ifEmpty); uint QueuePurge(string queue); void QueueUnbind(string queue, string exchange, string routingKey, System.Collections.Generic.IDictionary arguments); diff --git a/projects/Unit/TestExchangeDeclare.cs b/projects/Unit/TestExchangeDeclare.cs index 555c58c1ad..3a59888d43 100644 --- a/projects/Unit/TestExchangeDeclare.cs +++ b/projects/Unit/TestExchangeDeclare.cs @@ -32,6 +32,7 @@ using System; using System.Collections.Generic; using System.Threading; +using System.Threading.Tasks; using Xunit; using Xunit.Abstractions; @@ -79,5 +80,41 @@ public void TestConcurrentExchangeDeclare() Assert.Null(nse); _channel.ExchangeDelete(x); } + + /* + * TODO LRB rabbitmq/rabbitmq-dotnet-client#1347 + [Fact] + public async void TestConcurrentExchangeDeclareAsync() + { + string x = GenerateExchangeName(); + var rnd = new Random(); + + var ts = new List(); + NotSupportedException nse = null; + for (int i = 0; i < 256; i++) + { + async Task f() + { + try + { + // sleep for a random amount of time to increase the chances + // of thread interleaving. MK. + await Task.Delay(rnd.Next(5, 50)); + await _channel.ExchangeDeclareAsync(x, "fanout", false, false, null); + } + catch (NotSupportedException e) + { + nse = e; + } + } + var t = Task.Run(f); + ts.Add(t); + } + + await Task.WhenAll(ts); + Assert.Null(nse); + await _channel.ExchangeDeleteAsync(x, false); + } + */ } } diff --git a/projects/Unit/TestQueueDeclare.cs b/projects/Unit/TestQueueDeclare.cs index 8896468527..738349e682 100644 --- a/projects/Unit/TestQueueDeclare.cs +++ b/projects/Unit/TestQueueDeclare.cs @@ -53,7 +53,6 @@ public async void TestQueueDeclareAsync() } [Fact] - [Trait("Category", "RequireSMP")] public void TestConcurrentQueueDeclare() { string q = GenerateQueueName(); @@ -91,7 +90,6 @@ public void TestConcurrentQueueDeclare() } [Fact] - [Trait("Category", "RequireSMP")] public async void TestConcurrentQueueDeclareAsync() { string q = GenerateQueueName(); @@ -121,7 +119,8 @@ async Task f() await Task.WhenAll(ts); Assert.Null(nse); - _channel.QueueDelete(q); + uint deletedMessageCount = await _channel.QueueDeleteAsync(q, false, false); + Assert.Equal((uint)0, deletedMessageCount); } } }