Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make alert messages more human-readable #75

Merged
merged 1 commit into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion modules/aws/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
}
"spark": {
"conf": {
"fs.s3a.aws.credentials.provider": "com.snowplowanalytics.snowplow.lakes.AssumedRoleCredentialsProviderV1"
"fs.s3a.aws.credentials.provider": "com.snowplowanalytics.snowplow.lakes.AssumedRoleCredentialsProvider"
"fs.s3a.assumed.role.session.name": "snowplow-lake-loader"
"fs.s3a.assumed.role.session.duration": "1h"
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,17 @@

package com.snowplowanalytics.snowplow.lakes

import cats.implicits._
import org.apache.hadoop.fs.s3a.{CredentialInitializationException, UnknownStoreException}
import software.amazon.awssdk.services.s3.model.{NoSuchBucketException, S3Exception}
import software.amazon.awssdk.services.sts.model.StsException
import software.amazon.awssdk.services.glue.model.{AccessDeniedException => GlueAccessDeniedException}

import org.apache.hadoop.fs.s3a.UnknownStoreException
import software.amazon.awssdk.services.glue.model.{
AccessDeniedException => GlueAccessDeniedException,
EntityNotFoundException => GlueEntityNotFoundException
}

import java.nio.file.AccessDeniedException
import scala.util.matching.Regex

import com.snowplowanalytics.snowplow.sources.kinesis.{KinesisSource, KinesisSourceConfig}
import com.snowplowanalytics.snowplow.sinks.kinesis.{KinesisSink, KinesisSinkConfig}
Expand All @@ -27,26 +31,68 @@ object AwsApp extends LoaderApp[KinesisSourceConfig, KinesisSinkConfig](BuildInf

override def badSink: SinkProvider = KinesisSink.resource(_)

/**
* Identifies known exceptions relating to setup of the destination
*
* Exceptions are often "caused by" an underlying exception. For example, a s3a
* UnknownStoreException is often "caused by" a aws sdk NoSuchBucketException. Our implementation
* checks both the top exception and the underlying causes. Therefore in some cases we
* over-specify the exceptions to watch out for; the top exception and causal exception both match
*/
override def isDestinationSetupError: DestinationSetupErrorCheck = {

/** Exceptions raised by underlying AWS SDK * */
case _: NoSuchBucketException =>
// S3 bucket does not exist
true
case e: S3Exception if e.statusCode() >= 400 && e.statusCode() < 500 =>
Some("S3 bucket does not exist or we do not have permissions to see it exists")
case e: S3Exception if e.statusCode() === 403 =>
// No permission to read from S3 bucket or to write to S3 bucket
true
case _: GlueAccessDeniedException =>
Some("Missing permissions to perform this action on S3 bucket")
case e: S3Exception if e.statusCode() === 301 =>
// Misconfigured AWS region
Some("S3 bucket is not in the expected region")
case e: GlueAccessDeniedException =>
// No permission to read from Glue catalog
true
case _: StsException =>
Some(Option(e.getMessage).getOrElse("Missing permissions to perform this action on Glue catalog"))
case _: GlueEntityNotFoundException =>
// Glue database does not exist
Some("Glue resource does not exist or no permission to see it exists")
case e: StsException if e.statusCode() === 403 =>
// No permission to assume the role given to authenticate to S3/Glue
true
case _: UnknownStoreException =>
// no such bucket exist
true
case _: AccessDeniedException =>
// 1 - s3 bucket's permission policy denies all actions
// 2 - not authorized to assume the role
true
Some("Missing permissions to assume the AWS IAM role")

/** Exceptions raised via hadoop's s3a filesystem * */
case e: UnknownStoreException =>
// S3 bucket does not exist or no permission to see it exists
stripCauseDetails(e)
case e: AccessDeniedException =>
// 1 - No permission to put object on the bucket
// 2 - No permission to assume the role given to authenticate to S3
stripCauseDetails(e)
case _: CredentialInitializationException =>
Some("Failed to initialize AWS access credentials")

/** Exceptions common to the table format - Delta/Iceberg/Hudi * */
case t => TableFormatSetupError.check(t)
}

/**
* Fixes hadoop Exception messages to be more reader-friendly
*
* Hadoop exception messages often add the exception's cause to the exception's message.
*
* E.g. "<HELPFUL MESSAGE>: <CAUSE CLASSNAME>: <CAUSE MESSAGE>"
*
* In order to have better control of the message sent to the webhook, we remove the cause details
* here, and add back in pertinent cause information later.
*/
private def stripCauseDetails(t: Throwable): Option[String] =
(Option(t.getMessage), Option(t.getCause)) match {
case (Some(message), Some(cause)) =>
val toRemove = new Regex(":? *" + Regex.quote(cause.toString) + ".*")
val replaced = toRemove.replaceAllIn(message, "")
Some(replaced)
case (other, _) =>
other
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,5 @@ object AzureApp extends LoaderApp[KafkaSourceConfig, KafkaSinkConfig](BuildInfo)

override def badSink: SinkProvider = KafkaSink.resource(_, classTag[SinkAuthHandler])

override def isDestinationSetupError: DestinationSetupErrorCheck = _ => false
override def isDestinationSetupError: DestinationSetupErrorCheck = _ => None
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@ import com.snowplowanalytics.snowplow.runtime.AppInfo
import io.circe.Json
import io.circe.syntax.EncoderOps

import java.sql.SQLException

sealed trait Alert
object Alert {

/** Restrict the length of an alert message to be compliant with alert iglu schema */
private val MaxAlertPayloadLength = 4096

final case class FailedToCreateEventsTable(cause: Throwable) extends Alert
final case class FailedToCreateEventsTable(causes: List[String]) extends Alert
final case class FailedToCommitEvents(causes: List[String]) extends Alert

def toSelfDescribingJson(
alert: Alert,
Expand All @@ -47,13 +46,14 @@ object Alert {

private def getMessage(alert: Alert): String = {
val full = alert match {
case FailedToCreateEventsTable(cause) => show"Failed to create events table: $cause"
case FailedToCreateEventsTable(causes) => show"Failed to create events table: $causes"
case FailedToCommitEvents(causes) => show"Failed to write events into table: $causes"
}

full.take(MaxAlertPayloadLength)
}

private implicit def throwableShow: Show[Throwable] = {
private implicit def causesShow: Show[List[String]] = {
def removeDuplicateMessages(in: List[String]): List[String] =
in match {
case h :: t :: rest =>
Expand All @@ -63,19 +63,8 @@ object Alert {
case fewer => fewer
}

def accumulateMessages(t: Throwable): List[String] = {
val nextMessage = t match {
case t: SQLException => Some(s"${t.getMessage} = SqlState: ${t.getSQLState}")
case t => Option(t.getMessage)
}
Option(t.getCause) match {
case Some(cause) => nextMessage.toList ::: accumulateMessages(cause)
case None => nextMessage.toList
}
}

Show.show { t =>
removeDuplicateMessages(accumulateMessages(t)).mkString(": ")
Show.show { causes =>
removeDuplicateMessages(causes).mkString(": ")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,14 @@ package com.snowplowanalytics.snowplow
package object lakes {
type AnyConfig = Config[Any, Any]

// Type for the function that checks whether given exception
// is one of the destination setup errors
type DestinationSetupErrorCheck = Throwable => Boolean
/**
* Function that checks whether an exception is due to a destination setup error
*
* If an exception was caused by a destination setup error, then it should return a short
* human-friendly description of the problem. For any other exception it should return None.
*
* A DestinationSetupErrorCheck should check the top-level exception only; it should NOT check
* `getCause`. Because our application code already checks the causes.
*/
type DestinationSetupErrorCheck = Throwable => Option[String]
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ object Retrying {
appHealth: AppHealth[F],
config: Config.Retries,
monitoring: Monitoring[F],
toAlert: Throwable => Alert,
toAlert: List[String] => Alert,
destinationSetupErrorCheck: DestinationSetupErrorCheck
)(
action: F[A]
Expand All @@ -42,16 +42,16 @@ object Retrying {
appHealth: AppHealth[F],
config: Config.Retries,
monitoring: Monitoring[F],
toAlert: Throwable => Alert,
toAlert: List[String] => Alert,
destinationSetupErrorCheck: DestinationSetupErrorCheck,
action: F[A]
): F[A] =
action
.onError(_ => appHealth.setServiceHealth(AppHealth.Service.SparkWriter, isHealthy = false))
.retryingOnSomeErrors(
isWorthRetrying = destinationSetupErrorCheck(_).pure[F],
isWorthRetrying = checkingNestedExceptions(destinationSetupErrorCheck, _).nonEmpty.pure[F],
policy = policyForSetupErrors[F](config),
onError = logErrorAndSendAlert[F](monitoring, toAlert, _, _)
onError = logErrorAndSendAlert[F](monitoring, destinationSetupErrorCheck, toAlert, _, _)
)
.retryingOnAllErrors(
policy = policyForTransientErrors[F](config),
Expand All @@ -66,11 +66,12 @@ object Retrying {

private def logErrorAndSendAlert[F[_]: Sync](
monitoring: Monitoring[F],
toAlert: Throwable => Alert,
destinationSetupErrorCheck: DestinationSetupErrorCheck,
toAlert: List[String] => Alert,
error: Throwable,
details: RetryDetails
): F[Unit] =
logError(error, details) *> monitoring.alert(toAlert(error))
logError(error, details) *> monitoring.alert(toAlert(checkingNestedExceptions(destinationSetupErrorCheck, error)))

private def logError[F[_]: Sync](error: Throwable, details: RetryDetails): F[Unit] =
Logger[F].error(error)(s"Executing command failed. ${extractRetryDetails(details)}")
Expand All @@ -81,4 +82,17 @@ object Retrying {
case RetryDetails.WillDelayAndRetry(nextDelay, retriesSoFar, cumulativeDelay) =>
s"Will retry in ${nextDelay.toMillis} milliseconds, retries so far: $retriesSoFar, total delay so far: ${cumulativeDelay.toMillis} milliseconds"
}

// Returns a list of reasons of why this was a destination setup error.
// Or empty list if this was not caused by a destination setup error
private def checkingNestedExceptions(
destinationSetupErrorCheck: DestinationSetupErrorCheck,
t: Throwable
): List[String] =
(destinationSetupErrorCheck(t), Option(t.getCause)) match {
case (Some(msg), Some(cause)) => msg :: checkingNestedExceptions(destinationSetupErrorCheck, cause)
case (Some(msg), None) => List(msg)
case (None, Some(cause)) => checkingNestedExceptions(destinationSetupErrorCheck, cause)
case (None, None) => Nil
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class HudiWriter(config: Config.Hudi) extends Writer {
Sync[F].blocking {
// This action does not have any effect beyond the internals of this loader.
// It is required to prevent later exceptions for an unknown database.
spark.sql(s"CREATE DATABASE $db")
spark.sql(s"CREATE DATABASE IF NOT EXISTS $db")
}.void
case None =>
Sync[F].unit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ object TestSparkEnvironment {
appHealth <- Resource.eval(AppHealth.init(10.seconds, source))
_ <- Resource.eval(appHealth.setServiceHealth(AppHealth.Service.BadSink, isHealthy = true))
lakeWriter <- LakeWriter.build[IO](testConfig.spark, testConfig.output.good)
lakeWriterWrapped = LakeWriter.withHandledErrors(lakeWriter, appHealth, dummyMonitoring, retriesConfig, _ => false)
lakeWriterWrapped = LakeWriter.withHandledErrors(lakeWriter, appHealth, dummyMonitoring, retriesConfig, _ => None)
} yield Environment(
appInfo = appInfo,
source = source,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ class LakeWriterSpec extends Specification with CatsEffect {
def is = s2"""
The lake writer should:
become healthy after creating the table $e1
retry adding columns and send alerts when there is a setup exception $e2
retry adding columns if there is a transient exception, with limited number of attempts and no monitoring alerts $e3
retry creating table and send alerts when there is a setup exception $e2
retry creating table if there is a transient exception, with limited number of attempts and no monitoring alerts $e3
become healthy after recovering from an earlier setup error $e4
become healthy after recovering from an earlier transient error $e5
become healthy after committing to the lake $e6
Expand Down Expand Up @@ -84,7 +84,7 @@ class LakeWriterSpec extends Specification with CatsEffect {
c.appHealth,
c.monitoring,
retriesConfig,
_ => true
_ => Some("this is a setup error")
)

val test = for {
Expand Down Expand Up @@ -120,7 +120,7 @@ class LakeWriterSpec extends Specification with CatsEffect {
c.appHealth,
c.monitoring,
retriesConfig,
_ => false
dummyDestinationSetupErrorCheck
)

val test = for {
Expand Down Expand Up @@ -152,7 +152,7 @@ class LakeWriterSpec extends Specification with CatsEffect {
c.appHealth,
c.monitoring,
retriesConfig,
_ => true
_ => Some("this is a setup error")
)

val test = for {
Expand Down Expand Up @@ -183,7 +183,7 @@ class LakeWriterSpec extends Specification with CatsEffect {
c.appHealth,
c.monitoring,
retriesConfig,
_ => false
dummyDestinationSetupErrorCheck
)

val test = for {
Expand Down Expand Up @@ -335,7 +335,7 @@ object LakeWriterSpec {
} yield ()
}

private val dummyDestinationSetupErrorCheck: Throwable => Boolean = _ => false
private val dummyDestinationSetupErrorCheck: Throwable => Option[String] = _ => None

private def testLakeWriter(state: Ref[IO, Vector[Action]], mocks: List[Response]): IO[LakeWriter[IO]] =
for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ object GcpApp extends LoaderApp[PubsubSourceConfig, PubsubSinkConfig](BuildInfo)

override def badSink: SinkProvider = PubsubSink.resource(_)

override def isDestinationSetupError: DestinationSetupErrorCheck = _ => false
override def isDestinationSetupError: DestinationSetupErrorCheck = _ => None
}
Loading
Loading