From 63f413a13ff753e0df200c6a9cd868021138cb27 Mon Sep 17 00:00:00 2001 From: Ace Nassri Date: Fri, 12 Aug 2016 12:35:34 -0700 Subject: [PATCH] Add async queries --- bigquery/{sync_query.js => query.js} | 75 +++++- .../{sync_query.test.js => query.test.js} | 22 +- bigquery/test/query.test.js | 214 ++++++++++++++++++ bigquery/test/sync_query.test.js | 117 ---------- 4 files changed, 301 insertions(+), 127 deletions(-) rename bigquery/{sync_query.js => query.js} (50%) rename bigquery/system-test/{sync_query.test.js => query.test.js} (61%) create mode 100644 bigquery/test/query.test.js delete mode 100644 bigquery/test/sync_query.test.js diff --git a/bigquery/sync_query.js b/bigquery/query.js similarity index 50% rename from bigquery/sync_query.js rename to bigquery/query.js index a71d146082..8e1be942db 100644 --- a/bigquery/sync_query.js +++ b/bigquery/query.js @@ -33,12 +33,12 @@ var gcloud = require('gcloud'); var bigquery = gcloud.bigquery(); // [END auth] -// [START query] +// [START sync_query] /** * Run an example synchronous query. * @param {object} queryObj The BigQuery query to run, plus any additional options * listed at https://cloud.google.com/bigquery/docs/reference/v2/jobs/query - * @param {function} callback Callback function. + * @param {function} callback Callback function to receive query results. */ function syncQuery (queryObj, callback) { if (!queryObj || !queryObj.query) { @@ -50,16 +50,66 @@ function syncQuery (queryObj, callback) { return callback(err); } - console.log('Found %d rows!', rows.length); + console.log('SyncQuery: found %d rows!', rows.length); return callback(null, rows); }); } -// [END query] +// [END sync_query] + +// [START async_query] +/** + * Run an example asynchronous query. + * @param {object} queryObj The BigQuery query to run, plus any additional options + * listed at https://cloud.google.com/bigquery/docs/reference/v2/jobs/query + * @param {function} callback Callback function to receive job data. + */ +function asyncQuery (queryObj, callback) { + if (!queryObj || !queryObj.query) { + return callback(Error('queryObj must be an object with a "query" parameter')); + } + + bigquery.startQuery(queryObj, function (err, job) { + if (err) { + return callback(err); + } + + console.log('AsyncQuery: submitted job %s!', job.id); + return callback(null, job); + }); +} + +/** + * Poll an asynchronous query job for results. + * @param {object} jobId The ID of the BigQuery job to poll. + * @param {function} callback Callback function to receive job data. + */ +function asyncPoll (jobId, callback) { + if (!jobId) { + return callback(Error('"jobId" is required!')); + } + + bigquery.job(jobId).getQueryResults(function (err, rows) { + if (err) { + return callback(err); + } + + console.log('AsyncQuery: polled job %s; got %d rows!', jobId, rows.length); + return callback(null, rows); + }); +} +// [END async_query] + // [START usage] function printUsage () { - console.log('Usage: node sync_query QUERY'); + console.log('Usage:'); + console.log('\nCommands:\n'); + console.log('\tnode query sync-query QUERY'); + console.log('\tnode query async-query QUERY'); + console.log('\tnode query poll JOB_ID'); console.log('\nExamples:\n'); - console.log('\tnode sync_query "SELECT * FROM publicdata:samples.natality LIMIT 5;"'); + console.log('\tnode query sync-query "SELECT * FROM publicdata:samples.natality LIMIT 5;"'); + console.log('\tnode query async-query "SELECT * FROM publicdata:samples.natality LIMIT 5;"'); + console.log('\tnode query poll 12345"'); } // [END usage] @@ -69,14 +119,21 @@ var program = { printUsage: printUsage, // Exports + asyncQuery: asyncQuery, + asyncPoll: asyncPoll, syncQuery: syncQuery, bigquery: bigquery, // Run the sample main: function (args, cb) { - if (args.length === 1 && !(args[0] === '-h' || args[0] === '--help')) { - var queryObj = { query: args[0], timeoutMs: 10000 }; - this.syncQuery(queryObj, cb); + var command = args.shift(); + var arg = args.shift(); + if (command === 'sync-query') { + this.syncQuery({ query: arg, timeoutMs: 10000 }, cb); + } else if (command === 'async-query') { + this.asyncQuery({ query: arg }, cb); + } else if (command === 'poll') { + this.asyncPoll(arg, cb); } else { this.printUsage(); } diff --git a/bigquery/system-test/sync_query.test.js b/bigquery/system-test/query.test.js similarity index 61% rename from bigquery/system-test/sync_query.test.js rename to bigquery/system-test/query.test.js index 48c115b0b0..a73d35d74c 100644 --- a/bigquery/system-test/sync_query.test.js +++ b/bigquery/system-test/query.test.js @@ -15,7 +15,7 @@ var example = require('../sync_query'); -describe('bigquery:sync_query', function () { +describe('bigquery:query', function () { describe('sync_query', function () { it('should fetch data given a query', function (done) { example.syncQuery( @@ -30,4 +30,24 @@ describe('bigquery:sync_query', function () { ); }); }); + + describe('async_query', function () { + it('should submit a job and fetch its results', function (done) { + example.asyncQuery( + { query: 'SELECT * FROM publicdata:samples.natality LIMIT 5;' }, + function (err, job) { + assert.ifError(err); + setTimeout(function () { + example.asyncPoll(job.id, function (err, data) { + assert.ifError(err); + assert.notEqual(data, null); + assert(Array.isArray(data)); + assert(data.length === 5); + done(); + }); + }, 5000); + } + ); + }); + }); }); diff --git a/bigquery/test/query.test.js b/bigquery/test/query.test.js new file mode 100644 index 0000000000..9884e9dab6 --- /dev/null +++ b/bigquery/test/query.test.js @@ -0,0 +1,214 @@ +// Copyright 2016, Google, Inc. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +'use strict'; + +var proxyquire = require('proxyquire').noCallThru(); + +function getSample () { + var natalityMock = [ + { year: '2001' }, + { year: '2002' }, + { year: '2003' }, + { year: '2004' }, + { year: '2005' } + ]; + + var jobId = 'abc'; + + var jobMock = { + id: jobId, + getQueryResults: sinon.stub().callsArgWith(0, null, natalityMock) + }; + + var bigqueryMock = { + job: sinon.stub().returns(jobMock), + startQuery: sinon.stub().callsArgWith(1, null, jobMock), + query: sinon.stub().callsArgWith(1, null, natalityMock) + }; + var gcloudMock = { + bigquery: sinon.stub().returns(bigqueryMock) + }; + return { + program: proxyquire('../sync_query', { + gcloud: gcloudMock + }), + mocks: { + gcloud: gcloudMock, + bigquery: bigqueryMock, + natality: natalityMock, + job: jobMock + }, + jobId: jobId + }; +} + +describe('bigquery:query', function () { + describe('main', function () { + it('should show usage based on arguments', function () { + var program = getSample().program; + sinon.stub(program, 'printUsage'); + + program.main([]); + assert(program.printUsage.calledOnce); + + program.main(['-h']); + assert(program.printUsage.calledTwice); + + program.main(['--help']); + assert(program.printUsage.calledThrice); + }); + + it('should run the correct commands', function () { + var program = getSample().program; + sinon.stub(program, 'syncQuery'); + sinon.stub(program, 'asyncQuery'); + sinon.stub(program, 'asyncPoll'); + + program.main(['sync-query']); + assert(program.syncQuery.calledOnce); + + program.main(['async-query']); + assert(program.asyncQuery.calledOnce); + + program.main(['poll']); + assert(program.asyncPoll.calledOnce); + }); + + it('should execute queries', function () { + var example = getSample(); + sinon.stub(example.program, 'syncQuery'); + + example.program.main(['foo'], function (err, data) { + assert.ifError(err); + assert(example.program.syncQuery.calledWith({ query: 'foo' })); + assert.deepEqual(data, example.mocks.natality); + }); + }); + }); + + describe('syncQuery', function () { + var queryObj = { query: 'foo' }; + + it('should return results', function () { + var example = getSample(); + example.program.syncQuery(queryObj, + function (err, data) { + assert.ifError(err); + assert(example.mocks.bigquery.query.calledWith(queryObj)); + assert.deepEqual(data, example.mocks.natality); + assert(console.log.calledWith('SyncQuery: found %d rows!', data.length)); + } + ); + }); + + it('should require a query', function () { + var example = getSample(); + example.program.syncQuery({}, function (err, data) { + assert.deepEqual(err, Error('queryObj must be an object with a "query" parameter')); + assert.equal(data, undefined); + }); + }); + + it('should handle error', function () { + var error = Error('syncQueryError'); + var example = getSample(); + example.mocks.bigquery.query = sinon.stub().callsArgWith(1, error); + example.program.syncQuery(queryObj, function (err, data) { + assert.deepEqual(err, error); + assert.equal(data, undefined); + }); + }); + }); + + describe('asyncQuery', function () { + var queryObj = { query: 'foo' }; + + it('should submit a job', function () { + var example = getSample(); + example.program.asyncQuery(queryObj, + function (err, job) { + assert.ifError(err); + assert(example.mocks.bigquery.startQuery.calledWith(queryObj)); + assert.deepEqual(example.mocks.job, job); + assert(console.log.calledWith('AsyncQuery: submitted job %s!', example.jobId)); + } + ); + }); + + it('should require a query', function () { + var example = getSample(); + example.program.asyncQuery({}, function (err, job) { + assert.deepEqual(err, Error('queryObj must be an object with a "query" parameter')); + assert.equal(job, undefined); + }); + }); + + it('should handle error', function () { + var error = Error('asyncQueryError'); + var example = getSample(); + example.mocks.bigquery.startQuery = sinon.stub().callsArgWith(1, error); + example.program.asyncQuery(queryObj, function (err, job) { + assert.deepEqual(err, error); + assert.equal(job, undefined); + }); + }); + }); + + describe('asyncPoll', function () { + it('should get the results of a job given its ID', function () { + var example = getSample(); + example.program.asyncPoll(example.jobId, + function (err, job) { + assert.ifError(err); + assert(example.mocks.job.getQueryResults.called); + assert(console.log.calledWith('AsyncQuery: polled job %s; got %d rows!', example.jobId)); + } + ); + }); + + it('should require a job ID', function () { + var example = getSample(); + example.program.asyncPoll(null, function (err, job) { + assert.deepEqual(err, Error('"jobId" is required!')); + assert.equal(job, undefined); + }); + }); + + it('should handle error', function () { + var error = Error('asyncPollError'); + var example = getSample(); + example.mocks.job.getQueryResults = sinon.stub().callsArgWith(0, error); + example.program.asyncPoll(example.jobId, function (err, job) { + assert.deepEqual(err, error); + assert.equal(job, undefined); + }); + }); + }); + + describe('printUsage', function () { + it('should print usage', function () { + var program = getSample().program; + program.printUsage(); + assert(console.log.calledWith('Usage:')); + assert(console.log.calledWith('\nCommands:\n')); + assert(console.log.calledWith('\tnode query sync-query QUERY')); + assert(console.log.calledWith('\tnode query async-query QUERY')); + assert(console.log.calledWith('\tnode query poll JOB_ID')); + assert(console.log.calledWith('\nExamples:\n')); + assert(console.log.calledWith('\tnode query sync-query "SELECT * FROM publicdata:samples.natality LIMIT 5;"')); + assert(console.log.calledWith('\tnode query async-query "SELECT * FROM publicdata:samples.natality LIMIT 5;"')); + assert(console.log.calledWith('\tnode query poll 12345"')); + }); + }); +}); diff --git a/bigquery/test/sync_query.test.js b/bigquery/test/sync_query.test.js deleted file mode 100644 index 9012df8025..0000000000 --- a/bigquery/test/sync_query.test.js +++ /dev/null @@ -1,117 +0,0 @@ -// Copyright 2016, Google, Inc. -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -'use strict'; - -var proxyquire = require('proxyquire').noCallThru(); - -function getSample () { - var natalityMock = [ - { year: '2001' }, - { year: '2002' }, - { year: '2003' }, - { year: '2004' }, - { year: '2005' } - ]; - - var bigqueryMock = { - query: sinon.stub().callsArgWith(1, null, natalityMock) - }; - var gcloudMock = { - bigquery: sinon.stub().returns(bigqueryMock) - }; - return { - program: proxyquire('../sync_query', { - gcloud: gcloudMock - }), - mocks: { - gcloud: gcloudMock, - bigquery: bigqueryMock, - natality: natalityMock - } - }; -} - -describe('bigquery:sync_query', function () { - describe('main', function () { - it('should show usage based on arguments', function () { - var program = getSample().program; - sinon.stub(program, 'printUsage'); - - program.main([]); - assert(program.printUsage.calledOnce); - - program.main(['-h']); - assert(program.printUsage.calledTwice); - - program.main(['--help']); - assert(program.printUsage.calledThrice); - }); - - it('should execute queries', function () { - var example = getSample(); - sinon.stub(example.program, 'syncQuery'); - - example.program.main(['foo'], function (err, data) { - assert.ifError(err); - assert(example.program.syncQuery.calledWith({ query: 'foo' })); - assert.deepEqual(data, example.mocks.natality); - }); - }); - }); - - describe('syncQuery', function () { - var query = 'foo'; - var queryObj = { query: query }; - - it('should return results', function () { - var example = getSample(); - example.program.syncQuery(queryObj, - function (err, data) { - assert.ifError(err); - assert(example.mocks.bigquery.query.calledWith(queryObj)); - assert.deepEqual(data, example.mocks.natality); - assert(console.log.calledWith('Found %d rows!', data.length)); - } - ); - }); - - it('should require a query', function () { - var example = getSample(); - example.program.syncQuery({}, function (err, data) { - assert.deepEqual(err, Error('queryObj must be an object with a "query" parameter')); - assert.equal(data, undefined); - }); - }); - - it('should handle error', function () { - var error = Error('syncQueryError'); - var example = getSample(); - example.mocks.bigquery.query = sinon.stub().callsArgWith(1, error); - example.program.syncQuery(queryObj, function (err, data) { - assert.deepEqual(err, error); - assert.equal(data, undefined); - }); - }); - }); - - describe('printUsage', function () { - it('should print usage', function () { - var program = getSample().program; - program.printUsage(); - assert(console.log.calledWith('Usage: node sync_query QUERY')); - assert(console.log.calledWith('\nExamples:\n')); - assert(console.log.calledWith('\tnode sync_query "SELECT * FROM publicdata:samples.natality LIMIT 5;"')); - }); - }); -});