diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index e4b25e265d..ed03fe5ed2 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -1335,6 +1335,10 @@ clusterLink *createClusterLink(clusterNode *node) { * with this link will have the 'link' field set to NULL. */ void freeClusterLink(clusterLink *link) { serverAssert(link != NULL); + serverLog(LL_DEBUG, "Freeing cluster link for node: %.40s:%s", + link->node ? link->node->name : "", + link->inbound ? "inbound" : "outbound"); + if (link->conn) { connClose(link->conn); link->conn = NULL; @@ -1350,6 +1354,7 @@ void freeClusterLink(clusterLink *link) { } else if (link->node->inbound_link == link) { serverAssert(link->inbound); link->node->inbound_link = NULL; + link->node->inbound_link_freed_time = mstime(); } } zfree(link); @@ -1489,6 +1494,7 @@ clusterNode *createClusterNode(char *nodename, int flags) { node->fail_time = 0; node->link = NULL; node->inbound_link = NULL; + node->inbound_link_freed_time = node->ctime; memset(node->ip, 0, sizeof(node->ip)); node->announce_client_ipv4 = sdsempty(); node->announce_client_ipv6 = sdsempty(); @@ -1695,6 +1701,9 @@ void clusterAddNode(clusterNode *node) { * it is a replica node. */ void clusterDelNode(clusterNode *delnode) { + serverAssert(delnode != NULL); + serverLog(LL_DEBUG, "Deleting node %.40s from cluster view", delnode->name); + int j; dictIterator *di; dictEntry *de; @@ -2077,7 +2086,7 @@ void clearNodeFailureIfNeeded(clusterNode *node) { /* Return 1 if we already have a node in HANDSHAKE state matching the * specified ip address and port number. This function is used in order to * avoid adding a new handshake node for the same address multiple times. */ -int clusterHandshakeInProgress(char *ip, int port, int cport) { +static int clusterHandshakeInProgress(char *ip, int port, int cport) { dictIterator *di; dictEntry *de; @@ -2099,7 +2108,7 @@ int clusterHandshakeInProgress(char *ip, int port, int cport) { * * EAGAIN - There is already a handshake in progress for this address. * EINVAL - IP or port are not valid. */ -int clusterStartHandshake(char *ip, int port, int cport) { +static int clusterStartHandshake(char *ip, int port, int cport) { clusterNode *n; char norm_ip[NET_IP_STR_LEN]; struct sockaddr_storage sa; @@ -3205,33 +3214,48 @@ int clusterProcessPacket(clusterLink *link) { } } - /* Add this node if it is new for us and the msg type is MEET. - * In this stage we don't try to add the node with the right - * flags, replicaof pointer, and so forth, as this details will be - * resolved when we'll receive PONGs from the node. The exception - * to this is the flag that indicates extensions are supported, as - * we want to send extensions right away in the return PONG in order - * to reduce the amount of time needed to stabilize the shard ID. */ - if (!sender && type == CLUSTERMSG_TYPE_MEET) { - clusterNode *node; - - node = createClusterNode(NULL, CLUSTER_NODE_HANDSHAKE); - serverAssert(nodeIp2String(node->ip, link, hdr->myip) == C_OK); - getClientPortFromClusterMsg(hdr, &node->tls_port, &node->tcp_port); - node->cport = ntohs(hdr->cport); - if (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA) { - node->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED; + if (type == CLUSTERMSG_TYPE_MEET) { + if (!sender) { + /* Add this node if it is new for us and the msg type is MEET. + * In this stage we don't try to add the node with the right + * flags, replicaof pointer, and so forth, as this details will be + * resolved when we'll receive PONGs from the node. The exception + * to this is the flag that indicates extensions are supported, as + * we want to send extensions right away in the return PONG in order + * to reduce the amount of time needed to stabilize the shard ID. */ + clusterNode *node; + + node = createClusterNode(NULL, CLUSTER_NODE_HANDSHAKE); + serverAssert(nodeIp2String(node->ip, link, hdr->myip) == C_OK); + getClientPortFromClusterMsg(hdr, &node->tls_port, &node->tcp_port); + node->cport = ntohs(hdr->cport); + if (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA) { + node->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED; + } + setClusterNodeToInboundClusterLink(node, link); + clusterAddNode(node); + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); + + /* If this is a MEET packet from an unknown node, we still process + * the gossip section here since we have to trust the sender because + * of the message type. */ + clusterProcessGossipSection(hdr, link); + } else if (sender->link && now - sender->ctime > server.cluster_node_timeout) { + /* The MEET packet is from a known node, after the handshake timeout, so the sender thinks that I do not + * know it. + * Freeing my outbound link to that node, to force a reconnect and sending a PING. + * Once that node receives our PING, it should recognize the new connection as an inbound link from me. + * We should only free the outbound link if the node is known for more time than the handshake timeout, + * since during this time, the other side might still be trying to complete the handshake. */ + + /* We should always receive a MEET packet on an inbound link. */ + serverAssert(link != sender->link); + serverLog(LL_NOTICE, "Freeing outbound link to node %.40s after receiving a MEET packet from this known node", + sender->name); + freeClusterLink(sender->link); } - setClusterNodeToInboundClusterLink(node, link); - clusterAddNode(node); - clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); } - /* If this is a MEET packet from an unknown node, we still process - * the gossip section here since we have to trust the sender because - * of the message type. */ - if (!sender && type == CLUSTERMSG_TYPE_MEET) clusterProcessGossipSection(hdr, link); - /* Anyway reply with a PONG */ clusterSendPing(link, CLUSTERMSG_TYPE_PONG); } @@ -3241,7 +3265,7 @@ int clusterProcessPacket(clusterLink *link) { serverLog(LL_DEBUG, "%s packet received: %.40s", clusterGetMessageTypeString(type), link->node ? link->node->name : "NULL"); - if (sender && (sender->flags & CLUSTER_NODE_MEET)) { + if (sender && nodeInMeetState(sender)) { /* Once we get a response for MEET from the sender, we can stop sending more MEET. */ sender->flags &= ~CLUSTER_NODE_MEET; serverLog(LL_NOTICE, "Successfully completed handshake with %.40s (%s)", sender->name, @@ -3666,7 +3690,7 @@ void clusterLinkConnectHandler(connection *conn) { * of a PING one, to force the receiver to add us in its node * table. */ mstime_t old_ping_sent = node->ping_sent; - clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ? CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING); + clusterSendPing(link, nodeInMeetState(node) ? CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING); if (old_ping_sent) { /* If there was an active ping before the link was * disconnected, we want to restore the ping time, otherwise @@ -3745,7 +3769,9 @@ void clusterReadHandler(connection *conn) { if (nread <= 0) { /* I/O error... */ - serverLog(LL_DEBUG, "I/O error reading from node link: %s", + serverLog(LL_DEBUG, "I/O error reading from node link (%.40s:%s): %s", + link->node ? link->node->name : "", + link->inbound ? "inbound" : "outbound", (nread == 0) ? "connection closed" : connGetLastError(conn)); handleLinkIOError(link); return; @@ -3926,6 +3952,12 @@ void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) { /* Send a PING or PONG packet to the specified node, making sure to add enough * gossip information. */ void clusterSendPing(clusterLink *link, int type) { + serverLog(LL_DEBUG, "Sending %s packet to node %.40s (%s) on %s link", + clusterGetMessageTypeString(type), + link->node ? link->node->name : "", + link->node ? link->node->human_nodename : "", + link->inbound ? "inbound" : "outbound"); + static unsigned long long cluster_pings_sent = 0; cluster_pings_sent++; int gossipcount = 0; /* Number of gossip sections added so far. */ @@ -4941,6 +4973,15 @@ static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t handshake_ clusterDelNode(node); return 1; } + if (node->link != NULL && node->inbound_link == NULL && nodeInNormalState(node) && + now - node->inbound_link_freed_time > handshake_timeout) { + /* Node has an outbound link, but no inbound link for more than the handshake timeout. + * This probably means this node does not know us yet, whereas we know it. + * So we send it a MEET packet to do a handshake with it and correct the inconsistent cluster view. */ + node->flags |= CLUSTER_NODE_MEET; + serverLog(LL_NOTICE, "Sending MEET packet to node %.40s because there is no inbound link for it", node->name); + clusterSendPing(node->link, CLUSTERMSG_TYPE_MEET); + } if (node->link == NULL) { clusterLink *link = createClusterLink(node); diff --git a/src/cluster_legacy.h b/src/cluster_legacy.h index 39148c748d..492ecc0fdb 100644 --- a/src/cluster_legacy.h +++ b/src/cluster_legacy.h @@ -61,12 +61,14 @@ typedef struct clusterLink { #define nodeIsPrimary(n) ((n)->flags & CLUSTER_NODE_PRIMARY) #define nodeIsReplica(n) ((n)->flags & CLUSTER_NODE_REPLICA) #define nodeInHandshake(n) ((n)->flags & CLUSTER_NODE_HANDSHAKE) +#define nodeInMeetState(n) ((n)->flags & CLUSTER_NODE_MEET) #define nodeHasAddr(n) (!((n)->flags & CLUSTER_NODE_NOADDR)) #define nodeTimedOut(n) ((n)->flags & CLUSTER_NODE_PFAIL) #define nodeFailed(n) ((n)->flags & CLUSTER_NODE_FAIL) #define nodeCantFailover(n) ((n)->flags & CLUSTER_NODE_NOFAILOVER) #define nodeSupportsExtensions(n) ((n)->flags & CLUSTER_NODE_EXTENSIONS_SUPPORTED) #define nodeSupportsLightMsgHdr(n) ((n)->flags & CLUSTER_NODE_LIGHT_HDR_SUPPORTED) +#define nodeInNormalState(n) (!((n)->flags & (CLUSTER_NODE_HANDSHAKE | CLUSTER_NODE_MEET | CLUSTER_NODE_PFAIL | CLUSTER_NODE_FAIL))) /* This structure represent elements of node->fail_reports. */ typedef struct clusterNodeFailReport { @@ -343,6 +345,8 @@ struct _clusterNode { * failover scenarios. */ mstime_t repl_offset_time; /* Unix time we received offset for this node */ mstime_t orphaned_time; /* Starting time of orphaned primary condition */ + mstime_t inbound_link_freed_time; /* Last time we freed the inbound link for this node. + If it was never freed, it is the same as ctime */ long long repl_offset; /* Last known repl offset for this node. */ char ip[NET_IP_STR_LEN]; /* Latest known IP address of this node */ sds announce_client_ipv4; /* IPv4 for clients only. */ diff --git a/tests/support/cluster_util.tcl b/tests/support/cluster_util.tcl index 686f00071b..4f641c5e96 100644 --- a/tests/support/cluster_util.tcl +++ b/tests/support/cluster_util.tcl @@ -323,6 +323,15 @@ proc get_cluster_nodes {id {status "*"}} { return $nodes } +# Returns the parsed myself node entry as a dictionary. +proc get_myself id { + set nodes [get_cluster_nodes $id] + foreach n $nodes { + if {[cluster_has_flag $n myself]} {return $n} + } + return {} +} + # Returns 1 if no node knows node_id, 0 if any node knows it. proc node_is_forgotten {node_id} { for {set j 0} {$j < [llength $::servers]} {incr j} { diff --git a/tests/unit/cluster/cluster-multiple-meets.tcl b/tests/unit/cluster/cluster-multiple-meets.tcl index 059f03fbe4..0b5f769930 100644 --- a/tests/unit/cluster/cluster-multiple-meets.tcl +++ b/tests/unit/cluster/cluster-multiple-meets.tcl @@ -58,7 +58,7 @@ tags {tls:skip external:skip cluster} { } else { fail "Node 1 recognizes node 0 even though it drops PONGs from node 0" } - assert {[llength [get_cluster_nodes 0 connected]] == 2} + assert {[llength [get_cluster_nodes 0]] == 2} # Drop incoming and outgoing links from/to 1 R 0 DEBUG CLUSTERLINK KILL ALL [R 1 CLUSTER MYID] @@ -77,6 +77,8 @@ tags {tls:skip external:skip cluster} { # Both a and b will turn to cluster state ok wait_for_condition 1000 50 { [CI 1 cluster_state] eq {ok} && [CI 0 cluster_state] eq {ok} && + [llength [get_cluster_nodes 0 connected]] == 2 && + [llength [get_cluster_nodes 1 connected]] == 2 && [CI 1 cluster_stats_messages_meet_sent] == [CI 0 cluster_stats_messages_meet_received] } else { fail "1 cluster_state:[CI 1 cluster_state], 0 cluster_state: [CI 0 cluster_state]" diff --git a/tests/unit/cluster/cluster-reliable-meet.tcl b/tests/unit/cluster/cluster-reliable-meet.tcl index 45f5a6dc89..f189e96d5b 100644 --- a/tests/unit/cluster/cluster-reliable-meet.tcl +++ b/tests/unit/cluster/cluster-reliable-meet.tcl @@ -3,6 +3,12 @@ set old_singledb $::singledb set ::singledb 1 tags {tls:skip external:skip cluster} { + set CLUSTER_PACKET_TYPE_PING 0 + set CLUSTER_PACKET_TYPE_PONG 1 + set CLUSTER_PACKET_TYPE_MEET 2 + set CLUSTER_PACKET_TYPE_NONE -1 + set CLUSTER_PACKET_TYPE_ALL -2 + set base_conf [list cluster-enabled yes] start_multiple_servers 2 [list overrides $base_conf] { test "Cluster nodes are reachable" { @@ -22,9 +28,6 @@ tags {tls:skip external:skip cluster} { wait_for_cluster_state fail } - set CLUSTER_PACKET_TYPE_MEET 2 - set CLUSTER_PACKET_TYPE_NONE -1 - test "Cluster nodes haven't met each other" { assert {[llength [get_cluster_nodes 1]] == 1} assert {[llength [get_cluster_nodes 0]] == 1} @@ -75,3 +78,202 @@ tags {tls:skip external:skip cluster} { set ::singledb $old_singledb +proc cluster_get_first_node_in_handshake id { + set nodes [get_cluster_nodes $id] + foreach n $nodes { + if {[cluster_has_flag $n handshake]} { + return [dict get $n id] + } + } + return {} +} + +proc cluster_nodes_all_know_each_other {num_nodes} { + # Collect node IDs dynamically + set node_ids {} + for {set i 0} {$i < $num_nodes} {incr i} { + lappend node_ids [dict get [get_myself $i] id] + } + + # Check if all nodes know each other + foreach node_id $node_ids { + foreach check_node_id $node_ids { + for {set node_index 0} {$node_index < $num_nodes} {incr node_index} { + if {[cluster_get_node_by_id $node_index $check_node_id] == {}} { + return 0 + } + } + } + } + + # Verify cluster link counts for each node + set expected_links [expr {2 * ($num_nodes - 1)}] + for {set i 0} {$i < $num_nodes} {incr i} { + if {[llength [R $i CLUSTER LINKS]] != $expected_links} { + return 0 + } + } + + return 1 +} + +start_cluster 2 0 {tags {external:skip cluster} overrides {cluster-node-timeout 4000 cluster-replica-no-failover yes}} { + set CLUSTER_PACKET_TYPE_PING 0 + set CLUSTER_PACKET_TYPE_PONG 1 + set CLUSTER_PACKET_TYPE_MEET 2 + set CLUSTER_PACKET_TYPE_NONE -1 + set CLUSTER_PACKET_TYPE_ALL -2 + + test "Handshake eventually succeeds after node handshake timeout on both sides with inconsistent view of the cluster" { + set cluster_port [find_available_port $::baseport $::portcount] + start_server [list overrides [list cluster-enabled yes cluster-node-timeout 4000 cluster-port $cluster_port]] { + # In this test we will trigger a handshake timeout on both sides of the handshake. + # Node 1 and 2 already know each other, then we make node 1 meet node 0: + # + # Node 1 -- MEET -> Node 0 [Node 0 might learn about Node 2 from the gossip section of the msg] + # Node 1 <- PONG -- Node 0 [we drop this message, so Node 1 will eventually mark the handshake as timed out] + # Node 1 <- PING -- Node 0 [we drop this message, so Node 1 will never send a PONG and Node 0 will eventually mark the handshake as timed out] + # + # After the handshake is timed out, we allow all cluster bus messages to go through. + # Eventually Node 0 should send a MEET packet to the other nodes to complete the handshake. + + set node0_id [dict get [get_myself 0] id] + set node1_id [dict get [get_myself 1] id] + set node2_id [dict get [get_myself 2] id] + + # Drop all cluster bus messages + R 1 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_ALL + # Drop MEET cluster bus messages, so that Node 0 cannot start a handshake with Node 2. + R 2 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_MEET + + R 1 CLUSTER MEET [srv 0 host] [srv 0 port] $cluster_port + + # Wait for Node 0 to be in handshake + wait_for_condition 10 400 { + [cluster_get_first_node_in_handshake 0] != {} + } else { + fail "Node 0 never entered handshake state" + } + + # We want Node 0 to learn about Node 2 through the gossip section of the MEET message + set meet_retry 0 + while {[cluster_get_node_by_id 0 $node2_id] eq {}} { + if {$meet_retry == 10} { + error "assertion: Retried to meet Node 0 too many times" + } + # If Node 0 doesn't know about Node 1 & 2, it means Node 1 did not gossip about node 2 in its MEET message. + # So we kill the outbound link from Node 1 to Node 0, to force a reconnect and a re-send of the MEET message. + after 100 + # Since we are in handshake, we use a randomly generated ID we have to find + R 1 DEBUG CLUSTERLINK KILL ALL [cluster_get_first_node_in_handshake 1] + incr meet_retry 1 + } + + # Wait for Node 1's handshake to timeout + wait_for_condition 50 100 { + [cluster_get_first_node_in_handshake 1] eq {} + } else { + fail "Node 1 never exited handshake state" + } + + # Wait for Node 0's handshake to timeout + wait_for_condition 50 100 { + [cluster_get_first_node_in_handshake 1] eq {} + } else { + fail "Node 0 never exited handshake state" + } + + # At this point Node 0 knows Node 1 & 2 through the gossip, but they don't know Node 0. + wait_for_condition 50 100 { + [cluster_get_node_by_id 0 $node1_id] != {} && + [cluster_get_node_by_id 0 $node2_id] != {} && + [cluster_get_node_by_id 1 $node0_id] eq {} && + [cluster_get_node_by_id 2 $node0_id] eq {} + } else { + fail "Unexpected CLUSTER NODES output, nodes 1 & 2 should not know node 0." + } + + # Allow all messages to go through again + R 1 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_NONE + R 2 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_NONE + + # Now Node 0 will send a MEET packet to Node 1 & 2 since it has an outbound link to these nodes but no inbound link. + # Handshake should now complete successfully. + wait_for_condition 50 200 { + [cluster_nodes_all_know_each_other 3] + } else { + fail "Unexpected CLUSTER NODES output, all nodes should know each other." + } + } ;# stop Node 0 + } ;# test +} ;# stop cluster + +start_cluster 2 0 {tags {external:skip cluster} overrides {cluster-node-timeout 4000 cluster-replica-no-failover yes}} { + set CLUSTER_PACKET_TYPE_PING 0 + set CLUSTER_PACKET_TYPE_PONG 1 + set CLUSTER_PACKET_TYPE_MEET 2 + set CLUSTER_PACKET_TYPE_NONE -1 + set CLUSTER_PACKET_TYPE_ALL -2 + + test "Handshake eventually succeeds after node handshake timeout on one side with inconsistent view of the cluster" { + set cluster_port [find_available_port $::baseport $::portcount] + start_server [list overrides [list cluster-enabled yes cluster-node-timeout 4000 cluster-port $cluster_port]] { + # In this test we will trigger a handshake timeout on one side of the handshake. + # Node 1 and 2 already know each other, then we make node 0 meet node 1: + # + # Node 0 -- MEET -> Node 1 + # Node 0 <- PONG -- Node 1 + # Node 0 <- PING -- Node 1 [Node 0 will mark the handshake as successful] + # Node 0 -- PONG -> Node 1 [we drop this message, so node 1 will eventually mark the handshake as timed out] + # + # After the handshake is timed out, we allow all cluster bus messages to go through. + # Eventually Node 0 should send a MEET packet to the other nodes to complete the handshake. + + set node0_id [dict get [get_myself 0] id] + set node1_id [dict get [get_myself 1] id] + set node2_id [dict get [get_myself 2] id] + + # Drop PONG messages + R 1 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_PONG + # Drop MEET cluster bus messages, so that Node 0 cannot start a handshake with Node 2. + R 2 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_MEET + + # Node 0 meets node 1 + R 0 CLUSTER MEET [srv -1 host] [srv -1 port] + + # Wait for node 0 to know about the other nodes in the cluster + wait_for_condition 50 100 { + [cluster_get_node_by_id 0 $node1_id] != {} + } else { + fail "Node 0 never learned about node 1" + } + # At this point, node 0 knows about node 1 and might know node 2 if node 1 gossiped about it. + wait_for_condition 50 100 { + [cluster_get_first_node_in_handshake 0] eq {} + } else { + fail "Node 1 never exited handshake state" + } + # At this point, from node 0 point of view, the handshake with node 1 succeeded. + + wait_for_condition 50 100 { + [cluster_get_first_node_in_handshake 1] eq {} + } else { + fail "Node 1 never exited handshake state" + } + assert {[cluster_get_node_by_id 1 $node0_id] eq {}} + # At this point, from node 1 point of view, the handshake with node 0 timed out. + + # Allow all messages + R 1 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_NONE + R 2 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_NONE + + # Now Node 0 will send a MEET packet to Node 1 & 2 since it has an outbound link to these nodes but no inblound link. + # Handshake should now complete successfully. + wait_for_condition 50 200 { + [cluster_nodes_all_know_each_other 3] + } else { + fail "Unexpected CLUSTER NODES output, all nodes should know each other." + } + } ;# stop Node 0 + } ;# test +} ;# stop cluster