Skip to content

Commit

Permalink
Allow a consumer to set the rebalancing strategy (#2660)
Browse files Browse the repository at this point in the history
Co-authored-by: Paul Reardon <Paul@ReardonTech.UK>
  • Loading branch information
iancooper and preardon authored May 11, 2023
1 parent 1b207b1 commit 7ea5be7
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public class KafkaMessageConsumer : KafkaMessagingGateway, IAmAMessageConsumer
/// <param name="readCommittedOffsetsTimeoutMs">Timeout when reading the committed offsets, used when closing a consumer to log where it reached.
/// Defaults to 5000</param>
/// <param name="numPartitions">If we are creating missing infrastructure, How many partitions should the topic have. Defaults to 1</param>
/// <param name="partitionAssignmentStrategy">What is the strategy for assigning partitions to consumers?</param>
/// <param name="replicationFactor">If we are creating missing infrastructure, how many in-sync replicas do we need. Defaults to 1</param>
/// <param name="topicFindTimeoutMs">If we are checking for the existence of the topic, what is the timeout. Defaults to 10000ms</param>
/// <param name="makeChannels">Should we create infrastructure (topics) where it does not exist or check. Defaults to Create</param>
Expand All @@ -98,6 +99,7 @@ public KafkaMessageConsumer(
int sweepUncommittedOffsetsIntervalMs = 30000,
int readCommittedOffsetsTimeoutMs = 5000,
int numPartitions = 1,
PartitionAssignmentStrategy partitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky,
short replicationFactor = 1,
int topicFindTimeoutMs = 10000,
OnMissingChannel makeChannels = OnMissingChannel.Create
Expand Down Expand Up @@ -147,7 +149,7 @@ public KafkaMessageConsumer(
EnableAutoOffsetStore = false,
EnableAutoCommit = false,
// https://www.confluent.io/blog/cooperative-rebalancing-in-kafka-streams-consumer-ksqldb/
PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky,
PartitionAssignmentStrategy = partitionAssignmentStrategy,
};

_maxBatchSize = commitBatchSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public IAmAMessageConsumer Create(Subscription subscription)
sweepUncommittedOffsetsIntervalMs: kafkaSubscription.SweepUncommittedOffsetsIntervalMs,
readCommittedOffsetsTimeoutMs: kafkaSubscription.ReadCommittedOffsetsTimeOutMs,
numPartitions: kafkaSubscription.NumPartitions,
partitionAssignmentStrategy: kafkaSubscription.PartitionAssignmentStrategy,
replicationFactor: kafkaSubscription.ReplicationFactor,
topicFindTimeoutMs: kafkaSubscription.TopicFindTimeoutMs,
makeChannels: kafkaSubscription.MakeChannels
Expand Down
21 changes: 18 additions & 3 deletions src/Paramore.Brighter.MessagingGateway.Kafka/KafkaSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@ public class KafkaSubscription : Subscription
/// AutoOffsetReset.Error - Consider it an error to be lacking a reset
/// </summary>
public AutoOffsetReset OffsetDefault { get; set; } = AutoOffsetReset.Earliest;

/// <summary>
/// How should we assign partitions to consumers in the group?
/// Range - Assign to co-localise partitions to consumers in the same group
/// RoundRobin - Assign partitions to consumers in a round robin fashion
/// CooperativeSticky - Brighter's default, reduce the number of re-balances by assigning partitions to the consumer that previously owned them
/// </summary>
public PartitionAssignmentStrategy PartitionAssignmentStrategy { get; set; }

/// <summary>
/// How long before we time out when we are reading the committed offsets back (mainly used for debugging)
Expand Down Expand Up @@ -95,6 +103,7 @@ public class KafkaSubscription : Subscription
/// </summary>
public int TopicFindTimeoutMs { get; set; } = 5000;


/// <summary>
/// Initializes a new instance of the <see cref="Subscription"/> class.
/// </summary>
Expand All @@ -121,6 +130,7 @@ public class KafkaSubscription : Subscription
/// <param name="makeChannels">Should we make channels if they don't exist, defaults to creating</param>
/// <param name="emptyChannelDelay">How long to pause when a channel is empty in milliseconds</param>
/// <param name="channelFailureDelay">How long to pause when there is a channel failure in milliseconds</param>
/// <param name="partitionAssignmentStrategy">How do partitions get assigned to consumers?</param>
public KafkaSubscription (
Type dataType,
SubscriptionName name = null,
Expand All @@ -145,7 +155,8 @@ public KafkaSubscription (
IAmAChannelFactory channelFactory = null,
OnMissingChannel makeChannels = OnMissingChannel.Create,
int emptyChannelDelay = 500,
int channelFailureDelay = 1000)
int channelFailureDelay = 1000,
PartitionAssignmentStrategy partitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky)
: base(dataType, name, channelName, routingKey, bufferSize, noOfPerformers, timeoutInMilliseconds, requeueCount,
requeueDelayInMilliseconds, unacceptableMessageLimit, isAsync, channelFactory, makeChannels, emptyChannelDelay, channelFailureDelay)
{
Expand All @@ -158,6 +169,7 @@ public KafkaSubscription (
SessionTimeoutMs = sessionTimeoutMs;
NumPartitions = numOfPartitions;
ReplicationFactor = replicationFactor;
PartitionAssignmentStrategy = partitionAssignmentStrategy;
}
}

Expand Down Expand Up @@ -189,6 +201,7 @@ public class KafkaSubscription<T> : KafkaSubscription where T : IRequest
/// <param name="channelFactory">The channel factory to create channels for Consumer.</param>
/// <param name="emptyChannelDelay">How long to pause when a channel is empty in milliseconds</param>
/// <param name="channelFailureDelay">How long to pause when there is a channel failure in milliseconds</param>
/// <param name="partitionAssignmentStrategy">How do partitions get assigned to consumers?</param>
public KafkaSubscription(
SubscriptionName name = null,
ChannelName channelName = null,
Expand All @@ -212,11 +225,13 @@ public KafkaSubscription(
IAmAChannelFactory channelFactory = null,
OnMissingChannel makeChannels = OnMissingChannel.Create,
int emptyChannelDelay = 500,
int channelFailureDelay = 1000)
int channelFailureDelay = 1000,
PartitionAssignmentStrategy partitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky)
: base(typeof(T), name, channelName, routingKey, groupId, bufferSize, noOfPerformers, timeoutInMilliseconds,
requeueCount, requeueDelayInMilliseconds, unacceptableMessageLimit, offsetDefault, commitBatchSize,
sessionTimeoutMs, maxPollIntervalMs, sweepUncommittedOffsetsIntervalMs, isolationLevel, isAsync,
numOfPartitions, replicationFactor, channelFactory, makeChannels, emptyChannelDelay, channelFailureDelay)
numOfPartitions, replicationFactor, channelFactory, makeChannels, emptyChannelDelay, channelFailureDelay,
partitionAssignmentStrategy)
{
}
}
Expand Down

0 comments on commit 7ea5be7

Please sign in to comment.