Skip to content

Commit

Permalink
[fix][test] Fix ExtensibleLoadManager flaky integration test (apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Demogorgon314 authored Dec 26, 2023
1 parent 1d83b84 commit 0dd1672
Showing 1 changed file with 42 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.tests.integration.loadbalance;

import static org.apache.pulsar.tests.integration.containers.PulsarContainer.BROKER_HTTP_PORT;
import static org.apache.pulsar.tests.integration.suites.PulsarTestSuite.retryStrategically;
import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
Expand All @@ -28,7 +27,6 @@
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -78,7 +76,6 @@ public class ExtensibleLoadManagerTest extends TestRetrySupport {
.clusterName(clusterName)
.numBrokers(NUM_BROKERS).build();
private PulsarCluster pulsarCluster = null;
private List<String> brokerUrls = null;
private String hosts;
private PulsarAdmin admin;

Expand All @@ -97,10 +94,8 @@ public void setup() throws Exception {
spec.brokerEnvs(brokerEnvs);
pulsarCluster = PulsarCluster.forSpec(spec);
pulsarCluster.start();
brokerUrls = brokerUrls();

hosts = pulsarCluster.getAllBrokersHttpServiceUrl();
admin = PulsarAdmin.builder().serviceHttpUrl(hosts).build();
admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build();
// all brokers alive
assertEquals(admin.brokers().getActiveBrokers(clusterName).size(), NUM_BROKERS);

Expand Down Expand Up @@ -138,10 +133,8 @@ public void testConcurrentLookups() throws Exception {
String topicName = "persistent://" + DEFAULT_NAMESPACE + "/testConcurrentLookups";
List<PulsarAdmin> admins = new ArrayList<>();
int numAdminForBroker = 10;
for (String url : brokerUrls) {
for (int i = 0; i < numAdminForBroker; i++) {
admins.add(PulsarAdmin.builder().serviceHttpUrl(url).build());
}
for (int i = 0; i < numAdminForBroker; i++) {
admins.add(PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build());
}

admin.topics().createPartitionedTopic(topicName, 100);
Expand Down Expand Up @@ -174,7 +167,7 @@ public void testConcurrentLookups() throws Exception {
@Test(timeOut = 30 * 1000)
public void testTransferAdminApi() throws Exception {
String topicName = "persistent://" + DEFAULT_NAMESPACE + "/testUnloadAdminApi";
admin.topics().createNonPartitionedTopic(topicName);
createNonPartitionedTopicAndRetry(topicName);
String broker = admin.lookups().lookupTopic(topicName);

int index = extractBrokerIndex(broker);
Expand Down Expand Up @@ -203,7 +196,7 @@ public void testTransferAdminApi() throws Exception {
@Test(timeOut = 30 * 1000)
public void testSplitBundleAdminApi() throws Exception {
String topicName = "persistent://" + DEFAULT_NAMESPACE + "/testSplitBundleAdminApi";
admin.topics().createNonPartitionedTopic(topicName);
createNonPartitionedTopicAndRetry(topicName);
String broker = admin.lookups().lookupTopic(topicName);
log.info("The topic: {} owned by {}", topicName, broker);
BundlesData bundles = admin.namespaces().getBundles(DEFAULT_NAMESPACE);
Expand Down Expand Up @@ -254,7 +247,7 @@ public void testDeleteNamespace() throws Exception {
public void testStopBroker() throws Exception {
String topicName = "persistent://" + DEFAULT_NAMESPACE + "/test-stop-broker-topic";

admin.topics().createNonPartitionedTopic(topicName);
createNonPartitionedTopicAndRetry(topicName);
String broker = admin.lookups().lookupTopic(topicName);
log.info("The topic: {} owned by: {}", topicName, broker);

Expand All @@ -271,7 +264,7 @@ public void testStopBroker() throws Exception {
assertNotEquals(broker1, broker);
}

@Test(timeOut = 40 * 1000)
@Test(timeOut = 80 * 1000)
public void testAntiaffinityPolicy() throws PulsarAdminException {
final String namespaceAntiAffinityGroup = "my-anti-affinity-filter";
final String antiAffinityEnabledNameSpace = DEFAULT_TENANT + "/my-ns-filter" + nsSuffix;
Expand Down Expand Up @@ -308,7 +301,7 @@ public void testAntiaffinityPolicy() throws PulsarAdminException {
assertEquals(result.size(), NUM_BROKERS);
}

@Test(timeOut = 40 * 1000)
@Test(timeOut = 240 * 1000)
public void testIsolationPolicy() throws Exception {
final String namespaceIsolationPolicyName = "my-isolation-policy";
final String isolationEnabledNameSpace = DEFAULT_TENANT + "/my-isolation-policy" + nsSuffix;
Expand Down Expand Up @@ -345,13 +338,10 @@ public void testIsolationPolicy() throws Exception {
}

final String topic = "persistent://" + isolationEnabledNameSpace + "/topic";
try {
admin.topics().createNonPartitionedTopic(topic);
} catch (PulsarAdminException.ConflictException e) {
//expected when retried
}
createNonPartitionedTopicAndRetry(topic);

String broker = admin.lookups().lookupTopic(topic);
assertEquals(extractBrokerIndex(broker), 0);

for (BrokerContainer container : pulsarCluster.getBrokers()) {
String name = container.getHostName();
Expand All @@ -360,20 +350,31 @@ public void testIsolationPolicy() throws Exception {
}
}

assertEquals(extractBrokerIndex(broker), 0);

broker = admin.lookups().lookupTopic(topic);
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(
() -> {
List<String> activeBrokers = admin.brokers().getActiveBrokers();
assertEquals(activeBrokers.size(), 2);
}
);

final String brokerName = broker;
retryStrategically((test) -> extractBrokerIndex(brokerName) == 1, 100, 200);
assertEquals(extractBrokerIndex(broker), 1);
Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
String ownerBroker = admin.lookups().lookupTopic(topic);
assertEquals(extractBrokerIndex(ownerBroker), 1);
});

for (BrokerContainer container : pulsarCluster.getBrokers()) {
String name = container.getHostName();
if (name.contains("1")) {
container.stop();
}
}

Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(
() -> {
List<String> activeBrokers = admin.brokers().getActiveBrokers();
assertEquals(activeBrokers.size(), 1);
}
);
try {
admin.lookups().lookupTopic(topic);
fail();
Expand All @@ -384,6 +385,21 @@ public void testIsolationPolicy() throws Exception {
}
}

private void createNonPartitionedTopicAndRetry(String topicName) throws Exception {
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
try {
admin.topics().createNonPartitionedTopic(topicName);
return true;
} catch (PulsarAdminException.ConflictException e) {
return true;
//expected when retried
} catch (Exception e) {
log.error("Failed to create topic: ", e);
return false;
}
});
}

private String getBrokerUrl(int index) {
return String.format("pulsar-broker-%d:%d", index, BROKER_HTTP_PORT);
}
Expand Down Expand Up @@ -412,13 +428,4 @@ private int generateRandomExcludingX(int n, int x) {

return randomNumber;
}

private List<String> brokerUrls() {
Collection<BrokerContainer> brokers = pulsarCluster.getBrokers();
List<String> brokerUrls = new ArrayList<>(NUM_BROKERS);
brokers.forEach(broker -> {
brokerUrls.add("http://" + broker.getHost() + ":" + broker.getMappedPort(BROKER_HTTP_PORT));
});
return brokerUrls;
}
}

0 comments on commit 0dd1672

Please sign in to comment.