Add javadoc code comments (#206)
* Separate stream and batch event conversion part3
ismailsimsek authored Nov 14, 2024
1 parent 168ee8f commit 6c94a08
Showing 8 changed files with 89 additions and 5 deletions.
@@ -9,7 +9,11 @@
import java.util.Optional;
import java.util.Properties;


+/**
+ * Abstract base class for BigQuery storage configuration.
+ * <p>
+ * Provides common methods for accessing Debezium configuration properties.
+ */
public abstract class BaseBigqueryStorageConfig {
protected static final Logger LOG = LoggerFactory.getLogger(BaseBigqueryStorageConfig.class);
protected Properties configCombined = new Properties();
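For context on the pattern this base class establishes, here is a minimal standalone sketch of a config accessor over combined Debezium properties; the class and method names below are illustrative assumptions, not code from this commit:

    import java.util.Optional;
    import java.util.Properties;

    // Minimal sketch of the config-access pattern, assuming a combined
    // Properties object like the configCombined field shown above.
    abstract class StorageConfigSketch {
      protected final Properties configCombined = new Properties();

      // Common accessor: read an optional property, falling back to a default.
      protected String getString(String key, String defaultValue) {
        return Optional.ofNullable(configCombined.getProperty(key)).orElse(defaultValue);
      }
    }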
@@ -38,7 +38,12 @@
import java.util.stream.Collectors;

/**
- * Implementation of the consumer that delivers the messages into Amazon S3 destination.
+ * Abstract base class for Debezium change consumers that deliver messages to a BigQuery destination.
+ * <p>This class provides a foundation for building Debezium change consumers that handle
+ * incoming change events. It implements the `DebeziumEngine.ChangeConsumer` interface,
+ * defining the `handleBatch` method for processing batches of change events. Concrete
+ * implementations need to provide the specific logic for uploading or persisting the
+ * converted data to the BigQuery destination.
*
* @author Ismail Simsek
*/
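As a rough sketch of the `DebeziumEngine.ChangeConsumer` contract this base class implements (the class name below is hypothetical; the upload step is exactly what concrete subclasses supply):

    import io.debezium.engine.ChangeEvent;
    import io.debezium.engine.DebeziumEngine;
    import java.util.List;

    // Sketch of the handleBatch contract: process each event, acknowledge it,
    // then mark the batch finished so offsets can be committed.
    class ConsumerSketch implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
      @Override
      public void handleBatch(List<ChangeEvent<Object, Object>> records,
                              DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer)
          throws InterruptedException {
        for (ChangeEvent<Object, Object> record : records) {
          // A concrete consumer converts and uploads the event here.
          committer.markProcessed(record);
        }
        committer.markBatchFinished();
      }
    }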
@@ -19,6 +19,12 @@
import java.util.stream.Collectors;

/**
+ * Abstract base class for Debezium event record conversion to BigQuery format.
+ * <p>This class provides the foundation for converting Debezium event records into a format
+ * suitable for writing to BigQuery tables. It handles common tasks like schema conversion,
+ * table constraint generation, and clustering configuration. Concrete implementations of this
+ * class can extend this functionality for specific use cases.
+ *
* @author Ismail Simsek
*/
public abstract class BaseRecordConverter implements RecordConverter {
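To make the schema-conversion task concrete, a converter of this kind maps Debezium field types to BigQuery field types roughly as in the simplified sketch below; the actual mapping in this repository covers far more types and nested structures:

    import com.google.cloud.bigquery.Field;
    import com.google.cloud.bigquery.StandardSQLTypeName;

    // Simplified sketch of Debezium-to-BigQuery type mapping; real converters
    // also handle temporal, decimal, and nested struct/array types.
    final class TypeMappingSketch {
      static Field toBigqueryField(String name, String debeziumType) {
        StandardSQLTypeName sqlType = switch (debeziumType) {
          case "int8", "int16", "int32", "int64" -> StandardSQLTypeName.INT64;
          case "float32", "float64" -> StandardSQLTypeName.FLOAT64;
          case "boolean" -> StandardSQLTypeName.BOOL;
          default -> StandardSQLTypeName.STRING; // conservative fallback
        };
        return Field.of(name, sqlType);
      }
    }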
@@ -29,10 +29,14 @@
import java.util.Optional;

/**
- * Implementation of the consumer that delivers the messages into Amazon S3 destination.
+ * Implementation of a Debezium change consumer that delivers batches of events to BigQuery tables.
+ * <p>This class extends the `BaseChangeConsumer` and provides functionality for uploading batches of
+ * Debezium change events to BigQuery tables. It leverages the BigQuery Java client library
+ * to perform data loading and table management tasks.
*
* @author Ismail Simsek
*/

@Named("bigquerybatch")
@Dependent
public class BatchBigqueryChangeConsumer<T> extends BaseChangeConsumer {
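The batch path typically maps onto BigQuery load jobs; here is a minimal sketch using the official Java client, assuming newline-delimited JSON rows and placeholder dataset/table names:

    import com.google.cloud.bigquery.BigQuery;
    import com.google.cloud.bigquery.FormatOptions;
    import com.google.cloud.bigquery.Job;
    import com.google.cloud.bigquery.TableDataWriteChannel;
    import com.google.cloud.bigquery.TableId;
    import com.google.cloud.bigquery.WriteChannelConfiguration;
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    // Minimal load-job sketch: stream NDJSON bytes through a write channel,
    // then wait for the resulting load job to finish.
    class BatchLoadSketch {
      static void load(BigQuery bigquery, String ndjsonRows) throws Exception {
        WriteChannelConfiguration config = WriteChannelConfiguration
            .newBuilder(TableId.of("my_dataset", "my_table"))
            .setFormatOptions(FormatOptions.json()) // newline-delimited JSON
            .build();
        TableDataWriteChannel channel = bigquery.writer(config);
        try (channel) {
          channel.write(ByteBuffer.wrap(ndjsonRows.getBytes(StandardCharsets.UTF_8)));
        }
        Job job = channel.getJob().waitFor(); // job is available once the channel is closed
        if (job.getStatus().getError() != null) {
          throw new RuntimeException(job.getStatus().getError().toString());
        }
      }
    }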
@@ -38,6 +38,8 @@
import java.util.Optional;

/**
+ * Utility class for common BigQuery operations in Debezium Server.
+ *
* @author Ismail Simsek
*/
public class ConsumerUtil {
@@ -5,6 +5,7 @@
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.TableConstraints;
import io.debezium.DebeziumException;
+import org.json.JSONObject;

public interface RecordConverter {
String destination();
@@ -15,6 +16,24 @@ default <T> T convert(Schema schema) throws DebeziumException {
return convert(schema, false, false);
}

+/**
+ * Converts a Debezium event for BigQuery ingestion.
+ *
+ * <p>This method transforms a Debezium event into a format suitable for writing to BigQuery.
+ * It performs value conversions based on the specified BigQuery schema to ensure compatibility
+ * with the target data types.
+ *
+ * @param schema The BigQuery schema to use for data type conversions.
+ * @param upsert Indicates whether to enable upsert operations. When set to `true`, a
+ * `_CHANGE_TYPE` column is added to track record changes (insert, update, delete).
+ * @param upsertKeepDeletes When set to `true` in conjunction with `upsert`, the last deleted
+ * row is retained.
+ * @param <T> The return type of the converted event. This can be a serialized JSON string
+ * for batch operations or a {@link JSONObject} for stream operations, depending on the
+ * specific implementation.
+ * @return The converted Debezium event, ready for BigQuery ingestion.
+ * @throws DebeziumException If an error occurs during the conversion process.
+ */
<T> T convert(Schema schema, boolean upsert, boolean upsertKeepDeletes) throws DebeziumException;

JsonNode key();
@@ -23,9 +42,45 @@ default <T> T convert(Schema schema) throws DebeziumException {

JsonNode keySchema();


+/**
+ * Extracts table constraints from the Debezium event's key schema.
+ *
+ * <p>This method analyzes the provided Debezium event's key schema and generates a
+ * corresponding BigQuery table constraints configuration. The primary key constraint
+ * is derived from the fields present in the key schema. This configuration can be
+ * used to optimize table storage and query performance in BigQuery.
+ *
+ * @return The generated {@link TableConstraints} for the BigQuery table.
+ */
TableConstraints tableConstraints();


+/**
+ * Determines the clustering fields for the BigQuery table.
+ *
+ * <p>This method extracts field names from the Debezium event's key schema and combines them
+ * with a user-provided clustering field (defaulting to `__source_ts_ms`) to create a suitable
+ * clustering configuration for the BigQuery table. Clustering the table on these fields
+ * can significantly improve query performance, especially for time-series and analytical workloads.
+ *
+ * @param clusteringField The additional field to use for clustering, beyond the fields
+ * extracted from the key schema.
+ * @return The generated BigQuery {@link Clustering} configuration.
+ */
Clustering tableClustering(String clusteringField);


+/**
+ * Transforms a Debezium event's value schema into a corresponding BigQuery schema.
+ *
+ * <p>This method analyzes the provided Debezium event's value schema and generates a
+ * compatible BigQuery schema. It considers factors like field types and
+ * the `binaryAsString` flag to accurately map the schema.
+ *
+ * @param binaryAsString Indicates whether binary fields should be represented as strings
+ * or binary types in the resulting BigQuery schema.
+ * @return The generated BigQuery {@link Schema}.
+ */
Schema tableSchema(Boolean binaryAsString);
}
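Taken together, these methods give a consumer everything it needs to create the destination table; a hedged usage sketch follows (the dataset name and the exact builder calls are assumptions based on the google-cloud-bigquery client, not code from this commit):

    import com.google.cloud.bigquery.BigQuery;
    import com.google.cloud.bigquery.StandardTableDefinition;
    import com.google.cloud.bigquery.TableId;
    import com.google.cloud.bigquery.TableInfo;

    // Usage sketch: derive schema, clustering, and constraints from a
    // RecordConverter and create the destination table.
    class TableCreationSketch {
      static void createTable(RecordConverter converter, BigQuery bigquery) {
        StandardTableDefinition definition = StandardTableDefinition.newBuilder()
            .setSchema(converter.tableSchema(false)) // keep binary fields as BYTES
            .setClustering(converter.tableClustering("__source_ts_ms"))
            .build();
        TableInfo tableInfo = TableInfo
            .newBuilder(TableId.of("my_dataset", converter.destination()), definition)
            .setTableConstraints(converter.tableConstraints()) // primary key from key schema
            .build();
        bigquery.create(tableInfo);
      }
    }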
@@ -36,10 +36,14 @@
import java.util.stream.Collectors;

/**
- * Implementation of the consumer that delivers the messages to Bigquery
+ * Implementation of a Debezium change consumer that delivers events to BigQuery tables in a streaming manner.
+ * <p>This class extends the `BaseChangeConsumer` and provides functionality for streaming batches of
+ * Debezium change events to BigQuery tables using the BigQuery Write API. It offers features like
+ * upsert, deduplication, and table schema management.
*
* @author Ismail Simsek
*/

@Named("bigquerystream")
@Dependent
@Beta
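For the upsert feature mentioned above, BigQuery's change-data-capture support expects a `_CHANGE_TYPE` pseudo-column on each streamed row; below is a minimal sketch of the row shape (the payload field names are placeholders):

    import org.json.JSONObject;

    // Sketch of an upsert-style row for the Write API: _CHANGE_TYPE marks the
    // row as an UPSERT or a DELETE; remaining fields carry the event payload.
    class UpsertRowSketch {
      static JSONObject toRow(boolean deleted) {
        JSONObject row = new JSONObject();
        row.put("id", 42);
        row.put("name", "example");
        row.put("_CHANGE_TYPE", deleted ? "DELETE" : "UPSERT");
        return row;
      }
    }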
@@ -14,7 +14,11 @@
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;


+/**
+ * Class responsible for writing data to BigQuery in a streaming manner using the BigQuery Write API.
+ * This class creates and manages a {@link JsonStreamWriter} for a specific BigQuery table. It offers functionality
+ * for adding data in JSON format and handling potential errors during the write process.
+ */
public class StreamDataWriter {
private static final int MAX_RECREATE_COUNT = 3;
private final BigQueryWriteClient client;
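A condensed sketch of the JsonStreamWriter usage this class wraps, writing to a table's default stream; the table path is a placeholder, and the recreate-on-failure logic (see MAX_RECREATE_COUNT above) is omitted:

    import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
    import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
    import org.json.JSONArray;
    import org.json.JSONObject;

    // Sketch: append JSON rows to a BigQuery table's default write stream.
    class StreamWriteSketch {
      static void append(JSONObject row) throws Exception {
        String table = "projects/my-project/datasets/my_dataset/tables/my_table";
        try (BigQueryWriteClient client = BigQueryWriteClient.create();
             JsonStreamWriter writer = JsonStreamWriter.newBuilder(table, client).build()) {
          JSONArray rows = new JSONArray().put(row);
          writer.append(rows).get(); // block until the append is acknowledged
        }
      }
    }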
