Skip to content

Commit

Permalink
Improve storage classes and add test using bigquery emulator docker c…
Browse files Browse the repository at this point in the history
…ontainer
  • Loading branch information
ismailsimsek committed Oct 30, 2024
1 parent c4b9c45 commit 1ed4e17
Show file tree
Hide file tree
Showing 15 changed files with 414 additions and 297 deletions.
6 changes: 6 additions & 0 deletions debezium-server-bigquery-sinks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@
<version>${version.quarkus}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5-mockito</artifactId>
<version>${version.quarkus}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
Expand Down Expand Up @@ -156,23 +157,41 @@ public static TableResult executeQuery(BigQuery bqClient, String query) throws S
return ConsumerUtil.executeQuery(bqClient, query, null);
}

/**
 * Builds the gRPC transport channel provider used for BigQuery Storage Write connections.
 *
 * <p>The provider starts from {@code BigQueryWriteSettings.defaultGrpcTransportProviderBuilder()}
 * and is then customised:
 * <ul>
 *   <li>when {@code isBigqueryDevEmulator} is true the channel is switched to plaintext;</li>
 *   <li>when {@code bigQueryCustomGRPCHost} holds a non-empty value it is used as the endpoint;</li>
 *   <li>keep-alive time and keep-alive timeout are both set to one minute, keep-alive is sent
 *       even without active calls, and two channels are requested per CPU.</li>
 * </ul>
 *
 * @param isBigqueryDevEmulator  whether the target is the BigQuery emulator (plaintext channel)
 * @param bigQueryCustomGRPCHost optional custom gRPC endpoint; ignored when absent or empty
 * @return the configured channel provider
 */
public static InstantiatingGrpcChannelProvider bigQueryTransportChannelProvider(Boolean isBigqueryDevEmulator, Optional<String> bigQueryCustomGRPCHost) {
  InstantiatingGrpcChannelProvider.Builder providerBuilder = BigQueryWriteSettings.defaultGrpcTransportProviderBuilder();

  if (isBigqueryDevEmulator) {
    providerBuilder.setChannelConfigurator(ManagedChannelBuilder::usePlaintext);
  }

  // Only override the endpoint when a non-empty custom host was configured.
  bigQueryCustomGRPCHost
      .filter(host -> !host.isEmpty())
      .ifPresent(providerBuilder::setEndpoint);

  org.threeten.bp.Duration oneMinute = org.threeten.bp.Duration.ofMinutes(1);
  return providerBuilder
      .setKeepAliveTime(oneMinute)
      .setKeepAliveTimeout(oneMinute)
      .setKeepAliveWithoutCalls(true)
      .setChannelsPerCpu(2)
      .build();
}

public static BigQueryWriteSettings bigQueryWriteSettings(Boolean isBigqueryDevEmulator, BigQuery bqClient, Optional<String> bigQueryCustomGRPCHost) throws IOException {
BigQueryWriteSettings.Builder builder = BigQueryWriteSettings.newBuilder();

if (isBigqueryDevEmulator) {
// it is bigquery emulator
builder.setCredentialsProvider(NoCredentialsProvider.create())
.setTransportChannelProvider(
BigQueryWriteSettings.defaultGrpcTransportProviderBuilder()
.setChannelConfigurator(ManagedChannelBuilder::usePlaintext)
.build()
);
builder.setCredentialsProvider(NoCredentialsProvider.create());
} else {
builder.setCredentialsProvider(FixedCredentialsProvider.create(bqClient.getOptions().getCredentials()));
}

if (!bigQueryCustomGRPCHost.orElse("").isEmpty()) {
builder.setEndpoint(bigQueryCustomGRPCHost.get());
}

builder.setTransportChannelProvider(bigQueryTransportChannelProvider(isBigqueryDevEmulator, bigQueryCustomGRPCHost));
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,14 @@
package io.debezium.server.bigquery;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.*;
import com.google.cloud.bigquery.storage.v1.*;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.common.annotations.Beta;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import io.debezium.DebeziumException;
import io.grpc.Status;
import io.grpc.Status.Code;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.Dependent;
Expand All @@ -33,7 +31,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

/**
Expand All @@ -45,18 +42,8 @@
@Dependent
@Beta
public class StreamBigqueryChangeConsumer extends AbstractChangeConsumer {
protected static final ConcurrentHashMap<String, DataWriter> jsonStreamWriters = new ConcurrentHashMap<>();
protected static final ConcurrentHashMap<String, StreamDataWriter> jsonStreamWriters = new ConcurrentHashMap<>();
static final ImmutableMap<String, Integer> cdcOperations = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4);
private static final int MAX_RETRY_COUNT = 3;
private static final ImmutableList<Code> RETRIABLE_ERROR_CODES =
ImmutableList.of(
Code.INTERNAL,
Code.ABORTED,
Code.CANCELLED,
Code.FAILED_PRECONDITION,
Code.DEADLINE_EXCEEDED,
Code.UNAVAILABLE);

public static BigQueryWriteClient bigQueryWriteClient;
@ConfigProperty(name = "debezium.sink.batch.destination-regexp", defaultValue = "")
protected Optional<String> destinationRegexp;
Expand Down Expand Up @@ -107,7 +94,7 @@ void connect() throws InterruptedException {

@PreDestroy
void closeStreams() {
for (Map.Entry<String, DataWriter> sw : jsonStreamWriters.entrySet()) {
for (Map.Entry<String, StreamDataWriter> sw : jsonStreamWriters.entrySet()) {
try {
sw.getValue().close(bigQueryWriteClient);
} catch (Exception e) {
Expand All @@ -131,13 +118,16 @@ public void initizalize() throws InterruptedException {
}
}

private DataWriter getDataWriter(Table table) {
private StreamDataWriter getDataWriter(Table table) {
try {
return new DataWriter(
StreamDataWriter writer = new StreamDataWriter(
TableName.of(table.getTableId().getProject(), table.getTableId().getDataset(), table.getTableId().getTable()),
bigQueryWriteClient,
ignoreUnknownFields
ignoreUnknownFields,
ConsumerUtil.bigQueryTransportChannelProvider(isBigqueryDevEmulator, bigQueryCustomGRPCHost)
);
writer.initialize();
return writer;
} catch (DescriptorValidationException | IOException | InterruptedException e) {
throw new DebeziumException("Failed to initialize stream writer for table " + table.getTableId(), e);
}
Expand All @@ -148,7 +138,7 @@ public long uploadDestination(String destination, List<RecordConverter> data) {
long numRecords = data.size();
Table table = getTable(destination, data.get(0));
// get stream writer create if not yet exists!
DataWriter writer = jsonStreamWriters.computeIfAbsent(destination, k -> getDataWriter(table));
StreamDataWriter writer = jsonStreamWriters.computeIfAbsent(destination, k -> getDataWriter(table));
try {
// running with upsert mode deduplicate data! for the tables having Primary Key
// for the tables without primary key run append mode
Expand Down Expand Up @@ -305,61 +295,6 @@ private Table updateTableSchema(Table table, Schema updatedSchema, String destin
return table;
}

protected static class DataWriter {
private final JsonStreamWriter streamWriter;

public DataWriter(TableName parentTable, BigQueryWriteClient client,
Boolean ignoreUnknownFields)
throws DescriptorValidationException, IOException, InterruptedException {

// Use the JSON stream writer to send records in JSON format. Specify the table name to write
// to the default stream.
// For more information about JsonStreamWriter, see:
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html
streamWriter = JsonStreamWriter
.newBuilder(parentTable.toString(), client)
.setIgnoreUnknownFields(ignoreUnknownFields)
.build();
}

private void appendSync(JSONArray data, int retryCount) throws DescriptorValidationException,
IOException {
ApiFuture<AppendRowsResponse> future = streamWriter.append(data);
try {
AppendRowsResponse response = future.get();
if (response.hasError()) {
throw new DebeziumException("Failed to append data to stream. " + response.getError().getMessage());
}
} catch (InterruptedException | ExecutionException throwable) {
// If the wrapped exception is a StatusRuntimeException, check the state of the operation.
// If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information,
// see: https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
Status status = Status.fromThrowable(throwable);
if (retryCount < MAX_RETRY_COUNT
&& RETRIABLE_ERROR_CODES.contains(status.getCode())) {
// Since default stream appends are not ordered, we can simply retry the appends.
// Retrying with exclusive streams requires more careful consideration.
this.appendSync(data, ++retryCount);
// Mark the existing attempt as done since it's being retried.
} else {
throw new DebeziumException("Failed to append data to stream " + streamWriter.getStreamName() + "\n" + throwable.getMessage(),
throwable);
}
}

}

public void appendSync(JSONArray data) throws DescriptorValidationException, IOException {
this.appendSync(data, 0);
}

public void close(BigQueryWriteClient client) {
if (streamWriter != null) {
streamWriter.close();
client.finalizeWriteStream(streamWriter.getStreamName());
}
}
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package io.debezium.server.bigquery;

import com.google.api.core.ApiFuture;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.storage.v1.*;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import io.debezium.DebeziumException;
import org.json.JSONArray;
import org.threeten.bp.Duration;

import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;


/**
 * Wraps a {@link JsonStreamWriter} for a single BigQuery table and offers a blocking append.
 *
 * <p>Lifecycle: construct, call {@link #initialize()} once, call {@link #appendSync(JSONArray)}
 * any number of times, then call {@link #close(BigQueryWriteClient)}. If the underlying writer
 * was closed by the library (not by the user) it is recreated on the next append, at most
 * {@value #MAX_RECREATE_COUNT} times over the lifetime of this instance.
 *
 * <p>All reads and writes of the current writer and of the shared executor happen under
 * {@code lock}; the blocking wait for the append result happens outside of it.
 */
public class StreamDataWriter {
  private static final int MAX_RECREATE_COUNT = 3;
  private static final int EXECUTOR_POOL_SIZE = 100;
  private final BigQueryWriteClient client;
  private final Boolean ignoreUnknownFields;
  private final InstantiatingGrpcChannelProvider instantiatingGrpcChannelProvider;
  private final TableName parentTable;
  private final Object lock = new Object();
  private final AtomicInteger recreateCount = new AtomicInteger(0);
  JsonStreamWriter streamWriter;
  // One executor shared by every writer this instance creates. It is handed to the writer through
  // FixedExecutorProvider, so this class owns it and shuts it down in close().
  private ScheduledExecutorService executor;


  /**
   * Stores the collaborators; no connection is opened until {@link #initialize()} is called.
   *
   * @param parentTable                      table the writer appends to
   * @param client                           write client used to build the stream writer
   * @param ignoreUnknownFields              forwarded to {@code JsonStreamWriter.Builder#setIgnoreUnknownFields}
   * @param instantiatingGrpcChannelProvider channel provider handed to the stream writer
   */
  public StreamDataWriter(TableName parentTable, BigQueryWriteClient client,
                          Boolean ignoreUnknownFields, InstantiatingGrpcChannelProvider instantiatingGrpcChannelProvider)
      throws DescriptorValidationException, IOException, InterruptedException {
    this.client = client;
    this.ignoreUnknownFields = ignoreUnknownFields;
    this.instantiatingGrpcChannelProvider = instantiatingGrpcChannelProvider;
    this.parentTable = parentTable;
  }

  /**
   * Creates the underlying stream writer for the parent table.
   *
   * @throws DescriptorValidationException if the writer cannot be built
   * @throws IOException                   if the writer cannot be built
   * @throws InterruptedException          if building the writer is interrupted
   */
  public void initialize()
      throws DescriptorValidationException, IOException, InterruptedException {
    synchronized (this.lock) {
      streamWriter = createStreamWriter(this.parentTable.toString());
    }
  }

  /**
   * Builds a new {@link JsonStreamWriter}. Callers must hold {@code lock}.
   *
   * @param parentTableName table (or stream) name passed to {@code JsonStreamWriter.newBuilder}
   */
  private JsonStreamWriter createStreamWriter(String parentTableName)
      throws DescriptorValidationException, IOException, InterruptedException {
    // https://cloud.google.com/bigquery/docs/write-api-streaming
    // Configure in-stream automatic retry settings.
    // Error codes that are immediately retried:
    // * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED
    // Error codes that are retried with exponential backoff:
    // * RESOURCE_EXHAUSTED
    RetrySettings retrySettings =
        RetrySettings.newBuilder()
            .setInitialRetryDelay(Duration.ofMillis(500))
            .setRetryDelayMultiplier(1.1)
            .setMaxAttempts(5)
            .setMaxRetryDelay(Duration.ofMinutes(1))
            .build();

    // Reuse a single pool across recreations instead of allocating a fresh one per writer.
    if (executor == null || executor.isShutdown()) {
      executor = Executors.newScheduledThreadPool(EXECUTOR_POOL_SIZE);
    }

    // Use the JSON stream writer to send records in JSON format. Specify the table name to write
    // to the default stream.
    // For more information about JsonStreamWriter, see:
    // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html
    return JsonStreamWriter.newBuilder(parentTableName, client)
        .setIgnoreUnknownFields(ignoreUnknownFields)
        .setExecutorProvider(FixedExecutorProvider.create(executor))
        .setChannelProvider(instantiatingGrpcChannelProvider)
        .setEnableConnectionPool(true)
        // If value is missing in json and there is a default value configured on bigquery
        // column, apply the default value to the missing value field.
        .setDefaultMissingValueInterpretation(AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE)
        .setRetrySettings(retrySettings)
        .build();
  }

  /**
   * Returns the writer to append with, recreating it first when it was closed by the library
   * (not by the user) and the recreate budget is not yet exhausted.
   *
   * @throws DebeziumException if {@link #initialize()} was never called or recreation fails
   */
  private JsonStreamWriter activeWriter() {
    synchronized (this.lock) {
      if (streamWriter == null) {
        throw new DebeziumException("Stream writer for table " + parentTable + " is not initialized");
      }
      if (!streamWriter.isUserClosed() && streamWriter.isClosed() && recreateCount.getAndIncrement() < MAX_RECREATE_COUNT) {
        final String streamName = streamWriter.getStreamName();
        try {
          streamWriter = createStreamWriter(streamName);
        } catch (InterruptedException e) {
          // Preserve the interrupt so the calling thread can observe it.
          Thread.currentThread().interrupt();
          throw new DebeziumException("Failed to append data to stream " + streamName + "\n" + e.getMessage(), e);
        } catch (Exception e) {
          throw new DebeziumException("Failed to append data to stream " + streamName + "\n" + e.getMessage(), e);
        }
      }
      return streamWriter;
    }
  }

  /**
   * Appends the rows and blocks until the server acknowledges them.
   *
   * @param data rows to append
   * @throws DebeziumException if the writer is not initialized, the append fails, the wait is
   *                           interrupted (the interrupt flag is restored), or the response
   *                           carries an error
   */
  public void appendSync(JSONArray data) throws DescriptorValidationException, IOException {
    final JsonStreamWriter writer = activeWriter();
    final AppendRowsResponse response;
    try {
      ApiFuture<AppendRowsResponse> future = writer.append(data);
      response = future.get();
    } catch (InterruptedException e) {
      // Preserve the interrupt so the calling thread can observe it.
      Thread.currentThread().interrupt();
      throw new DebeziumException("Failed to append data to stream " + writer.getStreamName() + "\n" + e.getMessage(), e);
    } catch (Exception e) {
      throw new DebeziumException("Failed to append data to stream " + writer.getStreamName() + "\n" + e.getMessage(), e);
    }

    // Checked outside the try block so this exception is not caught and wrapped a second time.
    if (response.hasError()) {
      throw new DebeziumException("Failed to append data to stream " + writer.getStreamName()
          + ". Error Code:" + response.getError().getCode() + " Error Message:" + response.getError().getMessage());
    }
  }

  /**
   * Closes the stream writer, finalizes its write stream and releases the shared executor.
   * The executor is shut down even when closing or finalizing throws.
   *
   * @param client client used to finalize the write stream
   */
  public void close(BigQueryWriteClient client) {
    final JsonStreamWriter writer;
    final ScheduledExecutorService pool;
    synchronized (this.lock) {
      writer = streamWriter;
      pool = executor;
      executor = null;
    }
    try {
      if (writer != null) {
        writer.close();
        client.finalizeWriteStream(writer.getStreamName());
      }
    } finally {
      if (pool != null) {
        pool.shutdown();
      }
    }
  }
}
Loading

0 comments on commit 1ed4e17

Please sign in to comment.