Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

External plain source #56

Merged
merged 17 commits into from
Sep 19, 2019

Conversation

IgorFedchenko
Copy link
Contributor

One more step to resolve issue #36 - this PR adds ExternalPlainSource stage.

Basically, this stage allows to make use of some existing consuming actor, and get messages from him instead of creating it's own consuming actor.

Implementation is pretty simple, but while adding test coverage we'll need to make sure that Pause()`Resume()` scenario is tested (issue #55 )

Close #55

It is work in progress for now (need more tests), that's why making it a draft PR.

@IgorFedchenko
Copy link
Contributor Author

@Aaronontheweb Please keep in mind that this PR is based on #53 and #54 PRs, so we would better have them merged first.

@Aaronontheweb
Copy link
Member

@IgorFedchenko will do - letting CI run for those now.

Copy link
Member

@Aaronontheweb Aaronontheweb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a question

@IgorFedchenko
Copy link
Contributor Author

While adding and verifying pause/resume test, I discovered one more pretty serious issue which may lead to message loss when stages subscribed to multiple partitions.
I want to move this issue fix to separate PR, because technically this PR works and implements ExternalPlainSource stage well. And there is also a separate test to check pause/resume behavior, which also passes now.
So I will put some comments over changes here and move this PR to review.

Copy link
Contributor Author

@IgorFedchenko IgorFedchenko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Aaronontheweb Seems like this PR is ready for your review now.

docker-compose.yml Show resolved Hide resolved
@@ -18,41 +18,9 @@ namespace Akka.Streams.Kafka.Tests.Integration
{
public class CommittableSourceIntegrationTests : KafkaIntegrationTests
{
private const string InitialMsg = "initial msg in topic, required to create the topic before any consumer subscribes to it";
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All common setup is moved to base KafkaIntegrationTests class

@@ -82,6 +82,7 @@ await CreateContainer(ZookeeperImageName, ZookeeperImageTag, _zookeeperContainer
await CreateContainer(KafkaImageName, KafkaImageTag, _kafkaContainerName, KafkaPort, new Dictionary<string, string>()
{
["KAFKA_BROKER_ID"] = "1",
["KAFKA_NUM_PARTITIONS"] = "3",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to update docker container configuration as well

/// Generally this should not be used from outside of the library.
/// </summary>
[InternalApi]
public class KafkaConsumerActorMetadata
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Has to be public to be available in tests

@IgorFedchenko IgorFedchenko marked this pull request as ready for review September 18, 2019 22:12
@IgorFedchenko
Copy link
Contributor Author

@Aaronontheweb There is also one more PR #59 that is based on this one, so after merging this you can review that one. It addresses issue #58 which was discovered during testing.

Copy link
Member

@Aaronontheweb Aaronontheweb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

private readonly KafkaFixture _fixture;
protected IMaterializer Materializer { get; }

public KafkaIntegrationTests(string actorSystemName, ITestOutputHelper output, KafkaFixture fixture)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense to move all of this boilerplate code to the KafkaIntegrationTests fixture so it can be re-used.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Aaronontheweb Well, KafkaIntegrationTests is a base class for all kafka tests by itself, so all of this code is going to be reused. Maybe would be nice to make it abstract by the way... Is it what you were talking about?

@Aaronontheweb Aaronontheweb merged commit a98057d into akkadotnet:dev Sep 19, 2019
@IgorFedchenko IgorFedchenko deleted the external_plain_source branch September 20, 2019 07:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add test coverage for Pause/Resume partitions scenario
2 participants