From 9659430fa2e6b1b19fc04aa864cd1daa5e0667a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=97=AD=E6=97=A5=E4=B8=9C=E5=8D=87?= <59563468+Mr-LiuXu@users.noreply.github.com> Date: Tue, 8 Nov 2022 18:41:50 +0800 Subject: [PATCH 1/4] [Feature][Doc] Improved connectors v2 contribution guide (#3218) --- seatunnel-connectors-v2/README.md | 33 +++++++++++++++++++++++++--- seatunnel-connectors-v2/README.zh.md | 29 +++++++++++++++++++++++- 2 files changed, 58 insertions(+), 4 deletions(-) diff --git a/seatunnel-connectors-v2/README.md b/seatunnel-connectors-v2/README.md index e6a6c600614..61aad40f8de 100644 --- a/seatunnel-connectors-v2/README.md +++ b/seatunnel-connectors-v2/README.md @@ -10,6 +10,15 @@ this [issue](https://github.com/apache/incubator-seatunnel/issues/1608) for deta In order to separate from the old code, we have defined new modules for execution flow. This facilitates parallel development at the current stage, and reduces the difficulty of merging. +### engineering structure + +- ../`seatunnel-connectors-v2` connector-v2 code implementation +- ../`seatunnel-translation` translation layer for the connector-v2 +- ../seatunnel-e2e/`seatunnel-flink-connector-v2-e2e` end to end testcase running on flink +- ../seatunnel-e2e/`seatunnel-spark-connector-v2-e2e` end to end testcase running on spark +- ../seatunnel-examples/`seatunnel-flink-connector-v2-example` seatunnel connector-v2 example use flink local running instance +- ../seatunnel-examples/`seatunnel-spark-connector-v2-example` seatunnel connector-v2 example use spark local running instance + ### **Example** We have prepared two new version of the locally executable example program in `seatunnel-examples`,one @@ -22,13 +31,31 @@ configuration files used in example are saved in the "resources/examples" folder own connectors, you need to follow the steps below. 1. 
Add the groupId, artifactId and version of the connector to be tested to - seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml(or add it to - seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml when you want to runs it in Spark engine) as a + `seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml`(or add it to + `seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml` when you want to runs it in Spark engine) as a dependency. 2. Find the dependency in your connector pom file which scope is test or provided and then add them to seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml(or add it to seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml) file and modify the scope to compile. -3. Refer to the SeaTunnelApiExample class to develop your sample code. +3. Add the task configuration file under resources/examples. +4. Configure the file in the `SeaTunnelApiExample` main method. +5. Just run the main method. + +### **Create new seatunnel v2 connector** + +1.Create a new module under the `seatunnel-connectors-v2` directory and name it connector - {connector name}. + +2.The pom file can refer to the pom file of the existing connector, and add the current sub model to the pom file of the parent model + +3.Create two packages corresponding to source and sink + +​ package org.apache.seatunnel.connectors.seatunnel.{connector name}}.source + +​ package org.apache.seatunnel.connectors.seatunnel.{connector name}}.sink + +4.add connector info to plugin-mapping.properties file in seatunnel root path. + +5.add connector dependency to seatunnel-dist/pom.xml, so the connector jar can be find in binary package. 
### **Startup Class** diff --git a/seatunnel-connectors-v2/README.zh.md b/seatunnel-connectors-v2/README.zh.md index 02a59af2e16..4efbc6d594f 100644 --- a/seatunnel-connectors-v2/README.zh.md +++ b/seatunnel-connectors-v2/README.zh.md @@ -7,6 +7,15 @@ SeaTunnel为与计算引擎进行解耦,设计了新的连接器API,通过 为了和老的代码分开,方便现阶段的并行开发,以及降低merge的难度。我们为新的执行流程定义了新的模块 +### **工程结构** + +- ../`seatunnel-connectors-v2` connector-v2代码实现 +- ../`seatunnel-translation` connector-v2的翻译层 +- ../seatunnel-e2e/`seatunnel-flink-connector-v2-e2e` flink上运行的端到端testcase +- ../seatunnel-e2e/`seatunnel-spark-connector-v2-e2e` spark上运行的端到端testcase +- ../seatunnel-examples/`seatunnel-flink-connector-v2-example` seatunnel connector-v2的flink local运行的实例 +- ../seatunnel-examples/`seatunnel-spark-connector-v2-example` seatunnel connector-v2的spark local运行的实例 + ### Example 我们已经在`seatunnel-examples` @@ -18,7 +27,25 @@ SeaTunnel为与计算引擎进行解耦,设计了新的连接器API,通过 version.(或者当你想在spark引擎运行时在`seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml`添加依赖) 2. 如果你的connector中存在scope为test或provided的依赖,将这些依赖添加到seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml( 或者在seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml)中,并且修改scope为compile. -3. 参考`SeaTunnelApiExample`开发自己的案例程序。 +3. 在resources/examples下添加任务配置文件. +4. 在`SeaTunnelApiExample` main方法中配置文件. +5. 运行main方法即可. + +### 创建新的seatunnel v2 connector + +1.在`seatunnel-connectors-v2`目录下新建一个module,命名为connector-{连接器名}. + +2.pom文件可以参考已有连接器的pom文件,并在父model的pom文件中添加当前子model. + +3.新建两个package分别对应source和sink + +​ package org.apache.seatunnel.connectors.seatunnel.{连接器名}.source + +​ package org.apache.seatunnel.connectors.seatunnel.{连接器名}.sink + +4.将连接器信息添加到在项目根目录的plugin-mapping.properties文件中. + +5.将连接器添加到seatunnel-dist/pom.xml,这样连接器jar就可以在二进制包中找到. 
### 启动类 From d9519d696abf2513875ac65bd8feb527264dd19d Mon Sep 17 00:00:00 2001 From: Zongwen Li Date: Tue, 8 Nov 2022 18:45:26 +0800 Subject: [PATCH 2/4] [improve][connector] The Factory#factoryIdentifier must be consistent with PluginIdentifierInterface#getPluginName (#3328) --- .../connectors/seatunnel/assertion/sink/AssertSinkFactory.java | 2 +- .../seatunnel/clickhouse/sink/client/ClickhouseSinkFactory.java | 2 +- .../clickhouse/sink/file/ClickhouseFileSinkFactory.java | 2 +- .../seatunnel/clickhouse/source/ClickhouseSourceFactory.java | 2 +- .../connectors/seatunnel/console/sink/ConsoleSinkFactory.java | 2 +- .../seatunnel/connectors/seatunnel/sink/DataHubSinkFactory.java | 2 +- .../connectors/seatunnel/sink/DingTalkSinkFactory.java | 2 +- 7 files changed, 7 insertions(+), 7 deletions(-) diff --git a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkFactory.java b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkFactory.java index c3c6ad553dc..95e7b4577d9 100644 --- a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkFactory.java +++ b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkFactory.java @@ -30,7 +30,7 @@ public class AssertSinkFactory implements TableSinkFactory { @Override public String factoryIdentifier() { - return "AssertSink"; + return "Assert"; } @Override diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkFactory.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkFactory.java index a804400f94a..01fc05eceee 100644 --- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkFactory.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkFactory.java @@ -38,7 +38,7 @@ public class ClickhouseSinkFactory implements TableSinkFactory { @Override public String factoryIdentifier() { - return "ClickhouseSink"; + return "Clickhouse"; } @Override diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java index c08f3948489..2d9ccf220e3 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java @@ -38,7 +38,7 @@ public class ClickhouseFileSinkFactory implements TableSinkFactory { @Override public String factoryIdentifier() { - return "ClickhouseFileSink"; + return "ClickhouseFile"; } @Override diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java index 0f1c8e285f9..d57e5a62d49 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java +++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java @@ -33,7 +33,7 @@ public class ClickhouseSourceFactory implements TableSourceFactory { @Override public String factoryIdentifier() { - return "ClickhouseSource"; + return "Clickhouse"; } @Override diff --git a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java index 6da69baf793..5363f2395d4 100644 --- a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java +++ b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java @@ -27,7 +27,7 @@ public class ConsoleSinkFactory implements TableSinkFactory { @Override public String factoryIdentifier() { - return "ConsoleSink"; + return "Console"; } @Override diff --git a/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DataHubSinkFactory.java b/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DataHubSinkFactory.java index ac3d5b01866..41277620c7e 100644 --- a/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DataHubSinkFactory.java +++ b/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DataHubSinkFactory.java @@ -35,7 +35,7 @@ public class DataHubSinkFactory implements TableSinkFactory { @Override public String factoryIdentifier() { - return "DataHubSink"; + return "DataHub"; } @Override diff --git 
a/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSinkFactory.java b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSinkFactory.java index e72cac30ed1..ca8aa287329 100644 --- a/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSinkFactory.java +++ b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSinkFactory.java @@ -30,7 +30,7 @@ public class DingTalkSinkFactory implements TableSinkFactory { @Override public String factoryIdentifier() { - return "DingTalkSink"; + return "DingTalk"; } @Override From 5bfd508552b800edbd08b94dfd8c63b7976bae6b Mon Sep 17 00:00:00 2001 From: ic4y <83933160+ic4y@users.noreply.github.com> Date: Tue, 8 Nov 2022 18:45:39 +0800 Subject: [PATCH 3/4] [feature][st-engine] Add jobHistory (#3191) * [feature][st-engine] add jobHistory * [feature][st-engine] add jobHistory * [feature][st-engine] add jobHistory * [feature][st-engine] add jobHistory * [feature][st-engine] add jobHistory * [feature][st-engine] add jobHistory * [feature][st-engine] add jobHistory * [feature][st-engine] add jobHistory * [bugfix][connector-v2] make jdbc it test single concurrent * [bugfix][connector-v2] make jdbc it test single concurrent * [bugfix][connector-v2] make jdbc it test single concurrent --- .../starter/command/AbstractCommandArgs.java | 21 +-- .../core/starter/utils/FileUtilsTest.java | 17 ++ .../starter/flink/args/FlinkCommandArgs.java | 16 +- .../starter/spark/args/SparkCommandArgs.java | 15 ++ .../seatunnel/args/ClientCommandArgs.java | 30 +++ .../command/ClientExecuteCommand.java | 25 ++- .../src/main/resources/log4j.properties | 22 +++ .../test/resources/junit-platform.properties | 19 ++ .../engine/e2e/ClusterFaultToleranceIT.java | 2 +- .../test/resources/junit-platform.properties | 19 ++ 
.../test/resources/junit-platform.properties | 19 ++ .../engine/client/SeaTunnelClient.java | 16 ++ .../engine/client/SeaTunnelClientTest.java | 32 ++++ .../seatunnel/engine/common/Constant.java | 2 + .../codec/SeaTunnelGetJobStateCodec.java | 88 +++++++++ .../codec/SeaTunnelListJobStatusCodec.java | 79 ++++++++ .../SeaTunnelEngine.yaml | 38 ++++ .../engine/server/CoordinatorService.java | 23 ++- .../server/master/JobHistoryService.java | 178 ++++++++++++++++++ .../operation/GetJobStateOperation.java | 86 +++++++++ .../operation/ListJobStatusOperation.java | 54 ++++++ .../server/protocol/task/GetJobStateTask.java | 50 +++++ .../protocol/task/ListJobStatusTask.java | 50 +++++ .../SeaTunnelMessageTaskFactoryProvider.java | 6 + .../engine/server/CoordinatorServiceTest.java | 4 +- .../server/master/JobHistoryServiceTest.java | 104 ++++++++++ .../src/test/resources/fake_to_console.conf | 61 ++++++ 27 files changed, 1048 insertions(+), 28 deletions(-) create mode 100644 seatunnel-core/seatunnel-starter/src/main/resources/log4j.properties create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/junit-platform.properties create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/junit-platform.properties create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/junit-platform.properties create mode 100644 seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetJobStateCodec.java create mode 100644 seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelListJobStatusCodec.java create mode 100644 seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java create mode 100644 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStateOperation.java create mode 100644 seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/ListJobStatusOperation.java create mode 100644 seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobStateTask.java create mode 100644 seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/ListJobStatusTask.java create mode 100644 seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobHistoryServiceTest.java create mode 100644 seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console.conf diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java index 2e4e8d201f6..9889175688d 100644 --- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java +++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java @@ -28,11 +28,6 @@ public abstract class AbstractCommandArgs implements CommandArgs { - @Parameter(names = {"-c", "--config"}, - description = "Config file", - required = true) - private String configFile; - @Parameter(names = {"-i", "--variable"}, description = "variable substitution, such as -i city=beijing, or -i date=20190318") private List variables = Collections.emptyList(); @@ -56,14 +51,6 @@ public abstract class AbstractCommandArgs implements CommandArgs { */ private List originalParameters; - public String getConfigFile() { - return configFile; - } - - public void setConfigFile(String configFile) { - this.configFile = configFile; - } 
- public List getVariables() { return variables; } @@ -112,4 +99,12 @@ public DeployMode getDeployMode() { throw new UnsupportedOperationException("abstract class CommandArgs not support this method"); } + public String getConfigFile() { + throw new UnsupportedOperationException("abstract class CommandArgs not support this method"); + } + + public void setConfigFile(String configFile) { + throw new UnsupportedOperationException("abstract class CommandArgs not support this method"); + } + } diff --git a/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/FileUtilsTest.java b/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/FileUtilsTest.java index 8dc84e15946..758fb6f249a 100644 --- a/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/FileUtilsTest.java +++ b/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/FileUtilsTest.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.common.config.DeployMode; import org.apache.seatunnel.core.starter.command.AbstractCommandArgs; +import com.beust.jcommander.Parameter; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -44,6 +45,12 @@ public void getConfigPath() throws URISyntaxException { } private static class SparkCommandArgs extends AbstractCommandArgs { + + @Parameter(names = {"-c", "--config"}, + description = "Config file", + required = true) + private String configFile; + private DeployMode deployMode; public void setDeployMode(DeployMode deployMode) { @@ -53,5 +60,15 @@ public void setDeployMode(DeployMode deployMode) { public DeployMode getDeployMode() { return deployMode; } + + @Override + public String getConfigFile() { + return this.configFile; + } + + @Override + public void setConfigFile(String configFile) { + this.configFile = configFile; + } } } diff --git 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java index 4c98ff532c2..f8b6f5dd001 100644 --- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java +++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java @@ -27,6 +27,11 @@ public class FlinkCommandArgs extends AbstractCommandArgs { + @Parameter(names = {"-c", "--config"}, + description = "Config file", + required = true) + private String configFile; + @Parameter(names = {"-r", "--run-mode"}, converter = RunModeConverter.class, description = "job run mode, run or run-application") @@ -50,6 +55,16 @@ public void setRunMode(FlinkRunMode runMode) { this.runMode = runMode; } + @Override + public String getConfigFile() { + return this.configFile; + } + + @Override + public void setConfigFile(String configFile) { + this.configFile = configFile; + } + /** * Used to convert the run mode string to the enum value. 
*/ @@ -70,5 +85,4 @@ public FlinkRunMode convert(String value) { throw new IllegalArgumentException(String.format("Run mode %s not supported", value)); } } - } diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java index eb5ef929d5e..a5c54eeb184 100644 --- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java +++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java @@ -26,6 +26,11 @@ public class SparkCommandArgs extends AbstractCommandArgs { + @Parameter(names = {"-c", "--config"}, + description = "Config file", + required = true) + private String configFile; + @Parameter(names = {"-e", "--deploy-mode"}, description = "Spark deploy mode", required = true, @@ -59,4 +64,14 @@ public void setMaster(String master) { this.master = master; } + @Override + public String getConfigFile() { + return this.configFile; + } + + @Override + public void setConfigFile(String configFile) { + this.configFile = configFile; + } + } diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java index cc79f1ffaf6..33dd11d4b18 100644 --- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java +++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java @@ -37,10 +37,22 @@ public class ClientCommandArgs extends AbstractCommandArgs { converter = ExecutionModeConverter.class) private ExecutionMode executionMode = ExecutionMode.CLUSTER; + 
@Parameter(names = {"-c", "--config"}, + description = "Config file") + private String configFile; + @Parameter(names = {"-cn", "--cluster"}, description = "The name of cluster") private String clusterName = "seatunnel_default_cluster"; + @Parameter(names = {"-j", "--job-id"}, + description = "Get job status by JobId") + private String jobId; + + @Parameter(names = {"-l", "--list"}, + description = "list job status") + private boolean listJob = false; + public String getClusterName() { return clusterName; } @@ -65,6 +77,14 @@ public void setExecutionMode(ExecutionMode executionMode) { this.executionMode = executionMode; } + public String getJobId() { + return jobId; + } + + public boolean isListJob(){ + return listJob; + } + @Override public EngineType getEngineType() { return EngineType.SEATUNNEL; @@ -74,4 +94,14 @@ public EngineType getEngineType() { public DeployMode getDeployMode() { return DeployMode.CLIENT; } + + @Override + public String getConfigFile() { + return this.configFile; + } + + @Override + public void setConfigFile(String configFile) { + this.configFile = configFile; + } } diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java index 0d65183499c..9c21d6ee4f9 100644 --- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java +++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java @@ -33,6 +33,7 @@ import com.hazelcast.client.config.ClientConfig; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.instance.impl.HazelcastInstanceFactory; +import lombok.extern.slf4j.Slf4j; import java.nio.file.Path; import java.util.Random; @@ -41,6 +42,7 @@ /** * This command is used to execute the 
SeaTunnel engine job by SeaTunnel API. */ +@Slf4j public class ClientExecuteCommand implements Command { private final ClientCommandArgs clientCommandArgs; @@ -49,12 +51,9 @@ public ClientExecuteCommand(ClientCommandArgs clientCommandArgs) { this.clientCommandArgs = clientCommandArgs; } + @SuppressWarnings("checkstyle:RegexpSingleline") @Override public void execute() throws CommandExecuteException { - Path configFile = FileUtils.getConfigPath(clientCommandArgs); - - JobConfig jobConfig = new JobConfig(); - jobConfig.setName(clientCommandArgs.getJobName()); HazelcastInstance instance = null; SeaTunnelClient engineClient = null; try { @@ -66,10 +65,22 @@ public void execute() throws CommandExecuteException { ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); clientConfig.setClusterName(clusterName); engineClient = new SeaTunnelClient(clientConfig); - JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(configFile.toString(), jobConfig); + if (clientCommandArgs.isListJob()) { + String jobstatus = engineClient.listJobStatus(); + System.out.println(jobstatus); + } else if (null != clientCommandArgs.getJobId()) { + String jobState = engineClient.getJobState(Long.parseLong(clientCommandArgs.getJobId())); + System.out.println(jobState); + } else { + Path configFile = FileUtils.getConfigPath(clientCommandArgs); + JobConfig jobConfig = new JobConfig(); + jobConfig.setName(clientCommandArgs.getJobName()); + JobExecutionEnvironment jobExecutionEnv = + engineClient.createExecutionContext(configFile.toString(), jobConfig); - ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); - clientJobProxy.waitForJobComplete(); + ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); + clientJobProxy.waitForJobComplete(); + } } catch (ExecutionException | InterruptedException e) { throw new CommandExecuteException("SeaTunnel job executed failed", e); } finally { diff --git 
a/seatunnel-core/seatunnel-starter/src/main/resources/log4j.properties b/seatunnel-core/seatunnel-starter/src/main/resources/log4j.properties new file mode 100644 index 00000000000..db5d9e51220 --- /dev/null +++ b/seatunnel-core/seatunnel-starter/src/main/resources/log4j.properties @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Set everything to be logged to the console +log4j.rootCategory=INFO, console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/junit-platform.properties b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/junit-platform.properties new file mode 100644 index 00000000000..1b9e4750c6d --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/junit-platform.properties @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +junit.jupiter.execution.parallel.mode.default = same_thread +junit.jupiter.execution.parallel.mode.classes.default = same_thread diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java index 5cdb22770d1..afd571e667c 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java @@ -392,7 +392,7 @@ public void testStreamJobRestoreIn3NodeWorkerDown() throws ExecutionException, I // shutdown on worker node node2.shutdown(); - Awaitility.await().atMost(180000, TimeUnit.MILLISECONDS) + Awaitility.await().atMost(360000, TimeUnit.MILLISECONDS) .untilAsserted(() -> { // Wait job write all rows in file Thread.sleep(2000); diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/junit-platform.properties 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/junit-platform.properties new file mode 100644 index 00000000000..1b9e4750c6d --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/junit-platform.properties @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +junit.jupiter.execution.parallel.mode.default = same_thread +junit.jupiter.execution.parallel.mode.classes.default = same_thread diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/junit-platform.properties b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/junit-platform.properties new file mode 100644 index 00000000000..1b9e4750c6d --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/junit-platform.properties @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +junit.jupiter.execution.parallel.mode.default = same_thread +junit.jupiter.execution.parallel.mode.classes.default = same_thread diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java index 0f5d8fa4ea3..bd8d2dfe874 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java +++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java @@ -20,6 +20,8 @@ import org.apache.seatunnel.engine.client.job.JobClient; import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment; import org.apache.seatunnel.engine.common.config.JobConfig; +import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStateCodec; +import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelListJobStatusCodec; import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec; import com.hazelcast.client.config.ClientConfig; @@ -64,4 +66,18 @@ public void shutdown() { hazelcastClient.shutdown(); } } + + public String getJobState(Long jobId){ + return hazelcastClient.requestOnMasterAndDecodeResponse( + SeaTunnelGetJobStateCodec.encodeRequest(jobId), + SeaTunnelGetJobStateCodec::decodeResponse + 
); + } + + public String listJobStatus(){ + return hazelcastClient.requestOnMasterAndDecodeResponse( + SeaTunnelListJobStatusCodec.encodeRequest(), + SeaTunnelListJobStatusCodec::decodeResponse + ); + } } diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java index 906edf640f3..c5dce270e17 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java +++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java @@ -96,6 +96,38 @@ public void testExecuteJob() { } } + @Test + public void testGetJobState() { + Common.setDeployMode(DeployMode.CLIENT); + String filePath = TestUtils.getResource("/client_test.conf"); + JobConfig jobConfig = new JobConfig(); + jobConfig.setName("fake_to_console"); + + ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); + clientConfig.setClusterName(TestUtils.getClusterName("SeaTunnelClientTest")); + SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig); + JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(filePath, jobConfig); + + try { + final ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); + CompletableFuture objectCompletableFuture = CompletableFuture.supplyAsync(() -> { + return clientJobProxy.waitForJobComplete(); + }); + long jobId = clientJobProxy.getJobId(); + + await().atMost(30000, TimeUnit.MILLISECONDS) + .untilAsserted(() -> Assertions.assertTrue( + engineClient.getJobState(jobId).contains("RUNNING") && engineClient.listJobStatus().contains("RUNNING"))); + + await().atMost(30000, TimeUnit.MILLISECONDS) + .untilAsserted(() -> Assertions.assertTrue( + engineClient.getJobState(jobId).contains("FINISHED") && 
engineClient.listJobStatus().contains("FINISHED"))); + + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + } + @AfterAll public static void after() { INSTANCE.shutdown(); diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java index f7534884e1f..a7acbe87614 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java @@ -44,6 +44,8 @@ public class Constant { public static final String IMAP_RUNNING_JOB_STATE = "runningJobState"; + public static final String IMAP_FINISHED_JOB_STATE = "finishedJobState"; + public static final String IMAP_STATE_TIMESTAMPS = "stateTimestamps"; public static final String IMAP_OWNED_SLOT_PROFILES = "ownedSlotProfilesIMap"; diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetJobStateCodec.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetJobStateCodec.java new file mode 100644 index 00000000000..9b225608edb --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetJobStateCodec.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.core.protocol.codec; + +import com.hazelcast.client.impl.protocol.ClientMessage; +import com.hazelcast.client.impl.protocol.Generated; +import com.hazelcast.client.impl.protocol.codec.builtin.*; +import com.hazelcast.client.impl.protocol.codec.custom.*; + +import static com.hazelcast.client.impl.protocol.ClientMessage.*; +import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.*; + +/* + * This file is auto-generated by the Hazelcast Client Protocol Code Generator. + * To change this file, edit the templates or the protocol + * definitions on the https://github.com/hazelcast/hazelcast-client-protocol + * and regenerate it. 
+ */ + +/** + */ +@Generated("56079ba8d58afe5c98dfe2b5dc6c301a") +public final class SeaTunnelGetJobStateCodec { + //hex: 0xDE0600 + public static final int REQUEST_MESSAGE_TYPE = 14550528; + //hex: 0xDE0601 + public static final int RESPONSE_MESSAGE_TYPE = 14550529; + private static final int REQUEST_JOB_ID_FIELD_OFFSET = PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES; + private static final int REQUEST_INITIAL_FRAME_SIZE = REQUEST_JOB_ID_FIELD_OFFSET + LONG_SIZE_IN_BYTES; + private static final int RESPONSE_INITIAL_FRAME_SIZE = RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES; + + private SeaTunnelGetJobStateCodec() { + } + + public static ClientMessage encodeRequest(long jobId) { + ClientMessage clientMessage = ClientMessage.createForEncode(); + clientMessage.setRetryable(true); + clientMessage.setOperationName("SeaTunnel.GetJobState"); + Frame initialFrame = new Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE); + encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, REQUEST_MESSAGE_TYPE); + encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1); + encodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET, jobId); + clientMessage.add(initialFrame); + return clientMessage; + } + + /** + */ + public static long decodeRequest(ClientMessage clientMessage) { + ForwardFrameIterator iterator = clientMessage.frameIterator(); + Frame initialFrame = iterator.next(); + return decodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET); + } + + public static ClientMessage encodeResponse(String response) { + ClientMessage clientMessage = ClientMessage.createForEncode(); + Frame initialFrame = new Frame(new byte[RESPONSE_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE); + encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, RESPONSE_MESSAGE_TYPE); + clientMessage.add(initialFrame); + + StringCodec.encode(clientMessage, response); + return clientMessage; + } + + /** + */ + public static String decodeResponse(ClientMessage clientMessage) { + 
ForwardFrameIterator iterator = clientMessage.frameIterator(); + //empty initial frame + iterator.next(); + return StringCodec.decode(iterator); + } +} diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelListJobStatusCodec.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelListJobStatusCodec.java new file mode 100644 index 00000000000..e497281df1f --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelListJobStatusCodec.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.engine.core.protocol.codec; + +import com.hazelcast.client.impl.protocol.ClientMessage; +import com.hazelcast.client.impl.protocol.Generated; +import com.hazelcast.client.impl.protocol.codec.builtin.*; +import com.hazelcast.client.impl.protocol.codec.custom.*; + + +import static com.hazelcast.client.impl.protocol.ClientMessage.*; +import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.*; + +/* + * This file is auto-generated by the Hazelcast Client Protocol Code Generator. + * To change this file, edit the templates or the protocol + * definitions on the https://github.com/hazelcast/hazelcast-client-protocol + * and regenerate it. + */ + +/** + */ +@Generated("ee7ee4fc67d26f72ccdf418fcb868148") +public final class SeaTunnelListJobStatusCodec { + //hex: 0xDE0700 + public static final int REQUEST_MESSAGE_TYPE = 14550784; + //hex: 0xDE0701 + public static final int RESPONSE_MESSAGE_TYPE = 14550785; + private static final int REQUEST_INITIAL_FRAME_SIZE = PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES; + private static final int RESPONSE_INITIAL_FRAME_SIZE = RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES; + + private SeaTunnelListJobStatusCodec() { + } + + public static ClientMessage encodeRequest() { + ClientMessage clientMessage = ClientMessage.createForEncode(); + clientMessage.setRetryable(true); + clientMessage.setOperationName("SeaTunnel.ListJobStatus"); + Frame initialFrame = new Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE); + encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, REQUEST_MESSAGE_TYPE); + encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1); + clientMessage.add(initialFrame); + return clientMessage; + } + + public static ClientMessage encodeResponse(String response) { + ClientMessage clientMessage = ClientMessage.createForEncode(); + Frame initialFrame = new Frame(new byte[RESPONSE_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE); + 
encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, RESPONSE_MESSAGE_TYPE); + clientMessage.add(initialFrame); + + StringCodec.encode(clientMessage, response); + return clientMessage; + } + + /** + */ + public static String decodeResponse(ClientMessage clientMessage) { + ForwardFrameIterator iterator = clientMessage.frameIterator(); + //empty initial frame + iterator.next(); + return StringCodec.decode(iterator); + } +} diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml index 39cb3c386c9..6d599a74707 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml +++ b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml @@ -116,3 +116,41 @@ methods: nullable: false since: 2.0 doc: '' + + - id: 6 + name: getJobState + since: 2.0 + doc: '' + request: + retryable: true + partitionIdentifier: -1 + params: + - name: jobId + type: long + nullable: false + since: 2.0 + doc: '' + response: + params: + - name: response + type: String + nullable: false + since: 2.0 + doc: '' + + - id: 7 + name: listJobStatus + since: 2.0 + doc: '' + request: + retryable: true + partitionIdentifier: -1 + params: [] + response: + params: + - name: response + type: String + nullable: false + since: 2.0 + doc: '' + diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java index 51793a8f96c..043d98a347f 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java +++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java @@ -32,6 +32,7 @@ import org.apache.seatunnel.engine.server.execution.ExecutionState; import org.apache.seatunnel.engine.server.execution.TaskExecutionState; import org.apache.seatunnel.engine.server.execution.TaskGroupLocation; +import org.apache.seatunnel.engine.server.master.JobHistoryService; import org.apache.seatunnel.engine.server.master.JobMaster; import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager; import org.apache.seatunnel.engine.server.resourcemanager.ResourceManagerFactory; @@ -63,6 +64,8 @@ public class CoordinatorService { private volatile ResourceManager resourceManager; + private JobHistoryService jobHistoryService; + /** * IMap key is jobId and value is {@link RunningJobInfo}. * Tuple2 key is JobMaster init timestamp and value is the jobImmutableInformation which is sent by client when submit job @@ -135,6 +138,10 @@ public CoordinatorService(@NonNull NodeEngineImpl nodeEngine, @NonNull SeaTunnel masterActiveListener.scheduleAtFixedRate(this::checkNewActiveMaster, 0, 100, TimeUnit.MILLISECONDS); } + public JobHistoryService getJobHistoryService() { + return jobHistoryService; + } + public JobMaster getJobMaster(Long jobId) { return runningJobMasterMap.get(jobId); } @@ -158,6 +165,13 @@ private void initCoordinatorService() { runningJobStateTimestampsIMap = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_STATE_TIMESTAMPS); ownedSlotProfilesIMap = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_OWNED_SLOT_PROFILES); + jobHistoryService = new JobHistoryService( + runningJobStateIMap, + logger, + runningJobMasterMap, + nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_STATE) + ); + List> collect = runningJobInfoIMap.entrySet().stream().map(entry -> { return CompletableFuture.runAsync(() -> { logger.info(String.format("begin restore job (%s) from master active switch", 
entry.getKey())); @@ -243,6 +257,7 @@ private void restoreJobFromMasterActiveSwitch(@NonNull Long jobId, @NonNull Runn jobMaster.run(); } finally { // storage job state info to HistoryStorage + jobHistoryService.storeFinishedJobState(jobMaster); removeJobIMap(jobMaster); runningJobMasterMap.remove(jobId); } @@ -332,6 +347,7 @@ public PassiveCompletableFuture submitJob(long jobId, Data jobImmutableInf jobMaster.run(); } finally { // storage job state info to HistoryStorage + jobHistoryService.storeFinishedJobState(jobMaster); removeJobIMap(jobMaster); runningJobMasterMap.remove(jobId); } @@ -365,9 +381,9 @@ private void removeJobIMap(JobMaster jobMaster) { public PassiveCompletableFuture waitForJobComplete(long jobId) { JobMaster runningJobMaster = runningJobMasterMap.get(jobId); if (runningJobMaster == null) { - // TODO Get Job Status from JobHistoryStorage + JobStatus jobStatus = jobHistoryService.getJobStatus(jobId).getJobStatus(); CompletableFuture future = new CompletableFuture<>(); - future.complete(JobStatus.FINISHED); + future.complete(jobStatus); return new PassiveCompletableFuture<>(future); } else { return runningJobMaster.getJobMasterCompleteFuture(); @@ -391,8 +407,7 @@ public PassiveCompletableFuture cancelJob(long jodId) { public JobStatus getJobStatus(long jobId) { JobMaster runningJobMaster = runningJobMasterMap.get(jobId); if (runningJobMaster == null) { - // TODO Get Job Status from JobHistoryStorage - return JobStatus.FINISHED; + return jobHistoryService.getJobStatus(jobId).getJobStatus(); } return runningJobMaster.getJobStatus(); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java new file mode 100644 index 00000000000..4d309be38c8 --- /dev/null +++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.master; + +import org.apache.seatunnel.engine.core.job.JobStatus; +import org.apache.seatunnel.engine.core.job.PipelineStatus; +import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation; +import org.apache.seatunnel.engine.server.execution.ExecutionState; +import org.apache.seatunnel.engine.server.execution.TaskGroupLocation; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.hazelcast.logging.ILogger; +import com.hazelcast.map.IMap; +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +public class JobHistoryService { + /** 
+ * IMap key is one of jobId {@link org.apache.seatunnel.engine.server.dag.physical.PipelineLocation} and + * {@link org.apache.seatunnel.engine.server.execution.TaskGroupLocation} + *

+ * The value of IMap is one of {@link JobStatus} {@link PipelineStatus} + * {@link org.apache.seatunnel.engine.server.execution.ExecutionState} + *

+ * This IMap is used to recovery runningJobStateIMap in JobMaster when a new master node active + */ + private final IMap runningJobStateIMap; + + private final ILogger logger; + + /** + * key: job id; + *
value: job master; + */ + private final Map runningJobMasterMap; + + /** + * finishedJobStateImap key is jobId and value is jobState(json) + * JobStateData Indicates the status of the job, pipeline, and task + */ + //TODO need to limit the amount of storage + private final IMap finishedJobStateImap; + + private final ObjectMapper objectMapper; + + public JobHistoryService( + IMap runningJobStateIMap, + ILogger logger, + Map runningJobMasterMap, + IMap finishedJobStateImap + ) { + this.runningJobStateIMap = runningJobStateIMap; + this.logger = logger; + this.runningJobMasterMap = runningJobMasterMap; + this.finishedJobStateImap = finishedJobStateImap; + this.objectMapper = new ObjectMapper(); + this.objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); + } + + // Gets the status of a running and completed job + public String listAllJob() { + ObjectNode objectNode = objectMapper.createObjectNode(); + ArrayNode jobs = objectNode.putArray("jobs"); + + Stream.concat(runningJobMasterMap.values().stream().map(this::toJobStateMapper), + finishedJobStateImap.values().stream()) + .forEach(jobStateData -> { + JobStatusData jobStatusData = new JobStatusData(jobStateData.jobId, jobStateData.jobStatus); + JsonNode jsonNode = objectMapper.valueToTree(jobStatusData); + jobs.add(jsonNode); + }); + return jobs.toString(); + } + + // Get detailed status of a single job + public JobStateData getJobStatus(Long jobId) { + return runningJobMasterMap.containsKey(jobId) ? 
toJobStateMapper(runningJobMasterMap.get(jobId)) : + finishedJobStateImap.getOrDefault(jobId, null); + } + + // Get detailed status of a single job as json + public String getJobStatusAsString(Long jobId) { + JobStateData jobStatus = getJobStatus(jobId); + if (null != jobStatus) { + try { + return objectMapper.writeValueAsString(jobStatus); + } catch (JsonProcessingException e) { + logger.severe("serialize jobStateMapper err", e); + ObjectNode objectNode = objectMapper.createObjectNode(); + objectNode.put("err", "serialize jobStateMapper err"); + return objectNode.toString(); + } + } + ObjectNode objectNode = objectMapper.createObjectNode(); + objectNode.put("err", String.format("jobId : %s not found", jobId)); + return objectNode.toString(); + } + + @SuppressWarnings("checkstyle:MagicNumber") + public void storeFinishedJobState(JobMaster jobMaster) { + JobStateData jobStateData = toJobStateMapper(jobMaster); + finishedJobStateImap.put(jobStateData.jobId, jobStateData, 14, TimeUnit.DAYS); + } + + private JobStateData toJobStateMapper(JobMaster jobMaster) { + + Long jobId = jobMaster.getJobImmutableInformation().getJobId(); + Map pipelineStateMapperMap = new HashMap<>(); + + jobMaster.getPhysicalPlan().getPipelineList().forEach(pipeline -> { + PipelineLocation pipelineLocation = pipeline.getPipelineLocation(); + PipelineStatus pipelineState = (PipelineStatus) runningJobStateIMap.get(pipelineLocation); + Map taskStateMap = new HashMap<>(); + pipeline.getCoordinatorVertexList().forEach(coordinator -> { + TaskGroupLocation taskGroupLocation = coordinator.getTaskGroupLocation(); + taskStateMap.put(taskGroupLocation, (ExecutionState) runningJobStateIMap.get(taskGroupLocation)); + }); + pipeline.getPhysicalVertexList().forEach(task -> { + TaskGroupLocation taskGroupLocation = task.getTaskGroupLocation(); + taskStateMap.put(taskGroupLocation, (ExecutionState) runningJobStateIMap.get(taskGroupLocation)); + }); + + PipelineStateData pipelineStateData = new 
PipelineStateData(pipelineState, taskStateMap); + pipelineStateMapperMap.put(pipelineLocation, pipelineStateData); + }); + JobStatus jobStatus = (JobStatus) runningJobStateIMap.get(jobId); + + return new JobStateData(jobId, jobStatus, pipelineStateMapperMap); + } + + @AllArgsConstructor + @Data + public static final class JobStatusData implements Serializable { + Long jobId; + JobStatus jobStatus; + } + + @AllArgsConstructor + @Data + public static final class JobStateData implements Serializable{ + Long jobId; + JobStatus jobStatus; + Map pipelineStateMapperMap; + } + + @AllArgsConstructor + @Data + public static final class PipelineStateData implements Serializable{ + PipelineStatus pipelineStatus; + Map executionStateMap; + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStateOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStateOperation.java new file mode 100644 index 00000000000..6a7f39ae2a8 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStateOperation.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.operation; + +import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; +import org.apache.seatunnel.engine.server.SeaTunnelServer; +import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook; + +import com.hazelcast.nio.ObjectDataInput; +import com.hazelcast.nio.ObjectDataOutput; +import com.hazelcast.nio.serialization.IdentifiedDataSerializable; +import com.hazelcast.spi.impl.AllowedDuringPassiveState; +import com.hazelcast.spi.impl.operationservice.Operation; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +public class GetJobStateOperation extends Operation implements IdentifiedDataSerializable, AllowedDuringPassiveState { + private Long jobId; + + private String response; + + public GetJobStateOperation() { + } + + public GetJobStateOperation(Long jobId) { + this.jobId = jobId; + } + + @Override + public final int getFactoryId() { + return OperationDataSerializerHook.FACTORY_ID; + } + + @Override + public int getClassId() { + return OperationDataSerializerHook.PRINT_MESSAGE_OPERATOR; + } + + @Override + protected void writeInternal(ObjectDataOutput out) throws IOException { + super.writeInternal(out); + out.writeLong(jobId); + } + + @Override + protected void readInternal(ObjectDataInput in) throws IOException { + super.readInternal(in); + jobId = in.readLong(); + } + + @Override + public void run() { + SeaTunnelServer service = getService(); + CompletableFuture future = CompletableFuture.supplyAsync(() -> { + return service.getCoordinatorService().getJobHistoryService().getJobStatusAsString(jobId); + }); + + try { + response = future.get(); + } catch (InterruptedException | ExecutionException e) { + throw new SeaTunnelEngineException(e); + } + } + + @Override + public Object 
getResponse() { + return response; + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/ListJobStatusOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/ListJobStatusOperation.java new file mode 100644 index 00000000000..179577d96c4 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/ListJobStatusOperation.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.engine.server.operation; + +import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; +import org.apache.seatunnel.engine.server.SeaTunnelServer; + +import com.hazelcast.spi.impl.AllowedDuringPassiveState; +import com.hazelcast.spi.impl.operationservice.Operation; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +public class ListJobStatusOperation extends Operation implements AllowedDuringPassiveState { + + private String response; + + public ListJobStatusOperation() { + } + + @Override + public void run() { + SeaTunnelServer service = getService(); + CompletableFuture future = CompletableFuture.supplyAsync(() -> { + return service.getCoordinatorService().getJobHistoryService().listAllJob(); + }); + + try { + response = future.get(); + } catch (InterruptedException | ExecutionException e) { + throw new SeaTunnelEngineException(e); + } + } + + @Override + public Object getResponse() { + return response; + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobStateTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobStateTask.java new file mode 100644 index 00000000000..7aa3b213025 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobStateTask.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.protocol.task; + +import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStateCodec; +import org.apache.seatunnel.engine.server.operation.GetJobStateOperation; + +import com.hazelcast.client.impl.protocol.ClientMessage; +import com.hazelcast.instance.impl.Node; +import com.hazelcast.internal.nio.Connection; +import com.hazelcast.spi.impl.operationservice.Operation; + +public class GetJobStateTask extends AbstractSeaTunnelMessageTask { + + protected GetJobStateTask(ClientMessage clientMessage, Node node, Connection connection) { + super(clientMessage, node, connection, + SeaTunnelGetJobStateCodec::decodeRequest, + SeaTunnelGetJobStateCodec::encodeResponse); + } + + @Override + protected Operation prepareOperation() { + return new GetJobStateOperation(parameters); + } + + @Override + public String getMethodName() { + return "getJobState"; + } + + @Override + public Object[] getParameters() { + return new Object[0]; + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/ListJobStatusTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/ListJobStatusTask.java new file mode 100644 index 00000000000..acbe4fae05c --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/ListJobStatusTask.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.protocol.task; + +import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelListJobStatusCodec; +import org.apache.seatunnel.engine.server.operation.ListJobStatusOperation; + +import com.hazelcast.client.impl.protocol.ClientMessage; +import com.hazelcast.instance.impl.Node; +import com.hazelcast.internal.nio.Connection; +import com.hazelcast.spi.impl.operationservice.Operation; + +public class ListJobStatusTask extends AbstractSeaTunnelMessageTask { + + protected ListJobStatusTask(ClientMessage clientMessage, Node node, Connection connection) { + super(clientMessage, node, connection, + m -> null, + SeaTunnelListJobStatusCodec::encodeResponse); + } + + @Override + protected Operation prepareOperation() { + return new ListJobStatusOperation(); + } + + @Override + public String getMethodName() { + return "listJobStatus"; + } + + @Override + public Object[] getParameters() { + return new Object[0]; + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java index 
902e42cb638..7910bcd76ed 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java @@ -18,7 +18,9 @@ package org.apache.seatunnel.engine.server.protocol.task; import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelCancelJobCodec; +import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStateCodec; import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCodec; +import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelListJobStatusCodec; import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec; import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSubmitJobCodec; import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelWaitForJobCompleteCodec; @@ -55,5 +57,9 @@ private void initFactories() { (clientMessage, connection) -> new CancelJobTask(clientMessage, node, connection)); factories.put(SeaTunnelGetJobStatusCodec.REQUEST_MESSAGE_TYPE, (clientMessage, connection) -> new GetJobStatusTask(clientMessage, node, connection)); + factories.put(SeaTunnelGetJobStateCodec.REQUEST_MESSAGE_TYPE, + (clientMessage, connection) -> new GetJobStateTask(clientMessage, node, connection)); + factories.put(SeaTunnelListJobStatusCodec.REQUEST_MESSAGE_TYPE, + (clientMessage, connection) -> new ListJobStatusTask(clientMessage, node, connection)); } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java index 3125b58eacb..ee8aa5214a7 100644 --- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java @@ -107,7 +107,7 @@ public void testClearCoordinatorService() clearCoordinatorServiceMethod.setAccessible(false); // because runningJobMasterMap is empty and we have no JobHistoryServer, so return finished. - Assertions.assertTrue(JobStatus.FINISHED.equals(coordinatorService.getJobStatus(jobId))); + Assertions.assertTrue(JobStatus.RUNNING.equals(coordinatorService.getJobStatus(jobId))); coordinatorServiceTest.shutdown(); } @@ -165,7 +165,7 @@ public void testJobRestoreWhenMasterNodeSwitch() throws InterruptedException { // because runningJobMasterMap is empty and we have no JobHistoryServer, so return finished. await().atMost(200000, TimeUnit.MILLISECONDS) .untilAsserted( - () -> Assertions.assertEquals(JobStatus.FINISHED, server2.getCoordinatorService().getJobStatus(jobId))); + () -> Assertions.assertEquals(JobStatus.CANCELED, server2.getCoordinatorService().getJobStatus(jobId))); instance2.shutdown(); } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobHistoryServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobHistoryServiceTest.java new file mode 100644 index 00000000000..4e9f56f0f6e --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobHistoryServiceTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.master; + +import static org.awaitility.Awaitility.await; + +import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; +import org.apache.seatunnel.engine.core.dag.logical.LogicalDag; +import org.apache.seatunnel.engine.core.job.JobImmutableInformation; +import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest; +import org.apache.seatunnel.engine.server.TestUtils; + +import com.hazelcast.internal.serialization.Data; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; + +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +@DisabledOnOs(OS.WINDOWS) +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class JobHistoryServiceTest extends AbstractSeaTunnelServerTest { + + private static final Long JOB_1 = 1L; + private static final Long JOB_2 = 2L; + private static final Long JOB_3 = 3L; + + @Test + public void testlistJobState() throws Exception { + startJob(JOB_1, "fake_to_console.conf"); + + // waiting for JOB_1 status turn to RUNNING + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted(() -> Assertions.assertTrue( + 
server.getCoordinatorService().getJobHistoryService().listAllJob().contains(String.format("{\"jobId\":%s,\"jobStatus\":\"RUNNING\"}", JOB_1)))); + + // waiting for JOB_1 status turn to FINISHED + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted(() -> Assertions.assertTrue( + server.getCoordinatorService().getJobHistoryService().listAllJob().contains(String.format("{\"jobId\":%s,\"jobStatus\":\"FINISHED\"}", JOB_1)))); + + startJob(JOB_2, "fake_to_console.conf"); + // waiting for JOB_1 status turn to FINISHED and JOB_2 status turn to RUNNING + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted(() -> Assertions.assertTrue( + server.getCoordinatorService().getJobHistoryService().listAllJob().contains(String.format("{\"jobId\":%s,\"jobStatus\":\"FINISHED\"}", JOB_1)) + && + server.getCoordinatorService().getJobHistoryService().listAllJob().contains(String.format("{\"jobId\":%s,\"jobStatus\":\"RUNNING\"}", JOB_2)) + )); + } + + @Test + public void testGetJobStatus() throws Exception{ + startJob(JOB_3, "fake_to_console.conf"); + // waiting for JOB_3 status turn to RUNNING + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted(() -> Assertions.assertTrue( + server.getCoordinatorService().getJobHistoryService().getJobStatusAsString(JOB_3).contains("TaskGroupLocation") + && + server.getCoordinatorService().getJobHistoryService().getJobStatusAsString(JOB_3).contains("RUNNING") + )); + + // waiting for JOB_3 status turn to FINISHED + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted(() -> Assertions.assertTrue( + server.getCoordinatorService().getJobHistoryService().getJobStatusAsString(JOB_3).contains("TaskGroupLocation") + && + server.getCoordinatorService().getJobHistoryService().getJobStatusAsString(JOB_3).contains("FINISHED") + )); + } + + private void startJob(Long jobid, String path){ + LogicalDag testLogicalDag = + TestUtils.createTestLogicalPlan(path, jobid.toString(), jobid); + + JobImmutableInformation 
jobImmutableInformation = new JobImmutableInformation(jobid, + nodeEngine.getSerializationService().toData(testLogicalDag), testLogicalDag.getJobConfig(), + Collections.emptyList()); + + Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation); + + PassiveCompletableFuture voidPassiveCompletableFuture = + server.getCoordinatorService().submitJob(jobid, data); + voidPassiveCompletableFuture.join(); + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console.conf b/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console.conf new file mode 100644 index 00000000000..6b059913175 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console.conf @@ -0,0 +1,61 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set engine configuration here + execution.parallelism = 1 + job.mode = "BATCH" + execution.checkpoint.interval = 5000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + result_table_name = "fake" + parallelism = 1 + schema = { + fields { + name = "string" + age = "int" + } + } + } + + FakeSource { + result_table_name = "fake" + parallelism = 1 + schema = { + fields { + name = "string" + age = "int" + } + } + } +} + +transform { +} + +sink { + console { + source_table_name="fake" + } +} \ No newline at end of file From 630e88479138e54224227e2fa974835a23b4bb31 Mon Sep 17 00:00:00 2001 From: Bibo <33744252+531651225@users.noreply.github.com> Date: Tue, 8 Nov 2022 18:53:18 +0800 Subject: [PATCH 4/4] [Feature][Connector-V2] influxdb sink connector (#3174) * [Feature][Connector-V2] Add influxDB connector sink * fix doc style * remove old e2e for influxdb * fix e2e and License header * add Changelog * delete useless log4j file * mv scheduler to constructor of InfluxDBSinkWriter * remove InfluxDBSinkWriter useless synchronized --- docs/en/connector-v2/sink/InfluxDB.md | 104 +++++++++ plugin-mapping.properties | 1 + .../influxdb/client/InfluxDBClient.java | 15 ++ .../influxdb/config/InfluxDBConfig.java | 36 +-- .../seatunnel/influxdb/config/SinkConfig.java | 97 +++++++++ .../influxdb/config/SourceConfig.java | 67 ++++++ .../influxdb/config/TimePrecision.java | 49 +++++ .../influxdb/serialize/DefaultSerializer.java | 162 ++++++++++++++ .../influxdb/serialize/Serializer.java | 26 +++ .../seatunnel/influxdb/sink/InfluxDBSink.java | 75 +++++++ .../influxdb/sink/InfluxDBSinkWriter.java | 176 +++++++++++++++ .../influxdb/source/InfluxDBSource.java | 23 +- .../source/InfluxDBSourceSplitEnumerator.java | 12 +- 
.../connector-influxdb-e2e/pom.xml | 5 +- .../influxdb/InfluxDBSourceToAssertIT.java | 122 ----------- .../e2e/connector/influxdb/InfluxdbIT.java | 205 ++++++++++++++++++ .../test/resources/influxdb-to-influxdb.conf | 58 +++++ .../resources/influxdb_source_to_assert.conf | 188 ---------------- 18 files changed, 1056 insertions(+), 365 deletions(-) create mode 100644 docs/en/connector-v2/sink/InfluxDB.md create mode 100644 seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java create mode 100644 seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java create mode 100644 seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/TimePrecision.java create mode 100644 seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/DefaultSerializer.java create mode 100644 seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/Serializer.java create mode 100644 seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java create mode 100644 seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java delete mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxDBSourceToAssertIT.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf delete mode 100644 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb_source_to_assert.conf diff --git a/docs/en/connector-v2/sink/InfluxDB.md b/docs/en/connector-v2/sink/InfluxDB.md new file mode 100644 index 00000000000..cff9787f94f --- /dev/null +++ b/docs/en/connector-v2/sink/InfluxDB.md @@ -0,0 +1,104 @@ +# InfluxDB + +> InfluxDB sink connector + +## Description + +Write data to InfluxDB. + +## Key features + +- [ ] [exactly-once](../../concept/connector-v2-features.md) +- [ ] [schema projection](../../concept/connector-v2-features.md) + +## Options + +| name | type | required | default value | +|-----------------------------|----------|----------|-------------------------------| +| url | string | yes | - | +| database | string | yes | | +| measurement | string | yes | | +| username | string | no | - | +| password | string | no | - | +| key_time | string | yes | processing time | +| key_tags | array | no | exclude `field` & `key_time` | +| batch_size | int | no | 1024 | +| batch_interval_ms | int | no | - | +| max_retries | int | no | - | +| retry_backoff_multiplier_ms | int | no | - | +| connect_timeout_ms | long | no | 15000 | + +### url +the url to connect to influxDB e.g. +``` +http://influxdb-host:8086 +``` + +### database [string] + +The name of `influxDB` database + +### measurement [string] + +The name of `influxDB` measurement + +### username [string] + +`influxDB` user username + +### password [string] + +`influxDB` user password + +### key_time [string] + +Specify field-name of the `influxDB` measurement timestamp in SeaTunnelRow. If not specified, use processing-time as timestamp + +### key_tags [array] + +Specify field-name of the `influxDB` measurement tags in SeaTunnelRow. 
+If not specified, include all fields with `influxDB` measurement field + +### batch_size [int] + +For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `batch_interval_ms`, the data will be flushed into the influxDB + +### batch_interval_ms [int] + +For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `batch_interval_ms`, the data will be flushed into the influxDB + +### max_retries [int] + +The number of retries to flush failed + +### retry_backoff_multiplier_ms [int] + +Using as a multiplier for generating the next delay for backoff + +### max_retry_backoff_ms [int] + +The amount of time to wait before attempting to retry a request to `influxDB` + +### connect_timeout_ms [long] +the timeout for connecting to InfluxDB, in milliseconds + +## Examples +```hocon +sink { + InfluxDB { + url = "http://influxdb-host:8086" + database = "test" + measurement = "sink" + key_time = "time" + key_tags = ["label"] + batch_size = 1 + } +} + +``` + +## Changelog + +### next version + +- Add InfluxDB Sink Connector \ No newline at end of file diff --git a/plugin-mapping.properties b/plugin-mapping.properties index 11d7fd4a92b..1ac1e268725 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -140,3 +140,4 @@ seatunnel.sink.S3File = connector-file-s3 seatunnel.source.Amazondynamodb = connector-amazondynamodb seatunnel.sink.Amazondynamodb = connector-amazondynamodb seatunnel.sink.StarRocks = connector-starrocks +seatunnel.sink.InfluxDB = connector-influxdb diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java index 8743d5aa2c5..3ad3a99d53b 100644 --- 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.influxdb.client; import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig; +import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig; import lombok.extern.slf4j.Slf4j; import okhttp3.HttpUrl; @@ -75,4 +76,18 @@ public Response intercept(Chain chain) throws IOException { log.info("connect influxdb successful. sever version :{}.", version); return influxDB; } + + public static void setWriteProperty(InfluxDB influxDB, SinkConfig sinkConfig) { + String rp = sinkConfig.getRp(); + if (!StringUtils.isEmpty(rp)) { + influxDB.setRetentionPolicy(rp); + } + } + + public static InfluxDB getWriteClient(SinkConfig sinkConfig) throws ConnectException { + InfluxDB influxDB = getInfluxDB(sinkConfig); + influxDB.setDatabase(sinkConfig.getDatabase()); + setWriteProperty(getInfluxDB(sinkConfig), sinkConfig); + return influxDB; + } } diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java index 9a04e7d4aec..1332c5cab57 100644 --- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java @@ -23,7 +23,6 @@ import lombok.Data; import java.io.Serializable; -import java.util.List; @Data public class InfluxDBConfig implements Serializable { @@ -33,34 +32,16 @@ public 
class InfluxDBConfig implements Serializable { public static final String URL = "url"; private static final String CONNECT_TIMEOUT_MS = "connect_timeout_ms"; private static final String QUERY_TIMEOUT_SEC = "query_timeout_sec"; - - public static final String SQL = "sql"; - public static final String SQL_WHERE = "where"; - public static final String DATABASES = "database"; - public static final String SPLIT_COLUMN = "split_column"; - private static final String PARTITION_NUM = "partition_num"; - private static final String UPPER_BOUND = "upper_bound"; - private static final String LOWER_BOUND = "lower_bound"; - - private static final String DEFAULT_FORMAT = "MSGPACK"; - private static final String EPOCH = "epoch"; - - public static final String DEFAULT_PARTITIONS = "0"; + protected static final String EPOCH = "epoch"; private static final int DEFAULT_QUERY_TIMEOUT_SEC = 3; private static final long DEFAULT_CONNECT_TIMEOUT_MS = 15000; - private static final String DEFAULT_EPOCH = "n"; private String url; private String username; private String password; - private String sql; - private int partitionNum = 0; - private String splitKey; - private long lowerBound; - private long upperBound; private String database; private String format = DEFAULT_FORMAT; @@ -69,11 +50,8 @@ public class InfluxDBConfig implements Serializable { private String epoch = DEFAULT_EPOCH; - List columnsIndex; - public InfluxDBConfig(Config config) { this.url = config.getString(URL); - this.sql = config.getString(SQL); if (config.hasPath(USERNAME)) { this.username = config.getString(USERNAME); @@ -81,18 +59,6 @@ public InfluxDBConfig(Config config) { if (config.hasPath(PASSWORD)) { this.password = config.getString(PASSWORD); } - if (config.hasPath(PARTITION_NUM)) { - this.partitionNum = config.getInt(PARTITION_NUM); - } - if (config.hasPath(UPPER_BOUND)) { - this.upperBound = config.getInt(UPPER_BOUND); - } - if (config.hasPath(LOWER_BOUND)) { - this.lowerBound = config.getInt(LOWER_BOUND); - } - if 
(config.hasPath(SPLIT_COLUMN)) { - this.splitKey = config.getString(SPLIT_COLUMN); - } if (config.hasPath(DATABASES)) { this.database = config.getString(DATABASES); } diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java new file mode 100644 index 00000000000..c97d807fabf --- /dev/null +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.influxdb.config; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +import java.util.List; + +@Setter +@Getter +@ToString +public class SinkConfig extends InfluxDBConfig{ + public SinkConfig(Config config) { + super(config); + } + + private static final String KEY_TIME = "key_time"; + private static final String KEY_TAGS = "key_tags"; + public static final String KEY_MEASUREMENT = "measurement"; + + private static final String BATCH_SIZE = "batch_size"; + private static final String BATCH_INTERVAL_MS = "batch_interval_ms"; + private static final String MAX_RETRIES = "max_retries"; + private static final String WRITE_TIMEOUT = "write_timeout"; + private static final String RETRY_BACKOFF_MULTIPLIER_MS = "retry_backoff_multiplier_ms"; + private static final String MAX_RETRY_BACKOFF_MS = "max_retry_backoff_ms"; + private static final String RETENTION_POLICY = "rp"; + private static final int DEFAULT_BATCH_SIZE = 1024; + private static final int DEFAULT_WRITE_TIMEOUT = 5; + private static final TimePrecision DEFAULT_TIME_PRECISION = TimePrecision.NS; + + private String rp; + private String measurement; + private int writeTimeout = DEFAULT_WRITE_TIMEOUT; + private String keyTime; + private List keyTags; + private int batchSize = DEFAULT_BATCH_SIZE; + private Integer batchIntervalMs; + private int maxRetries; + private int retryBackoffMultiplierMs; + private int maxRetryBackoffMs; + private TimePrecision precision = DEFAULT_TIME_PRECISION; + + public static SinkConfig loadConfig(Config config) { + SinkConfig sinkConfig = new SinkConfig(config); + + if (config.hasPath(KEY_TIME)) { + sinkConfig.setKeyTime(config.getString(KEY_TIME)); + } + if (config.hasPath(KEY_TAGS)) { + sinkConfig.setKeyTags(config.getStringList(KEY_TAGS)); + } + if (config.hasPath(BATCH_INTERVAL_MS)) { + 
sinkConfig.setBatchIntervalMs(config.getInt(BATCH_INTERVAL_MS)); + } + if (config.hasPath(MAX_RETRIES)) { + sinkConfig.setMaxRetries(config.getInt(MAX_RETRIES)); + } + if (config.hasPath(RETRY_BACKOFF_MULTIPLIER_MS)) { + sinkConfig.setRetryBackoffMultiplierMs(config.getInt(RETRY_BACKOFF_MULTIPLIER_MS)); + } + if (config.hasPath(MAX_RETRY_BACKOFF_MS)) { + sinkConfig.setMaxRetryBackoffMs(config.getInt(MAX_RETRY_BACKOFF_MS)); + } + if (config.hasPath(WRITE_TIMEOUT)) { + sinkConfig.setWriteTimeout(config.getInt(WRITE_TIMEOUT)); + } + if (config.hasPath(RETENTION_POLICY)) { + sinkConfig.setRp(config.getString(RETENTION_POLICY)); + } + if (config.hasPath(EPOCH)) { + sinkConfig.setPrecision(TimePrecision.getPrecision(config.getString(EPOCH))); + } + sinkConfig.setMeasurement(config.getString(KEY_MEASUREMENT)); + return sinkConfig; + } + +} diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java new file mode 100644 index 00000000000..d0c3fe65e2e --- /dev/null +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.influxdb.config; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import lombok.Getter; + +import java.util.List; + +@Getter +public class SourceConfig extends InfluxDBConfig{ + public static final String SQL = "sql"; + public static final String SQL_WHERE = "where"; + public static final String SPLIT_COLUMN = "split_column"; + private static final String PARTITION_NUM = "partition_num"; + private static final String UPPER_BOUND = "upper_bound"; + private static final String LOWER_BOUND = "lower_bound"; + public static final String DEFAULT_PARTITIONS = "0"; + private String sql; + private int partitionNum = 0; + private String splitKey; + private long lowerBound; + private long upperBound; + + List columnsIndex; + + public SourceConfig(Config config) { + super(config); + } + + public static SourceConfig loadConfig(Config config) { + SourceConfig sourceConfig = new SourceConfig(config); + + sourceConfig.sql = config.getString(SQL); + + if (config.hasPath(PARTITION_NUM)) { + sourceConfig.partitionNum = config.getInt(PARTITION_NUM); + } + if (config.hasPath(UPPER_BOUND)) { + sourceConfig.upperBound = config.getInt(UPPER_BOUND); + } + if (config.hasPath(LOWER_BOUND)) { + sourceConfig.lowerBound = config.getInt(LOWER_BOUND); + } + if (config.hasPath(SPLIT_COLUMN)) { + sourceConfig.splitKey = config.getString(SPLIT_COLUMN); + } + return sourceConfig; + } + +} diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/TimePrecision.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/TimePrecision.java new file mode 100644 index 00000000000..18af2cdd6e0 --- /dev/null +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/TimePrecision.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.influxdb.config; + +import java.util.concurrent.TimeUnit; + +public enum TimePrecision { + NS("NS", TimeUnit.NANOSECONDS), + U("U", TimeUnit.MICROSECONDS), + MS("MS", TimeUnit.MILLISECONDS), + S("S", TimeUnit.SECONDS), + M("M", TimeUnit.MINUTES), + H("H", TimeUnit.HOURS); + private String desc; + private TimeUnit precision; + + TimePrecision(String desc, TimeUnit precision) { + this.desc = desc; + this.precision = precision; + } + + public TimeUnit getTimeUnit() { + return this.precision; + } + + public static TimePrecision getPrecision(String desc) { + for (TimePrecision timePrecision : TimePrecision.values()) { + if (desc.equals(timePrecision.desc)) { + return timePrecision; + } + } + return TimePrecision.NS; + } +} diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/DefaultSerializer.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/DefaultSerializer.java new file mode 100644 index 00000000000..8cc458939de --- /dev/null +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/DefaultSerializer.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.influxdb.serialize; + +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; + +import com.google.common.base.Strings; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.influxdb.dto.Point; + +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class DefaultSerializer implements Serializer { + private SeaTunnelRowType seaTunnelRowType; + + private final BiConsumer timestampExtractor; + private final BiConsumer fieldExtractor; + private final BiConsumer tagExtractor; + private String measurement; + + private TimeUnit precision; + + public DefaultSerializer(SeaTunnelRowType seaTunnelRowType, TimeUnit precision, List tagKeys, + String timestampKey, + String measurement) { + this.measurement = measurement; + this.seaTunnelRowType = seaTunnelRowType; + this.timestampExtractor = createTimestampExtractor(seaTunnelRowType, timestampKey); + this.tagExtractor = createTagExtractor(seaTunnelRowType, tagKeys); + List fieldKeys = getFieldKeys(seaTunnelRowType, timestampKey, tagKeys); + this.fieldExtractor = createFieldExtractor(seaTunnelRowType, fieldKeys); + this.precision = precision; + } + + @Override + public Point 
serialize(SeaTunnelRow seaTunnelRow) { + Point.Builder builder = Point.measurement(measurement); + timestampExtractor.accept(seaTunnelRow, builder); + tagExtractor.accept(seaTunnelRow, builder); + fieldExtractor.accept(seaTunnelRow, builder); + return builder.build(); + } + + private BiConsumer createFieldExtractor(SeaTunnelRowType seaTunnelRowType, List fieldKeys) { + return (row, builder) -> { + for (int i = 0; i < fieldKeys.size(); i++) { + String field = fieldKeys.get(i); + int indexOfSeaTunnelRow = seaTunnelRowType.indexOf(field); + SeaTunnelDataType dataType = seaTunnelRowType.getFieldType(indexOfSeaTunnelRow); + Object val = row.getField(indexOfSeaTunnelRow); + switch (dataType.getSqlType()) { + case BOOLEAN: + builder.addField(field, Boolean.valueOf((Boolean) val)); + break; + case SMALLINT: + builder.addField(field, Short.valueOf((Short) val)); + break; + case INT: + builder.addField(field, ((Number) val).intValue()); + break; + case BIGINT: + // Only timstamp support be bigint,however it is processed in specicalField + builder.addField(field, ((Number) val).longValue()); + break; + case FLOAT: + builder.addField(field, ((Number) val).floatValue()); + break; + case DOUBLE: + builder.addField(field, ((Number) val).doubleValue()); + break; + case STRING: + builder.addField(field, val.toString()); + break; + default: + throw new UnsupportedOperationException("Unsupported dataType: " + dataType); + } + } + }; + } + + private BiConsumer createTimestampExtractor(SeaTunnelRowType seaTunnelRowType, + String timeKey) { + //not config timeKey, use processing time + if (Strings.isNullOrEmpty(timeKey)) { + return (row, builder) -> builder.time(System.currentTimeMillis(), precision); + } + + int timeFieldIndex = seaTunnelRowType.indexOf(timeKey); + return (row, builder) -> { + Object time = row.getField(timeFieldIndex); + if (time == null) { + builder.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS); + } + SeaTunnelDataType timestampFieldType = 
seaTunnelRowType.getFieldType(timeFieldIndex); + switch (timestampFieldType.getSqlType()) { + case STRING: + builder.time(Long.parseLong((String) time), precision); + break; + case TIMESTAMP: + builder.time(LocalDateTime.class.cast(time) + .atZone(ZoneOffset.UTC) + .toInstant() + .toEpochMilli(), precision); + break; + case BIGINT: + builder.time((Long) time, precision); + break; + default: + throw new UnsupportedOperationException("Unsupported data type: " + timestampFieldType); + } + }; + } + + private BiConsumer createTagExtractor(SeaTunnelRowType seaTunnelRowType, + List tagKeys) { + //not config tagKeys + if (CollectionUtils.isEmpty(tagKeys)) { + return (row, builder) -> {}; + } + + return (row, builder) -> { + for (int i = 0; i < tagKeys.size(); i++) { + String tagKey = tagKeys.get(i); + int indexOfSeaTunnelRow = seaTunnelRowType.indexOf(tagKey); + builder.tag(tagKey, row.getField(indexOfSeaTunnelRow).toString()); + } + }; + } + + private List getFieldKeys(SeaTunnelRowType seaTunnelRowType, + String timestampKey, + List tagKeys) { + return Stream.of(seaTunnelRowType.getFieldNames()) + .filter(name -> CollectionUtils.isEmpty(tagKeys) || !tagKeys.contains(name)) + .filter(name -> StringUtils.isEmpty(timestampKey) || !name.equals(timestampKey)) + .collect(Collectors.toList()); + } +} diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/Serializer.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/Serializer.java new file mode 100644 index 00000000000..b910efafd40 --- /dev/null +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/Serializer.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.influxdb.serialize; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +import org.influxdb.dto.Point; + +public interface Serializer { + Point serialize(SeaTunnelRow seaTunnelRow); +} diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java new file mode 100644 index 00000000000..8d3eb7290ff --- /dev/null +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.influxdb.sink; + +import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.URL; +import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.KEY_MEASUREMENT; + +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.google.auto.service.AutoService; + +import java.io.IOException; + +@AutoService(SeaTunnelSink.class) +public class InfluxDBSink extends AbstractSimpleSink { + + private Config pluginConfig; + private SeaTunnelRowType seaTunnelRowType; + + @Override + public String getPluginName() { + return "InfluxDB"; + } + + @Override + public void prepare(Config config) throws PrepareFailException { + CheckResult result = CheckConfigUtil.checkAllExists(config, URL, KEY_MEASUREMENT); + if (!result.isSuccess()) { + throw new 
PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg()); + } + this.pluginConfig = config; + } + + @Override + public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) { + this.seaTunnelRowType = seaTunnelRowType; + } + + @Override + public SeaTunnelDataType getConsumedType() { + return seaTunnelRowType; + } + + @Override + public AbstractSinkWriter createWriter(SinkWriter.Context context) throws IOException { + return new InfluxDBSinkWriter(pluginConfig, seaTunnelRowType); + } +} diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java new file mode 100644 index 00000000000..809a3eaaa88 --- /dev/null +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.influxdb.sink; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient; +import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig; +import org.apache.seatunnel.connectors.seatunnel.influxdb.serialize.DefaultSerializer; +import org.apache.seatunnel.connectors.seatunnel.influxdb.serialize.Serializer; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.influxdb.InfluxDB; +import org.influxdb.dto.BatchPoints; +import org.influxdb.dto.Point; + +import java.io.IOException; +import java.net.ConnectException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class InfluxDBSinkWriter extends AbstractSinkWriter { + + private final Serializer serializer; + private InfluxDB influxDB; + private SinkConfig sinkConfig; + private final List batchList; + private ScheduledExecutorService scheduler; + private ScheduledFuture scheduledFuture; + private volatile Exception flushException; + private final Integer batchIntervalMs; + + public InfluxDBSinkWriter(Config pluginConfig, + SeaTunnelRowType seaTunnelRowType) throws ConnectException { + this.sinkConfig = SinkConfig.loadConfig(pluginConfig); + this.batchIntervalMs = sinkConfig.getBatchIntervalMs(); + this.serializer = new DefaultSerializer( + seaTunnelRowType, sinkConfig.getPrecision().getTimeUnit(), sinkConfig.getKeyTags(), sinkConfig.getKeyTime(), 
sinkConfig.getMeasurement()); + this.batchList = new ArrayList<>(); + + if (batchIntervalMs != null) { + scheduler = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("influxDB-sink-output-%s").build()); + scheduledFuture = scheduler.scheduleAtFixedRate( + () -> { + try { + flush(); + } catch (IOException e) { + flushException = e; + } + }, + batchIntervalMs, + batchIntervalMs, + TimeUnit.MILLISECONDS); + } + + connect(); + } + + @Override + public void write(SeaTunnelRow element) throws IOException { + Point record = serializer.serialize(element); + write(record); + } + + @SneakyThrows + @Override + public Optional prepareCommit() { + // Flush to storage before snapshot state is performed + flush(); + return super.prepareCommit(); + } + + @Override + public void close() throws IOException { + if (scheduledFuture != null) { + scheduledFuture.cancel(false); + scheduler.shutdown(); + } + + flush(); + + if (influxDB != null) { + influxDB.close(); + influxDB = null; + } + } + + public void write(Point record) throws IOException { + checkFlushException(); + + batchList.add(record); + if (sinkConfig.getBatchSize() > 0 + && batchList.size() >= sinkConfig.getBatchSize()) { + flush(); + } + } + + public void flush() throws IOException { + checkFlushException(); + if (batchList.isEmpty()) { + return; + } + BatchPoints.Builder batchPoints = BatchPoints.database(sinkConfig.getDatabase()); + for (int i = 0; i <= sinkConfig.getMaxRetries(); i++) { + try { + batchPoints.points(batchList); + influxDB.write(batchPoints.build()); + } catch (Exception e) { + log.error("Writing records to influxdb failed, retry times = {}", i, e); + if (i >= sinkConfig.getMaxRetries()) { + throw new IOException("Writing records to InfluxDB failed.", e); + } + + try { + long backoff = Math.min(sinkConfig.getRetryBackoffMultiplierMs() * i, + sinkConfig.getMaxRetryBackoffMs()); + Thread.sleep(backoff); + } catch (InterruptedException ex) { + 
Thread.currentThread().interrupt(); + throw new IOException( + "Unable to flush; interrupted while doing another attempt.", e); + } + } + } + + batchList.clear(); + } + + private void checkFlushException() { + if (flushException != null) { + throw new RuntimeException("Writing records to InfluxDB failed.", flushException); + } + } + + public void connect() throws ConnectException { + if (influxDB == null) { + influxDB = InfluxDBClient.getWriteClient(sinkConfig); + String version = influxDB.version(); + if (!influxDB.ping().isGood()) { + String errorMessage = + String.format( + "connect influxdb failed, due to influxdb version info is unknown, the url is: {%s}", + sinkConfig.getUrl()); + throw new ConnectException(errorMessage); + } + log.info("connect influxdb successful. sever version :{}.", version); + } + } +} diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java index bc971476bda..804e804f560 100644 --- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java @@ -17,8 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.influxdb.source; -import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.SQL; -import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.URL; +import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.SQL; import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.source.Boundedness; @@ -33,7 +32,7 @@ import org.apache.seatunnel.common.constants.PluginType; import 
org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema; import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient; -import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig; +import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig; import org.apache.seatunnel.connectors.seatunnel.influxdb.state.InfluxDBSourceState; import org.apache.seatunnel.shade.com.typesafe.config.Config; @@ -53,7 +52,7 @@ @AutoService(SeaTunnelSource.class) public class InfluxDBSource implements SeaTunnelSource { private SeaTunnelRowType typeInfo; - private InfluxDBConfig influxDBConfig; + private SourceConfig sourceConfig; private List columnsIndexList; @@ -66,15 +65,15 @@ public String getPluginName() { @Override public void prepare(Config config) throws PrepareFailException { - CheckResult result = CheckConfigUtil.checkAllExists(config, URL, SQL); + CheckResult result = CheckConfigUtil.checkAllExists(config, SQL); if (!result.isSuccess()) { throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg()); } try { - this.influxDBConfig = new InfluxDBConfig(config); + this.sourceConfig = SourceConfig.loadConfig(config); SeaTunnelSchema seatunnelSchema = SeaTunnelSchema.buildWithConfig(config); this.typeInfo = seatunnelSchema.getSeaTunnelRowType(); - this.columnsIndexList = initColumnsIndex(InfluxDBClient.getInfluxDB(influxDBConfig)); + this.columnsIndexList = initColumnsIndex(InfluxDBClient.getInfluxDB(sourceConfig)); } catch (Exception e) { throw new PrepareFailException("InfluxDB", PluginType.SOURCE, e.toString()); } @@ -92,26 +91,26 @@ public SeaTunnelDataType getProducedType() { @Override public SourceReader createReader(SourceReader.Context readerContext) throws Exception { - return new InfluxdbSourceReader(influxDBConfig, readerContext, typeInfo, columnsIndexList); + return new InfluxdbSourceReader(sourceConfig, readerContext, typeInfo, columnsIndexList); } @Override public 
SourceSplitEnumerator createEnumerator(SourceSplitEnumerator.Context enumeratorContext) throws Exception { - return new InfluxDBSourceSplitEnumerator(enumeratorContext, influxDBConfig); + return new InfluxDBSourceSplitEnumerator(enumeratorContext, sourceConfig); } @Override public SourceSplitEnumerator restoreEnumerator(SourceSplitEnumerator.Context enumeratorContext, InfluxDBSourceState checkpointState) throws Exception { - return new InfluxDBSourceSplitEnumerator(enumeratorContext, checkpointState, influxDBConfig); + return new InfluxDBSourceSplitEnumerator(enumeratorContext, checkpointState, sourceConfig); } private List initColumnsIndex(InfluxDB influxDB) { //query one row to get column info - String query = influxDBConfig.getSql() + QUERY_LIMIT; + String query = sourceConfig.getSql() + QUERY_LIMIT; List fieldNames = new ArrayList<>(); try { QueryResult queryResult = influxDB.query( - new Query(query, influxDBConfig.getDatabase())); + new Query(query, sourceConfig.getDatabase())); List serieList = queryResult.getResults().get(0).getSeries(); fieldNames.addAll(serieList.get(0).getColumns()); diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java index d22eba1166e..139a6e3ad5c 100644 --- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java @@ -17,10 +17,10 @@ package org.apache.seatunnel.connectors.seatunnel.influxdb.source; -import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.SQL_WHERE; +import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.SQL_WHERE; import org.apache.seatunnel.api.source.SourceSplitEnumerator; -import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig; +import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig; import org.apache.seatunnel.connectors.seatunnel.influxdb.state.InfluxDBSourceState; import lombok.extern.slf4j.Slf4j; @@ -37,17 +37,17 @@ @Slf4j public class InfluxDBSourceSplitEnumerator implements SourceSplitEnumerator { - final InfluxDBConfig config; + final SourceConfig config; private final Context context; private final Map> pendingSplit; private final Object stateLock = new Object(); private volatile boolean shouldEnumerate; - public InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context context, InfluxDBConfig config) { + public InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context context, SourceConfig config) { this(context, null, config); } - public InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context context, InfluxDBSourceState sourceState, InfluxDBConfig config) { + public InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context context, InfluxDBSourceState sourceState, SourceConfig config) { this.context = context; this.config = config; this.pendingSplit = new HashMap<>(); @@ -113,7 +113,7 @@ private Set getInfluxDBSplit() { Set influxDBSourceSplits = new HashSet<>(); // no need numPartitions, use one partition if (config.getPartitionNum() == 0) { - influxDBSourceSplits.add(new InfluxDBSourceSplit(InfluxDBConfig.DEFAULT_PARTITIONS, sql)); + influxDBSourceSplits.add(new InfluxDBSourceSplit(SourceConfig.DEFAULT_PARTITIONS, sql)); return influxDBSourceSplits; } //calculate numRange base on (lowerBound upperBound partitionNum) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/pom.xml index a3b93102e55..652782b0631 100644 --- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/pom.xml @@ -26,6 +26,7 @@ connector-influxdb-e2e + org.apache.seatunnel connector-influxdb @@ -34,9 +35,9 @@ org.apache.seatunnel - connector-assert + connector-console ${project.version} test - \ No newline at end of file + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxDBSourceToAssertIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxDBSourceToAssertIT.java deleted file mode 100644 index d39aa4f395e..00000000000 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxDBSourceToAssertIT.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.seatunnel.e2e.connector.influxdb; - -import static org.awaitility.Awaitility.given; - -import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient; -import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig; -import org.apache.seatunnel.e2e.common.TestResource; -import org.apache.seatunnel.e2e.common.TestSuiteBase; -import org.apache.seatunnel.e2e.common.container.TestContainer; - -import lombok.extern.slf4j.Slf4j; -import org.influxdb.InfluxDB; -import org.influxdb.dto.BatchPoints; -import org.influxdb.dto.Point; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.TestTemplate; -import org.testcontainers.containers.Container; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.lifecycle.Startables; -import org.testcontainers.utility.DockerLoggerFactory; - -import java.io.IOException; -import java.net.ConnectException; -import java.util.Date; -import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; - -@Slf4j -public class InfluxDBSourceToAssertIT extends TestSuiteBase implements TestResource { - - private static final String INFLUXDB_DOCKER_IMAGE = "influxdb:1.8"; - private static final String INFLUXDB_CONTAINER_HOST = "influxdb-host"; - private static final int INFLUXDB_CONTAINER_PORT = 8086; - private static final String INFLUXDB_DATABASE = "test"; - private static final String INFLUXDB_MEASUREMENT = "test"; - - private GenericContainer influxDBServer; - private InfluxDB influxDB; - - @BeforeAll - @Override - public void startUp() { - influxDBServer = new GenericContainer<>(INFLUXDB_DOCKER_IMAGE) - .withNetwork(NETWORK) - .withNetworkAliases(INFLUXDB_CONTAINER_HOST) - .withExposedPorts(INFLUXDB_CONTAINER_PORT) - .withLogConsumer(new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger(INFLUXDB_DOCKER_IMAGE))); - Startables.deepStart(Stream.of(influxDBServer)).join(); - log.info("influxdb container started"); - given().ignoreExceptions() - .await() - .atLeast(100, TimeUnit.MILLISECONDS) - .pollInterval(500, TimeUnit.MILLISECONDS) - .atMost(30, TimeUnit.SECONDS) - .untilAsserted(() -> initializeInfluxDBClient()); - batchInsertData(); - } - - @TestTemplate - public void testInfluxDBSource(TestContainer container) throws IOException, InterruptedException { - Container.ExecResult execResult = container.executeJob("/influxdb_source_to_assert.conf"); - Assertions.assertEquals(0, execResult.getExitCode()); - } - - private void initializeInfluxDBClient() throws ConnectException { - InfluxDBConfig influxDBConfig = new InfluxDBConfig(String.format("http://%s:%s", influxDBServer.getHost(), influxDBServer.getFirstMappedPort())); - influxDB = InfluxDBClient.getInfluxDB(influxDBConfig); - } - - public void batchInsertData() { - influxDB.createDatabase(INFLUXDB_DATABASE); - BatchPoints batchPoints = BatchPoints - .database(INFLUXDB_DATABASE) - .build(); - for (int i = 0; i < 100; i++) { - Point point = Point.measurement(INFLUXDB_MEASUREMENT) - .time(new Date().getTime(), TimeUnit.NANOSECONDS) - .tag("label", String.format("label_%s", i)) - .addField("f1", String.format("f1_%s", i)) - .addField("f2", Double.valueOf(i + 1)) - .addField("f3", Long.valueOf(i + 2)) - .addField("f4", Float.valueOf(i + 3)) - .addField("f5", Integer.valueOf(i)) - .addField("f6", (short) (i + 4)) - .addField("f7", i % 2 == 0 ? 
Boolean.TRUE : Boolean.FALSE) - .build(); - batchPoints.point(point); - } - influxDB.write(batchPoints); - } - - @AfterAll - @Override - public void tearDown() { - if (influxDB != null) { - influxDB.close(); - } - if (influxDBServer != null) { - influxDBServer.stop(); - } - } -} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java new file mode 100644 index 00000000000..20cc6dce012 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.e2e.connector.influxdb; + +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient; +import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig; +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.TestContainer; + +import lombok.extern.slf4j.Slf4j; +import org.influxdb.InfluxDB; +import org.influxdb.dto.BatchPoints; +import org.influxdb.dto.Point; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.DockerLoggerFactory; + +import java.io.IOException; +import java.net.ConnectException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import scala.Tuple2; + +@Slf4j +public class InfluxdbIT extends TestSuiteBase implements TestResource { + private static final String IMAGE = "influxdb:1.8"; + private static final String HOST = "influxdb-host"; + private static final int PORT = 8086; + private static final String INFLUXDB_DATABASE = "test"; + 
private static final String INFLUXDB_SOURCE_MEASUREMENT = "source";
+    private static final String INFLUXDB_SINK_MEASUREMENT = "sink";
+
+
+    private static final Tuple2<SeaTunnelRowType, List<SeaTunnelRow>> TEST_DATASET = generateTestDataSet();
+
+    private GenericContainer<?> influxdbContainer;
+    private String influxDBConnectUrl;
+
+    private InfluxDB influxDB;
+
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        this.influxdbContainer = new GenericContainer<>(DockerImageName.parse(IMAGE))
+                .withNetwork(NETWORK)
+                .withNetworkAliases(HOST)
+                .withExposedPorts(PORT)
+                .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE)))
+                .waitingFor(new HostPortWaitStrategy()
+                        .withStartupTimeout(Duration.ofMinutes(2)));
+        Startables.deepStart(Stream.of(influxdbContainer)).join();
+        influxDBConnectUrl = String.format("http://%s:%s", influxdbContainer.getHost(), influxdbContainer.getFirstMappedPort());
+        log.info("Influxdb container started");
+        this.initializeInfluxDBClient();
+        this.initSourceData();
+    }
+
+    private void initSourceData() {
+        influxDB.createDatabase(INFLUXDB_DATABASE);
+        BatchPoints batchPoints = BatchPoints
+                .database(INFLUXDB_DATABASE)
+                .build();
+        List<SeaTunnelRow> rows = TEST_DATASET._2();
+        SeaTunnelRowType rowType = TEST_DATASET._1();
+
+        for (int i = 0; i < rows.size(); i++) {
+            SeaTunnelRow row = rows.get(i);
+            Point point = Point.measurement(INFLUXDB_SOURCE_MEASUREMENT)
+                    .time((Long) row.getField(0), TimeUnit.NANOSECONDS)
+                    .tag(rowType.getFieldName(1), (String) row.getField(1))
+                    .addField(rowType.getFieldName(2), (String) row.getField(2))
+                    .addField(rowType.getFieldName(3), (Double) row.getField(3))
+                    .addField(rowType.getFieldName(4), (Long) row.getField(4))
+                    .addField(rowType.getFieldName(5), (Float) row.getField(5))
+                    .addField(rowType.getFieldName(6), (Integer) row.getField(6))
+                    .addField(rowType.getFieldName(7), (Short) row.getField(7))
+                    .addField(rowType.getFieldName(8), (Boolean) row.getField(8))
+                    .build();
+            batchPoints.point(point);
+        }
+
influxDB.write(batchPoints);
+    }
+
+    private static Tuple2<SeaTunnelRowType, List<SeaTunnelRow>> generateTestDataSet() {
+        SeaTunnelRowType rowType = new SeaTunnelRowType(
+                new String[]{
+                        "time",
+                        "label",
+                        "c_string",
+                        "c_double",
+                        "c_bigint",
+                        "c_float",
+                        "c_int",
+                        "c_smallint",
+                        "c_boolean"
+                },
+                new SeaTunnelDataType[]{
+                        BasicType.LONG_TYPE,
+                        BasicType.STRING_TYPE,
+                        BasicType.STRING_TYPE,
+                        BasicType.DOUBLE_TYPE,
+                        BasicType.LONG_TYPE,
+                        BasicType.FLOAT_TYPE,
+                        BasicType.INT_TYPE,
+                        BasicType.SHORT_TYPE,
+                        BasicType.BOOLEAN_TYPE
+                }
+        );
+
+        List<SeaTunnelRow> rows = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            SeaTunnelRow row = new SeaTunnelRow(
+                    new Object[]{
+                            new Date().getTime(),
+                            String.format("label_%s", i),
+                            String.format("f1_%s", i),
+                            Double.parseDouble("1.1"),
+                            Long.parseLong("1"),
+                            Float.parseFloat("1.1"),
+                            Integer.valueOf(i),
+                            Short.parseShort("1"),
+                            i % 2 == 0 ? Boolean.TRUE : Boolean.FALSE
+                    });
+            rows.add(row);
+        }
+        return Tuple2.apply(rowType, rows);
+    }
+
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        influxDB.close();
+        influxdbContainer.stop();
+    }
+
+    @TestTemplate
+    public void testInfluxdb(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/influxdb-to-influxdb.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        String sourceSql = String.format("select * from %s order by time", INFLUXDB_SOURCE_MEASUREMENT);
+        String sinkSql = String.format("select * from %s order by time", INFLUXDB_SINK_MEASUREMENT);
+        QueryResult sourceQueryResult = influxDB.query(new Query(sourceSql, INFLUXDB_DATABASE));
+        QueryResult sinkQueryResult = influxDB.query(new Query(sinkSql, INFLUXDB_DATABASE));
+        //assert data count
+        Assertions.assertEquals(sourceQueryResult.getResults().size(), sinkQueryResult.getResults().size());
+        //assert data values
+        List<List<Object>> sourceValues = sourceQueryResult.getResults().get(0).getSeries().get(0).getValues();
+        List<List<Object>> sinkValues =
sinkQueryResult.getResults().get(0).getSeries().get(0).getValues(); + int rowSize = sourceValues.size(); + int colSize = sourceValues.get(0).size(); + + for (int row = 0; row < rowSize; row++) { + for (int col = 0; col < colSize; col++) { + Object sourceColValue = sourceValues.get(row).get(col); + Object sinkColValue = sinkValues.get(row).get(col); + + if (!Objects.deepEquals(sourceColValue, sinkColValue)) { + Assertions.assertEquals(sourceColValue, sinkColValue); + } + } + + } + } + + private void initializeInfluxDBClient() throws ConnectException { + InfluxDBConfig influxDBConfig = new InfluxDBConfig(influxDBConnectUrl); + influxDB = InfluxDBClient.getInfluxDB(influxDBConfig); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf new file mode 100644 index 00000000000..f95af29a2c6 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf @@ -0,0 +1,58 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + InfluxDB { + url = "http://influxdb-host:8086" + sql = "select label, c_string, c_double, c_bigint, c_float, c_int, c_smallint, c_boolean from source" + database = "test" + upper_bound = 99 + lower_bound = 0 + partition_num = 4 + split_column = "c_int" + fields { + label = STRING + c_string = STRING + c_double = DOUBLE + c_bigint = BIGINT + c_float = FLOAT + c_int = INT + c_smallint = SMALLINT + c_boolean = BOOLEAN + time = BIGINT + } + } +} + +transform { +} + +sink { + InfluxDB { + url = "http://influxdb-host:8086" + database = "test" + measurement = "sink" + key_time = "time" + key_tags = ["label"] + batch_size = 1 + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb_source_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb_source_to_assert.conf deleted file mode 100644 index ea0e6e17740..00000000000 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb_source_to_assert.conf +++ /dev/null @@ -1,188 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# -###### -###### This config file is a demonstration of streaming processing in seatunnel config -###### - -env { - execution.parallelism = 1 - job.mode = "BATCH" - - # You can set spark configuration here - spark.app.name = "SeaTunnel" - spark.executor.instances = 2 - spark.executor.cores = 1 - spark.executor.memory = "1g" - spark.master = local -} - -source { - # This is a example source plugin **only for test and demonstrate the feature source plugin** - InfluxDB { - url = "http://influxdb-host:8086" - sql = "select label, f1, f2, f3, f4, f5, f6, f7 from test" - database = "test" - upper_bound = 99 - lower_bound = 0 - partition_num = 4 - split_column = "f5" - fields { - label = STRING - f1 = STRING - f2 = DOUBLE - f3 = BIGINT - f4 = FLOAT - f5 = INT - f6 = SMALLINT - f7 = BOOLEAN - } - } -} - -sink { - Assert { - rules = - { - row_rules = [ - { - rule_type = MAX_ROW - rule_value = 100 - }, - { - rule_type = MIN_ROW - rule_value = 100 - } - ], - field_rules = [{ - field_name = f1 - field_type = string - field_value = [ - { - rule_type = NOT_NULL - }, - { - rule_type = MIN_LENGTH - rule_value = 4 - }, - { - rule_type = MAX_LENGTH - rule_value = 5 - } - ] - },{ - field_name = f2 - field_type = double - field_value = [ - { - rule_type = NOT_NULL - }, - { - rule_type = MIN - rule_value = 1 - }, - { - rule_type = MAX - rule_value = 100 - } - ] - },{ - field_name = f3 - field_type = long - field_value = [ - { - rule_type = NOT_NULL - }, - { - rule_type = MIN - rule_value = 2 - }, - { - rule_type = MAX - rule_value = 101 - } - ] - },{ - field_name = f4 - field_type = float - field_value = [ - { - rule_type = NOT_NULL - }, - { - rule_type = MIN - rule_value = 3 - }, - { - rule_type = MAX - rule_value = 102 - } - ] - },{ - field_name = f5 - field_type = int - field_value = [ - { - rule_type = NOT_NULL - }, - { - rule_type = MIN - rule_value = 0 - }, - { - rule_type = MAX - rule_value = 99 - } - ] - },{ - field_name = f6 - field_type = short - field_value = [ - { - 
rule_type = NOT_NULL - }, - { - rule_type = MIN - rule_value = 4 - }, - { - rule_type = MAX - rule_value = 103 - } - ] - },{ - field_name = f7 - field_type = boolean - field_value = [ - { - rule_type = NOT_NULL - }, - { - rule_type = MIN - rule_value = 0 - }, - { - rule_type = MAX - rule_value = 1 - } - ] - } - ] - } - } - # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, - # please go to https://seatunnel.apache.org/docs/category/sink-v2 -} \ No newline at end of file