Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/capsule object mapper creator #3528

Merged
merged 3 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,9 @@ public void startStandalone(Environment environment, Namespace namespace, Conque
sc.run(clone, environment);
}

ConqueryMDC.setLocation("ManagerNode");
log.debug("Waiting for ShardNodes to start");

// starts the Jersey Server
ConqueryMDC.setLocation("ManagerNode");
log.debug("Starting REST Server");
ConqueryMDC.setLocation(null);
super.run(environment, namespace, config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@
import jakarta.validation.Validator;

import com.bakdata.conquery.io.cps.CPSTypeIdResolver;
import com.bakdata.conquery.io.jackson.MutableInjectableValues;
import com.bakdata.conquery.io.jackson.PathParamInjector;
import com.bakdata.conquery.io.jackson.View;
import com.bakdata.conquery.io.jersey.RESTServer;
import com.bakdata.conquery.io.storage.MetaStorage;
import com.bakdata.conquery.io.storage.NamespaceStorage;
Expand All @@ -31,9 +29,7 @@
import com.bakdata.conquery.tasks.PermissionCleanupTask;
import com.bakdata.conquery.tasks.QueryCleanupTask;
import com.bakdata.conquery.tasks.ReloadMetaStorageTask;
import com.fasterxml.jackson.databind.DeserializationConfig;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationConfig;
import com.google.common.base.Throwables;
import io.dropwizard.core.setup.Environment;
import io.dropwizard.jersey.DropwizardResourceConfig;
Expand Down Expand Up @@ -84,8 +80,8 @@ public void run(Manager manager) throws InterruptedException {

this.manager = manager;

final ObjectMapper objectMapper = environment.getObjectMapper();
customizeApiObjectMapper(objectMapper, getDatasetRegistry(), getMetaStorage(), config, validator);
final ObjectMapper apiObjectMapper = environment.getObjectMapper();
getInternalMapperFactory().customizeApiObjectMapper(apiObjectMapper, getDatasetRegistry(), getMetaStorage());


// FormScanner needs to be instantiated before plugins are initialized
Expand Down Expand Up @@ -172,59 +168,9 @@ protected void configure() {
jerseyConfig.register(PathParamInjector.class);
}

/**
* Customize the mapper from the environment, that is used in the REST-API.
* In contrast to the internal object mapper this uses textual JSON representation
* instead of the binary smile format. It also does not expose internal fields through serialization.
* <p>
* Internal and external mapper have in common that they might process the same classes/objects and that
* they are configured to understand certain Conquery specific data types.
*
* @param objectMapper to be configured (should be a JSON mapper)
*/
public static void customizeApiObjectMapper(
ObjectMapper objectMapper,
DatasetRegistry<?> datasetRegistry,
MetaStorage metaStorage,
ConqueryConfig config,
Validator validator) {

// Set serialization config
SerializationConfig serializationConfig = objectMapper.getSerializationConfig();

serializationConfig = serializationConfig.withView(View.Api.class);

objectMapper.setConfig(serializationConfig);

// Set deserialization config
DeserializationConfig deserializationConfig = objectMapper.getDeserializationConfig();

deserializationConfig = deserializationConfig.withView(View.Api.class);

objectMapper.setConfig(deserializationConfig);

final MutableInjectableValues injectableValues = new MutableInjectableValues();
objectMapper.setInjectableValues(injectableValues);
injectableValues.add(Validator.class, validator);

datasetRegistry.injectInto(objectMapper);
metaStorage.injectInto(objectMapper);
config.injectInto(objectMapper);
}

/**
* Create a new internal object mapper for binary (de-)serialization that is equipped with {@link ManagerNode} related injectables.
*
* @return a preconfigured binary object mapper
* @see ManagerNode#customizeApiObjectMapper(ObjectMapper, DatasetRegistry, MetaStorage, ConqueryConfig, Validator)
*/
public ObjectMapper createInternalObjectMapper(Class<? extends View> viewClass) {
return getInternalObjectMapperCreator().createInternalObjectMapper(viewClass);
}

private void loadMetaStorage() {
log.info("Opening MetaStorage");
getMetaStorage().openStores(createInternalObjectMapper(View.Persistence.Manager.class));
getMetaStorage().openStores(getInternalMapperFactory().createManagerPersistenceMapper(getDatasetRegistry(), getMetaStorage()));
log.info("Loading MetaStorage");
getMetaStorage().loadData();
log.info("MetaStorage loaded {}", getMetaStorage());
Expand Down
46 changes: 5 additions & 41 deletions backend/src/main/java/com/bakdata/conquery/commands/ShardNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,14 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import jakarta.validation.Validator;

import com.bakdata.conquery.io.jackson.Jackson;
import com.bakdata.conquery.io.jackson.MutableInjectableValues;
import com.bakdata.conquery.io.jackson.View;
import com.bakdata.conquery.io.storage.WorkerStorage;
import com.bakdata.conquery.mode.cluster.ClusterConnectionShard;
import com.bakdata.conquery.mode.cluster.InternalMapperFactory;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.worker.Worker;
import com.bakdata.conquery.models.worker.Workers;
import com.bakdata.conquery.util.io.ConqueryMDC;
import com.fasterxml.jackson.databind.DeserializationConfig;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationConfig;
import io.dropwizard.core.ConfiguredBundle;
import io.dropwizard.core.setup.Environment;
import io.dropwizard.lifecycle.setup.LifecycleEnvironment;
Expand Down Expand Up @@ -55,19 +49,19 @@ public ShardNode(String name) {
public void run(ConqueryConfig config, Environment environment) throws Exception {
LifecycleEnvironment lifecycle = environment.lifecycle();


InternalMapperFactory internalMapperFactory = new InternalMapperFactory(config, environment.getValidator());
workers = new Workers(
config.getQueries().getExecutionPool(),
() -> createInternalObjectMapper(View.Persistence.Shard.class, config, environment.getValidator()),
() -> createInternalObjectMapper(View.InternalCommunication.class, config, environment.getValidator()),
internalMapperFactory,
config.getCluster().getEntityBucketSize(),
config.getQueries().getSecondaryIdSubPlanRetention()
);

lifecycle.manage(workers);


clusterConnection =
new ClusterConnectionShard(config, environment, workers, () -> createInternalObjectMapper(View.InternalCommunication.class, config, environment.getValidator()));
new ClusterConnectionShard(config, environment, workers, internalMapperFactory);

lifecycle.manage(clusterConnection);

Expand Down Expand Up @@ -102,37 +96,7 @@ public void run(ConqueryConfig config, Environment environment) throws Exception
log.info("All Worker loaded: {}", workers.getWorkers().size());
}

/**
* Pendant to {@link ManagerNode#createInternalObjectMapper(Class)}.
* <p>
* TODO May move to {@link ConqueryCommand}
*
* @return a preconfigured binary object mapper
*/
public static ObjectMapper createInternalObjectMapper(Class<? extends View> viewClass, ConqueryConfig config, Validator validator) {
final ObjectMapper objectMapper = config.configureObjectMapper(Jackson.copyMapperAndInjectables(Jackson.BINARY_MAPPER));

final MutableInjectableValues injectableValues = new MutableInjectableValues();
objectMapper.setInjectableValues(injectableValues);
injectableValues.add(Validator.class, validator);


// Set serialization config
SerializationConfig serializationConfig = objectMapper.getSerializationConfig();

serializationConfig = serializationConfig.withView(viewClass);

objectMapper.setConfig(serializationConfig);

// Set deserialization config
DeserializationConfig deserializationConfig = objectMapper.getDeserializationConfig();

deserializationConfig = deserializationConfig.withView(viewClass);

objectMapper.setConfig(deserializationConfig);

return objectMapper;
}

public boolean isBusy() {
return clusterConnection.isBusy() || workers.isBusy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.function.Supplier;

import com.bakdata.conquery.io.storage.MetaStorage;
import com.bakdata.conquery.mode.cluster.InternalMapperFactory;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.jobs.JobManager;
import com.bakdata.conquery.models.worker.DatasetRegistry;
Expand All @@ -29,13 +30,9 @@ public class DelegateManager<N extends Namespace> implements Manager {
StorageListener storageListener;
Supplier<Collection<ShardNodeInformation>> nodeProvider;
List<Task> adminTasks;
InternalObjectMapperCreator internalObjectMapperCreator;
InternalMapperFactory internalMapperFactory;
JobManager jobManager;

@Override
public void start() throws Exception {
}

@Override
public void stop() throws Exception {
jobManager.close();
Expand Down

This file was deleted.

3 changes: 2 additions & 1 deletion backend/src/main/java/com/bakdata/conquery/mode/Manager.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.function.Supplier;

import com.bakdata.conquery.io.storage.MetaStorage;
import com.bakdata.conquery.mode.cluster.InternalMapperFactory;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.jobs.JobManager;
import com.bakdata.conquery.models.worker.DatasetRegistry;
Expand All @@ -25,7 +26,7 @@ public interface Manager extends Managed {
StorageListener getStorageListener();
Supplier<Collection<ShardNodeInformation>> getNodeProvider();
List<Task> getAdminTasks();
InternalObjectMapperCreator getInternalObjectMapperCreator();
InternalMapperFactory getInternalMapperFactory();
JobManager getJobManager();

MetaStorage getMetaStorage();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package com.bakdata.conquery.mode;

import jakarta.validation.Validator;

import com.bakdata.conquery.io.storage.MetaStorage;
import com.bakdata.conquery.mode.cluster.InternalMapperFactory;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.index.IndexService;
import com.bakdata.conquery.models.jobs.JobManager;
Expand All @@ -23,20 +21,16 @@ static JobManager newJobManager(ConqueryConfig config) {
return new JobManager(JOB_MANAGER_NAME, config.isFailOnError());
}

static InternalObjectMapperCreator newInternalObjectMapperCreator(ConqueryConfig config, MetaStorage metaStorage, Validator validator) {
return new InternalObjectMapperCreator(config, metaStorage, validator);
}

static <N extends Namespace> DatasetRegistry<N> createDatasetRegistry(
NamespaceHandler<N> namespaceHandler,
ConqueryConfig config,
InternalObjectMapperCreator creator
InternalMapperFactory internalMapperFactory
) {
final IndexService indexService = new IndexService(config.getCsv().createCsvParserSettings(), config.getIndex().getEmptyLabel());
return new DatasetRegistry<>(
config.getCluster().getEntityBucketSize(),
config,
creator,
internalMapperFactory,
namespaceHandler,
indexService
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@

import com.bakdata.conquery.io.jackson.Injectable;
import com.bakdata.conquery.io.jackson.Jackson;
import com.bakdata.conquery.io.jackson.View;
import com.bakdata.conquery.io.storage.MetaStorage;
import com.bakdata.conquery.io.storage.NamespaceStorage;
import com.bakdata.conquery.mode.cluster.InternalMapperFactory;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId;
import com.bakdata.conquery.models.index.IndexService;
import com.bakdata.conquery.models.jobs.JobManager;
import com.bakdata.conquery.models.query.FilterSearch;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.models.worker.Namespace;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dropwizard.core.setup.Environment;
Expand All @@ -24,21 +24,21 @@
*/
public interface NamespaceHandler<N extends Namespace> {

N createNamespace(NamespaceStorage storage, MetaStorage metaStorage, IndexService indexService, Environment environment);
N createNamespace(NamespaceStorage namespaceStorage, MetaStorage metaStorage, DatasetRegistry<N> datasetRegistry, Environment environment);

void removeNamespace(DatasetId id, N namespace);

/**
* Creates the {@link NamespaceSetupData} that is shared by all {@link Namespace} types.
*/
static NamespaceSetupData createNamespaceSetup(NamespaceStorage storage, final ConqueryConfig config, final InternalObjectMapperCreator mapperCreator, IndexService indexService) {
static NamespaceSetupData createNamespaceSetup(NamespaceStorage storage, final ConqueryConfig config, final InternalMapperFactory internalMapperFactory, DatasetRegistry<?> datasetRegistry) {
List<Injectable> injectables = new ArrayList<>();
injectables.add(indexService);
injectables.add(datasetRegistry);
injectables.add(storage);

ObjectMapper persistenceMapper = mapperCreator.createInternalObjectMapper(View.Persistence.Manager.class);
ObjectMapper communicationMapper = mapperCreator.createInternalObjectMapper(View.InternalCommunication.class);
ObjectMapper preprocessMapper = mapperCreator.createInternalObjectMapper(null);
ObjectMapper persistenceMapper = internalMapperFactory.createNamespacePersistenceMapper(datasetRegistry);
ObjectMapper communicationMapper = internalMapperFactory.createManagerCommunicationMapper(datasetRegistry);
ObjectMapper preprocessMapper = internalMapperFactory.createPreprocessMapper(datasetRegistry);

injectables.forEach(i -> {
i.injectInto(persistenceMapper);
Expand All @@ -54,7 +54,7 @@ static NamespaceSetupData createNamespaceSetup(NamespaceStorage storage, final C
JobManager jobManager = new JobManager(storage.getDataset().getName(), config.isFailOnError());

FilterSearch filterSearch = new FilterSearch(config.getIndex());
return new NamespaceSetupData(injectables, indexService, communicationMapper, preprocessMapper, jobManager, filterSearch);
return new NamespaceSetupData(injectables, communicationMapper, preprocessMapper, jobManager, filterSearch);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import java.util.List;

import com.bakdata.conquery.io.jackson.Injectable;
import com.bakdata.conquery.models.index.IndexService;
import com.bakdata.conquery.models.jobs.JobManager;
import com.bakdata.conquery.models.query.FilterSearch;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -15,7 +14,6 @@
@Value
public class NamespaceSetupData {
List<Injectable> injectables;
IndexService indexService;
ObjectMapper communicationMapper;
ObjectMapper preprocessMapper;
JobManager jobManager;
Expand Down
Loading
Loading