
Add javadoc code comments #206

Merged · 3 commits · Nov 14, 2024

@@ -9,7 +9,11 @@
import java.util.Optional;
import java.util.Properties;


/**
* Abstract base class for BigQuery storage configuration.
* <p>
* Provides common methods for accessing Debezium configuration properties.
*/
public abstract class BaseBigqueryStorageConfig {
protected static final Logger LOG = LoggerFactory.getLogger(BaseBigqueryStorageConfig.class);
protected Properties configCombined = new Properties();
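
The base config class above pairs with concrete subclasses. A minimal sketch of the pattern, assuming the base class has no abstract members beyond what this hunk shows; the subclass name and the `bigquery.dataset` property key are invented for illustration:

```java
import java.util.Optional;
import java.util.Properties;

// Hypothetical subclass sketch; "bigquery.dataset" is an invented property key.
public class ExampleBigqueryStorageConfig extends BaseBigqueryStorageConfig {
    public ExampleBigqueryStorageConfig(Properties debeziumConfig) {
        configCombined.putAll(debeziumConfig); // inherited combined-Properties holder
    }

    public Optional<String> dataset() {
        return Optional.ofNullable(configCombined.getProperty("bigquery.dataset"));
    }
}
```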

@@ -38,7 +38,12 @@
import java.util.stream.Collectors;

/**
- * Implementation of the consumer that delivers the messages into Amazon S3 destination.
+ * Abstract base class for Debezium change consumers that deliver messages to a BigQuery destination.
+ * <p>This class provides a foundation for building Debezium change consumers that handle
+ * incoming change events and deliver them to a BigQuery destination. It implements the
+ * `DebeziumEngine.ChangeConsumer` interface, defining the `handleBatch` method for processing
+ * batches of change events. Concrete implementations of this class need to provide specific logic
+ * for uploading or persisting the converted data to the BigQuery destination.
*
* @author Ismail Simsek
*/
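
A minimal sketch of the `handleBatch` contract described above, using the `DebeziumEngine.ChangeConsumer` API; the `uploadBatch` helper is a hypothetical stand-in for the destination-specific logic a subclass supplies:

```java
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import java.util.List;

// Sketch only: a concrete consumer converts the batch, uploads it, then commits.
public class SketchChangeConsumer
    implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {

  @Override
  public void handleBatch(List<ChangeEvent<Object, Object>> records,
                          DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer)
      throws InterruptedException {
    uploadBatch(records); // hypothetical destination-specific upload
    for (ChangeEvent<Object, Object> record : records) {
      committer.markProcessed(record); // acknowledge each event
    }
    committer.markBatchFinished(); // mark the whole batch complete
  }

  private void uploadBatch(List<ChangeEvent<Object, Object>> records) {
    // convert and persist the events to BigQuery here
  }
}
```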

@@ -19,6 +19,12 @@
import java.util.stream.Collectors;

/**
* Abstract base class for Debezium event record conversion to BigQuery format.
* <p>This class provides the foundation for converting Debezium event records into a format
* suitable for writing to BigQuery tables. It handles common tasks like schema conversion,
* table constraint generation, and clustering configuration. Concrete implementations of this
* class can extend this functionality for specific use cases.
*
* @author Ismail Simsek
*/
public abstract class BaseRecordConverter implements RecordConverter {
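
The schema-conversion task mentioned above boils down to mapping Debezium field types onto BigQuery types. A hedged sketch of that per-field mapping; the cases shown are illustrative, not the converter's actual mapping table:

```java
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.StandardSQLTypeName;

// Illustrative type mapping; the real converter covers many more Debezium types.
static Field toBigqueryField(String name, String debeziumType) {
  StandardSQLTypeName type = switch (debeziumType) {
    case "int8", "int16", "int32", "int64" -> StandardSQLTypeName.INT64;
    case "float32", "float64" -> StandardSQLTypeName.FLOAT64;
    case "boolean" -> StandardSQLTypeName.BOOL;
    default -> StandardSQLTypeName.STRING;
  };
  return Field.of(name, type);
}
```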

@@ -29,10 +29,14 @@
import java.util.Optional;

/**
- * Implementation of the consumer that delivers the messages into Amazon S3 destination.
+ * Implementation of a Debezium change consumer that delivers batches of events to BigQuery tables.
+ * <p>This class extends the `BaseChangeConsumer` and provides functionality for uploading batches of
+ * Debezium change events to BigQuery tables. It leverages the BigQuery Java client library
+ * to perform data loading and table management tasks.
*
* @author Ismail Simsek
*/

@Named("bigquerybatch")
@Dependent
public class BatchBigqueryChangeConsumer<T> extends BaseChangeConsumer {
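
Given the `@Named("bigquerybatch")` qualifier, Debezium Server selects this consumer via `debezium.sink.type=bigquerybatch`. For orientation, a hedged sketch of the batch-load pattern the javadoc refers to, using the BigQuery Java client's write channel; the dataset and table names are invented, and this is not the class's actual implementation:

```java
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;

// Sketch of a newline-delimited JSON load job; names are placeholders.
static void loadBatch(BigQuery bigquery, String ndjson) throws Exception {
  TableId tableId = TableId.of("my_dataset", "my_table");
  WriteChannelConfiguration config = WriteChannelConfiguration.newBuilder(tableId)
      .setFormatOptions(FormatOptions.json()) // newline-delimited JSON payload
      .build();
  TableDataWriteChannel channel = bigquery.writer(config);
  try (OutputStream out = Channels.newOutputStream(channel)) {
    out.write(ndjson.getBytes(StandardCharsets.UTF_8));
  }
  Job job = channel.getJob().waitFor(); // block until the load job finishes
  if (job.getStatus().getError() != null) {
    throw new RuntimeException(job.getStatus().getError().getMessage());
  }
}
```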

@@ -38,6 +38,8 @@
import java.util.Optional;

/**
* Utility class for common BigQuery operations in Debezium Server.
*
* @author Ismail Simsek
*/
public class ConsumerUtil {

@@ -5,6 +5,7 @@
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.TableConstraints;
import io.debezium.DebeziumException;
import org.json.JSONObject;

public interface RecordConverter {
String destination();
@@ -15,6 +16,24 @@ default <T> T convert(Schema schema) throws DebeziumException {
return convert(schema, false, false);
}

/**
* Converts a Debezium event for BigQuery ingestion.
*
* <p>This method transforms a Debezium event into a format suitable for writing to BigQuery.
* It performs value conversions based on the specified BigQuery schema to ensure compatibility
* with the target data types.
*
* @param schema The BigQuery schema to use for data type conversions.
* @param upsert Indicates whether to enable upsert operations. When set to `true`, a
* `_CHANGE_TYPE` column is added to track record changes (insert, update, delete).
* @param upsertKeepDeletes When set to `true` in conjunction with `upsert`, the last deleted
* row is retained.
* @param <T> The return type of the converted event. This can be a serialized JSON string
* for batch operations or a {@link JSONObject} for stream operations, depending on the
* specific implementation.
* @return The converted Debezium event, ready for BigQuery ingestion.
* @throws DebeziumException If an error occurs during the conversion process.
*/
<T> T convert(Schema schema, boolean upsert, boolean upsertKeepDeletes) throws DebeziumException;

JsonNode key();
@@ -23,9 +42,45 @@ default <T> T convert(Schema schema) throws DebeziumException {

JsonNode keySchema();


/**
* Extracts table constraints from the Debezium event's key schema.
*
* <p>This method analyzes the provided Debezium event's key schema and generates a
* corresponding BigQuery table constraints configuration. The primary key constraint
* is derived from the fields present in the key schema. This configuration can be
* used to optimize table storage and query performance in BigQuery.
*
* @return The generated BigQuery {@link TableConstraints}.
*/
TableConstraints tableConstraints();


/**
* Determines the clustering fields for the BigQuery table.
*
* <p>This method extracts field names from the Debezium event's key schema and combines them
* with a user-provided clustering field (defaulting to `__source_ts_ms`) to create a suitable
* clustering configuration for the BigQuery table. Clustering the table on these fields
* can significantly improve query performance, especially for time-series and analytical workloads.
*
* @param clusteringField The additional field to use for clustering, beyond the fields
* extracted from the key schema.
* @return The generated BigQuery {@link Clustering} configuration.
*/
Clustering tableClustering(String clusteringField);


/**
* Transforms a Debezium event's value schema into a corresponding BigQuery schema.
*
* <p>This method analyzes the provided Debezium event's value schema and generates a
* compatible BigQuery schema. It considers factors like field types and
* the `binaryAsString` flag to accurately map the schema.
*
* @param binaryAsString Indicates whether binary fields should be represented as strings
* or binary types in the resulting BigQuery schema.
* @return The generated BigQuery {@link Schema}.
*/
Schema tableSchema(Boolean binaryAsString);
}
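
Taken together, the interface supports a flow like the following hedged sketch; `converter` stands for a concrete implementation wrapping one Debezium event, and the clustering field mirrors the `__source_ts_ms` default mentioned above:

```java
import com.google.cloud.bigquery.Clustering;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.TableConstraints;

// Hypothetical driver showing the intended call order on a converted event.
static void sketchUsage(RecordConverter converter) {
  Schema schema = converter.tableSchema(true); // binary fields mapped to STRING
  TableConstraints constraints = converter.tableConstraints(); // PK from key schema
  Clustering clustering = converter.tableClustering("__source_ts_ms");

  // upsert=true adds a _CHANGE_TYPE column; upsertKeepDeletes=true keeps the last delete
  String row = converter.convert(schema, true, true);
  // ... create or patch the destination table with schema/constraints/clustering,
  // then hand `row` to the writer.
}
```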

@@ -36,10 +36,14 @@
import java.util.stream.Collectors;

/**
- * Implementation of the consumer that delivers the messages to Bigquery
+ * Implementation of a Debezium change consumer that delivers events to BigQuery tables in a streaming manner.
+ * <p>This class extends the `BaseChangeConsumer` and provides functionality for streaming batches of
+ * Debezium change events to BigQuery tables using the BigQuery Write API. It offers features like
+ * upsert, deduplication, and table schema management.
*
* @author Ismail Simsek
*/

@Named("bigquerystream")
@Dependent
@Beta
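
Given the `@Named("bigquerystream")` qualifier, Debezium Server selects this consumer via `debezium.sink.type=bigquerystream`. The upsert feature relies on the `_CHANGE_TYPE` pseudocolumn described in the `convert` javadoc; a small hedged sketch of the payload shape, where only `_CHANGE_TYPE` and its UPSERT/DELETE values come from BigQuery's CDC support and the rest is invented:

```java
import org.json.JSONObject;

// Sketch: tag a converted row for BigQuery CDC before streaming it.
static JSONObject toUpsertRow(JSONObject row, boolean deleted) {
  row.put("_CHANGE_TYPE", deleted ? "DELETE" : "UPSERT");
  return row;
}
```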

@@ -14,7 +14,11 @@
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;


/**
* Class responsible for writing data to BigQuery in a streaming manner using the BigQuery Write API.
* This class creates and manages a {@link JsonStreamWriter} for a specific BigQuery table. It offers functionality
* for adding data in JSON format and handling potential errors during the write process.
*/
public class StreamDataWriter {
private static final int MAX_RECREATE_COUNT = 3;
private final BigQueryWriteClient client;
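
For orientation, a hedged sketch of the core Write API calls this class wraps; the project, dataset, table, and payload are invented, and the class's retry/recreate handling (cf. `MAX_RECREATE_COUNT`) is omitted:

```java
import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import org.json.JSONArray;
import org.json.JSONObject;

// Minimal default-stream append; real code adds error handling and retries.
static void streamRows(BigQueryWriteClient client) throws Exception {
  TableName table = TableName.of("my-project", "my_dataset", "my_table"); // placeholders
  try (JsonStreamWriter writer = JsonStreamWriter.newBuilder(table.toString(), client).build()) {
    JSONArray rows = new JSONArray().put(new JSONObject().put("id", 1));
    ApiFuture<AppendRowsResponse> future = writer.append(rows); // async append
    future.get(); // block for the sketch
  }
}
```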