From 2c6741734e622429b8e080ff9459f46e3eae2293 Mon Sep 17 00:00:00 2001 From: Thibault Charbonnier Date: Mon, 26 Aug 2019 14:45:22 -0700 Subject: [PATCH] feature: enabled the 'tcpsock:receiveany()' API for upstream TCP cosockets. Ported from ngx_meta_lua #3018ec8 and ngx_http_lua #b5ffb11. --- src/ngx_stream_lua_socket_tcp.c | 99 ++++++++++- t/058-tcp-socket.t | 290 +++++++++++++++++++++++++++++++- t/062-count.t | 21 ++- t/156-slow-network.t | 131 +++++++++++++++ 4 files changed, 534 insertions(+), 7 deletions(-) create mode 100644 t/156-slow-network.t diff --git a/src/ngx_stream_lua_socket_tcp.c b/src/ngx_stream_lua_socket_tcp.c index 8b1c64f7..951243bd 100644 --- a/src/ngx_stream_lua_socket_tcp.c +++ b/src/ngx_stream_lua_socket_tcp.c @@ -33,6 +33,7 @@ static int ngx_stream_lua_socket_tcp_connect(lua_State *L); static int ngx_stream_lua_socket_tcp_sslhandshake(lua_State *L); #endif static int ngx_stream_lua_socket_tcp_receive(lua_State *L); +static int ngx_stream_lua_socket_tcp_receiveany(lua_State *L); static int ngx_stream_lua_socket_tcp_send(lua_State *L); static int ngx_stream_lua_socket_tcp_close(lua_State *L); static int ngx_stream_lua_socket_tcp_setoption(lua_State *L); @@ -107,6 +108,7 @@ static int ngx_stream_lua_socket_write_error_retval_handler( static ngx_int_t ngx_stream_lua_socket_read_all(void *data, ssize_t bytes); static ngx_int_t ngx_stream_lua_socket_read_until(void *data, ssize_t bytes); static ngx_int_t ngx_stream_lua_socket_read_chunk(void *data, ssize_t bytes); +static ngx_int_t ngx_stream_lua_socket_read_any(void *data, ssize_t bytes); static int ngx_stream_lua_socket_tcp_receiveuntil(lua_State *L); static int ngx_stream_lua_socket_receiveuntil_iterator(lua_State *L); static ngx_int_t ngx_stream_lua_socket_compile_pattern(u_char *data, size_t len, @@ -316,7 +318,7 @@ ngx_stream_lua_inject_socket_tcp_api(ngx_log_t *log, lua_State *L) /* {{{tcp object metatable */ lua_pushlightuserdata(L, ngx_stream_lua_lightudata_mask( tcp_socket_metatable_key)); - lua_createtable(L, 0 /* narr */, 13 /* nrec */); + lua_createtable(L, 0 /* narr */, 14 /* nrec */); lua_pushcfunction(L, ngx_stream_lua_socket_tcp_connect); lua_setfield(L, -2, "connect"); @@ -334,6 +336,9 @@ ngx_stream_lua_inject_socket_tcp_api(ngx_log_t *log, lua_State *L) lua_pushcfunction(L, ngx_stream_lua_socket_tcp_receiveuntil); lua_setfield(L, -2, "receiveuntil"); + lua_pushcfunction(L, ngx_stream_lua_socket_tcp_receiveany); + lua_setfield(L, -2, "receiveany"); + lua_pushcfunction(L, ngx_stream_lua_socket_tcp_send); lua_setfield(L, -2, "send"); @@ -358,7 +363,6 @@ ngx_stream_lua_inject_socket_tcp_api(ngx_log_t *log, lua_State *L) lua_pushcfunction(L, ngx_stream_lua_socket_tcp_shutdown); lua_setfield(L, -2, "shutdown"); - lua_pushvalue(L, -1); lua_setfield(L, -2, "__index"); lua_rawset(L, LUA_REGISTRYINDEX); @@ -2366,6 +2370,75 @@ ngx_stream_lua_socket_tcp_receive_helper(ngx_stream_lua_request_t *r, } +static int +ngx_stream_lua_socket_tcp_receiveany(lua_State *L) +{ + int n; + lua_Integer bytes; + ngx_stream_lua_request_t *r; + ngx_stream_lua_loc_conf_t *llcf; + + ngx_stream_lua_socket_tcp_upstream_t *u; + + n = lua_gettop(L); + if (n != 2) { + return luaL_error(L, "expecting 2 arguments " + "(including the object), but got %d", n); + } + + r = ngx_stream_lua_get_req(L); + if (r == NULL) { + return luaL_error(L, "no request found"); + } + + luaL_checktype(L, 1, LUA_TTABLE); + + lua_rawgeti(L, 1, SOCKET_CTX_INDEX); + u = lua_touserdata(L, -1); + + if (u == NULL || u->peer.connection == NULL || u->read_closed) { + + llcf = ngx_stream_lua_get_module_loc_conf(r, ngx_stream_lua_module); + + if (llcf->log_socket_errors) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "attempt to receive data on a closed " + "socket: u:%p, c:%p, ft:%d eof:%d", + u, u ? u->peer.connection : NULL, + u ? (int) u->ft_type : 0, u ? (int) u->eof : 0); + } + + lua_pushnil(L); + lua_pushliteral(L, "closed"); + return 2; + } + + if (u->request != r) { + return luaL_error(L, "bad request"); + } + + ngx_stream_lua_socket_check_busy_connecting(r, u, L); + ngx_stream_lua_socket_check_busy_reading(r, u, L); + + if (!lua_isnumber(L, 2)) { + return luaL_argerror(L, 2, "bad max argument"); + } + + bytes = lua_tointeger(L, 2); + if (bytes <= 0) { + return luaL_argerror(L, 2, "bad max argument"); + } + + u->input_filter = ngx_stream_lua_socket_read_any; + u->rest = (size_t) bytes; + u->length = u->rest; + + ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0, + "stream lua tcp socket calling receiveany() " + "method to read at most %uz bytes", u->rest); + + return ngx_stream_lua_socket_tcp_receive_helper(r, u, L); +} static int @@ -2406,8 +2479,8 @@ ngx_stream_lua_socket_tcp_receive(lua_State *L) if (llcf->log_socket_errors) { ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, - "attempt to receive data on a closed socket: u:%p, " - "c:%p, ft:%d eof:%d", + "stream attempt to receive data on a closed " + "socket: u:%p, c:%p, ft:%d eof:%d", u, u ? u->peer.connection : NULL, u ? (int) u->ft_type : 0, u ? (int) u->eof : 0); } @@ -2550,6 +2623,24 @@ ngx_stream_lua_socket_read_line(void *data, ssize_t bytes) } +static ngx_int_t +ngx_stream_lua_socket_read_any(void *data, ssize_t bytes) +{ + ngx_int_t rc; + ngx_stream_lua_socket_tcp_upstream_t *u = data; + + ngx_log_debug0(NGX_LOG_DEBUG_STREAM, u->request->connection->log, 0, + "stream lua tcp socket read any"); + + rc = ngx_stream_lua_read_any(&u->buffer, u->buf_in, &u->rest, bytes, + u->request->connection->log); + if (rc == NGX_ERROR) { + u->ft_type |= NGX_STREAM_LUA_SOCKET_FT_CLOSED; + return NGX_ERROR; + } + + return rc; +} static ngx_int_t diff --git a/t/058-tcp-socket.t b/t/058-tcp-socket.t index 7462cc1d..94bb7d08 100644 --- a/t/058-tcp-socket.t +++ b/t/058-tcp-socket.t @@ -4,7 +4,7 @@ use Test::Nginx::Socket::Lua::Stream; repeat_each(2); -plan tests => repeat_each() * 199; +plan tests => repeat_each() * 219; our $HtmlDir = html_dir; @@ -3238,3 +3238,291 @@ connected: 1 --- error_log cleanup lua tcp socket request GC cycle done + + + +=== TEST 62: receiveany method in cosocket +--- config + location = /foo { + server_tokens off; + + content_by_lua_block { + local resp = { + '1', + '22', + 'hello world', + } + + local length = 0 + for _, v in ipairs(resp) do + length = length + #v + end + + -- flush http header + ngx.header['Content-Length'] = length + ngx.flush(true) + ngx.sleep(0.01) + + -- send http body + for _, v in ipairs(resp) do + ngx.print(v) + ngx.flush(true) + ngx.sleep(0.01) + end + } + } +--- stream_server_config + content_by_lua_block { + local sock = ngx.socket.tcp() + sock:settimeout(500) + + assert(sock:connect("127.0.0.1", $TEST_NGINX_SERVER_PORT)) + local req = { + 'GET /foo HTTP/1.0\r\n', + 'Host: localhost\r\n', + 'Connection: close\r\n\r\n', + } + local ok, err = sock:send(req) + if not ok then + ngx.say("send request failed: ", err) + return + end + + -- skip http header + while true do + local data, err, _ = sock:receive('*l') + if err then + ngx.say('unexpected error occurs when receiving http head: ', err) + return + end + + if #data == 0 then -- read last line of head + break + end + end + + -- receive http body + while true do + local data, err = sock:receiveany(1024) + if err then + if err ~= 'closed' then + ngx.say('unexpected err: ', err) + end + break + end + ngx.say(data) + end + + sock:close() + } +--- stream_response +1 +22 +hello world +--- no_error_log +[error] +--- error_log +lua tcp socket read any + + + +=== TEST 63: receiveany send data after read side closed +--- stream_server_config + content_by_lua_block { + local sock = ngx.socket.tcp() + sock:settimeout(500) + assert(sock:connect("127.0.0.1", 7658)) + + while true do + local data, err = sock:receiveany(1024) + if err then + if err ~= 'closed' then + ngx.say('unexpected err: ', err) + break + end + + local data = "send data after read side closed" + local bytes, err = sock:send(data) + if not bytes then + ngx.say(err) + end + + break + end + ngx.say(data) + end + + sock:close() + } +--- tcp_listen: 7658 +--- tcp_shutdown: 1 +--- tcp_query eval: "send data after read side closed" +--- tcp_query_len: 32 +--- stream_response +--- no_error_log +[error] + + + +=== TEST 64: receiveany with limited, max <= 0 +--- stream_server_config + content_by_lua_block { + local sock = ngx.socket.tcp() + sock:settimeout(500) + assert(sock:connect("127.0.0.1", $TEST_NGINX_SERVER_PORT)) + + local function receiveany_say_err(...) + local ok, err = pcall(sock.receiveany, sock, ...) + if not ok then + ngx.say(err) + end + end + + + receiveany_say_err(0) + receiveany_say_err(-1) + receiveany_say_err() + receiveany_say_err(nil) + } +--- stream_response +bad argument #2 to '?' (bad max argument) +bad argument #2 to '?' (bad max argument) +expecting 2 arguments (including the object), but got 1 +bad argument #2 to '?' (bad max argument) +--- no_error_log +[error] + + + +=== TEST 65: receiveany with limited, max is larger than data +--- config + location = /foo { + server_tokens off; + + content_by_lua_block { + local resp = 'hello world' + local length = #resp + + ngx.header['Content-Length'] = length + ngx.flush(true) + ngx.sleep(0.01) + + ngx.print(resp) + } + } +--- stream_server_config + content_by_lua_block { + local sock = ngx.socket.tcp() + sock:settimeout(500) + + assert(sock:connect("127.0.0.1", $TEST_NGINX_SERVER_PORT)) + local req = { + 'GET /foo HTTP/1.0\r\n', + 'Host: localhost\r\n', + 'Connection: close\r\n\r\n', + } + local ok, err = sock:send(req) + if not ok then + ngx.say("send request failed: ", err) + return + end + + while true do + local data, err, _ = sock:receive('*l') + if err then + ngx.say('unexpected error occurs when receiving http head: ', err) + return + end + + if #data == 0 then -- read last line of head + break + end + end + + local data, err = sock:receiveany(128) + if err then + if err ~= 'closed' then + ngx.say('unexpected err: ', err) + end + else + ngx.say(data) + end + + sock:close() + } +--- stream_response +hello world +--- no_error_log +[error] +--- error_log +lua tcp socket calling receiveany() method to read at most 128 bytes + + + +=== TEST 66: receiveany with limited, max is smaller than data +--- config + location = /foo { + server_tokens off; + + content_by_lua_block { + local resp = 'hello world' + local length = #resp + + ngx.header['Content-Length'] = length + ngx.flush(true) + ngx.sleep(0.01) + + ngx.print(resp) + } + } +--- stream_server_config + content_by_lua_block { + local sock = ngx.socket.tcp() + sock:settimeout(500) + + assert(sock:connect("127.0.0.1", $TEST_NGINX_SERVER_PORT)) + local req = { + 'GET /foo HTTP/1.0\r\n', + 'Host: localhost\r\n', + 'Connection: close\r\n\r\n', + } + local ok, err = sock:send(req) + if not ok then + ngx.say("send request failed: ", err) + return + end + + while true do + local data, err, _ = sock:receive('*l') + if err then + ngx.say('unexpected error occurs when receiving http head: ', err) + return + end + + if #data == 0 then -- read last line of head + break + end + end + + while true do + local data, err = sock:receiveany(7) + if err then + if err ~= 'closed' then + ngx.say('unexpected err: ', err) + end + break + + else + ngx.say(data) + end + end + + sock:close() + } +--- stream_response +hello w +orld +--- no_error_log +[error] +--- error_log +lua tcp socket calling receiveany() method to read at most 7 bytes diff --git a/t/062-count.t b/t/062-count.t index 288c28e6..cad446ea 100644 --- a/t/062-count.t +++ b/t/062-count.t @@ -238,7 +238,24 @@ worker: 4 -=== TEST 13: entries under the metatable of udp sockets +=== TEST 13: entries under the metatable of tcp sockets +--- stream_server_config + content_by_lua_block { + local n = 0 + local sock = ngx.socket.tcp() + for k, v in pairs(getmetatable(sock)) do + n = n + 1 + end + ngx.say("n = ", n) + } +--- stream_response +n = 14 +--- no_error_log +[error] + + + +=== TEST 14: entries under the metatable of udp sockets --- stream_server_config content_by_lua_block { local n = 0 @@ -255,7 +272,7 @@ n = 6 -=== TEST 14: entries under the metatable of req raw sockets +=== TEST 15: entries under the metatable of req raw sockets --- stream_server_config content_by_lua_block { local n = 0 diff --git a/t/156-slow-network.t b/t/156-slow-network.t new file mode 100644 index 00000000..c153d491 --- /dev/null +++ b/t/156-slow-network.t @@ -0,0 +1,131 @@ +BEGIN { + if (!defined $ENV{LD_PRELOAD}) { + $ENV{LD_PRELOAD} = ''; + } + + if ($ENV{LD_PRELOAD} !~ /\bmockeagain\.so\b/) { + $ENV{LD_PRELOAD} = "mockeagain.so $ENV{LD_PRELOAD}"; + } + + if ($ENV{MOCKEAGAIN} eq 'r') { + $ENV{MOCKEAGAIN} = 'rw'; + + } else { + $ENV{MOCKEAGAIN} = 'w'; + } + + $ENV{TEST_NGINX_EVENT_TYPE} = 'poll'; +} + +use Test::Nginx::Socket::Lua::Stream; + +repeat_each(2); + +plan tests => repeat_each() * (blocks() * 4); + +add_block_preprocessor(sub { + my $block = shift; + + if (!defined $block->error_log) { + $block->set_value("no_error_log", "[error]"); + } +}); + + +log_level("debug"); +no_long_string(); +#no_diff(); +run_tests(); + +__DATA__ + +=== TEST 1: receiveany returns anything once socket receives +--- config + location = /foo { + server_tokens off; + + content_by_lua_block { + local resp = { + '1', + 'hello', + } + + local length = 0 + for _, v in ipairs(resp) do + length = length + #v + end + + -- flush http header + ngx.header['Content-Length'] = length + ngx.flush(true) + ngx.sleep(0.01) + + -- send http body bytes by bytes + for _, v in ipairs(resp) do + ngx.print(v) + ngx.flush(true) + ngx.sleep(0.01) + end + } + } +--- stream_server_config + content_by_lua_block { + local sock = ngx.socket.tcp() + sock:settimeout(500) + + assert(sock:connect("127.0.0.1", $TEST_NGINX_SERVER_PORT)) + local req = { + 'GET /foo HTTP/1.0\r\n', + 'Host: localhost\r\n', + 'Connection: close\r\n\r\n', + } + local ok, err = sock:send(req) + if not ok then + ngx.say("send request failed: ", err) + return + end + + + -- skip http header + while true do + local data, err, _ = sock:receive('*l') + if err then + ngx.say('unexpected error occurs when receiving http head: ' .. err) + return + end + if #data == 0 then -- read last line of head + break + end + end + + -- receive http body + while true do + local data, err = sock:receiveany(1024) + if err then + if err ~= 'closed' then + ngx.say('unexpected err: ', err) + end + break + end + ngx.say(data) + end + + sock:close() + } +--- stream_response +1 +h +e +l +l +o +--- grep_error_log eval +qr/lua tcp socket read any/ +--- grep_error_log_out +lua tcp socket read any +lua tcp socket read any +lua tcp socket read any +lua tcp socket read any +lua tcp socket read any +lua tcp socket read any +lua tcp socket read any