Skip to content

Commit

Permalink
refactor xodus out of serializing store
Browse files Browse the repository at this point in the history
  • Loading branch information
thoniTUB committed May 5, 2022
1 parent 04555c1 commit 61ba8d0
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 61 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.bakdata.conquery.io.storage;

import java.util.function.BiConsumer;

/**
 * Byte-level key-value store contract. Keys and values are exchanged as plain {@code byte[]},
 * so callers do not depend on the types of a particular storage backend.
 */
public interface RawStore {

    // ---- identity / size ----

    /**
     * @return the name of this store.
     */
    String getName();

    /**
     * @return an entry count for this store.
     *         NOTE(review): presumably the number of key-value pairs currently held; confirm against the implementation.
     */
    int count();

    // ---- reading ----

    /**
     * Looks up the value bytes stored for a key.
     *
     * @param writeKey the serialized key.
     * @return the serialized value.
     *         NOTE(review): callers appear to treat {@code null} as "no value"; confirm that implementations return {@code null} for unknown keys.
     */
    byte[] get(byte[] writeKey);

    /**
     * Hands key-value pairs of this store to the given consumer.
     *
     * @param consumer receives the serialized key as first and the serialized value as second argument.
     */
    void forEach(BiConsumer<byte[], byte[]> consumer);

    // ---- writing ----

    /**
     * Adds a key-value pair.
     *
     * @param writeKey   the serialized key.
     * @param writeValue the serialized value.
     * @return success flag of the backend.
     *         NOTE(review): presumably {@code false} when the key already exists; verify against the implementation.
     */
    boolean add(byte[] writeKey, byte[] writeValue);

    /**
     * Writes a key-value pair.
     *
     * @param writeKey   the serialized key.
     * @param writeValue the serialized value.
     * @return success flag of the backend.
     *         NOTE(review): presumably overwrites an existing value, unlike {@link #add(byte[], byte[])}; verify against the implementation.
     */
    boolean update(byte[] writeKey, byte[] writeValue);

    /**
     * Removes the entry for a key.
     *
     * @param bytes the serialized key.
     * @return success flag of the backend.
     *         NOTE(review): presumably {@code true} only if an entry was deleted; verify against the implementation.
     */
    boolean remove(byte[] bytes);

    // ---- lifecycle ----

    /**
     * NOTE(review): presumably drops all entries but keeps the store usable; confirm against the implementation.
     */
    void clear();

    /**
     * NOTE(review): presumably deletes the store itself from the backend; confirm against the implementation.
     */
    void deleteStore();

    /**
     * NOTE(review): presumably releases the backend resources held by this store; confirm against the implementation.
     */
    void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import com.bakdata.conquery.io.jackson.Jackson;
import com.bakdata.conquery.io.jackson.JacksonUtil;
import com.bakdata.conquery.io.storage.RawStore;
import com.bakdata.conquery.io.storage.Store;
import com.bakdata.conquery.models.config.XodusStoreFactory;
import com.bakdata.conquery.models.exceptions.ValidatorHelper;
Expand All @@ -23,8 +24,6 @@
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.base.Throwables;
import jetbrains.exodus.ArrayByteIterable;
import jetbrains.exodus.ByteIterable;
import lombok.Data;
import lombok.NonNull;
import lombok.ToString;
Expand Down Expand Up @@ -62,7 +61,7 @@ public class SerializingStore<KEY, VALUE> implements Store<KEY, VALUE> {
/**
* Deserializer for values
*/
private ObjectReader valueReader;
private final ObjectReader valueReader;

/**
* Optional validator used for serialization.
Expand All @@ -72,7 +71,7 @@ public class SerializingStore<KEY, VALUE> implements Store<KEY, VALUE> {
/**
* The underlying store to write the values to.
*/
private final XodusStore store;
private final RawStore store;

/**
*
Expand All @@ -94,8 +93,7 @@ public class SerializingStore<KEY, VALUE> implements Store<KEY, VALUE> {

private final ObjectMapper objectMapper;

@SuppressWarnings("unchecked")
public <CLASS_K extends Class<KEY>, CLASS_V extends Class<VALUE>> SerializingStore(XodusStore store,
public <CLASS_K extends Class<KEY>, CLASS_V extends Class<VALUE>> SerializingStore(RawStore store,
Validator validator,
ObjectMapper objectMapper,
CLASS_K keyType,
Expand Down Expand Up @@ -146,7 +144,7 @@ public void add(KEY key, VALUE value) {

@Override
public VALUE get(KEY key) {
ByteIterable binValue = store.get(writeKey(key));
byte[] binValue = store.get(writeKey(key));
try {
return readValue(binValue);
} catch (Exception e) {
Expand All @@ -171,17 +169,17 @@ public VALUE get(KEY key) {
@Override
public IterationStatistic forEach(StoreEntryConsumer<KEY, VALUE> consumer) {
IterationStatistic result = new IterationStatistic();
ArrayList<ByteIterable> unreadables = new ArrayList<>();
ArrayList<byte[]> unreadables = new ArrayList<>();
store.forEach((k, v) -> {
result.incrTotalProcessed();

// Try to read the key first
KEY key = getDeserializedAndDumpFailed(
k,
this::readKey,
() -> new String(k.getBytesUnsafe()),
v,
"Could not parse key [{}]");
k,
this::readKey,
() -> new String(k),
v,
"Could not parse key [{}]");
if (key == null) {
unreadables.add(k);
result.incrFailedKeys();
Expand All @@ -203,7 +201,7 @@ public IterationStatistic forEach(StoreEntryConsumer<KEY, VALUE> consumer) {

// Apply the consumer to key and value
try {
consumer.accept(key, value, v.getLength());
consumer.accept(key, value, v.length);
}
catch (Exception e) {
log.warn("Unable to apply for-each consumer on key[{}]", key, e);
Expand All @@ -229,26 +227,28 @@ public IterationStatistic forEach(StoreEntryConsumer<KEY, VALUE> consumer) {
}
return result;
}

/**
* Deserializes the given serial value (either a key or a value of a store entry) to a concrete object. If that fails and dumping is configured, the entry value is dumped to a file that uses the entry key in its filename.
* @param <TYPE> The deserialized object type.
* @param serial The to be deserialized object (key or value of an entry)
* @param deserializer The concrete deserializer to use.
*
* @param <TYPE> The deserialized object type.
* @param serial The to be deserialized object (key or value of an entry)
* @param deserializer The concrete deserializer to use.
* @param onFailKeyStringSupplier When deserialization failed and dumping is enabled, this is used in the dump file name.
* @param onFailOrigValue Will be the dumpfile content rendered as a json.
* @param onFailWarnMsgFmt The warn message that will be logged on failure.
* @param onFailOrigValue Will be the dumpfile content rendered as a json.
* @param onFailWarnMsgFmt The warn message that will be logged on failure.
* @return The deserialized value
*/
private <TYPE> TYPE getDeserializedAndDumpFailed(ByteIterable serial, Function<ByteIterable, TYPE> deserializer, Supplier<String> onFailKeyStringSupplier, ByteIterable onFailOrigValue, String onFailWarnMsgFmt ){
private <TYPE> TYPE getDeserializedAndDumpFailed(byte[] serial, Function<byte[], TYPE> deserializer, Supplier<String> onFailKeyStringSupplier, byte[] onFailOrigValue, String onFailWarnMsgFmt) {
try {
return deserializer.apply(serial);
} catch (Exception e) {
if(unreadableValuesDumpDir != null) {
return deserializer.apply(serial);
}
catch (Exception e) {
if (unreadableValuesDumpDir != null) {
dumpToFile(onFailOrigValue, onFailKeyStringSupplier.get(), unreadableValuesDumpDir, store.getName(), objectMapper);
}
// With trace also print the stacktrace
log.warn(onFailWarnMsgFmt, onFailKeyStringSupplier.get(), (Throwable) (log.isTraceEnabled()? e : null));
log.warn(onFailWarnMsgFmt, onFailKeyStringSupplier.get(), (Throwable) (log.isTraceEnabled() ? e : null));
}
return null;
}
Expand All @@ -275,79 +275,82 @@ public void remove(KEY key) {
/**
* Serialize value with {@code valueWriter}.
*/
private ByteIterable writeValue(VALUE value) {
private byte[] writeValue(VALUE value) {
return write(value, valueWriter);
}

/**
* Serialize key with {@code keyWriter}.
*/
private ByteIterable writeKey(KEY key) {
private byte[] writeKey(KEY key) {
return write(key, keyWriter);
}

/**
* Deserialize value with {@code valueReader}.
*/
private VALUE readValue(ByteIterable value) {
private VALUE readValue(byte[] value) {
return read(valueReader, value);
}

/**
* Deserialize key with {@code keyReader}.
*/
private KEY readKey(ByteIterable key) {
private KEY readKey(byte[] key) {
return read(keyReader, key);
}

/**
* Try writing object with writer.
*/
private ByteIterable write(Object obj, ObjectWriter writer) {
private byte[] write(Object obj, ObjectWriter writer) {
try {
byte[] bytes = writer.writeValueAsBytes(obj);
if (log.isTraceEnabled()) {
String json = JacksonUtil.toJsonDebug(bytes);
log.trace("Written Messagepack ({}): {}", valueType.getName(), json);
}
return new ArrayByteIterable(bytes);
} catch (JsonProcessingException e) {
return bytes;
}
catch (JsonProcessingException e) {
throw new RuntimeException("Failed to write " + obj, e);
}
}

/**
* Try read value with reader.
*/
private <T> T read(ObjectReader reader, ByteIterable obj) {
private <T> T read(ObjectReader reader, byte[] obj) {
if (obj == null) {
return null;
}
try {
return reader.readValue(obj.getBytesUnsafe(), 0, obj.getLength());
} catch (IOException e) {
throw new RuntimeException("Failed to read " + JacksonUtil.toJsonDebug(obj.getBytesUnsafe()), e);
return reader.readValue(obj, 0, obj.length);
}
catch (IOException e) {
throw new RuntimeException("Failed to read " + JacksonUtil.toJsonDebug(obj), e);
}
}

/**
* Dumps the content of an unreadable value to a file as a json (it tries to parse it as an object and then tries to dump it as a json).
* @param obj The object to dump.
* @param keyOfDump The key under which the unreadable value is accessible. It is used for the file name.
*
* @param obj The object to dump.
* @param keyOfDump The key under which the unreadable value is accessible. It is used for the file name.
* @param unreadableDumpDir The directory to dump to. The method assumes that the directory exists and is okay to write to.
* @param storeName The name of the store which is also used in the dump file name.
* @param storeName The name of the store which is also used in the dump file name.
*/
private static void dumpToFile(@NonNull ByteIterable obj, @NonNull String keyOfDump, @NonNull File unreadableDumpDir, @NonNull String storeName, ObjectMapper objectMapper) {
private static void dumpToFile(byte[] obj, @NonNull String keyOfDump, @NonNull File unreadableDumpDir, @NonNull String storeName, ObjectMapper objectMapper) {
// Create dump filehandle
File dumpfile = new File(unreadableDumpDir, makeDumpfileName(keyOfDump, storeName));
if(dumpfile.exists()) {
log.warn("Abort dumping of file {} because it already exists.",dumpfile);
if (dumpfile.exists()) {
log.warn("Abort dumping of file {} because it already exists.", dumpfile);
return;
}
// Write dump
try {
log.info("Dumping value of key {} to {} (because it cannot be deserialized anymore).", keyOfDump, dumpfile.getCanonicalPath());
JsonNode dump = objectMapper.readerFor(JsonNode.class).readValue(obj.getBytesUnsafe(), 0, obj.getLength());
JsonNode dump = objectMapper.readerFor(JsonNode.class).readValue(obj, 0, obj.length);
Jackson.MAPPER.writer().writeValue(dumpfile, dump);
}
catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import com.bakdata.conquery.io.storage.RawStore;
import com.google.common.primitives.Ints;
import jetbrains.exodus.ArrayByteIterable;
import jetbrains.exodus.ByteIterable;
import jetbrains.exodus.env.Cursor;
import jetbrains.exodus.env.Environment;
Expand All @@ -15,7 +17,7 @@
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class XodusStore {
public class XodusStore implements RawStore {
private final Store store;
@Getter
private final Environment environment;
Expand All @@ -27,40 +29,41 @@ public class XodusStore {

public XodusStore(Environment env, String name, Consumer<XodusStore> storeCloseHook, Consumer<XodusStore> storeRemoveHook) {
// Arbitrary duration that is strictly shorter than the timeout to not get interrupted by StuckTxMonitor
this.timeoutHalfMillis = env.getEnvironmentConfig().getEnvMonitorTxnsTimeout()/2;
this.timeoutHalfMillis = env.getEnvironmentConfig().getEnvMonitorTxnsTimeout() / 2;
this.name = name;
this.environment = env;
this.storeCloseHook = storeCloseHook;
this.storeRemoveHook = storeRemoveHook;
this.store = env.computeInTransaction(
t->env.openStore(this.name, StoreConfig.WITHOUT_DUPLICATES_WITH_PREFIXING, t)
t -> env.openStore(this.name, StoreConfig.WITHOUT_DUPLICATES_WITH_PREFIXING, t)
);
}
public boolean add(ByteIterable key, ByteIterable value) {
return environment.computeInTransaction(t -> store.add(t, key, value));

/**
 * Wraps key and value as {@link ArrayByteIterable}s and passes them to {@code store.add} inside a transaction of the environment.
 *
 * @param key   the raw key bytes.
 * @param value the raw value bytes.
 * @return the result of {@code store.add}.
 *         NOTE(review): presumably {@code false} if the key was already present; confirm with the Xodus documentation.
 */
public boolean add(byte[] key, byte[] value) {
    return environment.computeInTransaction(txn -> {
        final ByteIterable wrappedKey = new ArrayByteIterable(key);
        final ByteIterable wrappedValue = new ArrayByteIterable(value);
        return store.add(txn, wrappedKey, wrappedValue);
    });
}

public ByteIterable get(ByteIterable key) {
return environment.computeInReadonlyTransaction(t -> store.get(t, key));
/**
 * Looks up the value stored under {@code key} in a read-only transaction.
 *
 * @param key the raw key bytes; wrapped in an {@link ArrayByteIterable} for the lookup.
 * @return the value bytes with exactly the length reported by the stored value,
 *         or {@code null} if the underlying store has no entry for {@code key}.
 */
public byte[] get(byte[] key) {
    final ByteIterable value = environment.computeInReadonlyTransaction(t -> store.get(t, new ArrayByteIterable(key)));

    // Xodus' Store.get is documented to return null for a missing key.
    // Dereferencing the result unconditionally would throw a NullPointerException here.
    if (value == null) {
        return null;
    }

    final byte[] raw = value.getBytesUnsafe();
    final int length = value.getLength();

    // getBytesUnsafe() may expose a backing array that is longer than the logical content.
    // Callers only see a byte[] and use its length, so trim when the two differ.
    if (raw.length == length) {
        return raw;
    }

    final byte[] exact = new byte[length];
    System.arraycopy(raw, 0, exact, 0, length);
    return exact;
}

/**
* Iterate over all key-value pairs in a consistent manner.
* The transaction is read only!
*
* @param consumer function called for-each key-value pair.
*/
public void forEach(BiConsumer<ByteIterable, ByteIterable> consumer) {
public void forEach(BiConsumer<byte[], byte[]> consumer) {
AtomicReference<ByteIterable> lastKey = new AtomicReference<>();
AtomicBoolean done = new AtomicBoolean(false);
while(!done.get()) {
while (!done.get()) {
environment.executeInReadonlyTransaction(t -> {
try(Cursor c = store.openCursor(t)) {
try (Cursor c = store.openCursor(t)) {
//try to load everything in the same transaction
//but keep within half of the timeout time
long start = System.currentTimeMillis();
//search where we left of
if(lastKey.get() != null) {
if (lastKey.get() != null) {
c.getSearchKey(lastKey.get());
}
while(System.currentTimeMillis()-start < timeoutHalfMillis) {
Expand All @@ -69,19 +72,19 @@ public void forEach(BiConsumer<ByteIterable, ByteIterable> consumer) {
return;
}
lastKey.set(c.getKey());
consumer.accept(lastKey.get(), c.getValue());
consumer.accept(lastKey.get().getBytesUnsafe(), c.getValue().getBytesUnsafe());
}
}
});
}
}

public boolean update(ByteIterable key, ByteIterable value) {
return environment.computeInTransaction(t -> store.put(t, key, value));
/**
 * Wraps key and value as {@link ArrayByteIterable}s and passes them to {@code store.put} inside a transaction of the environment.
 *
 * @param key   the raw key bytes.
 * @param value the raw value bytes.
 * @return the result of {@code store.put}.
 *         NOTE(review): presumably signals whether the stored data changed; confirm with the Xodus documentation.
 */
public boolean update(byte[] key, byte[] value) {
    return environment.computeInTransaction(txn -> {
        final ByteIterable wrappedKey = new ArrayByteIterable(key);
        final ByteIterable wrappedValue = new ArrayByteIterable(value);
        return store.put(txn, wrappedKey, wrappedValue);
    });
}
public boolean remove(ByteIterable key) {
return environment.computeInTransaction(t -> store.delete(t, key));

/**
 * Wraps the key as an {@link ArrayByteIterable} and passes it to {@code store.delete} inside a transaction of the environment.
 *
 * @param key the raw key bytes.
 * @return the result of {@code store.delete}.
 *         NOTE(review): presumably {@code true} only if an entry was deleted; confirm with the Xodus documentation.
 */
public boolean remove(byte[] key) {
    return environment.computeInTransaction(txn -> {
        final ByteIterable wrappedKey = new ArrayByteIterable(key);
        return store.delete(txn, wrappedKey);
    });
}

public int count() {
Expand Down

0 comments on commit 61ba8d0

Please sign in to comment.