Skip to content

Commit

Permalink
Added support for custom IPartitionEventHandler instances in subscrip…
Browse files Browse the repository at this point in the history
…tions (#154)

* Added support for custom IPartitionEventHandler instances

* Fixed compilation errors after merging with dev branch

* Added specs

* Restart CI

* Added docs to readme
  • Loading branch information
IgorFedchenko authored Sep 2, 2020
1 parent 8c69d80 commit 3dd904e
Show file tree
Hide file tree
Showing 10 changed files with 281 additions and 124 deletions.
38 changes: 38 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,44 @@ var source = KafkaConsumer.PlainPartitionedManualOffsetSource(consumerSettings,

Are not implemented yet. Waiting for issue https://github.com/akkadotnet/Akka.Streams.Kafka/issues/85 to be resolved.

### Partition Events handling

Sometimes you may need to add custom handling for partition events, like assigning partition to consumer. To do that, you will need:

1. To write custom implementation of `IPartitionEventHandler` interface:
```c#
class CustomEventsHandler : IPartitionEventHandler
{
/// <inheritdoc />
public void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, IRestrictedConsumer consumer)
{
// Your code here
}

/// <inheritdoc />
public void OnAssign(IImmutableSet<TopicPartition> assignedTopicPartitions, IRestrictedConsumer consumer)
{
// Your code here
}

/// <inheritdoc />
public void OnStop(IImmutableSet<TopicPartition> topicPartitions, IRestrictedConsumer consumer)
{
// Your code here
}
}
```

Here `IRestrictedConsumer` is an object providing access to some limited API of internal consumer kafka client.

2. Use `WithPartitionEventsHandler` of `Topic` / `TopicPartition` subscriptions, like this:
```c#
var customHandler = new CustomEventsHandler();
KafkaConsumer.PlainSource(settings, Subscriptions.Topics(yourTopic).WithPartitionEventsHandler(customHandler));
```

> Note: Your handler callbacks will be invoked in the same thread where kafka consumer is handling all events and getting messages, so be careful when using it.
>
## Local development

There are some helpers to simplify local development
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;
Expand All @@ -13,6 +14,7 @@
using Akka.Streams.Kafka.Tests.Logging;
using Akka.Streams.Supervision;
using Akka.Streams.TestKit;
using Akka.Util.Internal;
using Confluent.Kafka;
using FluentAssertions;
using Xunit;
Expand Down Expand Up @@ -176,5 +178,60 @@ Directive Decider(Exception cause) => cause is SerializationException
probe.ExpectNoMsg(TimeSpan.FromSeconds(10));
probe.Cancel();
}

[Fact]
public async Task Custom_partition_event_handling_Should_work()
{
int elementsCount = 100;
var topic1 = CreateTopic(1);
var group1 = CreateGroup(1);
var topicPartition1 = new TopicPartition(topic1, 0);

await GivenInitializedTopic(topicPartition1);

await ProduceStrings(new TopicPartition(topic1, 0), Enumerable.Range(1, elementsCount), ProducerSettings);

var consumerSettings = CreateConsumerSettings<string>(group1);

var customHandler = new CustomEventsHandler();
var (control, probe) = CreateProbe(consumerSettings, Subscriptions.Topics(topic1).WithPartitionEventsHandler(customHandler));

probe.Request(elementsCount);
foreach (var i in Enumerable.Range(1, elementsCount).Select(c => c.ToString()))
probe.ExpectNext(i, TimeSpan.FromSeconds(10));

var shutdown = control.Shutdown();
await AwaitConditionAsync(() => shutdown.IsCompleted);

customHandler.AssignmentEventsCounter.Current.Should().BeGreaterThan(0);
customHandler.StopEventsCounter.Current.Should().BeGreaterThan(0);
}

class CustomEventsHandler : IPartitionEventHandler
{
public AtomicCounter AssignmentEventsCounter = new AtomicCounter(0);
public AtomicCounter RevokeEventsCounter = new AtomicCounter(0);
public AtomicCounter StopEventsCounter = new AtomicCounter(0);


/// <inheritdoc />
public void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions,
IRestrictedConsumer consumer)
{
RevokeEventsCounter.IncrementAndGet();
}

/// <inheritdoc />
public void OnAssign(IImmutableSet<TopicPartition> assignedTopicPartitions, IRestrictedConsumer consumer)
{
AssignmentEventsCounter.IncrementAndGet();
}

/// <inheritdoc />
public void OnStop(IImmutableSet<TopicPartition> topicPartitions, IRestrictedConsumer consumer)
{
StopEventsCounter.IncrementAndGet();
}
}
}
}
175 changes: 87 additions & 88 deletions src/Akka.Streams.Kafka/Helpers/PartitionEventHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,118 +17,117 @@ namespace Akka.Streams.Kafka.Helpers
/// the callbacks in this class are called.
/// </summary>
[ApiMayChange]
internal interface IPartitionEventHandler<K, V>
public interface IPartitionEventHandler
{
/// <summary>
/// Called when partitions are revoked
/// </summary>
void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, RestrictedConsumer<K, V> consumer);
void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, IRestrictedConsumer consumer);

/// <summary>
/// Called when partitions are assigned
/// </summary>
void OnAssign(IImmutableSet<TopicPartition> assignedTopicPartitions, RestrictedConsumer<K, V> consumer);
void OnAssign(IImmutableSet<TopicPartition> assignedTopicPartitions, IRestrictedConsumer consumer);

/// <summary>
/// Called when consuming is stopped
/// </summary>
void OnStop(IImmutableSet<TopicPartition> topicPartitions, RestrictedConsumer<K, V> consumer);
void OnStop(IImmutableSet<TopicPartition> topicPartitions, IRestrictedConsumer consumer);
}

/// <summary>
/// Dummy handler which does nothing. Also <see cref="IPartitionEventHandler{K,V}"/>
/// Contains internal imlementations of <see cref="IPartitionEventHandler"/>
/// </summary>
internal class EmptyPartitionEventHandler<K, V> : IPartitionEventHandler<K, V>
internal static class PartitionEventHandlers
{
/// <inheritdoc />
public void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, RestrictedConsumer<K, V> consumer)
{
}

/// <inheritdoc />
public void OnAssign(IImmutableSet<TopicPartition> assignedTopicPartitions, RestrictedConsumer<K, V> consumer)
{
}

/// <inheritdoc />
public void OnStop(IImmutableSet<TopicPartition> topicPartitions, RestrictedConsumer<K, V> consumer)
{
}
}

/// <summary>
/// Handler allowing to pass custom stage callbacks. Also <see cref="IPartitionEventHandler{K,V}"/>
/// </summary>
internal class AsyncCallbacksPartitionEventHandler<K, V> : IPartitionEventHandler<K, V>
{
private readonly Action<IImmutableSet<TopicPartition>> _partitionAssignedCallback;
private readonly Action<IImmutableSet<TopicPartitionOffset>> _partitionRevokedCallback;

public AsyncCallbacksPartitionEventHandler(Action<IImmutableSet<TopicPartition>> partitionAssignedCallback,
Action<IImmutableSet<TopicPartitionOffset>> partitionRevokedCallback)
{
_partitionAssignedCallback = partitionAssignedCallback;
_partitionRevokedCallback = partitionRevokedCallback;
}

/// <inheritdoc />
public void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, RestrictedConsumer<K, V> consumer)
{
_partitionRevokedCallback(revokedTopicPartitions);
}

/// <inheritdoc />
public void OnAssign(IImmutableSet<TopicPartition> assignedTopicPartitions, RestrictedConsumer<K, V> consumer)
{
_partitionAssignedCallback(assignedTopicPartitions);
}

/// <inheritdoc />
public void OnStop(IImmutableSet<TopicPartition> topicPartitions, RestrictedConsumer<K, V> consumer)
{
}
}

/// <summary>
/// Creates new handler with chaining of other two handlers
/// </summary>
/// <typeparam name="K"></typeparam>
/// <typeparam name="V"></typeparam>
internal class PartitionAssignedHandlersChain<K, V> : IPartitionEventHandler<K, V>
{
private readonly IPartitionEventHandler<K, V> _handler1;
private readonly IPartitionEventHandler<K, V> _handler2;

/// <summary>
/// PartitionAssignedHandlersChain
/// Dummy handler which does nothing. Also <see cref="IPartitionEventHandler"/>
/// </summary>
/// <param name="handler1">First handler in chain</param>
/// <param name="handler2">Second handler in chain</param>
public PartitionAssignedHandlersChain(IPartitionEventHandler<K, V> handler1, IPartitionEventHandler<K, V> handler2)
internal class Empty : IPartitionEventHandler
{
_handler1 = handler1;
_handler2 = handler2;
/// <inheritdoc />
public void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, IRestrictedConsumer consumer)
{
}

/// <inheritdoc />
public void OnAssign(IImmutableSet<TopicPartition> assignedTopicPartitions, IRestrictedConsumer consumer)
{
}

/// <inheritdoc />
public void OnStop(IImmutableSet<TopicPartition> topicPartitions, IRestrictedConsumer consumer)
{
}
}

/// <inheritdoc />
public void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, RestrictedConsumer<K, V> consumer)
{
_handler1.OnRevoke(revokedTopicPartitions, consumer);
_handler2.OnRevoke(revokedTopicPartitions, consumer);
}

/// <inheritdoc />
public void OnAssign(IImmutableSet<TopicPartition> assignedTopicPartitions, RestrictedConsumer<K, V> consumer)

/// <summary>
/// Handler allowing to pass custom stage callbacks. Also <see cref="IPartitionEventHandler{K,V}"/>
/// </summary>
internal class AsyncCallbacks : IPartitionEventHandler
{
_handler1.OnAssign(assignedTopicPartitions, consumer);
_handler2.OnAssign(assignedTopicPartitions, consumer);
private readonly Action<IImmutableSet<TopicPartition>> _partitionAssignedCallback;
private readonly Action<IImmutableSet<TopicPartitionOffset>> _partitionRevokedCallback;

public AsyncCallbacks(Action<IImmutableSet<TopicPartition>> partitionAssignedCallback,
Action<IImmutableSet<TopicPartitionOffset>> partitionRevokedCallback)
{
_partitionAssignedCallback = partitionAssignedCallback;
_partitionRevokedCallback = partitionRevokedCallback;
}

/// <inheritdoc />
public void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, IRestrictedConsumer consumer)
{
_partitionRevokedCallback(revokedTopicPartitions);
}

/// <inheritdoc />
public void OnAssign(IImmutableSet<TopicPartition> assignedTopicPartitions, IRestrictedConsumer consumer)
{
_partitionAssignedCallback(assignedTopicPartitions);
}

/// <inheritdoc />
public void OnStop(IImmutableSet<TopicPartition> topicPartitions, IRestrictedConsumer consumer)
{
}
}

/// <inheritdoc />
public void OnStop(IImmutableSet<TopicPartition> topicPartitions, RestrictedConsumer<K, V> consumer)

/// <summary>
/// Handler allowing chain other implementations of <see cref="IPartitionEventHandler"/>
/// </summary>
internal class Chain : IPartitionEventHandler
{
_handler1.OnStop(topicPartitions, consumer);
_handler2.OnStop(topicPartitions, consumer);
private readonly IPartitionEventHandler _handler1;
private readonly IPartitionEventHandler _handler2;

public Chain(IPartitionEventHandler handler1, IPartitionEventHandler handler2)
{
_handler1 = handler1;
_handler2 = handler2;
}

/// <inheritdoc />
public void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, IRestrictedConsumer consumer)
{
_handler1?.OnRevoke(revokedTopicPartitions, consumer);
_handler2?.OnRevoke(revokedTopicPartitions, consumer);
}

/// <inheritdoc />
public void OnAssign(IImmutableSet<TopicPartition> assignedTopicPartitions, IRestrictedConsumer consumer)
{
_handler1?.OnAssign(assignedTopicPartitions, consumer);
_handler2?.OnAssign(assignedTopicPartitions, consumer);
}

/// <inheritdoc />
public void OnStop(IImmutableSet<TopicPartition> topicPartitions, IRestrictedConsumer consumer)
{
_handler1?.OnStop(topicPartitions, consumer);
_handler2?.OnStop(topicPartitions, consumer);
}
}
}
}
Loading

0 comments on commit 3dd904e

Please sign in to comment.