Skip to content

Commit

Permalink
Allow to have different instances LocalMemoryMetadataStore that share…
Browse files Browse the repository at this point in the history
… the same state (#12390)
  • Loading branch information
merlimat authored Oct 25, 2021
1 parent 464a9cd commit 9f25843
Show file tree
Hide file tree
Showing 3 changed files with 227 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,23 @@
*/
package org.apache.pulsar.metadata.impl;

import com.google.common.collect.MapMaker;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import lombok.Data;

import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
Expand All @@ -41,6 +47,7 @@
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;

@Slf4j
public class LocalMemoryMetadataStore extends AbstractMetadataStore implements MetadataStoreExtended {

@Data
Expand All @@ -55,56 +62,85 @@ private static class Value {
private final NavigableMap<String, Value> map;
private final AtomicLong sequentialIdGenerator;

public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig) {
map = new TreeMap<>();
sequentialIdGenerator = new AtomicLong();
}

@Override
public synchronized CompletableFuture<Optional<GetResult>> get(String path) {
if (!isValidPath(path)) {
return FutureUtils.exception(new MetadataStoreException(""));
private static final Map<String, NavigableMap<String, Value>> STATIC_MAPS = new MapMaker()
.weakValues().makeMap();
private static final Map<String, AtomicLong> STATIC_ID_GEN_MAP = new MapMaker()
.weakValues().makeMap();

public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig)
throws MetadataStoreException {
URI uri;
try {
uri = new URI(metadataURL);
} catch (URISyntaxException e) {
throw new MetadataStoreException(e);
}

Value v = map.get(path);
if (v != null) {
return FutureUtils.value(
Optional.of(new GetResult(v.data, new Stat(path, v.version, v.createdTimestamp, v.modifiedTimestamp,
v.isEphemeral(), true))));
// Local means a private data set
if ("local".equals(uri.getHost())) {
map = new TreeMap<>();
sequentialIdGenerator = new AtomicLong();
} else {
return FutureUtils.value(Optional.empty());
// Use a reference from a shared data set
String name = uri.getHost();
map = STATIC_MAPS.computeIfAbsent(name, __ -> new TreeMap<>());
sequentialIdGenerator = STATIC_ID_GEN_MAP.computeIfAbsent(name, __ -> new AtomicLong());
log.info("Created LocalMemoryDataStore for '{}'", name);
}
}

@Override
public synchronized CompletableFuture<List<String>> getChildrenFromStore(String path) {
if (!isValidPath(path)) {
return FutureUtils.exception(new MetadataStoreException(""));
}
public CompletableFuture<Optional<GetResult>> get(String path) {
synchronized (map) {
if (!isValidPath(path)) {
return FutureUtils.exception(new MetadataStoreException(""));
}

String firstKey = path.equals("/") ? path : path + "/";
String lastKey = path.equals("/") ? "0" : path + "0"; // '0' is lexicographically just after '/'
Value v = map.get(path);
if (v != null) {
return FutureUtils.value(
Optional.of(
new GetResult(v.data, new Stat(path, v.version, v.createdTimestamp, v.modifiedTimestamp,
v.isEphemeral(), true))));
} else {
return FutureUtils.value(Optional.empty());
}
}
}

List<String> children = new ArrayList<>();
map.subMap(firstKey, false, lastKey, false).forEach((key, value) -> {
String relativePath = key.replace(firstKey, "");
if (!relativePath.contains("/")) {
// Only return first-level children
children.add(relativePath);
@Override
public CompletableFuture<List<String>> getChildrenFromStore(String path) {
synchronized (map) {
if (!isValidPath(path)) {
return FutureUtils.exception(new MetadataStoreException(""));
}
});

return FutureUtils.value(children);
String firstKey = path.equals("/") ? path : path + "/";
String lastKey = path.equals("/") ? "0" : path + "0"; // '0' is lexicographically just after '/'

List<String> children = new ArrayList<>();
map.subMap(firstKey, false, lastKey, false).forEach((key, value) -> {
String relativePath = key.replace(firstKey, "");
if (!relativePath.contains("/")) {
// Only return first-level children
children.add(relativePath);
}
});

return FutureUtils.value(children);
}
}

@Override
public synchronized CompletableFuture<Boolean> existsFromStore(String path) {
if (!isValidPath(path)) {
return FutureUtils.exception(new MetadataStoreException(""));
}
public CompletableFuture<Boolean> existsFromStore(String path) {
synchronized (map) {
if (!isValidPath(path)) {
return FutureUtils.exception(new MetadataStoreException(""));
}

Value v = map.get(path);
return FutureUtils.value(v != null ? true : false);
Value v = map.get(path);
return FutureUtils.value(v != null ? true : false);
}
}

@Override
Expand All @@ -113,79 +149,86 @@ public CompletableFuture<Stat> put(String path, byte[] value, Optional<Long> exp
}

@Override
public synchronized CompletableFuture<Stat> storePut(String path, byte[] data, Optional<Long> optExpectedVersion,
EnumSet<CreateOption> options) {
if (!isValidPath(path)) {
return FutureUtils.exception(new MetadataStoreException(""));
}
public CompletableFuture<Stat> storePut(String path, byte[] data, Optional<Long> optExpectedVersion,
EnumSet<CreateOption> options) {
synchronized (map) {
if (!isValidPath(path)) {
return FutureUtils.exception(new MetadataStoreException(""));
}

boolean hasVersion = optExpectedVersion.isPresent();
int expectedVersion = optExpectedVersion.orElse(-1L).intValue();
boolean hasVersion = optExpectedVersion.isPresent();
int expectedVersion = optExpectedVersion.orElse(-1L).intValue();

if (options.contains(CreateOption.Sequential)) {
path += Long.toString(sequentialIdGenerator.getAndIncrement());
}
if (options.contains(CreateOption.Sequential)) {
path += Long.toString(sequentialIdGenerator.getAndIncrement());
}

long now = System.currentTimeMillis();
long now = System.currentTimeMillis();

if (hasVersion && expectedVersion == -1) {
Value newValue = new Value(0, data, now, now, options.contains(CreateOption.Ephemeral));
Value existingValue = map.putIfAbsent(path, newValue);
if (existingValue != null) {
return FutureUtils.exception(new BadVersionException(""));
} else {
receivedNotification(new Notification(NotificationType.Created, path));
String parent = parent(path);
if (parent != null) {
receivedNotification(new Notification(NotificationType.ChildrenChanged, parent));
}
return FutureUtils.value(new Stat(path, 0, now, now, newValue.isEphemeral(), true));
}
} else {
Value existingValue = map.get(path);
long existingVersion = existingValue != null ? existingValue.version : -1;
if (hasVersion && expectedVersion != existingVersion) {
return FutureUtils.exception(new BadVersionException(""));
} else {
long newVersion = existingValue != null ? existingValue.version + 1 : 0;
long createdTimestamp = existingValue != null ? existingValue.createdTimestamp : now;
Value newValue = new Value(newVersion, data, createdTimestamp, now,
options.contains(CreateOption.Ephemeral));
map.put(path, newValue);

NotificationType type = existingValue == null ? NotificationType.Created : NotificationType.Modified;
receivedNotification(new Notification(type, path));
if (type == NotificationType.Created) {
if (hasVersion && expectedVersion == -1) {
Value newValue = new Value(0, data, now, now, options.contains(CreateOption.Ephemeral));
Value existingValue = map.putIfAbsent(path, newValue);
if (existingValue != null) {
return FutureUtils.exception(new BadVersionException(""));
} else {
receivedNotification(new Notification(NotificationType.Created, path));
String parent = parent(path);
if (parent != null) {
receivedNotification(new Notification(NotificationType.ChildrenChanged, parent));
}
return FutureUtils.value(new Stat(path, 0, now, now, newValue.isEphemeral(), true));
}
} else {
Value existingValue = map.get(path);
long existingVersion = existingValue != null ? existingValue.version : -1;
if (hasVersion && expectedVersion != existingVersion) {
return FutureUtils.exception(new BadVersionException(""));
} else {
long newVersion = existingValue != null ? existingValue.version + 1 : 0;
long createdTimestamp = existingValue != null ? existingValue.createdTimestamp : now;
Value newValue = new Value(newVersion, data, createdTimestamp, now,
options.contains(CreateOption.Ephemeral));
map.put(path, newValue);

NotificationType type =
existingValue == null ? NotificationType.Created : NotificationType.Modified;
receivedNotification(new Notification(type, path));
if (type == NotificationType.Created) {
String parent = parent(path);
if (parent != null) {
receivedNotification(new Notification(NotificationType.ChildrenChanged, parent));
}
}
return FutureUtils
.value(new Stat(path, newValue.version, newValue.createdTimestamp,
newValue.modifiedTimestamp,
false, true));
}
return FutureUtils
.value(new Stat(path, newValue.version, newValue.createdTimestamp, newValue.modifiedTimestamp, false, true));
}
}
}

@Override
public synchronized CompletableFuture<Void> storeDelete(String path, Optional<Long> optExpectedVersion) {
if (!isValidPath(path)) {
return FutureUtils.exception(new MetadataStoreException(""));
}
public CompletableFuture<Void> storeDelete(String path, Optional<Long> optExpectedVersion) {
synchronized (map) {
if (!isValidPath(path)) {
return FutureUtils.exception(new MetadataStoreException(""));
}

Value value = map.get(path);
if (value == null) {
return FutureUtils.exception(new NotFoundException(""));
} else if (optExpectedVersion.isPresent() && optExpectedVersion.get() != value.version) {
return FutureUtils.exception(new BadVersionException(""));
} else {
map.remove(path);
receivedNotification(new Notification(NotificationType.Deleted, path));
String parent = parent(path);
if (parent != null) {
receivedNotification(new Notification(NotificationType.ChildrenChanged, parent));
Value value = map.get(path);
if (value == null) {
return FutureUtils.exception(new NotFoundException(""));
} else if (optExpectedVersion.isPresent() && optExpectedVersion.get() != value.version) {
return FutureUtils.exception(new BadVersionException(""));
} else {
map.remove(path);
receivedNotification(new Notification(NotificationType.Deleted, path));
String parent = parent(path);
if (parent != null) {
receivedNotification(new Notification(NotificationType.ChildrenChanged, parent));
}
return FutureUtils.value(null);
}
return FutureUtils.value(null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.metadata;

import static org.testng.Assert.assertTrue;
import java.util.UUID;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;
import org.apache.pulsar.tests.TestRetrySupport;
Expand Down Expand Up @@ -52,7 +53,7 @@ public Object[][] implementations() {
// Supplier<String> lambda is used for providing the value.
return new Object[][] {
{ "ZooKeeper", stringSupplier(() -> zks.getConnectionString()) },
{ "Memory", stringSupplier(() -> "memory://local") },
{ "Memory", stringSupplier(() -> "memory://" + UUID.randomUUID()) },
};
}

Expand Down
Loading

0 comments on commit 9f25843

Please sign in to comment.