Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ST-Engine] Use SeatunnelChildFirstClassLoader to loader connector plugin #2479

Merged
merged 35 commits into from
Aug 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
9effca0
Add SeaTunnel Engine ConfigProvider and seatunnel-seatunnel-starter
EricJoy2048 Aug 3, 2022
77398bd
add source file to licenserc.yaml
EricJoy2048 Aug 3, 2022
98a0e72
fix checkstyle
EricJoy2048 Aug 3, 2022
9327116
fix error
EricJoy2048 Aug 4, 2022
e62d8e5
Merge remote-tracking branch 'apache/st-engine' into scheduler
EricJoy2048 Aug 4, 2022
50d7311
tmp
EricJoy2048 Aug 4, 2022
c8459da
tmp
EricJoy2048 Aug 5, 2022
f49817a
Update PhysicalPlan to support scheduler by pipeline
EricJoy2048 Aug 8, 2022
2f1192c
fix error
EricJoy2048 Aug 8, 2022
a7a3db7
remove init in run
EricJoy2048 Aug 8, 2022
f0cb49f
tmp
EricJoy2048 Aug 9, 2022
8bc8660
complete scheduler
EricJoy2048 Aug 10, 2022
31c26d1
opitimze state log
EricJoy2048 Aug 10, 2022
0d06e0c
optimize scheduler
EricJoy2048 Aug 11, 2022
790f24b
merge from upstream
EricJoy2048 Aug 11, 2022
c29882e
fix checkstyle
EricJoy2048 Aug 11, 2022
8a162e6
fix checkstyle
EricJoy2048 Aug 11, 2022
b9f9605
Add wait for job complete in JobProxy
EricJoy2048 Aug 12, 2022
f2151a6
Add wait for job complete in JobProxy
EricJoy2048 Aug 12, 2022
0351ca2
merge from upstream
EricJoy2048 Aug 15, 2022
c8316d8
add license header
EricJoy2048 Aug 15, 2022
06353c0
merge from upstream
EricJoy2048 Aug 16, 2022
968870d
fix UT error
EricJoy2048 Aug 16, 2022
e342f04
tmp
EricJoy2048 Aug 17, 2022
89967d7
tmp
EricJoy2048 Aug 17, 2022
53a6d0c
merge from upstream
EricJoy2048 Aug 19, 2022
2ea7ac1
fix merge bug
EricJoy2048 Aug 19, 2022
6203f0b
fix main workflow error
EricJoy2048 Aug 22, 2022
f34ba00
fix ci error
EricJoy2048 Aug 22, 2022
9355d19
merge from upstream
EricJoy2048 Aug 22, 2022
770a506
merge from upstream
EricJoy2048 Aug 22, 2022
bda40d9
merge from upstream
EricJoy2048 Aug 22, 2022
0cbd074
fix style error
EricJoy2048 Aug 23, 2022
dcb8931
merge from upstream
EricJoy2048 Aug 24, 2022
2aad9fa
update job to bacth job
EricJoy2048 Aug 24, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,5 @@ test.conf
log4j.properties
spark-warehouse
*.flattened-pom.xml

connectors
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.common.config;

import org.apache.commons.lang3.StringUtils;

import java.io.File;
import java.net.URISyntaxException;
import java.nio.file.Path;
Expand Down Expand Up @@ -97,14 +99,24 @@ public static Path connectorRootDir(String engine) {
* Plugin Connector Jar Dir
*/
public static Path connectorJarDir(String engine) {
return Paths.get(appRootDir().toString(), "connectors", engine.toLowerCase());
String seatunnelHome = System.getProperty("SEATUNNEL_HOME");
if (StringUtils.isBlank(seatunnelHome)) {
return Paths.get(appRootDir().toString(), "connectors", engine.toLowerCase());
} else {
return Paths.get(seatunnelHome, "connectors", engine.toLowerCase());
}
}

/**
* Plugin Connector Dir
*/
public static Path connectorDir() {
return Paths.get(appRootDir().toString(), "connectors");
String seatunnelHome = System.getProperty("SEATUNNEL_HOME");
if (StringUtils.isBlank(seatunnelHome)) {
return Paths.get(appRootDir().toString(), "connectors");
} else {
return Paths.get(seatunnelHome, "connectors");
}
}

public static Path pluginTarball() {
Expand Down
1 change: 1 addition & 0 deletions seatunnel-e2e/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
<module>seatunnel-flink-connector-v2-e2e</module>
<module>seatunnel-spark-connector-v2-e2e</module>
<module>seatunnel-flink-sql-e2e</module>
<module>seatunnel-engine-e2e</module>
</modules>
<dependencies>
<dependency>
Expand Down
70 changes: 70 additions & 0 deletions seatunnel-e2e/seatunnel-engine-e2e/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>seatunnel-e2e</artifactId>
<groupId>org.apache.seatunnel</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>seatunnel-engine-e2e</artifactId>

<dependencies>
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
<scope>test</scope>
<classifier>tests</classifier>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-engine-client</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-file-local</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-fake</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-connectors-v2-dist</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-engine-server</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.engine.e2e;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;

public class TestUtils {
public static String getResource(String confFile) {
return System.getProperty("user.dir") + "/src/test/resources" + confFile;
}

public static void initPluginDir() {
// copy connectors to project_root/connectors dir
System.setProperty("SEATUNNEL_HOME", System.getProperty("user.dir") +
String.format("%s..%s..%s", File.separator, File.separator, File.separator));
File seatunnelRootDir = new File(System.getProperty("SEATUNNEL_HOME"));

File connectorDir = new File(seatunnelRootDir +
File.separator +
"connectors/seatunnel");

if (connectorDir.exists()) {
connectorDir.delete();
}

connectorDir.mkdirs();

File connectorDistDir = new File(
seatunnelRootDir +
File.separator +
"seatunnel-connectors-v2-dist" +
File.separator +
"target" +
File.separator +
"lib");

Arrays.stream(connectorDistDir.listFiles()).forEach(file -> {
if (file.getName().startsWith("connector-")) {
Path copied = Paths.get(connectorDir + File.separator + file.getName());
Path originalPath = file.toPath();
try {
Files.copy(originalPath, copied, StandardCopyOption.REPLACE_EXISTING);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});

Path targetPluginMappingFile = Paths.get(seatunnelRootDir +
File.separator +
"connectors" +
File.separator +
"plugin-mapping.properties");

Path sourcePluginMappingFile = Paths.get(seatunnelRootDir + File.separator + "plugin-mapping.properties");
try {
Files.copy(sourcePluginMappingFile, targetPluginMappingFile, StandardCopyOption.REPLACE_EXISTING);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.engine.e2e.engine;

import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.JobProxy;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelClientConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.e2e.TestUtils;
import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;

import com.google.common.collect.Lists;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.instance.impl.HazelcastInstanceFactory;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutionException;

public class JobExecutionIT {
private static final Logger LOGGER = LoggerFactory.getLogger(JobExecutionIT.class);

@BeforeClass
public static void beforeClass() throws Exception {
SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
HazelcastInstanceFactory.newHazelcastInstance(seaTunnelConfig.getHazelcastConfig(),
Thread.currentThread().getName(),
new SeaTunnelNodeContext(ConfigProvider.locateAndGetSeaTunnelConfig()));
}

@Test
public void testSayHello() {
SeaTunnelClientConfig seaTunnelClientConfig = new SeaTunnelClientConfig();
seaTunnelClientConfig.getNetworkConfig().setAddresses(Lists.newArrayList("localhost:5801"));
SeaTunnelClient engineClient = new SeaTunnelClient(seaTunnelClientConfig);

String msg = "Hello world";
String s = engineClient.printMessageToMaster(msg);
Assert.assertEquals(msg, s);
}

@Test
public void testExecuteJob() {
TestUtils.initPluginDir();
Common.setDeployMode(DeployMode.CLIENT);
String filePath = TestUtils.getResource("/fakesource_to_file_complex.conf");
JobConfig jobConfig = new JobConfig();
jobConfig.setName("fake_to_file");

ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(filePath, jobConfig);

JobProxy jobProxy = null;
try {
jobProxy = jobExecutionEnv.execute();
JobStatus jobStatus = jobProxy.waitForJobComplete();
Assert.assertEquals(JobStatus.FINISHED, jobStatus);
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
# You can set flink configuration here
execution.parallelism = 1
job.mode = "STREAMING"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will it stop automatically?

execution.checkpoint.interval = 5000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
result_table_name = "fake"
field_name = "name,age",
Copy link
Member

@hailin0 hailin0 Aug 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use new config style? #2406

example:

FakeSource {
    result_table_name = "fake"
    fields {
        name = string
        age = int
    }
}

parallelism = 3
}

# If you would like to get more information about how to configure seatunnel and see full list of source plugins,
# please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
}

transform {
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use https://seatunnel.apache.org/docs/transform/sql ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

sink {
LocalFile {
path="file:///tmp/hive/warehouse/test2"
field_delimiter="\t"
row_delimiter="\n"
partition_by=["age"]
partition_dir_expression="${k0}=${v0}"
is_partition_field_write_in_file=true
file_name_expression="${transactionId}_${now}"
file_format="text"
sink_columns=["name","age"]
filename_time_format="yyyy.MM.dd"
is_enable_transaction=true
save_mode="error"

}

# If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
# please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use https://seatunnel.apache.org/docs/connector-v2/sink/LocalFile ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}
Loading