Skip to content

Commit

Permalink
Websocket simple proxy protocol implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Aleksey Mikhaylov <aym@tempesta-tech.com>
  • Loading branch information
ttaym committed Mar 24, 2022
1 parent 01fee37 commit 168129c
Show file tree
Hide file tree
Showing 19 changed files with 388 additions and 81 deletions.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ Module.symvers
# vscode project settings
.vscode

# nodejs and npm
node_modules
package.json
package-lock.json

# Python compiler cache files
*.pyc

Expand Down
5 changes: 3 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,14 @@ DBG_CFG ?= 0
DBG_HTTP_PARSER ?= 0
DBG_SS ?= 0
DBG_TLS ?= 0
DBG_WS ?= 0
DBG_APM ?= 0
DBG_HTTP_FRAME ?= 0
DBG_HTTP_STREAM ?= 0
DBG_HPACK ?= 0
TFW_CFLAGS += -DDBG_CFG=$(DBG_CFG) -DDBG_HTTP_PARSER=$(DBG_HTTP_PARSER)
TFW_CFLAGS += -DDBG_SS=$(DBG_SS) -DDBG_TLS=$(DBG_TLS) -DDBG_APM=$(DBG_APM)
TFW_CFLAGS += -DDBG_HTTP_FRAME=$(DBG_HTTP_FRAME)
TFW_CFLAGS += -DDBG_SS=$(DBG_SS) -DDBG_TLS=$(DBG_TLS) -DDBG_WS=$(DBG_WS)
TFW_CFLAGS += -DDBG_APM=$(DBG_APM) -DDBG_HTTP_FRAME=$(DBG_HTTP_FRAME)
TFW_CFLAGS += -DDBG_HTTP_STREAM=$(DBG_HTTP_STREAM)
TFW_CFLAGS += -DDBG_HPACK=$(DBG_HPACK)

Expand Down
11 changes: 0 additions & 11 deletions fw/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,17 +124,6 @@ tfw_connection_send(TfwConn *conn, TfwMsg *msg)
return TFW_CONN_HOOK_CALL(conn, conn_send, msg);
}

int
tfw_connection_recv(void *cdata, struct sk_buff *skb)
{
TfwConn *conn = cdata;
TfwFsmData fsm_data = {
.skb = skb,
};

return tfw_http_msg_process(conn, &fsm_data);
}

void
tfw_connection_hooks_register(TfwConnHooks *hooks, int type)
{
Expand Down
5 changes: 3 additions & 2 deletions fw/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,11 @@ enum {
* @timer - The keep-alive/retry timer for the connection;
* @stream - instance for control messages processing;
* @peer - TfwClient or TfwServer handler. Hop-by-hop peer;
* @pair - Paired TfwCliConn or TfwSrvConn for websocket connections;
* @sk - an appropriate sock handler;
* @destructor - called when a connection is destroyed;
*/
typedef struct tfw_conn_t TfwConn;
#define TFW_CONN_COMMON \
SsProto proto; \
TfwGState state; \
Expand All @@ -105,6 +107,7 @@ enum {
struct timer_list timer; \
TfwStream stream; \
TfwPeer *peer; \
TfwConn *pair; \
struct sock *sk; \
void (*destructor)(void *);

Expand Down Expand Up @@ -527,6 +530,4 @@ int tfw_connection_close(TfwConn *conn, bool sync);
void tfw_connection_drop(TfwConn *conn);
void tfw_connection_release(TfwConn *conn);

int tfw_connection_recv(void *cdata, struct sk_buff *skb);

#endif /* __TFW_CONNECTION_H__ */
13 changes: 1 addition & 12 deletions fw/gfsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
* tfw_gfsm_move().
*
* Copyright (C) 2014 NatSys Lab. (info@natsys-lab.com).
* Copyright (C) 2015-2018 Tempesta Technologies, Inc.
* Copyright (C) 2015-2022 Tempesta Technologies, Inc.
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
Expand Down Expand Up @@ -193,17 +193,6 @@ __gfsm_fsm_exec(TfwGState *st, int fsm_id, TfwFsmData *data)
return r;
}

/**
* Dispatch connection data to proper FSM by application protocol type.
*/
int
tfw_gfsm_dispatch(TfwGState *st, void *obj, TfwFsmData *data)
{
int fsm_id = TFW_FSM_TYPE(((SsProto *)obj)->type);

return __gfsm_fsm_exec(st, fsm_id, data);
}

/**
* Move the FSM with descriptor @st to new the state @state and call all
* registered hooks for it.
Expand Down
1 change: 0 additions & 1 deletion fw/gfsm.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@ typedef struct tfw_conn_t TfwConn;
typedef int (*tfw_gfsm_handler_t)(TfwConn *conn, TfwFsmData *data);

void tfw_gfsm_state_init(TfwGState *st, void *obj, int st0);
int tfw_gfsm_dispatch(TfwGState *st, void *obj, TfwFsmData *data);
int tfw_gfsm_move(TfwGState *st, unsigned short state, TfwFsmData *data);

#ifdef DEBUG
Expand Down
103 changes: 63 additions & 40 deletions fw/http.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
#include "client.h"
#include "http_msg.h"
#include "http_sess.h"
#include "websocket.h"
#include "log.h"
#include "procfs.h"
#include "server.h"
Expand Down Expand Up @@ -2271,7 +2272,7 @@ tfw_http_resp_pair(TfwHttpMsg *hmresp)
* instance. Increment the number of users of the instance. Initialize
* GFSM for the message.
*/
static TfwMsg *
TfwMsg *
tfw_http_conn_msg_alloc(TfwConn *conn, TfwStream *stream)
{
int type = TFW_CONN_TYPE(conn);
Expand Down Expand Up @@ -2307,12 +2308,14 @@ tfw_http_conn_msg_alloc(TfwConn *conn, TfwStream *stream)

if (TFW_MSG_H2(hm->req)) {
size_t sz = TFW_HDR_MAP_SZ(TFW_HDR_MAP_INIT_CNT);
TfwHttpTransIter *mit = &((TfwHttpResp *)hm)->mit;
TfwHttpTransIter *mit =
&((TfwHttpResp *)hm)->mit;

mit->map = tfw_pool_alloc(hm->pool, sz);
if (unlikely(!mit->map)) {
T_WARN("HTTP/2: unable to allocate memory for"
" response header map\n");
T_WARN("HTTP/2: unable to allocate"
"memory for response header "
"map\n");
goto clean;
}
mit->map->size = TFW_HDR_MAP_INIT_CNT;
Expand Down Expand Up @@ -5948,6 +5951,12 @@ tfw_http_websocket_upgrade(TfwHttpResp *resp)

set_bit(TFW_CONN_B_UNSCHED, &((TfwSrvConn *)resp->conn)->flags);

resp->conn->proto.type |= TFW_FSM_WS;
resp->req->conn->proto.type |= TFW_FSM_WS;

resp->req->conn->pair = resp->conn;
resp->conn->pair = resp->req->conn;

tfw_sock_srv_connect_one(srv, srv_conn);

srv->sg->sched->upd_srv(srv);
Expand Down Expand Up @@ -6104,6 +6113,27 @@ tfw_http_resp_process(TfwConn *conn, TfwStream *stream, struct sk_buff *skb)
if (unlikely(r < TFW_PASS))
return TFW_BLOCK;

/*
* Upgrade client and server connection to websocket, remove it
* from scheduler and provision new connection.
*
* Need to do upgrade before sibling processing because after upgrade
* http semantics for pipelined responses no longer apply.
*/
if (unlikely(test_bit(TFW_HTTP_B_CONN_UPGRADE, hmresp->flags)
&& test_bit(TFW_HTTP_B_UPGRADE_WEBSOCKET, hmresp->flags)
&& resp->status == 101))
{
r = tfw_http_websocket_upgrade(resp);
if (unlikely(r < TFW_PASS))
return TFW_BLOCK;

if (skb) {
ss_skb_queue_tail(&resp->msg.skb_head, skb);
skb = NULL;
}
}

/*
* If @skb's data has not been processed in full, then
* we have pipelined responses. Create a sibling message.
Expand Down Expand Up @@ -6136,22 +6166,6 @@ tfw_http_resp_process(TfwConn *conn, TfwStream *stream, struct sk_buff *skb)
goto next_resp;
}

/*
* Upgrade client and server connection to websocket, remove it
* from scheduler and provision new connection.
*
* TODO #755: set existent client and server connection to Conn_Ws*
* when websocket proxing protocol will be implemented
*/
if (unlikely(test_bit(TFW_HTTP_B_CONN_UPGRADE, hmresp->flags)
&& test_bit(TFW_HTTP_B_UPGRADE_WEBSOCKET, hmresp->flags)
&& resp->status == 101))
{
r = tfw_http_websocket_upgrade(resp);
if (unlikely(r < TFW_PASS))
return TFW_BLOCK;
}

/*
* Pass the response to cache for further processing.
* In the end, the response is sent on to the client.
Expand Down Expand Up @@ -6203,28 +6217,27 @@ tfw_http_resp_process(TfwConn *conn, TfwStream *stream, struct sk_buff *skb)
*/
int
tfw_http_msg_process_generic(TfwConn *conn, TfwStream *stream,
TfwFsmData *data)
struct sk_buff *skb)
{
if (WARN_ON_ONCE(!stream))
return -EINVAL;
if (unlikely(!stream->msg)) {
stream->msg = tfw_http_conn_msg_alloc(conn, stream);
if (!stream->msg) {
__kfree_skb(data->skb);
__kfree_skb(skb);
return TFW_BLOCK;
}
tfw_http_mark_wl_new_msg(conn, (TfwHttpMsg *)stream->msg,
data->skb);
tfw_http_mark_wl_new_msg(conn, (TfwHttpMsg *)stream->msg, skb);
T_DBG2("Link new msg %p with connection %p\n",
stream->msg, conn);
}

T_DBG2("Add skb %p to message %p\n", data->skb, stream->msg);
ss_skb_queue_tail(&stream->msg->skb_head, data->skb);
T_DBG2("Add skb %p to message %p\n", skb, stream->msg);
ss_skb_queue_tail(&stream->msg->skb_head, skb);

return (TFW_CONN_TYPE(conn) & Conn_Clnt)
? tfw_http_req_process(conn, stream, data->skb)
: tfw_http_resp_process(conn, stream, data->skb);
? tfw_http_req_process(conn, stream, skb)
: tfw_http_resp_process(conn, stream, skb);
}

/**
Expand All @@ -6236,25 +6249,29 @@ tfw_http_msg_process_generic(TfwConn *conn, TfwStream *stream,
* returned an error code on. The rest of skbs are freed by us.
*/
int
tfw_http_msg_process(TfwConn *conn, TfwFsmData *data)
tfw_http_msg_process(TfwConn *conn, struct sk_buff *skb)
{
int r = T_OK;
TfwStream *stream = &((TfwConn *)conn)->stream;
struct sk_buff *next;

if (data->skb->prev)
data->skb->prev->next = NULL;
for (next = data->skb->next; data->skb;
data->skb = next, next = next ? next->next : NULL)
if (skb->prev)
skb->prev->next = NULL;
for (next = skb->next; skb;
skb = next, next = next ? next->next : NULL)
{
if (likely(r == T_OK || r == T_POSTPONE)) {
data->skb->next = data->skb->prev = NULL;
r = TFW_CONN_H2(conn)
? tfw_h2_frame_process(conn, data->skb)
: tfw_http_msg_process_generic(conn, stream,
data);
skb->next = skb->prev = NULL;
if (unlikely(TFW_CONN_PROTO(conn) & TFW_FSM_WS))
r = tfw_ws_msg_process(conn, skb);
else
r = TFW_CONN_H2(conn)
? tfw_h2_frame_process(conn, skb)
: tfw_http_msg_process_generic(conn,
stream,
skb);
} else {
__kfree_skb(data->skb);
__kfree_skb(skb);
}
}

Expand Down Expand Up @@ -7002,6 +7019,12 @@ TfwMod tfw_http_mod = {
.specs = tfw_http_specs,
};

static int
tfw_http_msg_process_fsm(TfwConn *conn, TfwFsmData *data)
{
return tfw_http_msg_process(conn, data->skb);
}

/*
* ------------------------------------------------------------------------
* init/exit
Expand All @@ -7013,7 +7036,7 @@ tfw_http_init(void)
{
int r;

if ((r = tfw_gfsm_register_fsm(TFW_FSM_HTTP, tfw_http_msg_process)))
if ((r = tfw_gfsm_register_fsm(TFW_FSM_HTTP, tfw_http_msg_process_fsm)))
return r;

tfw_connection_hooks_register(&http_conn_hooks, TFW_FSM_HTTP);
Expand Down
5 changes: 3 additions & 2 deletions fw/http.h
Original file line number Diff line number Diff line change
Expand Up @@ -674,9 +674,9 @@ tfw_h2_pseudo_index(unsigned short status)
typedef void (*tfw_http_cache_cb_t)(TfwHttpMsg *);

/* External HTTP functions. */
int tfw_http_msg_process(TfwConn *conn, TfwFsmData *data);
int tfw_http_msg_process(TfwConn *conn, struct sk_buff *skb);
int tfw_http_msg_process_generic(TfwConn *conn, TfwStream *stream,
TfwFsmData *data);
struct sk_buff *skb);
unsigned long tfw_http_req_key_calc(TfwHttpReq *req);
void tfw_http_req_destruct(void *msg);
void tfw_http_resp_fwd(TfwHttpResp *resp);
Expand Down Expand Up @@ -705,6 +705,7 @@ int tfw_h1_prep_redirect(TfwHttpResp *resp, unsigned short status,
TfwStr *rmark, TfwStr *cookie, TfwStr *body);
int tfw_http_prep_304(TfwHttpReq *req, struct sk_buff **skb_head,
TfwMsgIter *it);
TfwMsg *tfw_http_conn_msg_alloc(TfwConn *conn, TfwStream *stream);
void tfw_http_conn_msg_free(TfwHttpMsg *hm);
void tfw_http_send_resp(TfwHttpReq *req, int status, const char *reason);

Expand Down
2 changes: 1 addition & 1 deletion fw/http_frame.c
Original file line number Diff line number Diff line change
Expand Up @@ -1803,7 +1803,7 @@ tfw_h2_frame_process(TfwConn *c, struct sk_buff *skb)
}
h2->data_off = 0;
h2->skb_head = data_up.skb->next = data_up.skb->prev = NULL;
r = tfw_http_msg_process_generic(c, h2->cur_stream, &data_up);
r = tfw_http_msg_process_generic(c, h2->cur_stream, data_up.skb);
if (r == T_DROP) {
kfree_skb(nskb);
goto out;
Expand Down
3 changes: 2 additions & 1 deletion fw/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Tempesta FW
*
* Copyright (C) 2014 NatSys Lab. (info@natsys-lab.com).
* Copyright (C) 2015-2021 Tempesta Technologies, Inc.
* Copyright (C) 2015-2022 Tempesta Technologies, Inc.
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
Expand Down Expand Up @@ -470,6 +470,7 @@ tfw_init(void)
DO_INIT(filter);
DO_INIT(cache);
DO_INIT(http_sess);
DO_INIT(websocket);

DO_INIT(sync_socket);
DO_INIT(server);
Expand Down
2 changes: 1 addition & 1 deletion fw/sock_clnt.c
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ tfw_sock_clnt_drop(struct sock *sk)
static const SsHooks tfw_sock_http_clnt_ss_hooks = {
.connection_new = tfw_sock_clnt_new,
.connection_drop = tfw_sock_clnt_drop,
.connection_recv = tfw_connection_recv,
.connection_recv = tfw_http_msg_process,
};

static const SsHooks tfw_sock_tls_clnt_ss_hooks = {
Expand Down
2 changes: 1 addition & 1 deletion fw/sock_srv.c
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ tfw_sock_srv_connect_drop(struct sock *sk)
static const SsHooks tfw_sock_srv_ss_hooks = {
.connection_new = tfw_sock_srv_connect_complete,
.connection_drop = tfw_sock_srv_connect_drop,
.connection_recv = tfw_connection_recv,
.connection_recv = tfw_http_msg_process,
};

/**
Expand Down
6 changes: 4 additions & 2 deletions fw/sync_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Synchronous Socket API.
*
* Copyright (C) 2014 NatSys Lab. (info@natsys-lab.com).
* Copyright (C) 2015-2017 Tempesta Technologies, Inc.
* Copyright (C) 2015-2022 Tempesta Technologies, Inc.
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
Expand Down Expand Up @@ -53,6 +53,8 @@ enum {
Conn_Stop = 0x1 << __Flag_Bits,
};

typedef struct tfw_conn_t TfwConn;

/* Table of Synchronous Sockets connection callbacks. */
typedef struct ss_hooks {
/* New connection accepted. */
Expand All @@ -71,7 +73,7 @@ typedef struct ss_hooks {
void (*connection_drop)(struct sock *sk);

/* Process data received on the socket. */
int (*connection_recv)(void *conn, struct sk_buff *skb);
int (*connection_recv)(TfwConn *conn, struct sk_buff *skb);
} SsHooks;

/**
Expand Down
Loading

0 comments on commit 168129c

Please sign in to comment.