Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce CLUSTER SLOT-STATS command (#20). #351

Merged
merged 11 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ endif
ENGINE_NAME=valkey
SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX)
Expand Down
2 changes: 2 additions & 0 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ int detectAndUpdateCachedNodeHealth(void);
client *createCachedResponseClient(void);
void deleteCachedResponseClient(client *recording_client);
void clearCachedClusterSlotsResponse(void);
unsigned int countKeysInSlot(unsigned int hashslot);
int getSlotOrReply(client *c, robj *o);

/* functions with shared implementations */
int clusterNodeIsMyself(clusterNode *n);
Expand Down
173 changes: 173 additions & 0 deletions src/cluster_slot_stats.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Copyright Valkey Contributors.
* All rights reserved.
* SPDX-License-Identifier: BSD 3-Clause
*/

#include "server.h"
#include "cluster.h"

#define UNASSIGNED_SLOT 0

typedef enum {
KEY_COUNT,
INVALID,
} slotStatTypes;

/* -----------------------------------------------------------------------------
* CLUSTER SLOT-STATS command
* -------------------------------------------------------------------------- */

typedef struct {
int slot;
uint64_t stat;
} slotStatEntry;

static int doesSlotBelongToMyShard(int slot) {
clusterNode *myself = getMyClusterNode();
clusterNode *master = clusterNodeGetPrimary(myself);

return clusterNodeCoversSlot(master, slot);
}

static void markSlotsAssignedToMyShard(unsigned char *assigned_slots, int start_slot, int end_slot, int *len) {
for (int slot = start_slot; slot <= end_slot; slot++) {
if (doesSlotBelongToMyShard(slot)) {
assigned_slots[slot]++;
(*len)++;
}
}
}

static uint64_t getSlotStat(int slot, int stat_type) {
serverAssert(stat_type != INVALID);
uint64_t slot_stat = 0;
if (stat_type == KEY_COUNT) {
slot_stat = countKeysInSlot(slot);
}
return slot_stat;
}

static int slotStatEntryAscCmp(const void *a, const void *b) {
slotStatEntry entry_a = *((slotStatEntry *) a);
slotStatEntry entry_b = *((slotStatEntry *) b);
return entry_a.stat - entry_b.stat;
}

static int slotStatEntryDescCmp(const void *a, const void *b) {
slotStatEntry entry_a = *((slotStatEntry *) a);
slotStatEntry entry_b = *((slotStatEntry *) b);
return entry_b.stat - entry_a.stat;
}

static void collectAndSortSlotStats(slotStatEntry slot_stats[], int order_by, int desc) {
int i = 0;

for (int slot = 0; slot < CLUSTER_SLOTS; slot++) {
if (doesSlotBelongToMyShard(slot)) {
slot_stats[i].slot = slot;
slot_stats[i].stat = getSlotStat(slot, order_by);
i++;
}
}
qsort(slot_stats, i, sizeof(slotStatEntry), (desc) ? slotStatEntryDescCmp : slotStatEntryAscCmp);
}

static void addReplySlotStat(client *c, int slot) {
addReplyLongLong(c, slot);
addReplyMapLen(c, 1);
addReplyBulkCString(c, "key-count");
addReplyLongLong(c, countKeysInSlot(slot));
}

static void addReplySlotStats(client *c, unsigned char *assigned_slots, int startslot, int endslot, int len) {
addReplyMapLen(c, len);

for (int slot = startslot; slot <= endslot; slot++) {
if (assigned_slots[slot]) addReplySlotStat(c, slot);
}
}

static void addReplySortedSlotStats(client *c, slotStatEntry slot_stats[], long limit) {
int num_slots_assigned = getMyShardSlotCount();
int len = min(limit, num_slots_assigned);
addReplyMapLen(c, len);

for (int i = 0; i < len; i++) {
addReplySlotStat(c, slot_stats[i].slot);
}
}

static void sortAndAddReplySlotStats(client *c, int order_by, long limit, int desc) {
slotStatEntry slot_stats[CLUSTER_SLOTS];
collectAndSortSlotStats(slot_stats, order_by, desc);
addReplySortedSlotStats(c, slot_stats, limit);
}

void clusterSlotStatsCommand(client *c) {
if (server.cluster_enabled == 0) {
addReplyError(c, "This instance has cluster support disabled");
return;
}

/* Parse additional arguments. */
if (c->argc == 5 && !strcasecmp(c->argv[2]->ptr, "slotsrange")) {
/* CLUSTER SLOT-STATS SLOTSRANGE start-slot end-slot */
int startslot, endslot;
if ((startslot = getSlotOrReply(c,c->argv[3])) == C_ERR ||
(endslot = getSlotOrReply(c,c->argv[4])) == C_ERR) {
return;
}
if (startslot > endslot) {
addReplyErrorFormat(c, "Start slot number %d is greater than end slot number %d", startslot, endslot);
return;
}
/* Initialize slot assignment array. */
unsigned char assigned_slots[CLUSTER_SLOTS]= {UNASSIGNED_SLOT};
int len = 0;
markSlotsAssignedToMyShard(assigned_slots, startslot, endslot, &len);
addReplySlotStats(c, assigned_slots, startslot, endslot, len);

} else if (c->argc >= 4 && !strcasecmp(c->argv[2]->ptr, "orderby")) {
/* CLUSTER SLOT-STATS ORDERBY metric [LIMIT limit] [ASC | DESC] */
int desc = 1, order_by = INVALID;
if (!strcasecmp(c->argv[3]->ptr, "key-count")) {
order_by = KEY_COUNT;
} else {
addReplyError(c, "Unrecognized sort metric for ORDER BY. The supported metrics are: key-count.");
return;
}
int i = 4; /* Next argument index, following ORDERBY */
int limit_counter = 0, asc_desc_counter = 0;
long limit;
while(i < c->argc) {
int moreargs = c->argc > i+1;
if (!strcasecmp(c->argv[i]->ptr, "limit") && moreargs) {
if (getRangeLongFromObjectOrReply(
c, c->argv[i+1], 1, CLUSTER_SLOTS, &limit,
"Limit has to lie in between 1 and 16384 (maximum number of slots).") != C_OK)
return;
i++;
limit_counter++;
} else if (!strcasecmp(c->argv[i]->ptr, "asc")) {
desc = 0;
asc_desc_counter++;
} else if (!strcasecmp(c->argv[i]->ptr, "desc")) {
desc = 1;
asc_desc_counter++;
} else {
addReplyErrorObject(c,shared.syntaxerr);
return;
}
if (limit_counter > 1 || asc_desc_counter > 1) {
addReplyError(c, "Multiple filters of the same type are disallowed.");
return;
}
i++;
}
sortAndAddReplySlotStats(c, order_by, limit, desc);

} else {
addReplySubcommandSyntaxError(c);
}
}
51 changes: 51 additions & 0 deletions src/commands.def
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,56 @@ struct COMMAND_ARG CLUSTER_SLAVES_Args[] = {
{MAKE_ARG("node-id",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
};

/********** CLUSTER SLOT_STATS ********************/

#ifndef SKIP_CMD_HISTORY_TABLE
/* CLUSTER SLOT_STATS history */
#define CLUSTER_SLOT_STATS_History NULL
#endif

#ifndef SKIP_CMD_TIPS_TABLE
/* CLUSTER SLOT_STATS tips */
const char *CLUSTER_SLOT_STATS_Tips[] = {
"nondeterministic_output",
"request_policy:all_shards",
};
#endif

#ifndef SKIP_CMD_KEY_SPECS_TABLE
/* CLUSTER SLOT_STATS key specs */
#define CLUSTER_SLOT_STATS_Keyspecs NULL
#endif

/* CLUSTER SLOT_STATS filter slotsrange argument table */
struct COMMAND_ARG CLUSTER_SLOT_STATS_filter_slotsrange_Subargs[] = {
{MAKE_ARG("start-slot",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("end-slot",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
};

/* CLUSTER SLOT_STATS filter orderby order argument table */
struct COMMAND_ARG CLUSTER_SLOT_STATS_filter_orderby_order_Subargs[] = {
{MAKE_ARG("asc",ARG_TYPE_PURE_TOKEN,-1,"ASC",NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("desc",ARG_TYPE_PURE_TOKEN,-1,"DESC",NULL,NULL,CMD_ARG_NONE,0,NULL)},
};

/* CLUSTER SLOT_STATS filter orderby argument table */
struct COMMAND_ARG CLUSTER_SLOT_STATS_filter_orderby_Subargs[] = {
{MAKE_ARG("metric",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("limit",ARG_TYPE_INTEGER,-1,"LIMIT",NULL,NULL,CMD_ARG_OPTIONAL,0,NULL)},
{MAKE_ARG("order",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,2,NULL),.subargs=CLUSTER_SLOT_STATS_filter_orderby_order_Subargs},
};

/* CLUSTER SLOT_STATS filter argument table */
struct COMMAND_ARG CLUSTER_SLOT_STATS_filter_Subargs[] = {
{MAKE_ARG("slotsrange",ARG_TYPE_BLOCK,-1,"SLOTSRANGE",NULL,NULL,CMD_ARG_NONE,2,NULL),.subargs=CLUSTER_SLOT_STATS_filter_slotsrange_Subargs},
{MAKE_ARG("orderby",ARG_TYPE_BLOCK,-1,"ORDERBY",NULL,NULL,CMD_ARG_NONE,3,NULL),.subargs=CLUSTER_SLOT_STATS_filter_orderby_Subargs},
};

/* CLUSTER SLOT_STATS argument table */
struct COMMAND_ARG CLUSTER_SLOT_STATS_Args[] = {
{MAKE_ARG("filter",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,2,NULL),.subargs=CLUSTER_SLOT_STATS_filter_Subargs},
};

/********** CLUSTER SLOTS ********************/

#ifndef SKIP_CMD_HISTORY_TABLE
Expand Down Expand Up @@ -981,6 +1031,7 @@ struct COMMAND_STRUCT CLUSTER_Subcommands[] = {
{MAKE_CMD("setslot","Binds a hash slot to a node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SETSLOT_History,1,CLUSTER_SETSLOT_Tips,0,clusterCommand,-4,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE|CMD_MAY_REPLICATE,0,CLUSTER_SETSLOT_Keyspecs,0,NULL,3),.args=CLUSTER_SETSLOT_Args},
{MAKE_CMD("shards","Returns the mapping of cluster slots to shards.","O(N) where N is the total number of cluster nodes","7.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SHARDS_History,0,CLUSTER_SHARDS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SHARDS_Keyspecs,0,NULL,0)},
{MAKE_CMD("slaves","Lists the replica nodes of a primary node.","O(N) where N is the number of replicas.","3.0.0",CMD_DOC_DEPRECATED,"`CLUSTER REPLICAS`","5.0.0","cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLAVES_History,0,CLUSTER_SLAVES_Tips,1,clusterCommand,3,CMD_ADMIN|CMD_STALE,0,CLUSTER_SLAVES_Keyspecs,0,NULL,1),.args=CLUSTER_SLAVES_Args},
{MAKE_CMD("slot-stats","Return array of slot usage statistics for slots assigned to the current node","O(N) where N is the total number of slots based on arguments. O(N*log(N)) with ORDERBY subcommand.","8.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOT_STATS_History,0,CLUSTER_SLOT_STATS_Tips,2,clusterSlotStatsCommand,-4,CMD_STALE|CMD_LOADING,0,CLUSTER_SLOT_STATS_Keyspecs,0,NULL,1),.args=CLUSTER_SLOT_STATS_Args},
{MAKE_CMD("slots","Returns the mapping of cluster slots to nodes.","O(N) where N is the total number of Cluster nodes","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOTS_History,2,CLUSTER_SLOTS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SLOTS_Keyspecs,0,NULL,0)},
{0}
};
Expand Down
83 changes: 83 additions & 0 deletions src/commands/cluster-slot-stats.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
{
"SLOT-STATS": {
"summary": "Return array of slot usage statistics for slots assigned to the current node",
"complexity": "O(N) where N is the total number of slots based on arguments. O(N*log(N)) with ORDERBY subcommand.",
"group": "cluster",
"since": "8.0.0",
"arity": -4,
"container": "CLUSTER",
"function": "clusterSlotStatsCommand",
"command_flags": [
"STALE",
"LOADING"
],
"command_tips": [
"NONDETERMINISTIC_OUTPUT",
"REQUEST_POLICY:ALL_SHARDS"
],
"reply_schema": {
"type": "object",
"description": "Map of slots and their respective usage statistics.",
"additionalProperties": {
"type": "string"
}
},
"arguments": [
{
"name": "filter",
"type": "oneof",
"arguments": [
{
"token": "SLOTSRANGE",
"name": "slotsrange",
"type": "block",
"arguments": [
{
"name": "start-slot",
"type": "integer"
},
{
"name": "end-slot",
"type": "integer"
}
]
},
{
"token": "ORDERBY",
"name": "orderby",
"type": "block",
"arguments": [
{
"name": "metric",
"type": "string"
},
{
"token": "LIMIT",
"name": "limit",
"type": "integer",
"optional": true
},
{
"name": "order",
"type": "oneof",
"optional": true,
"arguments": [
{
"name": "asc",
"type": "pure-token",
"token": "ASC"
},
{
"name": "desc",
"type": "pure-token",
"token": "DESC"
}
]
}
]
}
]
}
]
}
}
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -3746,6 +3746,7 @@ void sunsubscribeCommand(client *c);
void watchCommand(client *c);
void unwatchCommand(client *c);
void clusterCommand(client *c);
void clusterSlotStatsCommand(client *c);
void restoreCommand(client *c);
void migrateCommand(client *c);
void askingCommand(client *c);
Expand Down
Loading
Loading