Skip to content

Commit

Permalink
[fix][broker] Support advertised listeners when gracefully transferri…
Browse files Browse the repository at this point in the history
…ng topics (ExtensibleLoadManagerImpl only) (apache#22862)
  • Loading branch information
heesung-sn authored Jun 7, 2024
1 parent 80d1cf9 commit 5af0595
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import static org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.ignoreUnrecoverableBKException;
import static org.apache.pulsar.common.api.proto.ProtocolVersion.v5;
import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import static org.apache.pulsar.common.protocol.Commands.newCloseConsumer;
import static org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
Expand Down Expand Up @@ -71,6 +70,7 @@
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
Expand All @@ -82,6 +82,7 @@
import org.apache.pulsar.broker.limiter.ConnectionController;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.TopicResources;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
Expand Down Expand Up @@ -149,6 +150,7 @@
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.intercept.InterceptException;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.Metadata;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
Expand Down Expand Up @@ -3186,15 +3188,28 @@ public void closeProducer(Producer producer, Optional<BrokerLookupData> assigned
closeProducer(producer.getProducerId(), producer.getEpoch(), assignedBrokerLookupData);
}

private LookupData getLookupData(BrokerLookupData lookupData) {
LookupOptions.LookupOptionsBuilder builder = LookupOptions.builder();
if (StringUtils.isNotBlank((listenerName))) {
builder.advertisedListenerName(listenerName);
}
try {
return lookupData.toLookupResult(builder.build()).getLookupData();
} catch (PulsarServerException e) {
log.error("Failed to get lookup data", e);
throw new RuntimeException(e);
}
}

private void closeProducer(long producerId, long epoch, Optional<BrokerLookupData> assignedBrokerLookupData) {
if (getRemoteEndpointProtocolVersion() >= v5.getValue()) {
if (assignedBrokerLookupData.isPresent()) {
writeAndFlush(Commands.newCloseProducer(producerId, -1L,
assignedBrokerLookupData.get().pulsarServiceUrl(),
assignedBrokerLookupData.get().pulsarServiceUrlTls()));
} else {
writeAndFlush(Commands.newCloseProducer(producerId, -1L));
}
assignedBrokerLookupData.ifPresentOrElse(lookup -> {
LookupData lookupData = getLookupData(lookup);
writeAndFlush(Commands.newCloseProducer(producerId, -1L,
lookupData.getBrokerUrl(),
lookupData.getBrokerUrlTls()));
},
() -> writeAndFlush(Commands.newCloseProducer(producerId, -1L)));

// The client does not necessarily know that the producer is closed, but the connection is still
// active, and there could be messages in flight already. We want to ignore these messages for a time
Expand All @@ -3220,9 +3235,13 @@ public void closeConsumer(Consumer consumer, Optional<BrokerLookupData> assigned

private void closeConsumer(long consumerId, Optional<BrokerLookupData> assignedBrokerLookupData) {
if (getRemoteEndpointProtocolVersion() >= v5.getValue()) {
writeAndFlush(newCloseConsumer(consumerId, -1L,
assignedBrokerLookupData.map(BrokerLookupData::pulsarServiceUrl).orElse(null),
assignedBrokerLookupData.map(BrokerLookupData::pulsarServiceUrlTls).orElse(null)));
assignedBrokerLookupData.ifPresentOrElse(lookup -> {
LookupData lookupData = getLookupData(lookup);
writeAndFlush(Commands.newCloseConsumer(consumerId, -1L,
lookupData.getBrokerUrl(),
lookupData.getBrokerUrlTls()));
},
() -> writeAndFlush(Commands.newCloseConsumer(consumerId, -1L, null, null)));
} else {
close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import com.google.common.collect.Sets;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.reflect.FieldUtils;
Expand Down Expand Up @@ -65,7 +64,14 @@ protected ExtensibleLoadManagerImplBaseTest(String defaultTestNamespace) {
this.defaultTestNamespace = defaultTestNamespace;
}

protected ServiceConfiguration initConfig(ServiceConfiguration conf) {
@Override
protected void doInitConf() throws Exception {
super.doInitConf();
updateConfig(conf);
}


protected ServiceConfiguration updateConfig(ServiceConfiguration conf) {
conf.setForceDeleteNamespaceAllowed(true);
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
Expand All @@ -79,10 +85,9 @@ protected ServiceConfiguration initConfig(ServiceConfiguration conf) {
@Override
@BeforeClass(alwaysRun = true)
protected void setup() throws Exception {
initConfig(conf);
super.internalSetup(conf);
pulsar1 = pulsar;
var conf2 = initConfig(getDefaultConf());
var conf2 = updateConfig(getDefaultConf());
additionalPulsarTestContext = createAdditionalPulsarTestContext(conf2);
pulsar2 = additionalPulsarTestContext.getPulsarService();

Expand Down Expand Up @@ -147,7 +152,7 @@ private void setSecondaryLoadManager() throws IllegalAccessException {
FieldUtils.readField(secondaryLoadManager, "serviceUnitStateChannel", true);
}

protected CompletableFuture<NamespaceBundle> getBundleAsync(PulsarService pulsar, TopicName topic) {
protected static CompletableFuture<NamespaceBundle> getBundleAsync(PulsarService pulsar, TopicName topic) {
return pulsar.getNamespaceService().getBundleAsync(topic);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
Expand Down Expand Up @@ -434,6 +435,19 @@ public Object[][] isPersistentTopicSubscriptionTypeTest() {
@Test(timeOut = 30_000, dataProvider = "isPersistentTopicSubscriptionTypeTest")
public void testTransferClientReconnectionWithoutLookup(TopicDomain topicDomain, SubscriptionType subscriptionType)
throws Exception {
testTransferClientReconnectionWithoutLookup(topicDomain, subscriptionType, defaultTestNamespace, admin,
lookupUrl.toString(), pulsar1, pulsar2, primaryLoadManager, secondaryLoadManager);
}

@Test(enabled = false)
public static void testTransferClientReconnectionWithoutLookup(TopicDomain topicDomain,
SubscriptionType subscriptionType,
String defaultTestNamespace,
PulsarAdmin admin, String brokerServiceUrl,
PulsarService pulsar1, PulsarService pulsar2,
ExtensibleLoadManager primaryLoadManager,
ExtensibleLoadManager secondaryLoadManager)
throws Exception {
var id = String.format("test-tx-client-reconnect-%s-%s", subscriptionType, UUID.randomUUID());
var topic = String.format("%s://%s/%s", topicDomain.toString(), defaultTestNamespace, id);
var topicName = TopicName.get(topic);
Expand All @@ -443,15 +457,16 @@ public void testTransferClientReconnectionWithoutLookup(TopicDomain topicDomain,
var consumers = new ArrayList<Consumer<String>>();
try {
var lookups = new ArrayList<LookupService>();

var pulsarClient = pulsarClient(brokerServiceUrl, 0);
clients.add(pulsarClient);
@Cleanup
var producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
lookups.add(spyLookupService(pulsarClient));

var consumerCount = subscriptionType == SubscriptionType.Exclusive ? 1 : 3;

for (int i = 0; i < consumerCount; i++) {
var client = newPulsarClient(lookupUrl.toString(), 0);
var client = pulsarClient(brokerServiceUrl, 0);
clients.add(client);
var consumer = client.newConsumer(Schema.STRING).
subscriptionName(id).
Expand All @@ -478,7 +493,7 @@ public void testTransferClientReconnectionWithoutLookup(TopicDomain topicDomain,
dstBrokerUrl = pulsar1.getBrokerId();
dstBrokerServiceUrl = pulsar1.getBrokerServiceUrl();
}
checkOwnershipState(broker, bundle);
checkOwnershipState(broker, bundle, primaryLoadManager, secondaryLoadManager, pulsar1);

var messageCountBeforeUnloading = 100;
var messageCountAfterUnloading = 100;
Expand Down Expand Up @@ -572,6 +587,17 @@ public void testTransferClientReconnectionWithoutLookup(TopicDomain topicDomain,
@Test(timeOut = 30 * 1000, dataProvider = "isPersistentTopicSubscriptionTypeTest")
public void testUnloadClientReconnectionWithLookup(TopicDomain topicDomain,
SubscriptionType subscriptionType) throws Exception {
testUnloadClientReconnectionWithLookup(topicDomain, subscriptionType, defaultTestNamespace, admin,
lookupUrl.toString(), pulsar1);
}

@Test(enabled = false)
public static void testUnloadClientReconnectionWithLookup(TopicDomain topicDomain,
SubscriptionType subscriptionType,
String defaultTestNamespace,
PulsarAdmin admin,
String brokerServiceUrl,
PulsarService pulsar1) throws Exception {
var id = String.format("test-unload-%s-client-reconnect-%s-%s",
topicDomain, subscriptionType, UUID.randomUUID());
var topic = String.format("%s://%s/%s", topicDomain, defaultTestNamespace, id);
Expand All @@ -580,6 +606,7 @@ public void testUnloadClientReconnectionWithLookup(TopicDomain topicDomain,
var consumers = new ArrayList<Consumer<String>>();
try {
@Cleanup
var pulsarClient = pulsarClient(brokerServiceUrl, 0);
var producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();

var consumerCount = subscriptionType == SubscriptionType.Exclusive ? 1 : 3;
Expand Down Expand Up @@ -651,13 +678,26 @@ public Object[][] isPersistentTopicTest() {

@Test(timeOut = 30 * 1000, dataProvider = "isPersistentTopicTest")
public void testOptimizeUnloadDisable(TopicDomain topicDomain) throws Exception {
testOptimizeUnloadDisable(topicDomain, defaultTestNamespace, admin, lookupUrl.toString(), pulsar1, pulsar2);
}

@Test(enabled = false)
public static void testOptimizeUnloadDisable(TopicDomain topicDomain,
String defaultTestNamespace,
PulsarAdmin admin,
String brokerServiceUrl,
PulsarService pulsar1,
PulsarService pulsar2) throws Exception {
var id = String.format("test-optimize-unload-disable-%s-%s", topicDomain, UUID.randomUUID());
var topic = String.format("%s://%s/%s", topicDomain, defaultTestNamespace, id);
var topicName = TopicName.get(topic);

pulsar1.getConfig().setLoadBalancerMultiPhaseBundleUnload(false);
pulsar2.getConfig().setLoadBalancerMultiPhaseBundleUnload(false);

@Cleanup
var pulsarClient = pulsarClient(brokerServiceUrl, 0);

@Cleanup
var producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();

Expand Down Expand Up @@ -719,13 +759,16 @@ public void testOptimizeUnloadDisable(TopicDomain topicDomain) throws Exception
verify(lookup, times(2)).getBroker(topicName);
}

private LookupService spyLookupService(PulsarClient client) throws IllegalAccessException {
protected static LookupService spyLookupService(PulsarClient client) throws IllegalAccessException {
LookupService svc = (LookupService) FieldUtils.readDeclaredField(client, "lookup", true);
var lookup = spy(svc);
FieldUtils.writeDeclaredField(client, "lookup", lookup, true);
return lookup;
}
private void checkOwnershipState(String broker, NamespaceBundle bundle)

protected static void checkOwnershipState(String broker, NamespaceBundle bundle,
ExtensibleLoadManager primaryLoadManager,
ExtensibleLoadManager secondaryLoadManager, PulsarService pulsar1)
throws ExecutionException, InterruptedException {
var targetLoadManager = secondaryLoadManager;
var otherLoadManager = primaryLoadManager;
Expand All @@ -737,6 +780,11 @@ private void checkOwnershipState(String broker, NamespaceBundle bundle)
assertFalse(otherLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get());
}

protected void checkOwnershipState(String broker, NamespaceBundle bundle)
throws ExecutionException, InterruptedException {
checkOwnershipState(broker, bundle, primaryLoadManager, secondaryLoadManager, pulsar1);
}

@Test(timeOut = 30 * 1000)
public void testSplitBundleAdminAPI() throws Exception {
final String namespace = "public/testSplitBundleAdminAPI";
Expand Down Expand Up @@ -1745,4 +1793,11 @@ public String name() {

}

protected static PulsarClient pulsarClient(String url, int intervalInSecs) throws PulsarClientException {
return
PulsarClient.builder()
.serviceUrl(url)
.statsInterval(intervalInSecs, TimeUnit.SECONDS).build();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.extensions;

import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicDomain;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
* Unit test for {@link ExtensibleLoadManagerImpl with AdvertisedListeners broker configs}.
*/
@Slf4j
@Test(groups = "flaky")
@SuppressWarnings("unchecked")
public class ExtensibleLoadManagerImplWithAdvertisedListenersTest extends ExtensibleLoadManagerImplBaseTest {

public String brokerServiceUrl;
public ExtensibleLoadManagerImplWithAdvertisedListenersTest() {
super("public/test");
}

@Override
protected ServiceConfiguration updateConfig(ServiceConfiguration conf) {
super.updateConfig(conf);
int privatePulsarPort = nextLockedFreePort();
int publicPulsarPort = nextLockedFreePort();
conf.setInternalListenerName("internal");
conf.setBindAddresses("external:pulsar://localhost:" + publicPulsarPort);
conf.setAdvertisedListeners(
"external:pulsar://localhost:" + publicPulsarPort +
",internal:pulsar://localhost:" + privatePulsarPort);
conf.setWebServicePortTls(Optional.empty());
conf.setBrokerServicePortTls(Optional.empty());
conf.setBrokerServicePort(Optional.of(privatePulsarPort));
conf.setWebServicePort(Optional.of(0));
brokerServiceUrl = conf.getBindAddresses().replaceAll("external:", "");
return conf;
}

@DataProvider(name = "isPersistentTopicSubscriptionTypeTest")
public Object[][] isPersistentTopicSubscriptionTypeTest() {
return new Object[][]{
{TopicDomain.non_persistent, SubscriptionType.Exclusive},
{TopicDomain.persistent, SubscriptionType.Key_Shared}
};
}

@Test(timeOut = 30_000, dataProvider = "isPersistentTopicSubscriptionTypeTest")
public void testTransferClientReconnectionWithoutLookup(TopicDomain topicDomain, SubscriptionType subscriptionType)
throws Exception {
ExtensibleLoadManagerImplTest.testTransferClientReconnectionWithoutLookup(topicDomain, subscriptionType,
defaultTestNamespace, admin,
brokerServiceUrl,
pulsar1, pulsar2, primaryLoadManager, secondaryLoadManager);
}

@Test(timeOut = 30 * 1000, dataProvider = "isPersistentTopicSubscriptionTypeTest")
public void testUnloadClientReconnectionWithLookup(TopicDomain topicDomain,
SubscriptionType subscriptionType) throws Exception {
ExtensibleLoadManagerImplTest.testUnloadClientReconnectionWithLookup(topicDomain, subscriptionType,
defaultTestNamespace, admin,
brokerServiceUrl,
pulsar1);
}

@DataProvider(name = "isPersistentTopicTest")
public Object[][] isPersistentTopicTest() {
return new Object[][]{{TopicDomain.persistent}, {TopicDomain.non_persistent}};
}

@Test(timeOut = 30 * 1000, dataProvider = "isPersistentTopicTest")
public void testOptimizeUnloadDisable(TopicDomain topicDomain) throws Exception {
ExtensibleLoadManagerImplTest.testOptimizeUnloadDisable(topicDomain, defaultTestNamespace, admin,
brokerServiceUrl, pulsar1, pulsar2);
}

}
Loading

0 comments on commit 5af0595

Please sign in to comment.