Skip to content

Commit

Permalink
[fix][test] Fix flaky SubscriptionSeekTest.testSeekIsByReceive (#23170)
Browse files Browse the repository at this point in the history
  • Loading branch information
lhotari authored Aug 16, 2024
1 parent b6815d2 commit a1f3322
Showing 1 changed file with 30 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,11 @@ protected void cleanup() throws Exception {
public void testSeek() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/testSeek";

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();

// Disable pre-fetch in consumer to track the messages received
@Cleanup
org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
.subscriptionName("my-subscription").receiverQueueSize(0).subscribe();

Expand Down Expand Up @@ -138,11 +140,13 @@ public void testSeek() throws Exception {

@Test
public void testSeekIsByReceive() throws PulsarClientException {
final String topicName = "persistent://prop/use/ns-abc/testSeek";
final String topicName = "persistent://prop/use/ns-abc/testSeekIsByReceive";

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();

String subscriptionName = "my-subscription";
@Cleanup
org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriptionName)
.subscribe();
Expand All @@ -164,6 +168,7 @@ public void testSeekForBatch() throws Exception {
final String topicName = "persistent://prop/use/ns-abcd/testSeekForBatch";
String subscriptionName = "my-subscription-batch";

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(true)
.batchingMaxMessages(3)
Expand All @@ -190,6 +195,7 @@ public void testSeekForBatch() throws Exception {
producer.close();


@Cleanup
org.apache.pulsar.client.api.Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName(subscriptionName)
Expand Down Expand Up @@ -220,6 +226,7 @@ public void testSeekForBatchMessageAndSpecifiedBatchIndex() throws Exception {
final String topicName = "persistent://prop/use/ns-abcd/testSeekForBatchMessageAndSpecifiedBatchIndex";
String subscriptionName = "my-subscription-batch";

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(true)
.batchingMaxMessages(3)
Expand Down Expand Up @@ -264,6 +271,7 @@ public void testSeekForBatchMessageAndSpecifiedBatchIndex() throws Exception {
.serviceUrl(lookupUrl.toString())
.build();

@Cleanup
org.apache.pulsar.client.api.Consumer<String> consumer = newPulsarClient.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName(subscriptionName)
Expand Down Expand Up @@ -300,6 +308,7 @@ public void testSeekForBatchByAdmin() throws PulsarClientException, ExecutionExc
final String topicName = "persistent://prop/use/ns-abcd/testSeekForBatchByAdmin-" + UUID.randomUUID().toString();
String subscriptionName = "my-subscription-batch";

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(true)
.batchingMaxMessages(3)
Expand All @@ -325,7 +334,7 @@ public void testSeekForBatchByAdmin() throws PulsarClientException, ExecutionExc

producer.close();


@Cleanup
org.apache.pulsar.client.api.Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName(subscriptionName)
Expand Down Expand Up @@ -381,6 +390,7 @@ public void testConcurrentResetCursor() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/testConcurrentReset_" + System.currentTimeMillis();
final String subscriptionName = "test-sub-name";

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();

admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);
Expand Down Expand Up @@ -430,6 +440,7 @@ public void testSeekOnPartitionedTopic() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/testSeekPartitions";

admin.topics().createPartitionedTopic(topicName, 2);
@Cleanup
org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
.subscriptionName("my-subscription").subscribe();

Expand All @@ -447,9 +458,11 @@ public void testSeekTime() throws Exception {
long resetTimeInMillis = TimeUnit.SECONDS
.toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(resetTimeStr));

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();

// Disable pre-fetch in consumer to track the messages received
@Cleanup
org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
.subscriptionName("my-subscription").receiverQueueSize(0).subscribe();

Expand Down Expand Up @@ -483,6 +496,7 @@ public void testSeekTimeByFunction() throws Exception {
int msgNum = 20;
admin.topics().createPartitionedTopic(topicName, partitionNum);
creatProducerAndSendMsg(topicName, msgNum);
@Cleanup
org.apache.pulsar.client.api.Consumer<String> consumer = pulsarClient
.newConsumer(Schema.STRING).startMessageIdInclusive()
.topic(topicName).subscriptionName("my-sub").subscribe();
Expand Down Expand Up @@ -530,6 +544,7 @@ public void testSeekTimeOnPartitionedTopic() throws Exception {
long resetTimeInMillis = TimeUnit.SECONDS
.toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(resetTimeStr));
admin.topics().createPartitionedTopic(topicName, partitions);
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
// Disable pre-fetch in consumer to track the messages received
org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
Expand Down Expand Up @@ -583,12 +598,14 @@ public void testSeekTimeOnPartitionedTopic() throws Exception {
public void testShouldCloseAllConsumersForMultipleConsumerDispatcherWhenSeek() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/testShouldCloseAllConsumersForMultipleConsumerDispatcherWhenSeek";
// Disable pre-fetch in consumer to track the messages received
@Cleanup
org.apache.pulsar.client.api.Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.subscriptionName("my-subscription")
.subscribe();

@Cleanup
org.apache.pulsar.client.api.Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
Expand All @@ -615,20 +632,20 @@ public void testShouldCloseAllConsumersForMultipleConsumerDispatcherWhenSeek() t
for (Consumer consumer : consumers) {
assertFalse(connectedSinceSet.contains(consumer.getStats().getConnectedSince()));
}
consumer1.close();
consumer2.close();
}

@Test
public void testOnlyCloseActiveConsumerForSingleActiveConsumerDispatcherWhenSeek() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/testOnlyCloseActiveConsumerForSingleActiveConsumerDispatcherWhenSeek";
// Disable pre-fetch in consumer to track the messages received
@Cleanup
org.apache.pulsar.client.api.Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Failover)
.subscriptionName("my-subscription")
.subscribe();

@Cleanup
org.apache.pulsar.client.api.Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Failover)
Expand Down Expand Up @@ -668,11 +685,13 @@ public void testSeekByFunction() throws Exception {
int msgNum = 160;
admin.topics().createPartitionedTopic(topicName, partitionNum);
creatProducerAndSendMsg(topicName, msgNum);
@Cleanup
org.apache.pulsar.client.api.Consumer<String> consumer = pulsarClient
.newConsumer(Schema.STRING).startMessageIdInclusive()
.topic(topicName).subscriptionName("my-sub").subscribe();

TopicName partitionedTopic = TopicName.get(topicName);
@Cleanup
Reader<String> reader = pulsarClient.newReader(Schema.STRING)
.startMessageId(MessageId.earliest)
.topic(partitionedTopic.getPartition(0).toString()).create();
Expand Down Expand Up @@ -721,12 +740,11 @@ public void testSeekByFunction() throws Exception {
for (MessageId messageId : msgNotIn) {
assertFalse(received.contains(messageId));
}
reader.close();
consumer.close();
}

private List<MessageId> creatProducerAndSendMsg(String topic, int msgNum) throws Exception {
List<MessageId> messageIds = new ArrayList<>();
@Cleanup
Producer<String> producer = pulsarClient
.newProducer(Schema.STRING)
.enableBatching(false)
Expand All @@ -735,7 +753,6 @@ private List<MessageId> creatProducerAndSendMsg(String topic, int msgNum) throws
for (int i = 0; i < msgNum; i++) {
messageIds.add(producer.send("msg" + i));
}
producer.close();
return messageIds;
}

Expand All @@ -756,6 +773,7 @@ public void testSeekByFunctionAndMultiTopic() throws Exception {
MessageId msgIdInTopic2Partition0 = admin.topics().getLastMessageId(topic2.getPartition(0).toString());
MessageId msgIdInTopic2Partition2 = admin.topics().getLastMessageId(topic2.getPartition(2).toString());

@Cleanup
org.apache.pulsar.client.api.Consumer<String> consumer = pulsarClient
.newConsumer(Schema.STRING).startMessageIdInclusive()
.topics(Arrays.asList(topicName, topicName2)).subscriptionName("my-sub").subscribe();
Expand Down Expand Up @@ -796,6 +814,7 @@ public void testSeekWillNotEncounteredFencedError() throws Exception {
// Create a pulsar client with a subscription fenced counter.
ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString());
AtomicInteger receivedFencedErrorCounter = new AtomicInteger();
@Cleanup
PulsarClient client = InjectedClientCnxClientBuilder.create(clientBuilder, (conf, eventLoopGroup) ->
new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) {
protected void handleError(CommandError error) {
Expand All @@ -807,10 +826,13 @@ protected void handleError(CommandError error) {
});

// publish some messages.
@Cleanup
org.apache.pulsar.client.api.Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName("s1")
.subscribe();

@Cleanup
Producer<String> producer = client.newProducer(Schema.STRING)
.topic(topicName).create();
MessageIdImpl msgId1 = (MessageIdImpl) producer.send("0");
Expand Down Expand Up @@ -850,6 +872,7 @@ protected void handleError(CommandError error) {
public void testExceptionBySeekFunction() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/test" + UUID.randomUUID();
creatProducerAndSendMsg(topicName,10);
@Cleanup
org.apache.pulsar.client.api.Consumer consumer = pulsarClient
.newConsumer()
.topic(topicName).subscriptionName("my-sub").subscribe();
Expand Down

0 comments on commit a1f3322

Please sign in to comment.