Skip to content

Commit

Permalink
datastore: add streamrouter to request.get
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenplusplus committed Jul 30, 2015
1 parent f57d3b1 commit f872342
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 43 deletions.
118 changes: 85 additions & 33 deletions lib/datastore/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

'use strict';

var extend = require('extend');
var request = require('request').defaults({
pool: {
maxSockets: Infinity
Expand Down Expand Up @@ -92,7 +93,13 @@ function DatastoreRequest() {}
* transaction. Get operations require a valid key to retrieve the
* key-identified entity from Datastore.
*
* @param {Key|Key[]} keys - Datastore key object(s).
* @throws {Error} If at least one Key object is not provided.
*
* @param {object|Key|Key[]} options - Configuration object or Datastore key
* object(s).
* @param {boolean} options.autoPaginate - Have pagination handled
* automatically. Default: true.
* @param {Key|Key[]} options.keys - Datastore key objects.
* @param {function} callback - The callback function.
* @param {?error} callback.err - An error returned while making this request
* @param {module:datastore/entity|module:datastore/entity[]} callback.entity -
Expand All @@ -105,56 +112,101 @@ function DatastoreRequest() {}
* // your use, whether that be a Dataset or Transaction object.
* //-
*
* //-
* // Get a single entity.
* //-
* var key = dataset.key(['Company', 123]);
* transaction.get(key, function(err, entity, apiResponse) {});
*
* transaction.get(key, function(err, entities) {
* if (!err) {
* var entity = entities[0];
* }
* });
*
* //-
* // Get multiple entities at once.
* //-
* transaction.get([
* dataset.key(['Company', 123]),
* dataset.key(['Product', 'Computer'])
* ], function(err, entities, apiResponse) {});
* ], function(err, entities) {});
*
* //-
* // To control how many API requests are made and page through the results
* // manually, set `autoPaginate` to `false`.
* //-
* function onApiResponse(err, entities, nextQuery, apiResponse) {
* if (err) {
* console.error(err);
* return;
* }
*
* // `entities` is an array of results.
*
* if (nextQuery) {
* transaction.get(nextQuery, onApiResponse);
* }
* }
*
* var options = {
* keys: [
* dataset.key(['Company', 123]),
* // ...
* ],
* autoPaginate: false
* };
*
* transaction.get(options, onApiResponse);
*
* //-
* // Get the entities as a readable object stream.
* //-
* var keys = [
* dataset.key(['Company', 123]),
* // ...
* ];
*
* transaction.get(keys)
* .on('error', console.error)
* .on('data', function(entity) {
* // entity is an entity object.
* })
* .on('end', function() {
* // All entities retrieved.
* });
*/
DatastoreRequest.prototype.get = function(keys, callback) {
var that = this;

var isMultipleRequest = Array.isArray(keys);
keys = isMultipleRequest ? keys : [keys];
DatastoreRequest.prototype.get = function(options, callback) {
if (options instanceof entity.Key || util.is(options, 'array')) {
options = {
keys: util.arrayize(options)
};
}

callback = callback || util.noop;
if (!options.keys) {
throw new Error('At least one Key object is required.');
}

var req = {
key: keys.map(entity.keyToKeyProto)
var request = {
key: options.keys.map(entity.keyToKeyProto)
};

this.makeReq_('lookup', req, function(err, resp) {
this.makeReq_('lookup', request, function(err, resp) {
if (err) {
callback(err, null, resp);
callback(err, null, null, resp);
return;
}

var found = entity.formatArray(resp.found);
var entities = entity.formatArray(resp.found);

if (isMultipleRequest && resp.deferred && resp.deferred.length) {
// There may be more results. Call `.get` again, and append the results.
that.get(
resp.deferred.map(entity.keyFromKeyProto), function(err, entities) {
if (err) {
callback(err, null, resp);
return;
}

if (resp) {
found = (found || []).concat(entities);
}

callback(null, found, resp);
});
var nextQuery = null;
var nextKeys = (resp.deferred || []).map(entity.keyFromKeyProto);

return;
if (nextKeys.length > 0) {
nextQuery = extend(true, {}, options);
nextQuery.keys = nextQuery.keys.concat(nextKeys);
}

callback(null, isMultipleRequest ? found : found[0], resp);
callback(null, entities, nextQuery, resp);
});
};

Expand Down Expand Up @@ -764,9 +816,9 @@ DatastoreRequest.prototype.makeReq_ = function(method, body, callback) {

/*! Developer Documentation
*
* This method can be used with either a callback or as a readable object
* These methods can be used with either a callback or as a readable object
* stream. `streamRouter` is used to add this dual behavior.
*/
streamRouter.extend(DatastoreRequest, 'runQuery');
streamRouter.extend(DatastoreRequest, ['get', 'runQuery']);

module.exports = DatastoreRequest;
31 changes: 21 additions & 10 deletions system-test/datastore.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ describe('datastore', function() {
ds.save({ key: postKey, data: post }, function(err) {
assert.ifError(err);

ds.get(postKey, function(err, entity) {
ds.get(postKey, function(err, entities) {
assert.ifError(err);

var entity = entities[0];
assert.deepEqual(entity.data, post);

ds.delete(postKey, done);
Expand All @@ -68,9 +69,10 @@ describe('datastore', function() {
ds.save({ key: postKey, data: post }, function(err) {
assert.ifError(err);

ds.get(postKey, function(err, entity) {
ds.get(postKey, function(err, entities) {
assert.ifError(err);

var entity = entities[0];
assert.deepEqual(entity.data, post);

ds.delete(postKey, done);
Expand All @@ -90,9 +92,10 @@ describe('datastore', function() {
var assignedId = postKey.path[1];
assert(assignedId);

ds.get(postKey, function(err, entity) {
ds.get(postKey, function(err, entities) {
assert.ifError(err);

var entity = entities[0];
assert.deepEqual(entity.data, data);

ds.delete(ds.key(['Post', assignedId]), done);
Expand All @@ -109,9 +112,10 @@ describe('datastore', function() {
// The key's path should now be complete.
assert(postKey.path[1]);

ds.get(postKey, function(err, entity) {
ds.get(postKey, function(err, entities) {
assert.ifError(err);

var entity = entities[0];
assert.deepEqual(entity.data, post);

ds.delete(postKey, done);
Expand All @@ -131,9 +135,10 @@ describe('datastore', function() {
ds.save({ key: postKey, method: 'insert', data: post }, function(err) {
assert.notEqual(err, null); // should fail insert

ds.get(postKey, function(err, entity) {
ds.get(postKey, function(err, entities) {
assert.ifError(err);

var entity = entities[0];
assert.deepEqual(entity.data, post);

ds.delete(postKey, done);
Expand Down Expand Up @@ -505,9 +510,10 @@ describe('datastore', function() {
}, function(err) {
assert.ifError(err);

ds.get(key, function(err, entity) {
ds.get(key, function(err, entities) {
assert.ifError(err);

var entity = entities[0];
assert.deepEqual(entity.data, obj);

ds.delete(key, done);
Expand Down Expand Up @@ -544,17 +550,22 @@ describe('datastore', function() {
async.parallel([
// The key queued for deletion should have been deleted.
function(done) {
ds.get(deleteKey, function(err, entity) {
ds.get(deleteKey, function(err, entities) {
assert.ifError(err);

var entity = entities[0];
assert.equal(typeof entity, 'undefined');

done();
});
},

// Data should have been updated on the key.
function(done) {
ds.get(key, function(err, entity) {
ds.get(key, function(err, entities) {
assert.ifError(err);

var entity = entities[0];
assert.equal(entity.data.rating, 10);
done();
});
Expand Down Expand Up @@ -586,9 +597,9 @@ describe('datastore', function() {
assert.ifError(err);

// Should not return a result.
ds.get(key, function(err, entity) {
ds.get(key, function(err, entities) {
assert.ifError(err);
assert.strictEqual(entity, undefined);
assert.strictEqual(entities.length, 0);

// Incomplete key should have been given an id.
assert.equal(incompleteKey.path.length, 2);
Expand Down

0 comments on commit f872342

Please sign in to comment.