Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Java] Support direct actor call in Java worker #5504

Merged
merged 32 commits into from
Sep 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
3c78c68
Support direct actor call in Java worker
kfstorm Aug 20, 2019
9530f16
Skip some tests
kfstorm Aug 20, 2019
9fc1e24
Support pass large object by value to a direct call actor.
kfstorm Aug 22, 2019
05d5c9d
Merge remote-tracking branch 'origin/master' into java_actor_direct_call
kfstorm Aug 23, 2019
0c474b9
Support actor checkpointing for direct call
kfstorm Aug 23, 2019
5798637
Merge remote-tracking branch 'origin/master' into java_actor_direct_call
kfstorm Aug 23, 2019
7a5801b
Fix test case `testActorProcessDying`
kfstorm Aug 23, 2019
73bef52
Merge remote-tracking branch 'upstream/master' into java_actor_direct…
kfstorm Aug 23, 2019
748938d
Update ActorTest.java
kfstorm Aug 23, 2019
fe0ea89
Fix duplicated put in memory store provider
kfstorm Aug 24, 2019
82ca22b
Direct actor call subscribe to individual actor updates
kfstorm Aug 26, 2019
c0da338
Fix IsActorAlive
kfstorm Aug 26, 2019
779c49c
Skip testActorProcessDying because it hangs
kfstorm Aug 27, 2019
a1600bc
Add todos for skipped tests.
kfstorm Aug 28, 2019
3f78be0
Merge remote-tracking branch 'upstream/master' into java_actor_direct…
kfstorm Aug 28, 2019
1492600
Merge remote-tracking branch 'upstream/master' into java_actor_direct…
kfstorm Sep 6, 2019
1b7837f
Address comments
kfstorm Sep 6, 2019
fa2d4fd
Unskip testActorReconstruction
kfstorm Sep 6, 2019
f4d3e65
Merge remote-tracking branch 'linux/java_actor_direct_call' into java…
kfstorm Sep 6, 2019
a932190
Cleanup
kfstorm Sep 6, 2019
f5570c7
Add is_direct_call to actor creation task and ActorTableData
kfstorm Sep 6, 2019
130e667
Update comment
kfstorm Sep 6, 2019
3586456
skip some tests not relevant to direct call
kfstorm Sep 6, 2019
534efdf
Use IAlterSuiteListener
kfstorm Sep 7, 2019
9613575
Minor fix about missing a test
kfstorm Sep 7, 2019
e3d77b7
Rename is_direct_call to use_direct_call in Java
kfstorm Sep 7, 2019
ec33ab8
Address comments
kfstorm Sep 7, 2019
dc0a86f
Fix
kfstorm Sep 7, 2019
56428e2
Merge remote-tracking branch 'upstream/master' into java_actor_direct…
kfstorm Sep 7, 2019
80e64ac
Fix test
kfstorm Sep 7, 2019
5114af9
Merge remote-tracking branch 'upstream/master' into java_actor_direct…
kfstorm Sep 8, 2019
99837ac
Update ArgumentsBuilder
kfstorm Sep 8, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,33 @@ public class ActorCreationOptions extends BaseTaskOptions {

public static final int NO_RECONSTRUCTION = 0;
public static final int INFINITE_RECONSTRUCTIONS = (int) Math.pow(2, 30);
// DO NOT set this environment variable. It's only used for test purposes.
// Please use `setUseDirectCall` instead.
public static final boolean DEFAULT_USE_DIRECT_CALL = "1"
.equals(System.getenv("ACTOR_CREATION_OPTIONS_DEFAULT_USE_DIRECT_CALL"));

public final int maxReconstructions;

public final boolean useDirectCall;

public final String jvmOptions;

private ActorCreationOptions(Map<String, Double> resources,
int maxReconstructions,
String jvmOptions) {
private ActorCreationOptions(Map<String, Double> resources, int maxReconstructions,
boolean useDirectCall, String jvmOptions) {
super(resources);
this.maxReconstructions = maxReconstructions;
this.useDirectCall = useDirectCall;
this.jvmOptions = jvmOptions;
}

/**
* The inner class for building ActorCreationOptions.
* The inner class for building ActorCreationOptions.
*/
public static class Builder {

private Map<String, Double> resources = new HashMap<>();
private int maxReconstructions = NO_RECONSTRUCTION;
private boolean useDirectCall = DEFAULT_USE_DIRECT_CALL;
private String jvmOptions = null;

public Builder setResources(Map<String, Double> resources) {
Expand All @@ -42,13 +49,21 @@ public Builder setMaxReconstructions(int maxReconstructions) {
return this;
}

// Since direct call is not fully supported yet (see issue #5559),
// users are not allowed to set the option to true.
// TODO (kfstorm): uncomment when direct call is ready.
// public Builder setUseDirectCall(boolean useDirectCall) {
// this.useDirectCall = useDirectCall;
// return this;
// }

public Builder setJvmOptions(String jvmOptions) {
this.jvmOptions = jvmOptions;
return this;
}

public ActorCreationOptions createActorCreationOptions() {
return new ActorCreationOptions(resources, maxReconstructions, jvmOptions);
return new ActorCreationOptions(resources, maxReconstructions, useDirectCall, jvmOptions);
}
}

Expand Down
2 changes: 1 addition & 1 deletion java/dependencies.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def gen_java_deps():
"org.apache.commons:commons-lang3:3.4",
"org.ow2.asm:asm:6.0",
"org.slf4j:slf4j-log4j12:1.7.25",
"org.testng:testng:6.9.9",
"org.testng:testng:6.9.10",
"redis.clients:jedis:2.8.0",
],
repositories = [
Expand Down
2 changes: 1 addition & 1 deletion java/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>6.9.9</version>
<version>6.9.10</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
Expand Down
30 changes: 11 additions & 19 deletions java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
import org.ray.api.function.RayFunc;
import org.ray.api.function.RayFuncVoid;
import org.ray.api.id.ObjectId;
import org.ray.api.id.UniqueId;
import org.ray.api.options.ActorCreationOptions;
import org.ray.api.options.CallOptions;
import org.ray.api.runtime.RayRuntime;
import org.ray.api.runtimecontext.RuntimeContext;
import org.ray.runtime.actor.NativeRayActor;
import org.ray.runtime.config.RayConfig;
import org.ray.runtime.context.RuntimeContextImpl;
import org.ray.runtime.context.WorkerContext;
Expand All @@ -28,7 +28,6 @@
import org.ray.runtime.generated.Common.Language;
import org.ray.runtime.object.ObjectStore;
import org.ray.runtime.object.RayObjectImpl;
import org.ray.runtime.raylet.RayletClient;
import org.ray.runtime.task.ArgumentsBuilder;
import org.ray.runtime.task.FunctionArg;
import org.ray.runtime.task.TaskExecutor;
Expand All @@ -51,7 +50,6 @@ public abstract class AbstractRayRuntime implements RayRuntime {

protected ObjectStore objectStore;
protected TaskSubmitter taskSubmitter;
protected RayletClient rayletClient;
protected WorkerContext workerContext;

public AbstractRayRuntime(RayConfig rayConfig) {
Expand Down Expand Up @@ -85,15 +83,6 @@ public void free(List<ObjectId> objectIds, boolean localOnly, boolean deleteCrea
objectStore.delete(objectIds, localOnly, deleteCreatingTasks);
}

@Override
public void setResource(String resourceName, double capacity, UniqueId nodeId) {
Preconditions.checkArgument(Double.compare(capacity, 0) >= 0);
if (nodeId == null) {
nodeId = UniqueId.NIL;
}
rayletClient.setResource(resourceName, capacity, nodeId);
}

@Override
public <T> WaitResult<T> wait(List<RayObject<T>> waitList, int numReturns, int timeoutMs) {
return objectStore.wait(waitList, numReturns, timeoutMs);
Expand Down Expand Up @@ -176,7 +165,7 @@ public Callable wrapCallable(Callable callable) {

private RayObject callNormalFunction(FunctionDescriptor functionDescriptor,
Object[] args, int numReturns, CallOptions options) {
List<FunctionArg> functionArgs = ArgumentsBuilder.wrap(args);
List<FunctionArg> functionArgs = ArgumentsBuilder.wrap(args, /*isDirectCall*/false);
List<ObjectId> returnIds = taskSubmitter.submitTask(functionDescriptor,
functionArgs, numReturns, options);
Preconditions.checkState(returnIds.size() == numReturns && returnIds.size() <= 1);
Expand All @@ -189,7 +178,7 @@ private RayObject callNormalFunction(FunctionDescriptor functionDescriptor,

private RayObject callActorFunction(RayActor rayActor,
FunctionDescriptor functionDescriptor, Object[] args, int numReturns) {
List<FunctionArg> functionArgs = ArgumentsBuilder.wrap(args);
List<FunctionArg> functionArgs = ArgumentsBuilder.wrap(args, isDirectCall(rayActor));
List<ObjectId> returnIds = taskSubmitter.submitActorTask(rayActor,
functionDescriptor, functionArgs, numReturns, null);
Preconditions.checkState(returnIds.size() == numReturns && returnIds.size() <= 1);
Expand All @@ -202,14 +191,21 @@ private RayObject callActorFunction(RayActor rayActor,

private RayActor createActorImpl(FunctionDescriptor functionDescriptor,
Object[] args, ActorCreationOptions options) {
List<FunctionArg> functionArgs = ArgumentsBuilder.wrap(args);
List<FunctionArg> functionArgs = ArgumentsBuilder.wrap(args, /*isDirectCall*/false);
if (functionDescriptor.getLanguage() != Language.JAVA && options != null) {
Preconditions.checkState(Strings.isNullOrEmpty(options.jvmOptions));
}
RayActor actor = taskSubmitter.createActor(functionDescriptor, functionArgs, options);
return actor;
}

private boolean isDirectCall(RayActor rayActor) {
if (rayActor instanceof NativeRayActor) {
return ((NativeRayActor) rayActor).isDirectCallActor();
}
return false;
}

public WorkerContext getWorkerContext() {
return workerContext;
}
Expand All @@ -218,10 +214,6 @@ public ObjectStore getObjectStore() {
return objectStore;
}

public RayletClient getRayletClient() {
return rayletClient;
}

public FunctionManager getFunctionManager() {
return functionManager;
}
Expand Down
16 changes: 12 additions & 4 deletions java/runtime/src/main/java/org/ray/runtime/RayDevRuntime.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,45 @@

import java.util.concurrent.atomic.AtomicInteger;
import org.ray.api.id.JobId;
import org.ray.api.id.UniqueId;
import org.ray.runtime.config.RayConfig;
import org.ray.runtime.context.LocalModeWorkerContext;
import org.ray.runtime.object.LocalModeObjectStore;
import org.ray.runtime.raylet.LocalModeRayletClient;
import org.ray.runtime.task.LocalModeTaskExecutor;
import org.ray.runtime.task.LocalModeTaskSubmitter;
import org.ray.runtime.task.TaskExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RayDevRuntime extends AbstractRayRuntime {

private static final Logger LOGGER = LoggerFactory.getLogger(RayDevRuntime.class);

private AtomicInteger jobCounter = new AtomicInteger(0);

public RayDevRuntime(RayConfig rayConfig) {
super(rayConfig);
if (rayConfig.getJobId().isNil()) {
rayConfig.setJobId(nextJobId());
}
taskExecutor = new TaskExecutor(this);
taskExecutor = new LocalModeTaskExecutor(this);
workerContext = new LocalModeWorkerContext(rayConfig.getJobId());
objectStore = new LocalModeObjectStore(workerContext);
taskSubmitter = new LocalModeTaskSubmitter(this, (LocalModeObjectStore) objectStore,
rayConfig.numberExecThreadsForDevRuntime);
((LocalModeObjectStore) objectStore).addObjectPutCallback(
objectId -> ((LocalModeTaskSubmitter) taskSubmitter).onObjectPut(objectId));
rayletClient = new LocalModeRayletClient();
}

@Override
public void shutdown() {
taskExecutor = null;
}

@Override
public void setResource(String resourceName, double capacity, UniqueId nodeId) {
LOGGER.error("Not implemented under SINGLE_PROCESS mode.");
}

private JobId nextJobId() {
return JobId.fromInt(jobCounter.getAndIncrement());
}
Expand Down
18 changes: 15 additions & 3 deletions java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.ray.api.id.JobId;
import org.ray.api.id.UniqueId;
import org.ray.runtime.config.RayConfig;
import org.ray.runtime.context.NativeWorkerContext;
import org.ray.runtime.gcs.GcsClient;
import org.ray.runtime.gcs.GcsClientOptions;
import org.ray.runtime.gcs.RedisClient;
import org.ray.runtime.generated.Common.WorkerType;
import org.ray.runtime.object.NativeObjectStore;
import org.ray.runtime.raylet.NativeRayletClient;
import org.ray.runtime.runner.RunManager;
import org.ray.runtime.task.NativeTaskExecutor;
import org.ray.runtime.task.NativeTaskSubmitter;
import org.ray.runtime.task.TaskExecutor;
import org.ray.runtime.util.FileUtil;
Expand Down Expand Up @@ -112,11 +113,10 @@ public RayNativeRuntime(RayConfig rayConfig) {
new GcsClientOptions(rayConfig));
Preconditions.checkState(nativeCoreWorkerPointer != 0);

taskExecutor = new TaskExecutor(this);
taskExecutor = new NativeTaskExecutor(nativeCoreWorkerPointer, this);
workerContext = new NativeWorkerContext(nativeCoreWorkerPointer);
objectStore = new NativeObjectStore(workerContext, nativeCoreWorkerPointer);
taskSubmitter = new NativeTaskSubmitter(nativeCoreWorkerPointer);
rayletClient = new NativeRayletClient(nativeCoreWorkerPointer);

// register
registerWorker();
Expand All @@ -136,6 +136,15 @@ public void shutdown() {
}
}

@Override
public void setResource(String resourceName, double capacity, UniqueId nodeId) {
Preconditions.checkArgument(Double.compare(capacity, 0) >= 0);
if (nodeId == null) {
nodeId = UniqueId.NIL;
}
nativeSetResource(nativeCoreWorkerPointer, resourceName, capacity, nodeId.getBytes());
}

public void run() {
nativeRunTaskExecutor(nativeCoreWorkerPointer, taskExecutor);
}
Expand Down Expand Up @@ -176,4 +185,7 @@ private static native void nativeRunTaskExecutor(long nativeCoreWorkerPointer,
private static native void nativeSetup(String logDir);

private static native void nativeShutdownHook();

private static native void nativeSetResource(long conn, String resourceName, double capacity,
byte[] nodeId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ public Language getLanguage() {
return Language.forNumber(nativeGetLanguage(nativeActorHandle));
}

public boolean isDirectCallActor() {
return nativeIsDirectCallActor(nativeActorHandle);
}

@Override
public String getModuleName() {
Preconditions.checkState(getLanguage() == Language.PYTHON);
Expand Down Expand Up @@ -90,6 +94,8 @@ protected void finalize() {

private static native int nativeGetLanguage(long nativeActorHandle);

private static native boolean nativeIsDirectCallActor(long nativeActorHandle);

private static native List<String> nativeGetActorCreationTaskFunctionDescriptor(
long nativeActorHandle);

Expand Down

This file was deleted.

This file was deleted.

Loading