[Internal] Use SLF4J/Logback to configure logging in tests (#858)
guizmaii authored May 24, 2023
1 parent f0cdd20 commit d941b29
Showing 15 changed files with 67 additions and 71 deletions.
4 changes: 3 additions & 1 deletion build.sbt
@@ -7,6 +7,7 @@ lazy val kafkaClients = "org.apache.kafka" % "kafka-clients"
lazy val scalaCollectionCompat = "org.scala-lang.modules" %% "scala-collection-compat" % "2.10.0"
lazy val jacksonDatabind = "com.fasterxml.jackson.core" % "jackson-databind" % "2.15.1"
lazy val logback = "ch.qos.logback" % "logback-classic" % "1.3.7"
lazy val zioLogging = "dev.zio" %% "zio-logging-slf4j2" % "2.1.13"

enablePlugins(ZioSbtEcosystemPlugin, ZioSbtCiPlugin)

@@ -138,7 +139,8 @@ lazy val zioKafkaTest =
libraryDependencies ++= Seq(
kafkaClients,
jacksonDatabind,
logback % Test,
logback % Test,
zioLogging % Test,
scalaCollectionCompat
) ++ `embedded-kafka`.value
)
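
The new zio-logging-slf4j2 test dependency provides the SLF4J backend that the rest of this commit wires into the test suites; the existing logback test dependency then controls formatting and log levels. A minimal sketch of that wiring (the same layer the ZIOSpecDefaultSlf4j class added below installs as a test aspect):

import zio.Runtime
import zio.logging.backend.SLF4J

// Drop ZIO's default console loggers and route all ZIO log output through
// SLF4J, so the Logback test configuration decides what gets printed.
val slf4jTestLogging = Runtime.removeDefaultLoggers >>> SLF4J.slf4j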
@@ -10,7 +10,7 @@ import zio.test._

import java.util.concurrent.TimeoutException

object AdminSaslSpec extends ZIOSpecDefault with KafkaRandom {
object AdminSaslSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {

override def kafkaPrefix: String = "adminsaslspec"

3 changes: 2 additions & 1 deletion zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala
@@ -5,6 +5,7 @@ import org.apache.kafka.clients.admin.{ ConfigEntry, RecordsToDelete }
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.{ Node => JNode }
import zio._
import zio.kafka.ZIOSpecDefaultSlf4j
import zio.kafka.admin.AdminClient.{
AlterConfigOp,
AlterConfigOpType,
@@ -34,7 +35,7 @@ import zio.test._
import java.util.UUID
import java.util.concurrent.TimeoutException

object AdminSpec extends ZIOSpecDefault with KafkaRandom {
object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {

override val kafkaPrefix: String = "adminspec"

@@ -7,7 +7,7 @@ import zio.test._
import zio.test.TestAspect.flaky
import zio.test.Assertion._

object KafkaFutureSpec extends ZIOSpecDefault {
object KafkaFutureSpec extends ZIOSpecDefaultSlf4j {
override def spec: Spec[Any, Nothing] =
suite("kafka future conversion")(
test("completes successfully") {
2 changes: 1 addition & 1 deletion zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala
@@ -14,7 +14,7 @@ import zio.test.Assertion._
import zio.test.TestAspect._
import zio.test._

object ProducerSpec extends ZIOSpecDefault with KafkaRandom {
object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
override val kafkaPrefix: String = "producerspec"

def withConsumerInt(
26 changes: 0 additions & 26 deletions zio-kafka-test/src/test/scala/zio/kafka/TestLogger.scala

This file was deleted.

17 changes: 17 additions & 0 deletions zio-kafka-test/src/test/scala/zio/kafka/ZIOSpecDefaultSlf4j.scala
@@ -0,0 +1,17 @@
package zio.kafka

import zio.Chunk
import zio.logging.backend.SLF4J
import zio.test.{ TestAspect, TestAspectAtLeastR, TestEnvironment, ZIOSpecDefault }

/**
* Use this class instead of `ZIOSpecDefault` if you want your tests to use SLF4J to log.
*
* Useful when you want to use logback to configure your logger, for example.
*/
abstract class ZIOSpecDefaultSlf4j extends ZIOSpecDefault {

override def aspects: Chunk[TestAspectAtLeastR[TestEnvironment]] =
super.aspects :+ TestAspect.fromLayer(zio.Runtime.removeDefaultLoggers >>> SLF4J.slf4j)

}
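
A minimal usage sketch (ExampleSpec is hypothetical; the real migrations in this commit follow below):

package zio.kafka

import zio.ZIO
import zio.test._

// Hypothetical spec: extending ZIOSpecDefaultSlf4j instead of ZIOSpecDefault
// is the only change a suite needs; ZIO.log* calls are then routed through
// SLF4J and picked up by the Logback test configuration.
object ExampleSpec extends ZIOSpecDefaultSlf4j {
  override def spec: Spec[Any, Nothing] =
    suite("example")(
      test("logs via SLF4J") {
        ZIO.logDebug("hello from the SLF4J backend").as(assertCompletes)
      }
    )
}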
56 changes: 27 additions & 29 deletions zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
@@ -10,7 +10,7 @@ import org.apache.kafka.clients.consumer.{
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import zio._
import zio.kafka.TestLogger.logger
import zio.kafka.ZIOSpecDefaultSlf4j
import zio.kafka.consumer.Consumer.{ AutoOffsetStrategy, OffsetRetrieval }
import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics }
import zio.kafka.producer.TransactionalProducer
@@ -24,7 +24,7 @@ import zio.test._

import scala.reflect.ClassTag

object ConsumerSpec extends ZIOSpecDefault with KafkaRandom {
object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
override val kafkaPrefix: String = "consumespec"

override def spec: Spec[TestEnvironment with Scope, Throwable] =
@@ -551,7 +551,7 @@ object ConsumerSpec extends ZIOSpecDefault with KafkaRandom {
Consumer
.partitionedStream(subscription, Serde.string, Serde.string)
.flatMapPar(Int.MaxValue) { case (tp, partStream) =>
val registerAssignment = ZStream.logInfo(s"Registering partition ${tp.partition()}") *>
val registerAssignment = ZStream.logDebug(s"Registering partition ${tp.partition()}") *>
ZStream.fromZIO {
allAssignments.update { current =>
current.get(instance) match {
@@ -560,7 +560,7 @@ object ConsumerSpec extends ZIOSpecDefault with KafkaRandom {
}
}
}
val deregisterAssignment = ZStream.logInfo(s"Deregistering partition ${tp.partition()}") *>
val deregisterAssignment = ZStream.logDebug(s"Deregistering partition ${tp.partition()}") *>
ZStream.finalizer {
allAssignments.update { current =>
current.get(instance) match {
@@ -640,7 +640,7 @@ object ConsumerSpec extends ZIOSpecDefault with KafkaRandom {

for {
// Produce messages on several partitions
_ <- ZIO.logInfo("Starting test")
_ <- ZIO.logDebug("Starting test")
topic <- randomTopic
group <- randomGroup
client1 <- randomClient
@@ -662,11 +662,11 @@ object ConsumerSpec extends ZIOSpecDefault with KafkaRandom {
.partitionedAssignmentStream(subscription, Serde.string, Serde.string)
.rechunk(1)
.mapZIO { partitions =>
ZIO.logInfo(s"Got partition assignment ${partitions.map(_._1).mkString(",")}") *>
ZIO.logDebug(s"Got partition assignment ${partitions.map(_._1).mkString(",")}") *>
ZStream
.fromIterable(partitions)
.flatMapPar(Int.MaxValue) { case (tp, partitionStream) =>
ZStream.finalizer(ZIO.logInfo(s"TP ${tp.toString} finalizer")) *>
ZStream.finalizer(ZIO.logDebug(s"TP ${tp.toString} finalizer")) *>
partitionStream.mapChunksZIO { records =>
OffsetBatch(records.map(_.offset)).commit *> messagesReceived(tp.partition)
.update(_ + records.size)
@@ -677,7 +677,7 @@ object ConsumerSpec extends ZIOSpecDefault with KafkaRandom {
}
.mapZIO(_ =>
drainCount.updateAndGet(_ + 1).flatMap {
case 2 => ZIO.logInfo("Stopping consumption") *> Consumer.stopConsumption
case 2 => ZIO.logDebug("Stopping consumption") *> Consumer.stopConsumption
// 1: when consumer on fib2 starts
// 2: when consumer on fib2 stops, end of test
case _ => ZIO.unit
@@ -723,7 +723,7 @@ object ConsumerSpec extends ZIOSpecDefault with KafkaRandom {

// Waiting until fib1's partition streams got restarted because of the rebalancing
_ <- drainCount.get.repeat(Schedule.recurUntil((n: Int) => n == 1) && Schedule.fixed(100.millis))
_ <- ZIO.logInfo("Consumer 1 finished rebalancing")
_ <- ZIO.logDebug("Consumer 1 finished rebalancing")

// All messages processed, the partition streams of fib are still running.
// Saving the values and resetting the counters
@@ -742,9 +742,9 @@ object ConsumerSpec extends ZIOSpecDefault with KafkaRandom {
produceMany(topic, partition = i % nrPartitions, kvs = List(s"key$i" -> s"msg$i"))
}
_ <- fib2.join
_ <- ZIO.logInfo("Consumer 2 done")
_ <- ZIO.logDebug("Consumer 2 done")
_ <- fib.join
_ <- ZIO.logInfo("Consumer 1 done")
_ <- ZIO.logDebug("Consumer 1 done")
// fib2 terminates after 20 messages, fib terminates after fib2 because of the rebalancing (drainCount==2)
messagesPerPartition0 <-
ZIO.foreach(messagesReceived0.values)(_.get) // counts from the first N messages (single consumer)
@@ -800,7 +800,7 @@ object ConsumerSpec extends ZIOSpecDefault with KafkaRandom {
.partitionedAssignmentStream(subscription, Serde.string, Serde.string)
.rechunk(1)
.mapZIOPar(16) { partitions =>
ZIO.logInfo(s"Consumer 1 got new partition assignment: ${partitions.map(_._1.toString)}") *>
ZIO.logDebug(s"Consumer 1 got new partition assignment: ${partitions.map(_._1.toString)}") *>
ZStream
.fromIterable(partitions.map(_._2))
.flatMapPar(Int.MaxValue)(s => s)
@@ -822,7 +822,7 @@ object ConsumerSpec extends ZIOSpecDefault with KafkaRandom {

_ <- messagesReceivedConsumer1.get
.repeat(Schedule.recurUntil((n: Int) => n >= 20) && Schedule.fixed(100.millis))
_ <- ZIO.logInfo("Starting consumer 2")
_ <- ZIO.logDebug("Starting consumer 2")

fib2 <-
ZIO
@@ -875,11 +875,11 @@ object ConsumerSpec extends ZIOSpecDefault with KafkaRandom {
onRevoked = (_, _) =>
streamCompleteOnRebalanceRef.get.flatMap {
case Some(p) =>
ZIO.logWarning("onRevoked, awaiting stream completion") *>
ZIO.logDebug("onRevoked, awaiting stream completion") *>
p.await.timeoutFail(new InterruptedException("Timed out waiting stream to complete"))(1.minute)
case None => ZIO.unit
},
onLost = (_, _) => ZIO.logWarning("Lost some partitions")
onLost = (_, _) => ZIO.logDebug("Lost some partitions")
)

def makeCopyingTransactionalConsumer(
Expand All @@ -895,7 +895,7 @@ object ConsumerSpec extends ZIOSpecDefault with KafkaRandom {
for {
consumedMessagesCounter <- Ref.make(0)
_ <- consumedMessagesCounter.get
.flatMap(consumed => ZIO.logInfo(s"Consumed so far: $consumed"))
.flatMap(consumed => ZIO.logDebug(s"Consumed so far: $consumed"))
.repeat(Schedule.fixed(1.second))
.fork
streamCompleteOnRebalanceRef <- Ref.make[Option[Promise[Nothing, Unit]]](None)
@@ -906,7 +906,7 @@ object ConsumerSpec extends ZIOSpecDefault with KafkaRandom {
for {
p <- Promise.make[Nothing, Unit]
_ <- streamCompleteOnRebalanceRef.set(Some(p))
_ <- ZIO.logInfo(s"${assignedPartitions.size} partitions assigned")
_ <- ZIO.logDebug(s"${assignedPartitions.size} partitions assigned")
_ <- consumerCreated.succeed(())
partitionStreams = assignedPartitions.map(_._2)
s <- ZStream
@@ -931,7 +931,7 @@ object ConsumerSpec extends ZIOSpecDefault with KafkaRandom {
_ <- streamCompleteOnRebalanceRef.set(None)
_ <- p.succeed(())
c <- consumedMessagesCounter.get
_ <- ZIO.logInfo(s"Consumed $c messages")
_ <- ZIO.logDebug(s"Consumed $c messages")
} yield ()
}
} yield s
@@ -951,7 +951,7 @@ object ConsumerSpec extends ZIOSpecDefault with KafkaRandom {
rebalanceListener = transactionalRebalanceListener(streamCompleteOnRebalanceRef)
)
)
.tapError(e => ZIO.logError(s"Error: $e")) <* ZIO.logInfo("Done")
.tapError(e => ZIO.logError(s"Error: $e")) <* ZIO.logDebug("Done")
} yield tConsumer
}

@@ -968,7 +968,7 @@ object ConsumerSpec extends ZIOSpecDefault with KafkaRandom {

copyingGroup <- randomGroup

_ <- ZIO.logInfo("Starting copier 1")
_ <- ZIO.logDebug("Starting copier 1")
copier1ClientId = copyingGroup + "-1"
copier1Created <- Promise.make[Nothing, Unit]
copier1 <- makeCopyingTransactionalConsumer(
@@ -982,7 +982,7 @@ object ConsumerSpec extends ZIOSpecDefault with KafkaRandom {
).fork
_ <- copier1Created.await

_ <- ZIO.logInfo("Starting copier 2")
_ <- ZIO.logDebug("Starting copier 2")
copier2ClientId = copyingGroup + "-2"
copier2Created <- Promise.make[Nothing, Unit]
copier2 <- makeCopyingTransactionalConsumer(
@@ -994,13 +994,13 @@ object ConsumerSpec extends ZIOSpecDefault with KafkaRandom {
tProducer,
copier2Created
).fork
_ <- ZIO.logInfo("Waiting for copier 2 to start")
_ <- ZIO.logDebug("Waiting for copier 2 to start")
_ <- copier2Created.await

_ <- ZIO.logInfo("Producing some more messages")
_ <- ZIO.logDebug("Producing some more messages")
_ <- produceMany(topicA, messagesAfterRebalance)

_ <- ZIO.logInfo("Collecting messages on topic B")
_ <- ZIO.logDebug("Collecting messages on topic B")
groupB <- randomGroup
validatorClientId <- randomClient
messagesOnTopicB <- ZIO.logAnnotate("consumer", "validator") {
@@ -1017,7 +1017,7 @@ object ConsumerSpec extends ZIOSpecDefault with KafkaRandom {
properties = Map(ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> "200")
)
)
.tapError(e => ZIO.logError(s"Error: $e")) <* ZIO.logInfo("Done")
.tapError(e => ZIO.logError(s"Error: $e")) <* ZIO.logDebug("Done")
}
_ <- copier1.interrupt
_ <- copier2.interrupt
@@ -1044,7 +1044,7 @@ object ConsumerSpec extends ZIOSpecDefault with KafkaRandom {
_ <- produceOne(topic, "key1", "message1")
_ <- consumer
.plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string)
.debug
.tap(r => ZIO.logDebug(r.toString))
.foreach(_ => recordsOut.offer(()))
.forkScoped
_ <- recordsOut.take // first record consumed
@@ -1056,9 +1056,7 @@ object ConsumerSpec extends ZIOSpecDefault with KafkaRandom {
)
.provideSome[Scope & Kafka](producer)
.provideSomeShared[Scope](
Kafka.embedded,
Runtime.removeDefaultLoggers,
Runtime.addLogger(logger())
Kafka.embedded
) @@ withLiveClock @@ TestAspect.sequential @@ timeout(5.minutes)

}
@@ -3,7 +3,7 @@ package zio.kafka.consumer
import io.github.embeddedkafka.EmbeddedKafka
import org.apache.kafka.clients.consumer.ConsumerConfig
import zio._
import zio.kafka.TestLogger.logger
import zio.kafka.ZIOSpecDefaultSlf4j
import zio.kafka.producer.Producer
import zio.kafka.serde.Serde
import zio.kafka.testkit.KafkaTestUtils._
@@ -13,7 +13,7 @@ import zio.test.Assertion._
import zio.test.TestAspect._
import zio.test._

object SubscriptionsSpec extends ZIOSpecDefault with KafkaRandom {
object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
override val kafkaPrefix: String = "subscriptionsspec"

override def spec: Spec[TestEnvironment with Scope, Throwable] = suite("Consumer subscriptions")(
@@ -207,8 +207,6 @@ object SubscriptionsSpec extends ZIOSpecDefault with KafkaRandom {
)
.provideSome[Scope & Kafka](producer)
.provideSomeShared[Scope](
Kafka.embedded,
Runtime.removeDefaultLoggers,
Runtime.addLogger(logger())
Kafka.embedded
) @@ withLiveClock @@ TestAspect.sequential @@ timeout(5.minutes)
}
@@ -1,10 +1,11 @@
package zio.kafka.consumer.internal

import zio.Scope
import zio.kafka.ZIOSpecDefaultSlf4j
import zio.kafka.consumer.internal.PollHistory.PollHistoryImpl
import zio.test._

object PollHistorySpec extends ZIOSpecDefault {
object PollHistorySpec extends ZIOSpecDefaultSlf4j {
override def spec: Spec[TestEnvironment with Scope, Any] = suite("PollHistorySpec")(
test("optimisticResume for listed pattern") {
assertTrue("011111".toPollHistory.optimisticResume)
@@ -3,8 +3,9 @@ package zio.kafka.security
import zio.test.Assertion._
import zio.test._
import zio.Scope
import zio.kafka.ZIOSpecDefaultSlf4j

object KafkaCredentialStoreSpec extends ZIOSpecDefault {
object KafkaCredentialStoreSpec extends ZIOSpecDefaultSlf4j {
override def spec: Spec[Environment with TestEnvironment with Scope, Any] = suite("KafkaCredentialStore")(
suite("fromPemStrigs")(
test("KafkaCredentialStore.properties works properly") {
@@ -5,8 +5,9 @@ import zio._
import zio.test.Assertion._
import zio.test._
import zio.ZAny
import zio.kafka.ZIOSpecDefaultSlf4j

object DeserializerSpec extends ZIOSpecDefault {
object DeserializerSpec extends ZIOSpecDefaultSlf4j {
override def spec: Spec[ZAny with Any, Throwable] = suite("Deserializer")(
suite("asOption")(
test("deserialize to None when value is null") {