
Separate stream and batch event conversion part2 (#205)
* Separate stream and batch event conversion part2

* Separate stream and batch event conversion part2

* Separate stream and batch event conversion part2

* Separate stream and batch event conversion part2

* Separate stream and batch event conversion part2
ismailsimsek authored Nov 14, 2024
1 parent 72f465c commit 168ee8f
Showing 8 changed files with 164 additions and 135 deletions.
BaseChangeConsumer.java
@@ -67,7 +67,7 @@ public abstract class BaseChangeConsumer extends io.debezium.server.BaseChangeCo
   Instance<BatchSizeWait> batchSizeWaitInstances;
   BatchSizeWait batchSizeWait;
 
-  public void initizalize() throws InterruptedException {
+  public void initialize() throws InterruptedException {
     // configure and set
     valSerde.configure(Collections.emptyMap(), false);
     valDeserializer = valSerde.deserializer();
@@ -94,7 +94,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
     LOGGER.trace("Received {} events", records.size());
 
     Instant start = Instant.now();
-    Map<String, List<BaseRecordConverter>> events = records.stream()
+    Map<String, List<RecordConverter>> events = records.stream()
         .map((ChangeEvent<Object, Object> e)
             -> {
              try {
@@ -103,20 +103,20 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
                throw new DebeziumException(ex);
              }
            })
-        .collect(Collectors.groupingBy(BaseRecordConverter::destination));
+        .collect(Collectors.groupingBy(RecordConverter::destination));
 
     long numUploadedEvents = 0;
-    for (Map.Entry<String, List<BaseRecordConverter>> destinationEvents : events.entrySet()) {
+    for (Map.Entry<String, List<RecordConverter>> destinationEvents : events.entrySet()) {
       // group list of events by their schema, if in the batch we have schema change events grouped by their schema
       // so with this uniform schema is guaranteed for each batch
-      Map<JsonNode, List<BaseRecordConverter>> eventsGroupedBySchema =
+      Map<JsonNode, List<RecordConverter>> eventsGroupedBySchema =
           destinationEvents.getValue().stream()
-              .collect(Collectors.groupingBy(BaseRecordConverter::valueSchema));
+              .collect(Collectors.groupingBy(RecordConverter::valueSchema));
       LOGGER.debug("Destination {} got {} records with {} different schema!!", destinationEvents.getKey(),
           destinationEvents.getValue().size(),
           eventsGroupedBySchema.keySet().size());
 
-      for (List<BaseRecordConverter> schemaEvents : eventsGroupedBySchema.values()) {
+      for (List<RecordConverter> schemaEvents : eventsGroupedBySchema.values()) {
         numUploadedEvents += this.uploadDestination(destinationEvents.getKey(), schemaEvents);
       }
     }
@@ -144,7 +144,7 @@ protected void logConsumerProgress(long numUploadedEvents) {
     }
   }
 
-  public abstract long uploadDestination(String destination, List<BaseRecordConverter> data);
+  public abstract long uploadDestination(String destination, List<RecordConverter> data);
 
-  public abstract BaseRecordConverter eventAsRecordConverter(ChangeEvent<Object, Object> e) throws IOException;
+  public abstract RecordConverter eventAsRecordConverter(ChangeEvent<Object, Object> e) throws IOException;
 }
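Reviewer note: the consumer-side edits above are a mechanical rename from BaseRecordConverter to the new RecordConverter interface, but the two-level grouping they preserve is the part worth understanding: records are first grouped by destination table, then sub-grouped by value schema, so each uploadDestination call sees a single uniform schema even when schema-change events land mid-batch. Below is a minimal, self-contained sketch of that pattern; the Event record and GroupingSketch class are hypothetical stand-ins, not repository code.

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupingSketch {
  // Hypothetical stand-in for RecordConverter: exposes destination() and a schema key.
  record Event(String destination, String schema) {}

  public static void main(String[] args) {
    List<Event> batch = List.of(
        new Event("inventory.customers", "v1"),
        new Event("inventory.customers", "v2"), // schema change mid-batch
        new Event("inventory.orders", "v1"));

    // 1) group by destination table, 2) sub-group by value schema, so every
    // upload call receives records with a single, uniform schema.
    Map<String, Map<String, List<Event>>> grouped = batch.stream()
        .collect(Collectors.groupingBy(Event::destination,
            Collectors.groupingBy(Event::schema)));

    grouped.forEach((destination, bySchema) ->
        bySchema.forEach((schema, events) ->
            System.out.printf("upload %d event(s) to %s with schema %s%n",
                events.size(), destination, schema)));
  }
}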
BaseRecordConverter.java
@@ -8,21 +8,14 @@
 
 package io.debezium.server.bigquery;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.cloud.bigquery.*;
-import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.time.Instant;
-import java.time.LocalDate;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
@@ -185,91 +178,6 @@ public JsonNode value() {
     return value;
   }
 
-  /**
-   * Used by `bigquerybatch` {@link BatchBigqueryChangeConsumer} consumer.
-   *
-   * @param schema Bigquery table schema
-   * @return returns Debezium event as a single line json string
-   * @throws JsonProcessingException
-   */
-  @Override
-  public String valueAsJsonLine(Schema schema) throws JsonProcessingException {
-
-    if (value == null) {
-      return null;
-    }
-
-    // process JSON fields
-    if (schema != null) {
-      for (Field f : schema.getFields()) {
-        if (f.getType() == LegacySQLTypeName.JSON && value.has(f.getName())) {
-          ((ObjectNode) value).replace(f.getName(), mapper.readTree(value.get(f.getName()).asText("{}")));
-        }
-        // process DATE values
-        if (f.getType() == LegacySQLTypeName.DATE && value.has(f.getName()) && !value.get(f.getName()).isNull()) {
-          ((ObjectNode) value).put(f.getName(), LocalDate.ofEpochDay(value.get(f.getName()).longValue()).toString());
-        }
-      }
-    }
-
-    // Process DEBEZIUM TS_MS values
-    TS_MS_FIELDS.forEach(tsf -> {
-      if (value.has(tsf)) {
-        ((ObjectNode) value).put(tsf, Instant.ofEpochMilli(value.get(tsf).longValue()).toString());
-      }
-    });
-
-    // Process DEBEZIUM BOOLEAN values
-    BOOLEAN_FIELDS.forEach(bf -> {
-      if (value.has(bf)) {
-        ((ObjectNode) value).put(bf, Boolean.valueOf(value.get(bf).asText()));
-      }
-    });
-
-    return mapper.writeValueAsString(value);
-  }
-
-  /**
-   * Used by `bigquerystream` {@link StreamBigqueryChangeConsumer} consumer.
-   * See https://cloud.google.com/bigquery/docs/write-api#data_type_conversions
-   *
-   * @param upsert when set to true it adds change type column `_CHANGE_TYPE`. Otherwise, all events are considered as insert/append
-   * @param upsertKeepDeletes when set to true it retains last deleted data row
-   * @return returns Debezium events as {@link JSONObject}
-   */
-  @Override
-  public JSONObject valueAsJsonObject(boolean upsert, boolean upsertKeepDeletes) {
-    Map<String, Object> jsonMap = mapper.convertValue(value, new TypeReference<>() {
-    });
-    // SET UPSERT meta field `_CHANGE_TYPE`! this additional field allows us to do deletes, updates in bigquery
-    if (upsert) {
-      // if its deleted row and upsertKeepDeletes = false, deleted records are deleted from target table
-      if (!upsertKeepDeletes && jsonMap.get("__op").equals("d")) {
-        jsonMap.put(CHANGE_TYPE_PSEUDO_COLUMN, "DELETE");
-      } else {
-        // if it's not deleted row or upsertKeepDeletes = true then add deleted record to target table
-        jsonMap.put(CHANGE_TYPE_PSEUDO_COLUMN, "UPSERT");
-      }
-    }
-
-    // fix the TS_MS fields.
-    TS_MS_FIELDS.forEach(tsf -> {
-      if (jsonMap.containsKey(tsf)) {
-        // Convert millisecond to microseconds
-        jsonMap.replace(tsf, ((Long) jsonMap.get(tsf) * 1000L));
-      }
-    });
-
-    // Fix boolean fields, create this fields as BOOLEAN instead of STRING in BigQuery
-    BOOLEAN_FIELDS.forEach(bf -> {
-      if (jsonMap.containsKey(bf)) {
-        jsonMap.replace(bf, Boolean.valueOf((String) jsonMap.get(bf)));
-      }
-    });
-
-    return new JSONObject(jsonMap);
-  }
-
   @Override
   public JsonNode key() {
     return key;
@@ -289,9 +197,9 @@ public JsonNode keySchema() {
   @Override
   public TableConstraints tableConstraints() {
     return
-        TableConstraints.newBuilder()
-            .setPrimaryKey(PrimaryKey.newBuilder().setColumns(this.keyFields()).build())
-            .build();
+      TableConstraints.newBuilder()
+        .setPrimaryKey(PrimaryKey.newBuilder().setColumns(this.keyFields()).build())
+        .build();
   }
 
   @Override
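Reviewer note: both methods deleted above reappear in converter subclasses (valueAsJsonLine becomes BatchRecordConverter.convert later in this diff; valueAsJsonObject presumably lands in the stream converter, which this truncated page does not show). Two details in the removed code are easy to miss: the streaming path multiplies Debezium's millisecond timestamps by 1000 because the Storage Write API expects epoch microseconds, and the _CHANGE_TYPE pseudo column is what drives upsert/delete semantics in BigQuery. The sketch below isolates the _CHANGE_TYPE decision with a plain Map standing in for the deserialized event; everything outside the removed code's logic is invented for illustration.

import java.util.HashMap;
import java.util.Map;

public class ChangeTypeSketch {
  static final String CHANGE_TYPE_PSEUDO_COLUMN = "_CHANGE_TYPE";

  // Same decision table as the removed valueAsJsonObject: deletes are dropped
  // from the target only when upsert is on and upsertKeepDeletes is off.
  static void setChangeType(Map<String, Object> row, boolean upsert, boolean upsertKeepDeletes) {
    if (!upsert) {
      return; // append-only mode: no pseudo column, every event is an insert
    }
    if (!upsertKeepDeletes && "d".equals(row.get("__op"))) {
      row.put(CHANGE_TYPE_PSEUDO_COLUMN, "DELETE");
    } else {
      row.put(CHANGE_TYPE_PSEUDO_COLUMN, "UPSERT");
    }
  }

  public static void main(String[] args) {
    Map<String, Object> deleted = new HashMap<>(Map.of("__op", "d", "id", 42));
    setChangeType(deleted, true, false);
    System.out.println(deleted); // contains _CHANGE_TYPE=DELETE
  }
}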
BatchBigqueryChangeConsumer.java
@@ -77,11 +77,11 @@ public class BatchBigqueryChangeConsumer<T> extends BaseChangeConsumer {
 
   @PostConstruct
   void connect() throws InterruptedException {
-    this.initizalize();
+    this.initialize();
   }
 
-  public void initizalize() throws InterruptedException {
-    super.initizalize();
+  public void initialize() throws InterruptedException {
+    super.initialize();
     bqClient = ConsumerUtil.bigqueryClient(isBigqueryDevEmulator, gcpProject, bqDataset, credentialsFile, bqLocation, bigQueryCustomHost);
     timePartitioning =
         TimePartitioning.newBuilder(TimePartitioning.Type.valueOf(partitionType)).setField(partitionField).build();
@@ -96,14 +96,14 @@ public void initizalize() throws InterruptedException {
   }
 
   @Override
-  public long uploadDestination(String destination, List<BaseRecordConverter> data) {
+  public long uploadDestination(String destination, List<RecordConverter> data) {
 
     try {
       Instant start = Instant.now();
       final long numRecords;
       TableId tableId = getTableId(destination);
 
-      BaseRecordConverter sampleEvent = data.get(0);
+      RecordConverter sampleEvent = data.get(0);
       Schema schema = sampleEvent.tableSchema(false);
       if (schema == null) {
         schema = bqClient.getTable(tableId).getDefinition().getSchema();
@@ -127,9 +127,9 @@ public long uploadDestination(String destination, List<BaseRecordConverter> data
       try (TableDataWriteChannel writer = bqClient.writer(wCCBuilder.build())) {
         //Constructs a stream that writes bytes to the given channel.
         try (OutputStream stream = Channels.newOutputStream(writer)) {
-          for (BaseRecordConverter e : data) {
+          for (RecordConverter e : data) {
 
-            final String val = e.valueAsJsonLine(schema);
+            final String val = e.convert(schema);
 
             if (val == null) {
              LOGGER.warn("Null Value received skipping the entry! destination:{} key:{}", destination, getString(e.key()));
@@ -186,7 +186,7 @@ TableId getTableId(String destination) {
     return TableId.of(gcpProject.get(), bqDataset.get(), tableName);
   }
 
-  public BaseRecordConverter eventAsRecordConverter(ChangeEvent<Object, Object> e) throws IOException {
+  public RecordConverter eventAsRecordConverter(ChangeEvent<Object, Object> e) throws IOException {
     return new BatchRecordConverter(e.destination(),
         valDeserializer.deserialize(e.destination(), getBytes(e.value())),
         e.key() == null ? null : keyDeserializer.deserialize(e.destination(), getBytes(e.key())),
BatchRecordConverter.java
@@ -8,7 +8,16 @@
 
 package io.debezium.server.bigquery;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.LegacySQLTypeName;
+import com.google.cloud.bigquery.Schema;
+import io.debezium.DebeziumException;
 
+import java.time.Instant;
+import java.time.LocalDate;
+
 /**
  * @author Ismail Simsek
@@ -18,4 +27,53 @@ public class BatchRecordConverter extends BaseRecordConverter {
   public BatchRecordConverter(String destination, JsonNode value, JsonNode key, JsonNode valueSchema, JsonNode keySchema) {
     super(destination, value, key, valueSchema, keySchema);
   }
+
+  /**
+   * Used by `bigquerybatch` {@link BatchBigqueryChangeConsumer} consumer.
+   *
+   * @param schema Bigquery table schema
+   * @return returns Debezium event as a single line json string
+   * @throws JsonProcessingException
+   */
+  @Override
+  public String convert(Schema schema, boolean upsert, boolean upsertKeepDeletes) throws DebeziumException {
+
+    if (value == null) {
+      return null;
+    }
+
+    try {
+      // process JSON fields
+      if (schema != null) {
+        for (Field f : schema.getFields()) {
+          if (f.getType() == LegacySQLTypeName.JSON && value.has(f.getName())) {
+            ((ObjectNode) value).replace(f.getName(), mapper.readTree(value.get(f.getName()).asText("{}")));
+          }
+          // process DATE values
+          if (f.getType() == LegacySQLTypeName.DATE && value.has(f.getName()) && !value.get(f.getName()).isNull()) {
+            ((ObjectNode) value).put(f.getName(), LocalDate.ofEpochDay(value.get(f.getName()).longValue()).toString());
+          }
+        }
+      }
+
+      // Process DEBEZIUM TS_MS values
+      TS_MS_FIELDS.forEach(tsf -> {
+        if (value.has(tsf)) {
+          ((ObjectNode) value).put(tsf, Instant.ofEpochMilli(value.get(tsf).longValue()).toString());
+        }
+      });
+
+      // Process DEBEZIUM BOOLEAN values
+      BOOLEAN_FIELDS.forEach(bf -> {
+        if (value.has(bf)) {
+          ((ObjectNode) value).put(bf, Boolean.valueOf(value.get(bf).asText()));
+        }
+      });
+
+      return mapper.writeValueAsString(value);
+    } catch (JsonProcessingException e) {
+      throw new DebeziumException(e);
+    }
+  }
+
 }
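Reviewer note: the value conversions this new method applies are easy to sanity-check in isolation. Debezium serializes DATE columns as days since the epoch and its *__ts_ms fields as epoch milliseconds; the batch path rewrites both as ISO strings before writing the JSON line. A quick standalone illustration with arbitrary values:

import java.time.Instant;
import java.time.LocalDate;

public class CoercionSketch {
  public static void main(String[] args) {
    // DATE columns arrive from Debezium as days since the epoch; the batch
    // load wants an ISO date string.
    long epochDay = 19676;
    System.out.println(LocalDate.ofEpochDay(epochDay)); // 2023-11-15

    // __ts_ms-style fields arrive as epoch milliseconds; the JSON line
    // carries an ISO-8601 instant instead.
    long tsMs = 1700000000000L;
    System.out.println(Instant.ofEpochMilli(tsMs)); // 2023-11-14T22:13:20Z
  }
}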
RecordConverter.java
@@ -1,20 +1,21 @@
 package io.debezium.server.bigquery;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.cloud.bigquery.Clustering;
 import com.google.cloud.bigquery.Schema;
 import com.google.cloud.bigquery.TableConstraints;
-import org.json.JSONObject;
+import io.debezium.DebeziumException;
 
 public interface RecordConverter {
   String destination();
 
   JsonNode value();
 
-  String valueAsJsonLine(Schema schema) throws JsonProcessingException;
+  default <T> T convert(Schema schema) throws DebeziumException {
+    return convert(schema, false, false);
+  }
 
-  JSONObject valueAsJsonObject(boolean upsert, boolean upsertKeepDeletes);
+  <T> T convert(Schema schema, boolean upsert, boolean upsertKeepDeletes) throws DebeziumException;
 
   JsonNode key();
 
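Reviewer note: this interface change is the heart of part 2. The two type-specific methods, valueAsJsonLine returning String and valueAsJsonObject returning JSONObject, collapse into one generic convert, which removes the org.json dependency from the interface and lets each consumer keep its own payload type. The trade-off is that T is inferred at the call site and implementations override by erasure with a concrete return type (BatchRecordConverter returns String), which javac flags as an unchecked override. A self-contained sketch of the pattern; Converter, JsonLineConverter, and ConvertSketch are invented illustrations, not repository types.

interface Converter {
  // One generic method replaces the two type-specific ones.
  <T> T convert(boolean upsert);
}

class JsonLineConverter implements Converter {
  // Overrides by erasure with a concrete return type, mirroring how
  // BatchRecordConverter.convert returns String; the unchecked-override
  // warning is the cost of the generic signature.
  @Override
  @SuppressWarnings("unchecked")
  public String convert(boolean upsert) {
    return "{\"id\":1}"; // batch path: one line of JSON per record
  }
}

public class ConvertSketch {
  public static void main(String[] args) {
    Converter c = new JsonLineConverter();
    String line = c.convert(false); // T is inferred as String at the call site
    System.out.println(line);
  }
}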