Re-implement Kinesis source without fs2-kinesis #84

Merged · 2 commits · Sep 13, 2024
@@ -12,8 +12,6 @@ import cats.effect.{IO, Ref}
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._

-import eu.timepit.refined.types.numeric.PosInt
-
import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
@@ -94,7 +92,6 @@ object Utils {
UUID.randomUUID.toString,
KinesisSourceConfig.InitialPosition.TrimHorizon,
KinesisSourceConfig.Retrieval.Polling(1),
-PosInt.unsafeFrom(1),
Some(endpoint),
Some(endpoint),
Some(endpoint),
1 change: 0 additions & 1 deletion modules/kinesis/src/main/resources/reference.conf
@@ -9,7 +9,6 @@ snowplow.defaults: {
type: "Polling"
maxRecords: 1000
}
-bufferSize: 1
leaseDuration: "10 seconds"
}
}
36 changes: 36 additions & 0 deletions modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/Checkpointable.scala
@@ -0,0 +1,36 @@
/*
 * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
 *
 * This program is licensed to you under the Snowplow Community License Version 1.0,
 * and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
 * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
 */
package com.snowplowanalytics.snowplow.sources.kinesis

import cats.implicits._
import cats.{Order, Semigroup}
import software.amazon.kinesis.processor.RecordProcessorCheckpointer
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber

import java.util.concurrent.CountDownLatch

private sealed trait Checkpointable {
  def extendedSequenceNumber: ExtendedSequenceNumber
}

private object Checkpointable {
  final case class Record(extendedSequenceNumber: ExtendedSequenceNumber, checkpointer: RecordProcessorCheckpointer) extends Checkpointable

  final case class ShardEnd(checkpointer: RecordProcessorCheckpointer, release: CountDownLatch) extends Checkpointable {
    override def extendedSequenceNumber: ExtendedSequenceNumber = ExtendedSequenceNumber.SHARD_END
  }

  implicit def checkpointableOrder: Order[Checkpointable] = Order.from { case (a, b) =>
    a.extendedSequenceNumber.compareTo(b.extendedSequenceNumber)
  }

  implicit def checkpointableSemigroup: Semigroup[Checkpointable] = new Semigroup[Checkpointable] {
    def combine(x: Checkpointable, y: Checkpointable): Checkpointable =
      x.max(y)
  }
}
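As a quick illustration of what these instances buy us (a sketch, not part of the diff): merging two per-shard maps with `|+|` keeps, for each shard, the checkpointable with the highest extended sequence number, and a `ShardEnd` always wins because `ExtendedSequenceNumber.SHARD_END` compares above any concrete sequence number. The stub checkpointer below is a placeholder; real ones are supplied by the KCL.

```scala
// Hypothetical sketch; assumes it lives in the same package, since
// Checkpointable is package-private. The null checkpointer is illustration only.
import cats.implicits._
import software.amazon.kinesis.processor.RecordProcessorCheckpointer
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber

object CheckpointableMergeSketch {
  val stub: RecordProcessorCheckpointer = null // never invoked in this sketch

  val older: Checkpointable = Checkpointable.Record(new ExtendedSequenceNumber("100"), stub)
  val newer: Checkpointable = Checkpointable.Record(new ExtendedSequenceNumber("200"), stub)

  // Map's Semigroup combines values per key via checkpointableSemigroup (x.max(y)),
  // so "shard-0" ends up mapped to `newer`.
  val merged: Map[String, Checkpointable] = Map("shard-0" -> older) |+| Map("shard-0" -> newer)
}
```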
26 changes: 26 additions & 0 deletions modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KCLAction.scala
@@ -0,0 +1,26 @@
/*
 * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
 *
 * This program is licensed to you under the Snowplow Community License Version 1.0,
 * and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
 * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
 */
package com.snowplowanalytics.snowplow.sources.kinesis

import software.amazon.kinesis.lifecycle.events.{ProcessRecordsInput, ShardEndedInput}

import java.util.concurrent.CountDownLatch

private sealed trait KCLAction

private object KCLAction {

  final case class ProcessRecords(shardId: String, processRecordsInput: ProcessRecordsInput) extends KCLAction
  final case class ShardEnd(
    shardId: String,
    await: CountDownLatch,
    shardEndedInput: ShardEndedInput
  ) extends KCLAction
  final case class KCLError(t: Throwable) extends KCLAction

}
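For orientation (an inference from the surrounding code, not stated in the diff): KCL worker threads put these actions onto the source's `SynchronousQueue`, and the effectful side drains it. A minimal consumer sketch, where `nextAction` is a hypothetical helper name:

```scala
// SynchronousQueue.take blocks until a KCL thread hands over an action,
// so the call is wrapped in Sync[F].blocking to keep it off the compute pool.
import cats.effect.Sync
import java.util.concurrent.SynchronousQueue

def nextAction[F[_]: Sync](queue: SynchronousQueue[KCLAction]): F[KCLAction] =
  Sync[F].blocking(queue.take())
```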
150 changes: 150 additions & 0 deletions modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KCLScheduler.scala
@@ -0,0 +1,150 @@
/*
 * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
 *
 * This program is licensed to you under the Snowplow Community License Version 1.0,
 * and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
 * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
 */
package com.snowplowanalytics.snowplow.sources.kinesis

import cats.effect.implicits._
import cats.effect.{Async, Resource, Sync}
import cats.implicits._
import com.snowplowanalytics.snowplow.sources.kinesis.KCLAction.KCLError
import software.amazon.awssdk.awscore.defaultsmode.DefaultsMode
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
import software.amazon.kinesis.common.{ConfigsBuilder, InitialPositionInStream, InitialPositionInStreamExtended}
import software.amazon.kinesis.coordinator.{Scheduler, WorkerStateChangeListener}
import software.amazon.kinesis.metrics.MetricsLevel
import software.amazon.kinesis.processor.SingleStreamTracker
import software.amazon.kinesis.retrieval.fanout.FanOutConfig
import software.amazon.kinesis.retrieval.polling.PollingConfig

import java.net.URI
import java.util.Date
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.atomic.AtomicReference

private[kinesis] object KCLScheduler {

  def populateQueue[F[_]: Async](
    config: KinesisSourceConfig,
    queue: SynchronousQueue[KCLAction]
  ): Resource[F, Unit] =
    for {
      kinesis <- mkKinesisClient[F](config.customEndpoint)
      dynamo <- mkDynamoDbClient[F](config.dynamodbCustomEndpoint)
      cloudWatch <- mkCloudWatchClient[F](config.cloudwatchCustomEndpoint)
      scheduler <- Resource.eval(mkScheduler(kinesis, dynamo, cloudWatch, config, queue))
      _ <- runInBackground(scheduler)
    } yield ()

  private def mkScheduler[F[_]: Sync](
    kinesisClient: KinesisAsyncClient,
    dynamoDbClient: DynamoDbAsyncClient,
    cloudWatchClient: CloudWatchAsyncClient,
    kinesisConfig: KinesisSourceConfig,
    queue: SynchronousQueue[KCLAction]
  ): F[Scheduler] =
    Sync[F].delay {
      val configsBuilder =
        new ConfigsBuilder(
          kinesisConfig.streamName,
          kinesisConfig.appName,
          kinesisClient,
          dynamoDbClient,
          cloudWatchClient,
          kinesisConfig.workerIdentifier,
          () => ShardRecordProcessor(queue, new AtomicReference(Set.empty[String]))
        )

      val retrievalConfig =
        configsBuilder.retrievalConfig
          .streamTracker(new SingleStreamTracker(kinesisConfig.streamName, initialPositionOf(kinesisConfig.initialPosition)))
          .retrievalSpecificConfig {
            kinesisConfig.retrievalMode match {
              case KinesisSourceConfig.Retrieval.FanOut =>
                new FanOutConfig(kinesisClient).streamName(kinesisConfig.streamName).applicationName(kinesisConfig.appName)
              case KinesisSourceConfig.Retrieval.Polling(maxRecords) =>
                new PollingConfig(kinesisConfig.streamName, kinesisClient).maxRecords(maxRecords)
            }
          }

      val leaseManagementConfig =
        configsBuilder.leaseManagementConfig
          .failoverTimeMillis(kinesisConfig.leaseDuration.toMillis)

      // We ask to see empty batches, so that we can update the health check even when there are no records in the stream
      val processorConfig =
        configsBuilder.processorConfig
          .callProcessRecordsEvenForEmptyRecordList(true)

      val coordinatorConfig = configsBuilder.coordinatorConfig
        .workerStateChangeListener(new WorkerStateChangeListener {
          def onWorkerStateChange(newState: WorkerStateChangeListener.WorkerState): Unit = ()
          override def onAllInitializationAttemptsFailed(e: Throwable): Unit =
            queue.put(KCLError(e))
        })

      new Scheduler(
        configsBuilder.checkpointConfig,
        coordinatorConfig,
        leaseManagementConfig,
        configsBuilder.lifecycleConfig,
        configsBuilder.metricsConfig.metricsLevel(MetricsLevel.NONE),
        processorConfig,
        retrievalConfig
      )
    }

  private def runInBackground[F[_]: Async](scheduler: Scheduler): Resource[F, Unit] =
    Sync[F].blocking(scheduler.run()).background *> Resource.onFinalize(Sync[F].blocking(scheduler.shutdown()))

  private def initialPositionOf(config: KinesisSourceConfig.InitialPosition): InitialPositionInStreamExtended =
    config match {
      case KinesisSourceConfig.InitialPosition.Latest => InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)
      case KinesisSourceConfig.InitialPosition.TrimHorizon =>
        InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
      case KinesisSourceConfig.InitialPosition.AtTimestamp(instant) =>
        InitialPositionInStreamExtended.newInitialPositionAtTimestamp(Date.from(instant))
    }

  private def mkKinesisClient[F[_]: Sync](customEndpoint: Option[URI]): Resource[F, KinesisAsyncClient] =
    Resource.fromAutoCloseable {
      Sync[F].blocking { // Blocking because this might dial the EC2 metadata endpoint
        val builder =
          KinesisAsyncClient
            .builder()
            .defaultsMode(DefaultsMode.AUTO)
        val customized = customEndpoint.map(builder.endpointOverride).getOrElse(builder)
        customized.build
      }
    }

  private def mkDynamoDbClient[F[_]: Sync](customEndpoint: Option[URI]): Resource[F, DynamoDbAsyncClient] =
    Resource.fromAutoCloseable {
      Sync[F].blocking { // Blocking because this might dial the EC2 metadata endpoint
        val builder =
          DynamoDbAsyncClient
            .builder()
            .defaultsMode(DefaultsMode.AUTO)
        val customized = customEndpoint.map(builder.endpointOverride).getOrElse(builder)
        customized.build
      }
    }

  private def mkCloudWatchClient[F[_]: Sync](customEndpoint: Option[URI]): Resource[F, CloudWatchAsyncClient] =
    Resource.fromAutoCloseable {
      Sync[F].blocking { // Blocking because this might dial the EC2 metadata endpoint
        val builder =
          CloudWatchAsyncClient
            .builder()
            .defaultsMode(DefaultsMode.AUTO)
        val customized = customEndpoint.map(builder.endpointOverride).getOrElse(builder)
        customized.build
      }
    }

}
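A hedged wiring sketch (the `actions` helper and its shape are illustrative, not from this PR): because `populateQueue` returns a `Resource`, the KCL scheduler starts in the background on acquire and is shut down on release.

```scala
// Assumes an already-loaded KinesisSourceConfig.
import cats.effect.{IO, Resource}
import java.util.concurrent.SynchronousQueue

def actions(config: KinesisSourceConfig): Resource[IO, SynchronousQueue[KCLAction]] =
  for {
    queue <- Resource.eval(IO(new SynchronousQueue[KCLAction]()))
    _     <- KCLScheduler.populateQueue[IO](config, queue) // KCL runs until release
  } yield queue
```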
66 changes: 66 additions & 0 deletions modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisCheckpointer.scala
@@ -0,0 +1,66 @@
/*
 * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
 *
 * This program is licensed to you under the Snowplow Community License Version 1.0,
 * and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
 * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
 */
package com.snowplowanalytics.snowplow.sources.kinesis

import cats.effect.{Async, Sync}
import cats.implicits._
import cats.effect.implicits._
import com.snowplowanalytics.snowplow.sources.internal.Checkpointer
import org.typelevel.log4cats.Logger
import software.amazon.kinesis.exceptions.ShutdownException
import software.amazon.kinesis.processor.RecordProcessorCheckpointer
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber

import java.util.concurrent.CountDownLatch

private class KinesisCheckpointer[F[_]: Async: Logger] extends Checkpointer[F, Map[String, Checkpointable]] {

  override val empty: Map[String, Checkpointable] = Map.empty

  override def combine(x: Map[String, Checkpointable], y: Map[String, Checkpointable]): Map[String, Checkpointable] =
    x |+| y

  override def ack(c: Map[String, Checkpointable]): F[Unit] =
    c.toList.parTraverse_ {
      case (shardId, Checkpointable.Record(extendedSequenceNumber, checkpointer)) =>
        checkpointRecord(shardId, extendedSequenceNumber, checkpointer)
      case (shardId, Checkpointable.ShardEnd(checkpointer, release)) =>
        checkpointShardEnd(shardId, checkpointer, release)
    }

  override def nack(c: Map[String, Checkpointable]): F[Unit] =
    Sync[F].unit

  private def checkpointShardEnd(
    shardId: String,
    checkpointer: RecordProcessorCheckpointer,
    release: CountDownLatch
  ) =
    Logger[F].debug(s"Checkpointing shard $shardId at SHARD_END") *>
      Sync[F].blocking(checkpointer.checkpoint()).recoverWith(ignoreShutdownExceptions(shardId)) *>
      Sync[F].delay(release.countDown())

  private def checkpointRecord(
    shardId: String,
    extendedSequenceNumber: ExtendedSequenceNumber,
    checkpointer: RecordProcessorCheckpointer
  ) =
    Logger[F].debug(s"Checkpointing shard $shardId at $extendedSequenceNumber") *>
      Sync[F]
        .blocking(
          checkpointer.checkpoint(extendedSequenceNumber.sequenceNumber, extendedSequenceNumber.subSequenceNumber)
        )
        .recoverWith(ignoreShutdownExceptions(shardId))

  private def ignoreShutdownExceptions(shardId: String): PartialFunction[Throwable, F[Unit]] = { case _: ShutdownException =>
    // The ShardRecordProcessor instance has been shutdown. This just means another KCL
    // worker has stolen our lease. It is expected during autoscaling of instances, and is
    // safe to ignore.
    Logger[F].warn(s"Skipping checkpointing of shard $shardId because this worker no longer owns the lease")
  }
}
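To sketch the lifecycle (illustrative only; `KinesisCheckpointer` is package-private, so this would have to live in the same package): batches of per-shard offsets are merged with `combine`, and `ack` then checkpoints each shard once, in parallel, at its highest offset.

```scala
// Constructing the checkpointer needs an Async and a log4cats Logger in scope.
import cats.effect.IO
import org.typelevel.log4cats.slf4j.Slf4jLogger

val sketch: IO[Unit] =
  Slf4jLogger.create[IO].flatMap { implicit logger =>
    val checkpointer = new KinesisCheckpointer[IO]
    checkpointer.ack(checkpointer.empty) // empty batch: nothing to checkpoint
  }
```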