Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

datastore: tests: refactor pb/transaction. #255

Merged
merged 3 commits into from
Oct 13, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 46 additions & 52 deletions lib/datastore/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,10 @@ DatastoreRequest.prototype.get = function(keys, callback) {
var isMultipleRequest = Array.isArray(keys);
keys = isMultipleRequest ? keys : [keys];
callback = callback || util.noop;
var req = new pb.LookupRequest({
var req = {
key: keys.map(entity.keyToKeyProto)
});
var res = pb.LookupResponse;
if (this.id) {
req.transaction = this.id;
}
this.createRequest_('lookup', req, res, function(err, resp) {
};
this.makeReq_('lookup', req, function(err, resp) {
if (err) {
callback(err);
return;
Expand Down Expand Up @@ -217,7 +213,6 @@ DatastoreRequest.prototype.save = function(entities, callback) {
return entityObject.key;
});
var req = {
mode: MODE_NON_TRANSACTIONAL,
mutation: entities.reduce(function(acc, entityObject, index) {
var ent = {};
if (Array.isArray(entityObject.data)) {
Expand All @@ -242,13 +237,7 @@ DatastoreRequest.prototype.save = function(entities, callback) {
return acc;
}.bind(this), { upsert: [], insert_auto_id: [] })
};
if (this.id) {
req.transaction = this.id;
req.mode = MODE_TRANSACTIONAL;
}
req = new pb.CommitRequest(req);
var res = pb.CommitResponse;
this.createRequest_('commit', req, res, function(err, resp) {
this.makeReq_('commit', req, function(err, resp) {
if (err || !resp) {
callback(err);
return;
Expand Down Expand Up @@ -290,20 +279,12 @@ DatastoreRequest.prototype.delete = function(keys, callback) {
var isMultipleRequest = Array.isArray(keys);
keys = isMultipleRequest ? keys : [keys];
callback = callback || util.noop;

var req = {
mode: MODE_NON_TRANSACTIONAL,
mutation: {
delete: keys.map(entity.keyToKeyProto)
}
};
if (this.id) {
req.transaction = this.id;
req.mode = MODE_TRANSACTIONAL;
}
req = new pb.CommitRequest(req);
var res = pb.CommitResponse;
this.createRequest_('commit', req, res, function(err) {
this.makeReq_('commit', req, function(err) {
if (!err && this.id) {
this.isFinalized = true;
}
Expand Down Expand Up @@ -334,28 +315,20 @@ DatastoreRequest.prototype.delete = function(keys, callback) {
* }
* });
*/

DatastoreRequest.prototype.runQuery = function(q, callback) {
callback = callback || util.noop;
var req = {
read_options: {},
query: entity.queryToQueryProto(q)
};

if (this.id) {
req.read_options.transaction = this.id;
}

if (q.namespace) {
req.partition_id = {
namespace: q.namespace
};
}

req = new pb.RunQueryRequest(req);
var res = pb.RunQueryResponse;

this.createRequest_('runQuery', req, res, function(err, resp) {
this.makeReq_('runQuery', req, function(err, resp) {
if (err || !resp.batch || !resp.batch.entity_result) {
callback(err);
return;
Expand Down Expand Up @@ -406,11 +379,10 @@ DatastoreRequest.prototype.allocateIds = function(incompleteKey, n, callback) {
for (var i = 0; i < n; i++) {
incompleteKeys.push(entity.keyToKeyProto(incompleteKey));
}

this.createRequest_(
'allocateIds',
new pb.AllocateIdsRequest({ key: incompleteKeys }),
pb.AllocateIdsResponse, function(err, resp) {
var req = {
key: incompleteKeys
};
this.makeReq_('allocateIds', req, function(err, resp) {
if (err) {
callback(err);
return;
Expand All @@ -424,28 +396,50 @@ DatastoreRequest.prototype.allocateIds = function(incompleteKey, n, callback) {
};

/**
* Make a request to the API endpoint.
* Make a request to the API endpoint. Properties to indicate a transactional or
* non-transactional operation are added automatically.
*
* @param {string} method - Transaction action (allocateIds, commit, etc.).
* @param {object} req - Request configuration object.
* @param {object} respType - Response type.
* @param {function} cb - The callback function.
* @param {string} method - Datastore action (allocateIds, commit, etc.).
* @param {object=} body - Request configuration object.
* @param {function} callback - The callback function.
*
* @private
*
* @example
* var deleteRequest = {
* MODE: 'NON_TRANSACTIONAL',
* mutation: {
* delete: [] // datastore key objects.
* }
* };
* transaction.makeReq('commit', deleteRequest, function(err) {});
*/
DatastoreRequest.prototype.createRequest_ =
function(method, req, respType, cb) {
DatastoreRequest.prototype.makeReq_ = function(method, body, callback) {
// TODO: Handle non-HTTP 200 cases.
cb = cb || util.noop;
if (!callback) {
callback = body;
body = {};
}
callback = callback || util.noop;

// Set properties to indicate if we're in a transaction or not.
if (method === 'commit') {
if (this.id) {
body.mode = MODE_TRANSACTIONAL;
body.transaction = this.id;
} else {
body.mode = MODE_NON_TRANSACTIONAL;
}
}

if (method === 'lookup' && this.id) {
body.read_options = body.read_options || {};
body.read_options.transaction = this.id;
}

This comment was marked as spam.

This comment was marked as spam.


var pbKey = method[0].toUpperCase() + method.substr(1);
var pbRequest = new pb[pbKey + 'Request'](body).toBuffer();
var pbResponse = pb[pbKey + 'Response'];

This comment was marked as spam.

This comment was marked as spam.


this.connection.createAuthorizedReq({
method: 'POST',
host: GOOGLE_APIS_HOST,
Expand All @@ -455,7 +449,7 @@ DatastoreRequest.prototype.createRequest_ =
}
}, function(err, request) {
if (err) {
cb(err);
callback(err);
return;
}
var remoteStream = https.request(request, function(resp) {
Expand All @@ -466,15 +460,15 @@ DatastoreRequest.prototype.createRequest_ =
resp.on('end', function() {
util.handleResp(null, resp, buffer.toString(), function(err) {
if (err) {
cb(err);
callback(err);
return;
}
cb(null, respType.decode(buffer));
callback(null, pbResponse.decode(buffer));
});
});
});
remoteStream.on('error', cb);
remoteStream.write(req.toBuffer());
remoteStream.on('error', callback);
remoteStream.write(pbRequest);
remoteStream.end();
});
};
Expand Down
37 changes: 10 additions & 27 deletions lib/datastore/transaction.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,6 @@

var nodeutil = require('util');

/**
* @type module:datastore/pb
* @private
*/
var pb = require('./pb.js');

/**
* @type module:common/util
* @private
Expand Down Expand Up @@ -66,11 +60,9 @@ var DatastoreRequest = require('./request.js');
*/
function Transaction(connection, projectId) {
this.connection = connection;
this.projectId = projectId;
// the default transaction has no id.
// if id is not set, run operations non-transactional.
this.id = null;
this.isFinalized = false;
this.projectId = projectId;
}

nodeutil.inherits(Transaction, DatastoreRequest);
Expand All @@ -96,17 +88,14 @@ nodeutil.inherits(Transaction, DatastoreRequest);
*/
Transaction.prototype.begin = function(callback) {
callback = callback || util.noop;
var that = this;
var req = new pb.BeginTransactionRequest();
var res = pb.BeginTransactionResponse;
this.createRequest_('beginTransaction', req, res, function(err, resp) {
this.makeReq_('beginTransaction', function(err, resp) {
if (err) {
callback(err);
return;
}
that.id = resp.transaction;
this.id = resp.transaction;
callback(null);
});
}.bind(this));
};

/**
Expand All @@ -125,13 +114,10 @@ Transaction.prototype.begin = function(callback) {
*/
Transaction.prototype.rollback = function(callback) {
callback = callback || util.noop;
var that = this;
var req = new pb.RollbackRequest({ transaction: this.id });
var res = pb.RollbackResponse;
this.createRequest_('rollback', req, res, function(err) {
that.isFinalized = true;
this.makeReq_('rollback', function(err) {
this.isFinalized = true;
callback(err || null);
});
}.bind(this));

This comment was marked as spam.

};

/**
Expand All @@ -150,17 +136,14 @@ Transaction.prototype.rollback = function(callback) {
*/
Transaction.prototype.commit = function(callback) {
callback = callback || util.noop;
var that = this;
var req = new pb.CommitRequest({ transaction: this.id });
var res = pb.CommitResponse;
this.createRequest_('commit', req, res, function(err) {
this.makeReq_('commit', function(err) {
if (err) {
callback(err);
return;
}
that.isFinalized = true;
this.isFinalized = true;
callback(null);
});
}.bind(this));
};

/**
Expand Down
Loading