From e1781633a8a843b8901ab8b71cdfdf80fad690af Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Mon, 13 Nov 2023 11:13:29 +0100 Subject: [PATCH] Add test to lease multiple connections at once (#440) - Add test to lease multiple connections at once - Rename `Waiter` to `Future` - Rename `Waiter.Result` to `Future.Success` --- .../ConnectionPoolTests.swift | 86 ++++++++++++++++++- .../Mocks/MockConnectionFactory.swift | 2 +- .../Utils/{Waiter.swift => Future.swift} | 25 +++--- 3 files changed, 99 insertions(+), 14 deletions(-) rename Tests/ConnectionPoolModuleTests/Utils/{Waiter.swift => Future.swift} (77%) diff --git a/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift b/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift index 4d4cac95..a4c2cde7 100644 --- a/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift +++ b/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift @@ -401,7 +401,7 @@ final class ConnectionPoolTests: XCTestCase { _ = try await pool.leaseConnection() } - let connectionAttemptWaiter = Waiter(of: Void.self) + let connectionAttemptWaiter = Future(of: Void.self) taskGroup.addTask { try await factory.nextConnectAttempt { connectionID in @@ -410,7 +410,7 @@ final class ConnectionPoolTests: XCTestCase { } } - try await connectionAttemptWaiter.result + try await connectionAttemptWaiter.success leaseTask.cancel() let taskResult = await leaseTask.result @@ -427,5 +427,87 @@ final class ConnectionPoolTests: XCTestCase { } } } + + func testLeasingMultipleConnectionsAtOnceWorks() async throws { + let clock = MockClock() + let factory = MockConnectionFactory() + let keepAliveDuration = Duration.seconds(30) + let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self) + + var mutableConfig = ConnectionPoolConfiguration() + mutableConfig.minimumConnectionCount = 4 + mutableConfig.maximumConnectionSoftLimit = 4 + mutableConfig.maximumConnectionHardLimit = 4 + mutableConfig.idleTimeout = .seconds(10) + let config = mutableConfig + + let pool = ConnectionPool( + configuration: config, + idGenerator: ConnectionIDGenerator(), + requestType: ConnectionFuture.self, + keepAliveBehavior: keepAlive, + observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), + clock: clock + ) { + try await factory.makeConnection(id: $0, for: $1) + } + + try await withThrowingTaskGroup(of: Void.self) { taskGroup in + taskGroup.addTask { + await pool.run() + } + + // create 4 persisted connections + for _ in 0..<4 { + await factory.nextConnectAttempt { connectionID in + return 1 + } + } + + // create 4 connection requests + let requests = (0..<4).map { ConnectionFuture(id: $0) } + + // lease 4 connections at once + pool.leaseConnections(requests) + var connections = [MockConnection]() + + for request in requests { + let connection = try await request.future.success + connections.append(connection) + } + + // Ensure that we got 4 distinct connections + XCTAssertEqual(Set(connections.lazy.map(\.id)).count, 4) + + // release all 4 leased connections + for connection in connections { + pool.releaseConnection(connection) + } + + // shutdown + taskGroup.cancelAll() + for connection in factory.runningConnections { + connection.closeIfClosing() + } + } + } } +struct ConnectionFuture: ConnectionRequestProtocol { + let id: Int + let future: Future + + init(id: Int) { + self.id = id + self.future = Future(of: MockConnection.self) + } + + func complete(with result: Result) { + switch result { + case .success(let success): + self.future.yield(value: success) + case .failure(let failure): + self.future.yield(error: failure) + } + } +} diff --git a/Tests/ConnectionPoolModuleTests/Mocks/MockConnectionFactory.swift b/Tests/ConnectionPoolModuleTests/Mocks/MockConnectionFactory.swift index b0c94467..eec2e7c3 100644 --- a/Tests/ConnectionPoolModuleTests/Mocks/MockConnectionFactory.swift +++ b/Tests/ConnectionPoolModuleTests/Mocks/MockConnectionFactory.swift @@ -30,7 +30,7 @@ final class MockConnectionFactory where Clock.Duratio func makeConnection( id: Int, - for pool: ConnectionPool, Int, MockPingPongBehavior, NoOpConnectionPoolMetrics, Clock> + for pool: ConnectionPool, NoOpConnectionPoolMetrics, Clock> ) async throws -> ConnectionAndMetadata { // we currently don't support cancellation when creating a connection let result = try await withCheckedThrowingContinuation { (checkedContinuation: CheckedContinuation<(MockConnection, UInt16), any Error>) in diff --git a/Tests/ConnectionPoolModuleTests/Utils/Waiter.swift b/Tests/ConnectionPoolModuleTests/Utils/Future.swift similarity index 77% rename from Tests/ConnectionPoolModuleTests/Utils/Waiter.swift rename to Tests/ConnectionPoolModuleTests/Utils/Future.swift index 12cf90cc..2bee3216 100644 --- a/Tests/ConnectionPoolModuleTests/Utils/Waiter.swift +++ b/Tests/ConnectionPoolModuleTests/Utils/Future.swift @@ -1,31 +1,34 @@ import Atomics @testable import _ConnectionPoolModule -final class Waiter: Sendable { +/// This is a `Future` type that shall make writing tests a bit simpler. I'm well aware, that this is a pattern +/// that should not be embraced with structured concurrency. However writing all tests in full structured +/// concurrency is an effort, that isn't worth the endgoals in my view. +final class Future: Sendable { struct State: Sendable { - var result: Swift.Result? = nil - var continuations: [(Int, CheckedContinuation)] = [] + var result: Swift.Result? = nil + var continuations: [(Int, CheckedContinuation)] = [] } let waiterID = ManagedAtomic(0) let stateBox: NIOLockedValueBox = NIOLockedValueBox(State()) - init(of: Result.Type) {} + init(of: Success.Type) {} enum GetAction { case fail(any Error) - case succeed(Result) + case succeed(Success) case none } - var result: Result { + var success: Success { get async throws { let waiterID = self.waiterID.loadThenWrappingIncrement(ordering: .relaxed) return try await withTaskCancellationHandler { - return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in let action = self.stateBox.withLockedValue { state -> GetAction in if Task.isCancelled { return .fail(CancellationError()) @@ -56,7 +59,7 @@ final class Waiter: Sendable { } } } onCancel: { - let cont = self.stateBox.withLockedValue { state -> CheckedContinuation? in + let cont = self.stateBox.withLockedValue { state -> CheckedContinuation? in guard state.result == nil else { return nil } guard let contIndex = state.continuations.firstIndex(where: { $0.0 == waiterID }) else { @@ -71,10 +74,10 @@ final class Waiter: Sendable { } } - func yield(value: Result) { + func yield(value: Success) { let continuations = self.stateBox.withLockedValue { state in guard state.result == nil else { - return [(Int, CheckedContinuation)]().lazy.map(\.1) + return [(Int, CheckedContinuation)]().lazy.map(\.1) } state.result = .success(value) @@ -92,7 +95,7 @@ final class Waiter: Sendable { func yield(error: any Error) { let continuations = self.stateBox.withLockedValue { state in guard state.result == nil else { - return [(Int, CheckedContinuation)]().lazy.map(\.1) + return [(Int, CheckedContinuation)]().lazy.map(\.1) } state.result = .failure(error)