From 955bddd018c853299cd70c9cda323f9411224885 Mon Sep 17 00:00:00 2001 From: lemaitre-aneo Date: Fri, 19 Jul 2024 17:30:47 +0200 Subject: [PATCH 1/2] Add thread to Worker.Process --- .../src/DLLWorker/Services/ComputerService.cs | 38 +++++++++++++++++-- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/Worker/src/DLLWorker/Services/ComputerService.cs b/Worker/src/DLLWorker/Services/ComputerService.cs index 66ff9b14..96b53e10 100644 --- a/Worker/src/DLLWorker/Services/ComputerService.cs +++ b/Worker/src/DLLWorker/Services/ComputerService.cs @@ -1,6 +1,6 @@ // This file is part of the ArmoniK project // -// Copyright (C) ANEO, 2021-2023. All rights reserved. +// Copyright (C) ANEO, 2021-2024. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License") // you may not use this file except in compliance with the License. @@ -18,6 +18,8 @@ using System.Collections.Generic; using System.Diagnostics; using System.Linq; +using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using ArmoniK.Api.Common.Channel.Utils; @@ -26,6 +28,7 @@ using ArmoniK.Api.Worker.Worker; using ArmoniK.DevelopmentKit.Common; using ArmoniK.DevelopmentKit.Common.Exceptions; +using ArmoniK.Utils; using Grpc.Core; @@ -36,6 +39,8 @@ namespace ArmoniK.DevelopmentKit.Worker.DLLWorker.Services; public class ComputerService : WorkerStreamWrapper { + private readonly ChannelWriter<(ArmonikServiceWorker, ITaskHandler, TaskCompletionSource)> channel_; + public ComputerService(IConfiguration configuration, GrpcChannelProvider provider, ServiceRequestContext serviceRequestContext) @@ -45,6 +50,30 @@ public ComputerService(IConfiguration configuration, Configuration = configuration; Logger = serviceRequestContext.LoggerFactory.CreateLogger(); ServiceRequestContext = serviceRequestContext; + + var channel = Channel.CreateBounded<(ArmonikServiceWorker, ITaskHandler, TaskCompletionSource)>(1); + var channelReader = channel.Reader; + channel_ = channel.Writer; + new Thread(() => + { + var requests = channelReader.ToAsyncEnumerable(CancellationToken.None) + .ToEnumerable(); + foreach (var (service, taskHandler, tcs) in requests) + { + try + { + tcs.SetResult(service.Execute(taskHandler)); + } + catch (Exception e) + { + tcs.SetException(e); + } + } + }) + { + IsBackground = true, + }.Start(); + Logger.LogDebug("Starting worker...OK"); } @@ -122,8 +151,11 @@ public override async Task Process(ITaskHandler taskHandler) ServiceRequestContext.SessionId = sessionIdCaller; Logger.LogInformation("Executing task"); - var sw = Stopwatch.StartNew(); - var result = serviceWorker.Execute(taskHandler); + var sw = Stopwatch.StartNew(); + var tcs = new TaskCompletionSource(); + await channel_.WriteAsync((serviceWorker, taskHandler, tcs)) + .ConfigureAwait(false); + var result = await tcs.Task.ConfigureAwait(false); if (result != null) { From 27a215b451e28c78c37ff0a885ecba5bbe0edd81 Mon Sep 17 00:00:00 2001 From: Florian Lemaitre Date: Mon, 22 Jul 2024 10:48:10 +0200 Subject: [PATCH 2/2] Avoid service recreation --- Worker/src/DLLWorker/Program.cs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/Worker/src/DLLWorker/Program.cs b/Worker/src/DLLWorker/Program.cs index eb7b5178..0a15ae1f 100644 --- a/Worker/src/DLLWorker/Program.cs +++ b/Worker/src/DLLWorker/Program.cs @@ -26,5 +26,9 @@ AppContext.SetSwitch("System.Net.SocketsHttpHandler.Http2FlowControl.DisableDynamicWindowSizing", true); -WorkerServer.Create(serviceConfigurator: collection => collection.AddSingleton()) +WorkerServer.Create(serviceConfigurator: collection => + { + collection.AddSingleton(); + collection.AddSingleton(); + }) .Run();