From c4c79ea74e105be8d4a59730be94592fcebaf26c Mon Sep 17 00:00:00 2001 From: Aleksey Baulin Date: Thu, 20 Oct 2016 12:05:56 +0300 Subject: [PATCH] Re-sending of requests within a restored server connection. (#419) This commit implements the logic of re-sending requests that were in the server connection's queue when the connection failed. When the connection is restored, it's not scheduled until all requests in the forwarding queue are re-sent or sent to the server. --- tempesta_fw/connection.c | 9 + tempesta_fw/connection.h | 61 +++- tempesta_fw/http.c | 512 ++++++++++++++++++----------- tempesta_fw/sched/tfw_sched_hash.c | 8 +- tempesta_fw/sched/tfw_sched_rr.c | 6 +- tempesta_fw/sock_srv.c | 7 +- 6 files changed, 391 insertions(+), 212 deletions(-) diff --git a/tempesta_fw/connection.c b/tempesta_fw/connection.c index f8bd72140..f7f5ac23c 100644 --- a/tempesta_fw/connection.c +++ b/tempesta_fw/connection.c @@ -58,6 +58,15 @@ tfw_connection_new(TfwConnection *conn) return TFW_CONN_HOOK_CALL(conn, conn_init); } +/** + * Call connection repairing via TfwConnHooks. + */ +void +tfw_connection_repair(TfwConnection *conn) +{ + TFW_CONN_HOOK_CALL(conn, conn_repair); +} + /** * Publish the "connection is dropped" event via TfwConnHooks. */ diff --git a/tempesta_fw/connection.h b/tempesta_fw/connection.h index fae22d979..d0c81f7e0 100644 --- a/tempesta_fw/connection.h +++ b/tempesta_fw/connection.h @@ -79,36 +79,51 @@ enum { * @msg_queue - queue of messages to be sent over the connection; * @nip_queue - queue of non-idempotent messages within @msg_queue; * @msg_qlock - lock for accessing @msg_queue; + * @flags - various atomic flags related to connection's state; * @refcnt - number of users of the connection structure instance; - * @nipcnt - number of non-idempotent requests in the connection; * @timer - The keep-alive/retry timer for the connection; * @msg - message that is currently being processed; * @msg_sent - message that was sent last in the connection; + * @msg_resent - message that was re-sent last in the connection; * @peer - TfwClient or TfwServer handler; * @sk - an appropriate sock handler; * @destructor - called when a connection is destroyed; + * @forward - called when a request is forwarded to server; */ -typedef struct { +typedef struct tfw_connection_t { SsProto proto; TfwGState state; struct list_head list; struct list_head msg_queue; - struct list_head nip_queue; + struct list_head nip_queue; /*srv*/ spinlock_t msg_qlock; + unsigned long flags; /*srv*/ atomic_t refcnt; - atomic_t nipcnt; struct timer_list timer; TfwMsg *msg; - TfwMsg *msg_sent; + TfwMsg *msg_sent; /*srv*/ + TfwMsg *msg_resent; /*srv*/ TfwPeer *peer; struct sock *sk; void (*destructor)(void *); + void (*forward)(struct tfw_connection_t *); /*srv*/ } TfwConnection; #define TFW_CONN_DEATHCNT (INT_MIN / 2) #define TFW_CONN_TYPE(c) ((c)->proto.type) +/* Connection flags are defined by the bit number. */ +enum { + TFW_CONN_B_RESEND = 0, /* Need to re-send requests. */ + TFW_CONN_B_QFORWD, /* Need to forward requests in the queue. */ + TFW_CONN_B_HASNIP, /* Has non-idempotent requests. */ +}; + +#define TFW_CONN_F_RESEND (1 << TFW_CONN_B_RESEND) +#define TFW_CONN_F_QFORWD (1 << TFW_CONN_B_QFORWD) +#define TFW_CONN_F_HASNIP (1 << TFW_CONN_B_HASNIP) + /** * TLS hardened connection. */ @@ -129,6 +144,14 @@ typedef struct { */ int (*conn_init)(TfwConnection *conn); + /* + * Called when a new connection is initialized and before + * the initialization is complete. Makes sense only for + * server connections. Used to re-send requests that were + * left in the connection queue. + */ + void (*conn_repair)(TfwConnection *conn); + /* * Called when closing a connection (client or server, * as in conn_init()). This is required for modules that @@ -159,6 +182,25 @@ extern TfwConnHooks *conn_hooks[TFW_CONN_MAX_PROTOS]; #define TFW_CONN_HOOK_CALL(c, f...) \ tfw_conn_hook_call(TFW_CONN_TYPE2IDX(TFW_CONN_TYPE(c)), c, f) +/* + * Tell if a connection is restricted. When restricted, a connection + * cannot be scheduled. + */ +static inline bool +tfw_connection_restricted(TfwConnection *conn) +{ + return test_bit(TFW_CONN_B_RESEND, &conn->flags); +} + +/* + * Tell if a connection has non-idempotent requests. + */ +static inline bool +tfw_connection_hasnip(TfwConnection *conn) +{ + return test_bit(TFW_CONN_B_HASNIP, &conn->flags); +} + static inline bool tfw_connection_live(TfwConnection *conn) { @@ -172,8 +214,8 @@ tfw_connection_get(TfwConnection *conn) } /** - * Increment reference counter and return true if @conn isn't in failovering - * process, i.e. @refcnt > 0. + * Increment reference counter and return true if @conn isi not in + * failovering process, i.e. @refcnt wasn't less or equal to zero. */ static inline bool tfw_connection_get_if_live(TfwConnection *conn) @@ -295,7 +337,9 @@ tfw_connection_validate_cleanup(TfwConnection *conn) BUG_ON(!conn); BUG_ON(!list_empty(&conn->list)); - BUG_ON(!list_empty(&conn->msg_queue)); + BUG_ON((TFW_CONN_TYPE(conn) & Conn_Clnt) + && !list_empty(&conn->msg_queue)); + BUG_ON(atomic_read(&conn->refcnt) & ~1); BUG_ON(conn->msg); rc = atomic_read(&conn->refcnt); @@ -311,6 +355,7 @@ void tfw_connection_init(TfwConnection *conn); void tfw_connection_link_peer(TfwConnection *conn, TfwPeer *peer); int tfw_connection_new(TfwConnection *conn); +void tfw_connection_repair(TfwConnection *conn); void tfw_connection_drop(TfwConnection *conn); void tfw_connection_release(TfwConnection *conn); diff --git a/tempesta_fw/http.c b/tempesta_fw/http.c index 433a5c9ae..923879a33 100644 --- a/tempesta_fw/http.c +++ b/tempesta_fw/http.c @@ -376,37 +376,266 @@ tfw_http_req_is_nonidempotent(TfwHttpReq *req) return (req->flags & TFW_HTTP_NON_IDEMP); } +/* + * Set the request @req in server connection @srv_conn as idempotent. + */ static inline void -__tfw_http_req_flip_nonidempotent(TfwConnection *srv_conn, TfwHttpReq *req) +__tfw_http_req_set_idempotent(TfwConnection *srv_conn, TfwHttpReq *req) { list_del_init(&req->nip_list); - atomic_dec(&srv_conn->nipcnt); + if (list_empty(&srv_conn->nip_queue)) + clear_bit(TFW_CONN_B_HASNIP, &srv_conn->flags); } /* - * Flip a non-idempotent request. If @req in server connection @srv_conn - * is non-idempotent, then make it idempotent. + * If @req in server connection @srv_conn is non-idempotent, then set it + * as idempotent. */ static inline void -tfw_http_req_flip_nonidempotent(TfwConnection *srv_conn, TfwHttpReq *req) +tfw_http_req_flip_if_nonidempotent(TfwConnection *srv_conn, TfwHttpReq *req) { if (!list_empty(&req->nip_list)) - __tfw_http_req_flip_nonidempotent(srv_conn, req); + __tfw_http_req_set_idempotent(srv_conn, req); } /* * If a request on the list of non-idempotent requests in server - * connection @srv_conn had become an idempotent request, then flip it - * and make it idempotent. + * connection @srv_conn had become idempotent, then set it as idempotent. */ static inline void -tfw_http_conn_flip_nonidempotent(TfwConnection *srv_conn) +tfw_http_conn_flip_if_nonidempotent(TfwConnection *srv_conn) { TfwHttpReq *req, *tmp; list_for_each_entry_safe(req, tmp, &srv_conn->nip_queue, nip_list) if (!tfw_http_req_is_nonidempotent(req)) - __tfw_http_req_flip_nonidempotent(srv_conn, req); + __tfw_http_req_set_idempotent(srv_conn, req); +} + +/* + * Set the request @req in server connection @srv_conn as non-idempotent. + */ +static inline void +__tfw_http_req_set_nonidempotent(TfwConnection *srv_conn, TfwHttpReq *req) +{ + list_add_tail(&req->nip_list, &srv_conn->nip_queue); + set_bit(TFW_CONN_B_HASNIP, &srv_conn->flags); +} + +/* + * Set the request @req in server connection @srv_conn is idempotent, + * then set it as non-idempotent. + */ +static inline void +tfw_http_req_flip_if_idempotent(TfwConnection *srv_conn, TfwHttpReq *req) +{ + if (list_empty(&req->nip_list)) + __tfw_http_req_set_nonidempotent(srv_conn, req); +} + +/* + * Tell if the server connection's forwarding queue is on hold. + * It's on hold it the request that was sent last was non-idempotent. + */ +static inline bool +tfw_http_conn_on_hold(TfwConnection *srv_conn) +{ + TfwHttpReq *req = (TfwHttpReq *)srv_conn->msg_sent; + + BUG_ON(!(TFW_CONN_TYPE(srv_conn) & Conn_Srv)); + return (req && tfw_http_req_is_nonidempotent(req)); +} + +/* + * Tell if the server connection's forwarding queue is drained. + * It's drained if there're no requests in the queue after the + * request that was sent last. + */ +static inline bool +tfw_http_conn_drained(TfwConnection *srv_conn) +{ + TfwMsg *lmsg; + + BUG_ON(!(TFW_CONN_TYPE(srv_conn) & Conn_Srv)); + if (list_empty(&srv_conn->msg_queue)) { + TFW_DBG2("%s: Empty: srv_conn=[%p]\n", __func__, srv_conn); + return true; + } + if (!srv_conn->msg_sent) { + TFW_DBG2("%s: None sent: srv_conn=[%p]\n", __func__, srv_conn); + return false; + } + lmsg = list_last_entry(&srv_conn->msg_queue, TfwMsg, seq_list); + if (srv_conn->msg_sent == lmsg) + return true; + TFW_DBG2("%s: Some not sent: srv_conn=[%p]\n", __func__, srv_conn); + return false; +} + +static inline bool +tfw_http_conn_req_need_fwd(TfwConnection *srv_conn) +{ + return (!tfw_http_conn_on_hold(srv_conn) + && !tfw_http_conn_drained(srv_conn)); +} + +/* + * Delete requests that were not forwarded due to an error. Send an + * error response to a client. The response will be attached to the + * request and sent to the client in proper seq order. + */ +static void +tfw_http_req_zap_error(struct list_head *err_queue) +{ + TfwHttpReq *req, *tmp; + + TFW_DBG2("%s: queue is %sempty\n", + __func__, list_empty(err_queue) ? "" : "NOT "); + + list_for_each_entry_safe(req, tmp, err_queue, msg.fwd_list) { + list_del_init(&req->msg.fwd_list); + tfw_http_send_500(req); + } +} + +/* + * Forward requests in the server connection @srv_conn. The requests + * are forwarded until a non-idempotent request is found in the queue. + * Must be called with a lock on the server connection's @msg_queue. + */ +static void +__tfw_http_req_fwd_stalled(TfwConnection *srv_conn, struct list_head *err_queue) +{ + TfwHttpReq *req, *tmp; + struct list_head *fwd_queue = &srv_conn->msg_queue; + + TFW_DBG2("%s: conn = %p\n", __func__, srv_conn); + + /* + * Process the server connection's queue of pending requests. + * The queue is locked against concurrent updates: inserts of + * outgoing requests, or closing of the server connection. Do + * it as fast as possible by moving failed requests to other + * queues that can be processed without the lock. + */ + req = srv_conn->msg_sent + ? (TfwHttpReq *)list_next_entry(srv_conn->msg_sent, fwd_list) + : (TfwHttpReq *)list_first_entry(fwd_queue, TfwMsg, fwd_list); + list_for_each_entry_safe_from(req, tmp, fwd_queue, msg.fwd_list) { + /* + * If unable to send to the server connection due to + * an error, then move the request to @err_queue for + * sending a 500 error response later. That is safe + * as the response will be sent in proper seq order. + */ + if (tfw_connection_send(srv_conn, (TfwMsg *)req)) { + tfw_http_req_flip_if_nonidempotent(srv_conn, req); + list_move_tail(&req->msg.fwd_list, err_queue); + TFW_DBG2("%s: Error sending to server connection: " + "conn=[%p] req=[%p]\n", + __func__, srv_conn, req); + continue; + } + srv_conn->msg_sent = (TfwMsg *)req; + /* Stop sending if the request is non-idempotent. */ + if (tfw_http_req_is_nonidempotent(req)) + break; + /* See if the request has become idempotent. */ + tfw_http_req_flip_if_nonidempotent(srv_conn, req); + } +} + +/* + * Forward stalled requests in server connection @srv_conn. + * + * This function expects that the queue in the server connection + * is locked. The queue in unlocked inside the function which is + * very non-traditional. Please use with caution. + */ +static void +tfw_http_req_fwd_stalled(TfwConnection *srv_conn) +{ + LIST_HEAD(err_queue); + + TFW_DBG2("%s: conn = %p\n", __func__, srv_conn); + WARN_ON(!spin_is_locked(&srv_conn->msg_qlock)); + BUG_ON(list_empty(&srv_conn->msg_queue)); + + __tfw_http_req_fwd_stalled(srv_conn, &err_queue); + spin_unlock(&srv_conn->msg_qlock); + + if (!list_empty(&err_queue)) + tfw_http_req_zap_error(&err_queue); +} + +static void +__tfw_http_req_fwd_resend(TfwConnection *srv_conn, + bool one_msg, struct list_head *err_queue) +{ + TfwHttpReq *req, *tmp; + struct list_head *end, *fwd_queue = &srv_conn->msg_queue; + + TFW_DBG2("%s: conn = %p\n", __func__, srv_conn); + BUG_ON(!srv_conn->msg_sent); + + req = srv_conn->msg_resent + ? (TfwHttpReq *)list_next_entry(srv_conn->msg_resent, fwd_list) + : (TfwHttpReq *)list_first_entry(fwd_queue, TfwMsg, fwd_list); + end = srv_conn->msg_sent->fwd_list.next; + + /* An equivalent of list_for_each_entry_safe_from() */ + for (tmp = list_next_entry(req, msg.fwd_list); + &req->msg.fwd_list != end; + req = tmp, tmp = list_next_entry(tmp, msg.fwd_list)) + { + if (tfw_connection_send(srv_conn, (TfwMsg *)req)) { + tfw_http_req_flip_if_nonidempotent(srv_conn, req); + list_move_tail(&req->msg.fwd_list, err_queue); + TFW_DBG2("%s: Error sending to server connection: " + "conn=[%p] req=[%p]\n", + __func__, srv_conn, req); + continue; + } + srv_conn->msg_resent = (TfwMsg *)req; + if (unlikely(one_msg)) + break; + } +} + +static void +__tfw_http_req_fwd_qforwd(TfwConnection *srv_conn, struct list_head *err_queue) +{ + __tfw_http_req_fwd_stalled(srv_conn, err_queue); + if (list_empty(&srv_conn->msg_queue)) { + srv_conn->forward = tfw_http_req_fwd_stalled; + clear_bit(TFW_CONN_B_QFORWD, &srv_conn->flags); + clear_bit(TFW_CONN_B_RESEND, &srv_conn->flags); + } +} + +static void +tfw_http_req_fwd_repair(TfwConnection *srv_conn) +{ + LIST_HEAD(err_queue); + + TFW_DBG2("%s: conn = %p\n", __func__, srv_conn); + WARN_ON(!spin_is_locked(&srv_conn->msg_qlock)); + BUG_ON(list_empty(&srv_conn->msg_queue)); + + if (test_bit(TFW_CONN_B_QFORWD, &srv_conn->flags)) { + __tfw_http_req_fwd_qforwd(srv_conn, &err_queue); + } else { + if (!srv_conn->msg_sent) + __tfw_http_req_fwd_resend(srv_conn, false, &err_queue); + if (srv_conn->msg_resent == srv_conn->msg_sent) { + set_bit(TFW_CONN_B_QFORWD, &srv_conn->flags); + __tfw_http_req_fwd_qforwd(srv_conn, &err_queue); + } + } + spin_unlock(&srv_conn->msg_qlock); + + if (!list_empty(&err_queue)) + tfw_http_req_zap_error(&err_queue); } /* @@ -431,8 +660,8 @@ tfw_http_conn_msg_alloc(TfwConnection *conn) TfwHttpReq *req; spin_lock(&conn->msg_qlock); - req = (TfwHttpReq *)list_first_entry_or_null(&conn->msg_queue, - TfwMsg, fwd_list); + req = list_first_entry_or_null(&conn->msg_queue, + TfwHttpReq, msg.fwd_list); spin_unlock(&conn->msg_qlock); if (req && (req->method == TFW_HTTP_METH_HEAD)) hm->flags |= TFW_HTTP_VOID_BODY; @@ -442,19 +671,6 @@ tfw_http_conn_msg_alloc(TfwConnection *conn) return (TfwMsg *)hm; } -void -tfw_http_req_destruct(void *msg) -{ - TfwHttpReq *req = msg; - - BUG_ON(!list_empty(&req->msg.seq_list)); - BUG_ON(!list_empty(&req->msg.fwd_list)); - BUG_ON(!list_empty(&req->nip_list)); - - if (req->sess) - tfw_http_sess_put(req->sess); -} - /* * Free an HTTP message. * Also, free the connection structure if there's no more references. @@ -492,6 +708,37 @@ tfw_http_conn_msg_free(TfwHttpMsg *hm) tfw_http_msg_free(hm); } +/* + * Find requests in the server's connection queue that were forwarded + * to the server. These are unanswered requests. According to RFC 7230 + * 6.3.2, "a client MUST NOT pipeline immediately after connection + * establishment". To address that, re-send the first request to the + * server. When a response comes, that will trigger resending of the + * rest of those unanswered requests. + */ +static void +tfw_http_conn_repair(TfwConnection *srv_conn) +{ + LIST_HEAD(err_queue); + + TFW_DBG2("%s: conn = %p\n", __func__, srv_conn); + BUG_ON(!(TFW_CONN_TYPE(srv_conn) & Conn_Srv)); + BUG_ON(!tfw_connection_restricted(srv_conn)); + + /* Resend the first unanswered request. */ + spin_lock(&srv_conn->msg_qlock); + if (!srv_conn->msg_sent) + __tfw_http_req_fwd_resend(srv_conn, true, &err_queue); + if (!srv_conn->msg_resent) { + set_bit(TFW_CONN_B_QFORWD, &srv_conn->flags); + __tfw_http_req_fwd_qforwd(srv_conn, &err_queue); + } + spin_unlock(&srv_conn->msg_qlock); + + if (!list_empty(&err_queue)) + tfw_http_req_zap_error(&err_queue); +} + /* * Connection with a peer is created. * @@ -502,21 +749,38 @@ static int tfw_http_conn_init(TfwConnection *conn) { if (TFW_CONN_TYPE(conn) & Conn_Srv) { - atomic_set(&conn->nipcnt, 0); - INIT_LIST_HEAD(&conn->nip_queue); + if (list_empty(&conn->msg_queue)) { + conn->forward = tfw_http_req_fwd_stalled; + } else { + conn->msg_resent = NULL; + conn->forward = tfw_http_req_fwd_repair; + set_bit(TFW_CONN_B_RESEND, &conn->flags); + } } tfw_gfsm_state_init(&conn->state, conn, TFW_HTTP_FSM_INIT); return 0; } +void +tfw_http_req_destruct(void *msg) +{ + TfwHttpReq *req = msg; + + BUG_ON(!list_empty(&req->msg.seq_list)); + BUG_ON(!list_empty(&req->msg.fwd_list)); + BUG_ON(!list_empty(&req->nip_list)); + + if (req->sess) + tfw_http_sess_put(req->sess); +} + /* * Connection with a peer is released. * * For server connections the requests that were sent to that server are * kept in the queue until a paired response comes. That will never happen - * now, and requests will remain unanswered. For each request in the queue - * send an error response to the corresponding client connection. Both the - * request and the response will be freed when the response is sent out. + * now. Keep the queue. When the connection is restored the requests will + * be re-sent to the server. * * Called when a connection is released. There are no users at that time, * so locks are not needed. @@ -524,21 +788,11 @@ tfw_http_conn_init(TfwConnection *conn) static void tfw_http_conn_release(TfwConnection *srv_conn) { - TfwHttpReq *req, *tmp; - struct list_head *fwd_queue = &srv_conn->msg_queue; - TFW_DBG2("%s: conn = %p\n", __func__, srv_conn); BUG_ON(!(TFW_CONN_TYPE(srv_conn) & Conn_Srv)); - list_for_each_entry_safe(req, tmp, fwd_queue, msg.fwd_list) { - BUG_ON(req->conn && (req->conn == srv_conn)); - list_del_init(&req->msg.fwd_list); - tfw_http_req_flip_nonidempotent(srv_conn, req); - tfw_http_send_404(req); - TFW_INC_STAT_BH(clnt.msgs_otherr); - } - BUG_ON(atomic_read(&srv_conn->nipcnt) != 0); - BUG_ON(!list_empty(&srv_conn->nip_queue)); + clear_bit(TFW_CONN_B_QFORWD, &srv_conn->flags); + clear_bit(TFW_CONN_B_RESEND, &srv_conn->flags); } /* @@ -559,6 +813,7 @@ tfw_http_conn_cli_drop(TfwConnection *cli_conn) { TfwHttpMsg *hmreq, *tmp; struct list_head *seq_queue = &cli_conn->msg_queue; + LIST_HEAD(zap_queue); TFW_DBG2("%s: conn = %p\n", __func__, cli_conn); BUG_ON(!(TFW_CONN_TYPE(cli_conn) & Conn_Clnt)); @@ -567,10 +822,11 @@ tfw_http_conn_cli_drop(TfwConnection *cli_conn) return; spin_lock(&cli_conn->msg_qlock); - list_for_each_entry_safe(hmreq, tmp, seq_queue, msg.seq_list) { - list_del_init(&hmreq->msg.seq_list); - } + list_splice_tail_init(seq_queue, &zap_queue); spin_unlock(&cli_conn->msg_qlock); + + list_for_each_entry_safe(hmreq, tmp, &zap_queue, msg.seq_list) + list_del_init(&hmreq->msg.seq_list); } /* @@ -864,141 +1120,6 @@ tfw_http_adjust_resp(TfwHttpResp *resp, TfwHttpReq *req) TFW_HTTP_HDR_SERVER, 0); } -/* - * Tell if the server connection's forwarding queue is on hold. - * It's on hold it the request that was sent last was non-idempotent. - */ -static inline bool -__tfw_http_conn_on_hold(TfwConnection *srv_conn) -{ - TfwHttpReq *req = (TfwHttpReq *)srv_conn->msg_sent; - - BUG_ON(!(TFW_CONN_TYPE(srv_conn) & Conn_Srv)); - return (req && tfw_http_req_is_nonidempotent(req)); -} - -/* - * Tell if the server connection's forwarding queue is drained. - * It's drained if there're no requests in the queue after the - * request that was sent last. - */ -static inline bool -__tfw_http_conn_drained(TfwConnection *srv_conn) -{ - TfwMsg *lmsg; - - BUG_ON(!(TFW_CONN_TYPE(srv_conn) & Conn_Srv)); - if (list_empty(&srv_conn->msg_queue)) { - TFW_DBG2("%s: Empty: srv_conn=[%p]\n", __func__, srv_conn); - return true; - } - if (!srv_conn->msg_sent) { - TFW_DBG2("%s: None sent: srv_conn=[%p]\n", __func__, srv_conn); - return false; - } - lmsg = list_last_entry(&srv_conn->msg_queue, TfwMsg, seq_list); - if (srv_conn->msg_sent == lmsg) - return true; - TFW_DBG2("%s: Some not sent: srv_conn=[%p]\n", __func__, srv_conn); - return false; -} - -static inline bool -__tfw_http_conn_req_need_fwd(TfwConnection *srv_conn) -{ - return (!__tfw_http_conn_on_hold(srv_conn) - && !__tfw_http_conn_drained(srv_conn)); -} - -/* - * Delete requests that were not forwarded due to an error. Send an - * error response to a client. The response will be attached to the - * request and sent to the client in proper seq order. - */ -static void -tfw_http_req_zap_error(struct list_head *err_queue) -{ - TfwHttpReq *req, *tmp; - - TFW_DBG2("%s: queue is %sempty\n", - __func__, list_empty(err_queue) ? "" : "NOT "); - - list_for_each_entry_safe(req, tmp, err_queue, msg.fwd_list) { - list_del_init(&req->msg.fwd_list); - tfw_http_send_500(req); - } -} - -/* - * Forward requests in server connection @srv_conn. The requests are - * forwarded until a non-idempotent request is found in the queue. - * Must be called with a lock on the server connection's @msg_queue. - */ -static void -__tfw_http_req_fwd_many(TfwConnection *srv_conn, struct list_head *err_queue) -{ - TfwHttpReq *req, *tmp; - struct list_head *fwd_queue = &srv_conn->msg_queue; - - TFW_DBG2("%s: conn = %p\n", __func__, srv_conn); - BUG_ON(list_empty(fwd_queue)); - - /* - * Process the server connection's queue of pending requests. - * The queue is locked against concurrent updates: inserts of - * outgoing requests, or closing of the server connection. Do - * it as fast as possible by moving failed requests to other - * queues that can be processed without the lock. - */ - req = srv_conn->msg_sent - ? (TfwHttpReq *)list_next_entry(srv_conn->msg_sent, fwd_list) - : (TfwHttpReq *)list_first_entry(fwd_queue, TfwMsg, fwd_list); - list_for_each_entry_safe_from(req, tmp, fwd_queue, msg.fwd_list) { - /* - * If unable to send to the server connection due to - * an error, then move the request to @err_queue for - * sending a 500 error response later. That is safe - * as the response will be sent in proper seq order. - */ - if (tfw_connection_send(srv_conn, (TfwMsg *)req)) { - list_move_tail(&req->msg.fwd_list, err_queue); - TFW_DBG2("%s: Error sending to server connection: " - "conn=[%p] req=[%p]\n", - __func__, srv_conn, req); - continue; - } - srv_conn->msg_sent = (TfwMsg *)req; - /* Stop sending if the request is non-idempotent. */ - if (tfw_http_req_is_nonidempotent(req)) - break; - tfw_http_req_flip_nonidempotent(srv_conn, req); - } -} - -/* - * Forward stalled requests in server connection @srv_conn. - * - * This function expect that the queue in the server connection - * is locked. The queue in unlocked inside the function which is - * very non-traditional. Please use with caution. - */ -static void -__tfw_http_req_fwd_stalled(TfwConnection *srv_conn) -{ - struct list_head err_queue; - - TFW_DBG2("%s: conn = %p\n", __func__, srv_conn); - BUG_ON(!spin_is_locked(&srv_conn->msg_qlock)); - - INIT_LIST_HEAD(&err_queue); - - __tfw_http_req_fwd_many(srv_conn, &err_queue); - spin_unlock(&srv_conn->msg_qlock); - - if (!list_empty(&err_queue)) - tfw_http_req_zap_error(&err_queue); -} - /* * Forward the request @req to server connection @srv_conn. * @@ -1022,13 +1143,11 @@ tfw_http_req_fwd(TfwConnection *srv_conn, TfwHttpReq *req) BUG_ON(!(TFW_CONN_TYPE(srv_conn) & Conn_Srv)); spin_lock(&srv_conn->msg_qlock); - drained = __tfw_http_conn_drained(srv_conn); + drained = tfw_http_conn_drained(srv_conn); list_add_tail(&req->msg.fwd_list, &srv_conn->msg_queue); - if (tfw_http_req_is_nonidempotent(req)) { - list_add_tail(&req->nip_list, &srv_conn->nip_queue); - atomic_inc(&srv_conn->nipcnt); - } - if (__tfw_http_conn_on_hold(srv_conn)) { + if (tfw_http_req_is_nonidempotent(req)) + __tfw_http_req_set_nonidempotent(srv_conn, req); + if (tfw_http_conn_on_hold(srv_conn)) { spin_unlock(&srv_conn->msg_qlock); TFW_DBG2("%s: Server connection is on hold: conn=[%p]\n", __func__, srv_conn); @@ -1037,13 +1156,13 @@ tfw_http_req_fwd(TfwConnection *srv_conn, TfwHttpReq *req) if (!drained) { TFW_DBG2("%s: Server connection is not drained: conn=[%p]\n", __func__, srv_conn); + tfw_http_req_fwd_stalled(srv_conn); /* The queue is unlocked inside the function. */ - __tfw_http_req_fwd_stalled(srv_conn); return; } if (tfw_connection_send(srv_conn, (TfwMsg *)req)) { list_del_init(&req->msg.fwd_list); - tfw_http_req_flip_nonidempotent(srv_conn, req); + tfw_http_req_flip_if_nonidempotent(srv_conn, req); spin_unlock(&srv_conn->msg_qlock); TFW_DBG2("%s: Error sending to server connection: " "conn=[%p] req=[%p]\n", __func__, srv_conn, req); @@ -1062,11 +1181,11 @@ tfw_http_resp_fwd(TfwHttpReq *req, TfwHttpResp *resp) { TfwHttpReq *tmp; TfwConnection *cli_conn = req->conn; - struct list_head out_queue, *seq_queue = &cli_conn->msg_queue; + struct list_head *seq_queue = &cli_conn->msg_queue; + LIST_HEAD(out_queue); TFW_DBG2("%s: req=[%p], resp=[%p]\n", __func__, req, resp); - INIT_LIST_HEAD(&out_queue); /* * Starting with the first request on the list, pick consecutive * requests that have a paired response. Remove those requests @@ -1543,17 +1662,17 @@ tfw_http_popreq(TfwHttpMsg *hmresp) } req = list_first_entry(fwd_queue, TfwHttpReq, msg.fwd_list); list_del_init(&req->msg.fwd_list); - if (srv_conn->msg_sent == (TfwMsg *)req) + if ((TfwMsg *)req == srv_conn->msg_sent) srv_conn->msg_sent = NULL; - tfw_http_req_flip_nonidempotent(srv_conn, req); - tfw_http_conn_flip_nonidempotent(srv_conn); + tfw_http_req_flip_if_nonidempotent(srv_conn, req); + tfw_http_conn_flip_if_nonidempotent(srv_conn); /* * If the server connection is no longer on hold, and the queue * is not drained, then forward pending requests to the server. - * Note: The queue is unlocked inside __tfw_http_req_fwd_stalled(). + * Note: The queue is unlocked inside srv_conn->forward(). */ - if (__tfw_http_conn_req_need_fwd(srv_conn)) - __tfw_http_req_fwd_stalled(srv_conn); + if (tfw_http_conn_req_need_fwd(srv_conn)) + srv_conn->forward(srv_conn); else spin_unlock(&srv_conn->msg_qlock); @@ -1886,6 +2005,7 @@ EXPORT_SYMBOL(tfw_http_req_key_calc); static TfwConnHooks http_conn_hooks = { .conn_init = tfw_http_conn_init, + .conn_repair = tfw_http_conn_repair, .conn_drop = tfw_http_conn_drop, .conn_release = tfw_http_conn_release, .conn_send = tfw_http_conn_send, diff --git a/tempesta_fw/sched/tfw_sched_hash.c b/tempesta_fw/sched/tfw_sched_hash.c index 43a40cbf0..913c7907b 100644 --- a/tempesta_fw/sched/tfw_sched_hash.c +++ b/tempesta_fw/sched/tfw_sched_hash.c @@ -145,15 +145,15 @@ tfw_sched_hash_get_srv_conn(TfwMsg *msg, TfwSrvGroup *sg) msg_hash = tfw_http_req_key_calc((TfwHttpReq *)msg); for (tries = 0; tries < __HLIST_SZ(TFW_SG_MAX_CONN); ++tries) { for (ch = sg->sched_data; ch->conn; ++ch) { + if (unlikely(tfw_connection_restricted(ch->conn)) + || unlikely(!tfw_connection_live(ch->conn))) + continue; curr_weight = msg_hash ^ ch->hash; - if (likely(tfw_connection_live(ch->conn)) - && curr_weight > best_weight) - { + if (curr_weight > best_weight) { best_weight = curr_weight; best_conn = ch->conn; } } - if (unlikely(!best_conn)) return NULL; if (tfw_connection_get_if_live(best_conn)) diff --git a/tempesta_fw/sched/tfw_sched_rr.c b/tempesta_fw/sched/tfw_sched_rr.c index 230af4cd8..dd675cd04 100644 --- a/tempesta_fw/sched/tfw_sched_rr.c +++ b/tempesta_fw/sched/tfw_sched_rr.c @@ -128,8 +128,10 @@ tfw_sched_rr_get_srv_conn(TfwMsg *msg, TfwSrvGroup *sg) for (c = 0; c < srv_cl->conn_n; ++c) { idx = atomic64_inc_return(&srv_cl->rr_counter); conn = srv_cl->conns[idx % srv_cl->conn_n]; - if (skipnip && atomic_read(&conn->nipcnt)) { - if (tfw_connection_live(conn)) + if (unlikely(tfw_connection_restricted(conn))) + continue; + if (skipnip && tfw_connection_hasnip(conn)) { + if (likely(tfw_connection_live(conn))) nipconn++; continue; } diff --git a/tempesta_fw/sock_srv.c b/tempesta_fw/sock_srv.c index 313326254..3543ee95f 100644 --- a/tempesta_fw/sock_srv.c +++ b/tempesta_fw/sock_srv.c @@ -308,8 +308,7 @@ tfw_sock_srv_connect_complete(struct sock *sk) tfw_connection_link_to_sk(conn, sk); /* Notify higher level layers. */ - r = tfw_connection_new(conn); - if (r) { + if ((r = tfw_connection_new(conn))) { TFW_ERR("conn_init() hook returned error\n"); return r; } @@ -317,6 +316,10 @@ tfw_sock_srv_connect_complete(struct sock *sk) /* Let schedulers use the connection hereafter. */ tfw_connection_revive(conn); + /* Repair the connection is necessary. */ + if (unlikely(tfw_connection_restricted(conn))) + tfw_connection_repair(conn); + __reset_retry_timer(srv_conn); TFW_DBG_ADDR("connected", &srv->addr);