diff --git a/build.sbt b/build.sbt
index b128aff3..1fac4688 100755
--- a/build.sbt
+++ b/build.sbt
@@ -33,14 +33,14 @@ lazy val azure: Project = project
   .in(file("modules/azure"))
   .settings(BuildSettings.azureSettings)
   .settings(libraryDependencies ++= Dependencies.azureDependencies ++ Dependencies.icebergDeltaRuntimeDependencies)
-  .dependsOn(core)
+  .dependsOn(core, deltaIceberg)
   .enablePlugins(BuildInfoPlugin, JavaAppPackaging, SnowplowDockerPlugin)

 lazy val gcp: Project = project
   .in(file("modules/gcp"))
   .settings(BuildSettings.gcpSettings)
   .settings(libraryDependencies ++= Dependencies.gcpDependencies ++ Dependencies.icebergDeltaRuntimeDependencies)
-  .dependsOn(core)
+  .dependsOn(core, deltaIceberg)
   .enablePlugins(BuildInfoPlugin, JavaAppPackaging, SnowplowDockerPlugin)

 lazy val aws: Project = project
@@ -50,7 +50,7 @@ lazy val aws: Project = project
   .dependsOn(core, deltaIceberg)
   .enablePlugins(BuildInfoPlugin, JavaAppPackaging, SnowplowDockerPlugin)

-/** Packaging: Extra runtime dependencies for alternative assets * */
+/** Packaging: Extra runtime dependencies for alternative assets */

 lazy val hudi: Project = project
   .in(file("packaging/hudi"))
@@ -94,7 +94,7 @@ lazy val gcpHudi: Project = project
   .settings(libraryDependencies ++= Dependencies.gcpDependencies)
   .dependsOn(core)
   .enablePlugins(BuildInfoPlugin, JavaAppPackaging, SnowplowDockerPlugin)
-  .dependsOn(hudi % "runtime->runtime")
+  .dependsOn(hudi % "runtime->runtime;compile->compile")

 lazy val azureHudi: Project = project
   .in(file("modules/azure"))
@@ -104,7 +104,7 @@ lazy val azureHudi: Project = project
   .settings(libraryDependencies ++= Dependencies.azureDependencies)
   .dependsOn(core)
   .enablePlugins(BuildInfoPlugin, JavaAppPackaging, SnowplowDockerPlugin)
-  .dependsOn(hudi % "runtime->runtime")
+  .dependsOn(hudi % "runtime->runtime;compile->compile")

 lazy val gcpBiglake: Project = gcp
   .withId("gcpBiglake")
diff --git a/config/config.aws.reference.hocon b/config/config.aws.reference.hocon
index f75409ea..a51e26bd 100644
--- a/config/config.aws.reference.hocon
+++ b/config/config.aws.reference.hocon
@@ -249,7 +249,7 @@
       }
     }

-    # -- Report alerts to the webhook
+    # -- Report alerts and heartbeats to the webhook
     "webhook": {
       # An actual HTTP endpoint
       "endpoint": "https://webhook.acme.com",
@@ -257,6 +257,8 @@
       "tags": {
         "pipeline": "production"
       }
+      # How often to send the heartbeat event
+      "heartbeat": "60.minutes"
     }

     # -- Open a HTTP server that returns OK only if the app is healthy
diff --git a/config/config.azure.reference.hocon b/config/config.azure.reference.hocon
index 50bd3f1a..5aa4de11 100644
--- a/config/config.azure.reference.hocon
+++ b/config/config.azure.reference.hocon
@@ -220,7 +220,7 @@
       }
     }

-    # -- Report alerts to the webhook
+    # -- Report alerts and heartbeats to the webhook
     "webhook": {
       # An actual HTTP endpoint
       "endpoint": "https://webhook.acme.com",
@@ -228,6 +228,8 @@
       "tags": {
         "pipeline": "production"
       }
+      # How often to send the heartbeat event
+      "heartbeat": "60.minutes"
     }

     # -- Open a HTTP server that returns OK only if the app is healthy
diff --git a/config/config.gcp.reference.hocon b/config/config.gcp.reference.hocon
index 745e3aab..86474849 100644
--- a/config/config.gcp.reference.hocon
+++ b/config/config.gcp.reference.hocon
@@ -228,7 +228,7 @@
       }
     }

-    # -- Report alerts to the webhook
+    # -- Report alerts and heartbeats to the webhook
     "webhook": {
       # An actual HTTP endpoint
       "endpoint": "https://webhook.acme.com",
@@ -236,6 +236,8 @@
       "tags": {
         "pipeline": "production"
       }
+      # How often to send the heartbeat event
+      "heartbeat": "60.minutes"
     }

     # -- Open a HTTP server that returns OK only if the app is healthy
diff --git a/modules/aws/src/main/scala/com.snowplowanalytics.snowplow.lakes/AwsApp.scala b/modules/aws/src/main/scala/com.snowplowanalytics.snowplow.lakes/AwsApp.scala
index 600c74ed..fa3a4f89 100644
--- a/modules/aws/src/main/scala/com.snowplowanalytics.snowplow.lakes/AwsApp.scala
+++ b/modules/aws/src/main/scala/com.snowplowanalytics.snowplow.lakes/AwsApp.scala
@@ -41,27 +41,27 @@ object AwsApp extends LoaderApp[KinesisSourceConfig, KinesisSinkConfig](BuildInf
    */
   override def isDestinationSetupError: DestinationSetupErrorCheck = {

-    /** Exceptions raised by underlying AWS SDK * */
+    // Exceptions raised by underlying AWS SDK
     case _: NoSuchBucketException =>
       // S3 bucket does not exist
-      Some("S3 bucket does not exist or we do not have permissions to see it exists")
+      "S3 bucket does not exist or we do not have permissions to see it exists"
     case e: S3Exception if e.statusCode() === 403 =>
       // No permission to read from S3 bucket or to write to S3 bucket
-      Some("Missing permissions to perform this action on S3 bucket")
+      "Missing permissions to perform this action on S3 bucket"
     case e: S3Exception if e.statusCode() === 301 =>
       // Misconfigured AWS region
-      Some("S3 bucket is not in the expected region")
+      "S3 bucket is not in the expected region"
     case e: GlueAccessDeniedException =>
       // No permission to read from Glue catalog
-      Some(Option(e.getMessage).getOrElse("Missing permissions to perform this action on Glue catalog"))
+      Option(e.getMessage).getOrElse("Missing permissions to perform this action on Glue catalog")
     case _: GlueEntityNotFoundException =>
       // Glue database does not exist
-      Some("Glue resource does not exist or no permission to see it exists")
+      "Glue resource does not exist or no permission to see it exists"
     case e: StsException if e.statusCode() === 403 =>
       // No permission to assume the role given to authenticate to S3/Glue
-      Some("Missing permissions to assume the AWS IAM role")
+      "Missing permissions to assume the AWS IAM role"

-    /** Exceptions raised via hadoop's s3a filesystem * */
+    // Exceptions raised via hadoop's s3a filesystem
     case e: UnknownStoreException =>
       // S3 bucket does not exist or no permission to see it exists
       stripCauseDetails(e)
@@ -70,10 +70,11 @@ object AwsApp extends LoaderApp[KinesisSourceConfig, KinesisSinkConfig](BuildInf
       // 2 - No permission to assume the role given to authenticate to S3
       stripCauseDetails(e)
     case _: CredentialInitializationException =>
-      Some("Failed to initialize AWS access credentials")
+      "Failed to initialize AWS access credentials"

-    /** Exceptions common to the table format - Delta/Iceberg/Hudi * */
-    case t => TableFormatSetupError.check(t)
+    // Exceptions common to the table format - Delta/Iceberg/Hudi
+    case TableFormatSetupError.check(t) =>
+      t
   }

   /**
@@ -86,13 +87,12 @@ object AwsApp extends LoaderApp[KinesisSourceConfig, KinesisSinkConfig](BuildInf
    * In order to have better control of the message sent to the webhook, we remove the cause details
    * here, and add back in pertinent cause information later.
    */
-  private def stripCauseDetails(t: Throwable): Option[String] =
-    (Option(t.getMessage), Option(t.getCause)) match {
-      case (Some(message), Some(cause)) =>
+  private def stripCauseDetails(t: Throwable): String =
+    Option(t.getCause) match {
+      case Some(cause) =>
         val toRemove = new Regex(":? *" + Regex.quote(cause.toString) + ".*")
-        val replaced = toRemove.replaceAllIn(message, "")
-        Some(replaced)
-      case (other, _) =>
-        other
+        toRemove.replaceAllIn(t.getMessage, "")
+      case None =>
+        t.getMessage
     }
 }
diff --git a/modules/azure/src/main/scala/com.snowplowanalytics.snowplow.lakes/AzureApp.scala b/modules/azure/src/main/scala/com.snowplowanalytics.snowplow.lakes/AzureApp.scala
index cf154bc9..56f83089 100644
--- a/modules/azure/src/main/scala/com.snowplowanalytics.snowplow.lakes/AzureApp.scala
+++ b/modules/azure/src/main/scala/com.snowplowanalytics.snowplow.lakes/AzureApp.scala
@@ -35,5 +35,5 @@ object AzureApp extends LoaderApp[KafkaSourceConfig, KafkaSinkConfig](BuildInfo)

   override def badSink: SinkProvider = KafkaSink.resource(_, classTag[SinkAuthHandler])

-  override def isDestinationSetupError: DestinationSetupErrorCheck = _ => None
+  override def isDestinationSetupError: DestinationSetupErrorCheck = TableFormatSetupError.check
 }
diff --git a/modules/core/src/main/resources/reference.conf b/modules/core/src/main/resources/reference.conf
index 1c9a27ad..33eea99c 100644
--- a/modules/core/src/main/resources/reference.conf
+++ b/modules/core/src/main/resources/reference.conf
@@ -147,6 +147,7 @@
       "prefix": "snowplow.lakeloader"
     }
   }
+  "webhook": ${snowplow.defaults.webhook}
   "sentry": {
     "tags": {
     }
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Alert.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Alert.scala
index f5cd07a4..09a8adc9 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Alert.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Alert.scala
@@ -13,58 +13,14 @@ package com.snowplowanalytics.snowplow.lakes

 import cats.Show
 import cats.implicits.showInterpolator
-import com.snowplowanalytics.iglu.core.circe.implicits.igluNormalizeDataJson
-import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
-import com.snowplowanalytics.snowplow.runtime.AppInfo
-
-import io.circe.Json
-import io.circe.syntax.EncoderOps
+import com.snowplowanalytics.snowplow.runtime.SetupExceptionMessages

 sealed trait Alert

 object Alert {

-  /** Restrict the length of an alert message to be compliant with alert iglu schema */
-  private val MaxAlertPayloadLength = 4096
-
-  final case class FailedToCreateEventsTable(causes: List[String]) extends Alert
-  final case class FailedToCommitEvents(causes: List[String]) extends Alert
-
-  def toSelfDescribingJson(
-    alert: Alert,
-    appInfo: AppInfo,
-    tags: Map[String, String]
-  ): Json =
-    SelfDescribingData(
-      schema = SchemaKey("com.snowplowanalytics.monitoring.loader", "alert", "jsonschema", SchemaVer.Full(1, 0, 0)),
-      data = Json.obj(
-        "appName" -> appInfo.name.asJson,
-        "appVersion" -> appInfo.version.asJson,
-        "message" -> getMessage(alert).asJson,
-        "tags" -> tags.asJson
-      )
-    ).normalize
-
-  private def getMessage(alert: Alert): String = {
-    val full = alert match {
-      case FailedToCreateEventsTable(causes) => show"Failed to create events table: $causes"
-      case FailedToCommitEvents(causes) => show"Failed to write events into table: $causes"
-    }
-
-    full.take(MaxAlertPayloadLength)
-  }
-
-  private implicit def causesShow: Show[List[String]] = {
-    def removeDuplicateMessages(in: List[String]): List[String] =
-      in match {
-        case h :: t :: rest =>
-          if (h.contains(t)) removeDuplicateMessages(h :: rest)
-          else if (t.contains(h)) removeDuplicateMessages(t :: rest)
-          else h :: removeDuplicateMessages(t :: rest)
-        case fewer => fewer
-      }
+  final case class FailedToCreateEventsTable(causes: SetupExceptionMessages) extends Alert

-    Show.show { causes =>
-      removeDuplicateMessages(causes).mkString(": ")
-    }
+  implicit def showAlert: Show[Alert] = Show[Alert] { case FailedToCreateEventsTable(causes) =>
+    show"Failed to create events table: $causes"
   }
 }
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/AppHealth.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/AppHealth.scala
deleted file mode 100644
index 7eb290b2..00000000
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/AppHealth.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Copyright (c) 2013-present Snowplow Analytics Ltd. All rights reserved.
- *
- * This software is made available by Snowplow Analytics, Ltd., under the terms of the Snowplow
- * Limited Use License Agreement, Version 1.0 located at
- * https://docs.snowplow.io/limited-use-license-1.0 BY INSTALLING, DOWNLOADING, ACCESSING, USING OR
- * DISTRIBUTING ANY PORTION OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
- */
-package com.snowplowanalytics.snowplow.lakes
-
-import cats.effect.{Concurrent, Ref}
-import cats.implicits._
-import cats.{Monad, Monoid, Show}
-import com.snowplowanalytics.snowplow.runtime.HealthProbe
-import com.snowplowanalytics.snowplow.runtime.HealthProbe.{Healthy, Unhealthy}
-import com.snowplowanalytics.snowplow.sources.SourceAndAck
-
-import scala.concurrent.duration.FiniteDuration
-
-final class AppHealth[F[_]: Monad](
-  unhealthyLatency: FiniteDuration,
-  source: SourceAndAck[F],
-  appManagedServices: Ref[F, Map[AppHealth.Service, Boolean]]
-) {
-
-  def status: F[HealthProbe.Status] =
-    for {
-      sourceHealth <- getSourceHealth
-      servicesHealth <- getAppManagedServicesHealth
-    } yield (sourceHealth :: servicesHealth).combineAll
-
-  def setServiceHealth(service: AppHealth.Service, isHealthy: Boolean): F[Unit] =
-    appManagedServices.update { currentHealth =>
-      currentHealth.updated(service, isHealthy)
-    }
-
-  private def getAppManagedServicesHealth: F[List[HealthProbe.Status]] =
-    appManagedServices.get.map { services =>
-      services.map {
-        case (service, false) => HealthProbe.Unhealthy(show"$service is not healthy")
-        case _ => HealthProbe.Healthy
-      }.toList
-    }
-
-  private def getSourceHealth: F[HealthProbe.Status] =
-    source.isHealthy(unhealthyLatency).map {
-      case SourceAndAck.Healthy => HealthProbe.Healthy
-      case unhealthy: SourceAndAck.Unhealthy => HealthProbe.Unhealthy(unhealthy.show)
-    }
-
-  private val combineHealth: (HealthProbe.Status, HealthProbe.Status) => HealthProbe.Status = {
-    case (Healthy, Healthy) => Healthy
-    case (Healthy, unhealthy) => unhealthy
-    case (unhealthy, Healthy) => unhealthy
-    case (Unhealthy(first), Unhealthy(second)) => Unhealthy(reason = s"$first, $second")
-  }
-
-  private implicit val healthMonoid: Monoid[HealthProbe.Status] = Monoid.instance(Healthy, combineHealth)
-}
-
-object AppHealth {
-
-  sealed trait Service
-
-  object Service {
-    case object SparkWriter extends Service
-    case object BadSink extends Service
-
-    implicit val show: Show[Service] = Show.show {
-      case SparkWriter => "Spark writer"
-      case BadSink => "Failed events sink"
-    }
-  }
-
-  def init[F[_]: Concurrent](
-    unhealthyLatency: FiniteDuration,
-    source: SourceAndAck[F]
-  ): F[AppHealth[F]] =
-    Ref[F]
-      .of(Map[Service, Boolean](Service.SparkWriter -> false, Service.BadSink -> false))
-      .map(appManaged => new AppHealth[F](unhealthyLatency, source, appManaged))
-}
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Config.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Config.scala
index 812b47de..3fee211a 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Config.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Config.scala
@@ -11,20 +11,18 @@ package com.snowplowanalytics.snowplow.lakes

 import cats.Id
-import cats.syntax.either._
 import io.circe.Decoder
 import io.circe.generic.extras.semiauto._
 import io.circe.generic.extras.Configuration
 import io.circe.config.syntax._
 import com.comcast.ip4s.Port
-import org.http4s.{ParseFailure, Uri}

 import java.net.URI
 import scala.concurrent.duration.FiniteDuration

 import com.snowplowanalytics.iglu.client.resolver.Resolver.ResolverConfig
 import com.snowplowanalytics.iglu.core.SchemaCriterion
-import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, Metrics => CommonMetrics, Telemetry}
+import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, Metrics => CommonMetrics, Retrying, Telemetry, Webhook}
 import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs.schemaCriterionDecoder
 import com.snowplowanalytics.snowplow.runtime.HealthProbe.decoders._

@@ -119,17 +117,15 @@ object Config {
     metrics: Metrics,
     sentry: Option[Sentry],
     healthProbe: HealthProbe,
-    webhook: Option[Webhook]
+    webhook: Webhook.Config
   )

-  final case class Webhook(endpoint: Uri, tags: Map[String, String])
-
   case class SetupErrorRetries(delay: FiniteDuration)
   case class TransientErrorRetries(delay: FiniteDuration, attempts: Int)

   case class Retries(
-    setupErrors: SetupErrorRetries,
-    transientErrors: TransientErrorRetries
+    setupErrors: Retrying.Config.ForSetup,
+    transientErrors: Retrying.Config.ForTransient
   )

   implicit def decoder[Source: Decoder, Sink: Decoder]: Decoder[Config[Source, Sink]] = {
@@ -150,14 +146,9 @@ object Config {
       case SentryM(None, _) =>
         None
     }
-    implicit val http4sUriDecoder: Decoder[Uri] =
-      Decoder[String].emap(s => Either.catchOnly[ParseFailure](Uri.unsafeFromString(s)).leftMap(_.toString))
-
-    implicit val webhookDecoder = deriveConfiguredDecoder[Webhook]
     implicit val metricsDecoder = deriveConfiguredDecoder[Metrics]
     implicit val healthProbeDecoder = deriveConfiguredDecoder[HealthProbe]
     implicit val monitoringDecoder = deriveConfiguredDecoder[Monitoring]
-    implicit val setupRetries = deriveConfiguredDecoder[SetupErrorRetries]
-    implicit val transientRetries = deriveConfiguredDecoder[TransientErrorRetries]
     implicit val retriesDecoder = deriveConfiguredDecoder[Retries]

     // TODO add specific lake-loader docs for license
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Environment.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Environment.scala
index bcf529e0..5cb56475 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Environment.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Environment.scala
@@ -22,7 +22,7 @@ import com.snowplowanalytics.iglu.core.SchemaCriterion
 import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, SourceAndAck}
 import com.snowplowanalytics.snowplow.sinks.Sink
 import com.snowplowanalytics.snowplow.lakes.processing.LakeWriter
-import com.snowplowanalytics.snowplow.runtime.{AppInfo, HealthProbe}
+import com.snowplowanalytics.snowplow.runtime.{AppHealth, AppInfo, HealthProbe, Webhook}

 /**
  * Resources and runtime-derived configuration needed for processing events
@@ -47,7 +47,7 @@ case class Environment[F[_]](
   httpClient: Client[F],
   lakeWriter: LakeWriter.WithHandledErrors[F],
   metrics: Metrics[F],
-  appHealth: AppHealth[F],
+  appHealth: AppHealth.Interface[F, Alert, RuntimeService],
   cpuParallelism: Int,
   inMemBatchBytes: Long,
   windowing: EventProcessingConfig.TimedWindows,
@@ -68,15 +68,17 @@ object Environment {
     for {
       _ <- enableSentry[F](appInfo, config.main.monitoring.sentry)
       sourceAndAck <- Resource.eval(toSource(config.main.input))
-      appHealth <- Resource.eval(AppHealth.init(config.main.monitoring.healthProbe.unhealthyLatency, sourceAndAck))
-      _ <- HealthProbe.resource(config.main.monitoring.healthProbe.port, appHealth.status)
+      sourceReporter = sourceAndAck.isHealthy(config.main.monitoring.healthProbe.unhealthyLatency).map(_.showIfUnhealthy)
+      appHealth <- Resource.eval(AppHealth.init[F, Alert, RuntimeService](List(sourceReporter)))
       resolver <- mkResolver[F](config.iglu)
       httpClient <- BlazeClientBuilder[F].withExecutionContext(global.compute).resource
-      monitoring <- Monitoring.create[F](config.main.monitoring.webhook, appInfo, httpClient)
-      badSink <- toSink(config.main.output.bad.sink).evalTap(_ => appHealth.setServiceHealth(AppHealth.Service.BadSink, true))
+      _ <- HealthProbe.resource(config.main.monitoring.healthProbe.port, appHealth)
+      _ <- Webhook.resource(config.main.monitoring.webhook, appInfo, httpClient, appHealth)
+      badSink <-
+        toSink(config.main.output.bad.sink).onError(_ => Resource.eval(appHealth.beUnhealthyForRuntimeService(RuntimeService.BadSink)))
       windowing <- Resource.eval(EventProcessingConfig.TimedWindows.build(config.main.windowing, config.main.numEagerWindows))
       lakeWriter <- LakeWriter.build(config.main.spark, config.main.output.good)
-      lakeWriterWrapped = LakeWriter.withHandledErrors(lakeWriter, appHealth, monitoring, config.main.retries, destinationSetupErrorCheck)
+      lakeWriterWrapped = LakeWriter.withHandledErrors(lakeWriter, appHealth, config.main.retries, destinationSetupErrorCheck)
       metrics <- Resource.eval(Metrics.build(config.main.monitoring.metrics))
       cpuParallelism = chooseCpuParallelism(config.main)
     } yield Environment(
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Monitoring.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Monitoring.scala
deleted file mode 100644
index d0f0620a..00000000
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Monitoring.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Copyright (c) 2014-present Snowplow Analytics Ltd. All rights reserved.
- *
- * This software is made available by Snowplow Analytics, Ltd.,
- * under the terms of the Snowplow Limited Use License Agreement, Version 1.0
- * located at https://docs.snowplow.io/limited-use-license-1.0
- * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION
- * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
- */
-
-package com.snowplowanalytics.snowplow.lakes
-
-import cats.effect.{Resource, Sync}
-import cats.implicits._
-
-import com.snowplowanalytics.snowplow.runtime.AppInfo
-
-import org.http4s.circe.jsonEncoder
-import org.http4s.client.Client
-import org.http4s.{EntityDecoder, Method, Request}
-
-import org.typelevel.log4cats.Logger
-import org.typelevel.log4cats.slf4j.Slf4jLogger
-
-trait Monitoring[F[_]] {
-  def alert(message: Alert): F[Unit]
-}
-
-object Monitoring {
-
-  private implicit def logger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F]
-
-  def create[F[_]: Sync](
-    config: Option[Config.Webhook],
-    appInfo: AppInfo,
-    httpClient: Client[F]
-  )(implicit E: EntityDecoder[F, String]
-  ): Resource[F, Monitoring[F]] = Resource.pure {
-    new Monitoring[F] {
-
-      override def alert(message: Alert): F[Unit] =
-        config match {
-          case Some(webhookConfig) =>
-            val request = buildHttpRequest(webhookConfig, message)
-            Logger[F].info(show"Sending alert to ${webhookConfig.endpoint} with details of the setup error...") *>
-              executeHttpRequest(webhookConfig, httpClient, request)
-          case None =>
-            Logger[F].debug("Webhook monitoring is not configured, skipping alert")
-        }
-
-      def buildHttpRequest(webhookConfig: Config.Webhook, alert: Alert): Request[F] =
-        Request[F](Method.POST, webhookConfig.endpoint)
-          .withEntity(Alert.toSelfDescribingJson(alert, appInfo, webhookConfig.tags))
-
-      def executeHttpRequest(
-        webhookConfig: Config.Webhook,
-        httpClient: Client[F],
-        request: Request[F]
-      ): F[Unit] =
-        httpClient
-          .run(request)
-          .use { response =>
-            if (response.status.isSuccess) Sync[F].unit
-            else {
-              response
-                .as[String]
-                .flatMap(body => Logger[F].error(show"Webhook ${webhookConfig.endpoint} returned non-2xx response:\n$body"))
-            }
-          }
-          .handleErrorWith { e =>
-            Logger[F].error(e)(show"Webhook ${webhookConfig.endpoint} resulted in exception without a response")
-          }
-    }
-  }
-
-}
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/RuntimeService.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/RuntimeService.scala
new file mode 100644
index 00000000..b942ee84
--- /dev/null
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/RuntimeService.scala
@@ -0,0 +1,23 @@
+/**
+ * Copyright (c) 2013-present Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This software is made available by Snowplow Analytics, Ltd., under the terms of the Snowplow
+ * Limited Use License Agreement, Version 1.0 located at
+ * https://docs.snowplow.io/limited-use-license-1.0 BY INSTALLING, DOWNLOADING, ACCESSING, USING OR
+ * DISTRIBUTING ANY PORTION OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
+ */
+package com.snowplowanalytics.snowplow.lakes
+
+import cats.Show
+
+sealed trait RuntimeService
+
+object RuntimeService {
+  case object SparkWriter extends RuntimeService
+  case object BadSink extends RuntimeService
+
+  implicit val show: Show[RuntimeService] = Show.show {
+    case SparkWriter => "Spark writer"
+    case BadSink => "Failed events sink"
+  }
+}
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/package.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/package.scala
index 851ed903..bc56107f 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/package.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/package.scala
@@ -17,10 +17,10 @@ package object lakes {
    * Function that checks whether an exception is due to a destination setup error
    *
    * If an exception was caused by a destination setup error, then it should return a short
-   * human-friendly description of the problem. For any other exception it should return None.
+   * human-friendly description of the problem. For any other exception it should return nothing.
    *
    * A DestinationSetupErrorCheck should check the top-level exception only; it should NOT check
    * `getCause`. Because our application code already checks the causes.
    */
-  type DestinationSetupErrorCheck = Throwable => Option[String]
+  type DestinationSetupErrorCheck = PartialFunction[Throwable, String]
 }
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/LakeWriter.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/LakeWriter.scala
index d7894647..aac640a3 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/LakeWriter.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/LakeWriter.scala
@@ -17,7 +17,8 @@ import cats.effect.std.Mutex
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.types.StructType

-import com.snowplowanalytics.snowplow.lakes.{Alert, AppHealth, Config, DestinationSetupErrorCheck, Monitoring}
+import com.snowplowanalytics.snowplow.runtime.{AppHealth, Retrying}
+import com.snowplowanalytics.snowplow.lakes.{Alert, Config, DestinationSetupErrorCheck, RuntimeService}
 import com.snowplowanalytics.snowplow.lakes.tables.{DeltaWriter, HudiWriter, IcebergWriter, Writer}

 trait LakeWriter[F[_]] {
@@ -88,15 +89,21 @@ object LakeWriter {
   def withHandledErrors[F[_]: Async](
     underlying: LakeWriter[F],
-    appHealth: AppHealth[F],
-    monitoring: Monitoring[F],
+    appHealth: AppHealth.Interface[F, Alert, RuntimeService],
     retries: Config.Retries,
     destinationSetupErrorCheck: DestinationSetupErrorCheck
   ): WithHandledErrors[F] = new WithHandledErrors[F] {
     def createTable: F[Unit] =
-      Retrying.withRetries(appHealth, retries, monitoring, Alert.FailedToCreateEventsTable, destinationSetupErrorCheck) {
+      Retrying.withRetries(
+        appHealth,
+        retries.transientErrors,
+        retries.setupErrors,
+        RuntimeService.SparkWriter,
+        Alert.FailedToCreateEventsTable,
+        destinationSetupErrorCheck
+      ) {
         underlying.createTable
-      }
+      } <* appHealth.beHealthyForSetup

     def initializeLocalDataFrame(viewName: String): F[Unit] =
       underlying.initializeLocalDataFrame(viewName)
@@ -115,8 +122,8 @@ object LakeWriter {
       underlying
         .commit(viewName)
         .onError { case _ =>
-          appHealth.setServiceHealth(AppHealth.Service.SparkWriter, isHealthy = false)
-        } <* appHealth.setServiceHealth(AppHealth.Service.SparkWriter, isHealthy = true)
+          appHealth.beUnhealthyForRuntimeService(RuntimeService.SparkWriter)
+        } <* appHealth.beHealthyForRuntimeService(RuntimeService.SparkWriter)
   }

   /**
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala
index ef298bd3..1a56df76 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala
@@ -31,7 +31,7 @@ import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor => BadRowProces
 import com.snowplowanalytics.snowplow.badrows.Payload.{RawPayload => BadRowRawPayload}
 import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, TokenedEvents}
 import com.snowplowanalytics.snowplow.sinks.ListOfList
-import com.snowplowanalytics.snowplow.lakes.{AppHealth, Environment, Metrics}
+import com.snowplowanalytics.snowplow.lakes.{Environment, Metrics, RuntimeService}
 import com.snowplowanalytics.snowplow.runtime.processing.BatchUp
 import com.snowplowanalytics.snowplow.runtime.syntax.foldable._
 import com.snowplowanalytics.snowplow.loaders.transform.{
@@ -253,7 +253,7 @@ object Processing {
       env.badSink
         .sinkSimple(ListOfList.of(List(serialized)))
         .onError { case _ =>
-          env.appHealth.setServiceHealth(AppHealth.Service.BadSink, isHealthy = false)
+          env.appHealth.beUnhealthyForRuntimeService(RuntimeService.BadSink)
         }
     } else Applicative[F].unit
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Retrying.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Retrying.scala
deleted file mode 100644
index 4df64578..00000000
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Retrying.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Copyright (c) 2014-present Snowplow Analytics Ltd. All rights reserved.
- *
- * This software is made available by Snowplow Analytics, Ltd.,
- * under the terms of the Snowplow Limited Use License Agreement, Version 1.0
- * located at https://docs.snowplow.io/limited-use-license-1.0
- * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION
- * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
- */
-
-package com.snowplowanalytics.snowplow.lakes.processing
-
-import cats.Applicative
-import cats.effect.Sync
-import cats.implicits._
-
-import org.typelevel.log4cats.Logger
-import org.typelevel.log4cats.slf4j.Slf4jLogger
-
-import retry._
-import retry.implicits.retrySyntaxError
-
-import com.snowplowanalytics.snowplow.lakes.{Alert, AppHealth, Config, DestinationSetupErrorCheck, Monitoring}
-
-object Retrying {
-
-  private implicit def logger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F]
-
-  def withRetries[F[_]: Sync: Sleep, A](
-    appHealth: AppHealth[F],
-    config: Config.Retries,
-    monitoring: Monitoring[F],
-    toAlert: List[String] => Alert,
-    destinationSetupErrorCheck: DestinationSetupErrorCheck
-  )(
-    action: F[A]
-  ): F[A] =
-    retryUntilSuccessful(appHealth, config, monitoring, toAlert, destinationSetupErrorCheck, action) <*
-      appHealth.setServiceHealth(AppHealth.Service.SparkWriter, isHealthy = true)
-
-  private def retryUntilSuccessful[F[_]: Sync: Sleep, A](
-    appHealth: AppHealth[F],
-    config: Config.Retries,
-    monitoring: Monitoring[F],
-    toAlert: List[String] => Alert,
-    destinationSetupErrorCheck: DestinationSetupErrorCheck,
-    action: F[A]
-  ): F[A] =
-    action
-      .onError(_ => appHealth.setServiceHealth(AppHealth.Service.SparkWriter, isHealthy = false))
-      .retryingOnSomeErrors(
-        isWorthRetrying = checkingNestedExceptions(destinationSetupErrorCheck, _).nonEmpty.pure[F],
-        policy = policyForSetupErrors[F](config),
-        onError = logErrorAndSendAlert[F](monitoring, destinationSetupErrorCheck, toAlert, _, _)
-      )
-      .retryingOnAllErrors(
-        policy = policyForTransientErrors[F](config),
-        onError = logError[F](_, _)
-      )
-
-  private def policyForSetupErrors[F[_]: Applicative](config: Config.Retries): RetryPolicy[F] =
-    RetryPolicies.exponentialBackoff[F](config.setupErrors.delay)
-
-  private def policyForTransientErrors[F[_]: Applicative](config: Config.Retries): RetryPolicy[F] =
-    RetryPolicies.fullJitter[F](config.transientErrors.delay).join(RetryPolicies.limitRetries(config.transientErrors.attempts - 1))
-
-  private def logErrorAndSendAlert[F[_]: Sync](
-    monitoring: Monitoring[F],
-    destinationSetupErrorCheck: DestinationSetupErrorCheck,
-    toAlert: List[String] => Alert,
-    error: Throwable,
-    details: RetryDetails
-  ): F[Unit] =
-    logError(error, details) *> monitoring.alert(toAlert(checkingNestedExceptions(destinationSetupErrorCheck, error)))
-
-  private def logError[F[_]: Sync](error: Throwable, details: RetryDetails): F[Unit] =
-    Logger[F].error(error)(s"Executing command failed. ${extractRetryDetails(details)}")
-
-  private def extractRetryDetails(details: RetryDetails): String = details match {
-    case RetryDetails.GivingUp(totalRetries, totalDelay) =>
-      s"Giving up on retrying, total retries: $totalRetries, total delay: ${totalDelay.toSeconds} seconds"
-    case RetryDetails.WillDelayAndRetry(nextDelay, retriesSoFar, cumulativeDelay) =>
-      s"Will retry in ${nextDelay.toMillis} milliseconds, retries so far: $retriesSoFar, total delay so far: ${cumulativeDelay.toMillis} milliseconds"
-  }
-
-  // Returns a list of reasons of why this was a destination setup error.
-  // Or empty list if this was not caused by a destination setup error
-  private def checkingNestedExceptions(
-    destinationSetupErrorCheck: DestinationSetupErrorCheck,
-    t: Throwable
-  ): List[String] =
-    (destinationSetupErrorCheck(t), Option(t.getCause)) match {
-      case (Some(msg), Some(cause)) => msg :: checkingNestedExceptions(destinationSetupErrorCheck, cause)
-      case (Some(msg), None) => List(msg)
-      case (None, Some(cause)) => checkingNestedExceptions(destinationSetupErrorCheck, cause)
-      case (None, None) => Nil
-    }
-}
diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/MockEnvironment.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/MockEnvironment.scala
index 54d901b5..7258f059 100644
--- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/MockEnvironment.scala
+++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/MockEnvironment.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.types.StructType
 import scala.concurrent.duration.{DurationInt, FiniteDuration}

 import com.snowplowanalytics.iglu.client.Resolver
+import com.snowplowanalytics.snowplow.runtime.AppHealth
 import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, SourceAndAck, TokenedEvents}
 import com.snowplowanalytics.snowplow.sinks.Sink
 import com.snowplowanalytics.snowplow.lakes.processing.LakeWriter
@@ -53,6 +54,10 @@ object MockEnvironment {
     case class AddedCommittedCountMetric(count: Int) extends Action
     case class SetLatencyMetric(latency: FiniteDuration) extends Action
     case class SetProcessingLatencyMetric(latency: FiniteDuration) extends Action
+
+    /* Health */
+    case class BecameUnhealthy(service: RuntimeService) extends Action
+    case class BecameHealthy(service: RuntimeService) extends Action
   }
   import Action._

@@ -69,8 +74,6 @@ object MockEnvironment {
     for {
       state <- Ref[IO].of(Vector.empty[Action])
       source = testSourceAndAck(windows, state)
-      appHealth <- AppHealth.init(10.seconds, source)
-      _ <- appHealth.setServiceHealth(AppHealth.Service.BadSink, isHealthy = true)
     } yield {
       val env = Environment(
         appInfo = TestSparkEnvironment.appInfo,
@@ -80,7 +83,7 @@ object MockEnvironment {
         httpClient = testHttpClient,
         lakeWriter = testLakeWriter(state),
         metrics = testMetrics(state),
-        appHealth = appHealth,
+        appHealth = testAppHealth(state),
         inMemBatchBytes = 1000000L,
         cpuParallelism = 1,
         windowing = EventProcessingConfig.TimedWindows(1.minute, 1.0, 1),
@@ -158,4 +161,16 @@ object MockEnvironment {
     def report: Stream[IO, Nothing] = Stream.never[IO]
   }
+
+  private def testAppHealth(ref: Ref[IO, Vector[Action]]): AppHealth.Interface[IO, Alert, RuntimeService] =
+    new AppHealth.Interface[IO, Alert, RuntimeService] {
+      def beHealthyForSetup: IO[Unit] =
+        IO.unit
+      def beUnhealthyForSetup(alert: Alert): IO[Unit] =
+        IO.unit
+      def beHealthyForRuntimeService(service: RuntimeService): IO[Unit] =
+        ref.update(_ :+ BecameHealthy(service))
+      def beUnhealthyForRuntimeService(service: RuntimeService): IO[Unit] =
+        ref.update(_ :+ BecameUnhealthy(service))
+    }
 }
diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/TestSparkEnvironment.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/TestSparkEnvironment.scala
index 5479826d..f533678b 100644
--- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/TestSparkEnvironment.scala
+++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/TestSparkEnvironment.scala
@@ -22,7 +22,7 @@ import com.snowplowanalytics.iglu.client.Resolver
 import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, SourceAndAck, TokenedEvents}
 import com.snowplowanalytics.snowplow.sinks.Sink
 import com.snowplowanalytics.snowplow.lakes.processing.LakeWriter
-import com.snowplowanalytics.snowplow.runtime.AppInfo
+import com.snowplowanalytics.snowplow.runtime.{AppHealth, AppInfo, Retrying}

 object TestSparkEnvironment {

@@ -33,10 +33,8 @@ object TestSparkEnvironment {
   ): Resource[IO, Environment[IO]] = for {
     testConfig <- Resource.pure(TestConfig.defaults(target, tmpDir))
     source = testSourceAndAck(windows)
-    appHealth <- Resource.eval(AppHealth.init(10.seconds, source))
-    _ <- Resource.eval(appHealth.setServiceHealth(AppHealth.Service.BadSink, isHealthy = true))
     lakeWriter <- LakeWriter.build[IO](testConfig.spark, testConfig.output.good)
-    lakeWriterWrapped = LakeWriter.withHandledErrors(lakeWriter, appHealth, dummyMonitoring, retriesConfig, _ => None)
+    lakeWriterWrapped = LakeWriter.withHandledErrors(lakeWriter, dummyAppHealth, retriesConfig, PartialFunction.empty)
   } yield Environment(
     appInfo = appInfo,
     source = source,
@@ -45,7 +43,7 @@ object TestSparkEnvironment {
     httpClient = testHttpClient,
     lakeWriter = lakeWriterWrapped,
     metrics = testMetrics,
-    appHealth = appHealth,
+    appHealth = dummyAppHealth,
     inMemBatchBytes = 1000000L,
     cpuParallelism = 1,
     windowing = EventProcessingConfig.TimedWindows(1.minute, 1.0, 1),
@@ -55,14 +53,10 @@ object TestSparkEnvironment {
   )

   private val retriesConfig = Config.Retries(
-    Config.SetupErrorRetries(30.seconds),
-    Config.TransientErrorRetries(1.second, 5)
+    Retrying.Config.ForSetup(30.seconds),
+    Retrying.Config.ForTransient(1.second, 5)
   )

-  private val dummyMonitoring = new Monitoring[IO] {
-    override def alert(message: Alert): IO[Unit] = IO.unit
-  }
-
   private def testSourceAndAck(windows: List[List[TokenedEvents]]): SourceAndAck[IO] =
     new SourceAndAck[IO] {
       def stream(config: EventProcessingConfig, processor: EventProcessor[IO]): Stream[IO, Nothing] =
@@ -97,4 +91,15 @@ object TestSparkEnvironment {
     def report: Stream[IO, Nothing] = Stream.never[IO]
   }

+  def dummyAppHealth: AppHealth.Interface[IO, Any, Any] = new AppHealth.Interface[IO, Any, Any] {
+    def beHealthyForSetup: IO[Unit] =
+      IO.unit
+    def beUnhealthyForSetup(alert: Any): IO[Unit] =
+      IO.unit
+    def beHealthyForRuntimeService(service: Any): IO[Unit] =
+      IO.unit
+    def beUnhealthyForRuntimeService(service: Any): IO[Unit] =
+      IO.unit
+  }
+
 }
diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/processing/LakeWriterSpec.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/processing/LakeWriterSpec.scala
index 313b1e23..fd2355dd 100644
--- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/processing/LakeWriterSpec.scala
+++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/processing/LakeWriterSpec.scala
@@ -19,11 +19,10 @@ import cats.effect.testing.specs2.CatsEffect
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.Row

-import com.snowplowanalytics.snowplow.runtime.HealthProbe
+import com.snowplowanalytics.snowplow.runtime.{AppHealth, Retrying}
 import com.snowplowanalytics.snowplow.lakes._
-import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, SourceAndAck}

-import scala.concurrent.duration.{DurationLong, FiniteDuration}
+import scala.concurrent.duration.DurationLong

 class LakeWriterSpec extends Specification with CatsEffect {
   import LakeWriterSpec._
@@ -42,27 +41,22 @@ class LakeWriterSpec extends Specification with CatsEffect {
   def e1 = control().flatMap { c =>
     val expected = Vector(
-      Action.CreateTableAttempted
+      Action.CreateTableAttempted,
+      Action.BecameHealthy(RuntimeService.SparkWriter),
+      Action.BecameHealthyForSetup
     )

     val wrappedLakeWriter = LakeWriter.withHandledErrors(
       c.lakeWriter,
       c.appHealth,
-      c.monitoring,
       retriesConfig,
       dummyDestinationSetupErrorCheck
     )

     for {
-      healthBefore <- c.appHealth.status
       _ <- wrappedLakeWriter.createTable
-      healthAfter <- c.appHealth.status
       state <- c.state.get
-    } yield List(
-      state should beEqualTo(expected),
-      healthBefore should beAnInstanceOf[HealthProbe.Unhealthy],
-      healthAfter should beEqualTo(HealthProbe.Healthy)
-    ).reduce(_ and _)
+    } yield state should beEqualTo(expected)
   }

   def e2 = {
@@ -82,23 +76,16 @@ class LakeWriterSpec extends Specification with CatsEffect {
       val wrappedLakeWriter = LakeWriter.withHandledErrors(
         c.lakeWriter,
         c.appHealth,
-        c.monitoring,
         retriesConfig,
-        _ => Some("this is a setup error")
+        _ => "this is a setup error"
       )

       val test = for {
-        healthBefore <- c.appHealth.status
         fiber <- wrappedLakeWriter.createTable.voidError.start
         _ <- IO.sleep(4.minutes)
         _ <- fiber.cancel
-        healthAfter <- c.appHealth.status
         state <- c.state.get
-      } yield List(
-        state should beEqualTo(expected),
-        healthBefore should beUnhealthy,
-        healthAfter should beUnhealthy
-      ).reduce(_ and _)
+      } yield state should beEqualTo(expected)

       TestControl.executeEmbed(test)
     }
@@ -109,30 +96,28 @@ class LakeWriterSpec extends Specification with CatsEffect {
     control(mocks).flatMap { c =>
       val expected = Vector(
         Action.CreateTableAttempted,
+        Action.BecameUnhealthy(RuntimeService.SparkWriter),
+        Action.CreateTableAttempted,
+        Action.BecameUnhealthy(RuntimeService.SparkWriter),
         Action.CreateTableAttempted,
+        Action.BecameUnhealthy(RuntimeService.SparkWriter),
         Action.CreateTableAttempted,
+        Action.BecameUnhealthy(RuntimeService.SparkWriter),
         Action.CreateTableAttempted,
-        Action.CreateTableAttempted
+        Action.BecameUnhealthy(RuntimeService.SparkWriter)
       )

       val wrappedLakeWriter = LakeWriter.withHandledErrors(
         c.lakeWriter,
         c.appHealth,
-        c.monitoring,
         retriesConfig,
         dummyDestinationSetupErrorCheck
       )

       val test = for {
-        healthBefore <- c.appHealth.status
         _ <- wrappedLakeWriter.createTable.voidError
-        healthAfter <- c.appHealth.status
         state <- c.state.get
-      } yield List(
-        state should beEqualTo(expected),
-        healthBefore should beUnhealthy,
-        healthAfter should beUnhealthy
-      ).reduce(_ and _)
+      } yield state should beEqualTo(expected)

       TestControl.executeEmbed(test)
     }
@@ -144,27 +129,22 @@ class LakeWriterSpec extends Specification with CatsEffect {
       val expected = Vector(
         Action.CreateTableAttempted,
         Action.SentAlert(0L),
-        Action.CreateTableAttempted
+        Action.CreateTableAttempted,
+        Action.BecameHealthy(RuntimeService.SparkWriter),
+        Action.BecameHealthyForSetup
       )

       val wrappedLakeWriter = LakeWriter.withHandledErrors(
         c.lakeWriter,
         c.appHealth,
-        c.monitoring,
         retriesConfig,
-        _ => Some("this is a setup error")
+        _ => "this is a setup error"
       )

       val test = for {
-        healthBefore <- c.appHealth.status
         _ <- wrappedLakeWriter.createTable.voidError
-        healthAfter <- c.appHealth.status
         state <- c.state.get
-      } yield List(
-        state should beEqualTo(expected),
-        healthBefore should beUnhealthy,
-        healthAfter should beHealthy
-      ).reduce(_ and _)
+      } yield state should beEqualTo(expected)

       TestControl.executeEmbed(test)
     }
@@ -175,27 +155,23 @@
     control(mocks).flatMap { c =>
       val expected = Vector(
         Action.CreateTableAttempted,
-        Action.CreateTableAttempted
+        Action.BecameUnhealthy(RuntimeService.SparkWriter),
+        Action.CreateTableAttempted,
+        Action.BecameHealthy(RuntimeService.SparkWriter),
+        Action.BecameHealthyForSetup
       )

      val wrappedLakeWriter = LakeWriter.withHandledErrors(
         c.lakeWriter,
         c.appHealth,
-        c.monitoring,
         retriesConfig,
         dummyDestinationSetupErrorCheck
       )

       val test = for {
-        healthBefore <- c.appHealth.status
         _ <- wrappedLakeWriter.createTable.voidError
-        healthAfter <- c.appHealth.status
         state <- c.state.get
-      } yield List(
-        state should beEqualTo(expected),
-        healthBefore should beUnhealthy,
-        healthAfter should beHealthy
-      ).reduce(_ and _)
+      } yield state should beEqualTo(expected)

       TestControl.executeEmbed(test)
     }
@@ -204,27 +180,21 @@
   def e6 =
     control().flatMap { c =>
       val expected = Vector(
-        Action.CommitAttempted("testview")
+        Action.CommitAttempted("testview"),
+        Action.BecameHealthy(RuntimeService.SparkWriter)
       )

       val wrappedLakeWriter = LakeWriter.withHandledErrors(
         c.lakeWriter,
         c.appHealth,
-        c.monitoring,
         retriesConfig,
         dummyDestinationSetupErrorCheck
       )

       for {
-        healthBefore <- c.appHealth.status
         _ <- wrappedLakeWriter.commit("testview")
-        healthAfter <- c.appHealth.status
         state <- c.state.get
-      } yield List(
-        state should beEqualTo(expected),
-        healthBefore should beAnInstanceOf[HealthProbe.Unhealthy],
-        healthAfter should beEqualTo(HealthProbe.Healthy)
-      ).reduce(_ and _)
+      } yield state should beEqualTo(expected)
     }

   def e7 = {
@@ -233,47 +203,24 @@
     control(mocks).flatMap { c =>
       val expected = Vector(
         Action.CommitAttempted("testview1"),
-        Action.CommitAttempted("testview2")
+        Action.BecameHealthy(RuntimeService.SparkWriter),
+        Action.CommitAttempted("testview2"),
+        Action.BecameUnhealthy(RuntimeService.SparkWriter)
       )

       val wrappedLakeWriter = LakeWriter.withHandledErrors(
         c.lakeWriter,
         c.appHealth,
-        c.monitoring,
         retriesConfig,
         dummyDestinationSetupErrorCheck
       )

       for {
         _ <- wrappedLakeWriter.commit("testview1")
-        healthAfterFirst <- c.appHealth.status
         _ <- wrappedLakeWriter.commit("testview2").voidError
-        healthAfterSecond <- c.appHealth.status
         state <- c.state.get
-      } yield List(
-        state should beEqualTo(expected),
-        healthAfterFirst should beEqualTo(HealthProbe.Healthy),
-        healthAfterSecond should beAnInstanceOf[HealthProbe.Unhealthy]
-      ).reduce(_ and _)
-    }
-  }
-
-  /** Convenience matchers for health probe * */
-
-  def beHealthy: org.specs2.matcher.Matcher[HealthProbe.Status] = { (status: HealthProbe.Status) =>
-    val result = status match {
-      case HealthProbe.Healthy => true
-      case HealthProbe.Unhealthy(_) => false
-    }
-    (result, s"$status is not healthy")
-  }
-
-  def beUnhealthy: org.specs2.matcher.Matcher[HealthProbe.Status] = { (status: HealthProbe.Status) =>
-    val result = status match {
-      case HealthProbe.Healthy => false
-      case HealthProbe.Unhealthy(_) => true
+      } yield state should beEqualTo(expected)
     }
-    (result, s"$status is not unhealthy")
   }
 }
@@ -285,6 +232,9 @@ object LakeWriterSpec {
     case object CreateTableAttempted extends Action
     case class CommitAttempted(viewName: String) extends Action
     case class SentAlert(timeSentSeconds: Long) extends Action
+    case class BecameUnhealthy(service: RuntimeService) extends Action
+    case class BecameHealthy(service: RuntimeService) extends Action
+    case object BecameHealthyForSetup extends Action
   }

   sealed trait Response
@@ -298,44 +248,36 @@ object LakeWriterSpec {
   case class Control(
     state: Ref[IO, Vector[Action]],
     lakeWriter: LakeWriter[IO],
-    appHealth: AppHealth[IO],
-    monitoring: Monitoring[IO]
+    appHealth: AppHealth.Interface[IO, Alert, RuntimeService]
   )

   val retriesConfig = Config.Retries(
-    Config.SetupErrorRetries(30.seconds),
-    Config.TransientErrorRetries(1.second, 5)
+    Retrying.Config.ForSetup(30.seconds),
+    Retrying.Config.ForTransient(1.second, 5)
   )

   def control(mocks: Mocks = Mocks(Nil)): IO[Control] =
     for {
       state <- Ref[IO].of(Vector.empty[Action])
-      appHealth <- testAppHealth
       tableManager <- testLakeWriter(state, mocks.lakeWriterResults)
-    } yield Control(state, tableManager, appHealth, testMonitoring(state))
-
-  private def testAppHealth: IO[AppHealth[IO]] = {
-    val healthySource = new SourceAndAck[IO] {
-      override def stream(config: EventProcessingConfig, processor: EventProcessor[IO]): fs2.Stream[IO, Nothing] =
-        fs2.Stream.empty
+    } yield Control(state, tableManager, testAppHealth(state))

-      override def isHealthy(maxAllowedProcessingLatency: FiniteDuration): IO[SourceAndAck.HealthStatus] =
-        IO(SourceAndAck.Healthy)
-    }
-    AppHealth.init(10.seconds, healthySource).flatTap { appHealth =>
-      appHealth.setServiceHealth(AppHealth.Service.BadSink, isHealthy = true)
+  private def testAppHealth(state: Ref[IO, Vector[Action]]): AppHealth.Interface[IO, Alert, RuntimeService] =
+    new AppHealth.Interface[IO, Alert, RuntimeService] {
+      def beHealthyForSetup: IO[Unit] =
+        state.update(_ :+ Action.BecameHealthyForSetup)
+      def beUnhealthyForSetup(alert: Alert): IO[Unit] =
+        for {
+          now <- IO.realTime
+          _ <- state.update(_ :+ Action.SentAlert(now.toSeconds))
+        } yield ()
+      def beHealthyForRuntimeService(service: RuntimeService): IO[Unit] =
+        state.update(_ :+ Action.BecameHealthy(service))
+      def beUnhealthyForRuntimeService(service: RuntimeService): IO[Unit] =
+        state.update(_ :+ Action.BecameUnhealthy(service))
     }
-  }
-
-  private def testMonitoring(state: Ref[IO, Vector[Action]]): Monitoring[IO] = new Monitoring[IO] {
-    def alert(message: Alert): IO[Unit] =
-      for {
-        now <- IO.realTime
-        _ <- state.update(_ :+ Action.SentAlert(now.toSeconds))
-      } yield ()
-  }

-  private val dummyDestinationSetupErrorCheck: Throwable => Option[String] = _ => None
+  private val dummyDestinationSetupErrorCheck: PartialFunction[Throwable, String] = PartialFunction.empty

   private def testLakeWriter(state: Ref[IO, Vector[Action]], mocks: List[Response]): IO[LakeWriter[IO]] =
     for {
diff --git a/modules/gcp/src/main/scala/com.snowplowanalytics.snowplow.lakes/GcpApp.scala b/modules/gcp/src/main/scala/com.snowplowanalytics.snowplow.lakes/GcpApp.scala
index db2399e3..aa504ac8 100644
--- a/modules/gcp/src/main/scala/com.snowplowanalytics.snowplow.lakes/GcpApp.scala
+++ b/modules/gcp/src/main/scala/com.snowplowanalytics.snowplow.lakes/GcpApp.scala
@@ -19,5 +19,5 @@ object GcpApp extends LoaderApp[PubsubSourceConfig, PubsubSinkConfig](BuildInfo)

   override def badSink: SinkProvider = PubsubSink.resource(_)

-  override def isDestinationSetupError: DestinationSetupErrorCheck = _ => None
+  override def isDestinationSetupError: DestinationSetupErrorCheck = TableFormatSetupError.check
 }
diff --git a/packaging/delta-iceberg/src/main/scala/com.snowplowanalytics.snowplow.lakes/TableFormatSetupError.scala b/packaging/delta-iceberg/src/main/scala/com.snowplowanalytics.snowplow.lakes/TableFormatSetupError.scala
index a0e0037d..ddd354e8 100644
--- a/packaging/delta-iceberg/src/main/scala/com.snowplowanalytics.snowplow.lakes/TableFormatSetupError.scala
+++ b/packaging/delta-iceberg/src/main/scala/com.snowplowanalytics.snowplow.lakes/TableFormatSetupError.scala
@@ -15,15 +15,12 @@ import org.apache.iceberg.exceptions.{ForbiddenException => IcebergForbiddenExce
 object TableFormatSetupError {

   // Check if given exception is specific to iceberg format
-  def check(t: Throwable): Option[String] =
-    t match {
+  def check: PartialFunction[Throwable, String] = {
     case e: IcebergNotFoundException =>
       // Glue catalog does not exist
-      Some(e.getMessage)
+      e.getMessage
     case e: IcebergForbiddenException =>
       // No permission to create a table in Glue catalog
-      Some(e.getMessage)
-    case _ =>
-      None
+      e.getMessage
   }
 }
diff --git a/packaging/hudi/src/main/scala/com.snowplowanalytics.snowplow.lakes/TableFormatSetupError.scala b/packaging/hudi/src/main/scala/com.snowplowanalytics.snowplow.lakes/TableFormatSetupError.scala
index d8b04b42..3c08e84e 100644
--- a/packaging/hudi/src/main/scala/com.snowplowanalytics.snowplow.lakes/TableFormatSetupError.scala
+++ b/packaging/hudi/src/main/scala/com.snowplowanalytics.snowplow.lakes/TableFormatSetupError.scala
@@ -15,11 +15,9 @@ import org.apache.hudi.hive.HoodieHiveSyncException
 object TableFormatSetupError {

   // Check if given exception is specific to hudi format
-  def check: Throwable => Option[String] = {
+  def check: PartialFunction[Throwable, String] = {
     case e: HoodieHiveSyncException if e.getMessage.contains("database does not exist") =>
       // Glue database does not exist or no permission to see it
-      Some(e.getMessage)
-    case _ =>
-      None
+      e.getMessage
   }
 }
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 6afd7aff..123a0913 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -26,7 +26,6 @@ object Dependencies {
     // Scala
     val catsEffect = "3.5.4"
-    val catsRetry = "3.1.3"
     val decline = "2.4.1"
     val circe = "0.14.3"
     val http4s = "0.23.16"
@@ -51,7 +50,7 @@ object Dependencies {
     val awsRegistry = "1.1.20"

     // Snowplow
-    val streams = "0.7.0"
+    val streams = "0.8.0-M2"
     val igluClient = "3.0.0"

     // Transitive overrides
@@ -72,11 +71,10 @@ object Dependencies {
   }

-  val catsRetry = "com.github.cb372" %% "cats-retry" % V.catsRetry
-  val blazeClient = "org.http4s" %% "http4s-blaze-client" % V.http4s
-  val decline = "com.monovore" %% "decline-effect" % V.decline
-  val circeGenericExtra = "io.circe" %% "circe-generic-extras" % V.circe
-  val betterMonadicFor = "com.olegpy" %% "better-monadic-for" % V.betterMonadicFor
+  val blazeClient = "org.http4s" %% "http4s-blaze-client" % V.http4s
+  val decline = "com.monovore" %% "decline-effect" % V.decline
+  val circeGenericExtra = "io.circe" %% "circe-generic-extras" % V.circe
+  val betterMonadicFor = "com.olegpy" %% "better-monadic-for" % V.betterMonadicFor

   object Spark {
     val coreForIcebergDelta = "org.apache.spark" %% "spark-core" % V.Spark.forIcebergDelta
     val sqlForIcebergDelta = "org.apache.spark" %% "spark-sql" % V.Spark.forIcebergDelta
@@ -150,7 +148,6 @@
       streamsCore,
       loaders,
       runtime,
-      catsRetry,
       delta % Provided,
       Spark.coreForIcebergDelta % Provided,
       Spark.sqlForIcebergDelta % Provided,
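For illustration only (not part of the patch above): the central API change in this diff is that DestinationSetupErrorCheck becomes a PartialFunction[Throwable, String], so per-cloud checks can be chained with the shared TableFormatSetupError.check instead of returning Option[String]. The sketch below assumes a hypothetical MyCloudException and a hypothetical nestedMessages helper; only DestinationSetupErrorCheck and TableFormatSetupError.check come from the modules shown in the diff, and the nested-cause walk merely approximates the behaviour of the removed checkingNestedExceptions (whose real replacement now lives in the shared runtime's Retrying).

import com.snowplowanalytics.snowplow.lakes.{DestinationSetupErrorCheck, TableFormatSetupError}

object SetupErrorCheckSketch {

  // Hypothetical cloud-specific exception, used only for this sketch
  final class MyCloudException(msg: String) extends RuntimeException(msg)

  // Cloud-specific cases chain with the shared table-format check via `orElse`,
  // in the same spirit as AwsApp matching `case TableFormatSetupError.check(t) => t`
  val check: DestinationSetupErrorCheck = {
    val cloudSpecific: PartialFunction[Throwable, String] = { case e: MyCloudException =>
      s"Cloud storage is not reachable: ${e.getMessage}"
    }
    cloudSpecific.orElse(TableFormatSetupError.check)
  }

  // Hypothetical helper: walk the cause chain and collect every matching message,
  // roughly what the deleted checkingNestedExceptions used to do for the alert text
  def nestedMessages(t: Throwable): List[String] =
    check.lift(t).toList ++ Option(t.getCause).toList.flatMap(nestedMessages)
}

A loader app could plug a composed partial function like this in as its isDestinationSetupError, which is exactly how AzureApp and GcpApp now delegate to TableFormatSetupError.check in the diff above.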