From 61ba8d0be02c136b31f826e8795a3076cdfb03e3 Mon Sep 17 00:00:00 2001 From: Max Thonagel <12283268+thoniTUB@users.noreply.github.com> Date: Thu, 5 May 2022 12:44:20 +0200 Subject: [PATCH] refactor xodus out of serializing store --- .../bakdata/conquery/io/storage/RawStore.java | 25 ++++++ .../xodus/stores/SerializingStore.java | 89 ++++++++++--------- .../io/storage/xodus/stores/XodusStore.java | 39 ++++---- 3 files changed, 92 insertions(+), 61 deletions(-) create mode 100644 backend/src/main/java/com/bakdata/conquery/io/storage/RawStore.java diff --git a/backend/src/main/java/com/bakdata/conquery/io/storage/RawStore.java b/backend/src/main/java/com/bakdata/conquery/io/storage/RawStore.java new file mode 100644 index 0000000000..d3cc73a911 --- /dev/null +++ b/backend/src/main/java/com/bakdata/conquery/io/storage/RawStore.java @@ -0,0 +1,25 @@ +package com.bakdata.conquery.io.storage; + +import java.util.function.BiConsumer; + +public interface RawStore { + boolean add(byte[] writeKey, byte[] writeValue); + + byte[] get(byte[] writeKey); + + String getName(); + + void forEach(BiConsumer consumer); + + boolean remove(byte[] bytes); + + boolean update(byte[] writeKey, byte[] writeValue); + + void clear(); + + void deleteStore(); + + void close(); + + int count(); +} diff --git a/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/SerializingStore.java b/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/SerializingStore.java index 0a8f7295ce..fabf425257 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/SerializingStore.java +++ b/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/SerializingStore.java @@ -13,6 +13,7 @@ import com.bakdata.conquery.io.jackson.Jackson; import com.bakdata.conquery.io.jackson.JacksonUtil; +import com.bakdata.conquery.io.storage.RawStore; import com.bakdata.conquery.io.storage.Store; import com.bakdata.conquery.models.config.XodusStoreFactory; import com.bakdata.conquery.models.exceptions.ValidatorHelper; @@ -23,8 +24,6 @@ import com.fasterxml.jackson.databind.ObjectReader; import com.fasterxml.jackson.databind.ObjectWriter; import com.google.common.base.Throwables; -import jetbrains.exodus.ArrayByteIterable; -import jetbrains.exodus.ByteIterable; import lombok.Data; import lombok.NonNull; import lombok.ToString; @@ -62,7 +61,7 @@ public class SerializingStore implements Store { /** * Deserializer for values */ - private ObjectReader valueReader; + private final ObjectReader valueReader; /** * Optional validator used for serialization. @@ -72,7 +71,7 @@ public class SerializingStore implements Store { /** * The underlying store to write the values to. */ - private final XodusStore store; + private final RawStore store; /** * @@ -94,8 +93,7 @@ public class SerializingStore implements Store { private final ObjectMapper objectMapper; - @SuppressWarnings("unchecked") - public , CLASS_V extends Class> SerializingStore(XodusStore store, + public , CLASS_V extends Class> SerializingStore(RawStore store, Validator validator, ObjectMapper objectMapper, CLASS_K keyType, @@ -146,7 +144,7 @@ public void add(KEY key, VALUE value) { @Override public VALUE get(KEY key) { - ByteIterable binValue = store.get(writeKey(key)); + byte[] binValue = store.get(writeKey(key)); try { return readValue(binValue); } catch (Exception e) { @@ -171,17 +169,17 @@ public VALUE get(KEY key) { @Override public IterationStatistic forEach(StoreEntryConsumer consumer) { IterationStatistic result = new IterationStatistic(); - ArrayList unreadables = new ArrayList<>(); + ArrayList unreadables = new ArrayList<>(); store.forEach((k, v) -> { result.incrTotalProcessed(); // Try to read the key first KEY key = getDeserializedAndDumpFailed( - k, - this::readKey, - () -> new String(k.getBytesUnsafe()), - v, - "Could not parse key [{}]"); + k, + this::readKey, + () -> new String(k), + v, + "Could not parse key [{}]"); if (key == null) { unreadables.add(k); result.incrFailedKeys(); @@ -203,7 +201,7 @@ public IterationStatistic forEach(StoreEntryConsumer consumer) { // Apply the consumer to key and value try { - consumer.accept(key, value, v.getLength()); + consumer.accept(key, value, v.length); } catch (Exception e) { log.warn("Unable to apply for-each consumer on key[{}]", key, e); @@ -229,26 +227,28 @@ public IterationStatistic forEach(StoreEntryConsumer consumer) { } return result; } - + /** * Deserializes the gives serial value (either a key or a value of an store entry) to a concrete object. If that fails the entry-value is dumped if configured so to a file using the entry-key for the filename. - * @param The deserialized object type. - * @param serial The to be deserialized object (key or value of an entry) - * @param deserializer The concrete deserializer to use. + * + * @param The deserialized object type. + * @param serial The to be deserialized object (key or value of an entry) + * @param deserializer The concrete deserializer to use. * @param onFailKeyStringSupplier When deserilization failed and dump is enabled this is used in the dump file name. - * @param onFailOrigValue Will be the dumpfile content rendered as a json. - * @param onFailWarnMsgFmt The warn message that will be logged on failure. + * @param onFailOrigValue Will be the dumpfile content rendered as a json. + * @param onFailWarnMsgFmt The warn message that will be logged on failure. * @return The deserialized value */ - private TYPE getDeserializedAndDumpFailed(ByteIterable serial, Function deserializer, Supplier onFailKeyStringSupplier, ByteIterable onFailOrigValue, String onFailWarnMsgFmt ){ + private TYPE getDeserializedAndDumpFailed(byte[] serial, Function deserializer, Supplier onFailKeyStringSupplier, byte[] onFailOrigValue, String onFailWarnMsgFmt) { try { - return deserializer.apply(serial); - } catch (Exception e) { - if(unreadableValuesDumpDir != null) { + return deserializer.apply(serial); + } + catch (Exception e) { + if (unreadableValuesDumpDir != null) { dumpToFile(onFailOrigValue, onFailKeyStringSupplier.get(), unreadableValuesDumpDir, store.getName(), objectMapper); } // With trace also print the stacktrace - log.warn(onFailWarnMsgFmt, onFailKeyStringSupplier.get(), (Throwable) (log.isTraceEnabled()? e : null)); + log.warn(onFailWarnMsgFmt, onFailKeyStringSupplier.get(), (Throwable) (log.isTraceEnabled() ? e : null)); } return null; } @@ -275,43 +275,44 @@ public void remove(KEY key) { /** * Serialize value with {@code valueWriter}. */ - private ByteIterable writeValue(VALUE value) { + private byte[] writeValue(VALUE value) { return write(value, valueWriter); } /** * Serialize key with {@code keyWriter}. */ - private ByteIterable writeKey(KEY key) { + private byte[] writeKey(KEY key) { return write(key, keyWriter); } /** * Deserialize value with {@code valueReader}. */ - private VALUE readValue(ByteIterable value) { + private VALUE readValue(byte[] value) { return read(valueReader, value); } /** * Deserialize value with {@code keyReader}. */ - private KEY readKey(ByteIterable key) { + private KEY readKey(byte[] key) { return read(keyReader, key); } /** * Try writing object with writer. */ - private ByteIterable write(Object obj, ObjectWriter writer) { + private byte[] write(Object obj, ObjectWriter writer) { try { byte[] bytes = writer.writeValueAsBytes(obj); if (log.isTraceEnabled()) { String json = JacksonUtil.toJsonDebug(bytes); log.trace("Written Messagepack ({}): {}", valueType.getName(), json); } - return new ArrayByteIterable(bytes); - } catch (JsonProcessingException e) { + return bytes; + } + catch (JsonProcessingException e) { throw new RuntimeException("Failed to write " + obj, e); } } @@ -319,35 +320,37 @@ private ByteIterable write(Object obj, ObjectWriter writer) { /** * Try read value with reader. */ - private T read(ObjectReader reader, ByteIterable obj) { + private T read(ObjectReader reader, byte[] obj) { if (obj == null) { return null; } try { - return reader.readValue(obj.getBytesUnsafe(), 0, obj.getLength()); - } catch (IOException e) { - throw new RuntimeException("Failed to read " + JacksonUtil.toJsonDebug(obj.getBytesUnsafe()), e); + return reader.readValue(obj, 0, obj.length); + } + catch (IOException e) { + throw new RuntimeException("Failed to read " + JacksonUtil.toJsonDebug(obj), e); } } /** * Dumps the content of an unreadable value to a file as a json (it tries to parse it as an object and than tries to dump it as a json). - * @param obj The object to dump. - * @param keyOfDump The key under which the unreadable value is accessible. It is used for the file name. + * + * @param obj The object to dump. + * @param keyOfDump The key under which the unreadable value is accessible. It is used for the file name. * @param unreadableDumpDir The director to dump to. The method assumes that the directory exists and is okay to write to. - * @param storeName The name of the store which is also used in the dump file name. + * @param storeName The name of the store which is also used in the dump file name. */ - private static void dumpToFile(@NonNull ByteIterable obj, @NonNull String keyOfDump, @NonNull File unreadableDumpDir, @NonNull String storeName, ObjectMapper objectMapper) { + private static void dumpToFile(byte[] obj, @NonNull String keyOfDump, @NonNull File unreadableDumpDir, @NonNull String storeName, ObjectMapper objectMapper) { // Create dump filehandle File dumpfile = new File(unreadableDumpDir, makeDumpfileName(keyOfDump, storeName)); - if(dumpfile.exists()) { - log.warn("Abort dumping of file {} because it already exists.",dumpfile); + if (dumpfile.exists()) { + log.warn("Abort dumping of file {} because it already exists.", dumpfile); return; } // Write dump try { log.info("Dumping value of key {} to {} (because it cannot be deserialized anymore).", keyOfDump, dumpfile.getCanonicalPath()); - JsonNode dump = objectMapper.readerFor(JsonNode.class).readValue(obj.getBytesUnsafe(), 0, obj.getLength()); + JsonNode dump = objectMapper.readerFor(JsonNode.class).readValue(obj, 0, obj.length); Jackson.MAPPER.writer().writeValue(dumpfile, dump); } catch (IOException e) { diff --git a/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/XodusStore.java b/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/XodusStore.java index e75a3ea74d..1914e87724 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/XodusStore.java +++ b/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/XodusStore.java @@ -5,7 +5,9 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; +import com.bakdata.conquery.io.storage.RawStore; import com.google.common.primitives.Ints; +import jetbrains.exodus.ArrayByteIterable; import jetbrains.exodus.ByteIterable; import jetbrains.exodus.env.Cursor; import jetbrains.exodus.env.Environment; @@ -15,7 +17,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j -public class XodusStore { +public class XodusStore implements RawStore { private final Store store; @Getter private final Environment environment; @@ -27,40 +29,41 @@ public class XodusStore { public XodusStore(Environment env, String name, Consumer storeCloseHook, Consumer storeRemoveHook) { // Arbitrary duration that is strictly shorter than the timeout to not get interrupted by StuckTxMonitor - this.timeoutHalfMillis = env.getEnvironmentConfig().getEnvMonitorTxnsTimeout()/2; + this.timeoutHalfMillis = env.getEnvironmentConfig().getEnvMonitorTxnsTimeout() / 2; this.name = name; this.environment = env; this.storeCloseHook = storeCloseHook; this.storeRemoveHook = storeRemoveHook; this.store = env.computeInTransaction( - t->env.openStore(this.name, StoreConfig.WITHOUT_DUPLICATES_WITH_PREFIXING, t) + t -> env.openStore(this.name, StoreConfig.WITHOUT_DUPLICATES_WITH_PREFIXING, t) ); } - - public boolean add(ByteIterable key, ByteIterable value) { - return environment.computeInTransaction(t -> store.add(t, key, value)); + + public boolean add(byte[] key, byte[] value) { + return environment.computeInTransaction(t -> store.add(t, new ArrayByteIterable(key), new ArrayByteIterable(value))); } - public ByteIterable get(ByteIterable key) { - return environment.computeInReadonlyTransaction(t -> store.get(t, key)); + public byte[] get(byte[] key) { + return environment.computeInReadonlyTransaction(t -> store.get(t, new ArrayByteIterable(key))).getBytesUnsafe(); } /** * Iterate over all key-value pairs in a consistent manner. * The transaction is read only! + * * @param consumer function called for-each key-value pair. */ - public void forEach(BiConsumer consumer) { + public void forEach(BiConsumer consumer) { AtomicReference lastKey = new AtomicReference<>(); AtomicBoolean done = new AtomicBoolean(false); - while(!done.get()) { + while (!done.get()) { environment.executeInReadonlyTransaction(t -> { - try(Cursor c = store.openCursor(t)) { + try (Cursor c = store.openCursor(t)) { //try to load everything in the same transaction //but keep within half of the timeout time long start = System.currentTimeMillis(); //search where we left of - if(lastKey.get() != null) { + if (lastKey.get() != null) { c.getSearchKey(lastKey.get()); } while(System.currentTimeMillis()-start < timeoutHalfMillis) { @@ -69,19 +72,19 @@ public void forEach(BiConsumer consumer) { return; } lastKey.set(c.getKey()); - consumer.accept(lastKey.get(), c.getValue()); + consumer.accept(lastKey.get().getBytesUnsafe(), c.getValue().getBytesUnsafe()); } } }); } } - public boolean update(ByteIterable key, ByteIterable value) { - return environment.computeInTransaction(t -> store.put(t, key, value)); + public boolean update(byte[] key, byte[] value) { + return environment.computeInTransaction(t -> store.put(t, new ArrayByteIterable(key), new ArrayByteIterable(value))); } - - public boolean remove(ByteIterable key) { - return environment.computeInTransaction(t -> store.delete(t, key)); + + public boolean remove(byte[] key) { + return environment.computeInTransaction(t -> store.delete(t, new ArrayByteIterable(key))); } public int count() {