Skip to content

Commit

Permalink
fix: return API objects in streams
Browse files Browse the repository at this point in the history
  • Loading branch information
AVaksman committed May 21, 2020
1 parent 18049bc commit d4c8929
Show file tree
Hide file tree
Showing 8 changed files with 246 additions and 51 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
"merge-stream": "^2.0.0",
"p-queue": "^6.0.2",
"protobufjs": "^6.8.6",
"pumpify": "2.0.1",
"split-array-stream": "^2.0.0",
"stack-trace": "0.0.10",
"stream-events": "^1.0.4",
Expand Down
30 changes: 24 additions & 6 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {grpc, Operation as GaxOperation, CallOptions} from 'google-gax';
import {Backup} from './backup';
import {BatchTransaction, TransactionIdentifier} from './batch-transaction';
import {google as databaseAdmin} from '../protos/protos';
import * as pumpify from 'pumpify';
import {
Instance,
CreateDatabaseOptions,
Expand Down Expand Up @@ -1348,12 +1349,29 @@ class Database extends GrpcServiceObject {
delete gaxOpts.pageToken;
}

return this.requestStream({
client: 'SpannerClient',
method: 'listSessionsStream',
reqOpts,
gaxOpts,
});
// eslint-disable-next-line @typescript-eslint/no-this-alias
const self = this;
const transform = function (
this: Transform,
chunk: databaseAdmin.spanner.v1.ISession,
enc: string,
callback: Function
) {
const session = self.session(chunk.name!);
session.metadata = chunk;
this.push(session);
callback();
};

return new pumpify.obj([
this.requestStream({
client: 'SpannerClient',
method: 'listSessionsStream',
reqOpts,
gaxOpts,
}),
new Transform({objectMode: true, transform}),
]);
}

getSnapshot(options?: TimestampBounds): Promise<[Snapshot]>;
Expand Down
31 changes: 25 additions & 6 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ import {PartitionedDml, Snapshot, Transaction} from './transaction';
import grpcGcpModule = require('grpc-gcp');
const grpcGcp = grpcGcpModule(grpc);
import * as v1 from './v1';
import * as pumpify from 'pumpify';
import {Transform} from 'stream';

// eslint-disable-next-line @typescript-eslint/no-var-requires
const gcpApiConfig = require('./spanner_grpc_config.json');
Expand Down Expand Up @@ -603,12 +605,29 @@ class Spanner extends GrpcService {
delete gaxOpts.pageToken;
}

return this.requestStream({
client: 'InstanceAdminClient',
method: 'listInstancesStream',
reqOpts,
gaxOpts,
});
// eslint-disable-next-line @typescript-eslint/no-this-alias
const self = this;
const transform = function (
this: Transform,
chunk: instanceAdmin.spanner.admin.instance.v1.IInstance,
enc: string,
callback: Function
) {
const instance = self.instance(chunk.name!);
instance.metadata = chunk;
this.push(instance);
callback();
};

return new pumpify.obj([
this.requestStream({
client: 'InstanceAdminClient',
method: 'listInstancesStream',
reqOpts,
gaxOpts,
}),
new Transform({objectMode: true, transform}),
]);
}

getInstanceConfigs(
Expand Down
58 changes: 45 additions & 13 deletions src/instance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import arrify = require('arrify');
import {ServiceObjectConfig, GetConfig} from '@google-cloud/common';
// eslint-disable-next-line @typescript-eslint/no-var-requires
const common = require('./common-grpc/service-object');
import * as pumpify from 'pumpify';
import {promisifyAll} from '@google-cloud/promisify';
import * as extend from 'extend';
import snakeCase = require('lodash.snakecase');
Expand All @@ -32,7 +33,7 @@ import {
ResourceCallback,
PagedOptionsWithFilter,
} from './common';
import {Duplex} from 'stream';
import {Duplex, Transform} from 'stream';
import {SessionPoolOptions, SessionPool} from './session-pool';
import {grpc, Operation as GaxOperation} from 'google-gax';
import {Backup} from './backup';
Expand Down Expand Up @@ -420,13 +421,27 @@ class Instance extends common.GrpcServiceObject {
delete gaxOpts.pageSize;
delete gaxOpts.pageToken;
}
// eslint-disable-next-line @typescript-eslint/no-this-alias
const self = this;
const transform = function (
this: Transform,
chunk: instanceAdmin.spanner.admin.database.v1.IBackup,
enc: string,
callback: Function
) {
this.push(self.backup(chunk.name!));
callback();
};

return this.requestStream({
client: 'DatabaseAdminClient',
method: 'listBackupsStream',
reqOpts,
gaxOpts,
});
return new pumpify.obj([
this.requestStream({
client: 'DatabaseAdminClient',
method: 'listBackupsStream',
reqOpts,
gaxOpts,
}),
new Transform({objectMode: true, transform}),
]);
}

getBackupOperations(
Expand Down Expand Up @@ -1264,12 +1279,29 @@ class Instance extends common.GrpcServiceObject {
delete gaxOpts.pageToken;
}

return this.requestStream({
client: 'DatabaseAdminClient',
method: 'listDatabasesStream',
reqOpts,
gaxOpts,
});
// eslint-disable-next-line @typescript-eslint/no-this-alias
const self = this;
const transform = function (
this: Transform,
chunk: databaseAdmin.spanner.admin.database.v1.IDatabase,
enc: string,
callback: Function
) {
const database = self.database(chunk.name!);
database.metadata = chunk;
this.push(database);
callback();
};

return new pumpify.obj([
this.requestStream({
client: 'DatabaseAdminClient',
method: 'listDatabasesStream',
reqOpts,
gaxOpts,
}),
new Transform({objectMode: true, transform}),
]);
}

getMetadata(
Expand Down
45 changes: 41 additions & 4 deletions system-test/spanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import * as crypto from 'crypto';
import * as extend from 'extend';
import * as is from 'is';
import * as uuid from 'uuid';
import {Backup, Database, Spanner, Instance} from '../src';
import {Backup, Database, Instance, Session, Spanner} from '../src';
import {Key} from '../src/table';
import {
ReadRequest,
Expand Down Expand Up @@ -94,7 +94,7 @@ describe('Spanner', () => {
after(async () => {
if (generateInstanceForTest) {
// Deleting all backups before an instance can be deleted.
await Promise.all(
await Promise.all<google.protobuf.IEmpty>(
RESOURCES_TO_CLEAN.filter(
resource => resource instanceof Backup
).map(backup => backup.delete())
Expand All @@ -104,7 +104,7 @@ describe('Spanner', () => {
* All databasess will automatically be deleted with instance.
* @see {@link https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.admin.instance.v1#google.spanner.admin.instance.v1.InstanceAdmin.DeleteInstance}
*/
await Promise.all(
await Promise.all<google.protobuf.IEmpty>(
RESOURCES_TO_CLEAN.filter(
resource => resource instanceof Instance
).map(instance => instance.delete())
Expand All @@ -116,7 +116,7 @@ describe('Spanner', () => {
* @see {@link https://cloud.google.com/spanner/quotas#administrative_limits}
*/
const limit = pLimit(5);
await Promise.all(
await Promise.all<google.protobuf.IEmpty>(
RESOURCES_TO_CLEAN.map(resource => limit(() => resource.delete()))
);
}
Expand Down Expand Up @@ -1151,6 +1151,23 @@ describe('Spanner', () => {
);
});

it('should list backups steaming', done => {
const backups: Backup[] = [];
instance
.getBackupsStream()
.on('error', assert.ifError)
.on('data', backup => {
backups.push(backup);
})
.on('end', () => {
assert.ok(backups.length > 0);
assert.ok(
backups.find(b => b.formattedName_ === backup1.formattedName_)
);
done();
});
});

it('should list backups with pagination', async () => {
const [page1, , resp1] = await instance.getBackups({
pageSize: 1,
Expand Down Expand Up @@ -1362,6 +1379,26 @@ describe('Spanner', () => {

await Promise.all(sessions.map(session => session.delete()));
});

it('should list sessions', async () => {
const [sessions] = await DATABASE.getSessions();
assert.ok(sessions.length > 0);
assert.ok(sessions.find(s => s.id === session.id));
});

it('should list sessions streaming', done => {
const sessions: Session[] = [];
DATABASE.getSessionsStream()
.on('error', assert.ifError)
.on('data', sessionObj => {
sessions.push(sessionObj);
})
.on('end', () => {
assert.ok(sessions.length > 0);
assert.ok(sessions.find(s => s.id === session.id));
done();
});
});
});

describe('Tables', () => {
Expand Down
35 changes: 29 additions & 6 deletions test/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@ import * as extend from 'extend';
import {ApiError, util} from '@google-cloud/common';
import * as proxyquire from 'proxyquire';
import * as sinon from 'sinon';
import {Transform, Duplex} from 'stream';
import {Transform} from 'stream';
import * as through from 'through2';
import * as pfy from '@google-cloud/promisify';
import {grpc} from 'google-gax';
import * as db from '../src/database';
import {Instance} from '../src';
import {MockError} from './mockserver/mockspanner';
import {IOperation} from '../src/instance';
import duplexify = require('duplexify');

let promisified = false;
const fakePfy = extend({}, pfy, {
Expand Down Expand Up @@ -2020,7 +2021,7 @@ describe('Database', () => {
const OPTIONS = {
gaxOptions: {autoPaginate: false},
} as db.GetSessionsOptions;
const returnValue = {} as Duplex;
const returnValue = through.obj();

it('should make and return the correct gax API call', () => {
const expectedReqOpts = extend({}, OPTIONS, {
Expand All @@ -2040,7 +2041,7 @@ describe('Database', () => {
};

const returnedValue = database.getSessionsStream(OPTIONS);
assert.strictEqual(returnedValue, returnValue);
assert.ok(returnedValue instanceof duplexify);
});

it('should pass pageSize and pageToken from gaxOptions into reqOpts', () => {
Expand All @@ -2067,7 +2068,7 @@ describe('Database', () => {
};

const returnedValue = database.getSessionsStream(options);
assert.strictEqual(returnedValue, returnValue);
assert.ok(returnedValue instanceof duplexify);
});

it('pageSize and pageToken in options should take precedence over gaxOptions', () => {
Expand Down Expand Up @@ -2101,7 +2102,7 @@ describe('Database', () => {
};

const returnedValue = database.getSessionsStream(options);
assert.strictEqual(returnedValue, returnValue);
assert.ok(returnedValue instanceof duplexify);
});

it('should not require options', () => {
Expand All @@ -2116,7 +2117,29 @@ describe('Database', () => {
};

const returnedValue = database.getSessionsStream();
assert.strictEqual(returnedValue, returnValue);
assert.ok(returnedValue instanceof duplexify);
});

it('should create and return Session objects', done => {
const stream = through.obj();
database.requestStream = () => {
return stream;
};
const protoSession = {name: 'session'};
setImmediate(() => {
stream.push(protoSession);
stream.push(null);
});

database
.getSessionsStream()
.on('error', assert.ifError)
.on('data', session => {
assert.ok(session instanceof FakeSession);
// eslint-disable-next-line @typescript-eslint/no-explicit-any
assert.strictEqual((session as any).metadata, protoSession);
})
.on('end', done);
});
});

Expand Down
Loading

0 comments on commit d4c8929

Please sign in to comment.