Skip to content

Commit

Permalink
[Fix][Engine] Fix compatibility issues with SqlServer and PostgreSQL …
Browse files Browse the repository at this point in the history
…connector (#305)
  • Loading branch information
zixi0825 authored Dec 5, 2023
1 parent c44ec55 commit 6163e44
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,29 +40,28 @@ default Map<String,String> getDialectKeyMap() {

List<String> getExcludeDatabases();

default String getFullQualifiedTableName(String database, String schema, String table) {
table = quoteIdentifier(table);
default String getFullQualifiedTableName(String database, String schema, String table, boolean needQuote) {

if (!StringUtils.isEmptyOrNullStr(schema)) {
table = quoteIdentifier(schema) + "." + table;
}
if (needQuote) {
table = quoteIdentifier(table);

if (!StringUtils.isEmptyOrNullStr(database)) {
table = quoteIdentifier(database) + "." + table;
}
if (!StringUtils.isEmptyOrNullStr(schema)) {
table = quoteIdentifier(schema) + "." + table;
}

return table;
}
if (!StringUtils.isEmptyOrNullStr(database)) {
table = quoteIdentifier(database) + "." + table;
}

default String getFullQualifiedTableNameForSpark(String database, String schema, String table) {
table = "`" + table + "`";
} else {

if (!StringUtils.isEmptyOrNullStr(schema)) {
table = "`" + schema + "`." + table ;
}
if (!StringUtils.isEmptyOrNullStr(schema)) {
table = schema + "." + table;
}

if (!StringUtils.isEmptyOrNullStr(database)) {
table = "`" + database + "`." + table;
if (!StringUtils.isEmptyOrNullStr(database)) {
table = database + "." + table;
}
}

return table;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,4 @@ public boolean invalidateItemCanOutputToSelf() {
public boolean supportToBeErrorDataStorage() {
return true;
}

@Override
public String quoteIdentifier(String entity) {
return "`" + entity + "`";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,4 @@ public boolean invalidateItemCanOutputToSelf() {
public boolean supportToBeErrorDataStorage() {
return true;
}

@Override
public String quoteIdentifier(String entity) {
return entity;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@

public class QuoteIdentifier {

public static String quote(String entity, boolean force) {
if (force) {
return "`" + entity + "`";
}

return entity;
}

public static String quote(String entity) {
return "`" + entity + "`";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,49 +143,49 @@ public static String generateUniqueCode(Map<String, String> inputParameterValue)
return Md5Utils.getMd5(sb.toString(),true);
}

public static String getOnClause(List<MappingColumn> mappingColumnList, Map<String,String> inputParameterValueResult) {
public static String getOnClause(List<MappingColumn> mappingColumnList, Map<String,String> inputParameterValueResult, boolean needQuote) {
//get on clause
String[] columnList = new String[mappingColumnList.size()];
for (int i = 0; i < mappingColumnList.size(); i++) {
MappingColumn column = mappingColumnList.get(i);
columnList[i] = getCoalesceString(inputParameterValueResult.get(TABLE_ALIAS), column.getColumn())
columnList[i] = getCoalesceString(inputParameterValueResult.get(TABLE_ALIAS), column.getColumn(),needQuote)
+ column.getOperator()
+ getCoalesceString(inputParameterValueResult.get(TABLE2_ALIAS), column.getColumn2());
+ getCoalesceString(inputParameterValueResult.get(TABLE2_ALIAS), column.getColumn2(),needQuote);
}

return String.join(AND,columnList);
}

public static String getTableAliasColumns(List<MappingColumn> mappingColumnList, String tableAlias, int index) {
public static String getTableAliasColumns(List<MappingColumn> mappingColumnList, String tableAlias, int index, boolean needQuote) {
String[] columnList = new String[mappingColumnList.size()];
for (int i = 0; i < mappingColumnList.size(); i++) {
MappingColumn column = mappingColumnList.get(i);
if (index == 1) {
columnList[i] = tableAlias + "." + QuoteIdentifier.quote(column.getColumn()) + " AS " + QuoteIdentifier.quote(column.getColumn() + "_" + index);;
columnList[i] = tableAlias + "." + QuoteIdentifier.quote(column.getColumn(), needQuote) + " AS " + QuoteIdentifier.quote(column.getColumn() + "_" + index, needQuote);;
} else if (index == 2){
columnList[i] = tableAlias + "." + QuoteIdentifier.quote(column.getColumn2()) + " AS " + QuoteIdentifier.quote(column.getColumn2() + "_" + index);;
columnList[i] = tableAlias + "." + QuoteIdentifier.quote(column.getColumn2(), needQuote) + " AS " + QuoteIdentifier.quote(column.getColumn2() + "_" + index, needQuote);;
}
}

return String.join(", ",columnList);
}

public static String getWhereClause(List<MappingColumn> mappingColumnList,Map<String,String> inputParameterValueResult) {
String columnNotNull = "( NOT (" + getColumnIsNullStr(inputParameterValueResult.get(TABLE_ALIAS),getColumnListInTable(mappingColumnList)) + " ))";
String columnIsNull2 = "( " + getColumnIsNullStr(inputParameterValueResult.get(TABLE2_ALIAS),getColumnListInTable2(mappingColumnList)) + " )";
public static String getWhereClause(List<MappingColumn> mappingColumnList,Map<String,String> inputParameterValueResult, boolean needQuote) {
String columnNotNull = "( NOT (" + getColumnIsNullStr(inputParameterValueResult.get(TABLE_ALIAS),getColumnListInTable(mappingColumnList), needQuote) + " ))";
String columnIsNull2 = "( " + getColumnIsNullStr(inputParameterValueResult.get(TABLE2_ALIAS),getColumnListInTable2(mappingColumnList), needQuote) + " )";

return columnNotNull + AND + columnIsNull2;
}

public static String getCoalesceString(String table, String column) {
return "coalesce(" + table + "." + QuoteIdentifier.quote(column) + ", '')";
public static String getCoalesceString(String table, String column, boolean needQuote) {
return "coalesce(" + table + "." + QuoteIdentifier.quote(column, needQuote) + ", '')";
}

public static String getColumnIsNullStr(String table, List<String> columns) {
public static String getColumnIsNullStr(String table, List<String> columns, boolean needQuote) {
String[] columnList = new String[columns.size()];
for (int i = 0; i < columns.size(); i++) {
String column = columns.get(i);
columnList[i] = table + "." + QuoteIdentifier.quote(column) + " IS NULL";
columnList[i] = table + "." + QuoteIdentifier.quote(column, needQuote) + " IS NULL";
}
return String.join(AND, columnList);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,21 @@ protected List<SourceConfig> getSourceConfigs() throws DataVinesException {
Map<String, Object> connectorParameterMap = new HashMap<>(connectorParameter.getParameters());
connectorParameterMap.putAll(metricInputParameter);
connectorParameterMap = connectorFactory.getConnectorParameterConverter().converter(connectorParameterMap);
String connectorUUID = connectorFactory.getConnectorParameterConverter().getConnectorUUID(connectorParameterMap);

metricInputParameter.put(DATABASE_NAME,metricInputParameter.get(DATABASE));
metricInputParameter.put(TABLE_NAME,metricInputParameter.get(TABLE));
metricInputParameter.put(COLUMN_NAME,metricInputParameter.get(COLUMN));

metricInputParameter.put(DATABASE_NAME, metricInputParameter.get(DATABASE));
metricInputParameter.put(TABLE_NAME, metricInputParameter.get(TABLE));
metricInputParameter.put(COLUMN_NAME, metricInputParameter.get(COLUMN));
if (connectorParameter.getParameters().get(SCHEMA) != null) {
metricInputParameter.put(SCHEMA, (String)connectorParameter.getParameters().get(SCHEMA));
}
String table = connectorFactory.getDialect()
.getFullQualifiedTableName(metricInputParameter.get(DATABASE),metricInputParameter.get(SCHEMA),metricInputParameter.get(TABLE));

metricInputParameter.put(TABLE_ALIAS, metricInputParameter.get(DATABASE) + "_" + metricInputParameter.get(TABLE) + "_1");

String table = connectorFactory.getDialect().getFullQualifiedTableName(
metricInputParameter.get(DATABASE),
metricInputParameter.get(SCHEMA),
metricInputParameter.get(TABLE), true);
connectorParameterMap.put(TABLE, table);

String outputTable = metricInputParameter.get(TABLE);
Expand All @@ -97,6 +102,7 @@ protected List<SourceConfig> getSourceConfigs() throws DataVinesException {
invalidateItemCanOutput &= connectorFactory.getDialect().invalidateItemCanOutput();
metricInputParameter.put(INVALIDATE_ITEM_CAN_OUTPUT, String.valueOf(invalidateItemCanOutput));

String connectorUUID = connectorFactory.getConnectorParameterConverter().getConnectorUUID(connectorParameterMap);
if (sourceConnectorSet.contains(connectorUUID)) {
continue;
}
Expand All @@ -113,8 +119,8 @@ protected List<SourceConfig> getSourceConfigs() throws DataVinesException {
ConnectorParameter connectorParameter2 = jobExecutionParameter.getConnectorParameter2();
Map<String, Object> connectorParameterMap = new HashMap<>(connectorParameter2.getParameters());
connectorParameterMap.putAll(metricInputParameter);
connectorParameterMap.put(TABLE,metricInputParameter.get(TABLE2));
connectorParameterMap.put(DATABASE,metricInputParameter.get(DATABASE2));
connectorParameterMap.put(TABLE, metricInputParameter.get(TABLE2));
connectorParameterMap.put(DATABASE, metricInputParameter.get(DATABASE2));
ConnectorFactory connectorFactory = PluginLoader
.getPluginLoader(ConnectorFactory.class)
.getNewPlugin(connectorParameter2.getType());
Expand All @@ -123,12 +129,15 @@ protected List<SourceConfig> getSourceConfigs() throws DataVinesException {
metricInputParameter.put(SCHEMA2, (String)connectorParameter2.getParameters().get(SCHEMA));
}

String table = connectorFactory.getDialect()
.getFullQualifiedTableName(metricInputParameter.get(DATABASE2),metricInputParameter.get(SCHEMA2),metricInputParameter.get(TABLE2));
connectorParameterMap.put(TABLE, table);
metricInputParameter.put(TABLE2_ALIAS, metricInputParameter.get(DATABASE2) + "_" + metricInputParameter.get(TABLE2) + "_2");

String table = connectorFactory.getDialect().getFullQualifiedTableName(
metricInputParameter.get(DATABASE2),
metricInputParameter.get(SCHEMA2),
metricInputParameter.get(TABLE2), true);
connectorParameterMap.put(TABLE, table);
connectorParameterMap = connectorFactory.getConnectorParameterConverter().converter(connectorParameterMap);
String connectorUUID = connectorFactory.getConnectorParameterConverter().getConnectorUUID(connectorParameterMap);


String outputTable = metricInputParameter.get(TABLE2);
connectorParameterMap.put(OUTPUT_TABLE, outputTable);
Expand All @@ -138,10 +147,12 @@ protected List<SourceConfig> getSourceConfigs() throws DataVinesException {
connectorParameterMap.put(POST_SQL, metricInputParameter.get(POST_SQL));
metricInputParameter.putAll(connectorFactory.getDialect().getDialectKeyMap());
metricInputParameter.put(SRC_CONNECTOR_TYPE, connectorParameter2.getType());
metricInputParameter.put(TABLE, table);
metricInputParameter.put(TABLE2, table);
boolean invalidateItemCanOutput = Boolean.parseBoolean(metricInputParameter.get(INVALIDATE_ITEM_CAN_OUTPUT));
invalidateItemCanOutput &= connectorFactory.getDialect().invalidateItemCanOutput();
metricInputParameter.put(INVALIDATE_ITEM_CAN_OUTPUT, String.valueOf(invalidateItemCanOutput));

String connectorUUID = connectorFactory.getConnectorParameterConverter().getConnectorUUID(connectorParameterMap);
if (targetConnectorSet.contains(connectorUUID)) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,11 @@ public void buildTransformConfigs() {
String metricUniqueKey = getMetricUniqueKey(parameter);
Map<String, String> metricInputParameter = metric2InputParameter.get(metricUniqueKey);
metricInputParameter.put(METRIC_UNIQUE_KEY, metricUniqueKey);
metricInputParameter.put(TABLE_ALIAS, metricInputParameter.get(DATABASE)+"_"+metricInputParameter.get(TABLE) + "_1");
metricInputParameter.put(TABLE2_ALIAS, metricInputParameter.get(DATABASE2)+"_"+metricInputParameter.get(TABLE2) + "_2");
List<MappingColumn> mappingColumns = JSONUtils.toList(metricInputParameter.get(MAPPING_COLUMNS),MappingColumn.class);
metricInputParameter.put(TABLE_ALIAS_COLUMNS, MetricParserUtils.getTableAliasColumns(mappingColumns,metricInputParameter.get(TABLE_ALIAS),1));
metricInputParameter.put(TABLE2_ALIAS_COLUMNS, MetricParserUtils.getTableAliasColumns(mappingColumns,metricInputParameter.get(TABLE2_ALIAS),2));
metricInputParameter.put(ON_CLAUSE, MetricParserUtils.getOnClause(mappingColumns, metricInputParameter));
metricInputParameter.put(WHERE_CLAUSE, MetricParserUtils.getWhereClause(mappingColumns, metricInputParameter));
metricInputParameter.put(TABLE_ALIAS_COLUMNS, MetricParserUtils.getTableAliasColumns(mappingColumns,metricInputParameter.get(TABLE_ALIAS),1, false));
metricInputParameter.put(TABLE2_ALIAS_COLUMNS, MetricParserUtils.getTableAliasColumns(mappingColumns,metricInputParameter.get(TABLE2_ALIAS),2, false));
metricInputParameter.put(ON_CLAUSE, MetricParserUtils.getOnClause(mappingColumns, metricInputParameter, false));
metricInputParameter.put(WHERE_CLAUSE, MetricParserUtils.getWhereClause(mappingColumns, metricInputParameter,false));

metric2InputParameter.put(metricUniqueKey, metricInputParameter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ protected List<SourceConfig> getSourceConfigs() throws DataVinesException {
.getPluginLoader(ConnectorFactory.class)
.getNewPlugin(connectorParameter.getType());
String table = connectorFactory.getDialect()
.getFullQualifiedTableNameForSpark(metricInputParameter.get(DATABASE),
metricInputParameter.get(SCHEMA),metricInputParameter.get(TABLE));
.getFullQualifiedTableName(metricInputParameter.get(DATABASE),
metricInputParameter.get(SCHEMA),metricInputParameter.get(TABLE), false);

connectorParameterMap.put(TABLE, table);
connectorParameterMap.put(DATABASE, metricInputParameter.get(DATABASE));
Expand Down Expand Up @@ -131,9 +131,9 @@ protected List<SourceConfig> getSourceConfigs() throws DataVinesException {
ConnectorFactory connectorFactory = PluginLoader
.getPluginLoader(ConnectorFactory.class)
.getNewPlugin(connectorParameter2.getType());
String table = connectorFactory.getDialect().getFullQualifiedTableNameForSpark(metricInputParameter.get(DATABASE2),
String table = connectorFactory.getDialect().getFullQualifiedTableName(metricInputParameter.get(DATABASE2),
metricInputParameter.get(SCHEMA2),
metricInputParameter.get(TABLE2));
metricInputParameter.get(TABLE2),false);

connectorParameterMap.put(TABLE, table);
connectorParameterMap.put(DATABASE, metricInputParameter.get(DATABASE2));
Expand Down Expand Up @@ -248,6 +248,6 @@ public String getTableAlias(String database, String schema, String table, String
table = database + "_" + table;
}

return table + "_" + order;
return table+ "_" + order;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ public void buildTransformConfigs() {
Map<String, String> metricInputParameter = metric2InputParameter.get(metricUniqueKey);
metricInputParameter.put(METRIC_UNIQUE_KEY, metricUniqueKey);
List<MappingColumn> mappingColumns = JSONUtils.toList(metricInputParameter.get(MAPPING_COLUMNS),MappingColumn.class);
metricInputParameter.put(TABLE_ALIAS_COLUMNS, MetricParserUtils.getTableAliasColumns(mappingColumns,metricInputParameter.get(TABLE_ALIAS),1));
metricInputParameter.put(TABLE2_ALIAS_COLUMNS, MetricParserUtils.getTableAliasColumns(mappingColumns,metricInputParameter.get(TABLE2_ALIAS),2));
metricInputParameter.put(ON_CLAUSE, MetricParserUtils.getOnClause(mappingColumns, metricInputParameter));
metricInputParameter.put(WHERE_CLAUSE, MetricParserUtils.getWhereClause(mappingColumns, metricInputParameter));
metricInputParameter.put(TABLE_ALIAS_COLUMNS, MetricParserUtils.getTableAliasColumns(mappingColumns,metricInputParameter.get(TABLE_ALIAS),1, true));
metricInputParameter.put(TABLE2_ALIAS_COLUMNS, MetricParserUtils.getTableAliasColumns(mappingColumns,metricInputParameter.get(TABLE2_ALIAS),2, true));
metricInputParameter.put(ON_CLAUSE, MetricParserUtils.getOnClause(mappingColumns, metricInputParameter, true));
metricInputParameter.put(WHERE_CLAUSE, MetricParserUtils.getWhereClause(mappingColumns, metricInputParameter, true));
metric2InputParameter.put(metricUniqueKey, metricInputParameter);
}
}
Expand Down

0 comments on commit 6163e44

Please sign in to comment.