Skip to content

Commit

Permalink
PlainPartitionedWithOffset source (#80)
Browse files Browse the repository at this point in the history
* Added KafkaConsumer's static method

* Added offsets test

* Added revoke callback test
  • Loading branch information
IgorFedchenko authored and Aaronontheweb committed Oct 15, 2019
1 parent 3633d97 commit ab26dfc
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
using System;
using System.Collections.Immutable;
using System.Linq;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.Streams.Kafka.Dsl;
using Akka.Streams.Kafka.Helpers;
using Akka.Streams.Kafka.Settings;
using Akka.Streams.TestKit;
using Akka.Streams.Util;
using Confluent.Kafka;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Streams.Kafka.Tests.Integration
{
public class PlainPartitionedManualOffsetSourceIntegrationTests : KafkaIntegrationTests
{
public PlainPartitionedManualOffsetSourceIntegrationTests(ITestOutputHelper output, KafkaFixture fixture)
: base(nameof(PlainPartitionedManualOffsetSourceIntegrationTests), output, fixture)
{
}

[Fact]
public async Task PlainPartitionedManualOffsetSource_Should_begin_consuming_from_beginning_of_the_topic()
{
var topic = CreateTopic(1);
var group = CreateGroup(1);
var totalMessages = 100;
var consumerSettings = CreateConsumerSettings<string>(group);

await ProduceStrings(topic, Enumerable.Range(1, totalMessages), ProducerSettings);

var probe = KafkaConsumer.PlainPartitionedManualOffsetSource(
consumerSettings,
Subscriptions.Topics(topic),
getOffsetsOnAssign: _ => Task.FromResult(ImmutableHashSet<TopicPartitionOffset>.Empty as IImmutableSet<TopicPartitionOffset>),
onRevoke: _ => { }
).MergeMany(3, tuple => tuple.Item2.MapMaterializedValue(notUsed => new NoopControl()))
.Select(m => m.Value)
.RunWith(this.SinkProbe<string>(), Materializer);

probe.Request(totalMessages);
var messages = probe.Within(TimeSpan.FromSeconds(10), () => probe.ExpectNextN(totalMessages));
messages.Should().BeEquivalentTo(Enumerable.Range(1, totalMessages).Select(m => m.ToString()));

probe.Cancel();
}

[Fact]
public async Task PlainPartitionedManualOffsetSource_Should_begin_consuming_with_offset()
{
var topic = CreateTopic(1);
var group = CreateGroup(1);
var consumerSettings = CreateConsumerSettings<string>(group);

await ProduceStrings(topic, Enumerable.Range(1, 100), ProducerSettings);

var probe = KafkaConsumer.PlainPartitionedManualOffsetSource(
consumerSettings,
Subscriptions.Topics(topic),
getOffsetsOnAssign: topicPartitions =>
{
// Skip first message from first partition
var firstPartition = topicPartitions.OrderBy(tp => tp.Partition.Value).First();
var offset = ImmutableHashSet<TopicPartitionOffset>.Empty.Add(new TopicPartitionOffset(firstPartition, 1));
return Task.FromResult<IImmutableSet<TopicPartitionOffset>>(offset);
},
onRevoke: _ => { }
).MergeMany(3, tuple => tuple.Item2.MapMaterializedValue(notUsed => new NoopControl()))
.Select(m => m.Value)
.RunWith(this.SinkProbe<string>(), Materializer);

probe.Request(99);
var messages = probe.Within(TimeSpan.FromSeconds(10), () => probe.ExpectNextN(99));
messages.ToHashSet().Count.Should().Be(99); // All consumed messages should be different (only one value is missing)

probe.Cancel();
}

[Fact]
public async Task PlainPartitionedManualOffsetSource_Should_call_the_OnRevoke_hook()
{
var topic = CreateTopic(1);
var group = CreateGroup(1);
var consumerSettings = CreateConsumerSettings<string>(group);

var partitionsAssigned = false;
var revoked = Option<IImmutableSet<TopicPartition>>.None;

var source = KafkaConsumer.PlainPartitionedManualOffsetSource(consumerSettings, Subscriptions.Topics(topic),
assignedPartitions =>
{
partitionsAssigned = true;
return Task.FromResult(ImmutableHashSet<TopicPartitionOffset>.Empty as IImmutableSet<TopicPartitionOffset>);
},
revokedPartitions =>
{
revoked = new Option<IImmutableSet<TopicPartition>>(revokedPartitions);
})
.MergeMany(3, tuple => tuple.Item2.MapMaterializedValue(notUsed => new NoopControl()))
.Select(m => m.Value);

var (control1, firstConsumer) = source.ToMaterialized(this.SinkProbe<string>(), Keep.Both).Run(Materializer);

AwaitCondition(() => partitionsAssigned, TimeSpan.FromSeconds(10), "First consumer should get asked for offsets");

var secondConsumer = source.RunWith(this.SinkProbe<string>(), Materializer);

AwaitCondition(() => revoked.Value?.Count > 0, TimeSpan.FromSeconds(10));

firstConsumer.Cancel();
secondConsumer.Cancel();
AwaitCondition(() => control1.IsShutdown.IsCompletedSuccessfully, TimeSpan.FromSeconds(10));
}
}
}
2 changes: 1 addition & 1 deletion src/Akka.Streams.Kafka.Tests/TestsConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ public static class TestsConfiguration
/// <remarks>
/// When this option is enabled, use docker-compose to start kafka manually (see docker-compose.yml file in the root folder)
/// </remarks>
public static readonly bool UseExistingDockerContainer = false && Environment.GetEnvironmentVariable("AKKA_STREAMS_KAFKA_TEST_CONTAINER_REUSE") != null;
public static readonly bool UseExistingDockerContainer = Environment.GetEnvironmentVariable("AKKA_STREAMS_KAFKA_TEST_CONTAINER_REUSE") != null;
}
}
23 changes: 23 additions & 0 deletions src/Akka.Streams.Kafka/Dsl/KafkaConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,5 +132,28 @@ public static Source<ConsumeResult<K, V>, IControl> AtMostOnceSource<K, V>(Consu
return message.Record;
});
}

/// <summary>
/// The <see cref="PlainPartitionedManualOffsetSource{K,V}"/> is similar to <see cref="PlainPartitionedSource{K,V}"/>
/// but allows the use of an offset store outside of Kafka, while retaining the automatic partition assignment.
/// When a topic-partition is assigned to a consumer, the <see cref="getOffsetsOnAssign"/>
/// function will be called to retrieve the offset, followed by a seek to the correct spot in the partition.
///
/// The <see cref="onRevoke"/> function gives the consumer a chance to store any uncommitted offsets, and do any other cleanup
/// that is required. Also allows the user access to the `onPartitionsRevoked` hook, useful for cleaning up any
/// partition-specific resources being used by the consumer.
/// </summary>
public static Source<(TopicPartition, Source<ConsumeResult<K, V>, NotUsed>), IControl> PlainPartitionedManualOffsetSource<K, V>(
ConsumerSettings<K, V> settings,
IAutoSubscription subscription,
Func<IImmutableSet<TopicPartition>, Task<IImmutableSet<TopicPartitionOffset>>> getOffsetsOnAssign,
Action<IImmutableSet<TopicPartition>> onRevoke)
{
return Source.FromGraph(new PlainSubSourceStage<K, V>(
settings,
subscription,
new Option<Func<IImmutableSet<TopicPartition>, Task<IImmutableSet<TopicPartitionOffset>>>>(getOffsetsOnAssign),
onRevoke));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ private async void SeekAndEmitSubSources(IImmutableSet<TopicPartition> formerlyU
{
try
{
// Without this delay kafka does not allow to perform seek operation (gives Local: Erroneous state error)
await Task.Delay(50);

await ConsumerActor.Ask(new KafkaConsumerActorMetadata.Internal.Seek(offsets), TimeSpan.FromSeconds(10));

_updatePendingPartitionsAndEmitSubSourcesCallback(formerlyUnknown);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,21 @@ protected override bool Receive(object message)
return true;

case KafkaConsumerActorMetadata.Internal.Seek seek:
seek.Offsets.ForEach(topicPartitionOffset => _consumer.Seek(topicPartitionOffset));
seek.Offsets.ForEach(topicPartitionOffset =>
{
try
{
_consumer.Seek(topicPartitionOffset);
}
catch (Exception ex)
{
_log.Error($"Failed to seek to {topicPartitionOffset}: {ex}");
throw;
}
});
Sender.Tell(Done.Instance);
return true;


case KafkaConsumerActorMetadata.Internal.Committed committed:
_commitRefreshing.Committed(committed.Offsets);
Expand Down

0 comments on commit ab26dfc

Please sign in to comment.