diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseBigqueryStorageConfig.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseBigqueryStorageConfig.java index 5568e4b..64a5523 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseBigqueryStorageConfig.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseBigqueryStorageConfig.java @@ -9,7 +9,11 @@ import java.util.Optional; import java.util.Properties; - +/** + * Abstract base class for BigQuery storage configuration. + *

+ * Provides common methods for accessing Debezium configuration properties. + */ public abstract class BaseBigqueryStorageConfig { protected static final Logger LOG = LoggerFactory.getLogger(BaseBigqueryStorageConfig.class); protected Properties configCombined = new Properties(); diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseChangeConsumer.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseChangeConsumer.java index 3ea260e..09fcb74 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseChangeConsumer.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseChangeConsumer.java @@ -38,7 +38,12 @@ import java.util.stream.Collectors; /** - * Implementation of the consumer that delivers the messages into Amazon S3 destination. + * Abstract base class for Debezium change consumers that deliver messages to a BigQuery destination. + * <p>

+ * This class provides a foundation for building Debezium change consumers that handle + * incoming change events and deliver them to a BigQuery destination. It implements the + * `DebeziumEngine.ChangeConsumer` interface, defining the `handleBatch` method for processing + * batches of change events. Concrete implementations of this class need to provide specific logic + * for uploading or persisting the converted data to the BigQuery destination. * * @author Ismail Simsek */ diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseRecordConverter.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseRecordConverter.java index 966068d..645b932 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseRecordConverter.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseRecordConverter.java @@ -19,6 +19,12 @@ import java.util.stream.Collectors; /** + * Abstract base class for Debezium event record conversion to BigQuery format. + * <p>

+ * This class provides the foundation for converting Debezium event records into a format + * suitable for writing to BigQuery tables. It handles common tasks like schema conversion, + * table constraint generation, and clustering configuration. Concrete implementations of this + * class can extend this functionality for specific use cases. + * * @author Ismail Simsek */ public abstract class BaseRecordConverter implements RecordConverter { diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchBigqueryChangeConsumer.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchBigqueryChangeConsumer.java index b935066..3d0e9b1 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchBigqueryChangeConsumer.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchBigqueryChangeConsumer.java @@ -29,10 +29,14 @@ import java.util.Optional; /** - * Implementation of the consumer that delivers the messages into Amazon S3 destination. + * Implementation of a Debezium change consumer that delivers batches of events to BigQuery tables. + * <p>

+ * This class extends the `BaseChangeConsumer` and provides functionality for uploading batches of + * Debezium change events to BigQuery tables. It leverages the BigQuery Java client library + * to perform data loading and table management tasks. * * @author Ismail Simsek */ + @Named("bigquerybatch") @Dependent public class BatchBigqueryChangeConsumer extends BaseChangeConsumer { diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/ConsumerUtil.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/ConsumerUtil.java index 78cb287..af5c8d8 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/ConsumerUtil.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/ConsumerUtil.java @@ -38,6 +38,8 @@ import java.util.Optional; /** + * Utility class for common BigQuery operations in Debezium Server. + * * @author Ismail Simsek */ public class ConsumerUtil { diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/RecordConverter.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/RecordConverter.java index ddc9ba1..dd68e3e 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/RecordConverter.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/RecordConverter.java @@ -5,6 +5,7 @@ import com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.TableConstraints; import io.debezium.DebeziumException; +import org.json.JSONObject; public interface RecordConverter { String destination(); @@ -15,6 +16,24 @@ default <T> T convert(Schema schema) throws DebeziumException { return convert(schema, false, false); } + /** + * Converts a Debezium event for BigQuery ingestion. + * + * <p>

+ * This method transforms a Debezium event into a format suitable for writing to BigQuery. + * It performs value conversions based on the specified BigQuery schema to ensure compatibility + * with the target data types. + * + * @param schema The BigQuery schema to use for data type conversions. + * @param upsert Indicates whether to enable upsert operations. When set to `true`, a + * `_CHANGE_TYPE` column is added to track record changes (insert, update, delete). + * @param upsertKeepDeletes When set to `true` in conjunction with `upsert`, the last deleted + * row is retained. + * @param <T> The return type of the converted event. This can be a serialized JSON string + * for batch operations or a {@link JSONObject} for stream operations, depending on the + * specific implementation. + * @return The converted Debezium event, ready for BigQuery ingestion. + * @throws DebeziumException If an error occurs during the conversion process. + */ <T> T convert(Schema schema, boolean upsert, boolean upsertKeepDeletes) throws DebeziumException; JsonNode key(); @@ -23,9 +42,45 @@ default <T> T convert(Schema schema) throws DebeziumException { JsonNode keySchema(); + + /** + * Extracts table constraints from the Debezium event's key schema. + * + * <p>

+ * This method analyzes the provided Debezium event's key schema and generates a + * corresponding BigQuery table constraints configuration. The primary key constraint + * is derived from the fields present in the key schema. This configuration can be + * used to optimize table storage and query performance in BigQuery. + * + * @return The generated BigQuery table {@link TableConstraints}. + */ TableConstraints tableConstraints(); + + /** + * Determines the clustering fields for the BigQuery table. + * + * <p>

+ * This method extracts field names from the Debezium event's key schema and combines them + * with a user-provided clustering field (defaulting to `__source_ts_ms`) to create a suitable + * clustering configuration for the BigQuery table. Clustering the table on these fields + * can significantly improve query performance, especially for time-series and analytical workloads. + * + * @param clusteringField The additional field to use for clustering, beyond the fields + * extracted from the key schema. + * @return The generated BigQuery {@link Clustering} configuration. + */ Clustering tableClustering(String clusteringField); + + /** + * Transforms a Debezium event's value schema into a corresponding BigQuery schema. + * + * <p>

+ * This method analyzes the provided Debezium event's value schema and generates a + * compatible BigQuery schema. It considers factors like field types and + * the `binaryAsString` flag to accurately map the schema. + * + * @param binaryAsString Indicates whether binary fields should be represented as strings + * or binary types in the resulting BigQuery schema. + * @return The generated BigQuery {@link Schema}. + */ Schema tableSchema(Boolean binaryAsString); } diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamBigqueryChangeConsumer.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamBigqueryChangeConsumer.java index 1de97b6..19be510 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamBigqueryChangeConsumer.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamBigqueryChangeConsumer.java @@ -36,10 +36,14 @@ import java.util.stream.Collectors; /** - * Implementation of the consumer that delivers the messages to Bigquery + * Implementation of a Debezium change consumer that delivers events to BigQuery tables in a streaming manner. + * <p>

+ * This class extends the `BaseChangeConsumer` and provides functionality for streaming batches of + * Debezium change events to BigQuery tables using the BigQuery Write API. It offers features like + * upsert, deduplication, and table schema management. * * @author Ismail Simsek */ + @Named("bigquerystream") @Dependent @Beta diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamDataWriter.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamDataWriter.java index 9bbe136..6f7b925 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamDataWriter.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamDataWriter.java @@ -14,7 +14,11 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; - +/** + * Class responsible for writing data to BigQuery in a streaming manner using the BigQuery Write API. + * This class creates and manages a {@link JsonStreamWriter} for a specific BigQuery table. It offers functionality + * for adding data in JSON format and handling potential errors during the write process. + */ public class StreamDataWriter { private static final int MAX_RECREATE_COUNT = 3; private final BigQueryWriteClient client;