Skip to content

Commit

Permalink
feature: enabled the 'tcpsock:receiveany()' API for upstream TCP coso…
Browse files Browse the repository at this point in the history
…ckets.

Ported from ngx_meta_lua #3018ec8 and ngx_http_lua #b5ffb11.
  • Loading branch information
thibaultcha committed Aug 26, 2019
1 parent 9355e7a commit 55c5e81
Show file tree
Hide file tree
Showing 4 changed files with 534 additions and 7 deletions.
99 changes: 95 additions & 4 deletions src/ngx_stream_lua_socket_tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ static int ngx_stream_lua_socket_tcp_connect(lua_State *L);
static int ngx_stream_lua_socket_tcp_sslhandshake(lua_State *L);
#endif
static int ngx_stream_lua_socket_tcp_receive(lua_State *L);
static int ngx_stream_lua_socket_tcp_receiveany(lua_State *L);
static int ngx_stream_lua_socket_tcp_send(lua_State *L);
static int ngx_stream_lua_socket_tcp_close(lua_State *L);
static int ngx_stream_lua_socket_tcp_setoption(lua_State *L);
Expand Down Expand Up @@ -107,6 +108,7 @@ static int ngx_stream_lua_socket_write_error_retval_handler(
static ngx_int_t ngx_stream_lua_socket_read_all(void *data, ssize_t bytes);
static ngx_int_t ngx_stream_lua_socket_read_until(void *data, ssize_t bytes);
static ngx_int_t ngx_stream_lua_socket_read_chunk(void *data, ssize_t bytes);
static ngx_int_t ngx_stream_lua_socket_read_any(void *data, ssize_t bytes);
static int ngx_stream_lua_socket_tcp_receiveuntil(lua_State *L);
static int ngx_stream_lua_socket_receiveuntil_iterator(lua_State *L);
static ngx_int_t ngx_stream_lua_socket_compile_pattern(u_char *data, size_t len,
Expand Down Expand Up @@ -316,7 +318,7 @@ ngx_stream_lua_inject_socket_tcp_api(ngx_log_t *log, lua_State *L)
/* {{{tcp object metatable */
lua_pushlightuserdata(L, ngx_stream_lua_lightudata_mask(
tcp_socket_metatable_key));
lua_createtable(L, 0 /* narr */, 13 /* nrec */);
lua_createtable(L, 0 /* narr */, 14 /* nrec */);

lua_pushcfunction(L, ngx_stream_lua_socket_tcp_connect);
lua_setfield(L, -2, "connect");
Expand All @@ -334,6 +336,9 @@ ngx_stream_lua_inject_socket_tcp_api(ngx_log_t *log, lua_State *L)
lua_pushcfunction(L, ngx_stream_lua_socket_tcp_receiveuntil);
lua_setfield(L, -2, "receiveuntil");

lua_pushcfunction(L, ngx_stream_lua_socket_tcp_receiveany);
lua_setfield(L, -2, "receiveany");

lua_pushcfunction(L, ngx_stream_lua_socket_tcp_send);
lua_setfield(L, -2, "send");

Expand All @@ -358,7 +363,6 @@ ngx_stream_lua_inject_socket_tcp_api(ngx_log_t *log, lua_State *L)
lua_pushcfunction(L, ngx_stream_lua_socket_tcp_shutdown);
lua_setfield(L, -2, "shutdown");


lua_pushvalue(L, -1);
lua_setfield(L, -2, "__index");
lua_rawset(L, LUA_REGISTRYINDEX);
Expand Down Expand Up @@ -2366,6 +2370,75 @@ ngx_stream_lua_socket_tcp_receive_helper(ngx_stream_lua_request_t *r,
}


static int
ngx_stream_lua_socket_tcp_receiveany(lua_State *L)
{
int n;
lua_Integer bytes;
ngx_stream_lua_request_t *r;
ngx_stream_lua_loc_conf_t *llcf;

ngx_stream_lua_socket_tcp_upstream_t *u;

n = lua_gettop(L);
if (n != 2) {
return luaL_error(L, "expecting 2 arguments "
"(including the object), but got %d", n);
}

r = ngx_stream_lua_get_req(L);
if (r == NULL) {
return luaL_error(L, "no request found");
}

luaL_checktype(L, 1, LUA_TTABLE);

lua_rawgeti(L, 1, SOCKET_CTX_INDEX);
u = lua_touserdata(L, -1);

if (u == NULL || u->peer.connection == NULL || u->read_closed) {

llcf = ngx_stream_lua_get_module_loc_conf(r, ngx_stream_lua_module);

if (llcf->log_socket_errors) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"attempt to receive data on a closed "
"socket: u:%p, c:%p, ft:%d eof:%d",
u, u ? u->peer.connection : NULL,
u ? (int) u->ft_type : 0, u ? (int) u->eof : 0);
}

lua_pushnil(L);
lua_pushliteral(L, "closed");
return 2;
}

if (u->request != r) {
return luaL_error(L, "bad request");
}

ngx_stream_lua_socket_check_busy_connecting(r, u, L);
ngx_stream_lua_socket_check_busy_reading(r, u, L);

if (!lua_isnumber(L, 2)) {
return luaL_argerror(L, 2, "bad max argument");
}

bytes = lua_tointeger(L, 2);
if (bytes <= 0) {
return luaL_argerror(L, 2, "bad max argument");
}

u->input_filter = ngx_stream_lua_socket_read_any;
u->rest = (size_t) bytes;
u->length = u->rest;

ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
"stream lua tcp socket calling receiveany() "
"method to read at most %uz bytes", u->rest);

return ngx_stream_lua_socket_tcp_receive_helper(r, u, L);
}


static int
Expand Down Expand Up @@ -2406,8 +2479,8 @@ ngx_stream_lua_socket_tcp_receive(lua_State *L)

if (llcf->log_socket_errors) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"attempt to receive data on a closed socket: u:%p, "
"c:%p, ft:%d eof:%d",
"stream attempt to receive data on a closed "
"socket: u:%p, c:%p, ft:%d eof:%d",
u, u ? u->peer.connection : NULL,
u ? (int) u->ft_type : 0, u ? (int) u->eof : 0);
}
Expand Down Expand Up @@ -2550,6 +2623,24 @@ ngx_stream_lua_socket_read_line(void *data, ssize_t bytes)
}


static ngx_int_t
ngx_stream_lua_socket_read_any(void *data, ssize_t bytes)
{
ngx_int_t rc;
ngx_stream_lua_socket_tcp_upstream_t *u = data;

ngx_log_debug0(NGX_LOG_DEBUG_STREAM, u->request->connection->log, 0,
"stream lua tcp socket read any");

rc = ngx_stream_lua_read_any(&u->buffer, u->buf_in, &u->rest, bytes,
u->request->connection->log);
if (rc == NGX_ERROR) {
u->ft_type |= NGX_STREAM_LUA_SOCKET_FT_CLOSED;
return NGX_ERROR;
}

return rc;
}


static ngx_int_t
Expand Down
Loading

0 comments on commit 55c5e81

Please sign in to comment.