From b7f9bbd207f70fd9e5282e3ca334f34383be3365 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Fri, 19 Jan 2024 19:09:13 -0800 Subject: [PATCH] rework Signed-off-by: Yury-Fridlyand --- .../src/main/java/glide/api/BaseClient.java | 23 ++- .../main/java/glide/api/ClusterClient.java | 68 ++++---- .../src/main/java/glide/api/RedisClient.java | 39 +++-- .../java/glide/api/commands/BaseCommands.java | 33 ++-- .../api/commands/ClusterBaseCommands.java | 22 +++ .../java/glide/api/models/RedisValue.java | 35 ++-- .../glide/api/models/configuration/Route.java | 48 ++++++ .../api/models/configuration/Routes.java | 13 -- .../models/configuration/SimpleRoutes.java | 10 -- .../api/models/configuration/SlotIdRoute.java | 18 -- .../models/configuration/SlotKeyRoute.java | 15 -- .../api/models/configuration/SlotTypes.java | 11 -- .../connectors/handlers/ChannelHandler.java | 1 + .../glide/managers/ClusterCommandManager.java | 61 ------- .../java/glide/managers/CommandManager.java | 156 +++++++++++------- .../java/glide/models/RequestBuilder.java | 119 ------------- 16 files changed, 277 insertions(+), 395 deletions(-) create mode 100644 java/client/src/main/java/glide/api/commands/ClusterBaseCommands.java create mode 100644 java/client/src/main/java/glide/api/models/configuration/Route.java delete mode 100644 java/client/src/main/java/glide/api/models/configuration/Routes.java delete mode 100644 java/client/src/main/java/glide/api/models/configuration/SimpleRoutes.java delete mode 100644 java/client/src/main/java/glide/api/models/configuration/SlotIdRoute.java delete mode 100644 java/client/src/main/java/glide/api/models/configuration/SlotKeyRoute.java delete mode 100644 java/client/src/main/java/glide/api/models/configuration/SlotTypes.java delete mode 100644 java/client/src/main/java/glide/managers/ClusterCommandManager.java delete mode 100644 java/client/src/main/java/glide/models/RequestBuilder.java diff --git a/java/client/src/main/java/glide/api/BaseClient.java b/java/client/src/main/java/glide/api/BaseClient.java index 321c94c970e..003c1286e82 100644 --- a/java/client/src/main/java/glide/api/BaseClient.java +++ b/java/client/src/main/java/glide/api/BaseClient.java @@ -1,10 +1,13 @@ package glide.api; +import glide.ffi.resolvers.RedisValueResolver; +import glide.managers.BaseCommandResponseResolver; import glide.managers.CommandManager; import glide.managers.ConnectionManager; import java.util.concurrent.ExecutionException; +import java.util.function.Function; import lombok.AllArgsConstructor; -import lombok.Getter; +import response.ResponseOuterClass.Response; /** * Base Client class for Redis @@ -14,8 +17,22 @@ @AllArgsConstructor public abstract class BaseClient implements AutoCloseable { - protected ConnectionManager connectionManager; - protected CommandManager commandManager; + protected final ConnectionManager connectionManager; + protected final CommandManager commandManager; + protected final Function dataConverter; + + /** + * Extracts the response from the Protobuf response and either throws an exception or returns the + * appropriate response has an Object + * + * @param response Redis protobuf message + * @return Response Object + */ + protected T handleObjectResponse(Response response) { + // convert protobuf response into Object and then Object into T + return dataConverter.apply( + new BaseCommandResponseResolver(RedisValueResolver::valueFromPointer).apply(response)); + } /** * Closes this resource, relinquishing any underlying resources. This method is invoked diff --git a/java/client/src/main/java/glide/api/ClusterClient.java b/java/client/src/main/java/glide/api/ClusterClient.java index 59aa1ea3bed..8b386281589 100644 --- a/java/client/src/main/java/glide/api/ClusterClient.java +++ b/java/client/src/main/java/glide/api/ClusterClient.java @@ -1,12 +1,16 @@ package glide.api; import static glide.api.RedisClient.buildChannelHandler; +import static glide.api.RedisClient.buildCommandManager; import static glide.api.RedisClient.buildConnectionManager; +import glide.api.commands.ClusterBaseCommands; +import glide.api.commands.Command; import glide.api.models.RedisValue; import glide.api.models.configuration.RedisClusterClientConfiguration; +import glide.api.models.configuration.Route; import glide.connectors.handlers.ChannelHandler; -import glide.managers.ClusterCommandManager; +import glide.managers.CommandManager; import glide.managers.ConnectionManager; import java.util.concurrent.CompletableFuture; @@ -14,37 +18,41 @@ * Async (non-blocking) client for Redis in Cluster mode. Use {@link #CreateClient} to request a * client to Redis. */ -public class ClusterClient extends BaseClient { +public class ClusterClient extends BaseClient + implements ClusterBaseCommands { - protected ClusterClient( - ConnectionManager connectionManager, ClusterCommandManager commandManager) { - super(connectionManager, commandManager); - } + protected ClusterClient(ConnectionManager connectionManager, CommandManager commandManager) { + super(connectionManager, commandManager, RedisValue::of); + } - /** - * Request an async (non-blocking) Redis client in Cluster mode. - * - * @param config - Redis Client Configuration - * @return a Future to connect and return a RedisClient - */ - public static CompletableFuture CreateClient( - RedisClusterClientConfiguration config) { - ChannelHandler channelHandler = buildChannelHandler(); - ConnectionManager connectionManager = buildConnectionManager(channelHandler); - ClusterCommandManager commandManager = buildClusterCommandManager(channelHandler); - // TODO: Support exception throwing, including interrupted exceptions - return connectionManager - .connectToRedis(config) - .thenApplyAsync(ignored -> new ClusterClient(connectionManager, commandManager)); - } + /** + * Async request for an async (non-blocking) Redis client in Cluster mode. + * + * @param config Redis cluster client Configuration + * @return a Future to connect and return a ClusterClient + */ + public static CompletableFuture CreateClient( + RedisClusterClientConfiguration config) { + ChannelHandler channelHandler = buildChannelHandler(); + ConnectionManager connectionManager = buildConnectionManager(channelHandler); + CommandManager commandManager = buildCommandManager(channelHandler); + // TODO: Support exception throwing, including interrupted exceptions + return connectionManager + .connectToRedis(config) + .thenApply(ignored -> new ClusterClient(connectionManager, commandManager)); + } - protected static ClusterCommandManager buildClusterCommandManager( - ChannelHandler channelHandler) { - return new ClusterCommandManager<>(channelHandler, RedisValue::of); - } + @Override + public CompletableFuture customCommand(String[] args) { + Command command = + Command.builder().requestType(Command.RequestType.CUSTOM_COMMAND).arguments(args).build(); + return commandManager.submitNewCommand(command, this::handleObjectResponse); + } - @Override - public ClusterCommandManager getCommandManager() { - return (ClusterCommandManager) commandManager; - } + @Override + public CompletableFuture customCommand(String[] args, Route route) { + Command command = + Command.builder().requestType(Command.RequestType.CUSTOM_COMMAND).arguments(args).build(); + return commandManager.submitNewCommand(command, this::handleObjectResponse, route); + } } diff --git a/java/client/src/main/java/glide/api/RedisClient.java b/java/client/src/main/java/glide/api/RedisClient.java index 3e3c86cceb4..adaa4d82837 100644 --- a/java/client/src/main/java/glide/api/RedisClient.java +++ b/java/client/src/main/java/glide/api/RedisClient.java @@ -15,12 +15,16 @@ * Async (non-blocking) client for Redis in Standalone mode. Use {@link #CreateClient} to request a * client to Redis. */ -public class RedisClient extends BaseClient implements BaseCommands { +public class RedisClient extends BaseClient implements BaseCommands { + + protected RedisClient(ConnectionManager connectionManager, CommandManager commandManager) { + super(connectionManager, commandManager, obj -> obj); + } /** - * Request an async (non-blocking) Redis client in Standalone mode. + * Async request for an async (non-blocking) Redis client in Standalone mode. * - * @param config - Redis Client Configuration + * @param config Redis client Configuration * @return a Future to connect and return a RedisClient */ public static CompletableFuture CreateClient(RedisClientConfiguration config) { @@ -30,31 +34,26 @@ public static CompletableFuture CreateClient(RedisClientConfigurati // TODO: Support exception throwing, including interrupted exceptions return connectionManager .connectToRedis(config) - .thenApply(ignore -> new RedisClient(connectionManager, commandManager)); + .thenApply(ignored -> new RedisClient(connectionManager, commandManager)); } - protected static CommandManager buildCommandManager(ChannelHandler channelHandler) { - return new CommandManager(channelHandler); + protected static ChannelHandler buildChannelHandler() { + CallbackDispatcher callbackDispatcher = new CallbackDispatcher(); + return new ChannelHandler(callbackDispatcher, getSocket()); } - protected RedisClient(ConnectionManager connectionManager, CommandManager commandManager) { - super(connectionManager, commandManager); + protected static ConnectionManager buildConnectionManager(ChannelHandler channelHandler) { + return new ConnectionManager(channelHandler); } - /** - * Executes a single custom command, without checking inputs. Every part of the command, including - * subcommands, should be added as a separate value in args. - * - * @param args command and arguments for the custom command call - * @return CompletableFuture with the response - */ + protected static CommandManager buildCommandManager(ChannelHandler channelHandler) { + return new CommandManager(channelHandler); + } + + @Override public CompletableFuture customCommand(String[] args) { Command command = Command.builder().requestType(Command.RequestType.CUSTOM_COMMAND).arguments(args).build(); - return commandManager.submitNewCommand(command, BaseCommands::handleObjectResponse); + return commandManager.submitNewCommand(command, this::handleObjectResponse); } - - protected static CommandManager buildCommandManager2(ChannelHandler channelHandler) { - return new CommandManager<>(channelHandler, Object::toString); - } } diff --git a/java/client/src/main/java/glide/api/commands/BaseCommands.java b/java/client/src/main/java/glide/api/commands/BaseCommands.java index bcb9871d5c8..0c382e51748 100644 --- a/java/client/src/main/java/glide/api/commands/BaseCommands.java +++ b/java/client/src/main/java/glide/api/commands/BaseCommands.java @@ -1,31 +1,20 @@ package glide.api.commands; -import glide.ffi.resolvers.RedisValueResolver; -import glide.managers.BaseCommandResponseResolver; import java.util.concurrent.CompletableFuture; -import response.ResponseOuterClass.Response; -/** Base Commands interface to handle generic command and transaction requests. */ -public interface BaseCommands { +/** + * Base Commands interface to handle generic command and transaction requests. + * + * @param The data return type. + */ +public interface BaseCommands { /** - * Extracts the response from the Protobuf response and either throws an exception or returns the - * appropriate response has an Object + * Executes a single custom command, without checking inputs. Every part of the command, including + * subcommands, should be added as a separate value in args. * - * @param response Redis protobuf message - * @return Response Object + * @param args command and arguments for the custom command call + * @return CompletableFuture with the response */ - static Object handleObjectResponse(Response response) { - // return function to convert protobuf.Response into the response object by - // calling valueFromPointer - return (new BaseCommandResponseResolver(RedisValueResolver::valueFromPointer)).apply(response); - } - - /** - * Execute a @see{Command} by sending command via socket manager - * - * @param args arguments for the custom command - * @return a CompletableFuture with response result from Redis - */ - CompletableFuture customCommand(String[] args); + CompletableFuture customCommand(String[] args); } diff --git a/java/client/src/main/java/glide/api/commands/ClusterBaseCommands.java b/java/client/src/main/java/glide/api/commands/ClusterBaseCommands.java new file mode 100644 index 00000000000..9645b769e4b --- /dev/null +++ b/java/client/src/main/java/glide/api/commands/ClusterBaseCommands.java @@ -0,0 +1,22 @@ +package glide.api.commands; + +import glide.api.models.configuration.Route; +import java.util.concurrent.CompletableFuture; + +/** + * Base Commands interface to handle generic command and transaction requests with routing options. + * + * @param The data return type. + */ +public interface ClusterBaseCommands extends BaseCommands { + + /** + * Executes a single custom command, without checking inputs. Every part of the command, including + * subcommands, should be added as a separate value in args. + * + * @param args command and arguments for the custom command call + * @param route node routing configuration for the command + * @return CompletableFuture with the response + */ + CompletableFuture customCommand(String[] args, Route route); +} diff --git a/java/client/src/main/java/glide/api/models/RedisValue.java b/java/client/src/main/java/glide/api/models/RedisValue.java index bdb99e00a22..d254884f4b8 100644 --- a/java/client/src/main/java/glide/api/models/RedisValue.java +++ b/java/client/src/main/java/glide/api/models/RedisValue.java @@ -6,24 +6,27 @@ /** A union-like type which can store single or bulk value retrieved from Redis. */ @Getter public class RedisValue { - private Map bulkValue = null; - private String singleValue = null; + /** Get per-node value. */ + private Map multiValue = null; - private RedisValue() {} + /** Get the single value. */ + private Object singleValue = null; - @SuppressWarnings("unchecked") - public static RedisValue of(Object data) { - var res = new RedisValue(); - if (data instanceof Map) { - res.bulkValue = (Map) data; - } else { - res.singleValue = data.toString(); + private RedisValue() {} + + @SuppressWarnings("unchecked") + public static RedisValue of(Object data) { + var res = new RedisValue(); + if (data instanceof Map) { + res.multiValue = (Map) data; + } else { + res.singleValue = data; + } + return res; } - return res; - } - /** Get the value type. Use it prior to accessing the data. */ - public boolean hasBulkData() { - return bulkValue != null; - } + /** Get the value type. Use it prior to accessing the data. */ + public boolean hasMultiData() { + return multiValue != null; + } } diff --git a/java/client/src/main/java/glide/api/models/configuration/Route.java b/java/client/src/main/java/glide/api/models/configuration/Route.java new file mode 100644 index 00000000000..fc25248a1e3 --- /dev/null +++ b/java/client/src/main/java/glide/api/models/configuration/Route.java @@ -0,0 +1,48 @@ +package glide.api.models.configuration; + +import lombok.Builder; +import lombok.Getter; + +/** Request routing configuration. */ +@Builder +@Getter +public class Route { + + public enum RouteType { + /** Route request to all nodes. */ + AllNodes, + /** Route request to all primary nodes. */ + AllPrimaries, + /** Route request to a random node. */ + Random, + /** Route request to the primary node that contains the slot with the given id. */ + PrimarySlotId, + /** Route request to the replica node that contains the slot with the given id. */ + ReplicaSlotId, + /** Route request to the primary node that contains the slot that the given key matches. */ + PrimarySlotKey, + /** Route request to the replica node that contains the slot that the given key matches. */ + ReplicaSlotKey, + } + + /** + * Request routing configuration overrides the {@link ReadFrom} connection configuration.
+ * If {@link RouteType#ReplicaSlotId} or {@link RouteType#ReplicaSlotKey} is used, the request + * will be routed to a replica, even if the strategy is {@link ReadFrom#PRIMARY}. + */ + private final RouteType routeType; + + /** + * Slot number. There are 16384 slots in a redis cluster, and each shard manages a slot range. + * Unless the slot is known, it's better to route using {@link RouteType#PrimarySlotKey} or {@link + * RouteType#ReplicaSlotKey}.
+ * Could be used with {@link RouteType#PrimarySlotId} or {@link RouteType#ReplicaSlotId} only. + */ + private final int slotId; + + /** + * The request will be sent to nodes managing this key.
+ * Could be used with {@link RouteType#PrimarySlotKey} or {@link RouteType#ReplicaSlotKey} only. + */ + private final String slotKey; +} diff --git a/java/client/src/main/java/glide/api/models/configuration/Routes.java b/java/client/src/main/java/glide/api/models/configuration/Routes.java deleted file mode 100644 index 8636551f281..00000000000 --- a/java/client/src/main/java/glide/api/models/configuration/Routes.java +++ /dev/null @@ -1,13 +0,0 @@ -package glide.api.models.configuration; - -/** - * Request routing configuration, could be defined by one of the derived classes.
- * Supported implementations are: - * - *
    - *
  • {@link SimpleRoutes} - *
  • {@link SlotKeyRoute} - *
  • {@link SlotIdRoute} - *
- */ -public interface Routes {} diff --git a/java/client/src/main/java/glide/api/models/configuration/SimpleRoutes.java b/java/client/src/main/java/glide/api/models/configuration/SimpleRoutes.java deleted file mode 100644 index 408c7120ba9..00000000000 --- a/java/client/src/main/java/glide/api/models/configuration/SimpleRoutes.java +++ /dev/null @@ -1,10 +0,0 @@ -package glide.api.models.configuration; - -public enum SimpleRoutes implements Routes { - /** Route request to all nodes. */ - AllNodes, - /** Route request to all primary nodes. */ - AllPrimaries, - /** Route request to a random node. */ - Random, -} diff --git a/java/client/src/main/java/glide/api/models/configuration/SlotIdRoute.java b/java/client/src/main/java/glide/api/models/configuration/SlotIdRoute.java deleted file mode 100644 index 6990aaf5980..00000000000 --- a/java/client/src/main/java/glide/api/models/configuration/SlotIdRoute.java +++ /dev/null @@ -1,18 +0,0 @@ -package glide.api.models.configuration; - -import lombok.Builder; -import lombok.Getter; - -/** Route request to the node that contains the slot with the given id. */ -@Builder -@Getter -public class SlotIdRoute implements Routes { - /** Slot type. */ - private final SlotTypes slotTypes; - - /** - * Slot number. There are 16384 slots in a redis cluster, and each shard manages a slot range. - * Unless the slot is known, it's better to route using `SlotKeyTypes` - */ - private final int slotId; -} diff --git a/java/client/src/main/java/glide/api/models/configuration/SlotKeyRoute.java b/java/client/src/main/java/glide/api/models/configuration/SlotKeyRoute.java deleted file mode 100644 index 58760a4ed95..00000000000 --- a/java/client/src/main/java/glide/api/models/configuration/SlotKeyRoute.java +++ /dev/null @@ -1,15 +0,0 @@ -package glide.api.models.configuration; - -import lombok.Builder; -import lombok.Getter; - -/** Route request to the node that contains the slot that the given key matches. */ -@Builder -@Getter -public class SlotKeyRoute implements Routes { - /** Slot type. */ - private final SlotTypes slotTypes; - - /** The request will be sent to nodes managing this key. */ - private final String slotKey; -} diff --git a/java/client/src/main/java/glide/api/models/configuration/SlotTypes.java b/java/client/src/main/java/glide/api/models/configuration/SlotTypes.java deleted file mode 100644 index c9eb4f0c566..00000000000 --- a/java/client/src/main/java/glide/api/models/configuration/SlotTypes.java +++ /dev/null @@ -1,11 +0,0 @@ -package glide.api.models.configuration; - -/** - * Request routing configuration overrides the `readFrom` configuration.
- * If {@link #Primary} is used the request will be routed to a replica, even if the strategy is - * {@link ReadFrom#PRIMARY}. - */ -public enum SlotTypes { - Primary, - Replica, -} diff --git a/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java b/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java index f91bbed3db3..916715e1db7 100644 --- a/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java +++ b/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java @@ -58,6 +58,7 @@ public ChannelHandler( .channel(domainSocketChannelClass) .handler(channelInitializer) .connect(domainSocketAddress) + .syncUninterruptibly() // TODO call here .sync() if needed or remove this comment .channel(); this.callbackDispatcher = callbackDispatcher; diff --git a/java/client/src/main/java/glide/managers/ClusterCommandManager.java b/java/client/src/main/java/glide/managers/ClusterCommandManager.java deleted file mode 100644 index 4f91989bd1d..00000000000 --- a/java/client/src/main/java/glide/managers/ClusterCommandManager.java +++ /dev/null @@ -1,61 +0,0 @@ -package glide.managers; - -import glide.api.ClusterClient; -import glide.api.models.RedisValue; -import glide.api.models.configuration.Routes; -import glide.connectors.handlers.ChannelHandler; -import glide.models.RequestBuilder; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.function.Function; -import redis_request.RedisRequestOuterClass.RequestType; - -/** - * {@link CommandManager} extension for {@link ClusterClient}. - * - * @param The data return type - only {@link RedisValue} is supported. - */ -public class ClusterCommandManager extends CommandManager { - public ClusterCommandManager(ChannelHandler channel, Function dataConverter) { - super(channel, dataConverter); - } - - /** - * Async (non-blocking) get.
- * See REDIS docs for GET. - * - * @param key The key name - * @param routes Request routing configuration - */ - public CompletableFuture get(String key, Routes routes) { - return submitNewRequest(RequestType.GetString, List.of(key), routes); - } - - /** - * Async (non-blocking) set.
- * See REDIS docs for SET. - * - * @param key The key name - * @param value The value to set - * @param routes Request routing configuration - */ - public CompletableFuture set(String key, String value, Routes routes) { - return submitNewRequest(RequestType.SetString, List.of(key, value), routes); - } - - /** - * Build a command and submit it Netty to send. - * - * @param command Command type - * @param args Command arguments - * @param routes Request routing configuration - * @return A result promise - */ - private CompletableFuture submitNewRequest( - RequestType command, List args, Routes routes) { - return channel - .write(RequestBuilder.prepareRedisRequest(command, args, routes), true) - .thenApplyAsync(this::extractValueFromGlideRsResponse) - .thenApplyAsync(dataConverter); - } -} diff --git a/java/client/src/main/java/glide/managers/CommandManager.java b/java/client/src/main/java/glide/managers/CommandManager.java index 1cda8c21553..73f45353210 100644 --- a/java/client/src/main/java/glide/managers/CommandManager.java +++ b/java/client/src/main/java/glide/managers/CommandManager.java @@ -1,22 +1,23 @@ package glide.managers; import glide.api.commands.Command; -import glide.api.models.configuration.Routes; +import glide.api.commands.Command.RequestType; +import glide.api.models.configuration.Route; +import glide.connectors.handlers.CallbackDispatcher; import glide.connectors.handlers.ChannelHandler; import java.util.concurrent.CompletableFuture; -import java.util.function.Function; import lombok.RequiredArgsConstructor; import redis_request.RedisRequestOuterClass; +import redis_request.RedisRequestOuterClass.Command.ArgsArray; +import redis_request.RedisRequestOuterClass.RedisRequest; import response.ResponseOuterClass.Response; /** * Service responsible for submitting command requests to a socket channel handler and unpack * responses from the same socket channel handler. - * - * @param The data return type - only {@link String} is supported. */ @RequiredArgsConstructor -public class CommandManager { +public class CommandManager { /** UDS connection representation. */ private final ChannelHandler channel; @@ -24,8 +25,8 @@ public class CommandManager { /** * Build a command and send. * - * @param command - * @param responseHandler - to handle the response object + * @param command The command to execute + * @param responseHandler The handler for the response object * @return A result promise of type T */ public CompletableFuture submitNewCommand( @@ -34,76 +35,117 @@ public CompletableFuture submitNewCommand( // when complete, convert the response to our expected type T using the given responseHandler return channel .write(prepareRedisRequest(command.getRequestType(), command.getArguments()), true) - .thenApplyAsync(response -> responseHandler.apply(response)); + .thenApplyAsync(responseHandler::apply); + } + + /** + * Build a command and send. + * + * @param command The command to execute + * @param responseHandler The handler for the response object + * @param route The routing options + * @return A result promise of type T + */ + public CompletableFuture submitNewCommand( + Command command, RedisExceptionCheckedFunction responseHandler, Route route) { + // write command request to channel + // when complete, convert the response to our expected type T using the given responseHandler + return channel + .write(prepareRedisRequest(command.getRequestType(), command.getArguments(), route), true) + .thenApplyAsync(responseHandler::apply); } /** * Build a protobuf command/transaction request object.
* Used by {@link CommandManager}. * - * @return An uncompleted request. CallbackDispatcher is responsible to complete it by adding a - * callback id. + * @return An uncompleted request. {@link CallbackDispatcher} is responsible to complete it by + * adding a callback id. */ - private RedisRequestOuterClass.RedisRequest.Builder prepareRedisRequest( - Command.RequestType command, String[] args) { - RedisRequestOuterClass.Command.ArgsArray.Builder commandArgs = - RedisRequestOuterClass.Command.ArgsArray.newBuilder(); + private RedisRequest.Builder prepareRedisRequest(RequestType command, String[] args) { + ArgsArray.Builder commandArgs = ArgsArray.newBuilder(); for (var arg : args) { commandArgs.addArgs(arg); } - return RedisRequestOuterClass.RedisRequest.newBuilder() + return RedisRequest.newBuilder() .setSingleCommand( RedisRequestOuterClass.Command.newBuilder() .setRequestType(mapRequestTypes(command)) .setArgsArray(commandArgs.build()) - .build()) - .setRoute( - RedisRequestOuterClass.Routes.newBuilder() - .setSimpleRoutes(RedisRequestOuterClass.SimpleRoutes.AllNodes) .build()); } - private RedisRequestOuterClass.RequestType mapRequestTypes(Command.RequestType inType) { - switch (inType) { - case CUSTOM_COMMAND: - return RedisRequestOuterClass.RequestType.CustomCommand; - } - throw new RuntimeException("Unsupported request type"); - } - - // TODO rework after rebasing on top of https://github.com/Bit-Quill/glide-for-redis/pull/58 - protected final Function dataConverter; + /** + * Build a protobuf command/transaction request object with routing options.
+ * Used by {@link CommandManager}. + * + * @return An uncompleted request. {@link CallbackDispatcher} is responsible to complete it by + * adding a callback id. + */ + private RedisRequest.Builder prepareRedisRequest( + RequestType command, String[] args, Route route) { + RedisRequest.Builder builder = prepareRedisRequest(command, args); + switch (route.getRouteType()) { + case Random: + case AllNodes: + case AllPrimaries: + builder.setRoute( + RedisRequestOuterClass.Routes.newBuilder() + .setSimpleRoutes(getSimpleRoutes(route.getRouteType())) + .build()); + break; + case PrimarySlotKey: + case ReplicaSlotKey: + builder.setRoute( + RedisRequestOuterClass.Routes.newBuilder() + .setSlotKeyRoute( + RedisRequestOuterClass.SlotKeyRoute.newBuilder() + .setSlotKey(route.getSlotKey()) + .setSlotType(getSlotTypes(route.getRouteType())))); + break; + case PrimarySlotId: + case ReplicaSlotId: + builder.setRoute( + RedisRequestOuterClass.Routes.newBuilder() + .setSlotIdRoute( + RedisRequestOuterClass.SlotIdRoute.newBuilder() + .setSlotId(route.getSlotId()) + .setSlotType(getSlotTypes(route.getRouteType())))); + } + return builder; + } + private RedisRequestOuterClass.RequestType mapRequestTypes(RequestType inType) { + switch (inType) { + case CUSTOM_COMMAND: + return RedisRequestOuterClass.RequestType.CustomCommand; + } + throw new RuntimeException("Unsupported request type"); + } - /** - * Build a command and submit it Netty to send. - * - * @param command Command type - * @param args Command arguments - * @return A result promise - */ - protected CompletableFuture submitNewRequest(RequestType command, List args) { - return channel - .write(RequestBuilder.prepareRedisRequest(command, args), true) - .thenApplyAsync(this::extractValueFromGlideRsResponse) - .thenApplyAsync(dataConverter); - } + private RedisRequestOuterClass.SimpleRoutes getSimpleRoutes(Route.RouteType routeType) { + switch (routeType) { + case Random: + return RedisRequestOuterClass.SimpleRoutes.Random; + case AllNodes: + return RedisRequestOuterClass.SimpleRoutes.AllNodes; + case AllPrimaries: + return RedisRequestOuterClass.SimpleRoutes.AllPrimaries; + } + throw new IllegalStateException("Unreachable code"); + } - /** - * Build a command and submit it Netty to send. - * - * @param command Command type - * @param args Command arguments - * @param routes Request routing configuration - * @return A result promise - */ - private CompletableFuture submitNewRequest( - RequestType command, List args, Routes routes) { - return channel - .write(RequestBuilder.prepareRedisRequest(command, args, routes), true) - .thenApplyAsync(this::extractValueFromGlideRsResponse) - .thenApplyAsync(dataConverter); - } + private RedisRequestOuterClass.SlotTypes getSlotTypes(Route.RouteType routeType) { + switch (routeType) { + case PrimarySlotId: + case PrimarySlotKey: + return RedisRequestOuterClass.SlotTypes.Primary; + case ReplicaSlotId: + case ReplicaSlotKey: + return RedisRequestOuterClass.SlotTypes.Replica; + } + throw new IllegalStateException("Unreachable code"); + } } diff --git a/java/client/src/main/java/glide/models/RequestBuilder.java b/java/client/src/main/java/glide/models/RequestBuilder.java deleted file mode 100644 index 3c5471c14d5..00000000000 --- a/java/client/src/main/java/glide/models/RequestBuilder.java +++ /dev/null @@ -1,119 +0,0 @@ -package glide.models; - -import connection_request.ConnectionRequestOuterClass.ConnectionRequest; -import connection_request.ConnectionRequestOuterClass.NodeAddress; -import connection_request.ConnectionRequestOuterClass.ReadFrom; -import connection_request.ConnectionRequestOuterClass.TlsMode; -import glide.api.models.configuration.Routes; -import glide.api.models.configuration.SimpleRoutes; -import glide.api.models.configuration.SlotIdRoute; -import glide.api.models.configuration.SlotKeyRoute; -import glide.api.models.configuration.SlotTypes; -import glide.connectors.handlers.CallbackDispatcher; -import glide.managers.CommandManager; -import glide.managers.ConnectionManager; -import java.util.List; -import redis_request.RedisRequestOuterClass; -import redis_request.RedisRequestOuterClass.Command; -import redis_request.RedisRequestOuterClass.Command.ArgsArray; -import redis_request.RedisRequestOuterClass.RedisRequest; -import redis_request.RedisRequestOuterClass.RequestType; - -public class RequestBuilder { - - /** - * Build a protobuf connection request.
- * Used by {@link ConnectionManager#connectToRedis}. - */ - // TODO support more parameters and/or configuration object - public static ConnectionRequest createConnectionRequest( - String host, int port, boolean useSsl, boolean clusterMode) { - return ConnectionRequest.newBuilder() - .addAddresses(NodeAddress.newBuilder().setHost(host).setPort(port).build()) - .setTlsMode(useSsl ? TlsMode.SecureTls : TlsMode.NoTls) - .setClusterModeEnabled(clusterMode) - .setReadFrom(ReadFrom.Primary) - .setDatabaseId(0) - .build(); - } - - /** - * Build a protobuf command/transaction request draft.
- * Used by {@link CommandManager}. - * - * @return An uncompleted request. {@link CallbackDispatcher} is responsible to complete it by - * adding a callback id. - */ - public static RedisRequest.Builder prepareRedisRequest(RequestType command, List args) { - var commandArgs = ArgsArray.newBuilder(); - for (var arg : args) { - commandArgs.addArgs(arg); - } - - return RedisRequest.newBuilder() - .setSingleCommand( - Command.newBuilder().setRequestType(command).setArgsArray(commandArgs.build()).build()); - } - - /** - * Build a protobuf command/transaction request draft.
- * Used by {@link CommandManager}. - * - * @return An uncompleted request. {@link CallbackDispatcher} is responsible to complete it by - * adding a callback id. - */ - public static RedisRequest.Builder prepareRedisRequest( - RequestType command, List args, Routes routes) { - var builder = prepareRedisRequest(command, args); - - if (routes instanceof SimpleRoutes) { - builder.setRoute( - RedisRequestOuterClass.Routes.newBuilder() - .setSimpleRoutes(simpleRoutesToProtoc((SimpleRoutes) routes)) - .build()); - } else if (routes instanceof SlotIdRoute) { - var slotId = (SlotIdRoute) routes; - builder.setRoute( - RedisRequestOuterClass.Routes.newBuilder() - .setSlotIdRoute( - RedisRequestOuterClass.SlotIdRoute.newBuilder() - .setSlotId(slotId.getSlotId()) - .setSlotType(slotTypesToProtoc(slotId.getSlotTypes())))); - } else if (routes instanceof SlotKeyRoute) { - var slotKey = (SlotKeyRoute) routes; - builder.setRoute( - RedisRequestOuterClass.Routes.newBuilder() - .setSlotKeyRoute( - RedisRequestOuterClass.SlotKeyRoute.newBuilder() - .setSlotKey(slotKey.getSlotKey()) - .setSlotType(slotTypesToProtoc(slotKey.getSlotTypes())))); - } else { - throw new IllegalArgumentException( - "Unknown type of routes: " + routes.getClass().getSimpleName()); - } - return builder; - } - - private static RedisRequestOuterClass.SimpleRoutes simpleRoutesToProtoc( - SimpleRoutes simpleRoutes) { - switch (simpleRoutes) { - case Random: - return RedisRequestOuterClass.SimpleRoutes.Random; - case AllNodes: - return RedisRequestOuterClass.SimpleRoutes.AllNodes; - case AllPrimaries: - return RedisRequestOuterClass.SimpleRoutes.AllPrimaries; - } - throw new IllegalStateException("Unreachable code"); - } - - private static RedisRequestOuterClass.SlotTypes slotTypesToProtoc(SlotTypes slotTypes) { - switch (slotTypes) { - case Primary: - return RedisRequestOuterClass.SlotTypes.Primary; - case Replica: - return RedisRequestOuterClass.SlotTypes.Replica; - } - throw new IllegalStateException("Unreachable code"); - } -}