Skip to content

Commit

Permalink
Fix Redis Pub/Sub subscribeAsMessages method
Browse files Browse the repository at this point in the history
The method was discarding the received items, instead of emitting them downstream.

(cherry picked from commit 3743f32)
  • Loading branch information
cescoffier authored and gsmet committed Jul 23, 2024
1 parent c3baf93 commit 0f1f29e
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -260,10 +260,14 @@ public Multi<RedisPubSubMessage<V>> subscribeAsMessages(String... channels) {

List<String> list = List.of(channels);
return Multi.createFrom().emitter(emitter -> {
subscribe(list, (channel, value) -> new DefaultRedisPubSubMessage<>(value, channel), emitter::complete,
emitter::fail)
.subscribe().with(subscriber -> emitter
.onTermination(() -> subscriber.unsubscribe(channels).subscribe().asCompletionStage()));
subscribe(list,
(channel, value) -> emitter.emit(new DefaultRedisPubSubMessage<>(value, channel)),
emitter::complete, emitter::fail)
.subscribe().with(x -> {
emitter.onTermination(() -> {
x.unsubscribe(channels).subscribe().asCompletionStage();
});
}, emitter::fail);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ void initialize() {
ds = new BlockingRedisDataSourceImpl(vertx, redis, api, Duration.ofSeconds(5));
pubsub = ds.pubsub(Person.class);

ReactiveRedisDataSourceImpl reactiveDS = new ReactiveRedisDataSourceImpl(vertx, redis, api);
var reactiveDS = new ReactiveRedisDataSourceImpl(vertx, redis, api);
reactive = reactiveDS.pubsub(Person.class);
}

Expand Down Expand Up @@ -371,6 +371,35 @@ void subscribeToSingleWithMultiAsMessages() {

}

@Test
void testSubscribeAsMessages() {
List<RedisPubSubMessage<Person>> people = new CopyOnWriteArrayList<>();
Multi<RedisPubSubMessage<Person>> multi = reactive.subscribeAsMessages(channel);

Cancellable cancellable = multi.subscribe().with(people::add);

pubsub.publish("foo", new Person("luke", "skywalker"));
pubsub.publish(channel, new Person("luke", "skywalker"));

Awaitility.await().until(() -> people.size() == 1);

pubsub.publish(channel, new Person("leia", "skywalker"));
pubsub.publish(channel, new Person("leia", "skywalker"));
pubsub.publish(channel, new Person("leia", "skywalker"));

Awaitility.await().until(() -> people.size() == 4);

assertThat(people).allSatisfy(m -> {
assertThat(m.getChannel()).isNotBlank();
assertThat(m.getPayload()).isNotNull();
});

cancellable.cancel();

awaitNoMoreActiveChannels();

}

@Test
void unsubscribe() {

Expand Down

0 comments on commit 0f1f29e

Please sign in to comment.