Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Readside exponential backoff #329

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ object NettyHelper {
// decide on negotiation type
val negotiationType: NegotiationType = if (useTls) TLS else PLAINTEXT

NettyChannelBuilder.forAddress(host, port).negotiationType(negotiationType)
NettyChannelBuilder.forAddress(host, port).enableRetry().negotiationType(negotiationType)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,15 @@ object ServiceBootstrapper {
system: ActorSystem[_],
cosConfig: CosConfig,
interceptors: Seq[ClientInterceptor]): Unit = {

val readSideEc: ExecutionContext = TracedExecutorService.get()

// if read side is enabled
if (cosConfig.enableReadSide) {
// instantiate a read side manager
val readSideManager: ReadSideManager =
ReadSideManager(system = system, interceptors = interceptors, numShards = cosConfig.eventsConfig.numShards)
ReadSideManager(system = system, interceptors = interceptors, numShards = cosConfig.eventsConfig.numShards)(
readSideEc)
// initialize all configured read sides
readSideManager.init()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.trace.Span
import org.slf4j.{ Logger, LoggerFactory }

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.util.{ Failure, Success, Try }

/**
Expand All @@ -26,7 +29,7 @@ import scala.util.{ Failure, Success, Try }
*/
private[readside] class ReadSideHandlerImpl(
processorId: String,
readSideHandlerServiceBlockingStub: ReadSideHandlerServiceBlockingStub)
readSideHandlerServiceBlockingStub: ReadSideHandlerServiceBlockingStub)(implicit val ec: ExecutionContext)
extends ReadSideHandler {

private val COS_EVENT_TAG_HEADER = "x-cos-event-tag"
Expand All @@ -44,7 +47,7 @@ private[readside] class ReadSideHandlerImpl(
* @param meta the additional meta data
* @return an eventual HandleReadSideResponse
*/
def processEvent(
override def doProcessEvent(
event: com.google.protobuf.any.Any,
eventTag: String,
resultingState: com.google.protobuf.any.Any,
Expand Down Expand Up @@ -103,27 +106,55 @@ private[readside] class ReadSideHandlerImpl(
}
}

/**
* Processes events read from the Journal
*
* @param event the actual event
* @param eventTag the event tag
* @param resultingState the resulting state of the applied event
* @param meta the additional meta data
* @return an eventual HandleReadSideResponse
*/
private[readside] trait ReadSideHandler {

implicit val ec: ExecutionContext

/**
* handles a read side message
* Processes events read from the Journal.
* Exponentially backoff with a 10& gain modifier until it reaches the upper threshold of maxBackoffSeconds.
*
* @param event
* @param eventTag
* @param resultingState
* @param meta
* @return
* @param event the actual event
* @param eventTag the event tag
* @param resultingState the resulting state of the applied event
* @param meta the additional meta data
* @param policy the retry policy. If provided, overrides the default backoff policy. Testing method
* @param minBackoffSeconds the minimum number of seconds to backoff
* @param maxBackoffSeconds the maximum number of seconds to backoff
* @return Future[Boolean] for success
*/
def processEvent(
event: com.google.protobuf.any.Any,
eventTag: String,
resultingState: com.google.protobuf.any.Any,
meta: MetaData,
policy: Option[retry.Policy] = Some(retry.Directly()),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 comments:

  • policy should be a constructor arg, not a method arg
  • you don't need an Option if you fail over to retry.Backoff, just make that the default

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the backoff policy itself is a stateful class. I wanted to decouple the max and min time from the default arg

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But yeah I didn't mean to set this as Some(something) whoops.

minBackoffSeconds: Long = 1L,
maxBackoffSeconds: Long = 30L): Future[Boolean] = {

implicit val success: retry.Success[Boolean] = retry.Success(x => x)

val finalPolicy: retry.Policy =
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this implementation only makes sense for gRPC. Just put it in the impl class.

FYI - this trait was only meant to make testing easier, not really for implementing many of these.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The policy is not a grpc concept though. It is a retry concept.

policy.getOrElse(retry.Backoff(maxBackoffSeconds.toInt, FiniteDuration(minBackoffSeconds, TimeUnit.SECONDS)))

val f: Future[Boolean] = finalPolicy.apply(() =>
Future {
doProcessEvent(event, eventTag, resultingState, meta)
})

f
}

/**
* Processes events read from the Journal.
*
* @param event the actual event
* @param eventTag the event tag
* @param resultingState the resulting state of the applied event
* @param meta the additional meta data
* @return Boolean for success
*/
protected def doProcessEvent(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this on the trait? this is an implementation detail

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wanted to decouple the implementation of "process" with the exponential backoff.

event: com.google.protobuf.any.Any,
eventTag: String,
resultingState: com.google.protobuf.any.Any,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import akka.projection.jdbc.JdbcSession
import com.google.protobuf.any.{ Any => ProtoAny }
import com.namely.protobuf.chiefofstate.v1.common.MetaData
import com.namely.protobuf.chiefofstate.v1.persistence.EventWrapper
import com.namely.protobuf.chiefofstate.v1.readside.HandleReadSideResponse
import org.slf4j.{ Logger, LoggerFactory }

import scala.util.{ Failure, Success, Try }
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
import scala.concurrent.{ Await, ExecutionContext, Future }
import scala.util.{ Failure, Success }

/**
* Implements the akka JdbcHandler interface and forwards events to the
Expand Down Expand Up @@ -43,8 +45,10 @@ private[readside] class ReadSideJdbcHandler(eventTag: String, processorId: Strin
val meta: MetaData = envelope.event.getMeta

// invoke remote processor, get result
val readSideSuccess: Boolean =
readSideHandler.processEvent(event, eventTag, resultingState, meta)
val processEvent: Future[Boolean] = readSideHandler.processEvent(event, eventTag, resultingState, meta)

// TODO: Make this configurable
val readSideSuccess: Boolean = Await.result(processEvent, Duration.apply(60L, TimeUnit.SECONDS))

if (!readSideSuccess) {
val errMsg: String =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import com.typesafe.config.Config
import com.zaxxer.hikari.{ HikariConfig, HikariDataSource }
import io.grpc.ClientInterceptor
import org.slf4j.{ Logger, LoggerFactory }
import scala.concurrent.ExecutionContext

/**
* Used to configure and start all read side processors
Expand All @@ -29,7 +30,7 @@ class ReadSideManager(
interceptors: Seq[ClientInterceptor],
dbConfig: ReadSideManager.DbConfig,
readSideConfigs: Seq[ReadSideConfig],
numShards: Int) {
numShards: Int)(implicit ec: ExecutionContext) {
AntonioYuen marked this conversation as resolved.
Show resolved Hide resolved

private val logger: Logger = LoggerFactory.getLogger(this.getClass)

Expand Down Expand Up @@ -62,7 +63,8 @@ class ReadSideManager(

object ReadSideManager {

def apply(system: ActorSystem[_], interceptors: Seq[ClientInterceptor], numShards: Int): ReadSideManager = {
def apply(system: ActorSystem[_], interceptors: Seq[ClientInterceptor], numShards: Int)(
implicit ec: ExecutionContext): ReadSideManager = {

val dbConfig: DbConfig = {
// read the jdbc-default settings
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright 2020 Namely Inc.
*
* SPDX-License-Identifier: MIT
*/

package com.namely.chiefofstate.helper

import scala.concurrent.ExecutionContext

trait ExecutionContextHelper {
implicit val ec: ExecutionContext = ExecutionContext.global
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't do this, just have your test set it explicitly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just felt this was cleaner

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@

package com.namely.chiefofstate.readside

import com.namely.chiefofstate.helper.BaseSpec
import com.google.protobuf.any
import com.namely.chiefofstate.helper.{ BaseSpec, ExecutionContextHelper }
import com.namely.protobuf.chiefofstate.v1.common.MetaData
import com.namely.protobuf.chiefofstate.v1.readside.{
HandleReadSideRequest,
Expand All @@ -20,19 +21,18 @@ import io.grpc.inprocess._
import io.opentelemetry.api.{ GlobalOpenTelemetry, OpenTelemetry }
import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter
import io.opentelemetry.sdk.OpenTelemetrySdk
import io.opentelemetry.sdk.trace.data.SpanData

import scala.jdk.CollectionConverters.ListHasAsScala
import scala.concurrent.ExecutionContext.global
import scala.concurrent.Future
import scala.util.{ Failure, Success, Try }
import scala.concurrent.{ Await, ExecutionContext, Future }
import io.opentelemetry.sdk.trace.SdkTracerProvider
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor
import io.opentelemetry.context.propagation.ContextPropagators
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator
import io.grpc.StatusRuntimeException

class ReadSideHandlerImplSpec extends BaseSpec {
import scala.concurrent.duration.Duration

class ReadSideHandlerSpec extends BaseSpec with ExecutionContextHelper {

var testExporter: InMemorySpanExporter = _
var openTelemetry: OpenTelemetry = _
Expand Down Expand Up @@ -88,14 +88,17 @@ class ReadSideHandlerImplSpec extends BaseSpec {

val readSideHandlerImpl = new ReadSideHandlerImpl(readSideId, readSideHandlerServiceStub)

val triedHandleReadSideResponse =
val triedHandleReadSideResponse: Future[Boolean] =
readSideHandlerImpl.processEvent(
com.google.protobuf.any.Any.pack(accountOpened),
eventTag,
resultingState,
meta)
event = com.google.protobuf.any.Any.pack(accountOpened),
eventTag = eventTag,
resultingState = resultingState,
meta = meta,
policy = Some(retry.Directly()))

val result: Boolean = Await.result(triedHandleReadSideResponse, Duration.Inf)

triedHandleReadSideResponse shouldBe true
result shouldBe true
}

"handle response with explicit failure" in {
Expand Down Expand Up @@ -140,12 +143,15 @@ class ReadSideHandlerImplSpec extends BaseSpec {

val triedHandleReadSideResponse =
readSideHandlerImpl.processEvent(
com.google.protobuf.any.Any.pack(accountOpened),
eventTag,
resultingState,
meta)
event = com.google.protobuf.any.Any.pack(accountOpened),
eventTag = eventTag,
resultingState = resultingState,
meta = meta,
policy = Some(retry.Directly()))

triedHandleReadSideResponse shouldBe false
val result: Boolean = Await.result(triedHandleReadSideResponse, Duration.Inf)

result shouldBe false
}

"handle event when there is an exception" in {
Expand Down Expand Up @@ -187,19 +193,18 @@ class ReadSideHandlerImplSpec extends BaseSpec {
val readSideHandlerImpl = new ReadSideHandlerImpl(readSideId, readSideHandlerServiceStub)
val triedHandleReadSideResponse =
readSideHandlerImpl.processEvent(
com.google.protobuf.any.Any.pack(accountOpened),
eventTag,
resultingState,
meta)
event = com.google.protobuf.any.Any.pack(accountOpened),
eventTag = eventTag,
resultingState = resultingState,
meta = meta,
policy = Some(retry.Directly()))

val result: Boolean = Await.result(triedHandleReadSideResponse, Duration.Inf)

triedHandleReadSideResponse shouldBe false
result shouldBe false

// assert the span was closed even in case of a failure
testExporter
.getFinishedSpanItems()
.asScala
.find(_.getName() == readSideHandlerImpl.spanName)
.isDefined shouldBe true
testExporter.getFinishedSpanItems.asScala.exists(_.getName == readSideHandlerImpl.spanName) shouldBe true
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@

package com.namely.chiefofstate.readside

import com.namely.chiefofstate.helper.BaseSpec
import com.namely.chiefofstate.helper.{ BaseSpec, ExecutionContextHelper }
import com.dimafeng.testcontainers.{ ForAllTestContainer, PostgreSQLContainer }
import org.testcontainers.utility.DockerImageName

import java.sql.{ Connection, DriverManager }
import akka.projection.jdbc.JdbcSession
import akka.projection.eventsourced.EventEnvelope
Expand All @@ -18,7 +19,9 @@ import com.namely.protobuf.chiefofstate.v1.common.MetaData
import com.google.protobuf.any
import com.google.protobuf.wrappers.StringValue

class ReadSideJdbcHandlerSpec extends BaseSpec with ForAllTestContainer {
import scala.concurrent.Future

class ReadSideJdbcHandlerSpec extends BaseSpec with ForAllTestContainer with ExecutionContextHelper {

val cosSchema: String = "cos"

Expand All @@ -41,7 +44,7 @@ class ReadSideJdbcHandlerSpec extends BaseSpec with ForAllTestContainer {
// mock read handler that returns success
val readHandler = mock[ReadSideHandler]

(readHandler.processEvent _).expects(*, *, *, *).returning(true).once
(readHandler.processEvent _).expects(*, *, *, *, *, *, *).returning(Future(true)).once()

val jdbcHandler = new ReadSideJdbcHandler("tag", "processor", readHandler)
val jdbcSession: JdbcSession = mock[JdbcSession]
Expand All @@ -59,7 +62,7 @@ class ReadSideJdbcHandlerSpec extends BaseSpec with ForAllTestContainer {
// mock read handler that returns success
val readHandler = mock[ReadSideHandler]

(readHandler.processEvent _).expects(*, *, *, *).returning(false).once
(readHandler.processEvent _).expects(*, *, *, *, *, *, *).returning(Future(false)).once()

val jdbcHandler = new ReadSideJdbcHandler("tag", "processor", readHandler)
val jdbcSession: JdbcSession = mock[JdbcSession]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@

package com.namely.chiefofstate.readside

import com.namely.chiefofstate.helper.BaseSpec
import com.namely.chiefofstate.helper.{ BaseSpec, ExecutionContextHelper }
import com.dimafeng.testcontainers.{ ForAllTestContainer, PostgreSQLContainer }
import org.testcontainers.utility.DockerImageName
import com.typesafe.config.{ Config, ConfigFactory, ConfigValueFactory }
import akka.actor.testkit.typed.scaladsl.ActorTestKit

class ReadSideManagerSpec extends BaseSpec with ForAllTestContainer {
class ReadSideManagerSpec extends BaseSpec with ForAllTestContainer with ExecutionContextHelper {

val cosSchema: String = "cos"

Expand Down
6 changes: 5 additions & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ object Dependencies {
val PrometheusServerVersion: String = "0.10.0"

val TestContainers: String = "0.39.3"

val RetryVersion: String = "0.3.3"
}

import Dependencies.Versions._
Expand Down Expand Up @@ -73,7 +75,9 @@ object Dependencies {
("io.opentelemetry" % "opentelemetry-exporter-jaeger-thrift" % OpenTelemetryVersion).excludeAll(excludeGRPC),
"io.opentelemetry" % "opentelemetry-exporter-prometheus" % OpenTelemetryMetricsVersion,
"io.opentelemetry" % "opentelemetry-sdk-testing" % OpenTelemetryVersion % Test,
"io.prometheus" % "simpleclient_httpserver" % PrometheusServerVersion)
"io.prometheus" % "simpleclient_httpserver" % PrometheusServerVersion,
"com.softwaremill.retry" %% "retry" % RetryVersion
)

val testJars: Seq[ModuleID] = Seq(
"com.typesafe.akka" %% "akka-actor-testkit-typed" % AkkaVersion % Test,
Expand Down