diff --git a/Sources/ServiceLifecycle/Docs.docc/How to adopt ServiceLifecycle in applications.md b/Sources/ServiceLifecycle/Docs.docc/How to adopt ServiceLifecycle in applications.md index e64b09f..33173cc 100644 --- a/Sources/ServiceLifecycle/Docs.docc/How to adopt ServiceLifecycle in applications.md +++ b/Sources/ServiceLifecycle/Docs.docc/How to adopt ServiceLifecycle in applications.md @@ -1,41 +1,49 @@ # How to adopt ServiceLifecycle in applications -``ServiceLifecycle`` aims to provide a unified API that services should adopt to make orchestrating -them in an application easier. To achieve this ``ServiceLifecycle`` is providing the ``ServiceGroup`` actor. +``ServiceLifecycle`` aims to provide a unified API that services should adopt to +make orchestrating them in an application easier. To achieve this +``ServiceLifecycle`` is providing the ``ServiceGroup`` actor. ## Why do we need this? -When building applications we often have a bunch of services that comprise the internals of the applications. -These services include fundamental needs like logging or metrics. Moreover, they also include -services that compromise the application's business logic such as long-running actors. -Lastly, they might also include HTTP, gRPC, or similar servers that the application is exposing. -One important requirement of the application is to orchestrate the various services currently during -startup and shutdown. Furthermore, the application also needs to handle a single service failing. - -Swift introduced Structured Concurrency which already helps tremendously with running multiple -async services concurrently. This can be achieved with the use of task groups. However, Structured -Concurrency doesn't enforce consistent interfaces between the services, so it becomes hard to orchestrate them. -This is where ``ServiceLifecycle`` comes in. It provides the ``Service`` protocol which enforces -a common API. Additionally, it provides the ``ServiceGroup`` which is responsible for orchestrating -all services in an application. +When building applications we often have a bunch of services that comprise the +internals of the applications. These services include fundamental needs like +logging or metrics. Moreover, they also include services that compromise the +application's business logic such as long-running actors. Lastly, they might +also include HTTP, gRPC, or similar servers that the application is exposing. +One important requirement of the application is to orchestrate the various +services during startup and shutdown. + +Swift introduced Structured Concurrency which already helps tremendously with +running multiple asynchronous services concurrently. This can be achieved with +the use of task groups. However, Structured Concurrency doesn't enforce +consistent interfaces between the services, so it becomes hard to orchestrate +them. This is where ``ServiceLifecycle`` comes in. It provides the ``Service`` +protocol which enforces a common API. Additionally, it provides the +``ServiceGroup`` which is responsible for orchestrating all services in an +application. ## Adopting the ServiceGroup in your application -This article is focusing on how the ``ServiceGroup`` works and how you can adopt it in your application. -If you are interested in how to properly implement a service, go check out the article: . +This article is focusing on how the ``ServiceGroup`` works and how you can adopt +it in your application. If you are interested in how to properly implement a +service, go check out the article: +. ### How is the ServiceGroup working? -The ``ServiceGroup`` is just a slightly complicated task group under the hood that runs each service -in a separate child task. Furthermore, the ``ServiceGroup`` handles individual services exiting -or throwing unexpectedly. Lastly, it also introduces a concept called graceful shutdown which allows -tearing down all services in reverse order safely. Graceful shutdown is often used in server -scenarios i.e. when rolling out a new version and draining traffic from the old version. +The ``ServiceGroup`` is just a complicated task group under the hood that runs +each service in a separate child task. Furthermore, the ``ServiceGroup`` handles +individual services exiting or throwing. Lastly, it also introduces a concept +called graceful shutdown which allows tearing down all services in reverse order +safely. Graceful shutdown is often used in server scenarios i.e. when rolling +out a new version and draining traffic from the old version (commonly referred +to as quiescing). ### How to use the ServiceGroup? -Let's take a look how the ``ServiceGroup`` can be used in an application. First, we define some -fictional services. +Let's take a look how the ``ServiceGroup`` can be used in an application. First, +we define some fictional services. ```swift struct FooService: Service { @@ -53,11 +61,12 @@ public struct BarService: Service { } ``` -The `BarService` is depending in our example on the `FooService`. A dependency between services -is quite common and the ``ServiceGroup`` is inferring the dependencies from the order of the -services passed to the ``ServiceGroup/init(services:configuration:logger:)``. Services with a higher -index can depend on services with a lower index. The following example shows how this can be applied -to our `BarService`. +The `BarService` is depending in our example on the `FooService`. A dependency +between services is quite common and the ``ServiceGroup`` is inferring the +dependencies from the order of the services passed to the +``ServiceGroup/init(configuration:)``. Services with a higher index can depend +on services with a lower index. The following example shows how this can be +applied to our `BarService`. ```swift @main @@ -68,9 +77,13 @@ struct Application { let serviceGroup = ServiceGroup( // We are encoding the dependency hierarchy here by listing the fooService first - services: [fooService, barService], - configuration: .init(gracefulShutdownSignals: []), - logger: logger + configuration: .init( + services: [ + .init(service: fooService), + .init(service: barService) + ], + logger: logger + ), ) try await serviceGroup.run() @@ -80,17 +93,26 @@ struct Application { ### Graceful shutdown -The ``ServiceGroup`` supports graceful shutdown by taking an array of `UnixSignal`s that trigger -the shutdown. Commonly `SIGTERM` is used to indicate graceful shutdowns in container environments -such as Docker or Kubernetes. The ``ServiceGroup`` is then gracefully shutting down each service -one by one in the reverse order of the array passed to the init. -Importantly, the ``ServiceGroup`` is going to wait for the ``Service/run()`` method to return +Graceful shutdown is a concept from service lifecycle which aims to be an +alternative to task cancellation that is not as forceful. Graceful shutdown +rather lets the various services opt-in to supporting it. A common example of +when you might want to use graceful shutdown is in containerized enviroments +such as Docker or Kubernetes. In those environments, `SIGTERM` is commonly used +to indicate to the application that it should shut down before a `SIGKILL` is +sent. + +The ``ServiceGroup`` can be setup to listen to `SIGTERM` and trigger a graceful +shutdown on all its orchestrated services. It will then gracefully shut down +each service one by one in reverse startup order. Importantly, the +``ServiceGroup`` is going to wait for the ``Service/run()`` method to return before triggering the graceful shutdown on the next service. -Since graceful shutdown is up to the individual services and application it requires explicit support. -We recommend that every service author makes sure their implementation is handling graceful shutdown -correctly. Lastly, application authors also have to make sure they are handling graceful shutdown. -A common example of this is for applications that implement streaming behaviours. +Since graceful shutdown is up to the individual services and application it +requires explicit support. We recommend that every service author makes sure +their implementation is handling graceful shutdown correctly. Lastly, +application authors also have to make sure they are handling graceful shutdown. +A common example of this is for applications that implement streaming +behaviours. ```swift struct StreamingService: Service { @@ -126,9 +148,11 @@ struct Application { }) let serviceGroup = ServiceGroup( - services: [streamingService], - configuration: .init(gracefulShutdownSignals: [.sigterm]), - logger: logger + configuration: .init( + services: [.init(service: streamingService)], + gracefulShutdownSignals: [.sigterm], + logger: logger + ) ) try await serviceGroup.run() @@ -136,17 +160,20 @@ struct Application { } ``` -The code above demonstrates a hypothetical `StreamingService` with a configurable handler that -is invoked per stream. Each stream is handled in a separate child task concurrently. -The above code doesn't support graceful shutdown right now. There are two places where we are missing it. -First, the service's `run()` method is iterating the `makeStream()` async sequence. This iteration is -not stopped on graceful shutdown and we are continuing to accept new streams. Furthermore, -the `streamHandler` that we pass in our main method is also not supporting graceful shutdown since it -is iterating over the incoming requests. - -Luckily, adding support in both places is trivial with the helpers that ``ServiceLifecycle`` exposes. -In both cases, we are iterating an async sequence and what we want to do is stop the iteration. -To do this we can use the `cancelOnGracefulShutdown()` method that ``ServiceLifecycle`` adds to +The code above demonstrates a hypothetical `StreamingService` with a +configurable handler that is invoked per stream. Each stream is handled in a +separate child task concurrently. The above code doesn't support graceful +shutdown right now. There are two places where we are missing it. First, the +service's `run()` method is iterating the `makeStream()` async sequence. This +iteration is not stopped on graceful shutdown and we are continuing to accept +new streams. Furthermore, the `streamHandler` that we pass in our main method is +also not supporting graceful shutdown since it is iterating over the incoming +requests. + +Luckily, adding support in both places is trivial with the helpers that +``ServiceLifecycle`` exposes. In both cases, we are iterating an async sequence +and what we want to do is stop the iteration. To do this we can use the +`cancelOnGracefulShutdown()` method that ``ServiceLifecycle`` adds to `AsyncSequence`. The updated code looks like this: ```swift @@ -183,9 +210,11 @@ struct Application { }) let serviceGroup = ServiceGroup( - services: [streamingService], - configuration: .init(gracefulShutdownSignals: [.sigterm]), - logger: logger + configuration: .init( + services: [.init(service: streamingService)], + gracefulShutdownSignals: [.sigterm], + logger: logger + ) ) try await serviceGroup.run() @@ -193,8 +222,52 @@ struct Application { } ``` -Now one could ask - Why aren't we using cancellation in the first place here? The problem is that -cancellation is forceful and doesn't allow users to make a decision if they want to cancel or not. -However, graceful shutdown is very specific to business logic often. In our case, we were fine with just -stopping to handle new requests on a stream. Other applications might want to send a response indicating -to the client that the server is shutting down and waiting for an acknowledgment of that message. +Now one could ask - Why aren't we using cancellation in the first place here? +The problem is that cancellation is forceful and doesn't allow users to make a +decision if they want to cancel or not. However, graceful shutdown is very +specific to business logic often. In our case, we were fine with just stopping +to handle new requests on a stream. Other applications might want to send a +response indicating to the client that the server is shutting down and waiting +for an acknowledgment of that message. + +### Customizing the behavior when a service returns or throws + +By default the ``ServiceGroup`` is cancelling the whole group if the one service +returns or throws. However, in some scenarios this is totally expected e.g. when +the ``ServiceGroup`` is used in a CLI tool to orchestrate some services while a +command is handled. To customize the behavior you set the +``ServiceGroupConfiguration/ServiceConfiguration/returnBehaviour`` and +``ServiceGroupConfiguration/ServiceConfiguration/throwBehaviour``. Both of them +offer three different options. The default behavior for both is +``ServiceGroupConfiguration/ServiceConfiguration/TerminationBehavior/cancelGroup``. +You can also choose to either ignore if a service returns/throws by setting it +to ``ServiceGroupConfiguration/ServiceConfiguration/TerminationBehavior/ignore`` +or trigger a graceful shutdown by setting it to +``ServiceGroupConfiguration/ServiceConfiguration/TerminationBehavior/gracefullyShutdownGroup``. + +Another example where you might want to use this is when you have a service that +should be gracefully shutdown when another service exits, e.g. you want to make +sure your telemetry service is gracefully shutdown after your HTTP server +unexpectedly threw from its `run()` method. This setup could look like this: + +```swift +@main +struct Application { + static func main() async throws { + let telemetryService = TelemetryService() + let httpServer = HTTPServer() + + let serviceGroup = ServiceGroup( + configuration: .init( + services: [ + .init(service: telemetryService), + .init(service: httpServer, returnBehavior: .shutdownGracefully, throwBehavior: .shutdownGracefully) + ], + logger: logger + ), + ) + + try await serviceGroup.run() + } +} +``` diff --git a/Sources/ServiceLifecycle/Docs.docc/How to adopt ServiceLifecycle in libraries.md b/Sources/ServiceLifecycle/Docs.docc/How to adopt ServiceLifecycle in libraries.md index d1d681a..db2a7be 100644 --- a/Sources/ServiceLifecycle/Docs.docc/How to adopt ServiceLifecycle in libraries.md +++ b/Sources/ServiceLifecycle/Docs.docc/How to adopt ServiceLifecycle in libraries.md @@ -1,39 +1,47 @@ # How to adopt ServiceLifecycle in libraries -``ServiceLifecycle`` aims to provide a unified API that services should adopt to make orchestrating -them in an application easier. To achieve this ``ServiceLifecycle`` is providing the ``Service`` protocol. +``ServiceLifecycle`` aims to provide a unified API that services should adopt to +make orchestrating them in an application easier. To achieve this +``ServiceLifecycle`` is providing the ``Service`` protocol. ## Why do we need this? -Before diving into how to adopt this protocol in your library, let's take a step back and -talk about why we even need to have this unified API. A common need for services is to either -schedule long running work like sending keep alive pings in the background or to handle new -incoming work like handling new TCP connections. Before Concurrency was introduced services put -their work into separate threads using things like `DispatchQueue`s or NIO `EventLoop`s. -This often required explicit lifetime management of the services to make sure to shutdown the threads correctly. -With the introduction of Concurrency, specifically Structured Concurrency, we now have a better way -to structure our programs and model our work as a tree of tasks. -The ``Service`` protocol is providing a common interface that requires a single `run()` method where -services can put their long running work in. Having all services in an application conform to this -protocol enables easy orchestration of them and makes sure they interact nicely with each other. +Before diving into how to adopt this protocol in your library, let's take a step +back and talk about why we even need to have this unified API. A common need for +services is to either schedule long running work like sending keep alive pings +in the background or to handle new incoming work like handling new TCP +connections. Before Concurrency was introduced services put their work into +separate threads using things like `DispatchQueue`s or NIO `EventLoop`s. This +often required explicit lifetime management of the services to make sure its +resources, e.g. threads, are shutdown correctly. With the introduction of +Concurrency, specifically Structured Concurrency, we now have a better way to +structure our programs and model our work as a tree of tasks. The ``Service`` +protocol is providing a common interface that requires a single `run()` method +where services can put their long running work in. Having all services in an +application conform to this protocol enables easy orchestration of them and +makes sure they interact nicely with each other. ## Adopting the Service protocol in your service -Adopting the ``Service`` protocol is quite easy in your services. The protocol has only a single requirement -which is the ``Service/run()`` method. There are a few important caveats to it which we are going over in the -next sections. Make sure that your service is following those. +Adopting the ``Service`` protocol is quite easy in your services. The protocol +has only a single requirement which is the ``Service/run()`` method. There are a +few important caveats to it which we are going over in the next sections. Make +sure that your service is following those. ### Make sure to use Structured Concurrency -Swift offers multiple ways to use Structured Concurrency. The primary primitives are the -`async` and `await` keywords which enable straight-line code to make asynchronous calls. -Furthermore, the language provides the concept of task groups which allow the creation of -concurrent work while still staying tied to the parent task. On the other hand, Swift also provides -`Task(priority:operation:)` and `Task.detached(priority:operation:)` which create a new unstructured Task. +Swift offers multiple ways to use Structured Concurrency. The primary primitives +are the `async` and `await` keywords which enable straight-line code to make +asynchronous calls. Furthermore, the language provides the concept of task +groups which allow the creation of concurrent work while still staying tied to +the parent task. On the other hand, Swift also provides +`Task(priority:operation:)` and `Task.detached(priority:operation:)` which +create a new unstructured Task. -Imagine our library wants to offer a simple `TCPEchoClient`. To make it interesting let's assume we -need to send keep-alive pings on every open connection every second. Below you can see how we could -implement this using unstructured Concurrency. +Imagine our library wants to offer a simple `TCPEchoClient`. To make it +interesting let's assume we need to send keep-alive pings on every open +connection every second. Below you can see how we could implement this using +unstructured Concurrency. ```swift public actor TCPEchoClient { @@ -49,16 +57,19 @@ public actor TCPEchoClient { } ``` -The above code has a few problems. First, we are never canceling the `Task` that is running the -keep-alive pings. To do this we would need to store the `Task` in our actor and cancel it at the -appropriate time. Secondly, we actually would need to expose a `cancel()` method on the actor to cancel -the `Task`. At this point, we have just reinvented Structured Concurrency. -To avoid all of these problems we can just conform to the ``Service`` protocol which requires a `run()` -method. This requirement already guides us to implement the long running work inside the `run()` method. -Having this method allows the user of the client to decide in which task to schedule the keep-alive pings. -They can still decide to create an unstructured `Task` for this, but that is up to the user now. -Furthermore, we now get automatic cancellation propagation from the task that called our `run()` method. -Below is an overhauled implementation that exposes such a `run()` method. +The above code has a few problems. First, we are never canceling the `Task` that +is running the keep-alive pings. To do this we would need to store the `Task` in +our actor and cancel it at the appropriate time. Secondly, we actually would +need to expose a `cancel()` method on the actor to cancel the `Task`. At this +point, we have just reinvented Structured Concurrency. To avoid all of these +problems we can just conform to the ``Service`` protocol which requires a +`run()` method. This requirement already guides us to implement the long running +work inside the `run()` method. Having this method allows the user of the client +to decide in which task to schedule the keep-alive pings. They can still decide +to create an unstructured `Task` for this, but that is up to the user now. +Furthermore, we now get automatic cancellation propagation from the task that +called our `run()` method. Below is an overhauled implementation that exposes +such a `run()` method. ```swift public actor TCPEchoClient: Service { @@ -77,44 +88,50 @@ public actor TCPEchoClient: Service { ### Returning from your `run()` method -Since the `run()` method contains long running work, returning from it is seen as a failure and will -lead to the ``ServiceGroup`` cancelling all other services by cancelling the task that is running -their respective `run()` method. +Since the `run()` method contains long running work, returning from it is seen +as a failure and will lead to the ``ServiceGroup`` cancelling all other services +by cancelling the task that is running their respective `run()` method. ### Cancellation -Structured Concurrency propagates task cancellation down the task tree. Every task in the tree can -check for cancellation or react to it with cancellation handlers. The ``ServiceGroup`` is using task -cancellation to tear everything down in the case of an early return or thrown error from the `run()` -method of any of the services. Hence it is important that each service properly implements task -cancellation in their `run()` methods. +Structured Concurrency propagates task cancellation down the task tree. Every +task in the tree can check for cancellation or react to it with cancellation +handlers. The ``ServiceGroup`` is using task cancellation to tear everything +down in the case of an early return or thrown error from the `run()` method of +any of the services. Hence it is important that each service properly implements +task cancellation in their `run()` methods. -Note: If your `run()` method is only calling other async methods that support cancellation themselves -or is consuming an `AsyncSequence`, you don't have to do anything explicitly here. Looking at the -`TCPEchoClient` example from above we can see that we only call `Task.sleep` in our `run()` method -which is supporting task cancellation. +Note: If your `run()` method is only calling other async methods that support +cancellation themselves or is consuming an `AsyncSequence`, you don't have to do +anything explicitly here. Looking at the `TCPEchoClient` example from above we +can see that we only call `Task.sleep` in our `run()` method which is supporting +task cancellation. ### Graceful shutdown -When running an application in a real environment it is often required to gracefully shutdown the application. -For example, the application might be running in Kubernetes and a new version of it got deployed. In this -case, Kubernetes is going to send a `SIGTERM` signal to the application and expects it to terminate -within a grace period. If the application isn't stopping in time then Kubernetes will send the `SIGKILL` -signal and forcefully terminate the process. -For this reason ``ServiceLifecycle`` introduces a new _shutdown gracefully_ concept that allows terminating -the work in a structured and graceful manner. This works similarly to task cancellation but -it is fully opt-in and up to the business logic of the application to decide what to do. - -``ServiceLifecycle`` exposes one free function called ``withGracefulShutdownHandler(operation:onGracefulShutdown:)`` -that works similarly to the `withTaskCancellationHandler` function from the Concurrency library. -Library authors are expected to make sure that any work they spawn from the `run()` method -properly supports graceful shutdown. For example, a server might be closing its listening socket -to stop accepting new connections. -Importantly here though is that the server is not force closing the currently open ones. Rather it -expects the business logic on these connections to handle graceful shutdown on their own. - -An example implementation of a `TCPEchoServer` on a high level that supports graceful shutdown -might look like this. +When running an application in a real environment it is often required to +gracefully shutdown the application. For example, the application might be +running in Kubernetes and a new version of it got deployed. In this case, +Kubernetes is going to send a `SIGTERM` signal to the application and expects it +to terminate within a grace period. If the application isn't stopping in time +then Kubernetes will send the `SIGKILL` signal and forcefully terminate the +process. For this reason ``ServiceLifecycle`` introduces a new _shutdown +gracefully_ concept that allows terminating the work in a structured and +graceful manner. This works similarly to task cancellation but it is fully +opt-in and up to the business logic of the application to decide what to do. + +``ServiceLifecycle`` exposes one free function called +``withGracefulShutdownHandler(operation:onGracefulShutdown:)`` that works +similarly to the `withTaskCancellationHandler` function from the Concurrency +library. Library authors are expected to make sure that any work they spawn from +the `run()` method properly supports graceful shutdown. For example, a server +might be closing its listening socket to stop accepting new connections. +Importantly here though is that the server is not force closing the currently +open ones. Rather it expects the business logic on these connections to handle +graceful shutdown on their own. + +An example implementation of a `TCPEchoServer` on a high level that supports +graceful shutdown might look like this. ```swift public actor TCPEchoClient: Service { @@ -130,11 +147,12 @@ public actor TCPEchoClient: Service { } } } -```` +``` -In the case of our `TCPEchoClient`, the only reasonable thing to do is cancel the iteration of our -timer sequence when we receive the graceful shutdown sequence. ``ServiceLifecycle`` is providing -a convenience on `AsyncSequence` to cancel on graceful shutdown. Let's take a look at how this works. +In the case of our `TCPEchoClient`, the only reasonable thing to do is cancel +the iteration of our timer sequence when we receive the graceful shutdown +sequence. ``ServiceLifecycle`` is providing a convenience on `AsyncSequence` to +cancel on graceful shutdown. Let's take a look at how this works. ```swift public actor TCPEchoClient: Service { @@ -150,4 +168,5 @@ public actor TCPEchoClient: Service { } ``` -As you can see in the code above, it is as simple as adding a `cancelOnGracefulShutdown()` call. +As you can see in the code above, it is as simple as adding a +`cancelOnGracefulShutdown()` call. diff --git a/Sources/ServiceLifecycle/GracefulShutdown.swift b/Sources/ServiceLifecycle/GracefulShutdown.swift index 43be472..d8b9671 100644 --- a/Sources/ServiceLifecycle/GracefulShutdown.swift +++ b/Sources/ServiceLifecycle/GracefulShutdown.swift @@ -56,7 +56,7 @@ public func withGracefulShutdownHandler( } /// This is just a helper type for the result of our task group. -enum ValueOrGracefulShutdown { +enum ValueOrGracefulShutdown: Sendable { case value(T) case gracefulShutdown } @@ -64,7 +64,7 @@ enum ValueOrGracefulShutdown { /// Cancels the closure when a graceful shutdown was triggered. /// /// - Parameter operation: The actual operation. -public func cancelOnGracefulShutdown(_ operation: @Sendable @escaping () async throws -> T) async rethrows -> T? { +public func cancelOnGracefulShutdown(_ operation: @Sendable @escaping () async throws -> T) async rethrows -> T? { return try await withThrowingTaskGroup(of: ValueOrGracefulShutdown.self) { group in group.addTask { let value = try await operation() diff --git a/Sources/ServiceLifecycle/ServiceGroup.swift b/Sources/ServiceLifecycle/ServiceGroup.swift index e6bd329..8bf317e 100644 --- a/Sources/ServiceLifecycle/ServiceGroup.swift +++ b/Sources/ServiceLifecycle/ServiceGroup.swift @@ -20,7 +20,7 @@ public actor ServiceGroup: Sendable { /// The internal state of the ``ServiceGroup``. private enum State { /// The initial state of the group. - case initial + case initial(services: [ServiceGroupConfiguration.ServiceConfiguration]) /// The state once ``ServiceGroup/run()`` has been called. case running( gracefulShutdownStreamContinuation: AsyncStream.Continuation @@ -29,30 +29,48 @@ public actor ServiceGroup: Sendable { case finished } - /// The services to run. - private let services: [any Service] - /// The group's configuration. - private let configuration: ServiceGroupConfiguration /// The logger. private let logger: Logger - + /// The logging configuration. + private let loggingConfiguration: ServiceGroupConfiguration.LoggingConfiguration + /// The signals that lead to graceful shutdown. + private let gracefulShutdownSignals: [UnixSignal] + /// The signals that lead to cancellation. + private let cancellationSignals: [UnixSignal] /// The current state of the group. - private var state: State = .initial + private var state: State /// Initializes a new ``ServiceGroup``. /// /// - Parameters: - /// - services: The services to run. - /// - configuration: The group's configuration. - /// - logger: The logger. + /// - configuration: The group's configuration + public init( + configuration: ServiceGroupConfiguration + ) { + precondition( + Set(configuration.gracefulShutdownSignals).isDisjoint(with: configuration.cancellationSignals), + "Overlapping graceful shutdown and cancellation signals" + ) + precondition(configuration.logger.label != deprecatedLoggerLabel, "Please migrate to the new initializers") + self.state = .initial(services: configuration.services) + self.gracefulShutdownSignals = configuration.gracefulShutdownSignals + self.cancellationSignals = configuration.cancellationSignals + self.logger = configuration.logger + self.loggingConfiguration = configuration.logging + } + + @available(*, deprecated) public init( services: [any Service], configuration: ServiceGroupConfiguration, logger: Logger ) { - self.services = services - self.configuration = configuration + precondition(configuration.services.isEmpty, "Please migrate to the new initializers") + self.state = .initial(services: Array(services.map { ServiceGroupConfiguration.ServiceConfiguration(service: $0) })) + self.gracefulShutdownSignals = configuration.gracefulShutdownSignals + self.cancellationSignals = configuration.cancellationSignals self.logger = logger + self.loggingConfiguration = configuration.logging } /// Runs all the services by spinning up a child task per service. @@ -60,8 +78,8 @@ public actor ServiceGroup: Sendable { /// for graceful shutdown. public func run(file: String = #file, line: Int = #line) async throws { switch self.state { - case .initial: - guard !self.services.isEmpty else { + case .initial(var services): + guard !services.isEmpty else { self.state = .finished return } @@ -74,7 +92,10 @@ public actor ServiceGroup: Sendable { var potentialError: Error? do { - try await self._run(gracefulShutdownStream: gracefulShutdownStream) + try await self._run( + services: &services, + gracefulShutdownStream: gracefulShutdownStream + ) } catch { potentialError = error } @@ -126,20 +147,24 @@ public actor ServiceGroup: Sendable { } private enum ChildTaskResult { - case serviceFinished(service: any Service, index: Int) - case serviceThrew(service: any Service, index: Int, error: any Error) + case serviceFinished(service: ServiceGroupConfiguration.ServiceConfiguration, index: Int) + case serviceThrew(service: ServiceGroupConfiguration.ServiceConfiguration, index: Int, error: any Error) case signalCaught(UnixSignal) case signalSequenceFinished case gracefulShutdownCaught case gracefulShutdownFinished } - private func _run(gracefulShutdownStream: AsyncStream) async throws { + private func _run( + services: inout [ServiceGroupConfiguration.ServiceConfiguration], + gracefulShutdownStream: AsyncStream + ) async throws { self.logger.debug( "Starting service lifecycle", metadata: [ - self.configuration.logging.keys.signalsKey: "\(self.configuration.gracefulShutdownSignals)", - self.configuration.logging.keys.servicesKey: "\(self.services)", + self.loggingConfiguration.keys.gracefulShutdownSignalsKey: "\(self.gracefulShutdownSignals)", + self.loggingConfiguration.keys.cancellationSignalsKey: "\(self.cancellationSignals)", + self.loggingConfiguration.keys.servicesKey: "\(services.map { $0.service })", ] ) @@ -147,11 +172,21 @@ public actor ServiceGroup: Sendable { // but the body itself is throwing let result = await withTaskGroup(of: ChildTaskResult.self, returning: Result.self) { group in // First we have to register our signals. - let unixSignals = await UnixSignalsSequence(trapping: self.configuration.gracefulShutdownSignals) + let gracefulShutdownSignals = await UnixSignalsSequence(trapping: self.gracefulShutdownSignals) + let cancellationSignals = await UnixSignalsSequence(trapping: self.cancellationSignals) - // This is the task that listens to signals + // This is the task that listens to graceful shutdown signals group.addTask { - for await signal in unixSignals { + for await signal in gracefulShutdownSignals { + return .signalCaught(signal) + } + + return .signalSequenceFinished + } + + // This is the task that listens to cancellation signals + group.addTask { + for await signal in cancellationSignals { return .signalCaught(signal) } @@ -182,13 +217,13 @@ public actor ServiceGroup: Sendable { // since we want to signal them individually and wait for a single service // to finish before moving to the next one var gracefulShutdownManagers = [GracefulShutdownManager]() - gracefulShutdownManagers.reserveCapacity(self.services.count) + gracefulShutdownManagers.reserveCapacity(services.count) - for (index, service) in self.services.enumerated() { + for (index, serviceConfiguration) in services.enumerated() { self.logger.debug( "Starting service", metadata: [ - self.configuration.logging.keys.serviceKey: "\(service)", + self.loggingConfiguration.keys.serviceKey: "\(serviceConfiguration.service)", ] ) @@ -200,16 +235,20 @@ public actor ServiceGroup: Sendable { group.addTask { return await TaskLocals.$gracefulShutdownManager.withValue(gracefulShutdownManager) { do { - try await service.run() - return .serviceFinished(service: service, index: index) + try await serviceConfiguration.service.run() + return .serviceFinished(service: serviceConfiguration, index: index) } catch { - return .serviceThrew(service: service, index: index, error: error) + return .serviceThrew(service: serviceConfiguration, index: index, error: error) } } } } - precondition(gracefulShutdownManagers.count == self.services.count, "We did not create a graceful shutdown manager per service") + // We are storing the services in an optional array now. When a slot in the array is + // empty it indicates that the service has been shutdown. + var services = services.map { Optional($0) } + + precondition(gracefulShutdownManagers.count == services.count, "We did not create a graceful shutdown manager per service") // We are going to wait for any of the services to finish or // the signal sequence to throw an error. @@ -217,48 +256,140 @@ public actor ServiceGroup: Sendable { let result: ChildTaskResult? = await group.next() switch result { - case .serviceFinished(let service, _): - // If a long running service finishes early we treat this as an unexpected - // early exit and have to cancel the rest of the services. - self.logger.error( - "Service finished unexpectedly. Cancelling all other services now", - metadata: [ - self.configuration.logging.keys.serviceKey: "\(service)", - ] - ) + case .serviceFinished(let service, let index): + if group.isCancelled { + // The group is cancelled and we expect all services to finish + continue + } - group.cancelAll() - return .failure(ServiceGroupError.serviceFinishedUnexpectedly()) + switch service.successTerminationBehavior.behavior { + case .cancelGroup: + self.logger.error( + "Service finished unexpectedly. Cancelling group.", + metadata: [ + self.loggingConfiguration.keys.serviceKey: "\(service.service)", + ] + ) + group.cancelAll() + return .failure(ServiceGroupError.serviceFinishedUnexpectedly()) + + case .gracefullyShutdownGroup: + self.logger.error( + "Service finished unexpectedly. Gracefully shutting down group.", + metadata: [ + self.loggingConfiguration.keys.serviceKey: "\(service.service)", + ] + ) + services[index] = nil + do { + try await self.shutdownGracefully( + services: services, + group: &group, + gracefulShutdownManagers: gracefulShutdownManagers + ) + } catch { + return .failure(error) + } - case .serviceThrew(let service, _, let error): - // One of the servers threw an error. We have to cancel everything else now. - self.logger.error( - "Service threw error. Cancelling all other services now", - metadata: [ - self.configuration.logging.keys.serviceKey: "\(service)", - self.configuration.logging.keys.errorKey: "\(error)", - ] - ) - group.cancelAll() + case .ignore: + self.logger.debug( + "Service finished.", + metadata: [ + self.loggingConfiguration.keys.serviceKey: "\(service.service)", + ] + ) + services[index] = nil + + if services.allSatisfy({ $0 == nil }) { + self.logger.debug( + "All services finished." + ) + group.cancelAll() + return .success(()) + } + } - return .failure(error) + case .serviceThrew(let service, let index, let error): + switch service.failureTerminationBehavior.behavior { + case .cancelGroup: + self.logger.error( + "Service threw error. Cancelling group.", + metadata: [ + self.loggingConfiguration.keys.serviceKey: "\(service.service)", + self.loggingConfiguration.keys.errorKey: "\(error)", + ] + ) + group.cancelAll() + return .failure(error) - case .signalCaught(let unixSignal): - // We got a signal. Let's initiate graceful shutdown. - self.logger.debug( - "Signal caught. Shutting down services", - metadata: [ - self.configuration.logging.keys.signalKey: "\(unixSignal)", - ] - ) + case .gracefullyShutdownGroup: + self.logger.error( + "Service threw error. Shutting down group.", + metadata: [ + self.loggingConfiguration.keys.serviceKey: "\(service.service)", + self.loggingConfiguration.keys.errorKey: "\(error)", + ] + ) + services[index] = nil - do { - try await self.shutdownGracefully( - group: &group, - gracefulShutdownManagers: gracefulShutdownManagers + do { + try await self.shutdownGracefully( + services: services, + group: &group, + gracefulShutdownManagers: gracefulShutdownManagers + ) + } catch { + return .failure(error) + } + + case .ignore: + self.logger.debug( + "Service threw error.", + metadata: [ + self.loggingConfiguration.keys.serviceKey: "\(service.service)", + self.loggingConfiguration.keys.errorKey: "\(error)", + ] ) - } catch { - return .failure(error) + services[index] = nil + + if services.allSatisfy({ $0 == nil }) { + self.logger.debug( + "All services finished." + ) + + group.cancelAll() + return .success(()) + } + } + + case .signalCaught(let unixSignal): + if self.gracefulShutdownSignals.contains(unixSignal) { + // Let's initiate graceful shutdown. + self.logger.debug( + "Signal caught. Shutting down the group.", + metadata: [ + self.loggingConfiguration.keys.signalKey: "\(unixSignal)", + ] + ) + do { + try await self.shutdownGracefully( + services: services, + group: &group, + gracefulShutdownManagers: gracefulShutdownManagers + ) + } catch { + return .failure(error) + } + } else { + // Let's cancel the group. + self.logger.debug( + "Signal caught. Cancelling the group.", + metadata: [ + self.loggingConfiguration.keys.signalKey: "\(unixSignal)", + ] + ) + + group.cancelAll() } case .gracefulShutdownCaught: @@ -267,6 +398,7 @@ public actor ServiceGroup: Sendable { do { try await self.shutdownGracefully( + services: services, group: &group, gracefulShutdownManagers: gracefulShutdownManagers ) @@ -295,6 +427,7 @@ public actor ServiceGroup: Sendable { } private func shutdownGracefully( + services: [ServiceGroupConfiguration.ServiceConfiguration?], group: inout TaskGroup, gracefulShutdownManagers: [GracefulShutdownManager] ) async throws { @@ -306,10 +439,16 @@ public actor ServiceGroup: Sendable { // we are going to signal each child task the graceful shutdown and then wait for // its exit. for (gracefulShutdownIndex, gracefulShutdownManager) in gracefulShutdownManagers.lazy.enumerated().reversed() { + guard let service = services[gracefulShutdownIndex] else { + self.logger.debug( + "Service already finished. Skipping shutdown" + ) + continue + } self.logger.debug( "Triggering graceful shutdown for service", metadata: [ - self.configuration.logging.keys.serviceKey: "\(self.services[gracefulShutdownIndex])", + self.loggingConfiguration.keys.serviceKey: "\(service.service)", ] ) @@ -319,13 +458,18 @@ public actor ServiceGroup: Sendable { switch result { case .serviceFinished(let service, let index): + if group.isCancelled { + // The group is cancelled and we expect all services to finish + continue + } + if index == gracefulShutdownIndex { // The service that we signalled graceful shutdown did exit/ // We can continue to the next one. self.logger.debug( "Service finished", metadata: [ - self.configuration.logging.keys.serviceKey: "\(service)", + self.loggingConfiguration.keys.serviceKey: "\(service.service)", ] ) continue @@ -334,7 +478,7 @@ public actor ServiceGroup: Sendable { self.logger.debug( "Service finished unexpectedly during graceful shutdown. Cancelling all other services now", metadata: [ - self.configuration.logging.keys.serviceKey: "\(service)", + self.loggingConfiguration.keys.serviceKey: "\(service.service)", ] ) @@ -343,18 +487,44 @@ public actor ServiceGroup: Sendable { } case .serviceThrew(let service, _, let error): - self.logger.debug( - "Service threw error during graceful shutdown. Cancelling all other services now", - metadata: [ - self.configuration.logging.keys.serviceKey: "\(service)", - self.configuration.logging.keys.errorKey: "\(error)", - ] - ) - group.cancelAll() + switch service.failureTerminationBehavior.behavior { + case .cancelGroup: + self.logger.error( + "Service threw error during graceful shutdown. Cancelling group.", + metadata: [ + self.loggingConfiguration.keys.serviceKey: "\(service.service)", + self.loggingConfiguration.keys.errorKey: "\(error)", + ] + ) + group.cancelAll() + throw error + + case .gracefullyShutdownGroup, .ignore: + self.logger.debug( + "Service threw error during graceful shutdown.", + metadata: [ + self.loggingConfiguration.keys.serviceKey: "\(service.service)", + self.loggingConfiguration.keys.errorKey: "\(error)", + ] + ) + + continue + } + + case .signalCaught(let signal): + if self.cancellationSignals.contains(signal) { + // We got signalled cancellation after graceful shutdown + self.logger.debug( + "Signal caught. Cancelling the group.", + metadata: [ + self.loggingConfiguration.keys.signalKey: "\(signal)", + ] + ) - throw error + group.cancelAll() + } - case .signalCaught, .signalSequenceFinished, .gracefulShutdownCaught, .gracefulShutdownFinished: + case .signalSequenceFinished, .gracefulShutdownCaught, .gracefulShutdownFinished: // We just have to tolerate this since signals and parent graceful shutdowns downs can race. continue diff --git a/Sources/ServiceLifecycle/ServiceGroupConfiguration.swift b/Sources/ServiceLifecycle/ServiceGroupConfiguration.swift index bc29a1a..08d9d21 100644 --- a/Sources/ServiceLifecycle/ServiceGroupConfiguration.swift +++ b/Sources/ServiceLifecycle/ServiceGroupConfiguration.swift @@ -12,17 +12,22 @@ // //===----------------------------------------------------------------------===// +import Logging import UnixSignals +let deprecatedLoggerLabel = "service-lifecycle-deprecated-method-logger" + /// The configuration for the ``ServiceGroup``. -public struct ServiceGroupConfiguration: Hashable, Sendable { +public struct ServiceGroupConfiguration: Sendable { /// The group's logging configuration. - public struct LoggingConfiguration: Hashable, Sendable { - public struct Keys: Hashable, Sendable { + public struct LoggingConfiguration: Sendable { + public struct Keys: Sendable { /// The logging key used for logging the unix signal. public var signalKey = "signal" - /// The logging key used for logging the unix signals. - public var signalsKey = "signals" + /// The logging key used for logging the graceful shutdown unix signals. + public var gracefulShutdownSignalsKey = "gracefulShutdownSignals" + /// The logging key used for logging the cancellation unix signals. + public var cancellationSignalsKey = "cancellationSignals" /// The logging key used for logging the service. public var serviceKey = "service" /// The logging key used for logging the services. @@ -41,17 +46,139 @@ public struct ServiceGroupConfiguration: Hashable, Sendable { public init() {} } + public struct ServiceConfiguration: Sendable { + public struct TerminationBehavior: Sendable, CustomStringConvertible { + internal enum _TerminationBehavior { + case cancelGroup + case gracefullyShutdownGroup + case ignore + } + + internal let behavior: _TerminationBehavior + + public static let cancelGroup = Self(behavior: .cancelGroup) + public static let gracefullyShutdownGroup = Self(behavior: .gracefullyShutdownGroup) + public static let ignore = Self(behavior: .ignore) + + public var description: String { + switch self.behavior { + case .cancelGroup: + return "cancelGroup" + case .gracefullyShutdownGroup: + return "gracefullyShutdownGroup" + case .ignore: + return "ignore" + } + } + } + + /// The service. + public var service: any Service + /// The behavior when the service returns from its `run()` method. + public var successTerminationBehavior: TerminationBehavior + /// The behavior when the service throws from its `run()` method. + public var failureTerminationBehavior: TerminationBehavior + + /// Initializes a new ``ServiceGroupConfiguration/ServiceConfiguration``. + /// + /// - Parameters: + /// - service: The service. + /// - successTerminationBehavior: The behavior when the service returns from its `run()` method. + /// - failureTerminationBehavior: The behavior when the service throws from its `run()` method. + public init( + service: any Service, + successTerminationBehavior: TerminationBehavior = .cancelGroup, + failureTerminationBehavior: TerminationBehavior = .cancelGroup + ) { + self.service = service + self.successTerminationBehavior = successTerminationBehavior + self.failureTerminationBehavior = failureTerminationBehavior + } + } + + /// The groups's service configurations. + public var services: [ServiceConfiguration] + /// The signals that lead to graceful shutdown. - public var gracefulShutdownSignals: [UnixSignal] + public var gracefulShutdownSignals = [UnixSignal]() + + /// The signals that lead to cancellation. + public var cancellationSignals = [UnixSignal]() + + /// The group's logger. + public var logger: Logger /// The group's logging configuration. - public var logging: LoggingConfiguration + public var logging = LoggingConfiguration() /// Initializes a new ``ServiceGroupConfiguration``. /// - /// - Parameter gracefulShutdownSignals: The signals that lead to graceful shutdown. + /// - Parameters: + /// - services: The groups's service configurations. + /// - logger: The group's logger. + public init( + services: [ServiceConfiguration], + logger: Logger + ) { + self.services = services + self.logger = logger + } + + /// Initializes a new ``ServiceGroupConfiguration``. + /// + /// - Parameters: + /// - services: The groups's service configurations. + /// - gracefulShutdownSignals: The signals that lead to graceful shutdown. + /// - cancellationSignals: The signals that lead to cancellation. + /// - logger: The group's logger. + public init( + services: [ServiceConfiguration], + gracefulShutdownSignals: [UnixSignal] = [], + cancellationSignals: [UnixSignal] = [], + logger: Logger + ) { + self.services = services + self.logger = logger + self.gracefulShutdownSignals = gracefulShutdownSignals + self.cancellationSignals = cancellationSignals + } + + /// Initializes a new ``ServiceGroupConfiguration``. + /// + /// - Parameters: + /// - services: The groups's services. + /// - logger: The group's logger. + public init( + services: [Service], + logger: Logger + ) { + self.services = Array(services.map { ServiceConfiguration(service: $0) }) + self.logger = logger + } + + /// Initializes a new ``ServiceGroupConfiguration``. + /// + /// - Parameters: + /// - services: The groups's services. + /// - gracefulShutdownSignals: The signals that lead to graceful shutdown. + /// - cancellationSignals: The signals that lead to cancellation. + /// - logger: The group's logger. + public init( + services: [Service], + gracefulShutdownSignals: [UnixSignal] = [], + cancellationSignals: [UnixSignal] = [], + logger: Logger + ) { + self.services = Array(services.map { ServiceConfiguration(service: $0) }) + self.logger = logger + self.gracefulShutdownSignals = gracefulShutdownSignals + self.cancellationSignals = cancellationSignals + } + + @available(*, deprecated) public init(gracefulShutdownSignals: [UnixSignal]) { + self.services = [] self.gracefulShutdownSignals = gracefulShutdownSignals - self.logging = .init() + self.logger = Logger(label: deprecatedLoggerLabel) } } diff --git a/Sources/UnixSignals/UnixSignalsSequence.swift b/Sources/UnixSignals/UnixSignalsSequence.swift index 14d505e..029adf8 100644 --- a/Sources/UnixSignals/UnixSignalsSequence.swift +++ b/Sources/UnixSignals/UnixSignalsSequence.swift @@ -14,11 +14,12 @@ #if canImport(Darwin) import Darwin +import Dispatch #elseif canImport(Glibc) +@preconcurrency import Dispatch import Glibc #endif import ConcurrencyHelpers -import Dispatch /// An unterminated `AsyncSequence` of ``UnixSignal``s. /// @@ -31,8 +32,8 @@ public struct UnixSignalsSequence: AsyncSequence, Sendable { public typealias Element = UnixSignal - fileprivate struct Source { - var dispatchSource: any DispatchSourceSignal + fileprivate struct Source: Sendable { + var dispatchSource: DispatchSource var signal: UnixSignal } @@ -80,7 +81,8 @@ extension UnixSignalsSequence { signal(sig.rawValue, SIG_IGN) #endif return .init( - dispatchSource: DispatchSource.makeSignalSource(signal: sig.rawValue, queue: UnixSignalsSequence.queue), + // This force-unwrap is safe since Dispatch always returns a `DispatchSource` + dispatchSource: DispatchSource.makeSignalSource(signal: sig.rawValue, queue: UnixSignalsSequence.queue) as! DispatchSource, signal: sig ) } diff --git a/Tests/ServiceLifecycleTests/ServiceGroupTests.swift b/Tests/ServiceLifecycleTests/ServiceGroupTests.swift index fa4fdbe..855f4e3 100644 --- a/Tests/ServiceLifecycleTests/ServiceGroupTests.swift +++ b/Tests/ServiceLifecycleTests/ServiceGroupTests.swift @@ -17,6 +17,8 @@ import ServiceLifecycle import UnixSignals import XCTest +private struct ExampleError: Error, Hashable {} + private actor MockService: Service, CustomStringConvertible { enum Event { case run @@ -87,7 +89,9 @@ private actor MockService: Service, CustomStringConvertible { final class ServiceGroupTests: XCTestCase { func testRun_whenAlreadyRunning() async throws { let mockService = MockService(description: "Service1") - let serviceGroup = self.makeServiceGroup(services: [mockService], configuration: .init(gracefulShutdownSignals: [])) + let serviceGroup = self.makeServiceGroup( + services: [.init(service: mockService)] + ) try await withThrowingTaskGroup(of: Void.self) { group in group.addTask { @@ -107,7 +111,7 @@ final class ServiceGroupTests: XCTestCase { } func testRun_whenAlreadyFinished() async throws { - let group = self.makeServiceGroup(services: [], configuration: .init(gracefulShutdownSignals: [])) + let group = self.makeServiceGroup() try await group.run() @@ -117,14 +121,16 @@ final class ServiceGroupTests: XCTestCase { } func testRun_whenNoService_andNoSignal() async throws { - let group = self.makeServiceGroup(services: [], configuration: .init(gracefulShutdownSignals: [])) + let group = self.makeServiceGroup() try await group.run() } func testRun_whenNoSignal() async throws { let mockService = MockService(description: "Service1") - let serviceGroup = self.makeServiceGroup(services: [mockService], configuration: .init(gracefulShutdownSignals: [])) + let serviceGroup = self.makeServiceGroup( + services: [.init(service: mockService)] + ) await withThrowingTaskGroup(of: Void.self) { group in group.addTask { @@ -142,9 +148,11 @@ final class ServiceGroupTests: XCTestCase { } func test_whenRun_ShutdownGracefully() async throws { - let configuration = ServiceGroupConfiguration(gracefulShutdownSignals: [.sigalrm]) let mockService = MockService(description: "Service1") - let serviceGroup = self.makeServiceGroup(services: [mockService], configuration: configuration) + let serviceGroup = self.makeServiceGroup( + services: [.init(service: mockService)], + gracefulShutdownSignals: [.sigalrm] + ) await withThrowingTaskGroup(of: Void.self) { group in group.addTask { @@ -163,9 +171,11 @@ final class ServiceGroupTests: XCTestCase { } func testRun_whenServiceExitsEarly() async throws { - let configuration = ServiceGroupConfiguration(gracefulShutdownSignals: [.sigalrm]) let mockService = MockService(description: "Service1") - let serviceGroup = self.makeServiceGroup(services: [mockService], configuration: configuration) + let serviceGroup = self.makeServiceGroup( + services: [.init(service: mockService)], + gracefulShutdownSignals: [.sigalrm] + ) try await withThrowingTaskGroup(of: Void.self) { group in group.addTask { @@ -183,11 +193,154 @@ final class ServiceGroupTests: XCTestCase { } } + func testRun_whenServiceExitsEarly_andIgnore() async throws { + let service1 = MockService(description: "Service1") + let service2 = MockService(description: "Service2") + let serviceGroup = self.makeServiceGroup( + services: [.init(service: service1, successTerminationBehavior: .ignore), .init(service: service2, failureTerminationBehavior: .ignore)], + gracefulShutdownSignals: [.sigalrm] + ) + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator1 = service1.events.makeAsyncIterator() + var eventIterator2 = service2.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator1.next(), .run) + await XCTAsyncAssertEqual(await eventIterator2.next(), .run) + + await service1.resumeRunContinuation(with: .success(())) + + service2.sendPing() + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + + await service2.resumeRunContinuation(with: .failure(ExampleError())) + + try await XCTAsyncAssertNoThrow(await group.next()) + } + } + + func testRun_whenServiceExitsEarly_andShutdownGracefully() async throws { + let service1 = MockService(description: "Service1") + let service2 = MockService(description: "Service2") + let service3 = MockService(description: "Service3") + let serviceGroup = self.makeServiceGroup( + services: [ + .init(service: service1), + .init(service: service2, successTerminationBehavior: .gracefullyShutdownGroup), + .init(service: service3), + ] + ) + + await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator1 = service1.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator1.next(), .run) + + var eventIterator2 = service2.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator2.next(), .run) + + var eventIterator3 = service3.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator3.next(), .run) + + await service2.resumeRunContinuation(with: .success(())) + + // The last service should receive the shutdown signal first + await XCTAsyncAssertEqual(await eventIterator3.next(), .shutdownGracefully) + + // Waiting to see that the remaining two are still running + service1.sendPing() + service3.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator3.next(), .runPing) + + // Let's exit from the last service + await service3.resumeRunContinuation(with: .success(())) + + // Waiting to see that the remaining is still running + service1.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + + // The first service should now receive the signal + await XCTAsyncAssertEqual(await eventIterator1.next(), .shutdownGracefully) + + // Waiting to see that the one remaining are still running + service1.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + + // Let's exit from the first service + await service1.resumeRunContinuation(with: .success(())) + } + } + + func testRun_whenServiceExitsEarly_andShutdownGracefully_andThenCancels() async throws { + let service1 = MockService(description: "Service1") + let service2 = MockService(description: "Service2") + let service3 = MockService(description: "Service3") + let serviceGroup = self.makeServiceGroup( + services: [ + .init(service: service1), + .init(service: service2, successTerminationBehavior: .cancelGroup), + .init(service: service3, successTerminationBehavior: .gracefullyShutdownGroup), + ] + ) + + await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator1 = service1.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator1.next(), .run) + + var eventIterator2 = service2.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator2.next(), .run) + + var eventIterator3 = service3.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator3.next(), .run) + + await service3.resumeRunContinuation(with: .success(())) + + // The second service should receive the shutdown signal first + await XCTAsyncAssertEqual(await eventIterator2.next(), .shutdownGracefully) + + // Waiting to see that all two are still running + service1.sendPing() + service2.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + + // Let's exit from the second service + await service2.resumeRunContinuation(with: .success(())) + + // Waiting to see that the remaining is still running + service1.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + + // The first service should now receive the signal + await XCTAsyncAssertEqual(await eventIterator1.next(), .shutdownGracefully) + + // Waiting to see that the one remaining are still running + service1.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + + // Let's exit from the first service + await service1.resumeRunContinuation(with: .success(())) + } + } + func testRun_whenServiceExitsEarly_andOtherRunningService() async throws { - let configuration = ServiceGroupConfiguration(gracefulShutdownSignals: [.sigalrm]) let shortService = MockService(description: "Service1") let longService = MockService(description: "Service2") - let serviceGroup = self.makeServiceGroup(services: [shortService, longService], configuration: configuration) + let serviceGroup = self.makeServiceGroup( + services: [.init(service: shortService), .init(service: longService)], + gracefulShutdownSignals: [.sigalrm] + ) try await withThrowingTaskGroup(of: Void.self) { group in group.addTask { @@ -214,14 +367,14 @@ final class ServiceGroupTests: XCTestCase { } func testRun_whenServiceThrows() async throws { - let configuration = ServiceGroupConfiguration(gracefulShutdownSignals: [.sigalrm]) let service1 = MockService(description: "Service1") let service2 = MockService(description: "Service2") - let serviceGroup = self.makeServiceGroup(services: [service1, service2], configuration: configuration) + let serviceGroup = self.makeServiceGroup( + services: [.init(service: service1), .init(service: service2)], + gracefulShutdownSignals: [.sigalrm] + ) try await withThrowingTaskGroup(of: Void.self) { group in - struct ExampleError: Error, Hashable {} - group.addTask { try await serviceGroup.run() } @@ -247,12 +400,183 @@ final class ServiceGroupTests: XCTestCase { } } + func testRun_whenServiceThrows_andIgnore() async throws { + let mockService = MockService(description: "Service1") + let serviceGroup = self.makeServiceGroup( + services: [.init(service: mockService, failureTerminationBehavior: .ignore)], + gracefulShutdownSignals: [.sigalrm] + ) + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator = mockService.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator.next(), .run) + + await mockService.resumeRunContinuation(with: .failure(ExampleError())) + + try await XCTAsyncAssertNoThrow(await group.next()) + } + } + + func testRun_whenServiceThrows_andShutdownGracefully() async throws { + let service1 = MockService(description: "Service1") + let service2 = MockService(description: "Service2") + let service3 = MockService(description: "Service3") + let serviceGroup = self.makeServiceGroup( + services: [ + .init(service: service1), + .init(service: service2, failureTerminationBehavior: .gracefullyShutdownGroup), + .init(service: service3), + ] + ) + + await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator1 = service1.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator1.next(), .run) + + var eventIterator2 = service2.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator2.next(), .run) + + var eventIterator3 = service3.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator3.next(), .run) + + await service2.resumeRunContinuation(with: .failure(ExampleError())) + + // The last service should receive the shutdown signal first + await XCTAsyncAssertEqual(await eventIterator3.next(), .shutdownGracefully) + + // Waiting to see that all two are still running + service1.sendPing() + service3.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator3.next(), .runPing) + + // Let's exit from the last service + await service3.resumeRunContinuation(with: .success(())) + + // Waiting to see that the remaining is still running + service1.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + + // The first service should now receive the signal + await XCTAsyncAssertEqual(await eventIterator1.next(), .shutdownGracefully) + + // Waiting to see that the one remaining are still running + service1.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + + // Let's exit from the first service + await service1.resumeRunContinuation(with: .success(())) + } + } + + func testCancellationSignal() async throws { + let service1 = MockService(description: "Service1") + let service2 = MockService(description: "Service2") + let service3 = MockService(description: "Service3") + let serviceGroup = self.makeServiceGroup( + services: [.init(service: service1), .init(service: service2), .init(service: service3)], + cancellationSignals: [.sigalrm] + ) + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator1 = service1.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator1.next(), .run) + + var eventIterator2 = service2.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator2.next(), .run) + + var eventIterator3 = service3.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator3.next(), .run) + + let pid = getpid() + kill(pid, UnixSignal.sigalrm.rawValue) + + await XCTAsyncAssertEqual(await eventIterator1.next(), .runCancelled) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runCancelled) + await XCTAsyncAssertEqual(await eventIterator3.next(), .runCancelled) + + // Let's exit from all services + await service1.resumeRunContinuation(with: .success(())) + await service2.resumeRunContinuation(with: .success(())) + await service3.resumeRunContinuation(with: .success(())) + + await XCTAsyncAssertNoThrow(try await group.next()) + } + } + + func testCancellationSignal_afterGracefulShutdownSignal() async throws { + let service1 = MockService(description: "Service1") + let service2 = MockService(description: "Service2") + let service3 = MockService(description: "Service3") + let serviceGroup = self.makeServiceGroup( + services: [.init(service: service1), .init(service: service2), .init(service: service3)], + gracefulShutdownSignals: [.sighup], + cancellationSignals: [.sigalrm] + ) + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator1 = service1.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator1.next(), .run) + + var eventIterator2 = service2.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator2.next(), .run) + + var eventIterator3 = service3.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator3.next(), .run) + + let pid = getpid() + kill(pid, UnixSignal.sighup.rawValue) + + await XCTAsyncAssertEqual(await eventIterator3.next(), .shutdownGracefully) + + // Waiting to see that all services are still running. + service1.sendPing() + service2.sendPing() + service3.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator3.next(), .runPing) + + // Now we signal cancellation + kill(pid, UnixSignal.sigalrm.rawValue) + + await XCTAsyncAssertEqual(await eventIterator1.next(), .runCancelled) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runCancelled) + await XCTAsyncAssertEqual(await eventIterator3.next(), .runCancelled) + + // Let's exit from all services + await service1.resumeRunContinuation(with: .success(())) + await service2.resumeRunContinuation(with: .success(())) + await service3.resumeRunContinuation(with: .success(())) + + await XCTAsyncAssertNoThrow(try await group.next()) + } + } + func testGracefulShutdownOrdering() async throws { - let configuration = ServiceGroupConfiguration(gracefulShutdownSignals: [.sigalrm]) let service1 = MockService(description: "Service1") let service2 = MockService(description: "Service2") let service3 = MockService(description: "Service3") - let serviceGroup = self.makeServiceGroup(services: [service1, service2, service3], configuration: configuration) + let serviceGroup = self.makeServiceGroup( + services: [.init(service: service1), .init(service: service2), .init(service: service3)], + gracefulShutdownSignals: [.sigalrm] + ) await withThrowingTaskGroup(of: Void.self) { group in group.addTask { @@ -310,11 +634,13 @@ final class ServiceGroupTests: XCTestCase { } func testGracefulShutdownOrdering_whenServiceThrows() async throws { - let configuration = ServiceGroupConfiguration(gracefulShutdownSignals: [.sigalrm]) let service1 = MockService(description: "Service1") let service2 = MockService(description: "Service2") let service3 = MockService(description: "Service3") - let serviceGroup = self.makeServiceGroup(services: [service1, service2, service3], configuration: configuration) + let serviceGroup = self.makeServiceGroup( + services: [.init(service: service1), .init(service: service2), .init(service: service3)], + gracefulShutdownSignals: [.sigalrm] + ) await withThrowingTaskGroup(of: Void.self) { group in group.addTask { @@ -368,11 +694,13 @@ final class ServiceGroupTests: XCTestCase { } func testGracefulShutdownOrdering_whenServiceExits() async throws { - let configuration = ServiceGroupConfiguration(gracefulShutdownSignals: [.sigalrm]) let service1 = MockService(description: "Service1") let service2 = MockService(description: "Service2") let service3 = MockService(description: "Service3") - let serviceGroup = self.makeServiceGroup(services: [service1, service2, service3], configuration: configuration) + let serviceGroup = self.makeServiceGroup( + services: [.init(service: service1), .init(service: service2), .init(service: service3)], + gracefulShutdownSignals: [.sigalrm] + ) await withThrowingTaskGroup(of: Void.self) { group in group.addTask { @@ -425,6 +753,73 @@ final class ServiceGroupTests: XCTestCase { } } + func testGracefulShutdownOrdering_whenServiceExits_andIgnoringThrows() async throws { + let service1 = MockService(description: "Service1") + let service2 = MockService(description: "Service2") + let service3 = MockService(description: "Service3") + let serviceGroup = self.makeServiceGroup( + services: [ + .init(service: service1, failureTerminationBehavior: .gracefullyShutdownGroup), + .init(service: service2, failureTerminationBehavior: .ignore), + .init(service: service3, successTerminationBehavior: .ignore), + ], + gracefulShutdownSignals: [.sigalrm] + ) + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator1 = service1.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator1.next(), .run) + + var eventIterator2 = service2.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator2.next(), .run) + + var eventIterator3 = service3.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator3.next(), .run) + + let pid = getpid() + kill(pid, UnixSignal.sigalrm.rawValue) + + // The last service should receive the shutdown signal first + await XCTAsyncAssertEqual(await eventIterator3.next(), .shutdownGracefully) + + // Waiting to see that all three are still running + service1.sendPing() + service2.sendPing() + service3.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator3.next(), .runPing) + + // Let's exit from the last service + await service3.resumeRunContinuation(with: .success(())) + + // The middle service should now receive the signal + await XCTAsyncAssertEqual(await eventIterator2.next(), .shutdownGracefully) + + // Waiting to see that the two remaining are still running + service1.sendPing() + service2.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + + // Let's throw from the second service + await service2.resumeRunContinuation(with: .failure(ExampleError())) + + // The first service should still be running + service1.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + + // Let's throw from the first service + await service1.resumeRunContinuation(with: .failure(ExampleError())) + + await XCTAsyncAssertNoThrow(try await group.next()) + } + } + func testNestedServiceLifecycle() async throws { struct NestedGroupService: Service { let group: ServiceGroup @@ -438,16 +833,17 @@ final class ServiceGroupTests: XCTestCase { } } - let configuration = ServiceGroupConfiguration(gracefulShutdownSignals: [.sigalrm]) let service1 = MockService(description: "Service1") let service2 = MockService(description: "Service2") let nestedGroupService = NestedGroupService( group: self.makeServiceGroup( - services: [service2], - configuration: .init(gracefulShutdownSignals: []) + services: [.init(service: service2)] ) ) - let serviceGroup = self.makeServiceGroup(services: [service1, nestedGroupService], configuration: configuration) + let serviceGroup = self.makeServiceGroup( + services: [.init(service: service1), .init(service: nestedGroupService)], + gracefulShutdownSignals: [.sigalrm] + ) await withThrowingTaskGroup(of: Void.self) { group in group.addTask { @@ -488,11 +884,12 @@ final class ServiceGroupTests: XCTestCase { } func testTriggerGracefulShutdown() async throws { - let configuration = ServiceGroupConfiguration(gracefulShutdownSignals: []) let service1 = MockService(description: "Service1") let service2 = MockService(description: "Service2") let service3 = MockService(description: "Service3") - let serviceGroup = self.makeServiceGroup(services: [service1, service2, service3], configuration: configuration) + let serviceGroup = self.makeServiceGroup( + services: [.init(service: service1), .init(service: service2), .init(service: service3)] + ) await withThrowingTaskGroup(of: Void.self) { group in group.addTask { @@ -547,16 +944,20 @@ final class ServiceGroupTests: XCTestCase { // MARK: - Helpers private func makeServiceGroup( - services: [any Service], - configuration: ServiceGroupConfiguration + services: [ServiceGroupConfiguration.ServiceConfiguration] = [], + gracefulShutdownSignals: [UnixSignal] = .init(), + cancellationSignals: [UnixSignal] = .init() ) -> ServiceGroup { var logger = Logger(label: "Tests") logger.logLevel = .debug return .init( - services: services, - configuration: configuration, - logger: logger + configuration: .init( + services: services, + gracefulShutdownSignals: gracefulShutdownSignals, + cancellationSignals: cancellationSignals, + logger: logger + ) ) } } diff --git a/Tests/ServiceLifecycleTests/XCTest+Async.swift b/Tests/ServiceLifecycleTests/XCTest+Async.swift index 5526b01..0f31954 100644 --- a/Tests/ServiceLifecycleTests/XCTest+Async.swift +++ b/Tests/ServiceLifecycleTests/XCTest+Async.swift @@ -41,7 +41,7 @@ func XCTAsyncAssertThrowsError( } } -func XCTAssertNoThrow( +func XCTAsyncAssertNoThrow( _ expression: @autoclosure () async throws -> some Any, _ message: @autoclosure () -> String = "", file: StaticString = #filePath,