Skip to content

Commit

Permalink
allow caching of xodus blobs
Browse files Browse the repository at this point in the history
  • Loading branch information
thoniTUB committed Dec 16, 2024
1 parent 3a7f482 commit d674c77
Show file tree
Hide file tree
Showing 34 changed files with 545 additions and 279 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,10 @@ public class QueryProcessor {
private Validator validator;


public Stream<ExecutionStatus> getAllQueries(Dataset dataset, HttpServletRequest req, Subject subject, boolean allProviders) {
final Stream<ManagedExecution> allQueries = storage.getAllExecutions();

return getQueriesFiltered(dataset.getId(), RequestAwareUriBuilder.fromRequest(req), subject, allQueries, allProviders);
public List<ExecutionStatus> getAllQueries(Dataset dataset, HttpServletRequest req, Subject subject, boolean allProviders) {
try(Stream<ManagedExecution> allQueries = storage.getAllExecutions()) {
return getQueriesFiltered(dataset.getId(), RequestAwareUriBuilder.fromRequest(req), subject, allQueries, allProviders).toList();
}
}

public Stream<ExecutionStatus> getQueriesFiltered(DatasetId datasetId, UriBuilder uriBuilder, Subject subject, Stream<ManagedExecution> allQueries, boolean allProviders) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ public interface Store<KEY, VALUE> extends ManagedStore {
IterationStatistic forEach(StoreEntryConsumer<KEY, VALUE> consumer);

// TODO: 08.01.2020 fk: Is this still necessary? The implementation in XodusStore uses different methods that in our context don't act differently.
void update(KEY key, VALUE value);
boolean update(KEY key, VALUE value);

void remove(KEY key);
boolean remove(KEY key);


int count();
Expand All @@ -30,6 +30,8 @@ public interface Store<KEY, VALUE> extends ManagedStore {

void clear();

String getName();

/**
* Consumer of key-value pairs stored in this Store. Used in conjunction with for-each.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,23 +122,23 @@ public IterationStatistic forEach(StoreEntryConsumer<KEY, VALUE> consumer) {
}

@Override
public void update(KEY key, VALUE value) {
public boolean update(KEY key, VALUE value) {
remove(key);
add(key, value);
return add(key, value);
}

@Override
public void remove(KEY key) {
public boolean remove(KEY key) {
BigStoreMetaKeys meta = metaStore.get(key);

if (meta == null) {
return;
return false;
}

for (UUID id : meta.getParts()) {
dataStore.remove(id);
}
metaStore.remove(key);
return metaStore.remove(key);
}

@Override
Expand Down Expand Up @@ -219,6 +219,11 @@ public void clear() {
dataStore.clear();
}

@Override
public String getName() {
return storeInfo.getName();
}

@Override
public void removeStore() {
metaStore.removeStore();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ public synchronized boolean add(KEY key, VALUE value) {
}

@Override
public synchronized void update(KEY key, VALUE value) {
store.update(key, value);
public synchronized boolean update(KEY key, VALUE value) {
boolean update = store.update(key, value);
cache.put(key, value);
return update;
}

@Override
Expand All @@ -72,9 +73,10 @@ public IterationStatistic forEach(StoreEntryConsumer<KEY, VALUE> consumer) {
}

@Override
public void remove(KEY key) {
store.remove(key);
public boolean remove(KEY key) {
boolean remove = store.remove(key);
cache.invalidate(key);
return remove;
}

@Override
Expand Down Expand Up @@ -150,6 +152,11 @@ public void clear() {
cache.invalidateAll();
}

@Override
public String getName() {
return store.getName();
}

@Override
public void removeStore() {
store.removeStore();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public class SerializingStore<KEY, VALUE> implements Store<KEY, VALUE> {
/**
* The underlying store to write the values to.
*/
private final XodusStore store;
private final Store<ByteIterable, ByteIterable> store;

/**
*
Expand All @@ -122,7 +122,7 @@ public class SerializingStore<KEY, VALUE> implements Store<KEY, VALUE> {
private final ExecutorService executor;

public <CLASS_K extends Class<KEY>, CLASS_V extends Class<VALUE>> SerializingStore(
XodusStore store,
Store<ByteIterable, ByteIterable> store,
Validator validator,
ObjectMapper objectMapper,
CLASS_K keyType,
Expand Down Expand Up @@ -225,7 +225,7 @@ public VALUE get(KEY key) {
catch (Exception e) {

if (unreadableValuesDumpDir != null) {
dumpToFile(binValue.getBytesUnsafe(), key.toString(), e, unreadableValuesDumpDir, store.getName(), objectMapper);
dumpToFile(binValue.getBytesUnsafe(), key.toString(), e, unreadableValuesDumpDir, getName(), objectMapper);
}

if (removeUnreadablesFromUnderlyingStore) {
Expand Down Expand Up @@ -300,9 +300,9 @@ private static void dumpToFile(byte[] gzippedObj, @NonNull String keyOfDump, Exc
}

@Override
public void remove(KEY key) {
log.trace("Removing value to key {} from Store[{}]", key, store.getName());
store.remove(writeKey(key));
public boolean remove(KEY key) {
log.trace("Removing value to key {} from Store[{}]", key, getName());
return store.remove(writeKey(key));
}

/**
Expand Down Expand Up @@ -381,7 +381,7 @@ public IterationStatistic forEach(StoreEntryConsumer<KEY, VALUE> consumer) {
final Queue<ListenableFuture<ByteIterable>> jobs = new ConcurrentLinkedQueue<>();

// We read in single thread, and deserialize and dispatch in multiple threads.
store.forEach((k, v) -> jobs.add(executorService.submit(() -> handle(consumer, result, k, v))));
store.forEach((k, v, c) -> jobs.add(executorService.submit(() -> handle(consumer, result, k, v))));

final ListenableFuture<List<ByteIterable>> allJobs = Futures.allAsList(jobs);

Expand Down Expand Up @@ -496,7 +496,7 @@ private <TYPE> TYPE getDeserializedAndDumpFailed(ByteIterable serial, Function<B
}

@Override
public void update(KEY key, VALUE value) {
public boolean update(KEY key, VALUE value) {
if (!valueType.isInstance(value)) {
throw new IllegalStateException("The element %s is not of the required type %s".formatted(value, valueType));
}
Expand All @@ -505,7 +505,7 @@ public void update(KEY key, VALUE value) {
ValidatorHelper.failOnError(log, validator.validate(value));
}

store.update(writeKey(key), writeValue(value));
return store.update(writeKey(key), writeValue(value));
}

@Override
Expand All @@ -519,12 +519,12 @@ public int count() {

@Override
public Stream<VALUE> getAll() {
return store.getAllKeys().stream().map(store::get).map(this::readValue);
return store.getAllKeys().map(store::get).map(this::readValue);
}

@Override
public Stream<KEY> getAllKeys() {
return store.getAllKeys().stream().map(this::readKey);
return store.getAllKeys().map(this::readKey);
}

/**
Expand All @@ -539,13 +539,18 @@ public void clear() {
store.clear();
}

@Override
public String getName() {
return store.getName();
}

@Override
public void removeStore() {
store.deleteStore();
store.removeStore();
}

@Override
public void close() {
public void close() throws IOException {
store.close();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
package com.bakdata.conquery.io.storage.xodus.stores;

import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import com.google.common.primitives.Ints;
import jetbrains.exodus.ByteIterable;
import jetbrains.exodus.env.Cursor;
import jetbrains.exodus.env.Environment;
import jetbrains.exodus.env.Store;
import jetbrains.exodus.env.StoreConfig;
import jetbrains.exodus.env.Transaction;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class XodusStore {
public class XodusStore implements com.bakdata.conquery.io.storage.Store<ByteIterable, ByteIterable> {
private final Store store;
@Getter
private final Environment environment;
Expand Down Expand Up @@ -53,7 +55,8 @@ public ByteIterable get(ByteIterable key) {
*
* @param consumer function called for-each key-value pair.
*/
public void forEach(BiConsumer<ByteIterable, ByteIterable> consumer) {
@Override
public SerializingStore.IterationStatistic forEach(StoreEntryConsumer<ByteIterable, ByteIterable> consumer) {
AtomicReference<ByteIterable> lastKey = new AtomicReference<>();
AtomicBoolean done = new AtomicBoolean(false);
while (!done.get()) {
Expand All @@ -72,23 +75,19 @@ public void forEach(BiConsumer<ByteIterable, ByteIterable> consumer) {
return;
}
lastKey.set(c.getKey());
consumer.accept(lastKey.get(), c.getValue());
ByteIterable value = c.getValue();
consumer.accept(lastKey.get(), value, value.getLength());
}
}
});
}

return null;
}

public List<ByteIterable> getAllKeys() {
return environment.computeInReadonlyTransaction(txn -> {
List<ByteIterable> keys = new ArrayList<>();
try (Cursor c = store.openCursor(txn)) {
while (c.getNext()) {
keys.add(c.getKey());
}
return keys;
}
});
public Stream<ByteIterable> getAllKeys() {
XodusIterator spliterator = new XodusIterator(environment, store);
return StreamSupport.stream(spliterator, false).onClose(spliterator::onClose);
}

public boolean update(ByteIterable key, ByteIterable value) {
Expand All @@ -103,6 +102,11 @@ public int count() {
return Ints.checkedCast(environment.computeInReadonlyTransaction(store::count));
}

@Override
public Stream<ByteIterable> getAll() {
return Stream.empty();
}


public void clear() {
environment.executeInExclusiveTransaction(t -> {
Expand All @@ -114,12 +118,17 @@ public void clear() {
});
}

public void deleteStore() {
public void removeStore() {
log.debug("Deleting store {} from environment {}", store.getName(), environment.getLocation());
environment.executeInTransaction(t -> environment.removeStore(store.getName(),t));
storeRemoveHook.accept(this);
}

@Override
public void loadData() {

}

public void close() {
if (!environment.isOpen()) {
log.trace("While closing store: Environment is already closed for {}", this);
Expand All @@ -132,4 +141,35 @@ public void close() {
public String toString() {
return "XodusStore[" + environment.getLocation() + ":" +store.getName() +"}";
}

private static class XodusIterator extends Spliterators.AbstractSpliterator<ByteIterable> {

private Transaction transaction;
private Cursor cursor;

protected XodusIterator(Environment environment, Store store) {
super(Long.MAX_VALUE, Spliterator.ORDERED);

transaction = environment.beginReadonlyTransaction();
cursor = store.openCursor(transaction);
}

@Override
public boolean tryAdvance(Consumer<? super ByteIterable> action) {
if (cursor.getNext()) {
action.accept(cursor.getKey());
return true;
}
else {
cursor.close();
return false;
}
}

public void onClose() {
if (!transaction.isFinished()) {
transaction.abort();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ private static void handleImport(Namespace namespace, InputStream inputStream, b

readAndDistributeImport(((DistributedNamespace) namespace), table, header, parser, datasetRegistry);

clearDependentConcepts(namespace.getStorage().getAllConcepts(), table);
try(Stream<Concept<?>> allConcepts = namespace.getStorage().getAllConcepts();) {
clearDependentConcepts(allConcepts, table);
}
}
}

Expand Down Expand Up @@ -183,7 +185,9 @@ public void deleteImport(Import imp) {
final DatasetId id = imp.getTable().getDataset();
final DistributedNamespace namespace = datasetRegistry.get(id);

clearDependentConcepts(namespace.getStorage().getAllConcepts(), imp.getTable().resolve());
try(Stream<Concept<?>> allConcepts = namespace.getStorage().getAllConcepts()) {
clearDependentConcepts(allConcepts, imp.getTable().resolve());
}

namespace.getStorage().removeImport(imp.getId());
namespace.getWorkerHandler().sendToAll(new RemoveImportJob(imp.getId()));
Expand Down
Loading

0 comments on commit d674c77

Please sign in to comment.