diff --git a/.gitignore b/.gitignore
index 2318e11827c..de692025d0f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -42,3 +42,5 @@ test.conf
log4j.properties
spark-warehouse
*.flattened-pom.xml
+
+connectors
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
index 67034db0490..dca149256c9 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.common.config;
+import org.apache.commons.lang3.StringUtils;
+
import java.io.File;
import java.net.URISyntaxException;
import java.nio.file.Path;
@@ -97,14 +99,24 @@ public static Path connectorRootDir(String engine) {
* Plugin Connector Jar Dir
*/
public static Path connectorJarDir(String engine) {
- return Paths.get(appRootDir().toString(), "connectors", engine.toLowerCase());
+ String seatunnelHome = System.getProperty("SEATUNNEL_HOME");
+ if (StringUtils.isBlank(seatunnelHome)) {
+ return Paths.get(appRootDir().toString(), "connectors", engine.toLowerCase());
+ } else {
+ return Paths.get(seatunnelHome, "connectors", engine.toLowerCase());
+ }
}
/**
* Plugin Connector Dir
*/
public static Path connectorDir() {
- return Paths.get(appRootDir().toString(), "connectors");
+ String seatunnelHome = System.getProperty("SEATUNNEL_HOME");
+ if (StringUtils.isBlank(seatunnelHome)) {
+ return Paths.get(appRootDir().toString(), "connectors");
+ } else {
+ return Paths.get(seatunnelHome, "connectors");
+ }
}
public static Path pluginTarball() {
diff --git a/seatunnel-e2e/pom.xml b/seatunnel-e2e/pom.xml
index 472f7b83fbe..35bf7fce7c7 100644
--- a/seatunnel-e2e/pom.xml
+++ b/seatunnel-e2e/pom.xml
@@ -38,6 +38,7 @@
seatunnel-flink-connector-v2-e2e
seatunnel-spark-connector-v2-e2e
seatunnel-flink-sql-e2e
+ seatunnel-engine-e2e
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/pom.xml b/seatunnel-e2e/seatunnel-engine-e2e/pom.xml
new file mode 100644
index 00000000000..292665a77c6
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-engine-e2e/pom.xml
@@ -0,0 +1,70 @@
+
+
+
+
+
+ seatunnel-e2e
+ org.apache.seatunnel
+ ${revision}
+
+ 4.0.0
+
+ seatunnel-engine-e2e
+
+
+
+ com.hazelcast
+ hazelcast
+ test
+ tests
+
+
+ org.apache.seatunnel
+ seatunnel-engine-client
+ ${project.version}
+
+
+ org.apache.seatunnel
+ connector-file-local
+
+
+ org.apache.seatunnel
+ connector-fake
+
+
+
+
+ org.apache.seatunnel
+ seatunnel-connectors-v2-dist
+ ${project.version}
+
+
+ *
+ *
+
+
+
+
+ org.apache.seatunnel
+ seatunnel-engine-server
+ ${project.version}
+
+
+
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java b/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java
new file mode 100644
index 00000000000..26e4defc954
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.Arrays;
+
+public class TestUtils {
+ public static String getResource(String confFile) {
+ return System.getProperty("user.dir") + "/src/test/resources" + confFile;
+ }
+
+ public static void initPluginDir() {
+ // copy connectors to project_root/connectors dir
+ System.setProperty("SEATUNNEL_HOME", System.getProperty("user.dir") +
+ String.format("%s..%s..%s", File.separator, File.separator, File.separator));
+ File seatunnelRootDir = new File(System.getProperty("SEATUNNEL_HOME"));
+
+ File connectorDir = new File(seatunnelRootDir +
+ File.separator +
+ "connectors/seatunnel");
+
+ if (connectorDir.exists()) {
+ connectorDir.delete();
+ }
+
+ connectorDir.mkdirs();
+
+ File connectorDistDir = new File(
+ seatunnelRootDir +
+ File.separator +
+ "seatunnel-connectors-v2-dist" +
+ File.separator +
+ "target" +
+ File.separator +
+ "lib");
+
+ Arrays.stream(connectorDistDir.listFiles()).forEach(file -> {
+ if (file.getName().startsWith("connector-")) {
+ Path copied = Paths.get(connectorDir + File.separator + file.getName());
+ Path originalPath = file.toPath();
+ try {
+ Files.copy(originalPath, copied, StandardCopyOption.REPLACE_EXISTING);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+ Path targetPluginMappingFile = Paths.get(seatunnelRootDir +
+ File.separator +
+ "connectors" +
+ File.separator +
+ "plugin-mapping.properties");
+
+ Path sourcePluginMappingFile = Paths.get(seatunnelRootDir + File.separator + "plugin-mapping.properties");
+ try {
+ Files.copy(sourcePluginMappingFile, targetPluginMappingFile, StandardCopyOption.REPLACE_EXISTING);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java b/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java
new file mode 100644
index 00000000000..fa6679e3d77
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e.engine;
+
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.engine.client.SeaTunnelClient;
+import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
+import org.apache.seatunnel.engine.client.job.JobProxy;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.SeaTunnelClientConfig;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.e2e.TestUtils;
+import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
+
+import com.google.common.collect.Lists;
+import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.instance.impl.HazelcastInstanceFactory;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutionException;
+
+public class JobExecutionIT {
+ private static final Logger LOGGER = LoggerFactory.getLogger(JobExecutionIT.class);
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
+ HazelcastInstanceFactory.newHazelcastInstance(seaTunnelConfig.getHazelcastConfig(),
+ Thread.currentThread().getName(),
+ new SeaTunnelNodeContext(ConfigProvider.locateAndGetSeaTunnelConfig()));
+ }
+
+ @Test
+ public void testSayHello() {
+ SeaTunnelClientConfig seaTunnelClientConfig = new SeaTunnelClientConfig();
+ seaTunnelClientConfig.getNetworkConfig().setAddresses(Lists.newArrayList("localhost:5801"));
+ SeaTunnelClient engineClient = new SeaTunnelClient(seaTunnelClientConfig);
+
+ String msg = "Hello world";
+ String s = engineClient.printMessageToMaster(msg);
+ Assert.assertEquals(msg, s);
+ }
+
+ @Test
+ public void testExecuteJob() {
+ TestUtils.initPluginDir();
+ Common.setDeployMode(DeployMode.CLIENT);
+ String filePath = TestUtils.getResource("/fakesource_to_file_complex.conf");
+ JobConfig jobConfig = new JobConfig();
+ jobConfig.setName("fake_to_file");
+
+ ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+ SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
+ JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(filePath, jobConfig);
+
+ JobProxy jobProxy = null;
+ try {
+ jobProxy = jobExecutionEnv.execute();
+ JobStatus jobStatus = jobProxy.waitForJobComplete();
+ Assert.assertEquals(JobStatus.FINISHED, jobStatus);
+ } catch (ExecutionException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/fakesource_to_file.conf b/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/fakesource_to_file.conf
new file mode 100644
index 00000000000..c6f0f7bbf62
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/fakesource_to_file.conf
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "STREAMING"
+ execution.checkpoint.interval = 5000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the feature source plugin**
+ FakeSource {
+ result_table_name = "fake"
+ field_name = "name,age",
+ parallelism = 3
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+ LocalFile {
+ path="file:///tmp/hive/warehouse/test2"
+ field_delimiter="\t"
+ row_delimiter="\n"
+ partition_by=["age"]
+ partition_dir_expression="${k0}=${v0}"
+ is_partition_field_write_in_file=true
+ file_name_expression="${transactionId}_${now}"
+ file_format="text"
+ sink_columns=["name","age"]
+ filename_time_format="yyyy.MM.dd"
+ is_enable_transaction=true
+ save_mode="error"
+
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/fakesource_to_file_complex.conf b/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/fakesource_to_file_complex.conf
new file mode 100644
index 00000000000..dc73c2f09fd
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/fakesource_to_file_complex.conf
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "STREAMING"
+ execution.checkpoint.interval = 5000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the feature source plugin**
+ FakeSource {
+ result_table_name = "fake"
+ field_name = "name,age",
+ parallelism = 3
+ }
+
+ FakeSource {
+ result_table_name = "fake"
+ field_name = "name,age",
+ parallelism = 3
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+ LocalFile {
+ path="file:///tmp/hive/warehouse/test2"
+ field_delimiter="\t"
+ row_delimiter="\n"
+ partition_by=["age"]
+ partition_dir_expression="${k0}=${v0}"
+ is_partition_field_write_in_file=true
+ file_name_expression="${transactionId}_${now}"
+ file_format="text"
+ sink_columns=["name","age"]
+ filename_time_format="yyyy.MM.dd"
+ is_enable_transaction=true
+ save_mode="error",
+ source_table_name="fake"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/ConnectorInstanceLoader.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/ConnectorInstanceLoader.java
index d460deb8a2c..d4e8f703d45 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/ConnectorInstanceLoader.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/ConnectorInstanceLoader.java
@@ -35,7 +35,9 @@
import org.apache.commons.lang3.tuple.ImmutablePair;
import java.net.URL;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import scala.Serializable;
@@ -44,7 +46,7 @@ private ConnectorInstanceLoader() {
throw new IllegalStateException("Utility class");
}
- public static ImmutablePair> loadSourceInstance(Config sourceConfig) {
+ public static ImmutablePair> loadSourceInstance(Config sourceConfig) {
SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
PluginIdentifier pluginIdentifier = PluginIdentifier.of(
CollectionConstants.SEATUNNEL_PLUGIN,
@@ -61,10 +63,10 @@ public static ImmutablePair> loadSourceInstance(Confi
throw new UnsupportedOperationException(
String.format("'%s' source don't support off-line job.", seaTunnelSource.getPluginName()));
}
- return new ImmutablePair<>(seaTunnelSource, pluginJarPaths);
+ return new ImmutablePair<>(seaTunnelSource, new HashSet<>(pluginJarPaths));
}
- public static ImmutablePair, List> loadSinkInstance(
+ public static ImmutablePair, Set> loadSinkInstance(
Config sinkConfig) {
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
PluginIdentifier pluginIdentifier = PluginIdentifier.of(
@@ -75,11 +77,12 @@ public static ImmutablePair seaTunnelSink =
sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
seaTunnelSink.prepare(sinkConfig);
+ seaTunnelSink.setTypeInfo(null);
seaTunnelSink.setSeaTunnelContext(SeaTunnelContext.getContext());
- return new ImmutablePair<>(seaTunnelSink, pluginJarPaths);
+ return new ImmutablePair<>(seaTunnelSink, new HashSet<>(pluginJarPaths));
}
- public static ImmutablePair, List> loadTransformInstance(Config transformConfig) {
+ public static ImmutablePair, Set> loadTransformInstance(Config transformConfig) {
SeaTunnelTransformPluginDiscovery transformPluginDiscovery = new SeaTunnelTransformPluginDiscovery();
PluginIdentifier pluginIdentifier = PluginIdentifier.of(
CollectionConstants.SEATUNNEL_PLUGIN,
@@ -89,6 +92,6 @@ public static ImmutablePair, List> loadTransformInsta
List pluginJarPaths = transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier));
SeaTunnelTransform> seaTunnelTransform =
transformPluginDiscovery.createPluginInstance(pluginIdentifier);
- return new ImmutablePair<>(seaTunnelTransform, pluginJarPaths);
+ return new ImmutablePair<>(seaTunnelTransform, new HashSet<>(pluginJarPaths));
}
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java
index 4d1850bcf15..31cfa9a1661 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java
@@ -124,7 +124,7 @@ private void complexAnalyze(List extends Config> sourceConfigs,
initRelationMap(sourceConfigs, transformConfigs);
for (Config config : sinkConfigs) {
- ImmutablePair, List>
+ ImmutablePair, Set>
sinkListImmutablePair =
ConnectorInstanceLoader.loadSinkInstance(config);
@@ -165,7 +165,7 @@ private SeaTunnelDataType sourceAnalyze(String sourceTableName, Action action) {
SeaTunnelDataType dataType = null;
AtomicInteger totalParallelism = new AtomicInteger();
for (Config sourceConfig : sourceConfigList) {
- ImmutablePair> seaTunnelSourceListImmutablePair =
+ ImmutablePair> seaTunnelSourceListImmutablePair =
ConnectorInstanceLoader.loadSourceInstance(sourceConfig);
dataType = seaTunnelSourceListImmutablePair.getLeft().getProducedType();
SourceAction sourceAction = createSourceAction(
@@ -192,7 +192,7 @@ private SeaTunnelDataType> transformAnalyze(String sourceTableName, Action act
AtomicInteger totalParallelism = new AtomicInteger();
SeaTunnelDataType> dataTypeResult = null;
for (Config config : transformConfigList) {
- ImmutablePair, List> transformListImmutablePair =
+ ImmutablePair, Set> transformListImmutablePair =
ConnectorInstanceLoader.loadTransformInstance(config);
TransformAction transformAction = createTransformAction(
idGenerator.getNextId(),
@@ -256,20 +256,20 @@ private void initRelationMap(List extends Config> sourceConfigs, List extend
private void sampleAnalyze(List extends Config> sourceConfigs,
List extends Config> transformConfigs,
List extends Config> sinkConfigs) {
- ImmutablePair> pair =
+ ImmutablePair> pair =
ConnectorInstanceLoader.loadSourceInstance(sourceConfigs.get(0));
SourceAction sourceAction =
createSourceAction(idGenerator.getNextId(), pair.getLeft().getPluginName(), pair.getLeft(),
pair.getRight());
sourceAction.setParallelism(getSourceParallelism(sourceConfigs.get(0)));
SeaTunnelDataType dataType = sourceAction.getSource().getProducedType();
- ImmutablePair, List>
+ ImmutablePair, Set>
sinkListImmutablePair = ConnectorInstanceLoader.loadSinkInstance(sinkConfigs.get(0));
Action sinkUpstreamAction = sourceAction;
if (!CollectionUtils.isEmpty(transformConfigs)) {
- ImmutablePair, List> transformListImmutablePair =
+ ImmutablePair, Set> transformListImmutablePair =
ConnectorInstanceLoader.loadTransformInstance(transformConfigs.get(0));
transformListImmutablePair.getLeft().setTypeInfo(dataType);
@@ -323,7 +323,7 @@ private int getSourceParallelism(Config sourceConfig) {
private SourceAction createSourceAction(long id,
@NonNull String name,
@NonNull SeaTunnelSource source,
- List jarUrls) {
+ Set jarUrls) {
if (!CollectionUtils.isEmpty(jarUrls)) {
jarUrlsSet.addAll(jarUrls);
}
@@ -334,7 +334,7 @@ private TransformAction createTransformAction(long id,
@NonNull String name,
@NonNull List upstreams,
@NonNull SeaTunnelTransform transformation,
- List jarUrls) {
+ Set jarUrls) {
if (!CollectionUtils.isEmpty(jarUrls)) {
jarUrlsSet.addAll(jarUrls);
}
@@ -345,7 +345,7 @@ private SinkAction createSinkAction(long id,
@NonNull String name,
@NonNull List upstreams,
@NonNull SeaTunnelSink sink,
- List jarUrls) {
+ Set jarUrls) {
if (!CollectionUtils.isEmpty(jarUrls)) {
jarUrlsSet.addAll(jarUrls);
}
@@ -355,7 +355,7 @@ private SinkAction createSinkAction(long id,
private TransformAction createTransformAction(long id,
@NonNull String name,
@NonNull SeaTunnelTransform transformation,
- List jarUrls) {
+ Set jarUrls) {
if (!CollectionUtils.isEmpty(jarUrls)) {
jarUrlsSet.addAll(jarUrls);
}
@@ -365,7 +365,7 @@ private TransformAction createTransformAction(long id,
private SinkAction createSinkAction(long id,
@NonNull String name,
@NonNull SeaTunnelSink sink,
- List jarUrls) {
+ Set jarUrls) {
if (!CollectionUtils.isEmpty(jarUrls)) {
jarUrlsSet.addAll(jarUrls);
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobProxy.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobProxy.java
index dcd0235e2f5..0ac8ac789ab 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobProxy.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobProxy.java
@@ -59,29 +59,27 @@ public void submitJob() throws ExecutionException, InterruptedException {
}
@Override
- public void waitForJobComplete() {
+ public JobStatus waitForJobComplete() {
+ JobStatus jobStatus = null;
PassiveCompletableFuture jobFuture =
seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(
SeaTunnelWaitForJobCompleteCodec.encodeRequest(jobImmutableInformation.getJobId()),
response -> {
return JobStatus.values()[SeaTunnelWaitForJobCompleteCodec.decodeResponse(response)];
});
-
- jobFuture.whenComplete((v, t) -> {
- if (null != t) {
- LOGGER.info(String.format("Job %s (%s) end with state %s, and throw Exception: %s",
- jobImmutableInformation.getJobId(),
- jobImmutableInformation.getJobConfig().getName(),
- v,
- ExceptionUtils.getMessage(t)));
- } else {
- LOGGER.info(String.format("Job %s (%s) end with state %s",
- jobImmutableInformation.getJobId(),
- jobImmutableInformation.getJobConfig().getName(),
- v));
- }
- });
-
- jobFuture.join();
+ try {
+ jobStatus = jobFuture.get();
+ LOGGER.info(String.format("Job %s (%s) end with state %s",
+ jobImmutableInformation.getJobId(),
+ jobImmutableInformation.getJobConfig().getName(),
+ jobStatus));
+ } catch (InterruptedException | ExecutionException e) {
+ LOGGER.info(String.format("Job %s (%s) end with unknown state, and throw Exception: %s",
+ jobImmutableInformation.getJobId(),
+ jobImmutableInformation.getJobConfig().getName(),
+ ExceptionUtils.getMessage(e)));
+ throw new RuntimeException(e);
+ }
+ return jobStatus;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index 6ed009de77d..9318d6509c7 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -24,11 +24,10 @@
import org.apache.seatunnel.engine.client.job.JobProxy;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
-import org.apache.seatunnel.engine.common.config.SeaTunnelClientConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
-import com.google.common.collect.Lists;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.instance.impl.HazelcastInstanceFactory;
@@ -57,9 +56,8 @@ public void beforeClass() throws Exception {
@Test
public void testSayHello() {
- SeaTunnelClientConfig seaTunnelClientConfig = new SeaTunnelClientConfig();
- seaTunnelClientConfig.getNetworkConfig().setAddresses(Lists.newArrayList("localhost:5801"));
- SeaTunnelClient engineClient = new SeaTunnelClient(seaTunnelClientConfig);
+ ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+ SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
String msg = "Hello world";
String s = engineClient.printMessageToMaster(msg);
@@ -81,7 +79,8 @@ public void testExecuteJob() {
JobProxy jobProxy = null;
try {
jobProxy = jobExecutionEnv.execute();
- jobProxy.waitForJobComplete();
+ JobStatus jobStatus = jobProxy.waitForJobComplete();
+ Assert.assertEquals(JobStatus.FINISHED, jobStatus);
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file.conf
index c6f0f7bbf62..7dc125e77a4 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file.conf
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file.conf
@@ -21,7 +21,7 @@
env {
# You can set flink configuration here
execution.parallelism = 1
- job.mode = "STREAMING"
+ job.mode = "BATCH"
execution.checkpoint.interval = 5000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
@@ -33,14 +33,9 @@ source {
field_name = "name,age",
parallelism = 3
}
-
- # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
- # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
}
transform {
- # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
- # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
}
sink {
@@ -59,7 +54,4 @@ sink {
save_mode="error"
}
-
- # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
- # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
}
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file_complex.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file_complex.conf
index dc73c2f09fd..258c323523b 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file_complex.conf
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file_complex.conf
@@ -21,7 +21,7 @@
env {
# You can set flink configuration here
execution.parallelism = 1
- job.mode = "STREAMING"
+ job.mode = "BATCH"
execution.checkpoint.interval = 5000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
@@ -39,14 +39,9 @@ source {
field_name = "name,age",
parallelism = 3
}
-
- # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
- # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
}
transform {
- # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
- # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
}
sink {
@@ -65,7 +60,4 @@ sink {
save_mode="error",
source_table_name="fake"
}
-
- # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
- # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
}
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
index 12f562c90dd..88697461e08 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
@@ -33,9 +33,4 @@ public EngineConfig setBackupCount(int newBackupCount) {
this.backupCount = newBackupCount;
return this;
}
-
- public EngineConfig setServerExecutorPoolSize(int serverExecutorPoolSize) {
- this.serverExecutorPoolSize = serverExecutorPoolSize;
- return this;
- }
}
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
index 5d75a540a57..5531db478aa 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
@@ -71,11 +71,6 @@ private void parseEngineConfig(Node engineNode, SeaTunnelConfig config) {
getIntegerValue("backup-count", getTextContent(node))
);
break;
- case "server-executor-pool-size":
- engineConfig.setServerExecutorPoolSize(
- getIntegerValue("server-executor-pool-size", getTextContent(node))
- );
- break;
default:
throw new AssertionError("Unrecognized element: " + name);
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java
index 63aad155ac0..28c0f984b42 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java
@@ -22,6 +22,7 @@
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
public abstract class AbstractAction implements Action {
private String name;
@@ -31,16 +32,16 @@ public abstract class AbstractAction implements Action {
private int parallelism = 1;
- private List jarUrls;
+ private Set jarUrls;
- protected AbstractAction(long id, @NonNull String name, @NonNull List upstreams, @NonNull List jarUrls) {
+ protected AbstractAction(long id, @NonNull String name, @NonNull List upstreams, @NonNull Set jarUrls) {
this.id = id;
this.name = name;
this.upstreams = upstreams;
this.jarUrls = jarUrls;
}
- protected AbstractAction(long id, @NonNull String name, @NonNull List jarUrls) {
+ protected AbstractAction(long id, @NonNull String name, @NonNull Set jarUrls) {
this.id = id;
this.name = name;
this.jarUrls = jarUrls;
@@ -84,7 +85,7 @@ public long getId() {
}
@Override
- public List getJarUrls() {
+ public Set getJarUrls() {
return jarUrls;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
index e35e450f927..b06fcb7a7ee 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
@@ -22,6 +22,7 @@
import java.io.Serializable;
import java.net.URL;
import java.util.List;
+import java.util.Set;
public interface Action extends Serializable {
@NonNull
@@ -40,5 +41,5 @@ public interface Action extends Serializable {
long getId();
- List getJarUrls();
+ Set getJarUrls();
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java
index 60e9acc114b..9fe9bd59930 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java
@@ -23,6 +23,7 @@
import java.net.URL;
import java.util.List;
+import java.util.Set;
public class PartitionTransformAction extends AbstractAction {
@@ -32,7 +33,7 @@ public PartitionTransformAction(long id,
@NonNull String name,
@NonNull List upstreams,
@NonNull PartitionSeaTunnelTransform partitionTransformation,
- @NonNull List jarUrls) {
+ @NonNull Set jarUrls) {
super(id, name, upstreams, jarUrls);
this.partitionTransformation = partitionTransformation;
}
@@ -40,7 +41,7 @@ public PartitionTransformAction(long id,
public PartitionTransformAction(long id,
@NonNull String name,
@NonNull PartitionSeaTunnelTransform partitionTransformation,
- @NonNull List jarUrls) {
+ @NonNull Set jarUrls) {
super(id, name, jarUrls);
this.partitionTransformation = partitionTransformation;
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java
index eef6c9f63ab..e9b83ead380 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java
@@ -23,6 +23,7 @@
import java.net.URL;
import java.util.List;
+import java.util.Set;
@SuppressWarnings("checkstyle:ClassTypeParameterName")
public class SinkAction extends AbstractAction {
@@ -32,7 +33,7 @@ public SinkAction(long id,
@NonNull String name,
@NonNull List upstreams,
@NonNull SeaTunnelSink sink,
- @NonNull List jarUrls) {
+ @NonNull Set jarUrls) {
super(id, name, upstreams, jarUrls);
this.sink = sink;
}
@@ -40,7 +41,7 @@ public SinkAction(long id,
public SinkAction(long id,
@NonNull String name,
@NonNull SeaTunnelSink sink,
- @NonNull List jarUrls) {
+ @NonNull Set jarUrls) {
super(id, name, jarUrls);
this.sink = sink;
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java
index ed4fb19673d..0ce672b89d8 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java
@@ -25,7 +25,7 @@
import java.io.Serializable;
import java.net.URL;
-import java.util.List;
+import java.util.Set;
public class SourceAction extends AbstractAction {
@@ -35,7 +35,7 @@ public class SourceAction source,
- @NonNull List jarUrls) {
+ @NonNull Set jarUrls) {
super(id, name, Lists.newArrayList(), jarUrls);
this.source = source;
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java
index 4bdd6475ed5..2825233b5d6 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java
@@ -23,6 +23,7 @@
import java.net.URL;
import java.util.List;
+import java.util.Set;
public class TransformAction extends AbstractAction {
private final SeaTunnelTransform> transform;
@@ -31,7 +32,7 @@ public TransformAction(long id,
@NonNull String name,
@NonNull List upstreams,
@NonNull SeaTunnelTransform> transform,
- @NonNull List jarUrls) {
+ @NonNull Set jarUrls) {
super(id, name, upstreams, jarUrls);
this.transform = transform;
}
@@ -39,7 +40,7 @@ public TransformAction(long id,
public TransformAction(long id,
@NonNull String name,
@NonNull SeaTunnelTransform> transform,
- @NonNull List jarUrls) {
+ @NonNull Set jarUrls) {
super(id, name, jarUrls);
this.transform = transform;
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformChainAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformChainAction.java
index 5c8745264ba..7ba5d90b9fe 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformChainAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformChainAction.java
@@ -23,6 +23,7 @@
import java.net.URL;
import java.util.List;
+import java.util.Set;
public class TransformChainAction extends AbstractAction {
@@ -32,7 +33,7 @@ public class TransformChainAction extends AbstractAction {
public TransformChainAction(long id,
@NonNull String name,
@NonNull List upstreams,
- @NonNull List jarUrls,
+ @NonNull Set jarUrls,
@NonNull List> transforms) {
super(id, name, upstreams, jarUrls);
this.transforms = transforms;
@@ -40,7 +41,7 @@ public TransformChainAction(long id,
public TransformChainAction(long id,
@NonNull String name,
- @NonNull List jarUrls,
+ @NonNull Set jarUrls,
@NonNull List> transforms) {
super(id, name, jarUrls);
this.transforms = transforms;
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
index 5ad043ed003..fa10655189b 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
@@ -24,5 +24,5 @@ public interface Job {
void submitJob() throws ExecutionException, InterruptedException;
- void waitForJobComplete();
+ JobStatus waitForJobComplete();
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 23f7613e20b..019a9779787 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -23,6 +23,7 @@
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.master.JobMaster;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.services.ManagedService;
@@ -69,7 +70,8 @@ public SeaTunnelServer(@NonNull Node node, @NonNull SeaTunnelConfig seaTunnelCon
this.liveOperationRegistry = new LiveOperationRegistry();
this.seaTunnelConfig = seaTunnelConfig;
this.executorService =
- Executors.newFixedThreadPool(seaTunnelConfig.getEngineConfig().getServerExecutorPoolSize());
+ Executors.newCachedThreadPool(new ThreadFactoryBuilder()
+ .setNameFormat("seatunnel-server-executor-%d").build());
logger.info("SeaTunnel server start...");
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 47133cdc4ef..df965176856 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -26,6 +26,7 @@
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.loader.SeatunnelChildFirstClassLoader;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.ProgressState;
@@ -38,12 +39,15 @@
import org.apache.seatunnel.engine.server.execution.TaskTracker;
import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
+import com.google.common.collect.Lists;
import com.hazelcast.internal.serialization.Data;
+import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.properties.HazelcastProperties;
import lombok.NonNull;
import lombok.SneakyThrows;
+import org.apache.commons.collections4.CollectionUtils;
import java.net.URL;
import java.util.Collection;
@@ -129,14 +133,21 @@ public PassiveCompletableFuture deployTask(
nodeEngine.getSerializationService().toObject(taskImmutableInformation);
Set jars = taskImmutableInfo.getJars();
- // TODO Use classloader load the connector jars and deserialize Task
- taskGroup = nodeEngine.getSerializationService().toObject(taskImmutableInfo.getGroup());
+ if (!CollectionUtils.isEmpty(jars)) {
+ taskGroup =
+ CustomClassLoadedObject.deserializeWithCustomClassLoader(nodeEngine.getSerializationService(),
+ new SeatunnelChildFirstClassLoader(Lists.newArrayList(jars)),
+ taskImmutableInfo.getGroup());
+ } else {
+ taskGroup = nodeEngine.getSerializationService().toObject(taskImmutableInfo.getGroup());
+ }
return deployLocalTask(taskGroup, resultFuture);
} catch (Throwable t) {
logger.severe(String.format("TaskGroupID : %s deploy error with Exception: %s",
taskGroup != null ? taskGroup.getId() : -1,
ExceptionUtils.getMessage(t)));
- resultFuture.complete(new TaskExecutionState(taskGroup != null ? taskGroup.getId() : -1, ExecutionState.FAILED, t));
+ resultFuture.complete(
+ new TaskExecutionState(taskGroup != null ? taskGroup.getId() : -1, ExecutionState.FAILED, t));
}
return new PassiveCompletableFuture<>(resultFuture);
}
@@ -167,7 +178,7 @@ public PassiveCompletableFuture deployLocalTask(
cancellationFutures.put(taskGroup.getId(), cancellationFuture);
} catch (Throwable t) {
logger.severe(ExceptionUtils.getMessage(t));
- resultFuture.complete(new TaskExecutionState(taskGroup.getId(), ExecutionState.FAILED, t));
+ resultFuture.completeExceptionally(t);
}
return new PassiveCompletableFuture<>(resultFuture);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index b895f1c69f3..69081fcc389 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -173,7 +173,7 @@ private void createExecutionVertex(LogicalVertex logicalVertex) {
});
newAction = new TransformChainAction(newId,
String.join("->", names),
- new ArrayList<>(jars),
+ jars,
transforms);
}
ExecutionVertex executionVertex = new ExecutionVertex(newId, newAction, logicalVertex.getParallelism());
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index 00df7ebef10..b769f76be3e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -97,17 +97,12 @@ public PhysicalPlan(@NonNull List pipelineList,
canceledPipelineNum.incrementAndGet();
} else if (PipelineState.FAILED.equals(v)) {
LOGGER.severe("Pipeline Failed, Begin to cancel other pipelines in this job.");
- cancelJob().whenComplete((v1, t1) -> {
- LOGGER.severe(String.format("Cancel other pipelines complete"));
- failedPipelineNum.incrementAndGet();
- });
+ failedPipelineNum.incrementAndGet();
+ cancelJob();
} else if (!PipelineState.FINISHED.equals(v)) {
LOGGER.severe(
"Pipeline Failed with Unknown PipelineState, Begin to cancel other pipelines in this job.");
- cancelJob().whenComplete((v1, t1) -> {
- LOGGER.severe(String.format("Cancel other pipelines complete"));
- failedPipelineNum.incrementAndGet();
- });
+ failedPipelineNum.incrementAndGet();
}
if (finishedPipelineNum.incrementAndGet() == this.pipelineList.size()) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 0f5dcdb52d5..6ac92406f68 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -204,7 +204,7 @@ private List getCommitterTask(List edges,
flakeIdGenerator,
pipelineIndex,
totalPipelineNum,
- null,
+ s.getJarUrls(),
jobImmutableInformation,
initializationTimestamp,
nodeEngine);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index f11019fa011..84f9c25db13 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -18,12 +18,15 @@
package org.apache.seatunnel.engine.server.dag.physical;
import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.server.dag.execution.ExecutionVertex;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
+import org.apache.seatunnel.engine.server.operation.DeployTaskOperation;
+import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
import com.hazelcast.cluster.Address;
import com.hazelcast.flakeidgen.FlakeIdGenerator;
@@ -131,7 +134,7 @@ public PhysicalVertex(long physicalVertexId,
"Job %s (%s), Pipeline: [(%d/%d)], task: [%s (%d/%d)]",
jobImmutableInformation.getJobConfig().getName(),
jobImmutableInformation.getJobId(),
- pipelineIndex + 1,
+ pipelineIndex,
totalPipelineNum,
taskGroup.getTaskGroupName(),
subTaskGroupIndex + 1,
@@ -142,42 +145,34 @@ public PhysicalVertex(long physicalVertexId,
@SuppressWarnings("checkstyle:MagicNumber")
// This method must not throw an exception
public void deploy(@NonNull Address address) {
- /**
- TaskGroupImmutableInformation taskGroupImmutableInformation =
- new TaskGroupImmutableInformation(flakeIdGenerator.newId(),
- nodeEngine.getSerializationService().toData(this.taskGroup),
- this.pluginJarsUrls);
-
- try {
- waitForCompleteByExecutionService = new NonCompletableFuture<>(
- nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
- new DeployTaskOperation(nodeEngine.getSerializationService().toData(taskGroupImmutableInformation)),
- address)
- .invoke());
- } catch (Throwable th) {
- LOGGER.severe(String.format("%s deploy error with Exception: %s",
- this.taskFullName,
- ExceptionUtils.getMessage(th)));
- updateTaskState(ExecutionState.DEPLOYING, ExecutionState.FAILED);
- taskFuture.complete(
- new TaskExecutionState(taskGroupImmutableInformation.getExecutionId(), ExecutionState.FAILED, null));
- }*/
-
- waitForCompleteByExecutionService = new PassiveCompletableFuture<>(CompletableFuture.supplyAsync(() -> {
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- return new TaskExecutionState(flakeIdGenerator.newId(), ExecutionState.FINISHED, null);
- }));
+ TaskGroupImmutableInformation taskGroupImmutableInformation =
+ new TaskGroupImmutableInformation(flakeIdGenerator.newId(),
+ nodeEngine.getSerializationService().toData(this.taskGroup),
+ this.pluginJarsUrls);
+
+ try {
+ waitForCompleteByExecutionService = new PassiveCompletableFuture<>(
+ nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
+ new DeployTaskOperation(nodeEngine.getSerializationService().toData(taskGroupImmutableInformation)),
+ address)
+ .invoke());
+ updateTaskState(ExecutionState.DEPLOYING, ExecutionState.RUNNING);
+ } catch (Throwable th) {
+ LOGGER.severe(String.format("%s deploy error with Exception: %s",
+ this.taskFullName,
+ ExceptionUtils.getMessage(th)));
+ updateTaskState(ExecutionState.DEPLOYING, ExecutionState.FAILED);
+ taskFuture.complete(
+ new TaskExecutionState(taskGroupImmutableInformation.getExecutionId(), ExecutionState.FAILED, th));
+ }
- updateTaskState(ExecutionState.DEPLOYING, ExecutionState.RUNNING);
waitForCompleteByExecutionService.whenComplete((v, t) -> {
try {
if (t != null) {
LOGGER.severe("An unexpected error occurred while the task was running", t);
- taskFuture.completeExceptionally(t);
+ taskFuture.complete(
+ new TaskExecutionState(taskGroupImmutableInformation.getExecutionId(), ExecutionState.FAILED,
+ t));
} else {
updateTaskState(executionState.get(), v.getExecutionState());
if (v.getThrowable() != null) {
@@ -186,7 +181,7 @@ public void deploy(@NonNull Address address) {
v.getExecutionState(),
ExceptionUtils.getMessage(v.getThrowable())));
} else {
- LOGGER.severe(String.format("%s end with state %s",
+ LOGGER.info(String.format("%s end with state %s",
this.taskFullName,
v.getExecutionState()));
}
@@ -196,7 +191,7 @@ public void deploy(@NonNull Address address) {
LOGGER.severe(
String.format("%s end with Exception: %s", this.taskFullName, ExceptionUtils.getMessage(th)));
updateTaskState(ExecutionState.RUNNING, ExecutionState.FAILED);
- v = new TaskExecutionState(v.getTaskExecutionId(), ExecutionState.FAILED, null);
+ v = new TaskExecutionState(v.getTaskExecutionId(), ExecutionState.FAILED, th);
taskFuture.complete(v);
}
});
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 2a7c066896a..950a63e9508 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -99,7 +99,7 @@ public SubPlan(int pipelineIndex,
"Job %s (%s), Pipeline: [(%d/%d)]",
jobImmutableInformation.getJobConfig().getName(),
jobImmutableInformation.getJobId(),
- pipelineIndex + 1,
+ pipelineIndex,
totalPipelineNum);
Arrays.stream(this.waitForCompleteByPhysicalVertex).forEach(x -> {
@@ -108,18 +108,16 @@ public SubPlan(int pipelineIndex,
if (ExecutionState.CANCELED.equals(v.getExecutionState())) {
canceledTaskNum.incrementAndGet();
} else if (ExecutionState.FAILED.equals(v.getExecutionState())) {
- LOGGER.severe("Task Failed, Begin to cancel other tasks in this pipeline.");
- cancelPipeline().whenComplete((v1, t1) -> {
- LOGGER.severe("Cancel other tasks complete");
- failedTaskNum.incrementAndGet();
- });
+ LOGGER.severe(String.format("Task Failed in %s, Begin to cancel other tasks in this pipeline.",
+ this.getPipelineFullName()));
+ failedTaskNum.incrementAndGet();
+ cancelPipeline();
} else if (!ExecutionState.FINISHED.equals(v.getExecutionState())) {
- LOGGER.severe(
- "Task Failed with Unknown ExecutionState, Begin to cancel other tasks in this pipeline.");
- cancelPipeline().whenComplete((v1, t1) -> {
- LOGGER.severe("Cancel other tasks complete");
- failedTaskNum.incrementAndGet();
- });
+ LOGGER.severe(String.format(
+ "Task Failed in %s, with Unknown ExecutionState, Begin to cancel other tasks in this pipeline.",
+ this.getPipelineFullName()));
+ failedTaskNum.incrementAndGet();
+ cancelPipeline();
}
if (finishedTaskNum.incrementAndGet() == (physicalVertexList.size() + coordinatorVertexList.size())) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 3af7ebe12b0..3a789538903 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -19,6 +19,7 @@
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.loader.SeatunnelChildFirstClassLoader;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
@@ -35,10 +36,12 @@
import com.hazelcast.flakeidgen.FlakeIdGenerator;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.jet.datamodel.Tuple2;
+import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.spi.impl.NodeEngine;
import lombok.NonNull;
+import org.apache.commons.collections4.CollectionUtils;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -73,7 +76,7 @@ public JobMaster(@NonNull Data jobImmutableInformationData,
flakeIdGenerator =
this.nodeEngine.getHazelcastInstance().getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME);
- this.resourceManager = new SimpleResourceManager();
+ this.resourceManager = new SimpleResourceManager(this.nodeEngine);
}
public void init() throws Exception {
@@ -83,8 +86,14 @@ public void init() throws Exception {
LOGGER.info(
"Job [" + jobImmutableInformation.getJobId() + "] jar urls " + jobImmutableInformation.getPluginJarsUrls());
- // TODO Use classloader load the connector jars and deserialize logicalDag
- this.logicalDag = nodeEngine.getSerializationService().toObject(jobImmutableInformation.getLogicalDag());
+ if (!CollectionUtils.isEmpty(jobImmutableInformation.getPluginJarsUrls())) {
+ this.logicalDag =
+ CustomClassLoadedObject.deserializeWithCustomClassLoader(nodeEngine.getSerializationService(),
+ new SeatunnelChildFirstClassLoader(jobImmutableInformation.getPluginJarsUrls()),
+ jobImmutableInformation.getLogicalDag());
+ } else {
+ this.logicalDag = nodeEngine.getSerializationService().toObject(jobImmutableInformation.getLogicalDag());
+ }
final Tuple2 planTuple = PlanUtils.fromLogicalDAG(logicalDag,
nodeEngine,
jobImmutableInformation,
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/SimpleResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/SimpleResourceManager.java
index 74bba50a2f9..59bf5b20473 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/SimpleResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/SimpleResourceManager.java
@@ -20,10 +20,10 @@
import org.apache.seatunnel.engine.common.exception.JobException;
import com.hazelcast.cluster.Address;
+import com.hazelcast.spi.impl.NodeEngine;
import lombok.Data;
import lombok.NonNull;
-import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
@@ -33,23 +33,26 @@ public class SimpleResourceManager implements ResourceManager {
// TODO We may need more detailed resource define, instead of the resource definition method of only Address.
private Map> physicalVertexIdAndResourceMap = new HashMap<>();
+ private final NodeEngine nodeEngine;
+
+ public SimpleResourceManager(NodeEngine nodeEngine) {
+ this.nodeEngine = nodeEngine;
+ }
+
@SuppressWarnings("checkstyle:MagicNumber")
@Override
public Address applyForResource(long jobId, long taskId) {
- try {
- Map jobAddressMap = physicalVertexIdAndResourceMap.computeIfAbsent(jobId, k -> new HashMap<>());
+ Map jobAddressMap =
+ physicalVertexIdAndResourceMap.computeIfAbsent(jobId, k -> new HashMap<>());
- Address localhost =
- jobAddressMap.putIfAbsent(taskId, new Address("localhost", 5701));
- if (null == localhost) {
- localhost = jobAddressMap.get(taskId);
- }
+ Address localhost =
+ jobAddressMap.putIfAbsent(taskId, nodeEngine.getThisAddress());
+ if (null == localhost) {
+ localhost = jobAddressMap.get(taskId);
+ }
- return localhost;
+ return localhost;
- } catch (UnknownHostException e) {
- throw new RuntimeException(e);
- }
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
index bca5615c8b7..63e9ab4fbd9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
@@ -36,8 +36,8 @@
* the {@link org.apache.seatunnel.api.source.SourceSplitEnumerator}
*/
public class SourceRegisterOperation extends Operation implements IdentifiedDataSerializable {
-
private static final int RETRY_TIME = 5;
+
private static final int RETRY_TIME_OUT = 2000;
private TaskLocation readerTaskID;
@@ -57,7 +57,8 @@ public void run() throws Exception {
Address readerAddress = getCallerAddress();
RetryUtils.retryWithException(() -> {
SourceSplitEnumeratorTask> task =
- server.getTaskExecutionService().getExecutionContext(enumeratorTaskID.getTaskGroupID()).getTaskGroup().getTask(enumeratorTaskID.getTaskID());
+ server.getTaskExecutionService().getExecutionContext(enumeratorTaskID.getTaskGroupID()).getTaskGroup()
+ .getTask(enumeratorTaskID.getTaskID());
task.receivedReader(readerTaskID, readerAddress);
return null;
}, new RetryUtils.RetryMaterial(RETRY_TIME, true,
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index b5f43ad3b7c..6ce4b928556 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -38,6 +38,7 @@
import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
import org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
+import com.google.common.collect.Sets;
import com.hazelcast.config.Config;
import com.hazelcast.instance.impl.HazelcastInstanceFactory;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
@@ -67,7 +68,7 @@ public void before() {
config.setInstanceName("test");
config.setClusterName("test");
instance = ((HazelcastInstanceProxy) HazelcastInstanceFactory.newHazelcastInstance(config,
- "taskTest", new SeaTunnelNodeContext(new SeaTunnelConfig()))).getOriginal();
+ "taskTest", new SeaTunnelNodeContext(new SeaTunnelConfig()))).getOriginal();
nodeEngine = instance.node.nodeEngine;
service = nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
}
@@ -82,14 +83,14 @@ public void testTask() throws MalformedURLException {
fakeSource.setSeaTunnelContext(SeaTunnelContext.getContext());
Action fake = new SourceAction<>(idGenerator.getNextId(), "fake", fakeSource,
- Collections.singletonList(new URL("file:///fake.jar")));
+ Sets.newHashSet(new URL("file:///fake.jar")));
fake.setParallelism(3);
LogicalVertex fakeVertex = new LogicalVertex(fake.getId(), fake, 3);
ConsoleSink consoleSink = new ConsoleSink();
consoleSink.setSeaTunnelContext(SeaTunnelContext.getContext());
Action console = new SinkAction<>(idGenerator.getNextId(), "console", consoleSink,
- Collections.singletonList(new URL("file:///console.jar")));
+ Sets.newHashSet(new URL("file:///console.jar")));
console.setParallelism(3);
LogicalVertex consoleVertex = new LogicalVertex(console.getId(), console, 3);
@@ -105,7 +106,7 @@ public void testTask() throws MalformedURLException {
config.setMode(JobMode.BATCH);
JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(1,
- nodeEngine.getSerializationService().toData(logicalDag), config, Collections.emptyList());
+ nodeEngine.getSerializationService().toData(logicalDag), config, Collections.emptyList());
PassiveCompletableFuture voidPassiveCompletableFuture =
service.submitJob(jobImmutableInformation.getJobId(),
@@ -120,15 +121,15 @@ public void testLogicalToPhysical() throws MalformedURLException {
IdGenerator idGenerator = new IdGenerator();
Action fake = new SourceAction<>(idGenerator.getNextId(), "fake", new FakeSource(),
- Collections.singletonList(new URL("file:///fake.jar")));
+ Sets.newHashSet(new URL("file:///fake.jar")));
LogicalVertex fakeVertex = new LogicalVertex(fake.getId(), fake, 2);
Action fake2 = new SourceAction<>(idGenerator.getNextId(), "fake", new FakeSource(),
- Collections.singletonList(new URL("file:///fake.jar")));
+ Sets.newHashSet(new URL("file:///fake.jar")));
LogicalVertex fake2Vertex = new LogicalVertex(fake2.getId(), fake2, 2);
Action console = new SinkAction<>(idGenerator.getNextId(), "console", new ConsoleSink(),
- Collections.singletonList(new URL("file:///console.jar")));
+ Sets.newHashSet(new URL("file:///console.jar")));
LogicalVertex consoleVertex = new LogicalVertex(console.getId(), console, 2);
LogicalEdge edge = new LogicalEdge(fakeVertex, consoleVertex);
@@ -143,7 +144,7 @@ public void testLogicalToPhysical() throws MalformedURLException {
config.setMode(JobMode.BATCH);
JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(1,
- nodeEngine.getSerializationService().toData(logicalDag), config, Collections.emptyList());
+ nodeEngine.getSerializationService().toData(logicalDag), config, Collections.emptyList());
PhysicalPlan physicalPlan = PlanUtils.fromLogicalDAG(logicalDag, nodeEngine,
jobImmutableInformation,
@@ -160,5 +161,4 @@ public void testLogicalToPhysical() throws MalformedURLException {
public void after() {
instance.shutdown();
}
-
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/execution/TestTask.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/execution/TestTask.java
index 689094d213a..73bf8612d17 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/execution/TestTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/execution/TestTask.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.engine.server.execution;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+
import com.hazelcast.logging.ILogger;
import lombok.NonNull;
@@ -48,8 +50,7 @@ public ProgressState call() {
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
- // The test needs to do that
- logger.info(e.getMessage());
+ logger.severe(ExceptionUtils.getMessage(e));
}
progressState = ProgressState.MADE_PROGRESS;
} else {
diff --git a/seatunnel-examples/pom.xml b/seatunnel-examples/pom.xml
index 6a5dba663c1..6a73ecebfd5 100644
--- a/seatunnel-examples/pom.xml
+++ b/seatunnel-examples/pom.xml
@@ -41,10 +41,14 @@
seatunnel-flink-sql-examples
seatunnel-flink-connector-v2-example
seatunnel-spark-connector-v2-example
+ seatunnel-engine-examples
engine-all
+
+ seatunnel-engine-examples
+
release
diff --git a/seatunnel-examples/seatunnel-engine-examples/pom.xml b/seatunnel-examples/seatunnel-engine-examples/pom.xml
new file mode 100644
index 00000000000..9c3a5ca5d13
--- /dev/null
+++ b/seatunnel-examples/seatunnel-engine-examples/pom.xml
@@ -0,0 +1,30 @@
+
+
+
+
+
+ seatunnel-examples
+ org.apache.seatunnel
+ ${revision}
+
+ 4.0.0
+
+ seatunnel-engine-examples
+
\ No newline at end of file