From 168ee8fc98040cc7ed4b8b177d99fba2b3d57a5d Mon Sep 17 00:00:00 2001
From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com>
Date: Thu, 14 Nov 2024 12:20:29 +0100
Subject: [PATCH] Separate stream and batch event conversion part2 (#205)

* Separate stream and batch event conversion part2

* Separate stream and batch event conversion part2

* Separate stream and batch event conversion part2

* Separate stream and batch event conversion part2

* Separate stream and batch event conversion part2
---
 .../server/bigquery/BaseChangeConsumer.java   | 18 ++--
 .../server/bigquery/BaseRecordConverter.java  | 98 +------------------
 .../bigquery/BatchBigqueryChangeConsumer.java | 16 +--
 .../server/bigquery/BatchRecordConverter.java | 58 +++++++++++
 .../server/bigquery/RecordConverter.java      |  9 +-
 .../StreamBigqueryChangeConsumer.java         | 37 ++++---
 .../bigquery/StreamRecordConverter.java       | 47 +++++++++
 ...BigqueryChangeConsumerMysqlUpsertTest.java | 16 +--
 8 files changed, 164 insertions(+), 135 deletions(-)

diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseChangeConsumer.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseChangeConsumer.java
index 849af5f..3ea260e 100644
--- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseChangeConsumer.java
+++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseChangeConsumer.java
@@ -67,7 +67,7 @@ public abstract class BaseChangeConsumer extends io.debezium.server.BaseChangeCo
   Instance<BatchSizeWait> batchSizeWaitInstances;
   BatchSizeWait batchSizeWait;
 
-  public void initizalize() throws InterruptedException {
+  public void initialize() throws InterruptedException {
     // configure and set
     valSerde.configure(Collections.emptyMap(), false);
     valDeserializer = valSerde.deserializer();
@@ -94,7 +94,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
     LOGGER.trace("Received {} events", records.size());
     Instant start = Instant.now();
 
-    Map<String, List<BaseRecordConverter>> events = records.stream()
+    Map<String, List<RecordConverter>> events = records.stream()
         .map((ChangeEvent<Object, Object> e) -> {
           try {
@@ -103,20 +103,20 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
             throw new DebeziumException(ex);
           }
         })
-        .collect(Collectors.groupingBy(BaseRecordConverter::destination));
+        .collect(Collectors.groupingBy(RecordConverter::destination));
 
     long numUploadedEvents = 0;
-    for (Map.Entry<String, List<BaseRecordConverter>> destinationEvents : events.entrySet()) {
+    for (Map.Entry<String, List<RecordConverter>> destinationEvents : events.entrySet()) {
       // group list of events by their schema, if in the batch we have schema change events grouped by their schema
       // so with this uniform schema is guaranteed for each batch
-      Map<JsonNode, List<BaseRecordConverter>> eventsGroupedBySchema =
+      Map<JsonNode, List<RecordConverter>> eventsGroupedBySchema =
           destinationEvents.getValue().stream()
-              .collect(Collectors.groupingBy(BaseRecordConverter::valueSchema));
+              .collect(Collectors.groupingBy(RecordConverter::valueSchema));
 
       LOGGER.debug("Destination {} got {} records with {} different schema!!", destinationEvents.getKey(),
          destinationEvents.getValue().size(), eventsGroupedBySchema.keySet().size());
 
-      for (List<BaseRecordConverter> schemaEvents : eventsGroupedBySchema.values()) {
+      for (List<RecordConverter> schemaEvents : eventsGroupedBySchema.values()) {
         numUploadedEvents += this.uploadDestination(destinationEvents.getKey(), schemaEvents);
       }
     }
@@ -144,7 +144,7 @@ protected void logConsumerProgress(long numUploadedEvents) {
     }
   }
 
-  public abstract long uploadDestination(String destination, List<BaseRecordConverter> data);
+  public abstract long uploadDestination(String destination, List<RecordConverter> data);
 
-  public abstract BaseRecordConverter eventAsRecordConverter(ChangeEvent<Object, Object> e) throws IOException;
+  public abstract RecordConverter eventAsRecordConverter(ChangeEvent<Object, Object> e) throws IOException;
 }
diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseRecordConverter.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseRecordConverter.java
index b189af9..966068d 100644
--- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseRecordConverter.java
+++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseRecordConverter.java
@@ -8,21 +8,14 @@
 package io.debezium.server.bigquery;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.cloud.bigquery.*;
-import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.time.Instant;
-import java.time.LocalDate;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
@@ -185,91 +178,6 @@ public JsonNode value() {
     return value;
   }
 
-  /**
-   * Used by `bigquerybatch` {@link BatchBigqueryChangeConsumer} consumer.
-   *
-   * @param schema Bigquery table schema
-   * @return returns Debezium event as a single line json string
-   * @throws JsonProcessingException
-   */
-  @Override
-  public String valueAsJsonLine(Schema schema) throws JsonProcessingException {
-
-    if (value == null) {
-      return null;
-    }
-
-    // process JSON fields
-    if (schema != null) {
-      for (Field f : schema.getFields()) {
-        if (f.getType() == LegacySQLTypeName.JSON && value.has(f.getName())) {
-          ((ObjectNode) value).replace(f.getName(), mapper.readTree(value.get(f.getName()).asText("{}")));
-        }
-        // process DATE values
-        if (f.getType() == LegacySQLTypeName.DATE && value.has(f.getName()) && !value.get(f.getName()).isNull()) {
-          ((ObjectNode) value).put(f.getName(), LocalDate.ofEpochDay(value.get(f.getName()).longValue()).toString());
-        }
-      }
-    }
-
-    // Process DEBEZIUM TS_MS values
-    TS_MS_FIELDS.forEach(tsf -> {
-      if (value.has(tsf)) {
-        ((ObjectNode) value).put(tsf, Instant.ofEpochMilli(value.get(tsf).longValue()).toString());
-      }
-    });
-
-    // Process DEBEZIUM BOOLEAN values
-    BOOLEAN_FIELDS.forEach(bf -> {
-      if (value.has(bf)) {
-        ((ObjectNode) value).put(bf, Boolean.valueOf(value.get(bf).asText()));
-      }
-    });
-
-    return mapper.writeValueAsString(value);
-  }
-
-  /**
-   * Used by `bigquerystream` {@link StreamBigqueryChangeConsumer} consumer.
-   * See https://cloud.google.com/bigquery/docs/write-api#data_type_conversions
-   *
-   * @param upsert when set to true it adds change type column `_CHANGE_TYPE`. Otherwise, all events are considered as insert/append
-   * @param upsertKeepDeletes when set to true it retains last deleted data row
-   * @return returns Debezium events as {@link JSONObject}
-   */
-  @Override
-  public JSONObject valueAsJsonObject(boolean upsert, boolean upsertKeepDeletes) {
-    Map<String, Object> jsonMap = mapper.convertValue(value, new TypeReference<>() {
-    });
-    // SET UPSERT meta field `_CHANGE_TYPE`! this additional field allows us to do deletes, updates in bigquery
-    if (upsert) {
-      // if its deleted row and upsertKeepDeletes = false, deleted records are deleted from target table
-      if (!upsertKeepDeletes && jsonMap.get("__op").equals("d")) {
-        jsonMap.put(CHANGE_TYPE_PSEUDO_COLUMN, "DELETE");
-      } else {
-        // if it's not deleted row or upsertKeepDeletes = true then add deleted record to target table
-        jsonMap.put(CHANGE_TYPE_PSEUDO_COLUMN, "UPSERT");
-      }
-    }
-
-    // fix the TS_MS fields.
-    TS_MS_FIELDS.forEach(tsf -> {
-      if (jsonMap.containsKey(tsf)) {
-        // Convert millisecond to microseconds
-        jsonMap.replace(tsf, ((Long) jsonMap.get(tsf) * 1000L));
-      }
-    });
-
-    // Fix boolean fields, create this fields as BOOLEAN instead of STRING in BigQuery
-    BOOLEAN_FIELDS.forEach(bf -> {
-      if (jsonMap.containsKey(bf)) {
-        jsonMap.replace(bf, Boolean.valueOf((String) jsonMap.get(bf)));
-      }
-    });
-
-    return new JSONObject(jsonMap);
-  }
-
   @Override
   public JsonNode key() {
     return key;
@@ -289,9 +197,9 @@ public JsonNode keySchema() {
   @Override
   public TableConstraints tableConstraints() {
     return
-        TableConstraints.newBuilder()
-            .setPrimaryKey(PrimaryKey.newBuilder().setColumns(this.keyFields()).build())
-            .build();
+        TableConstraints.newBuilder()
+        .setPrimaryKey(PrimaryKey.newBuilder().setColumns(this.keyFields()).build())
+        .build();
   }
 
   @Override
diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchBigqueryChangeConsumer.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchBigqueryChangeConsumer.java
index 61dda40..b935066 100644
--- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchBigqueryChangeConsumer.java
+++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchBigqueryChangeConsumer.java
@@ -77,11 +77,11 @@ public class BatchBigqueryChangeConsumer extends BaseChangeConsumer {
 
   @PostConstruct
   void connect() throws InterruptedException {
-    this.initizalize();
+    this.initialize();
   }
 
-  public void initizalize() throws InterruptedException {
-    super.initizalize();
+  public void initialize() throws InterruptedException {
+    super.initialize();
 
     bqClient = ConsumerUtil.bigqueryClient(isBigqueryDevEmulator, gcpProject, bqDataset, credentialsFile, bqLocation, bigQueryCustomHost);
     timePartitioning =
         TimePartitioning.newBuilder(TimePartitioning.Type.valueOf(partitionType)).setField(partitionField).build();
@@ -96,14 +96,14 @@ public void initizalize() throws InterruptedException {
   }
 
   @Override
-  public long uploadDestination(String destination, List<BaseRecordConverter> data) {
+  public long uploadDestination(String destination, List<RecordConverter> data) {
     try {
       Instant start = Instant.now();
       final long numRecords;
       TableId tableId = getTableId(destination);
 
-      BaseRecordConverter sampleEvent = data.get(0);
+      RecordConverter sampleEvent = data.get(0);
       Schema schema = sampleEvent.tableSchema(false);
       if (schema == null) {
         schema = bqClient.getTable(tableId).getDefinition().getSchema();
@@ -127,9 +127,9 @@ public long uploadDestination(String destination, List<BaseRecordConverter> data
       try (TableDataWriteChannel writer = bqClient.writer(wCCBuilder.build())) {
         //Constructs a stream that writes bytes to the given channel.
         try (OutputStream stream = Channels.newOutputStream(writer)) {
-          for (BaseRecordConverter e : data) {
+          for (RecordConverter e : data) {
 
-            final String val = e.valueAsJsonLine(schema);
+            final String val = e.convert(schema);
             if (val == null) {
               LOGGER.warn("Null Value received skipping the entry! destination:{} key:{}", destination, getString(e.key()));
@@ -186,7 +186,7 @@ TableId getTableId(String destination) {
     return TableId.of(gcpProject.get(), bqDataset.get(), tableName);
   }
 
-  public BaseRecordConverter eventAsRecordConverter(ChangeEvent<Object, Object> e) throws IOException {
+  public RecordConverter eventAsRecordConverter(ChangeEvent<Object, Object> e) throws IOException {
     return new BatchRecordConverter(e.destination(),
         valDeserializer.deserialize(e.destination(), getBytes(e.value())),
         e.key() == null ? null : keyDeserializer.deserialize(e.destination(), getBytes(e.key())),
diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchRecordConverter.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchRecordConverter.java
index 2d00143..064396b 100644
--- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchRecordConverter.java
+++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchRecordConverter.java
@@ -8,7 +8,16 @@
 package io.debezium.server.bigquery;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.LegacySQLTypeName;
+import com.google.cloud.bigquery.Schema;
+import io.debezium.DebeziumException;
+
+import java.time.Instant;
+import java.time.LocalDate;
 
 /**
  * @author Ismail Simsek
@@ -18,4 +27,53 @@ public class BatchRecordConverter extends BaseRecordConverter {
   public BatchRecordConverter(String destination, JsonNode value, JsonNode key, JsonNode valueSchema, JsonNode keySchema) {
     super(destination, value, key, valueSchema, keySchema);
   }
+
+  /**
+   * Used by `bigquerybatch` {@link BatchBigqueryChangeConsumer} consumer.
+   *
+   * @param schema Bigquery table schema
+   * @return returns Debezium event as a single line json string
+   * @throws JsonProcessingException
+   */
+  @Override
+  public String convert(Schema schema, boolean upsert, boolean upsertKeepDeletes) throws DebeziumException {
+
+    if (value == null) {
+      return null;
+    }
+
+    try {
+      // process JSON fields
+      if (schema != null) {
+        for (Field f : schema.getFields()) {
+          if (f.getType() == LegacySQLTypeName.JSON && value.has(f.getName())) {
+            ((ObjectNode) value).replace(f.getName(), mapper.readTree(value.get(f.getName()).asText("{}")));
+          }
+          // process DATE values
+          if (f.getType() == LegacySQLTypeName.DATE && value.has(f.getName()) && !value.get(f.getName()).isNull()) {
+            ((ObjectNode) value).put(f.getName(), LocalDate.ofEpochDay(value.get(f.getName()).longValue()).toString());
+          }
+        }
+      }
+
+      // Process DEBEZIUM TS_MS values
+      TS_MS_FIELDS.forEach(tsf -> {
+        if (value.has(tsf)) {
+          ((ObjectNode) value).put(tsf, Instant.ofEpochMilli(value.get(tsf).longValue()).toString());
+        }
+      });
+
+      // Process DEBEZIUM BOOLEAN values
+      BOOLEAN_FIELDS.forEach(bf -> {
+        if (value.has(bf)) {
+          ((ObjectNode) value).put(bf, Boolean.valueOf(value.get(bf).asText()));
+        }
+      });
+
+      return mapper.writeValueAsString(value);
+    } catch (JsonProcessingException e) {
+      throw new DebeziumException(e);
+    }
+  }
+
 }
diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/RecordConverter.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/RecordConverter.java
index 9f3f03c..ddc9ba1 100644
--- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/RecordConverter.java
+++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/RecordConverter.java
@@ -1,20 +1,21 @@
 package io.debezium.server.bigquery;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.cloud.bigquery.Clustering;
 import com.google.cloud.bigquery.Schema;
 import com.google.cloud.bigquery.TableConstraints;
-import org.json.JSONObject;
+import io.debezium.DebeziumException;
 
 public interface RecordConverter {
   String destination();
 
   JsonNode value();
 
-  String valueAsJsonLine(Schema schema) throws JsonProcessingException;
+  default <T> T convert(Schema schema) throws DebeziumException {
+    return convert(schema, false, false);
+  }
 
-  JSONObject valueAsJsonObject(boolean upsert, boolean upsertKeepDeletes);
+  <T> T convert(Schema schema, boolean upsert, boolean upsertKeepDeletes) throws DebeziumException;
 
   JsonNode key();
 
diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamBigqueryChangeConsumer.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamBigqueryChangeConsumer.java
index 08fb7a0..1de97b6 100644
--- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamBigqueryChangeConsumer.java
+++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamBigqueryChangeConsumer.java
@@ -25,6 +25,7 @@
 import jakarta.inject.Named;
 import org.eclipse.microprofile.config.inject.ConfigProperty;
 import org.json.JSONArray;
+import org.json.JSONObject;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -90,7 +91,7 @@ public class StreamBigqueryChangeConsumer extends BaseChangeConsumer {
 
   @PostConstruct
   void connect() throws InterruptedException {
-    this.initizalize();
+    this.initialize();
   }
 
   @PreDestroy
@@ -105,8 +106,8 @@ void closeStreams() {
     }
   }
 
-  public void initizalize() throws InterruptedException {
-    super.initizalize();
+  public void initialize() throws InterruptedException {
+    super.initialize();
 
     bqClient = ConsumerUtil.bigqueryClient(isBigqueryDevEmulator, gcpProject, bqDataset, credentialsFile, bqLocation, bigQueryCustomHost);
     timePartitioning =
@@ -134,8 +135,17 @@ private StreamDataWriter getDataWriter(Table table) {
     }
   }
 
+  boolean doTableHasPrimaryKey(Table table) {
+    if (table.getTableConstraints() == null) {
+      return false;
+    }
+
+    return table.getTableConstraints().getPrimaryKey() != null;
+
+  }
+
   @Override
-  public long uploadDestination(String destination, List<BaseRecordConverter> data) {
+  public long uploadDestination(String destination, List<RecordConverter> data) {
     long numRecords = data.size();
     Table table = getTable(destination, data.get(0));
     // get stream writer create if not yet exists!
@@ -145,13 +155,18 @@ public long uploadDestination(String destination, List<BaseRecordConverter> data
       // for the tables without primary key run append mode
      // Otherwise it throws Exception
      // INVALID_ARGUMENT:Create UPSERT stream is not supported for primary key disabled table: xyz
-      final boolean tableHasPrimaryKey = table.getTableConstraints().getPrimaryKey() != null;
-      if (upsert && tableHasPrimaryKey) {
+      final boolean tableHasPrimaryKey = doTableHasPrimaryKey(table);
+      final boolean doUpsert = upsert && tableHasPrimaryKey;
+
+      if (doUpsert) {
        data = deduplicateBatch(data);
       }
       // add data to JSONArray
       JSONArray jsonArr = new JSONArray();
-      data.forEach(e -> jsonArr.put(e.valueAsJsonObject(upsert && tableHasPrimaryKey, upsertKeepDeletes)));
+      data.forEach(e -> {
+        JSONObject val = e.convert(table.getDefinition().getSchema(), doUpsert, upsertKeepDeletes);
+        jsonArr.put(val);
+      });
       writer.appendSync(jsonArr);
     } catch (DescriptorValidationException | IOException e) {
       throw new DebeziumException("Failed to append data to stream " + writer.streamWriter.getStreamName(), e);
@@ -161,9 +176,9 @@ public long uploadDestination(String destination, List<BaseRecordConverter> data
   }
 
-  protected List<BaseRecordConverter> deduplicateBatch(List<BaseRecordConverter> events) {
+  protected List<RecordConverter> deduplicateBatch(List<RecordConverter> events) {
 
-    ConcurrentHashMap<JsonNode, BaseRecordConverter> deduplicatedEvents = new ConcurrentHashMap<>();
+    ConcurrentHashMap<JsonNode, RecordConverter> deduplicatedEvents = new ConcurrentHashMap<>();
 
     events.forEach(e ->
         // deduplicate using key(PK)
@@ -235,7 +250,7 @@ private Table createTable(TableId tableId, Schema schema, Clustering clustering,
     return table;
   }
 
-  private Table getTable(String destination, BaseRecordConverter sampleBqEvent) {
+  private Table getTable(String destination, RecordConverter sampleBqEvent) {
     TableId tableId = getTableId(destination);
     Table table = bqClient.getTable(tableId);
     // create table if missing
@@ -297,7 +312,7 @@ private Table updateTableSchema(Table table, Schema updatedSchema, String destin
   }
 
-  public BaseRecordConverter eventAsRecordConverter(ChangeEvent<Object, Object> e) throws IOException {
+  public RecordConverter eventAsRecordConverter(ChangeEvent<Object, Object> e) throws IOException {
     return new StreamRecordConverter(e.destination(),
         valDeserializer.deserialize(e.destination(), getBytes(e.value())),
         e.key() == null ? null : keyDeserializer.deserialize(e.destination(), getBytes(e.key())),
diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamRecordConverter.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamRecordConverter.java
index 6af1f24..0bfc3f3 100644
--- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamRecordConverter.java
+++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamRecordConverter.java
@@ -8,7 +8,13 @@
 package io.debezium.server.bigquery;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
+import com.google.cloud.bigquery.Schema;
+import io.debezium.DebeziumException;
+import org.json.JSONObject;
+
+import java.util.Map;
 
 /**
  * @author Ismail Simsek
@@ -18,4 +24,45 @@ public class StreamRecordConverter extends BaseRecordConverter {
   public StreamRecordConverter(String destination, JsonNode value, JsonNode key, JsonNode valueSchema, JsonNode keySchema) {
     super(destination, value, key, valueSchema, keySchema);
   }
+
+  /**
+   * Used by `bigquerystream` {@link StreamBigqueryChangeConsumer} consumer.
+   * See https://cloud.google.com/bigquery/docs/write-api#data_type_conversions
+   *
+   * @param upsert when set to true it adds change type column `_CHANGE_TYPE`. Otherwise, all events are considered as insert/append
+   * @param upsertKeepDeletes when set to true it retains last deleted data row
+   * @return returns Debezium events as {@link JSONObject}
+   */
+  @Override
+  public JSONObject convert(Schema schema, boolean upsert, boolean upsertKeepDeletes) throws DebeziumException {
+    Map<String, Object> jsonMap = mapper.convertValue(value, new TypeReference<>() {
+    });
+    // SET UPSERT meta field `_CHANGE_TYPE`! this additional field allows us to do deletes, updates in bigquery
+    if (upsert) {
+      // if its deleted row and upsertKeepDeletes = false, deleted records are deleted from target table
+      if (!upsertKeepDeletes && jsonMap.get("__op").equals("d")) {
+        jsonMap.put(CHANGE_TYPE_PSEUDO_COLUMN, "DELETE");
+      } else {
+        // if it's not deleted row or upsertKeepDeletes = true then add deleted record to target table
+        jsonMap.put(CHANGE_TYPE_PSEUDO_COLUMN, "UPSERT");
+      }
+    }
+
+    // fix the TS_MS fields.
+    TS_MS_FIELDS.forEach(tsf -> {
+      if (jsonMap.containsKey(tsf)) {
+        // Convert millisecond to microseconds
+        jsonMap.replace(tsf, ((Long) jsonMap.get(tsf) * 1000L));
+      }
+    });
+
+    // Fix boolean fields, create this fields as BOOLEAN instead of STRING in BigQuery
+    BOOLEAN_FIELDS.forEach(bf -> {
+      if (jsonMap.containsKey(bf)) {
+        jsonMap.replace(bf, Boolean.valueOf((String) jsonMap.get(bf)));
+      }
+    });
+
+    return new JSONObject(jsonMap);
+  }
 }
diff --git a/debezium-server-bigquery-sinks/src/test/java/io/debezium/server/bigquery/StreamBigqueryChangeConsumerMysqlUpsertTest.java b/debezium-server-bigquery-sinks/src/test/java/io/debezium/server/bigquery/StreamBigqueryChangeConsumerMysqlUpsertTest.java
index eaf8b56..f2cbb47 100644
--- a/debezium-server-bigquery-sinks/src/test/java/io/debezium/server/bigquery/StreamBigqueryChangeConsumerMysqlUpsertTest.java
+++ b/debezium-server-bigquery-sinks/src/test/java/io/debezium/server/bigquery/StreamBigqueryChangeConsumerMysqlUpsertTest.java
@@ -130,14 +130,14 @@ && getTableData(dest, "__deleted = false").getTotalRows() == 4
 
   @Test
   public void testDeduplicateBatch() throws Exception {
-    BaseRecordConverter e1 = new RecordConverterBuilder()
+    RecordConverter e1 = new RecordConverterBuilder()
         .destination("destination")
         .addKeyField("id", 1)
         .addKeyField("first_name", "row1")
         .addField("__op", "r")
         .addField("__source_ts_ms", 3L)
         .build();
-    BaseRecordConverter e2 = new RecordConverterBuilder()
+    RecordConverter e2 = new RecordConverterBuilder()
         .destination("destination")
         .addKeyField("id", 1)
         .addKeyField("first_name", "row1")
@@ -145,26 +145,26 @@ public void testDeduplicateBatch() throws Exception {
         .addField("__source_ts_ms", 1L)
         .build();
 
-    List<BaseRecordConverter> records = List.of(e1, e2);
-    List<BaseRecordConverter> dedups = consumer.deduplicateBatch(records);
+    List<RecordConverter> records = List.of(e1, e2);
+    List<RecordConverter> dedups = consumer.deduplicateBatch(records);
     Assertions.assertEquals(1, dedups.size());
     Assertions.assertEquals(3L, dedups.get(0).value().get("__source_ts_ms").asLong(0L));
 
-    BaseRecordConverter e21 = new RecordConverterBuilder()
+    RecordConverter e21 = new RecordConverterBuilder()
        .destination("destination")
        .addKeyField("id", 1)
        .addField("__op", "r")
        .addField("__source_ts_ms", 1L)
        .build();
-    BaseRecordConverter e22 = new RecordConverterBuilder()
+    RecordConverter e22 = new RecordConverterBuilder()
        .destination("destination")
        .addKeyField("id", 1)
        .addField("__op", "u")
        .addField("__source_ts_ms", 1L)
        .build();
 
-    List<BaseRecordConverter> records2 = List.of(e21, e22);
-    List<BaseRecordConverter> dedups2 = consumer.deduplicateBatch(records2);
+    List<RecordConverter> records2 = List.of(e21, e22);
+    List<RecordConverter> dedups2 = consumer.deduplicateBatch(records2);
     Assertions.assertEquals(1, dedups2.size());
     Assertions.assertEquals("u", dedups2.get(0).value().get("__op").asText("x"));
   }
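Illustrative usage sketch (not part of the patch above; the helper class and method names below are hypothetical): after this refactor both consumers go through the generic RecordConverter.convert(...) introduced in RecordConverter.java instead of the removed valueAsJsonLine/valueAsJsonObject methods. A BatchRecordConverter is expected to return the event as a single-line JSON String for the load-job path, while a StreamRecordConverter returns an org.json.JSONObject for the Storage Write API path, optionally tagged with the _CHANGE_TYPE pseudo column.

package io.debezium.server.bigquery;

import com.google.cloud.bigquery.Schema;
import org.json.JSONObject;

// Hypothetical helper, only to show the two call sites of the new generic convert(...) API.
class RecordConverterUsageSketch {

  // Batch path (BatchBigqueryChangeConsumer): the default overload convert(schema) is convert(schema, false, false);
  // the caller picks String as the target type and BatchRecordConverter produces a single-line JSON string.
  static String asJsonLine(RecordConverter converter, Schema tableSchema) {
    return converter.convert(tableSchema);
  }

  // Stream path (StreamBigqueryChangeConsumer): StreamRecordConverter produces an org.json.JSONObject;
  // with upsert=true the _CHANGE_TYPE pseudo column is set to UPSERT or DELETE.
  static JSONObject asJsonObject(RecordConverter converter, Schema tableSchema,
                                 boolean upsert, boolean upsertKeepDeletes) {
    return converter.convert(tableSchema, upsert, upsertKeepDeletes);
  }
}

The target type is chosen by the caller, so each consumer has to be paired with the matching converter implementation; passing a StreamRecordConverter to asJsonLine would fail at runtime with a ClassCastException.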