diff --git a/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sources/kafka/KafkaSource.scala b/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sources/kafka/KafkaSource.scala
index 0e76894..dc9feef 100644
--- a/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sources/kafka/KafkaSource.scala
+++ b/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sources/kafka/KafkaSource.scala
@@ -20,6 +20,7 @@ import scala.reflect._
 
 import java.nio.ByteBuffer
 import java.time.Instant
+import scala.concurrent.duration.FiniteDuration
 
 // kafka
 import fs2.kafka._
@@ -49,6 +50,9 @@ object KafkaSource {
 
     def stream: Stream[F, Stream[F, LowLevelEvents[KafkaCheckpoints[F]]]] =
       kafkaStream(config, authHandlerClass)
+
+    def lastLiveness: F[FiniteDuration] =
+      Sync[F].realTime
  }
 
  case class OffsetAndCommit[F[_]](offset: Long, commit: F[Unit])
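Note on the Kafka change: `lastLiveness` here simply returns the current wall-clock time, so the Kafka source always reports itself as live; the genuine liveness tracking lands in the Kinesis source below (PubsubSource takes the same shortcut). A stand-alone sketch of what that means for the health check, using only cats-effect; the object name is illustrative, not from this patch:

```scala
import cats.effect.{IO, IOApp}
import scala.concurrent.duration.FiniteDuration

object AlwaysLiveDemo extends IOApp.Simple {
  // Same shape as KafkaSource.lastLiveness above: "now" as a duration since the unix epoch
  val lastLiveness: IO[FiniteDuration] = IO.realTime

  def run: IO[Unit] =
    for {
      liveness <- lastLiveness
      now      <- IO.realTime
      // The gap is near zero, so a check of the form
      // `now - lastLiveness > maxAllowedProcessingLatency` can never fire for this source
      _ <- IO.println(s"liveness gap: ${(now - liveness).toMillis} ms")
    } yield ()
}
```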
diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala
index 3daa90b..9fc9580 100644
--- a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala
+++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala
@@ -7,8 +7,9 @@
  */
 package com.snowplowanalytics.snowplow.sources.kinesis
 
-import cats._
-import cats.effect.{Async, Resource, Sync}
+import cats.{Applicative, Semigroup}
+import cats.effect.{Async, Ref, Resource, Sync}
+import cats.effect.implicits._
 import cats.implicits._
 import fs2.Stream
 import fs2.aws.kinesis.{CommittableRecord, Kinesis, KinesisConsumerSettings}
@@ -25,6 +26,7 @@ import software.amazon.kinesis.common.{InitialPositionInStream, InitialPositionInStreamExtended}
 import java.net.URI
 import java.util.Date
 import java.util.concurrent.Semaphore
+import scala.concurrent.duration.FiniteDuration
 
 // kinesis
 import software.amazon.kinesis.common.ConfigsBuilder
@@ -43,8 +45,10 @@ object KinesisSource {
 
   private implicit def logger[F[_]: Sync]: SelfAwareStructuredLogger[F] = Slf4jLogger.getLogger[F]
 
-  def build[F[_]: Parallel: Async](config: KinesisSourceConfig): F[SourceAndAck[F]] =
-    LowLevelSource.toSourceAndAck(lowLevel(config))
+  def build[F[_]: Async](config: KinesisSourceConfig): F[SourceAndAck[F]] =
+    Ref.ofEffect(Sync[F].realTime).flatMap { livenessRef =>
+      LowLevelSource.toSourceAndAck(lowLevel(config, livenessRef))
+    }
 
   private type KinesisCheckpointer[F[_]] = Checkpointer[F, Map[String, KinesisMetadata[F]]]
 
@@ -61,12 +65,18 @@ object KinesisSource {
     ack: F[Unit]
   )
 
-  private def lowLevel[F[_]: Parallel: Async](config: KinesisSourceConfig): LowLevelSource[F, Map[String, KinesisMetadata[F]]] =
+  private def lowLevel[F[_]: Async](
+    config: KinesisSourceConfig,
+    livenessRef: Ref[F, FiniteDuration]
+  ): LowLevelSource[F, Map[String, KinesisMetadata[F]]] =
     new LowLevelSource[F, Map[String, KinesisMetadata[F]]] {
       def checkpointer: KinesisCheckpointer[F] = kinesisCheckpointer[F]
 
       def stream: Stream[F, Stream[F, LowLevelEvents[Map[String, KinesisMetadata[F]]]]] =
-        Stream.emit(kinesisStream(config))
+        Stream.emit(kinesisStream(config, livenessRef))
+
+      def lastLiveness: F[FiniteDuration] =
+        livenessRef.get
     }
 
   private implicit def metadataSemigroup[F[_]]: Semigroup[KinesisMetadata[F]] = new Semigroup[KinesisMetadata[F]] {
@@ -74,7 +84,7 @@ object KinesisSource {
     if (x.sequenceNumber > y.sequenceNumber) x else y
   }
 
-  private def kinesisCheckpointer[F[_]: Parallel: Sync]: KinesisCheckpointer[F] = new KinesisCheckpointer[F] {
+  private def kinesisCheckpointer[F[_]: Async]: KinesisCheckpointer[F] = new KinesisCheckpointer[F] {
     def combine(x: Map[String, KinesisMetadata[F]], y: Map[String, KinesisMetadata[F]]): Map[String, KinesisMetadata[F]] =
       x |+| y
 
@@ -112,44 +122,36 @@ object KinesisSource {
     def nack(c: Map[String, KinesisMetadata[F]]): F[Unit] = Applicative[F].unit
   }
 
-  private def kinesisStream[F[_]: Async](config: KinesisSourceConfig): Stream[F, LowLevelEvents[Map[String, KinesisMetadata[F]]]] = {
-    val resources =
-      for {
-        region <- Resource.eval(Sync[F].delay((new DefaultAwsRegionProviderChain).getRegion))
-        consumerSettings <- Resource.pure[F, KinesisConsumerSettings](
-                              KinesisConsumerSettings(
-                                config.streamName,
-                                config.appName,
-                                region,
-                                bufferSize = config.bufferSize
-                              )
-                            )
-        kinesisClient <- mkKinesisClient[F](region, config.customEndpoint)
-        dynamoClient <- mkDynamoDbClient[F](region, config.dynamodbCustomEndpoint)
-        cloudWatchClient <- mkCloudWatchClient[F](region, config.cloudwatchCustomEndpoint)
-        kinesis <- Resource.pure[F, Kinesis[F]](
-                     Kinesis.create(scheduler(kinesisClient, dynamoClient, cloudWatchClient, config, _))
-                   )
-      } yield (consumerSettings, kinesis)
-
-    Stream
-      .resource(resources)
-      .flatMap { case (settings, kinesis) =>
-        kinesis.readFromKinesisStream(settings)
-      }
-      .chunks
-      .filter(_.nonEmpty)
-      .map { chunk =>
-        val ack = chunk.asSeq
-          .groupBy(_.shardId)
-          .map { case (k, records) =>
-            k -> records.maxBy(_.sequenceNumber).toMetadata[F]
-          }
-          .toMap
-        val earliestTstamp = chunk.iterator.map(_.record.approximateArrivalTimestamp).min
-        LowLevelEvents(chunk.map(_.record.data()), ack, Some(earliestTstamp))
-      }
-  }
+  private def kinesisStream[F[_]: Async](
+    config: KinesisSourceConfig,
+    livenessRef: Ref[F, FiniteDuration]
+  ): Stream[F, LowLevelEvents[Map[String, KinesisMetadata[F]]]] =
+    for {
+      region <- Stream.eval(Sync[F].delay((new DefaultAwsRegionProviderChain).getRegion))
+      consumerSettings = KinesisConsumerSettings(
+                           config.streamName,
+                           config.appName,
+                           region,
+                           bufferSize = config.bufferSize
+                         )
+      kinesisClient <- Stream.resource(mkKinesisClient[F](region, config.customEndpoint))
+      dynamoClient <- Stream.resource(mkDynamoDbClient[F](region, config.dynamodbCustomEndpoint))
+      cloudWatchClient <- Stream.resource(mkCloudWatchClient[F](region, config.cloudwatchCustomEndpoint))
+      kinesis = Kinesis.create(scheduler(kinesisClient, dynamoClient, cloudWatchClient, config, _))
+      chunk <- kinesis.readFromKinesisStream(consumerSettings).chunks
+      now <- Stream.eval(Sync[F].realTime)
+      _ <- Stream.eval(livenessRef.set(now))
+      if chunk.nonEmpty
+    } yield {
+      val ack = chunk.asSeq
+        .groupBy(_.shardId)
+        .map { case (k, records) =>
+          k -> records.maxBy(_.sequenceNumber).toMetadata[F]
+        }
+        .toMap
+      val earliestTstamp = chunk.iterator.map(_.record.approximateArrivalTimestamp).min
+      LowLevelEvents(chunk.map(_.record.data()), ack, Some(earliestTstamp))
+    }
 
   private def initialPositionOf(config: KinesisSourceConfig.InitialPosition): InitialPositionInStreamExtended =
     config match {
@@ -195,13 +197,18 @@
       configsBuilder.leaseManagementConfig
         .failoverTimeMillis(kinesisConfig.leaseDuration.toMillis)
 
+    // We ask to see empty batches, so that we can update the health check even when there are no records in the stream
+    val processorConfig =
+      configsBuilder.processorConfig
+        .callProcessRecordsEvenForEmptyRecordList(true)
+
     new Scheduler(
       configsBuilder.checkpointConfig,
       configsBuilder.coordinatorConfig,
       leaseManagementConfig,
      configsBuilder.lifecycleConfig,
      configsBuilder.metricsConfig.metricsLevel(MetricsLevel.NONE),
-      configsBuilder.processorConfig,
+      processorConfig,
      retrievalConfig
    )
  }
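The Kinesis source is the one place where liveness is genuinely tracked: `build` seeds a `Ref` with the current time, the stream bumps it on every chunk, and `lastLiveness` just reads it. Setting `callProcessRecordsEvenForEmptyRecordList(true)` matters here because the KCL then invokes the record processor even for empty batches, so a quiet-but-healthy stream keeps refreshing the timestamp and stays distinguishable from a stuck consumer. A minimal, self-contained sketch of the same `Ref` pattern, assuming cats-effect 3 and fs2; `heartbeats` is a hypothetical stand-in for `readFromKinesisStream(...).chunks`:

```scala
import cats.effect.{IO, IOApp, Ref}
import fs2.Stream
import scala.concurrent.duration._

object RefLivenessDemo extends IOApp.Simple {
  def run: IO[Unit] =
    for {
      livenessRef <- Ref.ofEffect(IO.realTime) // seeded like in `build` above
      // Stand-in for the Kinesis chunks: one (possibly empty) batch every 100ms,
      // each one refreshing the liveness timestamp
      heartbeats = Stream
                     .awakeEvery[IO](100.millis)
                     .evalTap(_ => IO.realTime.flatMap(livenessRef.set))
      _    <- heartbeats.take(3).compile.drain
      last <- livenessRef.get
      now  <- IO.realTime
      _    <- IO.println(s"last batch seen ${(now - last).toMillis} ms ago")
    } yield ()
}
```

Without the empty-batch setting, the `Ref` would only be bumped when records arrive, and a healthy consumer on an idle stream would eventually look identical to a wedged one.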
diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala
index 585ab33..03d906f 100644
--- a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala
+++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala
@@ -61,6 +61,9 @@ object PubsubSource {
 
     def stream: Stream[F, Stream[F, LowLevelEvents[Chunk[AckReplyConsumer]]]] =
       pubsubStream(config)
+
+    def lastLiveness: F[FiniteDuration] =
+      Sync[F].realTime
   }
 
   private def pubsubCheckpointer[F[_]: Async]: PubSubCheckpointer[F] = new PubSubCheckpointer[F] {
diff --git a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala
index 4d1f4be..eaed8d8 100644
--- a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala
+++ b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala
@@ -46,6 +46,13 @@ private[sources] trait LowLevelSource[F[_], C] {
    */
   def stream: Stream[F, Stream[F, LowLevelEvents[C]]]
 
+  /**
+   * The last time this source was known to be alive and healthy
+   *
+   * The returned value is a FiniteDuration since the unix epoch, i.e. the value returned by
+   * `Sync[F].realTime`
+   */
+  def lastLiveness: F[FiniteDuration]
 }
 
 private[sources] object LowLevelSource {
@@ -109,11 +116,13 @@
         }
 
       def isHealthy(maxAllowedProcessingLatency: FiniteDuration): F[SourceAndAck.HealthStatus] =
-        (isConnectedRef.get, latencyRef.get, Sync[F].realTime).mapN {
-          case (false, _, _) =>
+        (isConnectedRef.get, latencyRef.get, source.lastLiveness, Sync[F].realTime).mapN {
+          case (false, _, _, _) =>
             SourceAndAck.Disconnected
-          case (_, Some(lastPullTime), now) if now - lastPullTime > maxAllowedProcessingLatency =>
+          case (_, Some(lastPullTime), _, now) if now - lastPullTime > maxAllowedProcessingLatency =>
             SourceAndAck.LaggingEventProcessor(now - lastPullTime)
+          case (_, _, lastLiveness, now) if now - lastLiveness > maxAllowedProcessingLatency =>
+            SourceAndAck.LaggingEventProcessor(now - lastLiveness)
           case _ => SourceAndAck.Healthy
         }
     }
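With the extra tuple element, `isHealthy` now checks three things in order: connectedness, processing lag since the last pulled batch, and the new source-level liveness, with both lag branches reporting `LaggingEventProcessor`. The same decision logic extracted as a pure function, for illustration only; the real code reads the `Ref`s and `source.lastLiveness` inside `F`, and the standalone types below are hypothetical stand-ins for `SourceAndAck.HealthStatus`:

```scala
import scala.concurrent.duration.FiniteDuration

object HealthDecisionDemo {
  // Hypothetical stand-ins for SourceAndAck.HealthStatus and its cases
  sealed trait HealthStatus
  case object Disconnected extends HealthStatus
  final case class LaggingEventProcessor(lag: FiniteDuration) extends HealthStatus
  case object Healthy extends HealthStatus

  // Mirrors the pattern match in `isHealthy` above, minus the F[_] plumbing
  def healthOf(
    isConnected: Boolean,
    lastPullTime: Option[FiniteDuration], // when the processor last pulled a batch
    lastLiveness: FiniteDuration,         // when the source last proved it was alive
    now: FiniteDuration,
    maxAllowedProcessingLatency: FiniteDuration
  ): HealthStatus =
    if (!isConnected) Disconnected
    else lastPullTime match {
      case Some(t) if now - t > maxAllowedProcessingLatency =>
        LaggingEventProcessor(now - t)
      case _ if now - lastLiveness > maxAllowedProcessingLatency =>
        LaggingEventProcessor(now - lastLiveness)
      case _ =>
        Healthy
    }
}
```

For example, a connected source with no pulls yet and a liveness timestamp 10 seconds old, checked against a 5-second allowance, yields `LaggingEventProcessor` with a 10-second lag, which is exactly the scenario the new `e12` test below exercises.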
diff --git a/modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSourceSpec.scala b/modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSourceSpec.scala
index 399a6e4..f99d398 100644
--- a/modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSourceSpec.scala
+++ b/modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSourceSpec.scala
@@ -15,7 +15,7 @@ import fs2.{Chunk, Stream}
 import org.specs2.Specification
 import org.specs2.matcher.Matcher
 
-import scala.concurrent.duration.{Duration, DurationInt}
+import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration}
 import java.nio.charset.StandardCharsets
 
 import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, SourceAndAck, TokenedEvents}
@@ -43,6 +43,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
     report healthy if events are processed but not yet acked (e.g. a batch-oriented loader) $e9
     report healthy after all events have been processed and acked $e10
     report disconnected while source is in between two active streams of events (e.g. during kafka rebalance) $e11
+    report unhealthy if the underlying low level source is lagging $e12
   """
 
   def e1 = {
@@ -228,6 +229,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
     val lowLevelSource = new LowLevelSource[IO, Unit] {
       def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit)
       def stream: Stream[IO, Stream[IO, LowLevelEvents[Unit]]] = Stream.emit(Stream.never[IO])
+      def lastLiveness: IO[FiniteDuration] = IO.realTime
     }
 
     val io = for {
@@ -253,6 +255,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
       def stream: Stream[IO, Stream[IO, LowLevelEvents[Unit]]] = Stream.emit {
         Stream.emit(LowLevelEvents(Chunk.empty, (), None)).repeat
       }
+      def lastLiveness: IO[FiniteDuration] = IO.realTime
     }
 
     // A processor which takes 1 hour to process each batch
@@ -282,6 +285,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
       def stream: Stream[IO, Stream[IO, LowLevelEvents[Unit]]] = Stream.emit {
         Stream.emit(LowLevelEvents(Chunk.empty, (), None)).repeat
       }
+      def lastLiveness: IO[FiniteDuration] = IO.realTime
     }
 
     // A processor which takes 1 minute to process each batch, but does not emit the token (i.e. does not ack the batch)
@@ -309,6 +313,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
      def stream: Stream[IO, Stream[IO, LowLevelEvents[Unit]]] = Stream.emit {
        Stream.emit(LowLevelEvents(Chunk.empty, (), None)) ++ Stream.never[IO]
      }
+      def lastLiveness: IO[FiniteDuration] = IO.realTime
     }
 
     // A processor which takes 1 minute to process each batch
@@ -339,6 +344,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
       Stream.fixedDelay[IO](5.minutes).map { _ =>
         Stream.emit(LowLevelEvents(Chunk.empty, (), None))
       }
+      def lastLiveness: IO[FiniteDuration] = IO.realTime
     }
 
     // A processor which takes 5 seconds to process each batch
@@ -358,6 +364,29 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
     TestControl.executeEmbed(io)
   }
 
+  def e12 = {
+
+    val config = EventProcessingConfig(EventProcessingConfig.NoWindowing)
+
+    val lowLevelSource = new LowLevelSource[IO, Unit] {
+      def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit)
+      def stream: Stream[IO, Stream[IO, LowLevelEvents[Unit]]] = Stream.emit(Stream.never[IO])
+      def lastLiveness: IO[FiniteDuration] = IO.realTime.map(_ - 10.seconds)
+    }
+
+    val io = for {
+      refProcessed <- Ref[IO].of[List[String]](Nil)
+      sourceAndAck <- LowLevelSource.toSourceAndAck(lowLevelSource)
+      processor = testProcessor(refProcessed)
+      fiber <- sourceAndAck.stream(config, processor).compile.drain.start
+      _ <- IO.sleep(5.minutes)
+      health <- sourceAndAck.isHealthy(5.seconds)
+      _ <- fiber.cancel
+    } yield health must beEqualTo(SourceAndAck.LaggingEventProcessor(10.seconds))
+
+    TestControl.executeEmbed(io)
+  }
+
   def containUniqueStrings: Matcher[Seq[String]] = { (items: Seq[String]) =>
     (items.toSet.size == items.size, s"$items contains non-unique values")
   }
@@ -434,6 +463,7 @@ object LowLevelSourceSpec {
           .drain
       }
     }
+    def lastLiveness: IO[FiniteDuration] = IO.realTime
   }
 
   def pureEvents: Stream[fs2.Pure, String] =
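The new `e12` pins `lastLiveness` ten seconds in the past while the allowed latency is five seconds, so the liveness branch must fire with exactly `LaggingEventProcessor(10.seconds)`. `TestControl.executeEmbed` keeps this deterministic: the five-minute sleep advances a simulated clock instead of real time. A tiny stand-alone illustration of that mocked clock, assuming cats-effect-testkit is on the classpath; names are illustrative:

```scala
import cats.effect.IO
import cats.effect.testkit.TestControl
import scala.concurrent.duration._

object TestControlDemo {
  // Under TestControl, IO.sleep advances the simulated clock instantly, and
  // IO.realTime reads that same simulated clock
  val measured: IO[FiniteDuration] =
    for {
      start <- IO.realTime
      _     <- IO.sleep(5.minutes)
      end   <- IO.realTime
    } yield end - start

  // Completes almost immediately in wall-clock terms, yet yields 5.minutes
  val check: IO[FiniteDuration] = TestControl.executeEmbed(measured)
}
```

This is why the test can assert an exact lag of `10.seconds` rather than a fuzzy lower bound: under the simulated clock, `IO.realTime.map(_ - 10.seconds)` and the health check's `now` are read at the same instant.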