Skip to content

Commit

Permalink
Add sockets library that integrates with blocking handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
wingo committed May 7, 2018
1 parent 9a03646 commit 70fde0b
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 3 deletions.
10 changes: 7 additions & 3 deletions src/lib/stream/file.lua
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,15 @@ end

set_blocking_handler()

function init_nonblocking(fd) blocking_handler:init_nonblocking(fd) end
function wait_for_readable(fd) blocking_handler:wait_for_readable(fd) end
function wait_for_writable(fd) blocking_handler:wait_for_writable(fd) end

local File = {}
local File_mt = {__index = File}

local function new_file_io(fd, filename)
blocking_handler:init_nonblocking(fd)
init_nonblocking(fd)
return setmetatable({fd=fd, filename=filename}, File_mt)
end

Expand Down Expand Up @@ -88,8 +92,8 @@ function File:seek(whence, offset)
return self.fd:lseek(offset, whence)
end

function File:wait_for_readable() blocking_handler:wait_for_readable(self.fd) end
function File:wait_for_writable() blocking_handler:wait_for_writable(self.fd) end
function File:wait_for_readable() wait_for_readable(self.fd) end
function File:wait_for_writable() wait_for_writable(self.fd) end

function File:close()
self.fd:close()
Expand Down
105 changes: 105 additions & 0 deletions src/lib/stream/socket.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
-- Use of this source code is governed by the Apache 2.0 license; see COPYING.

-- A stream IO implementation for sockets.

module(..., package.seeall)

local file = require('lib.stream.file')
local S = require('syscall')

local Socket = {}
local Socket_mt = {__index = Socket}

local sigpipe_handler

function socket(domain, stype, protocol)
if sigpipe_handler == nil then sigpipe_handler = S.signal('pipe', 'ign') end
local fd = assert(S.socket(domain, stype, protocol))
file.init_nonblocking(fd)
return setmetatable({fd=fd}, Socket_mt)
end

function Socket:listen_unix(file)
local sa = S.t.sockaddr_un(file)
self.scratch_sockaddr = S.t.sockaddr_un()
assert(self.fd:bind(sa))
assert(self.fd:listen())
end

function Socket:accept()
while true do
local fd, err = self.fd:accept(self.scratch_sockaddr)
if fd then
return file.fdopen(fd)
elseif err.AGAIN or err.WOULDBLOCK then
file.wait_for_readable(self.fd)
else
error(tostring(err))
end
end
end

function Socket:connect(sa)
local ok, err = self.fd:connect(sa)
if not ok and err.INPROGRESS then
-- Bonkers semantics; see connect(2).
file.wait_for_writable(self.fd)
local err = assert(s:getsockopt("socket", "error"))
if err == 0 then ok = true
else err = S.t.error(err) end
end
if ok then
local fd = self.fd
self.fd = nil
return file.fdopen(fd)
end
error(tostring(err))
end

function Socket:connect_unix(file, stype)
local sa = S.t.sockaddr_un(file)
return self:connect(sa)
end

function listen_unix(file, stype, protocol)
local s = socket('unix', stype or 'stream', protocol)
s:listen_unix(file)
return s
end

function connect_unix(file, stype, protocol)
local s = socket('unix', stype or 'stream', protocol)
return s:connect_unix(file)
end

function Socket:close()
if self.fd then self.fd:close() end
self.fd = nil
end

function selftest()
print('selftest: lib.stream.socket')
local shm = require('core.shm')

local sockname = shm.root..'/'..tostring(S.getpid())..'/test-socket'
S.unlink(sockname)

local server = listen_unix(sockname)
local client = connect_unix(sockname)
local peer = server:accept()

local message = "hello, world\n"
client:write(message)
client:flush_output()
local message2 = peer:read_some_chars()
assert(message == message2)
client:close()
assert(peer:read_some_chars() == nil)
peer:close()

server:close()

S.unlink(sockname)

print('selftest: ok')
end

0 comments on commit 70fde0b

Please sign in to comment.