diff --git a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/SourceAndAck.scala b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/SourceAndAck.scala index 358ae44..fbed044 100644 --- a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/SourceAndAck.scala +++ b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/SourceAndAck.scala @@ -73,8 +73,17 @@ object SourceAndAck { */ case class LaggingEventProcessor(latency: FiniteDuration) extends Unhealthy + /** + * The health status expected if the source of events has been inactive for some time + * + * @param duration + * How long the source of events has been inactive + */ + case class InactiveSource(duration: FiniteDuration) extends Unhealthy + implicit def showUnhealthy: Show[Unhealthy] = Show { case Disconnected => "No connection to a source of events" case LaggingEventProcessor(latency) => show"Processing latency is $latency" + case InactiveSource(duration) => show"Source of events has been inactive for $duration" } } diff --git a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala index 7fe3b44..dbdb5ae 100644 --- a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala +++ b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala @@ -122,7 +122,7 @@ private[sources] object LowLevelSource { case (_, Some(lastPullTime), _, now) if now - lastPullTime > maxAllowedProcessingLatency => SourceAndAck.LaggingEventProcessor(now - lastPullTime) case (_, _, lastLiveness, now) if now - lastLiveness > maxAllowedProcessingLatency => - SourceAndAck.LaggingEventProcessor(now - lastLiveness) + SourceAndAck.InactiveSource(now - lastLiveness) case _ => SourceAndAck.Healthy } } diff --git a/modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSourceSpec.scala b/modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSourceSpec.scala index 7129c24..a7dfece 100644 --- a/modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSourceSpec.scala +++ b/modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSourceSpec.scala @@ -635,7 +635,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect { _ <- IO.sleep(5.minutes) health <- sourceAndAck.isHealthy(5.seconds) _ <- fiber.cancel - } yield health must beEqualTo(SourceAndAck.LaggingEventProcessor(10.seconds)) + } yield health must beEqualTo(SourceAndAck.InactiveSource(10.seconds)) TestControl.executeEmbed(io) }