diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java index 2e4e8d201f6..9889175688d 100644 --- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java +++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java @@ -28,11 +28,6 @@ public abstract class AbstractCommandArgs implements CommandArgs { - @Parameter(names = {"-c", "--config"}, - description = "Config file", - required = true) - private String configFile; - @Parameter(names = {"-i", "--variable"}, description = "variable substitution, such as -i city=beijing, or -i date=20190318") private List variables = Collections.emptyList(); @@ -56,14 +51,6 @@ public abstract class AbstractCommandArgs implements CommandArgs { */ private List originalParameters; - public String getConfigFile() { - return configFile; - } - - public void setConfigFile(String configFile) { - this.configFile = configFile; - } - public List getVariables() { return variables; } @@ -112,4 +99,12 @@ public DeployMode getDeployMode() { throw new UnsupportedOperationException("abstract class CommandArgs not support this method"); } + public String getConfigFile() { + throw new UnsupportedOperationException("abstract class CommandArgs not support this method"); + } + + public void setConfigFile(String configFile) { + throw new UnsupportedOperationException("abstract class CommandArgs not support this method"); + } + } diff --git a/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/FileUtilsTest.java b/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/FileUtilsTest.java index 8dc84e15946..758fb6f249a 100644 --- a/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/FileUtilsTest.java +++ b/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/FileUtilsTest.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.common.config.DeployMode; import org.apache.seatunnel.core.starter.command.AbstractCommandArgs; +import com.beust.jcommander.Parameter; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -44,6 +45,12 @@ public void getConfigPath() throws URISyntaxException { } private static class SparkCommandArgs extends AbstractCommandArgs { + + @Parameter(names = {"-c", "--config"}, + description = "Config file", + required = true) + private String configFile; + private DeployMode deployMode; public void setDeployMode(DeployMode deployMode) { @@ -53,5 +60,15 @@ public void setDeployMode(DeployMode deployMode) { public DeployMode getDeployMode() { return deployMode; } + + @Override + public String getConfigFile() { + return this.configFile; + } + + @Override + public void setConfigFile(String configFile) { + this.configFile = configFile; + } } } diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java index 4c98ff532c2..f8b6f5dd001 100644 --- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java +++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java @@ -27,6 +27,11 @@ public class FlinkCommandArgs extends AbstractCommandArgs { + @Parameter(names = {"-c", "--config"}, + description = "Config file", + required = true) + private String configFile; + @Parameter(names = {"-r", "--run-mode"}, converter = RunModeConverter.class, description = "job run mode, run or run-application") @@ -50,6 +55,16 @@ public void setRunMode(FlinkRunMode runMode) { this.runMode = runMode; } + @Override + public String getConfigFile() { + return this.configFile; + } + + @Override + public void setConfigFile(String configFile) { + this.configFile = configFile; + } + /** * Used to convert the run mode string to the enum value. */ @@ -70,5 +85,4 @@ public FlinkRunMode convert(String value) { throw new IllegalArgumentException(String.format("Run mode %s not supported", value)); } } - } diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java index eb5ef929d5e..a5c54eeb184 100644 --- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java +++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java @@ -26,6 +26,11 @@ public class SparkCommandArgs extends AbstractCommandArgs { + @Parameter(names = {"-c", "--config"}, + description = "Config file", + required = true) + private String configFile; + @Parameter(names = {"-e", "--deploy-mode"}, description = "Spark deploy mode", required = true, @@ -59,4 +64,14 @@ public void setMaster(String master) { this.master = master; } + @Override + public String getConfigFile() { + return this.configFile; + } + + @Override + public void setConfigFile(String configFile) { + this.configFile = configFile; + } + } diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java index cc79f1ffaf6..33dd11d4b18 100644 --- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java +++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java @@ -37,10 +37,22 @@ public class ClientCommandArgs extends AbstractCommandArgs { converter = ExecutionModeConverter.class) private ExecutionMode executionMode = ExecutionMode.CLUSTER; + @Parameter(names = {"-c", "--config"}, + description = "Config file") + private String configFile; + @Parameter(names = {"-cn", "--cluster"}, description = "The name of cluster") private String clusterName = "seatunnel_default_cluster"; + @Parameter(names = {"-j", "--job-id"}, + description = "Get job status by JobId") + private String jobId; + + @Parameter(names = {"-l", "--list"}, + description = "list job status") + private boolean listJob = false; + public String getClusterName() { return clusterName; } @@ -65,6 +77,14 @@ public void setExecutionMode(ExecutionMode executionMode) { this.executionMode = executionMode; } + public String getJobId() { + return jobId; + } + + public boolean isListJob(){ + return listJob; + } + @Override public EngineType getEngineType() { return EngineType.SEATUNNEL; @@ -74,4 +94,14 @@ public EngineType getEngineType() { public DeployMode getDeployMode() { return DeployMode.CLIENT; } + + @Override + public String getConfigFile() { + return this.configFile; + } + + @Override + public void setConfigFile(String configFile) { + this.configFile = configFile; + } } diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java index 0d65183499c..9c21d6ee4f9 100644 --- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java +++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java @@ -33,6 +33,7 @@ import com.hazelcast.client.config.ClientConfig; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.instance.impl.HazelcastInstanceFactory; +import lombok.extern.slf4j.Slf4j; import java.nio.file.Path; import java.util.Random; @@ -41,6 +42,7 @@ /** * This command is used to execute the SeaTunnel engine job by SeaTunnel API. */ +@Slf4j public class ClientExecuteCommand implements Command { private final ClientCommandArgs clientCommandArgs; @@ -49,12 +51,9 @@ public ClientExecuteCommand(ClientCommandArgs clientCommandArgs) { this.clientCommandArgs = clientCommandArgs; } + @SuppressWarnings("checkstyle:RegexpSingleline") @Override public void execute() throws CommandExecuteException { - Path configFile = FileUtils.getConfigPath(clientCommandArgs); - - JobConfig jobConfig = new JobConfig(); - jobConfig.setName(clientCommandArgs.getJobName()); HazelcastInstance instance = null; SeaTunnelClient engineClient = null; try { @@ -66,10 +65,22 @@ public void execute() throws CommandExecuteException { ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); clientConfig.setClusterName(clusterName); engineClient = new SeaTunnelClient(clientConfig); - JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(configFile.toString(), jobConfig); + if (clientCommandArgs.isListJob()) { + String jobstatus = engineClient.listJobStatus(); + System.out.println(jobstatus); + } else if (null != clientCommandArgs.getJobId()) { + String jobState = engineClient.getJobState(Long.parseLong(clientCommandArgs.getJobId())); + System.out.println(jobState); + } else { + Path configFile = FileUtils.getConfigPath(clientCommandArgs); + JobConfig jobConfig = new JobConfig(); + jobConfig.setName(clientCommandArgs.getJobName()); + JobExecutionEnvironment jobExecutionEnv = + engineClient.createExecutionContext(configFile.toString(), jobConfig); - ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); - clientJobProxy.waitForJobComplete(); + ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); + clientJobProxy.waitForJobComplete(); + } } catch (ExecutionException | InterruptedException e) { throw new CommandExecuteException("SeaTunnel job executed failed", e); } finally { diff --git a/seatunnel-core/seatunnel-starter/src/main/resources/log4j.properties b/seatunnel-core/seatunnel-starter/src/main/resources/log4j.properties new file mode 100644 index 00000000000..db5d9e51220 --- /dev/null +++ b/seatunnel-core/seatunnel-starter/src/main/resources/log4j.properties @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Set everything to be logged to the console +log4j.rootCategory=INFO, console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/junit-platform.properties b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/junit-platform.properties new file mode 100644 index 00000000000..1b9e4750c6d --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/junit-platform.properties @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +junit.jupiter.execution.parallel.mode.default = same_thread +junit.jupiter.execution.parallel.mode.classes.default = same_thread diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java index 5cdb22770d1..afd571e667c 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java @@ -392,7 +392,7 @@ public void testStreamJobRestoreIn3NodeWorkerDown() throws ExecutionException, I // shutdown on worker node node2.shutdown(); - Awaitility.await().atMost(180000, TimeUnit.MILLISECONDS) + Awaitility.await().atMost(360000, TimeUnit.MILLISECONDS) .untilAsserted(() -> { // Wait job write all rows in file Thread.sleep(2000); diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/junit-platform.properties b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/junit-platform.properties new file mode 100644 index 00000000000..1b9e4750c6d --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/junit-platform.properties @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +junit.jupiter.execution.parallel.mode.default = same_thread +junit.jupiter.execution.parallel.mode.classes.default = same_thread diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/junit-platform.properties b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/junit-platform.properties new file mode 100644 index 00000000000..1b9e4750c6d --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/junit-platform.properties @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +junit.jupiter.execution.parallel.mode.default = same_thread +junit.jupiter.execution.parallel.mode.classes.default = same_thread diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java index 0f5d8fa4ea3..bd8d2dfe874 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java +++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java @@ -20,6 +20,8 @@ import org.apache.seatunnel.engine.client.job.JobClient; import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment; import org.apache.seatunnel.engine.common.config.JobConfig; +import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStateCodec; +import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelListJobStatusCodec; import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec; import com.hazelcast.client.config.ClientConfig; @@ -64,4 +66,18 @@ public void shutdown() { hazelcastClient.shutdown(); } } + + public String getJobState(Long jobId){ + return hazelcastClient.requestOnMasterAndDecodeResponse( + SeaTunnelGetJobStateCodec.encodeRequest(jobId), + SeaTunnelGetJobStateCodec::decodeResponse + ); + } + + public String listJobStatus(){ + return hazelcastClient.requestOnMasterAndDecodeResponse( + SeaTunnelListJobStatusCodec.encodeRequest(), + SeaTunnelListJobStatusCodec::decodeResponse + ); + } } diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java index 906edf640f3..c5dce270e17 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java +++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java @@ -96,6 +96,38 @@ public void testExecuteJob() { } } + @Test + public void testGetJobState() { + Common.setDeployMode(DeployMode.CLIENT); + String filePath = TestUtils.getResource("/client_test.conf"); + JobConfig jobConfig = new JobConfig(); + jobConfig.setName("fake_to_console"); + + ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); + clientConfig.setClusterName(TestUtils.getClusterName("SeaTunnelClientTest")); + SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig); + JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(filePath, jobConfig); + + try { + final ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); + CompletableFuture objectCompletableFuture = CompletableFuture.supplyAsync(() -> { + return clientJobProxy.waitForJobComplete(); + }); + long jobId = clientJobProxy.getJobId(); + + await().atMost(30000, TimeUnit.MILLISECONDS) + .untilAsserted(() -> Assertions.assertTrue( + engineClient.getJobState(jobId).contains("RUNNING") && engineClient.listJobStatus().contains("RUNNING"))); + + await().atMost(30000, TimeUnit.MILLISECONDS) + .untilAsserted(() -> Assertions.assertTrue( + engineClient.getJobState(jobId).contains("FINISHED") && engineClient.listJobStatus().contains("FINISHED"))); + + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + } + @AfterAll public static void after() { INSTANCE.shutdown(); diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java index f7534884e1f..a7acbe87614 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java @@ -44,6 +44,8 @@ public class Constant { public static final String IMAP_RUNNING_JOB_STATE = "runningJobState"; + public static final String IMAP_FINISHED_JOB_STATE = "finishedJobState"; + public static final String IMAP_STATE_TIMESTAMPS = "stateTimestamps"; public static final String IMAP_OWNED_SLOT_PROFILES = "ownedSlotProfilesIMap"; diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetJobStateCodec.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetJobStateCodec.java new file mode 100644 index 00000000000..9b225608edb --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetJobStateCodec.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.core.protocol.codec; + +import com.hazelcast.client.impl.protocol.ClientMessage; +import com.hazelcast.client.impl.protocol.Generated; +import com.hazelcast.client.impl.protocol.codec.builtin.*; +import com.hazelcast.client.impl.protocol.codec.custom.*; + +import static com.hazelcast.client.impl.protocol.ClientMessage.*; +import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.*; + +/* + * This file is auto-generated by the Hazelcast Client Protocol Code Generator. + * To change this file, edit the templates or the protocol + * definitions on the https://github.com/hazelcast/hazelcast-client-protocol + * and regenerate it. + */ + +/** + */ +@Generated("56079ba8d58afe5c98dfe2b5dc6c301a") +public final class SeaTunnelGetJobStateCodec { + //hex: 0xDE0600 + public static final int REQUEST_MESSAGE_TYPE = 14550528; + //hex: 0xDE0601 + public static final int RESPONSE_MESSAGE_TYPE = 14550529; + private static final int REQUEST_JOB_ID_FIELD_OFFSET = PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES; + private static final int REQUEST_INITIAL_FRAME_SIZE = REQUEST_JOB_ID_FIELD_OFFSET + LONG_SIZE_IN_BYTES; + private static final int RESPONSE_INITIAL_FRAME_SIZE = RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES; + + private SeaTunnelGetJobStateCodec() { + } + + public static ClientMessage encodeRequest(long jobId) { + ClientMessage clientMessage = ClientMessage.createForEncode(); + clientMessage.setRetryable(true); + clientMessage.setOperationName("SeaTunnel.GetJobState"); + Frame initialFrame = new Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE); + encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, REQUEST_MESSAGE_TYPE); + encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1); + encodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET, jobId); + clientMessage.add(initialFrame); + return clientMessage; + } + + /** + */ + public static long decodeRequest(ClientMessage clientMessage) { + ForwardFrameIterator iterator = clientMessage.frameIterator(); + Frame initialFrame = iterator.next(); + return decodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET); + } + + public static ClientMessage encodeResponse(String response) { + ClientMessage clientMessage = ClientMessage.createForEncode(); + Frame initialFrame = new Frame(new byte[RESPONSE_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE); + encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, RESPONSE_MESSAGE_TYPE); + clientMessage.add(initialFrame); + + StringCodec.encode(clientMessage, response); + return clientMessage; + } + + /** + */ + public static String decodeResponse(ClientMessage clientMessage) { + ForwardFrameIterator iterator = clientMessage.frameIterator(); + //empty initial frame + iterator.next(); + return StringCodec.decode(iterator); + } +} diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelListJobStatusCodec.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelListJobStatusCodec.java new file mode 100644 index 00000000000..e497281df1f --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelListJobStatusCodec.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.core.protocol.codec; + +import com.hazelcast.client.impl.protocol.ClientMessage; +import com.hazelcast.client.impl.protocol.Generated; +import com.hazelcast.client.impl.protocol.codec.builtin.*; +import com.hazelcast.client.impl.protocol.codec.custom.*; + + +import static com.hazelcast.client.impl.protocol.ClientMessage.*; +import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.*; + +/* + * This file is auto-generated by the Hazelcast Client Protocol Code Generator. + * To change this file, edit the templates or the protocol + * definitions on the https://github.com/hazelcast/hazelcast-client-protocol + * and regenerate it. + */ + +/** + */ +@Generated("ee7ee4fc67d26f72ccdf418fcb868148") +public final class SeaTunnelListJobStatusCodec { + //hex: 0xDE0700 + public static final int REQUEST_MESSAGE_TYPE = 14550784; + //hex: 0xDE0701 + public static final int RESPONSE_MESSAGE_TYPE = 14550785; + private static final int REQUEST_INITIAL_FRAME_SIZE = PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES; + private static final int RESPONSE_INITIAL_FRAME_SIZE = RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES; + + private SeaTunnelListJobStatusCodec() { + } + + public static ClientMessage encodeRequest() { + ClientMessage clientMessage = ClientMessage.createForEncode(); + clientMessage.setRetryable(true); + clientMessage.setOperationName("SeaTunnel.ListJobStatus"); + Frame initialFrame = new Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE); + encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, REQUEST_MESSAGE_TYPE); + encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1); + clientMessage.add(initialFrame); + return clientMessage; + } + + public static ClientMessage encodeResponse(String response) { + ClientMessage clientMessage = ClientMessage.createForEncode(); + Frame initialFrame = new Frame(new byte[RESPONSE_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE); + encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, RESPONSE_MESSAGE_TYPE); + clientMessage.add(initialFrame); + + StringCodec.encode(clientMessage, response); + return clientMessage; + } + + /** + */ + public static String decodeResponse(ClientMessage clientMessage) { + ForwardFrameIterator iterator = clientMessage.frameIterator(); + //empty initial frame + iterator.next(); + return StringCodec.decode(iterator); + } +} diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml index 39cb3c386c9..6d599a74707 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml +++ b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml @@ -116,3 +116,41 @@ methods: nullable: false since: 2.0 doc: '' + + - id: 6 + name: getJobState + since: 2.0 + doc: '' + request: + retryable: true + partitionIdentifier: -1 + params: + - name: jobId + type: long + nullable: false + since: 2.0 + doc: '' + response: + params: + - name: response + type: String + nullable: false + since: 2.0 + doc: '' + + - id: 7 + name: listJobStatus + since: 2.0 + doc: '' + request: + retryable: true + partitionIdentifier: -1 + params: [] + response: + params: + - name: response + type: String + nullable: false + since: 2.0 + doc: '' + diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java index 51793a8f96c..043d98a347f 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java @@ -32,6 +32,7 @@ import org.apache.seatunnel.engine.server.execution.ExecutionState; import org.apache.seatunnel.engine.server.execution.TaskExecutionState; import org.apache.seatunnel.engine.server.execution.TaskGroupLocation; +import org.apache.seatunnel.engine.server.master.JobHistoryService; import org.apache.seatunnel.engine.server.master.JobMaster; import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager; import org.apache.seatunnel.engine.server.resourcemanager.ResourceManagerFactory; @@ -63,6 +64,8 @@ public class CoordinatorService { private volatile ResourceManager resourceManager; + private JobHistoryService jobHistoryService; + /** * IMap key is jobId and value is {@link RunningJobInfo}. * Tuple2 key is JobMaster init timestamp and value is the jobImmutableInformation which is sent by client when submit job @@ -135,6 +138,10 @@ public CoordinatorService(@NonNull NodeEngineImpl nodeEngine, @NonNull SeaTunnel masterActiveListener.scheduleAtFixedRate(this::checkNewActiveMaster, 0, 100, TimeUnit.MILLISECONDS); } + public JobHistoryService getJobHistoryService() { + return jobHistoryService; + } + public JobMaster getJobMaster(Long jobId) { return runningJobMasterMap.get(jobId); } @@ -158,6 +165,13 @@ private void initCoordinatorService() { runningJobStateTimestampsIMap = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_STATE_TIMESTAMPS); ownedSlotProfilesIMap = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_OWNED_SLOT_PROFILES); + jobHistoryService = new JobHistoryService( + runningJobStateIMap, + logger, + runningJobMasterMap, + nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_STATE) + ); + List> collect = runningJobInfoIMap.entrySet().stream().map(entry -> { return CompletableFuture.runAsync(() -> { logger.info(String.format("begin restore job (%s) from master active switch", entry.getKey())); @@ -243,6 +257,7 @@ private void restoreJobFromMasterActiveSwitch(@NonNull Long jobId, @NonNull Runn jobMaster.run(); } finally { // storage job state info to HistoryStorage + jobHistoryService.storeFinishedJobState(jobMaster); removeJobIMap(jobMaster); runningJobMasterMap.remove(jobId); } @@ -332,6 +347,7 @@ public PassiveCompletableFuture submitJob(long jobId, Data jobImmutableInf jobMaster.run(); } finally { // storage job state info to HistoryStorage + jobHistoryService.storeFinishedJobState(jobMaster); removeJobIMap(jobMaster); runningJobMasterMap.remove(jobId); } @@ -365,9 +381,9 @@ private void removeJobIMap(JobMaster jobMaster) { public PassiveCompletableFuture waitForJobComplete(long jobId) { JobMaster runningJobMaster = runningJobMasterMap.get(jobId); if (runningJobMaster == null) { - // TODO Get Job Status from JobHistoryStorage + JobStatus jobStatus = jobHistoryService.getJobStatus(jobId).getJobStatus(); CompletableFuture future = new CompletableFuture<>(); - future.complete(JobStatus.FINISHED); + future.complete(jobStatus); return new PassiveCompletableFuture<>(future); } else { return runningJobMaster.getJobMasterCompleteFuture(); @@ -391,8 +407,7 @@ public PassiveCompletableFuture cancelJob(long jodId) { public JobStatus getJobStatus(long jobId) { JobMaster runningJobMaster = runningJobMasterMap.get(jobId); if (runningJobMaster == null) { - // TODO Get Job Status from JobHistoryStorage - return JobStatus.FINISHED; + return jobHistoryService.getJobStatus(jobId).getJobStatus(); } return runningJobMaster.getJobStatus(); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java new file mode 100644 index 00000000000..4d309be38c8 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.master; + +import org.apache.seatunnel.engine.core.job.JobStatus; +import org.apache.seatunnel.engine.core.job.PipelineStatus; +import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation; +import org.apache.seatunnel.engine.server.execution.ExecutionState; +import org.apache.seatunnel.engine.server.execution.TaskGroupLocation; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.hazelcast.logging.ILogger; +import com.hazelcast.map.IMap; +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +public class JobHistoryService { + /** + * IMap key is one of jobId {@link org.apache.seatunnel.engine.server.dag.physical.PipelineLocation} and + * {@link org.apache.seatunnel.engine.server.execution.TaskGroupLocation} + *

+ * The value of IMap is one of {@link JobStatus} {@link PipelineStatus} + * {@link org.apache.seatunnel.engine.server.execution.ExecutionState} + *

+ * This IMap is used to recovery runningJobStateIMap in JobMaster when a new master node active + */ + private final IMap runningJobStateIMap; + + private final ILogger logger; + + /** + * key: job id; + *
value: job master; + */ + private final Map runningJobMasterMap; + + /** + * finishedJobStateImap key is jobId and value is jobState(json) + * JobStateData Indicates the status of the job, pipeline, and task + */ + //TODO need to limit the amount of storage + private final IMap finishedJobStateImap; + + private final ObjectMapper objectMapper; + + public JobHistoryService( + IMap runningJobStateIMap, + ILogger logger, + Map runningJobMasterMap, + IMap finishedJobStateImap + ) { + this.runningJobStateIMap = runningJobStateIMap; + this.logger = logger; + this.runningJobMasterMap = runningJobMasterMap; + this.finishedJobStateImap = finishedJobStateImap; + this.objectMapper = new ObjectMapper(); + this.objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); + } + + // Gets the status of a running and completed job + public String listAllJob() { + ObjectNode objectNode = objectMapper.createObjectNode(); + ArrayNode jobs = objectNode.putArray("jobs"); + + Stream.concat(runningJobMasterMap.values().stream().map(this::toJobStateMapper), + finishedJobStateImap.values().stream()) + .forEach(jobStateData -> { + JobStatusData jobStatusData = new JobStatusData(jobStateData.jobId, jobStateData.jobStatus); + JsonNode jsonNode = objectMapper.valueToTree(jobStatusData); + jobs.add(jsonNode); + }); + return jobs.toString(); + } + + // Get detailed status of a single job + public JobStateData getJobStatus(Long jobId) { + return runningJobMasterMap.containsKey(jobId) ? toJobStateMapper(runningJobMasterMap.get(jobId)) : + finishedJobStateImap.getOrDefault(jobId, null); + } + + // Get detailed status of a single job as json + public String getJobStatusAsString(Long jobId) { + JobStateData jobStatus = getJobStatus(jobId); + if (null != jobStatus) { + try { + return objectMapper.writeValueAsString(jobStatus); + } catch (JsonProcessingException e) { + logger.severe("serialize jobStateMapper err", e); + ObjectNode objectNode = objectMapper.createObjectNode(); + objectNode.put("err", "serialize jobStateMapper err"); + return objectNode.toString(); + } + } + ObjectNode objectNode = objectMapper.createObjectNode(); + objectNode.put("err", String.format("jobId : %s not found", jobId)); + return objectNode.toString(); + } + + @SuppressWarnings("checkstyle:MagicNumber") + public void storeFinishedJobState(JobMaster jobMaster) { + JobStateData jobStateData = toJobStateMapper(jobMaster); + finishedJobStateImap.put(jobStateData.jobId, jobStateData, 14, TimeUnit.DAYS); + } + + private JobStateData toJobStateMapper(JobMaster jobMaster) { + + Long jobId = jobMaster.getJobImmutableInformation().getJobId(); + Map pipelineStateMapperMap = new HashMap<>(); + + jobMaster.getPhysicalPlan().getPipelineList().forEach(pipeline -> { + PipelineLocation pipelineLocation = pipeline.getPipelineLocation(); + PipelineStatus pipelineState = (PipelineStatus) runningJobStateIMap.get(pipelineLocation); + Map taskStateMap = new HashMap<>(); + pipeline.getCoordinatorVertexList().forEach(coordinator -> { + TaskGroupLocation taskGroupLocation = coordinator.getTaskGroupLocation(); + taskStateMap.put(taskGroupLocation, (ExecutionState) runningJobStateIMap.get(taskGroupLocation)); + }); + pipeline.getPhysicalVertexList().forEach(task -> { + TaskGroupLocation taskGroupLocation = task.getTaskGroupLocation(); + taskStateMap.put(taskGroupLocation, (ExecutionState) runningJobStateIMap.get(taskGroupLocation)); + }); + + PipelineStateData pipelineStateData = new PipelineStateData(pipelineState, taskStateMap); + pipelineStateMapperMap.put(pipelineLocation, pipelineStateData); + }); + JobStatus jobStatus = (JobStatus) runningJobStateIMap.get(jobId); + + return new JobStateData(jobId, jobStatus, pipelineStateMapperMap); + } + + @AllArgsConstructor + @Data + public static final class JobStatusData implements Serializable { + Long jobId; + JobStatus jobStatus; + } + + @AllArgsConstructor + @Data + public static final class JobStateData implements Serializable{ + Long jobId; + JobStatus jobStatus; + Map pipelineStateMapperMap; + } + + @AllArgsConstructor + @Data + public static final class PipelineStateData implements Serializable{ + PipelineStatus pipelineStatus; + Map executionStateMap; + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStateOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStateOperation.java new file mode 100644 index 00000000000..6a7f39ae2a8 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStateOperation.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.operation; + +import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; +import org.apache.seatunnel.engine.server.SeaTunnelServer; +import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook; + +import com.hazelcast.nio.ObjectDataInput; +import com.hazelcast.nio.ObjectDataOutput; +import com.hazelcast.nio.serialization.IdentifiedDataSerializable; +import com.hazelcast.spi.impl.AllowedDuringPassiveState; +import com.hazelcast.spi.impl.operationservice.Operation; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +public class GetJobStateOperation extends Operation implements IdentifiedDataSerializable, AllowedDuringPassiveState { + private Long jobId; + + private String response; + + public GetJobStateOperation() { + } + + public GetJobStateOperation(Long jobId) { + this.jobId = jobId; + } + + @Override + public final int getFactoryId() { + return OperationDataSerializerHook.FACTORY_ID; + } + + @Override + public int getClassId() { + return OperationDataSerializerHook.PRINT_MESSAGE_OPERATOR; + } + + @Override + protected void writeInternal(ObjectDataOutput out) throws IOException { + super.writeInternal(out); + out.writeLong(jobId); + } + + @Override + protected void readInternal(ObjectDataInput in) throws IOException { + super.readInternal(in); + jobId = in.readLong(); + } + + @Override + public void run() { + SeaTunnelServer service = getService(); + CompletableFuture future = CompletableFuture.supplyAsync(() -> { + return service.getCoordinatorService().getJobHistoryService().getJobStatusAsString(jobId); + }); + + try { + response = future.get(); + } catch (InterruptedException | ExecutionException e) { + throw new SeaTunnelEngineException(e); + } + } + + @Override + public Object getResponse() { + return response; + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/ListJobStatusOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/ListJobStatusOperation.java new file mode 100644 index 00000000000..179577d96c4 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/ListJobStatusOperation.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.operation; + +import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; +import org.apache.seatunnel.engine.server.SeaTunnelServer; + +import com.hazelcast.spi.impl.AllowedDuringPassiveState; +import com.hazelcast.spi.impl.operationservice.Operation; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +public class ListJobStatusOperation extends Operation implements AllowedDuringPassiveState { + + private String response; + + public ListJobStatusOperation() { + } + + @Override + public void run() { + SeaTunnelServer service = getService(); + CompletableFuture future = CompletableFuture.supplyAsync(() -> { + return service.getCoordinatorService().getJobHistoryService().listAllJob(); + }); + + try { + response = future.get(); + } catch (InterruptedException | ExecutionException e) { + throw new SeaTunnelEngineException(e); + } + } + + @Override + public Object getResponse() { + return response; + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobStateTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobStateTask.java new file mode 100644 index 00000000000..7aa3b213025 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobStateTask.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.protocol.task; + +import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStateCodec; +import org.apache.seatunnel.engine.server.operation.GetJobStateOperation; + +import com.hazelcast.client.impl.protocol.ClientMessage; +import com.hazelcast.instance.impl.Node; +import com.hazelcast.internal.nio.Connection; +import com.hazelcast.spi.impl.operationservice.Operation; + +public class GetJobStateTask extends AbstractSeaTunnelMessageTask { + + protected GetJobStateTask(ClientMessage clientMessage, Node node, Connection connection) { + super(clientMessage, node, connection, + SeaTunnelGetJobStateCodec::decodeRequest, + SeaTunnelGetJobStateCodec::encodeResponse); + } + + @Override + protected Operation prepareOperation() { + return new GetJobStateOperation(parameters); + } + + @Override + public String getMethodName() { + return "getJobState"; + } + + @Override + public Object[] getParameters() { + return new Object[0]; + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/ListJobStatusTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/ListJobStatusTask.java new file mode 100644 index 00000000000..acbe4fae05c --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/ListJobStatusTask.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.protocol.task; + +import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelListJobStatusCodec; +import org.apache.seatunnel.engine.server.operation.ListJobStatusOperation; + +import com.hazelcast.client.impl.protocol.ClientMessage; +import com.hazelcast.instance.impl.Node; +import com.hazelcast.internal.nio.Connection; +import com.hazelcast.spi.impl.operationservice.Operation; + +public class ListJobStatusTask extends AbstractSeaTunnelMessageTask { + + protected ListJobStatusTask(ClientMessage clientMessage, Node node, Connection connection) { + super(clientMessage, node, connection, + m -> null, + SeaTunnelListJobStatusCodec::encodeResponse); + } + + @Override + protected Operation prepareOperation() { + return new ListJobStatusOperation(); + } + + @Override + public String getMethodName() { + return "listJobStatus"; + } + + @Override + public Object[] getParameters() { + return new Object[0]; + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java index 902e42cb638..7910bcd76ed 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java @@ -18,7 +18,9 @@ package org.apache.seatunnel.engine.server.protocol.task; import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelCancelJobCodec; +import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStateCodec; import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCodec; +import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelListJobStatusCodec; import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec; import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSubmitJobCodec; import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelWaitForJobCompleteCodec; @@ -55,5 +57,9 @@ private void initFactories() { (clientMessage, connection) -> new CancelJobTask(clientMessage, node, connection)); factories.put(SeaTunnelGetJobStatusCodec.REQUEST_MESSAGE_TYPE, (clientMessage, connection) -> new GetJobStatusTask(clientMessage, node, connection)); + factories.put(SeaTunnelGetJobStateCodec.REQUEST_MESSAGE_TYPE, + (clientMessage, connection) -> new GetJobStateTask(clientMessage, node, connection)); + factories.put(SeaTunnelListJobStatusCodec.REQUEST_MESSAGE_TYPE, + (clientMessage, connection) -> new ListJobStatusTask(clientMessage, node, connection)); } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java index 3125b58eacb..ee8aa5214a7 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java @@ -107,7 +107,7 @@ public void testClearCoordinatorService() clearCoordinatorServiceMethod.setAccessible(false); // because runningJobMasterMap is empty and we have no JobHistoryServer, so return finished. - Assertions.assertTrue(JobStatus.FINISHED.equals(coordinatorService.getJobStatus(jobId))); + Assertions.assertTrue(JobStatus.RUNNING.equals(coordinatorService.getJobStatus(jobId))); coordinatorServiceTest.shutdown(); } @@ -165,7 +165,7 @@ public void testJobRestoreWhenMasterNodeSwitch() throws InterruptedException { // because runningJobMasterMap is empty and we have no JobHistoryServer, so return finished. await().atMost(200000, TimeUnit.MILLISECONDS) .untilAsserted( - () -> Assertions.assertEquals(JobStatus.FINISHED, server2.getCoordinatorService().getJobStatus(jobId))); + () -> Assertions.assertEquals(JobStatus.CANCELED, server2.getCoordinatorService().getJobStatus(jobId))); instance2.shutdown(); } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobHistoryServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobHistoryServiceTest.java new file mode 100644 index 00000000000..4e9f56f0f6e --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobHistoryServiceTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.master; + +import static org.awaitility.Awaitility.await; + +import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; +import org.apache.seatunnel.engine.core.dag.logical.LogicalDag; +import org.apache.seatunnel.engine.core.job.JobImmutableInformation; +import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest; +import org.apache.seatunnel.engine.server.TestUtils; + +import com.hazelcast.internal.serialization.Data; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; + +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +@DisabledOnOs(OS.WINDOWS) +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class JobHistoryServiceTest extends AbstractSeaTunnelServerTest { + + private static final Long JOB_1 = 1L; + private static final Long JOB_2 = 2L; + private static final Long JOB_3 = 3L; + + @Test + public void testlistJobState() throws Exception { + startJob(JOB_1, "fake_to_console.conf"); + + // waiting for JOB_1 status turn to RUNNING + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted(() -> Assertions.assertTrue( + server.getCoordinatorService().getJobHistoryService().listAllJob().contains(String.format("{\"jobId\":%s,\"jobStatus\":\"RUNNING\"}", JOB_1)))); + + // waiting for JOB_1 status turn to FINISHED + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted(() -> Assertions.assertTrue( + server.getCoordinatorService().getJobHistoryService().listAllJob().contains(String.format("{\"jobId\":%s,\"jobStatus\":\"FINISHED\"}", JOB_1)))); + + startJob(JOB_2, "fake_to_console.conf"); + // waiting for JOB_2 status turn to FINISHED and JOB_2 status turn to RUNNING + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted(() -> Assertions.assertTrue( + server.getCoordinatorService().getJobHistoryService().listAllJob().contains(String.format("{\"jobId\":%s,\"jobStatus\":\"FINISHED\"}", JOB_1)) + && + server.getCoordinatorService().getJobHistoryService().listAllJob().contains(String.format("{\"jobId\":%s,\"jobStatus\":\"RUNNING\"}", JOB_2)) + )); + } + + @Test + public void testGetJobStatus() throws Exception{ + startJob(JOB_3, "fake_to_console.conf"); + // waiting for JOB_3 status turn to RUNNING + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted(() -> Assertions.assertTrue( + server.getCoordinatorService().getJobHistoryService().getJobStatusAsString(JOB_3).contains("TaskGroupLocation") + && + server.getCoordinatorService().getJobHistoryService().getJobStatusAsString(JOB_3).contains("RUNNING") + )); + + // waiting for job1 status turn to FINISHED + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted(() -> Assertions.assertTrue( + server.getCoordinatorService().getJobHistoryService().getJobStatusAsString(JOB_3).contains("TaskGroupLocation") + && + server.getCoordinatorService().getJobHistoryService().getJobStatusAsString(JOB_3).contains("FINISHED") + )); + } + + private void startJob(Long jobid, String path){ + LogicalDag testLogicalDag = + TestUtils.createTestLogicalPlan(path, jobid.toString(), jobid); + + JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(jobid, + nodeEngine.getSerializationService().toData(testLogicalDag), testLogicalDag.getJobConfig(), + Collections.emptyList()); + + Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation); + + PassiveCompletableFuture voidPassiveCompletableFuture = + server.getCoordinatorService().submitJob(jobid, data); + voidPassiveCompletableFuture.join(); + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console.conf b/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console.conf new file mode 100644 index 00000000000..6b059913175 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console.conf @@ -0,0 +1,61 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set engine configuration here + execution.parallelism = 1 + job.mode = "BATCH" + execution.checkpoint.interval = 5000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + result_table_name = "fake" + parallelism = 1 + schema = { + fields { + name = "string" + age = "int" + } + } + } + + FakeSource { + result_table_name = "fake" + parallelism = 1 + schema = { + fields { + name = "string" + age = "int" + } + } + } +} + +transform { +} + +sink { + console { + source_table_name="fake" + } +} \ No newline at end of file