Skip to content

Commit

Permalink
feat: new transaction feature (#1239)
Browse files Browse the repository at this point in the history
* Move the state up, make it protected

Transaction state should move up to the super class

* Build new transaction options method

Add a method for building transaction options. Also add a method for detecting if a request is a transaction. Build new transaction options

* Correct the fn that builds transactions

Corrected getTransactionRequest. Build the request options properly.

* Set transaction to readOnly in system test.

When this system test isn’t set to be a readOnly transaction then the error occurs which says too much contention.

* Change #parseRunSuccess

Change parseRunSuccess so that it can be used more universally.

* Move parseRunSuccess up to protected

parseRunSuccess should move up to the super class because it now needs to be used there.

* Saves the transaction id

This change saves the transaction id returned from the server for read calls

* Add tests for testing read only transactions

Read only tests are needed

* Add tests that measure read time

Make sure that code using the new transaction option has better performance than code that doesn’t have it.

* ran linter

* Add a test for testing requests

Use the MockedTransactionWrapper to test requests being passed into the Gapic layer.

* Final changes to make test work

Mock out begin transaction. Test for newTransaction consistency type. Mock out commit.

* Add the transaction.run test

A transaction.run test is needed  for lookup, lookup, put, commit.

* run linter

* runQuery, lookup, put, commit

Add a test for this sequence of operations and ensure it works properly.

* runAggregationQuery, lookup, put, commit

Four operations that get all the results for running an aggregation query. Adding another test for these four operations.

* put, put, lookup, commit

Last test suite regarding new transaction unit tests.

* Add tests for the commits

Add a bunch of tests for the commit case. Check the commit gapic input.

* Add testing to ensure begin tx is called

Begin transaction should be called at least once. Add code here to increment the counter.

* Document #blockWithMutex

Remove commented code too.

* Document transaction state

* feat: new transaction feature branch

* Add a check for expired

Check for expired on most functions and write tests for the expired check.

* Add commit and rollback blocks

Check for expired state in the commit and rollback blocks.

* run the linter

* Don’t allow readtime to be specified in a txn

Matches the python client.

* throw error for both read time and consistency

* Add test for specifying readtime

* Remove the console logs

* Refactor the test

* Improve test to make sure Gapic layer isn’t called

Also, fix bug with read consistency.

* Run linter
Reorganize tests
New test for readtime and consistency

* Make change to allow the runAggregationQuery go up

* Should error when get is used

* Move tests over and refactor initialized datastore

* Remove only and remove imports

* Introduce parameterized testing for errors

* Use parameters in parameterized tests

* Migrate error tests to parameterized testing

* Change description

* Prepare second describe block for tests

* Revert "Prepare second describe block for tests"

This reverts commit 974117b.

* Always return after the error is sent back

This ensures that an extra call does not get made to the server.

* Add headers

* Tests and implementation for expired on rollback

* fix test

* Wrap rollbacks with a withBeginTransaction.

* Update the test so that it begins the tx before

* Throw error if transaction not started on rollback

* Remove only

* Add a comment to the test regarding new txn.run()

* Ensure that the errors get bubbled up

* Run. linter

* read time and consistency error

* Remove unnecessary change

* Eliminate the call to withBeginTransaction

* Throw error reported in documentation

* Work on streaming errors

Fix the concurrency tests and move the error reporting outside of the stream, add two tests for the streams to make sure errors get reported.

* Move the error throwing up the stack

This makes it so that it is reported in the streams.

* Fix linting errors

* Generate unit tests for getTransactionRequest

I am going to rewrite the getTransactionRequest function so it is a good idea not to change how it works.

* Generate unit tests

Fix the compiler error
Change code so that when readOnly is specified then readwrite options are ignored.
Fix the unit test to adopt this change.

* Make function one ternary operation

Add comments, simplify logic in the function.

* Add two comments

* Update the comment

* Run the linter
  • Loading branch information
danieljbruce committed Jun 24, 2024
1 parent 91237e0 commit cdd2ee9
Show file tree
Hide file tree
Showing 9 changed files with 1,760 additions and 206 deletions.
234 changes: 195 additions & 39 deletions src/request.ts

Large diffs are not rendered by default.

136 changes: 66 additions & 70 deletions src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,34 @@
*/

import {promisifyAll} from '@google-cloud/promisify';
import arrify = require('arrify');
import {CallOptions} from 'google-gax';

import {google} from '../protos/protos';

import {Datastore, TransactionOptions} from '.';
import {entity, Entity, Entities} from './entity';
import {Entities, Entity, entity} from './entity';
import {
Query,
RunQueryCallback,
RunQueryInfo,
RunQueryOptions,
RunQueryResponse,
} from './query';
import {
CommitCallback,
CommitResponse,
DatastoreRequest,
RequestOptions,
PrepareEntityObjectResponse,
CreateReadStreamOptions,
GetResponse,
DatastoreRequest,
GetCallback,
GetResponse,
getTransactionRequest,
PrepareEntityObjectResponse,
RequestCallback,
transactionExpiredError,
TransactionState,
} from './request';
import {AggregateQuery} from './aggregate';
import {Mutex} from 'async-mutex';
import arrify = require('arrify');

/*
* This type matches the value returned by the promise in the
Expand All @@ -53,11 +54,6 @@ interface BeginAsyncResponse {
resp?: google.datastore.v1.IBeginTransactionResponse;
}

enum TransactionState {
NOT_STARTED,
IN_PROGRESS, // IN_PROGRESS currently tracks the expired state as well
}

/**
* A transaction is a set of Datastore operations on one or more entities. Each
* transaction is guaranteed to be atomic, which means that transactions are
Expand Down Expand Up @@ -85,7 +81,6 @@ class Transaction extends DatastoreRequest {
modifiedEntities_: ModifiedEntities;
skipCommit?: boolean;
#mutex = new Mutex();
#state = TransactionState.NOT_STARTED;
constructor(datastore: Datastore, options?: TransactionOptions) {
super();
/**
Expand Down Expand Up @@ -115,6 +110,7 @@ class Transaction extends DatastoreRequest {

// Queue the requests to make when we send the transactional commit.
this.requests_ = [];
this.state = TransactionState.NOT_STARTED;
}

/*! Developer Documentation
Expand Down Expand Up @@ -177,6 +173,10 @@ class Transaction extends DatastoreRequest {
: () => {};
const gaxOptions =
typeof gaxOptionsOrCallback === 'object' ? gaxOptionsOrCallback : {};
if (this.state === TransactionState.EXPIRED) {
callback(new Error(transactionExpiredError));
return;
}
// This ensures that the transaction is started before calling runCommit
this.#withBeginTransaction(
gaxOptions,
Expand Down Expand Up @@ -355,13 +355,9 @@ class Transaction extends DatastoreRequest {
const callback =
typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!;
// This ensures that the transaction is started before calling get
this.#withBeginTransaction(
options.gaxOptions,
() => {
super.get(keys, options, callback);
},
callback
);
this.#blockWithMutex(() => {
super.get(keys, options, callback);
});
}

/**
Expand Down Expand Up @@ -434,6 +430,14 @@ class Transaction extends DatastoreRequest {
const callback =
typeof gaxOptionsOrCallback === 'function' ? gaxOptionsOrCallback : cb!;

if (this.state === TransactionState.EXPIRED) {
callback(new Error(transactionExpiredError));
return;
}
if (this.state === TransactionState.NOT_STARTED) {
callback(new Error('Transaction is not started'));
return;
}
this.request_(
{
client: 'DatastoreClient',
Expand All @@ -442,6 +446,7 @@ class Transaction extends DatastoreRequest {
},
(err, resp) => {
this.skipCommit = true;
this.state = TransactionState.EXPIRED;
callback(err || null, resp);
}
);
Expand Down Expand Up @@ -511,7 +516,7 @@ class Transaction extends DatastoreRequest {
const callback =
typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!;
this.#mutex.runExclusive(async () => {
if (this.#state === TransactionState.NOT_STARTED) {
if (this.state === TransactionState.NOT_STARTED) {
const runResults = await this.#beginTransactionAsync(options);
this.#processBeginResults(runResults, callback);
} else {
Expand Down Expand Up @@ -635,6 +640,7 @@ class Transaction extends DatastoreRequest {
return;
}

this.state = TransactionState.EXPIRED;
// The `callbacks` array was built previously. These are the callbacks
// that handle the API response normally when using the
// DatastoreRequest.save and .delete methods.
Expand Down Expand Up @@ -666,24 +672,11 @@ class Transaction extends DatastoreRequest {
if (err) {
callback(err, null, resp);
} else {
this.#parseRunSuccess(runResults);
this.parseTransactionResponse(resp);
callback(null, this, resp);
}
}

/**
* This function saves results from a successful beginTransaction call.
*
* @param {BeginAsyncResponse} [response] The response from a call to
* begin a transaction that completed successfully.
*
**/
#parseRunSuccess(runResults: BeginAsyncResponse) {
const resp = runResults.resp;
this.id = resp!.transaction;
this.#state = TransactionState.IN_PROGRESS;
}

/**
* This async function makes a beginTransaction call and returns a promise with
* the information returned from the call that was made.
Expand All @@ -696,24 +689,10 @@ class Transaction extends DatastoreRequest {
async #beginTransactionAsync(
options: RunOptions
): Promise<BeginAsyncResponse> {
const reqOpts: RequestOptions = {
transactionOptions: {},
};

if (options.readOnly || this.readOnly) {
reqOpts.transactionOptions!.readOnly = {};
}

if (options.transactionId || this.id) {
reqOpts.transactionOptions!.readWrite = {
previousTransaction: options.transactionId || this.id,
};
}

if (options.transactionOptions) {
reqOpts.transactionOptions = options.transactionOptions;
}
return new Promise((resolve: (value: BeginAsyncResponse) => void) => {
const reqOpts = {
transactionOptions: getTransactionRequest(this, options),
};
this.request_(
{
client: 'DatastoreClient',
Expand Down Expand Up @@ -766,13 +745,9 @@ class Transaction extends DatastoreRequest {
const callback =
typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!;
// This ensures that the transaction is started before calling runAggregationQuery
this.#withBeginTransaction(
options.gaxOptions,
() => {
super.runAggregationQuery(query, options, callback);
},
callback
);
this.#blockWithMutex(() => {
super.runAggregationQuery(query, options, callback);
});
}

/**
Expand Down Expand Up @@ -805,13 +780,9 @@ class Transaction extends DatastoreRequest {
const callback =
typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!;
// This ensures that the transaction is started before calling runQuery
this.#withBeginTransaction(
options.gaxOptions,
() => {
super.runQuery(query, options, callback);
},
callback
);
this.#blockWithMutex(() => {
super.runQuery(query, options, callback);
});
}

/**
Expand Down Expand Up @@ -1019,7 +990,7 @@ class Transaction extends DatastoreRequest {
* @param {CallOptions | undefined} [gaxOptions] Gax options provided by the
* user that are used for the beginTransaction grpc call.
* @param {function} [fn] A function which is run after ensuring a
* beginTransaction call is made.
* transaction has begun.
* @param {function} [callback] A callback provided by the user that expects
* an error in the first argument and a custom data type for the rest of the
* arguments.
Expand All @@ -1031,10 +1002,10 @@ class Transaction extends DatastoreRequest {
callback: (...args: [Error | null, ...T] | [Error | null]) => void
): void {
(async () => {
if (this.#state === TransactionState.NOT_STARTED) {
if (this.state === TransactionState.NOT_STARTED) {
try {
await this.#mutex.runExclusive(async () => {
if (this.#state === TransactionState.NOT_STARTED) {
if (this.state === TransactionState.NOT_STARTED) {
// This sends an rpc call to get the transaction id
const runResults = await this.#beginTransactionAsync({
gaxOptions,
Expand All @@ -1044,7 +1015,7 @@ class Transaction extends DatastoreRequest {
// Do not call the wrapped function.
throw runResults.err;
}
this.#parseRunSuccess(runResults);
this.parseTransactionResponse(runResults.resp);
// The rpc saving the transaction id was successful.
// Now the wrapped function fn will be called.
}
Expand All @@ -1057,6 +1028,31 @@ class Transaction extends DatastoreRequest {
return fn();
})();
}

/*
* Some rpc calls require that the transaction has been started (i.e, has a
* valid id) before they can be sent. #withBeginTransaction acts as a wrapper
* over those functions.
*
* If the transaction has not begun yet, `#blockWithMutex` will call the
* wrapped function which will begin the transaction in the rpc call it sends.
* If the transaction has begun, the wrapped function will be called, but it
* will not begin a transaction.
*
* @param {function} [fn] A function which is run after ensuring a
* transaction has begun.
*/
#blockWithMutex(fn: () => void) {
(async () => {
if (this.state === TransactionState.NOT_STARTED) {
await this.#mutex.runExclusive(async () => {
fn();
});
} else {
fn();
}
})();
}
}

export type ModifiedEntities = Array<{
Expand Down
Loading

0 comments on commit cdd2ee9

Please sign in to comment.