From e983e5aaea8c60470d0c7ad512dfe79a48fb8f19 Mon Sep 17 00:00:00 2001
From: Ian Streeter
Date: Sat, 3 Aug 2024 20:52:57 +0100
Subject: [PATCH] Re-implement Kinesis source without fs2-kinesis

The common-streams Kinesis source suffers from a problem where we don't
quite achieve at-least-once processing semantics near the end of a shard.
The problem lies in the third-party fs2-kinesis library, and it is not
easy to fix with any small code change to that library. Sorry, I cannot
provide a link here to back that up -- the analysis is documented
internally at Snowplow.

This PR re-implements our Kinesis source from scratch, this time without
a dependency on fs2-kinesis. The biggest difference is that we now block
the `shardEnded` method of the KCL record processor until all records
from the shard have been written to the destination.
---
 .../snowplow/sources/kinesis/Utils.scala      |   3 -
 .../kinesis/src/main/resources/reference.conf |   1 -
 .../sources/kinesis/KinesisSource.scala       | 274 +++++++++++-------
 .../sources/kinesis/KinesisSourceConfig.scala |   5 -
 .../kinesis/KinesisSourceConfigSpec.scala     |   6 -
 project/Dependencies.scala                    |  20 +-
 6 files changed, 174 insertions(+), 135 deletions(-)

diff --git a/modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala b/modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala
index f2ed3d0a..fdcd7ae9 100644
--- a/modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala
+++ b/modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala
@@ -12,8 +12,6 @@ import cats.effect.{IO, Ref}
 import scala.concurrent.duration.FiniteDuration
 import scala.jdk.CollectionConverters._
 
-import eu.timepit.refined.types.numeric.PosInt
-
 import software.amazon.awssdk.core.SdkBytes
 import software.amazon.awssdk.regions.Region
 import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
@@ -94,7 +92,6 @@ object Utils {
       UUID.randomUUID.toString,
       KinesisSourceConfig.InitialPosition.TrimHorizon,
       KinesisSourceConfig.Retrieval.Polling(1),
-      PosInt.unsafeFrom(1),
       Some(endpoint),
       Some(endpoint),
       Some(endpoint),
diff --git a/modules/kinesis/src/main/resources/reference.conf b/modules/kinesis/src/main/resources/reference.conf
index adabb89a..ab3b6593 100644
--- a/modules/kinesis/src/main/resources/reference.conf
+++ b/modules/kinesis/src/main/resources/reference.conf
@@ -9,7 +9,6 @@ snowplow.defaults: {
         type: "Polling"
         maxRecords: 1000
       }
-      bufferSize: 1
       leaseDuration: "10 seconds"
     }
   }
diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala
index bded23a3..d470a7e9 100644
--- a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala
+++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala
@@ -7,37 +7,44 @@
  */
 package com.snowplowanalytics.snowplow.sources.kinesis
 
-import cats.{Applicative, Semigroup}
+import cats.{Order, Semigroup}
 import cats.effect.{Async, Ref, Resource, Sync}
 import cats.effect.implicits._
 import cats.implicits._
-import fs2.Stream
-import fs2.aws.kinesis.{CommittableRecord, Kinesis, KinesisConsumerSettings}
+import fs2.{Chunk, Pull, Stream}
 import org.typelevel.log4cats.{Logger, SelfAwareStructuredLogger}
 import org.typelevel.log4cats.slf4j.Slf4jLogger
 import software.amazon.awssdk.awscore.defaultsmode.DefaultsMode
-import software.amazon.awssdk.regions.Region
-import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain
 import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient
 import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
 import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
-import software.amazon.kinesis.common.{InitialPositionInStream, InitialPositionInStreamExtended}
-
-import java.net.URI
-import java.util.Date
-import java.util.concurrent.Semaphore
-import scala.concurrent.duration.FiniteDuration
-
-// kinesis
-import software.amazon.kinesis.common.ConfigsBuilder
-import software.amazon.kinesis.coordinator.Scheduler
+import software.amazon.kinesis.common.{ConfigsBuilder, InitialPositionInStream, InitialPositionInStreamExtended}
+import software.amazon.kinesis.coordinator.{Scheduler, WorkerStateChangeListener}
 import software.amazon.kinesis.exceptions.ShutdownException
+import software.amazon.kinesis.lifecycle.events.{
+  InitializationInput,
+  LeaseLostInput,
+  ProcessRecordsInput,
+  ShardEndedInput,
+  ShutdownRequestedInput
+}
 import software.amazon.kinesis.metrics.MetricsLevel
-import software.amazon.kinesis.processor.{ShardRecordProcessorFactory, SingleStreamTracker}
+import software.amazon.kinesis.processor.{
+  RecordProcessorCheckpointer,
+  ShardRecordProcessor,
+  ShardRecordProcessorFactory,
+  SingleStreamTracker
+}
 import software.amazon.kinesis.retrieval.fanout.FanOutConfig
 import software.amazon.kinesis.retrieval.polling.PollingConfig
+import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber
+
+import java.net.URI
+import java.util.Date
+import java.util.concurrent.{CountDownLatch, SynchronousQueue}
+import scala.concurrent.duration.FiniteDuration
+import scala.jdk.CollectionConverters._
 
-// snowplow
 import com.snowplowanalytics.snowplow.sources.internal.{Checkpointer, LowLevelEvents, LowLevelSource}
 import com.snowplowanalytics.snowplow.sources.SourceAndAck
@@ -50,109 +57,160 @@ object KinesisSource {
       LowLevelSource.toSourceAndAck(lowLevel(config, livenessRef))
     }
 
-  private type KinesisCheckpointer[F[_]] = Checkpointer[F, Map[String, KinesisMetadata[F]]]
-
-  private implicit class RichCommitableRecord(val cr: CommittableRecord) extends AnyVal {
-    def toMetadata[F[_]: Sync]: KinesisMetadata[F] =
-      KinesisMetadata(cr.shardId, cr.sequenceNumber, cr.isLastInShard, cr.lastRecordSemaphore, cr.checkpoint)
+  sealed trait Checkpointable {
+    def extendedSequenceNumber: ExtendedSequenceNumber
+  }
+  private case class RecordCheckpointable(extendedSequenceNumber: ExtendedSequenceNumber, checkpointer: RecordProcessorCheckpointer)
+      extends Checkpointable
+  private case class ShardEndCheckpointable(checkpointer: RecordProcessorCheckpointer, release: CountDownLatch) extends Checkpointable {
+    override def extendedSequenceNumber: ExtendedSequenceNumber = ExtendedSequenceNumber.SHARD_END
   }
 
-  private final case class KinesisMetadata[F[_]](
-    shardId: String,
-    sequenceNumber: String,
-    isLastInShard: Boolean,
-    lastRecordSemaphore: Semaphore,
-    ack: F[Unit]
-  )
+  private type KinesisCheckpointer[F[_]] = Checkpointer[F, Map[String, Checkpointable]]
 
   private def lowLevel[F[_]: Async](
     config: KinesisSourceConfig,
     livenessRef: Ref[F, FiniteDuration]
-  ): LowLevelSource[F, Map[String, KinesisMetadata[F]]] =
-    new LowLevelSource[F, Map[String, KinesisMetadata[F]]] {
+  ): LowLevelSource[F, Map[String, Checkpointable]] =
+    new LowLevelSource[F, Map[String, Checkpointable]] {
       def checkpointer: KinesisCheckpointer[F] = kinesisCheckpointer[F]
 
-      def stream: Stream[F, Stream[F, LowLevelEvents[Map[String, KinesisMetadata[F]]]]] =
-        Stream.emit(kinesisStream(config, livenessRef))
+      def stream: Stream[F, Stream[F, LowLevelEvents[Map[String, Checkpointable]]]] =
+        kinesisStream(config, livenessRef)
 
       def lastLiveness: F[FiniteDuration] =
         livenessRef.get
     }
 
-  private implicit def metadataSemigroup[F[_]]: Semigroup[KinesisMetadata[F]] = new Semigroup[KinesisMetadata[F]] {
-    override def combine(x: KinesisMetadata[F], y: KinesisMetadata[F]): KinesisMetadata[F] =
-      if (x.sequenceNumber > y.sequenceNumber) x else y
+  private implicit def checkpointableOrder: Order[Checkpointable] = Order.from { case (a, b) =>
+    a.extendedSequenceNumber.compareTo(b.extendedSequenceNumber)
+  }
+
+  private implicit def checkpointableSemigroup: Semigroup[Checkpointable] = new Semigroup[Checkpointable] {
+    def combine(x: Checkpointable, y: Checkpointable): Checkpointable =
+      x.max(y)
+  }
+
+  private def ignoreShutdownExceptions[F[_]: Sync](shardId: String): PartialFunction[Throwable, F[Unit]] = { case _: ShutdownException =>
+    // The ShardRecordProcessor instance has been shutdown. This just means another KCL
+    // worker has stolen our lease. It is expected during autoscaling of instances, and is
+    // safe to ignore.
+    Logger[F].warn(s"Skipping checkpointing of shard $shardId because this worker no longer owns the lease")
   }
 
   private def kinesisCheckpointer[F[_]: Async]: KinesisCheckpointer[F] = new KinesisCheckpointer[F] {
-    def combine(x: Map[String, KinesisMetadata[F]], y: Map[String, KinesisMetadata[F]]): Map[String, KinesisMetadata[F]] =
+    def combine(x: Map[String, Checkpointable], y: Map[String, Checkpointable]): Map[String, Checkpointable] =
       x |+| y
 
-    val empty: Map[String, KinesisMetadata[F]] = Map.empty
-    def ack(c: Map[String, KinesisMetadata[F]]): F[Unit] =
-      c.values.toList
-        .parTraverse_ { metadata =>
-          metadata.ack
-            .recoverWith {
-              case _: ShutdownException =>
-                // The ShardRecordProcessor instance has been shutdown. This just means another KCL
-                // worker has stolen our lease. It is expected during autoscaling of instances, and is
-                // safe to ignore.
-                Logger[F].warn(s"Skipping checkpointing of shard ${metadata.shardId} because this worker no longer owns the lease")
-
-              case _: IllegalArgumentException if metadata.isLastInShard =>
-                // See https://github.com/snowplow/enrich/issues/657
-                // This can happen at the shard end when KCL no longer allows checkpointing of the last record in the shard.
-                // We need to release the semaphore, so that fs2-aws handles checkpointing the end of the shard.
-                Logger[F].warn(
-                  s"Checkpointing failed on last record in shard. Ignoring error and instead try checkpointing of the shard end"
-                ) *>
-                  Sync[F].delay(metadata.lastRecordSemaphore.release())
-
-              case _: IllegalArgumentException if metadata.lastRecordSemaphore.availablePermits === 0 =>
-                // See https://github.com/snowplow/enrich/issues/657 and https://github.com/snowplow/enrich/pull/658
-                // This can happen near the shard end, e.g. the penultimate batch in the shard, when KCL has already enqueued the final record in the shard to the fs2 queue.
-                // We must not release the semaphore yet, because we are not ready for fs2-aws to checkpoint the end of the shard.
-                // We can safely ignore the exception and move on.
-                Logger[F].warn(
-                  s"Checkpointing failed on a record which was not the last in the shard. Meanwhile, KCL has already enqueued the final record in the shard to the fs2 queue. Ignoring error and instead continue processing towards the shard end"
-                )
-            }
-        }
-    def nack(c: Map[String, KinesisMetadata[F]]): F[Unit] = Applicative[F].unit
+    val empty: Map[String, Checkpointable] = Map.empty
+
+    def ack(c: Map[String, Checkpointable]): F[Unit] =
+      c.toList.parTraverse_ {
+        case (shardId, RecordCheckpointable(extendedSequenceNumber, checkpointer)) =>
+          Logger[F].debug(s"Checkpointing shard $shardId at $extendedSequenceNumber") *>
+            Sync[F]
+              .blocking(
+                checkpointer.checkpoint(extendedSequenceNumber.sequenceNumber, extendedSequenceNumber.subSequenceNumber)
+              )
+              .recoverWith(ignoreShutdownExceptions(shardId))
+        case (shardId, ShardEndCheckpointable(checkpointer, release)) =>
+          Logger[F].debug(s"Checkpointing shard $shardId at SHARD_END") *>
+            Sync[F].blocking(checkpointer.checkpoint()).recoverWith(ignoreShutdownExceptions(shardId)) *>
+            Sync[F].delay(release.countDown)
+      }
+
+    def nack(c: Map[String, Checkpointable]): F[Unit] =
+      Sync[F].unit
   }
 
+  private sealed trait KCLAction
+  private case class ProcessRecords(shardId: String, processRecordsInput: ProcessRecordsInput) extends KCLAction
+  private case class ShardEnd(
+    shardId: String,
+    await: CountDownLatch,
+    shardEndedInput: ShardEndedInput
+  ) extends KCLAction
+  private case class KCLError(t: Throwable) extends KCLAction
+
   private def kinesisStream[F[_]: Async](
     config: KinesisSourceConfig,
     livenessRef: Ref[F, FiniteDuration]
-  ): Stream[F, LowLevelEvents[Map[String, KinesisMetadata[F]]]] =
+  ): Stream[F, Stream[F, LowLevelEvents[Map[String, Checkpointable]]]] =
+    for {
+      kinesisClient <- Stream.resource(mkKinesisClient[F](config.customEndpoint))
+      dynamoClient <- Stream.resource(mkDynamoDbClient[F](config.dynamodbCustomEndpoint))
+      cloudWatchClient <- Stream.resource(mkCloudWatchClient[F](config.cloudwatchCustomEndpoint))
+      queue = new SynchronousQueue[KCLAction]
+      scheduler <- Stream.eval(scheduler(kinesisClient, dynamoClient, cloudWatchClient, config, queue))
+      _ <- Stream.resource(runRecordProcessor[F](scheduler))
+      s <- Stream.emit(pullFromQueue(queue, livenessRef).stream).repeat
+    } yield s
+
+  private def pullFromQueue[F[_]: Sync](
+    queue: SynchronousQueue[KCLAction],
+    livenessRef: Ref[F, FiniteDuration]
+  ): Pull[F, LowLevelEvents[Map[String, Checkpointable]], Unit] =
     for {
-      region <- Stream.eval(Sync[F].delay((new DefaultAwsRegionProviderChain).getRegion))
-      consumerSettings = KinesisConsumerSettings(
-        config.streamName,
-        config.appName,
-        region,
-        bufferSize = config.bufferSize
-      )
-      kinesisClient <- Stream.resource(mkKinesisClient[F](region, config.customEndpoint))
-      dynamoClient <- Stream.resource(mkDynamoDbClient[F](region, config.dynamodbCustomEndpoint))
-      cloudWatchClient <- Stream.resource(mkCloudWatchClient[F](region, config.cloudwatchCustomEndpoint))
-      kinesis = Kinesis.create(scheduler(kinesisClient, dynamoClient, cloudWatchClient, config, _))
-      chunk <- kinesis.readChunkedFromKinesisStream(consumerSettings)
-      now <- Stream.eval(Sync[F].realTime)
-      _ <- Stream.eval(livenessRef.set(now))
-      if chunk.nonEmpty
-    } yield {
-      val ack = chunk.asSeq
-        .groupBy(_.shardId)
-        .map { case (k, records) =>
-          k -> records.maxBy(_.sequenceNumber).toMetadata[F]
-        }
-        .toMap
-      val earliestTstamp = chunk.iterator.map(_.record.approximateArrivalTimestamp).min
-      LowLevelEvents(chunk.map(_.record.data()), ack, Some(earliestTstamp))
+      maybeE <- Pull.eval(Sync[F].delay(Option(queue.poll)))
+      e <- maybeE match {
+             case Some(e) => Pull.pure(e)
+             case None => Pull.eval(Sync[F].interruptible(queue.take))
+           }
+      now <- Pull.eval(Sync[F].realTime)
+      _ <- Pull.eval(livenessRef.set(now))
+      _ <- e match {
+             case ProcessRecords(_, processRecordsInput) if processRecordsInput.records.asScala.isEmpty =>
+               pullFromQueue[F](queue, livenessRef)
+             case ProcessRecords(shardId, processRecordsInput) =>
+               val chunk = Chunk.javaList(processRecordsInput.records()).map(_.data())
+               val lastRecord = processRecordsInput.records.asScala.last // last is safe because we handled the empty case above
+               val checkpointable = RecordCheckpointable(
+                 new ExtendedSequenceNumber(lastRecord.sequenceNumber, lastRecord.subSequenceNumber),
+                 processRecordsInput.checkpointer
+               )
+               val next =
+                 LowLevelEvents(chunk, Map[String, Checkpointable](shardId -> checkpointable), Some(lastRecord.approximateArrivalTimestamp))
+               Pull.output1(next).covary[F] *> pullFromQueue[F](queue, livenessRef)
+             case ShardEnd(shardId, await, shardEndedInput) =>
+               val checkpointable = ShardEndCheckpointable(shardEndedInput.checkpointer, await)
+               val last = LowLevelEvents(Chunk.empty, Map[String, Checkpointable](shardId -> checkpointable), None)
+               Pull
+                 .eval(Logger[F].info(s"Ending this window of events early because we reached the end of Kinesis shard $shardId"))
+                 .covaryOutput *>
+                 Pull.output1(last).covary[F] *> Pull.done
+             case KCLError(t) => Pull.raiseError[F](t)
+           }
+    } yield ()
+
+  private def runRecordProcessor[F[_]: Async](scheduler: Scheduler): Resource[F, Unit] =
+    Sync[F].blocking(scheduler.run()).background *> Resource.onFinalize(Sync[F].blocking(scheduler.shutdown()))
+
+  private def shardRecordProcessor(queue: SynchronousQueue[KCLAction]): ShardRecordProcessor = new ShardRecordProcessor {
+    private var shardId: String = _
+
+    def initialize(initializationInput: InitializationInput): Unit =
+      shardId = initializationInput.shardId
+
+    def shardEnded(shardEndedInput: ShardEndedInput): Unit = {
+      val countDownLatch = new CountDownLatch(1)
+      queue.put(ShardEnd(shardId, countDownLatch, shardEndedInput))
+      countDownLatch.await()
+    }
+
+    def processRecords(processRecordsInput: ProcessRecordsInput): Unit = {
+      val action = ProcessRecords(shardId, processRecordsInput)
+      queue.put(action)
     }
 
+    def leaseLost(leaseLostInput: LeaseLostInput): Unit = ()
+
+    def shutdownRequested(shutdownRequestedInput: ShutdownRequestedInput): Unit = ()
+  }
+
+  private def recordProcessorFactory(queue: SynchronousQueue[KCLAction]): ShardRecordProcessorFactory = { () =>
+    shardRecordProcessor(queue)
+  }
+
   private def initialPositionOf(config: KinesisSourceConfig.InitialPosition): InitialPositionInStreamExtended =
     config match {
       case KinesisSourceConfig.InitialPosition.Latest => InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)
@@ -167,7 +225,7 @@ object KinesisSource {
     dynamoDbClient: DynamoDbAsyncClient,
     cloudWatchClient: CloudWatchAsyncClient,
     kinesisConfig: KinesisSourceConfig,
-    recordProcessorFactory: ShardRecordProcessorFactory
+    queue: SynchronousQueue[KCLAction]
   ): F[Scheduler] =
     Sync[F].delay {
       val configsBuilder =
@@ -178,7 +236,7 @@ object KinesisSource {
           dynamoDbClient,
           cloudWatchClient,
           kinesisConfig.workerIdentifier,
-          recordProcessorFactory
+          recordProcessorFactory(queue)
        )
 
       val retrievalConfig =
@@ -202,9 +260,16 @@ object KinesisSource {
       configsBuilder.processorConfig
        .callProcessRecordsEvenForEmptyRecordList(true)
 
+      val coordinatorConfig = configsBuilder.coordinatorConfig
+        .workerStateChangeListener(new WorkerStateChangeListener {
+          def onWorkerStateChange(newState: WorkerStateChangeListener.WorkerState): Unit = ()
+          override def onAllInitializationAttemptsFailed(e: Throwable): Unit =
+            queue.put(KCLError(e))
+        })
+
       new Scheduler(
         configsBuilder.checkpointConfig,
-        configsBuilder.coordinatorConfig,
+        coordinatorConfig,
         leaseManagementConfig,
         configsBuilder.lifecycleConfig,
         configsBuilder.metricsConfig.metricsLevel(MetricsLevel.NONE),
@@ -213,39 +278,36 @@ object KinesisSource {
       )
     }
 
-  private def mkKinesisClient[F[_]: Sync](region: Region, customEndpoint: Option[URI]): Resource[F, KinesisAsyncClient] =
+  private def mkKinesisClient[F[_]: Sync](customEndpoint: Option[URI]): Resource[F, KinesisAsyncClient] =
     Resource.fromAutoCloseable {
-      Sync[F].delay {
+      Sync[F].blocking { // Blocking because this might dial the EC2 metadata endpoint
         val builder =
           KinesisAsyncClient
            .builder()
-            .region(region)
            .defaultsMode(DefaultsMode.AUTO)
         val customized = customEndpoint.map(builder.endpointOverride).getOrElse(builder)
        customized.build
      }
    }
 
-  private def mkDynamoDbClient[F[_]: Sync](region: Region, customEndpoint: Option[URI]): Resource[F, DynamoDbAsyncClient] =
+  private def mkDynamoDbClient[F[_]: Sync](customEndpoint: Option[URI]): Resource[F, DynamoDbAsyncClient] =
     Resource.fromAutoCloseable {
-      Sync[F].delay {
+      Sync[F].blocking { // Blocking because this might dial the EC2 metadata endpoint
        val builder =
          DynamoDbAsyncClient
           .builder()
-           .region(region)
          .defaultsMode(DefaultsMode.AUTO)
        val customized = customEndpoint.map(builder.endpointOverride).getOrElse(builder)
        customized.build
      }
    }
 
-  private def mkCloudWatchClient[F[_]: Sync](region: Region, customEndpoint: Option[URI]): Resource[F, CloudWatchAsyncClient] =
+  private def mkCloudWatchClient[F[_]: Sync](customEndpoint: Option[URI]): Resource[F, CloudWatchAsyncClient] =
     Resource.fromAutoCloseable {
-      Sync[F].delay {
+      Sync[F].blocking { // Blocking because this might dial the EC2 metadata endpoint
        val builder =
          CloudWatchAsyncClient
           .builder()
-           .region(region)
           .defaultsMode(DefaultsMode.AUTO)
        val customized = customEndpoint.map(builder.endpointOverride).getOrElse(builder)
        customized.build
diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfig.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfig.scala
index 33161b57..5667e204 100644
--- a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfig.scala
+++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfig.scala
@@ -7,8 +7,6 @@
  */
 package com.snowplowanalytics.snowplow.sources.kinesis
 
-import eu.timepit.refined.types.all.PosInt
-
 import io.circe._
 import io.circe.config.syntax._
 import io.circe.generic.extras.semiauto.deriveConfiguredDecoder
@@ -24,7 +22,6 @@ case class KinesisSourceConfig(
   workerIdentifier: String,
   initialPosition: KinesisSourceConfig.InitialPosition,
   retrievalMode: KinesisSourceConfig.Retrieval,
-  bufferSize: PosInt,
   customEndpoint: Option[URI],
   dynamodbCustomEndpoint: Option[URI],
   cloudwatchCustomEndpoint: Option[URI],
@@ -33,8 +30,6 @@ case class KinesisSourceConfig(
 
 object KinesisSourceConfig {
 
-  private implicit val posIntDecoder: Decoder[PosInt] = Decoder.decodeInt.emap(PosInt.from)
-
   sealed trait InitialPosition
 
   object InitialPosition {
diff --git a/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfigSpec.scala b/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfigSpec.scala
index 892236f2..ab0c4d6c 100644
--- a/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfigSpec.scala
+++ b/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfigSpec.scala
@@ -9,7 +9,6 @@ package com.snowplowanalytics.snowplow.sources.kinesis
 
 import io.circe.literal._
 import com.typesafe.config.ConfigFactory
-import eu.timepit.refined.types.all.PosInt
 import io.circe.config.syntax.CirceConfigOps
 import io.circe.Decoder
 import io.circe.generic.semiauto._
@@ -40,7 +39,6 @@ class KinesisSourceConfigSpec extends Specification {
         "initialPosition": {
           "type": "TrimHorizon"
         },
-        "bufferSize": 42,
         "leaseDuration": "20 seconds"
       }
       """
@@ -52,7 +50,6 @@ class KinesisSourceConfigSpec extends Specification {
         c.workerIdentifier must beEqualTo("my-identifier"),
         c.initialPosition must beEqualTo(KinesisSourceConfig.InitialPosition.TrimHorizon),
         c.retrievalMode must beEqualTo(KinesisSourceConfig.Retrieval.Polling(42)),
-        c.bufferSize.value must beEqualTo(42),
         c.leaseDuration must beEqualTo(20.seconds)
       ).reduce(_ and _)
     }
@@ -71,7 +68,6 @@ class KinesisSourceConfigSpec extends Specification {
         "initialPosition": {
           "type": "TRIM_HORIZON"
         },
-        "bufferSize": 42,
         "leaseDuration": "20 seconds"
       }
       """
@@ -83,7 +79,6 @@ class KinesisSourceConfigSpec extends Specification {
         c.workerIdentifier must beEqualTo("my-identifier"),
         c.initialPosition must beEqualTo(KinesisSourceConfig.InitialPosition.TrimHorizon),
         c.retrievalMode must beEqualTo(KinesisSourceConfig.Retrieval.Polling(42)),
-        c.bufferSize.value must beEqualTo(42),
         c.leaseDuration must beEqualTo(20.seconds)
       ).reduce(_ and _)
     }
@@ -108,7 +103,6 @@ class KinesisSourceConfigSpec extends Specification {
       workerIdentifier = System.getenv("HOSTNAME"),
       initialPosition = KinesisSourceConfig.InitialPosition.Latest,
       retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000),
-      bufferSize = PosInt.unsafeFrom(1),
       customEndpoint = None,
      dynamodbCustomEndpoint = None,
       cloudwatchCustomEndpoint = None,
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index f9233a97..c905f44f 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -24,12 +24,10 @@ object Dependencies {
     val betterMonadicFor = "0.3.1"
     val kindProjector = "0.13.2"
     val collectionCompat = "2.11.0"
-    val refined = "0.11.1"
 
     // Streams
     val fs2Kafka = "3.4.0"
     val pubsub = "1.127.3"
-    val fs2AwsKinesis = "4.1.0"
     val awsSdk2 = "2.25.16"
     val kinesisClient = "2.5.7"
 
@@ -67,18 +65,14 @@ object Dependencies {
   val betterMonadicFor = "com.olegpy" %% "better-monadic-for" % V.betterMonadicFor
   val kindProjector = "org.typelevel" %% "kind-projector" % V.kindProjector cross CrossVersion.full
   val collectionCompat = "org.scala-lang.modules" %% "scala-collection-compat" % V.collectionCompat
-  val refined = "eu.timepit" %% "refined" % V.refined
 
   // streams
-  val fs2Kafka = "com.github.fd4s" %% "fs2-kafka" % V.fs2Kafka
-  val pubsub = "com.google.cloud" % "google-cloud-pubsub" % V.pubsub
-  val fs2AwsKinesis = ("io.laserdisc" %% "fs2-aws-kinesis" % V.fs2AwsKinesis)
-    .exclude("software.amazon.kinesis", "amazon-kinesis-client")
-    .exclude("com.amazonaws", "amazon-kinesis-producer")
-  val arnsSdk2 = "software.amazon.awssdk" % "arns" % V.awsSdk2
-  val kinesisSdk2 = "software.amazon.awssdk" % "kinesis" % V.awsSdk2
-  val dynamoDbSdk2 = "software.amazon.awssdk" % "dynamodb" % V.awsSdk2
-  val cloudwatchSdk2 = "software.amazon.awssdk" % "cloudwatch" % V.awsSdk2
+  val fs2Kafka       = "com.github.fd4s"        %% "fs2-kafka"           % V.fs2Kafka
+  val pubsub         = "com.google.cloud"        % "google-cloud-pubsub" % V.pubsub
+  val arnsSdk2       = "software.amazon.awssdk"  % "arns"                % V.awsSdk2
+  val kinesisSdk2    = "software.amazon.awssdk"  % "kinesis"             % V.awsSdk2
+  val dynamoDbSdk2   = "software.amazon.awssdk"  % "dynamodb"            % V.awsSdk2
+  val cloudwatchSdk2 = "software.amazon.awssdk"  % "cloudwatch"          % V.awsSdk2
   val kinesisClient = ("software.amazon.kinesis" % "amazon-kinesis-client" % V.kinesisClient)
     .exclude("com.amazonaws", "amazon-kinesis-producer")
     .exclude("software.amazon.glue", "schema-registry-build-tools")
@@ -124,12 +118,10 @@ object Dependencies {
   val kinesisDependencies = Seq(
     kinesisClient,
-    fs2AwsKinesis,
     arnsSdk2,
     kinesisSdk2,
     dynamoDbSdk2,
     cloudwatchSdk2,
-    refined,
     circeConfig,
     circeGeneric,
     circeGenericExtra,