diff --git a/src/Akka.Streams.Kafka.Tests/Integration/AtMostOnceSourceIntegrationTests.cs b/src/Akka.Streams.Kafka.Tests/Integration/AtMostOnceSourceIntegrationTests.cs index 58837286..71f3b50f 100644 --- a/src/Akka.Streams.Kafka.Tests/Integration/AtMostOnceSourceIntegrationTests.cs +++ b/src/Akka.Streams.Kafka.Tests/Integration/AtMostOnceSourceIntegrationTests.cs @@ -60,7 +60,7 @@ public async Task AtMostOnceSource_Should_work() probe.Request(10); - AwaitCondition(() => lastMessage.Task.IsCompletedSuccessfully, TimeSpan.FromSeconds(15)); + AwaitCondition(() => lastMessage.Task.IsCompletedSuccessfully, TimeSpan.FromSeconds(30)); probe.Cancel(); diff --git a/src/Akka.Streams.Kafka.Tests/Integration/SourceWithOffsetContextIntegrationTests.cs b/src/Akka.Streams.Kafka.Tests/Integration/SourceWithOffsetContextIntegrationTests.cs new file mode 100644 index 00000000..95e54864 --- /dev/null +++ b/src/Akka.Streams.Kafka.Tests/Integration/SourceWithOffsetContextIntegrationTests.cs @@ -0,0 +1,53 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using Akka.Streams.Dsl; +using Akka.Streams.Kafka.Dsl; +using Akka.Streams.Kafka.Helpers; +using Akka.Streams.Kafka.Messages; +using Akka.Streams.Kafka.Settings; +using Akka.Streams.TestKit; +using FluentAssertions; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Streams.Kafka.Tests.Integration +{ + public class SourceWithOffsetContextIntegrationTests : KafkaIntegrationTests + { + public SourceWithOffsetContextIntegrationTests(ITestOutputHelper output, KafkaFixture fixture) + : base(nameof(SourceWithOffsetContextIntegrationTests), output, fixture) + { + } + + [Fact] + public async Task SourceWithOffsetContext_at_least_once_consuming_should_work() + { + var topic = CreateTopic(1); + var settings = CreateConsumerSettings(CreateGroup(1)); + var elementCount = 10; + var batchSize = 2; + var messages = Enumerable.Range(1, elementCount).ToList(); + + await ProduceStrings(topic, messages, ProducerSettings); + + var committerSettings = CommitterSettings.WithMaxBatch(batchSize); + + var (task, probe) = KafkaConsumer.SourceWithOffsetContext(settings, Subscriptions.Topics(topic)) + .SelectAsync(10, message => Task.FromResult(Done.Instance)) + .Via(Committer.FlowWithOffsetContext(committerSettings)) + .AsSource() + .ToMaterialized(this.SinkProbe>(), Keep.Both) + .Run(Materializer); + + probe.Request(10); + var committedBatches = probe.Within(TimeSpan.FromSeconds(10), () => probe.ExpectNextN(elementCount / batchSize)); + + probe.Cancel(); + + AwaitCondition(() => task.IsCompletedSuccessfully, TimeSpan.FromSeconds(10)); + + committedBatches.Select(r => r.Item2).Sum(batch => batch.BatchSize).Should().Be(10); + } + } +} \ No newline at end of file diff --git a/src/Akka.Streams.Kafka.Tests/KafkaIntegrationTests.cs b/src/Akka.Streams.Kafka.Tests/KafkaIntegrationTests.cs index b6541347..87dd4f4b 100644 --- a/src/Akka.Streams.Kafka.Tests/KafkaIntegrationTests.cs +++ b/src/Akka.Streams.Kafka.Tests/KafkaIntegrationTests.cs @@ -42,6 +42,11 @@ protected ProducerSettings ProducerSettings get => ProducerSettings.Create(Sys, null, null).WithBootstrapServers(_fixture.KafkaServer); } + protected CommitterSettings CommitterSettings + { + get => CommitterSettings.Create(Sys); + } + protected ConsumerSettings CreateConsumerSettings(string group) { return ConsumerSettings.Create(Sys, null, null) diff --git a/src/Akka.Streams.Kafka/Dsl/KafkaConsumer.cs b/src/Akka.Streams.Kafka/Dsl/KafkaConsumer.cs index f477f257..efcecee4 100644 --- 
a/src/Akka.Streams.Kafka/Dsl/KafkaConsumer.cs +++ b/src/Akka.Streams.Kafka/Dsl/KafkaConsumer.cs @@ -1,7 +1,9 @@ using System; using System.Threading.Tasks; using Akka.Actor; +using Akka.Annotations; using Akka.Streams.Dsl; +using Akka.Streams.Kafka.Helpers; using Akka.Streams.Kafka.Settings; using Akka.Streams.Kafka.Stages; using Confluent.Kafka; @@ -53,6 +55,26 @@ public static Source, Task> CommittableSource(Con return Source.FromGraph(new CommittableSourceStage(settings, subscription)); } + /// + /// API MAY CHANGE + /// + /// This source emits together with the offset position as flow context, thus makes it possible + /// to commit offset positions to Kafka. + /// This is useful when "at-least once delivery" is desired, as each message will likely be + /// delivered one time but in failure cases could be duplicated. + /// + /// It is intended to be used with Akka's [flow with context](https://doc.akka.io/docs/akka/current/stream/operators/Flow/asFlowWithContext.html), + /// and/or + /// + [ApiMayChange] + public static SourceWithContext, Task> SourceWithOffsetContext( + ConsumerSettings settings, ISubscription subscription, Func, string> metadataFromRecord = null) + { + return Source.FromGraph(new SourceWithOffsetContextStage(settings, subscription, metadataFromRecord)) + .AsSourceWithContext(m => m.Item2) + .Select(m => m.Item1); + } + /// /// The same as but for offset commit support /// diff --git a/src/Akka.Streams.Kafka/Dsl/KafkaProducer.cs b/src/Akka.Streams.Kafka/Dsl/KafkaProducer.cs index ff828e29..b708a7e4 100644 --- a/src/Akka.Streams.Kafka/Dsl/KafkaProducer.cs +++ b/src/Akka.Streams.Kafka/Dsl/KafkaProducer.cs @@ -1,4 +1,5 @@ -using System.Threading.Tasks; +using System; +using System.Threading.Tasks; using Akka.Streams.Dsl; using Akka.Streams.Kafka.Messages; using Akka.Streams.Kafka.Settings; @@ -66,5 +67,11 @@ public static Flow, DeliveryReport, N ? flow : flow.WithAttributes(ActorAttributes.CreateDispatcher(settings.DispatcherId)); } + + // TODO + public static FlowWithContext, C, DeliveryReport, C, NotUsed> FlowWithContext(ProducerSettings settings) + { + throw new NotImplementedException(); + } } } diff --git a/src/Akka.Streams.Kafka/Helpers/Committer.cs b/src/Akka.Streams.Kafka/Helpers/Committer.cs new file mode 100644 index 00000000..cb88286a --- /dev/null +++ b/src/Akka.Streams.Kafka/Helpers/Committer.cs @@ -0,0 +1,75 @@ +using System; +using System.Threading.Tasks; +using Akka.Annotations; +using Akka.Streams.Dsl; +using Akka.Streams.Kafka.Messages; +using Akka.Streams.Kafka.Settings; + +namespace Akka.Streams.Kafka.Helpers +{ + /// + /// Implements committing flows + /// + public static class Committer + { + /// + /// Batches offsets and commits them to Kafka, emits for every committed batch. + /// + public static Flow BatchFlow(CommitterSettings settings) + { + return Akka.Streams.Dsl.Flow.Create().GroupedWithin(settings.MaxBatch, settings.MaxInterval) + .Select(CommittableOffsetBatch.Create) + .SelectAsync(settings.Parallelism, async batch => + { + await batch.Commit(); + return batch; + }); + } + + /// + /// Batches offsets and commits them to Kafka, emits for every committed batch. 
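// ---------------------------------------------------------------------------
// Editor's note: usage sketch, not part of the patch. It mirrors the new
// integration test above and shows the intended at-least-once shape of the
// SourceWithOffsetContext API: consume with the offset travelling as flow
// context, process the record, then batch-commit via the Committer helpers
// introduced below. "system", "materializer", the topic/group names and
// ProcessRecordAsync are hypothetical placeholders; usings as in the test file.
// ---------------------------------------------------------------------------
var consumerSettings = ConsumerSettings<Null, string>.Create(system, null, null)
    .WithBootstrapServers("localhost:9092")
    .WithGroupId("group-1");
var committerSettings = CommitterSettings.Create(system);

var (drainTask, _) = KafkaConsumer.SourceWithOffsetContext(consumerSettings, Subscriptions.Topics("topic-1"))
    .SelectAsync(10, async record =>
    {
        await ProcessRecordAsync(record); // hypothetical business logic
        return Done.Instance;
    })
    .Via(Committer.FlowWithOffsetContext<Done>(committerSettings)) // commits the offsets carried as context
    .AsSource()
    .ToMaterialized(Sink.Ignore<Tuple<NotUsed, ICommittableOffsetBatch>>(), Keep.Both)
    .Run(materializer);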
+ /// + public static Flow Flow(CommitterSettings settings) + { + return BatchFlow(settings).Select(_ => Done.Instance); + } + + /// + /// API MAY CHANGE + /// + /// Batches offsets from context and commits them to Kafka, emits no useful value, + /// but keeps the committed batch as context + /// + [ApiMayChange] + public static FlowWithContext FlowWithOffsetContext(CommitterSettings settings) + { + var value = Akka.Streams.Dsl.Flow.Create>() + .Select(m => m.Item2 as ICommittable) + .Via(BatchFlow(settings)) + .Select(b => Tuple.Create(NotUsed.Instance, b)); + + return FlowWithContext.From(value); + } + + /// + /// Batches offsets and commits them to Kafka. + /// + public static Sink Sink(CommitterSettings settings) + { + return Flow(settings).ToMaterialized(Streams.Dsl.Sink.Ignore(), Keep.Right); + } + + /// + /// API MAY CHANGE + /// + /// Batches offsets from context and commits them to Kafka. + /// + [ApiMayChange] + public static Sink, Task> SinkWithOffsetContext(CommitterSettings settings) + { + return Akka.Streams.Dsl.Flow.Create>() + .Via(FlowWithOffsetContext(settings)) + .ToMaterialized(Streams.Dsl.Sink.Ignore>(), Keep.Right); + } + } +} \ No newline at end of file diff --git a/src/Akka.Streams.Kafka/Messages/Committable.cs b/src/Akka.Streams.Kafka/Messages/Committable.cs new file mode 100644 index 00000000..fe7512cc --- /dev/null +++ b/src/Akka.Streams.Kafka/Messages/Committable.cs @@ -0,0 +1,66 @@ +using System.Collections.Immutable; +using System.Threading.Tasks; + +namespace Akka.Streams.Kafka.Messages +{ + /// + /// Commit an offset that is included in a + /// If you need to store offsets in anything other than Kafka, this API + /// should not be used. + /// + public interface ICommittable + { + /// + /// Commits an offset that is included in a + /// + Task Commit(); + /// + /// Get the number of processed messages this committable contains + /// + long BatchSize { get; } + } + + /// + /// For improved efficiency it is good to aggregate several offsets, + /// using this class, before committing them. + /// Start with the empty batch + /// + public interface ICommittableOffsetBatch : ICommittable + { + /// + /// Add/overwrite an offset position from another committable. + /// + ICommittableOffsetBatch Updated(ICommittable offset); + /// + /// Get current offset positions + /// + IImmutableSet Offsets { get; } + }
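// Editor's note: conceptual sketch, not part of the patch, of the batching
// contract defined above: several committables aggregate into one batch that
// is committed with a single call. "messages" is a hypothetical list of
// CommittableMessage elements already received from a committable source;
// CommittableOffsetBatch is the internal implementation added further below.
ICommittableOffsetBatch batch = CommittableOffsetBatch.Empty;
foreach (var message in messages)
    batch = batch.Updated(message.CommitableOffset); // add/overwrite one position per group/topic/partition
await batch.Commit(); // a single commit covering batch.BatchSize messages
// The position written to Kafka is lastProcessedMessageOffset + 1: after
// processing offset 41 the committer commits 42, the next record to consume
// (see the "offset.Offset + 1" translation in KafkaAsyncConsumerCommitter below).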
+ + /// + /// Included in . Makes it possible to + /// commit an offset or aggregate several offsets before committing. + /// Note that the offset position that is committed to Kafka will automatically + /// be one more than the `offset` of the message, because the committed offset + /// should be the next message your application will consume, + /// i.e. lastProcessedMessageOffset + 1. + /// + public interface ICommittableOffset : ICommittable + { + /// + /// Offset value + /// + GroupTopicPartitionOffset Offset { get; } + } + + /// + /// Extends with some metadata + /// + public interface ICommittableOffsetMetadata : ICommittableOffset + { + /// + /// Consumed record metadata + /// + string Metadata { get; } + } +} \ No newline at end of file diff --git a/src/Akka.Streams.Kafka/Messages/CommittableMessage.cs b/src/Akka.Streams.Kafka/Messages/CommittableMessage.cs index 49db9a85..bc5c38d5 100644 --- a/src/Akka.Streams.Kafka/Messages/CommittableMessage.cs +++ b/src/Akka.Streams.Kafka/Messages/CommittableMessage.cs @@ -1,9 +1,6 @@ -using System; -using System.Collections.Generic; -using System.Collections.Immutable; +using System.Collections.Immutable; using System.Threading.Tasks; using Akka.Streams.Kafka.Dsl; -using Akka.Streams.Kafka.Stages.Consumers; using Confluent.Kafka; namespace Akka.Streams.Kafka.Messages @@ -29,108 +26,4 @@ public CommittableMessage(ConsumeResult record, ICommittableOffset commita /// public ICommittableOffset CommitableOffset { get; } } - - /// - /// Commit an offset that is included in a - /// If you need to store offsets in anything other than Kafka, this API - /// should not be used. - /// - public interface ICommittable - { - /// - /// Commits an offset that is included in a - /// - Task Commit(); - } - - /// - /// Included in . Makes it possible to - /// commit an offset or aggregate several offsets before committing. - /// Note that the offset position that is committed to Kafka will automatically - /// be one more than the `offset` of the message, because the committed offset - /// should be the next message your application will consume, - /// i.e. lastProcessedMessageOffset + 1. - /// - public interface ICommittableOffset : ICommittable - { - /// - /// Offset value - /// - PartitionOffset Offset { get; } - } - - /// - /// Extends with some metadata - /// - public interface ICommittableOffsetMetadata : ICommittableOffset - { - /// - /// Cosumed record metadata - /// - string Metadata { get; } - } - - /// - /// Implementation of the offset, contained in . - /// Can be commited via method. - /// - internal class CommittableOffset : ICommittableOffsetMetadata - { - private readonly IInternalCommitter _committer; - - /// - /// Offset value - /// - public PartitionOffset Offset { get; } - /// - /// Cosumed record metadata - /// - public string Metadata { get; } - - public CommittableOffset(IInternalCommitter committer, PartitionOffset offset, string metadata) - { - _committer = committer; - Offset = offset; - Metadata = metadata; - } - - /// - /// Commits offset to Kafka - /// - public Task Commit() - { - return _committer.Commit(ImmutableList.Create(Offset)); - } - } - - /// - /// Offset position for a groupId, topic, partition. 
- /// - public class PartitionOffset - { - public PartitionOffset(string groupId, string topic, int partition, Offset offset) - { - GroupId = groupId; - Topic = topic; - Partition = partition; - Offset = offset; - } - - /// - /// Consumer's group Id - /// - public string GroupId { get; } - /// - /// Topic - /// - public string Topic { get; } - /// - /// Partition - /// - public int Partition { get; } - /// - /// Kafka partition offset value - /// - public Offset Offset { get; } - } } diff --git a/src/Akka.Streams.Kafka/Messages/CommittableOffset.cs b/src/Akka.Streams.Kafka/Messages/CommittableOffset.cs new file mode 100644 index 00000000..8b12b5ad --- /dev/null +++ b/src/Akka.Streams.Kafka/Messages/CommittableOffset.cs @@ -0,0 +1,43 @@ +using System.Collections.Immutable; +using System.Threading.Tasks; +using Akka.Streams.Kafka.Stages.Consumers; + +namespace Akka.Streams.Kafka.Messages +{ + /// + /// Implementation of the offset, contained in . + /// Can be committed via the Commit method. + /// + internal sealed class CommittableOffset : ICommittableOffsetMetadata + { + /// + public long BatchSize => 1; + /// + /// Offset value + /// + public GroupTopicPartitionOffset Offset { get; } + /// + /// Consumed record metadata + /// + public string Metadata { get; } + /// + /// Committer + /// + public IInternalCommitter Committer { get; } + + public CommittableOffset(IInternalCommitter committer, GroupTopicPartitionOffset offset, string metadata) + { + Committer = committer; + Offset = offset; + Metadata = metadata; + } + + /// + /// Commits offset to Kafka + /// + public Task Commit() + { + return Committer.Commit(ImmutableList.Create(Offset)); + } + } +} \ No newline at end of file diff --git a/src/Akka.Streams.Kafka/Messages/CommittableOffsetBatch.cs b/src/Akka.Streams.Kafka/Messages/CommittableOffsetBatch.cs new file mode 100644 index 00000000..da71859b --- /dev/null +++ b/src/Akka.Streams.Kafka/Messages/CommittableOffsetBatch.cs @@ -0,0 +1,152 @@ +using System; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Linq; +using System.Threading.Tasks; +using Akka.Streams.Kafka.Extensions; +using Akka.Streams.Kafka.Stages.Consumers; + +namespace Akka.Streams.Kafka.Messages +{ + /// + /// Stores a batch of committable offsets and allows committing them with the Commit method + /// + internal sealed class CommittableOffsetBatch : ICommittableOffsetBatch + { + /// + /// CommittableOffsetBatch + /// + public CommittableOffsetBatch(IImmutableDictionary offsetsAndMetadata, + IImmutableDictionary committers, + long batchSize) + { + OffsetsAndMetadata = offsetsAndMetadata; + Committers = committers; + BatchSize = batchSize; + } + + /// + public long BatchSize { get; } + + /// + public IImmutableSet Offsets => OffsetsAndMetadata.Select(o => new GroupTopicPartitionOffset(o.Key, o.Value.Offset)).ToImmutableHashSet(); + + /// + /// Committers + /// + public IImmutableDictionary Committers { get; } + + /// + /// Offsets and metadata + /// + public IImmutableDictionary OffsetsAndMetadata { get; } + + /// + /// Create empty offset batch + /// + public static ICommittableOffsetBatch Empty => new CommittableOffsetBatch(ImmutableDictionary.Empty, + ImmutableDictionary.Empty, + 0); + + /// + /// Create an offset batch out of a first offset. + /// + public static ICommittableOffsetBatch Create(ICommittableOffset offset) => Empty.Updated(offset);
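// Editor's note: sketch, not part of the patch. A single offset can also be
// committed without batching: each CommittableOffset reports BatchSize = 1 and
// commits itself through its IInternalCommitter. "message" is a hypothetical
// CommittableMessage from a committable source. Per-message commits are
// simpler but considerably slower than the batching Committer flows above.
await message.CommitableOffset.Commit();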
+ /// + /// Create an offset batch out of a list of offsets. + /// + public static ICommittableOffsetBatch Create(IEnumerable offsets) + { + return offsets.Aggregate(Empty, (batch, offset) => batch.Updated(offset)); + } + + /// + public async Task Commit() + { + if (Offsets.IsEmpty() || Committers.IsEmpty()) + return; + + await Committers.First().Value.Commit(this); + } + + /// + public ICommittableOffsetBatch Updated(ICommittable offset) + { + switch (offset) + { + case ICommittableOffset committableOffset: + return UpdateWithOffset(committableOffset); + case ICommittableOffsetBatch committableOffsetBatch: + return UpdateWithBatch(committableOffsetBatch); + default: + throw new ArgumentException($"Unexpected offset to update committable batch offsets from: {offset.GetType().Name}"); + } + } + + /// + /// Adds offsets from given committable batch to existing ones + /// + private ICommittableOffsetBatch UpdateWithBatch(ICommittableOffsetBatch committableOffsetBatch) + { + if (!(committableOffsetBatch is CommittableOffsetBatch committableOffsetBatchImpl)) + throw new ArgumentException($"Unexpected CommittableOffsetBatch, got {committableOffsetBatch.GetType().Name}, expected {nameof(CommittableOffsetBatch)}"); + + var newOffsetsAndMetadata = OffsetsAndMetadata.SetItems(committableOffsetBatchImpl.OffsetsAndMetadata); + var newCommitters = committableOffsetBatchImpl.Committers.Aggregate(Committers, (committers, pair) => + { + var groupId = pair.Key; + var committer = pair.Value; + if (committers.TryGetValue(groupId, out var groupCommitter)) + { + if (!groupCommitter.Equals(committer)) + { + throw new ArgumentException($"CommittableOffsetBatch {committableOffsetBatch} committer for groupId {groupId} " + + $"must be the same as the other committer for this groupId."); + } + + return committers; + } + else + { + return committers.Add(groupId, committer); + } + }).ToImmutableDictionary(pair => pair.Key, pair => pair.Value); + + return new CommittableOffsetBatch(newOffsetsAndMetadata, newCommitters, BatchSize + committableOffsetBatchImpl.BatchSize); + } + + /// + /// Adds committable offset to existing ones + /// + private ICommittableOffsetBatch UpdateWithOffset(ICommittableOffset committableOffset) + { + var partitionOffset = committableOffset.Offset; + var metadata = (committableOffset is ICommittableOffsetMetadata withMetadata) ? withMetadata.Metadata : string.Empty; + + var newOffsets = OffsetsAndMetadata.SetItem(partitionOffset.GroupTopicPartition, new OffsetAndMetadata(partitionOffset.Offset, metadata)); + var committer = committableOffset is CommittableOffset c + ? 
c.Committer + : throw new ArgumentException($"Unknown committable offset, got {committableOffset.GetType().Name}, expected {nameof(CommittableOffset)}"); + + + IImmutableDictionary newCommitters = ImmutableDictionary.Empty; + if (Committers.TryGetValue(partitionOffset.GroupId, out var groupCommitter)) + { + if (!groupCommitter.Equals(committer)) + { + throw new ArgumentException($"CommittableOffset {committableOffset} committer for groupId {partitionOffset.GroupId} " + + $"must be the same as the other committer for this groupId."); + } + + newCommitters = Committers; + } + else + { + newCommitters = Committers.SetItem(partitionOffset.GroupId, committer); + } + + return new CommittableOffsetBatch(newOffsets, newCommitters, BatchSize + 1); + } + } +} \ No newline at end of file diff --git a/src/Akka.Streams.Kafka/Messages/Envelope.cs b/src/Akka.Streams.Kafka/Messages/Envelope.cs new file mode 100644 index 00000000..cb311b57 --- /dev/null +++ b/src/Akka.Streams.Kafka/Messages/Envelope.cs @@ -0,0 +1,7 @@ +namespace Akka.Streams.Kafka.Messages +{ + public interface IEnvelope + { + // TODO + } +} \ No newline at end of file diff --git a/src/Akka.Streams.Kafka/Messages/GroupTopicPartitionOffset.cs b/src/Akka.Streams.Kafka/Messages/GroupTopicPartitionOffset.cs new file mode 100644 index 00000000..7bc29411 --- /dev/null +++ b/src/Akka.Streams.Kafka/Messages/GroupTopicPartitionOffset.cs @@ -0,0 +1,153 @@ +using System; +using Confluent.Kafka; + +namespace Akka.Streams.Kafka.Messages +{ + /// + /// Offset position for a groupId, topic, partition. + /// + public sealed class GroupTopicPartitionOffset : IEquatable + { + /// + /// GroupTopicPartitionOffset + /// + public GroupTopicPartitionOffset(string groupId, string topic, int partition, Offset offset) + { + GroupId = groupId; + Topic = topic; + Partition = partition; + Offset = offset; + } + + /// + /// GroupTopicPartitionOffset + /// + public GroupTopicPartitionOffset(GroupTopicPartition groupTopicPartition, Offset offset) + : this(groupTopicPartition.GroupId, groupTopicPartition.Topic, groupTopicPartition.Partition, offset) + { + } + + /// + /// Consumer's group Id + /// + public string GroupId { get; } + /// + /// Topic + /// + public string Topic { get; } + /// + /// Partition + /// + public int Partition { get; } + /// + /// Kafka partition offset value + /// + public Offset Offset { get; } + /// + /// Group topic partition info + /// + public GroupTopicPartition GroupTopicPartition => new GroupTopicPartition(GroupId, Topic, Partition); + + public bool Equals(GroupTopicPartitionOffset other) + { + if (ReferenceEquals(null, other)) return false; + if (ReferenceEquals(this, other)) return true; + return GroupId == other.GroupId && Topic == other.Topic && Partition == other.Partition && Offset.Equals(other.Offset); + } + + public override bool Equals(object obj) => ReferenceEquals(this, obj) || obj is GroupTopicPartitionOffset other && Equals(other); + + public override int GetHashCode() + { + unchecked + { + var hashCode = (GroupId != null ? GroupId.GetHashCode() : 0); + hashCode = (hashCode * 397) ^ (Topic != null ? 
Topic.GetHashCode() : 0); + hashCode = (hashCode * 397) ^ Partition; + hashCode = (hashCode * 397) ^ Offset.GetHashCode(); + return hashCode; + } + } + } + + /// + /// Group, topic and partition info + /// + public sealed class GroupTopicPartition : IEquatable + { + public GroupTopicPartition(string groupId, string topic, int partition) + { + GroupId = groupId; + Topic = topic; + Partition = partition; + } + + /// + /// Consumer's group Id + /// + public string GroupId { get; } + /// + /// Topic + /// + public string Topic { get; } + /// + /// Partition + /// + public int Partition { get; } + + public bool Equals(GroupTopicPartition other) + { + if (ReferenceEquals(null, other)) return false; + if (ReferenceEquals(this, other)) return true; + return GroupId == other.GroupId && Topic == other.Topic && Partition == other.Partition; + } + + public override bool Equals(object obj) => ReferenceEquals(this, obj) || obj is GroupTopicPartition other && Equals(other); + + public override int GetHashCode() + { + unchecked + { + var hashCode = (GroupId != null ? GroupId.GetHashCode() : 0); + hashCode = (hashCode * 397) ^ (Topic != null ? Topic.GetHashCode() : 0); + hashCode = (hashCode * 397) ^ Partition; + return hashCode; + } + } + } + + public sealed class OffsetAndMetadata : IEquatable + { + public OffsetAndMetadata(Offset offset, string metadata) + { + Offset = offset; + Metadata = metadata; + } + + /// + /// Kafka partition offset value + /// + public Offset Offset { get; } + /// + /// Metadata + /// + public string Metadata { get; } + + public bool Equals(OffsetAndMetadata other) + { + if (ReferenceEquals(null, other)) return false; + if (ReferenceEquals(this, other)) return true; + return Offset.Equals(other.Offset) && Metadata == other.Metadata; + } + + public override bool Equals(object obj) => ReferenceEquals(this, obj) || obj is OffsetAndMetadata other && Equals(other); + + public override int GetHashCode() + { + unchecked + { + return (Offset.GetHashCode() * 397) ^ (Metadata != null ? Metadata.GetHashCode() : 0); + } + } + } +} \ No newline at end of file diff --git a/src/Akka.Streams.Kafka/Settings/CommitterSettings.cs b/src/Akka.Streams.Kafka/Settings/CommitterSettings.cs new file mode 100644 index 00000000..5c9be412 --- /dev/null +++ b/src/Akka.Streams.Kafka/Settings/CommitterSettings.cs @@ -0,0 +1,83 @@ +using System; +using Akka.Actor; +using Akka.Configuration; + +namespace Akka.Streams.Kafka.Settings +{ + /// + /// Settings for committer. See 'akka.kafka.committer' section in reference.conf. 
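// Editor's note: usage sketch, not part of the patch. The settings load their
// defaults from the "akka.kafka.committer" section added to reference.conf at
// the end of this diff and can be tuned with the fluent With* copy methods;
// "system" is a hypothetical ActorSystem.
var committerSettings = CommitterSettings.Create(system) // max-batch = 1000, max-interval = 10s, parallelism = 1
    .WithMaxBatch(500)                                   // commit after at most 500 offsets...
    .WithMaxInterval(TimeSpan.FromSeconds(5));           // ...or after 5 seconds, whichever comes first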
+ /// + public sealed class CommitterSettings + { + /// + /// CommitterSettings + /// + /// Max commit batch size + /// Max commit interval + /// Level of parallelism + public CommitterSettings(int maxBatch, TimeSpan maxInterval, int parallelism) + { + MaxBatch = maxBatch; + MaxInterval = maxInterval; + Parallelism = parallelism; + } + + /// + /// Creates committer settings + /// + /// Actor system for stage materialization + /// Committer settings + public static CommitterSettings Create(ActorSystem system) + { + var config = system.Settings.Config.GetConfig("akka.kafka.committer"); + return Create(config); + } + + /// + /// Creates committer settings + /// + /// Config to load properties from + /// Committer settings + public static CommitterSettings Create(Config config) + { + var maxBatch = config.GetInt("max-batch"); + var maxInterval = config.GetTimeSpan("max-interval"); + var parallelism = config.GetInt("parallelism"); + return new CommitterSettings(maxBatch, maxInterval, parallelism); + } + + /// + /// Max commit batch size + /// + public int MaxBatch { get; } + /// + /// Max commit interval + /// + public TimeSpan MaxInterval { get; } + /// + /// Level of parallelism + /// + public int Parallelism { get; } + + /// + /// Sets max batch size + /// + public CommitterSettings WithMaxBatch(int maxBatch) => Copy(maxBatch: maxBatch); + /// + /// Sets max commit interval + /// + public CommitterSettings WithMaxInterval(TimeSpan maxInterval) => Copy(maxInterval: maxInterval); + /// + /// Sets parallelism level + /// + public CommitterSettings WithParallelism(int parallelism) => Copy(parallelism: parallelism); + + private CommitterSettings Copy(int? maxBatch = null, TimeSpan? maxInterval = null, int? parallelism = null) + { + return new CommitterSettings( + maxBatch: maxBatch ?? MaxBatch, + maxInterval: maxInterval ?? MaxInterval, + parallelism: parallelism ?? 
Parallelism); + } + } +} \ No newline at end of file diff --git a/src/Akka.Streams.Kafka/Stages/Consumers/Committers.cs b/src/Akka.Streams.Kafka/Stages/Consumers/Committers.cs index a92f8329..a115ccb3 100644 --- a/src/Akka.Streams.Kafka/Stages/Consumers/Committers.cs +++ b/src/Akka.Streams.Kafka/Stages/Consumers/Committers.cs @@ -5,6 +5,7 @@ using System.Threading.Tasks; using Akka.Actor; using Akka.Dispatch; +using Akka.Pattern; using Akka.Streams.Kafka.Messages; using Akka.Streams.Kafka.Stages.Consumers.Actors; using Akka.Streams.Kafka.Stages.Consumers.Exceptions; @@ -20,7 +21,11 @@ internal interface IInternalCommitter /// /// Commit all offsets (of different topics) belonging to the same stage /// - Task Commit(ImmutableList offsets); + Task Commit(ImmutableList offsets); + /// + /// Commit offsets in batch + /// + Task Commit(ICommittableOffsetBatch batch); } /// @@ -38,10 +43,8 @@ public KafkaAsyncConsumerCommitter(Func consumerActorFactory, TimeSpa _consumerActor = new Lazy(consumerActorFactory); } - /// - /// Commits specified offsets - /// - public Task Commit(ImmutableList offsets) + /// + public Task Commit(ImmutableList offsets) { var topicPartitionOffsets = offsets.Select(offset => new TopicPartitionOffset(offset.Topic, offset.Partition, offset.Offset + 1)).ToImmutableHashSet(); @@ -60,5 +63,21 @@ public Task Commit(ImmutableList offsets) } }); } + + /// + public async Task Commit(ICommittableOffsetBatch batch) + { + if (!(batch is CommittableOffsetBatch batchImpl)) + throw new ArgumentException($"Unknown CommittableOffsetBatch, got {batch.GetType().FullName}, but expected {nameof(CommittableOffsetBatch)}"); + + await Task.WhenAll(batchImpl.OffsetsAndMetadata.GroupBy(o => o.Key.GroupId).Select(group => + { + if (!batchImpl.Committers.TryGetValue(group.Key, out var committer)) + throw new IllegalStateException($"Unknown committer, got groupId = {group.Key}"); + + var offsets = group.Select(offset => new GroupTopicPartitionOffset(offset.Key, offset.Value.Offset)).ToImmutableList(); + return committer.Commit(offsets); + })); + } } } \ No newline at end of file diff --git a/src/Akka.Streams.Kafka/Stages/Consumers/Concrete/SourceWithOffsetContextStage.cs b/src/Akka.Streams.Kafka/Stages/Consumers/Concrete/SourceWithOffsetContextStage.cs new file mode 100644 index 00000000..591ab881 --- /dev/null +++ b/src/Akka.Streams.Kafka/Stages/Consumers/Concrete/SourceWithOffsetContextStage.cs @@ -0,0 +1,65 @@ +using System; +using System.Threading.Tasks; +using Akka.Streams.Kafka.Dsl; +using Akka.Streams.Kafka.Messages; +using Akka.Streams.Kafka.Settings; +using Akka.Streams.Kafka.Stages.Consumers.Abstract; +using Akka.Streams.Stage; +using Confluent.Kafka; + +namespace Akka.Streams.Kafka.Stages.Consumers.Concrete +{ + /// + /// This stage is used for + /// + /// The key type + /// The value type + internal class SourceWithOffsetContextStage : KafkaSourceStage, ICommittableOffset)> + { + /// + /// Method for extracting string metadata from consumed record + /// + private readonly Func, string> _metadataFromMessage; + + /// + /// Consumer settings + /// + public ConsumerSettings Settings { get; } + /// + /// Subscription + /// + public ISubscription Subscription { get; } + + /// + /// SourceWithOffsetContextStage + /// + /// Consumer settings + /// Subscription to be used + /// Function to extract string metadata from consumed message + public SourceWithOffsetContextStage(ConsumerSettings settings, ISubscription subscription, + Func, string> metadataFromMessage = null) + : 
base("SourceWithOffsetContext") + { + _metadataFromMessage = metadataFromMessage ?? (msg => string.Empty); + Settings = settings; + Subscription = subscription; + } + + /// + protected override GraphStageLogic Logic( + SourceShape<(ConsumeResult, ICommittableOffset)> shape, + TaskCompletionSource completion, + Attributes inheritedAttributes) + { + return new SingleSourceStageLogic, ICommittableOffset)>(shape, Settings, Subscription, + inheritedAttributes, completion, + GetMessageBuilder); + } + + private OffsetContextBuilder GetMessageBuilder(BaseSingleSourceLogic, ICommittableOffset)> logic) + { + var committer = new KafkaAsyncConsumerCommitter(() => logic.ConsumerActor, Settings.CommitTimeout); + return new OffsetContextBuilder(committer, Settings, _metadataFromMessage); + } + } +} diff --git a/src/Akka.Streams.Kafka/Stages/Consumers/MessageBuilders.cs b/src/Akka.Streams.Kafka/Stages/Consumers/MessageBuilders.cs index 1cdf5fba..89ab5e73 100644 --- a/src/Akka.Streams.Kafka/Stages/Consumers/MessageBuilders.cs +++ b/src/Akka.Streams.Kafka/Stages/Consumers/MessageBuilders.cs @@ -50,7 +50,7 @@ internal abstract class CommittableMessageBuilderBase : IMessageBuilder public CommittableMessage CreateMessage(ConsumeResult record) { - var offset = new PartitionOffset(GroupId, record.Topic, record.Partition, record.Offset); + var offset = new GroupTopicPartitionOffset(GroupId, record.Topic, record.Partition, record.Offset); return new CommittableMessage(record, new CommittableOffset(Committer, offset, MetadataFromRecord(record))); } } @@ -81,4 +81,40 @@ public CommittableSourceMessageBuilder(IInternalCommitter committer, string grou /// public override string MetadataFromRecord(ConsumeResult record) => _metadataFromRecord(record); } + + /// + /// Message builder used by + /// + internal class OffsetContextBuilder : IMessageBuilder, ICommittableOffset)> + { + /// + /// Method for extracting string metadata from consumed record + /// + private readonly Func, string> _metadataFromMessage; + /// + /// Committed object + /// + public IInternalCommitter Committer { get; } + /// + /// Consumer group Id + /// + public string GroupId { get; } + + /// + /// OffsetContextBuilder + /// + public OffsetContextBuilder(IInternalCommitter committer, ConsumerSettings setting, Func, string> metadataFromMessage) + { + _metadataFromMessage = metadataFromMessage; + Committer = committer; + GroupId = setting.GroupId; + } + + /// + public (ConsumeResult, ICommittableOffset) CreateMessage(ConsumeResult record) + { + var offset = new GroupTopicPartitionOffset(GroupId, record.Topic, record.Partition, record.Offset); + return (record, new CommittableOffset(Committer, offset, _metadataFromMessage(record))); + } + } } \ No newline at end of file diff --git a/src/Akka.Streams.Kafka/reference.conf b/src/Akka.Streams.Kafka/reference.conf index 559e0e9a..d40be9d8 100644 --- a/src/Akka.Streams.Kafka/reference.conf +++ b/src/Akka.Streams.Kafka/reference.conf @@ -40,3 +40,15 @@ akka.kafka.default-dispatcher { type = "Dispatcher" executor = "default-executor" } + +# Committer flows use this settings to make batch commits +akka.kafka.committer { + # Set maximum number of messages to commit at once + max-batch = 1000 + + # Set maximum interval between commits + max-interval = 10s + + # Set parallelism for async committing + parallelism = 1 +}