[cdc-composer] Pass pipeline configuration when constructing DataSource/DataSink
leonardBang committed Dec 3, 2023
1 parent cfb3fd2 commit be2deae
Showing 5 changed files with 158 additions and 9 deletions.
Changed file 1 of 5: YamlPipelineDefinitionParserTest.java
@@ -31,7 +31,9 @@
import java.util.Arrays;
import java.util.Collections;

import static com.ververica.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;

/** Unit test for {@link YamlPipelineDefinitionParser}. */
class YamlPipelineDefinitionParserTest {
@@ -64,11 +66,81 @@ void testMinimizedDefinition() throws Exception {
void testOverridingGlobalConfig() throws Exception {
URL resource = Resources.getResource("definitions/pipeline-definition-full.yaml");
YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
ImmutableMap.<String, String>builder().put("parallelism", "1").put("foo", "bar");
PipelineDef pipelineDef = parser.parse(Paths.get(resource.toURI()), new Configuration());
PipelineDef pipelineDef =
parser.parse(
Paths.get(resource.toURI()),
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("parallelism", "1")
.put("foo", "bar")
.build()));
assertThat(pipelineDef).isEqualTo(fullDefWithGlobalConf);
}

@Test
void testEvaluateDefaultLocalTimeZone() throws Exception {
URL resource = Resources.getResource("definitions/pipeline-definition-minimized.yaml");
YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
PipelineDef pipelineDef = parser.parse(Paths.get(resource.toURI()), new Configuration());
assertThat(pipelineDef.getConfig().get(PIPELINE_LOCAL_TIME_ZONE))
.isNotEqualTo(PIPELINE_LOCAL_TIME_ZONE.defaultValue());
}

@Test
void testValidTimeZone() throws Exception {
URL resource = Resources.getResource("definitions/pipeline-definition-minimized.yaml");
YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
PipelineDef pipelineDef =
parser.parse(
Paths.get(resource.toURI()),
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put(PIPELINE_LOCAL_TIME_ZONE.key(), "Asia/Shanghai")
.build()));
assertThat(pipelineDef.getConfig().get(PIPELINE_LOCAL_TIME_ZONE))
.isEqualTo("Asia/Shanghai");

pipelineDef =
parser.parse(
Paths.get(resource.toURI()),
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put(PIPELINE_LOCAL_TIME_ZONE.key(), "GMT+08:00")
.build()));
assertThat(pipelineDef.getConfig().get(PIPELINE_LOCAL_TIME_ZONE)).isEqualTo("GMT+08:00");

pipelineDef =
parser.parse(
Paths.get(resource.toURI()),
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put(PIPELINE_LOCAL_TIME_ZONE.key(), "UTC")
.build()));
assertThat(pipelineDef.getConfig().get(PIPELINE_LOCAL_TIME_ZONE)).isEqualTo("UTC");
}

@Test
void testInvalidTimeZone() throws Exception {
URL resource = Resources.getResource("definitions/pipeline-definition-minimized.yaml");
YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
assertThatThrownBy(
() ->
parser.parse(
Paths.get(resource.toURI()),
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put(
PIPELINE_LOCAL_TIME_ZONE.key(),
"invalid time zone")
.build())))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining(
"Invalid time zone. The valid value should be a Time Zone Database ID"
+ " such as 'America/Los_Angeles' to include daylight saving time. "
+ "Fixed offsets are supported using 'GMT-08:00' or 'GMT+08:00'. "
+ "Or use 'UTC' without time zone and daylight saving time.");
}

private final PipelineDef fullDef =
new PipelineDef(
new SourceDef(
(remaining unchanged lines not shown)
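For context on the new time-zone tests above, a minimal JDK-only sketch (not part of the commit): when the option is left unset, the parsed configuration ends up holding the JVM's default zone id rather than the literal default value "systemDefault", which is what testEvaluateDefaultLocalTimeZone asserts.

import java.time.ZoneId;

public class DefaultTimeZoneSketch {
    public static void main(String[] args) {
        // The declared default of pipeline.local-time-zone is the marker value
        // "systemDefault"; after evaluation it is replaced by the JVM zone id.
        String declaredDefault = "systemDefault";
        String evaluated = ZoneId.systemDefault().toString(); // e.g. "Asia/Shanghai"
        System.out.println(evaluated.equals(declaredDefault)); // false, as the test expects
    }
}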
Changed file 2 of 5: PipelineOptions.java
@@ -57,6 +57,26 @@ public class PipelineOptions {
"EXCEPTION: Throw an exception to terminate the sync pipeline.")))
.build());

public static final ConfigOption<String> PIPELINE_LOCAL_TIME_ZONE =
ConfigOptions.key("pipeline.local-time-zone")
.stringType()
// "systemDefault" is a special value to decide whether to use
// ZoneId.systemDefault() in
// PipelineOptions.getLocalTimeZone()
.defaultValue("systemDefault")
.withDescription(
Description.builder()
.text(
"The local time zone defines current session time zone id. ")
.linebreak()
.text(
"It is used when converting to/from <code>TIMESTAMP WITH LOCAL TIME ZONE</code>. "
+ "Internally, timestamps with local time zone are always represented in the UTC time zone. "
+ "However, when converting to data types that don't include a time zone (e.g. TIMESTAMP, STRING), "
+ "the session time zone is used during conversion. The input of option is either a full name "
+ "such as \"America/Los_Angeles\", or a custom timezone id such as \"GMT-08:00\".")
.build());

public static final ConfigOption<String> SCHEMA_OPERATOR_UID =
ConfigOptions.key("pipeline.schema.operator.uid")
.stringType()
(remaining unchanged lines not shown)
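To make the option description above concrete, here is a minimal usage sketch (not part of the commit). It only uses the Configuration set/get calls that appear elsewhere in this diff and resolves the stored value with java.time.ZoneId; both a fixed offset and a Time Zone Database ID are accepted values.

import java.time.ZoneId;

import com.ververica.cdc.common.configuration.Configuration;
import com.ververica.cdc.common.pipeline.PipelineOptions;

public class LocalTimeZoneOptionSketch {
    public static void main(String[] args) {
        Configuration config = new Configuration();

        // A fixed offset such as "GMT+08:00" or a region id such as
        // "America/Los_Angeles" are both valid, per the description above.
        config.set(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE, "GMT+08:00");

        ZoneId zoneId = ZoneId.of(config.get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE));
        System.out.println(zoneId); // GMT+08:00
    }
}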
Changed file 3 of 5: PipelineDef.java
@@ -16,10 +16,16 @@

package com.ververica.cdc.composer.definition;

import com.ververica.cdc.common.annotation.VisibleForTesting;
import com.ververica.cdc.common.configuration.Configuration;
import com.ververica.cdc.common.types.LocalZonedTimestampType;

import java.time.ZoneId;
import java.util.List;
import java.util.Objects;
import java.util.TimeZone;

import static com.ververica.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE;

/**
* Definition of a pipeline.
@@ -58,7 +64,7 @@ public PipelineDef(
this.sink = sink;
this.routes = routes;
this.transforms = transforms;
this.config = config;
this.config = evaluatePipelineTimeZone(config);
}

public SourceDef getSource() {
@@ -117,4 +123,50 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(source, sink, routes, transforms, config);
}

// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------

/**
* Evaluates the session time zone from the given configuration and writes the resolved zone id
* back. The zone is used when converting to/from {@code TIMESTAMP WITH LOCAL TIME ZONE}.
*
* @see LocalZonedTimestampType
*/
@VisibleForTesting
private static Configuration evaluatePipelineTimeZone(Configuration configuration) {
final String zone = configuration.get(PIPELINE_LOCAL_TIME_ZONE);
ZoneId zoneId;
if (PIPELINE_LOCAL_TIME_ZONE.defaultValue().equals(zone)) {
zoneId = ZoneId.systemDefault();
} else {
validateTimeZone(zone);
zoneId = ZoneId.of(zone);
}
configuration.set(PIPELINE_LOCAL_TIME_ZONE, zoneId.toString());
return configuration;
}

/**
* Validates that the given time zone id is valid.
*
* @param zone the time zone id to validate
*/
private static void validateTimeZone(String zone) {
boolean isValid;
try {
isValid = TimeZone.getTimeZone(zone).toZoneId().equals(ZoneId.of(zone));
} catch (Exception ignore) {
isValid = false;
}

if (!isValid) {
throw new IllegalArgumentException(
"Invalid time zone. The valid value should be a Time Zone Database ID "
+ "such as 'America/Los_Angeles' to include daylight saving time. "
+ "Fixed offsets are supported using 'GMT-08:00' or 'GMT+08:00'. "
+ "Or use 'UTC' without time zone and daylight saving time.");
}
}
}
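A note on validateTimeZone above: java.util.TimeZone.getTimeZone silently falls back to GMT for ids it does not recognize, while java.time.ZoneId.of throws for malformed or unknown ids, so requiring the two to resolve to the same ZoneId rejects bad input either way. A small JDK-only sketch (not part of the commit) of that behavior:

import java.time.DateTimeException;
import java.time.ZoneId;
import java.util.TimeZone;

public class TimeZoneValidationSketch {
    public static void main(String[] args) {
        // A known id round-trips, so the equality check in validateTimeZone passes.
        System.out.println(
                TimeZone.getTimeZone("Asia/Shanghai").toZoneId().equals(ZoneId.of("Asia/Shanghai"))); // true

        // An unknown but well-formed id falls back to GMT on the TimeZone side.
        System.out.println(TimeZone.getTimeZone("America/Nowhere").toZoneId()); // GMT

        // A malformed id makes ZoneId.of throw, which validateTimeZone catches
        // and treats as invalid.
        try {
            ZoneId.of("invalid time zone");
        } catch (DateTimeException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}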
Changed file 4 of 5 (the pipeline composer)
@@ -20,6 +20,7 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.configuration.Configuration;
import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.common.factories.DataSinkFactory;
import com.ververica.cdc.common.factories.FactoryHelper;
@@ -80,14 +81,14 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
// Source
DataSourceTranslator sourceTranslator = new DataSourceTranslator();
DataStream<Event> stream =
sourceTranslator.translate(pipelineDef.getSource(), env, parallelism);
sourceTranslator.translate(pipelineDef.getSource(), env, pipelineDef.getConfig());

// Route
RouteTranslator routeTranslator = new RouteTranslator();
stream = routeTranslator.translate(stream, pipelineDef.getRoute());

// Create sink in advance as schema operator requires MetadataApplier
DataSink dataSink = createDataSink(pipelineDef.getSink());
DataSink dataSink = createDataSink(pipelineDef.getSink(), pipelineDef.getConfig());

// Schema operator
SchemaOperatorTranslator schemaOperatorTranslator =
@@ -118,7 +119,7 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
env, pipelineDef.getConfig().get(PipelineOptions.PIPELINE_NAME), isBlocking);
}

private DataSink createDataSink(SinkDef sinkDef) {
private DataSink createDataSink(SinkDef sinkDef, Configuration pipelineConfig) {
// Search the data sink factory
DataSinkFactory sinkFactory =
FactoryDiscoveryUtils.getFactoryByIdentifier(
@@ -132,7 +133,7 @@ private DataSink createDataSink(SinkDef sinkDef) {
return sinkFactory.createDataSink(
new FactoryHelper.DefaultContext(
sinkDef.getConfig().toMap(),
sinkDef.getConfig(),
pipelineConfig,
Thread.currentThread().getContextClassLoader()));
}

(remaining unchanged lines not shown)
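To show how a connector factory now receives pipeline-level options, here is a hypothetical sketch (not part of the commit). It assumes, as the call sites above suggest, that FactoryHelper.DefaultContext takes the connector's own options as a Map<String, String> (the composer passes sinkDef.getConfig().toMap()); the "hostname" entry is purely illustrative.

import java.util.Collections;

import com.ververica.cdc.common.configuration.Configuration;
import com.ververica.cdc.common.factories.FactoryHelper;
import com.ververica.cdc.common.pipeline.PipelineOptions;

public class FactoryContextSketch {
    public static void main(String[] args) {
        // Pipeline-level options (e.g. the session time zone) now travel to the
        // factory alongside the connector's own options.
        Configuration pipelineConfig = new Configuration();
        pipelineConfig.set(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE, "UTC");

        FactoryHelper.DefaultContext context =
                new FactoryHelper.DefaultContext(
                        Collections.singletonMap("hostname", "localhost"), // illustrative connector options
                        pipelineConfig,
                        Thread.currentThread().getContextClassLoader());

        // In the composer, a context like this is what gets handed to
        // DataSinkFactory#createDataSink / DataSourceFactory#createDataSource.
        System.out.println(context);
    }
}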
Changed file 5 of 5: DataSourceTranslator.java
@@ -21,9 +21,11 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.configuration.Configuration;
import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.common.factories.DataSourceFactory;
import com.ververica.cdc.common.factories.FactoryHelper;
import com.ververica.cdc.common.pipeline.PipelineOptions;
import com.ververica.cdc.common.source.DataSource;
import com.ververica.cdc.common.source.EventSourceProvider;
import com.ververica.cdc.common.source.FlinkSourceFunctionProvider;
@@ -39,8 +41,9 @@
*/
@Internal
public class DataSourceTranslator {

public DataStreamSource<Event> translate(
SourceDef sourceDef, StreamExecutionEnvironment env, int sourceParallelism) {
SourceDef sourceDef, StreamExecutionEnvironment env, Configuration pipelineConfig) {
// Search the data source factory
DataSourceFactory sourceFactory =
FactoryDiscoveryUtils.getFactoryByIdentifier(
@@ -51,14 +54,15 @@ public DataStreamSource<Event> translate(
sourceFactory.createDataSource(
new FactoryHelper.DefaultContext(
sourceDef.getConfig().toMap(),
sourceDef.getConfig(),
pipelineConfig,
Thread.currentThread().getContextClassLoader()));

// Add source JAR to environment
FactoryDiscoveryUtils.getJarPathByIdentifier(sourceDef.getType(), DataSourceFactory.class)
.ifPresent(jar -> FlinkEnvironmentUtils.addJar(env, jar));

// Get source provider
final int sourceParallelism = pipelineConfig.get(PipelineOptions.GLOBAL_PARALLELISM);
EventSourceProvider eventSourceProvider = dataSource.getEventSourceProvider();
if (eventSourceProvider instanceof FlinkSourceProvider) {
// Source
(remaining unchanged lines not shown)
