Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proposal: Prefer Tuples over complex SubscriptionStreamControl interface #1207

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 37 additions & 32 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -578,29 +578,34 @@ private[consumer] final class ConsumerLive private[consumer] (
subscription: Subscription,
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V]
): ZIO[Scope, Throwable, SubscriptionStreamControl[
Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]]
]] = {
val onlyByteArraySerdes: Boolean = (keyDeserializer eq Serde.byteArray) && (valueDeserializer eq Serde.byteArray)
for {
streamControl <- runloopAccess.subscribe(subscription)
} yield SubscriptionStreamControl(
streamControl.stream
.map(_.exit)
.flattenExitOption
.map {
_.collect {
case (tp, partitionStream) if Subscription.subscriptionMatches(subscription, tp) =>
val stream: ZStream[R, Throwable, CommittableRecord[K, V]] =
if (onlyByteArraySerdes)
partitionStream.asInstanceOf[ZStream[R, Throwable, CommittableRecord[K, V]]]
else partitionStream.mapChunksZIO(_.mapZIO(_.deserializeWith(keyDeserializer, valueDeserializer)))

tp -> stream
}
},
streamControl.stop
): ZIO[
Scope,
Throwable,
(
Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]],
SubscriptionStreamControl
)
] = {
val onlyByteArraySerdes: Boolean = (keyDeserializer eq Serde.byteArray) && (valueDeserializer eq Serde.byteArray)

runloopAccess
.subscribe(subscription)
.map { case (stream, control) =>
stream
.map(_.exit)
.flattenExitOption
.map {
_.collect {
case (tp, partitionStream) if Subscription.subscriptionMatches(subscription, tp) =>
val stream: ZStream[R, Throwable, CommittableRecord[K, V]] =
if (onlyByteArraySerdes)
partitionStream.asInstanceOf[ZStream[R, Throwable, CommittableRecord[K, V]]]
else partitionStream.mapChunksZIO(_.mapZIO(_.deserializeWith(keyDeserializer, valueDeserializer)))

tp -> stream
}
} -> control
}
}

override def partitionedAssignmentStream[R, K, V](
Expand All @@ -617,8 +622,8 @@ private[consumer] final class ConsumerLive private[consumer] (
]
] = ZStream.unwrapScoped {
for {
streamControl <- partitionedAssignmentStreamWithControl(subscription, keyDeserializer, valueDeserializer)
} yield streamControl.stream
(stream, _) <- partitionedAssignmentStreamWithControl(subscription, keyDeserializer, valueDeserializer)
} yield stream
}

override def partitionedAssignmentStreamWithGracefulShutdown[R, K, V](
Expand Down Expand Up @@ -653,8 +658,8 @@ private[consumer] final class ConsumerLive private[consumer] (
)
] = ZStream.unwrapScoped {
for {
streamControl <- partitionedAssignmentStreamWithControl(subscription, keyDeserializer, valueDeserializer)
} yield streamControl.stream.flattenChunks
(stream, _) <- partitionedAssignmentStreamWithControl(subscription, keyDeserializer, valueDeserializer)
} yield stream.flattenChunks
}

override def partitionedStreamWithGracefulShutdown[R, K, V](
Expand Down Expand Up @@ -682,8 +687,8 @@ private[consumer] final class ConsumerLive private[consumer] (
): ZStream[R, Throwable, CommittableRecord[K, V]] =
ZStream.unwrapScoped {
for {
streamControl <- partitionedAssignmentStreamWithControl(subscription, keyDeserializer, valueDeserializer)
} yield streamControl.stream.flattenChunks.flatMapPar(n = Int.MaxValue, bufferSize = bufferSize)(_._2)
(stream, _) <- partitionedAssignmentStreamWithControl(subscription, keyDeserializer, valueDeserializer)
} yield stream.flattenChunks.flatMapPar(n = Int.MaxValue, bufferSize = bufferSize)(_._2)
}

override def plainStreamWithGracefulShutdown[R, K, V](
Expand Down Expand Up @@ -715,16 +720,16 @@ private[consumer] final class ConsumerLive private[consumer] (
* typically something like `stream => stream.mapZIO(record => ZIO.debug(record)).mapZIO(_.offset.commit)`
*/
private def runWithGracefulShutdown[StreamType <: ZStream[_, _, _], R, E](
streamControl: ZIO[Scope, E, SubscriptionStreamControl[StreamType]],
streamControl: ZIO[Scope, E, (StreamType, SubscriptionStreamControl)],
shutdownTimeout: Duration
)(
withStream: StreamType => ZIO[R, E, Any]
): ZIO[R, E, Any] =
ZIO.scoped[R] {
for {
control <- streamControl
fib <- withStream(control.stream).forkScoped
result <- fib.join.onInterrupt(control.stop *> fib.join.timeout(shutdownTimeout).ignore)
(stream, control) <- streamControl
fib <- withStream(stream).forkScoped
result <- fib.join.onInterrupt(control.stop *> fib.join.timeout(shutdownTimeout).ignore)
} yield result
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,13 @@
package zio.kafka.consumer
import zio.UIO
import zio.stream.ZStream

/**
* Allows graceful shutdown of a stream, where no more records are being fetched but the in-flight records can continue
* to be processed and their offsets committed.
*
* As long as this object is in scope, the Kafka consumer remains subscribed.
*
* @tparam S
* Type of the stream returned from [[stream]]
*/
private[consumer] trait SubscriptionStreamControl[S <: ZStream[_, _, _]] {

/**
* The stream of partitions / records for this subscription
*/
def stream: S
private[consumer] trait SubscriptionStreamControl {

/**
* Stop fetching records for the subscribed topic-partitions and end the associated streams, while allowing commits to
Expand All @@ -26,9 +17,8 @@ private[consumer] trait SubscriptionStreamControl[S <: ZStream[_, _, _]] {
}

private[consumer] object SubscriptionStreamControl {
def apply[S <: ZStream[_, _, _]](stream0: S, stop0: UIO[Unit]): SubscriptionStreamControl[S] =
new SubscriptionStreamControl[S] {
override def stream: S = stream0
def apply(stop0: UIO[Unit]): SubscriptionStreamControl =
new SubscriptionStreamControl {
override def stop: UIO[Unit] = stop0
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ private[consumer] final class RunloopAccess private (
*/
def subscribe(
subscription: Subscription
): ZIO[Scope, InvalidSubscriptionUnion, SubscriptionStreamControl[UStream[Take[Throwable, PartitionAssignment]]]] =
): ZIO[Scope, InvalidSubscriptionUnion, (UStream[Take[Throwable, PartitionAssignment]], SubscriptionStreamControl)] =
for {
partitionAssignmentQueue <- Queue.unbounded[Take[Throwable, PartitionAssignment]]
stream <- ZStream.fromHubScoped(partitionHub)
Expand All @@ -66,8 +66,7 @@ private[consumer] final class RunloopAccess private (
withRunloopZIO(requireRunning = false)(_.removeSubscription(subscription)) <*
diagnostics.emit(Finalization.SubscriptionFinalized)
}
} yield SubscriptionStreamControl(
ZStream.fromQueue(partitionAssignmentQueue),
} yield ZStream.fromQueue(partitionAssignmentQueue) -> SubscriptionStreamControl(
withRunloopZIO(requireRunning = true)(_.stopSubscribedTopicPartitions(subscription)) *> partitionAssignmentQueue
.offer(Take.end)
.ignore
Expand Down
Loading