Skip to content

Commit

Permalink
feat: 增加 reporter
Browse files Browse the repository at this point in the history
  • Loading branch information
arjenzhou committed Aug 29, 2021
1 parent 9eda588 commit f5af84f
Show file tree
Hide file tree
Showing 9 changed files with 34 additions and 13 deletions.
2 changes: 1 addition & 1 deletion core/src/main/java/de/xab/porter/core/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
public class Session {

public void start(Context context) {
final Task task = new Task(context);
Task task = new Task(context);
task.init();
task.start();
}
Expand Down
8 changes: 7 additions & 1 deletion core/src/main/java/de/xab/porter/core/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import de.xab.porter.transfer.channel.Channel;
import de.xab.porter.transfer.exception.ConnectionException;
import de.xab.porter.transfer.reader.Reader;
import de.xab.porter.transfer.reporter.Reporter;
import de.xab.porter.transfer.writer.Writer;

import java.util.ArrayList;
Expand Down Expand Up @@ -44,6 +45,8 @@ public void init() {
*/
public void register() {
List<SinkConnection> sinkConnections = context.getSinkConnections();
Reporter reporter = ExtensionLoader.getExtensionLoader(Reporter.class).
loadExtension(null, "default");
writers = sinkConnections.stream().
map(sink -> {
Writer<?> writer = ExtensionLoader.getExtensionLoader(Writer.class).
Expand All @@ -56,7 +59,10 @@ public void register() {
}
Channel channel = ExtensionLoader.getExtensionLoader(Channel.class).
loadExtension(null, this.context.getProperties().getChannel());
channel.setOnReadListener(writer::write);
channel.setOnReadListener(data -> {
writer.write(data);
reporter.report(data);
});
reader.getChannels().add(channel);
return Map.entry(writer, channel);
}).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ public void doRead(Map<String, Column> columnMap) {
Instant start = Instant.now();
long batch = 0L;
try {
if (!connection.isReadOnly()) {
connection.setReadOnly(true);
}
connection.setReadOnly(true);
statement = getStatement();
resultSet = statement.executeQuery(properties.getSql());
ResultSetMetaData metaData = resultSet.getMetaData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,4 @@ public interface Channel {
* register a listener for channel, eligible channel wil notify writer to read data from it
*/
void setOnReadListener(Consumer<Result<?>> listener);

void setType(String type);
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
*/
public class DefaultChannel implements Channel {
private final BlockingQueue<Result<?>> resultQueue = new LinkedBlockingQueue<>();
private String type;
private Consumer<Result<?>> onReadListener;

public void push(Result<?> result) {
Expand Down Expand Up @@ -41,9 +40,4 @@ public void notifyWriter() {
public void setOnReadListener(Consumer<Result<?>> listener) {
this.onReadListener = listener;
}

@Override
public void setType(String type) {
this.type = type;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package de.xab.porter.transfer.reporter;

import de.xab.porter.api.Result;
import de.xab.porter.common.util.Loggers;

import java.util.logging.Level;
import java.util.logging.Logger;

public class DefaultReporter implements Reporter {
Logger logger = Loggers.getLogger("REPORTER");

@Override
public <T> void report(T t) {
Result<?> result = (Result<?>) t;
logger.log(Level.INFO, String.format("wrote %sth batch", result.getSequenceNum()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package de.xab.porter.transfer.reporter;

public interface Reporter {
<T> void report(T t);
}
2 changes: 2 additions & 0 deletions transfer/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@

exports de.xab.porter.transfer.connector;
exports de.xab.porter.transfer.channel;
exports de.xab.porter.transfer.reporter;
exports de.xab.porter.transfer.reader;
exports de.xab.porter.transfer.writer;
exports de.xab.porter.transfer.exception;

opens de.xab.porter.transfer.channel to de.xab.porter.common;
opens de.xab.porter.transfer.reporter to de.xab.porter.common;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
default=de.xab.porter.transfer.reporter.DefaultReporter

0 comments on commit f5af84f

Please sign in to comment.