Make alert messages more human-readable
The webhook alert should contain a short, helpful message explaining
why an error was caused by the destination setup. In other Snowplow
loaders we get this message simply by serializing the Exception, but
in Lake Loader I found the exception messages to be very messy.

A related problem: for Hudi setup errors I needed to traverse the
Exception's `getCause` chain in order to check whether it was a setup
error.

This PR takes more explicit control over setting short, friendly error
messages, and traverses `getCause` to collect all relevant messages.

E.g. an alert message before this change:

> Failed to create events table: s3a://<REDACTED>/events/_delta_log: org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS Credentials provided by V1ToV2AwsCredentialProviderAdapter : software.amazon.awssdk.services.sts.model.StsException: User: arn:aws:iam::<REDACTED>:user/<REDACTED> is not authorized to perform: sts:AssumeRole on resource: arn:aws:iam::<REDACTED>:role/<REDACTED> (Service: Sts, Status Code: 403, Request ID: 00000000-0000-0000-0000-000000000000)

The corresponding alert after this change:

> Failed to create events table: s3a://<REDACTED>/events/_delta_log: Failed to initialize AWS access credentials: Missing permissions to assume the AWS IAM role
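
The approach, roughly (a simplified sketch of the idea, not the exact
code in this commit): walk the exception's `getCause` chain, keep a
short friendly description for each setup error we recognise, and join
those descriptions into the alert text.

```scala
// Sketch only: `describe` stands in for the per-cloud setup-error check.
def friendlyMessages(t: Throwable, describe: Throwable => Option[String]): List[String] =
  describe(t).toList ::: Option(t.getCause).toList.flatMap(friendlyMessages(_, describe))

// friendlyMessages(err, describe).mkString(": ") gives something like:
//   "Failed to initialize AWS access credentials: Missing permissions to assume the AWS IAM role"
```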

**Other small changes I snuck into this commit:**

- Added specific webhook alerts for Hudi.
- Removed the AssumedRoleCredentialsProvider for AWS SDK v1. It is no
  longer needed now that Hadoop fully uses AWS SDK v2.
- Added retrying and alerting for committing events to the lake.
istreeter committed Aug 5, 2024
1 parent cd920c4 commit b7f9565
Showing 15 changed files with 132 additions and 123 deletions.
2 changes: 1 addition & 1 deletion modules/aws/src/main/resources/application.conf
@@ -16,7 +16,7 @@
}
"spark": {
"conf": {
"fs.s3a.aws.credentials.provider": "com.snowplowanalytics.snowplow.lakes.AssumedRoleCredentialsProviderV1"
"fs.s3a.aws.credentials.provider": "com.snowplowanalytics.snowplow.lakes.AssumedRoleCredentialsProvider"
"fs.s3a.assumed.role.session.name": "snowplow-lake-loader"
"fs.s3a.assumed.role.session.duration": "1h"
}

This file was deleted.

@@ -10,13 +10,17 @@

package com.snowplowanalytics.snowplow.lakes

import cats.implicits._
import org.apache.hadoop.fs.s3a.{CredentialInitializationException, UnknownStoreException}
import software.amazon.awssdk.services.s3.model.{NoSuchBucketException, S3Exception}
import software.amazon.awssdk.services.sts.model.StsException
import software.amazon.awssdk.services.glue.model.{AccessDeniedException => GlueAccessDeniedException}

import org.apache.hadoop.fs.s3a.UnknownStoreException
import software.amazon.awssdk.services.glue.model.{
AccessDeniedException => GlueAccessDeniedException,
EntityNotFoundException => GlueEntityNotFoundException
}

import java.nio.file.AccessDeniedException
import scala.util.matching.Regex

import com.snowplowanalytics.snowplow.sources.kinesis.{KinesisSource, KinesisSourceConfig}
import com.snowplowanalytics.snowplow.sinks.kinesis.{KinesisSink, KinesisSinkConfig}
@@ -27,26 +31,68 @@ object AwsApp extends LoaderApp[KinesisSourceConfig, KinesisSinkConfig](BuildInf

override def badSink: SinkProvider = KinesisSink.resource(_)

/**
* Identifies known exceptions relating to setup of the destination
*
* Exceptions are often "caused by" an underlying exception. For example, an s3a
* UnknownStoreException is often "caused by" an aws sdk NoSuchBucketException. Our implementation
* checks both the top exception and the underlying causes. Therefore in some cases we
* over-specify the exceptions to watch out for; the top exception and causal exception both match
*/
override def isDestinationSetupError: DestinationSetupErrorCheck = {

/** Exceptions raised by underlying AWS SDK * */
case _: NoSuchBucketException =>
// S3 bucket does not exist
true
case e: S3Exception if e.statusCode() >= 400 && e.statusCode() < 500 =>
Some("S3 bucket does not exist or we do not have permissions to see it exists")
case e: S3Exception if e.statusCode() === 403 =>
// No permission to read from S3 bucket or to write to S3 bucket
true
case _: GlueAccessDeniedException =>
Some("Missing permissions to perform this action on S3 bucket")
case e: S3Exception if e.statusCode() === 301 =>
// Misconfigured AWS region
Some("S3 bucket is not in the expected region")
case e: GlueAccessDeniedException =>
// No permission to read from Glue catalog
true
case _: StsException =>
Some(Option(e.getMessage).getOrElse("Missing permissions to perform this action on Glue catalog"))
case _: GlueEntityNotFoundException =>
// Glue database does not exist
Some("Glue resource does not exist or no permission to see it exists")
case e: StsException if e.statusCode() === 403 =>
// No permission to assume the role given to authenticate to S3/Glue
true
case _: UnknownStoreException =>
// no such bucket exist
true
case _: AccessDeniedException =>
// 1 - s3 bucket's permission policy denies all actions
// 2 - not authorized to assume the role
true
Some("Missing permissions to assume the AWS IAM role")

/** Exceptions raised via hadoop's s3a filesystem * */
case e: UnknownStoreException =>
// S3 bucket does not exist or no permission to see it exists
stripCauseDetails(e)
case e: AccessDeniedException =>
// 1 - No permission to put object on the bucket
// 2 - No permission to assume the role given to authenticate to S3
stripCauseDetails(e)
case _: CredentialInitializationException =>
Some("Failed to initialize AWS access credentials")

/** Exceptions common to the table format - Delta/Iceberg/Hudi * */
case t => TableFormatSetupError.check(t)
}

/**
* Fixes hadoop Exception messages to be more reader-friendly
*
* Hadoop exception messages often add the exception's cause to the exception's message.
*
* E.g. "<HELPFUL MESSAGE>: <CAUSE CLASSNAME>: <CAUSE MESSAGE>"
*
* In order to have better control of the message sent to the webhook, we remove the cause details
* here, and add back in pertinent cause information later.
*/
private def stripCauseDetails(t: Throwable): Option[String] =
(Option(t.getMessage), Option(t.getCause)) match {
case (Some(message), Some(cause)) =>
val toRemove = new Regex(":? *" + Regex.quote(cause.toString) + ".*")
val replaced = toRemove.replaceAllIn(message, "")
Some(replaced)
case (other, _) =>
other
}
}
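
To make the effect of `stripCauseDetails` concrete, here is a small
self-contained sketch of the same regex trick; the exception and the
messages are invented for illustration:

```scala
import scala.util.matching.Regex

// Hadoop-style message: "<HELPFUL MESSAGE>: <CAUSE CLASSNAME>: <CAUSE MESSAGE>"
val cause   = new RuntimeException("No AWS Credentials provided")
val wrapped = new RuntimeException(s"Failed to initialize AWS access credentials: $cause", cause)

// Same pattern as stripCauseDetails: drop the cause's toString and everything after it
val toRemove = new Regex(":? *" + Regex.quote(cause.toString) + ".*")
val friendly = toRemove.replaceAllIn(wrapped.getMessage, "")
// friendly == "Failed to initialize AWS access credentials"
```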
@@ -35,5 +35,5 @@ object AzureApp extends LoaderApp[KafkaSourceConfig, KafkaSinkConfig](BuildInfo)

override def badSink: SinkProvider = KafkaSink.resource(_, classTag[SinkAuthHandler])

override def isDestinationSetupError: DestinationSetupErrorCheck = _ => false
override def isDestinationSetupError: DestinationSetupErrorCheck = _ => None
}
@@ -20,15 +20,14 @@ import com.snowplowanalytics.snowplow.runtime.AppInfo
import io.circe.Json
import io.circe.syntax.EncoderOps

import java.sql.SQLException

sealed trait Alert
object Alert {

/** Restrict the length of an alert message to be compliant with alert iglu schema */
private val MaxAlertPayloadLength = 4096

final case class FailedToCreateEventsTable(cause: Throwable) extends Alert
final case class FailedToCreateEventsTable(causes: List[String]) extends Alert
final case class FailedToCommitEvents(causes: List[String]) extends Alert

def toSelfDescribingJson(
alert: Alert,
@@ -47,13 +46,14 @@ object Alert {

private def getMessage(alert: Alert): String = {
val full = alert match {
case FailedToCreateEventsTable(cause) => show"Failed to create events table: $cause"
case FailedToCreateEventsTable(causes) => show"Failed to create events table: $causes"
case FailedToCommitEvents(causes) => show"Failed to write events into table: $causes"
}

full.take(MaxAlertPayloadLength)
}

private implicit def throwableShow: Show[Throwable] = {
private implicit def causesShow: Show[List[String]] = {
def removeDuplicateMessages(in: List[String]): List[String] =
in match {
case h :: t :: rest =>
@@ -63,19 +63,8 @@
case fewer => fewer
}

def accumulateMessages(t: Throwable): List[String] = {
val nextMessage = t match {
case t: SQLException => Some(s"${t.getMessage} = SqlState: ${t.getSQLState}")
case t => Option(t.getMessage)
}
Option(t.getCause) match {
case Some(cause) => nextMessage.toList ::: accumulateMessages(cause)
case None => nextMessage.toList
}
}

Show.show { t =>
removeDuplicateMessages(accumulateMessages(t)).mkString(": ")
Show.show { causes =>
removeDuplicateMessages(causes).mkString(": ")
}
}
}
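
To illustrate how the new `causes` list becomes the alert text:
consecutive duplicate messages are removed, the rest are joined with
": ", and the result is capped at 4096 characters to satisfy the iglu
schema. A rough sketch (the sample causes are hypothetical):

```scala
val causes = List(
  "Failed to initialize AWS access credentials",
  "Missing permissions to assume the AWS IAM role"
)

// Approximates getMessage for FailedToCreateEventsTable (sketch only)
val message = ("Failed to create events table: " + causes.mkString(": ")).take(4096)
// => "Failed to create events table: Failed to initialize AWS access credentials: Missing permissions to assume the AWS IAM role"
```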
@@ -13,7 +13,14 @@ package com.snowplowanalytics.snowplow
package object lakes {
type AnyConfig = Config[Any, Any]

// Type for the function that checks whether given exception
// is one of the destination setup errors
type DestinationSetupErrorCheck = Throwable => Boolean
/**
* Function that checks whether an exception is due to a destination setup error
*
* If an exception was caused by a destination setup error, then it should return a short
* human-friendly description of the problem. For any other exception it should return None.
*
* A DestinationSetupErrorCheck should check the top-level exception only; it should NOT check
* `getCause`, because our application code already checks the causes.
*/
type DestinationSetupErrorCheck = Throwable => Option[String]
}
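
A minimal example of the new contract (the exception type and message
here are purely illustrative); each cloud-specific app supplies its own
version of this function:

```scala
import java.nio.file.AccessDeniedException

// Checks only the top-level exception; cause traversal happens elsewhere
val exampleCheck: Throwable => Option[String] = {
  case _: AccessDeniedException => Some("Missing permissions to write to the destination")
  case _                        => None
}
```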
@@ -112,11 +112,9 @@ object LakeWriter {
underlying.removeDataFrameFromDisk(viewName)

def commit(viewName: String): F[Unit] =
underlying
.commit(viewName)
.onError { case _ =>
appHealth.setServiceHealth(AppHealth.Service.SparkWriter, isHealthy = false)
} <* appHealth.setServiceHealth(AppHealth.Service.SparkWriter, isHealthy = true)
Retrying.withRetries(appHealth, retries, monitoring, Alert.FailedToCommitEvents, destinationSetupErrorCheck) {
underlying.commit(viewName)
}
}

/**
@@ -30,7 +30,7 @@ object Retrying {
appHealth: AppHealth[F],
config: Config.Retries,
monitoring: Monitoring[F],
toAlert: Throwable => Alert,
toAlert: List[String] => Alert,
destinationSetupErrorCheck: DestinationSetupErrorCheck
)(
action: F[A]
@@ -42,16 +42,16 @@
appHealth: AppHealth[F],
config: Config.Retries,
monitoring: Monitoring[F],
toAlert: Throwable => Alert,
toAlert: List[String] => Alert,
destinationSetupErrorCheck: DestinationSetupErrorCheck,
action: F[A]
): F[A] =
action
.onError(_ => appHealth.setServiceHealth(AppHealth.Service.SparkWriter, isHealthy = false))
.retryingOnSomeErrors(
isWorthRetrying = destinationSetupErrorCheck(_).pure[F],
isWorthRetrying = checkingNestedExceptions(destinationSetupErrorCheck, _).nonEmpty.pure[F],
policy = policyForSetupErrors[F](config),
onError = logErrorAndSendAlert[F](monitoring, toAlert, _, _)
onError = logErrorAndSendAlert[F](monitoring, destinationSetupErrorCheck, toAlert, _, _)
)
.retryingOnAllErrors(
policy = policyForTransientErrors[F](config),
@@ -66,11 +66,12 @@ object Retrying {

private def logErrorAndSendAlert[F[_]: Sync](
monitoring: Monitoring[F],
toAlert: Throwable => Alert,
destinationSetupErrorCheck: DestinationSetupErrorCheck,
toAlert: List[String] => Alert,
error: Throwable,
details: RetryDetails
): F[Unit] =
logError(error, details) *> monitoring.alert(toAlert(error))
logError(error, details) *> monitoring.alert(toAlert(checkingNestedExceptions(destinationSetupErrorCheck, error)))

private def logError[F[_]: Sync](error: Throwable, details: RetryDetails): F[Unit] =
Logger[F].error(error)(s"Executing command failed. ${extractRetryDetails(details)}")
@@ -81,4 +82,17 @@
case RetryDetails.WillDelayAndRetry(nextDelay, retriesSoFar, cumulativeDelay) =>
s"Will retry in ${nextDelay.toMillis} milliseconds, retries so far: $retriesSoFar, total delay so far: ${cumulativeDelay.toMillis} milliseconds"
}

// Returns the list of reasons why this was a destination setup error,
// or an empty list if it was not caused by a destination setup error
private def checkingNestedExceptions(
destinationSetupErrorCheck: DestinationSetupErrorCheck,
t: Throwable
): List[String] =
(destinationSetupErrorCheck(t), Option(t.getCause)) match {
case (Some(msg), Some(cause)) => msg :: checkingNestedExceptions(destinationSetupErrorCheck, cause)
case (Some(msg), None) => List(msg)
case (None, Some(cause)) => checkingNestedExceptions(destinationSetupErrorCheck, cause)
case (None, None) => Nil
}
}
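
As a concrete example of the traversal above (the exceptions are
invented for illustration): even when the setup error is buried two
causes deep, checking each level yields the short messages that end up
in the alert.

```scala
// Hypothetical nested failure: table error -> credentials error -> STS error
val sts   = new RuntimeException("Status 403 (Service: Sts)")
val creds = new RuntimeException("Failed to initialize AWS access credentials", sts)
val outer = new RuntimeException("Failed to create events table", creds)

// A toy check in the spirit of DestinationSetupErrorCheck
val check: Throwable => Option[String] = t =>
  if (t eq creds) Some("Failed to initialize AWS access credentials")
  else if (t eq sts) Some("Missing permissions to assume the AWS IAM role")
  else None

// checkingNestedExceptions(check, outer) would return:
//   List("Failed to initialize AWS access credentials",
//        "Missing permissions to assume the AWS IAM role")
```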
@@ -71,7 +71,7 @@ class HudiWriter(config: Config.Hudi) extends Writer {
Sync[F].blocking {
// This action does not have any effect beyond the internals of this loader.
// It is required to prevent later exceptions for an unknown database.
spark.sql(s"CREATE DATABASE $db")
spark.sql(s"CREATE DATABASE IF NOT EXISTS $db")
}.void
case None =>
Sync[F].unit
@@ -36,7 +36,7 @@ object TestSparkEnvironment {
appHealth <- Resource.eval(AppHealth.init(10.seconds, source))
_ <- Resource.eval(appHealth.setServiceHealth(AppHealth.Service.BadSink, isHealthy = true))
lakeWriter <- LakeWriter.build[IO](testConfig.spark, testConfig.output.good)
lakeWriterWrapped = LakeWriter.withHandledErrors(lakeWriter, appHealth, dummyMonitoring, retriesConfig, _ => false)
lakeWriterWrapped = LakeWriter.withHandledErrors(lakeWriter, appHealth, dummyMonitoring, retriesConfig, _ => None)
} yield Environment(
appInfo = appInfo,
source = source,
@@ -84,7 +84,7 @@ class LakeWriterSpec extends Specification with CatsEffect {
c.appHealth,
c.monitoring,
retriesConfig,
_ => true
_ => Some("this is a setup error")
)

val test = for {
Expand Down Expand Up @@ -120,7 +120,7 @@ class LakeWriterSpec extends Specification with CatsEffect {
c.appHealth,
c.monitoring,
retriesConfig,
_ => false
dummyDestinationSetupErrorCheck
)

val test = for {
Expand Down Expand Up @@ -152,7 +152,7 @@ class LakeWriterSpec extends Specification with CatsEffect {
c.appHealth,
c.monitoring,
retriesConfig,
_ => true
_ => Some("this is a setup error")
)

val test = for {
Expand Down Expand Up @@ -183,7 +183,7 @@ class LakeWriterSpec extends Specification with CatsEffect {
c.appHealth,
c.monitoring,
retriesConfig,
_ => false
dummyDestinationSetupErrorCheck
)

val test = for {
Expand Down Expand Up @@ -335,7 +335,7 @@ object LakeWriterSpec {
} yield ()
}

private val dummyDestinationSetupErrorCheck: Throwable => Boolean = _ => false
private val dummyDestinationSetupErrorCheck: Throwable => Option[String] = _ => None

private def testLakeWriter(state: Ref[IO, Vector[Action]], mocks: List[Response]): IO[LakeWriter[IO]] =
for {
@@ -19,5 +19,5 @@ object GcpApp extends LoaderApp[PubsubSourceConfig, PubsubSinkConfig](BuildInfo)

override def badSink: SinkProvider = PubsubSink.resource(_)

override def isDestinationSetupError: DestinationSetupErrorCheck = _ => false
override def isDestinationSetupError: DestinationSetupErrorCheck = _ => None
}