From 8fc2e5d98e0558bb3a7a0d24b0806fd088ed9abe Mon Sep 17 00:00:00 2001 From: Franz Busch Date: Fri, 13 Oct 2023 09:52:05 +0100 Subject: [PATCH] Add a configurable escalation behaviour # Motivation The service group is often running multiple services and orchestrates shutdown for them. We have seen that sometimes some services never shutdown nor do they respond to task cancellation properly. This can become quite problematic when the whole application is waiting for a service to cancel and otherwise appears to be healthy but in reality can't serve any traffic. # Modification This PR adds a new configuration to escalate both graceful shutdown and cancellation. The escalation order is graceful shutdown -> task cancellation -> `fatalError`. The `fatalError` acts a last resort to make sure applications are never stuck. --- Sources/ServiceLifecycle/ServiceGroup.swift | 102 ++++++++++++++-- .../ServiceGroupConfiguration.swift | 62 ++++++++++ .../ServiceGroupTests.swift | 111 ++++++++++++++++-- 3 files changed, 260 insertions(+), 15 deletions(-) diff --git a/Sources/ServiceLifecycle/ServiceGroup.swift b/Sources/ServiceLifecycle/ServiceGroup.swift index 0dc0227..6710fa2 100644 --- a/Sources/ServiceLifecycle/ServiceGroup.swift +++ b/Sources/ServiceLifecycle/ServiceGroup.swift @@ -33,6 +33,8 @@ public actor ServiceGroup: Sendable { private let logger: Logger /// The logging configuration. private let loggingConfiguration: ServiceGroupConfiguration.LoggingConfiguration + /// The escalation configuration. + private let escalationConfiguration: ServiceGroupConfiguration.EscalationBehaviour /// The signals that lead to graceful shutdown. private let gracefulShutdownSignals: [UnixSignal] /// The signals that lead to cancellation. @@ -57,6 +59,7 @@ public actor ServiceGroup: Sendable { self.cancellationSignals = configuration.cancellationSignals self.logger = configuration.logger self.loggingConfiguration = configuration.logging + self.escalationConfiguration = configuration.escalation } /// Initializes a new ``ServiceGroup``. @@ -94,6 +97,7 @@ public actor ServiceGroup: Sendable { self.cancellationSignals = configuration.cancellationSignals self.logger = logger self.loggingConfiguration = configuration.logging + self.escalationConfiguration = configuration.escalation } /// Runs all the services by spinning up a child task per service. @@ -176,6 +180,8 @@ public actor ServiceGroup: Sendable { case signalSequenceFinished case gracefulShutdownCaught case gracefulShutdownFinished + case gracefulShutdownTimedOut + case cancellationCaught } private func _run( @@ -191,6 +197,10 @@ public actor ServiceGroup: Sendable { ] ) + // A task that is spawned when we got cancelled or + // we cancel the task group to keep track of a timeout. + var cancellationTimeoutTask: Task? + // Using a result here since we want a task group that has non-throwing child tasks // but the body itself is throwing let result = try await withThrowingTaskGroup(of: ChildTaskResult.self, returning: Result.self) { group in @@ -267,6 +277,13 @@ public actor ServiceGroup: Sendable { } } + group.addTask { + // This child task is waiting forever until the group gets cancelled. + let (stream, _) = AsyncStream.makeStream(of: Void.self) + await stream.first { _ in true } + return .cancellationCaught + } + // We are storing the services in an optional array now. When a slot in the array is // empty it indicates that the service has been shutdown. var services = services.map { Optional($0) } @@ -293,7 +310,7 @@ public actor ServiceGroup: Sendable { self.loggingConfiguration.keys.serviceKey: "\(service.service)", ] ) - group.cancelAll() + cancellationTimeoutTask = self.cancelGroupAndSpawnTimeoutIfNeeded(group: &group) return .failure(ServiceGroupError.serviceFinishedUnexpectedly()) case .gracefullyShutdownGroup: @@ -307,6 +324,7 @@ public actor ServiceGroup: Sendable { do { try await self.shutdownGracefully( services: services, + cancellationTimeoutTask: &cancellationTimeoutTask, group: &group, gracefulShutdownManagers: gracefulShutdownManagers ) @@ -327,7 +345,7 @@ public actor ServiceGroup: Sendable { self.logger.debug( "All services finished." ) - group.cancelAll() + cancellationTimeoutTask = self.cancelGroupAndSpawnTimeoutIfNeeded(group: &group) return .success(()) } } @@ -342,7 +360,7 @@ public actor ServiceGroup: Sendable { self.loggingConfiguration.keys.errorKey: "\(serviceError)", ] ) - group.cancelAll() + cancellationTimeoutTask = self.cancelGroupAndSpawnTimeoutIfNeeded(group: &group) return .failure(serviceError) case .gracefullyShutdownGroup: @@ -358,6 +376,7 @@ public actor ServiceGroup: Sendable { do { try await self.shutdownGracefully( services: services, + cancellationTimeoutTask: &cancellationTimeoutTask, group: &group, gracefulShutdownManagers: gracefulShutdownManagers ) @@ -381,7 +400,7 @@ public actor ServiceGroup: Sendable { "All services finished." ) - group.cancelAll() + cancellationTimeoutTask = self.cancelGroupAndSpawnTimeoutIfNeeded(group: &group) return .success(()) } } @@ -398,6 +417,7 @@ public actor ServiceGroup: Sendable { do { try await self.shutdownGracefully( services: services, + cancellationTimeoutTask: &cancellationTimeoutTask, group: &group, gracefulShutdownManagers: gracefulShutdownManagers ) @@ -413,7 +433,7 @@ public actor ServiceGroup: Sendable { ] ) - group.cancelAll() + cancellationTimeoutTask = self.cancelGroupAndSpawnTimeoutIfNeeded(group: &group) } case .gracefulShutdownCaught: @@ -423,6 +443,7 @@ public actor ServiceGroup: Sendable { do { try await self.shutdownGracefully( services: services, + cancellationTimeoutTask: &cancellationTimeoutTask, group: &group, gracefulShutdownManagers: gracefulShutdownManagers ) @@ -430,12 +451,21 @@ public actor ServiceGroup: Sendable { return .failure(error) } + case .cancellationCaught: + // We caught cancellation in our child task so we have to spawn + // our cancellation timeout task if needed + self.logger.debug("Caught cancellation.") + cancellationTimeoutTask = self.cancelGroupAndSpawnTimeoutIfNeeded(group: &group) + case .signalSequenceFinished, .gracefulShutdownFinished: // This can happen when we are either cancelling everything or // when the user did not specify any shutdown signals. We just have to tolerate // this. continue + case .gracefulShutdownTimedOut: + fatalError("Received gracefulShutdownTimedOut but never triggered a graceful shutdown") + case nil: fatalError("Invalid result from group.next(). We checked if the group is empty before and still got nil") } @@ -447,11 +477,13 @@ public actor ServiceGroup: Sendable { self.logger.debug( "Service lifecycle ended" ) + cancellationTimeoutTask?.cancel() try result.get() } private func shutdownGracefully( services: [ServiceGroupConfiguration.ServiceConfiguration?], + cancellationTimeoutTask: inout Task?, group: inout ThrowingTaskGroup, gracefulShutdownManagers: [GracefulShutdownManager] ) async throws { @@ -459,6 +491,14 @@ public actor ServiceGroup: Sendable { fatalError("Unexpected state") } + if #available(macOS 13.0, *), let maximumGracefulShutdownDuration = self.escalationConfiguration.maximumGracefulShutdownDuration { + group.addTask { + try await Task.sleep(for: maximumGracefulShutdownDuration) + return .gracefulShutdownTimedOut + } + } + + // We are storing the first error of a service that threw here. var error: Error? @@ -509,7 +549,7 @@ public actor ServiceGroup: Sendable { ] ) - group.cancelAll() + cancellationTimeoutTask = self.cancelGroupAndSpawnTimeoutIfNeeded(group: &group) throw ServiceGroupError.serviceFinishedUnexpectedly() } @@ -561,9 +601,26 @@ public actor ServiceGroup: Sendable { ] ) - group.cancelAll() + cancellationTimeoutTask = self.cancelGroupAndSpawnTimeoutIfNeeded(group: &group) } + case .gracefulShutdownTimedOut: + // Gracefully shutting down took longer than the user configured + // so we have to escalate it now. + self.logger.debug( + "Graceful shutdown took longer than allowed by the configuration. Cancelling the group now.", + metadata: [ + self.loggingConfiguration.keys.serviceKey: "\(service.service)", + ] + ) + cancellationTimeoutTask = self.cancelGroupAndSpawnTimeoutIfNeeded(group: &group) + + case .cancellationCaught: + // We caught cancellation in our child task so we have to spawn + // our cancellation timeout task if needed + self.logger.debug("Caught cancellation.") + cancellationTimeoutTask = self.cancelGroupAndSpawnTimeoutIfNeeded(group: &group) + case .signalSequenceFinished, .gracefulShutdownCaught, .gracefulShutdownFinished: // We just have to tolerate this since signals and parent graceful shutdowns downs can race. continue @@ -575,7 +632,9 @@ public actor ServiceGroup: Sendable { // If we hit this then all services are shutdown. The only thing remaining // are the tasks that listen to the various graceful shutdown signals. We - // just have to cancel those + // just have to cancel those. + // In this case we don't have to spawn our cancellation timeout task since + // we are sure all other child tasks are handling cancellation appropriately. group.cancelAll() // If we saw an error during graceful shutdown from a service that triggers graceful @@ -584,6 +643,33 @@ public actor ServiceGroup: Sendable { throw error } } + + private func cancelGroupAndSpawnTimeoutIfNeeded( + group: inout ThrowingTaskGroup + ) -> Task? { + group.cancelAll() + if #available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *), let maximumCancellationDuration = self.escalationConfiguration.maximumCancellationDuration { + // We have to spawn an unstructured task here because the call to our `run` + // method might have already been cancelled and we need to protect the sleep + // from being cancelled. + return Task { + do { + self.logger.debug( + "Task cancellation timeout task started." + ) + try await Task.sleep(for: maximumCancellationDuration) + self.logger.debug( + "Cancellation took longer than allowed by the configuration." + ) + fatalError("Cancellation took longer than allowed by the configuration.") + } catch { + // We got cancelled so our services must have finished up. + } + } + } else { + return nil + } + } } // This should be removed once we support Swift 5.9+ diff --git a/Sources/ServiceLifecycle/ServiceGroupConfiguration.swift b/Sources/ServiceLifecycle/ServiceGroupConfiguration.swift index 29c2898..b1a0f9d 100644 --- a/Sources/ServiceLifecycle/ServiceGroupConfiguration.swift +++ b/Sources/ServiceLifecycle/ServiceGroupConfiguration.swift @@ -96,6 +96,65 @@ public struct ServiceGroupConfiguration: Sendable { } } + /// The group's escalation configuration. + public struct EscalationBehaviour: Sendable { + /// The maximum amount of time that graceful shutdown is allowed to take. + /// + /// After this time has elapsed graceful shutdown will be escalated to task cancellation. + @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) + public var maximumGracefulShutdownDuration: Duration? { + get { + if let maximumGracefulShutdownDuration = self._maximumGracefulShutdownDuration { + return .init( + secondsComponent: maximumGracefulShutdownDuration.secondsComponent, + attosecondsComponent: maximumGracefulShutdownDuration.attosecondsComponent + ) + } else { + return nil + } + } + set { + if let newValue = newValue { + self._maximumGracefulShutdownDuration = (newValue.components.seconds, newValue.components.attoseconds) + } else { + self._maximumCancellationDuration = nil + } + } + } + + /// The maximum amount of time that task cancellation is allowed to take. + /// + /// After this time has elapsed task cancellation will be escalated to a `fatalError`. + /// + /// - Important: This setting is useful to guarantee that your application will exit at some point and + /// should be used to identify APIs that are not properly implementing task cancellation. + @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) + public var maximumCancellationDuration: Duration? { + get { + if let maximumCancellationDuration = self._maximumCancellationDuration { + return .init( + secondsComponent: maximumCancellationDuration.secondsComponent, + attosecondsComponent: maximumCancellationDuration.attosecondsComponent + ) + } else { + return nil + } + } + set { + if let newValue = newValue { + self._maximumCancellationDuration = (newValue.components.seconds, newValue.components.attoseconds) + } else { + self._maximumCancellationDuration = nil + } + } + } + + private var _maximumGracefulShutdownDuration: (secondsComponent: Int64, attosecondsComponent: Int64)? + private var _maximumCancellationDuration: (secondsComponent: Int64, attosecondsComponent: Int64)? + + public init() {} + } + /// The groups's service configurations. public var services: [ServiceConfiguration] @@ -111,6 +170,9 @@ public struct ServiceGroupConfiguration: Sendable { /// The group's logging configuration. public var logging = LoggingConfiguration() + /// The group's escalation configuration. + public var escalation = EscalationBehaviour() + /// Initializes a new ``ServiceGroupConfiguration``. /// /// - Parameters: diff --git a/Tests/ServiceLifecycleTests/ServiceGroupTests.swift b/Tests/ServiceLifecycleTests/ServiceGroupTests.swift index 693b413..bdae94e 100644 --- a/Tests/ServiceLifecycleTests/ServiceGroupTests.swift +++ b/Tests/ServiceLifecycleTests/ServiceGroupTests.swift @@ -86,6 +86,7 @@ private actor MockService: Service, CustomStringConvertible { } } +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) final class ServiceGroupTests: XCTestCase { func testRun_whenAlreadyRunning() async throws { let mockService = MockService(description: "Service1") @@ -1080,23 +1081,119 @@ final class ServiceGroupTests: XCTestCase { } } + func testGracefulShutdownEscalation() async throws { + let mockService = MockService(description: "Service1") + let serviceGroup = self.makeServiceGroup( + services: [.init(service: mockService)], + gracefulShutdownSignals: [.sigalrm], + maximumGracefulShutdownDuration: .seconds(0.1), + maximumCancellationDuration: .seconds(0.5) + ) + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator = mockService.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator.next(), .run) + + await serviceGroup.triggerGracefulShutdown() + + await XCTAsyncAssertEqual(await eventIterator.next(), .shutdownGracefully) + + await XCTAsyncAssertEqual(await eventIterator.next(), .runCancelled) + + try await Task.sleep(for: .seconds(0.2)) + + await mockService.resumeRunContinuation(with: .success(())) + + try await XCTAsyncAssertNoThrow(await group.next()) + } + } + + func testGracefulShutdownEscalation_whenNoCancellationEscalation() async throws { + let mockService = MockService(description: "Service1") + let serviceGroup = self.makeServiceGroup( + services: [.init(service: mockService)], + gracefulShutdownSignals: [.sigalrm], + maximumGracefulShutdownDuration: .seconds(0.1), + maximumCancellationDuration: nil + ) + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator = mockService.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator.next(), .run) + + await serviceGroup.triggerGracefulShutdown() + + await XCTAsyncAssertEqual(await eventIterator.next(), .shutdownGracefully) + + await XCTAsyncAssertEqual(await eventIterator.next(), .runCancelled) + + try await Task.sleep(for: .seconds(0.2)) + + await mockService.resumeRunContinuation(with: .success(())) + + try await XCTAsyncAssertNoThrow(await group.next()) + } + } + + func testCancellationEscalation() async throws { + let mockService = MockService(description: "Service1") + let serviceGroup = self.makeServiceGroup( + services: [.init(service: mockService)], + gracefulShutdownSignals: [.sigalrm], + maximumGracefulShutdownDuration: nil, + maximumCancellationDuration: .seconds(1) + ) + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator = mockService.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator.next(), .run) + + group.cancelAll() + + await XCTAsyncAssertEqual(await eventIterator.next(), .runCancelled) + + try await Task.sleep(for: .seconds(0.1)) + + await mockService.resumeRunContinuation(with: .success(())) + + try await XCTAsyncAssertNoThrow(await group.next()) + } + } + // MARK: - Helpers private func makeServiceGroup( services: [ServiceGroupConfiguration.ServiceConfiguration] = [], gracefulShutdownSignals: [UnixSignal] = .init(), - cancellationSignals: [UnixSignal] = .init() + cancellationSignals: [UnixSignal] = .init(), + maximumGracefulShutdownDuration: Duration? = nil, + maximumCancellationDuration: Duration? = .seconds(5) ) -> ServiceGroup { var logger = Logger(label: "Tests") logger.logLevel = .debug + var configuration = ServiceGroupConfiguration( + services: services, + gracefulShutdownSignals: gracefulShutdownSignals, + cancellationSignals: cancellationSignals, + logger: logger + ) + configuration.escalation.maximumGracefulShutdownDuration = maximumGracefulShutdownDuration + configuration.escalation.maximumCancellationDuration = maximumCancellationDuration return .init( - configuration: .init( - services: services, - gracefulShutdownSignals: gracefulShutdownSignals, - cancellationSignals: cancellationSignals, - logger: logger - ) + configuration: configuration ) } }