diff --git a/.gitignore b/.gitignore index 2318e11827c..de692025d0f 100644 --- a/.gitignore +++ b/.gitignore @@ -42,3 +42,5 @@ test.conf log4j.properties spark-warehouse *.flattened-pom.xml + +connectors diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java index 67034db0490..dca149256c9 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.common.config; +import org.apache.commons.lang3.StringUtils; + import java.io.File; import java.net.URISyntaxException; import java.nio.file.Path; @@ -97,14 +99,24 @@ public static Path connectorRootDir(String engine) { * Plugin Connector Jar Dir */ public static Path connectorJarDir(String engine) { - return Paths.get(appRootDir().toString(), "connectors", engine.toLowerCase()); + String seatunnelHome = System.getProperty("SEATUNNEL_HOME"); + if (StringUtils.isBlank(seatunnelHome)) { + return Paths.get(appRootDir().toString(), "connectors", engine.toLowerCase()); + } else { + return Paths.get(seatunnelHome, "connectors", engine.toLowerCase()); + } } /** * Plugin Connector Dir */ public static Path connectorDir() { - return Paths.get(appRootDir().toString(), "connectors"); + String seatunnelHome = System.getProperty("SEATUNNEL_HOME"); + if (StringUtils.isBlank(seatunnelHome)) { + return Paths.get(appRootDir().toString(), "connectors"); + } else { + return Paths.get(seatunnelHome, "connectors"); + } } public static Path pluginTarball() { diff --git a/seatunnel-e2e/pom.xml b/seatunnel-e2e/pom.xml index 472f7b83fbe..35bf7fce7c7 100644 --- a/seatunnel-e2e/pom.xml +++ b/seatunnel-e2e/pom.xml @@ -38,6 +38,7 @@ seatunnel-flink-connector-v2-e2e seatunnel-spark-connector-v2-e2e seatunnel-flink-sql-e2e + seatunnel-engine-e2e diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/pom.xml b/seatunnel-e2e/seatunnel-engine-e2e/pom.xml new file mode 100644 index 00000000000..292665a77c6 --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/pom.xml @@ -0,0 +1,70 @@ + + + + + + seatunnel-e2e + org.apache.seatunnel + ${revision} + + 4.0.0 + + seatunnel-engine-e2e + + + + com.hazelcast + hazelcast + test + tests + + + org.apache.seatunnel + seatunnel-engine-client + ${project.version} + + + org.apache.seatunnel + connector-file-local + + + org.apache.seatunnel + connector-fake + + + + + org.apache.seatunnel + seatunnel-connectors-v2-dist + ${project.version} + + + * + * + + + + + org.apache.seatunnel + seatunnel-engine-server + ${project.version} + + + \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java b/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java new file mode 100644 index 00000000000..26e4defc954 --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;

/**
 * File-system helpers for the SeaTunnel engine end-to-end tests.
 *
 * <p>All locations are derived from the {@code user.dir} system property, i.e. the
 * directory the test JVM was started in.
 */
public class TestUtils {
    private static final String SEATUNNEL_HOME_KEY = "SEATUNNEL_HOME";
    private static final String CONNECTOR_JAR_PREFIX = "connector-";
    private static final String PLUGIN_MAPPING_FILE = "plugin-mapping.properties";

    /**
     * Builds the path of a test resource below {@code src/test/resources}.
     *
     * @param confFile resource name; it is appended verbatim, so it must start with {@code /}
     * @return {@code user.dir + "/src/test/resources" + confFile}
     */
    public static String getResource(String confFile) {
        return System.getProperty("user.dir") + "/src/test/resources" + confFile;
    }

    /**
     * Stages the connector jars and the plugin mapping file for a test run.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>sets the {@code SEATUNNEL_HOME} system property to two directories above
     *       {@code user.dir};</li>
     *   <li>recreates {@code connectors/seatunnel} below that home from scratch, so jars
     *       staged by an earlier run cannot leak into this one;</li>
     *   <li>copies every entry of {@code seatunnel-connectors-v2-dist/target/lib} whose
     *       name starts with {@code connector-} into it;</li>
     *   <li>copies {@code plugin-mapping.properties} from the home directory into
     *       {@code connectors}.</li>
     * </ol>
     *
     * @throws IllegalStateException if the connector dist {@code lib} directory cannot be listed
     * @throws UncheckedIOException if a directory cannot be purged or created, or a copy fails
     */
    public static void initPluginDir() {
        // copy connectors to project_root/connectors dir
        System.setProperty(SEATUNNEL_HOME_KEY, System.getProperty("user.dir")
            + String.format("%s..%s..%s", File.separator, File.separator, File.separator));
        File seatunnelRootDir = new File(System.getProperty(SEATUNNEL_HOME_KEY));

        File connectorDir = new File(seatunnelRootDir, "connectors" + File.separator + "seatunnel");
        // File#delete() refuses to remove a non-empty directory and only reports that through
        // its ignored return value, so the directory has to be purged bottom-up.
        deleteRecursively(connectorDir);
        try {
            Files.createDirectories(connectorDir.toPath());
        } catch (IOException e) {
            throw new UncheckedIOException("Cannot create connector directory " + connectorDir, e);
        }

        File connectorDistDir = Paths.get(
            seatunnelRootDir.getPath(), "seatunnel-connectors-v2-dist", "target", "lib").toFile();
        // listFiles() returns null (not an empty array) when the directory is missing or unreadable.
        File[] distFiles = connectorDistDir.listFiles();
        if (distFiles == null) {
            throw new IllegalStateException("Connector dist directory " + connectorDistDir
                + " does not exist or cannot be read; build seatunnel-connectors-v2-dist first");
        }
        Arrays.stream(distFiles)
            .filter(file -> file.getName().startsWith(CONNECTOR_JAR_PREFIX))
            .forEach(file -> copyReplacing(file.toPath(), connectorDir.toPath().resolve(file.getName())));

        Path homePath = seatunnelRootDir.toPath();
        copyReplacing(
            homePath.resolve(PLUGIN_MAPPING_FILE),
            homePath.resolve("connectors").resolve(PLUGIN_MAPPING_FILE));
    }

    /** Deletes {@code target} and everything below it; a missing target is a no-op. */
    private static void deleteRecursively(File target) {
        // Never descend through a symbolic link: only the link itself is removed.
        File[] children = Files.isSymbolicLink(target.toPath()) ? null : target.listFiles();
        if (children != null) {
            for (File child : children) {
                deleteRecursively(child);
            }
        }
        try {
            Files.deleteIfExists(target.toPath());
        } catch (IOException e) {
            throw new UncheckedIOException("Cannot delete " + target, e);
        }
    }

    /** Copies {@code source} to {@code target}, overwriting an existing target. */
    private static void copyReplacing(Path source, Path target) {
        try {
            Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException("Cannot copy " + source + " to " + target, e);
        }
    }
}
a/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java b/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java new file mode 100644 index 00000000000..fa6679e3d77 --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.engine.e2e.engine; + +import org.apache.seatunnel.common.config.Common; +import org.apache.seatunnel.common.config.DeployMode; +import org.apache.seatunnel.engine.client.SeaTunnelClient; +import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment; +import org.apache.seatunnel.engine.client.job.JobProxy; +import org.apache.seatunnel.engine.common.config.ConfigProvider; +import org.apache.seatunnel.engine.common.config.JobConfig; +import org.apache.seatunnel.engine.common.config.SeaTunnelClientConfig; +import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; +import org.apache.seatunnel.engine.core.job.JobStatus; +import org.apache.seatunnel.engine.e2e.TestUtils; +import org.apache.seatunnel.engine.server.SeaTunnelNodeContext; + +import com.google.common.collect.Lists; +import com.hazelcast.client.config.ClientConfig; +import com.hazelcast.instance.impl.HazelcastInstanceFactory; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ExecutionException; + +public class JobExecutionIT { + private static final Logger LOGGER = LoggerFactory.getLogger(JobExecutionIT.class); + + @BeforeClass + public static void beforeClass() throws Exception { + SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig(); + HazelcastInstanceFactory.newHazelcastInstance(seaTunnelConfig.getHazelcastConfig(), + Thread.currentThread().getName(), + new SeaTunnelNodeContext(ConfigProvider.locateAndGetSeaTunnelConfig())); + } + + @Test + public void testSayHello() { + SeaTunnelClientConfig seaTunnelClientConfig = new SeaTunnelClientConfig(); + seaTunnelClientConfig.getNetworkConfig().setAddresses(Lists.newArrayList("localhost:5801")); + SeaTunnelClient engineClient = new SeaTunnelClient(seaTunnelClientConfig); + + String msg = "Hello world"; + String s = engineClient.printMessageToMaster(msg); + 
Assert.assertEquals(msg, s); + } + + @Test + public void testExecuteJob() { + TestUtils.initPluginDir(); + Common.setDeployMode(DeployMode.CLIENT); + String filePath = TestUtils.getResource("/fakesource_to_file_complex.conf"); + JobConfig jobConfig = new JobConfig(); + jobConfig.setName("fake_to_file"); + + ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); + SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig); + JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(filePath, jobConfig); + + JobProxy jobProxy = null; + try { + jobProxy = jobExecutionEnv.execute(); + JobStatus jobStatus = jobProxy.waitForJobComplete(); + Assert.assertEquals(JobStatus.FINISHED, jobStatus); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + } +} diff --git a/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/fakesource_to_file.conf b/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/fakesource_to_file.conf new file mode 100644 index 00000000000..c6f0f7bbf62 --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/fakesource_to_file.conf @@ -0,0 +1,65 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "STREAMING" + execution.checkpoint.interval = 5000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + result_table_name = "fake" + field_name = "name,age", + parallelism = 3 + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake +} + +transform { + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql +} + +sink { + LocalFile { + path="file:///tmp/hive/warehouse/test2" + field_delimiter="\t" + row_delimiter="\n" + partition_by=["age"] + partition_dir_expression="${k0}=${v0}" + is_partition_field_write_in_file=true + file_name_expression="${transactionId}_${now}" + file_format="text" + sink_columns=["name","age"] + filename_time_format="yyyy.MM.dd" + is_enable_transaction=true + save_mode="error" + + } + + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/fakesource_to_file_complex.conf b/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/fakesource_to_file_complex.conf new file mode 100644 index 00000000000..dc73c2f09fd --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/fakesource_to_file_complex.conf @@ -0,0 +1,71 @@ +# +# Licensed to the Apache 
Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "STREAMING" + execution.checkpoint.interval = 5000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + result_table_name = "fake" + field_name = "name,age", + parallelism = 3 + } + + FakeSource { + result_table_name = "fake" + field_name = "name,age", + parallelism = 3 + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake +} + +transform { + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql +} + +sink { + LocalFile { + path="file:///tmp/hive/warehouse/test2" + field_delimiter="\t" + row_delimiter="\n" + partition_by=["age"] + partition_dir_expression="${k0}=${v0}" + 
is_partition_field_write_in_file=true + file_name_expression="${transactionId}_${now}" + file_format="text" + sink_columns=["name","age"] + filename_time_format="yyyy.MM.dd" + is_enable_transaction=true + save_mode="error", + source_table_name="fake" + } + + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console +} \ No newline at end of file diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/ConnectorInstanceLoader.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/ConnectorInstanceLoader.java index d460deb8a2c..d4e8f703d45 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/ConnectorInstanceLoader.java +++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/ConnectorInstanceLoader.java @@ -35,7 +35,9 @@ import org.apache.commons.lang3.tuple.ImmutablePair; import java.net.URL; +import java.util.HashSet; import java.util.List; +import java.util.Set; import scala.Serializable; @@ -44,7 +46,7 @@ private ConnectorInstanceLoader() { throw new IllegalStateException("Utility class"); } - public static ImmutablePair> loadSourceInstance(Config sourceConfig) { + public static ImmutablePair> loadSourceInstance(Config sourceConfig) { SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery(); PluginIdentifier pluginIdentifier = PluginIdentifier.of( CollectionConstants.SEATUNNEL_PLUGIN, @@ -61,10 +63,10 @@ public static ImmutablePair> loadSourceInstance(Confi throw new UnsupportedOperationException( String.format("'%s' source don't support off-line job.", seaTunnelSource.getPluginName())); } - return new ImmutablePair<>(seaTunnelSource, pluginJarPaths); + return new ImmutablePair<>(seaTunnelSource, new 
HashSet<>(pluginJarPaths)); } - public static ImmutablePair, List> loadSinkInstance( + public static ImmutablePair, Set> loadSinkInstance( Config sinkConfig) { SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery(); PluginIdentifier pluginIdentifier = PluginIdentifier.of( @@ -75,11 +77,12 @@ public static ImmutablePair seaTunnelSink = sinkPluginDiscovery.createPluginInstance(pluginIdentifier); seaTunnelSink.prepare(sinkConfig); + seaTunnelSink.setTypeInfo(null); seaTunnelSink.setSeaTunnelContext(SeaTunnelContext.getContext()); - return new ImmutablePair<>(seaTunnelSink, pluginJarPaths); + return new ImmutablePair<>(seaTunnelSink, new HashSet<>(pluginJarPaths)); } - public static ImmutablePair, List> loadTransformInstance(Config transformConfig) { + public static ImmutablePair, Set> loadTransformInstance(Config transformConfig) { SeaTunnelTransformPluginDiscovery transformPluginDiscovery = new SeaTunnelTransformPluginDiscovery(); PluginIdentifier pluginIdentifier = PluginIdentifier.of( CollectionConstants.SEATUNNEL_PLUGIN, @@ -89,6 +92,6 @@ public static ImmutablePair, List> loadTransformInsta List pluginJarPaths = transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)); SeaTunnelTransform seaTunnelTransform = transformPluginDiscovery.createPluginInstance(pluginIdentifier); - return new ImmutablePair<>(seaTunnelTransform, pluginJarPaths); + return new ImmutablePair<>(seaTunnelTransform, new HashSet<>(pluginJarPaths)); } } diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java index 4d1850bcf15..31cfa9a1661 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java +++ 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java @@ -124,7 +124,7 @@ private void complexAnalyze(List sourceConfigs, initRelationMap(sourceConfigs, transformConfigs); for (Config config : sinkConfigs) { - ImmutablePair, List> + ImmutablePair, Set> sinkListImmutablePair = ConnectorInstanceLoader.loadSinkInstance(config); @@ -165,7 +165,7 @@ private SeaTunnelDataType sourceAnalyze(String sourceTableName, Action action) { SeaTunnelDataType dataType = null; AtomicInteger totalParallelism = new AtomicInteger(); for (Config sourceConfig : sourceConfigList) { - ImmutablePair> seaTunnelSourceListImmutablePair = + ImmutablePair> seaTunnelSourceListImmutablePair = ConnectorInstanceLoader.loadSourceInstance(sourceConfig); dataType = seaTunnelSourceListImmutablePair.getLeft().getProducedType(); SourceAction sourceAction = createSourceAction( @@ -192,7 +192,7 @@ private SeaTunnelDataType transformAnalyze(String sourceTableName, Action act AtomicInteger totalParallelism = new AtomicInteger(); SeaTunnelDataType dataTypeResult = null; for (Config config : transformConfigList) { - ImmutablePair, List> transformListImmutablePair = + ImmutablePair, Set> transformListImmutablePair = ConnectorInstanceLoader.loadTransformInstance(config); TransformAction transformAction = createTransformAction( idGenerator.getNextId(), @@ -256,20 +256,20 @@ private void initRelationMap(List sourceConfigs, List sourceConfigs, List transformConfigs, List sinkConfigs) { - ImmutablePair> pair = + ImmutablePair> pair = ConnectorInstanceLoader.loadSourceInstance(sourceConfigs.get(0)); SourceAction sourceAction = createSourceAction(idGenerator.getNextId(), pair.getLeft().getPluginName(), pair.getLeft(), pair.getRight()); sourceAction.setParallelism(getSourceParallelism(sourceConfigs.get(0))); SeaTunnelDataType dataType = sourceAction.getSource().getProducedType(); - ImmutablePair, List> + ImmutablePair, Set> sinkListImmutablePair = 
ConnectorInstanceLoader.loadSinkInstance(sinkConfigs.get(0)); Action sinkUpstreamAction = sourceAction; if (!CollectionUtils.isEmpty(transformConfigs)) { - ImmutablePair, List> transformListImmutablePair = + ImmutablePair, Set> transformListImmutablePair = ConnectorInstanceLoader.loadTransformInstance(transformConfigs.get(0)); transformListImmutablePair.getLeft().setTypeInfo(dataType); @@ -323,7 +323,7 @@ private int getSourceParallelism(Config sourceConfig) { private SourceAction createSourceAction(long id, @NonNull String name, @NonNull SeaTunnelSource source, - List jarUrls) { + Set jarUrls) { if (!CollectionUtils.isEmpty(jarUrls)) { jarUrlsSet.addAll(jarUrls); } @@ -334,7 +334,7 @@ private TransformAction createTransformAction(long id, @NonNull String name, @NonNull List upstreams, @NonNull SeaTunnelTransform transformation, - List jarUrls) { + Set jarUrls) { if (!CollectionUtils.isEmpty(jarUrls)) { jarUrlsSet.addAll(jarUrls); } @@ -345,7 +345,7 @@ private SinkAction createSinkAction(long id, @NonNull String name, @NonNull List upstreams, @NonNull SeaTunnelSink sink, - List jarUrls) { + Set jarUrls) { if (!CollectionUtils.isEmpty(jarUrls)) { jarUrlsSet.addAll(jarUrls); } @@ -355,7 +355,7 @@ private SinkAction createSinkAction(long id, private TransformAction createTransformAction(long id, @NonNull String name, @NonNull SeaTunnelTransform transformation, - List jarUrls) { + Set jarUrls) { if (!CollectionUtils.isEmpty(jarUrls)) { jarUrlsSet.addAll(jarUrls); } @@ -365,7 +365,7 @@ private TransformAction createTransformAction(long id, private SinkAction createSinkAction(long id, @NonNull String name, @NonNull SeaTunnelSink sink, - List jarUrls) { + Set jarUrls) { if (!CollectionUtils.isEmpty(jarUrls)) { jarUrlsSet.addAll(jarUrls); } diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobProxy.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobProxy.java index 
dcd0235e2f5..0ac8ac789ab 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobProxy.java +++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobProxy.java @@ -59,29 +59,27 @@ public void submitJob() throws ExecutionException, InterruptedException { } @Override - public void waitForJobComplete() { + public JobStatus waitForJobComplete() { + JobStatus jobStatus = null; PassiveCompletableFuture jobFuture = seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture( SeaTunnelWaitForJobCompleteCodec.encodeRequest(jobImmutableInformation.getJobId()), response -> { return JobStatus.values()[SeaTunnelWaitForJobCompleteCodec.decodeResponse(response)]; }); - - jobFuture.whenComplete((v, t) -> { - if (null != t) { - LOGGER.info(String.format("Job %s (%s) end with state %s, and throw Exception: %s", - jobImmutableInformation.getJobId(), - jobImmutableInformation.getJobConfig().getName(), - v, - ExceptionUtils.getMessage(t))); - } else { - LOGGER.info(String.format("Job %s (%s) end with state %s", - jobImmutableInformation.getJobId(), - jobImmutableInformation.getJobConfig().getName(), - v)); - } - }); - - jobFuture.join(); + try { + jobStatus = jobFuture.get(); + LOGGER.info(String.format("Job %s (%s) end with state %s", + jobImmutableInformation.getJobId(), + jobImmutableInformation.getJobConfig().getName(), + jobStatus)); + } catch (InterruptedException | ExecutionException e) { + LOGGER.info(String.format("Job %s (%s) end with unknown state, and throw Exception: %s", + jobImmutableInformation.getJobId(), + jobImmutableInformation.getJobConfig().getName(), + ExceptionUtils.getMessage(e))); + throw new RuntimeException(e); + } + return jobStatus; } } diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java index 6ed009de77d..9318d6509c7 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java +++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java @@ -24,11 +24,10 @@ import org.apache.seatunnel.engine.client.job.JobProxy; import org.apache.seatunnel.engine.common.config.ConfigProvider; import org.apache.seatunnel.engine.common.config.JobConfig; -import org.apache.seatunnel.engine.common.config.SeaTunnelClientConfig; import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; +import org.apache.seatunnel.engine.core.job.JobStatus; import org.apache.seatunnel.engine.server.SeaTunnelNodeContext; -import com.google.common.collect.Lists; import com.hazelcast.client.config.ClientConfig; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.instance.impl.HazelcastInstanceFactory; @@ -57,9 +56,8 @@ public void beforeClass() throws Exception { @Test public void testSayHello() { - SeaTunnelClientConfig seaTunnelClientConfig = new SeaTunnelClientConfig(); - seaTunnelClientConfig.getNetworkConfig().setAddresses(Lists.newArrayList("localhost:5801")); - SeaTunnelClient engineClient = new SeaTunnelClient(seaTunnelClientConfig); + ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); + SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig); String msg = "Hello world"; String s = engineClient.printMessageToMaster(msg); @@ -81,7 +79,8 @@ public void testExecuteJob() { JobProxy jobProxy = null; try { jobProxy = jobExecutionEnv.execute(); - jobProxy.waitForJobComplete(); + JobStatus jobStatus = jobProxy.waitForJobComplete(); + Assert.assertEquals(JobStatus.FINISHED, jobStatus); } catch (ExecutionException | InterruptedException e) { throw new RuntimeException(e); } diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file.conf index c6f0f7bbf62..7dc125e77a4 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file.conf +++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file.conf @@ -21,7 +21,7 @@ env { # You can set flink configuration here execution.parallelism = 1 - job.mode = "STREAMING" + job.mode = "BATCH" execution.checkpoint.interval = 5000 #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" } @@ -33,14 +33,9 @@ source { field_name = "name,age", parallelism = 3 } - - # If you would like to get more information about how to configure seatunnel and see full list of source plugins, - # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake } transform { - # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, - # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql } sink { @@ -59,7 +54,4 @@ sink { save_mode="error" } - - # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, - # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console } \ No newline at end of file diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file_complex.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file_complex.conf index dc73c2f09fd..258c323523b 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file_complex.conf +++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file_complex.conf @@ -21,7 +21,7 @@ env { # You can set flink configuration here execution.parallelism = 1 - job.mode = "STREAMING" + job.mode = "BATCH" 
execution.checkpoint.interval = 5000 #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" } @@ -39,14 +39,9 @@ source { field_name = "name,age", parallelism = 3 } - - # If you would like to get more information about how to configure seatunnel and see full list of source plugins, - # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake } transform { - # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, - # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql } sink { @@ -65,7 +60,4 @@ sink { save_mode="error", source_table_name="fake" } - - # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, - # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console } \ No newline at end of file diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java index 12f562c90dd..88697461e08 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java @@ -33,9 +33,4 @@ public EngineConfig setBackupCount(int newBackupCount) { this.backupCount = newBackupCount; return this; } - - public EngineConfig setServerExecutorPoolSize(int serverExecutorPoolSize) { - this.serverExecutorPoolSize = serverExecutorPoolSize; - return this; - } } diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java index 5d75a540a57..5531db478aa 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java @@ -71,11 +71,6 @@ private void parseEngineConfig(Node engineNode, SeaTunnelConfig config) { getIntegerValue("backup-count", getTextContent(node)) ); break; - case "server-executor-pool-size": - engineConfig.setServerExecutorPoolSize( - getIntegerValue("server-executor-pool-size", getTextContent(node)) - ); - break; default: throw new AssertionError("Unrecognized element: " + name); } diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java index 63aad155ac0..28c0f984b42 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java @@ -22,6 +22,7 @@ import java.net.URL; import java.util.ArrayList; import java.util.List; +import java.util.Set; public abstract class AbstractAction implements Action { private String name; @@ -31,16 +32,16 @@ public abstract class AbstractAction implements Action { private int parallelism = 1; - private List jarUrls; + private Set jarUrls; - protected AbstractAction(long id, @NonNull String name, @NonNull List upstreams, @NonNull List jarUrls) { + protected AbstractAction(long id, @NonNull String name, @NonNull List upstreams, @NonNull Set jarUrls) { this.id = id; this.name = name; this.upstreams = upstreams; 
this.jarUrls = jarUrls; } - protected AbstractAction(long id, @NonNull String name, @NonNull List jarUrls) { + protected AbstractAction(long id, @NonNull String name, @NonNull Set jarUrls) { this.id = id; this.name = name; this.jarUrls = jarUrls; @@ -84,7 +85,7 @@ public long getId() { } @Override - public List getJarUrls() { + public Set getJarUrls() { return jarUrls; } } diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java index e35e450f927..b06fcb7a7ee 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java @@ -22,6 +22,7 @@ import java.io.Serializable; import java.net.URL; import java.util.List; +import java.util.Set; public interface Action extends Serializable { @NonNull @@ -40,5 +41,5 @@ public interface Action extends Serializable { long getId(); - List getJarUrls(); + Set getJarUrls(); } diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java index 60e9acc114b..9fe9bd59930 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java @@ -23,6 +23,7 @@ import java.net.URL; import java.util.List; +import java.util.Set; public class PartitionTransformAction extends AbstractAction { @@ -32,7 +33,7 @@ public PartitionTransformAction(long id, @NonNull String name, @NonNull List 
upstreams, @NonNull PartitionSeaTunnelTransform partitionTransformation, - @NonNull List jarUrls) { + @NonNull Set jarUrls) { super(id, name, upstreams, jarUrls); this.partitionTransformation = partitionTransformation; } @@ -40,7 +41,7 @@ public PartitionTransformAction(long id, public PartitionTransformAction(long id, @NonNull String name, @NonNull PartitionSeaTunnelTransform partitionTransformation, - @NonNull List jarUrls) { + @NonNull Set jarUrls) { super(id, name, jarUrls); this.partitionTransformation = partitionTransformation; } diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java index eef6c9f63ab..e9b83ead380 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java @@ -23,6 +23,7 @@ import java.net.URL; import java.util.List; +import java.util.Set; @SuppressWarnings("checkstyle:ClassTypeParameterName") public class SinkAction extends AbstractAction { @@ -32,7 +33,7 @@ public SinkAction(long id, @NonNull String name, @NonNull List upstreams, @NonNull SeaTunnelSink sink, - @NonNull List jarUrls) { + @NonNull Set jarUrls) { super(id, name, upstreams, jarUrls); this.sink = sink; } @@ -40,7 +41,7 @@ public SinkAction(long id, public SinkAction(long id, @NonNull String name, @NonNull SeaTunnelSink sink, - @NonNull List jarUrls) { + @NonNull Set jarUrls) { super(id, name, jarUrls); this.sink = sink; } diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java index ed4fb19673d..0ce672b89d8 100644 --- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java @@ -25,7 +25,7 @@ import java.io.Serializable; import java.net.URL; -import java.util.List; +import java.util.Set; public class SourceAction extends AbstractAction { @@ -35,7 +35,7 @@ public class SourceAction source, - @NonNull List jarUrls) { + @NonNull Set jarUrls) { super(id, name, Lists.newArrayList(), jarUrls); this.source = source; } diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java index 4bdd6475ed5..2825233b5d6 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java @@ -23,6 +23,7 @@ import java.net.URL; import java.util.List; +import java.util.Set; public class TransformAction extends AbstractAction { private final SeaTunnelTransform transform; @@ -31,7 +32,7 @@ public TransformAction(long id, @NonNull String name, @NonNull List upstreams, @NonNull SeaTunnelTransform transform, - @NonNull List jarUrls) { + @NonNull Set jarUrls) { super(id, name, upstreams, jarUrls); this.transform = transform; } @@ -39,7 +40,7 @@ public TransformAction(long id, public TransformAction(long id, @NonNull String name, @NonNull SeaTunnelTransform transform, - @NonNull List jarUrls) { + @NonNull Set jarUrls) { super(id, name, jarUrls); this.transform = transform; } diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformChainAction.java 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformChainAction.java index 5c8745264ba..7ba5d90b9fe 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformChainAction.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformChainAction.java @@ -23,6 +23,7 @@ import java.net.URL; import java.util.List; +import java.util.Set; public class TransformChainAction extends AbstractAction { @@ -32,7 +33,7 @@ public class TransformChainAction extends AbstractAction { public TransformChainAction(long id, @NonNull String name, @NonNull List upstreams, - @NonNull List jarUrls, + @NonNull Set jarUrls, @NonNull List> transforms) { super(id, name, upstreams, jarUrls); this.transforms = transforms; @@ -40,7 +41,7 @@ public TransformChainAction(long id, public TransformChainAction(long id, @NonNull String name, - @NonNull List jarUrls, + @NonNull Set jarUrls, @NonNull List> transforms) { super(id, name, jarUrls); this.transforms = transforms; diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java index 5ad043ed003..fa10655189b 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java @@ -24,5 +24,5 @@ public interface Job { void submitJob() throws ExecutionException, InterruptedException; - void waitForJobComplete(); + JobStatus waitForJobComplete(); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java index 
23f7613e20b..019a9779787 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java @@ -23,6 +23,7 @@ import org.apache.seatunnel.engine.core.job.JobStatus; import org.apache.seatunnel.engine.server.master.JobMaster; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.hazelcast.instance.impl.Node; import com.hazelcast.internal.serialization.Data; import com.hazelcast.internal.services.ManagedService; @@ -69,7 +70,8 @@ public SeaTunnelServer(@NonNull Node node, @NonNull SeaTunnelConfig seaTunnelCon this.liveOperationRegistry = new LiveOperationRegistry(); this.seaTunnelConfig = seaTunnelConfig; this.executorService = - Executors.newFixedThreadPool(seaTunnelConfig.getEngineConfig().getServerExecutorPoolSize()); + Executors.newCachedThreadPool(new ThreadFactoryBuilder() + .setNameFormat("seatunnel-server-executor-%d").build()); logger.info("SeaTunnel server start..."); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java index 47133cdc4ef..df965176856 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java @@ -26,6 +26,7 @@ import org.apache.seatunnel.common.utils.ExceptionUtils; import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; +import org.apache.seatunnel.engine.common.loader.SeatunnelChildFirstClassLoader; import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; import org.apache.seatunnel.engine.server.execution.ExecutionState; import 
org.apache.seatunnel.engine.server.execution.ProgressState; @@ -38,12 +39,15 @@ import org.apache.seatunnel.engine.server.execution.TaskTracker; import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation; +import com.google.common.collect.Lists; import com.hazelcast.internal.serialization.Data; +import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject; import com.hazelcast.logging.ILogger; import com.hazelcast.spi.impl.NodeEngineImpl; import com.hazelcast.spi.properties.HazelcastProperties; import lombok.NonNull; import lombok.SneakyThrows; +import org.apache.commons.collections4.CollectionUtils; import java.net.URL; import java.util.Collection; @@ -129,14 +133,21 @@ public PassiveCompletableFuture deployTask( nodeEngine.getSerializationService().toObject(taskImmutableInformation); Set jars = taskImmutableInfo.getJars(); - // TODO Use classloader load the connector jars and deserialize Task - taskGroup = nodeEngine.getSerializationService().toObject(taskImmutableInfo.getGroup()); + if (!CollectionUtils.isEmpty(jars)) { + taskGroup = + CustomClassLoadedObject.deserializeWithCustomClassLoader(nodeEngine.getSerializationService(), + new SeatunnelChildFirstClassLoader(Lists.newArrayList(jars)), + taskImmutableInfo.getGroup()); + } else { + taskGroup = nodeEngine.getSerializationService().toObject(taskImmutableInfo.getGroup()); + } return deployLocalTask(taskGroup, resultFuture); } catch (Throwable t) { logger.severe(String.format("TaskGroupID : %s deploy error with Exception: %s", taskGroup != null ? taskGroup.getId() : -1, ExceptionUtils.getMessage(t))); - resultFuture.complete(new TaskExecutionState(taskGroup != null ? taskGroup.getId() : -1, ExecutionState.FAILED, t)); + resultFuture.complete( + new TaskExecutionState(taskGroup != null ? 
taskGroup.getId() : -1, ExecutionState.FAILED, t)); } return new PassiveCompletableFuture<>(resultFuture); } @@ -167,7 +178,7 @@ public PassiveCompletableFuture deployLocalTask( cancellationFutures.put(taskGroup.getId(), cancellationFuture); } catch (Throwable t) { logger.severe(ExceptionUtils.getMessage(t)); - resultFuture.complete(new TaskExecutionState(taskGroup.getId(), ExecutionState.FAILED, t)); + resultFuture.completeExceptionally(t); } return new PassiveCompletableFuture<>(resultFuture); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java index b895f1c69f3..69081fcc389 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java @@ -173,7 +173,7 @@ private void createExecutionVertex(LogicalVertex logicalVertex) { }); newAction = new TransformChainAction(newId, String.join("->", names), - new ArrayList<>(jars), + jars, transforms); } ExecutionVertex executionVertex = new ExecutionVertex(newId, newAction, logicalVertex.getParallelism()); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java index 00df7ebef10..b769f76be3e 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java @@ -97,17 +97,12 @@ public 
PhysicalPlan(@NonNull List pipelineList, canceledPipelineNum.incrementAndGet(); } else if (PipelineState.FAILED.equals(v)) { LOGGER.severe("Pipeline Failed, Begin to cancel other pipelines in this job."); - cancelJob().whenComplete((v1, t1) -> { - LOGGER.severe(String.format("Cancel other pipelines complete")); - failedPipelineNum.incrementAndGet(); - }); + failedPipelineNum.incrementAndGet(); + cancelJob(); } else if (!PipelineState.FINISHED.equals(v)) { LOGGER.severe( "Pipeline Failed with Unknown PipelineState, Begin to cancel other pipelines in this job."); - cancelJob().whenComplete((v1, t1) -> { - LOGGER.severe(String.format("Cancel other pipelines complete")); - failedPipelineNum.incrementAndGet(); - }); + failedPipelineNum.incrementAndGet(); } if (finishedPipelineNum.incrementAndGet() == this.pipelineList.size()) { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java index 0f5dcdb52d5..6ac92406f68 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java @@ -204,7 +204,7 @@ private List getCommitterTask(List edges, flakeIdGenerator, pipelineIndex, totalPipelineNum, - null, + s.getJarUrls(), jobImmutableInformation, initializationTimestamp, nodeEngine); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java index f11019fa011..84f9c25db13 100644 --- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java @@ -18,12 +18,15 @@ package org.apache.seatunnel.engine.server.dag.physical; import org.apache.seatunnel.common.utils.ExceptionUtils; +import org.apache.seatunnel.engine.common.Constant; import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; import org.apache.seatunnel.engine.core.job.JobImmutableInformation; import org.apache.seatunnel.engine.server.dag.execution.ExecutionVertex; import org.apache.seatunnel.engine.server.execution.ExecutionState; import org.apache.seatunnel.engine.server.execution.TaskExecutionState; import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl; +import org.apache.seatunnel.engine.server.operation.DeployTaskOperation; +import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation; import com.hazelcast.cluster.Address; import com.hazelcast.flakeidgen.FlakeIdGenerator; @@ -131,7 +134,7 @@ public PhysicalVertex(long physicalVertexId, "Job %s (%s), Pipeline: [(%d/%d)], task: [%s (%d/%d)]", jobImmutableInformation.getJobConfig().getName(), jobImmutableInformation.getJobId(), - pipelineIndex + 1, + pipelineIndex, totalPipelineNum, taskGroup.getTaskGroupName(), subTaskGroupIndex + 1, @@ -142,42 +145,34 @@ public PhysicalVertex(long physicalVertexId, @SuppressWarnings("checkstyle:MagicNumber") // This method must not throw an exception public void deploy(@NonNull Address address) { - /** - TaskGroupImmutableInformation taskGroupImmutableInformation = - new TaskGroupImmutableInformation(flakeIdGenerator.newId(), - nodeEngine.getSerializationService().toData(this.taskGroup), - this.pluginJarsUrls); - - try { - waitForCompleteByExecutionService = new NonCompletableFuture<>( - 
nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME, - new DeployTaskOperation(nodeEngine.getSerializationService().toData(taskGroupImmutableInformation)), - address) - .invoke()); - } catch (Throwable th) { - LOGGER.severe(String.format("%s deploy error with Exception: %s", - this.taskFullName, - ExceptionUtils.getMessage(th))); - updateTaskState(ExecutionState.DEPLOYING, ExecutionState.FAILED); - taskFuture.complete( - new TaskExecutionState(taskGroupImmutableInformation.getExecutionId(), ExecutionState.FAILED, null)); - }*/ - - waitForCompleteByExecutionService = new PassiveCompletableFuture<>(CompletableFuture.supplyAsync(() -> { - try { - Thread.sleep(2000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - return new TaskExecutionState(flakeIdGenerator.newId(), ExecutionState.FINISHED, null); - })); + TaskGroupImmutableInformation taskGroupImmutableInformation = + new TaskGroupImmutableInformation(flakeIdGenerator.newId(), + nodeEngine.getSerializationService().toData(this.taskGroup), + this.pluginJarsUrls); + + try { + waitForCompleteByExecutionService = new PassiveCompletableFuture<>( + nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME, + new DeployTaskOperation(nodeEngine.getSerializationService().toData(taskGroupImmutableInformation)), + address) + .invoke()); + updateTaskState(ExecutionState.DEPLOYING, ExecutionState.RUNNING); + } catch (Throwable th) { + LOGGER.severe(String.format("%s deploy error with Exception: %s", + this.taskFullName, + ExceptionUtils.getMessage(th))); + updateTaskState(ExecutionState.DEPLOYING, ExecutionState.FAILED); + taskFuture.complete( + new TaskExecutionState(taskGroupImmutableInformation.getExecutionId(), ExecutionState.FAILED, th)); + } - updateTaskState(ExecutionState.DEPLOYING, ExecutionState.RUNNING); waitForCompleteByExecutionService.whenComplete((v, t) -> { try { if (t != null) { LOGGER.severe("An unexpected error 
occurred while the task was running", t); - taskFuture.completeExceptionally(t); + taskFuture.complete( + new TaskExecutionState(taskGroupImmutableInformation.getExecutionId(), ExecutionState.FAILED, + t)); } else { updateTaskState(executionState.get(), v.getExecutionState()); if (v.getThrowable() != null) { @@ -186,7 +181,7 @@ public void deploy(@NonNull Address address) { v.getExecutionState(), ExceptionUtils.getMessage(v.getThrowable()))); } else { - LOGGER.severe(String.format("%s end with state %s", + LOGGER.info(String.format("%s end with state %s", this.taskFullName, v.getExecutionState())); } @@ -196,7 +191,7 @@ public void deploy(@NonNull Address address) { LOGGER.severe( String.format("%s end with Exception: %s", this.taskFullName, ExceptionUtils.getMessage(th))); updateTaskState(ExecutionState.RUNNING, ExecutionState.FAILED); - v = new TaskExecutionState(v.getTaskExecutionId(), ExecutionState.FAILED, null); + v = new TaskExecutionState(v.getTaskExecutionId(), ExecutionState.FAILED, th); taskFuture.complete(v); } }); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java index 2a7c066896a..950a63e9508 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java @@ -99,7 +99,7 @@ public SubPlan(int pipelineIndex, "Job %s (%s), Pipeline: [(%d/%d)]", jobImmutableInformation.getJobConfig().getName(), jobImmutableInformation.getJobId(), - pipelineIndex + 1, + pipelineIndex, totalPipelineNum); Arrays.stream(this.waitForCompleteByPhysicalVertex).forEach(x -> { @@ -108,18 +108,16 @@ public SubPlan(int pipelineIndex, if (ExecutionState.CANCELED.equals(v.getExecutionState())) { 
canceledTaskNum.incrementAndGet(); } else if (ExecutionState.FAILED.equals(v.getExecutionState())) { - LOGGER.severe("Task Failed, Begin to cancel other tasks in this pipeline."); - cancelPipeline().whenComplete((v1, t1) -> { - LOGGER.severe("Cancel other tasks complete"); - failedTaskNum.incrementAndGet(); - }); + LOGGER.severe(String.format("Task Failed in %s, Begin to cancel other tasks in this pipeline.", + this.getPipelineFullName())); + failedTaskNum.incrementAndGet(); + cancelPipeline(); } else if (!ExecutionState.FINISHED.equals(v.getExecutionState())) { - LOGGER.severe( - "Task Failed with Unknown ExecutionState, Begin to cancel other tasks in this pipeline."); - cancelPipeline().whenComplete((v1, t1) -> { - LOGGER.severe("Cancel other tasks complete"); - failedTaskNum.incrementAndGet(); - }); + LOGGER.severe(String.format( + "Task Failed in %s, with Unknown ExecutionState, Begin to cancel other tasks in this pipeline.", + this.getPipelineFullName())); + failedTaskNum.incrementAndGet(); + cancelPipeline(); } if (finishedTaskNum.incrementAndGet() == (physicalVertexList.size() + coordinatorVertexList.size())) { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java index 3af7ebe12b0..3a789538903 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.common.utils.ExceptionUtils; import org.apache.seatunnel.engine.common.Constant; +import org.apache.seatunnel.engine.common.loader.SeatunnelChildFirstClassLoader; import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; import org.apache.seatunnel.engine.core.dag.logical.LogicalDag; 
import org.apache.seatunnel.engine.core.job.JobImmutableInformation; @@ -35,10 +36,12 @@ import com.hazelcast.flakeidgen.FlakeIdGenerator; import com.hazelcast.internal.serialization.Data; import com.hazelcast.jet.datamodel.Tuple2; +import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject; import com.hazelcast.logging.ILogger; import com.hazelcast.logging.Logger; import com.hazelcast.spi.impl.NodeEngine; import lombok.NonNull; +import org.apache.commons.collections4.CollectionUtils; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -73,7 +76,7 @@ public JobMaster(@NonNull Data jobImmutableInformationData, flakeIdGenerator = this.nodeEngine.getHazelcastInstance().getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME); - this.resourceManager = new SimpleResourceManager(); + this.resourceManager = new SimpleResourceManager(this.nodeEngine); } public void init() throws Exception { @@ -83,8 +86,14 @@ public void init() throws Exception { LOGGER.info( "Job [" + jobImmutableInformation.getJobId() + "] jar urls " + jobImmutableInformation.getPluginJarsUrls()); - // TODO Use classloader load the connector jars and deserialize logicalDag - this.logicalDag = nodeEngine.getSerializationService().toObject(jobImmutableInformation.getLogicalDag()); + if (!CollectionUtils.isEmpty(jobImmutableInformation.getPluginJarsUrls())) { + this.logicalDag = + CustomClassLoadedObject.deserializeWithCustomClassLoader(nodeEngine.getSerializationService(), + new SeatunnelChildFirstClassLoader(jobImmutableInformation.getPluginJarsUrls()), + jobImmutableInformation.getLogicalDag()); + } else { + this.logicalDag = nodeEngine.getSerializationService().toObject(jobImmutableInformation.getLogicalDag()); + } final Tuple2 planTuple = PlanUtils.fromLogicalDAG(logicalDag, nodeEngine, jobImmutableInformation, diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/SimpleResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/SimpleResourceManager.java index 74bba50a2f9..59bf5b20473 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/SimpleResourceManager.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/SimpleResourceManager.java @@ -20,10 +20,10 @@ import org.apache.seatunnel.engine.common.exception.JobException; import com.hazelcast.cluster.Address; +import com.hazelcast.spi.impl.NodeEngine; import lombok.Data; import lombok.NonNull; -import java.net.UnknownHostException; import java.util.HashMap; import java.util.Map; @@ -33,23 +33,26 @@ public class SimpleResourceManager implements ResourceManager { // TODO We may need more detailed resource define, instead of the resource definition method of only Address. 
private Map> physicalVertexIdAndResourceMap = new HashMap<>(); + private final NodeEngine nodeEngine; + + public SimpleResourceManager(NodeEngine nodeEngine) { + this.nodeEngine = nodeEngine; + } + @SuppressWarnings("checkstyle:MagicNumber") @Override public Address applyForResource(long jobId, long taskId) { - try { - Map jobAddressMap = physicalVertexIdAndResourceMap.computeIfAbsent(jobId, k -> new HashMap<>()); + Map jobAddressMap = + physicalVertexIdAndResourceMap.computeIfAbsent(jobId, k -> new HashMap<>()); - Address localhost = - jobAddressMap.putIfAbsent(taskId, new Address("localhost", 5701)); - if (null == localhost) { - localhost = jobAddressMap.get(taskId); - } + Address localhost = + jobAddressMap.putIfAbsent(taskId, nodeEngine.getThisAddress()); + if (null == localhost) { + localhost = jobAddressMap.get(taskId); + } - return localhost; + return localhost; - } catch (UnknownHostException e) { - throw new RuntimeException(e); - } } @Override diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java index bca5615c8b7..63e9ab4fbd9 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java @@ -36,8 +36,8 @@ * the {@link org.apache.seatunnel.api.source.SourceSplitEnumerator} */ public class SourceRegisterOperation extends Operation implements IdentifiedDataSerializable { - private static final int RETRY_TIME = 5; + private static final int RETRY_TIME_OUT = 2000; private TaskLocation readerTaskID; @@ -57,7 +57,8 @@ public void run() throws Exception { Address readerAddress = 
getCallerAddress(); RetryUtils.retryWithException(() -> { SourceSplitEnumeratorTask task = - server.getTaskExecutionService().getExecutionContext(enumeratorTaskID.getTaskGroupID()).getTaskGroup().getTask(enumeratorTaskID.getTaskID()); + server.getTaskExecutionService().getExecutionContext(enumeratorTaskID.getTaskGroupID()).getTaskGroup() + .getTask(enumeratorTaskID.getTaskID()); task.receivedReader(readerTaskID, readerAddress); return null; }, new RetryUtils.RetryMaterial(RETRY_TIME, true, diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java index b5f43ad3b7c..6ce4b928556 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java @@ -38,6 +38,7 @@ import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan; import org.apache.seatunnel.engine.server.dag.physical.PlanUtils; +import com.google.common.collect.Sets; import com.hazelcast.config.Config; import com.hazelcast.instance.impl.HazelcastInstanceFactory; import com.hazelcast.instance.impl.HazelcastInstanceImpl; @@ -67,7 +68,7 @@ public void before() { config.setInstanceName("test"); config.setClusterName("test"); instance = ((HazelcastInstanceProxy) HazelcastInstanceFactory.newHazelcastInstance(config, - "taskTest", new SeaTunnelNodeContext(new SeaTunnelConfig()))).getOriginal(); + "taskTest", new SeaTunnelNodeContext(new SeaTunnelConfig()))).getOriginal(); nodeEngine = instance.node.nodeEngine; service = nodeEngine.getService(SeaTunnelServer.SERVICE_NAME); } @@ -82,14 +83,14 @@ public void testTask() throws MalformedURLException { fakeSource.setSeaTunnelContext(SeaTunnelContext.getContext()); Action fake = new SourceAction<>(idGenerator.getNextId(), "fake", 
fakeSource, - Collections.singletonList(new URL("file:///fake.jar"))); + Sets.newHashSet(new URL("file:///fake.jar"))); fake.setParallelism(3); LogicalVertex fakeVertex = new LogicalVertex(fake.getId(), fake, 3); ConsoleSink consoleSink = new ConsoleSink(); consoleSink.setSeaTunnelContext(SeaTunnelContext.getContext()); Action console = new SinkAction<>(idGenerator.getNextId(), "console", consoleSink, - Collections.singletonList(new URL("file:///console.jar"))); + Sets.newHashSet(new URL("file:///console.jar"))); console.setParallelism(3); LogicalVertex consoleVertex = new LogicalVertex(console.getId(), console, 3); @@ -105,7 +106,7 @@ public void testTask() throws MalformedURLException { config.setMode(JobMode.BATCH); JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(1, - nodeEngine.getSerializationService().toData(logicalDag), config, Collections.emptyList()); + nodeEngine.getSerializationService().toData(logicalDag), config, Collections.emptyList()); PassiveCompletableFuture voidPassiveCompletableFuture = service.submitJob(jobImmutableInformation.getJobId(), @@ -120,15 +121,15 @@ public void testLogicalToPhysical() throws MalformedURLException { IdGenerator idGenerator = new IdGenerator(); Action fake = new SourceAction<>(idGenerator.getNextId(), "fake", new FakeSource(), - Collections.singletonList(new URL("file:///fake.jar"))); + Sets.newHashSet(new URL("file:///fake.jar"))); LogicalVertex fakeVertex = new LogicalVertex(fake.getId(), fake, 2); Action fake2 = new SourceAction<>(idGenerator.getNextId(), "fake", new FakeSource(), - Collections.singletonList(new URL("file:///fake.jar"))); + Sets.newHashSet(new URL("file:///fake.jar"))); LogicalVertex fake2Vertex = new LogicalVertex(fake2.getId(), fake2, 2); Action console = new SinkAction<>(idGenerator.getNextId(), "console", new ConsoleSink(), - Collections.singletonList(new URL("file:///console.jar"))); + Sets.newHashSet(new URL("file:///console.jar"))); LogicalVertex consoleVertex = 
new LogicalVertex(console.getId(), console, 2); LogicalEdge edge = new LogicalEdge(fakeVertex, consoleVertex); @@ -143,7 +144,7 @@ public void testLogicalToPhysical() throws MalformedURLException { config.setMode(JobMode.BATCH); JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(1, - nodeEngine.getSerializationService().toData(logicalDag), config, Collections.emptyList()); + nodeEngine.getSerializationService().toData(logicalDag), config, Collections.emptyList()); PhysicalPlan physicalPlan = PlanUtils.fromLogicalDAG(logicalDag, nodeEngine, jobImmutableInformation, @@ -160,5 +161,4 @@ public void testLogicalToPhysical() throws MalformedURLException { public void after() { instance.shutdown(); } - } diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/execution/TestTask.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/execution/TestTask.java index 689094d213a..73bf8612d17 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/execution/TestTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/execution/TestTask.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.engine.server.execution; +import org.apache.seatunnel.common.utils.ExceptionUtils; + import com.hazelcast.logging.ILogger; import lombok.NonNull; @@ -48,8 +50,7 @@ public ProgressState call() { try { Thread.sleep(sleep); } catch (InterruptedException e) { - // The test needs to do that - logger.info(e.getMessage()); + logger.severe(ExceptionUtils.getMessage(e)); } progressState = ProgressState.MADE_PROGRESS; } else { diff --git a/seatunnel-examples/pom.xml b/seatunnel-examples/pom.xml index 6a5dba663c1..6a73ecebfd5 100644 --- a/seatunnel-examples/pom.xml +++ b/seatunnel-examples/pom.xml @@ -41,10 +41,14 @@ seatunnel-flink-sql-examples seatunnel-flink-connector-v2-example 
seatunnel-spark-connector-v2-example + seatunnel-engine-examples engine-all + + seatunnel-engine-examples + release diff --git a/seatunnel-examples/seatunnel-engine-examples/pom.xml b/seatunnel-examples/seatunnel-engine-examples/pom.xml new file mode 100644 index 00000000000..9c3a5ca5d13 --- /dev/null +++ b/seatunnel-examples/seatunnel-engine-examples/pom.xml @@ -0,0 +1,30 @@ + + + + + + seatunnel-examples + org.apache.seatunnel + ${revision} + + 4.0.0 + + seatunnel-engine-examples + \ No newline at end of file