Skip to content

Commit

Permalink
Merge branch 'master' into update/sbt-native-packager-1.10.4
Browse files Browse the repository at this point in the history
  • Loading branch information
svroonland authored Aug 20, 2024
2 parents 75d476d + b842a18 commit 0a57cdc
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 12 deletions.
10 changes: 5 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import MimaSettings.mimaSettings
lazy val binCompatVersionToCompare = None // Some("2.8.0")

lazy val kafkaVersion = "3.7.1"
lazy val embeddedKafkaVersion = "3.7.0" // Should be the same as kafkaVersion, except for the patch part
lazy val embeddedKafkaVersion = "3.7.1.1" // Should be the same as kafkaVersion, except for the patch part

lazy val kafkaClients = "org.apache.kafka" % "kafka-clients" % kafkaVersion
lazy val logback = "ch.qos.logback" % "logback-classic" % "1.5.6"
lazy val logback = "ch.qos.logback" % "logback-classic" % "1.5.7"

enablePlugins(ZioSbtEcosystemPlugin, ZioSbtCiPlugin)

Expand All @@ -25,7 +25,7 @@ lazy val _scala3 = "3.3.3"
inThisBuild(
List(
name := "ZIO Kafka",
zioVersion := "2.1.6",
zioVersion := "2.1.7",
scalaVersion := _scala213,
// zio-sbt defines these 'scala213' and 'scala3' settings, but we need to define them here to override the defaults and better control them
scala213 := _scala213,
Expand Down Expand Up @@ -179,13 +179,13 @@ lazy val zioKafkaExample =
.settings(run / fork := false)
.settings(
libraryDependencies ++= Seq(
"dev.zio" %% "zio" % "2.1.6",
"dev.zio" %% "zio" % "2.1.7",
"dev.zio" %% "zio-kafka" % "2.8.0",
"dev.zio" %% "zio-logging-slf4j2" % "2.3.0",
"io.github.embeddedkafka" %% "embedded-kafka" % embeddedKafkaVersion,
logback,
"dev.zio" %% "zio-kafka-testkit" % "2.8.0" % Test,
"dev.zio" %% "zio-test" % "2.1.6" % Test
"dev.zio" %% "zio-test" % "2.1.7" % Test
),
// Scala 3 compiling fails with:
// [error] Modules were resolved with conflicting cross-version suffixes in ProjectRef(uri("file:/home/runner/work/zio-kafka/zio-kafka/"), "zioKafkaExample"):
Expand Down
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ addSbtPlugin("dev.zio" % "zio-sbt-website" % zioSbtVersion)
addSbtPlugin("dev.zio" % "zio-sbt-ci" % zioSbtVersion)

addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.12.1")
addSbtPlugin("org.typelevel" % "sbt-tpolecat" % "0.5.1")
addSbtPlugin("org.typelevel" % "sbt-tpolecat" % "0.5.2")
addSbtPlugin("com.github.sbt" % "sbt-native-packager" % "1.10.4")
addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "1.1.3")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{ AuthenticationException, AuthorizationException }
import zio._
import zio.kafka.consumer.{ ConsumerSettings, Subscription }
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics }
import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment
import zio.metrics.{ MetricState, Metrics }
import zio.stream.{ Take, ZStream }
Expand All @@ -20,14 +20,15 @@ object RunloopSpec extends ZIOSpecDefault {
private type PartitionsHub = Hub[Take[Throwable, PartitionAssignment]]

private val tp10 = new TopicPartition("t1", 0)
private val tp11 = new TopicPartition("t1", 1)
private val key123 = "123".getBytes

private val consumerSettings = ConsumerSettings(List("bootstrap"))

override def spec: Spec[TestEnvironment with Scope, Any] =
suite("RunloopSpec")(
test("runloop creates a new partition stream and polls for new records") {
withRunloop { (mockConsumer, partitionsHub, runloop) =>
withRunloop() { (mockConsumer, partitionsHub, runloop) =>
mockConsumer.schedulePollTask { () =>
mockConsumer.updateEndOffsets(Map(tp10 -> Long.box(0L)).asJava)
mockConsumer.rebalance(Seq(tp10).asJava)
Expand All @@ -52,8 +53,48 @@ object RunloopSpec extends ZIOSpecDefault {
)
}
},
test(
"runloop does not starts a new stream for partition which being revoked right after assignment within the same RebalanceEvent"
) {
Diagnostics.SlidingQueue.make(100).flatMap { diagnostics =>
withRunloop(diagnostics) { (mockConsumer, partitionsHub, runloop) =>
mockConsumer.schedulePollTask { () =>
mockConsumer.updateEndOffsets(Map(tp10 -> Long.box(0L), tp11 -> Long.box(0L)).asJava)
mockConsumer.rebalance(Seq(tp10, tp11).asJava)
mockConsumer.rebalance(Seq(tp10).asJava)
mockConsumer.addRecord(makeConsumerRecord(tp10, key123))
}
for {
streamStream <- ZStream.fromHubScoped(partitionsHub)
_ <- runloop.addSubscription(Subscription.Topics(Set(tp10, tp11).map(_.topic())))
_ <- streamStream
.map(_.exit)
.flattenExitOption
.flattenChunks
.take(1)
.mapZIO { case (_, stream) =>
stream.runHead
}
.runDrain
diagnosticEvents <- diagnostics.queue.takeAll
rebalanceEvents =
diagnosticEvents.collect { case rebalanceEvent: DiagnosticEvent.Rebalance =>
rebalanceEvent
}
} yield assertTrue(
rebalanceEvents.length == 1,
rebalanceEvents.head == DiagnosticEvent.Rebalance(
revoked = Set(tp11),
assigned = Set(tp10),
lost = Set.empty,
ended = Set.empty
)
)
}
}
},
test("runloop retries poll upon AuthorizationException and AuthenticationException") {
withRunloop { (mockConsumer, partitionsHub, runloop) =>
withRunloop() { (mockConsumer, partitionsHub, runloop) =>
mockConsumer.schedulePollTask { () =>
mockConsumer.updateEndOffsets(Map(tp10 -> Long.box(0L)).asJava)
mockConsumer.rebalance(Seq(tp10).asJava)
Expand Down Expand Up @@ -90,7 +131,7 @@ object RunloopSpec extends ZIOSpecDefault {
}
) @@ withLiveClock

private def withRunloop(
private def withRunloop(diagnostics: Diagnostics = Diagnostics.NoOp)(
f: (BinaryMockConsumer, PartitionsHub, Runloop) => ZIO[Scope, Throwable, TestResult]
): ZIO[Scope, Throwable, TestResult] =
ZIO.scoped {
Expand All @@ -105,7 +146,7 @@ object RunloopSpec extends ZIOSpecDefault {
consumerSettings,
100.millis,
100.millis,
Diagnostics.NoOp,
diagnostics,
consumerAccess,
partitionsHub
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -825,12 +825,17 @@ object Runloop {
): RebalanceEvent =
copy(
wasInvoked = true,
assignedTps = assignedTps -- revoked,
revokedTps = revokedTps ++ revoked,
endedStreams = this.endedStreams ++ endedStreams
)

def onLost(lost: Set[TopicPartition]): RebalanceEvent =
copy(wasInvoked = true, lostTps = lostTps ++ lost)
copy(
wasInvoked = true,
assignedTps = assignedTps -- lost,
lostTps = lostTps ++ lost
)
}

private object RebalanceEvent {
Expand Down

0 comments on commit 0a57cdc

Please sign in to comment.