diff --git a/src/lib/ptree/alarm_codec.lua b/src/lib/ptree/alarm_codec.lua index 7e3c812751..42ca02c7da 100644 --- a/src/lib/ptree/alarm_codec.lua +++ b/src/lib/ptree/alarm_codec.lua @@ -156,12 +156,12 @@ local alarms_channel function get_channel() if alarms_channel then return alarms_channel end - local name = '/'..S.getpid()..'/alarms-follower-channel' + local name = '/'..S.getpid()..'/alarms-worker-channel' local success, value = pcall(channel.open, name) if success then alarms_channel = value else - alarms_channel = channel.create('alarms-follower-channel', 1e6) + alarms_channel = channel.create('alarms-worker-channel', 1e6) end return alarms_channel end diff --git a/src/lib/ptree/channel.lua b/src/lib/ptree/channel.lua index 1818bb7b43..3554c57b06 100644 --- a/src/lib/ptree/channel.lua +++ b/src/lib/ptree/channel.lua @@ -3,12 +3,12 @@ module(...,package.seeall) -- A channel is a ring buffer used by the manager to send updates to a --- follower. Each follower has its own ring buffer and is the only --- reader to the buffer. The manager is the only writer to these --- buffers also. The ring buffer is just bytes; putting a message onto --- the buffer will write a header indicating the message size, then the --- bytes of the message. The channel ring buffer is mapped into shared --- memory. Access to a channel will never block or cause a system call. +-- worker. Each worker has its own ring buffer and is the only reader +-- to the buffer. The manager is the only writer to these buffers also. +-- The ring buffer is just bytes; putting a message onto the buffer will +-- write a header indicating the message size, then the bytes of the +-- message. The channel ring buffer is mapped into shared memory. +-- Access to a channel will never block or cause a system call. local ffi = require('ffi') local S = require("syscall") diff --git a/src/lib/ptree/manager.lua b/src/lib/ptree/manager.lua index 8e80906e12..706fbee134 100644 --- a/src/lib/ptree/manager.lua +++ b/src/lib/ptree/manager.lua @@ -65,7 +65,7 @@ function Manager:new (conf) ret.period = 1/conf.Hz ret.next_time = app.now() ret.worker_start_code = conf.worker_start_code - ret.followers = {} + ret.workers = {} ret.rpc_callee = rpc.prepare_callee('snabb-config-leader-v1') ret.rpc_handler = rpc.dispatch_handler(ret, 'rpc_') @@ -78,7 +78,7 @@ function Manager:set_initial_configuration (configuration) self.current_configuration = configuration self.current_in_place_dependencies = {} - -- Start the followers and configure them. + -- Start the workers and configure them. local worker_app_graphs = self.setup_fn(configuration) -- Calculate the dependences @@ -87,9 +87,9 @@ function Manager:set_initial_configuration (configuration) {}, worker_app_graphs, self.schema_name, 'load', '/', self.current_configuration) - -- Iterate over followers starting the workers and queuing up actions. + -- Iterate over workers starting the workers and queuing up actions. for id, worker_app_graph in pairs(worker_app_graphs) do - self:start_follower_for_graph(id, worker_app_graph) + self:start_worker_for_graph(id, worker_app_graph) end end @@ -99,36 +99,36 @@ function Manager:start_worker(cpu) table.insert(start_code, 1, "print('Bound data plane to CPU:',"..cpu..")") table.insert(start_code, 1, "require('lib.numa').bind_to_cpu("..cpu..")") end - return worker.start("follower", table.concat(start_code, "\n")) + return worker.start("worker", table.concat(start_code, "\n")) end function Manager:stop_worker(id) -- Tell the worker to terminate local stop_actions = {{'shutdown', {}}, {'commit', {}}} - self:enqueue_config_actions_for_follower(id, stop_actions) - self:send_messages_to_followers() - self.followers[id].shutting_down = true + self:enqueue_config_actions_for_worker(id, stop_actions) + self:send_messages_to_workers() + self.workers[id].shutting_down = true end -function Manager:remove_stale_followers() +function Manager:remove_stale_workers() local stale = {} - for id, follower in pairs(self.followers) do - if follower.shutting_down then - if S.waitpid(follower.pid, S.c.W["NOHANG"]) ~= 0 then + for id, worker in pairs(self.workers) do + if worker.shutting_down then + if S.waitpid(worker.pid, S.c.W["NOHANG"]) ~= 0 then stale[#stale + 1] = id end end end for _, id in ipairs(stale) do - if self.followers[id].cpu then - self.cpuset:release(self.followers[id].cpu) + if self.workers[id].cpu then + self.cpuset:release(self.workers[id].cpu) end - self.followers[id] = nil + self.workers[id] = nil end end -function Manager:acquire_cpu_for_follower(id, app_graph) +function Manager:acquire_cpu_for_worker(id, app_graph) local pci_addresses = {} -- Grovel through app initargs for keys named "pciaddr". Hacky! for name, init in pairs(app_graph.apps) do @@ -141,17 +141,17 @@ function Manager:acquire_cpu_for_follower(id, app_graph) return self.cpuset:acquire_for_pci_addresses(pci_addresses) end -function Manager:start_follower_for_graph(id, graph) - local cpu = self:acquire_cpu_for_follower(id, graph) - self.followers[id] = { cpu=cpu, pid=self:start_worker(cpu), queue={}, +function Manager:start_worker_for_graph(id, graph) + local cpu = self:acquire_cpu_for_worker(id, graph) + self.workers[id] = { cpu=cpu, pid=self:start_worker(cpu), queue={}, graph=graph } local actions = self.support.compute_config_actions( - app_graph.new(), self.followers[id].graph, {}, 'load') - self:enqueue_config_actions_for_follower(id, actions) - return self.followers[id] + app_graph.new(), self.workers[id].graph, {}, 'load') + self:enqueue_config_actions_for_worker(id, actions) + return self.workers[id] end -function Manager:take_follower_message_queue () +function Manager:take_worker_message_queue () local actions = self.config_action_queue self.config_action_queue = nil return actions @@ -159,17 +159,17 @@ end local verbose = os.getenv('SNABB_MANAGER_VERBOSE') and true -function Manager:enqueue_config_actions_for_follower(follower, actions) +function Manager:enqueue_config_actions_for_worker(worker, actions) for _,action in ipairs(actions) do if verbose then print('encode', action[1], unpack(action[2])) end local buf, len = action_codec.encode(action) - table.insert(self.followers[follower].queue, { buf=buf, len=len }) + table.insert(self.workers[worker].queue, { buf=buf, len=len }) end end function Manager:enqueue_config_actions (actions) - for id,_ in pairs(self.followers) do - self.enqueue_config_actions_for_follower(id, actions) + for id,_ in pairs(self.workers) do + self.enqueue_config_actions_for_worker(id, actions) end end @@ -535,19 +535,19 @@ function Manager:update_configuration (update_fn, verb, path, ...) local new_config = update_fn(self.current_configuration, ...) local new_graphs = self.setup_fn(new_config, ...) for id, graph in pairs(new_graphs) do - if self.followers[id] == nil then - self:start_follower_for_graph(id, graph) + if self.workers[id] == nil then + self:start_worker_for_graph(id, graph) end end - for id, follower in pairs(self.followers) do + for id, worker in pairs(self.workers) do if new_graphs[id] == nil then self:stop_worker(id) else local actions = self.support.compute_config_actions( - follower.graph, new_graphs[id], to_restart, verb, path, ...) - self:enqueue_config_actions_for_follower(id, actions) - follower.graph = new_graphs[id] + worker.graph, new_graphs[id], to_restart, verb, path, ...) + self:enqueue_config_actions_for_worker(id, actions) + worker.graph = new_graphs[id] end end self.current_configuration = new_config @@ -567,10 +567,10 @@ end function Manager:get_native_state () local states = {} local state_reader = self.support.compute_state_reader(self.schema_name) - for _, follower in pairs(self.followers) do - local follower_config = self.support.configuration_for_follower( - follower, self.current_configuration) - table.insert(states, state_reader(follower.pid, follower_config)) + for _, worker in pairs(self.workers) do + local worker_config = self.support.configuration_for_worker( + worker, self.current_configuration) + table.insert(states, state_reader(worker.pid, worker_config)) end return self.support.process_states(states) end @@ -838,23 +838,23 @@ function Manager:handle_calls_from_peers() end end -function Manager:send_messages_to_followers() - for _,follower in pairs(self.followers) do - if not follower.channel then - local name = '/'..tostring(follower.pid)..'/config-follower-channel' +function Manager:send_messages_to_workers() + for _,worker in pairs(self.workers) do + if not worker.channel then + local name = '/'..tostring(worker.pid)..'/config-worker-channel' local success, channel = pcall(channel.open, name) - if success then follower.channel = channel end + if success then worker.channel = channel end end - local channel = follower.channel + local channel = worker.channel if channel then - local queue = follower.queue - follower.queue = {} + local queue = worker.queue + worker.queue = {} local requeue = false for _,msg in ipairs(queue) do if not requeue then requeue = not channel:put_message(msg.buf, msg.len) end - if requeue then table.insert(follower.queue, msg) end + if requeue then table.insert(worker.queue, msg) end end end end @@ -863,36 +863,36 @@ end function Manager:pull () if app.now() < self.next_time then return end self.next_time = app.now() + self.period - self:remove_stale_followers() + self:remove_stale_workers() self:handle_calls_from_peers() - self:send_messages_to_followers() - self:receive_alarms_from_followers() + self:send_messages_to_workers() + self:receive_alarms_from_workers() end -function Manager:receive_alarms_from_followers () - for _,follower in pairs(self.followers) do - self:receive_alarms_from_follower(follower) +function Manager:receive_alarms_from_workers () + for _,worker in pairs(self.workers) do + self:receive_alarms_from_worker(worker) end end -function Manager:receive_alarms_from_follower (follower) - if not follower.alarms_channel then - local name = '/'..tostring(follower.pid)..'/alarms-follower-channel' +function Manager:receive_alarms_from_worker (worker) + if not worker.alarms_channel then + local name = '/'..tostring(worker.pid)..'/alarms-worker-channel' local success, channel = pcall(channel.open, name) if not success then return end - follower.alarms_channel = channel + worker.alarms_channel = channel end - local channel = follower.alarms_channel + local channel = worker.alarms_channel while true do local buf, len = channel:peek_message() if not buf then break end local alarm = alarm_codec.decode(buf, len) - self:handle_alarm(follower, alarm) + self:handle_alarm(worker, alarm) channel:discard_message(len) end end -function Manager:handle_alarm (follower, alarm) +function Manager:handle_alarm (worker, alarm) local fn, args = unpack(alarm) if fn == 'raise_alarm' then local key, args = alarm_codec.to_alarm(args) @@ -920,9 +920,9 @@ function Manager:stop () end function test_worker() - local follower = require("lib.ptree.follower") + local worker = require("lib.ptree.worker") local myconf = config.new() - config.app(myconf, "follower", follower.Follower, {}) + config.app(myconf, "worker", worker.Worker, {}) app.configure(myconf) app.busywait = true app.main({}) @@ -947,9 +947,9 @@ function selftest () schema_name='ietf-inet-types', initial_configuration={}}) engine.configure(graph) engine.main({ duration = 0.05, report = {showapps=true,showlinks=true}}) - assert(app.app_table.manager.followers[1]) - assert(app.app_table.manager.followers[1].graph.links) - assert(app.app_table.manager.followers[1].graph.links["source.foo -> sink.bar"]) + assert(app.app_table.manager.workers[1]) + assert(app.app_table.manager.workers[1].graph.links) + assert(app.app_table.manager.workers[1].graph.links["source.foo -> sink.bar"]) local link = app.link_table["source.foo -> sink.bar"] engine.configure(app_graph.new()) print('selftest: ok') diff --git a/src/lib/ptree/support.lua b/src/lib/ptree/support.lua index d11a6dae21..a1f45ad523 100644 --- a/src/lib/ptree/support.lua +++ b/src/lib/ptree/support.lua @@ -98,17 +98,17 @@ local function compute_objects_maybe_updated_in_place (schema_name, config, return objs end -local function record_mutable_objects_embedded_in_app_initarg (follower_id, app_name, obj, accum) +local function record_mutable_objects_embedded_in_app_initarg (worker_id, app_name, obj, accum) local function record(obj) local tab = accum[obj] if not tab then tab = {} accum[obj] = tab end - if tab[follower_id] == nil then - tab[follower_id] = {app_name} + if tab[worker_id] == nil then + tab[worker_id] = {app_name} else - table.insert(tab[follower_id], app_name) + table.insert(tab[worker_id], app_name) end end local function visit(obj) @@ -126,9 +126,9 @@ local function record_mutable_objects_embedded_in_app_initarg (follower_id, app_ visit(obj) end --- Takes a table of follower ids (app_graph_map) and returns a tabl≈e which has --- the follower id as the key and a table listing all app names --- i.e. {follower_id => {app name, ...}, ...} +-- Takes a table of worker ids (app_graph_map) and returns a tabl≈e which has +-- the worker id as the key and a table listing all app names +-- i.e. {worker_id => {app name, ...}, ...} local function compute_mutable_objects_embedded_in_app_initargs (app_graph_map) local deps = {} for id, app_graph in pairs(app_graph_map) do @@ -190,7 +190,7 @@ local function add_restarts(actions, app_graph, to_restart) return actions end -local function configuration_for_follower(follower, configuration) +local function configuration_for_worker(worker, configuration) return configuration end @@ -216,7 +216,7 @@ generic_schema_config_support = { return compute_mutable_objects_embedded_in_app_initargs(app_graph) end, compute_state_reader = compute_state_reader, - configuration_for_follower = configuration_for_follower, + configuration_for_worker = configuration_for_worker, process_states = process_states, compute_apps_to_restart_after_configuration_update = compute_apps_to_restart_after_configuration_update, diff --git a/src/lib/ptree/support/snabb-softwire-v2.lua b/src/lib/ptree/support/snabb-softwire-v2.lua index 274fd80c53..7f83d788a4 100644 --- a/src/lib/ptree/support/snabb-softwire-v2.lua +++ b/src/lib/ptree/support/snabb-softwire-v2.lua @@ -630,8 +630,8 @@ local function ietf_softwire_br_translator () return ret end -local function configuration_for_follower(follower, configuration) - return follower.graph.apps.lwaftr.arg +local function configuration_for_worker(worker, configuration) + return worker.graph.apps.lwaftr.arg end local function compute_state_reader(schema_name) @@ -700,7 +700,7 @@ function get_config_support() compute_apps_to_restart_after_configuration_update, compute_state_reader = compute_state_reader, process_states = process_states, - configuration_for_follower = configuration_for_follower, + configuration_for_worker = configuration_for_worker, translators = { ['ietf-softwire-br'] = ietf_softwire_br_translator () } } end diff --git a/src/lib/ptree/follower.lua b/src/lib/ptree/worker.lua similarity index 85% rename from src/lib/ptree/follower.lua rename to src/lib/ptree/worker.lua index e21c9a2a00..25466431b0 100644 --- a/src/lib/ptree/follower.lua +++ b/src/lib/ptree/worker.lua @@ -13,23 +13,23 @@ local channel = require("lib.ptree.channel") local action_codec = require("lib.ptree.action_codec") local alarm_codec = require("lib.ptree.alarm_codec") -Follower = { +Worker = { config = { Hz = {default=1000}, } } -function Follower:new (conf) - local ret = setmetatable({}, {__index=Follower}) +function Worker:new (conf) + local ret = setmetatable({}, {__index=Worker}) ret.period = 1/conf.Hz ret.next_time = app.now() - ret.channel = channel.create('config-follower-channel', 1e6) + ret.channel = channel.create('config-worker-channel', 1e6) ret.alarms_channel = alarm_codec.get_channel() ret.pending_actions = {} return ret end -function Follower:shutdown() +function Worker:shutdown() -- This will shutdown everything. engine.configure(app_graph.new()) @@ -37,7 +37,7 @@ function Follower:shutdown() S.exit(0) end -function Follower:commit_pending_actions() +function Worker:commit_pending_actions() local to_apply = {} local should_flush = false for _,action in ipairs(self.pending_actions) do @@ -64,7 +64,7 @@ function Follower:commit_pending_actions() if should_flush then require('jit').flush() end end -function Follower:handle_actions_from_manager() +function Worker:handle_actions_from_manager() local channel = self.channel for i=1,4 do local buf, len = channel:peek_message() @@ -79,16 +79,16 @@ function Follower:handle_actions_from_manager() end end -function Follower:pull () +function Worker:pull () if app.now() < self.next_time then return end self.next_time = app.now() + self.period self:handle_actions_from_manager() end function selftest () - print('selftest: lib.ptree.follower') + print('selftest: lib.ptree.worker') local c = config.new() - config.app(c, "follower", Follower, {}) + config.app(c, "worker", Worker, {}) engine.configure(c) engine.main({ duration = 0.0001, report = {showapps=true,showlinks=true}}) engine.configure(config.new())