-
Notifications
You must be signed in to change notification settings - Fork 22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
External plain source #56
External plain source #56
Conversation
@Aaronontheweb Please keep in mind that this PR is based on #53 and #54 PRs, so we would better have them merged first. |
@IgorFedchenko will do - letting CI run for those now. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left a question
src/Akka.Streams.Kafka/Stages/Consumers/Abstract/ExternalSingleSourceLogic.cs
Show resolved
Hide resolved
While adding and verifying pause/resume test, I discovered one more pretty serious issue which may lead to message loss when stages subscribed to multiple partitions. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Aaronontheweb Seems like this PR is ready for your review now.
@@ -18,41 +18,9 @@ namespace Akka.Streams.Kafka.Tests.Integration | |||
{ | |||
public class CommittableSourceIntegrationTests : KafkaIntegrationTests | |||
{ | |||
private const string InitialMsg = "initial msg in topic, required to create the topic before any consumer subscribes to it"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All common setup is moved to base KafkaIntegrationTests
class
src/Akka.Streams.Kafka.Tests/Integration/CommittableSourceIntegrationTests.cs
Show resolved
Hide resolved
src/Akka.Streams.Kafka.Tests/Integration/ExternalPlainSourceIntegrationTests.cs
Show resolved
Hide resolved
@@ -82,6 +82,7 @@ await CreateContainer(ZookeeperImageName, ZookeeperImageTag, _zookeeperContainer | |||
await CreateContainer(KafkaImageName, KafkaImageTag, _kafkaContainerName, KafkaPort, new Dictionary<string, string>() | |||
{ | |||
["KAFKA_BROKER_ID"] = "1", | |||
["KAFKA_NUM_PARTITIONS"] = "3", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to update docker container configuration as well
/// Generally this should not be used from outside of the library. | ||
/// </summary> | ||
[InternalApi] | ||
public class KafkaConsumerActorMetadata |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Has to be public to be available in tests
@Aaronontheweb There is also one more PR #59 that is based on this one, so after merging this you can review that one. It addresses issue #58 which was discovered during testing. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
src/Akka.Streams.Kafka.Tests/Integration/ExternalPlainSourceIntegrationTests.cs
Show resolved
Hide resolved
private readonly KafkaFixture _fixture; | ||
protected IMaterializer Materializer { get; } | ||
|
||
public KafkaIntegrationTests(string actorSystemName, ITestOutputHelper output, KafkaFixture fixture) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense to move all of this boilerplate code to the KafkaIntegrationTests
fixture so it can be re-used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Aaronontheweb Well, KafkaIntegrationTests
is a base class for all kafka tests by itself, so all of this code is going to be reused. Maybe would be nice to make it abstract
by the way... Is it what you were talking about?
One more step to resolve issue #36 - this PR adds ExternalPlainSource stage.
Basically, this stage allows to make use of some existing consuming actor, and get messages from him instead of creating it's own consuming actor.
Implementation is pretty simple, but while adding test coverage we'll need to make sure that
Pause()
`Resume()` scenario is tested (issue #55 )Close #55
It is work in progress for now (need more tests), that's why making it a draft PR.