Skip to content

Commit

Permalink
Rewrite "snabb config bench" to use fibers and new streams
Browse files Browse the repository at this point in the history
  • Loading branch information
wingo committed May 4, 2018
1 parent e26a4b6 commit 19e422c
Showing 1 changed file with 74 additions and 75 deletions.
149 changes: 74 additions & 75 deletions src/program/config/bench/bench.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ module(..., package.seeall)
local S = require("syscall")
local lib = require("core.lib")
local ffi = require("ffi")
local json_lib = require("lib.ptree.json")
local file = require("lib.stream.file")
local mem = require("lib.stream.mem")
local fiber = require("lib.fibers.fiber")
local json_lib = require("lib.ptree.json2")

function show_usage(command, status, err_msg)
if err_msg then print('error: '..err_msg) end
Expand All @@ -28,106 +31,102 @@ function parse_command_line(args)
return listen_params, commands_file
end

local function read_reply(fd)
local json = read_json_object(client)
local output = buffered_output()
write_json_object(output, json)
output:flush(S.stdout)
end

local function read_commands(file)
local fd = assert(S.open(file, "rdonly"))
local input = json_lib.buffered_input(fd)
json_lib.skip_whitespace(input)
local function read_commands(filename)
local input = assert(file.open(filename, 'r'))
local ret = {}
while not input:eof() do
local json = json_lib.read_json_object(input)
json_lib.skip_whitespace(input)
local out = json_lib.buffered_output()
json_lib.write_json_object(out, json)
table.insert(ret, out:flush())
while true do
local obj = json_lib.read_json_object(input)
if obj == nil then break end
local function write_json(out)
json_lib.write_json_object(out, obj)
end
table.insert(ret, mem.call_with_output_string(write_json))
end
fd:close()
input:close()
return ret
end

function die(input)
local chars = {}
while input:peek() do
table.insert(chars, input:peek())
input:discard()
end
local str = table.concat(chars)
io.stderr:write("Error detected reading response:\n"..str)
main.exit(1)
end

function full_write(fd, str)
local ptr = ffi.cast("const char*", str)
local written = 0
while written < #str do
local count = assert(fd:write(ptr + written, #str - written))
written = written + count
end
end

function run(args)
listen_params, file = parse_command_line(args)
local commands = read_commands(file)
-- The stock Lua popen interface does not support full-duplex operation;
-- it would need to return two pipes.
local function popen_rw(filename, argv)
local ok, err, input_read, input_write = assert(S.pipe())
local ok, err, output_read, output_write = assert(S.pipe())
local pid = S.fork()
if pid == 0 then
local argv = {"snabb", "config", "listen"}
if listen_params.schema_name then
table.insert(argv, "-s")
table.insert(argv, listen_params.schema_name)
end
if listen_params.revision_date then
table.insert(argv, "-r")
table.insert(argv, listen_params.revision_date)
end
table.insert(argv, listen_params.instance_id)
S.prctl("set_pdeathsig", "hup")
input_write:close()
output_read:close()
assert(S.dup2(input_read, 0))
assert(S.dup2(output_write, 1))
input_read:close()
output_write:close()
lib.execv(("/proc/%d/exe"):format(S.getpid()), argv)
lib.execv(filename, argv)
end
input_read:close()
output_write:close()

local write_buffering = assert(input_write:fcntl(S.c.F.GETPIPE_SZ))
return pid, file.fdopen(input_write), file.fdopen(output_read)
end

local function spawn_snabb_config_listen(params)
local argv = {"snabb", "config", "listen"}
if params.schema_name then
table.insert(argv, "-s")
table.insert(argv, params.schema_name)
end
if params.revision_date then
table.insert(argv, "-r")
table.insert(argv, params.revision_date)
end
table.insert(argv, params.instance_id)
return popen_rw("/proc/self/exe", argv)
end

function run(args)
local handler = require('lib.fibers.file').new_poll_io_handler()
file.set_blocking_handler(handler)
fiber.current_scheduler:add_task_source(handler)

local listen_params, filename = parse_command_line(args)
local pid, tx, rx = spawn_snabb_config_listen(listen_params)
local commands = read_commands(filename)

local input = json_lib.buffered_input(output_read)
local start = engine.now()
local next_write, next_read = 1, 1
local buffered_bytes = 0
io.stdout:setvbuf("no")
while next_read <= #commands do
while next_write <= #commands do
local str = commands[next_write]
if buffered_bytes + #str > write_buffering then break end
full_write(input_write, str)

local function exit_if_error(f)
return function()
local success, res = pcall(f)
if not success then
io.stderr:write('error: '..tostring(res)..'\n')
os.exit(1)
end
end
end

local done = false
local function send_requests()
for i,command in ipairs(commands) do
tx:write_chars(command)
io.stdout:write("w")
buffered_bytes = buffered_bytes + #str
next_write = next_write + 1
end
while next_read < next_write do
json_lib.skip_whitespace(input)
local ok, response = pcall(json_lib.read_json_object, input)
if ok then
buffered_bytes = buffered_bytes - #commands[next_read]
next_read = next_read + 1
io.stdout:write("r")
else
die(input)
end
io.stdout:write("!")
tx:flush()
end
local function read_replies()
for i=1,#commands do
local ok, obj = pcall(json_lib.read_json_object, rx)
if not ok then error('failed to read json obj: '..tostring(obj)) end
if not obj then error('unexpected EOF while reading response') end
io.stdout:write("r")
end
done = true
end

fiber.spawn(exit_if_error(send_requests))
fiber.spawn(exit_if_error(read_replies))

local start = engine.now()
while not done do fiber.current_scheduler:run() end
local elapsed = engine.now() - start
io.stdout:write("\n")
print(string.format("Issued %s commands in %.2f seconds (%.2f commands/s)",
Expand Down

0 comments on commit 19e422c

Please sign in to comment.