Commit ead1c5f

[BUG][Connector-V2][Mongo-cdc] Incremental data kind error in snapshot phase (apache#5184)

Carl-Zhou-CN authored Aug 15, 2023
1 parent 8d6b07e commit ead1c5f
Showing 3 changed files with 93 additions and 13 deletions.
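
What the fix addresses: during the snapshot (backfill) phase, change-stream events captured in parallel are replayed into the snapshot output buffer. Before this change, UPDATE and REPLACE events kept their original operationType, so the deserialization schema, which derives the SeaTunnel row kind from that field, emitted them with update semantics instead of as snapshot inserts. Below is a minimal sketch of that kind mapping, working from the event's operationType string; the class and method names are illustrative, not the connector's actual code:

import org.apache.seatunnel.api.table.type.RowKind;

// Sketch only: how a Mongo CDC deserializer typically turns the event's
// operationType into a SeaTunnel RowKind. Backfilled snapshot records should
// all end up as INSERT, which is why this commit rewrites UPDATE/REPLACE
// events before they reach a mapping like this.
final class RowKindMappingSketch {
    static RowKind rowKindOf(String operationType) {
        switch (operationType) {
            case "insert":
                return RowKind.INSERT;
            case "update":
            case "replace":
                return RowKind.UPDATE_AFTER; // wrong kind for backfilled snapshot data
            case "delete":
                return RowKind.DELETE;
            default:
                throw new IllegalArgumentException("Unknown operationType: " + operationType);
        }
    }
}
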
@@ -65,6 +65,7 @@
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.ENCODE_VALUE_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.FULL_DOCUMENT;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.ID_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.extractBsonDocument;
import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;

public class MongoDBConnectorDeserializationSchema
@@ -154,17 +155,6 @@ private SeaTunnelRow extractRowData(BsonDocument document) {
return (SeaTunnelRow) physicalConverter.convert(document);
}

private BsonDocument extractBsonDocument(
Struct value, @Nonnull Schema valueSchema, String fieldName) {
if (valueSchema.field(fieldName) != null) {
String docString = value.getString(fieldName);
if (docString != null) {
return BsonDocument.parse(docString);
}
}
return null;
}

// -------------------------------------------------------------------------------------
// Runtime Converters
// -------------------------------------------------------------------------------------
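Note on the file above: the change to MongoDBConnectorDeserializationSchema is plumbing only. Its private extractBsonDocument helper is removed and replaced by a static import of the shared copy added to MongodbRecordUtils (the last file in this commit), so the snapshot fetch task below can reuse the same null-safe extraction.
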
@@ -27,10 +27,13 @@
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import org.bson.BsonDocument;
import org.bson.BsonInt64;
import org.bson.BsonString;
import org.bson.BsonType;
import org.bson.BsonValue;

@@ -50,12 +53,21 @@
import java.util.stream.Collectors;

import static org.apache.seatunnel.common.exception.CommonErrorCode.ILLEGAL_ARGUMENT;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.COLL_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DB_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DOCUMENT_KEY;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.FULL_DOCUMENT;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.ID_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.NS_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.OPERATION_TYPE;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.OPERATION_TYPE_INSERT;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.SNAPSHOT_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.SNAPSHOT_TRUE;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.SOURCE_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.TS_MS_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.BsonUtils.compareBsonValue;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.buildSourceRecord;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.extractBsonDocument;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.getDocumentKey;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.getResumeToken;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils.createMongoClient;
@@ -172,9 +184,27 @@ public void rewriteOutputBuffer(

switch (OperationType.fromString(operationType)) {
case INSERT:
outputBuffer.put(key, changeRecord);
break;
case UPDATE:
case REPLACE:
outputBuffer.put(key, changeRecord);
Schema valueSchema = changeRecord.valueSchema();
BsonDocument fullDocument =
extractBsonDocument(value, valueSchema, FULL_DOCUMENT);
if (fullDocument == null) {
break;
}
BsonDocument valueDocument = normalizeSnapshotDocument(fullDocument, value);
SourceRecord record =
buildSourceRecord(
changeRecord.sourcePartition(),
changeRecord.sourceOffset(),
changeRecord.topic(),
changeRecord.kafkaPartition(),
changeRecord.keySchema(),
changeRecord.key(),
valueDocument);
outputBuffer.put(key, record);
break;
case DELETE:
outputBuffer.remove(key);
@@ -202,6 +232,30 @@ record -> {
.collect(Collectors.toList());
}

private BsonDocument normalizeSnapshotDocument(
@Nonnull final BsonDocument fullDocument, Struct value) {
return new BsonDocument()
.append(ID_FIELD, new BsonString(value.getString(DOCUMENT_KEY)))
.append(OPERATION_TYPE, new BsonString(OPERATION_TYPE_INSERT))
.append(
NS_FIELD,
new BsonDocument(
DB_FIELD,
new BsonString(
value.getStruct(NS_FIELD).getString(DB_FIELD)))
.append(
COLL_FIELD,
new BsonString(
value.getStruct(NS_FIELD).getString(COLL_FIELD))))
.append(DOCUMENT_KEY, new BsonString(value.getString(DOCUMENT_KEY)))
.append(FULL_DOCUMENT, fullDocument)
.append(TS_MS_FIELD, new BsonInt64(value.getInt64(TS_MS_FIELD)))
.append(
SOURCE_FIELD,
new BsonDocument(SNAPSHOT_FIELD, new BsonString(SNAPSHOT_TRUE))
.append(TS_MS_FIELD, new BsonInt64(0L)));
}

@Override
public void close() {
Runtime.getRuntime()
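For reference, normalizeSnapshotDocument above reshapes a snapshot-phase UPDATE/REPLACE event into an insert-style change document. Assuming the conventional values of the *_FIELD constants ("_id", "operationType", "ns", "documentKey", "fullDocument", "ts_ms", "source", "snapshot"), the rewritten value looks roughly like this, with illustrative field values:

{
  "_id": "<documentKey JSON string>",
  "operationType": "insert",
  "ns": { "db": "<database>", "coll": "<collection>" },
  "documentKey": "<documentKey JSON string>",
  "fullDocument": { <the captured document> },
  "ts_ms": <original event timestamp>,
  "source": { "snapshot": "true", "ts_ms": 0 }
}

Forcing operationType to insert and stamping source.snapshot = "true" is what keeps backfilled records from surfacing as updates downstream.
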
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
@@ -66,7 +67,18 @@ public static BsonDocument getResumeToken(SourceRecord sourceRecord) {

public static BsonDocument getDocumentKey(@Nonnull SourceRecord sourceRecord) {
Struct value = (Struct) sourceRecord.value();
return BsonDocument.parse(value.getString(DOCUMENT_KEY));
return extractBsonDocument(value, sourceRecord.valueSchema(), DOCUMENT_KEY);
}

public static BsonDocument extractBsonDocument(
Struct value, @Nonnull Schema valueSchema, String fieldName) {
if (valueSchema.field(fieldName) != null) {
String docString = value.getString(fieldName);
if (docString != null) {
return BsonDocument.parse(docString);
}
}
return null;
}

public static String getOffsetValue(@Nonnull SourceRecord sourceRecord, String key) {
@@ -139,6 +151,30 @@ public static String getOffsetValue(@Nonnull SourceRecord sourceRecord, String key) {
valueSchemaAndValue.value());
}

public static @Nonnull SourceRecord buildSourceRecord(
Map<String, ?> sourcePartition,
Map<String, ?> sourceOffset,
String topicName,
Integer partition,
Schema keySchema,
Object key,
BsonDocument valueDocument) {
BsonValueToSchemaAndValue schemaAndValue =
new BsonValueToSchemaAndValue(new DefaultJson().getJsonWriterSettings());
SchemaAndValue valueSchemaAndValue =
schemaAndValue.toSchemaAndValue(fromJson(OUTPUT_SCHEMA), valueDocument);

return new SourceRecord(
sourcePartition,
sourceOffset,
topicName,
partition,
keySchema,
key,
valueSchemaAndValue.schema(),
valueSchemaAndValue.value());
}
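
This overload mirrors the existing buildSourceRecord above but accepts the value as a BsonDocument and re-derives the value schema and payload from OUTPUT_SCHEMA via BsonValueToSchemaAndValue. That is what lets rewriteOutputBuffer swap in the normalized snapshot document while preserving the original record's source partition, offset, topic, Kafka partition, and key.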

public static @Nonnull Map<String, String> createSourceOffsetMap(
@Nonnull BsonDocument idDocument, boolean isSnapshotRecord) {
Map<String, String> sourceOffset = new HashMap<>();
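A short usage sketch for the shared extractBsonDocument helper added above (illustrative only; the wrapper method and its name are not part of the commit): it returns null both when the value schema has no such field and when the field itself is null, so callers are expected to null-check the result, as the snapshot rewrite in rewriteOutputBuffer does.

// Illustrative helper, not part of this commit: null-safe extraction of the
// fullDocument payload from a change-stream SourceRecord.
private static BsonDocument fullDocumentOf(SourceRecord sourceRecord) {
    Struct value = (Struct) sourceRecord.value();
    // Returns null when the schema lacks a "fullDocument" field (e.g. a delete
    // event) or when the field itself is null.
    return MongodbRecordUtils.extractBsonDocument(
            value, sourceRecord.valueSchema(), MongodbSourceOptions.FULL_DOCUMENT);
}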
