Skip to content

Commit

Permalink
Introduce CLUSTER SLOT-STATS command (#20). (#351)
Browse files Browse the repository at this point in the history
The command provides detailed slot usage statistics upon invocation,
with initial support for key-count metric. cpu-usec (approved) and
memory-bytes (pending-approval) metrics will soon follow after the
merger of this PR.

---------

Signed-off-by: Kyle Kim <kimkyle@amazon.com>
Signed-off-by: Madelyn Olson <madelyneolson@gmail.com>
Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
  • Loading branch information
kyle-yh-kim and madolson authored Jun 27, 2024
1 parent 7719dbb commit 1269532
Show file tree
Hide file tree
Showing 7 changed files with 662 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ endif
ENGINE_NAME=valkey
SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX)
Expand Down
2 changes: 2 additions & 0 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ int detectAndUpdateCachedNodeHealth(void);
client *createCachedResponseClient(void);
void deleteCachedResponseClient(client *recording_client);
void clearCachedClusterSlotsResponse(void);
unsigned int countKeysInSlot(unsigned int hashslot);
int getSlotOrReply(client *c, robj *o);

/* functions with shared implementations */
int clusterNodeIsMyself(clusterNode *n);
Expand Down
182 changes: 182 additions & 0 deletions src/cluster_slot_stats.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* Copyright Valkey Contributors.
* All rights reserved.
* SPDX-License-Identifier: BSD 3-Clause
*/

#include "server.h"
#include "cluster.h"

#define UNASSIGNED_SLOT 0

typedef enum {
KEY_COUNT,
INVALID,
} slotStatTypes;

/* -----------------------------------------------------------------------------
* CLUSTER SLOT-STATS command
* -------------------------------------------------------------------------- */

/* Struct used to temporarily hold slot statistics for sorting. */
typedef struct {
int slot;
uint64_t stat;
} slotStatForSort;

static int doesSlotBelongToMyShard(int slot) {
clusterNode *myself = getMyClusterNode();
clusterNode *primary = clusterNodeGetPrimary(myself);

return clusterNodeCoversSlot(primary, slot);
}

static int markSlotsAssignedToMyShard(unsigned char *assigned_slots, int start_slot, int end_slot) {
int assigned_slots_count = 0;
for (int slot = start_slot; slot <= end_slot; slot++) {
if (doesSlotBelongToMyShard(slot)) {
assigned_slots[slot]++;
assigned_slots_count++;
}
}
return assigned_slots_count;
}

static uint64_t getSlotStat(int slot, int stat_type) {
serverAssert(stat_type != INVALID);
uint64_t slot_stat = 0;
if (stat_type == KEY_COUNT) {
slot_stat = countKeysInSlot(slot);
}
return slot_stat;
}

static int slotStatForSortAscCmp(const void *a, const void *b) {
slotStatForSort entry_a = *((slotStatForSort *)a);
slotStatForSort entry_b = *((slotStatForSort *)b);
return entry_a.stat - entry_b.stat;
}

static int slotStatForSortDescCmp(const void *a, const void *b) {
slotStatForSort entry_a = *((slotStatForSort *)a);
slotStatForSort entry_b = *((slotStatForSort *)b);
return entry_b.stat - entry_a.stat;
}

static void collectAndSortSlotStats(slotStatForSort slot_stats[], int order_by, int desc) {
int i = 0;

for (int slot = 0; slot < CLUSTER_SLOTS; slot++) {
if (doesSlotBelongToMyShard(slot)) {
slot_stats[i].slot = slot;
slot_stats[i].stat = getSlotStat(slot, order_by);
i++;
}
}
qsort(slot_stats, i, sizeof(slotStatForSort), (desc) ? slotStatForSortDescCmp : slotStatForSortAscCmp);
}

static void addReplySlotStat(client *c, int slot) {
addReplyArrayLen(c, 2); /* Array of size 2, where 0th index represents (int) slot,
* and 1st index represents (map) usage statistics. */
addReplyLongLong(c, slot);
addReplyMapLen(c, 1); /* Nested map representing slot usage statistics. */
addReplyBulkCString(c, "key-count");
addReplyLongLong(c, countKeysInSlot(slot));
}

/* Adds reply for the SLOTSRANGE variant.
* Response is ordered in ascending slot number. */
static void addReplySlotsRange(client *c, unsigned char *assigned_slots, int startslot, int endslot, int len) {
addReplyArrayLen(c, len); /* Top level RESP reply format is defined as an array, due to ordering invariance. */

for (int slot = startslot; slot <= endslot; slot++) {
if (assigned_slots[slot]) addReplySlotStat(c, slot);
}
}

static void addReplySortedSlotStats(client *c, slotStatForSort slot_stats[], long limit) {
int num_slots_assigned = getMyShardSlotCount();
int len = min(limit, num_slots_assigned);
addReplyArrayLen(c, len); /* Top level RESP reply format is defined as an array, due to ordering invariance. */

for (int i = 0; i < len; i++) {
addReplySlotStat(c, slot_stats[i].slot);
}
}

/* Adds reply for the ORDERBY variant.
* Response is ordered based on the sort result. */
static void addReplyOrderBy(client *c, int order_by, long limit, int desc) {
slotStatForSort slot_stats[CLUSTER_SLOTS];
collectAndSortSlotStats(slot_stats, order_by, desc);
addReplySortedSlotStats(c, slot_stats, limit);
}

void clusterSlotStatsCommand(client *c) {
if (server.cluster_enabled == 0) {
addReplyError(c, "This instance has cluster support disabled");
return;
}

/* Parse additional arguments. */
if (c->argc == 5 && !strcasecmp(c->argv[2]->ptr, "slotsrange")) {
/* CLUSTER SLOT-STATS SLOTSRANGE start-slot end-slot */
int startslot, endslot;
if ((startslot = getSlotOrReply(c, c->argv[3])) == C_ERR ||
(endslot = getSlotOrReply(c, c->argv[4])) == C_ERR) {
return;
}
if (startslot > endslot) {
addReplyErrorFormat(c, "Start slot number %d is greater than end slot number %d", startslot, endslot);
return;
}
/* Initialize slot assignment array. */
unsigned char assigned_slots[CLUSTER_SLOTS] = {UNASSIGNED_SLOT};
int assigned_slots_count = markSlotsAssignedToMyShard(assigned_slots, startslot, endslot);
addReplySlotsRange(c, assigned_slots, startslot, endslot, assigned_slots_count);

} else if (c->argc >= 4 && !strcasecmp(c->argv[2]->ptr, "orderby")) {
/* CLUSTER SLOT-STATS ORDERBY metric [LIMIT limit] [ASC | DESC] */
int desc = 1, order_by = INVALID;
if (!strcasecmp(c->argv[3]->ptr, "key-count")) {
order_by = KEY_COUNT;
} else {
addReplyError(c, "Unrecognized sort metric for ORDER BY. The supported metrics are: key-count.");
return;
}
int i = 4; /* Next argument index, following ORDERBY */
int limit_counter = 0, asc_desc_counter = 0;
long limit;
while (i < c->argc) {
int moreargs = c->argc > i + 1;
if (!strcasecmp(c->argv[i]->ptr, "limit") && moreargs) {
if (getRangeLongFromObjectOrReply(
c, c->argv[i + 1], 1, CLUSTER_SLOTS, &limit,
"Limit has to lie in between 1 and 16384 (maximum number of slots).") != C_OK) {
return;
}
i++;
limit_counter++;
} else if (!strcasecmp(c->argv[i]->ptr, "asc")) {
desc = 0;
asc_desc_counter++;
} else if (!strcasecmp(c->argv[i]->ptr, "desc")) {
desc = 1;
asc_desc_counter++;
} else {
addReplyErrorObject(c, shared.syntaxerr);
return;
}
if (limit_counter > 1 || asc_desc_counter > 1) {
addReplyError(c, "Multiple filters of the same type are disallowed.");
return;
}
i++;
}
addReplyOrderBy(c, order_by, limit, desc);

} else {
addReplySubcommandSyntaxError(c);
}
}
51 changes: 51 additions & 0 deletions src/commands.def
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,56 @@ struct COMMAND_ARG CLUSTER_SLAVES_Args[] = {
{MAKE_ARG("node-id",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
};

/********** CLUSTER SLOT_STATS ********************/

#ifndef SKIP_CMD_HISTORY_TABLE
/* CLUSTER SLOT_STATS history */
#define CLUSTER_SLOT_STATS_History NULL
#endif

#ifndef SKIP_CMD_TIPS_TABLE
/* CLUSTER SLOT_STATS tips */
const char *CLUSTER_SLOT_STATS_Tips[] = {
"nondeterministic_output",
"request_policy:all_shards",
};
#endif

#ifndef SKIP_CMD_KEY_SPECS_TABLE
/* CLUSTER SLOT_STATS key specs */
#define CLUSTER_SLOT_STATS_Keyspecs NULL
#endif

/* CLUSTER SLOT_STATS filter slotsrange argument table */
struct COMMAND_ARG CLUSTER_SLOT_STATS_filter_slotsrange_Subargs[] = {
{MAKE_ARG("start-slot",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("end-slot",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
};

/* CLUSTER SLOT_STATS filter orderby order argument table */
struct COMMAND_ARG CLUSTER_SLOT_STATS_filter_orderby_order_Subargs[] = {
{MAKE_ARG("asc",ARG_TYPE_PURE_TOKEN,-1,"ASC",NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("desc",ARG_TYPE_PURE_TOKEN,-1,"DESC",NULL,NULL,CMD_ARG_NONE,0,NULL)},
};

/* CLUSTER SLOT_STATS filter orderby argument table */
struct COMMAND_ARG CLUSTER_SLOT_STATS_filter_orderby_Subargs[] = {
{MAKE_ARG("metric",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("limit",ARG_TYPE_INTEGER,-1,"LIMIT",NULL,NULL,CMD_ARG_OPTIONAL,0,NULL)},
{MAKE_ARG("order",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,2,NULL),.subargs=CLUSTER_SLOT_STATS_filter_orderby_order_Subargs},
};

/* CLUSTER SLOT_STATS filter argument table */
struct COMMAND_ARG CLUSTER_SLOT_STATS_filter_Subargs[] = {
{MAKE_ARG("slotsrange",ARG_TYPE_BLOCK,-1,"SLOTSRANGE",NULL,NULL,CMD_ARG_NONE,2,NULL),.subargs=CLUSTER_SLOT_STATS_filter_slotsrange_Subargs},
{MAKE_ARG("orderby",ARG_TYPE_BLOCK,-1,"ORDERBY",NULL,NULL,CMD_ARG_NONE,3,NULL),.subargs=CLUSTER_SLOT_STATS_filter_orderby_Subargs},
};

/* CLUSTER SLOT_STATS argument table */
struct COMMAND_ARG CLUSTER_SLOT_STATS_Args[] = {
{MAKE_ARG("filter",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,2,NULL),.subargs=CLUSTER_SLOT_STATS_filter_Subargs},
};

/********** CLUSTER SLOTS ********************/

#ifndef SKIP_CMD_HISTORY_TABLE
Expand Down Expand Up @@ -981,6 +1031,7 @@ struct COMMAND_STRUCT CLUSTER_Subcommands[] = {
{MAKE_CMD("setslot","Binds a hash slot to a node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SETSLOT_History,1,CLUSTER_SETSLOT_Tips,0,clusterCommand,-4,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE|CMD_MAY_REPLICATE,0,CLUSTER_SETSLOT_Keyspecs,0,NULL,3),.args=CLUSTER_SETSLOT_Args},
{MAKE_CMD("shards","Returns the mapping of cluster slots to shards.","O(N) where N is the total number of cluster nodes","7.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SHARDS_History,0,CLUSTER_SHARDS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SHARDS_Keyspecs,0,NULL,0)},
{MAKE_CMD("slaves","Lists the replica nodes of a primary node.","O(N) where N is the number of replicas.","3.0.0",CMD_DOC_DEPRECATED,"`CLUSTER REPLICAS`","5.0.0","cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLAVES_History,0,CLUSTER_SLAVES_Tips,1,clusterCommand,3,CMD_ADMIN|CMD_STALE,0,CLUSTER_SLAVES_Keyspecs,0,NULL,1),.args=CLUSTER_SLAVES_Args},
{MAKE_CMD("slot-stats","Return an array of slot usage statistics for slots assigned to the current node.","O(N) where N is the total number of slots based on arguments. O(N*log(N)) with ORDERBY subcommand.","8.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOT_STATS_History,0,CLUSTER_SLOT_STATS_Tips,2,clusterSlotStatsCommand,-4,CMD_STALE|CMD_LOADING,0,CLUSTER_SLOT_STATS_Keyspecs,0,NULL,1),.args=CLUSTER_SLOT_STATS_Args},
{MAKE_CMD("slots","Returns the mapping of cluster slots to nodes.","O(N) where N is the total number of Cluster nodes","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOTS_History,2,CLUSTER_SLOTS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SLOTS_Keyspecs,0,NULL,0)},
{0}
};
Expand Down
102 changes: 102 additions & 0 deletions src/commands/cluster-slot-stats.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
{
"SLOT-STATS": {
"summary": "Return an array of slot usage statistics for slots assigned to the current node.",
"complexity": "O(N) where N is the total number of slots based on arguments. O(N*log(N)) with ORDERBY subcommand.",
"group": "cluster",
"since": "8.0.0",
"arity": -4,
"container": "CLUSTER",
"function": "clusterSlotStatsCommand",
"command_flags": [
"STALE",
"LOADING"
],
"command_tips": [
"NONDETERMINISTIC_OUTPUT",
"REQUEST_POLICY:ALL_SHARDS"
],
"reply_schema": {
"type": "array",
"description": "Array of nested arrays, where the inner array element represents a slot and its respective usage statistics.",
"items": {
"type": "array",
"description": "Array of size 2, where 0th index represents (int) slot and 1st index represents (map) usage statistics.",
"minItems": 2,
"maxItems": 2,
"items": [
{
"description": "Slot Number.",
"type": "integer"
},
{
"type": "object",
"description": "Map of slot usage statistics.",
"additionalProperties": false,
"properties": {
"key-count": {
"type": "integer"
}
}
}
]
}
},
"arguments": [
{
"name": "filter",
"type": "oneof",
"arguments": [
{
"token": "SLOTSRANGE",
"name": "slotsrange",
"type": "block",
"arguments": [
{
"name": "start-slot",
"type": "integer"
},
{
"name": "end-slot",
"type": "integer"
}
]
},
{
"token": "ORDERBY",
"name": "orderby",
"type": "block",
"arguments": [
{
"name": "metric",
"type": "string"
},
{
"token": "LIMIT",
"name": "limit",
"type": "integer",
"optional": true
},
{
"name": "order",
"type": "oneof",
"optional": true,
"arguments": [
{
"name": "asc",
"type": "pure-token",
"token": "ASC"
},
{
"name": "desc",
"type": "pure-token",
"token": "DESC"
}
]
}
]
}
]
}
]
}
}
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -3750,6 +3750,7 @@ void sunsubscribeCommand(client *c);
void watchCommand(client *c);
void unwatchCommand(client *c);
void clusterCommand(client *c);
void clusterSlotStatsCommand(client *c);
void restoreCommand(client *c);
void migrateCommand(client *c);
void askingCommand(client *c);
Expand Down
Loading

0 comments on commit 1269532

Please sign in to comment.