diff --git a/common/src/main/java/de/xab/porter/common/spi/ExtensionLoader.java b/common/src/main/java/de/xab/porter/common/spi/ExtensionLoader.java
index a9fad9b..41bf16f 100644
--- a/common/src/main/java/de/xab/porter/common/spi/ExtensionLoader.java
+++ b/common/src/main/java/de/xab/porter/common/spi/ExtensionLoader.java
@@ -1,7 +1,5 @@
package de.xab.porter.common.spi;
-import de.xab.porter.api.exception.PorterException;
-
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
@@ -15,163 +13,148 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import static de.xab.porter.common.util.Strings.notNullOrEmpty;
-
/**
* a extension loader can load any implements of one service, which is describe in resources/extensions.
* ExtensionLoader load class using current class loader, construct new instance and inject type for consuming
*
* extension must have these features:
* implement at least one service,
- * registered in resources/extensions
+ * registered in resources/META-INF/porter
*
- * any service must opens to module {@link de.xab.porter.common}
+ * any extensions must opens to module {@link de.xab.porter.common}
*
* comments after # will be ignored
*/
-public class ExtensionLoader {
+public class ExtensionLoader {
private static final String FOLDER = "META-INF/porter/";
- private static final Map, Map>> EXTENSION_HOLDER = new ConcurrentHashMap<>();
+ private static final Map, ExtensionLoader>> LOADERS = new ConcurrentHashMap<>();
+ private final Map> extensions = new ConcurrentHashMap<>();
+ private Class service;
- public static ExtensionLoader getExtensionLoader() {
- return new ExtensionLoader();
+ public static ExtensionLoader getExtensionLoader(Class service) {
+ if (service == null) {
+ throw new IllegalArgumentException("service is null");
+ }
+ if (!service.isInterface()) {
+ throw new IllegalArgumentException(String.format("service %s is not a interface", service));
+ }
+ ExtensionLoader loader = (ExtensionLoader) LOADERS.get(service);
+ if (loader == null) {
+ LOADERS.putIfAbsent(service, new ExtensionLoader());
+ loader = (ExtensionLoader) LOADERS.get(service);
+ loader.service = service;
+ }
+ return loader;
}
- public T loadExtension(String type, Class clazz) {
- final Class> driverClass = loadClass(type, clazz);
- if (!driverClass.getModule().isOpen(driverClass.getPackageName(), this.getClass().getModule())) {
- throw new PorterException(String.format(
- "cannot access class %s at %s", driverClass.getName(), this.getClass().getModule()));
+ public T loadExtension(String type) {
+ Class clazz = loadExtensionClass(type);
+ if (!clazz.getModule().isOpen(clazz.getPackageName(), this.getClass().getModule())) {
+ throw new RuntimeException(String.format(
+ "cannot access class %s at %s", clazz.getName(), this.getClass().getModule()));
}
try {
- final T instance = (T) driverClass.getConstructor().newInstance();
+ T instance = clazz.getConstructor().newInstance();
injectExtension(instance, type);
return instance;
- } catch (InstantiationException | IllegalAccessException
- | InvocationTargetException | NoSuchMethodException e) {
- throw new PorterException("extension construct failed", e);
+ } catch (InstantiationException | NoSuchMethodException
+ | IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException(String.format("cannot create extension %s of %s", type, this.service), e);
}
}
- /**
- * load Class of extensions. PorterException will be thrown as load failed
- *
- * @param type exactly type of extension
- * @param clazz Class of service
- * @return Class of extension
- */
- public Class> loadClass(String type, Class> clazz) {
- Class> subClass = EXTENSION_HOLDER.computeIfAbsent(clazz, ignored -> new ConcurrentHashMap<>()).get(type);
- if (subClass != null) {
- return subClass;
- }
- final String fileName = FOLDER + clazz.getName();
- final Enumeration urls;
- final ClassLoader cl = findClassLoader(clazz);
- try {
- urls = cl.getResources(fileName);
- } catch (IOException e) {
- throw new PorterException(String.format("no implement(s) of %s found in %s", clazz.getName(), fileName));
+ private Class loadExtensionClass(String type) {
+ Class extensionClass = this.extensions.get(type);
+ ClassLoader classLoader = findClassLoader(this.service);
+ if (extensionClass == null) {
+ synchronized (this.extensions) {
+ extensionClass = this.extensions.get(type);
+ if (extensionClass == null) {
+ String extensionName = null;
+ try {
+ extensionName = findExtensionName(classLoader, type);
+ extensionClass = (Class) classLoader.loadClass(extensionName);
+ } catch (IOException e) {
+ throw new TypeNotPresentException(type, e);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException(
+ String.format("cannot load class %s for %s: %s", extensionName, type, this.service));
+ }
+ if (!implementedInterface(extensionClass, this.service)) {
+ throw new IllegalStateException(extensionName + " not implemented " + this.service);
+ }
+ this.extensions.put(type, extensionClass);
+ }
+ }
}
+ return extensionClass;
+ }
+
+ private String findExtensionName(ClassLoader classLoader, String type) throws IOException {
+ Enumeration urls;
+ String resourceFolder = FOLDER + this.service.getName();
+ urls = classLoader.getResources(resourceFolder);
while (urls != null && urls.hasMoreElements()) {
- final URL url = urls.nextElement();
+ URL url = urls.nextElement();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(
url.openStream(), StandardCharsets.UTF_8))) {
String line;
- final Map> specifyClassMap = EXTENSION_HOLDER.get(clazz);
while ((line = reader.readLine()) != null) {
- Map.Entry entry = parseTypeAndClass(line, clazz);
- String driverType = entry.getKey();
- String className = entry.getValue();
- if (notNullOrEmpty(className)) {
- subClass = loadClassByName(cl, clazz, className);
- specifyClassMap.putIfAbsent(driverType, subClass);
- EXTENSION_HOLDER.put(clazz, specifyClassMap);
- if (type.equals(driverType)) {
- return subClass;
- }
+ Map.Entry tuple = parseLine(line);
+ if (tuple == null) {
+ continue;
+ }
+ String extensionType = tuple.getKey();
+ String extensionName = tuple.getValue();
+ if (type.equals(extensionType)) {
+ return extensionName;
}
}
- } catch (IOException e) {
- throw new PorterException(String.format("load extension class %s failed", clazz.getName()), e);
}
}
- throw new PorterException(String.format("type `%s` of extension %s not found", type, clazz.getName()));
+ throw new IOException("no appropriate type found for " + this.service);
}
- private Map.Entry parseTypeAndClass(String line, Class> clazz) {
- String driverType;
- String className;
- String newLine = line;
- final int ci = newLine.indexOf('#');
+ private Map.Entry parseLine(String origin) {
+ String extensionType;
+ String extensionName;
+ String line = origin;
+ int ci = line.indexOf('#');
if (ci >= 0) {
- newLine = newLine.substring(0, ci);
+ line = line.substring(0, ci);
}
- newLine = newLine.trim();
- if (newLine.length() > 0) {
- final String[] split = newLine.split("=");
+ line = line.trim();
+ if (line.length() > 0) {
+ final String[] split = line.split("=");
if (split.length == 2) {
- driverType = split[0];
- className = split[1];
- return Map.entry(driverType, className);
- }
- }
- throw new PorterException(String.format(
- "parse extension %s failed. expected `type=foo.bar.Extension`, got %s", clazz.getSimpleName(), line));
- }
-
- /**
- * load class by given Class name
- *
- * @param loader classloader to load class
- * @param service service of class implemented
- * @param className class name to be loaded
- * @return Class if load successfully
- */
- private Class> loadClassByName(ClassLoader loader, Class> service, String className) {
- try {
- Class> subClass = loader.loadClass(className);
- if (implementedInterface(subClass, service)) {
- return subClass;
- } else {
- throw new PorterException(String.format("no implement classes found of class %s", service));
+ extensionType = split[0];
+ extensionName = split[1];
+ return Map.entry(extensionType, extensionName);
}
- } catch (ClassNotFoundException e) {
- throw new PorterException(String.format("class %s not found", className), e);
}
+ return null;
}
- /**
- * whether extension defined in resources implemented service.
- *
- * @param subClass extension Class
- * @param interfaceClass service Class
- * @return true if extension if sub class of service
- */
- private boolean implementedInterface(Class> subClass, Class> interfaceClass) {
- Class> currentClass = subClass;
+ private boolean implementedInterface(Class extensionClass, Class serviceClass) {
+ Class> currentClass = extensionClass;
boolean isImplemented = false;
while (!isImplemented) {
if (currentClass == Object.class) {
break;
}
isImplemented = Arrays.stream(currentClass.getInterfaces()).
- anyMatch(oneInterface -> oneInterface == interfaceClass);
+ anyMatch(oneInterface -> oneInterface == serviceClass);
currentClass = currentClass.getSuperclass();
}
return isImplemented;
}
- private void injectExtension(T instance, String type) {
+ private void injectExtension(T instance, String type) throws InvocationTargetException, IllegalAccessException {
for (Method method : instance.getClass().getMethods()) {
if (!isTypeSetter(method)) {
continue;
}
- try {
- method.invoke(instance, type);
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new PorterException(String.format("instance %s inject %s failed", instance, type));
- }
+ method.invoke(instance, type);
}
}
@@ -181,7 +164,7 @@ private boolean isTypeSetter(Method method) {
&& method.getParameterCount() == 1;
}
- private ClassLoader findClassLoader(Class> clazz) {
+ private ClassLoader findClassLoader(Class clazz) {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
if (cl == null) {
cl = clazz.getClassLoader();
@@ -191,4 +174,4 @@ private ClassLoader findClassLoader(Class> clazz) {
}
return cl;
}
-}
+}
\ No newline at end of file
diff --git a/common/src/main/java/de/xab/porter/common/util/Strings.java b/common/src/main/java/de/xab/porter/common/util/Strings.java
index 7135ddb..76a64a8 100644
--- a/common/src/main/java/de/xab/porter/common/util/Strings.java
+++ b/common/src/main/java/de/xab/porter/common/util/Strings.java
@@ -7,7 +7,7 @@ public final class Strings {
private Strings() {
}
- public static boolean notNullOrEmpty(String str) {
+ public static boolean notNullOrBlank(String str) {
return str != null && !str.isBlank();
}
}
diff --git a/common/src/test/java/de/xab/porter/common/test/spi/SPITest.java b/common/src/test/java/de/xab/porter/common/test/spi/SPITest.java
index 229b2dd..8b7d9cb 100644
--- a/common/src/test/java/de/xab/porter/common/test/spi/SPITest.java
+++ b/common/src/test/java/de/xab/porter/common/test/spi/SPITest.java
@@ -1,6 +1,5 @@
package de.xab.porter.common.test.spi;
-import de.xab.porter.api.exception.PorterException;
import de.xab.porter.common.spi.ExtensionLoader;
import de.xab.porter.common.test.spi.service.MockService;
import de.xab.porter.common.test.spi.service.UnregisteredService;
@@ -15,31 +14,31 @@
public class SPITest {
@Test
public void testExists() {
- MockService impl = ExtensionLoader.getExtensionLoader().loadExtension("impl", MockService.class);
+ MockService impl = ExtensionLoader.getExtensionLoader(MockService.class).loadExtension("impl");
assertEquals("hello world", impl.mock());
}
@Test
public void testNotExists() {
- assertThrows(PorterException.class, () ->
- ExtensionLoader.getExtensionLoader().loadExtension("none", MockService.class));
+ assertThrows(TypeNotPresentException.class, () ->
+ ExtensionLoader.getExtensionLoader(MockService.class).loadExtension("none"));
}
@Test
public void testTypoImpl() {
- assertThrows(PorterException.class, () ->
- ExtensionLoader.getExtensionLoader().loadExtension("typo", MockService.class));
+ assertThrows(IllegalArgumentException.class, () ->
+ ExtensionLoader.getExtensionLoader(MockService.class).loadExtension("typo"));
}
@Test
public void testNoImplemented() {
- assertThrows(PorterException.class, () ->
- ExtensionLoader.getExtensionLoader().loadExtension("noimpl", MockService.class));
+ assertThrows(IllegalStateException.class, () ->
+ ExtensionLoader.getExtensionLoader(MockService.class).loadExtension("noimpl"));
}
@Test
public void testNoneRegistered() {
- assertThrows(PorterException.class, () ->
- ExtensionLoader.getExtensionLoader().loadExtension("any", UnregisteredService.class));
+ assertThrows(TypeNotPresentException.class, () ->
+ ExtensionLoader.getExtensionLoader(UnregisteredService.class).loadExtension("any"));
}
}
diff --git a/common/src/test/java/de/xab/porter/common/test/string/StringTest.java b/common/src/test/java/de/xab/porter/common/test/string/StringTest.java
index 71dab34..226db77 100644
--- a/common/src/test/java/de/xab/porter/common/test/string/StringTest.java
+++ b/common/src/test/java/de/xab/porter/common/test/string/StringTest.java
@@ -13,24 +13,24 @@ public class StringTest {
@Test
public void testNull() {
String str = null;
- assertFalse(Strings.notNullOrEmpty(str));
+ assertFalse(Strings.notNullOrBlank(str));
}
@Test
public void testEmpty() {
String str = "";
- assertFalse(Strings.notNullOrEmpty(str));
+ assertFalse(Strings.notNullOrBlank(str));
}
@Test
public void testNotNullOrEmpty() {
String str = "abc";
- Assertions.assertTrue(Strings.notNullOrEmpty(str));
+ Assertions.assertTrue(Strings.notNullOrBlank(str));
}
@Test
public void testLongEmpty() {
String str = " ";
- assertFalse(Strings.notNullOrEmpty(str));
+ assertFalse(Strings.notNullOrBlank(str));
}
}
diff --git a/core/src/main/java/de/xab/porter/core/Task.java b/core/src/main/java/de/xab/porter/core/Task.java
index daf08fa..c8a005d 100644
--- a/core/src/main/java/de/xab/porter/core/Task.java
+++ b/core/src/main/java/de/xab/porter/core/Task.java
@@ -27,8 +27,8 @@ public Task(Context context) {
}
public void init() {
- final SrcConnection srcConnection = context.getSrcConnection();
- this.reader = ExtensionLoader.getExtensionLoader().loadExtension(srcConnection.getType(), Reader.class);
+ SrcConnection srcConnection = context.getSrcConnection();
+ this.reader = ExtensionLoader.getExtensionLoader(Reader.class).loadExtension(srcConnection.getType());
this.reader.setChannels(new ArrayList<>());
register();
//todo split
@@ -38,15 +38,15 @@ public void init() {
* construct relations among reader, writer and its channel, define the action when channel is ready to write
*/
public void register() {
- final List sinkConnections = context.getSinkConnections();
+ List sinkConnections = context.getSinkConnections();
this.writers = sinkConnections.stream().
map(sink -> {
- final Writer writer = ExtensionLoader.getExtensionLoader().
- loadExtension(sink.getType(), Writer.class);
- final Object dataSource = writer.getDataSource(sink);
- final Object connection = writer.connect(sink, writer.getDataSource(sink));
- final Channel channel = ExtensionLoader.getExtensionLoader().
- loadExtension(this.context.getProperties().getChannel(), Channel.class);
+ Writer writer = ExtensionLoader.getExtensionLoader(Writer.class).
+ loadExtension(sink.getType());
+ Object dataSource = writer.getDataSource(sink);
+ Object connection = writer.connect(sink, writer.getDataSource(sink));
+ Channel channel = ExtensionLoader.getExtensionLoader(Channel.class).
+ loadExtension(this.context.getProperties().getChannel());
channel.setOnReadListener(data -> writer.write(connection, dataSource, sink, data));
reader.getChannels().add(channel);
return Map.entry(writer, channel);
@@ -58,9 +58,9 @@ public void register() {
* init source behavior by sinks properties
*/
private void registerProperties() {
- final SrcConnection srcConnection = context.getSrcConnection();
- final SrcConnection.Properties srcConnectionProperties = srcConnection.getProperties();
- final List sinkConnections = context.getSinkConnections();
+ SrcConnection srcConnection = context.getSrcConnection();
+ SrcConnection.Properties srcConnectionProperties = srcConnection.getProperties();
+ List sinkConnections = context.getSinkConnections();
srcConnectionProperties.setCreate(
sinkConnections.stream().map(SinkConnection::getProperties).
anyMatch(SinkConnection.Properties::isCreate));
@@ -70,7 +70,7 @@ private void registerProperties() {
* start a transmission task
*/
public void start() {
- final Object dataSource = reader.getDataSource(context.getSrcConnection());
+ Object dataSource = reader.getDataSource(context.getSrcConnection());
Object connection = null;
try {
connection = reader.connect(context.getSrcConnection(), dataSource);
diff --git a/transfer/jdbc/src/main/java/de/xab/porter/transfer/jdbc/reader/JDBCReader.java b/transfer/jdbc/src/main/java/de/xab/porter/transfer/jdbc/reader/JDBCReader.java
index ce81285..06d41e3 100644
--- a/transfer/jdbc/src/main/java/de/xab/porter/transfer/jdbc/reader/JDBCReader.java
+++ b/transfer/jdbc/src/main/java/de/xab/porter/transfer/jdbc/reader/JDBCReader.java
@@ -19,7 +19,7 @@
import static de.xab.porter.common.constant.Constant.DEFAULT_BATCH_SIZE;
import static de.xab.porter.common.enums.SequenceEnum.*;
-import static de.xab.porter.common.util.Strings.notNullOrEmpty;
+import static de.xab.porter.common.util.Strings.notNullOrBlank;
import static java.sql.ResultSet.CONCUR_READ_ONLY;
import static java.sql.ResultSet.TYPE_FORWARD_ONLY;
@@ -124,7 +124,7 @@ public Map getTableMetaData(Context context, Object connection)
String pkName = primaryKeys.getString("PK_NAME");
short keySeq = primaryKeys.getShort("KEY_SEQ");
columnMap.computeIfPresent(columnName, (ignored, column) -> {
- column.setPrimaryKey(notNullOrEmpty(pkName));
+ column.setPrimaryKey(notNullOrBlank(pkName));
column.setIndexName(pkName);
column.setPrimaryKeySeq(keySeq);
return column;
diff --git a/transfer/jdbc/src/main/java/de/xab/porter/transfer/jdbc/writer/JDBCWriter.java b/transfer/jdbc/src/main/java/de/xab/porter/transfer/jdbc/writer/JDBCWriter.java
index b886048..9b0b950 100644
--- a/transfer/jdbc/src/main/java/de/xab/porter/transfer/jdbc/writer/JDBCWriter.java
+++ b/transfer/jdbc/src/main/java/de/xab/porter/transfer/jdbc/writer/JDBCWriter.java
@@ -21,7 +21,7 @@
import java.util.stream.Collectors;
import static de.xab.porter.api.dataconnection.SinkConnection.Properties.*;
-import static de.xab.porter.common.util.Strings.notNullOrEmpty;
+import static de.xab.porter.common.util.Strings.notNullOrBlank;
/**
* common JDBC writer
@@ -207,9 +207,9 @@ protected String getColumns(List meta, String quote) {
"\t" + getColumnIdentifier(column.getName(), quote)
+ "\t" + getColumnType(column)
+ "\t"
- + ((notNullOrEmpty(column.getNullable()) && "NO".equals(column.getNullable()))
+ + ((notNullOrBlank(column.getNullable()) && "NO".equals(column.getNullable()))
? "NOT NULL" : "NULL")
- + (notNullOrEmpty(column.getComment())
+ + (notNullOrBlank(column.getComment())
? ("\tCOMMENT\t'" + column.getComment() + "'") : "")).
collect(Collectors.joining(", \n"));
}
diff --git a/transfer/jdbc/src/main/java/de/xab/porter/transfer/jdbc/writer/PostgreSQLWriter.java b/transfer/jdbc/src/main/java/de/xab/porter/transfer/jdbc/writer/PostgreSQLWriter.java
index 3949af7..6364b17 100644
--- a/transfer/jdbc/src/main/java/de/xab/porter/transfer/jdbc/writer/PostgreSQLWriter.java
+++ b/transfer/jdbc/src/main/java/de/xab/porter/transfer/jdbc/writer/PostgreSQLWriter.java
@@ -19,7 +19,7 @@
import java.util.logging.Logger;
import java.util.stream.Collectors;
-import static de.xab.porter.common.util.Strings.notNullOrEmpty;
+import static de.xab.porter.common.util.Strings.notNullOrBlank;
/**
* PostgreSQL JDBC writer
@@ -77,7 +77,7 @@ protected String getColumns(List meta, String quote) {
map(column ->
"\t" + getColumnIdentifier(column.getName(), quote)
+ "\t" + getColumnType(column)
- + "\t" + ((notNullOrEmpty(column.getNullable()) && "NO".equals(column.getNullable()))
+ + "\t" + ((notNullOrBlank(column.getNullable()) && "NO".equals(column.getNullable()))
? "NOT NULL" : "NULL")).
collect(Collectors.joining(", \n"));
}
@@ -85,7 +85,7 @@ protected String getColumns(List meta, String quote) {
@Override
protected String getAfterDDL(String tableIdentifier, String quote, List meta) {
return meta.stream().
- map(column -> notNullOrEmpty(column.getComment())
+ map(column -> notNullOrBlank(column.getComment())
? ";\nCOMMENT ON COLUMN " + tableIdentifier + "." + quote + column.getName() + quote
+ " IS '" + column.getComment() + "'" : "").
collect(Collectors.joining()) + ";";
diff --git a/transfer/jdbc/src/main/java/module-info.java b/transfer/jdbc/src/main/java/module-info.java
index b22ff34..7ccf207 100644
--- a/transfer/jdbc/src/main/java/module-info.java
+++ b/transfer/jdbc/src/main/java/module-info.java
@@ -3,7 +3,6 @@
requires de.xab.porter.common;
requires com.zaxxer.hikari;
requires org.postgresql.jdbc;
-// requires mysql.connector.java;
opens de.xab.porter.transfer.jdbc.reader to de.xab.porter.common;
opens de.xab.porter.transfer.jdbc.writer to de.xab.porter.common;
diff --git a/transfer/src/main/java/de/xab/porter/transfer/exception/ConnectionException.java b/transfer/src/main/java/de/xab/porter/transfer/exception/ConnectionException.java
new file mode 100644
index 0000000..1a4d5bf
--- /dev/null
+++ b/transfer/src/main/java/de/xab/porter/transfer/exception/ConnectionException.java
@@ -0,0 +1,25 @@
+package de.xab.porter.transfer.exception;
+
+/**
+ * Exception during connection
+ */
+public class ConnectionException extends Exception {
+ public ConnectionException() {
+ }
+
+ public ConnectionException(String message) {
+ super(message);
+ }
+
+ public ConnectionException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public ConnectionException(Throwable cause) {
+ super(cause);
+ }
+
+ public ConnectionException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
diff --git a/transfer/src/main/java/module-info.java b/transfer/src/main/java/module-info.java
index d04577d..fab0f27 100644
--- a/transfer/src/main/java/module-info.java
+++ b/transfer/src/main/java/module-info.java
@@ -7,6 +7,7 @@
exports de.xab.porter.transfer.channel;
exports de.xab.porter.transfer.reader;
exports de.xab.porter.transfer.writer;
+ exports de.xab.porter.transfer.exception;
opens de.xab.porter.transfer.channel to de.xab.porter.common;
}
\ No newline at end of file