Skip to content

Commit

Permalink
Add CLUSTER SHARDS command support (#3598)
Browse files Browse the repository at this point in the history
* Add CLUSTER SHARDS command (#2984)

* Added new classes for CLUSTER SHARDS response

* Slots type changed to List<List<Long>> + style fixed

* Apply suggestions from code review

* reset redis before ClusterCommandTest added

* isSsl() added to ClusterShardNodeInfo

* Several assertion added to clusterShards test

---------

Co-authored-by: Khokhlov Aleksey <wormogig@gmail.com>
Co-authored-by: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com>
  • Loading branch information
3 people authored Nov 14, 2023
1 parent 701df24 commit 452ea52
Show file tree
Hide file tree
Showing 7 changed files with 314 additions and 5 deletions.
102 changes: 102 additions & 0 deletions src/main/java/redis/clients/jedis/BuilderFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,108 @@ public Map<String, CommandInfo> build(Object data) {
}
};

private static final Builder<List<List<Long>>> CLUSTER_SHARD_SLOTS_RANGES = new Builder<List<List<Long>>>() {

@Override
public List<List<Long>> build(Object data) {
if (null == data) {
return null;
}

List<Long> rawSlots = (List<Long>) data;
List<List<Long>> slotsRanges = new ArrayList<>();
for (int i = 0; i < rawSlots.size(); i += 2) {
slotsRanges.add(Arrays.asList(rawSlots.get(i), rawSlots.get(i + 1)));
}
return slotsRanges;
}
};

private static final Builder<List<ClusterShardNodeInfo>> CLUSTER_SHARD_NODE_INFO_LIST
= new Builder<List<ClusterShardNodeInfo>>() {

final Map<String, Builder> mappingFunctions = createDecoderMap();

private Map<String, Builder> createDecoderMap() {

Map<String, Builder> tempMappingFunctions = new HashMap<>();
tempMappingFunctions.put(ClusterShardNodeInfo.ID, STRING);
tempMappingFunctions.put(ClusterShardNodeInfo.ENDPOINT, STRING);
tempMappingFunctions.put(ClusterShardNodeInfo.IP, STRING);
tempMappingFunctions.put(ClusterShardNodeInfo.HOSTNAME, STRING);
tempMappingFunctions.put(ClusterShardNodeInfo.PORT, LONG);
tempMappingFunctions.put(ClusterShardNodeInfo.TLS_PORT, LONG);
tempMappingFunctions.put(ClusterShardNodeInfo.ROLE, STRING);
tempMappingFunctions.put(ClusterShardNodeInfo.REPLICATION_OFFSET, LONG);
tempMappingFunctions.put(ClusterShardNodeInfo.HEALTH, STRING);

return tempMappingFunctions;
}

@Override
@SuppressWarnings("unchecked")
public List<ClusterShardNodeInfo> build(Object data) {
if (null == data) {
return null;
}

List<ClusterShardNodeInfo> response = new ArrayList<>();

List<Object> clusterShardNodeInfos = (List<Object>) data;
for (Object clusterShardNodeInfoObject : clusterShardNodeInfos) {
List<Object> clusterShardNodeInfo = (List<Object>) clusterShardNodeInfoObject;
Iterator<Object> iterator = clusterShardNodeInfo.iterator();
response.add(new ClusterShardNodeInfo(createMapFromDecodingFunctions(iterator, mappingFunctions)));
}

return response;
}

@Override
public String toString() {
return "List<ClusterShardNodeInfo>";
}
};

public static final Builder<List<ClusterShardInfo>> CLUSTER_SHARD_INFO_LIST
= new Builder<List<ClusterShardInfo>>() {

final Map<String, Builder> mappingFunctions = createDecoderMap();

private Map<String, Builder> createDecoderMap() {

Map<String, Builder> tempMappingFunctions = new HashMap<>();
tempMappingFunctions.put(ClusterShardInfo.SLOTS, CLUSTER_SHARD_SLOTS_RANGES);
tempMappingFunctions.put(ClusterShardInfo.NODES, CLUSTER_SHARD_NODE_INFO_LIST);

return tempMappingFunctions;
}

@Override
@SuppressWarnings("unchecked")
public List<ClusterShardInfo> build(Object data) {
if (null == data) {
return null;
}

List<ClusterShardInfo> response = new ArrayList<>();

List<Object> clusterShardInfos = (List<Object>) data;
for (Object clusterShardInfoObject : clusterShardInfos) {
List<Object> clusterShardInfo = (List<Object>) clusterShardInfoObject;
Iterator<Object> iterator = clusterShardInfo.iterator();
response.add(new ClusterShardInfo(createMapFromDecodingFunctions(iterator, mappingFunctions)));
}

return response;
}

@Override
public String toString() {
return "List<ClusterShardInfo>";
}
};

public static final Builder<List<Module>> MODULE_LIST = new Builder<List<Module>>() {
@Override
public List<Module> build(Object data) {
Expand Down
12 changes: 10 additions & 2 deletions src/main/java/redis/clients/jedis/Jedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -6386,7 +6386,7 @@ public String srandmember(final String key) {
* @param key
* @param count if positive, return an array of distinct elements.
* If negative the behavior changes and the command is allowed to
* return the same element multiple times
* return the same element multiple times
* @return A list of randomly selected elements
*/
@Override
Expand Down Expand Up @@ -8758,7 +8758,7 @@ public long clusterKeySlot(final String key) {
public long clusterCountFailureReports(final String nodeId) {
checkIsInMultiOrPipeline();
connection.sendCommand(CLUSTER, "COUNT-FAILURE-REPORTS", nodeId);
return connection.getIntegerReply();
return connection.getIntegerReply();
}

@Override
Expand Down Expand Up @@ -8826,12 +8826,20 @@ public String clusterFailover(ClusterFailoverOption failoverOption) {
}

@Override
@Deprecated
public List<Object> clusterSlots() {
checkIsInMultiOrPipeline();
connection.sendCommand(CLUSTER, ClusterKeyword.SLOTS);
return connection.getObjectMultiBulkReply();
}

@Override
public List<ClusterShardInfo> clusterShards() {
checkIsInMultiOrPipeline();
connection.sendCommand(CLUSTER, ClusterKeyword.SHARDS);
return BuilderFactory.CLUSTER_SHARD_INFO_LIST.build(connection.getObjectMultiBulkReply());
}

@Override
public String clusterMyId() {
checkIsInMultiOrPipeline();
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/redis/clients/jedis/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ public static enum ClusterKeyword implements Rawable {
MEET, RESET, INFO, FAILOVER, SLOTS, NODES, REPLICAS, SLAVES, MYID, ADDSLOTS, DELSLOTS,
GETKEYSINSLOT, SETSLOT, NODE, MIGRATING, IMPORTING, STABLE, FORGET, FLUSHSLOTS, KEYSLOT,
COUNTKEYSINSLOT, SAVECONFIG, REPLICATE, LINKS, ADDSLOTSRANGE, DELSLOTSRANGE, BUMPEPOCH,
MYSHARDID;
MYSHARDID, SHARDS;

private final byte[] raw;

Expand Down
17 changes: 17 additions & 0 deletions src/main/java/redis/clients/jedis/commands/ClusterCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import redis.clients.jedis.args.ClusterResetType;
import redis.clients.jedis.args.ClusterFailoverOption;
import redis.clients.jedis.resps.ClusterShardInfo;

public interface ClusterCommands {

Expand Down Expand Up @@ -79,8 +80,24 @@ public interface ClusterCommands {

String clusterFailover(ClusterFailoverOption failoverOption);

/**
* {@code CLUSTER SLOTS} command is deprecated since Redis 7.
*
* @deprecated Use {@link ClusterCommands#clusterShards()}.
*/
@Deprecated
List<Object> clusterSlots();

/**
* {@code CLUSTER SHARDS} returns details about the shards of the cluster.
* This command replaces the {@code CLUSTER SLOTS} command from Redis 7,
* by providing a more efficient and extensible representation of the cluster.
*
* @return a list of shards, with each shard containing two objects, 'slots' and 'nodes'.
* @see <a href="https://redis.io/commands/cluster-shards/">CLUSTER SHARDS</a>
*/
List<ClusterShardInfo> clusterShards();

String clusterReset();

/**
Expand Down
44 changes: 44 additions & 0 deletions src/main/java/redis/clients/jedis/resps/ClusterShardInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package redis.clients.jedis.resps;

import java.util.List;
import java.util.Map;

/**
* This class holds information about a shard of the cluster with command {@code CLUSTER SHARDS}.
* They can be accessed via getters. There is also {@link ClusterShardInfo#getClusterShardInfo()}
* method that returns a generic {@link Map} in case more info are returned from the server.
*/
public class ClusterShardInfo {

public static final String SLOTS = "slots";
public static final String NODES = "nodes";

private final List<List<Long>> slots;
private final List<ClusterShardNodeInfo> nodes;

private final Map<String, Object> clusterShardInfo;

/**
* @param map contains key-value pairs with cluster shard info
*/
@SuppressWarnings("unchecked")
public ClusterShardInfo(Map<String, Object> map) {
slots = (List<List<Long>>) map.get(SLOTS);
nodes = (List<ClusterShardNodeInfo>) map.get(NODES);

clusterShardInfo = map;
}

public List<List<Long>> getSlots() {
return slots;
}

public List<ClusterShardNodeInfo> getNodes() {
return nodes;
}

public Map<String, Object> getClusterShardInfo() {
return clusterShardInfo;
}

}
94 changes: 94 additions & 0 deletions src/main/java/redis/clients/jedis/resps/ClusterShardNodeInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package redis.clients.jedis.resps;

import java.util.Map;

/**
* This class holds information about a node of the cluster with command {@code CLUSTER SHARDS}.
* They can be accessed via getters. There is also {@link ClusterShardNodeInfo#getClusterShardNodeInfo()}
* method that returns a generic {@link Map} in case more info are returned from the server.
*/
public class ClusterShardNodeInfo {

public static final String ID = "id";
public static final String ENDPOINT = "endpoint";
public static final String IP = "ip";
public static final String HOSTNAME = "hostname";
public static final String PORT = "port";
public static final String TLS_PORT = "tls-port";
public static final String ROLE = "role";
public static final String REPLICATION_OFFSET = "replication-offset";
public static final String HEALTH = "health";

private final String id;
private final String endpoint;
private final String ip;
private final String hostname;
private final Long port;
private final Long tlsPort;
private final String role;
private final Long replicationOffset;
private final String health;

private final Map<String, Object> clusterShardNodeInfo;

/**
* @param map contains key-value pairs with node info
*/
public ClusterShardNodeInfo(Map<String, Object> map) {
id = (String) map.get(ID);
endpoint = (String) map.get(ENDPOINT);
ip = (String) map.get(IP);
hostname = (String) map.get(HOSTNAME);
port = (Long) map.get(PORT);
tlsPort = (Long) map.get(TLS_PORT);
role = (String) map.get(ROLE);
replicationOffset = (Long) map.get(REPLICATION_OFFSET);
health = (String) map.get(HEALTH);

clusterShardNodeInfo = map;
}

public String getId() {
return id;
}

public String getEndpoint() {
return endpoint;
}

public String getIp() {
return ip;
}

public String getHostname() {
return hostname;
}

public Long getPort() {
return port;
}

public Long getTlsPort() {
return tlsPort;
}

public String getRole() {
return role;
}

public Long getReplicationOffset() {
return replicationOffset;
}

public String getHealth() {
return health;
}

public Map<String, Object> getClusterShardNodeInfo() {
return clusterShardNodeInfo;
}

public boolean isSsl() {
return tlsPort != null;
}
}
Loading

0 comments on commit 452ea52

Please sign in to comment.