Skip to content

Commit

Permalink
common-streams 0.8.0-M4 with refactored kinesis source
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Sep 18, 2024
1 parent 972e471 commit 9abe8c8
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 15 deletions.
11 changes: 7 additions & 4 deletions config/config.aws.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@
"maxRecords": 1000
}

# -- The number of batches of events which are pre-fetched from kinesis.
# -- Increasing this above 1 is not known to improve performance.
"bufferSize": 3

# -- Name of this KCL worker used in the dynamodb lease table
"workerIdentifier": ${HOSTNAME}

Expand Down Expand Up @@ -219,6 +215,13 @@
# -- Set this to false if you use a query engine that dislikes non-nullable nested fields of a nullable struct.
"respectIgluNullability": true

# -- Configuration of internal http client used for iglu resolver, alerts and telemetry
"http": {
"client": {
"maxConnectionsPerServer": 4
}
}

"monitoring": {
"metrics": {

Expand Down
7 changes: 7 additions & 0 deletions config/config.azure.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,13 @@
# -- Set this to false if you use a query engine that dislikes non-nullable nested fields of a nullable struct.
"respectIgluNullability": true

# -- Configuration of internal http client used for iglu resolver, alerts and telemetry
"http": {
"client": {
"maxConnectionsPerServer": 4
}
}

"monitoring": {
"metrics": {

Expand Down
7 changes: 7 additions & 0 deletions config/config.gcp.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,13 @@
# -- Set this to false if you use a query engine that dislikes non-nullable nested fields of a nullable struct.
"respectIgluNullability": true

# -- Configuration of internal http client used for iglu resolver, alerts and telemetry
"http": {
"client": {
"maxConnectionsPerServer": 4
}
}

"monitoring": {
"metrics": {

Expand Down
4 changes: 4 additions & 0 deletions modules/core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@
}
}

"http": {
"client": ${snowplow.defaults.http.client}
}

"skipSchemas": []
"respectIgluNullability": true
"exitOnMissingIgluSchema": true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import scala.concurrent.duration.FiniteDuration

import com.snowplowanalytics.iglu.client.resolver.Resolver.ResolverConfig
import com.snowplowanalytics.iglu.core.SchemaCriterion
import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, Metrics => CommonMetrics, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, HttpClient, Metrics => CommonMetrics, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs.schemaCriterionDecoder
import com.snowplowanalytics.snowplow.runtime.HealthProbe.decoders._

Expand All @@ -40,7 +40,8 @@ case class Config[+Source, +Sink](
skipSchemas: List[SchemaCriterion],
respectIgluNullability: Boolean,
exitOnMissingIgluSchema: Boolean,
retries: Config.Retries
retries: Config.Retries,
http: Config.Http
)

object Config {
Expand Down Expand Up @@ -129,6 +130,8 @@ object Config {
transientErrors: Retrying.Config.ForTransient
)

case class Http(client: HttpClient.Config)

implicit def decoder[Source: Decoder, Sink: Decoder]: Decoder[Config[Source, Sink]] = {
implicit val configuration = Configuration.default.withDiscriminator("type")
implicit val sinkWithMaxSize = for {
Expand All @@ -151,6 +154,7 @@ object Config {
implicit val healthProbeDecoder = deriveConfiguredDecoder[HealthProbe]
implicit val monitoringDecoder = deriveConfiguredDecoder[Monitoring]
implicit val retriesDecoder = deriveConfiguredDecoder[Retries]
implicit val httpDecoder = deriveConfiguredDecoder[Http]

// TODO add specific lake-loader docs for license
implicit val licenseDecoder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,15 @@ package com.snowplowanalytics.snowplow.lakes

import cats.implicits._
import cats.effect.{Async, Resource, Sync}
import cats.effect.unsafe.implicits.global
import org.http4s.client.Client
import org.http4s.blaze.client.BlazeClientBuilder
import io.sentry.Sentry

import com.snowplowanalytics.iglu.client.resolver.Resolver
import com.snowplowanalytics.iglu.core.SchemaCriterion
import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, SourceAndAck}
import com.snowplowanalytics.snowplow.sinks.Sink
import com.snowplowanalytics.snowplow.lakes.processing.LakeWriter
import com.snowplowanalytics.snowplow.runtime.{AppHealth, AppInfo, HealthProbe, Webhook}
import com.snowplowanalytics.snowplow.runtime.{AppHealth, AppInfo, HealthProbe, HttpClient, Webhook}

/**
* Resources and runtime-derived configuration needed for processing events
Expand Down Expand Up @@ -72,7 +70,7 @@ object Environment {
sourceReporter = sourceAndAck.isHealthy(config.main.monitoring.healthProbe.unhealthyLatency).map(_.showIfUnhealthy)
appHealth <- Resource.eval(AppHealth.init[F, Alert, RuntimeService](List(sourceReporter)))
resolver <- mkResolver[F](config.iglu)
httpClient <- BlazeClientBuilder[F].withExecutionContext(global.compute).resource
httpClient <- HttpClient.resource[F](config.main.http.client)
_ <- HealthProbe.resource(config.main.monitoring.healthProbe.port, appHealth)
_ <- Webhook.resource(config.main.monitoring.webhook, appInfo, httpClient, appHealth)
badSink <-
Expand Down
8 changes: 3 additions & 5 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ object Dependencies {
val awsRegistry = "1.1.20"

// Snowplow
val streams = "0.8.0-M2"
val igluClient = "3.0.0"
val streams = "0.8.0-M4"
val igluClient = "3.2.0"

// Transitive overrides
val protobuf = "3.25.1"
Expand All @@ -71,7 +71,6 @@ object Dependencies {

}

val blazeClient = "org.http4s" %% "http4s-blaze-client" % V.http4s
val decline = "com.monovore" %% "decline-effect" % V.decline
val circeGenericExtra = "io.circe" %% "circe-generic-extras" % V.circe
val betterMonadicFor = "com.olegpy" %% "better-monadic-for" % V.betterMonadicFor
Expand Down Expand Up @@ -153,7 +152,6 @@ object Dependencies {
Spark.sqlForIcebergDelta % Provided,
iceberg % Provided,
igluClientHttp4s,
blazeClient,
decline,
sentry,
circeGenericExtra,
Expand All @@ -170,7 +168,7 @@ object Dependencies {
awsS3,
awsGlue,
awsS3Transfer % Runtime,
awsSts % Runtime,
awsSts,
hadoopClient
) ++ commonRuntimeDependencies

Expand Down

0 comments on commit 9abe8c8

Please sign in to comment.