From 5a24e72d6e357156bf2e6b1568fab713bdfeb5cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Istv=C3=A1n=20So=C3=B3s?= Date: Sun, 22 Dec 2024 20:09:05 +0100 Subject: [PATCH] Implementing Pool.close(force: true). (#405) --- CHANGELOG.md | 1 + lib/src/pool/pool_impl.dart | 19 +++++++++++-------- test/pool_test.dart | 35 ++++++++++++++++++----------------- 3 files changed, 30 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6386403..86419e7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ - Fix: do not close connection after an empty statement. - Fix: `close(force: true)` does not cause uncaught exception. +- Implemented `Pool.close(force: true)` (using [davidmartos96](https://github.com/davidmartos96)'s [#397](https://github.com/isoos/postgresql-dart/pull/397) as baseline). ## 3.4.5 diff --git a/lib/src/pool/pool_impl.dart b/lib/src/pool/pool_impl.dart index 3fbd5b1..6ca7e77 100644 --- a/lib/src/pool/pool_impl.dart +++ b/lib/src/pool/pool_impl.dart @@ -30,28 +30,31 @@ class PoolImplementation implements Pool { 1, timeout: _settings.connectTimeout, ); + bool _closing = false; PoolImplementation(this._selector, PoolSettings? settings) : _settings = ResolvedPoolSettings(settings); @override - bool get isOpen => !_semaphore.isClosed; + bool get isOpen => !_closing; @override Future get closed => _semaphore.done; @override Future close({bool force = false}) async { - // TODO: Implement force close. - await _semaphore.close(); + _closing = true; + final semaphoreFuture = _semaphore.close(); // Connections are closed when they are returned to the pool if it's closed. // We still need to close statements that are currently unused. for (final connection in [..._connections]) { - if (!connection._isInUse) { - await connection._dispose(); + if (force || !connection._isInUse) { + await connection._dispose(force: force); } } + + await semaphoreFuture; } @override @@ -164,7 +167,7 @@ class PoolImplementation implements Pool { // well. if (connection != null) { connection._elapsedInUse += sw.elapsed; - if (_semaphore.isClosed || !reuse || !connection.isOpen) { + if (_closing || !reuse || !connection.isOpen) { await connection._dispose(); } else { // Allow the connection to be re-used later. @@ -255,9 +258,9 @@ class _PoolConnection implements Connection { return false; } - Future _dispose() async { + Future _dispose({bool force = false}) async { _pool._connections.remove(this); - await _connection.close(); + await _connection.close(force: force); } @override diff --git a/test/pool_test.dart b/test/pool_test.dart index a407f30..372ccb8 100644 --- a/test/pool_test.dart +++ b/test/pool_test.dart @@ -211,7 +211,7 @@ void main() { }); }); - group(skip: 'not implemented', 'force close', () { + group('force close', () { Future openPool(PostgresServer server) async { final pool = Pool.withEndpoints( [await server.endpoint()], @@ -235,35 +235,36 @@ void main() { } withPostgresServer('pool session', (server) { - test('', () async { + test('pool session', () async { final pool = await openPool(server); - // ignore: unawaited_futures - runLongQuery(pool); + final started = Completer(); + final rs = pool.run((s) async { + started.complete(); + await runLongQuery(s); + }); // let it start + await started.future; await Future.delayed(const Duration(milliseconds: 100)); await expectPoolClosesForcefully(pool); + + await expectLater(() => rs, throwsA(isA())); }); }); withPostgresServer('tx session', (server) { - test('', () async { + test('tx', () async { final pool = await openPool(server); - // Ignore async error, it will fail when the connection is closed and it tries to do COMMIT - pool.runTx(runLongQuery).ignore(); + final started = Completer(); + final rs = pool.runTx((s) async { + started.complete(); + await runLongQuery(s); + }); // let it start + await started.future; await Future.delayed(const Duration(milliseconds: 100)); await expectPoolClosesForcefully(pool); - }); - }); - withPostgresServer('run session', (server) { - test('', () async { - final pool = await openPool(server); - // ignore: unawaited_futures - pool.run(runLongQuery); - // let it start - await Future.delayed(const Duration(milliseconds: 100)); - await expectPoolClosesForcefully(pool); + await expectLater(() => rs, throwsA(isA())); }); }); });