
feat: add support for missing value interpretation #428

45 changes: 34 additions & 11 deletions src/managedwriter/json_writer.ts
@@ -15,11 +15,16 @@
import * as protobuf from 'protobufjs';
import * as protos from '../../protos/protos';
import {PendingWrite} from './pending_write';
import {StreamConnection, RemoveListener} from './stream_connection';
import {RemoveListener} from './stream_connection';
import * as adapt from '../adapt';
import {Writer} from './writer';
import {Writer, WriterOptions} from './writer';

type TableSchema = protos.google.cloud.bigquery.storage.v1.ITableSchema;
type MissingValueInterpretation =
protos.google.cloud.bigquery.storage.v1.AppendRowsRequest['defaultMissingValueInterpretation'];
type MissingValueInterpretationMap = {
[column: string]: MissingValueInterpretation;
};
type IInt64Value = protos.google.protobuf.IInt64Value;
type IDescriptorProto = protos.google.protobuf.IDescriptorProto;
type DescriptorProto = protos.google.protobuf.DescriptorProto;
@@ -55,16 +60,10 @@ export class JSONWriter {
/**
* Creates a new JSONWriter instance.
*
* @param {Object} params - The parameters for the JSONWriter.
* @param {StreamConnection} params.connection - The stream connection
* to the BigQuery streaming insert operation.
* @param {IDescriptorProto} params.protoDescriptor - The proto descriptor
* for the JSON rows.
* @param {WriterOptions} params - The parameters for the JSONWriter.
* See WriterOptions docs for more information.
*/
constructor(params: {
connection: StreamConnection;
protoDescriptor: IDescriptorProto;
}) {
constructor(params: WriterOptions) {
const {connection, protoDescriptor} = params;
this._writer = new Writer(params);
this._schemaListener = connection.onSchemaUpdated(this.onSchemaUpdated);
@@ -94,6 +93,30 @@ export class JSONWriter {
this._writer.setProtoDescriptor(protoDescriptor);
}

/**
* Update how missing values are interpreted for the given stream.
*
* @param {MissingValueInterpretation} defaultMissingValueInterpretation
*/
setDefaultMissingValueInterpretation(
defaultMissingValueInterpretation: MissingValueInterpretation
) {
this._writer.setDefaultMissingValueInterpretation(
defaultMissingValueInterpretation
);
}

/**
* Update how missing values are interpreted for individual columns.
*
* @param {MissingValueInterpretationMap} missingValueInterpretations
*/
setMissingValueInterpretations(
missingValueInterpretations: MissingValueInterpretationMap
) {
this._writer.setMissingValueInterpretations(missingValueInterpretations);
}

/**
* Writes a JSONList that contains objects to be written to the BigQuery table by first converting
* the JSON data to protobuf messages, then using Writer's appendRows() to write the data at current end
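Taken together, the `json_writer.ts` changes let callers configure missing value handling when constructing a `JSONWriter`. A minimal sketch of the new options shape (the column name `updated_at` and the chosen values are illustrative, and `connection`/`protoDescriptor` are assumed to come from an existing stream setup):

```typescript
// Sketch of the missing-value options accepted via WriterOptions.
const missingValueOptions = {
  // Stream-wide fallback: fill omitted columns with their default values.
  defaultMissingValueInterpretation: 'DEFAULT_VALUE' as const,
  // Per-column override: leave `updated_at` as NULL instead.
  missingValueInterpretations: {updated_at: 'NULL_VALUE' as const},
};

// These options would be passed alongside the existing connection and
// protoDescriptor fields, e.g.:
//   const writer = new JSONWriter({connection, protoDescriptor, ...missingValueOptions});
// and can later be changed through the new setters:
//   writer.setDefaultMissingValueInterpretation('NULL_VALUE');
//   writer.setMissingValueInterpretations({updated_at: 'DEFAULT_VALUE'});
```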
100 changes: 90 additions & 10 deletions src/managedwriter/writer.ts
@@ -24,9 +24,60 @@ type ProtoData =
protos.google.cloud.bigquery.storage.v1.AppendRowsRequest.IProtoData;
type IDescriptorProto = protos.google.protobuf.IDescriptorProto;
type DescriptorProto = protos.google.protobuf.DescriptorProto;
type MissingValueInterpretation =
AppendRowRequest['defaultMissingValueInterpretation'];
type MissingValueInterpretationMap = {
[column: string]: MissingValueInterpretation;
};

const DescriptorProto = protos.google.protobuf.DescriptorProto;

export interface WriterOptions {
/** The stream connection to the BigQuery streaming insert operation. */
connection: StreamConnection;

/** The proto descriptor for the stream. */
protoDescriptor: IDescriptorProto;

/**
* Controls how missing values are interpreted for a given stream.
* `missingValueInterpretations` set for individual columns can override the default chosen
* with this option.
*
* For example, if you want to write
* `NULL` instead of using default values for some columns, you can set
* `defaultMissingValueInterpretation` to `DEFAULT_VALUE` and at the same
* time, set `missingValueInterpretations` to `NULL_VALUE` on those columns.
*/
defaultMissingValueInterpretation?: MissingValueInterpretation;

/**
* Control how missing values are interpreted for individual columns.
*
* Provide an object indicating how to interpret missing values for specific fields. Missing
* values are fields present in the user schema but absent from the rows. The key is
* the field name; the value is the interpretation of missing values for that
* field.
*
* For example, the following option would indicate that missing values in the "foo"
* column are interpreted as null, whereas missing values in the "bar" column are
* treated as the default value:
*
* {
* "foo": 'DEFAULT_VALUE',
* "bar": 'NULL_VALUE',
* }
*
* If a field is not in this object and has missing values, the missing values
* in this field are interpreted as NULL unless overridden with a default missing
* value interpretation.
*
* Currently, the field name can only be a top-level column name; it can't be
* a struct field path like 'foo.bar'.
*/
missingValueInterpretations?: MissingValueInterpretationMap;
}

/**
* A BigQuery Storage API Writer that can be used to write data into BigQuery Table
* using the Storage API.
@@ -37,23 +88,26 @@ const DescriptorProto = protos.google.protobuf.DescriptorProto;
export class Writer {
private _protoDescriptor: DescriptorProto;
private _streamConnection: StreamConnection;
private _defaultMissingValueInterpretation?: MissingValueInterpretation;
private _missingValueInterpretations?: MissingValueInterpretationMap;

/**
* Creates a new Writer instance.
*
* @param {Object} params - The parameters for the JSONWriter.
* @param {StreamConnection} params.connection - The stream connection
* to the BigQuery streaming insert operation.
* @param {IDescriptorProto} params.protoDescriptor - The proto descriptor
* for the JSON rows.
* @param {WriterOptions} params - The parameters for the Writer.
* See WriterOptions docs for more information.
*/
constructor(params: {
connection: StreamConnection;
protoDescriptor: IDescriptorProto;
}) {
const {connection, protoDescriptor} = params;
constructor(params: WriterOptions) {
const {
connection,
protoDescriptor,
missingValueInterpretations,
defaultMissingValueInterpretation,
} = params;
this._streamConnection = connection;
this._protoDescriptor = new DescriptorProto(protoDescriptor);
this._defaultMissingValueInterpretation = defaultMissingValueInterpretation;
this._missingValueInterpretations = missingValueInterpretations;
}

/**
@@ -72,6 +126,28 @@ export class Writer {
}
}

/**
* Update how missing values are interpreted for the given stream.
*
* @param {MissingValueInterpretation} defaultMissingValueInterpretation
*/
setDefaultMissingValueInterpretation(
defaultMissingValueInterpretation: MissingValueInterpretation
) {
this._defaultMissingValueInterpretation = defaultMissingValueInterpretation;
}

/**
* Update how missing values are interpreted for individual columns.
*
* @param {MissingValueInterpretationMap} missingValueInterpretations
*/
setMissingValueInterpretations(
missingValueInterpretations: MissingValueInterpretationMap
) {
this._missingValueInterpretations = missingValueInterpretations;
}

/**
* Schedules the writing of rows at given offset.
*
@@ -97,6 +173,10 @@
protoDescriptor: this._protoDescriptor.toJSON(),
},
},
defaultMissingValueInterpretation:
this._defaultMissingValueInterpretation,
missingValueInterpretations: this
._missingValueInterpretations as AppendRowRequest['missingValueInterpretations'],
offset,
};

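The precedence between the stream-wide default and the per-column map (a per-column entry wins) can be illustrated with a standalone sketch. This is not library code, just the resolution rule described in the `WriterOptions` docs, with illustrative column names:

```typescript
// The enum values mirror AppendRowsRequest.MissingValueInterpretation.
type MissingValueInterpretation =
  | 'MISSING_VALUE_INTERPRETATION_UNSPECIFIED'
  | 'NULL_VALUE'
  | 'DEFAULT_VALUE';

// Resolve the interpretation for one column: a per-column entry wins,
// otherwise the stream-wide default applies.
function resolveInterpretation(
  column: string,
  perColumn: {[column: string]: MissingValueInterpretation},
  streamDefault: MissingValueInterpretation
): MissingValueInterpretation {
  return perColumn[column] ?? streamDefault;
}

const perColumn = {updated_at: 'NULL_VALUE' as const};
resolveInterpretation('updated_at', perColumn, 'DEFAULT_VALUE'); // → 'NULL_VALUE'
resolveInterpretation('created_at', perColumn, 'DEFAULT_VALUE'); // → 'DEFAULT_VALUE'
```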
99 changes: 99 additions & 0 deletions system-test/managed_writer_client_test.ts
@@ -554,6 +554,105 @@ describe('managedwriter.WriterClient', () => {
}).timeout(30 * 1000);
});

it('should fill default values when MissingValueInterpretation is set', async () => {
bqWriteClient.initialize();
const client = new WriterClient();
client.setClient(bqWriteClient);

const updatedSchema: TableSchema = {
fields: [
...(schema.fields || []),
{
name: 'id',
type: 'STRING',
defaultValueExpression: 'GENERATE_UUID()',
},
{
name: 'created_at',
type: 'TIMESTAMP',
defaultValueExpression: 'CURRENT_TIMESTAMP()',
},
{
name: 'updated_at',
type: 'TIMESTAMP',
defaultValueExpression: 'CURRENT_TIMESTAMP()',
},
],
};
const [table] = await bigquery
.dataset(datasetId)
.createTable(tableId + '_default_values', {schema: updatedSchema});
const parent = `projects/${projectId}/datasets/${datasetId}/tables/${table.id}`;

const storageSchema =
adapt.convertBigQuerySchemaToStorageTableSchema(updatedSchema);
const protoDescriptor: DescriptorProto =
adapt.convertStorageSchemaToProto2Descriptor(storageSchema, 'root');

// The rows below intentionally omit the `id`, `created_at`, and `updated_at`
// columns so that the missing value interpretation settings decide how those
// columns are filled.

// Row 1
const row1 = {
customer_name: 'Ada Lovelace',
row_num: 1,
};

// Row 2
const row2 = {
customer_name: 'Alan Turing',
row_num: 2,
};

try {
const connection = await client.createStreamConnection({
streamType: managedwriter.PendingStream,
destinationTable: parent,
});

const streamId = connection.getStreamId();
const writer = new JSONWriter({
connection,
protoDescriptor,
defaultMissingValueInterpretation: 'DEFAULT_VALUE',
missingValueInterpretations: {
updated_at: 'NULL_VALUE',
},
});

const pw = writer.appendRows([row1, row2], 0);
const result = await pw.getResult();

assert.equal(result.error, null);

const res = await connection.finalize();
connection.close();
assert.equal(res?.rowCount, 2);

const commitResponse = await client.batchCommitWriteStream({
parent,
writeStreams: [streamId],
});
assert.equal(commitResponse.streamErrors?.length, 0);

const [rows] = await bigquery.query(
`SELECT * FROM \`${projectId}.${datasetId}.${table.id}\` order by row_num`
);

assert.strictEqual(rows.length, 2);
const first = rows[0];
assert.notEqual(first.id, null);
assert.notEqual(first.created_at, null);
assert.equal(first.updated_at, null);

const second = rows[1];
assert.notEqual(second.id, null);
assert.notEqual(second.created_at, null);
assert.equal(second.updated_at, null);

writer.close();
} finally {
client.close();
}
});

describe('Error Scenarios', () => {
it('send request with mismatched proto descriptor', async () => {
bqWriteClient.initialize();