Skip to content

Commit

Permalink
oracle catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
eskabetxe committed Aug 1, 2024
1 parent 2ec0b81 commit 924f0d2
Show file tree
Hide file tree
Showing 5 changed files with 923 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
package org.apache.flink.connector.jdbc.oracle.database.catalog;

import org.apache.commons.compress.utils.Lists;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.connector.jdbc.core.database.catalog.AbstractJdbcCatalog;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.*;
import org.apache.flink.table.catalog.exceptions.*;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.*;
import java.util.*;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.apache.flink.connector.jdbc.JdbcConnectionOptions.getBriefAuthProperties;


/**
* OracleCatalog 用于查表和查数据库,便于重建
*/
public class OracleCatalog extends AbstractJdbcCatalog {

private static final Logger LOG = LoggerFactory.getLogger(OracleCatalog.class);

public static final String DEFAULT_DATABASE = "helowin";


public static final String IDENTIFIER = "jdbc";
private static final String ORACLE_DRIVER = "oracle.driver.OracleDriver";
private OracleTypeMapper dialectTypeMapper;
private static final Set<String> builtinDatabases = new HashSet<String>() {
{
add("SCOTT");
add("ANONYMOUS");
add("XS$NULL");
add("DIP");
add("SPATIAL_WFS_ADMIN_USR");
add("SPATIAL_CSW_ADMIN_USR");
add("APEX_PUBLIC_USER");
add("ORACLE_OCM");
add("MDDATA");
}
};

public OracleCatalog(ClassLoader userClassLoader, String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) {
this(userClassLoader, catalogName, defaultDatabase, baseUrl, getBriefAuthProperties(username, pwd));
}

public OracleCatalog(ClassLoader userClassLoader, String catalogName, String defaultDatabase, String baseUrl, Properties connectProperties) {
super(userClassLoader, catalogName, defaultDatabase, baseUrl, connectProperties);
String driverVersion = Preconditions.checkNotNull(getDriverVersion(), "Driver version must not be null.");
String databaseVersion = Preconditions.checkNotNull(getDatabaseVersion(), "Database version must not be null.");
LOG.info("Driver version: {}, database version: {}", driverVersion, databaseVersion);
this.dialectTypeMapper = new OracleTypeMapper(databaseVersion, driverVersion);
}

private String getDatabaseVersion() {
try (Connection conn = DriverManager.getConnection(defaultUrl, connectionProperties)) {
return conn.getMetaData().getDatabaseProductVersion();
} catch (Exception e) {
throw new CatalogException( String.format("Failed in getting Oracle version by %s.", defaultUrl), e);
}
}

private String getDriverVersion() {
try (Connection conn = DriverManager.getConnection(defaultUrl, connectionProperties)) {
String driverVersion = conn.getMetaData().getDriverVersion();
Pattern regexp = Pattern.compile("\\d+?\\.\\d+?\\.\\d+");
Matcher matcher = regexp.matcher(driverVersion);
return matcher.find() ? matcher.group(0) : null;
} catch (Exception e) {
throw new CatalogException(
String.format("Failed in getting Oracle driver version by %s.", defaultUrl), e);

}
}

@Override
public List<String> listDatabases() throws CatalogException {
return extractColumnValuesBySQL(this.defaultUrl,
"select username from sys.dba_users " +
"where DEFAULT_TABLESPACE <> 'SYSTEM' and DEFAULT_TABLESPACE <> 'SYSAUX' " +
" order by username",
1,
dbName -> !builtinDatabases.contains(dbName));
}

@Override
public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
Preconditions.checkState(!StringUtils.isBlank(databaseName), "Database name must not be blank");
if (listDatabases().contains(databaseName)) {
return new CatalogDatabaseImpl(Collections.emptyMap(), null);
} else {
throw new DatabaseNotExistException(getName(), databaseName);
}
}


@Override
public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
Preconditions.checkState(StringUtils.isNotBlank(databaseName), "Database name must not be blank.");
if (!databaseExists(databaseName)){// 注意这个值是 oracle 实例名称
throw new DatabaseNotExistException(getName(), databaseName);
}

List<String> listDatabases = listDatabases().stream().map(username -> "'" + username + "'")
.collect(Collectors.toList());
return extractColumnValuesBySQL(this.defaultUrl,
"SELECT OWNER||'.'||TABLE_NAME AS schemaTableName FROM sys.all_tables WHERE OWNER IN (" + String.join(",", listDatabases) + ")"+
"ORDER BY OWNER,TABLE_NAME",1, null, null);
}


@Override
public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
if (!tableExists(tablePath)) {
throw new TableNotExistException(getName(), tablePath);
}

String databaseName = tablePath.getDatabaseName();
String dbUrl = baseUrl + databaseName;
try(Connection conn = DriverManager.getConnection(dbUrl, connectionProperties)) {
DatabaseMetaData metaData = conn.getMetaData();
Optional<UniqueConstraint> primaryKey = getPrimaryKey(metaData, databaseName, getSchemaName(tablePath), getTableName(tablePath));
String statement = String.format("SELECT * FROM %s ", getSchemaTableName(tablePath)) ;
PreparedStatement ps = conn.prepareStatement(statement);
ResultSetMetaData resultSetMetaData = ps.getMetaData();

String[] columnNames = new String[resultSetMetaData.getColumnCount()];
DataType[] types = new DataType[resultSetMetaData.getColumnCount()];

for (int i = 1; i<=resultSetMetaData.getColumnCount(); i++) {
columnNames[i - 1] = resultSetMetaData.getColumnName(i);
types[i - 1] = fromJDBCType(tablePath, resultSetMetaData,i);
if (resultSetMetaData.isNullable(i) == ResultSetMetaData.columnNoNulls) {
types[i-1] = types[i-1].notNull();
}
}

Schema.Builder schemaBuilder = Schema.newBuilder().fromFields(columnNames, types);
primaryKey.ifPresent( pk -> schemaBuilder.primaryKeyNamed(pk.getName(), pk.getColumns()));
Schema tableSchema = schemaBuilder.build();
Map<String, String> props = new HashMap<>();
props.put(FactoryUtil.CONNECTOR.key(), IDENTIFIER);
props.put("username" , getUsername());
props.put("password", getPassword());
props.put("table_name", getSchemaTableName(tablePath));
props.put("driverName", ORACLE_DRIVER);
return CatalogTable.of(tableSchema, null, Lists.newArrayList(), props);

} catch (Exception ex) {
throw new CatalogException(String.format("Failed getting Table %s", tablePath.getFullName()), ex);
}
}

@Override
public boolean tableExists(ObjectPath tablePath) throws CatalogException {
String[] schemaTableNames = getSchemaTableName(tablePath).split("\\.");
return !extractColumnValuesBySQL(
defaultUrl, "SELECT table_name FROM sys.all_tables where OWNER = ? and table_name = ?",
1, null, schemaTableNames[0], schemaTableNames[1])
.isEmpty();

}

protected List<String> extractColumnValuesBySQL(String connUrl, String sql, int columnIndex, Predicate<String> filterFunc, Object... params){
List<String> columnValues = Lists.newArrayList();

try (Connection conn = DriverManager.getConnection(connUrl, connectionProperties);
PreparedStatement ps = conn.prepareStatement(sql)){
if (Objects.nonNull(params) && params.length >0){
for (int i=0; i<params.length; i++) {
ps.setObject(i + 1, params[i]);
}
}
ResultSet rs = ps.executeQuery();
while (rs.next()) {
String columnValue = rs.getString(columnIndex);
if (Objects.isNull(filterFunc) || filterFunc.test(columnValue)) {
columnValues.add(columnValue);
}
return columnValues;
}
} catch (Exception ex){
throw new CatalogException(String.format("The following SQL query could not be executed (%s): %s", connUrl, sql ), ex);
}
return columnValues;
}

protected String getSchemaTableName(ObjectPath tablePath) {
return PostgresTablePath.fromFlinkTableName(tablePath.getObjectName()).getFullPath();
}

protected String getSchemaName(ObjectPath tablePath) {
return PostgresTablePath.fromFlinkTableName(tablePath.getObjectName()).getPgSchemaName();
}

protected String getTableName(ObjectPath tablePath) {
return PostgresTablePath.fromFlinkTableName(tablePath.getObjectName()).getPgTableName();
}


protected DataType fromJDBCType(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex)

throws SQLException {

return dialectTypeMapper.mapping(tablePath, metadata, colIndex);

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package org.apache.flink.connector.jdbc.oracle.database.catalog;

import oracle.jdbc.OracleTypes;
import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalogTypeMapper;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.DecimalType;

import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Types;

public class OracleTypeMapper implements JdbcCatalogTypeMapper {

private final String databaseVersion;

private final String driverVersion;

public OracleTypeMapper(String databaseVersion, String driverVersion) {
this.databaseVersion =databaseVersion;
this.driverVersion = driverVersion;
}

@Override
public DataType mapping(ObjectPath tablePath, ResultSetMetaData metaData, int colIndex) throws SQLException {
int jdbcType = metaData.getColumnType(colIndex);
String columnName = metaData.getColumnName(colIndex);
String oracleType = metaData.getColumnTypeName(colIndex).toUpperCase();
int precision = metaData.getPrecision(colIndex);
int scale = metaData.getScale(colIndex);

switch (jdbcType) {

case Types.CHAR:
case Types.VARCHAR:
case Types.NCHAR:
case Types.NVARCHAR:
case Types.STRUCT:
case Types.CLOB:
return DataTypes.STRING();
case Types.BLOB:
return DataTypes.BYTES();
case Types.INTEGER:
case Types.SMALLINT:
case Types.TINYINT:
return DataTypes.INT();
case Types.FLOAT:
case Types.REAL:
case OracleTypes.BINARY_FLOAT:
return DataTypes.FLOAT();
case Types.DOUBLE:
case OracleTypes.BINARY_DOUBLE:
return DataTypes.DOUBLE();
case Types.NUMERIC:
case Types.DECIMAL:
if (precision > 0 && precision < DecimalType.MAX_PRECISION) {
return DataTypes.DECIMAL(precision, metaData.getScale(colIndex));
}
return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18);
case Types.DATE:
return DataTypes.DATE();
case Types.TIMESTAMP:
case Types.TIMESTAMP_WITH_TIMEZONE:
case OracleTypes.TIMESTAMPTZ:
case OracleTypes.TIMESTAMPLTZ:
return scale > 0 ? DataTypes.TIMESTAMP(scale) : DataTypes.TIMESTAMP();
case OracleTypes.INTERVALYM:
return DataTypes.INTERVAL(DataTypes.YEAR(), DataTypes.MONTH());
case OracleTypes.INTERVALDS:
return DataTypes.INTERVAL(DataTypes.DAY(), DataTypes.SECOND());
case Types.BOOLEAN:
return DataTypes.BOOLEAN();
default:
final String jdbcColumnName = metaData.getColumnName(colIndex);
throw new UnsupportedOperationException(
String.format(
"Doesn't support Oracle type '%s' on column '%s' in Oracle version %s, driver version %s yet.",
oracleType, jdbcColumnName, databaseVersion, driverVersion));
}
}
}
Loading

0 comments on commit 924f0d2

Please sign in to comment.