Skip to content

Commit

Permalink
[Feature][Jdbc] Add Dameng catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
hailin0 committed Sep 8, 2023
1 parent 8a3f261 commit 07ae24e
Show file tree
Hide file tree
Showing 4 changed files with 458 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,9 @@ public AbstractJdbcCatalog(
String defaultSchema) {

checkArgument(StringUtils.isNotBlank(username));
urlInfo.getDefaultDatabase()
.orElseThrow(
() -> new IllegalArgumentException("Can't find default database in url"));
checkArgument(StringUtils.isNotBlank(urlInfo.getUrlWithoutDatabase()));
this.catalogName = catalogName;
this.defaultDatabase = urlInfo.getDefaultDatabase().get();
this.defaultDatabase = urlInfo.getDefaultDatabase().orElse(null);
this.username = username;
this.pwd = pwd;
this.baseUrl = urlInfo.getUrlWithoutDatabase();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.dm;

import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;

import lombok.extern.slf4j.Slf4j;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
public class DamengCatalog extends AbstractJdbcCatalog {
private static final DamengDataTypeConvertor DATA_TYPE_CONVERTOR =
new DamengDataTypeConvertor();
private static final List<String> EXCLUDED_SCHEMAS =
Collections.unmodifiableList(
Arrays.asList("SYS", "SYSDBA", "SYSSSO", "SYSAUDITOR", "CTISYS"));

private static final String SELECT_COLUMNS_SQL =
"SELECT COLUMNS.COLUMN_NAME, COLUMNS.DATA_TYPE, COLUMNS.DATA_LENGTH, COLUMNS.DATA_PRECISION, COLUMNS.DATA_SCALE "
+ ", COLUMNS.NULLABLE, COLUMNS.DATA_DEFAULT, COMMENTS.COMMENTS "
+ "FROM ALL_TAB_COLUMNS COLUMNS "
+ "LEFT JOIN ALL_COL_COMMENTS COMMENTS "
+ "ON COLUMNS.OWNER = COMMENTS.SCHEMA_NAME "
+ "AND COLUMNS.TABLE_NAME = COMMENTS.TABLE_NAME "
+ "AND COLUMNS.COLUMN_NAME = COMMENTS.COLUMN_NAME "
+ "WHERE COLUMNS.OWNER = ? "
+ "AND COLUMNS.TABLE_NAME = ? "
+ "ORDER BY COLUMNS.COLUMN_ID ASC";

public DamengCatalog(
String catalogName,
String username,
String pwd,
JdbcUrlUtil.UrlInfo urlInfo,
String defaultSchema) {
super(catalogName, username, pwd, urlInfo, defaultSchema);
}

@Override
protected String getListDatabaseSql() {
return "SELECT name FROM v$database";
}

@Override
protected String getCreateTableSql(TablePath tablePath, CatalogTable table) {
throw new UnsupportedOperationException();
}

@Override
protected String getDropTableSql(TablePath tablePath) {
return String.format("DROP TABLE %s", getTableName(tablePath));
}

@Override
protected String getTableName(TablePath tablePath) {
return tablePath.getSchemaAndTableName().toUpperCase();
}

@Override
protected String getListTableSql(String databaseName) {
return "SELECT OWNER, TABLE_NAME FROM ALL_TABLES";
}

@Override
protected String getTableName(ResultSet rs) throws SQLException {
if (EXCLUDED_SCHEMAS.contains(rs.getString(1))) {
return null;
}
return rs.getString(1) + "." + rs.getString(2);
}

@Override
protected String getSelectColumnsSql(TablePath tablePath) {
return String.format(
SELECT_COLUMNS_SQL, tablePath.getSchemaName(), tablePath.getTableName());
}

@Override
protected Column buildColumn(ResultSet resultSet) throws SQLException {
String columnName = resultSet.getString("COLUMN_NAME");
String typeName = resultSet.getString("DATA_TYPE");
long columnLength = resultSet.getLong("DATA_LENGTH");
long columnPrecision = resultSet.getLong("DATA_PRECISION");
long columnScale = resultSet.getLong("DATA_SCALE");
String columnComment = resultSet.getString("COMMENTS");
Object defaultValue = resultSet.getObject("DATA_DEFAULT");
boolean isNullable = resultSet.getString("NULLABLE").equals("Y");

SeaTunnelDataType<?> type = fromJdbcType(typeName, columnPrecision, columnScale);

return PhysicalColumn.of(
columnName,
type,
0,
isNullable,
defaultValue,
columnComment,
typeName,
false,
false,
0L,
null,
columnLength);
}

private SeaTunnelDataType<?> fromJdbcType(String typeName, long precision, long scale) {
Map<String, Object> dataTypeProperties = new HashMap<>();
dataTypeProperties.put(DamengDataTypeConvertor.PRECISION, precision);
dataTypeProperties.put(DamengDataTypeConvertor.SCALE, scale);
return DATA_TYPE_CONVERTOR.toSeaTunnelType(typeName, dataTypeProperties);
}

@Override
protected String getUrlFromDatabaseName(String databaseName) {
return defaultUrl;
}

@Override
protected String getOptionTableName(TablePath tablePath) {
return tablePath.getSchemaAndTableName();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.dm;

import org.apache.seatunnel.shade.com.google.common.base.Preconditions;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;

import org.apache.commons.lang3.StringUtils;

import com.google.auto.service.AutoService;

@AutoService(Factory.class)
public class DamengCatalogFactory implements CatalogFactory {

public static final String IDENTIFIER = "Dameng";

@Override
public String factoryIdentifier() {
return IDENTIFIER;
}

@Override
public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
String urlWithDatabase = options.get(JdbcCatalogOptions.BASE_URL);
Preconditions.checkArgument(
StringUtils.isNoneBlank(urlWithDatabase),
"Miss config <base-url>! Please check your config.");
JdbcUrlUtil.UrlInfo urlInfo = JdbcUrlUtil.getUrlInfo(urlWithDatabase);
return new DamengCatalog(
catalogName,
options.get(JdbcCatalogOptions.USERNAME),
options.get(JdbcCatalogOptions.PASSWORD),
urlInfo,
options.get(JdbcCatalogOptions.SCHEMA));
}

@Override
public OptionRule optionRule() {
return JdbcCatalogOptions.BASE_RULE.build();
}
}
Loading

0 comments on commit 07ae24e

Please sign in to comment.