
Reintegrate Main #3561

Merged · 10 commits · Sep 25, 2024
@@ -0,0 +1,31 @@
package com.bakdata.conquery.apiv1.frontend;

import jakarta.validation.constraints.NotNull;

import com.bakdata.conquery.models.types.ResultType;
import lombok.Getter;
import lombok.experimental.SuperBuilder;

@Getter
@SuperBuilder
public class FrontendResultType {
@NotNull
String type;

@Getter
@SuperBuilder
public static class List extends FrontendResultType {
@NotNull
FrontendResultType elementType;
}

public static FrontendResultType from(ResultType resultType) {
if (resultType instanceof ResultType.ListT<?> listT) {
return List.builder()
.elementType(from(listT.getElementType()))
.type("LIST").build();
}

return FrontendResultType.builder().type(resultType.typeInfo()).build();
}
}
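Note on the new type: FrontendResultType replaces the flat typeInfo() string previously sent to the frontend with a structured object that nests recursively for list types. A minimal sketch of what from() would produce — the ResultType.Primitive.DATE constant and the ListT constructor are assumptions for illustration; only ResultType.ListT and typeInfo() are visible in this diff:

// Hypothetical usage sketch, not from this PR:
FrontendResultType type = FrontendResultType.from(new ResultType.ListT<>(ResultType.Primitive.DATE));
// Jackson would serialize this roughly as:
// {"type":"LIST","elementType":{"type":"DATE"}}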
@@ -14,7 +14,7 @@ public class FrontendSelect {
private SelectId id;
private String label;
private String description;
-private String resultType;
+private FrontendResultType resultType;
@JsonProperty("default")
private Boolean isDefault;
}
@@ -10,8 +10,8 @@
import com.bakdata.conquery.mode.cluster.ClusterConnectionShard;
import com.bakdata.conquery.mode.cluster.InternalMapperFactory;
import com.bakdata.conquery.models.config.ConqueryConfig;
+import com.bakdata.conquery.models.worker.ShardWorkers;
import com.bakdata.conquery.models.worker.Worker;
-import com.bakdata.conquery.models.worker.Workers;
import com.bakdata.conquery.util.io.ConqueryMDC;
import io.dropwizard.core.ConfiguredBundle;
import io.dropwizard.core.setup.Environment;
@@ -33,7 +33,7 @@ public class ShardNode implements ConfiguredBundle<ConqueryConfig> {

private final String name;
@Setter
-private Workers workers;
+private ShardWorkers workers;
private ClusterConnectionShard clusterConnection;

public ShardNode() {
@@ -51,7 +51,7 @@ public void run(ConqueryConfig config, Environment environment) throws Exception


InternalMapperFactory internalMapperFactory = new InternalMapperFactory(config, environment.getValidator());
-workers = new Workers(
+workers = new ShardWorkers(
config.getQueries().getExecutionPool(),
internalMapperFactory,
config.getCluster().getEntityBucketSize(),
@@ -6,10 +6,10 @@
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Objects;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;

import com.bakdata.conquery.io.jackson.JacksonUtil;
import com.fasterxml.jackson.databind.JsonNode;
@@ -107,7 +107,7 @@ private MessageManager getMessageManager(IoSession session) {
@Getter @RequiredArgsConstructor
public static class MessageManager {

-private final Map<UUID, ChunkedMessage.List> messages = new HashMap<>();
+private final ConcurrentMap<UUID, ChunkedMessage.List> messages = new ConcurrentHashMap<>();
private UUID lastId = null;
private ChunkedMessage.List lastMessage = null;

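Note on the MessageManager change: swapping HashMap for ConcurrentHashMap makes chunk registration safe if several IO threads touch the same manager. A self-contained sketch of the pattern, using stand-in types rather than the Conquery classes:

import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

class ChunkAssemblySketch {
    // computeIfAbsent on a ConcurrentMap is atomic: two threads racing on the
    // same message id both get the same queue instead of overwriting an entry.
    private final ConcurrentMap<UUID, ConcurrentLinkedQueue<byte[]>> messages = new ConcurrentHashMap<>();

    void addChunk(UUID id, byte[] chunk) {
        messages.computeIfAbsent(id, ignored -> new ConcurrentLinkedQueue<>()).add(chunk);
    }
}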
@@ -22,9 +22,9 @@
import com.bakdata.conquery.models.messages.network.specific.RegisterWorker;
import com.bakdata.conquery.models.messages.network.specific.UpdateJobManagerStatus;
import com.bakdata.conquery.models.worker.IdResolveContext;
+import com.bakdata.conquery.models.worker.ShardWorkers;
import com.bakdata.conquery.models.worker.Worker;
import com.bakdata.conquery.models.worker.WorkerInformation;
-import com.bakdata.conquery.models.worker.Workers;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dropwizard.core.setup.Environment;
import io.dropwizard.lifecycle.Managed;
@@ -49,7 +49,7 @@ public class ClusterConnectionShard implements Managed, IoHandler {

private final ConqueryConfig config;
private final Environment environment;
-private final Workers workers;
+private final ShardWorkers workers;
private final InternalMapperFactory internalMapperFactory;

private JobManager jobManager;
@@ -8,7 +8,7 @@
import com.bakdata.conquery.io.storage.MetaStorage;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.worker.DatasetRegistry;
-import com.bakdata.conquery.models.worker.Workers;
+import com.bakdata.conquery.models.worker.ShardWorkers;
import com.fasterxml.jackson.databind.DeserializationConfig;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationConfig;
@@ -19,15 +19,15 @@ public ObjectMapper createShardCommunicationMapper() {
return createInternalObjectMapper(View.InternalCommunication.class);
}

-public ObjectMapper createWorkerCommunicationMapper(Workers workers) {
+public ObjectMapper createWorkerCommunicationMapper(ShardWorkers workers) {
final ObjectMapper objectMapper = createInternalObjectMapper(View.InternalCommunication.class);

workers.injectInto(objectMapper);

return objectMapper;
}

-public ObjectMapper createWorkerPersistenceMapper(Workers workers) {
+public ObjectMapper createWorkerPersistenceMapper(ShardWorkers workers) {
final ObjectMapper objectMapper = createInternalObjectMapper(View.Persistence.Shard.class);

workers.injectInto(objectMapper);
@@ -36,7 +36,7 @@ public ObjectMapper createWorkerPersistenceMapper(Workers workers) {
}

public ObjectMapper createNamespacePersistenceMapper(DatasetRegistry<?> datasetRegistry) {
-final ObjectMapper objectMapper = createInternalObjectMapper(View.Persistence.Shard.class);
+final ObjectMapper objectMapper = createInternalObjectMapper(View.Persistence.Manager.class);

datasetRegistry.injectInto(objectMapper);

@@ -12,6 +12,7 @@
import com.bakdata.conquery.apiv1.frontend.FrontendFilterConfiguration;
import com.bakdata.conquery.apiv1.frontend.FrontendList;
import com.bakdata.conquery.apiv1.frontend.FrontendNode;
+import com.bakdata.conquery.apiv1.frontend.FrontendResultType;
import com.bakdata.conquery.apiv1.frontend.FrontendRoot;
import com.bakdata.conquery.apiv1.frontend.FrontendSecondaryId;
import com.bakdata.conquery.apiv1.frontend.FrontendSelect;
@@ -179,7 +180,7 @@ public FrontendSelect createSelect(Select select) {
.id(select.getId())
.label(select.getLabel())
.description(select.getDescription())
-.resultType(select.getResultType().typeInfo())
+.resultType(FrontendResultType.from(select.getResultType()))
.isDefault(select.isDefault())
.build();
}
@@ -3,10 +3,10 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -76,8 +76,7 @@ public void execute() throws Exception {
// Most computations are cheap but data intensive: we fork here to use as many cores as possible.
final ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() - 1);

-final HashMap<Searchable, TrieSearch<FrontendValue>> searchCache = new HashMap<>();
-final Map<Searchable, TrieSearch<FrontendValue>> synchronizedResult = Collections.synchronizedMap(searchCache);
+final Map<Searchable, TrieSearch<FrontendValue>> searchCache = new ConcurrentHashMap<>();

log.debug("Found {} searchable Objects.", collectedSearchables.values().stream().mapToLong(Set::size).sum());

@@ -95,7 +94,7 @@ public void execute() throws Exception {
try {
final TrieSearch<FrontendValue> search = searchable.createTrieSearch(indexConfig);

-synchronizedResult.put(searchable, search);
+searchCache.put(searchable, search);

log.debug(
"DONE collecting {} entries for `{}`, within {}",
@@ -125,7 +124,7 @@ public void execute() throws Exception {
service.shutdownNow();
return;
}
log.debug("Still waiting for {} to finish.", Sets.difference(collectedSearchables.get(Searchable.class), synchronizedResult.keySet()));
log.debug("Still waiting for {} to finish.", Sets.difference(collectedSearchables.get(Searchable.class), searchCache.keySet()));
}

// Shrink searches before registering in the filter search
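Note on the searchCache change: the Collections.synchronizedMap wrapper is gone, along with the second reference (synchronizedResult) that made the code harder to follow; a ConcurrentHashMap gives the pool threads safe puts without one coarse lock. A runnable sketch of the pattern with stand-in keys and values:

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ParallelCollectSketch {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(2);
        ConcurrentMap<String, Integer> results = new ConcurrentHashMap<>();
        // Pool threads publish straight into the shared map; the main thread
        // may read keySet() concurrently for progress logging, as above.
        for (String key : List.of("alpha", "beta", "gamma")) {
            service.submit(() -> results.put(key, key.length()));
        }
        service.shutdown();
        service.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println(results); // e.g. {alpha=5, gamma=5, beta=4}
    }
}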
@@ -1,7 +1,5 @@
package com.bakdata.conquery.models.messages.network;

-import jakarta.validation.Validator;
-
import com.bakdata.conquery.commands.ManagerNode;
import com.bakdata.conquery.commands.ShardNode;
import com.bakdata.conquery.io.mina.MessageSender;
Expand All @@ -10,7 +8,8 @@
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.models.worker.DistributedNamespace;
-import com.bakdata.conquery.models.worker.Workers;
+import com.bakdata.conquery.models.worker.ShardWorkers;
+import jakarta.validation.Validator;
import lombok.Getter;

@Getter
@@ -32,12 +31,12 @@ public boolean isConnected() {
@Getter
public static class ShardNodeNetworkContext extends NetworkMessageContext<MessageToManagerNode> {

-private final Workers workers;
+private final ShardWorkers workers;
private final ConqueryConfig config;
private final Validator validator;
private final NetworkSession rawSession;

-public ShardNodeNetworkContext(NetworkSession session, Workers workers, ConqueryConfig config, Validator validator) {
+public ShardNodeNetworkContext(NetworkSession session, ShardWorkers workers, ConqueryConfig config, Validator validator) {
super(session, config.getCluster().getBackpressure());
this.workers = workers;
this.config = config;
@@ -1,18 +1,18 @@
package com.bakdata.conquery.models.query;

-import java.util.HashMap;
import java.util.Locale;
-import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;

import c10n.C10N;
import lombok.experimental.UtilityClass;

@UtilityClass
public class C10nCache {
-private static Map<Locale, Map<Class, Object>> cache = new HashMap<>();
+private static ConcurrentMap<Locale, ConcurrentMap<Class, Object>> cache = new ConcurrentHashMap<>();

public <T> T getLocalized(Class<? super T> clazz, Locale locale) {
-return (T) cache.computeIfAbsent(locale, (ignored) -> new HashMap<>())
+return (T) cache.computeIfAbsent(locale, (ignored) -> new ConcurrentHashMap<>())
.computeIfAbsent(clazz, ignored -> C10N.get(clazz, locale));
}
}
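Note on C10nCache: the two computeIfAbsent calls run in sequence (the outer call returns the inner map before the inner call starts), so neither mapping function mutates the map it is invoked on, which is the one constraint ConcurrentHashMap.computeIfAbsent imposes. The net effect is that C10N.get runs at most once per (locale, class) pair. A sketch with a counting stand-in for C10N.get:

import java.util.Locale;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

class TwoLevelCacheSketch {
    private static final AtomicInteger factoryCalls = new AtomicInteger();
    private static final ConcurrentMap<Locale, ConcurrentMap<Class<?>, Object>> cache = new ConcurrentHashMap<>();

    // Stand-in for C10N.get: the counter shows the factory fires once per
    // (locale, class) pair, even when getLocalized is called concurrently.
    static Object getLocalized(Class<?> clazz, Locale locale) {
        return cache.computeIfAbsent(locale, ignored -> new ConcurrentHashMap<>())
                    .computeIfAbsent(clazz, ignored -> clazz.getSimpleName() + ":" + factoryCalls.incrementAndGet());
    }
}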
@@ -2,11 +2,12 @@

import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

import com.bakdata.conquery.apiv1.frontend.FrontendValue;
@@ -34,8 +35,8 @@ public class FilterSearch {
* In the code below, the keys of this map will usually be called "reference".
*/
@JsonIgnore
-private Map<Searchable, TrieSearch<FrontendValue>> searchCache = new HashMap<>();
-private Map<SelectFilter<?>, Integer> totals = new HashMap<>();
+private ConcurrentMap<Searchable, TrieSearch<FrontendValue>> searchCache = new ConcurrentHashMap<>();
+private ConcurrentMap<SelectFilter<?>, Integer> totals = new ConcurrentHashMap<>();

/**
* From a given {@link FrontendValue} extract all relevant keywords.
@@ -127,7 +128,7 @@ public void shrinkSearch(Searchable searchable) {
}

public synchronized void clearSearch() {
-totals = new HashMap<>();
-searchCache = new HashMap<>();
+totals = new ConcurrentHashMap<>();
+searchCache = new ConcurrentHashMap<>();
}
}
@@ -1,13 +1,11 @@
package com.bakdata.conquery.models.worker;

-import java.util.HashMap;
-import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
-import jakarta.validation.Validator;

import com.bakdata.conquery.commands.ShardNode;
import com.bakdata.conquery.io.storage.WorkerStorage;
@@ -22,6 +20,7 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dropwizard.lifecycle.Managed;
+import jakarta.validation.Validator;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
@@ -33,13 +32,13 @@
* Each Shard contains one {@link Worker} per {@link Dataset}.
*/
@Slf4j
-public class Workers extends IdResolveContext implements Managed {
+public class ShardWorkers extends IdResolveContext implements Managed {
@Getter @Setter
private AtomicInteger nextWorker = new AtomicInteger(0);
@Getter
private final ConcurrentHashMap<WorkerId, Worker> workers = new ConcurrentHashMap<>();
@JsonIgnore
-private final transient Map<DatasetId, Worker> dataset2Worker = new HashMap<>();
+private final transient ConcurrentMap<DatasetId, Worker> dataset2Worker = new ConcurrentHashMap<>();

/**
* Shared ExecutorService among Workers for Jobs.
@@ -54,7 +53,7 @@ public class Workers extends IdResolveContext implements Managed {
private final int secondaryIdSubPlanRetention;


-public Workers(ThreadPoolDefinition queryThreadPoolDefinition, InternalMapperFactory internalMapperFactory, int entityBucketSize, int secondaryIdSubPlanRetention) {
+public ShardWorkers(ThreadPoolDefinition queryThreadPoolDefinition, InternalMapperFactory internalMapperFactory, int entityBucketSize, int secondaryIdSubPlanRetention) {
this.queryThreadPoolDefinition = queryThreadPoolDefinition;

// TODO This shouldn't be coupled to the query thread pool definition
@@ -2,8 +2,6 @@

import static org.mockito.Mockito.mock;

-import jakarta.validation.Validator;
-
import com.bakdata.conquery.io.jackson.Jackson;
import com.bakdata.conquery.io.storage.MetaStorage;
import com.bakdata.conquery.io.storage.NamespaceStorage;
@@ -15,10 +13,11 @@
import com.bakdata.conquery.models.index.IndexService;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.models.worker.DistributedNamespace;
-import com.bakdata.conquery.models.worker.Workers;
+import com.bakdata.conquery.models.worker.ShardWorkers;
import com.bakdata.conquery.util.NonPersistentStoreFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dropwizard.jersey.validation.Validators;
+import jakarta.validation.Validator;
import lombok.Getter;
import org.junit.jupiter.api.BeforeEach;

@@ -62,7 +61,7 @@ public void before() {
namespaceStorage.updateDataset(new Dataset("serialization_test"));

// Prepare shard node internal mapper
-final Workers workers = mock(Workers.class);
+final ShardWorkers workers = mock(ShardWorkers.class);
shardInternalMapper = internalMapperFactory.createWorkerPersistenceMapper(workers);

// Prepare api mapper with a Namespace injected (usually done by PathParamInjector)
@@ -34,7 +34,7 @@
import com.bakdata.conquery.models.events.stores.specific.ScaledDecimalStore;
import com.bakdata.conquery.models.exceptions.JSONException;
import com.bakdata.conquery.models.identifiable.CentralRegistry;
-import com.bakdata.conquery.models.worker.Workers;
+import com.bakdata.conquery.models.worker.ShardWorkers;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import io.dropwizard.jersey.validation.Validators;
@@ -62,7 +62,7 @@ public static void setupRegistry() {

// Prepare shard node internal mapper
InternalMapperFactory internalMapperFactory = new InternalMapperFactory(new ConqueryConfig(), Validators.newValidator());
-shardInternalMapper = internalMapperFactory.createWorkerPersistenceMapper(mock(Workers.class));
+shardInternalMapper = internalMapperFactory.createWorkerPersistenceMapper(mock(ShardWorkers.class));
}

@Test