diff --git a/.gitignore b/.gitignore index 7df97359ba..752b10df64 100644 --- a/.gitignore +++ b/.gitignore @@ -20,3 +20,4 @@ __pycache__ /src/programs.inc .images /lib/luajit/usr +/src/program/programs.inc diff --git a/src/apps/config/README.md b/src/apps/config/README.md deleted file mode 100644 index cb1d37dc8c..0000000000 --- a/src/apps/config/README.md +++ /dev/null @@ -1,145 +0,0 @@ -# Config leader and follower - -Sometimes you want to query the state or configuration of a running -Snabb data plane, or reload its configuration, or incrementally update -that configuration. However, you want to minimize the impact of -configuration query and update on data plane performance. The -`Leader` and `Follower` apps are here to fulfill this need, while -minimizing performance overhead. - -The high-level design is that a `Leader` app is responsible for -knowing the state and configuration of a data plane. The leader -offers an interface to allow the outside world to query the -configuration and state, and to request configuration updates. To -avoid data-plane overhead, the `Leader` app should be deployed in a -separate process. Because it knows the data-plane state, it can -respond to queries directly, without involving the data plane. It -processes update requests into a form that the data plane can handle, -and feeds those requests to the data plane via a high-performance -back-channel. - -The data plane runs a `Follower` app that reads and applies update -messages sent to it from the leader. Checking for update availability -requires just a memory access, not a system call, so the overhead of -including a follower in the data plane is very low. - -## Two protocols - -The leader communicates with its followers using a private protocol. -Because the leader and the follower are from the same Snabb version, -the details of this protocol are subject to change. The private -protocol's only design constraint is that it should cause the lowest -overhead for the data plane. 
- -The leader communicates with the world via a public protocol. The -"snabb config" command-line tool speaks this protocol. "snabb config -get foo /bar" will find the local Snabb instance named "foo", open the -UNIX socket that the "foo" instance is listening on, issue a request, -then read the response, then close the socket. - -## Public protocol - -The design constraint on the public protocol is that it be expressive -and future-proof. We also want to enable the leader to talk to more -than one "snabb config" at a time. In particular someone should be -able to have a long-lived "snabb config listen" session open, and that -shouldn't impede someone else from doing a "snabb config get" to read -state. - -To this end the public protocol container is very simple: - -``` -Message = Length "\n" RPC* -``` - -Length is a base-10 string of characters indicating the length of the -message. There may be a maximum length restriction. This requires -that "snabb config" build up the whole message as a string and measure -its length, but that's OK. Knowing the length ahead of time allows -"snabb config" to use nonblocking operations to slurp up the whole -message as a string. A partial read can be resumed later. The -message can then be parsed without fear of blocking the main process. - -The RPC is an RPC request or response for the -[`snabb-config-leader-v1` YANG -schema](../../lib/yang/snabb-config-leader-v1.yang), expressed in the -Snabb [textual data format for YANG data](../../lib/yang/README.md). 
-For example the `snabb-config-leader-v1` schema supports a -`get-config` RPC defined like this in the schema: - -```yang -rpc get-config { - input { - leaf schema { type string; mandatory true; } - leaf revision { type string; } - leaf path { type string; default "/"; } - } - output { - leaf config { type string; } - } -} -``` - -A request to this RPC might look like: - -```yang -get-config { - schema snabb-softwire-v1; - path "/foo"; -} -``` - -As you can see, non-mandatory inputs can be left out. A response -might look like: - -```yang -get-config { - config "blah blah blah"; -} -``` - -Responses are prefixed by the RPC name. One message can include a -number of RPCs; the RPCs will be made in order. See the -[`snabb-config-leader-v1` YANG -schema](../../lib/yang/snabb-config-leader-v1.yang) for full details -of available RPCs. - -## Private protocol - -The leader maintains a configuration for the program as a whole. As -it gets requests, it computes the set of changes to app graphs that -would be needed to apply that configuration. These changes are then -passed through the private protocol to the follower. No response from -the follower is necessary. - -In some remote or perhaps not so remote future, all Snabb apps will -have associated YANG schemas describing their individual -configurations. In this happy future, the generic way to ship -configurations from the leader to a follower is by the binary -serialization of YANG data, implemented already in the YANG modules. -Until then however, there is also generic Lua data without a schema. -The private protocol supports both kinds of information transfer. - -In the meantime, the way to indicate that an app's configuration data -conforms to a YANG schema is to set the `schema_name` property on the -app's class. - -The private protocol consists of binary messages passed over a ring -buffer. A follower's leader writes to the buffer, and the follower -reads from it. There are no other readers or writers. 
Given that a -message may in general be unbounded in size, whereas a ring buffer is -naturally fixed, messages which may include arbtrary-sized data may be -forced to put that data in the filesystem, and refer to it from the -messages in the ring buffer. Since this file system is backed by -`tmpfs`, stalls will be minimal. - -## User interface - -The above sections document how the leader and follower apps are -implemented so that a data-plane developer can understand the overhead -of run-time (re)configuration. End users won't be typing at a UNIX -socket though; we include the `snabb config` program as a command-line -interface to this functionality. - -See [the `snabb config` documentation](../../program/config/README.md) -for full details. diff --git a/src/apps/config/follower.lua b/src/apps/config/follower.lua deleted file mode 100644 index 3a1f8c6cc7..0000000000 --- a/src/apps/config/follower.lua +++ /dev/null @@ -1,96 +0,0 @@ --- Use of this source code is governed by the Apache 2.0 license; see COPYING. - -module(...,package.seeall) - -local S = require("syscall") -local ffi = require("ffi") -local yang = require("lib.yang.yang") -local rpc = require("lib.yang.rpc") -local app = require("core.app") -local shm = require("core.shm") -local app_graph = require("core.config") -local channel = require("apps.config.channel") -local action_codec = require("apps.config.action_codec") -local alarm_codec = require("apps.config.alarm_codec") - -Follower = { - config = { - Hz = {default=1000}, - } -} - -function Follower:new (conf) - local ret = setmetatable({}, {__index=Follower}) - ret.period = 1/conf.Hz - ret.next_time = app.now() - ret.channel = channel.create('config-follower-channel', 1e6) - ret.alarms_channel = alarm_codec.get_channel() - ret.pending_actions = {} - return ret -end - -function Follower:shutdown() - -- This will shutdown everything. - engine.configure(app_graph.new()) - - -- Now we can exit. 
- S.exit(0) -end - -function Follower:commit_pending_actions() - local to_apply = {} - local should_flush = false - for _,action in ipairs(self.pending_actions) do - local name, args = unpack(action) - if name == 'call_app_method_with_blob' then - if #to_apply > 0 then - app.apply_config_actions(to_apply) - to_apply = {} - end - local callee, method, blob = unpack(args) - local obj = assert(app.app_table[callee]) - assert(obj[method])(obj, blob) - elseif name == "shutdown" then - self:shutdown() - else - if name == 'start_app' or name == 'reconfig_app' then - should_flush = true - end - table.insert(to_apply, action) - end - end - if #to_apply > 0 then app.apply_config_actions(to_apply) end - self.pending_actions = {} - if should_flush then require('jit').flush() end -end - -function Follower:handle_actions_from_leader() - local channel = self.channel - for i=1,4 do - local buf, len = channel:peek_message() - if not buf then break end - local action = action_codec.decode(buf, len) - if action[1] == 'commit' then - self:commit_pending_actions() - else - table.insert(self.pending_actions, action) - end - channel:discard_message(len) - end -end - -function Follower:pull () - if app.now() < self.next_time then return end - self.next_time = app.now() + self.period - self:handle_actions_from_leader() -end - -function selftest () - print('selftest: apps.config.follower') - local c = config.new() - config.app(c, "follower", Follower, {}) - engine.configure(c) - engine.main({ duration = 0.0001, report = {showapps=true,showlinks=true}}) - engine.configure(config.new()) - print('selftest: ok') -end diff --git a/src/apps/lwaftr/lwaftr.lua b/src/apps/lwaftr/lwaftr.lua index e1dc1abfc0..c88a54afcf 100644 --- a/src/apps/lwaftr/lwaftr.lua +++ b/src/apps/lwaftr/lwaftr.lua @@ -473,9 +473,9 @@ function LwAftr:new(conf) return o end --- The following two methods are called by apps.config.follower in --- reaction to binding table changes, via --- 
apps/config/support/snabb-softwire-v2.lua. +-- The following two methods are called by lib.ptree.worker in reaction +-- to binding table changes, via +-- lib/ptree/support/snabb-softwire-v2.lua. function LwAftr:add_softwire_entry(entry_blob) self.binding_table:add_softwire_entry(entry_blob) end diff --git a/src/core/timer.lua b/src/core/timer.lua index 996caef73c..3e93f94b71 100644 --- a/src/core/timer.lua +++ b/src/core/timer.lua @@ -33,6 +33,7 @@ local function call_timers (l) if debug then print(string.format("running timer %s at tick %s", timer.name, ticks)) end + timer.next_tick = nil timer.fn(timer) if timer.repeating then activate(timer) end end @@ -49,6 +50,7 @@ function run_to_time (ns) end function activate (t) + assert(t.next_tick == nil, "timer already activated") -- Initialize time if not ticks then ticks = math.floor(tonumber(C.get_time_ns() / ns_per_tick)) @@ -59,6 +61,19 @@ function activate (t) else timers[tick] = {t} end + t.next_tick = tick +end + +function cancel (t) + if t.next_tick then + for idx, timer in ipairs(timers[t.next_tick]) do + if timer == t then + table.remove(timers[t.next_tick], idx) + t.next_tick = nil + return true + end + end + end end function new (name, fn, nanos, mode) diff --git a/src/doc/genbook.sh b/src/doc/genbook.sh index be23d89ad2..ca23aadd1a 100755 --- a/src/doc/genbook.sh +++ b/src/doc/genbook.sh @@ -100,6 +100,14 @@ $(cat $mdroot/lib/ipsec/README.md) $(cat $mdroot/program/snabbnfv/README.md) +## LISPER + +$(cat $mdroot/program/lisper/README.md) + +## Ptree + +$(cat $mdroot/program/ptree/README.md) + ## Watchdog (lib.watchdog.watchdog) $(cat $mdroot/lib/watchdog/README.md) diff --git a/src/lib/cpuset.lua b/src/lib/cpuset.lua index e92d0143b1..251ebba19a 100644 --- a/src/lib/cpuset.lua +++ b/src/lib/cpuset.lua @@ -73,13 +73,32 @@ function CPUSet:acquire(on_node) end end end + if on_node ~= nil then + for node, cpus in pairs(self.by_node) do + for cpu, avail in pairs(cpus) do + if avail then + print("Warning: No CPU 
available on local NUMA node "..on_node) + print("Warning: Assigning CPU "..cpu.." from remote node "..node) + cpus[cpu] = false + return cpu + end + end + end + end + for node, cpus in pairs(self.by_node) do + print("Warning: All assignable CPUs in use; " + .."leaving data-plane process without assigned CPU.") + return + end + print("Warning: No assignable CPUs declared; " + .."leaving data-plane process without assigned CPU.") end function CPUSet:release(cpu) local node = numa.cpu_get_numa_node(cpu) assert(node ~= nil, 'Failed to get NUMA node for CPU: '..cpu) - for cpu, avail in pairs(self.by_node[node]) do - if avail then + for x, avail in pairs(self.by_node[node]) do + if x == cpu then assert(self.by_node[node][cpu] == false) self.by_node[node][cpu] = true return diff --git a/src/lib/ptree/README.md b/src/lib/ptree/README.md new file mode 100644 index 0000000000..a64a54a2e1 --- /dev/null +++ b/src/lib/ptree/README.md @@ -0,0 +1,250 @@ +# Process tree (`lib.ptree`) + +When prototyping a network function, it's useful to start with a single +process that does packet forwarding. A first draft of a prototype +network function will take its configuration from command line +arguments; once it's started, you can read some information from it via +its counters but you can't affect its operation to make it do something +else without restarting it. + +As you grow a prototype network function into a production system, new +needs arise. You might want to query the state or configuration of a +running Snabb data plane. You might want to reload its configuration, +or incrementally update that configuration. However, as you add these +new capabilities, you want to minimize their impact on data plane +performance. The process tree facility is here to help with these tasks +by allowing a network function to be divided into separate management +and data-plane processes. 
+ +Additionally, as a network function grows, you might want to dedicate +multiple CPU cores to dataplane tasks. Here too `lib.ptree` helps out, +as a management process can be responsible for multiple workers. All +you need to do is to write a function that maps your network function's +configuration to a set of app graphs\* (as a table from worker ID to app +graph). Each app graph in the result will be instantiated on a separate +worker process. If the configuration changes at run-time resulting in a +different set of worker IDs, the `ptree` manager will start new +workers and stop any old workers that are no longer necessary. + +\*: An "app graph" is an instance of `core.config`. The `ptree` +facility reserves the word "configuration" to refer to the user-facing +configuration of a network function as a whole, and uses "app graph" to +refer to the network of Snabb apps that runs in a single worker +data-plane process. + +The high-level design is that a manager from `lib.ptree.manager` is +responsible for knowing the state and configuration of a data plane. +The manager also offers an interface to allow the outside world to query +the configuration and state, and to request configuration updates. +Because it knows the data-plane state, the manager can respond to +queries directly, without involving the data plane. It processes update +requests into a form that the data plane(s) can handle, and feeds those +requests to the data plane(s) via a high-performance back-channel. + +The data planes are started and stopped by the manager as needed. +Internally they run a special main loop from `lib.ptree.worker` which, +as part of its engine breathe loop, also reads and applies update +messages sent to it from the manager. Checking for update availability +requires just a memory access, not a system call, so the overhead of the +message channel on the data plane is very low. 
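The worker-set management described above can be sketched in pseudocode. The following is an illustrative Python sketch, not Snabb's Lua implementation, and all names in it are invented; it only shows how a manager could compare the old and new worker tables returned by the setup function to decide which workers to start, stop, or update in place:

```python
# Hypothetical model of ptree-style worker reconciliation (not Snabb code).
# Workers are keyed by worker ID; the values stand in for app graphs.

def reconcile(old_workers, new_workers):
    """Return (to_start, to_stop, to_update) as sets of worker IDs."""
    to_start = set(new_workers) - set(old_workers)   # IDs that are new
    to_stop = set(old_workers) - set(new_workers)    # IDs that vanished
    to_update = set(old_workers) & set(new_workers)  # surviving IDs
    return to_start, to_stop, to_update

# A configuration change that drops worker "b" and adds worker "c":
start, stop, update = reconcile({"a": "graph1", "b": "graph2"},
                                {"a": "graph1", "c": "graph3"})
```

Under this model, surviving workers receive incremental configuration updates over the private protocol rather than being restarted.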
+ +## Example + +See [the example `snabb ptree` program](../../program/ptree/README.md) +for a full example. + +## API reference + +The public interface to `ptree` is the `lib.ptree.ptree` module. + +— Function **ptree.new_manager** *parameters* + +Create and start a new manager for a `ptree` process tree. *parameters* +is a table of key/value pairs. The following keys are required: + + * `schema_name`: The name of a YANG schema describing this network function. + * `setup_fn`: A function mapping a configuration to a worker set. A + worker set is a table mapping worker IDs to app graphs (`core.config` + instances). See [the setup function described in the `snabb ptree` + documentation](../../program/ptree/README.md) for a full example. + * `initial_configuration`: The initial network configuration for the + network function, for example as returned by + `lib.yang.yang.load_configuration`. Must be an instance of + `schema_name`. + +Optional entries that may be present in the *parameters* table include: + + * `socket_file_name`: The name of the socket on which to listen for + incoming connections from `snabb config` clients. See [the `snabb + config` documentation](../../program/config/README.md) for more + information. Default is `$SNABB_SHM_ROOT/PID/config-leader-socket`, + where the `$SNABB_SHM_ROOT` environment variable defaults to + `/var/run/snabb`. + * `name`: A name to claim for this process tree. `snabb config` can + address network functions by name in addition to PID. If the name is + already claimed on the local machine, an error will be signalled. + The name will be released when the manager stops. Default is not to + claim a name. + * `worker_default_scheduling`: A table of scheduling parameters to + apply to worker processes, suitable for passing to + `lib.scheduling.apply()`. + * `default_schema`: Some network functions can respond to `snabb + config` queries against multiple schemas. 
This parameter indicates + the default schema to expose, and defaults to *schema_name*. Using + an alternate default schema requires a bit of behind-the-scenes + plumbing to work though from `lib.ptree.support`; see the code for + details. + * `log_level`: One of `"DEBUG"`, `"INFO"`, or `"WARN"`. Default is + `"WARN"`. + * `cpuset`: A set of CPUs to devote to data-plane processes; an + instance of `lib.cpuset.new()`. Default is + `lib.cpuset.global_cpuset()`. The manager will try to bind + data-plane worker processes to CPUs local to the NUMA node of any PCI + address being used by the worker. + * `Hz`: Frequency at which to poll the config socket. Default is + 1000. + +The return value is a ptree manager object, whose public methods are as +follows: + +— Manager method **:run** *duration* + +Run a process tree, servicing configuration and state queries and +updates from remote `snabb config` clients, managing a tree of workers, +feeding configuration updates to workers, and receiving state and alarm +updates from those workers. If *duration* is passed, stop after that +many seconds; otherwise continue indefinitely. + +— Manager method **:stop** + +Stop a process tree by sending a shutdown message to all workers, +waiting for them to shut down for short time, then forcibly terminating +any remaining worker processes. The manager's socket will be closed and +the Snabb network function name will be released. + +## Internals + +### Two protocols + +The manager communicates with its worker using a private protocol. +Because the manager and the worker are from the same Snabb version, the +details of this protocol are subject to change. The private protocol's +only design constraint is that it should cause the lowest overhead for +the data plane. + +The manager communicates with the world via a public protocol. The +"snabb config" command-line tool speaks this protocol. 
"snabb config +get foo /bar" will find the local Snabb instance named "foo", open the +UNIX socket that the "foo" instance is listening on, issue a request, +then read the response, then close the socket. + +### Public protocol + +The design constraint on the public protocol is that it be expressive +and future-proof. We also want to enable the manager to talk to more +than one "snabb config" at a time. In particular someone should be able +to have a long-lived "snabb config listen" session open, and that +shouldn't impede someone else from doing a "snabb config get" to read +state. + +To this end the public protocol container is very simple: + +``` +Message = Length "\n" RPC* +``` + +Length is a base-10 string of characters indicating the length of the +message. There may be a maximum length restriction. This requires +that "snabb config" build up the whole message as a string and measure +its length, but that's OK. Knowing the length ahead of time allows +"snabb config" to use nonblocking operations to slurp up the whole +message as a string. A partial read can be resumed later. The +message can then be parsed without fear of blocking the main process. + +The RPC is an RPC request or response for the +[`snabb-config-leader-v1` YANG +schema](../../lib/yang/snabb-config-leader-v1.yang), expressed in the +Snabb [textual data format for YANG data](../../lib/yang/README.md). 
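The length-prefixed framing described above can be illustrated with a short sketch. This is hypothetical Python, not part of Snabb or the `snabb config` tool; it only demonstrates why knowing the length up front lets a client accumulate partial nonblocking reads and resume later without risking a block:

```python
# Illustrative sketch of the public protocol container:
#   Message = Length "\n" RPC*
# where Length is the base-10 byte length of the RPC payload.

def frame(rpcs: str) -> bytes:
    """Build a wire message from an RPC payload string."""
    payload = rpcs.encode()
    return str(len(payload)).encode() + b"\n" + payload

def parse(buf: bytes):
    """Return the payload if buf holds a complete message, else None."""
    head, sep, rest = buf.partition(b"\n")
    if not sep:
        return None            # length line not fully read yet
    length = int(head)
    if len(rest) < length:
        return None            # partial read; resume when more arrives
    return rest[:length].decode()

msg = frame('get-config { schema snabb-softwire-v1; path "/foo"; }')
```

A caller that gets `None` back simply keeps the buffer and retries after the next nonblocking read.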
+For example the `snabb-config-leader-v1` schema supports a +`get-config` RPC defined like this in the schema: + +```yang +rpc get-config { + input { + leaf schema { type string; mandatory true; } + leaf revision { type string; } + leaf path { type string; default "/"; } + leaf print-default { type boolean; } + leaf format { type string; } + } + output { + leaf status { type uint8; default 0; } + leaf error { type string; } + leaf config { type string; } + } +} +``` + +A request to this RPC might look like: + +```yang +get-config { + schema snabb-softwire-v1; + path "/foo"; +} +``` + +As you can see, non-mandatory inputs can be left out. A response +might look like: + +```yang +get-config { + config "blah blah blah"; +} +``` + +Responses are prefixed by the RPC name. One message can include a +number of RPCs; the RPCs will be made in order. See the +[`snabb-config-leader-v1` YANG +schema](../../lib/yang/snabb-config-leader-v1.yang) for full details +of available RPCs. + +### Private protocol + +The manager maintains a configuration for the network function as a +whole. As it gets requests, it computes the set of changes to worker +app graphs that would be needed to apply that configuration. These +changes are then passed through the private protocol to the specific +workers. No response from the workers is necessary. + +In some remote or perhaps not so remote future, all Snabb apps will have +associated YANG schemas describing how they may be configured. In this +happy future, the generic way to ship app configurations from the +manager to a worker is by the binary serialization of YANG data, +implemented already in the YANG modules. Until then however, there is +also generic Lua data without a schema. The private protocol supports +both kinds of information transfer. + +In the meantime, the way to indicate that an app's configuration data +conforms to a YANG schema is to set the `schema_name` property on the +app's class. 
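The batching behavior of the private protocol can be sketched as follows. This is an illustrative Python model with invented names, not Snabb's worker code; it shows the pattern of buffering actions until a commit marker arrives, so that a group of related changes is applied together rather than piecemeal:

```python
# Hypothetical model of worker-side action handling: actions arrive as
# (name, args) pairs over the channel and are held until a 'commit'.

class ActionQueue:
    def __init__(self, apply_fn):
        self.pending = []
        self.apply_fn = apply_fn   # callback that applies a whole batch

    def handle(self, name, args):
        if name == "commit":
            batch, self.pending = self.pending, []
            self.apply_fn(batch)   # apply everything queued so far
        else:
            self.pending.append((name, args))

applied = []
q = ActionQueue(applied.extend)
q.handle("start_app", {"name": "nic"})
q.handle("link", {"from": "nic.tx", "to": "fwd.rx"})
q.handle("commit", {})
```

Nothing reaches `applied` until the commit, mirroring how a data plane can fold a batch of updates into a single step between breaths.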
+ +The private protocol consists of binary messages passed over a ring +buffer. A worker's manager writes to the buffer, and the worker reads +from it. There are no other readers or writers. Given that a message +may in general be unbounded in size, whereas a ring buffer is naturally +fixed, messages which may include arbitrary-sized data may be forced to +put that data in the filesystem, and refer to it from the messages in +the ring buffer. Since this file system is backed by `tmpfs`, stalls +will be minimal. + +## User interface + +The above sections document how the manager and worker libraries are +implemented so that a data-plane developer can understand the overhead +of using `lib.ptree` in their network function. End users won't be +typing at a UNIX socket though; we include the `snabb config` program as +a command-line interface to this functionality. + +See [the `snabb config` documentation](../../program/config/README.md) +for full details. diff --git a/src/apps/config/action_codec.lua b/src/lib/ptree/action_codec.lua similarity index 99% rename from src/apps/config/action_codec.lua rename to src/lib/ptree/action_codec.lua index 6f04559218..305be9ed91 100644 --- a/src/apps/config/action_codec.lua +++ b/src/lib/ptree/action_codec.lua @@ -194,7 +194,7 @@ function decode(buf, len) end function selftest () - print('selftest: apps.config.action_codec') + print('selftest: lib.ptree.action_codec') local function serialize(data) local tmp = random_file_name() print('serializing to:', tmp) diff --git a/src/apps/config/alarm_codec.lua b/src/lib/ptree/alarm_codec.lua similarity index 97% rename from src/apps/config/alarm_codec.lua rename to src/lib/ptree/alarm_codec.lua index 301092622a..42ca02c7da 100644 --- a/src/apps/config/alarm_codec.lua +++ b/src/lib/ptree/alarm_codec.lua @@ -3,7 +3,7 @@ module(...,package.seeall) local S = require("syscall") -local channel = require("apps.config.channel") +local channel = require("lib.ptree.channel") local ffi = 
require("ffi") local UINT32_MAX = 0xffffffff @@ -156,12 +156,12 @@ local alarms_channel function get_channel() if alarms_channel then return alarms_channel end - local name = '/'..S.getpid()..'/alarms-follower-channel' + local name = '/'..S.getpid()..'/alarms-worker-channel' local success, value = pcall(channel.open, name) if success then alarms_channel = value else - alarms_channel = channel.create('alarms-follower-channel', 1e6) + alarms_channel = channel.create('alarms-worker-channel', 1e6) end return alarms_channel end @@ -184,7 +184,7 @@ function alarm:normalize_args (t) return normalize(t, self.args_attrs) end --- To be used by the leader to group args into key and args. +-- To be used by the manager to group args into key and args. function to_alarm (args) local key = { resource = args[1], @@ -272,7 +272,7 @@ function declare_alarm (key, args) end function selftest () - print('selftest: apps.config.alarm_codec') + print('selftest: lib.ptree.alarm_codec') local lib = require("core.lib") local function test_alarm (name, args) local encoded, len diff --git a/src/apps/config/channel.lua b/src/lib/ptree/channel.lua similarity index 93% rename from src/apps/config/channel.lua rename to src/lib/ptree/channel.lua index 49c94c2186..3554c57b06 100644 --- a/src/apps/config/channel.lua +++ b/src/lib/ptree/channel.lua @@ -2,14 +2,13 @@ module(...,package.seeall) --- A channel is a ring buffer used by the config leader app to send --- updates to a follower. Each follower has its own ring buffer and is --- the only reader to the buffer. The config leader is the only writer --- to these buffers also. The ring buffer is just bytes; putting a --- message onto the buffer will write a header indicating the message --- size, then the bytes of the message. The channel ring buffer is --- mapped into shared memory. Access to a channel will never block or --- cause a system call. +-- A channel is a ring buffer used by the manager to send updates to a +-- worker. 
Each worker has its own ring buffer and is the only reader +-- to the buffer. The manager is the only writer to these buffers also. +-- The ring buffer is just bytes; putting a message onto the buffer will +-- write a header indicating the message size, then the bytes of the +-- message. The channel ring buffer is mapped into shared memory. +-- Access to a channel will never block or cause a system call. local ffi = require('ffi') local S = require("syscall") @@ -205,7 +204,7 @@ function Channel:discard_message(payload_len) end function selftest() - print('selftest: apps.config.channel') + print('selftest: lib.ptree.channel') local msg_t = ffi.typeof('struct { uint8_t a; uint8_t b; }') local ch = create('test/config-channel', (4+2)*16 + 1) local function put(i) diff --git a/src/apps/config/leader.lua b/src/lib/ptree/ptree.lua similarity index 72% rename from src/apps/config/leader.lua rename to src/lib/ptree/ptree.lua index 9bf5f4cdd7..dfa6a4515b 100644 --- a/src/apps/config/leader.lua +++ b/src/lib/ptree/ptree.lua @@ -4,9 +4,15 @@ module(...,package.seeall) local S = require("syscall") local ffi = require("ffi") +local C = ffi.C +local app_graph = require("core.config") local lib = require("core.lib") +local shm = require("core.shm") +local timer = require("core.timer") +local worker = require("core.worker") local cltable = require("lib.cltable") local cpuset = require("lib.cpuset") +local scheduling = require("lib.scheduling") local yang = require("lib.yang.yang") local data = require("lib.yang.data") local util = require("lib.yang.util") @@ -14,28 +20,30 @@ local schema = require("lib.yang.schema") local rpc = require("lib.yang.rpc") local state = require("lib.yang.state") local path_mod = require("lib.yang.path") -local app = require("core.app") -local shm = require("core.shm") -local worker = require("core.worker") -local app_graph = require("core.config") -local action_codec = require("apps.config.action_codec") -local alarm_codec = 
require("apps.config.alarm_codec") -local support = require("apps.config.support") -local channel = require("apps.config.channel") +local action_codec = require("lib.ptree.action_codec") +local alarm_codec = require("lib.ptree.alarm_codec") +local support = require("lib.ptree.support") +local channel = require("lib.ptree.channel") local alarms = require("lib.yang.alarms") -Leader = { - config = { - socket_file_name = {default='config-leader-socket'}, - setup_fn = {required=true}, - -- Could relax this requirement. - initial_configuration = {required=true}, - schema_name = {required=true}, - worker_start_code = {required=true}, - default_schema = {}, - cpuset = {default=cpuset.global_cpuset()}, - Hz = {default=100}, - } +local Manager = {} + +local log_levels = { DEBUG=1, INFO=2, WARN=3 } +local default_log_level = "WARN" +if os.getenv('SNABB_MANAGER_VERBOSE') then default_log_level = "DEBUG" end + +local manager_config_spec = { + name = {}, + socket_file_name = {default='config-leader-socket'}, + setup_fn = {required=true}, + -- Could relax this requirement. 
+ initial_configuration = {required=true}, + schema_name = {required=true}, + worker_default_scheduling = {default={busywait=true}}, + default_schema = {}, + log_level = {default=default_log_level}, + cpuset = {default=cpuset.global_cpuset()}, + Hz = {default=100}, } local function open_socket (file) @@ -48,8 +56,12 @@ local function open_socket (file) return socket end -function Leader:new (conf) - local ret = setmetatable({}, {__index=Leader}) +function new_manager (conf) + local conf = lib.parse(conf, manager_config_spec) + + local ret = setmetatable({}, {__index=Manager}) + ret.name = conf.name + ret.log_level = assert(log_levels[conf.log_level]) ret.cpuset = conf.cpuset ret.socket_file_name = conf.socket_file_name if not ret.socket_file_name:match('^/') then @@ -59,26 +71,63 @@ function Leader:new (conf) ret.schema_name = conf.schema_name ret.default_schema = conf.default_schema or conf.schema_name ret.support = support.load_schema_config_support(conf.schema_name) - ret.socket = open_socket(ret.socket_file_name) ret.peers = {} ret.setup_fn = conf.setup_fn ret.period = 1/conf.Hz - ret.next_time = app.now() - ret.worker_start_code = conf.worker_start_code - ret.followers = {} + ret.worker_default_scheduling = conf.worker_default_scheduling + ret.workers = {} + ret.state_change_listeners = {} ret.rpc_callee = rpc.prepare_callee('snabb-config-leader-v1') ret.rpc_handler = rpc.dispatch_handler(ret, 'rpc_') ret:set_initial_configuration(conf.initial_configuration) + ret:start() + return ret end -function Leader:set_initial_configuration (configuration) +function Manager:log (level, fmt, ...) + if log_levels[level] < self.log_level then return end + local prefix = os.date("%F %H:%M:%S")..": "..level..': ' + io.stderr:write(prefix..fmt:format(...)..'\n') + io.stderr:flush() +end + +function Manager:debug(fmt, ...) self:log("DEBUG", fmt, ...) end +function Manager:info(fmt, ...) self:log("INFO", fmt, ...) end +function Manager:warn(fmt, ...) self:log("WARN", fmt, ...) 
end + +function Manager:add_state_change_listener(listener) + table.insert(self.state_change_listeners, listener) + for id, worker in pairs(self.workers) do + listener:worker_starting(id) + if worker.channel then listener:worker_started(id, worker.pid) end + if worker.shutting_down then listener:worker_stopping(id) end + end +end + +function Manager:remove_state_change_listener(listener) + for i, x in ipairs(self.state_change_listeners) do + if x == listener then + table.remove(self.state_change_listeners, i) + return + end + end + error("listener not found") +end + +function Manager:state_change_event(event, ...) + for _,listener in ipairs(self.state_change_listeners) do + listener[event](listener, ...) + end +end + +function Manager:set_initial_configuration (configuration) self.current_configuration = configuration self.current_in_place_dependencies = {} - -- Start the followers and configure them. + -- Start the workers and configure them. local worker_app_graphs = self.setup_fn(configuration) -- Calculate the dependences @@ -87,48 +136,55 @@ function Leader:set_initial_configuration (configuration) {}, worker_app_graphs, self.schema_name, 'load', '/', self.current_configuration) - -- Iterate over followers starting the workers and queuing up actions. + -- Iterate over workers starting the workers and queuing up actions. 
for id, worker_app_graph in pairs(worker_app_graphs) do - self:start_follower_for_graph(id, worker_app_graph) + self:start_worker_for_graph(id, worker_app_graph) end end -function Leader:start_worker(cpu) - local start_code = { self.worker_start_code } - if cpu then - table.insert(start_code, 1, "print('Bound data plane to CPU:',"..cpu..")") - table.insert(start_code, 1, "require('lib.numa').bind_to_cpu("..cpu..")") - end - return worker.start("follower", table.concat(start_code, "\n")) +function Manager:start () + if self.name then engine.claim_name(self.name) end + self.cpuset:bind_to_numa_node() + self.socket = open_socket(self.socket_file_name) end -function Leader:stop_worker(id) - -- Tell the worker to terminate +function Manager:start_worker(sched_opts) + local code = { + scheduling.stage(sched_opts), + "require('lib.ptree.worker').main()" + } + return worker.start("worker", table.concat(code, "\n")) +end + +function Manager:stop_worker(id) + self:info('Asking worker %s to shut down.', id) local stop_actions = {{'shutdown', {}}, {'commit', {}}} - self:enqueue_config_actions_for_follower(id, stop_actions) - self:send_messages_to_followers() - self.followers[id].shutting_down = true + self:state_change_event('worker_stopping', id) + self:enqueue_config_actions_for_worker(id, stop_actions) + self:send_messages_to_workers() + self.workers[id].shutting_down = true end -function Leader:remove_stale_followers() +function Manager:remove_stale_workers() local stale = {} - for id, follower in pairs(self.followers) do - if follower.shutting_down then - if S.waitpid(follower.pid, S.c.W["NOHANG"]) ~= 0 then + for id, worker in pairs(self.workers) do + if worker.shutting_down then + if S.waitpid(worker.pid, S.c.W["NOHANG"]) ~= 0 then stale[#stale + 1] = id end end end for _, id in ipairs(stale) do - if self.followers[id].cpu then - self.cpuset:release(self.followers[id].cpu) + self:state_change_event('worker_stopped', id) + if self.workers[id].scheduling.cpu then + 
self.cpuset:release(self.workers[id].scheduling.cpu) end - self.followers[id] = nil + self.workers[id] = nil end end -function Leader:acquire_cpu_for_follower(id, app_graph) +function Manager:acquire_cpu_for_worker(id, app_graph) local pci_addresses = {} -- Grovel through app initargs for keys named "pciaddr". Hacky! for name, init in pairs(app_graph.apps) do @@ -141,39 +197,48 @@ function Leader:acquire_cpu_for_follower(id, app_graph) return self.cpuset:acquire_for_pci_addresses(pci_addresses) end -function Leader:start_follower_for_graph(id, graph) - local cpu = self:acquire_cpu_for_follower(id, graph) - self.followers[id] = { cpu=cpu, pid=self:start_worker(cpu), queue={}, - graph=graph } +function Manager:compute_scheduling_for_worker(id, app_graph) + local ret = {} + for k, v in pairs(self.worker_default_scheduling) do ret[k] = v end + ret.cpu = self:acquire_cpu_for_worker(id, app_graph) + return ret +end + +function Manager:start_worker_for_graph(id, graph) + local scheduling = self:compute_scheduling_for_worker(id, graph) + self:info('Starting worker %s.', id) + self.workers[id] = { scheduling=scheduling, + pid=self:start_worker(scheduling), + queue={}, graph=graph } + self:state_change_event('worker_starting', id) + self:debug('Worker %s has PID %s.', id, self.workers[id].pid) local actions = self.support.compute_config_actions( - app_graph.new(), self.followers[id].graph, {}, 'load') - self:enqueue_config_actions_for_follower(id, actions) - return self.followers[id] + app_graph.new(), self.workers[id].graph, {}, 'load') + self:enqueue_config_actions_for_worker(id, actions) + return self.workers[id] end -function Leader:take_follower_message_queue () +function Manager:take_worker_message_queue () local actions = self.config_action_queue self.config_action_queue = nil return actions end -local verbose = os.getenv('SNABB_LEADER_VERBOSE') and true - -function Leader:enqueue_config_actions_for_follower(follower, actions) +function 
Manager:enqueue_config_actions_for_worker(id, actions) for _,action in ipairs(actions) do - if verbose then print('encode', action[1], unpack(action[2])) end + self:debug('encode %s for worker %s', action[1], id) local buf, len = action_codec.encode(action) - table.insert(self.followers[follower].queue, { buf=buf, len=len }) + table.insert(self.workers[id].queue, { buf=buf, len=len }) end end -function Leader:enqueue_config_actions (actions) - for id,_ in pairs(self.followers) do - self.enqueue_config_actions_for_follower(id, actions) +function Manager:enqueue_config_actions (actions) + for id,_ in pairs(self.workers) do + self.enqueue_config_actions_for_worker(id, actions) end end -function Leader:rpc_describe (args) +function Manager:rpc_describe (args) local alternate_schemas = {} for schema_name, translator in pairs(self.support.translators) do table.insert(alternate_schemas, schema_name) @@ -184,6 +249,15 @@ function Leader:rpc_describe (args) capability = schema.get_default_capabilities() } end +function Manager:rpc_get_schema (args) + local function getter() + return { source = schema.load_schema_source_by_name( + args.schema, args.revision) } + end + local success, response = pcall(getter) + if success then return response else return {status=1, error=response} end +end + local function path_printer_for_grammar(grammar, path, format, print_default) local getter, subgrammar = path_mod.resolver(grammar, path) local printer @@ -210,7 +284,7 @@ local function path_printer_for_schema_by_name(schema_name, path, is_config, print_default) end -function Leader:rpc_get_config (args) +function Manager:rpc_get_config (args) local function getter() if args.schema ~= self.schema_name then return self:foreign_rpc_get_config( @@ -225,7 +299,7 @@ function Leader:rpc_get_config (args) if success then return response else return {status=1, error=response} end end -function Leader:rpc_set_alarm_operator_state (args) +function Manager:rpc_set_alarm_operator_state (args) local 
function getter() if args.schema ~= self.schema_name then return false, ("Set-operator-state operation not supported in".. @@ -240,7 +314,7 @@ function Leader:rpc_set_alarm_operator_state (args) if success then return response else return {status=1, error=response} end end -function Leader:rpc_purge_alarms (args) +function Manager:rpc_purge_alarms (args) local function purge() if args.schema ~= self.schema_name then return false, ("Purge-alarms operation not supported in".. @@ -252,7 +326,7 @@ function Leader:rpc_purge_alarms (args) if success then return response else return {status=1, error=response} end end -function Leader:rpc_compress_alarms (args) +function Manager:rpc_compress_alarms (args) local function compress() if args.schema ~= self.schema_name then return false, ("Compress-alarms operation not supported in".. @@ -520,13 +594,13 @@ function compute_remove_config_fn (schema_name, path) return path_remover_for_schema(yang.load_schema_by_name(schema_name), path) end -function Leader:notify_pre_update (config, verb, path, ...) +function Manager:notify_pre_update (config, verb, path, ...) for _,translator in pairs(self.support.translators) do translator.pre_update(config, verb, path, ...) end end -function Leader:update_configuration (update_fn, verb, path, ...) +function Manager:update_configuration (update_fn, verb, path, ...) self:notify_pre_update(self.current_configuration, verb, path, ...) local to_restart = self.support.compute_apps_to_restart_after_configuration_update ( @@ -535,19 +609,19 @@ function Leader:update_configuration (update_fn, verb, path, ...) local new_config = update_fn(self.current_configuration, ...) local new_graphs = self.setup_fn(new_config, ...) 
for id, graph in pairs(new_graphs) do - if self.followers[id] == nil then - self:start_follower_for_graph(id, graph) + if self.workers[id] == nil then + self:start_worker_for_graph(id, graph) end end - for id, follower in pairs(self.followers) do + for id, worker in pairs(self.workers) do if new_graphs[id] == nil then self:stop_worker(id) else local actions = self.support.compute_config_actions( - follower.graph, new_graphs[id], to_restart, verb, path, ...) - self:enqueue_config_actions_for_follower(id, actions) - follower.graph = new_graphs[id] + worker.graph, new_graphs[id], to_restart, verb, path, ...) + self:enqueue_config_actions_for_worker(id, actions) + worker.graph = new_graphs[id] end end self.current_configuration = new_config @@ -556,7 +630,7 @@ function Leader:update_configuration (update_fn, verb, path, ...) self.current_in_place_dependencies, new_graphs, verb, path, ...) end -function Leader:handle_rpc_update_config (args, verb, compute_update_fn) +function Manager:handle_rpc_update_config (args, verb, compute_update_fn) local path = path_mod.normalize_path(args.path) local parser = path_parser_for_schema_by_name(args.schema, path) self:update_configuration(compute_update_fn(args.schema, path), @@ -564,23 +638,23 @@ function Leader:handle_rpc_update_config (args, verb, compute_update_fn) return {} end -function Leader:get_native_state () +function Manager:get_native_state () local states = {} local state_reader = self.support.compute_state_reader(self.schema_name) - for _, follower in pairs(self.followers) do - local follower_config = self.support.configuration_for_follower( - follower, self.current_configuration) - table.insert(states, state_reader(follower.pid, follower_config)) + for _, worker in pairs(self.workers) do + local worker_config = self.support.configuration_for_worker( + worker, self.current_configuration) + table.insert(states, state_reader(worker.pid, worker_config)) end return self.support.process_states(states) end -function 
Leader:get_translator (schema_name) +function Manager:get_translator (schema_name) local translator = self.support.translators[schema_name] if translator then return translator end error('unsupported schema: '..schema_name) end -function Leader:apply_translated_rpc_updates (updates) +function Manager:apply_translated_rpc_updates (updates) for _,update in ipairs(updates) do local verb, args = unpack(update) local method = assert(self['rpc_'..verb..'_config']) @@ -588,7 +662,7 @@ function Leader:apply_translated_rpc_updates (updates) end return {} end -function Leader:foreign_rpc_get_config (schema_name, path, format, +function Manager:foreign_rpc_get_config (schema_name, path, format, print_default) path = path_mod.normalize_path(path) local translate = self:get_translator(schema_name) @@ -598,7 +672,7 @@ function Leader:foreign_rpc_get_config (schema_name, path, format, local config = printer(foreign_config, yang.string_output_file()) return { config = config } end -function Leader:foreign_rpc_get_state (schema_name, path, format, +function Manager:foreign_rpc_get_state (schema_name, path, format, print_default) path = path_mod.normalize_path(path) local translate = self:get_translator(schema_name) @@ -608,7 +682,7 @@ function Leader:foreign_rpc_get_state (schema_name, path, format, local state = printer(foreign_state, yang.string_output_file()) return { state = state } end -function Leader:foreign_rpc_set_config (schema_name, path, config_str) +function Manager:foreign_rpc_set_config (schema_name, path, config_str) path = path_mod.normalize_path(path) local translate = self:get_translator(schema_name) local parser = path_parser_for_schema_by_name(schema_name, path) @@ -616,7 +690,7 @@ function Leader:foreign_rpc_set_config (schema_name, path, config_str) parser(config_str)) return self:apply_translated_rpc_updates(updates) end -function Leader:foreign_rpc_add_config (schema_name, path, config_str) +function Manager:foreign_rpc_add_config (schema_name, path, 
config_str) path = path_mod.normalize_path(path) local translate = self:get_translator(schema_name) local parser = path_parser_for_schema_by_name(schema_name, path) @@ -624,14 +698,14 @@ function Leader:foreign_rpc_add_config (schema_name, path, config_str) parser(config_str)) return self:apply_translated_rpc_updates(updates) end -function Leader:foreign_rpc_remove_config (schema_name, path) +function Manager:foreign_rpc_remove_config (schema_name, path) path = path_mod.normalize_path(path) local translate = self:get_translator(schema_name) local updates = translate.remove_config(self.current_configuration, path) return self:apply_translated_rpc_updates(updates) end -function Leader:rpc_set_config (args) +function Manager:rpc_set_config (args) local function setter() if self.listen_peer ~= nil and self.listen_peer ~= self.rpc_peer then error('Attempt to modify configuration while listener attached') @@ -645,7 +719,7 @@ function Leader:rpc_set_config (args) if success then return response else return {status=1, error=response} end end -function Leader:rpc_add_config (args) +function Manager:rpc_add_config (args) local function adder() if self.listen_peer ~= nil and self.listen_peer ~= self.rpc_peer then error('Attempt to modify configuration while listener attached') @@ -659,7 +733,7 @@ function Leader:rpc_add_config (args) if success then return response else return {status=1, error=response} end end -function Leader:rpc_remove_config (args) +function Manager:rpc_remove_config (args) local function remover() if self.listen_peer ~= nil and self.listen_peer ~= self.rpc_peer then error('Attempt to modify configuration while listener attached') @@ -676,7 +750,7 @@ function Leader:rpc_remove_config (args) if success then return response else return {status=1, error=response} end end -function Leader:rpc_attach_listener (args) +function Manager:rpc_attach_listener (args) local function attacher() if self.listen_peer ~= nil then error('Listener already attached') end 
self.listen_peer = self.rpc_peer @@ -686,7 +760,7 @@ function Leader:rpc_attach_listener (args) if success then return response else return {status=1, error=response} end end -function Leader:rpc_get_state (args) +function Manager:rpc_get_state (args) local function getter() if args.schema ~= self.schema_name then return self:foreign_rpc_get_state(args.schema, args.path, @@ -701,7 +775,7 @@ function Leader:rpc_get_state (args) if success then return response else return {status=1, error=response} end end -function Leader:rpc_get_alarms_state (args) +function Manager:rpc_get_alarms_state (args) local function getter() assert(args.schema == "ietf-alarms") local printer = path_printer_for_schema_by_name( @@ -716,13 +790,13 @@ function Leader:rpc_get_alarms_state (args) if success then return response else return {status=1, error=response} end end -function Leader:handle (payload) +function Manager:handle (payload) return rpc.handle_calls(self.rpc_callee, payload, self.rpc_handler) end local dummy_unix_sockaddr = S.t.sockaddr_un() -function Leader:handle_calls_from_peers() +function Manager:handle_calls_from_peers() local peers = self.peers while true do local fd, err = self.socket:accept(dummy_unix_sockaddr) @@ -828,7 +902,7 @@ function Leader:handle_calls_from_peers() end end if peer.state == 'done' or peer.state == 'error' then - if peer.state == 'error' then print('error: '..peer.msg) end + if peer.state == 'error' then self:warn('%s', peer.msg) end peer.fd:close() table.remove(peers, i) if self.listen_peer == peer then self.listen_peer = nil end @@ -838,61 +912,56 @@ function Leader:handle_calls_from_peers() end end -function Leader:send_messages_to_followers() - for _,follower in pairs(self.followers) do - if not follower.channel then - local name = '/'..tostring(follower.pid)..'/config-follower-channel' +function Manager:send_messages_to_workers() + for id,worker in pairs(self.workers) do + if not worker.channel then + local name = 
'/'..tostring(worker.pid)..'/config-worker-channel' local success, channel = pcall(channel.open, name) - if success then follower.channel = channel end + if success then + worker.channel = channel + self:state_change_event('worker_started', id, worker.pid) + self:info("Worker %s has started (PID %s).", id, worker.pid) + end end - local channel = follower.channel + local channel = worker.channel if channel then - local queue = follower.queue - follower.queue = {} + local queue = worker.queue + worker.queue = {} local requeue = false for _,msg in ipairs(queue) do if not requeue then requeue = not channel:put_message(msg.buf, msg.len) end - if requeue then table.insert(follower.queue, msg) end + if requeue then table.insert(worker.queue, msg) end end end end end -function Leader:pull () - if app.now() < self.next_time then return end - self.next_time = app.now() + self.period - self:remove_stale_followers() - self:handle_calls_from_peers() - self:send_messages_to_followers() - self:receive_alarms_from_followers() -end - -function Leader:receive_alarms_from_followers () - for _,follower in pairs(self.followers) do - self:receive_alarms_from_follower(follower) +function Manager:receive_alarms_from_workers () + for _,worker in pairs(self.workers) do + self:receive_alarms_from_worker(worker) end end -function Leader:receive_alarms_from_follower (follower) - if not follower.alarms_channel then - local name = '/'..tostring(follower.pid)..'/alarms-follower-channel' +function Manager:receive_alarms_from_worker (worker) + if not worker.alarms_channel then + local name = '/'..tostring(worker.pid)..'/alarms-worker-channel' local success, channel = pcall(channel.open, name) if not success then return end - follower.alarms_channel = channel + worker.alarms_channel = channel end - local channel = follower.alarms_channel + local channel = worker.alarms_channel while true do local buf, len = channel:peek_message() if not buf then break end local alarm = alarm_codec.decode(buf, len) - 
self:handle_alarm(follower, alarm) + self:handle_alarm(worker, alarm) channel:discard_message(len) end end -function Leader:handle_alarm (follower, alarm) +function Manager:handle_alarm (worker, alarm) local fn, args = unpack(alarm) if fn == 'raise_alarm' then local key, args = alarm_codec.to_alarm(args) @@ -912,25 +981,62 @@ function Leader:handle_alarm (follower, alarm) end end -function Leader:stop () +function Manager:stop () for _,peer in ipairs(self.peers) do peer.fd:close() end self.peers = {} self.socket:close() S.unlink(self.socket_file_name) + + for id, worker in pairs(self.workers) do + if not worker.shutting_down then self:stop_worker(id) end + end + -- Wait 250ms for workers to shut down nicely, polling every 5ms. + local start = C.get_monotonic_time() + local wait = 0.25 + while C.get_monotonic_time() < start + wait do + self:remove_stale_workers() + if not next(self.workers) then break end + C.usleep(5000) + end + -- If that didn't work, send SIGKILL and wait indefinitely. 
+   for id, worker in pairs(self.workers) do
+      self:warn('Forcing worker %s to shut down.', id)
+      S.kill(worker.pid, "KILL")
+   end
+   while next(self.workers) do
+      self:remove_stale_workers()
+      C.usleep(5000)
+   end
+   if self.name then engine.unclaim_name(self.name) end
+   self:info('Shutdown complete.')
+end
+
+function Manager:main (duration)
+   local now = C.get_monotonic_time()
+   local stop = now + (duration or 1/0)
+   while now < stop do
+      local next_time = now + self.period
+      if timer.ticks then timer.run_to_time(now * 1e9) end
+      self:remove_stale_workers()
+      self:handle_calls_from_peers()
+      self:send_messages_to_workers()
+      self:receive_alarms_from_workers()
+      now = C.get_monotonic_time()
+      if now < next_time then
+         C.usleep(math.floor((next_time - now) * 1e6))
+         now = C.get_monotonic_time()
+      end
+   end
+end
 
-function test_worker()
-   local follower = require("apps.config.follower")
-   local myconf = config.new()
-   config.app(myconf, "follower", follower.Follower, {})
-   app.configure(myconf)
-   app.busywait = true
-   app.main({})
+function main (opts, duration)
+   local m = new_manager(opts)
+   m:main(duration)
+   m:stop()
 end
 
 function selftest ()
-   print('selftest: apps.config.leader')
-   local graph = app_graph.new()
+   print('selftest: lib.ptree.ptree')
    local function setup_fn(cfg)
       local graph = app_graph.new()
       local basic_apps = require('apps.basic.basic_apps')
@@ -939,18 +1045,30 @@ function selftest ()
       app_graph.link(graph, "source.foo -> sink.bar")
       return {graph}
    end
-   local worker_start_code = "require('apps.config.leader').test_worker()"
-   app_graph.app(graph, "leader", Leader,
-                 {setup_fn=setup_fn, worker_start_code=worker_start_code,
-                  -- Use a schema with no data nodes, just for
-                  -- testing.
- schema_name='ietf-inet-types', initial_configuration={}}) - engine.configure(graph) - engine.main({ duration = 0.05, report = {showapps=true,showlinks=true}}) - assert(app.app_table.leader.followers[1]) - assert(app.app_table.leader.followers[1].graph.links) - assert(app.app_table.leader.followers[1].graph.links["source.foo -> sink.bar"]) - local link = app.link_table["source.foo -> sink.bar"] - engine.configure(app_graph.new()) + local m = new_manager({setup_fn=setup_fn, + -- Use a schema with no data nodes, just for + -- testing. + schema_name='ietf-inet-types', + initial_configuration={}, + log_level="DEBUG"}) + local l = {log={}} + function l:worker_starting(...) table.insert(self.log,{'starting',...}) end + function l:worker_started(...) table.insert(self.log,{'started',...}) end + function l:worker_stopping(...) table.insert(self.log,{'stopping',...}) end + function l:worker_stopped(...) table.insert(self.log,{'stopped',...}) end + m:add_state_change_listener(l) + assert(m.workers[1]) + local pid = m.workers[1].pid + assert(m.workers[1].graph.links) + assert(m.workers[1].graph.links["source.foo -> sink.bar"]) + -- Worker will be started once main loop starts to run. + assert(not m.workers[1].channel) + -- Wait for worker to start. 
+   while not m.workers[1].channel do m:main(0.005) end
+   m:stop()
+   assert(m.workers[1] == nil)
+   assert(lib.equal(l.log,
+                    { {'starting', 1}, {'started', 1, pid}, {'stopping', 1},
+                      {'stopped', 1} }))
    print('selftest: ok')
 end
diff --git a/src/apps/config/support.lua b/src/lib/ptree/support.lua
similarity index 92%
rename from src/apps/config/support.lua
rename to src/lib/ptree/support.lua
index 5633adbba3..a1f45ad523 100644
--- a/src/apps/config/support.lua
+++ b/src/lib/ptree/support.lua
@@ -98,17 +98,17 @@ local function compute_objects_maybe_updated_in_place (schema_name, config,
    return objs
 end
 
-local function record_mutable_objects_embedded_in_app_initarg (follower_id, app_name, obj, accum)
+local function record_mutable_objects_embedded_in_app_initarg (worker_id, app_name, obj, accum)
    local function record(obj)
       local tab = accum[obj]
       if not tab then tab = {} accum[obj] = tab end
-      if tab[follower_id] == nil then
-         tab[follower_id] = {app_name}
+      if tab[worker_id] == nil then
+         tab[worker_id] = {app_name}
       else
-         table.insert(tab[follower_id], app_name)
+         table.insert(tab[worker_id], app_name)
       end
    end
    local function visit(obj)
@@ -126,9 +126,9 @@ local function record_mutable_objects_embedded_in_app_initarg (follower_id, app_
    visit(obj)
 end
 
--- Takes a table of follower ids (app_graph_map) and returns a table which has
--- the follower id as the key and a table listing all app names
--- i.e. {follower_id => {app name, ...}, ...}
+-- Takes a table of worker ids (app_graph_map) and returns a table which has
+-- the worker id as the key and a table listing all app names
+-- i.e.
{worker_id => {app name, ...}, ...} local function compute_mutable_objects_embedded_in_app_initargs (app_graph_map) local deps = {} for id, app_graph in pairs(app_graph_map) do @@ -190,7 +190,7 @@ local function add_restarts(actions, app_graph, to_restart) return actions end -local function configuration_for_follower(follower, configuration) +local function configuration_for_worker(worker, configuration) return configuration end @@ -216,7 +216,7 @@ generic_schema_config_support = { return compute_mutable_objects_embedded_in_app_initargs(app_graph) end, compute_state_reader = compute_state_reader, - configuration_for_follower = configuration_for_follower, + configuration_for_worker = configuration_for_worker, process_states = process_states, compute_apps_to_restart_after_configuration_update = compute_apps_to_restart_after_configuration_update, @@ -224,7 +224,7 @@ generic_schema_config_support = { } function load_schema_config_support(schema_name) - local mod_name = 'apps.config.support.'..schema_name:gsub('-', '_') + local mod_name = 'lib.ptree.support.'..schema_name:gsub('-', '_') local success, support_mod = pcall(require, mod_name) if success then return support_mod.get_config_support() end return generic_schema_config_support diff --git a/src/apps/config/support/snabb-softwire-v2.lua b/src/lib/ptree/support/snabb-softwire-v2.lua similarity index 99% rename from src/apps/config/support/snabb-softwire-v2.lua rename to src/lib/ptree/support/snabb-softwire-v2.lua index 651dcc53e7..7f83d788a4 100644 --- a/src/apps/config/support/snabb-softwire-v2.lua +++ b/src/lib/ptree/support/snabb-softwire-v2.lua @@ -13,7 +13,7 @@ local yang = require('lib.yang.yang') local ctable = require('lib.ctable') local cltable = require('lib.cltable') local path_mod = require('lib.yang.path') -local generic = require('apps.config.support').generic_schema_config_support +local generic = require('lib.ptree.support').generic_schema_config_support local binding_table = 
require("apps.lwaftr.binding_table") local binding_table_instance @@ -630,8 +630,8 @@ local function ietf_softwire_br_translator () return ret end -local function configuration_for_follower(follower, configuration) - return follower.graph.apps.lwaftr.arg +local function configuration_for_worker(worker, configuration) + return worker.graph.apps.lwaftr.arg end local function compute_state_reader(schema_name) @@ -700,7 +700,7 @@ function get_config_support() compute_apps_to_restart_after_configuration_update, compute_state_reader = compute_state_reader, process_states = process_states, - configuration_for_follower = configuration_for_follower, + configuration_for_worker = configuration_for_worker, translators = { ['ietf-softwire-br'] = ietf_softwire_br_translator () } } end diff --git a/src/lib/ptree/worker.lua b/src/lib/ptree/worker.lua new file mode 100644 index 0000000000..b779cae0b0 --- /dev/null +++ b/src/lib/ptree/worker.lua @@ -0,0 +1,118 @@ +-- Use of this source code is governed by the Apache 2.0 license; see COPYING. 
+
+module(...,package.seeall)
+
+local S = require("syscall")
+local engine = require("core.app")
+local app_graph = require("core.config")
+local counter = require("core.counter")
+local histogram = require('core.histogram')
+local lib = require('core.lib')
+local timer = require('core.timer')
+local channel = require("lib.ptree.channel")
+local action_codec = require("lib.ptree.action_codec")
+local alarm_codec = require("lib.ptree.alarm_codec")
+
+local Worker = {}
+
+local worker_config_spec = {
+   duration = {},
+   measure_latency = {default=true},
+   no_report = {default=false},
+   report = {default={showapps=true,showlinks=true}},
+   Hz = {default=1000},
+}
+
+function new_worker (conf)
+   local conf = lib.parse(conf, worker_config_spec)
+   local ret = setmetatable({}, {__index=Worker})
+   ret.period = 1/conf.Hz
+   ret.duration = conf.duration or 1/0
+   ret.no_report = conf.no_report
+   -- Record the parsed report options so Worker:main can pass them
+   -- to engine.report().
+   ret.report = conf.report
+   ret.channel = channel.create('config-worker-channel', 1e6)
+   ret.alarms_channel = alarm_codec.get_channel()
+   ret.pending_actions = {}
+
+   ret.breathe = engine.breathe
+   if conf.measure_latency then
+      local latency = histogram.create('engine/latency.histogram', 1e-6, 1e0)
+      ret.breathe = latency:wrap_thunk(ret.breathe, engine.now)
+   end
+   return ret
+end
+
+function Worker:shutdown()
+   -- This will call stop() on all apps.
+   engine.configure(app_graph.new())
+
+   -- Now we can exit.
+ S.exit(0) +end + +function Worker:commit_pending_actions() + local to_apply = {} + local should_flush = false + for _,action in ipairs(self.pending_actions) do + local name, args = unpack(action) + if name == 'call_app_method_with_blob' then + if #to_apply > 0 then + engine.apply_config_actions(to_apply) + to_apply = {} + end + local callee, method, blob = unpack(args) + local obj = assert(engine.app_table[callee]) + assert(obj[method])(obj, blob) + elseif name == "shutdown" then + self:shutdown() + else + if name == 'start_app' or name == 'reconfig_app' then + should_flush = true + end + table.insert(to_apply, action) + end + end + if #to_apply > 0 then engine.apply_config_actions(to_apply) end + self.pending_actions = {} + if should_flush then require('jit').flush() end +end + +function Worker:handle_actions_from_manager() + local channel = self.channel + for i=1,4 do + local buf, len = channel:peek_message() + if not buf then break end + local action = action_codec.decode(buf, len) + if action[1] == 'commit' then + self:commit_pending_actions() + else + table.insert(self.pending_actions, action) + end + channel:discard_message(len) + end +end + +function Worker:main () + local stop = engine.now() + self.duration + local next_time = engine.now() + repeat + self.breathe() + if next_time < engine.now() then + next_time = engine.now() + self.period + self:handle_actions_from_manager() + timer.run() + end + if not engine.busywait then engine.pace_breathing() end + until stop < engine.now() + counter.commit() + if not self.no_report then engine.report(self.report) end +end + +function main (opts) + return new_worker(opts):main() +end + +function selftest () + print('selftest: lib.ptree.worker') + main({duration=0.005}) + print('selftest: ok') +end diff --git a/src/lib/scheduling.lua b/src/lib/scheduling.lua new file mode 100644 index 0000000000..66b43638b8 --- /dev/null +++ b/src/lib/scheduling.lua @@ -0,0 +1,105 @@ +-- Use of this source code is governed by the 
Apache 2.0 license; see COPYING. + +module(..., package.seeall) + +local S = require("syscall") +local lib = require("core.lib") +local numa = require("lib.numa") +local ingress_drop_monitor = require("lib.timers.ingress_drop_monitor") + +local function fatal (msg) + print(msg) + main.exit(1) +end + +local scheduling_opts = { + cpu = {}, -- CPU index (integer). + real_time = {}, -- Boolean. + ingress_drop_monitor = {}, -- Action string: one of 'flush' or 'warn'. + busywait = {}, -- Boolean. + j = {}, -- Profiling argument string, e.g. "p" or "v". + eval = {} -- String. +} + +local sched_apply = {} + +function sched_apply.cpu (cpu) + print(string.format('Binding data plane PID %s to CPU %s.', + tonumber(S.getpid()), cpu)) + numa.bind_to_cpu(cpu) +end + +function sched_apply.ingress_drop_monitor (action) + timer.activate(ingress_drop_monitor.new({action=action}):timer()) +end + +function sched_apply.real_time (real_time) + if real_time and not S.sched_setscheduler(0, "fifo", 1) then + fatal('Failed to enable real-time scheduling. 
Try running as root.') + end +end + +function sched_apply.j (arg) + if arg:match("^v") then + local file = arg:match("^v=(.*)") + if file == '' then file = nil end + require("jit.v").start(file) + elseif arg:match("^p") then + local opts, file = arg:match("^p=([^,]*),?(.*)") + if file == '' then file = nil end + local prof = require('jit.p') + prof.start(opts, file) + local function report() prof.stop(); prof.start(opts, file) end + timer.activate(timer.new('p', report, 10e9, 'repeating')) + elseif arg:match("^dump") then + local opts, file = arg:match("^dump=([^,]*),?(.*)") + if file == '' then file = nil end + require("jit.dump").on(opts, file) + elseif arg:match("^tprof") then + local prof = require('lib.traceprof.traceprof') + prof.start() + local function report() prof.stop(); prof.start() end + timer.activate(timer.new('tprof', report, 10e9, 'repeating')) + end +end + +function sched_apply.busywait (busywait) + engine.busywait = busywait +end + +function sched_apply.eval (str) + loadstring(str)() +end + +function apply (opts) + opts = lib.parse(opts, scheduling_opts) + for k, v in pairs(opts) do sched_apply[k](v) end +end + +local function stringify (x) + if type(x) == 'string' then return string.format('%q', x) end + if type(x) == 'number' then return tostring(x) end + if type(x) == 'boolean' then return x and 'true' or 'false' end + assert(type(x) == 'table') + local ret = {"{"} + local first = true + for k,v in pairs(x) do + if first then first = false else table.insert(ret, ",") end + table.insert(ret, string.format('[%s]=%s', stringify(k), stringify(v))) + end + table.insert(ret, "}") + return table.concat(ret) +end + +function stage (opts) + return string.format("require('lib.scheduling').apply(%s)", + stringify(lib.parse(opts, scheduling_opts))) +end + +function selftest () + print('selftest: lib.scheduling') + loadstring(stage({}))() + loadstring(stage({busywait=true}))() + loadstring(stage({eval='print("lib.scheduling: eval test")'}))() + 
print('selftest: ok') +end diff --git a/src/lib/yang/README.md b/src/lib/yang/README.md index f42a76b165..0df8788d47 100644 --- a/src/lib/yang/README.md +++ b/src/lib/yang/README.md @@ -230,6 +230,18 @@ schema itself, or as `import *name* { ... }` in other YANG modules that import this module. *revision* optionally indicates that a certain revision data should be required. +— Function **add_schema** *src* *filename* + +Add the YANG schema from the string *src* to Snabb's database of YANG +schemas, making it available to `load_schema_by_name` and related +functionality. *filename* is used when signalling any parse errors. +Returns the name of the newly added schema. + +— Function **add_schema_file** *filename* + +Like `add_schema`, but reads the YANG schema in from a file. Returns +the name of the newly added schema. + — Function **load_config_for_schema** *schema* *src* *filename* Given the schema object *schema*, load the configuration from the string diff --git a/src/lib/yang/alarms.lua b/src/lib/yang/alarms.lua index d5456e7eb6..b5fb24836b 100644 --- a/src/lib/yang/alarms.lua +++ b/src/lib/yang/alarms.lua @@ -3,7 +3,7 @@ module(..., package.seeall) local data = require('lib.yang.data') local lib = require('core.lib') local util = require('lib.yang.util') -local alarm_codec = require('apps.config.alarm_codec') +local alarm_codec = require('lib.ptree.alarm_codec') local counter = require("core.counter") local format_date_as_iso_8601 = util.format_date_as_iso_8601 diff --git a/src/lib/yang/schema.lua b/src/lib/yang/schema.lua index a576cbeb0f..cccaa007f7 100644 --- a/src/lib/yang/schema.lua +++ b/src/lib/yang/schema.lua @@ -933,14 +933,37 @@ function load_schema_file(filename) return inherit_config(s), e end load_schema_file = util.memoize(load_schema_file) -function load_schema_by_name(name, revision) + +function load_schema_source_by_name(name, revision) -- FIXME: @ is not valid in a Lua module name. -- if revision then name = name .. '@' .. 
revision end name = name:gsub('-', '_') - return load_schema(require('lib.yang.'..name..'_yang'), name) + return require('lib.yang.'..name..'_yang') +end + +function load_schema_by_name(name, revision) + return load_schema(load_schema_source_by_name(name, revision)) end load_schema_by_name = util.memoize(load_schema_by_name) +function add_schema(src, filename) + -- Assert that the source actually parses, and get the ID. + local s, e = load_schema(src, filename) + -- Assert that this schema isn't known. + assert(not pcall(load_schema_source_by_name, s.id)) + assert(s.id) + -- Intern. + package.loaded['lib.yang.'..s.id:gsub('-', '_')..'_yang'] = src + return s.id +end + +function add_schema_file(filename) + local file_in = assert(io.open(filename)) + local contents = file_in:read("*a") + file_in:close() + return add_schema(contents, filename) +end + function lookup_identity (fqid) local schema_name, id = fqid:match("^([^:]*):(.*)$") local schema, env = load_schema_by_name(schema_name) diff --git a/src/lib/yang/snabb-config-leader-v1.yang b/src/lib/yang/snabb-config-leader-v1.yang index 233cf2d120..7daf8354dc 100644 --- a/src/lib/yang/snabb-config-leader-v1.yang +++ b/src/lib/yang/snabb-config-leader-v1.yang @@ -40,6 +40,17 @@ module snabb-config-leader-v1 { } } + rpc get-schema { + input { + leaf schema { type string; mandatory true; } + leaf revision { type string; } + } + output { + uses error-reporting; + leaf source { type string; } + } + } + rpc get-config { input { leaf schema { type string; mandatory true; } diff --git a/src/lib/yang/yang.lua b/src/lib/yang/yang.lua index 3d6a40414e..fe92e6fea4 100644 --- a/src/lib/yang/yang.lua +++ b/src/lib/yang/yang.lua @@ -12,6 +12,9 @@ load_schema = schema.load_schema load_schema_file = schema.load_schema_file load_schema_by_name = schema.load_schema_by_name +add_schema = schema.add_schema +add_schema_file = schema.add_schema_file + load_config_for_schema = data.load_config_for_schema load_config_for_schema_by_name = 
data.load_config_for_schema_by_name diff --git a/src/program/alarms/README.md b/src/program/alarms/README.md index 958d7e4436..d63cf0a705 100644 --- a/src/program/alarms/README.md +++ b/src/program/alarms/README.md @@ -34,8 +34,8 @@ config`](../config/README.md) uses. Only some Snabb data-planes have enabled `snabb config`; currently in fact it's only the [lwAFTR](../lwaftr/doc/README.md). If you are implementing a data-plane and want to add support for alarms, first you add support for `snabb -config`. See the [`apps.config` -documentation](../../apps/config/README.md) for more. +config` by using the `lib.ptree` process tree facility. See the +[`lib.ptree` documentation](../../lib/ptree/README.md) for more. ## Resource state @@ -184,12 +184,13 @@ See [`snabb alarms compress --help`](./compress/README) for more information. ## How does it work? The Snabb instance itself should be running in *multi-process mode*, -whereby there is one main process that shepherds a number of worker +whereby there is one manager process that shepherds a number of worker processes. The workers perform the actual data-plane functionality, are typically bound to reserved CPU and NUMA nodes, and have soft-real-time -constraints. The main process however doesn't have much to do; it just -coordinates the workers. Workers tell the main process about the alarms -that they support, and then also signal the main process when an alarm -changes state. The main process collects all of the alarms and makes -them available to `snabb alarms`, over a socket. See the [`apps.config` -documentation](../../apps/config/README.md) for full details. +constraints. The manager process however doesn't have much to do; it +just coordinates the workers. Workers tell the manager process about +the alarms that they support, and then also signal the manager process +when an alarm changes state. The manager process collects all of the +alarms and makes them available to `snabb alarms`, over a socket. 
See +the [`lib.ptree` documentation](../../lib/ptree/README.md) for full +details. diff --git a/src/program/config/README.md b/src/program/config/README.md index b2f34f7c46..11660df997 100644 --- a/src/program/config/README.md +++ b/src/program/config/README.md @@ -371,17 +371,17 @@ relevant standardized schemas. Work here is ongoing. ## How does it work? The Snabb instance itself should be running in *multi-process mode*, -whereby there is one main process that shepherds a number of worker -processes. The workers perform the actual data-plane functionality, -are typically bound to reserved CPU and NUMA nodes, and have -soft-real-time constraints. The main process however doesn't have -much to do; it just coordinates the workers. - -The main process runs a special app in its engine that listens on a -UNIX socket for special remote procedure calls, translates those calls -to updates that the data plane should apply, and dispatches those -updates to the data plane in an efficient way. See the [`apps.config` -documentation](../../apps/config/README.md) for full details. +whereby there is one manager process that shepherds a number of worker +processes. The workers perform the actual data-plane functionality, are +typically bound to reserved CPU and NUMA nodes, and have soft-real-time +constraints. The manager process however doesn't have much to do; it +just coordinates the workers. + +The manager process runs a special event loop that listens on a UNIX +socket for remote procedure calls from `snabb config` programs, +translates those calls to updates that the data plane should apply, and +dispatches those updates to the data plane in an efficient way. See the +[`lib.ptree` documentation](../../lib/ptree/README.md) for full details. 
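The socket protocol mentioned above is length-prefixed: each message is a base-10 length, a newline, then the RPC text, so the reader can slurp the whole payload without blocking. A minimal Lua sketch of that framing (illustrative only; `frame` and `unframe` are hypothetical names, not Snabb functions):

```lua
-- Illustrative only: frame and unframe a message for the length-prefixed
-- "snabb config" socket protocol (Message = Length "\n" RPC*).  Knowing
-- the length up front lets the reader resume a partial nonblocking read.
local function frame (rpc)
   return string.format("%d\n%s", #rpc, rpc)
end

local function unframe (msg)
   local len, rest = msg:match("^(%d+)\n(.*)$")
   return rest:sub(1, tonumber(len))
end

assert(unframe(frame("get-config {}")) == "get-config {}")
```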
Some data planes, like the lwAFTR, add hooks to the `set`, `add`, and `remove` subcommands of `snabb config` to allow even more efficient diff --git a/src/program/config/common.lua b/src/program/config/common.lua index f367166346..bd4838b4d7 100644 --- a/src/program/config/common.lua +++ b/src/program/config/common.lua @@ -91,6 +91,13 @@ function parse_command_line(args, opts) ret.schema_name = descr.default_schema end require('lib.yang.schema').set_default_capabilities(descr.capability) + if not pcall(yang.load_schema_by_name, ret.schema_name) then + local response = call_leader( + ret.instance_id, 'get-schema', + {schema=ret.schema_name, revision=ret.revision_date}) + assert(not response.error, response.error) + yang.add_schema(response.source, ret.schema_name) + end if opts.with_config_file then if #args == 0 then err("missing config file argument") end local file = table.remove(args, 1) diff --git a/src/program/lwaftr/bench/bench.lua b/src/program/lwaftr/bench/bench.lua index 4b9239f0c7..f9cea5dc62 100644 --- a/src/program/lwaftr/bench/bench.lua +++ b/src/program/lwaftr/bench/bench.lua @@ -34,27 +34,9 @@ function parse_args(args) args = lib.dogetopt(args, handlers, "j:n:hyb:D:", { help="h", hydra="y", ["bench-file"]="b", duration="D", name="n", cpu=1}) if #args ~= 3 then show_usage(1) end - cpuset.global_cpuset():bind_to_numa_node() return opts, scheduling, unpack(args) end --- Finds current followers for leader (note it puts the pid as the key) -local function find_followers() - local followers = {} - local mypid = S.getpid() - for _, name in ipairs(shm.children("/")) do - local pid = tonumber(name) - if pid ~= nil and shm.exists("/"..pid.."/group") then - local path = S.readlink(shm.root.."/"..pid.."/group") - local parent = tonumber(lib.basename(lib.dirname(path))) - if parent == mypid then - followers[pid] = true - end - end - end - return followers -end - function run(args) local opts, scheduling, conf_file, inv4_pcap, inv6_pcap = parse_args(args) local 
conf = setup.read_config(conf_file) @@ -65,23 +47,29 @@ function run(args) conf.softwire_config.name = opts.name end - local graph = config.new() local function setup_fn(graph, lwconfig) return setup.load_bench(graph, lwconfig, inv4_pcap, inv6_pcap, 'sinkv4', 'sinkv6') end - setup.reconfigurable(scheduling, setup_fn, graph, conf) - app.configure(graph) + local manager = setup.ptree_manager(scheduling, setup_fn, conf) - local function start_sampling_for_pid(pid, write_header) + local stats = {csv={}} + function stats:worker_starting(id) end + function stats:worker_started(id, pid) local csv = csv_stats.CSVStatsTimer:new(opts.bench_file, opts.hydra, pid) csv:add_app('sinkv4', { 'input' }, { input=opts.hydra and 'decap' or 'Decap.' }) csv:add_app('sinkv6', { 'input' }, { input=opts.hydra and 'encap' or 'Encap.' }) - csv:activate(write_header) + self.csv[id] = csv + self.csv[id]:start() end + function stats:worker_stopping(id) + self.csv[id]:stop() + self.csv[id] = nil + end + function stats:worker_stopped(id) end + manager:add_state_change_listener(stats) - setup.start_sampling(start_sampling_for_pid) - - app.main({duration=opts.duration}) + manager:main(opts.duration) + manager:stop() end diff --git a/src/program/lwaftr/csv_stats.lua b/src/program/lwaftr/csv_stats.lua index dae22b6e45..ef2e9acd60 100644 --- a/src/program/lwaftr/csv_stats.lua +++ b/src/program/lwaftr/csv_stats.lua @@ -47,31 +47,18 @@ function CSVStatsTimer:new(filename, hydra_mode, pid) local file = filename and io.open(filename, "w") or io.stdout local o = { hydra_mode=hydra_mode, link_data={}, file=file, period=1, header = hydra_mode and "benchmark,id,score,unit" or "Time (s)"} + o.ready = false + o.deferred_apps = {} o.pid = pid or S.getpid() - o.links_by_app = open_link_counters(o.pid) return setmetatable(o, {__index = CSVStatsTimer}) end --- Add links from an app whose identifier is ID to the CSV timer. If --- present, LINKS is an array of strings identifying a subset of links --- to monitor. 
The optional LINK_NAMES table maps link names to --- human-readable names, for the column headers. -function CSVStatsTimer:add_app(id, links, link_names) - local function add_link_data(name, link) - local link_name = link_names[name] or name - if not self.hydra_mode then - local h = (',%s MPPS,%s Gbps'):format(link_name, link_name) - self.header = self.header..h - end - local data = { - link_name = link_name, - txpackets = link.txpackets, - txbytes = link.txbytes, - } - table.insert(self.link_data, data) - end - - local app = assert(self.links_by_app[id], "App named "..id.." not found") +function CSVStatsTimer:resolve_app(deferred) + local id, links, link_names = unpack(assert(deferred)) + self.links_by_app = open_link_counters(self.pid) + local app = self.links_by_app[id] + if not app then return false end + local resolved_links = {} for _,name in ipairs(links) do local link = app.input[name] or app.output[name] -- If we didn't find these links, allow a link name of "rx" to be @@ -84,41 +71,81 @@ function CSVStatsTimer:add_app(id, links, link_names) if name == 'rx' then link = app.input.input end if name == 'tx' then link = app.output.output end end - assert(link, "Link named "..name.." not found in "..id) - add_link_data(name, link) + if not link then return false end + table.insert(resolved_links, {name, link}) end + for _, resolved_link in ipairs(resolved_links) do + local name, link = unpack(resolved_link) + local link_name = link_names[name] or name + local data = { + link_name = link_name, + txpackets = link.txpackets, + txbytes = link.txbytes, + } + if not self.hydra_mode then + local h = (',%s MPPS,%s Gbps'):format(link_name, link_name) + self.header = self.header..h + end + table.insert(self.link_data, data) + end + return true +end + +-- Add links from an app whose identifier is ID to the CSV timer. If +-- present, LINKS is an array of strings identifying a subset of links +-- to monitor. 
The optional LINK_NAMES table maps link names to +-- human-readable names, for the column headers. +function CSVStatsTimer:add_app(id, links, link_names) + -- Because we are usually measuring counters from another process and + -- that process is probably spinning up as we are installing the + -- counter, we defer the resolve operation and try to resolve it from + -- inside the timer. + table.insert(self.deferred_apps, {id, links, link_names}) end function CSVStatsTimer:set_period(period) self.period = period end -- Activate the timer with a period of PERIOD seconds. -function CSVStatsTimer:activate(write_header) - if write_header then - self.file:write(self.header..'\n') - self.file:flush() +function CSVStatsTimer:start() + local function tick() return self:tick() end + self.tick_timer = timer.new('csv_stats', tick, self.period*1e9, 'repeating') + tick() + timer.activate(self.tick_timer) +end + +function CSVStatsTimer:stop() + self:tick() -- ? + timer.cancel(self.tick_timer) +end + +function CSVStatsTimer:is_ready() + if self.ready then return true end + for i,data in ipairs(self.deferred_apps) do + if not data then + -- pass + elseif self:resolve_app(data) then + self.deferred_apps[i] = false + else + return false + end end + -- print header + self.file:write(self.header..'\n') + self.file:flush() self.start = engine.now() self.prev_elapsed = 0 for _,data in ipairs(self.link_data) do data.prev_txpackets = counter.read(data.txpackets) data.prev_txbytes = counter.read(data.txbytes) end - local function tick() return self:tick() end - self.tick_timer = timer.new('csv_stats', tick, self.period*1e9, 'repeating') - timer.activate(self.tick_timer) - return self.tick_timer -end - -function CSVStatsTimer:check_alive() - -- Instances can be terminated periodically, this checks for that and if so - -- removes the timer so the3 stats don't get displayed indefinitely. 
- if S.waitpid(self.pid, S.c.W["NOHANG"]) ~= 0 then - self.tick_timer.repeating = false - end + self.ready = true + -- Return false for the last time, so that our first reading is + -- legit. + return false end function CSVStatsTimer:tick() - self:check_alive() + if not self:is_ready() then return end local elapsed = engine.now() - self.start local dt = elapsed - self.prev_elapsed self.prev_elapsed = elapsed diff --git a/src/program/lwaftr/query/query.lua b/src/program/lwaftr/query/query.lua index e0c57e68cd..8ba68c7057 100644 --- a/src/program/lwaftr/query/query.lua +++ b/src/program/lwaftr/query/query.lua @@ -82,8 +82,9 @@ local function print_counters (pid, filter) end end --- Return the pid that was specified, unless it was a leader process, --- in which case, return the follower pid that actually has useful counters. +-- Return the pid that was specified, unless it was a manager process, +-- in which case, return the worker pid that actually has useful +-- counters. local function pid_to_parent(pid) -- It's meaningless to get the parent of a nil 'pid'. if not pid then return pid end @@ -91,10 +92,11 @@ local function pid_to_parent(pid) for _, name in ipairs(shm.children("/")) do local p = tonumber(name) if p and ps.is_worker(p) then - local leader_pid = tonumber(ps.get_leader_pid(p)) - -- If the precomputed by-name pid is the leader pid, set the pid - -- to be the follower's pid instead to get meaningful counters. - if leader_pid == pid then pid = p end + local manager_pid = tonumber(ps.get_manager_pid(p)) + -- If the precomputed by-name pid is the manager pid, set the + -- pid to be the worker's pid instead to get meaningful + -- counters. + if manager_pid == pid then pid = p end end end return pid @@ -115,18 +117,19 @@ function run (raw_args) fatal(("Couldn't find process with name '%s'"):format(opts.name)) end - -- Check if it was run with --reconfigurable - -- If it was, find the children, then find the pid of their parent. 
- -- Note that this approach will break as soon as there can be multiple - -- followers which need to have their statistics aggregated, as it will - -- only print the statistics for one child, not for all of them. + -- Check if it was run with --reconfigurable If it was, find the + -- children, then find the pid of their parent. Note that this + -- approach will break as soon as there can be multiple workers + -- which need to have their statistics aggregated, as it will only + -- print the statistics for one child, not for all of them. for _, name in ipairs(shm.children("/")) do local p = tonumber(name) if p and ps.is_worker(p) then - local leader_pid = tonumber(ps.get_leader_pid(p)) - -- If the precomputed by-name pid is the leader pid, set the pid - -- to be the follower's pid instead to get meaningful counters. - if leader_pid == pid then pid = p end + local manager_pid = tonumber(ps.get_manager_pid(p)) + -- If the precomputed by-name pid is the manager pid, set + -- the pid to be the worker's pid instead to get meaningful + -- counters. + if manager_pid == pid then pid = p end end end end diff --git a/src/program/lwaftr/run/run.lua b/src/program/lwaftr/run/run.lua index 881b383be2..5b53e78aa4 100644 --- a/src/program/lwaftr/run/run.lua +++ b/src/program/lwaftr/run/run.lua @@ -119,7 +119,6 @@ function parse_args(args) if opts.mirror then assert(opts["on-a-stick"], "Mirror option is only valid in on-a-stick mode") end - cpuset.global_cpuset():bind_to_numa_node() if opts["on-a-stick"] then scheduling.pci_addrs = { v4 } return opts, scheduling, conf_file, v4 @@ -176,20 +175,19 @@ function run(args) end end - local c = config.new() + local manager = setup.ptree_manager(scheduling, setup_fn, conf) - conf.alarm_notification = true - setup.reconfigurable(scheduling, setup_fn, c, conf) - engine.configure(c) - - if opts.verbosity >= 2 then + -- FIXME: Doesn't work in multi-process environment. 
+ if false and opts.verbosity >= 2 then local function lnicui_info() engine.report_apps() end local t = timer.new("report", lnicui_info, 1e9, 'repeating') timer.activate(t) end if opts.verbosity >= 1 then - function add_csv_stats_for_pid(pid, write_header) + local stats = {csv={}} + function stats:worker_starting(id) end + function stats:worker_started(id, pid) local csv = csv_stats.CSVStatsTimer:new(opts.bench_file, opts.hydra, pid) -- Link names like "tx" are from the app's perspective, but -- these labels are from the perspective of the lwAFTR as a @@ -205,18 +203,17 @@ function run(args) csv:add_app('inetNic', { 'tx', 'rx' }, { tx=ipv4_tx, rx=ipv4_rx }) csv:add_app('b4sideNic', { 'tx', 'rx' }, { tx=ipv6_tx, rx=ipv6_rx }) end - csv:activate(write_header) + self.csv[id] = csv + self.csv[id]:start() end - setup.start_sampling(add_csv_stats_for_pid) - end - - if opts.ingress_drop_monitor then - io.stderr:write("Warning: Ingress drop monitor not yet supported\n") + function stats:worker_stopping(id) + self.csv[id]:stop() + self.csv[id] = nil + end + function stats:worker_stopped(id) end + manager:add_state_change_listener(stats) end - if opts.duration then - engine.main({duration=opts.duration, report={showlinks=true}}) - else - engine.main({report={showlinks=true}}) - end + manager:main(opts.duration) + manager:stop() end diff --git a/src/program/lwaftr/setup.lua b/src/program/lwaftr/setup.lua index 55e9aafdb5..001ed6a6b5 100644 --- a/src/program/lwaftr/setup.lua +++ b/src/program/lwaftr/setup.lua @@ -1,8 +1,7 @@ module(..., package.seeall) local config = require("core.config") -local leader = require("apps.config.leader") -local follower = require("apps.config.follower") +local manager = require("lib.ptree.ptree") local PcapFilter = require("apps.packet_filter.pcap_filter").PcapFilter local V4V6 = require("apps.lwaftr.V4V6").V4V6 local VirtioNet = require("apps.virtio_net.virtio_net").VirtioNet @@ -562,123 +561,6 @@ function load_soak_test_on_a_stick (c, conf, 
inv4_pcap, inv6_pcap) link_sink(c, unpack(sinks)) end -local apply_scheduling_opts = { - pci_addrs = { default={} }, - real_time = { default=false }, - ingress_drop_monitor = { default='flush' }, - j = {} -} -function apply_scheduling(opts) - local lib = require("core.lib") - local ingress_drop_monitor = require("lib.timers.ingress_drop_monitor") - local fatal = lwutil.fatal - - opts = lib.parse(opts, apply_scheduling_opts) - if opts.ingress_drop_monitor then - local mon = ingress_drop_monitor.new({action=opts.ingress_drop_monitor}) - timer.activate(mon:timer()) - end - if opts.real_time then - if not S.sched_setscheduler(0, "fifo", 1) then - fatal('Failed to enable real-time scheduling. Try running as root.') - end - end - if opts.j then - local arg = opts.j - if arg:match("^v") then - local file = arg:match("^v=(.*)") - if file == '' then file = nil end - require("jit.v").start(file) - elseif arg:match("^p") then - local opts, file = arg:match("^p=([^,]*),?(.*)") - if file == '' then file = nil end - local prof = require('jit.p') - prof.start(opts, file) - local function report() prof.stop(); prof.start(opts, file) end - timer.activate(timer.new('p', report, 10e9, 'repeating')) - elseif arg:match("^dump") then - local opts, file = arg:match("^dump=([^,]*),?(.*)") - if file == '' then file = nil end - require("jit.dump").on(opts, file) - elseif arg:match("^tprof") then - local prof = require('lib.traceprof.traceprof') - prof.start() - local function report() prof.stop(); prof.start() end - timer.activate(timer.new('tprof', report, 10e9, 'repeating')) - end - end -end - -function run_worker(scheduling) - local app = require("core.app") - apply_scheduling(scheduling) - local myconf = config.new() - config.app(myconf, "follower", follower.Follower, {}) - app.configure(myconf) - app.busywait = true - app.main({}) -end - -local function stringify(x) - if type(x) == 'string' then return string.format('%q', x) end - if type(x) == 'number' then return tostring(x) end - if 
type(x) == 'boolean' then return x and 'true' or 'false' end - assert(type(x) == 'table') - local ret = {"{"} - local first = true - for k,v in pairs(x) do - if first then first = false else table.insert(ret, ",") end - table.insert(ret, string.format('[%s]=%s', stringify(k), stringify(v))) - end - table.insert(ret, "}") - return table.concat(ret) -end - --- Takes a function (which takes a follower PID) and starts sampling --- --- The function searches for followers of the leader and when a new one --- appears it calls the sampling function (passed in) with the follower --- PID to begin the sampling. The sampling function should look like: --- function(pid, write_header) --- If write_header is false it should not write a new header. -function start_sampling(sample_fn) - local header_written = false - local followers = {} - local function find_followers() - local ret = {} - local mypid = S.getpid() - for _, name in ipairs(shm.children("/")) do - local pid = tonumber(name) - if pid ~= nil and shm.exists("/"..pid.."/group") then - local path = S.readlink(shm.root.."/"..pid.."/group") - local parent = tonumber(lib.basename(lib.dirname(path))) - if parent == mypid then - ret[pid] = true - end - end - end - return ret - end - - local function sample_for_new_followers() - local new_followers = find_followers() - for pid, _ in pairs(new_followers) do - if followers[pid] == nil then - if not pcall(sample_fn, pid, (not header_written)) then - new_followers[pid] = nil - io.stderr:write("Waiting on follower "..pid.. - " to start ".."before recording statistics...\n") - else - header_written = true - end - end - end - followers = new_followers - end - timer.activate(timer.new('start_sampling', sample_for_new_followers, - 1e9, 'repeating')) -end - -- Produces configuration for each worker. Each queue on each device -- will get its own worker process. 
local function compute_worker_configs(conf) @@ -707,7 +589,7 @@ local function compute_worker_configs(conf) return ret end -function reconfigurable(scheduling, f, graph, conf) +function ptree_manager(scheduling, f, conf) -- Always enabled in reconfigurable mode. alarm_notification = true @@ -721,12 +603,12 @@ function reconfigurable(scheduling, f, graph, conf) return worker_app_graphs end - local worker_code = "require('program.lwaftr.setup').run_worker(%s)" - worker_code = worker_code:format(stringify(scheduling)) - - config.app(graph, 'leader', leader.Leader, - { setup_fn = setup_fn, initial_configuration = conf, - worker_start_code = worker_code, - schema_name = 'snabb-softwire-v2', - default_schema = 'ietf-softwire-br'}) + return manager.new_manager { + setup_fn = setup_fn, + initial_configuration = conf, + schema_name = 'snabb-softwire-v2', + default_schema = 'ietf-softwire-br', + worker_default_scheduling = scheduling, + -- log_level="DEBUG" + } end diff --git a/src/program/ps/ps.lua b/src/program/ps/ps.lua index 5760bb6355..975b64f4a5 100644 --- a/src/program/ps/ps.lua +++ b/src/program/ps/ps.lua @@ -47,7 +47,7 @@ local function is_addressable (pid) return false end -function get_leader_pid (pid) +function get_manager_pid (pid) local fq = shm.root.."/"..pid.."/group" local path = S.readlink(fq) return basename(dirname(path)) @@ -65,7 +65,7 @@ local function compute_snabb_instances() if p and p ~= my_pid then local instance = {pid=p, name=name} if is_worker(p) then - instance.leader = get_leader_pid(p) + instance.leader = get_manager_pid(p) end if is_addressable(p) then instance.addressable = true diff --git a/src/program/ptree/README b/src/program/ptree/README new file mode 100644 index 0000000000..38e165374d --- /dev/null +++ b/src/program/ptree/README @@ -0,0 +1,42 @@ +Usage: ptree --help + ptree [OPTION...] SCHEMA.YANG SETUP.LUA CONF + +Run a multi-process network function. The data plane is constructed by +the "setup function" defined in SETUP.LUA. 
This function takes +configuration data conforming to a specified YANG schema as an argument, +and returns a table mapping worker ID to the corresponding graph of apps +and links (a "core.config" object). + +This "ptree" program is so named because it uses the "lib.ptree" process +tree facility from Snabb. It's useful for prototyping network functions +and can be a good starting point for a new network function -- just copy +its code and modify to suit. + +The management process in a "ptree" network function listens on a socket +for configuration queries and updates. Users can change the network +function's configuration while it is running using the "snabb config" +program. The management process will use the setup function to compute +new app graphs for the workers and then apply any needed changes to the +workers. + +Optional arguments: + -n NAME, --name NAME Sets the name as the identifier of this program. + This must be unique amongst other snabb programs. + --cpu CPUSET Run data-plane processes on the given CPUs. + --real-time Enable real-time SCHED_FIFO scheduler on + data-plane processes. + --on-ingress-drop=ACTION Specify an action to take in a data-plane if + too many ingress drops are detected. Available + actions: "warn" to print a warning, "flush" + to flush the JIT, or "off" to do nothing. + Default is "flush". + -D Duration in seconds. + -v Verbose (repeat for more verbosity). + +Optional arguments for debugging and profiling: + -jv, -jv=FILE Print out when traces are recorded + -jp, -jp=MODE,FILE Profile the system by method + -jtprof Profile the system by trace + +CPUSET is a list of CPU ranges. For example, "3-5,7-9" and "3,4,5,7,8,9" +both allow the data planes to run on the given CPUs. 
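The CPUSET expansion just described can be sketched in a few lines of Lua. This is a hypothetical helper for illustration, not Snabb's actual CPU-set parser:

```lua
-- Hypothetical helper: expand a CPUSET string such as "3-5,7-9" into an
-- array of CPU numbers ({3, 4, 5, 7, 8, 9} for that input).
local function parse_cpuset (str)
   local cpus = {}
   for range in str:gmatch("[^,]+") do
      -- A range is either "LO-HI" or a single CPU number.
      local lo, hi = range:match("^(%d+)%-(%d+)$")
      if not lo then lo = assert(range:match("^%d+$")); hi = lo end
      for cpu = tonumber(lo), tonumber(hi) do table.insert(cpus, cpu) end
   end
   return cpus
end

assert(table.concat(parse_cpuset("3-5,7-9"), ",") == "3,4,5,7,8,9")
```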
diff --git a/src/program/ptree/README.inc b/src/program/ptree/README.inc new file mode 120000 index 0000000000..100b93820a --- /dev/null +++ b/src/program/ptree/README.inc @@ -0,0 +1 @@ +README \ No newline at end of file diff --git a/src/program/ptree/README.md b/src/program/ptree/README.md new file mode 100644 index 0000000000..dc9e77703b --- /dev/null +++ b/src/program/ptree/README.md @@ -0,0 +1,362 @@ +### Ptree (program.ptree) + +Example Snabb program for prototyping multi-process YANG-based network +functions. + +#### Overview + +The [`lib.ptree`](../../lib/ptree/README.md) facility in Snabb allows +network engineers to build a network function out of a tree of processes +described by a [YANG schema](../../lib/yang/README.md). The root +process runs the management plane, and the leaf processes (the +"workers") run the data plane. The apps and links in the workers are +declaratively created as a function of a YANG configuration. + +This `snabb ptree` program is a tool to allow quick prototyping of +network functions using the ptree facilities. The invocation syntax of +`snabb ptree` is as follows: + +``` +snabb ptree [OPTION...] SCHEMA.YANG SETUP.LUA CONF +``` + +The *schema.yang* file contains a YANG schema describing the network +function's configuration. *setup.lua* defines a Lua function mapping a +configuration to apps and links for a set of worker processes. *conf* +is the initial configuration of the network function. + +#### Example: Simple packet filter + +Let's say we're going to make a packet filter application. We can use +Snabb's built-in support for filters expressed in pflang, the language +used by `tcpdump`, and just hook that filter up to a full-duplex NIC. + +To begin with, we have to think about how to represent the configuration +of the network function. 
If we simply want to be able to specify the +PCI device of a NIC, an RSS queue, and a filter string, we could +describe it with a YANG schema like this: + +```yang +module snabb-pf-v1 { + namespace snabb:pf-v1; + prefix pf-v1; + + leaf device { type string; mandatory true; } + leaf rss-queue { type uint8; default 0; } + leaf filter { type string; default ""; } +} +``` + +We throw this into a file `pf-v1.yang`. In YANG, a `module`'s +body contains configuration declarations, most importantly `leaf`, +`container`, and `list`. In our `snabb-pf-v1` schema, there is a +`module` containing three `leaf`s: `device`, `rss-queue`, and `filter`. +Snabb effectively generates a validating parser for configurations +following this YANG schema; a configuration file must contain exactly +one `device FOO;` declaration and may contain one `rss-queue` statement +and one `filter` statement. Thus a concrete configuration following +this YANG schema might look like this: + +``` +device 83:00.0; +rss-queue 0; +filter "tcp port 80"; +``` + +So let's just drop that into a file `pf-v1.cfg` and use that as our +initial configuration. + +Now we just need to map from this configuration to app graphs in some +set of workers. The *setup.lua* file should define this function. + +``` +-- Function taking a snabb-pf-v1 configuration and +-- returning a table mapping worker ID to app graph. +return function (conf) + -- Write me :) +end +``` + +The `conf` parameter to the setup function is a Lua representation of +config data for this network function. In our case it will be a table +containing the keys `device`, `rss_queue`, and `filter`. (Note that +Snabb's YANG support maps dashes to underscores for the Lua data, so it +really is `rss_queue` and not `rss-queue`.) + +The return value of the setup function is a table whose keys are "worker +IDs", and whose values are the corresponding app graphs. A worker ID +can be any Lua value, for example a number or a string or whatever. 
If +the user later reconfigures the network function (perhaps setting a +different filter string), the manager will re-run the setup function to +produce a new set of worker IDs and app graphs. The manager will then +stop workers whose ID is no longer present, start new workers, and +reconfigure workers whose ID is still present. + +In our case we're just going to have one worker, so we can use any +worker ID. If the user reconfigures the filter but keeps the same +device and RSS queue, we don't want to interrupt packet flow, so we want +to use a worker ID that won't change. But if the user changes the +device, probably we do want to restart the worker, so maybe we make the +worker ID a function of the device name. + +With all of these considerations, we are ready to actually write the +setup function. + +```lua +local app_graph = require('core.config') +local pci = require('lib.hardware.pci') +local pcap_filter = require('apps.packet_filter.pcap_filter') + +-- Function taking a snabb-pf-v1 configuration and +-- returning a table mapping worker ID to app graph. +return function (conf) + -- Load NIC driver for PCI address. + local device_info = pci.device_info(conf.device) + local driver = require(device_info.driver).driver + + -- Make a new app graph for this configuration. + local graph = app_graph.new() + app_graph.app(graph, "nic", driver, + {pciaddr=conf.device, rxq=conf.rss_queue, + txq=conf.rss_queue}) + app_graph.app(graph, "filter", pcap_filter.PcapFilter, + {filter=conf.filter}) + app_graph.link(graph, "nic."..device_info.tx.." -> filter.input") + app_graph.link(graph, "filter.output -> nic."..device_info.rx) + + -- Use DEVICE/QUEUE as the worker ID. + local id = conf.device..'/'..conf.rss_queue + + -- One worker with the given ID and the given app graph. + return {[id]=graph} +end +``` + +Put this in, say, `pf-v1.lua`, and we're good to go. 
The network
+function can be run like this:
+
+```
+$ snabb ptree --name my-filter pf-v1.yang pf-v1.lua pf-v1.cfg
+```
+
+See [`snabb ptree --help`](./README) for full details on arguments like
+`--name`.
+
+#### Tuning
+
+The `snabb ptree` program also takes a number of options that apply to
+the data-plane processes.
+
+— **--cpu** *cpus*
+
+Allocate *cpus* to the data-plane processes. The manager of the process
+tree will allocate CPUs from this set to data-plane workers. For
+example, `--cpu 3-5,7-9` assigns CPUs 3, 4, 5, 7, 8, and 9 to the
+network function. The manager will try to allocate a CPU for a worker
+that is NUMA-local to the PCI devices used by the worker.
+
+— **--real-time**
+
+Use the `SCHED_FIFO` real-time scheduler for the data-plane processes.
+
+— **--on-ingress-drop** *action*
+
+If a data-plane process detects too many dropped packets (by default,
+100K packets over 30 seconds), perform *action*. Available *action*s
+are `flush`, which tells Snabb to re-optimize the code; `warn`, which
+simply prints a warning and raises an alarm; and `off`, which does
+nothing.
+
+— **-j** *arg*
+
+Enable profiling in the data-plane. Useful when you are trying to
+isolate a performance problem. It is thought that this will be reworked
+when Snabb switches to RaptorJIT, so we leave this somewhat complicated
+option undocumented at present :)
+
+#### Reconfiguration
+
+The manager of a ptree-based Snabb network function also listens to
+configuration queries and updates on a local socket. The user-facing
+side of this interface is [`snabb config`](../config/README.md). A
+`snabb config` user can address a local ptree network function by PID,
+but it's easier to do so by name, so the above example passed `--name
+my-filter` to the `snabb ptree` invocation.
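+
+If you don't pass `--name`, you can still reach the instance by its
+PID. A hypothetical session (the PID shown is made up; substitute the
+one from your system):
+
+```
+$ snabb config get 1234 /
+device 83:00.0;
+rss-queue 0;
+filter "tcp port 80";
+```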
+
+For example, we can get the configuration of a running network function
+with `snabb config get`:
+
+```
+$ snabb config get my-filter /
+device 83:00.0;
+rss-queue 0;
+filter "tcp port 80";
+```
+
+You can also update the configuration. For example, to move this
+network function over to device `82:00.0`, do:
+
+```
+$ snabb config set my-filter /device 82:00.0
+$ snabb config get my-filter /
+device 82:00.0;
+rss-queue 0;
+filter "tcp port 80";
+```
+
+The ptree manager takes the necessary actions to update the data plane
+to match the specified configuration.
+
+#### Multi-process
+
+Let's say your clients are really loving this network function, so much
+so that they are running an instance on each network card on your
+server. They are getting tired, though, of having to `snabb config set`
+each of the different processes whenever the filter string changes.
+Well, you can make them even happier by refactoring the network
+function to be multi-process.
+
+```yang
+module snabb-pf-v2 {
+  namespace snabb:pf-v2;
+  prefix pf-v2;
+
+  /* Default filter string. */
+  leaf filter { type string; default ""; }
+
+  list worker {
+    key "device rss-queue";
+    leaf device { type string; }
+    leaf rss-queue { type uint8; }
+    /* Optional worker-specific filter string. */
+    leaf filter { type string; }
+  }
+}
+```
+
+Here we declare a new YANG schema that, instead of having one device
+and RSS queue, has a whole list of them. The `key "device rss-queue"`
+declaration says that the combination of device and RSS queue should be
+unique -- you can't have two different workers on the same device+queue
+pair, logically. We declare a default `filter` at the top level, and
+also allow each worker to override it with its own filter declaration.
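+
+In Lua terms, that override rule is tiny: a worker's effective filter
+is its own `filter` leaf when present, otherwise the top-level default.
+A sketch in plain Lua (illustrative only; `conf` and `worker_conf` are
+stand-ins for the parsed configuration):
+
+```lua
+-- Per-worker override if set, else the top-level default filter.
+local function effective_filter (conf, worker_conf)
+   return worker_conf.filter or conf.filter
+end
+```
+
+This is the same `v.filter or conf.filter` expression that appears in
+the setup function below.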
+ +A configuration might look like this: + +``` +filter "tcp port 80"; +worker { + device 83:00.0; + rss-queue 0; +} +worker { + device 83:00.0; + rss-queue 1; +} +worker { + device 83:00.1; + rss-queue 0; + filter "tcp port 443"; +} +worker { + device 83:00.1; + rss-queue 1; + filter "tcp port 443"; +} +``` + +Finally, we need a new setup function as well: + +```lua +local app_graph = require('core.config') +local pci = require('lib.hardware.pci') +local pcap_filter = require('apps.packet_filter.pcap_filter') + +-- Function taking a snabb-pf-v2 configuration and +-- returning a table mapping worker ID to app graph. +return function (conf) + local workers = {} + for k, v in pairs(conf.worker) do + -- Load NIC driver for PCI address. + local device_info = pci.device_info(k.device) + local driver = require(device_info.driver).driver + + -- Make a new app graph for this worker. + local graph = app_graph.new() + app_graph.app(graph, "nic", driver, + {pciaddr=k.device, rxq=k.rss_queue, + txq=k.rss_queue}) + app_graph.app(graph, "filter", pcap_filter.PcapFilter, + {filter=v.filter or conf.filter}) + app_graph.link(graph, "nic."..device_info.tx.." -> filter.input") + app_graph.link(graph, "filter.output -> nic."..device_info.rx) + + -- Use DEVICE/QUEUE as the worker ID. + local id = k.device..'/'..k.rss_queue + + -- Add worker with the given ID and the given app graph. 
+ workers[id] = graph + end + return workers +end +``` + +If we place these into analogously named files, we have a multiprocess +network function: + +``` +$ snabb ptree --name my-filter pf-v2.yang pf-v2.lua pf-v2.cfg +``` + +If you change the root filter string via `snabb config`, it propagates +to all workers, except those that have their own overrides of course: + +``` +$ snabb config set my-filter /filter "'tcp port 666'" +$ snabb config get my-filter /filter +"tcp port 666" +``` + +The syntax to get at a particular worker is a little gnarly; it's based +on XPath, for compatibility with existing NETCONF NCS systems. See [the +`snabb config` documentation](../config/README.md) for full details. + +``` +$ snabb config get my-filter '/worker[device=83:00.1][rss-queue=1]' +filter "tcp port 443"; +``` + +You can stop a worker with `snabb config remove`: + +``` +$ snabb config remove my-filter '/worker[device=83:00.1][rss-queue=1]' +$ snabb config get my-filter / +filter "tcp port 666"; +worker { + device 83:00.0; + rss-queue 0; +} +worker { + device 83:00.0; + rss-queue 1; +} +worker { + device 83:00.1; + rss-queue 0; + filter "tcp port 443"; +} +``` + +Start up a new one with `snabb config add`: + +``` +$ snabb config add my-filter /worker <= 0, "duration can't be negative") + end + function handlers.cpu (arg) + opts.cpuset:add_from_string(arg) + end + handlers['real-time'] = function (arg) + scheduling.real_time = true + end + handlers["on-ingress-drop"] = function (arg) + if arg == 'flush' or arg == 'warn' then + scheduling.ingress_drop_monitor = arg + elseif arg == 'off' then + scheduling.ingress_drop_monitor = false + else + fatal("invalid --on-ingress-drop argument: %s (valid values: %s)", + arg, "flush, warn, off") + end + end + function handlers.j (arg) scheduling.j = arg end + function handlers.h () show_usage(0) end + + args = lib.dogetopt(args, handlers, "vD:hn:j:", + { verbose = "v", duration = "D", help = "h", cpu = 1, + ["real-time"] = 0, 
["on-ingress-drop"] = 1,
                         name="n" })

   if #args ~= 3 then show_usage(1) end

   return opts, scheduling, unpack(args)
end

function run (args)
   local opts, scheduling, schema_file, setup_file, conf_file = parse_args(args)
   local schema_name = yang.add_schema_file(schema_file)
   local setup_thunk = loadfile(setup_file)
   local conf = yang.load_configuration(conf_file, {schema_name=schema_name})

   local setup_fn = setup_thunk()
   if type(setup_fn) ~= 'function' then
      fatal("Expected %s to evaluate to a function, instead got %s",
            setup_file, tostring(setup_fn))
   end

   local manager = ptree.new_manager {
      name = opts.name,
      setup_fn = setup_fn,
      cpuset = opts.cpuset,
      initial_configuration = conf,
      schema_name = schema_name,
      worker_default_scheduling = scheduling,
      log_level = ({"WARN","INFO","DEBUG"})[opts.verbosity or 1] or "DEBUG",
   }

   manager:main(opts.duration)

   manager:stop()
end