Skip to content

Commit

Permalink
Throw IllegalArgumentException when find multiple connector jar for o…
Browse files Browse the repository at this point in the history
…ne pluginIdentifier (apache#5551)
  • Loading branch information
ruanwenjun authored Sep 28, 2023
1 parent fd77278 commit 512d982
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import org.apache.commons.lang3.StringUtils;

import com.google.common.annotations.VisibleForTesting;

import java.io.File;
import java.io.IOException;
import java.net.URI;
Expand Down Expand Up @@ -67,7 +69,7 @@ public static DeployMode getDeployMode() {
return MODE;
}

private static String getSeaTunnelHome() {
public static String getSeaTunnelHome() {

if (StringUtils.isNotEmpty(SEATUNNEL_HOME)) {
return SEATUNNEL_HOME;
Expand All @@ -83,6 +85,11 @@ private static String getSeaTunnelHome() {
return SEATUNNEL_HOME;
}

@VisibleForTesting
public static void setSeaTunnelHome(String seatunnelHome) {
SEATUNNEL_HOME = seatunnelHome;
}

/**
* Root dir varies between different spark master and deploy mode, it also varies between
* relative and absolute path. When running seatunnel in --master local, you can put plugins
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -78,7 +79,7 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
};

private final Path pluginDir;
private final Config pluginConfig;
private final Config pluginMappingConfig;
private final BiConsumer<ClassLoader, URL> addURLToClassLoaderConsumer;
protected final ConcurrentHashMap<PluginIdentifier, Optional<URL>> pluginJarPath =
new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
Expand All @@ -95,16 +96,16 @@ public AbstractPluginDiscovery(Path pluginDir) {
this(pluginDir, loadConnectorPluginConfig());
}

public AbstractPluginDiscovery(Path pluginDir, Config pluginConfig) {
this(pluginDir, pluginConfig, DEFAULT_URL_TO_CLASSLOADER);
public AbstractPluginDiscovery(Path pluginDir, Config pluginMappingConfig) {
this(pluginDir, pluginMappingConfig, DEFAULT_URL_TO_CLASSLOADER);
}

public AbstractPluginDiscovery(
Path pluginDir,
Config pluginConfig,
Config pluginMappingConfig,
BiConsumer<ClassLoader, URL> addURLToClassLoaderConsumer) {
this.pluginDir = pluginDir;
this.pluginConfig = pluginConfig;
this.pluginMappingConfig = pluginMappingConfig;
this.addURLToClassLoaderConsumer = addURLToClassLoaderConsumer;
log.info("Load {} Plugin from {}", getPluginBaseClass().getSimpleName(), pluginDir);
}
Expand Down Expand Up @@ -328,16 +329,13 @@ protected Optional<URL> getPluginJarPath(PluginIdentifier pluginIdentifier) {
* @return plugin jar path.
*/
private Optional<URL> findPluginJarPath(PluginIdentifier pluginIdentifier) {
if (pluginConfig.isEmpty()) {
return Optional.empty();
}
final String engineType = pluginIdentifier.getEngineType().toLowerCase();
final String pluginType = pluginIdentifier.getPluginType().toLowerCase();
final String pluginName = pluginIdentifier.getPluginName().toLowerCase();
if (!pluginConfig.hasPath(engineType)) {
if (!pluginMappingConfig.hasPath(engineType)) {
return Optional.empty();
}
Config engineConfig = pluginConfig.getConfig(engineType);
Config engineConfig = pluginMappingConfig.getConfig(engineType);
if (!engineConfig.hasPath(pluginType)) {
return Optional.empty();
}
Expand Down Expand Up @@ -365,15 +363,24 @@ public boolean accept(File pathname) {
if (ArrayUtils.isEmpty(targetPluginFiles)) {
return Optional.empty();
}
if (targetPluginFiles.length > 1) {
throw new IllegalArgumentException(
"Found multiple plugin jar: "
+ Arrays.stream(targetPluginFiles)
.map(File::getPath)
.collect(Collectors.joining(","))
+ " for pluginIdentifier: "
+ pluginIdentifier);
}
try {
URL pluginJarPath = targetPluginFiles[0].toURI().toURL();
log.info(
"Discovery plugin jar: {} at: {}",
pluginIdentifier.getPluginName(),
pluginJarPath);
log.info("Discovery plugin jar for: {} at: {}", pluginIdentifier, pluginJarPath);
return Optional.of(pluginJarPath);
} catch (MalformedURLException e) {
log.warn("Cannot get plugin URL: " + targetPluginFiles[0], e);
log.warn(
"Cannot get plugin URL: {} for pluginIdentifier: {}" + targetPluginFiles[0],
pluginIdentifier,
e);
return Optional.empty();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,33 @@
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.common.constants.PluginType;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;

import java.util.Map;
import java.util.Objects;

@DisabledOnOs(OS.WINDOWS)
public class AbstractPluginDiscoveryTest {

private String originSeatunnelHome = null;
private DeployMode originMode = null;
private static final String seatunnelHome =
AbstractPluginDiscoveryTest.class.getResource("/home").getPath();

@BeforeEach
public void before() {
originMode = Common.getDeployMode();
Common.setDeployMode(DeployMode.CLIENT);
originSeatunnelHome = Common.getSeaTunnelHome();
Common.setSeaTunnelHome(seatunnelHome);
}

@Test
public void testGetAllPlugins() {
Common.setDeployMode(DeployMode.CLIENT);
System.setProperty(
"SEATUNNEL_HOME",
Objects.requireNonNull(AbstractPluginDiscoveryTest.class.getResource("/home"))
.getPath());
Map<PluginIdentifier, String> sourcePlugins =
AbstractPluginDiscovery.getAllSupportedPlugins(PluginType.SOURCE);
Assertions.assertEquals(27, sourcePlugins.size());
Expand All @@ -47,4 +56,10 @@ public void testGetAllPlugins() {
AbstractPluginDiscovery.getAllSupportedPlugins(PluginType.SINK);
Assertions.assertEquals(30, sinkPlugins.size());
}

@AfterEach
public void after() {
Common.setSeaTunnelHome(originSeatunnelHome);
Common.setDeployMode(originMode);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.plugin.discovery.seatunnel;

import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;

import com.google.common.collect.Lists;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;

@DisabledOnOs(OS.WINDOWS)
class SeaTunnelSourcePluginDiscoveryTest {

private String originSeatunnelHome = null;
private DeployMode originMode = null;
private static final String seatunnelHome =
SeaTunnelSourcePluginDiscoveryTest.class.getResource("/duplicate").getPath();
private static final List<Path> pluginJars =
Lists.newArrayList(
Paths.get(seatunnelHome, "connectors", "connector-http-jira.jar"),
Paths.get(seatunnelHome, "connectors", "connector-http.jar"));

@BeforeEach
public void before() throws IOException {
originMode = Common.getDeployMode();
Common.setDeployMode(DeployMode.CLIENT);
originSeatunnelHome = Common.getSeaTunnelHome();
Common.setSeaTunnelHome(seatunnelHome);

// The file is created under target directory.
for (Path pluginJar : pluginJars) {
Files.createFile(pluginJar);
}
}

@Test
void getPluginBaseClass() {
List<PluginIdentifier> pluginIdentifiers =
Lists.newArrayList(
PluginIdentifier.of("seatunnel", PluginType.SOURCE.getType(), "HttpJira"),
PluginIdentifier.of("seatunnel", PluginType.SOURCE.getType(), "HttpBase"));
SeaTunnelSourcePluginDiscovery seaTunnelSourcePluginDiscovery =
new SeaTunnelSourcePluginDiscovery();
Assertions.assertThrows(
IllegalArgumentException.class,
() -> seaTunnelSourcePluginDiscovery.getPluginJarPaths(pluginIdentifiers));
}

@AfterEach
public void after() throws IOException {
for (Path pluginJar : pluginJars) {
Files.deleteIfExists(pluginJar);
}
Common.setSeaTunnelHome(originSeatunnelHome);
Common.setDeployMode(originMode);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

seatunnel.source.HttpBase = connector-http
seatunnel.sink.HttpBase = connector-http
seatunnel.source.HttpJira = connector-http-jira
seatunnel.sink.HttpJira = connector-http-jira

0 comments on commit 512d982

Please sign in to comment.