From 2710408b3b7c18e194b65e599faa879a4b6406ca Mon Sep 17 00:00:00 2001 From: Istvan Soos Date: Fri, 13 Sep 2024 23:03:36 +0200 Subject: [PATCH] Timeout triggers current statement cancellation (using a new connection). --- CHANGELOG.md | 1 + lib/src/exceptions.dart | 10 +++++++- lib/src/v3/connection.dart | 48 ++++++++++++++++++++++++++++---------- test/timeout_test.dart | 44 ++++++++++++++++++++++------------ 4 files changed, 75 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ac7f56c..3ae81ce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ - `DatabaseInfo` tracks information about relations and oids (currently limited to `RelationMessage` caching). - **Behaviour / soft-breaking changes**: - Preparing/executing a stamement on the main connection while in a `runTx` callback will throw an exception. + - Setting `timeout` will try to actively cancel the current statement using a new connection. - Deprecated `TupleDataColumn.data`, use `.value` instead (for binary protocol messages). - Deprecated some logical replication message parsing method. - Removed `@internal`-annotated methods from the public API of `ServerException` and `Severity`. diff --git a/lib/src/exceptions.dart b/lib/src/exceptions.dart index 39839ee..1d61a12 100644 --- a/lib/src/exceptions.dart +++ b/lib/src/exceptions.dart @@ -190,13 +190,21 @@ ServerException buildExceptionFromErrorFields(List errorFields) { ); } -PgException transformServerException(ServerException ex) { +PgException transformServerException( + ServerException ex, { + bool timeoutTriggered = false, +}) { if (ex.code == '57014' && ex.message == 'canceling statement due to statement timeout') { return _PgTimeoutException( ['${ex.code}:', ex.message, ex.trace].whereType().join(' '), ); } + if (ex.code == '57014' && timeoutTriggered) { + return _PgTimeoutException( + ['${ex.code}:', ex.message, ex.trace].whereType().join(' '), + ); + } return ex; } diff --git a/lib/src/v3/connection.dart b/lib/src/v3/connection.dart index 253deca..088b247 100644 --- a/lib/src/v3/connection.dart +++ b/lib/src/v3/connection.dart @@ -156,11 +156,9 @@ abstract class _PgSessionBase implements Session { ignoreRows, ); try { - await querySubscription.asFuture().optionalTimeout(timeout); - return Result( - rows: items, - affectedRows: await querySubscription.affectedRows, - schema: await querySubscription.schema, + return await querySubscription._waitForResult( + items: items, + timeout: timeout, ); } finally { await querySubscription.cancel(); @@ -260,6 +258,7 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection { ), async.StreamSinkTransformer.fromHandlers(handleData: (msg, sink) { print('[$hash][out] $msg'); + print('[out] $msg'); sink.add(msg); }), )); @@ -655,16 +654,13 @@ class _PreparedStatement extends Statement { final items = []; final subscription = bind(parameters).listen(items.add); try { - await subscription.asFuture().optionalTimeout(timeout); + return await (subscription as _PgResultStreamSubscription)._waitForResult( + items: items, + timeout: timeout, + ); } finally { await subscription.cancel(); } - - return Result( - rows: items, - affectedRows: await subscription.affectedRows, - schema: await subscription.schema, - ); } @override @@ -892,6 +888,34 @@ class _PgResultStreamSubscription } } + Future _waitForResult({ + required List items, + required Duration? timeout, + }) async { + bool timeoutTriggered = false; + final cancelTimer = timeout == null + ? null + : Timer(timeout, () async { + timeoutTriggered = true; + await connection.cancelPendingStatement(); + }); + try { + await asFuture(); + return Result( + rows: items, + affectedRows: await affectedRows, + schema: await schema, + ); + } on ServerException catch (e) { + if (timeoutTriggered) { + throw transformServerException(e, timeoutTriggered: timeoutTriggered); + } + rethrow; + } finally { + cancelTimer?.cancel(); + } + } + // Forwarding subscription interface to regular stream subscription from // controller diff --git a/test/timeout_test.dart b/test/timeout_test.dart index 50af19a..a6f7e20 100644 --- a/test/timeout_test.dart +++ b/test/timeout_test.dart @@ -103,19 +103,33 @@ void main() { // Note: to fix this, we may consider cancelling the currently running statements: // https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-CANCELING-REQUESTS - // withPostgresServer('timeout race conditions', (server) { - // test('two transactions for update', () async { - // final c1 = await server.newConnection(); - // final c2 = await server.newConnection(); - // await c1.execute('CREATE TABLE t (id INT PRIMARY KEY);'); - // await c1.execute('INSERT INTO t (id) values (1);'); - // await c1.execute('BEGIN'); - // await c1.execute('SELECT * FROM t WHERE id=1 FOR UPDATE'); - // await c2.execute('BEGIN'); - // await c2.execute('SELECT * FROM t WHERE id=1 FOR UPDATE', - // timeout: Duration(seconds: 1)); - // await c1.execute('ROLLBACK'); - // await c2.execute('ROLLBACK'); - // }); - // }); + withPostgresServer('timeout race conditions', (server) { + setUp(() async { + final c1 = await server.newConnection(); + await c1.execute('CREATE TABLE t (id INT PRIMARY KEY);'); + await c1.execute('INSERT INTO t (id) values (1);'); + }); + + test('two transactions for update', () async { + for (final qm in QueryMode.values) { + final c1 = await server.newConnection(); + final c2 = await server.newConnection(queryMode: qm); + await c1.execute('BEGIN'); + await c1.execute('SELECT * FROM t WHERE id=1 FOR UPDATE'); + await c2.execute('BEGIN'); + try { + await c2.execute('SELECT * FROM t WHERE id=1 FOR UPDATE', + timeout: Duration(seconds: 1)); + fail('unreachable'); + } on TimeoutException catch (_) { + // ignore + } + await c1.execute('ROLLBACK'); + await c2.execute('ROLLBACK'); + + await c1.execute('SELECT 1'); + await c2.execute('SELECT 1'); + } + }); + }); }