Skip to content

Commit

Permalink
Implementing Pool.close(force: true). (#405)
Browse files Browse the repository at this point in the history
  • Loading branch information
isoos authored Dec 22, 2024
1 parent 527f53c commit 5a24e72
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- Fix: do not close connection after an empty statement.
- Fix: `close(force: true)` does not cause uncaught exception.
- Implemented `Pool.close(force: true)` (using [davidmartos96](https://github.com/davidmartos96)'s [#397](https://github.com/isoos/postgresql-dart/pull/397) as baseline).

## 3.4.5

Expand Down
19 changes: 11 additions & 8 deletions lib/src/pool/pool_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -30,28 +30,31 @@ class PoolImplementation<L> implements Pool<L> {
1,
timeout: _settings.connectTimeout,
);
bool _closing = false;

PoolImplementation(this._selector, PoolSettings? settings)
: _settings = ResolvedPoolSettings(settings);

@override
bool get isOpen => !_semaphore.isClosed;
bool get isOpen => !_closing;

@override
Future<void> get closed => _semaphore.done;

@override
Future<void> close({bool force = false}) async {
// TODO: Implement force close.
await _semaphore.close();
_closing = true;
final semaphoreFuture = _semaphore.close();

// Connections are closed when they are returned to the pool if it's closed.
// We still need to close statements that are currently unused.
for (final connection in [..._connections]) {
if (!connection._isInUse) {
await connection._dispose();
if (force || !connection._isInUse) {
await connection._dispose(force: force);
}
}

await semaphoreFuture;
}

@override
Expand Down Expand Up @@ -164,7 +167,7 @@ class PoolImplementation<L> implements Pool<L> {
// well.
if (connection != null) {
connection._elapsedInUse += sw.elapsed;
if (_semaphore.isClosed || !reuse || !connection.isOpen) {
if (_closing || !reuse || !connection.isOpen) {
await connection._dispose();
} else {
// Allow the connection to be re-used later.
Expand Down Expand Up @@ -255,9 +258,9 @@ class _PoolConnection implements Connection {
return false;
}

Future<void> _dispose() async {
Future<void> _dispose({bool force = false}) async {
_pool._connections.remove(this);
await _connection.close();
await _connection.close(force: force);
}

@override
Expand Down
35 changes: 18 additions & 17 deletions test/pool_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ void main() {
});
});

group(skip: 'not implemented', 'force close', () {
group('force close', () {
Future<Pool> openPool(PostgresServer server) async {
final pool = Pool.withEndpoints(
[await server.endpoint()],
Expand All @@ -235,35 +235,36 @@ void main() {
}

withPostgresServer('pool session', (server) {
test('', () async {
test('pool session', () async {
final pool = await openPool(server);
// ignore: unawaited_futures
runLongQuery(pool);
final started = Completer();
final rs = pool.run((s) async {
started.complete();
await runLongQuery(s);
});
// let it start
await started.future;
await Future.delayed(const Duration(milliseconds: 100));
await expectPoolClosesForcefully(pool);

await expectLater(() => rs, throwsA(isA<PgException>()));
});
});

withPostgresServer('tx session', (server) {
test('', () async {
test('tx', () async {
final pool = await openPool(server);
// Ignore async error, it will fail when the connection is closed and it tries to do COMMIT
pool.runTx(runLongQuery).ignore();
final started = Completer();
final rs = pool.runTx((s) async {
started.complete();
await runLongQuery(s);
});
// let it start
await started.future;
await Future.delayed(const Duration(milliseconds: 100));
await expectPoolClosesForcefully(pool);
});
});

withPostgresServer('run session', (server) {
test('', () async {
final pool = await openPool(server);
// ignore: unawaited_futures
pool.run(runLongQuery);
// let it start
await Future.delayed(const Duration(milliseconds: 100));
await expectPoolClosesForcefully(pool);
await expectLater(() => rs, throwsA(isA<PgException>()));
});
});
});
Expand Down

0 comments on commit 5a24e72

Please sign in to comment.