diff --git a/tempesta_fw/connection.h b/tempesta_fw/connection.h index 8314a2435..b7d95d69c 100644 --- a/tempesta_fw/connection.h +++ b/tempesta_fw/connection.h @@ -77,14 +77,15 @@ enum { * @state - connection processing state; * @list - member in the list of connections with @peer; * @msg_queue - queue of messages to be sent over the connection; - * @nip_queue - queue of non-idempotent messages within @msg_queue; + * @nip_queue - queue of non-idempotent messages in server's @msg_queue; * @msg_qlock - lock for accessing @msg_queue; - * @flags - various atomic flags related to connection's state; + * @flags - atomic flags related to server connection's state; * @refcnt - number of users of the connection structure instance; + * @qsize - current number of requests in server's @msg_queue; * @timer - The keep-alive/retry timer for the connection; * @msg - message that is currently being processed; - * @msg_sent - message that was sent last in the connection; - * @msg_resent - message that was re-sent last in the connection; + * @msg_sent - message that was sent last in a server connection; + * @msg_resent - message that was re-sent last in a server connection; * @peer - TfwClient or TfwServer handler; * @sk - an appropriate sock handler; * @destructor - called when a connection is destroyed; @@ -99,6 +100,7 @@ typedef struct tfw_connection_t { spinlock_t msg_qlock; unsigned long flags; /*srv*/ atomic_t refcnt; + atomic_t qsize; /*srv*/ struct timer_list timer; TfwMsg *msg; TfwMsg *msg_sent; /*srv*/ @@ -106,7 +108,6 @@ typedef struct tfw_connection_t { TfwPeer *peer; struct sock *sk; void (*destructor)(void *); - void (*forward)(struct tfw_connection_t *); /*srv*/ } TfwConnection; #define TFW_CONN_DEATHCNT (INT_MIN / 2) diff --git a/tempesta_fw/http.c b/tempesta_fw/http.c index eeb76fda3..97c1b849e 100644 --- a/tempesta_fw/http.c +++ b/tempesta_fw/http.c @@ -458,45 +458,54 @@ tfw_http_conn_drained(TfwConnection *srv_conn) BUG_ON(!(TFW_CONN_TYPE(srv_conn) & Conn_Srv)); - if (list_empty(fwd_queue)) { - TFW_DBG2("%s: Empty: srv_conn=[%p]\n", __func__, srv_conn); + if (list_empty(fwd_queue)) return true; - } - if (!srv_conn->msg_sent) { - TFW_DBG2("%s: None sent: srv_conn=[%p]\n", __func__, srv_conn); + if (!srv_conn->msg_sent) return false; - } - if (srv_conn->msg_sent == list_last_entry(fwd_queue, TfwMsg, seq_list)) + if (srv_conn->msg_sent == list_last_entry(fwd_queue, TfwMsg, fwd_list)) return true; - - TFW_DBG2("%s: Some not sent: srv_conn=[%p]\n", __func__, srv_conn); return false; } static inline bool -tfw_http_conn_req_need_fwd(TfwConnection *srv_conn) +tfw_http_conn_need_fwd(TfwConnection *srv_conn) { return (!tfw_http_conn_on_hold(srv_conn) && !tfw_http_conn_drained(srv_conn)); } +static inline void +tfw_http_req_move2equeue(TfwConnection *srv_conn, TfwHttpReq *req, + struct list_head *equeue, unsigned short status) +{ + tfw_http_req_flip_if_nonidempotent(srv_conn, req); + list_move_tail(&req->msg.fwd_list, equeue); + req->rstatus = status; + atomic_dec(&srv_conn->qsize); +} + /* * Delete requests that were not forwarded due to an error. Send an * error response to a client. The response will be attached to the * request and sent to the client in proper seq order. */ static void -tfw_http_req_zap_error(struct list_head *err_queue) +tfw_http_req_zap_error(struct list_head *equeue) { TfwHttpReq *req, *tmp; - TFW_DBG2("%s: queue is %sempty\n", __func__, list_empty(err_queue) ? "" : "NOT "); + TFW_DBG2("%s: queue is %sempty\n", __func__, list_empty(equeue) ? 
"" : "NOT "); - list_for_each_entry_safe(req, tmp, err_queue, msg.fwd_list) { - list_del_init(&req->msg.fwd_list); - tfw_http_send_500(req); - } + list_for_each_entry_safe(req, tmp, equeue, msg.fwd_list) { + list_del_init(&req->msg.fwd_list); + if (req->rstatus == 500) + tfw_http_send_500(req); + else if (req->rstatus == 504) + tfw_http_send_504(req); + else + BUG(); + } } /* @@ -505,9 +514,10 @@ tfw_http_req_zap_error(struct list_head *err_queue) * Must be called with a lock on the server connection's @msg_queue. */ static void -__tfw_http_req_fwd_stalled(TfwConnection *srv_conn, struct list_head *err_queue) +__tfw_http_req_fwd_stalled(TfwConnection *srv_conn, struct list_head *equeue) { TfwHttpReq *req, *tmp; + TfwServer *srv = (TfwServer *)srv_conn->peer; struct list_head *fwd_queue = &srv_conn->msg_queue; TFW_DBG2("%s: conn=[%p]\n", __func__, srv_conn); @@ -523,6 +533,14 @@ __tfw_http_req_fwd_stalled(TfwConnection *srv_conn, struct list_head *err_queue) ? (TfwHttpReq *)list_next_entry(srv_conn->msg_sent, fwd_list) : (TfwHttpReq *)list_first_entry(fwd_queue, TfwMsg, fwd_list); list_for_each_entry_safe_from(req, tmp, fwd_queue, msg.fwd_list) { + unsigned long jtimeout = jiffies - req->jtstamp; + if (time_after(jtimeout, srv->qjtimeout)) { + TFW_DBG2("%s: Eviction: req=[%p] overdue=[%dms]\n", + __func__, req, + jiffies_to_msecs(jtimeout - srv->qjtimeout)); + tfw_http_req_move2equeue(srv_conn, req, equeue, 504); + continue; + } /* * If unable to send to the server connection due to * an error, then move the request to @err_queue for @@ -530,17 +548,18 @@ __tfw_http_req_fwd_stalled(TfwConnection *srv_conn, struct list_head *err_queue) * as the response will be sent in proper seq order. */ if (tfw_connection_send(srv_conn, (TfwMsg *)req)) { - tfw_http_req_flip_if_nonidempotent(srv_conn, req); - list_move_tail(&req->msg.fwd_list, err_queue); - TFW_DBG2("%s: Error sending to server connection: " - "conn=[%p] req=[%p]\n", + TFW_DBG2("%s: Forwarding error: conn=[%p] req=[%p]\n", __func__, srv_conn, req); + tfw_http_req_move2equeue(srv_conn, req, equeue, 500); continue; } srv_conn->msg_sent = (TfwMsg *)req; /* Stop sending if the request is non-idempotent. */ - if (tfw_http_req_is_nonidempotent(req)) + if (tfw_http_req_is_nonidempotent(req)) { + TFW_DBG2("%s: Break on non-idempotent: req=[%p]\n", + __func__, req); break; + } /* See if the request has become idempotent. 
*/ tfw_http_req_flip_if_nonidempotent(srv_conn, req); } @@ -556,17 +575,17 @@ __tfw_http_req_fwd_stalled(TfwConnection *srv_conn, struct list_head *err_queue) static void tfw_http_req_fwd_stalled(TfwConnection *srv_conn) { - LIST_HEAD(err_queue); + LIST_HEAD(equeue); TFW_DBG2("%s: conn=[%p]\n", __func__, srv_conn); WARN_ON(!spin_is_locked(&srv_conn->msg_qlock)); BUG_ON(list_empty(&srv_conn->msg_queue)); - __tfw_http_req_fwd_stalled(srv_conn, &err_queue); + __tfw_http_req_fwd_stalled(srv_conn, &equeue); spin_unlock(&srv_conn->msg_qlock); - if (!list_empty(&err_queue)) - tfw_http_req_zap_error(&err_queue); + if (!list_empty(&equeue)) + tfw_http_req_zap_error(&equeue); } /* @@ -594,6 +613,7 @@ tfw_http_req_fwd(TfwConnection *srv_conn, TfwHttpReq *req) spin_lock(&srv_conn->msg_qlock); drained = tfw_http_conn_drained(srv_conn); list_add_tail(&req->msg.fwd_list, &srv_conn->msg_queue); + atomic_inc(&srv_conn->qsize); if (tfw_http_req_is_nonidempotent(req)) __tfw_http_req_set_nonidempotent(srv_conn, req); if (tfw_http_conn_on_hold(srv_conn)) { @@ -612,9 +632,10 @@ tfw_http_req_fwd(TfwConnection *srv_conn, TfwHttpReq *req) if (tfw_connection_send(srv_conn, (TfwMsg *)req)) { list_del_init(&req->msg.fwd_list); tfw_http_req_flip_if_nonidempotent(srv_conn, req); + atomic_dec(&srv_conn->qsize); spin_unlock(&srv_conn->msg_qlock); - TFW_DBG2("%s: Error sending to server connection: " - "conn=[%p] req=[%p]\n", __func__, srv_conn, req); + TFW_DBG2("%s: Forwarding error: conn=[%p] req=[%p]\n", + __func__, srv_conn, req); tfw_http_send_500(req); return; } @@ -624,9 +645,10 @@ tfw_http_req_fwd(TfwConnection *srv_conn, TfwHttpReq *req) static void __tfw_http_req_fwd_resend(TfwConnection *srv_conn, - bool one_msg, struct list_head *err_queue) + bool one_msg, struct list_head *equeue) { TfwHttpReq *req, *tmp; + TfwServer *srv = (TfwServer *)srv_conn->peer; struct list_head *end, *fwd_queue = &srv_conn->msg_queue; TFW_DBG2("%s: conn=[%p] one_msg=[%s]\n", @@ -642,12 +664,16 @@ __tfw_http_req_fwd_resend(TfwConnection *srv_conn, &req->msg.fwd_list != end; req = tmp, tmp = list_next_entry(tmp, msg.fwd_list)) { + if (req->retries++ >= srv->retry_max) { + TFW_DBG2("%s: Eviction: req=[%p] retries=[%d]\n", + __func__, req, req->retries); + tfw_http_req_move2equeue(srv_conn, req, equeue, 504); + continue; + } if (tfw_connection_send(srv_conn, (TfwMsg *)req)) { - tfw_http_req_flip_if_nonidempotent(srv_conn, req); - list_move_tail(&req->msg.fwd_list, err_queue); - TFW_DBG2("%s: Error sending to server connection: " - "conn=[%p] req=[%p]\n", + TFW_DBG2("%s: Forwarding error: conn=[%p] req=[%p]\n", __func__, srv_conn, req); + tfw_http_req_move2equeue(srv_conn, req, equeue, 500); continue; } srv_conn->msg_resent = (TfwMsg *)req; @@ -656,44 +682,36 @@ __tfw_http_req_fwd_resend(TfwConnection *srv_conn, } } -static void -__tfw_http_req_fwd_qforwd(TfwConnection *srv_conn, struct list_head *err_queue) -{ - TFW_DBG2("%s: conn=[%p]\n", __func__, srv_conn); - - __tfw_http_req_fwd_stalled(srv_conn, err_queue); - if (list_empty(&srv_conn->msg_queue)) { - srv_conn->forward = tfw_http_req_fwd_stalled; - clear_bit(TFW_CONN_B_QFORWD, &srv_conn->flags); - clear_bit(TFW_CONN_B_RESEND, &srv_conn->flags); - } -} - static void tfw_http_req_fwd_repair(TfwConnection *srv_conn) { - LIST_HEAD(err_queue); + LIST_HEAD(equeue); TFW_DBG2("%s: conn=[%p]\n", __func__, srv_conn); WARN_ON(!spin_is_locked(&srv_conn->msg_qlock)); - BUG_ON(list_empty(&srv_conn->msg_queue)); + BUG_ON(!(srv_conn->flags & (TFW_CONN_B_QFORWD | TFW_CONN_B_RESEND))); - if 
(test_bit(TFW_CONN_B_QFORWD, &srv_conn->flags)) { - __tfw_http_req_fwd_qforwd(srv_conn, &err_queue); + if (list_empty(&srv_conn->msg_queue)) { + clear_bit(TFW_CONN_B_QFORWD, &srv_conn->flags); + clear_bit(TFW_CONN_B_RESEND, &srv_conn->flags); + } else if (test_bit(TFW_CONN_B_QFORWD, &srv_conn->flags)) { + if (tfw_http_conn_need_fwd(srv_conn)) + __tfw_http_req_fwd_stalled(srv_conn, &equeue); } else { srv_conn->msg_resent = NULL; if (srv_conn->msg_sent) { - __tfw_http_req_fwd_resend(srv_conn, false, &err_queue); + __tfw_http_req_fwd_resend(srv_conn, false, &equeue); if (srv_conn->msg_resent != srv_conn->msg_sent) srv_conn->msg_sent = srv_conn->msg_resent; } set_bit(TFW_CONN_B_QFORWD, &srv_conn->flags); - __tfw_http_req_fwd_qforwd(srv_conn, &err_queue); + if (tfw_http_conn_need_fwd(srv_conn)) + __tfw_http_req_fwd_stalled(srv_conn, &equeue); } spin_unlock(&srv_conn->msg_qlock); - if (!list_empty(&err_queue)) - tfw_http_req_zap_error(&err_queue); + if (!list_empty(&equeue)) + tfw_http_req_zap_error(&equeue); } /* @@ -773,9 +791,9 @@ tfw_http_conn_msg_free(TfwHttpMsg *hm) * can be configured to re-schedule those requests as well. * * FIXME: It appears that a re-scheduled request should be put in a - * new server connection's queue according to its original timestamp, - * and NOT just added at the end of the queue. That will matter when - * eviction of old requests is implemented. + * new server connection's queue according to its original timestamp. + * It may matter as old requests are evicted. However, that is time + * consuming. For now just put them at the end of the queue. */ static void tfw_http_req_fwd_resched(TfwConnection *srv_conn) @@ -788,6 +806,7 @@ tfw_http_req_fwd_resched(TfwConnection *srv_conn) list_for_each_entry_safe(req, tmp, fwd_queue, msg.fwd_list) { list_del_init(&req->msg.fwd_list); + atomic_dec(&srv_conn->qsize); /* FIXME: Need config option. */ if (tfw_http_req_is_nonidempotent(req)) { __tfw_http_req_set_idempotent(srv_conn, req); @@ -803,6 +822,7 @@ tfw_http_req_fwd_resched(TfwConnection *srv_conn) tfw_http_send_404(req); TFW_INC_STAT_BH(clnt.msgs_otherr); } + BUG_ON(atomic_read(&srv_conn->qsize)); } /* @@ -816,7 +836,7 @@ tfw_http_req_fwd_resched(TfwConnection *srv_conn) static void tfw_http_conn_repair(TfwConnection *srv_conn) { - LIST_HEAD(err_queue); + LIST_HEAD(equeue); TFW_DBG2("%s: conn=[%p]\n", __func__, srv_conn); BUG_ON(!(TFW_CONN_TYPE(srv_conn) & Conn_Srv)); @@ -827,23 +847,23 @@ tfw_http_conn_repair(TfwConnection *srv_conn) tfw_http_req_fwd_resched(srv_conn); return; } - /* Re-send the first unanswered request. 
*/ spin_lock(&srv_conn->msg_qlock); srv_conn->msg_resent = NULL; if (srv_conn->msg_sent) { - __tfw_http_req_fwd_resend(srv_conn, true, &err_queue); + __tfw_http_req_fwd_resend(srv_conn, true, &equeue); if (!srv_conn->msg_resent) srv_conn->msg_sent = NULL; } if (!srv_conn->msg_resent) { set_bit(TFW_CONN_B_QFORWD, &srv_conn->flags); - __tfw_http_req_fwd_qforwd(srv_conn, &err_queue); + if (tfw_http_conn_need_fwd(srv_conn)) + __tfw_http_req_fwd_stalled(srv_conn, &equeue); } spin_unlock(&srv_conn->msg_qlock); - if (!list_empty(&err_queue)) - tfw_http_req_zap_error(&err_queue); + if (!list_empty(&equeue)) + tfw_http_req_zap_error(&equeue); } /* @@ -858,12 +878,8 @@ tfw_http_conn_init(TfwConnection *conn) TFW_DBG2("%s: conn=[%p]\n", __func__, conn); if (TFW_CONN_TYPE(conn) & Conn_Srv) { - if (list_empty(&conn->msg_queue)) { - conn->forward = tfw_http_req_fwd_stalled; - } else { - conn->forward = tfw_http_req_fwd_repair; + if (!list_empty(&conn->msg_queue)) set_bit(TFW_CONN_B_RESEND, &conn->flags); - } } tfw_gfsm_state_init(&conn->state, conn, TFW_HTTP_FSM_INIT); return 0; @@ -1256,7 +1272,7 @@ tfw_http_resp_fwd(TfwHttpReq *req, TfwHttpResp *resp) spin_lock(&cli_conn->msg_qlock); if (list_empty(seq_queue)) { spin_unlock(&cli_conn->msg_qlock); - TFW_DBG2("%s: Missing client requests: conn=[%p]\n", + TFW_DBG2("%s: The client's request missing: conn=[%p]\n", __func__, cli_conn); ss_close_sync(cli_conn->sk, true); tfw_http_conn_msg_free((TfwHttpMsg *)resp); @@ -1292,8 +1308,7 @@ tfw_http_resp_fwd(TfwHttpReq *req, TfwHttpResp *resp) * Otherwise, the correct order of responses may be broken. */ if (tfw_cli_conn_send(cli_conn, (TfwMsg *)resp)) { - TFW_DBG2("%s: Error sending to client connection: " - "conn=[%p] resp=[%p]\n", + TFW_DBG2("%s: Forwarding error: conn=[%p] resp=[%p]\n", __func__, cli_conn, resp); ss_close_sync(cli_conn->sk, true); } @@ -1373,8 +1388,7 @@ tfw_http_req_cache_cb(TfwHttpReq *req, TfwHttpResp *resp) * initialized to point at the appropriate TfwConnection, so that * all subsequent session hits are scheduled much faster. */ - srv_conn = tfw_sched_get_srv_conn((TfwMsg *)req); - if (srv_conn == NULL) { + if (!(srv_conn = tfw_sched_get_srv_conn((TfwMsg *)req))) { TFW_WARN("Unable to find a backend server\n"); goto send_502; } @@ -1683,11 +1697,21 @@ tfw_http_resp_cache_cb(TfwHttpReq *req, TfwHttpResp *resp) TFW_INC_STAT_BH(serv.msgs_otherr); return; } - tfw_http_resp_fwd(req, resp); - /* Responses from cache don't have @resp->conn. */ - if (resp->conn) + /* + * Responses from cache don't have @resp->conn. + * + * FIXME: The same check is performed in tfw_http_popreq() + * which happens just a bit earlier. Is there a way to avoid + * it here? The condition is considered rare, and there's no + * need to check for it in the regular path. The real issue + * here is that APM stats can't handle response times that + * are >= USHORT_MAX. So for now don't count the requests + * that are re-sent after a server connection is restored. + */ + if (resp->conn && !tfw_connection_restricted(resp->conn)) tfw_apm_update(((TfwServer *)resp->conn->peer)->apm, resp->jtstamp, resp->jtstamp - req->jtstamp); + tfw_http_resp_fwd(req, resp); TFW_INC_STAT_BH(serv.msgs_forwarded); return; } @@ -1709,6 +1733,7 @@ tfw_http_popreq(TfwHttpMsg *hmresp) spin_lock(&srv_conn->msg_qlock); if (unlikely(list_empty(fwd_queue))) { + BUG_ON(atomic_read(&srv_conn->qsize)); spin_unlock(&srv_conn->msg_qlock); /* @conn->msg will get NULLed in the process. 
*/ TFW_WARN("Paired request missing\n"); @@ -1719,17 +1744,21 @@ tfw_http_popreq(TfwHttpMsg *hmresp) } req = list_first_entry(fwd_queue, TfwHttpReq, msg.fwd_list); list_del_init(&req->msg.fwd_list); + atomic_dec(&srv_conn->qsize); if ((TfwMsg *)req == srv_conn->msg_sent) srv_conn->msg_sent = NULL; tfw_http_req_flip_if_nonidempotent(srv_conn, req); tfw_http_conn_flip_if_nonidempotent(srv_conn); /* - * If the server connection is no longer on hold, and the queue - * is not drained, then forward pending requests to the server. - * Note: The queue is unlocked inside srv_conn->forward(). + * Perform special processing if the connection is in repair + * mode. Otherwise, forward pending requests to the server. + * Note: The queue is unlocked inside tfw_http_req_fwd_repair() + * or tfw_http_req_fwd_stalled(). */ - if (tfw_http_conn_req_need_fwd(srv_conn)) - srv_conn->forward(srv_conn); + if (tfw_connection_restricted(srv_conn)) + tfw_http_req_fwd_repair(srv_conn); + else if (tfw_http_conn_need_fwd(srv_conn)) + tfw_http_req_fwd_stalled(srv_conn); else spin_unlock(&srv_conn->msg_qlock); diff --git a/tempesta_fw/http.h b/tempesta_fw/http.h index 74d9f37e1..40c7a825e 100644 --- a/tempesta_fw/http.h +++ b/tempesta_fw/http.h @@ -345,9 +345,13 @@ typedef struct { * @method - HTTP request method, one of GET/PORT/HEAD/etc; * @node - NUMA node where request is serviced; * @frang_st - current state of FRANG classifier; + * @chunk_cnt - header or body chunk count for Frang classifier; * @tm_header - time HTTP header started coming; * @tm_bchunk - time previous chunk of HTTP body had come at; * @hash - hash value for caching calculated for the request; + * @resp - the response paired with this request; + * @rstatus - response HTTP status until the response is prepared; + * @retries - the number of re-send attempts; * * TfwStr members must be the first for efficient scanning. 
*/ @@ -367,7 +371,11 @@ typedef struct { unsigned long tm_header; unsigned long tm_bchunk; unsigned long hash; - TfwHttpMsg *resp; + union { + TfwHttpMsg *resp; + unsigned short rstatus; + unsigned short retries; + }; } TfwHttpReq; #define TFW_HTTP_REQ_STR_START(r) __MSG_STR_START(r) diff --git a/tempesta_fw/sched/tfw_sched_hash.c b/tempesta_fw/sched/tfw_sched_hash.c index 913c7907b..64b6abdd7 100644 --- a/tempesta_fw/sched/tfw_sched_hash.c +++ b/tempesta_fw/sched/tfw_sched_hash.c @@ -146,6 +146,7 @@ tfw_sched_hash_get_srv_conn(TfwMsg *msg, TfwSrvGroup *sg) for (tries = 0; tries < __HLIST_SZ(TFW_SG_MAX_CONN); ++tries) { for (ch = sg->sched_data; ch->conn; ++ch) { if (unlikely(tfw_connection_restricted(ch->conn)) + || unlikely(tfw_server_queue_full(ch->conn)) || unlikely(!tfw_connection_live(ch->conn))) continue; curr_weight = msg_hash ^ ch->hash; diff --git a/tempesta_fw/sched/tfw_sched_rr.c b/tempesta_fw/sched/tfw_sched_rr.c index dd675cd04..4413e6041 100644 --- a/tempesta_fw/sched/tfw_sched_rr.c +++ b/tempesta_fw/sched/tfw_sched_rr.c @@ -128,7 +128,8 @@ tfw_sched_rr_get_srv_conn(TfwMsg *msg, TfwSrvGroup *sg) for (c = 0; c < srv_cl->conn_n; ++c) { idx = atomic64_inc_return(&srv_cl->rr_counter); conn = srv_cl->conns[idx % srv_cl->conn_n]; - if (unlikely(tfw_connection_restricted(conn))) + if (unlikely(tfw_connection_restricted(conn)) + || unlikely(tfw_server_queue_full(conn))) continue; if (skipnip && tfw_connection_hasnip(conn)) { if (likely(tfw_connection_live(conn))) diff --git a/tempesta_fw/server.h b/tempesta_fw/server.h index f4eafe9ec..e4fd72bdf 100644 --- a/tempesta_fw/server.h +++ b/tempesta_fw/server.h @@ -43,14 +43,19 @@ typedef struct tfw_scheduler_t TfwScheduler; * @list - member pointer in the list of servers of a server group; * @sg - back-reference to the server group; * @apm - opaque handle for APM stats; + * @qsize_max - maximum queue size of a server connection; + * @qjtimeout - maximum age of a request in a server connection, in jiffies; + * @retry_max - maximum number of tries for forwarding a request; */ typedef struct { TFW_PEER_COMMON; struct list_head list; TfwSrvGroup *sg; void *apm; - unsigned int flags; int stress; + unsigned int qsize_max; + unsigned long qjtimeout; + unsigned int retry_max; } TfwServer; /** @@ -118,6 +123,13 @@ void tfw_server_destroy(TfwServer *srv); void tfw_srv_conn_release(TfwConnection *conn); +static inline bool +tfw_server_queue_full(TfwConnection *srv_conn) +{ + TfwServer *srv = (TfwServer *)srv_conn->peer; + return atomic_read(&srv_conn->qsize) >= srv->qsize_max; +} + /* Server group routines. */ TfwSrvGroup *tfw_sg_lookup(const char *name); TfwSrvGroup *tfw_sg_new(const char *name, gfp_t flags); diff --git a/tempesta_fw/sock_srv.c b/tempesta_fw/sock_srv.c index 65b431cd9..8cc938486 100644 --- a/tempesta_fw/sock_srv.c +++ b/tempesta_fw/sock_srv.c @@ -58,7 +58,7 @@ /* * Default number of reconnect attempts. Zero means unlimited number. */ -#define TFW_SOCK_SRV_RETRY_ATTEMPTS_DEF 0 /* default value */ +#define TFW_SRV_RETRY_ATTEMPTS_DEF 0 /* default value */ /** * TfwConnection extension for server sockets. 
@@ -495,13 +495,12 @@ tfw_srv_conn_alloc(void) { TfwSrvConnection *srv_conn; - srv_conn = kmem_cache_alloc(tfw_srv_conn_cache, GFP_ATOMIC); - if (!srv_conn) + if (!(srv_conn = kmem_cache_alloc(tfw_srv_conn_cache, GFP_ATOMIC))) return NULL; tfw_connection_init(&srv_conn->conn); - atomic_set(&srv_conn->conn.msg_qsize, 0); INIT_LIST_HEAD(&srv_conn->conn.nip_queue); + atomic_set(&srv_conn->conn.qsize, 0); __setup_retry_timer(srv_conn); ss_proto_init(&srv_conn->conn.proto, &tfw_sock_srv_ss_hooks, Conn_HttpSrv); @@ -516,8 +515,8 @@ tfw_srv_conn_free(TfwSrvConnection *srv_conn) /* Check that all nested resources are freed. */ tfw_connection_validate_cleanup(&srv_conn->conn); - BUG_ON(atomic_read(&srv_conn->conn.msg_qsize)); BUG_ON(!list_empty(&srv_conn->conn.nip_queue)); + BUG_ON(atomic_read(&srv_conn->conn.qsize)); kmem_cache_free(tfw_srv_conn_cache, srv_conn); } @@ -562,59 +561,112 @@ tfw_sock_srv_delete_all_conns(void) * ------------------------------------------------------------------------ */ -#define TFW_SRV_CFG_DEF_CONNS_N "32" +/* Default number of connections per server. */ +#define TFW_SRV_CONNS_N_DEF "32" -static int tfw_srv_cfg_in_attempts = TFW_SOCK_SRV_RETRY_ATTEMPTS_DEF; -static int tfw_srv_cfg_out_attempts = TFW_SOCK_SRV_RETRY_ATTEMPTS_DEF; - -static int -tfw_srv_cfg_set_conn_retries(TfwServer *srv, int attempts) -{ - TfwSrvConnection *srv_conn, *tmp; +/* + * Server connection's maximum queue size, and default timeout for + * requests in the queue. + */ +#define TFW_SRV_QUEUE_SIZE_DEF 1000 /* Max queue size */ +#define TFW_SRV_QUEUE_TIMEOUT_DEF 60 /* Default request timeout */ +#define TFW_SRV_QUEUE_TRIES_DEF 5 /* Default number of tries */ - list_for_each_entry_safe(srv_conn, tmp, &srv->conn_list, conn.list) - srv_conn->max_attempts = attempts; +static int tfw_cfg_in_queue_size = TFW_SRV_QUEUE_SIZE_DEF; +static int tfw_cfg_in_queue_timeout = TFW_SRV_QUEUE_TIMEOUT_DEF; +static int tfw_cfg_in_queue_tries = TFW_SRV_QUEUE_TRIES_DEF; +static int tfw_cfg_out_queue_size = TFW_SRV_QUEUE_SIZE_DEF; +static int tfw_cfg_out_queue_timeout = TFW_SRV_QUEUE_TIMEOUT_DEF; +static int tfw_cfg_out_queue_tries = TFW_SRV_QUEUE_TRIES_DEF; - return 0; -} +static int tfw_cfg_in_retry_attempts = TFW_SRV_RETRY_ATTEMPTS_DEF; +static int tfw_cfg_out_retry_attempts = TFW_SRV_RETRY_ATTEMPTS_DEF; static int -tfw_srv_cfg_handle_conn_retries(TfwCfgSpec *cs, TfwCfgEntry *ce, int *attempts) +tfw_handle_opt_val(TfwCfgSpec *cs, TfwCfgEntry *ce, int *optval) { int ret; + if (ce->attr_n) { + TFW_ERR("%s: Arguments may not have the \'=\' sign\n", + cs->name); + return -EINVAL; + } if (ce->val_n != 1) { - TFW_ERR("%s: Invalid number of arguments: %zd\n", - cs->name, ce->val_n); + TFW_ERR("%s: Invalid number of arguments: %d\n", + cs->name, (int)ce->val_n); return -EINVAL; } - - if ((ret = tfw_cfg_parse_int(ce->vals[0], attempts))) + if ((ret = tfw_cfg_parse_int(ce->vals[0], optval))) return ret; return 0; } static int -tfw_srv_cfg_handle_in_conn_retries(TfwCfgSpec *cs, TfwCfgEntry *ce) +tfw_handle_in_queue_size(TfwCfgSpec *cs, TfwCfgEntry *ce) +{ + return tfw_handle_opt_val(cs, ce, &tfw_cfg_in_queue_size); +} + +static int +tfw_handle_out_queue_size(TfwCfgSpec *cs, TfwCfgEntry *ce) +{ + return tfw_handle_opt_val(cs, ce, &tfw_cfg_out_queue_size); +} + +static int +tfw_handle_in_queue_timeout(TfwCfgSpec *cs, TfwCfgEntry *ce) { - return tfw_srv_cfg_handle_conn_retries(cs, ce, - &tfw_srv_cfg_in_attempts); + return tfw_handle_opt_val(cs, ce, &tfw_cfg_in_queue_timeout); } static int 
-tfw_srv_cfg_handle_out_conn_retries(TfwCfgSpec *cs, TfwCfgEntry *ce) +tfw_handle_out_queue_timeout(TfwCfgSpec *cs, TfwCfgEntry *ce) +{ + return tfw_handle_opt_val(cs, ce, &tfw_cfg_out_queue_timeout); +} + +static int +tfw_handle_in_queue_tries(TfwCfgSpec *cs, TfwCfgEntry *ce) +{ + return tfw_handle_opt_val(cs, ce, &tfw_cfg_in_queue_tries); +} + +static int +tfw_handle_out_queue_tries(TfwCfgSpec *cs, TfwCfgEntry *ce) +{ + return tfw_handle_opt_val(cs, ce, &tfw_cfg_out_queue_tries); +} + +static int +tfw_handle_in_conn_tries(TfwCfgSpec *cs, TfwCfgEntry *ce) +{ + return tfw_handle_opt_val(cs, ce, &tfw_cfg_in_retry_attempts); +} + +static int +tfw_handle_out_conn_tries(TfwCfgSpec *cs, TfwCfgEntry *ce) +{ + return tfw_handle_opt_val(cs, ce, &tfw_cfg_out_retry_attempts); +} + +static int +tfw_cfg_set_conn_tries(TfwServer *srv, int attempts) { - return tfw_srv_cfg_handle_conn_retries(cs, ce, - &tfw_srv_cfg_out_attempts); + TfwSrvConnection *srv_conn; + + list_for_each_entry(srv_conn, &srv->conn_list, conn.list) + srv_conn->max_attempts = attempts; + + return 0; } /** * A "srv_group" which is currently being parsed. * All "server" entries are added to this group. */ -static TfwSrvGroup *tfw_srv_cfg_curr_group; -static TfwScheduler *tfw_srv_cfg_dflt_sched; +static TfwSrvGroup *tfw_cfg_curr_group; +static TfwScheduler *tfw_cfg_dflt_sched; /** * Handle "server" within an "srv_group", e.g.: @@ -627,20 +679,20 @@ static TfwScheduler *tfw_srv_cfg_dflt_sched; * Every server is simply added to the tfw_srv_cfg_curr_group. */ static TfwServer * -tfw_srv_cfg_handle_server(TfwCfgSpec *cs, TfwCfgEntry *ce) +tfw_handle_server(TfwCfgSpec *cs, TfwCfgEntry *ce) { TfwAddr addr; TfwServer *srv; int r, conns_n; const char *in_addr, *in_conns_n; - BUG_ON(!tfw_srv_cfg_curr_group); + BUG_ON(!tfw_cfg_curr_group); if ((r = tfw_cfg_check_val_n(ce, 1))) return NULL; in_addr = ce->vals[0]; - in_conns_n = tfw_cfg_get_attr(ce, "conns_n", TFW_SRV_CFG_DEF_CONNS_N); + in_conns_n = tfw_cfg_get_attr(ce, "conns_n", TFW_SRV_CONNS_N_DEF); if ((r = tfw_addr_pton(&TFW_STR_FROM(in_addr), &addr))) return NULL; @@ -656,7 +708,7 @@ tfw_srv_cfg_handle_server(TfwCfgSpec *cs, TfwCfgEntry *ce) TFW_ERR("can't create a server socket\n"); return NULL; } - tfw_sg_add(tfw_srv_cfg_curr_group, srv); + tfw_sg_add(tfw_cfg_curr_group, srv); if ((r = tfw_sock_srv_add_conns(srv, conns_n))) { TFW_ERR("can't add connections to the server\n"); @@ -666,20 +718,20 @@ tfw_srv_cfg_handle_server(TfwCfgSpec *cs, TfwCfgEntry *ce) return srv; } -static TfwServer *tfw_srv_cfg_in_lst[TFW_SG_MAX_SRV]; -static int tfw_srv_cfg_in_lstsz = 0; -static int tfw_srv_cfg_out_lstsz = 0; +static TfwServer *tfw_cfg_in_lst[TFW_SG_MAX_SRV]; +static int tfw_cfg_in_lstsz = 0; +static int tfw_cfg_out_lstsz = 0; static int -tfw_srv_cfg_handle_in_server(TfwCfgSpec *cs, TfwCfgEntry *ce) +tfw_handle_in_server(TfwCfgSpec *cs, TfwCfgEntry *ce) { TfwServer *srv; - if (tfw_srv_cfg_in_lstsz >= TFW_SG_MAX_SRV) + if (tfw_cfg_in_lstsz >= TFW_SG_MAX_SRV) return -EINVAL; - if (!(srv = tfw_srv_cfg_handle_server(cs, ce))) + if (!(srv = tfw_handle_server(cs, ce))) return -EINVAL; - tfw_srv_cfg_in_lst[tfw_srv_cfg_in_lstsz++] = srv; + tfw_cfg_in_lst[tfw_cfg_in_lstsz++] = srv; return 0; } @@ -704,7 +756,7 @@ tfw_srv_cfg_handle_in_server(TfwCfgSpec *cs, TfwCfgEntry *ce) * } */ static int -tfw_srv_cfg_handle_out_server(TfwCfgSpec *cs, TfwCfgEntry *ce) +tfw_handle_out_server(TfwCfgSpec *cs, TfwCfgEntry *ce) { int ret; TfwServer *srv;
static const char __read_mostly s_default[] = "default"; TfwSrvGroup *sg = tfw_sg_lookup(s_default); - if (tfw_srv_cfg_out_lstsz >= TFW_SG_MAX_SRV) + if (tfw_cfg_out_lstsz >= TFW_SG_MAX_SRV) return -EINVAL; /* The group "default" is created implicitly. */ if (sg == NULL) { @@ -720,8 +772,8 @@ tfw_srv_cfg_handle_out_server(TfwCfgSpec *cs, TfwCfgEntry *ce) TFW_ERR("Unable to add server group '%s'\n", s_default); return -EINVAL; } - dflt_sched_name = tfw_srv_cfg_dflt_sched - ? tfw_srv_cfg_dflt_sched->name + dflt_sched_name = tfw_cfg_dflt_sched + ? tfw_cfg_dflt_sched->name : "round-robin"; if ((ret = tfw_sg_set_sched(sg, dflt_sched_name)) != 0) { TFW_ERR("Unable to set scheduler '%s' " @@ -730,12 +782,15 @@ tfw_srv_cfg_handle_out_server(TfwCfgSpec *cs, TfwCfgEntry *ce) return ret; } } - tfw_srv_cfg_curr_group = sg; + tfw_cfg_curr_group = sg; - if (!(srv = tfw_srv_cfg_handle_server(cs, ce))) + if (!(srv = tfw_handle_server(cs, ce))) return -EINVAL; - tfw_srv_cfg_set_conn_retries(srv, tfw_srv_cfg_out_attempts); + tfw_cfg_set_conn_tries(srv, tfw_cfg_out_retry_attempts); + srv->qsize_max = tfw_cfg_out_queue_size; + srv->qjtimeout = msecs_to_jiffies(tfw_cfg_out_queue_timeout * 1000); + srv->retry_max = tfw_cfg_out_queue_tries; return 0; } @@ -753,39 +808,39 @@ tfw_srv_cfg_handle_out_server(TfwCfgSpec *cs, TfwCfgEntry *ce) * new TfwSrvGroup object and sets the context for parsing nested "server"s. */ static int -tfw_srv_cfg_begin_srv_group(TfwCfgSpec *cs, TfwCfgEntry *ce) +tfw_begin_srv_group(TfwCfgSpec *cs, TfwCfgEntry *ce) { int r; TfwSrvGroup *sg; const char *sg_name, *sched_name, *dflt_sched_name; - r = tfw_cfg_check_val_n(ce, 1); - if (r) + if ((r = tfw_cfg_check_val_n(ce, 1))) return r; sg_name = ce->vals[0]; - dflt_sched_name = tfw_srv_cfg_dflt_sched - ? tfw_srv_cfg_dflt_sched->name : "round-robin"; + dflt_sched_name = tfw_cfg_dflt_sched + ? tfw_cfg_dflt_sched->name : "round-robin"; sched_name = tfw_cfg_get_attr(ce, "sched", dflt_sched_name); TFW_DBG("begin srv_group: %s\n", sg_name); - sg = tfw_sg_new(sg_name, GFP_KERNEL); - if (!sg) { + if (!(sg = tfw_sg_new(sg_name, GFP_KERNEL))) { TFW_ERR("Unable to add server group '%s'\n", sg_name); return -EINVAL; } - r = tfw_sg_set_sched(sg, sched_name); - if (r) { + if ((r = tfw_sg_set_sched(sg, sched_name))) { TFW_ERR("Unable to set scheduler '%s' " "for server group '%s'\n", sched_name, sg_name); return r; } /* Set the current group. All nested "server"s are added to it. */ - tfw_srv_cfg_curr_group = sg; + tfw_cfg_curr_group = sg; - tfw_srv_cfg_in_lstsz = 0; - tfw_srv_cfg_in_attempts = tfw_srv_cfg_out_attempts; + tfw_cfg_in_lstsz = 0; + tfw_cfg_in_retry_attempts = tfw_cfg_out_retry_attempts; + tfw_cfg_in_queue_size = tfw_cfg_out_queue_size; + tfw_cfg_in_queue_timeout = tfw_cfg_out_queue_timeout; + tfw_cfg_in_queue_tries = tfw_cfg_out_queue_tries; return 0; } @@ -801,30 +856,34 @@ tfw_srv_cfg_begin_srv_group(TfwCfgSpec *cs, TfwCfgEntry *ce) * } <--- The position at the moment of call. 
*/ static int -tfw_srv_cfg_finish_srv_group(TfwCfgSpec *cs) +tfw_finish_srv_group(TfwCfgSpec *cs) { int i; - BUG_ON(!tfw_srv_cfg_curr_group); - BUG_ON(list_empty(&tfw_srv_cfg_curr_group->srv_list)); - TFW_DBG("finish srv_group: %s\n", tfw_srv_cfg_curr_group->name); - - for (i = 0; i < tfw_srv_cfg_in_lstsz; ++i) - tfw_srv_cfg_set_conn_retries(tfw_srv_cfg_in_lst[i], - tfw_srv_cfg_in_attempts); - tfw_srv_cfg_curr_group = NULL; + BUG_ON(!tfw_cfg_curr_group); + BUG_ON(list_empty(&tfw_cfg_curr_group->srv_list)); + TFW_DBG("finish srv_group: %s\n", tfw_cfg_curr_group->name); + + for (i = 0; i < tfw_cfg_in_lstsz; ++i) { + TfwServer *srv = tfw_cfg_in_lst[i]; + + tfw_cfg_set_conn_tries(srv, tfw_cfg_in_retry_attempts); + srv->qsize_max = tfw_cfg_in_queue_size; + srv->qjtimeout = + msecs_to_jiffies(tfw_cfg_in_queue_timeout * 1000); + srv->retry_max = tfw_cfg_in_queue_tries; + } + tfw_cfg_curr_group = NULL; return 0; } static int -tfw_srv_cfg_handle_sched(TfwCfgSpec *cs, TfwCfgEntry *ce) +tfw_handle_sched(TfwCfgSpec *cs, TfwCfgEntry *ce) { if (tfw_cfg_check_val_n(ce, 1)) return -EINVAL; - tfw_srv_cfg_dflt_sched = tfw_sched_lookup(ce->vals[0]); - if (tfw_srv_cfg_dflt_sched == NULL) { + if (!(tfw_cfg_dflt_sched = tfw_sched_lookup(ce->vals[0]))) { TFW_ERR("Unrecognized scheduler: '%s'\n", ce->vals[0]); return -EINVAL; } @@ -836,29 +895,53 @@ tfw_srv_cfg_handle_sched(TfwCfgSpec *cs, TfwCfgEntry *ce) * Clean everything produced during parsing "server" and "srv_group" entries. */ static void -tfw_srv_cfg_clean_srv_groups(TfwCfgSpec *cs) +tfw_clean_srv_groups(TfwCfgSpec *cs) { tfw_sock_srv_delete_all_conns(); tfw_sg_release_all(); - tfw_srv_cfg_curr_group = NULL; + tfw_cfg_curr_group = NULL; } -static TfwCfgSpec tfw_sock_srv_cfg_srv_group_specs[] = { +static TfwCfgSpec tfw_srv_group_specs[] = { { "server", NULL, - tfw_srv_cfg_handle_in_server, + tfw_handle_in_server, .allow_repeat = true, - .cleanup = tfw_srv_cfg_clean_srv_groups + .cleanup = tfw_clean_srv_groups + }, + { + "server_queue_size", + NULL, + tfw_handle_in_queue_size, + .allow_none = true, + .allow_repeat = false, + .cleanup = tfw_clean_srv_groups, + }, + { + "server_queue_timeout", + NULL, + tfw_handle_in_queue_timeout, + .allow_none = true, + .allow_repeat = false, + .cleanup = tfw_clean_srv_groups, }, { - "connect_retries", + "server_queue_tries", NULL, - tfw_srv_cfg_handle_in_conn_retries, + tfw_handle_in_queue_tries, .allow_none = true, .allow_repeat = false, - .cleanup = tfw_srv_cfg_clean_srv_groups, + .cleanup = tfw_clean_srv_groups, }, - {} + { + "connect_tries", + NULL, + tfw_handle_in_conn_tries, + .allow_none = true, + .allow_repeat = false, + .cleanup = tfw_clean_srv_groups, + }, + { 0 } }; TfwCfgMod tfw_sock_srv_cfg_mod = { @@ -869,40 +952,64 @@ TfwCfgMod tfw_sock_srv_cfg_mod = { { "server", NULL, - tfw_srv_cfg_handle_out_server, + tfw_handle_out_server, + .allow_none = true, + .allow_repeat = true, + .cleanup = tfw_clean_srv_groups, + }, + { + "server_queue_size", + NULL, + tfw_handle_out_queue_size, + .allow_none = true, + .allow_repeat = true, + .cleanup = tfw_clean_srv_groups, + }, + { + "server_queue_timeout", + NULL, + tfw_handle_out_queue_timeout, + .allow_none = true, + .allow_repeat = true, + .cleanup = tfw_clean_srv_groups, + }, + { + "server_queue_tries", + NULL, + tfw_handle_out_queue_tries, .allow_none = true, .allow_repeat = true, - .cleanup = tfw_srv_cfg_clean_srv_groups, + .cleanup = tfw_clean_srv_groups, }, { - "connect_retries", + "connect_tries", NULL, - tfw_srv_cfg_handle_out_conn_retries, + tfw_handle_out_conn_tries, .allow_none = true, 
.allow_repeat = true, - .cleanup = tfw_srv_cfg_clean_srv_groups, + .cleanup = tfw_clean_srv_groups, }, { "sched", NULL, - tfw_srv_cfg_handle_sched, + tfw_handle_sched, .allow_none = true, .allow_repeat = true, - .cleanup = tfw_srv_cfg_clean_srv_groups, + .cleanup = tfw_clean_srv_groups, }, { "srv_group", NULL, tfw_cfg_handle_children, - tfw_sock_srv_cfg_srv_group_specs, + tfw_srv_group_specs, &(TfwCfgSpecChild ) { - .begin_hook = tfw_srv_cfg_begin_srv_group, - .finish_hook = tfw_srv_cfg_finish_srv_group + .begin_hook = tfw_begin_srv_group, + .finish_hook = tfw_finish_srv_group }, .allow_none = true, .allow_repeat = true, }, - {} + { 0 } } };