Skip to content

Commit

Permalink
Adapt "snabb config listen" to streaming parse
Browse files Browse the repository at this point in the history
  • Loading branch information
wingo committed May 3, 2018
1 parent ac8e74e commit 91560fa
Showing 1 changed file with 6 additions and 4 deletions.
10 changes: 6 additions & 4 deletions src/program/config/listen/listen.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ module(..., package.seeall)
local S = require("syscall")
local ffi = require("ffi")
local mem = require("lib.stream.mem")
local file = require("lib.stream.file")
local rpc = require("lib.yang.rpc")
local data = require("lib.yang.data")
local path_lib = require("lib.yang.path")
Expand All @@ -17,7 +18,7 @@ local function open_socket(file)
local sa = S.t.sockaddr_un(file)
assert(socket:bind(sa))
assert(socket:listen())
return socket
return file.fdopen(socket, 'rdwr')
end

local function validate_config(schema_name, revision_date, path, value_str)
Expand Down Expand Up @@ -78,7 +79,7 @@ local function attach_listener(leader, caller, schema_name, revision_date)
local msg, parse_reply = rpc.prepare_call(
caller, 'attach-listener', {schema=schema_name, revision=revision_date})
common.send_message(leader, msg)
return parse_reply(common.recv_message(leader))
return parse_reply(mem.open_input_string(common.recv_message(leader)))
end

function run(args)
Expand Down Expand Up @@ -107,15 +108,15 @@ function run(args)

local client = json_lib.buffered_input(fd)
local pollfds = S.types.t.pollfds({
{fd=leader, events="in"},
{fd=leader.io.fd, events="in"},
{fd=client, events="in"}})
local pending_replies = {}
while true do
if client:avail() == 0 then
assert(S.poll(pollfds, -1))
end
for _,pfd in ipairs(pollfds) do
if pfd.fd == leader:getfd() then
if pfd.fd == leader.io.fd:getfd() then
if pfd.ERR or pfd.HUP then
while #pending_replies > 0 do
local have_reply = table.remove(pending_replies)
Expand Down Expand Up @@ -151,6 +152,7 @@ function run(args)
local msg, parse_reply = rpc.prepare_call(
caller, request.method, request.args)
local function have_reply(msg)
msg = mem.open_input_string(msg)
return print_reply(parse_reply(msg), fd)
end
common.send_message(leader, msg)
Expand Down

0 comments on commit 91560fa

Please sign in to comment.