diff --git a/src/cluster.c b/src/cluster.c
index caa1e5798b..b03407e499 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -814,8 +814,16 @@ static int shouldReturnTlsInfo(void) {
     }
 }
 
+unsigned int countKeysInSlotForDb(unsigned int hashslot, serverDb *db) {
+    return kvstoreHashtableSize(db->keys, hashslot);
+}
+
 unsigned int countKeysInSlot(unsigned int slot) {
-    return kvstoreHashtableSize(server.db->keys, slot);
+    unsigned int result = 0;
+    for (int i = 0; i < server.dbnum; i++) {
+        result += countKeysInSlotForDb(slot, server.db + i);
+    }
+    return result;
 }
 
 void clusterCommandHelp(client *c) {
@@ -897,7 +905,7 @@ void clusterCommand(client *c) {
             addReplyError(c, "Invalid slot");
             return;
         }
-        addReplyLongLong(c, countKeysInSlot(slot));
+        addReplyLongLong(c, countKeysInSlotForDb(slot, c->db));
     } else if (!strcasecmp(c->argv[1]->ptr, "getkeysinslot") && c->argc == 4) {
        /* CLUSTER GETKEYSINSLOT <slot> <count> */
        long long maxkeys, slot;
@@ -909,11 +917,11 @@ void clusterCommand(client *c) {
             return;
         }
 
-        unsigned int keys_in_slot = countKeysInSlot(slot);
+        unsigned int keys_in_slot = countKeysInSlotForDb(slot, c->db);
         unsigned int numkeys = maxkeys > keys_in_slot ? keys_in_slot : maxkeys;
         addReplyArrayLen(c, numkeys);
         kvstoreHashtableIterator *kvs_di = NULL;
-        kvs_di = kvstoreGetHashtableIterator(server.db->keys, slot, 0);
+        kvs_di = kvstoreGetHashtableIterator(c->db->keys, slot, 0);
         for (unsigned int i = 0; i < numkeys; i++) {
             void *next;
             serverAssert(kvstoreHashtableIteratorNext(kvs_di, &next));
@@ -1102,7 +1110,7 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int
          * NODE <node-id>. */
         int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY | LOOKUP_NOEXPIRE;
         if ((migrating_slot || importing_slot) && !pubsubshard_included) {
-            if (lookupKeyReadWithFlags(&server.db[0], thiskey, flags) == NULL)
+            if (lookupKeyReadWithFlags(c->db, thiskey, flags) == NULL)
                 missing_keys++;
             else
                 existing_keys++;
diff --git a/src/cluster.h b/src/cluster.h
index 142f2d70b3..c2ba0e67ac 100644
--- a/src/cluster.h
+++ b/src/cluster.h
@@ -115,6 +115,7 @@ int detectAndUpdateCachedNodeHealth(void);
 client *createCachedResponseClient(int resp);
 void deleteCachedResponseClient(client *recording_client);
 void clearCachedClusterSlotsResponse(void);
+unsigned int countKeysInSlotForDb(unsigned int hashslot, serverDb *db);
 unsigned int countKeysInSlot(unsigned int hashslot);
 int getSlotOrReply(client *c, robj *o);
 
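The countKeysInSlot() change above is the pattern the rest of the patch repeats: per-database questions go to countKeysInSlotForDb(), cluster-wide questions sum over server.dbnum. A minimal self-contained sketch of that aggregation, with toy types standing in for serverDb and the kvstore API (none of these names are the real API):

/* Toy model of the per-database aggregation pattern. */
#include <stdio.h>

#define NUM_DBS 16

typedef struct {
    unsigned int keys_in_slot[4]; /* toy "kvstore": key count per slot */
} toyDb;

static toyDb dbs[NUM_DBS];

/* Per-db count: mirrors countKeysInSlotForDb(). */
static unsigned int countForDb(unsigned int slot, toyDb *db) {
    return db->keys_in_slot[slot];
}

/* Cluster-wide count: mirrors the new countKeysInSlot() loop. */
static unsigned int countAllDbs(unsigned int slot) {
    unsigned int result = 0;
    for (int i = 0; i < NUM_DBS; i++) result += countForDb(slot, dbs + i);
    return result;
}

int main(void) {
    dbs[0].keys_in_slot[2] = 3;
    dbs[9].keys_in_slot[2] = 5;
    printf("slot 2 total: %u\n", countAllDbs(2)); /* prints 8 */
    return 0;
}

Note the asymmetry this introduces: CLUSTER COUNTKEYSINSLOT and GETKEYSINSLOT become scoped to the client's selected database, while internal bookkeeping sums across all databases.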
diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c
index 26aea15bbd..ebf5c53a64 100644
--- a/src/cluster_legacy.c
+++ b/src/cluster_legacy.c
@@ -5794,11 +5794,6 @@ int verifyClusterConfigWithData(void) {
      * completely depend on the replication stream. */
     if (nodeIsReplica(myself)) return C_OK;
 
-    /* Make sure we only have keys in DB0. */
-    for (j = 1; j < server.dbnum; j++) {
-        if (kvstoreSize(server.db[j].keys)) return C_ERR;
-    }
-
     /* Check that all the slots we see populated memory have a corresponding
      * entry in the cluster table. Otherwise fix the table. */
     for (j = 0; j < CLUSTER_SLOTS; j++) {
@@ -6430,29 +6425,31 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
     server.server_del_keys_in_slot = 1;
     unsigned int j = 0;
 
-    kvstoreHashtableIterator *kvs_di = NULL;
-    void *next;
-    kvs_di = kvstoreGetHashtableIterator(server.db->keys, hashslot, HASHTABLE_ITER_SAFE);
-    while (kvstoreHashtableIteratorNext(kvs_di, &next)) {
-        robj *valkey = next;
-        enterExecutionUnit(1, 0);
-        sds sdskey = objectGetKey(valkey);
-        robj *key = createStringObject(sdskey, sdslen(sdskey));
-        dbDelete(&server.db[0], key);
-        propagateDeletion(&server.db[0], key, server.lazyfree_lazy_server_del);
-        signalModifiedKey(NULL, &server.db[0], key);
-        /* The keys are not actually logically deleted from the database, just moved to another node.
-         * The modules needs to know that these keys are no longer available locally, so just send the
-         * keyspace notification to the modules, but not to clients. */
-        moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, server.db[0].id);
-        exitExecutionUnit();
-        postExecutionUnitOperations();
-        decrRefCount(key);
-        j++;
-        server.dirty++;
+    for (int i = 0; i < server.dbnum; i++) {
+        kvstoreHashtableIterator *kvs_di = NULL;
+        void *next;
+        serverDb *db = server.db + i;
+        kvs_di = kvstoreGetHashtableIterator(db->keys, hashslot, HASHTABLE_ITER_SAFE);
+        while (kvstoreHashtableIteratorNext(kvs_di, &next)) {
+            robj *valkey = next;
+            enterExecutionUnit(1, 0);
+            sds sdskey = objectGetKey(valkey);
+            robj *key = createStringObject(sdskey, sdslen(sdskey));
+            dbDelete(db, key);
+            propagateDeletion(db, key, server.lazyfree_lazy_server_del);
+            signalModifiedKey(NULL, db, key);
+            /* The keys are not actually logically deleted from the database, just moved to another node.
+             * The modules need to know that these keys are no longer available locally, so just send the
+             * keyspace notification to the modules, but not to clients. */
+            moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, db->id);
+            exitExecutionUnit();
+            postExecutionUnitOperations();
+            decrRefCount(key);
+            j++;
+            server.dirty++;
+        }
+        kvstoreReleaseHashtableIterator(kvs_di);
     }
-    kvstoreReleaseHashtableIterator(kvs_di);
-
     server.server_del_keys_in_slot = 0;
     serverAssert(server.execution_nesting == 0);
     return j;
@@ -6881,6 +6878,15 @@ void clusterCommandSetSlot(client *c) {
     addReply(c, shared.ok);
 }
 
+int dbHasNoKeys(void) {
+    for (int i = 0; i < server.dbnum; i++) {
+        if (kvstoreSize(server.db[i].keys) != 0) {
+            return 0;
+        }
+    }
+    return 1;
+}
+
 int clusterCommandSpecial(client *c) {
     if (!strcasecmp(c->argv[1]->ptr, "meet") && (c->argc == 4 || c->argc == 5)) {
         /* CLUSTER MEET <ip> <port> [<cport>] */
@@ -6912,7 +6918,7 @@ int clusterCommandSpecial(client *c) {
         }
     } else if (!strcasecmp(c->argv[1]->ptr, "flushslots") && c->argc == 2) {
         /* CLUSTER FLUSHSLOTS */
-        if (kvstoreSize(server.db[0].keys) != 0) {
+        if (!dbHasNoKeys()) {
             addReplyError(c, "DB must be empty to perform CLUSTER FLUSHSLOTS.");
             return 1;
         }
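delKeysInSlot() deletes the entry it is currently visiting, which is why every per-db iterator above is opened with HASHTABLE_ITER_SAFE. A toy illustration of the delete-while-iterating discipline (a plain linked list, not the kvstore API): grab the successor before removing the current element, just as the safe iterator advances before dbDelete() runs.

#include <stdio.h>
#include <stdlib.h>

typedef struct node { int key; struct node *next; } node;

/* Delete every matching key while iterating: fetch `next` before freeing,
 * mirroring the iterator-next-then-dbDelete order in the patch. */
static void delete_matching(node **head, int key) {
    node **link = head;
    while (*link) {
        node *cur = *link;
        node *next = cur->next; /* advance first, as a safe iterator does */
        if (cur->key == key) {
            *link = next;
            free(cur);
        } else {
            link = &cur->next;
        }
    }
}

int main(void) {
    node *head = NULL;
    for (int i = 0; i < 5; i++) {
        node *n = malloc(sizeof(*n));
        n->key = i % 2;
        n->next = head;
        head = n;
    }
    delete_matching(&head, 1);
    for (node *n = head; n; n = n->next) printf("%d ", n->key); /* 0 0 0 */
    printf("\n");
    return 0;
}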
@@ -7053,7 +7059,7 @@ int clusterCommandSpecial(client *c) {
         /* If the instance is currently a primary, it should have no assigned
          * slots nor keys to accept to replicate some other node.
          * Replicas can switch to another primary without issues. */
-        if (clusterNodeIsPrimary(myself) && (myself->numslots != 0 || kvstoreSize(server.db[0].keys) != 0)) {
+        if (clusterNodeIsPrimary(myself) && (myself->numslots != 0 || !dbHasNoKeys())) {
             addReplyError(c, "To set a master the node must be empty and "
                              "without assigned slots.");
             return 1;
@@ -7187,7 +7193,7 @@ int clusterCommandSpecial(client *c) {
 
         /* Replicas can be reset while containing data, but not primary nodes
          * that must be empty. */
-        if (clusterNodeIsPrimary(myself) && kvstoreSize(c->db->keys) != 0) {
+        if (clusterNodeIsPrimary(myself) && !dbHasNoKeys()) {
             addReplyError(c, "CLUSTER RESET can't be called with "
                              "master nodes containing keys");
             return 1;
diff --git a/src/config.c b/src/config.c
index d099e8e8b9..7af0a3b5d6 100644
--- a/src/config.c
+++ b/src/config.c
@@ -607,13 +607,6 @@ void loadServerConfigFromString(char *config) {
         goto loaderr;
     }
 
-    /* in case cluster mode is enabled dbnum must be 1 */
-    if (server.cluster_enabled && server.dbnum > 1) {
-        serverLog(LL_WARNING, "WARNING: Changing databases number from %d to 1 since we are in cluster mode",
-                  server.dbnum);
-        server.dbnum = 1;
-    }
-
     /* To ensure backward compatibility and work while hz is out of range */
     if (server.config_hz < CONFIG_MIN_HZ) server.config_hz = CONFIG_MIN_HZ;
     if (server.config_hz > CONFIG_MAX_HZ) server.config_hz = CONFIG_MAX_HZ;
diff --git a/src/db.c b/src/db.c
index f2a000030b..980958f9a3 100644
--- a/src/db.c
+++ b/src/db.c
@@ -860,10 +860,6 @@ void selectCommand(client *c) {
 
     if (getIntFromObjectOrReply(c, c->argv[1], &id, NULL) != C_OK) return;
 
-    if (server.cluster_enabled && id != 0) {
-        addReplyError(c, "SELECT is not allowed in cluster mode");
-        return;
-    }
     if (selectDb(c, id) == C_ERR) {
         addReplyError(c, "DB index is out of range");
     } else {
@@ -1429,11 +1425,6 @@ void moveCommand(client *c) {
     int srcid, dbid;
     long long expire;
 
-    if (server.cluster_enabled) {
-        addReplyError(c, "MOVE is not allowed in cluster mode");
-        return;
-    }
-
     /* Obtain source and target DB pointers */
     src = c->db;
     srcid = c->db->id;
@@ -1518,11 +1509,6 @@ void copyCommand(client *c) {
         }
     }
 
-    if ((server.cluster_enabled == 1) && (srcid != 0 || dbid != 0)) {
-        addReplyError(c, "Copying to another database is not allowed in cluster mode");
-        return;
-    }
-
     /* If the user select the same DB as
      * the source DB and using newkey as the same key
      * it is probably an error. */
@@ -1728,12 +1714,6 @@ void swapMainDbWithTempDb(serverDb *tempDb) {
 void swapdbCommand(client *c) {
     int id1, id2;
 
-    /* Not allowed in cluster mode: we have just DB 0 there. */
-    if (server.cluster_enabled) {
-        addReplyError(c, "SWAPDB is not allowed in cluster mode");
-        return;
-    }
-
     /* Get the two DBs indexes. */
     if (getIntFromObjectOrReply(c, c->argv[1], &id1, "invalid first DB index") != C_OK) return;
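With the four guards above removed, SELECT, MOVE, COPY ... DB, and SWAPDB become legal in cluster mode. A hedged hiredis smoke test of the client-visible effect; the host, port, and key names are placeholders, and the connection is assumed to point at the node that owns the {tag} slot (otherwise the server answers with a MOVED redirection):

#include <stdio.h>
#include <hiredis/hiredis.h>

int main(void) {
    redisContext *c = redisConnect("127.0.0.1", 7000);
    if (c == NULL || c->err) return 1;

    redisReply *r = redisCommand(c, "SELECT 5"); /* previously an error in cluster mode */
    if (!r) return 1;
    printf("SELECT 5 -> %s\n", r->str);
    freeReplyObject(r);

    r = redisCommand(c, "SET {tag}:k v1"); /* written into db 5 */
    if (r) freeReplyObject(r);

    /* Cross-db copy within the same slot: COPY src dst DB <index>. */
    r = redisCommand(c, "COPY {tag}:k {tag}:k2 DB 9");
    if (r) {
        printf("COPY -> %lld\n", r->integer); /* 1 on success */
        freeReplyObject(r);
    }

    redisFree(c);
    return 0;
}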
diff --git a/src/valkey-benchmark.c b/src/valkey-benchmark.c
index 1924203ae7..1c64740bcc 100644
--- a/src/valkey-benchmark.c
+++ b/src/valkey-benchmark.c
@@ -707,7 +707,7 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) {
      * buffer with the SELECT command, that will be discarded the first
      * time the replies are received, so if the client is reused the
      * SELECT command will not be used again. */
-    if (config.conn_info.input_dbnum != 0 && !is_cluster_client) {
+    if (config.conn_info.input_dbnum) {
        c->obuf = sdscatprintf(c->obuf, "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", (int)sdslen(config.input_dbnumstr),
                               config.input_dbnumstr);
        c->prefix_pending++;
diff --git a/src/valkey-cli.c b/src/valkey-cli.c
index d2fa537036..f00ab00895 100644
--- a/src/valkey-cli.c
+++ b/src/valkey-cli.c
@@ -1553,11 +1553,11 @@ static int cliAuth(redisContext *ctx, char *user, char *auth) {
 }
 
 /* Send SELECT input_dbnum to the server */
-static int cliSelect(void) {
+static int cliSelect(struct config *config, redisContext *ctx) {
     redisReply *reply;
-    if (config.conn_info.input_dbnum == config.dbnum) return REDIS_OK;
+    if (config->conn_info.input_dbnum == config->dbnum) return REDIS_OK;
 
-    reply = redisCommand(context, "SELECT %d", config.conn_info.input_dbnum);
+    reply = redisCommand(ctx, "SELECT %d", config->conn_info.input_dbnum);
     if (reply == NULL) {
         fprintf(stderr, "\nI/O error\n");
         return REDIS_ERR;
@@ -1566,9 +1566,9 @@ static int cliSelect(void) {
     int result = REDIS_OK;
     if (reply->type == REDIS_REPLY_ERROR) {
         result = REDIS_ERR;
-        fprintf(stderr, "SELECT %d failed: %s\n", config.conn_info.input_dbnum, reply->str);
+        fprintf(stderr, "SELECT %d failed: %s\n", config->conn_info.input_dbnum, reply->str);
     } else {
-        config.dbnum = config.conn_info.input_dbnum;
+        config->dbnum = config->conn_info.input_dbnum;
         cliRefreshPrompt();
     }
     freeReplyObject(reply);
@@ -1667,7 +1667,7 @@ static int cliConnect(int flags) {
 
         /* Do AUTH, select the right DB, switch to RESP3 if needed. */
         if (cliAuth(context, config.conn_info.user, config.conn_info.auth) != REDIS_OK) return REDIS_ERR;
-        if (cliSelect() != REDIS_OK) return REDIS_ERR;
+        if (cliSelect(&config, context) != REDIS_OK) return REDIS_ERR;
         if (cliSwitchProto() != REDIS_OK) return REDIS_ERR;
     }
 
@@ -2448,7 +2448,7 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
             config.conn_info.input_dbnum = config.dbnum = atoi(argv[1]);
             cliRefreshPrompt();
         } else if (!strcasecmp(command, "auth") && (argc == 2 || argc == 3)) {
-            cliSelect();
+            cliSelect(&config, context);
         } else if (!strcasecmp(command, "multi") && argc == 1 && config.last_cmd_type != REDIS_REPLY_ERROR) {
             config.in_multi = 1;
             config.pre_multi_dbnum = config.dbnum;
@@ -4789,8 +4789,10 @@ static redisReply *clusterManagerMigrateKeysInReply(clusterManagerNode *source,
     argv_len = zcalloc(argc * sizeof(size_t));
     char portstr[255];
     char timeoutstr[255];
+    char dbnum[255];
     snprintf(portstr, 10, "%d", target->port);
     snprintf(timeoutstr, 10, "%d", timeout);
+    snprintf(dbnum, 10, "%d", config.dbnum);
     argv[0] = "MIGRATE";
     argv_len[0] = 7;
     argv[1] = target->ip;
@@ -4799,8 +4801,8 @@ static redisReply *clusterManagerMigrateKeysInReply(clusterManagerNode *source,
     argv_len[2] = strlen(portstr);
     argv[3] = "";
     argv_len[3] = 0;
-    argv[4] = "0";
-    argv_len[4] = 1;
+    argv[4] = dbnum;
+    argv_len[4] = strlen(dbnum);
     argv[5] = timeoutstr;
     argv_len[5] = strlen(timeoutstr);
     if (replace) {
@@ -4852,6 +4854,8 @@ static redisReply *clusterManagerMigrateKeysInReply(clusterManagerNode *source,
     return migrate_reply;
 }
 
+static int getDatabases(redisContext *ctx);
+
 /* Migrate all keys in the given slot from source to target.*/
 static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
                                            clusterManagerNode *target,
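The hunk below rewrites clusterManagerMigrateKeysInSlot() to drain a slot database by database: SELECT each db on the source, loop GETKEYSINSLOT/MIGRATE batches until the slot is empty in that db, then move on. A compact control-flow sketch of that loop; select_db and migrate_batch are hypothetical stand-ins for cliSelect() and the GETKEYSINSLOT/MIGRATE round-trip, not the real cli helpers:

#include <stdbool.h>
#include <stdio.h>

static bool select_db(int db) { printf("SELECT %d\n", db); return true; }

/* Returns the number of keys moved in this batch; 0 when the db is drained. */
static int migrate_batch(int db, int slot) {
    static int remaining[16] = {[1] = 2, [5] = 1}; /* toy key batches per db */
    if (remaining[db] == 0) return 0;
    printf("MIGRATE ... (db %d, slot %d)\n", db, slot);
    return remaining[db]--;
}

static bool migrate_slot(int slot, int dbnum) {
    for (int db = 0; db < dbnum; db++) {
        if (!select_db(db)) return false;    /* a SELECT failure aborts */
        while (migrate_batch(db, slot) > 0); /* drain batches until empty */
    }
    return true; /* the real code then restores the original db */
}

int main(void) { return migrate_slot(0, 16) ? 0 : 1; }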
@@ -4863,7 +4867,19 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
     int success = 1;
     int do_fix = config.cluster_manager_command.flags & CLUSTER_MANAGER_CMD_FLAG_FIX;
     int do_replace = config.cluster_manager_command.flags & CLUSTER_MANAGER_CMD_FLAG_REPLACE;
+
+    int dbnum = getDatabases(source->context);
+    int orig_db = config.conn_info.input_dbnum;
+    config.conn_info.input_dbnum = 0;
+
     while (1) {
+        if (config.conn_info.input_dbnum == dbnum) {
+            break;
+        }
+        if (cliSelect(&config, source->context) == REDIS_ERR) {
+            success = 0;
+            break;
+        }
         char *dots = NULL;
         redisReply *reply = NULL, *migrate_reply = NULL;
         reply = CLUSTER_MANAGER_COMMAND(source, "CLUSTER "
                                                 "GETKEYSINSLOT %d %d", slot, pipeline);
         success = (reply != NULL);
-        if (!success) return 0;
+        if (!success) {
+            break;
+        }
         if (reply->type == REDIS_REPLY_ERROR) {
             success = 0;
             if (err != NULL) {
@@ -4885,7 +4903,9 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
         size_t count = reply->elements;
         if (count == 0) {
             freeReplyObject(reply);
-            break;
+            reply = NULL;
+            config.conn_info.input_dbnum++;
+            continue;
         }
         if (verbose) dots = zmalloc((count + 1) * sizeof(char));
         /* Calling MIGRATE command. */
@@ -5009,8 +5029,13 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
         if (reply != NULL) freeReplyObject(reply);
         if (migrate_reply != NULL) freeReplyObject(migrate_reply);
         if (dots) zfree(dots);
+        reply = NULL;
+        migrate_reply = NULL;
+        dots = NULL;
         if (!success) break;
     }
+
+    config.conn_info.input_dbnum = orig_db;
+    cliSelect(&config, source->context);
     return success;
 }
 
@@ -8651,11 +8676,11 @@ static int getDbSize(void) {
     return size;
 }
 
-static int getDatabases(void) {
+static int getDatabases(redisContext *ctx) {
     redisReply *reply;
     int dbnum;
 
-    reply = redisCommand(context, "CONFIG GET databases");
+    reply = redisCommand(ctx, "CONFIG GET databases");
     if (reply == NULL) {
         fprintf(stderr, "\nI/O error\n");
@@ -9158,7 +9183,7 @@ void bytesToHuman(char *s, size_t size, long long n) {
 static void statMode(void) {
     redisReply *reply;
     long aux, requests = 0;
-    int dbnum = getDatabases();
+    int dbnum = getDatabases(context);
     int i = 0;
 
     while (1) {
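getDatabases(ctx) asks the server how many databases exist. For reference, a standalone sketch of the same query; it assumes a RESP2 connection, where CONFIG GET returns a flat two-element array of parameter name and value, and the connection details are placeholders:

#include <stdio.h>
#include <stdlib.h>
#include <hiredis/hiredis.h>

static int get_databases(redisContext *ctx) {
    redisReply *reply = redisCommand(ctx, "CONFIG GET databases");
    int dbnum = -1;
    if (reply && reply->type == REDIS_REPLY_ARRAY && reply->elements == 2)
        dbnum = atoi(reply->element[1]->str); /* element[0] is the param name */
    if (reply) freeReplyObject(reply);
    return dbnum;
}

int main(void) {
    redisContext *ctx = redisConnect("127.0.0.1", 7000);
    if (!ctx || ctx->err) return 1;
    printf("databases: %d\n", get_databases(ctx));
    redisFree(ctx);
    return 0;
}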
diff --git a/tests/cluster/tests/05-cluster-multidatabases.tcl b/tests/cluster/tests/05-cluster-multidatabases.tcl
new file mode 100644
index 0000000000..b22bd79edf
--- /dev/null
+++ b/tests/cluster/tests/05-cluster-multidatabases.tcl
@@ -0,0 +1,462 @@
+# Tests multiple databases in cluster mode
+
+source "../tests/includes/init-tests.tcl"
+
+proc get_my_replica {cluster_nodes} {
+    set my_node_id ""
+
+    # Split the string into lines
+    set lines [split $cluster_nodes "\n"]
+
+    # Find "myself" node ID
+    foreach line $lines {
+        if {[string match {*myself,master*} $line]} {
+            set my_node_id [lindex $line 0]
+            break
+        }
+    }
+
+    # Find the replica of "myself"
+    if {$my_node_id != ""} {
+        foreach line $lines {
+            if {[string match {*slave*} $line]} {
+                if {[lindex $line 3] == $my_node_id} {
+                    set slave_info [split [lindex $line 1] ":@"]
+                    set ip [lindex $slave_info 0]
+                    set port [lindex $slave_info 1]
+                    return [valkey_client_by_addr $ip $port]
+                }
+            }
+        }
+    }
+    fail "No replica found!"
+}
+
+test "Create a 5-node cluster with replicas, all slots allocated to one node" {
+    cluster_allocate_slaves 5 5
+
+    # Allocate all slots to one shard to allow easier testing with key-based commands
+    cluster_allocate_with_continuous_slots 1
+}
+
+test "Cluster is up" {
+    assert_cluster_state ok
+}
+
+test "Key-based commands can be used on multiple databases" {
+    set primary_id 0
+    # Switch from database 0 to 9
+    R $primary_id select 9
+    # Write a key to database 9
+    set key "{1}key1"
+    set val "hello"
+    R $primary_id set $key $val
+    assert_equal [R $primary_id get $key] $val
+    R $primary_id flushdb
+}
+
+test "Key-based commands across multiple databases" {
+    set primary_id 0
+    set keys_per_db 100
+
+    # Set keys in all DBs
+    for {set db 0} {$db < 16} {incr db} {
+        R $primary_id select $db
+        for {set i 0} {$i < $keys_per_db} {incr i} {
+            set key "{$db}key$i"
+            set val "hello_db${db}_key$i"
+            R $primary_id set $key $val
+            assert_equal [R $primary_id get $key] $val
+        }
+        assert_equal [R $primary_id dbsize] $keys_per_db
+    }
+
+    # Verify all values
+    for {set db 0} {$db < 16} {incr db} {
+        R $primary_id select $db
+        assert_equal [R $primary_id dbsize] $keys_per_db
+        for {set i 0} {$i < $keys_per_db} {incr i} {
+            set key "{$db}key$i"
+            set expected_val "hello_db${db}_key$i"
+            assert_equal [R $primary_id get $key] $expected_val
+        }
+    }
+
+    # Delete all keys and verify deletion
+    for {set db 0} {$db < 16} {incr db} {
+        R $primary_id select $db
+        for {set i 0} {$i < $keys_per_db} {incr i} {
+            set key "{$db}key$i"
+            R $primary_id del $key
+        }
+        assert_equal [R $primary_id dbsize] 0
+    }
+}
+
+test "Validate slot statistics using cluster countkeysinslot and cluster getkeysinslot" {
+    set primary_id 0
+    set keys_per_db 100
+
+    # Set keys in all DBs with the same hash slot
+    for {set db 0} {$db < 16} {incr db} {
+        R $primary_id select $db
+        for {set i 0} {$i < $keys_per_db} {incr i} {
+            set key "{x}key$i"
+            set val "hello_db${db}_{x}key$i"
+            R $primary_id set $key $val
+            assert_equal [R $primary_id get $key] $val
+        }
+    }
+
+    # Get a random key to determine the slot, which is identical for all keys
+    set random_key [R $primary_id randomkey]
+    assert_not_equal $random_key ""
+
+    set slot [R $primary_id cluster keyslot $random_key]
+    assert_not_equal $slot ""
+
+    # Validate slot key distribution in each database
+    for {set db 0} {$db < 16} {incr db} {
+        R $primary_id select $db
+        set keys_in_slot [R $primary_id cluster countkeysinslot $slot]
+        # Since all keys map to a single slot, the count in the currently selected db should match $keys_per_db
+        assert_equal $keys_in_slot $keys_per_db
+    }
+
+    # Verify key retrieval by slot for each database
+    for {set db 0} {$db < 16} {incr db} {
+        R $primary_id select $db
+        set slot_keys [R $primary_id cluster getkeysinslot $slot $keys_per_db]
+        assert_equal [llength $slot_keys] $keys_per_db
+        foreach key $slot_keys {
+            if {![regexp {^\{x\}key\d+$} $key]} {
+                error "Key format mismatch: $key"
+            }
+            set expected_val "hello_db${db}_$key"
+            assert_equal [R $primary_id get $key] $expected_val
+        }
+    }
+
+    # Delete all keys and verify slot emptiness
+    for {set db 0} {$db < 16} {incr db} {
+        R $primary_id select $db
+        for {set i 0} {$i < $keys_per_db} {incr i} {
+            set key "{x}key$i"
+            R $primary_id del $key
+        }
+        assert_equal [R $primary_id dbsize] 0
+    }
+
+    # Ensure the slot is empty
+    set remaining_keys [R $primary_id cluster countkeysinslot $slot]
+    assert_equal $remaining_keys 0
+}
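The tests rely on {x}-style hash tags to force many keys into one slot: the slot is CRC16 of the hashed portion of the key, modulo 16384, and that portion is the substring between the first '{' and the next '}' when non-empty, otherwise the whole key. A sketch of just the tag-extraction rule (the CRC16 table is omitted):

#include <stdio.h>
#include <string.h>

/* Returns the portion of the key that is actually hashed for slot routing. */
static const char *hash_tag(const char *key, size_t *len) {
    const char *open = strchr(key, '{');
    if (open) {
        const char *close = strchr(open + 1, '}');
        if (close && close > open + 1) { /* only a non-empty tag counts */
            *len = (size_t)(close - open - 1);
            return open + 1;
        }
    }
    *len = strlen(key);
    return key;
}

int main(void) {
    size_t len;
    const char *t = hash_tag("{x}key42", &len);
    printf("hashed part: %.*s\n", (int)len, t); /* prints "x" */
    return 0;
}

This is why "{x}key0" through "{x}key99" all land in the same slot regardless of which database they live in.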
+test "Replication: Write to multiple databases and verify replica" {
+    set primary_id 0
+    set replica [get_my_replica [R $primary_id cluster nodes]]
+
+    $replica READONLY
+
+    set keys_per_db 50
+
+    # Set keys in all DBs on the primary node
+    for {set db 0} {$db < 16} {incr db} {
+        R $primary_id select $db
+        for {set i 0} {$i < $keys_per_db} {incr i} {
+            set key "key$i"
+            set val "primary_db${db}_key$i"
+            R $primary_id set $key $val
+            assert_equal [R $primary_id get $key] $val
+        }
+        assert_equal [R $primary_id dbsize] $keys_per_db
+    }
+
+    # Wait for replication to catch up
+    after 500
+
+    # Verify the data exists on the replica
+    for {set db 0} {$db < 16} {incr db} {
+        $replica select $db
+        assert_equal [$replica dbsize] $keys_per_db
+        for {set i 0} {$i < $keys_per_db} {incr i} {
+            set key "key$i"
+            set expected_val "primary_db${db}_key$i"
+            assert_equal [$replica get $key] $expected_val
+        }
+    }
+
+    # Delete all keys on the primary node
+    for {set db 0} {$db < 16} {incr db} {
+        R $primary_id select $db
+        for {set i 0} {$i < $keys_per_db} {incr i} {
+            set key "key$i"
+            R $primary_id del $key
+        }
+        assert_equal [R $primary_id dbsize] 0
+    }
+
+    # Wait for the deletion to replicate
+    after 500
+
+    # Ensure the replica is also empty
+    for {set db 0} {$db < 16} {incr db} {
+        $replica select $db
+        assert_equal [$replica dbsize] 0
+    }
+}
+
+test "Replication: Swap and Flush Databases" {
+    set primary_id 0
+    set replica [get_my_replica [R $primary_id cluster nodes]]
+
+    $replica READONLY
+
+    # Create two databases and add keys
+    R $primary_id select 1
+    R $primary_id set key1 "value1_db1"
+    R $primary_id set key2 "value2_db1"
+
+    R $primary_id select 2
+    R $primary_id set key1 "value1_db2"
+    R $primary_id set key2 "value2_db2"
+
+    # Wait for replication to catch up
+    after 500
+
+    # Verify the keys exist on the replica
+    $replica select 1
+    assert_equal [$replica get key1] "value1_db1"
+    assert_equal [$replica get key2] "value2_db1"
+
+    $replica select 2
+    assert_equal [$replica get key1] "value1_db2"
+    assert_equal [$replica get key2] "value2_db2"
+
+    # Swap databases on the primary
+    R $primary_id swapdb 1 2
+
+    # Wait for replication to catch up
+    after 500
+
+    # Verify the swap is reflected on the replica
+    $replica select 1
+    assert_equal [$replica get key1] "value1_db2"
+    assert_equal [$replica get key2] "value2_db2"
+
+    $replica select 2
+    assert_equal [$replica get key1] "value1_db1"
+    assert_equal [$replica get key2] "value2_db1"
+
+    # Flush both databases on the primary
+    R $primary_id select 1
+    R $primary_id flushdb
+
+    R $primary_id select 2
+    R $primary_id flushdb
+
+    # Wait for replication to catch up
+    after 500
+
+    # Ensure the databases are empty on the replica
+    $replica select 1
+    assert_equal [$replica dbsize] 0
+
+    $replica select 2
+    assert_equal [$replica dbsize] 0
+}
+
+test "Cross-DB Expiry Handling" {
+    set primary_id 0
+    set replica [get_my_replica [R $primary_id cluster nodes]]
+    $replica READONLY
+
+    set key "key1"
+    $replica select 1
+    R $primary_id select 1
+    R $primary_id set $key "value1"
+    after 500
+
+    assert_equal [$replica exists $key] 1
+
+    R $primary_id expire $key 1
+
+    after 1500
+    assert_equal [R $primary_id exists $key] 0
+    assert_equal [$replica exists $key] 0
+
+    R $primary_id flushall
+}
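The replication tests above settle for fixed "after 500" sleeps. A client-side alternative is the WAIT command, which blocks until the given number of replicas has acknowledged the write or the timeout (in milliseconds) expires; a hedged hiredis sketch with placeholder connection details:

#include <stdio.h>
#include <hiredis/hiredis.h>

int main(void) {
    redisContext *c = redisConnect("127.0.0.1", 7000);
    if (!c || c->err) return 1;

    redisReply *r = redisCommand(c, "SET key1 value1");
    if (r) freeReplyObject(r);

    /* Block until at least 1 replica has the write, or 1000 ms pass. */
    r = redisCommand(c, "WAIT 1 1000");
    if (r) {
        printf("replicas acked: %lld\n", r->integer);
        freeReplyObject(r);
    }

    redisFree(c);
    return 0;
}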
+test "Slot Migration With Multiple Databases" {
+    set primary_id_src 0
+    set primary_id_src_nodeid [R $primary_id_src CLUSTER MYID]
+    set primary_id_target 1
+    set primary_id_target_port [RPort $primary_id_target]
+    set primary_id_target_nodeid [R $primary_id_target CLUSTER MYID]
+
+    R $primary_id_src select 1
+    R $primary_id_src set "{x}key1" "value1_db1"
+    assert_equal [R $primary_id_src get "{x}key1"] "value1_db1"
+
+    R $primary_id_src select 2
+    R $primary_id_src set "{x}key2" "value2_db2"
+    assert_equal [R $primary_id_src get "{x}key2"] "value2_db2"
+
+    set slot [R $primary_id_src cluster keyslot "{x}key1"]
+
+    R $primary_id_target cluster setslot $slot importing $primary_id_src_nodeid
+    R $primary_id_src cluster setslot $slot migrating $primary_id_target_nodeid
+
+    R $primary_id_src select 1
+    R $primary_id_src migrate 127.0.0.1 $primary_id_target_port "{x}key1" 1 5000
+
+    # While any database still holds keys in the slot, it can not be finalized
+    catch {R $primary_id_src cluster setslot $slot node $primary_id_target_nodeid} err
+    assert_match "ERR*" $err
+
+    R $primary_id_src select 2
+    R $primary_id_src migrate 127.0.0.1 $primary_id_target_port "{x}key2" 2 5000
+
+    R $primary_id_target cluster setslot $slot node $primary_id_target_nodeid
+    R $primary_id_src cluster setslot $slot node $primary_id_target_nodeid
+
+    R $primary_id_target select 1
+    assert_equal [R $primary_id_target get "{x}key1"] "value1_db1"
+    R $primary_id_target select 2
+    assert_equal [R $primary_id_target get "{x}key2"] "value2_db2"
+
+    R $primary_id_src flushall
+    R $primary_id_target flushall
+}
+
+test "Persistence across restart with multiple databases" {
+    set primary_id 0
+    set keys_per_db 100
+
+    # Set keys in all DBs
+    for {set db 0} {$db < 16} {incr db} {
+        R $primary_id select $db
+        for {set i 0} {$i < $keys_per_db} {incr i} {
+            set key "${db}key$i"
+            set val "value_db${db}_key$i"
+            R $primary_id set $key $val
+            assert_equal [R $primary_id get $key] $val
+        }
+        assert_equal [R $primary_id dbsize] $keys_per_db
+    }
+
+    # Run a synchronous SAVE to write the RDB
+    R $primary_id save
+
+    # Restart the instance
+    kill_instance valkey $primary_id
+    restart_instance valkey $primary_id
+
+    assert_cluster_state ok
+
+    # Verify the keys after restart
+    for {set db 0} {$db < 16} {incr db} {
+        R $primary_id select $db
+        assert_equal [R $primary_id dbsize] $keys_per_db
+        for {set i 0} {$i < $keys_per_db} {incr i} {
+            set key "${db}key$i"
+            set expected_val "value_db${db}_key$i"
+            assert_equal [R $primary_id get $key] $expected_val
+        }
+    }
+
+    R $primary_id flushall
+}
+
+test "Copy key to other database" {
+    set primary_id 0
+
+    set key "{xyz}key"
+    set key_copy "{xyz}key_copy"
+
+    # Set the key in database 0 with a test value
+    R $primary_id select 0
+    R $primary_id set $key "test_value"
+
+    # Use the COPY command to copy the key to database 15
+    R $primary_id copy $key $key_copy DB 15
+
+    # Verify that the copied key exists in database 15 with the correct value
+    R $primary_id select 15
+    assert_equal [R $primary_id get $key_copy] "test_value"
+
+    # Verify that the original key still exists in database 0
+    R $primary_id select 0
+    assert_equal [R $primary_id get $key] "test_value"
+
+    R $primary_id flushall
+}
+
+test "Move key to other database" {
+    set primary_id 0
+
+    set key "{xyz}key1"
+
+    # Set the key in database 0 with a test value
+    R $primary_id select 0
+    R $primary_id set $key "test_value"
+
+    R $primary_id move $key 15
+
+    # Verify that the moved key exists in database 15 with the correct value
+    R $primary_id select 15
+    assert_equal [R $primary_id get $key] "test_value"
+
+    R $primary_id flushall
+}
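The migration test leans on MIGRATE's destination-db argument (MIGRATE host port key destination-db timeout): one hash slot may now hold keys in several databases, so each database's keys travel with their own MIGRATE call issued from that database. A hedged hiredis sketch of a single such hop; the addresses are placeholders:

#include <stdio.h>
#include <hiredis/hiredis.h>

/* Move one key into the given db on the target node. MIGRATE reads the key
 * from the connection's currently selected database. */
static int migrate_key(redisContext *src, const char *host, int port,
                       const char *key, int db, int timeout_ms) {
    redisReply *r = redisCommand(src, "MIGRATE %s %d %s %d %d",
                                 host, port, key, db, timeout_ms);
    int ok = r && r->type != REDIS_REPLY_ERROR;
    if (r && r->type == REDIS_REPLY_ERROR) fprintf(stderr, "%s\n", r->str);
    if (r) freeReplyObject(r);
    return ok;
}

int main(void) {
    redisContext *src = redisConnect("127.0.0.1", 7000);
    if (!src || src->err) return 1;
    redisReply *sel = redisCommand(src, "SELECT 1"); /* source db first */
    if (sel) freeReplyObject(sel);
    migrate_key(src, "127.0.0.1", 7001, "{x}key1", 1, 5000);
    redisFree(src);
    return 0;
}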
+test "Flushslot with multiple databases" {
+    set primary_id 0
+    # Add a key in each of the first 4 databases (db0 to db3)
+    for {set db 0} {$db < 4} {incr db} {
+        R $primary_id select $db
+        R $primary_id set "key${db}" "value${db}"
+    }
+
+    # Attempt to run CLUSTER FLUSHSLOTS and expect it to fail
+    assert_error "ERR DB must be empty to perform CLUSTER FLUSHSLOTS." {R $primary_id CLUSTER FLUSHSLOTS}
+
+    # Flush database 0 and try again; it should still fail since keys remain in db1, db2, and db3
+    R $primary_id select 0
+    R $primary_id flushdb
+
+    # Attempt to run CLUSTER FLUSHSLOTS and expect it to fail again
+    assert_error "ERR DB must be empty to perform CLUSTER FLUSHSLOTS." {R $primary_id CLUSTER FLUSHSLOTS}
+
+    for {set db 0} {$db < 4} {incr db} {
+        R $primary_id select $db
+        R $primary_id flushdb
+    }
+
+    # FLUSHSLOTS should not fail now
+    R $primary_id CLUSTER FLUSHSLOTS
+}
\ No newline at end of file
diff --git a/tests/support/cluster.tcl b/tests/support/cluster.tcl
index e9a5395be5..a3fc665921 100644
--- a/tests/support/cluster.tcl
+++ b/tests/support/cluster.tcl
@@ -196,7 +196,7 @@ proc ::valkey_cluster::__method__masternode_notfor_slot {id slot} {
     error "Slot $slot is everywhere"
 }
 
-proc ::valkey_cluster::__dispatch__ {id method args} { 
+proc ::valkey_cluster::__dispatch__ {id method args} {
     if {[info command ::valkey_cluster::__method__$method] eq {}} {
         # Get the keys from the command.
         set keys [::valkey_cluster::get_keys_from_command $method $args]
diff --git a/tests/unit/cluster/cli.tcl b/tests/unit/cluster/cli.tcl
index 3f1f0e9ffa..a50c6da76c 100644
--- a/tests/unit/cluster/cli.tcl
+++ b/tests/unit/cluster/cli.tcl
@@ -434,6 +434,105 @@ start_server [list overrides [list cluster-enabled yes cluster-node-timeout 1 cl
 
 } ;# foreach ip_or_localhost
 
+start_multiple_servers 3 [list overrides $base_conf] {
+
+    set node1 [srv 0 client]
+    set node2 [srv -1 client]
+    set node3 [srv -2 client]
+    set node3_pid [srv -2 pid]
+    set node3_rd [valkey_deferring_client -2]
+
+    test {Create 3 node cluster} {
+        exec src/valkey-cli --cluster-yes --cluster create \
+                        127.0.0.1:[srv 0 port] \
+                        127.0.0.1:[srv -1 port] \
+                        127.0.0.1:[srv -2 port]
+
+        wait_for_condition 1000 50 {
+            [CI 0 cluster_state] eq {ok} &&
+            [CI 1 cluster_state] eq {ok} &&
+            [CI 2 cluster_state] eq {ok}
+        } else {
+            fail "Cluster doesn't stabilize"
+        }
+    }
+
+    test "Multi-database fill slot 0" {
+        # Keys {3560}* map to slot 0.
+        # Iterate over databases 0, 1, 2, and 3.
+        for {set db 0} {$db < 4} {incr db} {
+            # Select the database on node1.
+            $node1 SELECT $db
+
+            # Insert 100 keys into the current database.
+            for {set i 0} {$i < 100} {incr i} {
+                # Each key uses the hash tag {3560} to ensure it maps to slot 0.
+                set key "{3560}_db${db}_$i"
+                set value "value_db${db}_$i"
+                $node1 SET $key $value
+            }
+
+            # Verify the key count in slot 0.
+            set count [$node1 CLUSTER COUNTKEYSINSLOT 0]
+            if { $count != 100 } {
+                fail "For DB $db, expected 100 keys in slot 0, got $count"
+            }
+        }
+    }
+
+    test "Perform a Multi-database Resharding" {
+        # 4 batches to migrate 100 keys
+        for {set i 0} {$i < 4} {incr i} {
+            exec src/valkey-cli --cluster-yes --cluster reshard 127.0.0.1:[srv 0 port] \
+                        --cluster-to [$node3 cluster myid] \
+                        --cluster-from [$node1 cluster myid] \
+                        --cluster-pipeline 25 \
+                        --cluster-slots 1
+        }
+    }
+    test "Verify multi-database slot migrate" {
+        # For each database, verify that node3 now holds all 100 keys in slot 0 with correct contents.
+        for {set db 0} {$db < 4} {incr db} {
+            # Select the current database on node3.
+            $node3 SELECT $db
+
+            # First, verify the key count in slot 0 on node3.
+            set count [$node3 CLUSTER COUNTKEYSINSLOT 0]
+            if { $count != 100 } {
+                fail "For DB $db, expected 100 keys in slot 0 on node3, got $count"
+            }
+
+            # Then, verify that slot 0 on node1 is empty for the same database.
+            $node1 SELECT $db
+            set count [$node1 CLUSTER COUNTKEYSINSLOT 0]
+            if { $count != 0 } {
+                fail "For DB $db, expected 0 keys in slot 0 on node1, got $count"
+            }
+
+            # Now verify that each key has the expected value.
+            for {set i 0} {$i < 100} {incr i} {
+                # Construct the key with the hash tag {3560} which we used when filling the db.
+                set key "{3560}_db${db}_$i"
+                # The expected value matches what the fill test wrote.
+                set expected "value_db${db}_$i"
+                # Retrieve the actual value stored on node3.
+                set actual [$node3 GET $key]
+
+                if { $actual ne $expected } {
+                    fail "For DB $db, key $key: expected '$expected', got '$actual'"
+                }
+            }
+        }
+    }
+}
+
 } ;# tags
 
 set ::singledb $old_singledb
diff --git a/tests/unit/lazyfree.tcl b/tests/unit/lazyfree.tcl
index 17f460003a..d00fcb6228 100644
--- a/tests/unit/lazyfree.tcl
+++ b/tests/unit/lazyfree.tcl
@@ -1,5 +1,12 @@
 start_server {tags {"lazyfree"}} {
     test "UNLINK can reclaim memory in background" {
+
+        # The test framework invokes "flushall", replacing kvstores even if empty.
+        # Old ones are lazily freed by BIO, causing transient memory fluctuations.
+        # In cluster mode with multi-DB, this affects the reliability of this test.
+        # A short delay is needed because wait_lazyfree_done only counts keys.
+        after 100
+
         set orig_mem [s used_memory]
         set args {}
         for {set i 0} {$i < 100000} {incr i} {
diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl
index 8f6e5e8dd3..7517114b8a 100644
--- a/tests/unit/memefficiency.tcl
+++ b/tests/unit/memefficiency.tcl
@@ -460,7 +460,7 @@ run_solo {defrag} {
                 puts "frag [s allocator_frag_ratio]"
                 puts "frag_bytes [s allocator_frag_bytes]"
             }
-            assert_morethan [s allocator_frag_ratio] 1.35
+            assert_morethan [s allocator_frag_ratio] 1.33
 
             catch {r config set activedefrag yes} e
             if {[r config get activedefrag] eq "activedefrag yes"} {