Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Asynchronous Codec methods + updated (deprecated) message processing." #374

Merged
merged 1 commit into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
- `Connection.info` (through `ConnectionInfo` class) exposes read-only connection-level information,
e.g. acessing access server-provided parameter status values.
- Support for binary `pgoutput` replication by [wolframm](https://github.com/Wolframm-Activities-OU).
- Deprecated `TupleDataColumn.data`, use `.value` instead (for binary protocol messages).
- **Allowing custom type codecs**:
- `Codec` interface is used for encoding/decoding value by type OIDs or Dart values.
- `Codec.encode` and `Codec.decode` gets a reference to `CodecContext` which provides
Expand All @@ -13,8 +14,6 @@
(for values where type is not specified).
- `RelationTracker` tracks information about relations (currently limited to `RelationMessage` caching).
- **Behaviour / soft-breaking changes**:
- Deprecated `TupleDataColumn.data`, use `.value` instead (for binary protocol messages).
- Deprecated some logical replication message parsing method.
- Removed `@internal`-annotated methods from the public API of `ServerException` and `Severity`.
- `ServerException` may be transformed into `_PgTimeoutException` which is both `PgException` and `TimeoutException` (but no longer `ServerException`).
- The `timeout` parameters and the `SessionSettings.queryTimeout` has only a somewhat
Expand Down
5 changes: 2 additions & 3 deletions lib/messages.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ library messages;

export 'src/buffer.dart' show PgByteDataWriter;
export 'src/messages/client_messages.dart';
export 'src/messages/logical_replication_messages.dart'
hide tryAsyncParseLogicalReplicationMessage;
export 'src/messages/server_messages.dart' hide parseXLogDataMessage;
export 'src/messages/logical_replication_messages.dart';
export 'src/messages/server_messages.dart';
export 'src/messages/shared_messages.dart';
10 changes: 4 additions & 6 deletions lib/src/message_window.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import 'dart:async';
import 'dart:collection';
import 'dart:typed_data';

Expand All @@ -12,7 +11,7 @@ import 'messages/shared_messages.dart';

const int _headerByteSize = 5;

typedef _ServerMessageFn = FutureOr<ServerMessage> Function(
typedef _ServerMessageFn = ServerMessage Function(
PgByteDataReader reader, int length);

Map<int, _ServerMessageFn> _messageTypeMap = {
Expand Down Expand Up @@ -51,7 +50,7 @@ class MessageFramer {
bool get _isComplete =>
_expectedLength == 0 || _expectedLength <= _reader.remainingLength;

Future<void> addBytes(Uint8List bytes) async {
void addBytes(Uint8List bytes) {
_reader.add(bytes);

while (true) {
Expand All @@ -77,7 +76,7 @@ class MessageFramer {
}

final targetRemainingLength = _reader.remainingLength - _expectedLength;
final msg = await msgMaker(_reader, _expectedLength);
final msg = msgMaker(_reader, _expectedLength);
if (_reader.remainingLength > targetRemainingLength) {
throw StateError(
'Message parser consumed more bytes than expected. type=$_type expectedLength=$_expectedLength');
Expand Down Expand Up @@ -112,8 +111,7 @@ class MessageFramer {
/// such as replication messages.
/// Returns a [ReplicationMessage] if the message contains such message.
/// Otherwise, it'll just return the provided bytes as [CopyDataMessage].
Future<ServerMessage> _parseCopyDataMessage(
PgByteDataReader reader, int length) async {
ServerMessage _parseCopyDataMessage(PgByteDataReader reader, int length) {
final code = reader.readUint8();
if (code == ReplicationMessageId.primaryKeepAlive) {
return PrimaryKeepAliveMessage.parse(reader);
Expand Down
201 changes: 11 additions & 190 deletions lib/src/messages/logical_replication_messages.dart
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import 'dart:typed_data';

import 'package:buffer/buffer.dart';
import 'package:meta/meta.dart';
import 'package:postgres/src/types/codec.dart';

import '../buffer.dart';
Expand Down Expand Up @@ -48,8 +47,6 @@ class XLogDataLogicalMessage implements XLogDataMessage {

/// Tries to check if the [bytesList] is a [LogicalReplicationMessage]. If so,
/// [LogicalReplicationMessage] is returned, otherwise `null` is returned.
@Deprecated('This method will be removed from public API. '
'Please file a new issue on GitHub if you are using it.')
LogicalReplicationMessage? tryParseLogicalReplicationMessage(
PgByteDataReader reader, int length) {
// the first byte is the msg type
Expand All @@ -72,66 +69,13 @@ LogicalReplicationMessage? tryParseLogicalReplicationMessage(
return TypeMessage._parse(reader);

case LogicalReplicationMessageTypes.insert:
return InsertMessage._syncParse(reader);
return InsertMessage._parse(reader);

case LogicalReplicationMessageTypes.update:
return UpdateMessage._syncParse(reader);
return UpdateMessage._parse(reader);

case LogicalReplicationMessageTypes.delete:
return DeleteMessage._syncParse(reader);

case LogicalReplicationMessageTypes.truncate:
return TruncateMessage._parse(reader);

case LogicalReplicationMessageTypes.unsupported:
// wal2json messages starts with `{` as the first byte
if (firstByte == '{'.codeUnits.single) {
// note this needs the full set of bytes unlike other cases
final bb = BytesBuffer();
bb.addByte(firstByte);
bb.add(reader.read(length - 1));
try {
return JsonMessage(reader.encoding.decode(bb.toBytes()));
} catch (_) {
// ignore
}
}
return null;
}
}

/// Tries to check if the [bytesList] is a [LogicalReplicationMessage]. If so,
/// [LogicalReplicationMessage] is returned, otherwise `null` is returned.
@internal
Future<LogicalReplicationMessage?> tryAsyncParseLogicalReplicationMessage(
PgByteDataReader reader, int length) async {
// the first byte is the msg type
final firstByte = reader.readUint8();
final msgType = LogicalReplicationMessageTypes.fromByte(firstByte);
switch (msgType) {
case LogicalReplicationMessageTypes.begin:
return BeginMessage._parse(reader);

case LogicalReplicationMessageTypes.commit:
return CommitMessage._parse(reader);

case LogicalReplicationMessageTypes.origin:
return OriginMessage._parse(reader);

case LogicalReplicationMessageTypes.relation:
return RelationMessage._parse(reader);

case LogicalReplicationMessageTypes.type:
return TypeMessage._parse(reader);

case LogicalReplicationMessageTypes.insert:
return await InsertMessage._parse(reader);

case LogicalReplicationMessageTypes.update:
return await UpdateMessage._parse(reader);

case LogicalReplicationMessageTypes.delete:
return await DeleteMessage._parse(reader);
return DeleteMessage._parse(reader);

case LogicalReplicationMessageTypes.truncate:
return TruncateMessage._parse(reader);
Expand Down Expand Up @@ -437,9 +381,7 @@ class TupleData {
/// TupleData does not consume the entire bytes
///
/// It'll read until the types are generated.
///
/// NOTE: do not use, will be removed.
factory TupleData._syncParse(PgByteDataReader reader, int relationId) {
factory TupleData._parse(PgByteDataReader reader, int relationId) {
final columnCount = reader.readUint16();
final columns = <TupleDataColumn>[];
for (var i = 0; i < columnCount; i++) {
Expand Down Expand Up @@ -495,66 +437,6 @@ class TupleData {
return TupleData(columns: columns);
}

/// TupleData does not consume the entire bytes
///
/// It'll read until the types are generated.
static Future<TupleData> _parse(
PgByteDataReader reader, int relationId) async {
final columnCount = reader.readUint16();
final columns = <TupleDataColumn>[];
for (var i = 0; i < columnCount; i++) {
// reading order matters
final typeId = reader.readUint8();
final tupleDataType = TupleDataType.fromByte(typeId);
late final int length;
late final String data;
final typeOid = reader.codecContext.relationTracker
.getCachedTypeOidForRelationColumn(relationId, i);
Object? value;
switch (tupleDataType) {
case TupleDataType.text:
length = reader.readUint32();
data = reader.encoding.decode(reader.read(length));
value = data;
break;
case TupleDataType.binary:
length = reader.readUint32();
final bytes = reader.read(length);
value = typeOid == null
? UndecodedBytes(
typeOid: 0,
isBinary: true,
bytes: bytes,
encoding: reader.codecContext.encoding,
)
: await reader.codecContext.typeRegistry.decode(
EncodedValue.binary(
bytes,
typeOid: typeOid,
),
reader.codecContext,
);
data = value.toString();
break;
case TupleDataType.null_:
case TupleDataType.toast:
length = 0;
data = '';
break;
}
columns.add(
TupleDataColumn(
typeId: typeId,
length: length,
typeOid: typeOid,
data: data,
value: value,
),
);
}
return TupleData(columns: columns);
}

late final int columnCount = columns.length;

@override
Expand All @@ -569,26 +451,13 @@ class InsertMessage implements LogicalReplicationMessage {
late final int relationId;
late final TupleData tuple;

InsertMessage._(this.relationId, this.tuple);

/// NOTE: do not use, will be removed.
InsertMessage._syncParse(PgByteDataReader reader) {
InsertMessage._parse(PgByteDataReader reader) {
relationId = reader.readUint32();
final tupleType = reader.readUint8();
if (tupleType != 'N'.codeUnitAt(0)) {
throw Exception("InsertMessage must have 'N' tuple type");
}
tuple = TupleData._syncParse(reader, relationId);
}

static Future<InsertMessage> _parse(PgByteDataReader reader) async {
final relationId = reader.readUint32();
final tupleType = reader.readUint8();
if (tupleType != 'N'.codeUnitAt(0)) {
throw Exception("InsertMessage must have 'N' tuple type");
}
final tuple = await TupleData._parse(reader, relationId);
return InsertMessage._(relationId, tuple);
tuple = TupleData._parse(reader, relationId);
}

@override
Expand Down Expand Up @@ -642,58 +511,28 @@ class UpdateMessage implements LogicalReplicationMessage {
/// Byte1('N'): Identifies the following TupleData message as a new tuple.
late final TupleData? newTuple;

UpdateMessage._(
this.relationId, this.oldTupleType, this.oldTuple, this.newTuple);

/// NOTE: do not use, will be removed.
UpdateMessage._syncParse(PgByteDataReader reader) {
UpdateMessage._parse(PgByteDataReader reader) {
// reading order matters
relationId = reader.readUint32();
var tupleType = UpdateMessageTuple.fromByte(reader.readUint8());

if (tupleType == UpdateMessageTuple.oldType ||
tupleType == UpdateMessageTuple.keyType) {
oldTupleType = tupleType;
oldTuple = TupleData._syncParse(reader, relationId);
oldTuple = TupleData._parse(reader, relationId);
tupleType = UpdateMessageTuple.fromByte(reader.readUint8());
} else {
oldTupleType = null;
oldTuple = null;
}

if (tupleType == UpdateMessageTuple.newType) {
newTuple = TupleData._syncParse(reader, relationId);
newTuple = TupleData._parse(reader, relationId);
} else {
throw Exception('Invalid Tuple Type for UpdateMessage');
}
}

static Future<UpdateMessage> _parse(PgByteDataReader reader) async {
// reading order matters
final relationId = reader.readUint32();
UpdateMessageTuple? oldTupleType;
TupleData? oldTuple;
TupleData? newTuple;
var tupleType = UpdateMessageTuple.fromByte(reader.readUint8());

if (tupleType == UpdateMessageTuple.oldType ||
tupleType == UpdateMessageTuple.keyType) {
oldTupleType = tupleType;
oldTuple = await TupleData._parse(reader, relationId);
tupleType = UpdateMessageTuple.fromByte(reader.readUint8());
} else {
oldTupleType = null;
oldTuple = null;
}

if (tupleType == UpdateMessageTuple.newType) {
newTuple = await TupleData._parse(reader, relationId);
} else {
throw Exception('Invalid Tuple Type for UpdateMessage');
}
return UpdateMessage._(relationId, oldTupleType, oldTuple, newTuple);
}

@override
String toString() {
return 'UpdateMessage(relationId: $relationId, oldTupleType: $oldTupleType, oldTuple: $oldTuple, newTuple: $newTuple)';
Expand Down Expand Up @@ -744,36 +583,18 @@ class DeleteMessage implements LogicalReplicationMessage {
/// Byte1('N'): Identifies the following TupleData message as a new tuple.
late final TupleData oldTuple;

DeleteMessage._(this.relationId, this.oldTupleType, this.oldTuple);

/// NOTE: do not use, will be removed.
DeleteMessage._syncParse(PgByteDataReader reader) {
DeleteMessage._parse(PgByteDataReader reader) {
relationId = reader.readUint32();
oldTupleType = DeleteMessageTuple.fromByte(reader.readUint8());

switch (oldTupleType) {
case DeleteMessageTuple.keyType:
case DeleteMessageTuple.oldType:
oldTuple = TupleData._syncParse(reader, relationId);
break;
case DeleteMessageTuple.unknown:
throw Exception('Unknown tuple type for DeleteMessage');
}
}

static Future<DeleteMessage> _parse(PgByteDataReader reader) async {
final relationId = reader.readUint32();
final oldTupleType = DeleteMessageTuple.fromByte(reader.readUint8());
TupleData? oldTuple;
switch (oldTupleType) {
case DeleteMessageTuple.keyType:
case DeleteMessageTuple.oldType:
oldTuple = await TupleData._parse(reader, relationId);
oldTuple = TupleData._parse(reader, relationId);
break;
case DeleteMessageTuple.unknown:
throw Exception('Unknown tuple type for DeleteMessage');
}
return DeleteMessage._(relationId, oldTupleType, oldTuple);
}

@override
Expand Down
Loading
Loading