Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for custom IPartitionEventHandler instances in subscriptions #154

Merged
merged 6 commits into from
Sep 2, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;
Expand All @@ -13,6 +14,7 @@
using Akka.Streams.Kafka.Tests.Logging;
using Akka.Streams.Supervision;
using Akka.Streams.TestKit;
using Akka.Util.Internal;
using Confluent.Kafka;
using FluentAssertions;
using Xunit;
Expand Down Expand Up @@ -176,5 +178,60 @@ Directive Decider(Exception cause) => cause is SerializationException
probe.ExpectNoMsg(TimeSpan.FromSeconds(10));
probe.Cancel();
}

[Fact]
public async Task Custom_partition_event_handling_Should_work()
{
int elementsCount = 100;
var topic1 = CreateTopic(1);
var group1 = CreateGroup(1);
var topicPartition1 = new TopicPartition(topic1, 0);

await GivenInitializedTopic(topicPartition1);

await ProduceStrings(new TopicPartition(topic1, 0), Enumerable.Range(1, elementsCount), ProducerSettings);

var consumerSettings = CreateConsumerSettings<string>(group1);

var customHandler = new CustomEventsHandler();
var (control, probe) = CreateProbe(consumerSettings, Subscriptions.Topics(topic1).WithPartitionEventsHandler(customHandler));

probe.Request(elementsCount);
foreach (var i in Enumerable.Range(1, elementsCount).Select(c => c.ToString()))
probe.ExpectNext(i, TimeSpan.FromSeconds(10));

var shutdown = control.Shutdown();
await AwaitConditionAsync(() => shutdown.IsCompleted);

customHandler.AssignmentEventsCounter.Current.Should().BeGreaterThan(0);
customHandler.StopEventsCounter.Current.Should().BeGreaterThan(0);
}

class CustomEventsHandler : IPartitionEventHandler
{
public AtomicCounter AssignmentEventsCounter = new AtomicCounter(0);
public AtomicCounter RevokeEventsCounter = new AtomicCounter(0);
public AtomicCounter StopEventsCounter = new AtomicCounter(0);


/// <inheritdoc />
public void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions,
IRestrictedConsumer consumer)
{
RevokeEventsCounter.IncrementAndGet();
}

/// <inheritdoc />
public void OnAssign(IImmutableSet<TopicPartition> assignedTopicPartitions, IRestrictedConsumer consumer)
{
AssignmentEventsCounter.IncrementAndGet();
}

/// <inheritdoc />
public void OnStop(IImmutableSet<TopicPartition> topicPartitions, IRestrictedConsumer consumer)
{
StopEventsCounter.IncrementAndGet();
}
}
}
}
175 changes: 87 additions & 88 deletions src/Akka.Streams.Kafka/Helpers/PartitionEventHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,118 +17,117 @@ namespace Akka.Streams.Kafka.Helpers
/// the callbacks in this class are called.
/// </summary>
[ApiMayChange]
internal interface IPartitionEventHandler<K, V>
public interface IPartitionEventHandler
{
/// <summary>
/// Called when partitions are revoked
/// </summary>
void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, RestrictedConsumer<K, V> consumer);
void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, IRestrictedConsumer consumer);

/// <summary>
/// Called when partitions are assigned
/// </summary>
void OnAssign(IImmutableSet<TopicPartition> assignedTopicPartitions, RestrictedConsumer<K, V> consumer);
void OnAssign(IImmutableSet<TopicPartition> assignedTopicPartitions, IRestrictedConsumer consumer);

/// <summary>
/// Called when consuming is stopped
/// </summary>
void OnStop(IImmutableSet<TopicPartition> topicPartitions, RestrictedConsumer<K, V> consumer);
void OnStop(IImmutableSet<TopicPartition> topicPartitions, IRestrictedConsumer consumer);
}

/// <summary>
/// Dummy handler which does nothing. Also <see cref="IPartitionEventHandler{K,V}"/>
/// Contains internal imlementations of <see cref="IPartitionEventHandler"/>
/// </summary>
internal class EmptyPartitionEventHandler<K, V> : IPartitionEventHandler<K, V>
internal static class PartitionEventHandlers
{
/// <inheritdoc />
public void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, RestrictedConsumer<K, V> consumer)
{
}

/// <inheritdoc />
public void OnAssign(IImmutableSet<TopicPartition> assignedTopicPartitions, RestrictedConsumer<K, V> consumer)
{
}

/// <inheritdoc />
public void OnStop(IImmutableSet<TopicPartition> topicPartitions, RestrictedConsumer<K, V> consumer)
{
}
}

/// <summary>
/// Handler allowing to pass custom stage callbacks. Also <see cref="IPartitionEventHandler{K,V}"/>
/// </summary>
internal class AsyncCallbacksPartitionEventHandler<K, V> : IPartitionEventHandler<K, V>
{
private readonly Action<IImmutableSet<TopicPartition>> _partitionAssignedCallback;
private readonly Action<IImmutableSet<TopicPartitionOffset>> _partitionRevokedCallback;

public AsyncCallbacksPartitionEventHandler(Action<IImmutableSet<TopicPartition>> partitionAssignedCallback,
Action<IImmutableSet<TopicPartitionOffset>> partitionRevokedCallback)
{
_partitionAssignedCallback = partitionAssignedCallback;
_partitionRevokedCallback = partitionRevokedCallback;
}

/// <inheritdoc />
public void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, RestrictedConsumer<K, V> consumer)
{
_partitionRevokedCallback(revokedTopicPartitions);
}

/// <inheritdoc />
public void OnAssign(IImmutableSet<TopicPartition> assignedTopicPartitions, RestrictedConsumer<K, V> consumer)
{
_partitionAssignedCallback(assignedTopicPartitions);
}

/// <inheritdoc />
public void OnStop(IImmutableSet<TopicPartition> topicPartitions, RestrictedConsumer<K, V> consumer)
{
}
}

/// <summary>
/// Creates new handler with chaining of other two handlers
/// </summary>
/// <typeparam name="K"></typeparam>
/// <typeparam name="V"></typeparam>
internal class PartitionAssignedHandlersChain<K, V> : IPartitionEventHandler<K, V>
{
private readonly IPartitionEventHandler<K, V> _handler1;
private readonly IPartitionEventHandler<K, V> _handler2;

/// <summary>
/// PartitionAssignedHandlersChain
/// Dummy handler which does nothing. Also <see cref="IPartitionEventHandler"/>
/// </summary>
/// <param name="handler1">First handler in chain</param>
/// <param name="handler2">Second handler in chain</param>
public PartitionAssignedHandlersChain(IPartitionEventHandler<K, V> handler1, IPartitionEventHandler<K, V> handler2)
internal class Empty : IPartitionEventHandler
{
_handler1 = handler1;
_handler2 = handler2;
/// <inheritdoc />
public void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, IRestrictedConsumer consumer)
{
}

/// <inheritdoc />
public void OnAssign(IImmutableSet<TopicPartition> assignedTopicPartitions, IRestrictedConsumer consumer)
{
}

/// <inheritdoc />
public void OnStop(IImmutableSet<TopicPartition> topicPartitions, IRestrictedConsumer consumer)
{
}
}

/// <inheritdoc />
public void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, RestrictedConsumer<K, V> consumer)
{
_handler1.OnRevoke(revokedTopicPartitions, consumer);
_handler2.OnRevoke(revokedTopicPartitions, consumer);
}

/// <inheritdoc />
public void OnAssign(IImmutableSet<TopicPartition> assignedTopicPartitions, RestrictedConsumer<K, V> consumer)

/// <summary>
/// Handler allowing to pass custom stage callbacks. Also <see cref="IPartitionEventHandler{K,V}"/>
/// </summary>
internal class AsyncCallbacks : IPartitionEventHandler
{
_handler1.OnAssign(assignedTopicPartitions, consumer);
_handler2.OnAssign(assignedTopicPartitions, consumer);
private readonly Action<IImmutableSet<TopicPartition>> _partitionAssignedCallback;
private readonly Action<IImmutableSet<TopicPartitionOffset>> _partitionRevokedCallback;

public AsyncCallbacks(Action<IImmutableSet<TopicPartition>> partitionAssignedCallback,
Action<IImmutableSet<TopicPartitionOffset>> partitionRevokedCallback)
{
_partitionAssignedCallback = partitionAssignedCallback;
_partitionRevokedCallback = partitionRevokedCallback;
}

/// <inheritdoc />
public void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, IRestrictedConsumer consumer)
{
_partitionRevokedCallback(revokedTopicPartitions);
}

/// <inheritdoc />
public void OnAssign(IImmutableSet<TopicPartition> assignedTopicPartitions, IRestrictedConsumer consumer)
{
_partitionAssignedCallback(assignedTopicPartitions);
}

/// <inheritdoc />
public void OnStop(IImmutableSet<TopicPartition> topicPartitions, IRestrictedConsumer consumer)
{
}
}

/// <inheritdoc />
public void OnStop(IImmutableSet<TopicPartition> topicPartitions, RestrictedConsumer<K, V> consumer)

/// <summary>
/// Handler allowing chain other implementations of <see cref="IPartitionEventHandler"/>
/// </summary>
internal class Chain : IPartitionEventHandler
{
_handler1.OnStop(topicPartitions, consumer);
_handler2.OnStop(topicPartitions, consumer);
private readonly IPartitionEventHandler _handler1;
private readonly IPartitionEventHandler _handler2;

public Chain(IPartitionEventHandler handler1, IPartitionEventHandler handler2)
{
_handler1 = handler1;
_handler2 = handler2;
}

/// <inheritdoc />
public void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, IRestrictedConsumer consumer)
{
_handler1?.OnRevoke(revokedTopicPartitions, consumer);
_handler2?.OnRevoke(revokedTopicPartitions, consumer);
}

/// <inheritdoc />
public void OnAssign(IImmutableSet<TopicPartition> assignedTopicPartitions, IRestrictedConsumer consumer)
{
_handler1?.OnAssign(assignedTopicPartitions, consumer);
_handler2?.OnAssign(assignedTopicPartitions, consumer);
}

/// <inheritdoc />
public void OnStop(IImmutableSet<TopicPartition> topicPartitions, IRestrictedConsumer consumer)
{
_handler1?.OnStop(topicPartitions, consumer);
_handler2?.OnStop(topicPartitions, consumer);
}
}
}
}
66 changes: 44 additions & 22 deletions src/Akka.Streams.Kafka/Helpers/RestrictedConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,54 +9,76 @@ namespace Akka.Streams.Kafka.Helpers
/// <summary>
/// Offers parts of <see cref="IConsumer{K,V}"/> API which becomes available to <see cref="IPartitionEventHandler"/>
/// </summary>
public class RestrictedConsumer<K, V>
public interface IRestrictedConsumer
{
/// <summary>
/// See <see cref="IConsumer{TKey,TValue}.Assignment"/>
/// </summary>
List<TopicPartition> Assignment { get; }

/// <summary>
/// Get the first offset for the given partitions.
/// </summary>
List<TopicPartitionOffset> BeginningOffsets(IEnumerable<TopicPartition> topicPartitions);

/// <summary>
/// See <see cref="IConsumer{TKey,TValue}.Commit()"/>
/// </summary>
void CommitSync(IEnumerable<TopicPartitionOffset> offsets);

/// <summary>
/// See <see cref="IConsumer{TKey,TValue}.Committed"/>
/// </summary>
void Committed(IEnumerable<TopicPartition> topicPartitions);

/// <summary>
/// See <see cref="IConsumer{TKey,TValue}.Position"/>
/// </summary>
Offset Position(TopicPartition topicPartition);

/// <summary>
/// See <see cref="IConsumer{TKey,TValue}.Seek"/>
/// </summary>
void Seek(TopicPartitionOffset topicPartitionOffset);
}

/// <summary>
/// Offers parts of <see cref="IConsumer{K,V}"/> API which becomes available to <see cref="IPartitionEventHandler"/>
/// </summary>
internal class RestrictedConsumer<K, V> : IRestrictedConsumer
{
private readonly IConsumer<K, V> _consumer;
private readonly TimeSpan _duration;

/// <summary>
///
/// RestrictedConsumer
/// </summary>
/// <param name="consumer"></param>
/// <param name="duration"></param>
public RestrictedConsumer(IConsumer<K, V> consumer, TimeSpan duration)
{
_consumer = consumer;
_duration = duration;
}

/// <summary>
/// See <see cref="IConsumer{TKey,TValue}.Assignment"/>
/// </summary>
/// <inheritdoc />
public List<TopicPartition> Assignment => _consumer.Assignment;

/// <summary>
/// Get the first offset for the given partitions.
/// </summary>
/// <inheritdoc />
public List<TopicPartitionOffset> BeginningOffsets(IEnumerable<TopicPartition> topicPartitions)
{
var timestamps = topicPartitions.Select(tp => new TopicPartitionTimestamp(tp, new Timestamp(0, TimestampType.NotAvailable))).ToList();
return _consumer.OffsetsForTimes(timestamps, _duration);
}

/// <summary>
/// See <see cref="IConsumer{TKey,TValue}.Commit()"/>
/// </summary>
/// <inheritdoc />
public void CommitSync(IEnumerable<TopicPartitionOffset> offsets) => _consumer.Commit(offsets);

/// <summary>
/// See <see cref="IConsumer{TKey,TValue}.Committed"/>
/// </summary>
/// <inheritdoc />
public void Committed(IEnumerable<TopicPartition> topicPartitions) => _consumer.Committed(topicPartitions, _duration);

/// <summary>
/// See <see cref="IConsumer{TKey,TValue}.Position"/>
/// </summary>
/// <inheritdoc />
public Offset Position(TopicPartition topicPartition) => _consumer.Position(topicPartition);

/// <summary>
/// See <see cref="IConsumer{TKey,TValue}.Seek"/>
/// </summary>
/// <inheritdoc />
public void Seek(TopicPartitionOffset topicPartitionOffset) => _consumer.Seek(topicPartitionOffset);

}
Expand Down
Loading