Metadata information is not cleaned when broker exits abnormally #23889

Open
2 tasks done
Joforde opened this issue Jan 24, 2025 · 10 comments · May be fixed by #23902
Labels
type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages

Comments

@Joforde

Joforde commented Jan 24, 2025

Search before asking

  • I searched in the issues and found nothing similar.

Motivation

When a broker starts, it registers its metadata with the metadata service (such as Zookeeper or ETCD) under the /loadbalance/brokers directory. When the broker exits gracefully, it actively calls the unregister method to remove its own metadata.

@Override
public CompletableFuture<Void> registerAsync() {
    final var state = this.state.get();
    if (state != State.Started && state != State.Registered) {
        log.info("[{}] Skip registering self because the state is {}", getBrokerId(), state);
        return CompletableFuture.completedFuture(null);
    }
    log.info("[{}] Started registering self to {} (state: {})", getBrokerId(), brokerIdKeyPath, state);
    return brokerLookupDataMetadataCache.put(brokerIdKeyPath, brokerLookupData, EnumSet.of(CreateOption.Ephemeral))
            .orTimeout(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS)
    // ... (excerpt ends here)

@Override
public synchronized void unregister() throws MetadataStoreException {
    if (state.compareAndSet(State.Registered, State.Unregistering)) {
        try {
            brokerLookupDataMetadataCache.delete(brokerIdKeyPath)
                    .get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
        } catch (ExecutionException e) {
    // ... (excerpt ends here)

However, if the broker is forced to exit due to issues like hardware failure, network problems, or being terminated with kill -9, it does not call the unregister method to delete its metadata. This results in the metadata for brokers that are no longer accessible remaining in the system.

Currently, Pulsar retrieves all active brokers by fetching all child nodes under the metadata service's /loadbalance/brokers path. This can lead to offline brokers being considered active, and bundles may be assigned to brokers that are not accessible, causing new namespace clients to experience read and write failures.

@Override
public CompletableFuture<List<String>> getAvailableBrokersAsync() {
    this.checkState();
    return brokerLookupDataMetadataCache.getChildren(LOADBALANCE_BROKERS_ROOT);
}

Solution

I am not yet certain how to resolve this issue. I am currently reviewing the relevant code and drafting a solution document. If you have any suggestions, please feel free to leave a comment.

Alternatives

No response

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
Joforde added the type/enhancement label on Jan 24, 2025
Joforde changed the title from "Meta information is not cleaned when broker exits abnormally" to "Metadata information is not cleaned when broker exits abnormally" on Jan 24, 2025
@lhotari
Member

lhotari commented Jan 24, 2025

However, if the broker is forced to exit due to issues like hardware failure, network problems, or being terminated with kill -9, it does not call the unregister method to delete its metadata. This results in the metadata for brokers that are no longer accessible remaining in the system.

The metadata should get deleted automatically when the session expires. When a broker crashes, a new broker instance would not be able to start until the session expires and the ephemeral metadata entries expire.

There's EnumSet.of(CreateOption.Ephemeral):

return brokerLookupDataMetadataCache.put(brokerIdKeyPath, brokerLookupData, EnumSet.of(CreateOption.Ephemeral))
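To make the expected behavior concrete, here is a minimal sketch of how ephemeral and persistent entries differ when a session expires. It is a toy in-memory model, not the Pulsar or ZooKeeper API: `ToyMetadataStore`, `expireSession`, the session ids, and the broker names are all hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Demo: what a "list children" lookup returns after one broker crashes.
class EphemeralDemo {
    static final String BROKERS_ROOT = "/loadbalance/brokers";

    static List<String> availableAfterCrash(boolean ephemeral) {
        ToyMetadataStore store = new ToyMetadataStore();
        long sessionA = 1L;
        long sessionB = 2L;
        store.create(BROKERS_ROOT + "/broker-a:8080", sessionA, ephemeral);
        store.create(BROKERS_ROOT + "/broker-b:8080", sessionB, ephemeral);
        // broker-a crashes without unregistering; its session later expires.
        store.expireSession(sessionA);
        return store.getChildren(BROKERS_ROOT);
    }

    public static void main(String[] args) {
        System.out.println("ephemeral:  " + availableAfterCrash(true));
        System.out.println("persistent: " + availableAfterCrash(false));
    }
}

// Toy in-memory model of a metadata store (assumption, not a real API).
class ToyMetadataStore {
    static final long NO_OWNER = -1L; // marks a persistent node

    // path -> owning session id for ephemeral nodes, NO_OWNER for persistent ones
    private final Map<String, Long> nodes = new TreeMap<>();

    void create(String path, long sessionId, boolean ephemeral) {
        nodes.put(path, ephemeral ? sessionId : NO_OWNER);
    }

    // Removes every ephemeral node owned by the expired session.
    void expireSession(long sessionId) {
        nodes.values().removeIf(owner -> owner == sessionId);
    }

    // Returns the names of all nodes stored under the given parent path.
    List<String> getChildren(String parent) {
        String prefix = parent.endsWith("/") ? parent : parent + "/";
        List<String> children = new ArrayList<>();
        for (String path : nodes.keySet()) {
            if (path.startsWith(prefix)) {
                children.add(path.substring(prefix.length()));
            }
        }
        return children;
    }
}
```

In this model the crashed broker disappears from the listing only when its entry was created as ephemeral; a persistent entry stays listed until something deletes it explicitly.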

What metadata store implementation are you using? (What version, deployment type?)
I'm asking in case you are using Oxia, since you are facing this issue. With Oxia there was a related bug, addressed by streamnative/oxia#534 since Oxia v0.10.0. With Oxia, I believe it's worth tracking the recent releases and keeping up to date.

@Joforde
Author

Joforde commented Jan 26, 2025

@lhotari Thank you for your clear and accurate response; I found it very helpful. I will try the latest version of Oxia later.

In this case, I am using Pulsar version 4.0.2, with ZooKeeper as the metadata storage service.

I have identified the critical issue: when a broker registers information with the metadata storage service, it sets the expectedVersion field to Optional.empty(). For implementations using ZooKeeper, this creates a PERSISTENT node, which means that the node cannot be destroyed when the broker crashes.

The verification method is as follows:

@Test
public void zookeeperEphemeralKeys() throws Exception {
    final String key1 = newKey();
    final String key2 = newKey();
    @Cleanup MetadataStoreExtended store = MetadataStoreExtended.create(zks.getConnectionString(), MetadataStoreConfig.builder().build());
    store.put(key1, "value-1".getBytes(), Optional.of(-1L), EnumSet.of(CreateOption.Ephemeral)).join();
    store.put(key2, "value-1".getBytes(), Optional.empty(), EnumSet.of(CreateOption.Ephemeral)).join();
    store.close();

    @Cleanup MetadataStoreExtended store2 = MetadataStoreExtended.create(zks.getConnectionString(), MetadataStoreConfig.builder().build());
    assertFalse(store2.exists(key1).join());
    // This check will not pass: key2 still exists after the first store is closed
    assertFalse(store2.exists(key2).join());
    store2.close();
}

I have made the following modifications so that, when ZooKeeper is used as the metadata storage service, ephemeral nodes can be created successfully without requiring a version number.
Are there any other problems that I should consider or test?
1e7a3b4

@lhotari
Member

lhotari commented Jan 26, 2025

In this case, I am using Pulsar version 4.0.2, with ZooKeeper as the metadata storage service.

I have identified the critical issue: when a broker registers information with the metadata storage service, it sets the expectedVersion field to Optional.empty(). For implementations using ZooKeeper, this creates a PERSISTENT node, which means that the node cannot be destroyed when the broker crashes.

Good observations, @Joforde. Thanks for sharing. Are you using the extensible load manager?
It looks like changes #23298 and #23359 by @BewareMyPower are in this area.

It seems that the Pulsar default load manager implementation (loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl) would use this code to acquire a lock for the broker metadata:

brokerDataLock = brokersData.acquireLock(brokerZnodePath, localData).join();

What loadManagerClassName setting do you have?

@lhotari
Member

lhotari commented Jan 26, 2025

I have made the following modifications so that, when ZooKeeper is used as the metadata storage service, ephemeral nodes can be created successfully without requiring a version number.
Are there any other problems that I should consider or test?
1e7a3b4

@Joforde I guess the intention of the #23298 change was to overwrite any existing node and create a new ephemeral node if one doesn't exist. It seems that addressing the issue would require adding tests and support for all metadata store implementations.

@Joforde
Author

Joforde commented Jan 26, 2025

What loadManagerClassName setting do you have?

The loadManagerClassName I have set is org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.

@BewareMyPower
Contributor

since p.getOptions() is empty, it calls setData, leading to a NONODE error, and ultimately calls internalStorePut to create a PERSISTENT node

This is counterintuitive IMO. The ZKMetadataStore should not adopt this behavior.

@heesung-sn
Contributor

https://github.com/apache/pulsar/blob/master/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java#L441-L443

Don't we need to pass opPut.getOptions() here so that it can pass the original opPut.getOptions()(CreateOption.Ephemeral) when the set operation fails?

put(opPut.getPath(), opPut.getData(), Optional.of(-1L), opPut.getOptions()).thenAccept(
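The fallback path described in this thread can be sketched with a small toy model. This is not the actual ZKMetadataStore code: the class `PutFallbackDemo`, the `forwardOptions` flag, the `NodeType` enum, and the simplified treatment of version -1 as "create the node" are assumptions made for illustration; only the `CreateOption.Ephemeral` name mirrors the thread.

```java
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Toy model of a put() that falls back from "set data" to "create" on NONODE.
class PutFallbackDemo {
    enum CreateOption { Ephemeral }          // stand-in mirroring the name in the thread
    enum NodeType { PERSISTENT, EPHEMERAL }  // hypothetical, for illustration only

    // path -> node type; a toy substitute for the ZooKeeper tree
    private final Map<String, NodeType> tree = new HashMap<>();
    // false models the reported behavior, true models the proposed fix
    private final boolean forwardOptions;

    PutFallbackDemo(boolean forwardOptions) {
        this.forwardOptions = forwardOptions;
    }

    NodeType put(String path, Optional<Long> expectedVersion, EnumSet<CreateOption> options) {
        if (expectedVersion.isPresent() && expectedVersion.get() == -1L) {
            // In this toy, version -1 means "create the node with the requested type".
            NodeType type = options.contains(CreateOption.Ephemeral)
                    ? NodeType.EPHEMERAL : NodeType.PERSISTENT;
            tree.put(path, type);
            return type;
        }
        if (tree.containsKey(path)) {
            // Setting data on an existing node keeps its type.
            return tree.get(path);
        }
        // Setting data failed because the node does not exist: retry as a create.
        EnumSet<CreateOption> retryOptions =
                forwardOptions ? options : EnumSet.noneOf(CreateOption.class);
        return put(path, Optional.of(-1L), retryOptions);
    }

    public static void main(String[] args) {
        String path = "/loadbalance/brokers/broker-a:8080";
        EnumSet<CreateOption> ephemeral = EnumSet.of(CreateOption.Ephemeral);
        System.out.println("options dropped:   "
                + new PutFallbackDemo(false).put(path, Optional.empty(), ephemeral));
        System.out.println("options forwarded: "
                + new PutFallbackDemo(true).put(path, Optional.empty(), ephemeral));
    }
}
```

When the retry drops the options, the node ends up PERSISTENT even though the caller asked for Ephemeral; forwarding the original options keeps the requested type.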

@lhotari
Member

lhotari commented Jan 27, 2025

https://github.com/apache/pulsar/blob/master/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java#L441-L443

Don't we need to pass opPut.getOptions() here so that it can pass the original opPut.getOptions()(CreateOption.Ephemeral) when the set operation fails?

put(opPut.getPath(), opPut.getData(), Optional.of(-1L), opPut.getOptions()).thenAccept(

Good catch @heesung-sn! Would you like to submit a PR for addressing this bug?

heesung-sn linked a pull request on Jan 27, 2025 that will close this issue
@heesung-sn
Contributor

Raised a PR: #23902

@heesung-sn
Contributor

@Joforde
Side note: if you are testing ExtensibleLoadManagerImpl, I recommend trying this setting, too:
loadManagerServiceUnitStateTableViewClassName=org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateMetadataStoreTableViewImpl
