Skip to content

Commit

Permalink
feat: add support for blind writes
Browse files Browse the repository at this point in the history
chore(main): release 7.9.0 (googleapis#2053)

:robot: I have created a release *beep* *boop*
---

* **spanner:** Add support for batchWrite ([googleapis#2054](https://togithub.com/googleapis/nodejs-spanner/issues/2054)) ([06aab6e](https://togithub.com/googleapis/nodejs-spanner/commit/06aab6e39bbce9e3786f1ac631c80e8909197e92))

* **deps:** Update dependency google-gax to v4.3.4 ([googleapis#2051](https://togithub.com/googleapis/nodejs-spanner/issues/2051)) ([80abf06](https://togithub.com/googleapis/nodejs-spanner/commit/80abf06ba8ef9497318ffc597b83fb63e4408f9c))
* **deps:** Update dependency google-gax to v4.3.5 ([googleapis#2055](https://togithub.com/googleapis/nodejs-spanner/issues/2055)) ([702c9b0](https://togithub.com/googleapis/nodejs-spanner/commit/702c9b0f34e6cc34233c5aa52b97601b19f70980))
* **deps:** Update dependency google-gax to v4.3.6 ([googleapis#2057](https://togithub.com/googleapis/nodejs-spanner/issues/2057)) ([74ebf1e](https://togithub.com/googleapis/nodejs-spanner/commit/74ebf1e45cddf614c180295f3a761a8f84c5cb32))
* **deps:** Update dependency google-gax to v4.3.7 ([googleapis#2068](https://togithub.com/googleapis/nodejs-spanner/issues/2068)) ([28fec6c](https://togithub.com/googleapis/nodejs-spanner/commit/28fec6ca505d78d725efc123950be978e0c84ab7))

---
This PR was generated with [Release Please](https://togithub.com/googleapis/release-please). See [documentation](https://togithub.com/googleapis/release-please#release-please).

refactor: blind write method

fix: lint errors

fix: Retry with timeout (googleapis#2071)

Use `gaxOptions.timeout` during retry in streaming calls. Earlier the timeout value was only used for a single RPC not for the whole operation including retries. Now if RPC returns `Unavailable` error and the timeout value has been reached, library will throw an Deadline exceeded error.

```
const query = {
        sql: 'Select 1',
        gaxOptions: {timeout: 500}
    }
const [rows] = await database.run(query);
```

chore(main): release 7.9.1 (googleapis#2072)

:robot: I have created a release *beep* *boop*
---

* Retry with timeout ([googleapis#2071](https://togithub.com/googleapis/nodejs-spanner/issues/2071)) ([a943257](https://togithub.com/googleapis/nodejs-spanner/commit/a943257a0402b26fd80196057a9724fd28fc5c1b))

---
This PR was generated with [Release Please](https://togithub.com/googleapis/release-please). See [documentation](https://togithub.com/googleapis/release-please#release-please).

refactor: blind write method

test: unit test for blind write

test: unit test for blind write

refactor

fix: lint errors

feat: add support for change streams transaction exclusion option for Batch Write (googleapis#2070)

* feat: change stream transaction exclusion option for Batch Write

* refactor

docs: add doc to blindWrite method

docs: add doc to the setQueuedMutations

refactor: doc setQueuedMutations

fix: presubmit error

fix(deps): update dependency google-gax to v4.3.8 (googleapis#2077)

[![Mend Renovate](https://app.renovatebot.com/images/banner.svg)](https://renovatebot.com)

This PR contains the following updates:

| Package | Change | Age | Adoption | Passing | Confidence |
|---|---|---|---|---|---|
| [google-gax](https://togithub.com/googleapis/gax-nodejs) ([source](https://togithub.com/googleapis/gax-nodejs/tree/HEAD/gax)) | [`4.3.7` -> `4.3.8`](https://renovatebot.com/diffs/npm/google-gax/4.3.7/4.3.8) | [![age](https://developer.mend.io/api/mc/badges/age/npm/google-gax/4.3.8?slim=true)](https://docs.renovatebot.com/merge-confidence/) | [![adoption](https://developer.mend.io/api/mc/badges/adoption/npm/google-gax/4.3.8?slim=true)](https://docs.renovatebot.com/merge-confidence/) | [![passing](https://developer.mend.io/api/mc/badges/compatibility/npm/google-gax/4.3.7/4.3.8?slim=true)](https://docs.renovatebot.com/merge-confidence/) | [![confidence](https://developer.mend.io/api/mc/badges/confidence/npm/google-gax/4.3.7/4.3.8?slim=true)](https://docs.renovatebot.com/merge-confidence/) |

---

<details>
<summary>googleapis/gax-nodejs (google-gax)</summary>

[Compare Source](https://togithub.com/googleapis/gax-nodejs/compare/google-gax-v4.3.7...google-gax-v4.3.8)

-   **deps:** remove rimraf in favor of native node rm function ([#&#8203;1626](https://togithub.com/googleapis/gax-nodejs/issues/1626)) ([dd87646](https://togithub.com/googleapis/gax-nodejs/commit/dd87646618d5026549920e224df7f85cbb5ff6a8))

</details>

---

📅 **Schedule**: Branch creation - "after 9am and before 3pm" (UTC), Automerge - At any time (no schedule defined).

🚦 **Automerge**: Disabled by config. Please merge this manually once you are satisfied.

♻ **Rebasing**: Whenever PR is behind base branch, or you tick the rebase/retry checkbox.

🔕 **Ignore**: Close this PR and you won't be reminded about this update again.

---

 - [ ] <!-- rebase-check -->If you want to rebase/retry this PR, check this box

---

This PR has been generated by [Mend Renovate](https://www.mend.io/free-developer-tools/renovate/). View repository job log [here](https://developer.mend.io/github/googleapis/nodejs-spanner).
<!--renovate-debug:eyJjcmVhdGVkSW5WZXIiOiIzNy40MjUuMSIsInVwZGF0ZWRJblZlciI6IjM3LjQyNS4xIiwidGFyZ2V0QnJhbmNoIjoibWFpbiIsImxhYmVscyI6W119-->

updated

updated

lint

refactor
  • Loading branch information
release-please[bot] authored and alkatrivedi committed Jul 18, 2024
1 parent c154d12 commit 74a0188
Show file tree
Hide file tree
Showing 10 changed files with 391 additions and 134 deletions.
22 changes: 22 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,28 @@

[1]: https://www.npmjs.com/package/nodejs-spanner?activeTab=versions

## [7.9.1](https://github.com/googleapis/nodejs-spanner/compare/v7.9.0...v7.9.1) (2024-06-26)


### Bug Fixes

* Retry with timeout ([#2071](https://github.com/googleapis/nodejs-spanner/issues/2071)) ([a943257](https://github.com/googleapis/nodejs-spanner/commit/a943257a0402b26fd80196057a9724fd28fc5c1b))

## [7.9.0](https://github.com/googleapis/nodejs-spanner/compare/v7.8.0...v7.9.0) (2024-06-21)


### Features

* **spanner:** Add support for batchWrite ([#2054](https://github.com/googleapis/nodejs-spanner/issues/2054)) ([06aab6e](https://github.com/googleapis/nodejs-spanner/commit/06aab6e39bbce9e3786f1ac631c80e8909197e92))


### Bug Fixes

* **deps:** Update dependency google-gax to v4.3.4 ([#2051](https://github.com/googleapis/nodejs-spanner/issues/2051)) ([80abf06](https://github.com/googleapis/nodejs-spanner/commit/80abf06ba8ef9497318ffc597b83fb63e4408f9c))
* **deps:** Update dependency google-gax to v4.3.5 ([#2055](https://github.com/googleapis/nodejs-spanner/issues/2055)) ([702c9b0](https://github.com/googleapis/nodejs-spanner/commit/702c9b0f34e6cc34233c5aa52b97601b19f70980))
* **deps:** Update dependency google-gax to v4.3.6 ([#2057](https://github.com/googleapis/nodejs-spanner/issues/2057)) ([74ebf1e](https://github.com/googleapis/nodejs-spanner/commit/74ebf1e45cddf614c180295f3a761a8f84c5cb32))
* **deps:** Update dependency google-gax to v4.3.7 ([#2068](https://github.com/googleapis/nodejs-spanner/issues/2068)) ([28fec6c](https://github.com/googleapis/nodejs-spanner/commit/28fec6ca505d78d725efc123950be978e0c84ab7))

## [7.8.0](https://github.com/googleapis/nodejs-spanner/compare/v7.7.0...v7.8.0) (2024-05-24)


Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@google-cloud/spanner",
"description": "Cloud Spanner Client Library for Node.js",
"version": "7.8.0",
"version": "7.9.1",
"license": "Apache-2.0",
"author": "Google Inc.",
"engines": {
Expand Down Expand Up @@ -66,7 +66,7 @@
"events-intercept": "^2.0.0",
"extend": "^3.0.2",
"google-auth-library": "^9.0.0",
"google-gax": "4.3.7",
"google-gax": "4.3.8",
"grpc-gcp": "^1.0.0",
"is": "^3.2.1",
"lodash.snakecase": "^4.1.1",
Expand Down
47 changes: 0 additions & 47 deletions samples/blind-writes.js

This file was deleted.

2 changes: 1 addition & 1 deletion samples/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"dependencies": {
"@google-cloud/kms": "^4.0.0",
"@google-cloud/precise-date": "^4.0.0",
"@google-cloud/spanner": "^7.8.0",
"@google-cloud/spanner": "^7.9.1",
"yargs": "^17.0.0",
"protobufjs": "^7.0.0"
},
Expand Down
117 changes: 85 additions & 32 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,11 @@ import {
import {CreateTableCallback, CreateTableResponse, Table} from './table';
import {
BatchWriteOptions,
CommitCallback,
CommitResponse,
ExecuteSqlRequest,
MutationGroup,
Mutations,
Mutation,
RunCallback,
RunResponse,
RunUpdateCallback,
Expand Down Expand Up @@ -3291,6 +3292,7 @@ class Database extends common.GrpcServiceObject {
session: session!.formattedName_!,
mutationGroups: mutationGroups.map(mg => mg.proto()),
requestOptions: options?.requestOptions,
excludeTxnFromChangeStream: options?.excludeTxnFromChangeStreams,
}
);
let dataReceived = false;
Expand Down Expand Up @@ -3331,39 +3333,89 @@ class Database extends common.GrpcServiceObject {
return proxyStream as NodeJS.ReadableStream;
}

blindWrite(
mutations: Mutations,
options?: CallOptions
): Promise<CommitResponse>;
blindWrite(
mutations: Mutations,
options?: CallOptions
/**
* Apply Blind Write of the Mutations
*
* writeAtLeastOnce(Blind Write) requests are not replay protected, meaning that it may apply mutations more
* than once, if the mutations are not idempotent, this may lead to a failure being
* reported when the mutation was applied once. Replays of non-idempotent mutations may
* have undesirable effects. For example, replays of an insert mutation may produce an
* already exists error. For this reason, most users of the library will prefer to use
* {@link runTransaction} instead.
*
* However, {@link writeAtLeastOnce()} requires only a single RPC, whereas {@link runTransaction()}
* requires two RPCs (one of which may be performed in advance), and so this method may be
* appropriate for latency sensitive and/or high throughput blind writing.
*
* We recommend structuring your mutation groups to be idempotent to avoid this issue.
*
* @param {Mutation} [mutation] Mutations to be applied.
* @param {CallOptions} [options] Options object for blind write request.
* @param {CommitCallback} [callback] Callback function for blind write request.
*
* @returns {Promise}
*
* @example
* ```
* const {Spanner} = require('@google-cloud/spanner');
* const spanner = new Spanner();
*
* const instance = spanner.instance('my-instance');
* const database = instance.database('my-database');
* const mutation = new Mutation();
* mutation.insert('Singers', {
* SingerId: '1',
* FirstName: 'Marc',
* LastName: 'Richards',
* });
* mutation.update('Singers', {
* SingerId: '1',
* FirstName: 'John',
* LastName: 'Richards',
* });
*
* try {
* const [response, err] = await database.writeAtLeastOnce(mutation, {});
* console.log(response.commitTimestamp);
* } catch(err) {
* console.log("Error: ", err);
* }
* ```
*/
writeAtLeastOnce(mutation: Mutation): Promise<CommitResponse>;
writeAtLeastOnce(
mutation: Mutation,
options: CallOptions
): Promise<CommitResponse>;
async blindWrite(
mutations: Mutations,
options?: CallOptions
): Promise<CommitResponse> {
while (true) {
try {
const getSession = this.pool_.getSession.bind(this.pool_);
const [session, transaction] = await promisify(getSession)();
transaction._queuedMutations = mutations.proto();
try {
return transaction!.commit();
} finally {
this.pool_.release(session);
}
} catch (e) {
if (!isSessionNotFoundError(e as ServiceError)) {
throw e;
}
writeAtLeastOnce(mutation: Mutation, callback: CommitCallback): void;
writeAtLeastOnce(
mutation: Mutation,
optionsOrCallback?: CallOptions | CommitCallback,
callback?: CommitCallback
): void | Promise<CommitResponse> {
const cb =
typeof optionsOrCallback === 'function'
? (optionsOrCallback as CommitCallback)
: callback;
const options =
typeof optionsOrCallback === 'object' && optionsOrCallback
? (optionsOrCallback as CallOptions)
: {};
this.pool_.getSession((err, session?, transaction?) => {
if (err && isSessionNotFoundError(err as grpc.ServiceError)) {
cb
? this.writeAtLeastOnce(mutation, cb)
: this.writeAtLeastOnce(mutation, options);
return;
}
}
// let promise = await this.getTransaction();
// let transaction = promise[0];
// transaction._queuedMutations = mutations.proto();
// transaction.useInRunner();
// return transaction!.commit(options);
if (err) {
cb!(err as grpc.ServiceError);
return;
}
this._releaseOnEnd(session!, transaction!);
transaction?.setQueuedMutations(mutation.proto());
return transaction?.commit(options, cb!);
});
}

/**
Expand Down Expand Up @@ -3694,6 +3746,7 @@ callbackifyAll(Database, {
'batchCreateSessions',
'batchTransaction',
'batchWriteAtLeastOnce',
'writeAtLeastOnce',
'close',
'createBatchTransaction',
'createSession',
Expand Down
18 changes: 16 additions & 2 deletions src/partial-result-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import mergeStream = require('merge-stream');
import {common as p} from 'protobufjs';
import {Readable, Transform} from 'stream';
import * as streamEvents from 'stream-events';
import {grpc} from 'google-gax';
import {isRetryableInternalError} from './transaction-runner';
import {grpc, CallOptions} from 'google-gax';
import {DeadlineError, isRetryableInternalError} from './transaction-runner';

import {codec, JSONOptions, Json, Field, Value} from './codec';
import {google} from '../protos/protos';
Expand Down Expand Up @@ -96,6 +96,7 @@ export interface RowOptions {
* };
*/
columnsMetadata?: object;
gaxOptions?: CallOptions;
}

/**
Expand Down Expand Up @@ -491,6 +492,8 @@ export function partialResultStream(
const maxQueued = 10;
let lastResumeToken: ResumeToken;
let lastRequestStream: Readable;
const startTime = Date.now();
const timeout = options?.gaxOptions?.timeout ?? Infinity;

// mergeStream allows multiple streams to be connected into one. This is good;
// if we need to retry a request and pipe more data to the user's stream.
Expand Down Expand Up @@ -541,6 +544,17 @@ export function partialResultStream(
};

const retry = (err: grpc.ServiceError): void => {
const elapsed = Date.now() - startTime;
if (elapsed >= timeout) {
// The timeout has reached so this will flush any rows the
// checkpoint stream has queued. After that, we will destroy the
// user's stream with the Deadline exceeded error.
setImmediate(() =>
batchAndSplitOnTokenStream.destroy(new DeadlineError(err))
);
return;
}

if (
!(
err.code &&
Expand Down
Loading

0 comments on commit 74a0188

Please sign in to comment.