diff --git a/src/program/ipfix/lib.lua b/src/program/ipfix/lib.lua index 73da79f9b7..a0b4be40c3 100644 --- a/src/program/ipfix/lib.lua +++ b/src/program/ipfix/lib.lua @@ -16,7 +16,10 @@ local basic = require("apps.basic.basic_apps") local arp = require("apps.ipv4.arp") local ipfix = require("apps.ipfix.ipfix") local template = require("apps.ipfix.template") +local rss = require("apps.rss.rss") local ifmib = require("lib.ipc.shmem.iftable_mib") +local Transmitter = require("apps.interlink.transmitter") + local ifmib_dir = '/ifmib' @@ -238,26 +241,70 @@ function configure_graph (arg, in_graph) return config, graph end +function parse_jit_option_fn (jit) + return function (arg) + if arg:match("^v") then + local file = arg:match("^v=(.*)") + if file == '' then file = nil end + jit.v = file + elseif arg:match("^p") then + local opts, file = arg:match("^p=([^,]*),?(.*)") + if file == '' then file = nil end + jit.p = { opts, file } + elseif arg:match("^dump") then + local opts, file = arg:match("^dump=([^,]*),?(.*)") + if file == '' then file = nil end + jit.dump = { opts, file } + elseif arg:match("^opt") then + local opt = arg:match("^opt=(.*)") + table.insert(jit.opts, opt) + elseif arg:match("^tprof") then + jit.traceprof = true + end + end +end + +local function set_jit_options (jit) + if not jit then return end + if jit.v then + require("jit.v").start(jit.v) + end + if jit.p and #jit.p > 0 then + require("jit.p").start(unpack(jit.p)) + end + if jit.traceprof then + require("lib.traceprof.traceprof").start() + end + if jit.dump and #jit.dump > 0 then + require("jit.dump").on(unpack(jit.dump)) + end + if jit.opts and #jit.opts > 0 then + require("jit.opt").start(unpack(jit.opts)) + end +end + +local function clear_jit_options (jit) + if not jit then return end + if jit.dump then + require("jit.dump").off() + end + if jit.traceprof then + require("lib.traceprof.traceprof").stop() + end + if jit.p then + require("jit.p").stop() + end + if jit.v then + require("jit.v").stop() + end +end + +-- Run an instance of the ipfix probe function run (arg, duration, busywait, cpu, jit) local config = configure_graph(arg) - if jit then - if jit.v then - require("jit.v").start(jit.v) - end - if jit.p and #jit.p > 0 then - require("jit.p").start(unpack(jit.p)) - end - if jit.traceprof then - require("lib.traceprof.traceprof").start() - end - if jit.dump and #jit.dump > 0 then - require("jit.dump").on(unpack(jit.dump)) - end - if jit.opts and #jit.opts > 0 then - require("jit.opt").start(unpack(jit.opts)) - end - end + if cpu then numa.bind_to_cpu(cpu) end + set_jit_options(jit) local done if not duration and config.input_type == "pcap" then @@ -267,28 +314,16 @@ function run (arg, duration, busywait, cpu, jit) end local t1 = now() - if cpu then numa.bind_to_cpu(cpu) end - engine.busywait = busywait + if busywait ~= nil then + engine.busywait = busywait + end engine.main({ duration = duration, done = done, measure_latency = false }) - if jit then - if jit.dump then - require("jit.dump").off() - end - if jit.traceprof then - require("lib.traceprof.traceprof").stop() - end - if jit.p then - require("jit.p").stop() - end - if jit.v then - require("jit.v").stop() - end - end + clear_jit_options(jit) local t2 = now() - local stats = link.stats(engine.app_table.ipfix.input.input) + local stats = link.stats(engine.app_table['ipfix'..config.instance].input.input) print("IPFIX probe stats:") local comma = lib.comma_value print(string.format("bytes: %s packets: %s bps: %s Mpps: %s", @@ -298,3 +333,74 @@ function run (arg, duration, busywait, cpu, jit) comma(stats.rxpackets / ((t2 - t1) * 1000000)))) end + +-- Run an instance of the RSS app. The output links can either be +-- interlinks or regular links with an instance of an ipfix probe +-- attached. +function run_rss(config, inputs, outputs, duration, busywait, cpu, jit) + if cpu then numa.bind_to_cpu(cpu) end + set_jit_options(jit) + + local graph = app_graph.new() + app_graph.app(graph, "rss", rss.rss, config) + + -- An input describes a physical interface + local tags, in_app_specs = {}, {} + for n, input in ipairs(inputs) do + local suffix = #inputs > 1 and n or '' + local input_name = "input"..suffix + local in_link, in_app = in_apps.pci(input.device.."/"..input.rxq) + input.config.pciaddr = input.device + table.insert(in_app_specs, + { pciaddr = input.device, + name = input_name, + ifname = input.name or + (input.device:gsub("[:%.]", "_")), + ifalias = input.description }) + app_graph.app(graph, input_name, unpack(in_app)) + local link_name = "input"..suffix + if input.tag then + local tag = input.tag + assert(not(tags[tag]), "Tag not unique: "..tag) + link_name = "vlan"..tag + end + app_graph.link(graph, input_name.."."..in_link.output + .." -> rss."..link_name) + end + + -- An output describes either an interlink or a complete ipfix app + for _, output in ipairs(outputs) do + if output.type == 'interlink' then + -- Keys + -- link_name name of the link + app_graph.app(graph, output.link_name, Transmitter) + app_graph.link(graph, "rss."..output.link_name.." -> " + ..output.link_name..".input") + else + -- Keys + -- link_name name of the link + -- args probe configuration + -- instance # of embedded instance + output.args.instance = output.instance + local config = configure_graph(output.args, graph) + app_graph.link(graph, "rss."..output.link_name + .." -> ipfix"..output.instance..".input") + end + end + + engine.configure(graph) + for _, spec in ipairs(in_app_specs) do + create_ifmib(engine.app_table[spec.name].stats, + spec.ifname, spec.ifalias) + end + require("jit").flush() + + local engine_opts = { no_report = true, measure_latency = false } + if duration ~= 0 then engine_opts.duration = duration end + if busywait ~= nil then + engine.busywait = busywait + end + engine.main(engine_opts) + + clear_jit_options(jit) +end diff --git a/src/program/ipfix/probe/probe.lua b/src/program/ipfix/probe/probe.lua index 5a256b77f7..6dfacc8954 100644 --- a/src/program/ipfix/probe/probe.lua +++ b/src/program/ipfix/probe/probe.lua @@ -148,26 +148,7 @@ function run (args) ["cpu"] = function (arg) cpu = tonumber(arg) end, - j = function (arg) - if arg:match("^v") then - local file = arg:match("^v=(.*)") - if file == '' then file = nil end - jit.v = file - elseif arg:match("^p") then - local opts, file = arg:match("^p=([^,]*),?(.*)") - if file == '' then file = nil end - jit.p = { opts, file } - elseif arg:match("^dump") then - local opts, file = arg:match("^dump=([^,]*),?(.*)") - if file == '' then file = nil end - jit.dump = { opts, file } - elseif arg:match("^opt") then - local opt = arg:match("^opt=(.*)") - table.insert(jit.opts, opt) - elseif arg:match("^tprof") then - jit.traceprof = true - end - end + j = require("program.ipfix.lib").parse_jit_option_fn(jit) } args = lib.dogetopt(args, opt, "hD:i:o:p:m:a:c:j:b", long_opts) diff --git a/src/program/ipfix/probe_rss/probe_rss.lua b/src/program/ipfix/probe_rss/probe_rss.lua index 5bcbcb7ded..5ee726df11 100644 --- a/src/program/ipfix/probe_rss/probe_rss.lua +++ b/src/program/ipfix/probe_rss/probe_rss.lua @@ -1,106 +1,16 @@ module(...,package.seeall) +local S = require("syscall") local lib = require("core.lib") local app_graph = require("core.config") local worker = require("core.worker") +local shm = require("core.shm") local pci = require("lib.hardware.pci") local probe = require("program.ipfix.lib") -local Transmitter = require("apps.interlink.transmitter") - -local long_opts = { - duration = "D", - logfile = "l", - debug = "d", - jit = "j", - help = "h", - ["busy-wait"] = "b" -} - -function run (parameters) - local duration = 0 - local busywait = false - local profiling, traceprofiling - local jit_opts = {} - local opt = { - D = function (arg) - if arg:match("^[0-9]+$") then - duration = tonumber(arg) - else - usage() - end - end, - l = function (arg) - local logfh = assert(io.open(arg, "a")) - lib.logger_default.fh = logfh - end, - h = function (arg) usage() end, - d = function (arg) _G.developer_debug = true end, - b = function (arg) - busywait = true - end, - j = function (arg) - if arg:match("^v") then - local file = arg:match("^v=(.*)") - if file == '' then file = nil end - require("jit.v").start(file) - elseif arg:match("^p") then - local opts, file = arg:match("^p=([^,]*),?(.*)") - if file == '' then file = nil end - require("jit.p").start(opts, file) - profiling = true - elseif arg:match("^dump") then - local opts, file = arg:match("^dump=([^,]*),?(.*)") - if file == '' then file = nil end - require("jit.dump").on(opts, file) - elseif arg:match("^opt") then - local opt = arg:match("^opt=(.*)") - table.insert(jit_opts, opt) - elseif arg:match("^tprof") then - require("lib.traceprof.traceprof").start() - traceprofiling = true - end - end - } - - -- Parse command line arguments - parameters = lib.dogetopt(parameters, opt, "hdj:D:l:b", long_opts) - - -- Defaults: sizemcode=32, maxmcode=512 - require("jit.opt").start('sizemcode=256', 'maxmcode=2048') - if #jit_opts then - require("jit.opt").start(unpack(jit_opts)) - end - if #parameters ~= 1 then usage () end - - local file = table.remove(parameters, 1) - - local engine_opts = { no_report = true, measure_latency = false } - if duration ~= 0 then engine_opts.duration = duration end - - local probe_config = assert(loadfile(file))() - local graph, in_app_specs = create_app_graph(probe_config, busywait) - engine.configure(graph) - - for _, spec in ipairs(in_app_specs) do - probe.create_ifmib(engine.app_table[spec.name].stats, - spec.ifname, spec.ifalias) - end - - jit.flush() - engine.busywait = busywait - engine.main(engine_opts) - - if profiling then - require("jit.p").stop() - end - if traceprofiling then - require("lib.traceprof.traceprof").stop() - end - -end local main_config = { interfaces = { required = true }, + hw_rss_scaling = { default = 1 }, rss = { required = true }, ipfix = { required = true } } @@ -114,6 +24,7 @@ local interface_config = { local ipfix_config = { default = { default = {} }, maps = { default = {} }, + observation_domain_base = { default = 256 }, exporters = { required = true } } local maps_config = { @@ -131,10 +42,9 @@ local exporter_config = { instances = { default = {} } } local instance_config = { - observation_domain = { required = true }, embed = { default = true }, weight = { default = 1 }, - jit = { default = {} }, + jit = { default = nil }, busywait = { default = nil } } local jit_config = { @@ -182,47 +92,14 @@ local function value_to_string (value, string) return string end -function create_app_graph (probe_config, busywait) +local function create_workers (probe_config, duration, busywait, jit, logger) local main = lib.parse(probe_config, main_config) - assert(type(main.interfaces) == "table") - - local graph = app_graph.new() - - app_graph.app(graph, "rss", require("apps.rss.rss").rss, - main.rss) - - local tags = {} - local in_app_specs = {} - for i, interface in ipairs(main.interfaces) do - local interface = lib.parse(interface, interface_config) - local suffix = #main.interfaces > 1 and i or '' - local input_name = "input"..suffix - local device_info = pci.device_info(interface.device) - interface.config.pciaddr = interface.device - table.insert(in_app_specs, - { pciaddr = interface.device, - name = input_name, - ifname = interface.name or - (interface.device:gsub("[:%.]", "_")), - ifalias = interface.description }) - app_graph.app(graph, input_name, - require(device_info.driver).driver, - interface.config) - local link_name = "input"..suffix - if interface.tag then - local tag = interface.tag - assert(not(tags[tag]), "Tag not unique: "..tag) - link_name = "vlan"..tag - end - app_graph.link(graph, input_name.."."..device_info.tx - .." -> rss."..link_name) - end - local maps = lib.parse(main.ipfix.maps, maps_config) + local ipfix = lib.parse(main.ipfix, ipfix_config) + local ipfix_default = lib.parse(ipfix.default, ipfix_default_config) - local ipfix_config = lib.parse(main.ipfix.default, ipfix_default_config) local function merge_with_default (config) - local merged = lib.deepcopy(ipfix_config) + local merged = lib.deepcopy(ipfix_default) for _, key in ipairs({ "collector_ip", "collector_port", "templates" }) do if config[key] then merged[key] = config[key] @@ -230,6 +107,7 @@ function create_app_graph (probe_config, busywait) end return merged end + local classes = {} local function rss_link_name (class, weight) if not classes[class] then @@ -240,58 +118,161 @@ function create_app_graph (probe_config, busywait) return class.."_"..instance..(weight > 1 and "_"..weight or '') end - local embedded_instance = 1 - local observation_domains = {} - for _, exporter in ipairs(main.ipfix.exporters) do - local exporter = lib.parse(exporter, exporter_config) - local config = merge_with_default(exporter) - config.output_type = "tap_routed" - config.instance = nil - config.add_packet_metadata = false - if exporter.use_maps then - config.maps = maps + assert(type(main.interfaces) == "table") + + local observation_domain = ipfix.observation_domain_base + for rssq = 0, main.hw_rss_scaling - 1 do + local inputs, outputs = {}, {} + for i, interface in ipairs(main.interfaces) do + local input = lib.parse(interface, interface_config) + input.rxq = rssq + table.insert(inputs, input) end - for i, instance in ipairs(exporter.instances) do - local instance = lib.parse(instance, instance_config) - local rss_link = rss_link_name(exporter.rss_class, instance.weight) - local od = instance.observation_domain - assert(not observation_domains[od], - "Observation domain not unique: "..od) - observation_domains[od] = true - config.observation_domain = od - config.output = "ipfixexport"..od - if exporter.maps_log_dir then - config.maps_logfile = - exporter.maps_log_dir.."/"..od..".log" + + local embedded_instance = 1 + for _, exporter in ipairs(main.ipfix.exporters) do + local exporter = lib.parse(exporter, exporter_config) + local config = merge_with_default(exporter) + config.output_type = "tap_routed" + config.instance = nil + config.add_packet_metadata = false + if exporter.use_maps then + config.maps = maps end - if instance.embed then - config.instance = embedded_instance - probe.configure_graph(config, graph) - app_graph.link(graph, "rss."..rss_link - .." -> ipfix"..embedded_instance..".input") - embedded_instance = embedded_instance + 1 - else - config.input_type = "interlink" - config.input = rss_link - local jit = lib.parse(instance.jit, jit_config) - if instance.busywait == nil then - instance.busywait = busywait + + for i, instance in ipairs(exporter.instances) do + -- Create a clone of the configuration for parameters + -- specific to the instance + local iconfig = lib.deepcopy(config) + local instance = lib.parse(instance, instance_config) + local rss_link = rss_link_name(exporter.rss_class, instance.weight) + local od = observation_domain + observation_domain = observation_domain + 1 + iconfig.observation_domain = od + iconfig.output = "ipfixexport"..od + if exporter.maps_log_dir then + iconfig.maps_logfile = + exporter.maps_log_dir.."/"..od..".log" end - local worker_expr = string.format( - 'require("program.ipfix.lib").run(%s, nil, %s, nil, %s)', - value_to_string(config), tostring(instance.busywait), - value_to_string(jit) - ) - local child_pid = worker.start(rss_link, worker_expr) - print("Launched worker process #"..child_pid) - -- The stats program uses the PID encoded in the name of - -- the transmitter app to find the receiving IPFIX - -- process. - app_graph.app(graph, rss_link, Transmitter) - app_graph.link(graph, "rss."..rss_link.." -> "..rss_link..".input") + + local output + if instance.embed then + output = { + link_name = rss_link, + args = iconfig, + instance = embedded_instance + } + embedded_instance = embedded_instance + 1 + else + output = { type = "interlink", link_name = rss_link } + iconfig.input_type = "interlink" + iconfig.input = rss_link + + -- Override jit options and dump file + local jit = lib.deepcopy(jit) + if instance.jit then + if instance.jit.opts then + jit.opts = instance.jit.opts + end + if instance.jit.dump then + local dump = lib.deepcopy(instance.jit.dump) + local file = dump[2] + if file then + file = file.."_"..od + dump[2] = file + end + jit.dump = dump + end + end + + local worker_expr = string.format( + 'require("program.ipfix.lib").run(%s, %s, %s, nil, %s)', + value_to_string(iconfig), tostring(duration), + tostring(instance.busywait), value_to_string(jit) + ) + local child_pid = worker.start(rss_link, worker_expr) + logger:log("Launched IPFIX worker process #"..child_pid) + shm.create("ipfix_workers/"..child_pid, "uint64_t") + end + table.insert(outputs, output) end end + + local jit_c = lib.deepcopy(jit) + if jit_c.dump then + if jit_c.dump[2] then + jit_c.dump[2] = jit_c.dump[2].."_"..rssq + end + end + local worker_expr = string.format( + 'require("program.ipfix.lib").run_rss(%s, %s, %s, %s, %s, nil, %s)', + value_to_string(main.rss), value_to_string(inputs), + value_to_string(outputs), tostring(duration), + tostring(busywait), value_to_string(jit_c) + ) + local child_pid = worker.start("rss"..rssq, worker_expr) + logger:log("Launched RSS worker process #"..child_pid) + shm.create("rss_workers/"..child_pid, "uint64_t") + end +end + +local long_opts = { + duration = "D", + logfile = "l", + debug = "d", + jit = "j", + help = "h", + ["busy-wait"] = "b" +} - return graph, in_app_specs +function run (parameters) + local duration + local busywait = false + local profiling, traceprofiling + local jit = { opts = {} } + local log_pid = string.format("[%5d]", S.getpid()) + local logger = lib.logger_new({ module = log_pid.." RSS master" }) + local opt = { + D = function (arg) + if arg:match("^[0-9]+$") then + duration = tonumber(arg) + else + usage() + end + end, + l = function (arg) + local logfh = assert(io.open(arg, "a")) + lib.logger_default.fh = logfh + end, + h = function (arg) usage() end, + d = function (arg) _G.developer_debug = true end, + b = function (arg) + busywait = true + end, + j = probe.parse_jit_option_fn(jit) + } + + -- Parse command line arguments + parameters = lib.dogetopt(parameters, opt, "hdj:D:l:b", long_opts) + if #parameters ~= 1 then usage () end + + local file = table.remove(parameters, 1) + local probe_config = assert(loadfile(file))() + create_workers(probe_config, duration, busywait, jit, logger) + + if duration then + S.sleep(duration) + logger:log("waiting for workers to finish") + local alive + repeat + alive = false + for _, s in pairs(worker.status()) do + if s.alive then alive = true end + end + until not alive + logger:log("done") + else + S.pause() + end end