External plain source #56

Merged: 17 commits, Sep 19, 2019
1 change: 1 addition & 0 deletions docker-compose-up.bat
@@ -0,0 +1 @@
docker-compose up
1 change: 1 addition & 0 deletions docker-compose.yml
@@ -20,6 +20,7 @@ services:
       - zookeeper
     environment:
       KAFKA_BROKER_ID: 1
+      KAFKA_NUM_PARTITIONS: 3
       KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092
       KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
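KAFKA_NUM_PARTITIONS sets the default partition count for topics the broker auto-creates (auto-creation is enabled by the flag below it), so the uniquely named topics these tests create on the fly each get three partitions; the external-source tests further down rely on that when they assign partitions 0 through 2 explicitly. One way to confirm the setting took effect is to query topic metadata with Confluent.Kafka's admin client. A minimal sketch, with a hypothetical topic name and the advertised listener address from this compose file:

```csharp
using System;
using Confluent.Kafka;

class PartitionCountCheck
{
    static void Main()
    {
        // Address taken from KAFKA_ADVERTISED_LISTENERS above.
        var config = new AdminClientConfig { BootstrapServers = "localhost:29092" };
        using (var admin = new AdminClientBuilder(config).Build())
        {
            // "probe-topic" is a hypothetical name; with auto-creation on,
            // requesting its metadata is enough to create the topic.
            var meta = admin.GetMetadata("probe-topic", TimeSpan.FromSeconds(5));
            foreach (var t in meta.Topics)
                Console.WriteLine($"{t.Topic}: {t.Partitions.Count} partitions"); // expect 3
        }
    }
}
```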
CommittableSourceIntegrationTests.cs
@@ -18,41 +18,9 @@ namespace Akka.Streams.Kafka.Tests.Integration
 {
     public class CommittableSourceIntegrationTests : KafkaIntegrationTests
     {
-        private const string InitialMsg = "initial msg in topic, required to create the topic before any consumer subscribes to it";
-
-        private readonly KafkaFixture _fixture;
-        private readonly ActorMaterializer _materializer;
-
         public CommittableSourceIntegrationTests(ITestOutputHelper output, KafkaFixture fixture)
-            : base(nameof(CommittableSourceIntegrationTests), output)
+            : base(nameof(CommittableSourceIntegrationTests), output, fixture)
         {
-            _fixture = fixture;
-            _materializer = Sys.Materializer();
-        }
-
-        private string Uuid { get; } = Guid.NewGuid().ToString();
-
-        private string CreateTopic(int number) => $"topic-{number}-{Uuid}";
-        private string CreateGroup(int number) => $"group-{number}-{Uuid}";
-
-        private ProducerSettings<Null, string> ProducerSettings
-        {
-            get => ProducerSettings<Null, string>.Create(Sys, null, null).WithBootstrapServers(_fixture.KafkaServer);
-        }
-
-        private async Task GivenInitializedTopic(string topic)
-        {
-            using (var producer = ProducerSettings.CreateKafkaProducer())
-            {
-                await producer.ProduceAsync(topic, new Message<Null, string> { Value = InitialMsg });
-            }
-        }
-
-        private ConsumerSettings<Null, string> CreateConsumerSettings(string group)
-        {
-            return ConsumerSettings<Null, string>.Create(Sys, null, null)
-                .WithBootstrapServers(_fixture.KafkaServer)
-                .WithProperty("auto.offset.reset", "earliest")
-                .WithGroupId(group);
-        }
-
         [Fact]

Review comment from the contributor author on the removed members: "All common setup is moved to base KafkaIntegrationTests class."
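That base class is outside this diff. The sketch below reconstructs it from the members the updated tests call (Materializer, InitialMsg, CreateTopic, CreateGroup, ProducerSettings, CreateConsumerSettings<TValue>, GivenInitializedTopic, ProduceStrings), so treat the exact names and signatures as assumptions rather than the actual file:

```csharp
// Assumed shape of the shared base class; reconstructed, not copied from the PR.
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.Streams.Kafka.Dsl;
using Akka.Streams.Kafka.Messages;
using Akka.Streams.Kafka.Settings;
using Confluent.Kafka;
using Xunit.Abstractions;

public abstract class KafkaIntegrationTests : Akka.TestKit.Xunit2.TestKit
{
    protected const string InitialMsg =
        "initial msg in topic, required to create the topic before any consumer subscribes to it";

    private readonly KafkaFixture _fixture;
    protected ActorMaterializer Materializer { get; }

    protected KafkaIntegrationTests(string actorSystemName, ITestOutputHelper output, KafkaFixture fixture)
        : base((string)null, actorSystemName, output) // assumed TestKit overload
    {
        _fixture = fixture;
        Materializer = Sys.Materializer();
    }

    private string Uuid { get; } = Guid.NewGuid().ToString();
    protected string CreateTopic(int number) => $"topic-{number}-{Uuid}";
    protected string CreateGroup(int number) => $"group-{number}-{Uuid}";

    protected ProducerSettings<Null, string> ProducerSettings =>
        ProducerSettings<Null, string>.Create(Sys, null, null).WithBootstrapServers(_fixture.KafkaServer);

    protected ConsumerSettings<Null, TValue> CreateConsumerSettings<TValue>(string group) =>
        ConsumerSettings<Null, TValue>.Create(Sys, null, null)
            .WithBootstrapServers(_fixture.KafkaServer)
            .WithProperty("auto.offset.reset", "earliest")
            .WithGroupId(group);

    // Produces one marker message so the partition exists before any consumer subscribes.
    protected async Task GivenInitializedTopic(TopicPartition topicPartition)
    {
        using (var producer = ProducerSettings.CreateKafkaProducer())
        {
            await producer.ProduceAsync(topicPartition, new Message<Null, string> { Value = InitialMsg });
        }
    }

    // Pushes a range of ints as strings to one partition, mirroring the produce
    // pattern used throughout these tests.
    protected async Task ProduceStrings(TopicPartition tp, IEnumerable<int> range,
        ProducerSettings<Null, string> settings)
    {
        await Source.From(range)
            .Select(n => new MessageAndMeta<Null, string>
            {
                TopicPartition = tp,
                Message = new Message<Null, string> { Value = n.ToString() }
            })
            .RunWith(KafkaProducer.PlainSink(settings), Materializer);
    }
}
```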
@@ -61,21 +29,22 @@ public async Task CommitableSource_consumes_messages_from_Producer_without_commi
             int elementsCount = 100;
             var topic1 = CreateTopic(1);
             var group1 = CreateGroup(1);
+            var topicPartition1 = new TopicPartition(topic1, 0);

-            await GivenInitializedTopic(topic1);
+            await GivenInitializedTopic(topicPartition1);

             await Source
                 .From(Enumerable.Range(1, elementsCount))
-                .Select(elem => new MessageAndMeta<Null, string> { Topic = topic1, Message = new Message<Null, string> { Value = elem.ToString() } })
-                .RunWith(KafkaProducer.PlainSink(ProducerSettings), _materializer);
+                .Select(elem => new MessageAndMeta<Null, string> { TopicPartition = topicPartition1, Message = new Message<Null, string> { Value = elem.ToString() } })
+                .RunWith(KafkaProducer.PlainSink(ProducerSettings), Materializer);

-            var consumerSettings = CreateConsumerSettings(group1);
+            var consumerSettings = CreateConsumerSettings<string>(group1);

             var probe = KafkaConsumer
-                .CommittableSource(consumerSettings, Subscriptions.Assignment(new TopicPartition(topic1, 0)))
+                .CommittableSource(consumerSettings, Subscriptions.Assignment(topicPartition1))
                 .Where(c => !c.Record.Value.Equals(InitialMsg))
                 .Select(c => c.Record.Value)
-                .RunWith(this.SinkProbe<string>(), _materializer);
+                .RunWith(this.SinkProbe<string>(), Materializer);

             probe.Request(elementsCount);
             foreach (var i in Enumerable.Range(1, elementsCount).Select(c => c.ToString()))
@@ -88,20 +57,21 @@ await Source
         public async Task CommitableSource_resume_from_commited_offset()
         {
             var topic1 = CreateTopic(1);
+            var topicPartition1 = new TopicPartition(topic1, 0);
             var group1 = CreateGroup(1);
             var group2 = CreateGroup(2);

-            await GivenInitializedTopic(topic1);
+            await GivenInitializedTopic(topicPartition1);

             await Source
                 .From(Enumerable.Range(1, 100))
-                .Select(elem => new MessageAndMeta<Null, string> { Topic = topic1, Message = new Message<Null, string> { Value = elem.ToString() } })
-                .RunWith(KafkaProducer.PlainSink(ProducerSettings), _materializer);
+                .Select(elem => new MessageAndMeta<Null, string> { TopicPartition = topicPartition1, Message = new Message<Null, string> { Value = elem.ToString() } })
+                .RunWith(KafkaProducer.PlainSink(ProducerSettings), Materializer);

-            var consumerSettings = CreateConsumerSettings(group1);
+            var consumerSettings = CreateConsumerSettings<string>(group1);
             var committedElements = new ConcurrentQueue<string>();

-            var (task, probe1) = KafkaConsumer.CommittableSource(consumerSettings, Subscriptions.Assignment(new TopicPartition(topic1, 0)))
+            var (task, probe1) = KafkaConsumer.CommittableSource(consumerSettings, Subscriptions.Assignment(topicPartition1))
                 .WhereNot(c => c.Record.Value == InitialMsg)
                 .SelectAsync(10, async elem =>
                 {
@@ -110,7 +80,7 @@ await Source
                     return Done.Instance;
                 })
                 .ToMaterialized(this.SinkProbe<Done>(), Keep.Both)
-                .Run(_materializer);
+                .Run(Materializer);

             probe1.Request(25);

@@ -125,16 +95,16 @@ await Source

             var probe2 = KafkaConsumer.CommittableSource(consumerSettings, Subscriptions.Assignment(new TopicPartition(topic1, 0)))
                 .Select(_ => _.Record.Value)
-                .RunWith(this.SinkProbe<string>(), _materializer);
+                .RunWith(this.SinkProbe<string>(), Materializer);

             // Note that due to buffers and SelectAsync(10) the committed offset is more
             // than 26, and that is not wrong

             // some concurrent publish
             await Source
                 .From(Enumerable.Range(101, 100))
-                .Select(elem => new MessageAndMeta<Null, string> { Topic = topic1, Message = new Message<Null, string> { Value = elem.ToString() } })
-                .RunWith(KafkaProducer.PlainSink(ProducerSettings), _materializer);
+                .Select(elem => new MessageAndMeta<Null, string> { TopicPartition = topicPartition1, Message = new Message<Null, string> { Value = elem.ToString() } })
+                .RunWith(KafkaProducer.PlainSink(ProducerSettings), Materializer);

             probe2.Request(100);
             foreach (var i in Enumerable.Range(committedElements.Count + 1, 100).Select(c => c.ToString()))
@@ -146,7 +116,7 @@ await Source
             var probe3 = KafkaConsumer.CommittableSource(consumerSettings.WithGroupId(group2), Subscriptions.Assignment(new TopicPartition(topic1, 0)))
                 .WhereNot(c => c.Record.Value == InitialMsg)
                 .Select(_ => _.Record.Value)
-                .RunWith(this.SinkProbe<string>(), _materializer);
+                .RunWith(this.SinkProbe<string>(), Materializer);

             probe3.Request(100);
             foreach (var i in Enumerable.Range(1, 100).Select(c => c.ToString()))
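The recurring edit in this file swaps topic-name addressing for explicit TopicPartition addressing when producing, so the test data is guaranteed to land on the partition the consumer side subscribes to. The two modes side by side, using this repo's MessageAndMeta envelope exactly as it appears in the diff above (values are illustrative):

```csharp
// Topic-only: the producer's partitioner chooses the partition, which is fine
// for single-partition topics but nondeterministic once a topic has several.
var byTopic = new MessageAndMeta<Null, string>
{
    Topic = "topic-1",
    Message = new Message<Null, string> { Value = "42" }
};

// Explicit TopicPartition: the record is pinned to partition 0, matching
// Subscriptions.Assignment(topicPartition1) on the consumer side.
var byPartition = new MessageAndMeta<Null, string>
{
    TopicPartition = new TopicPartition("topic-1", 0),
    Message = new Message<Null, string> { Value = "42" }
};
```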
ExternalPlainSourceIntegrationTests.cs (new file)
@@ -0,0 +1,159 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams.Dsl;
using Akka.Streams.Kafka.Dsl;
using Akka.Streams.Kafka.Messages;
using Akka.Streams.Kafka.Settings;
using Akka.Streams.Kafka.Stages.Consumers.Actors;
using Akka.Streams.TestKit;
using Akka.TestKit;
using Akka.Util.Internal;
using Confluent.Kafka;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Streams.Kafka.Tests.Integration
{
public class ExternalPlainSourceIntegrationTests : KafkaIntegrationTests
{
public ExternalPlainSourceIntegrationTests(ITestOutputHelper output, KafkaFixture fixture)
: base(nameof(ExternalPlainSourceIntegrationTests), output, fixture)
{
}

protected Tuple<Task, TestSubscriber.Probe<TValue>> CreateProbe<TValue>(IActorRef consumer, IManualSubscription sub)
{
return KafkaConsumer
.PlainExternalSource<Null, TValue>(consumer, sub)
.Where(c => !c.Value.Equals(InitialMsg))
.Select(c => c.Value)
.ToMaterialized(this.SinkProbe<TValue>(), Keep.Both)
.Run(Materializer);
}
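Unlike the ordinary plain source, PlainExternalSource does not create or own a consumer: it attaches a manual subscription to a consumer actor the caller manages, which is what lets several partition-bound sources share one Kafka connection. CreateProbe materializes two values: a Task that completes when the source stage shuts down (the tests use it to assert graceful termination) and a probe for pulling elements. The pattern, in outline, drawn from the tests below:

```csharp
// One externally managed consumer actor can serve several partition-bound sources.
var consumer = Sys.ActorOf(KafkaConsumerActorMetadata.GetProps(CreateConsumerSettings<string>(group)));

var (task, probe) = CreateProbe<string>(consumer, Subscriptions.Assignment(new TopicPartition(topic, 0)));

probe.Request(1);   // demand pulls records for this partition only
// ... assertions ...
probe.Cancel();     // stops the stage; `task` completes once shutdown finishes

// The caller owns the consumer actor, so the caller must stop it.
consumer.Tell(new KafkaConsumerActorMetadata.Internal.Stop(), ActorRefs.NoSender);
```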

[Fact]
public async Task ExternalPlainSource_with_external_consumer_Should_work()
{
var elementsCount = 10;
var topic = CreateTopic(1);
var group = CreateGroup(1);

// The consumer is represented by an actor
var consumer = Sys.ActorOf(KafkaConsumerActorMetadata.GetProps(CreateConsumerSettings<string>(group)));

// Manually assign topic partitions to it
var (partitionTask1, probe1) = CreateProbe<string>(consumer, Subscriptions.Assignment(new TopicPartition(topic, 0)));
var (partitionTask2, probe2) = CreateProbe<string>(consumer, Subscriptions.Assignment(new TopicPartition(topic, 1)));

// Produce messages to partitions
await ProduceStrings(new TopicPartition(topic, new Partition(0)), Enumerable.Range(1, elementsCount), ProducerSettings);
await ProduceStrings(new TopicPartition(topic, new Partition(1)), Enumerable.Range(1, elementsCount), ProducerSettings);

// Request for produced messages and consume them
probe1.Request(elementsCount);
probe2.Request(elementsCount);
probe1.Within(TimeSpan.FromSeconds(10), () => probe1.ExpectNextN(elementsCount));
probe2.Within(TimeSpan.FromSeconds(10), () => probe2.ExpectNextN(elementsCount));

// Stop stages
probe1.Cancel();
probe2.Cancel();

// Make sure stages are stopped gracefully
AwaitCondition(() => partitionTask1.IsCompletedSuccessfully && partitionTask2.IsCompletedSuccessfully);

// Cleanup
consumer.Tell(new KafkaConsumerActorMetadata.Internal.Stop(), ActorRefs.NoSender);
}

[Fact]
public async Task ExternalPlainSource_should_be_stopped_on_serialization_error_only_when_requested_messages()
{
var topic = CreateTopic(1);
var group = CreateGroup(1);

// Make consumer expect numeric messages
var settings = CreateConsumerSettings<int>(group).WithValueDeserializer(Deserializers.Int32);
var consumer = Sys.ActorOf(KafkaConsumerActorMetadata.GetProps(settings));

// Subscribe to partitions
var (partitionTask1, probe1) = CreateProbe<int>(consumer, Subscriptions.Assignment(new TopicPartition(topic, 0)));
var (partitionTask2, probe2) = CreateProbe<int>(consumer, Subscriptions.Assignment(new TopicPartition(topic, 1)));
var (partitionTask3, probe3) = CreateProbe<int>(consumer, Subscriptions.Assignment(new TopicPartition(topic, 2)));

// request from 2 streams
probe1.Request(1);
probe2.Request(1);
await Task.Delay(500); // To establish demand

// Send string messages
await ProduceStrings(new TopicPartition(topic, 0), new int[] { 1 }, ProducerSettings);
await ProduceStrings(new TopicPartition(topic, 1), new int[] { 1 }, ProducerSettings);

// The first two stages should fail; only the stage without demand should keep going
probe1.ExpectError().Should().BeOfType<SerializationException>();
probe2.ExpectError().Should().BeOfType<SerializationException>();
probe3.Cancel();

// Make sure source tasks finish accordingly
AwaitCondition(() => partitionTask1.IsFaulted && partitionTask2.IsFaulted && partitionTask3.IsCompletedSuccessfully);

// Cleanup
consumer.Tell(new KafkaConsumerActorMetadata.Internal.Stop(), ActorRefs.NoSender);
}
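The assertion pattern above leans on two things: a record is only deserialized when a stage with outstanding demand tries to emit it (so probe3, which never requested anything, never touches the bad payload), and Confluent's Int32 deserializer rejects anything that is not exactly four bytes, while ProduceStrings wrote UTF-8 text. A standalone sketch of that failure; the wrapping of the error into a SerializationException is assumed to happen inside the source stage:

```csharp
using System;
using System.Text;
using Confluent.Kafka;

class Int32DeserializerDemo
{
    static void Main()
    {
        byte[] payload = Encoding.UTF8.GetBytes("1"); // one byte, not the four Int32 expects

        try
        {
            int value = Deserializers.Int32.Deserialize(payload, isNull: false, SerializationContext.Empty);
            Console.WriteLine(value); // never reached
        }
        catch (Exception ex) // Confluent throws on the wrong payload length
        {
            Console.WriteLine(ex.Message);
        }
    }
}
```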

[Fact]
public async Task ExternalPlainSource_verify_consuming_actor_pause_resume_partitions_works_fine()
{
var topic = CreateTopic(1);
var group = CreateGroup(1);

// Create consumer actor
var consumer = Sys.ActorOf(KafkaConsumerActorMetadata.GetProps(CreateConsumerSettings<string>(group)));

// Produce messages to each partition
await ProduceStrings(new TopicPartition(topic, 0), Enumerable.Range(1, 100), ProducerSettings);
await ProduceStrings(new TopicPartition(topic, 1), Enumerable.Range(1, 100), ProducerSettings);

// Subscribe to partitions
var (partitionTask1, probe1) = CreateProbe<string>(consumer, Subscriptions.Assignment(new TopicPartition(topic, 0)));
var (partitionTask2, probe2) = CreateProbe<string>(consumer, Subscriptions.Assignment(new TopicPartition(topic, 1)));

var probes = new[] { probe1, probe2 };

// All partitions resumed
probes.ForEach(p => p.Request(1));
probes.ForEach(p => p.ExpectNext(TimeSpan.FromSeconds(10)));

await Task.Delay(1000); // All partitions become paused when there is no demand

// Resume the first partition; the second stays paused
probe1.Request(1);
probe1.ExpectNext(TimeSpan.FromSeconds(10));

await Task.Delay(1000); // All partitions become paused when there is no demand

// Resume the second partition; the first stays paused
probe2.Request(1);
probe2.ExpectNext(TimeSpan.FromSeconds(10));

await Task.Delay(1000); // All partitions become paused when there is no demand

// All partitions resumed again
probes.ForEach(p => p.Request(1));
probes.ForEach(p => p.ExpectNext(TimeSpan.FromSeconds(10)));

// Stop and check graceful shutdown
probes.ForEach(p => p.Cancel());
AwaitCondition(() => partitionTask1.IsCompletedSuccessfully && partitionTask2.IsCompletedSuccessfully);

// Cleanup
consumer.Tell(new KafkaConsumerActorMetadata.Internal.Stop(), ActorRefs.NoSender);
}
}
}
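What the last test pins down is the mapping from stream demand to Kafka flow control: a partition-bound source with no outstanding demand should cause the consumer actor to pause that partition on the shared consumer, and fresh demand should resume it, so one slow stream cannot stall or starve the other. On the Confluent client this corresponds to the Pause and Resume calls; a sketch of the mechanism, illustrative rather than this repo's actual actor code:

```csharp
// Per-partition flow control on a shared consumer, in outline.
using System.Collections.Generic;
using Confluent.Kafka;

static class FlowControl
{
    public static void Adjust(IConsumer<Null, string> consumer,
                              TopicPartition partition, bool hasDemand)
    {
        if (hasDemand)
            consumer.Resume(new List<TopicPartition> { partition }); // start fetching again
        else
            consumer.Pause(new List<TopicPartition> { partition });  // stop fetching; other partitions keep flowing
    }
}
```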
PlainSinkIntegrationTests.cs
@@ -18,63 +18,32 @@ namespace Akka.Streams.Kafka.Tests.Integration
 {
     public class PlainSinkIntegrationTests : KafkaIntegrationTests
     {
-        private readonly KafkaFixture _fixture;
-        private const string InitialMsg = "initial msg in topic, required to create the topic before any consumer subscribes to it";
-        private readonly ActorMaterializer _materializer;
-
-        private string Uuid { get; } = Guid.NewGuid().ToString();
-
-        private string CreateTopic(int number) => $"topic-{number}-{Uuid}";
-        private string CreateGroup(int number) => $"group-{number}-{Uuid}";
-
         public PlainSinkIntegrationTests(ITestOutputHelper output, KafkaFixture fixture)
-            : base(null, output)
+            : base(null, output, fixture)
         {
-            _fixture = fixture;
-            _materializer = Sys.Materializer();
-        }
-
-        private async Task GivenInitializedTopic(string topic)
-        {
-            using (var producer = ProducerSettings.CreateKafkaProducer())
-            {
-                await producer.ProduceAsync(topic, new Message<Null, string> { Value = InitialMsg });
-            }
-        }
-
-        private ProducerSettings<Null, string> ProducerSettings
-        {
-            get => ProducerSettings<Null, string>.Create(Sys, null, null).WithBootstrapServers(_fixture.KafkaServer);
-        }
-
-        private ConsumerSettings<Null, string> CreateConsumerSettings(string group)
-        {
-            return ConsumerSettings<Null, string>.Create(Sys, null, null)
-                .WithBootstrapServers(_fixture.KafkaServer)
-                .WithProperty("auto.offset.reset", "earliest")
-                .WithGroupId(group);
-        }
-
         [Fact]
         public async Task PlainSink_should_publish_100_elements_to_Kafka_producer()
         {
             var topic1 = CreateTopic(1);
             var group1 = CreateGroup(1);
+            var topicPartition1 = new TopicPartition(topic1, 0);

-            await GivenInitializedTopic(topic1);
+            await GivenInitializedTopic(topicPartition1);

-            var consumerSettings = CreateConsumerSettings(group1);
+            var consumerSettings = CreateConsumerSettings<string>(group1);
             var consumer = consumerSettings.CreateKafkaConsumer();
-            consumer.Assign(new List<TopicPartition> { new TopicPartition(topic1, 0) });
+            consumer.Assign(new List<TopicPartition> { topicPartition1 });

             var task = new TaskCompletionSource<NotUsed>();
             int messagesReceived = 0;

             await Source
                 .From(Enumerable.Range(1, 100))
                 .Select(c => c.ToString())
-                .Select(elem => new MessageAndMeta<Null, string> { Topic = topic1, Message = new Message<Null, string> { Value = elem } })
-                .RunWith(KafkaProducer.PlainSink(ProducerSettings), _materializer);
+                .Select(elem => new MessageAndMeta<Null, string> { TopicPartition = topicPartition1, Message = new Message<Null, string> { Value = elem } })
+                .RunWith(KafkaProducer.PlainSink(ProducerSettings), Materializer);

             var dateTimeStart = DateTime.UtcNow;
@@ -113,7 +82,7 @@ public async Task PlainSink_should_fail_stage_if_broker_unavailable()
                 .Select(c => c.ToString())
                 .Select(elem => new MessageAndMeta<Null, string> { Topic = topic1, Message = new Message<Null, string> { Value = elem } })
                 .Via(KafkaProducer.PlainFlow(config))
-                .RunWith(this.SinkProbe<DeliveryReport<Null, string>>(), _materializer);
+                .RunWith(this.SinkProbe<DeliveryReport<Null, string>>(), Materializer);

             probe.ExpectSubscription();
             probe.OnError(new KafkaException(ErrorCode.Local_Transport));