diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java index 88a13fe781b..e6ca3cbe499 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java @@ -19,6 +19,8 @@ import org.apache.commons.lang3.StringUtils; +import com.google.common.annotations.VisibleForTesting; + import java.io.File; import java.io.IOException; import java.net.URI; @@ -67,7 +69,7 @@ public static DeployMode getDeployMode() { return MODE; } - private static String getSeaTunnelHome() { + public static String getSeaTunnelHome() { if (StringUtils.isNotEmpty(SEATUNNEL_HOME)) { return SEATUNNEL_HOME; @@ -83,6 +85,11 @@ private static String getSeaTunnelHome() { return SEATUNNEL_HOME; } + @VisibleForTesting + public static void setSeaTunnelHome(String seatunnelHome) { + SEATUNNEL_HOME = seatunnelHome; + } + /** * Root dir varies between different spark master and deploy mode, it also varies between * relative and absolute path. When running seatunnel in --master local, you can put plugins diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java index ee27429fce8..a2006f7dc9d 100644 --- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java +++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java @@ -47,6 +47,7 @@ import java.net.URL; import java.net.URLClassLoader; import java.nio.file.Path; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -78,7 +79,7 @@ public abstract class AbstractPluginDiscovery implements PluginDiscovery { }; private final Path pluginDir; - private final Config pluginConfig; + private final Config pluginMappingConfig; private final BiConsumer addURLToClassLoaderConsumer; protected final ConcurrentHashMap> pluginJarPath = new ConcurrentHashMap<>(Common.COLLECTION_SIZE); @@ -95,16 +96,16 @@ public AbstractPluginDiscovery(Path pluginDir) { this(pluginDir, loadConnectorPluginConfig()); } - public AbstractPluginDiscovery(Path pluginDir, Config pluginConfig) { - this(pluginDir, pluginConfig, DEFAULT_URL_TO_CLASSLOADER); + public AbstractPluginDiscovery(Path pluginDir, Config pluginMappingConfig) { + this(pluginDir, pluginMappingConfig, DEFAULT_URL_TO_CLASSLOADER); } public AbstractPluginDiscovery( Path pluginDir, - Config pluginConfig, + Config pluginMappingConfig, BiConsumer addURLToClassLoaderConsumer) { this.pluginDir = pluginDir; - this.pluginConfig = pluginConfig; + this.pluginMappingConfig = pluginMappingConfig; this.addURLToClassLoaderConsumer = addURLToClassLoaderConsumer; log.info("Load {} Plugin from {}", getPluginBaseClass().getSimpleName(), pluginDir); } @@ -328,16 +329,13 @@ protected Optional getPluginJarPath(PluginIdentifier pluginIdentifier) { * @return plugin jar path. */ private Optional findPluginJarPath(PluginIdentifier pluginIdentifier) { - if (pluginConfig.isEmpty()) { - return Optional.empty(); - } final String engineType = pluginIdentifier.getEngineType().toLowerCase(); final String pluginType = pluginIdentifier.getPluginType().toLowerCase(); final String pluginName = pluginIdentifier.getPluginName().toLowerCase(); - if (!pluginConfig.hasPath(engineType)) { + if (!pluginMappingConfig.hasPath(engineType)) { return Optional.empty(); } - Config engineConfig = pluginConfig.getConfig(engineType); + Config engineConfig = pluginMappingConfig.getConfig(engineType); if (!engineConfig.hasPath(pluginType)) { return Optional.empty(); } @@ -365,15 +363,24 @@ public boolean accept(File pathname) { if (ArrayUtils.isEmpty(targetPluginFiles)) { return Optional.empty(); } + if (targetPluginFiles.length > 1) { + throw new IllegalArgumentException( + "Found multiple plugin jar: " + + Arrays.stream(targetPluginFiles) + .map(File::getPath) + .collect(Collectors.joining(",")) + + " for pluginIdentifier: " + + pluginIdentifier); + } try { URL pluginJarPath = targetPluginFiles[0].toURI().toURL(); - log.info( - "Discovery plugin jar: {} at: {}", - pluginIdentifier.getPluginName(), - pluginJarPath); + log.info("Discovery plugin jar for: {} at: {}", pluginIdentifier, pluginJarPath); return Optional.of(pluginJarPath); } catch (MalformedURLException e) { - log.warn("Cannot get plugin URL: " + targetPluginFiles[0], e); + log.warn( + "Cannot get plugin URL: {} for pluginIdentifier: {}" + targetPluginFiles[0], + pluginIdentifier, + e); return Optional.empty(); } } diff --git a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscoveryTest.java b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscoveryTest.java index 5141d7b71dd..379fcd2aceb 100644 --- a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscoveryTest.java +++ b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscoveryTest.java @@ -21,24 +21,33 @@ import org.apache.seatunnel.common.config.DeployMode; import org.apache.seatunnel.common.constants.PluginType; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.OS; import java.util.Map; -import java.util.Objects; @DisabledOnOs(OS.WINDOWS) public class AbstractPluginDiscoveryTest { + private String originSeatunnelHome = null; + private DeployMode originMode = null; + private static final String seatunnelHome = + AbstractPluginDiscoveryTest.class.getResource("/home").getPath(); + + @BeforeEach + public void before() { + originMode = Common.getDeployMode(); + Common.setDeployMode(DeployMode.CLIENT); + originSeatunnelHome = Common.getSeaTunnelHome(); + Common.setSeaTunnelHome(seatunnelHome); + } + @Test public void testGetAllPlugins() { - Common.setDeployMode(DeployMode.CLIENT); - System.setProperty( - "SEATUNNEL_HOME", - Objects.requireNonNull(AbstractPluginDiscoveryTest.class.getResource("/home")) - .getPath()); Map sourcePlugins = AbstractPluginDiscovery.getAllSupportedPlugins(PluginType.SOURCE); Assertions.assertEquals(27, sourcePlugins.size()); @@ -47,4 +56,10 @@ public void testGetAllPlugins() { AbstractPluginDiscovery.getAllSupportedPlugins(PluginType.SINK); Assertions.assertEquals(30, sinkPlugins.size()); } + + @AfterEach + public void after() { + Common.setSeaTunnelHome(originSeatunnelHome); + Common.setDeployMode(originMode); + } } diff --git a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java new file mode 100644 index 00000000000..81333d4b4df --- /dev/null +++ b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.plugin.discovery.seatunnel; + +import org.apache.seatunnel.common.config.Common; +import org.apache.seatunnel.common.config.DeployMode; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.plugin.discovery.PluginIdentifier; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; + +import com.google.common.collect.Lists; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; + +@DisabledOnOs(OS.WINDOWS) +class SeaTunnelSourcePluginDiscoveryTest { + + private String originSeatunnelHome = null; + private DeployMode originMode = null; + private static final String seatunnelHome = + SeaTunnelSourcePluginDiscoveryTest.class.getResource("/duplicate").getPath(); + private static final List pluginJars = + Lists.newArrayList( + Paths.get(seatunnelHome, "connectors", "connector-http-jira.jar"), + Paths.get(seatunnelHome, "connectors", "connector-http.jar")); + + @BeforeEach + public void before() throws IOException { + originMode = Common.getDeployMode(); + Common.setDeployMode(DeployMode.CLIENT); + originSeatunnelHome = Common.getSeaTunnelHome(); + Common.setSeaTunnelHome(seatunnelHome); + + // The file is created under target directory. + for (Path pluginJar : pluginJars) { + Files.createFile(pluginJar); + } + } + + @Test + void getPluginBaseClass() { + List pluginIdentifiers = + Lists.newArrayList( + PluginIdentifier.of("seatunnel", PluginType.SOURCE.getType(), "HttpJira"), + PluginIdentifier.of("seatunnel", PluginType.SOURCE.getType(), "HttpBase")); + SeaTunnelSourcePluginDiscovery seaTunnelSourcePluginDiscovery = + new SeaTunnelSourcePluginDiscovery(); + Assertions.assertThrows( + IllegalArgumentException.class, + () -> seaTunnelSourcePluginDiscovery.getPluginJarPaths(pluginIdentifiers)); + } + + @AfterEach + public void after() throws IOException { + for (Path pluginJar : pluginJars) { + Files.deleteIfExists(pluginJar); + } + Common.setSeaTunnelHome(originSeatunnelHome); + Common.setDeployMode(originMode); + } +} diff --git a/seatunnel-plugin-discovery/src/test/resources/duplicate/connectors/plugin-mapping.properties b/seatunnel-plugin-discovery/src/test/resources/duplicate/connectors/plugin-mapping.properties new file mode 100644 index 00000000000..be38939a7f0 --- /dev/null +++ b/seatunnel-plugin-discovery/src/test/resources/duplicate/connectors/plugin-mapping.properties @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +seatunnel.source.HttpBase = connector-http +seatunnel.sink.HttpBase = connector-http +seatunnel.source.HttpJira = connector-http-jira +seatunnel.sink.HttpJira = connector-http-jira \ No newline at end of file