Skip to content

Commit

Permalink
Extract the Redis subscription -> Vert.x event bus handler
Browse files Browse the repository at this point in the history
The `SubscriptionHandler` does almost the same job, but has to be set
on the `RedisConnection` explicitly (using the `handler()` method).
In addition to forwarding the messages to the event bus, the new
`SubscriptionHandler` also forwards subscription/unsubscriptions.
  • Loading branch information
Ladicek committed Oct 4, 2024
1 parent 412b613 commit aa91a2b
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 48 deletions.
93 changes: 93 additions & 0 deletions src/main/java/io/vertx/redis/client/SubscriptionHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package io.vertx.redis.client;

import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.json.JsonObject;
import io.vertx.redis.client.impl.types.Multi;

/**
* A handler that can be installed on a Redis connection using {@link RedisConnection#handler(Handler)}
* in order to consume subscription messages and send them on the Vert.x {@linkplain Vertx#eventBus() event bus}.
* <p>
* By default, the address to which the messages are sent is {@code io.vertx.redis.<Redis channel>}.
* This can be changed by passing a different address prefix to {@link #create(Vertx, String)}.
* For example, if the prefix is {@code com.example}, the address is {@code com.example.<Redis channel>}.
*/
public class SubscriptionHandler implements Handler<Response> {
private static final Logger LOG = LoggerFactory.getLogger(SubscriptionHandler.class);

private static final String DEFAULT_ADDRESS_PREFIX = "io.vertx.redis";

private final String prefix;
private final EventBus eventBus;

/**
* Creates a subscription handler that forwards messages to the event bus of the given {@code vertx}.
* The default address prefix is used.
*/
public static SubscriptionHandler create(Vertx vertx) {
return new SubscriptionHandler(DEFAULT_ADDRESS_PREFIX, vertx.eventBus());
}

/**
* Creates a subscription handler that forwards messages to the event bus of the given {@code vertx}.
* The given address prefix is used.
*/
public static SubscriptionHandler create(Vertx vertx, String addressPrefix) {
return new SubscriptionHandler(addressPrefix, vertx.eventBus());
}

private SubscriptionHandler(String prefix, EventBus eventBus) {
this.prefix = prefix;
this.eventBus = eventBus;
}

@Override
public void handle(Response reply) {
// pub/sub messages are arrays
if (reply instanceof Multi) {
// Detect valid published messages according to https://redis.io/topics/pubsub
String type = reply.get(0).toString();

if (reply.size() == 3 && "message".equals(type)) {
eventBus.send(prefix + "." + reply.get(1).toString(),
new JsonObject()
.put("status", "OK")
.put("type", type)
.put("value", new JsonObject()
.put("channel", reply.get(1).toString())
.put("message", reply.get(2).toString())));
return;
}

if (reply.size() == 4 && "pmessage".equals(type)) {
eventBus.send(prefix + "." + reply.get(1).toString(),
new JsonObject()
.put("status", "OK")
.put("type", type)
.put("value", new JsonObject()
.put("pattern", reply.get(1).toString())
.put("channel", reply.get(2).toString())
.put("message", reply.get(3).toString())));
return;
}

if (reply.size() == 3 && ("subscribe".equals(type) || "psubscribe".equals(type)
|| "unsubscribe".equals(type) || "punsubscribe".equals(type))) {
eventBus.send(prefix + "." + reply.get(1).toString(),
new JsonObject()
.put("status", "OK")
.put("type", type)
.put("value", new JsonObject()
.put("channel", reply.get(1).toString())
.put("remaining", reply.get(2).toString())));
return;
}
}

LOG.warn("No handler waiting for message: " + reply);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@
import java.util.concurrent.atomic.AtomicInteger;

public class RedisStandaloneConnection implements RedisConnectionInternal, ParserHandler {

private static final String BASE_ADDRESS = "io.vertx.redis";

private static final Logger LOG = LoggerFactory.getLogger(RedisStandaloneConnection.class);

private static final ErrorType CONNECTION_CLOSED = ErrorType.create("CONNECTION_CLOSED");
Expand All @@ -45,7 +42,6 @@ public class RedisStandaloneConnection implements RedisConnectionInternal, Parse
private final VertxInternal vertx;
// to be used for callbacks
private final ContextInternal context;
private final EventBus eventBus;
private final NetSocket netSocket;
// waiting: commands that have been sent but not answered
// the queue is only accessed from the event loop
Expand All @@ -69,7 +65,6 @@ public RedisStandaloneConnection(VertxInternal vertx, ContextInternal context, P
this.context = context;
this.poolOptions = options;
this.listener = connectionListener;
this.eventBus = vertx.eventBus();
this.netSocket = netSocket;
this.waiting = new ArrayQueue(maxWaitingHandlers);
this.expiresAt = computeExpiration();
Expand Down Expand Up @@ -386,40 +381,10 @@ public void handle(Response reply) {
}

// pub/sub mode
if ((reply != null && reply.type() == ResponseType.PUSH) || empty) {
if (reply != null && reply.type() == ResponseType.PUSH || empty) {
if (onMessage != null) {
context.execute(reply, onMessage);
} else {
// pub/sub messages are arrays
if (reply instanceof Multi) {
// Detect valid published messages according to https://redis.io/topics/pubsub

if (reply.size() == 3 && "message".equals(reply.get(0).toString())) {
// channel
eventBus.send(
BASE_ADDRESS + "." + reply.get(1).toString(),
new JsonObject()
.put("status", "OK")
.put("value", new JsonObject()
.put("channel", reply.get(1).toString())
.put("message", reply.get(2).toString())));
return;
}

if (reply.size() == 4 && "pmessage".equals(reply.get(0).toString())) {
// pattern
eventBus.send(
BASE_ADDRESS + "." + reply.get(1).toString(),
new JsonObject()
.put("status", "OK")
.put("value", new JsonObject()
.put("pattern", reply.get(1).toString())
.put("channel", reply.get(2).toString())
.put("message", reply.get(3).toString())));
return;
}
// fallback will just go to the log
}
LOG.warn("No handler waiting for message: " + reply);
}
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.vertx.redis.client.RedisConnection;
import io.vertx.redis.client.RedisOptions;
import io.vertx.redis.client.Request;
import io.vertx.redis.client.SubscriptionHandler;
import io.vertx.tests.redis.containers.RedisStandalone;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -72,6 +73,7 @@ public void after() throws Exception {
public void testPublishSubscribe(TestContext should) throws Exception {
Async test = should.async();
consumer = vertx.eventBus().consumer("io.vertx.redis.news", msg -> test.complete());
subConn.handler(SubscriptionHandler.create(vertx));
subConn.send(Request.cmd(Command.SUBSCRIBE).arg("news")).await(20, TimeUnit.SECONDS);
publishMsg(Request.cmd(Command.PUBLISH).arg("news").arg("foo"), 5);
}
Expand All @@ -80,6 +82,7 @@ public void testPublishSubscribe(TestContext should) throws Exception {
public void testPublishPSubscribe(TestContext should) throws Exception {
final Async test = should.async();
consumer = vertx.eventBus().consumer("io.vertx.redis.new*", msg -> test.complete());
subConn.handler(SubscriptionHandler.create(vertx));
subConn.send(Request.cmd(Command.PSUBSCRIBE).arg("new*")).await(20, TimeUnit.SECONDS);
publishMsg(Request.cmd(Command.PUBLISH).arg("news").arg("foo"), 5);
}
Expand Down
22 changes: 10 additions & 12 deletions src/test/java/io/vertx/tests/redis/client/RedisPubSubTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.vertx.redis.client.RedisConnection;
import io.vertx.redis.client.RedisOptions;
import io.vertx.redis.client.Request;
import io.vertx.redis.client.SubscriptionHandler;
import io.vertx.tests.redis.containers.RedisCluster;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -40,26 +41,22 @@ public void before(TestContext should) {
RedisOptions options = new RedisOptions()
.setConnectionString(redis.getRedisNode0Uri())
.setType(RedisClientType.CLUSTER);
redisPublish = Redis.createClient(rule.vertx(), options);
redisPublish
.connect().onComplete(connectPub -> {
should.assertTrue(connectPub.succeeded());

pubConn = connectPub.result();
redisPublish = Redis.createClient(rule.vertx(), options);
redisPublish.connect().onComplete(should.asyncAssertSuccess(connectPub -> {
pubConn = connectPub;

redisSubscribe = Redis.createClient(rule.vertx(), options);
redisSubscribe
.connect().onComplete(connectSub -> {
should.assertTrue(connectSub.succeeded());
redisSubscribe.connect().onComplete(should.asyncAssertSuccess(connectSub -> {
subConn = connectSub;

subConn = connectSub.result();
test.complete();
});
});
}));
}));
}

@After
public void after(TestContext should) {
public void after() {
redisPublish.close();
redisSubscribe.close();
}
Expand All @@ -77,6 +74,7 @@ public void testSubscribeMultipleTimes(TestContext should) {
System.out.println("received redis msg " + msg.body());
rule.vertx().eventBus().publish(CHAN, msg.body());
});
subConn.handler(SubscriptionHandler.create(rule.vertx()));
subUnsub(CHAN, N, should.async(N));
rule.vertx().setTimer(1000, id ->
pubConn.send(Request.cmd(Command.PUBLISH).arg(CHAN).arg("hello")).onComplete(preply -> should.assertTrue(preply.succeeded())));
Expand Down

0 comments on commit aa91a2b

Please sign in to comment.