Skip to content

Commit

Permalink
Committable partitioned source (#79)
Browse files Browse the repository at this point in the history
* Implemented SubSourceLogic

* Replaced null with Option.None for option parameter

* Fixed xml doc reference

* Added numbers to logger formatted string

* Implemented subsource stage logic

* Fixed base stage xml comment

* Added one test (failing)

* Some fixes

* Updated all stages to use IControl as materialized value

* Fixed PerformShutdown overriding

* Updated tests

* Removed temporary debugging flag

* Changed the way to override `PromiseControl` virtual methods

* Updated implementation to use IControl

* Applied minor fixes and refactoring

* Minor refactoring

* Fixed immutable collections usage

* Fixed test and added one more

* Added more tests

* Added serialization failure test

* Added failover tests

* Skipping AtMostOnce test

* Fixed AtMostOnceSource test messages ordering

* Fixed typo

* Removed nightly feed

* Added CommittablePartitionedSource implementation

* Added tests

* Nothing
  • Loading branch information
IgorFedchenko authored and Aaronontheweb committed Oct 15, 2019
1 parent 8e2faf7 commit 3633d97
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.Streams.Kafka.Dsl;
using Akka.Streams.Kafka.Helpers;
using Akka.Streams.Kafka.Settings;
using Akka.Util;
using Confluent.Kafka;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Streams.Kafka.Tests.Integration
{
public class CommittablePartitionedSourceIntegrationTests : KafkaIntegrationTests
{
public CommittablePartitionedSourceIntegrationTests(ITestOutputHelper output, KafkaFixture fixture)
: base(nameof(CommittablePartitionedSourceIntegrationTests), output, fixture)
{
}

[Fact]
public async Task CommittablePartitionedSource_Should_handle_exceptions_in_stream_without_commit_failures()
{
var partitionsCount = 3;
var topic = CreateTopic(1);
var group = CreateGroup(1);
var totalMessages = 100;
var exceptionTriggered = new AtomicBoolean(false);
var allTopicPartitions = Enumerable.Range(0, partitionsCount).Select(i => new TopicPartition(topic, i)).ToList();

var consumerSettings = CreateConsumerSettings<string>(group).WithStopTimeout(TimeSpan.FromSeconds(2));

var createdSubSources = new ConcurrentSet<TopicPartition>();
var commitFailures = new ConcurrentSet<(TopicPartition, Exception)>();

var control = KafkaConsumer.CommittablePartitionedSource(consumerSettings, Subscriptions.Topics(topic))
.GroupBy(partitionsCount, tuple => tuple.Item1)
.SelectAsync(6, tuple =>
{
var (topicPartition, source) = tuple;
createdSubSources.TryAdd(topicPartition);
return source
.Log($"Subsource for partition #{topicPartition.Partition.Value}", m => m.Record.Value)
.SelectAsync(3, async message =>
{
// fail on first partition; otherwise delay slightly and emit
if (topicPartition.Partition.Value == 0)
{
Log.Debug($"Failing {topicPartition} source");
exceptionTriggered.GetAndSet(true);
throw new Exception("FAIL");
}
else
{
await Task.Delay(50);
}

return message;
})
.Log($"Subsource {topicPartition} pre commit")
.SelectAsync(1, async message =>
{
try
{
await message.CommitableOffset.Commit();
}
catch (Exception ex)
{
Log.Error("Commit failure: " + ex);
commitFailures.TryAdd((topicPartition, ex));
}

return message;
})
.Scan(0, (c, _) => c + 1)
.RunWith(Sink.Last<int>(), Materializer)
.ContinueWith(t =>
{
Log.Info($"sub-source for {topicPartition} completed: Received {t.Result} messages in total.");
return t.Result;
});
})
.MergeSubstreams().As<Source<int, IControl>>()
.Scan(0, (c, n) => c + n)
.ToMaterialized(Sink.Last<int>(), Keep.Both)
.MapMaterializedValue(tuple => DrainingControl<int>.Create(tuple.Item1, tuple.Item2))
.Run(Materializer);

await ProduceStrings(i => new TopicPartition(topic, i % partitionsCount), Enumerable.Range(1, totalMessages), ProducerSettings);

AwaitCondition(() => exceptionTriggered.Value, TimeSpan.FromSeconds(10));

var shutdown = control.DrainAndShutdown();
AwaitCondition(() => shutdown.IsCompleted);
createdSubSources.Should().Contain(allTopicPartitions);
shutdown.Exception.GetBaseException().Message.Should().Be("FAIL");

// commits will fail if we shut down the consumer too early
commitFailures.Should().BeEmpty();

}
}
}
8 changes: 8 additions & 0 deletions src/Akka.Streams.Kafka.Tests/KafkaIntegrationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ await Source
.RunWith(KafkaProducer.PlainSink(producerSettings), Materializer);
}

protected async Task ProduceStrings(Func<int, TopicPartition> partitionSelector, IEnumerable<int> range, ProducerSettings<Null, string> producerSettings)
{
await Source
.From(range)
.Select(elem => new MessageAndMeta<Null, string> { TopicPartition = partitionSelector(elem), Message = new Message<Null, string> { Value = elem.ToString() } })
.RunWith(KafkaProducer.PlainSink(producerSettings), Materializer);
}

protected async Task ProduceStrings(TopicPartition topicPartition, IEnumerable<int> range, ProducerSettings<Null, string> producerSettings)
{
await Source
Expand Down
13 changes: 11 additions & 2 deletions src/Akka.Streams.Kafka/Dsl/KafkaConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,22 @@ public static SourceWithContext<ICommittableOffset, ConsumeResult<K, V>, IContro
/// The same as <see cref="PlainExternalSource{K,V}"/> but for offset commit support
/// </summary>
public static Source<CommittableMessage<K, V>, IControl> CommittableExternalSource<K, V>(IActorRef consumer, IManualSubscription subscription,
string groupId, TimeSpan commitTimeout)
string groupId, TimeSpan commitTimeout)
{
return Source.FromGraph(new ExternalCommittableSourceStage<K, V>(consumer, groupId, commitTimeout, subscription));
}

/// <summary>
/// Convenience for "at-most once delivery" semantics.
/// The same as <see cref="PlainPartitionedSource{K,V}"/> but with offset commit support.
/// </summary>
public static Source<(TopicPartition, Source<CommittableMessage<K, V>, NotUsed>), IControl> CommittablePartitionedSource<K, V>(
ConsumerSettings<K, V> settings, IAutoSubscription subscription)
{
return Source.FromGraph(new CommittableSubSourceStage<K, V>(settings, subscription));
}

/// <summary>
/// Convenience for "at-most once delivery" semantics.
/// The offset of each message is committed to Kafka before being emitted downstream.
/// </summary>
public static Source<ConsumeResult<K, V>, IControl> AtMostOnceSource<K, V>(ConsumerSettings<K, V> settings, ISubscription subscription)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ private class CloseRevokedPartitions { }


protected StageActor SourceActor { get; private set; }
protected IActorRef ConsumerActor { get; private set; }
public IActorRef ConsumerActor { get; private set; }

public PromiseControl<(TopicPartition, Source<TMessage, NotUsed>)> Control { get; }

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
using System;
using System.Collections.Immutable;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.Streams.Kafka.Helpers;
using Akka.Streams.Kafka.Messages;
using Akka.Streams.Kafka.Settings;
using Akka.Streams.Kafka.Stages.Consumers.Abstract;
using Akka.Streams.Stage;
using Akka.Streams.Util;
using Confluent.Kafka;

namespace Akka.Streams.Kafka.Stages.Consumers.Concrete
{
public class CommittableSubSourceStage<K, V> : KafkaSourceStage<K, V, (TopicPartition, Source<CommittableMessage<K, V>, NotUsed>)>
{
private readonly Func<ConsumeResult<K, V>, string> _metadataFromRecord;

/// <summary>
/// Consumer settings
/// </summary>
public ConsumerSettings<K, V> Settings { get; }
/// <summary>
/// Subscription
/// </summary>
public IAutoSubscription Subscription { get; }

public CommittableSubSourceStage(ConsumerSettings<K, V> settings, IAutoSubscription subscription, Func<ConsumeResult<K, V>, string> metadataFromRecord = null)
: base("CommittableSubSourceStage")
{
Settings = settings;
Subscription = subscription;
_metadataFromRecord = metadataFromRecord ?? (_ => string.Empty);
}

protected override (GraphStageLogic, IControl) Logic(SourceShape<(TopicPartition, Source<CommittableMessage<K, V>, NotUsed>)> shape, Attributes inheritedAttributes)
{
var logic = new SubSourceLogic<K, V, CommittableMessage<K, V>>(shape, Settings, Subscription,
messageBuilderFactory: GetMessageBuilder,
getOffsetsOnAssign: Option<Func<IImmutableSet<TopicPartition>, Task<IImmutableSet<TopicPartitionOffset>>>>.None,
onRevoke: _ => { },
attributes: inheritedAttributes);
return (logic, logic.Control);
}

/// <summary>
/// Creates message builder for sub-source logic
/// </summary>
private CommittableSourceMessageBuilder<K, V> GetMessageBuilder(SubSourceLogic<K, V, CommittableMessage<K, V>> logic)
{
var committer = new KafkaAsyncConsumerCommitter(() => logic.ConsumerActor, Settings.CommitTimeout);
return new CommittableSourceMessageBuilder<K, V>(committer, Settings.GroupId, _metadataFromRecord);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,16 @@ public PlainSubSourceStage(ConsumerSettings<K, V> settings, IAutoSubscription su
OnRevoke = onRevoke;
}

/// <inheritdoc />
protected override (GraphStageLogic, IControl) Logic(SourceShape<(TopicPartition, Source<ConsumeResult<K, V>, NotUsed>)> shape,
Attributes inheritedAttributes)
{
var logic = new SubSourceLogic<K, V, ConsumeResult<K, V>>(shape, Settings, Subscription, _ => new PlainMessageBuilder<K, V>(),
GetOffsetsOnAssign, OnRevoke, inheritedAttributes);
var logic = new SubSourceLogic<K, V, ConsumeResult<K, V>>(shape, Settings, Subscription,
messageBuilderFactory: _ => new PlainMessageBuilder<K, V>(),
getOffsetsOnAssign: GetOffsetsOnAssign,
onRevoke: OnRevoke,
attributes: inheritedAttributes);

return (logic, logic.Control);
}
}
Expand Down

0 comments on commit 3633d97

Please sign in to comment.