Skip to content

Commit

Permalink
Remove resume/suspend demand logic. PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
seglo committed Dec 4, 2019
1 parent a679973 commit f221da1
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 88 deletions.
28 changes: 11 additions & 17 deletions core/src/main/scala/akka/kafka/internal/BaseSingleSourceLogic.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,16 @@ import scala.concurrent.{ExecutionContext, Future}
private var requested = false
private var requestId = 0

protected val assignedCB: AsyncCallback[Set[TopicPartition]] =
getAsyncCallback[Set[TopicPartition]](partitionAssignedHandler)
protected val revokedCB: AsyncCallback[Set[TopicPartition]] =
getAsyncCallback[Set[TopicPartition]](partitionRevokedHandler)
protected val lostCB: AsyncCallback[Set[TopicPartition]] = getAsyncCallback[Set[TopicPartition]](partitionLostHandler)
private val assignedCB: AsyncCallback[Set[TopicPartition]] = getAsyncCallback[Set[TopicPartition]] { assignedTps =>
tps ++= assignedTps
log.debug("Assigned partitions: {}. All partitions: {}", assignedTps, tps)
requestMessages()
}

private val revokedCB: AsyncCallback[Set[TopicPartition]] = getAsyncCallback[Set[TopicPartition]] { revokedTps =>
tps --= revokedTps
log.debug("Revoked partitions: {}. All partitions: {}", revokedTps, tps)
}

override def preStart(): Unit = {
super.preStart()
Expand All @@ -55,7 +60,7 @@ import scala.concurrent.{ExecutionContext, Future}
consumerActor = createConsumerActor()
sourceActor.watch(consumerActor)

configureSubscription(assignedCB, revokedCB, lostCB)
configureSubscription(assignedCB, revokedCB)
}

protected def messageHandling: PartialFunction[(ActorRef, Any), Unit] = {
Expand Down Expand Up @@ -85,17 +90,6 @@ import scala.concurrent.{ExecutionContext, Future}
tps ++= topics.keySet
}

protected def partitionAssignedHandler(assignedTps: Set[TopicPartition]): Unit = {
tps ++= assignedTps
log.debug("Assigned partitions: {}. All partitions: {}", assignedTps, tps)
requestMessages()
}

protected def partitionRevokedHandler(revokedTps: Set[TopicPartition]): Unit = {
tps --= revokedTps
log.debug("Revoked partitions: {}. All partitions: {}", revokedTps, tps)
}

protected def partitionLostHandler(lostTps: Set[TopicPartition]): Unit = {
tps --= lostTps
log.debug("Lost partitions: {}. All partitions: {}", lostTps, tps)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ object PartitionAssignmentHelpers {
final class AsyncCallbacks(subscription: AutoSubscription,
sourceActor: ActorRef,
partitionAssignedCB: AsyncCallback[Set[TopicPartition]],
partitionRevokedCB: AsyncCallback[Set[TopicPartition]],
partitionLostCB: AsyncCallback[Set[TopicPartition]])
partitionRevokedCB: AsyncCallback[Set[TopicPartition]])
extends PartitionAssignmentHandler {

override def onRevoke(revokedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,11 @@ import scala.concurrent.{Future, Promise}
override def onRevoke(revokedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit =
lastRevoked = revokedTps

override def onAssign(assignedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = {
suspendDemand()
override def onAssign(assignedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit =
filterRevokedPartitionsCB.invoke(lastRevoked -- assignedTps)
}

override def onLost(lostTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = {
suspendDemand()
override def onLost(lostTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit =
filterRevokedPartitionsCB.invoke(lostTps)
}

override def onStop(revokedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = ()
}
Expand Down
25 changes: 1 addition & 24 deletions core/src/main/scala/akka/kafka/internal/SourceLogicBuffer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,44 +25,21 @@ private trait SourceLogicBuffer[K, V, Msg] {

def out: Outlet[Msg]

protected def pump(): Unit

protected var buffer: Iterator[ConsumerRecord[K, V]] = Iterator.empty

protected val filterRevokedPartitionsCB: AsyncCallback[Set[TopicPartition]] = getAsyncCallback[Set[TopicPartition]] {
tps =>
suspendDemand()
filterRevokedPartitions(tps)
}

protected def filterRevokedPartitions(topicPartitions: Set[TopicPartition]): Unit = {
private def filterRevokedPartitions(topicPartitions: Set[TopicPartition]): Unit = {
if (topicPartitions.nonEmpty) {
log.debug("filtering out messages from revoked partitions {}", topicPartitions)
// as buffer is an Iterator the filtering will be applied during `pump`
buffer = buffer.filterNot { record =>
val tp = new TopicPartition(record.topic, record.partition)
topicPartitions.contains(tp)
}
log.debug("filtering applied")
}
resumeDemand()
}

protected def suspendDemand(): Unit = {
log.debug("Suspend demand")
setHandler(out, new OutHandler {
override def onPull(): Unit = ()
override def onDownstreamFinish(): Unit =
performShutdown()
})
}

protected def resumeDemand(): Unit = {
log.debug("Resume demand")
setHandler(out, new OutHandler {
override def onPull(): Unit = pump()
override def onDownstreamFinish(): Unit =
performShutdown()
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package akka.kafka.internal
import akka.actor.ActorRef
import akka.annotation.InternalApi
import akka.kafka.{AutoSubscription, ManualSubscription, Subscription}
import akka.kafka.Subscriptions._
import akka.kafka.scaladsl.PartitionAssignmentHandler
Expand All @@ -20,7 +21,8 @@ import org.apache.kafka.common.TopicPartition
* 1. Asynchronously by providing [[AsyncCallback]]s for rebalance events
* 2. Synchronously by overriding `addToPartitionAssignmentHandler`
*/
trait SourceLogicSubscription {
@InternalApi
private trait SourceLogicSubscription {
self: GraphStageLogic =>

def subscription: Subscription
Expand All @@ -29,17 +31,15 @@ trait SourceLogicSubscription {
protected def sourceActor: StageActor

protected def configureSubscription(partitionAssignedCB: AsyncCallback[Set[TopicPartition]],
partitionRevokedCB: AsyncCallback[Set[TopicPartition]],
partitionLostCB: AsyncCallback[Set[TopicPartition]]): Unit = {
partitionRevokedCB: AsyncCallback[Set[TopicPartition]]): Unit = {

def rebalanceListener(autoSubscription: AutoSubscription): PartitionAssignmentHandler = {
PartitionAssignmentHelpers.chain(
addToPartitionAssignmentHandler(autoSubscription.partitionAssignmentHandler),
new PartitionAssignmentHelpers.AsyncCallbacks(autoSubscription,
sourceActor.ref,
partitionAssignedCB,
partitionRevokedCB,
partitionLostCB)
partitionRevokedCB)
)
}

Expand Down
35 changes: 1 addition & 34 deletions core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ private class SubSourceLogic[K, V, Msg](
consumerPromise.success(consumerActor)
sourceActor.watch(consumerActor)

configureSubscription(partitionAssignedCB, partitionRevokedCB, partitionLostCB)
configureSubscription(partitionAssignedCB, partitionRevokedCB)
}

private val updatePendingPartitionsAndEmitSubSourcesCb =
Expand Down Expand Up @@ -138,11 +138,6 @@ private class SubSourceLogic[K, V, Msg](
scheduleOnce(CloseRevokedPartitions, settings.waitClosePartition)
}

private val partitionLostCB = getAsyncCallback[Set[TopicPartition]] { lost =>
partitionsToRevoke ++= lost
scheduleOnce(CloseRevokedPartitions, settings.waitClosePartition)
}

private def seekAndEmitSubSources(
formerlyUnknown: Set[TopicPartition],
offsets: Map[TopicPartition, Long]
Expand Down Expand Up @@ -438,34 +433,6 @@ private abstract class SubSourceStageLogic[K, V, Msg](
super.postStop()
}

override protected def suspendDemand(): Unit = {
log.debug("Suspend demand")
setHandler(
out,
new OutHandler {
override def onPull(): Unit = ()
override def onDownstreamFinish(): Unit = {
subSourceCancelledCb.invoke(tp -> onDownstreamFinishSubSourceCancellationStrategy())
super.onDownstreamFinish()
}
}
)
}

override protected def resumeDemand(): Unit = {
log.debug("Resume demand")
setHandler(
out,
new OutHandler {
override def onPull(): Unit = pump()
override def onDownstreamFinish(): Unit = {
subSourceCancelledCb.invoke(tp -> onDownstreamFinishSubSourceCancellationStrategy())
super.onDownstreamFinish()
}
}
)
}

setHandler(
shape.out,
new OutHandler {
Expand Down

0 comments on commit f221da1

Please sign in to comment.