From d3075df17db7101c7787a821ccf9abf8a7632b36 Mon Sep 17 00:00:00 2001 From: Max Thonagel <12283268+thoniTUB@users.noreply.github.com> Date: Thu, 12 Sep 2024 10:36:35 +0200 Subject: [PATCH 1/5] Bring back type structure for frontend --- .../apiv1/frontend/FrontendResultType.java | 28 +++++++++++++++++++ .../apiv1/frontend/FrontendSelect.java | 2 +- .../concepts/FrontEndConceptBuilder.java | 3 +- 3 files changed, 31 insertions(+), 2 deletions(-) create mode 100644 backend/src/main/java/com/bakdata/conquery/apiv1/frontend/FrontendResultType.java diff --git a/backend/src/main/java/com/bakdata/conquery/apiv1/frontend/FrontendResultType.java b/backend/src/main/java/com/bakdata/conquery/apiv1/frontend/FrontendResultType.java new file mode 100644 index 0000000000..4a574dd295 --- /dev/null +++ b/backend/src/main/java/com/bakdata/conquery/apiv1/frontend/FrontendResultType.java @@ -0,0 +1,28 @@ +package com.bakdata.conquery.apiv1.frontend; + +import jakarta.validation.constraints.NotNull; + +import com.bakdata.conquery.models.types.ResultType; +import lombok.experimental.SuperBuilder; + +@SuperBuilder +public class FrontendResultType { + @NotNull + String type; + + @SuperBuilder + public static class List extends FrontendResultType { + @NotNull + FrontendResultType elementType; + } + + public static FrontendResultType from(ResultType resultType) { + if (resultType instanceof ResultType.ListT listT) { + return List.builder() + .elementType(from(listT.getElementType())) + .type(listT.typeInfo()).build(); + } + + return FrontendResultType.builder().type(resultType.typeInfo()).build(); + } +} diff --git a/backend/src/main/java/com/bakdata/conquery/apiv1/frontend/FrontendSelect.java b/backend/src/main/java/com/bakdata/conquery/apiv1/frontend/FrontendSelect.java index 95f9e1c94b..4bf5cf5f4f 100644 --- a/backend/src/main/java/com/bakdata/conquery/apiv1/frontend/FrontendSelect.java +++ b/backend/src/main/java/com/bakdata/conquery/apiv1/frontend/FrontendSelect.java @@ -14,7 +14,7 @@ public class FrontendSelect { private SelectId id; private String label; private String description; - private String resultType; + private FrontendResultType resultType; @JsonProperty("default") private Boolean isDefault; } diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/FrontEndConceptBuilder.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/FrontEndConceptBuilder.java index 721fc2c920..6dfb73ab7e 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/FrontEndConceptBuilder.java +++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/FrontEndConceptBuilder.java @@ -12,6 +12,7 @@ import com.bakdata.conquery.apiv1.frontend.FrontendFilterConfiguration; import com.bakdata.conquery.apiv1.frontend.FrontendList; import com.bakdata.conquery.apiv1.frontend.FrontendNode; +import com.bakdata.conquery.apiv1.frontend.FrontendResultType; import com.bakdata.conquery.apiv1.frontend.FrontendRoot; import com.bakdata.conquery.apiv1.frontend.FrontendSecondaryId; import com.bakdata.conquery.apiv1.frontend.FrontendSelect; @@ -172,7 +173,7 @@ public FrontendSelect createSelect(Select select) { .id(select.getId()) .label(select.getLabel()) .description(select.getDescription()) - .resultType(select.getResultType().typeInfo()) + .resultType(FrontendResultType.from(select.getResultType())) .isDefault(select.isDefault()) .build(); } From ccc446262330ecd34f80631c988ad84f3368494c Mon Sep 17 00:00:00 2001 From: Max Thonagel <12283268+thoniTUB@users.noreply.github.com> Date: Thu, 12 Sep 2024 11:47:51 +0200 Subject: [PATCH 2/5] adds getters to container type --- .../bakdata/conquery/apiv1/frontend/FrontendResultType.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/backend/src/main/java/com/bakdata/conquery/apiv1/frontend/FrontendResultType.java b/backend/src/main/java/com/bakdata/conquery/apiv1/frontend/FrontendResultType.java index 4a574dd295..db203da0f2 100644 --- a/backend/src/main/java/com/bakdata/conquery/apiv1/frontend/FrontendResultType.java +++ b/backend/src/main/java/com/bakdata/conquery/apiv1/frontend/FrontendResultType.java @@ -3,13 +3,16 @@ import jakarta.validation.constraints.NotNull; import com.bakdata.conquery.models.types.ResultType; +import lombok.Getter; import lombok.experimental.SuperBuilder; +@Getter @SuperBuilder public class FrontendResultType { @NotNull String type; + @Getter @SuperBuilder public static class List extends FrontendResultType { @NotNull From 06d30b46debbfc943f445a863bd32d364e51d441 Mon Sep 17 00:00:00 2001 From: Max Thonagel <12283268+thoniTUB@users.noreply.github.com> Date: Thu, 12 Sep 2024 13:11:39 +0200 Subject: [PATCH 3/5] don't use typeInfo for list type --- .../com/bakdata/conquery/apiv1/frontend/FrontendResultType.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/src/main/java/com/bakdata/conquery/apiv1/frontend/FrontendResultType.java b/backend/src/main/java/com/bakdata/conquery/apiv1/frontend/FrontendResultType.java index db203da0f2..22d81d5937 100644 --- a/backend/src/main/java/com/bakdata/conquery/apiv1/frontend/FrontendResultType.java +++ b/backend/src/main/java/com/bakdata/conquery/apiv1/frontend/FrontendResultType.java @@ -23,7 +23,7 @@ public static FrontendResultType from(ResultType resultType) { if (resultType instanceof ResultType.ListT listT) { return List.builder() .elementType(from(listT.getElementType())) - .type(listT.typeInfo()).build(); + .type("LIST").build(); } return FrontendResultType.builder().type(resultType.typeInfo()).build(); From dc6289f6fc6e2a5ba8627923d3ca7c93ff449945 Mon Sep 17 00:00:00 2001 From: awildturtok <1553491+awildturtok@users.noreply.github.com> Date: Tue, 17 Sep 2024 10:37:16 +0200 Subject: [PATCH 4/5] use ConcurrentHashMap where concurrent access is possible. --- .../bakdata/conquery/commands/ShardNode.java | 6 +- .../bakdata/conquery/io/mina/ChunkReader.java | 6 +- .../mode/cluster/ClusterConnectionShard.java | 4 +- .../mode/cluster/InternalMapperFactory.java | 9 +- .../models/jobs/UpdateFilterSearchJob.java | 9 +- .../network/NetworkMessageContext.java | 9 +- .../conquery/models/query/C10nCache.java | 8 +- .../conquery/models/query/FilterSearch.java | 11 +- .../{Workers.java => ShardWorkers.java} | 11 +- .../io/AbstractSerializationTest.java | 7 +- .../types/ColumnStoreSerializationTests.java | 4 +- .../conquery/util/NonPersistentStore.java | 141 +++++++++--------- 12 files changed, 109 insertions(+), 116 deletions(-) rename backend/src/main/java/com/bakdata/conquery/models/worker/{Workers.java => ShardWorkers.java} (92%) diff --git a/backend/src/main/java/com/bakdata/conquery/commands/ShardNode.java b/backend/src/main/java/com/bakdata/conquery/commands/ShardNode.java index aac321e32a..a1e084264b 100644 --- a/backend/src/main/java/com/bakdata/conquery/commands/ShardNode.java +++ b/backend/src/main/java/com/bakdata/conquery/commands/ShardNode.java @@ -10,8 +10,8 @@ import com.bakdata.conquery.mode.cluster.ClusterConnectionShard; import com.bakdata.conquery.mode.cluster.InternalMapperFactory; import com.bakdata.conquery.models.config.ConqueryConfig; +import com.bakdata.conquery.models.worker.ShardWorkers; import com.bakdata.conquery.models.worker.Worker; -import com.bakdata.conquery.models.worker.Workers; import com.bakdata.conquery.util.io.ConqueryMDC; import io.dropwizard.core.ConfiguredBundle; import io.dropwizard.core.setup.Environment; @@ -33,7 +33,7 @@ public class ShardNode implements ConfiguredBundle { private final String name; @Setter - private Workers workers; + private ShardWorkers workers; private ClusterConnectionShard clusterConnection; public ShardNode() { @@ -51,7 +51,7 @@ public void run(ConqueryConfig config, Environment environment) throws Exception InternalMapperFactory internalMapperFactory = new InternalMapperFactory(config, environment.getValidator()); - workers = new Workers( + workers = new ShardWorkers( config.getQueries().getExecutionPool(), internalMapperFactory, config.getCluster().getEntityBucketSize(), diff --git a/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkReader.java b/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkReader.java index 8fd06fdaa6..a5fd805263 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkReader.java +++ b/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkReader.java @@ -6,10 +6,10 @@ import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.Path; -import java.util.HashMap; -import java.util.Map; import java.util.Objects; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import com.bakdata.conquery.io.jackson.JacksonUtil; import com.fasterxml.jackson.databind.JsonNode; @@ -107,7 +107,7 @@ private MessageManager getMessageManager(IoSession session) { @Getter @RequiredArgsConstructor public static class MessageManager { - private final Map messages = new HashMap<>(); + private final ConcurrentMap messages = new ConcurrentHashMap<>(); private UUID lastId = null; private ChunkedMessage.List lastMessage = null; diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionShard.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionShard.java index bf229a2125..54cfe44459 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionShard.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionShard.java @@ -22,9 +22,9 @@ import com.bakdata.conquery.models.messages.network.specific.RegisterWorker; import com.bakdata.conquery.models.messages.network.specific.UpdateJobManagerStatus; import com.bakdata.conquery.models.worker.IdResolveContext; +import com.bakdata.conquery.models.worker.ShardWorkers; import com.bakdata.conquery.models.worker.Worker; import com.bakdata.conquery.models.worker.WorkerInformation; -import com.bakdata.conquery.models.worker.Workers; import com.fasterxml.jackson.databind.ObjectMapper; import io.dropwizard.core.setup.Environment; import io.dropwizard.lifecycle.Managed; @@ -49,7 +49,7 @@ public class ClusterConnectionShard implements Managed, IoHandler { private final ConqueryConfig config; private final Environment environment; - private final Workers workers; + private final ShardWorkers workers; private final InternalMapperFactory internalMapperFactory; private JobManager jobManager; diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/InternalMapperFactory.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/InternalMapperFactory.java index e4ab7af4bd..655a340fd7 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/cluster/InternalMapperFactory.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/cluster/InternalMapperFactory.java @@ -1,17 +1,16 @@ package com.bakdata.conquery.mode.cluster; -import jakarta.validation.Validator; - import com.bakdata.conquery.io.jackson.Jackson; import com.bakdata.conquery.io.jackson.MutableInjectableValues; import com.bakdata.conquery.io.jackson.View; import com.bakdata.conquery.io.storage.MetaStorage; import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.worker.DatasetRegistry; -import com.bakdata.conquery.models.worker.Workers; +import com.bakdata.conquery.models.worker.ShardWorkers; import com.fasterxml.jackson.databind.DeserializationConfig; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationConfig; +import jakarta.validation.Validator; public record InternalMapperFactory(ConqueryConfig config, Validator validator) { @@ -19,7 +18,7 @@ public ObjectMapper createShardCommunicationMapper() { return createInternalObjectMapper(View.InternalCommunication.class); } - public ObjectMapper createWorkerCommunicationMapper(Workers workers) { + public ObjectMapper createWorkerCommunicationMapper(ShardWorkers workers) { final ObjectMapper objectMapper = createInternalObjectMapper(View.InternalCommunication.class); workers.injectInto(objectMapper); @@ -27,7 +26,7 @@ public ObjectMapper createWorkerCommunicationMapper(Workers workers) { return objectMapper; } - public ObjectMapper createWorkerPersistenceMapper(Workers workers) { + public ObjectMapper createWorkerPersistenceMapper(ShardWorkers workers) { final ObjectMapper objectMapper = createInternalObjectMapper(View.Persistence.Shard.class); workers.injectInto(objectMapper); diff --git a/backend/src/main/java/com/bakdata/conquery/models/jobs/UpdateFilterSearchJob.java b/backend/src/main/java/com/bakdata/conquery/models/jobs/UpdateFilterSearchJob.java index 023633f288..c4307ca9b8 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/jobs/UpdateFilterSearchJob.java +++ b/backend/src/main/java/com/bakdata/conquery/models/jobs/UpdateFilterSearchJob.java @@ -3,10 +3,10 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -76,8 +76,7 @@ public void execute() throws Exception { // Most computations are cheap but data intensive: we fork here to use as many cores as possible. final ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() - 1); - final HashMap> searchCache = new HashMap<>(); - final Map> synchronizedResult = Collections.synchronizedMap(searchCache); + final Map> searchCache = new ConcurrentHashMap<>(); log.debug("Found {} searchable Objects.", collectedSearchables.values().stream().mapToLong(Set::size).sum()); @@ -95,7 +94,7 @@ public void execute() throws Exception { try { final TrieSearch search = searchable.createTrieSearch(indexConfig); - synchronizedResult.put(searchable, search); + searchCache.put(searchable, search); log.debug( "DONE collecting {} entries for `{}`, within {}", @@ -125,7 +124,7 @@ public void execute() throws Exception { service.shutdownNow(); return; } - log.debug("Still waiting for {} to finish.", Sets.difference(collectedSearchables.get(Searchable.class), synchronizedResult.keySet())); + log.debug("Still waiting for {} to finish.", Sets.difference(collectedSearchables.get(Searchable.class), searchCache.keySet())); } // Shrink searches before registering in the filter search diff --git a/backend/src/main/java/com/bakdata/conquery/models/messages/network/NetworkMessageContext.java b/backend/src/main/java/com/bakdata/conquery/models/messages/network/NetworkMessageContext.java index f975d9a3b8..5816f91dd1 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/messages/network/NetworkMessageContext.java +++ b/backend/src/main/java/com/bakdata/conquery/models/messages/network/NetworkMessageContext.java @@ -1,7 +1,5 @@ package com.bakdata.conquery.models.messages.network; -import jakarta.validation.Validator; - import com.bakdata.conquery.commands.ManagerNode; import com.bakdata.conquery.commands.ShardNode; import com.bakdata.conquery.io.mina.MessageSender; @@ -10,7 +8,8 @@ import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.worker.DatasetRegistry; import com.bakdata.conquery.models.worker.DistributedNamespace; -import com.bakdata.conquery.models.worker.Workers; +import com.bakdata.conquery.models.worker.ShardWorkers; +import jakarta.validation.Validator; import lombok.Getter; @Getter @@ -32,12 +31,12 @@ public boolean isConnected() { @Getter public static class ShardNodeNetworkContext extends NetworkMessageContext { - private final Workers workers; + private final ShardWorkers workers; private final ConqueryConfig config; private final Validator validator; private final NetworkSession rawSession; - public ShardNodeNetworkContext(NetworkSession session, Workers workers, ConqueryConfig config, Validator validator) { + public ShardNodeNetworkContext(NetworkSession session, ShardWorkers workers, ConqueryConfig config, Validator validator) { super(session, config.getCluster().getBackpressure()); this.workers = workers; this.config = config; diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/C10nCache.java b/backend/src/main/java/com/bakdata/conquery/models/query/C10nCache.java index f4519ca988..11238e9f52 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/query/C10nCache.java +++ b/backend/src/main/java/com/bakdata/conquery/models/query/C10nCache.java @@ -1,18 +1,18 @@ package com.bakdata.conquery.models.query; -import java.util.HashMap; import java.util.Locale; -import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import c10n.C10N; import lombok.experimental.UtilityClass; @UtilityClass public class C10nCache { - private static Map> cache = new HashMap<>(); + private static ConcurrentMap> cache = new ConcurrentHashMap<>(); public T getLocalized(Class clazz, Locale locale) { - return (T) cache.computeIfAbsent(locale, (ignored) -> new HashMap<>()) + return (T) cache.computeIfAbsent(locale, (ignored) -> new ConcurrentHashMap<>()) .computeIfAbsent(clazz, ignored -> C10N.get(clazz, locale)); } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/FilterSearch.java b/backend/src/main/java/com/bakdata/conquery/models/query/FilterSearch.java index 35b60f3416..cd7ae58607 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/query/FilterSearch.java +++ b/backend/src/main/java/com/bakdata/conquery/models/query/FilterSearch.java @@ -2,11 +2,12 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; import com.bakdata.conquery.apiv1.frontend.FrontendValue; @@ -34,8 +35,8 @@ public class FilterSearch { * In the code below, the keys of this map will usually be called "reference". */ @JsonIgnore - private Map> searchCache = new HashMap<>(); - private Map, Integer> totals = new HashMap<>(); + private ConcurrentMap> searchCache = new ConcurrentHashMap<>(); + private ConcurrentMap, Integer> totals = new ConcurrentHashMap<>(); /** * From a given {@link FrontendValue} extract all relevant keywords. @@ -127,7 +128,7 @@ public void shrinkSearch(Searchable searchable) { } public synchronized void clearSearch() { - totals = new HashMap<>(); - searchCache = new HashMap<>(); + totals = new ConcurrentHashMap<>(); + searchCache = new ConcurrentHashMap<>(); } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/worker/Workers.java b/backend/src/main/java/com/bakdata/conquery/models/worker/ShardWorkers.java similarity index 92% rename from backend/src/main/java/com/bakdata/conquery/models/worker/Workers.java rename to backend/src/main/java/com/bakdata/conquery/models/worker/ShardWorkers.java index f7861315b8..d365519aa9 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/worker/Workers.java +++ b/backend/src/main/java/com/bakdata/conquery/models/worker/ShardWorkers.java @@ -1,13 +1,11 @@ package com.bakdata.conquery.models.worker; -import java.util.HashMap; -import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; -import jakarta.validation.Validator; import com.bakdata.conquery.commands.ShardNode; import com.bakdata.conquery.io.storage.WorkerStorage; @@ -22,6 +20,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.databind.ObjectMapper; import io.dropwizard.lifecycle.Managed; +import jakarta.validation.Validator; import lombok.Getter; import lombok.NonNull; import lombok.Setter; @@ -33,13 +32,13 @@ * Each Shard contains one {@link Worker} per {@link Dataset}. */ @Slf4j -public class Workers extends IdResolveContext implements Managed { +public class ShardWorkers extends IdResolveContext implements Managed { @Getter @Setter private AtomicInteger nextWorker = new AtomicInteger(0); @Getter private final ConcurrentHashMap workers = new ConcurrentHashMap<>(); @JsonIgnore - private final transient Map dataset2Worker = new HashMap<>(); + private final transient ConcurrentMap dataset2Worker = new ConcurrentHashMap<>(); /** * Shared ExecutorService among Workers for Jobs. @@ -54,7 +53,7 @@ public class Workers extends IdResolveContext implements Managed { private final int secondaryIdSubPlanRetention; - public Workers(ThreadPoolDefinition queryThreadPoolDefinition, InternalMapperFactory internalMapperFactory, int entityBucketSize, int secondaryIdSubPlanRetention) { + public ShardWorkers(ThreadPoolDefinition queryThreadPoolDefinition, InternalMapperFactory internalMapperFactory, int entityBucketSize, int secondaryIdSubPlanRetention) { this.queryThreadPoolDefinition = queryThreadPoolDefinition; // TODO This shouldn't be coupled to the query thread pool definition diff --git a/backend/src/test/java/com/bakdata/conquery/io/AbstractSerializationTest.java b/backend/src/test/java/com/bakdata/conquery/io/AbstractSerializationTest.java index 2d98d606df..64c67acf55 100644 --- a/backend/src/test/java/com/bakdata/conquery/io/AbstractSerializationTest.java +++ b/backend/src/test/java/com/bakdata/conquery/io/AbstractSerializationTest.java @@ -2,8 +2,6 @@ import static org.mockito.Mockito.mock; -import jakarta.validation.Validator; - import com.bakdata.conquery.io.jackson.Jackson; import com.bakdata.conquery.io.storage.MetaStorage; import com.bakdata.conquery.io.storage.NamespaceStorage; @@ -15,10 +13,11 @@ import com.bakdata.conquery.models.index.IndexService; import com.bakdata.conquery.models.worker.DatasetRegistry; import com.bakdata.conquery.models.worker.DistributedNamespace; -import com.bakdata.conquery.models.worker.Workers; +import com.bakdata.conquery.models.worker.ShardWorkers; import com.bakdata.conquery.util.NonPersistentStoreFactory; import com.fasterxml.jackson.databind.ObjectMapper; import io.dropwizard.jersey.validation.Validators; +import jakarta.validation.Validator; import lombok.Getter; import org.junit.jupiter.api.BeforeEach; @@ -62,7 +61,7 @@ public void before() { namespaceStorage.updateDataset(new Dataset("serialization_test")); // Prepare shard node internal mapper - final Workers workers = mock(Workers.class); + final ShardWorkers workers = mock(ShardWorkers.class); shardInternalMapper = internalMapperFactory.createWorkerPersistenceMapper(workers); // Prepare api mapper with a Namespace injected (usually done by PathParamInjector) diff --git a/backend/src/test/java/com/bakdata/conquery/models/events/stores/types/ColumnStoreSerializationTests.java b/backend/src/test/java/com/bakdata/conquery/models/events/stores/types/ColumnStoreSerializationTests.java index 06f41ef571..6811691944 100644 --- a/backend/src/test/java/com/bakdata/conquery/models/events/stores/types/ColumnStoreSerializationTests.java +++ b/backend/src/test/java/com/bakdata/conquery/models/events/stores/types/ColumnStoreSerializationTests.java @@ -34,7 +34,7 @@ import com.bakdata.conquery.models.events.stores.specific.ScaledDecimalStore; import com.bakdata.conquery.models.exceptions.JSONException; import com.bakdata.conquery.models.identifiable.CentralRegistry; -import com.bakdata.conquery.models.worker.Workers; +import com.bakdata.conquery.models.worker.ShardWorkers; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Sets; import io.dropwizard.jersey.validation.Validators; @@ -62,7 +62,7 @@ public static void setupRegistry() { // Prepare shard node internal mapper InternalMapperFactory internalMapperFactory = new InternalMapperFactory(new ConqueryConfig(), Validators.newValidator()); - shardInternalMapper = internalMapperFactory.createWorkerPersistenceMapper(mock(Workers.class)); + shardInternalMapper = internalMapperFactory.createWorkerPersistenceMapper(mock(ShardWorkers.class)); } @Test diff --git a/backend/src/test/java/com/bakdata/conquery/util/NonPersistentStore.java b/backend/src/test/java/com/bakdata/conquery/util/NonPersistentStore.java index e0f2ed9363..216d858144 100644 --- a/backend/src/test/java/com/bakdata/conquery/util/NonPersistentStore.java +++ b/backend/src/test/java/com/bakdata/conquery/util/NonPersistentStore.java @@ -2,8 +2,8 @@ import java.io.IOException; import java.util.Collection; -import java.util.HashMap; -import java.util.function.BiConsumer; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import com.bakdata.conquery.io.storage.Store; import com.bakdata.conquery.io.storage.xodus.stores.SerializingStore; @@ -12,74 +12,71 @@ @ToString public class NonPersistentStore implements Store { - private final HashMap map = new HashMap<>(); - - @Override - public void add(KEY key, VALUE value) { - map.put(key, value); - } - - @Override - public VALUE get(KEY key) { - return map.get(key); - } - - @Override - public SerializingStore.IterationStatistic forEach(StoreEntryConsumer consumer) { - final SerializingStore.IterationStatistic stats = new SerializingStore.IterationStatistic(); - map.forEach(new BiConsumer() { - @Override - public void accept(KEY key, VALUE value) { - consumer.accept(key,value, stats.getTotalProcessed()); - stats.incrTotalProcessed(); - - } - }); - return stats; - } - - @Override - public void update(KEY key, VALUE value) { - map.put(key,value); - } - - @Override - public void remove(KEY key) { - map.remove(key); - } - - @Override - public void loadData() { - - } - - @Override - public int count() { - return map.size(); - } - - @Override - public Collection getAll() { - return map.values(); - } - - @Override - public Collection getAllKeys() { - return map.keySet(); - } - - @Override - public void clear() { - map.clear(); - } - - @Override - public void removeStore() { - clear(); - } - - @Override - public void close() throws IOException { - // Nothing to close - } + private final ConcurrentMap map = new ConcurrentHashMap<>(); + + @Override + public void add(KEY key, VALUE value) { + map.put(key, value); + } + + @Override + public VALUE get(KEY key) { + return map.get(key); + } + + @Override + public SerializingStore.IterationStatistic forEach(StoreEntryConsumer consumer) { + final SerializingStore.IterationStatistic stats = new SerializingStore.IterationStatistic(); + map.forEach((key, value) -> { + consumer.accept(key, value, stats.getTotalProcessed()); + stats.incrTotalProcessed(); + + }); + return stats; + } + + @Override + public void update(KEY key, VALUE value) { + map.put(key, value); + } + + @Override + public void remove(KEY key) { + map.remove(key); + } + + @Override + public void loadData() { + + } + + @Override + public int count() { + return map.size(); + } + + @Override + public Collection getAll() { + return map.values(); + } + + @Override + public Collection getAllKeys() { + return map.keySet(); + } + + @Override + public void removeStore() { + clear(); + } + + @Override + public void clear() { + map.clear(); + } + + @Override + public void close() throws IOException { + // Nothing to close + } } From 796515c66b41a7af00f2029699790157975704d2 Mon Sep 17 00:00:00 2001 From: Max Thonagel <12283268+thoniTUB@users.noreply.github.com> Date: Thu, 19 Sep 2024 15:39:19 +0200 Subject: [PATCH 5/5] adds getters to container type --- .../bakdata/conquery/mode/cluster/InternalMapperFactory.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/InternalMapperFactory.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/InternalMapperFactory.java index 655a340fd7..9aa102eb54 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/cluster/InternalMapperFactory.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/cluster/InternalMapperFactory.java @@ -1,5 +1,7 @@ package com.bakdata.conquery.mode.cluster; +import jakarta.validation.Validator; + import com.bakdata.conquery.io.jackson.Jackson; import com.bakdata.conquery.io.jackson.MutableInjectableValues; import com.bakdata.conquery.io.jackson.View; @@ -10,7 +12,6 @@ import com.fasterxml.jackson.databind.DeserializationConfig; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationConfig; -import jakarta.validation.Validator; public record InternalMapperFactory(ConqueryConfig config, Validator validator) { @@ -35,7 +36,7 @@ public ObjectMapper createWorkerPersistenceMapper(ShardWorkers workers) { } public ObjectMapper createNamespacePersistenceMapper(DatasetRegistry datasetRegistry) { - final ObjectMapper objectMapper = createInternalObjectMapper(View.Persistence.Shard.class); + final ObjectMapper objectMapper = createInternalObjectMapper(View.Persistence.Manager.class); datasetRegistry.injectInto(objectMapper);