Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timeout triggers current statement cancellation (using a new connection. #380

Merged
merged 1 commit into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
- `DatabaseInfo` tracks information about relations and oids (currently limited to `RelationMessage` caching).
- **Behaviour / soft-breaking changes**:
- Preparing/executing a stamement on the main connection while in a `runTx` callback will throw an exception.
- Setting `timeout` will try to actively cancel the current statement using a new connection.
- Deprecated `TupleDataColumn.data`, use `.value` instead (for binary protocol messages).
- Deprecated some logical replication message parsing method.
- Removed `@internal`-annotated methods from the public API of `ServerException` and `Severity`.
Expand Down
10 changes: 9 additions & 1 deletion lib/src/exceptions.dart
Original file line number Diff line number Diff line change
Expand Up @@ -190,13 +190,21 @@ ServerException buildExceptionFromErrorFields(List<ErrorField> errorFields) {
);
}

PgException transformServerException(ServerException ex) {
PgException transformServerException(
ServerException ex, {
bool timeoutTriggered = false,
}) {
if (ex.code == '57014' &&
ex.message == 'canceling statement due to statement timeout') {
return _PgTimeoutException(
['${ex.code}:', ex.message, ex.trace].whereType<String>().join(' '),
);
}
if (ex.code == '57014' && timeoutTriggered) {
return _PgTimeoutException(
['${ex.code}:', ex.message, ex.trace].whereType<String>().join(' '),
);
}
return ex;
}

Expand Down
48 changes: 36 additions & 12 deletions lib/src/v3/connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,9 @@ abstract class _PgSessionBase implements Session {
ignoreRows,
);
try {
await querySubscription.asFuture().optionalTimeout(timeout);
return Result(
rows: items,
affectedRows: await querySubscription.affectedRows,
schema: await querySubscription.schema,
return await querySubscription._waitForResult(
items: items,
timeout: timeout,
);
} finally {
await querySubscription.cancel();
Expand Down Expand Up @@ -260,6 +258,7 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection {
),
async.StreamSinkTransformer.fromHandlers(handleData: (msg, sink) {
print('[$hash][out] $msg');
print('[out] $msg');
sink.add(msg);
}),
));
Expand Down Expand Up @@ -655,16 +654,13 @@ class _PreparedStatement extends Statement {
final items = <ResultRow>[];
final subscription = bind(parameters).listen(items.add);
try {
await subscription.asFuture().optionalTimeout(timeout);
return await (subscription as _PgResultStreamSubscription)._waitForResult(
items: items,
timeout: timeout,
);
} finally {
await subscription.cancel();
}

return Result(
rows: items,
affectedRows: await subscription.affectedRows,
schema: await subscription.schema,
);
}

@override
Expand Down Expand Up @@ -892,6 +888,34 @@ class _PgResultStreamSubscription
}
}

Future<Result> _waitForResult({
required List<ResultRow> items,
required Duration? timeout,
}) async {
bool timeoutTriggered = false;
final cancelTimer = timeout == null
? null
: Timer(timeout, () async {
timeoutTriggered = true;
await connection.cancelPendingStatement();
});
try {
await asFuture();
return Result(
rows: items,
affectedRows: await affectedRows,
schema: await schema,
);
} on ServerException catch (e) {
if (timeoutTriggered) {
throw transformServerException(e, timeoutTriggered: timeoutTriggered);
}
rethrow;
} finally {
cancelTimer?.cancel();
}
}

// Forwarding subscription interface to regular stream subscription from
// controller

Expand Down
44 changes: 29 additions & 15 deletions test/timeout_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,33 @@ void main() {

// Note: to fix this, we may consider cancelling the currently running statements:
// https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-CANCELING-REQUESTS
// withPostgresServer('timeout race conditions', (server) {
// test('two transactions for update', () async {
// final c1 = await server.newConnection();
// final c2 = await server.newConnection();
// await c1.execute('CREATE TABLE t (id INT PRIMARY KEY);');
// await c1.execute('INSERT INTO t (id) values (1);');
// await c1.execute('BEGIN');
// await c1.execute('SELECT * FROM t WHERE id=1 FOR UPDATE');
// await c2.execute('BEGIN');
// await c2.execute('SELECT * FROM t WHERE id=1 FOR UPDATE',
// timeout: Duration(seconds: 1));
// await c1.execute('ROLLBACK');
// await c2.execute('ROLLBACK');
// });
// });
withPostgresServer('timeout race conditions', (server) {
setUp(() async {
final c1 = await server.newConnection();
await c1.execute('CREATE TABLE t (id INT PRIMARY KEY);');
await c1.execute('INSERT INTO t (id) values (1);');
});

test('two transactions for update', () async {
for (final qm in QueryMode.values) {
final c1 = await server.newConnection();
final c2 = await server.newConnection(queryMode: qm);
await c1.execute('BEGIN');
await c1.execute('SELECT * FROM t WHERE id=1 FOR UPDATE');
await c2.execute('BEGIN');
try {
await c2.execute('SELECT * FROM t WHERE id=1 FOR UPDATE',
timeout: Duration(seconds: 1));
fail('unreachable');
} on TimeoutException catch (_) {
// ignore
}
await c1.execute('ROLLBACK');
await c2.execute('ROLLBACK');

await c1.execute('SELECT 1');
await c2.execute('SELECT 1');
}
});
});
}
Loading