-
Notifications
You must be signed in to change notification settings - Fork 103
/
Copy pathtransaction.ts
2809 lines (2645 loc) · 87.2 KB
/
transaction.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*!
* Copyright 2016 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import {DateStruct, PreciseDate} from '@google-cloud/precise-date';
import {promisifyAll} from '@google-cloud/promisify';
import arrify = require('arrify');
import Long = require('long');
import {EventEmitter} from 'events';
import {grpc, CallOptions, ServiceError, Status, GoogleError} from 'google-gax';
import * as is from 'is';
import {common as p} from 'protobufjs';
import {Readable, PassThrough} from 'stream';
import {codec, Json, JSONOptions, Type, Value} from './codec';
import {
PartialResultStream,
partialResultStream,
ResumeToken,
Row,
} from './partial-result-stream';
import {Session} from './session';
import {Key} from './table';
import {google as spannerClient} from '../protos/protos';
import {
NormalCallback,
CLOUD_RESOURCE_HEADER,
addLeaderAwareRoutingHeader,
} from './common';
import {google} from '../protos/protos';
import IAny = google.protobuf.IAny;
import IQueryOptions = google.spanner.v1.ExecuteSqlRequest.IQueryOptions;
import IRequestOptions = google.spanner.v1.IRequestOptions;
import {Database, Spanner} from '.';
import ReadLockMode = google.spanner.v1.TransactionOptions.ReadWrite.ReadLockMode;
export type Rows = Array<Row | Json>;
const RETRY_INFO_TYPE = 'type.googleapis.com/google.rpc.retryinfo';
const RETRY_INFO_BIN = 'google.rpc.retryinfo-bin';
export interface TimestampBounds {
strong?: boolean;
minReadTimestamp?: PreciseDate | spannerClient.protobuf.ITimestamp;
maxStaleness?: number | spannerClient.protobuf.IDuration;
readTimestamp?: PreciseDate | spannerClient.protobuf.ITimestamp;
exactStaleness?: number | spannerClient.protobuf.IDuration;
returnReadTimestamp?: boolean;
}
export interface BatchWriteOptions {
requestOptions?: Pick<IRequestOptions, 'priority' | 'transactionTag'>;
gaxOptions?: CallOptions;
excludeTxnFromChangeStreams?: boolean;
}
export interface RequestOptions {
json?: boolean;
jsonOptions?: JSONOptions;
gaxOptions?: CallOptions;
maxResumeRetries?: number;
/**
* An object where column names as keys and custom objects as corresponding
* values for deserialization. This is only needed for proto columns
* where deserialization logic is on user-specific code. When provided,
* the custom object enables deserialization of backend-received column data.
* If not provided, data remains serialized as buffer for Proto Messages and
* integer for Proto Enums.
*
* @example
* To obtain Proto Messages and Proto Enums as JSON objects, you must supply
* additional metadata. This metadata should include the protobufjs-cli
* generated proto message function and enum object. It encompasses the essential
* logic for proper data deserialization.
*
* Eg: To read data from Proto Columns in json format using DQL, you should pass
* columnsMetadata where key is the name of the column and value is the protobufjs-cli
* generated proto message function and enum object.
*
* const query = {
* sql: `SELECT SingerId,
* FirstName,
* LastName,
* SingerInfo,
* SingerGenre,
* SingerInfoArray,
* SingerGenreArray
* FROM Singers
* WHERE SingerId = 6`,
* columnsMetadata: {
* SingerInfo: music.SingerInfo,
* SingerInfoArray: music.SingerInfo,
* SingerGenre: music.Genre,
* SingerGenreArray: music.Genre,
* },
* };
*/
columnsMetadata?: object;
}
export interface CommitOptions {
requestOptions?: Pick<IRequestOptions, 'priority'>;
returnCommitStats?: boolean;
maxCommitDelay?: spannerClient.protobuf.IDuration;
gaxOptions?: CallOptions;
}
export interface Statement {
sql: string;
params?: {[param: string]: Value};
types?: Type | {[param: string]: Value};
}
export interface ExecuteSqlRequest extends Statement, RequestOptions {
resumeToken?: ResumeToken;
queryMode?: spannerClient.spanner.v1.ExecuteSqlRequest.QueryMode;
partitionToken?: Uint8Array | string;
seqno?: number;
queryOptions?: IQueryOptions;
requestOptions?: Omit<IRequestOptions, 'transactionTag'>;
dataBoostEnabled?: boolean | null;
directedReadOptions?: google.spanner.v1.IDirectedReadOptions;
}
export interface KeyRange {
startClosed?: Value[];
startOpen?: Value[];
endClosed?: Value[];
endOpen?: Value[];
}
export interface ReadRequest extends RequestOptions {
table?: string;
index?: string;
columns?: string[] | null;
keys?: string[] | string[][];
ranges?: KeyRange[];
keySet?: spannerClient.spanner.v1.IKeySet | null;
limit?: number | Long | string | null;
resumeToken?: Uint8Array | null;
partitionToken?: Uint8Array | null;
requestOptions?: Omit<IRequestOptions, 'transactionTag'>;
dataBoostEnabled?: boolean | null;
directedReadOptions?: google.spanner.v1.IDirectedReadOptions;
}
export interface BatchUpdateError extends grpc.ServiceError {
rowCounts: number[];
}
export type CommitRequest = spannerClient.spanner.v1.ICommitRequest;
export type BatchUpdateResponse = [
number[],
spannerClient.spanner.v1.ExecuteBatchDmlResponse,
];
export type BeginResponse = [spannerClient.spanner.v1.ITransaction];
export type BeginTransactionCallback =
NormalCallback<spannerClient.spanner.v1.ITransaction>;
export type CommitResponse = [spannerClient.spanner.v1.ICommitResponse];
export type ReadResponse = [Rows];
export type RunResponse = [
Rows,
spannerClient.spanner.v1.ResultSetStats,
spannerClient.spanner.v1.ResultSetMetadata,
];
export type RunUpdateResponse = [number];
export interface BatchUpdateOptions {
requestOptions?: Omit<IRequestOptions, 'transactionTag'>;
gaxOptions?: CallOptions;
}
export interface BatchUpdateCallback {
(
err: null | BatchUpdateError,
rowCounts: number[],
response?: spannerClient.spanner.v1.ExecuteBatchDmlResponse
): void;
}
export interface BatchUpdateOptions {
requestOptions?: Omit<IRequestOptions, 'transactionTag'>;
gaxOptions?: CallOptions;
}
export type ReadCallback = NormalCallback<Rows>;
export interface RunCallback {
(
err: null | grpc.ServiceError,
rows: Rows,
stats: spannerClient.spanner.v1.ResultSetStats,
metadata?: spannerClient.spanner.v1.ResultSetMetadata
): void;
}
export interface RunUpdateCallback {
(err: null | grpc.ServiceError, rowCount: number): void;
}
export type CommitCallback =
NormalCallback<spannerClient.spanner.v1.ICommitResponse>;
/**
* @typedef {object} TimestampBounds
* @property {boolean} [strong=true] Read at a timestamp where all previously
* committed transactions are visible.
* @property {external:PreciseDate|google.protobuf.Timestamp} [minReadTimestamp]
* Executes all reads at a `timestamp >= minReadTimestamp`.
* @property {number|google.protobuf.Timestamp} [maxStaleness] Read data at a
* `timestamp >= NOW - maxStaleness` (milliseconds).
* @property {external:PreciseDate|google.protobuf.Timestamp} [readTimestamp]
* Executes all reads at the given timestamp.
* @property {number|google.protobuf.Timestamp} [exactStaleness] Executes all
* reads at a timestamp that is `exactStaleness` (milliseconds) old.
* @property {boolean} [returnReadTimestamp=true] When true,
* {@link Snapshot#readTimestamp} will be populated after
* {@link Snapshot#begin} is called.
*/
/**
* This transaction type provides guaranteed consistency across several reads,
* but does not allow writes. Snapshot read-only transactions can be configured
* to read at timestamps in the past.
*
* When finished with the Snapshot, call {@link Snapshot#end} to
* release the underlying {@link Session}. Failure to do so can result in a
* Session leak.
*
* **This object is created and returned from {@link Database#getSnapshot}.**
*
* @class
* @hideconstructor
*
* @see [Timestamp Bounds API Documentation](https://cloud.google.com/spanner/docs/timestamp-bounds)
*
* @example
* ```
* const {Spanner} = require('@google-cloud/spanner');
* const spanner = new Spanner();
*
* const instance = spanner.instance('my-instance');
* const database = instance.database('my-database');
*
* const timestampBounds = {
* strong: true
* };
*
* database.getSnapshot(timestampBounds, (err, transaction) => {
* if (err) {
* // Error handling omitted.
* }
*
* // It should be called when the snapshot finishes.
* transaction.end();
* });
* ```
*/
export class Snapshot extends EventEmitter {
protected _options!: spannerClient.spanner.v1.ITransactionOptions;
protected _seqno = 1;
protected _waitingRequests: Array<() => void>;
protected _inlineBeginStarted;
protected _useInRunner = false;
id?: Uint8Array | string;
ended: boolean;
metadata?: spannerClient.spanner.v1.ITransaction;
readTimestamp?: PreciseDate;
readTimestampProto?: spannerClient.protobuf.ITimestamp;
request: (config: {}, callback: Function) => void;
requestStream: (config: {}) => Readable;
session: Session;
queryOptions?: IQueryOptions;
resourceHeader_: {[k: string]: string};
requestOptions?: Pick<IRequestOptions, 'transactionTag'>;
/**
* The transaction ID.
*
* @name Snapshot#id
* @type {?(string|Buffer)}
*/
/**
* Whether or not the transaction has ended. If true, make no further
* requests, and discard the transaction.
*
* @name Snapshot#ended
* @type {boolean}
*/
/**
* The raw transaction response object. It is populated after
* {@link Snapshot#begin} is called.
*
* @name Snapshot#metadata
* @type {?TransactionResponse}
*/
/**
* **Snapshot only**
* The timestamp at which all reads are performed.
*
* @name Snapshot#readTimestamp
* @type {?external:PreciseDate}
*/
/**
* **Snapshot only**
* The protobuf version of {@link Snapshot#readTimestamp}. This is useful if
* you require microsecond precision.
*
* @name Snapshot#readTimestampProto
* @type {?google.protobuf.Timestamp}
*/
/**
* @constructor
*
* @param {Session} session The parent Session object.
* @param {TimestampBounds} [options] Snapshot timestamp bounds.
* @param {QueryOptions} [queryOptions] Default query options to use when none
* are specified for a query.
*/
constructor(
session: Session,
options?: TimestampBounds,
queryOptions?: IQueryOptions
) {
super();
this.ended = false;
this.session = session;
this.queryOptions = Object.assign({}, queryOptions);
this.request = session.request.bind(session);
this.requestStream = session.requestStream.bind(session);
const readOnly = Snapshot.encodeTimestampBounds(options || {});
this._options = {readOnly};
this.resourceHeader_ = {
[CLOUD_RESOURCE_HEADER]: (this.session.parent as Database).formattedName_,
};
this._waitingRequests = [];
this._inlineBeginStarted = false;
}
/**
* @typedef {object} TransactionResponse
* @property {string|Buffer} id The transaction ID.
* @property {?google.protobuf.Timestamp} readTimestamp For snapshot read-only
* transactions, the read timestamp chosen for the transaction.
*/
/**
* @typedef {array} TransactionBeginResponse
* @property {TransactionResponse} 0 The raw transaction object.
*/
/**
* @callback TransactionBeginCallback
* @param {?Error} err Request error, if any.
* @param {TransactionResponse} apiResponse The raw transaction object.
*/
/**
* Begin a new transaction. Typically, you need not call this unless
* manually creating transactions via {@link Session} objects.
*
* @see [BeginTransaction API Documentation](https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#google.spanner.v1.Spanner.BeginTransaction)
*
* @param {object} [gaxOptions] Request configuration options,
* See {@link https://googleapis.dev/nodejs/google-gax/latest/interfaces/CallOptions.html|CallOptions}
* for more details.
* @param {TransactionBeginCallback} [callback] Callback function.
* @returns {Promise<TransactionBeginResponse>}
*
* @example
* ```
* transaction.begin(function(err) {
* if (!err) {
* // transaction began successfully.
* }
* });
*
* ```
* @example If the callback is omitted, the function returns a Promise
* ```
* transaction.begin()
* .then(function(data) {
* const apiResponse = data[0];
* });
* ```
*/
begin(gaxOptions?: CallOptions): Promise<BeginResponse>;
begin(callback: BeginTransactionCallback): void;
begin(gaxOptions: CallOptions, callback: BeginTransactionCallback): void;
begin(
gaxOptionsOrCallback?: CallOptions | BeginTransactionCallback,
cb?: BeginTransactionCallback
): void | Promise<BeginResponse> {
const gaxOpts =
typeof gaxOptionsOrCallback === 'object' ? gaxOptionsOrCallback : {};
const callback =
typeof gaxOptionsOrCallback === 'function' ? gaxOptionsOrCallback : cb!;
const session = this.session.formattedName_!;
const options = this._options;
const reqOpts: spannerClient.spanner.v1.IBeginTransactionRequest = {
session,
options,
};
// Only hand crafted read-write transactions will be able to set a
// transaction tag for the BeginTransaction RPC. Also, this.requestOptions
// is only set in the constructor of Transaction, which is the constructor
// for read/write transactions.
if (this.requestOptions) {
reqOpts.requestOptions = this.requestOptions;
}
const headers = this.resourceHeader_;
if (
this._getSpanner().routeToLeaderEnabled &&
(this._options.readWrite !== undefined ||
this._options.partitionedDml !== undefined)
) {
addLeaderAwareRoutingHeader(headers);
}
this.request(
{
client: 'SpannerClient',
method: 'beginTransaction',
reqOpts,
gaxOpts,
headers: headers,
},
(
err: null | grpc.ServiceError,
resp: spannerClient.spanner.v1.ITransaction
) => {
if (err) {
callback!(err, resp);
return;
}
this._update(resp);
callback!(null, resp);
}
);
}
/**
* A KeyRange represents a range of rows in a table or index.
*
* A range has a start key and an end key. These keys can be open or closed,
* indicating if the range includes rows with that key.
*
* Keys are represented by an array of strings where the nth value in the list
* corresponds to the nth component of the table or index primary key.
*
* @typedef {object} KeyRange
* @property {string[]} [startClosed] If the start is closed, then the range
* includes all rows whose first key columns exactly match.
* @property {string[]} [startOpen] If the start is open, then the range
* excludes rows whose first key columns exactly match.
* @property {string[]} [endClosed] If the end is closed, then the range
* includes all rows whose first key columns exactly match.
* @property {string[]} [endOpen] If the end is open, then the range excludes
* rows whose first key columns exactly match.
*/
/**
* Read request options. This includes all standard ReadRequest options as
* well as several convenience properties.
*
* @see [StreamingRead API Documentation](https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#google.spanner.v1.Spanner.StreamingRead)
* @see [ReadRequest API Documentation](https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#google.spanner.v1.ReadRequest)
*
* @typedef {object} ReadRequest
* @property {string} table The name of the table in the database to be read.
* @property {string[]} columns The columns of the table to be returned for each
* row matching this query.
* @property {string[]|string[][]} keys The primary or index keys of the rows in this table to be
* yielded. If using a composite key, provide an array within this array.
* See the example below.
* @property {KeyRange[]} [ranges] An alternative to the keys property; this can
* be used to define a range of keys to be yielded.
* @property {string} [index] The name of an index on the table if a
* different index than the primary key should be used to determine which rows to return.
* @property {boolean} [json=false] Receive the rows as serialized objects. This
* is the equivalent of calling `toJSON()` on each row.
* @property {JSONOptions} [jsonOptions] Configuration options for the serialized
* objects.
* @property {object} [keySet] Defines a collection of keys and/or key ranges to
* read.
* @property {number} [limit] The number of rows to yield.
* @property {Buffer} [partitionToken]
* If present, results will be restricted to the specified partition
* previously created using PartitionRead(). There must be an exact
* match for the values of fields common to this message and the
* PartitionReadRequest message used to create this partition_token.
* @property {google.spanner.v1.RequestOptions} [requestOptions]
* Common options for this request.
* @property {google.spanner.v1.IDirectedReadOptions} [directedReadOptions]
* Indicates which replicas or regions should be used for non-transactional reads or queries.
* @property {object} [gaxOptions]
* Call options. See {@link https://googleapis.dev/nodejs/google-gax/latest/interfaces/CallOptions.html|CallOptions}
* for more details.
*/
/**
* Create a readable object stream to receive rows from the database using key
* lookups and scans.
*
* Wrapper around {@link v1.SpannerClient#streamingRead}.
*
* @see {@link v1.SpannerClient#streamingRead}
* @see [StreamingRead API Documentation](https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#google.spanner.v1.Spanner.StreamingRead)
* @see [ReadRequest API Documentation](https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#google.spanner.v1.ReadRequest)
*
* @fires PartialResultStream#response
* @fires PartialResultStream#stats
*
* @param {string} table The table to read from.
* @param {ReadRequest} query Configuration object. See official
* [`ReadRequest`](https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#google.spanner.v1.ReadRequest).
* API documentation.
* @returns {ReadableStream} A readable stream that emits rows.
*
* @example
* ```
* transaction.createReadStream('Singers', {
* keys: ['1'],
* columns: ['SingerId', 'name']
* })
* .on('error', function(err) {})
* .on('data', function(row) {
* // row = [
* // {
* // name: 'SingerId',
* // value: '1'
* // },
* // {
* // name: 'Name',
* // value: 'Eddie Wilson'
* // }
* // ]
* })
* .on('end', function() {
* // All results retrieved.
* });
*
* ```
* @example Provide an array for `query.keys` to read with a
* composite key.
* ```
* const query = {
* keys: [
* [
* 'Id1',
* 'Name1'
* ],
* [
* 'Id2',
* 'Name2'
* ]
* ],
* // ...
* };
* ```
*
* @example Rows are returned as an array of object arrays. Each
* object has a `name` and `value` property. To get a serialized object, call
* `toJSON()`.
* ```
* transaction.createReadStream('Singers', {
* keys: ['1'],
* columns: ['SingerId', 'name']
* })
* .on('error', function(err) {})
* .on('data', function(row) {
* // row.toJSON() = {
* // SingerId: '1',
* // Name: 'Eddie Wilson'
* // }
* })
* .on('end', function() {
* // All results retrieved.
* });
* ```
*
* @example Alternatively, set `query.json` to `true`, and this step
* will perform automatically.
* ```
* transaction.createReadStream('Singers', {
* keys: ['1'],
* columns: ['SingerId', 'name'],
* json: true,
* })
* .on('error', function(err) {})
* .on('data', function(row) {
* // row = {
* // SingerId: '1',
* // Name: 'Eddie Wilson'
* // }
* })
* .on('end', function() {
* // All results retrieved.
* });
* ```
*
* @example If you anticipate many results, you can end a stream
* early to prevent unnecessary processing and API requests.
* ```
* transaction.createReadStream('Singers', {
* keys: ['1'],
* columns: ['SingerId', 'name']
* })
* .on('data', function(row) {
* this.end();
* });
* ```
*/
createReadStream(
table: string,
request = {} as ReadRequest
): PartialResultStream {
const {
gaxOptions,
json,
jsonOptions,
maxResumeRetries,
requestOptions,
columnsMetadata,
} = request;
const keySet = Snapshot.encodeKeySet(request);
const transaction: spannerClient.spanner.v1.ITransactionSelector = {};
if (this.id) {
transaction.id = this.id as Uint8Array;
} else if (this._options.readWrite) {
transaction.begin = this._options;
} else {
transaction.singleUse = this._options;
}
const directedReadOptions = this._getDirectedReadOptions(
request.directedReadOptions
);
request = Object.assign({}, request);
delete request.gaxOptions;
delete request.json;
delete request.jsonOptions;
delete request.maxResumeRetries;
delete request.keys;
delete request.ranges;
delete request.requestOptions;
delete request.directedReadOptions;
delete request.columnsMetadata;
const reqOpts: spannerClient.spanner.v1.IReadRequest = Object.assign(
request,
{
session: this.session.formattedName_!,
requestOptions: this.configureTagOptions(
typeof transaction.singleUse !== 'undefined',
this.requestOptions?.transactionTag ?? undefined,
requestOptions
),
directedReadOptions: directedReadOptions,
transaction,
table,
keySet,
}
);
const headers = this.resourceHeader_;
if (
this._getSpanner().routeToLeaderEnabled &&
(this._options.readWrite !== undefined ||
this._options.partitionedDml !== undefined)
) {
addLeaderAwareRoutingHeader(headers);
}
const makeRequest = (resumeToken?: ResumeToken): Readable => {
if (this.id && transaction.begin) {
delete transaction.begin;
transaction.id = this.id;
}
return this.requestStream({
client: 'SpannerClient',
method: 'streamingRead',
reqOpts: Object.assign({}, reqOpts, {resumeToken}),
gaxOpts: gaxOptions,
headers: headers,
});
};
return partialResultStream(this._wrapWithIdWaiter(makeRequest), {
json,
jsonOptions,
maxResumeRetries,
columnsMetadata,
gaxOptions,
})
?.on('response', response => {
if (response.metadata && response.metadata!.transaction && !this.id) {
this._update(response.metadata!.transaction);
}
})
.on('error', err => {
const isServiceError = err && typeof err === 'object' && 'code' in err;
if (
!this.id &&
this._useInRunner &&
!(
isServiceError &&
(err as grpc.ServiceError).code === grpc.status.ABORTED
)
) {
this.begin();
}
});
}
/**
* Let the client know you're done with a particular transaction. This should
* mainly be called for {@link Snapshot} objects, however in certain cases
* you may want to call them for {@link Transaction} objects as well.
*
* @example Calling `end` on a read only snapshot
* ```
* database.getSnapshot((err, transaction) => {
* if (err) {
* // Error handling omitted.
* }
*
* transaction.run('SELECT * FROM Singers', (err, rows) => {
* if (err) {
* // Error handling omitted.
* }
*
* // End the snapshot.
* transaction.end();
* });
* });
* ```
*
* @example Calling `end` on a read/write transaction
* ```
* database.runTransaction((err, transaction) => {
* if (err) {
* // Error handling omitted.
* }
*
* const query = 'UPDATE Account SET Balance = 1000 WHERE Key = 1';
*
* transaction.runUpdate(query, err => {
* if (err) {
* // In the event of an error, there would be nothing to rollback,
* so
* // instead of continuing, discard the
* transaction. transaction.end(); return;
* }
*
* transaction.commit(err => {});
* });
* });
* ```
*/
end(): void {
if (this.ended) {
return;
}
this.ended = true;
process.nextTick(() => this.emit('end'));
}
/**
* @typedef {array} ReadResponse
* @property {array[]} 0 Rows are returned as an array of object arrays. Each
* object has a `name` and `value` property. To get a serialized object,
* call `toJSON()`. Optionally, provide an options object to `toJSON()`
* specifying `wrapNumbers: true` to protect large integer values outside
* of the range of JavaScript Number. If set, FLOAT64 values are returned
* as {@link Spanner.Float} objects and INT64 values as {@link
* Spanner.Int}.
*/
/**
* @callback ReadCallback
* @param {?Error} err Request error, if any.
* @param {array[]} rows Rows are returned as an array of object arrays. Each
* object has a `name` and `value` property. To get a serialized object,
* call `toJSON()`. Optionally, provide an options object to `toJSON()`
* specifying `wrapNumbers: true` to protect large integer values outside
* of the range of JavaScript Number. If set, FLOAT64 values are returned
* as {@link Spanner.Float} objects and INT64 values as {@link
* Spanner.Int}.
*/
/**
* Performs a read request against the specified Table.
*
* Wrapper around {@link v1.SpannerClient#read}.
*
* @see {@link v1.SpannerClient#read}
*
* @param {string} table The table to read from.
* @param {ReadRequest} query Configuration object. See official
* [`ReadRequest`](https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#google.spanner.v1.ReadRequest).
* API documentation.
* @param {ReadCallback} [callback] Callback function.
* @returns {Promise<ReadResponse>}
*
* @example
* ```
* const query = {
* keys: ['1'],
* columns: ['SingerId', 'name']
* };
*
* transaction.read('Singers', query, function(err, rows) {
* if (err) {
* // Error handling omitted.
* }
*
* const firstRow = rows[0];
*
* // firstRow = [
* // {
* // name: 'SingerId',
* // value: '1'
* // },
* // {
* // name: 'Name',
* // value: 'Eddie Wilson'
* // }
* // ]
* });
*
* ```
* @example Provide an array for `query.keys` to read with a
* composite key.
* ```
* const query = {
* keys: [
* [
* 'Id1',
* 'Name1'
* ],
* [
* 'Id2',
* 'Name2'
* ]
* ],
* // ...
* };
* ```
*
* @example Rows are returned as an array of object arrays. Each
* object has a `name` and `value` property. To get a serialized object, call
* `toJSON()`.
* ```
* transaction.read('Singers', query, function(err, rows) {
* if (err) {
* // Error handling omitted.
* }
*
* const firstRow = rows[0];
*
* // firstRow.toJSON() = {
* // SingerId: '1',
* // Name: 'Eddie Wilson'
* // }
* });
* ```
*
* @example Alternatively, set `query.json` to `true`, and this step
* will perform automatically.
* ```
* query.json = true;
*
* transaction.read('Singers', query, function(err, rows) {
* if (err) {
* // Error handling omitted.
* }
*
* const firstRow = rows[0];
*
* // firstRow = {
* // SingerId: '1',
* // Name: 'Eddie Wilson'
* // }
* });
* ```
*/
read(table: string, request: ReadRequest): Promise<ReadResponse>;
read(table: string, callback: ReadCallback): void;
read(table: string, request: ReadRequest, callback: ReadCallback): void;
read(
table: string,
requestOrCallback: ReadRequest | ReadCallback,
cb?: ReadCallback
): void | Promise<ReadResponse> {
const rows: Rows = [];
let request: ReadRequest;
let callback: ReadCallback;
if (typeof requestOrCallback === 'function') {
request = {} as RequestOptions;
callback = requestOrCallback as ReadCallback;
} else {
request = requestOrCallback as RequestOptions;
callback = cb as ReadCallback;
}
this.createReadStream(table, request)
.on('error', callback!)
.on('data', row => rows.push(row))
.on('end', () => callback!(null, rows));
}
/**
* Execute a SQL statement on this database inside of a transaction.
*
* **Performance Considerations:**
*
* This method wraps the streaming method,
* {@link Snapshot#run} for your convenience. All rows are stored in memory
* before releasing to your callback. If you intend to receive a lot of
* results from your query, consider using the streaming method,
* so you can free each result from memory after consuming it.
*
* Wrapper around {@link v1.SpannerClient#executeStreamingSql}.
*
* @see {@link v1.SpannerClient#executeStreamingSql}
* @see [ExecuteStreamingSql API Documentation](https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#google.spanner.v1.Spanner.ExecuteStreamingSql)
* @see [ExecuteSqlRequest API Documentation](https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#google.spanner.v1.ExecuteSqlRequest)
*
* @param {string|ExecuteSqlRequest} query A SQL query or
* {@link ExecuteSqlRequest} object.
* @param {RunCallback} [callback] Callback function.
* @returns {Promise<RunResponse>}
*
* @example
* ```
* transaction.run(query, function(err, rows) {
* if (err) {
* // Error handling omitted.
* }
*
* // rows = [
* // {
* // SingerId: '1',
* // Name: 'Eddie Wilson'
* // }
* // ]
* });
*
* ```
* @example The SQL query string can contain parameter placeholders.
* A parameter placeholder consists of '@' followed by the parameter name.
* ```
* const query = {
* sql: 'SELECT * FROM Singers WHERE name = @name',
* params: {
* name: 'Eddie Wilson'
* }
* };
*
* transaction.run(query, function(err, rows) {
* if (err) {
* // Error handling omitted.
* }
* });
* ```
*
* @example If you need to enforce a specific param type, a types map
* can be provided. This is typically useful if your param value can be null.
* ```
* const query = {
* sql: 'SELECT * FROM Singers WHERE name = @name AND id = @id',
* params: {
* id: spanner.int(8),
* name: null
* },
* types: {
* id: 'int64',
* name: 'string'
* }
* };
*
* transaction.run(query, function(err, rows) {
* if (err) {
* // Error handling omitted.