diff --git a/google-cloud-bigquerystorage/clirr-ignored-differences.xml b/google-cloud-bigquerystorage/clirr-ignored-differences.xml
index bcb75e3ca8..8e6c8958df 100644
--- a/google-cloud-bigquerystorage/clirr-ignored-differences.xml
+++ b/google-cloud-bigquerystorage/clirr-ignored-differences.xml
@@ -197,21 +197,11 @@
1001
com/google/cloud/bigquery/storage/v1/StreamWriter$SingleConnectionOrConnectionPool
-
- 7002
- com/google/cloud/bigquery/storage/v1/JsonStreamWriter
- java.util.Map getMissingValueInterpretationMap()
-
7002
com/google/cloud/bigquery/storage/v1/JsonStreamWriter
void setMissingValueInterpretationMap(java.util.Map)
-
- 7002
- com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter
- java.util.Map getMissingValueInterpretationMap()
-
7002
com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java
index f16cee9b54..d6446113bd 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java
@@ -119,6 +119,12 @@ public long getInflightWaitSeconds() {
return this.schemaAwareStreamWriter.getInflightWaitSeconds();
}
+ /** @return the missing value interpretation map used for the writer. */
+ public Map
+ getMissingValueInterpretationMap() {
+ return this.schemaAwareStreamWriter.getMissingValueInterpretationMap();
+ }
+
/**
* newBuilder that constructs a JsonStreamWriter builder with BigQuery client being initialized by
* StreamWriter by default.
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java
index 0c4b76eba5..b16a1d81de 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java
@@ -299,6 +299,12 @@ public long getInflightWaitSeconds() {
return streamWriter.getInflightWaitSeconds();
}
+ /** @return the missing value interpretation map used for the writer. */
+ public Map
+ getMissingValueInterpretationMap() {
+ return streamWriter.getMissingValueInterpretationMap();
+ }
+
/** Sets all StreamWriter settings. */
private void setStreamWriterSettings(
@Nullable TransportChannelProvider channelProvider,
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java
index 9e9e56a688..c6e920192b 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java
@@ -893,6 +893,8 @@ public void testSimpleSchemaUpdate_withInterpretationMap() throws Exception {
.setUpdatedSchema(UPDATED_TABLE_SCHEMA)
.build());
testBigQueryWrite.addResponse(createAppendResponse(1));
+ // Verify the map before the writer is refreshed
+ assertEquals(missingValueMap, writer.getMissingValueInterpretationMap());
testBigQueryWrite.addResponse(createAppendResponse(2));
testBigQueryWrite.addResponse(createAppendResponse(3));
@@ -944,6 +946,8 @@ public void testSimpleSchemaUpdate_withInterpretationMap() throws Exception {
testBigQueryWrite.getAppendRequests().get(2).getProtoRows().hasWriterSchema()
|| testBigQueryWrite.getAppendRequests().get(3).getProtoRows().hasWriterSchema());
+ // Verify the map after the writer is refreshed
+ assertEquals(missingValueMap, writer.getMissingValueInterpretationMap());
assertEquals(
testBigQueryWrite.getAppendRequests().get(3).getDefaultMissingValueInterpretation(),
MissingValueInterpretation.DEFAULT_VALUE);
@@ -1618,6 +1622,8 @@ public void testAppendWithMissingValueMap() throws Exception {
.setTraceId("test:empty")
.build()) {
+ assertEquals(missingValueMap, writer.getMissingValueInterpretationMap());
+
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java
index 9d29d4cfd2..58f80dbce4 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java
@@ -1306,6 +1306,129 @@ public void testJsonStreamWriterSchemaUpdateConcurrent()
}
}
+ @Test
+ public void testJsonStreamWriterSchemaUpdateWithMissingValueInterpretationMap()
+ throws DescriptorValidationException, ExecutionException, IOException, InterruptedException,
+ ParseException {
+ String tableName = "SchemaUpdateMissingValueMapTestTable";
+ TableId tableId = TableId.of(DATASET, tableName);
+ tableInfo = TableInfo.newBuilder(tableId, defaultValueTableDefinition).build();
+ bigquery.create(tableInfo);
+ TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
+ WriteStream writeStream =
+ client.createWriteStream(
+ CreateWriteStreamRequest.newBuilder()
+ .setParent(parent.toString())
+ .setWriteStream(
+ WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
+ .build());
+ Map missingValueMap = new HashMap<>();
+ missingValueMap.put(
+ "foo_with_default", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);
+ missingValueMap.put(
+ "date_with_default_to_current", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);
+
+ try (JsonStreamWriter jsonStreamWriter =
+ JsonStreamWriter.newBuilder(writeStream.getName(), client)
+ .setMissingValueInterpretationMap(missingValueMap)
+ .build()) {
+ // Verify the missing value map
+ assertEquals(missingValueMap, jsonStreamWriter.getMissingValueInterpretationMap());
+
+ // First append with the current schema
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put("bar_without_default", "existing_col_before_update");
+ JSONArray jsonArr = new JSONArray();
+ jsonArr.put(jsonObject);
+ ApiFuture response1 = jsonStreamWriter.append(jsonArr, 0);
+ assertEquals(0, response1.get().getAppendResult().getOffset().getValue());
+
+ // Add a column to the table
+ Field newCol =
+ Field.newBuilder("new_col_without_default", StandardSQLTypeName.STRING)
+ .setMode(Field.Mode.NULLABLE)
+ .build();
+ ArrayList updatedFields =
+ new ArrayList<>(defaultValueTableDefinition.getSchema().getFields());
+ updatedFields.add(newCol);
+ Schema updatedSchema = Schema.of(updatedFields);
+ TableInfo updatedTableInfo =
+ TableInfo.newBuilder(tableId, StandardTableDefinition.of(updatedSchema)).build();
+ Table updatedTable = bigquery.update(updatedTableInfo);
+ assertEquals(updatedSchema, updatedTable.getDefinition().getSchema());
+
+ // Continue writing rows until backend acknowledges schema update
+ JSONObject jsonObject2 = new JSONObject();
+ jsonObject2.put("bar_without_default", "no_schema_update_yet");
+ JSONArray jsonArr2 = new JSONArray();
+ jsonArr2.put(jsonObject2);
+
+ int nextI = 0;
+ for (int i = 1; i < 100; i++) {
+ ApiFuture response2 = jsonStreamWriter.append(jsonArr2, i);
+ assertEquals(i, response2.get().getAppendResult().getOffset().getValue());
+ if (response2.get().hasUpdatedSchema()) {
+ nextI = i + 1;
+ break;
+ } else {
+ Thread.sleep(1000);
+ }
+ }
+
+ // Write using the new schema with 10 new requests
+ JSONObject updatedCol = new JSONObject();
+ updatedCol.put("bar_without_default", "existing_col");
+ updatedCol.put("new_col_without_default", "new_col");
+ JSONArray updatedJsonArr = new JSONArray();
+ updatedJsonArr.put(updatedCol);
+ for (int i = nextI; i < nextI + 10; i++) {
+ ApiFuture response3 = jsonStreamWriter.append(updatedJsonArr, i);
+ assertEquals(i, response3.get().getAppendResult().getOffset().getValue());
+ }
+
+ // List all rows to verify table data correctness
+ Iterator rowsIter = bigquery.listTableData(tableId).getValues().iterator();
+
+ // Verify 1st row (with "existing_col_before_update")
+ FieldValueList currentRow = rowsIter.next();
+ assertEquals("default_value_for_test", currentRow.get(0).getStringValue());
+ assertEquals("existing_col_before_update", currentRow.get(1).getStringValue());
+ assertFalse(currentRow.get(2).getStringValue().isEmpty());
+ // Check whether the recorded value is close enough.
+ Instant parsedInstant =
+ Instant.ofEpochSecond(Double.valueOf(currentRow.get(2).getStringValue()).longValue());
+ assertTrue(parsedInstant.isAfter(Instant.now().minus(1, ChronoUnit.HOURS)));
+
+ // A few rows (with "no_schema_update_yet") until the schema was updated
+ for (int j = 1; j < nextI; j++) {
+ currentRow = rowsIter.next();
+ assertEquals("default_value_for_test", currentRow.get(0).getStringValue());
+ assertEquals("no_schema_update_yet", currentRow.get(1).getStringValue());
+ // Check whether the recorded value is close enough.
+ parsedInstant =
+ Instant.ofEpochSecond(Double.valueOf(currentRow.get(2).getStringValue()).longValue());
+ assertTrue(parsedInstant.isAfter(Instant.now().minus(1, ChronoUnit.HOURS)));
+ }
+ // 10 rows after schema update with new column included
+ for (int j = nextI; j < nextI + 10; j++) {
+ currentRow = rowsIter.next();
+ assertEquals("default_value_for_test", currentRow.get(0).getStringValue());
+ assertEquals("existing_col", currentRow.get(1).getStringValue());
+ assertFalse(currentRow.get(2).getStringValue().isEmpty());
+ // Check whether the recorded value is close enough.
+ parsedInstant =
+ Instant.ofEpochSecond(Double.valueOf(currentRow.get(2).getStringValue()).longValue());
+ assertTrue(parsedInstant.isAfter(Instant.now().minus(1, ChronoUnit.HOURS)));
+ // Verify the new column
+ assertEquals("new_col", currentRow.get(3).getStringValue());
+ }
+ assertFalse(rowsIter.hasNext());
+
+ // Verify that the missing value map hasn't changed
+ assertEquals(missingValueMap, jsonStreamWriter.getMissingValueInterpretationMap());
+ }
+ }
+
@Test
public void testJsonStreamWriterWithFlexibleColumnName()
throws IOException, InterruptedException, ExecutionException,