Skip to content

Commit

Permalink
Expose graceful_stop for controlling graceful shutdown. (#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix authored Mar 26, 2024
1 parent 1dc1482 commit 088d3e6
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 12 deletions.
72 changes: 72 additions & 0 deletions examples/grace/server.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#!/usr/bin/env ruby
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2024, by Samuel Williams.

require '../../lib/async/container'
require 'async/io/host_endpoint'

Console.logger.debug!

module SignalWrapper
def self.trap(signal, &block)
signal = signal

original = Signal.trap(signal) do
::Signal.trap(signal, original)
block.call
end
end
end

class Controller < Async::Container::Controller
def initialize(...)
super

@endpoint = Async::IO::Endpoint.tcp("localhost", 8080)
@bound_endpoint = nil
end

def start
Console.debug(self) {"Binding to #{@endpoint}"}
@bound_endpoint = Sync{@endpoint.bound}

super
end

def setup(container)
container.run count: 2, restart: true do |instance|
SignalWrapper.trap(:INT) do
Console.debug(self) {"Closing bound instance..."}
@bound_endpoint.close
end

Sync do |task|
Console.info(self) {"Starting bound instance..."}

instance.ready!

@bound_endpoint.accept do |peer|
while true
peer.write("#{Time.now.to_s.rjust(32)}: Hello World\n")
sleep 1
end
end
end
end
end

def stop(graceful = true)
super

if @bound_endpoint
@bound_endpoint.close
@bound_endpoint = nil
end
end
end

controller = Controller.new

controller.run
19 changes: 13 additions & 6 deletions lib/async/container/controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class Controller

# Initialize the controller.
# @parameter notify [Notify::Client] A client used for process readiness notifications.
def initialize(notify: Notify.open!, container_class: Container)
def initialize(notify: Notify.open!, container_class: Container, graceful_stop: true)
@container = nil
@container_class = container_class

Expand All @@ -35,6 +35,8 @@ def initialize(notify: Notify.open!, container_class: Container)
trap(SIGHUP) do
self.restart
end

@graceful_stop = graceful_stop
end

# The state of the controller.
Expand Down Expand Up @@ -96,7 +98,7 @@ def start

# Stop the container if it's running.
# @parameter graceful [Boolean] Whether to give the children instances time to shut down or to kill them immediately.
def stop(graceful = true)
def stop(graceful = @graceful_stop)
@container&.stop(graceful)
@container = nil
end
Expand Down Expand Up @@ -130,7 +132,7 @@ def restart
if container.failed?
@notify&.error!("Container failed to start!")

container.stop
container.stop(false)

raise SetupError, container
end
Expand All @@ -142,7 +144,7 @@ def restart

if old_container
Console.logger.debug(self, "Stopping old container...")
old_container&.stop
old_container&.stop(@graceful_stop)
end

@notify&.ready!
Expand All @@ -165,7 +167,9 @@ def reload

# Wait for all child processes to enter the ready state.
Console.logger.debug(self, "Waiting for startup...")

@container.wait_until_ready

Console.logger.debug(self, "Finished startup.")

if @container.failed?
Expand All @@ -182,14 +186,17 @@ def run
# I thought this was the default... but it doesn't always raise an exception unless you do this explicitly.
# We use `Thread.current.raise(...)` so that exceptions are filtered through `Thread.handle_interrupt` correctly.
interrupt_action = Signal.trap(:INT) do
# $stderr.puts "Received INT signal, terminating...", caller
::Thread.current.raise(Interrupt)
end

terminate_action = Signal.trap(:TERM) do
# $stderr.puts "Received TERM signal, terminating...", caller
::Thread.current.raise(Terminate)
end

hangup_action = Signal.trap(:HUP) do
# $stderr.puts "Received HUP signal, restarting...", caller
::Thread.current.raise(Hangup)
end

Expand All @@ -211,11 +218,11 @@ def run
end
end
rescue Interrupt
self.stop(true)
self.stop
rescue Terminate
self.stop(false)
ensure
self.stop(true)
self.stop(false)

# Restore the interrupt handler:
Signal.trap(:INT, interrupt_action)
Expand Down
5 changes: 5 additions & 0 deletions lib/async/container/group.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ def initialize
@queue = nil
end

def inspect
"#<#{self.class} running=#{@running.size}>"
end

# @attribute [Hash(IO, Fiber)] the running tasks, indexed by IO.
attr :running

Expand Down Expand Up @@ -133,6 +137,7 @@ def wait_for(channel)
protected

def wait_for_children(duration = nil)
Console.debug(self, "Waiting for children...", duration: duration)
if !@running.empty?
# Maybe consider using a proper event loop here:
readable, _, _ = ::IO.select(@running.keys, nil, nil, duration)
Expand Down
2 changes: 1 addition & 1 deletion lib/async/container/notify/console.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ module Notify
# Implements a general process readiness protocol with output to the local console.
class Console < Client
# Open a notification client attached to the current console.
def self.open!(logger = ::Console.logger)
def self.open!(logger = ::Console)
self.new(logger)
end

Expand Down
4 changes: 2 additions & 2 deletions test/async/container/.bad.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@

require_relative '../../../lib/async/container/controller'

$stdout.sync = true

class Bad < Async::Container::Controller
def setup(container)
container.run(name: "bad", count: 1, restart: true) do |instance|
# Deliberately missing call to `instance.ready!`:
# instance.ready!

$stdout.puts "Ready..."
$stdout.flush

sleep
ensure
$stdout.puts "Exiting..."
$stdout.flush
end
end
end
Expand Down
6 changes: 3 additions & 3 deletions test/async/container/.dots.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@

require_relative '../../../lib/async/container/controller'

# Console.logger.debug!
$stdout.sync = true

class Dots < Async::Container::Controller
def setup(container)
container.run(name: "dots", count: 1, restart: true) do |instance|
instance.ready!

sleep 1
# This is to avoid race conditions in the controller in test conditions.
sleep 0.1

$stdout.write "."
$stdout.flush

sleep
rescue Async::Container::Interrupt
Expand Down
45 changes: 45 additions & 0 deletions test/async/container/.graceful.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#!/usr/bin/env ruby
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2020-2022, by Samuel Williams.

require_relative '../../../lib/async/container/controller'

$stdout.sync = true

class Graceful < Async::Container::Controller
def setup(container)
container.run(name: "graceful", count: 1, restart: true) do |instance|
instance.ready!

# This is to avoid race conditions in the controller in test conditions.
sleep 0.1

clock = Async::Clock.start

original_action = Signal.trap(:INT) do
# We ignore the int, but in practical applications you would want start a graceful shutdown.
$stdout.puts "Graceful shutdown...", clock.total

Signal.trap(:INT, original_action)
end

$stdout.puts "Ready...", clock.total

sleep
ensure
$stdout.puts "Exiting...", clock.total
end
end
end

controller = Graceful.new(graceful_stop: 1)

begin
controller.run
rescue Async::Container::Terminate
$stdout.puts "Terminated..."
rescue Interrupt
$stdout.puts "Interrupted..."
end
39 changes: 39 additions & 0 deletions test/async/container/controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,45 @@ def controller.setup(container)
end
end

with 'graceful controller' do
let(:controller_path) {File.expand_path(".graceful.rb", __dir__)}

let(:pipe) {IO.pipe}
let(:input) {pipe.first}
let(:output) {pipe.last}

let(:pid) {@pid}

def before
@pid = Process.spawn("bundle", "exec", controller_path, out: output)
output.close

super
end

def after
Process.kill(:TERM, @pid)
Process.wait(@pid)

super
end

it "has graceful shutdown" do
expect(input.gets).to be == "Ready...\n"
start_time = input.gets.to_f

Process.kill(:INT, @pid)

expect(input.gets).to be == "Graceful shutdown...\n"
graceful_shutdown_time = input.gets.to_f

expect(input.gets).to be == "Exiting...\n"
exit_time = input.gets.to_f

expect(exit_time - graceful_shutdown_time).to be >= 1.0
end
end

with 'bad controller' do
let(:controller_path) {File.expand_path(".bad.rb", __dir__)}

Expand Down

0 comments on commit 088d3e6

Please sign in to comment.