Skip to content

Commit

Permalink
[Fix][Connector] Fix presto connector execute error & target table co…
Browse files Browse the repository at this point in the history
…unt error (#299)
  • Loading branch information
zixi0825 authored Dec 3, 2023
1 parent d3528f8 commit 1b65967
Show file tree
Hide file tree
Showing 51 changed files with 353 additions and 161 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ public String getDatabase() {
return jdbcConnectionInfo.getDatabase();
}

public String getSchema() {
return jdbcConnectionInfo.getSchema();
}

public String getProperties() {
return jdbcConnectionInfo.getProperties();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,15 @@ public class JdbcConnectionInfo {
private String catalog;

/**
* database(schema) name
* database name
*/
private String database;

/**
* schema name
*/
private String schema;

/**
* properties
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public DataSource getDataSource(BaseJdbcDataSourceInfo baseJdbcDataSourceInfo) t
DruidDataSource druidDataSource = new DruidDataSource();
druidDataSource.setUrl(baseJdbcDataSourceInfo.getJdbcUrl());
druidDataSource.setUsername(baseJdbcDataSourceInfo.getUser());
druidDataSource.setPassword(StringUtils.isEmpty(baseJdbcDataSourceInfo.getPassword()) ? null : baseJdbcDataSourceInfo.getPassword());
druidDataSource.setPassword(StringUtils.isEmptyOrNullStr(baseJdbcDataSourceInfo.getPassword()) ? null : baseJdbcDataSourceInfo.getPassword());
druidDataSource.setDriverClassName(baseJdbcDataSourceInfo.getDriverClass());

druidDataSource.setMaxActive(10);
Expand Down Expand Up @@ -93,7 +93,7 @@ public DataSource getDataSource(Map<String,Object> configMap) throws SQLExceptio
DruidDataSource druidDataSource = new DruidDataSource();
druidDataSource.setUrl(url);
druidDataSource.setUsername(username);
druidDataSource.setPassword(StringUtils.isEmpty(password) ? null : password);
druidDataSource.setPassword(StringUtils.isEmptyOrNullStr(password) ? null : password);
druidDataSource.setDriverClassName(driver);

druidDataSource.setMaxActive(10);
Expand Down Expand Up @@ -136,7 +136,7 @@ public DataSource getDataSource(Properties properties) throws SQLException {
DruidDataSource druidDataSource = new DruidDataSource();
druidDataSource.setUrl(url);
druidDataSource.setUsername(username);
druidDataSource.setPassword(StringUtils.isEmpty(password) ? null : password);
druidDataSource.setPassword(StringUtils.isEmptyOrNullStr(password) ? null : password);
druidDataSource.setDriverClassName(driver);

druidDataSource.setMaxActive(10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package io.datavines.common.utils;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import io.datavines.common.config.Config;
import io.datavines.common.datasource.jdbc.JdbcDataSourceManager;
import io.datavines.common.exception.DataVinesException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,6 @@ public interface ConnectorFactory {
TypeConverter getTypeConverter();

ConfigBuilder getConfigBuilder();

DataSourceClient getDataSourceClient();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.datavines.connector.api;

import io.datavines.common.datasource.jdbc.BaseJdbcDataSourceInfo;
import org.springframework.jdbc.core.JdbcTemplate;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Map;
import java.util.Properties;

public interface DataSourceClient {

DataSource getDataSource(BaseJdbcDataSourceInfo baseJdbcDataSourceInfo) throws SQLException;

DataSource getDataSource(Map<String,Object> configMap) throws SQLException;

DataSource getDataSource(Properties properties) throws SQLException;

Connection getConnection(BaseJdbcDataSourceInfo baseJdbcDataSourceInfo) throws SQLException;

Connection getConnection(Map<String,Object> configMap) throws SQLException;

Connection getConnection(Properties properties) throws SQLException;

JdbcTemplate getJdbcTemplate(BaseJdbcDataSourceInfo baseJdbcDataSourceInfo) throws SQLException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.datavines.connector.api;

import io.datavines.common.enums.DataType;
import io.datavines.common.utils.StringUtils;
import io.datavines.connector.api.entity.StructField;
import org.apache.commons.collections4.CollectionUtils;

Expand All @@ -39,6 +40,20 @@ default Map<String,String> getDialectKeyMap() {

List<String> getExcludeDatabases();

default String getFullQualifiedTableName(String database, String schema, String table) {
table = quoteIdentifier(table);

if (!StringUtils.isEmptyOrNullStr(schema)) {
table = quoteIdentifier(schema) + "." + table;
}

if (!StringUtils.isEmptyOrNullStr(database)) {
table = quoteIdentifier(database) + "." + table;
}

return table;
}

default boolean invalidateItemCanOutput(){
return true;
}
Expand All @@ -59,8 +74,8 @@ default DataType getDataType(String jdbcType) {
return DataType.valueOf(jdbcType);
}

default String quoteIdentifier(String column) {
return "`" + column + "`";
default String quoteIdentifier(String entity) {
return "`" + entity + "`";
}

default String getTableExistsQuery(String table) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@ public Connector getConnector() {

@Override
public Executor getExecutor() {
return new ClickHouseExecutor();
return new ClickHouseExecutor(new JdbcDataSourceClient());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@

public class ClickHouseExecutor extends BaseJdbcExecutor {

public ClickHouseExecutor(JdbcDataSourceClient jdbcDataSourceClient) {
super(jdbcDataSourceClient);
}

@Override
public BaseJdbcDataSourceInfo getDatasourceInfo(JdbcConnectionInfo jdbcConnectionInfo) {
return new ClickHouseDataSourceInfo(jdbcConnectionInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public Dialect getDialect() {

@Override
public Executor getExecutor() {
return new DatabendExecutor();
return new DatabendExecutor(new JdbcDataSourceClient());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
import io.datavines.common.datasource.jdbc.JdbcConnectionInfo;

public class DatabendExecutor extends BaseJdbcExecutor {

public DatabendExecutor(JdbcDataSourceClient jdbcDataSourceClient) {
super(jdbcDataSourceClient);
}
@Override
public BaseJdbcDataSourceInfo getDatasourceInfo(JdbcConnectionInfo jdbcConnectionInfo) {
return new DatabendDataSourceInfo(jdbcConnectionInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public ConnectorParameterConverter getConnectorParameterConverter() {

@Override
public Executor getExecutor() {
return new DmExecutor();
return new DmExecutor(new JdbcDataSourceClient());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@

public class DmExecutor extends BaseJdbcExecutor{

public DmExecutor(JdbcDataSourceClient jdbcDataSourceClient) {
super(jdbcDataSourceClient);
}

@Override
public BaseJdbcDataSourceInfo getDatasourceInfo(JdbcConnectionInfo jdbcConnectionInfo) {
return new DmDataSourceInfo(jdbcConnectionInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public Connector getConnector() {

@Override
public Executor getExecutor() {
return new DorisExecutor();
return new DorisExecutor(new JdbcDataSourceClient());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@

public class DorisExecutor extends MysqlExecutor {

public DorisExecutor(JdbcDataSourceClient jdbcDataSourceClient) {
super(jdbcDataSourceClient);
}

@Override
public BaseJdbcDataSourceInfo getDatasourceInfo(JdbcConnectionInfo jdbcConnectionInfo) {
return new DorisDataSourceInfo(jdbcConnectionInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,9 @@ public TypeConverter getTypeConverter() {
public ConfigBuilder getConfigBuilder() {
return null;
}

@Override
public DataSourceClient getDataSourceClient() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public Connector getConnector() {

@Override
public Executor getExecutor() {
return new HiveExecutor();
return new HiveExecutor(new JdbcDataSourceClient());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@

public class HiveExecutor extends BaseJdbcExecutor {

public HiveExecutor(JdbcDataSourceClient jdbcDataSourceClient) {
super(jdbcDataSourceClient);
}

@Override
public BaseJdbcDataSourceInfo getDatasourceInfo(JdbcConnectionInfo jdbcConnectionInfo) {
return new HiveDataSourceInfo(jdbcConnectionInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@ public Connector getConnector() {

@Override
public Executor getExecutor() {
return new ImpalaExecutor();
return new ImpalaExecutor(new JdbcDataSourceClient());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@

public class ImpalaExecutor extends BaseJdbcExecutor {

public ImpalaExecutor(JdbcDataSourceClient jdbcDataSourceClient) {
super(jdbcDataSourceClient);
}

@Override
public BaseJdbcDataSourceInfo getDatasourceInfo(JdbcConnectionInfo jdbcConnectionInfo) {
return new ImpalaDataSourceInfo(jdbcConnectionInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,9 @@ public TypeConverter getTypeConverter() {
public ConfigBuilder getConfigBuilder() {
return new JdbcConfigBuilder();
}

@Override
public DataSourceClient getDataSourceClient() {
return new JdbcDataSourceClient();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@

public abstract class BaseJdbcExecutor implements Executor, IJdbcDataSourceInfo {

private final JdbcExecutorClientManager jdbcExecutorClientManager = JdbcExecutorClientManager.getInstance();
private final JdbcDataSourceClient jdbcDataSourceClient;

public BaseJdbcExecutor(JdbcDataSourceClient jdbcDataSourceClient) {
this.jdbcDataSourceClient = jdbcDataSourceClient;
}

protected ListWithQueryColumn query(JdbcTemplate jdbcTemplate, String sql, int limit) {
return SqlUtils.query(jdbcTemplate, sql, limit);
Expand All @@ -43,12 +47,8 @@ public ConnectorResponse queryForPage(ExecuteRequestParam param) throws SQLExcep

JdbcConnectionInfo jdbcConnectionInfo = JSONUtils.parseObject(dataSourceParam, JdbcConnectionInfo.class);

JdbcExecutorClient executorClient = jdbcExecutorClientManager
.getExecutorClient(
JdbcDataSourceInfoManager.getDatasourceInfo(dataSourceParam,
getDatasourceInfo(jdbcConnectionInfo)));

JdbcTemplate jdbcTemplate = executorClient.getJdbcTemplate();
JdbcTemplate jdbcTemplate = jdbcDataSourceClient.getJdbcTemplate(
JdbcDataSourceInfoManager.getDatasourceInfo(dataSourceParam, getDatasourceInfo(jdbcConnectionInfo)));

String sql = param.getScript();
if (StringUtils.isEmpty(sql)) {
Expand All @@ -68,12 +68,8 @@ public ConnectorResponse queryForOne(ExecuteRequestParam param) throws SQLExcept
String dataSourceParam = param.getDataSourceParam();

JdbcConnectionInfo jdbcConnectionInfo = JSONUtils.parseObject(dataSourceParam, JdbcConnectionInfo.class);
JdbcExecutorClient executorClient = jdbcExecutorClientManager
.getExecutorClient(
JdbcDataSourceInfoManager.getDatasourceInfo(dataSourceParam,
getDatasourceInfo(jdbcConnectionInfo)));

JdbcTemplate jdbcTemplate = executorClient.getJdbcTemplate();
JdbcTemplate jdbcTemplate = jdbcDataSourceClient.getJdbcTemplate(
JdbcDataSourceInfoManager.getDatasourceInfo(dataSourceParam, getDatasourceInfo(jdbcConnectionInfo)));

String sql = param.getScript() + " limit 1";
if (StringUtils.isEmpty(sql)) {
Expand All @@ -93,11 +89,8 @@ public ConnectorResponse queryForList(ExecuteRequestParam param) throws Exceptio
String dataSourceParam = param.getDataSourceParam();
JdbcConnectionInfo jdbcConnectionInfo = JSONUtils.parseObject(dataSourceParam, JdbcConnectionInfo.class);

JdbcExecutorClient executorClient = jdbcExecutorClientManager
.getExecutorClient(
JdbcDataSourceInfoManager.getDatasourceInfo(dataSourceParam,
getDatasourceInfo(jdbcConnectionInfo)));
JdbcTemplate jdbcTemplate = executorClient.getJdbcTemplate();
JdbcTemplate jdbcTemplate = jdbcDataSourceClient.getJdbcTemplate(
JdbcDataSourceInfoManager.getDatasourceInfo(dataSourceParam, getDatasourceInfo(jdbcConnectionInfo)));

String sql = param.getScript();
if (StringUtils.isEmpty(sql)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package io.datavines.connector.plugin;

import io.datavines.common.datasource.jdbc.BaseJdbcDataSourceInfo;
import io.datavines.common.datasource.jdbc.JdbcDataSourceManager;
import io.datavines.connector.api.DataSourceClient;
import org.springframework.jdbc.core.JdbcTemplate;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Map;
import java.util.Properties;

public class JdbcDataSourceClient implements DataSourceClient {

@Override
public DataSource getDataSource(BaseJdbcDataSourceInfo baseJdbcDataSourceInfo) throws SQLException {
return JdbcDataSourceManager.getInstance().getDataSource(baseJdbcDataSourceInfo);
}

@Override
public DataSource getDataSource(Map<String, Object> configMap) throws SQLException {
return JdbcDataSourceManager.getInstance().getDataSource(configMap);
}

@Override
public DataSource getDataSource(Properties properties) throws SQLException {
return JdbcDataSourceManager.getInstance().getDataSource(properties);
}

@Override
public Connection getConnection(BaseJdbcDataSourceInfo baseJdbcDataSourceInfo) throws SQLException {
return JdbcDataSourceManager.getInstance().getDataSource(baseJdbcDataSourceInfo).getConnection();
}

@Override
public Connection getConnection(Map<String, Object> configMap) throws SQLException {
return JdbcDataSourceManager.getInstance().getDataSource(configMap).getConnection();
}

@Override
public Connection getConnection(Properties properties) throws SQLException {
return JdbcDataSourceManager.getInstance().getDataSource(properties).getConnection();
}

@Override
public JdbcTemplate getJdbcTemplate(BaseJdbcDataSourceInfo baseJdbcDataSourceInfo) throws SQLException {
JdbcTemplate jdbcTemplate = new JdbcTemplate(JdbcDataSourceManager.getInstance().getDataSource(baseJdbcDataSourceInfo));
jdbcTemplate.setFetchSize(500);
return jdbcTemplate;
}
}
Loading

0 comments on commit 1b65967

Please sign in to comment.