diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DataSink.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DataSink.java
index f565f7b3d06..46f91ddff4d 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DataSink.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DataSink.java
@@ -41,4 +41,10 @@ public interface DataSink {
     default HashFunctionProvider<DataChangeEvent> getDataChangeEventHashFunctionProvider() {
         return new DefaultDataChangeEventHashFunctionProvider();
     }
+
+    default HashFunctionProvider<DataChangeEvent> getDataChangeEventHashFunctionProvider(
+            int parallelism) {
+        return getDataChangeEventHashFunctionProvider(); // fallback to nullary version if it isn't
+        // overridden
+    }
 }
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
index ca4378ad134..bddd5fc00fe 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
@@ -146,7 +146,7 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
                         parallelism,
                         parallelism,
                         schemaOperatorIDGenerator.generate(),
-                        dataSink.getDataChangeEventHashFunctionProvider());
+                        dataSink.getDataChangeEventHashFunctionProvider(parallelism));
 
         // Build Sink Operator
         sinkTranslator.translate(
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSink.java
index 6c94a3a74e5..5a95f1efd52 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSink.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSink.java
@@ -17,18 +17,21 @@
 
 package org.apache.flink.cdc.connectors.paimon.sink;
 
+import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.function.HashFunctionProvider;
 import org.apache.flink.cdc.common.sink.DataSink;
 import org.apache.flink.cdc.common.sink.EventSinkProvider;
 import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonEventSink;
 import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonRecordSerializer;
-import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonSink;
 
 import org.apache.paimon.options.Options;
 
 import java.io.Serializable;
+import java.time.ZoneId;
 import java.util.List;
 import java.util.Map;
 
@@ -47,26 +50,41 @@ public class PaimonDataSink implements DataSink, Serializable {
 
     private final PaimonRecordSerializer<Event> serializer;
 
+    private final ZoneId zoneId;
+
+    public final String schemaOperatorUid;
+
     public PaimonDataSink(
             Options options,
             Map<String, String> tableOptions,
             String commitUser,
             Map<TableId, List<String>> partitionMaps,
-            PaimonRecordSerializer<Event> serializer) {
+            PaimonRecordSerializer<Event> serializer,
+            ZoneId zoneId,
+            String schemaOperatorUid) {
this.options = options; this.tableOptions = tableOptions; this.commitUser = commitUser; this.partitionMaps = partitionMaps; this.serializer = serializer; + this.zoneId = zoneId; + this.schemaOperatorUid = schemaOperatorUid; } @Override public EventSinkProvider getEventSinkProvider() { - return FlinkSinkProvider.of(new PaimonSink<>(options, commitUser, serializer)); + return FlinkSinkProvider.of( + new PaimonEventSink(options, commitUser, serializer, schemaOperatorUid, zoneId)); } @Override public MetadataApplier getMetadataApplier() { return new PaimonMetadataApplier(options, tableOptions, partitionMaps); } + + @Override + public HashFunctionProvider getDataChangeEventHashFunctionProvider( + int parallelism) { + return new PaimonHashFunctionProvider(options, zoneId, parallelism); + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java index cf207325d7e..302ba629ac9 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java @@ -104,7 +104,17 @@ public DataSink createDataSink(Context context) { } } PaimonRecordSerializer serializer = new PaimonRecordEventSerializer(zoneId); - return new PaimonDataSink(options, tableOptions, commitUser, partitionMaps, serializer); + String schemaOperatorUid = + context.getPipelineConfiguration() + .get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID); + return new PaimonDataSink( + options, + tableOptions, + commitUser, + partitionMaps, + serializer, + zoneId, + schemaOperatorUid); } @Override diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunction.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunction.java new file mode 100644 index 00000000000..b01ad39f2ba --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunction.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink;
+
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.function.HashFunction;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonWriterHelper;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.flink.sink.RowAssignerChannelComputer;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+
+import java.io.Serializable;
+import java.time.ZoneId;
+import java.util.List;
+
+/**
+ * A {@link HashFunction} implementation for {@link PaimonDataSink}. Shuffles {@link
+ * DataChangeEvent}s by the hash of the primary key.
+ */
+public class PaimonHashFunction implements HashFunction<DataChangeEvent>, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final List<RecordData.FieldGetter> fieldGetters;
+
+    private final RowAssignerChannelComputer channelComputer;
+
+    public PaimonHashFunction(
+            Options options, TableId tableId, Schema schema, ZoneId zoneId, int parallelism) {
+        Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(options);
+        FileStoreTable table;
+        try {
+            table = (FileStoreTable) catalog.getTable(Identifier.fromString(tableId.toString()));
+        } catch (Catalog.TableNotExistException e) {
+            throw new RuntimeException(e);
+        }
+        this.fieldGetters = PaimonWriterHelper.createFieldGetters(schema, zoneId);
+        channelComputer = new RowAssignerChannelComputer(table.schema(), parallelism);
+        channelComputer.setup(parallelism);
+    }
+
+    @Override
+    public int hashcode(DataChangeEvent event) {
+        GenericRow genericRow = PaimonWriterHelper.convertEventToGenericRow(event, fieldGetters);
+        return channelComputer.channel(genericRow);
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionProvider.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionProvider.java
new file mode 100644
index 00000000000..5f641f409e0
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionProvider.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.flink.cdc.connectors.paimon.sink; + +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.function.HashFunction; +import org.apache.flink.cdc.common.function.HashFunctionProvider; +import org.apache.flink.cdc.common.schema.Schema; + +import org.apache.paimon.options.Options; + +import javax.annotation.Nullable; + +import java.time.ZoneId; + +/** A {@link HashFunctionProvider} implementation for {@link PaimonDataSink}. */ +public class PaimonHashFunctionProvider implements HashFunctionProvider { + + private final Options options; + + private final ZoneId zoneId; + + private final int parallelism; + + public PaimonHashFunctionProvider(Options options, ZoneId zoneId, int parallelism) { + this.options = options; + this.zoneId = zoneId; + this.parallelism = parallelism; + } + + @Override + public HashFunction getHashFunction(@Nullable TableId tableId, Schema schema) { + return new PaimonHashFunction(options, tableId, schema, zoneId, parallelism); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/OperatorIDGenerator.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/OperatorIDGenerator.java new file mode 100644 index 00000000000..125fb17f1cb --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/OperatorIDGenerator.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.paimon.sink.v2; + +import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.runtime.jobgraph.OperatorID; + +import org.apache.flink.shaded.guava31.com.google.common.hash.Hashing; + +import static java.nio.charset.StandardCharsets.UTF_8; + +/** Generating {@link OperatorID} for communication between Flink operators. */ +@Internal +public class OperatorIDGenerator { + private final String transformationUid; + + public OperatorIDGenerator(String transformationUid) { + this.transformationUid = transformationUid; + } + + /** + * Generate {@link OperatorID}. + * + *
<p>
Operator ID generation is an internal implementation inside Flink, happening during the + * stream graph generating phase, so our algorithm of generating operator ID should be exactly + * the same as in Flink, in order to make sure that operators can reach out each other on the + * cluster. + * + * @see + * org.apache.flink.streaming.api.graph.StreamGraphHasherV2#traverseStreamGraphAndGenerateHashes + * the algorithm of generating operator ID in Flink + */ + public OperatorID generate() { + byte[] hash = + Hashing.murmur3_128(0) + .newHasher() + .putString(transformationUid, UTF_8) + .hash() + .asBytes(); + return new OperatorID(hash); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEvent.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEvent.java index eb136aae754..d23ca7e767a 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEvent.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEvent.java @@ -31,6 +31,7 @@ public class PaimonEvent { // if true, means that table schema has changed right before this genericRow. boolean shouldRefreshSchema; + int bucket; public PaimonEvent(Identifier tableId, GenericRow genericRow) { this.tableId = tableId; @@ -44,6 +45,14 @@ public PaimonEvent(Identifier tableId, GenericRow genericRow, boolean shouldRefr this.shouldRefreshSchema = shouldRefreshSchema; } + public PaimonEvent( + Identifier tableId, GenericRow genericRow, boolean shouldRefreshSchema, int bucket) { + this.tableId = tableId; + this.genericRow = genericRow; + this.shouldRefreshSchema = shouldRefreshSchema; + this.bucket = bucket; + } + public Identifier getTableId() { return tableId; } @@ -67,4 +76,12 @@ public GenericRow getGenericRow() { public void setGenericRow(GenericRow genericRow) { this.genericRow = genericRow; } + + public int getBucket() { + return bucket; + } + + public void setBucket(int bucket) { + this.bucket = bucket; + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEventSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEventSink.java new file mode 100644 index 00000000000..d6837915fa6 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEventSink.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.paimon.sink.v2; + +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketAssignOperator; +import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapper; +import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapperEventTypeInfo; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology; +import org.apache.flink.streaming.api.datastream.DataStream; + +import org.apache.paimon.flink.sink.MultiTableCommittable; +import org.apache.paimon.flink.sink.MultiTableCommittableSerializer; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.sink.CommitMessageSerializer; + +import java.time.ZoneId; + +/** A {@link PaimonSink} to process {@link Event}. */ +public class PaimonEventSink extends PaimonSink implements WithPreWriteTopology { + + public final String schemaOperatorUid; + + public final ZoneId zoneId; + + public PaimonEventSink( + Options catalogOptions, + String commitUser, + PaimonRecordSerializer serializer, + String schemaOperatorUid, + ZoneId zoneId) { + super(catalogOptions, commitUser, serializer); + this.schemaOperatorUid = schemaOperatorUid; + this.zoneId = zoneId; + } + + @Override + public DataStream addPreWriteTopology(DataStream dataStream) { + // Shuffle by key hash => Assign bucket => Shuffle by bucket. + return dataStream + .transform( + "BucketAssign", + new BucketWrapperEventTypeInfo(), + new BucketAssignOperator( + catalogOptions, schemaOperatorUid, zoneId, commitUser)) + .name("Assign Bucket") + // All Events after BucketAssignOperator are decorated with BucketWrapper. 
+ .partitionCustom( + (bucket, numPartitions) -> bucket % numPartitions, + (event) -> ((BucketWrapper) event).getBucket()); + } + + @Override + public SimpleVersionedSerializer getCommittableSerializer() { + CommitMessageSerializer fileSerializer = new CommitMessageSerializer(); + return new MultiTableCommittableSerializer(fileSerializer); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonRecordEventSerializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonRecordEventSerializer.java index 53b63f3b599..c3ceb31ac0b 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonRecordEventSerializer.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonRecordEventSerializer.java @@ -24,6 +24,7 @@ import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.utils.SchemaUtils; +import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapperChangeEvent; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.GenericRow; @@ -51,10 +52,12 @@ public PaimonRecordEventSerializer(ZoneId zoneId) { @Override public PaimonEvent serialize(Event event) { - Identifier tableId = - Identifier.create( - ((ChangeEvent) event).tableId().getSchemaName(), - ((ChangeEvent) event).tableId().getTableName()); + int bucket = 0; + if (event instanceof BucketWrapperChangeEvent) { + bucket = ((BucketWrapperChangeEvent) event).getBucket(); + event = ((BucketWrapperChangeEvent) event).getInnerEvent(); + } + Identifier tableId = Identifier.fromString(((ChangeEvent) event).tableId().toString()); if (event instanceof SchemaChangeEvent) { if (event instanceof CreateTableEvent) { CreateTableEvent createTableEvent = (CreateTableEvent) event; @@ -78,7 +81,7 @@ public PaimonEvent serialize(Event event) { PaimonWriterHelper.convertEventToGenericRow( dataChangeEvent, schemaMaps.get(dataChangeEvent.tableId()).getFieldGetters()); - return new PaimonEvent(tableId, genericRow); + return new PaimonEvent(tableId, genericRow, false, bucket); } else { throw new IllegalArgumentException( "failed to convert Input into PaimonEvent, unsupported event: " + event); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSink.java index 5e310b2c319..61824ec44a0 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSink.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSink.java @@ -40,9 +40,9 @@ public class PaimonSink implements WithPreCommitTopology serializer; diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java index 1b51535b215..87229f36c00 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java @@ -33,7 +33,6 @@ import org.apache.paimon.memory.HeapMemorySegmentPool; import org.apache.paimon.memory.MemoryPoolFactory; import org.apache.paimon.options.Options; -import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.utils.ExecutorThreadFactory; @@ -134,7 +133,7 @@ public void write(InputT event, Context context) throws IOException { return storeSinkWrite; }); try { - write.write(paimonEvent.getGenericRow()); + write.write(paimonEvent.getGenericRow(), paimonEvent.getBucket()); } catch (Exception e) { throw new IOException(e); } @@ -142,22 +141,15 @@ public void write(InputT event, Context context) throws IOException { } private FileStoreTable getTable(Identifier tableId) { - FileStoreTable table = - tables.computeIfAbsent( - tableId, - id -> { - try { - return (FileStoreTable) catalog.getTable(tableId); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - - if (table.bucketMode() != BucketMode.FIXED) { - throw new UnsupportedOperationException( - "Unified Sink only supports FIXED bucket mode, but is " + table.bucketMode()); - } - return table; + return tables.computeIfAbsent( + tableId, + id -> { + try { + return (FileStoreTable) catalog.getTable(tableId); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); } /** diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java index 3d49086b2d7..941189a0ad4 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java @@ -148,7 +148,7 @@ public SinkRecord write(InternalRow internalRow) throws Exception { @Override public SinkRecord write(InternalRow internalRow, int i) throws Exception { - return write.writeAndReturn(internalRow); + return write.writeAndReturn(internalRow, i); } @Override diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java new file mode 100644 index 00000000000..07509574c62 --- /dev/null +++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket; + +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.cdc.common.event.ChangeEvent; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.FlushEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.utils.Preconditions; +import org.apache.flink.cdc.common.utils.SchemaUtils; +import org.apache.flink.cdc.connectors.paimon.sink.v2.OperatorIDGenerator; +import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonWriterHelper; +import org.apache.flink.cdc.connectors.paimon.sink.v2.TableSchemaInfo; +import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient; +import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.flink.FlinkCatalogFactory; +import org.apache.paimon.index.BucketAssigner; +import org.apache.paimon.index.HashBucketAssigner; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.BucketMode; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.RowKeyExtractor; +import org.apache.paimon.table.sink.RowPartitionKeyExtractor; +import org.apache.paimon.utils.MathUtils; + +import java.time.ZoneId; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +/** Assign bucket for every given {@link DataChangeEvent}. */ +public class BucketAssignOperator extends AbstractStreamOperator + implements OneInputStreamOperator { + + public final String commitUser; + + private final Options catalogOptions; + + private Catalog catalog; + + Map> + bucketAssignerMap; + + // maintain the latest schema of tableId. 
+ private Map schemaMaps; + + private int totalTasksNumber; + + private int currentTaskNumber; + + public final String schemaOperatorUid; + + private transient SchemaEvolutionClient schemaEvolutionClient; + + private final ZoneId zoneId; + + public BucketAssignOperator( + Options catalogOptions, String schemaOperatorUid, ZoneId zoneId, String commitUser) { + this.catalogOptions = catalogOptions; + this.chainingStrategy = ChainingStrategy.ALWAYS; + this.schemaOperatorUid = schemaOperatorUid; + this.commitUser = commitUser; + this.zoneId = zoneId; + } + + @Override + public void open() throws Exception { + super.open(); + this.catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions); + this.bucketAssignerMap = new HashMap<>(); + this.totalTasksNumber = getRuntimeContext().getNumberOfParallelSubtasks(); + this.currentTaskNumber = getRuntimeContext().getIndexOfThisSubtask(); + this.schemaMaps = new HashMap<>(); + } + + @Override + public void setup( + StreamTask containingTask, + StreamConfig config, + Output> output) { + super.setup(containingTask, config, output); + TaskOperatorEventGateway toCoordinator = + getContainingTask().getEnvironment().getOperatorCoordinatorEventGateway(); + schemaEvolutionClient = + new SchemaEvolutionClient( + toCoordinator, new OperatorIDGenerator(schemaOperatorUid).generate()); + } + + @Override + public void processElement(StreamRecord streamRecord) throws Exception { + Event event = streamRecord.getValue(); + if (event instanceof FlushEvent) { + output.collect( + new StreamRecord<>( + new BucketWrapperFlushEvent( + currentTaskNumber, ((FlushEvent) event).getTableId()))); + return; + } + + if (event instanceof DataChangeEvent) { + DataChangeEvent dataChangeEvent = (DataChangeEvent) event; + if (schemaMaps.containsKey(dataChangeEvent.tableId())) { + Optional schema = + schemaEvolutionClient.getLatestEvolvedSchema(dataChangeEvent.tableId()); + if (schema.isPresent()) { + schemaMaps.put( + dataChangeEvent.tableId(), new TableSchemaInfo(schema.get(), zoneId)); + } else { + throw new RuntimeException( + "Could not find schema message from SchemaRegistry for " + + dataChangeEvent.tableId()); + } + } + Tuple4 tuple4 = + bucketAssignerMap.computeIfAbsent( + dataChangeEvent.tableId(), this::getTableInfo); + int bucket; + GenericRow genericRow = + PaimonWriterHelper.convertEventToGenericRow( + dataChangeEvent, + schemaMaps.get(dataChangeEvent.tableId()).getFieldGetters()); + switch (tuple4.f0) { + case DYNAMIC: + { + bucket = + tuple4.f2.assign( + tuple4.f3.partition(genericRow), + tuple4.f3.trimmedPrimaryKey(genericRow).hashCode()); + break; + } + case FIXED: + { + tuple4.f1.setRecord(genericRow); + bucket = tuple4.f1.bucket(); + break; + } + case UNAWARE: + { + bucket = 0; + break; + } + case GLOBAL_DYNAMIC: + default: + { + throw new RuntimeException("Unsupported bucket mode: " + tuple4.f0); + } + } + output.collect( + new StreamRecord<>(new BucketWrapperChangeEvent(bucket, (ChangeEvent) event))); + } else if (event instanceof CreateTableEvent) { + CreateTableEvent createTableEvent = (CreateTableEvent) event; + schemaMaps.put( + createTableEvent.tableId(), + new TableSchemaInfo(createTableEvent.getSchema(), zoneId)); + output.collect( + new StreamRecord<>( + new BucketWrapperChangeEvent(currentTaskNumber, (ChangeEvent) event))); + } else if (event instanceof SchemaChangeEvent) { + SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event; + Schema schema = + SchemaUtils.applySchemaChangeEvent( + schemaMaps.get(schemaChangeEvent.tableId()).getSchema(), + 
schemaChangeEvent); + schemaMaps.put(schemaChangeEvent.tableId(), new TableSchemaInfo(schema, zoneId)); + output.collect( + new StreamRecord<>( + new BucketWrapperChangeEvent(currentTaskNumber, (ChangeEvent) event))); + } + } + + private Tuple4 + getTableInfo(TableId tableId) { + Preconditions.checkNotNull(tableId, "Invalid tableId in given event."); + FileStoreTable table; + try { + table = (FileStoreTable) catalog.getTable(Identifier.fromString(tableId.toString())); + } catch (Catalog.TableNotExistException e) { + throw new RuntimeException(e); + } + long targetRowNum = table.coreOptions().dynamicBucketTargetRowNum(); + Integer numAssigners = table.coreOptions().dynamicBucketInitialBuckets(); + return new Tuple4<>( + table.bucketMode(), + table.createRowKeyExtractor(), + new HashBucketAssigner( + table.snapshotManager(), + commitUser, + table.store().newIndexFileHandler(), + totalTasksNumber, + MathUtils.min(numAssigners, totalTasksNumber), + currentTaskNumber, + targetRowNum), + new RowPartitionKeyExtractor(table.schema())); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapper.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapper.java new file mode 100644 index 00000000000..ed56cee9795 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapper.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.flink.sink.StoreSinkWrite; + +/** Wrapper class with bucket. */ +public interface BucketWrapper { + + /** Bucket value that was passed in {@link StoreSinkWrite#write(InternalRow, int)}. 
*/ + int getBucket(); +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperChangeEvent.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperChangeEvent.java new file mode 100644 index 00000000000..6c0b3c155ad --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperChangeEvent.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket; + +import org.apache.flink.cdc.common.event.ChangeEvent; +import org.apache.flink.cdc.common.event.TableId; + +import java.io.Serializable; +import java.util.Objects; + +/** A wrapper class for {@link ChangeEvent} to attach bucket id. 
*/ +public class BucketWrapperChangeEvent implements ChangeEvent, BucketWrapper, Serializable { + private static final long serialVersionUID = 1L; + private final int bucket; + + private final ChangeEvent innerEvent; + + public BucketWrapperChangeEvent(int bucket, ChangeEvent innerEvent) { + this.bucket = bucket; + this.innerEvent = innerEvent; + } + + public int getBucket() { + return bucket; + } + + public ChangeEvent getInnerEvent() { + return innerEvent; + } + + @Override + public TableId tableId() { + return innerEvent.tableId(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + BucketWrapperChangeEvent that = (BucketWrapperChangeEvent) o; + return bucket == that.bucket && Objects.equals(innerEvent, that.innerEvent); + } + + @Override + public int hashCode() { + return Objects.hash(bucket, innerEvent); + } + + @Override + public String toString() { + return "BucketWrapperChangeEvent{" + + "bucket=" + + bucket + + ", innerEvent=" + + innerEvent + + '}'; + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java new file mode 100644 index 00000000000..f37d9952f98 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket; + +import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.cdc.common.event.ChangeEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.runtime.serializer.EnumSerializer; +import org.apache.flink.cdc.runtime.serializer.TableIdSerializer; +import org.apache.flink.cdc.runtime.serializer.TypeSerializerSingleton; +import org.apache.flink.cdc.runtime.serializer.event.EventSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; + +/** A {@link TypeSerializerSingleton} for {@link BucketWrapperChangeEvent}. 
+ */
+public class BucketWrapperEventSerializer extends TypeSerializerSingleton<Event> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final EnumSerializer<EventClass> enumSerializer =
+            new EnumSerializer<>(EventClass.class);
+
+    private final EventSerializer eventSerializer = EventSerializer.INSTANCE;
+
+    private final TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE;
+
+    /** Sharable instance of the BucketWrapperEventSerializer. */
+    public static final BucketWrapperEventSerializer INSTANCE = new BucketWrapperEventSerializer();
+
+    @Override
+    public boolean isImmutableType() {
+        return false;
+    }
+
+    @Override
+    public Event createInstance() {
+        return new Event() {};
+    }
+
+    @Override
+    public Event copy(Event event) {
+        return event;
+    }
+
+    @Override
+    public Event copy(Event from, Event reuse) {
+        return copy(from);
+    }
+
+    @Override
+    public int getLength() {
+        return 0;
+    }
+
+    @Override
+    public void serialize(Event event, DataOutputView dataOutputView) throws IOException {
+        if (event instanceof BucketWrapperChangeEvent) {
+            BucketWrapperChangeEvent bucketWrapperChangeEvent = (BucketWrapperChangeEvent) event;
+            enumSerializer.serialize(EventClass.BUCKET_WRAPPER_CHANGE_EVENT, dataOutputView);
+            dataOutputView.writeInt(bucketWrapperChangeEvent.getBucket());
+            eventSerializer.serialize(bucketWrapperChangeEvent.getInnerEvent(), dataOutputView);
+        } else if (event instanceof BucketWrapperFlushEvent) {
+            enumSerializer.serialize(EventClass.BUCKET_WRAPPER_FLUSH_EVENT, dataOutputView);
+            BucketWrapperFlushEvent bucketWrapperFlushEvent = (BucketWrapperFlushEvent) event;
+            dataOutputView.writeInt(bucketWrapperFlushEvent.getBucket());
+            tableIdSerializer.serialize(bucketWrapperFlushEvent.getTableId(), dataOutputView);
+        }
+    }
+
+    @Override
+    public Event deserialize(DataInputView source) throws IOException {
+        EventClass eventClass = enumSerializer.deserialize(source);
+        if (eventClass.equals(EventClass.BUCKET_WRAPPER_FLUSH_EVENT)) {
+            return new BucketWrapperFlushEvent(
+                    source.readInt(), tableIdSerializer.deserialize(source));
+        } else {
+            return new BucketWrapperChangeEvent(
+                    source.readInt(), (ChangeEvent) eventSerializer.deserialize(source));
+        }
+    }
+
+    @Override
+    public Event deserialize(Event reuse, DataInputView source) throws IOException {
+        return deserialize(source);
+    }
+
+    @Override
+    public void copy(DataInputView source, DataOutputView target) throws IOException {
+        serialize(deserialize(source), target);
+    }
+
+    @Override
+    public TypeSerializerSnapshot<Event> snapshotConfiguration() {
+        return new EventSerializerSnapshot();
+    }
+
+    /** Serializer configuration snapshot for compatibility and format evolution.
+     */
+    @SuppressWarnings("WeakerAccess")
+    public static final class EventSerializerSnapshot extends SimpleTypeSerializerSnapshot<Event> {
+
+        public EventSerializerSnapshot() {
+            super(() -> INSTANCE);
+        }
+    }
+
+    enum EventClass {
+        BUCKET_WRAPPER_CHANGE_EVENT,
+        BUCKET_WRAPPER_FLUSH_EVENT
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventTypeInfo.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventTypeInfo.java
new file mode 100644
index 00000000000..be1c54bfa06
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventTypeInfo.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonEvent;
+
+/** A {@link TypeInformation} for {@link PaimonEvent}.
*/ +public class BucketWrapperEventTypeInfo extends TypeInformation { + + private static final long serialVersionUID = 1L; + + @Override + public boolean isBasicType() { + return false; + } + + @Override + public boolean isTupleType() { + return false; + } + + @Override + public int getArity() { + return 0; + } + + @Override + public int getTotalFields() { + return 1; + } + + @Override + public Class getTypeClass() { + return Event.class; + } + + @Override + public boolean isKeyType() { + return false; + } + + @Override + public TypeSerializer createSerializer(ExecutionConfig config) { + return BucketWrapperEventSerializer.INSTANCE; + } + + @Override + public String toString() { + return "BucketWrapperEvent"; + } + + @Override + public boolean equals(Object obj) { + return obj instanceof BucketWrapperEventTypeInfo; + } + + @Override + public int hashCode() { + return getClass().hashCode(); + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof BucketWrapperEventTypeInfo; + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperFlushEvent.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperFlushEvent.java new file mode 100644 index 00000000000..046a22a318f --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperFlushEvent.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket; + +import org.apache.flink.cdc.common.event.FlushEvent; +import org.apache.flink.cdc.common.event.TableId; + +import java.util.Objects; + +/** A wrapper class for {@link FlushEvent} to attach bucket id. 
*/ +public class BucketWrapperFlushEvent extends FlushEvent implements BucketWrapper { + + private final int bucket; + + public BucketWrapperFlushEvent(int bucket, TableId tableId) { + super(tableId); + this.bucket = bucket; + } + + @Override + public int getBucket() { + return bucket; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + BucketWrapperFlushEvent that = (BucketWrapperFlushEvent) o; + return bucket == that.bucket; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), bucket); + } + + @Override + public String toString() { + return "BucketWrapperFlushEvent{tableId=" + getTableId() + ", bucket=" + bucket + '}'; + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionTest.java new file mode 100644 index 00000000000..86ff2f7b22e --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.paimon.sink; + +import org.apache.flink.cdc.common.data.binary.BinaryStringData; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.sink.MetadataApplier; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.flink.FlinkCatalogFactory; +import org.apache.paimon.options.Options; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.nio.file.Path; +import java.time.ZoneId; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link PaimonHashFunction}. 
*/ +public class PaimonHashFunctionTest { + + @TempDir public static Path temporaryFolder; + + private Catalog catalog; + + private Options catalogOptions; + + private static final String TEST_DATABASE = "test"; + + @BeforeEach + public void beforeEach() throws Catalog.DatabaseAlreadyExistException { + catalogOptions = new Options(); + String warehouse = + new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString(); + catalogOptions.setString("warehouse", warehouse); + catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions); + catalog.createDatabase(TEST_DATABASE, true); + } + + @AfterEach + public void afterEach() throws Exception { + catalog.dropDatabase(TEST_DATABASE, true, true); + catalog.close(); + } + + @Test + public void testHashCodeForFixedBucketTable() { + TableId tableId = TableId.tableId(TEST_DATABASE, "test_table"); + Map tableOptions = new HashMap<>(); + tableOptions.put("bucket", "10"); + MetadataApplier metadataApplier = + new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>()); + Schema schema = + Schema.newBuilder() + .physicalColumn("col1", DataTypes.STRING().notNull()) + .physicalColumn("col2", DataTypes.STRING()) + .physicalColumn("pt", DataTypes.STRING()) + .primaryKey("col1", "pt") + .partitionKey("pt") + .build(); + CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema); + metadataApplier.applySchemaChange(createTableEvent); + BinaryRecordDataGenerator generator = + new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0])); + PaimonHashFunction hashFunction = + new PaimonHashFunction(catalogOptions, tableId, schema, ZoneId.systemDefault(), 4); + DataChangeEvent dataChangeEvent1 = + DataChangeEvent.insertEvent( + tableId, + generator.generate( + new Object[] { + BinaryStringData.fromString("1"), + BinaryStringData.fromString("1"), + BinaryStringData.fromString("2024") + })); + int key1 = hashFunction.hashcode(dataChangeEvent1); + + DataChangeEvent dataChangeEvent2 = + DataChangeEvent.updateEvent( + tableId, + generator.generate( + new Object[] { + BinaryStringData.fromString("1"), + BinaryStringData.fromString("1"), + BinaryStringData.fromString("2024") + }), + generator.generate( + new Object[] { + BinaryStringData.fromString("1"), + BinaryStringData.fromString("2"), + BinaryStringData.fromString("2024") + })); + int key2 = hashFunction.hashcode(dataChangeEvent2); + + DataChangeEvent dataChangeEvent3 = + DataChangeEvent.deleteEvent( + tableId, + generator.generate( + new Object[] { + BinaryStringData.fromString("1"), + BinaryStringData.fromString("2"), + BinaryStringData.fromString("2024") + })); + int key3 = hashFunction.hashcode(dataChangeEvent3); + + assertThat(key1).isEqualTo(key2); + assertThat(key1).isEqualTo(key3); + } +}