diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java index bbb92f6a6cf4c..64a05503e1970 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java @@ -19,7 +19,6 @@ package org.apache.pulsar.tests.integration.loadbalance; import static org.apache.pulsar.tests.integration.containers.PulsarContainer.BROKER_HTTP_PORT; -import static org.apache.pulsar.tests.integration.suites.PulsarTestSuite.retryStrategically; import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -28,7 +27,6 @@ import static org.testng.Assert.fail; import com.google.common.collect.Sets; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -78,7 +76,6 @@ public class ExtensibleLoadManagerTest extends TestRetrySupport { .clusterName(clusterName) .numBrokers(NUM_BROKERS).build(); private PulsarCluster pulsarCluster = null; - private List brokerUrls = null; private String hosts; private PulsarAdmin admin; @@ -97,10 +94,8 @@ public void setup() throws Exception { spec.brokerEnvs(brokerEnvs); pulsarCluster = PulsarCluster.forSpec(spec); pulsarCluster.start(); - brokerUrls = brokerUrls(); - hosts = pulsarCluster.getAllBrokersHttpServiceUrl(); - admin = PulsarAdmin.builder().serviceHttpUrl(hosts).build(); + admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build(); // all brokers alive assertEquals(admin.brokers().getActiveBrokers(clusterName).size(), NUM_BROKERS); @@ -138,10 +133,8 @@ public void testConcurrentLookups() throws Exception { String topicName = "persistent://" + DEFAULT_NAMESPACE + "/testConcurrentLookups"; List admins = new ArrayList<>(); int numAdminForBroker = 10; - for (String url : brokerUrls) { - for (int i = 0; i < numAdminForBroker; i++) { - admins.add(PulsarAdmin.builder().serviceHttpUrl(url).build()); - } + for (int i = 0; i < numAdminForBroker; i++) { + admins.add(PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()); } admin.topics().createPartitionedTopic(topicName, 100); @@ -174,7 +167,7 @@ public void testConcurrentLookups() throws Exception { @Test(timeOut = 30 * 1000) public void testTransferAdminApi() throws Exception { String topicName = "persistent://" + DEFAULT_NAMESPACE + "/testUnloadAdminApi"; - admin.topics().createNonPartitionedTopic(topicName); + createNonPartitionedTopicAndRetry(topicName); String broker = admin.lookups().lookupTopic(topicName); int index = extractBrokerIndex(broker); @@ -203,7 +196,7 @@ public void testTransferAdminApi() throws Exception { @Test(timeOut = 30 * 1000) public void testSplitBundleAdminApi() throws Exception { String topicName = "persistent://" + DEFAULT_NAMESPACE + "/testSplitBundleAdminApi"; - admin.topics().createNonPartitionedTopic(topicName); + createNonPartitionedTopicAndRetry(topicName); String broker = admin.lookups().lookupTopic(topicName); log.info("The topic: {} owned by {}", topicName, broker); BundlesData bundles = admin.namespaces().getBundles(DEFAULT_NAMESPACE); @@ -254,7 +247,7 @@ public void testDeleteNamespace() throws Exception { public void testStopBroker() throws Exception { String topicName = "persistent://" + DEFAULT_NAMESPACE + "/test-stop-broker-topic"; - admin.topics().createNonPartitionedTopic(topicName); + createNonPartitionedTopicAndRetry(topicName); String broker = admin.lookups().lookupTopic(topicName); log.info("The topic: {} owned by: {}", topicName, broker); @@ -271,7 +264,7 @@ public void testStopBroker() throws Exception { assertNotEquals(broker1, broker); } - @Test(timeOut = 40 * 1000) + @Test(timeOut = 80 * 1000) public void testAntiaffinityPolicy() throws PulsarAdminException { final String namespaceAntiAffinityGroup = "my-anti-affinity-filter"; final String antiAffinityEnabledNameSpace = DEFAULT_TENANT + "/my-ns-filter" + nsSuffix; @@ -308,7 +301,7 @@ public void testAntiaffinityPolicy() throws PulsarAdminException { assertEquals(result.size(), NUM_BROKERS); } - @Test(timeOut = 40 * 1000) + @Test(timeOut = 240 * 1000) public void testIsolationPolicy() throws Exception { final String namespaceIsolationPolicyName = "my-isolation-policy"; final String isolationEnabledNameSpace = DEFAULT_TENANT + "/my-isolation-policy" + nsSuffix; @@ -345,13 +338,10 @@ public void testIsolationPolicy() throws Exception { } final String topic = "persistent://" + isolationEnabledNameSpace + "/topic"; - try { - admin.topics().createNonPartitionedTopic(topic); - } catch (PulsarAdminException.ConflictException e) { - //expected when retried - } + createNonPartitionedTopicAndRetry(topic); String broker = admin.lookups().lookupTopic(topic); + assertEquals(extractBrokerIndex(broker), 0); for (BrokerContainer container : pulsarCluster.getBrokers()) { String name = container.getHostName(); @@ -360,13 +350,17 @@ public void testIsolationPolicy() throws Exception { } } - assertEquals(extractBrokerIndex(broker), 0); - - broker = admin.lookups().lookupTopic(topic); + Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted( + () -> { + List activeBrokers = admin.brokers().getActiveBrokers(); + assertEquals(activeBrokers.size(), 2); + } + ); - final String brokerName = broker; - retryStrategically((test) -> extractBrokerIndex(brokerName) == 1, 100, 200); - assertEquals(extractBrokerIndex(broker), 1); + Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { + String ownerBroker = admin.lookups().lookupTopic(topic); + assertEquals(extractBrokerIndex(ownerBroker), 1); + }); for (BrokerContainer container : pulsarCluster.getBrokers()) { String name = container.getHostName(); @@ -374,6 +368,13 @@ public void testIsolationPolicy() throws Exception { container.stop(); } } + + Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted( + () -> { + List activeBrokers = admin.brokers().getActiveBrokers(); + assertEquals(activeBrokers.size(), 1); + } + ); try { admin.lookups().lookupTopic(topic); fail(); @@ -384,6 +385,21 @@ public void testIsolationPolicy() throws Exception { } } + private void createNonPartitionedTopicAndRetry(String topicName) throws Exception { + Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> { + try { + admin.topics().createNonPartitionedTopic(topicName); + return true; + } catch (PulsarAdminException.ConflictException e) { + return true; + //expected when retried + } catch (Exception e) { + log.error("Failed to create topic: ", e); + return false; + } + }); + } + private String getBrokerUrl(int index) { return String.format("pulsar-broker-%d:%d", index, BROKER_HTTP_PORT); } @@ -412,13 +428,4 @@ private int generateRandomExcludingX(int n, int x) { return randomNumber; } - - private List brokerUrls() { - Collection brokers = pulsarCluster.getBrokers(); - List brokerUrls = new ArrayList<>(NUM_BROKERS); - brokers.forEach(broker -> { - brokerUrls.add("http://" + broker.getHost() + ":" + broker.getMappedPort(BROKER_HTTP_PORT)); - }); - return brokerUrls; - } }