Skip to content

Commit

Permalink
datastore: support streaming gets
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenplusplus committed Jul 31, 2015
1 parent b18e5e2 commit 46ac928
Show file tree
Hide file tree
Showing 3 changed files with 271 additions and 80 deletions.
102 changes: 73 additions & 29 deletions lib/datastore/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var request = require('request').defaults({
maxSockets: Infinity
}
});
var through = require('through2');

/**
* @type {module:datastore/entity}
Expand Down Expand Up @@ -92,11 +93,13 @@ function DatastoreRequest() {}
* transaction. Get operations require a valid key to retrieve the
* key-identified entity from Datastore.
*
* @throws {Error} If at least one Key object is not provided.
*
* @param {Key|Key[]} keys - Datastore key object(s).
* @param {function} callback - The callback function.
* @param {?error} callback.err - An error returned while making this request
* @param {module:datastore/entity|module:datastore/entity[]} callback.entity -
* Will return either a single Entity or a list of Entities
* Will return either a single Entity or a list of Entities.
* @param {object} callback.apiResponse - The full API response.
*
* @example
Expand All @@ -105,57 +108,98 @@ function DatastoreRequest() {}
* // your use, whether that be a Dataset or Transaction object.
* //-
*
* //-
* // Get a single entity.
* //-
* var key = dataset.key(['Company', 123]);
*
* transaction.get(key, function(err, entity, apiResponse) {});
*
* // Get multiple entities at once.
* transaction.get([
* //-
* // Get multiple entities at once with a callback.
* //-
* var keys = [
* dataset.key(['Company', 123]),
* dataset.key(['Product', 'Computer'])
* ], function(err, entities, apiResponse) {});
* ];
*
* transaction.get(keys, function(err, entities, apiResponse) {});
*
* //-
* // Or, get the entities as a readable object stream.
* //-
* transaction.get(keys)
* .on('error', function(err, apiResponse) {})
* .on('data', function(entity) {
* // entity is an entity object.
* })
* .on('end', function() {
* // All entities retrieved.
* });
*/
DatastoreRequest.prototype.get = function(keys, callback) {
var that = this;
var self = this;

var isMultipleRequest = Array.isArray(keys);
keys = isMultipleRequest ? keys : [keys];
var isStreamMode = !callback;
var stream;

callback = callback || util.noop;
if (isStreamMode) {
stream = through.obj();
}

var req = {
key: keys.map(entity.keyToKeyProto)
var isSingleLookup = !util.is(keys, 'array');
keys = util.arrayize(keys).map(entity.keyToKeyProto);

if (keys.length === 0) {
throw new Error('At least one Key object is required.');
}

var request = {
key: keys
};

this.makeReq_('lookup', req, function(err, resp) {
var entities = [];
this.makeReq_('lookup', request, onApiResponse);

function onApiResponse(err, resp) {
if (err) {
callback(err, null, resp);
if (isStreamMode) {
stream.emit('error', err, resp);
stream.end();
} else {
callback(err, null, resp);
}
return;
}

var found = entity.formatArray(resp.found);

if (isMultipleRequest && resp.deferred && resp.deferred.length) {
// There may be more results. Call `.get` again, and append the results.
that.get(
resp.deferred.map(entity.keyFromKeyProto), function(err, entities) {
if (err) {
callback(err, null, resp);
return;
}

if (resp) {
found = (found || []).concat(entities);
}
var results = entity.formatArray(resp.found);

callback(null, found, resp);
if (isStreamMode) {
results.forEach(function(entity) {
stream.push(entity);
});
} else {
entities = entities.concat(results);
}

var nextKeys = (resp.deferred || []).map(entity.keyFromKeyProto);

if (nextKeys.length > 0) {
self.get(nextKeys, onApiResponse);
return;
}

callback(null, isMultipleRequest ? found : found[0], resp);
});
if (isStreamMode) {
stream.push(null);
stream.end();
} else {
callback(null, isSingleLookup ? entities[0] : entities, resp);
}
}

if (isStreamMode) {
return stream;
}
};

/**
Expand Down
27 changes: 27 additions & 0 deletions system-test/datastore.js
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,33 @@ describe('datastore', function() {
});
});

it('should get multiple entities in a stream', function(done) {
var key1 = ds.key('Post');
var key2 = ds.key('Post');

ds.save([
{ key: key1, data: post },
{ key: key2, data: post }
], function(err) {
assert.ifError(err);

var firstKey = ds.key(['Post', key1.path[1]]);
var secondKey = ds.key(['Post', key2.path[1]]);

var numEntitiesEmitted = 0;

ds.get([firstKey, secondKey])
.on('error', done)
.on('data', function() {
numEntitiesEmitted++;
})
.on('end', function() {
assert.strictEqual(numEntitiesEmitted, 2);

ds.delete([firstKey, secondKey], done);
});
});
});
});

it('should save keys as a part of entity and query by key', function(done) {
Expand Down
Loading

0 comments on commit 46ac928

Please sign in to comment.