Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send MEET packet to node if there is no inbound link to fix inconsistency when handshake timedout #1307

Merged
merged 9 commits into from
Dec 12, 2024
94 changes: 65 additions & 29 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -1335,6 +1335,10 @@ clusterLink *createClusterLink(clusterNode *node) {
* with this link will have the 'link' field set to NULL. */
void freeClusterLink(clusterLink *link) {
serverAssert(link != NULL);
serverLog(LL_DEBUG, "Freeing cluster link for node: %.40s:%s",
link->node ? link->node->name : "<unknown>",
link->inbound ? "inbound" : "outbound");

if (link->conn) {
connClose(link->conn);
link->conn = NULL;
Expand All @@ -1350,6 +1354,7 @@ void freeClusterLink(clusterLink *link) {
} else if (link->node->inbound_link == link) {
serverAssert(link->inbound);
link->node->inbound_link = NULL;
link->node->inbound_link_freed_time = mstime();
}
}
zfree(link);
Expand Down Expand Up @@ -1489,6 +1494,7 @@ clusterNode *createClusterNode(char *nodename, int flags) {
node->fail_time = 0;
node->link = NULL;
node->inbound_link = NULL;
node->inbound_link_freed_time = node->ctime;
memset(node->ip, 0, sizeof(node->ip));
node->announce_client_ipv4 = sdsempty();
node->announce_client_ipv6 = sdsempty();
Expand Down Expand Up @@ -1695,6 +1701,9 @@ void clusterAddNode(clusterNode *node) {
* it is a replica node.
*/
void clusterDelNode(clusterNode *delnode) {
serverAssert(delnode != NULL);
serverLog(LL_DEBUG, "Deleting node %.40s from cluster view", delnode->name);

int j;
dictIterator *di;
dictEntry *de;
Expand Down Expand Up @@ -2077,7 +2086,7 @@ void clearNodeFailureIfNeeded(clusterNode *node) {
/* Return 1 if we already have a node in HANDSHAKE state matching the
* specified ip address and port number. This function is used in order to
* avoid adding a new handshake node for the same address multiple times. */
int clusterHandshakeInProgress(char *ip, int port, int cport) {
static int clusterHandshakeInProgress(char *ip, int port, int cport) {
dictIterator *di;
dictEntry *de;

Expand All @@ -2099,7 +2108,7 @@ int clusterHandshakeInProgress(char *ip, int port, int cport) {
*
* EAGAIN - There is already a handshake in progress for this address.
* EINVAL - IP or port are not valid. */
int clusterStartHandshake(char *ip, int port, int cport) {
static int clusterStartHandshake(char *ip, int port, int cport) {
clusterNode *n;
char norm_ip[NET_IP_STR_LEN];
struct sockaddr_storage sa;
Expand Down Expand Up @@ -3205,33 +3214,44 @@ int clusterProcessPacket(clusterLink *link) {
}
}

/* Add this node if it is new for us and the msg type is MEET.
* In this stage we don't try to add the node with the right
* flags, replicaof pointer, and so forth, as this details will be
* resolved when we'll receive PONGs from the node. The exception
* to this is the flag that indicates extensions are supported, as
* we want to send extensions right away in the return PONG in order
* to reduce the amount of time needed to stabilize the shard ID. */
if (!sender && type == CLUSTERMSG_TYPE_MEET) {
clusterNode *node;

node = createClusterNode(NULL, CLUSTER_NODE_HANDSHAKE);
serverAssert(nodeIp2String(node->ip, link, hdr->myip) == C_OK);
getClientPortFromClusterMsg(hdr, &node->tls_port, &node->tcp_port);
node->cport = ntohs(hdr->cport);
if (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA) {
node->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED;
if (type == CLUSTERMSG_TYPE_MEET) {
if (!sender) {
/* Add this node if it is new for us and the msg type is MEET.
* In this stage we don't try to add the node with the right
* flags, replicaof pointer, and so forth, as this details will be
* resolved when we'll receive PONGs from the node. The exception
* to this is the flag that indicates extensions are supported, as
* we want to send extensions right away in the return PONG in order
* to reduce the amount of time needed to stabilize the shard ID. */
clusterNode *node;

node = createClusterNode(NULL, CLUSTER_NODE_HANDSHAKE);
serverAssert(nodeIp2String(node->ip, link, hdr->myip) == C_OK);
getClientPortFromClusterMsg(hdr, &node->tls_port, &node->tcp_port);
node->cport = ntohs(hdr->cport);
if (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA) {
node->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED;
}
setClusterNodeToInboundClusterLink(node, link);
clusterAddNode(node);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);

/* If this is a MEET packet from an unknown node, we still process
* the gossip section here since we have to trust the sender because
* of the message type. */
clusterProcessGossipSection(hdr, link);
} else if (sender->link && now - sender->ctime > server.cluster_node_timeout) {
pieturin marked this conversation as resolved.
Show resolved Hide resolved
pieturin marked this conversation as resolved.
Show resolved Hide resolved
/* The MEET packet is from a known node, so the sender thinks that I do not know it.
* Freeing my outbound link to that node, to force a reconnect and sending a PING.
pieturin marked this conversation as resolved.
Show resolved Hide resolved
* Once that node receives our PING, it should recognize the new connection as an inbound link from me.
* We should only free the outbound link if the node is known for more time than the handshake timeout,
* since during this time, the other side might still be trying to complete the handshake. */
serverAssert(link != sender->link); // we should always receive a MEET packet on an inbound link
pieturin marked this conversation as resolved.
Show resolved Hide resolved
serverLog(LL_NOTICE, "Freeing outbound link to node %.40s after receiving a MEET packet from this known node", sender->name);
freeClusterLink(sender->link);
}
setClusterNodeToInboundClusterLink(node, link);
clusterAddNode(node);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}

/* If this is a MEET packet from an unknown node, we still process
* the gossip section here since we have to trust the sender because
* of the message type. */
if (!sender && type == CLUSTERMSG_TYPE_MEET) clusterProcessGossipSection(hdr, link);

/* Anyway reply with a PONG */
clusterSendPing(link, CLUSTERMSG_TYPE_PONG);
}
Expand All @@ -3241,7 +3261,7 @@ int clusterProcessPacket(clusterLink *link) {
serverLog(LL_DEBUG, "%s packet received: %.40s", clusterGetMessageTypeString(type),
link->node ? link->node->name : "NULL");

if (sender && (sender->flags & CLUSTER_NODE_MEET)) {
if (sender && nodeInMeetState(sender)) {
/* Once we get a response for MEET from the sender, we can stop sending more MEET. */
sender->flags &= ~CLUSTER_NODE_MEET;
serverLog(LL_NOTICE, "Successfully completed handshake with %.40s (%s)", sender->name,
Expand Down Expand Up @@ -3666,7 +3686,7 @@ void clusterLinkConnectHandler(connection *conn) {
* of a PING one, to force the receiver to add us in its node
* table. */
mstime_t old_ping_sent = node->ping_sent;
clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ? CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
clusterSendPing(link, nodeInMeetState(node) ? CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
if (old_ping_sent) {
/* If there was an active ping before the link was
* disconnected, we want to restore the ping time, otherwise
Expand Down Expand Up @@ -3745,7 +3765,9 @@ void clusterReadHandler(connection *conn) {

if (nread <= 0) {
/* I/O error... */
serverLog(LL_DEBUG, "I/O error reading from node link: %s",
serverLog(LL_DEBUG, "I/O error reading from node link (%.40s:%s): %s",
link->node ? link->node->name : "<unknown>",
link->inbound ? "inbound" : "outbound",
(nread == 0) ? "connection closed" : connGetLastError(conn));
handleLinkIOError(link);
return;
Expand Down Expand Up @@ -3926,6 +3948,11 @@ void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) {
/* Send a PING or PONG packet to the specified node, making sure to add enough
* gossip information. */
void clusterSendPing(clusterLink *link, int type) {
serverLog(LL_DEBUG, "Sending %s packet to node %.40s (%s)",
clusterGetMessageTypeString(type),
link->node ? link->node->name : "<unknown>",
link->inbound ? "inbound" : "outbound");
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved

static unsigned long long cluster_pings_sent = 0;
cluster_pings_sent++;
int gossipcount = 0; /* Number of gossip sections added so far. */
Expand Down Expand Up @@ -4941,6 +4968,15 @@ static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t handshake_
clusterDelNode(node);
return 1;
}
if (node->link != NULL && node->inbound_link == NULL && nodeInNormalState(node) &&
pieturin marked this conversation as resolved.
Show resolved Hide resolved
now - node->inbound_link_freed_time > handshake_timeout) {
/* Node has an outbound link, but no inbound link for more than the handshake timeout.
* This probably means this node does not know us yet, whereas we know it.
* So we send it a MEET packet to do a handshake with it and correct the inconsistent cluster view. */
node->flags |= CLUSTER_NODE_MEET;
serverLog(LL_NOTICE, "Sending MEET packet to node %.40s because there is no inbound link for it", node->name);
clusterSendPing(node->link, CLUSTERMSG_TYPE_MEET);
}

if (node->link == NULL) {
clusterLink *link = createClusterLink(node);
Expand Down
4 changes: 4 additions & 0 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,14 @@ typedef struct clusterLink {
#define nodeIsPrimary(n) ((n)->flags & CLUSTER_NODE_PRIMARY)
#define nodeIsReplica(n) ((n)->flags & CLUSTER_NODE_REPLICA)
#define nodeInHandshake(n) ((n)->flags & CLUSTER_NODE_HANDSHAKE)
#define nodeInMeetState(n) ((n)->flags & CLUSTER_NODE_MEET)
#define nodeHasAddr(n) (!((n)->flags & CLUSTER_NODE_NOADDR))
#define nodeTimedOut(n) ((n)->flags & CLUSTER_NODE_PFAIL)
#define nodeFailed(n) ((n)->flags & CLUSTER_NODE_FAIL)
#define nodeCantFailover(n) ((n)->flags & CLUSTER_NODE_NOFAILOVER)
#define nodeSupportsExtensions(n) ((n)->flags & CLUSTER_NODE_EXTENSIONS_SUPPORTED)
#define nodeSupportsLightMsgHdr(n) ((n)->flags & CLUSTER_NODE_LIGHT_HDR_SUPPORTED)
#define nodeInNormalState(n) (!((n)->flags & (CLUSTER_NODE_HANDSHAKE | CLUSTER_NODE_MEET | CLUSTER_NODE_PFAIL | CLUSTER_NODE_FAIL)))

/* This structure represent elements of node->fail_reports. */
typedef struct clusterNodeFailReport {
Expand Down Expand Up @@ -343,6 +345,8 @@ struct _clusterNode {
* failover scenarios. */
mstime_t repl_offset_time; /* Unix time we received offset for this node */
mstime_t orphaned_time; /* Starting time of orphaned primary condition */
mstime_t inbound_link_freed_time; /* Last time we freed the inbound link for this node.
If it was never freed, it is the same as ctime */
long long repl_offset; /* Last known repl offset for this node. */
char ip[NET_IP_STR_LEN]; /* Latest known IP address of this node */
sds announce_client_ipv4; /* IPv4 for clients only. */
Expand Down
9 changes: 9 additions & 0 deletions tests/support/cluster_util.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,15 @@ proc get_cluster_nodes {id {status "*"}} {
return $nodes
}

# Returns the parsed myself node entry as a dictionary.
proc get_myself id {
set nodes [get_cluster_nodes $id]
foreach n $nodes {
if {[cluster_has_flag $n myself]} {return $n}
}
return {}
}

# Returns 1 if no node knows node_id, 0 if any node knows it.
proc node_is_forgotten {node_id} {
for {set j 0} {$j < [llength $::servers]} {incr j} {
Expand Down
4 changes: 3 additions & 1 deletion tests/unit/cluster/cluster-multiple-meets.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ tags {tls:skip external:skip cluster} {
} else {
fail "Node 1 recognizes node 0 even though it drops PONGs from node 0"
}
assert {[llength [get_cluster_nodes 0 connected]] == 2}
assert {[llength [get_cluster_nodes 0]] == 2}

# Drop incoming and outgoing links from/to 1
R 0 DEBUG CLUSTERLINK KILL ALL [R 1 CLUSTER MYID]
Expand All @@ -77,6 +77,8 @@ tags {tls:skip external:skip cluster} {
# Both a and b will turn to cluster state ok
wait_for_condition 1000 50 {
[CI 1 cluster_state] eq {ok} && [CI 0 cluster_state] eq {ok} &&
[llength [get_cluster_nodes 0 connected]] == 2 &&
[llength [get_cluster_nodes 1 connected]] == 2 &&
[CI 1 cluster_stats_messages_meet_sent] == [CI 0 cluster_stats_messages_meet_received]
} else {
fail "1 cluster_state:[CI 1 cluster_state], 0 cluster_state: [CI 0 cluster_state]"
Expand Down
Loading
Loading