diff --git a/RabbitMQ.Stream.Client/ConnectionsPool.cs b/RabbitMQ.Stream.Client/ConnectionsPool.cs index 5eb0a0c4..2ed1028f 100644 --- a/RabbitMQ.Stream.Client/ConnectionsPool.cs +++ b/RabbitMQ.Stream.Client/ConnectionsPool.cs @@ -32,6 +32,19 @@ public class ConnectionPoolConfig public byte ProducersPerConnection { get; set; } = 1; } +public class LastSecret +{ + public string Secret { get; private set; } = string.Empty; + public DateTime LastUpdate { get; private set; } = DateTime.MinValue; + public bool IsValid => LastUpdate > DateTime.MinValue && !string.IsNullOrEmpty(Secret); + + public void Update(string secret) + { + Secret = secret; + LastUpdate = DateTime.UtcNow; + } +} + public class ConnectionItem { public ConnectionItem(string brokerInfo, byte idsPerConnection, IClient client) @@ -113,6 +126,7 @@ internal static byte FindNextValidId(List ids, byte nextId = 0) private readonly int _maxConnections; private readonly byte _idsPerConnection; private readonly SemaphoreSlim _semaphoreSlim = new(1, 1); + private readonly LastSecret _lastSecret = new(); /// /// Init the pool with the max connections and the max ids per connection @@ -186,6 +200,21 @@ internal async Task GetOrCreateClient(string brokerInfo, Func void RabbitMQ.Stream.Client.ConnectionsPool.RemoveConsumerEntityFromStream(string clientId, byte id, string stream) -> void RabbitMQ.Stream.Client.ConnectionsPool.RemoveProducerEntityFromStream(string clientId, byte id, string stream) -> void +RabbitMQ.Stream.Client.ConnectionsPool.TryMergeClientParameters(RabbitMQ.Stream.Client.ClientParameters clientParameters, out RabbitMQ.Stream.Client.ClientParameters cp) -> bool RabbitMQ.Stream.Client.ConnectionsPool.UpdateSecrets(string newSecret) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.ConsumerEvents RabbitMQ.Stream.Client.ConsumerEvents.ConsumerEvents() -> void @@ -176,6 +177,12 @@ RabbitMQ.Stream.Client.ISuperStreamProducer.ReconnectPartition(RabbitMQ.Stream.C RabbitMQ.Stream.Client.KeyRoutingStrategy RabbitMQ.Stream.Client.KeyRoutingStrategy.KeyRoutingStrategy(System.Func routingKeyExtractor, System.Func> routingKeyQFunc, string superStream) -> void RabbitMQ.Stream.Client.KeyRoutingStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List partitions) -> System.Threading.Tasks.Task> +RabbitMQ.Stream.Client.LastSecret +RabbitMQ.Stream.Client.LastSecret.IsValid.get -> bool +RabbitMQ.Stream.Client.LastSecret.LastSecret() -> void +RabbitMQ.Stream.Client.LastSecret.LastUpdate.get -> System.DateTime +RabbitMQ.Stream.Client.LastSecret.Secret.get -> string +RabbitMQ.Stream.Client.LastSecret.Update(string secret) -> void RabbitMQ.Stream.Client.MessageContext.ChunkId.get -> ulong RabbitMQ.Stream.Client.MessageContext.ChunkMessagesCount.get -> uint RabbitMQ.Stream.Client.MessageContext.MessageContext(ulong offset, System.TimeSpan timestamp, uint chunkMessagesCount, ulong chunkId) -> void diff --git a/RabbitMQ.Stream.Client/RoutingClient.cs b/RabbitMQ.Stream.Client/RoutingClient.cs index cdd40ad1..dd131b3f 100644 --- a/RabbitMQ.Stream.Client/RoutingClient.cs +++ b/RabbitMQ.Stream.Client/RoutingClient.cs @@ -174,9 +174,16 @@ private static string GetPropertyValue(IDictionary connectionPro public static async Task LookupLeaderConnection(ClientParameters clientParameters, StreamInfo metaDataInfo, ConnectionsPool pool, ILogger logger = null) { + + if (pool.TryMergeClientParameters(clientParameters, out var mergedClientParameters)) + { + logger?.LogInformation("Leader Connection. Password changed Merged client parameters"); + } + return await pool.GetOrCreateClient(metaDataInfo.Leader.ToString(), async () => - await LookupConnection(clientParameters, metaDataInfo.Leader, MaxAttempts(metaDataInfo), logger) + await LookupConnection(mergedClientParameters, metaDataInfo.Leader, + MaxAttempts(metaDataInfo), logger) .ConfigureAwait(false)).ConfigureAwait(false); } @@ -202,9 +209,15 @@ public static async Task LookupLeaderOrRandomReplicasConnection(ClientP { try { + if (pool.TryMergeClientParameters(clientParameters, out var mergedClientParameters)) + { + logger?.LogInformation("Replicas Connections. Password changed Merged client parameters"); + } + return await pool.GetOrCreateClient(broker.ToString(), async () => - await LookupConnection(clientParameters, broker, MaxAttempts(metaDataInfo), + await LookupConnection(mergedClientParameters, broker, + MaxAttempts(metaDataInfo), logger) .ConfigureAwait(false)).ConfigureAwait(false); } diff --git a/RabbitMQ.Stream.Client/StreamSystem.cs b/RabbitMQ.Stream.Client/StreamSystem.cs index f2b0fd99..ffabd762 100644 --- a/RabbitMQ.Stream.Client/StreamSystem.cs +++ b/RabbitMQ.Stream.Client/StreamSystem.cs @@ -32,7 +32,8 @@ internal void Validate() /// public SslOption Ssl { get; set; } = new(); - public IList Endpoints { get; set; } = new List { new IPEndPoint(IPAddress.Loopback, 5552) }; + public IList Endpoints { get; set; } = + new List { new IPEndPoint(IPAddress.Loopback, 5552) }; public AddressResolver AddressResolver { get; set; } public string ClientProvidedName { get; set; } = "dotnet-stream-locator"; @@ -155,12 +156,22 @@ private async Task MayBeReconnectLocator() public async Task UpdateSecret(string newSecret) { + // store the old password just in case it will fail to update the secret + var oldSecret = _clientParameters.Password; + _clientParameters.Password = newSecret; + _client.Parameters.Password = newSecret; + await MayBeReconnectLocator().ConfigureAwait(false); if (_client.IsClosed) + { + // it can happen during some network problem or server rebooting + // even the _clientParameters.Password could be invalid we restore the + // the old one just to be consistent + _clientParameters.Password = oldSecret; + _client.Parameters.Password = oldSecret; throw new UpdateSecretFailureException("Cannot update a closed connection."); + } await _client.UpdateSecret(newSecret).ConfigureAwait(false); - _clientParameters.Password = newSecret; - _client.Parameters.Password = newSecret; await PoolConsumers.UpdateSecrets(newSecret).ConfigureAwait(false); await PoolProducers.UpdateSecrets(newSecret).ConfigureAwait(false); } @@ -373,7 +384,9 @@ public async Task CreateSuperStream(SuperStreamSpec spec) { spec.Validate(); await MayBeReconnectLocator().ConfigureAwait(false); - var response = await _client.CreateSuperStream(spec.Name, spec.GetPartitions(), spec.GetBindingKeys(), spec.Args).ConfigureAwait(false); + var response = await _client + .CreateSuperStream(spec.Name, spec.GetPartitions(), spec.GetBindingKeys(), spec.Args) + .ConfigureAwait(false); if (response.ResponseCode is ResponseCode.Ok or ResponseCode.StreamAlreadyExists) { return; @@ -473,7 +486,8 @@ public async Task DeleteSuperStream(string superStream) return; } - throw new DeleteStreamException($"Failed to delete super stream, error code: {response.ResponseCode.ToString()}"); + throw new DeleteStreamException( + $"Failed to delete super stream, error code: {response.ResponseCode.ToString()}"); } public async Task StreamStats(string stream) @@ -593,6 +607,7 @@ public StreamSystemInitialisationException(string error) : base(error) { } } + public class UpdateSecretFailureException : ProtocolException { public UpdateSecretFailureException(string s) diff --git a/Tests/SystemTests.cs b/Tests/SystemTests.cs index 4cac92b9..46138b49 100644 --- a/Tests/SystemTests.cs +++ b/Tests/SystemTests.cs @@ -181,19 +181,6 @@ await Assert.ThrowsAsync( await streamSystem.Close(); } - [Fact] - public async void UpdateSecretForClosedConnectionShouldThrowUpdateSecretFailureException() - { - var config = new StreamSystemConfig { UserName = "guest", Password = "guest" }; // specified for readability - var streamSystem = await StreamSystem.Create(config); - - await streamSystem.Close(); - await Assert.ThrowsAsync( - async () => { await streamSystem.UpdateSecret("guest"); } - ); - await streamSystem.Close(); - } - [Fact] public async void CreateExistStreamIdempotentShouldNoRaiseExceptions() {