From 3dd904e6d62d49de0ab334fff5152a6162d37fba Mon Sep 17 00:00:00 2001 From: Igor Fedchenko Date: Wed, 2 Sep 2020 17:26:44 +0300 Subject: [PATCH] Added support for custom IPartitionEventHandler instances in subscriptions (#154) * Added support for custom IPartitionEventHandler instances * Fixed compilation errors after merging with dev branch * Added specs * Restart CI * Added docs to readme --- README.md | 38 ++++ .../PlainSourceIntegrationTests.cs | 57 ++++++ .../Helpers/PartitionEventHandler.cs | 175 +++++++++--------- .../Helpers/RestrictedConsumer.cs | 66 ++++--- .../Settings/Subscriptions.cs | 37 +++- .../Abstract/SingleSourceStageLogic.cs | 10 +- .../Consumers/Abstract/SubSourceLogic.cs | 2 +- .../Abstract/TransactionalSourceLogic.cs | 6 +- .../Consumers/Actors/KafkaConsumerActor.cs | 8 +- .../Actors/KafkaConsumerActorMetadata.cs | 6 +- 10 files changed, 281 insertions(+), 124 deletions(-) diff --git a/README.md b/README.md index fd82342b..4b476bdd 100644 --- a/README.md +++ b/README.md @@ -399,6 +399,44 @@ var source = KafkaConsumer.PlainPartitionedManualOffsetSource(consumerSettings, Are not implemented yet. Waiting for issue https://github.com/akkadotnet/Akka.Streams.Kafka/issues/85 to be resolved. +### Partition Events handling + +Sometimes you may need to add custom handling for partition events, like assigning partition to consumer. To do that, you will need: + +1. To write custom implementation of `IPartitionEventHandler` interface: +```c# +class CustomEventsHandler : IPartitionEventHandler +{ + /// + public void OnRevoke(IImmutableSet revokedTopicPartitions, IRestrictedConsumer consumer) + { + // Your code here + } + + /// + public void OnAssign(IImmutableSet assignedTopicPartitions, IRestrictedConsumer consumer) + { + // Your code here + } + + /// + public void OnStop(IImmutableSet topicPartitions, IRestrictedConsumer consumer) + { + // Your code here + } +} +``` + +Here `IRestrictedConsumer` is an object providing access to some limited API of internal consumer kafka client. + +2. Use `WithPartitionEventsHandler` of `Topic` / `TopicPartition` subscriptions, like this: +```c# +var customHandler = new CustomEventsHandler(); +KafkaConsumer.PlainSource(settings, Subscriptions.Topics(yourTopic).WithPartitionEventsHandler(customHandler)); +``` + +> Note: Your handler callbacks will be invoked in the same thread where kafka consumer is handling all events and getting messages, so be careful when using it. +> ## Local development There are some helpers to simplify local development diff --git a/src/Akka.Streams.Kafka.Tests/Integration/PlainSourceIntegrationTests.cs b/src/Akka.Streams.Kafka.Tests/Integration/PlainSourceIntegrationTests.cs index da9e1cb2..9d8d7916 100644 --- a/src/Akka.Streams.Kafka.Tests/Integration/PlainSourceIntegrationTests.cs +++ b/src/Akka.Streams.Kafka.Tests/Integration/PlainSourceIntegrationTests.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Collections.Immutable; using System.Linq; using System.Runtime.Serialization; using System.Text; @@ -13,6 +14,7 @@ using Akka.Streams.Kafka.Tests.Logging; using Akka.Streams.Supervision; using Akka.Streams.TestKit; +using Akka.Util.Internal; using Confluent.Kafka; using FluentAssertions; using Xunit; @@ -176,5 +178,60 @@ Directive Decider(Exception cause) => cause is SerializationException probe.ExpectNoMsg(TimeSpan.FromSeconds(10)); probe.Cancel(); } + + [Fact] + public async Task Custom_partition_event_handling_Should_work() + { + int elementsCount = 100; + var topic1 = CreateTopic(1); + var group1 = CreateGroup(1); + var topicPartition1 = new TopicPartition(topic1, 0); + + await GivenInitializedTopic(topicPartition1); + + await ProduceStrings(new TopicPartition(topic1, 0), Enumerable.Range(1, elementsCount), ProducerSettings); + + var consumerSettings = CreateConsumerSettings(group1); + + var customHandler = new CustomEventsHandler(); + var (control, probe) = CreateProbe(consumerSettings, Subscriptions.Topics(topic1).WithPartitionEventsHandler(customHandler)); + + probe.Request(elementsCount); + foreach (var i in Enumerable.Range(1, elementsCount).Select(c => c.ToString())) + probe.ExpectNext(i, TimeSpan.FromSeconds(10)); + + var shutdown = control.Shutdown(); + await AwaitConditionAsync(() => shutdown.IsCompleted); + + customHandler.AssignmentEventsCounter.Current.Should().BeGreaterThan(0); + customHandler.StopEventsCounter.Current.Should().BeGreaterThan(0); + } + + class CustomEventsHandler : IPartitionEventHandler + { + public AtomicCounter AssignmentEventsCounter = new AtomicCounter(0); + public AtomicCounter RevokeEventsCounter = new AtomicCounter(0); + public AtomicCounter StopEventsCounter = new AtomicCounter(0); + + + /// + public void OnRevoke(IImmutableSet revokedTopicPartitions, + IRestrictedConsumer consumer) + { + RevokeEventsCounter.IncrementAndGet(); + } + + /// + public void OnAssign(IImmutableSet assignedTopicPartitions, IRestrictedConsumer consumer) + { + AssignmentEventsCounter.IncrementAndGet(); + } + + /// + public void OnStop(IImmutableSet topicPartitions, IRestrictedConsumer consumer) + { + StopEventsCounter.IncrementAndGet(); + } + } } } diff --git a/src/Akka.Streams.Kafka/Helpers/PartitionEventHandler.cs b/src/Akka.Streams.Kafka/Helpers/PartitionEventHandler.cs index 4307c4b1..185dafc4 100644 --- a/src/Akka.Streams.Kafka/Helpers/PartitionEventHandler.cs +++ b/src/Akka.Streams.Kafka/Helpers/PartitionEventHandler.cs @@ -17,118 +17,117 @@ namespace Akka.Streams.Kafka.Helpers /// the callbacks in this class are called. /// [ApiMayChange] - internal interface IPartitionEventHandler + public interface IPartitionEventHandler { /// /// Called when partitions are revoked /// - void OnRevoke(IImmutableSet revokedTopicPartitions, RestrictedConsumer consumer); + void OnRevoke(IImmutableSet revokedTopicPartitions, IRestrictedConsumer consumer); /// /// Called when partitions are assigned /// - void OnAssign(IImmutableSet assignedTopicPartitions, RestrictedConsumer consumer); + void OnAssign(IImmutableSet assignedTopicPartitions, IRestrictedConsumer consumer); /// /// Called when consuming is stopped /// - void OnStop(IImmutableSet topicPartitions, RestrictedConsumer consumer); + void OnStop(IImmutableSet topicPartitions, IRestrictedConsumer consumer); } /// - /// Dummy handler which does nothing. Also + /// Contains internal imlementations of /// - internal class EmptyPartitionEventHandler : IPartitionEventHandler + internal static class PartitionEventHandlers { - /// - public void OnRevoke(IImmutableSet revokedTopicPartitions, RestrictedConsumer consumer) - { - } - - /// - public void OnAssign(IImmutableSet assignedTopicPartitions, RestrictedConsumer consumer) - { - } - - /// - public void OnStop(IImmutableSet topicPartitions, RestrictedConsumer consumer) - { - } - } - - /// - /// Handler allowing to pass custom stage callbacks. Also - /// - internal class AsyncCallbacksPartitionEventHandler : IPartitionEventHandler - { - private readonly Action> _partitionAssignedCallback; - private readonly Action> _partitionRevokedCallback; - - public AsyncCallbacksPartitionEventHandler(Action> partitionAssignedCallback, - Action> partitionRevokedCallback) - { - _partitionAssignedCallback = partitionAssignedCallback; - _partitionRevokedCallback = partitionRevokedCallback; - } - - /// - public void OnRevoke(IImmutableSet revokedTopicPartitions, RestrictedConsumer consumer) - { - _partitionRevokedCallback(revokedTopicPartitions); - } - - /// - public void OnAssign(IImmutableSet assignedTopicPartitions, RestrictedConsumer consumer) - { - _partitionAssignedCallback(assignedTopicPartitions); - } - - /// - public void OnStop(IImmutableSet topicPartitions, RestrictedConsumer consumer) - { - } - } - - /// - /// Creates new handler with chaining of other two handlers - /// - /// - /// - internal class PartitionAssignedHandlersChain : IPartitionEventHandler - { - private readonly IPartitionEventHandler _handler1; - private readonly IPartitionEventHandler _handler2; - /// - /// PartitionAssignedHandlersChain + /// Dummy handler which does nothing. Also /// - /// First handler in chain - /// Second handler in chain - public PartitionAssignedHandlersChain(IPartitionEventHandler handler1, IPartitionEventHandler handler2) + internal class Empty : IPartitionEventHandler { - _handler1 = handler1; - _handler2 = handler2; + /// + public void OnRevoke(IImmutableSet revokedTopicPartitions, IRestrictedConsumer consumer) + { + } + + /// + public void OnAssign(IImmutableSet assignedTopicPartitions, IRestrictedConsumer consumer) + { + } + + /// + public void OnStop(IImmutableSet topicPartitions, IRestrictedConsumer consumer) + { + } } - - /// - public void OnRevoke(IImmutableSet revokedTopicPartitions, RestrictedConsumer consumer) - { - _handler1.OnRevoke(revokedTopicPartitions, consumer); - _handler2.OnRevoke(revokedTopicPartitions, consumer); - } - - /// - public void OnAssign(IImmutableSet assignedTopicPartitions, RestrictedConsumer consumer) + + /// + /// Handler allowing to pass custom stage callbacks. Also + /// + internal class AsyncCallbacks : IPartitionEventHandler { - _handler1.OnAssign(assignedTopicPartitions, consumer); - _handler2.OnAssign(assignedTopicPartitions, consumer); + private readonly Action> _partitionAssignedCallback; + private readonly Action> _partitionRevokedCallback; + + public AsyncCallbacks(Action> partitionAssignedCallback, + Action> partitionRevokedCallback) + { + _partitionAssignedCallback = partitionAssignedCallback; + _partitionRevokedCallback = partitionRevokedCallback; + } + + /// + public void OnRevoke(IImmutableSet revokedTopicPartitions, IRestrictedConsumer consumer) + { + _partitionRevokedCallback(revokedTopicPartitions); + } + + /// + public void OnAssign(IImmutableSet assignedTopicPartitions, IRestrictedConsumer consumer) + { + _partitionAssignedCallback(assignedTopicPartitions); + } + + /// + public void OnStop(IImmutableSet topicPartitions, IRestrictedConsumer consumer) + { + } } - - /// - public void OnStop(IImmutableSet topicPartitions, RestrictedConsumer consumer) + + /// + /// Handler allowing chain other implementations of + /// + internal class Chain : IPartitionEventHandler { - _handler1.OnStop(topicPartitions, consumer); - _handler2.OnStop(topicPartitions, consumer); + private readonly IPartitionEventHandler _handler1; + private readonly IPartitionEventHandler _handler2; + + public Chain(IPartitionEventHandler handler1, IPartitionEventHandler handler2) + { + _handler1 = handler1; + _handler2 = handler2; + } + + /// + public void OnRevoke(IImmutableSet revokedTopicPartitions, IRestrictedConsumer consumer) + { + _handler1?.OnRevoke(revokedTopicPartitions, consumer); + _handler2?.OnRevoke(revokedTopicPartitions, consumer); + } + + /// + public void OnAssign(IImmutableSet assignedTopicPartitions, IRestrictedConsumer consumer) + { + _handler1?.OnAssign(assignedTopicPartitions, consumer); + _handler2?.OnAssign(assignedTopicPartitions, consumer); + } + + /// + public void OnStop(IImmutableSet topicPartitions, IRestrictedConsumer consumer) + { + _handler1?.OnStop(topicPartitions, consumer); + _handler2?.OnStop(topicPartitions, consumer); + } } } } \ No newline at end of file diff --git a/src/Akka.Streams.Kafka/Helpers/RestrictedConsumer.cs b/src/Akka.Streams.Kafka/Helpers/RestrictedConsumer.cs index 86dc8814..ead479a7 100644 --- a/src/Akka.Streams.Kafka/Helpers/RestrictedConsumer.cs +++ b/src/Akka.Streams.Kafka/Helpers/RestrictedConsumer.cs @@ -9,54 +9,76 @@ namespace Akka.Streams.Kafka.Helpers /// /// Offers parts of API which becomes available to /// - public class RestrictedConsumer + public interface IRestrictedConsumer + { + /// + /// See + /// + List Assignment { get; } + + /// + /// Get the first offset for the given partitions. + /// + List BeginningOffsets(IEnumerable topicPartitions); + + /// + /// See + /// + void CommitSync(IEnumerable offsets); + + /// + /// See + /// + void Committed(IEnumerable topicPartitions); + + /// + /// See + /// + Offset Position(TopicPartition topicPartition); + + /// + /// See + /// + void Seek(TopicPartitionOffset topicPartitionOffset); + } + + /// + /// Offers parts of API which becomes available to + /// + internal class RestrictedConsumer : IRestrictedConsumer { private readonly IConsumer _consumer; private readonly TimeSpan _duration; /// - /// + /// RestrictedConsumer /// - /// - /// public RestrictedConsumer(IConsumer consumer, TimeSpan duration) { _consumer = consumer; _duration = duration; } - /// - /// See - /// + /// public List Assignment => _consumer.Assignment; - /// - /// Get the first offset for the given partitions. - /// + /// public List BeginningOffsets(IEnumerable topicPartitions) { var timestamps = topicPartitions.Select(tp => new TopicPartitionTimestamp(tp, new Timestamp(0, TimestampType.NotAvailable))).ToList(); return _consumer.OffsetsForTimes(timestamps, _duration); } - /// - /// See - /// + /// public void CommitSync(IEnumerable offsets) => _consumer.Commit(offsets); - /// - /// See - /// + /// public void Committed(IEnumerable topicPartitions) => _consumer.Committed(topicPartitions, _duration); - /// - /// See - /// + /// public Offset Position(TopicPartition topicPartition) => _consumer.Position(topicPartition); - /// - /// See - /// + /// public void Seek(TopicPartitionOffset topicPartitionOffset) => _consumer.Seek(topicPartitionOffset); } diff --git a/src/Akka.Streams.Kafka/Settings/Subscriptions.cs b/src/Akka.Streams.Kafka/Settings/Subscriptions.cs index 34c60d35..b9249ab1 100644 --- a/src/Akka.Streams.Kafka/Settings/Subscriptions.cs +++ b/src/Akka.Streams.Kafka/Settings/Subscriptions.cs @@ -1,11 +1,26 @@ using System.Collections.Immutable; +using Akka.Streams.Kafka.Helpers; +using Akka.Streams.Util; +using Akka.Util; using Confluent.Kafka; namespace Akka.Streams.Kafka.Settings { public interface ISubscription { } public interface IManualSubscription : ISubscription { } - public interface IAutoSubscription : ISubscription { } + + public interface IAutoSubscription : ISubscription + { + /// + /// Partition events handler + /// + Option PartitionEventsHandler { get; } + + /// + /// Allows to specify custom partition events handler. See more at + /// + IAutoSubscription WithPartitionEventsHandler(IPartitionEventHandler partitionEventHandler); + } /// /// TopicSubscription @@ -25,6 +40,16 @@ public TopicSubscription(IImmutableSet topics) /// List of topics to subscribe /// public IImmutableSet Topics { get; } + + /// + public Option PartitionEventsHandler { get; private set; } + + /// + public IAutoSubscription WithPartitionEventsHandler(IPartitionEventHandler partitionEventHandler) + { + PartitionEventsHandler = new Option(partitionEventHandler); + return this; + } } /// @@ -48,6 +73,16 @@ public TopicSubscriptionPattern(string topicPattern) /// Topic pattern (regular expression to be matched) /// public string TopicPattern { get; } + + /// + public Option PartitionEventsHandler { get; private set; } + + /// + public IAutoSubscription WithPartitionEventsHandler(IPartitionEventHandler partitionEventHandler) + { + PartitionEventsHandler = new Option(partitionEventHandler); + return this; + } } /// diff --git a/src/Akka.Streams.Kafka/Stages/Consumers/Abstract/SingleSourceStageLogic.cs b/src/Akka.Streams.Kafka/Stages/Consumers/Abstract/SingleSourceStageLogic.cs index b7fc8d9f..83233d96 100644 --- a/src/Akka.Streams.Kafka/Stages/Consumers/Abstract/SingleSourceStageLogic.cs +++ b/src/Akka.Streams.Kafka/Stages/Consumers/Abstract/SingleSourceStageLogic.cs @@ -60,7 +60,13 @@ protected override IActorRef CreateConsumerActor() var partitionsAssignedHandler = GetAsyncCallback>(PartitionsAssigned); var partitionsRevokedHandler = GetAsyncCallback>(PartitionsRevoked); - IPartitionEventHandler eventHandler = new AsyncCallbacksPartitionEventHandler(partitionsAssignedHandler, partitionsRevokedHandler); + IPartitionEventHandler internalHandler = new PartitionEventHandlers.AsyncCallbacks(partitionsAssignedHandler, partitionsRevokedHandler); + + // If custom partition events handler specified - add it to the chain + var eventHandler = _subscription is IAutoSubscription autoSubscription && autoSubscription.PartitionEventsHandler.HasValue + ? new PartitionEventHandlers.Chain(autoSubscription.PartitionEventsHandler.Value, internalHandler) + : internalHandler; + // This allows to override partition events handling by subclasses eventHandler = AddToPartitionAssignmentHandler(eventHandler); @@ -94,7 +100,7 @@ protected override void PerformShutdown() /// /// Opportunity for subclasses to add their logic to the partition assignment callbacks. /// - protected virtual IPartitionEventHandler AddToPartitionAssignmentHandler(IPartitionEventHandler handler) + protected virtual IPartitionEventHandler AddToPartitionAssignmentHandler(IPartitionEventHandler handler) { return handler; } diff --git a/src/Akka.Streams.Kafka/Stages/Consumers/Abstract/SubSourceLogic.cs b/src/Akka.Streams.Kafka/Stages/Consumers/Abstract/SubSourceLogic.cs index 297512ab..5f6c7c01 100644 --- a/src/Akka.Streams.Kafka/Stages/Consumers/Abstract/SubSourceLogic.cs +++ b/src/Akka.Streams.Kafka/Stages/Consumers/Abstract/SubSourceLogic.cs @@ -133,7 +133,7 @@ public override void PreStart() if (!(Materializer is ActorMaterializer actorMaterializer)) throw new ArgumentException($"Expected {typeof(ActorMaterializer)} but got {Materializer.GetType()}"); - var eventHandler = new AsyncCallbacksPartitionEventHandler(_partitionAssignedCallback, _partitionRevokedCallback); + var eventHandler = new PartitionEventHandlers.AsyncCallbacks(_partitionAssignedCallback, _partitionRevokedCallback); var extendedActorSystem = actorMaterializer.System.AsInstanceOf(); ConsumerActor = extendedActorSystem.SystemActorOf(KafkaConsumerActorMetadata.GetProps(SourceActor.Ref, _settings, eventHandler), diff --git a/src/Akka.Streams.Kafka/Stages/Consumers/Abstract/TransactionalSourceLogic.cs b/src/Akka.Streams.Kafka/Stages/Consumers/Abstract/TransactionalSourceLogic.cs index 41fa6766..be54952e 100644 --- a/src/Akka.Streams.Kafka/Stages/Consumers/Abstract/TransactionalSourceLogic.cs +++ b/src/Akka.Streams.Kafka/Stages/Consumers/Abstract/TransactionalSourceLogic.cs @@ -140,9 +140,9 @@ protected override void StopConsumerActor() } /// - protected override IPartitionEventHandler AddToPartitionAssignmentHandler(IPartitionEventHandler handler) + protected override IPartitionEventHandler AddToPartitionAssignmentHandler(IPartitionEventHandler handler) { - var blockingRevokedCall = new AsyncCallbacksPartitionEventHandler( + var blockingRevokedCall = new PartitionEventHandlers.AsyncCallbacks( partitionAssignedCallback: _ => { }, partitionRevokedCallback: revokedTopicPartitions => { @@ -158,7 +158,7 @@ protected override IPartitionEventHandler AddToPartitionAssignmentHandler( } }); - return new PartitionAssignedHandlersChain(handler, blockingRevokedCall); + return new PartitionEventHandlers.Chain(handler, blockingRevokedCall); } private bool WaitForDraining(IImmutableSet partitions) diff --git a/src/Akka.Streams.Kafka/Stages/Consumers/Actors/KafkaConsumerActor.cs b/src/Akka.Streams.Kafka/Stages/Consumers/Actors/KafkaConsumerActor.cs index 12437371..4da09ec8 100644 --- a/src/Akka.Streams.Kafka/Stages/Consumers/Actors/KafkaConsumerActor.cs +++ b/src/Akka.Streams.Kafka/Stages/Consumers/Actors/KafkaConsumerActor.cs @@ -28,7 +28,7 @@ internal class KafkaConsumerActor : ActorBase /// /// Stores delegates for external handling of partition events /// - private readonly IPartitionEventHandler _partitionEventHandler; + private readonly IPartitionEventHandler _partitionEventHandler; private ICancelable _poolCancellation; private Internal.Poll _pollMessage; @@ -71,7 +71,7 @@ internal class KafkaConsumerActor : ActorBase /// Owner actor to send critical failures to /// Consumer settings /// Partion events handler - public KafkaConsumerActor(IActorRef owner, ConsumerSettings settings, IPartitionEventHandler partitionEventHandler) + public KafkaConsumerActor(IActorRef owner, ConsumerSettings settings, IPartitionEventHandler partitionEventHandler) { _owner = owner; _settings = settings; @@ -495,13 +495,13 @@ public override void OnPartitionsRevoked(IImmutableSet par /// class RebalanceListener : RebalanceListenerBase { - private readonly IPartitionEventHandler _partitionEventHandler; + private readonly IPartitionEventHandler _partitionEventHandler; private readonly KafkaConsumerActor _actor; private readonly RestrictedConsumer _restrictedConsumer; private readonly TimeSpan _warningDuration; - public RebalanceListener(IPartitionEventHandler partitionEventHandler, KafkaConsumerActor actor) + public RebalanceListener(IPartitionEventHandler partitionEventHandler, KafkaConsumerActor actor) { _partitionEventHandler = partitionEventHandler; _actor = actor; diff --git a/src/Akka.Streams.Kafka/Stages/Consumers/Actors/KafkaConsumerActorMetadata.cs b/src/Akka.Streams.Kafka/Stages/Consumers/Actors/KafkaConsumerActorMetadata.cs index ad3ab0ac..0c3963b9 100644 --- a/src/Akka.Streams.Kafka/Stages/Consumers/Actors/KafkaConsumerActorMetadata.cs +++ b/src/Akka.Streams.Kafka/Stages/Consumers/Actors/KafkaConsumerActorMetadata.cs @@ -26,18 +26,18 @@ public class KafkaConsumerActorMetadata /// Gets actor props /// public static Props GetProps(ConsumerSettings settings) => - Props.Create(() => new KafkaConsumerActor(ActorRefs.Nobody, settings, new EmptyPartitionEventHandler())).WithDispatcher(settings.DispatcherId); + Props.Create(() => new KafkaConsumerActor(ActorRefs.Nobody, settings, new PartitionEventHandlers.Empty())).WithDispatcher(settings.DispatcherId); /// /// Gets actor props /// - internal static Props GetProps(ConsumerSettings settings, IPartitionEventHandler handler) => + internal static Props GetProps(ConsumerSettings settings, IPartitionEventHandler handler) => Props.Create(() => new KafkaConsumerActor(ActorRefs.Nobody, settings, handler)).WithDispatcher(settings.DispatcherId); /// /// Gets actor props /// - internal static Props GetProps(IActorRef owner, ConsumerSettings settings, IPartitionEventHandler handler) => + internal static Props GetProps(IActorRef owner, ConsumerSettings settings, IPartitionEventHandler handler) => Props.Create(() => new KafkaConsumerActor(owner, settings, handler)).WithDispatcher(settings.DispatcherId); ///