Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to have different instances LocalMemoryMetadataStore that share the same state #12390

Merged
merged 1 commit into from
Oct 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,23 @@
*/
package org.apache.pulsar.metadata.impl;

import com.google.common.collect.MapMaker;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import lombok.Data;

import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
Expand All @@ -41,6 +47,7 @@
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;

@Slf4j
public class LocalMemoryMetadataStore extends AbstractMetadataStore implements MetadataStoreExtended {

@Data
Expand All @@ -55,56 +62,85 @@ private static class Value {
private final NavigableMap<String, Value> map;
private final AtomicLong sequentialIdGenerator;

public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig) {
map = new TreeMap<>();
sequentialIdGenerator = new AtomicLong();
}

@Override
public synchronized CompletableFuture<Optional<GetResult>> get(String path) {
if (!isValidPath(path)) {
return FutureUtils.exception(new MetadataStoreException(""));
private static final Map<String, NavigableMap<String, Value>> STATIC_MAPS = new MapMaker()
.weakValues().makeMap();
private static final Map<String, AtomicLong> STATIC_ID_GEN_MAP = new MapMaker()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why weak values?
This will make tests possible unpredictable, because eviction will depend on GC

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a static map, so it would be never cleaned up otherwise.

It will not make the test unpredictable because there will be hard references to the map value for each instance of LocalMemoryMetadataStore that will keep it from getting GCed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why weak values?
This will make tests possible unpredictable, because eviction will depend on GC

.weakValues().makeMap();

public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig)
throws MetadataStoreException {
URI uri;
try {
uri = new URI(metadataURL);
} catch (URISyntaxException e) {
throw new MetadataStoreException(e);
}

Value v = map.get(path);
if (v != null) {
return FutureUtils.value(
Optional.of(new GetResult(v.data, new Stat(path, v.version, v.createdTimestamp, v.modifiedTimestamp,
v.isEphemeral(), true))));
// Local means a private data set
if ("local".equals(uri.getHost())) {
map = new TreeMap<>();
sequentialIdGenerator = new AtomicLong();
} else {
return FutureUtils.value(Optional.empty());
// Use a reference from a shared data set
String name = uri.getHost();
map = STATIC_MAPS.computeIfAbsent(name, __ -> new TreeMap<>());
sequentialIdGenerator = STATIC_ID_GEN_MAP.computeIfAbsent(name, __ -> new AtomicLong());
log.info("Created LocalMemoryDataStore for '{}'", name);
}
}

@Override
public synchronized CompletableFuture<List<String>> getChildrenFromStore(String path) {
if (!isValidPath(path)) {
return FutureUtils.exception(new MetadataStoreException(""));
}
public CompletableFuture<Optional<GetResult>> get(String path) {
synchronized (map) {
if (!isValidPath(path)) {
return FutureUtils.exception(new MetadataStoreException(""));
}

String firstKey = path.equals("/") ? path : path + "/";
String lastKey = path.equals("/") ? "0" : path + "0"; // '0' is lexicographically just after '/'
Value v = map.get(path);
if (v != null) {
return FutureUtils.value(
Optional.of(
new GetResult(v.data, new Stat(path, v.version, v.createdTimestamp, v.modifiedTimestamp,
v.isEphemeral(), true))));
} else {
return FutureUtils.value(Optional.empty());
}
}
}

List<String> children = new ArrayList<>();
map.subMap(firstKey, false, lastKey, false).forEach((key, value) -> {
String relativePath = key.replace(firstKey, "");
if (!relativePath.contains("/")) {
// Only return first-level children
children.add(relativePath);
@Override
public CompletableFuture<List<String>> getChildrenFromStore(String path) {
synchronized (map) {
if (!isValidPath(path)) {
return FutureUtils.exception(new MetadataStoreException(""));
}
});

return FutureUtils.value(children);
String firstKey = path.equals("/") ? path : path + "/";
String lastKey = path.equals("/") ? "0" : path + "0"; // '0' is lexicographically just after '/'

List<String> children = new ArrayList<>();
map.subMap(firstKey, false, lastKey, false).forEach((key, value) -> {
String relativePath = key.replace(firstKey, "");
if (!relativePath.contains("/")) {
// Only return first-level children
children.add(relativePath);
}
});

return FutureUtils.value(children);
}
}

@Override
public synchronized CompletableFuture<Boolean> existsFromStore(String path) {
if (!isValidPath(path)) {
return FutureUtils.exception(new MetadataStoreException(""));
}
public CompletableFuture<Boolean> existsFromStore(String path) {
synchronized (map) {
if (!isValidPath(path)) {
return FutureUtils.exception(new MetadataStoreException(""));
}

Value v = map.get(path);
return FutureUtils.value(v != null ? true : false);
Value v = map.get(path);
return FutureUtils.value(v != null ? true : false);
}
}

@Override
Expand All @@ -113,79 +149,86 @@ public CompletableFuture<Stat> put(String path, byte[] value, Optional<Long> exp
}

@Override
public synchronized CompletableFuture<Stat> storePut(String path, byte[] data, Optional<Long> optExpectedVersion,
EnumSet<CreateOption> options) {
if (!isValidPath(path)) {
return FutureUtils.exception(new MetadataStoreException(""));
}
public CompletableFuture<Stat> storePut(String path, byte[] data, Optional<Long> optExpectedVersion,
EnumSet<CreateOption> options) {
synchronized (map) {
if (!isValidPath(path)) {
return FutureUtils.exception(new MetadataStoreException(""));
}

boolean hasVersion = optExpectedVersion.isPresent();
int expectedVersion = optExpectedVersion.orElse(-1L).intValue();
boolean hasVersion = optExpectedVersion.isPresent();
int expectedVersion = optExpectedVersion.orElse(-1L).intValue();

if (options.contains(CreateOption.Sequential)) {
path += Long.toString(sequentialIdGenerator.getAndIncrement());
}
if (options.contains(CreateOption.Sequential)) {
path += Long.toString(sequentialIdGenerator.getAndIncrement());
}

long now = System.currentTimeMillis();
long now = System.currentTimeMillis();

if (hasVersion && expectedVersion == -1) {
Value newValue = new Value(0, data, now, now, options.contains(CreateOption.Ephemeral));
Value existingValue = map.putIfAbsent(path, newValue);
if (existingValue != null) {
return FutureUtils.exception(new BadVersionException(""));
} else {
receivedNotification(new Notification(NotificationType.Created, path));
String parent = parent(path);
if (parent != null) {
receivedNotification(new Notification(NotificationType.ChildrenChanged, parent));
}
return FutureUtils.value(new Stat(path, 0, now, now, newValue.isEphemeral(), true));
}
} else {
Value existingValue = map.get(path);
long existingVersion = existingValue != null ? existingValue.version : -1;
if (hasVersion && expectedVersion != existingVersion) {
return FutureUtils.exception(new BadVersionException(""));
} else {
long newVersion = existingValue != null ? existingValue.version + 1 : 0;
long createdTimestamp = existingValue != null ? existingValue.createdTimestamp : now;
Value newValue = new Value(newVersion, data, createdTimestamp, now,
options.contains(CreateOption.Ephemeral));
map.put(path, newValue);

NotificationType type = existingValue == null ? NotificationType.Created : NotificationType.Modified;
receivedNotification(new Notification(type, path));
if (type == NotificationType.Created) {
if (hasVersion && expectedVersion == -1) {
Value newValue = new Value(0, data, now, now, options.contains(CreateOption.Ephemeral));
Value existingValue = map.putIfAbsent(path, newValue);
if (existingValue != null) {
return FutureUtils.exception(new BadVersionException(""));
} else {
receivedNotification(new Notification(NotificationType.Created, path));
String parent = parent(path);
if (parent != null) {
receivedNotification(new Notification(NotificationType.ChildrenChanged, parent));
}
return FutureUtils.value(new Stat(path, 0, now, now, newValue.isEphemeral(), true));
}
} else {
Value existingValue = map.get(path);
long existingVersion = existingValue != null ? existingValue.version : -1;
if (hasVersion && expectedVersion != existingVersion) {
return FutureUtils.exception(new BadVersionException(""));
} else {
long newVersion = existingValue != null ? existingValue.version + 1 : 0;
long createdTimestamp = existingValue != null ? existingValue.createdTimestamp : now;
Value newValue = new Value(newVersion, data, createdTimestamp, now,
options.contains(CreateOption.Ephemeral));
map.put(path, newValue);

NotificationType type =
existingValue == null ? NotificationType.Created : NotificationType.Modified;
receivedNotification(new Notification(type, path));
if (type == NotificationType.Created) {
String parent = parent(path);
if (parent != null) {
receivedNotification(new Notification(NotificationType.ChildrenChanged, parent));
}
}
return FutureUtils
.value(new Stat(path, newValue.version, newValue.createdTimestamp,
newValue.modifiedTimestamp,
false, true));
}
return FutureUtils
.value(new Stat(path, newValue.version, newValue.createdTimestamp, newValue.modifiedTimestamp, false, true));
}
}
}

@Override
public synchronized CompletableFuture<Void> storeDelete(String path, Optional<Long> optExpectedVersion) {
if (!isValidPath(path)) {
return FutureUtils.exception(new MetadataStoreException(""));
}
public CompletableFuture<Void> storeDelete(String path, Optional<Long> optExpectedVersion) {
synchronized (map) {
if (!isValidPath(path)) {
return FutureUtils.exception(new MetadataStoreException(""));
}

Value value = map.get(path);
if (value == null) {
return FutureUtils.exception(new NotFoundException(""));
} else if (optExpectedVersion.isPresent() && optExpectedVersion.get() != value.version) {
return FutureUtils.exception(new BadVersionException(""));
} else {
map.remove(path);
receivedNotification(new Notification(NotificationType.Deleted, path));
String parent = parent(path);
if (parent != null) {
receivedNotification(new Notification(NotificationType.ChildrenChanged, parent));
Value value = map.get(path);
if (value == null) {
return FutureUtils.exception(new NotFoundException(""));
} else if (optExpectedVersion.isPresent() && optExpectedVersion.get() != value.version) {
return FutureUtils.exception(new BadVersionException(""));
} else {
map.remove(path);
receivedNotification(new Notification(NotificationType.Deleted, path));
String parent = parent(path);
if (parent != null) {
receivedNotification(new Notification(NotificationType.ChildrenChanged, parent));
}
return FutureUtils.value(null);
}
return FutureUtils.value(null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.metadata;

import static org.testng.Assert.assertTrue;
import java.util.UUID;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;
import org.apache.pulsar.tests.TestRetrySupport;
Expand Down Expand Up @@ -52,7 +53,7 @@ public Object[][] implementations() {
// Supplier<String> lambda is used for providing the value.
return new Object[][] {
{ "ZooKeeper", stringSupplier(() -> zks.getConnectionString()) },
{ "Memory", stringSupplier(() -> "memory://local") },
{ "Memory", stringSupplier(() -> "memory://" + UUID.randomUUID()) },
};
}

Expand Down
Loading