Skip to content

Commit

Permalink
Add async queries
Browse files Browse the repository at this point in the history
  • Loading branch information
Ace Nassri committed Aug 12, 2016
1 parent d701672 commit 63f413a
Show file tree
Hide file tree
Showing 4 changed files with 301 additions and 127 deletions.
75 changes: 66 additions & 9 deletions bigquery/sync_query.js → bigquery/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ var gcloud = require('gcloud');
var bigquery = gcloud.bigquery();
// [END auth]

// [START query]
// [START sync_query]
/**
* Run an example synchronous query.
* @param {object} queryObj The BigQuery query to run, plus any additional options
* listed at https://cloud.google.com/bigquery/docs/reference/v2/jobs/query
* @param {function} callback Callback function.
* @param {function} callback Callback function to receive query results.
*/
function syncQuery (queryObj, callback) {
if (!queryObj || !queryObj.query) {
Expand All @@ -50,16 +50,66 @@ function syncQuery (queryObj, callback) {
return callback(err);
}

console.log('Found %d rows!', rows.length);
console.log('SyncQuery: found %d rows!', rows.length);
return callback(null, rows);
});
}
// [END query]
// [END sync_query]

// [START async_query]
/**
* Run an example asynchronous query.
* @param {object} queryObj The BigQuery query to run, plus any additional options
* listed at https://cloud.google.com/bigquery/docs/reference/v2/jobs/query
* @param {function} callback Callback function to receive job data.
*/
function asyncQuery (queryObj, callback) {
if (!queryObj || !queryObj.query) {
return callback(Error('queryObj must be an object with a "query" parameter'));
}

bigquery.startQuery(queryObj, function (err, job) {
if (err) {
return callback(err);
}

console.log('AsyncQuery: submitted job %s!', job.id);
return callback(null, job);
});
}

/**
* Poll an asynchronous query job for results.
* @param {object} jobId The ID of the BigQuery job to poll.
* @param {function} callback Callback function to receive job data.
*/
function asyncPoll (jobId, callback) {
if (!jobId) {
return callback(Error('"jobId" is required!'));
}

bigquery.job(jobId).getQueryResults(function (err, rows) {
if (err) {
return callback(err);
}

console.log('AsyncQuery: polled job %s; got %d rows!', jobId, rows.length);
return callback(null, rows);
});
}
// [END async_query]

// [START usage]
function printUsage () {
console.log('Usage: node sync_query QUERY');
console.log('Usage:');
console.log('\nCommands:\n');
console.log('\tnode query sync-query QUERY');
console.log('\tnode query async-query QUERY');
console.log('\tnode query poll JOB_ID');
console.log('\nExamples:\n');
console.log('\tnode sync_query "SELECT * FROM publicdata:samples.natality LIMIT 5;"');
console.log('\tnode query sync-query "SELECT * FROM publicdata:samples.natality LIMIT 5;"');
console.log('\tnode query async-query "SELECT * FROM publicdata:samples.natality LIMIT 5;"');
console.log('\tnode query poll 12345"');
}
// [END usage]

Expand All @@ -69,14 +119,21 @@ var program = {
printUsage: printUsage,

// Exports
asyncQuery: asyncQuery,
asyncPoll: asyncPoll,
syncQuery: syncQuery,
bigquery: bigquery,

// Run the sample
main: function (args, cb) {
if (args.length === 1 && !(args[0] === '-h' || args[0] === '--help')) {
var queryObj = { query: args[0], timeoutMs: 10000 };
this.syncQuery(queryObj, cb);
var command = args.shift();
var arg = args.shift();
if (command === 'sync-query') {
this.syncQuery({ query: arg, timeoutMs: 10000 }, cb);
} else if (command === 'async-query') {
this.asyncQuery({ query: arg }, cb);
} else if (command === 'poll') {
this.asyncPoll(arg, cb);
} else {
this.printUsage();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

var example = require('../sync_query');

describe('bigquery:sync_query', function () {
describe('bigquery:query', function () {
describe('sync_query', function () {
it('should fetch data given a query', function (done) {
example.syncQuery(
Expand All @@ -30,4 +30,24 @@ describe('bigquery:sync_query', function () {
);
});
});

describe('async_query', function () {
it('should submit a job and fetch its results', function (done) {
example.asyncQuery(
{ query: 'SELECT * FROM publicdata:samples.natality LIMIT 5;' },
function (err, job) {
assert.ifError(err);
setTimeout(function () {
example.asyncPoll(job.id, function (err, data) {
assert.ifError(err);
assert.notEqual(data, null);
assert(Array.isArray(data));
assert(data.length === 5);
done();
});
}, 5000);
}
);
});
});
});
214 changes: 214 additions & 0 deletions bigquery/test/query.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
// Copyright 2016, Google, Inc.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

'use strict';

var proxyquire = require('proxyquire').noCallThru();

function getSample () {
var natalityMock = [
{ year: '2001' },
{ year: '2002' },
{ year: '2003' },
{ year: '2004' },
{ year: '2005' }
];

var jobId = 'abc';

var jobMock = {
id: jobId,
getQueryResults: sinon.stub().callsArgWith(0, null, natalityMock)
};

var bigqueryMock = {
job: sinon.stub().returns(jobMock),
startQuery: sinon.stub().callsArgWith(1, null, jobMock),
query: sinon.stub().callsArgWith(1, null, natalityMock)
};
var gcloudMock = {
bigquery: sinon.stub().returns(bigqueryMock)
};
return {
program: proxyquire('../sync_query', {
gcloud: gcloudMock
}),
mocks: {
gcloud: gcloudMock,
bigquery: bigqueryMock,
natality: natalityMock,
job: jobMock
},
jobId: jobId
};
}

describe('bigquery:query', function () {
describe('main', function () {
it('should show usage based on arguments', function () {
var program = getSample().program;
sinon.stub(program, 'printUsage');

program.main([]);
assert(program.printUsage.calledOnce);

program.main(['-h']);
assert(program.printUsage.calledTwice);

program.main(['--help']);
assert(program.printUsage.calledThrice);
});

it('should run the correct commands', function () {
var program = getSample().program;
sinon.stub(program, 'syncQuery');
sinon.stub(program, 'asyncQuery');
sinon.stub(program, 'asyncPoll');

program.main(['sync-query']);
assert(program.syncQuery.calledOnce);

program.main(['async-query']);
assert(program.asyncQuery.calledOnce);

program.main(['poll']);
assert(program.asyncPoll.calledOnce);
});

it('should execute queries', function () {
var example = getSample();
sinon.stub(example.program, 'syncQuery');

example.program.main(['foo'], function (err, data) {
assert.ifError(err);
assert(example.program.syncQuery.calledWith({ query: 'foo' }));
assert.deepEqual(data, example.mocks.natality);
});
});
});

describe('syncQuery', function () {
var queryObj = { query: 'foo' };

it('should return results', function () {
var example = getSample();
example.program.syncQuery(queryObj,
function (err, data) {
assert.ifError(err);
assert(example.mocks.bigquery.query.calledWith(queryObj));
assert.deepEqual(data, example.mocks.natality);
assert(console.log.calledWith('SyncQuery: found %d rows!', data.length));
}
);
});

it('should require a query', function () {
var example = getSample();
example.program.syncQuery({}, function (err, data) {
assert.deepEqual(err, Error('queryObj must be an object with a "query" parameter'));
assert.equal(data, undefined);
});
});

it('should handle error', function () {
var error = Error('syncQueryError');
var example = getSample();
example.mocks.bigquery.query = sinon.stub().callsArgWith(1, error);
example.program.syncQuery(queryObj, function (err, data) {
assert.deepEqual(err, error);
assert.equal(data, undefined);
});
});
});

describe('asyncQuery', function () {
var queryObj = { query: 'foo' };

it('should submit a job', function () {
var example = getSample();
example.program.asyncQuery(queryObj,
function (err, job) {
assert.ifError(err);
assert(example.mocks.bigquery.startQuery.calledWith(queryObj));
assert.deepEqual(example.mocks.job, job);
assert(console.log.calledWith('AsyncQuery: submitted job %s!', example.jobId));
}
);
});

it('should require a query', function () {
var example = getSample();
example.program.asyncQuery({}, function (err, job) {
assert.deepEqual(err, Error('queryObj must be an object with a "query" parameter'));
assert.equal(job, undefined);
});
});

it('should handle error', function () {
var error = Error('asyncQueryError');
var example = getSample();
example.mocks.bigquery.startQuery = sinon.stub().callsArgWith(1, error);
example.program.asyncQuery(queryObj, function (err, job) {
assert.deepEqual(err, error);
assert.equal(job, undefined);
});
});
});

describe('asyncPoll', function () {
it('should get the results of a job given its ID', function () {
var example = getSample();
example.program.asyncPoll(example.jobId,
function (err, job) {
assert.ifError(err);
assert(example.mocks.job.getQueryResults.called);
assert(console.log.calledWith('AsyncQuery: polled job %s; got %d rows!', example.jobId));
}
);
});

it('should require a job ID', function () {
var example = getSample();
example.program.asyncPoll(null, function (err, job) {
assert.deepEqual(err, Error('"jobId" is required!'));
assert.equal(job, undefined);
});
});

it('should handle error', function () {
var error = Error('asyncPollError');
var example = getSample();
example.mocks.job.getQueryResults = sinon.stub().callsArgWith(0, error);
example.program.asyncPoll(example.jobId, function (err, job) {
assert.deepEqual(err, error);
assert.equal(job, undefined);
});
});
});

describe('printUsage', function () {
it('should print usage', function () {
var program = getSample().program;
program.printUsage();
assert(console.log.calledWith('Usage:'));
assert(console.log.calledWith('\nCommands:\n'));
assert(console.log.calledWith('\tnode query sync-query QUERY'));
assert(console.log.calledWith('\tnode query async-query QUERY'));
assert(console.log.calledWith('\tnode query poll JOB_ID'));
assert(console.log.calledWith('\nExamples:\n'));
assert(console.log.calledWith('\tnode query sync-query "SELECT * FROM publicdata:samples.natality LIMIT 5;"'));
assert(console.log.calledWith('\tnode query async-query "SELECT * FROM publicdata:samples.natality LIMIT 5;"'));
assert(console.log.calledWith('\tnode query poll 12345"'));
});
});
});
Loading

0 comments on commit 63f413a

Please sign in to comment.