Improve code and namings #187

Merged
merged 4 commits on Sep 23, 2024
@@ -8,6 +8,8 @@

package io.debezium.server.bigquery;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
@@ -18,6 +20,14 @@
import io.debezium.util.Clock;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
import jakarta.enterprise.inject.Any;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
@@ -28,17 +38,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.enterprise.inject.Any;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Implementation of the consumer that delivers the messages into Amazon S3 destination.
*
@@ -96,11 +95,11 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
LOGGER.trace("Received {} events", records.size());

Instant start = Instant.now();
Map<String, List<DebeziumBigqueryEvent>> events = records.stream()
Map<String, List<RecordConverter>> events = records.stream()
.map((ChangeEvent<Object, Object> e)
-> {
try {
return new DebeziumBigqueryEvent(e.destination(),
return new RecordConverter(e.destination(),
valDeserializer.deserialize(e.destination(), getBytes(e.value())),
e.key() == null ? null : keyDeserializer.deserialize(e.destination(), getBytes(e.key())),
mapper.readTree(getBytes(e.value())).get("schema"),
@@ -110,20 +109,20 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
throw new DebeziumException(ex);
}
})
.collect(Collectors.groupingBy(DebeziumBigqueryEvent::destination));
.collect(Collectors.groupingBy(RecordConverter::destination));

long numUploadedEvents = 0;
for (Map.Entry<String, List<DebeziumBigqueryEvent>> destinationEvents : events.entrySet()) {
for (Map.Entry<String, List<RecordConverter>> destinationEvents : events.entrySet()) {
// group list of events by their schema, if in the batch we have schema change events grouped by their schema
// so with this uniform schema is guaranteed for each batch
Map<JsonNode, List<DebeziumBigqueryEvent>> eventsGroupedBySchema =
Map<JsonNode, List<RecordConverter>> eventsGroupedBySchema =
destinationEvents.getValue().stream()
.collect(Collectors.groupingBy(DebeziumBigqueryEvent::valueSchema));
.collect(Collectors.groupingBy(RecordConverter::valueSchema));
LOGGER.debug("Destination {} got {} records with {} different schema!!", destinationEvents.getKey(),
destinationEvents.getValue().size(),
eventsGroupedBySchema.keySet().size());

for (List<DebeziumBigqueryEvent> schemaEvents : eventsGroupedBySchema.values()) {
for (List<RecordConverter> schemaEvents : eventsGroupedBySchema.values()) {
numUploadedEvents += this.uploadDestination(destinationEvents.getKey(), schemaEvents);
}
}
@@ -150,7 +149,7 @@ protected void logConsumerProgress(long numUploadedEvents) {
logTimer = Threads.timer(clock, LOG_INTERVAL);
}
}
public abstract long uploadDestination(String destination, List<DebeziumBigqueryEvent> data);

public abstract long uploadDestination(String destination, List<RecordConverter> data);

}
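For readers skimming the diff: the handleBatch flow above groups the incoming change events twice, first by destination and then by value schema, so every uploaded batch carries a uniform schema. A minimal, self-contained sketch of that pattern, using a simplified Event record as a stand-in for RecordConverter, could look like this:

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupingSketch {
  // Stand-in for RecordConverter: only the accessors needed for grouping.
  record Event(String destination, String valueSchema) {}

  // Group by destination, then by value schema, mirroring handleBatch.
  static Map<String, Map<String, List<Event>>> group(List<Event> records) {
    return records.stream()
        .collect(Collectors.groupingBy(Event::destination,
            Collectors.groupingBy(Event::valueSchema)));
  }
}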
@@ -8,7 +8,13 @@

package io.debezium.server.bigquery;

import com.google.cloud.bigquery.*;
import io.debezium.DebeziumException;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.eclipse.microprofile.config.inject.ConfigProperty;

import java.io.IOException;
import java.io.OutputStream;
@@ -21,13 +27,6 @@
import java.util.List;
import java.util.Optional;

import com.google.cloud.bigquery.*;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.eclipse.microprofile.config.inject.ConfigProperty;

/**
* Implementation of the consumer that delivers the messages into Amazon S3 destination.
*
@@ -92,20 +91,20 @@ public void initizalize() throws InterruptedException {
}

@Override
public long uploadDestination(String destination, List<DebeziumBigqueryEvent> data) {
public long uploadDestination(String destination, List<RecordConverter> data) {

try {
Instant start = Instant.now();
final long numRecords;
TableId tableId = getTableId(destination);

DebeziumBigqueryEvent sampleEvent = data.get(0);
Schema schema = sampleEvent.getBigQuerySchema(false, false);
RecordConverter sampleEvent = data.get(0);
Schema schema = sampleEvent.tableSchema(false);
if (schema == null) {
schema = bqClient.getTable(tableId).getDefinition().getSchema();
}

Clustering clustering = sampleEvent.getBigQueryClustering(clusteringField);
Clustering clustering = sampleEvent.tableClustering(clusteringField);

// Google BigQuery Configuration for a load operation. A load configuration can be used to load data
// into a table with a {@link com.google.cloud.WriteChannel}
@@ -123,7 +122,7 @@ public long uploadDestination(String destination, List<DebeziumBigqueryEvent> da
try (TableDataWriteChannel writer = bqClient.writer(wCCBuilder.build())) {
//Constructs a stream that writes bytes to the given channel.
try (OutputStream stream = Channels.newOutputStream(writer)) {
for (DebeziumBigqueryEvent e : data) {
for (RecordConverter e : data) {

final String val = e.valueAsJsonLine(schema);

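The batch consumer above pushes each per-schema batch through a BigQuery load job: it builds a WriteChannelConfiguration with JSON format options and streams newline-delimited JSON through a TableDataWriteChannel. A minimal sketch of that load path, with placeholder dataset and table names rather than values taken from this PR, is shown below:

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.WriteChannelConfiguration;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class LoadJobSketch {
  // Writes a batch of newline-delimited JSON rows into a table via a load job.
  static void load(BigQuery bqClient, List<String> jsonLines) throws IOException {
    TableId tableId = TableId.of("my_dataset", "my_table"); // placeholders, not from the PR
    WriteChannelConfiguration config = WriteChannelConfiguration.newBuilder(tableId)
        .setFormatOptions(FormatOptions.json()) // newline-delimited JSON source format
        .setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND)
        .build();
    try (TableDataWriteChannel writer = bqClient.writer(config);
         OutputStream stream = Channels.newOutputStream(writer)) {
      for (String line : jsonLines) {
        stream.write((line + "\n").getBytes(StandardCharsets.UTF_8));
      }
    }
  }
}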
@@ -8,13 +8,6 @@

package io.debezium.server.bigquery;

import java.time.Instant;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
@@ -25,22 +18,30 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* @author Ismail Simsek
*/
public class DebeziumBigqueryEvent {
protected static final Logger LOGGER = LoggerFactory.getLogger(DebeziumBigqueryEvent.class);
public static final List<String> TS_MS_FIELDS = List.of("__ts_ms", "__source_ts_ms");
public static final List<String> BOOLEAN_FIELDS = List.of("__deleted");
public class RecordConverter {
protected static final Logger LOGGER = LoggerFactory.getLogger(RecordConverter.class);
protected static final List<String> TS_MS_FIELDS = List.of("__ts_ms", "__source_ts_ms");
protected static final List<String> BOOLEAN_FIELDS = List.of("__deleted");
protected static final ObjectMapper mapper = new ObjectMapper();
protected static final String CHANGE_TYPE_PSEUDO_COLUMN = "_CHANGE_TYPE";

protected final String destination;
protected final JsonNode value;
protected final JsonNode key;
protected final JsonNode valueSchema;
protected final JsonNode keySchema;

public DebeziumBigqueryEvent(String destination, JsonNode value, JsonNode key, JsonNode valueSchema, JsonNode keySchema) {
public RecordConverter(String destination, JsonNode value, JsonNode key, JsonNode valueSchema, JsonNode keySchema) {
this.destination = destination;
// @TODO process values. ts_ms values etc...
// TODO add field if exists backward compatible!
@@ -50,8 +51,7 @@
this.keySchema = keySchema;
}

private static ArrayList<Field> getBigQuerySchemaFields(JsonNode schemaNode, Boolean binaryAsString,
boolean isStream) {
private static ArrayList<Field> schemaFields(JsonNode schemaNode, Boolean binaryAsString) {

ArrayList<Field> fields = new ArrayList<>();

@@ -78,20 +78,20 @@
switch (fieldType) {
case "struct":
// recursive call for nested fields
ArrayList<Field> subFields = getBigQuerySchemaFields(jsonSchemaFieldNode, binaryAsString, isStream);
ArrayList<Field> subFields = schemaFields(jsonSchemaFieldNode, binaryAsString);
fields.add(Field.newBuilder(fieldName, StandardSQLTypeName.STRUCT, FieldList.of(subFields)).build());
break;
default:
// default to String type
fields.add(getPrimitiveField(fieldType, fieldName, fieldSemanticType, binaryAsString, isStream));
fields.add(schemaPrimitiveField(fieldType, fieldName, fieldSemanticType, binaryAsString));
break;
}
}

return fields;
}

private static Field getPrimitiveField(String fieldType, String fieldName, String fieldSemanticType, boolean binaryAsString, boolean isStream) {
private static Field schemaPrimitiveField(String fieldType, String fieldName, String fieldSemanticType, boolean binaryAsString) {
switch (fieldType) {
case "int8":
case "int16":
@@ -165,7 +165,7 @@ private static Field getPrimitiveField(String fieldType, String fieldName, Strin

}
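schemaPrimitiveField (the renamed getPrimitiveField) maps Debezium's JSON schema types onto BigQuery StandardSQLTypeName fields; its switch body is collapsed in the diff view above. A rough, illustrative sketch of that kind of mapping, not the project's exact mapping table, is:

import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.StandardSQLTypeName;

public class TypeMappingSketch {
  // Illustrative Debezium-type to BigQuery-type mapping; the real method also
  // handles semantic types, binary-as-string, and more cases than shown here.
  static Field map(String fieldType, String fieldName) {
    switch (fieldType) {
      case "int8":
      case "int16":
      case "int32":
      case "int64":
        return Field.of(fieldName, StandardSQLTypeName.INT64);
      case "float32":
      case "float64":
        return Field.of(fieldName, StandardSQLTypeName.FLOAT64);
      case "boolean":
        return Field.of(fieldName, StandardSQLTypeName.BOOL);
      default:
        return Field.of(fieldName, StandardSQLTypeName.STRING); // default to String
    }
  }
}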

public ArrayList<String> keyFields() {
private ArrayList<String> keyFields() {

ArrayList<String> keyFields = new ArrayList<>();
for (JsonNode jsonSchemaFieldNode : this.keySchema().get("fields")) {
@@ -224,17 +224,17 @@ public String valueAsJsonLine(Schema schema) throws JsonProcessingException {
*
* @return
*/
public JSONObject valueAsJSONObject(boolean upsert, boolean upsertKeepDeletes) {
public JSONObject valueAsJsonObject(boolean upsert, boolean upsertKeepDeletes) {
Map<String, Object> jsonMap = mapper.convertValue(value, new TypeReference<>() {
});
// SET UPSERT meta field `_CHANGE_TYPE`
if (upsert) {
// if its deleted row and upsertKeepDeletes = false, deleted records are deleted from target table
if (!upsertKeepDeletes && jsonMap.get("__op").equals("d")) {
jsonMap.put("_CHANGE_TYPE", "DELETE");
jsonMap.put(CHANGE_TYPE_PSEUDO_COLUMN, "DELETE");
} else {
// if it's not deleted row or upsertKeepDeletes = true then add deleted record to target table
jsonMap.put("_CHANGE_TYPE", "UPSERT");
jsonMap.put(CHANGE_TYPE_PSEUDO_COLUMN, "UPSERT");
}
}

@@ -267,14 +267,14 @@ public JsonNode keySchema() {
}


public TableConstraints getBigQueryTableConstraints() {
public TableConstraints tableConstraints() {
return
TableConstraints.newBuilder()
.setPrimaryKey(PrimaryKey.newBuilder().setColumns(this.keyFields()).build())
.build();
}
public Clustering getBigQueryClustering(String clusteringField) {

public Clustering tableClustering(String clusteringField) {
// special destinations like "heartbeat.topics"
if (this.destination().startsWith("__debezium")) {
return Clustering.newBuilder().build();
@@ -291,8 +291,8 @@ public Clustering getBigQueryClustering(String clusteringField) {
}
}

public Schema getBigQuerySchema(Boolean binaryAsString, boolean isStream) {
ArrayList<Field> fields = getBigQuerySchemaFields(this.valueSchema(), binaryAsString, isStream);
public Schema tableSchema(Boolean binaryAsString) {
ArrayList<Field> fields = schemaFields(this.valueSchema(), binaryAsString);

if (fields.isEmpty()) {
return null;
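valueAsJsonObject (renamed from valueAsJSONObject) tags each row with BigQuery's _CHANGE_TYPE pseudo column when upsert mode is enabled: deleted source rows become DELETE operations unless upsertKeepDeletes is set, in which case they are written as UPSERTs. A simplified stand-alone sketch of that decision, using a plain Map instead of the converter's JsonNode plumbing, is:

import java.util.Map;

public class ChangeTypeSketch {
  static final String CHANGE_TYPE_PSEUDO_COLUMN = "_CHANGE_TYPE";

  // Adds the _CHANGE_TYPE pseudo column based on the Debezium __op field.
  static Map<String, Object> withChangeType(Map<String, Object> row,
                                            boolean upsert, boolean upsertKeepDeletes) {
    if (upsert) {
      boolean isDelete = "d".equals(row.get("__op"));
      // Deleted rows are removed from the target table unless upsertKeepDeletes is true.
      row.put(CHANGE_TYPE_PSEUDO_COLUMN, (!upsertKeepDeletes && isDelete) ? "DELETE" : "UPSERT");
    }
    return row;
  }
}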
@@ -8,19 +8,6 @@

package io.debezium.server.bigquery;

import io.debezium.DebeziumException;
import io.grpc.Status;
import io.grpc.Status.Code;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.api.core.ApiFuture;
import com.google.api.gax.core.FixedCredentialsProvider;
@@ -30,6 +17,9 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import io.debezium.DebeziumException;
import io.grpc.Status;
import io.grpc.Status.Code;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.Dependent;
@@ -38,6 +28,15 @@
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.json.JSONArray;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

/**
* Implementation of the consumer that delivers the messages to Bigquery
*
@@ -145,7 +144,7 @@ private DataWriter getDataWriter(Table table) {
}

@Override
public long uploadDestination(String destination, List<DebeziumBigqueryEvent> data) {
public long uploadDestination(String destination, List<RecordConverter> data) {
long numRecords = data.size();
Table table = getTable(destination, data.get(0));
// get stream writer create if not yet exists!
@@ -161,7 +160,7 @@ public long uploadDestination(String destination, List<DebeziumBigqueryEvent> da
}
// add data to JSONArray
JSONArray jsonArr = new JSONArray();
data.forEach(e -> jsonArr.put(e.valueAsJSONObject(upsert && tableHasPrimaryKey, upsertKeepDeletes)));
data.forEach(e -> jsonArr.put(e.valueAsJsonObject(upsert && tableHasPrimaryKey, upsertKeepDeletes)));
writer.appendSync(jsonArr);
} catch (DescriptorValidationException | IOException e) {
throw new DebeziumException("Failed to append data to stream " + writer.streamWriter.getStreamName(), e);
@@ -171,9 +170,9 @@ public long uploadDestination(String destination, List<DebeziumBigqueryEvent> da
}


protected List<DebeziumBigqueryEvent> deduplicateBatch(List<DebeziumBigqueryEvent> events) {
protected List<RecordConverter> deduplicateBatch(List<RecordConverter> events) {

ConcurrentHashMap<JsonNode, DebeziumBigqueryEvent> deduplicatedEvents = new ConcurrentHashMap<>();
ConcurrentHashMap<JsonNode, RecordConverter> deduplicatedEvents = new ConcurrentHashMap<>();

events.forEach(e ->
// deduplicate using key(PK)
@@ -245,21 +244,21 @@ private Table createTable(TableId tableId, Schema schema, Clustering clustering,
return table;
}

private Table getTable(String destination, DebeziumBigqueryEvent sampleBqEvent) {
private Table getTable(String destination, RecordConverter sampleBqEvent) {
TableId tableId = getTableId(destination);
Table table = bqClient.getTable(tableId);
// create table if missing
if (createIfNeeded && table == null) {
table = this.createTable(tableId,
sampleBqEvent.getBigQuerySchema(true, true),
sampleBqEvent.getBigQueryClustering(clusteringField),
sampleBqEvent.getBigQueryTableConstraints()
sampleBqEvent.tableSchema(true),
sampleBqEvent.tableClustering(clusteringField),
sampleBqEvent.tableConstraints()
);
}

// alter table schema add new fields
if (allowFieldAddition && table != null) {
table = this.updateTableSchema(table, sampleBqEvent.getBigQuerySchema(true, true), destination);
table = this.updateTableSchema(table, sampleBqEvent.tableSchema(true), destination);
}
return table;
}
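The streaming consumer's deduplicateBatch keeps only the last event per primary key before handing a batch to the Storage Write API. A minimal sketch of that idea, with a simplified keyed event standing in for RecordConverter, might be:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DedupSketch {
  // Stand-in for RecordConverter: key() plays the role of the record's primary key.
  record KeyedEvent(String key, String payload) {}

  // Later events for the same key overwrite earlier ones, so only the last change survives.
  static List<KeyedEvent> deduplicate(List<KeyedEvent> events) {
    Map<String, KeyedEvent> latestByKey = new LinkedHashMap<>();
    events.forEach(e -> latestByKey.put(e.key(), e));
    return new ArrayList<>(latestByKey.values());
  }
}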