Skip to content

Commit

Permalink
Warmup java worker before specialization (#672)
Browse files Browse the repository at this point in the history
* add warm up logics

* update warm up request

* minor updates

* update logs

* add warm up jar

* add more logs for env testing

* remove test logs

* refractor code

* add warm up log

* add fix for java8 systemclassloader - add warmup funciton

* revert pipeline change - add logs

* restrucer warmup handler

* stop system classloader load java library in warmup process

* update warmup function

* update log messages

* adjust logs

* fix warmup jar

* refactor code

* update warm up jar

* style refactoring

* Rename to worker warmup and added capability

* Updating tests

---------

Co-authored-by: Shreyas Gopalakrishna <shreyasg@microsoft.com>
  • Loading branch information
kaibocai and shreyas-gopalakrishna authored Feb 2, 2023
1 parent 1483b11 commit f66b2fe
Show file tree
Hide file tree
Showing 16 changed files with 276 additions and 79 deletions.
Binary file added annotationLib/warmup-httptrigger.jar
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ private void addHandlers() {
JavaFunctionBroker broker = new JavaFunctionBroker(classPathProvider);

this.handlerSuppliers.put(StreamingMessage.ContentCase.WORKER_INIT_REQUEST, () -> new WorkerInitRequestHandler(broker));
this.handlerSuppliers.put(StreamingMessage.ContentCase.WORKER_WARMUP_REQUEST, WorkerWarmupHandler::new);
this.handlerSuppliers.put(StreamingMessage.ContentCase.FUNCTION_ENVIRONMENT_RELOAD_REQUEST, () -> new FunctionEnvironmentReloadRequestHandler(broker));
this.handlerSuppliers.put(StreamingMessage.ContentCase.FUNCTION_LOAD_REQUEST, () -> new FunctionLoadRequestHandler(broker));
this.handlerSuppliers.put(StreamingMessage.ContentCase.INVOCATION_REQUEST, () -> new InvocationRequestHandler(broker));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ public static String getAnnotationName(Parameter parameter) {
String annotationName = null;

for (Annotation annotation : annotations) {

//Checking if it's warmup function, warmup function has its own HttpTrigger class defined in the function jar file.
//If it's not warmup function will bypass this check and fail back to normal logics.
if (annotation.annotationType().getName().equals("com.microsoft.azure.functions.warmup.java.HttpTrigger")){
annotationName = getBindingNameFromAnnotation(annotation);
return annotationName;
}

if (annotation.toString().contains("com.microsoft.azure.functions.annotation")) {
annotationName = getBindingNameFromAnnotation(annotation);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ private void addSearchPathsToClassLoader(FunctionMethodDescriptor function) thro
if(function.getLibDirectory().isPresent()) {
registerWithClassLoaderProvider(function.getLibDirectory().get());
}else{
registerJavaLibrary();
registerJavaLibrary(function.isWarmup());
}
}

Expand All @@ -194,9 +194,9 @@ void registerWithClassLoaderProvider(File libDirectory) {
}
}

void registerJavaLibrary(){
void registerJavaLibrary(boolean isWarmup){
try {
if (!isTesting()){
if (!isTesting() && !isWarmup){
addJavaAnnotationLibrary();
}
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,23 @@
import org.apache.commons.lang3.*;

public class FunctionMethodDescriptor {
public FunctionMethodDescriptor(String id, String name, String fullMethodName, String jarPath) {
private File parentDirectory;
private File jarDirectory;
private MethodInfo methodInfo;

private final String id;
private final String jarPath;
private final String name;
private final String fullMethodName;
private final boolean isWarmup;

public FunctionMethodDescriptor(String id, String name, String fullMethodName, String jarPath, boolean isWarmup) {
this.id = id;
this.name = name;
this.fullMethodName = fullMethodName;
this.methodInfo = new MethodInfo(fullMethodName);
this.jarPath = StringUtils.trim(jarPath);
this.isWarmup = isWarmup;
}

/**
Expand Down Expand Up @@ -141,15 +152,10 @@ void guardAgainstUnqualifiedJarPath() {
throw new IllegalArgumentException("\"" + jarPath + "\" is not a qualified JAR file name");
}
}

private File parentDirectory;
private File jarDirectory;
private MethodInfo methodInfo;

private final String id;
private final String jarPath;
private final String name;
private final String fullMethodName;

public boolean isWarmup() {
return isWarmup;
}

/*
* "struct" to track the info on the function method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,17 @@ public FunctionEnvironmentReloadRequestHandler(JavaFunctionBroker broker) {
this.broker = broker;
}

public Map<String, String> EnvironmentVariables = new HashMap<>();
public Map<String, String> environmentVariables = new HashMap<>();

@Override
String execute(FunctionEnvironmentReloadRequest request, Builder response) throws Exception {
WorkerLogManager.getSystemLogger().log(Level.INFO, "FunctionEnvironmentReloadRequest received by the Java worker");
EnvironmentVariables = request.getEnvironmentVariablesMap();
if (EnvironmentVariables == null || EnvironmentVariables.isEmpty()) {
return String
.format("Ignoring FunctionEnvironmentReloadRequest as newSettings map is either empty or null");
environmentVariables = request.getEnvironmentVariablesMap();
if (environmentVariables.isEmpty()) {
return "Ignoring FunctionEnvironmentReloadRequest as newSettings map is empty.";
}
setEnv(EnvironmentVariables);
return String.format("FunctionEnvironmentReloadRequest completed");
setEnv(environmentVariables);
return "FunctionEnvironmentReloadRequest completed";
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,28 @@


public class FunctionLoadRequestHandler extends MessageHandler<FunctionLoadRequest, FunctionLoadResponse.Builder> {

private final JavaFunctionBroker broker;
private boolean isWarmup;

public FunctionLoadRequestHandler(JavaFunctionBroker broker) {
super(StreamingMessage::getFunctionLoadRequest,
FunctionLoadResponse::newBuilder,
FunctionLoadResponse.Builder::setResult,
StreamingMessage.Builder::setFunctionLoadResponse);

this.broker = broker;
}

public FunctionLoadRequestHandler(JavaFunctionBroker broker, boolean isWarmup){
this(broker);
this.isWarmup = isWarmup;
}

@Override
String execute(FunctionLoadRequest request, FunctionLoadResponse.Builder response) throws Exception {
WorkerLogManager.getSystemLogger().log(Level.INFO, "FunctionLoadRequest received by the Java worker, Java version - " + Util.getJavaVersion());
final RpcFunctionMetadata metadata = request.getMetadata();
final FunctionMethodDescriptor descriptor = createFunctionDescriptor(request.getFunctionId(), metadata);
final FunctionMethodDescriptor descriptor = createFunctionDescriptor(request.getFunctionId(), metadata, this.isWarmup);

final Map<String, BindingInfo> bindings = metadata.getBindingsMap();

Expand All @@ -38,9 +46,8 @@ String execute(FunctionLoadRequest request, FunctionLoadResponse.Builder respons
descriptor.getFullMethodName());
}

FunctionMethodDescriptor createFunctionDescriptor(String functionId, RpcFunctionMetadata metadata) {
return new FunctionMethodDescriptor(functionId, metadata.getName(), metadata.getEntryPoint(), metadata.getScriptFile());
FunctionMethodDescriptor createFunctionDescriptor(String functionId, RpcFunctionMetadata metadata, boolean isWarmup) {
return new FunctionMethodDescriptor(functionId, metadata.getName(), metadata.getEntryPoint(), metadata.getScriptFile(), isWarmup);
}

private final JavaFunctionBroker broker;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public RpcLogHandler(LogRecord record, String invocationId) {
private static RpcLog.Builder generateRpcLog(LogRecord record, String invocationId) {
RpcLog.Builder log = RpcLog.newBuilder();
/**
* Check if the logging namespace belongs to system logsq, invocation log should be categorized to user type (default), others should
* Check if the logging namespace belongs to system logs, invocation log should be categorized to user type (default), others should
* be categorized to system type.
*
* local_console customer_app_insight functions_kusto_table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ String execute(WorkerInitRequest request, WorkerInitResponse.Builder response) {
response.putCapabilities("RpcHttpBodyOnly", "RpcHttpBodyOnly");
response.putCapabilities("RpcHttpTriggerMetadataRemoved", "RpcHttpTriggerMetadataRemoved");
response.putCapabilities("HandlesWorkerTerminateMessage", "HandlesWorkerTerminateMessage");
response.putCapabilities("HandlesWorkerWarmupMessage", "HandlesWorkerWarmupMessage");
response.setWorkerMetadata(composeWorkerMetadata());
return "Worker initialized";
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package com.microsoft.azure.functions.worker.handler;

import com.microsoft.azure.functions.rpc.messages.*;
import com.microsoft.azure.functions.worker.WorkerLogManager;
import com.microsoft.azure.functions.worker.broker.JavaFunctionBroker;
import com.microsoft.azure.functions.worker.reflect.FactoryClassLoader;

import java.util.*;

import static com.microsoft.azure.functions.worker.Constants.JAVA_LIBRARY_DIRECTORY;


public class WorkerWarmupHandler extends MessageHandler<WorkerWarmupRequest, WorkerWarmupResponse.Builder> {

private static final String WARM_UP_FUNCTION_NAME = "WarmupFunc";
private static final String WARM_UP_FUNCTION_ENTRY_POINT = "com.microsoft.azure.functions.warmup.java.Function.run";
private static final String WARM_UP_FUNCTION_SCRIPT_FILE = JAVA_LIBRARY_DIRECTORY + "/warmup-httptrigger.jar";
private final JavaFunctionBroker javaFunctionBroker = new JavaFunctionBroker(new FactoryClassLoader().createClassLoaderProvider());

public WorkerWarmupHandler() {
super(StreamingMessage::getWorkerWarmupRequest,
WorkerWarmupResponse::newBuilder,
WorkerWarmupResponse.Builder::setResult,
StreamingMessage.Builder::setWorkerWarmupResponse);
}

@Override
String execute(WorkerWarmupRequest workerWarmupRequest, WorkerWarmupResponse.Builder builder) {
try {
WorkerLogManager.getSystemLogger().info("azure function java worker warm up start.");
this.javaFunctionBroker.setWorkerDirectory(workerWarmupRequest.getWorkerDirectory());
warmupFunctionEnvironmentReload();
UUID functionId = warmupFunctionLoad(workerWarmupRequest);
warmupInvocation(functionId);
WorkerLogManager.getSystemLogger().info("azure function java worker warm up completed successfully.");
} catch (Exception e) {
WorkerLogManager.getSystemLogger().severe("warm up process failed with exception: " + e.getMessage());
throw new RuntimeException(e);
}
return "azure function java worker warm up completed";
}

private void warmupFunctionEnvironmentReload() throws Exception {
FunctionEnvironmentReloadRequest functionEnvironmentReloadRequest = FunctionEnvironmentReloadRequest.newBuilder()
.putAllEnvironmentVariables(System.getenv())
.build();
new FunctionEnvironmentReloadRequestHandler(this.javaFunctionBroker).execute(functionEnvironmentReloadRequest, null);
WorkerLogManager.getSystemLogger().info("finish warm up FunctionEnvironmentReloadRequestHandler");
}

private UUID warmupFunctionLoad(WorkerWarmupRequest workerWarmupRequest) throws Exception {
Map<String, BindingInfo> map = new HashMap<>();
BindingInfo httpTrigger = BindingInfo.newBuilder().setDirection(BindingInfo.Direction.in).setDataType(BindingInfo.DataType.undefined).setType("httpTrigger").build();
map.put("req", httpTrigger);
BindingInfo http = BindingInfo.newBuilder().setDirection(BindingInfo.Direction.out).setDataType(BindingInfo.DataType.undefined).setType("http").build();
map.put("$return", http);
RpcFunctionMetadata rpcFunctionMetadata = RpcFunctionMetadata.newBuilder()
.setName(WARM_UP_FUNCTION_NAME)
.setEntryPoint(WARM_UP_FUNCTION_ENTRY_POINT)
.setScriptFile(workerWarmupRequest.getWorkerDirectory() + WARM_UP_FUNCTION_SCRIPT_FILE)
.putAllBindings(map)
.build();
final UUID functionId = UUID.randomUUID();
FunctionLoadRequest functionLoadRequest = FunctionLoadRequest.newBuilder()
.setFunctionId(functionId.toString())
.setMetadata(rpcFunctionMetadata)
.build();
String loadRequestResult = new FunctionLoadRequestHandler(this.javaFunctionBroker, true).execute(functionLoadRequest, FunctionLoadResponse.newBuilder());
WorkerLogManager.getSystemLogger().info("finish warm up FunctionLoadRequestHandler with result: " + loadRequestResult);
return functionId;
}

private void warmupInvocation(UUID functionId) throws Exception {
List<ParameterBinding> inputDataList = new ArrayList<>();
ParameterBinding parameterBinding = ParameterBinding.newBuilder()
.setName("req")
.setData(TypedData.newBuilder().setHttp(RpcHttp.newBuilder().setMethod("GET")))
.build();
inputDataList.add(parameterBinding);
InvocationRequest invocationRequest = InvocationRequest.newBuilder()
.setFunctionId(functionId.toString())
.setInvocationId(UUID.randomUUID().toString())
.addAllInputData(inputDataList)
.build();
String invocationResult = new InvocationRequestHandler(this.javaFunctionBroker).execute(invocationRequest, InvocationResponse.newBuilder());
WorkerLogManager.getSystemLogger().info("finish warm up InvocationRequestHandler with result: " + invocationResult);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.microsoft.azure.functions.worker.*;

public class EnhancedClassLoaderProvider implements ClassLoaderProvider {

public EnhancedClassLoaderProvider() {
customerUrls = Collections.newSetFromMap(new ConcurrentHashMap<URL, Boolean>());
workerUrls = Collections.newSetFromMap(new ConcurrentHashMap<URL, Boolean>());
Expand Down Expand Up @@ -51,8 +52,9 @@ public void addWorkerUrl(URL url) throws IOException {
WorkerLogManager.getSystemLogger().info("Loading worker file URL: " + url);
workerUrls.add(url);
}

private final Set<URL> customerUrls;
private final Set<URL> workerUrls;
private final Object lock = new Object();
private static volatile URLClassLoader classLoaderInstance;
private volatile URLClassLoader classLoaderInstance;
}
Loading

0 comments on commit f66b2fe

Please sign in to comment.