diff --git a/Client/src/Common/Submitter/BaseClientSubmitter.cs b/Client/src/Common/Submitter/BaseClientSubmitter.cs
index 26749b46..525ff0dc 100644
--- a/Client/src/Common/Submitter/BaseClientSubmitter.cs
+++ b/Client/src/Common/Submitter/BaseClientSubmitter.cs
@@ -34,6 +34,7 @@
using ArmoniK.DevelopmentKit.Client.Common.Status;
using ArmoniK.DevelopmentKit.Common;
using ArmoniK.DevelopmentKit.Common.Exceptions;
+using ArmoniK.DevelopmentKit.Common.Utils;
using ArmoniK.Utils;
using Google.Protobuf;
@@ -301,23 +302,18 @@ public Output GetTaskOutputInfo(string taskId)
/// The result ids must first be created using
/// return a list of taskIds of the created tasks
[PublicAPI]
- public async IAsyncEnumerable SubmitTasksWithDependenciesAsync(IEnumerable>> payloadsWithDependencies,
- int maxRetries = 5,
- TaskOptions? taskOptions = null,
- [EnumeratorCancellation] CancellationToken cancellationToken = default)
- {
- foreach (var chunk in payloadsWithDependencies.ToChunks(chunkSubmitSize_))
- {
- await foreach (var taskIds in ChunkSubmitTasksWithDependenciesAsync(chunk,
- maxRetries,
- taskOptions ?? TaskOptions,
- cancellationToken)
- .ConfigureAwait(false))
- {
- yield return taskIds;
- }
- }
- }
+ public IAsyncEnumerable SubmitTasksWithDependenciesAsync(IEnumerable>> payloadsWithDependencies,
+ int maxRetries = 5,
+ TaskOptions? taskOptions = null,
+ CancellationToken cancellationToken = default)
+ => payloadsWithDependencies.ToChunks(chunkSubmitSize_)
+ .ToAsyncEnumerable()
+ .SelectMany(chunk => ChunkSubmitTasksWithDependenciesAsync(chunk.Select(tuple => ((string?)tuple.Item1, tuple.Item2, tuple.Item3,
+ (TaskOptions?)null)),
+ maxRetries,
+ taskOptions,
+ cancellationToken))
+ .Select(task => task.taskId);
///
/// The method to submit several tasks with dependencies tasks. This task will wait for
@@ -356,33 +352,18 @@ public IEnumerable SubmitTasksWithDependencies(IEnumerable
/// return a list of taskIds of the created tasks
[PublicAPI]
- public async IAsyncEnumerable SubmitTasksWithDependenciesAsync(IEnumerable>> payloadsWithDependencies,
- int maxRetries = 5,
- TaskOptions? taskOptions = null,
- [EnumeratorCancellation] CancellationToken cancellationToken = default)
- {
- foreach (var chunk in payloadsWithDependencies.ToChunks(chunkSubmitSize_))
- {
- var resultsMetadata = await CreateResultsMetadataAsync(Enumerable.Range(0,
- chunk.Length)
- .Select(_ => Guid.NewGuid()
- .ToString()),
- cancellationToken)
- .ConfigureAwait(false);
- var tasks = ChunkSubmitTasksWithDependenciesAsync(chunk.Zip(resultsMetadata,
- (payloadWithDependencies,
- metadata) => Tuple.Create(metadata.Value,
- payloadWithDependencies.Item1,
- payloadWithDependencies.Item2)),
- maxRetries,
- taskOptions ?? TaskOptions,
- cancellationToken);
- await foreach (var taskId in tasks.ConfigureAwait(false))
- {
- yield return taskId;
- }
- }
- }
+ public IAsyncEnumerable SubmitTasksWithDependenciesAsync(IEnumerable>> payloadsWithDependencies,
+ int maxRetries = 5,
+ TaskOptions? taskOptions = null,
+ CancellationToken cancellationToken = default)
+ => payloadsWithDependencies.ToChunks(chunkSubmitSize_)
+ .ToAsyncEnumerable()
+ .SelectMany(chunk => ChunkSubmitTasksWithDependenciesAsync(chunk.Select(tuple => ((string?)null, tuple.Item1, tuple.Item2,
+ (TaskOptions?)null)),
+ maxRetries,
+ taskOptions,
+ cancellationToken))
+ .Select(task => task.taskId);
///
/// The method to submit several tasks with dependencies tasks. This task will wait for
@@ -420,191 +401,228 @@ public IEnumerable SubmitTasksWithDependencies(IEnumerable
///
/// return the ids of the created tasks
- private async IAsyncEnumerable ChunkSubmitTasksWithDependenciesAsync(IEnumerable>> payloadsWithDependencies,
- int maxRetries,
- TaskOptions? taskOptions = null,
- [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ public async IAsyncEnumerable<(string taskId, string resultId)> ChunkSubmitTasksWithDependenciesAsync(
+ IEnumerable<(string?, byte[], IList, TaskOptions?)> payloadsWithDependencies,
+ int maxRetries = 5,
+ TaskOptions? taskOptions = null,
+ [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
using var _ = Logger.LogFunction();
- var tasks = new List();
+ var taskProperties = new List<(Either, int, bool, IList, TaskOptions?)>();
+ var smallPayloadProperties = new List();
+ var largePayloadProperties = new List<(byte[], int)>();
+ var nbResults = 0;
- foreach (var (resultId, payload, dependencies) in payloadsWithDependencies)
+ foreach (var (resultId, payload, dependencies, specificTaskOptions) in payloadsWithDependencies)
{
- for (var nbRetry = 0; nbRetry < maxRetries; nbRetry++)
+ Either result;
+ if (resultId is null)
{
- await using var channel = await ChannelPool.GetAsync(cancellationToken)
- .ConfigureAwait(false);
- var resultsClient = new Results.ResultsClient(channel);
-
- try
- {
- // todo: migrate to ArmoniK.Api
- string payloadId;
- if (payload.Length > configuration_)
- {
- var response = await resultsClient.CreateResultsMetaDataAsync(new CreateResultsMetaDataRequest
- {
- SessionId = SessionId.Id,
- Results =
- {
- new CreateResultsMetaDataRequest.Types.ResultCreate(),
- },
- },
- cancellationToken: cancellationToken)
- .ConfigureAwait(false);
- payloadId = response.Results.Select(raw => raw.ResultId)
- .Single();
-
- await resultsClient.UploadResultData(SessionId.Id,
- payloadId,
- payload);
- }
- else
- {
- var response = await resultsClient.CreateResultsAsync(new CreateResultsRequest
- {
- SessionId = SessionId.Id,
- Results =
- {
- new CreateResultsRequest.Types.ResultCreate
- {
- Data = UnsafeByteOperations.UnsafeWrap(payload),
- },
- },
- },
- cancellationToken: cancellationToken)
- .ConfigureAwait(false);
- payloadId = response.Results.Select(raw => raw.ResultId)
- .Single();
- }
-
-
- tasks.Add(new SubmitTasksRequest.Types.TaskCreation
- {
- PayloadId = payloadId,
- DataDependencies =
- {
- dependencies,
- },
- ExpectedOutputKeys =
- {
- resultId,
- },
- });
- // break retry loop because submission is successful
- break;
- }
- catch (Exception e)
- {
- if (nbRetry >= maxRetries - 1)
- {
- throw;
- }
-
- var innerException = e is AggregateException
- {
- InnerExceptions.Count: 1,
- } agg
- ? agg.InnerException
- : e;
-
- switch (innerException)
- {
- case RpcException:
- case IOException:
- Logger.LogWarning(innerException,
- "Failure to submit : Retrying");
- break;
- default:
- Logger.LogError(innerException,
- "Unknown failure");
- throw;
- }
-
- if (nbRetry > 0)
- {
- Logger.LogWarning("{retry}/{maxRetries} nbRetry to submit task associated to {resultId}",
- nbRetry,
- maxRetries,
- resultId);
- }
- }
+ result = nbResults;
+ nbResults += 1;
}
- }
-
- foreach (var taskChunk in tasks.ToChunks(100))
- {
- if (taskChunk.Length == 0)
+ else
{
- continue;
+ result = resultId;
}
- SubmitTasksResponse? submitTasksResponse = null;
- for (var nbRetry = 0; nbRetry < maxRetries; nbRetry++)
+ int payloadIndex;
+ bool isLarge;
+ if (payload.Length > configuration_)
{
- await using var channel = await ChannelPool.GetAsync(cancellationToken)
- .ConfigureAwait(false);
- var tasksClient = new Tasks.TasksClient(channel);
+ payloadIndex = largePayloadProperties.Count;
+ largePayloadProperties.Add((payload, nbResults));
+ nbResults += 1;
+ isLarge = true;
+ }
+ else
+ {
+ payloadIndex = smallPayloadProperties.Count;
+ smallPayloadProperties.Add(payload);
+ isLarge = false;
+ }
- try
- {
- submitTasksResponse = await tasksClient.SubmitTasksAsync(new SubmitTasksRequest
- {
- TaskOptions = taskOptions,
- SessionId = SessionId.Id,
- TaskCreations =
- {
- taskChunk,
- },
- },
- cancellationToken: cancellationToken)
- .ConfigureAwait(false);
+ taskProperties.Add((result, payloadIndex, isLarge, dependencies, specificTaskOptions));
+ }
- // break retry loop because submission is successful
- break;
- }
- catch (Exception e)
- {
- if (nbRetry >= maxRetries - 1)
- {
- throw;
- }
+ var uploadSmallPayloads = smallPayloadProperties.ParallelSelect(new ParallelTaskOptions(properties_.MaxParallelChannels,
+ cancellationToken),
+ payload => Retry.WhileException(maxRetries,
+ 2000,
+ async _ =>
+ {
+ await using var channel =
+ await ChannelPool.GetAsync(cancellationToken)
+ .ConfigureAwait(false);
+ var resultClient = new Results.ResultsClient(channel);
+ var response = await resultClient
+ .CreateResultsAsync(new CreateResultsRequest
+ {
+ SessionId =
+ SessionId.Id,
+ Results =
+ {
+ new
+ CreateResultsRequest.
+ Types.ResultCreate
+ {
+ Data =
+ UnsafeByteOperations
+ .UnsafeWrap(payload),
+ },
+ },
+ },
+ cancellationToken:
+ cancellationToken)
+ .ConfigureAwait(false);
+
+ return response.Results.Single()
+ .ResultId;
+ },
+ true,
+ Logger,
+ cancellationToken,
+ typeof(IOException),
+ typeof(RpcException))
+ .AsTask())
+ .ToListAsync(cancellationToken);
+
+ var createResultMetadata = Retry.WhileException(maxRetries,
+ 2000,
+ async _ =>
+ {
+ await using var channel = await ChannelPool.GetAsync(cancellationToken)
+ .ConfigureAwait(false);
+ var resultClient = new Results.ResultsClient(channel);
+ var response = await resultClient.CreateResultsMetaDataAsync(new CreateResultsMetaDataRequest
+ {
+ SessionId = SessionId.Id,
+ Results =
+ {
+ Enumerable.Range(0,
+ nbResults)
+ .Select(_ => new
+ CreateResultsMetaDataRequest.
+ Types.ResultCreate()),
+ },
+ },
+ cancellationToken: cancellationToken)
+ .ConfigureAwait(false);
+
+ return response.Results.Select(result => result.ResultId)
+ .AsIList();
+ },
+ true,
+ Logger,
+ cancellationToken,
+ typeof(IOException),
+ typeof(RpcException))
+ .AsTask();
+
+
+ var uploadLargePayloads = largePayloadProperties.ParallelForEach(new ParallelTaskOptions(properties_.MaxParallelChannels,
+ cancellationToken),
+ async payload =>
+ {
+ var results = await createResultMetadata.ConfigureAwait(false);
+
+ await Retry.WhileException(maxRetries,
+ 2000,
+ async _ =>
+ {
+ var resultId = results[payload.Item2];
+ await using var channel =
+ await ChannelPool.GetAsync(cancellationToken)
+ .ConfigureAwait(false);
+ var resultClient = new Results.ResultsClient(channel);
+
+ await resultClient.UploadResultData(SessionId.Id,
+ resultId,
+ payload.Item1)
+ .ConfigureAwait(false);
+ },
+ true,
+ Logger,
+ cancellationToken,
+ typeof(IOException),
+ typeof(RpcException))
+ .ConfigureAwait(false);
+ });
+
+ var results = await createResultMetadata.ConfigureAwait(false);
+ var smallPayloads = await uploadSmallPayloads.ConfigureAwait(false);
+
+ var tasks = taskProperties.Select(tuple =>
+ {
+ var (result, payloadIndex, isLarge, dependencies, specificTaskOptions) = tuple;
+ var resultId = (string?)result ?? results[(int)result]!;
- var innerException = e is AggregateException
- {
- InnerExceptions.Count: 1,
- } agg
- ? agg.InnerException
- : e;
+ var payloadId = isLarge
+ ? results[largePayloadProperties[payloadIndex]
+ .Item2]
+ : smallPayloads[payloadIndex];
- switch (innerException)
- {
- case RpcException:
- case IOException:
- Logger.LogWarning(innerException,
- "Failure to submit : Retrying");
- break;
- default:
- Logger.LogError(innerException,
- "Unknown failure");
- throw;
- }
+ return new SubmitTasksRequest.Types.TaskCreation
+ {
+ PayloadId = payloadId,
+ DataDependencies =
+ {
+ dependencies,
+ },
+ ExpectedOutputKeys =
+ {
+ resultId,
+ },
+ TaskOptions = specificTaskOptions,
+ };
+ })
+ .AsIList();
+
+ var taskSubmit = tasks.ToChunks(100)
+ .ParallelSelect(new ParallelTaskOptions(1,
+ cancellationToken),
+ async taskChunk =>
+ {
+ var response = await Retry.WhileException(maxRetries,
+ 2000,
+ async _ =>
+ {
+ await using var channel = await ChannelPool.GetAsync(cancellationToken)
+ .ConfigureAwait(false);
+ var taskClient = new Tasks.TasksClient(channel);
+
+ return await taskClient.SubmitTasksAsync(new SubmitTasksRequest
+ {
+ TaskOptions = taskOptions,
+ SessionId = SessionId.Id,
+ TaskCreations =
+ {
+ taskChunk,
+ },
+ },
+ cancellationToken: cancellationToken)
+ .ConfigureAwait(false);
+ },
+ true,
+ Logger,
+ cancellationToken,
+ typeof(IOException),
+ typeof(RpcException))
+ .ConfigureAwait(false);
- if (nbRetry > 0)
- {
- Logger.LogWarning("{retry}/{maxRetries} nbRetry to submit tasks",
- nbRetry,
- maxRetries);
- }
- }
- }
+ return response.TaskInfos.Select(task => (task.TaskId, task.ExpectedOutputIds.Single()));
+ });
- foreach (var taskInfo in submitTasksResponse!.TaskInfos)
+ await foreach (var taskChunk in taskSubmit.ConfigureAwait(false))
+ {
+ foreach (var task in taskChunk)
{
- yield return taskInfo.TaskId;
+ yield return task;
}
}
+
+ await uploadLargePayloads.ConfigureAwait(false);
}
///
@@ -761,7 +779,9 @@ public async ValueTask GetResultStatusAsync(IEnumerable<
nameof(Results.ResultsClient.GetResult));
return await result2TaskDic.Keys.ToChunks(100)
- .ParallelSelect(async chunk =>
+ .ParallelSelect(new ParallelTaskOptions(properties_.MaxParallelChannels,
+ cancellationToken),
+ async chunk =>
{
await using var channel = await ChannelPool.GetAsync(cancellationToken)
.ConfigureAwait(false);
diff --git a/Client/src/Unified/ArmoniK.DevelopmentKit.Client.Unified.csproj b/Client/src/Unified/ArmoniK.DevelopmentKit.Client.Unified.csproj
index d562a242..5f6308e5 100644
--- a/Client/src/Unified/ArmoniK.DevelopmentKit.Client.Unified.csproj
+++ b/Client/src/Unified/ArmoniK.DevelopmentKit.Client.Unified.csproj
@@ -11,7 +11,6 @@
-
diff --git a/Client/src/Unified/Services/Submitter/BlockRequest.cs b/Client/src/Unified/IsExternalInit.cs
similarity index 53%
rename from Client/src/Unified/Services/Submitter/BlockRequest.cs
rename to Client/src/Unified/IsExternalInit.cs
index aa47ef97..a0c89d77 100644
--- a/Client/src/Unified/Services/Submitter/BlockRequest.cs
+++ b/Client/src/Unified/IsExternalInit.cs
@@ -14,25 +14,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-using System;
-using System.Threading;
-using ArmoniK.Api.gRPC.V1;
-using ArmoniK.DevelopmentKit.Client.Common;
-using ArmoniK.DevelopmentKit.Common;
+#if NETFRAMEWORK || NETSTANDARD
+// This type is required to use initializers when compiling to framework
+namespace System.Runtime.CompilerServices;
-namespace ArmoniK.DevelopmentKit.Client.Unified.Services.Submitter;
-
-internal class BlockRequest
+internal static class IsExternalInit
{
- public IServiceInvocationHandler Handler;
-
- public ArmonikPayload? Payload { get; set; }
-
- public SemaphoreSlim Lock { get; set; }
- public Guid SubmitId { get; set; }
-
- public string ResultId { get; set; }
- public int MaxRetries { get; set; } = 5;
- public TaskOptions TaskOptions { get; set; }
}
+#endif
diff --git a/Client/src/Unified/Services/Submitter/BatchUntilInactiveBlock.cs b/Client/src/Unified/Services/Submitter/BatchUntilInactiveBlock.cs
deleted file mode 100644
index 7ade944d..00000000
--- a/Client/src/Unified/Services/Submitter/BatchUntilInactiveBlock.cs
+++ /dev/null
@@ -1,196 +0,0 @@
-// This file is part of the ArmoniK project
-//
-// Copyright (C) ANEO, 2021-2023. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License")
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-using System;
-using System.Collections.Generic;
-using System.Threading;
-using System.Threading.Tasks;
-using System.Threading.Tasks.Dataflow;
-
-namespace ArmoniK.DevelopmentKit.Client.Unified.Services.Submitter;
-
-///
-/// Provides a dataFlow block that batches inputs into arrays.
-/// A batch is produced when the number of currently queued items becomes equal
-/// to BatchSize, or when a Timeout period has elapsed after receiving the last item.
-///
-public class BatchUntilInactiveBlock : IPropagatorBlock, IReceivableSourceBlock
-{
- private readonly ExecutionDataflowBlockOptions executionDataFlowBlockOptions_;
- private readonly BatchBlock source_;
- private readonly TransformBlock timeoutTransformBlock_;
- private readonly Timer timer_;
-
- ///
- /// The buffer construct base on the number of request in the buffer
- /// Be aware that buffer should be T sized for network B/W
- ///
- ///
- /// Time out before the next submit call
- ///
- /// Parameters to control execution for each block in pipeline
- /// Options to configure message.
- /// https://learn.microsoft.com/fr-fr/dotnet/api/system.threading.tasks.dataflow.executiondataflowblockoptions?view=net-6.0
- ///
- public BatchUntilInactiveBlock(int bufferRequestsSize,
- TimeSpan timeout,
- ExecutionDataflowBlockOptions? executionDataFlowBlockOptions = null)
- {
- executionDataFlowBlockOptions_ = executionDataFlowBlockOptions ?? new ExecutionDataflowBlockOptions
- {
- BoundedCapacity = 1,
- MaxDegreeOfParallelism = 1,
- EnsureOrdered = true,
- };
-
- source_ = new BatchBlock(bufferRequestsSize,
- new GroupingDataflowBlockOptions
- {
- BoundedCapacity = bufferRequestsSize,
- EnsureOrdered = true,
- });
-
- timer_ = new Timer(_ =>
- {
- source_.TriggerBatch();
- },
- null,
- timeout,
- System.Threading.Timeout.InfiniteTimeSpan);
-
- timeoutTransformBlock_ = new TransformBlock(value =>
- {
- timer_.Change(timeout,
- System.Threading.Timeout.InfiniteTimeSpan);
-
- return value;
- },
- executionDataFlowBlockOptions_);
-
- source_.LinkTo(timeoutTransformBlock_,
- new DataflowLinkOptions
- {
- PropagateCompletion = true,
- });
-
- Timeout = timeout;
- }
-
- ///
- /// Simple Getter to return size of batch in the pipeline
- ///
- public int BatchSize
- => source_.BatchSize;
-
- ///
- /// Return the TimeSpan timer set in the constructor
- ///
- private TimeSpan Timeout { get; }
-
- ///
- public Task Completion
- => source_.Completion;
-
-
- ///
- public void Complete()
- => source_.Complete();
-
- ///
- public void Fault(Exception exception)
- => ((IDataflowBlock)source_).Fault(exception);
-
-
- ///
- public IDisposable LinkTo(ITargetBlock target,
- DataflowLinkOptions linkOptions)
- => timeoutTransformBlock_.LinkTo(target,
- linkOptions);
-
-
- ///
- public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,
- T messageValue,
- ISourceBlock source,
- bool consumeToAccept)
- {
- var offerResult = ((ITargetBlock)source_).OfferMessage(messageHeader,
- messageValue,
- source,
- consumeToAccept);
-
- if (offerResult == DataflowMessageStatus.Accepted)
- {
- timer_.Change(Timeout,
- System.Threading.Timeout.InfiniteTimeSpan);
- }
-
- return offerResult;
- }
-
- ///
- public T[] ConsumeMessage(DataflowMessageHeader messageHeader,
- ITargetBlock target,
- out bool messageConsumed)
- => ((ISourceBlock)source_).ConsumeMessage(messageHeader,
- target,
- out messageConsumed);
-
- ///
- public bool ReserveMessage(DataflowMessageHeader messageHeader,
- ITargetBlock target)
- => ((ISourceBlock)source_).ReserveMessage(messageHeader,
- target);
-
- ///
- public void ReleaseReservation(DataflowMessageHeader messageHeader,
- ITargetBlock target)
- => ((ISourceBlock)source_).ReleaseReservation(messageHeader,
- target);
-
- ///
- public bool TryReceive(Predicate filter,
- out T[] item)
- => source_.TryReceive(filter,
- out item);
-
- ///
- public bool TryReceiveAll(out IList items)
- => source_.TryReceiveAll(out items);
-
- ///
- /// Create an ActionBlock with a delegated function to execute
- /// at the end of pipeline
- ///
- /// the method to call
- public void ExecuteAsync(Action action)
- {
- var actBlock = new ActionBlock(action,
- executionDataFlowBlockOptions_);
-
- timeoutTransformBlock_.LinkTo(actBlock,
- new DataflowLinkOptions
- {
- PropagateCompletion = true,
- });
- }
-
- ///
- /// Trigger the batch even if it doesn't criteria to submit
- ///
- public void TriggerBatch()
- => source_.TriggerBatch();
-}
diff --git a/Client/src/Unified/Services/Submitter/Service.cs b/Client/src/Unified/Services/Submitter/Service.cs
index 8dfa34c4..a1130831 100644
--- a/Client/src/Unified/Services/Submitter/Service.cs
+++ b/Client/src/Unified/Services/Submitter/Service.cs
@@ -19,8 +19,8 @@
using System.IO;
using System.Linq;
using System.Threading;
+using System.Threading.Channels;
using System.Threading.Tasks;
-using System.Threading.Tasks.Dataflow;
using ArmoniK.Api.Common.Utils;
using ArmoniK.Api.gRPC.V1;
@@ -52,11 +52,6 @@ namespace ArmoniK.DevelopmentKit.Client.Unified.Services.Submitter;
[MarkDownDoc]
public class Service : AbstractClientService, ISubmitterService
{
- private readonly RequestTaskMap requestTaskMap_ = new();
-
-
- private readonly SemaphoreSlim semaphoreSlim_;
-
///
/// The default constructor to open connection with the control plane
/// and create the session to ArmoniK
@@ -67,156 +62,74 @@ public Service(Properties properties,
ILoggerFactory? loggerFactory = null)
: base(loggerFactory)
{
- var timeOutSending = properties.TimeTriggerBuffer ?? TimeSpan.FromSeconds(1);
-
- var maxTasksPerBuffer = properties.MaxTasksPerBuffer;
+ SessionServiceFactory = new SessionServiceFactory(LoggerFactory);
+ SessionService = SessionServiceFactory.CreateSession(properties);
+ CancellationResultTaskSource = new CancellationTokenSource();
+ Logger = LoggerFactory.CreateLogger();
+ Logger.BeginPropertyScope(("SessionId", SessionService.SessionId),
+ ("Class", "Service"));
- semaphoreSlim_ = new SemaphoreSlim(properties.MaxConcurrentBuffers * maxTasksPerBuffer);
+ var submitChannel = Channel.CreateUnbounded();
+ SubmitChannel = submitChannel.Writer;
+
+ var cancellationToken = CancellationResultTaskSource.Token;
+ var requests = submitChannel.Reader.ToAsyncEnumerable(cancellationToken);
+
+ SubmitTask = Task.Run(() => requests.ToChunksAsync(properties.MaxTasksPerBuffer,
+ properties.TimeTriggerBuffer ?? TimeSpan.FromSeconds(1),
+ cancellationToken)
+ .ParallelForEach(new ParallelTaskOptions(properties.MaxConcurrentBuffers,
+ cancellationToken),
+ async chunk =>
+ {
+ Logger?.LogInformation("Submitting batch of {NbTask}/{MaxTask}",
+ chunk.Length,
+ properties.MaxTasksPerBuffer);
+
+ List<(string taskId, string resultId)> tasks;
+ try
+ {
+ var taskRequests = chunk.Select(req => ((string?)null, req.Payload, req.Dependencies, req.TaskOptions));
+ var response = SessionService.ChunkSubmitTasksWithDependenciesAsync(taskRequests,
+ cancellationToken: cancellationToken);
+ tasks = await response.ToListAsync(cancellationToken)
+ .ConfigureAwait(false);
+ }
+ catch (Exception e)
+ {
+ Logger?.LogError(e,
+ "Error while submitting tasks");
+ foreach (var req in chunk)
+ {
+ req.Tcs.SetException(e);
+ }
- SessionServiceFactory = new SessionServiceFactory(LoggerFactory);
+ return;
+ }
- SessionService = SessionServiceFactory.CreateSession(properties);
+ foreach (var (task, submission) in tasks.Zip(chunk,
+ (s,
+ submission) => (s, submission)))
+ {
+ ResultHandlerDictionary[task.taskId] = submission.Handler;
+ submission.Tcs.SetResult(task.taskId);
+ }
+ }));
- CancellationResultTaskSource = new CancellationTokenSource();
HandlerResponse = Task.Run(ResultTask,
CancellationResultTaskSource.Token);
-
- Logger = LoggerFactory.CreateLogger();
- Logger.BeginPropertyScope(("SessionId", SessionService.SessionId),
- ("Class", "Service"));
-
- BufferSubmit = new BatchUntilInactiveBlock(maxTasksPerBuffer,
- timeOutSending,
- new ExecutionDataflowBlockOptions
- {
- BoundedCapacity = properties.MaxParallelChannels,
- MaxDegreeOfParallelism = properties.MaxParallelChannels,
- });
-
- BufferSubmit.ExecuteAsync(blockRequests =>
- {
- var blockRequestList = blockRequests.ToList();
-
- try
- {
- if (blockRequestList.Count == 0)
- {
- return;
- }
-
- Logger.LogInformation("Submitting buffer of {count} task...",
- blockRequestList.Count);
- var query = blockRequestList.GroupBy(blockRequest => blockRequest.TaskOptions);
-
- foreach (var groupBlockRequest in query)
- {
- var maxRetries = groupBlockRequest.First()
- .MaxRetries;
- //Generate resultId
- var resultsIds = SessionService.CreateResultsMetadata(groupBlockRequest.Select(_ => Guid.NewGuid()
- .ToString()))
- .Values.ToList();
-
- foreach (var (request, index) in groupBlockRequest.Select((r,
- i) => (r, i)))
- {
- request.ResultId = resultsIds[index];
- }
-
- var currentBackoff = properties.RetryInitialBackoff;
- for (var retry = 0; retry < maxRetries; retry++)
- {
- try
- {
- var taskIds =
- SessionService.SubmitTasksWithDependencies(groupBlockRequest.Select(x => new Tuple>(x.ResultId,
- x.Payload!
- .Serialize(),
- Array
- .Empty<
- string>())),
- 1,
- groupBlockRequest.First()
- .TaskOptions);
-
-
- var ids = taskIds.ToList();
- var mapTaskResults = SessionService.GetResultIds(ids);
- var taskIdsResultIds = mapTaskResults.ToDictionary(result => result.ResultIds.Single(),
- result => result.TaskId);
-
-
- foreach (var pairTaskIdResultId in taskIdsResultIds)
- {
- var blockRequest = groupBlockRequest.FirstOrDefault(x => x.ResultId == pairTaskIdResultId.Key) ??
- throw new InvalidOperationException($"Cannot find BlockRequest with result id {pairTaskIdResultId.Value}");
-
- ResultHandlerDictionary[pairTaskIdResultId.Value] = blockRequest.Handler;
-
- requestTaskMap_.PutResponse(blockRequest.SubmitId,
- pairTaskIdResultId.Value);
- }
-
- if (ids.Count > taskIdsResultIds.Count)
- {
- Logger.LogWarning("Fail to submit all tasks at once, retry with missing tasks");
-
- throw new Exception("Fail to submit all tasks at once. Retrying...");
- }
-
- break;
- }
- catch (Exception e)
- {
- if (retry >= maxRetries - 1)
- {
- Logger.LogError(e,
- "Fail to retry {count} times of submission. Stop trying to submit",
- maxRetries);
- throw;
- }
-
- Logger?.LogWarning(e,
- "Fail to submit, {retry}/{maxRetries} retrying",
- retry + 1,
- maxRetries);
-
- //Delay before submission
- Task.Delay(currentBackoff)
- .Wait();
- currentBackoff = TimeSpan.FromSeconds(Math.Min(currentBackoff.TotalSeconds * properties.RetryBackoffMultiplier,
- properties.RetryMaxBackoff.TotalSeconds));
- }
- }
-
- foreach (var blockRequest in groupBlockRequest)
- {
- blockRequest.Lock.Release();
- }
- }
- }
- catch (Exception e)
- {
- Logger.LogError(e,
- "Fail to submit buffer with {count} tasks inside",
- blockRequestList.Count);
-
- requestTaskMap_.BufferFailures(blockRequestList.Select(block => block.SubmitId),
- e);
- }
- });
}
+ private Task SubmitTask { get; }
+ private ChannelWriter SubmitChannel { get; }
+
///
/// Property Get the SessionId
///
[PublicAPI]
public SessionService SessionService { get; }
-
- private BatchUntilInactiveBlock BufferSubmit { get; }
-
private ILogger Logger { get; }
private SessionServiceFactory SessionServiceFactory { get; }
@@ -299,9 +212,22 @@ public async Task SubmitAsync(string methodName,
public override void Dispose()
{
CancellationResultTaskSource.Cancel();
- HandlerResponse.Wait();
- HandlerResponse.Dispose();
- semaphoreSlim_.Dispose();
+
+ foreach (var awaitable in new[]
+ {
+ HandlerResponse,
+ SubmitTask,
+ })
+ {
+ try
+ {
+ awaitable.WaitSync();
+ awaitable.Dispose();
+ }
+ catch (OperationCanceledException)
+ {
+ }
+ }
GC.SuppressFinalize(this);
}
@@ -365,31 +291,18 @@ private async Task SubmitAsync(string methodName,
bool serializedArguments,
CancellationToken token)
{
- await semaphoreSlim_.WaitAsync(token);
-
- return await SubmitAsync(new BlockRequest
- {
- SubmitId = Guid.NewGuid(),
- Payload = new ArmonikPayload(methodName,
- argument,
- serializedArguments),
- Handler = handler,
- MaxRetries = maxRetries,
- TaskOptions = taskOptions ?? SessionService.TaskOptions,
- Lock = semaphoreSlim_,
- },
- token)
- .ConfigureAwait(false);
- }
-
- private async Task SubmitAsync(BlockRequest blockRequest,
- CancellationToken token = default)
- {
- await BufferSubmit.SendAsync(blockRequest,
- token)
- .ConfigureAwait(false);
-
- return await requestTaskMap_.GetResponseAsync(blockRequest.SubmitId);
+ var tcs = new TaskCompletionSource();
+ await SubmitChannel.WriteAsync(new TaskSubmission(new ArmonikPayload(methodName,
+ argument,
+ serializedArguments).Serialize(),
+ Array.Empty(),
+ taskOptions,
+ handler,
+ tcs),
+ token)
+ .ConfigureAwait(false);
+
+ return await tcs.Task.ConfigureAwait(false);
}
///
@@ -800,6 +713,12 @@ private void ResultTask()
public ObjectPool GetChannelPool()
=> SessionService.ChannelPool;
+ private record TaskSubmission(byte[] Payload,
+ IList Dependencies,
+ TaskOptions? TaskOptions,
+ IServiceInvocationHandler Handler,
+ TaskCompletionSource? Tcs);
+
///
/// Class to return TaskId and the result
///