diff --git a/common/src/main/java/de/xab/porter/common/spi/ExtensionLoader.java b/common/src/main/java/de/xab/porter/common/spi/ExtensionLoader.java index a9fad9b..41bf16f 100644 --- a/common/src/main/java/de/xab/porter/common/spi/ExtensionLoader.java +++ b/common/src/main/java/de/xab/porter/common/spi/ExtensionLoader.java @@ -1,7 +1,5 @@ package de.xab.porter.common.spi; -import de.xab.porter.api.exception.PorterException; - import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; @@ -15,163 +13,148 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import static de.xab.porter.common.util.Strings.notNullOrEmpty; - /** * a extension loader can load any implements of one service, which is describe in resources/extensions. * ExtensionLoader load class using current class loader, construct new instance and inject type for consuming *

* extension must have these features: * implement at least one service, - * registered in resources/extensions + * registered in resources/META-INF/porter *

- * any service must opens to module {@link de.xab.porter.common} + * any extensions must opens to module {@link de.xab.porter.common} *

* comments after # will be ignored */ -public class ExtensionLoader { +public class ExtensionLoader { private static final String FOLDER = "META-INF/porter/"; - private static final Map, Map>> EXTENSION_HOLDER = new ConcurrentHashMap<>(); + private static final Map, ExtensionLoader> LOADERS = new ConcurrentHashMap<>(); + private final Map> extensions = new ConcurrentHashMap<>(); + private Class service; - public static ExtensionLoader getExtensionLoader() { - return new ExtensionLoader(); + public static ExtensionLoader getExtensionLoader(Class service) { + if (service == null) { + throw new IllegalArgumentException("service is null"); + } + if (!service.isInterface()) { + throw new IllegalArgumentException(String.format("service %s is not a interface", service)); + } + ExtensionLoader loader = (ExtensionLoader) LOADERS.get(service); + if (loader == null) { + LOADERS.putIfAbsent(service, new ExtensionLoader()); + loader = (ExtensionLoader) LOADERS.get(service); + loader.service = service; + } + return loader; } - public T loadExtension(String type, Class clazz) { - final Class driverClass = loadClass(type, clazz); - if (!driverClass.getModule().isOpen(driverClass.getPackageName(), this.getClass().getModule())) { - throw new PorterException(String.format( - "cannot access class %s at %s", driverClass.getName(), this.getClass().getModule())); + public T loadExtension(String type) { + Class clazz = loadExtensionClass(type); + if (!clazz.getModule().isOpen(clazz.getPackageName(), this.getClass().getModule())) { + throw new RuntimeException(String.format( + "cannot access class %s at %s", clazz.getName(), this.getClass().getModule())); } try { - final T instance = (T) driverClass.getConstructor().newInstance(); + T instance = clazz.getConstructor().newInstance(); injectExtension(instance, type); return instance; - } catch (InstantiationException | IllegalAccessException - | InvocationTargetException | NoSuchMethodException e) { - throw new PorterException("extension construct failed", e); + } catch (InstantiationException | NoSuchMethodException + | IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(String.format("cannot create extension %s of %s", type, this.service), e); } } - /** - * load Class of extensions. PorterException will be thrown as load failed - * - * @param type exactly type of extension - * @param clazz Class of service - * @return Class of extension - */ - public Class loadClass(String type, Class clazz) { - Class subClass = EXTENSION_HOLDER.computeIfAbsent(clazz, ignored -> new ConcurrentHashMap<>()).get(type); - if (subClass != null) { - return subClass; - } - final String fileName = FOLDER + clazz.getName(); - final Enumeration urls; - final ClassLoader cl = findClassLoader(clazz); - try { - urls = cl.getResources(fileName); - } catch (IOException e) { - throw new PorterException(String.format("no implement(s) of %s found in %s", clazz.getName(), fileName)); + private Class loadExtensionClass(String type) { + Class extensionClass = this.extensions.get(type); + ClassLoader classLoader = findClassLoader(this.service); + if (extensionClass == null) { + synchronized (this.extensions) { + extensionClass = this.extensions.get(type); + if (extensionClass == null) { + String extensionName = null; + try { + extensionName = findExtensionName(classLoader, type); + extensionClass = (Class) classLoader.loadClass(extensionName); + } catch (IOException e) { + throw new TypeNotPresentException(type, e); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException( + String.format("cannot load class %s for %s: %s", extensionName, type, this.service)); + } + if (!implementedInterface(extensionClass, this.service)) { + throw new IllegalStateException(extensionName + " not implemented " + this.service); + } + this.extensions.put(type, extensionClass); + } + } } + return extensionClass; + } + + private String findExtensionName(ClassLoader classLoader, String type) throws IOException { + Enumeration urls; + String resourceFolder = FOLDER + this.service.getName(); + urls = classLoader.getResources(resourceFolder); while (urls != null && urls.hasMoreElements()) { - final URL url = urls.nextElement(); + URL url = urls.nextElement(); try (BufferedReader reader = new BufferedReader(new InputStreamReader( url.openStream(), StandardCharsets.UTF_8))) { String line; - final Map> specifyClassMap = EXTENSION_HOLDER.get(clazz); while ((line = reader.readLine()) != null) { - Map.Entry entry = parseTypeAndClass(line, clazz); - String driverType = entry.getKey(); - String className = entry.getValue(); - if (notNullOrEmpty(className)) { - subClass = loadClassByName(cl, clazz, className); - specifyClassMap.putIfAbsent(driverType, subClass); - EXTENSION_HOLDER.put(clazz, specifyClassMap); - if (type.equals(driverType)) { - return subClass; - } + Map.Entry tuple = parseLine(line); + if (tuple == null) { + continue; + } + String extensionType = tuple.getKey(); + String extensionName = tuple.getValue(); + if (type.equals(extensionType)) { + return extensionName; } } - } catch (IOException e) { - throw new PorterException(String.format("load extension class %s failed", clazz.getName()), e); } } - throw new PorterException(String.format("type `%s` of extension %s not found", type, clazz.getName())); + throw new IOException("no appropriate type found for " + this.service); } - private Map.Entry parseTypeAndClass(String line, Class clazz) { - String driverType; - String className; - String newLine = line; - final int ci = newLine.indexOf('#'); + private Map.Entry parseLine(String origin) { + String extensionType; + String extensionName; + String line = origin; + int ci = line.indexOf('#'); if (ci >= 0) { - newLine = newLine.substring(0, ci); + line = line.substring(0, ci); } - newLine = newLine.trim(); - if (newLine.length() > 0) { - final String[] split = newLine.split("="); + line = line.trim(); + if (line.length() > 0) { + final String[] split = line.split("="); if (split.length == 2) { - driverType = split[0]; - className = split[1]; - return Map.entry(driverType, className); - } - } - throw new PorterException(String.format( - "parse extension %s failed. expected `type=foo.bar.Extension`, got %s", clazz.getSimpleName(), line)); - } - - /** - * load class by given Class name - * - * @param loader classloader to load class - * @param service service of class implemented - * @param className class name to be loaded - * @return Class if load successfully - */ - private Class loadClassByName(ClassLoader loader, Class service, String className) { - try { - Class subClass = loader.loadClass(className); - if (implementedInterface(subClass, service)) { - return subClass; - } else { - throw new PorterException(String.format("no implement classes found of class %s", service)); + extensionType = split[0]; + extensionName = split[1]; + return Map.entry(extensionType, extensionName); } - } catch (ClassNotFoundException e) { - throw new PorterException(String.format("class %s not found", className), e); } + return null; } - /** - * whether extension defined in resources implemented service. - * - * @param subClass extension Class - * @param interfaceClass service Class - * @return true if extension if sub class of service - */ - private boolean implementedInterface(Class subClass, Class interfaceClass) { - Class currentClass = subClass; + private boolean implementedInterface(Class extensionClass, Class serviceClass) { + Class currentClass = extensionClass; boolean isImplemented = false; while (!isImplemented) { if (currentClass == Object.class) { break; } isImplemented = Arrays.stream(currentClass.getInterfaces()). - anyMatch(oneInterface -> oneInterface == interfaceClass); + anyMatch(oneInterface -> oneInterface == serviceClass); currentClass = currentClass.getSuperclass(); } return isImplemented; } - private void injectExtension(T instance, String type) { + private void injectExtension(T instance, String type) throws InvocationTargetException, IllegalAccessException { for (Method method : instance.getClass().getMethods()) { if (!isTypeSetter(method)) { continue; } - try { - method.invoke(instance, type); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new PorterException(String.format("instance %s inject %s failed", instance, type)); - } + method.invoke(instance, type); } } @@ -181,7 +164,7 @@ private boolean isTypeSetter(Method method) { && method.getParameterCount() == 1; } - private ClassLoader findClassLoader(Class clazz) { + private ClassLoader findClassLoader(Class clazz) { ClassLoader cl = Thread.currentThread().getContextClassLoader(); if (cl == null) { cl = clazz.getClassLoader(); @@ -191,4 +174,4 @@ private ClassLoader findClassLoader(Class clazz) { } return cl; } -} +} \ No newline at end of file diff --git a/common/src/main/java/de/xab/porter/common/util/Strings.java b/common/src/main/java/de/xab/porter/common/util/Strings.java index 7135ddb..76a64a8 100644 --- a/common/src/main/java/de/xab/porter/common/util/Strings.java +++ b/common/src/main/java/de/xab/porter/common/util/Strings.java @@ -7,7 +7,7 @@ public final class Strings { private Strings() { } - public static boolean notNullOrEmpty(String str) { + public static boolean notNullOrBlank(String str) { return str != null && !str.isBlank(); } } diff --git a/common/src/test/java/de/xab/porter/common/test/spi/SPITest.java b/common/src/test/java/de/xab/porter/common/test/spi/SPITest.java index 229b2dd..8b7d9cb 100644 --- a/common/src/test/java/de/xab/porter/common/test/spi/SPITest.java +++ b/common/src/test/java/de/xab/porter/common/test/spi/SPITest.java @@ -1,6 +1,5 @@ package de.xab.porter.common.test.spi; -import de.xab.porter.api.exception.PorterException; import de.xab.porter.common.spi.ExtensionLoader; import de.xab.porter.common.test.spi.service.MockService; import de.xab.porter.common.test.spi.service.UnregisteredService; @@ -15,31 +14,31 @@ public class SPITest { @Test public void testExists() { - MockService impl = ExtensionLoader.getExtensionLoader().loadExtension("impl", MockService.class); + MockService impl = ExtensionLoader.getExtensionLoader(MockService.class).loadExtension("impl"); assertEquals("hello world", impl.mock()); } @Test public void testNotExists() { - assertThrows(PorterException.class, () -> - ExtensionLoader.getExtensionLoader().loadExtension("none", MockService.class)); + assertThrows(TypeNotPresentException.class, () -> + ExtensionLoader.getExtensionLoader(MockService.class).loadExtension("none")); } @Test public void testTypoImpl() { - assertThrows(PorterException.class, () -> - ExtensionLoader.getExtensionLoader().loadExtension("typo", MockService.class)); + assertThrows(IllegalArgumentException.class, () -> + ExtensionLoader.getExtensionLoader(MockService.class).loadExtension("typo")); } @Test public void testNoImplemented() { - assertThrows(PorterException.class, () -> - ExtensionLoader.getExtensionLoader().loadExtension("noimpl", MockService.class)); + assertThrows(IllegalStateException.class, () -> + ExtensionLoader.getExtensionLoader(MockService.class).loadExtension("noimpl")); } @Test public void testNoneRegistered() { - assertThrows(PorterException.class, () -> - ExtensionLoader.getExtensionLoader().loadExtension("any", UnregisteredService.class)); + assertThrows(TypeNotPresentException.class, () -> + ExtensionLoader.getExtensionLoader(UnregisteredService.class).loadExtension("any")); } } diff --git a/common/src/test/java/de/xab/porter/common/test/string/StringTest.java b/common/src/test/java/de/xab/porter/common/test/string/StringTest.java index 71dab34..226db77 100644 --- a/common/src/test/java/de/xab/porter/common/test/string/StringTest.java +++ b/common/src/test/java/de/xab/porter/common/test/string/StringTest.java @@ -13,24 +13,24 @@ public class StringTest { @Test public void testNull() { String str = null; - assertFalse(Strings.notNullOrEmpty(str)); + assertFalse(Strings.notNullOrBlank(str)); } @Test public void testEmpty() { String str = ""; - assertFalse(Strings.notNullOrEmpty(str)); + assertFalse(Strings.notNullOrBlank(str)); } @Test public void testNotNullOrEmpty() { String str = "abc"; - Assertions.assertTrue(Strings.notNullOrEmpty(str)); + Assertions.assertTrue(Strings.notNullOrBlank(str)); } @Test public void testLongEmpty() { String str = " "; - assertFalse(Strings.notNullOrEmpty(str)); + assertFalse(Strings.notNullOrBlank(str)); } } diff --git a/core/src/main/java/de/xab/porter/core/Task.java b/core/src/main/java/de/xab/porter/core/Task.java index daf08fa..c8a005d 100644 --- a/core/src/main/java/de/xab/porter/core/Task.java +++ b/core/src/main/java/de/xab/porter/core/Task.java @@ -27,8 +27,8 @@ public Task(Context context) { } public void init() { - final SrcConnection srcConnection = context.getSrcConnection(); - this.reader = ExtensionLoader.getExtensionLoader().loadExtension(srcConnection.getType(), Reader.class); + SrcConnection srcConnection = context.getSrcConnection(); + this.reader = ExtensionLoader.getExtensionLoader(Reader.class).loadExtension(srcConnection.getType()); this.reader.setChannels(new ArrayList<>()); register(); //todo split @@ -38,15 +38,15 @@ public void init() { * construct relations among reader, writer and its channel, define the action when channel is ready to write */ public void register() { - final List sinkConnections = context.getSinkConnections(); + List sinkConnections = context.getSinkConnections(); this.writers = sinkConnections.stream(). map(sink -> { - final Writer writer = ExtensionLoader.getExtensionLoader(). - loadExtension(sink.getType(), Writer.class); - final Object dataSource = writer.getDataSource(sink); - final Object connection = writer.connect(sink, writer.getDataSource(sink)); - final Channel channel = ExtensionLoader.getExtensionLoader(). - loadExtension(this.context.getProperties().getChannel(), Channel.class); + Writer writer = ExtensionLoader.getExtensionLoader(Writer.class). + loadExtension(sink.getType()); + Object dataSource = writer.getDataSource(sink); + Object connection = writer.connect(sink, writer.getDataSource(sink)); + Channel channel = ExtensionLoader.getExtensionLoader(Channel.class). + loadExtension(this.context.getProperties().getChannel()); channel.setOnReadListener(data -> writer.write(connection, dataSource, sink, data)); reader.getChannels().add(channel); return Map.entry(writer, channel); @@ -58,9 +58,9 @@ public void register() { * init source behavior by sinks properties */ private void registerProperties() { - final SrcConnection srcConnection = context.getSrcConnection(); - final SrcConnection.Properties srcConnectionProperties = srcConnection.getProperties(); - final List sinkConnections = context.getSinkConnections(); + SrcConnection srcConnection = context.getSrcConnection(); + SrcConnection.Properties srcConnectionProperties = srcConnection.getProperties(); + List sinkConnections = context.getSinkConnections(); srcConnectionProperties.setCreate( sinkConnections.stream().map(SinkConnection::getProperties). anyMatch(SinkConnection.Properties::isCreate)); @@ -70,7 +70,7 @@ private void registerProperties() { * start a transmission task */ public void start() { - final Object dataSource = reader.getDataSource(context.getSrcConnection()); + Object dataSource = reader.getDataSource(context.getSrcConnection()); Object connection = null; try { connection = reader.connect(context.getSrcConnection(), dataSource); diff --git a/transfer/jdbc/src/main/java/de/xab/porter/transfer/jdbc/reader/JDBCReader.java b/transfer/jdbc/src/main/java/de/xab/porter/transfer/jdbc/reader/JDBCReader.java index ce81285..06d41e3 100644 --- a/transfer/jdbc/src/main/java/de/xab/porter/transfer/jdbc/reader/JDBCReader.java +++ b/transfer/jdbc/src/main/java/de/xab/porter/transfer/jdbc/reader/JDBCReader.java @@ -19,7 +19,7 @@ import static de.xab.porter.common.constant.Constant.DEFAULT_BATCH_SIZE; import static de.xab.porter.common.enums.SequenceEnum.*; -import static de.xab.porter.common.util.Strings.notNullOrEmpty; +import static de.xab.porter.common.util.Strings.notNullOrBlank; import static java.sql.ResultSet.CONCUR_READ_ONLY; import static java.sql.ResultSet.TYPE_FORWARD_ONLY; @@ -124,7 +124,7 @@ public Map getTableMetaData(Context context, Object connection) String pkName = primaryKeys.getString("PK_NAME"); short keySeq = primaryKeys.getShort("KEY_SEQ"); columnMap.computeIfPresent(columnName, (ignored, column) -> { - column.setPrimaryKey(notNullOrEmpty(pkName)); + column.setPrimaryKey(notNullOrBlank(pkName)); column.setIndexName(pkName); column.setPrimaryKeySeq(keySeq); return column; diff --git a/transfer/jdbc/src/main/java/de/xab/porter/transfer/jdbc/writer/JDBCWriter.java b/transfer/jdbc/src/main/java/de/xab/porter/transfer/jdbc/writer/JDBCWriter.java index b886048..9b0b950 100644 --- a/transfer/jdbc/src/main/java/de/xab/porter/transfer/jdbc/writer/JDBCWriter.java +++ b/transfer/jdbc/src/main/java/de/xab/porter/transfer/jdbc/writer/JDBCWriter.java @@ -21,7 +21,7 @@ import java.util.stream.Collectors; import static de.xab.porter.api.dataconnection.SinkConnection.Properties.*; -import static de.xab.porter.common.util.Strings.notNullOrEmpty; +import static de.xab.porter.common.util.Strings.notNullOrBlank; /** * common JDBC writer @@ -207,9 +207,9 @@ protected String getColumns(List meta, String quote) { "\t" + getColumnIdentifier(column.getName(), quote) + "\t" + getColumnType(column) + "\t" - + ((notNullOrEmpty(column.getNullable()) && "NO".equals(column.getNullable())) + + ((notNullOrBlank(column.getNullable()) && "NO".equals(column.getNullable())) ? "NOT NULL" : "NULL") - + (notNullOrEmpty(column.getComment()) + + (notNullOrBlank(column.getComment()) ? ("\tCOMMENT\t'" + column.getComment() + "'") : "")). collect(Collectors.joining(", \n")); } diff --git a/transfer/jdbc/src/main/java/de/xab/porter/transfer/jdbc/writer/PostgreSQLWriter.java b/transfer/jdbc/src/main/java/de/xab/porter/transfer/jdbc/writer/PostgreSQLWriter.java index 3949af7..6364b17 100644 --- a/transfer/jdbc/src/main/java/de/xab/porter/transfer/jdbc/writer/PostgreSQLWriter.java +++ b/transfer/jdbc/src/main/java/de/xab/porter/transfer/jdbc/writer/PostgreSQLWriter.java @@ -19,7 +19,7 @@ import java.util.logging.Logger; import java.util.stream.Collectors; -import static de.xab.porter.common.util.Strings.notNullOrEmpty; +import static de.xab.porter.common.util.Strings.notNullOrBlank; /** * PostgreSQL JDBC writer @@ -77,7 +77,7 @@ protected String getColumns(List meta, String quote) { map(column -> "\t" + getColumnIdentifier(column.getName(), quote) + "\t" + getColumnType(column) - + "\t" + ((notNullOrEmpty(column.getNullable()) && "NO".equals(column.getNullable())) + + "\t" + ((notNullOrBlank(column.getNullable()) && "NO".equals(column.getNullable())) ? "NOT NULL" : "NULL")). collect(Collectors.joining(", \n")); } @@ -85,7 +85,7 @@ protected String getColumns(List meta, String quote) { @Override protected String getAfterDDL(String tableIdentifier, String quote, List meta) { return meta.stream(). - map(column -> notNullOrEmpty(column.getComment()) + map(column -> notNullOrBlank(column.getComment()) ? ";\nCOMMENT ON COLUMN " + tableIdentifier + "." + quote + column.getName() + quote + " IS '" + column.getComment() + "'" : ""). collect(Collectors.joining()) + ";"; diff --git a/transfer/jdbc/src/main/java/module-info.java b/transfer/jdbc/src/main/java/module-info.java index b22ff34..7ccf207 100644 --- a/transfer/jdbc/src/main/java/module-info.java +++ b/transfer/jdbc/src/main/java/module-info.java @@ -3,7 +3,6 @@ requires de.xab.porter.common; requires com.zaxxer.hikari; requires org.postgresql.jdbc; -// requires mysql.connector.java; opens de.xab.porter.transfer.jdbc.reader to de.xab.porter.common; opens de.xab.porter.transfer.jdbc.writer to de.xab.porter.common; diff --git a/transfer/src/main/java/de/xab/porter/transfer/exception/ConnectionException.java b/transfer/src/main/java/de/xab/porter/transfer/exception/ConnectionException.java new file mode 100644 index 0000000..1a4d5bf --- /dev/null +++ b/transfer/src/main/java/de/xab/porter/transfer/exception/ConnectionException.java @@ -0,0 +1,25 @@ +package de.xab.porter.transfer.exception; + +/** + * Exception during connection + */ +public class ConnectionException extends Exception { + public ConnectionException() { + } + + public ConnectionException(String message) { + super(message); + } + + public ConnectionException(String message, Throwable cause) { + super(message, cause); + } + + public ConnectionException(Throwable cause) { + super(cause); + } + + public ConnectionException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/transfer/src/main/java/module-info.java b/transfer/src/main/java/module-info.java index d04577d..fab0f27 100644 --- a/transfer/src/main/java/module-info.java +++ b/transfer/src/main/java/module-info.java @@ -7,6 +7,7 @@ exports de.xab.porter.transfer.channel; exports de.xab.porter.transfer.reader; exports de.xab.porter.transfer.writer; + exports de.xab.porter.transfer.exception; opens de.xab.porter.transfer.channel to de.xab.porter.common; } \ No newline at end of file