From 28bf0d9d3dec59b575da026855e2e415d2510255 Mon Sep 17 00:00:00 2001 From: Antonio Yuen Date: Fri, 30 Apr 2021 13:01:34 -0400 Subject: [PATCH 1/5] enable grpc retry --- .../src/main/scala/com/namely/chiefofstate/NettyHelper.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/code/service/src/main/scala/com/namely/chiefofstate/NettyHelper.scala b/code/service/src/main/scala/com/namely/chiefofstate/NettyHelper.scala index 5f4553f3..483d1ead 100644 --- a/code/service/src/main/scala/com/namely/chiefofstate/NettyHelper.scala +++ b/code/service/src/main/scala/com/namely/chiefofstate/NettyHelper.scala @@ -24,6 +24,6 @@ object NettyHelper { // decide on negotiation type val negotiationType: NegotiationType = if (useTls) TLS else PLAINTEXT - NettyChannelBuilder.forAddress(host, port).negotiationType(negotiationType) + NettyChannelBuilder.forAddress(host, port).enableRetry().negotiationType(negotiationType) } } From 8316418af96eb9e1de58228b2886a2882b70d280 Mon Sep 17 00:00:00 2001 From: Antonio Yuen Date: Fri, 30 Apr 2021 16:55:58 -0400 Subject: [PATCH 2/5] exponential backoff --- .../readside/ReadSideHandler.scala | 65 ++++++++++++------- .../readside/ReadSideJdbcHandler.scala | 6 +- 2 files changed, 42 insertions(+), 29 deletions(-) diff --git a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideHandler.scala b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideHandler.scala index 56eb7258..4ecae3cb 100644 --- a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideHandler.scala +++ b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideHandler.scala @@ -7,15 +7,17 @@ package com.namely.chiefofstate.readside import com.namely.protobuf.chiefofstate.v1.common.MetaData -import com.namely.protobuf.chiefofstate.v1.readside.{ HandleReadSideRequest, HandleReadSideResponse } +import com.namely.protobuf.chiefofstate.v1.readside.{HandleReadSideRequest, HandleReadSideResponse} import com.namely.protobuf.chiefofstate.v1.readside.ReadSideHandlerServiceGrpc.ReadSideHandlerServiceBlockingStub import io.grpc.Metadata import io.grpc.stub.MetadataUtils import io.opentelemetry.api.GlobalOpenTelemetry import io.opentelemetry.api.trace.Span -import org.slf4j.{ Logger, LoggerFactory } +import org.slf4j.{Logger, LoggerFactory} -import scala.util.{ Failure, Success, Try } +import java.time.Duration +import scala.annotation.tailrec +import scala.util.{Failure, Success, Try} /** * read side processor that sends messages to a gRPC server that implements @@ -44,7 +46,7 @@ private[readside] class ReadSideHandlerImpl( * @param meta the additional meta data * @return an eventual HandleReadSideResponse */ - def processEvent( + override def doProcessEvent( event: com.google.protobuf.any.Any, eventTag: String, resultingState: com.google.protobuf.any.Any, @@ -102,29 +104,44 @@ private[readside] class ReadSideHandlerImpl( } } -/** - * Processes events read from the Journal - * - * @param event the actual event - * @param eventTag the event tag - * @param resultingState the resulting state of the applied event - * @param meta the additional meta data - * @return an eventual HandleReadSideResponse - */ private[readside] trait ReadSideHandler { /** - * handles a read side message + * Processes events read from the Journal * - * @param event - * @param eventTag - * @param resultingState - * @param meta - * @return + * @param event the actual event + * @param eventTag the event tag + * @param resultingState the resulting state of the applied event + * @param meta the additional meta data + * @return Boolean for success */ - def processEvent( - event: com.google.protobuf.any.Any, - eventTag: String, - resultingState: com.google.protobuf.any.Any, - meta: MetaData): Boolean + @tailrec + protected final def processEvent( + event: com.google.protobuf.any.Any, + eventTag: String, + resultingState: com.google.protobuf.any.Any, + meta: MetaData, + numAttempts: Int = 0, + minBackOffSeconds: Long = 1, + maxBackOffSeconds: Long = 30): Boolean = { + + val isSuccess: Boolean = doProcessEvent(event, eventTag, resultingState, meta) + + if (!isSuccess) { + val backoffSeconds: Long = Math.min(maxBackOffSeconds, (minBackOffSeconds * Math.pow(1.1, numAttempts)).toLong) + + Thread.sleep(Duration.ofSeconds(backoffSeconds).toMillis) + + processEvent(event, eventTag, resultingState, meta, numAttempts + 1, minBackOffSeconds, maxBackOffSeconds) + } else { + isSuccess + } + + } + + def doProcessEvent( + event: com.google.protobuf.any.Any, + eventTag: String, + resultingState: com.google.protobuf.any.Any, + meta: MetaData): Boolean } diff --git a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideJdbcHandler.scala b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideJdbcHandler.scala index dc1e0528..bd2ffac3 100644 --- a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideJdbcHandler.scala +++ b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideJdbcHandler.scala @@ -12,11 +12,8 @@ import akka.projection.jdbc.JdbcSession import com.google.protobuf.any.{ Any => ProtoAny } import com.namely.protobuf.chiefofstate.v1.common.MetaData import com.namely.protobuf.chiefofstate.v1.persistence.EventWrapper -import com.namely.protobuf.chiefofstate.v1.readside.HandleReadSideResponse import org.slf4j.{ Logger, LoggerFactory } -import scala.util.{ Failure, Success, Try } - /** * Implements the akka JdbcHandler interface and forwards events to the * provided read side handler @@ -43,8 +40,7 @@ private[readside] class ReadSideJdbcHandler(eventTag: String, processorId: Strin val meta: MetaData = envelope.event.getMeta // invoke remote processor, get result - val readSideSuccess: Boolean = - readSideHandler.processEvent(event, eventTag, resultingState, meta) + val readSideSuccess: Boolean = readSideHandler.processEvent(event, eventTag, resultingState, meta) if (!readSideSuccess) { val errMsg: String = From afc2814b16a5a43f4047490d332b69fed25f02c6 Mon Sep 17 00:00:00 2001 From: Antonio Yuen Date: Fri, 30 Apr 2021 18:57:47 -0400 Subject: [PATCH 3/5] refactor backoff method and tests --- .../readside/ReadSideHandler.scala | 104 +++++++++++------ .../readside/ReadSideManager.scala | 6 +- .../readside/ReadSideProjection.scala | 6 +- .../chiefofstate/readside/Cancellable.scala | 30 +++++ .../readside/ReadSideHandlerSpec.scala | 109 ++++++++++++++---- .../readside/ReadSideJdbcHandlerSpec.scala | 4 +- 6 files changed, 195 insertions(+), 64 deletions(-) create mode 100644 code/service/src/test/scala/com/namely/chiefofstate/readside/Cancellable.scala diff --git a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideHandler.scala b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideHandler.scala index 4ecae3cb..2aebeaf8 100644 --- a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideHandler.scala +++ b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideHandler.scala @@ -7,17 +7,17 @@ package com.namely.chiefofstate.readside import com.namely.protobuf.chiefofstate.v1.common.MetaData -import com.namely.protobuf.chiefofstate.v1.readside.{HandleReadSideRequest, HandleReadSideResponse} +import com.namely.protobuf.chiefofstate.v1.readside.{ HandleReadSideRequest, HandleReadSideResponse } import com.namely.protobuf.chiefofstate.v1.readside.ReadSideHandlerServiceGrpc.ReadSideHandlerServiceBlockingStub import io.grpc.Metadata import io.grpc.stub.MetadataUtils import io.opentelemetry.api.GlobalOpenTelemetry import io.opentelemetry.api.trace.Span -import org.slf4j.{Logger, LoggerFactory} +import org.slf4j.{ Logger, LoggerFactory } import java.time.Duration import scala.annotation.tailrec -import scala.util.{Failure, Success, Try} +import scala.util.{ Failure, Success, Try } /** * read side processor that sends messages to a gRPC server that implements @@ -106,42 +106,78 @@ private[readside] class ReadSideHandlerImpl( private[readside] trait ReadSideHandler { + def onBeginProcess(): Unit = {} + def onEndProcess(): Unit = {} + /** - * Processes events read from the Journal + * Processes events read from the Journal. + * Exponentially backoff with a 10& gain modifier until it reaches the upper threshold of maxBackoffSeconds. + * If maxAttempts is set to a value 0 or less, it will backoff indefinitely, otherwise it + * will run a number of times up to to the maxAttempts. * - * @param event the actual event - * @param eventTag the event tag - * @param resultingState the resulting state of the applied event - * @param meta the additional meta data - * @return Boolean for success + * @param event the actual event + * @param eventTag the event tag + * @param resultingState the resulting state of the applied event + * @param meta the additional meta data + * @param maxAttempts the max number of attempts before quit, infinite if set to 0 + * @param minBackoffSeconds the minimum number of seconds to backoff + * @param maxBackoffSeconds the maximum number of seconds to backoff + * @return Boolean for success */ - @tailrec - protected final def processEvent( - event: com.google.protobuf.any.Any, - eventTag: String, - resultingState: com.google.protobuf.any.Any, - meta: MetaData, - numAttempts: Int = 0, - minBackOffSeconds: Long = 1, - maxBackOffSeconds: Long = 30): Boolean = { - - val isSuccess: Boolean = doProcessEvent(event, eventTag, resultingState, meta) - - if (!isSuccess) { - val backoffSeconds: Long = Math.min(maxBackOffSeconds, (minBackOffSeconds * Math.pow(1.1, numAttempts)).toLong) - - Thread.sleep(Duration.ofSeconds(backoffSeconds).toMillis) - - processEvent(event, eventTag, resultingState, meta, numAttempts + 1, minBackOffSeconds, maxBackOffSeconds) - } else { - isSuccess + def processEvent( + event: com.google.protobuf.any.Any, + eventTag: String, + resultingState: com.google.protobuf.any.Any, + meta: MetaData, + maxAttempts: Int = 0, + minBackoffSeconds: Long = 1, + maxBackoffSeconds: Long = 30): Boolean = { + + /** + * Recursive function that incorporates exponential backOff + * + * @param numAttempts the attempt number + * @return Boolean for success + */ + @tailrec + def loop(numAttempts: Int = 0): Boolean = { + val isSuccess: Boolean = doProcessEvent(event, eventTag, resultingState, meta) + + if (!isSuccess && (maxAttempts <= 0 || numAttempts >= maxAttempts)) { + val backoffSeconds: Long = Math.min(maxBackoffSeconds, (minBackoffSeconds * Math.pow(1.1, numAttempts)).toLong) + + Thread.sleep(Duration.ofSeconds(backoffSeconds).toMillis) + + loop(numAttempts + 1) + } else { + isSuccess + } } + require(minBackoffSeconds > 0, "minBackOffSeconds must be greater than 0") + require( + maxBackoffSeconds >= minBackoffSeconds, + "maxBackOffSeconds must be greater than or equal to minBackOffSeconds") + + onBeginProcess() + val result: Boolean = loop() + onEndProcess() + + result } - def doProcessEvent( - event: com.google.protobuf.any.Any, - eventTag: String, - resultingState: com.google.protobuf.any.Any, - meta: MetaData): Boolean + /** + * Processes events read from the Journal. + * + * @param event the actual event + * @param eventTag the event tag + * @param resultingState the resulting state of the applied event + * @param meta the additional meta data + * @return Boolean for success + */ + protected def doProcessEvent( + event: com.google.protobuf.any.Any, + eventTag: String, + resultingState: com.google.protobuf.any.Any, + meta: MetaData): Boolean } diff --git a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideManager.scala b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideManager.scala index 9f6028d6..c0ebd550 100644 --- a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideManager.scala +++ b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideManager.scala @@ -8,12 +8,12 @@ package com.namely.chiefofstate.readside import akka.actor.typed.ActorSystem import com.namely.chiefofstate.NettyHelper -import com.namely.chiefofstate.config.{ReadSideConfig, ReadSideConfigReader} +import com.namely.chiefofstate.config.{ ReadSideConfig, ReadSideConfigReader } import com.namely.protobuf.chiefofstate.v1.readside.ReadSideHandlerServiceGrpc.ReadSideHandlerServiceBlockingStub import com.typesafe.config.Config -import com.zaxxer.hikari.{HikariConfig, HikariDataSource} +import com.zaxxer.hikari.{ HikariConfig, HikariDataSource } import io.grpc.ClientInterceptor -import org.slf4j.{Logger, LoggerFactory} +import org.slf4j.{ Logger, LoggerFactory } /** * Used to configure and start all read side processors diff --git a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideProjection.scala b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideProjection.scala index a6fc6fa6..fdc52260 100644 --- a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideProjection.scala +++ b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideProjection.scala @@ -6,7 +6,7 @@ package com.namely.chiefofstate.readside -import akka.actor.typed.{ActorSystem, Behavior} +import akka.actor.typed.{ ActorSystem, Behavior } import akka.cluster.sharding.typed.ShardedDaemonProcessSettings import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal @@ -15,10 +15,10 @@ import akka.projection.eventsourced.EventEnvelope import akka.projection.eventsourced.scaladsl.EventSourcedProvider import akka.projection.jdbc.scaladsl.JdbcProjection import akka.projection.scaladsl.SourceProvider -import akka.projection.{ProjectionBehavior, ProjectionId} +import akka.projection.{ ProjectionBehavior, ProjectionId } import com.namely.protobuf.chiefofstate.v1.persistence.EventWrapper import javax.sql.DataSource -import org.slf4j.{Logger, LoggerFactory} +import org.slf4j.{ Logger, LoggerFactory } /** * Read side processor creates a sharded daemon process for handling diff --git a/code/service/src/test/scala/com/namely/chiefofstate/readside/Cancellable.scala b/code/service/src/test/scala/com/namely/chiefofstate/readside/Cancellable.scala new file mode 100644 index 00000000..06e145b7 --- /dev/null +++ b/code/service/src/test/scala/com/namely/chiefofstate/readside/Cancellable.scala @@ -0,0 +1,30 @@ +/* + * Copyright 2020 Namely Inc. + * + * SPDX-License-Identifier: MIT + */ + +package com.namely.chiefofstate.readside + +import java.util.concurrent.FutureTask +import scala.concurrent.{ ExecutionContext, Future, Promise } +import scala.util.Try + +class Cancellable[T](executionContext: ExecutionContext, todo: => T) { + private val promise: Promise[T] = Promise[T]() + + def future: Future[T] = promise.future + + private val jf: FutureTask[T] = new FutureTask[T](() => todo) { + override def done(): Unit = promise.complete(Try(get())) + } + + def cancel(): Unit = jf.cancel(true) + + executionContext.execute(jf) +} + +object Cancellable { + def apply[T](todo: => T)(implicit executionContext: ExecutionContext): Cancellable[T] = + new Cancellable[T](executionContext, todo) +} diff --git a/code/service/src/test/scala/com/namely/chiefofstate/readside/ReadSideHandlerSpec.scala b/code/service/src/test/scala/com/namely/chiefofstate/readside/ReadSideHandlerSpec.scala index f19f3444..b4803359 100644 --- a/code/service/src/test/scala/com/namely/chiefofstate/readside/ReadSideHandlerSpec.scala +++ b/code/service/src/test/scala/com/namely/chiefofstate/readside/ReadSideHandlerSpec.scala @@ -6,6 +6,7 @@ package com.namely.chiefofstate.readside +import com.google.protobuf.any import com.namely.chiefofstate.helper.BaseSpec import com.namely.protobuf.chiefofstate.v1.common.MetaData import com.namely.protobuf.chiefofstate.v1.readside.{ @@ -20,19 +21,19 @@ import io.grpc.inprocess._ import io.opentelemetry.api.{ GlobalOpenTelemetry, OpenTelemetry } import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter import io.opentelemetry.sdk.OpenTelemetrySdk -import io.opentelemetry.sdk.trace.data.SpanData import scala.jdk.CollectionConverters.ListHasAsScala import scala.concurrent.ExecutionContext.global -import scala.concurrent.Future -import scala.util.{ Failure, Success, Try } +import scala.concurrent.{ Await, Awaitable, CanAwait, ExecutionContext, Future } import io.opentelemetry.sdk.trace.SdkTracerProvider import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor import io.opentelemetry.context.propagation.ContextPropagators import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator -import io.grpc.StatusRuntimeException -class ReadSideHandlerImplSpec extends BaseSpec { +import java.util.concurrent.Executors +import scala.concurrent.duration.Duration + +class ReadSideHandlerSpec extends BaseSpec { var testExporter: InMemorySpanExporter = _ var openTelemetry: OpenTelemetry = _ @@ -88,10 +89,11 @@ class ReadSideHandlerImplSpec extends BaseSpec { val triedHandleReadSideResponse = readSideHandlerImpl.processEvent( - com.google.protobuf.any.Any.pack(accountOpened), - eventTag, - resultingState, - meta) + event = com.google.protobuf.any.Any.pack(accountOpened), + eventTag = eventTag, + resultingState = resultingState, + meta = meta, + maxAttempts = 5) triedHandleReadSideResponse shouldBe true } @@ -136,10 +138,11 @@ class ReadSideHandlerImplSpec extends BaseSpec { val triedHandleReadSideResponse = readSideHandlerImpl.processEvent( - com.google.protobuf.any.Any.pack(accountOpened), - eventTag, - resultingState, - meta) + event = com.google.protobuf.any.Any.pack(accountOpened), + eventTag = eventTag, + resultingState = resultingState, + meta = meta, + maxAttempts = 5) triedHandleReadSideResponse shouldBe false } @@ -181,19 +184,81 @@ class ReadSideHandlerImplSpec extends BaseSpec { val readSideHandlerImpl = new ReadSideHandlerImpl("id", readSideHandlerServiceStub) val triedHandleReadSideResponse = readSideHandlerImpl.processEvent( - com.google.protobuf.any.Any.pack(accountOpened), - eventTag, - resultingState, - meta) + event = com.google.protobuf.any.Any.pack(accountOpened), + eventTag = eventTag, + resultingState = resultingState, + meta = meta, + maxAttempts = 5) triedHandleReadSideResponse shouldBe false // assert the span was closed even in case of a failure - testExporter - .getFinishedSpanItems() - .asScala - .find(_.getName() == readSideHandlerImpl.spanName) - .isDefined shouldBe true + testExporter.getFinishedSpanItems.asScala.exists(_.getName == readSideHandlerImpl.spanName) shouldBe true + } + } + + "ReadSideHandler" should { + "run up to the maxAttempts" in { + val readSideHandler: MockReadSideHandler = new MockReadSideHandler + val actual: Boolean = readSideHandler.processEvent(null, null, null, null, maxAttempts = 1) + + actual shouldBe false + readSideHandler.getNumRuns should be > 0L + readSideHandler.isStarted shouldBe true + readSideHandler.isEnded shouldBe true + } + + "run indefinitely" in { + val readSideHandler: MockReadSideHandler = new MockReadSideHandler + + val ec: ExecutionContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1)) + val f = new Cancellable[Boolean](ec, readSideHandler.processEvent(null, null, null, null, maxBackoffSeconds = 1)) + + Thread.sleep(2000) // 2 seconds + + f.cancel() + + println(readSideHandler.getNumRuns) + readSideHandler.getNumRuns should be > 1L + readSideHandler.isStarted shouldBe true + readSideHandler.isEnded shouldBe false + } + + "fail minBackoffSeconds requirement" in { + val readSideHandler: MockReadSideHandler = new MockReadSideHandler + val err: IllegalArgumentException = intercept[IllegalArgumentException] { + readSideHandler.processEvent(null, null, null, null, minBackoffSeconds = 0) + } + err.getMessage shouldBe "requirement failed: minBackOffSeconds must be greater than 0" + } + + "fail maxBackoffSeconds requirement" in { + val readSideHandler: MockReadSideHandler = new MockReadSideHandler + val err: IllegalArgumentException = intercept[IllegalArgumentException] { + readSideHandler.processEvent(null, null, null, null, minBackoffSeconds = 100, maxBackoffSeconds = 0) + } + err.getMessage shouldBe "requirement failed: maxBackOffSeconds must be greater than or equal to minBackOffSeconds" } } } + +class MockReadSideHandler extends ReadSideHandler { + private var started: Boolean = false + private var ended: Boolean = false + private var numRuns: Long = 0 + + def isStarted: Boolean = started + def isEnded: Boolean = ended + def getNumRuns: Long = numRuns + + override def onBeginProcess(): Unit = { started = true } + override def onEndProcess(): Unit = { ended = true } + override protected def doProcessEvent( + event: any.Any, + eventTag: String, + resultingState: any.Any, + meta: MetaData): Boolean = { + numRuns = numRuns + 1L + false + } +} diff --git a/code/service/src/test/scala/com/namely/chiefofstate/readside/ReadSideJdbcHandlerSpec.scala b/code/service/src/test/scala/com/namely/chiefofstate/readside/ReadSideJdbcHandlerSpec.scala index c77a0a54..5bcbc6b1 100644 --- a/code/service/src/test/scala/com/namely/chiefofstate/readside/ReadSideJdbcHandlerSpec.scala +++ b/code/service/src/test/scala/com/namely/chiefofstate/readside/ReadSideJdbcHandlerSpec.scala @@ -41,7 +41,7 @@ class ReadSideJdbcHandlerSpec extends BaseSpec with ForAllTestContainer { // mock read handler that returns success val readHandler = mock[ReadSideHandler] - (readHandler.processEvent _).expects(*, *, *, *).returning(true).once + (readHandler.processEvent _).expects(*, *, *, *, *, *, *).returning(true).once() val jdbcHandler = new ReadSideJdbcHandler("tag", "processor", readHandler) val jdbcSession: JdbcSession = mock[JdbcSession] @@ -59,7 +59,7 @@ class ReadSideJdbcHandlerSpec extends BaseSpec with ForAllTestContainer { // mock read handler that returns success val readHandler = mock[ReadSideHandler] - (readHandler.processEvent _).expects(*, *, *, *).returning(false).once + (readHandler.processEvent _).expects(*, *, *, *, *, *, *).returning(false).once() val jdbcHandler = new ReadSideJdbcHandler("tag", "processor", readHandler) val jdbcSession: JdbcSession = mock[JdbcSession] From b4539c0ff75aa8106998357de2f1b90fab4836d6 Mon Sep 17 00:00:00 2001 From: Antonio Yuen Date: Sat, 1 May 2021 09:39:02 -0400 Subject: [PATCH 4/5] fix comment --- .../chiefofstate/readside/ReadSideHandler.scala | 7 +------ .../chiefofstate/readside/ReadSideHandlerSpec.scala | 11 ++++------- 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideHandler.scala b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideHandler.scala index 2aebeaf8..24b5a51a 100644 --- a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideHandler.scala +++ b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideHandler.scala @@ -133,12 +133,7 @@ private[readside] trait ReadSideHandler { minBackoffSeconds: Long = 1, maxBackoffSeconds: Long = 30): Boolean = { - /** - * Recursive function that incorporates exponential backOff - * - * @param numAttempts the attempt number - * @return Boolean for success - */ + // Recursive function that incorporates exponential backOff @tailrec def loop(numAttempts: Int = 0): Boolean = { val isSuccess: Boolean = doProcessEvent(event, eventTag, resultingState, meta) diff --git a/code/service/src/test/scala/com/namely/chiefofstate/readside/ReadSideHandlerSpec.scala b/code/service/src/test/scala/com/namely/chiefofstate/readside/ReadSideHandlerSpec.scala index b4803359..746d7366 100644 --- a/code/service/src/test/scala/com/namely/chiefofstate/readside/ReadSideHandlerSpec.scala +++ b/code/service/src/test/scala/com/namely/chiefofstate/readside/ReadSideHandlerSpec.scala @@ -218,7 +218,6 @@ class ReadSideHandlerSpec extends BaseSpec { f.cancel() - println(readSideHandler.getNumRuns) readSideHandler.getNumRuns should be > 1L readSideHandler.isStarted shouldBe true readSideHandler.isEnded shouldBe false @@ -226,17 +225,15 @@ class ReadSideHandlerSpec extends BaseSpec { "fail minBackoffSeconds requirement" in { val readSideHandler: MockReadSideHandler = new MockReadSideHandler - val err: IllegalArgumentException = intercept[IllegalArgumentException] { - readSideHandler.processEvent(null, null, null, null, minBackoffSeconds = 0) - } + val err: IllegalArgumentException = intercept[IllegalArgumentException]{ + readSideHandler.processEvent(null, null, null, null, minBackoffSeconds = 0)} err.getMessage shouldBe "requirement failed: minBackOffSeconds must be greater than 0" } "fail maxBackoffSeconds requirement" in { val readSideHandler: MockReadSideHandler = new MockReadSideHandler - val err: IllegalArgumentException = intercept[IllegalArgumentException] { - readSideHandler.processEvent(null, null, null, null, minBackoffSeconds = 100, maxBackoffSeconds = 0) - } + val err: IllegalArgumentException = intercept[IllegalArgumentException]{ + readSideHandler.processEvent(null, null, null, null, minBackoffSeconds = 100, maxBackoffSeconds = 0)} err.getMessage shouldBe "requirement failed: maxBackOffSeconds must be greater than or equal to minBackOffSeconds" } } From 04c29b785d0e32eaa5f67c1f25557979bffdc90c Mon Sep 17 00:00:00 2001 From: Antonio Yuen Date: Mon, 3 May 2021 13:00:02 -0400 Subject: [PATCH 5/5] using retry package and passing futures around --- .../chiefofstate/ServiceBootstrapper.scala | 6 +- .../readside/ReadSideHandler.scala | 53 ++++------- .../readside/ReadSideJdbcHandler.scala | 10 ++- .../readside/ReadSideManager.scala | 7 +- .../helper/ExecutionContextHelper.scala | 13 +++ .../chiefofstate/readside/Cancellable.scala | 30 ------- .../readside/ReadSideHandlerSpec.scala | 89 ++++--------------- .../readside/ReadSideJdbcHandlerSpec.scala | 11 ++- .../readside/ReadSideManagerSpec.scala | 4 +- project/Dependencies.scala | 6 +- 10 files changed, 80 insertions(+), 149 deletions(-) create mode 100644 code/service/src/test/scala/com/namely/chiefofstate/helper/ExecutionContextHelper.scala delete mode 100644 code/service/src/test/scala/com/namely/chiefofstate/readside/Cancellable.scala diff --git a/code/service/src/main/scala/com/namely/chiefofstate/ServiceBootstrapper.scala b/code/service/src/main/scala/com/namely/chiefofstate/ServiceBootstrapper.scala index 96459ac7..dc8b9da9 100644 --- a/code/service/src/main/scala/com/namely/chiefofstate/ServiceBootstrapper.scala +++ b/code/service/src/main/scala/com/namely/chiefofstate/ServiceBootstrapper.scala @@ -157,11 +157,15 @@ object ServiceBootstrapper { system: ActorSystem[_], cosConfig: CosConfig, interceptors: Seq[ClientInterceptor]): Unit = { + + val readSideEc: ExecutionContext = TracedExecutorService.get() + // if read side is enabled if (cosConfig.enableReadSide) { // instantiate a read side manager val readSideManager: ReadSideManager = - ReadSideManager(system = system, interceptors = interceptors, numShards = cosConfig.eventsConfig.numShards) + ReadSideManager(system = system, interceptors = interceptors, numShards = cosConfig.eventsConfig.numShards)( + readSideEc) // initialize all configured read sides readSideManager.init() } diff --git a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideHandler.scala b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideHandler.scala index 24b5a51a..4484f12f 100644 --- a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideHandler.scala +++ b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideHandler.scala @@ -15,8 +15,9 @@ import io.opentelemetry.api.GlobalOpenTelemetry import io.opentelemetry.api.trace.Span import org.slf4j.{ Logger, LoggerFactory } -import java.time.Duration -import scala.annotation.tailrec +import java.util.concurrent.TimeUnit +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.{ ExecutionContext, Future, Promise } import scala.util.{ Failure, Success, Try } /** @@ -28,7 +29,7 @@ import scala.util.{ Failure, Success, Try } */ private[readside] class ReadSideHandlerImpl( processorId: String, - readSideHandlerServiceBlockingStub: ReadSideHandlerServiceBlockingStub) + readSideHandlerServiceBlockingStub: ReadSideHandlerServiceBlockingStub)(implicit val ec: ExecutionContext) extends ReadSideHandler { private val COS_EVENT_TAG_HEADER = "x-cos-event-tag" @@ -106,59 +107,41 @@ private[readside] class ReadSideHandlerImpl( private[readside] trait ReadSideHandler { - def onBeginProcess(): Unit = {} - def onEndProcess(): Unit = {} + implicit val ec: ExecutionContext /** * Processes events read from the Journal. * Exponentially backoff with a 10& gain modifier until it reaches the upper threshold of maxBackoffSeconds. - * If maxAttempts is set to a value 0 or less, it will backoff indefinitely, otherwise it - * will run a number of times up to to the maxAttempts. * * @param event the actual event * @param eventTag the event tag * @param resultingState the resulting state of the applied event * @param meta the additional meta data - * @param maxAttempts the max number of attempts before quit, infinite if set to 0 + * @param policy the retry policy. If provided, overrides the default backoff policy. Testing method * @param minBackoffSeconds the minimum number of seconds to backoff * @param maxBackoffSeconds the maximum number of seconds to backoff - * @return Boolean for success + * @return Future[Boolean] for success */ def processEvent( event: com.google.protobuf.any.Any, eventTag: String, resultingState: com.google.protobuf.any.Any, meta: MetaData, - maxAttempts: Int = 0, - minBackoffSeconds: Long = 1, - maxBackoffSeconds: Long = 30): Boolean = { - - // Recursive function that incorporates exponential backOff - @tailrec - def loop(numAttempts: Int = 0): Boolean = { - val isSuccess: Boolean = doProcessEvent(event, eventTag, resultingState, meta) - - if (!isSuccess && (maxAttempts <= 0 || numAttempts >= maxAttempts)) { - val backoffSeconds: Long = Math.min(maxBackoffSeconds, (minBackoffSeconds * Math.pow(1.1, numAttempts)).toLong) + policy: Option[retry.Policy] = Some(retry.Directly()), + minBackoffSeconds: Long = 1L, + maxBackoffSeconds: Long = 30L): Future[Boolean] = { - Thread.sleep(Duration.ofSeconds(backoffSeconds).toMillis) - - loop(numAttempts + 1) - } else { - isSuccess - } - } + implicit val success: retry.Success[Boolean] = retry.Success(x => x) - require(minBackoffSeconds > 0, "minBackOffSeconds must be greater than 0") - require( - maxBackoffSeconds >= minBackoffSeconds, - "maxBackOffSeconds must be greater than or equal to minBackOffSeconds") + val finalPolicy: retry.Policy = + policy.getOrElse(retry.Backoff(maxBackoffSeconds.toInt, FiniteDuration(minBackoffSeconds, TimeUnit.SECONDS))) - onBeginProcess() - val result: Boolean = loop() - onEndProcess() + val f: Future[Boolean] = finalPolicy.apply(() => + Future { + doProcessEvent(event, eventTag, resultingState, meta) + }) - result + f } /** diff --git a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideJdbcHandler.scala b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideJdbcHandler.scala index bd2ffac3..c168e683 100644 --- a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideJdbcHandler.scala +++ b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideJdbcHandler.scala @@ -14,6 +14,11 @@ import com.namely.protobuf.chiefofstate.v1.common.MetaData import com.namely.protobuf.chiefofstate.v1.persistence.EventWrapper import org.slf4j.{ Logger, LoggerFactory } +import java.util.concurrent.TimeUnit +import scala.concurrent.duration.Duration +import scala.concurrent.{ Await, ExecutionContext, Future } +import scala.util.{ Failure, Success } + /** * Implements the akka JdbcHandler interface and forwards events to the * provided read side handler @@ -40,7 +45,10 @@ private[readside] class ReadSideJdbcHandler(eventTag: String, processorId: Strin val meta: MetaData = envelope.event.getMeta // invoke remote processor, get result - val readSideSuccess: Boolean = readSideHandler.processEvent(event, eventTag, resultingState, meta) + val processEvent: Future[Boolean] = readSideHandler.processEvent(event, eventTag, resultingState, meta) + + // TODO: Make this configurable + val readSideSuccess: Boolean = Await.result(processEvent, Duration.apply(60L, TimeUnit.SECONDS)) if (!readSideSuccess) { val errMsg: String = diff --git a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideManager.scala b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideManager.scala index c0ebd550..1557b87c 100644 --- a/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideManager.scala +++ b/code/service/src/main/scala/com/namely/chiefofstate/readside/ReadSideManager.scala @@ -15,6 +15,8 @@ import com.zaxxer.hikari.{ HikariConfig, HikariDataSource } import io.grpc.ClientInterceptor import org.slf4j.{ Logger, LoggerFactory } +import scala.concurrent.ExecutionContext + /** * Used to configure and start all read side processors * @@ -29,7 +31,7 @@ class ReadSideManager( interceptors: Seq[ClientInterceptor], dbConfig: ReadSideManager.DbConfig, readSideConfigs: Seq[ReadSideConfig], - numShards: Int) { + numShards: Int)(implicit ec: ExecutionContext) { private val logger: Logger = LoggerFactory.getLogger(this.getClass) @@ -62,7 +64,8 @@ class ReadSideManager( object ReadSideManager { - def apply(system: ActorSystem[_], interceptors: Seq[ClientInterceptor], numShards: Int): ReadSideManager = { + def apply(system: ActorSystem[_], interceptors: Seq[ClientInterceptor], numShards: Int)( + implicit ec: ExecutionContext): ReadSideManager = { val dbConfig: DbConfig = { // read the jdbc-default settings diff --git a/code/service/src/test/scala/com/namely/chiefofstate/helper/ExecutionContextHelper.scala b/code/service/src/test/scala/com/namely/chiefofstate/helper/ExecutionContextHelper.scala new file mode 100644 index 00000000..31fa0580 --- /dev/null +++ b/code/service/src/test/scala/com/namely/chiefofstate/helper/ExecutionContextHelper.scala @@ -0,0 +1,13 @@ +/* + * Copyright 2020 Namely Inc. + * + * SPDX-License-Identifier: MIT + */ + +package com.namely.chiefofstate.helper + +import scala.concurrent.ExecutionContext + +trait ExecutionContextHelper { + implicit val ec: ExecutionContext = ExecutionContext.global +} diff --git a/code/service/src/test/scala/com/namely/chiefofstate/readside/Cancellable.scala b/code/service/src/test/scala/com/namely/chiefofstate/readside/Cancellable.scala deleted file mode 100644 index 06e145b7..00000000 --- a/code/service/src/test/scala/com/namely/chiefofstate/readside/Cancellable.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright 2020 Namely Inc. - * - * SPDX-License-Identifier: MIT - */ - -package com.namely.chiefofstate.readside - -import java.util.concurrent.FutureTask -import scala.concurrent.{ ExecutionContext, Future, Promise } -import scala.util.Try - -class Cancellable[T](executionContext: ExecutionContext, todo: => T) { - private val promise: Promise[T] = Promise[T]() - - def future: Future[T] = promise.future - - private val jf: FutureTask[T] = new FutureTask[T](() => todo) { - override def done(): Unit = promise.complete(Try(get())) - } - - def cancel(): Unit = jf.cancel(true) - - executionContext.execute(jf) -} - -object Cancellable { - def apply[T](todo: => T)(implicit executionContext: ExecutionContext): Cancellable[T] = - new Cancellable[T](executionContext, todo) -} diff --git a/code/service/src/test/scala/com/namely/chiefofstate/readside/ReadSideHandlerSpec.scala b/code/service/src/test/scala/com/namely/chiefofstate/readside/ReadSideHandlerSpec.scala index 746d7366..eaaedfc5 100644 --- a/code/service/src/test/scala/com/namely/chiefofstate/readside/ReadSideHandlerSpec.scala +++ b/code/service/src/test/scala/com/namely/chiefofstate/readside/ReadSideHandlerSpec.scala @@ -7,7 +7,7 @@ package com.namely.chiefofstate.readside import com.google.protobuf.any -import com.namely.chiefofstate.helper.BaseSpec +import com.namely.chiefofstate.helper.{ BaseSpec, ExecutionContextHelper } import com.namely.protobuf.chiefofstate.v1.common.MetaData import com.namely.protobuf.chiefofstate.v1.readside.{ HandleReadSideRequest, @@ -24,16 +24,15 @@ import io.opentelemetry.sdk.OpenTelemetrySdk import scala.jdk.CollectionConverters.ListHasAsScala import scala.concurrent.ExecutionContext.global -import scala.concurrent.{ Await, Awaitable, CanAwait, ExecutionContext, Future } +import scala.concurrent.{ Await, ExecutionContext, Future } import io.opentelemetry.sdk.trace.SdkTracerProvider import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor import io.opentelemetry.context.propagation.ContextPropagators import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator -import java.util.concurrent.Executors import scala.concurrent.duration.Duration -class ReadSideHandlerSpec extends BaseSpec { +class ReadSideHandlerSpec extends BaseSpec with ExecutionContextHelper { var testExporter: InMemorySpanExporter = _ var openTelemetry: OpenTelemetry = _ @@ -87,15 +86,17 @@ class ReadSideHandlerSpec extends BaseSpec { val readSideHandlerImpl = new ReadSideHandlerImpl("id", readSideHandlerServiceStub) - val triedHandleReadSideResponse = + val triedHandleReadSideResponse: Future[Boolean] = readSideHandlerImpl.processEvent( event = com.google.protobuf.any.Any.pack(accountOpened), eventTag = eventTag, resultingState = resultingState, meta = meta, - maxAttempts = 5) + policy = Some(retry.Directly())) + + val result: Boolean = Await.result(triedHandleReadSideResponse, Duration.Inf) - triedHandleReadSideResponse shouldBe true + result shouldBe true } "handle response with explicit failure" in { @@ -142,9 +143,11 @@ class ReadSideHandlerSpec extends BaseSpec { eventTag = eventTag, resultingState = resultingState, meta = meta, - maxAttempts = 5) + policy = Some(retry.Directly())) + + val result: Boolean = Await.result(triedHandleReadSideResponse, Duration.Inf) - triedHandleReadSideResponse shouldBe false + result shouldBe false } "handle event when there is an exception" in { @@ -188,74 +191,14 @@ class ReadSideHandlerSpec extends BaseSpec { eventTag = eventTag, resultingState = resultingState, meta = meta, - maxAttempts = 5) + policy = Some(retry.Directly())) - triedHandleReadSideResponse shouldBe false + val result: Boolean = Await.result(triedHandleReadSideResponse, Duration.Inf) + + result shouldBe false // assert the span was closed even in case of a failure testExporter.getFinishedSpanItems.asScala.exists(_.getName == readSideHandlerImpl.spanName) shouldBe true } } - - "ReadSideHandler" should { - "run up to the maxAttempts" in { - val readSideHandler: MockReadSideHandler = new MockReadSideHandler - val actual: Boolean = readSideHandler.processEvent(null, null, null, null, maxAttempts = 1) - - actual shouldBe false - readSideHandler.getNumRuns should be > 0L - readSideHandler.isStarted shouldBe true - readSideHandler.isEnded shouldBe true - } - - "run indefinitely" in { - val readSideHandler: MockReadSideHandler = new MockReadSideHandler - - val ec: ExecutionContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1)) - val f = new Cancellable[Boolean](ec, readSideHandler.processEvent(null, null, null, null, maxBackoffSeconds = 1)) - - Thread.sleep(2000) // 2 seconds - - f.cancel() - - readSideHandler.getNumRuns should be > 1L - readSideHandler.isStarted shouldBe true - readSideHandler.isEnded shouldBe false - } - - "fail minBackoffSeconds requirement" in { - val readSideHandler: MockReadSideHandler = new MockReadSideHandler - val err: IllegalArgumentException = intercept[IllegalArgumentException]{ - readSideHandler.processEvent(null, null, null, null, minBackoffSeconds = 0)} - err.getMessage shouldBe "requirement failed: minBackOffSeconds must be greater than 0" - } - - "fail maxBackoffSeconds requirement" in { - val readSideHandler: MockReadSideHandler = new MockReadSideHandler - val err: IllegalArgumentException = intercept[IllegalArgumentException]{ - readSideHandler.processEvent(null, null, null, null, minBackoffSeconds = 100, maxBackoffSeconds = 0)} - err.getMessage shouldBe "requirement failed: maxBackOffSeconds must be greater than or equal to minBackOffSeconds" - } - } -} - -class MockReadSideHandler extends ReadSideHandler { - private var started: Boolean = false - private var ended: Boolean = false - private var numRuns: Long = 0 - - def isStarted: Boolean = started - def isEnded: Boolean = ended - def getNumRuns: Long = numRuns - - override def onBeginProcess(): Unit = { started = true } - override def onEndProcess(): Unit = { ended = true } - override protected def doProcessEvent( - event: any.Any, - eventTag: String, - resultingState: any.Any, - meta: MetaData): Boolean = { - numRuns = numRuns + 1L - false - } } diff --git a/code/service/src/test/scala/com/namely/chiefofstate/readside/ReadSideJdbcHandlerSpec.scala b/code/service/src/test/scala/com/namely/chiefofstate/readside/ReadSideJdbcHandlerSpec.scala index 5bcbc6b1..65892fce 100644 --- a/code/service/src/test/scala/com/namely/chiefofstate/readside/ReadSideJdbcHandlerSpec.scala +++ b/code/service/src/test/scala/com/namely/chiefofstate/readside/ReadSideJdbcHandlerSpec.scala @@ -6,9 +6,10 @@ package com.namely.chiefofstate.readside -import com.namely.chiefofstate.helper.BaseSpec +import com.namely.chiefofstate.helper.{ BaseSpec, ExecutionContextHelper } import com.dimafeng.testcontainers.{ ForAllTestContainer, PostgreSQLContainer } import org.testcontainers.utility.DockerImageName + import java.sql.{ Connection, DriverManager } import akka.projection.jdbc.JdbcSession import akka.projection.eventsourced.EventEnvelope @@ -18,7 +19,9 @@ import com.namely.protobuf.chiefofstate.v1.common.MetaData import com.google.protobuf.any import com.google.protobuf.wrappers.StringValue -class ReadSideJdbcHandlerSpec extends BaseSpec with ForAllTestContainer { +import scala.concurrent.Future + +class ReadSideJdbcHandlerSpec extends BaseSpec with ForAllTestContainer with ExecutionContextHelper { val cosSchema: String = "cos" @@ -41,7 +44,7 @@ class ReadSideJdbcHandlerSpec extends BaseSpec with ForAllTestContainer { // mock read handler that returns success val readHandler = mock[ReadSideHandler] - (readHandler.processEvent _).expects(*, *, *, *, *, *, *).returning(true).once() + (readHandler.processEvent _).expects(*, *, *, *, *, *, *).returning(Future(true)).once() val jdbcHandler = new ReadSideJdbcHandler("tag", "processor", readHandler) val jdbcSession: JdbcSession = mock[JdbcSession] @@ -59,7 +62,7 @@ class ReadSideJdbcHandlerSpec extends BaseSpec with ForAllTestContainer { // mock read handler that returns success val readHandler = mock[ReadSideHandler] - (readHandler.processEvent _).expects(*, *, *, *, *, *, *).returning(false).once() + (readHandler.processEvent _).expects(*, *, *, *, *, *, *).returning(Future(false)).once() val jdbcHandler = new ReadSideJdbcHandler("tag", "processor", readHandler) val jdbcSession: JdbcSession = mock[JdbcSession] diff --git a/code/service/src/test/scala/com/namely/chiefofstate/readside/ReadSideManagerSpec.scala b/code/service/src/test/scala/com/namely/chiefofstate/readside/ReadSideManagerSpec.scala index 98ea1e91..eb33e9a6 100644 --- a/code/service/src/test/scala/com/namely/chiefofstate/readside/ReadSideManagerSpec.scala +++ b/code/service/src/test/scala/com/namely/chiefofstate/readside/ReadSideManagerSpec.scala @@ -6,13 +6,13 @@ package com.namely.chiefofstate.readside -import com.namely.chiefofstate.helper.BaseSpec +import com.namely.chiefofstate.helper.{ BaseSpec, ExecutionContextHelper } import com.dimafeng.testcontainers.{ ForAllTestContainer, PostgreSQLContainer } import org.testcontainers.utility.DockerImageName import com.typesafe.config.{ Config, ConfigFactory, ConfigValueFactory } import akka.actor.testkit.typed.scaladsl.ActorTestKit -class ReadSideManagerSpec extends BaseSpec with ForAllTestContainer { +class ReadSideManagerSpec extends BaseSpec with ForAllTestContainer with ExecutionContextHelper { val cosSchema: String = "cos" diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 0860d21e..3fd23a38 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -28,6 +28,8 @@ object Dependencies { val PrometheusServerVersion: String = "0.10.0" val TestContainers: String = "0.39.3" + + val RetryVersion: String = "0.3.3" } import Dependencies.Versions._ @@ -73,7 +75,9 @@ object Dependencies { ("io.opentelemetry" % "opentelemetry-exporter-jaeger-thrift" % OpenTelemetryVersion).excludeAll(excludeGRPC), "io.opentelemetry" % "opentelemetry-exporter-prometheus" % OpenTelemetryMetricsVersion, "io.opentelemetry" % "opentelemetry-sdk-testing" % OpenTelemetryVersion % Test, - "io.prometheus" % "simpleclient_httpserver" % PrometheusServerVersion) + "io.prometheus" % "simpleclient_httpserver" % PrometheusServerVersion, + "com.softwaremill.retry" %% "retry" % RetryVersion + ) val testJars: Seq[ModuleID] = Seq( "com.typesafe.akka" %% "akka-actor-testkit-typed" % AkkaVersion % Test,