Skip to content

Commit

Permalink
Java: fix pubsub IT for valkey 8 (#2300)
Browse files Browse the repository at this point in the history
* Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>
  • Loading branch information
Yury-Fridlyand authored Sep 18, 2024
1 parent 59cfa7f commit c672c62
Showing 1 changed file with 80 additions and 61 deletions.
141 changes: 80 additions & 61 deletions java/integTest/src/test/java/glide/PubSubTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import static glide.TestUtilities.commonClusterClientConfig;
import static glide.api.BaseClient.OK;
import static glide.api.models.GlideString.gs;
import static glide.api.models.configuration.RequestRoutingConfiguration.SimpleMultiNodeRoute.ALL_NODES;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNull;
Expand Down Expand Up @@ -36,6 +35,7 @@
import glide.api.models.exceptions.RequestException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
Expand Down Expand Up @@ -83,12 +83,15 @@ private <M extends ChannelMode> BaseClient createClientWithSubscriptions(
if (callback.isPresent()) {
subConfigBuilder.callback(callback.get(), context.get());
}
return GlideClient.createClient(
commonClientConfig()
.requestTimeout(5000)
.subscriptionConfiguration(subConfigBuilder.build())
.build())
.get();
var client =
GlideClient.createClient(
commonClientConfig()
.requestTimeout(5000)
.subscriptionConfiguration(subConfigBuilder.build())
.build())
.get();
listeners.put(client, subscriptions);
return client;
} else {
var subConfigBuilder =
ClusterSubscriptionConfiguration.builder()
Expand All @@ -98,26 +101,35 @@ private <M extends ChannelMode> BaseClient createClientWithSubscriptions(
subConfigBuilder.callback(callback.get(), context.get());
}

return GlideClusterClient.createClient(
commonClusterClientConfig()
.requestTimeout(5000)
.subscriptionConfiguration(subConfigBuilder.build())
.build())
.get();
var client =
GlideClusterClient.createClient(
commonClusterClientConfig()
.requestTimeout(5000)
.subscriptionConfiguration(subConfigBuilder.build())
.build())
.get();
listeners.put(client, subscriptions);
return client;
}
}

private <M extends ChannelMode> BaseClient createClientWithSubscriptions(
boolean standalone, Map<M, Set<GlideString>> subscriptions) {
return createClientWithSubscriptions(
standalone, subscriptions, Optional.empty(), Optional.empty());
var client =
createClientWithSubscriptions(
standalone, subscriptions, Optional.empty(), Optional.empty());
listeners.put(client, subscriptions);
return client;
}

@SneakyThrows
private BaseClient createClient(boolean standalone) {
return standalone
? GlideClient.createClient(commonClientConfig().build()).get()
: GlideClusterClient.createClient(commonClusterClientConfig().build()).get();
var client =
standalone
? GlideClient.createClient(commonClientConfig().build()).get()
: GlideClusterClient.createClient(commonClusterClientConfig().build()).get();
senders.add(client);
return client;
}

/**
Expand All @@ -126,32 +138,67 @@ private BaseClient createClient(boolean standalone) {
private final ConcurrentLinkedDeque<Pair<Integer, PubSubMessage>> pubsubMessageQueue =
new ConcurrentLinkedDeque<>();

/** Clients used in a test. */
private final List<BaseClient> clients = new ArrayList<>();
/** Subscribed clients used in a test. */
private final Map<BaseClient, Map<? extends ChannelMode, Set<GlideString>>> listeners =
new HashMap<>();

/** Other clients used in a test. */
private final List<BaseClient> senders = new ArrayList<>();

private static final int MESSAGE_DELIVERY_DELAY = 500; // ms

@AfterEach
@SneakyThrows
public void cleanup() {
for (var client : clients) {
for (var pair : listeners.entrySet()) {
var client = pair.getKey();
var subscriptionTypes = pair.getValue();
if (client instanceof GlideClusterClient) {
((GlideClusterClient) client)
.customCommand(new GlideString[] {gs("unsubscribe")}, ALL_NODES)
.get();
((GlideClusterClient) client)
.customCommand(new GlideString[] {gs("punsubscribe")}, ALL_NODES)
.get();
((GlideClusterClient) client)
.customCommand(new GlideString[] {gs("sunsubscribe")}, ALL_NODES)
.get();
for (var subscription : subscriptionTypes.entrySet()) {
var channels = subscription.getValue().toArray(GlideString[]::new);
for (GlideString channel : channels) {
switch ((PubSubClusterChannelMode) subscription.getKey()) {
case EXACT:
((GlideClusterClient) client)
.customCommand(new GlideString[] {gs("unsubscribe"), channel})
.get();
break;
case PATTERN:
((GlideClusterClient) client)
.customCommand(new GlideString[] {gs("punsubscribe"), channel})
.get();
break;
case SHARDED:
((GlideClusterClient) client)
.customCommand(new GlideString[] {gs("sunsubscribe"), channel})
.get();
break;
}
}
}
} else {
((GlideClient) client).customCommand(new GlideString[] {gs("unsubscribe")}).get();
((GlideClient) client).customCommand(new GlideString[] {gs("punsubscribe")}).get();
for (var subscription : subscriptionTypes.entrySet()) {
var channels = subscription.getValue().toArray(GlideString[]::new);
switch ((PubSubChannelMode) subscription.getKey()) {
case EXACT:
((GlideClient) client)
.customCommand(ArrayUtils.addFirst(channels, gs("unsubscribe")))
.get();
break;
case PATTERN:
((GlideClient) client)
.customCommand(ArrayUtils.addFirst(channels, gs("punsubscribe")))
.get();
break;
}
}
}
}
listeners.clear();
for (var client : senders) {
client.close();
}
clients.clear();
senders.clear();
pubsubMessageQueue.clear();
}

Expand Down Expand Up @@ -246,7 +293,6 @@ public void exact_happy_path(boolean standalone, MessageReadMethod method) {
var listener =
createListener(standalone, method == MessageReadMethod.Callback, 1, subscriptions);
var sender = createClient(standalone);
clients.addAll(List.of(listener, sender));

sender.publish(message, channel).get();
Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the message
Expand Down Expand Up @@ -279,7 +325,6 @@ public void exact_happy_path_many_channels(boolean standalone, MessageReadMethod
var listener =
createListener(standalone, method == MessageReadMethod.Callback, 1, subscriptions);
var sender = createClient(standalone);
clients.addAll(List.of(listener, sender));

for (var pubsubMessage : messages) {
sender.publish(pubsubMessage.getMessage(), pubsubMessage.getChannel()).get();
Expand All @@ -305,7 +350,6 @@ public void sharded_pubsub(MessageReadMethod method) {

var listener = createListener(false, method == MessageReadMethod.Callback, 1, subscriptions);
var sender = (GlideClusterClient) createClient(false);
clients.addAll(List.of(listener, sender));

sender.publish(pubsubMessage, channel, true).get();
Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the message
Expand Down Expand Up @@ -339,7 +383,6 @@ public void sharded_pubsub_many_channels(MessageReadMethod method) {

var listener = createListener(false, method == MessageReadMethod.Callback, 1, subscriptions);
var sender = (GlideClusterClient) createClient(false);
clients.addAll(List.of(listener, sender));

for (var pubsubMessage : pubsubMessages) {
sender.publish(pubsubMessage.getMessage(), pubsubMessage.getChannel(), true).get();
Expand Down Expand Up @@ -376,7 +419,6 @@ public void pattern(boolean standalone, MessageReadMethod method) {
var listener =
createListener(standalone, method == MessageReadMethod.Callback, 1, subscriptions);
var sender = createClient(standalone);
clients.addAll(List.of(listener, sender));

Thread.sleep(MESSAGE_DELIVERY_DELAY); // need some time to propagate subscriptions - why?

Expand Down Expand Up @@ -419,7 +461,6 @@ public void pattern_many_channels(boolean standalone, MessageReadMethod method)
var listener =
createListener(standalone, method == MessageReadMethod.Callback, 1, subscriptions);
var sender = createClient(standalone);
clients.addAll(List.of(listener, sender));

Thread.sleep(MESSAGE_DELIVERY_DELAY); // need some time to propagate subscriptions - why?

Expand Down Expand Up @@ -470,7 +511,6 @@ public void combined_exact_and_pattern_one_client(boolean standalone, MessageRea
var listener =
createListener(standalone, method == MessageReadMethod.Callback, 1, subscriptions);
var sender = createClient(standalone);
clients.addAll(List.of(listener, sender));

for (var pubsubMessage : messages) {
sender.publish(pubsubMessage.getMessage(), pubsubMessage.getChannel()).get();
Expand Down Expand Up @@ -519,7 +559,6 @@ public void combined_exact_and_pattern_multiple_clients(
createListener(standalone, method == MessageReadMethod.Callback, 2, subscriptions);

var sender = createClient(standalone);
clients.addAll(List.of(listenerExactSub, listenerPatternSub, sender));

for (var pubsubMessage : messages) {
sender.publish(pubsubMessage.getMessage(), pubsubMessage.getChannel()).get();
Expand Down Expand Up @@ -596,7 +635,6 @@ public void combined_exact_pattern_and_sharded_one_client(MessageReadMethod meth

var listener = createListener(false, method == MessageReadMethod.Callback, 1, subscriptions);
var sender = (GlideClusterClient) createClient(false);
clients.addAll(List.of(listener, sender));

for (var pubsubMessage : messages) {
sender.publish(pubsubMessage.getMessage(), pubsubMessage.getChannel()).get();
Expand Down Expand Up @@ -653,7 +691,6 @@ public void coexistense_of_sync_and_async_read() {

var listener = createListener(false, false, 1, subscriptions);
var sender = (GlideClusterClient) createClient(false);
clients.addAll(List.of(listener, sender));

for (var pubsubMessage : messages) {
sender.publish(pubsubMessage.getMessage(), pubsubMessage.getChannel()).get();
Expand Down Expand Up @@ -756,7 +793,6 @@ public void combined_exact_pattern_and_sharded_multi_client(MessageReadMethod me
subscriptionsSharded);

var sender = (GlideClusterClient) createClient(false);
clients.addAll(List.of(listenerExact, listenerPattern, listenerSharded, sender));

for (var pubsubMessage : exactMessages) {
sender.publish(pubsubMessage.getMessage(), pubsubMessage.getChannel()).get();
Expand Down Expand Up @@ -849,8 +885,6 @@ public void three_publishing_clients_same_name_with_sharded(MessageReadMethod me
false, true, PubSubClusterChannelMode.SHARDED.ordinal(), subscriptionsSharded)
: (GlideClusterClient) createClientWithSubscriptions(false, subscriptionsSharded);

clients.addAll(List.of(listenerExact, listenerPattern, listenerSharded));

listenerPattern.publish(exactMessage.getMessage(), channel).get();
listenerSharded.publish(patternMessage.getMessage(), channel).get();
listenerExact.publish(shardedMessage.getMessage(), channel, true).get();
Expand Down Expand Up @@ -961,7 +995,6 @@ public void transaction_with_all_types_of_messages(boolean standalone, MessageRe
var listener =
createListener(standalone, method == MessageReadMethod.Callback, 1, subscriptions);
var sender = createClient(standalone);
clients.addAll(List.of(listener, sender));

if (standalone) {
var transaction =
Expand Down Expand Up @@ -1004,7 +1037,6 @@ public void pubsub_exact_max_size_message(boolean standalone) {
: Map.of(PubSubClusterChannelMode.EXACT, Set.of(channel));
var listener = createClientWithSubscriptions(standalone, subscriptions);
var sender = createClient(standalone);
clients.addAll(Arrays.asList(listener, sender));

assertEquals(OK, sender.publish(message, channel).get());
assertEquals(OK, sender.publish(message2, channel).get());
Expand Down Expand Up @@ -1044,7 +1076,6 @@ public void pubsub_sharded_max_size_message(boolean standalone) {
Map.of(PubSubClusterChannelMode.SHARDED, Set.of(channel));
var listener = createClientWithSubscriptions(standalone, subscriptions);
var sender = createClient(standalone);
clients.addAll(Arrays.asList(listener, sender));

assertEquals(OK, sender.publish(message, channel).get());
assertEquals(OK, ((GlideClusterClient) sender).publish(message2, channel, true).get());
Expand Down Expand Up @@ -1101,7 +1132,6 @@ public void pubsub_exact_max_size_message_callback(boolean standalone) {
Optional.ofNullable(callback),
Optional.of(callbackMessages));
var sender = createClient(standalone);
clients.addAll(Arrays.asList(listener, sender));

assertEquals(OK, sender.publish(message, channel).get());

Expand Down Expand Up @@ -1143,7 +1173,6 @@ public void pubsub_sharded_max_size_message_callback(boolean standalone) {
Optional.ofNullable(callback),
Optional.of(callbackMessages));
var sender = createClient(standalone);
clients.addAll(Arrays.asList(listener, sender));

assertEquals(OK, ((GlideClusterClient) sender).publish(message, channel, true).get());

Expand Down Expand Up @@ -1188,7 +1217,6 @@ public void pubsub_test_callback_exception(boolean standalone) {
Optional.ofNullable(callback),
Optional.of(callbackMessages));
var sender = createClient(standalone);
clients.addAll(Arrays.asList(listener, sender));

assertEquals(OK, sender.publish(message1, channel).get());
assertEquals(OK, sender.publish(message2, channel).get());
Expand Down Expand Up @@ -1241,7 +1269,6 @@ public void pubsub_with_binary(boolean standalone) {
createClientWithSubscriptions(
standalone, subscriptions, Optional.of(callback), Optional.of(callbackMessages));
var sender = createClient(standalone);
clients.addAll(List.of(listener, listener2, sender));

assertEquals(OK, sender.publish(message.getMessage(), channel).get());
Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages
Expand Down Expand Up @@ -1277,7 +1304,6 @@ public void pubsub_channels(boolean standalone) {
channels.stream().map(GlideString::gs).collect(Collectors.toSet()));

var listener = createClientWithSubscriptions(standalone, subscriptions);
clients.addAll(List.of(client, listener));

// test without pattern
assertEquals(channels, Set.of(client.pubsubChannels().get()));
Expand Down Expand Up @@ -1330,7 +1356,6 @@ public void pubsub_numpat(boolean standalone) {
patterns.stream().map(GlideString::gs).collect(Collectors.toSet()));

var listener = createClientWithSubscriptions(standalone, subscriptions);
clients.addAll(List.of(client, listener));

assertEquals(2, client.pubsubNumPat().get());
assertEquals(2, listener.pubsubNumPat().get());
Expand Down Expand Up @@ -1376,8 +1401,6 @@ public void pubsub_numsub(boolean standalone) {
: Map.of(PubSubClusterChannelMode.PATTERN, Set.of(gs("channel*")));
var listener4 = createClientWithSubscriptions(standalone, subscriptions4);

clients.addAll(List.of(client, listener1, listener2, listener3, listener4));

var expected = Map.of("channel1", 1L, "channel2", 2L, "channel3", 3L, "channel4", 0L);
assertEquals(expected, client.pubsubNumSub(ArrayUtils.addFirst(channels, "channel4")).get());
assertEquals(expected, listener1.pubsubNumSub(ArrayUtils.addFirst(channels, "channel4")).get());
Expand Down Expand Up @@ -1447,7 +1470,6 @@ public void pubsub_channels_and_numpat_and_numsub_in_transaction(boolean standal
patterns.stream().map(GlideString::gs).collect(Collectors.toSet()));

var listener = createClientWithSubscriptions(standalone, subscriptions);
clients.addAll(List.of(client, listener));

result =
standalone
Expand Down Expand Up @@ -1491,7 +1513,6 @@ public void pubsub_shard_channels() {

GlideClusterClient listener =
(GlideClusterClient) createClientWithSubscriptions(false, subscriptions);
clients.addAll(List.of(client, listener));

// test without pattern
assertEquals(channels, Set.of(client.pubsubShardChannels().get()));
Expand Down Expand Up @@ -1553,8 +1574,6 @@ public void pubsub_shardnumsub() {
GlideClusterClient listener3 =
(GlideClusterClient) createClientWithSubscriptions(false, subscriptions3);

clients.addAll(List.of(client, listener1, listener2, listener3));

var expected = Map.of("channel1", 1L, "channel2", 2L, "channel3", 3L, "channel4", 0L);
assertEquals(
expected, client.pubsubShardNumSub(ArrayUtils.addFirst(channels, "channel4")).get());
Expand Down

0 comments on commit c672c62

Please sign in to comment.