diff --git a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt index b6fc440e..511591d9 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt @@ -301,4 +301,4 @@ static RabbitMQ.Stream.Client.RawSuperStreamProducer.Create(RabbitMQ.Stream.Clie static RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Create(RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig producerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task static RabbitMQ.Stream.Client.Reliable.ReliableBase.RandomWait() -> System.Threading.Tasks.Task static RabbitMQ.Stream.Client.RoutingHelper.LookupLeaderConnection(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.StreamInfo metaDataInfo, RabbitMQ.Stream.Client.ConnectionsPool pool, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task -static RabbitMQ.Stream.Client.RoutingHelper.LookupRandomConnection(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.StreamInfo metaDataInfo, RabbitMQ.Stream.Client.ConnectionsPool pool, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task +static RabbitMQ.Stream.Client.RoutingHelper.LookupLeaderOrRandomReplicasConnection(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.StreamInfo metaDataInfo, RabbitMQ.Stream.Client.ConnectionsPool pool, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task diff --git a/RabbitMQ.Stream.Client/RawConsumer.cs b/RabbitMQ.Stream.Client/RawConsumer.cs index 2a97b016..3f62ce4e 100644 --- a/RabbitMQ.Stream.Client/RawConsumer.cs +++ b/RabbitMQ.Stream.Client/RawConsumer.cs @@ -202,7 +202,7 @@ public static async Task Create( ) { var client = await RoutingHelper - .LookupRandomConnection(clientParameters, metaStreamInfo, config.Pool, logger) + .LookupLeaderOrRandomReplicasConnection(clientParameters, metaStreamInfo, config.Pool, logger) .ConfigureAwait(false); var consumer = new RawConsumer((Client)client, config, logger); await consumer.Init().ConfigureAwait(false); diff --git a/RabbitMQ.Stream.Client/RoutingClient.cs b/RabbitMQ.Stream.Client/RoutingClient.cs index 11c71afa..cdd40ad1 100644 --- a/RabbitMQ.Stream.Client/RoutingClient.cs +++ b/RabbitMQ.Stream.Client/RoutingClient.cs @@ -67,11 +67,13 @@ internal static async Task LookupConnection( // In this case we just return the node (leader for producer, random for consumer) // since there is not load balancer configuration - return await routing.CreateClient(clientParameters with - { - Endpoint = endPointNoLb, - ClientProvidedName = clientParameters.ClientProvidedName - }, broker, logger) + return await routing + .CreateClient( + clientParameters with + { + Endpoint = endPointNoLb, + ClientProvidedName = clientParameters.ClientProvidedName + }, broker, logger) .ConfigureAwait(false); } @@ -79,11 +81,13 @@ internal static async Task LookupConnection( // so there is a load-balancer or proxy we need to get the right connection // as first we try with the first node given from the LB var endPoint = clientParameters.AddressResolver.EndPoint; - var client = await routing.CreateClient(clientParameters with - { - Endpoint = endPoint, - ClientProvidedName = clientParameters.ClientProvidedName - }, broker, logger) + var client = await routing + .CreateClient( + clientParameters with + { + Endpoint = endPoint, + ClientProvidedName = clientParameters.ClientProvidedName + }, broker, logger) .ConfigureAwait(false); var advertisedHost = GetPropertyValue(client.ConnectionProperties, "advertised_host"); @@ -95,11 +99,13 @@ internal static async Task LookupConnection( attemptNo++; await client.Close("advertised_host or advertised_port doesn't match").ConfigureAwait(false); - client = await routing.CreateClient(clientParameters with - { - Endpoint = endPoint, - ClientProvidedName = clientParameters.ClientProvidedName - }, broker, logger) + client = await routing + .CreateClient( + clientParameters with + { + Endpoint = endPoint, + ClientProvidedName = clientParameters.ClientProvidedName + }, broker, logger) .ConfigureAwait(false); advertisedHost = GetPropertyValue(client.ConnectionProperties, "advertised_host"); @@ -175,13 +181,20 @@ await LookupConnection(clientParameters, metaDataInfo.Leader, MaxAttempts(metaDa } /// - /// Gets a random connection. The consumer can connect to a replica or leader. + /// Gets a random connection a random replica. + /// If the replicas are not available it will connect to the leader. /// - public static async Task LookupRandomConnection(ClientParameters clientParameters, + public static async Task LookupLeaderOrRandomReplicasConnection(ClientParameters clientParameters, StreamInfo metaDataInfo, ConnectionsPool pool, ILogger logger = null) { - var brokers = new List() { metaDataInfo.Leader }; + var brokers = new List(); + if (metaDataInfo.Replicas is { Count: <= 0 }) + { + brokers.Add(metaDataInfo.Leader); + } + brokers.AddRange(metaDataInfo.Replicas); + var exceptions = new List(); var br = brokers.OrderBy(x => Random.Shared.Next()).ToList(); diff --git a/Tests/ConnectionsPoolTests.cs b/Tests/ConnectionsPoolTests.cs index e84dea36..dfa86ea0 100644 --- a/Tests/ConnectionsPoolTests.cs +++ b/Tests/ConnectionsPoolTests.cs @@ -80,9 +80,9 @@ public async void RoutingShouldReturnTwoConnectionsGivenOneItemPerConnection() var metaDataInfo = new StreamInfo("stream", ResponseCode.Ok, new Broker("localhost", 3939), new List()); var pool = new ConnectionsPool(0, 1); - var c1 = await RoutingHelper.LookupRandomConnection(clientParameters, metaDataInfo, pool); + var c1 = await RoutingHelper.LookupLeaderOrRandomReplicasConnection(clientParameters, metaDataInfo, pool); c1.Consumers.Add(1, default); - var c2 = await RoutingHelper.LookupRandomConnection(clientParameters, metaDataInfo, pool); + var c2 = await RoutingHelper.LookupLeaderOrRandomReplicasConnection(clientParameters, metaDataInfo, pool); c2.Consumers.Add(1, default); // here we have two different connections // and must be different since we have only one id per connection diff --git a/Tests/UnitTests.cs b/Tests/UnitTests.cs index ad3ef5a5..e77a6599 100644 --- a/Tests/UnitTests.cs +++ b/Tests/UnitTests.cs @@ -107,7 +107,7 @@ public Task CreateClient(ClientParameters clientParameters, Broker brok public bool ValidateDns { get; set; } = false; } - public class ReplicaRouting : IRouting + public class LeaderRouting : IRouting { public Task CreateClient(ClientParameters clientParameters, Broker broker, ILogger logger = null) { @@ -125,6 +125,25 @@ public Task CreateClient(ClientParameters clientParameters, Broker brok public bool ValidateDns { get; set; } = false; } + public class ReplicaseRouting : IRouting + { + public Task CreateClient(ClientParameters clientParameters, Broker broker, ILogger logger = null) + { + var fake = new FakeClient(clientParameters) + { + ConnectionProperties = new Dictionary() + { + + ["advertised_port"] = "5553", + ["advertised_host"] = "replica2" + } + }; + return Task.FromResult(fake); + } + + public bool ValidateDns { get; set; } = false; + } + // This class is only for unit tests public class UnitTests { @@ -141,7 +160,7 @@ public async Task GiveProperExceptionWhenUnableToConnect() var metaDataInfo = new StreamInfo("stream", ResponseCode.Ok, new Broker("localhost", 3939), new List()); await Assert.ThrowsAsync(() => - RoutingHelper.LookupRandomConnection(clientParameters, metaDataInfo, + RoutingHelper.LookupLeaderOrRandomReplicasConnection(clientParameters, metaDataInfo, new ConnectionsPool(1, 1))); } @@ -214,7 +233,7 @@ public void RandomReplicaLeader() var metaDataInfo = new StreamInfo("stream", ResponseCode.Ok, new Broker("leader", 5552), new List()); var client = - RoutingHelper.LookupRandomConnection(clientParameters, metaDataInfo, + RoutingHelper.LookupLeaderOrRandomReplicasConnection(clientParameters, metaDataInfo, new ConnectionsPool(1, 1)); Assert.Equal("5552", client.Result.ConnectionProperties["advertised_port"]); var res = (client.Result.ConnectionProperties["advertised_host"] == "leader" || @@ -222,6 +241,26 @@ public void RandomReplicaLeader() Assert.True(res); } + [Fact] + public void RandomOnlyReplicaIfThereAre() + { + // this test is not completed yet should add also some replicas + var addressResolver = new AddressResolver(new IPEndPoint(IPAddress.Parse("192.168.10.99"), 5552)); + var clientParameters = new ClientParameters() { AddressResolver = addressResolver, }; + var metaDataInfo = new StreamInfo("stream", + ResponseCode.Ok, new Broker("leader", 5552), + new List() + { + new Broker("replica2", 5553), + }); + var client = + RoutingHelper.LookupLeaderOrRandomReplicasConnection(clientParameters, metaDataInfo, + new ConnectionsPool(1, 1)); + Assert.Equal("5553", client.Result.ConnectionProperties["advertised_port"]); + var res = (client.Result.ConnectionProperties["advertised_host"] == "replica2"); + Assert.True(res); + } + [Fact] public void CompressUnCompressShouldHaveTheSize() { diff --git a/docs/ReliableClient/Program.cs b/docs/ReliableClient/Program.cs index 2f64c89d..d24960e3 100644 --- a/docs/ReliableClient/Program.cs +++ b/docs/ReliableClient/Program.cs @@ -8,13 +8,13 @@ var rClient = RClient.Start(new RClient.Config() { ProducersPerConnection = 2, - ConsumersPerConnection = 2, - Host = "Node0", - Port = 5553, + ConsumersPerConnection = 100, + Host = "localhost", + Port = 5552, LoadBalancer = true, - SuperStream = true, - Streams = 1, - Producers = 1, + SuperStream = false, + Streams = 10, + Producers = 4, MessagesPerProducer = 50_000_000, Consumers = 4 // Username = "test", diff --git a/docs/ReliableClient/RClient.cs b/docs/ReliableClient/RClient.cs index 9a31e611..eacbe5b3 100644 --- a/docs/ReliableClient/RClient.cs +++ b/docs/ReliableClient/RClient.cs @@ -91,6 +91,11 @@ public static async Task Start(Config config) AddressResolver = resolver, UserName = config.Username, Password = config.Password, + ConnectionPoolConfig = new ConnectionPoolConfig() + { + ProducersPerConnection = config.ProducersPerConnection, + ConsumersPerConnection = config.ConsumersPerConnection, + }, Endpoints = new List() {resolver.EndPoint} }; }