Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/untangle dataset registry and meta storage #3502

Merged
merged 5 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import org.glassfish.jersey.internal.inject.AbstractBinder;

/**
* Central node of Conquery. Hosts the frontend, api, meta data and takes care of query distribution to
* Central node of Conquery. Hosts the frontend, api, metadata and takes care of query distribution to
* {@link ShardNode}s and respectively the {@link Worker}s hosted on them. The {@link ManagerNode} can also
* forward queries or results to statistic backends. Finally, it collects the results of queries for access over the api.
*/
Expand Down Expand Up @@ -113,7 +113,7 @@ public void run(Manager manager) throws InterruptedException {
// Create AdminServlet first to make it available to the realms
admin = new AdminServlet(this);

authController = new AuthorizationController(getStorage(), config, environment, admin);
authController = new AuthorizationController(getMetaStorage(), config, environment, admin);
environment.lifecycle().manage(authController);

// Register default components for the admin interface
Expand Down Expand Up @@ -145,14 +145,14 @@ public void run(Manager manager) throws InterruptedException {
private void registerTasks(Manager manager, Environment environment, ConqueryConfig config) {
environment.admin().addTask(formScanner);
environment.admin().addTask(
new QueryCleanupTask(getStorage(), Duration.of(
new QueryCleanupTask(getMetaStorage(), Duration.of(
config.getQueries().getOldQueriesTime().getQuantity(),
config.getQueries().getOldQueriesTime().getUnit().toChronoUnit()
)));

environment.admin().addTask(new PermissionCleanupTask(getStorage()));
environment.admin().addTask(new PermissionCleanupTask(getMetaStorage()));
manager.getAdminTasks().forEach(environment.admin()::addTask);
environment.admin().addTask(new ReloadMetaStorageTask(getStorage()));
environment.admin().addTask(new ReloadMetaStorageTask(getMetaStorage()));

final ShutdownTask shutdown = new ShutdownTask();
environment.admin().addTask(shutdown);
Expand All @@ -164,7 +164,7 @@ private void configureApiServlet(ConqueryConfig config, DropwizardResourceConfig
jerseyConfig.register(new AbstractBinder() {
@Override
protected void configure() {
bind(getStorage()).to(MetaStorage.class);
bind(getMetaStorage()).to(MetaStorage.class);
bind(getDatasetRegistry()).to(DatasetRegistry.class);
}
});
Expand Down Expand Up @@ -203,7 +203,7 @@ public void customizeApiObjectMapper(ObjectMapper objectMapper) {
injectableValues.add(Validator.class, getValidator());

getDatasetRegistry().injectInto(objectMapper);
getStorage().injectInto(objectMapper);
getMetaStorage().injectInto(objectMapper);
getConfig().injectInto(objectMapper);
}

Expand All @@ -219,10 +219,10 @@ public ObjectMapper createInternalObjectMapper(Class<? extends View> viewClass)

private void loadMetaStorage() {
log.info("Opening MetaStorage");
getStorage().openStores(getInternalObjectMapperCreator().createInternalObjectMapper(View.Persistence.Manager.class));
getMetaStorage().openStores(getInternalObjectMapperCreator().createInternalObjectMapper(View.Persistence.Manager.class));
log.info("Loading MetaStorage");
getStorage().loadData();
log.info("MetaStorage loaded {}", getStorage());
getMetaStorage().loadData();
log.info("MetaStorage loaded {}", getMetaStorage());
}

@SneakyThrows(InterruptedException.class)
Expand All @@ -236,7 +236,7 @@ public void loadNamespaces() {
final Collection<NamespaceStorage> namespaceStorages = getConfig().getStorage().discoverNamespaceStorages();
for (NamespaceStorage namespaceStorage : namespaceStorages) {
loaders.submit(() -> {
registry.createNamespace(namespaceStorage);
registry.createNamespace(namespaceStorage, getMetaStorage());
});
}

Expand All @@ -262,16 +262,16 @@ public void stop() throws Exception {
provider.close();
}
catch (Exception e) {
log.error(provider + " could not be closed", e);
log.error("{} could not be closed", provider, e);
}

}

try {
getStorage().close();
getMetaStorage().close();
}
catch (Exception e) {
log.error("{} could not be closed", getStorage(), e);
log.error("{} could not be closed", getMetaStorage(), e);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,14 @@
import java.util.InputMismatchException;
import java.util.Optional;

import com.bakdata.conquery.io.storage.MetaStorage;
import com.bakdata.conquery.models.identifiable.CentralRegistry;
import com.bakdata.conquery.models.identifiable.Identifiable;
import com.bakdata.conquery.models.identifiable.ids.Id;
import com.bakdata.conquery.models.identifiable.ids.IdUtil;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.BeanDescription;
import com.fasterxml.jackson.databind.BeanProperty;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.*;
import com.fasterxml.jackson.databind.deser.ContextualDeserializer;
import com.fasterxml.jackson.databind.deser.SettableBeanProperty;
import com.fasterxml.jackson.databind.jsontype.TypeDeserializer;
Expand All @@ -42,7 +38,7 @@ public T deserialize(JsonParser parser, DeserializationContext ctxt) throws IOEx
ID id = ctxt.readValue(parser, idClass);

try {
final CentralRegistry centralRegistry = CentralRegistry.get(ctxt);
final CentralRegistry centralRegistry = MetaStorage.get(ctxt).getCentralRegistry();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

würde es sich anbieten die CentralRegistry umzubennen, wenn sie hier nur für die MetaIds verantwortlich ist?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also => Registry

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ich würde davon absehen, da die CentralRegistry mit dem Caching verschwindet


// Not all Components have registries, we leave it up to the validator to be angry.
if (centralRegistry == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,9 @@
import com.bakdata.conquery.models.execution.ManagedExecution;
import com.bakdata.conquery.models.forms.configs.FormConfig;
import com.bakdata.conquery.models.identifiable.CentralRegistry;
import com.bakdata.conquery.models.identifiable.ids.specific.FormConfigId;
import com.bakdata.conquery.models.identifiable.ids.specific.GroupId;
import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId;
import com.bakdata.conquery.models.identifiable.ids.specific.RoleId;
import com.bakdata.conquery.models.identifiable.ids.specific.UserId;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.models.worker.Namespace;
import com.bakdata.conquery.models.identifiable.ids.specific.*;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand All @@ -34,8 +30,6 @@ public class MetaStorage extends ConqueryStorage implements Injectable {
protected final CentralRegistry centralRegistry = new CentralRegistry();
private final StoreFactory storageFactory;

@Getter
protected final DatasetRegistry<? extends Namespace> datasetRegistry;
Comment on lines -37 to -38
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@awildturtok this went away

private IdentifiableStore<ManagedExecution> executions;
private IdentifiableStore<FormConfig> formConfigs;
private IdentifiableStore<User> authUser;
Expand All @@ -47,8 +41,8 @@ public void openStores(ObjectMapper mapper) {
authRole = storageFactory.createRoleStore(centralRegistry, "meta", this, mapper);
authGroup = storageFactory.createGroupStore(centralRegistry, "meta", this, mapper);
// Executions depend on users
executions = storageFactory.createExecutionsStore(centralRegistry, datasetRegistry, "meta", mapper);
formConfigs = storageFactory.createFormConfigStore(centralRegistry, datasetRegistry, "meta", mapper);
executions = storageFactory.createExecutionsStore(centralRegistry, "meta", mapper);
formConfigs = storageFactory.createFormConfigStore(centralRegistry, "meta", mapper);

}

Expand Down Expand Up @@ -196,4 +190,8 @@ public void addFormConfig(FormConfig formConfig) {
public MutableInjectableValues inject(MutableInjectableValues values) {
return values.add(MetaStorage.class, this);
}

public static MetaStorage get(DeserializationContext ctxt) throws JsonMappingException {
return (MetaStorage) ctxt.findInjectableValue(MetaStorage.class.getName(), null, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.bakdata.conquery.models.worker.WorkerToBucketsMap;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import jakarta.validation.Validator;
import lombok.extern.slf4j.Slf4j;

@Slf4j
Expand All @@ -32,7 +31,7 @@ public class NamespaceStorage extends NamespacedStorage {

protected CachedStore<String, Integer> entity2Bucket;

public NamespaceStorage(StoreFactory storageFactory, String pathName, Validator validator) {
public NamespaceStorage(StoreFactory storageFactory, String pathName) {
super(storageFactory, pathName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.bakdata.conquery.models.identifiable.ids.specific.ImportId;
import com.bakdata.conquery.models.identifiable.ids.specific.SecondaryIdDescriptionId;
import com.bakdata.conquery.models.identifiable.ids.specific.TableId;
import com.bakdata.conquery.models.worker.SingletonNamespaceCollection;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import lombok.Getter;
Expand Down Expand Up @@ -56,7 +57,8 @@ public NamespacedStorage(StoreFactory storageFactory, String pathName) {
}

public void openStores(ObjectMapper objectMapper) {

// Before we start to parse the stores we need to replace the injected value for the IdResolveContext (from DatasetRegistry to this centralRegistry)
new SingletonNamespaceCollection(centralRegistry).injectInto(objectMapper);

dataset = storageFactory.createDatasetStore(pathName, objectMapper);
secondaryIds = storageFactory.createSecondaryIdDescriptionStore(centralRegistry, pathName, objectMapper);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public class DelegateManager<N extends Namespace> implements Manager {
ConqueryConfig config;
Environment environment;
DatasetRegistry<N> datasetRegistry;
MetaStorage storage;
ImportHandler importHandler;
StorageListener storageListener;
Supplier<Collection<ShardNodeInformation>> nodeProvider;
Expand All @@ -42,7 +43,7 @@ public void stop() throws Exception {
}

@Override
public MetaStorage getStorage() {
return datasetRegistry.getMetaStorage();
public MetaStorage getMetaStorage() {
return storage;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@
@RequiredArgsConstructor
public class InternalObjectMapperCreator {
private final ConqueryConfig config;
private final MetaStorage storage;
private final Validator validator;
private DatasetRegistry<? extends Namespace> datasetRegistry = null;
private MetaStorage storage = null;

public void init(DatasetRegistry<? extends Namespace> datasetRegistry) {
this.datasetRegistry = datasetRegistry;
this.storage = datasetRegistry.getMetaStorage();
}

public ObjectMapper createInternalObjectMapper(@Nullable Class<? extends View> viewClass) {
Expand Down
5 changes: 3 additions & 2 deletions backend/src/main/java/com/bakdata/conquery/mode/Manager.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.models.worker.Namespace;
import com.bakdata.conquery.models.worker.ShardNodeInformation;
import io.dropwizard.core.setup.Environment;
import io.dropwizard.lifecycle.Managed;
import io.dropwizard.servlets.tasks.Task;
import io.dropwizard.core.setup.Environment;

/**
* A manager provides the implementations that differ by running mode.
Expand All @@ -27,5 +27,6 @@ public interface Manager extends Managed {
List<Task> getAdminTasks();
InternalObjectMapperCreator getInternalObjectMapperCreator();
JobManager getJobManager();
MetaStorage getStorage();

MetaStorage getMetaStorage();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ static JobManager newJobManager(ConqueryConfig config) {
return new JobManager(JOB_MANAGER_NAME, config.isFailOnError());
}

static InternalObjectMapperCreator newInternalObjectMapperCreator(ConqueryConfig config, Validator validator) {
return new InternalObjectMapperCreator(config, validator);
static InternalObjectMapperCreator newInternalObjectMapperCreator(ConqueryConfig config, MetaStorage metaStorage, Validator validator) {
return new InternalObjectMapperCreator(config, metaStorage, validator);
}

static <N extends Namespace> DatasetRegistry<N> createDatasetRegistry(
Expand All @@ -33,16 +33,13 @@ static <N extends Namespace> DatasetRegistry<N> createDatasetRegistry(
InternalObjectMapperCreator creator
) {
final IndexService indexService = new IndexService(config.getCsv().createCsvParserSettings(), config.getIndex().getEmptyLabel());
DatasetRegistry<N> datasetRegistry = new DatasetRegistry<>(
return new DatasetRegistry<>(
config.getCluster().getEntityBucketSize(),
config,
creator,
namespaceHandler,
indexService
);
MetaStorage storage = new MetaStorage(config.getStorage(), datasetRegistry);
datasetRegistry.setMetaStorage(storage);
return datasetRegistry;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.List;

import com.bakdata.conquery.io.jackson.Injectable;
import com.bakdata.conquery.io.jackson.Jackson;
import com.bakdata.conquery.io.jackson.View;
import com.bakdata.conquery.io.storage.MetaStorage;
import com.bakdata.conquery.io.storage.NamespaceStorage;
Expand Down Expand Up @@ -32,6 +33,7 @@ public interface NamespaceHandler<N extends Namespace> {
static NamespaceSetupData createNamespaceSetup(NamespaceStorage storage, final ConqueryConfig config, final InternalObjectMapperCreator mapperCreator, IndexService indexService) {
List<Injectable> injectables = new ArrayList<>();
injectables.add(indexService);

ObjectMapper persistenceMapper = mapperCreator.createInternalObjectMapper(View.Persistence.Manager.class);
ObjectMapper communicationMapper = mapperCreator.createInternalObjectMapper(View.InternalCommunication.class);
ObjectMapper preprocessMapper = mapperCreator.createInternalObjectMapper(null);
Expand All @@ -40,8 +42,9 @@ static NamespaceSetupData createNamespaceSetup(NamespaceStorage storage, final C
injectables.forEach(i -> i.injectInto(communicationMapper));
injectables.forEach(i -> i.injectInto(preprocessMapper));

// Open and load the stores
storage.openStores(persistenceMapper);

// Each store needs its own mapper because each injects its own registry
storage.openStores(Jackson.copyMapperAndInjectables(persistenceMapper));
storage.loadData();

JobManager jobManager = new JobManager(storage.getDataset().getName(), config.isFailOnError());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.List;
import java.util.function.Supplier;

import com.bakdata.conquery.io.storage.MetaStorage;
import com.bakdata.conquery.mode.*;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.jobs.JobManager;
Expand All @@ -19,7 +20,8 @@ public class ClusterManagerProvider implements ManagerProvider {

public ClusterManager provideManager(ConqueryConfig config, Environment environment) {
final JobManager jobManager = ManagerProvider.newJobManager(config);
final InternalObjectMapperCreator creator = ManagerProvider.newInternalObjectMapperCreator(config, environment.getValidator());
final MetaStorage storage = new MetaStorage(config.getStorage());
final InternalObjectMapperCreator creator = ManagerProvider.newInternalObjectMapperCreator(config, storage, environment.getValidator());
final ClusterState clusterState = new ClusterState();
final NamespaceHandler<DistributedNamespace> namespaceHandler = new ClusterNamespaceHandler(clusterState, config, creator);
final DatasetRegistry<DistributedNamespace> datasetRegistry = ManagerProvider.createDatasetRegistry(namespaceHandler, config, creator);
Expand All @@ -35,7 +37,7 @@ public ClusterManager provideManager(ConqueryConfig config, Environment environm

final DelegateManager<DistributedNamespace>
delegate =
new DelegateManager<>(config, environment, datasetRegistry, importHandler, extension, nodeProvider, adminTasks, creator, jobManager);
new DelegateManager<>(config, environment, datasetRegistry, storage, importHandler, extension, nodeProvider, adminTasks, creator, jobManager);

environment.healthChecks().register("cluster", new ClusterHealthCheck(clusterState));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.List;
import java.util.function.Supplier;

import com.bakdata.conquery.io.storage.MetaStorage;
import com.bakdata.conquery.mode.DelegateManager;
import com.bakdata.conquery.mode.InternalObjectMapperCreator;
import com.bakdata.conquery.mode.ManagerProvider;
Expand Down Expand Up @@ -32,15 +33,18 @@ public LocalManagerProvider(SqlDialectFactory dialectFactory) {

public DelegateManager<LocalNamespace> provideManager(ConqueryConfig config, Environment environment) {

InternalObjectMapperCreator creator = ManagerProvider.newInternalObjectMapperCreator(config, environment.getValidator());
NamespaceHandler<LocalNamespace> namespaceHandler = new LocalNamespaceHandler(config, creator, dialectFactory);
DatasetRegistry<LocalNamespace> datasetRegistry = ManagerProvider.createDatasetRegistry(namespaceHandler, config, creator);
final MetaStorage storage = new MetaStorage(config.getStorage());
final InternalObjectMapperCreator creator = ManagerProvider.newInternalObjectMapperCreator(config, storage, environment.getValidator());
final NamespaceHandler<LocalNamespace> namespaceHandler = new LocalNamespaceHandler(config, creator, dialectFactory);
final DatasetRegistry<LocalNamespace> datasetRegistry = ManagerProvider.createDatasetRegistry(namespaceHandler, config, creator);

creator.init(datasetRegistry);

return new DelegateManager<>(
config,
environment,
datasetRegistry,
storage,
new FailingImportHandler(),
new LocalStorageListener(),
EMPTY_NODE_PROVIDER,
Expand Down
Loading
Loading