Re-implement Kinesis source without fs2-kinesis
The common-streams Kinesis source has a problem: it does not quite
achieve at-least-once processing semantics near the end of a shard.
The problem lies in the third-party fs2-kinesis library, and no small
code change to that library would easily fix it.

Sorry I cannot provide a link here to back that up -- it is documented
internally at Snowplow.

This PR re-implements our Kinesis source from scratch, this time without
a dependency on fs2-kinesis. The biggest difference is that we now block
the `shardEnded` method of the KCL record processor until all records
from the shard have been written to the destination.
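
To illustrate the blocking idea, here is a minimal, self-contained sketch.
It is not the code in this commit: `ShardProcessorSketch`, `ackLastBatch`,
`ShardEndDemo` and the `checkpoint` callback are hypothetical names, and a
plain `CountDownLatch` stands in for whatever synchronization the real
source uses. It assumes the sink signals once the last records of the
shard have been written, and only then is the end-of-shard checkpoint made.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, Executors}
import scala.jdk.CollectionConverters._

// Hypothetical stand-in for a KCL record processor; this is NOT the real KCL interface.
final class ShardProcessorSketch(checkpoint: String => Unit) {
  // Released by the sink once the final records of the shard are written.
  private val lastBatchWritten = new CountDownLatch(1)

  // Called by the sink after it has written the final records (assumed hook).
  def ackLastBatch(): Unit = lastBatchWritten.countDown()

  // Plays the role of `shardEnded`: block until the ack arrives, then checkpoint.
  def shardEnded(): Unit = {
    lastBatchWritten.await()
    checkpoint("SHARD_END")
  }
}

object ShardEndDemo {
  // Simulates a slow sink on another thread and returns the observed event order.
  def simulate(): List[String] = {
    val events    = new ConcurrentLinkedQueue[String]()
    val processor = new ShardProcessorSketch(pos => { events.add(s"checkpoint:$pos"); () })
    val sink      = Executors.newSingleThreadExecutor()

    sink.execute(new Runnable {
      def run(): Unit = {
        Thread.sleep(100)        // pretend the write to the destination takes a while
        events.add("written")    // records reach the destination first
        processor.ackLastBatch() // only then unblock `shardEnded`
      }
    })

    processor.shardEnded()       // blocks until the sink has acknowledged
    sink.shutdown()
    events.asScala.toList
  }
}
```

In this sketch the checkpoint can never be recorded before the write,
which is the ordering that at-least-once processing needs at the end of a
shard.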
istreeter committed Sep 9, 2024
1 parent 65b7725 commit e983e5a
Showing 6 changed files with 174 additions and 135 deletions.
@@ -12,8 +12,6 @@ import cats.effect.{IO, Ref}
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._

-import eu.timepit.refined.types.numeric.PosInt
-
import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
@@ -94,7 +92,6 @@ object Utils {
UUID.randomUUID.toString,
KinesisSourceConfig.InitialPosition.TrimHorizon,
KinesisSourceConfig.Retrieval.Polling(1),
-PosInt.unsafeFrom(1),
Some(endpoint),
Some(endpoint),
Some(endpoint),
1 change: 0 additions & 1 deletion modules/kinesis/src/main/resources/reference.conf
@@ -9,7 +9,6 @@ snowplow.defaults: {
type: "Polling"
maxRecords: 1000
}
-bufferSize: 1
leaseDuration: "10 seconds"
}
}