diff --git a/core/src/main/scala/akka/kafka/internal/BaseSingleSourceLogic.scala b/core/src/main/scala/akka/kafka/internal/BaseSingleSourceLogic.scala index 7f86729f2..4f802f8e4 100644 --- a/core/src/main/scala/akka/kafka/internal/BaseSingleSourceLogic.scala +++ b/core/src/main/scala/akka/kafka/internal/BaseSingleSourceLogic.scala @@ -41,11 +41,16 @@ import scala.concurrent.{ExecutionContext, Future} private var requested = false private var requestId = 0 - protected val assignedCB: AsyncCallback[Set[TopicPartition]] = - getAsyncCallback[Set[TopicPartition]](partitionAssignedHandler) - protected val revokedCB: AsyncCallback[Set[TopicPartition]] = - getAsyncCallback[Set[TopicPartition]](partitionRevokedHandler) - protected val lostCB: AsyncCallback[Set[TopicPartition]] = getAsyncCallback[Set[TopicPartition]](partitionLostHandler) + private val assignedCB: AsyncCallback[Set[TopicPartition]] = getAsyncCallback[Set[TopicPartition]] { assignedTps => + tps ++= assignedTps + log.debug("Assigned partitions: {}. All partitions: {}", assignedTps, tps) + requestMessages() + } + + private val revokedCB: AsyncCallback[Set[TopicPartition]] = getAsyncCallback[Set[TopicPartition]] { revokedTps => + tps --= revokedTps + log.debug("Revoked partitions: {}. All partitions: {}", revokedTps, tps) + } override def preStart(): Unit = { super.preStart() @@ -55,7 +60,7 @@ import scala.concurrent.{ExecutionContext, Future} consumerActor = createConsumerActor() sourceActor.watch(consumerActor) - configureSubscription(assignedCB, revokedCB, lostCB) + configureSubscription(assignedCB, revokedCB) } protected def messageHandling: PartialFunction[(ActorRef, Any), Unit] = { @@ -85,17 +90,6 @@ import scala.concurrent.{ExecutionContext, Future} tps ++= topics.keySet } - protected def partitionAssignedHandler(assignedTps: Set[TopicPartition]): Unit = { - tps ++= assignedTps - log.debug("Assigned partitions: {}. All partitions: {}", assignedTps, tps) - requestMessages() - } - - protected def partitionRevokedHandler(revokedTps: Set[TopicPartition]): Unit = { - tps --= revokedTps - log.debug("Revoked partitions: {}. All partitions: {}", revokedTps, tps) - } - protected def partitionLostHandler(lostTps: Set[TopicPartition]): Unit = { tps --= lostTps log.debug("Lost partitions: {}. All partitions: {}", lostTps, tps) diff --git a/core/src/main/scala/akka/kafka/internal/PartitionAssignmentHelpers.scala b/core/src/main/scala/akka/kafka/internal/PartitionAssignmentHelpers.scala index 882b7c66f..03307c301 100644 --- a/core/src/main/scala/akka/kafka/internal/PartitionAssignmentHelpers.scala +++ b/core/src/main/scala/akka/kafka/internal/PartitionAssignmentHelpers.scala @@ -57,8 +57,7 @@ object PartitionAssignmentHelpers { final class AsyncCallbacks(subscription: AutoSubscription, sourceActor: ActorRef, partitionAssignedCB: AsyncCallback[Set[TopicPartition]], - partitionRevokedCB: AsyncCallback[Set[TopicPartition]], - partitionLostCB: AsyncCallback[Set[TopicPartition]]) + partitionRevokedCB: AsyncCallback[Set[TopicPartition]]) extends PartitionAssignmentHandler { override def onRevoke(revokedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = { diff --git a/core/src/main/scala/akka/kafka/internal/SingleSourceLogic.scala b/core/src/main/scala/akka/kafka/internal/SingleSourceLogic.scala index 4f1c5b6c1..f205e936a 100644 --- a/core/src/main/scala/akka/kafka/internal/SingleSourceLogic.scala +++ b/core/src/main/scala/akka/kafka/internal/SingleSourceLogic.scala @@ -80,15 +80,11 @@ import scala.concurrent.{Future, Promise} override def onRevoke(revokedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = lastRevoked = revokedTps - override def onAssign(assignedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = { - suspendDemand() + override def onAssign(assignedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = filterRevokedPartitionsCB.invoke(lastRevoked -- assignedTps) - } - override def onLost(lostTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = { - suspendDemand() + override def onLost(lostTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = filterRevokedPartitionsCB.invoke(lostTps) - } override def onStop(revokedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = () } diff --git a/core/src/main/scala/akka/kafka/internal/SourceLogicBuffer.scala b/core/src/main/scala/akka/kafka/internal/SourceLogicBuffer.scala index a26bb88ae..c639d051e 100644 --- a/core/src/main/scala/akka/kafka/internal/SourceLogicBuffer.scala +++ b/core/src/main/scala/akka/kafka/internal/SourceLogicBuffer.scala @@ -25,17 +25,14 @@ private trait SourceLogicBuffer[K, V, Msg] { def out: Outlet[Msg] - protected def pump(): Unit - protected var buffer: Iterator[ConsumerRecord[K, V]] = Iterator.empty protected val filterRevokedPartitionsCB: AsyncCallback[Set[TopicPartition]] = getAsyncCallback[Set[TopicPartition]] { tps => - suspendDemand() filterRevokedPartitions(tps) } - protected def filterRevokedPartitions(topicPartitions: Set[TopicPartition]): Unit = { + private def filterRevokedPartitions(topicPartitions: Set[TopicPartition]): Unit = { if (topicPartitions.nonEmpty) { log.debug("filtering out messages from revoked partitions {}", topicPartitions) // as buffer is an Iterator the filtering will be applied during `pump` @@ -43,26 +40,6 @@ private trait SourceLogicBuffer[K, V, Msg] { val tp = new TopicPartition(record.topic, record.partition) topicPartitions.contains(tp) } - log.debug("filtering applied") } - resumeDemand() - } - - protected def suspendDemand(): Unit = { - log.debug("Suspend demand") - setHandler(out, new OutHandler { - override def onPull(): Unit = () - override def onDownstreamFinish(): Unit = - performShutdown() - }) - } - - protected def resumeDemand(): Unit = { - log.debug("Resume demand") - setHandler(out, new OutHandler { - override def onPull(): Unit = pump() - override def onDownstreamFinish(): Unit = - performShutdown() - }) } } diff --git a/core/src/main/scala/akka/kafka/internal/SourceLogicSubscription.scala b/core/src/main/scala/akka/kafka/internal/SourceLogicSubscription.scala index 357fd299f..6ae0c5a1a 100644 --- a/core/src/main/scala/akka/kafka/internal/SourceLogicSubscription.scala +++ b/core/src/main/scala/akka/kafka/internal/SourceLogicSubscription.scala @@ -5,6 +5,7 @@ package akka.kafka.internal import akka.actor.ActorRef +import akka.annotation.InternalApi import akka.kafka.{AutoSubscription, ManualSubscription, Subscription} import akka.kafka.Subscriptions._ import akka.kafka.scaladsl.PartitionAssignmentHandler @@ -20,7 +21,8 @@ import org.apache.kafka.common.TopicPartition * 1. Asynchronously by providing [[AsyncCallback]]s for rebalance events * 2. Synchronously by overriding `addToPartitionAssignmentHandler` */ -trait SourceLogicSubscription { +@InternalApi +private trait SourceLogicSubscription { self: GraphStageLogic => def subscription: Subscription @@ -29,8 +31,7 @@ trait SourceLogicSubscription { protected def sourceActor: StageActor protected def configureSubscription(partitionAssignedCB: AsyncCallback[Set[TopicPartition]], - partitionRevokedCB: AsyncCallback[Set[TopicPartition]], - partitionLostCB: AsyncCallback[Set[TopicPartition]]): Unit = { + partitionRevokedCB: AsyncCallback[Set[TopicPartition]]): Unit = { def rebalanceListener(autoSubscription: AutoSubscription): PartitionAssignmentHandler = { PartitionAssignmentHelpers.chain( @@ -38,8 +39,7 @@ trait SourceLogicSubscription { new PartitionAssignmentHelpers.AsyncCallbacks(autoSubscription, sourceActor.ref, partitionAssignedCB, - partitionRevokedCB, - partitionLostCB) + partitionRevokedCB) ) } diff --git a/core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala b/core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala index 06b236432..aa4f27501 100644 --- a/core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala +++ b/core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala @@ -92,7 +92,7 @@ private class SubSourceLogic[K, V, Msg]( consumerPromise.success(consumerActor) sourceActor.watch(consumerActor) - configureSubscription(partitionAssignedCB, partitionRevokedCB, partitionLostCB) + configureSubscription(partitionAssignedCB, partitionRevokedCB) } private val updatePendingPartitionsAndEmitSubSourcesCb = @@ -138,11 +138,6 @@ private class SubSourceLogic[K, V, Msg]( scheduleOnce(CloseRevokedPartitions, settings.waitClosePartition) } - private val partitionLostCB = getAsyncCallback[Set[TopicPartition]] { lost => - partitionsToRevoke ++= lost - scheduleOnce(CloseRevokedPartitions, settings.waitClosePartition) - } - private def seekAndEmitSubSources( formerlyUnknown: Set[TopicPartition], offsets: Map[TopicPartition, Long] @@ -438,34 +433,6 @@ private abstract class SubSourceStageLogic[K, V, Msg]( super.postStop() } - override protected def suspendDemand(): Unit = { - log.debug("Suspend demand") - setHandler( - out, - new OutHandler { - override def onPull(): Unit = () - override def onDownstreamFinish(): Unit = { - subSourceCancelledCb.invoke(tp -> onDownstreamFinishSubSourceCancellationStrategy()) - super.onDownstreamFinish() - } - } - ) - } - - override protected def resumeDemand(): Unit = { - log.debug("Resume demand") - setHandler( - out, - new OutHandler { - override def onPull(): Unit = pump() - override def onDownstreamFinish(): Unit = { - subSourceCancelledCb.invoke(tp -> onDownstreamFinishSubSourceCancellationStrategy()) - super.onDownstreamFinish() - } - } - ) - } - setHandler( shape.out, new OutHandler {