Skip to content

Commit

Permalink
Fixed issue causing synchronous functions to block execution pipeline.
Browse files Browse the repository at this point in the history
  • Loading branch information
fabiocav committed May 8, 2023
1 parent 1c9b7e0 commit 7782ec9
Show file tree
Hide file tree
Showing 8 changed files with 206 additions and 157 deletions.
2 changes: 1 addition & 1 deletion release_notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@
### Microsoft.Azure.Functions.Worker.Core <version>
- <entry>
### Microsoft.Azure.Functions.Worker.Grpc <version>
- <entry>
- Fixed an issue causing throughput degradation and for synchronous functions, blocked the execution pipeline. (#1516)
2 changes: 1 addition & 1 deletion src/DotNetWorker.Core/Hosting/WorkerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class WorkerOptions
};

/// <summary>
/// Gets and sets the flag for opting in to unwrapping user-code-thrown
/// Gets or sets the flag for opting in to unwrapping user-code-thrown
/// exceptions when they are surfaced to the Host.
/// </summary>
public bool EnableUserCodeException
Expand Down
2 changes: 2 additions & 0 deletions src/DotNetWorker.Grpc/GrpcServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Azure.Functions.Worker.Handlers;

namespace Microsoft.Extensions.DependencyInjection
{
Expand Down Expand Up @@ -49,6 +50,7 @@ public static IServiceCollection AddGrpc(this IServiceCollection services)

// gRPC Core services
services.AddSingleton<IWorker, GrpcWorker>();
services.TryAddSingleton<IInvocationHandler, InvocationHandler>();

#if NET5_0_OR_GREATER
// If we are running in the native host process, use the native client
Expand Down
24 changes: 4 additions & 20 deletions src/DotNetWorker.Grpc/GrpcWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,12 @@
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Azure.Core.Serialization;
using Microsoft.Azure.Functions.Worker.Context.Features;
using Microsoft.Azure.Functions.Worker.Core.FunctionMetadata;
using Microsoft.Azure.Functions.Worker.Grpc;
using Microsoft.Azure.Functions.Worker.Grpc.FunctionMetadata;
using Microsoft.Azure.Functions.Worker.Grpc.Messages;
using Microsoft.Azure.Functions.Worker.Handlers;
using Microsoft.Azure.Functions.Worker.Invocation;
using Microsoft.Azure.Functions.Worker.OutputBindings;
using Microsoft.Azure.Functions.Worker.Rpc;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
Expand All @@ -28,12 +25,8 @@ namespace Microsoft.Azure.Functions.Worker
internal class GrpcWorker : IWorker, IMessageProcessor
{
private readonly IFunctionsApplication _application;
private readonly IInvocationFeaturesFactory _invocationFeaturesFactory;
private readonly IOutputBindingsInfoProvider _outputBindingsInfoProvider;
private readonly IInputConversionFeatureProvider _inputConversionFeatureProvider;
private readonly IMethodInfoLocator _methodInfoLocator;
private readonly WorkerOptions _workerOptions;
private readonly ObjectSerializer _serializer;
private readonly IHostApplicationLifetime _hostApplicationLifetime;
private readonly IWorkerClientFactory _workerClientFactory;
private readonly IInvocationHandler _invocationHandler;
Expand All @@ -42,29 +35,20 @@ internal class GrpcWorker : IWorker, IMessageProcessor

public GrpcWorker(IFunctionsApplication application,
IWorkerClientFactory workerClientFactory,
IInvocationFeaturesFactory invocationFeaturesFactory,
IOutputBindingsInfoProvider outputBindingsInfoProvider,
IMethodInfoLocator methodInfoLocator,
IOptions<WorkerOptions> workerOptions,
IInputConversionFeatureProvider inputConversionFeatureProvider,
IFunctionMetadataProvider functionMetadataProvider,
IHostApplicationLifetime hostApplicationLifetime,
ILogger<GrpcWorker> logger)
IInvocationHandler invocationHandler)
{
_hostApplicationLifetime = hostApplicationLifetime ?? throw new ArgumentNullException(nameof(hostApplicationLifetime));
_workerClientFactory = workerClientFactory ?? throw new ArgumentNullException(nameof(workerClientFactory));
_application = application ?? throw new ArgumentNullException(nameof(application));
_invocationFeaturesFactory = invocationFeaturesFactory ?? throw new ArgumentNullException(nameof(invocationFeaturesFactory));
_outputBindingsInfoProvider = outputBindingsInfoProvider ?? throw new ArgumentNullException(nameof(outputBindingsInfoProvider));
_methodInfoLocator = methodInfoLocator ?? throw new ArgumentNullException(nameof(methodInfoLocator));

_workerOptions = workerOptions?.Value ?? throw new ArgumentNullException(nameof(workerOptions));
_serializer = workerOptions.Value.Serializer ?? throw new InvalidOperationException(nameof(workerOptions.Value.Serializer));
_inputConversionFeatureProvider = inputConversionFeatureProvider ?? throw new ArgumentNullException(nameof(inputConversionFeatureProvider));
_functionMetadataProvider = functionMetadataProvider ?? throw new ArgumentNullException(nameof(functionMetadataProvider));

// Handlers (TODO: dependency inject handlers instead of creating here)
_invocationHandler = new InvocationHandler(_application, _invocationFeaturesFactory, _serializer, _outputBindingsInfoProvider, _inputConversionFeatureProvider, logger);
_invocationHandler = invocationHandler;
}

public Task StartAsync(CancellationToken token)
Expand All @@ -78,7 +62,7 @@ public Task StartAsync(CancellationToken token)
Task IMessageProcessor.ProcessMessageAsync(StreamingMessage message)
{
// Dispatch and return.
_ = ProcessRequestCoreAsync(message);
Task.Run(() => ProcessRequestCoreAsync(message));

return Task.CompletedTask;
}
Expand Down Expand Up @@ -134,7 +118,7 @@ private async Task ProcessRequestCoreAsync(StreamingMessage request)

internal Task<InvocationResponse> InvocationRequestHandlerAsync(InvocationRequest request)
{
return _invocationHandler.InvokeAsync(request, _workerOptions);
return _invocationHandler.InvokeAsync(request);
}

internal void InvocationCancelRequestHandler(InvocationCancel request)
Expand Down
4 changes: 1 addition & 3 deletions src/DotNetWorker.Grpc/Handlers/IInvocationHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Functions.Worker.Grpc.Messages;
using Microsoft.Extensions.Options;

namespace Microsoft.Azure.Functions.Worker.Handlers
{
Expand All @@ -16,9 +15,8 @@ internal interface IInvocationHandler
/// an associated cancellation token source for the invocation.
/// </summary>
/// <param name="request">Function invocation request</param>
/// <param name="workerOptions"></param>
/// <returns><see cref="InvocationResponse"/></returns>
Task<InvocationResponse> InvokeAsync(InvocationRequest request, WorkerOptions? workerOptions = null);
Task<InvocationResponse> InvokeAsync(InvocationRequest request);

/// <summary>
/// Cancels an invocation's associated <see cref="CancellationTokenSource"/>.
Expand Down
27 changes: 16 additions & 11 deletions src/DotNetWorker.Grpc/Handlers/InvocationHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using Azure.Core.Serialization;
using Microsoft.Azure.Functions.Worker.Context.Features;
using Microsoft.Azure.Functions.Worker.Grpc;
using Microsoft.Azure.Functions.Worker.Grpc.Features;
Expand All @@ -23,45 +22,50 @@ internal class InvocationHandler : IInvocationHandler
private readonly IInvocationFeaturesFactory _invocationFeaturesFactory;
private readonly IOutputBindingsInfoProvider _outputBindingsInfoProvider;
private readonly IInputConversionFeatureProvider _inputConversionFeatureProvider;
private readonly ObjectSerializer _serializer;
private readonly WorkerOptions _workerOptions;
private readonly ILogger _logger;

private ConcurrentDictionary<string, CancellationTokenSource> _inflightInvocations;

public InvocationHandler(
IFunctionsApplication application,
IInvocationFeaturesFactory invocationFeaturesFactory,
ObjectSerializer serializer,
IOutputBindingsInfoProvider outputBindingsInfoProvider,
IInputConversionFeatureProvider inputConversionFeatureProvider,
ILogger logger)
IOptions<WorkerOptions> workerOptions,
ILogger<InvocationHandler> logger)
{
_application = application ?? throw new ArgumentNullException(nameof(application));
_invocationFeaturesFactory = invocationFeaturesFactory ?? throw new ArgumentNullException(nameof(invocationFeaturesFactory));
_serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
_outputBindingsInfoProvider = outputBindingsInfoProvider ?? throw new ArgumentNullException(nameof(outputBindingsInfoProvider));
_inputConversionFeatureProvider = inputConversionFeatureProvider ?? throw new ArgumentNullException(nameof(inputConversionFeatureProvider));
_workerOptions = workerOptions?.Value ?? throw new ArgumentNullException(nameof(workerOptions));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));

_inflightInvocations = new ConcurrentDictionary<string, CancellationTokenSource>();

if (_workerOptions.Serializer is null)
{
throw new InvalidOperationException($"The {nameof(WorkerOptions)}.{nameof(WorkerOptions.Serializer)} is null");
}
}

public async Task<InvocationResponse> InvokeAsync(InvocationRequest request, WorkerOptions? workerOptions = null)
public async Task<InvocationResponse> InvokeAsync(InvocationRequest request)
{
bool enableUserCodeException = workerOptions is not null && workerOptions.EnableUserCodeException;
using CancellationTokenSource cancellationTokenSource = new();
FunctionContext? context = null;
InvocationResponse response = new()
{
InvocationId = request.InvocationId,
Result = new StatusResult()
Result = new StatusResult()
};

if (!_inflightInvocations.TryAdd(request.InvocationId, cancellationTokenSource))
{
var exception = new InvalidOperationException("Unable to track CancellationTokenSource");
response.Result.Status = StatusResult.Types.Status.Failure;
response.Result.Exception = exception.ToRpcException();

return response;
}

Expand All @@ -83,6 +87,7 @@ public async Task<InvocationResponse> InvokeAsync(InvocationRequest request, Wor

await _application.InvokeFunctionAsync(context);

var serializer = _workerOptions.Serializer!;
var functionBindings = context.GetBindings();

foreach (var binding in functionBindings.OutputBindingData)
Expand All @@ -94,23 +99,23 @@ public async Task<InvocationResponse> InvokeAsync(InvocationRequest request, Wor

if (binding.Value is not null)
{
parameterBinding.Data = await binding.Value.ToRpcAsync(_serializer);
parameterBinding.Data = await binding.Value.ToRpcAsync(serializer);
}

response.OutputData.Add(parameterBinding);
}

if (functionBindings.InvocationResult is not null)
{
TypedData? returnVal = await functionBindings.InvocationResult.ToRpcAsync(_serializer);
TypedData? returnVal = await functionBindings.InvocationResult.ToRpcAsync(serializer);
response.ReturnValue = returnVal;
}

response.Result.Status = StatusResult.Types.Status.Success;
}
catch (Exception ex)
{
response.Result.Exception = enableUserCodeException ? ex.ToUserRpcException() : ex.ToRpcException();
response.Result.Exception = _workerOptions.EnableUserCodeException ? ex.ToUserRpcException() : ex.ToRpcException();
response.Result.Status = StatusResult.Types.Status.Failure;

if (ex.InnerException is TaskCanceledException or OperationCanceledException)
Expand Down
Loading

0 comments on commit 7782ec9

Please sign in to comment.