From 80b9078e0f1319b3d3c2a12570f2015a412dfb21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Istv=C3=A1n=20So=C3=B3s?= Date: Fri, 13 Sep 2024 22:54:51 +0200 Subject: [PATCH] First implementation of statement cancellation through new connection. (#377) --- lib/src/messages/client_messages.dart | 30 ++++++++++++++ lib/src/v3/connection.dart | 56 ++++++++++++++++++++------- test/timeout_test.dart | 10 +++++ 3 files changed, 81 insertions(+), 15 deletions(-) diff --git a/lib/src/messages/client_messages.dart b/lib/src/messages/client_messages.dart index 05a0a67..95dd91e 100644 --- a/lib/src/messages/client_messages.dart +++ b/lib/src/messages/client_messages.dart @@ -398,3 +398,33 @@ class HotStandbyFeedbackMessage extends ClientMessage buffer.writeUint32(epochCatalogXminXid); } } + +class CancelRequestMessage extends ClientMessage { + /// The process ID of the target backend. + final int processId; + + /// The secret key for the target backend. + final int secretKey; + + CancelRequestMessage({ + required this.processId, + required this.secretKey, + }); + + @override + void applyToBuffer(PgByteDataWriter buffer) { + // Length of message contents in bytes, including self. + buffer.writeInt32(16); + // The cancel request code. The value is chosen to contain 1234 in the most significant 16 bits, + // and 5678 in the least significant 16 bits. + // (To avoid confusion, this code must not be the same as any protocol version number.) + buffer.writeInt32(80877102); + buffer.writeUint32(processId); + buffer.writeUint32(secretKey); + } + + @override + String toString() { + return 'CancelRequestMessage: $processId $secretKey'; + } +} diff --git a/lib/src/v3/connection.dart b/lib/src/v3/connection.dart index 9c62151..e4200fd 100644 --- a/lib/src/v3/connection.dart +++ b/lib/src/v3/connection.dart @@ -222,20 +222,7 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection { incomingBytesTransformer: incomingBytesTransformer, ); - if (_debugLog) { - channel = channel.transform(StreamChannelTransformer( - StreamTransformer.fromHandlers( - handleData: (msg, sink) { - print('[in] $msg'); - sink.add(msg); - }, - ), - async.StreamSinkTransformer.fromHandlers(handleData: (msg, sink) { - print('[out] $msg'); - sink.add(msg); - }), - )); - } + channel = _debugChannel(channel); if (settings.transformer != null) { channel = channel.transform(settings.transformer!); @@ -256,6 +243,25 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection { return connection; } + static StreamChannel _debugChannel(StreamChannel channel) { + if (!_debugLog) { + return channel; + } + final hash = channel.hashCode.abs().toRadixString(16); + return channel.transform(StreamChannelTransformer( + StreamTransformer.fromHandlers( + handleData: (msg, sink) { + print('[$hash][in] $msg'); + sink.add(msg); + }, + ), + async.StreamSinkTransformer.fromHandlers(handleData: (msg, sink) { + print('[$hash][out] $msg'); + sink.add(msg); + }), + )); + } + static Future<(StreamChannel, bool)> _connect( Endpoint endpoint, ResolvedConnectionSettings settings, { @@ -380,6 +386,7 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection { /// Whether [_channel] is backed by a TLS connection. final bool _channelIsSecure; late final StreamSubscription _serverMessages; + BackendKeyMessage? _backendKeyMessage; bool _isClosing = false; bool _socketIsBroken = false; @@ -461,7 +468,9 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection { if (message is ParameterStatusMessage) { info.setParameter(message.name, message.value); - } else if (message is BackendKeyMessage || message is NoticeMessage) { + } else if (message is BackendKeyMessage) { + _backendKeyMessage = message; + } else if (message is NoticeMessage) { // ignore for now } else if (message is NotificationResponseMessage) { _channels.deliverNotification(message); @@ -597,6 +606,23 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection { socketIsBroken: cause?.willAbortConnection ?? false, ); } + + @internal + Future cancelPendingStatement() async { + var (channel, _) = + await _connect(_endpoint, _settings, codecContext: codecContext); + if (_backendKeyMessage == null) { + throw PgException( + 'Unable to cancel pending statement: no backend key available.'); + } + channel = _debugChannel(channel); + channel.sink.add(CancelRequestMessage( + processId: _backendKeyMessage!.processId, + secretKey: _backendKeyMessage!.secretKey, + )); + // Waiting for the server to close connection. + await channel.stream.listen((_) {}).asFuture(); + } } class _PreparedStatement extends Statement { diff --git a/test/timeout_test.dart b/test/timeout_test.dart index e781946..45de04f 100644 --- a/test/timeout_test.dart +++ b/test/timeout_test.dart @@ -1,6 +1,7 @@ import 'dart:async'; import 'package:postgres/postgres.dart'; +import 'package:postgres/src/v3/connection.dart'; import 'package:test/test.dart'; import 'docker.dart'; @@ -18,6 +19,15 @@ void main() { await conn.close(); }); + test('Cancel current statement through a new connection', () async { + final f = conn.execute('SELECT pg_sleep(2);'); + await (conn as PgConnectionImplementation).cancelPendingStatement(); + await expectLater(f, throwsA(isA())); + // connection is still usable + final rs = await conn.execute('SELECT 1;'); + expect(rs[0][0], 1); + }); + test('Timeout fires during transaction rolls ack transaction', () async { try { await conn.runTx((ctx) async {