diff --git a/lib/datastore/request.js b/lib/datastore/request.js index e47e3ff9396..50ab4d39964 100644 --- a/lib/datastore/request.js +++ b/lib/datastore/request.js @@ -106,14 +106,10 @@ DatastoreRequest.prototype.get = function(keys, callback) { var isMultipleRequest = Array.isArray(keys); keys = isMultipleRequest ? keys : [keys]; callback = callback || util.noop; - var req = new pb.LookupRequest({ + var req = { key: keys.map(entity.keyToKeyProto) - }); - var res = pb.LookupResponse; - if (this.id) { - req.transaction = this.id; - } - this.createRequest_('lookup', req, res, function(err, resp) { + }; + this.makeReq_('lookup', req, function(err, resp) { if (err) { callback(err); return; @@ -217,7 +213,6 @@ DatastoreRequest.prototype.save = function(entities, callback) { return entityObject.key; }); var req = { - mode: MODE_NON_TRANSACTIONAL, mutation: entities.reduce(function(acc, entityObject, index) { var ent = {}; if (Array.isArray(entityObject.data)) { @@ -242,13 +237,7 @@ DatastoreRequest.prototype.save = function(entities, callback) { return acc; }.bind(this), { upsert: [], insert_auto_id: [] }) }; - if (this.id) { - req.transaction = this.id; - req.mode = MODE_TRANSACTIONAL; - } - req = new pb.CommitRequest(req); - var res = pb.CommitResponse; - this.createRequest_('commit', req, res, function(err, resp) { + this.makeReq_('commit', req, function(err, resp) { if (err || !resp) { callback(err); return; @@ -290,20 +279,12 @@ DatastoreRequest.prototype.delete = function(keys, callback) { var isMultipleRequest = Array.isArray(keys); keys = isMultipleRequest ? keys : [keys]; callback = callback || util.noop; - var req = { - mode: MODE_NON_TRANSACTIONAL, mutation: { delete: keys.map(entity.keyToKeyProto) } }; - if (this.id) { - req.transaction = this.id; - req.mode = MODE_TRANSACTIONAL; - } - req = new pb.CommitRequest(req); - var res = pb.CommitResponse; - this.createRequest_('commit', req, res, function(err) { + this.makeReq_('commit', req, function(err) { if (!err && this.id) { this.isFinalized = true; } @@ -334,7 +315,6 @@ DatastoreRequest.prototype.delete = function(keys, callback) { * } * }); */ - DatastoreRequest.prototype.runQuery = function(q, callback) { callback = callback || util.noop; var req = { @@ -342,20 +322,13 @@ DatastoreRequest.prototype.runQuery = function(q, callback) { query: entity.queryToQueryProto(q) }; - if (this.id) { - req.read_options.transaction = this.id; - } - if (q.namespace) { req.partition_id = { namespace: q.namespace }; } - req = new pb.RunQueryRequest(req); - var res = pb.RunQueryResponse; - - this.createRequest_('runQuery', req, res, function(err, resp) { + this.makeReq_('runQuery', req, function(err, resp) { if (err || !resp.batch || !resp.batch.entity_result) { callback(err); return; @@ -406,11 +379,10 @@ DatastoreRequest.prototype.allocateIds = function(incompleteKey, n, callback) { for (var i = 0; i < n; i++) { incompleteKeys.push(entity.keyToKeyProto(incompleteKey)); } - - this.createRequest_( - 'allocateIds', - new pb.AllocateIdsRequest({ key: incompleteKeys }), - pb.AllocateIdsResponse, function(err, resp) { + var req = { + key: incompleteKeys + }; + this.makeReq_('allocateIds', req, function(err, resp) { if (err) { callback(err); return; @@ -424,28 +396,50 @@ DatastoreRequest.prototype.allocateIds = function(incompleteKey, n, callback) { }; /** - * Make a request to the API endpoint. + * Make a request to the API endpoint. Properties to indicate a transactional or + * non-transactional operation are added automatically. * - * @param {string} method - Transaction action (allocateIds, commit, etc.). - * @param {object} req - Request configuration object. - * @param {object} respType - Response type. - * @param {function} cb - The callback function. + * @param {string} method - Datastore action (allocateIds, commit, etc.). + * @param {object=} body - Request configuration object. + * @param {function} callback - The callback function. * * @private * * @example * var deleteRequest = { - * MODE: 'NON_TRANSACTIONAL', * mutation: { * delete: [] // datastore key objects. * } * }; * transaction.makeReq('commit', deleteRequest, function(err) {}); */ -DatastoreRequest.prototype.createRequest_ = - function(method, req, respType, cb) { +DatastoreRequest.prototype.makeReq_ = function(method, body, callback) { // TODO: Handle non-HTTP 200 cases. - cb = cb || util.noop; + if (!callback) { + callback = body; + body = {}; + } + callback = callback || util.noop; + + // Set properties to indicate if we're in a transaction or not. + if (method === 'commit') { + if (this.id) { + body.mode = MODE_TRANSACTIONAL; + body.transaction = this.id; + } else { + body.mode = MODE_NON_TRANSACTIONAL; + } + } + + if (method === 'lookup' && this.id) { + body.read_options = body.read_options || {}; + body.read_options.transaction = this.id; + } + + var pbKey = method[0].toUpperCase() + method.substr(1); + var pbRequest = new pb[pbKey + 'Request'](body).toBuffer(); + var pbResponse = pb[pbKey + 'Response']; + this.connection.createAuthorizedReq({ method: 'POST', host: GOOGLE_APIS_HOST, @@ -455,7 +449,7 @@ DatastoreRequest.prototype.createRequest_ = } }, function(err, request) { if (err) { - cb(err); + callback(err); return; } var remoteStream = https.request(request, function(resp) { @@ -466,15 +460,15 @@ DatastoreRequest.prototype.createRequest_ = resp.on('end', function() { util.handleResp(null, resp, buffer.toString(), function(err) { if (err) { - cb(err); + callback(err); return; } - cb(null, respType.decode(buffer)); + callback(null, pbResponse.decode(buffer)); }); }); }); - remoteStream.on('error', cb); - remoteStream.write(req.toBuffer()); + remoteStream.on('error', callback); + remoteStream.write(pbRequest); remoteStream.end(); }); }; diff --git a/lib/datastore/transaction.js b/lib/datastore/transaction.js index 0d68ea8048e..dda603cc4b9 100644 --- a/lib/datastore/transaction.js +++ b/lib/datastore/transaction.js @@ -22,12 +22,6 @@ var nodeutil = require('util'); -/** - * @type module:datastore/pb - * @private - */ -var pb = require('./pb.js'); - /** * @type module:common/util * @private @@ -66,11 +60,9 @@ var DatastoreRequest = require('./request.js'); */ function Transaction(connection, projectId) { this.connection = connection; - this.projectId = projectId; - // the default transaction has no id. - // if id is not set, run operations non-transactional. this.id = null; this.isFinalized = false; + this.projectId = projectId; } nodeutil.inherits(Transaction, DatastoreRequest); @@ -96,17 +88,14 @@ nodeutil.inherits(Transaction, DatastoreRequest); */ Transaction.prototype.begin = function(callback) { callback = callback || util.noop; - var that = this; - var req = new pb.BeginTransactionRequest(); - var res = pb.BeginTransactionResponse; - this.createRequest_('beginTransaction', req, res, function(err, resp) { + this.makeReq_('beginTransaction', function(err, resp) { if (err) { callback(err); return; } - that.id = resp.transaction; + this.id = resp.transaction; callback(null); - }); + }.bind(this)); }; /** @@ -125,13 +114,10 @@ Transaction.prototype.begin = function(callback) { */ Transaction.prototype.rollback = function(callback) { callback = callback || util.noop; - var that = this; - var req = new pb.RollbackRequest({ transaction: this.id }); - var res = pb.RollbackResponse; - this.createRequest_('rollback', req, res, function(err) { - that.isFinalized = true; + this.makeReq_('rollback', function(err) { + this.isFinalized = true; callback(err || null); - }); + }.bind(this)); }; /** @@ -150,17 +136,14 @@ Transaction.prototype.rollback = function(callback) { */ Transaction.prototype.commit = function(callback) { callback = callback || util.noop; - var that = this; - var req = new pb.CommitRequest({ transaction: this.id }); - var res = pb.CommitResponse; - this.createRequest_('commit', req, res, function(err) { + this.makeReq_('commit', function(err) { if (err) { callback(err); return; } - that.isFinalized = true; + this.isFinalized = true; callback(null); - }); + }.bind(this)); }; /** diff --git a/test/datastore/request.js b/test/datastore/request.js index 33988d1a1a7..0b4d28b1d95 100644 --- a/test/datastore/request.js +++ b/test/datastore/request.js @@ -23,22 +23,39 @@ var ByteBuffer = require('bytebuffer'); var duplexify = require('duplexify'); var entity = require('../../lib/datastore/entity.js'); var mockRespGet = require('../testdata/response_get.json'); +var pb = require('../../lib/datastore/pb.js'); var Query = require('../../lib/datastore/query.js'); var util = require('../../lib/common/util.js'); var httpsRequestOverride = util.noop; var https = { request: function() { - var result = httpsRequestOverride.apply(this, util.toArray(arguments)); + var requestFn = httpsRequestOverride; httpsRequestOverride = util.noop; - return result; + return requestFn.apply(this, util.toArray(arguments)); + } +}; + +// Create a protobuf "FakeMethod" request & response. +pb.FakeMethodRequest = function() { + this.toBuffer = function() { + return new Buffer(''); + }; +}; +var pbFakeMethodResponseDecode = util.noop; +pb.FakeMethodResponse = { + decode: function() { + var decodeFn = pbFakeMethodResponseDecode; + pbFakeMethodResponseDecode = util.noop; + return decodeFn.apply(this, util.toArray(arguments)); } }; var Request = require('sandboxed-module') .require('../../lib/datastore/request.js', { requires: { - 'https': https + 'https': https, + './pb.js': pb } }); @@ -56,9 +73,9 @@ describe('Request', function() { describe('get', function() { it('should get by key', function(done) { - request.createRequest_ = function(method, proto, typ, callback) { + request.makeReq_ = function(method, req, callback) { assert.equal(method, 'lookup'); - assert.equal(proto.key.length, 1); + assert.equal(req.key.length, 1); callback(null, mockRespGet); }; request.get(key, function(err, entity) { @@ -72,9 +89,9 @@ describe('Request', function() { }); it('should multi get by keys', function(done) { - request.createRequest_ = function(method, proto, typ, callback) { + request.makeReq_ = function(method, req, callback) { assert.equal(method, 'lookup'); - assert.equal(proto.key.length, 1); + assert.equal(req.key.length, 1); callback(null, mockRespGet); }; request.get([key], function(err, entities) { @@ -90,7 +107,7 @@ describe('Request', function() { it('should continue looking for deferred results', function(done) { var lookupCount = 0; - request.createRequest_ = function(method, proto, typ, callback) { + request.makeReq_ = function(method, req, callback) { lookupCount++; assert.equal(method, 'lookup'); if (mockRespGet.deferred.length) { @@ -111,9 +128,9 @@ describe('Request', function() { describe('save', function() { it('should save with incomplete key', function(done) { - request.createRequest_ = function(method, proto, typ, callback) { + request.makeReq_ = function(method, req, callback) { assert.equal(method, 'commit'); - assert.equal(proto.mutation.insert_auto_id.length, 1); + assert.equal(req.mutation.insert_auto_id.length, 1); callback(); }; var key = new entity.Key({ namespace: 'ns', path: ['Company'] }); @@ -121,12 +138,12 @@ describe('Request', function() { }); it('should save with keys', function(done) { - request.createRequest_ = function(method, proto, typ, callback) { + request.makeReq_ = function(method, req, callback) { assert.equal(method, 'commit'); - assert.equal(proto.mutation.upsert.length, 2); - assert.equal(proto.mutation.upsert[0].property[0].name, 'k'); + assert.equal(req.mutation.upsert.length, 2); + assert.equal(req.mutation.upsert[0].property[0].name, 'k'); assert.equal( - proto.mutation.upsert[0].property[0].value.string_value, 'v'); + req.mutation.upsert[0].property[0].value.string_value, 'v'); callback(); }; request.save([ @@ -143,7 +160,7 @@ describe('Request', function() { it('should mark transaction as finalized', function(done) { assert.strictEqual(request.isFinalized, undefined); - request.createRequest_ = function(method, proto, typ, callback) { + request.makeReq_ = function(method, req, callback) { callback(null, { mutation_result: {} }); }; request.save({ key: key, data: {} }, function(err) { @@ -155,7 +172,7 @@ describe('Request', function() { it('should not mark as finalized if an error occurred', function(done) { assert.strictEqual(request.isFinalized, undefined); - request.createRequest_ = function(method, proto, typ, callback) { + request.makeReq_ = function(method, req, callback) { callback(new Error('Error.')); }; request.save({ key: key, data: {} }, function() { @@ -165,11 +182,11 @@ describe('Request', function() { }); it('should not set an indexed value by default', function(done) { - request.createRequest_ = function(method, proto) { - var property = proto.mutation.upsert[0].property[0]; + request.makeReq_ = function(method, req) { + var property = req.mutation.upsert[0].property[0]; assert.equal(property.name, 'name'); assert.equal(property.value.string_value, 'value'); - assert.strictEqual(property.value.indexed, null); + assert.strictEqual(property.value.indexed, undefined); done(); }; request.save({ @@ -179,8 +196,8 @@ describe('Request', function() { }); it('should allow setting the indexed value of property', function(done) { - request.createRequest_ = function(method, proto) { - var property = proto.mutation.upsert[0].property[0]; + request.makeReq_ = function(method, req) { + var property = req.mutation.upsert[0].property[0]; assert.equal(property.name, 'name'); assert.equal(property.value.string_value, 'value'); assert.strictEqual(property.value.indexed, false); @@ -196,18 +213,18 @@ describe('Request', function() { describe('delete', function() { it('should delete by key', function(done) { - request.createRequest_ = function(method, proto, typ, callback) { + request.makeReq_ = function(method, req, callback) { assert.equal(method, 'commit'); - assert.equal(!!proto.mutation.delete, true); + assert.equal(!!req.mutation.delete, true); callback(); }; request.delete(key, done); }); it('should multi delete by keys', function(done) { - request.createRequest_ = function(method, proto, typ, callback) { + request.makeReq_ = function(method, req, callback) { assert.equal(method, 'commit'); - assert.equal(proto.mutation.delete.length, 2); + assert.equal(req.mutation.delete.length, 2); callback(); }; request.delete([ key, key ], done); @@ -221,7 +238,7 @@ describe('Request', function() { it('should mark transaction as finalized', function(done) { assert.strictEqual(request.isFinalized, undefined); - request.createRequest_ = function(method, proto, typ, callback) { + request.makeReq_ = function(method, req, callback) { callback(null, { mutation_result: {} }); }; request.delete(key, function(err) { @@ -233,7 +250,7 @@ describe('Request', function() { it('should not mark as finalized if an error occurred', function(done) { assert.strictEqual(request.isFinalized, undefined); - request.createRequest_ = function(method, proto, typ, callback) { + request.makeReq_ = function(method, req, callback) { callback(new Error('Error.')); }; request.delete(key, function() { @@ -266,7 +283,7 @@ describe('Request', function() { describe('errors', function() { it('should handle upstream errors', function() { var error = new Error('Error.'); - request.createRequest_ = function(method, proto, typ, callback) { + request.makeReq_ = function(method, req, callback) { assert.equal(method, 'runQuery'); callback(error); }; @@ -277,7 +294,7 @@ describe('Request', function() { }); it('should handle missing results error', function() { - request.createRequest_ = function(method, proto, typ, callback) { + request.makeReq_ = function(method, req, callback) { assert.equal(method, 'runQuery'); callback(null, mockResponse.withoutResults); }; @@ -290,7 +307,7 @@ describe('Request', function() { }); it('should execute callback with results', function() { - request.createRequest_ = function(method, proto, typ, callback) { + request.makeReq_ = function(method, req, callback) { assert.equal(method, 'runQuery'); callback(null, mockResponse.withResults); }; @@ -307,7 +324,7 @@ describe('Request', function() { }); it('should return a new query if results remain', function() { - request.createRequest_ = function(method, proto, typ, callback) { + request.makeReq_ = function(method, req, callback) { assert.equal(method, 'runQuery'); callback(null, mockResponse.withResultsAndEndCursor); }; @@ -321,13 +338,9 @@ describe('Request', function() { describe('allocateIds', function() { it('should produce proper allocate IDs req protos', function(done) { - request.createRequest_ = function(method, proto, typ, callback) { + request.makeReq_ = function(method, req, callback) { assert.equal(method, 'allocateIds'); - assert.equal(proto.key.length, 1); - assert.deepEqual(proto.key[0], { - partition_id: null, - path_element :[{ kind: 'Kind', name: null, id: null }] - }); + assert.equal(req.key.length, 1); callback(null, { key: [ { path_element: [{ kind: 'Kind', id: 123 }] } @@ -350,12 +363,7 @@ describe('Request', function() { }); }); - describe('createRequest_', function() { - var mockProtoRequest = { fake: 'request' }; - mockProtoRequest.toBuffer = function() { - return mockProtoRequest; - }; - + describe('makeReq_', function() { beforeEach(function() { request.connection = { createAuthorizedReq: util.noop @@ -373,11 +381,11 @@ describe('Request', function() { assert.equal(opts.headers['content-type'], 'application/x-protobuf'); done(); }; - request.createRequest_(method, null, null, util.noop); + request.makeReq_(method, {}, util.noop); }); it('should make https request', function(done) { - var mockRequest = { fake: 'request' }; + var mockRequest = { mock: 'request' }; httpsRequestOverride = function(req) { assert.deepEqual(req, mockRequest); done(); @@ -386,7 +394,115 @@ describe('Request', function() { request.connection.createAuthorizedReq = function(opts, callback) { callback(null, mockRequest); }; - request.createRequest_('commit', mockProtoRequest, null, util.noop); + request.makeReq_('commit', {}, util.noop); + }); + + it('should send protobuf request', function(done) { + var requestOptions = { mode: 'NON_TRANSACTIONAL' }; + var decoded = new pb.CommitRequest(requestOptions).toBuffer(); + httpsRequestOverride = function() { + var stream = { on: util.noop, end: util.noop }; + stream.write = function(data) { + assert.equal(String(data), String(decoded)); + done(); + }; + return stream; + }; + request.connection.createAuthorizedReq = function(opts, callback) { + callback(); + }; + request.makeReq_('commit', requestOptions, util.noop); + }); + + it('should decode protobuf response', function(done) { + pbFakeMethodResponseDecode = function() { + done(); + }; + httpsRequestOverride = function(req, callback) { + var responseStream = duplexify(); + callback(responseStream); + responseStream.emit('end'); + return duplexify(); + }; + request.connection.createAuthorizedReq = function(opts, callback) { + callback(); + }; + request.makeReq_('fakeMethod', util.noop); + }); + + describe('transactional and non-transactional properties', function() { + beforeEach(function() { + request.connection.createAuthorizedReq = function(opts, callback) { + callback(); + }; + }); + + describe('commit', function() { + it('should attach transactional properties', function(done) { + request.id = 'transaction-id'; + var expected = new pb.CommitRequest({ + mode: 'TRANSACTIONAL', + transaction: request.id + }).toBuffer(); + httpsRequestOverride = function() { + var stream = { on: util.noop, end: util.noop }; + stream.write = function(data) { + assert.deepEqual(data, expected); + done(); + }; + return stream; + }; + request.makeReq_('commit', util.noop); + }); + + it('should attach non-transactional properties', function(done) { + var expected = new pb.CommitRequest({ + mode: 'NON_TRANSACTIONAL' + }).toBuffer(); + httpsRequestOverride = function() { + var stream = { on: util.noop, end: util.noop }; + stream.write = function(data) { + assert.deepEqual(data, expected); + done(); + }; + return stream; + }; + request.makeReq_('commit', util.noop); + }); + }); + + describe('lookup', function() { + it('should attach transactional properties', function(done) { + request.id = 'transaction-id'; + var expected = new pb.LookupRequest({ + read_options: { + transaction: request.id + } + }).toBuffer(); + httpsRequestOverride = function() { + var stream = { on: util.noop, end: util.noop }; + stream.write = function(data) { + assert.deepEqual(data, expected); + done(); + }; + return stream; + }; + request.makeReq_('lookup', util.noop); + }); + + it('should not attach transactional properties', function(done) { + var expected = new pb.LookupRequest().toBuffer(); + httpsRequestOverride = function() { + var stream = { on: util.noop, end: util.noop }; + stream.write = function(data) { + assert.deepEqual(data, expected); + done(); + }; + return stream; + }; + request.makeReq_('lookup', util.noop); + }); + }); }); }); }); diff --git a/test/datastore/transaction.js b/test/datastore/transaction.js index dc3d87f400e..848627dd90e 100644 --- a/test/datastore/transaction.js +++ b/test/datastore/transaction.js @@ -31,7 +31,8 @@ describe('Transaction', function() { describe('begin', function() { it('should begin', function(done) { - transaction.createRequest_ = function(method) { + transaction.makeReq_ = function(method, req, callback) { + callback = callback || req; assert.equal(method, 'beginTransaction'); done(); }; @@ -39,7 +40,8 @@ describe('Transaction', function() { }); it('should set transaction id', function(done) { - transaction.createRequest_ = function(method, proto, respType, callback) { + transaction.makeReq_ = function(method, req, callback) { + callback = callback || req; callback(null, { transaction: TRANSACTION_ID }); }; transaction.begin(function(err) { @@ -51,7 +53,8 @@ describe('Transaction', function() { it('should pass error to callback', function(done) { var error = new Error('Error.'); - transaction.createRequest_ = function(method, proto, respType, callback) { + transaction.makeReq_ = function(method, req, callback) { + callback = callback || req; callback(error); }; transaction.begin(function(err) { @@ -67,11 +70,8 @@ describe('Transaction', function() { }); it('should rollback', function(done) { - transaction.createRequest_ = function(method, proto) { + transaction.makeReq_ = function(method) { assert.equal(method, 'rollback'); - assert.equal( - proto.transaction.toBase64(), - new Buffer(transaction.id).toString('base64')); done(); }; transaction.rollback(); @@ -79,7 +79,8 @@ describe('Transaction', function() { it('should pass error to callback', function(done) { var error = new Error('Error.'); - transaction.createRequest_ = function(method, proto, respType, callback) { + transaction.makeReq_ = function(method, req, callback) { + callback = callback || req; callback(error); }; transaction.rollback(function(err) { @@ -89,7 +90,8 @@ describe('Transaction', function() { }); it('should mark as finalized', function(done) { - transaction.createRequest_ = function(method, proto, respType, callback) { + transaction.makeReq_ = function(method, req, callback) { + callback = callback || req; callback(); }; transaction.rollback(function() { @@ -99,8 +101,9 @@ describe('Transaction', function() { }); it('should mark as finalized when rollback errors', function(done) { - transaction.createRequest_ = function(method, proto, respType, callback) { - callback(new Error('Error.')); + transaction.makeReq_ = function(method, req, callback) { + callback = callback || req; + callback(new Error('Error.')); }; transaction.rollback(function() { assert.strictEqual(transaction.isFinalized, true); @@ -115,11 +118,8 @@ describe('Transaction', function() { }); it('should commit', function(done) { - transaction.createRequest_ = function(method, proto) { + transaction.makeReq_ = function(method) { assert.equal(method, 'commit'); - assert.equal( - proto.transaction.toBase64(), - new Buffer(transaction.id).toString('base64')); done(); }; transaction.commit(); @@ -127,7 +127,8 @@ describe('Transaction', function() { it('should pass error to callback', function(done) { var error = new Error('Error.'); - transaction.createRequest_ = function(method, proto, respType, callback) { + transaction.makeReq_ = function(method, req, callback) { + callback = callback || req; callback(error); }; transaction.commit(function(err) { @@ -137,7 +138,8 @@ describe('Transaction', function() { }); it('should mark as finalized', function(done) { - transaction.createRequest_ = function(method, proto, respType, callback) { + transaction.makeReq_ = function(method, req, callback) { + callback = callback || req; callback(); }; transaction.commit(function() { @@ -147,7 +149,8 @@ describe('Transaction', function() { }); it('should not mark as finalized if commit errors', function(done) { - transaction.createRequest_ = function(method, proto, respType, callback) { + transaction.makeReq_ = function(method, req, callback) { + callback = callback || req; callback(new Error('Error.')); }; transaction.commit(function() {