Skip to content

Commit

Permalink
Merge remote-tracking branch 'eugeneia/mcpring-group-freelist' into i…
Browse files Browse the repository at this point in the history
…pfix-rss
  • Loading branch information
alexandergall committed May 15, 2018
2 parents dd90b64 + a083487 commit 49a24f0
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 41 deletions.
56 changes: 56 additions & 0 deletions src/apps/interlink/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Inter-process links (apps.interlink.*)

The “interlink” transmitter and receiver apps allow for efficient exchange
of packets between Snabb processes within the same process group (see
[Multiprocess operation (core.worker)](#multiprocess-operation-core.worker)).

DIAGRAM: Transmitter and Receiver
+-------------+ +-------------+
| | | |
input | | | |
----* Transmitter | | Reciever *----
| | | | output
| | | |
+-------------+ +-------------+

To make packets from an output port available to other processes, configure a
transmitter app, and link the appropriate output port to its `input` port.

```lua
local Transmitter = require("apps.interlink.transmitter")

config.app(c, "interlink", Transmitter)
config.link(c, "myapp.output -> interlink.input")
```

Then, in the process that should receive the packets, configure a receiver app
with the same name, and link its `output` port as suitable.

```lua
local Receiver = require("apps.interlink.receiver")

config.app(c, "interlink", Receiver)
config.link(c, "interlink.output -> otherapp.input")
```

Subsequently, packets transmitted to the transmitter’s `input` port will appear
on the receiver’s `output` port.

## Configuration

None, but the configured app names are globally unique within the process
group.

Starting either the transmitter or receiver app attaches them to a shared
packet queue visible to the process group under the name that was given to the
app. When the queue identified by the name is unavailable, because it is
already in use by a pair of processes within the group, configuration of the
app network will block until the queue becomes available. Once the transmitter
or receiver apps are stopped they detach from the queue.

Only two processes (one receiver and one transmitter) can be attached to an
interlink queue at the same time, but during the lifetime of the queue (e.g.,
from when the first process attached to when the last process detaches) it can
be shared by any number of receivers and transmitters. Meaning, either process
attached to the queue can be restarted or replaced by another process without
packet loss.
26 changes: 17 additions & 9 deletions src/apps/interlink/receiver.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,19 @@ local interlink = require("lib.interlink")

local Receiver = {name="apps.interlink.Receiver"}

function Receiver:new (_, name)
local self = {}
self.shm_name = "group/interlink/"..name..".interlink"
self.backlink = "interlink/receiver/"..name..".interlink"
self.interlink = interlink.attach_receiver(self.shm_name)
shm.alias(self.backlink, self.shm_name)
return setmetatable(self, {__index=Receiver})
function Receiver:new ()
packet.enable_group_freelist()
return setmetatable({attached=false}, {__index=Receiver})
end

function Receiver:link ()
if not self.attached then
self.shm_name = "group/interlink/"..self.appname..".interlink"
self.backlink = "interlink/receiver/"..self.appname..".interlink"
self.interlink = interlink.attach_receiver(self.shm_name)
shm.alias(self.backlink, self.shm_name)
self.attached = true
end
end

function Receiver:pull ()
Expand All @@ -27,8 +33,10 @@ function Receiver:pull ()
end

function Receiver:stop ()
interlink.detach_receiver(self.interlink, self.shm_name)
shm.unlink(self.backlink)
if self.attached then
interlink.detach_receiver(self.interlink, self.shm_name)
shm.unlink(self.backlink)
end
end

-- Detach receivers to prevent leaking interlinks opened by pid.
Expand Down
26 changes: 17 additions & 9 deletions src/apps/interlink/transmitter.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,19 @@ local interlink = require("lib.interlink")

local Transmitter = {name="apps.interlink.Transmitter"}

function Transmitter:new (_, name)
local self = {}
self.shm_name = "group/interlink/"..name..".interlink"
self.backlink = "interlink/transmitter/"..name..".interlink"
self.interlink = interlink.attach_transmitter(self.shm_name)
shm.alias(self.backlink, self.shm_name)
return setmetatable(self, {__index=Transmitter})
function Transmitter:new ()
packet.enable_group_freelist()
return setmetatable({attached=false}, {__index=Transmitter})
end

function Transmitter:link ()
if not self.attached then
self.shm_name = "group/interlink/"..self.appname..".interlink"
self.backlink = "interlink/transmitter/"..self.appname..".interlink"
self.interlink = interlink.attach_transmitter(self.shm_name)
shm.alias(self.backlink, self.shm_name)
self.attached = true
end
end

function Transmitter:push ()
Expand All @@ -27,8 +33,10 @@ function Transmitter:push ()
end

function Transmitter:stop ()
interlink.detach_transmitter(self.interlink, self.shm_name)
shm.unlink(self.backlink)
if self.attached then
interlink.detach_transmitter(self.interlink, self.shm_name)
shm.unlink(self.backlink)
end
end

-- Detach transmitters to prevent leaking interlinks opened by pid.
Expand Down
2 changes: 1 addition & 1 deletion src/core/app.lua
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ function apply_config_actions (actions)
configuration.apps[name] = nil
end
function ops.start_app (name, class, arg)
local app = class:new(arg, name)
local app = class:new(arg)
if type(app) ~= 'table' then
error(("bad return value from app '%s' start() method: %s"):format(
name, tostring(app)))
Expand Down
32 changes: 18 additions & 14 deletions src/core/packet.lua
Original file line number Diff line number Diff line change
Expand Up @@ -94,18 +94,20 @@ end

local packet_allocation_step = 1000
local packets_allocated = 0
local packets_fl = ffi.new("struct freelist")
packets_fl.max = max_packets
local group_fl
if not shm.exists("group/packets.freelist") then
group_fl = shm.create("group/packets.freelist", "struct freelist")
group_fl.max = max_packets
else
group_fl = shm.open("group/packets.freelist", "struct freelist")
local packets_fl = ffi.new("struct freelist", {max=max_packets})
local group_fl -- Initialized on demand.

-- Call to ensure group freelist is enabled.
function enable_group_freelist ()
if not group_fl then
group_fl = shm.create("group/packets.freelist", "struct freelist")
group_fl.max = max_packets
end
end

-- Return borrowed packets to group freelist.
function rebalance_freelists ()
if freelist_nfree(packets_fl) > packets_allocated then
if group_fl and freelist_nfree(packets_fl) > packets_allocated then
freelist_lock(group_fl)
while freelist_nfree(packets_fl) > packets_allocated
and not freelist_full(group_fl) do
Expand All @@ -118,12 +120,14 @@ end
-- Return an empty packet.
function allocate ()
if freelist_nfree(packets_fl) == 0 then
freelist_lock(group_fl)
while freelist_nfree(group_fl) > 0
and freelist_nfree(packets_fl) < packets_allocated do
freelist_add(packets_fl, freelist_remove(group_fl))
if group_fl then
freelist_lock(group_fl)
while freelist_nfree(group_fl) > 0
and freelist_nfree(packets_fl) < packets_allocated do
freelist_add(packets_fl, freelist_remove(group_fl))
end
freelist_unlock(group_fl)
end
freelist_unlock(group_fl)
if freelist_nfree(packets_fl) == 0 then
preallocate_step()
end
Expand Down
2 changes: 2 additions & 0 deletions src/doc/genbook.sh
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ $(cat $mdroot/apps/test/README.md)
$(cat $mdroot/apps/wall/README.md)
$(cat $mdroot/apps/interlink/README.md)
# Libraries
$(cat $mdroot/lib/README.checksum.md)
Expand Down
14 changes: 6 additions & 8 deletions src/lib/interlink.lua
Original file line number Diff line number Diff line change
Expand Up @@ -131,18 +131,16 @@ local DOWN = 4 -- Both ends have detached; must be re-allocated.
-- (any) DOWN->* Cannot transition from DOWN (must create new queue.)

local function attach (name, initialize)
local ok, r
local r
local first_try = true
waitfor(
function ()
-- Try to open the queue.
ok, r = pcall(shm.open, name, "struct interlink")
-- If that failed then we try to create it.
if not ok then ok, r = pcall(shm.create, name, "struct interlink") end
-- Return if we could map the queue and succeed to initialize it.
if ok and initialize(r) then return true end
-- Create/open the queue.
r = shm.create(name, "struct interlink")
-- Return if we succeed to initialize it.
if initialize(r) then return true end
-- We failed; handle error and try again.
if ok then shm.unmap(r); ok, r = nil end
shm.unmap(r)
if first_try then
print("interlink: waiting for "..name.." to become available...")
first_try = false
Expand Down

0 comments on commit 49a24f0

Please sign in to comment.