Skip to content

Commit

Permalink
Create separate health status for source inactivity
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed May 27, 2024
1 parent 896eb39 commit 53eb2b5
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,17 @@ object SourceAndAck {
*/
case class LaggingEventProcessor(latency: FiniteDuration) extends Unhealthy

/**
* The health status expected if the source of events has been inactive for some time
*
* @param duration
* How long the source of events has been inactive
*/
case class InactiveSource(duration: FiniteDuration) extends Unhealthy

implicit def showUnhealthy: Show[Unhealthy] = Show {
case Disconnected => "No connection to a source of events"
case LaggingEventProcessor(latency) => show"Processing latency is $latency"
case InactiveSource(duration) => show"Source of events has been inactive for $duration"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ private[sources] object LowLevelSource {
case (_, Some(lastPullTime), _, now) if now - lastPullTime > maxAllowedProcessingLatency =>
SourceAndAck.LaggingEventProcessor(now - lastPullTime)
case (_, _, lastLiveness, now) if now - lastLiveness > maxAllowedProcessingLatency =>
SourceAndAck.LaggingEventProcessor(now - lastLiveness)
SourceAndAck.InactiveSource(now - lastLiveness)
case _ => SourceAndAck.Healthy
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
_ <- IO.sleep(5.minutes)
health <- sourceAndAck.isHealthy(5.seconds)
_ <- fiber.cancel
} yield health must beEqualTo(SourceAndAck.LaggingEventProcessor(10.seconds))
} yield health must beEqualTo(SourceAndAck.InactiveSource(10.seconds))

TestControl.executeEmbed(io)
}
Expand Down

0 comments on commit 53eb2b5

Please sign in to comment.