Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Healthcheck.Persistence degraded support #284

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/Akka.HealthCheck.Hosting/AkkaHealthCheckOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,24 @@ public sealed class ProviderOptions
/// <summary>
/// Defines the interval for each persistence liveness health check probe refresh.
/// Does not have any effect on readiness options.
/// Default value: 10 seconds
/// </summary>
public TimeSpan? PersistenceProbeInterval { get; set; }

/// <summary>
/// Defines the timeout for each persistence liveness check operation
/// Does not have any effect on readiness options.
/// Default value: 3 seconds
/// </summary>
public TimeSpan? PersistenceProbeTimeout { get; set; }

/// <summary>
/// Defines the number of failures that needs to happen before the probe returns an unhealthy status.
/// Any failure count below this threshold will be reported as degraded.
/// Default value: 3
/// </summary>
public int? PersistenceProbeUnhealthyThreshold { get; set; }


public ProviderOptions ClearProviders()
{
Expand Down Expand Up @@ -198,8 +214,13 @@ public ProviderOptions AddProvider<T>(string key) where T : IProbeProvider
sb.AppendLine($"file.path = {FilePath}");
if (TcpPort is { })
sb.AppendLine($"tcp.port = {TcpPort}");

if(PersistenceProbeInterval is not null)
sb.AppendLine($"persistence.probe-interval = {PersistenceProbeInterval.ToHocon()}");
if (PersistenceProbeTimeout is not null)
sb.AppendLine($"persistence.timeout = {PersistenceProbeTimeout.ToHocon()}");
if (PersistenceProbeUnhealthyThreshold is not null)
sb.Append($"persistence.unhealthy-threshold = {PersistenceProbeUnhealthyThreshold.ToHocon()}");

return sb.Length > 0 ? sb : null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public AkkaPersistenceLivenessProbeNotAvailableDueToSnapshotStoreSpecs(ITestOutp
public void AkkaPersistenceLivenessProbeProvidert_Should_Report_Akka_Persistance_Is_Unavailable_With_Bad_Snapshot_Store_Setup()
{

var ProbActor = Sys.ActorOf(Props.Create(() => new AkkaPersistenceLivenessProbe(true, 250.Milliseconds(), 3.Seconds())));
var ProbActor = Sys.ActorOf(Props.Create(() => new AkkaPersistenceLivenessProbe(true, 250.Milliseconds(), 3.Seconds(), 0)));
ProbActor.Tell(new SubscribeToLiveness(TestActor));
var firstResult = ExpectMsg<LivenessStatus>();
firstResult.Status.Should().Be(AkkaHealthStatus.Degraded, "Initial status should be degraded, not unhealthy");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public AkkaPersistenceLivenessProbeNotAvailableDueToJournalSpecs(ITestOutputHelp
public void AkkaPersistenceLivenessProbeProvidert_Should_Report_Akka_Persistance_Is_Unavailable_With_Bad_Journal_Setup()
{

var ProbActor = Sys.ActorOf(Props.Create(() => new AkkaPersistenceLivenessProbe(true, 250.Milliseconds(), 3.Seconds())));
var ProbActor = Sys.ActorOf(Props.Create(() => new AkkaPersistenceLivenessProbe(true, 250.Milliseconds(), 3.Seconds(), 0)));
ProbActor.Tell(new SubscribeToLiveness(TestActor));
var firstResult = ExpectMsg<LivenessStatus>();
firstResult.Status.Should().Be(AkkaHealthStatus.Degraded, "Initial status should be degraded, not unhealthy");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public AkkaPersistenceLivenessProbeSubscriptionTest(ITestOutputHelper helper)
[Fact(DisplayName = "AkkaPersistenceLivenessProbe should correctly handle subscription requests")]
public void AkkaPersistenceLivenessProbe_Should_Handle_Subscriptions_In_Any_State()
{
var ProbActor = Sys.ActorOf(Props.Create(() => new AkkaPersistenceLivenessProbe(true, 250.Milliseconds(), 3.Seconds())));
var ProbActor = Sys.ActorOf(Props.Create(() => new AkkaPersistenceLivenessProbe(true, 250.Milliseconds(), 3.Seconds(), 0)));
ProbActor.Tell(new SubscribeToLiveness(TestActor));
var firstResult = ExpectMsg<LivenessStatus>();
firstResult.Status.Should().Be(AkkaHealthStatus.Degraded, "Initial status should be degraded, not unhealthy");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ await WithJournalRecovery(

private async Task TestTimeout(CancellationTokenSource cts)
{
var probeActor = Sys.ActorOf(Props.Create(() => new AkkaPersistenceLivenessProbe(true, 250.Milliseconds(), 500.Milliseconds())));
var probeActor = Sys.ActorOf(Props.Create(() => new AkkaPersistenceLivenessProbe(true, 250.Milliseconds(), 500.Milliseconds(), 0)));
probeActor.Tell(new SubscribeToLiveness(TestActor));
var status = ExpectMsg<LivenessStatus>();
status.Status.Should().Be(AkkaHealthStatus.Degraded);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// -----------------------------------------------------------------------
// <copyright file="PersistenceLivenessStatusSpecs.cs" company="Petabridge, LLC">
// Copyright (C) 2015 - 2024 Petabridge, LLC <https://petabridge.com>
// </copyright>
// -----------------------------------------------------------------------

using Akka.Persistence.TestKit;
using FluentAssertions;
using Xunit;

namespace Akka.HealthCheck.Persistence.Tests;

public class PersistenceLivenessStatusSpecs
{
[Fact(DisplayName = "Healthy status should be alive")]
public void HealthyStatusTest()
{
var status = PersistenceLivenessStatus.Healthy("Test");
status.IsLive.Should().BeTrue();
status.StatusMessage.Should().Be("Test");
status.Failure.Should().BeNull();
status.Status.Should().Be(AkkaHealthStatus.Healthy);
}

[Fact(DisplayName = "Degraded status should still be alive")]
public void DegradedStatusTest()
{
var status = PersistenceLivenessStatus.Degraded(null, "Test");
status.IsLive.Should().BeTrue();
status.StatusMessage.Should().Be("Test");
status.Failure.Should().BeNull();
status.Status.Should().Be(AkkaHealthStatus.Degraded);

status = PersistenceLivenessStatus.Degraded(new TestJournalFailureException(), "Test");
status.IsLive.Should().BeTrue();
status.StatusMessage.Should().Be("Test");
status.Failure.Should().BeOfType<TestJournalFailureException>();
status.Status.Should().Be(AkkaHealthStatus.Degraded);
}

[Fact(DisplayName = "Unhealthy status should not be alive")]
public void UnhealthyStatusTest()
{
var status = PersistenceLivenessStatus.Unhealthy(null, "Test");
status.IsLive.Should().BeFalse();
status.StatusMessage.Should().Be("Test");
status.Failure.Should().BeNull();
status.Status.Should().Be(AkkaHealthStatus.Unhealthy);

status = PersistenceLivenessStatus.Unhealthy(new TestJournalFailureException(), "Test");
status.IsLive.Should().BeFalse();
status.StatusMessage.Should().Be("Test");
status.Failure.Should().BeOfType<TestJournalFailureException>();
status.Status.Should().Be(AkkaHealthStatus.Unhealthy);
}
}
32 changes: 32 additions & 0 deletions src/Akka.HealthCheck.Persistence.Tests/ProbeFailureSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,37 @@ private PersistenceLivenessStatus PerformProbe()

return status;
}

[Fact(DisplayName = "Failures should progressively change from healthy to degraded to unhealthy")]
public async Task FailStatusProgressionTest()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

{
const int failureThreshold = 3;
var probe = Sys.ActorOf(AkkaPersistenceLivenessProbe.PersistentHealthCheckProps(true, 500.Milliseconds(), 3.Seconds(), failureThreshold));
probe.Tell(new SubscribeToLiveness(TestActor), TestActor);

// wait until probe returns a healthy status
FishForMessage<PersistenceLivenessStatus>(s => s.Status is AkkaHealthStatus.Healthy);

await WithSnapshotLoad(load => load.Fail(), async () =>
{
PersistenceLivenessStatus status;

// wait until probe detects first failure
// needed to avoid race condition with decoupled suicide actor
FishForMessage<PersistenceLivenessStatus>(s => s.Status is AkkaHealthStatus.Degraded);

// Below failure threshold, probe should report degraded
// first failure test already handled above
for (var i = 0; i < failureThreshold - 1; i++)
{
status = await ExpectMsgAsync<PersistenceLivenessStatus>();
status.Status.Should().Be(AkkaHealthStatus.Degraded);
}

// exceeding failure threshold, probe should report unhealthy
status = await ExpectMsgAsync<PersistenceLivenessStatus>();
status.Status.Should().Be(AkkaHealthStatus.Unhealthy);
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ await WithSnapshotLoad(load => load.Fail(), async () =>
{
Sys.EventStream.Subscribe(TestActor, typeof(LogEvent));
var probe = Sys.ActorOf(Props.Create(() =>
new AkkaPersistenceLivenessProbe(true, 400.Milliseconds(), 3.Seconds())));
new AkkaPersistenceLivenessProbe(true, 400.Milliseconds(), 3.Seconds(), 0)));
await FishForMessageAsync<LogEvent>(e => e.Message.ToString() is "Recreating persistence warmup probe.");

var stopwatch = Stopwatch.StartNew();
Expand Down
54 changes: 38 additions & 16 deletions src/Akka.HealthCheck.Persistence/AkkaPersistenceLivenessProbe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

using System;
using System.Collections.Generic;
using System.Threading;
using Akka.Actor;
using Akka.Event;
using Akka.HealthCheck.Liveness;
Expand Down Expand Up @@ -50,8 +51,6 @@ public PersistenceLivenessStatus(
_message = message;
}

public override bool IsLive => base.IsLive && Failure is null;

public override string StatusMessage => _message ?? ToString();

public Exception? Failure { get; }
Expand Down Expand Up @@ -110,10 +109,14 @@ public class AkkaPersistenceLivenessProbe : ActorBase, IWithTimers
private readonly TimeSpan _timeout;
private readonly bool _logInfo;

public AkkaPersistenceLivenessProbe(bool logInfo, TimeSpan delay, TimeSpan timeout)
private readonly int _maxRetry;
private int _retryCount;

public AkkaPersistenceLivenessProbe(bool logInfo, TimeSpan delay, TimeSpan timeout, int maxRetry)
{
_delay = delay;
_timeout = timeout;
_maxRetry = maxRetry;
_logInfo = logInfo;
_log = Context.GetLogger();

Expand All @@ -122,9 +125,9 @@ public AkkaPersistenceLivenessProbe(bool logInfo, TimeSpan delay, TimeSpan timeo

public ITimerScheduler Timers { get; set; } = null!;

public static Props PersistentHealthCheckProps(bool logInfo, TimeSpan delay, TimeSpan timeout)
public static Props PersistentHealthCheckProps(bool logInfo, TimeSpan delay, TimeSpan timeout, int maxRetry)
{
return Props.Create(() => new AkkaPersistenceLivenessProbe(logInfo, delay, timeout))
return Props.Create(() => new AkkaPersistenceLivenessProbe(logInfo, delay, timeout, maxRetry))
.WithDeploy(Deploy.Local);
}

Expand Down Expand Up @@ -203,8 +206,7 @@ private bool WarmingUp(object message)
case CheckTimeout:
const string errMsg = "Timeout while checking persistence liveness. Persistence liveness status is undefined.";
_log.Warning(errMsg);
_currentLivenessStatus = PersistenceLivenessStatus.Unhealthy(null, errMsg);
PublishStatusUpdates();
HandleFailure(null, errMsg);

if(_probe is not null)
Context.Stop(_probe);
Expand All @@ -214,23 +216,21 @@ private bool WarmingUp(object message)
if(_logInfo)
_log.Debug("Persistence warmup complete");

_currentLivenessStatus = PersistenceLivenessStatus.Healthy("Persistence warmup complete");
PublishStatusUpdates();
HandleSuccess("Persistence warmup complete");
return true;

case WarmupFailed fail:
if(_logInfo)
_log.Warning(fail.Cause, "Persistence warmup failed");

_currentLivenessStatus = PersistenceLivenessStatus.Unhealthy(fail.Cause, "Persistence warmup failed");
PublishStatusUpdates();
HandleFailure(fail.Cause, "Persistence warmup failed");
return true;

default:
return HandleSubscriptions(message);
}
}

private bool Active(object message)
{
switch (message)
Expand Down Expand Up @@ -260,8 +260,7 @@ private bool Active(object message)
case CheckTimeout:
const string errMsg = "Timeout while checking persistence liveness. Persistence liveness status is undefined.";
_log.Warning(errMsg);
_currentLivenessStatus = PersistenceLivenessStatus.Unhealthy(null, errMsg);
PublishStatusUpdates();
HandleFailure(null, errMsg);

if(_probe is not null)
Context.Stop(_probe);
Expand All @@ -272,8 +271,7 @@ private bool Active(object message)
if(_logInfo)
_log.Debug("Received recovery status {0} from probe", status);

_currentLivenessStatus = status;
PublishStatusUpdates();
HandleStatus(status);
return true;

default:
Expand All @@ -291,6 +289,30 @@ protected override void PreStart()
Self.Tell(CreateProbe.Instance);
}

private void HandleFailure(Exception? e, string? message)
{
_retryCount++;
_currentLivenessStatus = _retryCount > _maxRetry
? PersistenceLivenessStatus.Unhealthy(e, message)
: PersistenceLivenessStatus.Degraded(e, message);
PublishStatusUpdates();
}

private void HandleSuccess(string? message)
{
_retryCount = 0;
_currentLivenessStatus = PersistenceLivenessStatus.Healthy(message);
PublishStatusUpdates();
}

private void HandleStatus(PersistenceLivenessStatus status)
{
if (status.Status is AkkaHealthStatus.Unhealthy or AkkaHealthStatus.Degraded)
HandleFailure(status.Failure, status.StatusMessage);
else
HandleSuccess(status.StatusMessage);
}

private void ScheduleProbeRestart()
{
Timers.StartSingleTimer(CreateProbe.Instance, CreateProbe.Instance, _delay);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,17 @@ public sealed class AkkaPersistenceLivenessProbeProvider : ProbeProviderBase
{
private readonly TimeSpan _interval;
private readonly TimeSpan _timeout;
private readonly int _maxRetry;

public AkkaPersistenceLivenessProbeProvider(ActorSystem system) : base(system)
{
var config = system.Settings.Config.GetConfig("akka.healthcheck.liveness.persistence");
_interval = config.GetTimeSpan("probe-interval", TimeSpan.FromSeconds(10));
_timeout = config.GetTimeSpan("timeout", TimeSpan.FromSeconds(3));
_maxRetry = config.GetInt("unhealthy-threshold", 3);
}

public override Props ProbeProps =>
AkkaPersistenceLivenessProbe.PersistentHealthCheckProps(Settings.LogInfoEvents, _interval, _timeout);
AkkaPersistenceLivenessProbe.PersistentHealthCheckProps(Settings.LogInfoEvents, _interval, _timeout, _maxRetry);
}
}
4 changes: 4 additions & 0 deletions src/Akka.HealthCheck/Configuration/akka.healthcheck.conf
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ akka.healthcheck{

# Defines the timeout for each liveness check operation
timeout = 3s

# Defines the number of failures that needs to happen before the probe returns an unhealthy status.
# Any failure count below this threshold will be reported as degraded.
unhealthy-threshold = 3
}

# Defines the signaling mechanism used to communicate with K8s, AWS, Azure,
Expand Down