Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade connection to websocket and provision new connection #1594

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions fw/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ enum {
/* HTTPS */
Conn_HttpsClnt = Conn_Clnt | TFW_FSM_HTTPS,
Conn_HttpsSrv = Conn_Srv | TFW_FSM_HTTPS,

/* Websocket plain */
Conn_WsClnt = Conn_HttpClnt | TFW_FSM_WS,
Conn_WsSrv = Conn_HttpSrv | TFW_FSM_WS,

/* Websocket secure */
Conn_WssClnt = Conn_HttpsClnt | TFW_FSM_WS,
Conn_WssSrv = Conn_HttpsSrv | TFW_FSM_WS,
};

#define TFW_CONN_TYPE2IDX(t) TFW_FSM_TYPE(t)
Expand Down Expand Up @@ -100,7 +108,7 @@ enum {
struct sock *sk; \
void (*destructor)(void *);

typedef struct TfwConn {
typedef struct tfw_conn_t {
TFW_CONN_COMMON;
} TfwConn;

Expand Down Expand Up @@ -199,7 +207,9 @@ enum {
/* Connection is in use or at least scheduled to be established. */
TFW_CONN_B_ACTIVE,
/* Connection is disconnected and stopped. */
TFW_CONN_B_STOPPED
TFW_CONN_B_STOPPED,
/* Mark connection as unavailable to schedulers */
TFW_CONN_B_UNSCHED
};

/**
Expand Down Expand Up @@ -297,6 +307,15 @@ tfw_srv_conn_restricted(TfwSrvConn *srv_conn)
return test_bit(TFW_CONN_B_RESEND, &srv_conn->flags);
}

/*
* Connection is unavailable to scheduler and may be removed from it
*/
static inline bool
tfw_srv_conn_unscheduled(TfwSrvConn *srv_conn)
{
return test_bit(TFW_CONN_B_UNSCHED, &srv_conn->flags);
}

/*
* Tell if a connection has non-idempotent requests.
*/
Expand Down
4 changes: 3 additions & 1 deletion fw/gfsm.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ enum {
/* Protocols */
TFW_FSM_HTTP,
TFW_FSM_HTTPS,
/* Not really a FSM */
TFW_FSM_WS,

/* Security rules enforcement. */
TFW_FSM_FRANG_REQ,
Expand Down Expand Up @@ -181,7 +183,7 @@ typedef struct {
& ((TFW_GFSM_FSM_MASK << TFW_GFSM_FSM_SHIFT) \
| TFW_GFSM_STATE_MASK))

typedef struct TfwConn TfwConn;
typedef struct tfw_conn_t TfwConn;
typedef int (*tfw_gfsm_handler_t)(TfwConn *conn, TfwFsmData *data);

void tfw_gfsm_state_init(TfwGState *st, void *obj, int st0);
Expand Down
41 changes: 40 additions & 1 deletion fw/http.c
Original file line number Diff line number Diff line change
Expand Up @@ -2339,7 +2339,6 @@ static int
tfw_http_conn_init(TfwConn *conn)
{
T_DBG2("%s: conn=[%p]\n", __func__, conn);

if (TFW_CONN_TYPE(conn) & Conn_Srv) {
TfwSrvConn *srv_conn = (TfwSrvConn *)conn;
if (!list_empty(&srv_conn->fwd_queue)) {
Expand Down Expand Up @@ -5931,6 +5930,7 @@ tfw_http_resp_process(TfwConn *conn, TfwStream *stream, struct sk_buff *skb)
unsigned int chunks_unused, parsed;
TfwHttpReq *bad_req;
TfwHttpMsg *hmresp, *hmsib;
TfwHttpResp *resp;
TfwFsmData data_up;
bool conn_stop, filtout = false;

Expand All @@ -5953,6 +5953,7 @@ tfw_http_resp_process(TfwConn *conn, TfwStream *stream, struct sk_buff *skb)
parsed = 0;
hmsib = NULL;
hmresp = (TfwHttpMsg *)stream->msg;
resp = (TfwHttpResp *)hmresp;

r = ss_skb_process(skb, tfw_http_parse_resp, hmresp, &chunks_unused,
&parsed);
Expand Down Expand Up @@ -6098,6 +6099,44 @@ tfw_http_resp_process(TfwConn *conn, TfwStream *stream, struct sk_buff *skb)
r = TFW_PASS;
goto next_resp;
}

/*
* Upgrade client and server connection to websocket, remove it
* from scheduler and provision new connection.
*
* TODO #755: set existent client and server connection to Conn_Ws*
* when websocket proxing protocol will be implemented
*/
if (unlikely(test_bit(TFW_HTTP_B_CONN_UPGRADE, hmresp->flags)
&& test_bit(TFW_HTTP_B_UPGRADE_WEBSOCKET, hmresp->flags)
&& resp->status == 101))
{
TfwServer *srv = (TfwServer *)resp->conn->peer;
TfwSrvConn *srv_conn;

/* Cannot proceed with upgrade websocket due to error
* in creation of new http connection. While it will not be
* inherently erroneous to upgrade existing connection, but
* we would pay for it with essentially dropping connection with
* server. Better just drop upgrade request and
* reestablish connection.
*/
if (!(srv_conn = tfw_sock_srv_new_conn(srv))) {
tfw_http_conn_error_log(conn, "Can't create new "
"connection for websocket"
" upgrade response");
return TFW_BLOCK;
}

set_bit(TFW_CONN_B_UNSCHED,
&((TfwSrvConn *)hmresp->conn)->flags);

tfw_sock_srv_conn_activate(srv, srv_conn);
tfw_sock_srv_connect_try(srv_conn);

srv->sg->sched->upd_srv(srv);
}

/*
* Pass the response to cache for further processing.
* In the end, the response is sent on to the client.
Expand Down
2 changes: 1 addition & 1 deletion fw/http_frame.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ typedef struct {
unsigned char data_off;
} TfwH2Ctx;

typedef struct TfwConn TfwConn;
typedef struct tfw_conn_t TfwConn;

int tfw_h2_init(void);
void tfw_h2_cleanup(void);
Expand Down
28 changes: 28 additions & 0 deletions fw/http_sched_hash.c
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ __is_conn_suitable(TfwSrvConn *conn, bool hmonitor)
{
return (hmonitor || !tfw_srv_suspended((TfwServer *)conn->peer))
&& !tfw_srv_conn_restricted(conn)
&& !tfw_srv_conn_unscheduled(conn)
&& !tfw_srv_conn_busy(conn)
&& !tfw_srv_conn_queue_full(conn)
&& tfw_srv_conn_get_if_live(conn);
Expand Down Expand Up @@ -333,6 +334,8 @@ tfw_sched_hash_add_conns(TfwServer *srv, TfwHashConnList *cl, size_t *seed,

list_for_each_entry(conn, &srv->conn_list, list) {
unsigned long hash;
if (tfw_srv_conn_unscheduled(conn))
continue;
do {
hash = __hash_64(srv_hash ^ *seed);
*seed += seed_inc;
Expand Down Expand Up @@ -432,13 +435,38 @@ tfw_sched_hash_del_srv(TfwServer *srv)
call_rcu(&cl->rcu, tfw_sched_hash_put_srv_data);
}

static int
tfw_sched_hash_upd_srv(TfwServer *srv)
{
size_t size, seed, seed_inc = 0;
TfwHashConnList *cl = rcu_dereference_bh_check(srv->sched_data, 1);
TfwHashConnList *cl_copy;

seed = get_random_long();
seed_inc = get_random_int();

size = sizeof(TfwHashConnList) + srv->conn_n * sizeof(TfwHashConn);
if (!(cl_copy = kzalloc(size, GFP_ATOMIC)))
return -ENOMEM;

tfw_sched_hash_add_conns(srv, cl_copy, &seed, seed_inc);

rcu_assign_pointer(srv->sched_data, cl_copy);

if (cl)
call_rcu(&cl->rcu, tfw_sched_hash_put_srv_data);

return 0;
}

static TfwScheduler tfw_sched_hash = {
.name = "hash",
.list = LIST_HEAD_INIT(tfw_sched_hash.list),
.add_grp = tfw_sched_hash_add_grp,
.del_grp = tfw_sched_hash_del_grp,
.add_srv = tfw_sched_hash_add_srv,
.del_srv = tfw_sched_hash_del_srv,
.upd_srv = tfw_sched_hash_upd_srv,
.sched_sg_conn = tfw_sched_hash_get_sg_conn,
.sched_srv_conn = tfw_sched_hash_get_srv_conn,
};
Expand Down
90 changes: 73 additions & 17 deletions fw/http_sched_ratio.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@

#define TFW_SCHED_RATIO_INTVL (HZ / 20) /* The timer periodicity. */

typedef struct {
struct rcu_head rcu;
size_t conn_n;
TfwSrvConn *conns[0];
} TfwRatioSrvConnList;

/**
* Individual upstream server descriptor.
*
Expand All @@ -38,17 +44,15 @@
*
* @rcu - RCU control structure.
* @srv - pointer to server structure.
* @conn - list of pointers to server connection structures.
* @cl - pointer to list of pointers to server connection structures.
* @counter - monotonic counter for choosing the next connection.
* @conn_n - number of connections to server.
* @seq - current sequence number for APM stats.
*/
typedef struct {
struct rcu_head rcu;
TfwServer *srv;
TfwSrvConn **conn;
TfwRatioSrvConnList *cl;
atomic64_t counter;
size_t conn_n;
unsigned int seq;
} TfwRatioSrvDesc;

Expand Down Expand Up @@ -827,12 +831,15 @@ static inline TfwSrvConn *
__sched_srv(TfwRatioSrvDesc *srvdesc, int skipnip, int *nipconn)
{
size_t ci;
TfwRatioSrvConnList *cl = rcu_dereference_bh_check(srvdesc->cl, 1);

for (ci = 0; ci < srvdesc->conn_n; ++ci) {
rcu_read_lock_bh();
for (ci = 0; ci < cl->conn_n; ++ci) {
unsigned long idxval = atomic64_inc_return(&srvdesc->counter);
TfwSrvConn *srv_conn = srvdesc->conn[idxval % srvdesc->conn_n];
TfwSrvConn *srv_conn = cl->conns[idxval % cl->conn_n];

if (unlikely(tfw_srv_conn_restricted(srv_conn)
|| tfw_srv_conn_unscheduled(srv_conn)
|| tfw_srv_conn_busy(srv_conn)
|| tfw_srv_conn_queue_full(srv_conn)))
continue;
Expand All @@ -841,9 +848,12 @@ __sched_srv(TfwRatioSrvDesc *srvdesc, int skipnip, int *nipconn)
++(*nipconn);
continue;
}
if (likely(tfw_srv_conn_get_if_live(srv_conn)))
if (likely(tfw_srv_conn_get_if_live(srv_conn))) {
rcu_read_unlock_bh();
return srv_conn;
}
}
rcu_read_unlock_bh();

return NULL;
}
Expand Down Expand Up @@ -981,7 +991,7 @@ tfw_sched_ratio_cleanup(TfwRatio *ratio)

/* Data that is shared between pool entries. */
for (si = 0; si < ratio->srv_n; ++si)
kfree(ratio->srvdesc[si].conn);
kfree(ratio->srvdesc[si].cl);

kfree(ratio->hstdata);
kfree(ratio->rtodata);
Expand Down Expand Up @@ -1039,28 +1049,32 @@ static int
tfw_sched_ratio_srvdesc_setup_srv(TfwServer *srv, TfwRatioSrvDesc *srvdesc)
{
size_t size, ci = 0;
TfwSrvConn **conn, *srv_conn;
TfwSrvConn *srv_conn;
TfwRatioSrvConnList *cl;

size = sizeof(TfwSrvConn *) * srv->conn_n;
if (!(srvdesc->conn = kzalloc(size, GFP_KERNEL)))
size = sizeof(TfwRatioSrvConnList) + sizeof(TfwSrvConn *) * srv->conn_n;
if (!(cl = kzalloc(size, GFP_KERNEL)))
return -ENOMEM;

conn = srvdesc->conn;
list_for_each_entry(srv_conn, &srv->conn_list, list) {
if (tfw_srv_conn_unscheduled(srv_conn))
continue;
if (unlikely(ci++ == srv->conn_n))
goto err;
*conn++ = srv_conn;

cl->conns[ci-1] = srv_conn;
}
if (unlikely(ci != srv->conn_n))
goto err;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems I was wrong with the proposal of the functions unification in both the schedulers.

Previously we called tfw_sock_srv_new_conn(), which adds a new connection to the connection list, only from process context (since we have constant number of server connections and just failover TCP connections inside them). Moreover, tfw_peer_add_conn() uses a spin lock to add a connection. I believe we have a mutex on concurrent configuraion reloads, so we rely on the fact that tfw_sched_ratio_add_srv() always works in single context.

Now we can call this function concurrently with other websocket creation and with the configuration reload, so we definitely have races in all the places in the function. We can back to the separate functions and use the spin lock in the new function, but we'll also need to use the spin lock in all other functions iterating the connections list, which introduces lock contention.

What you're doing in the patch is almost solving #710 , which is a hard task on it's own to upgrade the schedulers data structures without locks.

Initially with #755 I assumed that you just "steal" sk from TfwSrvConn and initiate the existing connection failovering mechanism.


srvdesc->conn_n = srv->conn_n;
cl->conn_n = ci;
srvdesc->srv = srv;
atomic64_set(&srvdesc->counter, 0);

rcu_assign_pointer(srvdesc->cl, cl);
return 0;
err:
kfree(srvdesc->conn);
kfree(srvdesc->cl);
return -EINVAL;
}

Expand All @@ -1081,7 +1095,7 @@ tfw_sched_ratio_srvdesc_setup(TfwSrvGroup *sg, TfwRatio *ratio)
int r;
size_t si = 0;
TfwServer *srv;
TfwRatioSrvDesc *srvdesc = ratio->srvdesc;
TfwRatioSrvDesc *srvdesc = rcu_dereference_bh_check(ratio->srvdesc, 1);

list_for_each_entry(srv, &sg->srv_list, list) {
if (unlikely((si++ == sg->srv_n) || !srv->conn_n
Expand Down Expand Up @@ -1277,10 +1291,17 @@ static void
tfw_sched_ratio_put_srv_data(struct rcu_head *rcu)
{
TfwRatioSrvDesc *srvdesc = container_of(rcu, TfwRatioSrvDesc, rcu);
kfree(srvdesc->conn);
kfree(srvdesc->cl);
kfree(srvdesc);
}

static void
tfw_sched_ratio_put_conn_data(struct rcu_head *rcu)
{
TfwRatioSrvConnList *cl = container_of(rcu, TfwRatioSrvConnList, rcu);
kfree(cl);
}

static void
tfw_sched_ratio_del_srv(TfwServer *srv)
{
Expand All @@ -1291,13 +1312,48 @@ tfw_sched_ratio_del_srv(TfwServer *srv)
call_rcu(&srvdesc->rcu, tfw_sched_ratio_put_srv_data);
}

static int
tfw_sched_ratio_upd_srv(TfwServer *srv)
{
TfwRatioSrvDesc *srvdesc = rcu_dereference_bh_check(srv->sched_data, 1);
size_t size, ci = 0;
TfwRatioSrvConnList *cl_copy;
TfwRatioSrvConnList *cl = rcu_dereference_bh_check(srvdesc->cl, 1);
TfwSrvConn *srv_conn;

size = sizeof(TfwRatioSrvConnList) + sizeof(TfwSrvConn *) * srv->conn_n;
if (!(cl_copy = kzalloc(size, GFP_ATOMIC)))
return -ENOMEM;

list_for_each_entry(srv_conn, &srv->conn_list, list) {
if (tfw_srv_conn_unscheduled(srv_conn))
continue;
if (unlikely(ci++ == srv->conn_n))
goto err;
cl_copy->conns[ci-1] = srv_conn;
}
if (unlikely(ci != srv->conn_n))
goto err;
cl->conn_n = ci;
rcu_assign_pointer(srvdesc->cl, cl_copy);

if (srvdesc)
call_rcu(&cl->rcu, tfw_sched_ratio_put_conn_data);

return 0;
err:
kfree(cl_copy);
return -EINVAL;
}

static TfwScheduler tfw_sched_ratio = {
.name = "ratio",
.list = LIST_HEAD_INIT(tfw_sched_ratio.list),
.add_grp = tfw_sched_ratio_add_grp,
.del_grp = tfw_sched_ratio_del_grp,
.add_srv = tfw_sched_ratio_add_srv,
.del_srv = tfw_sched_ratio_del_srv,
.upd_srv = tfw_sched_ratio_upd_srv,
.sched_sg_conn = tfw_sched_ratio_sched_sg_conn,
.sched_srv_conn = tfw_sched_ratio_sched_srv_conn,
};
Expand Down
Loading