Skip to content

Commit

Permalink
for #1172, add ExecutorDataMap to ShardingExecuteEngine
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Sep 25, 2018
1 parent 7028929 commit 2d67276
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.shardingsphere.core.exception.ShardingException;
import io.shardingsphere.core.executor.sql.execute.threadlocal.ExecutorDataMap;

import java.sql.SQLException;
import java.util.ArrayList;
Expand All @@ -30,6 +31,7 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -90,11 +92,13 @@ public <I, O> List<O> execute(final Collection<I> inputs, final ShardingExecuteC

private <I, O> Collection<ListenableFuture<O>> asyncExecute(final Collection<I> inputs, final ShardingExecuteCallback<I, O> callback) {
Collection<ListenableFuture<O>> result = new ArrayList<>(inputs.size());
final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
for (final I each : inputs) {
result.add(executorService.submit(new Callable<O>() {

@Override
public O call() throws SQLException {
ExecutorDataMap.setDataMap(dataMap);
return callback.execute(each, false);
}
}));
Expand Down Expand Up @@ -164,10 +168,12 @@ private <I, O> Collection<ListenableFuture<Collection<O>>> asyncGroupExecute(fin
}

private <I, O> ListenableFuture<Collection<O>> asyncGroupExecute(final ShardingExecuteGroup<I> inputGroup, final ShardingGroupExecuteCallback<I, O> callback) {
final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
return executorService.submit(new Callable<Collection<O>>() {

@Override
public Collection<O> call() throws SQLException {
ExecutorDataMap.setDataMap(dataMap);
return callback.execute(inputGroup.getInputs(), false);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import io.shardingsphere.core.executor.ShardingExecuteCallback;
import io.shardingsphere.core.executor.ShardingGroupExecuteCallback;
import io.shardingsphere.core.executor.StatementExecuteUnit;
import io.shardingsphere.core.executor.sql.execute.threadlocal.ExecutorDataMap;
import io.shardingsphere.core.executor.sql.execute.threadlocal.ExecutorExceptionHandler;
import io.shardingsphere.core.metadata.datasource.DataSourceMetaData;
import io.shardingsphere.core.metadata.datasource.DataSourceMetaDataFactory;
Expand All @@ -38,7 +37,6 @@
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
* Statement execute callback interface.
Expand All @@ -57,12 +55,8 @@ public abstract class SQLExecuteCallback<T> implements ShardingExecuteCallback<S

private final boolean isExceptionThrown;

private final Map<String, Object> dataMap;

private final EventBus shardingEventBus = ShardingEventBusInstance.getInstance();

private final SQLExecutionHook sqlExecutionHook = new SPISQLExecutionHook();

@Override
public final T execute(final StatementExecuteUnit statementExecuteUnit, final boolean isTrunkThread) throws SQLException {
return execute0(statementExecuteUnit, isTrunkThread);
Expand All @@ -79,9 +73,9 @@ public final Collection<T> execute(final Collection<StatementExecuteUnit> statem

private T execute0(final StatementExecuteUnit statementExecuteUnit, final boolean isTrunkThread) throws SQLException {
ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
ExecutorDataMap.setDataMap(dataMap);
List<List<Object>> parameterSets = statementExecuteUnit.getRouteUnit().getSqlUnit().getParameterSets();
DataSourceMetaData dataSourceMetaData = DataSourceMetaDataFactory.newInstance(databaseType, statementExecuteUnit.getStatement().getConnection().getMetaData().getURL());
SQLExecutionHook sqlExecutionHook = new SPISQLExecutionHook();
for (List<Object> each : parameterSets) {
sqlExecutionHook.start(statementExecuteUnit.getRouteUnit().getDataSourceName(), statementExecuteUnit.getRouteUnit().getSqlUnit().getSql(), each, dataSourceMetaData, isTrunkThread);
// TODO remove after BED removed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.shardingsphere.core.constant.ConnectionMode;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.executor.sql.execute.SQLExecuteCallback;
import io.shardingsphere.core.executor.sql.execute.threadlocal.ExecutorDataMap;
import io.shardingsphere.core.executor.sql.execute.threadlocal.ExecutorExceptionHandler;
import io.shardingsphere.core.executor.sql.prepare.SQLExecutePrepareCallback;
import io.shardingsphere.core.jdbc.core.connection.ShardingConnection;
Expand All @@ -42,7 +41,6 @@
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

/**
Expand Down Expand Up @@ -158,8 +156,7 @@ private void handleNewRouteUnits(final Collection<BatchRouteUnit> newRouteUnits)
*/
public int[] executeBatch() throws SQLException {
final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
SQLExecuteCallback<int[]> callback = new SQLExecuteCallback<int[]>(getDatabaseType(), getSqlType(), isExceptionThrown, dataMap) {
SQLExecuteCallback<int[]> callback = new SQLExecuteCallback<int[]>(getDatabaseType(), getSqlType(), isExceptionThrown) {

@Override
protected int[] executeSQL(final StatementExecuteUnit statementExecuteUnit) throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.shardingsphere.core.executor.sql.execute.SQLExecuteCallback;
import io.shardingsphere.core.executor.sql.execute.result.MemoryQueryResult;
import io.shardingsphere.core.executor.sql.execute.result.StreamQueryResult;
import io.shardingsphere.core.executor.sql.execute.threadlocal.ExecutorDataMap;
import io.shardingsphere.core.executor.sql.execute.threadlocal.ExecutorExceptionHandler;
import io.shardingsphere.core.executor.sql.prepare.SQLExecutePrepareCallback;
import io.shardingsphere.core.jdbc.core.connection.ShardingConnection;
Expand All @@ -37,7 +36,6 @@
import java.sql.Statement;
import java.util.Collection;
import java.util.List;
import java.util.Map;

/**
* Prepared statement executor.
Expand Down Expand Up @@ -99,8 +97,7 @@ private PreparedStatement createPreparedStatement(final Connection connection, f
*/
public List<QueryResult> executeQuery() throws SQLException {
final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), getSqlType(), isExceptionThrown, dataMap) {
SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), getSqlType(), isExceptionThrown) {

@Override
protected QueryResult executeSQL(final StatementExecuteUnit statementExecuteUnit) throws SQLException {
Expand All @@ -125,8 +122,7 @@ private QueryResult getQueryResult(final StatementExecuteUnit statementExecuteUn
*/
public int executeUpdate() throws SQLException {
final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
SQLExecuteCallback<Integer> executeCallback = new SQLExecuteCallback<Integer>(getDatabaseType(), getSqlType(), isExceptionThrown, dataMap) {
SQLExecuteCallback<Integer> executeCallback = new SQLExecuteCallback<Integer>(getDatabaseType(), getSqlType(), isExceptionThrown) {

@Override
protected Integer executeSQL(final StatementExecuteUnit statementExecuteUnit) throws SQLException {
Expand All @@ -153,8 +149,7 @@ private int accumulate(final List<Integer> results) {
*/
public boolean execute() throws SQLException {
boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
SQLExecuteCallback<Boolean> executeCallback = new SQLExecuteCallback<Boolean>(getDatabaseType(), getSqlType(), isExceptionThrown, dataMap) {
SQLExecuteCallback<Boolean> executeCallback = new SQLExecuteCallback<Boolean>(getDatabaseType(), getSqlType(), isExceptionThrown) {

@Override
protected Boolean executeSQL(final StatementExecuteUnit statementExecuteUnit) throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.shardingsphere.core.executor.sql.execute.SQLExecuteCallback;
import io.shardingsphere.core.executor.sql.execute.result.MemoryQueryResult;
import io.shardingsphere.core.executor.sql.execute.result.StreamQueryResult;
import io.shardingsphere.core.executor.sql.execute.threadlocal.ExecutorDataMap;
import io.shardingsphere.core.executor.sql.execute.threadlocal.ExecutorExceptionHandler;
import io.shardingsphere.core.executor.sql.prepare.SQLExecutePrepareCallback;
import io.shardingsphere.core.jdbc.core.connection.ShardingConnection;
Expand All @@ -35,7 +34,6 @@
import java.sql.Statement;
import java.util.Collection;
import java.util.List;
import java.util.Map;

/**
* Statement executor.
Expand Down Expand Up @@ -88,8 +86,7 @@ public StatementExecuteUnit createStatementExecuteUnit(final Connection connecti
*/
public List<QueryResult> executeQuery() throws SQLException {
final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), getSqlType(), isExceptionThrown, dataMap) {
SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), getSqlType(), isExceptionThrown) {

@Override
protected QueryResult executeSQL(final StatementExecuteUnit statementExecuteUnit) throws SQLException {
Expand Down Expand Up @@ -174,8 +171,7 @@ public int executeUpdate(final Statement statement, final String sql) throws SQL

private int executeUpdate(final Updater updater) throws SQLException {
final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
SQLExecuteCallback<Integer> executeCallback = new SQLExecuteCallback<Integer>(getDatabaseType(), getSqlType(), isExceptionThrown, dataMap) {
SQLExecuteCallback<Integer> executeCallback = new SQLExecuteCallback<Integer>(getDatabaseType(), getSqlType(), isExceptionThrown) {

@Override
protected Integer executeSQL(final StatementExecuteUnit statementExecuteUnit) throws SQLException {
Expand Down Expand Up @@ -263,8 +259,7 @@ public boolean execute(final Statement statement, final String sql) throws SQLEx

private boolean execute(final Executor executor) throws SQLException {
final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
SQLExecuteCallback<Boolean> executeCallback = new SQLExecuteCallback<Boolean>(getDatabaseType(), getSqlType(), isExceptionThrown, dataMap) {
SQLExecuteCallback<Boolean> executeCallback = new SQLExecuteCallback<Boolean>(getDatabaseType(), getSqlType(), isExceptionThrown) {

@Override
protected Boolean executeSQL(final StatementExecuteUnit statementExecuteUnit) throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,20 @@ public void start(final String dataSourceName, final String sql, final List<Obje

@Override
public void finishSuccess() {
span.finish();
if (null != span) {
span.finish();
}
if (null != activeSpan) {
activeSpan.deactivate();
}
}

@Override
public void finishFailure(final Exception cause) {
ShardingErrorSpan.setError(span, cause);
span.finish();
if (null != span) {
ShardingErrorSpan.setError(span, cause);
span.finish();
}
if (null != activeSpan) {
activeSpan.deactivate();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.shardingsphere.core.executor.sql.execute.SQLExecuteTemplate;
import io.shardingsphere.core.executor.sql.execute.result.MemoryQueryResult;
import io.shardingsphere.core.executor.sql.execute.result.StreamQueryResult;
import io.shardingsphere.core.executor.sql.execute.threadlocal.ExecutorDataMap;
import io.shardingsphere.core.executor.sql.execute.threadlocal.ExecutorExceptionHandler;
import io.shardingsphere.core.executor.sql.prepare.SQLExecutePrepareCallback;
import io.shardingsphere.core.executor.sql.prepare.SQLExecutePrepareTemplate;
Expand Down Expand Up @@ -65,7 +64,6 @@
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
* SQL Execute engine for JDBC.
Expand Down Expand Up @@ -111,12 +109,11 @@ public ExecuteResponse execute(final SQLRouteResult routeResult) throws SQLExcep
boolean isReturnGeneratedKeys = routeResult.getSqlStatement() instanceof InsertStatement;
SQLType sqlType = routeResult.getSqlStatement().getType();
boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
Collection<ShardingExecuteGroup<StatementExecuteUnit>> sqlExecuteGroups =
sqlExecutePrepareTemplate.getExecuteUnitGroups(routeResult.getRouteUnits(), new ProxyJDBCExecutePrepareCallback(isReturnGeneratedKeys));
Collection<ExecuteResponseUnit> executeResponseUnits = sqlExecuteTemplate.executeGroup((Collection) sqlExecuteGroups,
new FirstProxyJDBCExecuteCallback(sqlType, isExceptionThrown, dataMap, isReturnGeneratedKeys),
new ProxyJDBCExecuteCallback(sqlType, isExceptionThrown, dataMap, isReturnGeneratedKeys));
new FirstProxyJDBCExecuteCallback(sqlType, isExceptionThrown, isReturnGeneratedKeys),
new ProxyJDBCExecuteCallback(sqlType, isExceptionThrown, isReturnGeneratedKeys));
ExecuteResponseUnit firstExecuteResponseUnit = executeResponseUnits.iterator().next();
return firstExecuteResponseUnit instanceof ExecuteQueryResponseUnit
? getExecuteQueryResponse(((ExecuteQueryResponseUnit) firstExecuteResponseUnit).getQueryResponsePackets(), executeResponseUnits) : new ExecuteUpdateResponse(executeResponseUnits);
Expand Down Expand Up @@ -200,8 +197,8 @@ private final class FirstProxyJDBCExecuteCallback extends SQLExecuteCallback<Exe

private boolean hasMetaData;

private FirstProxyJDBCExecuteCallback(final SQLType sqlType, final boolean isExceptionThrown, final Map<String, Object> dataMap, final boolean isReturnGeneratedKeys) {
super(DatabaseType.MySQL, sqlType, isExceptionThrown, dataMap);
private FirstProxyJDBCExecuteCallback(final SQLType sqlType, final boolean isExceptionThrown, final boolean isReturnGeneratedKeys) {
super(DatabaseType.MySQL, sqlType, isExceptionThrown);
this.isReturnGeneratedKeys = isReturnGeneratedKeys;
}

Expand All @@ -222,8 +219,8 @@ private final class ProxyJDBCExecuteCallback extends SQLExecuteCallback<ExecuteR

private final boolean isReturnGeneratedKeys;

private ProxyJDBCExecuteCallback(final SQLType sqlType, final boolean isExceptionThrown, final Map<String, Object> dataMap, final boolean isReturnGeneratedKeys) {
super(DatabaseType.MySQL, sqlType, isExceptionThrown, dataMap);
private ProxyJDBCExecuteCallback(final SQLType sqlType, final boolean isExceptionThrown, final boolean isReturnGeneratedKeys) {
super(DatabaseType.MySQL, sqlType, isExceptionThrown);
this.isReturnGeneratedKeys = isReturnGeneratedKeys;
}

Expand Down

0 comments on commit 2d67276

Please sign in to comment.