Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SourceWithOffsetContext #62

Merged
merged 14 commits into from
Oct 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public async Task AtMostOnceSource_Should_work()

probe.Request(10);

AwaitCondition(() => lastMessage.Task.IsCompletedSuccessfully, TimeSpan.FromSeconds(15));
AwaitCondition(() => lastMessage.Task.IsCompletedSuccessfully, TimeSpan.FromSeconds(30));

probe.Cancel();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.Streams.Kafka.Dsl;
using Akka.Streams.Kafka.Helpers;
using Akka.Streams.Kafka.Messages;
using Akka.Streams.Kafka.Settings;
using Akka.Streams.TestKit;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Streams.Kafka.Tests.Integration
{
public class SourceWithOffsetContextIntegrationTests : KafkaIntegrationTests
{
public SourceWithOffsetContextIntegrationTests(ITestOutputHelper output, KafkaFixture fixture)
: base(nameof(SourceWithOffsetContextIntegrationTests), output, fixture)
{
}

[Fact]
public async Task SourceWithOffsetContext_at_least_once_consuming_should_work()
{
var topic = CreateTopic(1);
var settings = CreateConsumerSettings<string>(CreateGroup(1));
var elementCount = 10;
var batchSize = 2;
var messages = Enumerable.Range(1, elementCount).ToList();

await ProduceStrings(topic, messages, ProducerSettings);

var committerSettings = CommitterSettings.WithMaxBatch(batchSize);

var (task, probe) = KafkaConsumer.SourceWithOffsetContext(settings, Subscriptions.Topics(topic))
.SelectAsync(10, message => Task.FromResult(Done.Instance))
.Via(Committer.FlowWithOffsetContext<Done>(committerSettings))
.AsSource()
.ToMaterialized(this.SinkProbe<Tuple<NotUsed, ICommittableOffsetBatch>>(), Keep.Both)
.Run(Materializer);

probe.Request(10);
var committedBatches = probe.Within(TimeSpan.FromSeconds(10), () => probe.ExpectNextN(elementCount / batchSize));

probe.Cancel();

AwaitCondition(() => task.IsCompletedSuccessfully, TimeSpan.FromSeconds(10));

committedBatches.Select(r => r.Item2).Sum(batch => batch.BatchSize).Should().Be(10);
}
}
}
5 changes: 5 additions & 0 deletions src/Akka.Streams.Kafka.Tests/KafkaIntegrationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ protected ProducerSettings<Null, string> ProducerSettings
get => ProducerSettings<Null, string>.Create(Sys, null, null).WithBootstrapServers(_fixture.KafkaServer);
}

        /// <summary>
        /// Default committer settings for tests, created from the actor system's configuration.
        /// </summary>
        protected CommitterSettings CommitterSettings
        {
            get => CommitterSettings.Create(Sys);
        }

protected ConsumerSettings<Null, TValue> CreateConsumerSettings<TValue>(string group)
{
return ConsumerSettings<Null, TValue>.Create(Sys, null, null)
Expand Down
22 changes: 22 additions & 0 deletions src/Akka.Streams.Kafka/Dsl/KafkaConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Annotations;
using Akka.Streams.Dsl;
using Akka.Streams.Kafka.Helpers;
using Akka.Streams.Kafka.Settings;
using Akka.Streams.Kafka.Stages;
using Confluent.Kafka;
Expand Down Expand Up @@ -53,6 +55,26 @@ public static Source<CommittableMessage<K, V>, Task> CommittableSource<K, V>(Con
return Source.FromGraph(new CommittableSourceStage<K, V>(settings, subscription));
}

/// <summary>
/// API MAY CHANGE
///
/// This source emits <see cref="ConsumeResult{TKey,TValue}"/> together with the offset position as flow context, thus makes it possible
/// to commit offset positions to Kafka.
/// This is useful when "at-least once delivery" is desired, as each message will likely be
/// delivered one time but in failure cases could be duplicated.
///
/// It is intended to be used with Akka's [flow with context](https://doc.akka.io/docs/akka/current/stream/operators/Flow/asFlowWithContext.html),
/// <see cref="KafkaProducer.FlowWithContext{K,V,C}"/> and/or <see cref="Committer.SinkWithOffsetContext{E}"/>
/// </summary>
[ApiMayChange]
public static SourceWithContext<ICommittableOffset, ConsumeResult<K, V>, Task> SourceWithOffsetContext<K, V>(
ConsumerSettings<K, V> settings, ISubscription subscription, Func<ConsumeResult<K, V>, string> metadataFromRecord = null)
{
return Source.FromGraph(new SourceWithOffsetContextStage<K, V>(settings, subscription, metadataFromRecord))
.AsSourceWithContext(m => m.Item2)
.Select(m => m.Item1);
}

/// <summary>
/// The same as <see cref="PlainExternalSource{K,V}"/> but for offset commit support
/// </summary>
Expand Down
9 changes: 8 additions & 1 deletion src/Akka.Streams.Kafka/Dsl/KafkaProducer.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Threading.Tasks;
using System;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.Streams.Kafka.Messages;
using Akka.Streams.Kafka.Settings;
Expand Down Expand Up @@ -66,5 +67,11 @@ public static Flow<MessageAndMeta<TKey, TValue>, DeliveryReport<TKey, TValue>, N
? flow
: flow.WithAttributes(ActorAttributes.CreateDispatcher(settings.DispatcherId));
}

// TODO
public static FlowWithContext<IEnvelope<K, V, NotUsed>, C, DeliveryReport<K, V>, C, NotUsed> FlowWithContext<K, V, C>(ProducerSettings<K, V> settings)
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved
{
throw new NotImplementedException();
}
}
}
75 changes: 75 additions & 0 deletions src/Akka.Streams.Kafka/Helpers/Committer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
using System;
using System.Threading.Tasks;
using Akka.Annotations;
using Akka.Streams.Dsl;
using Akka.Streams.Kafka.Messages;
using Akka.Streams.Kafka.Settings;

namespace Akka.Streams.Kafka.Helpers
{
/// <summary>
/// Implements committing flows
/// </summary>
public static class Committer
{
/// <summary>
/// Batches offsets and commits them to Kafka, emits <see cref="CommittableOffsetBatch"/> for every committed batch.
/// </summary>
public static Flow<ICommittable, ICommittableOffsetBatch, NotUsed> BatchFlow(CommitterSettings settings)
{
return Akka.Streams.Dsl.Flow.Create<ICommittable>().GroupedWithin(settings.MaxBatch, settings.MaxInterval)
.Select(CommittableOffsetBatch.Create)
.SelectAsync(settings.Parallelism, async batch =>
{
await batch.Commit();
return batch;
});
}

/// <summary>
/// Batches offsets and commits them to Kafka, emits <see cref="Done.Instance"/> for every committed batch.
/// </summary>
public static Flow<ICommittable, Done, NotUsed> Flow(CommitterSettings settings)
{
return BatchFlow(settings).Select(_ => Done.Instance);
}

/// <summary>
/// API MAY CHANGE
///
/// Batches offsets from context and commits them to Kafka, emits no useful value,
/// but keeps the committed <see cref="ICommittableOffsetBatch"/> as context
/// </summary>
[ApiMayChange]
public static FlowWithContext<ICommittableOffset, E, ICommittableOffsetBatch, NotUsed, NotUsed> FlowWithOffsetContext<E>(CommitterSettings settings)
{
var value = Akka.Streams.Dsl.Flow.Create<Tuple<E, ICommittableOffset>>()
IgorFedchenko marked this conversation as resolved.
Show resolved Hide resolved
.Select(m => m.Item2 as ICommittable)
.Via(BatchFlow(settings))
.Select(b => Tuple.Create(NotUsed.Instance, b));

return FlowWithContext.From(value);
}

/// <summary>
/// Batches offsets and commits them to Kafka.
/// </summary>
public static Sink<ICommittable, Task> Sink(CommitterSettings settings)
{
return Flow(settings).ToMaterialized(Streams.Dsl.Sink.Ignore<Done>(), Keep.Right);
}

/// <summary>
/// API MAY CHANGE
///
/// Batches offsets from context and commits them to Kafka.
/// </summary>
[ApiMayChange]
public static Sink<Tuple<E, ICommittableOffset>, Task> SinkWithOffsetContext<E>(CommitterSettings settings)
{
return Akka.Streams.Dsl.Flow.Create<Tuple<E, ICommittableOffset>>()
.Via(FlowWithOffsetContext<E>(settings))
.ToMaterialized(Streams.Dsl.Sink.Ignore<Tuple<NotUsed, ICommittableOffsetBatch>>(), Keep.Right);
}
}
}
66 changes: 66 additions & 0 deletions src/Akka.Streams.Kafka/Messages/Committable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
using System.Collections.Immutable;
using System.Threading.Tasks;

namespace Akka.Streams.Kafka.Messages
{
/// <summary>
/// Commit an offset that is included in a <see cref="CommittableMessage{K,V}"/>
/// If you need to store offsets in anything other than Kafka, this API
/// should not be used.
/// </summary>
public interface ICommittable
IgorFedchenko marked this conversation as resolved.
Show resolved Hide resolved
{
/// <summary>
/// Commits an offset that is included in a <see cref="CommittableMessage{K,V}"/>
/// </summary>
Task Commit();
/// <summary>
/// Get a number of processed messages this committable contains
/// </summary>
long BatchSize { get; }
}

    /// <summary>
    /// For improved efficiency it is good to aggregate several <see cref="ICommittableOffset"/>,
    /// using this class, before <see cref="ICommittable.Commit"/>-ing them.
    /// Start with an empty batch and aggregate offsets into it with <see cref="Updated"/>.
    /// </summary>
    public interface ICommittableOffsetBatch : ICommittable
    {
        /// <summary>
        /// Add/overwrite an offset position from another committable.
        /// </summary>
        ICommittableOffsetBatch Updated(ICommittable offset);
        /// <summary>
        /// Get current offset positions
        /// </summary>
        IImmutableSet<GroupTopicPartitionOffset> Offsets { get; }
    }

    /// <summary>
    /// Included in <see cref="CommittableMessage{K,V}"/>. Makes it possible to
    /// commit an offset or aggregate several offsets before committing.
    /// Note that the offset position that is committed to Kafka will automatically
    /// be one more than the `offset` of the message, because the committed offset
    /// should be the next message your application will consume,
    /// i.e. lastProcessedMessageOffset + 1.
    /// </summary>
    public interface ICommittableOffset : ICommittable
    {
        /// <summary>
        /// Offset position (group, topic, partition, offset) of the consumed record.
        /// </summary>
        GroupTopicPartitionOffset Offset { get; }
    }

    /// <summary>
    /// Extends <see cref="ICommittableOffset"/> with some metadata
    /// </summary>
    public interface ICommittableOffsetMetadata : ICommittableOffset
    {
        /// <summary>
        /// Consumed record metadata
        /// </summary>
        string Metadata { get; }
    }
}
109 changes: 1 addition & 108 deletions src/Akka.Streams.Kafka/Messages/CommittableMessage.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Collections.Immutable;
using System.Threading.Tasks;
using Akka.Streams.Kafka.Dsl;
using Akka.Streams.Kafka.Stages.Consumers;
using Confluent.Kafka;

namespace Akka.Streams.Kafka.Messages
Expand All @@ -29,108 +26,4 @@ public CommittableMessage(ConsumeResult<K, V> record, ICommittableOffset commita
/// </summary>
public ICommittableOffset CommitableOffset { get; }
}

    /// <summary>
    /// Commit an offset that is included in a <see cref="CommittableMessage{K,V}"/>.
    /// If you need to store offsets in anything other than Kafka, this API
    /// should not be used.
    /// </summary>
    public interface ICommittable
    {
        /// <summary>
        /// Commits an offset that is included in a <see cref="CommittableMessage{K,V}"/>
        /// </summary>
        Task Commit();
    }

    /// <summary>
    /// Included in <see cref="CommittableMessage{K,V}"/>. Makes it possible to
    /// commit an offset or aggregate several offsets before committing.
    /// Note that the offset position that is committed to Kafka will automatically
    /// be one more than the `offset` of the message, because the committed offset
    /// should be the next message your application will consume,
    /// i.e. lastProcessedMessageOffset + 1.
    /// </summary>
    public interface ICommittableOffset : ICommittable
    {
        /// <summary>
        /// Offset position (group, topic, partition, offset) of the consumed record.
        /// </summary>
        PartitionOffset Offset { get; }
    }

    /// <summary>
    /// Extends <see cref="ICommittableOffset"/> with some metadata
    /// </summary>
    public interface ICommittableOffsetMetadata : ICommittableOffset
    {
        /// <summary>
        /// Consumed record metadata
        /// </summary>
        string Metadata { get; }
    }

/// <summary>
/// Implementation of the offset, contained in <see cref="CommittableMessage{K,V}"/>.
/// Can be commited via <see cref="Commit"/> method.
/// </summary>
internal class CommittableOffset : ICommittableOffsetMetadata
{
private readonly IInternalCommitter _committer;

/// <summary>
/// Offset value
/// </summary>
public PartitionOffset Offset { get; }
/// <summary>
/// Cosumed record metadata
/// </summary>
public string Metadata { get; }

public CommittableOffset(IInternalCommitter committer, PartitionOffset offset, string metadata)
{
_committer = committer;
Offset = offset;
Metadata = metadata;
}

/// <summary>
/// Commits offset to Kafka
/// </summary>
public Task Commit()
{
return _committer.Commit(ImmutableList.Create(Offset));
}
}

/// <summary>
/// Offset position for a groupId, topic, partition.
/// </summary>
public class PartitionOffset
{
public PartitionOffset(string groupId, string topic, int partition, Offset offset)
{
GroupId = groupId;
Topic = topic;
Partition = partition;
Offset = offset;
}

/// <summary>
/// Consumer's group Id
/// </summary>
public string GroupId { get; }
/// <summary>
/// Topic
/// </summary>
public string Topic { get; }
/// <summary>
/// Partition
/// </summary>
public int Partition { get; }
/// <summary>
/// Kafka partition offset value
/// </summary>
public Offset Offset { get; }
}
}
Loading