Kinesis source must report its latency when KCL is not healthy (#77)
The KCL is sadly not very good at crashing and exiting. If the
underlying Kinesis client hits errors (e.g. permissions errors), the KCL
tends to stay alive without propagating the exceptions to our
application code. We want the app to crash in these circumstances,
because a crash is what triggers an alert.

common-streams already has a health check feature, in which a health
probe reports unhealthy if a single event gets stuck without making
progress.

This PR leans on that existing health check feature, so the probe also
reports unhealthy if the Kinesis client is not regularly receiving
healthy responses.

I configured KCL to invoke our record processor every time it polls for
records, even if the batch is empty. This means the health check keeps
working even when there are no events in the stream.
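
Concretely, the polling side fits together as sketched here, condensed
from kinesisStream in the diff below. pollLoop is a hypothetical name;
the Kinesis types come from fs2-aws as in the real code.

    import cats.effect.{Ref, Sync}
    import fs2.{Chunk, Stream}
    import fs2.aws.kinesis.{CommittableRecord, Kinesis, KinesisConsumerSettings}
    import scala.concurrent.duration.FiniteDuration

    // KCL is configured with callProcessRecordsEvenForEmptyRecordList(true),
    // so every poll yields a chunk, possibly empty. Each chunk refreshes the
    // liveness timestamp; empty chunks are then filtered out before reaching
    // the downstream processor.
    def pollLoop[F[_]: Sync](
      kinesis: Kinesis[F],
      settings: KinesisConsumerSettings,
      livenessRef: Ref[F, FiniteDuration]
    ): Stream[F, Chunk[CommittableRecord]] =
      for {
        chunk <- kinesis.readChunkedFromKinesisStream(settings)
        now <- Stream.eval(Sync[F].realTime)
        _ <- Stream.eval(livenessRef.set(now))
        if chunk.nonEmpty
      } yield chunk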
istreeter authored and spenes committed May 29, 2024
1 parent 9641083 commit be1fdb8
Showing 6 changed files with 107 additions and 49 deletions.
@@ -20,6 +20,7 @@ import scala.reflect._
 
 import java.nio.ByteBuffer
 import java.time.Instant
+import scala.concurrent.duration.FiniteDuration
 
 // kafka
 import fs2.kafka._
@@ -49,6 +50,9 @@ object KafkaSource {
 
     def stream: Stream[F, Stream[F, LowLevelEvents[KafkaCheckpoints[F]]]] =
       kafkaStream(config, authHandlerClass)
+
+    def lastLiveness: F[FiniteDuration] =
+      Sync[F].realTime
   }
 
 case class OffsetAndCommit[F[_]](offset: Long, commit: F[Unit])
@@ -7,8 +7,9 @@
  */
 package com.snowplowanalytics.snowplow.sources.kinesis
 
-import cats._
-import cats.effect.{Async, Resource, Sync}
+import cats.{Applicative, Semigroup}
+import cats.effect.{Async, Ref, Resource, Sync}
+import cats.effect.implicits._
 import cats.implicits._
 import fs2.Stream
 import fs2.aws.kinesis.{CommittableRecord, Kinesis, KinesisConsumerSettings}
@@ -25,6 +26,7 @@ import software.amazon.kinesis.common.{InitialPositionInStream, InitialPositionI
 import java.net.URI
 import java.util.Date
 import java.util.concurrent.Semaphore
+import scala.concurrent.duration.FiniteDuration
 
 // kinesis
 import software.amazon.kinesis.common.ConfigsBuilder
@@ -43,8 +45,10 @@ object KinesisSource {
 
   private implicit def logger[F[_]: Sync]: SelfAwareStructuredLogger[F] = Slf4jLogger.getLogger[F]
 
-  def build[F[_]: Parallel: Async](config: KinesisSourceConfig): F[SourceAndAck[F]] =
-    LowLevelSource.toSourceAndAck(lowLevel(config))
+  def build[F[_]: Async](config: KinesisSourceConfig): F[SourceAndAck[F]] =
+    Ref.ofEffect(Sync[F].realTime).flatMap { livenessRef =>
+      LowLevelSource.toSourceAndAck(lowLevel(config, livenessRef))
+    }
 
   private type KinesisCheckpointer[F[_]] = Checkpointer[F, Map[String, KinesisMetadata[F]]]
 
@@ -61,20 +65,26 @@ object KinesisSource {
     ack: F[Unit]
   )
 
-  private def lowLevel[F[_]: Parallel: Async](config: KinesisSourceConfig): LowLevelSource[F, Map[String, KinesisMetadata[F]]] =
+  private def lowLevel[F[_]: Async](
+    config: KinesisSourceConfig,
+    livenessRef: Ref[F, FiniteDuration]
+  ): LowLevelSource[F, Map[String, KinesisMetadata[F]]] =
     new LowLevelSource[F, Map[String, KinesisMetadata[F]]] {
       def checkpointer: KinesisCheckpointer[F] = kinesisCheckpointer[F]
 
       def stream: Stream[F, Stream[F, LowLevelEvents[Map[String, KinesisMetadata[F]]]]] =
-        Stream.emit(kinesisStream(config))
+        Stream.emit(kinesisStream(config, livenessRef))
+
+      def lastLiveness: F[FiniteDuration] =
+        livenessRef.get
     }
 
   private implicit def metadataSemigroup[F[_]]: Semigroup[KinesisMetadata[F]] = new Semigroup[KinesisMetadata[F]] {
     override def combine(x: KinesisMetadata[F], y: KinesisMetadata[F]): KinesisMetadata[F] =
       if (x.sequenceNumber > y.sequenceNumber) x else y
   }
 
-  private def kinesisCheckpointer[F[_]: Parallel: Sync]: KinesisCheckpointer[F] = new KinesisCheckpointer[F] {
+  private def kinesisCheckpointer[F[_]: Async]: KinesisCheckpointer[F] = new KinesisCheckpointer[F] {
     def combine(x: Map[String, KinesisMetadata[F]], y: Map[String, KinesisMetadata[F]]): Map[String, KinesisMetadata[F]] =
       x |+| y
 
@@ -112,44 +122,36 @@ object KinesisSource {
     def nack(c: Map[String, KinesisMetadata[F]]): F[Unit] = Applicative[F].unit
   }
 
-  private def kinesisStream[F[_]: Async](config: KinesisSourceConfig): Stream[F, LowLevelEvents[Map[String, KinesisMetadata[F]]]] = {
-    val resources =
-      for {
-        region <- Resource.eval(Sync[F].delay((new DefaultAwsRegionProviderChain).getRegion))
-        consumerSettings <- Resource.pure[F, KinesisConsumerSettings](
-          KinesisConsumerSettings(
-            config.streamName,
-            config.appName,
-            region,
-            bufferSize = config.bufferSize
-          )
-        )
-        kinesisClient <- mkKinesisClient[F](region, config.customEndpoint)
-        dynamoClient <- mkDynamoDbClient[F](region, config.dynamodbCustomEndpoint)
-        cloudWatchClient <- mkCloudWatchClient[F](region, config.cloudwatchCustomEndpoint)
-        kinesis <- Resource.pure[F, Kinesis[F]](
-          Kinesis.create(scheduler(kinesisClient, dynamoClient, cloudWatchClient, config, _))
-        )
-      } yield (consumerSettings, kinesis)
-
-    Stream
-      .resource(resources)
-      .flatMap { case (settings, kinesis) =>
-        kinesis.readFromKinesisStream(settings)
-      }
-      .chunks
-      .filter(_.nonEmpty)
-      .map { chunk =>
-        val ack = chunk.asSeq
-          .groupBy(_.shardId)
-          .map { case (k, records) =>
-            k -> records.maxBy(_.sequenceNumber).toMetadata[F]
-          }
-          .toMap
-        val earliestTstamp = chunk.iterator.map(_.record.approximateArrivalTimestamp).min
-        LowLevelEvents(chunk.map(_.record.data()), ack, Some(earliestTstamp))
-      }
-  }
+  private def kinesisStream[F[_]: Async](
+    config: KinesisSourceConfig,
+    livenessRef: Ref[F, FiniteDuration]
+  ): Stream[F, LowLevelEvents[Map[String, KinesisMetadata[F]]]] =
+    for {
+      region <- Stream.eval(Sync[F].delay((new DefaultAwsRegionProviderChain).getRegion))
+      consumerSettings = KinesisConsumerSettings(
+        config.streamName,
+        config.appName,
+        region,
+        bufferSize = config.bufferSize
+      )
+      kinesisClient <- Stream.resource(mkKinesisClient[F](region, config.customEndpoint))
+      dynamoClient <- Stream.resource(mkDynamoDbClient[F](region, config.dynamodbCustomEndpoint))
+      cloudWatchClient <- Stream.resource(mkCloudWatchClient[F](region, config.cloudwatchCustomEndpoint))
+      kinesis = Kinesis.create(scheduler(kinesisClient, dynamoClient, cloudWatchClient, config, _))
+      chunk <- kinesis.readChunkedFromKinesisStream(consumerSettings)
+      now <- Stream.eval(Sync[F].realTime)
+      _ <- Stream.eval(livenessRef.set(now))
+      if chunk.nonEmpty
+    } yield {
+      val ack = chunk.asSeq
+        .groupBy(_.shardId)
+        .map { case (k, records) =>
+          k -> records.maxBy(_.sequenceNumber).toMetadata[F]
+        }
+        .toMap
+      val earliestTstamp = chunk.iterator.map(_.record.approximateArrivalTimestamp).min
+      LowLevelEvents(chunk.map(_.record.data()), ack, Some(earliestTstamp))
+    }
 
   private def initialPositionOf(config: KinesisSourceConfig.InitialPosition): InitialPositionInStreamExtended =
     config match {
@@ -195,13 +197,18 @@
       configsBuilder.leaseManagementConfig
         .failoverTimeMillis(kinesisConfig.leaseDuration.toMillis)
 
+    // We ask to see empty batches, so that we can update the health check even when there are no records in the stream
+    val processorConfig =
+      configsBuilder.processorConfig
+        .callProcessRecordsEvenForEmptyRecordList(true)
+
     new Scheduler(
       configsBuilder.checkpointConfig,
       configsBuilder.coordinatorConfig,
       leaseManagementConfig,
       configsBuilder.lifecycleConfig,
       configsBuilder.metricsConfig.metricsLevel(MetricsLevel.NONE),
-      configsBuilder.processorConfig,
+      processorConfig,
       retrievalConfig
     )
   }
@@ -61,6 +61,9 @@ object PubsubSource {
 
       def stream: Stream[F, Stream[F, LowLevelEvents[Chunk[AckReplyConsumer]]]] =
         pubsubStream(config)
+
+      def lastLiveness: F[FiniteDuration] =
+        Sync[F].realTime
     }
 
   private def pubsubCheckpointer[F[_]: Async]: PubSubCheckpointer[F] = new PubSubCheckpointer[F] {
@@ -73,8 +73,17 @@ object SourceAndAck {
      */
     case class LaggingEventProcessor(latency: FiniteDuration) extends Unhealthy
 
+    /**
+     * The health status expected if the source of events has been inactive for some time
+     *
+     * @param duration
+     *   How long the source of events has been inactive
+     */
+    case class InactiveSource(duration: FiniteDuration) extends Unhealthy
+
     implicit def showUnhealthy: Show[Unhealthy] = Show {
       case Disconnected => "No connection to a source of events"
       case LaggingEventProcessor(latency) => show"Processing latency is $latency"
+      case InactiveSource(duration) => show"Source of events has been inactive for $duration"
     }
   }
@@ -46,6 +46,13 @@ private[sources] trait LowLevelSource[F[_], C] {
    */
   def stream: Stream[F, Stream[F, LowLevelEvents[C]]]
 
+  /**
+   * The last time this source was known to be alive and healthy
+   *
+   * The returned value is a FiniteDuration since the unix epoch, i.e. the value returned by
+   * `Sync[F].realTime`
+   */
+  def lastLiveness: F[FiniteDuration]
 }
 
 private[sources] object LowLevelSource {
@@ -109,11 +116,13 @@ private[sources] object LowLevelSource {
       }
 
       def isHealthy(maxAllowedProcessingLatency: FiniteDuration): F[SourceAndAck.HealthStatus] =
-        (isConnectedRef.get, latencyRef.get, Sync[F].realTime).mapN {
-          case (false, _, _) =>
+        (isConnectedRef.get, latencyRef.get, source.lastLiveness, Sync[F].realTime).mapN {
+          case (false, _, _, _) =>
             SourceAndAck.Disconnected
-          case (_, Some(lastPullTime), now) if now - lastPullTime > maxAllowedProcessingLatency =>
+          case (_, Some(lastPullTime), _, now) if now - lastPullTime > maxAllowedProcessingLatency =>
             SourceAndAck.LaggingEventProcessor(now - lastPullTime)
+          case (_, _, lastLiveness, now) if now - lastLiveness > maxAllowedProcessingLatency =>
+            SourceAndAck.InactiveSource(now - lastLiveness)
           case _ => SourceAndAck.Healthy
         }
     }
@@ -47,6 +47,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
     report healthy if events are processed but not yet acked (e.g. a batch-oriented loader) $health3
     report healthy after all events have been processed and acked $health4
     report disconnected while source is in between two active streams of events (e.g. during kafka rebalance) $health5
+    report unhealthy if the underlying low level source is lagging $health6
   """
 
   def e1 = {
@@ -501,6 +502,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
     val lowLevelSource = new LowLevelSource[IO, Unit] {
       def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit)
       def stream: Stream[IO, Stream[IO, LowLevelEvents[Unit]]] = Stream.emit(Stream.never[IO])
+      def lastLiveness: IO[FiniteDuration] = IO.realTime
     }
 
     val io = for {
@@ -599,6 +601,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
         Stream.fixedDelay[IO](5.minutes).map { _ =>
           Stream.emit(LowLevelEvents(Chunk.empty, (), None))
         }
+      def lastLiveness: IO[FiniteDuration] = IO.realTime
     }
 
     val io = for {
@@ -614,6 +617,28 @@
     TestControl.executeEmbed(io)
   }
 
+  def health6 = {
+
+    val config = EventProcessingConfig(EventProcessingConfig.NoWindowing)
+
+    val lowLevelSource = new LowLevelSource[IO, Unit] {
+      def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit)
+      def stream: Stream[IO, Stream[IO, LowLevelEvents[Unit]]] = Stream.emit(Stream.never[IO])
+      def lastLiveness: IO[FiniteDuration] = IO.realTime.map(_ - 10.seconds)
+    }
+
+    val io = for {
+      refProcessed <- Ref[IO].of(Vector.empty[Action])
+      sourceAndAck <- LowLevelSource.toSourceAndAck(lowLevelSource)
+      processor = testProcessor(refProcessed, TestSourceConfig(1, 1, 1.second, 1.second))
+      fiber <- sourceAndAck.stream(config, processor).compile.drain.start
+      _ <- IO.sleep(5.minutes)
+      health <- sourceAndAck.isHealthy(5.seconds)
+      _ <- fiber.cancel
+    } yield health must beEqualTo(SourceAndAck.InactiveSource(10.seconds))
+
+    TestControl.executeEmbed(io)
+  }
 }
 
 object LowLevelSourceSpec {
@@ -721,6 +746,7 @@ object LowLevelSourceSpec {
             .repeatN(config.batchesPerRebalance.toLong)
         }
       }
+      def lastLiveness: IO[FiniteDuration] = IO.realTime
     }
 
   }
