From 844a9bc7c1356e3a4301b7149b4f149dbe2fda3f Mon Sep 17 00:00:00 2001
From: fengyubiao
Date: Wed, 23 Oct 2024 19:04:34 +0800
Subject: [PATCH] [fix][client] Fix producer/consumer stop to reconnect or Pub/Sub due to IO thread race-condition (#23499)

(cherry picked from commit ff4a25e8f7827c10be9614fe064cdd5249379c8f)
---
 .../impl/SimpleProduceConsumeIoTest.java      | 134 ++++++++++++++++++
 .../pulsar/client/impl/ConnectionPool.java    |   2 +-
 2 files changed, 135 insertions(+), 1 deletion(-)
 create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SimpleProduceConsumeIoTest.java

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SimpleProduceConsumeIoTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SimpleProduceConsumeIoTest.java
new file mode 100644
index 0000000000000..4da3ce2573334
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SimpleProduceConsumeIoTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class SimpleProduceConsumeIoTest extends ProducerConsumerBase {
+
+    private PulsarClientImpl singleConnectionPerBrokerClient;
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+        singleConnectionPerBrokerClient = (PulsarClientImpl) PulsarClient.builder().connectionsPerBroker(1)
+                .serviceUrl(lookupUrl.toString()).build();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        if (singleConnectionPerBrokerClient != null) {
+            singleConnectionPerBrokerClient.close();
+        }
+        super.internalCleanup();
+    }
+
+    /**
+     * 1. Create a producer with a pooled connection.
+     * 2. When executing "producer.connectionOpened", the pooled connection has been closed due to a network issue.
+     * 3. Verify: the producer can be created successfully.
+     */
+    @Test
+    public void testUnstableNetWorkWhenCreatingProducer() throws Exception {
+        final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        admin.topics().createNonPartitionedTopic(topic);
+        // Trigger a pooled connection creation.
+        ProducerImpl p = (ProducerImpl) singleConnectionPerBrokerClient.newProducer().topic(topic).create();
+        ClientCnx cnx = p.getClientCnx();
+        p.close();
+
+        // 1. Create a new producer with the pooled connection(since there is a pooled connection, the new producer
+        //    will reuse it).
+        // 2. Trigger a network issue.
+        CountDownLatch countDownLatch = new CountDownLatch(1);
+        // A task for trigger network issue.
+        new Thread(() -> {
+            try {
+                countDownLatch.await();
+                cnx.ctx().close();
+            } catch (Exception ex) {
+            }
+        }).start();
+        // Create a new producer with the pooled connection.
+        AtomicReference<CompletableFuture<Producer<byte[]>>> p2FutureWrap = new AtomicReference<>();
+        new Thread(() -> {
+            ProducerBuilder<byte[]> producerBuilder = singleConnectionPerBrokerClient.newProducer().topic(topic);
+            ProducerConfigurationData producerConf = WhiteboxImpl.getInternalState(producerBuilder, "conf");
+            CompletableFuture<Producer<byte[]>> p2Future = new CompletableFuture();
+            p2FutureWrap.set(p2Future);
+            new ProducerImpl<>(singleConnectionPerBrokerClient, "public/default/tp1", producerConf, p2Future,
+                    -1, Schema.BYTES, null, Optional.empty()) {
+                @Override
+                public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
+                    // Mock a network issue, and wait for the issue occurred.
+                    countDownLatch.countDown();
+                    try {
+                        Thread.sleep(1500);
+                    } catch (InterruptedException e) {
+                    }
+                    // Call the real implementation.
+                    return super.connectionOpened(cnx);
+                }
+            };
+        }).start();
+
+        // Verify: the producer can be created successfully.
+        Awaitility.await().untilAsserted(() -> {
+            assertNotNull(p2FutureWrap.get());
+            assertTrue(p2FutureWrap.get().isDone());
+        });
+        // Print log.
+        p2FutureWrap.get().exceptionally(ex -> {
+            log.error("Failed to create producer", ex);
+            return null;
+        });
+        Awaitility.await().untilAsserted(() -> {
+            assertFalse(p2FutureWrap.get().isCompletedExceptionally());
+            assertTrue("Ready".equals(
+                    WhiteboxImpl.getInternalState(p2FutureWrap.get().join(), "state").toString()));
+        });
+
+        // Cleanup.
+        p2FutureWrap.get().join().close();
+        admin.topics().delete(topic);
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index a6a809af8585b..0f49b77b05765 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -275,7 +275,7 @@ public CompletableFuture<ClientCnx> getConnection(InetSocketAddress logicalAddre
             }
             // Try use exists connection.
             if (clientCnx.getIdleState().tryMarkUsingAndClearIdleTime()) {
-                return CompletableFuture.completedFuture(clientCnx);
+                return CompletableFuture.supplyAsync(() -> clientCnx, clientCnx.ctx().executor());
             } else {
                 // If connection already release, create a new one.
                 pool.remove(key, completableFuture);