Skip to content

Commit

Permalink
Fixes Crash in ConnectionPoolStateMachine (#438)
Browse files Browse the repository at this point in the history
- Correctly handle Connection closes while running a keep alive (fix: #436)
- Add further keep alive tests
- Restructure MockClock quite a bit
  • Loading branch information
fabianfett authored Nov 10, 2023
1 parent c826992 commit 036931d
Show file tree
Hide file tree
Showing 8 changed files with 356 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,8 @@ extension PoolStateMachine {
@inlinable
mutating func keepAliveSucceeded(_ connectionID: Connection.ID) -> (Int, AvailableConnectionContext)? {
guard let index = self.connections.firstIndex(where: { $0.id == connectionID }) else {
preconditionFailure("A connection that we don't know was released? Something is very wrong...")
// keepAliveSucceeded can race against, closeIfIdle, shutdowns or connection errors
return nil
}

guard let connectionInfo = self.connections[index].keepAliveSucceeded() else {
Expand Down Expand Up @@ -430,15 +431,8 @@ extension PoolStateMachine {

self.stats.idle -= 1
self.stats.closing += 1

// if idleState.runningKeepAlive {
// self.stats.runningKeepAlive -= 1
// if self.keepAliveReducesAvailableStreams {
// self.stats.availableStreams += 1
// }
// }

self.stats.availableStreams -= closeAction.maxStreams
self.stats.runningKeepAlive -= closeAction.runningKeepAlive ? 1 : 0
self.stats.availableStreams -= closeAction.maxStreams - closeAction.usedStreams

return CloseAction(
connection: closeAction.connection!,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,20 +496,25 @@ extension PoolStateMachine {
var usedStreams: UInt16
@usableFromInline
var maxStreams: UInt16
@usableFromInline
var runningKeepAlive: Bool


@inlinable
init(
connection: Connection?,
previousConnectionState: PreviousConnectionState,
cancelTimers: Max2Sequence<TimerCancellationToken>,
usedStreams: UInt16,
maxStreams: UInt16
maxStreams: UInt16,
runningKeepAlive: Bool
) {
self.connection = connection
self.previousConnectionState = previousConnectionState
self.cancelTimers = cancelTimers
self.usedStreams = usedStreams
self.maxStreams = maxStreams
self.runningKeepAlive = runningKeepAlive
}
}

Expand All @@ -526,7 +531,8 @@ extension PoolStateMachine {
idleTimerState?.cancellationContinuation
),
usedStreams: keepAlive.usedStreams,
maxStreams: maxStreams
maxStreams: maxStreams,
runningKeepAlive: keepAlive.isRunning
)

case .leased, .closed:
Expand Down Expand Up @@ -559,7 +565,8 @@ extension PoolStateMachine {
idleTimerState?.cancellationContinuation
),
usedStreams: keepAlive.usedStreams,
maxStreams: maxStreams
maxStreams: maxStreams,
runningKeepAlive: keepAlive.isRunning
)

case .leased(let connection, usedStreams: let usedStreams, maxStreams: let maxStreams, var keepAlive):
Expand All @@ -571,7 +578,8 @@ extension PoolStateMachine {
keepAlive.cancelTimerIfScheduled()
),
usedStreams: keepAlive.usedStreams + usedStreams,
maxStreams: maxStreams
maxStreams: maxStreams,
runningKeepAlive: keepAlive.isRunning
)

case .backingOff(let timer):
Expand All @@ -581,7 +589,8 @@ extension PoolStateMachine {
previousConnectionState: .backingOff,
cancelTimers: Max2Sequence(timer.cancellationContinuation),
usedStreams: 0,
maxStreams: 0
maxStreams: 0,
runningKeepAlive: false
)
}
}
Expand Down
158 changes: 153 additions & 5 deletions Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ final class ConnectionPoolTests: XCTestCase {
configuration: config,
idGenerator: ConnectionIDGenerator(),
requestType: ConnectionRequest<MockConnection>.self,
keepAliveBehavior: MockPingPongBehavior(keepAliveFrequency: nil),
keepAliveBehavior: MockPingPongBehavior(keepAliveFrequency: nil, connectionType: MockConnection.self),
observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self),
clock: ContinuousClock()
) {
Expand Down Expand Up @@ -74,7 +74,7 @@ final class ConnectionPoolTests: XCTestCase {
configuration: config,
idGenerator: ConnectionIDGenerator(),
requestType: ConnectionRequest<MockConnection>.self,
keepAliveBehavior: MockPingPongBehavior(keepAliveFrequency: nil),
keepAliveBehavior: MockPingPongBehavior(keepAliveFrequency: nil, connectionType: MockConnection.self),
observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self),
clock: clock
) {
Expand Down Expand Up @@ -119,7 +119,7 @@ final class ConnectionPoolTests: XCTestCase {
configuration: config,
idGenerator: ConnectionIDGenerator(),
requestType: ConnectionRequest<MockConnection>.self,
keepAliveBehavior: MockPingPongBehavior(keepAliveFrequency: nil),
keepAliveBehavior: MockPingPongBehavior(keepAliveFrequency: nil, connectionType: MockConnection.self),
observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self),
clock: clock
) {
Expand All @@ -135,7 +135,7 @@ final class ConnectionPoolTests: XCTestCase {
throw ConnectionCreationError()
}

await clock.timerScheduled()
await clock.nextTimerScheduled()

taskGroup.cancelAll()
}
Expand All @@ -156,7 +156,7 @@ final class ConnectionPoolTests: XCTestCase {
configuration: config,
idGenerator: ConnectionIDGenerator(),
requestType: ConnectionRequest<MockConnection>.self,
keepAliveBehavior: MockPingPongBehavior(keepAliveFrequency: nil),
keepAliveBehavior: MockPingPongBehavior(keepAliveFrequency: nil, connectionType: MockConnection.self),
observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self),
clock: ContinuousClock()
) {
Expand Down Expand Up @@ -220,6 +220,154 @@ final class ConnectionPoolTests: XCTestCase {
XCTAssert(hasFinished.load(ordering: .relaxed))
XCTAssertEqual(factory.runningConnections.count, 0)
}

func testKeepAliveWorks() async throws {
let clock = MockClock()
let factory = MockConnectionFactory<MockClock>()
let keepAliveDuration = Duration.seconds(30)
let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self)

var mutableConfig = ConnectionPoolConfiguration()
mutableConfig.minimumConnectionCount = 0
mutableConfig.maximumConnectionSoftLimit = 1
mutableConfig.maximumConnectionHardLimit = 1
let config = mutableConfig

let pool = ConnectionPool(
configuration: config,
idGenerator: ConnectionIDGenerator(),
requestType: ConnectionRequest<MockConnection>.self,
keepAliveBehavior: keepAlive,
observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self),
clock: clock
) {
try await factory.makeConnection(id: $0, for: $1)
}

try await withThrowingTaskGroup(of: Void.self) { taskGroup in
taskGroup.addTask {
await pool.run()
}

async let lease1ConnectionAsync = pool.leaseConnection()

let connection = await factory.nextConnectAttempt { connectionID in
return 1
}

let lease1Connection = try await lease1ConnectionAsync
XCTAssert(connection === lease1Connection)

pool.releaseConnection(lease1Connection)

// keep alive 1

// validate that a keep alive timer and an idle timeout timer is scheduled
var expectedInstants: Set<MockClock.Instant> = [.init(keepAliveDuration), .init(config.idleTimeout)]
let deadline1 = await clock.nextTimerScheduled()
print(deadline1)
XCTAssertNotNil(expectedInstants.remove(deadline1))
let deadline2 = await clock.nextTimerScheduled()
print(deadline2)
XCTAssertNotNil(expectedInstants.remove(deadline2))
XCTAssert(expectedInstants.isEmpty)

// move clock forward to keep alive
let newTime = clock.now.advanced(by: keepAliveDuration)
clock.advance(to: newTime)
print("clock advanced to: \(newTime)")

await keepAlive.nextKeepAlive { keepAliveConnection in
defer { print("keep alive 1 has run") }
XCTAssertTrue(keepAliveConnection === lease1Connection)
return true
}

// keep alive 2

let deadline3 = await clock.nextTimerScheduled()
XCTAssertEqual(deadline3, clock.now.advanced(by: keepAliveDuration))
print(deadline3)

// race keep alive vs timeout
clock.advance(to: clock.now.advanced(by: keepAliveDuration))

taskGroup.cancelAll()

for connection in factory.runningConnections {
connection.closeIfClosing()
}
}
}

func testKeepAliveWorksRacesAgainstShutdown() async throws {
let clock = MockClock()
let factory = MockConnectionFactory<MockClock>()
let keepAliveDuration = Duration.seconds(30)
let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self)

var mutableConfig = ConnectionPoolConfiguration()
mutableConfig.minimumConnectionCount = 0
mutableConfig.maximumConnectionSoftLimit = 1
mutableConfig.maximumConnectionHardLimit = 1
let config = mutableConfig

let pool = ConnectionPool(
configuration: config,
idGenerator: ConnectionIDGenerator(),
requestType: ConnectionRequest<MockConnection>.self,
keepAliveBehavior: keepAlive,
observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self),
clock: clock
) {
try await factory.makeConnection(id: $0, for: $1)
}

try await withThrowingTaskGroup(of: Void.self) { taskGroup in
taskGroup.addTask {
await pool.run()
}

async let lease1ConnectionAsync = pool.leaseConnection()

let connection = await factory.nextConnectAttempt { connectionID in
return 1
}

let lease1Connection = try await lease1ConnectionAsync
XCTAssert(connection === lease1Connection)

pool.releaseConnection(lease1Connection)

// keep alive 1

// validate that a keep alive timer and an idle timeout timer is scheduled
var expectedInstants: Set<MockClock.Instant> = [.init(keepAliveDuration), .init(config.idleTimeout)]
let deadline1 = await clock.nextTimerScheduled()
print(deadline1)
XCTAssertNotNil(expectedInstants.remove(deadline1))
let deadline2 = await clock.nextTimerScheduled()
print(deadline2)
XCTAssertNotNil(expectedInstants.remove(deadline2))
XCTAssert(expectedInstants.isEmpty)

clock.advance(to: clock.now.advanced(by: keepAliveDuration))

await keepAlive.nextKeepAlive { keepAliveConnection in
defer { print("keep alive 1 has run") }
XCTAssertTrue(keepAliveConnection === lease1Connection)
return true
}

taskGroup.cancelAll()
print("cancelled")

for connection in factory.runningConnections {
connection.closeIfClosing()
}
}
}

}


Loading

0 comments on commit 036931d

Please sign in to comment.