Skip to content

Commit

Permalink
cFix doc
Browse files Browse the repository at this point in the history
  • Loading branch information
svroonland committed Nov 13, 2024
1 parent 0fd4114 commit 3915b08
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ private[internal] trait Committer {
*
* If the queue is empty, nothing happens, unless executeOnEmpty is true.
*
* @param consumer
* Consumer with exclusive access
* @param commitAsync
* Function 'commitAsync' on the KafkaConsumer. This is isolated from the whole KafkaConsumer for testing purposes.
* The caller should ensure exclusive access to the KafkaConsumer.
* @param executeOnEmpty
* Execute commitAsync() even if there are no commits
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package zio.kafka.consumer.internal
import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.clients.consumer.{ OffsetAndMetadata, OffsetCommitCallback }
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.RebalanceInProgressException
import zio.kafka.consumer.Consumer.CommitTimeout
import zio.kafka.consumer.diagnostics.{DiagnosticEvent, Diagnostics}
import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics }
import zio.kafka.consumer.internal.Committer.CommitOffsets
import zio.kafka.consumer.internal.LiveCommitter.Commit
import zio.{Chunk, Duration, Exit, Promise, Queue, Ref, Runtime, Scope, Task, UIO, Unsafe, ZIO, durationLong}
import zio.{ durationLong, Chunk, Duration, Exit, Promise, Queue, Ref, Runtime, Scope, Task, UIO, Unsafe, ZIO }

import java.util
import java.util.{Map => JavaMap}
import java.util.{ Map => JavaMap }
import scala.collection.mutable
import scala.jdk.CollectionConverters._

Expand Down Expand Up @@ -39,16 +39,6 @@ private[consumer] final class LiveCommitter(
_ <- consumerMetrics.observeCommit(latency)
} yield ()

/**
* Takes commits from the queue, commits them and adds them to pending commits
*
* If the queue is empty, nothing happens, unless executeOnEmpty is true.
*
* @param consumer
* Consumer with exclusive access
* @param executeOnEmpty
* Execute commitAsync() even if there are no commits
*/
override def processQueuedCommits(
commitAsync: (JavaMap[TopicPartition, OffsetAndMetadata], OffsetCommitCallback) => Task[Unit],
executeOnEmpty: Boolean = false
Expand Down

0 comments on commit 3915b08

Please sign in to comment.