Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] [broker] Check max producers/consumers limitation first before other ops to save resources #23074

Merged
merged 7 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -486,8 +486,18 @@ private PublishRate publishRateInBroker(ServiceConfiguration config) {
return new PublishRate(config.getMaxPublishRatePerTopicInMessages(), config.getMaxPublishRatePerTopicInBytes());
}

public boolean isProducersExceeded(String producerName) {
String replicatorPrefix = brokerService.getPulsar().getConfig().getReplicatorPrefix() + ".";
boolean isRemote = producerName.startsWith(replicatorPrefix);
return isProducersExceeded(isRemote);
}

protected boolean isProducersExceeded(Producer producer) {
if (isSystemTopic() || producer.isRemote()) {
return isProducersExceeded(producer.isRemote());
}

protected boolean isProducersExceeded(boolean isRemote) {
if (isSystemTopic() || isRemote) {
return false;
}
Integer maxProducers = topicPolicies.getMaxProducersPerTopic().get();
Expand Down Expand Up @@ -536,7 +546,7 @@ public int getNumberOfSameAddressProducers(final String clientAddress) {
return count;
}

protected boolean isConsumersExceededOnTopic() {
public boolean isConsumersExceededOnTopic() {
if (isSystemTopic()) {
return false;
}
Expand Down Expand Up @@ -973,12 +983,6 @@ protected void checkTopicFenced() throws BrokerServiceException {
}

protected CompletableFuture<Void> internalAddProducer(Producer producer) {
if (isProducersExceeded(producer)) {
log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic);
return CompletableFuture.failedFuture(new BrokerServiceException.ProducerBusyException(
"Topic '" + topic + "' reached max producers limit"));
}

if (isSameAddressProducersExceeded(producer)) {
log.warn("[{}] Attempting to add producer to topic which reached max same address producers limit", topic);
return CompletableFuture.failedFuture(new BrokerServiceException.ProducerBusyException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1320,6 +1320,13 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
new SubscriptionNotFoundException(
"Subscription does not exist"));
}
if (((AbstractTopic) topic).isConsumersExceededOnTopic()) {
gaoran10 marked this conversation as resolved.
Show resolved Hide resolved
log.warn("[{}] Attempting to add consumer to topic which reached max"
+ " consumers limit", topic);
Throwable t =
new ConsumerBusyException("Topic reached max consumers limit");
return FutureUtil.failedFuture(t);
}

SubscriptionOption option = SubscriptionOption.builder().cnx(ServerCnx.this)
.subscriptionName(subscriptionName)
Expand Down Expand Up @@ -1545,6 +1552,18 @@ protected void handleProducer(final CommandProducer cmdProducer) {
}

service.getOrCreateTopic(topicName.toString()).thenCompose((Topic topic) -> {
// Check max producer limitation to avoid unnecessary ops wasting resources. For example: the new
// producer reached max producer limitation, but pulsar did schema check first, it would waste CPU
if (((AbstractTopic) topic).isProducersExceeded(producerName)) {
log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic);
String errorMsg = "Topic '" + topicName.toString() + "' reached max producers limit";
Throwable t = new BrokerServiceException.ProducerBusyException(errorMsg);
producerFuture.completeExceptionally(t);
producers.remove(producerId, producerFuture);
commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(t), errorMsg);
gaoran10 marked this conversation as resolved.
Show resolved Hide resolved
return CompletableFuture.completedFuture(null);
}

// Before creating producer, check if backlog quota exceeded
// on topic for size based limit and time based limit
CompletableFuture<Void> backlogQuotaCheckFuture = CompletableFuture.allOf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
import static org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -52,6 +54,7 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.ws.rs.NotAcceptableException;
import javax.ws.rs.core.Response.Status;
import lombok.AllArgsConstructor;
Expand All @@ -70,6 +73,7 @@
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
Expand Down Expand Up @@ -127,7 +131,13 @@
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -2870,49 +2880,80 @@ public void testMaxProducersPerTopicUnlimited() throws Exception {
final String myNamespace = newUniqueName(defaultTenant + "/ns");
admin.namespaces().createNamespace(myNamespace, Set.of("test"));
final String topic = "persistent://" + myNamespace + "/testMaxProducersPerTopicUnlimited";
admin.topics().createNonPartitionedTopic(topic);
AtomicInteger schemaOpsCounter = injectSchemaCheckCounterForTopic(topic);
//the policy is set to 0, so there will be no restrictions
admin.namespaces().setMaxProducersPerTopic(myNamespace, 0);
Awaitility.await().until(()
-> admin.namespaces().getMaxProducersPerTopic(myNamespace) == 0);
List<Producer<byte[]>> producers = new ArrayList<>();
List<Producer<String>> producers = new ArrayList<>();
for (int i = 0; i < maxProducersPerTopic + 1; i++) {
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
producers.add(producer);
}
assertEquals(schemaOpsCounter.get(), maxProducersPerTopic + 1);

admin.namespaces().removeMaxProducersPerTopic(myNamespace);
Awaitility.await().until(()
-> admin.namespaces().getMaxProducersPerTopic(myNamespace) == null);

try {
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
fail("should fail");
} catch (PulsarClientException e) {
String expectMsg = "Topic '" + topic + "' reached max producers limit";
assertTrue(e.getMessage().contains(expectMsg));
assertEquals(schemaOpsCounter.get(), maxProducersPerTopic + 1);
}
//set the limit to 3
admin.namespaces().setMaxProducersPerTopic(myNamespace, 3);
Awaitility.await().until(()
-> admin.namespaces().getMaxProducersPerTopic(myNamespace) == 3);
// should success
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
producers.add(producer);
assertEquals(schemaOpsCounter.get(), maxProducersPerTopic + 2);
try {
@Cleanup
Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topic).create();
fail("should fail");
} catch (PulsarClientException e) {
String expectMsg = "Topic '" + topic + "' reached max producers limit";
assertTrue(e.getMessage().contains(expectMsg));
assertEquals(schemaOpsCounter.get(), maxProducersPerTopic + 2);
}

//clean up
for (Producer<byte[]> tempProducer : producers) {
for (Producer<String> tempProducer : producers) {
tempProducer.close();
}
}

private AtomicInteger injectSchemaCheckCounterForTopic(String topicName) {
ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics =
WhiteboxImpl.getInternalState(pulsar.getBrokerService(), "topics");
AbstractTopic topic = (AbstractTopic) topics.get(topicName).join().get();
AbstractTopic spyTopic = Mockito.spy(topic);
AtomicInteger counter = new AtomicInteger();
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
counter.incrementAndGet();
return invocation.callRealMethod();
}
}).when(spyTopic).addSchema(any(SchemaData.class));
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
counter.incrementAndGet();
return invocation.callRealMethod();
}
}).when(spyTopic).addSchemaIfIdleOrCheckCompatible(any(SchemaData.class));
topics.put(topicName, CompletableFuture.completedFuture(Optional.of(spyTopic)));
return counter;
}

@Test
public void testMaxConsumersPerTopicUnlimited() throws Exception {
restartClusterAfterTest();
Expand All @@ -2924,49 +2965,55 @@ public void testMaxConsumersPerTopicUnlimited() throws Exception {
final String myNamespace = newUniqueName(defaultTenant + "/ns");
admin.namespaces().createNamespace(myNamespace, Set.of("test"));
final String topic = "persistent://" + myNamespace + "/testMaxConsumersPerTopicUnlimited";
admin.topics().createNonPartitionedTopic(topic);
AtomicInteger schemaOpsCounter = injectSchemaCheckCounterForTopic(topic);

assertNull(admin.namespaces().getMaxConsumersPerTopic(myNamespace));
//the policy is set to 0, so there will be no restrictions
admin.namespaces().setMaxConsumersPerTopic(myNamespace, 0);
Awaitility.await().until(()
-> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == 0);
List<Consumer<byte[]>> consumers = new ArrayList<>();
List<Consumer<String>> consumers = new ArrayList<>();
for (int i = 0; i < maxConsumersPerTopic + 1; i++) {
Consumer<byte[]> consumer =
pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
consumers.add(consumer);
}
assertEquals(schemaOpsCounter.get(), maxConsumersPerTopic + 2);

admin.namespaces().removeMaxConsumersPerTopic(myNamespace);
Awaitility.await().until(()
-> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == null);
try {
@Cleanup
Consumer<byte[]> subscribe =
pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
Consumer<String> subscribe = pulsarClient.newConsumer(Schema.STRING)
.subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
fail("should fail");
} catch (PulsarClientException e) {
assertTrue(e.getMessage().contains("Topic reached max consumers limit"));
assertEquals(schemaOpsCounter.get(), maxConsumersPerTopic + 2);
}
//set the limit to 3
admin.namespaces().setMaxConsumersPerTopic(myNamespace, 3);
Awaitility.await().until(()
-> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == 3);
// should success
Consumer<byte[]> consumer =
pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
consumers.add(consumer);
assertEquals(schemaOpsCounter.get(), maxConsumersPerTopic + 3);
try {
@Cleanup
Consumer<byte[]> subscribe =
pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
Consumer<String> subscribe = pulsarClient.newConsumer(Schema.STRING)
.subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
fail("should fail");
} catch (PulsarClientException e) {
assertTrue(e.getMessage().contains("Topic reached max consumers limit"));
assertEquals(schemaOpsCounter.get(), maxConsumersPerTopic + 3);
}

//clean up
for (Consumer<byte[]> subConsumer : consumers) {
for (Consumer<String> subConsumer : consumers) {
subConsumer.close();
}
}
Expand Down
Loading