From 769919a8fbe7c2d65da805db947588e7f3ec30e6 Mon Sep 17 00:00:00 2001 From: "cheney.yin" Date: Thu, 14 Sep 2023 13:58:19 +0800 Subject: [PATCH] [Feature][Flink] Support Decimal Type with configurable precision and scale --- release-note.md | 1 + .../execution/FlinkRuntimeEnvironment.java | 25 ++++++++++++++- .../flink/execution/SinkExecuteProcessor.java | 5 ++- .../FlinkAbstractPluginExecuteProcessor.java | 32 +++++++++++++++++++ .../execution/FlinkRuntimeEnvironment.java | 25 +++++++++++++++ .../flink/execution/SinkExecuteProcessor.java | 5 ++- .../execution/SourceExecuteProcessor.java | 4 +++ .../execution/TransformExecuteProcessor.java | 14 ++++---- .../src/test/resources/fake_to_paimon.conf | 1 + .../src/test/resources/paimon_to_assert.conf | 2 +- .../flink/utils/TypeConverterUtilsTest.java | 11 +++++-- .../serialization/FlinkRowConverter.java | 24 ++++++++++++++ .../flink/utils/TypeConverterUtils.java | 21 ++++++------ 13 files changed, 141 insertions(+), 29 deletions(-) diff --git a/release-note.md b/release-note.md index 61664d773f4..26f710c8700 100644 --- a/release-note.md +++ b/release-note.md @@ -147,6 +147,7 @@ - [Core] [API] Add copy method to Catalog codes (#4414) - [Core] [API] Add options check before create source and sink and transform in FactoryUtil (#4424) - [Core] [Shade] Add guava shade module (#4358) +- [Core] [Flink] Support Decimal Type with configurable precision and scale (#5419) ### Connector-V2 diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java index 996c9698fb0..4bd81769fbe 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.api.env.EnvCommonOptions; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.Constants; import org.apache.seatunnel.common.config.CheckResult; import org.apache.seatunnel.common.constants.JobMode; @@ -51,8 +52,11 @@ import java.net.URL; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.stream.Collectors; @Slf4j @@ -64,7 +68,8 @@ public class FlinkRuntimeEnvironment implements RuntimeEnvironment { private StreamExecutionEnvironment environment; private StreamTableEnvironment tableEnvironment; - + private Map stagedTypes = new LinkedHashMap<>(); + private Optional defaultType = Optional.empty(); private JobMode jobMode; private String jobName = Constants.LOGO; @@ -334,6 +339,24 @@ public void registerResultTable( name, tableEnvironment.fromChangelogStream(dataStream)); } + public void stageType(String tblName, SeaTunnelRowType type) { + stagedTypes.put(tblName, type); + } + + public void stageDefaultType(SeaTunnelRowType type) { + this.defaultType = Optional.of(type); + } + + public Optional type(String tblName) { + return stagedTypes.containsKey(tblName) + ? Optional.of(stagedTypes.get(tblName)) + : Optional.empty(); + } + + public Optional defaultType() { + return this.defaultType; + } + public static FlinkRuntimeEnvironment getInstance(Config config) { if (INSTANCE == null) { synchronized (FlinkRuntimeEnvironment.class) { diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java index 03bd2077e50..cf0ad1e7be8 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java @@ -31,7 +31,6 @@ import org.apache.seatunnel.plugin.discovery.PluginIdentifier; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery; import org.apache.seatunnel.translation.flink.sink.FlinkSink; -import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; @@ -101,8 +100,8 @@ public List> execute(List> upstreamDataStreams) SeaTunnelSink seaTunnelSink = plugins.get(i); DataStream stream = fromSourceTable(sinkConfig).orElse(input); - seaTunnelSink.setTypeInfo( - (SeaTunnelRowType) TypeConverterUtils.convert(stream.getType())); + SeaTunnelRowType sourceType = initSourceType(sinkConfig, stream); + seaTunnelSink.setTypeInfo(sourceType); if (SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) { SupportDataSaveMode saveModeSink = (SupportDataSaveMode) seaTunnelSink; DataSaveMode dataSaveMode = saveModeSink.getUserConfigSaveMode(); diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java index 6c61f61b957..ed8b72a8f02 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java @@ -20,9 +20,11 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.api.common.JobContext; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.utils.ReflectionUtils; import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor; import org.apache.seatunnel.core.starter.flink.utils.TableUtil; +import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.table.api.Table; @@ -117,6 +119,36 @@ protected void registerAppendStream(Config pluginConfig) { } } + protected void stageType(Config pluginConfig, SeaTunnelRowType type) { + if (!flinkRuntimeEnvironment.defaultType().isPresent()) { + flinkRuntimeEnvironment.stageDefaultType(type); + } + + if (pluginConfig.hasPath("result_table_name")) { + String tblName = pluginConfig.getString("result_table_name"); + flinkRuntimeEnvironment.stageType(tblName, type); + } + } + + protected Optional sourceType(Config pluginConfig) { + if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) { + String tblName = pluginConfig.getString(SOURCE_TABLE_NAME); + return flinkRuntimeEnvironment.type(tblName); + } else { + return flinkRuntimeEnvironment.defaultType(); + } + } + + protected SeaTunnelRowType initSourceType(Config sinkConfig, DataStream stream) { + SeaTunnelRowType sourceType = + sourceType(sinkConfig) + .orElseGet( + () -> + (SeaTunnelRowType) + TypeConverterUtils.convert(stream.getType())); + return sourceType; + } + protected abstract List initializePlugins( List jarPaths, List pluginConfigs); } diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java index 12168921d8c..d8ff813f123 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.api.env.EnvCommonOptions; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.Constants; import org.apache.seatunnel.common.config.CheckResult; import org.apache.seatunnel.common.constants.JobMode; @@ -51,8 +52,11 @@ import java.net.URL; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.stream.Collectors; @Slf4j @@ -65,6 +69,9 @@ public class FlinkRuntimeEnvironment implements RuntimeEnvironment { private StreamTableEnvironment tableEnvironment; + private Map stagedTypes = new LinkedHashMap<>(); + private Optional defaultType = Optional.empty(); + private JobMode jobMode; private String jobName = Constants.LOGO; @@ -334,6 +341,24 @@ public void registerResultTable( name, tableEnvironment.fromChangelogStream(dataStream)); } + public void stageType(String tblName, SeaTunnelRowType type) { + stagedTypes.put(tblName, type); + } + + public void stageDefaultType(SeaTunnelRowType type) { + this.defaultType = Optional.of(type); + } + + public Optional type(String tblName) { + return stagedTypes.containsKey(tblName) + ? Optional.of(stagedTypes.get(tblName)) + : Optional.empty(); + } + + public Optional defaultType() { + return this.defaultType; + } + public static FlinkRuntimeEnvironment getInstance(Config config) { if (INSTANCE == null) { synchronized (FlinkRuntimeEnvironment.class) { diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java index ca9a05f632a..340351d1d48 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java @@ -31,7 +31,6 @@ import org.apache.seatunnel.plugin.discovery.PluginIdentifier; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery; import org.apache.seatunnel.translation.flink.sink.FlinkSink; -import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; @@ -102,8 +101,8 @@ public List> execute(List> upstreamDataStreams) SeaTunnelSink seaTunnelSink = plugins.get(i); DataStream stream = fromSourceTable(sinkConfig).orElse(input); - seaTunnelSink.setTypeInfo( - (SeaTunnelRowType) TypeConverterUtils.convert(stream.getType())); + SeaTunnelRowType sourceType = initSourceType(sinkConfig, stream); + seaTunnelSink.setTypeInfo(sourceType); if (SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) { SupportDataSaveMode saveModeSink = (SupportDataSaveMode) seaTunnelSink; DataSaveMode dataSaveMode = saveModeSink.getUserConfigSaveMode(); diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java index f3ebdd04378..d74726a133f 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java @@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.JobContext; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SupportCoordinate; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.constants.JobMode; import org.apache.seatunnel.core.starter.enums.PluginType; import org.apache.seatunnel.plugin.discovery.PluginIdentifier; @@ -76,12 +77,15 @@ public List> execute(List> upstreamDataStreams) boolean bounded = internalSource.getBoundedness() == org.apache.seatunnel.api.source.Boundedness.BOUNDED; + DataStreamSource sourceStream = addSource( executionEnvironment, sourceFunction, "SeaTunnel " + internalSource.getClass().getSimpleName(), bounded); + stageType(pluginConfig, (SeaTunnelRowType) internalSource.getProducedType()); + if (pluginConfig.hasPath(CommonOptions.PARALLELISM.key())) { int parallelism = pluginConfig.getInt(CommonOptions.PARALLELISM.key()); sourceStream.setParallelism(parallelism); diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java index a358fb6f330..0dc36c62b0b 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java @@ -20,8 +20,8 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.api.common.JobContext; -import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.api.transform.SeaTunnelTransform; import org.apache.seatunnel.core.starter.exception.TaskExecuteException; import org.apache.seatunnel.plugin.discovery.PluginIdentifier; @@ -97,7 +97,10 @@ public List> execute(List> upstreamDataStreams) SeaTunnelTransform transform = plugins.get(i); Config pluginConfig = pluginConfigs.get(i); DataStream stream = fromSourceTable(pluginConfig).orElse(input); - input = flinkTransform(transform, stream); + SeaTunnelRowType sourceType = initSourceType(pluginConfig, stream); + transform.setTypeInfo(sourceType); + input = flinkTransform(sourceType, transform, stream); + stageType(pluginConfig, (SeaTunnelRowType) transform.getProducedType()); registerResultTable(pluginConfig, input); result.add(input); } catch (Exception e) { @@ -111,11 +114,10 @@ public List> execute(List> upstreamDataStreams) return result; } - protected DataStream flinkTransform(SeaTunnelTransform transform, DataStream stream) { - SeaTunnelDataType seaTunnelDataType = TypeConverterUtils.convert(stream.getType()); - transform.setTypeInfo(seaTunnelDataType); + protected DataStream flinkTransform( + SeaTunnelRowType sourceType, SeaTunnelTransform transform, DataStream stream) { TypeInformation rowTypeInfo = TypeConverterUtils.convert(transform.getProducedType()); - FlinkRowConverter transformInputRowConverter = new FlinkRowConverter(seaTunnelDataType); + FlinkRowConverter transformInputRowConverter = new FlinkRowConverter(sourceType); FlinkRowConverter transformOutputRowConverter = new FlinkRowConverter(transform.getProducedType()); DataStream output = diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon.conf index 8afd81989ce..8e5f00ee7bc 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon.conf @@ -27,6 +27,7 @@ env { source { FakeSource { + row.num = 100000 schema = { fields { c_map = "map" diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf index d27893cedc0..cbd39a0fb0b 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf @@ -44,7 +44,7 @@ sink { row_rules = [ { rule_type = MAX_ROW - rule_value = 5 + rule_value = 100000 } ], field_rules = [ diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/test/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtilsTest.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/test/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtilsTest.java index d180af69aa0..95cfa335e7d 100644 --- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/test/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtilsTest.java +++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/test/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtilsTest.java @@ -83,9 +83,14 @@ public void convertShortType() { @Test public void convertBigDecimalType() { - Assertions.assertEquals( - BasicTypeInfo.BIG_DEC_TYPE_INFO, - TypeConverterUtils.convert(new DecimalType(30, 2))); + /** + * To solve lost precision and scale of {@link + * org.apache.seatunnel.api.table.type.DecimalType}, use {@link + * org.apache.flink.api.common.typeinfo.BasicTypeInfo#STRING_TYPE_INFO} as the convert + * result of {@link org.apache.seatunnel.api.table.type.DecimalType} instance. + */ + Assertions.assertEquals( + BasicTypeInfo.STRING_TYPE_INFO, TypeConverterUtils.convert(new DecimalType(30, 2))); } @Test diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java index fa8d88e052f..a1278cdf853 100644 --- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java +++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.translation.flink.serialization; +import org.apache.seatunnel.api.table.type.DecimalType; import org.apache.seatunnel.api.table.type.MapType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; @@ -28,6 +29,8 @@ import org.apache.flink.types.RowKind; import java.io.IOException; +import java.math.BigDecimal; +import java.math.RoundingMode; import java.util.HashMap; import java.util.Map; import java.util.function.BiFunction; @@ -68,6 +71,15 @@ private static Object convert(Object field, SeaTunnelDataType dataType) { case MAP: return convertMap( (Map) field, (MapType) dataType, FlinkRowConverter::convert); + + /** + * To solve lost precision and scale of {@link + * org.apache.seatunnel.api.table.type.DecimalType}, use {@link java.lang.String} as + * the convert result of {@link java.math.BigDecimal} instance. + */ + case DECIMAL: + BigDecimal decimal = (BigDecimal) field; + return decimal.toString(); default: return field; } @@ -122,6 +134,18 @@ private static Object reconvert(Object field, SeaTunnelDataType dataType) { case MAP: return convertMap( (Map) field, (MapType) dataType, FlinkRowConverter::reconvert); + + /** + * To solve lost precision and scale of {@link + * org.apache.seatunnel.api.table.type.DecimalType}, create {@link + * java.math.BigDecimal} instance from {@link java.lang.String} type field. + */ + case DECIMAL: + DecimalType decimalType = (DecimalType) dataType; + String decimalData = (String) field; + BigDecimal decimal = new BigDecimal(decimalData); + decimal.setScale(decimalType.getScale(), RoundingMode.HALF_UP); + return decimal; default: return field; } diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java index fc8b4f6b3cb..86fbd0833df 100644 --- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java +++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java @@ -33,7 +33,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.MapTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.table.runtime.typeutils.BigDecimalTypeInfo; import java.math.BigDecimal; import java.time.LocalDate; @@ -70,11 +69,15 @@ public class TypeConverterUtils { BridgedType.of(BasicType.DOUBLE_TYPE, BasicTypeInfo.DOUBLE_TYPE_INFO)); BRIDGED_TYPES.put( Void.class, BridgedType.of(BasicType.VOID_TYPE, BasicTypeInfo.VOID_TYPE_INFO)); - // TODO: there is a still an unresolved issue that the BigDecimal type will lose the - // precision and scale + /** + * To solve lost precision and scale of {@link + * org.apache.seatunnel.api.table.type.DecimalType}, use {@link + * org.apache.flink.api.common.typeinfo.BasicTypeInfo#STRING_TYPE_INFO} as the payload of + * {@link org.apache.seatunnel.api.table.type.DecimalType}. + */ BRIDGED_TYPES.put( BigDecimal.class, - BridgedType.of(new DecimalType(38, 18), BasicTypeInfo.BIG_DEC_TYPE_INFO)); + BridgedType.of(new DecimalType(38, 18), BasicTypeInfo.STRING_TYPE_INFO)); // data time types BRIDGED_TYPES.put( @@ -134,10 +137,7 @@ public static SeaTunnelDataType convert(TypeInformation dataType) { if (bridgedType != null) { return bridgedType.getSeaTunnelType(); } - if (dataType instanceof BigDecimalTypeInfo) { - BigDecimalTypeInfo decimalType = (BigDecimalTypeInfo) dataType; - return new DecimalType(decimalType.precision(), decimalType.scale()); - } + if (dataType instanceof MapTypeInfo) { MapTypeInfo mapTypeInfo = (MapTypeInfo) dataType; return new MapType<>( @@ -160,10 +160,7 @@ public static TypeInformation convert(SeaTunnelDataType dataType) { if (bridgedType != null) { return bridgedType.getFlinkType(); } - if (dataType instanceof DecimalType) { - DecimalType decimalType = (DecimalType) dataType; - return new BigDecimalTypeInfo(decimalType.getPrecision(), decimalType.getScale()); - } + if (dataType instanceof MapType) { MapType mapType = (MapType) dataType; return new MapTypeInfo<>(