Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use streaming to get logs for Executables and Containers #2435

Merged
merged 7 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 42 additions & 8 deletions src/Aspire.Hosting/Dashboard/ConsoleLogPublisher.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,22 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using Aspire.Hosting.Dcp;
using Aspire.Hosting.Dcp.Model;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;

namespace Aspire.Hosting.Dashboard;

internal sealed class ConsoleLogPublisher(ResourcePublisher resourcePublisher)
using LogsEnumerable = IAsyncEnumerable<IReadOnlyList<(string Content, bool IsErrorMessage)>>;

internal sealed class ConsoleLogPublisher(
ResourcePublisher resourcePublisher,
IKubernetesService kubernetesService,
ILoggerFactory loggerFactory,
IConfiguration configuration)
{
internal IAsyncEnumerable<IReadOnlyList<(string Content, bool IsErrorMessage)>>? Subscribe(string resourceName)
internal LogsEnumerable? Subscribe(string resourceName)
{
// Look up the requested resource, so we know how to obtain logs.
if (!resourcePublisher.TryGetResource(resourceName, out var resource))
Expand All @@ -14,13 +25,36 @@ internal sealed class ConsoleLogPublisher(ResourcePublisher resourcePublisher)
}

// Obtain logs using the relevant approach.
// Note, we would like to obtain these logs via DCP directly, rather than sourcing them in the dashboard.
return resource switch
if (configuration.GetBool("DOTNET_ASPIRE_USE_STREAMING_LOGS") is true)
{
ExecutableSnapshot executable => SubscribeExecutable(executable),
ContainerSnapshot container => SubscribeContainer(container),
_ => throw new NotSupportedException($"Unsupported resource type {resource.GetType()}.")
};
return resource switch
{
ExecutableSnapshot executable => SubscribeExecutableResource(executable),
ContainerSnapshot container => SubscribeContainerResource(container),
_ => throw new NotSupportedException($"Unsupported resource type {resource.GetType()}.")
};
}
else
{
return resource switch
{
ExecutableSnapshot executable => SubscribeExecutable(executable),
ContainerSnapshot container => SubscribeContainer(container),
_ => throw new NotSupportedException($"Unsupported resource type {resource.GetType()}.")
};
}

LogsEnumerable SubscribeExecutableResource(ExecutableSnapshot executable)
{
var executableIdentity = Executable.Create(executable.Name, string.Empty);
return new ResourceLogSource<Executable>(loggerFactory, kubernetesService, executableIdentity);
}

LogsEnumerable SubscribeContainerResource(ContainerSnapshot container)
{
var containerIdentity = Container.Create(container.Name, string.Empty);
return new ResourceLogSource<Container>(loggerFactory, kubernetesService, containerIdentity);
}

static FileLogSource? SubscribeExecutable(ExecutableSnapshot executable)
{
Expand Down
2 changes: 1 addition & 1 deletion src/Aspire.Hosting/Dashboard/DashboardServiceData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public DashboardServiceData(
ILoggerFactory loggerFactory)
{
_resourcePublisher = new ResourcePublisher(_cts.Token);
_consoleLogPublisher = new ConsoleLogPublisher(_resourcePublisher);
_consoleLogPublisher = new ConsoleLogPublisher(_resourcePublisher, kubernetesService, loggerFactory, configuration);

_ = new DcpDataSource(kubernetesService, applicationModel, configuration, loggerFactory, _resourcePublisher.IntegrateAsync, _cts.Token);
}
Expand Down
88 changes: 88 additions & 0 deletions src/Aspire.Hosting/Dashboard/ResourceLogSource.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using Microsoft.Extensions.Logging;
using System.Threading.Channels;
using Aspire.Hosting.Dcp;
using Aspire.Hosting.Dcp.Model;

namespace Aspire.Hosting.Dashboard;

using LogEntry = (string Content, bool IsErrorMessage);
using LogEntryList = IReadOnlyList<(string Content, bool IsErrorMessage)>;

internal sealed class ResourceLogSource<TResource>(
ILoggerFactory loggerFactory,
IKubernetesService kubernetesService,
TResource resource) :
IAsyncEnumerable<LogEntryList>
where TResource : CustomResource
{
public async IAsyncEnumerator<LogEntryList> GetAsyncEnumerator(CancellationToken cancellationToken)
{
if (!cancellationToken.CanBeCanceled)
{
throw new ArgumentException("Cancellation token must be cancellable in order to prevent leaking resources.", nameof(cancellationToken));
}

var stdoutStream = await kubernetesService.GetLogStreamAsync(resource, Logs.StreamTypeStdOut, follow: true, cancellationToken).ConfigureAwait(false);
var stderrStream = await kubernetesService.GetLogStreamAsync(resource, Logs.StreamTypeStdErr, follow: true, cancellationToken).ConfigureAwait(false);

var channel = Channel.CreateUnbounded<LogEntry>(new UnboundedChannelOptions
{
AllowSynchronousContinuations = false,
SingleReader = true,
SingleWriter = false
});

var logger = loggerFactory.CreateLogger<ResourceLogSource<TResource>>();

var stdoutStreamTask = Task.Run(() => StreamLogsAsync(stdoutStream, isError: false), cancellationToken);
var stderrStreamTask = Task.Run(() => StreamLogsAsync(stderrStream, isError: true), cancellationToken);

// End the enumeration when both streams have been read to completion.
_ = Task.WhenAll(stdoutStreamTask, stderrStreamTask).ContinueWith
(_ => { channel.Writer.TryComplete(); },
cancellationToken,
TaskContinuationOptions.None,
TaskScheduler.Default).ConfigureAwait(false);

await foreach (var batch in channel.GetBatches(cancellationToken))
{
yield return batch;
}

async Task StreamLogsAsync(Stream stream, bool isError)
{
try
{
using StreamReader sr = new StreamReader(stream, leaveOpen: false);
while (!cancellationToken.IsCancellationRequested)
{
var line = await sr.ReadLineAsync(cancellationToken).ConfigureAwait(false);
if (line is null)
{
return; // No more data
}

var succeeded = channel.Writer.TryWrite((line, isError));
if (!succeeded)
{
logger.LogWarning("Failed to write log entry to channel. Logs for {Kind} {Name} may be incomplete", resource.Kind, resource.Metadata.Name);
channel.Writer.TryComplete();
return;
}
}
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// Expected
}
catch (Exception ex)
{
logger.LogError(ex, "Unexpected error happened when capturing logs for {Kind} {Name}", resource.Kind, resource.Metadata.Name);
channel.Writer.TryComplete(ex);
}
}
}
}
113 changes: 113 additions & 0 deletions src/Aspire.Hosting/Dcp/DcpKubernetesClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using k8s;
using k8s.Autorest;

namespace Aspire.Hosting.Dcp;

// We need to create a custom Kubernetes client to support reading arbitrary subresources from a Kubernetes resource as a stream.
// k8s.Kubernetes does not support this operation natively, and required machinery (SendRequest() in particular) is protected.

internal class DcpKubernetesClient : k8s.Kubernetes
{
public DcpKubernetesClient(KubernetesClientConfiguration config, params DelegatingHandler[] handlers) : base(config, handlers)
{
}

/// <summary>
/// Asynchronously reads a sub-resource from a Kubernetes resource as a stream.
/// </summary>
/// <param name="group">The API group of the Kubernetes resource.</param>
/// <param name="version">The API version of the Kubernetes resource.</param>
/// <param name="plural">The plural name (API kind) of the Kubernetes resource, e.g. "executables".</param>
/// <param name="name">The name of the Kubernetes resource to use for sub-resource read operation.</param>
/// <param name="subResource">The sub-resource to read from the Kubernetes resource.</param>
/// <param name="namespaceParameter">The namespace of the Kubernetes resource.
/// If null or empty, the resource is assumed to be non-namespaced.</param>
/// <param name="queryParams">Optional query parameters to append to the request URL.</param>
/// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
public async Task<HttpOperationResponse<Stream>> ReadSubResourceAsStreamAsync(
string group,
string version,
string plural,
string name,
string subResource,
string? namespaceParameter,
IReadOnlyCollection<(string name, string value)>? queryParams = null,
CancellationToken cancellationToken = default
)
{
ArgumentException.ThrowIfNullOrWhiteSpace(group, nameof(group));
ArgumentException.ThrowIfNullOrWhiteSpace(version, nameof(version));
ArgumentException.ThrowIfNullOrWhiteSpace(plural, nameof(plural));
ArgumentException.ThrowIfNullOrWhiteSpace(name, nameof(name));
ArgumentException.ThrowIfNullOrWhiteSpace(subResource, nameof(subResource));

using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(HttpClientTimeout);
cancellationToken = cts.Token;

string url;
if (string.IsNullOrEmpty(namespaceParameter))
{
url = $"apis/{group}/{version}/{plural}/{name}/{subResource}";
}
else
{
url = $"apis/{group}/{version}/namespaces/{namespaceParameter}/{plural}/{name}/{subResource}";
}

var q = new QueryBuilder();
if (queryParams != null)
{
foreach (var (param, paramVal) in queryParams)
{
q.Append(param, paramVal);
}
}
url += q.ToString();

var httpResponse = await SendRequest<object?>(url, HttpMethod.Get, customHeaders : null, body: null, cancellationToken).ConfigureAwait(false);
var httpRequest = httpResponse.RequestMessage;
var result = new HttpOperationResponse<Stream>()
{
Request = httpRequest,
Response = httpResponse,
Body = await httpResponse.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false)
};
return result;
}

private sealed class QueryBuilder
{
private readonly List<string> _parameters = new List<string>();

public void Append(string key, int val)
{
_parameters.Add($"{key}={val}");
}

public void Append(string key, bool? val)
{
_parameters.Add($"{key}={(val == true ? "true" : "false")}");
}

public void Append(string key, string val)
{
_parameters.Add($"{key}={Uri.EscapeDataString(val)}");
}

public override string ToString()
{
if (_parameters.Count > 0)
{
return $"?{string.Join("&", _parameters)}";
}
else
{
return string.Empty;
}
}
}
}
2 changes: 1 addition & 1 deletion src/Aspire.Hosting/Dcp/DcpVersion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ namespace Aspire.Hosting.Dcp;

internal static class DcpVersion
{
public static Version MinimumVersionInclusive = new Version(0, 1, 52);
public static Version MinimumVersionInclusive = new Version(0, 1, 55);
}
Loading