forked from leafo/pgmoon
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
ce5ac68
commit 8cc46bc
Showing
1 changed file
with
77 additions
and
113 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,113 +1,77 @@ | ||
local flatten | ||
flatten = require("pgmoon.util").flatten | ||
local RedbeanSocket | ||
do | ||
local _class_0 | ||
local _base_0 = { | ||
connect = function(self, host, port, opts) | ||
if not (self.unix_socket) then | ||
self.unix_socket = assert(unix.socket()) | ||
end | ||
local err | ||
self.sock, err = unix.connect(self.unix_socket, assert(ResolveIp(host)), port) | ||
if not (self.sock) then | ||
return nil, err:doc() | ||
end | ||
if self.timeout then | ||
unix.setsockopt(self.unix_socket, SOL_SOCKET, SO_RCVTIMEO, self.timeout / 1000) | ||
unix.setsockopt(self.unix_socket, SOL_SOCKET, SO_SNDTIMEO, self.timeout / 1000) | ||
end | ||
return true | ||
end, | ||
starttls = function(self, ...) | ||
return error("Not supported at the moment.") | ||
end, | ||
getpeercertificate = function(self) | ||
return error("Not supported at the moment.") | ||
end, | ||
send = function(self, ...) | ||
local data = flatten(...) | ||
local CANWRITE = unix.POLLOUT | unix.POLLWRNORM | ||
local events = assert(unix.poll({ | ||
[self.unix_socket] = unix.POLLOUT | ||
}, self.timeout)) | ||
if not (events[self.unix_socket]) then | ||
return nil, "timeout" | ||
end | ||
if events[self.unix_socket] & CANWRITE == 0 then | ||
return nil, "close" | ||
end | ||
local size = 0 | ||
while size < #data do | ||
local sent, err = unix.send(self.unix_socket, string.sub(data, size + 1)) | ||
if not sent and err:name() == "EAGAIN" then | ||
return nil, "timeout" | ||
end | ||
if not sent then | ||
return nil, err:doc() | ||
end | ||
size = size + sent | ||
end | ||
return size | ||
end, | ||
receive = function(self, pattern) | ||
local CANREAD = unix.POLLIN | unix.POLLRDNORM | unix.POLLRDBAND | ||
local size = tonumber(pattern) | ||
local buf = "" | ||
if size then | ||
local events = assert(unix.poll({ | ||
[self.unix_socket] = unix.POLLIN | ||
}, self.timeout)) | ||
if not (events[self.unix_socket]) then | ||
return nil, "timeout" | ||
end | ||
if events[self.unix_socket] & CANREAD == 0 then | ||
return nil, "close" | ||
end | ||
while #buf < size do | ||
local rec = assert(unix.recv(self.unix_socket, size - #buf)) | ||
if #rec == 0 then | ||
break | ||
else | ||
buf = buf .. rec | ||
end | ||
end | ||
end | ||
return buf | ||
end, | ||
close = function(self) | ||
return assert(unix.close(self.unix_socket)) | ||
end, | ||
settimeout = function(self, t) | ||
self.timeout = t | ||
if self.unix_socket then | ||
unix.setsockopt(self.unix_socket, SOL_SOCKET, SO_RCVTIMEO, t / 1000) | ||
return unix.setsockopt(self.unix_socket, SOL_SOCKET, SO_SNDTIMEO, t / 1000) | ||
end | ||
end, | ||
getreusedtimes = function(self) | ||
return 0 | ||
end, | ||
setkeepalive = function(self) | ||
return error("You attempted to call setkeepalive on a Redbean socket. This method is only available for the ngx cosocket API for releasing a socket back into the connection pool") | ||
end | ||
} | ||
_base_0.__index = _base_0 | ||
_class_0 = setmetatable({ | ||
__init = function() end, | ||
__base = _base_0, | ||
__name = "RedbeanSocket" | ||
}, { | ||
__index = _base_0, | ||
__call = function(cls, ...) | ||
local _self_0 = setmetatable({}, _base_0) | ||
cls.__init(_self_0, ...) | ||
return _self_0 | ||
end | ||
}) | ||
_base_0.__class = _class_0 | ||
RedbeanSocket = _class_0 | ||
end | ||
return { | ||
RedbeanSocket = RedbeanSocket | ||
} | ||
-- https://github.com/pkulchenko/ZeroBranePackage/blob/21b1b58ad4df5a61f0c79020b58d180d5c693d98/redbean.lua#L94 | ||
|
||
import flatten from require "pgmoon.util" | ||
|
||
-- socket proxy class to make Redbean socket behave like ngx.socket.tcp | ||
class RedbeanSocket | ||
connect: (host, port, opts) => | ||
@unix_socket = assert unix.socket! unless @unix_socket | ||
|
||
@sock, err = unix.connect @unix_socket, assert(ResolveIp host), port | ||
|
||
unless @sock | ||
return nil, err\doc! | ||
|
||
if @timeout | ||
unix.setsockopt @unix_socket, SOL_SOCKET, SO_RCVTIMEO, @timeout / 1000 | ||
unix.setsockopt @unix_socket, SOL_SOCKET, SO_SNDTIMEO, @timeout / 1000 | ||
|
||
true | ||
|
||
-- args: [context][, timeout] | ||
starttls: (...) => | ||
error "Not supported at the moment." | ||
|
||
-- returns openssl.x509 object | ||
getpeercertificate: => | ||
error "Not supported at the moment." | ||
|
||
send: (...) => | ||
data = flatten ... | ||
|
||
CANWRITE = unix.POLLOUT | unix.POLLWRNORM | ||
events = assert unix.poll @unix_socket: unix.POLLOUT, @timeout | ||
return nil, "timeout" unless events[@unix_socket] | ||
return nil, "close" if events[@unix_socket] & CANWRITE == 0 | ||
size = 0 | ||
while size < #data | ||
sent, err = unix.send @unix_socket, string.sub data, size + 1 | ||
return nil, "timeout" if not sent and err\name! == "EAGAIN" | ||
return nil, err\doc! if not sent | ||
size += sent | ||
break if sent == 0 | ||
size | ||
|
||
receive: (pattern) => | ||
CANREAD = unix.POLLIN | unix.POLLRDNORM | unix.POLLRDBAND | ||
size = tonumber(pattern) | ||
buf = "" | ||
if size | ||
events = assert unix.poll @unix_socket: unix.POLLIN, @timeout | ||
return nil, "timeout" unless events[@unix_socket] | ||
return nil, "close" if events[@unix_socket] & CANREAD == 0 | ||
while #buf < size | ||
rec = assert unix.recv @unix_socket, size - #buf | ||
if #rec == 0 | ||
break | ||
else | ||
buf ..= rec | ||
buf | ||
|
||
close: => | ||
assert unix.close @unix_socket | ||
|
||
settimeout: (t) => | ||
@timeout = t | ||
if @unix_socket | ||
unix.setsockopt @unix_socket, SOL_SOCKET, SO_RCVTIMEO, t / 1000 | ||
unix.setsockopt @unix_socket, SOL_SOCKET, SO_SNDTIMEO, t / 1000 | ||
|
||
-- openresty pooling interface, always return 0 to suggest that the socket | ||
-- is connecting for the first time | ||
getreusedtimes: => 0 | ||
|
||
setkeepalive: => | ||
error "You attempted to call setkeepalive on a Redbean socket. This method is only available for the ngx cosocket API for releasing a socket back into the connection pool" | ||
|
||
{ :RedbeanSocket } |