-
Notifications
You must be signed in to change notification settings - Fork 19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add support for missing value interpretation #428
Changes from 5 commits
1fdc05c
f5b00ba
2fb067f
770e4ca
02cf4b0
62e1879
b7dfc08
082c8c4
d3d9263
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,9 +24,60 @@ type ProtoData = | |
protos.google.cloud.bigquery.storage.v1.AppendRowsRequest.IProtoData; | ||
type IDescriptorProto = protos.google.protobuf.IDescriptorProto; | ||
type DescriptorProto = protos.google.protobuf.DescriptorProto; | ||
type MissingValueInterpretation = | ||
AppendRowRequest['defaultMissingValueInterpretation']; | ||
type MissingValueInterpretationMap = { | ||
[column: string]: MissingValueInterpretation; | ||
}; | ||
|
||
const DescriptorProto = protos.google.protobuf.DescriptorProto; | ||
|
||
export interface WriterOptions { | ||
/** The stream connection to the BigQuery streaming insert operation. */ | ||
connection: StreamConnection; | ||
|
||
/** The proto descriptor for the stream. */ | ||
protoDescriptor: IDescriptorProto; | ||
|
||
/** | ||
* Controls how missing values are interpreted by for a given stream. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This has the extra 'by' too. |
||
* `missingValueInterpretations` set for individual colums can override the default chosen | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: s/colums/columns/ |
||
* with this option. | ||
* | ||
* For example, if you want to write | ||
* `NULL` instead of using default values for some columns, you can set | ||
* `defaultMissingValueInterpretation` to `DEFAULT_VALUE` and at the same | ||
* time, set `missingValueInterpretations` to `NULL_VALUE` on those columns. | ||
*/ | ||
defaultMissingValueInterpretation?: MissingValueInterpretation; | ||
|
||
/** | ||
* Control how missing values are interpreted for individual columns. | ||
* | ||
* You must provide an object to indicate how to interpret missing value for some fields. Missing | ||
* values are fields present in user schema but missing in rows. The key is | ||
* the field name. The value is the interpretation of missing values for the | ||
* field. | ||
* | ||
* For example, the following option would indicate that missing values in the "foo" | ||
* column are interpreted as null, whereas missing values in the "bar" column are | ||
* treated as the default value: | ||
* | ||
* { | ||
* "foo": 'DEFAULT_VALUE', | ||
* "bar": 'NULL_VALUE', | ||
* } | ||
* | ||
* If a field is not in this object and has missing values, the missing values | ||
* in this field are interpreted as NULL unless overridden with a default missing | ||
* value interpretation. | ||
* | ||
* Currently, field name can only be top-level column name, can't be a struct | ||
* field path like 'foo.bar'. | ||
*/ | ||
missingValueInterpretations?: MissingValueInterpretationMap; | ||
} | ||
|
||
/** | ||
* A BigQuery Storage API Writer that can be used to write data into BigQuery Table | ||
* using the Storage API. | ||
|
@@ -37,23 +88,26 @@ const DescriptorProto = protos.google.protobuf.DescriptorProto; | |
export class Writer { | ||
private _protoDescriptor: DescriptorProto; | ||
private _streamConnection: StreamConnection; | ||
private _defaultMissingValueInterpretation?: MissingValueInterpretation; | ||
private _missingValueInterpretations?: MissingValueInterpretationMap; | ||
|
||
/** | ||
* Creates a new Writer instance. | ||
* | ||
* @param {Object} params - The parameters for the JSONWriter. | ||
* @param {StreamConnection} params.connection - The stream connection | ||
* to the BigQuery streaming insert operation. | ||
* @param {IDescriptorProto} params.protoDescriptor - The proto descriptor | ||
* for the JSON rows. | ||
* @param {WriterOptions} params - The parameters for the Writer. | ||
* See WriterOptions docs for more information. | ||
*/ | ||
constructor(params: { | ||
connection: StreamConnection; | ||
protoDescriptor: IDescriptorProto; | ||
}) { | ||
const {connection, protoDescriptor} = params; | ||
constructor(params: WriterOptions) { | ||
const { | ||
connection, | ||
protoDescriptor, | ||
missingValueInterpretations, | ||
defaultMissingValueInterpretation, | ||
} = params; | ||
this._streamConnection = connection; | ||
this._protoDescriptor = new DescriptorProto(protoDescriptor); | ||
this._defaultMissingValueInterpretation = defaultMissingValueInterpretation; | ||
this._missingValueInterpretations = missingValueInterpretations; | ||
} | ||
|
||
/** | ||
|
@@ -72,6 +126,28 @@ export class Writer { | |
} | ||
} | ||
|
||
/** | ||
* Update how missing values are interpreted by for the given stream. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another 'by' |
||
* | ||
* @param {MissingValueInterpretation} defaultMissingValueInterpretation | ||
*/ | ||
setDefaultMissingValueInterpretation( | ||
defaultMissingValueInterpretation: MissingValueInterpretation | ||
) { | ||
this._defaultMissingValueInterpretation = defaultMissingValueInterpretation; | ||
} | ||
|
||
/** | ||
* Update how missing values are interpreted for individual columns. | ||
* | ||
* @param {MissingValueInterpretationMap} missingValueInterpretations | ||
*/ | ||
setMissingValueInterpretations( | ||
missingValueInterpretations: MissingValueInterpretationMap | ||
) { | ||
this._missingValueInterpretations = missingValueInterpretations; | ||
} | ||
|
||
/** | ||
* Schedules the writing of rows at given offset. | ||
* | ||
|
@@ -97,6 +173,10 @@ export class Writer { | |
protoDescriptor: this._protoDescriptor.toJSON(), | ||
}, | ||
}, | ||
defaultMissingValueInterpretation: | ||
this._defaultMissingValueInterpretation, | ||
missingValueInterpretations: this | ||
._missingValueInterpretations as AppendRowRequest['missingValueInterpretations'], | ||
offset, | ||
}; | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -554,6 +554,105 @@ describe('managedwriter.WriterClient', () => { | |
}).timeout(30 * 1000); | ||
}); | ||
|
||
it('should fill default values when MissingValuesInterpretation is set', async () => { | ||
bqWriteClient.initialize(); | ||
const client = new WriterClient(); | ||
client.setClient(bqWriteClient); | ||
|
||
const updatedSchema: TableSchema = { | ||
fields: [ | ||
...(schema.fields || []), | ||
{ | ||
name: 'id', | ||
type: 'STRING', | ||
defaultValueExpression: 'GENERATE_UUID()', | ||
}, | ||
{ | ||
name: 'created_at', | ||
type: 'TIMESTAMP', | ||
defaultValueExpression: 'CURRENT_TIMESTAMP()', | ||
}, | ||
{ | ||
name: 'updated_at', | ||
type: 'TIMESTAMP', | ||
defaultValueExpression: 'CURRENT_TIMESTAMP()', | ||
}, | ||
], | ||
}; | ||
const [table] = await bigquery | ||
.dataset(datasetId) | ||
.createTable(tableId + '_default_values', {schema: updatedSchema}); | ||
const parent = `projects/${projectId}/datasets/${datasetId}/tables/${table.id}`; | ||
|
||
const storageSchema = | ||
adapt.convertBigQuerySchemaToStorageTableSchema(updatedSchema); | ||
const protoDescriptor: DescriptorProto = | ||
adapt.convertStorageSchemaToProto2Descriptor(storageSchema, 'root'); | ||
|
||
// Row 1 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These sample rows are confusing, as the row data isn't compatible with the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've pushed some updates to the test to make the full schema more explicit. The idea here is to send rows with some empty values and show that they are being filled or not accordingly to the MVI settings. |
||
const row1 = { | ||
customer_name: 'Ada Lovelace', | ||
row_num: 1, | ||
}; | ||
|
||
// Row 2 | ||
const row2 = { | ||
customer_name: 'Alan Turing', | ||
row_num: 2, | ||
}; | ||
|
||
try { | ||
const connection = await client.createStreamConnection({ | ||
streamType: managedwriter.PendingStream, | ||
destinationTable: parent, | ||
}); | ||
|
||
const streamId = connection.getStreamId(); | ||
const writer = new JSONWriter({ | ||
connection, | ||
protoDescriptor, | ||
defaultMissingValueInterpretation: 'DEFAULT_VALUE', | ||
missingValueInterpretations: { | ||
updated_at: 'NULL_VALUE', | ||
}, | ||
}); | ||
|
||
const pw = writer.appendRows([row1, row2], 0); | ||
const result = await pw.getResult(); | ||
|
||
assert.equal(result.error, null); | ||
|
||
const res = await connection.finalize(); | ||
connection.close(); | ||
assert.equal(res?.rowCount, 2); | ||
|
||
const commitResponse = await client.batchCommitWriteStream({ | ||
parent, | ||
writeStreams: [streamId], | ||
}); | ||
assert.equal(commitResponse.streamErrors?.length, 0); | ||
|
||
const [rows] = await bigquery.query( | ||
`SELECT * FROM \`${projectId}.${datasetId}.${table.id}\` order by row_num` | ||
); | ||
|
||
assert.strictEqual(rows.length, 2); | ||
const first = rows[0]; | ||
assert.notEqual(first.id, null); | ||
assert.notEqual(first.created_at, null); | ||
assert.equal(first.updated_at, null); | ||
|
||
const second = rows[1]; | ||
assert.notEqual(second.id, null); | ||
assert.notEqual(second.created_at, null); | ||
assert.equal(second.updated_at, null); | ||
|
||
writer.close(); | ||
} finally { | ||
client.close(); | ||
} | ||
}); | ||
|
||
describe('Error Scenarios', () => { | ||
it('send request with mismatched proto descriptor', async () => { | ||
bqWriteClient.initialize(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the 'by' is extra in there.