Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Committable partitioned source #79

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
87b2a65
Implemented SubSourceLogic
IgorFedchenko Sep 28, 2019
6a4d72c
Replaced null with Option.None for option parameter
IgorFedchenko Sep 28, 2019
0d58597
Fixed xml doc reference
IgorFedchenko Sep 28, 2019
d67b9e3
Merge branch 'dev' into plain_partitioned_source
IgorFedchenko Sep 29, 2019
6076cb4
Added numbers to logger formatted string
IgorFedchenko Oct 1, 2019
728efc2
Implemented subsource stage logic
IgorFedchenko Oct 1, 2019
6d72ba0
Fixed base stage xml comment
IgorFedchenko Oct 1, 2019
47c5a61
Added one test (failing)
IgorFedchenko Oct 1, 2019
c7e2c23
Some fixes
IgorFedchenko Oct 4, 2019
8d10f96
Updated all stages to use IControl as materialized value
IgorFedchenko Oct 4, 2019
8c5ad54
Fixed PerformShutdown overriding
IgorFedchenko Oct 5, 2019
2f68bd7
Updated tests
IgorFedchenko Oct 5, 2019
18672fd
Merge branch 'dev' into control_implementation
IgorFedchenko Oct 5, 2019
e9f5752
Removed temporary debugging flag
IgorFedchenko Oct 5, 2019
1f81237
Merge branch 'dev' into plain_partitioned_source
IgorFedchenko Oct 5, 2019
223be17
Merge branch 'control_implementation' into plain_partitioned_source
IgorFedchenko Oct 5, 2019
77c5c4e
Changed the way to override `PromiseControl` virtual methods
IgorFedchenko Oct 5, 2019
0e8a2e8
Merge branch 'control_implementation' into plain_partitioned_source
IgorFedchenko Oct 5, 2019
7c7cc29
Updated implementation to use IControl
IgorFedchenko Oct 5, 2019
9c9e026
Applied minor fixes and refactoring
IgorFedchenko Oct 5, 2019
ccc2cdf
Merge branch 'control_implementation' into plain_partitioned_source
IgorFedchenko Oct 5, 2019
5856178
Minor refactoring
IgorFedchenko Oct 5, 2019
bcb4c11
Fixed immutable collections usage
IgorFedchenko Oct 7, 2019
1e05086
Merge branch 'dev' into plain_partitioned_source
IgorFedchenko Oct 7, 2019
71f0776
Fixed test and added one more
IgorFedchenko Oct 7, 2019
389a26c
Added more tests
IgorFedchenko Oct 8, 2019
d6f8370
Added serialization failure test
IgorFedchenko Oct 8, 2019
c0c16e9
Added failover tests
IgorFedchenko Oct 8, 2019
75aaa2f
Skipping AtMostOnce test
IgorFedchenko Oct 8, 2019
b68e252
Fixed AtMostOnceSource test messages ordering
IgorFedchenko Oct 8, 2019
aea506b
Merge branch 'dev' into plain_partitioned_source
IgorFedchenko Oct 10, 2019
f2159b8
Fixed typo
IgorFedchenko Oct 10, 2019
64d6be2
Merge branch 'dev' into plain_partitioned_source
IgorFedchenko Oct 10, 2019
d6abcc9
Removed nightly feed
IgorFedchenko Oct 10, 2019
19e0578
Merge branch 'remove_nightly_nugets' into committable_partitioned_source
IgorFedchenko Oct 11, 2019
c0d1e5e
Added CommittablePartitionedSource implementation
IgorFedchenko Oct 11, 2019
295449c
Added tests
IgorFedchenko Oct 11, 2019
94f46d4
Merge branch 'dev' into committable_partitioned_source
IgorFedchenko Oct 11, 2019
1f9353e
Nothing
IgorFedchenko Oct 11, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.Streams.Kafka.Dsl;
using Akka.Streams.Kafka.Helpers;
using Akka.Streams.Kafka.Settings;
using Akka.Util;
using Confluent.Kafka;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Streams.Kafka.Tests.Integration
{
public class CommittablePartitionedSourceIntegrationTests : KafkaIntegrationTests
{
public CommittablePartitionedSourceIntegrationTests(ITestOutputHelper output, KafkaFixture fixture)
: base(nameof(CommittablePartitionedSourceIntegrationTests), output, fixture)
{
}

[Fact]
public async Task CommittablePartitionedSource_Should_handle_exceptions_in_stream_without_commit_failures()
{
var partitionsCount = 3;
var topic = CreateTopic(1);
var group = CreateGroup(1);
var totalMessages = 100;
var exceptionTriggered = new AtomicBoolean(false);
var allTopicPartitions = Enumerable.Range(0, partitionsCount).Select(i => new TopicPartition(topic, i)).ToList();

var consumerSettings = CreateConsumerSettings<string>(group).WithStopTimeout(TimeSpan.FromSeconds(2));

var createdSubSources = new ConcurrentSet<TopicPartition>();
var commitFailures = new ConcurrentSet<(TopicPartition, Exception)>();

var control = KafkaConsumer.CommittablePartitionedSource(consumerSettings, Subscriptions.Topics(topic))
.GroupBy(partitionsCount, tuple => tuple.Item1)
.SelectAsync(6, tuple =>
{
var (topicPartition, source) = tuple;
createdSubSources.TryAdd(topicPartition);
return source
.Log($"Subsource for partition #{topicPartition.Partition.Value}", m => m.Record.Value)
.SelectAsync(3, async message =>
{
// fail on first partition; otherwise delay slightly and emit
if (topicPartition.Partition.Value == 0)
{
Log.Debug($"Failing {topicPartition} source");
exceptionTriggered.GetAndSet(true);
throw new Exception("FAIL");
}
else
{
await Task.Delay(50);
}

return message;
})
.Log($"Subsource {topicPartition} pre commit")
.SelectAsync(1, async message =>
{
try
{
await message.CommitableOffset.Commit();
}
catch (Exception ex)
{
Log.Error("Commit failure: " + ex);
commitFailures.TryAdd((topicPartition, ex));
}

return message;
})
.Scan(0, (c, _) => c + 1)
.RunWith(Sink.Last<int>(), Materializer)
.ContinueWith(t =>
{
Log.Info($"sub-source for {topicPartition} completed: Received {t.Result} messages in total.");
return t.Result;
});
})
.MergeSubstreams().As<Source<int, IControl>>()
.Scan(0, (c, n) => c + n)
.ToMaterialized(Sink.Last<int>(), Keep.Both)
.MapMaterializedValue(tuple => DrainingControl<int>.Create(tuple.Item1, tuple.Item2))
.Run(Materializer);

await ProduceStrings(i => new TopicPartition(topic, i % partitionsCount), Enumerable.Range(1, totalMessages), ProducerSettings);

AwaitCondition(() => exceptionTriggered.Value, TimeSpan.FromSeconds(10));

var shutdown = control.DrainAndShutdown();
AwaitCondition(() => shutdown.IsCompleted);
createdSubSources.Should().Contain(allTopicPartitions);
shutdown.Exception.GetBaseException().Message.Should().Be("FAIL");

// commits will fail if we shut down the consumer too early
commitFailures.Should().BeEmpty();

}
}
}
8 changes: 8 additions & 0 deletions src/Akka.Streams.Kafka.Tests/KafkaIntegrationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ await Source
.RunWith(KafkaProducer.PlainSink(producerSettings), Materializer);
}

protected async Task ProduceStrings(Func<int, TopicPartition> partitionSelector, IEnumerable<int> range, ProducerSettings<Null, string> producerSettings)
{
await Source
.From(range)
.Select(elem => new MessageAndMeta<Null, string> { TopicPartition = partitionSelector(elem), Message = new Message<Null, string> { Value = elem.ToString() } })
.RunWith(KafkaProducer.PlainSink(producerSettings), Materializer);
}

protected async Task ProduceStrings(TopicPartition topicPartition, IEnumerable<int> range, ProducerSettings<Null, string> producerSettings)
{
await Source
Expand Down
13 changes: 11 additions & 2 deletions src/Akka.Streams.Kafka/Dsl/KafkaConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,22 @@ public static SourceWithContext<ICommittableOffset, ConsumeResult<K, V>, IContro
/// The same as <see cref="PlainExternalSource{K,V}"/> but for offset commit support
/// </summary>
public static Source<CommittableMessage<K, V>, IControl> CommittableExternalSource<K, V>(IActorRef consumer, IManualSubscription subscription,
string groupId, TimeSpan commitTimeout)
string groupId, TimeSpan commitTimeout)
{
return Source.FromGraph(new ExternalCommittableSourceStage<K, V>(consumer, groupId, commitTimeout, subscription));
}

/// <summary>
/// Convenience for "at-most once delivery" semantics.
/// The same as <see cref="PlainPartitionedSource{K,V}"/> but with offset commit support.
/// </summary>
public static Source<(TopicPartition, Source<CommittableMessage<K, V>, NotUsed>), IControl> CommittablePartitionedSource<K, V>(
ConsumerSettings<K, V> settings, IAutoSubscription subscription)
{
return Source.FromGraph(new CommittableSubSourceStage<K, V>(settings, subscription));
}

/// <summary>
/// Convenience for "at-most once delivery" semantics.
/// The offset of each message is committed to Kafka before being emitted downstream.
/// </summary>
public static Source<ConsumeResult<K, V>, IControl> AtMostOnceSource<K, V>(ConsumerSettings<K, V> settings, ISubscription subscription)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ private class CloseRevokedPartitions { }


protected StageActor SourceActor { get; private set; }
protected IActorRef ConsumerActor { get; private set; }
public IActorRef ConsumerActor { get; private set; }

public PromiseControl<(TopicPartition, Source<TMessage, NotUsed>)> Control { get; }

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
using System;
using System.Collections.Immutable;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.Streams.Kafka.Helpers;
using Akka.Streams.Kafka.Messages;
using Akka.Streams.Kafka.Settings;
using Akka.Streams.Kafka.Stages.Consumers.Abstract;
using Akka.Streams.Stage;
using Akka.Streams.Util;
using Confluent.Kafka;

namespace Akka.Streams.Kafka.Stages.Consumers.Concrete
{
public class CommittableSubSourceStage<K, V> : KafkaSourceStage<K, V, (TopicPartition, Source<CommittableMessage<K, V>, NotUsed>)>
{
private readonly Func<ConsumeResult<K, V>, string> _metadataFromRecord;

/// <summary>
/// Consumer settings
/// </summary>
public ConsumerSettings<K, V> Settings { get; }
/// <summary>
/// Subscription
/// </summary>
public IAutoSubscription Subscription { get; }

public CommittableSubSourceStage(ConsumerSettings<K, V> settings, IAutoSubscription subscription, Func<ConsumeResult<K, V>, string> metadataFromRecord = null)
: base("CommittableSubSourceStage")
{
Settings = settings;
Subscription = subscription;
_metadataFromRecord = metadataFromRecord ?? (_ => string.Empty);
}

protected override (GraphStageLogic, IControl) Logic(SourceShape<(TopicPartition, Source<CommittableMessage<K, V>, NotUsed>)> shape, Attributes inheritedAttributes)
{
var logic = new SubSourceLogic<K, V, CommittableMessage<K, V>>(shape, Settings, Subscription,
messageBuilderFactory: GetMessageBuilder,
getOffsetsOnAssign: Option<Func<IImmutableSet<TopicPartition>, Task<IImmutableSet<TopicPartitionOffset>>>>.None,
onRevoke: _ => { },
attributes: inheritedAttributes);
return (logic, logic.Control);
}

/// <summary>
/// Creates message builder for sub-source logic
/// </summary>
private CommittableSourceMessageBuilder<K, V> GetMessageBuilder(SubSourceLogic<K, V, CommittableMessage<K, V>> logic)
{
var committer = new KafkaAsyncConsumerCommitter(() => logic.ConsumerActor, Settings.CommitTimeout);
return new CommittableSourceMessageBuilder<K, V>(committer, Settings.GroupId, _metadataFromRecord);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,16 @@ public PlainSubSourceStage(ConsumerSettings<K, V> settings, IAutoSubscription su
OnRevoke = onRevoke;
}

/// <inheritdoc />
protected override (GraphStageLogic, IControl) Logic(SourceShape<(TopicPartition, Source<ConsumeResult<K, V>, NotUsed>)> shape,
Attributes inheritedAttributes)
{
var logic = new SubSourceLogic<K, V, ConsumeResult<K, V>>(shape, Settings, Subscription, _ => new PlainMessageBuilder<K, V>(),
GetOffsetsOnAssign, OnRevoke, inheritedAttributes);
var logic = new SubSourceLogic<K, V, ConsumeResult<K, V>>(shape, Settings, Subscription,
messageBuilderFactory: _ => new PlainMessageBuilder<K, V>(),
getOffsetsOnAssign: GetOffsetsOnAssign,
onRevoke: OnRevoke,
attributes: inheritedAttributes);

return (logic, logic.Control);
}
}
Expand Down