Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rebalance: filter messages of revoked partitions (partitioned) #992

Merged

Conversation

seglo
Copy link
Contributor

@seglo seglo commented Nov 28, 2019

Purpose

A solution to the same issue as described in #872, but for partitioned sources

When a partition is revoked from an Alpakka Kafka consumer, it continues to emit data which was buffered before. This can be up to max.poll.records messages that got fetched just before the revoke was issued.

Instead, the stage should drop messages from its buffer when it doesn't have the partition assigned anymore.

References

Changes

  • Implement AlpakkaAssignor test class to deterministically control which partitions are assigned to which consumer group members in RebalanceSpec
  • Re-order default async rebalance handler events to occur after synchronous ones.
  • Suspend demand as soon as rebalance event that triggers filtering (on consumer polling thread) is fired to prevent the stage from processing messages on its own thread.
  • Factor out SourceLogicBuffer to manage source buffers consistently across single and partitioned sources
  • Factor out SourceLogicSubscription to manage rebalance handler events consistently across single and partitioned sources
  • New test in RebalanceSpec to demonstrate/resolve issue. Use default max.poll.records to reproduce failure before fix

@seglo
Copy link
Contributor Author

seglo commented Nov 29, 2019

The original RebalanceSpec fails transiently. It's been reported in other builds with #973.

This may be another candidate for using the new blocking partition rebalance implementation.

1/10 failures when run on master

sbt:akka-stream-kafka> tests/testOnly *.RebalanceSpec -- -DtimesToRepeat=10
[info] Formatting 2 Scala sources...
[info] Compiling 2 Scala sources to /home/seglo/source/alpakka-kafka/tests/target/scala-2.12/test-classes ...
[info] Done compiling.
[info] Compiling 12 Scala sources to /home/seglo/source/alpakka-kafka/tests/target/scala-2.12/test-classes ...
[info] Done compiling.
[info] RebalanceSpec:
        ℹ︎ Checking the system...
        ✔ Docker version should be at least 1.6.0
        ✔ Docker environment should have more than 2GB free disk space
[info] Fetched records
[info] - must actually show even if partition is revoked (10 seconds, 944 milliseconds)
[info] - must actually show even if partition is revoked (9 seconds, 722 milliseconds)
[info] - must actually show even if partition is revoked (9 seconds, 757 milliseconds)
[info] - must actually show even if partition is revoked (9 seconds, 755 milliseconds)
[info] - must actually show even if partition is revoked *** FAILED *** (8 seconds, 718 milliseconds)
[info]   java.lang.AssertionError: assertion failed: expected TopicPartitionsAssigned(TopicSubscription(Set(topic--1287195221-9),Some(Actor[akka://Spec/system/testProbe-28#1790430185]),EmptyPartitionAssignmentHandler),Set(topic--1287195221-9-0)), found TopicPartitionsAssigned(TopicSubscription(Set(topic--1287195221-9),Some(Actor[akka://Spec/system/testProbe-28#1790430185]),EmptyPartitionAssignmentHandler),Set(topic--1287195221-9-1))
[info]   at scala.Predef$.assert(Predef.scala:223)
[info]   at akka.testkit.TestKitBase.expectMsg_internal(TestKit.scala:419)
[info]   at akka.testkit.TestKitBase.expectMsg(TestKit.scala:395)
[info]   at akka.testkit.TestKitBase.expectMsg$(TestKit.scala:395)
[info]   at akka.testkit.TestKit.expectMsg(TestKit.scala:923)
[info]   at akka.kafka.scaladsl.RebalanceSpec.$anonfun$new$3(RebalanceSpec.scala:79)
[info]   at akka.stream.testkit.scaladsl.StreamTestKit$.assertAllStagesStopped(StreamTestKit.scala:32)
[info]   at akka.kafka.scaladsl.RebalanceSpec.$anonfun$new$2(RebalanceSpec.scala:34)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.WordSpecLike$$anon$3.apply(WordSpecLike.scala:1075)
[info]   at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
[info]   at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
[info]   at akka.kafka.scaladsl.SpecBase.withFixture(SpecBase.scala:14)
[info]   at org.scalatest.WordSpecLike.invokeWithFixture$1(WordSpecLike.scala:1073)
[info]   at org.scalatest.WordSpecLike.$anonfun$runTest$1(WordSpecLike.scala:1085)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:286)
[info]   at org.scalatest.WordSpecLike.runTest(WordSpecLike.scala:1085)
[info]   at org.scalatest.WordSpecLike.runTest$(WordSpecLike.scala:1067)
[info]   at akka.kafka.scaladsl.SpecBase.akka$kafka$Repeated$$super$runTest(SpecBase.scala:14)
[info]   at akka.kafka.Repeated.run0$1(Repeated.scala:25)
[info]   at akka.kafka.Repeated.$anonfun$runTest$1(Repeated.scala:26)
[info]   at org.scalatest.Status.$anonfun$thenRun$1(Status.scala:225)
[info]   at org.scalatest.Status.$anonfun$thenRun$1$adapted(Status.scala:223)
[info]   at org.scalatest.SucceededStatus$.whenCompleted(Status.scala:454)
[info]   at org.scalatest.Status.thenRun(Status.scala:223)
[info]   at org.scalatest.Status.thenRun$(Status.scala:221)
[info]   at org.scalatest.SucceededStatus$.thenRun(Status.scala:426)
[info]   at akka.kafka.Repeated.run0$1(Repeated.scala:26)
[info]   at akka.kafka.Repeated.$anonfun$runTest$1(Repeated.scala:26)
[info]   at org.scalatest.Status.$anonfun$thenRun$1(Status.scala:225)
[info]   at org.scalatest.Status.$anonfun$thenRun$1$adapted(Status.scala:223)
[info]   at org.scalatest.SucceededStatus$.whenCompleted(Status.scala:454)
[info]   at org.scalatest.Status.thenRun(Status.scala:223)
[info]   at org.scalatest.Status.thenRun$(Status.scala:221)
[info]   at org.scalatest.SucceededStatus$.thenRun(Status.scala:426)
[info]   at akka.kafka.Repeated.run0$1(Repeated.scala:26)
[info]   at akka.kafka.Repeated.$anonfun$runTest$1(Repeated.scala:26)
[info]   at org.scalatest.Status.$anonfun$thenRun$1(Status.scala:225)
[info]   at org.scalatest.Status.$anonfun$thenRun$1$adapted(Status.scala:223)
[info]   at org.scalatest.SucceededStatus$.whenCompleted(Status.scala:454)
[info]   at org.scalatest.Status.thenRun(Status.scala:223)
[info]   at org.scalatest.Status.thenRun$(Status.scala:221)
[info]   at org.scalatest.SucceededStatus$.thenRun(Status.scala:426)
[info]   at akka.kafka.Repeated.run0$1(Repeated.scala:26)
[info]   at akka.kafka.Repeated.$anonfun$runTest$1(Repeated.scala:26)
[info]   at org.scalatest.Status.$anonfun$thenRun$1(Status.scala:225)
[info]   at org.scalatest.Status.$anonfun$thenRun$1$adapted(Status.scala:223)
[info]   at org.scalatest.SucceededStatus$.whenCompleted(Status.scala:454)
[info]   at org.scalatest.Status.thenRun(Status.scala:223)
[info]   at org.scalatest.Status.thenRun$(Status.scala:221)
[info]   at org.scalatest.SucceededStatus$.thenRun(Status.scala:426)
[info]   at akka.kafka.Repeated.run0$1(Repeated.scala:26)
[info]   at akka.kafka.Repeated.runTest(Repeated.scala:29)
[info]   at akka.kafka.Repeated.runTest$(Repeated.scala:23)
[info]   at akka.kafka.scaladsl.SpecBase.runTest(SpecBase.scala:14)
[info]   at org.scalatest.WordSpecLike.$anonfun$runTests$1(WordSpecLike.scala:1144)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:393)
[info]   at scala.collection.immutable.List.foreach(List.scala:392)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:381)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:370)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:407)
[info]   at scala.collection.immutable.List.foreach(List.scala:392)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:381)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:376)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:458)
[info]   at org.scalatest.WordSpecLike.runTests(WordSpecLike.scala:1144)
[info]   at org.scalatest.WordSpecLike.runTests$(WordSpecLike.scala:1143)
[info]   at akka.kafka.scaladsl.SpecBase.runTests(SpecBase.scala:14)
[info]   at org.scalatest.Suite.run(Suite.scala:1124)
[info]   at org.scalatest.Suite.run$(Suite.scala:1106)
[info]   at akka.kafka.testkit.scaladsl.ScalatestKafkaSpec.org$scalatest$BeforeAndAfterAll$$super$run(ScalatestKafkaSpec.scala:11)
[info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info]   at akka.kafka.scaladsl.SpecBase.org$scalatest$WordSpecLike$$super$run(SpecBase.scala:14)
[info]   at org.scalatest.WordSpecLike.$anonfun$run$1(WordSpecLike.scala:1189)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:518)
[info]   at org.scalatest.WordSpecLike.run(WordSpecLike.scala:1189)
[info]   at org.scalatest.WordSpecLike.run$(WordSpecLike.scala:1187)
[info]   at akka.kafka.scaladsl.SpecBase.run(SpecBase.scala:14)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:317)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:510)
[info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:304)
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info]   at java.lang.Thread.run(Thread.java:748)
[info] - must actually show even if partition is revoked (9 seconds, 788 milliseconds)
[info] - must actually show even if partition is revoked (9 seconds, 721 milliseconds)
[info] - must actually show even if partition is revoked (9 seconds, 730 milliseconds)
[info] - must actually show even if partition is revoked (9 seconds, 708 milliseconds)
[info] - must actually show even if partition is revoked (9 seconds, 719 milliseconds)
[info] ScalaTest
[info] Run completed in 1 minute, 59 seconds.
[info] Total number of tests run: 10
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 9, failed 1, canceled 0, ignored 0, pending 0
[info] *** 1 TEST FAILED ***
[error] Failed: Total 10, Failed 1, Errors 0, Passed 9
[error] Failed tests:
[error]         akka.kafka.scaladsl.RebalanceSpec
[error] (tests / Test / testOnly) sbt.TestsFailedException: Tests unsuccessful
[error] Total time: 134 s, completed 29-Nov-2019 5:26:10 PM

@seglo seglo force-pushed the seglo/drop-revoked-partitions-subsources-test branch 2 times, most recently from ab69dbb to e707964 Compare December 3, 2019 03:15
@seglo seglo force-pushed the seglo/drop-revoked-partitions-subsources-test branch from c3b1201 to 34da701 Compare December 3, 2019 16:44
@seglo seglo changed the title Illustrate how max.poll.records affects buffer in partitioned sources Filter out messages for partitions that are balanced away in partitioned sources Dec 3, 2019
@seglo seglo requested a review from ennru December 3, 2019 21:40
Copy link
Member

@ennru ennru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great to factor out the buffer and the subscription!
Trolling the demand seems dangerous, please make it log a reason when switching.

* duplicate messages sent downstream.
*/
@InternalApi
private trait SourceLogicBuffer[K, V, Msg] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does a private trait mean?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. I'm surprised it doesn't introduce a compiler error TBH. I switched it to package protected like your DeferredProducer trait.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have some more of these BTW. PromiseControl is private.

@seglo
Copy link
Contributor Author

seglo commented Dec 4, 2019

Trolling the demand seems dangerous, please make it log a reason when switching.

It seems this isn't actually necessary. I'll remove it. I originally tried this before introducing the async callback, which appears to negate the message processing concurrency issue as well.

@seglo seglo force-pushed the seglo/drop-revoked-partitions-subsources-test branch from 8771233 to f221da1 Compare December 4, 2019 16:59
@seglo seglo force-pushed the seglo/drop-revoked-partitions-subsources-test branch from f221da1 to f86bc1d Compare December 4, 2019 17:23
@seglo seglo requested a review from ennru December 4, 2019 17:23
Copy link
Member

@ennru ennru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

@ennru ennru changed the title Filter out messages for partitions that are balanced away in partitioned sources Rebalance: filter messages of revoked partitions (partitioned) Dec 5, 2019
@ennru ennru merged commit b493f5b into akka:master Dec 5, 2019
@ennru ennru added this to the 2.0.0 milestone Dec 5, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants