From ff3bf300920f547c54fa9327ec0fbed065dc8876 Mon Sep 17 00:00:00 2001 From: cbqiao Date: Tue, 7 Jan 2025 21:18:13 +0800 Subject: [PATCH] [ISSUE-409] support submit job to ray cluster --- .../geaflow/env/EnvironmentFactory.java | 8 +- .../antgroup/geaflow/env/IEnvironment.java | 4 +- .../AbstractClusterManager.java | 1 + .../failover/FoStrategyFactoryTest.java | 2 +- .../geaflow-deploy/geaflow-assembly/pom.xml | 2 +- .../cluster/ray/utils/RaySystemFunc.java | 157 ------------------ .../pom.xml | 4 +- .../cluster/ray/client/RayClusterClient.java | 0 .../cluster/ray/client/RayEnvironment.java | 4 +- .../cluster/ray/clustermanager/RayClient.java | 6 +- .../ray/clustermanager/RayClusterId.java | 0 .../ray/clustermanager/RayClusterManager.java | 3 +- .../geaflow/cluster/ray/config/RayConfig.java | 0 .../ray/context/RayContainerContext.java | 0 .../cluster/ray/context/RayDriverContext.java | 0 .../ray/entrypoint/RayContainerRunner.java | 0 .../ray/entrypoint/RayDriverRunner.java | 0 .../ray/entrypoint/RayMasterRunner.java | 0 .../ray/entrypoint/RaySupervisorRunner.java | 0 .../failover/RayClusterFailoverStrategy.java | 2 +- .../RayComponentFailoverStrategy.java | 2 +- .../failover/RayDisableFailoverStrategy.java | 2 +- .../cluster/ray/utils/RaySystemFunc.java | 77 +++++++++ ...geaflow.cluster.failover.IFailoverStrategy | 0 .../com.antgroup.geaflow.env.IEnvironment | 0 .../geaflow/cluster/ray/RayJobTest.java | 73 ++++++++ .../failover/RayClusterFoStrategyTest.java | 6 +- .../failover/RayDisableFoStrategyTest.java | 4 +- geaflow/geaflow-deploy/pom.xml | 2 +- .../dsl/runtime/engine/GeaFlowGqlClient.java | 4 +- .../example/stream/UnBoundedStreamFoTest.java | 2 +- .../stream/UnBoundedStreamWordPrint.java | 2 +- .../example/stream/WindowStreamWordCount.java | 2 +- .../geaflow/example/util/EnvironmentUtil.java | 4 +- geaflow/pom.xml | 2 +- 35 files changed, 184 insertions(+), 191 deletions(-) delete mode 100644 geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/utils/RaySystemFunc.java rename geaflow/geaflow-deploy/{geaflow-on-ray-community => geaflow-on-ray}/pom.xml (94%) rename geaflow/geaflow-deploy/{geaflow-on-ray-community => geaflow-on-ray}/src/main/java/com/antgroup/geaflow/cluster/ray/client/RayClusterClient.java (100%) rename geaflow/geaflow-deploy/{geaflow-on-ray-community => geaflow-on-ray}/src/main/java/com/antgroup/geaflow/cluster/ray/client/RayEnvironment.java (91%) rename geaflow/geaflow-deploy/{geaflow-on-ray-community => geaflow-on-ray}/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClient.java (94%) rename geaflow/geaflow-deploy/{geaflow-on-ray-community => geaflow-on-ray}/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClusterId.java (100%) rename geaflow/geaflow-deploy/{geaflow-on-ray-community => geaflow-on-ray}/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClusterManager.java (98%) rename geaflow/geaflow-deploy/{geaflow-on-ray-community => geaflow-on-ray}/src/main/java/com/antgroup/geaflow/cluster/ray/config/RayConfig.java (100%) rename geaflow/geaflow-deploy/{geaflow-on-ray-community => geaflow-on-ray}/src/main/java/com/antgroup/geaflow/cluster/ray/context/RayContainerContext.java (100%) rename geaflow/geaflow-deploy/{geaflow-on-ray-community => geaflow-on-ray}/src/main/java/com/antgroup/geaflow/cluster/ray/context/RayDriverContext.java (100%) rename geaflow/geaflow-deploy/{geaflow-on-ray-community => geaflow-on-ray}/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RayContainerRunner.java (100%) rename geaflow/geaflow-deploy/{geaflow-on-ray-community => geaflow-on-ray}/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RayDriverRunner.java (100%) rename geaflow/geaflow-deploy/{geaflow-on-ray-community => geaflow-on-ray}/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RayMasterRunner.java (100%) rename geaflow/geaflow-deploy/{geaflow-on-ray-community => geaflow-on-ray}/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RaySupervisorRunner.java (100%) rename geaflow/geaflow-deploy/{geaflow-on-ray-community => geaflow-on-ray}/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayClusterFailoverStrategy.java (96%) rename geaflow/geaflow-deploy/{geaflow-on-ray-community => geaflow-on-ray}/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayComponentFailoverStrategy.java (96%) rename geaflow/geaflow-deploy/{geaflow-on-ray-community => geaflow-on-ray}/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayDisableFailoverStrategy.java (96%) create mode 100644 geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/utils/RaySystemFunc.java rename geaflow/geaflow-deploy/{geaflow-on-ray-community => geaflow-on-ray}/src/main/resources/META-INF/services/com.antgroup.geaflow.cluster.failover.IFailoverStrategy (100%) rename geaflow/geaflow-deploy/{geaflow-on-ray-community => geaflow-on-ray}/src/main/resources/META-INF/services/com.antgroup.geaflow.env.IEnvironment (100%) create mode 100644 geaflow/geaflow-deploy/geaflow-on-ray/src/test/java/com/antgroup/geaflow/cluster/ray/RayJobTest.java rename geaflow/geaflow-deploy/{geaflow-on-ray-community => geaflow-on-ray}/src/test/java/com/antgroup/geaflow/cluster/ray/failover/RayClusterFoStrategyTest.java (89%) rename geaflow/geaflow-deploy/{geaflow-on-ray-community => geaflow-on-ray}/src/test/java/com/antgroup/geaflow/cluster/ray/failover/RayDisableFoStrategyTest.java (90%) diff --git a/geaflow/geaflow-core/geaflow-api/src/main/java/com/antgroup/geaflow/env/EnvironmentFactory.java b/geaflow/geaflow-core/geaflow-api/src/main/java/com/antgroup/geaflow/env/EnvironmentFactory.java index dab383b2..fec3cbde 100644 --- a/geaflow/geaflow-core/geaflow-api/src/main/java/com/antgroup/geaflow/env/EnvironmentFactory.java +++ b/geaflow/geaflow-core/geaflow-api/src/main/java/com/antgroup/geaflow/env/EnvironmentFactory.java @@ -60,12 +60,12 @@ private static Environment onLocalEnvironment(Map config) { return environment; } - public static Environment onRayCommunityEnvironment() { - return (Environment) loadEnvironment(EnvType.RAY_COMMUNITY); + public static Environment onRayEnvironment() { + return (Environment) loadEnvironment(EnvType.RAY); } - public static Environment onRayCommunityEnvironment(String[] args) { - Environment environment = (Environment) loadEnvironment(EnvType.RAY_COMMUNITY); + public static Environment onRayEnvironment(String[] args) { + Environment environment = (Environment) loadEnvironment(EnvType.RAY); IEnvironmentArgsParser argsParser = loadEnvironmentArgsParser(); environment.getEnvironmentContext().withConfig(argsParser.parse(args)); return environment; diff --git a/geaflow/geaflow-core/geaflow-api/src/main/java/com/antgroup/geaflow/env/IEnvironment.java b/geaflow/geaflow-core/geaflow-api/src/main/java/com/antgroup/geaflow/env/IEnvironment.java index d4b1d979..9cadbc4b 100644 --- a/geaflow/geaflow-core/geaflow-api/src/main/java/com/antgroup/geaflow/env/IEnvironment.java +++ b/geaflow/geaflow-core/geaflow-api/src/main/java/com/antgroup/geaflow/env/IEnvironment.java @@ -42,9 +42,9 @@ public interface IEnvironment extends Serializable { enum EnvType { /** - * Community ray cluster. + * Ray cluster. */ - RAY_COMMUNITY, + RAY, /** * K8s cluster. diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/clustermanager/AbstractClusterManager.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/clustermanager/AbstractClusterManager.java index 5c1ef695..ee276fcd 100644 --- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/clustermanager/AbstractClusterManager.java +++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/clustermanager/AbstractClusterManager.java @@ -103,6 +103,7 @@ protected void startContainers(int containerNum) { @Override public Map startDrivers() { int driverNum = clusterConfig.getDriverNum(); + LOGGER.info("start driver number: {}", driverNum); if (!clusterContext.isRecover()) { Map driverIds = new HashMap<>(); for (int driverIndex = 0; driverIndex < driverNum; driverIndex++) { diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/test/java/com/antgroup/geaflow/cluster/failover/FoStrategyFactoryTest.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/test/java/com/antgroup/geaflow/cluster/failover/FoStrategyFactoryTest.java index 89333f08..6eef1bdd 100644 --- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/test/java/com/antgroup/geaflow/cluster/failover/FoStrategyFactoryTest.java +++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/test/java/com/antgroup/geaflow/cluster/failover/FoStrategyFactoryTest.java @@ -22,7 +22,7 @@ public class FoStrategyFactoryTest { @Test(expectedExceptions = GeaflowRuntimeException.class) public void testLoad() { - FailoverStrategyFactory.loadFailoverStrategy(EnvType.RAY_COMMUNITY, ""); + FailoverStrategyFactory.loadFailoverStrategy(EnvType.RAY, ""); } } diff --git a/geaflow/geaflow-deploy/geaflow-assembly/pom.xml b/geaflow/geaflow-deploy/geaflow-assembly/pom.xml index 77bdb9ff..d8f0f38b 100644 --- a/geaflow/geaflow-deploy/geaflow-assembly/pom.xml +++ b/geaflow/geaflow-deploy/geaflow-assembly/pom.xml @@ -75,7 +75,7 @@ com.antgroup.tugraph - geaflow-on-ray-community + geaflow-on-ray com.antgroup.tugraph diff --git a/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/utils/RaySystemFunc.java b/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/utils/RaySystemFunc.java deleted file mode 100644 index ed390973..00000000 --- a/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/utils/RaySystemFunc.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Copyright 2023 AntGroup CO., Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ - -package com.antgroup.geaflow.cluster.ray.utils; - -import static com.antgroup.geaflow.common.config.keys.ExecutionConfigKeys.JOB_WORK_PATH; -import static com.antgroup.geaflow.common.config.keys.ExecutionConfigKeys.RUN_LOCAL_MODE; - -import com.antgroup.geaflow.cluster.config.ClusterConfig; -import com.antgroup.geaflow.cluster.ray.config.RayConfig; -import com.antgroup.geaflow.common.config.Configuration; -import com.antgroup.geaflow.utils.math.MathUtil; -import io.ray.api.Ray; -import io.ray.runtime.config.RunMode; -import java.io.File; -import java.io.IOException; -import java.io.Serializable; -import java.util.LinkedHashMap; -import java.util.Map; -import org.apache.commons.io.FileUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class RaySystemFunc implements Serializable { - - private static final Logger LOGGER = LoggerFactory.getLogger(RaySystemFunc.class); - - - private static final Object LOCK = new Object(); - private static String appPath; - - public static boolean isRestarted() { - return Ray.getRuntimeContext().wasCurrentActorRestarted(); - } - - public static boolean isLocalMode() { - return Ray.getRuntimeContext().isLocalMode(); - } - - public static String getWorkPath() { - if (appPath != null) { - return appPath; - } - synchronized (LOCK) { - if (Ray.getRuntimeContext().isLocalMode()) { - appPath = "/tmp/" + System.currentTimeMillis(); - try { - FileUtils.forceMkdir(new File(appPath)); - } catch (IOException e) { - LOGGER.error(e.getMessage(), e); - } - } else { - appPath = Ray.getRuntimeContext().getCurrentRuntimeEnv() - .get(RayConfig.RAY_JOB_WORKING_DIR, String.class); - } - } - return appPath; - } - - public static void initRayEnv(ClusterConfig clusterConfig) { - LOGGER.info("clusterConfig:{}", clusterConfig); - Configuration config = clusterConfig.getConfig(); - - Map systemProperties = new LinkedHashMap<>(); - boolean isLocal = config.getBoolean(RUN_LOCAL_MODE); - if (isLocal) { - systemProperties.put(RayConfig.RAY_RUN_MODE, RunMode.LOCAL.name()); - } else { - systemProperties.put(RayConfig.RAY_RUN_MODE, RunMode.CLUSTER.name()); - } - - // Sets how many actor threads can be started by a jvm process. - int containerMemoryMb = clusterConfig.getContainerMemoryMB(); - // The amount of memory used by a jvm process, both in and out of the heap, must be a factor of 50. - systemProperties.put(RayConfig.RAY_JOB_JAVA_WORKER_PROCESS_DEFAULT_MEMORY_MB, - String.valueOf(MathUtil.multiplesOf50(containerMemoryMb))); - - // To set how many jvm processes are started there, geaflow defaults to 0. - systemProperties.put(RayConfig.RAY_JOB_NUM_INITIAL_JAVA_WORKER_PROCESS, "0"); - - // Set all resources required for this job - // Includes all node memory, as well as additional master and driver memory for the engine, - // and reserves additional memory for normal tasks - // totalMemory can be set to a larger value. ray will report an error if more memory is used than VC. - int totalDriverMemoryMb = clusterConfig.getDriverMemoryMB() * clusterConfig.getDriverNum(); - - int totalMasterMemoryMb = clusterConfig.getMasterMemoryMB(); - - int totalMemoryMb = - containerMemoryMb * clusterConfig.getContainerNum() + totalDriverMemoryMb - + totalMasterMemoryMb + RayConfig.CLUSTER_RESERVED_MEMORY_MB; - systemProperties.put(RayConfig.RAY_JOB_TOTAL_MEMORY_MB, - String.valueOf(MathUtil.multiplesOf50(totalMemoryMb))); - - // Set the jvm process heap memory ratio - // If clusterConfig is empty, set the default Xmx percentage of total memory - // Otherwise, do not set this ratio, but set each jvm parameter separately in setting jvm parameters later - systemProperties.put(RayConfig.RAY_JOB_JAVA_HEAP_FRACTION, "0.8"); - - systemProperties.put(RayConfig.RAY_TASK_RETURN_TASK_EXCEPTION, Boolean.FALSE.toString()); - - - // Set the JVM parameters below - int optionIndex = 0; - while (System.getProperty( - String.format("%s.%d", RayConfig.RAY_JOB_JVM_OPTIONS_PREFIX, optionIndex)) != null) { - optionIndex++; - } - - // jvm parameter - if (clusterConfig.getContainerJvmOptions() != null) { - for (String option : clusterConfig.getContainerJvmOptions().getJvmOptions()) { - systemProperties.put(String.format("%s.%d", RayConfig.RAY_JOB_JVM_OPTIONS_PREFIX, optionIndex++), - option); - } - } - - // Set the user log file configuration as follows - systemProperties - .put(String.format("%s.%d", RayConfig.RAY_JOB_JVM_OPTIONS_PREFIX, optionIndex++), String - .format("-D%s=%s", RayConfig.RAY_CUSTOM_LOGGER0_NAME, - RayConfig.CUSTOM_LOGGER_NAME)); - - systemProperties - .put(String.format("%s.%d", RayConfig.RAY_JOB_JVM_OPTIONS_PREFIX, optionIndex++), String - .format("-D%s=%s", RayConfig.RAY_CUSTOM_LOGGER0_FILE_NAME, - RayConfig.CUSTOM_LOGGER_FILE_NAME)); - - systemProperties - .put(String.format("%s.%d", RayConfig.RAY_JOB_JVM_OPTIONS_PREFIX, optionIndex++), String - .format("-D%s=%s", RayConfig.RAY_CUSTOM_LOGGER0_PATTERN, - RayConfig.CUSTOM_LOGGER_PATTERN)); - - LOGGER.info("set system property: {}", systemProperties); - - for (Map.Entry entry : systemProperties.entrySet()) { - System.setProperty(entry.getKey(), entry.getValue()); - } - - // Set working dir. - System.setProperty( - String.format("%s.%s", RayConfig.RAY_JOB_RUNTIME_ENV, RayConfig.RAY_JOB_WORKING_DIR), - config.getString(JOB_WORK_PATH)); - Ray.init(); - } -} diff --git a/geaflow/geaflow-deploy/geaflow-on-ray-community/pom.xml b/geaflow/geaflow-deploy/geaflow-on-ray/pom.xml similarity index 94% rename from geaflow/geaflow-deploy/geaflow-on-ray-community/pom.xml rename to geaflow/geaflow-deploy/geaflow-on-ray/pom.xml index 922f6971..63c86565 100644 --- a/geaflow/geaflow-deploy/geaflow-on-ray-community/pom.xml +++ b/geaflow/geaflow-deploy/geaflow-on-ray/pom.xml @@ -24,11 +24,11 @@ 4.0.0 - geaflow-on-ray-community + geaflow-on-ray jar - 2.4.0 + 2.39.0 diff --git a/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/client/RayClusterClient.java b/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/client/RayClusterClient.java similarity index 100% rename from geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/client/RayClusterClient.java rename to geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/client/RayClusterClient.java diff --git a/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/client/RayEnvironment.java b/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/client/RayEnvironment.java similarity index 91% rename from geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/client/RayEnvironment.java rename to geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/client/RayEnvironment.java index a04b4148..04cbbc46 100644 --- a/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/client/RayEnvironment.java +++ b/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/client/RayEnvironment.java @@ -25,7 +25,7 @@ public class RayEnvironment extends AbstractEnvironment { public RayEnvironment() { context.getConfig().put(LOG_DIR, RAY_LOG_DIR); - context.getConfig().put(SUPERVISOR_ENABLE, Boolean.TRUE.toString()); + context.getConfig().put(SUPERVISOR_ENABLE, Boolean.FALSE.toString()); } @Override @@ -35,6 +35,6 @@ protected IClusterClient getClusterClient() { @Override public EnvType getEnvType() { - return EnvType.RAY_COMMUNITY; + return EnvType.RAY; } } diff --git a/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClient.java b/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClient.java similarity index 94% rename from geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClient.java rename to geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClient.java index b82a024f..2dc07152 100644 --- a/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClient.java +++ b/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClient.java @@ -45,7 +45,7 @@ public static ActorHandle createMaster(ClusterConfig clusterCon .actor(RayMasterRunner::new, clusterConfig.getConfig()) .setMaxRestarts(clusterConfig.getMaxRestarts()) .setLifetime(ActorLifetime.DETACHED) - .setJvmOptions(jvmOptions).remote(); + .remote(); LOGGER.info("master actor:{}, memoryMB:{}, jvmOptions:{}, foRestartTimes:{}", masterRayActor.getId().toString(), totalMemoryMb, jvmOptions, clusterConfig.getMaxRestarts()); @@ -68,7 +68,7 @@ public static ActorHandle createDriver(ClusterConfig clusterCon .actor(RayDriverRunner::new, context) .setMaxRestarts(clusterConfig.getMaxRestarts()) .setLifetime(ActorLifetime.DETACHED) - .setJvmOptions(jvmOptions).remote(); + .remote(); LOGGER.info("driver actor:{}, memoryMB:{}, jvmOptions:{}, foRestartTimes:{}", driverRayActor.getId().toString(), totalMemoryMb, jvmOptions, clusterConfig.getMaxRestarts()); @@ -81,7 +81,6 @@ public static ActorHandle createContainer(ClusterConfig clus .actor(RayContainerRunner::new, containerContext) .setMaxRestarts(clusterConfig.getMaxRestarts()) .setLifetime(ActorLifetime.DETACHED) - .setJvmOptions(clusterConfig.getContainerJvmOptions().getJvmOptions()) .remote(); LOGGER.info("worker actor {} maxRestarts {}", rayContainer.getId().toString(), clusterConfig.getMaxRestarts()); @@ -94,7 +93,6 @@ public static ActorHandle createSupervisor(ClusterConfig cl .actor(RaySupervisorRunner::new, clusterConfig.getConfig(), envs) .setMaxRestarts(clusterConfig.getMaxRestarts()) .setLifetime(ActorLifetime.DETACHED) - .setJvmOptions(clusterConfig.getSupervisorJvmOptions().getJvmOptions()) .remote(); LOGGER.info("supervisor actor {} maxRestarts {}", rayContainer.getId().toString(), clusterConfig.getMaxRestarts()); diff --git a/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClusterId.java b/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClusterId.java similarity index 100% rename from geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClusterId.java rename to geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClusterId.java diff --git a/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClusterManager.java b/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClusterManager.java similarity index 98% rename from geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClusterManager.java rename to geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClusterManager.java index 9b449f3b..fd05dfea 100644 --- a/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClusterManager.java +++ b/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClusterManager.java @@ -54,7 +54,7 @@ public class RayClusterManager extends GeaFlowClusterManager { private String currentJobId; public RayClusterManager() { - super(EnvType.RAY_COMMUNITY); + super(EnvType.RAY); } @Override @@ -98,6 +98,7 @@ public void createNewContainer(int containerId, boolean isRecover) { @Override public void createNewDriver(int driverId, int driverIndex) { + LOGGER.info("create driver start, enable supervisor:{}", enableSupervisor); if (enableSupervisor) { String logFile = String.format("driver-%s-%s.log", currentJobId, driverId); String command = getDriverShellCommand(driverId, driverIndex, classpath, logFile); diff --git a/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/config/RayConfig.java b/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/config/RayConfig.java similarity index 100% rename from geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/config/RayConfig.java rename to geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/config/RayConfig.java diff --git a/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/context/RayContainerContext.java b/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/context/RayContainerContext.java similarity index 100% rename from geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/context/RayContainerContext.java rename to geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/context/RayContainerContext.java diff --git a/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/context/RayDriverContext.java b/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/context/RayDriverContext.java similarity index 100% rename from geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/context/RayDriverContext.java rename to geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/context/RayDriverContext.java diff --git a/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RayContainerRunner.java b/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RayContainerRunner.java similarity index 100% rename from geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RayContainerRunner.java rename to geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RayContainerRunner.java diff --git a/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RayDriverRunner.java b/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RayDriverRunner.java similarity index 100% rename from geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RayDriverRunner.java rename to geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RayDriverRunner.java diff --git a/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RayMasterRunner.java b/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RayMasterRunner.java similarity index 100% rename from geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RayMasterRunner.java rename to geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RayMasterRunner.java diff --git a/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RaySupervisorRunner.java b/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RaySupervisorRunner.java similarity index 100% rename from geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RaySupervisorRunner.java rename to geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RaySupervisorRunner.java diff --git a/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayClusterFailoverStrategy.java b/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayClusterFailoverStrategy.java similarity index 96% rename from geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayClusterFailoverStrategy.java rename to geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayClusterFailoverStrategy.java index 6f09942b..9123ff57 100644 --- a/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayClusterFailoverStrategy.java +++ b/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayClusterFailoverStrategy.java @@ -22,7 +22,7 @@ public class RayClusterFailoverStrategy extends ClusterFailoverStrategy { public RayClusterFailoverStrategy() { - super(EnvType.RAY_COMMUNITY); + super(EnvType.RAY); } @Override diff --git a/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayComponentFailoverStrategy.java b/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayComponentFailoverStrategy.java similarity index 96% rename from geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayComponentFailoverStrategy.java rename to geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayComponentFailoverStrategy.java index 3d5a6c69..61e8f8c6 100644 --- a/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayComponentFailoverStrategy.java +++ b/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayComponentFailoverStrategy.java @@ -22,7 +22,7 @@ public class RayComponentFailoverStrategy extends ComponentFailoverStrategy { public RayComponentFailoverStrategy() { - super(EnvType.RAY_COMMUNITY); + super(EnvType.RAY); } @Override diff --git a/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayDisableFailoverStrategy.java b/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayDisableFailoverStrategy.java similarity index 96% rename from geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayDisableFailoverStrategy.java rename to geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayDisableFailoverStrategy.java index 4236fae2..bc43afe1 100644 --- a/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayDisableFailoverStrategy.java +++ b/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayDisableFailoverStrategy.java @@ -22,7 +22,7 @@ public class RayDisableFailoverStrategy extends DisableFailoverStrategy { public RayDisableFailoverStrategy() { - super(EnvType.RAY_COMMUNITY); + super(EnvType.RAY); } @Override diff --git a/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/utils/RaySystemFunc.java b/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/utils/RaySystemFunc.java new file mode 100644 index 00000000..c9e0227d --- /dev/null +++ b/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/utils/RaySystemFunc.java @@ -0,0 +1,77 @@ +/* + * Copyright 2023 AntGroup CO., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ + +package com.antgroup.geaflow.cluster.ray.utils; + +import static com.antgroup.geaflow.common.config.keys.ExecutionConfigKeys.JOB_WORK_PATH; + +import com.antgroup.geaflow.cluster.config.ClusterConfig; +import com.antgroup.geaflow.cluster.ray.config.RayConfig; +import com.antgroup.geaflow.common.config.Configuration; +import io.ray.api.Ray; +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RaySystemFunc implements Serializable { + + private static final long serialVersionUID = -3708025618479190982L; + + private static final Logger LOGGER = LoggerFactory.getLogger(RaySystemFunc.class); + + private static final Object LOCK = new Object(); + private static String appPath; + + public static boolean isRestarted() { + return Ray.getRuntimeContext().wasCurrentActorRestarted(); + } + + public static boolean isLocalMode() { + return Ray.getRuntimeContext().isLocalMode(); + } + + public static String getWorkPath() { + if (appPath != null) { + return appPath; + } + synchronized (LOCK) { + if (Ray.getRuntimeContext().isLocalMode()) { + appPath = "/tmp/" + System.currentTimeMillis(); + try { + FileUtils.forceMkdir(new File(appPath)); + } catch (IOException e) { + LOGGER.error(e.getMessage(), e); + } + } else { + appPath = Ray.getRuntimeContext().getCurrentRuntimeEnv() + .get(RayConfig.RAY_JOB_WORKING_DIR, String.class); + } + } + return appPath; + } + + public static void initRayEnv(ClusterConfig clusterConfig) { + LOGGER.info("clusterConfig:{}", clusterConfig); + Configuration config = clusterConfig.getConfig(); + + // Set working dir. + System.setProperty( + String.format("%s.%s", RayConfig.RAY_JOB_RUNTIME_ENV, RayConfig.RAY_JOB_WORKING_DIR), + config.getString(JOB_WORK_PATH)); + Ray.init(); + } +} diff --git a/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/resources/META-INF/services/com.antgroup.geaflow.cluster.failover.IFailoverStrategy b/geaflow/geaflow-deploy/geaflow-on-ray/src/main/resources/META-INF/services/com.antgroup.geaflow.cluster.failover.IFailoverStrategy similarity index 100% rename from geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/resources/META-INF/services/com.antgroup.geaflow.cluster.failover.IFailoverStrategy rename to geaflow/geaflow-deploy/geaflow-on-ray/src/main/resources/META-INF/services/com.antgroup.geaflow.cluster.failover.IFailoverStrategy diff --git a/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/resources/META-INF/services/com.antgroup.geaflow.env.IEnvironment b/geaflow/geaflow-deploy/geaflow-on-ray/src/main/resources/META-INF/services/com.antgroup.geaflow.env.IEnvironment similarity index 100% rename from geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/resources/META-INF/services/com.antgroup.geaflow.env.IEnvironment rename to geaflow/geaflow-deploy/geaflow-on-ray/src/main/resources/META-INF/services/com.antgroup.geaflow.env.IEnvironment diff --git a/geaflow/geaflow-deploy/geaflow-on-ray/src/test/java/com/antgroup/geaflow/cluster/ray/RayJobTest.java b/geaflow/geaflow-deploy/geaflow-on-ray/src/test/java/com/antgroup/geaflow/cluster/ray/RayJobTest.java new file mode 100644 index 00000000..804eb260 --- /dev/null +++ b/geaflow/geaflow-deploy/geaflow-on-ray/src/test/java/com/antgroup/geaflow/cluster/ray/RayJobTest.java @@ -0,0 +1,73 @@ +/* + * Copyright 2023 AntGroup CO., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ + +package com.antgroup.geaflow.cluster.ray; + + +import com.antgroup.geaflow.utils.HttpUtil; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.commons.lang3.StringEscapeUtils; +import org.testng.annotations.Test; + +public class RayJobTest { + + @Test(enabled = false) + public void testRayJobSubmitTest() { + String mainClass = "com.antgroup.geaflow.GeaFlowJobDemo"; + List remoteJarUrls = Arrays.asList( + "https:://public-engine-url-geaflow-0.6.zip", + "https://public-udf-url-udf.zip"); + String args = "{\"job\":{\"geaflow.config.key\":\"true\"}"; + args = StringEscapeUtils.escapeJava(args); + args = StringEscapeUtils.escapeJava("\"" + args + "\""); + + submitJob(remoteJarUrls, mainClass, args); + } + + public static void submitJob(List remoteJarUrls, String mainClass, String args) { + // Cluster args. + // ray dashboard url. + String rayDashboardAddress = "127.0.0.1:8265"; + // ray redis url. + String rayRedisAddress = "127.0.0.1:6379"; + String rayDistJarPath = "path-to-ray-cluster/ray_dist.jar"; + String raySessionResourceJarPath = "/path-to-ray-cluster-session-dir/session_latest/runtime_resources/java_jars_files/"; + + // Job args. + List downloadJarPaths = new ArrayList<>(remoteJarUrls.size()); + for (String remoteJarUrl : remoteJarUrls) { + String str = remoteJarUrl.replace(".zip", ""); + String result = str.replaceAll("[:/.]+", "_"); + downloadJarPaths.add(raySessionResourceJarPath + result + "/*"); + } + + String downloadJarClassPath = String.join(":", downloadJarPaths); + List remoteJarUrlsStr = new ArrayList<>(remoteJarUrls.size()); + for (String remoteUrl : remoteJarUrls) { + remoteJarUrlsStr.add("\"" + remoteUrl + "\""); + } + String remoteJarJsonPath = String.join(",", remoteJarUrlsStr); + + String request = String.format("{\n" + + "\"entrypoint\": \"java -classpath %s:%s -Dlog.file=/tmp/logfile.log -Dray.address=%s %s %s\",\n" + + "\"runtime_env\": {\"java_jars\": [%s]}\n" + + "}", rayDistJarPath, downloadJarClassPath, rayRedisAddress, mainClass, args, remoteJarJsonPath); + + String postUrl = String.format("http://%s/api/jobs/", rayDashboardAddress); + System.out.println("request: \n" + request + "\npost url: \n" + postUrl); + HttpUtil.post(postUrl, request); + } +} diff --git a/geaflow/geaflow-deploy/geaflow-on-ray-community/src/test/java/com/antgroup/geaflow/cluster/ray/failover/RayClusterFoStrategyTest.java b/geaflow/geaflow-deploy/geaflow-on-ray/src/test/java/com/antgroup/geaflow/cluster/ray/failover/RayClusterFoStrategyTest.java similarity index 89% rename from geaflow/geaflow-deploy/geaflow-on-ray-community/src/test/java/com/antgroup/geaflow/cluster/ray/failover/RayClusterFoStrategyTest.java rename to geaflow/geaflow-deploy/geaflow-on-ray/src/test/java/com/antgroup/geaflow/cluster/ray/failover/RayClusterFoStrategyTest.java index 77cf306e..a2b43c66 100644 --- a/geaflow/geaflow-deploy/geaflow-on-ray-community/src/test/java/com/antgroup/geaflow/cluster/ray/failover/RayClusterFoStrategyTest.java +++ b/geaflow/geaflow-deploy/geaflow-on-ray/src/test/java/com/antgroup/geaflow/cluster/ray/failover/RayClusterFoStrategyTest.java @@ -28,15 +28,15 @@ public class RayClusterFoStrategyTest { @Test public void testLoad() { Configuration configuration = new Configuration(); - IFailoverStrategy foStrategy = FailoverStrategyFactory.loadFailoverStrategy(EnvType.RAY_COMMUNITY, configuration.getString(FO_STRATEGY)); + IFailoverStrategy foStrategy = FailoverStrategyFactory.loadFailoverStrategy(EnvType.RAY, configuration.getString(FO_STRATEGY)); Assert.assertNotNull(foStrategy); Assert.assertEquals(foStrategy.getType().name(), configuration.getString(FO_STRATEGY)); - IFailoverStrategy rayFoStrategy = FailoverStrategyFactory.loadFailoverStrategy(EnvType.RAY_COMMUNITY, "cluster_fo"); + IFailoverStrategy rayFoStrategy = FailoverStrategyFactory.loadFailoverStrategy(EnvType.RAY, "cluster_fo"); Assert.assertNotNull(rayFoStrategy); Assert.assertEquals(rayFoStrategy.getClass(), RayClusterFailoverStrategy.class); - rayFoStrategy = FailoverStrategyFactory.loadFailoverStrategy(EnvType.RAY_COMMUNITY, "component_fo"); + rayFoStrategy = FailoverStrategyFactory.loadFailoverStrategy(EnvType.RAY, "component_fo"); Assert.assertNotNull(rayFoStrategy); Assert.assertEquals(rayFoStrategy.getClass(), RayComponentFailoverStrategy.class); } diff --git a/geaflow/geaflow-deploy/geaflow-on-ray-community/src/test/java/com/antgroup/geaflow/cluster/ray/failover/RayDisableFoStrategyTest.java b/geaflow/geaflow-deploy/geaflow-on-ray/src/test/java/com/antgroup/geaflow/cluster/ray/failover/RayDisableFoStrategyTest.java similarity index 90% rename from geaflow/geaflow-deploy/geaflow-on-ray-community/src/test/java/com/antgroup/geaflow/cluster/ray/failover/RayDisableFoStrategyTest.java rename to geaflow/geaflow-deploy/geaflow-on-ray/src/test/java/com/antgroup/geaflow/cluster/ray/failover/RayDisableFoStrategyTest.java index 3b33bf5f..d548de0a 100644 --- a/geaflow/geaflow-deploy/geaflow-on-ray-community/src/test/java/com/antgroup/geaflow/cluster/ray/failover/RayDisableFoStrategyTest.java +++ b/geaflow/geaflow-deploy/geaflow-on-ray/src/test/java/com/antgroup/geaflow/cluster/ray/failover/RayDisableFoStrategyTest.java @@ -28,11 +28,11 @@ public class RayDisableFoStrategyTest { @Test public void testLoad() { Configuration configuration = new Configuration(); - IFailoverStrategy foStrategy = FailoverStrategyFactory.loadFailoverStrategy(EnvType.RAY_COMMUNITY, configuration.getString(FO_STRATEGY)); + IFailoverStrategy foStrategy = FailoverStrategyFactory.loadFailoverStrategy(EnvType.RAY, configuration.getString(FO_STRATEGY)); Assert.assertNotNull(foStrategy); Assert.assertEquals(foStrategy.getType().name(), configuration.getString(FO_STRATEGY)); - IFailoverStrategy rayDisableFoStrategy = FailoverStrategyFactory.loadFailoverStrategy(EnvType.RAY_COMMUNITY, "disable_fo"); + IFailoverStrategy rayDisableFoStrategy = FailoverStrategyFactory.loadFailoverStrategy(EnvType.RAY, "disable_fo"); Assert.assertNotNull(rayDisableFoStrategy); Assert.assertEquals(rayDisableFoStrategy.getClass(), RayDisableFailoverStrategy.class); } diff --git a/geaflow/geaflow-deploy/pom.xml b/geaflow/geaflow-deploy/pom.xml index cc3cebdd..85301549 100644 --- a/geaflow/geaflow-deploy/pom.xml +++ b/geaflow/geaflow-deploy/pom.xml @@ -29,7 +29,7 @@ geaflow-cluster-runner - geaflow-on-ray-community + geaflow-on-ray geaflow-on-k8s geaflow-on-local geaflow-assembly diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/com/antgroup/geaflow/dsl/runtime/engine/GeaFlowGqlClient.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/com/antgroup/geaflow/dsl/runtime/engine/GeaFlowGqlClient.java index 566503e2..0cd49f7e 100644 --- a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/com/antgroup/geaflow/dsl/runtime/engine/GeaFlowGqlClient.java +++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/com/antgroup/geaflow/dsl/runtime/engine/GeaFlowGqlClient.java @@ -88,8 +88,8 @@ public static Environment loadEnvironment(String[] args) { switch (clusterType) { case K8S: return EnvironmentFactory.onK8SEnvironment(args); - case RAY_COMMUNITY: - return EnvironmentFactory.onRayCommunityEnvironment(args); + case RAY: + return EnvironmentFactory.onRayEnvironment(args); default: return EnvironmentFactory.onLocalEnvironment(args); } diff --git a/geaflow/geaflow-examples/src/main/java/com/antgroup/geaflow/example/stream/UnBoundedStreamFoTest.java b/geaflow/geaflow-examples/src/main/java/com/antgroup/geaflow/example/stream/UnBoundedStreamFoTest.java index 89b3c1a6..7e589409 100644 --- a/geaflow/geaflow-examples/src/main/java/com/antgroup/geaflow/example/stream/UnBoundedStreamFoTest.java +++ b/geaflow/geaflow-examples/src/main/java/com/antgroup/geaflow/example/stream/UnBoundedStreamFoTest.java @@ -31,7 +31,7 @@ public class UnBoundedStreamFoTest { private static final Logger LOGGER = LoggerFactory.getLogger(UnBoundedStreamFoTest.class); public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException { - Environment environment = EnvironmentFactory.onRayCommunityEnvironment(args); + Environment environment = EnvironmentFactory.onRayEnvironment(args); Configuration configuration = environment.getEnvironmentContext().getConfig(); StreamWordCountPipeline pipeline = new StreamWordCountPipeline(); diff --git a/geaflow/geaflow-examples/src/main/java/com/antgroup/geaflow/example/stream/UnBoundedStreamWordPrint.java b/geaflow/geaflow-examples/src/main/java/com/antgroup/geaflow/example/stream/UnBoundedStreamWordPrint.java index 7e6aa8aa..9d194a8a 100644 --- a/geaflow/geaflow-examples/src/main/java/com/antgroup/geaflow/example/stream/UnBoundedStreamWordPrint.java +++ b/geaflow/geaflow-examples/src/main/java/com/antgroup/geaflow/example/stream/UnBoundedStreamWordPrint.java @@ -20,7 +20,7 @@ public class UnBoundedStreamWordPrint { public static void main(String[] args) { - Environment environment = EnvironmentFactory.onRayCommunityEnvironment(); + Environment environment = EnvironmentFactory.onRayEnvironment(); StreamWordPrintPipeline pipeline = new StreamWordPrintPipeline(); pipeline.submit(environment); diff --git a/geaflow/geaflow-examples/src/main/java/com/antgroup/geaflow/example/stream/WindowStreamWordCount.java b/geaflow/geaflow-examples/src/main/java/com/antgroup/geaflow/example/stream/WindowStreamWordCount.java index 9aff071e..ec24f210 100644 --- a/geaflow/geaflow-examples/src/main/java/com/antgroup/geaflow/example/stream/WindowStreamWordCount.java +++ b/geaflow/geaflow-examples/src/main/java/com/antgroup/geaflow/example/stream/WindowStreamWordCount.java @@ -33,7 +33,7 @@ public class WindowStreamWordCount { public static void main(String[] args) { - Environment environment = EnvironmentFactory.onRayCommunityEnvironment(args); + Environment environment = EnvironmentFactory.onRayEnvironment(args); Pipeline pipeline = PipelineFactory.buildPipeline(environment); pipeline.submit(new PipelineTask() { @Override diff --git a/geaflow/geaflow-examples/src/main/java/com/antgroup/geaflow/example/util/EnvironmentUtil.java b/geaflow/geaflow-examples/src/main/java/com/antgroup/geaflow/example/util/EnvironmentUtil.java index ac531cdf..144c53ae 100644 --- a/geaflow/geaflow-examples/src/main/java/com/antgroup/geaflow/example/util/EnvironmentUtil.java +++ b/geaflow/geaflow-examples/src/main/java/com/antgroup/geaflow/example/util/EnvironmentUtil.java @@ -32,8 +32,8 @@ public static Environment loadEnvironment(String[] args) { switch (clusterType) { case K8S: return EnvironmentFactory.onK8SEnvironment(args); - case RAY_COMMUNITY: - return EnvironmentFactory.onRayCommunityEnvironment(args); + case RAY: + return EnvironmentFactory.onRayEnvironment(args); default: return EnvironmentFactory.onLocalEnvironment(args); } diff --git a/geaflow/pom.xml b/geaflow/pom.xml index 367f3290..f1e0e141 100644 --- a/geaflow/pom.xml +++ b/geaflow/pom.xml @@ -509,7 +509,7 @@ com.antgroup.tugraph - geaflow-on-ray-community + geaflow-on-ray ${project.version}