From 5c7015ac0944b07590f05ea9a4b03c2d14dfb9f4 Mon Sep 17 00:00:00 2001 From: lemaitre-aneo Date: Wed, 5 Jun 2024 21:25:03 +0200 Subject: [PATCH 1/6] Pipeline on all PRs --- .github/workflows/build.yml | 4 ---- 1 file changed, 4 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index b46235bb..c13248a5 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -6,10 +6,6 @@ on: - main - "[0-9]+.[0-9]+.x" pull_request: - branches: - - main - - "[0-9]+.[0-9]+.x" - jobs: versionning: From 47df1225669d0ef36bce033ab781207240e82dbe Mon Sep 17 00:00:00 2001 From: lemaitre-aneo Date: Wed, 5 Jun 2024 12:21:09 +0200 Subject: [PATCH 2/6] Use Utils ObjectPool --- .../Common/Submitter/BaseClientSubmitter.cs | 75 +++---- Client/src/Common/Submitter/ChannelPool.cs | 188 ------------------ .../Submitter/ClientServiceConnector.cs | 38 +++- Client/src/Symphony/ArmonikSymphonyClient.cs | 5 +- .../Unified/Factory/SessionServiceFactory.cs | 5 +- .../Services/Admin/AdminMonitoringService.cs | 27 +-- .../src/Unified/Services/Submitter/Service.cs | 6 +- .../ArmoniK.DevelopmentKit.Common.csproj | 2 +- .../AggregationPriorityTest.cs | 6 +- 9 files changed, 105 insertions(+), 247 deletions(-) delete mode 100644 Client/src/Common/Submitter/ChannelPool.cs diff --git a/Client/src/Common/Submitter/BaseClientSubmitter.cs b/Client/src/Common/Submitter/BaseClientSubmitter.cs index f28fff6e..354e9dab 100644 --- a/Client/src/Common/Submitter/BaseClientSubmitter.cs +++ b/Client/src/Common/Submitter/BaseClientSubmitter.cs @@ -38,6 +38,7 @@ using Google.Protobuf; using Grpc.Core; +using Grpc.Net.Client; using JetBrains.Annotations; @@ -69,7 +70,7 @@ public abstract class BaseClientSubmitter /// /// The channel pool to use for creating clients /// - private ChannelPool? channelPool_; + private ObjectPool? channelPool_; /// /// Base Object for all Client submitter @@ -95,8 +96,8 @@ protected BaseClientSubmitter(Properties properties, TaskOptions.PartitionId, }); - configuration_ = ChannelPool.WithChannel(channel => new Results.ResultsClient(channel).GetServiceConfiguration(new Empty()) - .DataChunkMaxSize); + configuration_ = ChannelPool.WithInstance(channel => new Results.ResultsClient(channel).GetServiceConfiguration(new Empty()) + .DataChunkMaxSize); } private ILoggerFactory LoggerFactory { get; } @@ -115,7 +116,7 @@ protected BaseClientSubmitter(Properties properties, /// /// The channel pool to use for creating clients /// - public ChannelPool ChannelPool + public ObjectPool ChannelPool => channelPool_ ??= ClientServiceConnector.ControlPlaneConnectionPool(properties_, LoggerFactory); @@ -129,7 +130,7 @@ private Session CreateSession(IEnumerable partitionIds) { using var _ = Logger.LogFunction(); Logger.LogDebug("Creating Session... "); - using var channel = ChannelPool.GetChannel(); + using var channel = ChannelPool.Get(); var sessionsClient = new Sessions.SessionsClient(channel); var createSessionReply = sessionsClient.CreateSession(new CreateSessionRequest { @@ -167,7 +168,7 @@ public TaskStatus GetTaskStatus(string taskId) /// public IEnumerable> GetTaskStatues(params string[] taskIds) { - using var channel = ChannelPool.GetChannel(); + using var channel = ChannelPool.Get(); var tasksClient = new Tasks.TasksClient(channel); return tasksClient.ListTasks(new Filters { @@ -200,10 +201,10 @@ public IEnumerable> GetTaskStatues(params string[] tas // TODO: This function should not have Output as a return type because it is a gRPC type public Output GetTaskOutputInfo(string taskId) { - var getTaskResponse = ChannelPool.WithChannel(channel => new Tasks.TasksClient(channel).GetTask(new GetTaskRequest - { - TaskId = taskId, - })); + var getTaskResponse = ChannelPool.WithInstance(channel => new Tasks.TasksClient(channel).GetTask(new GetTaskRequest + { + TaskId = taskId, + })); return new Output { Error = new Output.Types.Error @@ -296,7 +297,7 @@ private IEnumerable ChunkSubmitTasksWithDependencies(IEnumerable ChunkSubmitTasksWithDependencies(IEnumerable taskIds, delayMs, retry => { - using var channel = ChannelPool.GetChannel(); + using var channel = ChannelPool.Get(); var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel); if (retry > 1) @@ -565,7 +566,7 @@ public ResultStatusCollection GetResultStatus(IEnumerable taskIds, // TODO: use ListResult var idStatusPair = result2TaskDic.Keys.ParallelSelect(async resultId => { - using var channel = ChannelPool.GetChannel(); + using var channel = ChannelPool.Get(); var resultsClient = new Results.ResultsClient(channel); var result = await resultsClient.GetResultAsync(new GetResultRequest { @@ -640,14 +641,14 @@ public ResultStatusCollection GetResultStatus(IEnumerable taskIds, nameof(GetResultIds)); } - return ChannelPool.WithChannel(channel => new Tasks.TasksClient(channel).GetResultIds(new GetResultIdsRequest - { - TaskId = - { - taskIds, - }, - }) - .TaskResults); + return ChannelPool.WithInstance(channel => new Tasks.TasksClient(channel).GetResultIds(new GetResultIdsRequest + { + TaskId = + { + taskIds, + }, + }) + .TaskResults); }, true, Logger, @@ -680,7 +681,7 @@ public byte[] GetResult(string taskId, ResultId = resultId, Session = SessionId.Id, }; - using var channel = ChannelPool.GetChannel(); + using var channel = ChannelPool.Get(); var eventsClient = new Events.EventsClient(channel); eventsClient.WaitForResultsAsync(SessionId.Id, new List @@ -740,8 +741,9 @@ public IEnumerable> GetResults(IEnumerable taskIds public async Task TryGetResultAsync(ResultRequest resultRequest, CancellationToken cancellationToken = default) { - using var channel = ChannelPool.GetChannel(); - var resultsClient = new Results.ResultsClient(channel); + await using var channel = await ChannelPool.GetAsync(cancellationToken) + .ConfigureAwait(false); + var resultsClient = new Results.ResultsClient(channel); var getResultResponse = await resultsClient.GetResultAsync(new GetResultRequest { ResultId = resultRequest.ResultId, @@ -951,17 +953,18 @@ public IList> TryGetResults(IList resultIds) /// Dictionary where each result name is associated with its result id [PublicAPI] public Dictionary CreateResultsMetadata(IEnumerable resultNames) - => ChannelPool.WithChannel(c => new Results.ResultsClient(c).CreateResultsMetaData(new CreateResultsMetaDataRequest - { - SessionId = SessionId.Id, - Results = - { - resultNames.Select(name => new CreateResultsMetaDataRequest.Types.ResultCreate - { - Name = name, - }), - }, - })) + => ChannelPool.WithInstance(c => new Results.ResultsClient(c).CreateResultsMetaData(new CreateResultsMetaDataRequest + { + SessionId = SessionId.Id, + Results = + { + resultNames.Select(name + => new CreateResultsMetaDataRequest.Types.ResultCreate + { + Name = name, + }), + }, + })) .Results.ToDictionary(r => r.Name, r => r.ResultId); } diff --git a/Client/src/Common/Submitter/ChannelPool.cs b/Client/src/Common/Submitter/ChannelPool.cs deleted file mode 100644 index b7cba89c..00000000 --- a/Client/src/Common/Submitter/ChannelPool.cs +++ /dev/null @@ -1,188 +0,0 @@ -// This file is part of the ArmoniK project -// -// Copyright (C) ANEO, 2021-2024. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License") -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -using System; -using System.Collections.Concurrent; -using System.Diagnostics.CodeAnalysis; - -using Grpc.Net.Client; - -using Microsoft.Extensions.Logging; - -namespace ArmoniK.DevelopmentKit.Client.Common.Submitter; - -/// -/// Helper to have a connection pool for gRPC services -/// -public sealed class ChannelPool -{ - private readonly Func channelFactory_; - - private readonly ILogger? logger_; - - private readonly ConcurrentBag pool_; - - /// - /// Constructs a new channelPool - /// - /// Function used to create new channels - /// loggerFactory used to instantiate a logger for the pool - public ChannelPool(Func channelFactory, - ILoggerFactory? loggerFactory = null) - { - channelFactory_ = channelFactory; - pool_ = new ConcurrentBag(); - logger_ = loggerFactory?.CreateLogger(); - } - - /// - /// Get a channel from the pool. If the pool is empty, create a new channel - /// - /// A GrpcChannel used by nobody else - private GrpcChannel AcquireChannel() - { - if (pool_.TryTake(out var channel)) - { - if (ShutdownOnFailure(channel)) - { - logger_?.LogDebug("Got an invalid channel {channel} from pool", - channel); - } - else - { - logger_?.LogDebug("Acquired already existing channel {channel} from pool", - channel); - return channel; - } - } - - channel = channelFactory_(); - logger_?.LogInformation("Created and acquired new channel {channel} from pool", - channel); - return channel; - } - - /// - /// Release a GrpcChannel to the pool that could be reused later by someone else - /// - /// Channel to release - private void ReleaseChannel(GrpcChannel channel) - { - if (ShutdownOnFailure(channel)) - { - logger_?.LogDebug("Shutdown unhealthy channel {channel}", - channel); - } - else - { - logger_?.LogDebug("Released channel {channel} to pool", - channel); - pool_.Add(channel); - } - } - - /// - /// Check the state of a channel and shutdown it in case of failure - /// - /// Channel to check the state - /// True if the channel has been shut down - private static bool ShutdownOnFailure(GrpcChannel channel) - { - try - { -#if NET5_0_OR_GREATER - switch (channel.State) - { - case ConnectivityState.TransientFailure: - channel.ShutdownAsync() - .Wait(); - channel.Dispose(); - return true; - case ConnectivityState.Shutdown: - return true; - case ConnectivityState.Idle: - case ConnectivityState.Connecting: - case ConnectivityState.Ready: - default: - return false; - } -#else - _ = channel; - return false; -#endif - } - catch (InvalidOperationException) - { - return false; - } - } - - /// - /// Get a channel that will be automatically released when disposed - /// - /// - public ChannelGuard GetChannel() - => new(this); - - /// - /// Call f with an acquired channel - /// - /// Function to be called - /// Type of the return type of f - /// Value returned by f - public T WithChannel(Func f) - { - using var channel = GetChannel(); - return f(channel); - } - - /// - /// Helper class that acquires a channel from a pool when constructed, and releases it when disposed - /// - public sealed class ChannelGuard : IDisposable - { - /// - /// Channel that is used by nobody else - /// - [SuppressMessage("Usage", - "CA2213:Disposable fields should be disposed")] - private readonly GrpcChannel channel_; - - private readonly ChannelPool pool_; - - /// - /// Acquire a channel that will be released when disposed - /// - /// - public ChannelGuard(ChannelPool channelPool) - { - pool_ = channelPool; - channel_ = channelPool.AcquireChannel(); - } - - /// - public void Dispose() - => pool_.ReleaseChannel(channel_); - - /// - /// Implicit convert a ChannelGuard into a GrpcChannel - /// - /// ChannelGuard - /// GrpcChannel - public static implicit operator GrpcChannel(ChannelGuard guard) - => guard.channel_; - } -} diff --git a/Client/src/Common/Submitter/ClientServiceConnector.cs b/Client/src/Common/Submitter/ClientServiceConnector.cs index 641bbcc0..26e71c3f 100644 --- a/Client/src/Common/Submitter/ClientServiceConnector.cs +++ b/Client/src/Common/Submitter/ClientServiceConnector.cs @@ -14,8 +14,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +using System.Threading.Tasks; + using ArmoniK.Api.Client.Options; using ArmoniK.Api.Client.Submitter; +using ArmoniK.Utils; + +using Grpc.Net.Client; using Microsoft.Extensions.Logging; @@ -33,8 +38,8 @@ public class ClientServiceConnector /// Configuration Properties /// Optional logger factory /// The connection pool - public static ChannelPool ControlPlaneConnectionPool(Properties properties, - ILoggerFactory? loggerFactory = null) + public static ObjectPool ControlPlaneConnectionPool(Properties properties, + ILoggerFactory? loggerFactory = null) { var options = new GrpcClient { @@ -52,7 +57,32 @@ public static ChannelPool ControlPlaneConnectionPool(Properties properties, ProxyPassword = properties.ProxyPassword, }; - return new ChannelPool(() => GrpcChannelFactory.CreateChannel(options, - loggerFactory?.CreateLogger(typeof(ClientServiceConnector)))); + return new ObjectPool(ct => new ValueTask(GrpcChannelFactory.CreateChannel(options, + loggerFactory?.CreateLogger(typeof(ClientServiceConnector)))), + + +#if NET5_0_OR_GREATER + async (channel, ct) => + { +switch (channel.State) + { + case ConnectivityState.TransientFailure: + await channel.ShutdownAsync() + .ConfigureAwait(false); + return false; + case ConnectivityState.Shutdown: + return false; + case ConnectivityState.Idle: + case ConnectivityState.Connecting: + case ConnectivityState.Ready: + default: + return true; + } + } +#else + (_, + _) => new ValueTask(true) +#endif + ); } } diff --git a/Client/src/Symphony/ArmonikSymphonyClient.cs b/Client/src/Symphony/ArmonikSymphonyClient.cs index 38bdeeef..e7e163ea 100644 --- a/Client/src/Symphony/ArmonikSymphonyClient.cs +++ b/Client/src/Symphony/ArmonikSymphonyClient.cs @@ -18,6 +18,9 @@ using ArmoniK.DevelopmentKit.Client.Common; using ArmoniK.DevelopmentKit.Client.Common.Submitter; using ArmoniK.DevelopmentKit.Common; +using ArmoniK.Utils; + +using Grpc.Net.Client; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; @@ -63,7 +66,7 @@ public ArmonikSymphonyClient(IConfiguration configuration, /// public string SectionGrpc { get; set; } = "Grpc"; - private ChannelPool GrpcPool { get; set; } + private ObjectPool GrpcPool { get; set; } private IConfiguration Configuration { get; } diff --git a/Client/src/Unified/Factory/SessionServiceFactory.cs b/Client/src/Unified/Factory/SessionServiceFactory.cs index ab8a2844..8f15d275 100644 --- a/Client/src/Unified/Factory/SessionServiceFactory.cs +++ b/Client/src/Unified/Factory/SessionServiceFactory.cs @@ -20,9 +20,12 @@ using ArmoniK.DevelopmentKit.Client.Unified.Services; using ArmoniK.DevelopmentKit.Client.Unified.Services.Admin; using ArmoniK.DevelopmentKit.Common; +using ArmoniK.Utils; using Google.Protobuf.WellKnownTypes; +using Grpc.Net.Client; + using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; @@ -51,7 +54,7 @@ public SessionServiceFactory(ILoggerFactory? loggerFactory = null) private ILogger Logger { get; } - private ChannelPool? GrpcPool { get; set; } + private ObjectPool? GrpcPool { get; set; } private ILoggerFactory LoggerFactory { get; } diff --git a/Client/src/Unified/Services/Admin/AdminMonitoringService.cs b/Client/src/Unified/Services/Admin/AdminMonitoringService.cs index 3447329f..b9577179 100644 --- a/Client/src/Unified/Services/Admin/AdminMonitoringService.cs +++ b/Client/src/Unified/Services/Admin/AdminMonitoringService.cs @@ -24,6 +24,9 @@ using ArmoniK.Api.gRPC.V1.SortDirection; using ArmoniK.Api.gRPC.V1.Tasks; using ArmoniK.DevelopmentKit.Client.Common.Submitter; +using ArmoniK.Utils; + +using Grpc.Net.Client; using Microsoft.Extensions.Logging; @@ -39,15 +42,15 @@ namespace ArmoniK.DevelopmentKit.Client.Unified.Services.Admin; /// public class AdminMonitoringService { - private readonly ChannelPool channelPool_; + private readonly ObjectPool channelPool_; /// /// The constructor to instantiate this service /// /// The entry point to the control plane /// The factory logger to create logger - public AdminMonitoringService(ChannelPool channelPool, - ILoggerFactory? loggerFactory = null) + public AdminMonitoringService(ObjectPool channelPool, + ILoggerFactory? loggerFactory = null) { Logger = loggerFactory?.CreateLogger(); channelPool_ = channelPool; @@ -61,7 +64,7 @@ public AdminMonitoringService(ChannelPool channelPool, /// public void GetServiceConfiguration() { - using var channel = channelPool_.GetChannel(); + using var channel = channelPool_.Get(); var resultsClient = new Results.ResultsClient(channel); var configuration = resultsClient.GetServiceConfiguration(new Empty()); Logger?.LogInformation($"This configuration will be update in the nex version [ {configuration} ]"); @@ -74,7 +77,7 @@ public void GetServiceConfiguration() /// the sessionId of the session to cancel public void CancelSession(string sessionId) { - using var channel = channelPool_.GetChannel(); + using var channel = channelPool_.Get(); var sessionsClient = new Sessions.SessionsClient(channel); sessionsClient.CancelSession(new CancelSessionRequest { @@ -98,7 +101,7 @@ public IEnumerable ListAllTasksBySession(string sessionId) public IEnumerable ListTasksBySession(string sessionId, params TaskStatus[] taskStatus) { - using var channel = channelPool_.GetChannel(); + using var channel = channelPool_.Get(); var tasksClient = new Tasks.TasksClient(channel); return tasksClient.ListTasks(new Filters @@ -159,7 +162,7 @@ public IEnumerable ListCancelledTasks(string sessionId) /// The list of filtered session public IEnumerable ListAllSessions() { - using var channel = channelPool_.GetChannel(); + using var channel = channelPool_.Get(); var sessionsClient = new Sessions.SessionsClient(channel); return sessionsClient.ListSessions(new ListSessionsRequest()) .Sessions.Select(session => session.SessionId); @@ -172,7 +175,7 @@ public IEnumerable ListAllSessions() /// returns a list of session filtered public IEnumerable ListRunningSessions() { - using var channel = channelPool_.GetChannel(); + using var channel = channelPool_.Get(); var sessionsClient = new Sessions.SessionsClient(channel); return sessionsClient.ListSessions(new ListSessionsRequest { @@ -213,7 +216,7 @@ public IEnumerable ListRunningSessions() /// returns a list of session filtered public IEnumerable ListCancelledSessions() { - using var channel = channelPool_.GetChannel(); + using var channel = channelPool_.Get(); var sessionsClient = new Sessions.SessionsClient(channel); return sessionsClient.ListSessions(new ListSessionsRequest { @@ -281,7 +284,7 @@ public int CountErrorTasksBySession(string sessionId) public int CountTaskBySession(string sessionId, params TaskStatus[] taskStatus) { - using var channel = channelPool_.GetChannel(); + using var channel = channelPool_.Get(); var tasksClient = new Tasks.TasksClient(channel); return tasksClient.CountTasksByStatus(new CountTasksByStatusRequest { @@ -322,7 +325,7 @@ public int CountCompletedTasksBySession(string sessionId) /// the taskIds list to cancel public void CancelTasksBySession(IEnumerable taskIds) { - using var channel = channelPool_.GetChannel(); + using var channel = channelPool_.Get(); var tasksClient = new Tasks.TasksClient(channel); tasksClient.CancelTasks(new CancelTasksRequest { @@ -340,7 +343,7 @@ public void CancelTasksBySession(IEnumerable taskIds) /// returns a list of pair TaskId/TaskStatus public IEnumerable> GetTaskStatus(IEnumerable taskIds) { - using var channel = channelPool_.GetChannel(); + using var channel = channelPool_.Get(); var tasksClient = new Tasks.TasksClient(channel); return tasksClient.ListTasks(new Filters { diff --git a/Client/src/Unified/Services/Submitter/Service.cs b/Client/src/Unified/Services/Submitter/Service.cs index 901e2d19..8dfa34c4 100644 --- a/Client/src/Unified/Services/Submitter/Service.cs +++ b/Client/src/Unified/Services/Submitter/Service.cs @@ -1,6 +1,6 @@ // This file is part of the ArmoniK project // -// Copyright (C) ANEO, 2021-2023. All rights reserved. +// Copyright (C) ANEO, 2021-2024. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License") // you may not use this file except in compliance with the License. @@ -797,8 +797,8 @@ private void ResultTask() /// gRPC channel // TODO: Refactor test to remove this // ReSharper disable once UnusedMember.Global - public GrpcChannel GetChannel() - => SessionService.ChannelPool.GetChannel(); + public ObjectPool GetChannelPool() + => SessionService.ChannelPool; /// /// Class to return TaskId and the result diff --git a/Common/src/Common/ArmoniK.DevelopmentKit.Common.csproj b/Common/src/Common/ArmoniK.DevelopmentKit.Common.csproj index d4b9dc58..4af40e4f 100644 --- a/Common/src/Common/ArmoniK.DevelopmentKit.Common.csproj +++ b/Common/src/Common/ArmoniK.DevelopmentKit.Common.csproj @@ -8,7 +8,7 @@ - + diff --git a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/AggregationPriority/AggregationPriorityTest.cs b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/AggregationPriority/AggregationPriorityTest.cs index 89f36a7a..25d6e72f 100644 --- a/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/AggregationPriority/AggregationPriorityTest.cs +++ b/Tests/ArmoniK.EndToEndTests/ArmoniK.EndToEndTests.Client/Tests/AggregationPriority/AggregationPriorityTest.cs @@ -157,7 +157,11 @@ private async Task> GetDistribution(int nRows) var taskRawData = new List(); - await foreach (var taskRaw in RetrieveAllTasksStats(service.GetChannel(), + await using var channel = await service!.GetChannelPool() + .GetAsync(CancellationToken.None) + .ConfigureAwait(false); + + await foreach (var taskRaw in RetrieveAllTasksStats(channel, new Filters { Or = From 039d3876163da1ba467624984ec051a8ac6d5f35 Mon Sep 17 00:00:00 2001 From: lemaitre-aneo Date: Thu, 6 Jun 2024 01:43:13 +0200 Subject: [PATCH 3/6] Add async internal method --- .../Common/Submitter/BaseClientSubmitter.cs | 1095 +++++++++++------ Client/src/Common/Submitter/TasksClientExt.cs | 48 +- Common/src/Common/RetryAction.cs | 196 ++- 3 files changed, 980 insertions(+), 359 deletions(-) diff --git a/Client/src/Common/Submitter/BaseClientSubmitter.cs b/Client/src/Common/Submitter/BaseClientSubmitter.cs index 354e9dab..26749b46 100644 --- a/Client/src/Common/Submitter/BaseClientSubmitter.cs +++ b/Client/src/Common/Submitter/BaseClientSubmitter.cs @@ -1,6 +1,6 @@ // This file is part of the ArmoniK project // -// Copyright (C) ANEO, 2021-2023. All rights reserved. +// Copyright (C) ANEO, 2021-2024. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License") // you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ using System.Collections.Generic; using System.IO; using System.Linq; +using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; @@ -45,7 +46,9 @@ using Microsoft.Extensions.Logging; using CreateSessionRequest = ArmoniK.Api.gRPC.V1.Sessions.CreateSessionRequest; +using FilterField = ArmoniK.Api.gRPC.V1.Results.FilterField; using Filters = ArmoniK.Api.gRPC.V1.Tasks.Filters; +using FiltersAnd = ArmoniK.Api.gRPC.V1.Results.FiltersAnd; using TaskStatus = ArmoniK.Api.gRPC.V1.TaskStatus; namespace ArmoniK.DevelopmentKit.Client.Common.Submitter; @@ -67,11 +70,6 @@ public abstract class BaseClientSubmitter private readonly Properties properties_; - /// - /// The channel pool to use for creating clients - /// - private ObjectPool? channelPool_; - /// /// Base Object for all Client submitter /// @@ -91,10 +89,15 @@ protected BaseClientSubmitter(Properties properties, properties_ = properties; Logger = loggerFactory.CreateLogger(); chunkSubmitSize_ = chunkSubmitSize; - SessionId = session ?? CreateSession(new[] - { - TaskOptions.PartitionId, - }); + + ChannelPool = ClientServiceConnector.ControlPlaneConnectionPool(properties_, + LoggerFactory); + + SessionId = session ?? CreateSessionAsync(new[] + { + TaskOptions.PartitionId, + }) + .WaitSync(); configuration_ = ChannelPool.WithInstance(channel => new Results.ResultsClient(channel).GetServiceConfiguration(new Empty()) .DataChunkMaxSize); @@ -116,9 +119,7 @@ protected BaseClientSubmitter(Properties properties, /// /// The channel pool to use for creating clients /// - public ObjectPool ChannelPool - => channelPool_ ??= ClientServiceConnector.ControlPlaneConnectionPool(properties_, - LoggerFactory); + public ObjectPool ChannelPool { get; } /// /// The logger to call the generate log in Seq @@ -126,20 +127,24 @@ public ObjectPool ChannelPool protected ILogger Logger { get; } - private Session CreateSession(IEnumerable partitionIds) + private async ValueTask CreateSessionAsync(IEnumerable partitionIds, + CancellationToken cancellationToken = default) { using var _ = Logger.LogFunction(); Logger.LogDebug("Creating Session... "); - using var channel = ChannelPool.Get(); - var sessionsClient = new Sessions.SessionsClient(channel); - var createSessionReply = sessionsClient.CreateSession(new CreateSessionRequest - { - DefaultTaskOption = TaskOptions, - PartitionIds = - { - partitionIds, - }, - }); + await using var channel = await ChannelPool.GetAsync(cancellationToken) + .ConfigureAwait(false); + var sessionsClient = new Sessions.SessionsClient(channel); + var createSessionReply = await sessionsClient.CreateSessionAsync(new CreateSessionRequest + { + DefaultTaskOption = TaskOptions, + PartitionIds = + { + partitionIds, + }, + }, + cancellationToken: cancellationToken) + .ConfigureAwait(false); Logger.LogDebug("Session Created {SessionId}", SessionId); return new Session @@ -153,58 +158,112 @@ private Session CreateSession(IEnumerable partitionIds) /// Returns the status of the task /// /// The taskId of the task + /// /// - public TaskStatus GetTaskStatus(string taskId) + [PublicAPI] + public async ValueTask GetTaskStatusAsync(string taskId, + CancellationToken cancellationToken = default) { - var status = GetTaskStatues(taskId) - .Single(); + var status = await GetTaskStatuesAsync(new[] + { + taskId, + }, + cancellationToken) + .SingleAsync(cancellationToken) + .ConfigureAwait(false); return status.Item2; } + /// + /// Returns the status of the task + /// + /// The taskId of the task + /// + [PublicAPI] + public TaskStatus GetTaskStatus(string taskId) + => GetTaskStatusAsync(taskId) + .WaitSync(); + + /// /// Returns the list status of the tasks /// /// The list of taskIds + /// /// - public IEnumerable> GetTaskStatues(params string[] taskIds) + [PublicAPI] + public IAsyncEnumerable> GetTaskStatuesAsync(CancellationToken cancellationToken = default, + params string[] taskIds) + => GetTaskStatuesAsync(taskIds, + cancellationToken); + + /// + /// Returns the list status of the tasks + /// + /// The list of taskIds + /// + /// + private async IAsyncEnumerable> GetTaskStatuesAsync(string[] taskIds, + [EnumeratorCancellation] CancellationToken cancellationToken = default) { - using var channel = ChannelPool.Get(); - var tasksClient = new Tasks.TasksClient(channel); - return tasksClient.ListTasks(new Filters - { - Or = - { - taskIds.Select(TasksClientExt.TaskIdFilter), - }, - }, - new ListTasksRequest.Types.Sort - { - Direction = SortDirection.Asc, - Field = new TaskField + await using var channel = await ChannelPool.GetAsync(cancellationToken) + .ConfigureAwait(false); + var tasksClient = new Tasks.TasksClient(channel); + var tasks = tasksClient.ListTasksAsync(new Filters { - TaskSummaryField = new TaskSummaryField - { - Field = TaskSummaryEnumField.TaskId, - }, + Or = + { + taskIds.Select(TasksClientExt.TaskIdFilter), + }, }, - }) - .Select(task => new Tuple(task.Id, - task.Status)); + new ListTasksRequest.Types.Sort + { + Direction = SortDirection.Asc, + Field = new TaskField + { + TaskSummaryField = new TaskSummaryField + { + Field = TaskSummaryEnumField.TaskId, + }, + }, + }, + cancellationToken: cancellationToken); + await foreach (var task in tasks.ConfigureAwait(false)) + { + yield return new Tuple(task.Id, + task.Status); + } } + /// + /// Returns the list status of the tasks + /// + /// The list of taskIds + /// + [PublicAPI] + public IEnumerable> GetTaskStatues(params string[] taskIds) + => GetTaskStatuesAsync(taskIds) + .ToEnumerable(); + /// /// Return the taskOutput when error occurred /// /// + /// /// // TODO: This function should not have Output as a return type because it is a gRPC type - public Output GetTaskOutputInfo(string taskId) + [PublicAPI] + public async ValueTask GetTaskOutputInfoAsync(string taskId, + CancellationToken cancellationToken = default) { - var getTaskResponse = ChannelPool.WithInstance(channel => new Tasks.TasksClient(channel).GetTask(new GetTaskRequest - { - TaskId = taskId, - })); + var getTaskResponse = await ChannelPool.WithInstanceAsync(async channel => await new Tasks.TasksClient(channel).GetTaskAsync(new GetTaskRequest + { + TaskId = taskId, + }) + .ConfigureAwait(false), + cancellationToken) + .ConfigureAwait(false); return new Output { Error = new Output.Types.Error @@ -215,6 +274,51 @@ public Output GetTaskOutputInfo(string taskId) } + /// + /// Return the taskOutput when error occurred + /// + /// + /// + + // TODO: This function should not have Output as a return type because it is a gRPC type + [PublicAPI] + public Output GetTaskOutputInfo(string taskId) + => GetTaskOutputInfoAsync(taskId) + .WaitSync(); + + + /// + /// The method to submit several tasks with dependencies tasks. This task will wait for + /// to start until all dependencies are completed successfully + /// + /// + /// A list of Tuple(resultId, payload, parent dependencies) in dependence of those + /// created tasks + /// + /// The number of retry before fail to submit task. Default = 5 retries + /// TaskOptions overrides if non null override default in Session + /// + /// The result ids must first be created using + /// return a list of taskIds of the created tasks + [PublicAPI] + public async IAsyncEnumerable SubmitTasksWithDependenciesAsync(IEnumerable>> payloadsWithDependencies, + int maxRetries = 5, + TaskOptions? taskOptions = null, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + foreach (var chunk in payloadsWithDependencies.ToChunks(chunkSubmitSize_)) + { + await foreach (var taskIds in ChunkSubmitTasksWithDependenciesAsync(chunk, + maxRetries, + taskOptions ?? TaskOptions, + cancellationToken) + .ConfigureAwait(false)) + { + yield return taskIds; + } + } + } + /// /// The method to submit several tasks with dependencies tasks. This task will wait for /// to start until all dependencies are completed successfully @@ -231,10 +335,54 @@ public Output GetTaskOutputInfo(string taskId) public IEnumerable SubmitTasksWithDependencies(IEnumerable>> payloadsWithDependencies, int maxRetries = 5, TaskOptions? taskOptions = null) - => payloadsWithDependencies.ToChunks(chunkSubmitSize_) - .SelectMany(chunk => ChunkSubmitTasksWithDependencies(chunk, - maxRetries, - taskOptions ?? TaskOptions)); + => SubmitTasksWithDependenciesAsync(payloadsWithDependencies, + maxRetries, + taskOptions) + .ToEnumerable(); + + /// + /// The method to submit several tasks with dependencies tasks. This task will wait for + /// to start until all dependencies are completed successfully + /// + /// + /// A list of Tuple(Payload, parent dependencies) in dependence of those created + /// tasks + /// + /// The number of retry before fail to submit task. Default = 5 retries + /// + /// TaskOptions argument to override default taskOptions in Session. + /// If non null it will override the default taskOptions in SessionService for client or given by taskHandler for worker + /// + /// + /// return a list of taskIds of the created tasks + [PublicAPI] + public async IAsyncEnumerable SubmitTasksWithDependenciesAsync(IEnumerable>> payloadsWithDependencies, + int maxRetries = 5, + TaskOptions? taskOptions = null, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + foreach (var chunk in payloadsWithDependencies.ToChunks(chunkSubmitSize_)) + { + var resultsMetadata = await CreateResultsMetadataAsync(Enumerable.Range(0, + chunk.Length) + .Select(_ => Guid.NewGuid() + .ToString()), + cancellationToken) + .ConfigureAwait(false); + var tasks = ChunkSubmitTasksWithDependenciesAsync(chunk.Zip(resultsMetadata, + (payloadWithDependencies, + metadata) => Tuple.Create(metadata.Value, + payloadWithDependencies.Item1, + payloadWithDependencies.Item2)), + maxRetries, + taskOptions ?? TaskOptions, + cancellationToken); + await foreach (var taskId in tasks.ConfigureAwait(false)) + { + yield return taskId; + } + } + } /// /// The method to submit several tasks with dependencies tasks. This task will wait for @@ -254,22 +402,10 @@ public IEnumerable SubmitTasksWithDependencies(IEnumerable SubmitTasksWithDependencies(IEnumerable>> payloadsWithDependencies, int maxRetries = 5, TaskOptions? taskOptions = null) - => payloadsWithDependencies.ToChunks(chunkSubmitSize_) - .SelectMany(chunk => - { - // Create the result metadata before sending the tasks. - var resultsMetadata = CreateResultsMetadata(Enumerable.Range(0, - chunk.Length) - .Select(_ => Guid.NewGuid() - .ToString())); - return ChunkSubmitTasksWithDependencies(chunk.Zip(resultsMetadata, - (payloadWithDependencies, - metadata) => Tuple.Create(metadata.Value, - payloadWithDependencies.Item1, - payloadWithDependencies.Item2)), - maxRetries, - taskOptions ?? TaskOptions); - }); + => SubmitTasksWithDependenciesAsync(payloadsWithDependencies, + maxRetries, + taskOptions) + .ToEnumerable(); /// @@ -282,23 +418,24 @@ public IEnumerable SubmitTasksWithDependencies(IEnumerable + /// /// return the ids of the created tasks - [PublicAPI] - private IEnumerable ChunkSubmitTasksWithDependencies(IEnumerable>> payloadsWithDependencies, - int maxRetries, - TaskOptions? taskOptions = null) + private async IAsyncEnumerable ChunkSubmitTasksWithDependenciesAsync(IEnumerable>> payloadsWithDependencies, + int maxRetries, + TaskOptions? taskOptions = null, + [EnumeratorCancellation] CancellationToken cancellationToken = default) { using var _ = Logger.LogFunction(); - var tasks = new List(); - var tasksSubmitted = new List(); + var tasks = new List(); foreach (var (resultId, payload, dependencies) in payloadsWithDependencies) { for (var nbRetry = 0; nbRetry < maxRetries; nbRetry++) { - using var channel = ChannelPool.Get(); - var resultsClient = new Results.ResultsClient(channel); + await using var channel = await ChannelPool.GetAsync(cancellationToken) + .ConfigureAwait(false); + var resultsClient = new Results.ResultsClient(channel); try { @@ -306,36 +443,40 @@ private IEnumerable ChunkSubmitTasksWithDependencies(IEnumerable configuration_) { - payloadId = resultsClient.CreateResultsMetaData(new CreateResultsMetaDataRequest - { - SessionId = SessionId.Id, - Results = - { - new CreateResultsMetaDataRequest.Types.ResultCreate(), - }, - }) - .Results.Select(raw => raw.ResultId) - .Single(); - - resultsClient.UploadResultData(SessionId.Id, - payloadId, - payload); + var response = await resultsClient.CreateResultsMetaDataAsync(new CreateResultsMetaDataRequest + { + SessionId = SessionId.Id, + Results = + { + new CreateResultsMetaDataRequest.Types.ResultCreate(), + }, + }, + cancellationToken: cancellationToken) + .ConfigureAwait(false); + payloadId = response.Results.Select(raw => raw.ResultId) + .Single(); + + await resultsClient.UploadResultData(SessionId.Id, + payloadId, + payload); } else { - payloadId = resultsClient.CreateResults(new CreateResultsRequest - { - SessionId = SessionId.Id, - Results = - { - new CreateResultsRequest.Types.ResultCreate - { - Data = UnsafeByteOperations.UnsafeWrap(payload), - }, - }, - }) - .Results.Select(raw => raw.ResultId) - .Single(); + var response = await resultsClient.CreateResultsAsync(new CreateResultsRequest + { + SessionId = SessionId.Id, + Results = + { + new CreateResultsRequest.Types.ResultCreate + { + Data = UnsafeByteOperations.UnsafeWrap(payload), + }, + }, + }, + cancellationToken: cancellationToken) + .ConfigureAwait(false); + payloadId = response.Results.Select(raw => raw.ResultId) + .Single(); } @@ -399,24 +540,27 @@ private IEnumerable ChunkSubmitTasksWithDependencies(IEnumerable info.TaskId)); + submitTasksResponse = await tasksClient.SubmitTasksAsync(new SubmitTasksRequest + { + TaskOptions = taskOptions, + SessionId = SessionId.Id, + TaskCreations = + { + taskChunk, + }, + }, + cancellationToken: cancellationToken) + .ConfigureAwait(false); + // break retry loop because submission is successful break; } @@ -455,9 +599,39 @@ private IEnumerable ChunkSubmitTasksWithDependencies(IEnumerable + /// User method to wait for only the parent task from the client + /// + /// + /// The task taskId of the task to wait for + /// + /// Max number of retries for the underlying calls + /// Delay between retries + /// + [PublicAPI] + public async ValueTask WaitForTaskCompletionAsync(string taskId, + int maxRetries = 5, + int delayMs = 20000, + CancellationToken cancellationToken = default) + { + using var _ = Logger.LogFunction(taskId); + + await WaitForTasksCompletionAsync(new[] + { + taskId, + }, + maxRetries, + delayMs, + cancellationToken) + .ConfigureAwait(false); } /// @@ -468,18 +642,72 @@ private IEnumerable ChunkSubmitTasksWithDependencies(IEnumerable /// Max number of retries for the underlying calls /// Delay between retries + [PublicAPI] public void WaitForTaskCompletion(string taskId, int maxRetries = 5, int delayMs = 20000) + => WaitForTaskCompletionAsync(taskId, + maxRetries, + delayMs) + .WaitSync(); + + /// + /// User method to wait for only the parent task from the client + /// + /// + /// List of taskIds + /// + /// Max number of retries + /// + /// + [PublicAPI] + public async ValueTask WaitForTasksCompletionAsync(IEnumerable taskIds, + int maxRetries = 5, + int delayMs = 20000, + CancellationToken cancellationToken = default) { - using var _ = Logger.LogFunction(taskId); + using var _ = Logger.LogFunction(); - WaitForTasksCompletion(new[] - { - taskId, - }, - maxRetries, - delayMs); + var filter = new TaskFilter + { + Task = new TaskFilter.Types.IdsRequest + { + Ids = + { + taskIds, + }, + }, + }; + + await Retry.WhileException(maxRetries, + delayMs, + async retry => + { + await using var channel = await ChannelPool.GetAsync(cancellationToken) + .ConfigureAwait(false); + var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel); + + if (retry > 1) + { + Logger.LogWarning("Try {try} for {funcName}", + retry, + nameof(submitterService.WaitForCompletion)); + } + + var __ = await submitterService.WaitForCompletionAsync(new WaitRequest + { + Filter = filter, + StopOnFirstTaskCancellation = true, + StopOnFirstTaskError = true, + }, + cancellationToken: cancellationToken) + .ConfigureAwait(false); + }, + true, + Logger, + typeof(IOException), + typeof(RpcException)) + .ConfigureAwait(false); } /// @@ -494,44 +722,10 @@ public void WaitForTaskCompletion(string taskId, public void WaitForTasksCompletion(IEnumerable taskIds, int maxRetries = 5, int delayMs = 20000) - { - using var _ = Logger.LogFunction(); - - Retry.WhileException(maxRetries, - delayMs, - retry => - { - using var channel = ChannelPool.Get(); - var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel); - - if (retry > 1) - { - Logger.LogWarning("Try {try} for {funcName}", - retry, - nameof(submitterService.WaitForCompletion)); - } - - var __ = submitterService.WaitForCompletion(new WaitRequest - { - Filter = new TaskFilter - { - Task = new TaskFilter.Types.IdsRequest - { - Ids = - { - taskIds, - }, - }, - }, - StopOnFirstTaskCancellation = true, - StopOnFirstTaskError = true, - }); - }, - true, - Logger, - typeof(IOException), - typeof(RpcException)); - } + => WaitForTasksCompletionAsync(taskIds, + maxRetries, + delayMs) + .WaitSync(); /// /// Get the result status of a list of results @@ -539,11 +733,14 @@ public void WaitForTasksCompletion(IEnumerable taskIds, /// Collection of task ids from which to retrieve results /// /// A ResultCollection sorted by Status Completed, Result in Error or missing - public ResultStatusCollection GetResultStatus(IEnumerable taskIds, - CancellationToken cancellationToken = default) + [PublicAPI] + public async ValueTask GetResultStatusAsync(IEnumerable taskIds, + CancellationToken cancellationToken = default) { - var taskList = taskIds.ToList(); - var mapTaskResults = GetResultIds(taskList); + var taskList = taskIds.ToList(); + var mapTaskResults = await GetResultIdsAsync(taskList, + cancellationToken) + .ConfigureAwait(false); var result2TaskDic = mapTaskResults.ToDictionary(result => result.ResultIds.Single(), result => result.TaskId); @@ -555,35 +752,96 @@ public ResultStatusCollection GetResultStatus(IEnumerable taskIds, ResultStatus.Notfound)) : Array.Empty(); - var idStatus = Retry.WhileException(5, - 2000, - retry => - { - Logger.LogDebug("Try {try} for {funcName}", - retry, - nameof(Results.ResultsClient.GetResult)); - - // TODO: use ListResult - var idStatusPair = result2TaskDic.Keys.ParallelSelect(async resultId => - { - using var channel = ChannelPool.Get(); - var resultsClient = new Results.ResultsClient(channel); - var result = await resultsClient.GetResultAsync(new GetResultRequest + var idStatus = await Retry.WhileException(5, + 2000, + async retry => + { + Logger.LogDebug("Try {try} for {funcName}", + retry, + nameof(Results.ResultsClient.GetResult)); + + return await result2TaskDic.Keys.ToChunks(100) + .ParallelSelect(async chunk => + { + await using var channel = await ChannelPool.GetAsync(cancellationToken) + .ConfigureAwait(false); + + var resultsClient = new Results.ResultsClient(channel); + var filters = chunk.Select(resultId => new FiltersAnd + { + And = + { + new FilterField + { + Field = new ResultField { - ResultId = resultId, - }) - .ConfigureAwait(false); - var status = result.Result.Status; - return (resultId, status); - }) - .ToListAsync(CancellationToken.None) - .WaitSync(); - return idStatusPair; - }, - true, - Logger, - typeof(IOException), - typeof(RpcException)); + ResultRawField = + new ResultRawField + { + Field = + ResultRawEnumField + .ResultId, + }, + }, + FilterString = new FilterString + { + Operator = + FilterStringOperator + .Equal, + Value = + resultId, + }, + }, + }, + }); + var res = await resultsClient.ListResultsAsync(new ListResultsRequest + { + Filters = + new Api.gRPC.V1.Results. + Filters + { + Or = + { + filters, + }, + }, + Sort = + new ListResultsRequest. + Types.Sort + { + Direction = + SortDirection.Asc, + Field = new ResultField + { + ResultRawField = + new + ResultRawField + { + Field = + ResultRawEnumField + .ResultId, + }, + }, + }, + PageSize = 100, + }, + cancellationToken: + cancellationToken) + .ConfigureAwait(false); + return res; + }) + .SelectMany(results => results + .Results.Select(result => (resultId: result.ResultId, + status: result.Status)) + .ToAsyncEnumerable()) + .ToListAsync(cancellationToken) + .ConfigureAwait(false); + }, + true, + Logger, + typeof(IOException), + typeof(RpcException)) + .ConfigureAwait(false); var idsResultError = new List(); var idsReady = new List(); @@ -624,15 +882,32 @@ public ResultStatusCollection GetResultStatus(IEnumerable taskIds, return resultStatusList; } + + /// + /// Get the result status of a list of results + /// + /// Collection of task ids from which to retrieve results + /// + /// A ResultCollection sorted by Status Completed, Result in Error or missing + [PublicAPI] + public ResultStatusCollection GetResultStatus(IEnumerable taskIds, + CancellationToken cancellationToken = default) + => GetResultStatusAsync(taskIds, + cancellationToken) + .WaitSync(); + /// /// Gets the result ids for a given list of task ids. /// /// The list of task ids. + /// /// A collection of map task results. - public ICollection GetResultIds(IEnumerable taskIds) + [PublicAPI] + public ValueTask> GetResultIdsAsync(IEnumerable taskIds, + CancellationToken cancellationToken = default) => Retry.WhileException(5, 2000, - retry => + async retry => { if (retry > 1) { @@ -641,39 +916,60 @@ public ResultStatusCollection GetResultStatus(IEnumerable taskIds, nameof(GetResultIds)); } - return ChannelPool.WithInstance(channel => new Tasks.TasksClient(channel).GetResultIds(new GetResultIdsRequest - { - TaskId = - { - taskIds, - }, - }) - .TaskResults); + await using var channel = await ChannelPool.GetAsync(cancellationToken) + .ConfigureAwait(false); + var taskClient = new Tasks.TasksClient(channel); + + var response = await taskClient.GetResultIdsAsync(new GetResultIdsRequest + { + TaskId = + { + taskIds, + }, + }, + cancellationToken: cancellationToken) + .ConfigureAwait(false); + + return response.TaskResults.AsICollection(); }, true, Logger, + cancellationToken, typeof(IOException), typeof(RpcException)); + /// + /// Gets the result ids for a given list of task ids. + /// + /// The list of task ids. + /// A collection of map task results. + [PublicAPI] + public ICollection GetResultIds(IEnumerable taskIds) + => GetResultIdsAsync(taskIds) + .WaitSync(); + /// /// Try to find the result of One task. If there no result, the function return byte[0] /// /// The Id of the task /// The optional cancellationToken /// Returns the result or byte[0] if there no result - public byte[] GetResult(string taskId, - CancellationToken cancellationToken = default) + [PublicAPI] + public async ValueTask GetResultAsync(string taskId, + CancellationToken cancellationToken = default) { using var _ = Logger.LogFunction(taskId); try { - var resultId = GetResultIds(new[] - { - taskId, - }) - .Single() - .ResultIds.Single(); + var results = await GetResultIdsAsync(new[] + { + taskId, + }, + cancellationToken) + .ConfigureAwait(false); + var resultId = results.Single() + .ResultIds.Single(); var resultRequest = new ResultRequest @@ -681,24 +977,30 @@ public byte[] GetResult(string taskId, ResultId = resultId, Session = SessionId.Id, }; - using var channel = ChannelPool.Get(); - var eventsClient = new Events.EventsClient(channel); - eventsClient.WaitForResultsAsync(SessionId.Id, - new List - { - resultId, - }, - cancellationToken) - .Wait(cancellationToken); - - return Retry.WhileException(5, - 200, - _ => TryGetResultAsync(resultRequest, - cancellationToken) - .Result, - true, - typeof(IOException), - typeof(RpcException))!; + + { + await using var channel = await ChannelPool.GetAsync(cancellationToken) + .ConfigureAwait(false); + var eventsClient = new Events.EventsClient(channel); + + await eventsClient.WaitForResultsAsync(SessionId.Id, + new List + { + resultId, + }, + cancellationToken) + .ConfigureAwait(false); + } + + return await Retry.WhileException(5, + 200, + _ => TryGetResultAsync(resultRequest, + cancellationToken), + true, + cancellationToken, + typeof(IOException), + typeof(RpcException)) + .ConfigureAwait(false)!; } catch (Exception ex) { @@ -708,6 +1010,40 @@ public byte[] GetResult(string taskId, } } + /// + /// Try to find the result of One task. If there no result, the function return byte[0] + /// + /// The Id of the task + /// The optional cancellationToken + /// Returns the result or byte[0] if there no result + [PublicAPI] + public byte[] GetResult(string taskId, + CancellationToken cancellationToken = default) + => GetResultAsync(taskId, + cancellationToken) + .WaitSync(); + + /// + /// Retrieve results from control plane + /// + /// Collection of task ids + /// The optional cancellationToken + /// return a dictionary with key taskId and payload + /// + /// + [PublicAPI] + public IAsyncEnumerable> GetResultsAsync(IEnumerable taskIds, + CancellationToken cancellationToken = default) + => taskIds.ParallelSelect(async id => + { + var res = await GetResultAsync(id, + cancellationToken) + .ConfigureAwait(false); + + return new Tuple(id, + res); + }); + /// /// Retrieve results from control plane /// @@ -716,17 +1052,12 @@ public byte[] GetResult(string taskId, /// return a dictionary with key taskId and payload /// /// + [PublicAPI] public IEnumerable> GetResults(IEnumerable taskIds, CancellationToken cancellationToken = default) - => taskIds.AsParallel() - .Select(id => - { - var res = GetResult(id, - cancellationToken); - - return new Tuple(id, - res); - }); + => GetResultsAsync(taskIds, + cancellationToken) + .ToEnumerable(); /// /// Try to get the result if it is available @@ -738,8 +1069,9 @@ public IEnumerable> GetResults(IEnumerable taskIds /// // TODO: return a compound type to avoid having a nullable that holds the information and return an empty array. // TODO: This function should not have an argument of type ResultRequest because it is a gRPC type - public async Task TryGetResultAsync(ResultRequest resultRequest, - CancellationToken cancellationToken = default) + [PublicAPI] + public async ValueTask TryGetResultAsync(ResultRequest resultRequest, + CancellationToken cancellationToken = default) { await using var channel = await ChannelPool.GetAsync(cancellationToken) .ConfigureAwait(false); @@ -774,6 +1106,22 @@ public IEnumerable> GetResults(IEnumerable taskIds } } + /// + /// Try to find the result of One task. If there no result, the function return byte[0] + /// + /// The Id of the task + /// + /// The optional cancellationToken + /// Returns the result or byte[0] if there no result or null if task is not yet ready + // TODO: return a compound type to avoid having a nullable that holds the information and return an empty array. + [PublicAPI] + [Obsolete("Use version without the checkOutput parameter.")] + public ValueTask TryGetResultAsync(string taskId, + bool checkOutput, + CancellationToken cancellationToken = default) + => TryGetResultAsync(taskId, + cancellationToken); + /// /// Try to find the result of One task. If there no result, the function return byte[0] /// @@ -787,8 +1135,10 @@ public IEnumerable> GetResults(IEnumerable taskIds public byte[]? TryGetResult(string taskId, bool checkOutput, CancellationToken cancellationToken = default) - => TryGetResult(taskId, - cancellationToken); + => TryGetResultAsync(taskId, + checkOutput, + cancellationToken) + .WaitSync(); /// @@ -799,16 +1149,18 @@ public IEnumerable> GetResults(IEnumerable taskIds /// Returns the result or byte[0] if there no result or null if task is not yet ready // TODO: return a compound type to avoid having a nullable that holds the information and return an empty array. [PublicAPI] - public byte[]? TryGetResult(string taskId, - CancellationToken cancellationToken = default) + public async ValueTask TryGetResultAsync(string taskId, + CancellationToken cancellationToken = default) { using var _ = Logger.LogFunction(taskId); - var resultId = GetResultIds(new[] - { - taskId, - }) - .Single() - .ResultIds.Single(); + var resultResponse = await GetResultIdsAsync(new[] + { + taskId, + }, + cancellationToken) + .ConfigureAwait(false); + var resultId = resultResponse.Single() + .ResultIds.Single(); var resultRequest = new ResultRequest { @@ -816,79 +1168,99 @@ public IEnumerable> GetResults(IEnumerable taskIds Session = SessionId.Id, }; - var resultReply = Retry.WhileException(5, - 2000, - retry => - { - if (retry > 1) - { - Logger.LogWarning("Try {try} for {funcName}", - retry, - "SubmitterService.TryGetResultAsync"); - } - - try - { - var response = TryGetResultAsync(resultRequest, - cancellationToken) - .Result; - return response; - } - catch (AggregateException ex) - { - if (ex.InnerException == null) - { - throw; - } + var resultReply = await Retry.WhileException(5, + 2000, + async retry => + { + if (retry > 1) + { + Logger.LogWarning("Try {try} for {funcName}", + retry, + "SubmitterService.TryGetResultAsync"); + } + + try + { + var response = await TryGetResultAsync(resultRequest, + cancellationToken) + .ConfigureAwait(false); + return response; + } + catch (AggregateException ex) + { + if (ex.InnerException == null) + { + throw; + } + + var rpcException = ex.InnerException; + + switch (rpcException) + { + //Not yet available return from the tryGetResult + case RpcException + { + StatusCode: StatusCode.NotFound, + }: + return null; - var rpcException = ex.InnerException; + //We lost the communication rethrow to retry : + case RpcException + { + StatusCode: StatusCode.Unavailable, + }: + throw; - switch (rpcException) - { - //Not yet available return from the tryGetResult - case RpcException - { - StatusCode: StatusCode.NotFound, - }: - return null; - - //We lost the communication rethrow to retry : - case RpcException - { - StatusCode: StatusCode.Unavailable, - }: - throw; - - case RpcException - { - StatusCode: StatusCode.Aborted or StatusCode.Cancelled, - }: - - Logger.LogError(rpcException, - "Error while trying to get a result: {error}", - rpcException.Message); - return null; - default: - throw; - } - } - }, - true, - Logger, - typeof(IOException), - typeof(RpcException)); + case RpcException + { + StatusCode: StatusCode.Aborted or StatusCode.Cancelled, + }: + + Logger.LogError(rpcException, + "Error while trying to get a result: {error}", + rpcException.Message); + return null; + default: + throw; + } + } + }, + true, + Logger, + cancellationToken, + typeof(IOException), + typeof(RpcException)); return resultReply; } + /// + /// Try to find the result of One task. If there no result, the function return byte[0] + /// + /// The Id of the task + /// The optional cancellationToken + /// Returns the result or byte[0] if there no result or null if task is not yet ready + // TODO: return a compound type to avoid having a nullable that holds the information and return an empty array. + [PublicAPI] + public byte[]? TryGetResult(string taskId, + CancellationToken cancellationToken = default) + => TryGetResultAsync(taskId, + cancellationToken) + .WaitSync(); + /// /// Try to get result of a list of taskIds /// /// A list of result ids + /// /// Returns an Enumerable pair of - public IList> TryGetResults(IList resultIds) + [PublicAPI] + public async IAsyncEnumerable> TryGetResultsAsync(IList resultIds, + [EnumeratorCancellation] CancellationToken cancellationToken = default) { - var resultStatus = GetResultStatus(resultIds); + var resultStatus = await GetResultStatusAsync(resultIds, + cancellationToken) + .ConfigureAwait(false); if (!resultStatus.IdsReady.Any() && !resultStatus.IdsNotReady.Any()) { @@ -928,22 +1300,65 @@ public IList> TryGetResults(IList resultIds) } } - return resultStatus.IdsReady.Select(resultStatusData => + foreach (var resultStatusData in resultStatus.IdsReady) + { + var res = await TryGetResultAsync(new ResultRequest { - var res = TryGetResultAsync(new ResultRequest - { - ResultId = resultStatusData.ResultId, - Session = SessionId.Id, - }) - .Result; - return res == null - ? null - : new Tuple(resultStatusData.TaskId, - res); - }) - .Where(tuple => tuple is not null) - .Select(tuple => tuple!) - .ToList(); + ResultId = resultStatusData.ResultId, + Session = SessionId.Id, + }, + cancellationToken) + .ConfigureAwait(false); + + if (res is null) + { + continue; + } + + yield return new Tuple(resultStatusData.TaskId, + res); + } + } + + /// + /// Try to get result of a list of taskIds + /// + /// A list of result ids + /// Returns an Enumerable pair of + [PublicAPI] + public IList> TryGetResults(IList resultIds) + => TryGetResultsAsync(resultIds) + .ToListAsync() + .WaitSync(); + + /// + /// Creates the results metadata + /// + /// Results names + /// + /// Dictionary where each result name is associated with its result id + [PublicAPI] + public async ValueTask> CreateResultsMetadataAsync(IEnumerable resultNames, + CancellationToken cancellationToken = default) + { + await using var channel = await ChannelPool.GetAsync(cancellationToken) + .ConfigureAwait(false); + var client = new Results.ResultsClient(channel); + var results = await client.CreateResultsMetaDataAsync(new CreateResultsMetaDataRequest + { + SessionId = SessionId.Id, + Results = + { + resultNames.Select(name => new CreateResultsMetaDataRequest.Types.ResultCreate + { + Name = name, + }), + }, + }, + cancellationToken: cancellationToken) + .ConfigureAwait(false); + return results.Results.ToDictionary(r => r.Name, + r => r.ResultId); } /// @@ -953,18 +1368,6 @@ public IList> TryGetResults(IList resultIds) /// Dictionary where each result name is associated with its result id [PublicAPI] public Dictionary CreateResultsMetadata(IEnumerable resultNames) - => ChannelPool.WithInstance(c => new Results.ResultsClient(c).CreateResultsMetaData(new CreateResultsMetaDataRequest - { - SessionId = SessionId.Id, - Results = - { - resultNames.Select(name - => new CreateResultsMetaDataRequest.Types.ResultCreate - { - Name = name, - }), - }, - })) - .Results.ToDictionary(r => r.Name, - r => r.ResultId); + => CreateResultsMetadataAsync(resultNames) + .WaitSync(); } diff --git a/Client/src/Common/Submitter/TasksClientExt.cs b/Client/src/Common/Submitter/TasksClientExt.cs index 3e4625a9..df9e0675 100644 --- a/Client/src/Common/Submitter/TasksClientExt.cs +++ b/Client/src/Common/Submitter/TasksClientExt.cs @@ -1,6 +1,6 @@ // This file is part of the ArmoniK project // -// Copyright (C) ANEO, 2021-$CURRENT_YEAR$. All rights reserved. +// Copyright (C) ANEO, 2021-2024. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License") // you may not use this file except in compliance with the License. @@ -17,6 +17,8 @@ using System.Collections.Generic; using System.Linq; +using System.Runtime.CompilerServices; +using System.Threading; using ArmoniK.Api.gRPC.V1; using ArmoniK.Api.gRPC.V1.Tasks; @@ -133,21 +135,25 @@ public static FiltersAnd TaskStatusFilter(TaskStatus status, /// filters to apply on the tasks /// sorting order /// page size + /// /// - public static IEnumerable ListTasks(this Tasks.TasksClient tasksClient, - Filters filters, - ListTasksRequest.Types.Sort sort, - int pageSize = 50) + public static async IAsyncEnumerable ListTasksAsync(this Tasks.TasksClient tasksClient, + Filters filters, + ListTasksRequest.Types.Sort sort, + int pageSize = 50, + [EnumeratorCancellation] CancellationToken cancellationToken = default) { var page = 0; ListTasksResponse res; - while ((res = tasksClient.ListTasks(new ListTasksRequest - { - Filters = filters, - Sort = sort, - PageSize = pageSize, - Page = page, - })).Tasks.Any()) + while ((res = await tasksClient.ListTasksAsync(new ListTasksRequest + { + Filters = filters, + Sort = sort, + PageSize = pageSize, + Page = page, + }, + cancellationToken: cancellationToken) + .ConfigureAwait(false)).Tasks.Any()) { foreach (var taskSummary in res.Tasks) { @@ -157,4 +163,22 @@ public static IEnumerable ListTasks(this Tasks.TasksClient tas page++; } } + + /// + /// List tasks while handling page size + /// + /// the tasks client + /// filters to apply on the tasks + /// sorting order + /// page size + /// + public static IEnumerable ListTasks(this Tasks.TasksClient tasksClient, + Filters filters, + ListTasksRequest.Types.Sort sort, + int pageSize = 50) + => ListTasksAsync(tasksClient, + filters, + sort, + pageSize) + .ToEnumerable(); } diff --git a/Common/src/Common/RetryAction.cs b/Common/src/Common/RetryAction.cs index 26cc0bb6..60ab1683 100644 --- a/Common/src/Common/RetryAction.cs +++ b/Common/src/Common/RetryAction.cs @@ -1,6 +1,6 @@ // This file is part of the ArmoniK project // -// Copyright (C) ANEO, 2021-2023.All rights reserved. +// Copyright (C) ANEO, 2021-2024. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License") // you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ using System; using System.Linq; using System.Threading; +using System.Threading.Tasks; using Microsoft.Extensions.Logging; @@ -223,4 +224,197 @@ public static T WhileException(int retries, // we're out of retries. If it's unexpected, throwing is the right thing to do anyway return operation(retries); } + + + /// + /// Retry the specified operation the specified number of times, until there are no more retries or it succeeded + /// without an exception. + /// + /// The number of times to retry the operation + /// The number of milliseconds to sleep after a failed invocation of the operation + /// the operation to perform + /// + /// if not null, ignore any exceptions of this type and subtypes + /// + /// If true, exceptions deriving from the specified exception type are ignored as + /// well. Defaults to False + /// + /// When one of the retries succeeds, return the value the operation returned. If not, an exception is thrown. + public static ValueTask WhileException(int retries, + int delayMs, + Func operation, + bool allowDerivedExceptions = false, + CancellationToken cancellationToken = default, + params Type[] exceptionType) + => WhileException(retries, + delayMs, + operation, + allowDerivedExceptions, + null, + cancellationToken, + exceptionType); + + /// + /// Retry the specified operation the specified number of times, until there are no more retries or it succeeded + /// without an exception. + /// + /// The number of times to retry the operation + /// The number of milliseconds to sleep after a failed invocation of the operation + /// the operation to perform + /// + /// if not null, ignore any exceptions of this type and subtypes + /// + /// If true, exceptions deriving from the specified exception type are ignored as + /// well. Defaults to False + /// + /// Logger to log retried exception + /// When one of the retries succeeds, return the value the operation returned. If not, an exception is thrown. + public static async ValueTask WhileException(int retries, + int delayMs, + Func operation, + bool allowDerivedExceptions = false, + ILogger? logger = null, + CancellationToken cancellationToken = default, + params Type[] exceptionType) + { + // Do all but one retries in the loop + for (var retry = 1; retry < retries; retry++) + { + try + { + // Try the operation. If it succeeds, return its result + await operation(retry) + .ConfigureAwait(false); + return; + } + catch (Exception ex) + { + // Oops - it did NOT succeed! + if (exceptionType != null && allowDerivedExceptions && ex is AggregateException && + exceptionType.Any(e => ex.InnerException != null && ex.InnerException.GetType() == e)) + { + logger?.LogDebug("Got exception while executing function to retry : {ex}", + ex); + Thread.Sleep(delayMs); + } + else if (exceptionType == null || exceptionType.Any(e => e == ex.GetType()) || (allowDerivedExceptions && exceptionType.Any(e => ex.GetType() + .IsSubclassOf(e)))) + { + // Ignore exceptions when exceptionType is not specified OR + // the exception thrown was of the specified exception type OR + // the exception thrown is derived from the specified exception type and we allow that + logger?.LogDebug("Got exception while executing function to retry : {ex}", + ex); + await Task.Delay(delayMs, + cancellationToken) + .ConfigureAwait(false); + } + else + { + // We have an unexpected exception! Re-throw it: + throw; + } + } + } + } + + /// + /// Retry the specified operation the specified number of times, until there are no more retries or it succeeded + /// without an exception. + /// + /// The return type of the exception + /// The number of times to retry the operation + /// The number of milliseconds to sleep after a failed invocation of the operation + /// the operation to perform + /// + /// if not null, ignore any exceptions of this type and subtypes + /// + /// If true, exceptions deriving from the specified exception type are ignored as + /// well. Defaults to False + /// + /// When one of the retries succeeds, return the value the operation returned. If not, an exception is thrown. + public static ValueTask WhileException(int retries, + int delayMs, + Func> operation, + bool allowDerivedExceptions = false, + CancellationToken cancellationToken = default, + params Type[] exceptionType) + => WhileException(retries, + delayMs, + operation, + allowDerivedExceptions, + null, + cancellationToken, + exceptionType); + + /// + /// Retry the specified operation the specified number of times, until there are no more retries or it succeeded + /// without an exception. + /// + /// The return type of the exception + /// The number of times to retry the operation + /// The number of milliseconds to sleep after a failed invocation of the operation + /// the operation to perform + /// + /// if not null, ignore any exceptions of this type and subtypes + /// + /// If true, exceptions deriving from the specified exception type are ignored as + /// well. Defaults to False + /// + /// Logger to log retried exception + /// When one of the retries succeeds, return the value the operation returned. If not, an exception is thrown. + public static async ValueTask WhileException(int retries, + int delayMs, + Func> operation, + bool allowDerivedExceptions = false, + ILogger? logger = null, + CancellationToken cancellationToken = default, + params Type[] exceptionType) + { + // Do all but one retries in the loop + for (var retry = 1; retry < retries; retry++) + { + try + { + // Try the operation. If it succeeds, return its result + return await operation(retry) + .ConfigureAwait(false); + } + catch (Exception ex) + { + if (exceptionType != null && allowDerivedExceptions && ex is AggregateException && + exceptionType.Any(e => ex.InnerException != null && ex.InnerException.GetType() == e)) + { + logger?.LogDebug("Got exception while executing function to retry : {ex}", + ex); + await Task.Delay(delayMs, + cancellationToken) + .ConfigureAwait(false); + } + else if (exceptionType == null || exceptionType.Any(e => e == ex.GetType()) || (allowDerivedExceptions && exceptionType.Any(e => ex.GetType() + .IsSubclassOf(e)))) + { + // Ignore exceptions when exceptionType is not specified OR + // the exception thrown was of the specified exception type OR + // the exception thrown is derived from the specified exception type and we allow that + logger?.LogDebug("Got exception while executing function to retry : {ex}", + ex); + await Task.Delay(delayMs, + cancellationToken) + .ConfigureAwait(false); + } + else + { + // We have an unexpected exception! Re-throw it: + throw; + } + } + } + + // Try the operation one last time. This may or may not succeed. + // Exceptions pass unchanged. If this is an expected exception we need to know about it because + // we're out of retries. If it's unexpected, throwing is the right thing to do anyway + return await operation(retries) + .ConfigureAwait(false); + } } From 0fe48fe0523697fa7daa0cd6c8fdbd39e36e7b3b Mon Sep 17 00:00:00 2001 From: lemaitre-aneo Date: Thu, 6 Jun 2024 23:58:24 +0200 Subject: [PATCH 4/6] Async Symphony --- Client/src/Symphony/SessionService.cs | 127 +++++++++++++++++++++----- 1 file changed, 103 insertions(+), 24 deletions(-) diff --git a/Client/src/Symphony/SessionService.cs b/Client/src/Symphony/SessionService.cs index 4053b210..29b3ffd7 100644 --- a/Client/src/Symphony/SessionService.cs +++ b/Client/src/Symphony/SessionService.cs @@ -1,6 +1,6 @@ // This file is part of the ArmoniK project // -// Copyright (C) ANEO, 2021-2023. All rights reserved. +// Copyright (C) ANEO, 2021-2024. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License") // you may not use this file except in compliance with the License. @@ -17,14 +17,19 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading; +using System.Threading.Tasks; using ArmoniK.Api.gRPC.V1; using ArmoniK.DevelopmentKit.Client.Common; using ArmoniK.DevelopmentKit.Client.Common.Submitter; using ArmoniK.DevelopmentKit.Common; +using ArmoniK.Utils; using Google.Protobuf.WellKnownTypes; +using JetBrains.Annotations; + using Microsoft.Extensions.Logging; namespace ArmoniK.DevelopmentKit.Client.Symphony; @@ -60,8 +65,7 @@ public override string ToString() /// Default task options /// /// - // TODO: mark with [PublicApi] ? - // ReSharper disable once MemberCanBePrivate.Global + [PublicAPI] public static TaskOptions InitializeDefaultTaskOptions() => new() { @@ -90,13 +94,65 @@ public static TaskOptions InitializeDefaultTaskOptions() /// TaskOptions argument to override default taskOptions in Session. /// If non null it will override the default taskOptions in SessionService for client or given by taskHandler for worker /// + /// + [PublicAPI] + public IAsyncEnumerable SubmitTasksAsync(IEnumerable payloads, + int maxRetries = 5, + TaskOptions? taskOptions = null, + CancellationToken cancellationToken = default) + => SubmitTasksWithDependenciesAsync(payloads.Select(payload => new Tuple>(payload, + Array.Empty())), + maxRetries, + taskOptions, + cancellationToken); + + /// + /// User method to submit task from the client + /// Need a client Service. In case of ServiceContainer + /// channel can be null until the OpenSession is called + /// + /// + /// The user payload list to execute. General used for subTasking. + /// + /// The number of retry before fail to submit task. Default = 5 retries + /// + /// TaskOptions argument to override default taskOptions in Session. + /// If non null it will override the default taskOptions in SessionService for client or given by taskHandler for worker + /// + [PublicAPI] public IEnumerable SubmitTasks(IEnumerable payloads, int maxRetries = 5, TaskOptions? taskOptions = null) - => SubmitTasksWithDependencies(payloads.Select(payload => new Tuple>(payload, - Array.Empty())), - maxRetries, - taskOptions); + => SubmitTasksAsync(payloads, + maxRetries, + taskOptions) + .ToEnumerable(); + + /// + /// User method to submit task from the client + /// + /// + /// The user payload to execute. + /// + /// The number of retry before fail to submit task. Default = 5 retries + /// + /// TaskOptions argument to override default taskOptions in Session. + /// If non null it will override the default taskOptions in SessionService for client or given by taskHandler for worker + /// + /// + [PublicAPI] + public ValueTask SubmitTaskAsync(byte[] payload, + int maxRetries = 5, + TaskOptions? taskOptions = null, + CancellationToken cancellationToken = default) + => SubmitTasksAsync(new[] + { + payload, + }, + maxRetries, + taskOptions, + cancellationToken) + .SingleAsync(cancellationToken); /// /// User method to submit task from the client @@ -109,17 +165,44 @@ public IEnumerable SubmitTasks(IEnumerable payloads, /// TaskOptions argument to override default taskOptions in Session. /// If non null it will override the default taskOptions in SessionService for client or given by taskHandler for worker /// + [PublicAPI] public string SubmitTask(byte[] payload, int maxRetries = 5, TaskOptions? taskOptions = null) - => SubmitTasks(new[] - { - payload, - }, - maxRetries, - taskOptions) - .Single(); + => SubmitTaskAsync(payload, + maxRetries, + taskOptions) + .WaitSync(); + + /// + /// The method to submit One task with dependencies tasks. This task will wait for + /// to start until all dependencies are completed successfully + /// + /// The payload to submit + /// A list of task Id in dependence of this created task + /// The number of retry before fail to submit task. Default = 5 retries + /// + /// TaskOptions argument to override default taskOptions in Session. + /// If non null it will override the default taskOptions in SessionService for client or given by taskHandler for worker + /// + /// + /// return the taskId of the created task + [PublicAPI] + public ValueTask SubmitTaskWithDependenciesAsync(byte[] payload, + IList dependencies, + int maxRetries = 5, + TaskOptions? taskOptions = null, + CancellationToken cancellationToken = default) + => SubmitTasksWithDependenciesAsync(new[] + { + Tuple.Create(payload, + dependencies), + }, + maxRetries, + taskOptions, + cancellationToken) + .SingleAsync(cancellationToken); /// /// The method to submit One task with dependencies tasks. This task will wait for @@ -133,18 +216,14 @@ public string SubmitTask(byte[] payload, /// If non null it will override the default taskOptions in SessionService for client or given by taskHandler for worker /// /// return the taskId of the created task - // TODO: mark with [PublicApi] ? - // ReSharper disable once UnusedMember.Global + [PublicAPI] public string SubmitTaskWithDependencies(byte[] payload, IList dependencies, int maxRetries = 5, TaskOptions? taskOptions = null) - => SubmitTasksWithDependencies(new[] - { - Tuple.Create(payload, - dependencies), - }, - maxRetries, - taskOptions) - .Single(); + => SubmitTaskWithDependenciesAsync(payload, + dependencies, + maxRetries, + taskOptions) + .WaitSync(); } From 915efc4457baa3eb30f51cbc36d01431c35761bb Mon Sep 17 00:00:00 2001 From: lemaitre-aneo Date: Fri, 7 Jun 2024 00:30:49 +0200 Subject: [PATCH 5/6] Async Admin Unified --- .../Services/Admin/AdminMonitoringService.cs | 450 ++++++++++++------ 1 file changed, 309 insertions(+), 141 deletions(-) diff --git a/Client/src/Unified/Services/Admin/AdminMonitoringService.cs b/Client/src/Unified/Services/Admin/AdminMonitoringService.cs index b9577179..9c79d2e7 100644 --- a/Client/src/Unified/Services/Admin/AdminMonitoringService.cs +++ b/Client/src/Unified/Services/Admin/AdminMonitoringService.cs @@ -1,6 +1,6 @@ // This file is part of the ArmoniK project // -// Copyright (C) ANEO, 2021-2023. All rights reserved. +// Copyright (C) ANEO, 2021-2024. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License") // you may not use this file except in compliance with the License. @@ -17,6 +17,9 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; using ArmoniK.Api.gRPC.V1; using ArmoniK.Api.gRPC.V1.Results; @@ -28,12 +31,15 @@ using Grpc.Net.Client; +using JetBrains.Annotations; + using Microsoft.Extensions.Logging; using FilterField = ArmoniK.Api.gRPC.V1.Sessions.FilterField; using Filters = ArmoniK.Api.gRPC.V1.Tasks.Filters; using FiltersAnd = ArmoniK.Api.gRPC.V1.Sessions.FiltersAnd; using FilterStatus = ArmoniK.Api.gRPC.V1.Sessions.FilterStatus; +using TaskStatus = ArmoniK.Api.gRPC.V1.TaskStatus; namespace ArmoniK.DevelopmentKit.Client.Unified.Services.Admin; @@ -75,18 +81,34 @@ public void GetServiceConfiguration() /// mark all tasks in cancel status /// /// the sessionId of the session to cancel - public void CancelSession(string sessionId) + /// + [PublicAPI] + public async ValueTask CancelSessionAsync(string sessionId, + CancellationToken cancellationToken = default) { - using var channel = channelPool_.Get(); - var sessionsClient = new Sessions.SessionsClient(channel); - sessionsClient.CancelSession(new CancelSessionRequest - { - SessionId = sessionId, - }); - Logger.LogDebug("Session cancelled {sessionId}", - sessionId); + await using var channel = await channelPool_.GetAsync(cancellationToken) + .ConfigureAwait(false); + var sessionsClient = new Sessions.SessionsClient(channel); + await sessionsClient.CancelSessionAsync(new CancelSessionRequest + { + SessionId = sessionId, + }, + cancellationToken: cancellationToken) + .ConfigureAwait(false); + Logger?.LogDebug("Session cancelled {sessionId}", + sessionId); } + /// + /// This method can mark the session in status Cancelled and + /// mark all tasks in cancel status + /// + /// the sessionId of the session to cancel + [PublicAPI] + public void CancelSession(string sessionId) + => CancelSessionAsync(sessionId) + .WaitSync(); + /// /// Return the whole list of task of a session /// @@ -98,32 +120,61 @@ public IEnumerable ListAllTasksBySession(string sessionId) /// Return the list of task of a session filtered by status /// /// The list of filtered task + [PublicAPI] public IEnumerable ListTasksBySession(string sessionId, params TaskStatus[] taskStatus) + => ListTasksBySessionAsync(sessionId, + taskStatus) + .ToEnumerable(); + + /// + /// Return the list of task of a session filtered by status + /// + /// The list of filtered task + [PublicAPI] + public IAsyncEnumerable ListTasksBySessionAsync(string sessionId, + CancellationToken cancellationToken = default, + params TaskStatus[] taskStatus) + => ListTasksBySessionAsync(sessionId, + taskStatus, + cancellationToken); + + /// + /// Return the list of task of a session filtered by status + /// + /// The list of filtered task + private async IAsyncEnumerable ListTasksBySessionAsync(string sessionId, + TaskStatus[] taskStatus, + [EnumeratorCancellation] CancellationToken cancellationToken = default) { - using var channel = channelPool_.Get(); - var tasksClient = new Tasks.TasksClient(channel); - - return tasksClient.ListTasks(new Filters - { - Or = - { - taskStatus.Select(status => TasksClientExt.TaskStatusFilter(status, - sessionId)), - }, - }, - new ListTasksRequest.Types.Sort - { - Field = new TaskField + await using var channel = await channelPool_.GetAsync(cancellationToken) + .ConfigureAwait(false); + var tasksClient = new Tasks.TasksClient(channel); + + var tasks = tasksClient.ListTasksAsync(new Filters + { + Or = + { + taskStatus.Select(status => TasksClientExt.TaskStatusFilter(status, + sessionId)), + }, + }, + new ListTasksRequest.Types.Sort { - TaskSummaryField = new TaskSummaryField - { - Field = TaskSummaryEnumField.TaskId, - }, + Field = new TaskField + { + TaskSummaryField = new TaskSummaryField + { + Field = TaskSummaryEnumField.TaskId, + }, + }, + Direction = SortDirection.Asc, }, - Direction = SortDirection.Asc, - }) - .Select(summary => summary.Id); + cancellationToken: cancellationToken); + await foreach (var task in tasks.ConfigureAwait(false)) + { + yield return task.Id; + } } /// @@ -160,12 +211,29 @@ public IEnumerable ListCancelledTasks(string sessionId) /// Return the list of all sessions /// /// The list of filtered session + [PublicAPI] public IEnumerable ListAllSessions() + => ListAllSessionsAsync() + .ToEnumerable(); + + /// + /// Return the list of all sessions + /// + /// The list of filtered session + [PublicAPI] + public async IAsyncEnumerable ListAllSessionsAsync([EnumeratorCancellation] CancellationToken cancellationToken = default) { - using var channel = channelPool_.Get(); - var sessionsClient = new Sessions.SessionsClient(channel); - return sessionsClient.ListSessions(new ListSessionsRequest()) - .Sessions.Select(session => session.SessionId); + await using var channel = await channelPool_.GetAsync(cancellationToken) + .ConfigureAwait(false); + var sessionsClient = new Sessions.SessionsClient(channel); + var sessions = await sessionsClient.ListSessionsAsync(new ListSessionsRequest(), + cancellationToken: cancellationToken) + .ConfigureAwait(false); + + foreach (var session in sessions.Sessions) + { + yield return session.SessionId; + } } @@ -173,82 +241,116 @@ public IEnumerable ListAllSessions() /// The method is to get a filtered list of running session /// /// returns a list of session filtered + [PublicAPI] public IEnumerable ListRunningSessions() + => ListRunningSessionsAsync() + .ToEnumerable(); + + /// + /// The method is to get a filtered list of running session + /// + /// returns a list of session filtered + [PublicAPI] + public async IAsyncEnumerable ListRunningSessionsAsync([EnumeratorCancellation] CancellationToken cancellationToken = default) { - using var channel = channelPool_.Get(); - var sessionsClient = new Sessions.SessionsClient(channel); - return sessionsClient.ListSessions(new ListSessionsRequest - { - Filters = new Api.gRPC.V1.Sessions.Filters - { - Or = - { - new FiltersAnd - { - And = - { - new FilterField - { - FilterStatus = new FilterStatus + await using var channel = await channelPool_.GetAsync(cancellationToken) + .ConfigureAwait(false); + var sessionsClient = new Sessions.SessionsClient(channel); + var sessions = await sessionsClient.ListSessionsAsync(new ListSessionsRequest + { + Filters = new Api.gRPC.V1.Sessions.Filters + { + Or = + { + new FiltersAnd + { + And = { - Operator = FilterStatusOperator.Equal, - Value = SessionStatus.Running, + new FilterField + { + FilterStatus = new FilterStatus + { + Operator = FilterStatusOperator.Equal, + Value = SessionStatus.Running, + }, + Field = new SessionField + { + SessionRawField = new SessionRawField + { + Field = SessionRawEnumField.Status, + }, + }, + }, }, - Field = new SessionField - { - SessionRawField = new SessionRawField - { - Field = SessionRawEnumField.Status, - }, - }, - }, - }, - }, - }, - }, - }) - .Sessions.Select(session => session.SessionId); + }, + }, + }, + }, + cancellationToken: cancellationToken) + .ConfigureAwait(false); + + foreach (var session in sessions.Sessions) + { + yield return session.SessionId; + } } /// /// The method is to get a filtered list of running session /// /// returns a list of session filtered + [PublicAPI] public IEnumerable ListCancelledSessions() + => ListCancelledSessionsAsync() + .ToEnumerable(); + + /// + /// The method is to get a filtered list of running session + /// + /// returns a list of session filtered + [PublicAPI] + public async IAsyncEnumerable ListCancelledSessionsAsync([EnumeratorCancellation] CancellationToken cancellationToken = default) { - using var channel = channelPool_.Get(); - var sessionsClient = new Sessions.SessionsClient(channel); - return sessionsClient.ListSessions(new ListSessionsRequest - { - Filters = new Api.gRPC.V1.Sessions.Filters - { - Or = - { - new FiltersAnd - { - And = - { - new FilterField - { - FilterStatus = new FilterStatus + await using var channel = await channelPool_.GetAsync(cancellationToken) + .ConfigureAwait(false); + var sessionsClient = new Sessions.SessionsClient(channel); + var sessions = await sessionsClient.ListSessionsAsync(new ListSessionsRequest + { + Filters = new Api.gRPC.V1.Sessions.Filters + { + Or = + { + new FiltersAnd + { + And = { - Operator = FilterStatusOperator.Equal, - Value = SessionStatus.Cancelled, + new FilterField + { + FilterStatus = new FilterStatus + { + Operator = FilterStatusOperator.Equal, + Value = SessionStatus.Cancelled, + }, + Field = new SessionField + { + SessionRawField = new SessionRawField + { + Field = SessionRawEnumField.Status, + }, + }, + }, }, - Field = new SessionField - { - SessionRawField = new SessionRawField - { - Field = SessionRawEnumField.Status, - }, - }, - }, - }, - }, - }, - }, - }) - .Sessions.Select(session => session.SessionId); + }, + }, + }, + }, + cancellationToken: cancellationToken) + .ConfigureAwait(false); + + foreach (var session in sessions.Sessions) + { + yield return session.SessionId; + } } /// @@ -281,23 +383,56 @@ public int CountErrorTasksBySession(string sessionId) /// the id of the session /// a variadic list of taskStatus /// return the number of task + [PublicAPI] public int CountTaskBySession(string sessionId, params TaskStatus[] taskStatus) + => CountTaskBySessionAsync(sessionId, + taskStatus) + .WaitSync(); + + /// + /// Count task in a session and select by status + /// + /// the id of the session + /// + /// a variadic list of taskStatus + /// return the number of task + [PublicAPI] + private ValueTask CountTaskBySessionAsync(string sessionId, + CancellationToken cancellationToken = default, + params TaskStatus[] taskStatus) + => CountTaskBySessionAsync(sessionId, + taskStatus, + cancellationToken); + + /// + /// Count task in a session and select by status + /// + /// the id of the session + /// a variadic list of taskStatus + /// + /// return the number of task + private async ValueTask CountTaskBySessionAsync(string sessionId, + TaskStatus[] taskStatus, + CancellationToken cancellationToken = default) { - using var channel = channelPool_.Get(); - var tasksClient = new Tasks.TasksClient(channel); - return tasksClient.CountTasksByStatus(new CountTasksByStatusRequest - { - Filters = new Filters - { - Or = - { - taskStatus.Select(status => TasksClientExt.TaskStatusFilter(status, - sessionId)), - }, - }, - }) - .Status.Sum(count => count.Count); + await using var channel = await channelPool_.GetAsync(cancellationToken) + .ConfigureAwait(false); + var tasksClient = new Tasks.TasksClient(channel); + var counts = await tasksClient.CountTasksByStatusAsync(new CountTasksByStatusRequest + { + Filters = new Filters + { + Or = + { + taskStatus.Select(status => TasksClientExt.TaskStatusFilter(status, + sessionId)), + }, + }, + }, + cancellationToken: cancellationToken) + .ConfigureAwait(false); + return counts.Status.Sum(count => count.Count); } /// @@ -318,22 +453,36 @@ public int CountCompletedTasksBySession(string sessionId) => CountTaskBySession(sessionId, TaskStatus.Completed); - /// /// Cancel a list of task in a session /// /// the taskIds list to cancel + [PublicAPI] public void CancelTasksBySession(IEnumerable taskIds) + => CancelTasksBySessionAsync(taskIds) + .WaitSync(); + + /// + /// Cancel a list of task in a session + /// + /// the taskIds list to cancel + /// + [PublicAPI] + public async ValueTask CancelTasksBySessionAsync(IEnumerable taskIds, + CancellationToken cancellationToken = default) { - using var channel = channelPool_.Get(); - var tasksClient = new Tasks.TasksClient(channel); - tasksClient.CancelTasks(new CancelTasksRequest - { - TaskIds = - { - taskIds, - }, - }); + await using var channel = await channelPool_.GetAsync(cancellationToken) + .ConfigureAwait(false); + var tasksClient = new Tasks.TasksClient(channel); + await tasksClient.CancelTasksAsync(new CancelTasksRequest + { + TaskIds = + { + taskIds, + }, + }, + cancellationToken: cancellationToken) + .ConfigureAwait(false); } /// @@ -341,30 +490,49 @@ public void CancelTasksBySession(IEnumerable taskIds) /// /// The list of task /// returns a list of pair TaskId/TaskStatus + [PublicAPI] public IEnumerable> GetTaskStatus(IEnumerable taskIds) + => GetTaskStatusAsync(taskIds) + .ToEnumerable(); + + /// + /// The method to get status of a list of tasks + /// + /// The list of task + /// + /// returns a list of pair TaskId/TaskStatus + [PublicAPI] + public async IAsyncEnumerable> GetTaskStatusAsync(IEnumerable taskIds, + [EnumeratorCancellation] CancellationToken cancellationToken = default) { - using var channel = channelPool_.Get(); - var tasksClient = new Tasks.TasksClient(channel); - return tasksClient.ListTasks(new Filters - { - Or = - { - taskIds.Select(TasksClientExt.TaskIdFilter), - }, - }, - new ListTasksRequest.Types.Sort - { - Direction = SortDirection.Asc, - Field = new TaskField + await using var channel = await channelPool_.GetAsync(cancellationToken) + .ConfigureAwait(false); + var tasksClient = new Tasks.TasksClient(channel); + var tasks = tasksClient.ListTasksAsync(new Filters { - TaskSummaryField = new TaskSummaryField - { - Field = TaskSummaryEnumField.TaskId, - }, + Or = + { + taskIds.Select(TasksClientExt.TaskIdFilter), + }, }, - }) - .Select(task => new Tuple(task.Id, - task.Status)); + new ListTasksRequest.Types.Sort + { + Direction = SortDirection.Asc, + Field = new TaskField + { + TaskSummaryField = new TaskSummaryField + { + Field = TaskSummaryEnumField.TaskId, + }, + }, + }, + cancellationToken: cancellationToken); + + await foreach (var task in tasks.ConfigureAwait(false)) + { + yield return new Tuple(task.Id, + task.Status); + } } From 0c7d21f92f86b9f2aab5407afc499d6bc5b682b7 Mon Sep 17 00:00:00 2001 From: lemaitre-aneo Date: Fri, 7 Jun 2024 01:06:40 +0200 Subject: [PATCH 6/6] Async Unified --- Client/src/Unified/Services/SessionService.cs | 134 ++++++++++++++---- 1 file changed, 110 insertions(+), 24 deletions(-) diff --git a/Client/src/Unified/Services/SessionService.cs b/Client/src/Unified/Services/SessionService.cs index 14417e6c..794d73d2 100644 --- a/Client/src/Unified/Services/SessionService.cs +++ b/Client/src/Unified/Services/SessionService.cs @@ -1,6 +1,6 @@ // This file is part of the ArmoniK project // -// Copyright (C) ANEO, 2021-2023. All rights reserved. +// Copyright (C) ANEO, 2021-2024. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License") // you may not use this file except in compliance with the License. @@ -18,14 +18,18 @@ using System.Collections.Generic; using System.Linq; using System.Threading; +using System.Threading.Tasks; using ArmoniK.Api.gRPC.V1; using ArmoniK.DevelopmentKit.Client.Common; using ArmoniK.DevelopmentKit.Client.Common.Submitter; using ArmoniK.DevelopmentKit.Common; +using ArmoniK.Utils; using Google.Protobuf.WellKnownTypes; +using JetBrains.Annotations; + using Microsoft.Extensions.Logging; namespace ArmoniK.DevelopmentKit.Client.Unified.Services; @@ -96,13 +100,39 @@ public static TaskOptions InitializeDefaultTaskOptions() /// TaskOptions argument to override default taskOptions in Session. /// If non null it will override the default taskOptions in SessionService for client or given by taskHandler for worker /// + [PublicAPI] public IEnumerable SubmitTasks(IEnumerable payloads, int maxRetries = 5, TaskOptions? taskOptions = null) - => SubmitTasksWithDependencies(payloads.Select(payload => new Tuple>(payload, - Array.Empty())), - maxRetries, - taskOptions); + => SubmitTasksAsync(payloads, + maxRetries, + taskOptions) + .ToEnumerable(); + + /// + /// User method to submit task from the client + /// Need a client Service. In case of ServiceContainer + /// submitterService can be null until the OpenSession is called + /// + /// + /// The user payload list to execute. General used for subTasking. + /// + /// The number of retry before fail to submit task. Default = 5 retries + /// + /// TaskOptions argument to override default taskOptions in Session. + /// If non null it will override the default taskOptions in SessionService for client or given by taskHandler for worker + /// + /// + [PublicAPI] + public IAsyncEnumerable SubmitTasksAsync(IEnumerable payloads, + int maxRetries = 5, + TaskOptions? taskOptions = null, + CancellationToken cancellationToken = default) + => SubmitTasksWithDependenciesAsync(payloads.Select(payload => new Tuple>(payload, + Array.Empty())), + maxRetries, + taskOptions, + cancellationToken); /// /// User method to submit task from the client @@ -116,22 +146,53 @@ public IEnumerable SubmitTasks(IEnumerable payloads, /// TaskOptions argument to override default taskOptions in Session. /// If non null it will override the default taskOptions in SessionService for client or given by taskHandler for worker /// + [PublicAPI] public string SubmitTask(byte[] payload, int waitTimeBeforeNextSubmit = 2, int maxRetries = 5, TaskOptions? taskOptions = null) - { - // TODO: wtf? - Thread.Sleep(waitTimeBeforeNextSubmit); // Twice the keep alive - return SubmitTasks(new[] - { - payload, - }, + => SubmitTaskAsync(payload, + waitTimeBeforeNextSubmit, maxRetries, taskOptions) - .Single(); - } + .WaitSync(); + + /// + /// User method to submit task from the client + /// + /// + /// The user payload to execute. + /// + /// The time to wait before 2 single submitTask + /// The number of retry before fail to submit task. Default = 5 retries + /// + /// TaskOptions argument to override default taskOptions in Session. + /// If non null it will override the default taskOptions in SessionService for client or given by taskHandler for worker + /// + /// + [PublicAPI] + public async ValueTask SubmitTaskAsync(byte[] payload, + int waitTimeBeforeNextSubmit = 2, + int maxRetries = 5, + TaskOptions? taskOptions = null, + CancellationToken cancellationToken = default) + { + // TODO: wtf? + // Twice the keep alive + await Task.Delay(waitTimeBeforeNextSubmit, + cancellationToken) + .ConfigureAwait(false); + return await SubmitTasksAsync(new[] + { + payload, + }, + maxRetries, + taskOptions, + cancellationToken) + .SingleAsync(cancellationToken) + .ConfigureAwait(false); + } /// /// The method to submit One task with dependencies tasks. This task will wait for @@ -145,18 +206,43 @@ public string SubmitTask(byte[] payload, /// If non null it will override the default taskOptions in SessionService for client or given by taskHandler for worker /// /// return the taskId of the created task - // TODO: mark with [PublicApi] ? - // ReSharper disable once UnusedMember.Global + [PublicAPI] public string SubmitTaskWithDependencies(byte[] payload, IList dependencies, int maxRetries = 5, TaskOptions? taskOptions = null) - => SubmitTasksWithDependencies(new[] - { - Tuple.Create(payload, - dependencies), - }, - maxRetries, - taskOptions) - .Single(); + => SubmitTaskWithDependenciesAsync(payload, + dependencies, + maxRetries, + taskOptions) + .WaitSync(); + + /// + /// The method to submit One task with dependencies tasks. This task will wait for + /// to start until all dependencies are completed successfully + /// + /// The payload to submit + /// A list of task Id in dependence of this created task + /// The number of retry before fail to submit task. Default = 5 retries + /// + /// TaskOptions argument to override default taskOptions in Session. + /// If non null it will override the default taskOptions in SessionService for client or given by taskHandler for worker + /// + /// + /// return the taskId of the created task + [PublicAPI] + public ValueTask SubmitTaskWithDependenciesAsync(byte[] payload, + IList dependencies, + int maxRetries = 5, + TaskOptions? taskOptions = null, + CancellationToken cancellationToken = default) + => SubmitTasksWithDependenciesAsync(new[] + { + Tuple.Create(payload, + dependencies), + }, + maxRetries, + taskOptions, + cancellationToken) + .SingleAsync(cancellationToken); }