Skip to content

Commit d9ae990

Browse files
committed
feat: add sessionTrailerCallbackSymbol and support for trailer callbacks in QueryClient
1 parent b3e1c01 commit d9ae990

File tree

4 files changed

+65
-46
lines changed

4 files changed

+65
-46
lines changed

src/query/query-client.ts

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,34 @@
11
import EventEmitter from "events";
2-
import {QuerySessionPool, SessionCallback, SessionEvent} from "./query-session-pool";
3-
import {Ydb} from "ydb-sdk-proto";
4-
import {AUTO_TX} from "../table";
2+
import { Metadata } from "@grpc/grpc-js";
3+
import { Ydb } from "ydb-sdk-proto";
4+
import { AUTO_TX } from "../table";
5+
import { QuerySessionPool, SessionCallback, SessionEvent } from "./query-session-pool";
56
import {
67
sessionTxSettingsSymbol,
78
sessionTxIdSymbol,
89
sessionRollbackTransactionSymbol,
910
sessionCommitTransactionSymbol,
1011
sessionCurrentOperationSymbol,
11-
sessionReleaseSymbol, isIdempotentSymbol, isIdempotentDoLevelSymbol, ctxSymbol
12+
sessionReleaseSymbol, isIdempotentSymbol, isIdempotentDoLevelSymbol, ctxSymbol,
13+
sessionTrailerCallbackSymbol
1214
} from "./symbols";
13-
import {YdbError} from "../errors";
14-
import {Context} from "../context";
15-
import {ensureContext} from "../context";
16-
import {Logger} from "../logger/simple-logger";
17-
import {RetryStrategy} from "../retries/retryStrategy";
18-
import {RetryPolicySymbol} from "../retries/symbols";
19-
import {IClientSettings} from "../client/settings";
15+
import { YdbError } from "../errors";
16+
import { Context } from "../context";
17+
import { ensureContext } from "../context";
18+
import { Logger } from "../logger/simple-logger";
19+
import { RetryStrategy } from "../retries/retryStrategy";
20+
import { RetryPolicySymbol } from "../retries/symbols";
21+
import { IClientSettings } from "../client/settings";
22+
2023

2124
interface IDoOpts<T> {
2225
ctx?: Context,
2326
// ctx?: Context
2427
txSettings?: Ydb.Query.ITransactionSettings,
2528
fn: SessionCallback<T>,
2629
timeout?: number,
27-
idempotent?: boolean
30+
idempotent?: boolean,
31+
onTrailer?: (metadata: Metadata) => void
2832
}
2933

3034
/**
@@ -57,13 +61,18 @@ export class QueryClient extends EventEmitter {
5761
timeout: opts.timeout
5862
},
5963
async (ctx) => {
60-
return this.retrier.retry<T>(ctx,async (_ctx) => {
64+
return this.retrier.retry<T>(ctx, async (_ctx) => {
6165
const session = await this.pool.acquire();
6266
session[ctxSymbol] = ctx;
6367
if (opts.hasOwnProperty('idempotent')) {
6468
session[isIdempotentDoLevelSymbol] = true;
6569
session[isIdempotentSymbol] = opts.idempotent;
6670
}
71+
72+
if (opts.hasOwnProperty('onTrailer')) {
73+
session[sessionTrailerCallbackSymbol] = opts.onTrailer;
74+
}
75+
6776
let error: Error;
6877
try {
6978
if (opts.txSettings) session[sessionTxSettingsSymbol] = opts.txSettings;
@@ -85,16 +94,17 @@ export class QueryClient extends EventEmitter {
8594
await session[sessionRollbackTransactionSymbol]();
8695
}
8796
}
88-
return {result: res};
97+
return { result: res };
8998
} catch (err) {
9099
error = err as Error;
91-
return {err: error, idempotent: session[isIdempotentSymbol]}
100+
return { err: error, idempotent: session[isIdempotentSymbol] }
92101
} finally {
93102
delete session[ctxSymbol];
94103
delete session[sessionTxSettingsSymbol];
95104
delete session[sessionCurrentOperationSymbol];
96105
delete session[isIdempotentDoLevelSymbol];
97106
delete session[isIdempotentSymbol];
107+
delete session[sessionTrailerCallbackSymbol];
98108
// @ts-ignore
99109
if (error && (error as any)[RetryPolicySymbol]?.deleteSession) {
100110
this.logger.debug('Encountered bad or busy session, re-creating the session');
@@ -110,7 +120,7 @@ export class QueryClient extends EventEmitter {
110120
@ensureContext()
111121
public doTx<T>(opts: IDoOpts<T>): Promise<T> {
112122
if (!opts.txSettings) {
113-
opts = {...opts, txSettings: AUTO_TX.beginTx};
123+
opts = { ...opts, txSettings: AUTO_TX.beginTx };
114124
}
115125
return this.do<T>(opts);
116126
}

src/query/query-session-execute.ts

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,23 @@
1-
import {Ydb} from "ydb-sdk-proto";
1+
import { Ydb } from "ydb-sdk-proto";
22
import {
33
isIdempotentDoLevelSymbol,
44
isIdempotentSymbol,
55
resultsetYdbColumnsSymbol,
66
sessionCurrentOperationSymbol,
7+
sessionTrailerCallbackSymbol,
78
sessionTxIdSymbol,
89
sessionTxSettingsSymbol,
910
} from "./symbols";
10-
import {buildAsyncQueueIterator, IAsyncQueueIterator} from "../utils/build-async-queue-iterator";
11-
import {ResultSet} from "./result-set";
12-
import {ClientReadableStream} from "@grpc/grpc-js";
13-
import {ensureCallSucceeded} from "../utils/process-ydb-operation-result";
11+
import { buildAsyncQueueIterator, IAsyncQueueIterator } from "../utils/build-async-queue-iterator";
12+
import { ResultSet } from "./result-set";
13+
import { ClientReadableStream } from "@grpc/grpc-js";
14+
import { ensureCallSucceeded } from "../utils/process-ydb-operation-result";
1415
import Long from "long";
15-
import {StatusObject as GrpcStatusObject} from "@grpc/grpc-js/build/src/call-interface";
16-
import {TransportError} from "../errors";
17-
import {implSymbol, QuerySession} from "./query-session";
18-
import {convertYdbValueToNative, snakeToCamelCaseConversion} from "../types";
19-
import {CtxUnsubcribe} from "../context";
16+
import { StatusObject as GrpcStatusObject } from "@grpc/grpc-js/build/src/call-interface";
17+
import { TransportError } from "../errors";
18+
import { implSymbol, QuerySession } from "./query-session";
19+
import { convertYdbValueToNative, snakeToCamelCaseConversion } from "../types";
20+
import { CtxUnsubcribe } from "../context";
2021
import IExecuteQueryRequest = Ydb.Query.IExecuteQueryRequest;
2122
import IColumn = Ydb.IColumn;
2223

@@ -113,7 +114,7 @@ export function execute(this: QuerySession, args: IExecuteArgs): Promise<IExecut
113114
throw new Error('txControl.commitTx === true when no open transaction and there\'s no txControl.beginTx');
114115
}
115116

116-
// Build params
117+
// Build params
117118
const executeQueryRequest: IExecuteQueryRequest = {
118119
sessionId: this.sessionId,
119120
queryContent: {
@@ -126,7 +127,7 @@ export function execute(this: QuerySession, args: IExecuteArgs): Promise<IExecut
126127
if (args.statsMode) executeQueryRequest.statsMode = args.statsMode;
127128
if (args.parameters) executeQueryRequest.parameters = args.parameters;
128129
if (this[sessionTxSettingsSymbol] && !this[sessionTxIdSymbol])
129-
executeQueryRequest.txControl = {beginTx: this[sessionTxSettingsSymbol], commitTx: false};
130+
executeQueryRequest.txControl = { beginTx: this[sessionTxSettingsSymbol], commitTx: false };
130131
else if (args.txControl)
131132
executeQueryRequest.txControl = args.txControl;
132133
if (this[sessionTxIdSymbol])
@@ -137,7 +138,7 @@ export function execute(this: QuerySession, args: IExecuteArgs): Promise<IExecut
137138
if (args.idempotent) this[isIdempotentSymbol] = true;
138139
}
139140

140-
// Run the operation
141+
// Run the operation
141142
let finished = false;
142143
const resultSetByIndex: [iterator: IAsyncQueueIterator<Ydb.IValue>, resultSet: ResultSet][] = [];
143144
const resultSetIterator = buildAsyncQueueIterator<ResultSet>();
@@ -158,7 +159,7 @@ export function execute(this: QuerySession, args: IExecuteArgs): Promise<IExecut
158159
});
159160
}
160161

161-
// One operation per session in a time. And it might be cancelled
162+
// One operation per session in a time. And it might be cancelled
162163
if (this[sessionCurrentOperationSymbol]) throw new Error('There\'s another active operation in the session');
163164

164165
const cancel = (reason: any, onStreamError?: boolean) => {
@@ -179,9 +180,9 @@ export function execute(this: QuerySession, args: IExecuteArgs): Promise<IExecut
179180
delete this[sessionCurrentOperationSymbol];
180181
}
181182

182-
this[sessionCurrentOperationSymbol] = {cancel};
183+
this[sessionCurrentOperationSymbol] = { cancel };
183184

184-
// Operation
185+
// Operation
185186
responseStream = this[implSymbol].grpcServiceClient!.makeServerStreamRequest(
186187
'/Ydb.Query.V1.QueryService/ExecuteQuery',
187188
(v) => Ydb.Query.ExecuteQueryRequest.encode(v).finish() as Buffer,
@@ -282,7 +283,11 @@ export function execute(this: QuerySession, args: IExecuteArgs): Promise<IExecut
282283
cancel(TransportError.convertToYdbError(err), true);
283284
});
284285

285-
responseStream.on('metadata', (_metadata) => {
286+
responseStream.on('metadata', (metadata) => {
287+
if (this[sessionTrailerCallbackSymbol]) {
288+
this[sessionTrailerCallbackSymbol](metadata);
289+
}
290+
286291
// TODO: Process partial meta
287292
// TODO: Expect to see on graceful shutdown
288293
});

src/query/query-session.ts

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import EventEmitter from "events";
2-
import {GrpcQueryService, QueryService, SessionEvent} from "./query-session-pool";
3-
import {Endpoint} from "../discovery";
4-
import {retryable} from "../retries_obsoleted";
5-
import {pessimizable} from "../utils";
6-
import {ensureCallSucceeded} from "../utils/process-ydb-operation-result";
7-
import {Ydb} from "ydb-sdk-proto";
8-
import {ClientReadableStream} from "@grpc/grpc-js";
2+
import { GrpcQueryService, QueryService, SessionEvent } from "./query-session-pool";
3+
import { Endpoint } from "../discovery";
4+
import { retryable } from "../retries_obsoleted";
5+
import { pessimizable } from "../utils";
6+
import { ensureCallSucceeded } from "../utils/process-ydb-operation-result";
7+
import { Ydb } from "ydb-sdk-proto";
8+
import { ClientReadableStream, Metadata } from "@grpc/grpc-js";
99
import {
1010
sessionIdSymbol,
1111
sessionTxSettingsSymbol,
@@ -23,20 +23,21 @@ import {
2323
isIdempotentSymbol,
2424
isIdempotentDoLevelSymbol,
2525
createSymbol,
26-
sessionIsClosingSymbol, ctxSymbol
26+
sessionIsClosingSymbol, ctxSymbol,
27+
sessionTrailerCallbackSymbol
2728
} from './symbols';
2829
import ICreateSessionResult = Ydb.Table.ICreateSessionResult;
2930

30-
import {attach as attachImpl} from './query-session-attach';
31-
import {CANNOT_MANAGE_TRASACTIONS_ERROR, execute as executeImpl} from './query-session-execute';
31+
import { attach as attachImpl } from './query-session-attach';
32+
import { CANNOT_MANAGE_TRASACTIONS_ERROR, execute as executeImpl } from './query-session-execute';
3233
import {
3334
beginTransaction,
3435
beginTransaction as beginTransactionImpl, commitTransaction,
3536
commitTransaction as commitTransactionImpl,
3637
rollbackTransaction as rollbackTransactionImpl
3738
} from './query-session-transaction';
38-
import {Logger} from "../logger/simple-logger";
39-
import {Context} from "../context";
39+
import { Logger } from "../logger/simple-logger";
40+
import { Context } from "../context";
4041

4142
export interface QuerySessionOperation {
4243
cancel(reason: any): void;
@@ -54,6 +55,7 @@ export class QuerySession extends EventEmitter implements ICreateSessionResult {
5455
[sessionTxSettingsSymbol]?: Ydb.Query.ITransactionSettings;
5556
[isIdempotentDoLevelSymbol]?: boolean
5657
[isIdempotentSymbol]?: boolean;
58+
[sessionTrailerCallbackSymbol]?: (md: Metadata) => void;
5759

5860
// private fields, available in the methods placed in separated files
5961
[implSymbol]: QueryService;
@@ -137,7 +139,7 @@ export class QuerySession extends EventEmitter implements ICreateSessionResult {
137139
this.beingDeleted = true;
138140
await this[attachStreamSymbol]?.cancel();
139141
delete this[attachStreamSymbol]; // only one stream cancel even when multi ple retries
140-
ensureCallSucceeded(await this[apiSymbol].deleteSession({sessionId: this.sessionId}));
142+
ensureCallSucceeded(await this[apiSymbol].deleteSession({ sessionId: this.sessionId }));
141143
}
142144

143145
// TODO: Uncomment after switch to TS 5.3

src/query/symbols.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,6 @@ export const sessionIdSymbol = Symbol('sessionId');
2424
export const sessionTxIdSymbol = Symbol('sessionTxId');
2525
export const sessionTxSettingsSymbol = Symbol('sessionTxSettings');
2626
export const sessionCurrentOperationSymbol = Symbol('sessionCurrentOperation');
27+
export const sessionTrailerCallbackSymbol = Symbol('sessionTrailerCallback');
28+
2729
export const resultsetYdbColumnsSymbol = Symbol('resultsetYdbColumns');

0 commit comments

Comments
 (0)