Skip to content

Commit

Permalink
Enable worker indexing by default (#9574)
Browse files Browse the repository at this point in the history
  • Loading branch information
soninaren authored Nov 18, 2023
1 parent 76f1208 commit a0b23d3
Show file tree
Hide file tree
Showing 17 changed files with 306 additions and 59 deletions.
1 change: 1 addition & 0 deletions release_notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
- Limit dotnet-isolated specialization to 64 bit host process (#9548)
- Sending command line arguments to language workers with `functions-` prefix to prevent conflicts (#9514)
- Adding code to simulate placeholder and specilization locally (#9618)
- Enable worker indexing by default without the need for hosting config (#9574)
- Delaying execution of `LogWorkerMetadata` method until after coldstart is done. (#9627)
- Update PowerShell Worker 7.2 to 4.0.3070 [Release Note](https://github.com/Azure/azure-functions-powershell-worker/releases/tag/v4.0.3070)
- Update PowerShell Worker 7.4 to 4.0.3030 [Release Note](https://github.com/Azure/azure-functions-powershell-worker/releases/tag/v4.0.3030)
Expand Down
4 changes: 4 additions & 0 deletions sample/Node/host.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
{
"version": "2.0",
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[3.*, 4.0.0)"
},
"watchDirectories": [ "Shared", "Test" ],
"healthMonitor": {
"enabled": true,
Expand Down
15 changes: 15 additions & 0 deletions sample/NodeWithoutBundle/HttpTrigger/function.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"bindings": [
{
"type": "httpTrigger",
"name": "req",
"direction": "in",
"methods": [ "get" ]
},
{
"type": "http",
"name": "$return",
"direction": "out"
}
]
}
33 changes: 33 additions & 0 deletions sample/NodeWithoutBundle/HttpTrigger/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
var test = require('../Shared/test');

module.exports = function (context, req) {
context.log('Node.js HTTP trigger function processed a request. Name=%s', req.query.name);

var headerValue = req.headers['test-header'];
if (headerValue) {
context.log('test-header=' + headerValue);
}

var res;
if (typeof req.query.name === 'undefined') {
res = {
status: 400,
body: "Please pass a name on the query string",
headers: {
'Content-Type': 'text/plain'
}
};
}
else {
res = {
status: 200,
body: test.greeting(req.query.name),
headers: {
'Content-Type': 'text/plain',
'Shared-Module': test.timestamp
}
};
}

context.done(null, res);
};
8 changes: 8 additions & 0 deletions sample/NodeWithoutBundle/Shared/test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
var timestamp = new Date().getTime();

module.exports = {
timestamp: timestamp,
greeting: function (name) {
return 'Hello ' + name;
}
};
39 changes: 39 additions & 0 deletions sample/NodeWithoutBundle/host.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
{
"version": "2.0",
"healthMonitor": {
"enabled": true,
"healthCheckInterval": "00:00:10",
"healthCheckWindow": "00:02:00",
"healthCheckThreshold": 6,
"counterThreshold": 0.80
},
"functionTimeout": "00:00:10",
"logging": {
"fileLoggingMode": "always"
},
"extensions": {
"sendGrid": {
"from": "Azure Functions <samples@functions.com>"
},
"http": {
"routePrefix": "api",
"maxConcurrentRequests": 5,
"maxOutstandingRequests": 30
},
"queues": {
"visibilityTimeout": "00:00:10",
"maxDequeueCount": 3
},
"eventHubs": {
"maxBatchSize": 1000,
"prefetchCount": 1000,
"batchCheckpointFrequency": 1
},
"serviceBus": {
"prefetchCount": 100,
"messageHandlerOptions": {
"maxConcurrentCalls": 32
}
}
}
}
11 changes: 11 additions & 0 deletions src/WebJobs.Script/Config/FunctionsHostingConfigOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,17 @@ public bool DisableLinuxAppServiceExecutionDetails
}
}

/// <summary>
/// Gets a value indicating whether the host should revert the worker shutdown behavior in the webhostworkerchannelmanager.
/// </summary>
public bool RevertWorkerShutdownBehaviour
{
get
{
return GetFeature(RpcWorkerConstants.RevertWorkerShutdownBehavior) == "1";
}
}

/// <summary>
/// Gets feature by name.
/// </summary>
Expand Down
21 changes: 13 additions & 8 deletions src/WebJobs.Script/Utility.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1001,17 +1001,22 @@ public static void ValidateRetryOptions(RetryOptions
}
}

// EnableWorkerIndexing set through AzureWebjobsFeatuerFlag always take precdence
// if AzureWebjobsFeatuerFlag is not set then WORKER_INDEXING_ENABLED hosting config controls stamplevel enablement
// if WORKER_INDEXING_ENABLED is set and WORKER_INDEXING_DISABLED contains the customers app name worker indexing is then disabled for that customer only
// Also Worker indexing is disabled for Logic apps
// Worker indexing is disabled for Logic apps
// EnableWorkerIndexing set through AzureWebjobsFeatureFlag always take precedence
// If AzureWebjobsFeatureFlag is not set and
// WORKER_INDEXING_DISABLED contains the customers app name worker indexing is then disabled for that customer only
public static bool CanWorkerIndex(IEnumerable<RpcWorkerConfig> workerConfigs, IEnvironment environment, FunctionsHostingConfigOptions functionsHostingConfigOptions)
{
if (environment.IsLogicApp())
{
return false;
}

string appName = environment.GetAzureWebsiteUniqueSlotName();
bool workerIndexingEnabled = FeatureFlags.IsEnabled(ScriptConstants.FeatureFlagEnableWorkerIndexing, environment)
|| (functionsHostingConfigOptions.WorkerIndexingEnabled
&& !functionsHostingConfigOptions.WorkerIndexingDisabledApps.ToLowerInvariant().Split("|").Contains(appName)
&& !environment.IsLogicApp());

bool workerIndexingFeatureFlagSet = FeatureFlags.IsEnabled(ScriptConstants.FeatureFlagEnableWorkerIndexing, environment);
bool workerIndexingDisabledViaHostingConfig = functionsHostingConfigOptions.WorkerIndexingDisabledApps.ToLowerInvariant().Split("|").Contains(appName);
bool workerIndexingEnabled = workerIndexingFeatureFlagSet || !workerIndexingDisabledViaHostingConfig;

if (!workerIndexingEnabled)
{
Expand Down
1 change: 1 addition & 0 deletions src/WebJobs.Script/Workers/Rpc/RpcWorkerConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,6 @@ public static class RpcWorkerConstants

public const string WorkerIndexingEnabled = "WORKER_INDEXING_ENABLED";
public const string WorkerIndexingDisabledApps = "WORKER_INDEXING_DISABLED_APPS";
public const string RevertWorkerShutdownBehavior = "REVERT_WORKER_SHUTDOWN_BEHAVIOR";
}
}
42 changes: 39 additions & 3 deletions src/WebJobs.Script/Workers/Rpc/WebHostRpcWorkerChannelManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.AppService.Proxy.Common.Extensions;
using Microsoft.Azure.AppService.Proxy.Common.Infra;
using Microsoft.Azure.AppService.Proxy.Runtime;
using Microsoft.Azure.WebJobs.Script.Config;
using Microsoft.Azure.WebJobs.Script.Diagnostics;
using Microsoft.Azure.WebJobs.Script.Eventing;
using Microsoft.Azure.WebJobs.Script.Workers.Profiles;
Expand All @@ -22,6 +25,7 @@ public class WebHostRpcWorkerChannelManager : IWebHostRpcWorkerChannelManager
private readonly ILogger _logger = null;
private readonly TimeSpan workerInitTimeout = TimeSpan.FromSeconds(30);
private readonly IOptionsMonitor<ScriptApplicationHostOptions> _applicationHostOptions = null;
private readonly IOptions<FunctionsHostingConfigOptions> _hostingConfigOptions;
private readonly IScriptEventManager _eventManager = null;
private readonly IEnvironment _environment;
private readonly ILoggerFactory _loggerFactory = null;
Expand All @@ -41,7 +45,8 @@ public WebHostRpcWorkerChannelManager(IScriptEventManager eventManager,
IOptionsMonitor<ScriptApplicationHostOptions> applicationHostOptions,
IMetricsLogger metricsLogger,
IConfiguration config,
IWorkerProfileManager workerProfileManager)
IWorkerProfileManager workerProfileManager,
IOptions<FunctionsHostingConfigOptions> hostingConfigOptions)
{
_environment = environment ?? throw new ArgumentNullException(nameof(environment));
_config = config ?? throw new ArgumentNullException(nameof(config));
Expand All @@ -52,6 +57,7 @@ public WebHostRpcWorkerChannelManager(IScriptEventManager eventManager,
_rpcWorkerChannelFactory = rpcWorkerChannelFactory;
_logger = loggerFactory.CreateLogger<WebHostRpcWorkerChannelManager>();
_applicationHostOptions = applicationHostOptions;
_hostingConfigOptions = hostingConfigOptions;

_shutdownStandbyWorkerChannels = ScheduleShutdownStandbyChannels;
_shutdownStandbyWorkerChannels = _shutdownStandbyWorkerChannels.Debounce(milliseconds: 5000);
Expand Down Expand Up @@ -228,9 +234,38 @@ public Task<bool> ShutdownChannelIfExistsAsync(string language, string workerId,
{
throw new ArgumentNullException(nameof(language));
}
if (_workerChannels.TryRemove(language, out Dictionary<string, TaskCompletionSource<IRpcWorkerChannel>> rpcWorkerChannels))

if (_hostingConfigOptions.Value.RevertWorkerShutdownBehaviour)
{
if (rpcWorkerChannels.TryGetValue(workerId, out TaskCompletionSource<IRpcWorkerChannel> value))
if (_workerChannels.TryRemove(language, out Dictionary<string, TaskCompletionSource<IRpcWorkerChannel>> rpcWorkerChannels))
{
if (rpcWorkerChannels.TryGetValue(workerId, out TaskCompletionSource<IRpcWorkerChannel> value))
{
value?.Task.ContinueWith(channelTask =>
{
if (channelTask.Status == TaskStatus.Faulted)
{
_logger.LogDebug(channelTask.Exception, "Removing errored worker channel");
}
else
{
IRpcWorkerChannel workerChannel = channelTask.Result;
if (workerChannel != null)
{
_logger.LogDebug("Disposing WebHost channel for workerId: {channelId}, for runtime:{language}", workerId, language);
workerChannel.TryFailExecutions(workerException);
(channelTask.Result as IDisposable)?.Dispose();
}
}
});
return Task.FromResult(true);
}
}
}
else
{
if (_workerChannels.TryGetValue(language, out Dictionary<string, TaskCompletionSource<IRpcWorkerChannel>> rpcWorkerChannels)
&& rpcWorkerChannels.TryRemove(workerId, out TaskCompletionSource<IRpcWorkerChannel> value))
{
value?.Task.ContinueWith(channelTask =>
{
Expand All @@ -252,6 +287,7 @@ public Task<bool> ShutdownChannelIfExistsAsync(string language, string workerId,
return Task.FromResult(true);
}
}

return Task.FromResult(false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ public FunctionsSyncManagerTests()
_mockEnvironment.Setup(p => p.GetEnvironmentVariable(EnvironmentSettingNames.AzureWebsiteArmCacheEnabled)).Returns("1");
_mockEnvironment.Setup(p => p.GetEnvironmentVariable(EnvironmentSettingNames.AzureWebsiteContainerReady)).Returns("1");
_mockEnvironment.Setup(p => p.GetEnvironmentVariable(EnvironmentSettingNames.CoreToolsEnvironment)).Returns((string)null);
_mockEnvironment.Setup(p => p.GetEnvironmentVariable(EnvironmentSettingNames.AppKind)).Returns((string)null);
_mockEnvironment.Setup(p => p.GetEnvironmentVariable(EnvironmentSettingNames.FunctionWorkerRuntime)).Returns("dotnet");
_mockEnvironment.Setup(p => p.GetEnvironmentVariable(EnvironmentSettingNames.WebSiteAuthEncryptionKey)).Returns("1");
_mockEnvironment.Setup(p => p.GetEnvironmentVariable(EnvironmentSettingNames.AzureWebsiteInstanceId)).Returns("1");
_mockEnvironment.Setup(p => p.GetEnvironmentVariable(EnvironmentSettingNames.AzureWebsiteHostName)).Returns(testHostName);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Script.Config;
using Microsoft.Azure.WebJobs.Script.Workers.Rpc;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.WebJobs.Script.Tests;
using Xunit;

namespace Microsoft.Azure.WebJobs.Script.Tests.EndToEnd
{
[Trait(TestTraits.Category, TestTraits.EndToEnd)]
[Trait(TestTraits.Group, TestTraits.SamplesEndToEnd)]
public class SamplesEndToEndTests_Node_MultipleProcessesNoBundle : IClassFixture<SamplesEndToEndTests_Node_MultipleProcessesNoBundle.MultiplepleProcessesTestFixtureNoBundles>
{
private readonly ScriptSettingsManager _settingsManager;
private MultiplepleProcessesTestFixtureNoBundles _fixture;
private IEnumerable<int> _nodeProcessesBeforeTestStarted;

public SamplesEndToEndTests_Node_MultipleProcessesNoBundle(MultiplepleProcessesTestFixtureNoBundles fixture)
{
_fixture = fixture;
_nodeProcessesBeforeTestStarted = fixture.NodeProcessesBeforeTestStarted;
_settingsManager = ScriptSettingsManager.Instance;
}

[Fact(Skip = "https://github.com/Azure/azure-functions-host/issues")]
public async Task NodeProcessNoBundleConfigured_Different_AfterHostRestart()
{
await SamplesTestHelpers.InvokeAndValidateHttpTrigger(_fixture, "HttpTrigger");
IEnumerable<int> nodeProcessesBeforeHostRestart = Process.GetProcessesByName("node").Select(p => p.Id);
// Trigger a restart
await _fixture.Host.RestartAsync(CancellationToken.None);

await SamplesTestHelpers.InvokeAndValidateHttpTrigger(_fixture, "HttpTrigger");

// Wait for all the 3 process to start
await Task.Delay(TimeSpan.FromMinutes(1));

IEnumerable<int> nodeProcessesAfter = Process.GetProcessesByName("node").Select(p => p.Id);

// Verify node process is different after host restart
var result = nodeProcessesAfter.Where(pId1 => !nodeProcessesBeforeHostRestart.Any(pId2 => pId2 == pId1) && !_fixture.NodeProcessesBeforeTestStarted.Any(pId3 => pId3 == pId1));
Assert.Equal(3, result.Count());
}

public class MultiplepleProcessesTestFixtureNoBundles : EndToEndTestFixture
{
private IEnumerable<int> _nodeProcessesBeforeTestStarted;

public IEnumerable<int> NodeProcessesBeforeTestStarted => _nodeProcessesBeforeTestStarted;

static MultiplepleProcessesTestFixtureNoBundles()
{
}

public MultiplepleProcessesTestFixtureNoBundles()
: base(Path.Combine(Environment.CurrentDirectory, @"..", "..", "..", "..", "..", "sample", "NodeWithoutBundle"), "samples", RpcWorkerConstants.NodeLanguageWorkerName, 3)
{
_nodeProcessesBeforeTestStarted = Process.GetProcessesByName("node").Select(p => p.Id);
_nodeProcessesBeforeTestStarted = _nodeProcessesBeforeTestStarted ?? new List<int>();
}

public override void ConfigureScriptHost(IWebJobsBuilder webJobsBuilder)
{
base.ConfigureScriptHost(webJobsBuilder);
webJobsBuilder.Services.Configure<ScriptJobHostOptions>(o =>
{
o.Functions = new[]
{
"HttpTrigger"
};
});
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host.Storage;
using Microsoft.Azure.WebJobs.Script;
using Microsoft.Azure.WebJobs.Script.Description;
using Microsoft.Azure.WebJobs.Script.Diagnostics;
using Microsoft.Azure.WebJobs.Script.Grpc;
using Microsoft.Azure.WebJobs.Script.Tests;
Expand Down Expand Up @@ -99,9 +103,12 @@ public static IServiceCollection AddMockedSingleton<T>(IServiceCollection servic

private static IServiceCollection AddFunctionMetadataManager(this IServiceCollection services)
{
AddMockedSingleton<IWorkerFunctionMetadataProvider>(services);
var workerMetadataProvider = new Mock<IWorkerFunctionMetadataProvider>();
workerMetadataProvider.Setup(m => m.GetFunctionMetadataAsync(It.IsAny<IEnumerable<RpcWorkerConfig>>(), false)).Returns(Task.FromResult(new FunctionMetadataResult(true, ImmutableArray<FunctionMetadata>.Empty)));

services.AddSingleton<IHostFunctionMetadataProvider, HostFunctionMetadataProvider>();
services.AddSingleton<IFunctionMetadataProvider, FunctionMetadataProvider>();
services.AddSingleton<IWorkerFunctionMetadataProvider>(workerMetadataProvider.Object);

services.AddSingleton<IScriptHostManager>(s =>
{
Expand Down
Loading

0 comments on commit a0b23d3

Please sign in to comment.