Skip to content

Commit

Permalink
Remove usage of Handler/AsyncResult idiom.
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Sep 12, 2024
1 parent 3d84f5a commit bc9ecd6
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 55 deletions.
16 changes: 6 additions & 10 deletions src/main/java/io/vertx/redis/client/impl/RedisClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@
*/
package io.vertx.redis.client.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.*;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.net.NetClientOptions;
Expand Down Expand Up @@ -154,7 +150,7 @@ public Future<RedisConnection> connect() {
return promise.future();
}

private void connect(Slots slots, Handler<AsyncResult<RedisConnection>> onConnected) {
private void connect(Slots slots, Completable<RedisConnection> onConnected) {
// create a cluster connection
final Set<Throwable> failures = ConcurrentHashMap.newKeySet();
final AtomicInteger counter = new AtomicInteger();
Expand All @@ -180,7 +176,7 @@ private void connect(Slots slots, Handler<AsyncResult<RedisConnection>> onConnec
}

private void connectionComplete(AtomicInteger counter, Slots slots, Map<String, PooledRedisConnection> connections,
Set<Throwable> failures, Handler<AsyncResult<RedisConnection>> onConnected) {
Set<Throwable> failures, Completable<RedisConnection> onConnected) {
if (counter.incrementAndGet() == slots.endpoints().length) {
// end condition
if (!failures.isEmpty()) {
Expand All @@ -200,10 +196,10 @@ private void connectionComplete(AtomicInteger counter, Slots slots, Map<String,
for (Throwable failure : failures) {
message.append("\n- ").append(failure);
}
onConnected.handle(Future.failedFuture(new RedisConnectException(message.toString())));
onConnected.fail(new RedisConnectException(message.toString()));
} else {
onConnected.handle(Future.succeededFuture(new RedisClusterConnection(vertx, connectionManager,
connectOptions, sharedSlots, connections)));
onConnected.succeed(new RedisClusterConnection(vertx, connectionManager,
connectOptions, sharedSlots, connections));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
package io.vertx.redis.client.impl;

import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.*;
import io.vertx.core.internal.VertxInternal;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
Expand Down Expand Up @@ -266,7 +262,7 @@ private Map<Integer, Request> splitRequest(CommandImpl cmd, List<byte[]> args) {
return map;
}

void send(String endpoint, int retries, Request command, Handler<AsyncResult<Response>> handler) {
void send(String endpoint, int retries, Request command, Completable<Response> handler) {
PooledRedisConnection connection = connections.get(endpoint);
if (connection == null) {
connectionManager.getConnection(endpoint, RedisReplicas.NEVER != connectOptions.getUseReplicas() ? Request.cmd(Command.READONLY) : null)
Expand All @@ -285,15 +281,15 @@ void send(String endpoint, int retries, Request command, Handler<AsyncResult<Res
if (retries > 0) {
send(endpoint, retries - 1, command, handler);
} else {
handler.handle(Future.failedFuture("Failed obtaining connection to: " + endpoint));
handler.fail("Failed obtaining connection to: " + endpoint);
}
});
return;
}

connection
.send(command)
.onComplete(send -> {
.onComplete((send -> {
if (send.failed() && send.cause() instanceof ErrorType && retries >= 0) {
final ErrorType cause = (ErrorType) send.cause();

Expand All @@ -308,7 +304,7 @@ void send(String endpoint, int retries, Request command, Handler<AsyncResult<Res
String addr = cause.slice(' ', 2);
if (addr == null) {
// bad message
handler.handle(Future.failedFuture("Cannot find endpoint:port in redirection: " + cause));
handler.fail("Cannot find endpoint:port in redirection: " + cause);
return;
}

Expand All @@ -319,9 +315,9 @@ void send(String endpoint, int retries, Request command, Handler<AsyncResult<Res
}
String newEndpoint = uri.protocol() + "://" + uri.userinfo() + addr;
if (ask) {
send(newEndpoint, retries - 1, Request.cmd(Command.ASKING), resp -> {
if (resp.failed()) {
handler.handle(Future.failedFuture("Failed ASKING: " + resp.cause() + ", caused by " + cause));
send(newEndpoint, retries - 1, Request.cmd(Command.ASKING), (resp, err) -> {
if (err != null) {
handler.fail("Failed ASKING: " + err + ", caused by " + cause);
} else {
send(newEndpoint, retries - 1, command, handler);
}
Expand All @@ -343,7 +339,7 @@ void send(String endpoint, int retries, Request command, Handler<AsyncResult<Res
// NOAUTH will try to authenticate
connection
.send(Request.cmd(Command.AUTH).arg(connectOptions.getPassword()))
.onFailure(err -> handler.handle(Future.failedFuture(err)))
.onFailure(err -> handler.fail(err))
.onSuccess(auth -> {
// again
send(endpoint, retries - 1, command, handler);
Expand All @@ -353,11 +349,11 @@ void send(String endpoint, int retries, Request command, Handler<AsyncResult<Res
}

try {
handler.handle(send);
handler.succeed(send.result());
} catch (RuntimeException e) {
LOG.error("Handler failure", e);
}
});
}));
}

@Override
Expand Down Expand Up @@ -447,7 +443,7 @@ private Future<List<Response>> batch(List<Request> requests, Slots slots) {
return promise.future();
}

private void batch(String endpoint, int retries, List<Request> commands, Handler<AsyncResult<List<Response>>> handler) {
private void batch(String endpoint, int retries, List<Request> commands, Completable<List<Response>> handler) {
RedisConnection connection = connections.get(endpoint);
if (connection == null) {
connectionManager.getConnection(endpoint, RedisReplicas.NEVER != connectOptions.getUseReplicas() ? Request.cmd(Command.READONLY) : null)
Expand All @@ -466,7 +462,7 @@ private void batch(String endpoint, int retries, List<Request> commands, Handler
if (retries > 0) {
batch(endpoint, retries - 1, commands, handler);
} else {
handler.handle(Future.failedFuture("Failed obtaining connection to: " + endpoint));
handler.fail("Failed obtaining connection to: " + endpoint);
}
});
return;
Expand All @@ -489,7 +485,7 @@ private void batch(String endpoint, int retries, List<Request> commands, Handler
String addr = cause.slice(' ', 2);
if (addr == null) {
// bad message
handler.handle(Future.failedFuture("Cannot find endpoint:port in redirection: " + cause));
handler.fail("Cannot find endpoint:port in redirection: " + cause);
return;
}

Expand All @@ -500,9 +496,9 @@ private void batch(String endpoint, int retries, List<Request> commands, Handler
}
String newEndpoint = uri.protocol() + "://" + uri.userinfo() + addr;
if (ask) {
batch(newEndpoint, retries - 1, Collections.singletonList(Request.cmd(Command.ASKING)), resp -> {
if (resp.failed()) {
handler.handle(Future.failedFuture("Failed ASKING: " + resp.cause() + ", caused by " + cause));
batch(newEndpoint, retries - 1, Collections.singletonList(Request.cmd(Command.ASKING)), (resp, err) -> {
if (err != null) {
handler.fail("Failed ASKING: " + err + ", caused by " + cause);
} else {
batch(newEndpoint, retries - 1, commands, handler);
}
Expand All @@ -524,7 +520,7 @@ private void batch(String endpoint, int retries, List<Request> commands, Handler
// try to authenticate
connection
.send(Request.cmd(Command.AUTH).arg(connectOptions.getPassword()))
.onFailure(err -> handler.handle(Future.failedFuture(err)))
.onFailure(err -> handler.fail(err))
.onSuccess(auth -> {
// again
batch(endpoint, retries - 1, commands, handler);
Expand All @@ -534,7 +530,7 @@ private void batch(String endpoint, int retries, List<Request> commands, Handler
}

try {
handler.handle(send);
handler.succeed(send.result());
} catch (RuntimeException e) {
LOG.error("Handler failure", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,12 @@ private void onAllNodes(String[] endpoints, int index, Request request, List<Res
return;
}

conn.send(endpoints[index], RedisClusterConnection.RETRIES, request, ar -> {
if (ar.succeeded()) {
result.add(ar.result());
conn.send(endpoints[index], RedisClusterConnection.RETRIES, request, (res, err) -> {
if (err == null) {
result.add(res);
onAllNodes(endpoints, index + 1, request, result, conn, promise);
} else {
promise.fail(ar.cause());
promise.fail(err);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,14 @@ public Future<RedisConnection> connect() {
return promise.future();
}

private void connectWithDiscoverTopology(List<String> endpoints, int index, Set<Throwable> failures, Handler<AsyncResult<RedisConnection>> onConnect) {
private void connectWithDiscoverTopology(List<String> endpoints, int index, Set<Throwable> failures, Completable<RedisConnection> onConnect) {
if (index >= endpoints.size()) {
// stop condition
StringBuilder message = new StringBuilder("Cannot connect to any of the provided endpoints");
for (Throwable failure : failures) {
message.append("\n- ").append(failure);
}
onConnect.handle(Future.failedFuture(new RedisConnectException(message.toString())));
onConnect.fail(new RedisConnectException(message.toString()));
return;
}

Expand Down Expand Up @@ -155,7 +155,7 @@ private void connectWithDiscoverTopology(List<String> endpoints, int index, Set<
if (!replica.online) {
LOG.info("Skipping offline replica: " + replica.ip + ":" + replica.port);
if (counter.incrementAndGet() == replicas.size()) {
onConnect.handle(Future.succeededFuture(new RedisReplicationConnection(vertx, connectOptions, conn, replicaConnections)));
onConnect.succeed(new RedisReplicationConnection(vertx, connectOptions, conn, replicaConnections));
}
continue;
}
Expand All @@ -166,7 +166,7 @@ private void connectWithDiscoverTopology(List<String> endpoints, int index, Set<
// failed try with the next endpoint
LOG.warn("Skipping failed replica: " + replica.ip + ":" + replica.port, err);
if (counter.incrementAndGet() == replicas.size()) {
onConnect.handle(Future.succeededFuture(new RedisReplicationConnection(vertx, connectOptions, conn, replicaConnections)));
onConnect.succeed(new RedisReplicationConnection(vertx, connectOptions, conn, replicaConnections));
}
})
.onSuccess(replicaConnection -> {
Expand All @@ -177,7 +177,7 @@ private void connectWithDiscoverTopology(List<String> endpoints, int index, Set<
replicaConnections.add(replicaConnection);
}
if (counter.incrementAndGet() == replicas.size()) {
onConnect.handle(Future.succeededFuture(new RedisReplicationConnection(vertx, connectOptions, conn, replicaConnections)));
onConnect.succeed(new RedisReplicationConnection(vertx, connectOptions, conn, replicaConnections));
}
});
}
Expand Down
18 changes: 6 additions & 12 deletions src/main/java/io/vertx/redis/client/impl/RedisSentinelClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@
*/
package io.vertx.redis.client.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.*;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.net.NetClientOptions;
Expand Down Expand Up @@ -73,14 +69,12 @@ public RedisSentinelClient(Vertx vertx, NetClientOptions tcpOptions, PoolOptions
public Future<RedisConnection> connect() {
final Promise<RedisConnection> promise = vertx.promise();

createConnectionInternal(connectOptions, connectOptions.getRole(), createConnection -> {
if (createConnection.failed()) {
promise.fail(createConnection.cause());
createConnectionInternal(connectOptions, connectOptions.getRole(), (conn, err) -> {
if (err != null) {
promise.fail(err);
return;
}

final PooledRedisConnection conn = createConnection.result();

if (connectOptions.getRole() == RedisRole.SENTINEL || connectOptions.getRole() == RedisRole.REPLICA) {
// it is possible that a replica is later promoted to a master, but that shouldn't be too big of a deal
promise.complete(conn);
Expand Down Expand Up @@ -130,11 +124,11 @@ private Future<PooledRedisConnection> createConnectionInternal(RedisRole role) {
return promise.future();
}

private void createConnectionInternal(RedisSentinelConnectOptions options, RedisRole role, Handler<AsyncResult<PooledRedisConnection>> onCreate) {
private void createConnectionInternal(RedisSentinelConnectOptions options, RedisRole role, Completable<PooledRedisConnection> onCreate) {

final Handler<AsyncResult<RedisURI>> createAndConnect = resolve -> {
if (resolve.failed()) {
onCreate.handle(Future.failedFuture(resolve.cause()));
onCreate.fail(resolve.cause());
return;
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/redis/client/impl/SharedSlots.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ Future<Slots> get() {
}
}

private void getSlots(List<String> endpoints, int index, Set<Throwable> failures, Handler<AsyncResult<Slots>> onGotSlots) {
private void getSlots(List<String> endpoints, int index, Set<Throwable> failures, Promise<Slots> onGotSlots) {
if (index >= endpoints.size()) {
// stop condition
StringBuilder message = new StringBuilder("Cannot connect to any of the provided endpoints");
Expand Down

0 comments on commit bc9ecd6

Please sign in to comment.