Kinesis source must report its latency when KCL is not healthy
The KCL is sadly not very good at crashing and exiting. If the
underlying Kinesis client hits errors (e.g. permission errors), the KCL
tends to stay alive and not propagate the exceptions to our application
code. We want the app to crash under these circumstances, because a
crash triggers an alert.

common-streams already has a health check feature, in which a health
probe becomes unhealthy if a single event gets stuck without making
progress.
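
For reference, the existing probe amounts to comparing the wall clock
against the time of the oldest unprocessed pull. A simplified sketch of
the pre-existing isHealthy logic in LowLevelSource (the disconnected
case is omitted here):

def isHealthy(maxAllowedProcessingLatency: FiniteDuration): F[SourceAndAck.HealthStatus] =
  (latencyRef.get, Sync[F].realTime).mapN {
    case (Some(lastPullTime), now) if now - lastPullTime > maxAllowedProcessingLatency =>
      SourceAndAck.LaggingEventProcessor(now - lastPullTime)
    case _ => SourceAndAck.Healthy
  }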

This PR leans on the existing health check feature: the probe now also
becomes unhealthy if the Kinesis client is not regularly receiving
healthy responses.

I configured KCL to invoke our record processor every time it polls for
records, even if the batch is empty. This means the health check still
works even if there are no events in the stream.
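
To illustrate the mechanism, here is a minimal, self-contained sketch
(with hypothetical names pollOnce, fetchBatch and isStale, not the real
KinesisSource). Every poll refreshes a liveness timestamp, even when
the batch is empty, so a KCL that silently stops polling eventually
shows up as a stale timestamp:

import cats.effect.{IO, Ref}
import cats.syntax.all._
import scala.concurrent.duration._

// Hypothetical model of the liveness mechanism, not the real KinesisSource.
// The timestamp is refreshed on every poll, even when the batch is empty,
// mirroring callProcessRecordsEvenForEmptyRecordList(true) in the diff below.
def pollOnce(livenessRef: Ref[IO, FiniteDuration], fetchBatch: IO[List[String]]): IO[List[String]] =
  for {
    batch <- fetchBatch            // may legitimately be empty
    now   <- IO.realTime
    _     <- livenessRef.set(now)  // liveness refreshed regardless of batch size
  } yield batch

// A stale timestamp then reads as unhealthy, as in isHealthy in the diff below.
def isStale(livenessRef: Ref[IO, FiniteDuration], allowed: FiniteDuration): IO[Boolean] =
  (IO.realTime, livenessRef.get).mapN((now, last) => now - last > allowed)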
istreeter committed May 16, 2024
1 parent 4a154bf commit e4b0198
Showing 5 changed files with 103 additions and 50 deletions.
@@ -20,6 +20,7 @@ import scala.reflect._
 
 import java.nio.ByteBuffer
 import java.time.Instant
+import scala.concurrent.duration.FiniteDuration
 
 // kafka
 import fs2.kafka._
@@ -49,6 +50,9 @@ object KafkaSource {
 
       def stream: Stream[F, Stream[F, LowLevelEvents[KafkaCheckpoints[F]]]] =
         kafkaStream(config, authHandlerClass)
+
+      def lastLiveness: F[FiniteDuration] =
+        Sync[F].realTime
     }
 
   case class OffsetAndCommit[F[_]](offset: Long, commit: F[Unit])
@@ -7,8 +7,9 @@
  */
 package com.snowplowanalytics.snowplow.sources.kinesis
 
-import cats._
-import cats.effect.{Async, Resource, Sync}
+import cats.{Applicative, Semigroup}
+import cats.effect.{Async, Ref, Resource, Sync}
+import cats.effect.implicits._
 import cats.implicits._
 import fs2.Stream
 import fs2.aws.kinesis.{CommittableRecord, Kinesis, KinesisConsumerSettings}
@@ -25,6 +26,7 @@ import software.amazon.kinesis.common.{InitialPositionInStream, InitialPositionI
 import java.net.URI
 import java.util.Date
 import java.util.concurrent.Semaphore
+import scala.concurrent.duration.FiniteDuration
 
 // kinesis
 import software.amazon.kinesis.common.ConfigsBuilder
@@ -43,8 +45,10 @@ object KinesisSource {
 
   private implicit def logger[F[_]: Sync]: SelfAwareStructuredLogger[F] = Slf4jLogger.getLogger[F]
 
-  def build[F[_]: Parallel: Async](config: KinesisSourceConfig): F[SourceAndAck[F]] =
-    LowLevelSource.toSourceAndAck(lowLevel(config))
+  def build[F[_]: Async](config: KinesisSourceConfig): F[SourceAndAck[F]] =
+    Ref.ofEffect(Sync[F].realTime).flatMap { livenessRef =>
+      LowLevelSource.toSourceAndAck(lowLevel(config, livenessRef))
+    }
 
   private type KinesisCheckpointer[F[_]] = Checkpointer[F, Map[String, KinesisMetadata[F]]]

@@ -61,20 +65,26 @@ object KinesisSource {
     ack: F[Unit]
   )
 
-  private def lowLevel[F[_]: Parallel: Async](config: KinesisSourceConfig): LowLevelSource[F, Map[String, KinesisMetadata[F]]] =
+  private def lowLevel[F[_]: Async](
+    config: KinesisSourceConfig,
+    livenessRef: Ref[F, FiniteDuration]
+  ): LowLevelSource[F, Map[String, KinesisMetadata[F]]] =
     new LowLevelSource[F, Map[String, KinesisMetadata[F]]] {
       def checkpointer: KinesisCheckpointer[F] = kinesisCheckpointer[F]
 
       def stream: Stream[F, Stream[F, LowLevelEvents[Map[String, KinesisMetadata[F]]]]] =
-        Stream.emit(kinesisStream(config))
+        Stream.emit(kinesisStream(config, livenessRef))
+
+      def lastLiveness: F[FiniteDuration] =
+        livenessRef.get
     }
 
   private implicit def metadataSemigroup[F[_]]: Semigroup[KinesisMetadata[F]] = new Semigroup[KinesisMetadata[F]] {
     override def combine(x: KinesisMetadata[F], y: KinesisMetadata[F]): KinesisMetadata[F] =
       if (x.sequenceNumber > y.sequenceNumber) x else y
   }
 
-  private def kinesisCheckpointer[F[_]: Parallel: Sync]: KinesisCheckpointer[F] = new KinesisCheckpointer[F] {
+  private def kinesisCheckpointer[F[_]: Async]: KinesisCheckpointer[F] = new KinesisCheckpointer[F] {
     def combine(x: Map[String, KinesisMetadata[F]], y: Map[String, KinesisMetadata[F]]): Map[String, KinesisMetadata[F]] =
       x |+| y
 
@@ -112,44 +122,36 @@ object KinesisSource {
     def nack(c: Map[String, KinesisMetadata[F]]): F[Unit] = Applicative[F].unit
   }
 
-  private def kinesisStream[F[_]: Async](config: KinesisSourceConfig): Stream[F, LowLevelEvents[Map[String, KinesisMetadata[F]]]] = {
-    val resources =
-      for {
-        region <- Resource.eval(Sync[F].delay((new DefaultAwsRegionProviderChain).getRegion))
-        consumerSettings <- Resource.pure[F, KinesisConsumerSettings](
-          KinesisConsumerSettings(
-            config.streamName,
-            config.appName,
-            region,
-            bufferSize = config.bufferSize
-          )
-        )
-        kinesisClient <- mkKinesisClient[F](region, config.customEndpoint)
-        dynamoClient <- mkDynamoDbClient[F](region, config.dynamodbCustomEndpoint)
-        cloudWatchClient <- mkCloudWatchClient[F](region, config.cloudwatchCustomEndpoint)
-        kinesis <- Resource.pure[F, Kinesis[F]](
-          Kinesis.create(scheduler(kinesisClient, dynamoClient, cloudWatchClient, config, _))
-        )
-      } yield (consumerSettings, kinesis)
-
-    Stream
-      .resource(resources)
-      .flatMap { case (settings, kinesis) =>
-        kinesis.readFromKinesisStream(settings)
-      }
-      .chunks
-      .filter(_.nonEmpty)
-      .map { chunk =>
-        val ack = chunk.asSeq
-          .groupBy(_.shardId)
-          .map { case (k, records) =>
-            k -> records.maxBy(_.sequenceNumber).toMetadata[F]
-          }
-          .toMap
-        val earliestTstamp = chunk.iterator.map(_.record.approximateArrivalTimestamp).min
-        LowLevelEvents(chunk.map(_.record.data()), ack, Some(earliestTstamp))
-      }
-  }
+  private def kinesisStream[F[_]: Async](
+    config: KinesisSourceConfig,
+    livenessRef: Ref[F, FiniteDuration]
+  ): Stream[F, LowLevelEvents[Map[String, KinesisMetadata[F]]]] =
+    for {
+      region <- Stream.eval(Sync[F].delay((new DefaultAwsRegionProviderChain).getRegion))
+      consumerSettings = KinesisConsumerSettings(
+                           config.streamName,
+                           config.appName,
+                           region,
+                           bufferSize = config.bufferSize
+                         )
+      kinesisClient <- Stream.resource(mkKinesisClient[F](region, config.customEndpoint))
+      dynamoClient <- Stream.resource(mkDynamoDbClient[F](region, config.dynamodbCustomEndpoint))
+      cloudWatchClient <- Stream.resource(mkCloudWatchClient[F](region, config.cloudwatchCustomEndpoint))
+      kinesis = Kinesis.create(scheduler(kinesisClient, dynamoClient, cloudWatchClient, config, _))
+      chunk <- kinesis.readFromKinesisStream(consumerSettings).chunks
+      now <- Stream.eval(Sync[F].realTime)
+      _ <- Stream.eval(livenessRef.set(now))
+      if chunk.nonEmpty
+    } yield {
+      val ack = chunk.asSeq
+        .groupBy(_.shardId)
+        .map { case (k, records) =>
+          k -> records.maxBy(_.sequenceNumber).toMetadata[F]
+        }
+        .toMap
+      val earliestTstamp = chunk.iterator.map(_.record.approximateArrivalTimestamp).min
+      LowLevelEvents(chunk.map(_.record.data()), ack, Some(earliestTstamp))
+    }
 
   private def initialPositionOf(config: KinesisSourceConfig.InitialPosition): InitialPositionInStreamExtended =
     config match {
@@ -195,13 +197,18 @@ object KinesisSource {
       configsBuilder.leaseManagementConfig
         .failoverTimeMillis(kinesisConfig.leaseDuration.toMillis)
 
+    // We ask to see empty batches, so that we can update the health check even when there are no records in the stream
+    val processorConfig =
+      configsBuilder.processorConfig
+        .callProcessRecordsEvenForEmptyRecordList(true)
+
     new Scheduler(
       configsBuilder.checkpointConfig,
       configsBuilder.coordinatorConfig,
       leaseManagementConfig,
       configsBuilder.lifecycleConfig,
       configsBuilder.metricsConfig.metricsLevel(MetricsLevel.NONE),
-      configsBuilder.processorConfig,
+      processorConfig,
       retrievalConfig
     )
   }
@@ -61,6 +61,9 @@ object PubsubSource {
 
       def stream: Stream[F, Stream[F, LowLevelEvents[Chunk[AckReplyConsumer]]]] =
         pubsubStream(config)
+
+      def lastLiveness: F[FiniteDuration] =
+        Sync[F].realTime
     }
 
   private def pubsubCheckpointer[F[_]: Async]: PubSubCheckpointer[F] = new PubSubCheckpointer[F] {
@@ -46,6 +46,13 @@ private[sources] trait LowLevelSource[F[_], C] {
    */
   def stream: Stream[F, Stream[F, LowLevelEvents[C]]]
 
+  /**
+   * The last time this source was known to be alive and healthy
+   *
+   * The returned value is FiniteDuration since the unix epoch, i.e. the value returned by
+   * `Sync[F].realTime`
+   */
+  def lastLiveness: F[FiniteDuration]
 }
 
 private[sources] object LowLevelSource {
@@ -109,11 +116,13 @@ private[sources] object LowLevelSource {
       }
 
       def isHealthy(maxAllowedProcessingLatency: FiniteDuration): F[SourceAndAck.HealthStatus] =
-        (isConnectedRef.get, latencyRef.get, Sync[F].realTime).mapN {
-          case (false, _, _) =>
+        (isConnectedRef.get, latencyRef.get, source.lastLiveness, Sync[F].realTime).mapN {
+          case (false, _, _, _) =>
             SourceAndAck.Disconnected
-          case (_, Some(lastPullTime), now) if now - lastPullTime > maxAllowedProcessingLatency =>
+          case (_, Some(lastPullTime), _, now) if now - lastPullTime > maxAllowedProcessingLatency =>
             SourceAndAck.LaggingEventProcessor(now - lastPullTime)
+          case (_, _, lastLiveness, now) if now - lastLiveness > maxAllowedProcessingLatency =>
+            SourceAndAck.LaggingEventProcessor(now - lastLiveness)
           case _ => SourceAndAck.Healthy
         }
     }
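
This is the probe that the commit message relies on: an application is
expected to poll isHealthy and crash when it reports unhealthy. A
hypothetical supervision loop (monitor is a made-up name, not part of
this commit) could look like:

import cats.effect.IO
import com.snowplowanalytics.snowplow.sources.SourceAndAck
import scala.concurrent.duration._

// Hypothetical wiring, not part of this commit: poll the health probe and
// raise an error (crashing the app, which triggers an alert) when unhealthy.
def monitor(sourceAndAck: SourceAndAck[IO], maxAllowedProcessingLatency: FiniteDuration): IO[Nothing] =
  sourceAndAck.isHealthy(maxAllowedProcessingLatency).flatMap {
    case SourceAndAck.Healthy => IO.sleep(10.seconds)
    case unhealthy            => IO.raiseError(new RuntimeException(s"Source unhealthy: $unhealthy"))
  }.foreverM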
@@ -15,7 +15,7 @@ import fs2.{Chunk, Stream}
 import org.specs2.Specification
 import org.specs2.matcher.Matcher
 
-import scala.concurrent.duration.{Duration, DurationInt}
+import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration}
 
 import java.nio.charset.StandardCharsets
 import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, SourceAndAck, TokenedEvents}
@@ -43,6 +43,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
     report healthy if events are processed but not yet acked (e.g. a batch-oriented loader) $e9
     report healthy after all events have been processed and acked $e10
     report disconnected while source is in between two active streams of events (e.g. during kafka rebalance) $e11
+    report unhealthy if the underlying low level source is lagging $e12
   """
 
   def e1 = {
@@ -228,6 +229,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
     val lowLevelSource = new LowLevelSource[IO, Unit] {
       def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit)
       def stream: Stream[IO, Stream[IO, LowLevelEvents[Unit]]] = Stream.emit(Stream.never[IO])
+      def lastLiveness: IO[FiniteDuration] = IO.realTime
     }
 
     val io = for {
@@ -253,6 +255,7 @@
       def stream: Stream[IO, Stream[IO, LowLevelEvents[Unit]]] = Stream.emit {
         Stream.emit(LowLevelEvents(Chunk.empty, (), None)).repeat
       }
+      def lastLiveness: IO[FiniteDuration] = IO.realTime
     }
 
     // A processor which takes 1 hour to process each batch
@@ -282,6 +285,7 @@
       def stream: Stream[IO, Stream[IO, LowLevelEvents[Unit]]] = Stream.emit {
         Stream.emit(LowLevelEvents(Chunk.empty, (), None)).repeat
       }
+      def lastLiveness: IO[FiniteDuration] = IO.realTime
     }
 
     // A processor which takes 1 minute to process each batch, but does not emit the token (i.e. does not ack the batch)
@@ -309,6 +313,7 @@
       def stream: Stream[IO, Stream[IO, LowLevelEvents[Unit]]] = Stream.emit {
         Stream.emit(LowLevelEvents(Chunk.empty, (), None)) ++ Stream.never[IO]
       }
+      def lastLiveness: IO[FiniteDuration] = IO.realTime
     }
 
     // A processor which takes 1 minute to process each batch
@@ -339,6 +344,7 @@
         Stream.fixedDelay[IO](5.minutes).map { _ =>
           Stream.emit(LowLevelEvents(Chunk.empty, (), None))
         }
+      def lastLiveness: IO[FiniteDuration] = IO.realTime
     }
 
     // A processor which takes 5 seconds to process each batch
@@ -358,6 +364,29 @@
     TestControl.executeEmbed(io)
   }
 
+  def e12 = {
+
+    val config = EventProcessingConfig(EventProcessingConfig.NoWindowing)
+
+    val lowLevelSource = new LowLevelSource[IO, Unit] {
+      def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit)
+      def stream: Stream[IO, Stream[IO, LowLevelEvents[Unit]]] = Stream.emit(Stream.never[IO])
+      def lastLiveness: IO[FiniteDuration] = IO.realTime.map(_ - 10.seconds)
+    }
+
+    val io = for {
+      refProcessed <- Ref[IO].of[List[String]](Nil)
+      sourceAndAck <- LowLevelSource.toSourceAndAck(lowLevelSource)
+      processor = testProcessor(refProcessed)
+      fiber <- sourceAndAck.stream(config, processor).compile.drain.start
+      _ <- IO.sleep(5.minutes)
+      health <- sourceAndAck.isHealthy(5.seconds)
+      _ <- fiber.cancel
+    } yield health must beEqualTo(SourceAndAck.LaggingEventProcessor(10.seconds))
+
+    TestControl.executeEmbed(io)
+  }
+
   def containUniqueStrings: Matcher[Seq[String]] = { (items: Seq[String]) =>
     (items.toSet.size == items.size, s"$items contains non-unique values")
   }
@@ -434,6 +463,7 @@ object LowLevelSourceSpec {
           .drain
       }
     }
+    def lastLiveness: IO[FiniteDuration] = IO.realTime
   }
 
   def pureEvents: Stream[fs2.Pure, String] =
