Skip to content

Commit

Permalink
Re-sending of requests within a restored server connection. (#419)
Browse files Browse the repository at this point in the history
This commit implements the logic of re-sending requests that were
in the server connection's queue when the connection failed. When
the connection is restored, it's not scheduled until all requests
in the forwarding queue are re-sent or sent to the server.
  • Loading branch information
keshonok committed Feb 13, 2017
1 parent af18f8e commit c4c79ea
Show file tree
Hide file tree
Showing 6 changed files with 391 additions and 212 deletions.
9 changes: 9 additions & 0 deletions tempesta_fw/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ tfw_connection_new(TfwConnection *conn)
return TFW_CONN_HOOK_CALL(conn, conn_init);
}

/**
* Call connection repairing via TfwConnHooks.
*/
void
tfw_connection_repair(TfwConnection *conn)
{
TFW_CONN_HOOK_CALL(conn, conn_repair);
}

/**
* Publish the "connection is dropped" event via TfwConnHooks.
*/
Expand Down
61 changes: 53 additions & 8 deletions tempesta_fw/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,36 +79,51 @@ enum {
* @msg_queue - queue of messages to be sent over the connection;
* @nip_queue - queue of non-idempotent messages within @msg_queue;
* @msg_qlock - lock for accessing @msg_queue;
* @flags - various atomic flags related to connection's state;
* @refcnt - number of users of the connection structure instance;
* @nipcnt - number of non-idempotent requests in the connection;
* @timer - The keep-alive/retry timer for the connection;
* @msg - message that is currently being processed;
* @msg_sent - message that was sent last in the connection;
* @msg_resent - message that was re-sent last in the connection;
* @peer - TfwClient or TfwServer handler;
* @sk - an appropriate sock handler;
* @destructor - called when a connection is destroyed;
* @forward - called when a request is forwarded to server;
*/
typedef struct {
typedef struct tfw_connection_t {
SsProto proto;
TfwGState state;
struct list_head list;
struct list_head msg_queue;
struct list_head nip_queue;
struct list_head nip_queue; /*srv*/
spinlock_t msg_qlock;
unsigned long flags; /*srv*/
atomic_t refcnt;
atomic_t nipcnt;
struct timer_list timer;
TfwMsg *msg;
TfwMsg *msg_sent;
TfwMsg *msg_sent; /*srv*/
TfwMsg *msg_resent; /*srv*/
TfwPeer *peer;
struct sock *sk;
void (*destructor)(void *);
void (*forward)(struct tfw_connection_t *); /*srv*/
} TfwConnection;

#define TFW_CONN_DEATHCNT (INT_MIN / 2)

#define TFW_CONN_TYPE(c) ((c)->proto.type)

/* Connection flags are defined by the bit number. */
enum {
TFW_CONN_B_RESEND = 0, /* Need to re-send requests. */
TFW_CONN_B_QFORWD, /* Need to forward requests in the queue. */
TFW_CONN_B_HASNIP, /* Has non-idempotent requests. */
};

#define TFW_CONN_F_RESEND (1 << TFW_CONN_B_RESEND)
#define TFW_CONN_F_QFORWD (1 << TFW_CONN_B_QFORWD)
#define TFW_CONN_F_HASNIP (1 << TFW_CONN_B_HASNIP)

/**
* TLS hardened connection.
*/
Expand All @@ -129,6 +144,14 @@ typedef struct {
*/
int (*conn_init)(TfwConnection *conn);

/*
* Called when a new connection is initialized and before
* the initialization is complete. Makes sense only for
* server connections. Used to re-send requests that were
* left in the connection queue.
*/
void (*conn_repair)(TfwConnection *conn);

/*
* Called when closing a connection (client or server,
* as in conn_init()). This is required for modules that
Expand Down Expand Up @@ -159,6 +182,25 @@ extern TfwConnHooks *conn_hooks[TFW_CONN_MAX_PROTOS];
#define TFW_CONN_HOOK_CALL(c, f...) \
tfw_conn_hook_call(TFW_CONN_TYPE2IDX(TFW_CONN_TYPE(c)), c, f)

/*
* Tell if a connection is restricted. When restricted, a connection
* cannot be scheduled.
*/
static inline bool
tfw_connection_restricted(TfwConnection *conn)
{
return test_bit(TFW_CONN_B_RESEND, &conn->flags);
}

/*
* Tell if a connection has non-idempotent requests.
*/
static inline bool
tfw_connection_hasnip(TfwConnection *conn)
{
return test_bit(TFW_CONN_B_HASNIP, &conn->flags);
}

static inline bool
tfw_connection_live(TfwConnection *conn)
{
Expand All @@ -172,8 +214,8 @@ tfw_connection_get(TfwConnection *conn)
}

/**
* Increment reference counter and return true if @conn isn't in failovering
* process, i.e. @refcnt > 0.
* Increment reference counter and return true if @conn isi not in
* failovering process, i.e. @refcnt wasn't less or equal to zero.
*/
static inline bool
tfw_connection_get_if_live(TfwConnection *conn)
Expand Down Expand Up @@ -295,7 +337,9 @@ tfw_connection_validate_cleanup(TfwConnection *conn)

BUG_ON(!conn);
BUG_ON(!list_empty(&conn->list));
BUG_ON(!list_empty(&conn->msg_queue));
BUG_ON((TFW_CONN_TYPE(conn) & Conn_Clnt)
&& !list_empty(&conn->msg_queue));
BUG_ON(atomic_read(&conn->refcnt) & ~1);
BUG_ON(conn->msg);

rc = atomic_read(&conn->refcnt);
Expand All @@ -311,6 +355,7 @@ void tfw_connection_init(TfwConnection *conn);
void tfw_connection_link_peer(TfwConnection *conn, TfwPeer *peer);

int tfw_connection_new(TfwConnection *conn);
void tfw_connection_repair(TfwConnection *conn);
void tfw_connection_drop(TfwConnection *conn);
void tfw_connection_release(TfwConnection *conn);

Expand Down
Loading

0 comments on commit c4c79ea

Please sign in to comment.