From 63cdc6f990951cc27ee1ffc97fb874afade21468 Mon Sep 17 00:00:00 2001 From: Istvan Soos Date: Fri, 13 Sep 2024 01:17:11 +0200 Subject: [PATCH] Optional byte splitter and delayer for tests. --- lib/src/v3/connection.dart | 8 ++++++++ test/docker.dart | 34 ++++++++++++++++++++++++++++++++-- 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/lib/src/v3/connection.dart b/lib/src/v3/connection.dart index 5d3b586..ab968a4 100644 --- a/lib/src/v3/connection.dart +++ b/lib/src/v3/connection.dart @@ -202,6 +202,8 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection { static Future connect( Endpoint endpoint, { ConnectionSettings? connectionSettings, + @visibleForTesting + StreamTransformer? incomingBytesTransformer, }) async { final settings = connectionSettings is ResolvedConnectionSettings ? connectionSettings @@ -217,6 +219,7 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection { endpoint, settings, codecContext: codecContext, + incomingBytesTransformer: incomingBytesTransformer, ); if (_debugLog) { @@ -257,6 +260,7 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection { Endpoint endpoint, ResolvedConnectionSettings settings, { required CodecContext codecContext, + StreamTransformer? incomingBytesTransformer, }) async { final host = endpoint.host; final port = endpoint.port; @@ -337,6 +341,10 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection { adaptedStream = async.SubscriptionStream(subscription); } + if (incomingBytesTransformer != null) { + adaptedStream = adaptedStream.transform(incomingBytesTransformer); + } + final outgoingSocket = async.StreamSinkExtensions(socket) .transform( async.StreamSinkTransformer.fromHandlers(handleDone: (out) { diff --git a/test/docker.dart b/test/docker.dart index e297a4b..af8f4f9 100644 --- a/test/docker.dart +++ b/test/docker.dart @@ -1,5 +1,6 @@ import 'dart:async'; import 'dart:io'; +import 'dart:typed_data'; import 'package:async/async.dart'; import 'package:docker_process/containers/postgres.dart'; @@ -8,9 +9,12 @@ import 'package:meta/meta.dart'; import 'package:path/path.dart' as p; import 'package:postgres/messages.dart'; import 'package:postgres/postgres.dart'; +import 'package:postgres/src/v3/connection.dart'; import 'package:stream_channel/stream_channel.dart'; import 'package:test/test.dart'; +final _splitAndDelayBytes = false; + // We log all packets sent to and received from the postgres server. This can be // used to debug failing tests. To view logs, something like this can be put // at the beginning of `main()`: @@ -64,9 +68,9 @@ class PostgresServer { SslMode? sslMode, QueryMode? queryMode, }) async { - return Connection.open( + return await PgConnectionImplementation.connect( await endpoint(), - settings: ConnectionSettings( + connectionSettings: ConnectionSettings( connectTimeout: Duration(seconds: 3), queryTimeout: Duration(seconds: 3), replicationMode: replicationMode, @@ -74,6 +78,8 @@ class PostgresServer { sslMode: sslMode, queryMode: queryMode, ), + incomingBytesTransformer: + _splitAndDelayBytes ? _transformIncomingBytes() : null, ); } @@ -82,6 +88,30 @@ class PostgresServer { } } +StreamTransformer _transformIncomingBytes() { + return StreamTransformer.fromBind((s) => s.asyncExpand((u) { + if (u.length <= 2) { + return Stream.value(u); + } + final hash = u.hashCode.abs(); + final split = hash % u.length; + if (split == 0 || split >= u.length - 1) { + return Stream.value(u); + } + + final p1 = u.sublist(0, split); + final p2 = u.sublist(split); + + return Stream.fromFutures([ + Future.value(p1), + Future.delayed( + Duration(milliseconds: 50), + () => p2, + ) + ]); + })); +} + @isTestGroup void withPostgresServer( String name,