Skip to content

Commit

Permalink
Health checks are an "upsert" now so that a node is put back if it's …
Browse files Browse the repository at this point in the history
…deleted somehow, always guarantees that the current node is part of the assignments. Fix for bug with recent changes in envelope persistence for modular monoliths. Closes GH-1232
  • Loading branch information
jeremydmiller committed Jan 21, 2025
1 parent c30f831 commit d8e783d
Show file tree
Hide file tree
Showing 15 changed files with 95 additions and 15 deletions.
6 changes: 4 additions & 2 deletions src/Persistence/EfCoreTests/end_to_end_efcore_persistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,8 @@ public async Task persist_an_incoming_envelope_raw()
Status = EnvelopeStatus.Scheduled,
Attempts = 2,
MessageType = "foo",
ContentType = EnvelopeConstants.JsonContentType
ContentType = EnvelopeConstants.JsonContentType,
Destination = TransportConstants.DurableLocalUri
};

var container = Host.Services.GetRequiredService<IServiceContainer>();
Expand Down Expand Up @@ -465,7 +466,8 @@ public async Task persist_an_incoming_envelope_mapped()
Status = EnvelopeStatus.Scheduled,
Attempts = 2,
MessageType = "foo",
ContentType = EnvelopeConstants.JsonContentType
ContentType = EnvelopeConstants.JsonContentType,
Destination = TransportConstants.DurableLocalUri
};

var container = Host.Services.GetRequiredService<IServiceContainer>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,13 @@ public async Task send_messages_with_postgresql_queueing()
opts.ListenToPostgresqlQueue("request").MaximumParallelMessages(14, ProcessingOrder.UnOrdered);
opts.PublishMessage<ColorResponse>().ToPostgresqlQueue("response");

opts.Services.AddMarten(opt =>
opts.Durability.ScheduledJobPollingTime = 250.Milliseconds();

opts.Services.AddMarten(m =>
{
opt.Connection(Servers.PostgresConnectionString);
opt.Events.TenancyStyle = TenancyStyle.Conjoined;
m.Connection(Servers.PostgresConnectionString);
m.Events.TenancyStyle = TenancyStyle.Conjoined;
m.DisableNpgsqlLogging = true;
})
.UseLightweightSessions()
.IntegrateWithWolverine(options =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public async Task can_resume_listening()

await host.TrackActivity()
.WaitForMessageToBeReceivedAt<ThisMeansTrouble>(host)
.Timeout(20.Seconds())
.Timeout(60.Seconds())
.PublishMessageAndWaitAsync(new ThisMeansTrouble());
}
}
Expand Down
11 changes: 11 additions & 0 deletions src/Persistence/Wolverine.Postgresql/PostgresqlNodePersistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,17 @@ await _dataSource.CreateCommand($"update {_nodeTable} set health_check = now() w
.With("id", nodeId).ExecuteNonQueryAsync();
}

/// <summary>
/// Records a health check for the given node as an "upsert": the node's
/// health_check column is set to the database's current time, and if no row
/// with the node's id was updated, the node is persisted instead.
/// </summary>
/// <param name="node">The node whose health check is being recorded.</param>
/// <param name="token">Cancellation token passed to the update and to the fallback persist.</param>
public async Task MarkHealthCheckAsync(WolverineNode node, CancellationToken token)
{
    var update = _dataSource
        .CreateCommand($"update {_nodeTable} set health_check = now() where id = :id")
        .With("id", node.NodeId);

    var rowsAffected = await update.ExecuteNonQueryAsync(token);
    if (rowsAffected != 0)
    {
        // The node row exists and its health check was refreshed
        return;
    }

    // No row matched this node id, so put the node back
    await PersistAsync(node, token);
}

public async Task<IReadOnlyList<int>> LoadAllNodeAssignedIdsAsync()
{
return await _dataSource.CreateCommand($"select node_number from {_nodeTable}")
Expand Down
2 changes: 2 additions & 0 deletions src/Persistence/Wolverine.RDBMS/MessageDatabase.Incoming.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public Task ReassignIncomingAsync(int ownerId, IReadOnlyList<Envelope> incoming)
{
builder.Append($"update {SchemaName}.{DatabaseConstants.IncomingTable} set owner_id = ");
builder.AppendParameter(ownerId);
builder.Append($" where {DatabaseConstants.Id} = ");
builder.AppendParameter(envelope.Id);
builder.Append($" and {DatabaseConstants.ReceivedAt} = ");
builder.AppendParameter(envelope.Destination.ToString());
builder.Append(";");
Expand Down
4 changes: 4 additions & 0 deletions src/Persistence/Wolverine.RDBMS/Polling/DatabaseBatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ public async Task DrainAsync()
_executingBlock.Complete();
await _executingBlock.Completion;
}
catch (TaskCanceledException)
{
// it just timed out, let it go
}
catch (Exception e)
{
_logger.LogError(e, "Error trying to drain the current database batcher");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,13 @@ public async Task MarkHealthCheckAsync(Guid nodeId)
await session.SaveChangesAsync();
}

/// <summary>
/// Records a health check for the given node through the session's
/// AddOrPatch operation, keyed by the node id, targeting the node's
/// LastHealthCheck with the current UTC time.
/// </summary>
/// <param name="node">The node whose health check is being recorded; also passed as the document for AddOrPatch.</param>
/// <param name="cancellationToken">Cancellation token passed to SaveChangesAsync.</param>
public async Task MarkHealthCheckAsync(WolverineNode node, CancellationToken cancellationToken)
{
    using (var session = _store.OpenAsyncSession())
    {
        var documentId = node.NodeId.ToString();

        // NOTE(review): AddOrPatch is expected to store the node when the document
        // is missing and otherwise patch only LastHealthCheck -- confirm against RavenDB docs
        session.Advanced.AddOrPatch(documentId, node, x => x.LastHealthCheck, DateTimeOffset.UtcNow);

        await session.SaveChangesAsync(cancellationToken);
    }
}

public async Task OverwriteHealthCheckTimeAsync(Guid nodeId, DateTimeOffset lastHeartbeatTime)
{
using var session = _store.OpenAsyncSession();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ public async Task<int> PersistAsync(WolverineNode node, CancellationToken cancel
await using var conn = new SqlConnection(_settings.ConnectionString);
await conn.OpenAsync(cancellationToken);

var raw = await persistNode(conn, node, cancellationToken);

await conn.CloseAsync();

return (int)raw;
}

private async Task<object> persistNode(SqlConnection conn, WolverineNode node, CancellationToken cancellationToken)
{
var strings = node.Capabilities.Select(x => x.ToString()).Join(",");

var cmd = conn.CreateCommand($"insert into {_nodeTable} (id, uri, capabilities, description) OUTPUT Inserted.node_number values (@id, @uri, @capabilities, @description) ")
Expand All @@ -55,10 +64,7 @@ public async Task<int> PersistAsync(WolverineNode node, CancellationToken cancel
.With("capabilities", strings);

var raw = await cmd.ExecuteScalarAsync(cancellationToken);

await conn.CloseAsync();

return (int)raw;
return raw;
}

public async Task DeleteAsync(Guid nodeId, int assignedNodeNumber)
Expand Down Expand Up @@ -160,6 +166,22 @@ await conn.CreateCommand($"update {_nodeTable} set health_check = GETUTCDATE() w
await conn.CloseAsync();
}

/// <summary>
/// Records a health check for the given node as an "upsert": the node's
/// health_check column is set to GETUTCDATE(), and if no row with the node's
/// id was updated, the node is inserted on the same open connection.
/// </summary>
/// <param name="node">The node whose health check is being recorded.</param>
/// <param name="cancellationToken">Cancellation token passed to the open, update and insert calls.</param>
public async Task MarkHealthCheckAsync(WolverineNode node, CancellationToken cancellationToken)
{
    await using var conn = new SqlConnection(_settings.ConnectionString);
    await conn.OpenAsync(cancellationToken);

    var update = conn
        .CreateCommand($"update {_nodeTable} set health_check = GETUTCDATE() where id = @id")
        .With("id", node.NodeId);

    var rowsAffected = await update.ExecuteNonQueryAsync(cancellationToken);
    var nodeIsMissing = rowsAffected == 0;

    if (nodeIsMissing)
    {
        // No row matched this node id, so put the node back using the already-open connection
        await persistNode(conn, node, cancellationToken);
    }

    await conn.CloseAsync();
}

private async Task<WolverineNode> readNodeAsync(DbDataReader reader)
{
var node = new WolverineNode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,15 @@ public async Task update_health_check_smoke_test()

await _database.Nodes.PersistAsync(node1, CancellationToken.None);
await _database.Nodes.PersistAsync(node2, CancellationToken.None);
await _database.Nodes.PersistAsync(node3, CancellationToken.None);
//await _database.Nodes.PersistAsync(node3, CancellationToken.None);

await _database.Nodes.MarkHealthCheckAsync(node1, CancellationToken.None);
await _database.Nodes.MarkHealthCheckAsync(node2, CancellationToken.None);
await _database.Nodes.MarkHealthCheckAsync(node3, CancellationToken.None);

await _database.Nodes.MarkHealthCheckAsync(node1.NodeId);
// Proving the upsert behavior
var nodes = await _database.Nodes.LoadAllNodesAsync(CancellationToken.None);
nodes.Any(x => x.NodeId == node3.NodeId).ShouldBeTrue();
}

}
5 changes: 5 additions & 0 deletions src/Wolverine/Persistence/Durability/NullMessageStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,11 @@ public Task MarkHealthCheckAsync(Guid nodeId)
return Task.CompletedTask;
}

/// <summary>
/// No-op: ignores both arguments and returns an already completed task.
/// </summary>
/// <param name="node">Unused.</param>
/// <param name="cancellationToken">Unused.</param>
/// <returns><see cref="Task.CompletedTask"/>.</returns>
public Task MarkHealthCheckAsync(WolverineNode node, CancellationToken cancellationToken)
    => Task.CompletedTask;

public Task OverwriteHealthCheckTimeAsync(Guid nodeId, DateTimeOffset lastHeartbeatTime)
{
return Task.CompletedTask;
Expand Down
5 changes: 5 additions & 0 deletions src/Wolverine/Runtime/Agents/INodeAgentPersistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@ public interface INodeAgentPersistence
[Obsolete("Kill this in 3.0")]
Task<Guid?> MarkNodeAsLeaderAsync(Guid? originalLeader, Guid id);
Task<WolverineNode?> LoadNodeAsync(Guid nodeId, CancellationToken cancellationToken);

// TODO -- make this take WolverineNode instead
[Obsolete("Will be removed in 4.0")]
Task MarkHealthCheckAsync(Guid nodeId);

Task MarkHealthCheckAsync(WolverineNode node, CancellationToken cancellationToken);

Task OverwriteHealthCheckTimeAsync(Guid nodeId, DateTimeOffset lastHeartbeatTime);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ public partial class NodeAgentController
public async Task<AgentCommands> EvaluateAssignmentsAsync(IReadOnlyList<WolverineNode> nodes)
{
using var activity = WolverineTracing.ActivitySource.StartActivity("wolverine_node_assignments");

// Not sure how this *could* happen, but we had a report of it happening in production
// probably because someone messed w/ the database though
if (!nodes.Any())
{
// At least use the current node
nodes = new List<WolverineNode> { WolverineNode.For(_runtime.Options) };
}

var grid = new AssignmentGrid();

Expand Down
4 changes: 2 additions & 2 deletions src/Wolverine/Runtime/Agents/NodeAgentController.HeartBeat.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ public async Task<AgentCommands> DoHealthChecksAsync()

using var activity = WolverineTracing.ActivitySource.StartActivity("wolverine_node_assignments");

// write health check regardless
await _persistence.MarkHealthCheckAsync(_runtime.Options.UniqueNodeId);
// write health check regardless, and due to GH-1232, pass in the whole node so you can do an upsert
await _persistence.MarkHealthCheckAsync(WolverineNode.For(_runtime.Options), _cancellation.Token);

var nodes = await _persistence.LoadAllNodesAsync(_cancellation.Token);

Expand Down
3 changes: 2 additions & 1 deletion src/Wolverine/Runtime/Agents/WolverineNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ public static WolverineNode For(WolverineOptions options)
return new WolverineNode
{
NodeId = options.UniqueNodeId,
ControlUri = options.Transports.NodeControlEndpoint?.Uri
ControlUri = options.Transports.NodeControlEndpoint?.Uri,
LastHealthCheck = DateTimeOffset.UtcNow
};
}

Expand Down
4 changes: 4 additions & 0 deletions src/Wolverine/Runtime/HandlerPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ public async Task InvokeAsync(Envelope envelope, IChannelCallback channel, Activ
var continuation = await executeAsync(context, envelope, activity);
await continuation.ExecuteAsync(context, _runtime, DateTimeOffset.Now, activity);
}
catch (ObjectDisposedException)
{
// It's shutting down, get out of here
}
catch (Exception e)
{
await channel.CompleteAsync(envelope);
Expand Down

0 comments on commit d8e783d

Please sign in to comment.