diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseBigqueryStorageConfig.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseBigqueryStorageConfig.java
index 5568e4b..64a5523 100644
--- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseBigqueryStorageConfig.java
+++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseBigqueryStorageConfig.java
@@ -9,7 +9,11 @@
import java.util.Optional;
import java.util.Properties;
-
+/**
+ * Abstract base class for BigQuery storage configuration.
+ *
+ * Provides common methods for accessing Debezium configuration properties.
+ */
public abstract class BaseBigqueryStorageConfig {
protected static final Logger LOG = LoggerFactory.getLogger(BaseBigqueryStorageConfig.class);
protected Properties configCombined = new Properties();
diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseChangeConsumer.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseChangeConsumer.java
index 3ea260e..09fcb74 100644
--- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseChangeConsumer.java
+++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseChangeConsumer.java
@@ -38,7 +38,12 @@
import java.util.stream.Collectors;
/**
- * Implementation of the consumer that delivers the messages into Amazon S3 destination.
+ * Abstract base class for Debezium change consumers that deliver messages to a BigQuery destination.
+ * This class provides a foundation for building Debezium change consumers that handle
+ * incoming change events and deliver them to a BigQuery destination. It implements the
+ * `DebeziumEngine.ChangeConsumer` interface, defining the `handleBatch` method for processing
+ * batches of change events. Concrete implementations of this class need to provide specific logic
+ * for uploading or persisting the converted data to the BigQuery destination.
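+ *
+ * A minimal, illustrative sketch of the `DebeziumEngine.ChangeConsumer` contract this class
+ * builds on (the upload step stands in for the subclass-specific logic):
+ * <pre>{@code
+ * public void handleBatch(List<ChangeEvent<Object, Object>> records,
+ *                         DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer)
+ *     throws InterruptedException {
+ *   for (ChangeEvent<Object, Object> record : records) {
+ *     // convert the event, buffer it per destination table, then upload the buffer
+ *     committer.markProcessed(record);
+ *   }
+ *   committer.markBatchFinished();
+ * }
+ * }</pre>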
*
* @author Ismail Simsek
*/
diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseRecordConverter.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseRecordConverter.java
index 966068d..645b932 100644
--- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseRecordConverter.java
+++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BaseRecordConverter.java
@@ -19,6 +19,12 @@
import java.util.stream.Collectors;
/**
+ * Abstract base class for Debezium event record conversion to BigQuery format.
+ * This class provides the foundation for converting Debezium event records into a format
+ * suitable for writing to BigQuery tables. It handles common tasks like schema conversion,
+ * table constraint generation, and clustering configuration. Concrete implementations of this
+ * class can extend this functionality for specific use cases.
+ *
* @author Ismail Simsek
*/
public abstract class BaseRecordConverter implements RecordConverter {
diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchBigqueryChangeConsumer.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchBigqueryChangeConsumer.java
index b935066..3d0e9b1 100644
--- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchBigqueryChangeConsumer.java
+++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchBigqueryChangeConsumer.java
@@ -29,10 +29,14 @@
import java.util.Optional;
/**
- * Implementation of the consumer that delivers the messages into Amazon S3 destination.
+ * Implementation of a Debezium change consumer that delivers batches of events to BigQuery tables.
+ * This class extends the `BaseChangeConsumer` and provides functionality for uploading batches of
+ * Debezium change events to BigQuery tables. It leverages the BigQuery Java client library
+ * to perform data loading and table management tasks.
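+ *
+ * A rough sketch of the kind of load job issued per destination table (identifiers such as
+ * `tableId`, `bigQuery` and `newlineDelimitedJson` are illustrative, not actual fields):
+ * <pre>{@code
+ * WriteChannelConfiguration writeConfig = WriteChannelConfiguration.newBuilder(tableId)
+ *     .setFormatOptions(FormatOptions.json())
+ *     .setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND)
+ *     .build();
+ * try (TableDataWriteChannel channel = bigQuery.writer(writeConfig)) {
+ *   channel.write(ByteBuffer.wrap(newlineDelimitedJson.getBytes(StandardCharsets.UTF_8)));
+ * }
+ * }</pre>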
*
* @author Ismail Simsek
*/
+
@Named("bigquerybatch")
@Dependent
public class BatchBigqueryChangeConsumer
+ * This method transforms a Debezium event into a format suitable for writing to BigQuery.
+ * It performs value conversions based on the specified BigQuery schema to ensure compatibility
+ * with the target data types.
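+ *
+ * A hypothetical usage sketch (assuming `converter` wraps a single event; the concrete return
+ * type is implementation-specific):
+ * <pre>{@code
+ * Schema schema = converter.tableSchema(false);
+ * Object row = converter.convert(schema, true, false);
+ * }</pre>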
+ *
+ * @param schema The BigQuery schema to use for data type conversions.
+ * @param upsert Indicates whether to enable upsert operations. When set to `true`, a
+ * `_CHANGE_TYPE` column is added to track record changes (insert, update, delete).
+ * @param upsertKeepDeletes When set to `true` in conjunction with `upsert`, the last deleted
+ * row is retained.
+ * @param
+ * This method analyzes the provided Debezium event's key schema and generates a
+ * corresponding BigQuery table constraints configuration. The primary key constraint
+ * is derived from the fields present in the key schema. This configuration can be
+ * used to optimize table storage and query performance in BigQuery.
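+ *
+ * Illustrative sketch of applying the generated constraints to the destination table (assumes a
+ * google-cloud-bigquery version that exposes `setTableConstraints` on the table builder):
+ * <pre>{@code
+ * bigQuery.update(table.toBuilder().setTableConstraints(converter.tableConstraints()).build());
+ * }</pre>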
+ *
+ * @return The generated BigQuery table {@link TableConstraints}.
+ */
TableConstraints tableConstraints();
+
+ /**
+ * Determines the clustering fields for the BigQuery table.
+ *
+ * This method extracts field names from the Debezium event's key schema and combines them
+ * with a user-provided clustering field (defaulting to `__source_ts_ms`) to create a suitable
+ * clustering configuration for the BigQuery table. Clustering the table on these fields
+ * can significantly improve query performance, especially for time-series and analytical workloads.
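+ *
+ * Illustrative sketch of using the generated clustering in a table definition:
+ * <pre>{@code
+ * StandardTableDefinition tableDefinition = StandardTableDefinition.newBuilder()
+ *     .setSchema(converter.tableSchema(false))
+ *     .setClustering(converter.tableClustering("__source_ts_ms"))
+ *     .build();
+ * }</pre>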
+ *
+ * @param clusteringField The additional field to use for clustering, beyond the fields
+ * extracted from the key schema.
+ * @return The generated BigQuery {@link Clustering} configuration.
+ */
Clustering tableClustering(String clusteringField);
+
+ /**
+ * Transforms a Debezium event's value schema into a corresponding BigQuery schema.
+ *
+ * This method analyzes the provided Debezium event's value schema and generates a
+ * compatible BigQuery schema. It considers factors like field types and
+ * the `binaryAsString` flag to accurately map the schema.
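+ *
+ * Illustrative sketch of creating a destination table from the generated schema (dataset and
+ * table names are placeholders; `bigQuery` is an assumed client instance):
+ * <pre>{@code
+ * Schema schema = converter.tableSchema(false);
+ * TableId tableId = TableId.of("my_dataset", "my_table");
+ * bigQuery.create(TableInfo.of(tableId, StandardTableDefinition.of(schema)));
+ * }</pre>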
+ *
+ * @param binaryAsString Indicates whether binary fields should be represented as strings
+ * or binary types in the resulting BigQuery schema.
+ * @return The generated BigQuery {@link Schema}.
+ */
Schema tableSchema(Boolean binaryAsString);
}
diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamBigqueryChangeConsumer.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamBigqueryChangeConsumer.java
index 1de97b6..19be510 100644
--- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamBigqueryChangeConsumer.java
+++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamBigqueryChangeConsumer.java
@@ -36,10 +36,14 @@
import java.util.stream.Collectors;
/**
- * Implementation of the consumer that delivers the messages to Bigquery
+ * Implementation of a Debezium change consumer that delivers events to BigQuery tables in a streaming manner.
+ * This class extends the `BaseChangeConsumer` and provides functionality for streaming batches of
+ * Debezium change events to BigQuery tables using the BigQuery Write API. It offers features like
+ * upsert, deduplication, and table schema management.
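+ *
+ * When upsert is enabled, rows carry the Storage Write API CDC pseudo-column `_CHANGE_TYPE`;
+ * a purely illustrative row could look like:
+ * <pre>{@code
+ * new JSONObject().put("id", 1).put("first_name", "John").put("_CHANGE_TYPE", "UPSERT");
+ * }</pre>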
*
* @author Ismail Simsek
*/
+
@Named("bigquerystream")
@Dependent
@Beta
diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamDataWriter.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamDataWriter.java
index 9bbe136..6f7b925 100644
--- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamDataWriter.java
+++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/StreamDataWriter.java
@@ -14,7 +14,11 @@
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
-
+/**
+ * Class responsible for writing data to BigQuery in a streaming manner using the BigQuery Write API.
+ * This class creates and manages a {@link JsonStreamWriter} for a specific BigQuery table. It offers functionality
+ * for adding data in JSON format and handling potential errors during the write process.
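+ *
+ * A hedged sketch of the underlying append flow (assumes `writer` is a {@link JsonStreamWriter}
+ * created for the table's default write stream):
+ * <pre>{@code
+ * JSONArray rows = new JSONArray();
+ * rows.put(new JSONObject().put("id", 1));
+ * AppendRowsResponse response = writer.append(rows).get();
+ * }</pre>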
+ */
public class StreamDataWriter {
private static final int MAX_RECREATE_COUNT = 3;
private final BigQueryWriteClient client;