Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix namespace unload might be blocked too long with extensible load manager #23433

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,11 @@ public void healthCheck(@Suspended AsyncResponse asyncResponse,
asyncResponse.resume(Response.ok("ok").build());
}).exceptionally(ex -> {
if (!isRedirectException(ex)) {
LOG.error("[{}] Fail to run health check.", clientAppId(), ex);
if (isNotFoundException(ex)) {
LOG.warn("[{}] Failed to run health check: {}", clientAppId(), ex.getMessage());
} else {
LOG.error("[{}] Failed to run health check.", clientAppId(), ex);
}
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
Expand Down Expand Up @@ -1291,6 +1292,14 @@ private void handleBrokerCreationEvent(String broker) {
broker, cleanupJobs.size());
}
}
})
.exceptionally(e -> {
if (FutureUtil.unwrapCompletionException(e) instanceof PulsarAdminException.NotFoundException) {
log.warn("{} Failed to run health check: {}", broker, e.getMessage());
} else {
log.error("{} Failed to run health check", broker, e);
}
return null;
});
}
}
Expand Down Expand Up @@ -1323,12 +1332,19 @@ private void handleBrokerDeletionEvent(String broker) {
}
}

private boolean channelDisabled() {
final var channelState = this.channelState;
if (channelState == Disabled || channelState == Closed) {
log.warn("[{}] Skip scheduleCleanup because the state is {} now", brokerId, channelState);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may need to make this log generic or remove it as it is used in multiple places.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's right, I'll change it in the new PR

return true;
}
return false;
}

private void scheduleCleanup(String broker, long delayInSecs) {
var scheduled = new MutableObject<CompletableFuture<Void>>();
try {
final var channelState = this.channelState;
if (channelState == Disabled || channelState == Closed) {
log.warn("[{}] Skip scheduleCleanup because the state is {} now", brokerId, channelState);
if (channelDisabled()) {
return;
}
cleanupJobs.computeIfAbsent(broker, k -> {
Expand Down Expand Up @@ -1462,6 +1478,10 @@ private CompletableFuture<Void> healthCheckBrokerAsync(String brokerId) {
}

private void doHealthCheckBrokerAsyncWithRetries(String brokerId, int retry, CompletableFuture<Void> future) {
if (channelDisabled()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means that this leader broker is shutting down and trying to handle the broker deletion notification. It is counterintuitive that this shutting down leader broker thinks the target broker is healthy here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think we should check channelDisabled() in doCleanup before calling healthCheckBrokerAsync in doCleanup(..., false).

The reason to make the change here is that the future of healthCheckBrokerAsync could be blocked until timeout during the close phase.

# this log was added by me
2024-10-11T10:59:40,184 - INFO  - [pulsar-385-8:ServiceUnitStateChannelImpl] - XYZ doHealthCheckBrokerAsyncWithRetries localhost:61361 2
2024-10-11T10:59:45,130 - INFO  - [pulsar-load-manager-384-1:ServiceUnitStateChannelImpl] - Failed to check broker:localhost:61361 health
java.util.concurrent.TimeoutException: null
	at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1960) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2095) ~[?:?]
	at org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.doCleanup(ServiceUnitStateChannelImpl.java:1525) ~[classes/:?]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added more logs and checked it again. The root cause is that this method is still scheduled when the web service is closed.

                            pulsar.getExecutor()
                                    .schedule(() -> doHealthCheckBrokerAsyncWithRetries(brokerId, retry + 1, future),
                                            Math.min(MAX_BROKER_HEALTH_CHECK_DELAY_IN_MILLIS, retry * retry * 50),
                                            MILLISECONDS);

Logs:

2024-10-11T11:28:58,966 - INFO  - [pulsar-load-manager-1209-1:ServiceUnitStateChannelImpl] - XYZ doHealthCheckBrokerAsyncWithRetries localhost:58117 0
2024-10-11T11:28:58,970 - WARN  - [AsyncHttpClient-1250-1:ServiceUnitStateChannelImpl] - XYZ localhost:58117 Failed to do health check
2024-10-11T11:28:58,971 - INFO  - [pulsar-1210-9:ServiceUnitStateChannelImpl] - XYZ doHealthCheckBrokerAsyncWithRetries localhost:58117 1
2024-10-11T11:28:58,972 - WARN  - [AsyncHttpClient-1250-1:ServiceUnitStateChannelImpl] - XYZ localhost:58117 Failed to do health check
2024-10-11T11:28:59,015 - INFO  - [main:PulsarService] - XYZ start closing web service
2024-10-11T11:28:59,019 - INFO  - [main:PulsarService] - XYZ finished closing web service
2024-10-11T11:28:59,022 - INFO  - [pulsar-1210-9:ServiceUnitStateChannelImpl] - XYZ doHealthCheckBrokerAsyncWithRetries localhost:58117 2
2024-10-11T11:28:59,040 - WARN  - [AsyncHttpClient-1334-2:ServiceUnitStateChannelImpl] - XYZ localhost:58117 Failed to do health check
2024-10-11T11:28:59,150 - INFO  - [pulsar-load-manager-1251-1:ServiceUnitStateChannelImpl] - XYZ doHealthCheckBrokerAsyncWithRetries localhost:58158 0
2024-10-11T11:28:59,157 - WARN  - [AsyncHttpClient-1292-1:ServiceUnitStateChannelImpl] - XYZ localhost:58158 Failed to do health check
2024-10-11T11:28:59,157 - INFO  - [pulsar-1252-9:ServiceUnitStateChannelImpl] - XYZ doHealthCheckBrokerAsyncWithRetries localhost:58158 1
2024-10-11T11:28:59,159 - WARN  - [AsyncHttpClient-1292-1:ServiceUnitStateChannelImpl] - XYZ localhost:58158 Failed to do health check
2024-10-11T11:28:59,213 - INFO  - [pulsar-1252-4:ServiceUnitStateChannelImpl] - XYZ doHealthCheckBrokerAsyncWithRetries localhost:58158 2
2024-10-11T11:28:59,215 - WARN  - [AsyncHttpClient-1292-1:ServiceUnitStateChannelImpl] - XYZ localhost:58158 Failed to do health check
2024-10-11T11:28:59,420 - INFO  - [pulsar-1252-1:ServiceUnitStateChannelImpl] - XYZ doHealthCheckBrokerAsyncWithRetries localhost:58158 3

So the doCleanup should be skipped.

Actually, the key question that since doCleanup(..., true) is called during the close. Should we skip doCleanup(..., false) after that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to do the following change to avoid testOverrideOrphanStateData fail. i.e. doCleanup(..., false) won't be skipped. Only the healthCheckBrokerAsync call will be skipped

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index ea1bf01be5..30d19415e7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -1334,17 +1334,14 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
 
     private boolean channelDisabled() {
         final var channelState = this.channelState;
-        if (channelState == Disabled || channelState == Closed) {
-            log.warn("[{}] Skip scheduleCleanup because the state is {} now", brokerId, channelState);
-            return true;
-        }
-        return false;
+        return channelState == Disabled || channelState == Closed;
     }
 
     private void scheduleCleanup(String broker, long delayInSecs) {
         var scheduled = new MutableObject<CompletableFuture<Void>>();
         try {
             if (channelDisabled()) {
+                log.warn("[{}] Skip scheduleCleanup because the state is {} now", brokerId, channelState);
                 return;
             }
             cleanupJobs.computeIfAbsent(broker, k -> {
@@ -1478,10 +1475,6 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
     }
 
     private void doHealthCheckBrokerAsyncWithRetries(String brokerId, int retry, CompletableFuture<Void> future) {
-        if (channelDisabled()) {
-            future.complete(null);
-            return;
-        }
         try {
             var admin = getPulsarAdmin();
             admin.brokers().healthcheckAsync(TopicVersion.V2, Optional.of(brokerId))
@@ -1519,7 +1512,9 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
         }
 
         // if not gracefully, verify the broker is inactive by health-check.
-        if (!gracefully) {
+        // When the channel is disabled, the broker is during the close phase where the web service might not be
+        // available so that calling healthCheckBrokerAsync() could fail with timeout.
+        if (!gracefully && !channelDisabled()) {
             try {
                 healthCheckBrokerAsync(broker).get(
                         pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lgtm

future.complete(null);
return;
}
try {
var admin = getPulsarAdmin();
admin.brokers().healthcheckAsync(TopicVersion.V2, Optional.of(brokerId))
Expand All @@ -1472,7 +1492,6 @@ private void doHealthCheckBrokerAsyncWithRetries(String brokerId, int retry, Com
return;
}
if (retry == MAX_BROKER_HEALTH_CHECK_RETRY) {
log.error("Failed health-check broker :{}", brokerId, e);
future.completeExceptionally(FutureUtil.unwrapCompletionException(e));
} else {
pulsar.getExecutor()
Expand Down Expand Up @@ -1509,7 +1528,12 @@ private synchronized void doCleanup(String broker, boolean gracefully) {
return;
} catch (Exception e) {
if (debug()) {
log.info("Failed to check broker:{} health", broker, e);
if (e instanceof ExecutionException
&& e.getCause() instanceof PulsarAdminException.NotFoundException) {
log.info("The broker {} is not healthy because it's not found", broker);
} else {
log.info("Failed to check broker:{} health", broker, e);
}
}
log.info("Checked the broker:{} health. Continue the orphan bundle cleanup", broker);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "flaky")
@Test(groups = "broker")
public class ExtensibleLoadManagerCloseTest {

private static final String clusterName = "test";
Expand Down Expand Up @@ -88,14 +88,18 @@ private ServiceConfiguration brokerConfig() {
config.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
config.setLoadBalancerDebugModeEnabled(true);
config.setBrokerShutdownTimeoutMs(100);

// Reduce these timeout configs to avoid failed tests being blocked too long
config.setMetadataStoreOperationTimeoutSeconds(5);
config.setNamespaceBundleUnloadingTimeoutMs(5000);
return config;
}


@Test
@Test(invocationCount = 10)
lhotari marked this conversation as resolved.
Show resolved Hide resolved
public void testCloseAfterLoadingBundles() throws Exception {
setupBrokers(3);
final var topic = "test";
final var topic = "test-" + System.currentTimeMillis();
final var admin = brokers.get(0).getAdminClient();
admin.topics().createPartitionedTopic(topic, 20);
admin.lookups().lookupPartitionedTopic(topic);
Expand Down
Loading