Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge wingo-next for v2018.05 #1349

Merged
merged 80 commits into from
Jun 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
80 commits
Select commit Hold shift + click to select a range
5100c5d
wip: MCRingBuffer for packets.
eugeneia Jul 25, 2017
3ddb3f3
manual batching
eugeneia Jul 25, 2017
86a6ee9
balance packets
eugeneia Jul 25, 2017
293892b
apps/test
eugeneia Jul 25, 2017
9cdd697
worker
eugeneia Jul 25, 2017
322b61c
factor
eugeneia Jul 25, 2017
6306ed7
core.worker: include si_status in status
eugeneia Jul 25, 2017
4e538cf
fix init race
eugeneia Jul 25, 2017
626c8b6
reorg
eugeneia Jul 25, 2017
ac886ba
global max
eugeneia Jul 26, 2017
06053f6
lib.interlink: fix initialization race.
eugeneia Aug 1, 2017
2e32f83
core.worker: include si_status in status
eugeneia Aug 16, 2017
47d8792
lib.interlink: specialized inter-process links
eugeneia Jul 25, 2017
7318cc0
apps.interlink: name apps
eugeneia Aug 17, 2017
8dff93b
apps.interlink: use robust setup/teardown routine, add sensible memor…
eugeneia Aug 21, 2017
4486728
apps.interlink.receiver: don’t forward packets until connected
eugeneia Aug 21, 2017
c971256
Merge fixes for initialization races from branch interlink into mcpring
eugeneia Jan 3, 2018
db61756
extract lib.print_object from lib.store_config
eugeneia Oct 30, 2017
a60a42b
lib.xsd_regexp: port MaxPC to lib.maxpc, implement regular expression…
eugeneia Oct 30, 2017
204f21e
lib.xsd_regexp: implement basic regexp compiler
eugeneia Nov 2, 2017
6f41966
lib.maxpc: add input_class argument to parse / use classes for input …
eugeneia Nov 3, 2017
660ae79
Revert "lib.maxpc: add input_class argument to parse / use classes [.…
eugeneia Nov 3, 2017
ba37cd8
lib.maxpc: decode UTF-8 input
eugeneia Nov 3, 2017
d3cacc6
lib.maxpc: fix bug in match._not
eugeneia Nov 5, 2017
b6cc1a6
lib.maxpc: port digit/number parsers
eugeneia Nov 5, 2017
5fd0198
lib.xsd_regexp: implement unicode ranges
eugeneia Nov 8, 2017
7de7fd3
lib.ucd: supporting Unicode database for lib.xsd_regexp
eugeneia Nov 8, 2017
f62c95a
lib.xsd_regexp: implement limited category escapes
eugeneia Nov 9, 2017
5eaf4ac
lib.xsd_regexp: remove support for block escapes, copy in GC predicates
eugeneia Nov 9, 2017
ba292f0
lib.xsd_regexp: remove obsolete match.digit
eugeneia Nov 9, 2017
ba818b9
Revert "lib.ucd: supporting Unicode database for lib.xsd_regexp"
eugeneia Nov 9, 2017
b26d5fc
Revert "lib.xsd_regexp: implement unicode ranges"
eugeneia Nov 9, 2017
6fcebb8
Revert "lib.maxpc: decode UTF-8 input", revert lib.xsd_regexp to ASCI…
eugeneia Nov 9, 2017
1187fc7
lib.xsd_regexp: implement \i, \I, \c, and \C
eugeneia Nov 9, 2017
074ff0c
lib.xsd_regexp: implement backtracking
eugeneia Nov 10, 2017
1be3a55
lib.yang.schema: fix parse_range
eugeneia Nov 12, 2017
de468cb
lib.yang.data: implement range and pattern restrictions
eugeneia Nov 12, 2017
e54b5f6
lib.yang.data: do not compile pattern validator for empty set of patt…
eugeneia Jan 12, 2018
88cbeb0
lib.yang.data: only run pattern validator on string values
eugeneia Jan 12, 2018
d1a7409
lib.yang.schema: fix bug in selftest (range can have multiple intervals)
eugeneia Jan 12, 2018
e577721
program.lwaftr.tests.propbased.genyang: adapt to new range format
eugeneia Jan 12, 2018
090d617
lib.numa: skip selftest if running on a non-NUMA machine
eugeneia Jan 26, 2018
2ece0c1
lib.numa: more fine grained approach to 090d6179d
eugeneia Jan 29, 2018
6783dac
lib.interlink: use __attribute((packet, aligned(64)))__ to align s
eugeneia Jan 25, 2018
cae2554
apps/interlink: turn test.lua into Snabb script selftest.snabb
eugeneia Feb 9, 2018
022ab0c
apps.interlink: add duration parameter to selftest.snabb
eugeneia Feb 9, 2018
c888a86
Simple spinlock implementation based on xchg instruction.
xrme Feb 22, 2016
d1adbcb
process group: rebalance packets exchanged across process boundaries
eugeneia Feb 7, 2018
9f0694f
core.packet: extract packet.account_free from packet.free
eugeneia Feb 9, 2018
756255c
DynASM: add CMPXCHG instruction
eugeneia Jan 18, 2018
d05da38
core.sync: add atomic compare-and-swap primitive based on CMPXCHG
eugeneia Jan 18, 2018
9dfa543
interlink: adapt interlink and related apps to group freelist
eugeneia Feb 9, 2018
020a9fd
core.sync: consolidate core.spinlock into core.sync
eugeneia Feb 10, 2018
302f952
core.app: pass app name as second argument to :new in start_app
eugeneia Feb 15, 2018
4686cbf
apps.interlink: use app name to derive SHM interlink path
eugeneia Feb 15, 2018
cb9250f
lib.interlink: fix life-time fsm design and implementation bugs
eugeneia Feb 18, 2018
274ea4b
core.packet: do not count packets reclaimed from group_fl as freed
eugeneia Feb 19, 2018
59caa02
apps.interlink: shutdown handler to release stale attachments
eugeneia Feb 19, 2018
b1472e8
lib.interlink: integrate with shm.open_frame for snabb top
eugeneia Feb 20, 2018
5ac340e
core.shm: explicitly open shm objects read-only in open_frame
eugeneia Feb 20, 2018
95de704
lib.interlink: fix outdated comments
eugeneia Feb 21, 2018
154d63c
lib.interlink / core.main: fix shutdown bugs
eugeneia Feb 22, 2018
72aadf7
lib.interlink: take advantage of non-excl shm.create semantics
eugeneia Feb 22, 2018
44b4cdf
core.packet: use ffi init argument to initialize packet_fl
eugeneia Feb 24, 2018
5ac6e7b
core.packet: enable group freelist only on demand
eugeneia Mar 5, 2018
03daccb
Merge tag 'v2018.01.1' into yang-restrictions-upstream
eugeneia Mar 5, 2018
02bdf78
apps.interlink.*: add user-facing documentation.
eugeneia Mar 6, 2018
e7f63d6
doc/branches.md: Add snabbco/raptorjit integration branch
lukego Apr 4, 2018
be6c657
lib.maxpc: fix backtracking “zero or more” combinator match.all
eugeneia Apr 6, 2018
b9da7ca
Merge PR #1322 (v2018.04 release) into master
eugeneia Apr 30, 2018
bf4307d
Make "offset" arg to lseek a signed integer
wingo May 2, 2018
c583ac7
apps.interlink: fix documentation bugs
eugeneia May 3, 2018
5fbe79e
apps.interlink: avoid engine/appname kludge by attaching in :link()
eugeneia May 3, 2018
a083487
Revert "core.app: pass app name as second argument to :new in start_app"
eugeneia May 3, 2018
b699d7e
apps.interlink: allow specifying queue name per config
eugeneia May 30, 2018
d4c2eca
Merge 'eugeneia/mcpring-group-freelist' into wingo-next
wingo Jun 1, 2018
47df3ba
Merge 'lukego/raptorjit-branch' into wingo-next
wingo Jun 1, 2018
6cfe5e5
Merge 'Igalia/fix-lseek' into wingo-next
wingo Jun 1, 2018
bb935c7
Merge 'eugeneia/yang-restrictions-upstream' into wingo-next
wingo Jun 1, 2018
3120d7c
Merge 'eugeneia/numa-selftest-nonnuma' into wingo-next
wingo Jun 4, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/ljsyscall/syscall/linux/c.lua
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ else -- 64 bit
function C.fstatfs(fd, buf) return syscall(sys.fstatfs, int(fd), void(buf)) end
function C.preadv(fd, iov, iovcnt, offset) return syscall_long(sys.preadv, int(fd), void(iov), long(iovcnt), ulong(offset)) end
function C.pwritev(fd, iov, iovcnt, offset) return syscall_long(sys.pwritev, int(fd), void(iov), long(iovcnt), ulong(offset)) end
function C.lseek(fd, offset, whence) return syscall_off(sys.lseek, int(fd), ulong(offset), int(whence)) end
function C.lseek(fd, offset, whence) return syscall_off(sys.lseek, int(fd), long(offset), int(whence)) end
function C.sendfile(outfd, infd, offset, count)
return syscall_long(sys.sendfile, int(outfd), int(infd), void(offset), ulong(count))
end
Expand Down
4 changes: 2 additions & 2 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ TESTMODS = $(shell find . -regex '[^\#]*\.\(lua\|dasl\)' -printf '%P ' | \

# TESTSCRIPTS expands to:
# lib/watchdog/selftest.sh ...
# for each executable selftext.sh script in src.
TESTSCRIPTS = $(shell find . -name "selftest.sh" -executable | xargs)
# for each executable selftext.* script in src.
TESTSCRIPTS = $(shell find . -name "selftest.*" -executable | xargs)

PATH := ../lib/luajit/usr/local/bin:$(PATH)

Expand Down
65 changes: 65 additions & 0 deletions src/apps/interlink/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Inter-process links (apps.interlink.*)

The “interlink” transmitter and receiver apps allow for efficient exchange
of packets between Snabb processes within the same process group (see
[Multiprocess operation (core.worker)](#multiprocess-operation-core.worker)).

DIAGRAM: Transmitter and Receiver
+-------------+ +-------------+
| | | |
input | | | |
----* Transmitter | | Reciever *----
| | | | output
| | | |
+-------------+ +-------------+

To make packets from an output port available to other processes, configure a
transmitter app, and link the appropriate output port to its `input` port.

```lua
local Transmitter = require("apps.interlink.transmitter")

config.app(c, "interlink", Transmitter)
config.link(c, "myapp.output -> interlink.input")
```

Then, in the process that should receive the packets, configure a receiver app
with the same name, and link its `output` port as suitable.

```lua
local Receiver = require("apps.interlink.receiver")

config.app(c, "interlink", Receiver)
config.link(c, "interlink.output -> otherapp.input")
```

Subsequently, packets transmitted to the transmitter’s `input` port will appear
on the receiver’s `output` port.

Alternatively, a name can be supplied as a configuration argument to be used
instead of the app’s name:

```lua
config.app(c, "mylink", Receiver, "interlink")
config.link(c, "mylink.output -> otherapp.input")
```

## Configuration

The configured app names denote globally unique queues within the process
group. Alternativelyy, the receiver and transmitter apps can instead be passed
a string that names the shared queue to which to attach to.

Starting either the transmitter or receiver app attaches them to a shared
packet queue visible to the process group under the name that was given to the
app. When the queue identified by the name is unavailable, because it is
already in use by a pair of processes within the group, configuration of the
app network will block until the queue becomes available. Once the transmitter
or receiver apps are stopped they detach from the queue.

Only two processes (one receiver and one transmitter) can be attached to an
interlink queue at the same time, but during the lifetime of the queue (e.g.,
from when the first process attached to when the last process detaches) it can
be shared by any number of receivers and transmitters. Meaning, either process
attached to the queue can be restarted or replaced by another process without
packet loss.
58 changes: 58 additions & 0 deletions src/apps/interlink/receiver.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
-- Use of this source code is governed by the Apache 2.0 license; see COPYING.

module(...,package.seeall)

local shm = require("core.shm")
local interlink = require("lib.interlink")

local Receiver = {name="apps.interlink.Receiver"}

function Receiver:new (queue)
packet.enable_group_freelist()
return setmetatable({attached=false, queue=queue}, {__index=Receiver})
end

function Receiver:link ()
local queue = self.queue or self.appname
if not self.attached then
self.shm_name = "group/interlink/"..queue..".interlink"
self.backlink = "interlink/receiver/"..queue..".interlink"
self.interlink = interlink.attach_receiver(self.shm_name)
shm.alias(self.backlink, self.shm_name)
self.attached = true
end
end

function Receiver:pull ()
local o, r, n = self.output.output, self.interlink, 0
if not o then return end -- don’t forward packets until connected
while not interlink.empty(r) and n < engine.pull_npackets do
link.transmit(o, interlink.extract(r))
n = n + 1
end
interlink.pull(r)
end

function Receiver:stop ()
if self.attached then
interlink.detach_receiver(self.interlink, self.shm_name)
shm.unlink(self.backlink)
end
end

-- Detach receivers to prevent leaking interlinks opened by pid.
--
-- This is an internal API function provided for cleanup during
-- process termination.
function Receiver.shutdown (pid)
for _, queue in ipairs(shm.children("/"..pid.."/interlink/receiver")) do
local backlink = "/"..pid.."/interlink/receiver/"..queue..".interlink"
local shm_name = "/"..pid.."/group/interlink/"..queue..".interlink"
-- Call protected in case /<pid>/group is already unlinked.
local ok, r = pcall(interlink.open, shm_name)
if ok then interlink.detach_receiver(r, shm_name) end
shm.unlink(backlink)
end
end

return Receiver
33 changes: 33 additions & 0 deletions src/apps/interlink/selftest.snabb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#!snabb snsh

-- Use of this source code is governed by the Apache 2.0 license; see COPYING.

local worker = require("core.worker")
local interlink = require("lib.interlink")
local Receiver = require("apps.interlink.receiver")
local Sink = require("apps.basic.basic_apps").Sink

-- Synopsis: selftest.snabb [duration]
local DURATION = tonumber(main.parameters[1]) or 10

worker.start("source", [[require("apps.interlink.test_source").start("test")]])

local c = config.new()

config.app(c, "test", Receiver)
config.app(c, "sink", Sink)
config.link(c, "test.output->sink.input")

engine.configure(c)
engine.main({duration=DURATION, report={showlinks=true}})

for w, s in pairs(worker.status()) do
print(("worker %s: pid=%s alive=%s status=%s"):format(
w, s.pid, s.alive, s.status))
end
local stats = link.stats(engine.app_table["sink"].input.input)
print(stats.txpackets / 1e6 / DURATION .. " Mpps")

-- test teardown
engine.configure(config.new())
engine.main({duration=0.1})
15 changes: 15 additions & 0 deletions src/apps/interlink/test_source.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-- Use of this source code is governed by the Apache 2.0 license; see COPYING.

module(...,package.seeall)

local Transmitter = require("apps.interlink.transmitter")
local Source = require("apps.basic.basic_apps").Source

function start (name)
local c = config.new()
config.app(c, name, Transmitter)
config.app(c, "source", Source)
config.link(c, "source.output -> "..name..".input")
engine.configure(c)
engine.main()
end
58 changes: 58 additions & 0 deletions src/apps/interlink/transmitter.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
-- Use of this source code is governed by the Apache 2.0 license; see COPYING.

module(...,package.seeall)

local shm = require("core.shm")
local interlink = require("lib.interlink")

local Transmitter = {name="apps.interlink.Transmitter"}

function Transmitter:new (queue)
packet.enable_group_freelist()
return setmetatable({attached=false, queue=queue}, {__index=Transmitter})
end

function Transmitter:link ()
local queue = self.queue or self.appname
if not self.attached then
self.shm_name = "group/interlink/"..queue..".interlink"
self.backlink = "interlink/transmitter/"..queue..".interlink"
self.interlink = interlink.attach_transmitter(self.shm_name)
shm.alias(self.backlink, self.shm_name)
self.attached = true
end
end

function Transmitter:push ()
local i, r = self.input.input, self.interlink
while not (interlink.full(r) or link.empty(i)) do
local p = link.receive(i)
packet.account_free(p) -- stimulate breathing
interlink.insert(r, p)
end
interlink.push(r)
end

function Transmitter:stop ()
if self.attached then
interlink.detach_transmitter(self.interlink, self.shm_name)
shm.unlink(self.backlink)
end
end

-- Detach transmitters to prevent leaking interlinks opened by pid.
--
-- This is an internal API function provided for cleanup during
-- process termination.
function Transmitter.shutdown (pid)
for _, queue in ipairs(shm.children("/"..pid.."/interlink/transmitter")) do
local backlink = "/"..pid.."/interlink/transmitter/"..queue..".interlink"
local shm_name = "/"..pid.."/group/interlink/"..queue..".interlink"
-- Call protected in case /<pid>/group is already unlinked.
local ok, r = pcall(interlink.open, shm_name)
if ok then interlink.detach_transmitter(r, shm_name) end
shm.unlink(backlink)
end
end

return Transmitter
7 changes: 5 additions & 2 deletions src/core/app.lua
Original file line number Diff line number Diff line change
Expand Up @@ -535,8 +535,11 @@ function breathe ()
end
end
counter.add(breaths)
-- Commit counters at a reasonable frequency
if counter.read(breaths) % 100 == 0 then counter.commit() end
-- Commit counters and rebalance freelists at a reasonable frequency
if counter.read(breaths) % 100 == 0 then
counter.commit()
packet.rebalance_freelists()
end
running = false
end

Expand Down
10 changes: 7 additions & 3 deletions src/core/lib.lua
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ function load_conf (file)
end

-- Store Lua representation of value in file.
function store_conf (file, value)
function print_object (value, stream)
stream = stream or io.stdout
local indent = 0
local function print_indent (stream)
for i = 1, indent do stream:write(" ") end
Expand Down Expand Up @@ -151,10 +152,13 @@ function store_conf (file, value)
stream:write(("%s"):format(value))
end
end
local stream = assert(io.open(file, "w"))
stream:write("return ")
print_value(value, stream)
stream:write("\n")
end
function store_conf (file, value)
local stream = assert(io.open(file, "w"))
stream:write("return ")
print_object(value, stream)
stream:close()
end

Expand Down
14 changes: 8 additions & 6 deletions src/core/main.lua
Original file line number Diff line number Diff line change
Expand Up @@ -172,17 +172,19 @@ end

-- Cleanup after Snabb process.
function shutdown (pid)
-- simple pcall helper to print error and continue
local function safely (f)
local ok, err = pcall(f)
if not ok then print(err) end
end
-- Run cleanup hooks
safely(function () require("apps.interlink.receiver").shutdown(pid) end)
safely(function () require("apps.interlink.transmitter").shutdown(pid) end)
-- Parent process performs additional cleanup steps.
-- (Parent is the process whose 'group' folder is not a symlink.)
local st, err = S.lstat(shm.root.."/"..pid.."/group")
local is_parent = st and st.isdir
if is_parent then
-- simple pcall helper to print error and continue
local function safely (f)
local ok, err = pcall(f)
if not ok then print(err) end
end
-- Run cleanup hooks
safely(function () require("lib.hardware.pci").shutdown(pid) end)
safely(function () require("core.memory").shutdown(pid) end)
end
Expand Down
Loading