Skip to content

Commit

Permalink
Consumer connected only to the followers (#352)
Browse files Browse the repository at this point in the history
* Consumer connected only to the followers
* Rename method to LookupLeaderOrRandomReplicasConnection
---------
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
  • Loading branch information
Gsantomaggio authored Feb 6, 2024
1 parent c3d8e55 commit 66b9cab
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 31 deletions.
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -301,4 +301,4 @@ static RabbitMQ.Stream.Client.RawSuperStreamProducer.Create(RabbitMQ.Stream.Clie
static RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Create(RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig producerConfig, Microsoft.Extensions.Logging.ILogger<RabbitMQ.Stream.Client.Reliable.Producer> logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer>
static RabbitMQ.Stream.Client.Reliable.ReliableBase.RandomWait() -> System.Threading.Tasks.Task
static RabbitMQ.Stream.Client.RoutingHelper<T>.LookupLeaderConnection(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.StreamInfo metaDataInfo, RabbitMQ.Stream.Client.ConnectionsPool pool, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IClient>
static RabbitMQ.Stream.Client.RoutingHelper<T>.LookupRandomConnection(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.StreamInfo metaDataInfo, RabbitMQ.Stream.Client.ConnectionsPool pool, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IClient>
static RabbitMQ.Stream.Client.RoutingHelper<T>.LookupLeaderOrRandomReplicasConnection(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.StreamInfo metaDataInfo, RabbitMQ.Stream.Client.ConnectionsPool pool, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IClient>
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/RawConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public static async Task<IConsumer> Create(
)
{
var client = await RoutingHelper<Routing>
.LookupRandomConnection(clientParameters, metaStreamInfo, config.Pool, logger)
.LookupLeaderOrRandomReplicasConnection(clientParameters, metaStreamInfo, config.Pool, logger)
.ConfigureAwait(false);
var consumer = new RawConsumer((Client)client, config, logger);
await consumer.Init().ConfigureAwait(false);
Expand Down
49 changes: 31 additions & 18 deletions RabbitMQ.Stream.Client/RoutingClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,23 +67,27 @@ internal static async Task<IClient> LookupConnection(
// In this case we just return the node (leader for producer, random for consumer)
// since there is not load balancer configuration

return await routing.CreateClient(clientParameters with
{
Endpoint = endPointNoLb,
ClientProvidedName = clientParameters.ClientProvidedName
}, broker, logger)
return await routing
.CreateClient(
clientParameters with
{
Endpoint = endPointNoLb,
ClientProvidedName = clientParameters.ClientProvidedName
}, broker, logger)
.ConfigureAwait(false);
}

// here it means that there is a AddressResolver configuration
// so there is a load-balancer or proxy we need to get the right connection
// as first we try with the first node given from the LB
var endPoint = clientParameters.AddressResolver.EndPoint;
var client = await routing.CreateClient(clientParameters with
{
Endpoint = endPoint,
ClientProvidedName = clientParameters.ClientProvidedName
}, broker, logger)
var client = await routing
.CreateClient(
clientParameters with
{
Endpoint = endPoint,
ClientProvidedName = clientParameters.ClientProvidedName
}, broker, logger)
.ConfigureAwait(false);

var advertisedHost = GetPropertyValue(client.ConnectionProperties, "advertised_host");
Expand All @@ -95,11 +99,13 @@ internal static async Task<IClient> LookupConnection(
attemptNo++;
await client.Close("advertised_host or advertised_port doesn't match").ConfigureAwait(false);

client = await routing.CreateClient(clientParameters with
{
Endpoint = endPoint,
ClientProvidedName = clientParameters.ClientProvidedName
}, broker, logger)
client = await routing
.CreateClient(
clientParameters with
{
Endpoint = endPoint,
ClientProvidedName = clientParameters.ClientProvidedName
}, broker, logger)
.ConfigureAwait(false);

advertisedHost = GetPropertyValue(client.ConnectionProperties, "advertised_host");
Expand Down Expand Up @@ -175,13 +181,20 @@ await LookupConnection(clientParameters, metaDataInfo.Leader, MaxAttempts(metaDa
}

/// <summary>
/// Gets a random connection. The consumer can connect to a replica or leader.
/// Gets a random connection a random replica.
/// If the replicas are not available it will connect to the leader.
/// </summary>
public static async Task<IClient> LookupRandomConnection(ClientParameters clientParameters,
public static async Task<IClient> LookupLeaderOrRandomReplicasConnection(ClientParameters clientParameters,
StreamInfo metaDataInfo, ConnectionsPool pool, ILogger logger = null)
{
var brokers = new List<Broker>() { metaDataInfo.Leader };
var brokers = new List<Broker>();
if (metaDataInfo.Replicas is { Count: <= 0 })
{
brokers.Add(metaDataInfo.Leader);
}

brokers.AddRange(metaDataInfo.Replicas);

var exceptions = new List<Exception>();
var br = brokers.OrderBy(x => Random.Shared.Next()).ToList();

Expand Down
4 changes: 2 additions & 2 deletions Tests/ConnectionsPoolTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ public async void RoutingShouldReturnTwoConnectionsGivenOneItemPerConnection()
var metaDataInfo = new StreamInfo("stream", ResponseCode.Ok, new Broker("localhost", 3939),
new List<Broker>());
var pool = new ConnectionsPool(0, 1);
var c1 = await RoutingHelper<PoolRouting>.LookupRandomConnection(clientParameters, metaDataInfo, pool);
var c1 = await RoutingHelper<PoolRouting>.LookupLeaderOrRandomReplicasConnection(clientParameters, metaDataInfo, pool);
c1.Consumers.Add(1, default);
var c2 = await RoutingHelper<PoolRouting>.LookupRandomConnection(clientParameters, metaDataInfo, pool);
var c2 = await RoutingHelper<PoolRouting>.LookupLeaderOrRandomReplicasConnection(clientParameters, metaDataInfo, pool);
c2.Consumers.Add(1, default);
// here we have two different connections
// and must be different since we have only one id per connection
Expand Down
45 changes: 42 additions & 3 deletions Tests/UnitTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public Task<IClient> CreateClient(ClientParameters clientParameters, Broker brok
public bool ValidateDns { get; set; } = false;
}

public class ReplicaRouting : IRouting
public class LeaderRouting : IRouting
{
public Task<IClient> CreateClient(ClientParameters clientParameters, Broker broker, ILogger logger = null)
{
Expand All @@ -125,6 +125,25 @@ public Task<IClient> CreateClient(ClientParameters clientParameters, Broker brok
public bool ValidateDns { get; set; } = false;
}

public class ReplicaseRouting : IRouting
{
public Task<IClient> CreateClient(ClientParameters clientParameters, Broker broker, ILogger logger = null)
{
var fake = new FakeClient(clientParameters)
{
ConnectionProperties = new Dictionary<string, string>()
{

["advertised_port"] = "5553",
["advertised_host"] = "replica2"
}
};
return Task.FromResult<IClient>(fake);
}

public bool ValidateDns { get; set; } = false;
}

// This class is only for unit tests
public class UnitTests
{
Expand All @@ -141,7 +160,7 @@ public async Task GiveProperExceptionWhenUnableToConnect()
var metaDataInfo = new StreamInfo("stream", ResponseCode.Ok, new Broker("localhost", 3939),
new List<Broker>());
await Assert.ThrowsAsync<AggregateException>(() =>
RoutingHelper<Routing>.LookupRandomConnection(clientParameters, metaDataInfo,
RoutingHelper<Routing>.LookupLeaderOrRandomReplicasConnection(clientParameters, metaDataInfo,
new ConnectionsPool(1, 1)));
}

Expand Down Expand Up @@ -214,14 +233,34 @@ public void RandomReplicaLeader()
var metaDataInfo = new StreamInfo("stream", ResponseCode.Ok, new Broker("leader", 5552),
new List<Broker>());
var client =
RoutingHelper<ReplicaRouting>.LookupRandomConnection(clientParameters, metaDataInfo,
RoutingHelper<LeaderRouting>.LookupLeaderOrRandomReplicasConnection(clientParameters, metaDataInfo,
new ConnectionsPool(1, 1));
Assert.Equal("5552", client.Result.ConnectionProperties["advertised_port"]);
var res = (client.Result.ConnectionProperties["advertised_host"] == "leader" ||
client.Result.ConnectionProperties["advertised_host"] == "replica");
Assert.True(res);
}

[Fact]
public void RandomOnlyReplicaIfThereAre()
{
// this test is not completed yet should add also some replicas
var addressResolver = new AddressResolver(new IPEndPoint(IPAddress.Parse("192.168.10.99"), 5552));
var clientParameters = new ClientParameters() { AddressResolver = addressResolver, };
var metaDataInfo = new StreamInfo("stream",
ResponseCode.Ok, new Broker("leader", 5552),
new List<Broker>()
{
new Broker("replica2", 5553),
});
var client =
RoutingHelper<ReplicaseRouting>.LookupLeaderOrRandomReplicasConnection(clientParameters, metaDataInfo,
new ConnectionsPool(1, 1));
Assert.Equal("5553", client.Result.ConnectionProperties["advertised_port"]);
var res = (client.Result.ConnectionProperties["advertised_host"] == "replica2");
Assert.True(res);
}

[Fact]
public void CompressUnCompressShouldHaveTheSize()
{
Expand Down
12 changes: 6 additions & 6 deletions docs/ReliableClient/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
var rClient = RClient.Start(new RClient.Config()
{
ProducersPerConnection = 2,
ConsumersPerConnection = 2,
Host = "Node0",
Port = 5553,
ConsumersPerConnection = 100,
Host = "localhost",
Port = 5552,
LoadBalancer = true,
SuperStream = true,
Streams = 1,
Producers = 1,
SuperStream = false,
Streams = 10,
Producers = 4,
MessagesPerProducer = 50_000_000,
Consumers = 4
// Username = "test",
Expand Down
5 changes: 5 additions & 0 deletions docs/ReliableClient/RClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ public static async Task Start(Config config)
AddressResolver = resolver,
UserName = config.Username,
Password = config.Password,
ConnectionPoolConfig = new ConnectionPoolConfig()
{
ProducersPerConnection = config.ProducersPerConnection,
ConsumersPerConnection = config.ConsumersPerConnection,
},
Endpoints = new List<EndPoint>() {resolver.EndPoint}
};
}
Expand Down

0 comments on commit 66b9cab

Please sign in to comment.