Skip to content

Commit

Permalink
lib.ptree: manage worker restarts per worker configued via worker_opts
Browse files Browse the repository at this point in the history
  • Loading branch information
eugeneia committed May 23, 2022
1 parent a757a8e commit 0996502
Showing 1 changed file with 45 additions and 25 deletions.
70 changes: 45 additions & 25 deletions src/lib/ptree/ptree.lua
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ local manager_config_spec = {
log_level = {default=default_log_level},
rpc_trace_file = {},
cpuset = {default=cpuset.global_cpuset()},
Hz = {default=100},
Hz = {default=100}
}

local worker_opt_spec = {
restart_intensity = {default=0}, -- How many restarts are permitted...
restart_period = {default=0} -- ...within period seconds.
}
Expand Down Expand Up @@ -92,18 +95,13 @@ function new_manager (conf)
ret.period = 1/conf.Hz
ret.worker_default_scheduling = conf.worker_default_scheduling
ret.workers = {}
ret.workers_aux = {}
ret.worker_app_graphs = {}
ret.state_change_listeners = {}
-- name->{aggregated=counter, active=pid->counter, archived=uint64[1]}
ret.counters = {}
-- name->{aggregated=gauge, active=pid->gauge}
ret.gauges = {}
ret.restart = {
period = conf.restart_period,
intensity = conf.restart_intensity,
count = 0,
previous = false
}

if conf.rpc_trace_file then
ret:info("Logging RPCs to %s", conf.rpc_trace_file)
Expand Down Expand Up @@ -168,7 +166,8 @@ function Manager:set_initial_configuration (configuration)
self.current_in_place_dependencies = {}

-- Start the workers and configure them.
local worker_app_graphs = self.setup_fn(configuration)
local worker_app_graphs, worker_opts = self.setup_fn(configuration)
self.workers_aux = self:compute_workers_aux(worker_app_graphs, worker_opts)

-- Calculate the dependences
self.current_in_place_dependencies =
Expand Down Expand Up @@ -298,32 +297,52 @@ function Manager:remove_stale_workers()
end
end

function Manager:can_restart()
function Manager:compute_workers_aux (worker_app_graphs, worker_opts)
worker_opts = worker_opts or {}
local workers_aux = {}
for id in pairs(worker_app_graphs) do
local worker_opt = lib.parse(worker_opts[id] or {}, worker_opt_spec)
local worker_aux = {
restart = {
period = worker_opt.restart_period,
intensity = worker_opt.restart_intensity,
count = 0,
previous = false
}
}
workers_aux[id] = worker_aux
end
return workers_aux
end

function Manager:can_restart_worker(id)
local restart = self.workers_aux[id].restart
local now = engine.now()
local expired = 0
if self.restart.previous then
local elapsed = now - self.restart.previous
expired = (elapsed / self.restart.period) * self.restart.intensity
if restart.previous then
local elapsed = now - restart.previous
expired = (elapsed / restart.period) * restart.intensity
end
self.restart.count = math.max(0, self.restart.count - expired) + 1
self.restart.previous = now
self:info('Restart intensity is at: %.1f/%.1f',
self.restart.count, self.restart.intensity)
return self.restart.count <= self.restart.intensity
restart.count = math.max(0, restart.count - expired) + 1
restart.previous = now
self:info('Restart intensity for worker %s is at: %.1f/%.1f',
id, restart.count, restart.intensity)
return restart.count <= restart.intensity
end

function Manager:restart_crashed_workers()
for id, proc in pairs(worker.status()) do
local worker = self.workers[id]
if worker and not worker.shutting_down then
if not proc.alive then
self:warn('Worker %s crashed!', id)
self:warn('Worker %s (pid %d) crashed with status %d!',
id, proc.pid, proc.status)
self:state_change_event('worker_stopped', id)
if self.workers[id].scheduling.cpu then
self.cpuset:release(self.workers[id].scheduling.cpu)
end
self.workers[id] = nil
if self:can_restart() then
if self:can_restart_worker(id) then
self:info('Restarting worker %s.', id)
self:start_worker_for_graph(id, self.worker_app_graphs[id])
else
Expand Down Expand Up @@ -624,21 +643,22 @@ function Manager:update_configuration (update_fn, verb, path, ...)
self.schema_name, self.current_configuration, verb, path,
self.current_in_place_dependencies, ...)
local new_config = update_fn(self.current_configuration, ...)
local new_graphs = self.setup_fn(new_config, ...)
local new_graphs, new_opts = self.setup_fn(new_config, ...)
self.workers_aux = self:compute_workers_aux(new_graphs, new_opts)
for id, graph in pairs(new_graphs) do
if self.workers[id] == nil then
self:start_worker_for_graph(id, graph)
self:start_worker_for_graph(id, graph)
end
end

for id, worker in pairs(self.workers) do
if new_graphs[id] == nil then
self:stop_worker(id)
else
local actions = self.support.compute_config_actions(
worker.graph, new_graphs[id], to_restart, verb, path, ...)
self:enqueue_config_actions_for_worker(id, actions)
worker.graph = new_graphs[id]
local actions = self.support.compute_config_actions(
worker.graph, new_graphs[id], to_restart, verb, path, ...)
self:enqueue_config_actions_for_worker(id, actions)
worker.graph = new_graphs[id]
end
end
self.current_configuration = new_config
Expand Down

0 comments on commit 0996502

Please sign in to comment.