From 292a2e0ab3e5a22bf44d6270a993d323cd61fb38 Mon Sep 17 00:00:00 2001 From: Thomas Boop <52323235+thboop@users.noreply.github.com> Date: Fri, 11 Mar 2022 09:41:54 -0500 Subject: [PATCH 1/4] Fix spelling (#1747) --- src/Runner.Worker/JobExtension.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Runner.Worker/JobExtension.cs b/src/Runner.Worker/JobExtension.cs index ce0f37feef3..6a41b450d15 100644 --- a/src/Runner.Worker/JobExtension.cs +++ b/src/Runner.Worker/JobExtension.cs @@ -406,7 +406,7 @@ public void FinalizeJob(IExecutionContext jobContext, Pipelines.AgentJobRequestM // create a new timeline record node for 'Finalize job' IExecutionContext context = jobContext.CreateChild(Guid.NewGuid(), "Complete job", $"{nameof(JobExtension)}_Final", null, null, ActionRunStage.Post); context.StepTelemetry.Type = "runner"; - context.StepTelemetry.Action = "complete_joh"; + context.StepTelemetry.Action = "complete_job"; using (var register = jobContext.CancellationToken.Register(() => { context.CancelToken(); })) { try From b2c6d093b2900b1a34036824a3e86565b5c7a21d Mon Sep 17 00:00:00 2001 From: Tingluo Huang Date: Mon, 14 Mar 2022 09:21:13 -0400 Subject: [PATCH 2/4] Validate packages hash before uploading to github release in CD workflow. (#1745) --- .github/workflows/release.yml | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index dd4db371a39..677c6bf2a77 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -260,6 +260,17 @@ jobs: console.log(releaseNote) core.setOutput('version', runnerVersion); core.setOutput('note', releaseNote); + + - name: Validate Packages HASH + working-directory: _package + run: | + ls -l + echo "${{needs.build.outputs.win-x64-sha}} actions-runner-win-x64-${{ steps.releaseNote.outputs.version }}.zip" | shasum -a 256 -c + echo "${{needs.build.outputs.osx-x64-sha}} actions-runner-osx-x64-${{ steps.releaseNote.outputs.version }}.tar.gz" | shasum -a 256 -c + echo "${{needs.build.outputs.linux-x64-sha}} actions-runner-linux-x64-${{ steps.releaseNote.outputs.version }}.tar.gz" | shasum -a 256 -c + echo "${{needs.build.outputs.linux-arm-sha}} actions-runner-linux-arm-${{ steps.releaseNote.outputs.version }}.tar.gz" | shasum -a 256 -c + echo "${{needs.build.outputs.linux-arm64-sha}} actions-runner-linux-arm64-${{ steps.releaseNote.outputs.version }}.tar.gz" | shasum -a 256 -c + # Create GitHub release - uses: actions/create-release@master id: createRelease From a0458aebfe8a0f78c861b0d4fa48e86548495c04 Mon Sep 17 00:00:00 2001 From: Konrad Pabjan Date: Mon, 14 Mar 2022 11:20:11 -0400 Subject: [PATCH 3/4] Save record order for annotation links when creating issues (#1744) * Save record order for annotation links when creating issues * PR feedback * Add tests for step and line numbers --- src/Runner.Worker/ExecutionContext.cs | 14 +++---- src/Test/L0/Worker/ExecutionContextL0.cs | 49 ++++++++++++++++++++++++ 2 files changed, 56 insertions(+), 7 deletions(-) diff --git a/src/Runner.Worker/ExecutionContext.cs b/src/Runner.Worker/ExecutionContext.cs index 48366b3104d..ace20cc773d 100644 --- a/src/Runner.Worker/ExecutionContext.cs +++ b/src/Runner.Worker/ExecutionContext.cs @@ -550,10 +550,15 @@ public void AddIssue(Issue issue, string logMessage = null) issue.Message = issue.Message[.._maxIssueMessageLength]; } + // Tracking the line number (logFileLineNumber) and step number (stepNumber) for each issue that gets created + // Actions UI from the run summary page use both values to easily link to an exact locations in logs where annotations originate from + if (_record.Order != null) + { + issue.Data["stepNumber"] = _record.Order.ToString(); + } + if (issue.Type == IssueType.Error) { - // tracking line number for each issue in log file - // log UI use this to navigate from issue to log if (!string.IsNullOrEmpty(logMessage)) { long logLineNumber = Write(WellKnownTags.Error, logMessage); @@ -569,8 +574,6 @@ public void AddIssue(Issue issue, string logMessage = null) } else if (issue.Type == IssueType.Warning) { - // tracking line number for each issue in log file - // log UI use this to navigate from issue to log if (!string.IsNullOrEmpty(logMessage)) { long logLineNumber = Write(WellKnownTags.Warning, logMessage); @@ -586,9 +589,6 @@ public void AddIssue(Issue issue, string logMessage = null) } else if (issue.Type == IssueType.Notice) { - - // tracking line number for each issue in log file - // log UI use this to navigate from issue to log if (!string.IsNullOrEmpty(logMessage)) { long logLineNumber = Write(WellKnownTags.Notice, logMessage); diff --git a/src/Test/L0/Worker/ExecutionContextL0.cs b/src/Test/L0/Worker/ExecutionContextL0.cs index e6351edb099..aa50c71fa27 100644 --- a/src/Test/L0/Worker/ExecutionContextL0.cs +++ b/src/Test/L0/Worker/ExecutionContextL0.cs @@ -144,6 +144,55 @@ public void AddIssue_TrimMessageSize() } } + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Worker")] + public void AddIssue_AddStepAndLineNumberInformation() + { + using (TestHostContext hc = CreateTestContext()) + { + + TaskOrchestrationPlanReference plan = new TaskOrchestrationPlanReference(); + TimelineReference timeline = new TimelineReference(); + Guid jobId = Guid.NewGuid(); + string jobName = "some job name"; + var jobRequest = new Pipelines.AgentJobRequestMessage(plan, timeline, jobId, jobName, jobName, null, null, null, new Dictionary(), new List(), new Pipelines.JobResources(), new Pipelines.ContextData.DictionaryContextData(), new Pipelines.WorkspaceOptions(), new List(), null, null, null, null); + jobRequest.Resources.Repositories.Add(new Pipelines.RepositoryResource() + { + Alias = Pipelines.PipelineConstants.SelfAlias, + Id = "github", + Version = "sha1" + }); + jobRequest.ContextData["github"] = new Pipelines.ContextData.DictionaryContextData(); + + // Arrange: Setup the paging logger. + var pagingLogger = new Mock(); + var pagingLogger2 = new Mock(); + var jobServerQueue = new Mock(); + jobServerQueue.Setup(x => x.QueueTimelineRecordUpdate(It.IsAny(), It.IsAny())); + + hc.EnqueueInstance(pagingLogger.Object); + hc.EnqueueInstance(pagingLogger2.Object); + hc.SetSingleton(jobServerQueue.Object); + + var ec = new Runner.Worker.ExecutionContext(); + ec.Initialize(hc); + ec.InitializeJob(jobRequest, CancellationToken.None); + ec.Start(); + + var embeddedStep = ec.CreateChild(Guid.NewGuid(), "action_1_pre", "action_1_pre", null, null, ActionRunStage.Main, isEmbedded: true); + embeddedStep.Start(); + + embeddedStep.AddIssue(new Issue() { Type = IssueType.Error, Message = "error annotation that should have step and line number information" }); + embeddedStep.AddIssue(new Issue() { Type = IssueType.Warning, Message = "warning annotation that should have step and line number information" }); + embeddedStep.AddIssue(new Issue() { Type = IssueType.Notice, Message = "notice annotation that should have step and line number information" }); + + jobServerQueue.Verify(x => x.QueueTimelineRecordUpdate(It.IsAny(), It.Is(t => t.Issues.Where(i => i.Data.ContainsKey("stepNumber") && i.Data.ContainsKey("logFileLineNumber") && i.Type == IssueType.Error).Count() == 1)), Times.AtLeastOnce); + jobServerQueue.Verify(x => x.QueueTimelineRecordUpdate(It.IsAny(), It.Is(t => t.Issues.Where(i => i.Data.ContainsKey("stepNumber") && i.Data.ContainsKey("logFileLineNumber") && i.Type == IssueType.Warning).Count() == 1)), Times.AtLeastOnce); + jobServerQueue.Verify(x => x.QueueTimelineRecordUpdate(It.IsAny(), It.Is(t => t.Issues.Where(i => i.Data.ContainsKey("stepNumber") && i.Data.ContainsKey("logFileLineNumber") && i.Type == IssueType.Notice).Count() == 1)), Times.AtLeastOnce); + } + } + [Fact] [Trait("Level", "L0")] [Trait("Category", "Worker")] From ddc700e9eb2fe4ac88214841c15e1384bc7b64be Mon Sep 17 00:00:00 2001 From: Yashwanth Anantharaju Date: Tue, 15 Mar 2022 14:01:18 -0400 Subject: [PATCH 4/4] Send postlines via websocket if we can (#1730) * feed via websocket * feed via websocket * feedback * ensure right schema is used * fix resiliency * some fixes * fix sending message * chunk data * let's abort, which will also dispose * close gracefully --- src/Runner.Common/JobServerQueue.cs | 136 ++++++++++++++++++++++++++-- 1 file changed, 128 insertions(+), 8 deletions(-) diff --git a/src/Runner.Common/JobServerQueue.cs b/src/Runner.Common/JobServerQueue.cs index eb8fa9e6704..9f237279ba0 100644 --- a/src/Runner.Common/JobServerQueue.cs +++ b/src/Runner.Common/JobServerQueue.cs @@ -3,10 +3,14 @@ using System.Collections.Generic; using System.IO; using System.Linq; +using System.Net.WebSockets; +using System.Text; using System.Threading; using System.Threading.Tasks; using GitHub.DistributedTask.WebApi; using GitHub.Runner.Sdk; +using GitHub.Services.Common; +using Newtonsoft.Json; using Pipelines = GitHub.DistributedTask.Pipelines; namespace GitHub.Runner.Common @@ -30,6 +34,11 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue private static readonly TimeSpan _delayForWebConsoleLineDequeue = TimeSpan.FromMilliseconds(500); private static readonly TimeSpan _delayForTimelineUpdateDequeue = TimeSpan.FromMilliseconds(500); private static readonly TimeSpan _delayForFileUploadDequeue = TimeSpan.FromMilliseconds(1000); + private static readonly TimeSpan _minDelayForWebsocketReconnect = TimeSpan.FromMilliseconds(1); + private static readonly TimeSpan _maxDelayForWebsocketReconnect = TimeSpan.FromMilliseconds(500); + + private static readonly int _minWebsocketFailurePercentageAllowed = 50; + private static readonly int _minWebsocketBatchedLinesCountToConsider = 5; // Job message information private Guid _scopeIdentifier; @@ -58,6 +67,8 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue private Task _fileUploadDequeueTask; private Task _timelineUpdateDequeueTask; + private Task _websocketConnectTask = null; + // common private IJobServer _jobServer; private Task[] _allDequeueTasks; @@ -71,7 +82,7 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue // Web console dequeue will start with process queue every 250ms for the first 60*4 times (~60 seconds). // Then the dequeue will happen every 500ms. - // In this way, customer still can get instance live console output on job start, + // In this way, customer still can get instance live console output on job start, // at the same time we can cut the load to server after the build run for more than 60s private int _webConsoleLineAggressiveDequeueCount = 0; private const int _webConsoleLineAggressiveDequeueLimit = 4 * 60; @@ -79,6 +90,13 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue private bool _webConsoleLineAggressiveDequeue = true; private bool _firstConsoleOutputs = true; + private int totalBatchedLinesAttemptedByWebsocket = 0; + private int failedAttemptsToPostBatchedLinesByWebsocket = 0; + + private ClientWebSocket _websocketClient = null; + + private ServiceEndpoint _serviceEndPoint; + public override void Initialize(IHostContext hostContext) { base.Initialize(hostContext); @@ -89,6 +107,10 @@ public void Start(Pipelines.AgentJobRequestMessage jobRequest) { Trace.Entering(); + this._serviceEndPoint = jobRequest.Resources.Endpoints.Single(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase)); + + InitializeWebsocket(); + if (_queueInProcess) { Trace.Info("No-opt, all queue process tasks are running."); @@ -156,6 +178,9 @@ public async Task ShutdownAsync() await ProcessTimelinesUpdateQueueAsync(runOnce: true); Trace.Info("Timeline update queue drained."); + Trace.Info($"Disposing websocket client ..."); + this._websocketClient?.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Shutdown", CancellationToken.None); + Trace.Info("All queue process tasks have been stopped, and all queues are drained."); } @@ -292,14 +317,69 @@ private async Task ProcessWebConsoleLinesQueueAsync(bool runOnce = false) { try { - // we will not requeue failed batch, since the web console lines are time sensitive. - if (batch[0].LineNumber.HasValue) + if (this._websocketConnectTask != null) { - await _jobServer.AppendTimelineRecordFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch.Select(logLine => logLine.Line).ToList(), batch[0].LineNumber.Value, default(CancellationToken)); + // lazily await here, we are already in the background task here + await this._websocketConnectTask; } - else + + var pushedLinesViaWebsocket = false; + if (this._websocketClient != null) + { + var linesWrapper = batch[0].LineNumber.HasValue? new TimelineRecordFeedLinesWrapper(stepRecordId, batch.Select(logLine => logLine.Line).ToList(), batch[0].LineNumber.Value): + new TimelineRecordFeedLinesWrapper(stepRecordId, batch.Select(logLine => logLine.Line).ToList()); + var jsonData = StringUtil.ConvertToJson(linesWrapper); + try + { + totalBatchedLinesAttemptedByWebsocket++; + var jsonDataBytes = Encoding.UTF8.GetBytes(jsonData); + // break the message into chunks of 1024 bytes + for (var i = 0; i < jsonDataBytes.Length; i += 1 * 1024) + { + var lastChunk = i + (1 * 1024) >= jsonDataBytes.Length; + var chunk = new ArraySegment(jsonDataBytes, i, Math.Min(1 * 1024, jsonDataBytes.Length - i)); + await this._websocketClient.SendAsync(chunk, WebSocketMessageType.Text, endOfMessage:lastChunk, CancellationToken.None); + } + + pushedLinesViaWebsocket = true; + } + catch (Exception ex) + { + Trace.Info($"Caught exception during append web console line to websocket, let's fallback to sending via non-websocket call (total calls: {totalBatchedLinesAttemptedByWebsocket}, failed calls: {failedAttemptsToPostBatchedLinesByWebsocket}, websocket state: {this._websocketClient?.State})."); + Trace.Error(ex); + failedAttemptsToPostBatchedLinesByWebsocket++; + if (totalBatchedLinesAttemptedByWebsocket > _minWebsocketBatchedLinesCountToConsider) + { + // let's consider failure percentage + if (failedAttemptsToPostBatchedLinesByWebsocket * 100 / totalBatchedLinesAttemptedByWebsocket > _minWebsocketFailurePercentageAllowed) + { + Trace.Info($"Exhausted websocket allowed retries, we will not attempt websocket connection for this job to post lines again."); + this._websocketClient?.CloseOutputAsync(WebSocketCloseStatus.InternalServerError, "Shutdown due to failures", CancellationToken.None); + this._websocketClient = null; + } + } + + if (this._websocketClient != null) + { + var delay = BackoffTimerHelper.GetRandomBackoff(_minDelayForWebsocketReconnect, _maxDelayForWebsocketReconnect); + Trace.Info($"Websocket is not open, let's attempt to connect back again with random backoff {delay} ms (total calls: {totalBatchedLinesAttemptedByWebsocket}, failed calls: {failedAttemptsToPostBatchedLinesByWebsocket})."); + InitializeWebsocket(delay); + } + } + } + + // if we can't push via websocket, let's fallback to posting via REST API + if (!pushedLinesViaWebsocket) { - await _jobServer.AppendTimelineRecordFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch.Select(logLine => logLine.Line).ToList(), default(CancellationToken)); + // we will not requeue failed batch, since the web console lines are time sensitive. + if (batch[0].LineNumber.HasValue) + { + await _jobServer.AppendTimelineRecordFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch.Select(logLine => logLine.Line).ToList(), batch[0].LineNumber.Value, default(CancellationToken)); + } + else + { + await _jobServer.AppendTimelineRecordFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch.Select(logLine => logLine.Line).ToList(), default(CancellationToken)); + } } if (_firstConsoleOutputs) @@ -391,6 +471,46 @@ private async Task ProcessFilesUploadQueueAsync(bool runOnce = false) } } + private void InitializeWebsocket(TimeSpan? delay = null) + { + if (_serviceEndPoint.Authorization != null && + _serviceEndPoint.Authorization.Parameters.TryGetValue(EndpointAuthorizationParameters.AccessToken, out var accessToken) && + !string.IsNullOrEmpty(accessToken)) + { + if (_serviceEndPoint.Data.TryGetValue("FeedStreamUrl", out var feedStreamUrl) && !string.IsNullOrEmpty(feedStreamUrl)) + { + // let's ensure we use the right scheme + feedStreamUrl = feedStreamUrl.Replace("https://", "wss://").Replace("http://", "ws://"); + Trace.Info($"Creating websocket client ..." + feedStreamUrl); + this._websocketClient = new ClientWebSocket(); + this._websocketClient.Options.SetRequestHeader("Authorization", $"Bearer {accessToken}"); + this._websocketConnectTask = Task.Run(async () => + { + try + { + Trace.Info($"Attempting to start websocket client with delay {delay}."); + await Task.Delay(delay ?? TimeSpan.Zero); + await this._websocketClient.ConnectAsync(new Uri(feedStreamUrl), default(CancellationToken)); + Trace.Info($"Successfully started websocket client."); + } + catch(Exception ex) + { + Trace.Info("Exception caught during websocket client connect, fallback of HTTP would be used now instead of websocket."); + Trace.Error(ex); + } + }); + } + else + { + Trace.Info($"No FeedStreamUrl found, so we will use Rest API calls for sending feed data"); + } + } + else + { + Trace.Info($"No access token from the service endpoint"); + } + } + private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false) { while (!_jobCompletionSource.Task.IsCompleted || runOnce) @@ -489,8 +609,8 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false) if (runOnce) { - // continue process timeline records update, - // we might have more records need update, + // continue process timeline records update, + // we might have more records need update, // since we just create a new sub-timeline if (pendingSubtimelineUpdate) {