diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index 98310ebb8e14e..fc2b727ec79cf 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -357,6 +357,10 @@ private WriteResult expandTriggered(PCollection> inpu PCollection> tempTables = writeTempTables(partitions.get(multiPartitionsTag), tempLoadJobIdPrefixView); + List> sideInputsForUpdateSchema = + Lists.newArrayList(tempLoadJobIdPrefixView); + sideInputsForUpdateSchema.addAll(dynamicDestinations.getSideInputs()); + PCollection successfulMultiPartitionWrites = tempTables // Now that the load job has happened, we want the rename to happen immediately. @@ -368,6 +372,22 @@ private WriteResult expandTriggered(PCollection> inpu .setCoder(KvCoder.of(VoidCoder.of(), tempTables.getCoder())) .apply("GroupByKey", GroupByKey.create()) .apply("Extract Values", Values.create()) + .apply( + ParDo.of( + new UpdateSchemaDestination( + bigQueryServices, + tempLoadJobIdPrefixView, + loadJobProjectId, + WriteDisposition.WRITE_APPEND, + CreateDisposition.CREATE_NEVER, + maxRetryJobs, + ignoreUnknownValues, + kmsKey, + rowWriterFactory.getSourceFormat(), + useAvroLogicalTypes, + schemaUpdateOptions, + dynamicDestinations)) + .withSideInputs(sideInputsForUpdateSchema)) .apply( "WriteRenameTriggered", ParDo.of( @@ -444,9 +464,29 @@ public WriteResult expandUntriggered(PCollection> inp PCollection successfulSinglePartitionWrites = writeSinglePartition(partitions.get(singlePartitionTag), loadJobIdPrefixView); + List> sideInputsForUpdateSchema = + Lists.newArrayList(tempLoadJobIdPrefixView); + sideInputsForUpdateSchema.addAll(dynamicDestinations.getSideInputs()); + PCollection successfulMultiPartitionWrites = writeTempTables(partitions.get(multiPartitionsTag), tempLoadJobIdPrefixView) .apply("ReifyRenameInput", new ReifyAsIterable<>()) + .apply( + ParDo.of( + new UpdateSchemaDestination( + bigQueryServices, + tempLoadJobIdPrefixView, + loadJobProjectId, + WriteDisposition.WRITE_APPEND, + CreateDisposition.CREATE_NEVER, + maxRetryJobs, + ignoreUnknownValues, + kmsKey, + rowWriterFactory.getSourceFormat(), + useAvroLogicalTypes, + schemaUpdateOptions, + dynamicDestinations)) + .withSideInputs(sideInputsForUpdateSchema)) .apply( "WriteRenameUntriggered", ParDo.of( @@ -679,17 +719,6 @@ private PCollection> writeTempTables( ShardedKeyCoder.of(NullableCoder.of(destinationCoder)), WritePartition.ResultCoder.INSTANCE); - // If the final destination table exists already (and we're appending to it), then the temp - // tables must exactly match schema, partitioning, etc. Wrap the DynamicDestinations object - // with one that makes this happen. - @SuppressWarnings("unchecked") - DynamicDestinations destinations = dynamicDestinations; - if (createDisposition.equals(CreateDisposition.CREATE_IF_NEEDED) - || createDisposition.equals(CreateDisposition.CREATE_NEVER)) { - destinations = - DynamicDestinationsHelpers.matchTableDynamicDestinations(destinations, bigQueryServices); - } - Coder tableDestinationCoder = clusteringEnabled ? TableDestinationCoderV3.of() : TableDestinationCoderV2.of(); @@ -711,7 +740,7 @@ private PCollection> writeTempTables( WriteDisposition.WRITE_EMPTY, CreateDisposition.CREATE_IF_NEEDED, sideInputs, - destinations, + dynamicDestinations, loadJobProjectId, maxRetryJobs, ignoreUnknownValues, @@ -720,7 +749,7 @@ private PCollection> writeTempTables( useAvroLogicalTypes, // Note that we can't pass through the schema update options when creating temporary // tables. They also shouldn't be needed. See BEAM-12482 for additional details. - Collections.emptySet(), + schemaUpdateOptions, tempDataset)) .setCoder(KvCoder.of(tableDestinationCoder, WriteTables.ResultCoder.INSTANCE)); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java index 23fce8ba6ff04..598431407d145 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; +import com.google.api.client.http.AbstractInputStreamContent; import com.google.api.core.ApiFuture; import com.google.api.services.bigquery.model.Dataset; import com.google.api.services.bigquery.model.Job; @@ -68,6 +69,14 @@ public interface JobService extends AutoCloseable { /** Start a BigQuery load job. */ void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig) throws InterruptedException, IOException; + + /** Start a BigQuery load job with stream content. */ + void startLoadJob( + JobReference jobRef, + JobConfigurationLoad loadConfig, + AbstractInputStreamContent streamContent) + throws InterruptedException, IOException; + /** Start a BigQuery extract job. */ void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig) throws InterruptedException, IOException; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 2949150c9eeb7..0185b5a40693e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -22,6 +22,7 @@ import com.google.api.client.googleapis.json.GoogleJsonError; import com.google.api.client.googleapis.json.GoogleJsonResponseException; import com.google.api.client.googleapis.services.AbstractGoogleClientRequest; +import com.google.api.client.http.AbstractInputStreamContent; import com.google.api.client.http.HttpRequestInitializer; import com.google.api.client.util.BackOff; import com.google.api.client.util.BackOffUtils; @@ -236,6 +237,28 @@ public void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig) startJob(job, errorExtractor, client); } + /** + * {@inheritDoc} + * + *

Tries executing the RPC for at most {@code MAX_RPC_RETRIES} times until it succeeds. + * + * @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts. + */ + @Override + public void startLoadJob( + JobReference jobRef, JobConfigurationLoad loadConfig, AbstractInputStreamContent stream) + throws InterruptedException, IOException { + Map labelMap = new HashMap<>(); + Job job = + new Job() + .setJobReference(jobRef) + .setConfiguration( + new JobConfiguration() + .setLoad(loadConfig) + .setLabels(this.bqIOMetadata.addAdditionalJobLabels(labelMap))); + startJobStream(job, stream, errorExtractor, client, Sleeper.DEFAULT, createDefaultBackoff()); + } + /** * {@inheritDoc} * @@ -338,6 +361,47 @@ static void startJob( lastException); } + static void startJobStream( + Job job, + AbstractInputStreamContent streamContent, + ApiErrorExtractor errorExtractor, + Bigquery client, + Sleeper sleeper, + BackOff backOff) + throws IOException, InterruptedException { + JobReference jobReference = job.getJobReference(); + Exception exception; + do { + try { + client + .jobs() + .insert(jobReference.getProjectId(), job, streamContent) + .setPrettyPrint(false) + .execute(); + LOG.info( + "Started BigQuery job: {}.\n{}", + jobReference, + formatBqStatusCommand(jobReference.getProjectId(), jobReference.getJobId())); + return; + } catch (IOException e) { + if (errorExtractor.itemAlreadyExists(e)) { + LOG.info( + "BigQuery job " + jobReference + " already exists, will not retry inserting it:", + e); + return; // SUCCEEDED + } + // ignore and retry + LOG.info("Failed to insert job " + jobReference + ", will retry:", e); + exception = e; + } + } while (nextBackOff(sleeper, backOff)); + throw new IOException( + String.format( + "Unable to insert job: %s, aborting after %d .", + jobReference.getJobId(), MAX_RPC_RETRIES), + exception); + } + @Override public Job pollJob(JobReference jobRef, int maxAttempts) throws InterruptedException { BackOff backoff = diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java new file mode 100644 index 0000000000000..4ae1064bc431b --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.google.api.client.http.ByteArrayContent; +import com.google.api.services.bigquery.model.Clustering; +import com.google.api.services.bigquery.model.EncryptionConfiguration; +import com.google.api.services.bigquery.model.JobConfigurationLoad; +import com.google.api.services.bigquery.model.JobReference; +import com.google.api.services.bigquery.model.Table; +import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableSchema; +import com.google.api.services.bigquery.model.TimePartitioning; +import java.io.IOException; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings({"nullness", "rawtypes"}) +public class UpdateSchemaDestination + extends DoFn< + Iterable>, + Iterable>> { + + private static final Logger LOG = LoggerFactory.getLogger(UpdateSchemaDestination.class); + private final BigQueryServices bqServices; + private final PCollectionView loadJobIdPrefixView; + private final ValueProvider loadJobProjectId; + private transient @Nullable DatasetService datasetService; + + private final int maxRetryJobs; + private final @Nullable String kmsKey; + private final String sourceFormat; + private final boolean useAvroLogicalTypes; + private @Nullable BigQueryServices.JobService jobService; + private final boolean ignoreUnknownValues; + private final Set schemaUpdateOptions; + private BigQueryIO.Write.WriteDisposition writeDisposition; + private BigQueryIO.Write.CreateDisposition createDisposition; + private DynamicDestinations dynamicDestinations; + + private static class PendingJobData { + final BigQueryHelpers.PendingJob retryJob; + final TableDestination tableDestination; + final BoundedWindow window; + + public PendingJobData( + BigQueryHelpers.PendingJob retryJob, + TableDestination tableDestination, + BoundedWindow window) { + this.retryJob = retryJob; + this.tableDestination = tableDestination; + this.window = window; + } + } + + private List pendingJobs = Lists.newArrayList(); + + public UpdateSchemaDestination( + BigQueryServices bqServices, + PCollectionView loadJobIdPrefixView, + @Nullable ValueProvider loadJobProjectId, + BigQueryIO.Write.WriteDisposition writeDisposition, + BigQueryIO.Write.CreateDisposition createDisposition, + int maxRetryJobs, + boolean ignoreUnknownValues, + String kmsKey, + String sourceFormat, + boolean useAvroLogicalTypes, + Set schemaUpdateOptions, + DynamicDestinations dynamicDestinations) { + this.loadJobProjectId = loadJobProjectId; + this.loadJobIdPrefixView = loadJobIdPrefixView; + this.bqServices = bqServices; + this.maxRetryJobs = maxRetryJobs; + this.ignoreUnknownValues = ignoreUnknownValues; + this.kmsKey = kmsKey; + this.sourceFormat = sourceFormat; + this.useAvroLogicalTypes = useAvroLogicalTypes; + this.schemaUpdateOptions = schemaUpdateOptions; + this.createDisposition = createDisposition; + this.writeDisposition = writeDisposition; + this.dynamicDestinations = dynamicDestinations; + } + + @StartBundle + public void startBundle(StartBundleContext c) { + pendingJobs.clear(); + } + + @ProcessElement + public void processElement( + @Element Iterable> element, + ProcessContext context, + BoundedWindow window) + throws IOException { + Object destination = null; + for (KV entry : element) { + destination = entry.getKey(); + if (destination != null) { + break; + } + } + if (destination != null) { + TableDestination tableDestination = dynamicDestinations.getTable(destination); + TableSchema schema = dynamicDestinations.getSchema(destination); + TableReference tableReference = tableDestination.getTableReference(); + String jobIdPrefix = + BigQueryResourceNaming.createJobIdWithDestination( + context.sideInput(loadJobIdPrefixView), + tableDestination, + 1, + context.pane().getIndex()); + jobIdPrefix += "_schemaUpdateDestination"; + BigQueryHelpers.PendingJob updateSchemaDestinationJob = + startZeroLoadJob( + getJobService(context.getPipelineOptions().as(BigQueryOptions.class)), + getDatasetService(context.getPipelineOptions().as(BigQueryOptions.class)), + jobIdPrefix, + tableReference, + tableDestination.getTimePartitioning(), + tableDestination.getClustering(), + schema, + writeDisposition, + createDisposition, + schemaUpdateOptions); + if (updateSchemaDestinationJob != null) { + pendingJobs.add(new PendingJobData(updateSchemaDestinationJob, tableDestination, window)); + } + context.output(element); + } + } + + @Teardown + public void onTeardown() { + try { + if (datasetService != null) { + datasetService.close(); + datasetService = null; + } + if (jobService != null) { + jobService.close(); + jobService = null; + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @FinishBundle + public void finishBundle(FinishBundleContext context) throws Exception { + DatasetService datasetService = + getDatasetService(context.getPipelineOptions().as(BigQueryOptions.class)); + BigQueryHelpers.PendingJobManager jobManager = new BigQueryHelpers.PendingJobManager(); + for (final PendingJobData pendingJobData : pendingJobs) { + jobManager = + jobManager.addPendingJob( + pendingJobData.retryJob, + j -> { + try { + if (pendingJobData.tableDestination.getTableDescription() != null) { + TableReference ref = pendingJobData.tableDestination.getTableReference(); + datasetService.patchTableDescription( + ref.clone() + .setTableId(BigQueryHelpers.stripPartitionDecorator(ref.getTableId())), + pendingJobData.tableDestination.getTableDescription()); + } + return null; + } catch (IOException | InterruptedException e) { + return e; + } + }); + } + jobManager.waitForDone(); + } + + private BigQueryHelpers.PendingJob startZeroLoadJob( + BigQueryServices.JobService jobService, + DatasetService datasetService, + String jobIdPrefix, + TableReference tableReference, + TimePartitioning timePartitioning, + Clustering clustering, + @Nullable TableSchema schema, + BigQueryIO.Write.WriteDisposition writeDisposition, + BigQueryIO.Write.CreateDisposition createDisposition, + Set schemaUpdateOptions) { + JobConfigurationLoad loadConfig = + new JobConfigurationLoad() + .setDestinationTable(tableReference) + .setSchema(schema) + .setWriteDisposition(writeDisposition.name()) + .setCreateDisposition(createDisposition.name()) + .setSourceFormat(sourceFormat) + .setIgnoreUnknownValues(ignoreUnknownValues) + .setUseAvroLogicalTypes(useAvroLogicalTypes); + if (schemaUpdateOptions != null) { + List options = + schemaUpdateOptions.stream() + .map(Enum::name) + .collect(Collectors.toList()); + loadConfig.setSchemaUpdateOptions(options); + } + if (!loadConfig + .getWriteDisposition() + .equals(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE.toString()) + && !loadConfig + .getWriteDisposition() + .equals(BigQueryIO.Write.WriteDisposition.WRITE_APPEND.toString())) { + return null; + } + Table destinationTable = null; + try { + destinationTable = datasetService.getTable(tableReference); + if (destinationTable == null) { + return null; // no need to update schema ahead if table does not exists + } + } catch (IOException | InterruptedException e) { + LOG.warn("Failed to get table {} with {}", tableReference, e.toString()); + throw new RuntimeException(e); + } + if (destinationTable.getSchema().equals(schema)) { + return null; // no need to update schema ahead if schema is already the same + } + if (timePartitioning != null) { + loadConfig.setTimePartitioning(timePartitioning); + // only set clustering if timePartitioning is set + if (clustering != null) { + loadConfig.setClustering(clustering); + } + } + if (kmsKey != null) { + loadConfig.setDestinationEncryptionConfiguration( + new EncryptionConfiguration().setKmsKeyName(kmsKey)); + } + String projectId = + loadJobProjectId == null || loadJobProjectId.get() == null + ? tableReference.getProjectId() + : loadJobProjectId.get(); + String bqLocation = + BigQueryHelpers.getDatasetLocation( + datasetService, tableReference.getProjectId(), tableReference.getDatasetId()); + + BigQueryHelpers.PendingJob retryJob = + new BigQueryHelpers.PendingJob( + // Function to load the data. + jobId -> { + JobReference jobRef = + new JobReference() + .setProjectId(projectId) + .setJobId(jobId.getJobId()) + .setLocation(bqLocation); + LOG.info( + "Loading zero rows using job {}, job id {} iteration {}", + tableReference, + jobRef, + jobId.getRetryIndex()); + try { + jobService.startLoadJob( + jobRef, loadConfig, new ByteArrayContent("text/plain", new byte[0])); + } catch (IOException | InterruptedException e) { + LOG.warn("Load job {} failed with {}", jobRef, e.toString()); + throw new RuntimeException(e); + } + return null; + }, + // Function to poll the result of a load job. + jobId -> { + JobReference jobRef = + new JobReference() + .setProjectId(projectId) + .setJobId(jobId.getJobId()) + .setLocation(bqLocation); + try { + return jobService.pollJob(jobRef, BatchLoads.LOAD_JOB_POLL_MAX_RETRIES); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }, + // Function to lookup a job. + jobId -> { + JobReference jobRef = + new JobReference() + .setProjectId(projectId) + .setJobId(jobId.getJobId()) + .setLocation(bqLocation); + try { + return jobService.getJob(jobRef); + } catch (InterruptedException | IOException e) { + throw new RuntimeException(e); + } + }, + maxRetryJobs, + jobIdPrefix); + return retryJob; + } + + private BigQueryServices.JobService getJobService(PipelineOptions pipelineOptions) + throws IOException { + if (jobService == null) { + jobService = bqServices.getJobService(pipelineOptions.as(BigQueryOptions.class)); + } + return jobService; + } + + private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException { + if (datasetService == null) { + datasetService = bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class)); + } + return datasetService; + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java index 737ab4ff41dec..f30388b523cc7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -269,7 +269,7 @@ public void processElement( } else if (tempTable) { // In this case, we are writing to a temp table and always need to create it. // WRITE_TRUNCATE is set so that we properly handle retries of this pane. - writeDisposition = WriteDisposition.WRITE_TRUNCATE; + writeDisposition = WriteDisposition.WRITE_APPEND; createDisposition = CreateDisposition.CREATE_IF_NEEDED; } @@ -286,6 +286,7 @@ public void processElement( writeDisposition, createDisposition, schemaUpdateOptions); + pendingJobs.add( new PendingJobData( window, @@ -354,7 +355,6 @@ public void finishBundle(FinishBundleContext c) throws Exception { BigQueryHelpers.stripPartitionDecorator(ref.getTableId())), pendingJob.tableDestination.getTableDescription()); } - Result result = new AutoValue_WriteTables_Result( BigQueryHelpers.toJsonString(pendingJob.tableReference), @@ -451,6 +451,7 @@ public PCollection> expand( .apply(GroupByKey.create()) .apply(Values.create()) .apply(ParDo.of(new GarbageCollectTemporaryFiles())); + return writeTablesOutputs.get(mainOutputTag); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeJobService.java index 81017f45c57b4..6856a5c20f2e7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeJobService.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeJobService.java @@ -20,6 +20,7 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; +import com.google.api.client.http.AbstractInputStreamContent; import com.google.api.client.json.JsonFactory; import com.google.api.client.util.BackOff; import com.google.api.client.util.BackOffUtils; @@ -177,6 +178,15 @@ public void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig) } } + @Override + public void startLoadJob( + JobReference jobRef, + JobConfigurationLoad loadConfig, + AbstractInputStreamContent streamContent) + throws InterruptedException, IOException { + // TODO + } + @Override public void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig) throws IOException { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaUpdateOptionsIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaUpdateOptionsIT.java index ed75a66880759..dd9ab1508f3fd 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaUpdateOptionsIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaUpdateOptionsIT.java @@ -28,6 +28,7 @@ import java.security.SecureRandom; import java.util.Arrays; import java.util.EnumSet; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -219,4 +220,68 @@ public void testAllowFieldRelaxation() throws Exception { List> expectedResult = Arrays.asList(Arrays.asList(value)); runWriteTest(schemaUpdateOptions, tableName, newSchema, rowToInsert, testQuery, expectedResult); } + + @Test + public void runWriteTestTempTables() throws Exception { + String tableName = makeTestTable(); + + Set schemaUpdateOptions = + EnumSet.of(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION); + + TableSchema schema = + new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("new_field").setType("STRING"), + new TableFieldSchema().setName("optional_field").setType("STRING"), + new TableFieldSchema() + .setName("required_field") + .setType("STRING") + .setMode("REQUIRED"))); + + String[] values = {"meow", "bark"}; + + String testQuery = + String.format( + "SELECT new_field, required_field FROM [%s.%s];", BIG_QUERY_DATASET_ID, tableName); + + List> expectedResult = + Arrays.asList(Arrays.asList(values[0], values[1]), Arrays.asList(values[1], values[0])); + + Options options = TestPipeline.testingPipelineOptions().as(Options.class); + options.setTempLocation(options.getTempRoot() + "/bq_it_temp"); + + Pipeline p = Pipeline.create(options); + Create.Values input = + Create.of( + Arrays.asList( + new TableRow().set("new_field", values[0]).set("required_field", values[1]), + new TableRow().set("new_field", values[1]).set("required_field", values[0]))); + + Write writer = + BigQueryIO.writeTableRows() + .to(String.format("%s:%s.%s", options.getProject(), BIG_QUERY_DATASET_ID, tableName)) + .withSchema(schema) + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) + .withSchemaUpdateOptions(schemaUpdateOptions) + .withMaxBytesPerPartition(1) + .withMaxFilesPerPartition(1); + + p.apply(input).apply(writer); + p.run().waitUntilFinish(); + + QueryResponse response = BQ_CLIENT.queryWithRetries(testQuery, project); + + List> result = + response.getRows().stream() + .map( + row -> + row.getF().stream() + .map(cell -> cell.getV().toString()) + .collect(Collectors.toList())) + .collect(Collectors.toList()); + + assertEquals(new HashSet<>(expectedResult), new HashSet<>(result)); + } }