Commit 9ad64e1

[FLINK-35891][cdc-connector][paimon] support dynamic bucket.
1 parent b15a226 commit 9ad64e1

File tree

24 files changed: +980 -36 lines changed

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DataSink.java

+2-1
@@ -38,7 +38,8 @@ public interface DataSink {
      * Get the {@code HashFunctionProvider<DataChangeEvent>} for calculating hash value if you need
      * to partition by data change event before Sink.
      */
-    default HashFunctionProvider<DataChangeEvent> getDataChangeEventHashFunctionProvider() {
+    default HashFunctionProvider<DataChangeEvent> getDataChangeEventHashFunctionProvider(
+            int parallelism) {
         return new DefaultDataChangeEventHashFunctionProvider();
     }
 }
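Sinks that do not override this hook keep the previous behavior through the default implementation. For a custom connector, the new argument makes channel-aware hashing possible. A minimal sketch, assuming HashFunction stays a single-method interface as the new PaimonHashFunction below suggests; the provider class and the modulo scheme are illustrative, not part of this commit:

import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.function.HashFunction;
import org.apache.flink.cdc.common.function.HashFunctionProvider;
import org.apache.flink.cdc.common.schema.Schema;

import javax.annotation.Nullable;

/** Hypothetical provider that spreads events over the now-known channel count. */
public class ChannelAwareHashFunctionProvider implements HashFunctionProvider<DataChangeEvent> {

    private final int parallelism;

    public ChannelAwareHashFunctionProvider(int parallelism) {
        this.parallelism = parallelism;
    }

    @Override
    public HashFunction<DataChangeEvent> getHashFunction(@Nullable TableId tableId, Schema schema) {
        // With the downstream parallelism known up front, the returned value can
        // be a concrete channel index rather than an opaque hash.
        return event -> Math.floorMod(event.hashCode(), parallelism);
    }
}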

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java

+1-1
@@ -140,7 +140,7 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
                         parallelism,
                         parallelism,
                         schemaOperatorIDGenerator.generate(),
-                        dataSink.getDataChangeEventHashFunctionProvider());
+                        dataSink.getDataChangeEventHashFunctionProvider(parallelism));
 
         // Build Sink Operator
         DataSinkTranslator sinkTranslator = new DataSinkTranslator();

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSink.java

+16-3
@@ -17,18 +17,21 @@
 
 package org.apache.flink.cdc.connectors.paimon.sink;
 
+import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.function.HashFunctionProvider;
 import org.apache.flink.cdc.common.sink.DataSink;
 import org.apache.flink.cdc.common.sink.EventSinkProvider;
 import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonEventSink;
 import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonRecordSerializer;
-import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonSink;
 
 import org.apache.paimon.options.Options;
 
 import java.io.Serializable;
+import java.time.ZoneId;
 import java.util.List;
 import java.util.Map;
 
@@ -47,26 +50,36 @@ public class PaimonDataSink implements DataSink, Serializable {
 
     private final PaimonRecordSerializer<Event> serializer;
 
+    private final ZoneId zoneId;
+
     public PaimonDataSink(
             Options options,
             Map<String, String> tableOptions,
             String commitUser,
             Map<TableId, List<String>> partitionMaps,
-            PaimonRecordSerializer<Event> serializer) {
+            PaimonRecordSerializer<Event> serializer,
+            ZoneId zoneId) {
         this.options = options;
         this.tableOptions = tableOptions;
         this.commitUser = commitUser;
         this.partitionMaps = partitionMaps;
         this.serializer = serializer;
+        this.zoneId = zoneId;
     }
 
     @Override
     public EventSinkProvider getEventSinkProvider() {
-        return FlinkSinkProvider.of(new PaimonSink<>(options, commitUser, serializer));
+        return FlinkSinkProvider.of(new PaimonEventSink(options, commitUser, serializer));
     }
 
     @Override
     public MetadataApplier getMetadataApplier() {
         return new PaimonMetadataApplier(options, tableOptions, partitionMaps);
     }
+
+    @Override
+    public HashFunctionProvider<DataChangeEvent> getDataChangeEventHashFunctionProvider(
+            int parallelism) {
+        return new PaimonHashFunctionProvider(options, zoneId, parallelism);
+    }
 }
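Threading zoneId through to the sink matters because the hash function must serialize events the same way the writer does: PaimonHashFunction (below) builds its own PaimonRecordEventSerializer(zoneId), and a mismatched zone could make time-typed key fields hash differently than they are written.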

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java

+2-1
@@ -104,7 +104,8 @@ public DataSink createDataSink(Context context) {
             }
         }
         PaimonRecordSerializer<Event> serializer = new PaimonRecordEventSerializer(zoneId);
-        return new PaimonDataSink(options, tableOptions, commitUser, partitionMaps, serializer);
+        return new PaimonDataSink(
+                options, tableOptions, commitUser, partitionMaps, serializer, zoneId);
     }
 
     @Override
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunction.java

+70

@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink;
+
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.function.HashFunction;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonRecordEventSerializer;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.flink.sink.RowAssignerChannelComputer;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+
+import java.io.Serializable;
+import java.time.ZoneId;
+
+/**
+ * A {@link HashFunction} implementation for {@link PaimonDataSink}. Shuffle {@link DataChangeEvent}
+ * by hash of PrimaryKey.
+ */
+public class PaimonHashFunction implements HashFunction<DataChangeEvent>, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final PaimonRecordEventSerializer eventSerializer;
+
+    private final RowAssignerChannelComputer channelComputer;
+
+    public PaimonHashFunction(
+            Options options, TableId tableId, Schema schema, ZoneId zoneId, int parallelism) {
+        Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(options);
+        FileStoreTable table;
+        try {
+            table = (FileStoreTable) catalog.getTable(Identifier.fromString(tableId.toString()));
+        } catch (Catalog.TableNotExistException e) {
+            throw new RuntimeException(e);
+        }
+        eventSerializer = new PaimonRecordEventSerializer(zoneId);
+        eventSerializer.serialize(new CreateTableEvent(tableId, schema));
+        channelComputer = new RowAssignerChannelComputer(table.schema(), parallelism);
+        channelComputer.setup(parallelism);
+    }
+
+    @Override
+    public int hashcode(DataChangeEvent event) {
+        GenericRow genericRow = eventSerializer.serialize(event).getGenericRow();
+        return channelComputer.channel(genericRow);
+    }
+}
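Note the design choice visible in hashcode(): rather than returning a conventional hash of the row, it returns the channel index computed by Paimon's RowAssignerChannelComputer for the row's primary key. Since the upstream partition operator presumably maps events by hashcode modulo parallelism (the reason parallelism is now threaded through the DataSink interface), all changes for one primary key land on the same bucket-assigner subtask, which is what Paimon's dynamic bucket mode requires.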
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionProvider.java

+51

@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink;
+
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.function.HashFunction;
+import org.apache.flink.cdc.common.function.HashFunctionProvider;
+import org.apache.flink.cdc.common.schema.Schema;
+
+import org.apache.paimon.options.Options;
+
+import javax.annotation.Nullable;
+
+import java.time.ZoneId;
+
+/** A {@link HashFunctionProvider} implementation for {@link PaimonDataSink}. */
+public class PaimonHashFunctionProvider implements HashFunctionProvider<DataChangeEvent> {
+
+    private final Options options;
+
+    private final ZoneId zoneId;
+
+    private final int parallelism;
+
+    public PaimonHashFunctionProvider(Options options, ZoneId zoneId, int parallelism) {
+        this.options = options;
+        this.zoneId = zoneId;
+        this.parallelism = parallelism;
+    }
+
+    @Override
+    public HashFunction<DataChangeEvent> getHashFunction(@Nullable TableId tableId, Schema schema) {
+        return new PaimonHashFunction(options, tableId, schema, zoneId, parallelism);
+    }
+}
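As a usage illustration only (not part of the commit), the sketch below resolves a per-table hash function by hand, the way the pre-partition operator would. The warehouse path and table name are hypothetical, and the table must already exist in the Paimon catalog because PaimonHashFunction loads it to build the channel computer.

import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.function.HashFunction;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.connectors.paimon.sink.PaimonHashFunctionProvider;

import org.apache.paimon.options.Options;

import java.time.ZoneId;
import java.util.Collections;

public class HashFunctionProviderExample {
    public static void main(String[] args) {
        // Hypothetical warehouse location; any catalog options accepted by
        // FlinkCatalogFactory.createPaimonCatalog would do here.
        Options catalogOptions =
                Options.fromMap(Collections.singletonMap("warehouse", "file:///tmp/paimon"));
        PaimonHashFunctionProvider provider =
                new PaimonHashFunctionProvider(catalogOptions, ZoneId.of("UTC"), 4);

        // Hypothetical table that must already exist in the catalog.
        TableId tableId = TableId.tableId("app_db", "orders");
        Schema schema =
                Schema.newBuilder()
                        .physicalColumn("id", DataTypes.BIGINT())
                        .physicalColumn("name", DataTypes.STRING())
                        .primaryKey("id")
                        .build();

        HashFunction<DataChangeEvent> hashFunction = provider.getHashFunction(tableId, schema);
        // hashFunction.hashcode(event) now yields the channel (0..3) chosen by
        // Paimon's dynamic bucket assigner for the event's primary key.
    }
}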

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEvent.java

+17
@@ -31,6 +31,7 @@ public class PaimonEvent {
 
     // if true, means that table schema has changed right before this genericRow.
     boolean shouldRefreshSchema;
+    int bucket;
 
     public PaimonEvent(Identifier tableId, GenericRow genericRow) {
         this.tableId = tableId;
@@ -44,6 +45,14 @@ public PaimonEvent(Identifier tableId, GenericRow genericRow, boolean shouldRefr
         this.shouldRefreshSchema = shouldRefreshSchema;
     }
 
+    public PaimonEvent(
+            Identifier tableId, GenericRow genericRow, boolean shouldRefreshSchema, int bucket) {
+        this.tableId = tableId;
+        this.genericRow = genericRow;
+        this.shouldRefreshSchema = shouldRefreshSchema;
+        this.bucket = bucket;
+    }
+
     public Identifier getTableId() {
         return tableId;
     }
@@ -67,4 +76,12 @@ public GenericRow getGenericRow() {
     public void setGenericRow(GenericRow genericRow) {
         this.genericRow = genericRow;
     }
+
+    public int getBucket() {
+        return bucket;
+    }
+
+    public void setBucket(int bucket) {
+        this.bucket = bucket;
+    }
 }
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEventSink.java

+60

@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2;
+
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketAssignOperator;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapperEventKeySelector;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapperEventPartitioner;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapperEventTypeInfo;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import org.apache.paimon.flink.sink.MultiTableCommittable;
+import org.apache.paimon.flink.sink.MultiTableCommittableSerializer;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.sink.CommitMessageSerializer;
+
+/** A {@link PaimonSink} to process {@link Event}. */
+public class PaimonEventSink extends PaimonSink<Event> implements WithPreWriteTopology<Event> {
+
+    public PaimonEventSink(
+            Options catalogOptions, String commitUser, PaimonRecordSerializer<Event> serializer) {
+        super(catalogOptions, commitUser, serializer);
+    }
+
+    @Override
+    public DataStream<Event> addPreWriteTopology(DataStream<Event> dataStream) {
+        // Shuffle by key hash => Assign bucket => Shuffle by bucket.
+        return dataStream
+                .transform(
+                        "BucketAssign",
+                        new BucketWrapperEventTypeInfo(),
+                        new BucketAssignOperator(catalogOptions))
+                .name("Assign Bucket")
+                .partitionCustom(
+                        new BucketWrapperEventPartitioner(), new BucketWrapperEventKeySelector());
+    }
+
+    @Override
+    public SimpleVersionedSerializer<MultiTableCommittable> getCommittableSerializer() {
+        CommitMessageSerializer fileSerializer = new CommitMessageSerializer();
+        return new MultiTableCommittableSerializer(fileSerializer);
+    }
+}
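The bucket package referenced by these imports is added elsewhere in this commit and does not appear in this excerpt. A hedged sketch of the two routing helpers, with their shapes inferred only from the partitionCustom call above; the real classes may differ:

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapperChangeEvent;

// Assumed: BucketAssignOperator wraps each record with the bucket it computed,
// and the selector simply reads that bucket back out.
class BucketWrapperEventKeySelector implements KeySelector<Event, Integer> {
    @Override
    public Integer getKey(Event event) {
        // Sketch only: schema and flush events presumably use a sibling wrapper.
        return ((BucketWrapperChangeEvent) event).getBucket();
    }
}

// Assumed: the partitioner routes a wrapped event to the channel matching its
// Paimon bucket, so a single writer subtask owns each bucket.
class BucketWrapperEventPartitioner implements Partitioner<Integer> {
    @Override
    public int partition(Integer bucket, int numPartitions) {
        return bucket % numPartitions;
    }
}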

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonRecordEventSerializer.java

+8-5
@@ -24,6 +24,7 @@
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.utils.SchemaUtils;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapperChangeEvent;
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.GenericRow;
@@ -51,10 +52,12 @@ public PaimonRecordEventSerializer(ZoneId zoneId) {
 
     @Override
     public PaimonEvent serialize(Event event) {
-        Identifier tableId =
-                Identifier.create(
-                        ((ChangeEvent) event).tableId().getSchemaName(),
-                        ((ChangeEvent) event).tableId().getTableName());
+        int bucket = 0;
+        if (event instanceof BucketWrapperChangeEvent) {
+            bucket = ((BucketWrapperChangeEvent) event).getBucket();
+            event = ((BucketWrapperChangeEvent) event).getInnerEvent();
+        }
+        Identifier tableId = Identifier.fromString(((ChangeEvent) event).tableId().toString());
         if (event instanceof SchemaChangeEvent) {
             if (event instanceof CreateTableEvent) {
                 CreateTableEvent createTableEvent = (CreateTableEvent) event;
@@ -78,7 +81,7 @@ public PaimonEvent serialize(Event event) {
                 PaimonWriterHelper.convertEventToGenericRow(
                         dataChangeEvent,
                         schemaMaps.get(dataChangeEvent.tableId()).getFieldGetters());
-            return new PaimonEvent(tableId, genericRow);
+            return new PaimonEvent(tableId, genericRow, false, bucket);
         } else {
             throw new IllegalArgumentException(
                     "failed to convert Input into PaimonEvent, unsupported event: " + event);
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSink.java

+2-2
@@ -40,9 +40,9 @@ public class PaimonSink<InputT> implements WithPreCommitTopology<InputT, MultiTa
     // provided a default commit user.
     public static final String DEFAULT_COMMIT_USER = "admin";
 
-    private final Options catalogOptions;
+    protected final Options catalogOptions;
 
-    private final String commitUser;
+    protected final String commitUser;
 
     private final PaimonRecordSerializer<InputT> serializer;