Skip to content

Commit

Permalink
Fix bug where node is sending MEET packets in a loop
Browse files Browse the repository at this point in the history
If we receive a MEET packet from a known node, we disconnect the
outbound link to force a reconnect and sending of a PING packet so that
the other node recognizes the link as belonging to us.

Also deflaked one of the tests. And improved testing code following PR
comment.

Signed-off-by: Pierre Turin <pieturin@amazon.com>
  • Loading branch information
pieturin committed Nov 27, 2024
1 parent 6c67d41 commit 68324ec
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 56 deletions.
67 changes: 39 additions & 28 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -1702,7 +1702,7 @@ void clusterAddNode(clusterNode *node) {
*/
void clusterDelNode(clusterNode *delnode) {
serverAssert(delnode != NULL);
serverLog(LL_DEBUG, "Deleting node %s from cluster view", delnode->name);
serverLog(LL_DEBUG, "Deleting node %.40s from cluster view", delnode->name);

int j;
dictIterator *di;
Expand Down Expand Up @@ -3213,31 +3213,40 @@ int clusterProcessPacket(clusterLink *link) {
}
}

/* Add this node if it is new for us and the msg type is MEET.
* In this stage we don't try to add the node with the right
* flags, replicaof pointer, and so forth, as this details will be
* resolved when we'll receive PONGs from the node. The exception
* to this is the flag that indicates extensions are supported, as
* we want to send extensions right away in the return PONG in order
* to reduce the amount of time needed to stabilize the shard ID. */
if (!sender && type == CLUSTERMSG_TYPE_MEET) {
clusterNode *node;

node = createClusterNode(NULL, CLUSTER_NODE_HANDSHAKE);
serverAssert(nodeIp2String(node->ip, link, hdr->myip) == C_OK);
getClientPortFromClusterMsg(hdr, &node->tls_port, &node->tcp_port);
node->cport = ntohs(hdr->cport);
if (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA) {
node->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED;
}
setClusterNodeToInboundClusterLink(node, link);
clusterAddNode(node);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
if (type == CLUSTERMSG_TYPE_MEET) {
if (!sender) {
/* Add this node if it is new for us and the msg type is MEET.
* In this stage we don't try to add the node with the right
* flags, replicaof pointer, and so forth, as this details will be
* resolved when we'll receive PONGs from the node. The exception
* to this is the flag that indicates extensions are supported, as
* we want to send extensions right away in the return PONG in order
* to reduce the amount of time needed to stabilize the shard ID. */
clusterNode *node;

/* If this is a MEET packet from an unknown node, we still process
* the gossip section here since we have to trust the sender because
* of the message type. */
clusterProcessGossipSection(hdr, link);
node = createClusterNode(NULL, CLUSTER_NODE_HANDSHAKE);
serverAssert(nodeIp2String(node->ip, link, hdr->myip) == C_OK);
getClientPortFromClusterMsg(hdr, &node->tls_port, &node->tcp_port);
node->cport = ntohs(hdr->cport);
if (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA) {
node->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED;
}
setClusterNodeToInboundClusterLink(node, link);
clusterAddNode(node);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);

/* If this is a MEET packet from an unknown node, we still process
* the gossip section here since we have to trust the sender because
* of the message type. */
clusterProcessGossipSection(hdr, link);
} else if (sender->link) {
/* The MEET packet is from a known node, so the sender thinks that I do not know it.
* Freeing my outbound link to that node, to force a reconnect and sending a PING.
* Once that node receives our PING, it should recognize the new connection as an inbound link from me. */
serverAssert(link != sender->link); // we should always receive a MEET packet on an inbound link
serverLog(LL_NOTICE, "Freeing outbound link to node %.40s after receiving a MEET packet from this known node", sender->name);
freeClusterLink(sender->link);
}
}

/* Anyway reply with a PONG */
Expand Down Expand Up @@ -3694,9 +3703,6 @@ void clusterLinkConnectHandler(connection *conn) {
*/

serverLog(LL_DEBUG, "Connecting with Node %.40s at %s:%d", node->name, node->ip, node->cport);
if (nodeInMeetState(node)) {
serverLog(LL_DEBUG, "Sending MEET packet on connection to node %.40s", node->name);
}
}

/* Performs sanity check on the message signature and length depending on the type. */
Expand Down Expand Up @@ -3939,6 +3945,11 @@ void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) {
/* Send a PING or PONG packet to the specified node, making sure to add enough
* gossip information. */
void clusterSendPing(clusterLink *link, int type) {
serverLog(LL_DEBUG, "Sending %s packet to node %.40s (%s)",
clusterGetMessageTypeString(type),
link->node ? link->node->name : "<unknown>",
link->inbound ? "inbound" : "outbound");

static unsigned long long cluster_pings_sent = 0;
cluster_pings_sent++;
int gossipcount = 0; /* Number of gossip sections added so far. */
Expand Down
59 changes: 31 additions & 28 deletions tests/unit/cluster/cluster-reliable-meet.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -88,29 +88,33 @@ proc cluster_get_first_node_in_handshake id {
return {}
}

proc cluster_3_nodes_all_know_each_other {} {
set node0_id [dict get [get_myself 0] id]
set node1_id [dict get [get_myself 1] id]
set node2_id [dict get [get_myself 2] id]

if {
[cluster_get_node_by_id 0 $node0_id] != {} &&
[cluster_get_node_by_id 0 $node1_id] != {} &&
[cluster_get_node_by_id 0 $node2_id] != {} &&
[cluster_get_node_by_id 1 $node0_id] != {} &&
[cluster_get_node_by_id 1 $node1_id] != {} &&
[cluster_get_node_by_id 1 $node2_id] != {} &&
[cluster_get_node_by_id 2 $node0_id] != {} &&
[cluster_get_node_by_id 2 $node1_id] != {} &&
[cluster_get_node_by_id 2 $node2_id] != {} &&
[llength [R 0 CLUSTER LINKS]] == 4 &&
[llength [R 1 CLUSTER LINKS]] == 4 &&
[llength [R 2 CLUSTER LINKS]] == 4
} {
return 1
} else {
return 0
proc cluster_nodes_all_know_each_other {num_nodes} {
# Collect node IDs dynamically
set node_ids {}
for {set i 0} {$i < $num_nodes} {incr i} {
lappend node_ids [dict get [get_myself $i] id]
}

# Check if all nodes know each other
foreach node_id $node_ids {
foreach check_node_id $node_ids {
for {set node_index 0} {$node_index < $num_nodes} {incr node_index} {
if {[cluster_get_node_by_id $node_index $check_node_id] == {}} {
return 0
}
}
}
}

# Verify cluster link counts for each node
set expected_links [expr {2 * ($num_nodes - 1)}]
for {set i 0} {$i < $num_nodes} {incr i} {
if {[llength [R $i CLUSTER LINKS]] != $expected_links} {
return 0
}
}

return 1
}

start_cluster 2 0 {tags {external:skip cluster} overrides {cluster-node-timeout 4000 cluster-replica-no-failover yes}} {
Expand Down Expand Up @@ -196,7 +200,7 @@ start_cluster 2 0 {tags {external:skip cluster} overrides {cluster-node-timeout
# Now Node 0 will send a MEET packet to Node 1 & 2 since it has an outbound link to these nodes but no inbound link.
# Handshake should now complete successfully.
wait_for_condition 50 200 {
[cluster_3_nodes_all_know_each_other]
[cluster_nodes_all_know_each_other 3]
} else {
fail "Unexpected CLUSTER NODES output, all nodes should know each other."
}
Expand Down Expand Up @@ -239,12 +243,11 @@ start_cluster 2 0 {tags {external:skip cluster} overrides {cluster-node-timeout

# Wait for node 0 to know about the other nodes in the cluster
wait_for_condition 50 100 {
[cluster_get_node_by_id 0 $node1_id] != {} &&
[cluster_get_node_by_id 0 $node2_id] != {}
[cluster_get_node_by_id 0 $node1_id] != {}
} else {
fail "Node 0 never learned about node 1 and 2"
fail "Node 0 never learned about node 1"
}
# At this point, node 0 learned about the other nodes in the cluster from meeting node 1.
# At this point, node 0 knows about node 1 and might know node 2 if node 1 gossiped about it.
wait_for_condition 50 100 {
[cluster_get_first_node_in_handshake 0] eq {}
} else {
Expand All @@ -267,7 +270,7 @@ start_cluster 2 0 {tags {external:skip cluster} overrides {cluster-node-timeout
# Now Node 0 will send a MEET packet to Node 1 & 2 since it has an outbound link to these nodes but no inblound link.
# Handshake should now complete successfully.
wait_for_condition 50 200 {
[cluster_3_nodes_all_know_each_other]
[cluster_nodes_all_know_each_other 3]
} else {
fail "Unexpected CLUSTER NODES output, all nodes should know each other."
}
Expand Down

0 comments on commit 68324ec

Please sign in to comment.