diff --git a/build.sbt b/build.sbt index 3c3ee042d..1d8e538ce 100644 --- a/build.sbt +++ b/build.sbt @@ -55,7 +55,8 @@ val commonSettings = Seq( "-Ywarn-dead-code", "-Ywarn-numeric-widen", "-Xfuture"), -testOptions += Tests.Argument(TestFrameworks.JUnit, "-q", "-v"), + testOptions += Tests.Argument("-oD"), + testOptions += Tests.Argument(TestFrameworks.JUnit, "-q", "-v"), scalariformPreferences := scalariformPreferences.value .setPreference(DoubleIndentConstructorArguments, true) .setPreference(PreserveSpaceBeforeArguments, true) diff --git a/core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala b/core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala index 9c85ea12e..0d58afbbd 100644 --- a/core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala +++ b/core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala @@ -155,7 +155,16 @@ private[kafka] abstract class SubSourceLogic[K, V, Msg]( super.postStop() } - override def performShutdown() = { + override def performStop(): Unit = { + setKeepGoing(true) + subSources.foreach { + case (_, control) => control.stop() + } + complete(shape.out) + onStop() + } + + override def performShutdown(): Unit = { setKeepGoing(true) //todo we should wait for subsources to be shutdown and next shutdown main stage subSources.foreach { diff --git a/core/src/test/scala/akka/kafka/internal/ConsumerTest.scala b/core/src/test/scala/akka/kafka/internal/ConsumerTest.scala index c567919b4..5c30afab3 100644 --- a/core/src/test/scala/akka/kafka/internal/ConsumerTest.scala +++ b/core/src/test/scala/akka/kafka/internal/ConsumerTest.scala @@ -539,8 +539,7 @@ class ConsumerTest(_system: ActorSystem) } } - // not implemented yet - ignore should "keep stage running after cancellation until all futures completed" in { + it should "keep stage running after cancellation until all futures completed" in { assertAllStagesStopped { val commitLog = new ConsumerMock.LogHandler() val mock = new ConsumerMock[K, V](commitLog) diff --git a/core/src/test/scala/akka/kafka/scaladsl/IntegrationSpec.scala b/core/src/test/scala/akka/kafka/scaladsl/IntegrationSpec.scala index 79523b881..76a6a8444 100644 --- a/core/src/test/scala/akka/kafka/scaladsl/IntegrationSpec.scala +++ b/core/src/test/scala/akka/kafka/scaladsl/IntegrationSpec.scala @@ -556,7 +556,7 @@ class IntegrationSpec extends TestKit(ActorSystem("IntegrationSpec")) } } - "complete partition sources when the main source control stopped" in pendingUntilFixed { + "complete partition sources when the main source control stopped" in { assertAllStagesStopped { val topic = createTopic(1) val group = createGroup(1)