Skip to content

Commit

Permalink
Simplify alarm notifications listener
Browse files Browse the repository at this point in the history
Instead of having to RPC and then turn the socket into a listening
state, we should just have a separate socket for notifications.  It's
stateless anyway.  No need for the special length-prefixed message
protocol either -- we can just write JSON and trust fibers to allow
concurrency.  The listen program likewise simplifies to a glorified
socat.
  • Loading branch information
wingo committed May 18, 2018
1 parent 0d26daf commit 855aae7
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 108 deletions.
7 changes: 6 additions & 1 deletion src/lib/ptree/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,17 @@ is a table of key/value pairs. The following keys are required:

Optional entries that may be present in the *parameters* table include:

* `socket_file_name`: The name of the socket on which to listen for
* `rpc_socket_file_name`: The name of the socket on which to listen for
incoming connections from `snabb config` clients. See [the `snabb
config` documentation](../../program/config/README.md) for more
information. Default is `$SNABB_SHM_ROOT/PID/config-leader-socket`,
where the `$SNABB_SHM_ROOT` environment variable defaults to
`/var/run/snabb`.
* `notification_socket_file_name`: The name of the socket on which to
listen for incoming connections from `snabb alarms` clients. See
[the `snabb alarms` documentation](../../program/alarms/README.md)
for more information. Default is
`$SNABB_SHM_ROOT/PID/notifications`.
* `name`: A name to claim for this process tree. `snabb config` can
address network functions by name in addition to PID. If the name is
already claimed on the local machine, an error will be signalled.
Expand Down
104 changes: 48 additions & 56 deletions src/lib/ptree/ptree.lua
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ local support = require("lib.ptree.support")
local channel = require("lib.ptree.channel")
local trace = require("lib.ptree.trace")
local alarms = require("lib.yang.alarms")
local json_lib = require("lib.ptree.json")
local json = require("lib.ptree.json")
local queue = require('lib.fibers.queue')
local fiber_sleep = require('lib.fibers.sleep').sleep

Expand All @@ -44,7 +44,8 @@ if os.getenv('SNABB_MANAGER_VERBOSE') then default_log_level = "DEBUG" end

local manager_config_spec = {
name = {},
socket_file_name = {default='config-leader-socket'},
rpc_socket_file_name = {default='config-leader-socket'},
notification_socket_file_name = {default='notifications'},
setup_fn = {required=true},
-- Could relax this requirement.
initial_configuration = {required=true},
Expand All @@ -57,14 +58,12 @@ local manager_config_spec = {
Hz = {default=100},
}

local function open_socket (file)
S.signal('pipe', 'ign')
local socket = assert(S.socket("unix", "stream, nonblock"))
S.unlink(file) --unlink to avoid EINVAL on bind()
local sa = S.t.sockaddr_un(file)
assert(socket:bind(sa))
assert(socket:listen())
return socket
local function ensure_absolute(file_name)
if file_name:match('^/') then
return file_name
else
return shm.root..'/'..tostring(S.getpid())..'/'..file_name
end
end

function new_manager (conf)
Expand All @@ -74,11 +73,9 @@ function new_manager (conf)
ret.name = conf.name
ret.log_level = assert(log_levels[conf.log_level])
ret.cpuset = conf.cpuset
ret.socket_file_name = conf.socket_file_name
if not ret.socket_file_name:match('^/') then
local instance_dir = shm.root..'/'..tostring(S.getpid())
ret.socket_file_name = instance_dir..'/'..ret.socket_file_name
end
ret.rpc_socket_file_name = ensure_absolute(conf.rpc_socket_file_name)
ret.notification_socket_file_name = ensure_absolute(
conf.notification_socket_file_name)
ret.schema_name = conf.schema_name
ret.default_schema = conf.default_schema or conf.schema_name
ret.support = support.load_schema_config_support(conf.schema_name)
Expand Down Expand Up @@ -171,14 +168,8 @@ function Manager:start ()
self.cpuset:bind_to_numa_node()
require('lib.fibers.file').install_poll_io_handler()
self.sched = fiber.current_scheduler
local sockname = self.socket_file_name
local sock = socket.listen_unix(sockname)
local parent_close = sock.close
function sock:close()
parent_close(sock)
S.unlink(sockname)
end
fiber.spawn(function () self:accept_peers(sock) end)
fiber.spawn(function () self:accept_rpc_peers() end)
fiber.spawn(function () self:accept_notification_peers() end)
fiber.spawn(function () self:notification_poller() end)
end

Expand All @@ -188,16 +179,28 @@ function Manager:call_with_cleanup(closeable, f, ...)
if not ok then self:warn('%s', tostring(err)) end
end

function Manager:accept_peers (sock)
function Manager:accept_rpc_peers ()
local sock = socket.listen_unix(self.rpc_socket_file_name, {ephemeral=true})
self:call_with_cleanup(sock, function()
while true do
local peer = sock:accept()
fiber.spawn(function() self:handle_peer(peer) end)
fiber.spawn(function() self:handle_rpc_peer(peer) end)
end
end)
end

function Manager:accept_notification_peers ()
local sock = socket.listen_unix(self.notification_socket_file_name,
{ephemeral=true})
fiber.spawn(function()
while true do
local peer = sock:accept()
fiber.spawn(function() self:handle_notification_peer(peer) end)
end
end)
end

function Manager:handle_peer(peer)
function Manager:handle_rpc_peer(peer)
self:call_with_cleanup(peer, function()
while true do
local prefix = peer:read_line('discard')
Expand All @@ -216,6 +219,22 @@ function Manager:handle_peer(peer)
if self.listen_peer == peer then self.listen_peer = nil end
end

function Manager:handle_notification_peer(peer)
local q = queue.new()
self.notification_peers[q] = true
function q:close()
self.notification_peers[q] = nil
peer:close()
end
self:call_with_cleanup(q, function()
while true do
json.write_json_object(peer, q:get())
peer:write_chars("\n")
peer:flush_output()
end
end)
end

function Manager:run_scheduler()
self.sched:run(engine.now())
end
Expand Down Expand Up @@ -550,14 +569,6 @@ function Manager:rpc_attach_listener (args)
if success then return response else return {status=1, error=response} end
end

function Manager:rpc_attach_notification_listener ()
local peer = self.rpc_peer
peer.queue = queue.new()
table.insert(self.notification_peers, peer)
fiber.spawn(function() self:send_notifications_to_peer(peer) end)
return {}
end

function Manager:rpc_get_state (args)
local function getter()
if args.schema ~= self.schema_name then
Expand Down Expand Up @@ -597,41 +608,22 @@ function Manager:handle (peer, payload)
return ret
end

local function tojson (str)
local msg = mem.call_with_output_string(function(output)
json_lib.write_json_object(output, str)
return output:flush()
end)
return tostring(#msg)..'\n'..msg
end


-- Spawn in a fiber.
function Manager:notification_poller ()
while true do
local notifications = alarms.notifications()
if #notifications == 0 then
fiber_sleep(1/50) -- poll at 50 Hz.
else
for _, peer in ipairs(self.notification_peers) do
for _,each in ipairs(notifications) do
peer.queue:put(tojson(each))
for q,_ in pairs(self.notification_peers) do
for _,notification in ipairs(notifications) do
q:put(notification)
end
end
end
end
end

function Manager:send_notifications_to_peer (peer)
self:call_with_cleanup(peer, function()
while true do
local msg = peer.queue:get()
peer:write_chars(msg)
peer:flush_output()
end
end)
end

function Manager:send_messages_to_workers()
for id,worker in pairs(self.workers) do
if not worker.channel then
Expand Down
11 changes: 9 additions & 2 deletions src/lib/stream/socket.lua
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,16 @@ function Socket:connect_unix(file, stype)
return self:connect(sa)
end

function listen_unix(file, stype, protocol)
local s = socket('unix', stype or 'stream', protocol)
function listen_unix(file, args)
local s = socket('unix', args.stype or "stream", args.protocol)
s:listen_unix(file)
if args.ephemeral then
local parent_close = s.close
function s:close()
parent_close(s)
S.unlink(file)
end
end
return s
end

Expand Down
88 changes: 39 additions & 49 deletions src/program/alarms/listen/listen.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,75 +2,65 @@
module(..., package.seeall)

local S = require("syscall")
local common = require("program.config.common")
local data = require("lib.yang.data")
local fiber = require("lib.fibers.fiber")
local lib = require("core.lib")
local shm = require("core.shm")
local file = require("lib.stream.file")
local mem = require("lib.stream.mem")
local path_lib = require("lib.yang.path")
local rpc = require("lib.yang.rpc")
local socket = require("lib.stream.socket")
local fiber = require("lib.fibers.fiber")
local json = require("lib.ptree.json")

function show_usage(status, err_msg)
if err_msg then print('error: '..err_msg) end
print(require("program.alarms.listen.README_inc"))
main.exit(status)
end

local function open_socket(file)
S.signal('pipe', 'ign')
local socket = assert(S.socket("unix", "stream"))
S.unlink(file)
local sa = S.t.sockaddr_un(file)
assert(socket:bind(sa))
assert(socket:listen())
return socket
local function parse_command_line(args)
local handlers = {}
function handlers.h() show_usage(0) end
args = lib.dogetopt(args, handlers, "h", {help="h"})
if #args ~= 1 then show_usage(1, msg) end
return unpack(args)
end

local function attach_listener(leader, caller)
local msg, parse_reply = rpc.prepare_call(
caller, 'attach-notification-listener', {})
common.send_message(leader, msg)
return parse_reply(mem.open_input_string(common.recv_message(leader)))
local function connect(instance_id)
local tail = instance_id..'/notifications'
local ok, ret = pcall(socket.connect_unix, shm.root..'/by-name/'..tail)
if not ok then
ok, ret = pcall(socket.connect_unix, shm.root..'/'..tail)
end
if ok then return ret end
error("Could not connect to notifications socket on Snabb instance '"..
instance_id.."'.\n")
end

function run(args)
args = common.parse_command_line(args, { command='listen' })
local caller = rpc.prepare_caller('snabb-config-leader-v1')
local leader = common.open_socket_or_die(args.instance_id)
attach_listener(leader, caller)

local instance_id = parse_command_line(args)
local handler = require('lib.fibers.file').new_poll_io_handler()
file.set_blocking_handler(handler)
fiber.current_scheduler:add_task_source(handler)
-- Leader was blocking in call to attach_listener.
leader:nonblock()
require('lib.stream.compat').install()

-- Check if there is a socket path specified, if so use that as method
-- to communicate, otherwise use stdin and stdout.
local client_tx
if args.socket then
local sockfd = open_socket(args.socket)
local addr = S.t.sockaddr_un()
-- Wait for a connection
print("Listening for clients on socket: "..args.socket)
client_tx = file.fdopen(assert(sockfd:accept(addr)))
else
client_tx = file.fdopen(S.stdout)
local function print_notifications()
local socket = connect(instance_id)
while true do
local obj = json.read_json_object(socket)
if obj == nil then return end
json.write_json_object(io.stdout, obj)
io.stdout:write_chars("\n")
io.stdout:flush_output()
end
end

local function exit_when_finished(f)
return function()
local success, res = pcall(f)
if not success then io.stderr:write('error: '..tostring(res)..'\n') end
os.exit(success and 0 or 1)
end
end
local function print_notification (output, msg)
output:write_chars(msg)
output:flush()
end
local function handle_outgoing ()
while true do
local msg = common.recv_message(leader)
print_notification(client_tx, msg)
end
end

fiber.spawn(exit_when_finished(handle_outgoing))
fiber.spawn(exit_when_finished(print_notifications))

while true do
local sched = fiber.current_scheduler
Expand Down

0 comments on commit 855aae7

Please sign in to comment.