From 088d3e6d08321614794cf45e920a49f182583562 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Wed, 27 Mar 2024 12:50:36 +1300 Subject: [PATCH] Expose `graceful_stop` for controlling graceful shutdown. (#31) --- examples/grace/server.rb | 72 +++++++++++++++++++++++++++ lib/async/container/controller.rb | 19 ++++--- lib/async/container/group.rb | 5 ++ lib/async/container/notify/console.rb | 2 +- test/async/container/.bad.rb | 4 +- test/async/container/.dots.rb | 6 +-- test/async/container/.graceful.rb | 45 +++++++++++++++++ test/async/container/controller.rb | 39 +++++++++++++++ 8 files changed, 180 insertions(+), 12 deletions(-) create mode 100755 examples/grace/server.rb create mode 100755 test/async/container/.graceful.rb diff --git a/examples/grace/server.rb b/examples/grace/server.rb new file mode 100755 index 0000000..2efd5d0 --- /dev/null +++ b/examples/grace/server.rb @@ -0,0 +1,72 @@ +#!/usr/bin/env ruby +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2024, by Samuel Williams. + +require '../../lib/async/container' +require 'async/io/host_endpoint' + +Console.logger.debug! + +module SignalWrapper + def self.trap(signal, &block) + signal = signal + + original = Signal.trap(signal) do + ::Signal.trap(signal, original) + block.call + end + end +end + +class Controller < Async::Container::Controller + def initialize(...) + super + + @endpoint = Async::IO::Endpoint.tcp("localhost", 8080) + @bound_endpoint = nil + end + + def start + Console.debug(self) {"Binding to #{@endpoint}"} + @bound_endpoint = Sync{@endpoint.bound} + + super + end + + def setup(container) + container.run count: 2, restart: true do |instance| + SignalWrapper.trap(:INT) do + Console.debug(self) {"Closing bound instance..."} + @bound_endpoint.close + end + + Sync do |task| + Console.info(self) {"Starting bound instance..."} + + instance.ready! + + @bound_endpoint.accept do |peer| + while true + peer.write("#{Time.now.to_s.rjust(32)}: Hello World\n") + sleep 1 + end + end + end + end + end + + def stop(graceful = true) + super + + if @bound_endpoint + @bound_endpoint.close + @bound_endpoint = nil + end + end +end + +controller = Controller.new + +controller.run diff --git a/lib/async/container/controller.rb b/lib/async/container/controller.rb index e68ff20..294450b 100644 --- a/lib/async/container/controller.rb +++ b/lib/async/container/controller.rb @@ -22,7 +22,7 @@ class Controller # Initialize the controller. # @parameter notify [Notify::Client] A client used for process readiness notifications. - def initialize(notify: Notify.open!, container_class: Container) + def initialize(notify: Notify.open!, container_class: Container, graceful_stop: true) @container = nil @container_class = container_class @@ -35,6 +35,8 @@ def initialize(notify: Notify.open!, container_class: Container) trap(SIGHUP) do self.restart end + + @graceful_stop = graceful_stop end # The state of the controller. @@ -96,7 +98,7 @@ def start # Stop the container if it's running. # @parameter graceful [Boolean] Whether to give the children instances time to shut down or to kill them immediately. - def stop(graceful = true) + def stop(graceful = @graceful_stop) @container&.stop(graceful) @container = nil end @@ -130,7 +132,7 @@ def restart if container.failed? @notify&.error!("Container failed to start!") - container.stop + container.stop(false) raise SetupError, container end @@ -142,7 +144,7 @@ def restart if old_container Console.logger.debug(self, "Stopping old container...") - old_container&.stop + old_container&.stop(@graceful_stop) end @notify&.ready! @@ -165,7 +167,9 @@ def reload # Wait for all child processes to enter the ready state. Console.logger.debug(self, "Waiting for startup...") + @container.wait_until_ready + Console.logger.debug(self, "Finished startup.") if @container.failed? @@ -182,14 +186,17 @@ def run # I thought this was the default... but it doesn't always raise an exception unless you do this explicitly. # We use `Thread.current.raise(...)` so that exceptions are filtered through `Thread.handle_interrupt` correctly. interrupt_action = Signal.trap(:INT) do + # $stderr.puts "Received INT signal, terminating...", caller ::Thread.current.raise(Interrupt) end terminate_action = Signal.trap(:TERM) do + # $stderr.puts "Received TERM signal, terminating...", caller ::Thread.current.raise(Terminate) end hangup_action = Signal.trap(:HUP) do + # $stderr.puts "Received HUP signal, restarting...", caller ::Thread.current.raise(Hangup) end @@ -211,11 +218,11 @@ def run end end rescue Interrupt - self.stop(true) + self.stop rescue Terminate self.stop(false) ensure - self.stop(true) + self.stop(false) # Restore the interrupt handler: Signal.trap(:INT, interrupt_action) diff --git a/lib/async/container/group.rb b/lib/async/container/group.rb index 2f56fb7..f3a0989 100644 --- a/lib/async/container/group.rb +++ b/lib/async/container/group.rb @@ -20,6 +20,10 @@ def initialize @queue = nil end + def inspect + "#<#{self.class} running=#{@running.size}>" + end + # @attribute [Hash(IO, Fiber)] the running tasks, indexed by IO. attr :running @@ -133,6 +137,7 @@ def wait_for(channel) protected def wait_for_children(duration = nil) + Console.debug(self, "Waiting for children...", duration: duration) if !@running.empty? # Maybe consider using a proper event loop here: readable, _, _ = ::IO.select(@running.keys, nil, nil, duration) diff --git a/lib/async/container/notify/console.rb b/lib/async/container/notify/console.rb index ea94f98..0c8b0ef 100644 --- a/lib/async/container/notify/console.rb +++ b/lib/async/container/notify/console.rb @@ -13,7 +13,7 @@ module Notify # Implements a general process readiness protocol with output to the local console. class Console < Client # Open a notification client attached to the current console. - def self.open!(logger = ::Console.logger) + def self.open!(logger = ::Console) self.new(logger) end diff --git a/test/async/container/.bad.rb b/test/async/container/.bad.rb index 8c3b4c2..d655b16 100755 --- a/test/async/container/.bad.rb +++ b/test/async/container/.bad.rb @@ -6,6 +6,8 @@ require_relative '../../../lib/async/container/controller' +$stdout.sync = true + class Bad < Async::Container::Controller def setup(container) container.run(name: "bad", count: 1, restart: true) do |instance| @@ -13,12 +15,10 @@ def setup(container) # instance.ready! $stdout.puts "Ready..." - $stdout.flush sleep ensure $stdout.puts "Exiting..." - $stdout.flush end end end diff --git a/test/async/container/.dots.rb b/test/async/container/.dots.rb index dca311b..8050b0f 100755 --- a/test/async/container/.dots.rb +++ b/test/async/container/.dots.rb @@ -6,17 +6,17 @@ require_relative '../../../lib/async/container/controller' -# Console.logger.debug! +$stdout.sync = true class Dots < Async::Container::Controller def setup(container) container.run(name: "dots", count: 1, restart: true) do |instance| instance.ready! - sleep 1 + # This is to avoid race conditions in the controller in test conditions. + sleep 0.1 $stdout.write "." - $stdout.flush sleep rescue Async::Container::Interrupt diff --git a/test/async/container/.graceful.rb b/test/async/container/.graceful.rb new file mode 100755 index 0000000..f5f1e9d --- /dev/null +++ b/test/async/container/.graceful.rb @@ -0,0 +1,45 @@ +#!/usr/bin/env ruby +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2020-2022, by Samuel Williams. + +require_relative '../../../lib/async/container/controller' + +$stdout.sync = true + +class Graceful < Async::Container::Controller + def setup(container) + container.run(name: "graceful", count: 1, restart: true) do |instance| + instance.ready! + + # This is to avoid race conditions in the controller in test conditions. + sleep 0.1 + + clock = Async::Clock.start + + original_action = Signal.trap(:INT) do + # We ignore the int, but in practical applications you would want start a graceful shutdown. + $stdout.puts "Graceful shutdown...", clock.total + + Signal.trap(:INT, original_action) + end + + $stdout.puts "Ready...", clock.total + + sleep + ensure + $stdout.puts "Exiting...", clock.total + end + end +end + +controller = Graceful.new(graceful_stop: 1) + +begin + controller.run +rescue Async::Container::Terminate + $stdout.puts "Terminated..." +rescue Interrupt + $stdout.puts "Interrupted..." +end diff --git a/test/async/container/controller.rb b/test/async/container/controller.rb index 5941fd9..274ed93 100644 --- a/test/async/container/controller.rb +++ b/test/async/container/controller.rb @@ -88,6 +88,45 @@ def controller.setup(container) end end + with 'graceful controller' do + let(:controller_path) {File.expand_path(".graceful.rb", __dir__)} + + let(:pipe) {IO.pipe} + let(:input) {pipe.first} + let(:output) {pipe.last} + + let(:pid) {@pid} + + def before + @pid = Process.spawn("bundle", "exec", controller_path, out: output) + output.close + + super + end + + def after + Process.kill(:TERM, @pid) + Process.wait(@pid) + + super + end + + it "has graceful shutdown" do + expect(input.gets).to be == "Ready...\n" + start_time = input.gets.to_f + + Process.kill(:INT, @pid) + + expect(input.gets).to be == "Graceful shutdown...\n" + graceful_shutdown_time = input.gets.to_f + + expect(input.gets).to be == "Exiting...\n" + exit_time = input.gets.to_f + + expect(exit_time - graceful_shutdown_time).to be >= 1.0 + end + end + with 'bad controller' do let(:controller_path) {File.expand_path(".bad.rb", __dir__)}