From bc9ecd6fc345fab57187aaa0445e1529eb5e8036 Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Thu, 12 Sep 2024 13:55:30 +0200 Subject: [PATCH] Remove usage of Handler/AsyncResult idiom. --- .../redis/client/impl/RedisClusterClient.java | 16 +++---- .../client/impl/RedisClusterConnection.java | 42 +++++++++---------- .../redis/client/impl/RedisClusterImpl.java | 8 ++-- .../client/impl/RedisReplicationClient.java | 10 ++--- .../client/impl/RedisSentinelClient.java | 18 +++----- .../vertx/redis/client/impl/SharedSlots.java | 2 +- 6 files changed, 41 insertions(+), 55 deletions(-) diff --git a/src/main/java/io/vertx/redis/client/impl/RedisClusterClient.java b/src/main/java/io/vertx/redis/client/impl/RedisClusterClient.java index 8aab6c2f..05922ca0 100644 --- a/src/main/java/io/vertx/redis/client/impl/RedisClusterClient.java +++ b/src/main/java/io/vertx/redis/client/impl/RedisClusterClient.java @@ -15,11 +15,7 @@ */ package io.vertx.redis.client.impl; -import io.vertx.core.AsyncResult; -import io.vertx.core.Future; -import io.vertx.core.Handler; -import io.vertx.core.Promise; -import io.vertx.core.Vertx; +import io.vertx.core.*; import io.vertx.core.internal.logging.Logger; import io.vertx.core.internal.logging.LoggerFactory; import io.vertx.core.net.NetClientOptions; @@ -154,7 +150,7 @@ public Future connect() { return promise.future(); } - private void connect(Slots slots, Handler> onConnected) { + private void connect(Slots slots, Completable onConnected) { // create a cluster connection final Set failures = ConcurrentHashMap.newKeySet(); final AtomicInteger counter = new AtomicInteger(); @@ -180,7 +176,7 @@ private void connect(Slots slots, Handler> onConnec } private void connectionComplete(AtomicInteger counter, Slots slots, Map connections, - Set failures, Handler> onConnected) { + Set failures, Completable onConnected) { if (counter.incrementAndGet() == slots.endpoints().length) { // end condition if (!failures.isEmpty()) { @@ -200,10 +196,10 @@ private void connectionComplete(AtomicInteger counter, Slots slots, Map splitRequest(CommandImpl cmd, List args) { return map; } - void send(String endpoint, int retries, Request command, Handler> handler) { + void send(String endpoint, int retries, Request command, Completable handler) { PooledRedisConnection connection = connections.get(endpoint); if (connection == null) { connectionManager.getConnection(endpoint, RedisReplicas.NEVER != connectOptions.getUseReplicas() ? Request.cmd(Command.READONLY) : null) @@ -285,7 +281,7 @@ void send(String endpoint, int retries, Request command, Handler 0) { send(endpoint, retries - 1, command, handler); } else { - handler.handle(Future.failedFuture("Failed obtaining connection to: " + endpoint)); + handler.fail("Failed obtaining connection to: " + endpoint); } }); return; @@ -293,7 +289,7 @@ void send(String endpoint, int retries, Request command, Handler { + .onComplete((send -> { if (send.failed() && send.cause() instanceof ErrorType && retries >= 0) { final ErrorType cause = (ErrorType) send.cause(); @@ -308,7 +304,7 @@ void send(String endpoint, int retries, Request command, Handler { - if (resp.failed()) { - handler.handle(Future.failedFuture("Failed ASKING: " + resp.cause() + ", caused by " + cause)); + send(newEndpoint, retries - 1, Request.cmd(Command.ASKING), (resp, err) -> { + if (err != null) { + handler.fail("Failed ASKING: " + err + ", caused by " + cause); } else { send(newEndpoint, retries - 1, command, handler); } @@ -343,7 +339,7 @@ void send(String endpoint, int retries, Request command, Handler handler.handle(Future.failedFuture(err))) + .onFailure(err -> handler.fail(err)) .onSuccess(auth -> { // again send(endpoint, retries - 1, command, handler); @@ -353,11 +349,11 @@ void send(String endpoint, int retries, Request command, Handler> batch(List requests, Slots slots) { return promise.future(); } - private void batch(String endpoint, int retries, List commands, Handler>> handler) { + private void batch(String endpoint, int retries, List commands, Completable> handler) { RedisConnection connection = connections.get(endpoint); if (connection == null) { connectionManager.getConnection(endpoint, RedisReplicas.NEVER != connectOptions.getUseReplicas() ? Request.cmd(Command.READONLY) : null) @@ -466,7 +462,7 @@ private void batch(String endpoint, int retries, List commands, Handler if (retries > 0) { batch(endpoint, retries - 1, commands, handler); } else { - handler.handle(Future.failedFuture("Failed obtaining connection to: " + endpoint)); + handler.fail("Failed obtaining connection to: " + endpoint); } }); return; @@ -489,7 +485,7 @@ private void batch(String endpoint, int retries, List commands, Handler String addr = cause.slice(' ', 2); if (addr == null) { // bad message - handler.handle(Future.failedFuture("Cannot find endpoint:port in redirection: " + cause)); + handler.fail("Cannot find endpoint:port in redirection: " + cause); return; } @@ -500,9 +496,9 @@ private void batch(String endpoint, int retries, List commands, Handler } String newEndpoint = uri.protocol() + "://" + uri.userinfo() + addr; if (ask) { - batch(newEndpoint, retries - 1, Collections.singletonList(Request.cmd(Command.ASKING)), resp -> { - if (resp.failed()) { - handler.handle(Future.failedFuture("Failed ASKING: " + resp.cause() + ", caused by " + cause)); + batch(newEndpoint, retries - 1, Collections.singletonList(Request.cmd(Command.ASKING)), (resp, err) -> { + if (err != null) { + handler.fail("Failed ASKING: " + err + ", caused by " + cause); } else { batch(newEndpoint, retries - 1, commands, handler); } @@ -524,7 +520,7 @@ private void batch(String endpoint, int retries, List commands, Handler // try to authenticate connection .send(Request.cmd(Command.AUTH).arg(connectOptions.getPassword())) - .onFailure(err -> handler.handle(Future.failedFuture(err))) + .onFailure(err -> handler.fail(err)) .onSuccess(auth -> { // again batch(endpoint, retries - 1, commands, handler); @@ -534,7 +530,7 @@ private void batch(String endpoint, int retries, List commands, Handler } try { - handler.handle(send); + handler.succeed(send.result()); } catch (RuntimeException e) { LOG.error("Handler failure", e); } diff --git a/src/main/java/io/vertx/redis/client/impl/RedisClusterImpl.java b/src/main/java/io/vertx/redis/client/impl/RedisClusterImpl.java index 243c7247..dfeb3804 100644 --- a/src/main/java/io/vertx/redis/client/impl/RedisClusterImpl.java +++ b/src/main/java/io/vertx/redis/client/impl/RedisClusterImpl.java @@ -83,12 +83,12 @@ private void onAllNodes(String[] endpoints, int index, Request request, List { - if (ar.succeeded()) { - result.add(ar.result()); + conn.send(endpoints[index], RedisClusterConnection.RETRIES, request, (res, err) -> { + if (err == null) { + result.add(res); onAllNodes(endpoints, index + 1, request, result, conn, promise); } else { - promise.fail(ar.cause()); + promise.fail(err); } }); } diff --git a/src/main/java/io/vertx/redis/client/impl/RedisReplicationClient.java b/src/main/java/io/vertx/redis/client/impl/RedisReplicationClient.java index 4c0480e0..293a29db 100644 --- a/src/main/java/io/vertx/redis/client/impl/RedisReplicationClient.java +++ b/src/main/java/io/vertx/redis/client/impl/RedisReplicationClient.java @@ -117,14 +117,14 @@ public Future connect() { return promise.future(); } - private void connectWithDiscoverTopology(List endpoints, int index, Set failures, Handler> onConnect) { + private void connectWithDiscoverTopology(List endpoints, int index, Set failures, Completable onConnect) { if (index >= endpoints.size()) { // stop condition StringBuilder message = new StringBuilder("Cannot connect to any of the provided endpoints"); for (Throwable failure : failures) { message.append("\n- ").append(failure); } - onConnect.handle(Future.failedFuture(new RedisConnectException(message.toString()))); + onConnect.fail(new RedisConnectException(message.toString())); return; } @@ -155,7 +155,7 @@ private void connectWithDiscoverTopology(List endpoints, int index, Set< if (!replica.online) { LOG.info("Skipping offline replica: " + replica.ip + ":" + replica.port); if (counter.incrementAndGet() == replicas.size()) { - onConnect.handle(Future.succeededFuture(new RedisReplicationConnection(vertx, connectOptions, conn, replicaConnections))); + onConnect.succeed(new RedisReplicationConnection(vertx, connectOptions, conn, replicaConnections)); } continue; } @@ -166,7 +166,7 @@ private void connectWithDiscoverTopology(List endpoints, int index, Set< // failed try with the next endpoint LOG.warn("Skipping failed replica: " + replica.ip + ":" + replica.port, err); if (counter.incrementAndGet() == replicas.size()) { - onConnect.handle(Future.succeededFuture(new RedisReplicationConnection(vertx, connectOptions, conn, replicaConnections))); + onConnect.succeed(new RedisReplicationConnection(vertx, connectOptions, conn, replicaConnections)); } }) .onSuccess(replicaConnection -> { @@ -177,7 +177,7 @@ private void connectWithDiscoverTopology(List endpoints, int index, Set< replicaConnections.add(replicaConnection); } if (counter.incrementAndGet() == replicas.size()) { - onConnect.handle(Future.succeededFuture(new RedisReplicationConnection(vertx, connectOptions, conn, replicaConnections))); + onConnect.succeed(new RedisReplicationConnection(vertx, connectOptions, conn, replicaConnections)); } }); } diff --git a/src/main/java/io/vertx/redis/client/impl/RedisSentinelClient.java b/src/main/java/io/vertx/redis/client/impl/RedisSentinelClient.java index 61d67ad2..ff6145a9 100644 --- a/src/main/java/io/vertx/redis/client/impl/RedisSentinelClient.java +++ b/src/main/java/io/vertx/redis/client/impl/RedisSentinelClient.java @@ -15,11 +15,7 @@ */ package io.vertx.redis.client.impl; -import io.vertx.core.AsyncResult; -import io.vertx.core.Future; -import io.vertx.core.Handler; -import io.vertx.core.Promise; -import io.vertx.core.Vertx; +import io.vertx.core.*; import io.vertx.core.internal.logging.Logger; import io.vertx.core.internal.logging.LoggerFactory; import io.vertx.core.net.NetClientOptions; @@ -73,14 +69,12 @@ public RedisSentinelClient(Vertx vertx, NetClientOptions tcpOptions, PoolOptions public Future connect() { final Promise promise = vertx.promise(); - createConnectionInternal(connectOptions, connectOptions.getRole(), createConnection -> { - if (createConnection.failed()) { - promise.fail(createConnection.cause()); + createConnectionInternal(connectOptions, connectOptions.getRole(), (conn, err) -> { + if (err != null) { + promise.fail(err); return; } - final PooledRedisConnection conn = createConnection.result(); - if (connectOptions.getRole() == RedisRole.SENTINEL || connectOptions.getRole() == RedisRole.REPLICA) { // it is possible that a replica is later promoted to a master, but that shouldn't be too big of a deal promise.complete(conn); @@ -130,11 +124,11 @@ private Future createConnectionInternal(RedisRole role) { return promise.future(); } - private void createConnectionInternal(RedisSentinelConnectOptions options, RedisRole role, Handler> onCreate) { + private void createConnectionInternal(RedisSentinelConnectOptions options, RedisRole role, Completable onCreate) { final Handler> createAndConnect = resolve -> { if (resolve.failed()) { - onCreate.handle(Future.failedFuture(resolve.cause())); + onCreate.fail(resolve.cause()); return; } diff --git a/src/main/java/io/vertx/redis/client/impl/SharedSlots.java b/src/main/java/io/vertx/redis/client/impl/SharedSlots.java index b095157d..16f7d703 100644 --- a/src/main/java/io/vertx/redis/client/impl/SharedSlots.java +++ b/src/main/java/io/vertx/redis/client/impl/SharedSlots.java @@ -55,7 +55,7 @@ Future get() { } } - private void getSlots(List endpoints, int index, Set failures, Handler> onGotSlots) { + private void getSlots(List endpoints, int index, Set failures, Promise onGotSlots) { if (index >= endpoints.size()) { // stop condition StringBuilder message = new StringBuilder("Cannot connect to any of the provided endpoints");