[fix][broker] Fix namespace unload might be blocked too long with extensible load manager #23433
I'm investigating #23413 as well and might push another PR for it if this PR is reviewed quickly. Otherwise, I might include the fix in this PR.
LGTM, good work @BewareMyPower. I asked one question.
I fixed the test. And I also found the process on The However, from the failure of Besides, the Anyway, these details are complicated and it could be a new topic to discuss. |
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #23433 +/- ##
============================================
+ Coverage 73.57% 74.27% +0.70%
- Complexity 32624 34963 +2339
============================================
Files 1877 1952 +75
Lines 139502 147150 +7648
Branches 15299 16201 +902
============================================
+ Hits 102638 109296 +6658
- Misses 28908 29424 +516
- Partials 7956 8430 +474
private boolean channelDisabled() {
    final var channelState = this.channelState;
    if (channelState == Disabled || channelState == Closed) {
        log.warn("[{}] Skip scheduleCleanup because the state is {} now", brokerId, channelState);
We may need to make this log generic or remove it as it is used in multiple places.
That's right, I'll change it in the new PR.
When a broker shuts down, the expectation is that the connected admin clients fail all pending requests as their connections are closed. It is a bit surprising that the admin client hung in this case and the cleanup process waited until it timed out. I wonder why the admin client didn't fail the health checks sooner.
@@ -1462,6 +1478,10 @@ private CompletableFuture<Void> healthCheckBrokerAsync(String brokerId) {
    }

    private void doHealthCheckBrokerAsyncWithRetries(String brokerId, int retry, CompletableFuture<Void> future) {
        if (channelDisabled()) {
This means that this leader broker is shutting down and trying to handle the broker deletion notification. It is counterintuitive that this shutting down leader broker thinks the target broker is healthy here.
Yes, I think we should check channelDisabled() in doCleanup before calling healthCheckBrokerAsync in doCleanup(..., false).
The reason to make the change here is that the future of healthCheckBrokerAsync could be blocked until timeout during the close phase.
# this log was added by me
2024-10-11T10:59:40,184 - INFO - [pulsar-385-8:ServiceUnitStateChannelImpl] - XYZ doHealthCheckBrokerAsyncWithRetries localhost:61361 2
2024-10-11T10:59:45,130 - INFO - [pulsar-load-manager-384-1:ServiceUnitStateChannelImpl] - Failed to check broker:localhost:61361 health
java.util.concurrent.TimeoutException: null
at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1960) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2095) ~[?:?]
at org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.doCleanup(ServiceUnitStateChannelImpl.java:1525) ~[classes/:?]
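The stack trace above is what a timed wait on a future that never completes looks like. As a minimal sketch outside Pulsar (the class and method names here are hypothetical, only the CompletableFuture API is real), the blocking behavior can be reproduced like this:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-alone class; it only mimics the timed wait seen in doCleanup.
final class BlockedHealthCheckSketch {

    /** Waits on the future with a timeout and reports whether the wait timed out. */
    static boolean timesOut(CompletableFuture<Void> future, long timeout, TimeUnit unit) {
        try {
            future.get(timeout, unit);
            return false; // the future completed in time
        } catch (TimeoutException e) {
            return true; // nobody completed the future, so the caller was blocked for the full timeout
        } catch (ExecutionException e) {
            return false; // the future failed, but the wait itself did not time out
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // A future that is never completed stands in for a health check that cannot make progress.
        var pending = new CompletableFuture<Void>();
        System.out.println("pending future timed out: " + timesOut(pending, 100, TimeUnit.MILLISECONDS));

        var done = CompletableFuture.<Void>completedFuture(null);
        System.out.println("completed future timed out: " + timesOut(done, 100, TimeUnit.MILLISECONDS));
    }
}
```

In the real code the timeout is metadataStoreOperationTimeoutSeconds, so a health-check future that is never completed blocks the caller for that whole period.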
I added more logs and checked it again. The root cause is that this method is still scheduled when the web service is closed.
pulsar.getExecutor()
.schedule(() -> doHealthCheckBrokerAsyncWithRetries(brokerId, retry + 1, future),
Math.min(MAX_BROKER_HEALTH_CHECK_DELAY_IN_MILLIS, retry * retry * 50),
MILLISECONDS);
Logs:
2024-10-11T11:28:58,966 - INFO - [pulsar-load-manager-1209-1:ServiceUnitStateChannelImpl] - XYZ doHealthCheckBrokerAsyncWithRetries localhost:58117 0
2024-10-11T11:28:58,970 - WARN - [AsyncHttpClient-1250-1:ServiceUnitStateChannelImpl] - XYZ localhost:58117 Failed to do health check
2024-10-11T11:28:58,971 - INFO - [pulsar-1210-9:ServiceUnitStateChannelImpl] - XYZ doHealthCheckBrokerAsyncWithRetries localhost:58117 1
2024-10-11T11:28:58,972 - WARN - [AsyncHttpClient-1250-1:ServiceUnitStateChannelImpl] - XYZ localhost:58117 Failed to do health check
2024-10-11T11:28:59,015 - INFO - [main:PulsarService] - XYZ start closing web service
2024-10-11T11:28:59,019 - INFO - [main:PulsarService] - XYZ finished closing web service
2024-10-11T11:28:59,022 - INFO - [pulsar-1210-9:ServiceUnitStateChannelImpl] - XYZ doHealthCheckBrokerAsyncWithRetries localhost:58117 2
2024-10-11T11:28:59,040 - WARN - [AsyncHttpClient-1334-2:ServiceUnitStateChannelImpl] - XYZ localhost:58117 Failed to do health check
2024-10-11T11:28:59,150 - INFO - [pulsar-load-manager-1251-1:ServiceUnitStateChannelImpl] - XYZ doHealthCheckBrokerAsyncWithRetries localhost:58158 0
2024-10-11T11:28:59,157 - WARN - [AsyncHttpClient-1292-1:ServiceUnitStateChannelImpl] - XYZ localhost:58158 Failed to do health check
2024-10-11T11:28:59,157 - INFO - [pulsar-1252-9:ServiceUnitStateChannelImpl] - XYZ doHealthCheckBrokerAsyncWithRetries localhost:58158 1
2024-10-11T11:28:59,159 - WARN - [AsyncHttpClient-1292-1:ServiceUnitStateChannelImpl] - XYZ localhost:58158 Failed to do health check
2024-10-11T11:28:59,213 - INFO - [pulsar-1252-4:ServiceUnitStateChannelImpl] - XYZ doHealthCheckBrokerAsyncWithRetries localhost:58158 2
2024-10-11T11:28:59,215 - WARN - [AsyncHttpClient-1292-1:ServiceUnitStateChannelImpl] - XYZ localhost:58158 Failed to do health check
2024-10-11T11:28:59,420 - INFO - [pulsar-1252-1:ServiceUnitStateChannelImpl] - XYZ doHealthCheckBrokerAsyncWithRetries localhost:58158 3
So the doCleanup should be skipped.
Actually, the key question is: since doCleanup(..., true) is called during the close, should we skip doCleanup(..., false) after that?
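For reference, the retry spacing in the logs follows from the delay expression in the schedule call quoted in this comment. A minimal sketch of that calculation, assuming a cap of 1000 ms (the actual value of MAX_BROKER_HEALTH_CHECK_DELAY_IN_MILLIS is not shown in this thread, and the class name is hypothetical):

```java
// Hypothetical stand-alone class reproducing only the delay arithmetic of the retry scheduling.
final class HealthCheckBackoffSketch {

    // ASSUMPTION: the real MAX_BROKER_HEALTH_CHECK_DELAY_IN_MILLIS is not shown in this thread.
    static final long ASSUMED_MAX_DELAY_MILLIS = 1000;

    /** Delay before the next attempt after the attempt numbered {@code retry} has failed. */
    static long delayMillis(int retry) {
        return Math.min(ASSUMED_MAX_DELAY_MILLIS, (long) retry * retry * 50);
    }

    public static void main(String[] args) {
        for (int retry = 0; retry <= 5; retry++) {
            System.out.println("retry " + retry + " -> next attempt after " + delayMillis(retry) + " ms");
        }
    }
}
```

With this formula the delays are 0 ms, 50 ms and 200 ms after attempts 0, 1 and 2, which is consistent with the timestamps of the XYZ log lines above. Retries keep being scheduled on this growing delay, including after the web service has been closed.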
I'd like to make the following change to keep testOverrideOrphanStateData from failing, i.e. doCleanup(..., false) won't be skipped; only the healthCheckBrokerAsync call will be skipped.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index ea1bf01be5..30d19415e7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -1334,17 +1334,14 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
private boolean channelDisabled() {
final var channelState = this.channelState;
- if (channelState == Disabled || channelState == Closed) {
- log.warn("[{}] Skip scheduleCleanup because the state is {} now", brokerId, channelState);
- return true;
- }
- return false;
+ return channelState == Disabled || channelState == Closed;
}
private void scheduleCleanup(String broker, long delayInSecs) {
var scheduled = new MutableObject<CompletableFuture<Void>>();
try {
if (channelDisabled()) {
+ log.warn("[{}] Skip scheduleCleanup because the state is {} now", brokerId, channelState);
return;
}
cleanupJobs.computeIfAbsent(broker, k -> {
@@ -1478,10 +1475,6 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
}
private void doHealthCheckBrokerAsyncWithRetries(String brokerId, int retry, CompletableFuture<Void> future) {
- if (channelDisabled()) {
- future.complete(null);
- return;
- }
try {
var admin = getPulsarAdmin();
admin.brokers().healthcheckAsync(TopicVersion.V2, Optional.of(brokerId))
@@ -1519,7 +1512,9 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
}
// if not gracefully, verify the broker is inactive by health-check.
- if (!gracefully) {
+ // When the channel is disabled, the broker is during the close phase where the web service might not be
+ // available so that calling healthCheckBrokerAsync() could fail with timeout.
+ if (!gracefully && !channelDisabled()) {
try {
healthCheckBrokerAsync(broker).get(
pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
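The effect of the new condition can be illustrated in isolation. The sketch below is not the Pulsar class: the enum is a simplified, hypothetical subset of the channel states, and the health check is replaced by a boolean result so the guard can be exercised without a broker.

```java
// Hypothetical, simplified model of the guard added in the diff above.
final class CleanupGuardSketch {

    // ASSUMPTION: a reduced set of states; the real channel has more.
    enum ChannelState { Started, Disabled, Closed }

    private volatile ChannelState channelState = ChannelState.Started;

    void setState(ChannelState state) {
        this.channelState = state;
    }

    private boolean channelDisabled() {
        final var state = this.channelState;
        return state == ChannelState.Disabled || state == ChannelState.Closed;
    }

    /** Returns true if the (stubbed) health check would run before the cleanup. */
    boolean doCleanup(boolean gracefully) {
        boolean healthChecked = false;
        // Same condition as in the diff: only a non-graceful cleanup on an active channel is health-checked.
        if (!gracefully && !channelDisabled()) {
            healthChecked = true; // stand-in for healthCheckBrokerAsync(broker).get(timeout, SECONDS)
        }
        // The remaining cleanup steps would run here in every case; they are not skipped.
        return healthChecked;
    }

    public static void main(String[] args) {
        var channel = new CleanupGuardSketch();
        System.out.println("active, not graceful: " + channel.doCleanup(false));
        System.out.println("active, graceful:     " + channel.doCleanup(true));
        channel.setState(ChannelState.Closed);
        System.out.println("closed, not graceful: " + channel.doCleanup(false));
    }
}
```

Only the blocking wait is guarded, so the rest of doCleanup(..., false) still runs while the broker is closing, which is the behavior testOverrideOrphanStateData relies on according to the comment above.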
Lgtm
…ensible load manager (apache#23433) (cherry picked from commit 5506f50)
Fixes #23412
Motivation
#23382 brings a regression to the broker stop phase, which could be blocked by waiting on:
- healthCheckBrokerAsync in ServiceUnitStateChannelImpl#doCleanup, which could be blocked for metadataStoreOperationTimeoutSeconds seconds (default: 30s)
- unloadNamespaceBundle in BrokerService#unloadNamespaceBundlesGracefully, which could be blocked for namespaceBundleUnloadingTimeoutMs milliseconds (default: 1 min) for each problematic bundle
The 2nd method could be blocked when healthCheckBrokerAsync failed with NotFoundException but the following steps were not skipped.
Modifications
- Check channelDisabled before healthCheckBrokerAsync
- The metadataStoreOperationTimeoutSeconds and namespaceBundleUnloadingTimeoutMs configs are reduced to avoid waiting too long if the test fails.
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: