diff --git a/src/main/java/io/vertx/redis/client/SubscriptionHandler.java b/src/main/java/io/vertx/redis/client/SubscriptionHandler.java new file mode 100644 index 00000000..b290c187 --- /dev/null +++ b/src/main/java/io/vertx/redis/client/SubscriptionHandler.java @@ -0,0 +1,93 @@ +package io.vertx.redis.client; + +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.eventbus.EventBus; +import io.vertx.core.internal.logging.Logger; +import io.vertx.core.internal.logging.LoggerFactory; +import io.vertx.core.json.JsonObject; +import io.vertx.redis.client.impl.types.Multi; + +/** + * A handler that can be installed on a Redis connection using {@link RedisConnection#handler(Handler)} + * in order to consume subscription messages and send them on the Vert.x {@linkplain Vertx#eventBus() event bus}. + *

+ * By default, the address to which the messages are sent is {@code io.vertx.redis.}. + * This can be changed by passing a different address prefix to {@link #create(Vertx, String)}. + * For example, if the prefix is {@code com.example}, the address is {@code com.example.}. + */ +public class SubscriptionHandler implements Handler { + private static final Logger LOG = LoggerFactory.getLogger(SubscriptionHandler.class); + + private static final String DEFAULT_ADDRESS_PREFIX = "io.vertx.redis"; + + private final String prefix; + private final EventBus eventBus; + + /** + * Creates a subscription handler that forwards messages to the event bus of the given {@code vertx}. + * The default address prefix is used. + */ + public static SubscriptionHandler create(Vertx vertx) { + return new SubscriptionHandler(DEFAULT_ADDRESS_PREFIX, vertx.eventBus()); + } + + /** + * Creates a subscription handler that forwards messages to the event bus of the given {@code vertx}. + * The given address prefix is used. + */ + public static SubscriptionHandler create(Vertx vertx, String addressPrefix) { + return new SubscriptionHandler(addressPrefix, vertx.eventBus()); + } + + private SubscriptionHandler(String prefix, EventBus eventBus) { + this.prefix = prefix; + this.eventBus = eventBus; + } + + @Override + public void handle(Response reply) { + // pub/sub messages are arrays + if (reply instanceof Multi) { + // Detect valid published messages according to https://redis.io/topics/pubsub + String type = reply.get(0).toString(); + + if (reply.size() == 3 && "message".equals(type)) { + eventBus.send(prefix + "." + reply.get(1).toString(), + new JsonObject() + .put("status", "OK") + .put("type", type) + .put("value", new JsonObject() + .put("channel", reply.get(1).toString()) + .put("message", reply.get(2).toString()))); + return; + } + + if (reply.size() == 4 && "pmessage".equals(type)) { + eventBus.send(prefix + "." + reply.get(1).toString(), + new JsonObject() + .put("status", "OK") + .put("type", type) + .put("value", new JsonObject() + .put("pattern", reply.get(1).toString()) + .put("channel", reply.get(2).toString()) + .put("message", reply.get(3).toString()))); + return; + } + + if (reply.size() == 3 && ("subscribe".equals(type) || "psubscribe".equals(type) + || "unsubscribe".equals(type) || "punsubscribe".equals(type))) { + eventBus.send(prefix + "." + reply.get(1).toString(), + new JsonObject() + .put("status", "OK") + .put("type", type) + .put("value", new JsonObject() + .put("channel", reply.get(1).toString()) + .put("remaining", reply.get(2).toString()))); + return; + } + } + + LOG.warn("No handler waiting for message: " + reply); + } +} diff --git a/src/main/java/io/vertx/redis/client/impl/RedisStandaloneConnection.java b/src/main/java/io/vertx/redis/client/impl/RedisStandaloneConnection.java index db334fb1..6ba95cc3 100644 --- a/src/main/java/io/vertx/redis/client/impl/RedisStandaloneConnection.java +++ b/src/main/java/io/vertx/redis/client/impl/RedisStandaloneConnection.java @@ -32,9 +32,6 @@ import java.util.concurrent.atomic.AtomicInteger; public class RedisStandaloneConnection implements RedisConnectionInternal, ParserHandler { - - private static final String BASE_ADDRESS = "io.vertx.redis"; - private static final Logger LOG = LoggerFactory.getLogger(RedisStandaloneConnection.class); private static final ErrorType CONNECTION_CLOSED = ErrorType.create("CONNECTION_CLOSED"); @@ -45,7 +42,6 @@ public class RedisStandaloneConnection implements RedisConnectionInternal, Parse private final VertxInternal vertx; // to be used for callbacks private final ContextInternal context; - private final EventBus eventBus; private final NetSocket netSocket; // waiting: commands that have been sent but not answered // the queue is only accessed from the event loop @@ -69,7 +65,6 @@ public RedisStandaloneConnection(VertxInternal vertx, ContextInternal context, P this.context = context; this.poolOptions = options; this.listener = connectionListener; - this.eventBus = vertx.eventBus(); this.netSocket = netSocket; this.waiting = new ArrayQueue(maxWaitingHandlers); this.expiresAt = computeExpiration(); @@ -386,40 +381,10 @@ public void handle(Response reply) { } // pub/sub mode - if ((reply != null && reply.type() == ResponseType.PUSH) || empty) { + if (reply != null && reply.type() == ResponseType.PUSH || empty) { if (onMessage != null) { context.execute(reply, onMessage); } else { - // pub/sub messages are arrays - if (reply instanceof Multi) { - // Detect valid published messages according to https://redis.io/topics/pubsub - - if (reply.size() == 3 && "message".equals(reply.get(0).toString())) { - // channel - eventBus.send( - BASE_ADDRESS + "." + reply.get(1).toString(), - new JsonObject() - .put("status", "OK") - .put("value", new JsonObject() - .put("channel", reply.get(1).toString()) - .put("message", reply.get(2).toString()))); - return; - } - - if (reply.size() == 4 && "pmessage".equals(reply.get(0).toString())) { - // pattern - eventBus.send( - BASE_ADDRESS + "." + reply.get(1).toString(), - new JsonObject() - .put("status", "OK") - .put("value", new JsonObject() - .put("pattern", reply.get(1).toString()) - .put("channel", reply.get(2).toString()) - .put("message", reply.get(3).toString()))); - return; - } - // fallback will just go to the log - } LOG.warn("No handler waiting for message: " + reply); } return; diff --git a/src/test/java/io/vertx/tests/redis/client/RedisClientPubSubTest.java b/src/test/java/io/vertx/tests/redis/client/RedisClientPubSubTest.java index a362dbcb..d9cf8ae2 100644 --- a/src/test/java/io/vertx/tests/redis/client/RedisClientPubSubTest.java +++ b/src/test/java/io/vertx/tests/redis/client/RedisClientPubSubTest.java @@ -25,6 +25,7 @@ import io.vertx.redis.client.RedisConnection; import io.vertx.redis.client.RedisOptions; import io.vertx.redis.client.Request; +import io.vertx.redis.client.SubscriptionHandler; import io.vertx.tests.redis.containers.RedisStandalone; import org.junit.After; import org.junit.Before; @@ -72,6 +73,7 @@ public void after() throws Exception { public void testPublishSubscribe(TestContext should) throws Exception { Async test = should.async(); consumer = vertx.eventBus().consumer("io.vertx.redis.news", msg -> test.complete()); + subConn.handler(SubscriptionHandler.create(vertx)); subConn.send(Request.cmd(Command.SUBSCRIBE).arg("news")).await(20, TimeUnit.SECONDS); publishMsg(Request.cmd(Command.PUBLISH).arg("news").arg("foo"), 5); } @@ -80,6 +82,7 @@ public void testPublishSubscribe(TestContext should) throws Exception { public void testPublishPSubscribe(TestContext should) throws Exception { final Async test = should.async(); consumer = vertx.eventBus().consumer("io.vertx.redis.new*", msg -> test.complete()); + subConn.handler(SubscriptionHandler.create(vertx)); subConn.send(Request.cmd(Command.PSUBSCRIBE).arg("new*")).await(20, TimeUnit.SECONDS); publishMsg(Request.cmd(Command.PUBLISH).arg("news").arg("foo"), 5); } diff --git a/src/test/java/io/vertx/tests/redis/client/RedisPubSubTest.java b/src/test/java/io/vertx/tests/redis/client/RedisPubSubTest.java index a72f9eeb..b4791dcd 100644 --- a/src/test/java/io/vertx/tests/redis/client/RedisPubSubTest.java +++ b/src/test/java/io/vertx/tests/redis/client/RedisPubSubTest.java @@ -11,6 +11,7 @@ import io.vertx.redis.client.RedisConnection; import io.vertx.redis.client.RedisOptions; import io.vertx.redis.client.Request; +import io.vertx.redis.client.SubscriptionHandler; import io.vertx.tests.redis.containers.RedisCluster; import org.junit.After; import org.junit.Before; @@ -40,26 +41,22 @@ public void before(TestContext should) { RedisOptions options = new RedisOptions() .setConnectionString(redis.getRedisNode0Uri()) .setType(RedisClientType.CLUSTER); - redisPublish = Redis.createClient(rule.vertx(), options); - redisPublish - .connect().onComplete(connectPub -> { - should.assertTrue(connectPub.succeeded()); - pubConn = connectPub.result(); + redisPublish = Redis.createClient(rule.vertx(), options); + redisPublish.connect().onComplete(should.asyncAssertSuccess(connectPub -> { + pubConn = connectPub; redisSubscribe = Redis.createClient(rule.vertx(), options); - redisSubscribe - .connect().onComplete(connectSub -> { - should.assertTrue(connectSub.succeeded()); + redisSubscribe.connect().onComplete(should.asyncAssertSuccess(connectSub -> { + subConn = connectSub; - subConn = connectSub.result(); test.complete(); - }); - }); + })); + })); } @After - public void after(TestContext should) { + public void after() { redisPublish.close(); redisSubscribe.close(); } @@ -77,6 +74,7 @@ public void testSubscribeMultipleTimes(TestContext should) { System.out.println("received redis msg " + msg.body()); rule.vertx().eventBus().publish(CHAN, msg.body()); }); + subConn.handler(SubscriptionHandler.create(rule.vertx())); subUnsub(CHAN, N, should.async(N)); rule.vertx().setTimer(1000, id -> pubConn.send(Request.cmd(Command.PUBLISH).arg(CHAN).arg("hello")).onComplete(preply -> should.assertTrue(preply.succeeded())));