From ceb038c56dc72a9d0c0eb3a92cc262add91ec928 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Thu, 14 Nov 2024 12:49:47 +0100 Subject: [PATCH 1/3] Separate stream and batch event conversion part3 --- .../server/bigquery/RecordConverter.java | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/RecordConverter.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/RecordConverter.java index ddc9ba1..dd68e3e 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/RecordConverter.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/RecordConverter.java @@ -5,6 +5,7 @@ import com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.TableConstraints; import io.debezium.DebeziumException; +import org.json.JSONObject; public interface RecordConverter { String destination(); @@ -15,6 +16,24 @@ default T convert(Schema schema) throws DebeziumException { return convert(schema, false, false); } + /** + * Converts a Debezium event for BigQuery ingestion. + * + *

<p>This method transforms a Debezium event into a format suitable for writing to BigQuery. + * It performs value conversions based on the specified BigQuery schema to ensure compatibility + * with the target data types. + * + * @param schema The BigQuery schema to use for data type conversions. + * @param upsert Indicates whether to enable upsert operations. When set to `true`, a + * `_CHANGE_TYPE` column is added to track record changes (insert, update, delete). + * @param upsertKeepDeletes When set to `true` in conjunction with `upsert`, the last deleted + * row is retained. + * @param <T> The return type of the converted event. This can be a serialized JSON string + * for batch operations or a {@link JSONObject} for stream operations, depending on the + * specific implementation. + * @return The converted Debezium event, ready for BigQuery ingestion. + * @throws DebeziumException If an error occurs during the conversion process. + */ T convert(Schema schema, boolean upsert, boolean upsertKeepDeletes) throws DebeziumException; JsonNode key(); @@ -23,9 +42,45 @@ default T convert(Schema schema) throws DebeziumException { JsonNode keySchema(); + + /** + * Extracts table constraints from the Debezium event's key schema. + * + *

<p>This method analyzes the provided Debezium event's key schema and generates a + * corresponding BigQuery table constraints configuration. The primary key constraint + * is derived from the fields present in the key schema. This configuration can be + * used to optimize table storage and query performance in BigQuery. + * + * @return The generated BigQuery table {@link TableConstraints}. + */ TableConstraints tableConstraints(); + + /** + * Determines the clustering fields for the BigQuery table. + * + *

<p>This method extracts field names from the Debezium event's key schema and combines them + * with a user-provided clustering field (defaulting to `__source_ts_ms`) to create a suitable + * clustering configuration for the BigQuery table. Clustering the table on these fields + * can significantly improve query performance, especially for time-series and analytical workloads. + * + * @param clusteringField The additional field to use for clustering, beyond the fields + * extracted from the key schema. + * @return The generated BigQuery {@link Clustering} configuration. + */ Clustering tableClustering(String clusteringField); + + /** + * Transforms a Debezium event's value schema into a corresponding BigQuery schema. + * + *

This method analyzes the provided Debezium event's value schema and generates a + * compatible BigQuery schema. It considers factors like field types and + * the `binaryAsString` flag to accurately map the schema. + * + * @param binaryAsString Indicates whether binary fields should be represented as strings + * or binary types in the resulting BigQuery schema. + * @return The generated BigQuery {@link Schema}. + */ Schema tableSchema(Boolean binaryAsString); } From 133986851598925570084acb7223d1a6462a5cb9 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Thu, 14 Nov 2024 13:06:47 +0100 Subject: [PATCH 2/3] Separate stream and batch event conversion part3 --- .../server/bigquery/BaseBigqueryStorageConfig.java | 6 +++++- .../io/debezium/server/bigquery/BaseChangeConsumer.java | 9 +++++++-- .../io/debezium/server/bigquery/BaseRecordConverter.java | 6 ++++++ .../server/bigquery/BatchBigqueryChangeConsumer.java | 8 ++++++-- .../java/io/debezium/server/bigquery/ConsumerUtil.java | 1 + .../server/bigquery/StreamBigqueryChangeConsumer.java | 8 ++++++-- .../io/debezium/server/bigquery/StreamDataWriter.java | 5 +++++ 7 files changed, 36 insertions(+), 7 deletions(-) diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseBigqueryStorageConfig.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseBigqueryStorageConfig.java index 5568e4b..64a5523 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseBigqueryStorageConfig.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseBigqueryStorageConfig.java @@ -9,7 +9,11 @@ import java.util.Optional; import java.util.Properties; - +/** + * Abstract base class for BigQuery storage configuration. + *

<p> + * Provides common methods for accessing Debezium configuration properties. + */ public abstract class BaseBigqueryStorageConfig { protected static final Logger LOG = LoggerFactory.getLogger(BaseBigqueryStorageConfig.class); protected Properties configCombined = new Properties(); diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseChangeConsumer.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseChangeConsumer.java index 3ea260e..196d9d7 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseChangeConsumer.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseChangeConsumer.java @@ -38,8 +38,13 @@ import java.util.stream.Collectors; /** - * Implementation of the consumer that delivers the messages into Amazon S3 destination. - * + * Abstract base class for Debezium change consumers that deliver messages to a BigQuery destination. + *

This class provides a foundation for building Debezium change consumers that handle + * incoming change events and deliver them to a BigQuery destination. It implements the + * `DebeziumEngine.ChangeConsumer` interface, defining the `handleBatch` method for processing + * batches of change events. Concrete implementations of this class need to provide specific logic + * for uploading or persisting the converted data to the BigQuery destination. + * @author Ismail Simsek */ public abstract class BaseChangeConsumer extends io.debezium.server.BaseChangeConsumer implements DebeziumEngine.ChangeConsumer> { diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseRecordConverter.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseRecordConverter.java index 966068d..86f6a55 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseRecordConverter.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseRecordConverter.java @@ -19,6 +19,12 @@ import java.util.stream.Collectors; /** + * Abstract base class for Debezium event record conversion to BigQuery format. + *

This class provides the foundation for converting Debezium event records into a format + * suitable for writing to BigQuery tables. It handles common tasks like schema conversion, + * table constraint generation, and clustering configuration. Concrete implementations of this + * class can extend this functionality for specific use cases. + * @author Ismail Simsek */ public abstract class BaseRecordConverter implements RecordConverter { diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchBigqueryChangeConsumer.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchBigqueryChangeConsumer.java index b935066..e85a03c 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchBigqueryChangeConsumer.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchBigqueryChangeConsumer.java @@ -29,10 +29,14 @@ import java.util.Optional; /** - * Implementation of the consumer that delivers the messages into Amazon S3 destination. - * + * Implementation of a Debezium change consumer that delivers batches of events to BigQuery tables. + *

This class extends the `BaseChangeConsumer` and provides functionality for uploading batches of + * Debezium change events to BigQuery tables. It leverages the BigQuery Java client library + * to perform data loading and table management tasks. + * @author Ismail Simsek */ + @Named("bigquerybatch") @Dependent public class BatchBigqueryChangeConsumer extends BaseChangeConsumer { diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/ConsumerUtil.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/ConsumerUtil.java index 78cb287..c854b2d 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/ConsumerUtil.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/ConsumerUtil.java @@ -38,6 +38,7 @@ import java.util.Optional; /** + * Utility class for common BigQuery operations in Debezium Server. * @author Ismail Simsek */ public class ConsumerUtil { diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamBigqueryChangeConsumer.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamBigqueryChangeConsumer.java index 1de97b6..cacf423 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamBigqueryChangeConsumer.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamBigqueryChangeConsumer.java @@ -36,10 +36,14 @@ import java.util.stream.Collectors; /** - * Implementation of the consumer that delivers the messages to Bigquery - * + * Implementation of a Debezium change consumer that delivers events to BigQuery tables in a streaming manner. + *

This class extends the `BaseChangeConsumer` and provides functionality for streaming batches of + * Debezium change events to BigQuery tables using the BigQuery Write API. It offers features like + * upsert, deduplication, and table schema management. + * @author Ismail Simsek */ + @Named("bigquerystream") @Dependent @Beta diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamDataWriter.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamDataWriter.java index 9bbe136..066737c 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamDataWriter.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamDataWriter.java @@ -14,7 +14,12 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; +/** + * Class responsible for writing data to BigQuery in a streaming manner using the BigQuery Write API. + * This class creates and manages a {@link JsonStreamWriter} writer for a specific BigQuery table. It offers functionality + * for adding data in JSON format and handling potential errors during the write process. 
+ */ public class StreamDataWriter { private static final int MAX_RECREATE_COUNT = 3; private final BigQueryWriteClient client; From c2fde62b93577fbb8f4c13081070b792f5878477 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Thu, 14 Nov 2024 13:08:45 +0100 Subject: [PATCH 3/3] Separate stream and batch event conversion part3 --- .../java/io/debezium/server/bigquery/BaseChangeConsumer.java | 2 +- .../java/io/debezium/server/bigquery/BaseRecordConverter.java | 2 +- .../debezium/server/bigquery/BatchBigqueryChangeConsumer.java | 2 +- .../src/main/java/io/debezium/server/bigquery/ConsumerUtil.java | 1 + .../debezium/server/bigquery/StreamBigqueryChangeConsumer.java | 2 +- .../main/java/io/debezium/server/bigquery/StreamDataWriter.java | 1 - 6 files changed, 5 insertions(+), 5 deletions(-) diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseChangeConsumer.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseChangeConsumer.java index 196d9d7..09fcb74 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseChangeConsumer.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseChangeConsumer.java @@ -44,7 +44,7 @@ * `DebeziumEngine.ChangeConsumer` interface, defining the `handleBatch` method for processing * batches of change events. Concrete implementations of this class need to provide specific logic * for uploading or persisting the converted data to the BigQuery destination. 
- + * * @author Ismail Simsek */ public abstract class BaseChangeConsumer extends io.debezium.server.BaseChangeConsumer implements DebeziumEngine.ChangeConsumer> { diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseRecordConverter.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseRecordConverter.java index 86f6a55..645b932 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseRecordConverter.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseRecordConverter.java @@ -24,7 +24,7 @@ * suitable for writing to BigQuery tables. It handles common tasks like schema conversion, * table constraint generation, and clustering configuration. Concrete implementations of this * class can extend this functionality for specific use cases. - + * * @author Ismail Simsek */ public abstract class BaseRecordConverter implements RecordConverter { diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchBigqueryChangeConsumer.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchBigqueryChangeConsumer.java index e85a03c..3d0e9b1 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchBigqueryChangeConsumer.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchBigqueryChangeConsumer.java @@ -33,7 +33,7 @@ *

This class extends the `BaseChangeConsumer` and provides functionality for uploading batches of * Debezium change events to BigQuery tables. It leverages the BigQuery Java client library * to perform data loading and table management tasks. - + * * @author Ismail Simsek */ diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/ConsumerUtil.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/ConsumerUtil.java index c854b2d..af5c8d8 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/ConsumerUtil.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/ConsumerUtil.java @@ -39,6 +39,7 @@ /** * Utility class for common BigQuery operations in Debezium Server. + * * @author Ismail Simsek */ public class ConsumerUtil { diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamBigqueryChangeConsumer.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamBigqueryChangeConsumer.java index cacf423..19be510 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamBigqueryChangeConsumer.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamBigqueryChangeConsumer.java @@ -40,7 +40,7 @@ *

This class extends the `BaseChangeConsumer` and provides functionality for streaming batches of * Debezium change events to BigQuery tables using the BigQuery Write API. It offers features like * upsert, deduplication, and table schema management. - + * * @author Ismail Simsek */ diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamDataWriter.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamDataWriter.java index 066737c..6f7b925 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamDataWriter.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamDataWriter.java @@ -18,7 +18,6 @@ * Class responsible for writing data to BigQuery in a streaming manner using the BigQuery Write API. * This class creates and manages a {@link JsonStreamWriter} writer for a specific BigQuery table. It offers functionality * for adding data in JSON format and handling potential errors during the write process. - */ public class StreamDataWriter { private static final int MAX_RECREATE_COUNT = 3;