diff --git a/release_notes.md b/release_notes.md index 61f31d477..d65436283 100644 --- a/release_notes.md +++ b/release_notes.md @@ -10,4 +10,4 @@ ### Microsoft.Azure.Functions.Worker.Core - ### Microsoft.Azure.Functions.Worker.Grpc -- +- Fixed an issue causing throughput degradation and for synchronous functions, blocked the execution pipeline. (#1516) diff --git a/src/DotNetWorker.Core/Hosting/WorkerOptions.cs b/src/DotNetWorker.Core/Hosting/WorkerOptions.cs index d2d58909c..acd500c8b 100644 --- a/src/DotNetWorker.Core/Hosting/WorkerOptions.cs +++ b/src/DotNetWorker.Core/Hosting/WorkerOptions.cs @@ -35,7 +35,7 @@ public class WorkerOptions }; /// - /// Gets and sets the flag for opting in to unwrapping user-code-thrown + /// Gets or sets the flag for opting in to unwrapping user-code-thrown /// exceptions when they are surfaced to the Host. /// public bool EnableUserCodeException diff --git a/src/DotNetWorker.Grpc/GrpcServiceCollectionExtensions.cs b/src/DotNetWorker.Grpc/GrpcServiceCollectionExtensions.cs index 9ba5842db..d95637d4b 100644 --- a/src/DotNetWorker.Grpc/GrpcServiceCollectionExtensions.cs +++ b/src/DotNetWorker.Grpc/GrpcServiceCollectionExtensions.cs @@ -12,6 +12,7 @@ using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Azure.Functions.Worker.Handlers; namespace Microsoft.Extensions.DependencyInjection { @@ -49,6 +50,7 @@ public static IServiceCollection AddGrpc(this IServiceCollection services) // gRPC Core services services.AddSingleton(); + services.TryAddSingleton(); #if NET5_0_OR_GREATER // If we are running in the native host process, use the native client diff --git a/src/DotNetWorker.Grpc/GrpcWorker.cs b/src/DotNetWorker.Grpc/GrpcWorker.cs index 806044cea..0745fcdb2 100644 --- a/src/DotNetWorker.Grpc/GrpcWorker.cs +++ b/src/DotNetWorker.Grpc/GrpcWorker.cs @@ -8,15 +8,12 @@ using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; -using Azure.Core.Serialization; -using Microsoft.Azure.Functions.Worker.Context.Features; using Microsoft.Azure.Functions.Worker.Core.FunctionMetadata; using Microsoft.Azure.Functions.Worker.Grpc; using Microsoft.Azure.Functions.Worker.Grpc.FunctionMetadata; using Microsoft.Azure.Functions.Worker.Grpc.Messages; using Microsoft.Azure.Functions.Worker.Handlers; using Microsoft.Azure.Functions.Worker.Invocation; -using Microsoft.Azure.Functions.Worker.OutputBindings; using Microsoft.Azure.Functions.Worker.Rpc; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; @@ -28,12 +25,8 @@ namespace Microsoft.Azure.Functions.Worker internal class GrpcWorker : IWorker, IMessageProcessor { private readonly IFunctionsApplication _application; - private readonly IInvocationFeaturesFactory _invocationFeaturesFactory; - private readonly IOutputBindingsInfoProvider _outputBindingsInfoProvider; - private readonly IInputConversionFeatureProvider _inputConversionFeatureProvider; private readonly IMethodInfoLocator _methodInfoLocator; private readonly WorkerOptions _workerOptions; - private readonly ObjectSerializer _serializer; private readonly IHostApplicationLifetime _hostApplicationLifetime; private readonly IWorkerClientFactory _workerClientFactory; private readonly IInvocationHandler _invocationHandler; @@ -42,29 +35,20 @@ internal class GrpcWorker : IWorker, IMessageProcessor public GrpcWorker(IFunctionsApplication application, IWorkerClientFactory workerClientFactory, - IInvocationFeaturesFactory invocationFeaturesFactory, - IOutputBindingsInfoProvider outputBindingsInfoProvider, IMethodInfoLocator methodInfoLocator, IOptions workerOptions, - IInputConversionFeatureProvider inputConversionFeatureProvider, IFunctionMetadataProvider functionMetadataProvider, IHostApplicationLifetime hostApplicationLifetime, - ILogger logger) + IInvocationHandler invocationHandler) { _hostApplicationLifetime = hostApplicationLifetime ?? throw new ArgumentNullException(nameof(hostApplicationLifetime)); _workerClientFactory = workerClientFactory ?? throw new ArgumentNullException(nameof(workerClientFactory)); _application = application ?? throw new ArgumentNullException(nameof(application)); - _invocationFeaturesFactory = invocationFeaturesFactory ?? throw new ArgumentNullException(nameof(invocationFeaturesFactory)); - _outputBindingsInfoProvider = outputBindingsInfoProvider ?? throw new ArgumentNullException(nameof(outputBindingsInfoProvider)); _methodInfoLocator = methodInfoLocator ?? throw new ArgumentNullException(nameof(methodInfoLocator)); - _workerOptions = workerOptions?.Value ?? throw new ArgumentNullException(nameof(workerOptions)); - _serializer = workerOptions.Value.Serializer ?? throw new InvalidOperationException(nameof(workerOptions.Value.Serializer)); - _inputConversionFeatureProvider = inputConversionFeatureProvider ?? throw new ArgumentNullException(nameof(inputConversionFeatureProvider)); _functionMetadataProvider = functionMetadataProvider ?? throw new ArgumentNullException(nameof(functionMetadataProvider)); - // Handlers (TODO: dependency inject handlers instead of creating here) - _invocationHandler = new InvocationHandler(_application, _invocationFeaturesFactory, _serializer, _outputBindingsInfoProvider, _inputConversionFeatureProvider, logger); + _invocationHandler = invocationHandler; } public Task StartAsync(CancellationToken token) @@ -78,7 +62,7 @@ public Task StartAsync(CancellationToken token) Task IMessageProcessor.ProcessMessageAsync(StreamingMessage message) { // Dispatch and return. - _ = ProcessRequestCoreAsync(message); + Task.Run(() => ProcessRequestCoreAsync(message)); return Task.CompletedTask; } @@ -134,7 +118,7 @@ private async Task ProcessRequestCoreAsync(StreamingMessage request) internal Task InvocationRequestHandlerAsync(InvocationRequest request) { - return _invocationHandler.InvokeAsync(request, _workerOptions); + return _invocationHandler.InvokeAsync(request); } internal void InvocationCancelRequestHandler(InvocationCancel request) diff --git a/src/DotNetWorker.Grpc/Handlers/IInvocationHandler.cs b/src/DotNetWorker.Grpc/Handlers/IInvocationHandler.cs index 1b931975a..fc9135328 100644 --- a/src/DotNetWorker.Grpc/Handlers/IInvocationHandler.cs +++ b/src/DotNetWorker.Grpc/Handlers/IInvocationHandler.cs @@ -4,7 +4,6 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Functions.Worker.Grpc.Messages; -using Microsoft.Extensions.Options; namespace Microsoft.Azure.Functions.Worker.Handlers { @@ -16,9 +15,8 @@ internal interface IInvocationHandler /// an associated cancellation token source for the invocation. /// /// Function invocation request - /// /// - Task InvokeAsync(InvocationRequest request, WorkerOptions? workerOptions = null); + Task InvokeAsync(InvocationRequest request); /// /// Cancels an invocation's associated . diff --git a/src/DotNetWorker.Grpc/Handlers/InvocationHandler.cs b/src/DotNetWorker.Grpc/Handlers/InvocationHandler.cs index 7b5aee58b..e78ac87b1 100644 --- a/src/DotNetWorker.Grpc/Handlers/InvocationHandler.cs +++ b/src/DotNetWorker.Grpc/Handlers/InvocationHandler.cs @@ -5,7 +5,6 @@ using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; -using Azure.Core.Serialization; using Microsoft.Azure.Functions.Worker.Context.Features; using Microsoft.Azure.Functions.Worker.Grpc; using Microsoft.Azure.Functions.Worker.Grpc.Features; @@ -23,7 +22,7 @@ internal class InvocationHandler : IInvocationHandler private readonly IInvocationFeaturesFactory _invocationFeaturesFactory; private readonly IOutputBindingsInfoProvider _outputBindingsInfoProvider; private readonly IInputConversionFeatureProvider _inputConversionFeatureProvider; - private readonly ObjectSerializer _serializer; + private readonly WorkerOptions _workerOptions; private readonly ILogger _logger; private ConcurrentDictionary _inflightInvocations; @@ -31,30 +30,34 @@ internal class InvocationHandler : IInvocationHandler public InvocationHandler( IFunctionsApplication application, IInvocationFeaturesFactory invocationFeaturesFactory, - ObjectSerializer serializer, IOutputBindingsInfoProvider outputBindingsInfoProvider, IInputConversionFeatureProvider inputConversionFeatureProvider, - ILogger logger) + IOptions workerOptions, + ILogger logger) { _application = application ?? throw new ArgumentNullException(nameof(application)); _invocationFeaturesFactory = invocationFeaturesFactory ?? throw new ArgumentNullException(nameof(invocationFeaturesFactory)); - _serializer = serializer ?? throw new ArgumentNullException(nameof(serializer)); _outputBindingsInfoProvider = outputBindingsInfoProvider ?? throw new ArgumentNullException(nameof(outputBindingsInfoProvider)); _inputConversionFeatureProvider = inputConversionFeatureProvider ?? throw new ArgumentNullException(nameof(inputConversionFeatureProvider)); + _workerOptions = workerOptions?.Value ?? throw new ArgumentNullException(nameof(workerOptions)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _inflightInvocations = new ConcurrentDictionary(); + + if (_workerOptions.Serializer is null) + { + throw new InvalidOperationException($"The {nameof(WorkerOptions)}.{nameof(WorkerOptions.Serializer)} is null"); + } } - public async Task InvokeAsync(InvocationRequest request, WorkerOptions? workerOptions = null) + public async Task InvokeAsync(InvocationRequest request) { - bool enableUserCodeException = workerOptions is not null && workerOptions.EnableUserCodeException; using CancellationTokenSource cancellationTokenSource = new(); FunctionContext? context = null; InvocationResponse response = new() { InvocationId = request.InvocationId, - Result = new StatusResult() + Result = new StatusResult() }; if (!_inflightInvocations.TryAdd(request.InvocationId, cancellationTokenSource)) @@ -62,6 +65,7 @@ public async Task InvokeAsync(InvocationRequest request, Wor var exception = new InvalidOperationException("Unable to track CancellationTokenSource"); response.Result.Status = StatusResult.Types.Status.Failure; response.Result.Exception = exception.ToRpcException(); + return response; } @@ -83,6 +87,7 @@ public async Task InvokeAsync(InvocationRequest request, Wor await _application.InvokeFunctionAsync(context); + var serializer = _workerOptions.Serializer!; var functionBindings = context.GetBindings(); foreach (var binding in functionBindings.OutputBindingData) @@ -94,7 +99,7 @@ public async Task InvokeAsync(InvocationRequest request, Wor if (binding.Value is not null) { - parameterBinding.Data = await binding.Value.ToRpcAsync(_serializer); + parameterBinding.Data = await binding.Value.ToRpcAsync(serializer); } response.OutputData.Add(parameterBinding); @@ -102,7 +107,7 @@ public async Task InvokeAsync(InvocationRequest request, Wor if (functionBindings.InvocationResult is not null) { - TypedData? returnVal = await functionBindings.InvocationResult.ToRpcAsync(_serializer); + TypedData? returnVal = await functionBindings.InvocationResult.ToRpcAsync(serializer); response.ReturnValue = returnVal; } @@ -110,7 +115,7 @@ public async Task InvokeAsync(InvocationRequest request, Wor } catch (Exception ex) { - response.Result.Exception = enableUserCodeException ? ex.ToUserRpcException() : ex.ToRpcException(); + response.Result.Exception = _workerOptions.EnableUserCodeException ? ex.ToUserRpcException() : ex.ToRpcException(); response.Result.Status = StatusResult.Types.Status.Failure; if (ex.InnerException is TaskCanceledException or OperationCanceledException) diff --git a/test/DotNetWorkerTests/GrpcWorkerTests.cs b/test/DotNetWorkerTests/GrpcWorkerTests.cs index 61edd17a2..a19a4a8fe 100644 --- a/test/DotNetWorkerTests/GrpcWorkerTests.cs +++ b/test/DotNetWorkerTests/GrpcWorkerTests.cs @@ -12,12 +12,15 @@ using Azure.Core.Serialization; using Microsoft.Azure.Functions.Tests; using Microsoft.Azure.Functions.Worker.Context.Features; +using Microsoft.Azure.Functions.Worker.Core.FunctionMetadata; +using Microsoft.Azure.Functions.Worker.Grpc; using Microsoft.Azure.Functions.Worker.Grpc.Messages; using Microsoft.Azure.Functions.Worker.Handlers; using Microsoft.Azure.Functions.Worker.Invocation; using Microsoft.Azure.Functions.Worker.OutputBindings; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Hosting.Internal; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Moq; @@ -28,7 +31,6 @@ namespace Microsoft.Azure.Functions.Worker.Tests public class GrpcWorkerTests { private readonly Mock _mockApplication = new(MockBehavior.Strict); - private readonly Mock _mockFeaturesFactory = new(MockBehavior.Strict); private readonly Mock _mockInputConversionFeatureProvider = new(MockBehavior.Strict); private readonly Mock mockConversionFeature = new(MockBehavior.Strict); private readonly Mock _mockOutputBindingsInfoProvider = new(MockBehavior.Strict); @@ -54,10 +56,6 @@ public GrpcWorkerTests() .Setup(m => m.InvokeFunctionAsync(It.IsAny())) .Returns(Task.CompletedTask); - _mockFeaturesFactory - .Setup(m => m.Create()) - .Returns(new InvocationFeatures(Enumerable.Empty())); - _mockMethodInfoLocator .Setup(m => m.GetMethod(It.IsAny(), It.IsAny())) .Returns(typeof(GrpcWorkerTests).GetMethod(nameof(TestRun), BindingFlags.Instance | BindingFlags.NonPublic)); @@ -246,106 +244,90 @@ void AssertKeyAndValue(KeyValuePair kvp, string expectedKey, str } [Fact] - public async Task Invoke_ReturnsSuccess() + public void EnvironmentReloadRequestHandler_ReturnsExpected() { - var request = TestUtility.CreateInvocationRequest(); - - var invocationHandler = new InvocationHandler(_mockApplication.Object, - _mockFeaturesFactory.Object, new JsonObjectSerializer(), _mockOutputBindingsInfoProvider.Object, - _mockInputConversionFeatureProvider.Object, _testLogger); - - var response = await invocationHandler.InvokeAsync(request); + var actual = GrpcWorker.EnvironmentReloadRequestHandler(new WorkerOptions()); ; - Assert.Equal(StatusResult.Types.Status.Success, response.Result.Status); - Assert.True(_context.IsDisposed); + Assert.Equal(StatusResult.Success, actual.Result); + Assert.NotNull(actual.WorkerMetadata); + Assert.NotEmpty(actual.Capabilities); } [Fact] - public async Task Invoke_ReturnsSuccess_AsyncFunctionContext() + public async Task Invocation_WhenSynchronous_DoesNotBlock() { - var request = TestUtility.CreateInvocationRequest(); - - // Mock IFunctionApplication.CreateContext to return TestAsyncFunctionContext instance. - _mockApplication - .Setup(m => m.CreateContext(It.IsAny(), It.IsAny())) - .Returns((f, ct) => - { - _context = new TestAsyncFunctionContext(f); - return _context; - }); - - var invocationHandler = new InvocationHandler(_mockApplication.Object, - _mockFeaturesFactory.Object, new JsonObjectSerializer(), _mockOutputBindingsInfoProvider.Object, - _mockInputConversionFeatureProvider.Object, _testLogger); - - var response = await invocationHandler.InvokeAsync(request); - - Assert.Equal(StatusResult.Types.Status.Success, response.Result.Status); - Assert.True((_context as TestAsyncFunctionContext).IsAsyncDisposed); - Assert.True(_context.IsDisposed); - } + using var testVariables = new TestScopedEnvironmentVariable("FUNCTIONS_WORKER_DIRECTORY", "test"); - [Fact] - public async Task Invoke_SetsRetryContext() - { - var request = TestUtility.CreateInvocationRequest(); + var blockingFunctionEvent = new ManualResetEventSlim(); + var releaseFunctionEvent = new ManualResetEventSlim(); - var invocationHandler = new InvocationHandler(_mockApplication.Object, - _mockFeaturesFactory.Object, new JsonObjectSerializer(), _mockOutputBindingsInfoProvider.Object, - _mockInputConversionFeatureProvider.Object, _testLogger); + var clientFactoryMock = new Mock(); + var clientMock = new Mock(); + var metadataProvider = new Mock(); + var invocationHandlerMock = new Mock(); - var response = await invocationHandler.InvokeAsync(request); + InvocationResponse ValueFunction(InvocationRequest request) + { + if (string.Equals(request.FunctionId, "a")) + { + blockingFunctionEvent.Set(); + releaseFunctionEvent.Wait(); + } + else + { + releaseFunctionEvent.Set(); + } - Assert.Equal(StatusResult.Types.Status.Success, response.Result.Status); - Assert.True(_context.IsDisposed); - Assert.Equal(request.RetryContext.RetryCount, _context.RetryContext.RetryCount); - Assert.Equal(request.RetryContext.MaxRetryCount, _context.RetryContext.MaxRetryCount); - } + return new InvocationResponse(); + } - [Fact] - public async Task SetRetryContextToNull() - { - var request = TestUtility.CreateInvocationRequestWithNullRetryContext(); + invocationHandlerMock.Setup(h => h.InvokeAsync(It.IsAny())) + .ReturnsAsync(ValueFunction); - var invocationHandler = new InvocationHandler(_mockApplication.Object, - _mockFeaturesFactory.Object, new JsonObjectSerializer(), _mockOutputBindingsInfoProvider.Object, - _mockInputConversionFeatureProvider.Object, _testLogger); + clientMock.Setup(c => c.SendMessageAsync(It.IsAny())) + .Returns(ValueTask.CompletedTask); - var response = await invocationHandler.InvokeAsync(request); + clientFactoryMock.Setup(f=>f.CreateClient(It.IsAny())) + .Returns(clientMock.Object); - Assert.Equal(StatusResult.Types.Status.Success, response.Result.Status); - Assert.True(_context.IsDisposed); - Assert.Null(_context.RetryContext); - } + var worker = new GrpcWorker(_mockApplication.Object, + clientFactoryMock.Object, + _mockMethodInfoLocator.Object, + new OptionsWrapper(new WorkerOptions()), + metadataProvider.Object, + new ApplicationLifetime(TestLogger.Create()), + invocationHandlerMock.Object); - [Fact] - public async Task Invoke_CreateContextThrows_ReturnsFailure() - { - _mockApplication - .Setup(m => m.CreateContext(It.IsAny(), It.IsAny())) - .Throws(new InvalidOperationException("whoops")); + await worker.StartAsync(CancellationToken.None); - var request = TestUtility.CreateInvocationRequest(); - var invocationHandler = new InvocationHandler(_mockApplication.Object, - _mockFeaturesFactory.Object, new JsonObjectSerializer(), _mockOutputBindingsInfoProvider.Object, - _mockInputConversionFeatureProvider.Object, _testLogger); + void ProcessMessage(IMessageProcessor processor, string functionId = null) + { + processor.ProcessMessageAsync(new StreamingMessage + { + InvocationRequest = new InvocationRequest { FunctionId = functionId } + }); + } - var response = await invocationHandler.InvokeAsync(request); + _ = Task.Run(() => + { + ProcessMessage(worker, "a"); - Assert.Equal(StatusResult.Types.Status.Failure, response.Result.Status); - Assert.Contains("InvalidOperationException: whoops", response.Result.Exception.Message); - Assert.Contains("CreateContext", response.Result.Exception.Message); - } + // Ensure we're executing the blocking function before invoking + // the release function + if (!blockingFunctionEvent.Wait(5000)) + { + Assert.Fail("Blocking function event was not set."); + } - [Fact] - public void EnvironmentReloadRequestHandler_ReturnsExpected() - { - var actual = GrpcWorker.EnvironmentReloadRequestHandler(new WorkerOptions()); ; + ProcessMessage(worker, "b"); + }); - Assert.Equal(StatusResult.Success, actual.Result); - Assert.NotNull(actual.WorkerMetadata); - Assert.NotEmpty(actual.Capabilities); + releaseFunctionEvent.Wait(5000); + + Assert.True(releaseFunctionEvent.IsSet, + "Release function was never called. " + + "This indicates the blocking function prevented execution flow."); } private static FunctionLoadRequest CreateFunctionLoadRequest() diff --git a/test/DotNetWorkerTests/Handlers/InvocationHandlerTests.cs b/test/DotNetWorkerTests/Handlers/InvocationHandlerTests.cs index 34a45667f..95363894d 100644 --- a/test/DotNetWorkerTests/Handlers/InvocationHandlerTests.cs +++ b/test/DotNetWorkerTests/Handlers/InvocationHandlerTests.cs @@ -24,6 +24,7 @@ public class InvocationHandlerTests private readonly Mock _mockOutputBindingsInfoProvider = new(MockBehavior.Strict); private readonly Mock _mockInputConversionFeatureProvider = new(MockBehavior.Strict); private readonly Mock mockConversionFeature = new(MockBehavior.Strict); + private readonly Mock _mockFeaturesFactory = new(MockBehavior.Strict); private TestFunctionContext _context = new(); private ILogger _testLogger; @@ -45,6 +46,10 @@ public InvocationHandlerTests() .Setup(m => m.Create()) .Returns(new InvocationFeatures(Enumerable.Empty())); + _mockFeaturesFactory + .Setup(m => m.Create()) + .Returns(new InvocationFeatures(Enumerable.Empty())); + IInputConversionFeature conversionFeature = mockConversionFeature.Object; _mockInputConversionFeatureProvider .Setup(m => m.TryCreate(typeof(DefaultInputConversionFeature), out conversionFeature)) @@ -58,11 +63,7 @@ public async Task InvokeAsync_CreatesValidCancellationToken_ReturnsSuccess() { var invocationId = "5fb3a9b4-0b38-450a-9d46-35946e7edea7"; var request = TestUtility.CreateInvocationRequest(invocationId); - - var handler = new InvocationHandler(_mockApplication.Object, - _mockInvocationFeaturesFactory.Object, new JsonObjectSerializer(), _mockOutputBindingsInfoProvider.Object, - _mockInputConversionFeatureProvider.Object, _testLogger); - + var handler = CreateInvocationHandler(); var response = await handler.InvokeAsync(request); // InvokeAsync should create a real cancellation token which can be cancelled, @@ -79,11 +80,7 @@ public async Task InvokeAsync_ThrowsTaskCanceledException_ReturnsCancelled() .Throws(new AggregateException(new Exception[] { new TaskCanceledException() })); var request = TestUtility.CreateInvocationRequest("abc"); - - var invocationHandler = new InvocationHandler(_mockApplication.Object, - _mockInvocationFeaturesFactory.Object, new JsonObjectSerializer(), _mockOutputBindingsInfoProvider.Object, - _mockInputConversionFeatureProvider.Object, _testLogger); - + var invocationHandler = CreateInvocationHandler(); var response = await invocationHandler.InvokeAsync(request); Assert.Equal(StatusResult.Types.Status.Cancelled, response.Result.Status); @@ -97,26 +94,24 @@ public void Cancel_InvocationInProgress_CancelsTokenSource_ReturnsTrue() var request = TestUtility.CreateInvocationRequest(invocationId); var cts = new CancellationTokenSource(); - var handler = new InvocationHandler(_mockApplication.Object, - _mockInvocationFeaturesFactory.Object, new JsonObjectSerializer(), _mockOutputBindingsInfoProvider.Object, - _mockInputConversionFeatureProvider.Object, _testLogger); - // Mock delay in InvokeFunctionAsync so that we can cancel mid invocation _mockApplication .Setup(m => m.InvokeFunctionAsync(It.IsAny())) .Callback(() => Thread.Sleep(1000)) .Returns(Task.CompletedTask); + var invocationHandler = CreateInvocationHandler(); + // Don't wait for InvokeAsync so we can cancel whilst it's in progress _ = Task.Run(async () => { - await handler.InvokeAsync(request); + await invocationHandler.InvokeAsync(request); }); // Buffer to ensure the cancellation token source was created before we try to cancel Thread.Sleep(500); - var result = handler.TryCancel(invocationId); + var result = invocationHandler.TryCancel(invocationId); Assert.True(result); } @@ -125,14 +120,11 @@ public async Task Cancel_InvocationCompleted_ReturnsFalse() { var invocationId = "5fb3a9b4-0b38-450a-9d46-35946e7edea7"; var request = TestUtility.CreateInvocationRequest(invocationId); + var invocationHandler = CreateInvocationHandler(); - var handler = new InvocationHandler(_mockApplication.Object, - _mockInvocationFeaturesFactory.Object, new JsonObjectSerializer(), _mockOutputBindingsInfoProvider.Object, - _mockInputConversionFeatureProvider.Object, _testLogger); - - _ = await handler.InvokeAsync(request); + _ = await invocationHandler.InvokeAsync(request); - var result = handler.TryCancel(invocationId); + var result = invocationHandler.TryCancel(invocationId); Assert.False(result); } @@ -143,21 +135,19 @@ public async Task Cancel_InvocationCompleted_ReturnsFalse() public async Task InvokeAsync_UserCodeThrowsException_OptionEnabled() { var exceptionMessage = "user code exception"; - var mockOptions = new WorkerOptions() + var mockOptions = new OptionsWrapper(new() { - EnableUserCodeException = true - }; + EnableUserCodeException = true, + Serializer = new JsonObjectSerializer() + }); _mockApplication .Setup(m => m.InvokeFunctionAsync(It.IsAny())) .Throws(new Exception(exceptionMessage)); - var request = TestUtility.CreateInvocationRequest("abc"); - - var invocationHandler = new InvocationHandler(_mockApplication.Object, - _mockInvocationFeaturesFactory.Object, new JsonObjectSerializer(), _mockOutputBindingsInfoProvider.Object, - _mockInputConversionFeatureProvider.Object, _testLogger); - var response = await invocationHandler.InvokeAsync(request, mockOptions); + var request = TestUtility.CreateInvocationRequest("abc"); + var invocationHandler = CreateInvocationHandler(workerOptions: mockOptions); + var response = await invocationHandler.InvokeAsync(request); Assert.Equal(StatusResult.Types.Status.Failure, response.Result.Status); Assert.Equal("System.Exception", response.Result.Exception.Type.ToString()); @@ -180,18 +170,106 @@ public async Task InvokeAsync_UserCodeThrowsException_OptionDisabled() _mockApplication .Setup(m => m.InvokeFunctionAsync(It.IsAny())) .Throws(new Exception(exceptionMessage)); - var request = TestUtility.CreateInvocationRequest("abc"); - - var invocationHandler = new InvocationHandler(_mockApplication.Object, - _mockInvocationFeaturesFactory.Object, new JsonObjectSerializer(), _mockOutputBindingsInfoProvider.Object, - _mockInputConversionFeatureProvider.Object, _testLogger); - var response = await invocationHandler.InvokeAsync(request, mockOptions); + var request = TestUtility.CreateInvocationRequest("abc"); + var invocationHandler = CreateInvocationHandler(); + var response = await invocationHandler.InvokeAsync(request); Assert.Equal(StatusResult.Types.Status.Failure, response.Result.Status); Assert.NotEqual("System.Exception", response.Result.Exception.Type.ToString()); Assert.NotEqual(exceptionMessage, response.Result.Exception.Message); Assert.False(response.Result.Exception.IsUserException); } + + [Fact] + public async Task Invoke_ReturnsSuccess_AsyncFunctionContext() + { + var request = TestUtility.CreateInvocationRequest(); + + // Mock IFunctionApplication.CreateContext to return TestAsyncFunctionContext instance. + _mockApplication + .Setup(m => m.CreateContext(It.IsAny(), It.IsAny())) + .Returns((f, ct) => + { + _context = new TestAsyncFunctionContext(f); + return _context; + }); + + var invocationHandler = CreateInvocationHandler(); + var response = await invocationHandler.InvokeAsync(request); + + Assert.Equal(StatusResult.Types.Status.Success, response.Result.Status); + Assert.True((_context as TestAsyncFunctionContext).IsAsyncDisposed); + Assert.True(_context.IsDisposed); + } + + [Fact] + public async Task Invoke_ReturnsSuccess() + { + var request = TestUtility.CreateInvocationRequest(); + var invocationHandler = CreateInvocationHandler(); + var response = await invocationHandler.InvokeAsync(request); + + Assert.Equal(StatusResult.Types.Status.Success, response.Result.Status); + Assert.True(_context.IsDisposed); + } + + [Fact] + public async Task Invoke_SetsRetryContext() + { + var request = TestUtility.CreateInvocationRequest(); + var invocationHandler = CreateInvocationHandler(); + var response = await invocationHandler.InvokeAsync(request); + + Assert.Equal(StatusResult.Types.Status.Success, response.Result.Status); + Assert.True(_context.IsDisposed); + Assert.Equal(request.RetryContext.RetryCount, _context.RetryContext.RetryCount); + Assert.Equal(request.RetryContext.MaxRetryCount, _context.RetryContext.MaxRetryCount); + } + + [Fact] + public async Task Invoke_CreateContextThrows_ReturnsFailure() + { + _mockApplication + .Setup(m => m.CreateContext(It.IsAny(), It.IsAny())) + .Throws(new InvalidOperationException("whoops")); + + var request = TestUtility.CreateInvocationRequest(); + var invocationHandler = CreateInvocationHandler(); + var response = await invocationHandler.InvokeAsync(request); + + Assert.Equal(StatusResult.Types.Status.Failure, response.Result.Status); + Assert.Contains("InvalidOperationException: whoops", response.Result.Exception.Message); + Assert.Contains("CreateContext", response.Result.Exception.Message); + } + + [Fact] + public async Task SetRetryContextToNull() + { + var request = TestUtility.CreateInvocationRequestWithNullRetryContext(); + var invocationHandler = CreateInvocationHandler(); + var response = await invocationHandler.InvokeAsync(request); + + Assert.Equal(StatusResult.Types.Status.Success, response.Result.Status); + Assert.True(_context.IsDisposed); + Assert.Null(_context.RetryContext); + } + + private InvocationHandler CreateInvocationHandler(IFunctionsApplication application = null, + IOptions workerOptions = null) + { + workerOptions ??= CreateDefaultWorkerOptions(); + + return new InvocationHandler(application ?? _mockApplication.Object, + _mockFeaturesFactory.Object, _mockOutputBindingsInfoProvider.Object, + _mockInputConversionFeatureProvider.Object, workerOptions, _testLogger); + } + private static IOptions CreateDefaultWorkerOptions() + { + return new OptionsWrapper(new() + { + Serializer = new JsonObjectSerializer() + }); + } } }