diff --git a/CHANGELOG.md b/CHANGELOG.md index 9a7440f..ad979b2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ - `Connection.info` (through `ConnectionInfo` class) exposes read-only connection-level information, e.g. acessing access server-provided parameter status values. - Support for binary `pgoutput` replication by [wolframm](https://github.com/Wolframm-Activities-OU). +- Deprecated `TupleDataColumn.data`, use `.value` instead (for binary protocol messages). - **Allowing custom type codecs**: - `Codec` interface is used for encoding/decoding value by type OIDs or Dart values. - `Codec.encode` and `Codec.decode` gets a reference to `CodecContext` which provides @@ -13,8 +14,6 @@ (for values where type is not specified). - `RelationTracker` tracks information about relations (currently limited to `RelationMessage` caching). - **Behaviour / soft-breaking changes**: - - Deprecated `TupleDataColumn.data`, use `.value` instead (for binary protocol messages). - - Deprecated some logical replication message parsing method. - Removed `@internal`-annotated methods from the public API of `ServerException` and `Severity`. - `ServerException` may be transformed into `_PgTimeoutException` which is both `PgException` and `TimeoutException` (but no longer `ServerException`). - The `timeout` parameters and the `SessionSettings.queryTimeout` has only a somewhat diff --git a/lib/messages.dart b/lib/messages.dart index a1db8de..4920d80 100644 --- a/lib/messages.dart +++ b/lib/messages.dart @@ -2,7 +2,6 @@ library messages; export 'src/buffer.dart' show PgByteDataWriter; export 'src/messages/client_messages.dart'; -export 'src/messages/logical_replication_messages.dart' - hide tryAsyncParseLogicalReplicationMessage; -export 'src/messages/server_messages.dart' hide parseXLogDataMessage; +export 'src/messages/logical_replication_messages.dart'; +export 'src/messages/server_messages.dart'; export 'src/messages/shared_messages.dart'; diff --git a/lib/src/message_window.dart b/lib/src/message_window.dart index 6b2436b..b7bda95 100644 --- a/lib/src/message_window.dart +++ b/lib/src/message_window.dart @@ -1,4 +1,3 @@ -import 'dart:async'; import 'dart:collection'; import 'dart:typed_data'; @@ -12,7 +11,7 @@ import 'messages/shared_messages.dart'; const int _headerByteSize = 5; -typedef _ServerMessageFn = FutureOr Function( +typedef _ServerMessageFn = ServerMessage Function( PgByteDataReader reader, int length); Map _messageTypeMap = { @@ -51,7 +50,7 @@ class MessageFramer { bool get _isComplete => _expectedLength == 0 || _expectedLength <= _reader.remainingLength; - Future addBytes(Uint8List bytes) async { + void addBytes(Uint8List bytes) { _reader.add(bytes); while (true) { @@ -77,7 +76,7 @@ class MessageFramer { } final targetRemainingLength = _reader.remainingLength - _expectedLength; - final msg = await msgMaker(_reader, _expectedLength); + final msg = msgMaker(_reader, _expectedLength); if (_reader.remainingLength > targetRemainingLength) { throw StateError( 'Message parser consumed more bytes than expected. type=$_type expectedLength=$_expectedLength'); @@ -112,8 +111,7 @@ class MessageFramer { /// such as replication messages. /// Returns a [ReplicationMessage] if the message contains such message. /// Otherwise, it'll just return the provided bytes as [CopyDataMessage]. -Future _parseCopyDataMessage( - PgByteDataReader reader, int length) async { +ServerMessage _parseCopyDataMessage(PgByteDataReader reader, int length) { final code = reader.readUint8(); if (code == ReplicationMessageId.primaryKeepAlive) { return PrimaryKeepAliveMessage.parse(reader); diff --git a/lib/src/messages/logical_replication_messages.dart b/lib/src/messages/logical_replication_messages.dart index a5a0ede..d6adc24 100644 --- a/lib/src/messages/logical_replication_messages.dart +++ b/lib/src/messages/logical_replication_messages.dart @@ -1,7 +1,6 @@ import 'dart:typed_data'; import 'package:buffer/buffer.dart'; -import 'package:meta/meta.dart'; import 'package:postgres/src/types/codec.dart'; import '../buffer.dart'; @@ -48,8 +47,6 @@ class XLogDataLogicalMessage implements XLogDataMessage { /// Tries to check if the [bytesList] is a [LogicalReplicationMessage]. If so, /// [LogicalReplicationMessage] is returned, otherwise `null` is returned. -@Deprecated('This method will be removed from public API. ' - 'Please file a new issue on GitHub if you are using it.') LogicalReplicationMessage? tryParseLogicalReplicationMessage( PgByteDataReader reader, int length) { // the first byte is the msg type @@ -72,66 +69,13 @@ LogicalReplicationMessage? tryParseLogicalReplicationMessage( return TypeMessage._parse(reader); case LogicalReplicationMessageTypes.insert: - return InsertMessage._syncParse(reader); + return InsertMessage._parse(reader); case LogicalReplicationMessageTypes.update: - return UpdateMessage._syncParse(reader); + return UpdateMessage._parse(reader); case LogicalReplicationMessageTypes.delete: - return DeleteMessage._syncParse(reader); - - case LogicalReplicationMessageTypes.truncate: - return TruncateMessage._parse(reader); - - case LogicalReplicationMessageTypes.unsupported: - // wal2json messages starts with `{` as the first byte - if (firstByte == '{'.codeUnits.single) { - // note this needs the full set of bytes unlike other cases - final bb = BytesBuffer(); - bb.addByte(firstByte); - bb.add(reader.read(length - 1)); - try { - return JsonMessage(reader.encoding.decode(bb.toBytes())); - } catch (_) { - // ignore - } - } - return null; - } -} - -/// Tries to check if the [bytesList] is a [LogicalReplicationMessage]. If so, -/// [LogicalReplicationMessage] is returned, otherwise `null` is returned. -@internal -Future tryAsyncParseLogicalReplicationMessage( - PgByteDataReader reader, int length) async { - // the first byte is the msg type - final firstByte = reader.readUint8(); - final msgType = LogicalReplicationMessageTypes.fromByte(firstByte); - switch (msgType) { - case LogicalReplicationMessageTypes.begin: - return BeginMessage._parse(reader); - - case LogicalReplicationMessageTypes.commit: - return CommitMessage._parse(reader); - - case LogicalReplicationMessageTypes.origin: - return OriginMessage._parse(reader); - - case LogicalReplicationMessageTypes.relation: - return RelationMessage._parse(reader); - - case LogicalReplicationMessageTypes.type: - return TypeMessage._parse(reader); - - case LogicalReplicationMessageTypes.insert: - return await InsertMessage._parse(reader); - - case LogicalReplicationMessageTypes.update: - return await UpdateMessage._parse(reader); - - case LogicalReplicationMessageTypes.delete: - return await DeleteMessage._parse(reader); + return DeleteMessage._parse(reader); case LogicalReplicationMessageTypes.truncate: return TruncateMessage._parse(reader); @@ -437,9 +381,7 @@ class TupleData { /// TupleData does not consume the entire bytes /// /// It'll read until the types are generated. - /// - /// NOTE: do not use, will be removed. - factory TupleData._syncParse(PgByteDataReader reader, int relationId) { + factory TupleData._parse(PgByteDataReader reader, int relationId) { final columnCount = reader.readUint16(); final columns = []; for (var i = 0; i < columnCount; i++) { @@ -495,66 +437,6 @@ class TupleData { return TupleData(columns: columns); } - /// TupleData does not consume the entire bytes - /// - /// It'll read until the types are generated. - static Future _parse( - PgByteDataReader reader, int relationId) async { - final columnCount = reader.readUint16(); - final columns = []; - for (var i = 0; i < columnCount; i++) { - // reading order matters - final typeId = reader.readUint8(); - final tupleDataType = TupleDataType.fromByte(typeId); - late final int length; - late final String data; - final typeOid = reader.codecContext.relationTracker - .getCachedTypeOidForRelationColumn(relationId, i); - Object? value; - switch (tupleDataType) { - case TupleDataType.text: - length = reader.readUint32(); - data = reader.encoding.decode(reader.read(length)); - value = data; - break; - case TupleDataType.binary: - length = reader.readUint32(); - final bytes = reader.read(length); - value = typeOid == null - ? UndecodedBytes( - typeOid: 0, - isBinary: true, - bytes: bytes, - encoding: reader.codecContext.encoding, - ) - : await reader.codecContext.typeRegistry.decode( - EncodedValue.binary( - bytes, - typeOid: typeOid, - ), - reader.codecContext, - ); - data = value.toString(); - break; - case TupleDataType.null_: - case TupleDataType.toast: - length = 0; - data = ''; - break; - } - columns.add( - TupleDataColumn( - typeId: typeId, - length: length, - typeOid: typeOid, - data: data, - value: value, - ), - ); - } - return TupleData(columns: columns); - } - late final int columnCount = columns.length; @override @@ -569,26 +451,13 @@ class InsertMessage implements LogicalReplicationMessage { late final int relationId; late final TupleData tuple; - InsertMessage._(this.relationId, this.tuple); - - /// NOTE: do not use, will be removed. - InsertMessage._syncParse(PgByteDataReader reader) { + InsertMessage._parse(PgByteDataReader reader) { relationId = reader.readUint32(); final tupleType = reader.readUint8(); if (tupleType != 'N'.codeUnitAt(0)) { throw Exception("InsertMessage must have 'N' tuple type"); } - tuple = TupleData._syncParse(reader, relationId); - } - - static Future _parse(PgByteDataReader reader) async { - final relationId = reader.readUint32(); - final tupleType = reader.readUint8(); - if (tupleType != 'N'.codeUnitAt(0)) { - throw Exception("InsertMessage must have 'N' tuple type"); - } - final tuple = await TupleData._parse(reader, relationId); - return InsertMessage._(relationId, tuple); + tuple = TupleData._parse(reader, relationId); } @override @@ -642,11 +511,7 @@ class UpdateMessage implements LogicalReplicationMessage { /// Byte1('N'): Identifies the following TupleData message as a new tuple. late final TupleData? newTuple; - UpdateMessage._( - this.relationId, this.oldTupleType, this.oldTuple, this.newTuple); - - /// NOTE: do not use, will be removed. - UpdateMessage._syncParse(PgByteDataReader reader) { + UpdateMessage._parse(PgByteDataReader reader) { // reading order matters relationId = reader.readUint32(); var tupleType = UpdateMessageTuple.fromByte(reader.readUint8()); @@ -654,7 +519,7 @@ class UpdateMessage implements LogicalReplicationMessage { if (tupleType == UpdateMessageTuple.oldType || tupleType == UpdateMessageTuple.keyType) { oldTupleType = tupleType; - oldTuple = TupleData._syncParse(reader, relationId); + oldTuple = TupleData._parse(reader, relationId); tupleType = UpdateMessageTuple.fromByte(reader.readUint8()); } else { oldTupleType = null; @@ -662,38 +527,12 @@ class UpdateMessage implements LogicalReplicationMessage { } if (tupleType == UpdateMessageTuple.newType) { - newTuple = TupleData._syncParse(reader, relationId); + newTuple = TupleData._parse(reader, relationId); } else { throw Exception('Invalid Tuple Type for UpdateMessage'); } } - static Future _parse(PgByteDataReader reader) async { - // reading order matters - final relationId = reader.readUint32(); - UpdateMessageTuple? oldTupleType; - TupleData? oldTuple; - TupleData? newTuple; - var tupleType = UpdateMessageTuple.fromByte(reader.readUint8()); - - if (tupleType == UpdateMessageTuple.oldType || - tupleType == UpdateMessageTuple.keyType) { - oldTupleType = tupleType; - oldTuple = await TupleData._parse(reader, relationId); - tupleType = UpdateMessageTuple.fromByte(reader.readUint8()); - } else { - oldTupleType = null; - oldTuple = null; - } - - if (tupleType == UpdateMessageTuple.newType) { - newTuple = await TupleData._parse(reader, relationId); - } else { - throw Exception('Invalid Tuple Type for UpdateMessage'); - } - return UpdateMessage._(relationId, oldTupleType, oldTuple, newTuple); - } - @override String toString() { return 'UpdateMessage(relationId: $relationId, oldTupleType: $oldTupleType, oldTuple: $oldTuple, newTuple: $newTuple)'; @@ -744,36 +583,18 @@ class DeleteMessage implements LogicalReplicationMessage { /// Byte1('N'): Identifies the following TupleData message as a new tuple. late final TupleData oldTuple; - DeleteMessage._(this.relationId, this.oldTupleType, this.oldTuple); - - /// NOTE: do not use, will be removed. - DeleteMessage._syncParse(PgByteDataReader reader) { + DeleteMessage._parse(PgByteDataReader reader) { relationId = reader.readUint32(); oldTupleType = DeleteMessageTuple.fromByte(reader.readUint8()); switch (oldTupleType) { case DeleteMessageTuple.keyType: case DeleteMessageTuple.oldType: - oldTuple = TupleData._syncParse(reader, relationId); - break; - case DeleteMessageTuple.unknown: - throw Exception('Unknown tuple type for DeleteMessage'); - } - } - - static Future _parse(PgByteDataReader reader) async { - final relationId = reader.readUint32(); - final oldTupleType = DeleteMessageTuple.fromByte(reader.readUint8()); - TupleData? oldTuple; - switch (oldTupleType) { - case DeleteMessageTuple.keyType: - case DeleteMessageTuple.oldType: - oldTuple = await TupleData._parse(reader, relationId); + oldTuple = TupleData._parse(reader, relationId); break; case DeleteMessageTuple.unknown: throw Exception('Unknown tuple type for DeleteMessage'); } - return DeleteMessage._(relationId, oldTupleType, oldTuple); } @override diff --git a/lib/src/messages/server_messages.dart b/lib/src/messages/server_messages.dart index 07b9029..2f43cf2 100644 --- a/lib/src/messages/server_messages.dart +++ b/lib/src/messages/server_messages.dart @@ -368,8 +368,9 @@ class XLogDataMessage implements ReplicationMessage, ServerMessage { /// will return a [XLogDataLogicalMessage] with that message. Otherwise, it'll /// return [XLogDataMessage] with raw data. /// - @Deprecated('This method will be removed from public API. ' - 'Please file a new issue on GitHub if you are using it.') + @Deprecated( + 'It is likely that this method signature will change or will be removed in ' + 'an upcoming release. Please file a new issue on GitHub if you are using it.') static XLogDataMessage parse( Uint8List bytes, Encoding encoding, { @@ -408,45 +409,6 @@ class XLogDataMessage implements ReplicationMessage, ServerMessage { 'XLogDataMessage(walStart: $walStart, walEnd: $walEnd, time: $time, data: $data)'; } -/// Parses the XLogDataMessage -/// -/// If [XLogDataMessage.data] is a [LogicalReplicationMessage], then the method -/// will return a [XLogDataLogicalMessage] with that message. Otherwise, it'll -/// return [XLogDataMessage] with raw data. -@internal -Future parseXLogDataMessage( - Uint8List bytes, - Encoding encoding, { - CodecContext? codecContext, -}) async { - final reader = PgByteDataReader( - codecContext: - codecContext ?? CodecContext.withDefaults(encoding: encoding)) - ..add(bytes); - final walStart = LSN(reader.readUint64()); - final walEnd = LSN(reader.readUint64()); - final time = dateTimeFromMicrosecondsSinceY2k(reader.readUint64()); - - final message = await tryAsyncParseLogicalReplicationMessage( - reader, reader.remainingLength); - if (message != null) { - return XLogDataLogicalMessage( - message: message, - bytes: bytes, - time: time, - walEnd: walEnd, - walStart: walStart, - ); - } else { - return XLogDataMessage( - bytes: bytes, - time: time, - walEnd: walEnd, - walStart: walStart, - ); - } -} - class UnknownMessage extends ServerMessage { final int code; final Uint8List bytes; diff --git a/lib/src/types/codec.dart b/lib/src/types/codec.dart index 04657fa..6c407ba 100644 --- a/lib/src/types/codec.dart +++ b/lib/src/types/codec.dart @@ -1,4 +1,3 @@ -import 'dart:async'; import 'dart:convert'; import 'dart:typed_data'; @@ -61,7 +60,7 @@ enum EncodingFormat { /// Encodes the [input] value and returns an [EncodedValue] object. /// /// May return `null` if the encoder is not able to convert the [input] value. -typedef EncoderFn = FutureOr Function( +typedef EncoderFn = EncodedValue? Function( TypedValue input, CodecContext context); /// Encoder and decoder for a value stored in Postgresql. @@ -69,13 +68,13 @@ abstract class Codec { /// Encodes the [input] value and returns an [EncodedValue] object. /// /// May return `null` if the codec is not able to encode the [input]. - FutureOr encode(TypedValue input, CodecContext context); + EncodedValue? encode(TypedValue input, CodecContext context); /// Decodes the [input] value and returns a Dart value object. /// /// May return [UndecodedBytes] or the same [input] instance if the codec /// is not able to decode the [input]. - FutureOr decode(EncodedValue input, CodecContext context); + Object? decode(EncodedValue input, CodecContext context); } /// Provides access to connection and database information, and also to additional codecs. diff --git a/lib/src/types/type_registry.dart b/lib/src/types/type_registry.dart index f10fc77..055d550 100644 --- a/lib/src/types/type_registry.dart +++ b/lib/src/types/type_registry.dart @@ -267,12 +267,12 @@ class TypeRegistry { ]); } - Future encode(TypedValue input, CodecContext context) async { + EncodedValue? encode(TypedValue input, CodecContext context) { // check for codec final typeOid = input.type.oid; final codec = typeOid == null ? null : _codecs[typeOid]; if (codec != null) { - final r = await codec.encode(input, context); + final r = codec.encode(input, context); if (r != null) { return r; } @@ -280,7 +280,7 @@ class TypeRegistry { // fallback encoders for (final encoder in _encoders) { - final encoded = await encoder(input, context); + final encoded = encoder(input, context); if (encoded != null) { return encoded; } @@ -288,7 +288,7 @@ class TypeRegistry { throw PgException("Could not infer type of value '${input.value}'."); } - Future decode(EncodedValue value, CodecContext context) async { + Object? decode(EncodedValue value, CodecContext context) { final typeOid = value.typeOid; if (typeOid == null) { throw ArgumentError('`EncodedValue.typeOid` was not provided.'); @@ -297,7 +297,7 @@ class TypeRegistry { // check for codec final codec = _codecs[typeOid]; if (codec != null) { - final r = await codec.decode(value, context); + final r = codec.decode(value, context); if (r != value && r is! UndecodedBytes) { return r; } diff --git a/lib/src/v3/connection.dart b/lib/src/v3/connection.dart index c33bdad..b695193 100644 --- a/lib/src/v3/connection.dart +++ b/lib/src/v3/connection.dart @@ -679,17 +679,16 @@ class _PgResultStreamSubscription _scheduleStatement(() async { connection._pending = this; - final encodedFutures = >[]; + final encodedValues = []; final context = connection.codecContext; for (final e in statement.parameters) { if (e.isSqlNull) { - encodedFutures.add(Future.value(null)); + encodedValues.add(null); continue; } final f = context.typeRegistry.encode(e, context); - encodedFutures.add(f); + encodedValues.add(f); } - final encodedValues = await Future.wait(encodedFutures); connection._channel.sink.add(AggregatedClientMessage([ BindMessage( @@ -806,7 +805,7 @@ class _PgResultStreamSubscription sqlNulls ??= List.filled(columnCount, false); sqlNulls[i] = true; } - final futureValue = context.typeRegistry.decode( + final futureOr = context.typeRegistry.decode( EncodedValue( input, format: EncodingFormat.fromBinaryFlag(field.isBinaryEncoding), @@ -814,7 +813,7 @@ class _PgResultStreamSubscription ), context, ); - futures.add(futureValue); + futures.add(futureOr is Future ? futureOr : Future.value(futureOr)); } final values = await Future.wait(futures); diff --git a/lib/src/v3/protocol.dart b/lib/src/v3/protocol.dart index 7535b68..fb8e577 100644 --- a/lib/src/v3/protocol.dart +++ b/lib/src/v3/protocol.dart @@ -69,8 +69,8 @@ StreamTransformer _readMessages( } } - Future handleChunk(Uint8List bytes) async { - await framer.addBytes(bytes); + void handleChunk(Uint8List bytes) { + framer.addBytes(bytes); emitFinishedMessages(); } diff --git a/test/framer_test.dart b/test/framer_test.dart index c488226..bea6db0 100644 --- a/test/framer_test.dart +++ b/test/framer_test.dart @@ -14,12 +14,12 @@ void main() { framer = MessageFramer(CodecContext.withDefaults()); }); - tearDown(() async { - await flush(framer); + tearDown(() { + flush(framer); }); - test('Perfectly sized message in one buffer', () async { - await framer.addBytes(bufferWithMessages([ + test('Perfectly sized message in one buffer', () { + framer.addBytes(bufferWithMessages([ messageWithBytes([1, 2, 3], 1) ])); @@ -29,8 +29,8 @@ void main() { ]); }); - test('Two perfectly sized messages in one buffer', () async { - await framer.addBytes(bufferWithMessages([ + test('Two perfectly sized messages in one buffer', () { + framer.addBytes(bufferWithMessages([ messageWithBytes([1, 2, 3], 1), messageWithBytes([1, 2, 3, 4], 2) ])); @@ -42,13 +42,13 @@ void main() { ]); }); - test('Header fragment', () async { + test('Header fragment', () { final message = messageWithBytes([1, 2, 3], 1); final fragments = fragmentedMessageBuffer(message, 2); - await framer.addBytes(fragments.first); + framer.addBytes(fragments.first); expect(framer.messageQueue, isEmpty); - await framer.addBytes(fragments.last); + framer.addBytes(fragments.last); final messages = framer.messageQueue.toList(); expect(messages, [ @@ -56,18 +56,18 @@ void main() { ]); }); - test('Two header fragments', () async { + test('Two header fragments', () { final message = messageWithBytes([1, 2, 3], 1); final fragments = fragmentedMessageBuffer(message, 2); final moreFragments = fragmentedMessageBuffer(fragments.first, 1); - await framer.addBytes(moreFragments.first); + framer.addBytes(moreFragments.first); expect(framer.messageQueue, isEmpty); - await framer.addBytes(moreFragments.last); + framer.addBytes(moreFragments.last); expect(framer.messageQueue, isEmpty); - await framer.addBytes(fragments.last); + framer.addBytes(fragments.last); final messages = framer.messageQueue.toList(); expect(messages, [ @@ -75,17 +75,16 @@ void main() { ]); }); - test('One message + header fragment', () async { + test('One message + header fragment', () { final message1 = messageWithBytes([1, 2, 3], 1); final message2 = messageWithBytes([2, 2, 3], 2); final message2Fragments = fragmentedMessageBuffer(message2, 3); - await framer - .addBytes(bufferWithMessages([message1, message2Fragments.first])); + framer.addBytes(bufferWithMessages([message1, message2Fragments.first])); expect(framer.messageQueue.length, 1); - await framer.addBytes(message2Fragments.last); + framer.addBytes(message2Fragments.last); final messages = framer.messageQueue.toList(); expect(messages, [ @@ -94,17 +93,16 @@ void main() { ]); }); - test('Message + header, missing rest of buffer', () async { + test('Message + header, missing rest of buffer', () { final message1 = messageWithBytes([1, 2, 3], 1); final message2 = messageWithBytes([2, 2, 3], 2); final message2Fragments = fragmentedMessageBuffer(message2, 5); - await framer - .addBytes(bufferWithMessages([message1, message2Fragments.first])); + framer.addBytes(bufferWithMessages([message1, message2Fragments.first])); expect(framer.messageQueue.length, 1); - await framer.addBytes(message2Fragments.last); + framer.addBytes(message2Fragments.last); final messages = framer.messageQueue.toList(); expect(messages, [ @@ -113,13 +111,13 @@ void main() { ]); }); - test('Message body spans two packets', () async { + test('Message body spans two packets', () { final message = messageWithBytes([1, 2, 3, 4, 5, 6, 7], 1); final fragments = fragmentedMessageBuffer(message, 8); - await framer.addBytes(fragments.first); + framer.addBytes(fragments.first); expect(framer.messageQueue, isEmpty); - await framer.addBytes(fragments.last); + framer.addBytes(fragments.last); final messages = framer.messageQueue.toList(); expect(messages, [ @@ -129,15 +127,15 @@ void main() { test( 'Message spans two packets, started in a packet that contained another message', - () async { + () { final earlierMessage = messageWithBytes([1, 2], 0); final message = messageWithBytes([1, 2, 3, 4, 5, 6, 7], 1); - await framer.addBytes(bufferWithMessages( + framer.addBytes(bufferWithMessages( [earlierMessage, fragmentedMessageBuffer(message, 8).first])); expect(framer.messageQueue, hasLength(1)); - await framer.addBytes(fragmentedMessageBuffer(message, 8).last); + framer.addBytes(fragmentedMessageBuffer(message, 8).last); final messages = framer.messageQueue.toList(); expect(messages, [ @@ -146,22 +144,21 @@ void main() { ]); }); - test('Message spans three packets, only part of header in the first', - () async { + test('Message spans three packets, only part of header in the first', () { final earlierMessage = messageWithBytes([1, 2], 0); final message = messageWithBytes([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13], 1); - await framer.addBytes(bufferWithMessages( + framer.addBytes(bufferWithMessages( [earlierMessage, fragmentedMessageBuffer(message, 3).first])); expect(framer.messageQueue, hasLength(1)); - await framer.addBytes( + framer.addBytes( fragmentedMessageBuffer(fragmentedMessageBuffer(message, 3).last, 6) .first); expect(framer.messageQueue, hasLength(1)); - await framer.addBytes( + framer.addBytes( fragmentedMessageBuffer(fragmentedMessageBuffer(message, 3).last, 6) .last); @@ -173,43 +170,41 @@ void main() { ]); }); - test('Frame with no data', () async { - await framer.addBytes(bufferWithMessages([messageWithBytes([], 10)])); + test('Frame with no data', () { + framer.addBytes(bufferWithMessages([messageWithBytes([], 10)])); final messages = framer.messageQueue.toList(); expect(messages, [UnknownMessage(10, Uint8List(0))]); }); - test('Identify CopyDoneMessage with length equals size length (min)', - () async { + test('Identify CopyDoneMessage with length equals size length (min)', () { // min length final length = [0, 0, 0, 4]; // min length (4 bytes) as 32-bit final bytes = Uint8List.fromList([ SharedMessageId.copyDone, ...length, ]); - await framer.addBytes(bytes); + framer.addBytes(bytes); final message = framer.messageQueue.toList().first; expect(message, isA()); expect((message as CopyDoneMessage).length, 4); }); - test('Identify CopyDoneMessage when length larger than size length', - () async { + test('Identify CopyDoneMessage when length larger than size length', () { final length = (ByteData(4)..setUint32(0, 42)).buffer.asUint8List(); final bytes = Uint8List.fromList([ SharedMessageId.copyDone, ...length, ]); - await framer.addBytes(bytes); + framer.addBytes(bytes); final message = framer.messageQueue.toList().first; expect(message, isA()); expect((message as CopyDoneMessage).length, 42); }); - test('Adds XLogDataMessage to queue', () async { + test('Adds XLogDataMessage to queue', () { final bits64 = (ByteData(8)..setUint64(0, 42)).buffer.asUint8List(); // random data bytes final dataBytes = [1, 2, 3, 4, 5, 6, 7, 8]; @@ -232,13 +227,13 @@ void main() { ...xlogDataMessage, ]; - await framer.addBytes(Uint8List.fromList(copyDataBytes)); + framer.addBytes(Uint8List.fromList(copyDataBytes)); final message = framer.messageQueue.toList().first; expect(message, isA()); expect(message, isNot(isA())); }); - test('Adds XLogDataLogicalMessage with JsonMessage to queue', () async { + test('Adds XLogDataLogicalMessage with JsonMessage to queue', () { final bits64 = (ByteData(8)..setUint64(0, 42)).buffer.asUint8List(); /// represent an empty json object so we should get a XLogDataLogicalMessage @@ -264,13 +259,13 @@ void main() { ...xlogDataMessage, ]; - await framer.addBytes(Uint8List.fromList(copyDataMessage)); + framer.addBytes(Uint8List.fromList(copyDataMessage)); final message = framer.messageQueue.toList().first; expect(message, isA()); expect((message as XLogDataLogicalMessage).message, isA()); }); - test('Adds PrimaryKeepAliveMessage to queue', () async { + test('Adds PrimaryKeepAliveMessage to queue', () { final bits64 = (ByteData(8)..setUint64(0, 42)).buffer.asUint8List(); /// This represent a raw [PrimaryKeepAliveMessage] @@ -290,12 +285,12 @@ void main() { ...xlogDataMessage, ]; - await framer.addBytes(Uint8List.fromList(copyDataMessage)); + framer.addBytes(Uint8List.fromList(copyDataMessage)); final message = framer.messageQueue.toList().first; expect(message, isA()); }); - test('Adds raw CopyDataMessage for unknown stream message', () async { + test('Adds raw CopyDataMessage for unknown stream message', () { final xlogDataBytes = [ -1, // unknown id ]; @@ -310,7 +305,7 @@ void main() { ...xlogDataBytes, ]; - await framer.addBytes(Uint8List.fromList(copyDataMessage)); + framer.addBytes(Uint8List.fromList(copyDataMessage)); final message = framer.messageQueue.toList().first; expect(message, isA()); }); @@ -336,9 +331,9 @@ Uint8List bufferWithMessages(List> messages) { return Uint8List.fromList(messages.expand((l) => l).toList()); } -Future flush(MessageFramer framer) async { +void flush(MessageFramer framer) { framer.messageQueue.clear(); - await framer.addBytes(bufferWithMessages([ + framer.addBytes(bufferWithMessages([ messageWithBytes([1, 2, 3], 1) ]));