From f38351a8bb0e74cbfc0ac3418ff4377548c6ea27 Mon Sep 17 00:00:00 2001 From: terrymanu Date: Wed, 19 Sep 2018 15:06:50 +0800 Subject: [PATCH] for #1205, Get connection sync to prevent dead lock for sharding-jdbc --- .../prepare/SQLExecutePrepareCallback.java | 5 +- .../prepare/SQLExecutePrepareTemplate.java | 9 ++-- .../BatchPreparedStatementExecutor.java | 6 +-- .../executor/PreparedStatementExecutor.java | 6 +-- .../core/executor/StatementExecutor.java | 6 +-- .../adapter/AbstractConnectionAdapter.java | 48 +++++++++++++------ .../jdbc/execute/JDBCExecuteEngine.java | 5 +- 7 files changed, 51 insertions(+), 34 deletions(-) diff --git a/sharding-core/src/main/java/io/shardingsphere/core/executor/sql/prepare/SQLExecutePrepareCallback.java b/sharding-core/src/main/java/io/shardingsphere/core/executor/sql/prepare/SQLExecutePrepareCallback.java index 740c43bc74a96..ba60a15d13be0 100644 --- a/sharding-core/src/main/java/io/shardingsphere/core/executor/sql/prepare/SQLExecutePrepareCallback.java +++ b/sharding-core/src/main/java/io/shardingsphere/core/executor/sql/prepare/SQLExecutePrepareCallback.java @@ -23,6 +23,7 @@ import java.sql.Connection; import java.sql.SQLException; +import java.util.List; /** * SQL execute prepare callback. @@ -36,11 +37,11 @@ public interface SQLExecutePrepareCallback { * Get connection. * * @param dataSourceName data source name - * @param index index of connection + * @param connectionSize connection size * @return connection * @throws SQLException SQL exception */ - Connection getConnection(String dataSourceName, int index) throws SQLException; + List getConnections(String dataSourceName, int connectionSize) throws SQLException; /** * Create SQL execute unit. diff --git a/sharding-core/src/main/java/io/shardingsphere/core/executor/sql/prepare/SQLExecutePrepareTemplate.java b/sharding-core/src/main/java/io/shardingsphere/core/executor/sql/prepare/SQLExecutePrepareTemplate.java index b682660dfee50..48a10529b3e51 100644 --- a/sharding-core/src/main/java/io/shardingsphere/core/executor/sql/prepare/SQLExecutePrepareTemplate.java +++ b/sharding-core/src/main/java/io/shardingsphere/core/executor/sql/prepare/SQLExecutePrepareTemplate.java @@ -78,10 +78,11 @@ private List> getSQLExecuteGroups( final String dataSourceName, final List sqlUnits, final SQLExecutePrepareCallback callback) throws SQLException { List> result = new LinkedList<>(); int desiredPartitionSize = Math.max(sqlUnits.size() / maxConnectionsSizePerQuery, 1); - int index = 0; - for (List each : Lists.partition(sqlUnits, desiredPartitionSize)) { - // TODO get connection sync to prevent dead lock - result.add(getSQLExecuteGroup(callback.getConnection(dataSourceName, index++), dataSourceName, each, callback)); + List> sqlUnitGroups = Lists.partition(sqlUnits, desiredPartitionSize); + List connections = callback.getConnections(dataSourceName, sqlUnitGroups.size()); + int count = 0; + for (List each : sqlUnitGroups) { + result.add(getSQLExecuteGroup(connections.get(count++), dataSourceName, each, callback)); } return result; } diff --git a/sharding-jdbc/src/main/java/io/shardingsphere/core/executor/BatchPreparedStatementExecutor.java b/sharding-jdbc/src/main/java/io/shardingsphere/core/executor/BatchPreparedStatementExecutor.java index 413d4303a02a3..fd96f1c1f34a4 100644 --- a/sharding-jdbc/src/main/java/io/shardingsphere/core/executor/BatchPreparedStatementExecutor.java +++ b/sharding-jdbc/src/main/java/io/shardingsphere/core/executor/BatchPreparedStatementExecutor.java @@ -86,10 +86,8 @@ public RouteUnit apply(final BatchRouteUnit input) { }), new SQLExecutePrepareCallback() { @Override - public Connection getConnection(final String dataSourceName, final int index) throws SQLException { - Connection conn = BatchPreparedStatementExecutor.super.getConnection().getConnection(dataSourceName, index); - getConnections().add(conn); - return conn; + public List getConnections(final String dataSourceName, final int connectionSize) throws SQLException { + return BatchPreparedStatementExecutor.super.getConnection().getConnections(dataSourceName, connectionSize); } @Override diff --git a/sharding-jdbc/src/main/java/io/shardingsphere/core/executor/PreparedStatementExecutor.java b/sharding-jdbc/src/main/java/io/shardingsphere/core/executor/PreparedStatementExecutor.java index 7feb29c026cb3..f9e55ea62f36f 100644 --- a/sharding-jdbc/src/main/java/io/shardingsphere/core/executor/PreparedStatementExecutor.java +++ b/sharding-jdbc/src/main/java/io/shardingsphere/core/executor/PreparedStatementExecutor.java @@ -73,10 +73,8 @@ private Collection> obtainExecuteGrou return getSqlExecutePrepareTemplate().getExecuteUnitGroups(routeUnits, new SQLExecutePrepareCallback() { @Override - public Connection getConnection(final String dataSourceName, final int index) throws SQLException { - Connection conn = PreparedStatementExecutor.super.getConnection().getConnection(dataSourceName, index); - getConnections().add(conn); - return conn; + public List getConnections(final String dataSourceName, final int connectionSize) throws SQLException { + return PreparedStatementExecutor.super.getConnection().getConnections(dataSourceName, connectionSize); } @Override diff --git a/sharding-jdbc/src/main/java/io/shardingsphere/core/executor/StatementExecutor.java b/sharding-jdbc/src/main/java/io/shardingsphere/core/executor/StatementExecutor.java index 486c0d919b6c8..78c560299c004 100644 --- a/sharding-jdbc/src/main/java/io/shardingsphere/core/executor/StatementExecutor.java +++ b/sharding-jdbc/src/main/java/io/shardingsphere/core/executor/StatementExecutor.java @@ -67,10 +67,8 @@ private Collection> obtainExecuteGrou return getSqlExecutePrepareTemplate().getExecuteUnitGroups(routeUnits, new SQLExecutePrepareCallback() { @Override - public Connection getConnection(final String dataSourceName, final int index) throws SQLException { - Connection conn = StatementExecutor.super.getConnection().getConnection(dataSourceName, index); - getConnections().add(conn); - return conn; + public List getConnections(final String dataSourceName, final int connectionSize) throws SQLException { + return StatementExecutor.super.getConnection().getConnections(dataSourceName, connectionSize); } @SuppressWarnings("MagicConstant") diff --git a/sharding-jdbc/src/main/java/io/shardingsphere/core/jdbc/adapter/AbstractConnectionAdapter.java b/sharding-jdbc/src/main/java/io/shardingsphere/core/jdbc/adapter/AbstractConnectionAdapter.java index 796145fbfc743..c7a41a1f0dce2 100644 --- a/sharding-jdbc/src/main/java/io/shardingsphere/core/jdbc/adapter/AbstractConnectionAdapter.java +++ b/sharding-jdbc/src/main/java/io/shardingsphere/core/jdbc/adapter/AbstractConnectionAdapter.java @@ -46,7 +46,9 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.SQLWarning; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.Map; /** @@ -82,36 +84,54 @@ public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOpera * @throws SQLException SQL exception */ public final Connection getConnection(final String dataSourceName) throws SQLException { - return getConnection(dataSourceName, 0); + return getConnections(dataSourceName, 1).get(0); } /** - * Get database connection. + * Get database connections. * * @param dataSourceName data source name - * @param index index of connection - * @return database connection + * @param connectionSize size of connection list to be get + * @return database connections * @throws SQLException SQL exception */ - public final Connection getConnection(final String dataSourceName, final int index) throws SQLException { + public final List getConnections(final String dataSourceName, final int connectionSize) throws SQLException { ShardingEventBusInstance.getInstance().post(new GetConnectionStartEvent(dataSourceName)); DataSource dataSource = getDataSourceMap().get(dataSourceName); Preconditions.checkState(null != dataSource, "Missing the data source name: '%s'", dataSourceName); Collection connections = cachedConnections.get(dataSourceName); - if (cachedConnections.get(dataSourceName).size() > index) { - Connection result = cachedConnections.get(dataSourceName).toArray(new Connection[connections.size()])[index]; - postGetConnectionEvent(result); - return result; + List result; + if (connections.size() >= connectionSize) { + result = new ArrayList<>(cachedConnections.get(dataSourceName)).subList(0, connectionSize); + } else if (!connections.isEmpty()) { + result = new ArrayList<>(connectionSize); + result.addAll(connections); + List newConnections = createConnections(dataSource, connectionSize - connections.size()); + result.addAll(newConnections); + cachedConnections.putAll(dataSourceName, newConnections); + } else { + result = new ArrayList<>(createConnections(dataSource, connectionSize)); + cachedConnections.putAll(dataSourceName, result); } - Connection result = dataSource.getConnection(); - cachedConnections.put(dataSourceName, result); - replayMethodsInvocation(result); postGetConnectionEvent(result); return result; } - private void postGetConnectionEvent(final Connection connection) throws SQLException { - GetConnectionEvent finishEvent = new GetConnectionFinishEvent(DataSourceMetaDataFactory.newInstance(databaseType, connection.getMetaData().getURL())); + @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") + private synchronized List createConnections(final DataSource dataSource, final int connectionSize) throws SQLException { + List result = new ArrayList<>(connectionSize); + synchronized (dataSource) { + for (int i = 0; i < connectionSize; i++) { + Connection connection = dataSource.getConnection(); + replayMethodsInvocation(connection); + result.add(connection); + } + } + return result; + } + + private void postGetConnectionEvent(final List connections) throws SQLException { + GetConnectionEvent finishEvent = new GetConnectionFinishEvent(DataSourceMetaDataFactory.newInstance(databaseType, connections.get(0).getMetaData().getURL())); finishEvent.setExecuteSuccess(); ShardingEventBusInstance.getInstance().post(finishEvent); } diff --git a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/execute/JDBCExecuteEngine.java b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/execute/JDBCExecuteEngine.java index 0ec7fbedf19f5..9d5fb088d3d36 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/execute/JDBCExecuteEngine.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/execute/JDBCExecuteEngine.java @@ -61,6 +61,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.Collection; +import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -175,8 +176,8 @@ private final class ConnectionStrictlySQLExecutePrepareCallback implements SQLEx private final boolean isReturnGeneratedKeys; @Override - public Connection getConnection(final String dataSourceName, final int index) throws SQLException { - return getBackendConnection().getConnection(dataSourceName); + public List getConnections(final String dataSourceName, final int connectionSize) throws SQLException { + return Collections.singletonList(getBackendConnection().getConnection(dataSourceName)); } @Override