diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index c966386d84..2218a376c7 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -1702,7 +1702,7 @@ void clusterAddNode(clusterNode *node) { */ void clusterDelNode(clusterNode *delnode) { serverAssert(delnode != NULL); - serverLog(LL_DEBUG, "Deleting node %s from cluster view", delnode->name); + serverLog(LL_DEBUG, "Deleting node %.40s from cluster view", delnode->name); int j; dictIterator *di; @@ -3213,31 +3213,40 @@ int clusterProcessPacket(clusterLink *link) { } } - /* Add this node if it is new for us and the msg type is MEET. - * In this stage we don't try to add the node with the right - * flags, replicaof pointer, and so forth, as this details will be - * resolved when we'll receive PONGs from the node. The exception - * to this is the flag that indicates extensions are supported, as - * we want to send extensions right away in the return PONG in order - * to reduce the amount of time needed to stabilize the shard ID. */ - if (!sender && type == CLUSTERMSG_TYPE_MEET) { - clusterNode *node; - - node = createClusterNode(NULL, CLUSTER_NODE_HANDSHAKE); - serverAssert(nodeIp2String(node->ip, link, hdr->myip) == C_OK); - getClientPortFromClusterMsg(hdr, &node->tls_port, &node->tcp_port); - node->cport = ntohs(hdr->cport); - if (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA) { - node->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED; - } - setClusterNodeToInboundClusterLink(node, link); - clusterAddNode(node); - clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); + if (type == CLUSTERMSG_TYPE_MEET) { + if (!sender) { + /* Add this node if it is new for us and the msg type is MEET. + * In this stage we don't try to add the node with the right + * flags, replicaof pointer, and so forth, as this details will be + * resolved when we'll receive PONGs from the node. 
The exception + * to this is the flag that indicates extensions are supported, as + * we want to send extensions right away in the return PONG in order + * to reduce the amount of time needed to stabilize the shard ID. */ + clusterNode *node; - /* If this is a MEET packet from an unknown node, we still process - * the gossip section here since we have to trust the sender because - * of the message type. */ - clusterProcessGossipSection(hdr, link); + node = createClusterNode(NULL, CLUSTER_NODE_HANDSHAKE); + serverAssert(nodeIp2String(node->ip, link, hdr->myip) == C_OK); + getClientPortFromClusterMsg(hdr, &node->tls_port, &node->tcp_port); + node->cport = ntohs(hdr->cport); + if (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA) { + node->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED; + } + setClusterNodeToInboundClusterLink(node, link); + clusterAddNode(node); + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); + + /* If this is a MEET packet from an unknown node, we still process + * the gossip section here since we have to trust the sender because + * of the message type. */ + clusterProcessGossipSection(hdr, link); + } else if (sender->link) { + /* The MEET packet is from a known node, so the sender thinks that I do not know it. + * Freeing my outbound link to that node, to force a reconnect and sending a PING. + * Once that node receives our PING, it should recognize the new connection as an inbound link from me. 
*/ + serverAssert(link != sender->link); // we should always receive a MEET packet on an inbound link + serverLog(LL_NOTICE, "Freeing outbound link to node %.40s after receiving a MEET packet from this known node", sender->name); + freeClusterLink(sender->link); + } } /* Anyway reply with a PONG */ @@ -3694,9 +3703,6 @@ void clusterLinkConnectHandler(connection *conn) { */ serverLog(LL_DEBUG, "Connecting with Node %.40s at %s:%d", node->name, node->ip, node->cport); - if (nodeInMeetState(node)) { - serverLog(LL_DEBUG, "Sending MEET packet on connection to node %.40s", node->name); - } } /* Performs sanity check on the message signature and length depending on the type. */ @@ -3939,6 +3945,11 @@ void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) { /* Send a PING or PONG packet to the specified node, making sure to add enough * gossip information. */ void clusterSendPing(clusterLink *link, int type) { + serverLog(LL_DEBUG, "Sending %s packet to node %.40s (%s)", + clusterGetMessageTypeString(type), + link->node ? link->node->name : "", + link->inbound ? "inbound" : "outbound"); + static unsigned long long cluster_pings_sent = 0; cluster_pings_sent++; int gossipcount = 0; /* Number of gossip sections added so far. 
*/ diff --git a/tests/unit/cluster/cluster-reliable-meet.tcl b/tests/unit/cluster/cluster-reliable-meet.tcl index 1a9d9666d0..f189e96d5b 100644 --- a/tests/unit/cluster/cluster-reliable-meet.tcl +++ b/tests/unit/cluster/cluster-reliable-meet.tcl @@ -88,29 +88,33 @@ proc cluster_get_first_node_in_handshake id { return {} } -proc cluster_3_nodes_all_know_each_other {} { - set node0_id [dict get [get_myself 0] id] - set node1_id [dict get [get_myself 1] id] - set node2_id [dict get [get_myself 2] id] - - if { - [cluster_get_node_by_id 0 $node0_id] != {} && - [cluster_get_node_by_id 0 $node1_id] != {} && - [cluster_get_node_by_id 0 $node2_id] != {} && - [cluster_get_node_by_id 1 $node0_id] != {} && - [cluster_get_node_by_id 1 $node1_id] != {} && - [cluster_get_node_by_id 1 $node2_id] != {} && - [cluster_get_node_by_id 2 $node0_id] != {} && - [cluster_get_node_by_id 2 $node1_id] != {} && - [cluster_get_node_by_id 2 $node2_id] != {} && - [llength [R 0 CLUSTER LINKS]] == 4 && - [llength [R 1 CLUSTER LINKS]] == 4 && - [llength [R 2 CLUSTER LINKS]] == 4 - } { - return 1 - } else { - return 0 +# Return 1 when each of nodes 0..num_nodes-1 gets a non-empty cluster_get_node_by_id result for every one of their IDs +# and its CLUSTER LINKS reply has exactly 2 * (num_nodes - 1) entries; return 0 otherwise. +proc cluster_nodes_all_know_each_other {num_nodes} { + # Collect node IDs dynamically + set node_ids {} + for {set i 0} {$i < $num_nodes} {incr i} { + lappend node_ids [dict get [get_myself $i] id] } + + # Check every (node view, node ID) pair exactly once + foreach check_node_id $node_ids { + for {set node_index 0} {$node_index < $num_nodes} {incr node_index} { + if {[cluster_get_node_by_id $node_index $check_node_id] == {}} { + return 0 + } + } + } + + # Verify cluster link counts for each node + set expected_links [expr {2 * ($num_nodes - 1)}] + for {set i 0} {$i < $num_nodes} {incr i} { + if {[llength [R $i CLUSTER LINKS]] != $expected_links} { + return 0 + } + } + + return 1 } start_cluster 2 0 {tags {external:skip cluster} overrides {cluster-node-timeout 4000 cluster-replica-no-failover yes}} { @@ -196,7 +200,7 @@ start_cluster 2 0 {tags {external:skip cluster}
overrides {cluster-node-timeout # Now Node 0 will send a MEET packet to Node 1 & 2 since it has an outbound link to these nodes but no inbound link. # Handshake should now complete successfully. wait_for_condition 50 200 { - [cluster_3_nodes_all_know_each_other] + [cluster_nodes_all_know_each_other 3] } else { fail "Unexpected CLUSTER NODES output, all nodes should know each other." } @@ -239,12 +243,11 @@ start_cluster 2 0 {tags {external:skip cluster} overrides {cluster-node-timeout # Wait for node 0 to know about the other nodes in the cluster wait_for_condition 50 100 { - [cluster_get_node_by_id 0 $node1_id] != {} && - [cluster_get_node_by_id 0 $node2_id] != {} + [cluster_get_node_by_id 0 $node1_id] != {} } else { - fail "Node 0 never learned about node 1 and 2" + fail "Node 0 never learned about node 1" } - # At this point, node 0 learned about the other nodes in the cluster from meeting node 1. + # At this point, node 0 knows about node 1 and might know node 2 if node 1 gossiped about it. wait_for_condition 50 100 { [cluster_get_first_node_in_handshake 0] eq {} } else { @@ -267,7 +270,7 @@ start_cluster 2 0 {tags {external:skip cluster} overrides {cluster-node-timeout # Now Node 0 will send a MEET packet to Node 1 & 2 since it has an outbound link to these nodes but no inblound link. # Handshake should now complete successfully. wait_for_condition 50 200 { - [cluster_3_nodes_all_know_each_other] + [cluster_nodes_all_know_each_other 3] } else { fail "Unexpected CLUSTER NODES output, all nodes should know each other." }