diff --git a/flink-cdc-cli/src/test/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParserTest.java b/flink-cdc-cli/src/test/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParserTest.java index daae0f2d60b..045b1fe7a7e 100644 --- a/flink-cdc-cli/src/test/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParserTest.java +++ b/flink-cdc-cli/src/test/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParserTest.java @@ -31,7 +31,9 @@ import java.util.Arrays; import java.util.Collections; +import static com.ververica.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; /** Unit test for {@link YamlPipelineDefinitionParser}. */ class YamlPipelineDefinitionParserTest { @@ -64,11 +66,81 @@ void testMinimizedDefinition() throws Exception { void testOverridingGlobalConfig() throws Exception { URL resource = Resources.getResource("definitions/pipeline-definition-full.yaml"); YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser(); - ImmutableMap.builder().put("parallelism", "1").put("foo", "bar"); - PipelineDef pipelineDef = parser.parse(Paths.get(resource.toURI()), new Configuration()); + PipelineDef pipelineDef = + parser.parse( + Paths.get(resource.toURI()), + Configuration.fromMap( + ImmutableMap.builder() + .put("parallelism", "1") + .put("foo", "bar") + .build())); assertThat(pipelineDef).isEqualTo(fullDefWithGlobalConf); } + @Test + void testEvaluateDefaultLocalTimeZone() throws Exception { + URL resource = Resources.getResource("definitions/pipeline-definition-minimized.yaml"); + YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser(); + PipelineDef pipelineDef = parser.parse(Paths.get(resource.toURI()), new Configuration()); + assertThat(pipelineDef.getConfig().get(PIPELINE_LOCAL_TIME_ZONE)) + .isNotEqualTo(PIPELINE_LOCAL_TIME_ZONE.defaultValue()); + } + + @Test + void testValidTimeZone() throws Exception { + URL resource = Resources.getResource("definitions/pipeline-definition-minimized.yaml"); + YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser(); + PipelineDef pipelineDef = + parser.parse( + Paths.get(resource.toURI()), + Configuration.fromMap( + ImmutableMap.builder() + .put(PIPELINE_LOCAL_TIME_ZONE.key(), "Asia/Shanghai") + .build())); + assertThat(pipelineDef.getConfig().get(PIPELINE_LOCAL_TIME_ZONE)) + .isEqualTo("Asia/Shanghai"); + + pipelineDef = + parser.parse( + Paths.get(resource.toURI()), + Configuration.fromMap( + ImmutableMap.builder() + .put(PIPELINE_LOCAL_TIME_ZONE.key(), "GMT+08:00") + .build())); + assertThat(pipelineDef.getConfig().get(PIPELINE_LOCAL_TIME_ZONE)).isEqualTo("GMT+08:00"); + + pipelineDef = + parser.parse( + Paths.get(resource.toURI()), + Configuration.fromMap( + ImmutableMap.builder() + .put(PIPELINE_LOCAL_TIME_ZONE.key(), "UTC") + .build())); + assertThat(pipelineDef.getConfig().get(PIPELINE_LOCAL_TIME_ZONE)).isEqualTo("UTC"); + } + + @Test + void testInvalidTimeZone() throws Exception { + URL resource = Resources.getResource("definitions/pipeline-definition-minimized.yaml"); + YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser(); + assertThatThrownBy( + () -> + parser.parse( + Paths.get(resource.toURI()), + Configuration.fromMap( + ImmutableMap.builder() + .put( + PIPELINE_LOCAL_TIME_ZONE.key(), + "invalid time zone") + .build()))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "Invalid time zone. The valid value should be a Time Zone Database ID" + + " such as 'America/Los_Angeles' to include daylight saving time. " + + "Fixed offsets are supported using 'GMT-08:00' or 'GMT+08:00'. " + + "Or use 'UTC' without time zone and daylight saving time."); + } + private final PipelineDef fullDef = new PipelineDef( new SourceDef( diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/factories/Factory.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/factories/Factory.java index 9152ca85813..d1dfc4da6f3 100644 --- a/flink-cdc-common/src/main/java/com/ververica/cdc/common/factories/Factory.java +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/factories/Factory.java @@ -20,8 +20,6 @@ import com.ververica.cdc.common.configuration.ConfigOption; import com.ververica.cdc.common.configuration.Configuration; -import java.util.Collections; -import java.util.Map; import java.util.Set; /** @@ -63,23 +61,21 @@ public interface Factory { @PublicEvolving interface Context { - /** Gives the configuration of the current session. */ - Configuration getConfiguration(); - /** - * Returns the class loader of the current session. + * Returns the factory options used to create the object instances. * - *

The class loader is in particular useful for discovering factories. + * @return options of the current session. */ - ClassLoader getClassLoader(); + Configuration getFactoryConfiguration(); + + /** Returns the configuration of current pipeline. */ + Configuration getPipelineConfiguration(); /** - * Returns the options of the current session. + * Returns the class loader of the current session. * - * @return options of the current session. + *

The class loader is in particular useful for discovering factories. */ - default Map getEnrichmentOptions() { - return Collections.emptyMap(); - } + ClassLoader getClassLoader(); } } diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/factories/FactoryHelper.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/factories/FactoryHelper.java index 1b65d604891..5b58163c7a2 100644 --- a/flink-cdc-common/src/main/java/com/ververica/cdc/common/factories/FactoryHelper.java +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/factories/FactoryHelper.java @@ -20,8 +20,6 @@ import com.ververica.cdc.common.configuration.Configuration; -import java.util.Map; - /** A helper for working with {@link Factory}. */ @PublicEvolving public class FactoryHelper { @@ -29,32 +27,32 @@ public class FactoryHelper { /** Default implementation of {@link Factory.Context}. */ public static class DefaultContext implements Factory.Context { - private final Map enrichmentOptions; + private final Configuration factoryConfiguration; private final ClassLoader classLoader; - private final Configuration configuration; + private final Configuration pipelineConfiguration; public DefaultContext( - Map enrichmentOptions, - Configuration configuration, + Configuration factoryConfiguration, + Configuration pipelineConfiguration, ClassLoader classLoader) { - this.enrichmentOptions = enrichmentOptions; - this.configuration = configuration; + this.factoryConfiguration = factoryConfiguration; + this.pipelineConfiguration = pipelineConfiguration; this.classLoader = classLoader; } @Override - public Configuration getConfiguration() { - return configuration; + public Configuration getFactoryConfiguration() { + return factoryConfiguration; } @Override - public ClassLoader getClassLoader() { - return classLoader; + public Configuration getPipelineConfiguration() { + return pipelineConfiguration; } @Override - public Map getEnrichmentOptions() { - return enrichmentOptions; + public ClassLoader getClassLoader() { + return classLoader; } } } diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/pipeline/PipelineOptions.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/pipeline/PipelineOptions.java index 838e0c15990..47be8b65b55 100644 --- a/flink-cdc-common/src/main/java/com/ververica/cdc/common/pipeline/PipelineOptions.java +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/pipeline/PipelineOptions.java @@ -57,6 +57,26 @@ public class PipelineOptions { "EXCEPTION: Throw an exception to terminate the sync pipeline."))) .build()); + public static final ConfigOption PIPELINE_LOCAL_TIME_ZONE = + ConfigOptions.key("pipeline.local-time-zone") + .stringType() + // "systemDefault" is a special value to decide whether to use + // ZoneId.systemDefault() in + // PipelineOptions.getLocalTimeZone() + .defaultValue("systemDefault") + .withDescription( + Description.builder() + .text( + "The local time zone defines current session time zone id. ") + .linebreak() + .text( + "It is used when converting to/from TIMESTAMP WITH LOCAL TIME ZONE. " + + "Internally, timestamps with local time zone are always represented in the UTC time zone. " + + "However, when converting to data types that don't include a time zone (e.g. TIMESTAMP, STRING), " + + "the session time zone is used during conversion. The input of option is either a full name " + + "such as \"America/Los_Angeles\", or a custom timezone id such as \"GMT-08:00\".") + .build()); + public static final ConfigOption SCHEMA_OPERATOR_UID = ConfigOptions.key("pipeline.schema.operator.uid") .stringType() diff --git a/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/definition/PipelineDef.java b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/definition/PipelineDef.java index e5cbc57505a..6c65ae05d1e 100644 --- a/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/definition/PipelineDef.java +++ b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/definition/PipelineDef.java @@ -16,10 +16,16 @@ package com.ververica.cdc.composer.definition; +import com.ververica.cdc.common.annotation.VisibleForTesting; import com.ververica.cdc.common.configuration.Configuration; +import com.ververica.cdc.common.types.LocalZonedTimestampType; +import java.time.ZoneId; import java.util.List; import java.util.Objects; +import java.util.TimeZone; + +import static com.ververica.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE; /** * Definition of a pipeline. @@ -58,7 +64,7 @@ public PipelineDef( this.sink = sink; this.routes = routes; this.transforms = transforms; - this.config = config; + this.config = evaluatePipelineTimeZone(config); } public SourceDef getSource() { @@ -117,4 +123,50 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(source, sink, routes, transforms, config); } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + /** + * Returns the current session time zone id. It is used when converting to/from {@code TIMESTAMP + * WITH LOCAL TIME ZONE}. + * + * @see LocalZonedTimestampType + */ + @VisibleForTesting + private static Configuration evaluatePipelineTimeZone(Configuration configuration) { + final String zone = configuration.get(PIPELINE_LOCAL_TIME_ZONE); + ZoneId zoneId; + if (PIPELINE_LOCAL_TIME_ZONE.defaultValue().equals(zone)) { + zoneId = ZoneId.systemDefault(); + } else { + validateTimeZone(zone); + zoneId = ZoneId.of(zone); + } + configuration.set(PIPELINE_LOCAL_TIME_ZONE, zoneId.toString()); + return configuration; + } + + /** + * Validates a time zone is valid or not. + * + * @param zone given time zone + */ + private static void validateTimeZone(String zone) { + boolean isValid; + try { + isValid = TimeZone.getTimeZone(zone).toZoneId().equals(ZoneId.of(zone)); + } catch (Exception ignore) { + isValid = false; + } + + if (!isValid) { + throw new IllegalArgumentException( + "Invalid time zone. The valid value should be a Time Zone Database ID " + + "such as 'America/Los_Angeles' to include daylight saving time. " + + "Fixed offsets are supported using 'GMT-08:00' or 'GMT+08:00'. " + + "Or use 'UTC' without time zone and daylight saving time."); + } + } } diff --git a/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/FlinkPipelineComposer.java index cc07ef7931f..1a3df4f42ff 100644 --- a/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/FlinkPipelineComposer.java +++ b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/FlinkPipelineComposer.java @@ -20,6 +20,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import com.ververica.cdc.common.annotation.Internal; +import com.ververica.cdc.common.configuration.Configuration; import com.ververica.cdc.common.event.Event; import com.ververica.cdc.common.factories.DataSinkFactory; import com.ververica.cdc.common.factories.FactoryHelper; @@ -80,14 +81,14 @@ public PipelineExecution compose(PipelineDef pipelineDef) { // Source DataSourceTranslator sourceTranslator = new DataSourceTranslator(); DataStream stream = - sourceTranslator.translate(pipelineDef.getSource(), env, parallelism); + sourceTranslator.translate(pipelineDef.getSource(), env, pipelineDef.getConfig()); // Route RouteTranslator routeTranslator = new RouteTranslator(); stream = routeTranslator.translate(stream, pipelineDef.getRoute()); // Create sink in advance as schema operator requires MetadataApplier - DataSink dataSink = createDataSink(pipelineDef.getSink()); + DataSink dataSink = createDataSink(pipelineDef.getSink(), pipelineDef.getConfig()); // Schema operator SchemaOperatorTranslator schemaOperatorTranslator = @@ -118,7 +119,7 @@ public PipelineExecution compose(PipelineDef pipelineDef) { env, pipelineDef.getConfig().get(PipelineOptions.PIPELINE_NAME), isBlocking); } - private DataSink createDataSink(SinkDef sinkDef) { + private DataSink createDataSink(SinkDef sinkDef, Configuration pipelineConfig) { // Search the data sink factory DataSinkFactory sinkFactory = FactoryDiscoveryUtils.getFactoryByIdentifier( @@ -131,8 +132,8 @@ private DataSink createDataSink(SinkDef sinkDef) { // Create data sink return sinkFactory.createDataSink( new FactoryHelper.DefaultContext( - sinkDef.getConfig().toMap(), sinkDef.getConfig(), + pipelineConfig, Thread.currentThread().getContextClassLoader())); } diff --git a/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/translator/DataSourceTranslator.java b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/translator/DataSourceTranslator.java index dbde8755838..4fb8aa54582 100644 --- a/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/translator/DataSourceTranslator.java +++ b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/translator/DataSourceTranslator.java @@ -21,9 +21,11 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import com.ververica.cdc.common.annotation.Internal; +import com.ververica.cdc.common.configuration.Configuration; import com.ververica.cdc.common.event.Event; import com.ververica.cdc.common.factories.DataSourceFactory; import com.ververica.cdc.common.factories.FactoryHelper; +import com.ververica.cdc.common.pipeline.PipelineOptions; import com.ververica.cdc.common.source.DataSource; import com.ververica.cdc.common.source.EventSourceProvider; import com.ververica.cdc.common.source.FlinkSourceFunctionProvider; @@ -39,8 +41,9 @@ */ @Internal public class DataSourceTranslator { + public DataStreamSource translate( - SourceDef sourceDef, StreamExecutionEnvironment env, int sourceParallelism) { + SourceDef sourceDef, StreamExecutionEnvironment env, Configuration pipelineConfig) { // Search the data source factory DataSourceFactory sourceFactory = FactoryDiscoveryUtils.getFactoryByIdentifier( @@ -50,8 +53,8 @@ public DataStreamSource translate( DataSource dataSource = sourceFactory.createDataSource( new FactoryHelper.DefaultContext( - sourceDef.getConfig().toMap(), sourceDef.getConfig(), + pipelineConfig, Thread.currentThread().getContextClassLoader())); // Add source JAR to environment @@ -59,6 +62,7 @@ public DataStreamSource translate( .ifPresent(jar -> FlinkEnvironmentUtils.addJar(env, jar)); // Get source provider + final int sourceParallelism = pipelineConfig.get(PipelineOptions.GLOBAL_PARALLELISM); EventSourceProvider eventSourceProvider = dataSource.getEventSourceProvider(); if (eventSourceProvider instanceof FlinkSourceProvider) { // Source diff --git a/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/flink/FlinkPipelineComposerTest.java b/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/flink/FlinkPipelineComposerTest.java index 70ab04afe79..b7efe12ae95 100644 --- a/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/flink/FlinkPipelineComposerTest.java +++ b/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/flink/FlinkPipelineComposerTest.java @@ -49,8 +49,8 @@ public void testCreateDataSinkFromSinkDef() { DataSink dataSink = sinkFactory.createDataSink( new FactoryHelper.DefaultContext( - sinkDef.getConfig().toMap(), sinkDef.getConfig(), + new Configuration(), Thread.currentThread().getContextClassLoader())); Assert.assertTrue(dataSink instanceof DataSinkFactory1.TestDataSink); diff --git a/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/flink/translator/DataSourceTranslatorTest.java b/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/flink/translator/DataSourceTranslatorTest.java index 68fc7b9d857..8213ca3a8b6 100644 --- a/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/flink/translator/DataSourceTranslatorTest.java +++ b/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/flink/translator/DataSourceTranslatorTest.java @@ -49,8 +49,8 @@ public void testCreateDataSourceFromSourceDef() { DataSource dataSource = sourceFactory.createDataSource( new FactoryHelper.DefaultContext( - sourceDef.getConfig().toMap(), sourceDef.getConfig(), + new Configuration(), Thread.currentThread().getContextClassLoader())); Assert.assertTrue(dataSource instanceof DataSourceFactory1.TestDataSource); diff --git a/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/utils/factory/DataSinkFactory1.java b/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/utils/factory/DataSinkFactory1.java index 875711517bf..37297bd8aa9 100644 --- a/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/utils/factory/DataSinkFactory1.java +++ b/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/utils/factory/DataSinkFactory1.java @@ -29,7 +29,7 @@ public class DataSinkFactory1 implements DataSinkFactory { @Override public DataSink createDataSink(Context context) { - return new TestDataSink(context.getConfiguration().get(TestOptions.HOST)); + return new TestDataSink(context.getFactoryConfiguration().get(TestOptions.HOST)); } @Override diff --git a/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/utils/factory/DataSourceFactory1.java b/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/utils/factory/DataSourceFactory1.java index b33ecbd4641..8e8c2c13a43 100644 --- a/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/utils/factory/DataSourceFactory1.java +++ b/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/utils/factory/DataSourceFactory1.java @@ -29,7 +29,7 @@ public class DataSourceFactory1 implements DataSourceFactory { @Override public DataSource createDataSource(Context context) { - return new TestDataSource(context.getConfiguration().get(TestOptions.HOST)); + return new TestDataSource(context.getFactoryConfiguration().get(TestOptions.HOST)); } @Override diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/factory/ValuesDataFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/factory/ValuesDataFactory.java index 475a6150742..489972c63b6 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/factory/ValuesDataFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/factory/ValuesDataFactory.java @@ -41,17 +41,18 @@ public class ValuesDataFactory implements DataSourceFactory, DataSinkFactory { @Override public DataSource createDataSource(Context context) { ValuesDataSourceHelper.EventSetId eventType = - context.getConfiguration().get(ValuesDataSourceOptions.EVENT_SET_ID); + context.getFactoryConfiguration().get(ValuesDataSourceOptions.EVENT_SET_ID); int failAtPos = - context.getConfiguration().get(ValuesDataSourceOptions.FAILURE_INJECTION_INDEX); + context.getFactoryConfiguration() + .get(ValuesDataSourceOptions.FAILURE_INJECTION_INDEX); return new ValuesDataSource(eventType, failAtPos); } @Override public DataSink createDataSink(Context context) { return new ValuesDataSink( - context.getConfiguration().get(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY), - context.getConfiguration().get(ValuesDataSinkOptions.PRINT_ENABLED)); + context.getFactoryConfiguration().get(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY), + context.getFactoryConfiguration().get(ValuesDataSinkOptions.PRINT_ENABLED)); } @Override