Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix!: return API objects in streams #967

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
"merge-stream": "^2.0.0",
"p-queue": "^6.0.2",
"protobufjs": "^6.10.1",
"pumpify": "2.0.1",
"split-array-stream": "^2.0.0",
"stack-trace": "0.0.10",
"stream-events": "^1.0.4",
Expand Down
32 changes: 23 additions & 9 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import * as through from 'through2';
import {CallOptions, grpc, Operation as GaxOperation} from 'google-gax';
import {Backup} from './backup';
import {BatchTransaction, TransactionIdentifier} from './batch-transaction';
import * as pumpify from 'pumpify';
import {
google as databaseAdmin,
google,
Expand Down Expand Up @@ -371,8 +372,7 @@ class Database extends common.GrpcServiceObject {
[CLOUD_RESOURCE_HEADER]: this.formattedName_,
};
this.request = instance.request;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
this.requestStream = instance.requestStream as any;
this.requestStream = instance.requestStream;
this.pool_.on('error', this.emit.bind(this, 'error'));
this.pool_.open();
this.queryOptions_ = Object.assign(
Expand Down Expand Up @@ -1513,13 +1513,27 @@ class Database extends common.GrpcServiceObject {
delete gaxOpts.pageToken;
}

return this.requestStream({
client: 'SpannerClient',
method: 'listSessionsStream',
reqOpts,
gaxOpts,
headers: this.resourceHeader_,
});
return new pumpify.obj([
olavloite marked this conversation as resolved.
Show resolved Hide resolved
this.requestStream({
client: 'SpannerClient',
method: 'listSessionsStream',
reqOpts,
gaxOpts,
headers: this.resourceHeader_,
}),
new Transform({
objectMode: true,
transform: (
chunk: databaseAdmin.spanner.v1.ISession,
enc: string,
cb: Function
) => {
const session = this.session(chunk.name!);
session.metadata = chunk;
cb(null, session);
},
}),
]);
}

getSnapshot(options?: TimestampBounds): Promise<[Snapshot]>;
Expand Down
30 changes: 23 additions & 7 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ import {PartitionedDml, Snapshot, Transaction} from './transaction';
import grpcGcpModule = require('grpc-gcp');
const grpcGcp = grpcGcpModule(grpc);
import * as v1 from './v1';
import * as pumpify from 'pumpify';
import {Transform} from 'stream';

// eslint-disable-next-line @typescript-eslint/no-var-requires
const gcpApiConfig = require('./spanner_grpc_config.json');
Expand Down Expand Up @@ -619,13 +621,27 @@ class Spanner extends GrpcService {
delete gaxOpts.pageToken;
}

return this.requestStream({
client: 'InstanceAdminClient',
method: 'listInstancesStream',
reqOpts,
gaxOpts,
headers: this.resourceHeader_,
});
return new pumpify.obj([
this.requestStream({
client: 'InstanceAdminClient',
method: 'listInstancesStream',
reqOpts,
gaxOpts,
headers: this.resourceHeader_,
}),
new Transform({
objectMode: true,
transform: (
chunk: instanceAdmin.spanner.admin.instance.v1.IInstance,
enc: string,
cb: Function
) => {
const instance = this.instance(chunk.name!);
instance.metadata = chunk;
cb(null, instance);
},
}),
]);
}

getInstanceConfigs(
Expand Down
51 changes: 36 additions & 15 deletions src/instance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import arrify = require('arrify');
import {ServiceObjectConfig, GetConfig} from '@google-cloud/common';
// eslint-disable-next-line @typescript-eslint/no-var-requires
const common = require('./common-grpc/service-object');
import * as pumpify from 'pumpify';
import {promisifyAll} from '@google-cloud/promisify';
import * as extend from 'extend';
import snakeCase = require('lodash.snakecase');
Expand All @@ -33,7 +34,7 @@ import {
PagedOptionsWithFilter,
CLOUD_RESOURCE_HEADER,
} from './common';
import {Duplex} from 'stream';
import {Duplex, Transform} from 'stream';
import {SessionPoolOptions, SessionPool} from './session-pool';
import {grpc, Operation as GaxOperation, CallOptions} from 'google-gax';
import {Backup} from './backup';
Expand Down Expand Up @@ -433,13 +434,23 @@ class Instance extends common.GrpcServiceObject {
delete gaxOpts.pageToken;
}

return this.requestStream({
client: 'DatabaseAdminClient',
method: 'listBackupsStream',
reqOpts,
gaxOpts,
headers: this.resourceHeader_,
});
return new pumpify.obj([
this.requestStream({
client: 'DatabaseAdminClient',
method: 'listBackupsStream',
reqOpts,
gaxOpts,
headers: this.resourceHeader_,
}),
new Transform({
objectMode: true,
transform: (chunk: IBackup, enc: string, cb: Function) => {
const backup = this.backup(chunk.name!);
backup.metadata = chunk;
cb(null, backup);
},
}),
]);
}

getBackupOperations(
Expand Down Expand Up @@ -1326,13 +1337,23 @@ class Instance extends common.GrpcServiceObject {
delete gaxOpts.pageToken;
}

return this.requestStream({
client: 'DatabaseAdminClient',
method: 'listDatabasesStream',
reqOpts,
gaxOpts,
headers: this.resourceHeader_,
});
return new pumpify.obj([
this.requestStream({
client: 'DatabaseAdminClient',
method: 'listDatabasesStream',
reqOpts,
gaxOpts,
headers: this.resourceHeader_,
}),
new Transform({
objectMode: true,
transform: (chunk: IDatabase, enc: string, cb: Function) => {
const database = this.database(chunk.name!, {min: 0});
database.metadata = chunk;
cb(null, database);
},
}),
]);
}

getMetadata(
Expand Down
43 changes: 40 additions & 3 deletions system-test/spanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import * as crypto from 'crypto';
import * as extend from 'extend';
import * as is from 'is';
import * as uuid from 'uuid';
import {Backup, Database, Spanner, Instance} from '../src';
import {Backup, Database, Instance, Session, Spanner} from '../src';
import {Key} from '../src/table';
import {
ReadRequest,
Expand Down Expand Up @@ -114,7 +114,7 @@ describe('Spanner', () => {
after(async () => {
if (generateInstanceForTest) {
// Deleting all backups before an instance can be deleted.
await Promise.all(
await Promise.all<google.protobuf.IEmpty>(
RESOURCES_TO_CLEAN.filter(
resource => resource instanceof Backup
).map(backup => backup.delete(GAX_OPTIONS))
Expand All @@ -124,7 +124,7 @@ describe('Spanner', () => {
* All databasess will automatically be deleted with instance.
* @see {@link https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.admin.instance.v1#google.spanner.admin.instance.v1.InstanceAdmin.DeleteInstance}
*/
await Promise.all(
await Promise.all<google.protobuf.IEmpty>(
RESOURCES_TO_CLEAN.filter(
resource => resource instanceof Instance
).map(instance => instance.delete(GAX_OPTIONS))
Expand Down Expand Up @@ -1269,6 +1269,23 @@ describe('Spanner', () => {
);
});

it('should list backups steaming', done => {
const backups: Backup[] = [];
instance
.getBackupsStream()
.on('error', assert.ifError)
.on('data', backup => {
backups.push(backup);
})
.on('end', () => {
assert.ok(backups.length > 0);
assert.ok(
backups.find(b => b.formattedName_ === backup1.formattedName_)
);
done();
});
});

it('should list backups with pagination', async () => {
const [page1, , resp1] = await instance.getBackups({
pageSize: 1,
Expand Down Expand Up @@ -1480,6 +1497,26 @@ describe('Spanner', () => {

await Promise.all(sessions.map(session => session.delete()));
});

it('should list sessions', async () => {
const [sessions] = await DATABASE.getSessions();
assert.ok(sessions.length > 0);
assert.ok(sessions.find(s => s.id === session.id));
});

it('should list sessions streaming', done => {
const sessions: Session[] = [];
DATABASE.getSessionsStream()
.on('error', assert.ifError)
.on('data', sessionObj => {
sessions.push(sessionObj);
})
.on('end', () => {
assert.ok(sessions.length > 0);
assert.ok(sessions.find(s => s.id === session.id));
done();
});
});
});

describe('Tables', () => {
Expand Down
35 changes: 29 additions & 6 deletions test/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import * as extend from 'extend';
import {ApiError, util} from '@google-cloud/common';
import * as proxyquire from 'proxyquire';
import * as sinon from 'sinon';
import {Transform, Duplex} from 'stream';
import {Transform} from 'stream';
import * as through from 'through2';
import * as pfy from '@google-cloud/promisify';
import {grpc} from 'google-gax';
Expand All @@ -32,6 +32,7 @@ import {Instance} from '../src';
import {MockError} from './mockserver/mockspanner';
import {IOperation} from '../src/instance';
import {CLOUD_RESOURCE_HEADER} from '../src/common';
import duplexify = require('duplexify');
import {google} from '../protos/protos';
import EncryptionType = google.spanner.admin.database.v1.RestoreDatabaseEncryptionConfig.EncryptionType;

Expand Down Expand Up @@ -2186,7 +2187,7 @@ describe('Database', () => {
const OPTIONS = {
gaxOptions: {autoPaginate: false},
} as db.GetSessionsOptions;
const returnValue = {} as Duplex;
const returnValue = through.obj();

it('should make and return the correct gax API call', () => {
const expectedReqOpts = extend({}, OPTIONS, {
Expand All @@ -2207,7 +2208,7 @@ describe('Database', () => {
};

const returnedValue = database.getSessionsStream(OPTIONS);
assert.strictEqual(returnedValue, returnValue);
assert.ok(returnedValue instanceof duplexify);
});

it('should pass pageSize and pageToken from gaxOptions into reqOpts', () => {
Expand All @@ -2234,7 +2235,7 @@ describe('Database', () => {
};

const returnedValue = database.getSessionsStream(options);
assert.strictEqual(returnedValue, returnValue);
assert.ok(returnedValue instanceof duplexify);
});

it('pageSize and pageToken in options should take precedence over gaxOptions', () => {
Expand Down Expand Up @@ -2268,7 +2269,7 @@ describe('Database', () => {
};

const returnedValue = database.getSessionsStream(options);
assert.strictEqual(returnedValue, returnValue);
assert.ok(returnedValue instanceof duplexify);
});

it('should not require options', () => {
Expand All @@ -2283,7 +2284,29 @@ describe('Database', () => {
};

const returnedValue = database.getSessionsStream();
assert.strictEqual(returnedValue, returnValue);
assert.ok(returnedValue instanceof duplexify);
});

it('should create and return Session objects', done => {
const stream = through.obj();
database.requestStream = () => {
return stream;
};
const protoSession = {name: 'session'};
setImmediate(() => {
stream.push(protoSession);
stream.push(null);
});

database
.getSessionsStream()
.on('error', assert.ifError)
.on('data', session => {
assert.ok(session instanceof FakeSession);
// eslint-disable-next-line @typescript-eslint/no-explicit-any
assert.strictEqual((session as any).metadata, protoSession);
})
.on('end', done);
});
});

Expand Down
Loading