[BEAM-12482] Update schema destination during BigQuery load job when using temporary tables (zero load job) #17365

Merged: 9 commits, May 13, 2022
BatchLoads.java
@@ -357,6 +357,10 @@ private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> input
PCollection<KV<TableDestination, WriteTables.Result>> tempTables =
writeTempTables(partitions.get(multiPartitionsTag), tempLoadJobIdPrefixView);

List<PCollectionView<?>> sideInputsForUpdateSchema =
Lists.newArrayList(tempLoadJobIdPrefixView);
sideInputsForUpdateSchema.addAll(dynamicDestinations.getSideInputs());

PCollection<TableDestination> successfulMultiPartitionWrites =
tempTables
// Now that the load job has happened, we want the rename to happen immediately.
@@ -368,6 +372,22 @@ private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> input
.setCoder(KvCoder.of(VoidCoder.of(), tempTables.getCoder()))
.apply("GroupByKey", GroupByKey.create())
.apply("Extract Values", Values.create())
.apply(
ParDo.of(
new UpdateSchemaDestination(
bigQueryServices,
tempLoadJobIdPrefixView,
loadJobProjectId,
WriteDisposition.WRITE_APPEND,
CreateDisposition.CREATE_NEVER,
maxRetryJobs,
ignoreUnknownValues,
kmsKey,
rowWriterFactory.getSourceFormat(),
useAvroLogicalTypes,
schemaUpdateOptions,
dynamicDestinations))
.withSideInputs(sideInputsForUpdateSchema))
.apply(
"WriteRenameTriggered",
ParDo.of(
@@ -444,9 +464,29 @@ public WriteResult expandUntriggered(PCollection<KV<DestinationT, ElementT>> input
PCollection<TableDestination> successfulSinglePartitionWrites =
writeSinglePartition(partitions.get(singlePartitionTag), loadJobIdPrefixView);

List<PCollectionView<?>> sideInputsForUpdateSchema =
Lists.newArrayList(tempLoadJobIdPrefixView);
sideInputsForUpdateSchema.addAll(dynamicDestinations.getSideInputs());

PCollection<TableDestination> successfulMultiPartitionWrites =
writeTempTables(partitions.get(multiPartitionsTag), tempLoadJobIdPrefixView)
.apply("ReifyRenameInput", new ReifyAsIterable<>())
.apply(
ParDo.of(
new UpdateSchemaDestination(
bigQueryServices,
tempLoadJobIdPrefixView,
loadJobProjectId,
WriteDisposition.WRITE_APPEND,
CreateDisposition.CREATE_NEVER,
maxRetryJobs,
ignoreUnknownValues,
kmsKey,
rowWriterFactory.getSourceFormat(),
useAvroLogicalTypes,
schemaUpdateOptions,
dynamicDestinations))
.withSideInputs(sideInputsForUpdateSchema))
.apply(
"WriteRenameUntriggered",
ParDo.of(
@@ -679,17 +719,6 @@ private PCollection<KV<TableDestination, WriteTables.Result>> writeTempTables(
ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),
WritePartition.ResultCoder.INSTANCE);

// If the final destination table exists already (and we're appending to it), then the temp
// tables must exactly match schema, partitioning, etc. Wrap the DynamicDestinations object
// with one that makes this happen.
@SuppressWarnings("unchecked")
DynamicDestinations<?, DestinationT> destinations = dynamicDestinations;
if (createDisposition.equals(CreateDisposition.CREATE_IF_NEEDED)
|| createDisposition.equals(CreateDisposition.CREATE_NEVER)) {
destinations =
DynamicDestinationsHelpers.matchTableDynamicDestinations(destinations, bigQueryServices);
}

Coder<TableDestination> tableDestinationCoder =
clusteringEnabled ? TableDestinationCoderV3.of() : TableDestinationCoderV2.of();

@@ -711,7 +740,7 @@ private PCollection<KV<TableDestination, WriteTables.Result>> writeTempTables(
WriteDisposition.WRITE_EMPTY,
CreateDisposition.CREATE_IF_NEEDED,
sideInputs,
destinations,
dynamicDestinations,
loadJobProjectId,
maxRetryJobs,
ignoreUnknownValues,
@@ -720,7 +749,7 @@ private PCollection<KV<TableDestination, WriteTables.Result>> writeTempTables(
useAvroLogicalTypes,
// Note that we can't pass through the schema update options when creating temporary
// tables. They also shouldn't be needed. See BEAM-12482 for additional details.
Collections.emptySet(),
schemaUpdateOptions,
tempDataset))
.setCoder(KvCoder.of(tableDestinationCoder, WriteTables.ResultCoder.INSTANCE));
}
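The new UpdateSchemaDestination step wired in above receives WRITE_APPEND, CREATE_NEVER, the configured schemaUpdateOptions, the source format, and the dynamic destinations, and runs after the temp-table loads but before WriteRename copies the temp tables into the final destination. Its implementation is not part of this excerpt; the sketch below only illustrates the underlying "zero load job" idea from the PR title, using the JobService.startLoadJob overload added later in this diff. All class, method, and variable names in the sketch are hypothetical.

package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.client.http.ByteArrayContent;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.TableReference;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Illustrative sketch only, not part of the PR. A load job carrying zero bytes of data is
// submitted against the final destination with the schema update options, so BigQuery widens
// the destination schema before the temp tables are copied into it by WriteRename.
class ZeroLoadJobSketch {

  static void updateDestinationSchema(
      BigQueryServices.JobService jobService,
      TableReference destination,
      Set<BigQueryIO.Write.SchemaUpdateOption> schemaUpdateOptions,
      String projectId,
      String jobId)
      throws IOException, InterruptedException {
    List<String> options =
        schemaUpdateOptions.stream().map(Enum::name).collect(Collectors.toList());
    JobConfigurationLoad zeroLoadConfig =
        new JobConfigurationLoad()
            .setDestinationTable(destination)
            .setWriteDisposition("WRITE_APPEND") // mirrors WriteDisposition.WRITE_APPEND above
            .setCreateDisposition("CREATE_NEVER") // mirrors CreateDisposition.CREATE_NEVER above
            .setSchemaUpdateOptions(options)
            // The real step presumably uses rowWriterFactory.getSourceFormat(); hardcoded here.
            .setSourceFormat("NEWLINE_DELIMITED_JSON");
    // Zero bytes of content: nothing is appended, but the schema update options still apply.
    ByteArrayContent emptyContent = new ByteArrayContent("application/octet-stream", new byte[0]);
    jobService.startLoadJob(
        new JobReference().setProjectId(projectId).setJobId(jobId), zeroLoadConfig, emptyContent);
  }
}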
BigQueryServices.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.client.http.AbstractInputStreamContent;
import com.google.api.core.ApiFuture;
import com.google.api.services.bigquery.model.Dataset;
import com.google.api.services.bigquery.model.Job;
@@ -66,6 +67,14 @@ public interface JobService extends AutoCloseable {
/** Start a BigQuery load job. */
void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig)
throws InterruptedException, IOException;

/** Start a BigQuery load job with stream content. */
void startLoadJob(
JobReference jobRef,
JobConfigurationLoad loadConfig,
AbstractInputStreamContent streamContent)
throws InterruptedException, IOException;

/** Start a BigQuery extract job. */
void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig)
throws InterruptedException, IOException;
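The new overload pairs the load configuration with an AbstractInputStreamContent, so the bytes to load travel together with the job metadata in a single insert rather than being referenced as GCS source URIs. A hedged caller-side sketch, assuming an already-constructed JobService and destination table; the class and method names below are hypothetical.

package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.client.http.ByteArrayContent;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.TableReference;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical caller, only to illustrate the new contract: job metadata and the bytes to load
// are submitted together as one media-upload insert.
class StreamLoadSketch {

  static void loadInlineRows(
      BigQueryServices.JobService jobService, TableReference table, String projectId, String jobId)
      throws IOException, InterruptedException {
    String newlineDelimitedJson = "{\"name\":\"a\"}\n{\"name\":\"b\"}\n";
    ByteArrayContent content =
        new ByteArrayContent(
            "application/octet-stream", newlineDelimitedJson.getBytes(StandardCharsets.UTF_8));
    JobConfigurationLoad loadConfig =
        new JobConfigurationLoad()
            .setDestinationTable(table)
            .setSourceFormat("NEWLINE_DELIMITED_JSON")
            .setAutodetect(true); // let BigQuery infer the schema for this small example
    jobService.startLoadJob(
        new JobReference().setProjectId(projectId).setJobId(jobId), loadConfig, content);
  }
}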
BigQueryServicesImpl.java
@@ -22,6 +22,7 @@
import com.google.api.client.googleapis.json.GoogleJsonError;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
import com.google.api.client.http.AbstractInputStreamContent;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.BackOffUtils;
@@ -233,6 +234,28 @@ public void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig)
startJob(job, errorExtractor, client);
}

/**
* {@inheritDoc}
*
* <p>Tries executing the RPC at most {@code MAX_RPC_RETRIES} times until it succeeds.
*
* @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts.
*/
@Override
public void startLoadJob(
JobReference jobRef, JobConfigurationLoad loadConfig, AbstractInputStreamContent stream)
throws InterruptedException, IOException {
Map<String, String> labelMap = new HashMap<>();
Job job =
new Job()
.setJobReference(jobRef)
.setConfiguration(
new JobConfiguration()
.setLoad(loadConfig)
.setLabels(this.bqIOMetadata.addAdditionalJobLabels(labelMap)));
startJobStream(job, stream, errorExtractor, client, Sleeper.DEFAULT, createDefaultBackoff());
}

/**
* {@inheritDoc}
*
@@ -335,6 +358,47 @@ static void startJob(
lastException);
}

static void startJobStream(
Job job,
AbstractInputStreamContent streamContent,
ApiErrorExtractor errorExtractor,
Bigquery client,
Sleeper sleeper,
BackOff backOff)
throws IOException, InterruptedException {
JobReference jobReference = job.getJobReference();
Exception exception;
do {
try {
client
.jobs()
.insert(jobReference.getProjectId(), job, streamContent)
.setPrettyPrint(false)
.execute();
LOG.info(
"Started BigQuery job: {}.\n{}",
jobReference,
formatBqStatusCommand(jobReference.getProjectId(), jobReference.getJobId()));
return;
} catch (IOException e) {
if (errorExtractor.itemAlreadyExists(e)) {
LOG.info(
"BigQuery job " + jobReference + " already exists, will not retry inserting it:",
e);
return; // SUCCEEDED
}
// ignore and retry
LOG.info("Failed to insert job " + jobReference + ", will retry:", e);
exception = e;
}
} while (nextBackOff(sleeper, backOff));
throw new IOException(
String.format(
"Unable to insert job: %s, aborting after %d .",
jobReference.getJobId(), MAX_RPC_RETRIES),
exception);
}
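The loop above treats an "already exists" response as success (the job was inserted by an earlier attempt), retries other IOExceptions for as long as nextBackOff allows, and otherwise fails with the last exception. The production caller supplies Sleeper.DEFAULT and createDefaultBackoff(); the sketch below shows a backoff policy of comparable shape, with illustrative values that are not taken from the PR.

package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.client.util.BackOff;
import com.google.api.client.util.ExponentialBackOff;

// Illustrative only: a retry policy of the same general shape as the default one; the concrete
// intervals are examples, not values used by Beam.
class RetryPolicySketch {

  static BackOff exampleBackOff() {
    return new ExponentialBackOff.Builder()
        .setInitialIntervalMillis(1_000) // wait about 1 second before the first retry
        .setMultiplier(2.0) // double the wait after each failed insert
        .setMaxIntervalMillis(32_000) // cap individual waits
        .setMaxElapsedTimeMillis(10 * 60_000) // stop retrying after roughly 10 minutes overall
        .build();
  }
}

Code in the same package could pass this as the backOff argument, together with Sleeper.DEFAULT, when exercising startJobStream.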

@Override
public Job pollJob(JobReference jobRef, int maxAttempts) throws InterruptedException {
BackOff backoff =