Skip to content

Commit

Permalink
breaking: refactor(transaction): split logic into new classes (#506)
Browse files Browse the repository at this point in the history
* refactor(transaction): split logic into new classes

Previously all transaction logic was housed in a
single class. This refactor breaks the logic up
into multiple classes:

      Snapshot - read only transactions
      Transaction - read/write transactions
      PartitionedDml - dml capable transaction
  • Loading branch information
callmehiphop authored Feb 8, 2019
1 parent 17c0ba2 commit 4b447e7
Show file tree
Hide file tree
Showing 25 changed files with 4,260 additions and 4,734 deletions.
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
"merge-stream": "^1.0.1",
"p-limit": "^2.0.0",
"p-queue": "^3.0.0",
"p-retry": "^3.0.0",
"protobufjs": "^6.8.6",
"split-array-stream": "^2.0.0",
"stack-trace": "0.0.10",
Expand Down
16 changes: 8 additions & 8 deletions samples/dml.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ function insertUsingDml(instanceId, databaseId, projectId) {
return;
}
try {
const rowCount = await transaction.runUpdate({
const [rowCount] = await transaction.runUpdate({
sql:
'INSERT Singers (SingerId, FirstName, LastName) VALUES (10, @firstName, @lastName)',
params: {
Expand Down Expand Up @@ -93,7 +93,7 @@ function updateUsingDml(instanceId, databaseId, projectId) {
return;
}
try {
const rowCount = await transaction.runUpdate({
const [rowCount] = await transaction.runUpdate({
sql: `UPDATE Albums SET MarketingBudget = MarketingBudget * 2
WHERE SingerId = 1 and AlbumId = 1`,
});
Expand Down Expand Up @@ -137,7 +137,7 @@ function deleteUsingDml(instanceId, databaseId, projectId) {
return;
}
try {
const rowCount = await transaction.runUpdate({
const [rowCount] = await transaction.runUpdate({
sql: `DELETE Singers WHERE FirstName = 'Alice'`,
});

Expand Down Expand Up @@ -180,7 +180,7 @@ function updateUsingDmlWithTimestamp(instanceId, databaseId, projectId) {
return;
}
try {
const rowCount = await transaction.runUpdate({
const [rowCount] = await transaction.runUpdate({
sql: `UPDATE Albums
SET LastUpdateTime = PENDING_COMMIT_TIMESTAMP()
WHERE SingerId = 1`,
Expand Down Expand Up @@ -281,7 +281,7 @@ function updateUsingDmlWithStruct(instanceId, databaseId, projectId) {
return;
}
try {
const rowCount = await transaction.runUpdate({
const [rowCount] = await transaction.runUpdate({
sql: `UPDATE Singers SET LastName = 'Grant'
WHERE STRUCT<FirstName STRING, LastName STRING>(FirstName, LastName) = @name`,
params: {
Expand Down Expand Up @@ -328,7 +328,7 @@ function writeUsingDml(instanceId, databaseId, projectId) {
return;
}
try {
const rowCount = await transaction.runUpdate({
const [rowCount] = await transaction.runUpdate({
sql: `INSERT Singers (SingerId, FirstName, LastName) VALUES
(12, 'Melissa', 'Garcia'),
(13, 'Russell', 'Morales'),
Expand Down Expand Up @@ -481,7 +481,7 @@ async function updateUsingPartitionedDml(instanceId, databaseId, projectId) {
const database = instance.database(databaseId);

try {
const rowCount = await database.runPartitionedUpdate({
const [rowCount] = await database.runPartitionedUpdate({
sql: `UPDATE Albums SET MarketingBudget = 100000 WHERE SingerId > 1`,
});
console.log(`Successfully updated ${rowCount} records.`);
Expand Down Expand Up @@ -516,7 +516,7 @@ async function deleteUsingPartitionedDml(instanceId, databaseId, projectId) {
const database = instance.database(databaseId);

try {
const rowCount = await database.runPartitionedUpdate({
const [rowCount] = await database.runPartitionedUpdate({
sql: `DELETE Singers WHERE SingerId > 10`,
});
console.log(`Successfully deleted ${rowCount} records.`);
Expand Down
9 changes: 3 additions & 6 deletions samples/transaction.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ function readOnlyTransaction(instanceId, databaseId, projectId) {

// Gets a transaction object that captures the database state
// at a specific point in time
database.runTransaction({readOnly: true}, async (err, transaction) => {
database.getSnapshot(async (err, transaction) => {
if (err) {
console.error(err);
return;
Expand All @@ -60,9 +60,6 @@ function readOnlyTransaction(instanceId, databaseId, projectId) {

const queryTwo = {
columns: ['SingerId', 'AlbumId', 'AlbumTitle'],
keySet: {
all: true,
},
};

// Read #2, using the `read` method. Even if changes occur
Expand All @@ -80,12 +77,12 @@ function readOnlyTransaction(instanceId, databaseId, projectId) {
});

console.log('Successfully executed read-only transaction.');
await transaction.end();
} catch (err) {
console.error('ERROR:', err);
} finally {
transaction.end();
// Close the database when finished.
database.close();
await database.close();
}
});
// [END spanner_read_only_transaction]
Expand Down
52 changes: 32 additions & 20 deletions src/batch-transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,18 @@ import {promisifyAll} from '@google-cloud/promisify';
import * as extend from 'extend';
import * as is from 'is';
import {codec} from './codec';
import {Transaction} from './transaction';
import {Snapshot} from './transaction';

/**
* Use a BatchTransaction object to create partitions and read/query against
* your Cloud Spanner database.
*
* @class
* @extends Transaction
* @extends Snapshot
*
* @param {TransactionOptions} [options] [Transaction options](https://cloud.google.com/spanner/docs/timestamp-bounds).
* @param {TimestampBounds} [options] [Timestamp Bounds](https://cloud.google.com/spanner/docs/timestamp-bounds).
*/
class BatchTransaction extends Transaction {
readTimestamp?: {};

constructor(session) {
super(session, {readOnly: true});
}
class BatchTransaction extends Snapshot {
/**
* Closes all open resources.
*
Expand Down Expand Up @@ -117,15 +112,18 @@ class BatchTransaction extends Transaction {
sql: query,
};
}
const reqOpts = codec.encodeQuery(query);
const gaxOpts = query.gaxOptions;

const reqOpts = Object.assign({}, query, Snapshot.encodeParams(query));

delete reqOpts.gaxOptions;
delete reqOpts.types;

this.createPartitions_(
{
client: 'SpannerClient',
method: 'partitionQuery',
reqOpts,
gaxOpts,
gaxOpts: query.gaxOptions,
},
callback);
}
Expand All @@ -146,18 +144,27 @@ class BatchTransaction extends Transaction {
});
config.reqOpts = extend({}, query);
delete query.partitionOptions;
this.request(config, (err, resp) => {
this.session.request(config, (err, resp) => {
if (err) {
callback(err, null, resp);
return;
}

const partitions = resp.partitions.map(partition => {
return extend({}, query, partition);
});

if (resp.transaction) {
this.id = resp.transaction.id;
this.readTimestamp = resp.transaction.readTimestamp;
const {id, readTimestamp} = resp.transaction;

this.id = id;

if (readTimestamp) {
this.readTimestampProto = readTimestamp;
this.readTimestamp = codec.convertProtoTimestampToDate(readTimestamp);
}
}

callback(null, partitions, resp);
});
}
Expand Down Expand Up @@ -190,15 +197,20 @@ class BatchTransaction extends Transaction {
* @returns {Promise<CreateReadPartitionsResponse>}
*/
createReadPartitions(options, callback) {
const reqOpts = codec.encodeRead(options);
const gaxOpts = options.gaxOptions;
const reqOpts = Object.assign({}, options, {
keySet: Snapshot.encodeKeySet(options),
});

delete reqOpts.gaxOptions;
delete reqOpts.keys;
delete reqOpts.ranges;

this.createPartitions_(
{
client: 'SpannerClient',
method: 'partitionRead',
reqOpts,
gaxOpts,
gaxOpts: options.gaxOptions,
},
callback);
}
Expand Down Expand Up @@ -302,9 +314,9 @@ class BatchTransaction extends Transaction {
*/
identifier() {
return {
transaction: this.id.toString('base64'),
transaction: (this.id! as Buffer).toString('base64'),
session: this.session.id,
timestamp: this.readTimestamp,
timestamp: this.readTimestampProto,
};
}
}
Expand Down
Loading

0 comments on commit 4b447e7

Please sign in to comment.