Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/actions/runner into fhammer…
Browse files Browse the repository at this point in the history
…l/devcontainers
  • Loading branch information
fhammerl committed Mar 17, 2022
2 parents 4c5730f + ddc700e commit 4d76eb4
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 16 deletions.
11 changes: 11 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,17 @@ jobs:
console.log(releaseNote)
core.setOutput('version', runnerVersion);
core.setOutput('note', releaseNote);
- name: Validate Packages HASH
working-directory: _package
run: |
ls -l
echo "${{needs.build.outputs.win-x64-sha}} actions-runner-win-x64-${{ steps.releaseNote.outputs.version }}.zip" | shasum -a 256 -c
echo "${{needs.build.outputs.osx-x64-sha}} actions-runner-osx-x64-${{ steps.releaseNote.outputs.version }}.tar.gz" | shasum -a 256 -c
echo "${{needs.build.outputs.linux-x64-sha}} actions-runner-linux-x64-${{ steps.releaseNote.outputs.version }}.tar.gz" | shasum -a 256 -c
echo "${{needs.build.outputs.linux-arm-sha}} actions-runner-linux-arm-${{ steps.releaseNote.outputs.version }}.tar.gz" | shasum -a 256 -c
echo "${{needs.build.outputs.linux-arm64-sha}} actions-runner-linux-arm64-${{ steps.releaseNote.outputs.version }}.tar.gz" | shasum -a 256 -c
# Create GitHub release
- uses: actions/create-release@master
id: createRelease
Expand Down
136 changes: 128 additions & 8 deletions src/Runner.Common/JobServerQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using GitHub.DistributedTask.WebApi;
using GitHub.Runner.Sdk;
using GitHub.Services.Common;
using Newtonsoft.Json;
using Pipelines = GitHub.DistributedTask.Pipelines;

namespace GitHub.Runner.Common
Expand All @@ -30,6 +34,11 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue
private static readonly TimeSpan _delayForWebConsoleLineDequeue = TimeSpan.FromMilliseconds(500);
private static readonly TimeSpan _delayForTimelineUpdateDequeue = TimeSpan.FromMilliseconds(500);
private static readonly TimeSpan _delayForFileUploadDequeue = TimeSpan.FromMilliseconds(1000);
private static readonly TimeSpan _minDelayForWebsocketReconnect = TimeSpan.FromMilliseconds(1);
private static readonly TimeSpan _maxDelayForWebsocketReconnect = TimeSpan.FromMilliseconds(500);

private static readonly int _minWebsocketFailurePercentageAllowed = 50;
private static readonly int _minWebsocketBatchedLinesCountToConsider = 5;

// Job message information
private Guid _scopeIdentifier;
Expand Down Expand Up @@ -58,6 +67,8 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue
private Task _fileUploadDequeueTask;
private Task _timelineUpdateDequeueTask;

private Task _websocketConnectTask = null;

// common
private IJobServer _jobServer;
private Task[] _allDequeueTasks;
Expand All @@ -71,14 +82,21 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue

// Web console dequeue will start with process queue every 250ms for the first 60*4 times (~60 seconds).
// Then the dequeue will happen every 500ms.
// In this way, customer still can get instance live console output on job start,
// In this way, customer still can get instance live console output on job start,
// at the same time we can cut the load to server after the build run for more than 60s
private int _webConsoleLineAggressiveDequeueCount = 0;
private const int _webConsoleLineAggressiveDequeueLimit = 4 * 60;
private const int _webConsoleLineQueueSizeLimit = 1024;
private bool _webConsoleLineAggressiveDequeue = true;
private bool _firstConsoleOutputs = true;

private int totalBatchedLinesAttemptedByWebsocket = 0;
private int failedAttemptsToPostBatchedLinesByWebsocket = 0;

private ClientWebSocket _websocketClient = null;

private ServiceEndpoint _serviceEndPoint;

public override void Initialize(IHostContext hostContext)
{
base.Initialize(hostContext);
Expand All @@ -89,6 +107,10 @@ public void Start(Pipelines.AgentJobRequestMessage jobRequest)
{
Trace.Entering();

this._serviceEndPoint = jobRequest.Resources.Endpoints.Single(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));

InitializeWebsocket();

if (_queueInProcess)
{
Trace.Info("No-opt, all queue process tasks are running.");
Expand Down Expand Up @@ -156,6 +178,9 @@ public async Task ShutdownAsync()
await ProcessTimelinesUpdateQueueAsync(runOnce: true);
Trace.Info("Timeline update queue drained.");

Trace.Info($"Disposing websocket client ...");
this._websocketClient?.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Shutdown", CancellationToken.None);

Trace.Info("All queue process tasks have been stopped, and all queues are drained.");
}

Expand Down Expand Up @@ -292,14 +317,69 @@ private async Task ProcessWebConsoleLinesQueueAsync(bool runOnce = false)
{
try
{
// we will not requeue failed batch, since the web console lines are time sensitive.
if (batch[0].LineNumber.HasValue)
if (this._websocketConnectTask != null)
{
await _jobServer.AppendTimelineRecordFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch.Select(logLine => logLine.Line).ToList(), batch[0].LineNumber.Value, default(CancellationToken));
// lazily await here, we are already in the background task here
await this._websocketConnectTask;
}
else

var pushedLinesViaWebsocket = false;
if (this._websocketClient != null)
{
var linesWrapper = batch[0].LineNumber.HasValue? new TimelineRecordFeedLinesWrapper(stepRecordId, batch.Select(logLine => logLine.Line).ToList(), batch[0].LineNumber.Value):
new TimelineRecordFeedLinesWrapper(stepRecordId, batch.Select(logLine => logLine.Line).ToList());
var jsonData = StringUtil.ConvertToJson(linesWrapper);
try
{
totalBatchedLinesAttemptedByWebsocket++;
var jsonDataBytes = Encoding.UTF8.GetBytes(jsonData);
// break the message into chunks of 1024 bytes
for (var i = 0; i < jsonDataBytes.Length; i += 1 * 1024)
{
var lastChunk = i + (1 * 1024) >= jsonDataBytes.Length;
var chunk = new ArraySegment<byte>(jsonDataBytes, i, Math.Min(1 * 1024, jsonDataBytes.Length - i));
await this._websocketClient.SendAsync(chunk, WebSocketMessageType.Text, endOfMessage:lastChunk, CancellationToken.None);
}

pushedLinesViaWebsocket = true;
}
catch (Exception ex)
{
Trace.Info($"Caught exception during append web console line to websocket, let's fallback to sending via non-websocket call (total calls: {totalBatchedLinesAttemptedByWebsocket}, failed calls: {failedAttemptsToPostBatchedLinesByWebsocket}, websocket state: {this._websocketClient?.State}).");
Trace.Error(ex);
failedAttemptsToPostBatchedLinesByWebsocket++;
if (totalBatchedLinesAttemptedByWebsocket > _minWebsocketBatchedLinesCountToConsider)
{
// let's consider failure percentage
if (failedAttemptsToPostBatchedLinesByWebsocket * 100 / totalBatchedLinesAttemptedByWebsocket > _minWebsocketFailurePercentageAllowed)
{
Trace.Info($"Exhausted websocket allowed retries, we will not attempt websocket connection for this job to post lines again.");
this._websocketClient?.CloseOutputAsync(WebSocketCloseStatus.InternalServerError, "Shutdown due to failures", CancellationToken.None);
this._websocketClient = null;
}
}

if (this._websocketClient != null)
{
var delay = BackoffTimerHelper.GetRandomBackoff(_minDelayForWebsocketReconnect, _maxDelayForWebsocketReconnect);
Trace.Info($"Websocket is not open, let's attempt to connect back again with random backoff {delay} ms (total calls: {totalBatchedLinesAttemptedByWebsocket}, failed calls: {failedAttemptsToPostBatchedLinesByWebsocket}).");
InitializeWebsocket(delay);
}
}
}

// if we can't push via websocket, let's fallback to posting via REST API
if (!pushedLinesViaWebsocket)
{
await _jobServer.AppendTimelineRecordFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch.Select(logLine => logLine.Line).ToList(), default(CancellationToken));
// we will not requeue failed batch, since the web console lines are time sensitive.
if (batch[0].LineNumber.HasValue)
{
await _jobServer.AppendTimelineRecordFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch.Select(logLine => logLine.Line).ToList(), batch[0].LineNumber.Value, default(CancellationToken));
}
else
{
await _jobServer.AppendTimelineRecordFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch.Select(logLine => logLine.Line).ToList(), default(CancellationToken));
}
}

if (_firstConsoleOutputs)
Expand Down Expand Up @@ -391,6 +471,46 @@ private async Task ProcessFilesUploadQueueAsync(bool runOnce = false)
}
}

private void InitializeWebsocket(TimeSpan? delay = null)
{
if (_serviceEndPoint.Authorization != null &&
_serviceEndPoint.Authorization.Parameters.TryGetValue(EndpointAuthorizationParameters.AccessToken, out var accessToken) &&
!string.IsNullOrEmpty(accessToken))
{
if (_serviceEndPoint.Data.TryGetValue("FeedStreamUrl", out var feedStreamUrl) && !string.IsNullOrEmpty(feedStreamUrl))
{
// let's ensure we use the right scheme
feedStreamUrl = feedStreamUrl.Replace("https://", "wss://").Replace("http://", "ws://");
Trace.Info($"Creating websocket client ..." + feedStreamUrl);
this._websocketClient = new ClientWebSocket();
this._websocketClient.Options.SetRequestHeader("Authorization", $"Bearer {accessToken}");
this._websocketConnectTask = Task.Run(async () =>
{
try
{
Trace.Info($"Attempting to start websocket client with delay {delay}.");
await Task.Delay(delay ?? TimeSpan.Zero);
await this._websocketClient.ConnectAsync(new Uri(feedStreamUrl), default(CancellationToken));
Trace.Info($"Successfully started websocket client.");
}
catch(Exception ex)
{
Trace.Info("Exception caught during websocket client connect, fallback of HTTP would be used now instead of websocket.");
Trace.Error(ex);
}
});
}
else
{
Trace.Info($"No FeedStreamUrl found, so we will use Rest API calls for sending feed data");
}
}
else
{
Trace.Info($"No access token from the service endpoint");
}
}

private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false)
{
while (!_jobCompletionSource.Task.IsCompleted || runOnce)
Expand Down Expand Up @@ -489,8 +609,8 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false)

if (runOnce)
{
// continue process timeline records update,
// we might have more records need update,
// continue process timeline records update,
// we might have more records need update,
// since we just create a new sub-timeline
if (pendingSubtimelineUpdate)
{
Expand Down
14 changes: 7 additions & 7 deletions src/Runner.Worker/ExecutionContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -550,10 +550,15 @@ public void AddIssue(Issue issue, string logMessage = null)
issue.Message = issue.Message[.._maxIssueMessageLength];
}

// Tracking the line number (logFileLineNumber) and step number (stepNumber) for each issue that gets created
// Actions UI from the run summary page use both values to easily link to an exact locations in logs where annotations originate from
if (_record.Order != null)
{
issue.Data["stepNumber"] = _record.Order.ToString();
}

if (issue.Type == IssueType.Error)
{
// tracking line number for each issue in log file
// log UI use this to navigate from issue to log
if (!string.IsNullOrEmpty(logMessage))
{
long logLineNumber = Write(WellKnownTags.Error, logMessage);
Expand All @@ -569,8 +574,6 @@ public void AddIssue(Issue issue, string logMessage = null)
}
else if (issue.Type == IssueType.Warning)
{
// tracking line number for each issue in log file
// log UI use this to navigate from issue to log
if (!string.IsNullOrEmpty(logMessage))
{
long logLineNumber = Write(WellKnownTags.Warning, logMessage);
Expand All @@ -586,9 +589,6 @@ public void AddIssue(Issue issue, string logMessage = null)
}
else if (issue.Type == IssueType.Notice)
{

// tracking line number for each issue in log file
// log UI use this to navigate from issue to log
if (!string.IsNullOrEmpty(logMessage))
{
long logLineNumber = Write(WellKnownTags.Notice, logMessage);
Expand Down
2 changes: 1 addition & 1 deletion src/Runner.Worker/JobExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ public void FinalizeJob(IExecutionContext jobContext, Pipelines.AgentJobRequestM
// create a new timeline record node for 'Finalize job'
IExecutionContext context = jobContext.CreateChild(Guid.NewGuid(), "Complete job", $"{nameof(JobExtension)}_Final", null, null, ActionRunStage.Post);
context.StepTelemetry.Type = "runner";
context.StepTelemetry.Action = "complete_joh";
context.StepTelemetry.Action = "complete_job";
using (var register = jobContext.CancellationToken.Register(() => { context.CancelToken(); }))
{
try
Expand Down
Loading

0 comments on commit 4d76eb4

Please sign in to comment.