-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ST-Engine] Use SeatunnelChildFirstClassLoader to loader connector plugin #2479
Changes from all commits
9effca0
77398bd
98a0e72
9327116
e62d8e5
50d7311
c8459da
f49817a
2f1192c
a7a3db7
f0cb49f
8bc8660
31c26d1
0d06e0c
790f24b
c29882e
8a162e6
b9f9605
f2151a6
0351ca2
c8316d8
06353c0
968870d
e342f04
89967d7
53a6d0c
2ea7ac1
6203f0b
f34ba00
9355d19
770a506
bda40d9
0cbd074
dcb8931
2aad9fa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -42,3 +42,5 @@ test.conf | |
log4j.properties | ||
spark-warehouse | ||
*.flattened-pom.xml | ||
|
||
connectors |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<!-- | ||
~ Licensed to the Apache Software Foundation (ASF) under one or more | ||
~ contributor license agreements. See the NOTICE file distributed with | ||
~ this work for additional information regarding copyright ownership. | ||
~ The ASF licenses this file to You under the Apache License, Version 2.0 | ||
~ (the "License"); you may not use this file except in compliance with | ||
~ the License. You may obtain a copy of the License at | ||
~ | ||
~ http://www.apache.org/licenses/LICENSE-2.0 | ||
~ | ||
~ Unless required by applicable law or agreed to in writing, software | ||
~ distributed under the License is distributed on an "AS IS" BASIS, | ||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
~ See the License for the specific language governing permissions and | ||
~ limitations under the License. | ||
--> | ||
|
||
<project xmlns="http://maven.apache.org/POM/4.0.0" | ||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<parent> | ||
<artifactId>seatunnel-e2e</artifactId> | ||
<groupId>org.apache.seatunnel</groupId> | ||
<version>${revision}</version> | ||
</parent> | ||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<artifactId>seatunnel-engine-e2e</artifactId> | ||
|
||
<dependencies> | ||
<dependency> | ||
<groupId>com.hazelcast</groupId> | ||
<artifactId>hazelcast</artifactId> | ||
<scope>test</scope> | ||
<classifier>tests</classifier> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.seatunnel</groupId> | ||
<artifactId>seatunnel-engine-client</artifactId> | ||
<version>${project.version}</version> | ||
<exclusions> | ||
<exclusion> | ||
<groupId>org.apache.seatunnel</groupId> | ||
<artifactId>connector-file-local</artifactId> | ||
</exclusion> | ||
<exclusion> | ||
<groupId>org.apache.seatunnel</groupId> | ||
<artifactId>connector-fake</artifactId> | ||
</exclusion> | ||
</exclusions> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.seatunnel</groupId> | ||
<artifactId>seatunnel-connectors-v2-dist</artifactId> | ||
<version>${project.version}</version> | ||
<exclusions> | ||
<exclusion> | ||
<groupId>*</groupId> | ||
<artifactId>*</artifactId> | ||
</exclusion> | ||
</exclusions> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.seatunnel</groupId> | ||
<artifactId>seatunnel-engine-server</artifactId> | ||
<version>${project.version}</version> | ||
</dependency> | ||
</dependencies> | ||
</project> |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.seatunnel.engine.e2e; | ||
|
||
import java.io.File; | ||
import java.io.IOException; | ||
import java.nio.file.Files; | ||
import java.nio.file.Path; | ||
import java.nio.file.Paths; | ||
import java.nio.file.StandardCopyOption; | ||
import java.util.Arrays; | ||
|
||
public class TestUtils { | ||
public static String getResource(String confFile) { | ||
return System.getProperty("user.dir") + "/src/test/resources" + confFile; | ||
} | ||
|
||
public static void initPluginDir() { | ||
// copy connectors to project_root/connectors dir | ||
System.setProperty("SEATUNNEL_HOME", System.getProperty("user.dir") + | ||
String.format("%s..%s..%s", File.separator, File.separator, File.separator)); | ||
File seatunnelRootDir = new File(System.getProperty("SEATUNNEL_HOME")); | ||
|
||
File connectorDir = new File(seatunnelRootDir + | ||
File.separator + | ||
"connectors/seatunnel"); | ||
|
||
if (connectorDir.exists()) { | ||
connectorDir.delete(); | ||
} | ||
|
||
connectorDir.mkdirs(); | ||
|
||
File connectorDistDir = new File( | ||
seatunnelRootDir + | ||
File.separator + | ||
"seatunnel-connectors-v2-dist" + | ||
File.separator + | ||
"target" + | ||
File.separator + | ||
"lib"); | ||
|
||
Arrays.stream(connectorDistDir.listFiles()).forEach(file -> { | ||
if (file.getName().startsWith("connector-")) { | ||
Path copied = Paths.get(connectorDir + File.separator + file.getName()); | ||
Path originalPath = file.toPath(); | ||
try { | ||
Files.copy(originalPath, copied, StandardCopyOption.REPLACE_EXISTING); | ||
} catch (IOException e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
}); | ||
|
||
Path targetPluginMappingFile = Paths.get(seatunnelRootDir + | ||
File.separator + | ||
"connectors" + | ||
File.separator + | ||
"plugin-mapping.properties"); | ||
|
||
Path sourcePluginMappingFile = Paths.get(seatunnelRootDir + File.separator + "plugin-mapping.properties"); | ||
try { | ||
Files.copy(sourcePluginMappingFile, targetPluginMappingFile, StandardCopyOption.REPLACE_EXISTING); | ||
} catch (IOException e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.seatunnel.engine.e2e.engine; | ||
|
||
import org.apache.seatunnel.common.config.Common; | ||
import org.apache.seatunnel.common.config.DeployMode; | ||
import org.apache.seatunnel.engine.client.SeaTunnelClient; | ||
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment; | ||
import org.apache.seatunnel.engine.client.job.JobProxy; | ||
import org.apache.seatunnel.engine.common.config.ConfigProvider; | ||
import org.apache.seatunnel.engine.common.config.JobConfig; | ||
import org.apache.seatunnel.engine.common.config.SeaTunnelClientConfig; | ||
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; | ||
import org.apache.seatunnel.engine.core.job.JobStatus; | ||
import org.apache.seatunnel.engine.e2e.TestUtils; | ||
import org.apache.seatunnel.engine.server.SeaTunnelNodeContext; | ||
|
||
import com.google.common.collect.Lists; | ||
import com.hazelcast.client.config.ClientConfig; | ||
import com.hazelcast.instance.impl.HazelcastInstanceFactory; | ||
import org.junit.Assert; | ||
import org.junit.BeforeClass; | ||
import org.junit.Test; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.concurrent.ExecutionException; | ||
|
||
public class JobExecutionIT { | ||
private static final Logger LOGGER = LoggerFactory.getLogger(JobExecutionIT.class); | ||
|
||
@BeforeClass | ||
public static void beforeClass() throws Exception { | ||
SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig(); | ||
HazelcastInstanceFactory.newHazelcastInstance(seaTunnelConfig.getHazelcastConfig(), | ||
Thread.currentThread().getName(), | ||
new SeaTunnelNodeContext(ConfigProvider.locateAndGetSeaTunnelConfig())); | ||
} | ||
|
||
@Test | ||
public void testSayHello() { | ||
SeaTunnelClientConfig seaTunnelClientConfig = new SeaTunnelClientConfig(); | ||
seaTunnelClientConfig.getNetworkConfig().setAddresses(Lists.newArrayList("localhost:5801")); | ||
SeaTunnelClient engineClient = new SeaTunnelClient(seaTunnelClientConfig); | ||
|
||
String msg = "Hello world"; | ||
String s = engineClient.printMessageToMaster(msg); | ||
Assert.assertEquals(msg, s); | ||
} | ||
|
||
@Test | ||
public void testExecuteJob() { | ||
TestUtils.initPluginDir(); | ||
Common.setDeployMode(DeployMode.CLIENT); | ||
String filePath = TestUtils.getResource("/fakesource_to_file_complex.conf"); | ||
JobConfig jobConfig = new JobConfig(); | ||
jobConfig.setName("fake_to_file"); | ||
|
||
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); | ||
SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig); | ||
JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(filePath, jobConfig); | ||
|
||
JobProxy jobProxy = null; | ||
try { | ||
jobProxy = jobExecutionEnv.execute(); | ||
JobStatus jobStatus = jobProxy.waitForJobComplete(); | ||
Assert.assertEquals(JobStatus.FINISHED, jobStatus); | ||
} catch (ExecutionException | InterruptedException e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one or more | ||
# contributor license agreements. See the NOTICE file distributed with | ||
# this work for additional information regarding copyright ownership. | ||
# The ASF licenses this file to You under the Apache License, Version 2.0 | ||
# (the "License"); you may not use this file except in compliance with | ||
# the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# | ||
###### | ||
###### This config file is a demonstration of streaming processing in seatunnel config | ||
###### | ||
|
||
env { | ||
# You can set flink configuration here | ||
execution.parallelism = 1 | ||
job.mode = "STREAMING" | ||
execution.checkpoint.interval = 5000 | ||
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" | ||
} | ||
|
||
source { | ||
# This is a example source plugin **only for test and demonstrate the feature source plugin** | ||
FakeSource { | ||
result_table_name = "fake" | ||
field_name = "name,age", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use new config style? #2406 example: FakeSource {
result_table_name = "fake"
fields {
name = string
age = int
}
} |
||
parallelism = 3 | ||
} | ||
|
||
# If you would like to get more information about how to configure seatunnel and see full list of source plugins, | ||
# please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake | ||
} | ||
|
||
transform { | ||
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins, | ||
# please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
} | ||
|
||
sink { | ||
LocalFile { | ||
path="file:///tmp/hive/warehouse/test2" | ||
field_delimiter="\t" | ||
row_delimiter="\n" | ||
partition_by=["age"] | ||
partition_dir_expression="${k0}=${v0}" | ||
is_partition_field_write_in_file=true | ||
file_name_expression="${transactionId}_${now}" | ||
file_format="text" | ||
sink_columns=["name","age"] | ||
filename_time_format="yyyy.MM.dd" | ||
is_enable_transaction=true | ||
save_mode="error" | ||
|
||
} | ||
|
||
# If you would like to get more information about how to configure seatunnel and see full list of sink plugins, | ||
# please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will it stop automatically?