Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Circuit breaker #266

Merged
merged 28 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
90ff17b
AtomicCircularBuffer and skeleton on CircuitBreaker
Kamil-Lontkowski Jan 9, 2025
8447a0f
CircuitBreakerCountStateMachine
Kamil-Lontkowski Jan 9, 2025
9951f29
WIP
Kamil-Lontkowski Jan 10, 2025
ed9985c
breaker based on actor
Kamil-Lontkowski Jan 13, 2025
11d8005
Don't use atomics inside state machine
Kamil-Lontkowski Jan 14, 2025
72ab369
Delete out of date TODOs
Kamil-Lontkowski Jan 14, 2025
81ee70e
Refactor nextState to be pure
Kamil-Lontkowski Jan 15, 2025
5c94eda
fixes and tests for state machine
Kamil-Lontkowski Jan 16, 2025
0a26e2f
scaladoc, move logic to different files
Kamil-Lontkowski Jan 16, 2025
20ed3ff
Refactor nextState to object, use composition instead of inheritance
Kamil-Lontkowski Jan 20, 2025
c617c66
don't go through all results on every call
Kamil-Lontkowski Jan 20, 2025
64d694f
CircuiBreaker docs calculate rolling metrics, more tests
Kamil-Lontkowski Jan 20, 2025
f42e74d
added bigger time margin for test, track metrics per result
Kamil-Lontkowski Jan 20, 2025
6f376eb
use removeHeadWhile instead of filterInPlace
Kamil-Lontkowski Jan 20, 2025
3581bd9
introduce PercentageThreshold type
Kamil-Lontkowski Jan 20, 2025
55afda2
Try to fix tests
adamw Jan 21, 2025
6e34c70
Fix helper method conflict
adamw Jan 21, 2025
09cc330
Failing test
adamw Jan 22, 2025
5be4f24
fix edge case for last completed call in halfOpen state
Kamil-Lontkowski Jan 22, 2025
9139982
Add test case for wrong calculation of metrics
Kamil-Lontkowski Jan 22, 2025
24e70b7
Don't count metrics refistered with different state, fix test
Kamil-Lontkowski Jan 22, 2025
cf7dcea
docs
Kamil-Lontkowski Jan 22, 2025
4e01101
docs grammar fixes, better working example
Kamil-Lontkowski Jan 23, 2025
dc6d93e
minor fixes, spell check on docs
Kamil-Lontkowski Jan 27, 2025
5183768
more spell corrections, docs
Kamil-Lontkowski Jan 27, 2025
f6ba6d2
default config for CircuitBreaker
Kamil-Lontkowski Jan 27, 2025
c27d427
default config for CircuitBreaker
Kamil-Lontkowski Jan 27, 2025
1ef2045
change names from rate to percentage
Kamil-Lontkowski Jan 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions core/src/main/scala/ox/resilience/CircuitBreaker.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package ox.resilience

import scala.concurrent.duration.*
import ox.*
import java.util.concurrent.Semaphore
import ox.channels.Actor
import ox.channels.BufferCapacity
import ox.channels.ActorRef
import scala.util.Try

/** State of the circuit breaker. Every state carries the `since` value it was created with; `HalfOpen` additionally carries the semaphore
  * handed out to callers and a counter of completed operations.
  */
private[resilience] enum CircuitBreakerState:
  case Open(since: Long)
  case Closed(since: Long)
  case HalfOpen(since: Long, semaphore: Semaphore, completedOperations: Int = 0)

  /** Checks whether `other` is the same kind of state with the same `since` value.
    *
    * For `HalfOpen` the semaphore and the number of completed operations are deliberately ignored: only the variant and `since` take part
    * in the comparison.
    *
    * @param other
    *   The state to compare with.
    * @return
    *   `true` if both states are the same variant and share the same `since`, `false` otherwise.
    */
  def isSameState(other: CircuitBreakerState): Boolean =
    (this, other) match
      case (Open(mine), Open(theirs))               => mine == theirs
      case (HalfOpen(mine, _, _), HalfOpen(theirs, _, _)) => mine == theirs
      case (Closed(mine), Closed(theirs))           => mine == theirs
      case _                                        => false
end CircuitBreakerState

/** Classification of a single completed operation, as reported to the state machine by [[CircuitBreaker]]. */
private[resilience] enum CircuitBreakerResult:
  case Success, Failure, Slow

/** Snapshot of the values the breaker keeps about recently registered results.
  *
  * @param failureRate
  *   rate of operations that ended in failure (presumably a percentage compared against the configured failure threshold - confirm in the
  *   state machine).
  * @param slowCallsRate
  *   rate of operations classified as slow (presumably a percentage compared against the configured slow-call threshold - confirm in the
  *   state machine).
  * @param operationsInWindow
  *   number of operations taken into account for this snapshot.
  * @param lastAcquisitionResult
  *   the [[AcquireResult]] associated with the most recently registered result, if any.
  * @param timestamp
  *   time at which the snapshot was taken (unit not visible here - confirm against the producer).
  */
private[resilience] case class Metrics(
    failureRate: Int,
    slowCallsRate: Int,
    operationsInWindow: Int,
    lastAcquisitionResult: Option[AcquireResult],
    timestamp: Long
)

/** Outcome of an attempt to get permission to run an operation.
  *
  * @param acquired
  *   `true` if the operation may be run, `false` if it has to be dropped.
  * @param circuitState
  *   the breaker state that was observed when the attempt was made.
  */
private[resilience] case class AcquireResult(
    acquired: Boolean,
    circuitState: CircuitBreakerState
)

/** The subset of [[CircuitBreakerConfig]] handed to the state machine: every field of the public config except `slidingWindow`. */
private case class CircuitBreakerStateMachineConfig(
    failureRateThreshold: PercentageThreshold,
    slowCallThreshold: PercentageThreshold,
    slowCallDurationThreshold: FiniteDuration,
    minimumNumberOfCalls: Int,
    numberOfCallsInHalfOpenState: Int,
    waitDurationOpenState: FiniteDuration,
    halfOpenTimeoutDuration: FiniteDuration
)

private object CircuitBreakerStateMachineConfig:
  /** Copies the fields shared with the public configuration, one to one.
    *
    * @param c
    *   The user-facing configuration.
    * @return
    *   The state machine configuration carrying the same values; `slidingWindow` is not part of it.
    */
  def fromConfig(c: CircuitBreakerConfig): CircuitBreakerStateMachineConfig =
    CircuitBreakerStateMachineConfig(
      failureRateThreshold = c.failureRateThreshold,
      slowCallThreshold = c.slowCallThreshold,
      slowCallDurationThreshold = c.slowCallDurationThreshold,
      minimumNumberOfCalls = c.minimumNumberOfCalls,
      numberOfCallsInHalfOpenState = c.numberOfCallsInHalfOpenState,
      waitDurationOpenState = c.waitDurationOpenState,
      halfOpenTimeoutDuration = c.halfOpenTimeoutDuration
    )
  end fromConfig
end CircuitBreakerStateMachineConfig

/** Circuit Breaker. An operation is dropped when the breaker is open, or when the breaker is half-open and does not accept any more
  * operations. Which metrics are calculated depends on the [[SlidingWindow]] provided in the config. See [[SlidingWindow]] for more
  * details.
  *
  * Results of completed operations are passed to the state machine through an actor created from the given [[Ox]] scope and
  * [[BufferCapacity]].
  */
case class CircuitBreaker(config: CircuitBreakerConfig)(using ox: Ox, bufferCapacity: BufferCapacity):
  private[resilience] val stateMachine = CircuitBreakerStateMachine(config)
  private val actorRef: ActorRef[CircuitBreakerStateMachine] = Actor.create(stateMachine)

  /** Reads the current state once and decides whether an operation may start: always in `Closed`, never in `Open`, and in `HalfOpen`
    * only if a permit can be taken from the state's semaphore without blocking.
    */
  private def tryAcquire(): AcquireResult =
    val observed = stateMachine.state
    val permitted = observed match
      case CircuitBreakerState.Closed(_)                 => true
      case CircuitBreakerState.Open(_)                   => false
      case CircuitBreakerState.HalfOpen(_, permits, _) => permits.tryAcquire(1)
    AcquireResult(permitted, observed)
  end tryAcquire

  /** Runs the operation using the given error mode, or drops it if the breaker does not accept it.
    *
    * A completed operation is reported to the state machine as a failure when the error mode considers its result an error; otherwise as
    * slow when it took longer than `config.slowCallDurationThreshold`, and as a success in the remaining case. Nothing is reported when
    * the operation is dropped.
    *
    * @param em
    *   The error mode to use, which specifies when a result value is considered success, and when a failure.
    * @param operation
    *   The operation to run.
    * @return
    *   `Some` if the operation has been run, `None` if the operation has been dropped.
    */
  def runOrDropWithErrorMode[E, F[_], T](em: ErrorMode[E, F])(
      operation: => F[T]
  ): Option[F[T]] =
    val acquisition = tryAcquire()
    if !acquisition.acquired then None
    else
      val (elapsed, outcome) = timed(operation)
      val classification =
        if em.isError(outcome) then CircuitBreakerResult.Failure
        else if elapsed > config.slowCallDurationThreshold then CircuitBreakerResult.Slow
        else CircuitBreakerResult.Success
      actorRef.tell(_.registerResult(classification, acquisition, actorRef))
      Some(outcome)
    end if
  end runOrDropWithErrorMode

  /** Runs the operation returning [[scala.util.Either]], or drops it if the breaker does not accept it. A `Left` counts as a failure.
    * Note that exceptions thrown by the operation aren't caught here and are propagated to the caller.
    *
    * @param operation
    *   The operation to run.
    * @return
    *   `Some` if the operation has been run, `None` if the operation has been dropped.
    * @throws anything
    *   The exception thrown by operation.
    */
  def runOrDropEither[E, T](
      operation: => Either[E, T]
  ): Option[Either[E, T]] =
    runOrDropWithErrorMode(EitherMode[E])(operation)

  /** Runs the operation, or drops it if the breaker does not accept it, returning the direct result wrapped in [[Option]]. An exception
    * caught by [[scala.util.Try]] is registered as a failure and then rethrown.
    *
    * @param operation
    *   The operation to run.
    * @return
    *   `Some` if the operation has been run, `None` if the operation has been dropped.
    * @throws anything
    *   The exception thrown by operation.
    */
  def runOrDrop[T](operation: => T): Option[T] =
    runOrDropEither(Try(operation).toEither) match
      case None               => None
      case Some(Right(value)) => Some(value)
      case Some(Left(error))  => throw error
end CircuitBreaker

object CircuitBreaker:
  /** Buffer capacity of 100, available as a given through the companion of [[CircuitBreaker]]. */
  given default: BufferCapacity = BufferCapacity(100)
79 changes: 79 additions & 0 deletions core/src/main/scala/ox/resilience/CircuitBreakerConfig.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package ox.resilience

import scala.concurrent.duration.*

/** Configures how [[Metrics]] are calculated. */
enum SlidingWindow:
  /** Window that takes the last `windowSize` operations into account when calculating metrics.
    * @param windowSize
    *   number of most recent results that are recorded.
    */
  case CountBased(windowSize: Int)

  /** Window that takes into account the operations registered within `duration` before the current time.
    * @param duration
    *   span of time in which results are included in metrics.
    */
  case TimeBased(duration: FiniteDuration)
end SlidingWindow

/** A percentage threshold: an integer between 0 and 100 (both inclusive). */
opaque type PercentageThreshold = Int

object PercentageThreshold:
  /** Wraps `c` as a threshold.
    *
    * @param c
    *   The percentage, expected to be between 0 and 100 inclusive.
    * @throws java.lang.AssertionError
    *   If `c` is outside of that range.
    */
  def apply(c: Int): PercentageThreshold =
    assert(0 <= c && c <= 100, s"PercentageThreshold must be between 0 and 100, value: $c")
    c

extension (c: PercentageThreshold)
  /** The threshold as a plain integer. */
  def toInt: Int = c

  /** `true` when `by` reaches or goes beyond the threshold, i.e. a value equal to the threshold already counts as exceeding it. */
  def isExceeded(by: Int): Boolean = c <= by

/** Configuration of a [[CircuitBreaker]].
  *
  * @param failureRateThreshold
  *   threshold, as a percentage of operations that ended in failure.
  * @param slowCallThreshold
  *   threshold, as a percentage of operations that took longer than [[slowCallDurationThreshold]].
  * @param slowCallDurationThreshold
  *   duration above which an operation is considered slow.
  * @param slidingWindow
  *   configures how thresholds will be calculated. See [[SlidingWindow]] for more details.
  * @param minimumNumberOfCalls
  *   minimum number of results that must be registered before metrics are calculated.
  * @param waitDurationOpenState
  *   how much time will pass before the breaker switches from the open to the half-open state.
  * @param halfOpenTimeoutDuration
  *   timeout after which, if not enough calls were registered in the half-open state, the breaker goes back to the open state.
  * @param numberOfCallsInHalfOpenState
  *   number of results that must be registered to calculate metrics and decide if the breaker should go back to the open state or close.
  *   This is also the maximum number of operations that can be started in the half-open state. Must be greater than 0.
  */
case class CircuitBreakerConfig(
    failureRateThreshold: PercentageThreshold,
    slowCallThreshold: PercentageThreshold,
    slowCallDurationThreshold: FiniteDuration,
    slidingWindow: SlidingWindow,
    minimumNumberOfCalls: Int,
    waitDurationOpenState: FiniteDuration,
    halfOpenTimeoutDuration: FiniteDuration,
    numberOfCallsInHalfOpenState: Int
):

  // Checked on construction: a half-open breaker has to let at least one operation through.
  assert(
    numberOfCallsInHalfOpenState > 0,
    s"numberOfCallsInHalfOpenState must be greater than 0, value: $numberOfCallsInHalfOpenState"
  )
end CircuitBreakerConfig

object CircuitBreakerConfig:
  /** Default configuration: 50% failure and slow-call thresholds, calls slower than 10 seconds counted as slow, a count-based window of
    * the last 100 operations, at least 20 calls before metrics are calculated, 10 seconds in the open state, a half-open timeout of 0
    * milliseconds and 10 calls allowed in the half-open state.
    */
  def default: CircuitBreakerConfig =
    CircuitBreakerConfig(
      failureRateThreshold = PercentageThreshold(50),
      slowCallThreshold = PercentageThreshold(50),
      slowCallDurationThreshold = 10.seconds,
      slidingWindow = SlidingWindow.CountBased(100),
      minimumNumberOfCalls = 20,
      waitDurationOpenState = 10.seconds,
      halfOpenTimeoutDuration = 0.millis,
      numberOfCallsInHalfOpenState = 10
    )
end CircuitBreakerConfig
Loading