diff --git a/pom.xml b/pom.xml
index d6447b25..6f090b85 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,5 +1,5 @@
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.nlpcn</groupId>
    <artifactId>elasticsearch-sql</artifactId>
@@ -98,6 +98,14 @@
            <version>1.2.16</version>
            <scope>provided</scope>
+
+        <dependency>
+            <groupId>javax.servlet</groupId>
+            <artifactId>servlet-api</artifactId>
+            <version>2.5</version>
+            <scope>provided</scope>
+        </dependency>
+
@@ -246,6 +254,4 @@
-
-
\ No newline at end of file
diff --git a/src/main/java/com/alibaba/druid/pool/ElasticSearchConnection.java b/src/main/java/com/alibaba/druid/pool/ElasticSearchConnection.java
new file mode 100644
index 00000000..0a413808
--- /dev/null
+++ b/src/main/java/com/alibaba/druid/pool/ElasticSearchConnection.java
@@ -0,0 +1,815 @@
+package com.alibaba.druid.pool;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.sql.*;
+import java.sql.Date;
+import java.util.*;
+import java.util.concurrent.Executor;
+
+/**
+ * Created by allwefantasy on 8/30/16.
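+ * A minimal java.sql.Connection adapter around an Elasticsearch TransportClient; most JDBC
+ * methods are unimplemented stubs, and only the transport client lifecycle is handled here.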
+ */
+public class ElasticSearchConnection implements Connection {
+
+ private Client client;
+
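+ /**
+ * Builds a TransportClient from a JDBC-style URL. Judging by the parsing below, the expected
+ * shape is scheme://host1:port1,host2:port2/..., i.e. the host:port list is the third
+ * '/'-separated segment. If a host cannot be resolved, the exception is only printed and the
+ * client is left null.
+ */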
+ public ElasticSearchConnection(String jdbcUrl) {
+
+
+ Settings settings = Settings.builder().put("client.transport.ignore_cluster_name", true).build();
+ try {
+ TransportClient transportClient = TransportClient.builder().settings(settings).build();
+
+ String hostAndPortArrayStr = jdbcUrl.split("/")[2];
+ String[] hostAndPortArray = hostAndPortArrayStr.split(",");
+
+ for (String hostAndPort : hostAndPortArray) {
+ String host = hostAndPort.split(":")[0];
+ String port = hostAndPort.split(":")[1];
+ transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), Integer.parseInt(port)));
+ }
+ client = transportClient;
+ } catch (UnknownHostException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public Client getClient() {
+ return client;
+ }
+
+ @Override
+ public Statement createStatement() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql) throws SQLException {
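+ // Stub PreparedStatement: parameter setters are no-ops, executeQuery() returns null, and
+ // closing the statement closes the underlying TransportClient.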
+ return new PreparedStatement() {
+ @Override
+ public ResultSet executeQuery() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public int executeUpdate() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public void setNull(int parameterIndex, int sqlType) throws SQLException {
+
+ }
+
+ @Override
+ public void setBoolean(int parameterIndex, boolean x) throws SQLException {
+
+ }
+
+ @Override
+ public void setByte(int parameterIndex, byte x) throws SQLException {
+
+ }
+
+ @Override
+ public void setShort(int parameterIndex, short x) throws SQLException {
+
+ }
+
+ @Override
+ public void setInt(int parameterIndex, int x) throws SQLException {
+
+ }
+
+ @Override
+ public void setLong(int parameterIndex, long x) throws SQLException {
+
+ }
+
+ @Override
+ public void setFloat(int parameterIndex, float x) throws SQLException {
+
+ }
+
+ @Override
+ public void setDouble(int parameterIndex, double x) throws SQLException {
+
+ }
+
+ @Override
+ public void setBigDecimal(int parameterIndex, BigDecimal x) throws SQLException {
+
+ }
+
+ @Override
+ public void setString(int parameterIndex, String x) throws SQLException {
+
+ }
+
+ @Override
+ public void setBytes(int parameterIndex, byte[] x) throws SQLException {
+
+ }
+
+ @Override
+ public void setDate(int parameterIndex, Date x) throws SQLException {
+
+ }
+
+ @Override
+ public void setTime(int parameterIndex, Time x) throws SQLException {
+
+ }
+
+ @Override
+ public void setTimestamp(int parameterIndex, Timestamp x) throws SQLException {
+
+ }
+
+ @Override
+ public void setAsciiStream(int parameterIndex, InputStream x, int length) throws SQLException {
+
+ }
+
+ @Override
+ public void setUnicodeStream(int parameterIndex, InputStream x, int length) throws SQLException {
+
+ }
+
+ @Override
+ public void setBinaryStream(int parameterIndex, InputStream x, int length) throws SQLException {
+
+ }
+
+ @Override
+ public void clearParameters() throws SQLException {
+
+ }
+
+ @Override
+ public void setObject(int parameterIndex, Object x, int targetSqlType) throws SQLException {
+
+ }
+
+ @Override
+ public void setObject(int parameterIndex, Object x) throws SQLException {
+
+ }
+
+ @Override
+ public boolean execute() throws SQLException {
+ return false;
+ }
+
+ @Override
+ public void addBatch() throws SQLException {
+
+ }
+
+ @Override
+ public void setCharacterStream(int parameterIndex, Reader reader, int length) throws SQLException {
+
+ }
+
+ @Override
+ public void setRef(int parameterIndex, Ref x) throws SQLException {
+
+ }
+
+ @Override
+ public void setBlob(int parameterIndex, Blob x) throws SQLException {
+
+ }
+
+ @Override
+ public void setClob(int parameterIndex, Clob x) throws SQLException {
+
+ }
+
+ @Override
+ public void setArray(int parameterIndex, Array x) throws SQLException {
+
+ }
+
+ @Override
+ public ResultSetMetaData getMetaData() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public void setDate(int parameterIndex, Date x, Calendar cal) throws SQLException {
+
+ }
+
+ @Override
+ public void setTime(int parameterIndex, Time x, Calendar cal) throws SQLException {
+
+ }
+
+ @Override
+ public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException {
+
+ }
+
+ @Override
+ public void setNull(int parameterIndex, int sqlType, String typeName) throws SQLException {
+
+ }
+
+ @Override
+ public void setURL(int parameterIndex, URL x) throws SQLException {
+
+ }
+
+ @Override
+ public ParameterMetaData getParameterMetaData() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public void setRowId(int parameterIndex, RowId x) throws SQLException {
+
+ }
+
+ @Override
+ public void setNString(int parameterIndex, String value) throws SQLException {
+
+ }
+
+ @Override
+ public void setNCharacterStream(int parameterIndex, Reader value, long length) throws SQLException {
+
+ }
+
+ @Override
+ public void setNClob(int parameterIndex, NClob value) throws SQLException {
+
+ }
+
+ @Override
+ public void setClob(int parameterIndex, Reader reader, long length) throws SQLException {
+
+ }
+
+ @Override
+ public void setBlob(int parameterIndex, InputStream inputStream, long length) throws SQLException {
+
+ }
+
+ @Override
+ public void setNClob(int parameterIndex, Reader reader, long length) throws SQLException {
+
+ }
+
+ @Override
+ public void setSQLXML(int parameterIndex, SQLXML xmlObject) throws SQLException {
+
+ }
+
+ @Override
+ public void setObject(int parameterIndex, Object x, int targetSqlType, int scaleOrLength) throws SQLException {
+
+ }
+
+ @Override
+ public void setAsciiStream(int parameterIndex, InputStream x, long length) throws SQLException {
+
+ }
+
+ @Override
+ public void setBinaryStream(int parameterIndex, InputStream x, long length) throws SQLException {
+
+ }
+
+ @Override
+ public void setCharacterStream(int parameterIndex, Reader reader, long length) throws SQLException {
+
+ }
+
+ @Override
+ public void setAsciiStream(int parameterIndex, InputStream x) throws SQLException {
+
+ }
+
+ @Override
+ public void setBinaryStream(int parameterIndex, InputStream x) throws SQLException {
+
+ }
+
+ @Override
+ public void setCharacterStream(int parameterIndex, Reader reader) throws SQLException {
+
+ }
+
+ @Override
+ public void setNCharacterStream(int parameterIndex, Reader value) throws SQLException {
+
+ }
+
+ @Override
+ public void setClob(int parameterIndex, Reader reader) throws SQLException {
+
+ }
+
+ @Override
+ public void setBlob(int parameterIndex, InputStream inputStream) throws SQLException {
+
+ }
+
+ @Override
+ public void setNClob(int parameterIndex, Reader reader) throws SQLException {
+
+ }
+
+ @Override
+ public ResultSet executeQuery(String sql) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public int executeUpdate(String sql) throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public void close() throws SQLException {
+ client.close();
+ }
+
+ @Override
+ public int getMaxFieldSize() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public void setMaxFieldSize(int max) throws SQLException {
+
+ }
+
+ @Override
+ public int getMaxRows() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public void setMaxRows(int max) throws SQLException {
+
+ }
+
+ @Override
+ public void setEscapeProcessing(boolean enable) throws SQLException {
+
+ }
+
+ @Override
+ public int getQueryTimeout() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public void setQueryTimeout(int seconds) throws SQLException {
+
+ }
+
+ @Override
+ public void cancel() throws SQLException {
+
+ }
+
+ @Override
+ public SQLWarning getWarnings() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public void clearWarnings() throws SQLException {
+
+ }
+
+ @Override
+ public void setCursorName(String name) throws SQLException {
+
+ }
+
+ @Override
+ public boolean execute(String sql) throws SQLException {
+ return false;
+ }
+
+ @Override
+ public ResultSet getResultSet() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public int getUpdateCount() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public boolean getMoreResults() throws SQLException {
+ return false;
+ }
+
+ @Override
+ public void setFetchDirection(int direction) throws SQLException {
+
+ }
+
+ @Override
+ public int getFetchDirection() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public void setFetchSize(int rows) throws SQLException {
+
+ }
+
+ @Override
+ public int getFetchSize() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public int getResultSetConcurrency() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public int getResultSetType() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public void addBatch(String sql) throws SQLException {
+
+ }
+
+ @Override
+ public void clearBatch() throws SQLException {
+
+ }
+
+ @Override
+ public int[] executeBatch() throws SQLException {
+ return new int[0];
+ }
+
+ @Override
+ public Connection getConnection() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public boolean getMoreResults(int current) throws SQLException {
+ return false;
+ }
+
+ @Override
+ public ResultSet getGeneratedKeys() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public int executeUpdate(String sql, String[] columnNames) throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
+ return false;
+ }
+
+ @Override
+ public boolean execute(String sql, int[] columnIndexes) throws SQLException {
+ return false;
+ }
+
+ @Override
+ public boolean execute(String sql, String[] columnNames) throws SQLException {
+ return false;
+ }
+
+ @Override
+ public int getResultSetHoldability() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public boolean isClosed() throws SQLException {
+ return false;
+ }
+
+ @Override
+ public void setPoolable(boolean poolable) throws SQLException {
+
+ }
+
+ @Override
+ public boolean isPoolable() throws SQLException {
+ return false;
+ }
+
+ @Override
+ public void closeOnCompletion() throws SQLException {
+
+ }
+
+ @Override
+ public boolean isCloseOnCompletion() throws SQLException {
+ return false;
+ }
+
+ @Override
+ public <T> T unwrap(Class<T> iface) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public boolean isWrapperFor(Class<?> iface) throws SQLException {
+ return false;
+ }
+ };
+ }
+
+ @Override
+ public CallableStatement prepareCall(String sql) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public String nativeSQL(String sql) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public void setAutoCommit(boolean autoCommit) throws SQLException {
+
+ }
+
+ @Override
+ public boolean getAutoCommit() throws SQLException {
+ return false;
+ }
+
+ @Override
+ public void commit() throws SQLException {
+
+ }
+
+ @Override
+ public void rollback() throws SQLException {
+
+ }
+
+ @Override
+ public void close() throws SQLException {
+
+ }
+
+ @Override
+ public boolean isClosed() throws SQLException {
+ return false;
+ }
+
+ @Override
+ public DatabaseMetaData getMetaData() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public void setReadOnly(boolean readOnly) throws SQLException {
+
+ }
+
+ @Override
+ public boolean isReadOnly() throws SQLException {
+ return false;
+ }
+
+ @Override
+ public void setCatalog(String catalog) throws SQLException {
+
+ }
+
+ @Override
+ public String getCatalog() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public void setTransactionIsolation(int level) throws SQLException {
+
+ }
+
+ @Override
+ public int getTransactionIsolation() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public SQLWarning getWarnings() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public void clearWarnings() throws SQLException {
+
+ }
+
+ @Override
+ public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Map<String, Class<?>> getTypeMap() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
+
+ }
+
+ @Override
+ public void setHoldability(int holdability) throws SQLException {
+
+ }
+
+ @Override
+ public int getHoldability() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public Savepoint setSavepoint() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Savepoint setSavepoint(String name) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public void rollback(Savepoint savepoint) throws SQLException {
+
+ }
+
+ @Override
+ public void releaseSavepoint(Savepoint savepoint) throws SQLException {
+
+ }
+
+ @Override
+ public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Clob createClob() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Blob createBlob() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public NClob createNClob() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public SQLXML createSQLXML() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public boolean isValid(int timeout) throws SQLException {
+ return false;
+ }
+
+ @Override
+ public void setClientInfo(String name, String value) throws SQLClientInfoException {
+
+ }
+
+ @Override
+ public void setClientInfo(Properties properties) throws SQLClientInfoException {
+
+ }
+
+ @Override
+ public String getClientInfo(String name) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Properties getClientInfo() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public void setSchema(String schema) throws SQLException {
+
+ }
+
+ @Override
+ public String getSchema() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public void abort(Executor executor) throws SQLException {
+
+ }
+
+ @Override
+ public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
+
+ }
+
+ @Override
+ public int getNetworkTimeout() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public <T> T unwrap(Class<T> iface) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public boolean isWrapperFor(Class<?> iface) throws SQLException {
+ return false;
+ }
+}
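
A minimal usage sketch of the wrapper above, assuming a URL of the shape the constructor parses (the concrete URL and the class name ConnectionSketch are hypothetical, for illustration only):

    import com.alibaba.druid.pool.ElasticSearchConnection;
    import org.elasticsearch.client.Client;

    public class ConnectionSketch {
        public static void main(String[] args) throws Exception {
            // Hypothetical URL; the constructor only reads the host:port list after the second '/'.
            ElasticSearchConnection conn = new ElasticSearchConnection("jdbc:elasticsearch://127.0.0.1:9300/");
            Client client = conn.getClient();   // raw TransportClient used to execute queries
            System.out.println("transport client created: " + (client != null));
            conn.prepareStatement("SELECT * FROM idx").close();  // closing the statement closes the client
        }
    }
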
diff --git a/src/main/java/com/alibaba/druid/pool/ElasticSearchDruidDataSource.java b/src/main/java/com/alibaba/druid/pool/ElasticSearchDruidDataSource.java
new file mode 100644
index 00000000..51d27074
--- /dev/null
+++ b/src/main/java/com/alibaba/druid/pool/ElasticSearchDruidDataSource.java
@@ -0,0 +1,2698 @@
+package com.alibaba.druid.pool;
+
+import com.alibaba.druid.Constants;
+import com.alibaba.druid.TransactionTimeoutException;
+import com.alibaba.druid.VERSION;
+import com.alibaba.druid.filter.AutoLoad;
+import com.alibaba.druid.filter.Filter;
+import com.alibaba.druid.pool.vendor.*;
+import com.alibaba.druid.proxy.DruidDriver;
+import com.alibaba.druid.proxy.jdbc.DataSourceProxyConfig;
+import com.alibaba.druid.proxy.jdbc.TransactionInfo;
+import com.alibaba.druid.sql.ast.SQLStatement;
+import com.alibaba.druid.sql.ast.statement.SQLSelectQuery;
+import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock;
+import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
+import com.alibaba.druid.sql.parser.SQLParserUtils;
+import com.alibaba.druid.sql.parser.SQLStatementParser;
+import com.alibaba.druid.stat.DruidDataSourceStatManager;
+import com.alibaba.druid.stat.JdbcDataSourceStat;
+import com.alibaba.druid.stat.JdbcSqlStat;
+import com.alibaba.druid.stat.JdbcSqlStatValue;
+import com.alibaba.druid.support.logging.Log;
+import com.alibaba.druid.support.logging.LogFactory;
+import com.alibaba.druid.util.*;
+import com.alibaba.druid.wall.WallFilter;
+import com.alibaba.druid.wall.WallProviderStatValue;
+
+
+import javax.management.JMException;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.naming.NamingException;
+import javax.naming.Reference;
+import javax.naming.StringRefAddr;
+
+import javax.sql.ConnectionEvent;
+import javax.sql.ConnectionEventListener;
+import javax.sql.PooledConnection;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static com.alibaba.druid.util.Utils.getBoolean;
+
+/**
+ * Created by allwefantasy on 8/30/16.
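+ * A DruidDataSource variant that keeps Druid's pooling, statistics and lifecycle logic but
+ * creates physical connections as ElasticSearchConnection wrappers (see createPhysicalConnection)
+ * instead of going through a JDBC driver.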
+ */
+public class ElasticSearchDruidDataSource extends DruidDataSource {
+
+ private final static Log LOG = LogFactory.getLog(DruidDataSource.class);
+
+ private static final long serialVersionUID = 1L;
+
+ // stats
+ private final AtomicLong recycleErrorCount = new AtomicLong();
+ private long connectCount = 0L;
+ private long closeCount = 0L;
+ private final AtomicLong connectErrorCount = new AtomicLong();
+ private long recycleCount = 0L;
+ private long removeAbandonedCount = 0L;
+ private long notEmptyWaitCount = 0L;
+ private long notEmptySignalCount = 0L;
+ private long notEmptyWaitNanos = 0L;
+
+ private int activePeak = 0;
+ private long activePeakTime = 0;
+ private int poolingPeak = 0;
+ private long poolingPeakTime = 0;
+
+ // store
+ private volatile DruidConnectionHolder[] connections;
+ private int poolingCount = 0;
+ private int activeCount = 0;
+ private long discardCount = 0;
+ private int notEmptyWaitThreadCount = 0;
+ private int notEmptyWaitThreadPeak = 0;
+
+ // threads
+ private ScheduledFuture<?> destroySchedulerFuture;
+ private DestroyTask destoryTask;
+
+ private CreateConnectionThread createConnectionThread;
+ private DestroyConnectionThread destroyConnectionThread;
+ private LogStatsThread logStatsThread;
+ private int createTaskCount;
+
+ private final CountDownLatch initedLatch = new CountDownLatch(2);
+
+ private volatile boolean enable = true;
+
+ private boolean resetStatEnable = true;
+ private final AtomicLong resetCount = new AtomicLong();
+
+ private String initStackTrace;
+
+ private volatile boolean closed = false;
+ private long closeTimeMillis = -1L;
+
+ protected JdbcDataSourceStat dataSourceStat;
+
+ private boolean useGlobalDataSourceStat = false;
+
+ private boolean mbeanRegistered = false;
+
+ public static ThreadLocal<Long> waitNanosLocal = new ThreadLocal<Long>();
+
+ private boolean logDifferentThread = true;
+
+ public ElasticSearchDruidDataSource() {
+ this(false);
+ }
+
+ public ElasticSearchDruidDataSource(boolean fairLock) {
+ super(fairLock);
+
+ configFromPropety(System.getProperties());
+ }
+
+ public void configFromPropety(Properties properties) {
+ {
+ Boolean value = getBoolean(properties, "druid.testWhileIdle");
+ if (value != null) {
+ this.setTestWhileIdle(value);
+ }
+ }
+ {
+ Boolean value = getBoolean(properties, "druid.testOnBorrow");
+ if (value != null) {
+ this.setTestOnBorrow(value);
+ }
+ }
+ {
+ String property = properties.getProperty("druid.validationQuery");
+ if (property != null && property.length() > 0) {
+ this.setValidationQuery(property);
+ }
+ }
+ {
+ Boolean value = getBoolean(properties, "druid.useGlobalDataSourceStat");
+ if (value != null) {
+ this.setUseGlobalDataSourceStat(value);
+ }
+ }
+ {
+ Boolean value = getBoolean(properties, "druid.useGloalDataSourceStat"); // compatible for early versions
+ if (value != null) {
+ this.setUseGlobalDataSourceStat(value);
+ }
+ }
+ {
+ String property = properties.getProperty("druid.filters");
+
+ if (property != null && property.length() > 0) {
+ try {
+ this.setFilters(property);
+ } catch (SQLException e) {
+ LOG.error("setFilters error", e);
+ }
+ }
+ }
+ {
+ String property = properties.getProperty(Constants.DRUID_TIME_BETWEEN_LOG_STATS_MILLIS);
+ if (property != null && property.length() > 0) {
+ try {
+ long value = Long.parseLong(property);
+ this.setTimeBetweenLogStatsMillis(value);
+ } catch (NumberFormatException e) {
+ LOG.error("illegal property '" + Constants.DRUID_TIME_BETWEEN_LOG_STATS_MILLIS + "'", e);
+ }
+ }
+ }
+ {
+ String property = properties.getProperty(Constants.DRUID_STAT_SQL_MAX_SIZE);
+ if (property != null && property.length() > 0) {
+ try {
+ int value = Integer.parseInt(property);
+ if (dataSourceStat != null) {
+ dataSourceStat.setMaxSqlSize(value);
+ }
+ } catch (NumberFormatException e) {
+ LOG.error("illegal property '" + Constants.DRUID_STAT_SQL_MAX_SIZE + "'", e);
+ }
+ }
+ }
+ {
+ Boolean value = getBoolean(properties, "druid.clearFiltersEnable");
+ if (value != null) {
+ this.setClearFiltersEnable(value);
+ }
+ }
+ {
+ Boolean value = getBoolean(properties, "druid.resetStatEnable");
+ if (value != null) {
+ this.setResetStatEnable(value);
+ }
+ }
+ {
+ String property = properties.getProperty("druid.notFullTimeoutRetryCount");
+ if (property != null && property.length() > 0) {
+ try {
+ int value = Integer.parseInt(property);
+ this.setNotFullTimeoutRetryCount(value);
+ } catch (NumberFormatException e) {
+ LOG.error("illegal property 'druid.notFullTimeoutRetryCount'", e);
+ }
+ }
+ }
+ }
+
+ public boolean isUseGlobalDataSourceStat() {
+ return useGlobalDataSourceStat;
+ }
+
+ public void setUseGlobalDataSourceStat(boolean useGlobalDataSourceStat) {
+ this.useGlobalDataSourceStat = useGlobalDataSourceStat;
+ }
+
+ public String getInitStackTrace() {
+ return initStackTrace;
+ }
+
+ public boolean isResetStatEnable() {
+ return resetStatEnable;
+ }
+
+ public void setResetStatEnable(boolean resetStatEnable) {
+ this.resetStatEnable = resetStatEnable;
+ if (dataSourceStat != null) {
+ dataSourceStat.setResetStatEnable(resetStatEnable);
+ }
+ }
+
+ public long getDiscardCount() {
+ return discardCount;
+ }
+
+ public void restart() throws SQLException {
+ lock.lock();
+ try {
+ if (activeCount > 0) {
+ throw new SQLException("can not restart, activeCount not zero. " + activeCount);
+ }
+ if (LOG.isInfoEnabled()) {
+ LOG.info("{dataSource-" + this.getID() + "} restart");
+ }
+
+ this.close();
+ this.resetStat();
+ this.inited = false;
+ this.enable = true;
+ this.closed = false;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void resetStat() {
+ if (!isResetStatEnable()) {
+ return;
+ }
+
+ lock.lock();
+ try {
+ connectCount = 0;
+ closeCount = 0;
+ discardCount = 0;
+ recycleCount = 0;
+ createCount.set(0);
+ destroyCount.set(0);
+ removeAbandonedCount = 0;
+ notEmptyWaitCount = 0;
+ notEmptySignalCount = 0L;
+ notEmptyWaitNanos = 0;
+
+ activePeak = 0;
+ activePeakTime = 0;
+ poolingPeak = 0;
+ createTimespan = 0;
+ lastError = null;
+ lastErrorTimeMillis = 0;
+ lastCreateError = null;
+ lastCreateErrorTimeMillis = 0;
+ } finally {
+ lock.unlock();
+ }
+
+ connectErrorCount.set(0);
+ errorCount.set(0);
+ commitCount.set(0);
+ rollbackCount.set(0);
+ startTransactionCount.set(0);
+ cachedPreparedStatementHitCount.set(0);
+ closedPreparedStatementCount.set(0);
+ preparedStatementCount.set(0);
+ transactionHistogram.reset();
+ cachedPreparedStatementDeleteCount.set(0);
+ recycleErrorCount.set(0);
+
+ resetCount.incrementAndGet();
+ }
+
+ public long getResetCount() {
+ return this.resetCount.get();
+ }
+
+ public boolean isEnable() {
+ return enable;
+ }
+
+ public void setEnable(boolean enable) {
+ lock.lock();
+ try {
+ this.enable = enable;
+ if (!enable) {
+ notEmpty.signalAll();
+ notEmptySignalCount++;
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void setPoolPreparedStatements(boolean value) {
+ if (this.poolPreparedStatements == value) {
+ return;
+ }
+
+ this.poolPreparedStatements = value;
+
+ if (!inited) {
+ return;
+ }
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("set poolPreparedStatements " + this.poolPreparedStatements + " -> " + value);
+ }
+
+ if (!value) {
+ lock.lock();
+ try {
+
+ for (int i = 0; i < poolingCount; ++i) {
+ DruidConnectionHolder connection = connections[i];
+
+ for (PreparedStatementHolder holder : connection.getStatementPool().getMap().values()) {
+ closePreapredStatement(holder);
+ }
+
+ connection.getStatementPool().getMap().clear();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ public void setMaxActive(int maxActive) {
+ if (this.maxActive == maxActive) {
+ return;
+ }
+
+ if (maxActive == 0) {
+ throw new IllegalArgumentException("maxActive can't not set zero");
+ }
+
+ if (!inited) {
+ this.maxActive = maxActive;
+ return;
+ }
+
+ if (maxActive < this.minIdle) {
+ throw new IllegalArgumentException("maxActive less than minIdle, " + maxActive + " < " + this.minIdle);
+ }
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("maxActive changed : " + this.maxActive + " -> " + maxActive);
+ }
+
+ lock.lock();
+ try {
+ int allCount = this.poolingCount + this.activeCount;
+
+ if (maxActive > allCount) {
+ this.connections = Arrays.copyOf(this.connections, maxActive);
+ } else {
+ this.connections = Arrays.copyOf(this.connections, allCount);
+ }
+
+ this.maxActive = maxActive;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void validateConnection(Connection conn) throws SQLException {
+
+ }
+
+ @SuppressWarnings("rawtypes")
+ public void setConnectProperties(Properties properties) {
+ if (properties == null) {
+ properties = new Properties();
+ }
+
+ boolean equals;
+ if (properties.size() == this.connectProperties.size()) {
+ equals = true;
+ for (Map.Entry<Object, Object> entry : properties.entrySet()) {
+ Object value = this.connectProperties.get(entry.getKey());
+ Object entryValue = entry.getValue();
+ if (value == null && entryValue != null) {
+ equals = false;
+ break;
+ }
+
+ if (!value.equals(entry.getValue())) {
+ equals = false;
+ break;
+ }
+ }
+ } else {
+ equals = false;
+ }
+
+ if (!equals) {
+ if (inited && LOG.isInfoEnabled()) {
+ LOG.info("connectProperties changed : " + this.connectProperties + " -> " + properties);
+ }
+
+ configFromPropety(properties);
+
+ for (Filter filter : this.filters) {
+ filter.configFromProperties(properties);
+ }
+
+ if (exceptionSorter != null) {
+ exceptionSorter.configFromProperties(properties);
+ }
+
+ if (validConnectionChecker != null) {
+ validConnectionChecker.configFromProperties(properties);
+ }
+
+ if (statLogger != null) {
+ statLogger.configFromProperties(properties);
+ }
+ }
+
+ this.connectProperties = properties;
+ }
+
+ public void init() throws SQLException {
+ if (inited) {
+ return;
+ }
+
+ final ReentrantLock lock = this.lock;
+ try {
+ lock.lockInterruptibly();
+ } catch (InterruptedException e) {
+ throw new SQLException("interrupt", e);
+ }
+
+ boolean init = false;
+ try {
+ if (inited) {
+ return;
+ }
+
+ init = true;
+
+ initStackTrace = Utils.toString(Thread.currentThread().getStackTrace());
+
+ this.id = DruidDriver.createDataSourceId();
+ if (this.id > 1) {
+ long delta = (this.id - 1) * 100000;
+ this.connectionIdSeed.addAndGet(delta);
+ this.statementIdSeed.addAndGet(delta);
+ this.resultSetIdSeed.addAndGet(delta);
+ this.transactionIdSeed.addAndGet(delta);
+ }
+
+ if (this.jdbcUrl != null) {
+ this.jdbcUrl = this.jdbcUrl.trim();
+ initFromWrapDriverUrl();
+ }
+
+ if (this.dbType == null || this.dbType.length() == 0) {
+ this.dbType = JdbcUtils.getDbType(jdbcUrl, null);
+ }
+
+ for (Filter filter : filters) {
+ filter.init(this);
+ }
+
+ if (JdbcConstants.MYSQL.equals(this.dbType) || //
+ JdbcConstants.MARIADB.equals(this.dbType)) {
+ boolean cacheServerConfigurationSet = false;
+ if (this.connectProperties.containsKey("cacheServerConfiguration")) {
+ cacheServerConfigurationSet = true;
+ } else if (this.jdbcUrl.indexOf("cacheServerConfiguration") != -1) {
+ cacheServerConfigurationSet = true;
+ }
+ if (cacheServerConfigurationSet) {
+ this.connectProperties.put("cacheServerConfiguration", "true");
+ }
+ }
+
+ if (maxActive <= 0) {
+ throw new IllegalArgumentException("illegal maxActive " + maxActive);
+ }
+
+ if (maxActive < minIdle) {
+ throw new IllegalArgumentException("illegal maxActive " + maxActive);
+ }
+
+ if (getInitialSize() > maxActive) {
+ throw new IllegalArgumentException("illegal initialSize " + this.initialSize + ", maxActieve "
+ + maxActive);
+ }
+
+ if (timeBetweenLogStatsMillis > 0 && useGlobalDataSourceStat) {
+ throw new IllegalArgumentException("timeBetweenLogStatsMillis not support useGlobalDataSourceStat=true");
+ }
+
+ if (this.driverClass != null) {
+ this.driverClass = driverClass.trim();
+ }
+
+ initFromSPIServiceLoader();
+
+
+ initCheck();
+
+ initExceptionSorter();
+ initValidConnectionChecker();
+ validationQueryCheck();
+
+ if (isUseGlobalDataSourceStat()) {
+ dataSourceStat = JdbcDataSourceStat.getGlobal();
+ if (dataSourceStat == null) {
+ dataSourceStat = new JdbcDataSourceStat("Global", "Global", this.dbType);
+ JdbcDataSourceStat.setGlobal(dataSourceStat);
+ }
+ if (dataSourceStat.getDbType() == null) {
+ dataSourceStat.setDbType(this.getDbType());
+ }
+ } else {
+ dataSourceStat = new JdbcDataSourceStat(this.name, this.jdbcUrl, this.dbType, this.connectProperties);
+ }
+ dataSourceStat.setResetStatEnable(this.resetStatEnable);
+
+ connections = new DruidConnectionHolder[maxActive];
+
+ SQLException connectError = null;
+
+ try {
+ // init connections
+ for (int i = 0, size = getInitialSize(); i < size; ++i) {
+ Connection conn = createPhysicalConnection();
+ DruidConnectionHolder holder = new DruidConnectionHolder(this, conn);
+ connections[poolingCount] = holder;
+ incrementPoolingCount();
+ }
+
+ if (poolingCount > 0) {
+ poolingPeak = poolingCount;
+ poolingPeakTime = System.currentTimeMillis();
+ }
+ } catch (SQLException ex) {
+ LOG.error("init datasource error, url: " + this.getUrl(), ex);
+ connectError = ex;
+ }
+
+ createAndLogThread();
+ createAndStartCreatorThread();
+ createAndStartDestroyThread();
+
+ initedLatch.await();
+
+ initedTime = new Date();
+ registerMbean();
+
+ if (connectError != null && poolingCount == 0) {
+ throw connectError;
+ }
+ } catch (SQLException e) {
+ LOG.error("dataSource init error", e);
+ throw e;
+ } catch (InterruptedException e) {
+ throw new SQLException(e.getMessage(), e);
+ } finally {
+ inited = true;
+ lock.unlock();
+
+ if (init && LOG.isInfoEnabled()) {
+ LOG.info("{dataSource-" + this.getID() + "} inited");
+ }
+ }
+ }
+
+ @Override
+ public Connection createPhysicalConnection() throws SQLException {
+ String url = this.getUrl();
+ Properties connectProperties = getConnectProperties();
+
+
+ Connection conn;
+
+ long startNano = System.nanoTime();
+
+ try {
+ conn = createPhysicalConnection(url, connectProperties);
+
+ if (conn == null) {
+ throw new SQLException("connect error, url " + url + ", driverClass " + this.driverClass);
+ }
+
+ initPhysicalConnection(conn);
+
+ validateConnection(conn);
+ createError = null;
+ } catch (SQLException ex) {
+ createErrorCount.incrementAndGet();
+ createError = ex;
+ lastCreateError = ex;
+ lastCreateErrorTimeMillis = System.currentTimeMillis();
+ throw ex;
+ } catch (RuntimeException ex) {
+ createErrorCount.incrementAndGet();
+ createError = ex;
+ lastCreateError = ex;
+ lastCreateErrorTimeMillis = System.currentTimeMillis();
+ throw ex;
+ } catch (Error ex) {
+ createErrorCount.incrementAndGet();
+ throw ex;
+ } finally {
+ long nano = System.nanoTime() - startNano;
+ createTimespan += nano;
+ }
+
+ return conn;
+ }
+
+ @Override
+ public Connection createPhysicalConnection(String url, Properties info) throws SQLException {
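+ // No JDBC driver or DriverManager involved: each physical connection is an Elasticsearch TransportClient wrapper.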
+ Connection conn = new ElasticSearchConnection(url);
+ createCount.incrementAndGet();
+
+ return conn;
+ }
+
+ private void createAndLogThread() {
+ if (this.timeBetweenLogStatsMillis <= 0) {
+ return;
+ }
+
+ String threadName = "Druid-ConnectionPool-Log-" + System.identityHashCode(this);
+ logStatsThread = new LogStatsThread(threadName);
+ logStatsThread.start();
+
+ this.resetStatEnable = false;
+ }
+
+ protected void createAndStartDestroyThread() {
+ destoryTask = new DestroyTask();
+
+ if (destroyScheduler != null) {
+ long period = timeBetweenEvictionRunsMillis;
+ if (period <= 0) {
+ period = 1000;
+ }
+ destroySchedulerFuture = destroyScheduler.scheduleAtFixedRate(destoryTask, period, period,
+ TimeUnit.MILLISECONDS);
+ initedLatch.countDown();
+ return;
+ }
+
+ String threadName = "Druid-ConnectionPool-Destroy-" + System.identityHashCode(this);
+ destroyConnectionThread = new DestroyConnectionThread(threadName);
+ destroyConnectionThread.start();
+ }
+
+ protected void createAndStartCreatorThread() {
+ if (createScheduler == null) {
+ String threadName = "Druid-ConnectionPool-Create-" + System.identityHashCode(this);
+ createConnectionThread = new CreateConnectionThread(threadName);
+ createConnectionThread.start();
+ return;
+ }
+
+ initedLatch.countDown();
+ }
+
+ /**
+ * load filters from SPI ServiceLoader
+ *
+ * @see ServiceLoader
+ */
+ private void initFromSPIServiceLoader() {
+
+ String property = System.getProperty("druid.load.spifilter.skip");
+ if (property != null) {
+ return;
+ }
+
+ ServiceLoader<Filter> druidAutoFilterLoader = ServiceLoader.load(Filter.class);
+
+ for (Filter autoFilter : druidAutoFilterLoader) {
+ AutoLoad autoLoad = autoFilter.getClass().getAnnotation(AutoLoad.class);
+ if (autoLoad != null && autoLoad.value()) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("load filter from spi :" + autoFilter.getClass().getName());
+ }
+ addFilter(autoFilter);
+ }
+ }
+ }
+
+ private void initFromWrapDriverUrl() throws SQLException {
+ if (!jdbcUrl.startsWith(DruidDriver.DEFAULT_PREFIX)) {
+ return;
+ }
+
+ DataSourceProxyConfig config = DruidDriver.parseConfig(jdbcUrl, null);
+ this.driverClass = config.getRawDriverClassName();
+
+ LOG.error("error url : '" + jdbcUrl + "', it should be : '" + config.getRawUrl() + "'");
+
+ this.jdbcUrl = config.getRawUrl();
+ if (this.name == null) {
+ this.name = config.getName();
+ }
+
+ for (Filter filter : config.getFilters()) {
+ addFilter(filter);
+ }
+ }
+
+ /**
+ * Deduplicating add: the filter is only registered if no filter of the same class has been added yet.
+ *
+ * @param filter
+ */
+ private void addFilter(Filter filter) {
+ boolean exists = false;
+ for (Filter initedFilter : this.filters) {
+ if (initedFilter.getClass() == filter.getClass()) {
+ exists = true;
+ break;
+ }
+ }
+
+ if (!exists) {
+ filter.init(this);
+ this.filters.add(filter);
+ }
+
+ }
+
+ private void validationQueryCheck() {
+ if (!(isTestOnBorrow() || isTestOnReturn() || isTestWhileIdle())) {
+ return;
+ }
+
+ if (this.validConnectionChecker != null) {
+ return;
+ }
+
+ if (this.getValidationQuery() != null && this.getValidationQuery().length() > 0) {
+ return;
+ }
+
+ String errorMessage = "";
+
+ if (isTestOnBorrow()) {
+ errorMessage += "testOnBorrow is true, ";
+ }
+
+ if (isTestOnReturn()) {
+ errorMessage += "testOnReturn is true, ";
+ }
+
+ if (isTestWhileIdle()) {
+ errorMessage += "testWhileIdle is true, ";
+ }
+
+ LOG.error(errorMessage + "validationQuery not set");
+ }
+
+ protected void initCheck() throws SQLException {
+ if (JdbcUtils.ORACLE.equals(this.dbType)) {
+ isOracle = true;
+
+ if (driver.getMajorVersion() < 10) {
+ throw new SQLException("not support oracle driver " + driver.getMajorVersion() + "."
+ + driver.getMinorVersion());
+ }
+
+ if (driver.getMajorVersion() == 10 && isUseOracleImplicitCache()) {
+ this.getConnectProperties().setProperty("oracle.jdbc.FreeMemoryOnEnterImplicitCache", "true");
+ }
+
+ oracleValidationQueryCheck();
+ } else if (JdbcUtils.DB2.equals(dbType)) {
+ db2ValidationQueryCheck();
+ }
+ }
+
+ private void oracleValidationQueryCheck() {
+ if (validationQuery == null) {
+ return;
+ }
+ if (validationQuery.length() == 0) {
+ return;
+ }
+
+ SQLStatementParser sqlStmtParser = SQLParserUtils.createSQLStatementParser(validationQuery, this.dbType);
+ List<SQLStatement> stmtList = sqlStmtParser.parseStatementList();
+
+ if (stmtList.size() != 1) {
+ return;
+ }
+
+ SQLStatement stmt = stmtList.get(0);
+ if (!(stmt instanceof SQLSelectStatement)) {
+ return;
+ }
+
+ SQLSelectQuery query = ((SQLSelectStatement) stmt).getSelect().getQuery();
+ if (query instanceof SQLSelectQueryBlock) {
+ if (((SQLSelectQueryBlock) query).getFrom() == null) {
+ LOG.error("invalid oracle validationQuery. " + validationQuery + ", may should be : " + validationQuery
+ + " FROM DUAL");
+ }
+ }
+ }
+
+ private void db2ValidationQueryCheck() {
+ if (validationQuery == null) {
+ return;
+ }
+ if (validationQuery.length() == 0) {
+ return;
+ }
+
+ SQLStatementParser sqlStmtParser = SQLParserUtils.createSQLStatementParser(validationQuery, this.dbType);
+ List<SQLStatement> stmtList = sqlStmtParser.parseStatementList();
+
+ if (stmtList.size() != 1) {
+ return;
+ }
+
+ SQLStatement stmt = stmtList.get(0);
+ if (!(stmt instanceof SQLSelectStatement)) {
+ return;
+ }
+
+ SQLSelectQuery query = ((SQLSelectStatement) stmt).getSelect().getQuery();
+ if (query instanceof SQLSelectQueryBlock) {
+ if (((SQLSelectQueryBlock) query).getFrom() == null) {
+ LOG.error("invalid db2 validationQuery. " + validationQuery + ", may should be : " + validationQuery
+ + " FROM SYSDUMMY");
+ }
+ }
+ }
+
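+ // Reuses Druid's MySQL ValidConnectionChecker and ExceptionSorter implementations as-is for the
+ // Elasticsearch-backed connections; note that validateConnection() above is overridden as a no-op.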
+ private void initValidConnectionChecker() {
+ this.validConnectionChecker = new MySqlValidConnectionChecker();
+ }
+
+ private void initExceptionSorter() {
+ if (this.exceptionSorter != null) {
+ return;
+ }
+
+ this.exceptionSorter = new MySqlExceptionSorter();
+ }
+
+ @Override
+ public DruidPooledConnection getConnection() throws SQLException {
+ return getConnection(maxWait);
+ }
+
+ public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
+ init();
+ return getConnectionDirect(maxWaitMillis);
+ }
+
+ @Override
+ public PooledConnection getPooledConnection() throws SQLException {
+ return getConnection(maxWait);
+ }
+
+ @Override
+ public PooledConnection getPooledConnection(String user, String password) throws SQLException {
+ throw new UnsupportedOperationException("Not supported by DruidDataSource");
+ }
+
+ public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
+ int notFullTimeoutRetryCnt = 0;
+ for (; ; ) {
+ // handle notFullTimeoutRetry
+ DruidPooledConnection poolableConnection;
+ try {
+ poolableConnection = getConnectionInternal(maxWaitMillis);
+ } catch (GetConnectionTimeoutException ex) {
+ if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) {
+ notFullTimeoutRetryCnt++;
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("not full timeout retry : " + notFullTimeoutRetryCnt);
+ }
+ continue;
+ }
+ throw ex;
+ }
+
+
+ if (isRemoveAbandoned()) {
+ StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
+ poolableConnection.setConnectStackTrace(stackTrace);
+ poolableConnection.setConnectedTimeNano();
+ poolableConnection.setTraceEnable(true);
+
+ synchronized (activeConnections) {
+ activeConnections.put(poolableConnection, PRESENT);
+ }
+ }
+
+
+ return poolableConnection;
+ }
+ }
+
+ /**
+ * Discard the connection outright instead of returning it to the pool.
+ *
+ * @param realConnection
+ * @throws SQLException
+ */
+ public void discardConnection(Connection realConnection) {
+ JdbcUtils.close(realConnection);
+
+ lock.lock();
+ try {
+ activeCount--;
+ discardCount++;
+
+ if (activeCount <= 0) {
+ emptySignal();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException {
+ if (closed) {
+ connectErrorCount.incrementAndGet();
+ throw new DataSourceClosedException("dataSource already closed at " + new Date(closeTimeMillis));
+ }
+
+ if (!enable) {
+ connectErrorCount.incrementAndGet();
+ throw new DataSourceDisableException();
+ }
+
+ final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait);
+ final int maxWaitThreadCount = getMaxWaitThreadCount();
+
+ DruidConnectionHolder holder;
+ try {
+ lock.lockInterruptibly();
+ } catch (InterruptedException e) {
+ connectErrorCount.incrementAndGet();
+ throw new SQLException("interrupt", e);
+ }
+
+ try {
+ if (maxWaitThreadCount > 0) {
+ if (notEmptyWaitThreadCount >= maxWaitThreadCount) {
+ connectErrorCount.incrementAndGet();
+ throw new SQLException("maxWaitThreadCount " + maxWaitThreadCount + ", current wait Thread count "
+ + lock.getQueueLength());
+ }
+ }
+
+ connectCount++;
+
+ if (maxWait > 0) {
+ holder = pollLast(nanos);
+ } else {
+ holder = takeLast();
+ }
+
+ if (holder != null) {
+ activeCount++;
+ if (activeCount > activePeak) {
+ activePeak = activeCount;
+ activePeakTime = System.currentTimeMillis();
+ }
+ }
+ } catch (InterruptedException e) {
+ connectErrorCount.incrementAndGet();
+ throw new SQLException(e.getMessage(), e);
+ } catch (SQLException e) {
+ connectErrorCount.incrementAndGet();
+ throw e;
+ } finally {
+ lock.unlock();
+ }
+
+ if (holder == null) {
+ long waitNanos = waitNanosLocal.get();
+
+ StringBuilder buf = new StringBuilder();
+ buf.append("wait millis ")//
+ .append(waitNanos / (1000 * 1000))//
+ .append(", active " + activeCount)//
+ .append(", maxActive " + maxActive)//
+ ;
+
+ List<JdbcSqlStatValue> sqlList = this.getDataSourceStat().getRuningSqlList();
+ for (int i = 0; i < sqlList.size(); ++i) {
+ if (i != 0) {
+ buf.append('\n');
+ } else {
+ buf.append(", ");
+ }
+ JdbcSqlStatValue sql = sqlList.get(i);
+ buf.append("runningSqlCount ");
+ buf.append(sql.getRunningCount());
+ buf.append(" : ");
+ buf.append(sql.getSql());
+ }
+
+ String errorMessage = buf.toString();
+
+ if (this.createError != null) {
+ throw new GetConnectionTimeoutException(errorMessage, createError);
+ } else {
+ throw new GetConnectionTimeoutException(errorMessage);
+ }
+ }
+
+ holder.incrementUseCount();
+
+ DruidPooledConnection poolableConnection = new ElasticSearchDruidPooledConnection(holder);
+ return poolableConnection;
+ }
+
+ public void handleConnectionException(DruidPooledConnection pooledConnection, Throwable t) throws SQLException {
+ final DruidConnectionHolder holder = pooledConnection.getConnectionHolder();
+
+ errorCount.incrementAndGet();
+ lastError = t;
+ lastErrorTimeMillis = System.currentTimeMillis();
+
+ if (t instanceof SQLException) {
+ SQLException sqlEx = (SQLException) t;
+
+ // broadcastConnectionError
+ ConnectionEvent event = new ConnectionEvent(pooledConnection, sqlEx);
+ for (ConnectionEventListener eventListener : holder.getConnectionEventListeners()) {
+ eventListener.connectionErrorOccurred(event);
+ }
+
+ // exceptionSorter.isExceptionFatal
+ if (exceptionSorter != null && exceptionSorter.isExceptionFatal(sqlEx)) {
+ if (pooledConnection.isTraceEnable()) {
+ synchronized (activeConnections) {
+ if (pooledConnection.isTraceEnable()) {
+ activeConnections.remove(pooledConnection);
+ pooledConnection.setTraceEnable(false);
+ }
+ }
+ }
+
+ boolean requireDiscard = false;
+ synchronized (pooledConnection) {
+ if ((!pooledConnection.isClosed()) || !pooledConnection.isDisable()) {
+ holder.setDiscard(true);
+ pooledConnection.disable(t);
+ requireDiscard = true;
+ }
+ }
+
+ if (requireDiscard) {
+ this.discardConnection(holder.getConnection());
+ holder.setDiscard(true);
+ }
+
+ LOG.error("discard connection", sqlEx);
+ }
+
+ throw sqlEx;
+ } else {
+ throw new SQLException("Error", t);
+ }
+ }
+
+ /**
+ * Recycle the connection: return it to the pool, or discard it if recycling fails.
+ */
+ protected void recycle(DruidPooledConnection pooledConnection) throws SQLException {
+ final DruidConnectionHolder holder = pooledConnection.getConnectionHolder();
+
+ if (holder == null) {
+ LOG.warn("connectionHolder is null");
+ return;
+ }
+
+ if (logDifferentThread //
+ && (!isAsyncCloseConnectionEnable()) //
+ && pooledConnection.getOwnerThread() != Thread.currentThread()//
+ ) {
+ LOG.warn("get/close not same thread");
+ }
+
+ final Connection physicalConnection = holder.getConnection();
+
+ if (pooledConnection.isTraceEnable()) {
+ synchronized (activeConnections) {
+ if (pooledConnection.isTraceEnable()) {
+ Object oldInfo = activeConnections.remove(pooledConnection);
+ if (oldInfo == null) {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("remove abandonded failed. activeConnections.size " + activeConnections.size());
+ }
+ }
+ pooledConnection.setTraceEnable(false);
+ }
+ }
+ }
+
+ final boolean isAutoCommit = holder.isUnderlyingAutoCommit();
+ final boolean isReadOnly = holder.isUnderlyingReadOnly();
+ final boolean testOnReturn = this.isTestOnReturn();
+
+ try {
+ // check need to rollback?
+ if ((!isAutoCommit) && (!isReadOnly)) {
+ pooledConnection.rollback();
+ }
+
+ // reset holder, restore default settings, clear warnings
+ boolean isSameThread = pooledConnection.getOwnerThread() == Thread.currentThread();
+ if (!isSameThread) {
+ synchronized (pooledConnection) {
+ holder.reset();
+ }
+ } else {
+ holder.reset();
+ }
+
+ if (holder.isDiscard()) {
+ return;
+ }
+
+ if (testOnReturn) {
+ boolean validate = testConnectionInternal(physicalConnection);
+ if (!validate) {
+ JdbcUtils.close(physicalConnection);
+
+ destroyCount.incrementAndGet();
+
+ lock.lock();
+ try {
+ activeCount--;
+ closeCount++;
+ } finally {
+ lock.unlock();
+ }
+ return;
+ }
+ }
+
+ if (!enable) {
+ discardConnection(holder.getConnection());
+ return;
+ }
+
+ final long lastActiveTimeMillis = System.currentTimeMillis();
+ lock.lockInterruptibly();
+ try {
+ activeCount--;
+ closeCount++;
+
+ putLast(holder, lastActiveTimeMillis);
+ recycleCount++;
+ } finally {
+ lock.unlock();
+ }
+ } catch (Throwable e) {
+ holder.clearStatementCache();
+
+ if (!holder.isDiscard()) {
+ this.discardConnection(physicalConnection);
+ holder.setDiscard(true);
+ }
+
+ LOG.error("recyle error", e);
+ recycleErrorCount.incrementAndGet();
+ }
+ }
+
+ public long getRecycleErrorCount() {
+ return recycleErrorCount.get();
+ }
+
+ public void clearStatementCache() throws SQLException {
+ lock.lock();
+ try {
+ for (int i = 0; i < poolingCount; ++i) {
+ DruidConnectionHolder conn = connections[i];
+ conn.getStatementPool().clear();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * close datasource
+ */
+ public void close() {
+ lock.lock();
+ try {
+ if (this.closed) {
+ return;
+ }
+
+ if (!this.inited) {
+ return;
+ }
+
+ if (logStatsThread != null) {
+ logStatsThread.interrupt();
+ }
+
+ if (createConnectionThread != null) {
+ createConnectionThread.interrupt();
+ }
+
+ if (destroyConnectionThread != null) {
+ destroyConnectionThread.interrupt();
+ }
+
+ if (destroySchedulerFuture != null) {
+ destroySchedulerFuture.cancel(true);
+ }
+
+ for (int i = 0; i < poolingCount; ++i) {
+ try {
+ DruidConnectionHolder connHolder = connections[i];
+
+ for (PreparedStatementHolder stmtHolder : connHolder.getStatementPool().getMap().values()) {
+ connHolder.getStatementPool().closeRemovedStatement(stmtHolder);
+ }
+ connHolder.getStatementPool().getMap().clear();
+
+ Connection physicalConnection = connHolder.getConnection();
+ physicalConnection.close();
+ connections[i] = null;
+ destroyCount.incrementAndGet();
+ } catch (Exception ex) {
+ LOG.warn("close connection error", ex);
+ }
+ }
+ poolingCount = 0;
+ unregisterMbean();
+
+ enable = false;
+ notEmpty.signalAll();
+ notEmptySignalCount++;
+
+ this.closed = true;
+ this.closeTimeMillis = System.currentTimeMillis();
+
+ for (Filter filter : filters) {
+ filter.destroy();
+ }
+ } finally {
+ lock.unlock();
+ }
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("{dataSource-" + this.getID() + "} closed");
+ }
+ }
+
+ public void registerMbean() {
+ if (!mbeanRegistered) {
+ AccessController.doPrivileged(new PrivilegedAction