Merge branch 'master' into serdes-renames
svroonland authored Nov 12, 2024
2 parents 3a856dc + 18aa941 commit 769a1a3
Showing 11 changed files with 369 additions and 76 deletions.
24 changes: 20 additions & 4 deletions .github/workflows/benchs.yml
@@ -27,15 +27,16 @@ jobs:
if: ${{ !github.event.pull_request.head.repo.fork }} # comes from https://github.com/orgs/community/discussions/25217#discussioncomment-3246904
steps:
- name: Checkout current branch
uses: actions/checkout@v4.1.1
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Setup Java
uses: actions/setup-java@v4.2.1
uses: actions/setup-java@v4
with:
distribution: temurin
java-version: 17
java-version: 21
check-latest: true
- uses: VirtusLab/scala-cli-setup@main
- name: Use CI sbt jvmopts
shell: bash
run: |
@@ -74,7 +75,7 @@ jobs:
cat .jvmopts
- name: Store benchmark result
uses: benchmark-action/github-action-benchmark@v1.20.4
uses: benchmark-action/github-action-benchmark@v1
with:
name: JMH Benchmark
tool: 'jmh'
@@ -88,3 +89,18 @@ jobs:
fail-on-alert: true
# Mention these maintainers in the commit comment
alert-comment-cc-users: '@svroonland,@guizmaii,@erikvanoosten'

- name: Prune benchmark history
if: ${{ github.event_name != 'pull_request' }}
run: |
echo "::group::Checkout gh-pages branch"
git clean -fdx
git checkout gh-pages
git branch --set-upstream-to=origin/gh-pages gh-pages
git pull --rebase
git config --global user.name "zio-kafka CI"
git config --global user.email "ziokafkaci@users.noreply.github.com"
echo "::endgroup::"
echo "Prune benchmark history"
scala-cli scripts/prune-benchmark-history.sc
git push
8 changes: 4 additions & 4 deletions .github/workflows/profile.yml
@@ -22,16 +22,16 @@ jobs:
runs-on: ubuntu-latest
if: ${{ !github.event.pull_request.head.repo.fork }} # comes from https://github.com/orgs/community/discussions/25217#discussioncomment-3246904
steps:
- uses: actions/checkout@v4.1.1
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Setup Java
uses: actions/setup-java@v4.2.1
uses: actions/setup-java@v4
with:
distribution: temurin
java-version: 17
java-version: 21
check-latest: true
- uses: coursier/cache-action@v6.4.5
- uses: coursier/cache-action@v6
- uses: VirtusLab/scala-cli-setup@main
- name: Use CI sbt jvmopts
shell: bash
2 changes: 1 addition & 1 deletion .github/workflows/scala-steward.yml
@@ -13,7 +13,7 @@ jobs:
name: Scala Steward
steps:
- name: Scala Steward
uses: scala-steward-org/scala-steward-action@v2.70.0
uses: scala-steward-org/scala-steward-action@v2.71.0
with:
github-app-id: ${{ secrets.SCALA_STEWARD_GITHUB_APP_ID }}
github-app-installation-id: ${{ secrets.SCALA_STEWARD_GITHUB_APP_INSTALLATION_ID }}
16 changes: 0 additions & 16 deletions prune-benchmark-history.md

This file was deleted.

@@ -769,7 +769,8 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
clientId = clientId,
groupId = Some(groupId),
`max.poll.records` = 1,
rebalanceSafeCommits = rebalanceSafeCommits
rebalanceSafeCommits = rebalanceSafeCommits,
maxRebalanceDuration = 60.seconds
)
consumer <- Consumer.make(settings)
} yield consumer
@@ -0,0 +1,184 @@
package zio.kafka.consumer.internal

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import zio._
import zio.kafka.consumer.CommittableRecord
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord
import zio.test._

import java.util.concurrent.TimeoutException

object PartitionStreamControlSpec extends ZIOSpecDefault {
override def spec: Spec[Any, Throwable] = suite("PartitionStreamControl")(
suite("Queue info")(
test("offerRecords updates queue size correctly") {
for {
control <- createTestControl
records = createTestRecords(3)
_ <- control.offerRecords(records)
size <- control.queueSize
} yield assertTrue(size == 3)
},
test("empty offerRecords updates outstandingPolls") {
for {
control <- createTestControl
_ <- control.offerRecords(Chunk.empty)
_ <- control.offerRecords(Chunk.empty)
polls <- control.outstandingPolls
} yield assertTrue(polls == 2)
},
test("multiple offers accumulate correctly") {
for {
control <- createTestControl
_ <- control.offerRecords(createTestRecords(2))
_ <- control.offerRecords(createTestRecords(3))
size <- control.queueSize
} yield assertTrue(size == 5)
}
),
// Stream Control Tests
suite("Stream control")(
test("offered records end up in the stream") {
for {
control <- createTestControl
records = createTestRecords(3)
_ <- control.offerRecords(records)
stream = control.stream
pulledRecords <- stream.take(3).runCollect
} yield assertTrue(records == pulledRecords)
},
test("end will end the stream") {
for {
control <- createTestControl
records = createTestRecords(3)
_ <- control.offerRecords(records)
_ <- control.end
pulledRecords <- control.stream.runCollect
} yield assertTrue(records == pulledRecords)
},
test("halt completes the stream with a TimeoutException") {
for {
control <- createTestControl
_ <- control.halt
result <- control.stream.runCollect.exit
} yield assertTrue(result.isFailure, result.causeOrNull.squash.isInstanceOf[TimeoutException])
},
test("lost clears queue and ends stream") {
for {
control <- createTestControl
_ <- control.offerRecords(createTestRecords(5))
_ <- control.lost
_ <- control.stream.runCollect
size <- control.queueSize
completed <- control.isCompleted
} yield assertTrue(size == 0, completed)
},
test("finalizing the stream will set isCompleted") {
for {
control <- createTestControl
initialIsCompleted <- control.isCompleted
records = createTestRecords(3)
_ <- control.offerRecords(records)
_ <- control.end
_ <- control.stream.runCollect
finalIsCompleted <- control.isCompleted
} yield assertTrue(!initialIsCompleted, finalIsCompleted)
},
test("pulling from the stream will request data") {
ZIO.scoped {
for {
requested <- Promise.make[Nothing, Unit]
control <- createTestControlWithRequestData(requested.succeed(()))
_ <- control.stream.runCollect.forkScoped
_ <- requested.await
} yield assertCompletes
}
},
test("pulling from the stream when there are records in the queue will request additional data") {
ZIO.scoped {
for {
requested <- Promise.make[Nothing, Unit]
control <- createTestControlWithRequestData(requested.succeed(()))
records = createTestRecords(3)
_ <- control.offerRecords(records)
_ <- control.stream.runCollect.forkScoped
_ <- requested.await
} yield assertCompletes
}
}
),
suite("Poll deadline")(
test("maxPollIntervalExceeded returns false initially") {
for {
control <- createTestControl
now <- Clock.nanoTime
exceeded <- control.maxPollIntervalExceeded(now)
} yield assertTrue(!exceeded)
},
test("maxPollIntervalExceeded returns true after timeout") {
for {
control <- createTestControl
_ <- control.offerRecords(createTestRecords(1))
now <- Clock.nanoTime
futureTime = now + Duration.fromSeconds(31).toNanos
exceeded <- control.maxPollIntervalExceeded(futureTime)
} yield assertTrue(exceeded)
}
),
suite("Offset Tracking")(
test("lastPulledOffset updates correctly after each pull") {
for {
control <- createTestControl
records = createTestRecords(6)
_ <- control.offerRecords(records.take(3))
offerNextBatch = control.offerRecords(records.slice(3, 6))

offsetsAfterChunks <- Ref.make(Chunk.empty[Option[Long]])
_ <- {
def updateLastPulledOffsets =
control.lastPulledOffset.flatMap(offset => offsetsAfterChunks.update(_ :+ offset.map(_.offset)))

updateLastPulledOffsets *> control.stream
.mapChunksZIO(updateLastPulledOffsets *> offerNextBatch.as(_))
.take(6)
.runCollect
}
lastPulledOffsets <- offsetsAfterChunks.get
} yield assertTrue(lastPulledOffsets == Chunk(None, Some(3L), Some(6L)))
}
)
) @@ TestAspect.withLiveClock @@ TestAspect.timeout(1.minute)

private def createTestControl: ZIO[Any, Nothing, PartitionStreamControl] =
createTestControlWithRequestData(ZIO.unit)

private def createTestControlWithRequestData(requestData: UIO[Any]): ZIO[Any, Nothing, PartitionStreamControl] = {
val tp = new TopicPartition("test-topic", 0)
val diagnostics = Diagnostics.NoOp
PartitionStreamControl.newPartitionStream(
tp,
requestData.unit,
diagnostics,
Duration.fromSeconds(30)
)
}

private def createTestRecords(count: Int): Chunk[ByteArrayCommittableRecord] =
Chunk.fromIterable(
(1 to count).map(i =>
CommittableRecord(
record = new ConsumerRecord[Array[Byte], Array[Byte]](
"test-topic",
0,
i.toLong,
Array[Byte](),
Array[Byte]()
),
commitHandle = _ => ZIO.unit,
consumerGroupMetadata = None
)
)
)
}
@@ -195,8 +195,9 @@ object RunloopSpec extends ZIOSpecDefaultSlf4j {
): ZIO[Scope, Throwable, TestResult] =
ZIO.scoped {
for {
consumerAccess <- ConsumerAccess.make(mockConsumer)
consumerScope <- ZIO.scope
access <- Semaphore.make(1)
consumerAccess = new ConsumerAccess(mockConsumer, access)
consumerScope <- ZIO.scope
partitionsHub <- ZIO
.acquireRelease(Hub.unbounded[Take[Throwable, PartitionAssignment]])(_.shutdown)
.provide(ZLayer.succeed(consumerScope))
45 changes: 42 additions & 3 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
@@ -199,15 +199,54 @@ object Consumer {
*
* You are responsible for creating and closing the KafkaConsumer. Make sure auto.commit is disabled.
*/
@deprecated("Use fromJavaConsumerWithPermit", since = "2.8.4")
def fromJavaConsumer(
javaConsumer: JConsumer[Array[Byte], Array[Byte]],
settings: ConsumerSettings,
diagnostics: Diagnostics = Diagnostics.NoOp
): ZIO[Scope, Throwable, Consumer] =
for {
_ <- ZIO.addFinalizer(diagnostics.emit(Finalization.ConsumerFinalized))
consumerAccess <- ConsumerAccess.make(javaConsumer)
runloopAccess <- RunloopAccess.make(settings, consumerAccess, diagnostics)
_ <- ZIO.addFinalizer(diagnostics.emit(Finalization.ConsumerFinalized))
access <- Semaphore.make(1)
consumerAccess = new ConsumerAccess(javaConsumer, access)
runloopAccess <- RunloopAccess.make(settings, consumerAccess, diagnostics)
} yield new ConsumerLive(consumerAccess, runloopAccess)

/**
* Create a zio-kafka [[Consumer]] from an `org.apache.kafka` `KafkaConsumer`.
*
* You are responsible for all of the following:
* - creating and closing the `KafkaConsumer`,
* - making sure `auto.commit` is disabled,
* - creating `access` as a fair semaphore with a single permit,
* - acquiring a permit from `access` before using the consumer, and releasing it afterwards,
* - not using the following consumer methods: `subscribe`, `unsubscribe`, `assign`, `poll`, `commit*`, `seek`,
* `pause`, `resume`, and `enforceRebalance`.
*
* Any deviation from these rules is likely to cause hard-to-track errors.
*
* The `access` semaphore is shared between your code and the zio-kafka consumer. Hold a permit for as short a time
* as possible; while you hold one, the zio-kafka consumer is blocked.
*
* @param javaConsumer
* The Java consumer to wrap
* @param settings
* The consumer settings
* @param access
* A fair semaphore with a single permit
* @param diagnostics
* Optional diagnostics listener
*/
def fromJavaConsumerWithPermit(
javaConsumer: JConsumer[Array[Byte], Array[Byte]],
settings: ConsumerSettings,
access: Semaphore,
diagnostics: Diagnostics = Diagnostics.NoOp
): ZIO[Scope, Throwable, Consumer] =
for {
_ <- ZIO.addFinalizer(diagnostics.emit(Finalization.ConsumerFinalized))
consumerAccess = new ConsumerAccess(javaConsumer, access)
runloopAccess <- RunloopAccess.make(settings, consumerAccess, diagnostics)
} yield new ConsumerLive(consumerAccess, runloopAccess)

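The usage rules in the scaladoc of fromJavaConsumerWithPermit are easier to see in context. The following is a minimal, hypothetical sketch of wiring an externally managed KafkaConsumer into zio-kafka with this method; the broker address, group id, topic name, and object name are placeholders, and the direct metrics() call merely illustrates touching the Java consumer only while holding the shared permit.

import org.apache.kafka.clients.consumer.{ ConsumerConfig, KafkaConsumer }
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import zio._
import zio.kafka.consumer.{ Consumer, ConsumerSettings, Subscription }
import zio.kafka.serde.Serde

import java.util.Properties

object SharedJavaConsumerExample extends ZIOAppDefault {

  // Placeholder connection details for this sketch.
  private val bootstrapServers = "localhost:9092"
  private val groupId          = "example-group"

  override def run =
    for {
      // You create (and later close) the Java consumer yourself; auto.commit stays disabled.
      javaConsumer <- ZIO.acquireRelease(ZIO.attempt {
                        val props = new Properties()
                        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
                        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
                        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
                        new KafkaConsumer[Array[Byte], Array[Byte]](
                          props,
                          new ByteArrayDeserializer,
                          new ByteArrayDeserializer
                        )
                      })(c => ZIO.attemptBlocking(c.close()).orDie)
      // The fair, single-permit semaphore shared between your code and zio-kafka.
      access   <- Semaphore.make(1)
      settings  = ConsumerSettings(List(bootstrapServers)).withGroupId(groupId)
      consumer <- Consumer.fromJavaConsumerWithPermit(javaConsumer, settings, access)
      // Consume through the zio-kafka Consumer as usual.
      fiber <- consumer
                 .plainStream(Subscription.topics("example-topic"), Serde.byteArray, Serde.byteArray)
                 .take(10)
                 .runDrain
                 .fork
      // Touch the Java consumer directly only while holding the permit, and only with
      // methods the scaladoc allows; reading metrics is a harmless example.
      _ <- access.withPermit(ZIO.attemptBlocking(javaConsumer.metrics().size())).debug("metric count")
      _ <- fiber.join
    } yield ()
}

Under these assumptions, the shared permit is what keeps your direct calls and zio-kafka's poll loop from hitting the single-threaded KafkaConsumer at the same time; the trade-off, as the scaladoc notes, is that the poll loop stalls for as long as you hold it.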
@@ -50,6 +50,7 @@ private[consumer] object ConsumerAccess {

def make(settings: ConsumerSettings): ZIO[Scope, Throwable, ConsumerAccess] =
for {
access <- Semaphore.make(1)
consumer <- ZIO.acquireRelease {
ZIO.attemptBlocking {
new KafkaConsumer[Array[Byte], Array[Byte]](
@@ -59,13 +60,7 @@
)
}
} { consumer =>
ZIO.blocking(ZIO.attempt(consumer.close(settings.closeTimeout))).orDie
ZIO.blocking(access.withPermit(ZIO.attempt(consumer.close(settings.closeTimeout)))).orDie
}
result <- make(consumer)
} yield result

def make(consumer: ByteArrayKafkaConsumer): ZIO[Scope, Throwable, ConsumerAccess] =
for {
access <- Semaphore.make(1)
} yield new ConsumerAccess(consumer, access)
}