Skip to content

Commit

Permalink
Remove buffered records (#744)
Browse files Browse the repository at this point in the history
  • Loading branch information
erikvanoosten authored Mar 25, 2023
1 parent cc1ac86 commit b350cb0
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 247 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ private[consumer] class ConsumerAccess(
access: Semaphore
) {
def withConsumer[A](f: ByteArrayKafkaConsumer => A): Task[A] =
withConsumerM[Any, A](c => ZIO.attempt(f(c)))
withConsumerZIO[Any, A](c => ZIO.attempt(f(c)))

def withConsumerM[R, A](f: ByteArrayKafkaConsumer => RIO[R, A]): RIO[R, A] =
def withConsumerZIO[R, A](f: ByteArrayKafkaConsumer => RIO[R, A]): RIO[R, A] =
access.withPermit(withConsumerNoPermit(f))

private[consumer] def withConsumerNoPermit[R, A](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ private[internal] class PartitionStreamControl private (
LogAnnotation("partition", tp.partition().toString)
)

/** Offer new data for the stream to process. */
def offerRecords(data: Chunk[ByteArrayCommittableRecord]): ZIO[Any, Nothing, Unit] =
dataQueue.offer(Take.chunk(data)).unit

/** To be invoked when the partition was lost. */
def lost(): UIO[Boolean] =
interruptPromise.fail(new RuntimeException(s"Partition ${tp.toString} was lost"))
Expand All @@ -31,18 +35,6 @@ private[internal] class PartitionStreamControl private (
dataQueue.offer(Take.end).unit
}

/** To be invoked when the partition was revoked or otherwise needs to be ended, after the last data is processed. */
def endWith(data: Chunk[ByteArrayCommittableRecord]): ZIO[Any, Nothing, Unit] =
logAnnotate {
ZIO.logTrace(s"Partition ${tp.toString} ending after ${data.size} records") *> {
if (data.isEmpty) {
dataQueue.offer(Take.end).unit
} else {
dataQueue.offerAll(List(Take.chunk(data), Take.end)).unit
}
}
}

/** Returns true when the stream is done. */
def isCompleted: ZIO[Any, Nothing, Boolean] =
completedPromise.isDone
Expand Down
Loading

0 comments on commit b350cb0

Please sign in to comment.