diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopSpec.scala index e5a4ac71d..e30c4051e 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopSpec.scala @@ -195,8 +195,9 @@ object RunloopSpec extends ZIOSpecDefaultSlf4j { ): ZIO[Scope, Throwable, TestResult] = ZIO.scoped { for { - consumerAccess <- ConsumerAccess.make(mockConsumer) - consumerScope <- ZIO.scope + access <- Semaphore.make(1) + consumerAccess = new ConsumerAccess(mockConsumer, access) + consumerScope <- ZIO.scope partitionsHub <- ZIO .acquireRelease(Hub.unbounded[Take[Throwable, PartitionAssignment]])(_.shutdown) .provide(ZLayer.succeed(consumerScope)) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala index 776e403e8..fb43e0cbb 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala @@ -199,15 +199,54 @@ object Consumer { * * You are responsible for creating and closing the KafkaConsumer. Make sure auto.commit is disabled. */ + @deprecated("Use fromJavaConsumerWithPermit", since = "2.8.4") def fromJavaConsumer( javaConsumer: JConsumer[Array[Byte], Array[Byte]], settings: ConsumerSettings, diagnostics: Diagnostics = Diagnostics.NoOp ): ZIO[Scope, Throwable, Consumer] = for { - _ <- ZIO.addFinalizer(diagnostics.emit(Finalization.ConsumerFinalized)) - consumerAccess <- ConsumerAccess.make(javaConsumer) - runloopAccess <- RunloopAccess.make(settings, consumerAccess, diagnostics) + _ <- ZIO.addFinalizer(diagnostics.emit(Finalization.ConsumerFinalized)) + access <- Semaphore.make(1) + consumerAccess = new ConsumerAccess(javaConsumer, access) + runloopAccess <- RunloopAccess.make(settings, consumerAccess, diagnostics) + } yield new ConsumerLive(consumerAccess, runloopAccess) + + /** + * Create a zio-kafka [[Consumer]] from an `org.apache.kafka KafkaConsumer`. + * + * You are responsible for all of the following: + * - creating and closing the `KafkaConsumer`, + * - making sure `auto.commit` is disabled, + * - creating `access` as a fair semaphore with a single permit, + * - acquire a permit from `access` before using the consumer, and release if afterwards, + * - not using the following consumer methods: `subscribe`, `unsubscribe`, `assign`, `poll`, `commit*`, `seek`, + * `pause`, `resume`, and `enforceRebalance`. + * + * Any deviation of these rules is likely to cause hard to track errors. + * + * Semaphore `access` is shared between you and the zio-kafka consumer. Use it as short as possible; while you hold a + * permit the zio-kafka consumer is blocked. + * + * @param javaConsumer + * Consumer + * @param settings + * Settings + * @param access + * A Semaphore with 1 permit. + * @param diagnostics + * Optional diagnostics listener + */ + def fromJavaConsumerWithPermit( + javaConsumer: JConsumer[Array[Byte], Array[Byte]], + settings: ConsumerSettings, + access: Semaphore, + diagnostics: Diagnostics = Diagnostics.NoOp + ): ZIO[Scope, Throwable, Consumer] = + for { + _ <- ZIO.addFinalizer(diagnostics.emit(Finalization.ConsumerFinalized)) + consumerAccess = new ConsumerAccess(javaConsumer, access) + runloopAccess <- RunloopAccess.make(settings, consumerAccess, diagnostics) } yield new ConsumerLive(consumerAccess, runloopAccess) /** diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala index 54690dc47..2c4aeb80c 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala @@ -63,9 +63,4 @@ private[consumer] object ConsumerAccess { ZIO.blocking(access.withPermit(ZIO.attempt(consumer.close(settings.closeTimeout)))).orDie } } yield new ConsumerAccess(consumer, access) - - def make(consumer: ByteArrayKafkaConsumer): ZIO[Scope, Throwable, ConsumerAccess] = - for { - access <- Semaphore.make(1) - } yield new ConsumerAccess(consumer, access) }