From 46ac928935f0730b76a0ad221a1167640340f75f Mon Sep 17 00:00:00 2001 From: Stephen Sawchuk Date: Fri, 31 Jul 2015 11:12:39 -0400 Subject: [PATCH] datastore: support streaming gets --- lib/datastore/request.js | 102 +++++++++++++----- system-test/datastore.js | 27 +++++ test/datastore/request.js | 222 +++++++++++++++++++++++++++++--------- 3 files changed, 271 insertions(+), 80 deletions(-) diff --git a/lib/datastore/request.js b/lib/datastore/request.js index 59e8e84ba495..83e23dcab387 100644 --- a/lib/datastore/request.js +++ b/lib/datastore/request.js @@ -25,6 +25,7 @@ var request = require('request').defaults({ maxSockets: Infinity } }); +var through = require('through2'); /** * @type {module:datastore/entity} @@ -92,11 +93,13 @@ function DatastoreRequest() {} * transaction. Get operations require a valid key to retrieve the * key-identified entity from Datastore. * + * @throws {Error} If at least one Key object is not provided. + * * @param {Key|Key[]} keys - Datastore key object(s). * @param {function} callback - The callback function. * @param {?error} callback.err - An error returned while making this request * @param {module:datastore/entity|module:datastore/entity[]} callback.entity - - * Will return either a single Entity or a list of Entities + * Will return either a single Entity or a list of Entities. * @param {object} callback.apiResponse - The full API response. * * @example @@ -105,57 +108,98 @@ function DatastoreRequest() {} * // your use, whether that be a Dataset or Transaction object. * //- * + * //- * // Get a single entity. + * //- * var key = dataset.key(['Company', 123]); + * * transaction.get(key, function(err, entity, apiResponse) {}); * - * // Get multiple entities at once. - * transaction.get([ + * //- + * // Get multiple entities at once with a callback. + * //- + * var keys = [ * dataset.key(['Company', 123]), * dataset.key(['Product', 'Computer']) - * ], function(err, entities, apiResponse) {}); + * ]; + * + * transaction.get(keys, function(err, entities, apiResponse) {}); + * + * //- + * // Or, get the entities as a readable object stream. + * //- + * transaction.get(keys) + * .on('error', function(err, apiResponse) {}) + * .on('data', function(entity) { + * // entity is an entity object. + * }) + * .on('end', function() { + * // All entities retrieved. + * }); */ DatastoreRequest.prototype.get = function(keys, callback) { - var that = this; + var self = this; - var isMultipleRequest = Array.isArray(keys); - keys = isMultipleRequest ? keys : [keys]; + var isStreamMode = !callback; + var stream; - callback = callback || util.noop; + if (isStreamMode) { + stream = through.obj(); + } - var req = { - key: keys.map(entity.keyToKeyProto) + var isSingleLookup = !util.is(keys, 'array'); + keys = util.arrayize(keys).map(entity.keyToKeyProto); + + if (keys.length === 0) { + throw new Error('At least one Key object is required.'); + } + + var request = { + key: keys }; - this.makeReq_('lookup', req, function(err, resp) { + var entities = []; + this.makeReq_('lookup', request, onApiResponse); + + function onApiResponse(err, resp) { if (err) { - callback(err, null, resp); + if (isStreamMode) { + stream.emit('error', err, resp); + stream.end(); + } else { + callback(err, null, resp); + } return; } - var found = entity.formatArray(resp.found); - - if (isMultipleRequest && resp.deferred && resp.deferred.length) { - // There may be more results. Call `.get` again, and append the results. - that.get( - resp.deferred.map(entity.keyFromKeyProto), function(err, entities) { - if (err) { - callback(err, null, resp); - return; - } - - if (resp) { - found = (found || []).concat(entities); - } + var results = entity.formatArray(resp.found); - callback(null, found, resp); + if (isStreamMode) { + results.forEach(function(entity) { + stream.push(entity); }); + } else { + entities = entities.concat(results); + } + + var nextKeys = (resp.deferred || []).map(entity.keyFromKeyProto); + if (nextKeys.length > 0) { + self.get(nextKeys, onApiResponse); return; } - callback(null, isMultipleRequest ? found : found[0], resp); - }); + if (isStreamMode) { + stream.push(null); + stream.end(); + } else { + callback(null, isSingleLookup ? entities[0] : entities, resp); + } + } + + if (isStreamMode) { + return stream; + } }; /** diff --git a/system-test/datastore.js b/system-test/datastore.js index 6a0b3e548810..566c01c1f550 100644 --- a/system-test/datastore.js +++ b/system-test/datastore.js @@ -183,6 +183,33 @@ describe('datastore', function() { }); }); + it('should get multiple entities in a stream', function(done) { + var key1 = ds.key('Post'); + var key2 = ds.key('Post'); + + ds.save([ + { key: key1, data: post }, + { key: key2, data: post } + ], function(err) { + assert.ifError(err); + + var firstKey = ds.key(['Post', key1.path[1]]); + var secondKey = ds.key(['Post', key2.path[1]]); + + var numEntitiesEmitted = 0; + + ds.get([firstKey, secondKey]) + .on('error', done) + .on('data', function() { + numEntitiesEmitted++; + }) + .on('end', function() { + assert.strictEqual(numEntitiesEmitted, 2); + + ds.delete([firstKey, secondKey], done); + }); + }); + }); }); it('should save keys as a part of entity and query by key', function(done) { diff --git a/test/datastore/request.js b/test/datastore/request.js index a52eae8e7ade..26dbfd39a406 100644 --- a/test/datastore/request.js +++ b/test/datastore/request.js @@ -21,6 +21,8 @@ var assert = require('assert'); var ByteBuffer = require('bytebuffer'); var entity = require('../../lib/datastore/entity.js'); +var extend = require('extend'); +var isStreamEnded = require('is-stream-ended'); var mockery = require('mockery'); var mockRespGet = require('../testdata/response_get.json'); var pb = require('../../lib/datastore/pb.js'); @@ -56,6 +58,16 @@ pb.FakeMethodResponse = { } }; +var entityOverrides = {}; +var fakeEntity; +fakeEntity = Object.keys(entity).reduce(function(fakeEntity, methodName) { + fakeEntity[methodName] = function() { + var method = entityOverrides[methodName] || entity[methodName]; + return method.apply(this, arguments); + }; + return fakeEntity; +}, {}); + var extended = false; var fakeStreamRouter = { extend: function(Class, methods) { @@ -77,6 +89,7 @@ describe('Request', function() { var CUSTOM_ENDPOINT = 'http://localhost:8080'; before(function() { + mockery.registerMock('./entity.js', fakeEntity); mockery.registerMock('./pb.js', pb); mockery.registerMock('../common/stream-router.js', fakeStreamRouter); mockery.registerMock('request', fakeRequest); @@ -97,6 +110,7 @@ describe('Request', function() { namespace: 'namespace', path: ['Company', 123] }); + entityOverrides = {}; requestOverride = null; request = new Request(); request.apiEndpoint = CUSTOM_ENDPOINT; @@ -120,67 +134,173 @@ describe('Request', function() { }); describe('get', function() { - it('should get by key', function(done) { - request.makeReq_ = function(method, req, callback) { - assert.equal(method, 'lookup'); - assert.equal(req.key.length, 1); - callback(null, mockRespGet); - }; - request.get(key, function(err, entity) { - var data = entity.data; - assert.deepEqual(entity.key.path, ['Kind', 5732568548769792]); - assert.strictEqual(data.author, 'Silvano'); - assert.strictEqual(data.isDraft, false); - assert.deepEqual(data.publishedAt, new Date(978336000000)); - done(); - }); + beforeEach(function() { + request.makeReq_ = function() {}; }); - it('should return apiResponse in callback', function(done) { - request.makeReq_ = function(method, req, callback) { - callback(null, mockRespGet); - }; - request.get(key, function(err, entity, apiResponse) { - assert.ifError(err); - assert.deepEqual(mockRespGet, apiResponse); + it('should throw if no keys are provided', function() { + assert.throws(function() { + request.get(); + }, /At least one Key object is required/); + }); + + it('should return a stream if no callback is provided', function() { + assert(request.get(key) instanceof stream); + }); + + it('should convert key to key proto', function(done) { + entityOverrides.keyToKeyProto = function(key_) { + assert.strictEqual(key_, key); done(); - }); + }; + + request.get(key, assert.ifError); }); - it('should multi get by keys', function(done) { - request.makeReq_ = function(method, req, callback) { + it('should make correct request', function(done) { + request.makeReq_ = function(method, req) { assert.equal(method, 'lookup'); - assert.equal(req.key.length, 1); - callback(null, mockRespGet); - }; - request.get([key], function(err, entities) { - var entity = entities[0]; - var data = entity.data; - assert.deepEqual(entity.key.path, ['Kind', 5732568548769792]); - assert.strictEqual(data.author, 'Silvano'); - assert.strictEqual(data.isDraft, false); - assert.deepEqual(data.publishedAt, new Date(978336000000)); + assert.deepEqual(req.key[0], entity.keyToKeyProto(key)); + done(); + }; + + request.get(key, assert.ifError); + }); + + describe('error', function() { + var error = new Error('Error.'); + var apiResponse = { a: 'b', c: 'd' }; + + beforeEach(function() { + request.makeReq_ = function(method, req, callback) { + setImmediate(function() { + callback(error, apiResponse); + }); + }; + }); + + describe('callback mode', function() { + it('should execute callback with error & API response', function(done) { + request.get(key, function(err, entity, apiResponse_) { + assert.strictEqual(err, error); + assert.strictEqual(entity, null); + assert.strictEqual(apiResponse_, apiResponse); + done(); + }); + }); + }); + + describe('stream mode', function() { + it('should emit error & API response', function(done) { + request.get(key) + .on('error', function(err, apiResponse_) { + assert.strictEqual(err, error); + assert.strictEqual(apiResponse_, apiResponse); + + done(); + }); + }); + + it('should end stream', function(done) { + var stream = request.get(key); + + stream.on('error', function() { + setImmediate(function() { + assert.strictEqual(isStreamEnded(stream), true); + done(); + }); + }); + }); }); }); - it('should continue looking for deferred results', function(done) { - var lookupCount = 0; - request.makeReq_ = function(method, req, callback) { - lookupCount++; - assert.equal(method, 'lookup'); - if (mockRespGet.deferred.length) { - // Revert deferred to original state. - mockRespGet.deferred = []; - } else { - mockRespGet.deferred = [ entity.keyToKeyProto(key) ]; - } - callback(null, mockRespGet); - }; - request.get([key, key], function(err, entities) { - assert.equal(entities.length, 2); - assert.equal(lookupCount, 2); - done(); + describe('success', function() { + var apiResponse = extend(true, {}, mockRespGet); + var expectedResult = entity.formatArray(apiResponse.found)[0]; + + beforeEach(function() { + request.makeReq_ = function(method, req, callback) { + callback(null, apiResponse); + }; + }); + + it('should format the results', function(done) { + entityOverrides.formatArray = function(arr) { + assert.strictEqual(arr, apiResponse.found); + done(); + }; + + request.get(key, assert.ifError); + }); + + it('should continue looking for deferred results', function(done) { + var lookupCount = 0; + + request.makeReq_ = function(method, req, callback) { + lookupCount++; + + if (lookupCount === 1) { + // Add deferred results, hoping it will call makeReq_ again. + var apiResponseWithDeferred = extend(true, {}, apiResponse); + apiResponseWithDeferred.deferred = [ entity.keyToKeyProto(key) ]; + callback(null, apiResponseWithDeferred); + return; + } + + if (lookupCount > 1) { + done(); + return; + } + }; + + request.get(key, assert.ifError); + }); + + describe('callback mode', function() { + it('should exec callback with results & API response', function(done) { + request.get(key, function(err, entity, apiResponse_) { + assert.ifError(err); + + assert.deepEqual(entity, expectedResult); + assert.strictEqual(apiResponse_, apiResponse); + + done(); + }); + }); + + it('should exec callback w/ array from multiple keys', function(done) { + var apiResponseWithMultiEntities = extend(true, {}, apiResponse); + var entities = apiResponseWithMultiEntities.found; + entities.push(entities[0]); + + var expectedResults = entity.formatArray(entities); + + request.makeReq_ = function(method, req, callback) { + callback(null, apiResponseWithMultiEntities); + }; + + request.get([key, key], function(err, entities, apiResponse) { + assert.ifError(err); + + assert.strictEqual(util.is(entities, 'array'), true); + assert.deepEqual(entities, expectedResults); + + assert.strictEqual(apiResponse, apiResponseWithMultiEntities); + done(); + }); + }); + }); + + describe('stream mode', function() { + it('should push results to the stream', function(done) { + request.get(key) + .on('error', done) + .on('data', function(entity) { + assert.deepEqual(entity, expectedResult); + }) + .on('end', done); + }); }); }); });