Skip to content

Commit

Permalink
streamrouter: support autoPaginate: true
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenplusplus committed Jul 14, 2015
1 parent af8299a commit 88fd2fa
Show file tree
Hide file tree
Showing 18 changed files with 659 additions and 105 deletions.
40 changes: 37 additions & 3 deletions lib/bigquery/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ BigQuery.prototype.dataset = function(id) {
*
* @param {object=} query - Configuration object.
* @param {boolean} query.all - List all datasets, including hidden ones.
* @param {boolean} query.autoPaginate - Have pagination handled automatically.
* Default: false.
* @param {number} query.maxResults - Maximum number of results to return.
* @param {string} query.pageToken - Token returned from a previous call, to
* request the next page of results.
Expand All @@ -178,6 +180,16 @@ BigQuery.prototype.dataset = function(id) {
* bigquery.getDatasets(callback);
*
* //-
* // To have pagination handled for you, set `autoPaginate`. Note the changed
* // callback parameters.
* //-
* bigquery.getDatasets({
* autoPaginate: true
* }, function(err, datasets) {
* // Called after all datasets have been retrieved.
* });
*
* //-
* // Get the datasets from your project as a readable object stream.
* //-
* bigquery.getDatasets()
Expand Down Expand Up @@ -238,6 +250,8 @@ BigQuery.prototype.getDatasets = function(query, callback) {
* @param {object=} options - Configuration object.
* @param {boolean=} options.allUsers - Display jobs owned by all users in the
* project.
* @param {boolean} options.autoPaginate - Have pagination handled
* automatically. Default: false.
* @param {number=} options.maxResults - Maximum number of results to return.
* @param {string=} options.pageToken - Token returned from a previous call, to
* request the next page of results.
Expand All @@ -254,6 +268,15 @@ BigQuery.prototype.getDatasets = function(query, callback) {
* });
*
* //-
* // To have pagination handled for you, set `autoPaginate`. Note the changed
* // callback parameters.
* //-
* bigquery.getJobs({
* autoPaginate: true
* }, function(err, jobs) {
* // Called after all jobs have been retrieved.
* });
* //-
* // Get the jobs from your project as a readable object stream.
* //-
* bigquery.getJobs()
Expand Down Expand Up @@ -336,16 +359,16 @@ BigQuery.prototype.job = function(id) {
* queries for you, pushing each row to the stream.
*
* @param {string|object} options - A string SQL query or configuration object.
* @param {boolean} options.autoPaginate - Have pagination handled
* automatically. Default: false.
* @param {number} options.maxResults - Maximum number of results to read.
* @param {string} options.query - A query string, following the BigQuery query
* syntax, of the query to execute.
* @param {number} options.timeoutMs - How long to wait for the query to
* complete, in milliseconds, before returning. Default is to return
* immediately. If the timeout passes before the job completes, the request
* will fail with a `TIMEOUT` error.
* @param {function=} callback - The callback function. If you intend to
* continuously run this query until all results are in as part of a stream,
* do not pass a callback.
* @param {function=} callback - The callback function.
*
* @example
* var query = 'SELECT url FROM [publicdata:samples.github_nested] LIMIT 100';
Expand All @@ -364,6 +387,17 @@ BigQuery.prototype.job = function(id) {
* bigquery.query(query, callback);
*
* //-
* // To have pagination handled for you, set `autoPaginate`. Note the changed
* // callback parameters.
* //-
* bigquery.query({
* query: query,
* autoPaginate: true
* }, function(err, rows) {
* // Called after all rows have been retrieved.
* });
*
* //-
* // You can also use the `query` method as a readable object stream by
* // omitting the callback.
* //-
Expand Down
27 changes: 22 additions & 5 deletions lib/bigquery/job.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ Job.prototype.getMetadata = function(callback) {
* Get the results of a job.
*
* @param {object=} options - Configuration object.
* @param {boolean} options.autoPaginate - Have pagination handled
* automatically. Default: false.
* @param {number} options.maxResults - Maximum number of results to read.
* @param {string} options.pageToken - Page token, returned by a previous call,
* to request the next page of results. Note: This is automatically added to
Expand All @@ -105,27 +107,42 @@ Job.prototype.getMetadata = function(callback) {
* do not pass a callback.
*
* @example
* var callback = function(err, rows, nextQuery, apiResponse) {
* if (nextQuery) {
* // More results exist.
* job.getQueryResults(nextQuery, callback);
* }
* };
*
* //-
* // Use the default options to get the results of a query.
* //-
* job.getQueryResults(function(err, rows, nextQuery, apiResponse) {});
* job.getQueryResults(callback);
*
* //-
* // Customize the results you want to fetch.
* //-
* var options = {
* job.getQueryResults({
* maxResults: 100
* };
* }, callback);
*
* job.getQueryResults(options, function(err, rows, nextQuery, apiResponse) {});
* //-
* // To have pagination handled for you, set `autoPaginate`. Note the changed
* // callback parameters.
* //-
* job.getQueryResults({
* autoPaginate: true
* }, function(err, rows) {
* // Called after all rows have been retrieved.
* });
*
* //-
* // Consume the results from the query as a readable object stream.
* //-
* var through2 = require('through2');
* var fs = require('fs');
*
* job.getQueryResults(options)
* job.getQueryResults()
* .pipe(through2.obj(function (row, enc, next) {
* this.push(JSON.stringify(row) + '\n');
* }))
Expand Down
12 changes: 12 additions & 0 deletions lib/bigquery/table.js
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,8 @@ Table.prototype.getMetadata = function(callback) {
* your callback as an array of objects matching your table's schema.
*
* @param {object=} options - The configuration object.
* @param {boolean} options.autoPaginate - Have pagination handled
* automatically. Default: false.
* @param {number} options.maxResults - Maximum number of results to return.
* @param {function} callback - The callback function.
*
Expand All @@ -501,6 +503,16 @@ Table.prototype.getMetadata = function(callback) {
* table.getRows(options, callback);
*
* //-
* // To have pagination handled for you, set `autoPaginate`. Note the changed
* // callback parameters.
* //-
* table.getRows({
* autoPaginate: true
* }, function(err, rows) {
* // Called after all rows have been retrieved.
* });
*
* //-
* // Get the rows as a readable object stream.
* //-
* table.getRows(options)
Expand Down
119 changes: 82 additions & 37 deletions lib/common/stream-router.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

'use strict';

var concat = require('concat-stream');
var streamEvents = require('stream-events');
var through = require('through2');

Expand Down Expand Up @@ -60,36 +61,100 @@ streamRouter.extend = function(Class, methodNames) {
var originalMethod = Class.prototype[methodName];

Class.prototype[methodName] = function() {
return streamRouter.router_(arguments, originalMethod.bind(this));
var parsedArguments = streamRouter.parseArguments_(arguments);
return streamRouter.router_(parsedArguments, originalMethod.bind(this));
};
});
};

/**
* The router accepts all incoming arguments to the overwritten method. If the
* last argument is a function, simply pass them through to the original method.
* If the last argument is not a function, activate stream mode.
* Parse a pseudo-array `arguments` for a query and callback.
*
* Stream mode simply calls the nextQuery recursively. The stream ends when
* `nextQuery` is null.
* @param {array} args - The original `arguments` pseudo-array that the original
* method received.
*/
streamRouter.parseArguments_ = function(args) {
  // Split an `arguments` pseudo-array into `{ query, callback }`. Only the
  // first and last positions are inspected.
  var first = args[0];
  var last = args[args.length - 1];
  var parsed = {};

  // A function in the first slot is the callback; any other value (including
  // `undefined` when nothing was passed) is recorded as the query.
  if (!util.is(first, 'function')) {
    parsed.query = first;
  } else {
    parsed.callback = first;
  }

  // A function in the last slot always takes over as the callback.
  if (util.is(last, 'function')) {
    parsed.callback = last;
  }

  return parsed;
};

/**
* The router accepts a query and callback that were passed to the overwritten
* method. If there's a callback, simply pass the query and/or callback through
* to the original method. If there isn't a callback, stream mode is activated.
*
* @param {array} args - The original `arguments` pseudo-array as it was
* received by the original method.
* @param {array} parsedArguments - Parsed arguments from the original method
* call.
* @param {object=|string=} parsedArguments.query - Query object. This is most
* commonly an object, but to make the API more simple, it can also be a
* string in some places.
* @param {function=} parsedArguments.callback - Callback function.
* @param {function} originalMethod - The cached method that accepts a callback
* and returns `nextQuery` to receive more results.
* @return {undefined|stream}
*/
streamRouter.router_ = function(args, originalMethod) {
args = util.toArray(args);
streamRouter.router_ = function(parsedArguments, originalMethod) {
  // Dispatches a call in one of three ways:
  //   - no callback: return a stream from `runAsStream_`;
  //   - callback + auto-pagination: collect the stream and call back once;
  //   - callback only: forward to the original method untouched.
  var query = parsedArguments.query || {};
  var callback = parsedArguments.callback;

  // Stream mode: without a callback the caller gets the stream back.
  if (!callback) {
    return this.runAsStream_(query, originalMethod);
  }

  // `autoPaginate` is read off plain option objects and `autoPaginateVal` off
  // query objects that carry that property; either set to `true` opts in.
  var shouldAutoPaginate =
    query.autoPaginate === true || query.autoPaginateVal === true;

  if (!shouldAutoPaginate) {
    originalMethod(query, callback);
    return;
  }

  // Remove the flags from the query before it is handed to `runAsStream_`.
  // NOTE(review): this mutates the object supplied by the caller.
  delete query.autoPaginate;
  delete query.autoPaginateVal;

  var resultStream = this.runAsStream_(query, originalMethod);

  // Errors go straight to the callback; otherwise every emitted result is
  // gathered by `concat` and delivered in a single callback invocation.
  resultStream.on('error', callback);
  resultStream.pipe(concat(function(results) {
    callback(null, results);
  }));
};

var firstArgument = args[0];
var lastArgument = args[args.length - 1];
/**
* This method simply calls the nextQuery recursively, emitting results to a
* stream. The stream ends when `nextQuery` is null.
*
* `maxResults` and `limitVal` (from Datastore) will act as a cap for how many
* results are fetched and emitted to the stream.
*
* @param {object=|string=} query - Query object. This is most
* commonly an object, but to make the API more simple, it can also be a
* string in some places.
* @param {function} originalMethod - The cached method that accepts a callback
* and returns `nextQuery` to receive more results.
* @return {stream} - Readable object stream.
*/
streamRouter.runAsStream_ = function(query, originalMethod) {
query = query || {};

var isStreamMode = !util.is(lastArgument, 'function');
var resultsToSend = -1;

if (!isStreamMode) {
originalMethod.apply(null, args);
return;
// Check if the user only asked for a certain amount of results.
if (util.is(query.maxResults, 'number')) {
// `maxResults` is used API-wide.
resultsToSend = query.maxResults;
} else if (util.is(query.limitVal, 'number')) {
// `limitVal` is part of a Datastore query.
resultsToSend = query.limitVal;
}

var stream = streamEvents(through.obj());
Expand All @@ -106,19 +171,6 @@ streamRouter.router_ = function(args, originalMethod) {
_end.apply(this, arguments);
};

var resultsToSend = -1;
if (util.is(firstArgument, 'object')) {
// `firstArgument` is a query object. Check if the user only asked for a
// certain amount of results.
if (util.is(firstArgument.maxResults, 'number')) {
// `maxResults` is used API-wide.
resultsToSend = firstArgument.maxResults;
} else if (util.is(firstArgument.limitVal, 'number')) {
// `limitVal` is part of a Datastore query.
resultsToSend = firstArgument.limitVal;
}
}

function onResultSet(err, results, nextQuery) {
if (err) {
stream.emit('error', err);
Expand Down Expand Up @@ -149,14 +201,7 @@ streamRouter.router_ = function(args, originalMethod) {
}

stream.once('reading', function() {
if (util.is(lastArgument, 'undefined')) {
// Replace it with onResultSet.
args.splice(args.length - 1, 1, onResultSet);
} else {
args = args.concat(onResultSet);
}

originalMethod.apply(null, args);
originalMethod.call(null, query, onResultSet);
});

return stream;
Expand Down
10 changes: 10 additions & 0 deletions lib/datastore/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,22 @@ function Query(namespace, kinds) {
this.selectVal = [];

// pagination
this.autoPaginateVal = false;
this.startVal = null;
this.endVal = null;
this.limitVal = -1;
this.offsetVal = -1;
}

/**
 * Produce a new query, based on this one, with its `autoPaginateVal` flag set.
 * The flag is turned on unless `false` is passed explicitly, so calling
 * `autoPaginate()` with no argument enables it.
 *
 * The receiver itself is not assigned to here; the result is built by calling
 * `extend` with a fresh `Query` and this instance (presumably copying this
 * query's properties onto the new one - confirm against `extend`).
 *
 * @param {boolean=} autoPaginateVal - Pass `false` to switch the flag off. Any
 *     other value, including `undefined`, switches it on.
 * @return {module:datastore/query}
 */
Query.prototype.autoPaginate = function(autoPaginateVal) {
  var isDisabled = autoPaginateVal === false;
  var copy = extend(new Query(), this);

  copy.autoPaginateVal = !isDisabled;

  return copy;
};

/**
* Datastore allows querying on properties. Supported comparison operators
* are `=`, `<`, `>`, `<=`, and `>=`. "Not equal" and `IN` operators are
Expand Down
11 changes: 11 additions & 0 deletions lib/datastore/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,17 @@ DatastoreRequest.prototype.delete = function(keys, callback) {
* transaction.runQuery(query, callback);
*
* //-
* // To have pagination handled for you, call `autoPaginate()`. Note the
* // changed callback parameters.
* //-
*
* var queryWithAutoPagination = dataset.createQuery('Lion').autoPaginate();
*
* transaction.runQuery(queryWithAutoPagination, function(err, entities) {
* // Called after all entities have been retrieved.
* });
*
* //-
* // If you omit the callback, runQuery will automatically call subsequent
* // queries until no results remain. Entity objects will be pushed as they are
* // found.
Expand Down
Loading

0 comments on commit 88fd2fa

Please sign in to comment.