Skip to content

Commit

Permalink
[improve][connector-v2][sqlserver-cdc]Unified sqlserver TypeUtils typ…
Browse files Browse the repository at this point in the history
…e conversion mode (#5668)
  • Loading branch information
zhilinli123 authored Oct 25, 2023
1 parent 33fa8ff commit 75b814b
Showing 1 changed file with 77 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,56 +27,101 @@
import io.debezium.relational.Column;
import io.debezium.relational.Table;

import java.sql.Types;
import java.util.List;

/** Utilities for converting from SqlServer types to SeaTunnel types. */
public class SqlServerTypeUtils {
private static final String DATETIME_OFFSET = "datetimeoffset";

public static SeaTunnelDataType<?> convertFromColumn(Column column) {
// ============================data types=====================

switch (column.typeName().toLowerCase()) {
case DATETIME_OFFSET:
return LocalTimeType.LOCAL_DATE_TIME_TYPE;
}
// -------------------------string----------------------------
private static final String SQLSERVER_CHAR = "CHAR";
private static final String SQLSERVER_VARCHAR = "VARCHAR";
private static final String SQLSERVER_NCHAR = "NCHAR";
private static final String SQLSERVER_NVARCHAR = "NVARCHAR";
private static final String SQLSERVER_STRUCT = "STRUCT";
private static final String SQLSERVER_CLOB = "CLOB";
private static final String SQLSERVER_LONGVARCHAR = "LONGVARCHAR";
private static final String SQLSERVER_LONGNVARCHAR = "LONGNVARCHAR";
private static final String SQLSERVER_TEXT = "TEXT";
private static final String SQLSERVER_NTEXT = "NTEXT";
private static final String SQLSERVER_XML = "XML";

// ------------------------------blob-------------------------
private static final String SQLSERVER_BLOB = "BLOB";
private static final String SQLSERVER_VARBINARY = "VARBINARY";

switch (column.jdbcType()) {
case Types.CHAR:
case Types.VARCHAR:
case Types.NCHAR:
case Types.NVARCHAR:
case Types.STRUCT:
case Types.CLOB:
case Types.LONGVARCHAR:
case Types.LONGNVARCHAR:
// ------------------------------number-------------------------
private static final String SQLSERVER_INTEGER = "INT";
private static final String SQLSERVER_SMALLINT = "SMALLINT";
private static final String SQLSERVER_TINYINT = "TINYINT";
private static final String SQLSERVER_BIGINT = "BIGINT";
private static final String SQLSERVER_FLOAT = "FLOAT";
private static final String SQLSERVER_REAL = "REAL";
private static final String SQLSERVER_DOUBLE = "DOUBLE";
private static final String SQLSERVER_NUMERIC = "NUMERIC";
private static final String SQLSERVER_DECIMAL = "DECIMAL";
private static final String SQLSERVER_SMALLMONEY = "SMALLMONEY";
private static final String SQLSERVER_MONEY = "MONEY";

// ------------------------------date-------------------------
private static final String SQLSERVER_TIMESTAMP = "TIMESTAMP";
private static final String SQLSERVER_DATE = "DATE";
private static final String SQLSERVER_TIME = "TIME";
private static final String SQLSERVER_DATETIMEOFFSET = "DATETIMEOFFSET";
private static final String SQLSERVER_DATETIME2 = "DATETIME2";
private static final String SQLSERVER_DATETIME = "DATETIME";
private static final String SQLSERVER_SMALLDATETIME = "SMALLDATETIME";

// ------------------------------bool-------------------------
private static final String SQLSERVER_BIT = "BIT";

public static SeaTunnelDataType<?> convertFromColumn(Column column) {
String typeName = column.typeName().toUpperCase();
switch (typeName) {
case SQLSERVER_CHAR:
case SQLSERVER_VARCHAR:
case SQLSERVER_NCHAR:
case SQLSERVER_NVARCHAR:
case SQLSERVER_STRUCT:
case SQLSERVER_CLOB:
case SQLSERVER_LONGVARCHAR:
case SQLSERVER_LONGNVARCHAR:
case SQLSERVER_TEXT:
case SQLSERVER_NTEXT:
case SQLSERVER_XML:
return BasicType.STRING_TYPE;
case Types.BLOB:
case Types.VARBINARY:
case SQLSERVER_BLOB:
case SQLSERVER_VARBINARY:
return PrimitiveByteArrayType.INSTANCE;
case Types.INTEGER:
case SQLSERVER_INTEGER:
return BasicType.INT_TYPE;
case Types.SMALLINT:
case Types.TINYINT:
case SQLSERVER_SMALLINT:
case SQLSERVER_TINYINT:
return BasicType.SHORT_TYPE;
case Types.BIGINT:
case SQLSERVER_BIGINT:
return BasicType.LONG_TYPE;
case Types.FLOAT:
case Types.REAL:
case SQLSERVER_REAL:
return BasicType.FLOAT_TYPE;
case Types.DOUBLE:
case SQLSERVER_DOUBLE:
case SQLSERVER_FLOAT:
return BasicType.DOUBLE_TYPE;
case Types.NUMERIC:
case Types.DECIMAL:
case SQLSERVER_NUMERIC:
case SQLSERVER_DECIMAL:
case SQLSERVER_SMALLMONEY:
case SQLSERVER_MONEY:
return new DecimalType(column.length(), column.scale().orElse(0));
case Types.TIMESTAMP:
case SQLSERVER_TIMESTAMP:
case SQLSERVER_DATETIMEOFFSET:
case SQLSERVER_DATETIME2:
case SQLSERVER_DATETIME:
case SQLSERVER_SMALLDATETIME:
return LocalTimeType.LOCAL_DATE_TIME_TYPE;
case Types.DATE:
case SQLSERVER_DATE:
return LocalTimeType.LOCAL_DATE_TYPE;
case Types.TIME:
case SQLSERVER_TIME:
return LocalTimeType.LOCAL_TIME_TYPE;
case Types.BOOLEAN:
case Types.BIT:
case SQLSERVER_BIT:
return BasicType.BOOLEAN_TYPE;
default:
throw new UnsupportedOperationException(
Expand Down

0 comments on commit 75b814b

Please sign in to comment.