-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][broker] Fix namespace unload might be blocked too long with extensible load manager #23433
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -87,6 +87,7 @@ | |
import org.apache.pulsar.broker.namespace.NamespaceService; | ||
import org.apache.pulsar.broker.service.BrokerServiceException; | ||
import org.apache.pulsar.client.admin.PulsarAdmin; | ||
import org.apache.pulsar.client.admin.PulsarAdminException; | ||
import org.apache.pulsar.client.api.Schema; | ||
import org.apache.pulsar.common.naming.NamespaceBundle; | ||
import org.apache.pulsar.common.naming.NamespaceBundleFactory; | ||
|
@@ -1291,6 +1292,14 @@ private void handleBrokerCreationEvent(String broker) { | |
broker, cleanupJobs.size()); | ||
} | ||
} | ||
}) | ||
.exceptionally(e -> { | ||
if (FutureUtil.unwrapCompletionException(e) instanceof PulsarAdminException.NotFoundException) { | ||
log.warn("{} Failed to run health check: {}", broker, e.getMessage()); | ||
} else { | ||
log.error("{} Failed to run health check", broker, e); | ||
} | ||
return null; | ||
}); | ||
} | ||
} | ||
|
@@ -1323,12 +1332,19 @@ private void handleBrokerDeletionEvent(String broker) { | |
} | ||
} | ||
|
||
private boolean channelDisabled() { | ||
final var channelState = this.channelState; | ||
if (channelState == Disabled || channelState == Closed) { | ||
log.warn("[{}] Skip scheduleCleanup because the state is {} now", brokerId, channelState); | ||
return true; | ||
} | ||
return false; | ||
} | ||
|
||
private void scheduleCleanup(String broker, long delayInSecs) { | ||
var scheduled = new MutableObject<CompletableFuture<Void>>(); | ||
try { | ||
final var channelState = this.channelState; | ||
if (channelState == Disabled || channelState == Closed) { | ||
log.warn("[{}] Skip scheduleCleanup because the state is {} now", brokerId, channelState); | ||
if (channelDisabled()) { | ||
return; | ||
} | ||
cleanupJobs.computeIfAbsent(broker, k -> { | ||
|
@@ -1462,6 +1478,10 @@ private CompletableFuture<Void> healthCheckBrokerAsync(String brokerId) { | |
} | ||
|
||
private void doHealthCheckBrokerAsyncWithRetries(String brokerId, int retry, CompletableFuture<Void> future) { | ||
if (channelDisabled()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This means that this leader broker is shutting down and trying to handle the broker deletion notification. It is counterintuitive that this shutting down leader broker thinks the target broker is healthy here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I think we should check The reason to make the change here is that the future of
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added more logs and checked it again. The root cause is that this method is still scheduled when the web service is closed. pulsar.getExecutor()
.schedule(() -> doHealthCheckBrokerAsyncWithRetries(brokerId, retry + 1, future),
Math.min(MAX_BROKER_HEALTH_CHECK_DELAY_IN_MILLIS, retry * retry * 50),
MILLISECONDS); Logs:
So the Actually, the key question that since There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd like to do the following change to avoid diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index ea1bf01be5..30d19415e7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -1334,17 +1334,14 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
private boolean channelDisabled() {
final var channelState = this.channelState;
- if (channelState == Disabled || channelState == Closed) {
- log.warn("[{}] Skip scheduleCleanup because the state is {} now", brokerId, channelState);
- return true;
- }
- return false;
+ return channelState == Disabled || channelState == Closed;
}
private void scheduleCleanup(String broker, long delayInSecs) {
var scheduled = new MutableObject<CompletableFuture<Void>>();
try {
if (channelDisabled()) {
+ log.warn("[{}] Skip scheduleCleanup because the state is {} now", brokerId, channelState);
return;
}
cleanupJobs.computeIfAbsent(broker, k -> {
@@ -1478,10 +1475,6 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
}
private void doHealthCheckBrokerAsyncWithRetries(String brokerId, int retry, CompletableFuture<Void> future) {
- if (channelDisabled()) {
- future.complete(null);
- return;
- }
try {
var admin = getPulsarAdmin();
admin.brokers().healthcheckAsync(TopicVersion.V2, Optional.of(brokerId))
@@ -1519,7 +1512,9 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
}
// if not gracefully, verify the broker is inactive by health-check.
- if (!gracefully) {
+ // When the channel is disabled, the broker is during the close phase where the web service might not be
+ // available so that calling healthCheckBrokerAsync() could fail with timeout.
+ if (!gracefully && !channelDisabled()) {
try {
healthCheckBrokerAsync(broker).get(
pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS); There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Lgtm |
||
future.complete(null); | ||
return; | ||
} | ||
try { | ||
var admin = getPulsarAdmin(); | ||
admin.brokers().healthcheckAsync(TopicVersion.V2, Optional.of(brokerId)) | ||
|
@@ -1472,7 +1492,6 @@ private void doHealthCheckBrokerAsyncWithRetries(String brokerId, int retry, Com | |
return; | ||
} | ||
if (retry == MAX_BROKER_HEALTH_CHECK_RETRY) { | ||
log.error("Failed health-check broker :{}", brokerId, e); | ||
future.completeExceptionally(FutureUtil.unwrapCompletionException(e)); | ||
} else { | ||
pulsar.getExecutor() | ||
|
@@ -1509,7 +1528,12 @@ private synchronized void doCleanup(String broker, boolean gracefully) { | |
return; | ||
} catch (Exception e) { | ||
if (debug()) { | ||
log.info("Failed to check broker:{} health", broker, e); | ||
if (e instanceof ExecutionException | ||
&& e.getCause() instanceof PulsarAdminException.NotFoundException) { | ||
log.info("The broker {} is not healthy because it's not found", broker); | ||
} else { | ||
log.info("Failed to check broker:{} health", broker, e); | ||
} | ||
} | ||
log.info("Checked the broker:{} health. Continue the orphan bundle cleanup", broker); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We may need to make this log generic or remove it as it is used in multiple places.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's right, I'll change it in the new PR