From f25ea2d3d5e51df93f6ff5f43f0f2ffac9ab002d Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 16 Jan 2020 15:52:53 -0800 Subject: [PATCH 1/2] Convert timeouts and other time intervals to TimeSpan Fixes #646 Convert more values to TimeSpan Finish conversion to TimeSpan Revert "Remove NuGet.Build.Tasks.Pack" This reverts commit 04518f156bd4a45b3902988715e9333c63dc7224. --- .../src/client/api/ConnectionFactory.cs | 43 ++++++----- .../src/client/api/IConnection.cs | 50 ++----------- .../src/client/api/IConnectionFactory.cs | 4 +- .../src/client/api/ITcpClient.cs | 2 +- .../client/impl/AutorecoveringConnection.cs | 28 +++---- .../src/client/impl/Connection.cs | 74 ++++++++----------- .../src/client/impl/IFrameHandler.cs | 10 +-- .../src/client/impl/IFullModel.cs | 2 +- .../src/client/impl/IProtocolExtensions.cs | 9 +-- .../src/client/impl/ModelBase.cs | 6 +- .../src/client/impl/SessionManager.cs | 2 +- .../src/client/impl/SocketFrameHandler.cs | 36 ++++----- .../src/client/impl/TcpClientAdapter.cs | 6 +- .../RabbitMQ.Client/src/util/BlockingCell.cs | 30 +------- .../src/unit/APIApproval.Approve.approved.txt | 56 +++++++------- projects/client/Unit/src/unit/Fixtures.cs | 10 +-- .../client/Unit/src/unit/TestBlockingCell.cs | 32 +++----- .../Unit/src/unit/TestFloodPublishing.cs | 2 +- .../client/Unit/src/unit/TestHeartbeats.cs | 8 +- .../client/Unit/src/unit/TestSharedQueue.cs | 26 ++++--- 20 files changed, 172 insertions(+), 264 deletions(-) diff --git a/projects/client/RabbitMQ.Client/src/client/api/ConnectionFactory.cs b/projects/client/RabbitMQ.Client/src/client/api/ConnectionFactory.cs index c557b4fe3f..eee2b068ba 100644 --- a/projects/client/RabbitMQ.Client/src/client/api/ConnectionFactory.cs +++ b/projects/client/RabbitMQ.Client/src/client/api/ConnectionFactory.cs @@ -103,9 +103,9 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IAsyncConnectionF public const ushort DefaultChannelMax = 2047; /// - /// Default value for connection attempt timeout, in milliseconds. + /// Default value for connection attempt timeout. /// - public const int DefaultConnectionTimeout = 30 * 1000; + public static readonly TimeSpan DefaultConnectionTimeout = TimeSpan.FromSeconds(30); /// /// Default value for the desired maximum frame size. Default is 0 ("no limit"). @@ -113,10 +113,10 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IAsyncConnectionF public const uint DefaultFrameMax = 0; /// - /// Default value for desired heartbeat interval, in seconds. Default is 60, - /// 0 means "heartbeats are disabled". + /// Default value for desired heartbeat interval. Default is 60 seconds, + /// TimeSpan.Zero means "heartbeats are disabled". /// - public const ushort DefaultHeartbeat = 60; // + public static readonly TimeSpan DefaultHeartbeat = TimeSpan.FromSeconds(60); // /// /// Default password (value: "guest"). @@ -227,19 +227,19 @@ public TimeSpan ContinuationTimeout public int Port { get; set; } = AmqpTcpEndpoint.UseDefaultPort; /// - /// Timeout setting for connection attempts (in milliseconds). + /// Timeout setting for connection attempts. /// - public int RequestedConnectionTimeout { get; set; } = DefaultConnectionTimeout; + public TimeSpan RequestedConnectionTimeout { get; set; } = DefaultConnectionTimeout; /// - /// Timeout setting for socket read operations (in milliseconds). + /// Timeout setting for socket read operations. /// - public int SocketReadTimeout { get; set; } = DefaultConnectionTimeout; + public TimeSpan SocketReadTimeout { get; set; } = DefaultConnectionTimeout; /// - /// Timeout setting for socket write operations (in milliseconds). + /// Timeout setting for socket write operations. /// - public int SocketWriteTimeout { get; set; } = DefaultConnectionTimeout; + public TimeSpan SocketWriteTimeout { get; set; } = DefaultConnectionTimeout; /// /// Ssl options setting. @@ -295,9 +295,9 @@ public AmqpTcpEndpoint Endpoint public uint RequestedFrameMax { get; set; } = DefaultFrameMax; /// - /// Heartbeat timeout to use when negotiating with the server (in seconds). + /// Heartbeat timeout to use when negotiating with the server. /// - public ushort RequestedHeartbeat { get; set; } = DefaultHeartbeat; + public TimeSpan RequestedHeartbeat { get; set; } = DefaultHeartbeat; /// /// When set to true, background thread will be used for the I/O loop. @@ -525,13 +525,22 @@ private IFrameHandler CreateFrameHandlerForHostname(string hostname) return CreateFrameHandler(this.Endpoint.CloneWithHostname(hostname)); } - private IFrameHandler ConfigureFrameHandler(IFrameHandler fh) { - // make sure socket timeouts are higher than heartbeat - fh.ReadTimeout = Math.Max(SocketReadTimeout, RequestedHeartbeat * 1000); - fh.WriteTimeout = Math.Max(SocketWriteTimeout, RequestedHeartbeat * 1000); // TODO: add user-provided configurator, like in the Java client + fh.ReadTimeout = RequestedHeartbeat; + fh.WriteTimeout = RequestedHeartbeat; + + if (SocketReadTimeout > RequestedHeartbeat) + { + fh.ReadTimeout = SocketReadTimeout; + } + + if (SocketWriteTimeout > RequestedHeartbeat) + { + fh.WriteTimeout = SocketWriteTimeout; + } + return fh; } diff --git a/projects/client/RabbitMQ.Client/src/client/api/IConnection.cs b/projects/client/RabbitMQ.Client/src/client/api/IConnection.cs index a7949e7569..4364ec213a 100644 --- a/projects/client/RabbitMQ.Client/src/client/api/IConnection.cs +++ b/projects/client/RabbitMQ.Client/src/client/api/IConnection.cs @@ -108,9 +108,9 @@ public interface IConnection : NetworkConnection, IDisposable uint FrameMax { get; } /// - /// The current heartbeat setting for this connection (0 for disabled), in seconds. + /// The current heartbeat setting for this connection (System.TimeSpan.Zero for disabled). /// - ushort Heartbeat { get; } + TimeSpan Heartbeat { get; } /// /// Returns true if the connection is still in a state where it can be used. @@ -213,22 +213,6 @@ public interface IConnection : NetworkConnection, IDisposable /// void Abort(ushort reasonCode, string reasonText); - /// - /// Abort this connection and all its channels and wait with a - /// timeout for all the in-progress close operations to complete. - /// - /// - /// This method, behaves in a similar way as method with the - /// only difference that it explictly specifies a timeout given - /// for all the in-progress close operations to complete. - /// If timeout is reached and the close operations haven't finished, then socket is forced to close. - /// - /// The timeout value is in milliseconds. - /// To wait infinitely for the close operations to complete use . - /// - /// - void Abort(int timeout); - /// /// Abort this connection and all its channels and wait with a /// timeout for all the in-progress close operations to complete. @@ -244,25 +228,6 @@ public interface IConnection : NetworkConnection, IDisposable /// void Abort(TimeSpan timeout); - /// - /// Abort this connection and all its channels and wait with a - /// timeout for all the in-progress close operations to complete. - /// - /// - /// The method behaves in the same way as , with the only - /// difference that the connection is closed with the given connection close code and message. - /// - /// The close code (See under "Reply Codes" in the AMQP 0-9-1 specification). - /// - /// - /// A message indicating the reason for closing the connection. - /// - /// - /// Operation timeout in milliseconds. - /// - /// - void Abort(ushort reasonCode, string reasonText, int timeout); - /// /// Abort this connection and all its channels and wait with a /// timeout for all the in-progress close operations to complete. @@ -319,18 +284,17 @@ public interface IConnection : NetworkConnection, IDisposable /// It can also throw when socket was closed unexpectedly. /// If timeout is reached and the close operations haven't finished, then socket is forced to close. /// - /// The timeout value is in milliseconds. - /// To wait infinitely for the close operations to complete use . + /// To wait infinitely for the close operations to complete use . /// /// - void Close(int timeout); + void Close(TimeSpan timeout); /// /// Close this connection and all its channels /// and wait with a timeout for all the in-progress close operations to complete. /// /// - /// The method behaves in the same way as , with the only + /// The method behaves in the same way as , with the only /// difference that the connection is closed with the given connection close code and message. /// /// The close code (See under "Reply Codes" in the AMQP 0-9-1 specification). @@ -339,10 +303,10 @@ public interface IConnection : NetworkConnection, IDisposable /// A message indicating the reason for closing the connection. /// /// - /// Operation timeout in milliseconds. + /// Operation timeout. /// /// - void Close(ushort reasonCode, string reasonText, int timeout); + void Close(ushort reasonCode, string reasonText, TimeSpan timeout); /// /// Create and return a fresh channel, session, and model. diff --git a/projects/client/RabbitMQ.Client/src/client/api/IConnectionFactory.cs b/projects/client/RabbitMQ.Client/src/client/api/IConnectionFactory.cs index 77d94e6028..170504ccc4 100644 --- a/projects/client/RabbitMQ.Client/src/client/api/IConnectionFactory.cs +++ b/projects/client/RabbitMQ.Client/src/client/api/IConnectionFactory.cs @@ -68,9 +68,9 @@ public interface IConnectionFactory uint RequestedFrameMax { get; set; } /// - /// Heartbeat setting to request (in seconds). + /// Heartbeat setting to request. /// - ushort RequestedHeartbeat { get; set; } + TimeSpan RequestedHeartbeat { get; set; } /// /// When set to true, background threads will be used for I/O and heartbeats. diff --git a/projects/client/RabbitMQ.Client/src/client/api/ITcpClient.cs b/projects/client/RabbitMQ.Client/src/client/api/ITcpClient.cs index d76bd815db..b7ab55f042 100644 --- a/projects/client/RabbitMQ.Client/src/client/api/ITcpClient.cs +++ b/projects/client/RabbitMQ.Client/src/client/api/ITcpClient.cs @@ -12,7 +12,7 @@ public interface ITcpClient : IDisposable { bool Connected { get; } - int ReceiveTimeout { get; set; } + TimeSpan ReceiveTimeout { get; set; } Socket Client { get; } diff --git a/projects/client/RabbitMQ.Client/src/client/impl/AutorecoveringConnection.cs b/projects/client/RabbitMQ.Client/src/client/impl/AutorecoveringConnection.cs index 63b44abfc3..f7fb7b160e 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/AutorecoveringConnection.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/AutorecoveringConnection.cs @@ -276,7 +276,7 @@ public uint FrameMax get { return m_delegate.FrameMax; } } - public ushort Heartbeat + public TimeSpan Heartbeat { get { return m_delegate.Heartbeat; } } @@ -599,28 +599,14 @@ public void Abort(ushort reasonCode, string reasonText) m_delegate.Abort(reasonCode, reasonText); } - ///API-side invocation of connection abort with timeout. - public void Abort(int timeout) - { - StopRecoveryLoop(); - if (m_delegate.IsOpen) - m_delegate.Abort(timeout); - } - ///API-side invocation of connection abort with timeout. public void Abort(TimeSpan timeout) { StopRecoveryLoop(); if (m_delegate.IsOpen) + { m_delegate.Abort(timeout); - } - - ///API-side invocation of connection abort with timeout. - public void Abort(ushort reasonCode, string reasonText, int timeout) - { - StopRecoveryLoop(); - if (m_delegate.IsOpen) - m_delegate.Abort(reasonCode, reasonText, timeout); + } } ///API-side invocation of connection abort with timeout. @@ -648,19 +634,23 @@ public void Close(ushort reasonCode, string reasonText) } ///API-side invocation of connection.close with timeout. - public void Close(int timeout) + public void Close(TimeSpan timeout) { StopRecoveryLoop(); if (m_delegate.IsOpen) + { m_delegate.Close(timeout); + } } ///API-side invocation of connection.close with timeout. - public void Close(ushort reasonCode, string reasonText, int timeout) + public void Close(ushort reasonCode, string reasonText, TimeSpan timeout) { StopRecoveryLoop(); if (m_delegate.IsOpen) + { m_delegate.Close(reasonCode, reasonText, timeout); + } } public IModel CreateModel() diff --git a/projects/client/RabbitMQ.Client/src/client/impl/Connection.cs b/projects/client/RabbitMQ.Client/src/client/impl/Connection.cs index 24249e5e69..4fdd8a1c7c 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/Connection.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/Connection.cs @@ -89,7 +89,7 @@ public class Connection : IConnection // Heartbeats // - private ushort m_heartbeat = 0; + private TimeSpan m_heartbeat = TimeSpan.Zero; private TimeSpan m_heartbeatTimeSpan = TimeSpan.FromSeconds(0); private int m_missedHeartbeats = 0; @@ -280,7 +280,7 @@ public AmqpTcpEndpoint Endpoint public uint FrameMax { get; set; } - public ushort Heartbeat + public TimeSpan Heartbeat { get { return m_heartbeat; } set @@ -288,8 +288,8 @@ public ushort Heartbeat m_heartbeat = value; // timers fire at slightly below half the interval to avoid race // conditions - m_heartbeatTimeSpan = TimeSpan.FromMilliseconds((value * 1000) / 4); - m_frameHandler.ReadTimeout = value * 1000 * 2; + m_heartbeatTimeSpan = TimeSpan.FromMilliseconds(m_heartbeat.TotalMilliseconds / 4); + m_frameHandler.ReadTimeout = TimeSpan.FromMilliseconds(m_heartbeat.TotalMilliseconds * 2); } } @@ -352,16 +352,14 @@ public static IDictionary DefaultClientProperties() return table; } - public void Abort(ushort reasonCode, string reasonText, - ShutdownInitiator initiator, int timeout) + public void Abort(ushort reasonCode, string reasonText, ShutdownInitiator initiator, TimeSpan timeout) { - Close(new ShutdownEventArgs(initiator, reasonCode, reasonText), - true, timeout); + Close(new ShutdownEventArgs(initiator, reasonCode, reasonText), true, timeout); } public void Close(ShutdownEventArgs reason) { - Close(reason, false, Timeout.Infinite); + Close(reason, false, Timeout.InfiniteTimeSpan); } ///Try to close connection in a graceful way @@ -376,10 +374,10 @@ public void Close(ShutdownEventArgs reason) /// /// ///Timeout determines how much time internal close operations should be given - ///to complete. Negative or Timeout.Infinite value mean infinity. + ///to complete. System.Threading.Timeout.InfiniteTimeSpan value means infinity. /// /// - public void Close(ShutdownEventArgs reason, bool abort, int timeout) + public void Close(ShutdownEventArgs reason, bool abort, TimeSpan timeout) { if (!SetCloseReason(reason)) { @@ -397,8 +395,7 @@ public void Close(ShutdownEventArgs reason, bool abort, int timeout) { // Try to send connection.close // Wait for CloseOk in the MainLoop - m_session0.Transmit(ConnectionCloseWrapper(reason.ReplyCode, - reason.ReplyText)); + m_session0.Transmit(ConnectionCloseWrapper(reason.ReplyCode, reason.ReplyText)); } catch (AlreadyClosedException ace) { @@ -435,7 +432,7 @@ public void Close(ShutdownEventArgs reason, bool abort, int timeout) } } - var receivedSignal = m_appContinuation.WaitOne(BlockingCell.validatedTimeout(timeout)); + var receivedSignal = m_appContinuation.WaitOne(timeout); if (!receivedSignal) { @@ -450,7 +447,7 @@ public void ClosingLoop() { try { - m_frameHandler.ReadTimeout = 0; + m_frameHandler.ReadTimeout = TimeSpan.Zero; // Wait for response/socket closure or timeout while (!m_closed) { @@ -720,7 +717,7 @@ public void MainLoopIteration() public void NotifyHeartbeatListener() { - if (m_heartbeat != 0) + if (m_heartbeat != TimeSpan.Zero) { m_heartbeatRead.Set(); } @@ -944,7 +941,7 @@ public bool SetCloseReason(ShutdownEventArgs reason) public void MaybeStartHeartbeatTimers() { - if (Heartbeat != 0) + if (Heartbeat != TimeSpan.Zero) { m_hasDisposedHeartBeatReadTimer = false; m_hasDisposedHeartBeatWriteTimer = false; @@ -1020,7 +1017,7 @@ public void HeartbeatReadTimerCallback(object state) } else if (_heartbeatReadTimer != null) { - _heartbeatReadTimer.Change(Heartbeat * 1000, Timeout.Infinite); + _heartbeatReadTimer.Change((int)Heartbeat.TotalMilliseconds, Timeout.Infinite); } } catch (ObjectDisposedException) @@ -1143,59 +1140,47 @@ public void UpdateSecret(string newSecret, string reason) ///API-side invocation of connection abort. public void Abort() { - Abort(Timeout.Infinite); + Abort(Timeout.InfiniteTimeSpan); } ///API-side invocation of connection abort. public void Abort(ushort reasonCode, string reasonText) { - Abort(reasonCode, reasonText, Timeout.Infinite); - } - - ///API-side invocation of connection abort with timeout. - public void Abort(int timeout) - { - Abort(Constants.ReplySuccess, "Connection close forced", timeout); + Abort(reasonCode, reasonText, Timeout.InfiniteTimeSpan); } ///API-side invocation of connection abort with timeout. public void Abort(TimeSpan timeout) { - Abort(Constants.ReplySuccess, "Connection close forced", Convert.ToInt32(timeout.TotalMilliseconds)); - } - - ///API-side invocation of connection abort with timeout. - public void Abort(ushort reasonCode, string reasonText, int timeout) - { - Abort(reasonCode, reasonText, ShutdownInitiator.Application, timeout); + Abort(Constants.ReplySuccess, "Connection close forced", timeout); } ///API-side invocation of connection abort with timeout. public void Abort(ushort reasonCode, string reasonText, TimeSpan timeout) { - Abort(reasonCode, reasonText, ShutdownInitiator.Application, Convert.ToInt32(timeout.TotalMilliseconds)); + Abort(reasonCode, reasonText, ShutdownInitiator.Application, timeout); } ///API-side invocation of connection.close. public void Close() { - Close(Constants.ReplySuccess, "Goodbye", Timeout.Infinite); + Close(Constants.ReplySuccess, "Goodbye", Timeout.InfiniteTimeSpan); } ///API-side invocation of connection.close. public void Close(ushort reasonCode, string reasonText) { - Close(reasonCode, reasonText, Timeout.Infinite); + Close(reasonCode, reasonText, Timeout.InfiniteTimeSpan); } ///API-side invocation of connection.close with timeout. - public void Close(int timeout) + public void Close(TimeSpan timeout) { Close(Constants.ReplySuccess, "Goodbye", timeout); } ///API-side invocation of connection.close with timeout. - public void Close(ushort reasonCode, string reasonText, int timeout) + public void Close(ushort reasonCode, string reasonText, TimeSpan timeout) { Close(new ShutdownEventArgs(ShutdownInitiator.Application, reasonCode, reasonText), false, timeout); } @@ -1270,7 +1255,7 @@ void StartAndTune() var connectionStartCell = new BlockingCell(); m_model0.m_connectionStartCell = connectionStartCell; m_model0.HandshakeContinuationTimeout = m_factory.HandshakeContinuationTimeout; - m_frameHandler.ReadTimeout = (int)m_factory.HandshakeContinuationTimeout.TotalMilliseconds; + m_frameHandler.ReadTimeout = m_factory.HandshakeContinuationTimeout; m_frameHandler.SendHeader(); var connectionStart = connectionStartCell.WaitForValue(); @@ -1359,13 +1344,12 @@ void StartAndTune() connectionTune.m_frameMax); FrameMax = frameMax; - var heartbeat = (ushort)NegotiatedMaxValue(m_factory.RequestedHeartbeat, - connectionTune.m_heartbeat); - Heartbeat = heartbeat; + TimeSpan requestedHeartbeat = m_factory.RequestedHeartbeat; // TODO LRB + var heartbeatInSeconds = NegotiatedMaxValue((uint)requestedHeartbeat.TotalSeconds, + (uint)connectionTune.m_heartbeatInSeconds); + Heartbeat = TimeSpan.FromSeconds(heartbeatInSeconds); - m_model0.ConnectionTuneOk(channelMax, - frameMax, - heartbeat); + m_model0.ConnectionTuneOk(channelMax, frameMax, (ushort)Heartbeat.TotalSeconds); // now we can start heartbeat timers MaybeStartHeartbeatTimers(); diff --git a/projects/client/RabbitMQ.Client/src/client/impl/IFrameHandler.cs b/projects/client/RabbitMQ.Client/src/client/impl/IFrameHandler.cs index 9403e06683..9376160188 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/IFrameHandler.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/IFrameHandler.cs @@ -56,11 +56,11 @@ public interface IFrameHandler int RemotePort { get; } - ///Socket read timeout, in milliseconds. Zero signals "infinity". - int ReadTimeout { set; } + ///Socket read timeout. System.Threading.Timeout.InfiniteTimeSpan signals "infinity". + TimeSpan ReadTimeout { set; } - ///Socket write timeout, in milliseconds. Zero signals "infinity". - int WriteTimeout { set; } + ///Socket write timeout. System.Threading.Timeout.InfiniteTimeSpan signals "infinity". + TimeSpan WriteTimeout { set; } void Close(); @@ -75,4 +75,4 @@ public interface IFrameHandler void WriteFrameSet(IList frames); } -} +} \ No newline at end of file diff --git a/projects/client/RabbitMQ.Client/src/client/impl/IFullModel.cs b/projects/client/RabbitMQ.Client/src/client/impl/IFullModel.cs index 264b832547..1ce91adc0d 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/IFullModel.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/IFullModel.cs @@ -396,7 +396,7 @@ public struct ConnectionTuneDetails public uint m_frameMax; ///The peer's suggested heartbeat parameter. - public ushort m_heartbeat; + public ushort m_heartbeatInSeconds; } diff --git a/projects/client/RabbitMQ.Client/src/client/impl/IProtocolExtensions.cs b/projects/client/RabbitMQ.Client/src/client/impl/IProtocolExtensions.cs index b3fc65486d..65b06aa8c7 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/IProtocolExtensions.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/IProtocolExtensions.cs @@ -52,12 +52,11 @@ public static IFrameHandler CreateFrameHandler( this IProtocol protocol, AmqpTcpEndpoint endpoint, Func socketFactory, - int connectionTimeout, - int readTimeout, - int writeTimeout) + TimeSpan connectionTimeout, + TimeSpan readTimeout, + TimeSpan writeTimeout) { - return new SocketFrameHandler(endpoint, socketFactory, - connectionTimeout, readTimeout, writeTimeout); + return new SocketFrameHandler(endpoint, socketFactory, connectionTimeout, readTimeout, writeTimeout); } } } \ No newline at end of file diff --git a/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs b/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs index d68c497ecf..6fa990c631 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs @@ -1034,9 +1034,7 @@ public void HandleConnectionStart(byte versionMajor, ///Handle incoming Connection.Tune ///methods. - public void HandleConnectionTune(ushort channelMax, - uint frameMax, - ushort heartbeat) + public void HandleConnectionTune(ushort channelMax, uint frameMax, ushort heartbeatInSeconds) { var k = (ConnectionStartRpcContinuation)m_continuationQueue.Next(); k.m_result = new ConnectionSecureOrTune @@ -1045,7 +1043,7 @@ public void HandleConnectionTune(ushort channelMax, { m_channelMax = channelMax, m_frameMax = frameMax, - m_heartbeat = heartbeat + m_heartbeatInSeconds = heartbeatInSeconds } }; k.HandleCommand(null); // release the continuation. diff --git a/projects/client/RabbitMQ.Client/src/client/impl/SessionManager.cs b/projects/client/RabbitMQ.Client/src/client/impl/SessionManager.cs index 12e2ca4e6c..fc05f15228 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/SessionManager.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/SessionManager.cs @@ -90,7 +90,7 @@ public int Count ///when we decide to close the connection. public void AutoCloseConnection() { - m_connection.Abort(Constants.ReplySuccess, "AutoClose", ShutdownInitiator.Library, Timeout.Infinite); + m_connection.Abort(Constants.ReplySuccess, "AutoClose", ShutdownInitiator.Library, Timeout.InfiniteTimeSpan); } ///If m_autoClose and there are no active sessions diff --git a/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs b/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs index e67cddd20e..dc5312c823 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs @@ -54,9 +54,9 @@ static class TaskExtensions { public static Task CompletedTask = Task.FromResult(0); - public static async Task TimeoutAfter(this Task task, int millisecondsTimeout) + public static async Task TimeoutAfter(this Task task, TimeSpan timeout) { - if (task == await Task.WhenAny(task, Task.Delay(millisecondsTimeout)).ConfigureAwait(false)) + if (task == await Task.WhenAny(task, Task.Delay(timeout)).ConfigureAwait(false)) await task; else { @@ -73,7 +73,8 @@ public class SocketFrameHandler : IFrameHandler // Socket poll timeout in ms. If the socket does not // become writeable in this amount of time, we throw // an exception. - private int m_writeableStateTimeout = 30000; + private TimeSpan m_writeableStateTimeout = TimeSpan.FromSeconds(30); + private int m_writeableStateTimeoutMicroSeconds; private readonly NetworkBinaryReader m_reader; private readonly ITcpClient m_socket; private readonly NetworkBinaryWriter m_writer; @@ -83,7 +84,7 @@ public class SocketFrameHandler : IFrameHandler private bool _ssl = false; public SocketFrameHandler(AmqpTcpEndpoint endpoint, Func socketFactory, - int connectionTimeout, int readTimeout, int writeTimeout) + TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout) { Endpoint = endpoint; @@ -103,8 +104,8 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint, } Stream netstream = m_socket.GetStream(); - netstream.ReadTimeout = readTimeout; - netstream.WriteTimeout = writeTimeout; + netstream.ReadTimeout = (int)readTimeout.TotalMilliseconds; + netstream.WriteTimeout = (int)writeTimeout.TotalMilliseconds; if (endpoint.Ssl.Enabled) { @@ -122,7 +123,7 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint, m_reader = new NetworkBinaryReader(new BufferedStream(netstream, m_socket.Client.ReceiveBufferSize)); m_writer = new NetworkBinaryWriter(netstream); - m_writeableStateTimeout = writeTimeout; + WriteTimeout = writeTimeout; } public AmqpTcpEndpoint Endpoint { get; set; } @@ -146,7 +147,7 @@ public int RemotePort get { return ((IPEndPoint)LocalEndPoint).Port; } } - public int ReadTimeout + public TimeSpan ReadTimeout { set { @@ -164,12 +165,13 @@ public int ReadTimeout } } - public int WriteTimeout + public TimeSpan WriteTimeout { set { m_writeableStateTimeout = value; - m_socket.Client.SendTimeout = value; + m_socket.Client.SendTimeout = (int)m_writeableStateTimeout.TotalMilliseconds; + m_writeableStateTimeoutMicroSeconds = m_socket.Client.SendTimeout * 1000; } } @@ -229,7 +231,7 @@ public void WriteFrame(OutboundFrame frame) var ms = new MemoryStream(); var nbw = new NetworkBinaryWriter(ms); frame.WriteTo(nbw); - m_socket.Client.Poll(m_writeableStateTimeout, SelectMode.SelectWrite); + m_socket.Client.Poll(m_writeableStateTimeoutMicroSeconds, SelectMode.SelectWrite); Write(ms.GetBufferSegment()); } @@ -241,7 +243,7 @@ public void WriteFrameSet(IList frames) { frames[i].WriteTo(nbw); } - m_socket.Client.Poll(m_writeableStateTimeout, SelectMode.SelectWrite); + m_socket.Client.Poll(m_writeableStateTimeoutMicroSeconds, SelectMode.SelectWrite); Write(ms.GetBufferSegment()); } @@ -267,21 +269,21 @@ private bool ShouldTryIPv6(AmqpTcpEndpoint endpoint) private ITcpClient ConnectUsingIPv6(AmqpTcpEndpoint endpoint, Func socketFactory, - int timeout) + TimeSpan timeout) { return ConnectUsingAddressFamily(endpoint, socketFactory, timeout, AddressFamily.InterNetworkV6); } private ITcpClient ConnectUsingIPv4(AmqpTcpEndpoint endpoint, Func socketFactory, - int timeout) + TimeSpan timeout) { return ConnectUsingAddressFamily(endpoint, socketFactory, timeout, AddressFamily.InterNetwork); } private ITcpClient ConnectUsingAddressFamily(AmqpTcpEndpoint endpoint, Func socketFactory, - int timeout, AddressFamily family) + TimeSpan timeout, AddressFamily family) { ITcpClient socket = socketFactory(family); try { @@ -293,7 +295,7 @@ private ITcpClient ConnectUsingAddressFamily(AmqpTcpEndpoint endpoint, } } - private void ConnectOrFail(ITcpClient socket, AmqpTcpEndpoint endpoint, int timeout) + private void ConnectOrFail(ITcpClient socket, AmqpTcpEndpoint endpoint, TimeSpan timeout) { try { @@ -322,4 +324,4 @@ private void ConnectOrFail(ITcpClient socket, AmqpTcpEndpoint endpoint, int time } } } -} +} \ No newline at end of file diff --git a/projects/client/RabbitMQ.Client/src/client/impl/TcpClientAdapter.cs b/projects/client/RabbitMQ.Client/src/client/impl/TcpClientAdapter.cs index ebd189affb..3ba2981b83 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/TcpClientAdapter.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/TcpClientAdapter.cs @@ -88,17 +88,17 @@ public virtual bool Connected } } - public virtual int ReceiveTimeout + public virtual TimeSpan ReceiveTimeout { get { AssertSocket(); - return sock.ReceiveTimeout; + return TimeSpan.FromMilliseconds(sock.ReceiveTimeout); } set { AssertSocket(); - sock.ReceiveTimeout = value; + sock.ReceiveTimeout = (int)value.TotalMilliseconds; } } diff --git a/projects/client/RabbitMQ.Client/src/util/BlockingCell.cs b/projects/client/RabbitMQ.Client/src/util/BlockingCell.cs index 33b4c84786..c1fb62f6bb 100644 --- a/projects/client/RabbitMQ.Client/src/util/BlockingCell.cs +++ b/projects/client/RabbitMQ.Client/src/util/BlockingCell.cs @@ -89,25 +89,7 @@ public T WaitForValue(TimeSpan timeout) } throw new TimeoutException(); } - ///Retrieve the cell's value, waiting for the given - ///timeout if no value is immediately available. - /// - /// - /// If a value is present in the cell at the time the call is - /// made, the call will return immediately. Otherwise, the - /// calling thread blocks until either a value appears, or - /// operation times out. - /// - /// - /// If no value was available before the timeout, an exception - /// is thrown. - /// - /// - /// - public T WaitForValue(int timeout) - { - return WaitForValue(TimeSpan.FromMilliseconds(timeout)); - } + ///Retrieve the cell's value, blocking if none exists ///at present, or supply a value to an empty cell, thereby ///filling it. @@ -116,13 +98,5 @@ public T WaitForValue() { return WaitForValue(TimeSpan.FromMinutes(60)); } - - ///Return valid timeout value - ///If value of the parameter is less then zero, return 0 - ///to mean infinity - public static int validatedTimeout(int timeout) - { - return (timeout != Timeout.Infinite) && (timeout < 0) ? 0 : timeout; - } } -} +} \ No newline at end of file diff --git a/projects/client/Unit/src/unit/APIApproval.Approve.approved.txt b/projects/client/Unit/src/unit/APIApproval.Approve.approved.txt index bd6a7e1e4b..0687b48ee1 100644 --- a/projects/client/Unit/src/unit/APIApproval.Approve.approved.txt +++ b/projects/client/Unit/src/unit/APIApproval.Approve.approved.txt @@ -85,9 +85,9 @@ namespace RabbitMQ.Client { public static readonly System.Collections.Generic.IList DefaultAuthMechanisms; public const ushort DefaultChannelMax = 2047; - public const int DefaultConnectionTimeout = 30000; + public static readonly System.TimeSpan DefaultConnectionTimeout; public const uint DefaultFrameMax = 0u; - public const ushort DefaultHeartbeat = 60; + public static readonly System.TimeSpan DefaultHeartbeat; public const string DefaultPass = "guest"; public const string DefaultUser = "guest"; public const string DefaultVHost = "/"; @@ -109,11 +109,11 @@ namespace RabbitMQ.Client public string Password { get; set; } public int Port { get; set; } public ushort RequestedChannelMax { get; set; } - public int RequestedConnectionTimeout { get; set; } + public System.TimeSpan RequestedConnectionTimeout { get; set; } public uint RequestedFrameMax { get; set; } - public ushort RequestedHeartbeat { get; set; } - public int SocketReadTimeout { get; set; } - public int SocketWriteTimeout { get; set; } + public System.TimeSpan RequestedHeartbeat { get; set; } + public System.TimeSpan SocketReadTimeout { get; set; } + public System.TimeSpan SocketWriteTimeout { get; set; } public RabbitMQ.Client.SslOption Ssl { get; set; } public bool TopologyRecoveryEnabled { get; set; } public System.Uri Uri { get; set; } @@ -298,7 +298,7 @@ namespace RabbitMQ.Client RabbitMQ.Client.ConsumerWorkService ConsumerWorkService { get; } RabbitMQ.Client.AmqpTcpEndpoint Endpoint { get; } uint FrameMax { get; } - ushort Heartbeat { get; } + System.TimeSpan Heartbeat { get; } bool IsOpen { get; } RabbitMQ.Client.AmqpTcpEndpoint[] KnownHosts { get; } RabbitMQ.Client.IProtocol Protocol { get; } @@ -312,14 +312,12 @@ namespace RabbitMQ.Client public event System.EventHandler RecoverySucceeded; void Abort(); void Abort(ushort reasonCode, string reasonText); - void Abort(int timeout); void Abort(System.TimeSpan timeout); - void Abort(ushort reasonCode, string reasonText, int timeout); void Abort(ushort reasonCode, string reasonText, System.TimeSpan timeout); void Close(); void Close(ushort reasonCode, string reasonText); - void Close(int timeout); - void Close(ushort reasonCode, string reasonText, int timeout); + void Close(System.TimeSpan timeout); + void Close(ushort reasonCode, string reasonText, System.TimeSpan timeout); RabbitMQ.Client.IModel CreateModel(); void HandleConnectionBlocked(string reason); void HandleConnectionUnblocked(); @@ -334,7 +332,7 @@ namespace RabbitMQ.Client string Password { get; set; } ushort RequestedChannelMax { get; set; } uint RequestedFrameMax { get; set; } - ushort RequestedHeartbeat { get; set; } + System.TimeSpan RequestedHeartbeat { get; set; } System.Uri Uri { get; set; } bool UseBackgroundThreadsForIO { get; set; } string UserName { get; set; } @@ -515,7 +513,7 @@ namespace RabbitMQ.Client { System.Net.Sockets.Socket Client { get; } bool Connected { get; } - int ReceiveTimeout { get; set; } + System.TimeSpan ReceiveTimeout { get; set; } void Close(); System.Threading.Tasks.Task ConnectAsync(string host, int port); System.Net.Sockets.NetworkStream GetStream(); @@ -614,7 +612,7 @@ namespace RabbitMQ.Client public TcpClientAdapter(System.Net.Sockets.Socket socket) { } public virtual System.Net.Sockets.Socket Client { get; } public virtual bool Connected { get; } - public virtual int ReceiveTimeout { get; set; } + public virtual System.TimeSpan ReceiveTimeout { get; set; } public virtual void Close() { } public virtual System.Threading.Tasks.Task ConnectAsync(string host, int port) { } [System.ObsoleteAttribute("Override Dispose(bool) instead.")] @@ -1492,7 +1490,7 @@ namespace RabbitMQ.Client.Framing.Impl public RabbitMQ.Client.ConsumerWorkService ConsumerWorkService { get; } public RabbitMQ.Client.AmqpTcpEndpoint Endpoint { get; } public uint FrameMax { get; set; } - public ushort Heartbeat { get; set; } + public System.TimeSpan Heartbeat { get; set; } public System.Guid Id { get; } public bool IsOpen { get; } public RabbitMQ.Client.AmqpTcpEndpoint[] KnownHosts { get; set; } @@ -1509,19 +1507,17 @@ namespace RabbitMQ.Client.Framing.Impl public event System.EventHandler ConnectionShutdown; public event System.EventHandler ConnectionUnblocked; public event System.EventHandler RecoverySucceeded; - public void Abort(ushort reasonCode, string reasonText, RabbitMQ.Client.ShutdownInitiator initiator, int timeout) { } + public void Abort(ushort reasonCode, string reasonText, RabbitMQ.Client.ShutdownInitiator initiator, System.TimeSpan timeout) { } public void Abort() { } public void Abort(ushort reasonCode, string reasonText) { } - public void Abort(int timeout) { } public void Abort(System.TimeSpan timeout) { } - public void Abort(ushort reasonCode, string reasonText, int timeout) { } public void Abort(ushort reasonCode, string reasonText, System.TimeSpan timeout) { } public void Close(RabbitMQ.Client.ShutdownEventArgs reason) { } - public void Close(RabbitMQ.Client.ShutdownEventArgs reason, bool abort, int timeout) { } + public void Close(RabbitMQ.Client.ShutdownEventArgs reason, bool abort, System.TimeSpan timeout) { } public void Close() { } public void Close(ushort reasonCode, string reasonText) { } - public void Close(int timeout) { } - public void Close(ushort reasonCode, string reasonText, int timeout) { } + public void Close(System.TimeSpan timeout) { } + public void Close(ushort reasonCode, string reasonText, System.TimeSpan timeout) { } public void ClosingLoop() { } public RabbitMQ.Client.Impl.Command ConnectionCloseWrapper(ushort reasonCode, string reasonText) { } public RabbitMQ.Client.IModel CreateModel() { } @@ -1561,7 +1557,7 @@ namespace RabbitMQ.Client.Framing.Impl } public class static IProtocolExtensions { - public static RabbitMQ.Client.Impl.IFrameHandler CreateFrameHandler(this RabbitMQ.Client.IProtocol protocol, RabbitMQ.Client.AmqpTcpEndpoint endpoint, System.Func socketFactory, int connectionTimeout, int readTimeout, int writeTimeout) { } + public static RabbitMQ.Client.Impl.IFrameHandler CreateFrameHandler(this RabbitMQ.Client.IProtocol protocol, RabbitMQ.Client.AmqpTcpEndpoint endpoint, System.Func socketFactory, System.TimeSpan connectionTimeout, System.TimeSpan readTimeout, System.TimeSpan writeTimeout) { } } public abstract class ProtocolBase : RabbitMQ.Client.IProtocol { @@ -1707,7 +1703,7 @@ namespace RabbitMQ.Client.Impl { public ushort m_channelMax; public uint m_frameMax; - public ushort m_heartbeat; + public ushort m_heartbeatInSeconds; } public abstract class ContentHeaderBase : RabbitMQ.Client.IContentHeader { @@ -1816,10 +1812,10 @@ namespace RabbitMQ.Client.Impl RabbitMQ.Client.AmqpTcpEndpoint Endpoint { get; } System.Net.EndPoint LocalEndPoint { get; } int LocalPort { get; } - int ReadTimeout { set; } + System.TimeSpan ReadTimeout { set; } System.Net.EndPoint RemoteEndPoint { get; } int RemotePort { get; } - int WriteTimeout { set; } + System.TimeSpan WriteTimeout { set; } void Close(); RabbitMQ.Client.Impl.InboundFrame ReadFrame(); void SendHeader(); @@ -2078,7 +2074,7 @@ namespace RabbitMQ.Client.Impl public void HandleConnectionOpenOk(string knownHosts) { } public void HandleConnectionSecure(byte[] challenge) { } public void HandleConnectionStart(byte versionMajor, byte versionMinor, System.Collections.Generic.IDictionary serverProperties, byte[] mechanisms, byte[] locales) { } - public void HandleConnectionTune(ushort channelMax, uint frameMax, ushort heartbeat) { } + public void HandleConnectionTune(ushort channelMax, uint frameMax, ushort heartbeatInSeconds) { } public void HandleConnectionUnblocked() { } public void HandleQueueDeclareOk(string queue, uint messageCount, uint consumerCount) { } protected void Initialise(RabbitMQ.Client.Impl.ISession session) { } @@ -2255,14 +2251,14 @@ namespace RabbitMQ.Client.Impl } public class SocketFrameHandler : RabbitMQ.Client.Impl.IFrameHandler { - public SocketFrameHandler(RabbitMQ.Client.AmqpTcpEndpoint endpoint, System.Func socketFactory, int connectionTimeout, int readTimeout, int writeTimeout) { } + public SocketFrameHandler(RabbitMQ.Client.AmqpTcpEndpoint endpoint, System.Func socketFactory, System.TimeSpan connectionTimeout, System.TimeSpan readTimeout, System.TimeSpan writeTimeout) { } public RabbitMQ.Client.AmqpTcpEndpoint Endpoint { get; set; } public System.Net.EndPoint LocalEndPoint { get; } public int LocalPort { get; } - public int ReadTimeout { set; } + public System.TimeSpan ReadTimeout { set; } public System.Net.EndPoint RemoteEndPoint { get; } public int RemotePort { get; } - public int WriteTimeout { set; } + public System.TimeSpan WriteTimeout { set; } public void Close() { } public RabbitMQ.Client.Impl.InboundFrame ReadFrame() { } public void SendHeader() { } @@ -2391,9 +2387,7 @@ namespace RabbitMQ.Util public BlockingCell() { } public void ContinueWithValue(T value) { } public T WaitForValue(System.TimeSpan timeout) { } - public T WaitForValue(int timeout) { } public T WaitForValue() { } - public static int validatedTimeout(int timeout) { } } public class static DebugUtil { diff --git a/projects/client/Unit/src/unit/Fixtures.cs b/projects/client/Unit/src/unit/Fixtures.cs index 3071528a53..28274cb0a4 100755 --- a/projects/client/Unit/src/unit/Fixtures.cs +++ b/projects/client/Unit/src/unit/Fixtures.cs @@ -122,7 +122,7 @@ internal AutorecoveringConnection CreateAutorecoveringConnection(TimeSpan interv cf.AutomaticRecoveryEnabled = true; // tests that use this helper will likely list unreachable hosts, // make sure we time out quickly on those - cf.RequestedConnectionTimeout = 1000; + cf.RequestedConnectionTimeout = TimeSpan.FromSeconds(1); cf.NetworkRecoveryInterval = interval; return (AutorecoveringConnection)cf.CreateConnection(hostnames, $"UNIT_CONN:{Guid.NewGuid()}"); } @@ -133,7 +133,7 @@ internal AutorecoveringConnection CreateAutorecoveringConnection(IList { public BlockingCell m_k; - public int m_delayMs; + public TimeSpan m_delay; public T m_v; public void Run() { - Thread.Sleep(m_delayMs); + Thread.Sleep(m_delay); m_k.ContinueWithValue(m_v); } } - public static void SetAfter(int delayMs, BlockingCell k, T v) + public static void SetAfter(TimeSpan delay, BlockingCell k, T v) { var ds = new DelayedSetter(); ds.m_k = k; - ds.m_delayMs = delayMs; + ds.m_delay = delay; ds.m_v = v; new Thread(new ThreadStart(ds.Run)).Start(); } @@ -78,9 +78,9 @@ public void ResetTimer() m_startTime = DateTime.Now; } - public int ElapsedMs() + public TimeSpan ElapsedMs() { - return (int)((DateTime.Now - m_startTime).TotalMilliseconds); + return DateTime.Now - m_startTime; } [Test] @@ -116,7 +116,7 @@ public void TestGetValueWhichDoesTimeOutWithTimeSpan() { var k = new BlockingCell(); ResetTimer(); - Assert.Throws(() => k.WaitForValue(TimeSpan.FromMilliseconds(TimingInterval))); + Assert.Throws(() => k.WaitForValue(TimingInterval)); } [Test] @@ -126,7 +126,7 @@ public void TestGetValueWithTimeoutInfinite() SetAfter(TimingInterval, k, 123); ResetTimer(); - var v = k.WaitForValue(Timeout.Infinite); + var v = k.WaitForValue(Timeout.InfiniteTimeSpan); Assert.Less(TimingInterval - SafetyMargin, ElapsedMs()); Assert.AreEqual(123, v); } @@ -150,19 +150,7 @@ public void TestBackgroundUpdateSucceedsWithTimeSpan() SetAfter(TimingInterval, k, 123); ResetTimer(); - var v = k.WaitForValue(TimeSpan.FromMilliseconds(TimingInterval * 2)); - Assert.Less(TimingInterval - SafetyMargin, ElapsedMs()); - Assert.AreEqual(123, v); - } - - [Test] - public void TestBackgroundUpdateSucceedsWithInfiniteTimeout() - { - var k = new BlockingCell(); - SetAfter(TimingInterval, k, 123); - - ResetTimer(); - var v = k.WaitForValue(Timeout.Infinite); + var v = k.WaitForValue(TimingInterval * 2); Assert.Less(TimingInterval - SafetyMargin, ElapsedMs()); Assert.AreEqual(123, v); } @@ -174,7 +162,7 @@ public void TestBackgroundUpdateSucceedsWithInfiniteTimeoutTimeSpan() SetAfter(TimingInterval, k, 123); ResetTimer(); - var infiniteTimeSpan =new TimeSpan(0, 0, 0, 0, Timeout.Infinite); + var infiniteTimeSpan = Timeout.InfiniteTimeSpan; var v = k.WaitForValue(infiniteTimeSpan); Assert.Less(TimingInterval - SafetyMargin, ElapsedMs()); Assert.AreEqual(123, v); diff --git a/projects/client/Unit/src/unit/TestFloodPublishing.cs b/projects/client/Unit/src/unit/TestFloodPublishing.cs index f392d472d8..fac0272e26 100755 --- a/projects/client/Unit/src/unit/TestFloodPublishing.cs +++ b/projects/client/Unit/src/unit/TestFloodPublishing.cs @@ -56,7 +56,7 @@ public override void Init() { var connFactory = new ConnectionFactory() { - RequestedHeartbeat = 60, + RequestedHeartbeat = TimeSpan.FromSeconds(60), AutomaticRecoveryEnabled = false }; Conn = connFactory.CreateConnection(); diff --git a/projects/client/Unit/src/unit/TestHeartbeats.cs b/projects/client/Unit/src/unit/TestHeartbeats.cs index aeebb28291..616a800a33 100644 --- a/projects/client/Unit/src/unit/TestHeartbeats.cs +++ b/projects/client/Unit/src/unit/TestHeartbeats.cs @@ -48,7 +48,7 @@ namespace RabbitMQ.Client.Unit [TestFixture] internal class TestHeartbeats : IntegrationFixture { - private const UInt16 heartbeatTimeout = 2; + private readonly TimeSpan heartbeatTimeout = TimeSpan.FromSeconds(2); [Test, Category("LongRunning"), MaxTimeAttribute(35000)] public void TestThatHeartbeatWriterUsesConfigurableInterval() @@ -101,7 +101,11 @@ public void TestHundredsOfConnectionsWithRandomHeartbeatInterval() for (var i = 0; i < 200; i++) { var n = Convert.ToUInt16(rnd.Next(2, 6)); - var cf = new ConnectionFactory() { RequestedHeartbeat = n, AutomaticRecoveryEnabled = false }; + var cf = new ConnectionFactory() + { + RequestedHeartbeat = TimeSpan.FromSeconds(n), + AutomaticRecoveryEnabled = false + }; var conn = cf.CreateConnection(); xs.Add(conn); var ch = conn.CreateModel(); diff --git a/projects/client/Unit/src/unit/TestSharedQueue.cs b/projects/client/Unit/src/unit/TestSharedQueue.cs index 26da803458..a919735dce 100644 --- a/projects/client/Unit/src/unit/TestSharedQueue.cs +++ b/projects/client/Unit/src/unit/TestSharedQueue.cs @@ -95,9 +95,9 @@ public void BackgroundEofExpectingDequeue() } } - public static void EnqueueAfter(int delayMs, SharedQueue queue, object v) + public static void EnqueueAfter(TimeSpan delay, SharedQueue queue, object v) { - var enqueuer = new DelayedEnqueuer(queue, delayMs, v); + var enqueuer = new DelayedEnqueuer(queue, (int)delay.TotalMilliseconds, v); new Thread(enqueuer.EnqueueValue).Start(); } @@ -120,9 +120,9 @@ public void ResetTimer() m_startTime = DateTime.Now; } - public int ElapsedMs() + public TimeSpan ElapsedMs() { - return (int) ((DateTime.Now - m_startTime).TotalMilliseconds); + return DateTime.Now - m_startTime; } [Test] @@ -133,7 +133,7 @@ public void TestBgLong() ResetTimer(); object v; - bool r = q.Dequeue(TimingInterval, out v); + bool r = q.Dequeue((int)TimingInterval.TotalMilliseconds, out v); Assert.Greater(TimingInterval + SafetyMargin, ElapsedMs()); Assert.IsTrue(!r); Assert.AreEqual(null, v); @@ -147,7 +147,7 @@ public void TestBgShort() ResetTimer(); object v; - bool r = q.Dequeue(TimingInterval*2, out v); + bool r = q.Dequeue((int)TimingInterval.TotalMilliseconds * 2, out v); Assert.Less(TimingInterval - SafetyMargin, ElapsedMs()); Assert.IsTrue(r); Assert.AreEqual(123, v); @@ -244,13 +244,14 @@ public void TestDoubleBg() object v; bool r; - r = q.Dequeue(TimingInterval*2, out v); + // TODO LRB + r = q.Dequeue((int)TimingInterval.TotalMilliseconds * 2, out v); Assert.Less(TimingInterval - SafetyMargin, ElapsedMs()); Assert.Greater(TimingInterval + SafetyMargin, ElapsedMs()); Assert.IsTrue(r); Assert.AreEqual(123, v); - r = q.Dequeue(TimingInterval*2, out v); + r = q.Dequeue((int)TimingInterval.TotalMilliseconds * 2, out v); Assert.Less(TimingInterval*2 - SafetyMargin, ElapsedMs()); Assert.Greater(TimingInterval*2 + SafetyMargin, ElapsedMs()); Assert.IsTrue(r); @@ -267,13 +268,14 @@ public void TestDoublePoll() object v; bool r; - r = q.Dequeue(TimingInterval, out v); + // TODO LRB + r = q.Dequeue((int)TimingInterval.TotalMilliseconds, out v); Assert.Less(TimingInterval - SafetyMargin, ElapsedMs()); Assert.Greater(TimingInterval + SafetyMargin, ElapsedMs()); Assert.IsTrue(!r); Assert.AreEqual(null, v); - r = q.Dequeue(TimingInterval*2, out v); + r = q.Dequeue((int)TimingInterval.TotalMilliseconds * 2, out v); Assert.Less(TimingInterval*2 - SafetyMargin, ElapsedMs()); Assert.Greater(TimingInterval*2 + SafetyMargin, ElapsedMs()); Assert.IsTrue(r); @@ -320,7 +322,7 @@ public void TestTimeoutLong() ResetTimer(); object v; - bool r = q.Dequeue(TimingInterval, out v); + bool r = q.Dequeue((int)TimingInterval.TotalMilliseconds, out v); Assert.Less(TimingInterval - SafetyMargin, ElapsedMs()); Assert.IsTrue(!r); Assert.AreEqual(null, v); @@ -347,7 +349,7 @@ public void TestTimeoutShort() ResetTimer(); object v; - bool r = q.Dequeue(TimingInterval, out v); + bool r = q.Dequeue((int)TimingInterval.TotalMilliseconds, out v); Assert.Greater(SafetyMargin, ElapsedMs()); Assert.IsTrue(r); Assert.AreEqual(123, v); From a1ed8a7509e4d0079a4ee5fe01b574a79630dcd9 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Tue, 21 Jan 2020 10:28:40 -0800 Subject: [PATCH 2/2] Remove last TODOs --- .../src/client/impl/Connection.cs | 2 +- .../RabbitMQ.Client/src/util/SharedQueue.cs | 12 ++++----- .../src/unit/APIApproval.Approve.approved.txt | 2 +- .../client/Unit/src/unit/TestSharedQueue.cs | 26 +++++++++---------- 4 files changed, 20 insertions(+), 22 deletions(-) diff --git a/projects/client/RabbitMQ.Client/src/client/impl/Connection.cs b/projects/client/RabbitMQ.Client/src/client/impl/Connection.cs index 4fdd8a1c7c..e06d77ea16 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/Connection.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/Connection.cs @@ -1344,7 +1344,7 @@ void StartAndTune() connectionTune.m_frameMax); FrameMax = frameMax; - TimeSpan requestedHeartbeat = m_factory.RequestedHeartbeat; // TODO LRB + TimeSpan requestedHeartbeat = m_factory.RequestedHeartbeat; var heartbeatInSeconds = NegotiatedMaxValue((uint)requestedHeartbeat.TotalSeconds, (uint)connectionTune.m_heartbeatInSeconds); Heartbeat = TimeSpan.FromSeconds(heartbeatInSeconds); diff --git a/projects/client/RabbitMQ.Client/src/util/SharedQueue.cs b/projects/client/RabbitMQ.Client/src/util/SharedQueue.cs index d62f0a6667..bab29016fc 100644 --- a/projects/client/RabbitMQ.Client/src/util/SharedQueue.cs +++ b/projects/client/RabbitMQ.Client/src/util/SharedQueue.cs @@ -120,7 +120,7 @@ public T Dequeue() /// false, and sets "result" to null. /// /// - /// A timeout of -1 (i.e. System.Threading.Timeout.Infinite) + /// A timeout of -1 (i.e. System.Threading.Timeout.InfiniteTimeSpan) /// will be interpreted as a command to wait for an /// indefinitely long period of time for an item to become /// available. Usage of such a timeout is equivalent to @@ -135,9 +135,9 @@ public T Dequeue() /// method will throw EndOfStreamException. /// /// - public bool Dequeue(int millisecondsTimeout, out T result) + public bool Dequeue(TimeSpan timeout, out T result) { - if (millisecondsTimeout == Timeout.Infinite) + if (timeout == Timeout.InfiniteTimeSpan) { result = Dequeue(); return true; @@ -149,9 +149,9 @@ public bool Dequeue(int millisecondsTimeout, out T result) while (m_queue.Count == 0) { EnsureIsOpen(); - var elapsedTime = (int)((DateTime.Now - startTime).TotalMilliseconds); - int remainingTime = millisecondsTimeout - elapsedTime; - if (remainingTime <= 0) + TimeSpan elapsedTime = DateTime.Now.Subtract(startTime); + TimeSpan remainingTime = timeout.Subtract(elapsedTime); + if (remainingTime <= TimeSpan.Zero) { result = default(T); return false; diff --git a/projects/client/Unit/src/unit/APIApproval.Approve.approved.txt b/projects/client/Unit/src/unit/APIApproval.Approve.approved.txt index 0687b48ee1..0baa2360f9 100644 --- a/projects/client/Unit/src/unit/APIApproval.Approve.approved.txt +++ b/projects/client/Unit/src/unit/APIApproval.Approve.approved.txt @@ -2480,7 +2480,7 @@ namespace RabbitMQ.Util public SharedQueue() { } public void Close() { } public T Dequeue() { } - public bool Dequeue(int millisecondsTimeout, out T result) { } + public bool Dequeue(System.TimeSpan timeout, out T result) { } public T DequeueNoWait(T defaultValue) { } public void Enqueue(T o) { } } diff --git a/projects/client/Unit/src/unit/TestSharedQueue.cs b/projects/client/Unit/src/unit/TestSharedQueue.cs index a919735dce..08358b6d6f 100644 --- a/projects/client/Unit/src/unit/TestSharedQueue.cs +++ b/projects/client/Unit/src/unit/TestSharedQueue.cs @@ -86,7 +86,7 @@ public void DequeueNoWaitZero() public void DequeueAfterOneIntoV() { - m_q.Dequeue(1, out m_v); + m_q.Dequeue(TimeSpan.FromMilliseconds(1), out m_v); } public void BackgroundEofExpectingDequeue() @@ -133,7 +133,7 @@ public void TestBgLong() ResetTimer(); object v; - bool r = q.Dequeue((int)TimingInterval.TotalMilliseconds, out v); + bool r = q.Dequeue(TimingInterval, out v); Assert.Greater(TimingInterval + SafetyMargin, ElapsedMs()); Assert.IsTrue(!r); Assert.AreEqual(null, v); @@ -147,7 +147,7 @@ public void TestBgShort() ResetTimer(); object v; - bool r = q.Dequeue((int)TimingInterval.TotalMilliseconds * 2, out v); + bool r = q.Dequeue(TimingInterval * 2, out v); Assert.Less(TimingInterval - SafetyMargin, ElapsedMs()); Assert.IsTrue(r); Assert.AreEqual(123, v); @@ -177,7 +177,7 @@ public void TestCloseWhenFull() ExpectEof(de.EnqueueValue); Assert.AreEqual(1, q.Dequeue()); Assert.AreEqual(2, q.DequeueNoWait(0)); - bool r = q.Dequeue(1, out v); + bool r = q.Dequeue(TimeSpan.FromMilliseconds(1), out v); Assert.IsTrue(r); Assert.AreEqual(3, v); ExpectEof(de.Dequeue); @@ -244,14 +244,13 @@ public void TestDoubleBg() object v; bool r; - // TODO LRB - r = q.Dequeue((int)TimingInterval.TotalMilliseconds * 2, out v); + r = q.Dequeue(TimingInterval * 2, out v); Assert.Less(TimingInterval - SafetyMargin, ElapsedMs()); Assert.Greater(TimingInterval + SafetyMargin, ElapsedMs()); Assert.IsTrue(r); Assert.AreEqual(123, v); - r = q.Dequeue((int)TimingInterval.TotalMilliseconds * 2, out v); + r = q.Dequeue(TimingInterval * 2, out v); Assert.Less(TimingInterval*2 - SafetyMargin, ElapsedMs()); Assert.Greater(TimingInterval*2 + SafetyMargin, ElapsedMs()); Assert.IsTrue(r); @@ -268,14 +267,13 @@ public void TestDoublePoll() object v; bool r; - // TODO LRB - r = q.Dequeue((int)TimingInterval.TotalMilliseconds, out v); + r = q.Dequeue(TimingInterval, out v); Assert.Less(TimingInterval - SafetyMargin, ElapsedMs()); Assert.Greater(TimingInterval + SafetyMargin, ElapsedMs()); Assert.IsTrue(!r); Assert.AreEqual(null, v); - r = q.Dequeue((int)TimingInterval.TotalMilliseconds * 2, out v); + r = q.Dequeue(TimingInterval * 2, out v); Assert.Less(TimingInterval*2 - SafetyMargin, ElapsedMs()); Assert.Greater(TimingInterval*2 + SafetyMargin, ElapsedMs()); Assert.IsTrue(r); @@ -309,7 +307,7 @@ public void TestTimeoutInfinite() ResetTimer(); object v; - bool r = q.Dequeue(Timeout.Infinite, out v); + bool r = q.Dequeue(Timeout.InfiniteTimeSpan, out v); Assert.Less(TimingInterval - SafetyMargin, ElapsedMs()); Assert.IsTrue(r); Assert.AreEqual(123, v); @@ -322,7 +320,7 @@ public void TestTimeoutLong() ResetTimer(); object v; - bool r = q.Dequeue((int)TimingInterval.TotalMilliseconds, out v); + bool r = q.Dequeue(TimingInterval, out v); Assert.Less(TimingInterval - SafetyMargin, ElapsedMs()); Assert.IsTrue(!r); Assert.AreEqual(null, v); @@ -335,7 +333,7 @@ public void TestTimeoutNegative() ResetTimer(); object v; - bool r = q.Dequeue(-10000, out v); + bool r = q.Dequeue(TimeSpan.FromMilliseconds(-10000), out v); Assert.Greater(SafetyMargin, ElapsedMs()); Assert.IsTrue(!r); Assert.AreEqual(null, v); @@ -349,7 +347,7 @@ public void TestTimeoutShort() ResetTimer(); object v; - bool r = q.Dequeue((int)TimingInterval.TotalMilliseconds, out v); + bool r = q.Dequeue(TimingInterval, out v); Assert.Greater(SafetyMargin, ElapsedMs()); Assert.IsTrue(r); Assert.AreEqual(123, v);