Skip to content

Commit

Permalink
Throw IllegalArgumentException when find multiple connector jar for o…
Browse files Browse the repository at this point in the history
…ne pluginIdentifier
  • Loading branch information
ruanwenjun committed Sep 25, 2023
1 parent 17a4164 commit d972566
Showing 1 changed file with 36 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,8 @@

package org.apache.seatunnel.plugin.discovery;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.common.PluginIdentifierInterface;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.factory.Factory;
Expand All @@ -34,11 +31,10 @@
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.common.utils.ReflectionUtils;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;

import lombok.extern.slf4j.Slf4j;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;

import java.io.File;
import java.io.FileFilter;
Expand All @@ -47,6 +43,7 @@
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -59,6 +56,8 @@
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {

Expand All @@ -78,7 +77,7 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
};

private final Path pluginDir;
private final Config pluginConfig;
private final Config pluginMappingConfig;
private final BiConsumer<ClassLoader, URL> addURLToClassLoaderConsumer;
protected final ConcurrentHashMap<PluginIdentifier, Optional<URL>> pluginJarPath =
new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
Expand All @@ -95,16 +94,16 @@ public AbstractPluginDiscovery(Path pluginDir) {
this(pluginDir, loadConnectorPluginConfig());
}

public AbstractPluginDiscovery(Path pluginDir, Config pluginConfig) {
this(pluginDir, pluginConfig, DEFAULT_URL_TO_CLASSLOADER);
public AbstractPluginDiscovery(Path pluginDir, Config pluginMappingConfig) {
this(pluginDir, pluginMappingConfig, DEFAULT_URL_TO_CLASSLOADER);
}

public AbstractPluginDiscovery(
Path pluginDir,
Config pluginConfig,
BiConsumer<ClassLoader, URL> addURLToClassLoaderConsumer) {
Path pluginDir,
Config pluginMappingConfig,
BiConsumer<ClassLoader, URL> addURLToClassLoaderConsumer) {
this.pluginDir = pluginDir;
this.pluginConfig = pluginConfig;
this.pluginMappingConfig = pluginMappingConfig;
this.addURLToClassLoaderConsumer = addURLToClassLoaderConsumer;
log.info("Load {} Plugin from {}", getPluginBaseClass().getSimpleName(), pluginDir);
}
Expand Down Expand Up @@ -328,52 +327,48 @@ protected Optional<URL> getPluginJarPath(PluginIdentifier pluginIdentifier) {
* @return plugin jar path.
*/
private Optional<URL> findPluginJarPath(PluginIdentifier pluginIdentifier) {
if (pluginConfig.isEmpty()) {
return Optional.empty();
}
final String engineType = pluginIdentifier.getEngineType().toLowerCase();
final String pluginType = pluginIdentifier.getPluginType().toLowerCase();
final String pluginName = pluginIdentifier.getPluginName().toLowerCase();
if (!pluginConfig.hasPath(engineType)) {
if (pluginMappingConfig.isEmpty() || !pluginMappingConfig.hasPath(engineType)) {
return Optional.empty();
}
Config engineConfig = pluginConfig.getConfig(engineType);
Config engineConfig = pluginMappingConfig.getConfig(engineType);
if (!engineConfig.hasPath(pluginType)) {
return Optional.empty();
}
Config typeConfig = engineConfig.getConfig(pluginType);
Optional<Map.Entry<String, ConfigValue>> optional =
typeConfig.entrySet().stream()
.filter(entry -> StringUtils.equalsIgnoreCase(entry.getKey(), pluginName))
.findFirst();
typeConfig.entrySet().stream()
.filter(entry -> StringUtils.equalsIgnoreCase(entry.getKey(), pluginName))
.findFirst();
if (!optional.isPresent()) {
return Optional.empty();
}
String pluginJarPrefix = optional.get().getValue().unwrapped().toString();
File[] targetPluginFiles =
pluginDir
.toFile()
.listFiles(
new FileFilter() {
@Override
public boolean accept(File pathname) {
return pathname.getName().endsWith(".jar")
&& StringUtils.startsWithIgnoreCase(
pathname.getName(), pluginJarPrefix);
}
});
pluginDir
.toFile()
.listFiles(
new FileFilter() {
@Override
public boolean accept(File pathname) {
return pathname.getName().endsWith(".jar") && StringUtils.startsWithIgnoreCase(pathname.getName(), pluginJarPrefix);
}
});
if (ArrayUtils.isEmpty(targetPluginFiles)) {
return Optional.empty();
}
if (targetPluginFiles.length > 1) {
throw new IllegalArgumentException("Found multiple plugin jar: " +
Arrays.stream(targetPluginFiles).map(File::getPath).collect(Collectors.joining(",")) + " for pluginIdentifier: " + pluginIdentifier);
}
try {
URL pluginJarPath = targetPluginFiles[0].toURI().toURL();
log.info(
"Discovery plugin jar: {} at: {}",
pluginIdentifier.getPluginName(),
pluginJarPath);
log.info("Discovery plugin jar for: {} at: {}", pluginIdentifier, pluginJarPath);
return Optional.of(pluginJarPath);
} catch (MalformedURLException e) {
log.warn("Cannot get plugin URL: " + targetPluginFiles[0], e);
log.warn("Cannot get plugin URL: {} for pluginIdentifier: {}" + targetPluginFiles[0], pluginIdentifier, e);
return Optional.empty();
}
}
Expand Down

0 comments on commit d972566

Please sign in to comment.