Skip to content

Commit

Permalink
Rename lib.ptree.follower to lib.ptree.worker
Browse files Browse the repository at this point in the history
  • Loading branch information
wingo committed Dec 11, 2017
1 parent 3704987 commit 3619ac2
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 94 deletions.
4 changes: 2 additions & 2 deletions src/lib/ptree/alarm_codec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,12 @@ local alarms_channel

function get_channel()
if alarms_channel then return alarms_channel end
local name = '/'..S.getpid()..'/alarms-follower-channel'
local name = '/'..S.getpid()..'/alarms-worker-channel'
local success, value = pcall(channel.open, name)
if success then
alarms_channel = value
else
alarms_channel = channel.create('alarms-follower-channel', 1e6)
alarms_channel = channel.create('alarms-worker-channel', 1e6)
end
return alarms_channel
end
Expand Down
12 changes: 6 additions & 6 deletions src/lib/ptree/channel.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
module(...,package.seeall)

-- A channel is a ring buffer used by the manager to send updates to a
-- follower. Each follower has its own ring buffer and is the only
-- reader to the buffer. The manager is the only writer to these
-- buffers also. The ring buffer is just bytes; putting a message onto
-- the buffer will write a header indicating the message size, then the
-- bytes of the message. The channel ring buffer is mapped into shared
-- memory. Access to a channel will never block or cause a system call.
-- worker. Each worker has its own ring buffer and is the only reader
-- to the buffer. The manager is the only writer to these buffers also.
-- The ring buffer is just bytes; putting a message onto the buffer will
-- write a header indicating the message size, then the bytes of the
-- message. The channel ring buffer is mapped into shared memory.
-- Access to a channel will never block or cause a system call.

local ffi = require('ffi')
local S = require("syscall")
Expand Down
128 changes: 64 additions & 64 deletions src/lib/ptree/manager.lua
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ function Manager:new (conf)
ret.period = 1/conf.Hz
ret.next_time = app.now()
ret.worker_start_code = conf.worker_start_code
ret.followers = {}
ret.workers = {}
ret.rpc_callee = rpc.prepare_callee('snabb-config-leader-v1')
ret.rpc_handler = rpc.dispatch_handler(ret, 'rpc_')

Expand All @@ -78,7 +78,7 @@ function Manager:set_initial_configuration (configuration)
self.current_configuration = configuration
self.current_in_place_dependencies = {}

-- Start the followers and configure them.
-- Start the workers and configure them.
local worker_app_graphs = self.setup_fn(configuration)

-- Calculate the dependences
Expand All @@ -87,9 +87,9 @@ function Manager:set_initial_configuration (configuration)
{}, worker_app_graphs, self.schema_name, 'load',
'/', self.current_configuration)

-- Iterate over followers starting the workers and queuing up actions.
-- Iterate over workers starting the workers and queuing up actions.
for id, worker_app_graph in pairs(worker_app_graphs) do
self:start_follower_for_graph(id, worker_app_graph)
self:start_worker_for_graph(id, worker_app_graph)
end
end

Expand All @@ -99,36 +99,36 @@ function Manager:start_worker(cpu)
table.insert(start_code, 1, "print('Bound data plane to CPU:',"..cpu..")")
table.insert(start_code, 1, "require('lib.numa').bind_to_cpu("..cpu..")")
end
return worker.start("follower", table.concat(start_code, "\n"))
return worker.start("worker", table.concat(start_code, "\n"))
end

function Manager:stop_worker(id)
-- Tell the worker to terminate
local stop_actions = {{'shutdown', {}}, {'commit', {}}}
self:enqueue_config_actions_for_follower(id, stop_actions)
self:send_messages_to_followers()
self.followers[id].shutting_down = true
self:enqueue_config_actions_for_worker(id, stop_actions)
self:send_messages_to_workers()
self.workers[id].shutting_down = true
end

function Manager:remove_stale_followers()
function Manager:remove_stale_workers()
local stale = {}
for id, follower in pairs(self.followers) do
if follower.shutting_down then
if S.waitpid(follower.pid, S.c.W["NOHANG"]) ~= 0 then
for id, worker in pairs(self.workers) do
if worker.shutting_down then
if S.waitpid(worker.pid, S.c.W["NOHANG"]) ~= 0 then
stale[#stale + 1] = id
end
end
end
for _, id in ipairs(stale) do
if self.followers[id].cpu then
self.cpuset:release(self.followers[id].cpu)
if self.workers[id].cpu then
self.cpuset:release(self.workers[id].cpu)
end
self.followers[id] = nil
self.workers[id] = nil

end
end

function Manager:acquire_cpu_for_follower(id, app_graph)
function Manager:acquire_cpu_for_worker(id, app_graph)
local pci_addresses = {}
-- Grovel through app initargs for keys named "pciaddr". Hacky!
for name, init in pairs(app_graph.apps) do
Expand All @@ -141,35 +141,35 @@ function Manager:acquire_cpu_for_follower(id, app_graph)
return self.cpuset:acquire_for_pci_addresses(pci_addresses)
end

function Manager:start_follower_for_graph(id, graph)
local cpu = self:acquire_cpu_for_follower(id, graph)
self.followers[id] = { cpu=cpu, pid=self:start_worker(cpu), queue={},
function Manager:start_worker_for_graph(id, graph)
local cpu = self:acquire_cpu_for_worker(id, graph)
self.workers[id] = { cpu=cpu, pid=self:start_worker(cpu), queue={},
graph=graph }
local actions = self.support.compute_config_actions(
app_graph.new(), self.followers[id].graph, {}, 'load')
self:enqueue_config_actions_for_follower(id, actions)
return self.followers[id]
app_graph.new(), self.workers[id].graph, {}, 'load')
self:enqueue_config_actions_for_worker(id, actions)
return self.workers[id]
end

function Manager:take_follower_message_queue ()
function Manager:take_worker_message_queue ()
local actions = self.config_action_queue
self.config_action_queue = nil
return actions
end

local verbose = os.getenv('SNABB_MANAGER_VERBOSE') and true

function Manager:enqueue_config_actions_for_follower(follower, actions)
function Manager:enqueue_config_actions_for_worker(worker, actions)
for _,action in ipairs(actions) do
if verbose then print('encode', action[1], unpack(action[2])) end
local buf, len = action_codec.encode(action)
table.insert(self.followers[follower].queue, { buf=buf, len=len })
table.insert(self.workers[worker].queue, { buf=buf, len=len })
end
end

function Manager:enqueue_config_actions (actions)
for id,_ in pairs(self.followers) do
self.enqueue_config_actions_for_follower(id, actions)
for id,_ in pairs(self.workers) do
self.enqueue_config_actions_for_worker(id, actions)
end
end

Expand Down Expand Up @@ -535,19 +535,19 @@ function Manager:update_configuration (update_fn, verb, path, ...)
local new_config = update_fn(self.current_configuration, ...)
local new_graphs = self.setup_fn(new_config, ...)
for id, graph in pairs(new_graphs) do
if self.followers[id] == nil then
self:start_follower_for_graph(id, graph)
if self.workers[id] == nil then
self:start_worker_for_graph(id, graph)
end
end

for id, follower in pairs(self.followers) do
for id, worker in pairs(self.workers) do
if new_graphs[id] == nil then
self:stop_worker(id)
else
local actions = self.support.compute_config_actions(
follower.graph, new_graphs[id], to_restart, verb, path, ...)
self:enqueue_config_actions_for_follower(id, actions)
follower.graph = new_graphs[id]
worker.graph, new_graphs[id], to_restart, verb, path, ...)
self:enqueue_config_actions_for_worker(id, actions)
worker.graph = new_graphs[id]
end
end
self.current_configuration = new_config
Expand All @@ -567,10 +567,10 @@ end
function Manager:get_native_state ()
local states = {}
local state_reader = self.support.compute_state_reader(self.schema_name)
for _, follower in pairs(self.followers) do
local follower_config = self.support.configuration_for_follower(
follower, self.current_configuration)
table.insert(states, state_reader(follower.pid, follower_config))
for _, worker in pairs(self.workers) do
local worker_config = self.support.configuration_for_worker(
worker, self.current_configuration)
table.insert(states, state_reader(worker.pid, worker_config))
end
return self.support.process_states(states)
end
Expand Down Expand Up @@ -838,23 +838,23 @@ function Manager:handle_calls_from_peers()
end
end

function Manager:send_messages_to_followers()
for _,follower in pairs(self.followers) do
if not follower.channel then
local name = '/'..tostring(follower.pid)..'/config-follower-channel'
function Manager:send_messages_to_workers()
for _,worker in pairs(self.workers) do
if not worker.channel then
local name = '/'..tostring(worker.pid)..'/config-worker-channel'
local success, channel = pcall(channel.open, name)
if success then follower.channel = channel end
if success then worker.channel = channel end
end
local channel = follower.channel
local channel = worker.channel
if channel then
local queue = follower.queue
follower.queue = {}
local queue = worker.queue
worker.queue = {}
local requeue = false
for _,msg in ipairs(queue) do
if not requeue then
requeue = not channel:put_message(msg.buf, msg.len)
end
if requeue then table.insert(follower.queue, msg) end
if requeue then table.insert(worker.queue, msg) end
end
end
end
Expand All @@ -863,36 +863,36 @@ end
function Manager:pull ()
if app.now() < self.next_time then return end
self.next_time = app.now() + self.period
self:remove_stale_followers()
self:remove_stale_workers()
self:handle_calls_from_peers()
self:send_messages_to_followers()
self:receive_alarms_from_followers()
self:send_messages_to_workers()
self:receive_alarms_from_workers()
end

function Manager:receive_alarms_from_followers ()
for _,follower in pairs(self.followers) do
self:receive_alarms_from_follower(follower)
function Manager:receive_alarms_from_workers ()
for _,worker in pairs(self.workers) do
self:receive_alarms_from_worker(worker)
end
end

function Manager:receive_alarms_from_follower (follower)
if not follower.alarms_channel then
local name = '/'..tostring(follower.pid)..'/alarms-follower-channel'
function Manager:receive_alarms_from_worker (worker)
if not worker.alarms_channel then
local name = '/'..tostring(worker.pid)..'/alarms-worker-channel'
local success, channel = pcall(channel.open, name)
if not success then return end
follower.alarms_channel = channel
worker.alarms_channel = channel
end
local channel = follower.alarms_channel
local channel = worker.alarms_channel
while true do
local buf, len = channel:peek_message()
if not buf then break end
local alarm = alarm_codec.decode(buf, len)
self:handle_alarm(follower, alarm)
self:handle_alarm(worker, alarm)
channel:discard_message(len)
end
end

function Manager:handle_alarm (follower, alarm)
function Manager:handle_alarm (worker, alarm)
local fn, args = unpack(alarm)
if fn == 'raise_alarm' then
local key, args = alarm_codec.to_alarm(args)
Expand Down Expand Up @@ -920,9 +920,9 @@ function Manager:stop ()
end

function test_worker()
local follower = require("lib.ptree.follower")
local worker = require("lib.ptree.worker")
local myconf = config.new()
config.app(myconf, "follower", follower.Follower, {})
config.app(myconf, "worker", worker.Worker, {})
app.configure(myconf)
app.busywait = true
app.main({})
Expand All @@ -947,9 +947,9 @@ function selftest ()
schema_name='ietf-inet-types', initial_configuration={}})
engine.configure(graph)
engine.main({ duration = 0.05, report = {showapps=true,showlinks=true}})
assert(app.app_table.manager.followers[1])
assert(app.app_table.manager.followers[1].graph.links)
assert(app.app_table.manager.followers[1].graph.links["source.foo -> sink.bar"])
assert(app.app_table.manager.workers[1])
assert(app.app_table.manager.workers[1].graph.links)
assert(app.app_table.manager.workers[1].graph.links["source.foo -> sink.bar"])
local link = app.link_table["source.foo -> sink.bar"]
engine.configure(app_graph.new())
print('selftest: ok')
Expand Down
18 changes: 9 additions & 9 deletions src/lib/ptree/support.lua
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,17 @@ local function compute_objects_maybe_updated_in_place (schema_name, config,
return objs
end

local function record_mutable_objects_embedded_in_app_initarg (follower_id, app_name, obj, accum)
local function record_mutable_objects_embedded_in_app_initarg (worker_id, app_name, obj, accum)
local function record(obj)
local tab = accum[obj]
if not tab then
tab = {}
accum[obj] = tab
end
if tab[follower_id] == nil then
tab[follower_id] = {app_name}
if tab[worker_id] == nil then
tab[worker_id] = {app_name}
else
table.insert(tab[follower_id], app_name)
table.insert(tab[worker_id], app_name)
end
end
local function visit(obj)
Expand All @@ -126,9 +126,9 @@ local function record_mutable_objects_embedded_in_app_initarg (follower_id, app_
visit(obj)
end

-- Takes a table of follower ids (app_graph_map) and returns a tabl≈e which has
-- the follower id as the key and a table listing all app names
-- i.e. {follower_id => {app name, ...}, ...}
-- Takes a table of worker ids (app_graph_map) and returns a tabl≈e which has
-- the worker id as the key and a table listing all app names
-- i.e. {worker_id => {app name, ...}, ...}
local function compute_mutable_objects_embedded_in_app_initargs (app_graph_map)
local deps = {}
for id, app_graph in pairs(app_graph_map) do
Expand Down Expand Up @@ -190,7 +190,7 @@ local function add_restarts(actions, app_graph, to_restart)
return actions
end

local function configuration_for_follower(follower, configuration)
local function configuration_for_worker(worker, configuration)
return configuration
end

Expand All @@ -216,7 +216,7 @@ generic_schema_config_support = {
return compute_mutable_objects_embedded_in_app_initargs(app_graph)
end,
compute_state_reader = compute_state_reader,
configuration_for_follower = configuration_for_follower,
configuration_for_worker = configuration_for_worker,
process_states = process_states,
compute_apps_to_restart_after_configuration_update =
compute_apps_to_restart_after_configuration_update,
Expand Down
6 changes: 3 additions & 3 deletions src/lib/ptree/support/snabb-softwire-v2.lua
Original file line number Diff line number Diff line change
Expand Up @@ -630,8 +630,8 @@ local function ietf_softwire_br_translator ()
return ret
end

local function configuration_for_follower(follower, configuration)
return follower.graph.apps.lwaftr.arg
local function configuration_for_worker(worker, configuration)
return worker.graph.apps.lwaftr.arg
end

local function compute_state_reader(schema_name)
Expand Down Expand Up @@ -700,7 +700,7 @@ function get_config_support()
compute_apps_to_restart_after_configuration_update,
compute_state_reader = compute_state_reader,
process_states = process_states,
configuration_for_follower = configuration_for_follower,
configuration_for_worker = configuration_for_worker,
translators = { ['ietf-softwire-br'] = ietf_softwire_br_translator () }
}
end
Loading

0 comments on commit 3619ac2

Please sign in to comment.