-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Readside exponential backoff #329
Changes from all commits
28bf0d9
36c0284
5861f26
8316418
afc2814
b4539c0
04c29b7
1eccc6e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,9 @@ import io.opentelemetry.api.GlobalOpenTelemetry | |
import io.opentelemetry.api.trace.Span | ||
import org.slf4j.{ Logger, LoggerFactory } | ||
|
||
import java.util.concurrent.TimeUnit | ||
import scala.concurrent.duration.FiniteDuration | ||
import scala.concurrent.{ ExecutionContext, Future, Promise } | ||
import scala.util.{ Failure, Success, Try } | ||
|
||
/** | ||
|
@@ -26,7 +29,7 @@ import scala.util.{ Failure, Success, Try } | |
*/ | ||
private[readside] class ReadSideHandlerImpl( | ||
processorId: String, | ||
readSideHandlerServiceBlockingStub: ReadSideHandlerServiceBlockingStub) | ||
readSideHandlerServiceBlockingStub: ReadSideHandlerServiceBlockingStub)(implicit val ec: ExecutionContext) | ||
extends ReadSideHandler { | ||
|
||
private val COS_EVENT_TAG_HEADER = "x-cos-event-tag" | ||
|
@@ -44,7 +47,7 @@ private[readside] class ReadSideHandlerImpl( | |
* @param meta the additional meta data | ||
* @return an eventual HandleReadSideResponse | ||
*/ | ||
def processEvent( | ||
override def doProcessEvent( | ||
event: com.google.protobuf.any.Any, | ||
eventTag: String, | ||
resultingState: com.google.protobuf.any.Any, | ||
|
@@ -103,27 +106,55 @@ private[readside] class ReadSideHandlerImpl( | |
} | ||
} | ||
|
||
/** | ||
* Processes events read from the Journal | ||
* | ||
* @param event the actual event | ||
* @param eventTag the event tag | ||
* @param resultingState the resulting state of the applied event | ||
* @param meta the additional meta data | ||
* @return an eventual HandleReadSideResponse | ||
*/ | ||
private[readside] trait ReadSideHandler { | ||
|
||
implicit val ec: ExecutionContext | ||
|
||
/** | ||
* handles a read side message | ||
* Processes events read from the Journal. | ||
* Exponentially backoff with a 10& gain modifier until it reaches the upper threshold of maxBackoffSeconds. | ||
* | ||
* @param event | ||
* @param eventTag | ||
* @param resultingState | ||
* @param meta | ||
* @return | ||
* @param event the actual event | ||
* @param eventTag the event tag | ||
* @param resultingState the resulting state of the applied event | ||
* @param meta the additional meta data | ||
* @param policy the retry policy. If provided, overrides the default backoff policy. Testing method | ||
* @param minBackoffSeconds the minimum number of seconds to backoff | ||
* @param maxBackoffSeconds the maximum number of seconds to backoff | ||
* @return Future[Boolean] for success | ||
*/ | ||
def processEvent( | ||
event: com.google.protobuf.any.Any, | ||
eventTag: String, | ||
resultingState: com.google.protobuf.any.Any, | ||
meta: MetaData, | ||
policy: Option[retry.Policy] = Some(retry.Directly()), | ||
minBackoffSeconds: Long = 1L, | ||
maxBackoffSeconds: Long = 30L): Future[Boolean] = { | ||
|
||
implicit val success: retry.Success[Boolean] = retry.Success(x => x) | ||
|
||
val finalPolicy: retry.Policy = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this implementation only makes sense for gRPC. Just put it in the FYI - this trait was only meant to make testing easier, not really for implementing many of these. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The policy is not a grpc concept though. It is a retry concept. |
||
policy.getOrElse(retry.Backoff(maxBackoffSeconds.toInt, FiniteDuration(minBackoffSeconds, TimeUnit.SECONDS))) | ||
|
||
val f: Future[Boolean] = finalPolicy.apply(() => | ||
Future { | ||
doProcessEvent(event, eventTag, resultingState, meta) | ||
}) | ||
|
||
f | ||
} | ||
|
||
/** | ||
* Processes events read from the Journal. | ||
* | ||
* @param event the actual event | ||
* @param eventTag the event tag | ||
* @param resultingState the resulting state of the applied event | ||
* @param meta the additional meta data | ||
* @return Boolean for success | ||
*/ | ||
protected def doProcessEvent( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why is this on the trait? this is an implementation detail There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wanted to decouple the implementation of "process" with the exponential backoff. |
||
event: com.google.protobuf.any.Any, | ||
eventTag: String, | ||
resultingState: com.google.protobuf.any.Any, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
/* | ||
* Copyright 2020 Namely Inc. | ||
* | ||
* SPDX-License-Identifier: MIT | ||
*/ | ||
|
||
package com.namely.chiefofstate.helper | ||
|
||
import scala.concurrent.ExecutionContext | ||
|
||
trait ExecutionContextHelper { | ||
implicit val ec: ExecutionContext = ExecutionContext.global | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wouldn't do this, just have your test set it explicitly. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I just felt this was cleaner |
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2 comments:
policy
should be a constructor arg, not a method argOption
if you fail over toretry.Backoff
, just make that the defaultThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because the backoff policy itself is a stateful class. I wanted to decouple the max and min time from the default arg
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But yeah I didn't mean to set this as Some(something) whoops.