Skip to content

Commit

Permalink
Async Result loop
Browse files Browse the repository at this point in the history
  • Loading branch information
lemaitre-aneo committed Jun 19, 2024
1 parent d5df95c commit b11c0a1
Showing 1 changed file with 122 additions and 98 deletions.
220 changes: 122 additions & 98 deletions Client/src/Unified/Services/Submitter/Service.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public Service(Properties properties,
}));


HandlerResponse = Task.Run(ResultTask,
HandlerResponse = Task.Run(() => ResultTask(CancellationResultTaskSource.Token),
CancellationResultTaskSource.Token);
}

Expand Down Expand Up @@ -451,10 +451,12 @@ private ServiceResult Execute(string methodName,
/// <param name="responseHandler">The action to take when a response is received.</param>
/// <param name="errorHandler">The action to take when an error occurs.</param>
/// <param name="chunkResultSize">The size of the chunk to retrieve results in.</param>
private void ProxyTryGetResults(IEnumerable<string> taskIds,
Action<string, byte[]> responseHandler,
Action<string, TaskStatus, string> errorHandler,
int chunkResultSize = 200)
/// <param name="cancellationToken"></param>
private async Task ProxyTryGetResults(IEnumerable<string> taskIds,
Action<string, byte[]> responseHandler,
Action<string, TaskStatus, string> errorHandler,
int chunkResultSize = 200,
CancellationToken cancellationToken = default)
{
var missing = new HashSet<string>(taskIds);
var holdPrev = missing.Count;
Expand All @@ -474,38 +476,46 @@ private void ProxyTryGetResults(IEnumerable<string> taskIds,
foreach (var bucket in missing.ToList()
.ToChunks(chunkResultSize))
{
var resultStatusCollection = SessionService.GetResultStatus(bucket);
var resultStatusCollection = await SessionService.GetResultStatusAsync(bucket,
cancellationToken)
.ConfigureAwait(false);

foreach (var resultStatusData in resultStatusCollection.IdsReady)
{
cancellationToken.ThrowIfCancellationRequested();
try
{
Logger.LogTrace("Response handler for {taskId}",
resultStatusData.TaskId);
responseHandler(resultStatusData.TaskId,
Retry.WhileException(5,
2000,
retry =>
{
if (retry > 1)
{
Logger.LogWarning("Try {try} for {funcName}",
retry,
nameof(SessionService.TryGetResultAsync));
}
return SessionService.TryGetResultAsync(new ResultRequest
{
ResultId = resultStatusData.ResultId,
Session = SessionId,
},
CancellationToken.None)
.Result;
},
true,
Logger,
typeof(IOException),
typeof(RpcException))!);
await Retry.WhileException(5,
2000,
retry =>
{
if (retry > 1)
{
Logger.LogWarning("Try {try} for {funcName}",
retry,
nameof(SessionService.TryGetResultAsync));
}
return SessionService.TryGetResultAsync(new ResultRequest
{
ResultId = resultStatusData.ResultId,
Session = SessionId,
},
cancellationToken);
},
true,
Logger,
cancellationToken,
typeof(IOException),
typeof(RpcException))
.ConfigureAwait(false)!);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
throw;
}
catch (Exception e)
{
Expand Down Expand Up @@ -533,7 +543,9 @@ private void ProxyTryGetResults(IEnumerable<string> taskIds,
{
string details;

var taskStatus = SessionService.GetTaskStatus(resultStatusData.TaskId);
var taskStatus = await SessionService.GetTaskStatusAsync(resultStatusData.TaskId,
cancellationToken)
.ConfigureAwait(false);

switch (taskStatus)
{
Expand All @@ -542,7 +554,9 @@ private void ProxyTryGetResults(IEnumerable<string> taskIds,
details = $"Task {resultStatusData.TaskId} was canceled";
break;
default:
var outputInfo = SessionService.GetTaskOutputInfo(resultStatusData.TaskId);
var outputInfo = await SessionService.GetTaskOutputInfoAsync(resultStatusData.TaskId,
cancellationToken)
.ConfigureAwait(false);
details = outputInfo.TypeCase == Output.TypeOneofCase.Error
? outputInfo.Error.Details
: "Result is in status : " + resultStatusData.Status + ", look for task in error in logs.";
Expand Down Expand Up @@ -572,6 +586,7 @@ private void ProxyTryGetResults(IEnumerable<string> taskIds,

foreach (var resultStatusData in resultStatusCollection.Canceled)
{
cancellationToken.ThrowIfCancellationRequested();
try
{
errorHandler(resultStatusData.TaskId,
Expand Down Expand Up @@ -605,89 +620,98 @@ private void ProxyTryGetResults(IEnumerable<string> taskIds,

holdPrev = missing.Count;

Thread.Sleep(waitInSeconds[idx]);
await Task.Delay(waitInSeconds[idx],
cancellationToken)
.ConfigureAwait(false);
}
}
}

private void ResultTask()
private async Task ResultTask(CancellationToken cancellationToken)
{
while (!(CancellationResultTaskSource.Token.IsCancellationRequested && ResultHandlerDictionary.IsEmpty))
{
try
{
if (!ResultHandlerDictionary.IsEmpty)
{
ProxyTryGetResults(ResultHandlerDictionary.Keys.ToList(),
(taskId,
byteResult) =>
{
try
{
var result = ProtoSerializer.Deserialize<object?[]>(byteResult);
ResultHandlerDictionary[taskId]
.HandleResponse(result![0],
taskId);
}
catch (Exception e)
{
const ArmonikStatusCode statusCode = ArmonikStatusCode.Unknown;
ServiceInvocationException ex;
var ae = e as AggregateException;
if (ae is not null && ae.InnerExceptions.Count > 1)
{
ex = new ServiceInvocationException(ae,
statusCode);
}
else if (ae is not null)
{
ex = new ServiceInvocationException(ae.InnerException ?? ae,
statusCode);
}
else
{
ex = new ServiceInvocationException(e,
statusCode);
}
ResultHandlerDictionary[taskId]
.HandleError(ex,
taskId);
}
finally
{
ResultHandlerDictionary.TryRemove(taskId,
out _);
}
},
(taskId,
taskStatus,
ex) =>
{
try
{
var statusCode = taskStatus.ToArmonikStatusCode();
ResultHandlerDictionary[taskId]
.HandleError(new ServiceInvocationException(ex,
statusCode),
taskId);
}
finally
{
ResultHandlerDictionary.TryRemove(taskId,
out _);
}
});
await ProxyTryGetResults(ResultHandlerDictionary.Keys.ToList(),
(taskId,
byteResult) =>
{
try
{
var result = ProtoSerializer.Deserialize<object?[]>(byteResult);
ResultHandlerDictionary[taskId]
.HandleResponse(result![0],
taskId);
}
catch (Exception e)
{
const ArmonikStatusCode statusCode = ArmonikStatusCode.Unknown;
ServiceInvocationException ex;
var ae = e as AggregateException;
if (ae is not null && ae.InnerExceptions.Count > 1)
{
ex = new ServiceInvocationException(ae,
statusCode);
}
else if (ae is not null)
{
ex = new ServiceInvocationException(ae.InnerException ?? ae,
statusCode);
}
else
{
ex = new ServiceInvocationException(e,
statusCode);
}
ResultHandlerDictionary[taskId]
.HandleError(ex,
taskId);
}
finally
{
ResultHandlerDictionary.TryRemove(taskId,
out _);
}
},
(taskId,
taskStatus,
ex) =>
{
try
{
var statusCode = taskStatus.ToArmonikStatusCode();
ResultHandlerDictionary[taskId]
.HandleError(new ServiceInvocationException(ex,
statusCode),
taskId);
}
finally
{
ResultHandlerDictionary.TryRemove(taskId,
out _);
}
},
cancellationToken: cancellationToken);
}
else
{
Thread.Sleep(100);
await Task.Delay(100,
cancellationToken)
.ConfigureAwait(false);
}
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
break;
}
catch (Exception e)
{
Logger.LogError("An error occurred while fetching results: {e}",
Expand Down

0 comments on commit b11c0a1

Please sign in to comment.