Skip to content

Commit

Permalink
Allow start command to force waiting resources to start (dotnet#7312)
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesNK authored Jan 29, 2025
1 parent 444eb64 commit c3dc761
Show file tree
Hide file tree
Showing 9 changed files with 182 additions and 62 deletions.
2 changes: 1 addition & 1 deletion playground/Stress/Stress.AppHost/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
builder.Services.AddHttpClient();
builder.Services.AddHealthChecks().AddAsyncCheck("health-test", async (ct) =>
{
await Task.Delay(500, ct);
await Task.Delay(5_000, ct);
return HealthCheckResult.Healthy();
});

Expand Down
50 changes: 50 additions & 0 deletions src/Aspire.Hosting/Dcp/AppResource.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using Aspire.Hosting.ApplicationModel;
using Aspire.Hosting.Dcp.Model;
using System.Diagnostics;

namespace Aspire.Hosting.Dcp;

[DebuggerDisplay("ModelResource = {ModelResource}, DcpResourceName = {DcpResourceName}")]
internal class AppResource : IResourceReference
{
public IResource ModelResource { get; }
public CustomResource DcpResource { get; }
public string DcpResourceName => DcpResource.Metadata.Name;
public virtual List<ServiceAppResource> ServicesProduced { get; } = [];
public virtual List<ServiceAppResource> ServicesConsumed { get; } = [];

public AppResource(IResource modelResource, CustomResource dcpResource)
{
ModelResource = modelResource;
DcpResource = dcpResource;
}
}

internal sealed class ServiceAppResource : AppResource
{
public Service Service => (Service)DcpResource;
public EndpointAnnotation EndpointAnnotation { get; }

public override List<ServiceAppResource> ServicesProduced
{
get { throw new InvalidOperationException("Service resources do not produce any services"); }
}
public override List<ServiceAppResource> ServicesConsumed
{
get { throw new InvalidOperationException("Service resources do not consume any services"); }
}

public ServiceAppResource(IResource modelResource, Service service, EndpointAnnotation sba) : base(modelResource, service)
{
EndpointAnnotation = sba;
}
}

internal interface IResourceReference
{
IResource ModelResource { get; }
string DcpResourceName { get; }
}
61 changes: 13 additions & 48 deletions src/Aspire.Hosting/Dcp/DcpExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,41 +29,6 @@

namespace Aspire.Hosting.Dcp;

[DebuggerDisplay("ModelResource = {ModelResource}, DcpResource = {DcpResource}")]
internal class AppResource
{
public IResource ModelResource { get; }
public CustomResource DcpResource { get; }
public virtual List<ServiceAppResource> ServicesProduced { get; } = [];
public virtual List<ServiceAppResource> ServicesConsumed { get; } = [];

public AppResource(IResource modelResource, CustomResource dcpResource)
{
ModelResource = modelResource;
DcpResource = dcpResource;
}
}

internal sealed class ServiceAppResource : AppResource
{
public Service Service => (Service)DcpResource;
public EndpointAnnotation EndpointAnnotation { get; }

public override List<ServiceAppResource> ServicesProduced
{
get { throw new InvalidOperationException("Service resources do not produce any services"); }
}
public override List<ServiceAppResource> ServicesConsumed
{
get { throw new InvalidOperationException("Service resources do not consume any services"); }
}

public ServiceAppResource(IResource modelResource, Service service, EndpointAnnotation sba) : base(modelResource, service)
{
EndpointAnnotation = sba;
}
}

internal sealed class DcpExecutor : IDcpExecutor
{
private const string DebugSessionPortVar = "DEBUG_SESSION_PORT";
Expand Down Expand Up @@ -1650,12 +1615,12 @@ private static V1Patch CreatePatch<T>(T obj, Action<T> change) where T : CustomR
return new V1Patch(jsonPatch, V1Patch.PatchType.JsonPatch);
}

public async Task StopResourceAsync(string resourceName, CancellationToken cancellationToken)
public async Task StopResourceAsync(IResourceReference resourceReference, CancellationToken cancellationToken)
{
var matchingResource = GetMatchingResource(resourceName);
var appResource = (AppResource)resourceReference;

V1Patch patch;
switch (matchingResource.DcpResource)
switch (appResource.DcpResource)
{
case Container c:
patch = CreatePatch(c, obj => obj.Spec.Stop = true);
Expand All @@ -1666,11 +1631,11 @@ public async Task StopResourceAsync(string resourceName, CancellationToken cance
await _kubernetesService.PatchAsync(e, patch, cancellationToken).ConfigureAwait(false);
break;
default:
throw new InvalidOperationException($"Unexpected resource type: {matchingResource.DcpResource.GetType().FullName}");
throw new InvalidOperationException($"Unexpected resource type: {appResource.DcpResource.GetType().FullName}");
}
}

private AppResource GetMatchingResource(string resourceName)
public IResourceReference GetResource(string resourceName)
{
var matchingResource = _appResources
.Where(r => r.DcpResource is not Service)
Expand All @@ -1683,14 +1648,14 @@ private AppResource GetMatchingResource(string resourceName)
return matchingResource;
}

public async Task StartResourceAsync(string resourceName, CancellationToken cancellationToken)
public async Task StartResourceAsync(IResourceReference resourceReference, CancellationToken cancellationToken)
{
var matchingResource = GetMatchingResource(resourceName);
var resourceType = GetResourceType(matchingResource.DcpResource, matchingResource.ModelResource);
var appResource = (AppResource)resourceReference;
var resourceType = GetResourceType(appResource.DcpResource, appResource.ModelResource);

try
{
switch (matchingResource.DcpResource)
switch (appResource.DcpResource)
{
case Container c:
await StartExecutableOrContainerAsync(c).ConfigureAwait(false);
Expand All @@ -1699,13 +1664,13 @@ public async Task StartResourceAsync(string resourceName, CancellationToken canc
await StartExecutableOrContainerAsync(e).ConfigureAwait(false);
break;
default:
throw new InvalidOperationException($"Unexpected resource type: {matchingResource.DcpResource.GetType().FullName}");
throw new InvalidOperationException($"Unexpected resource type: {appResource.DcpResource.GetType().FullName}");
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to start resource {ResourceName}", matchingResource.ModelResource.Name);
await _executorEvents.PublishAsync(new OnResourceFailedToStartContext(cancellationToken, resourceType, matchingResource.ModelResource, matchingResource.DcpResource.Metadata.Name)).ConfigureAwait(false);
_logger.LogError(ex, "Failed to start resource {ResourceName}", appResource.ModelResource.Name);
await _executorEvents.PublishAsync(new OnResourceFailedToStartContext(cancellationToken, resourceType, appResource.ModelResource, appResource.DcpResource.Metadata.Name)).ConfigureAwait(false);
throw;
}

Expand Down Expand Up @@ -1765,7 +1730,7 @@ await execution.ExecuteAsync(async (attemptCancellationToken) =>

// Raise event after resource has been deleted. This is required because the event sets the status to "Starting" and resources being
// deleted will temporarily override the status to a terminal state, such as "Exited".
await _executorEvents.PublishAsync(new OnResourceStartingContext(cancellationToken, resourceType, matchingResource.ModelResource, matchingResource.DcpResource.Metadata.Name)).ConfigureAwait(false);
await _executorEvents.PublishAsync(new OnResourceStartingContext(cancellationToken, resourceType, appResource.ModelResource, appResource.DcpResource.Metadata.Name)).ConfigureAwait(false);

await _kubernetesService.CreateAsync(resource, cancellationToken).ConfigureAwait(false);
}
Expand Down
5 changes: 3 additions & 2 deletions src/Aspire.Hosting/Dcp/IDcpExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ internal interface IDcpExecutor
{
Task RunApplicationAsync(CancellationToken cancellationToken);
Task StopAsync(CancellationToken cancellationToken);
Task StartResourceAsync(string resourceName, CancellationToken cancellationToken);
Task StopResourceAsync(string resourceName, CancellationToken cancellationToken);
IResourceReference GetResource(string resourceName);
Task StartResourceAsync(IResourceReference resourceReference, CancellationToken cancellationToken);
Task StopResourceAsync(IResourceReference resourceReference, CancellationToken cancellationToken);
}
6 changes: 0 additions & 6 deletions src/Aspire.Hosting/DistributedApplicationBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,6 @@ public DistributedApplicationBuilder(DistributedApplicationOptions options)

ExecutionContext = new DistributedApplicationExecutionContext(_executionContextOptions);

Eventing.Subscribe<BeforeResourceStartedEvent>(async (@event, ct) =>
{
var rns = @event.Services.GetRequiredService<ResourceNotificationService>();
await rns.WaitForDependenciesAsync(@event.Resource, ct).ConfigureAwait(false);
});

// Conditionally configure AppHostSha based on execution context. For local scenarios, we want to
// account for the path the AppHost is running from to disambiguate between different projects
// with the same name as seen in https://github.com/dotnet/aspire/issues/5413. For publish scenarios,
Expand Down
65 changes: 63 additions & 2 deletions src/Aspire.Hosting/Orchestrator/ApplicationOrchestrator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,43 @@ public ApplicationOrchestrator(DistributedApplicationModel model,
dcpExecutorEvents.Subscribe<OnResourcesPreparedContext>(OnResourcesPrepared);
dcpExecutorEvents.Subscribe<OnResourceChangedContext>(OnResourceChanged);
dcpExecutorEvents.Subscribe<OnResourceFailedToStartContext>(OnResourceFailedToStart);

// Implement WaitFor functionality using BeforeResourceStartedEvent.
_eventing.Subscribe<BeforeResourceStartedEvent>(WaitForInBeforeResourceStartedEvent);
}

private async Task WaitForInBeforeResourceStartedEvent(BeforeResourceStartedEvent @event, CancellationToken cancellationToken)
{
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

var waitForDependenciesTask = _notificationService.WaitForDependenciesAsync(@event.Resource, cts.Token);
if (waitForDependenciesTask.IsCompletedSuccessfully)
{
// Nothing to wait for. Return immediately.
return;
}

// Wait for either dependencies to be ready or for someone to move the resource out of a waiting state.
// This happens when resource start command is run, which forces the status to "Starting".
var waitForNonWaitingStateTask = _notificationService.WaitForResourceAsync(
@event.Resource.Name,
e => e.Snapshot.State?.Text != KnownResourceStates.Waiting,
cts.Token);

try
{
var completedTask = await Task.WhenAny(waitForDependenciesTask, waitForNonWaitingStateTask).ConfigureAwait(false);
if (completedTask.IsFaulted)
{
// Make error visible from completed task.
await completedTask.ConfigureAwait(false);
}
}
finally
{
// Ensure both wait tasks are cancelled.
cts.Cancel();
}
}

private async Task OnEndpointsAllocated(OnEndpointsAllocatedContext context)
Expand Down Expand Up @@ -148,12 +185,36 @@ public async Task StopAsync(CancellationToken cancellationToken)

public async Task StartResourceAsync(string resourceName, CancellationToken cancellationToken)
{
await _dcpExecutor.StartResourceAsync(resourceName, cancellationToken).ConfigureAwait(false);
var resourceReference = _dcpExecutor.GetResource(resourceName);

// Figure out if the resource is waiting or not using PublishUpdateAsync, and if it is then set the
// state to "Starting" to force waiting to complete.
var isWaiting = false;
await _notificationService.PublishUpdateAsync(
resourceReference.ModelResource,
s =>
{
if (s.State?.Text == KnownResourceStates.Waiting)
{
isWaiting = true;
return s with { State = KnownResourceStates.Starting };
}

return s;
}).ConfigureAwait(false);

// A waiting resource is already trying to start up and asking DCP to start it will result in a conflict.
// We only want to ask the DCP to start the resource if it wasn't.
if (!isWaiting)
{
await _dcpExecutor.StartResourceAsync(resourceReference, cancellationToken).ConfigureAwait(false);
}
}

public async Task StopResourceAsync(string resourceName, CancellationToken cancellationToken)
{
await _dcpExecutor.StopResourceAsync(resourceName, cancellationToken).ConfigureAwait(false);
var resourceReference = _dcpExecutor.GetResource(resourceName);
await _dcpExecutor.StopResourceAsync(resourceReference, cancellationToken).ConfigureAwait(false);
}

private static ILookup<IResource?, IResourceWithParent> GetParentChildLookup(DistributedApplicationModel model)
Expand Down
4 changes: 3 additions & 1 deletion tests/Aspire.Hosting.Tests/Dcp/DcpExecutorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -976,7 +976,9 @@ public async Task ErrorIfResourceNotDeletedBeforeRestart()

var dcpCtr = Assert.Single(kubernetesService.CreatedResources.OfType<Container>());

var ex = await Assert.ThrowsAsync<DistributedApplicationException>(async () => await appExecutor.StartResourceAsync(dcpCtr.Metadata.Name, CancellationToken.None));
var resourceReference = appExecutor.GetResource(dcpCtr.Metadata.Name);

var ex = await Assert.ThrowsAsync<DistributedApplicationException>(async () => await appExecutor.StartResourceAsync(resourceReference, CancellationToken.None));
Assert.Equal($"Failed to delete '{dcpCtr.Metadata.Name}' successfully before restart.", ex.Message);

// Verify failed to start event.
Expand Down
45 changes: 45 additions & 0 deletions tests/Aspire.Hosting.Tests/DistributedApplicationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using Aspire.Hosting.Dcp;
using Aspire.Hosting.Dcp.Model;
using Aspire.Hosting.Eventing;
using Aspire.Hosting.Health;
using Aspire.Hosting.Lifecycle;
using Aspire.Hosting.Orchestrator;
using Aspire.Hosting.Testing;
Expand All @@ -18,6 +19,7 @@
using k8s.Models;
using Microsoft.AspNetCore.InternalTesting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
Expand Down Expand Up @@ -108,6 +110,49 @@ public async Task MultipleRegisteredLifecycleHooksAreExecuted()
Assert.True(signal.SecondHookExecuted);
}

[Fact]
[RequiresDocker]
public async Task StartResourceForcesStart()
{
using var testProgram = CreateTestProgram();
testProgram.AppBuilder.Services.AddLogging(b => b.AddXunit(_testOutputHelper));
testProgram.AppBuilder.Services.AddHealthChecks().AddCheck("dummy_healthcheck", () => HealthCheckResult.Unhealthy());

var dependentResourceName = "serviceb";

testProgram.ServiceABuilder.WithHealthCheck("dummy_healthcheck");
testProgram.ServiceBBuilder.WaitFor(testProgram.ServiceABuilder);

using var app = testProgram.Build();
var rns = app.Services.GetRequiredService<ResourceNotificationService>();
var orchestrator = app.Services.GetRequiredService<ApplicationOrchestrator>();
var logger = app.Services.GetRequiredService<ILogger<ResourceHealthCheckService>>();

var startTask = app.StartAsync();

var resourceEvent = await rns.WaitForResourceAsync(dependentResourceName, e => e.Snapshot.State?.Text == KnownResourceStates.Waiting).DefaultTimeout(TestConstants.LongTimeoutTimeSpan);

logger.LogInformation("Force resource to start.");
await orchestrator.StartResourceAsync(resourceEvent.ResourceId, CancellationToken.None).DefaultTimeout(TestConstants.LongTimeoutTimeSpan);
await rns.WaitForResourceAsync(dependentResourceName, e => e.Snapshot.State?.Text == KnownResourceStates.Running).DefaultTimeout(TestConstants.LongTimeoutTimeSpan);

logger.LogInformation("Stop resource.");
await orchestrator.StopResourceAsync(resourceEvent.ResourceId, CancellationToken.None).DefaultTimeout(TestConstants.LongTimeoutTimeSpan);
await rns.WaitForResourceAsync(dependentResourceName, e => e.Snapshot.State?.Text == KnownResourceStates.Finished).DefaultTimeout(TestConstants.LongTimeoutTimeSpan);

logger.LogInformation("Start resource (into waiting state)");
var restartResourceTask = orchestrator.StartResourceAsync(resourceEvent.ResourceId, CancellationToken.None).DefaultTimeout(TestConstants.LongTimeoutTimeSpan);
await rns.WaitForResourceAsync(dependentResourceName, e => e.Snapshot.State?.Text == KnownResourceStates.Waiting).DefaultTimeout(TestConstants.LongTimeoutTimeSpan);

logger.LogInformation("Force resource to start.");
await orchestrator.StartResourceAsync(resourceEvent.ResourceId, CancellationToken.None).DefaultTimeout(TestConstants.LongTimeoutTimeSpan);
await rns.WaitForResourceAsync(dependentResourceName, e => e.Snapshot.State?.Text == KnownResourceStates.Running).DefaultTimeout(TestConstants.LongTimeoutTimeSpan);

await restartResourceTask.DefaultTimeout(TestConstants.LongTimeoutDuration);
await startTask.DefaultTimeout(TestConstants.LongTimeoutTimeSpan);
await app.StopAsync().DefaultTimeout(TestConstants.LongTimeoutTimeSpan);
}

[Fact]
public void RegisteredLifecycleHookIsExecutedWhenRunSynchronously()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,15 @@ private static ApplicationOrchestrator CreateOrchestrator(

private sealed class TestDcpExecutor : IDcpExecutor
{
public IResourceReference GetResource(string resourceName) => throw new NotImplementedException();

public Task RunApplicationAsync(CancellationToken cancellationToken) => Task.CompletedTask;

public Task StartResourceAsync(string resourceName, CancellationToken cancellationToken) => Task.CompletedTask;
public Task StartResourceAsync(IResourceReference resourceReference, CancellationToken cancellationToken) => Task.CompletedTask;

public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;

public Task StopResourceAsync(string resourceName, CancellationToken cancellationToken) => Task.CompletedTask;
public Task StopResourceAsync(IResourceReference resourceReference, CancellationToken cancellationToken) => Task.CompletedTask;
}

private sealed class CustomChildResource(string name, IResource parent) : Resource(name), IResourceWithParent
Expand Down

0 comments on commit c3dc761

Please sign in to comment.