From a1f3322ed358ab6841f0d3e43f2afcc54788b887 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Fri, 16 Aug 2024 10:40:28 +0300 Subject: [PATCH] [fix][test] Fix flaky SubscriptionSeekTest.testSeekIsByReceive (#23170) --- .../broker/service/SubscriptionSeekTest.java | 37 +++++++++++++++---- 1 file changed, 30 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java index 3fc795a8c3e2a..582d10294a5a4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java @@ -86,9 +86,11 @@ protected void cleanup() throws Exception { public void testSeek() throws Exception { final String topicName = "persistent://prop/use/ns-abc/testSeek"; + @Cleanup Producer producer = pulsarClient.newProducer().topic(topicName).create(); // Disable pre-fetch in consumer to track the messages received + @Cleanup org.apache.pulsar.client.api.Consumer consumer = pulsarClient.newConsumer().topic(topicName) .subscriptionName("my-subscription").receiverQueueSize(0).subscribe(); @@ -138,11 +140,13 @@ public void testSeek() throws Exception { @Test public void testSeekIsByReceive() throws PulsarClientException { - final String topicName = "persistent://prop/use/ns-abc/testSeek"; + final String topicName = "persistent://prop/use/ns-abc/testSeekIsByReceive"; + @Cleanup Producer producer = pulsarClient.newProducer().topic(topicName).create(); String subscriptionName = "my-subscription"; + @Cleanup org.apache.pulsar.client.api.Consumer consumer = pulsarClient.newConsumer().topic(topicName) .subscriptionName(subscriptionName) .subscribe(); @@ -164,6 +168,7 @@ public void testSeekForBatch() throws Exception { final String topicName = "persistent://prop/use/ns-abcd/testSeekForBatch"; String subscriptionName = "my-subscription-batch"; + @Cleanup Producer producer = pulsarClient.newProducer(Schema.STRING) .enableBatching(true) .batchingMaxMessages(3) @@ -190,6 +195,7 @@ public void testSeekForBatch() throws Exception { producer.close(); + @Cleanup org.apache.pulsar.client.api.Consumer consumer = pulsarClient.newConsumer(Schema.STRING) .topic(topicName) .subscriptionName(subscriptionName) @@ -220,6 +226,7 @@ public void testSeekForBatchMessageAndSpecifiedBatchIndex() throws Exception { final String topicName = "persistent://prop/use/ns-abcd/testSeekForBatchMessageAndSpecifiedBatchIndex"; String subscriptionName = "my-subscription-batch"; + @Cleanup Producer producer = pulsarClient.newProducer(Schema.STRING) .enableBatching(true) .batchingMaxMessages(3) @@ -264,6 +271,7 @@ public void testSeekForBatchMessageAndSpecifiedBatchIndex() throws Exception { .serviceUrl(lookupUrl.toString()) .build(); + @Cleanup org.apache.pulsar.client.api.Consumer consumer = newPulsarClient.newConsumer(Schema.STRING) .topic(topicName) .subscriptionName(subscriptionName) @@ -300,6 +308,7 @@ public void testSeekForBatchByAdmin() throws PulsarClientException, ExecutionExc final String topicName = "persistent://prop/use/ns-abcd/testSeekForBatchByAdmin-" + UUID.randomUUID().toString(); String subscriptionName = "my-subscription-batch"; + @Cleanup Producer producer = pulsarClient.newProducer(Schema.STRING) .enableBatching(true) .batchingMaxMessages(3) @@ -325,7 +334,7 @@ public void testSeekForBatchByAdmin() throws PulsarClientException, ExecutionExc producer.close(); - + @Cleanup org.apache.pulsar.client.api.Consumer consumer = pulsarClient.newConsumer(Schema.STRING) .topic(topicName) .subscriptionName(subscriptionName) @@ -381,6 +390,7 @@ public void testConcurrentResetCursor() throws Exception { final String topicName = "persistent://prop/use/ns-abc/testConcurrentReset_" + System.currentTimeMillis(); final String subscriptionName = "test-sub-name"; + @Cleanup Producer producer = pulsarClient.newProducer().topic(topicName).create(); admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest); @@ -430,6 +440,7 @@ public void testSeekOnPartitionedTopic() throws Exception { final String topicName = "persistent://prop/use/ns-abc/testSeekPartitions"; admin.topics().createPartitionedTopic(topicName, 2); + @Cleanup org.apache.pulsar.client.api.Consumer consumer = pulsarClient.newConsumer().topic(topicName) .subscriptionName("my-subscription").subscribe(); @@ -447,9 +458,11 @@ public void testSeekTime() throws Exception { long resetTimeInMillis = TimeUnit.SECONDS .toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(resetTimeStr)); + @Cleanup Producer producer = pulsarClient.newProducer().topic(topicName).create(); // Disable pre-fetch in consumer to track the messages received + @Cleanup org.apache.pulsar.client.api.Consumer consumer = pulsarClient.newConsumer().topic(topicName) .subscriptionName("my-subscription").receiverQueueSize(0).subscribe(); @@ -483,6 +496,7 @@ public void testSeekTimeByFunction() throws Exception { int msgNum = 20; admin.topics().createPartitionedTopic(topicName, partitionNum); creatProducerAndSendMsg(topicName, msgNum); + @Cleanup org.apache.pulsar.client.api.Consumer consumer = pulsarClient .newConsumer(Schema.STRING).startMessageIdInclusive() .topic(topicName).subscriptionName("my-sub").subscribe(); @@ -530,6 +544,7 @@ public void testSeekTimeOnPartitionedTopic() throws Exception { long resetTimeInMillis = TimeUnit.SECONDS .toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(resetTimeStr)); admin.topics().createPartitionedTopic(topicName, partitions); + @Cleanup Producer producer = pulsarClient.newProducer().topic(topicName).create(); // Disable pre-fetch in consumer to track the messages received org.apache.pulsar.client.api.Consumer consumer = pulsarClient.newConsumer().topic(topicName) @@ -583,12 +598,14 @@ public void testSeekTimeOnPartitionedTopic() throws Exception { public void testShouldCloseAllConsumersForMultipleConsumerDispatcherWhenSeek() throws Exception { final String topicName = "persistent://prop/use/ns-abc/testShouldCloseAllConsumersForMultipleConsumerDispatcherWhenSeek"; // Disable pre-fetch in consumer to track the messages received + @Cleanup org.apache.pulsar.client.api.Consumer consumer1 = pulsarClient.newConsumer() .topic(topicName) .subscriptionType(SubscriptionType.Shared) .subscriptionName("my-subscription") .subscribe(); + @Cleanup org.apache.pulsar.client.api.Consumer consumer2 = pulsarClient.newConsumer() .topic(topicName) .subscriptionType(SubscriptionType.Shared) @@ -615,20 +632,20 @@ public void testShouldCloseAllConsumersForMultipleConsumerDispatcherWhenSeek() t for (Consumer consumer : consumers) { assertFalse(connectedSinceSet.contains(consumer.getStats().getConnectedSince())); } - consumer1.close(); - consumer2.close(); } @Test public void testOnlyCloseActiveConsumerForSingleActiveConsumerDispatcherWhenSeek() throws Exception { final String topicName = "persistent://prop/use/ns-abc/testOnlyCloseActiveConsumerForSingleActiveConsumerDispatcherWhenSeek"; // Disable pre-fetch in consumer to track the messages received + @Cleanup org.apache.pulsar.client.api.Consumer consumer1 = pulsarClient.newConsumer() .topic(topicName) .subscriptionType(SubscriptionType.Failover) .subscriptionName("my-subscription") .subscribe(); + @Cleanup org.apache.pulsar.client.api.Consumer consumer2 = pulsarClient.newConsumer() .topic(topicName) .subscriptionType(SubscriptionType.Failover) @@ -668,11 +685,13 @@ public void testSeekByFunction() throws Exception { int msgNum = 160; admin.topics().createPartitionedTopic(topicName, partitionNum); creatProducerAndSendMsg(topicName, msgNum); + @Cleanup org.apache.pulsar.client.api.Consumer consumer = pulsarClient .newConsumer(Schema.STRING).startMessageIdInclusive() .topic(topicName).subscriptionName("my-sub").subscribe(); TopicName partitionedTopic = TopicName.get(topicName); + @Cleanup Reader reader = pulsarClient.newReader(Schema.STRING) .startMessageId(MessageId.earliest) .topic(partitionedTopic.getPartition(0).toString()).create(); @@ -721,12 +740,11 @@ public void testSeekByFunction() throws Exception { for (MessageId messageId : msgNotIn) { assertFalse(received.contains(messageId)); } - reader.close(); - consumer.close(); } private List creatProducerAndSendMsg(String topic, int msgNum) throws Exception { List messageIds = new ArrayList<>(); + @Cleanup Producer producer = pulsarClient .newProducer(Schema.STRING) .enableBatching(false) @@ -735,7 +753,6 @@ private List creatProducerAndSendMsg(String topic, int msgNum) throws for (int i = 0; i < msgNum; i++) { messageIds.add(producer.send("msg" + i)); } - producer.close(); return messageIds; } @@ -756,6 +773,7 @@ public void testSeekByFunctionAndMultiTopic() throws Exception { MessageId msgIdInTopic2Partition0 = admin.topics().getLastMessageId(topic2.getPartition(0).toString()); MessageId msgIdInTopic2Partition2 = admin.topics().getLastMessageId(topic2.getPartition(2).toString()); + @Cleanup org.apache.pulsar.client.api.Consumer consumer = pulsarClient .newConsumer(Schema.STRING).startMessageIdInclusive() .topics(Arrays.asList(topicName, topicName2)).subscriptionName("my-sub").subscribe(); @@ -796,6 +814,7 @@ public void testSeekWillNotEncounteredFencedError() throws Exception { // Create a pulsar client with a subscription fenced counter. ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString()); AtomicInteger receivedFencedErrorCounter = new AtomicInteger(); + @Cleanup PulsarClient client = InjectedClientCnxClientBuilder.create(clientBuilder, (conf, eventLoopGroup) -> new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) { protected void handleError(CommandError error) { @@ -807,10 +826,13 @@ protected void handleError(CommandError error) { }); // publish some messages. + @Cleanup org.apache.pulsar.client.api.Consumer consumer = client.newConsumer(Schema.STRING) .topic(topicName) .subscriptionName("s1") .subscribe(); + + @Cleanup Producer producer = client.newProducer(Schema.STRING) .topic(topicName).create(); MessageIdImpl msgId1 = (MessageIdImpl) producer.send("0"); @@ -850,6 +872,7 @@ protected void handleError(CommandError error) { public void testExceptionBySeekFunction() throws Exception { final String topicName = "persistent://prop/use/ns-abc/test" + UUID.randomUUID(); creatProducerAndSendMsg(topicName,10); + @Cleanup org.apache.pulsar.client.api.Consumer consumer = pulsarClient .newConsumer() .topic(topicName).subscriptionName("my-sub").subscribe();