From c099c2ae35f71ebb6a182abcd8ed1072b32ec091 Mon Sep 17 00:00:00 2001 From: Pierre Turin Date: Thu, 14 Nov 2024 19:53:29 +0000 Subject: [PATCH 1/9] [cluster-bus] Send a MEET packet to a node if there is no inbound link In some cases, when meeting a new node, if the handshake times out, we can end up with an inconsistent view of the cluster where the new node knows about all the nodes in the cluster, but the cluster does not know about this new node (or vice versa). To detect this inconsistency, we now check if a node has an outbound link but no inbound link, in this case it probably means this node does not know us. In this case we (re-)send a MEET packet to this node to do a new handshake with it. Signed-off-by: Pierre Turin --- src/cluster_legacy.c | 45 +++-- src/cluster_legacy.h | 3 + tests/support/cluster_util.tcl | 9 + tests/unit/cluster/cluster-reliable-meet.tcl | 168 ++++++++++++++++++- 4 files changed, 212 insertions(+), 13 deletions(-) diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index e4b25e265d..0f0c53c742 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -1335,6 +1335,10 @@ clusterLink *createClusterLink(clusterNode *node) { * with this link will have the 'link' field set to NULL. */ void freeClusterLink(clusterLink *link) { serverAssert(link != NULL); + serverLog(LL_DEBUG, "Freeing cluster link for node: %.40s:%s", + link->node ? link->node->name : "", + link->inbound ? "inbound" : "outbound"); + if (link->conn) { connClose(link->conn); link->conn = NULL; @@ -1350,6 +1354,7 @@ void freeClusterLink(clusterLink *link) { } else if (link->node->inbound_link == link) { serverAssert(link->inbound); link->node->inbound_link = NULL; + link->node->inbound_link_freed_time = mstime(); } } zfree(link); @@ -1489,6 +1494,7 @@ clusterNode *createClusterNode(char *nodename, int flags) { node->fail_time = 0; node->link = NULL; node->inbound_link = NULL; + node->inbound_link_freed_time = node->ctime; memset(node->ip, 0, sizeof(node->ip)); node->announce_client_ipv4 = sdsempty(); node->announce_client_ipv6 = sdsempty(); @@ -1695,6 +1701,9 @@ void clusterAddNode(clusterNode *node) { * it is a replica node. */ void clusterDelNode(clusterNode *delnode) { + serverAssert(delnode != NULL); + serverLog(LL_DEBUG, "Deleting node %s from cluster view", delnode->name); + int j; dictIterator *di; dictEntry *de; @@ -2077,7 +2086,7 @@ void clearNodeFailureIfNeeded(clusterNode *node) { /* Return 1 if we already have a node in HANDSHAKE state matching the * specified ip address and port number. This function is used in order to * avoid adding a new handshake node for the same address multiple times. */ -int clusterHandshakeInProgress(char *ip, int port, int cport) { +static int clusterHandshakeInProgress(char *ip, int port, int cport) { dictIterator *di; dictEntry *de; @@ -2099,7 +2108,7 @@ int clusterHandshakeInProgress(char *ip, int port, int cport) { * * EAGAIN - There is already a handshake in progress for this address. * EINVAL - IP or port are not valid. */ -int clusterStartHandshake(char *ip, int port, int cport) { +static int clusterStartHandshake(char *ip, int port, int cport) { clusterNode *n; char norm_ip[NET_IP_STR_LEN]; struct sockaddr_storage sa; @@ -3225,12 +3234,12 @@ int clusterProcessPacket(clusterLink *link) { setClusterNodeToInboundClusterLink(node, link); clusterAddNode(node); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); - } - /* If this is a MEET packet from an unknown node, we still process - * the gossip section here since we have to trust the sender because - * of the message type. */ - if (!sender && type == CLUSTERMSG_TYPE_MEET) clusterProcessGossipSection(hdr, link); + /* If this is a MEET packet from an unknown node, we still process + * the gossip section here since we have to trust the sender because + * of the message type. */ + clusterProcessGossipSection(hdr, link); + } /* Anyway reply with a PONG */ clusterSendPing(link, CLUSTERMSG_TYPE_PONG); @@ -3241,7 +3250,7 @@ int clusterProcessPacket(clusterLink *link) { serverLog(LL_DEBUG, "%s packet received: %.40s", clusterGetMessageTypeString(type), link->node ? link->node->name : "NULL"); - if (sender && (sender->flags & CLUSTER_NODE_MEET)) { + if (sender && nodeIsMeeting(sender)) { /* Once we get a response for MEET from the sender, we can stop sending more MEET. */ sender->flags &= ~CLUSTER_NODE_MEET; serverLog(LL_NOTICE, "Successfully completed handshake with %.40s (%s)", sender->name, @@ -3666,7 +3675,7 @@ void clusterLinkConnectHandler(connection *conn) { * of a PING one, to force the receiver to add us in its node * table. */ mstime_t old_ping_sent = node->ping_sent; - clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ? CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING); + clusterSendPing(link, nodeIsMeeting(node) ? CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING); if (old_ping_sent) { /* If there was an active ping before the link was * disconnected, we want to restore the ping time, otherwise @@ -3686,6 +3695,9 @@ void clusterLinkConnectHandler(connection *conn) { */ serverLog(LL_DEBUG, "Connecting with Node %.40s at %s:%d", node->name, node->ip, node->cport); + if (nodeIsMeeting(node)) { + serverLog(LL_DEBUG, "Sending MEET packet on connection to node %.40s", node->name); + } } /* Performs sanity check on the message signature and length depending on the type. */ @@ -3745,7 +3757,9 @@ void clusterReadHandler(connection *conn) { if (nread <= 0) { /* I/O error... */ - serverLog(LL_DEBUG, "I/O error reading from node link: %s", + serverLog(LL_DEBUG, "I/O error reading from node link (%.40s:%s): %s", + link->node ? link->node->name : "", + link->inbound ? "inbound" : "outbound", (nread == 0) ? "connection closed" : connGetLastError(conn)); handleLinkIOError(link); return; @@ -4941,6 +4955,16 @@ static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t handshake_ clusterDelNode(node); return 1; } + if (node->link != NULL && node->inbound_link == NULL && + !nodeInHandshake(node) && !nodeIsMeeting(node) && !nodeTimedOut(node) && + now - node->inbound_link_freed_time > handshake_timeout) { + /* Node has an outbound link, but no inbound link for more than the handshake timeout. + * This probably means this node does not know us yet, wherease we know it. + * So we send it a MEET packet to do a handshake with it and correct the inconsistent cluster view. */ + node->flags |= CLUSTER_NODE_MEET; + serverLog(LL_NOTICE, "Sending MEET packet to node %.40s because there is no inbound link for it", node->name); + clusterSendPing(node->link, CLUSTERMSG_TYPE_MEET); + } if (node->link == NULL) { clusterLink *link = createClusterLink(node); @@ -4963,6 +4987,7 @@ static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t handshake_ return 0; } } + return 0; } diff --git a/src/cluster_legacy.h b/src/cluster_legacy.h index 39148c748d..2864ea2c85 100644 --- a/src/cluster_legacy.h +++ b/src/cluster_legacy.h @@ -61,6 +61,7 @@ typedef struct clusterLink { #define nodeIsPrimary(n) ((n)->flags & CLUSTER_NODE_PRIMARY) #define nodeIsReplica(n) ((n)->flags & CLUSTER_NODE_REPLICA) #define nodeInHandshake(n) ((n)->flags & CLUSTER_NODE_HANDSHAKE) +#define nodeIsMeeting(n) ((n)->flags & CLUSTER_NODE_MEET) #define nodeHasAddr(n) (!((n)->flags & CLUSTER_NODE_NOADDR)) #define nodeTimedOut(n) ((n)->flags & CLUSTER_NODE_PFAIL) #define nodeFailed(n) ((n)->flags & CLUSTER_NODE_FAIL) @@ -343,6 +344,8 @@ struct _clusterNode { * failover scenarios. */ mstime_t repl_offset_time; /* Unix time we received offset for this node */ mstime_t orphaned_time; /* Starting time of orphaned primary condition */ + mstime_t inbound_link_freed_time; /* Last time we freed the inbound link for this node. + If it was never freed, it is the same as ctime */ long long repl_offset; /* Last known repl offset for this node. */ char ip[NET_IP_STR_LEN]; /* Latest known IP address of this node */ sds announce_client_ipv4; /* IPv4 for clients only. */ diff --git a/tests/support/cluster_util.tcl b/tests/support/cluster_util.tcl index 686f00071b..4f641c5e96 100644 --- a/tests/support/cluster_util.tcl +++ b/tests/support/cluster_util.tcl @@ -323,6 +323,15 @@ proc get_cluster_nodes {id {status "*"}} { return $nodes } +# Returns the parsed myself node entry as a dictionary. +proc get_myself id { + set nodes [get_cluster_nodes $id] + foreach n $nodes { + if {[cluster_has_flag $n myself]} {return $n} + } + return {} +} + # Returns 1 if no node knows node_id, 0 if any node knows it. proc node_is_forgotten {node_id} { for {set j 0} {$j < [llength $::servers]} {incr j} { diff --git a/tests/unit/cluster/cluster-reliable-meet.tcl b/tests/unit/cluster/cluster-reliable-meet.tcl index 45f5a6dc89..8bd93717f1 100644 --- a/tests/unit/cluster/cluster-reliable-meet.tcl +++ b/tests/unit/cluster/cluster-reliable-meet.tcl @@ -3,6 +3,12 @@ set old_singledb $::singledb set ::singledb 1 tags {tls:skip external:skip cluster} { + set CLUSTER_PACKET_TYPE_PING 0 + set CLUSTER_PACKET_TYPE_PONG 1 + set CLUSTER_PACKET_TYPE_MEET 2 + set CLUSTER_PACKET_TYPE_NONE -1 + set CLUSTER_PACKET_TYPE_ALL -2 + set base_conf [list cluster-enabled yes] start_multiple_servers 2 [list overrides $base_conf] { test "Cluster nodes are reachable" { @@ -22,9 +28,6 @@ tags {tls:skip external:skip cluster} { wait_for_cluster_state fail } - set CLUSTER_PACKET_TYPE_MEET 2 - set CLUSTER_PACKET_TYPE_NONE -1 - test "Cluster nodes haven't met each other" { assert {[llength [get_cluster_nodes 1]] == 1} assert {[llength [get_cluster_nodes 0]] == 1} @@ -75,3 +78,162 @@ tags {tls:skip external:skip cluster} { set ::singledb $old_singledb +proc cluster_get_first_node_in_handshake id { + set nodes [get_cluster_nodes $id] + foreach n $nodes { + if {[cluster_has_flag $n handshake]} { + return [dict get $n id] + } + } + return {} +} + +start_cluster 2 0 {tags {external:skip cluster} overrides {cluster-node-timeout 4000 cluster-replica-no-failover yes}} { + set CLUSTER_PACKET_TYPE_PING 0 + set CLUSTER_PACKET_TYPE_PONG 1 + set CLUSTER_PACKET_TYPE_MEET 2 + set CLUSTER_PACKET_TYPE_NONE -1 + set CLUSTER_PACKET_TYPE_ALL -2 + + test "Handshake eventually succeeds after node handshake timeout on both sides with inconsistent view of the cluster" { + set cluster_port [find_available_port $::baseport $::portcount] + start_server [list overrides [list cluster-enabled yes cluster-node-timeout 4000 cluster-port $cluster_port]] { + # In this test we will trigger a handshake timeout on both sides of the handshake. + # Node 1 and 2 already know each other, then we make node 1 meet node 0: + # + # Node 1 -- MEET -> Node 0 [Node 0 might learn about Node 2 from the gossip section of the msg] + # Node 1 <- PONG -- Node 0 [we drop this message, so Node 1 will eventually mark the handshake as timed out] + # Node 1 <- PING -- Node 0 [we drop this message, so Node 1 will never send a PONG and Node 0 will eventually mark the handshake as timed out] + # + # After the handshake is timed out, we allow all cluster bus messages to go through. + # Eventually Node 0 should send a MEET packet to the other nodes to complete the handshake. + + # Drop all cluster bus messages + R 1 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_ALL + # Drop MEET cluster bus messages, so that Node 0 cannot start a handshake with Node 2. + R 2 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_MEET + + R 1 CLUSTER MEET [srv 0 host] [srv 0 port] $cluster_port + + # We want Node 0 to learn about Node 2 through the gossip section of the MEET message + set meet_retry 0 + set node2_id [dict get [get_myself 2] id] + while {[cluster_get_node_by_id 0 $node2_id] eq {}} { + if {$meet_retry == 10} { + error "assertion: Retried to meet Node 0 too many times" + } + # If Node 0 doesn't know about Node 1 & 2, it means Node 1 did not gossip about node 2 in its MEET message. + # So we kill the outbound link from Node 1 to Node 0, to force a reconnect and a re-send of the MEET message. + after 100 + # Since we are in handshake, we use a randomly generated ID we have to find + R 1 DEBUG CLUSTERLINK KILL ALL [cluster_get_first_node_in_handshake 1] + incr meet_retry 1 + } + + # Wait for Node 1's handshake to timeout + wait_for_condition 50 100 { + [cluster_get_first_node_in_handshake 1] eq {} + } else { + fail "Node 1 never exited handshake state" + } + + # Wait for Node 0's handshake to timeout + wait_for_condition 50 100 { + [cluster_get_first_node_in_handshake 1] eq {} + } else { + fail "Node 0 never exited handshake state" + } + + # At this point Node 0 knows Node 1 & 2 through the gossip, but they don't know Node 0. + wait_for_condition 50 100 { + [llength [R 0 CLUSTER NODES]] == 25 && + [llength [R 1 CLUSTER NODES]] == 18 && + [llength [R 2 CLUSTER NODES]] == 18 + } else { + fail "Unexpected CLUSTER NODES output, nodes 1 & 2 should not know node 0." + } + + # Allow all messages to go through again + R 1 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_NONE + R 2 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_NONE + + # Now Node 0 will send a MEET packet to Node 1 & 2 since it has an outbound link to these nodes but no inblound link. + # Handshake should now complete successfully. + wait_for_condition 50 200 { + [llength [R 0 CLUSTER NODES]] == 26 && + [llength [R 1 CLUSTER NODES]] == 26 && + [llength [R 2 CLUSTER NODES]] == 26 + } else { + fail "Unexpected CLUSTER NODES output, all nodes should know each other." + } + } ;# stop Node 0 + } ;# test +} ;# stop cluster + +start_cluster 2 0 {tags {external:skip cluster} overrides {cluster-node-timeout 4000 cluster-replica-no-failover yes}} { + set CLUSTER_PACKET_TYPE_PING 0 + set CLUSTER_PACKET_TYPE_PONG 1 + set CLUSTER_PACKET_TYPE_MEET 2 + set CLUSTER_PACKET_TYPE_NONE -1 + set CLUSTER_PACKET_TYPE_ALL -2 + + test "Handshake eventually succeeds after node handshake timeout on one side with inconsistent view of the cluster" { + set cluster_port [find_available_port $::baseport $::portcount] + start_server [list overrides [list cluster-enabled yes cluster-node-timeout 4000 cluster-port $cluster_port]] { + # In this test we will trigger a handshake timeout on one side of the handshake. + # Node 1 and 2 already know each other, then we make node 0 meet node 1: + # + # Node 0 -- MEET -> Node 1 + # Node 0 <- PONG -- Node 1 + # Node 0 <- PING -- Node 1 [Node 0 will mark the handshake as successful] + # Node 0 -- PONG -> Node 1 [we drop this message, so node 1 will eventually mark the handshake as timed out] + # + # After the handshake is timed out, we allow all cluster bus messages to go through. + # Eventually Node 0 should send a MEET packet to the other nodes to complete the handshake. + + # Drop PONG messages + R 1 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_PONG + # Drop MEET cluster bus messages, so that Node 0 cannot start a handshake with Node 2. + R 2 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_MEET + + # Node 0 meets node 1 + R 0 CLUSTER MEET [srv -1 host] [srv -1 port] + + # Wait for node 0 to know about the other nodes in the cluster + wait_for_condition 50 100 { + [llength [R 0 CLUSTER NODES]] == 26 + } else { + fail "Node 0 never learned about node 1 and 2" + } + # At this point, node 0 learned about the other nodes in the cluster from meeting node 1. + wait_for_condition 50 100 { + [cluster_get_first_node_in_handshake 0] eq {} + } else { + fail "Node 1 never exited handshake state" + } + # At this point, from node 0 point of view, the handshake with node 1 succeeded. + + wait_for_condition 50 100 { + [cluster_get_first_node_in_handshake 1] eq {} + } else { + fail "Node 1 never exited handshake state" + } + assert {[llength [R 1 CLUSTER NODES]] == 18} + # At this point, from node 1 point of view, the handshake with node 0 timed out. + + # Allow all messages + R 1 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_NONE + R 2 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_NONE + + # Now Node 0 will send a MEET packet to Node 1 & 2 since it has an outbound link to these nodes but no inblound link. + # Handshake should now complete successfully. + wait_for_condition 50 200 { + [llength [R 0 CLUSTER NODES]] == 26 && + [llength [R 1 CLUSTER NODES]] == 26 && + [llength [R 2 CLUSTER NODES]] == 26 + } else { + fail "Unexpected CLUSTER NODES output, all nodes should know each other." + } + } ;# stop Node 0 + } ;# test +} ;# stop cluster From ce3468408f091396348544f17bc2602ec49a5544 Mon Sep 17 00:00:00 2001 From: Pierre Turin Date: Thu, 14 Nov 2024 20:09:08 +0000 Subject: [PATCH 2/9] Fix typo in comment Signed-off-by: Pierre Turin --- src/cluster_legacy.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 0f0c53c742..e79e0c4641 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -4959,7 +4959,7 @@ static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t handshake_ !nodeInHandshake(node) && !nodeIsMeeting(node) && !nodeTimedOut(node) && now - node->inbound_link_freed_time > handshake_timeout) { /* Node has an outbound link, but no inbound link for more than the handshake timeout. - * This probably means this node does not know us yet, wherease we know it. + * This probably means this node does not know us yet, whereas we know it. * So we send it a MEET packet to do a handshake with it and correct the inconsistent cluster view. */ node->flags |= CLUSTER_NODE_MEET; serverLog(LL_NOTICE, "Sending MEET packet to node %.40s because there is no inbound link for it", node->name); From 4ee754629c252e5b116b760f624a51658aa29eb5 Mon Sep 17 00:00:00 2001 From: Pierre Turin Date: Thu, 14 Nov 2024 23:13:41 +0000 Subject: [PATCH 3/9] Addressed PR comments Update test to check node IDs instead of relying on number of words. Rename nodeIsMeeting() to nodeInMeetState(). Introduce nodeInNormalState() macro. Signed-off-by: Pierre Turin --- src/cluster_legacy.c | 10 ++-- src/cluster_legacy.h | 3 +- tests/unit/cluster/cluster-reliable-meet.tcl | 63 ++++++++++++++++---- 3 files changed, 56 insertions(+), 20 deletions(-) diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index e79e0c4641..ec6b9ba8be 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -3250,7 +3250,7 @@ int clusterProcessPacket(clusterLink *link) { serverLog(LL_DEBUG, "%s packet received: %.40s", clusterGetMessageTypeString(type), link->node ? link->node->name : "NULL"); - if (sender && nodeIsMeeting(sender)) { + if (sender && nodeInMeetState(sender)) { /* Once we get a response for MEET from the sender, we can stop sending more MEET. */ sender->flags &= ~CLUSTER_NODE_MEET; serverLog(LL_NOTICE, "Successfully completed handshake with %.40s (%s)", sender->name, @@ -3675,7 +3675,7 @@ void clusterLinkConnectHandler(connection *conn) { * of a PING one, to force the receiver to add us in its node * table. */ mstime_t old_ping_sent = node->ping_sent; - clusterSendPing(link, nodeIsMeeting(node) ? CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING); + clusterSendPing(link, nodeInMeetState(node) ? CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING); if (old_ping_sent) { /* If there was an active ping before the link was * disconnected, we want to restore the ping time, otherwise @@ -3695,7 +3695,7 @@ void clusterLinkConnectHandler(connection *conn) { */ serverLog(LL_DEBUG, "Connecting with Node %.40s at %s:%d", node->name, node->ip, node->cport); - if (nodeIsMeeting(node)) { + if (nodeInMeetState(node)) { serverLog(LL_DEBUG, "Sending MEET packet on connection to node %.40s", node->name); } } @@ -4955,8 +4955,7 @@ static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t handshake_ clusterDelNode(node); return 1; } - if (node->link != NULL && node->inbound_link == NULL && - !nodeInHandshake(node) && !nodeIsMeeting(node) && !nodeTimedOut(node) && + if (node->link != NULL && node->inbound_link == NULL && nodeInNormalState(node) && now - node->inbound_link_freed_time > handshake_timeout) { /* Node has an outbound link, but no inbound link for more than the handshake timeout. * This probably means this node does not know us yet, whereas we know it. @@ -4987,7 +4986,6 @@ static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t handshake_ return 0; } } - return 0; } diff --git a/src/cluster_legacy.h b/src/cluster_legacy.h index 2864ea2c85..492ecc0fdb 100644 --- a/src/cluster_legacy.h +++ b/src/cluster_legacy.h @@ -61,13 +61,14 @@ typedef struct clusterLink { #define nodeIsPrimary(n) ((n)->flags & CLUSTER_NODE_PRIMARY) #define nodeIsReplica(n) ((n)->flags & CLUSTER_NODE_REPLICA) #define nodeInHandshake(n) ((n)->flags & CLUSTER_NODE_HANDSHAKE) -#define nodeIsMeeting(n) ((n)->flags & CLUSTER_NODE_MEET) +#define nodeInMeetState(n) ((n)->flags & CLUSTER_NODE_MEET) #define nodeHasAddr(n) (!((n)->flags & CLUSTER_NODE_NOADDR)) #define nodeTimedOut(n) ((n)->flags & CLUSTER_NODE_PFAIL) #define nodeFailed(n) ((n)->flags & CLUSTER_NODE_FAIL) #define nodeCantFailover(n) ((n)->flags & CLUSTER_NODE_NOFAILOVER) #define nodeSupportsExtensions(n) ((n)->flags & CLUSTER_NODE_EXTENSIONS_SUPPORTED) #define nodeSupportsLightMsgHdr(n) ((n)->flags & CLUSTER_NODE_LIGHT_HDR_SUPPORTED) +#define nodeInNormalState(n) (!((n)->flags & (CLUSTER_NODE_HANDSHAKE | CLUSTER_NODE_MEET | CLUSTER_NODE_PFAIL | CLUSTER_NODE_FAIL))) /* This structure represent elements of node->fail_reports. */ typedef struct clusterNodeFailReport { diff --git a/tests/unit/cluster/cluster-reliable-meet.tcl b/tests/unit/cluster/cluster-reliable-meet.tcl index 8bd93717f1..1a9d9666d0 100644 --- a/tests/unit/cluster/cluster-reliable-meet.tcl +++ b/tests/unit/cluster/cluster-reliable-meet.tcl @@ -88,6 +88,31 @@ proc cluster_get_first_node_in_handshake id { return {} } +proc cluster_3_nodes_all_know_each_other {} { + set node0_id [dict get [get_myself 0] id] + set node1_id [dict get [get_myself 1] id] + set node2_id [dict get [get_myself 2] id] + + if { + [cluster_get_node_by_id 0 $node0_id] != {} && + [cluster_get_node_by_id 0 $node1_id] != {} && + [cluster_get_node_by_id 0 $node2_id] != {} && + [cluster_get_node_by_id 1 $node0_id] != {} && + [cluster_get_node_by_id 1 $node1_id] != {} && + [cluster_get_node_by_id 1 $node2_id] != {} && + [cluster_get_node_by_id 2 $node0_id] != {} && + [cluster_get_node_by_id 2 $node1_id] != {} && + [cluster_get_node_by_id 2 $node2_id] != {} && + [llength [R 0 CLUSTER LINKS]] == 4 && + [llength [R 1 CLUSTER LINKS]] == 4 && + [llength [R 2 CLUSTER LINKS]] == 4 + } { + return 1 + } else { + return 0 + } +} + start_cluster 2 0 {tags {external:skip cluster} overrides {cluster-node-timeout 4000 cluster-replica-no-failover yes}} { set CLUSTER_PACKET_TYPE_PING 0 set CLUSTER_PACKET_TYPE_PONG 1 @@ -108,6 +133,10 @@ start_cluster 2 0 {tags {external:skip cluster} overrides {cluster-node-timeout # After the handshake is timed out, we allow all cluster bus messages to go through. # Eventually Node 0 should send a MEET packet to the other nodes to complete the handshake. + set node0_id [dict get [get_myself 0] id] + set node1_id [dict get [get_myself 1] id] + set node2_id [dict get [get_myself 2] id] + # Drop all cluster bus messages R 1 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_ALL # Drop MEET cluster bus messages, so that Node 0 cannot start a handshake with Node 2. @@ -115,9 +144,15 @@ start_cluster 2 0 {tags {external:skip cluster} overrides {cluster-node-timeout R 1 CLUSTER MEET [srv 0 host] [srv 0 port] $cluster_port + # Wait for Node 0 to be in handshake + wait_for_condition 10 400 { + [cluster_get_first_node_in_handshake 0] != {} + } else { + fail "Node 0 never entered handshake state" + } + # We want Node 0 to learn about Node 2 through the gossip section of the MEET message set meet_retry 0 - set node2_id [dict get [get_myself 2] id] while {[cluster_get_node_by_id 0 $node2_id] eq {}} { if {$meet_retry == 10} { error "assertion: Retried to meet Node 0 too many times" @@ -146,9 +181,10 @@ start_cluster 2 0 {tags {external:skip cluster} overrides {cluster-node-timeout # At this point Node 0 knows Node 1 & 2 through the gossip, but they don't know Node 0. wait_for_condition 50 100 { - [llength [R 0 CLUSTER NODES]] == 25 && - [llength [R 1 CLUSTER NODES]] == 18 && - [llength [R 2 CLUSTER NODES]] == 18 + [cluster_get_node_by_id 0 $node1_id] != {} && + [cluster_get_node_by_id 0 $node2_id] != {} && + [cluster_get_node_by_id 1 $node0_id] eq {} && + [cluster_get_node_by_id 2 $node0_id] eq {} } else { fail "Unexpected CLUSTER NODES output, nodes 1 & 2 should not know node 0." } @@ -157,12 +193,10 @@ start_cluster 2 0 {tags {external:skip cluster} overrides {cluster-node-timeout R 1 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_NONE R 2 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_NONE - # Now Node 0 will send a MEET packet to Node 1 & 2 since it has an outbound link to these nodes but no inblound link. + # Now Node 0 will send a MEET packet to Node 1 & 2 since it has an outbound link to these nodes but no inbound link. # Handshake should now complete successfully. wait_for_condition 50 200 { - [llength [R 0 CLUSTER NODES]] == 26 && - [llength [R 1 CLUSTER NODES]] == 26 && - [llength [R 2 CLUSTER NODES]] == 26 + [cluster_3_nodes_all_know_each_other] } else { fail "Unexpected CLUSTER NODES output, all nodes should know each other." } @@ -191,6 +225,10 @@ start_cluster 2 0 {tags {external:skip cluster} overrides {cluster-node-timeout # After the handshake is timed out, we allow all cluster bus messages to go through. # Eventually Node 0 should send a MEET packet to the other nodes to complete the handshake. + set node0_id [dict get [get_myself 0] id] + set node1_id [dict get [get_myself 1] id] + set node2_id [dict get [get_myself 2] id] + # Drop PONG messages R 1 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_PONG # Drop MEET cluster bus messages, so that Node 0 cannot start a handshake with Node 2. @@ -201,7 +239,8 @@ start_cluster 2 0 {tags {external:skip cluster} overrides {cluster-node-timeout # Wait for node 0 to know about the other nodes in the cluster wait_for_condition 50 100 { - [llength [R 0 CLUSTER NODES]] == 26 + [cluster_get_node_by_id 0 $node1_id] != {} && + [cluster_get_node_by_id 0 $node2_id] != {} } else { fail "Node 0 never learned about node 1 and 2" } @@ -218,7 +257,7 @@ start_cluster 2 0 {tags {external:skip cluster} overrides {cluster-node-timeout } else { fail "Node 1 never exited handshake state" } - assert {[llength [R 1 CLUSTER NODES]] == 18} + assert {[cluster_get_node_by_id 1 $node0_id] eq {}} # At this point, from node 1 point of view, the handshake with node 0 timed out. # Allow all messages @@ -228,9 +267,7 @@ start_cluster 2 0 {tags {external:skip cluster} overrides {cluster-node-timeout # Now Node 0 will send a MEET packet to Node 1 & 2 since it has an outbound link to these nodes but no inblound link. # Handshake should now complete successfully. wait_for_condition 50 200 { - [llength [R 0 CLUSTER NODES]] == 26 && - [llength [R 1 CLUSTER NODES]] == 26 && - [llength [R 2 CLUSTER NODES]] == 26 + [cluster_3_nodes_all_know_each_other] } else { fail "Unexpected CLUSTER NODES output, all nodes should know each other." } From 1edfa027509906330899c6deb85c0f0a6090e767 Mon Sep 17 00:00:00 2001 From: Pierre Turin Date: Wed, 27 Nov 2024 22:29:33 +0000 Subject: [PATCH 4/9] Fix bug where node is sending MEET packets in a loop If we receive a MEET packet from a known node, we disconnect the outbound link to force a reconnect and sending of a PING packet so that the other node recognizes the link as belonging to us. Also deflaked one of the tests. And improved testing code following PR comment. Signed-off-by: Pierre Turin --- src/cluster_legacy.c | 67 ++++++++++++-------- tests/unit/cluster/cluster-reliable-meet.tcl | 59 +++++++++-------- 2 files changed, 70 insertions(+), 56 deletions(-) diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index ec6b9ba8be..7e8bb017de 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -1702,7 +1702,7 @@ void clusterAddNode(clusterNode *node) { */ void clusterDelNode(clusterNode *delnode) { serverAssert(delnode != NULL); - serverLog(LL_DEBUG, "Deleting node %s from cluster view", delnode->name); + serverLog(LL_DEBUG, "Deleting node %.40s from cluster view", delnode->name); int j; dictIterator *di; @@ -3214,31 +3214,40 @@ int clusterProcessPacket(clusterLink *link) { } } - /* Add this node if it is new for us and the msg type is MEET. - * In this stage we don't try to add the node with the right - * flags, replicaof pointer, and so forth, as this details will be - * resolved when we'll receive PONGs from the node. The exception - * to this is the flag that indicates extensions are supported, as - * we want to send extensions right away in the return PONG in order - * to reduce the amount of time needed to stabilize the shard ID. */ - if (!sender && type == CLUSTERMSG_TYPE_MEET) { - clusterNode *node; - - node = createClusterNode(NULL, CLUSTER_NODE_HANDSHAKE); - serverAssert(nodeIp2String(node->ip, link, hdr->myip) == C_OK); - getClientPortFromClusterMsg(hdr, &node->tls_port, &node->tcp_port); - node->cport = ntohs(hdr->cport); - if (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA) { - node->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED; - } - setClusterNodeToInboundClusterLink(node, link); - clusterAddNode(node); - clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); + if (type == CLUSTERMSG_TYPE_MEET) { + if (!sender) { + /* Add this node if it is new for us and the msg type is MEET. + * In this stage we don't try to add the node with the right + * flags, replicaof pointer, and so forth, as this details will be + * resolved when we'll receive PONGs from the node. The exception + * to this is the flag that indicates extensions are supported, as + * we want to send extensions right away in the return PONG in order + * to reduce the amount of time needed to stabilize the shard ID. */ + clusterNode *node; - /* If this is a MEET packet from an unknown node, we still process - * the gossip section here since we have to trust the sender because - * of the message type. */ - clusterProcessGossipSection(hdr, link); + node = createClusterNode(NULL, CLUSTER_NODE_HANDSHAKE); + serverAssert(nodeIp2String(node->ip, link, hdr->myip) == C_OK); + getClientPortFromClusterMsg(hdr, &node->tls_port, &node->tcp_port); + node->cport = ntohs(hdr->cport); + if (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA) { + node->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED; + } + setClusterNodeToInboundClusterLink(node, link); + clusterAddNode(node); + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); + + /* If this is a MEET packet from an unknown node, we still process + * the gossip section here since we have to trust the sender because + * of the message type. */ + clusterProcessGossipSection(hdr, link); + } else if (sender->link) { + /* The MEET packet is from a known node, so the sender thinks that I do not know it. + * Freeing my outbound link to that node, to force a reconnect and sending a PING. + * Once that node receives our PING, it should recognize the new connection as an inbound link from me. */ + serverAssert(link != sender->link); // we should always receive a MEET packet on an inbound link + serverLog(LL_NOTICE, "Freeing outbound link to node %.40s after receiving a MEET packet from this known node", sender->name); + freeClusterLink(sender->link); + } } /* Anyway reply with a PONG */ @@ -3695,9 +3704,6 @@ void clusterLinkConnectHandler(connection *conn) { */ serverLog(LL_DEBUG, "Connecting with Node %.40s at %s:%d", node->name, node->ip, node->cport); - if (nodeInMeetState(node)) { - serverLog(LL_DEBUG, "Sending MEET packet on connection to node %.40s", node->name); - } } /* Performs sanity check on the message signature and length depending on the type. */ @@ -3940,6 +3946,11 @@ void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) { /* Send a PING or PONG packet to the specified node, making sure to add enough * gossip information. */ void clusterSendPing(clusterLink *link, int type) { + serverLog(LL_DEBUG, "Sending %s packet to node %.40s (%s)", + clusterGetMessageTypeString(type), + link->node ? link->node->name : "", + link->inbound ? "inbound" : "outbound"); + static unsigned long long cluster_pings_sent = 0; cluster_pings_sent++; int gossipcount = 0; /* Number of gossip sections added so far. */ diff --git a/tests/unit/cluster/cluster-reliable-meet.tcl b/tests/unit/cluster/cluster-reliable-meet.tcl index 1a9d9666d0..f189e96d5b 100644 --- a/tests/unit/cluster/cluster-reliable-meet.tcl +++ b/tests/unit/cluster/cluster-reliable-meet.tcl @@ -88,29 +88,33 @@ proc cluster_get_first_node_in_handshake id { return {} } -proc cluster_3_nodes_all_know_each_other {} { - set node0_id [dict get [get_myself 0] id] - set node1_id [dict get [get_myself 1] id] - set node2_id [dict get [get_myself 2] id] - - if { - [cluster_get_node_by_id 0 $node0_id] != {} && - [cluster_get_node_by_id 0 $node1_id] != {} && - [cluster_get_node_by_id 0 $node2_id] != {} && - [cluster_get_node_by_id 1 $node0_id] != {} && - [cluster_get_node_by_id 1 $node1_id] != {} && - [cluster_get_node_by_id 1 $node2_id] != {} && - [cluster_get_node_by_id 2 $node0_id] != {} && - [cluster_get_node_by_id 2 $node1_id] != {} && - [cluster_get_node_by_id 2 $node2_id] != {} && - [llength [R 0 CLUSTER LINKS]] == 4 && - [llength [R 1 CLUSTER LINKS]] == 4 && - [llength [R 2 CLUSTER LINKS]] == 4 - } { - return 1 - } else { - return 0 +proc cluster_nodes_all_know_each_other {num_nodes} { + # Collect node IDs dynamically + set node_ids {} + for {set i 0} {$i < $num_nodes} {incr i} { + lappend node_ids [dict get [get_myself $i] id] } + + # Check if all nodes know each other + foreach node_id $node_ids { + foreach check_node_id $node_ids { + for {set node_index 0} {$node_index < $num_nodes} {incr node_index} { + if {[cluster_get_node_by_id $node_index $check_node_id] == {}} { + return 0 + } + } + } + } + + # Verify cluster link counts for each node + set expected_links [expr {2 * ($num_nodes - 1)}] + for {set i 0} {$i < $num_nodes} {incr i} { + if {[llength [R $i CLUSTER LINKS]] != $expected_links} { + return 0 + } + } + + return 1 } start_cluster 2 0 {tags {external:skip cluster} overrides {cluster-node-timeout 4000 cluster-replica-no-failover yes}} { @@ -196,7 +200,7 @@ start_cluster 2 0 {tags {external:skip cluster} overrides {cluster-node-timeout # Now Node 0 will send a MEET packet to Node 1 & 2 since it has an outbound link to these nodes but no inbound link. # Handshake should now complete successfully. wait_for_condition 50 200 { - [cluster_3_nodes_all_know_each_other] + [cluster_nodes_all_know_each_other 3] } else { fail "Unexpected CLUSTER NODES output, all nodes should know each other." } @@ -239,12 +243,11 @@ start_cluster 2 0 {tags {external:skip cluster} overrides {cluster-node-timeout # Wait for node 0 to know about the other nodes in the cluster wait_for_condition 50 100 { - [cluster_get_node_by_id 0 $node1_id] != {} && - [cluster_get_node_by_id 0 $node2_id] != {} + [cluster_get_node_by_id 0 $node1_id] != {} } else { - fail "Node 0 never learned about node 1 and 2" + fail "Node 0 never learned about node 1" } - # At this point, node 0 learned about the other nodes in the cluster from meeting node 1. + # At this point, node 0 knows about node 1 and might know node 2 if node 1 gossiped about it. wait_for_condition 50 100 { [cluster_get_first_node_in_handshake 0] eq {} } else { @@ -267,7 +270,7 @@ start_cluster 2 0 {tags {external:skip cluster} overrides {cluster-node-timeout # Now Node 0 will send a MEET packet to Node 1 & 2 since it has an outbound link to these nodes but no inblound link. # Handshake should now complete successfully. wait_for_condition 50 200 { - [cluster_3_nodes_all_know_each_other] + [cluster_nodes_all_know_each_other 3] } else { fail "Unexpected CLUSTER NODES output, all nodes should know each other." } From 7ae3b058d9d07b54787a2a929ccaea5776539bcd Mon Sep 17 00:00:00 2001 From: Pierre Turin Date: Wed, 27 Nov 2024 22:56:49 +0000 Subject: [PATCH 5/9] Fix formatting on comment Signed-off-by: Pierre Turin --- src/cluster_legacy.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 7e8bb017de..35b3d10b9e 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -3237,8 +3237,8 @@ int clusterProcessPacket(clusterLink *link) { clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); /* If this is a MEET packet from an unknown node, we still process - * the gossip section here since we have to trust the sender because - * of the message type. */ + * the gossip section here since we have to trust the sender because + * of the message type. */ clusterProcessGossipSection(hdr, link); } else if (sender->link) { /* The MEET packet is from a known node, so the sender thinks that I do not know it. From 85904dda908473510671acc75e1e4d643c3a3844 Mon Sep 17 00:00:00 2001 From: Pierre Turin Date: Mon, 2 Dec 2024 19:53:27 +0000 Subject: [PATCH 6/9] Free outbound link on MEET packet only after handshake timeout Signed-off-by: Pierre Turin --- src/cluster_legacy.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 35b3d10b9e..72fd26c0e5 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -3240,10 +3240,12 @@ int clusterProcessPacket(clusterLink *link) { * the gossip section here since we have to trust the sender because * of the message type. */ clusterProcessGossipSection(hdr, link); - } else if (sender->link) { + } else if (sender->link && now - sender->ctime > server.cluster_node_timeout) { /* The MEET packet is from a known node, so the sender thinks that I do not know it. * Freeing my outbound link to that node, to force a reconnect and sending a PING. - * Once that node receives our PING, it should recognize the new connection as an inbound link from me. */ + * Once that node receives our PING, it should recognize the new connection as an inbound link from me. + * We should only free the outbound link if the node is known for more time than the handshake timeout, + * since during this time, the other side might still be trying to complete the handshake. */ serverAssert(link != sender->link); // we should always receive a MEET packet on an inbound link serverLog(LL_NOTICE, "Freeing outbound link to node %.40s after receiving a MEET packet from this known node", sender->name); freeClusterLink(sender->link); From 6a017b0fd84714da35d0bccd7c38d074f470626f Mon Sep 17 00:00:00 2001 From: Pierre Turin Date: Mon, 2 Dec 2024 20:04:37 +0000 Subject: [PATCH 7/9] Deflake cluster-multiple-meets test Sometimes the outbound link from node 0 to node 1 can be disconnected. Assert that node 0 know node 1 without expecting the node to be marked as connected. Signed-off-by: Pierre Turin --- tests/unit/cluster/cluster-multiple-meets.tcl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/unit/cluster/cluster-multiple-meets.tcl b/tests/unit/cluster/cluster-multiple-meets.tcl index 059f03fbe4..0b5f769930 100644 --- a/tests/unit/cluster/cluster-multiple-meets.tcl +++ b/tests/unit/cluster/cluster-multiple-meets.tcl @@ -58,7 +58,7 @@ tags {tls:skip external:skip cluster} { } else { fail "Node 1 recognizes node 0 even though it drops PONGs from node 0" } - assert {[llength [get_cluster_nodes 0 connected]] == 2} + assert {[llength [get_cluster_nodes 0]] == 2} # Drop incoming and outgoing links from/to 1 R 0 DEBUG CLUSTERLINK KILL ALL [R 1 CLUSTER MYID] @@ -77,6 +77,8 @@ tags {tls:skip external:skip cluster} { # Both a and b will turn to cluster state ok wait_for_condition 1000 50 { [CI 1 cluster_state] eq {ok} && [CI 0 cluster_state] eq {ok} && + [llength [get_cluster_nodes 0 connected]] == 2 && + [llength [get_cluster_nodes 1 connected]] == 2 && [CI 1 cluster_stats_messages_meet_sent] == [CI 0 cluster_stats_messages_meet_received] } else { fail "1 cluster_state:[CI 1 cluster_state], 0 cluster_state: [CI 0 cluster_state]" From cacd76cd7bebdeddceea871e22d6ffb45b4a781c Mon Sep 17 00:00:00 2001 From: Pierre Turin Date: Tue, 3 Dec 2024 21:40:43 +0000 Subject: [PATCH 8/9] Update comments Signed-off-by: Pierre Turin --- src/cluster_legacy.c | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 72fd26c0e5..ea77b151ba 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -3241,13 +3241,17 @@ int clusterProcessPacket(clusterLink *link) { * of the message type. */ clusterProcessGossipSection(hdr, link); } else if (sender->link && now - sender->ctime > server.cluster_node_timeout) { - /* The MEET packet is from a known node, so the sender thinks that I do not know it. + /* The MEET packet is from a known node, after the handshake timeout, so the sender thinks that I do not + * know it. * Freeing my outbound link to that node, to force a reconnect and sending a PING. * Once that node receives our PING, it should recognize the new connection as an inbound link from me. * We should only free the outbound link if the node is known for more time than the handshake timeout, * since during this time, the other side might still be trying to complete the handshake. */ - serverAssert(link != sender->link); // we should always receive a MEET packet on an inbound link - serverLog(LL_NOTICE, "Freeing outbound link to node %.40s after receiving a MEET packet from this known node", sender->name); + + /* We should always receive a MEET packet on an inbound link. */ + serverAssert(link != sender->link); + serverLog(LL_NOTICE, "Freeing outbound link to node %.40s after receiving a MEET packet from this known node", + sender->name); freeClusterLink(sender->link); } } From 8086a070f71b90fafcbe5df3564082622f382400 Mon Sep 17 00:00:00 2001 From: Pierre Turin Date: Thu, 12 Dec 2024 00:54:09 +0000 Subject: [PATCH 9/9] Update debug log format when sending ping Signed-off-by: Pierre Turin --- src/cluster_legacy.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index ea77b151ba..ed03fe5ed2 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -3952,9 +3952,10 @@ void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) { /* Send a PING or PONG packet to the specified node, making sure to add enough * gossip information. */ void clusterSendPing(clusterLink *link, int type) { - serverLog(LL_DEBUG, "Sending %s packet to node %.40s (%s)", + serverLog(LL_DEBUG, "Sending %s packet to node %.40s (%s) on %s link", clusterGetMessageTypeString(type), link->node ? link->node->name : "", + link->node ? link->node->human_nodename : "", link->inbound ? "inbound" : "outbound"); static unsigned long long cluster_pings_sent = 0;