Skip to content

Commit

Permalink
static string sql template
Browse files Browse the repository at this point in the history
  • Loading branch information
gnehil committed Sep 14, 2023
1 parent 01c11b9 commit 6bcd9b8
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.seatunnel.connectors.doris.catalog;

import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
Expand Down Expand Up @@ -136,8 +137,7 @@ public String getDefaultDatabase() throws CatalogException {

@Override
public boolean databaseExists(String databaseName) throws CatalogException {
String query = DorisCatalogUtil.getDatabaseQuery();
try (PreparedStatement ps = conn.prepareStatement(query)) {
try (PreparedStatement ps = conn.prepareStatement(DorisCatalogUtil.DATABASE_QUERY)) {
ps.setString(1, databaseName);
ResultSet rs = ps.executeQuery();
return rs.next();
Expand All @@ -148,9 +148,8 @@ public boolean databaseExists(String databaseName) throws CatalogException {

@Override
public List<String> listDatabases() throws CatalogException {
String query = DorisCatalogUtil.getAllDatabasesQuery();
List<String> databases = new ArrayList<>();
try (PreparedStatement ps = conn.prepareStatement(query)) {
try (PreparedStatement ps = conn.prepareStatement(DorisCatalogUtil.ALL_DATABASES_QUERY)) {
ResultSet rs = ps.executeQuery();
while (rs.next()) {
String database = rs.getString(1);
Expand All @@ -166,9 +165,8 @@ public List<String> listDatabases() throws CatalogException {
@Override
public List<String> listTables(String databaseName)
throws CatalogException, DatabaseNotExistException {
String query = DorisCatalogUtil.getTablesQueryWithDatabase();
List<String> tables = new ArrayList<>();
try (PreparedStatement ps = conn.prepareStatement(query)) {
try (PreparedStatement ps = conn.prepareStatement(DorisCatalogUtil.TABLES_QUERY_WITH_DATABASE_QUERY)) {
ps.setString(1, databaseName);
ResultSet rs = ps.executeQuery();
while (rs.next()) {
Expand All @@ -185,8 +183,7 @@ public List<String> listTables(String databaseName)

@Override
public boolean tableExists(TablePath tablePath) throws CatalogException {
String query = DorisCatalogUtil.getTablesQueryWithIdentifier();
try (PreparedStatement ps = conn.prepareStatement(query)) {
try (PreparedStatement ps = conn.prepareStatement(DorisCatalogUtil.TABLES_QUERY_WITH_IDENTIFIER_QUERY)) {
ps.setString(1, tablePath.getDatabaseName());
ps.setString(2, tablePath.getTableName());
ResultSet rs = ps.executeQuery();
Expand All @@ -205,8 +202,7 @@ public CatalogTable getTable(TablePath tablePath)
throw new TableNotExistException(catalogName, tablePath);
}
TableSchema.Builder builder = TableSchema.builder();
String query = DorisCatalogUtil.getTableSchemaQuery();
try (PreparedStatement ps = conn.prepareStatement(query)) {
try (PreparedStatement ps = conn.prepareStatement(DorisCatalogUtil.TABLE_SCHEMA_QUERY)) {

List<String> keyList = new ArrayList<>();
ps.setString(1, tablePath.getDatabaseName());
Expand Down Expand Up @@ -245,15 +241,13 @@ public CatalogTable getTable(TablePath tablePath)
String.format("get table [%s] failed", tablePath.getFullName()), e);
}

String comment = "";

return CatalogTable.of(
TableIdentifier.of(
catalogName, tablePath.getDatabaseName(), tablePath.getTableName()),
builder.build(),
connectorOptions(),
Collections.emptyList(),
comment);
StringUtils.EMPTY);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,26 @@

public class DorisCatalogUtil {

public static final String ALL_DATABASES_QUERY = "SELECT SCHEMA_NAME FROM information_schema.schemata WHERE CATALOG_NAME = 'internal' ORDER BY SCHEMA_NAME";

public static final String DATABASE_QUERY = "SELECT SCHEMA_NAME FROM information_schema.schemata "
+ "WHERE CATALOG_NAME = 'internal' AND SCHEMA_NAME = ? "
+ "ORDER BY SCHEMA_NAME";

public static final String TABLES_QUERY_WITH_DATABASE_QUERY = "SELECT TABLE_NAME FROM information_schema.tables "
+ "WHERE TABLE_CATALOG = 'internal' AND TABLE_SCHEMA = ? "
+ "ORDER BY TABLE_NAME";

public static final String TABLES_QUERY_WITH_IDENTIFIER_QUERY = "SELECT TABLE_NAME FROM information_schema.tables "
+ "WHERE TABLE_CATALOG = 'internal' AND TABLE_SCHEMA = ? AND TABLE_NAME = ? "
+ "ORDER BY TABLE_NAME";

public static final String TABLE_SCHEMA_QUERY = "SELECT COLUMN_NAME,ORDINAL_POSITION,COLUMN_DEFAULT,IS_NULLABLE,COLUMN_TYPE,COLUMN_SIZE,"
+ "COLUMN_KEY,NUMERIC_PRECISION,NUMERIC_SCALE,COLUMN_COMMENT "
+ "FROM information_schema.columns "
+ "WHERE TABLE_CATALOG = 'internal' AND TABLE_SCHEMA = ? AND TABLE_NAME = ? "
+ "ORDER BY ORDINAL_POSITION";

public static String randomFrontEndHost(String[] frontEndNodes) {
if (frontEndNodes.length == 1) {
return frontEndNodes[0].split(":")[0];
Expand All @@ -54,36 +74,6 @@ public static String getJdbcUrl(String host, Integer port, String database) {
return String.format("jdbc:mysql://%s:%d/%s", host, port, database);
}

public static String getAllDatabasesQuery() {
return "SELECT SCHEMA_NAME FROM information_schema.schemata WHERE CATALOG_NAME = 'internal' ORDER BY SCHEMA_NAME";
}

public static String getDatabaseQuery() {
return "SELECT SCHEMA_NAME FROM information_schema.schemata "
+ "WHERE CATALOG_NAME = 'internal' AND SCHEMA_NAME = ? "
+ "ORDER BY SCHEMA_NAME";
}

public static String getTablesQueryWithDatabase() {
return "SELECT TABLE_NAME FROM information_schema.tables "
+ "WHERE TABLE_CATALOG = 'internal' AND TABLE_SCHEMA = ? "
+ "ORDER BY TABLE_NAME";
}

public static String getTablesQueryWithIdentifier() {
return "SELECT TABLE_NAME FROM information_schema.tables "
+ "WHERE TABLE_CATALOG = 'internal' AND TABLE_SCHEMA = ? AND TABLE_NAME = ? "
+ "ORDER BY TABLE_NAME";
}

public static String getTableSchemaQuery() {
return "SELECT COLUMN_NAME,ORDINAL_POSITION,COLUMN_DEFAULT,IS_NULLABLE,COLUMN_TYPE,COLUMN_SIZE,"
+ "COLUMN_KEY,NUMERIC_PRECISION,NUMERIC_SCALE,COLUMN_COMMENT "
+ "FROM information_schema.columns "
+ "WHERE TABLE_CATALOG = 'internal' AND TABLE_SCHEMA = ? AND TABLE_NAME = ? "
+ "ORDER BY ORDINAL_POSITION";
}

public static String getCreateDatabaseQuery(String database, boolean ignoreIfExists) {
return "CREATE DATABASE " + (ignoreIfExists ? "IF NOT EXISTS " : "") + database;
}
Expand All @@ -93,12 +83,8 @@ public static String getDropDatabaseQuery(String database, boolean ignoreIfNotEx
}

/**
* CREATE TABLE ${table_identifier} ( ${column_definition} ) ENGINE = ${engine_type} UNIQUE KEY
* (${key_columns}) COMMENT ${table_comment} ${partition_info} DISTRIBUTED BY HASH
* (${distribution_columns}) BUCKETS ${distribution_bucket} PROPERTIES ( ${properties} )
*
* @param createTableTemplate create table template
* @param catalogTable catalog table
* @param catalogTable catalog table
* @return create table stmt
*/
public static String getCreateTableStatement(
Expand Down

0 comments on commit 6bcd9b8

Please sign in to comment.