From 6ab88888201cf415722afe427589a1d2606590ae Mon Sep 17 00:00:00 2001 From: Binbin Date: Mon, 30 Sep 2024 12:02:09 +0800 Subject: [PATCH 01/23] Trigger manual failover on SIGTERM / shutdown to cluster primary When a primary disappears, its slots are not served until an automatic failover happens. It takes about n seconds (node timeout plus some seconds). It's too much time for us to not accept writes. If the host machine is about to shutdown for any reason, the processes typically get a sigterm and have some time to shutdown gracefully. In Kubernetes, this is 30 seconds by default. When a primary receives a SIGTERM or a SHUTDOWN, let it trigger a failover to one of the replicas as part of the graceful shutdown. This can reduce some unavailability time. For example the replica needs to sense the primary failure within the node-timeout before initating an election, and now it can initiate an election quickly and win and gossip it. This closes #939. Signed-off-by: Binbin --- src/cluster_legacy.c | 26 +++++-- src/cluster_legacy.h | 1 + src/config.c | 1 + src/server.c | 24 +++++++ src/server.h | 1 + tests/support/util.tcl | 18 +++++ .../cluster/auto-failover-on-shutdown.tcl | 68 +++++++++++++++++++ valkey.conf | 4 ++ 8 files changed, 138 insertions(+), 5 deletions(-) create mode 100644 tests/unit/cluster/auto-failover-on-shutdown.tcl diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 120d55501c..2e84f4b937 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -3506,10 +3506,14 @@ int clusterProcessPacket(clusterLink *link) { * a non zero number of slots, and its currentEpoch is greater or * equal to epoch where this node started the election. */ if (clusterNodeIsVotingPrimary(sender) && sender_claimed_current_epoch >= server.cluster->failover_auth_epoch) { - server.cluster->failover_auth_count++; - /* Maybe we reached a quorum here, set a flag to make sure - * we check ASAP. */ - clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER); + /* todo: see if this needed. */ + /* My primary has already voted for me, so don't count it anymore. */ + if (!(sender == myself->replicaof && server.cluster->mf_is_primary_failover)) { + server.cluster->failover_auth_count++; + /* Maybe we reached a quorum here, set a flag to make sure + * we check ASAP. */ + clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER); + } } } else if (type == CLUSTERMSG_TYPE_MFSTART) { /* This message is acceptable only if I'm a primary and the sender @@ -4592,6 +4596,11 @@ void clusterHandleReplicaFailover(void) { if (server.cluster->mf_end) { server.cluster->failover_auth_time = mstime(); server.cluster->failover_auth_rank = 0; + /* todo: see if this is needed. */ + /* This is a failover triggered by my primary, let's counts its vote. */ + if (server.cluster->mf_is_primary_failover) { + server.cluster->failover_auth_count++; + } clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER); } serverLog(LL_NOTICE, @@ -4816,6 +4825,7 @@ void resetManualFailover(void) { } server.cluster->mf_end = 0; /* No manual failover in progress. */ server.cluster->mf_can_start = 0; + server.cluster->mf_is_primary_failover = 0; server.cluster->mf_replica = NULL; server.cluster->mf_primary_offset = -1; } @@ -4844,6 +4854,7 @@ void clusterHandleManualFailover(void) { /* Our replication offset matches the primary replication offset * announced after clients were paused. We can start the failover. */ server.cluster->mf_can_start = 1; + server.cluster->mf_is_primary_failover = 0; serverLog(LL_NOTICE, "All primary replication stream processed, " "manual failover can start."); clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER); @@ -6730,8 +6741,13 @@ int clusterCommandSpecial(client *c) { /* If this is a forced failover, we don't need to talk with our * primary to agree about the offset. We just failover taking over * it without coordination. */ - serverLog(LL_NOTICE, "Forced failover user request accepted (user request from '%s').", client); + if (c == server.primary) { + serverLog(LL_NOTICE, "Forced failover primary request accepted (primary request from '%s').", client); + } else { + serverLog(LL_NOTICE, "Forced failover user request accepted (user request from '%s').", client); + } server.cluster->mf_can_start = 1; + server.cluster->mf_is_primary_failover = 1; /* We can start a manual failover as soon as possible, setting a flag * here so that we don't need to waiting for the cron to kick in. */ clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_MANUALFAILOVER); diff --git a/src/cluster_legacy.h b/src/cluster_legacy.h index 5280644e6e..c97ec2df7f 100644 --- a/src/cluster_legacy.h +++ b/src/cluster_legacy.h @@ -393,6 +393,7 @@ struct clusterState { or -1 if still not received. */ int mf_can_start; /* If non-zero signal that the manual failover can start requesting primary vote. */ + int mf_is_primary_failover; /* The manual failover was triggered by my primary. */ /* The following fields are used by primaries to take state on elections. */ uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */ int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */ diff --git a/src/config.c b/src/config.c index 6c03cbb476..17389526f5 100644 --- a/src/config.c +++ b/src/config.c @@ -3135,6 +3135,7 @@ standardConfig static_configs[] = { createBoolConfig("enable-debug-assert", NULL, IMMUTABLE_CONFIG | HIDDEN_CONFIG, server.enable_debug_assert, 0, NULL, NULL), createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL), createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 1, NULL, NULL), + createBoolConfig("auto-failover-on-shutdown", NULL, MODIFIABLE_CONFIG, server.auto_failover_on_shutdown, 0, NULL, NULL), /* String Configs */ createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL), diff --git a/src/server.c b/src/server.c index 866023a455..f45c022556 100644 --- a/src/server.c +++ b/src/server.c @@ -4306,6 +4306,7 @@ int finishShutdown(void) { int force = server.shutdown_flags & SHUTDOWN_FORCE; /* Log a warning for each replica that is lagging. */ + client *best_replica = NULL; listIter replicas_iter; listNode *replicas_list_node; int num_replicas = 0, num_lagging_replicas = 0; @@ -4320,6 +4321,14 @@ int finishShutdown(void) { replicationGetReplicaName(replica), server.primary_repl_offset - replica->repl_ack_off, lag, replstateToString(replica->repl_state)); } + /* Find the best replica, that is, the replica with the largest offset. */ + if (replica->repl_state == REPLICA_STATE_ONLINE) { + if (best_replica == NULL) { + best_replica = replica; + } else if (replica->repl_ack_off > best_replica->repl_ack_off) { + best_replica = replica; + } + } } if (num_replicas > 0) { serverLog(LL_NOTICE, "%d of %d replicas are in sync when shutting down.", num_replicas - num_lagging_replicas, @@ -4419,6 +4428,21 @@ int finishShutdown(void) { * send them pending writes. */ flushReplicasOutputBuffers(); + if (server.auto_failover_on_shutdown && server.cluster_enabled && best_replica) { + /* Sending a CLUSTER FAILOVER FORCE to the best replica. */ + const char *buf = "*3\r\n$7\r\nCLUSTER\r\n$8\r\nFAILOVER\r\n$5\r\nFORCE\r\n"; + if (connWrite(best_replica->conn, buf, strlen(buf)) == (int)strlen(buf)) { + serverLog(LL_NOTICE, "Sending CLUSTER FAILOVER FORCE to replica %s succeeded.", + replicationGetReplicaName(best_replica)); + } else { + serverLog(LL_WARNING, "Failed to send CLUSTER FAILOVER FORCE to replica: %s", strerror(errno)); + } + } + + if (server.auto_failover_on_shutdown && server.cluster_enabled && !best_replica) { + serverLog(LL_WARNING, "Unable to find a replica to perform an auto failover on shutdown."); + } + /* Close the listening sockets. Apparently this allows faster restarts. */ closeListeningSockets(1); diff --git a/src/server.h b/src/server.h index 1b8f08833f..537a2ca6e7 100644 --- a/src/server.h +++ b/src/server.h @@ -2169,6 +2169,7 @@ struct valkeyServer { unsigned long cluster_blacklist_ttl; /* Duration in seconds that a node is denied re-entry into * the cluster after it is forgotten with CLUSTER FORGET. */ int cluster_slot_stats_enabled; /* Cluster slot usage statistics tracking enabled. */ + int auto_failover_on_shutdown; /* Trigger manual failover on shutdown to primary. */ /* Debug config that goes along with cluster_drop_packet_filter. When set, the link is closed on packet drop. */ uint32_t debug_cluster_close_link_on_packet_drop : 1; sds cached_cluster_slot_info[CACHE_CONN_TYPE_MAX]; /* Index in array is a bitwise or of CACHE_CONN_TYPE_* */ diff --git a/tests/support/util.tcl b/tests/support/util.tcl index e53cda3071..8cc0deb098 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -126,6 +126,24 @@ proc wait_replica_online r { } } +proc get_replica_acked_offset {primary replica_ip replica_port} { + set infostr [$primary info replication] + if {[regexp -lineanchor "^slave\\d:ip=$replica_ip,port=$replica_port,.*,offset=(\\d+).*\r\n" $infostr _ value]} { + return $value + } +} + +proc wait_replica_acked_ofs {primary replica_ip replica_port} { + $primary config set repl-ping-replica-period 3600 + wait_for_condition 50 100 { + [status $primary master_repl_offset] eq [get_replica_acked_offset $primary $replica_ip $replica_port] + } else { + puts "INFO REPLICATION: [$primary info replication]" + fail "replica acked offset didn't match in time" + } + $primary config set repl-ping-replica-period 10 +} + proc wait_for_ofs_sync {r1 r2} { wait_for_condition 50 100 { [status $r1 master_repl_offset] eq [status $r2 master_repl_offset] diff --git a/tests/unit/cluster/auto-failover-on-shutdown.tcl b/tests/unit/cluster/auto-failover-on-shutdown.tcl new file mode 100644 index 0000000000..2abbca2f5a --- /dev/null +++ b/tests/unit/cluster/auto-failover-on-shutdown.tcl @@ -0,0 +1,68 @@ +proc shutdown_on_how {srv_id how} { + if {$how == "shutdown"} { + catch {R $srv_id shutdown nosave} + } elseif {$how == "sigterm"} { + exec kill -SIGTERM [s -$srv_id process_id] + } +} + +proc test_main {how} { + test "auto-failover-on-shutdown will always pick a best replica and send CLUSTER FAILOVER - $how" { + set primary [srv 0 client] + set replica1 [srv -3 client] + set replica1_pid [s -3 process_id] + set replica2 [srv -6 client] + set replica2_ip [srv -6 host] + set replica2_port [srv -6 port] + + # Pause a replica so it has no chance to catch up with the offset. + pause_process $replica1_pid + + # Primary write some data to increse the offset. + for {set i 0} {$i < 10} {incr i} { + $primary incr key_991803 + } + + # Wait the replica2 catch up with the offset + wait_replica_acked_ofs $primary $replica2_ip $replica2_port + + # Shutdown the primary. + shutdown_on_how 0 $how + + # Wait for the replica2 to become a primary. + wait_for_condition 1000 50 { + [s -6 role] eq {master} + } else { + puts "s -6 role: [s -6 role]" + fail "Failover does not happened" + } + + # Make sure that the expected logs are printed. + verify_log_message 0 "*Sending CLUSTER FAILOVER FORCE to replica*" 0 + verify_log_message -6 "*Forced failover primary request accepted*" 0 + + resume_process $replica1_pid + } + + test "Unable to find a replica to perform an auto failover - $how" { + set primary [srv -6 client] + set replica1 [srv -3 client] + set replica1_pid [s -3 process_id] + + pause_process $replica1_pid + + $primary client kill type replica + shutdown_on_how 6 $how + wait_for_log_messages -6 {"*Unable to find a replica to perform an auto failover on shutdown*"} 0 1000 10 + + resume_process $replica1_pid + } +} + +start_cluster 3 4 {tags {external:skip cluster} overrides {cluster-ping-interval 1000 cluster-node-timeout 5000 shutdown-timeout 0 auto-failover-on-shutdown yes}} { + test_main "shutdown" +} + +start_cluster 3 4 {tags {external:skip cluster} overrides {cluster-ping-interval 1000 cluster-node-timeout 5000 shutdown-timeout 0 auto-failover-on-shutdown yes}} { + test_main "sigterm" +} diff --git a/valkey.conf b/valkey.conf index f9d102a95d..8624a48a91 100644 --- a/valkey.conf +++ b/valkey.conf @@ -1594,6 +1594,10 @@ aof-timestamp-enabled no # shutdown-on-sigint default # shutdown-on-sigterm default +# TODO +# +# auto-failover-on-shutdown no + ################ NON-DETERMINISTIC LONG BLOCKING COMMANDS ##################### # Maximum time in milliseconds for EVAL scripts, functions and in some cases From 4b49f03d20ed65064ccb79cc679c36eae585c018 Mon Sep 17 00:00:00 2001 From: Binbin Date: Mon, 30 Sep 2024 12:17:46 +0800 Subject: [PATCH 02/23] fix typo Signed-off-by: Binbin --- tests/unit/cluster/auto-failover-on-shutdown.tcl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/cluster/auto-failover-on-shutdown.tcl b/tests/unit/cluster/auto-failover-on-shutdown.tcl index 2abbca2f5a..6e8a171646 100644 --- a/tests/unit/cluster/auto-failover-on-shutdown.tcl +++ b/tests/unit/cluster/auto-failover-on-shutdown.tcl @@ -18,7 +18,7 @@ proc test_main {how} { # Pause a replica so it has no chance to catch up with the offset. pause_process $replica1_pid - # Primary write some data to increse the offset. + # Primary write some data to increase the offset. for {set i 0} {$i < 10} {incr i} { $primary incr key_991803 } From df0ef8d0304982dbe406a3318c92942e780ec280 Mon Sep 17 00:00:00 2001 From: Binbin Date: Sun, 6 Oct 2024 14:47:32 +0800 Subject: [PATCH 03/23] add comment in the test Signed-off-by: Binbin --- tests/unit/cluster/auto-failover-on-shutdown.tcl | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/unit/cluster/auto-failover-on-shutdown.tcl b/tests/unit/cluster/auto-failover-on-shutdown.tcl index 6e8a171646..84050abb3f 100644 --- a/tests/unit/cluster/auto-failover-on-shutdown.tcl +++ b/tests/unit/cluster/auto-failover-on-shutdown.tcl @@ -6,6 +6,9 @@ proc shutdown_on_how {srv_id how} { } } +# We will start a cluster with 3 primary nodes and 4 replicas, the primary 1 will have 2 replicas. +# We will pause the replica 1, and then shutdown the primary 1, and making replica 2 to become +# the new primary. proc test_main {how} { test "auto-failover-on-shutdown will always pick a best replica and send CLUSTER FAILOVER - $how" { set primary [srv 0 client] From 519eb2a31edabc613ad8caf901ef2fd7a945dd38 Mon Sep 17 00:00:00 2001 From: Binbin Date: Mon, 21 Oct 2024 11:16:25 +0800 Subject: [PATCH 04/23] removing mf_is_primary_failover Signed-off-by: Binbin --- src/cluster_legacy.c | 21 +++++---------------- src/cluster_legacy.h | 1 - 2 files changed, 5 insertions(+), 17 deletions(-) diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 7ab0613f8a..3a3be879f8 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -3502,14 +3502,10 @@ int clusterProcessPacket(clusterLink *link) { * a non zero number of slots, and its currentEpoch is greater or * equal to epoch where this node started the election. */ if (clusterNodeIsVotingPrimary(sender) && sender_claimed_current_epoch >= server.cluster->failover_auth_epoch) { - /* todo: see if this needed. */ - /* My primary has already voted for me, so don't count it anymore. */ - if (!(sender == myself->replicaof && server.cluster->mf_is_primary_failover)) { - server.cluster->failover_auth_count++; - /* Maybe we reached a quorum here, set a flag to make sure - * we check ASAP. */ - clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER); - } + server.cluster->failover_auth_count++; + /* Maybe we reached a quorum here, set a flag to make sure + * we check ASAP. */ + clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER); } } else if (type == CLUSTERMSG_TYPE_MFSTART) { /* This message is acceptable only if I'm a primary and the sender @@ -4592,11 +4588,7 @@ void clusterHandleReplicaFailover(void) { if (server.cluster->mf_end) { server.cluster->failover_auth_time = mstime(); server.cluster->failover_auth_rank = 0; - /* todo: see if this is needed. */ - /* This is a failover triggered by my primary, let's counts its vote. */ - if (server.cluster->mf_is_primary_failover) { - server.cluster->failover_auth_count++; - } + server.cluster->failover_auth_count++; clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER); } serverLog(LL_NOTICE, @@ -4821,7 +4813,6 @@ void resetManualFailover(void) { } server.cluster->mf_end = 0; /* No manual failover in progress. */ server.cluster->mf_can_start = 0; - server.cluster->mf_is_primary_failover = 0; server.cluster->mf_replica = NULL; server.cluster->mf_primary_offset = -1; } @@ -4850,7 +4841,6 @@ void clusterHandleManualFailover(void) { /* Our replication offset matches the primary replication offset * announced after clients were paused. We can start the failover. */ server.cluster->mf_can_start = 1; - server.cluster->mf_is_primary_failover = 0; serverLog(LL_NOTICE, "All primary replication stream processed, " "manual failover can start."); clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER); @@ -6743,7 +6733,6 @@ int clusterCommandSpecial(client *c) { serverLog(LL_NOTICE, "Forced failover user request accepted (user request from '%s').", client); } server.cluster->mf_can_start = 1; - server.cluster->mf_is_primary_failover = 1; /* We can start a manual failover as soon as possible, setting a flag * here so that we don't need to waiting for the cron to kick in. */ clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_MANUALFAILOVER); diff --git a/src/cluster_legacy.h b/src/cluster_legacy.h index c97ec2df7f..5280644e6e 100644 --- a/src/cluster_legacy.h +++ b/src/cluster_legacy.h @@ -393,7 +393,6 @@ struct clusterState { or -1 if still not received. */ int mf_can_start; /* If non-zero signal that the manual failover can start requesting primary vote. */ - int mf_is_primary_failover; /* The manual failover was triggered by my primary. */ /* The following fields are used by primaries to take state on elections. */ uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */ int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */ From 32043ddc21298dcb71e00257cf23c1b56c415acb Mon Sep 17 00:00:00 2001 From: Binbin Date: Sun, 27 Oct 2024 20:45:20 +0800 Subject: [PATCH 05/23] try to fix test Signed-off-by: Binbin --- tests/support/util.tcl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/support/util.tcl b/tests/support/util.tcl index 8cc0deb098..77c22207f2 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -135,7 +135,7 @@ proc get_replica_acked_offset {primary replica_ip replica_port} { proc wait_replica_acked_ofs {primary replica_ip replica_port} { $primary config set repl-ping-replica-period 3600 - wait_for_condition 50 100 { + wait_for_condition 500 100 { [status $primary master_repl_offset] eq [get_replica_acked_offset $primary $replica_ip $replica_port] } else { puts "INFO REPLICATION: [$primary info replication]" From e7b33faea3963c058ef8f67c8ab419ea7bb67226 Mon Sep 17 00:00:00 2001 From: Binbin Date: Mon, 28 Oct 2024 00:57:42 +0800 Subject: [PATCH 06/23] try to stable the test Signed-off-by: Binbin --- tests/support/util.tcl | 15 ++++++++++----- tests/unit/cluster/auto-failover-on-shutdown.tcl | 1 + 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/tests/support/util.tcl b/tests/support/util.tcl index 77c22207f2..816fe4cdf6 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -126,17 +126,22 @@ proc wait_replica_online r { } } -proc get_replica_acked_offset {primary replica_ip replica_port} { +proc check_replica_acked_ofs {primary replica_ip replica_port} { set infostr [$primary info replication] - if {[regexp -lineanchor "^slave\\d:ip=$replica_ip,port=$replica_port,.*,offset=(\\d+).*\r\n" $infostr _ value]} { - return $value + set master_repl_offset [getInfoProperty $infostr master_repl_offset] + if {[regexp -lineanchor "^slave\\d:ip=$replica_ip,port=$replica_port,.*,offset=(\\d+).*\r\n" $infostr _ offset]} { + if {$master_repl_offset == $offset} { + return 1 + } + return 0 } + return 0 } proc wait_replica_acked_ofs {primary replica_ip replica_port} { $primary config set repl-ping-replica-period 3600 - wait_for_condition 500 100 { - [status $primary master_repl_offset] eq [get_replica_acked_offset $primary $replica_ip $replica_port] + wait_for_condition 50 100 { + [check_replica_acked_ofs $primary $replica_ip $replica_port] eq 1 } else { puts "INFO REPLICATION: [$primary info replication]" fail "replica acked offset didn't match in time" diff --git a/tests/unit/cluster/auto-failover-on-shutdown.tcl b/tests/unit/cluster/auto-failover-on-shutdown.tcl index 84050abb3f..aa9dfee20b 100644 --- a/tests/unit/cluster/auto-failover-on-shutdown.tcl +++ b/tests/unit/cluster/auto-failover-on-shutdown.tcl @@ -27,6 +27,7 @@ proc test_main {how} { } # Wait the replica2 catch up with the offset + wait_for_ofs_sync $primary $replica2 wait_replica_acked_ofs $primary $replica2_ip $replica2_port # Shutdown the primary. From d6649e522f9150a53aac179a0efb99ba11cc3214 Mon Sep 17 00:00:00 2001 From: Binbin Date: Mon, 28 Oct 2024 01:08:14 +0800 Subject: [PATCH 07/23] Move the logic to clusterHandleServerShutdown Signed-off-by: Binbin --- src/cluster_legacy.c | 26 ++++++++++++++++++++++++++ src/server.c | 24 ------------------------ 2 files changed, 26 insertions(+), 24 deletions(-) diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 3a3be879f8..dc6ff45f5a 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -1192,6 +1192,32 @@ void clusterInitLast(void) { /* Called when a cluster node receives SHUTDOWN. */ void clusterHandleServerShutdown(void) { + if (server.auto_failover_on_shutdown) { + /* Find the first best replica, that is, the replica with the largest offset. */ + client *best_replica = NULL; + listIter replicas_iter; + listNode *replicas_list_node; + listRewind(server.replicas, &replicas_iter); + while ((replicas_list_node = listNext(&replicas_iter)) != NULL) { + client *replica = listNodeValue(replicas_list_node); + if (replica->repl_state != REPLICA_STATE_ONLINE) continue; + if (best_replica == NULL || replica->repl_ack_off > best_replica->repl_ack_off) best_replica = replica; + if (best_replica->repl_ack_off == server.primary_repl_offset) break; + } + if (best_replica) { + /* Send a CLUSTER FAILOVER FORCE to the best replica. */ + const char *buf = "*3\r\n$7\r\nCLUSTER\r\n$8\r\nFAILOVER\r\n$5\r\nFORCE\r\n"; + if (connWrite(best_replica->conn, buf, strlen(buf)) == (int)strlen(buf)) { + serverLog(LL_NOTICE, "Sending CLUSTER FAILOVER FORCE to replica %s succeeded.", + replicationGetReplicaName(best_replica)); + } else { + serverLog(LL_WARNING, "Failed to send CLUSTER FAILOVER FORCE to replica: %s", strerror(errno)); + } + } else { + serverLog(LL_NOTICE, "Unable to find a replica to perform an auto failover on shutdown."); + } + } + /* The error logs have been logged in the save function if the save fails. */ serverLog(LL_NOTICE, "Saving the cluster configuration file before exiting."); clusterSaveConfig(1); diff --git a/src/server.c b/src/server.c index 53f0cd3956..531fb07b76 100644 --- a/src/server.c +++ b/src/server.c @@ -4308,7 +4308,6 @@ int finishShutdown(void) { int force = server.shutdown_flags & SHUTDOWN_FORCE; /* Log a warning for each replica that is lagging. */ - client *best_replica = NULL; listIter replicas_iter; listNode *replicas_list_node; int num_replicas = 0, num_lagging_replicas = 0; @@ -4323,14 +4322,6 @@ int finishShutdown(void) { replicationGetReplicaName(replica), server.primary_repl_offset - replica->repl_ack_off, lag, replstateToString(replica->repl_state)); } - /* Find the best replica, that is, the replica with the largest offset. */ - if (replica->repl_state == REPLICA_STATE_ONLINE) { - if (best_replica == NULL) { - best_replica = replica; - } else if (replica->repl_ack_off > best_replica->repl_ack_off) { - best_replica = replica; - } - } } if (num_replicas > 0) { serverLog(LL_NOTICE, "%d of %d replicas are in sync when shutting down.", num_replicas - num_lagging_replicas, @@ -4430,21 +4421,6 @@ int finishShutdown(void) { * send them pending writes. */ flushReplicasOutputBuffers(); - if (server.auto_failover_on_shutdown && server.cluster_enabled && best_replica) { - /* Sending a CLUSTER FAILOVER FORCE to the best replica. */ - const char *buf = "*3\r\n$7\r\nCLUSTER\r\n$8\r\nFAILOVER\r\n$5\r\nFORCE\r\n"; - if (connWrite(best_replica->conn, buf, strlen(buf)) == (int)strlen(buf)) { - serverLog(LL_NOTICE, "Sending CLUSTER FAILOVER FORCE to replica %s succeeded.", - replicationGetReplicaName(best_replica)); - } else { - serverLog(LL_WARNING, "Failed to send CLUSTER FAILOVER FORCE to replica: %s", strerror(errno)); - } - } - - if (server.auto_failover_on_shutdown && server.cluster_enabled && !best_replica) { - serverLog(LL_WARNING, "Unable to find a replica to perform an auto failover on shutdown."); - } - /* Close the listening sockets. Apparently this allows faster restarts. */ closeListeningSockets(1); From 64831c996b7b00799e1f7b5feb4b6524214c314d Mon Sep 17 00:00:00 2001 From: Binbin Date: Mon, 28 Oct 2024 11:18:21 +0800 Subject: [PATCH 08/23] Adjust the tests Signed-off-by: Binbin --- tests/support/util.tcl | 4 +- .../cluster/auto-failover-on-shutdown.tcl | 40 +++++++++++++------ 2 files changed, 31 insertions(+), 13 deletions(-) diff --git a/tests/support/util.tcl b/tests/support/util.tcl index 816fe4cdf6..0ad83bb7c4 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -138,8 +138,9 @@ proc check_replica_acked_ofs {primary replica_ip replica_port} { return 0 } -proc wait_replica_acked_ofs {primary replica_ip replica_port} { +proc wait_replica_acked_ofs {primary replica replica_ip replica_port} { $primary config set repl-ping-replica-period 3600 + $replica config set hz 500 wait_for_condition 50 100 { [check_replica_acked_ofs $primary $replica_ip $replica_port] eq 1 } else { @@ -147,6 +148,7 @@ proc wait_replica_acked_ofs {primary replica_ip replica_port} { fail "replica acked offset didn't match in time" } $primary config set repl-ping-replica-period 10 + $replica config set hz 10 } proc wait_for_ofs_sync {r1 r2} { diff --git a/tests/unit/cluster/auto-failover-on-shutdown.tcl b/tests/unit/cluster/auto-failover-on-shutdown.tcl index aa9dfee20b..f03fdba289 100644 --- a/tests/unit/cluster/auto-failover-on-shutdown.tcl +++ b/tests/unit/cluster/auto-failover-on-shutdown.tcl @@ -1,4 +1,4 @@ -proc shutdown_on_how {srv_id how} { +proc shutdown_how {srv_id how} { if {$how == "shutdown"} { catch {R $srv_id shutdown nosave} } elseif {$how == "sigterm"} { @@ -9,8 +9,8 @@ proc shutdown_on_how {srv_id how} { # We will start a cluster with 3 primary nodes and 4 replicas, the primary 1 will have 2 replicas. # We will pause the replica 1, and then shutdown the primary 1, and making replica 2 to become # the new primary. -proc test_main {how} { - test "auto-failover-on-shutdown will always pick a best replica and send CLUSTER FAILOVER - $how" { +proc test_main {how shutdown_timeout} { + test "auto-failover-on-shutdown will always pick a best replica and send CLUSTER FAILOVER - $how - shutdown-timeout: $shutdown_timeout" { set primary [srv 0 client] set replica1 [srv -3 client] set replica1_pid [s -3 process_id] @@ -18,6 +18,9 @@ proc test_main {how} { set replica2_ip [srv -6 host] set replica2_port [srv -6 port] + $primary config set auto-failover-on-shutdown yes + $primary config set shutdown-timeout $shutdown_timeout + # Pause a replica so it has no chance to catch up with the offset. pause_process $replica1_pid @@ -26,12 +29,17 @@ proc test_main {how} { $primary incr key_991803 } - # Wait the replica2 catch up with the offset - wait_for_ofs_sync $primary $replica2 - wait_replica_acked_ofs $primary $replica2_ip $replica2_port + if {$shutdown_timeout != 0} { + # Wait the replica2 catch up with the offset + wait_for_ofs_sync $primary $replica2 + wait_replica_acked_ofs $primary $replica2 $replica2_ip $replica2_port + } else { + # If shutdown-timeout is enable, we expect the primary to pause writing + # and wait for the replica to catch up with the offset. + } # Shutdown the primary. - shutdown_on_how 0 $how + shutdown_how 0 $how # Wait for the replica2 to become a primary. wait_for_condition 1000 50 { @@ -56,17 +64,25 @@ proc test_main {how} { pause_process $replica1_pid $primary client kill type replica - shutdown_on_how 6 $how + shutdown_how 6 $how wait_for_log_messages -6 {"*Unable to find a replica to perform an auto failover on shutdown*"} 0 1000 10 resume_process $replica1_pid } } -start_cluster 3 4 {tags {external:skip cluster} overrides {cluster-ping-interval 1000 cluster-node-timeout 5000 shutdown-timeout 0 auto-failover-on-shutdown yes}} { - test_main "shutdown" +start_cluster 3 4 {tags {external:skip cluster}} { + test_main "shutdown" 0 +} + +start_cluster 3 4 {tags {external:skip cluster}} { + test_main "sigterm" 0 +} + +start_cluster 3 4 {tags {external:skip cluster}} { + test_main "shutdown" 10 } -start_cluster 3 4 {tags {external:skip cluster} overrides {cluster-ping-interval 1000 cluster-node-timeout 5000 shutdown-timeout 0 auto-failover-on-shutdown yes}} { - test_main "sigterm" +start_cluster 3 4 {tags {external:skip cluster}} { + test_main "sigterm" 10 } From b06a8c46067d80aa00466e1911ea0f7255069d97 Mon Sep 17 00:00:00 2001 From: Binbin Date: Mon, 28 Oct 2024 12:32:40 +0800 Subject: [PATCH 09/23] Do shutdown failover only when offset is match Signed-off-by: Binbin --- src/cluster_legacy.c | 9 ++++++--- tests/support/util.tcl | 2 +- tests/unit/cluster/auto-failover-on-shutdown.tcl | 7 ++++++- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index dc6ff45f5a..7449d9d428 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -1200,10 +1200,13 @@ void clusterHandleServerShutdown(void) { listRewind(server.replicas, &replicas_iter); while ((replicas_list_node = listNext(&replicas_iter)) != NULL) { client *replica = listNodeValue(replicas_list_node); - if (replica->repl_state != REPLICA_STATE_ONLINE) continue; - if (best_replica == NULL || replica->repl_ack_off > best_replica->repl_ack_off) best_replica = replica; - if (best_replica->repl_ack_off == server.primary_repl_offset) break; + /* This is done only when the replica offset is caught up, to avoid data loss */ + if (replica->repl_state == REPLICA_STATE_ONLINE && replica->repl_ack_off == server.primary_repl_offset) { + best_replica = replica; + break; + } } + if (best_replica) { /* Send a CLUSTER FAILOVER FORCE to the best replica. */ const char *buf = "*3\r\n$7\r\nCLUSTER\r\n$8\r\nFAILOVER\r\n$5\r\nFORCE\r\n"; diff --git a/tests/support/util.tcl b/tests/support/util.tcl index 0ad83bb7c4..49a9c273e9 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -141,7 +141,7 @@ proc check_replica_acked_ofs {primary replica_ip replica_port} { proc wait_replica_acked_ofs {primary replica replica_ip replica_port} { $primary config set repl-ping-replica-period 3600 $replica config set hz 500 - wait_for_condition 50 100 { + wait_for_condition 100 100 { [check_replica_acked_ofs $primary $replica_ip $replica_port] eq 1 } else { puts "INFO REPLICATION: [$primary info replication]" diff --git a/tests/unit/cluster/auto-failover-on-shutdown.tcl b/tests/unit/cluster/auto-failover-on-shutdown.tcl index f03fdba289..00e15e009e 100644 --- a/tests/unit/cluster/auto-failover-on-shutdown.tcl +++ b/tests/unit/cluster/auto-failover-on-shutdown.tcl @@ -20,6 +20,10 @@ proc test_main {how shutdown_timeout} { $primary config set auto-failover-on-shutdown yes $primary config set shutdown-timeout $shutdown_timeout + $primary config set repl-ping-replica-period 3600 + + # To avoid failover kick in. + $replica2 config set cluster-replica-no-failover yes # Pause a replica so it has no chance to catch up with the offset. pause_process $replica1_pid @@ -29,7 +33,7 @@ proc test_main {how shutdown_timeout} { $primary incr key_991803 } - if {$shutdown_timeout != 0} { + if {$shutdown_timeout == 0} { # Wait the replica2 catch up with the offset wait_for_ofs_sync $primary $replica2 wait_replica_acked_ofs $primary $replica2 $replica2_ip $replica2_port @@ -63,6 +67,7 @@ proc test_main {how shutdown_timeout} { pause_process $replica1_pid + $primary config set auto-failover-on-shutdown yes $primary client kill type replica shutdown_how 6 $how wait_for_log_messages -6 {"*Unable to find a replica to perform an auto failover on shutdown*"} 0 1000 10 From e56a360a4e01c09c18a41eb715f8d61094ec3a0b Mon Sep 17 00:00:00 2001 From: Binbin Date: Wed, 8 Jan 2025 13:35:20 +0800 Subject: [PATCH 10/23] remove count++ and fix confilct Signed-off-by: Binbin --- src/cluster_legacy.c | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 5238ac8f87..2128386fc6 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -4724,14 +4724,9 @@ void clusterHandleReplicaFailover(void) { if (server.cluster->mf_end) { server.cluster->failover_auth_time = now; server.cluster->failover_auth_rank = 0; -<<<<<<< HEAD - server.cluster->failover_auth_count++; - clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER); -======= /* Reset auth_age since it is outdated now and we can bypass the auth_timeout * check in the next state and start the election ASAP. */ auth_age = 0; ->>>>>>> upstream/unstable } serverLog(LL_NOTICE, "Start of election delayed for %lld milliseconds " From c9bfd6948a01a36eb9f0cc1de71194423f1b3ee0 Mon Sep 17 00:00:00 2001 From: Binbin Date: Thu, 23 Jan 2025 20:02:49 +0800 Subject: [PATCH 11/23] CLUSTER FAILOVER replicaid node-id Signed-off-by: Binbin --- src/cluster.h | 1 - src/cluster_legacy.c | 47 +++++++++++++++++++++++++++++------------- src/replication.c | 26 +++++++++++++++++++++++ src/server.h | 1 + tests/support/util.tcl | 2 +- 5 files changed, 61 insertions(+), 16 deletions(-) diff --git a/src/cluster.h b/src/cluster.h index 142f2d70b3..e1b0713d1b 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -7,7 +7,6 @@ #define CLUSTER_SLOT_MASK_BITS 14 /* Number of bits used for slot id. */ #define CLUSTER_SLOTS (1 << CLUSTER_SLOT_MASK_BITS) /* Total number of slots in cluster mode, which is 16384. */ -#define CLUSTER_SLOT_MASK ((unsigned long long)(CLUSTER_SLOTS - 1)) /* Bit mask for slot id stored in LSB. */ #define CLUSTER_OK 0 /* Everything looks ok */ #define CLUSTER_FAIL 1 /* The cluster can't work */ #define CLUSTER_NAMELEN 40 /* sha1 hex length */ diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index f11efaa5a1..39c2ac387e 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -1233,7 +1233,7 @@ void clusterInitLast(void) { /* Called when a cluster node receives SHUTDOWN. */ void clusterHandleServerShutdown(void) { - if (server.auto_failover_on_shutdown) { + if (nodeIsPrimary(myself) && server.auto_failover_on_shutdown) { /* Find the first best replica, that is, the replica with the largest offset. */ client *best_replica = NULL; listIter replicas_iter; @@ -1241,8 +1241,12 @@ void clusterHandleServerShutdown(void) { listRewind(server.replicas, &replicas_iter); while ((replicas_list_node = listNext(&replicas_iter)) != NULL) { client *replica = listNodeValue(replicas_list_node); - /* This is done only when the replica offset is caught up, to avoid data loss */ - if (replica->repl_state == REPLICA_STATE_ONLINE && replica->repl_ack_off == server.primary_repl_offset) { + /* This is done only when the replica offset is caught up, to avoid data loss. + * And 0x800ff is 8.0.255, we only support new versions for this feature. */ + if (replica->repl_data->repl_state == REPLICA_STATE_ONLINE && + // replica->repl_data->replica_version > 0x800ff && + replica->name && sdslen(replica->name->ptr) == CLUSTER_NAMELEN && + replica->repl_data->repl_ack_off == server.primary_repl_offset) { best_replica = replica; break; } @@ -1250,8 +1254,9 @@ void clusterHandleServerShutdown(void) { if (best_replica) { /* Send a CLUSTER FAILOVER FORCE to the best replica. */ - const char *buf = "*3\r\n$7\r\nCLUSTER\r\n$8\r\nFAILOVER\r\n$5\r\nFORCE\r\n"; - if (connWrite(best_replica->conn, buf, strlen(buf)) == (int)strlen(buf)) { + char buf[128]; + size_t buflen = snprintf(buf, sizeof(buf), "*5\r\n$7\r\nCLUSTER\r\n$8\r\nFAILOVER\r\n$5\r\nFORCE\r\n$9\r\nreplicaid\r\n$%d\r\n%s\r\n", CLUSTER_NAMELEN, (char *)best_replica->name->ptr); + if (connWrite(best_replica->conn, buf, buflen) == (int)strlen(buf)) { serverLog(LL_NOTICE, "Sending CLUSTER FAILOVER FORCE to replica %s succeeded.", replicationGetReplicaName(best_replica)); } else { @@ -7027,32 +7032,46 @@ int clusterCommandSpecial(client *c) { } else { addReplyLongLong(c, clusterNodeFailureReportsCount(n)); } - } else if (!strcasecmp(c->argv[1]->ptr, "failover") && (c->argc == 2 || c->argc == 3)) { - /* CLUSTER FAILOVER [FORCE|TAKEOVER] */ + } else if (!strcasecmp(c->argv[1]->ptr, "failover") && (c->argc >= 2)) { + /* CLUSTER FAILOVER [FORCE|TAKEOVER] [replicaid ] */ int force = 0, takeover = 0; + robj *replicaid = NULL; - if (c->argc == 3) { - if (!strcasecmp(c->argv[2]->ptr, "force")) { + for (int j = 2; j < c->argc; j++) { + int moreargs = (c->argc - 1) - j; + if (!strcasecmp(c->argv[j]->ptr, "force")) { force = 1; - } else if (!strcasecmp(c->argv[2]->ptr, "takeover")) { + } else if (!strcasecmp(c->argv[j]->ptr, "takeover")) { takeover = 1; force = 1; /* Takeover also implies force. */ + } else if (!strcasecmp(c->argv[j]->ptr, "replicaid") && moreargs) { + j++; + replicaid = c->argv[j]; } else { addReplyErrorObject(c, shared.syntaxerr); return 1; } } + /* Check if it should be executed by myself. */ + if (replicaid != NULL) { + clusterNode *n = clusterLookupNode(replicaid->ptr, sdslen(replicaid->ptr)); + if (n != myself) { + /* Ignore this command, including the sanity check and the process. */ + addReply(c, shared.ok); + return 1; + } + } + /* Check preconditions. */ if (clusterNodeIsPrimary(myself)) { - addReplyError(c, "You should send CLUSTER FAILOVER to a replica"); + if (replicaid == NULL) addReplyError(c, "You should send CLUSTER FAILOVER to a replica"); return 1; } else if (myself->replicaof == NULL) { - addReplyError(c, "I'm a replica but my master is unknown to me"); + if (replicaid == NULL) addReplyError(c, "I'm a replica but my master is unknown to me"); return 1; } else if (!force && (nodeFailed(myself->replicaof) || myself->replicaof->link == NULL)) { - addReplyError(c, "Master is down or failed, " - "please use CLUSTER FAILOVER FORCE"); + if (replicaid == NULL) addReplyError(c, "Master is down or failed, please use CLUSTER FAILOVER FORCE"); return 1; } resetManualFailover(); diff --git a/src/replication.c b/src/replication.c index 9913d64d65..106325f10c 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3594,6 +3594,14 @@ void syncWithPrimary(connection *conn) { err = sendCommand(conn, "REPLCONF", "version", VALKEY_VERSION, NULL); if (err) goto write_error; + /* Inform the primary of our (replica) node name. */ + if (server.cluster_enabled) { + char *argv[] = {"CLIENT", "SETNAME", server.cluster->myself->name}; + size_t lens[] = {6, 7, CLUSTER_NAMELEN}; + err = sendCommandArgv(conn, 3, argv, lens); + if (err) goto write_error; + } + server.repl_state = REPL_STATE_RECEIVE_AUTH_REPLY; return; } @@ -3684,6 +3692,24 @@ void syncWithPrimary(connection *conn) { } sdsfree(err); err = NULL; + if (server.cluster_enabled) { + server.repl_state = REPL_STATE_RECEIVE_SETNAME_REPLY; + return; + } else { + server.repl_state = REPL_STATE_SEND_PSYNC; + } + } + + /* Receive CLIENT SETNAME reply. */ + if (server.repl_state == REPL_STATE_RECEIVE_SETNAME_REPLY) { + err = receiveSynchronousResponse(conn); + if (err == NULL) goto no_response_error; + /* Ignore the error if any. 8.1 introduced this logic and we don't care if it failed. */ + if (err[0] == '-') { + serverLog(LL_NOTICE, "(Non critical) Primary does not understand CLIENT SETNAME: %s", err); + } + sdsfree(err); + err = NULL; server.repl_state = REPL_STATE_SEND_PSYNC; } diff --git a/src/server.h b/src/server.h index d098dbb083..7434568142 100644 --- a/src/server.h +++ b/src/server.h @@ -396,6 +396,7 @@ typedef enum { REPL_STATE_RECEIVE_IP_REPLY, /* Wait for REPLCONF reply */ REPL_STATE_RECEIVE_CAPA_REPLY, /* Wait for REPLCONF reply */ REPL_STATE_RECEIVE_VERSION_REPLY, /* Wait for REPLCONF reply */ + REPL_STATE_RECEIVE_SETNAME_REPLY, /* Wait for CLIENT SETNAME reply */ REPL_STATE_SEND_PSYNC, /* Send PSYNC */ REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */ /* --- End of handshake states --- */ diff --git a/tests/support/util.tcl b/tests/support/util.tcl index 3b5adcb6d3..4b7cf25a59 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -141,7 +141,7 @@ proc check_replica_acked_ofs {primary replica_ip replica_port} { proc wait_replica_acked_ofs {primary replica replica_ip replica_port} { $primary config set repl-ping-replica-period 3600 $replica config set hz 500 - wait_for_condition 100 100 { + wait_for_condition 1000 50 { [check_replica_acked_ofs $primary $replica_ip $replica_port] eq 1 } else { puts "INFO REPLICATION: [$primary info replication]" From c8037a1f1a5e6c91bc2772f48ede3c20177f183e Mon Sep 17 00:00:00 2001 From: Binbin Date: Fri, 24 Jan 2025 13:21:55 +0800 Subject: [PATCH 12/23] code review v1 Signed-off-by: Binbin --- src/cluster_legacy.c | 43 +++++++++++-------- src/replication.c | 2 +- src/server.c | 6 +-- src/server.h | 1 + .../cluster/auto-failover-on-shutdown.tcl | 1 - 5 files changed, 31 insertions(+), 22 deletions(-) diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 39c2ac387e..56af9fbbbe 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -1253,15 +1253,23 @@ void clusterHandleServerShutdown(void) { } if (best_replica) { - /* Send a CLUSTER FAILOVER FORCE to the best replica. */ + /* Send the CLUSTER FAILOVER FORCE REPLICAID node-id to all replicas since + * it is a shared replication buffer, but only the replica with the matching + * node-id will execute it. The caller will call flushReplicasOutputBuffers, + * so in here it is a best effort. */ char buf[128]; - size_t buflen = snprintf(buf, sizeof(buf), "*5\r\n$7\r\nCLUSTER\r\n$8\r\nFAILOVER\r\n$5\r\nFORCE\r\n$9\r\nreplicaid\r\n$%d\r\n%s\r\n", CLUSTER_NAMELEN, (char *)best_replica->name->ptr); - if (connWrite(best_replica->conn, buf, buflen) == (int)strlen(buf)) { - serverLog(LL_NOTICE, "Sending CLUSTER FAILOVER FORCE to replica %s succeeded.", - replicationGetReplicaName(best_replica)); - } else { - serverLog(LL_WARNING, "Failed to send CLUSTER FAILOVER FORCE to replica: %s", strerror(errno)); - } + size_t buflen = snprintf(buf, sizeof(buf), + "*5\r\n$7\r\nCLUSTER\r\n" + "$8\r\nFAILOVER\r\n" + "$5\r\nFORCE\r\n" + "$9\r\nREPLICAID\r\n" + "$%d\r\n%s\r\n", + CLUSTER_NAMELEN, + (char *)best_replica->name->ptr); + /* Must install write handler for all replicas first before feeding + * replication stream. */ + prepareReplicasToWrite(); + feedReplicationBuffer(buf, buflen); } else { serverLog(LL_NOTICE, "Unable to find a replica to perform an auto failover on shutdown."); } @@ -7033,7 +7041,9 @@ int clusterCommandSpecial(client *c) { addReplyLongLong(c, clusterNodeFailureReportsCount(n)); } } else if (!strcasecmp(c->argv[1]->ptr, "failover") && (c->argc >= 2)) { - /* CLUSTER FAILOVER [FORCE|TAKEOVER] [replicaid ] */ + /* CLUSTER FAILOVER [FORCE|TAKEOVER] [REPLICAID ] + * REPLICAID is currently available only for internal so we won't + * put it into the JSON file. */ int force = 0, takeover = 0; robj *replicaid = NULL; @@ -7044,7 +7054,8 @@ int clusterCommandSpecial(client *c) { } else if (!strcasecmp(c->argv[j]->ptr, "takeover")) { takeover = 1; force = 1; /* Takeover also implies force. */ - } else if (!strcasecmp(c->argv[j]->ptr, "replicaid") && moreargs) { + } else if (c == server.primary && !strcasecmp(c->argv[j]->ptr, "replicaid") && moreargs) { + /* This option is currently available only for primary. */ j++; replicaid = c->argv[j]; } else { @@ -7054,13 +7065,11 @@ int clusterCommandSpecial(client *c) { } /* Check if it should be executed by myself. */ - if (replicaid != NULL) { - clusterNode *n = clusterLookupNode(replicaid->ptr, sdslen(replicaid->ptr)); - if (n != myself) { - /* Ignore this command, including the sanity check and the process. */ - addReply(c, shared.ok); - return 1; - } + if (replicaid != NULL && memcmp(replicaid->ptr, myself->name, CLUSTER_NAMELEN) != 0) { + /* Ignore this command, including the sanity check and the process. */ + serverLog(LL_NOTICE, "return ok"); + addReply(c, shared.ok); + return 1; } /* Check preconditions. */ diff --git a/src/replication.c b/src/replication.c index 106325f10c..7c06944c62 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3704,7 +3704,7 @@ void syncWithPrimary(connection *conn) { if (server.repl_state == REPL_STATE_RECEIVE_SETNAME_REPLY) { err = receiveSynchronousResponse(conn); if (err == NULL) goto no_response_error; - /* Ignore the error if any. 8.1 introduced this logic and we don't care if it failed. */ + /* Ignore the error if any, we don't care if it failed, it is best effort. */ if (err[0] == '-') { serverLog(LL_NOTICE, "(Non critical) Primary does not understand CLIENT SETNAME: %s", err); } diff --git a/src/server.c b/src/server.c index 144841eff9..b821a4429d 100644 --- a/src/server.c +++ b/src/server.c @@ -4585,6 +4585,9 @@ int finishShutdown(void) { unlink(server.pidfile); } + /* Handle cluster-related matters when shutdown. */ + if (server.cluster_enabled) clusterHandleServerShutdown(); + /* Best effort flush of replica output buffers, so that we hopefully * send them pending writes. */ flushReplicasOutputBuffers(); @@ -4592,9 +4595,6 @@ int finishShutdown(void) { /* Close the listening sockets. Apparently this allows faster restarts. */ closeListeningSockets(1); - /* Handle cluster-related matters when shutdown. */ - if (server.cluster_enabled) clusterHandleServerShutdown(); - serverLog(LL_WARNING, "%s is now ready to exit, bye bye...", server.sentinel_mode ? "Sentinel" : "Valkey"); return C_OK; diff --git a/src/server.h b/src/server.h index 7434568142..d9dbd20455 100644 --- a/src/server.h +++ b/src/server.h @@ -2891,6 +2891,7 @@ ssize_t syncRead(int fd, char *ptr, ssize_t size, long long timeout); ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout); /* Replication */ +int prepareReplicasToWrite(void); void replicationFeedReplicas(int dictid, robj **argv, int argc); void replicationFeedStreamFromPrimaryStream(char *buf, size_t buflen); void resetReplicationBuffer(void); diff --git a/tests/unit/cluster/auto-failover-on-shutdown.tcl b/tests/unit/cluster/auto-failover-on-shutdown.tcl index 00e15e009e..b0d9015441 100644 --- a/tests/unit/cluster/auto-failover-on-shutdown.tcl +++ b/tests/unit/cluster/auto-failover-on-shutdown.tcl @@ -54,7 +54,6 @@ proc test_main {how shutdown_timeout} { } # Make sure that the expected logs are printed. - verify_log_message 0 "*Sending CLUSTER FAILOVER FORCE to replica*" 0 verify_log_message -6 "*Forced failover primary request accepted*" 0 resume_process $replica1_pid From 7d55db6500a6f420381e6cc793d52a4a08260e6b Mon Sep 17 00:00:00 2001 From: Binbin Date: Wed, 5 Feb 2025 11:10:28 +0800 Subject: [PATCH 13/23] code review: remove error reply check, add cross-version test Signed-off-by: Binbin --- src/cluster_legacy.c | 24 ++++++++----- tests/assets/minimal-cluster.conf | 1 + .../cluster/auto-failover-on-shutdown.tcl | 12 +++---- tests/unit/cluster/cross-version-cluster.tcl | 36 +++++++++++++++++++ utils/create-cluster/create-cluster | 4 +-- 5 files changed, 61 insertions(+), 16 deletions(-) create mode 100644 tests/unit/cluster/cross-version-cluster.tcl diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index bfbddd3897..f940e7c396 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -1254,6 +1254,7 @@ void clusterInitLast(void) { void clusterHandleServerShutdown(void) { if (nodeIsPrimary(myself) && server.auto_failover_on_shutdown) { /* Find the first best replica, that is, the replica with the largest offset. */ + int old_replica = 0; client *best_replica = NULL; listIter replicas_iter; listNode *replicas_list_node; @@ -1261,13 +1262,16 @@ void clusterHandleServerShutdown(void) { while ((replicas_list_node = listNext(&replicas_iter)) != NULL) { client *replica = listNodeValue(replicas_list_node); /* This is done only when the replica offset is caught up, to avoid data loss. - * And 0x800ff is 8.0.255, we only support new versions for this feature. */ + * And 0x80100 is 8.1.0, we only support this feature in this version. */ + if (replica->repl_data->replica_version < 0x80100) { + old_replica = 1; + best_replica = NULL; + break; + } if (replica->repl_data->repl_state == REPLICA_STATE_ONLINE && - // replica->repl_data->replica_version > 0x800ff && replica->name && sdslen(replica->name->ptr) == CLUSTER_NAMELEN && replica->repl_data->repl_ack_off == server.primary_repl_offset) { best_replica = replica; - break; } } @@ -1289,8 +1293,13 @@ void clusterHandleServerShutdown(void) { * replication stream. */ prepareReplicasToWrite(); feedReplicationBuffer(buf, buflen); + serverLog(LL_NOTICE, "Perform auto failover to replica %s on shutdown.", (char *)best_replica->name->ptr); } else { - serverLog(LL_NOTICE, "Unable to find a replica to perform an auto failover on shutdown."); + if (old_replica) { + serverLog(LL_NOTICE, "Unable to perform auto failover on shutdown since there are old replicas."); + } else { + serverLog(LL_NOTICE, "Unable to find a replica to perform the auto failover on shutdown."); + } } } @@ -7152,20 +7161,19 @@ int clusterCommandSpecial(client *c) { /* Check if it should be executed by myself. */ if (replicaid != NULL && memcmp(replicaid->ptr, myself->name, CLUSTER_NAMELEN) != 0) { /* Ignore this command, including the sanity check and the process. */ - serverLog(LL_NOTICE, "return ok"); addReply(c, shared.ok); return 1; } /* Check preconditions. */ if (clusterNodeIsPrimary(myself)) { - if (replicaid == NULL) addReplyError(c, "You should send CLUSTER FAILOVER to a replica"); + addReplyError(c, "You should send CLUSTER FAILOVER to a replica"); return 1; } else if (myself->replicaof == NULL) { - if (replicaid == NULL) addReplyError(c, "I'm a replica but my master is unknown to me"); + addReplyError(c, "I'm a replica but my master is unknown to me"); return 1; } else if (!force && (nodeFailed(myself->replicaof) || myself->replicaof->link == NULL)) { - if (replicaid == NULL) addReplyError(c, "Master is down or failed, please use CLUSTER FAILOVER FORCE"); + addReplyError(c, "Master is down or failed, please use CLUSTER FAILOVER FORCE"); return 1; } resetManualFailover(); diff --git a/tests/assets/minimal-cluster.conf b/tests/assets/minimal-cluster.conf index 426d2aa166..65a6315a8b 100644 --- a/tests/assets/minimal-cluster.conf +++ b/tests/assets/minimal-cluster.conf @@ -1,4 +1,5 @@ # Minimal configuration for testing. +databases 1 always-show-logo yes daemonize no pidfile /var/run/valkey.pid diff --git a/tests/unit/cluster/auto-failover-on-shutdown.tcl b/tests/unit/cluster/auto-failover-on-shutdown.tcl index b0d9015441..bf6853bd3b 100644 --- a/tests/unit/cluster/auto-failover-on-shutdown.tcl +++ b/tests/unit/cluster/auto-failover-on-shutdown.tcl @@ -9,7 +9,7 @@ proc shutdown_how {srv_id how} { # We will start a cluster with 3 primary nodes and 4 replicas, the primary 1 will have 2 replicas. # We will pause the replica 1, and then shutdown the primary 1, and making replica 2 to become # the new primary. -proc test_main {how shutdown_timeout} { +proc test_auto_failover {how shutdown_timeout} { test "auto-failover-on-shutdown will always pick a best replica and send CLUSTER FAILOVER - $how - shutdown-timeout: $shutdown_timeout" { set primary [srv 0 client] set replica1 [srv -3 client] @@ -69,24 +69,24 @@ proc test_main {how shutdown_timeout} { $primary config set auto-failover-on-shutdown yes $primary client kill type replica shutdown_how 6 $how - wait_for_log_messages -6 {"*Unable to find a replica to perform an auto failover on shutdown*"} 0 1000 10 + wait_for_log_messages -6 {"*Unable to find a replica to perform the auto failover on shutdown*"} 0 1000 10 resume_process $replica1_pid } } start_cluster 3 4 {tags {external:skip cluster}} { - test_main "shutdown" 0 + test_auto_failover "shutdown" 0 } start_cluster 3 4 {tags {external:skip cluster}} { - test_main "sigterm" 0 + test_auto_failover "sigterm" 0 } start_cluster 3 4 {tags {external:skip cluster}} { - test_main "shutdown" 10 + test_auto_failover "shutdown" 10 } start_cluster 3 4 {tags {external:skip cluster}} { - test_main "sigterm" 10 + test_auto_failover "sigterm" 10 } diff --git a/tests/unit/cluster/cross-version-cluster.tcl b/tests/unit/cluster/cross-version-cluster.tcl new file mode 100644 index 0000000000..f6825ae8bb --- /dev/null +++ b/tests/unit/cluster/cross-version-cluster.tcl @@ -0,0 +1,36 @@ +# Test cross version compatibility of cluster. +# +# Use minimal.conf to make sure we don't use any configs not supported on the old version. + +# make sure the test infra won't use SELECT +set old_singledb $::singledb +set ::singledb 1 + +tags {external:skip needs:other-server cluster} { + # To run this test use the `--other-server-path` parameter and pass in a compatible server path supporting + # SendClusterMessage module API. + # + # ./runtest --single unit/cluster/cross-version-cluster --other-server-path tests/tmp/valkey-7-2/valkey-server + + # Test cross version compatibility of cluster module send message API. + start_cluster 3 1 {tags {external:skip cluster} overrides {auto-failover-on-shutdown yes cluster-ping-interval 1000}} { + set primary [srv 0 client] + set primary_host [srv 0 host] + set primary_port [srv 0 port] + set primary_id [$primary cluster myid] + + start_server {config "minimal-cluster.conf" start-other-server 1 overrides {cluster-ping-interval 1000}} { + # Add a replica of the old version to the cluster + r cluster meet $primary_host $primary_port + wait_for_cluster_propagation + r cluster replicate $primary_id + wait_for_cluster_state "ok" + + # Make sure the primary won't do the auto-failover. + catch {$primary shutdown nosave} + verify_log_message -1 "*Unable to perform auto failover on shutdown since there are old replicas*" 0 + } + } +} + +set ::singledb $old_singledb diff --git a/utils/create-cluster/create-cluster b/utils/create-cluster/create-cluster index cfad7fbf9d..a26c4fd22f 100755 --- a/utils/create-cluster/create-cluster +++ b/utils/create-cluster/create-cluster @@ -28,7 +28,7 @@ then while [ $((PORT < ENDPORT)) != "0" ]; do PORT=$((PORT+1)) echo "Starting $PORT" - $BIN_PATH/valkey-server --port $PORT --protected-mode $PROTECTED_MODE --cluster-enabled yes --cluster-config-file nodes-${PORT}.conf --cluster-node-timeout $TIMEOUT --appendonly yes --appendfilename appendonly-${PORT}.aof --appenddirname appendonlydir-${PORT} --dbfilename dump-${PORT}.rdb --logfile ${PORT}.log --daemonize yes --enable-protected-configs yes --enable-debug-command yes --enable-module-command yes ${ADDITIONAL_OPTIONS} + $BIN_PATH/valkey-server --port $PORT --protected-mode $PROTECTED_MODE --databases 1 --cluster-enabled yes --cluster-config-file nodes-${PORT}.conf --cluster-node-timeout $TIMEOUT --appendonly yes --appendfilename appendonly-${PORT}.aof --appenddirname appendonlydir-${PORT} --dbfilename dump-${PORT}.rdb --logfile ${PORT}.log --daemonize yes --enable-protected-configs yes --enable-debug-command yes --enable-module-command yes --auto-failover-on-shutdown yes ${ADDITIONAL_OPTIONS} done exit 0 fi @@ -70,7 +70,7 @@ then while [ $((PORT < ENDPORT)) != "0" ]; do PORT=$((PORT+1)) echo "Starting $PORT" - $BIN_PATH/valkey-server --port $PORT --protected-mode $PROTECTED_MODE --cluster-enabled yes --cluster-config-file nodes-${PORT}.conf --cluster-node-timeout $TIMEOUT --appendonly yes --appendfilename appendonly-${PORT}.aof --appenddirname appendonlydir-${PORT} --dbfilename dump-${PORT}.rdb --logfile ${PORT}.log --daemonize yes --enable-protected-configs yes --enable-debug-command yes --enable-module-command yes ${ADDITIONAL_OPTIONS} + $BIN_PATH/valkey-server --port $PORT --protected-mode $PROTECTED_MODE --databases 1 --cluster-enabled yes --cluster-config-file nodes-${PORT}.conf --cluster-node-timeout $TIMEOUT --appendonly yes --appendfilename appendonly-${PORT}.aof --appenddirname appendonlydir-${PORT} --dbfilename dump-${PORT}.rdb --logfile ${PORT}.log --daemonize yes --enable-protected-configs yes --enable-debug-command yes --enable-module-command yes --auto-failover-on-shutdown yes ${ADDITIONAL_OPTIONS} done exit 0 fi From 4d5da8adea5471b5e6efc14c9c7ff35ed8255e21 Mon Sep 17 00:00:00 2001 From: Binbin Date: Fri, 7 Feb 2025 17:38:22 +0800 Subject: [PATCH 14/23] fix format and add log to debug the test Signed-off-by: Binbin --- src/cluster.h | 10 +++++----- tests/support/util.tcl | 10 +++++----- tests/unit/cluster/auto-failover-on-shutdown.tcl | 4 ++-- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/cluster.h b/src/cluster.h index e1b0713d1b..1d62dda6a1 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -5,11 +5,11 @@ * Cluster exported API. *----------------------------------------------------------------------------*/ -#define CLUSTER_SLOT_MASK_BITS 14 /* Number of bits used for slot id. */ -#define CLUSTER_SLOTS (1 << CLUSTER_SLOT_MASK_BITS) /* Total number of slots in cluster mode, which is 16384. */ -#define CLUSTER_OK 0 /* Everything looks ok */ -#define CLUSTER_FAIL 1 /* The cluster can't work */ -#define CLUSTER_NAMELEN 40 /* sha1 hex length */ +#define CLUSTER_SLOT_MASK_BITS 14 /* Number of bits used for slot id. */ +#define CLUSTER_SLOTS (1 << CLUSTER_SLOT_MASK_BITS) /* Total number of slots in cluster mode, which is 16384. */ +#define CLUSTER_OK 0 /* Everything looks ok */ +#define CLUSTER_FAIL 1 /* The cluster can't work */ +#define CLUSTER_NAMELEN 40 /* sha1 hex length */ /* Reason why the cluster state changes to fail. When adding new reasons, * make sure to update clusterLogFailReason. */ diff --git a/tests/support/util.tcl b/tests/support/util.tcl index a09c1e6343..9957417f24 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -126,10 +126,10 @@ proc wait_replica_online r { } } -proc check_replica_acked_ofs {primary replica_ip replica_port} { +proc check_replica_acked_ofs {primary replica_host replica_port} { set infostr [$primary info replication] set master_repl_offset [getInfoProperty $infostr master_repl_offset] - if {[regexp -lineanchor "^slave\\d:ip=$replica_ip,port=$replica_port,.*,offset=(\\d+).*\r\n" $infostr _ offset]} { + if {[regexp -lineanchor "^slave\\d:ip=$replica_host,port=$replica_port,.*,offset=(\\d+).*\r\n" $infostr _ offset]} { if {$master_repl_offset == $offset} { return 1 } @@ -138,14 +138,14 @@ proc check_replica_acked_ofs {primary replica_ip replica_port} { return 0 } -proc wait_replica_acked_ofs {primary replica replica_ip replica_port} { +proc wait_replica_acked_ofs {primary replica replica_host replica_port} { $primary config set repl-ping-replica-period 3600 $replica config set hz 500 wait_for_condition 1000 50 { - [check_replica_acked_ofs $primary $replica_ip $replica_port] eq 1 + [check_replica_acked_ofs $primary $replica_host $replica_port] eq 1 } else { puts "INFO REPLICATION: [$primary info replication]" - fail "replica acked offset didn't match in time" + fail "replica $replica_host:$replica_port acked offset didn't match in time" } $primary config set repl-ping-replica-period 10 $replica config set hz 10 diff --git a/tests/unit/cluster/auto-failover-on-shutdown.tcl b/tests/unit/cluster/auto-failover-on-shutdown.tcl index bf6853bd3b..ee1e2b4788 100644 --- a/tests/unit/cluster/auto-failover-on-shutdown.tcl +++ b/tests/unit/cluster/auto-failover-on-shutdown.tcl @@ -15,7 +15,7 @@ proc test_auto_failover {how shutdown_timeout} { set replica1 [srv -3 client] set replica1_pid [s -3 process_id] set replica2 [srv -6 client] - set replica2_ip [srv -6 host] + set replica2_host [srv -6 host] set replica2_port [srv -6 port] $primary config set auto-failover-on-shutdown yes @@ -36,7 +36,7 @@ proc test_auto_failover {how shutdown_timeout} { if {$shutdown_timeout == 0} { # Wait the replica2 catch up with the offset wait_for_ofs_sync $primary $replica2 - wait_replica_acked_ofs $primary $replica2 $replica2_ip $replica2_port + wait_replica_acked_ofs $primary $replica2 $replica2_host $replica2_port } else { # If shutdown-timeout is enable, we expect the primary to pause writing # and wait for the replica to catch up with the offset. From a1f957ca7c22b03291f975b4dfadf7dbf8617ac6 Mon Sep 17 00:00:00 2001 From: Binbin Date: Fri, 7 Feb 2025 17:39:57 +0800 Subject: [PATCH 15/23] Update valkey.conf MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Viktor Söderqvist Signed-off-by: Binbin --- valkey.conf | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/valkey.conf b/valkey.conf index 9fc362f34c..b43f54bb88 100644 --- a/valkey.conf +++ b/valkey.conf @@ -1692,7 +1692,10 @@ aof-timestamp-enabled no # shutdown-on-sigint default # shutdown-on-sigterm default -# TODO +# If the node receives a SIGTERM or is shut down in another way when it is +# running in cluster mode and is a primary with replicas, it can trigger a +# manual failover to one of its replicas before shutting down. This is faster +# and safer than waiting for an automatic failover to happen. Default is 'no'. # # auto-failover-on-shutdown no From 37147e8d515925cc38fa59eb60a2c7234bdec2c9 Mon Sep 17 00:00:00 2001 From: Binbin Date: Fri, 7 Feb 2025 19:37:01 +0800 Subject: [PATCH 16/23] minor fixes in regexp, avoid matching the second line Signed-off-by: Binbin --- tests/support/util.tcl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/support/util.tcl b/tests/support/util.tcl index 9957417f24..86932e5370 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -129,7 +129,7 @@ proc wait_replica_online r { proc check_replica_acked_ofs {primary replica_host replica_port} { set infostr [$primary info replication] set master_repl_offset [getInfoProperty $infostr master_repl_offset] - if {[regexp -lineanchor "^slave\\d:ip=$replica_host,port=$replica_port,.*,offset=(\\d+).*\r\n" $infostr _ offset]} { + if {[regexp -lineanchor "^slave\\d+:ip=$replica_host,port=$replica_port,state=online,offset=(\\d+).*\r\n" $infostr _ offset]} { if {$master_repl_offset == $offset} { return 1 } From 27b6f6d4bfc06ce95b3320e81a905b5cc3a8f1f3 Mon Sep 17 00:00:00 2001 From: Binbin Date: Tue, 11 Feb 2025 16:13:53 +0800 Subject: [PATCH 17/23] code review from Ping Signed-off-by: Binbin --- src/cluster_legacy.c | 105 +++++++++++++++++++++++-------------------- 1 file changed, 56 insertions(+), 49 deletions(-) diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index f940e7c396..99d94bc6dc 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -1250,59 +1250,66 @@ void clusterInitLast(void) { } } -/* Called when a cluster node receives SHUTDOWN. */ -void clusterHandleServerShutdown(void) { - if (nodeIsPrimary(myself) && server.auto_failover_on_shutdown) { - /* Find the first best replica, that is, the replica with the largest offset. */ - int old_replica = 0; - client *best_replica = NULL; - listIter replicas_iter; - listNode *replicas_list_node; - listRewind(server.replicas, &replicas_iter); - while ((replicas_list_node = listNext(&replicas_iter)) != NULL) { - client *replica = listNodeValue(replicas_list_node); - /* This is done only when the replica offset is caught up, to avoid data loss. - * And 0x80100 is 8.1.0, we only support this feature in this version. */ - if (replica->repl_data->replica_version < 0x80100) { - old_replica = 1; - best_replica = NULL; - break; - } - if (replica->repl_data->repl_state == REPLICA_STATE_ONLINE && - replica->name && sdslen(replica->name->ptr) == CLUSTER_NAMELEN && - replica->repl_data->repl_ack_off == server.primary_repl_offset) { - best_replica = replica; - } - } - - if (best_replica) { - /* Send the CLUSTER FAILOVER FORCE REPLICAID node-id to all replicas since - * it is a shared replication buffer, but only the replica with the matching - * node-id will execute it. The caller will call flushReplicasOutputBuffers, - * so in here it is a best effort. */ - char buf[128]; - size_t buflen = snprintf(buf, sizeof(buf), - "*5\r\n$7\r\nCLUSTER\r\n" - "$8\r\nFAILOVER\r\n" - "$5\r\nFORCE\r\n" - "$9\r\nREPLICAID\r\n" - "$%d\r\n%s\r\n", - CLUSTER_NAMELEN, - (char *)best_replica->name->ptr); - /* Must install write handler for all replicas first before feeding - * replication stream. */ - prepareReplicasToWrite(); - feedReplicationBuffer(buf, buflen); - serverLog(LL_NOTICE, "Perform auto failover to replica %s on shutdown.", (char *)best_replica->name->ptr); +void clusterAutoFailoverOnShutdown(void) { + if (!nodeIsPrimary(myself) || !server.auto_failover_on_shutdown) return; + + /* Find the first best replica, that is, the replica with the largest offset. */ + int legacy_replica = 0; + client *best_replica = NULL; + listIter replicas_iter; + listNode *replicas_list_node; + listRewind(server.replicas, &replicas_iter); + while ((replicas_list_node = listNext(&replicas_iter)) != NULL) { + client *replica = listNodeValue(replicas_list_node); + /* This is done only when the replica offset is caught up, to avoid data loss. + * And 0x80100 is 8.1.0, we only support this feature in this version. */ + if (replica->repl_data->replica_version < 0x80100) { + legacy_replica = 1; + best_replica = NULL; + break; + } + if (replica->repl_data->repl_state == REPLICA_STATE_ONLINE && + replica->name && sdslen(replica->name->ptr) == CLUSTER_NAMELEN && + replica->repl_data->repl_ack_off == server.primary_repl_offset) { + best_replica = replica; + } + } + + /* We are not able to find the replica to do the auto failover. */ + if (best_replica == NULL) { + if (legacy_replica) { + serverLog(LL_NOTICE, "Unable to perform auto failover on shutdown since there are legacy replicas."); } else { - if (old_replica) { - serverLog(LL_NOTICE, "Unable to perform auto failover on shutdown since there are old replicas."); - } else { - serverLog(LL_NOTICE, "Unable to find a replica to perform the auto failover on shutdown."); - } + serverLog(LL_NOTICE, "Unable to find a replica to perform the auto failover on shutdown."); } + return; } + /* Send the CLUSTER FAILOVER FORCE REPLICAID node-id to all replicas since + * it is a shared replication buffer, but only the replica with the matching + * node-id will execute it. The caller will call flushReplicasOutputBuffers, + * so in here it is a best effort. */ + char buf[128]; + size_t buflen = snprintf(buf, sizeof(buf), + "*5\r\n$7\r\nCLUSTER\r\n" + "$8\r\nFAILOVER\r\n" + "$5\r\nFORCE\r\n" + "$9\r\nREPLICAID\r\n" + "$%d\r\n%s\r\n", + CLUSTER_NAMELEN, + (char *)best_replica->name->ptr); + /* Must install write handler for all replicas first before feeding + * replication stream. */ + prepareReplicasToWrite(); + feedReplicationBuffer(buf, buflen); + serverLog(LL_NOTICE, "Perform auto failover to replica %s on shutdown.", (char *)best_replica->name->ptr); +} + +/* Called when a cluster node receives SHUTDOWN. */ +void clusterHandleServerShutdown(void) { + /* Check if we are able to do the auto failover on shutdown. */ + clusterAutoFailoverOnShutdown(); + /* The error logs have been logged in the save function if the save fails. */ serverLog(LL_NOTICE, "Saving the cluster configuration file before exiting."); clusterSaveConfig(1); From 8423921c364cc7b16e77309fd83acc35d4d20a75 Mon Sep 17 00:00:00 2001 From: Binbin Date: Tue, 11 Feb 2025 16:14:37 +0800 Subject: [PATCH 18/23] Fix test Signed-off-by: Binbin --- tests/unit/cluster/cross-version-cluster.tcl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/cluster/cross-version-cluster.tcl b/tests/unit/cluster/cross-version-cluster.tcl index f6825ae8bb..29ab8d8d6b 100644 --- a/tests/unit/cluster/cross-version-cluster.tcl +++ b/tests/unit/cluster/cross-version-cluster.tcl @@ -28,7 +28,7 @@ tags {external:skip needs:other-server cluster} { # Make sure the primary won't do the auto-failover. catch {$primary shutdown nosave} - verify_log_message -1 "*Unable to perform auto failover on shutdown since there are old replicas*" 0 + verify_log_message -1 "*Unable to perform auto failover on shutdown since there are legacy replicas*" 0 } } } From 9e00910346d61c79d91d30dbe60789826a127735 Mon Sep 17 00:00:00 2001 From: Binbin Date: Tue, 25 Feb 2025 14:44:18 +0800 Subject: [PATCH 19/23] change to use replconf set-cluster-node-id Signed-off-by: Binbin --- src/cluster_legacy.c | 5 ++--- src/replication.c | 30 ++++++++++++++++++++++++------ src/server.h | 5 +++-- 3 files changed, 29 insertions(+), 11 deletions(-) diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 4533482d4d..b2e266d381 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -1269,7 +1269,6 @@ void clusterAutoFailoverOnShutdown(void) { break; } if (replica->repl_data->repl_state == REPLICA_STATE_ONLINE && - replica->name && sdslen(replica->name->ptr) == CLUSTER_NAMELEN && replica->repl_data->repl_ack_off == server.primary_repl_offset) { best_replica = replica; } @@ -1297,12 +1296,12 @@ void clusterAutoFailoverOnShutdown(void) { "$9\r\nREPLICAID\r\n" "$%d\r\n%s\r\n", CLUSTER_NAMELEN, - (char *)best_replica->name->ptr); + best_replica->repl_data->nodeid); /* Must install write handler for all replicas first before feeding * replication stream. */ prepareReplicasToWrite(); feedReplicationBuffer(buf, buflen); - serverLog(LL_NOTICE, "Perform auto failover to replica %s on shutdown.", (char *)best_replica->name->ptr); + serverLog(LL_NOTICE, "Perform auto failover to replica %.40s on shutdown.", best_replica->repl_data->nodeid); } /* Called when a cluster node receives SHUTDOWN. */ diff --git a/src/replication.c b/src/replication.c index b11a973bf9..202b1d1064 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1345,6 +1345,13 @@ void freeClientReplicationData(client *c) { * - rdb-channel <1|0> * Used to identify the client as a replica's rdb connection in an dual channel * sync session. + * + * - set-rdb-client-id + * Used to identify the current replica main channel with existing rdb-connection + * with the given id. + * + * - set-cluster-node-id + * Used to inform the primary of the node-id of the replica in cluster mode. * */ void replconfCommand(client *c) { int j; @@ -1494,6 +1501,14 @@ void replconfCommand(client *c) { return; } c->repl_data->associated_rdb_client_id = (uint64_t)client_id; + } else if (!strcasecmp(c->argv[j]->ptr, "set-cluster-node-id")) { + /* REPLCONF SET-CLUSTER-NODE-ID */ + if (!server.cluster_enabled) return; + + clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr)); + if (!n) return; + + memcpy(c->repl_data->nodeid, n->name, CLUSTER_NAMELEN); } else { addReplyErrorFormat(c, "Unrecognized REPLCONF option: %s", (char *)c->argv[j]->ptr); return; @@ -3628,8 +3643,8 @@ void syncWithPrimary(connection *conn) { /* Inform the primary of our (replica) node name. */ if (server.cluster_enabled) { - char *argv[] = {"CLIENT", "SETNAME", server.cluster->myself->name}; - size_t lens[] = {6, 7, CLUSTER_NAMELEN}; + char *argv[] = {"REPLCONF", "SET-CLUSTER-NODE-ID", server.cluster->myself->name}; + size_t lens[] = {8, 19, CLUSTER_NAMELEN}; err = sendCommandArgv(conn, 3, argv, lens); if (err) goto write_error; } @@ -3725,20 +3740,23 @@ void syncWithPrimary(connection *conn) { sdsfree(err); err = NULL; if (server.cluster_enabled) { - server.repl_state = REPL_STATE_RECEIVE_SETNAME_REPLY; + server.repl_state = REPL_STATE_RECEIVE_NODEID_REPLY; return; } else { server.repl_state = REPL_STATE_SEND_PSYNC; } } - /* Receive CLIENT SETNAME reply. */ - if (server.repl_state == REPL_STATE_RECEIVE_SETNAME_REPLY) { + /* Receive REPLCONF SET-CLUSTER-NODE-ID reply. */ + if (server.repl_state == REPL_STATE_RECEIVE_NODEID_REPLY) { err = receiveSynchronousResponse(conn); if (err == NULL) goto no_response_error; /* Ignore the error if any, we don't care if it failed, it is best effort. */ if (err[0] == '-') { - serverLog(LL_NOTICE, "(Non critical) Primary does not understand CLIENT SETNAME: %s", err); + serverLog(LL_NOTICE, + "(Non critical) Primary does not understand " + "REPLCONF SET-CLUSTER-NODE-ID: %s", + err); } sdsfree(err); err = NULL; diff --git a/src/server.h b/src/server.h index a1eba2fe31..274bab61af 100644 --- a/src/server.h +++ b/src/server.h @@ -151,6 +151,7 @@ struct hdr_histogram; #else #define CONFIG_ACTIVE_DEFRAG_DEFAULT 1 #endif +#define CLUSTER_NAMELEN 40 /* Bucket sizes for client eviction pools. Each bucket stores clients with * memory usage of up to twice the size of the bucket below it. */ @@ -410,7 +411,7 @@ typedef enum { REPL_STATE_RECEIVE_IP_REPLY, /* Wait for REPLCONF reply */ REPL_STATE_RECEIVE_CAPA_REPLY, /* Wait for REPLCONF reply */ REPL_STATE_RECEIVE_VERSION_REPLY, /* Wait for REPLCONF reply */ - REPL_STATE_RECEIVE_SETNAME_REPLY, /* Wait for CLIENT SETNAME reply */ + REPL_STATE_RECEIVE_NODEID_REPLY, /* Wait for REPLCONF reply */ REPL_STATE_SEND_PSYNC, /* Send PSYNC */ REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */ /* --- End of handshake states --- */ @@ -550,7 +551,6 @@ typedef enum { #define MAXMEMORY_FLAG_LRU (1 << 0) #define MAXMEMORY_FLAG_LFU (1 << 1) #define MAXMEMORY_FLAG_ALLKEYS (1 << 2) -#define MAXMEMORY_FLAG_NO_SHARED_INTEGERS (MAXMEMORY_FLAG_LRU | MAXMEMORY_FLAG_LFU) #define MAXMEMORY_VOLATILE_LRU ((0 << 8) | MAXMEMORY_FLAG_LRU) #define MAXMEMORY_VOLATILE_LFU ((1 << 8) | MAXMEMORY_FLAG_LFU) @@ -1155,6 +1155,7 @@ typedef struct ClientReplicationData { see the definition of replBufBlock. */ size_t ref_block_pos; /* Access position of referenced buffer block, i.e. the next offset to send. */ + char nodeid[CLUSTER_NAMELEN]; /* Node id in cluster mode. */ } ClientReplicationData; typedef struct ClientModuleData { From 8cba555bebe58cd4d10cf4caf87dae0dcfad5aff Mon Sep 17 00:00:00 2001 From: Binbin Date: Tue, 25 Feb 2025 14:52:17 +0800 Subject: [PATCH 20/23] Update valkey.conf Co-authored-by: Ping Xie Signed-off-by: Binbin --- valkey.conf | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/valkey.conf b/valkey.conf index 0c06ca7db0..b2b7438ec8 100644 --- a/valkey.conf +++ b/valkey.conf @@ -1692,10 +1692,10 @@ aof-timestamp-enabled no # shutdown-on-sigint default # shutdown-on-sigterm default -# If the node receives a SIGTERM or is shut down in another way when it is -# running in cluster mode and is a primary with replicas, it can trigger a -# manual failover to one of its replicas before shutting down. This is faster -# and safer than waiting for an automatic failover to happen. Default is 'no'. +# In cluster mode, if a primary node with replicas receives a SIGTERM or is shut down, +# it can proactively initiate a manual failover. This promotes one of its replicas to + # primary before shutdown, resulting in a quicker and safer transition than relying on +# an automatic failover. Default is 'no'. # # auto-failover-on-shutdown no From ade48cb5def1c2a7561196603dbe1e2e27ddb028 Mon Sep 17 00:00:00 2001 From: Binbin Date: Tue, 25 Feb 2025 17:00:02 +0800 Subject: [PATCH 21/23] Change nodeid to sds Signed-off-by: Binbin --- src/cluster_legacy.c | 7 ++++--- src/replication.c | 16 ++++++++++++---- src/server.h | 3 +-- valkey.conf | 2 +- 4 files changed, 18 insertions(+), 10 deletions(-) diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index b2e266d381..c01be5022b 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -1269,7 +1269,8 @@ void clusterAutoFailoverOnShutdown(void) { break; } if (replica->repl_data->repl_state == REPLICA_STATE_ONLINE && - replica->repl_data->repl_ack_off == server.primary_repl_offset) { + replica->repl_data->repl_ack_off == server.primary_repl_offset && + replica->repl_data->replica_nodeid && sdslen(replica->repl_data->replica_nodeid) == CLUSTER_NAMELEN) { best_replica = replica; } } @@ -1296,12 +1297,12 @@ void clusterAutoFailoverOnShutdown(void) { "$9\r\nREPLICAID\r\n" "$%d\r\n%s\r\n", CLUSTER_NAMELEN, - best_replica->repl_data->nodeid); + best_replica->repl_data->replica_nodeid); /* Must install write handler for all replicas first before feeding * replication stream. */ prepareReplicasToWrite(); feedReplicationBuffer(buf, buflen); - serverLog(LL_NOTICE, "Perform auto failover to replica %.40s on shutdown.", best_replica->repl_data->nodeid); + serverLog(LL_NOTICE, "Perform auto failover to replica %s on shutdown.", best_replica->repl_data->replica_nodeid); } /* Called when a cluster node receives SHUTDOWN. */ diff --git a/src/replication.c b/src/replication.c index 202b1d1064..8746a01c96 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1296,6 +1296,7 @@ void freeClientReplicationData(client *c) { } if (c->flag.primary) replicationHandlePrimaryDisconnection(); sdsfree(c->repl_data->replica_addr); + sdsfree(c->repl_data->replica_nodeid); zfree(c->repl_data); c->repl_data = NULL; } @@ -1503,12 +1504,19 @@ void replconfCommand(client *c) { c->repl_data->associated_rdb_client_id = (uint64_t)client_id; } else if (!strcasecmp(c->argv[j]->ptr, "set-cluster-node-id")) { /* REPLCONF SET-CLUSTER-NODE-ID */ - if (!server.cluster_enabled) return; + if (!server.cluster_enabled) { + addReplyError(c, "This instance has cluster support disabled"); + return; + } - clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr)); - if (!n) return; + clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[j + 1]->ptr)); + if (!n) { + addReplyErrorFormat(c, "Unknown node %s", (char *)c->argv[j + 1]->ptr); + return; + } - memcpy(c->repl_data->nodeid, n->name, CLUSTER_NAMELEN); + if (c->repl_data->replica_nodeid) sdsfree(c->repl_data->replica_nodeid); + c->repl_data->replica_nodeid = sdsdup(c->argv[j + 1]->ptr); } else { addReplyErrorFormat(c, "Unrecognized REPLCONF option: %s", (char *)c->argv[j]->ptr); return; diff --git a/src/server.h b/src/server.h index 274bab61af..ff576a7b8e 100644 --- a/src/server.h +++ b/src/server.h @@ -151,7 +151,6 @@ struct hdr_histogram; #else #define CONFIG_ACTIVE_DEFRAG_DEFAULT 1 #endif -#define CLUSTER_NAMELEN 40 /* Bucket sizes for client eviction pools. Each bucket stores clients with * memory usage of up to twice the size of the bucket below it. */ @@ -1155,7 +1154,7 @@ typedef struct ClientReplicationData { see the definition of replBufBlock. */ size_t ref_block_pos; /* Access position of referenced buffer block, i.e. the next offset to send. */ - char nodeid[CLUSTER_NAMELEN]; /* Node id in cluster mode. */ + sds replica_nodeid; /* Node id in cluster mode. */ } ClientReplicationData; typedef struct ClientModuleData { diff --git a/valkey.conf b/valkey.conf index b2b7438ec8..e283df2cfb 100644 --- a/valkey.conf +++ b/valkey.conf @@ -1694,7 +1694,7 @@ aof-timestamp-enabled no # In cluster mode, if a primary node with replicas receives a SIGTERM or is shut down, # it can proactively initiate a manual failover. This promotes one of its replicas to - # primary before shutdown, resulting in a quicker and safer transition than relying on +# primary before shutdown, resulting in a quicker and safer transition than relying on # an automatic failover. Default is 'no'. # # auto-failover-on-shutdown no From 6b5cf7ff996965b05b925c8df3b9d739a596e7ac Mon Sep 17 00:00:00 2001 From: Binbin Date: Tue, 25 Feb 2025 17:02:11 +0800 Subject: [PATCH 22/23] code review from Ping, add the assert <= 128 Signed-off-by: Binbin --- src/cluster_legacy.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index c01be5022b..7c0a8f2c8a 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -1298,6 +1298,7 @@ void clusterAutoFailoverOnShutdown(void) { "$%d\r\n%s\r\n", CLUSTER_NAMELEN, best_replica->repl_data->replica_nodeid); + serverAssert(buflen <= 128); /* Must install write handler for all replicas first before feeding * replication stream. */ prepareReplicasToWrite(); From e3fdb7c262437a691a4c5d05408e38ad808c7556 Mon Sep 17 00:00:00 2001 From: Binbin Date: Tue, 25 Feb 2025 17:02:50 +0800 Subject: [PATCH 23/23] Fix the index Signed-off-by: Binbin --- src/replication.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/replication.c b/src/replication.c index 8746a01c96..a3de3be6dd 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1509,7 +1509,7 @@ void replconfCommand(client *c) { return; } - clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[j + 1]->ptr)); + clusterNode *n = clusterLookupNode(c->argv[j + 1]->ptr, sdslen(c->argv[j + 1]->ptr)); if (!n) { addReplyErrorFormat(c, "Unknown node %s", (char *)c->argv[j + 1]->ptr); return;