diff --git a/pom.xml b/pom.xml index d6447b25..6f090b85 100644 --- a/pom.xml +++ b/pom.xml @@ -1,5 +1,5 @@ + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 org.nlpcn elasticsearch-sql @@ -98,6 +98,14 @@ 1.2.16 provided + + + javax.servlet + servlet-api + 2.5 + provided + + @@ -246,6 +254,4 @@ - - \ No newline at end of file diff --git a/src/main/java/com/alibaba/druid/pool/ElasticSearchConnection.java b/src/main/java/com/alibaba/druid/pool/ElasticSearchConnection.java new file mode 100644 index 00000000..0a413808 --- /dev/null +++ b/src/main/java/com/alibaba/druid/pool/ElasticSearchConnection.java @@ -0,0 +1,815 @@ +package com.alibaba.druid.pool; + +import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; + +import java.io.InputStream; +import java.io.Reader; +import java.math.BigDecimal; +import java.net.InetAddress; +import java.net.URL; +import java.net.UnknownHostException; +import java.sql.*; +import java.sql.Date; +import java.util.*; +import java.util.concurrent.Executor; + +/** + * Created by allwefantasy on 8/30/16. + */ +public class ElasticSearchConnection implements Connection { + + private Client client; + + public ElasticSearchConnection(String jdbcUrl) { + + + Settings settings = Settings.builder().put("client.transport.ignore_cluster_name", true).build(); + try { + TransportClient transportClient = TransportClient.builder().settings(settings).build(); + + String hostAndPortArrayStr = jdbcUrl.split("/")[2]; + String[] hostAndPortArray = hostAndPortArrayStr.split(","); + + for (String hostAndPort : hostAndPortArray) { + String host = hostAndPort.split(":")[0]; + String port = hostAndPort.split(":")[1]; + transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), Integer.parseInt(port))); + } + client = transportClient; + } catch (UnknownHostException e) { + e.printStackTrace(); + } + } + + public Client getClient() { + return client; + } + + @Override + public Statement createStatement() throws SQLException { + return null; + } + + @Override + public PreparedStatement prepareStatement(String sql) throws SQLException { + return new PreparedStatement() { + @Override + public ResultSet executeQuery() throws SQLException { + return null; + } + + @Override + public int executeUpdate() throws SQLException { + return 0; + } + + @Override + public void setNull(int parameterIndex, int sqlType) throws SQLException { + + } + + @Override + public void setBoolean(int parameterIndex, boolean x) throws SQLException { + + } + + @Override + public void setByte(int parameterIndex, byte x) throws SQLException { + + } + + @Override + public void setShort(int parameterIndex, short x) throws SQLException { + + } + + @Override + public void setInt(int parameterIndex, int x) throws SQLException { + + } + + @Override + public void setLong(int parameterIndex, long x) throws SQLException { + + } + + @Override + public void setFloat(int parameterIndex, float x) throws SQLException { + + } + + @Override + public void setDouble(int parameterIndex, double x) throws SQLException { + + } + + @Override + public void setBigDecimal(int parameterIndex, BigDecimal x) throws SQLException { + + } + + @Override + public void setString(int parameterIndex, String x) throws SQLException { + + } + + @Override + public void setBytes(int parameterIndex, byte[] x) throws SQLException { + + } + + @Override + public void setDate(int parameterIndex, Date x) throws SQLException { + + } + + @Override + public void setTime(int parameterIndex, Time x) throws SQLException { + + } + + @Override + public void setTimestamp(int parameterIndex, Timestamp x) throws SQLException { + + } + + @Override + public void setAsciiStream(int parameterIndex, InputStream x, int length) throws SQLException { + + } + + @Override + public void setUnicodeStream(int parameterIndex, InputStream x, int length) throws SQLException { + + } + + @Override + public void setBinaryStream(int parameterIndex, InputStream x, int length) throws SQLException { + + } + + @Override + public void clearParameters() throws SQLException { + + } + + @Override + public void setObject(int parameterIndex, Object x, int targetSqlType) throws SQLException { + + } + + @Override + public void setObject(int parameterIndex, Object x) throws SQLException { + + } + + @Override + public boolean execute() throws SQLException { + return false; + } + + @Override + public void addBatch() throws SQLException { + + } + + @Override + public void setCharacterStream(int parameterIndex, Reader reader, int length) throws SQLException { + + } + + @Override + public void setRef(int parameterIndex, Ref x) throws SQLException { + + } + + @Override + public void setBlob(int parameterIndex, Blob x) throws SQLException { + + } + + @Override + public void setClob(int parameterIndex, Clob x) throws SQLException { + + } + + @Override + public void setArray(int parameterIndex, Array x) throws SQLException { + + } + + @Override + public ResultSetMetaData getMetaData() throws SQLException { + return null; + } + + @Override + public void setDate(int parameterIndex, Date x, Calendar cal) throws SQLException { + + } + + @Override + public void setTime(int parameterIndex, Time x, Calendar cal) throws SQLException { + + } + + @Override + public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException { + + } + + @Override + public void setNull(int parameterIndex, int sqlType, String typeName) throws SQLException { + + } + + @Override + public void setURL(int parameterIndex, URL x) throws SQLException { + + } + + @Override + public ParameterMetaData getParameterMetaData() throws SQLException { + return null; + } + + @Override + public void setRowId(int parameterIndex, RowId x) throws SQLException { + + } + + @Override + public void setNString(int parameterIndex, String value) throws SQLException { + + } + + @Override + public void setNCharacterStream(int parameterIndex, Reader value, long length) throws SQLException { + + } + + @Override + public void setNClob(int parameterIndex, NClob value) throws SQLException { + + } + + @Override + public void setClob(int parameterIndex, Reader reader, long length) throws SQLException { + + } + + @Override + public void setBlob(int parameterIndex, InputStream inputStream, long length) throws SQLException { + + } + + @Override + public void setNClob(int parameterIndex, Reader reader, long length) throws SQLException { + + } + + @Override + public void setSQLXML(int parameterIndex, SQLXML xmlObject) throws SQLException { + + } + + @Override + public void setObject(int parameterIndex, Object x, int targetSqlType, int scaleOrLength) throws SQLException { + + } + + @Override + public void setAsciiStream(int parameterIndex, InputStream x, long length) throws SQLException { + + } + + @Override + public void setBinaryStream(int parameterIndex, InputStream x, long length) throws SQLException { + + } + + @Override + public void setCharacterStream(int parameterIndex, Reader reader, long length) throws SQLException { + + } + + @Override + public void setAsciiStream(int parameterIndex, InputStream x) throws SQLException { + + } + + @Override + public void setBinaryStream(int parameterIndex, InputStream x) throws SQLException { + + } + + @Override + public void setCharacterStream(int parameterIndex, Reader reader) throws SQLException { + + } + + @Override + public void setNCharacterStream(int parameterIndex, Reader value) throws SQLException { + + } + + @Override + public void setClob(int parameterIndex, Reader reader) throws SQLException { + + } + + @Override + public void setBlob(int parameterIndex, InputStream inputStream) throws SQLException { + + } + + @Override + public void setNClob(int parameterIndex, Reader reader) throws SQLException { + + } + + @Override + public ResultSet executeQuery(String sql) throws SQLException { + return null; + } + + @Override + public int executeUpdate(String sql) throws SQLException { + return 0; + } + + @Override + public void close() throws SQLException { + client.close(); + } + + @Override + public int getMaxFieldSize() throws SQLException { + return 0; + } + + @Override + public void setMaxFieldSize(int max) throws SQLException { + + } + + @Override + public int getMaxRows() throws SQLException { + return 0; + } + + @Override + public void setMaxRows(int max) throws SQLException { + + } + + @Override + public void setEscapeProcessing(boolean enable) throws SQLException { + + } + + @Override + public int getQueryTimeout() throws SQLException { + return 0; + } + + @Override + public void setQueryTimeout(int seconds) throws SQLException { + + } + + @Override + public void cancel() throws SQLException { + + } + + @Override + public SQLWarning getWarnings() throws SQLException { + return null; + } + + @Override + public void clearWarnings() throws SQLException { + + } + + @Override + public void setCursorName(String name) throws SQLException { + + } + + @Override + public boolean execute(String sql) throws SQLException { + return false; + } + + @Override + public ResultSet getResultSet() throws SQLException { + return null; + } + + @Override + public int getUpdateCount() throws SQLException { + return 0; + } + + @Override + public boolean getMoreResults() throws SQLException { + return false; + } + + @Override + public void setFetchDirection(int direction) throws SQLException { + + } + + @Override + public int getFetchDirection() throws SQLException { + return 0; + } + + @Override + public void setFetchSize(int rows) throws SQLException { + + } + + @Override + public int getFetchSize() throws SQLException { + return 0; + } + + @Override + public int getResultSetConcurrency() throws SQLException { + return 0; + } + + @Override + public int getResultSetType() throws SQLException { + return 0; + } + + @Override + public void addBatch(String sql) throws SQLException { + + } + + @Override + public void clearBatch() throws SQLException { + + } + + @Override + public int[] executeBatch() throws SQLException { + return new int[0]; + } + + @Override + public Connection getConnection() throws SQLException { + return null; + } + + @Override + public boolean getMoreResults(int current) throws SQLException { + return false; + } + + @Override + public ResultSet getGeneratedKeys() throws SQLException { + return null; + } + + @Override + public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException { + return 0; + } + + @Override + public int executeUpdate(String sql, int[] columnIndexes) throws SQLException { + return 0; + } + + @Override + public int executeUpdate(String sql, String[] columnNames) throws SQLException { + return 0; + } + + @Override + public boolean execute(String sql, int autoGeneratedKeys) throws SQLException { + return false; + } + + @Override + public boolean execute(String sql, int[] columnIndexes) throws SQLException { + return false; + } + + @Override + public boolean execute(String sql, String[] columnNames) throws SQLException { + return false; + } + + @Override + public int getResultSetHoldability() throws SQLException { + return 0; + } + + @Override + public boolean isClosed() throws SQLException { + return false; + } + + @Override + public void setPoolable(boolean poolable) throws SQLException { + + } + + @Override + public boolean isPoolable() throws SQLException { + return false; + } + + @Override + public void closeOnCompletion() throws SQLException { + + } + + @Override + public boolean isCloseOnCompletion() throws SQLException { + return false; + } + + @Override + public T unwrap(Class iface) throws SQLException { + return null; + } + + @Override + public boolean isWrapperFor(Class iface) throws SQLException { + return false; + } + }; + } + + @Override + public CallableStatement prepareCall(String sql) throws SQLException { + return null; + } + + @Override + public String nativeSQL(String sql) throws SQLException { + return null; + } + + @Override + public void setAutoCommit(boolean autoCommit) throws SQLException { + + } + + @Override + public boolean getAutoCommit() throws SQLException { + return false; + } + + @Override + public void commit() throws SQLException { + + } + + @Override + public void rollback() throws SQLException { + + } + + @Override + public void close() throws SQLException { + + } + + @Override + public boolean isClosed() throws SQLException { + return false; + } + + @Override + public DatabaseMetaData getMetaData() throws SQLException { + return null; + } + + @Override + public void setReadOnly(boolean readOnly) throws SQLException { + + } + + @Override + public boolean isReadOnly() throws SQLException { + return false; + } + + @Override + public void setCatalog(String catalog) throws SQLException { + + } + + @Override + public String getCatalog() throws SQLException { + return null; + } + + @Override + public void setTransactionIsolation(int level) throws SQLException { + + } + + @Override + public int getTransactionIsolation() throws SQLException { + return 0; + } + + @Override + public SQLWarning getWarnings() throws SQLException { + return null; + } + + @Override + public void clearWarnings() throws SQLException { + + } + + @Override + public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { + return null; + } + + @Override + public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { + return null; + } + + @Override + public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { + return null; + } + + @Override + public Map> getTypeMap() throws SQLException { + return null; + } + + @Override + public void setTypeMap(Map> map) throws SQLException { + + } + + @Override + public void setHoldability(int holdability) throws SQLException { + + } + + @Override + public int getHoldability() throws SQLException { + return 0; + } + + @Override + public Savepoint setSavepoint() throws SQLException { + return null; + } + + @Override + public Savepoint setSavepoint(String name) throws SQLException { + return null; + } + + @Override + public void rollback(Savepoint savepoint) throws SQLException { + + } + + @Override + public void releaseSavepoint(Savepoint savepoint) throws SQLException { + + } + + @Override + public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + return null; + } + + @Override + public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + return null; + } + + @Override + public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + return null; + } + + @Override + public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { + return null; + } + + @Override + public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { + return null; + } + + @Override + public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { + return null; + } + + @Override + public Clob createClob() throws SQLException { + return null; + } + + @Override + public Blob createBlob() throws SQLException { + return null; + } + + @Override + public NClob createNClob() throws SQLException { + return null; + } + + @Override + public SQLXML createSQLXML() throws SQLException { + return null; + } + + @Override + public boolean isValid(int timeout) throws SQLException { + return false; + } + + @Override + public void setClientInfo(String name, String value) throws SQLClientInfoException { + + } + + @Override + public void setClientInfo(Properties properties) throws SQLClientInfoException { + + } + + @Override + public String getClientInfo(String name) throws SQLException { + return null; + } + + @Override + public Properties getClientInfo() throws SQLException { + return null; + } + + @Override + public Array createArrayOf(String typeName, Object[] elements) throws SQLException { + return null; + } + + @Override + public Struct createStruct(String typeName, Object[] attributes) throws SQLException { + return null; + } + + @Override + public void setSchema(String schema) throws SQLException { + + } + + @Override + public String getSchema() throws SQLException { + return null; + } + + @Override + public void abort(Executor executor) throws SQLException { + + } + + @Override + public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException { + + } + + @Override + public int getNetworkTimeout() throws SQLException { + return 0; + } + + @Override + public T unwrap(Class iface) throws SQLException { + return null; + } + + @Override + public boolean isWrapperFor(Class iface) throws SQLException { + return false; + } +} diff --git a/src/main/java/com/alibaba/druid/pool/ElasticSearchDruidDataSource.java b/src/main/java/com/alibaba/druid/pool/ElasticSearchDruidDataSource.java new file mode 100644 index 00000000..51d27074 --- /dev/null +++ b/src/main/java/com/alibaba/druid/pool/ElasticSearchDruidDataSource.java @@ -0,0 +1,2698 @@ +package com.alibaba.druid.pool; + +import com.alibaba.druid.Constants; +import com.alibaba.druid.TransactionTimeoutException; +import com.alibaba.druid.VERSION; +import com.alibaba.druid.filter.AutoLoad; +import com.alibaba.druid.filter.Filter; +import com.alibaba.druid.pool.vendor.*; +import com.alibaba.druid.proxy.DruidDriver; +import com.alibaba.druid.proxy.jdbc.DataSourceProxyConfig; +import com.alibaba.druid.proxy.jdbc.TransactionInfo; +import com.alibaba.druid.sql.ast.SQLStatement; +import com.alibaba.druid.sql.ast.statement.SQLSelectQuery; +import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock; +import com.alibaba.druid.sql.ast.statement.SQLSelectStatement; +import com.alibaba.druid.sql.parser.SQLParserUtils; +import com.alibaba.druid.sql.parser.SQLStatementParser; +import com.alibaba.druid.stat.DruidDataSourceStatManager; +import com.alibaba.druid.stat.JdbcDataSourceStat; +import com.alibaba.druid.stat.JdbcSqlStat; +import com.alibaba.druid.stat.JdbcSqlStatValue; +import com.alibaba.druid.support.logging.Log; +import com.alibaba.druid.support.logging.LogFactory; +import com.alibaba.druid.util.*; +import com.alibaba.druid.wall.WallFilter; +import com.alibaba.druid.wall.WallProviderStatValue; + + +import javax.management.JMException; +import javax.management.MBeanServer; +import javax.management.ObjectName; +import javax.naming.NamingException; +import javax.naming.Reference; +import javax.naming.StringRefAddr; + +import javax.sql.ConnectionEvent; +import javax.sql.ConnectionEventListener; +import javax.sql.PooledConnection; + +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static com.alibaba.druid.util.Utils.getBoolean; + +/** + * Created by allwefantasy on 8/30/16. + */ +public class ElasticSearchDruidDataSource extends DruidDataSource { + + private final static Log LOG = LogFactory.getLog(DruidDataSource.class); + + private static final long serialVersionUID = 1L; + + // stats + private final AtomicLong recycleErrorCount = new AtomicLong(); + private long connectCount = 0L; + private long closeCount = 0L; + private final AtomicLong connectErrorCount = new AtomicLong(); + private long recycleCount = 0L; + private long removeAbandonedCount = 0L; + private long notEmptyWaitCount = 0L; + private long notEmptySignalCount = 0L; + private long notEmptyWaitNanos = 0L; + + private int activePeak = 0; + private long activePeakTime = 0; + private int poolingPeak = 0; + private long poolingPeakTime = 0; + + // store + private volatile DruidConnectionHolder[] connections; + private int poolingCount = 0; + private int activeCount = 0; + private long discardCount = 0; + private int notEmptyWaitThreadCount = 0; + private int notEmptyWaitThreadPeak = 0; + + // threads + private ScheduledFuture destroySchedulerFuture; + private DestroyTask destoryTask; + + private CreateConnectionThread createConnectionThread; + private DestroyConnectionThread destroyConnectionThread; + private LogStatsThread logStatsThread; + private int createTaskCount; + + private final CountDownLatch initedLatch = new CountDownLatch(2); + + private volatile boolean enable = true; + + private boolean resetStatEnable = true; + private final AtomicLong resetCount = new AtomicLong(); + + private String initStackTrace; + + private volatile boolean closed = false; + private long closeTimeMillis = -1L; + + protected JdbcDataSourceStat dataSourceStat; + + private boolean useGlobalDataSourceStat = false; + + private boolean mbeanRegistered = false; + + public static ThreadLocal waitNanosLocal = new ThreadLocal(); + + private boolean logDifferentThread = true; + + public ElasticSearchDruidDataSource() { + this(false); + } + + public ElasticSearchDruidDataSource(boolean fairLock) { + super(fairLock); + + configFromPropety(System.getProperties()); + } + + public void configFromPropety(Properties properties) { + { + Boolean value = getBoolean(properties, "druid.testWhileIdle"); + if (value != null) { + this.setTestWhileIdle(value); + } + } + { + Boolean value = getBoolean(properties, "druid.testOnBorrow"); + if (value != null) { + this.setTestOnBorrow(value); + } + } + { + String property = properties.getProperty("druid.validationQuery"); + if (property != null && property.length() > 0) { + this.setValidationQuery(property); + } + } + { + Boolean value = getBoolean(properties, "druid.useGlobalDataSourceStat"); + if (value != null) { + this.setUseGlobalDataSourceStat(value); + } + } + { + Boolean value = getBoolean(properties, "druid.useGloalDataSourceStat"); // compatible for early versions + if (value != null) { + this.setUseGlobalDataSourceStat(value); + } + } + { + String property = properties.getProperty("druid.filters"); + + if (property != null && property.length() > 0) { + try { + this.setFilters(property); + } catch (SQLException e) { + LOG.error("setFilters error", e); + } + } + } + { + String property = properties.getProperty(Constants.DRUID_TIME_BETWEEN_LOG_STATS_MILLIS); + if (property != null && property.length() > 0) { + try { + long value = Long.parseLong(property); + this.setTimeBetweenLogStatsMillis(value); + } catch (NumberFormatException e) { + LOG.error("illegal property '" + Constants.DRUID_TIME_BETWEEN_LOG_STATS_MILLIS + "'", e); + } + } + } + { + String property = properties.getProperty(Constants.DRUID_STAT_SQL_MAX_SIZE); + if (property != null && property.length() > 0) { + try { + int value = Integer.parseInt(property); + if (dataSourceStat != null) { + dataSourceStat.setMaxSqlSize(value); + } + } catch (NumberFormatException e) { + LOG.error("illegal property '" + Constants.DRUID_STAT_SQL_MAX_SIZE + "'", e); + } + } + } + { + Boolean value = getBoolean(properties, "druid.clearFiltersEnable"); + if (value != null) { + this.setClearFiltersEnable(value); + } + } + { + Boolean value = getBoolean(properties, "druid.resetStatEnable"); + if (value != null) { + this.setResetStatEnable(value); + } + } + { + String property = properties.getProperty("druid.notFullTimeoutRetryCount"); + if (property != null && property.length() > 0) { + try { + int value = Integer.parseInt(property); + this.setNotFullTimeoutRetryCount(value); + } catch (NumberFormatException e) { + LOG.error("illegal property 'druid.notFullTimeoutRetryCount'", e); + } + } + } + } + + public boolean isUseGlobalDataSourceStat() { + return useGlobalDataSourceStat; + } + + public void setUseGlobalDataSourceStat(boolean useGlobalDataSourceStat) { + this.useGlobalDataSourceStat = useGlobalDataSourceStat; + } + + public String getInitStackTrace() { + return initStackTrace; + } + + public boolean isResetStatEnable() { + return resetStatEnable; + } + + public void setResetStatEnable(boolean resetStatEnable) { + this.resetStatEnable = resetStatEnable; + if (dataSourceStat != null) { + dataSourceStat.setResetStatEnable(resetStatEnable); + } + } + + public long getDiscardCount() { + return discardCount; + } + + public void restart() throws SQLException { + lock.lock(); + try { + if (activeCount > 0) { + throw new SQLException("can not restart, activeCount not zero. " + activeCount); + } + if (LOG.isInfoEnabled()) { + LOG.info("{dataSource-" + this.getID() + "} restart"); + } + + this.close(); + this.resetStat(); + this.inited = false; + this.enable = true; + this.closed = false; + } finally { + lock.unlock(); + } + } + + public void resetStat() { + if (!isResetStatEnable()) { + return; + } + + lock.lock(); + try { + connectCount = 0; + closeCount = 0; + discardCount = 0; + recycleCount = 0; + createCount.set(0); + destroyCount.set(0); + removeAbandonedCount = 0; + notEmptyWaitCount = 0; + notEmptySignalCount = 0L; + notEmptyWaitNanos = 0; + + activePeak = 0; + activePeakTime = 0; + poolingPeak = 0; + createTimespan = 0; + lastError = null; + lastErrorTimeMillis = 0; + lastCreateError = null; + lastCreateErrorTimeMillis = 0; + } finally { + lock.unlock(); + } + + connectErrorCount.set(0); + errorCount.set(0); + commitCount.set(0); + rollbackCount.set(0); + startTransactionCount.set(0); + cachedPreparedStatementHitCount.set(0); + closedPreparedStatementCount.set(0); + preparedStatementCount.set(0); + transactionHistogram.reset(); + cachedPreparedStatementDeleteCount.set(0); + recycleErrorCount.set(0); + + resetCount.incrementAndGet(); + } + + public long getResetCount() { + return this.resetCount.get(); + } + + public boolean isEnable() { + return enable; + } + + public void setEnable(boolean enable) { + lock.lock(); + try { + this.enable = enable; + if (!enable) { + notEmpty.signalAll(); + notEmptySignalCount++; + } + } finally { + lock.unlock(); + } + } + + public void setPoolPreparedStatements(boolean value) { + if (this.poolPreparedStatements == value) { + return; + } + + this.poolPreparedStatements = value; + + if (!inited) { + return; + } + + if (LOG.isInfoEnabled()) { + LOG.info("set poolPreparedStatements " + this.poolPreparedStatements + " -> " + value); + } + + if (!value) { + lock.lock(); + try { + + for (int i = 0; i < poolingCount; ++i) { + DruidConnectionHolder connection = connections[i]; + + for (PreparedStatementHolder holder : connection.getStatementPool().getMap().values()) { + closePreapredStatement(holder); + } + + connection.getStatementPool().getMap().clear(); + } + } finally { + lock.unlock(); + } + } + } + + public void setMaxActive(int maxActive) { + if (this.maxActive == maxActive) { + return; + } + + if (maxActive == 0) { + throw new IllegalArgumentException("maxActive can't not set zero"); + } + + if (!inited) { + this.maxActive = maxActive; + return; + } + + if (maxActive < this.minIdle) { + throw new IllegalArgumentException("maxActive less than minIdle, " + maxActive + " < " + this.minIdle); + } + + if (LOG.isInfoEnabled()) { + LOG.info("maxActive changed : " + this.maxActive + " -> " + maxActive); + } + + lock.lock(); + try { + int allCount = this.poolingCount + this.activeCount; + + if (maxActive > allCount) { + this.connections = Arrays.copyOf(this.connections, maxActive); + } else { + this.connections = Arrays.copyOf(this.connections, allCount); + } + + this.maxActive = maxActive; + } finally { + lock.unlock(); + } + } + + @Override + public void validateConnection(Connection conn) throws SQLException { + + } + + @SuppressWarnings("rawtypes") + public void setConnectProperties(Properties properties) { + if (properties == null) { + properties = new Properties(); + } + + boolean equals; + if (properties.size() == this.connectProperties.size()) { + equals = true; + for (Map.Entry entry : properties.entrySet()) { + Object value = this.connectProperties.get(entry.getKey()); + Object entryValue = entry.getValue(); + if (value == null && entryValue != null) { + equals = false; + break; + } + + if (!value.equals(entry.getValue())) { + equals = false; + break; + } + } + } else { + equals = false; + } + + if (!equals) { + if (inited && LOG.isInfoEnabled()) { + LOG.info("connectProperties changed : " + this.connectProperties + " -> " + properties); + } + + configFromPropety(properties); + + for (Filter filter : this.filters) { + filter.configFromProperties(properties); + } + + if (exceptionSorter != null) { + exceptionSorter.configFromProperties(properties); + } + + if (validConnectionChecker != null) { + validConnectionChecker.configFromProperties(properties); + } + + if (statLogger != null) { + statLogger.configFromProperties(properties); + } + } + + this.connectProperties = properties; + } + + public void init() throws SQLException { + if (inited) { + return; + } + + final ReentrantLock lock = this.lock; + try { + lock.lockInterruptibly(); + } catch (InterruptedException e) { + throw new SQLException("interrupt", e); + } + + boolean init = false; + try { + if (inited) { + return; + } + + init = true; + + initStackTrace = Utils.toString(Thread.currentThread().getStackTrace()); + + this.id = DruidDriver.createDataSourceId(); + if (this.id > 1) { + long delta = (this.id - 1) * 100000; + this.connectionIdSeed.addAndGet(delta); + this.statementIdSeed.addAndGet(delta); + this.resultSetIdSeed.addAndGet(delta); + this.transactionIdSeed.addAndGet(delta); + } + + if (this.jdbcUrl != null) { + this.jdbcUrl = this.jdbcUrl.trim(); + initFromWrapDriverUrl(); + } + + if (this.dbType == null || this.dbType.length() == 0) { + this.dbType = JdbcUtils.getDbType(jdbcUrl, null); + } + + for (Filter filter : filters) { + filter.init(this); + } + + if (JdbcConstants.MYSQL.equals(this.dbType) || // + JdbcConstants.MARIADB.equals(this.dbType)) { + boolean cacheServerConfigurationSet = false; + if (this.connectProperties.containsKey("cacheServerConfiguration")) { + cacheServerConfigurationSet = true; + } else if (this.jdbcUrl.indexOf("cacheServerConfiguration") != -1) { + cacheServerConfigurationSet = true; + } + if (cacheServerConfigurationSet) { + this.connectProperties.put("cacheServerConfiguration", "true"); + } + } + + if (maxActive <= 0) { + throw new IllegalArgumentException("illegal maxActive " + maxActive); + } + + if (maxActive < minIdle) { + throw new IllegalArgumentException("illegal maxActive " + maxActive); + } + + if (getInitialSize() > maxActive) { + throw new IllegalArgumentException("illegal initialSize " + this.initialSize + ", maxActieve " + + maxActive); + } + + if (timeBetweenLogStatsMillis > 0 && useGlobalDataSourceStat) { + throw new IllegalArgumentException("timeBetweenLogStatsMillis not support useGlobalDataSourceStat=true"); + } + + if (this.driverClass != null) { + this.driverClass = driverClass.trim(); + } + + initFromSPIServiceLoader(); + + + initCheck(); + + initExceptionSorter(); + initValidConnectionChecker(); + validationQueryCheck(); + + if (isUseGlobalDataSourceStat()) { + dataSourceStat = JdbcDataSourceStat.getGlobal(); + if (dataSourceStat == null) { + dataSourceStat = new JdbcDataSourceStat("Global", "Global", this.dbType); + JdbcDataSourceStat.setGlobal(dataSourceStat); + } + if (dataSourceStat.getDbType() == null) { + dataSourceStat.setDbType(this.getDbType()); + } + } else { + dataSourceStat = new JdbcDataSourceStat(this.name, this.jdbcUrl, this.dbType, this.connectProperties); + } + dataSourceStat.setResetStatEnable(this.resetStatEnable); + + connections = new DruidConnectionHolder[maxActive]; + + SQLException connectError = null; + + try { + // init connections + for (int i = 0, size = getInitialSize(); i < size; ++i) { + Connection conn = createPhysicalConnection(); + DruidConnectionHolder holder = new DruidConnectionHolder(this, conn); + connections[poolingCount] = holder; + incrementPoolingCount(); + } + + if (poolingCount > 0) { + poolingPeak = poolingCount; + poolingPeakTime = System.currentTimeMillis(); + } + } catch (SQLException ex) { + LOG.error("init datasource error, url: " + this.getUrl(), ex); + connectError = ex; + } + + createAndLogThread(); + createAndStartCreatorThread(); + createAndStartDestroyThread(); + + initedLatch.await(); + + initedTime = new Date(); + registerMbean(); + + if (connectError != null && poolingCount == 0) { + throw connectError; + } + } catch (SQLException e) { + LOG.error("dataSource init error", e); + throw e; + } catch (InterruptedException e) { + throw new SQLException(e.getMessage(), e); + } finally { + inited = true; + lock.unlock(); + + if (init && LOG.isInfoEnabled()) { + LOG.info("{dataSource-" + this.getID() + "} inited"); + } + } + } + + @Override + public Connection createPhysicalConnection() throws SQLException { + String url = this.getUrl(); + Properties connectProperties = getConnectProperties(); + + + Connection conn; + + long startNano = System.nanoTime(); + + try { + conn = createPhysicalConnection(url, connectProperties); + + if (conn == null) { + throw new SQLException("connect error, url " + url + ", driverClass " + this.driverClass); + } + + initPhysicalConnection(conn); + + validateConnection(conn); + createError = null; + } catch (SQLException ex) { + createErrorCount.incrementAndGet(); + createError = ex; + lastCreateError = ex; + lastCreateErrorTimeMillis = System.currentTimeMillis(); + throw ex; + } catch (RuntimeException ex) { + createErrorCount.incrementAndGet(); + createError = ex; + lastCreateError = ex; + lastCreateErrorTimeMillis = System.currentTimeMillis(); + throw ex; + } catch (Error ex) { + createErrorCount.incrementAndGet(); + throw ex; + } finally { + long nano = System.nanoTime() - startNano; + createTimespan += nano; + } + + return conn; + } + + @Override + public Connection createPhysicalConnection(String url, Properties info) throws SQLException { + Connection conn = new ElasticSearchConnection(url); + createCount.incrementAndGet(); + + return conn; + } + + private void createAndLogThread() { + if (this.timeBetweenLogStatsMillis <= 0) { + return; + } + + String threadName = "Druid-ConnectionPool-Log-" + System.identityHashCode(this); + logStatsThread = new LogStatsThread(threadName); + logStatsThread.start(); + + this.resetStatEnable = false; + } + + protected void createAndStartDestroyThread() { + destoryTask = new DestroyTask(); + + if (destroyScheduler != null) { + long period = timeBetweenEvictionRunsMillis; + if (period <= 0) { + period = 1000; + } + destroySchedulerFuture = destroyScheduler.scheduleAtFixedRate(destoryTask, period, period, + TimeUnit.MILLISECONDS); + initedLatch.countDown(); + return; + } + + String threadName = "Druid-ConnectionPool-Destroy-" + System.identityHashCode(this); + destroyConnectionThread = new DestroyConnectionThread(threadName); + destroyConnectionThread.start(); + } + + protected void createAndStartCreatorThread() { + if (createScheduler == null) { + String threadName = "Druid-ConnectionPool-Create-" + System.identityHashCode(this); + createConnectionThread = new CreateConnectionThread(threadName); + createConnectionThread.start(); + return; + } + + initedLatch.countDown(); + } + + /** + * load filters from SPI ServiceLoader + * + * @see ServiceLoader + */ + private void initFromSPIServiceLoader() { + + String property = System.getProperty("druid.load.spifilter.skip"); + if (property != null) { + return; + } + + ServiceLoader druidAutoFilterLoader = ServiceLoader.load(Filter.class); + + for (Filter autoFilter : druidAutoFilterLoader) { + AutoLoad autoLoad = autoFilter.getClass().getAnnotation(AutoLoad.class); + if (autoLoad != null && autoLoad.value()) { + if (LOG.isInfoEnabled()) { + LOG.info("load filter from spi :" + autoFilter.getClass().getName()); + } + addFilter(autoFilter); + } + } + } + + private void initFromWrapDriverUrl() throws SQLException { + if (!jdbcUrl.startsWith(DruidDriver.DEFAULT_PREFIX)) { + return; + } + + DataSourceProxyConfig config = DruidDriver.parseConfig(jdbcUrl, null); + this.driverClass = config.getRawDriverClassName(); + + LOG.error("error url : '" + jdbcUrl + "', it should be : '" + config.getRawUrl() + "'"); + + this.jdbcUrl = config.getRawUrl(); + if (this.name == null) { + this.name = config.getName(); + } + + for (Filter filter : config.getFilters()) { + addFilter(filter); + } + } + + /** + * 会去重复 + * + * @param filter + */ + private void addFilter(Filter filter) { + boolean exists = false; + for (Filter initedFilter : this.filters) { + if (initedFilter.getClass() == filter.getClass()) { + exists = true; + break; + } + } + + if (!exists) { + filter.init(this); + this.filters.add(filter); + } + + } + + private void validationQueryCheck() { + if (!(isTestOnBorrow() || isTestOnReturn() || isTestWhileIdle())) { + return; + } + + if (this.validConnectionChecker != null) { + return; + } + + if (this.getValidationQuery() != null && this.getValidationQuery().length() > 0) { + return; + } + + String errorMessage = ""; + + if (isTestOnBorrow()) { + errorMessage += "testOnBorrow is true, "; + } + + if (isTestOnReturn()) { + errorMessage += "testOnReturn is true, "; + } + + if (isTestWhileIdle()) { + errorMessage += "testWhileIdle is true, "; + } + + LOG.error(errorMessage + "validationQuery not set"); + } + + protected void initCheck() throws SQLException { + if (JdbcUtils.ORACLE.equals(this.dbType)) { + isOracle = true; + + if (driver.getMajorVersion() < 10) { + throw new SQLException("not support oracle driver " + driver.getMajorVersion() + "." + + driver.getMinorVersion()); + } + + if (driver.getMajorVersion() == 10 && isUseOracleImplicitCache()) { + this.getConnectProperties().setProperty("oracle.jdbc.FreeMemoryOnEnterImplicitCache", "true"); + } + + oracleValidationQueryCheck(); + } else if (JdbcUtils.DB2.equals(dbType)) { + db2ValidationQueryCheck(); + } + } + + private void oracleValidationQueryCheck() { + if (validationQuery == null) { + return; + } + if (validationQuery.length() == 0) { + return; + } + + SQLStatementParser sqlStmtParser = SQLParserUtils.createSQLStatementParser(validationQuery, this.dbType); + List stmtList = sqlStmtParser.parseStatementList(); + + if (stmtList.size() != 1) { + return; + } + + SQLStatement stmt = stmtList.get(0); + if (!(stmt instanceof SQLSelectStatement)) { + return; + } + + SQLSelectQuery query = ((SQLSelectStatement) stmt).getSelect().getQuery(); + if (query instanceof SQLSelectQueryBlock) { + if (((SQLSelectQueryBlock) query).getFrom() == null) { + LOG.error("invalid oracle validationQuery. " + validationQuery + ", may should be : " + validationQuery + + " FROM DUAL"); + } + } + } + + private void db2ValidationQueryCheck() { + if (validationQuery == null) { + return; + } + if (validationQuery.length() == 0) { + return; + } + + SQLStatementParser sqlStmtParser = SQLParserUtils.createSQLStatementParser(validationQuery, this.dbType); + List stmtList = sqlStmtParser.parseStatementList(); + + if (stmtList.size() != 1) { + return; + } + + SQLStatement stmt = stmtList.get(0); + if (!(stmt instanceof SQLSelectStatement)) { + return; + } + + SQLSelectQuery query = ((SQLSelectStatement) stmt).getSelect().getQuery(); + if (query instanceof SQLSelectQueryBlock) { + if (((SQLSelectQueryBlock) query).getFrom() == null) { + LOG.error("invalid db2 validationQuery. " + validationQuery + ", may should be : " + validationQuery + + " FROM SYSDUMMY"); + } + } + } + + private void initValidConnectionChecker() { + this.validConnectionChecker = new MySqlValidConnectionChecker(); + } + + private void initExceptionSorter() { + if (this.exceptionSorter != null) { + return; + } + + this.exceptionSorter = new MySqlExceptionSorter(); + } + + @Override + public DruidPooledConnection getConnection() throws SQLException { + return getConnection(maxWait); + } + + public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException { + init(); + return getConnectionDirect(maxWaitMillis); + } + + @Override + public PooledConnection getPooledConnection() throws SQLException { + return getConnection(maxWait); + } + + @Override + public PooledConnection getPooledConnection(String user, String password) throws SQLException { + throw new UnsupportedOperationException("Not supported by DruidDataSource"); + } + + public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException { + int notFullTimeoutRetryCnt = 0; + for (; ; ) { + // handle notFullTimeoutRetry + DruidPooledConnection poolableConnection; + try { + poolableConnection = getConnectionInternal(maxWaitMillis); + } catch (GetConnectionTimeoutException ex) { + if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) { + notFullTimeoutRetryCnt++; + if (LOG.isWarnEnabled()) { + LOG.warn("not full timeout retry : " + notFullTimeoutRetryCnt); + } + continue; + } + throw ex; + } + + + if (isRemoveAbandoned()) { + StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); + poolableConnection.setConnectStackTrace(stackTrace); + poolableConnection.setConnectedTimeNano(); + poolableConnection.setTraceEnable(true); + + synchronized (activeConnections) { + activeConnections.put(poolableConnection, PRESENT); + } + } + + + return poolableConnection; + } + } + + /** + * 抛弃连接,不进行回收,而是抛弃 + * + * @param realConnection + * @throws SQLException + */ + public void discardConnection(Connection realConnection) { + JdbcUtils.close(realConnection); + + lock.lock(); + try { + activeCount--; + discardCount++; + + if (activeCount <= 0) { + emptySignal(); + } + } finally { + lock.unlock(); + } + } + + private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException { + if (closed) { + connectErrorCount.incrementAndGet(); + throw new DataSourceClosedException("dataSource already closed at " + new Date(closeTimeMillis)); + } + + if (!enable) { + connectErrorCount.incrementAndGet(); + throw new DataSourceDisableException(); + } + + final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait); + final int maxWaitThreadCount = getMaxWaitThreadCount(); + + DruidConnectionHolder holder; + try { + lock.lockInterruptibly(); + } catch (InterruptedException e) { + connectErrorCount.incrementAndGet(); + throw new SQLException("interrupt", e); + } + + try { + if (maxWaitThreadCount > 0) { + if (notEmptyWaitThreadCount >= maxWaitThreadCount) { + connectErrorCount.incrementAndGet(); + throw new SQLException("maxWaitThreadCount " + maxWaitThreadCount + ", current wait Thread count " + + lock.getQueueLength()); + } + } + + connectCount++; + + if (maxWait > 0) { + holder = pollLast(nanos); + } else { + holder = takeLast(); + } + + if (holder != null) { + activeCount++; + if (activeCount > activePeak) { + activePeak = activeCount; + activePeakTime = System.currentTimeMillis(); + } + } + } catch (InterruptedException e) { + connectErrorCount.incrementAndGet(); + throw new SQLException(e.getMessage(), e); + } catch (SQLException e) { + connectErrorCount.incrementAndGet(); + throw e; + } finally { + lock.unlock(); + } + + if (holder == null) { + long waitNanos = waitNanosLocal.get(); + + StringBuilder buf = new StringBuilder(); + buf.append("wait millis ")// + .append(waitNanos / (1000 * 1000))// + .append(", active " + activeCount)// + .append(", maxActive " + maxActive)// + ; + + List sqlList = this.getDataSourceStat().getRuningSqlList(); + for (int i = 0; i < sqlList.size(); ++i) { + if (i != 0) { + buf.append('\n'); + } else { + buf.append(", "); + } + JdbcSqlStatValue sql = sqlList.get(i); + buf.append("runningSqlCount "); + buf.append(sql.getRunningCount()); + buf.append(" : "); + buf.append(sql.getSql()); + } + + String errorMessage = buf.toString(); + + if (this.createError != null) { + throw new GetConnectionTimeoutException(errorMessage, createError); + } else { + throw new GetConnectionTimeoutException(errorMessage); + } + } + + holder.incrementUseCount(); + + DruidPooledConnection poolalbeConnection = new ElasticSearchDruidPooledConnection(holder); + return poolalbeConnection; + } + + public void handleConnectionException(DruidPooledConnection pooledConnection, Throwable t) throws SQLException { + final DruidConnectionHolder holder = pooledConnection.getConnectionHolder(); + + errorCount.incrementAndGet(); + lastError = t; + lastErrorTimeMillis = System.currentTimeMillis(); + + if (t instanceof SQLException) { + SQLException sqlEx = (SQLException) t; + + // broadcastConnectionError + ConnectionEvent event = new ConnectionEvent(pooledConnection, sqlEx); + for (ConnectionEventListener eventListener : holder.getConnectionEventListeners()) { + eventListener.connectionErrorOccurred(event); + } + + // exceptionSorter.isExceptionFatal + if (exceptionSorter != null && exceptionSorter.isExceptionFatal(sqlEx)) { + if (pooledConnection.isTraceEnable()) { + synchronized (activeConnections) { + if (pooledConnection.isTraceEnable()) { + activeConnections.remove(pooledConnection); + pooledConnection.setTraceEnable(false); + } + } + } + + boolean requireDiscard = false; + synchronized (pooledConnection) { + if ((!pooledConnection.isClosed()) || !pooledConnection.isDisable()) { + holder.setDiscard(true); + pooledConnection.disable(t); + requireDiscard = true; + } + } + + if (requireDiscard) { + this.discardConnection(holder.getConnection()); + holder.setDiscard(true); + } + + LOG.error("discard connection", sqlEx); + } + + throw sqlEx; + } else { + throw new SQLException("Error", t); + } + } + + /** + * 回收连接 + */ + protected void recycle(DruidPooledConnection pooledConnection) throws SQLException { + final DruidConnectionHolder holder = pooledConnection.getConnectionHolder(); + + if (holder == null) { + LOG.warn("connectionHolder is null"); + return; + } + + if (logDifferentThread // + && (!isAsyncCloseConnectionEnable()) // + && pooledConnection.getOwnerThread() != Thread.currentThread()// + ) { + LOG.warn("get/close not same thread"); + } + + final Connection physicalConnection = holder.getConnection(); + + if (pooledConnection.isTraceEnable()) { + synchronized (activeConnections) { + if (pooledConnection.isTraceEnable()) { + Object oldInfo = activeConnections.remove(pooledConnection); + if (oldInfo == null) { + if (LOG.isWarnEnabled()) { + LOG.warn("remove abandonded failed. activeConnections.size " + activeConnections.size()); + } + } + pooledConnection.setTraceEnable(false); + } + } + } + + final boolean isAutoCommit = holder.isUnderlyingAutoCommit(); + final boolean isReadOnly = holder.isUnderlyingReadOnly(); + final boolean testOnReturn = this.isTestOnReturn(); + + try { + // check need to rollback? + if ((!isAutoCommit) && (!isReadOnly)) { + pooledConnection.rollback(); + } + + // reset holder, restore default settings, clear warnings + boolean isSameThread = pooledConnection.getOwnerThread() == Thread.currentThread(); + if (!isSameThread) { + synchronized (pooledConnection) { + holder.reset(); + } + } else { + holder.reset(); + } + + if (holder.isDiscard()) { + return; + } + + if (testOnReturn) { + boolean validate = testConnectionInternal(physicalConnection); + if (!validate) { + JdbcUtils.close(physicalConnection); + + destroyCount.incrementAndGet(); + + lock.lock(); + try { + activeCount--; + closeCount++; + } finally { + lock.unlock(); + } + return; + } + } + + if (!enable) { + discardConnection(holder.getConnection()); + return; + } + + final long lastActiveTimeMillis = System.currentTimeMillis(); + lock.lockInterruptibly(); + try { + activeCount--; + closeCount++; + + putLast(holder, lastActiveTimeMillis); + recycleCount++; + } finally { + lock.unlock(); + } + } catch (Throwable e) { + holder.clearStatementCache(); + + if (!holder.isDiscard()) { + this.discardConnection(physicalConnection); + holder.setDiscard(true); + } + + LOG.error("recyle error", e); + recycleErrorCount.incrementAndGet(); + } + } + + public long getRecycleErrorCount() { + return recycleErrorCount.get(); + } + + public void clearStatementCache() throws SQLException { + lock.lock(); + try { + for (int i = 0; i < poolingCount; ++i) { + DruidConnectionHolder conn = connections[i]; + conn.getStatementPool().clear(); + } + } finally { + lock.unlock(); + } + } + + /** + * close datasource + */ + public void close() { + lock.lock(); + try { + if (this.closed) { + return; + } + + if (!this.inited) { + return; + } + + if (logStatsThread != null) { + logStatsThread.interrupt(); + } + + if (createConnectionThread != null) { + createConnectionThread.interrupt(); + } + + if (destroyConnectionThread != null) { + destroyConnectionThread.interrupt(); + } + + if (destroySchedulerFuture != null) { + destroySchedulerFuture.cancel(true); + } + + for (int i = 0; i < poolingCount; ++i) { + try { + DruidConnectionHolder connHolder = connections[i]; + + for (PreparedStatementHolder stmtHolder : connHolder.getStatementPool().getMap().values()) { + connHolder.getStatementPool().closeRemovedStatement(stmtHolder); + } + connHolder.getStatementPool().getMap().clear(); + + Connection physicalConnection = connHolder.getConnection(); + physicalConnection.close(); + connections[i] = null; + destroyCount.incrementAndGet(); + } catch (Exception ex) { + LOG.warn("close connection error", ex); + } + } + poolingCount = 0; + unregisterMbean(); + + enable = false; + notEmpty.signalAll(); + notEmptySignalCount++; + + this.closed = true; + this.closeTimeMillis = System.currentTimeMillis(); + + for (Filter filter : filters) { + filter.destroy(); + } + } finally { + lock.unlock(); + } + + if (LOG.isInfoEnabled()) { + LOG.info("{dataSource-" + this.getID() + "} closed"); + } + } + + public void registerMbean() { + if (!mbeanRegistered) { + AccessController.doPrivileged(new PrivilegedAction() { + + @Override + public Object run() { + ObjectName objectName = DruidDataSourceStatManager.addDataSource(ElasticSearchDruidDataSource.this, + ElasticSearchDruidDataSource.this.name); + + ElasticSearchDruidDataSource.this.setObjectName(objectName); + ElasticSearchDruidDataSource.this.mbeanRegistered = true; + + return null; + } + }); + } + } + + public void unregisterMbean() { + if (mbeanRegistered) { + AccessController.doPrivileged(new PrivilegedAction() { + + @Override + public Object run() { + DruidDataSourceStatManager.removeDataSource(ElasticSearchDruidDataSource.this); + ElasticSearchDruidDataSource.this.mbeanRegistered = false; + return null; + } + }); + } + } + + public boolean isMbeanRegistered() { + return mbeanRegistered; + } + + void putLast(DruidConnectionHolder e, long lastActiveTimeMillis) { + e.setLastActiveTimeMillis(lastActiveTimeMillis); + connections[poolingCount] = e; + incrementPoolingCount(); + + if (poolingCount > poolingPeak) { + poolingPeak = poolingCount; + poolingPeakTime = lastActiveTimeMillis; + } + + notEmpty.signal(); + notEmptySignalCount++; + } + + DruidConnectionHolder takeLast() throws InterruptedException, SQLException { + try { + while (poolingCount == 0) { + emptySignal(); // send signal to CreateThread create connection + notEmptyWaitThreadCount++; + if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) { + notEmptyWaitThreadPeak = notEmptyWaitThreadCount; + } + try { + notEmpty.await(); // signal by recycle or creator + } finally { + notEmptyWaitThreadCount--; + } + notEmptyWaitCount++; + + if (!enable) { + connectErrorCount.incrementAndGet(); + throw new DataSourceDisableException(); + } + } + } catch (InterruptedException ie) { + notEmpty.signal(); // propagate to non-interrupted thread + notEmptySignalCount++; + throw ie; + } + + decrementPoolingCount(); + DruidConnectionHolder last = connections[poolingCount]; + connections[poolingCount] = null; + + return last; + } + + private DruidConnectionHolder pollLast(long nanos) throws InterruptedException, SQLException { + long estimate = nanos; + + for (; ; ) { + if (poolingCount == 0) { + emptySignal(); // send signal to CreateThread create connection + + if (estimate <= 0) { + waitNanosLocal.set(nanos - estimate); + return null; + } + + notEmptyWaitThreadCount++; + if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) { + notEmptyWaitThreadPeak = notEmptyWaitThreadCount; + } + + try { + long startEstimate = estimate; + estimate = notEmpty.awaitNanos(estimate); // signal by + // recycle or + // creator + notEmptyWaitCount++; + notEmptyWaitNanos += (startEstimate - estimate); + + if (!enable) { + connectErrorCount.incrementAndGet(); + throw new DataSourceDisableException(); + } + } catch (InterruptedException ie) { + notEmpty.signal(); // propagate to non-interrupted thread + notEmptySignalCount++; + throw ie; + } finally { + notEmptyWaitThreadCount--; + } + + if (poolingCount == 0) { + if (estimate > 0) { + continue; + } + + waitNanosLocal.set(nanos - estimate); + return null; + } + } + + decrementPoolingCount(); + DruidConnectionHolder last = connections[poolingCount]; + connections[poolingCount] = null; + + return last; + } + } + + private final void decrementPoolingCount() { + poolingCount--; + } + + private final void incrementPoolingCount() { + poolingCount++; + } + + @Override + public Connection getConnection(String username, String password) throws SQLException { + if (!StringUtils.equals(username, this.username)) { + throw new UnsupportedOperationException("Not supported by DruidDataSource"); + } + + if (!StringUtils.equals(password, this.password)) { + throw new UnsupportedOperationException("Not supported by DruidDataSource"); + } + + return getConnection(); + } + + public long getCreateCount() { + return createCount.get(); + } + + public long getDestroyCount() { + return destroyCount.get(); + } + + public long getConnectCount() { + lock.lock(); + try { + return connectCount; + } finally { + lock.unlock(); + } + } + + public long getCloseCount() { + return closeCount; + } + + public long getConnectErrorCount() { + return connectErrorCount.get(); + } + + @Override + public int getPoolingCount() { + lock.lock(); + try { + return poolingCount; + } finally { + lock.unlock(); + } + } + + public int getPoolingPeak() { + lock.lock(); + try { + return poolingPeak; + } finally { + lock.unlock(); + } + } + + public Date getPoolingPeakTime() { + if (poolingPeakTime <= 0) { + return null; + } + + return new Date(poolingPeakTime); + } + + public long getRecycleCount() { + return recycleCount; + } + + public int getActiveCount() { + lock.lock(); + try { + return activeCount; + } finally { + lock.unlock(); + } + } + + public void logStats() { + final DruidDataSourceStatLogger statLogger = this.statLogger; + if (statLogger == null) { + return; + } + + DruidDataSourceStatValue statValue = getStatValueAndReset(); + + statLogger.log(statValue); + } + + public DruidDataSourceStatValue getStatValueAndReset() { + DruidDataSourceStatValue value = new DruidDataSourceStatValue(); + + lock.lock(); + try { + value.setPoolingCount(this.poolingCount); + value.setPoolingPeak(this.poolingPeak); + value.setPoolingPeakTime(this.poolingPeakTime); + + value.setActiveCount(this.activeCount); + value.setActivePeak(this.activePeak); + value.setActivePeakTime(this.activePeakTime); + + value.setConnectCount(this.connectCount); + value.setCloseCount(this.closeCount); + value.setWaitThreadCount(lock.getWaitQueueLength(notEmpty)); + value.setNotEmptyWaitCount(this.notEmptyWaitCount); + value.setNotEmptyWaitNanos(this.notEmptyWaitNanos); + + // reset + this.poolingPeak = 0; + this.poolingPeakTime = 0; + this.activePeak = 0; + this.activePeakTime = 0; + this.connectCount = 0; + this.closeCount = 0; + + this.notEmptyWaitCount = 0; + this.notEmptyWaitNanos = 0; + } finally { + lock.unlock(); + } + + value.setName(this.getName()); + value.setDbType(this.getDbType()); + value.setDriverClassName(this.getDriverClassName()); + + value.setUrl(this.getUrl()); + value.setUserName(this.getUsername()); + value.setFilterClassNames(this.getFilterClassNames()); + + value.setInitialSize(this.getInitialSize()); + value.setMinIdle(this.getMinIdle()); + value.setMaxActive(this.getMaxActive()); + + value.setQueryTimeout(this.getQueryTimeout()); + value.setTransactionQueryTimeout(this.getTransactionQueryTimeout()); + value.setLoginTimeout(this.getLoginTimeout()); + value.setValidConnectionCheckerClassName(this.getValidConnectionCheckerClassName()); + value.setExceptionSorterClassName(this.getExceptionSorterClassName()); + + value.setTestOnBorrow(this.isTestOnBorrow()); + value.setTestOnReturn(this.isTestOnReturn()); + value.setTestWhileIdle(this.isTestWhileIdle()); + + value.setDefaultAutoCommit(this.isDefaultAutoCommit()); + + if (defaultReadOnly != null) { + value.setDefaultReadOnly(defaultReadOnly); + } + value.setDefaultTransactionIsolation(this.getDefaultTransactionIsolation()); + + value.setLogicConnectErrorCount(connectErrorCount.getAndSet(0)); + + value.setPhysicalConnectCount(createCount.getAndSet(0)); + value.setPhysicalCloseCount(destroyCount.getAndSet(0)); + value.setPhysicalConnectErrorCount(createErrorCount.getAndSet(0)); + + value.setExecuteCount(this.executeCount.getAndSet(0)); + value.setErrorCount(errorCount.getAndSet(0)); + value.setCommitCount(commitCount.getAndSet(0)); + value.setRollbackCount(rollbackCount.getAndSet(0)); + + value.setPstmtCacheHitCount(cachedPreparedStatementHitCount.getAndSet(0)); + value.setPstmtCacheMissCount(cachedPreparedStatementMissCount.getAndSet(0)); + + value.setStartTransactionCount(startTransactionCount.getAndSet(0)); + value.setTransactionHistogram(this.getTransactionHistogram().toArrayAndReset()); + + value.setConnectionHoldTimeHistogram(this.getDataSourceStat().getConnectionHoldHistogram().toArrayAndReset()); + value.removeAbandoned = this.isRemoveAbandoned(); + value.setClobOpenCount(this.getDataSourceStat().getClobOpenCountAndReset()); + value.setBlobOpenCount(this.getDataSourceStat().getBlobOpenCountAndReset()); + + value.setSqlSkipCount(this.getDataSourceStat().getSkipSqlCountAndReset()); + value.setSqlList(this.getDataSourceStat().getSqlStatMapAndReset()); + + return value; + } + + public long getRemoveAbandonedCount() { + return removeAbandonedCount; + } + + protected void put(Connection connection) { + DruidConnectionHolder holder = null; + try { + holder = new DruidConnectionHolder(ElasticSearchDruidDataSource.this, connection); + } catch (SQLException ex) { + lock.lock(); + try { + if (createScheduler != null) { + createTaskCount--; + } + } finally { + lock.unlock(); + } + LOG.error("create connection holder error", ex); + return; + } + + lock.lock(); + try { + connections[poolingCount] = holder; + incrementPoolingCount(); + + if (poolingCount > poolingPeak) { + poolingPeak = poolingCount; + poolingPeakTime = System.currentTimeMillis(); + } + + notEmpty.signal(); + notEmptySignalCount++; + + if (createScheduler != null) { + createTaskCount--; + + if (poolingCount + createTaskCount < notEmptyWaitThreadCount // + && activeCount + poolingCount + createTaskCount < maxActive) { + emptySignal(); + } + } + } finally { + lock.unlock(); + } + } + + public class CreateConnectionTask implements Runnable { + + private int errorCount = 0; + + @Override + public void run() { + for (; ; ) { + // addLast + try { + lock.lockInterruptibly(); + } catch (InterruptedException e2) { + LOG.error("interrupt: ", e2); + lock.lock(); + try { + createTaskCount--; + } finally { + lock.unlock(); + } + break; + } + + try { + // 必须存在线程等待,才创建连接 + if (poolingCount >= notEmptyWaitThreadCount) { + createTaskCount--; + return; + } + + // 防止创建超过maxActive数量的连接 + if (activeCount + poolingCount >= maxActive) { + createTaskCount--; + return; + } + } finally { + lock.unlock(); + } + + Connection connection = null; + + try { + connection = createPhysicalConnection(); + } catch (SQLException e) { + LOG.error("create connection error, url: " + jdbcUrl, e); + + errorCount++; + + if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) { + if (breakAfterAcquireFailure) { + lock.lock(); + try { + createTaskCount--; + } finally { + lock.unlock(); + } + return; + } + + this.errorCount = 0; // reset errorCount + createScheduler.schedule(this, timeBetweenConnectErrorMillis, TimeUnit.MILLISECONDS); + return; + } + } catch (RuntimeException e) { + LOG.error("create connection error", e); + continue; + } catch (Error e) { + lock.lock(); + try { + createTaskCount--; + } finally { + lock.unlock(); + } + LOG.error("create connection error", e); + break; + } + + if (connection == null) { + continue; + } + + put(connection); + break; + } + } + } + + public class CreateConnectionThread extends Thread { + + public CreateConnectionThread(String name) { + super(name); + this.setDaemon(true); + } + + public void run() { + initedLatch.countDown(); + + int errorCount = 0; + for (; ; ) { + // addLast + try { + lock.lockInterruptibly(); + } catch (InterruptedException e2) { + break; + } + + try { + // 必须存在线程等待,才创建连接 + if (poolingCount >= notEmptyWaitThreadCount) { + empty.await(); + } + + // 防止创建超过maxActive数量的连接 + if (activeCount + poolingCount >= maxActive) { + empty.await(); + continue; + } + + } catch (InterruptedException e) { + lastCreateError = e; + lastErrorTimeMillis = System.currentTimeMillis(); + break; + } finally { + lock.unlock(); + } + + Connection connection = null; + + try { + connection = createPhysicalConnection(); + } catch (SQLException e) { + LOG.error("create connection error, url: " + jdbcUrl, e); + + errorCount++; + + if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) { + if (breakAfterAcquireFailure) { + break; + } + + try { + Thread.sleep(timeBetweenConnectErrorMillis); + } catch (InterruptedException interruptEx) { + break; + } + } + } catch (RuntimeException e) { + LOG.error("create connection error", e); + continue; + } catch (Error e) { + LOG.error("create connection error", e); + break; + } + + if (connection == null) { + continue; + } + + put(connection); + + errorCount = 0; // reset errorCount + } + } + } + + public class DestroyConnectionThread extends Thread { + + public DestroyConnectionThread(String name) { + super(name); + this.setDaemon(true); + } + + public void run() { + initedLatch.countDown(); + + for (; ; ) { + // 从前面开始删除 + try { + if (closed) { + break; + } + + if (timeBetweenEvictionRunsMillis > 0) { + Thread.sleep(timeBetweenEvictionRunsMillis); + } else { + Thread.sleep(1000); // + } + + if (Thread.interrupted()) { + break; + } + + destoryTask.run(); + } catch (InterruptedException e) { + break; + } + } + } + + } + + public class DestroyTask implements Runnable { + + @Override + public void run() { + shrink(true); + + if (isRemoveAbandoned()) { + removeAbandoned(); + } + } + + } + + public class LogStatsThread extends Thread { + + public LogStatsThread(String name) { + super(name); + this.setDaemon(true); + } + + public void run() { + try { + for (; ; ) { + try { + logStats(); + } catch (Exception e) { + LOG.error("logStats error", e); + } + + Thread.sleep(timeBetweenLogStatsMillis); + } + } catch (InterruptedException e) { + // skip + } + } + } + + public int removeAbandoned() { + int removeCount = 0; + + long currrentNanos = System.nanoTime(); + + List abandonedList = new ArrayList(); + + synchronized (activeConnections) { + Iterator iter = activeConnections.keySet().iterator(); + + for (; iter.hasNext(); ) { + DruidPooledConnection pooledConnection = iter.next(); + + if (pooledConnection.isRunning()) { + continue; + } + + long timeMillis = (currrentNanos - pooledConnection.getConnectedTimeNano()) / (1000 * 1000); + + if (timeMillis >= removeAbandonedTimeoutMillis) { + iter.remove(); + pooledConnection.setTraceEnable(false); + abandonedList.add(pooledConnection); + } + } + } + + if (abandonedList.size() > 0) { + for (DruidPooledConnection pooledConnection : abandonedList) { + synchronized (pooledConnection) { + if (pooledConnection.isDisable()) { + continue; + } + } + + JdbcUtils.close(pooledConnection); + pooledConnection.abandond(); + removeAbandonedCount++; + removeCount++; + + if (isLogAbandoned()) { + StringBuilder buf = new StringBuilder(); + buf.append("abandon connection, owner thread: "); + buf.append(pooledConnection.getOwnerThread().getName()); + buf.append(", connected time nano: "); + buf.append(pooledConnection.getConnectedTimeNano()); + buf.append(", open stackTrace\n"); + + StackTraceElement[] trace = pooledConnection.getConnectStackTrace(); + for (int i = 0; i < trace.length; i++) { + buf.append("\tat "); + buf.append(trace[i].toString()); + buf.append("\n"); + } + + LOG.error(buf.toString()); + } + } + } + + return removeCount; + } + + /** + * Instance key + */ + protected String instanceKey = null; + + public Reference getReference() throws NamingException { + final String className = getClass().getName(); + final String factoryName = className + "Factory"; // XXX: not robust + Reference ref = new Reference(className, factoryName, null); + ref.add(new StringRefAddr("instanceKey", instanceKey)); + ref.add(new StringRefAddr("url", this.getUrl())); + ref.add(new StringRefAddr("username", this.getUsername())); + ref.add(new StringRefAddr("password", this.getPassword())); + // TODO ADD OTHER PROPERTIES + return ref; + } + + @Override + public List getFilterClassNames() { + List names = new ArrayList(); + for (Filter filter : filters) { + names.add(filter.getClass().getName()); + } + return names; + } + + public int getRawDriverMajorVersion() { + int version = -1; + if (this.driver != null) { + version = driver.getMajorVersion(); + } + return version; + } + + public int getRawDriverMinorVersion() { + int version = -1; + if (this.driver != null) { + version = driver.getMinorVersion(); + } + return version; + } + + public String getProperties() { + Properties properties = new Properties(); + properties.putAll(connectProperties); + if (properties.containsKey("password")) { + properties.put("password", "******"); + } + return properties.toString(); + } + + @Override + public void shrink() { + shrink(false); + } + + public void shrink(boolean checkTime) { + final List evictList = new ArrayList(); + try { + lock.lockInterruptibly(); + } catch (InterruptedException e) { + return; + } + + try { + final int checkCount = poolingCount - minIdle; + final long currentTimeMillis = System.currentTimeMillis(); + for (int i = 0; i < checkCount; ++i) { + DruidConnectionHolder connection = connections[i]; + + if (checkTime) { + long idleMillis = currentTimeMillis - connection.getLastActiveTimeMillis(); + if (idleMillis >= minEvictableIdleTimeMillis) { + evictList.add(connection); + } else { + break; + } + } else { + evictList.add(connection); + } + } + + int removeCount = evictList.size(); + if (removeCount > 0) { + System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount); + Arrays.fill(connections, poolingCount - removeCount, poolingCount, null); + poolingCount -= removeCount; + } + } finally { + lock.unlock(); + } + + for (DruidConnectionHolder item : evictList) { + Connection connection = item.getConnection(); + JdbcUtils.close(connection); + destroyCount.incrementAndGet(); + } + } + + public int getWaitThreadCount() { + lock.lock(); + try { + return lock.getWaitQueueLength(notEmpty); + } finally { + lock.unlock(); + } + } + + public long getNotEmptyWaitCount() { + return notEmptyWaitCount; + } + + public int getNotEmptyWaitThreadCount() { + lock.lock(); + try { + return notEmptyWaitThreadCount; + } finally { + lock.unlock(); + } + } + + public int getNotEmptyWaitThreadPeak() { + lock.lock(); + try { + return notEmptyWaitThreadPeak; + } finally { + lock.unlock(); + } + } + + public long getNotEmptySignalCount() { + return notEmptySignalCount; + } + + public long getNotEmptyWaitMillis() { + return notEmptyWaitNanos / (1000 * 1000); + } + + public long getNotEmptyWaitNanos() { + return notEmptyWaitNanos; + } + + public int getLockQueueLength() { + return lock.getQueueLength(); + } + + public int getActivePeak() { + return activePeak; + } + + public Date getActivePeakTime() { + if (activePeakTime <= 0) { + return null; + } + + return new Date(activePeakTime); + } + + public String dump() { + lock.lock(); + try { + return this.toString(); + } finally { + lock.unlock(); + } + } + + public long getErrorCount() { + return this.errorCount.get(); + } + + public String toString() { + StringBuilder buf = new StringBuilder(); + + buf.append("{"); + + buf.append("\n\tCreateTime:\""); + buf.append(Utils.toString(getCreatedTime())); + buf.append("\""); + + buf.append(",\n\tActiveCount:"); + buf.append(getActiveCount()); + + buf.append(",\n\tPoolingCount:"); + buf.append(getPoolingCount()); + + buf.append(",\n\tCreateCount:"); + buf.append(getCreateCount()); + + buf.append(",\n\tDestroyCount:"); + buf.append(getDestroyCount()); + + buf.append(",\n\tCloseCount:"); + buf.append(getCloseCount()); + + buf.append(",\n\tConnectCount:"); + buf.append(getConnectCount()); + + buf.append(",\n\tConnections:["); + for (int i = 0; i < poolingCount; ++i) { + DruidConnectionHolder conn = connections[i]; + if (conn != null) { + if (i != 0) { + buf.append(","); + } + buf.append("\n\t\t"); + buf.append(conn.toString()); + } + } + buf.append("\n\t]"); + + buf.append("\n}"); + + if (this.isPoolPreparedStatements()) { + buf.append("\n\n["); + for (int i = 0; i < poolingCount; ++i) { + DruidConnectionHolder conn = connections[i]; + if (conn != null) { + if (i != 0) { + buf.append(","); + } + buf.append("\n\t{\n\tID:"); + buf.append(System.identityHashCode(conn.getConnection())); + PreparedStatementPool pool = conn.getStatementPool(); + + buf.append(", \n\tpoolStatements:["); + + int entryIndex = 0; + try { + for (Map.Entry entry : pool.getMap().entrySet()) { + if (entryIndex != 0) { + buf.append(","); + } + buf.append("\n\t\t{hitCount:"); + buf.append(entry.getValue().getHitCount()); + buf.append(",sql:\""); + buf.append(entry.getKey().getSql()); + buf.append("\""); + buf.append("\t}"); + + entryIndex++; + } + } catch (ConcurrentModificationException e) { + // skip .. + } + + buf.append("\n\t\t]"); + + buf.append("\n\t}"); + } + } + buf.append("\n]"); + } + + return buf.toString(); + } + + public List> getPoolingConnectionInfo() { + List> list = new ArrayList>(); + lock.lock(); + try { + for (int i = 0; i < poolingCount; ++i) { + DruidConnectionHolder connHolder = connections[i]; + Connection conn = connHolder.getConnection(); + + Map map = new LinkedHashMap(); + map.put("id", System.identityHashCode(conn)); + map.put("useCount", connHolder.getUseCount()); + if (connHolder.getLastActiveTimeMillis() > 0) { + map.put("lastActiveTime", new Date(connHolder.getLastActiveTimeMillis())); + } + map.put("connectTime", new Date(connHolder.getTimeMillis())); + map.put("holdability", connHolder.getUnderlyingHoldability()); + map.put("transactionIsolation", connHolder.getUnderlyingTransactionIsolation()); + map.put("autoCommit", connHolder.isUnderlyingAutoCommit()); + map.put("readoOnly", connHolder.isUnderlyingReadOnly()); + + if (connHolder.isPoolPreparedStatements()) { + List> stmtCache = new ArrayList>(); + PreparedStatementPool stmtPool = connHolder.getStatementPool(); + for (PreparedStatementHolder stmtHolder : stmtPool.getMap().values()) { + Map stmtInfo = new LinkedHashMap(); + + stmtInfo.put("sql", stmtHolder.getKey().getSql()); + stmtInfo.put("defaultRowPretch", stmtHolder.getDefaultRowPrefetch()); + stmtInfo.put("rowPrefetch", stmtHolder.getRowPrefetch()); + stmtInfo.put("hitCount", stmtHolder.getHitCount()); + + stmtCache.add(stmtInfo); + } + + map.put("pscache", stmtCache); + } + + list.add(map); + } + } finally { + lock.unlock(); + } + return list; + } + + public void logTransaction(TransactionInfo info) { + long transactionMillis = info.getEndTimeMillis() - info.getStartTimeMillis(); + if (transactionThresholdMillis > 0 && transactionMillis > transactionThresholdMillis) { + StringBuilder buf = new StringBuilder(); + buf.append("long time transaction, take "); + buf.append(transactionMillis); + buf.append(" ms : "); + for (String sql : info.getSqlList()) { + buf.append(sql); + buf.append(";"); + } + LOG.error(buf.toString(), new TransactionTimeoutException()); + } + } + + @Override + public String getVersion() { + return VERSION.getVersionNumber(); + } + + @Override + public JdbcDataSourceStat getDataSourceStat() { + return dataSourceStat; + } + + public Object clone() throws CloneNotSupportedException { + return cloneDruidDataSource(); + } + + public DruidDataSource cloneDruidDataSource() { + DruidDataSource x = new DruidDataSource(); + + cloneTo(x); + + return x; + } + + public Map getStatDataForMBean() { + try { + Map map = new HashMap(); + + // 0 - 4 + map.put("Name", this.getName()); + map.put("URL", this.getUrl()); + map.put("CreateCount", this.getCreateCount()); + map.put("DestroyCount", this.getDestroyCount()); + map.put("ConnectCount", this.getConnectCount()); + + // 5 - 9 + map.put("CloseCount", this.getCloseCount()); + map.put("ActiveCount", this.getActivePeak()); + map.put("PoolingCount", this.getPoolingCount()); + map.put("LockQueueLength", this.getLockQueueLength()); + map.put("WaitThreadCount", this.getNotEmptyWaitThreadPeak()); + + // 10 - 14 + map.put("InitialSize", this.getInitialSize()); + map.put("MaxActive", this.getMaxActive()); + map.put("MinIdle", this.getMinIdle()); + map.put("PoolPreparedStatements", this.isPoolPreparedStatements()); + map.put("TestOnBorrow", this.isTestOnBorrow()); + + // 15 - 19 + map.put("TestOnReturn", this.isTestOnReturn()); + map.put("MinEvictableIdleTimeMillis", this.getMinEvictableIdleTimeMillis()); + map.put("ConnectErrorCount", this.getConnectErrorCount()); + map.put("CreateTimespanMillis", this.getCreateTimespanMillis()); + map.put("DbType", this.getDbType()); + + // 20 - 24 + map.put("ValidationQuery", this.getValidationQuery()); + map.put("ValidationQueryTimeout", this.getValidationQueryTimeout()); + map.put("DriverClassName", this.getDriverClassName()); + map.put("Username", this.getUsername()); + map.put("RemoveAbandonedCount", this.getRemoveAbandonedCount()); + + // 25 - 29 + map.put("NotEmptyWaitCount", this.getNotEmptyWaitCount()); + map.put("NotEmptyWaitNanos", this.getNotEmptyWaitNanos()); + map.put("ErrorCount", this.getErrorCount()); + map.put("ReusePreparedStatementCount", this.getCachedPreparedStatementHitCount()); + map.put("StartTransactionCount", this.getStartTransactionCount()); + + // 30 - 34 + map.put("CommitCount", this.getCommitCount()); + map.put("RollbackCount", this.getRollbackCount()); + map.put("LastError", JMXUtils.getErrorCompositeData(this.getLastError())); + map.put("LastCreateError", JMXUtils.getErrorCompositeData(this.getLastCreateError())); + map.put("PreparedStatementCacheDeleteCount", this.getCachedPreparedStatementDeleteCount()); + + // 35 - 39 + map.put("PreparedStatementCacheAccessCount", this.getCachedPreparedStatementAccessCount()); + map.put("PreparedStatementCacheMissCount", this.getCachedPreparedStatementMissCount()); + map.put("PreparedStatementCacheHitCount", this.getCachedPreparedStatementHitCount()); + map.put("PreparedStatementCacheCurrentCount", this.getCachedPreparedStatementCount()); + map.put("Version", this.getVersion()); + + // 40 - + map.put("LastErrorTime", this.getLastErrorTime()); + map.put("LastCreateErrorTime", this.getLastCreateErrorTime()); + map.put("CreateErrorCount", this.getCreateErrorCount()); + map.put("DiscardCount", this.getDiscardCount()); + + return map; + } catch (JMException ex) { + throw new IllegalStateException("getStatData error", ex); + } + } + + public Map getStatData() { + final int activeCount; + final int activePeak; + final Date activePeakTime; + + final int poolingCount; + final int poolingPeak; + final Date poolingPeakTime; + + final long connectCount; + final long closeCount; + + lock.lock(); + try { + poolingCount = this.poolingCount; + poolingPeak = this.poolingPeak; + poolingPeakTime = this.getPoolingPeakTime(); + + activeCount = this.activeCount; + activePeak = this.activePeak; + activePeakTime = this.getActivePeakTime(); + + connectCount = this.connectCount; + closeCount = this.closeCount; + } finally { + lock.unlock(); + } + Map dataMap = new LinkedHashMap(); + + dataMap.put("Identity", System.identityHashCode(this)); + dataMap.put("Name", this.getName()); + dataMap.put("DbType", this.getDbType()); + dataMap.put("DriverClassName", this.getDriverClassName()); + + dataMap.put("URL", this.getUrl()); + dataMap.put("UserName", this.getUsername()); + dataMap.put("FilterClassNames", this.getFilterClassNames()); + + dataMap.put("WaitThreadCount", this.getWaitThreadCount()); + dataMap.put("NotEmptyWaitCount", this.getNotEmptyWaitCount()); + dataMap.put("NotEmptyWaitMillis", this.getNotEmptyWaitMillis()); + + dataMap.put("PoolingCount", poolingCount); + dataMap.put("PoolingPeak", poolingPeak); + dataMap.put("PoolingPeakTime", poolingPeakTime); + + dataMap.put("ActiveCount", activeCount); + dataMap.put("ActivePeak", activePeak); + dataMap.put("ActivePeakTime", activePeakTime); + + dataMap.put("InitialSize", this.getInitialSize()); + dataMap.put("MinIdle", this.getMinIdle()); + dataMap.put("MaxActive", this.getMaxActive()); + + dataMap.put("QueryTimeout", this.getQueryTimeout()); + dataMap.put("TransactionQueryTimeout", this.getTransactionQueryTimeout()); + dataMap.put("LoginTimeout", this.getLoginTimeout()); + dataMap.put("ValidConnectionCheckerClassName", this.getValidConnectionCheckerClassName()); + dataMap.put("ExceptionSorterClassName", this.getExceptionSorterClassName()); + + dataMap.put("TestOnBorrow", this.isTestOnBorrow()); + dataMap.put("TestOnReturn", this.isTestOnReturn()); + dataMap.put("TestWhileIdle", this.isTestWhileIdle()); + + dataMap.put("DefaultAutoCommit", this.isDefaultAutoCommit()); + dataMap.put("DefaultReadOnly", this.getDefaultReadOnly()); + dataMap.put("DefaultTransactionIsolation", this.getDefaultTransactionIsolation()); + + dataMap.put("LogicConnectCount", connectCount); + dataMap.put("LogicCloseCount", closeCount); + dataMap.put("LogicConnectErrorCount", this.getConnectErrorCount()); + + dataMap.put("PhysicalConnectCount", this.getCreateCount()); + dataMap.put("PhysicalCloseCount", this.getDestroyCount()); + dataMap.put("PhysicalConnectErrorCount", this.getCreateErrorCount()); + + dataMap.put("ExecuteCount", this.getExecuteCount()); + dataMap.put("ErrorCount", this.getErrorCount()); + dataMap.put("CommitCount", this.getCommitCount()); + dataMap.put("RollbackCount", this.getRollbackCount()); + + dataMap.put("PSCacheAccessCount", this.getCachedPreparedStatementAccessCount()); + dataMap.put("PSCacheHitCount", this.getCachedPreparedStatementHitCount()); + dataMap.put("PSCacheMissCount", this.getCachedPreparedStatementMissCount()); + + dataMap.put("StartTransactionCount", this.getStartTransactionCount()); + dataMap.put("TransactionHistogram", this.getTransactionHistogramValues()); + + dataMap.put("ConnectionHoldTimeHistogram", this.getDataSourceStat().getConnectionHoldHistogram().toArray()); + dataMap.put("RemoveAbandoned", this.isRemoveAbandoned()); + dataMap.put("ClobOpenCount", this.getDataSourceStat().getClobOpenCount()); + dataMap.put("BlobOpenCount", this.getDataSourceStat().getBlobOpenCount()); + + return dataMap; + } + + public JdbcSqlStat getSqlStat(int sqlId) { + return this.getDataSourceStat().getSqlStat(sqlId); + } + + public JdbcSqlStat getSqlStat(long sqlId) { + return this.getDataSourceStat().getSqlStat(sqlId); + } + + public Map getSqlStatMap() { + return this.getDataSourceStat().getSqlStatMap(); + } + + public Map getWallStatMap() { + WallProviderStatValue wallStatValue = getWallStatValue(false); + + if (wallStatValue != null) { + return wallStatValue.toMap(); + } + + return null; + } + + public WallProviderStatValue getWallStatValue(boolean reset) { + for (Filter filter : this.filters) { + if (filter instanceof WallFilter) { + WallFilter wallFilter = (WallFilter) filter; + return wallFilter.getProvider().getStatValue(reset); + } + } + + return null; + } + + public Lock getLock() { + return lock; + } + + @Override + public boolean isWrapperFor(Class iface) { + for (Filter filter : this.filters) { + if (filter.isWrapperFor(iface)) { + return true; + } + } + + if (this.statLogger != null + && (this.statLogger.getClass() == iface || DruidDataSourceStatLogger.class == iface)) { + return true; + } + + return super.isWrapperFor(iface); + } + + @SuppressWarnings("unchecked") + public T unwrap(Class iface) { + for (Filter filter : this.filters) { + if (filter.isWrapperFor(iface)) { + return (T) filter; + } + } + + if (this.statLogger != null + && (this.statLogger.getClass() == iface || DruidDataSourceStatLogger.class == iface)) { + return (T) statLogger; + } + + return super.unwrap(iface); + } + + public boolean isLogDifferentThread() { + return logDifferentThread; + } + + public void setLogDifferentThread(boolean logDifferentThread) { + this.logDifferentThread = logDifferentThread; + } + + public DruidPooledConnection tryGetConnection() throws SQLException { + if (poolingCount == 0) { + return null; + } + return getConnection(); + } + + @Override + public int fill() throws SQLException { + return this.fill(this.maxActive); + } + + @Override + public int fill(int toCount) throws SQLException { + if (closed) { + throw new DataSourceClosedException("dataSource already closed at " + new Date(closeTimeMillis)); + } + + if (toCount < 0) { + throw new IllegalArgumentException("toCount can't not be less than zero"); + } + + init(); + + if (toCount > this.maxActive) { + toCount = this.maxActive; + } + + int fillCount = 0; + for (; ; ) { + try { + lock.lockInterruptibly(); + } catch (InterruptedException e) { + connectErrorCount.incrementAndGet(); + throw new SQLException("interrupt", e); + } + + boolean fillable = this.isFillable(toCount); + + lock.unlock(); + + if (!fillable) { + break; + } + + DruidConnectionHolder holder; + try { + Connection conn = createPhysicalConnection(); + holder = new DruidConnectionHolder(this, conn); + } catch (SQLException e) { + LOG.error("fill connection error, url: " + this.jdbcUrl, e); + connectErrorCount.incrementAndGet(); + throw e; + } + + try { + lock.lockInterruptibly(); + } catch (InterruptedException e) { + connectErrorCount.incrementAndGet(); + throw new SQLException("interrupt", e); + } + + try { + if (!this.isFillable(toCount)) { + JdbcUtils.close(holder.getConnection()); + break; + } + this.putLast(holder, System.currentTimeMillis()); + fillCount++; + } finally { + lock.unlock(); + } + } + + if (LOG.isInfoEnabled()) { + LOG.info("fill " + fillCount + " connections"); + } + + return fillCount; + } + + private boolean isFillable(int toCount) { + int currentCount = this.poolingCount + this.activeCount; + if (currentCount >= toCount || currentCount >= this.maxActive) { + return false; + } else { + return true; + } + } + + public boolean isFull() { + lock.lock(); + try { + return this.poolingCount + this.activeCount >= this.maxActive; + } finally { + lock.unlock(); + } + } + + private void emptySignal() { + if (createScheduler == null) { + empty.signal(); + return; + } + + if (createTaskCount >= maxCreateTaskCount) { + return; + } + + if (activeCount + poolingCount + createTaskCount >= maxActive) { + return; + } + + createTaskCount++; + CreateConnectionTask task = new CreateConnectionTask(); + createScheduler.submit(task); + } + + @Override + public ObjectName preRegister(MBeanServer server, ObjectName name) throws Exception { + //do nothing + //return original name to avoid NullPointerException + return name; + } + + @Override + public void postRegister(Boolean registrationDone) { + + } + + @Override + public void preDeregister() throws Exception { + + } + + @Override + public void postDeregister() { + + } + +} diff --git a/src/main/java/com/alibaba/druid/pool/ElasticSearchDruidDataSourceFactory.java b/src/main/java/com/alibaba/druid/pool/ElasticSearchDruidDataSourceFactory.java new file mode 100644 index 00000000..ff1f6a72 --- /dev/null +++ b/src/main/java/com/alibaba/druid/pool/ElasticSearchDruidDataSourceFactory.java @@ -0,0 +1,30 @@ +package com.alibaba.druid.pool; + +import javax.sql.DataSource; +import java.util.Map; +import java.util.Properties; + +/** + * Created by allwefantasy on 8/30/16. + */ +public class ElasticSearchDruidDataSourceFactory extends DruidDataSourceFactory { + + @Override + protected DataSource createDataSourceInternal(Properties properties) throws Exception { + DruidDataSource dataSource = new ElasticSearchDruidDataSource(); + config(dataSource, properties); + return dataSource; + } + + @SuppressWarnings("rawtypes") + public static DataSource createDataSource(Properties properties) throws Exception { + return createDataSource((Map) properties); + } + + @SuppressWarnings("rawtypes") + public static DataSource createDataSource(Map properties) throws Exception { + DruidDataSource dataSource = new ElasticSearchDruidDataSource(); + config(dataSource, properties); + return dataSource; + } +} diff --git a/src/main/java/com/alibaba/druid/pool/ElasticSearchDruidPooledConnection.java b/src/main/java/com/alibaba/druid/pool/ElasticSearchDruidPooledConnection.java new file mode 100644 index 00000000..41ca2796 --- /dev/null +++ b/src/main/java/com/alibaba/druid/pool/ElasticSearchDruidPooledConnection.java @@ -0,0 +1,85 @@ +package com.alibaba.druid.pool; + +import java.sql.PreparedStatement; +import java.sql.SQLException; + +/** + * Created by allwefantasy on 8/30/16. + */ +public class ElasticSearchDruidPooledConnection extends DruidPooledConnection { + public ElasticSearchDruidPooledConnection(DruidConnectionHolder holder) { + super(holder); + } + + @Override + public PreparedStatement prepareStatement(String sql) throws SQLException { + checkState(); + + PreparedStatementHolder stmtHolder = null; + DruidPooledPreparedStatement.PreparedStatementKey key = new DruidPooledPreparedStatement.PreparedStatementKey(sql, getCatalog(), PreparedStatementPool.MethodType.M1); + + boolean poolPreparedStatements = holder.isPoolPreparedStatements(); + + if (poolPreparedStatements) { + stmtHolder = holder.getStatementPool().get(key); + } + + if (stmtHolder == null) { + try { + stmtHolder = new PreparedStatementHolder(key, conn.prepareStatement(sql)); + holder.getDataSource().incrementPreparedStatementCount(); + } catch (SQLException ex) { + handleException(ex); + } + } + + initStatement(stmtHolder); + + DruidPooledPreparedStatement rtnVal = new ElasticSearchDruidPooledPreparedStatement(this, stmtHolder); + + holder.addTrace(rtnVal); + + return rtnVal; + } + + + @Override + public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) + throws SQLException { + checkState(); + + PreparedStatementHolder stmtHolder = null; + DruidPooledPreparedStatement.PreparedStatementKey key = new DruidPooledPreparedStatement.PreparedStatementKey(sql, getCatalog(), PreparedStatementPool.MethodType.M2, resultSetType, + resultSetConcurrency); + + boolean poolPreparedStatements = holder.isPoolPreparedStatements(); + + if (poolPreparedStatements) { + stmtHolder = holder.getStatementPool().get(key); + } + + if (stmtHolder == null) { + try { + stmtHolder = new PreparedStatementHolder(key, conn.prepareStatement(sql, resultSetType, + resultSetConcurrency)); + holder.getDataSource().incrementPreparedStatementCount(); + } catch (SQLException ex) { + handleException(ex); + } + } + + initStatement(stmtHolder); + + DruidPooledPreparedStatement rtnVal = new ElasticSearchDruidPooledPreparedStatement(this, stmtHolder); + + holder.addTrace(rtnVal); + + return rtnVal; + } + + private void initStatement(PreparedStatementHolder stmtHolder) throws SQLException { + stmtHolder.incrementInUseCount(); + holder.getDataSource().initStatement(this, stmtHolder.getStatement()); + } + +} diff --git a/src/main/java/com/alibaba/druid/pool/ElasticSearchDruidPooledPreparedStatement.java b/src/main/java/com/alibaba/druid/pool/ElasticSearchDruidPooledPreparedStatement.java new file mode 100644 index 00000000..cc08fd81 --- /dev/null +++ b/src/main/java/com/alibaba/druid/pool/ElasticSearchDruidPooledPreparedStatement.java @@ -0,0 +1,82 @@ +package com.alibaba.druid.pool; + +import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.plugin.deletebyquery.DeleteByQueryPlugin; +import org.elasticsearch.plugin.nlpcn.QueryActionElasticExecutor; +import org.elasticsearch.plugin.nlpcn.executors.CSVResult; +import org.elasticsearch.plugin.nlpcn.executors.CSVResultsExtractor; +import org.elasticsearch.plugin.nlpcn.executors.CsvExtractorException; +import org.nlpcn.es4sql.SearchDao; +import org.nlpcn.es4sql.exception.SqlParseException; +import org.nlpcn.es4sql.jdbc.ObjectResult; +import org.nlpcn.es4sql.jdbc.ObjectResultsExtractor; +import org.nlpcn.es4sql.query.QueryAction; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.util.List; + +/** + * Created by allwefantasy on 8/30/16. + */ +public class ElasticSearchDruidPooledPreparedStatement extends DruidPooledPreparedStatement { + + + Client client = null; + + public ElasticSearchDruidPooledPreparedStatement(DruidPooledConnection conn, PreparedStatementHolder holder) throws SQLException { + super(conn, holder); + this.client = ((ElasticSearchConnection) conn.getConnection()).getClient(); + } + + @Override + public ResultSet executeQuery() throws SQLException { + checkOpen(); + + incrementExecuteCount(); + transactionRecord(getSql()); + + oracleSetRowPrefetch(); + + conn.beforeExecute(); + try { + + + ObjectResult extractor = getObjectResult(true, getSql(), false, false); + List headers = extractor.getHeaders(); + List> lines = extractor.getLines(); + + ResultSet rs = new ElasticSearchResultSet(this, headers, lines); + + if (rs == null) { + return null; + } + + DruidPooledResultSet poolableResultSet = new DruidPooledResultSet(this, rs); + addResultSetTrace(poolableResultSet); + + return poolableResultSet; + } catch (Throwable t) { + throw checkException(t); + } finally { + conn.afterExecute(); + } + } + + private ObjectResult getObjectResult(boolean flat, String query, boolean includeScore, boolean includeType) throws SqlParseException, SQLFeatureNotSupportedException, Exception, CsvExtractorException { + SearchDao searchDao = new org.nlpcn.es4sql.SearchDao(client); + + //String rewriteSQL = searchDao.explain(getSql()).explain().explain(); + + QueryAction queryAction = searchDao.explain(query); + Object execution = QueryActionElasticExecutor.executeAnyAction(searchDao.getClient(), queryAction); + return new ObjectResultsExtractor(includeScore, includeType).extractResults(execution, flat, ","); + } + + @Override + public int executeUpdate() throws SQLException { + throw new SQLException("executeUpdate not support in ElasticSearch"); + } +} diff --git a/src/main/java/com/alibaba/druid/pool/ElasticSearchResultSet.java b/src/main/java/com/alibaba/druid/pool/ElasticSearchResultSet.java new file mode 100644 index 00000000..adfb2e3b --- /dev/null +++ b/src/main/java/com/alibaba/druid/pool/ElasticSearchResultSet.java @@ -0,0 +1,993 @@ +package com.alibaba.druid.pool; + +import com.alibaba.druid.util.jdbc.ResultSetMetaDataBase; + +import java.io.InputStream; +import java.io.Reader; +import java.math.BigDecimal; +import java.net.URL; +import java.sql.*; +import java.util.Calendar; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * Created by allwefantasy on 8/30/16. + */ +public class ElasticSearchResultSet implements ResultSet { + + Iterator> iterator; + List current = null; + List headers = null; + + + private ResultSetMetaData metaData; + + public ElasticSearchResultSet(Statement statement, final List headers, final List> lines) { + this.iterator = lines.iterator(); + this.headers = headers; + metaData = new ElasticSearchResultSetMetaDataBase(headers); + + } + + @Override + public boolean next() throws SQLException { + boolean hasNext = iterator.hasNext(); + if (hasNext) { + current = iterator.next(); + } + return hasNext; + } + + @Override + public void close() throws SQLException { + + } + + @Override + public boolean wasNull() throws SQLException { + return false; + } + + @Override + public String getString(int columnIndex) throws SQLException { + return (String) current.get(columnIndex); + } + + @Override + public boolean getBoolean(int columnIndex) throws SQLException { + return (Boolean) current.get(columnIndex); + } + + @Override + public byte getByte(int columnIndex) throws SQLException { + return 0; + } + + @Override + public short getShort(int columnIndex) throws SQLException { + return ((Short) current.get(columnIndex)); + } + + @Override + public int getInt(int columnIndex) throws SQLException { + return ((Integer) current.get(columnIndex)); + } + + @Override + public long getLong(int columnIndex) throws SQLException { + return (Long) current.get(columnIndex); + } + + @Override + public float getFloat(int columnIndex) throws SQLException { + return ((Float) current.get(columnIndex)).floatValue(); + } + + @Override + public double getDouble(int columnIndex) throws SQLException { + return (Double) current.get(columnIndex); + } + + @Override + public BigDecimal getBigDecimal(int columnIndex, int scale) throws SQLException { + return null; + } + + @Override + public byte[] getBytes(int columnIndex) throws SQLException { + return new byte[0]; + } + + @Override + public Date getDate(int columnIndex) throws SQLException { + return (Date) current.get(columnIndex); + } + + @Override + public Time getTime(int columnIndex) throws SQLException { + return (Time) current.get(columnIndex); + } + + @Override + public Timestamp getTimestamp(int columnIndex) throws SQLException { + return (Timestamp) current.get(columnIndex); + } + + @Override + public InputStream getAsciiStream(int columnIndex) throws SQLException { + return null; + } + + @Override + public InputStream getUnicodeStream(int columnIndex) throws SQLException { + return null; + } + + @Override + public InputStream getBinaryStream(int columnIndex) throws SQLException { + return null; + } + + @Override + public String getString(String columnLabel) throws SQLException { + return (String) current.get(headers.indexOf(columnLabel)); + } + + @Override + public boolean getBoolean(String columnLabel) throws SQLException { + return (Boolean) current.get(headers.indexOf(columnLabel)); + } + + @Override + public byte getByte(String columnLabel) throws SQLException { + return 0; + } + + @Override + public short getShort(String columnLabel) throws SQLException { + return ((Short) current.get(headers.indexOf(columnLabel))); + } + + @Override + public int getInt(String columnLabel) throws SQLException { + return (Integer) current.get(headers.indexOf(columnLabel)); + } + + @Override + public long getLong(String columnLabel) throws SQLException { + return (Long) current.get(headers.indexOf(columnLabel)); + } + + @Override + public float getFloat(String columnLabel) throws SQLException { + return (Float) current.get(headers.indexOf(columnLabel)); + } + + @Override + public double getDouble(String columnLabel) throws SQLException { + return (Double) current.get(headers.indexOf(columnLabel)); + } + + @Override + public BigDecimal getBigDecimal(String columnLabel, int scale) throws SQLException { + return null; + } + + @Override + public byte[] getBytes(String columnLabel) throws SQLException { + return new byte[0]; + } + + @Override + public Date getDate(String columnLabel) throws SQLException { + return (Date) current.get(headers.indexOf(columnLabel)); + } + + @Override + public Time getTime(String columnLabel) throws SQLException { + return (Time) current.get(headers.indexOf(columnLabel)); + } + + @Override + public Timestamp getTimestamp(String columnLabel) throws SQLException { + return (Timestamp) current.get(headers.indexOf(columnLabel)); + } + + @Override + public InputStream getAsciiStream(String columnLabel) throws SQLException { + return null; + } + + @Override + public InputStream getUnicodeStream(String columnLabel) throws SQLException { + return null; + } + + @Override + public InputStream getBinaryStream(String columnLabel) throws SQLException { + return null; + } + + @Override + public SQLWarning getWarnings() throws SQLException { + return null; + } + + @Override + public void clearWarnings() throws SQLException { + + } + + @Override + public String getCursorName() throws SQLException { + return null; + } + + @Override + public ResultSetMetaData getMetaData() throws SQLException { + return metaData; + } + + @Override + public Object getObject(int columnIndex) throws SQLException { + return current.get(columnIndex); + + } + + @Override + public Object getObject(String columnLabel) throws SQLException { + return current.get(headers.indexOf(columnLabel)); + } + + @Override + public int findColumn(String columnLabel) throws SQLException { + return ((ResultSetMetaDataBase) metaData).findColumn(columnLabel); + } + + @Override + public Reader getCharacterStream(int columnIndex) throws SQLException { + return null; + } + + @Override + public Reader getCharacterStream(String columnLabel) throws SQLException { + return null; + } + + @Override + public BigDecimal getBigDecimal(int columnIndex) throws SQLException { + return null; + } + + @Override + public BigDecimal getBigDecimal(String columnLabel) throws SQLException { + return null; + } + + @Override + public boolean isBeforeFirst() throws SQLException { + return false; + } + + @Override + public boolean isAfterLast() throws SQLException { + return false; + } + + @Override + public boolean isFirst() throws SQLException { + return false; + } + + @Override + public boolean isLast() throws SQLException { + return false; + } + + @Override + public void beforeFirst() throws SQLException { + + } + + @Override + public void afterLast() throws SQLException { + + } + + @Override + public boolean first() throws SQLException { + return false; + } + + @Override + public boolean last() throws SQLException { + return false; + } + + @Override + public int getRow() throws SQLException { + return 0; + } + + @Override + public boolean absolute(int row) throws SQLException { + return false; + } + + @Override + public boolean relative(int rows) throws SQLException { + return false; + } + + @Override + public boolean previous() throws SQLException { + return false; + } + + @Override + public void setFetchDirection(int direction) throws SQLException { + + } + + @Override + public int getFetchDirection() throws SQLException { + return 0; + } + + @Override + public void setFetchSize(int rows) throws SQLException { + + } + + @Override + public int getFetchSize() throws SQLException { + return 0; + } + + @Override + public int getType() throws SQLException { + return 0; + } + + @Override + public int getConcurrency() throws SQLException { + return 0; + } + + @Override + public boolean rowUpdated() throws SQLException { + return false; + } + + @Override + public boolean rowInserted() throws SQLException { + return false; + } + + @Override + public boolean rowDeleted() throws SQLException { + return false; + } + + @Override + public void updateNull(int columnIndex) throws SQLException { + + } + + @Override + public void updateBoolean(int columnIndex, boolean x) throws SQLException { + + } + + @Override + public void updateByte(int columnIndex, byte x) throws SQLException { + + } + + @Override + public void updateShort(int columnIndex, short x) throws SQLException { + + } + + @Override + public void updateInt(int columnIndex, int x) throws SQLException { + + } + + @Override + public void updateLong(int columnIndex, long x) throws SQLException { + + } + + @Override + public void updateFloat(int columnIndex, float x) throws SQLException { + + } + + @Override + public void updateDouble(int columnIndex, double x) throws SQLException { + + } + + @Override + public void updateBigDecimal(int columnIndex, BigDecimal x) throws SQLException { + + } + + @Override + public void updateString(int columnIndex, String x) throws SQLException { + + } + + @Override + public void updateBytes(int columnIndex, byte[] x) throws SQLException { + + } + + @Override + public void updateDate(int columnIndex, Date x) throws SQLException { + + } + + @Override + public void updateTime(int columnIndex, Time x) throws SQLException { + + } + + @Override + public void updateTimestamp(int columnIndex, Timestamp x) throws SQLException { + + } + + @Override + public void updateAsciiStream(int columnIndex, InputStream x, int length) throws SQLException { + + } + + @Override + public void updateBinaryStream(int columnIndex, InputStream x, int length) throws SQLException { + + } + + @Override + public void updateCharacterStream(int columnIndex, Reader x, int length) throws SQLException { + + } + + @Override + public void updateObject(int columnIndex, Object x, int scaleOrLength) throws SQLException { + + } + + @Override + public void updateObject(int columnIndex, Object x) throws SQLException { + throw new RuntimeException("update is not support yet"); + } + + @Override + public void updateNull(String columnLabel) throws SQLException { + + } + + @Override + public void updateBoolean(String columnLabel, boolean x) throws SQLException { + + } + + @Override + public void updateByte(String columnLabel, byte x) throws SQLException { + + } + + @Override + public void updateShort(String columnLabel, short x) throws SQLException { + + } + + @Override + public void updateInt(String columnLabel, int x) throws SQLException { + + } + + @Override + public void updateLong(String columnLabel, long x) throws SQLException { + + } + + @Override + public void updateFloat(String columnLabel, float x) throws SQLException { + + } + + @Override + public void updateDouble(String columnLabel, double x) throws SQLException { + + } + + @Override + public void updateBigDecimal(String columnLabel, BigDecimal x) throws SQLException { + + } + + @Override + public void updateString(String columnLabel, String x) throws SQLException { + + } + + @Override + public void updateBytes(String columnLabel, byte[] x) throws SQLException { + + } + + @Override + public void updateDate(String columnLabel, Date x) throws SQLException { + + } + + @Override + public void updateTime(String columnLabel, Time x) throws SQLException { + + } + + @Override + public void updateTimestamp(String columnLabel, Timestamp x) throws SQLException { + + } + + @Override + public void updateAsciiStream(String columnLabel, InputStream x, int length) throws SQLException { + + } + + @Override + public void updateBinaryStream(String columnLabel, InputStream x, int length) throws SQLException { + + } + + @Override + public void updateCharacterStream(String columnLabel, Reader reader, int length) throws SQLException { + + } + + @Override + public void updateObject(String columnLabel, Object x, int scaleOrLength) throws SQLException { + + } + + @Override + public void updateObject(String columnLabel, Object x) throws SQLException { + + } + + @Override + public void insertRow() throws SQLException { + + } + + @Override + public void updateRow() throws SQLException { + + } + + @Override + public void deleteRow() throws SQLException { + + } + + @Override + public void refreshRow() throws SQLException { + + } + + @Override + public void cancelRowUpdates() throws SQLException { + + } + + @Override + public void moveToInsertRow() throws SQLException { + + } + + @Override + public void moveToCurrentRow() throws SQLException { + + } + + @Override + public Statement getStatement() throws SQLException { + return null; + } + + @Override + public Object getObject(int columnIndex, Map> map) throws SQLException { + return null; + } + + @Override + public Ref getRef(int columnIndex) throws SQLException { + return null; + } + + @Override + public Blob getBlob(int columnIndex) throws SQLException { + return null; + } + + @Override + public Clob getClob(int columnIndex) throws SQLException { + return null; + } + + @Override + public Array getArray(int columnIndex) throws SQLException { + return null; + } + + @Override + public Object getObject(String columnLabel, Map> map) throws SQLException { + return null; + } + + @Override + public Ref getRef(String columnLabel) throws SQLException { + return null; + } + + @Override + public Blob getBlob(String columnLabel) throws SQLException { + return null; + } + + @Override + public Clob getClob(String columnLabel) throws SQLException { + return null; + } + + @Override + public Array getArray(String columnLabel) throws SQLException { + return null; + } + + @Override + public Date getDate(int columnIndex, Calendar cal) throws SQLException { + return null; + } + + @Override + public Date getDate(String columnLabel, Calendar cal) throws SQLException { + return null; + } + + @Override + public Time getTime(int columnIndex, Calendar cal) throws SQLException { + return null; + } + + @Override + public Time getTime(String columnLabel, Calendar cal) throws SQLException { + return null; + } + + @Override + public Timestamp getTimestamp(int columnIndex, Calendar cal) throws SQLException { + return null; + } + + @Override + public Timestamp getTimestamp(String columnLabel, Calendar cal) throws SQLException { + return null; + } + + @Override + public URL getURL(int columnIndex) throws SQLException { + return null; + } + + @Override + public URL getURL(String columnLabel) throws SQLException { + return null; + } + + @Override + public void updateRef(int columnIndex, Ref x) throws SQLException { + + } + + @Override + public void updateRef(String columnLabel, Ref x) throws SQLException { + + } + + @Override + public void updateBlob(int columnIndex, Blob x) throws SQLException { + + } + + @Override + public void updateBlob(String columnLabel, Blob x) throws SQLException { + + } + + @Override + public void updateClob(int columnIndex, Clob x) throws SQLException { + + } + + @Override + public void updateClob(String columnLabel, Clob x) throws SQLException { + + } + + @Override + public void updateArray(int columnIndex, Array x) throws SQLException { + + } + + @Override + public void updateArray(String columnLabel, Array x) throws SQLException { + + } + + @Override + public RowId getRowId(int columnIndex) throws SQLException { + return null; + } + + @Override + public RowId getRowId(String columnLabel) throws SQLException { + return null; + } + + @Override + public void updateRowId(int columnIndex, RowId x) throws SQLException { + + } + + @Override + public void updateRowId(String columnLabel, RowId x) throws SQLException { + + } + + @Override + public int getHoldability() throws SQLException { + return 0; + } + + @Override + public boolean isClosed() throws SQLException { + return false; + } + + @Override + public void updateNString(int columnIndex, String nString) throws SQLException { + + } + + @Override + public void updateNString(String columnLabel, String nString) throws SQLException { + + } + + @Override + public void updateNClob(int columnIndex, NClob nClob) throws SQLException { + + } + + @Override + public void updateNClob(String columnLabel, NClob nClob) throws SQLException { + + } + + @Override + public NClob getNClob(int columnIndex) throws SQLException { + return null; + } + + @Override + public NClob getNClob(String columnLabel) throws SQLException { + return null; + } + + @Override + public SQLXML getSQLXML(int columnIndex) throws SQLException { + return null; + } + + @Override + public SQLXML getSQLXML(String columnLabel) throws SQLException { + return null; + } + + @Override + public void updateSQLXML(int columnIndex, SQLXML xmlObject) throws SQLException { + + } + + @Override + public void updateSQLXML(String columnLabel, SQLXML xmlObject) throws SQLException { + + } + + @Override + public String getNString(int columnIndex) throws SQLException { + return null; + } + + @Override + public String getNString(String columnLabel) throws SQLException { + return null; + } + + @Override + public Reader getNCharacterStream(int columnIndex) throws SQLException { + return null; + } + + @Override + public Reader getNCharacterStream(String columnLabel) throws SQLException { + return null; + } + + @Override + public void updateNCharacterStream(int columnIndex, Reader x, long length) throws SQLException { + + } + + @Override + public void updateNCharacterStream(String columnLabel, Reader reader, long length) throws SQLException { + + } + + @Override + public void updateAsciiStream(int columnIndex, InputStream x, long length) throws SQLException { + + } + + @Override + public void updateBinaryStream(int columnIndex, InputStream x, long length) throws SQLException { + + } + + @Override + public void updateCharacterStream(int columnIndex, Reader x, long length) throws SQLException { + + } + + @Override + public void updateAsciiStream(String columnLabel, InputStream x, long length) throws SQLException { + + } + + @Override + public void updateBinaryStream(String columnLabel, InputStream x, long length) throws SQLException { + + } + + @Override + public void updateCharacterStream(String columnLabel, Reader reader, long length) throws SQLException { + + } + + @Override + public void updateBlob(int columnIndex, InputStream inputStream, long length) throws SQLException { + + } + + @Override + public void updateBlob(String columnLabel, InputStream inputStream, long length) throws SQLException { + + } + + @Override + public void updateClob(int columnIndex, Reader reader, long length) throws SQLException { + + } + + @Override + public void updateClob(String columnLabel, Reader reader, long length) throws SQLException { + + } + + @Override + public void updateNClob(int columnIndex, Reader reader, long length) throws SQLException { + + } + + @Override + public void updateNClob(String columnLabel, Reader reader, long length) throws SQLException { + + } + + @Override + public void updateNCharacterStream(int columnIndex, Reader x) throws SQLException { + + } + + @Override + public void updateNCharacterStream(String columnLabel, Reader reader) throws SQLException { + + } + + @Override + public void updateAsciiStream(int columnIndex, InputStream x) throws SQLException { + + } + + @Override + public void updateBinaryStream(int columnIndex, InputStream x) throws SQLException { + + } + + @Override + public void updateCharacterStream(int columnIndex, Reader x) throws SQLException { + + } + + @Override + public void updateAsciiStream(String columnLabel, InputStream x) throws SQLException { + + } + + @Override + public void updateBinaryStream(String columnLabel, InputStream x) throws SQLException { + + } + + @Override + public void updateCharacterStream(String columnLabel, Reader reader) throws SQLException { + + } + + @Override + public void updateBlob(int columnIndex, InputStream inputStream) throws SQLException { + + } + + @Override + public void updateBlob(String columnLabel, InputStream inputStream) throws SQLException { + + } + + @Override + public void updateClob(int columnIndex, Reader reader) throws SQLException { + + } + + @Override + public void updateClob(String columnLabel, Reader reader) throws SQLException { + + } + + @Override + public void updateNClob(int columnIndex, Reader reader) throws SQLException { + + } + + @Override + public void updateNClob(String columnLabel, Reader reader) throws SQLException { + + } + + @Override + public T getObject(int columnIndex, Class type) throws SQLException { + return null; + } + + @Override + public T getObject(String columnLabel, Class type) throws SQLException { + return null; + } + + @Override + public T unwrap(Class iface) throws SQLException { + return null; + } + + @Override + public boolean isWrapperFor(Class iface) throws SQLException { + return false; + } +} diff --git a/src/main/java/com/alibaba/druid/pool/ElasticSearchResultSetMetaDataBase.java b/src/main/java/com/alibaba/druid/pool/ElasticSearchResultSetMetaDataBase.java new file mode 100644 index 00000000..1b8f4b77 --- /dev/null +++ b/src/main/java/com/alibaba/druid/pool/ElasticSearchResultSetMetaDataBase.java @@ -0,0 +1,23 @@ +package com.alibaba.druid.pool; + +import com.alibaba.druid.util.jdbc.ResultSetBase; +import com.alibaba.druid.util.jdbc.ResultSetMetaDataBase; + +import java.util.ArrayList; +import java.util.List; + +/** + * Created by allwefantasy on 8/31/16. + */ +public class ElasticSearchResultSetMetaDataBase extends ResultSetMetaDataBase { + private final List columns = new ArrayList(); + + public ElasticSearchResultSetMetaDataBase(List headers) { + for(String column:headers){ + ColumnMetaData columnMetaData = new ColumnMetaData(); + columnMetaData.setColumnLabel(column); + columnMetaData.setColumnName(column); + columns.add(columnMetaData); + } + } +} diff --git a/src/main/java/org/nlpcn/es4sql/jdbc/ObjectResult.java b/src/main/java/org/nlpcn/es4sql/jdbc/ObjectResult.java new file mode 100644 index 00000000..d6612629 --- /dev/null +++ b/src/main/java/org/nlpcn/es4sql/jdbc/ObjectResult.java @@ -0,0 +1,24 @@ +package org.nlpcn.es4sql.jdbc; + +import java.util.List; + +/** + * Created by allwefantasy on 8/30/16. + */ +public class ObjectResult { + private final List headers; + private final List> lines; + + public ObjectResult(List headers, List> lines) { + this.headers = headers; + this.lines = lines; + } + + public List getHeaders() { + return headers; + } + + public List> getLines() { + return lines; + } +} diff --git a/src/main/java/org/nlpcn/es4sql/jdbc/ObjectResultsExtractException.java b/src/main/java/org/nlpcn/es4sql/jdbc/ObjectResultsExtractException.java new file mode 100644 index 00000000..81912e54 --- /dev/null +++ b/src/main/java/org/nlpcn/es4sql/jdbc/ObjectResultsExtractException.java @@ -0,0 +1,11 @@ +package org.nlpcn.es4sql.jdbc; + +/** + * Created by allwefantasy on 8/30/16. + */ +public class ObjectResultsExtractException extends Exception { + public ObjectResultsExtractException(String message) { + super(message); + } +} + diff --git a/src/main/java/org/nlpcn/es4sql/jdbc/ObjectResultsExtractor.java b/src/main/java/org/nlpcn/es4sql/jdbc/ObjectResultsExtractor.java new file mode 100644 index 00000000..d4e37160 --- /dev/null +++ b/src/main/java/org/nlpcn/es4sql/jdbc/ObjectResultsExtractor.java @@ -0,0 +1,308 @@ +package org.nlpcn.es4sql.jdbc; + +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHitField; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.aggregations.Aggregation; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; +import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation; +import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation; +import org.elasticsearch.search.aggregations.metrics.geobounds.GeoBounds; +import org.elasticsearch.search.aggregations.metrics.percentiles.Percentiles; +import org.elasticsearch.search.aggregations.metrics.stats.Stats; +import org.elasticsearch.search.aggregations.metrics.stats.extended.ExtendedStats; +import org.elasticsearch.search.aggregations.metrics.tophits.TopHits; +import org.nlpcn.es4sql.Util; + +import java.util.*; + +/** + * Created by allwefantasy on 8/30/16. + */ +public class ObjectResultsExtractor { + private final boolean includeType; + private final boolean includeScore; + private int currentLineIndex; + + public ObjectResultsExtractor(boolean includeScore, boolean includeType) { + this.includeScore = includeScore; + this.includeType = includeType; + this.currentLineIndex = 0; + } + + public ObjectResult extractResults(Object queryResult, boolean flat, String separator) throws ObjectResultsExtractException { + if (queryResult instanceof SearchHits) { + SearchHit[] hits = ((SearchHits) queryResult).getHits(); + List> docsAsMap = new ArrayList<>(); + List headers = createHeadersAndFillDocsMap(flat, hits, docsAsMap); + List> lines = createLinesFromDocs(flat, docsAsMap, headers); + return new ObjectResult(headers, lines); + } + if (queryResult instanceof Aggregations) { + List headers = new ArrayList<>(); + List> lines = new ArrayList<>(); + lines.add(new ArrayList()); + handleAggregations((Aggregations) queryResult, headers, lines); + + //todo: need to handle more options for aggregations: + //Aggregations that inhrit from base + //ScriptedMetric + + return new ObjectResult(headers, lines); + + } + return null; + } + + private void handleAggregations(Aggregations aggregations, List headers, List> lines) throws ObjectResultsExtractException { + if (allNumericAggregations(aggregations)) { + lines.get(this.currentLineIndex).addAll(fillHeaderAndCreateLineForNumericAggregations(aggregations, headers)); + return; + } + //aggregations with size one only supported when not metrics. + List aggregationList = aggregations.asList(); + if (aggregationList.size() > 1) { + throw new ObjectResultsExtractException("currently support only one aggregation at same level (Except for numeric metrics)"); + } + Aggregation aggregation = aggregationList.get(0); + //we want to skip singleBucketAggregations (nested,reverse_nested,filters) + if (aggregation instanceof SingleBucketAggregation) { + Aggregations singleBucketAggs = ((SingleBucketAggregation) aggregation).getAggregations(); + handleAggregations(singleBucketAggs, headers, lines); + return; + } + if (aggregation instanceof NumericMetricsAggregation) { + handleNumericMetricAggregation(headers, lines.get(currentLineIndex), aggregation); + return; + } + if (aggregation instanceof GeoBounds) { + handleGeoBoundsAggregation(headers, lines, (GeoBounds) aggregation); + return; + } + if (aggregation instanceof TopHits) { + //todo: handle this . it returns hits... maby back to normal? + //todo: read about this usages + // TopHits topHitsAggregation = (TopHits) aggregation; + } + if (aggregation instanceof MultiBucketsAggregation) { + MultiBucketsAggregation bucketsAggregation = (MultiBucketsAggregation) aggregation; + String name = bucketsAggregation.getName(); + //checking because it can comes from sub aggregation again + if (!headers.contains(name)) { + headers.add(name); + } + Collection buckets = bucketsAggregation.getBuckets(); + + //clone current line. + List currentLine = lines.get(this.currentLineIndex); + List clonedLine = new ArrayList<>(currentLine); + + //call handle_Agg with current_line++ + boolean firstLine = true; + for (MultiBucketsAggregation.Bucket bucket : buckets) { + //each bucket need to add new line with current line copied => except for first line + String key = bucket.getKeyAsString(); + if (firstLine) { + firstLine = false; + } else { + currentLineIndex++; + currentLine = new ArrayList(clonedLine); + lines.add(currentLine); + } + currentLine.add(key); + handleAggregations(bucket.getAggregations(), headers, lines); + + } + } + + } + + private void handleGeoBoundsAggregation(List headers, List> lines, GeoBounds geoBoundsAggregation) { + String geoBoundAggName = geoBoundsAggregation.getName(); + headers.add(geoBoundAggName + ".topLeft.lon"); + headers.add(geoBoundAggName + ".topLeft.lat"); + headers.add(geoBoundAggName + ".bottomRight.lon"); + headers.add(geoBoundAggName + ".bottomRight.lat"); + List line = lines.get(this.currentLineIndex); + line.add(String.valueOf(geoBoundsAggregation.topLeft().getLon())); + line.add(String.valueOf(geoBoundsAggregation.topLeft().getLat())); + line.add(String.valueOf(geoBoundsAggregation.bottomRight().getLon())); + line.add(String.valueOf(geoBoundsAggregation.bottomRight().getLat())); + lines.add(line); + } + + private List fillHeaderAndCreateLineForNumericAggregations(Aggregations aggregations, List header) throws ObjectResultsExtractException { + List line = new ArrayList<>(); + List aggregationList = aggregations.asList(); + for (Aggregation aggregation : aggregationList) { + handleNumericMetricAggregation(header, line, aggregation); + } + return line; + } + + private void handleNumericMetricAggregation(List header, List line, Aggregation aggregation) throws ObjectResultsExtractException { + String name = aggregation.getName(); + + if (aggregation instanceof NumericMetricsAggregation.SingleValue) { + if (!header.contains(name)) { + header.add(name); + } + line.add(((NumericMetricsAggregation.SingleValue) aggregation).value()); + } + //todo:Numeric MultiValue - Stats,ExtendedStats,Percentile... + else if (aggregation instanceof NumericMetricsAggregation.MultiValue) { + if (aggregation instanceof Stats) { + String[] statsHeaders = new String[]{"count", "sum", "avg", "min", "max"}; + boolean isExtendedStats = aggregation instanceof ExtendedStats; + if (isExtendedStats) { + String[] extendedHeaders = new String[]{"sumOfSquares", "variance", "stdDeviation"}; + statsHeaders = Util.concatStringsArrays(statsHeaders, extendedHeaders); + } + mergeHeadersWithPrefix(header, name, statsHeaders); + Stats stats = (Stats) aggregation; + line.add(stats.getCount()); + line.add(stats.getSum()); + line.add(stats.getAvg()); + line.add(stats.getMin()); + line.add(stats.getMax()); + if (isExtendedStats) { + ExtendedStats extendedStats = (ExtendedStats) aggregation; + line.add(extendedStats.getSumOfSquares()); + line.add(extendedStats.getVariance()); + line.add(extendedStats.getStdDeviation()); + } + } else if (aggregation instanceof Percentiles) { + String[] percentileHeaders = new String[]{"1.0", "5.0", "25.0", "50.0", "75.0", "95.0", "99.0"}; + mergeHeadersWithPrefix(header, name, percentileHeaders); + Percentiles percentiles = (Percentiles) aggregation; + line.add(percentiles.percentile(1.0)); + line.add(percentiles.percentile(5.0)); + line.add(percentiles.percentile(25.0)); + line.add(percentiles.percentile(50.0)); + line.add(percentiles.percentile(75)); + line.add(percentiles.percentile(95.0)); + line.add(percentiles.percentile(99.0)); + } else { + throw new ObjectResultsExtractException("unknown NumericMetricsAggregation.MultiValue:" + aggregation.getClass()); + } + + } else { + throw new ObjectResultsExtractException("unknown NumericMetricsAggregation" + aggregation.getClass()); + } + } + + private void mergeHeadersWithPrefix(List header, String prefix, String[] newHeaders) { + for (int i = 0; i < newHeaders.length; i++) { + String newHeader = newHeaders[i]; + if (prefix != null && !prefix.equals("")) { + newHeader = prefix + "." + newHeader; + } + if (!header.contains(newHeader)) { + header.add(newHeader); + } + } + } + + private boolean allNumericAggregations(Aggregations aggregations) { + List aggregationList = aggregations.asList(); + for (Aggregation aggregation : aggregationList) { + if (!(aggregation instanceof NumericMetricsAggregation)) { + return false; + } + } + return true; + } + + private Aggregation skipAggregations(Aggregation firstAggregation) { + while (firstAggregation instanceof SingleBucketAggregation) { + firstAggregation = getFirstAggregation(((SingleBucketAggregation) firstAggregation).getAggregations()); + } + return firstAggregation; + } + + private Aggregation getFirstAggregation(Aggregations aggregations) { + return aggregations.asList().get(0); + } + + private List> createLinesFromDocs(boolean flat, List> docsAsMap, List headers) { + List> objectLines = new ArrayList<>(); + for (Map doc : docsAsMap) { + List lines = new ArrayList<>(); + for (String header : headers) { + lines.add(findFieldValue(header, doc, flat)); + } + objectLines.add(lines); + } + return objectLines; + } + + private List createHeadersAndFillDocsMap(boolean flat, SearchHit[] hits, List> docsAsMap) { + Set csvHeaders = new HashSet<>(); + for (SearchHit hit : hits) { + Map doc = hit.sourceAsMap(); + Map fields = hit.getFields(); + for (SearchHitField searchHitField : fields.values()) { + doc.put(searchHitField.getName(), searchHitField.value()); + } + mergeHeaders(csvHeaders, doc, flat); + if (this.includeScore) { + doc.put("_score", hit.score()); + } + if (this.includeType) { + doc.put("_type", hit.type()); + } + docsAsMap.add(doc); + } + ArrayList headersList = new ArrayList<>(csvHeaders); + if (this.includeScore) { + headersList.add("_score"); + } + if (this.includeType) { + headersList.add("_type"); + } + return headersList; + } + + private Object findFieldValue(String header, Map doc, boolean flat) { + if (flat && header.contains(".")) { + String[] split = header.split("\\."); + Object innerDoc = doc; + for (String innerField : split) { + if (!(innerDoc instanceof Map)) { + return null; + } + innerDoc = ((Map) innerDoc).get(innerField); + if (innerDoc == null) { + return null; + } + + } + return innerDoc; + } else { + if (doc.containsKey(header)) { + return doc.get(header); + } + } + return null; + } + + private void mergeHeaders(Set headers, Map doc, boolean flat) { + if (!flat) { + headers.addAll(doc.keySet()); + return; + } + mergeFieldNamesRecursive(headers, doc, ""); + } + + private void mergeFieldNamesRecursive(Set headers, Map doc, String prefix) { + for (Map.Entry field : doc.entrySet()) { + Object value = field.getValue(); + if (value instanceof Map) { + mergeFieldNamesRecursive(headers, (Map) value, prefix + field.getKey() + "."); + } else { + headers.add(prefix + field.getKey()); + } + } + } +} diff --git a/src/test/java/org/nlpcn/es4sql/JDBCTests.java b/src/test/java/org/nlpcn/es4sql/JDBCTests.java new file mode 100644 index 00000000..d2112ad0 --- /dev/null +++ b/src/test/java/org/nlpcn/es4sql/JDBCTests.java @@ -0,0 +1,43 @@ +package org.nlpcn.es4sql; + + +import com.alibaba.druid.pool.DruidDataSource; + +import com.alibaba.druid.pool.ElasticSearchDruidDataSourceFactory; +import org.junit.Assert; +import org.junit.Test; + +import java.sql.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +/** + * Created by allwefantasy on 8/26/16. + */ +public class JDBCTests { + @Test + public void testJDBC() throws Exception { + Properties properties = new Properties(); + properties.put("url", "jdbc:elasticsearch://127.0.0.1:9300/" + TestsConstants.TEST_INDEX); + DruidDataSource dds = (DruidDataSource) ElasticSearchDruidDataSourceFactory.createDataSource(properties); + Connection connection = dds.getConnection(); + PreparedStatement ps = connection.prepareStatement("SELECT gender,lastname,age from " + TestsConstants.TEST_INDEX + " where lastname='Heath'"); + ResultSet resultSet = ps.executeQuery(); + List result = new ArrayList(); + while (resultSet.next()) { + result.add(resultSet.getString("lastname") + "," + resultSet.getInt("age") + "," + resultSet.getString("gender")); + } + + ps.close(); + connection.close(); + dds.close(); + + Assert.assertTrue(result.size()==2); + Assert.assertTrue(result.get(0).equals("Heath,39,F")); + Assert.assertTrue(result.get(1).equals("Heath,39,F")); + } + +} + + diff --git a/src/test/java/org/nlpcn/es4sql/MainTestSuite.java b/src/test/java/org/nlpcn/es4sql/MainTestSuite.java index e799e668..717d740b 100644 --- a/src/test/java/org/nlpcn/es4sql/MainTestSuite.java +++ b/src/test/java/org/nlpcn/es4sql/MainTestSuite.java @@ -38,7 +38,8 @@ ShowTest.class, CSVResultsExtractorTests.class, SourceFieldTest.class, - SQLFunctionsTest.class + SQLFunctionsTest.class, + JDBCTests.class }) public class MainTestSuite { diff --git a/src/test/java/org/nlpcn/es4sql/SQLFunctionsTest.java b/src/test/java/org/nlpcn/es4sql/SQLFunctionsTest.java index 4501223b..1661d989 100644 --- a/src/test/java/org/nlpcn/es4sql/SQLFunctionsTest.java +++ b/src/test/java/org/nlpcn/es4sql/SQLFunctionsTest.java @@ -129,7 +129,7 @@ public void concat_ws_field_and_string() throws Exception { @Test public void test() throws Exception { - String query = "select case when ty=0 then '云点播CV' when ty=1 then '云直播CV' end as tyname,sum(cv) as cv from cloud_play_stat_cust where day='20160906' group by tyname"; + String query = "SELECT gender,lastname,age from " + TestsConstants.TEST_INDEX + " where lastname='Heath'"; SearchDao searchDao = MainTestSuite.getSearchDao() != null ? MainTestSuite.getSearchDao() : getSearchDao(); System.out.println(searchDao.explain(query).explain().explain());