Skip to content

Commit

Permalink
Merge pull request #1055 from dpino/alarm-notifications
Browse files Browse the repository at this point in the history
Support alarm notifications
  • Loading branch information
dpino authored May 6, 2018
2 parents 39fc947 + 65df2ca commit b2008dc
Show file tree
Hide file tree
Showing 7 changed files with 255 additions and 8 deletions.
71 changes: 71 additions & 0 deletions src/lib/ptree/ptree.lua
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ local support = require("lib.ptree.support")
local channel = require("lib.ptree.channel")
local trace = require("lib.ptree.trace")
local alarms = require("lib.yang.alarms")
local json_lib = require("lib.ptree.json")

local call_with_output_string = mem.call_with_output_string

Expand Down Expand Up @@ -77,6 +78,7 @@ function new_manager (conf)
ret.default_schema = conf.default_schema or conf.schema_name
ret.support = support.load_schema_config_support(conf.schema_name)
ret.peers = {}
ret.notification_peers = {}
ret.setup_fn = conf.setup_fn
ret.period = 1/conf.Hz
ret.worker_default_scheduling = conf.worker_default_scheduling
Expand Down Expand Up @@ -495,6 +497,19 @@ function Manager:rpc_attach_listener (args)
if success then return response else return {status=1, error=response} end
end

-- RPC handler: turn the calling peer into a notification listener.
--
-- If the peer that issued the current RPC (self.rpc_peer) is present in
-- self.peers, it is appended to self.notification_peers and removed from
-- self.peers.  Always returns an empty table.
function Manager:rpc_attach_notification_listener ()
   local caller = self.rpc_peer
   for index, peer in ipairs(self.peers) do
      if peer == caller then
         table.insert(self.notification_peers, caller)
         table.remove(self.peers, index)
         -- Only the first match is moved; stop scanning right away.
         break
      end
   end
   return {}
end

function Manager:rpc_get_state (args)
local function getter()
if args.schema ~= self.schema_name then
Expand Down Expand Up @@ -533,6 +548,61 @@ end

local dummy_unix_sockaddr = S.t.sockaddr_un()

-- Deliver alarm notifications to every peer in self.notification_peers.
--
-- Newly collected notifications (alarms.notifications()) are serialized
-- to JSON once, framed as "<decimal byte length>\n<json>", and appended
-- to each peer's outgoing queue.  Every queue is then drained as far as
-- the peer's file descriptor accepts data:
--
--  * On EAGAIN the partially written message stays at the head of the
--    queue (peer.pos records how many bytes were already sent) and
--    sending is retried on the next call, whether or not new
--    notifications were produced in the meantime.
--  * On any other write error, or on a zero-length write, a warning is
--    logged, the peer's fd is closed and the peer is dropped from
--    self.notification_peers.
function Manager:push_notifications_to_peers()
   local peers = self.notification_peers
   local notifications = alarms.notifications()

   -- Enqueue new notifications into each peer queue.
   if #notifications > 0 and #peers > 0 then
      -- Serialization does not depend on the peer, so do it only once.
      local output = json_lib.buffered_output()
      local messages = {}
      for _, notification in ipairs(notifications) do
         json_lib.write_json_object(output, notification)
         local msg = output:flush()
         messages[#messages + 1] = tostring(#msg)..'\n'..msg
      end
      for _, peer in ipairs(peers) do
         peer.queue = peer.queue or {}
         for _, msg in ipairs(messages) do
            table.insert(peer.queue, msg)
         end
      end
   end

   -- Send enqueued messages.  Iterate backwards so that removing a
   -- failed peer does not shift the index of peers still to be visited.
   for i = #peers, 1, -1 do
      local peer = peers[i]
      local queue = peer.queue
      while queue and #queue > 0 do
         local msg = queue[1]
         local len = #msg
         peer.pos = peer.pos or 0
         local count, err = peer.fd:write(
            ffi.cast('uint8_t*', msg) + peer.pos, len - peer.pos)
         if not count then
            -- Peer cannot take more data right now; retry on next call.
            if err.AGAIN then break end
            peer.state = 'error'
            peer.msg = tostring(err)
         elseif count == 0 then
            peer.state = 'error'
            peer.msg = 'short write'
         else
            peer.pos = peer.pos + count
            assert(peer.pos <= len)
            if peer.pos == len then
               -- Whole message sent; move on to the next one.
               peer.pos = 0
               table.remove(queue, 1)
            end
         end

         if peer.state == 'error' then
            self:warn('%s', peer.msg)
            peer.fd:close()
            table.remove(peers, i)
            -- The fd is closed: never write to this peer again.
            break
         end
      end
   end
end

function Manager:handle_calls_from_peers()
local peers = self.peers
while true do
Expand Down Expand Up @@ -756,6 +826,7 @@ function Manager:main (duration)
if timer.ticks then timer.run_to_time(now * 1e9) end
self:remove_stale_workers()
self:handle_calls_from_peers()
self:push_notifications_to_peers()
self:send_messages_to_workers()
self:receive_alarms_from_workers()
now = C.get_monotonic_time()
Expand Down
63 changes: 55 additions & 8 deletions src/lib/yang/alarms.lua
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,36 @@ local state = {
},
shelved_alarms = {
shelved_alarms = {}
},
notifications = {
alarm = {},
alarm_inventory_changed = {},
operator_action = {}
}
}

-- Discard all pending notifications by replacing each per-category
-- table in state.notifications with a fresh empty one.
local function clear_notifications ()
   local pending = state.notifications
   for _, category in ipairs({'alarm', 'alarm_inventory_changed', 'operator_action'}) do
      pending[category] = {}
   end
end

-- Return the pending notifications as a single array and clear them.
--
-- Entries are gathered category by category: first 'alarm', then
-- 'alarm_inventory_changed', then 'operator_action'.  After the call
-- state.notifications holds no pending entries.
function notifications ()
   local pending = state.notifications
   local collected = {}
   local categories = {'alarm', 'alarm_inventory_changed', 'operator_action'}
   for _, category in ipairs(categories) do
      for _, notification in pairs(pending[category]) do
         collected[#collected + 1] = notification
      end
   end
   clear_notifications()
   return collected
end

local function table_size (t)
local size = 0
for _ in pairs(t) do size = size + 1 end
Expand Down Expand Up @@ -171,6 +198,11 @@ function do_add_to_inventory (k, v)
end
state.alarm_inventory.alarm_type[key] = v
state.alarm_inventory.alarm_type[key].resource = resource
alarm_inventory_changed()
end

-- Record that the alarm inventory changed by queueing an (empty)
-- entry in state.notifications.alarm_inventory_changed.
function alarm_inventory_changed()
   local changes = state.notifications.alarm_inventory_changed
   changes[#changes + 1] = {}
end

-- Single point to access alarm keys.
Expand Down Expand Up @@ -284,13 +316,19 @@ end
-- The entry with latest time-stamp in this list MUST correspond to the leafs
-- 'is-cleared', 'perceived-severity' and 'alarm-text' for the alarm.
-- The time-stamp for that entry MUST be equal to the 'last-changed' leaf.
local function add_status_change (alarm, status)
-- Apply 'status' to 'alarm' and record it.
--
-- Copies perceived_severity, alarm_text and time from 'status' onto the
-- alarm (time also becomes the alarm list's last_changed), appends
-- 'status' to the alarm's status_change history (created on first use)
-- and registers an alarm notification for 'key'.
local function add_status_change (key, alarm, status)
   local history = alarm.status_change
   if not history then
      history = {}
      alarm.status_change = history
   end
   local changed_at = status.time
   alarm.perceived_severity = status.perceived_severity
   alarm.alarm_text = status.alarm_text
   alarm.last_changed = changed_at
   state.alarm_list.last_changed = changed_at
   history[#history + 1] = status
   add_alarm_notification(key, status)
end

-- Store 'status' as the pending alarm notification for 'key'.  A later
-- call with the same key overwrites the earlier pending entry.
function add_alarm_notification (key, status)
   state.notifications.alarm[key] = status
end

-- Creates a new alarm.
Expand All @@ -305,7 +343,7 @@ local function new_alarm (key, args)
perceived_severity = args.perceived_severity or ret.perceived_severity,
alarm_text = args.alarm_text or ret.alarm_text,
}
add_status_change(ret, status)
add_status_change(key, ret, status)
ret.last_changed = assert(status.time)
ret.time_created = assert(ret.last_changed)
ret.is_cleared = args.is_cleared
Expand Down Expand Up @@ -339,14 +377,14 @@ end
-- An alarm gets updated if it needs a status change. A status change implies
-- adding a new status-change entry to the alarm and updating the alarm's
-- 'is_cleared' flag.
local function update_alarm (alarm, args)
-- Update 'alarm' (stored under 'key') from 'args' when a status change
-- is needed, as decided by needs_status_change(alarm, args).
--
-- The new status entry takes its severity and text from 'args', falling
-- back to the alarm's current values; both must be non-nil.  The status
-- is recorded exactly once through add_status_change(key, alarm, status),
-- and the alarm's is_cleared flag is then set from args.is_cleared.
local function update_alarm (key, alarm, args)
   if needs_status_change(alarm, args) then
      local status = {
         time = assert(format_date_as_iso_8601()),
         perceived_severity = assert(args.perceived_severity or alarm.perceived_severity),
         alarm_text = assert(args.alarm_text or alarm.alarm_text),
      }
      -- Single call using the (key, alarm, status) signature; the older
      -- two-argument form would shift the arguments and leave 'status' nil.
      add_status_change(key, alarm, status)
      alarm.is_cleared = args.is_cleared
   end
end
Expand All @@ -364,6 +402,8 @@ local function lookup_alarm (key)
end
end

-- Notifications are only sent when a new alarm is raised, re-raised after being
-- cleared and when an alarm is cleared.
function raise_alarm (key, args)
assert(key)
args = args or {}
Expand All @@ -373,7 +413,7 @@ function raise_alarm (key, args)
if not alarm then
create_alarm(key, args)
else
update_alarm(alarm, args)
update_alarm(key, alarm, args)
end
end

Expand All @@ -385,7 +425,7 @@ function clear_alarm (key)
key = alarm_keys:normalize(key)
local alarm = lookup_alarm(key)
if alarm then
update_alarm(alarm, args)
update_alarm(key, alarm, args)
end
end

Expand Down Expand Up @@ -424,20 +464,27 @@ function set_operator_state (key, args)
alarm.operator_state_change = {}
end
local time = format_date_as_iso_8601()
table.insert(alarm.operator_state_change, {
local status = {
time = time,
operator = 'admin',
state = args.state,
text = args.text,
})
}
table.insert(alarm.operator_state_change, status)
if args.state == 'shelved' then
shelve_alarm(key, alarm)
elseif args.state == 'un-shelved' then
unshelve_alarm(key, alarm)
end
add_operator_action_notification(key, status)
return true
end

-- Store 'status' as the pending operator-action notification for 'key'.
-- A later call with the same key overwrites the earlier pending entry.
function add_operator_action_notification (key, status)
   state.notifications.operator_action[key] = status
end

-- Purge alarms.

local ages = {seconds=1, minutes=60, hours=3600, days=3600*24, weeks=3600*24*7}
Expand Down
6 changes: 6 additions & 0 deletions src/lib/yang/snabb-config-leader-v1.yang
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ module snabb-config-leader-v1 {
}
}

// RPC that takes no input; its output consists solely of the
// error-reporting grouping.
// NOTE(review): presumably called by a client that wants to start
// receiving alarm notifications from the leader -- confirm against the
// leader's RPC handler.
rpc attach-notification-listener {
output {
uses error-reporting;
}
}

rpc set-alarm-operator-state {
description
"This is a means for the operator to indicate
Expand Down
18 changes: 18 additions & 0 deletions src/program/alarms/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,24 @@ $ snabb alarms compress test 12345 arp-resolution

See [`snabb alarms compress --help`](./compress/README) for more information.

### Notifications

Alarm notifications are sent by a leader under certain circumstances. There
are 3 types of alarm notifications:

- Alarm notification: sent to report a newly raised alarm, a cleared alarm,
or a change to the text and/or severity of an existing alarm.

- Alarm inventory changed notification: sent to report that the list of
possible alarms has changed. This can happen, for example, when a new
software module is installed or a new physical card is inserted.

- Operator action notification: sent to report that an operator acted upon an
alarm.

To listen to these notifications open a connection to a Snabb instance using
the subprogram `alarms listen`.

## How does it work?

The Snabb instance itself should be running in *multi-process mode*,
Expand Down
13 changes: 13 additions & 0 deletions src/program/alarms/listen/README
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
Usage: snabb alarms listen [OPTION]... ID
Listens to alarm notifications sent from a Snabb instance.

Available options:
-h, --help Display this message.
-c, --socket Path to unix socket if using.

If the --socket argument is provided, it will accept a client over the unix
socket. If it has not been provided, it will accept commands over stdin and
display them on stdout.

See https://github.com/Igalia/snabb/blob/lwaftr/src/program/alarms/README.md
for full documentation.
1 change: 1 addition & 0 deletions src/program/alarms/listen/README.inc
Loading

0 comments on commit b2008dc

Please sign in to comment.