-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow to have different instances LocalMemoryMetadataStore that share the same state #12390
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,17 +18,23 @@ | |
*/ | ||
package org.apache.pulsar.metadata.impl; | ||
|
||
import com.google.common.collect.MapMaker; | ||
import java.net.URI; | ||
import java.net.URISyntaxException; | ||
import java.util.ArrayList; | ||
import java.util.EnumSet; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.NavigableMap; | ||
import java.util.Optional; | ||
import java.util.TreeMap; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
|
||
import lombok.Data; | ||
|
||
import lombok.extern.slf4j.Slf4j; | ||
import org.apache.bookkeeper.common.concurrent.FutureUtils; | ||
import org.apache.pulsar.metadata.api.GetResult; | ||
import org.apache.pulsar.metadata.api.MetadataStoreConfig; | ||
|
@@ -41,6 +47,7 @@ | |
import org.apache.pulsar.metadata.api.extended.CreateOption; | ||
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; | ||
|
||
@Slf4j | ||
public class LocalMemoryMetadataStore extends AbstractMetadataStore implements MetadataStoreExtended { | ||
|
||
@Data | ||
|
@@ -55,56 +62,85 @@ private static class Value { | |
private final NavigableMap<String, Value> map; | ||
private final AtomicLong sequentialIdGenerator; | ||
|
||
public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig) { | ||
map = new TreeMap<>(); | ||
sequentialIdGenerator = new AtomicLong(); | ||
} | ||
|
||
@Override | ||
public synchronized CompletableFuture<Optional<GetResult>> get(String path) { | ||
if (!isValidPath(path)) { | ||
return FutureUtils.exception(new MetadataStoreException("")); | ||
private static final Map<String, NavigableMap<String, Value>> STATIC_MAPS = new MapMaker() | ||
.weakValues().makeMap(); | ||
private static final Map<String, AtomicLong> STATIC_ID_GEN_MAP = new MapMaker() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why weak values? |
||
.weakValues().makeMap(); | ||
|
||
public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig) | ||
throws MetadataStoreException { | ||
URI uri; | ||
try { | ||
uri = new URI(metadataURL); | ||
} catch (URISyntaxException e) { | ||
throw new MetadataStoreException(e); | ||
} | ||
|
||
Value v = map.get(path); | ||
if (v != null) { | ||
return FutureUtils.value( | ||
Optional.of(new GetResult(v.data, new Stat(path, v.version, v.createdTimestamp, v.modifiedTimestamp, | ||
v.isEphemeral(), true)))); | ||
// Local means a private data set | ||
if ("local".equals(uri.getHost())) { | ||
map = new TreeMap<>(); | ||
sequentialIdGenerator = new AtomicLong(); | ||
} else { | ||
return FutureUtils.value(Optional.empty()); | ||
// Use a reference from a shared data set | ||
String name = uri.getHost(); | ||
map = STATIC_MAPS.computeIfAbsent(name, __ -> new TreeMap<>()); | ||
sequentialIdGenerator = STATIC_ID_GEN_MAP.computeIfAbsent(name, __ -> new AtomicLong()); | ||
log.info("Created LocalMemoryDataStore for '{}'", name); | ||
} | ||
} | ||
|
||
@Override | ||
public synchronized CompletableFuture<List<String>> getChildrenFromStore(String path) { | ||
if (!isValidPath(path)) { | ||
return FutureUtils.exception(new MetadataStoreException("")); | ||
} | ||
public CompletableFuture<Optional<GetResult>> get(String path) { | ||
synchronized (map) { | ||
if (!isValidPath(path)) { | ||
return FutureUtils.exception(new MetadataStoreException("")); | ||
} | ||
|
||
String firstKey = path.equals("/") ? path : path + "/"; | ||
String lastKey = path.equals("/") ? "0" : path + "0"; // '0' is lexicographically just after '/' | ||
Value v = map.get(path); | ||
if (v != null) { | ||
return FutureUtils.value( | ||
Optional.of( | ||
new GetResult(v.data, new Stat(path, v.version, v.createdTimestamp, v.modifiedTimestamp, | ||
v.isEphemeral(), true)))); | ||
} else { | ||
return FutureUtils.value(Optional.empty()); | ||
} | ||
} | ||
} | ||
|
||
List<String> children = new ArrayList<>(); | ||
map.subMap(firstKey, false, lastKey, false).forEach((key, value) -> { | ||
String relativePath = key.replace(firstKey, ""); | ||
if (!relativePath.contains("/")) { | ||
// Only return first-level children | ||
children.add(relativePath); | ||
@Override | ||
public CompletableFuture<List<String>> getChildrenFromStore(String path) { | ||
synchronized (map) { | ||
if (!isValidPath(path)) { | ||
return FutureUtils.exception(new MetadataStoreException("")); | ||
} | ||
}); | ||
|
||
return FutureUtils.value(children); | ||
String firstKey = path.equals("/") ? path : path + "/"; | ||
String lastKey = path.equals("/") ? "0" : path + "0"; // '0' is lexicographically just after '/' | ||
|
||
List<String> children = new ArrayList<>(); | ||
map.subMap(firstKey, false, lastKey, false).forEach((key, value) -> { | ||
String relativePath = key.replace(firstKey, ""); | ||
if (!relativePath.contains("/")) { | ||
// Only return first-level children | ||
children.add(relativePath); | ||
} | ||
}); | ||
|
||
return FutureUtils.value(children); | ||
} | ||
} | ||
|
||
@Override | ||
public synchronized CompletableFuture<Boolean> existsFromStore(String path) { | ||
if (!isValidPath(path)) { | ||
return FutureUtils.exception(new MetadataStoreException("")); | ||
} | ||
public CompletableFuture<Boolean> existsFromStore(String path) { | ||
synchronized (map) { | ||
if (!isValidPath(path)) { | ||
return FutureUtils.exception(new MetadataStoreException("")); | ||
} | ||
|
||
Value v = map.get(path); | ||
return FutureUtils.value(v != null ? true : false); | ||
Value v = map.get(path); | ||
return FutureUtils.value(v != null ? true : false); | ||
} | ||
} | ||
|
||
@Override | ||
|
@@ -113,79 +149,86 @@ public CompletableFuture<Stat> put(String path, byte[] value, Optional<Long> exp | |
} | ||
|
||
@Override | ||
public synchronized CompletableFuture<Stat> storePut(String path, byte[] data, Optional<Long> optExpectedVersion, | ||
EnumSet<CreateOption> options) { | ||
if (!isValidPath(path)) { | ||
return FutureUtils.exception(new MetadataStoreException("")); | ||
} | ||
public CompletableFuture<Stat> storePut(String path, byte[] data, Optional<Long> optExpectedVersion, | ||
EnumSet<CreateOption> options) { | ||
synchronized (map) { | ||
if (!isValidPath(path)) { | ||
return FutureUtils.exception(new MetadataStoreException("")); | ||
} | ||
|
||
boolean hasVersion = optExpectedVersion.isPresent(); | ||
int expectedVersion = optExpectedVersion.orElse(-1L).intValue(); | ||
boolean hasVersion = optExpectedVersion.isPresent(); | ||
int expectedVersion = optExpectedVersion.orElse(-1L).intValue(); | ||
|
||
if (options.contains(CreateOption.Sequential)) { | ||
path += Long.toString(sequentialIdGenerator.getAndIncrement()); | ||
} | ||
if (options.contains(CreateOption.Sequential)) { | ||
path += Long.toString(sequentialIdGenerator.getAndIncrement()); | ||
} | ||
|
||
long now = System.currentTimeMillis(); | ||
long now = System.currentTimeMillis(); | ||
|
||
if (hasVersion && expectedVersion == -1) { | ||
Value newValue = new Value(0, data, now, now, options.contains(CreateOption.Ephemeral)); | ||
Value existingValue = map.putIfAbsent(path, newValue); | ||
if (existingValue != null) { | ||
return FutureUtils.exception(new BadVersionException("")); | ||
} else { | ||
receivedNotification(new Notification(NotificationType.Created, path)); | ||
String parent = parent(path); | ||
if (parent != null) { | ||
receivedNotification(new Notification(NotificationType.ChildrenChanged, parent)); | ||
} | ||
return FutureUtils.value(new Stat(path, 0, now, now, newValue.isEphemeral(), true)); | ||
} | ||
} else { | ||
Value existingValue = map.get(path); | ||
long existingVersion = existingValue != null ? existingValue.version : -1; | ||
if (hasVersion && expectedVersion != existingVersion) { | ||
return FutureUtils.exception(new BadVersionException("")); | ||
} else { | ||
long newVersion = existingValue != null ? existingValue.version + 1 : 0; | ||
long createdTimestamp = existingValue != null ? existingValue.createdTimestamp : now; | ||
Value newValue = new Value(newVersion, data, createdTimestamp, now, | ||
options.contains(CreateOption.Ephemeral)); | ||
map.put(path, newValue); | ||
|
||
NotificationType type = existingValue == null ? NotificationType.Created : NotificationType.Modified; | ||
receivedNotification(new Notification(type, path)); | ||
if (type == NotificationType.Created) { | ||
if (hasVersion && expectedVersion == -1) { | ||
Value newValue = new Value(0, data, now, now, options.contains(CreateOption.Ephemeral)); | ||
Value existingValue = map.putIfAbsent(path, newValue); | ||
if (existingValue != null) { | ||
return FutureUtils.exception(new BadVersionException("")); | ||
} else { | ||
receivedNotification(new Notification(NotificationType.Created, path)); | ||
String parent = parent(path); | ||
if (parent != null) { | ||
receivedNotification(new Notification(NotificationType.ChildrenChanged, parent)); | ||
} | ||
return FutureUtils.value(new Stat(path, 0, now, now, newValue.isEphemeral(), true)); | ||
} | ||
} else { | ||
Value existingValue = map.get(path); | ||
long existingVersion = existingValue != null ? existingValue.version : -1; | ||
if (hasVersion && expectedVersion != existingVersion) { | ||
return FutureUtils.exception(new BadVersionException("")); | ||
} else { | ||
long newVersion = existingValue != null ? existingValue.version + 1 : 0; | ||
long createdTimestamp = existingValue != null ? existingValue.createdTimestamp : now; | ||
Value newValue = new Value(newVersion, data, createdTimestamp, now, | ||
options.contains(CreateOption.Ephemeral)); | ||
map.put(path, newValue); | ||
|
||
NotificationType type = | ||
existingValue == null ? NotificationType.Created : NotificationType.Modified; | ||
receivedNotification(new Notification(type, path)); | ||
if (type == NotificationType.Created) { | ||
String parent = parent(path); | ||
if (parent != null) { | ||
receivedNotification(new Notification(NotificationType.ChildrenChanged, parent)); | ||
} | ||
} | ||
return FutureUtils | ||
.value(new Stat(path, newValue.version, newValue.createdTimestamp, | ||
newValue.modifiedTimestamp, | ||
false, true)); | ||
} | ||
return FutureUtils | ||
.value(new Stat(path, newValue.version, newValue.createdTimestamp, newValue.modifiedTimestamp, false, true)); | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
public synchronized CompletableFuture<Void> storeDelete(String path, Optional<Long> optExpectedVersion) { | ||
if (!isValidPath(path)) { | ||
return FutureUtils.exception(new MetadataStoreException("")); | ||
} | ||
public CompletableFuture<Void> storeDelete(String path, Optional<Long> optExpectedVersion) { | ||
synchronized (map) { | ||
if (!isValidPath(path)) { | ||
return FutureUtils.exception(new MetadataStoreException("")); | ||
} | ||
|
||
Value value = map.get(path); | ||
if (value == null) { | ||
return FutureUtils.exception(new NotFoundException("")); | ||
} else if (optExpectedVersion.isPresent() && optExpectedVersion.get() != value.version) { | ||
return FutureUtils.exception(new BadVersionException("")); | ||
} else { | ||
map.remove(path); | ||
receivedNotification(new Notification(NotificationType.Deleted, path)); | ||
String parent = parent(path); | ||
if (parent != null) { | ||
receivedNotification(new Notification(NotificationType.ChildrenChanged, parent)); | ||
Value value = map.get(path); | ||
if (value == null) { | ||
return FutureUtils.exception(new NotFoundException("")); | ||
} else if (optExpectedVersion.isPresent() && optExpectedVersion.get() != value.version) { | ||
return FutureUtils.exception(new BadVersionException("")); | ||
} else { | ||
map.remove(path); | ||
receivedNotification(new Notification(NotificationType.Deleted, path)); | ||
String parent = parent(path); | ||
if (parent != null) { | ||
receivedNotification(new Notification(NotificationType.ChildrenChanged, parent)); | ||
} | ||
return FutureUtils.value(null); | ||
} | ||
return FutureUtils.value(null); | ||
} | ||
} | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why weak values?
This will make tests possible unpredictable, because eviction will depend on GC
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a static map, so it would be never cleaned up otherwise.
It will not make the test unpredictable because there will be hard references to the map value for each instance of
LocalMemoryMetadataStore
that will keep it from getting GCed.