Skip to content

Commit

Permalink
[Feature] split transform
Browse files Browse the repository at this point in the history
  • Loading branch information
liunaijie committed Aug 5, 2024
1 parent 34a6b8e commit b9223cc
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 48 deletions.
9 changes: 9 additions & 0 deletions plugin-mapping.properties
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,12 @@ seatunnel.source.ObsFile = connector-file-obs
seatunnel.sink.ObsFile = connector-file-obs
seatunnel.source.Milvus = connector-milvus
seatunnel.sink.Milvus = connector-milvus
seatunnel.transform.Sql = seatunnel-transforms-v2
seatunnel.transform.FieldMapper = seatunnel-transforms-v2
seatunnel.transform.Filter = seatunnel-transforms-v2
seatunnel.transform.FilterRowKind = seatunnel-transforms-v2
seatunnel.transform.JsonPath = seatunnel-transforms-v2
seatunnel.transform.Replace = seatunnel-transforms-v2
seatunnel.transform.Split = seatunnel-transforms-v2
seatunnel.transform.Copy = seatunnel-transforms-v2
seatunnel.transform.DynamicCompile = seatunnel-transforms-v2
48 changes: 17 additions & 31 deletions seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,15 @@
<scope>provided</scope>
</dependencySet>

<!-- ============ Connectors Jars ============ -->
<!-- ============ Connectors Jars And Transforms V2 Jar ============ -->
<!-- SeaTunnel connectors -->
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<useTransitiveDependencies>true</useTransitiveDependencies>
<unpack>false</unpack>
<includes>
<include>org.apache.seatunnel:connector-*:jar</include>
<include>org.apache.seatunnel:seatunnel-transforms-v2:jar</include>
</includes>
<excludes>
<exclude>org.apache.seatunnel:connector-common</exclude>
Expand All @@ -160,36 +161,7 @@
<scope>provided</scope>
</dependencySet>

<!-- ============ SeaTunnel Transforms V2 Jars And SeaTunnel Hadoop3 Uber Jar ============ -->
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<useTransitiveDependencies>true</useTransitiveDependencies>
<unpack>false</unpack>
<includes>
<include>org.apache.seatunnel:seatunnel-transforms-v2:jar</include>
<include>org.apache.hadoop:hadoop-aws:jar</include>
<include>com.amazonaws:aws-java-sdk-bundle:jar</include>
<include>org.apache.seatunnel:seatunnel-hadoop3-3.1.4-uber:jar:*:optional</include>
<!--Add hadoop aliyun jar -->
<include>org.apache.hadoop:hadoop-aliyun:jar</include>
<include>com.aliyun.oss:aliyun-sdk-oss:jar</include>
<include>org.jdom:jdom:jar</include>

<!--Add netty buffer jar -->
<include>io.netty:netty-buffer:jar</include>
<include>io.netty:netty-common:jar</include>

<!--Add hive exec jar -->
<include>org.apache.hive:hive-exec:jar</include>
<include>org.apache.hive:hive-service:jar</include>
<include>org.apache.thrift:libfb303:jar</include>
</includes>
<outputFileNameMapping>${artifact.file.name}</outputFileNameMapping>
<outputDirectory>/lib</outputDirectory>
<scope>provided</scope>
</dependencySet>

<!-- =================== JDBC Connector Drivers =================== -->
<!-- =================== JDBC Connector Drivers And SeaTunnel Hadoop3 Uber Jar =================== -->
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<useTransitiveDependencies>true</useTransitiveDependencies>
Expand All @@ -209,6 +181,20 @@
<include>com.amazon.redshift:redshift-jdbc42:jar</include>
<include>net.snowflake.snowflake-jdbc:jar</include>
<include>com.xugudb:xugu-jdbc:jar</include>
<include>org.apache.hadoop:hadoop-aws:jar</include>
<include>com.amazonaws:aws-java-sdk-bundle:jar</include>
<include>org.apache.seatunnel:seatunnel-hadoop3-3.1.4-uber:jar:*:optional</include>
<!--Add hadoop aliyun jar -->
<include>org.apache.hadoop:hadoop-aliyun:jar</include>
<include>com.aliyun.oss:aliyun-sdk-oss:jar</include>
<include>org.jdom:jdom:jar</include>
<!--Add netty buffer jar -->
<include>io.netty:netty-buffer:jar</include>
<include>io.netty:netty-common:jar</include>
<!--Add hive exec jar -->
<include>org.apache.hive:hive-exec:jar</include>
<include>org.apache.hive:hive-service:jar</include>
<include>org.apache.thrift:libfb303:jar</include>
</includes>
<outputFileNameMapping>${artifact.file.name}</outputFileNameMapping>
<outputDirectory>/lib</outputDirectory>
Expand Down
6 changes: 3 additions & 3 deletions seatunnel-dist/src/main/assembly/assembly-bin.xml
Original file line number Diff line number Diff line change
Expand Up @@ -161,21 +161,20 @@
<scope>provided</scope>
</dependencySet>

<!-- ============ SeaTunnel Transforms V2 Jars And SeaTunnel Hadoop3 Uber Jar============ -->
<!-- ============ SeaTunnel Hadoop3 Uber Jar============ -->
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<useTransitiveDependencies>true</useTransitiveDependencies>
<unpack>false</unpack>
<includes>
<include>org.apache.seatunnel:seatunnel-transforms-v2:jar</include>
<include>org.apache.seatunnel:seatunnel-hadoop3-3.1.4-uber:jar:*:optional</include>
</includes>
<outputFileNameMapping>${artifact.file.name}</outputFileNameMapping>
<outputDirectory>/lib</outputDirectory>
<scope>provided</scope>
</dependencySet>

<!-- ============ Connectors Jars ============ -->
<!-- ============ Connectors Jars And Transforms V2 Jar ============ -->
<!-- SeaTunnel connectors for Demo -->
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
Expand All @@ -184,6 +183,7 @@
<includes>
<include>org.apache.seatunnel:connector-fake:jar</include>
<include>org.apache.seatunnel:connector-console:jar</include>
<include>org.apache.seatunnel:seatunnel-transforms-v2:jar</include>
</includes>
<outputDirectory>/connectors</outputDirectory>
<scope>provided</scope>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,13 +195,13 @@ public static void copySeaTunnelStarterToContainer(
MountableFile.forHostPath(startJarPath),
Paths.get(seatunnelHomeInContainer, "starter", startJarName).toString());

// copy lib
// copy transform
String transformJar = "seatunnel-transforms-v2.jar";
Path transformJarPath =
Paths.get(PROJECT_ROOT_PATH, "seatunnel-transforms-v2", "target", transformJar);
container.withCopyFileToContainer(
MountableFile.forHostPath(transformJarPath),
Paths.get(seatunnelHomeInContainer, "lib", transformJar).toString());
Paths.get(seatunnelHomeInContainer, "connectors", transformJar).toString());

// copy bin
final String startBinPath = startModulePath + File.separator + "src/main/bin/";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public ImmutablePair<List<Action>, Set<URL>> parse(ClassLoaderService classLoade
TypesafeConfigUtils.getConfigList(
seaTunnelJobConfig, "sink", Collections.emptyList());

List<URL> connectorJars = getConnectorJarList(sourceConfigs, sinkConfigs);
List<URL> connectorJars = getConnectorJarList(sourceConfigs, transformConfigs, sinkConfigs);
if (!commonPluginJars.isEmpty()) {
connectorJars.addAll(commonPluginJars);
}
Expand Down Expand Up @@ -238,18 +238,32 @@ public Set<URL> getUsedFactoryUrls(List<Action> sinkActions) {
}

private List<URL> getConnectorJarList(
List<? extends Config> sourceConfigs, List<? extends Config> sinkConfigs) {
List<? extends Config> sourceConfigs,
List<? extends Config> transformConfigs,
List<? extends Config> sinkConfigs) {
List<PluginIdentifier> factoryIds =
Stream.concat(
sourceConfigs.stream()
.map(ConfigParserUtil::getFactoryId)
.map(
factory ->
PluginIdentifier.of(
CollectionConstants
.SEATUNNEL_PLUGIN,
CollectionConstants.SOURCE_PLUGIN,
factory)),
Stream.concat(
sourceConfigs.stream()
.map(ConfigParserUtil::getFactoryId)
.map(
factory ->
PluginIdentifier.of(
CollectionConstants
.SEATUNNEL_PLUGIN,
CollectionConstants
.SOURCE_PLUGIN,
factory)),
transformConfigs.stream()
.map(ConfigParserUtil::getFactoryId)
.map(
factory ->
PluginIdentifier.of(
CollectionConstants
.SEATUNNEL_PLUGIN,
CollectionConstants
.TRANSFORM_PLUGIN,
factory))),
sinkConfigs.stream()
.map(ConfigParserUtil::getFactoryId)
.map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
public class SeaTunnelTransformPluginDiscovery extends AbstractPluginDiscovery<SeaTunnelTransform> {

public SeaTunnelTransformPluginDiscovery() {
super(Common.libDir());
super(Common.connectorDir());
}

@Override
Expand Down

0 comments on commit b9223cc

Please sign in to comment.