Split imports on preprocess #3389

Merged
merged 47 commits into from Aug 5, 2024
Changes from 5 commits
Commits
47 commits
faaa7f8
initial impl to split cqpps on write
awildturtok Apr 10, 2024
ff1499c
Update backend/src/main/java/com/bakdata/conquery/models/jobs/ImportJ…
awildturtok Apr 10, 2024
f38d321
adds bucket as preprocess param
awildturtok Apr 11, 2024
0f2a151
minor fix to CachedStore
awildturtok Apr 23, 2024
4917e0a
use new store in RestartTest (old environments were closed but worked…
awildturtok Apr 23, 2024
ec06b0e
fix bollean parameters
awildturtok May 2, 2024
09618e5
count buckets instead of exception-oriented flow
awildturtok Jun 3, 2024
729f41e
review feedback
awildturtok Jun 5, 2024
8b77f38
rearrange ImportJob methods into ClusterImportHandler.
awildturtok Jun 5, 2024
a1c22d4
cleanup
awildturtok Jun 12, 2024
bc747b4
Merge branch 'develop' into feature/split-cqpps
awildturtok Jun 12, 2024
bd3a5c7
map validation exceptions to http error
awildturtok Jun 12, 2024
9601c87
fix dropped param
awildturtok Jun 12, 2024
b5c49d4
Merge branch 'develop' into feature/split-cqpps
awildturtok Jun 13, 2024
83bd498
delay CalulateCBlocksJob to speedup import AND batching of CBlocksJob
awildturtok Jun 17, 2024
b897477
cleanup of toString and logging in ImportJob.java
awildturtok Jun 17, 2024
428646c
Merge remote-tracking branch 'origin/feature/split-cqpps' into featur…
awildturtok Jun 17, 2024
bff7c07
adds missing CPSType anno
awildturtok Jun 17, 2024
4acdf86
fix CPSBase of StartCalculateCblocks
awildturtok Jun 24, 2024
02ba25a
TODOs for entity2Bucket
awildturtok Jun 24, 2024
2afbc9d
inline sending of buckets only do postprocessing in slowqueue
awildturtok Jun 24, 2024
92c3561
cleanup
awildturtok Jun 24, 2024
bdb2048
fixes ENTITY_TO_BUCKET type signature
awildturtok Jun 24, 2024
88090ba
fix NonPersistentStoreFactory types
awildturtok Jun 24, 2024
0fddfa3
experiment: drop porgressReporter. We suspect directExecutor might in…
awildturtok Jun 25, 2024
5f47569
log pool information while waiting
awildturtok Jun 25, 2024
e3046e3
reduce logging
awildturtok Jun 25, 2024
32bd184
drop way too coarse mutex
awildturtok Jun 25, 2024
6937f16
fixes logging
awildturtok Jun 25, 2024
b33e282
restructure entities registration
awildturtok Jun 25, 2024
d7a4dd7
simplify CalculateCBlocksJob
awildturtok Jun 25, 2024
858bb9b
fix ProgrammaticTests
awildturtok Jun 25, 2024
753aa1e
adds new adminEndpointInfo
awildturtok Jun 25, 2024
735aec8
adds missing fix CPSBase of StartCalculateCblocks phase
awildturtok Jun 25, 2024
5de8f20
fixes a bug with entities not getting registered
awildturtok Jun 26, 2024
1e3a077
logging cleanup
awildturtok Jun 26, 2024
020e08e
reverts some schema changes to make migration to old storage possible
awildturtok Jul 3, 2024
be059f1
Merge remote-tracking branch 'origin/develop' into feature/split-cqpps
awildturtok Jul 4, 2024
ee1a236
change log-level of WorkerStorage messages for Bucket/CBlock management
awildturtok Jul 4, 2024
23f745b
makes ProgressReporterImpl.java multithreaded
awildturtok Jul 8, 2024
f38c0cc
use ConcurrentHashMap in ConceptTreeCache as it might cause race-cond…
awildturtok Jul 9, 2024
488130d
store Cache using Optional to circumvent null limitation of Concurren…
awildturtok Jul 9, 2024
7ebff2a
minor logging fixes for CalculateCBlocksJob.java
awildturtok Jul 9, 2024
5b959b3
revert deferral of calculating CBlocks. Instead we use an adapted imp…
awildturtok Jul 18, 2024
67033ed
Merge branch 'develop' into feature/split-cqpps
awildturtok Jul 18, 2024
f439350
fix missed calculateCBlocks reference
awildturtok Jul 18, 2024
7cce1c0
Merge remote-tracking branch 'origin/develop' into feature/split-cqpps
awildturtok Aug 1, 2024
@@ -44,6 +44,7 @@
import net.sourceforge.argparse4j.inf.ArgumentGroup;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import org.jetbrains.annotations.NotNull;

@Slf4j
@FieldNameConstants
@@ -52,7 +53,7 @@ public class PreprocessorCommand extends ConqueryCommand {
private final List<String> failed = Collections.synchronizedList(new ArrayList<>());
private final List<String> success = Collections.synchronizedList(new ArrayList<>());
private ExecutorService pool;
private boolean isFailFast = false;
private boolean isFailFast;
private boolean isStrict = true;

public PreprocessorCommand() {
@@ -71,14 +72,14 @@ public static boolean requiresProcessing(PreprocessingJob preprocessingJob) {

log.info("EXISTS ALREADY");

int currentHash = preprocessingJob.getDescriptor()
.calculateValidityHash(preprocessingJob.getCsvDirectory(), preprocessingJob.getTag());
final int currentHash = preprocessingJob.getDescriptor()
.calculateValidityHash(preprocessingJob.getCsvDirectory(), preprocessingJob.getTag());


final ObjectMapper om = Jackson.BINARY_MAPPER.copy();
try (final PreprocessedReader parser = new PreprocessedReader(new GZIPInputStream(new FileInputStream(preprocessingJob.getPreprocessedFile())), om)) {

PreprocessedHeader header = parser.readHeader();
final PreprocessedHeader header = parser.readHeader();

if (header.getValidityHash() == currentHash) {
log.info("\tHASH STILL VALID");
@@ -140,6 +141,11 @@ public void configure(Subparser subparser) {
.setDefault(true)
.help("Escalate missing files to errors.");

group.addArgument("--buckets")
.type(Integer.class)
.setDefault(100)
.help("Number of buckets to use for id-hashing. This value is required to be a constant per-dataset.");

}

@Override
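
The new --buckets parameter moves entity-to-bucket assignment into preprocessing: an entity id is hashed into one of a fixed number of buckets, so the same id always lands in the same bucket as long as the bucket count never changes for the dataset. The concrete hash Conquery uses is not part of this diff; the following is only a minimal sketch of the idea, with an illustrative hash function and class name chosen here.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public final class BucketHashingSketch {

	// Illustrative only: maps an entity id to a stable bucket in [0, buckets).
	// Math.floorMod keeps the result non-negative even for negative hash codes.
	static int bucketOf(String entityId, int buckets) {
		final int hash = Arrays.hashCode(entityId.getBytes(StandardCharsets.UTF_8));
		return Math.floorMod(hash, buckets);
	}

	public static void main(String[] args) {
		// With the default of 100 buckets, "person-42" always lands in the same bucket.
		// Re-preprocessing the same dataset with a different bucket count would scatter
		// entities differently, which is why the value must stay constant per dataset.
		System.out.println(bucketOf("person-42", 100));
	}
}
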
@@ -150,41 +156,48 @@ protected void run(Environment environment, Namespace namespace, ConqueryConfig

// Tag if present is appended to input-file csvs, output-file cqpp and used as id of cqpps

isFailFast = Optional.ofNullable(namespace.getBoolean("fast-fail")).orElse(false);
isStrict = Optional.ofNullable(namespace.getBoolean("strict")).orElse(true);
isFailFast = namespace.getBoolean("fast-fail");
isStrict = namespace.getBoolean("strict");

final List<String> tags = namespace.<String>getList("tag");
final List<String> tags = namespace.getList("tag");

final File inDir = namespace.get("in");
final File outDir = namespace.get("out");
final List<File> descriptionFiles = namespace.<File>getList("desc");
final List<File> descriptionFilesRoot = namespace.getList("desc");
final int buckets = namespace.getInt("buckets");


log.info("Preprocessing from command line config.");

final Collection<PreprocessingJob> jobs = new ArrayList<>();
final Collection<PreprocessingJob> jobs = collectJobs(descriptionFilesRoot, tags, inDir, outDir, environment);

if (tags == null || tags.isEmpty()) {
for (File desc : descriptionFiles) {
final List<PreprocessingJob> descriptions =
findPreprocessingDescriptions(desc, inDir, outDir, Optional.empty(), environment.getValidator());
jobs.addAll(descriptions);
}
final List<PreprocessingJob> broken = validateJobs(jobs, environment);

jobs.removeIf(Predicate.not(PreprocessorCommand::requiresProcessing));

preprocessJobs(jobs, buckets, config);


log.info("Successfully Preprocess {} Jobs:", success.size());

Collaborator suggested change:
log.info("Successfully Preprocess {} Jobs:", success.size());
log.info("Successfully preprocessed {} jobs:", success.size());

success.forEach(desc -> log.info("\tSucceeded Preprocessing for {}", desc));

if (!broken.isEmpty()) {
log.warn("Did not find {} Files", broken.size());
broken.forEach(desc -> log.warn("\tDid not find file for {}", desc));
}
else {
for (String tag : tags) {
for (File desc : descriptionFiles) {
final List<PreprocessingJob> jobDescriptions =
findPreprocessingDescriptions(desc, inDir, outDir, Optional.of(tag), environment.getValidator());

jobs.addAll(jobDescriptions);
}
}
if (isFailed()) {
log.error("Failed {} Preprocessing Jobs:", failed.size());
failed.forEach(desc -> log.error("\tFailed Preprocessing for {}", desc));
doFail();
}
}

List<PreprocessingJob> broken = new ArrayList<>();
@NotNull
private List<PreprocessingJob> validateJobs(Collection<PreprocessingJob> jobs, Environment environment) {
final List<PreprocessingJob> broken = new ArrayList<>();

for (Iterator<PreprocessingJob> iterator = jobs.iterator(); iterator.hasNext(); ) {
for (final Iterator<PreprocessingJob> iterator = jobs.iterator(); iterator.hasNext(); ) {
final PreprocessingJob job = iterator.next();

try {
@@ -213,22 +226,48 @@ protected void run(Environment environment, Namespace namespace, ConqueryConfig
log.error("FAILED Preprocessing, files are missing or invalid.");
doFail();
}
return broken;
}

jobs.removeIf(Predicate.not(PreprocessorCommand::requiresProcessing));
@NotNull
private Collection<PreprocessingJob> collectJobs(List<File> descriptionFiles, List<String> tags, File inDir, File outDir, Environment environment)
throws IOException {
final Collection<PreprocessingJob> jobs = new ArrayList<>();

if (tags == null || tags.isEmpty()) {
for (File desc : descriptionFiles) {
final List<PreprocessingJob> descriptions =
findPreprocessingDescriptions(desc, inDir, outDir, Optional.empty(), environment.getValidator());
jobs.addAll(descriptions);
}
}
else {
for (String tag : tags) {
for (File desc : descriptionFiles) {
final List<PreprocessingJob> jobDescriptions =
findPreprocessingDescriptions(desc, inDir, outDir, Optional.of(tag), environment.getValidator());

jobs.addAll(jobDescriptions);
}
}
}
return jobs;
}

private void preprocessJobs(Collection<PreprocessingJob> jobs, int buckets, ConqueryConfig config) throws InterruptedException {
final long totalSize = jobs.stream()
.mapToLong(PreprocessingJob::estimateTotalCsvSizeBytes)
.sum();

log.info("Required to preprocess {} in total", BinaryByteUnit.format(totalSize));

ProgressBar totalProgress = new ProgressBar(totalSize, System.out);
final ProgressBar totalProgress = new ProgressBar(totalSize, System.out);

for (PreprocessingJob job : jobs) {
pool.submit(() -> {
ConqueryMDC.setLocation(job.toString());
try {
Preprocessor.preprocess(job, totalProgress, config);
Preprocessor.preprocess(job, totalProgress, config, buckets);
success.add(job.toString());
}
catch (FileNotFoundException e) {
@@ -246,23 +285,6 @@ protected void run(Environment environment, Namespace namespace, ConqueryConfig
pool.awaitTermination(24, TimeUnit.HOURS);

ConqueryMDC.clearLocation();


if (!success.isEmpty()) {
log.info("Successfully Preprocess {} Jobs:", success.size());
success.forEach(desc -> log.info("\tSucceeded Preprocessing for {}", desc));
}

if (!broken.isEmpty()) {
log.warn("Did not find {} Files", broken.size());
broken.forEach(desc -> log.warn("\tDid not find file for {}", desc));
}

if (isFailed()) {
log.error("Failed {} Preprocessing Jobs:", failed.size());
failed.forEach(desc -> log.error("\tFailed Preprocessing for {}", desc));
doFail();
}
}

private void addMissing(PreprocessingJob job) {
@@ -281,7 +303,7 @@ private void addFailed(PreprocessingJob job) {

public List<PreprocessingJob> findPreprocessingDescriptions(File descriptionFiles, File inDir, File outputDir, Optional<String> tag, Validator validator)
throws IOException {
List<PreprocessingJob> out = new ArrayList<>();
final List<PreprocessingJob> out = new ArrayList<>();

final File[] files = descriptionFiles.isFile()
? new File[]{descriptionFiles}
@@ -302,8 +324,7 @@ private boolean isFailed() {
return !failed.isEmpty();
}

private Optional<PreprocessingJob> tryExtractDescriptor(Validator validator, Optional<String> tag, File descriptionFile, File outputDir, File csvDir)
throws IOException {
private Optional<PreprocessingJob> tryExtractDescriptor(Validator validator, Optional<String> tag, File descriptionFile, File outputDir, File csvDir) {
try {
final TableImportDescriptor
descriptor =
@@ -4,8 +4,6 @@
import java.util.Objects;
import java.util.OptionalInt;

import jakarta.validation.Validator;

import com.bakdata.conquery.io.storage.xodus.stores.CachedStore;
import com.bakdata.conquery.io.storage.xodus.stores.SingletonStore;
import com.bakdata.conquery.models.config.StoreFactory;
@@ -19,6 +17,7 @@
import com.bakdata.conquery.models.worker.WorkerToBucketsMap;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import jakarta.validation.Validator;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@@ -121,12 +120,8 @@ public OptionalInt getEntityBucket(String entity) {
return OptionalInt.of(bucket);
}

public int assignEntityBucket(String entity, int bucketSize) {
final int bucket = (int) Math.ceil((1d + getNumberOfEntities()) / (double) bucketSize);

entity2Bucket.add(entity, bucket);

return bucket;
public void assignEntityBucket(String entity, int bucket) {

Collaborator suggested change:
public void assignEntityBucket(String entity, int bucket) {
public void assignEntityToBucket(String entity, int bucket) {

entity2Bucket.update(entity, bucket);
}
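
For context, the removed method body above assigned buckets by fill level instead of taking a precomputed bucket: each bucket was filled with bucketSize entities before the next one was opened, whereas the bucket passed in now is presumably computed earlier, during preprocessing. A small standalone rendering of the removed formula, with class and method names chosen here for illustration:

public final class OldBucketAssignmentSketch {

	// Mirrors the removed formula: ceil((1 + numberOfEntities) / bucketSize).
	static int assign(int numberOfEntities, int bucketSize) {
		return (int) Math.ceil((1d + numberOfEntities) / (double) bucketSize);
	}

	public static void main(String[] args) {
		System.out.println(assign(0, 100));   // 1 -> the first entity opens bucket 1
		System.out.println(assign(99, 100));  // 1 -> bucket 1 holds up to 100 entities
		System.out.println(assign(100, 100)); // 2 -> the 101st entity opens bucket 2
	}
}
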


@@ -33,8 +33,7 @@ public void add(KEY key, VALUE value) {

@Override
public VALUE get(KEY key) {
// TODO: 08.01.2020 fk: This assumes that all values have been read at some point!
return cache.get(key);
return cache.computeIfAbsent(key, store::get);
}

@Override
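
The get change above turns the cache into a read-through cache: instead of assuming every value was loaded up front, a miss now loads the value from the backing store via computeIfAbsent. Below is a rough, self-contained sketch of that pattern, not the real CachedStore types; the Optional wrapper reflects the later commit that works around ConcurrentHashMap's inability to store null values.

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class ReadThroughCacheSketch<K, V> {

	private final Map<K, Optional<V>> cache = new ConcurrentHashMap<>();
	private final Function<K, V> backingStore;

	ReadThroughCacheSketch(Function<K, V> backingStore) {
		this.backingStore = backingStore;
	}

	// On a miss, load from the backing store exactly once and cache the result.
	V get(K key) {
		return cache.computeIfAbsent(key, k -> Optional.ofNullable(backingStore.apply(k)))
					.orElse(null);
	}
}
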
@@ -31,17 +31,15 @@ class ClusterImportHandler implements ImportHandler {
@SneakyThrows
@Override
public void updateImport(Namespace namespace, InputStream inputStream) {
ImportJob job = ImportJob.createOrUpdate(
final Table table = ImportJob.createOrUpdate(

Collaborator suggested change:
final Table table = ImportJob.createOrUpdate(
final Table table = ImportJob.createAndQueue(

Collaborator: I liked it better before. It was more split-phase: first creating the job, then submitting it.

As it is now, it looks odd at first that the function returns a Table.

Collaborator (author): Yes, it was better before, but it held everything in RAM at once, which is exactly what I want to avoid with this PR. The name really is no longer fitting, though.

datasetRegistry.get(namespace.getDataset().getId()),
inputStream,
config.getCluster().getEntityBucketSize(),
config,
true
);

namespace.getJobManager().addSlowJob(job);

clearDependentConcepts(namespace.getStorage().getAllConcepts(), job.getTable());
clearDependentConcepts(namespace.getStorage().getAllConcepts(), table);
}

private void clearDependentConcepts(Collection<Concept<?>> allConcepts, Table table) {
@@ -59,22 +57,21 @@ private void clearDependentConcepts(Collection<Concept<?>> allConcepts, Table ta
@SneakyThrows
@Override
public void addImport(Namespace namespace, InputStream inputStream) {
ImportJob job = ImportJob.createOrUpdate(
final Table table = ImportJob.createOrUpdate(
datasetRegistry.get(namespace.getDataset().getId()),
inputStream,
config.getCluster().getEntityBucketSize(),
config,
false
);
namespace.getJobManager().addSlowJob(job);

clearDependentConcepts(namespace.getStorage().getAllConcepts(), job.getTable());
clearDependentConcepts(namespace.getStorage().getAllConcepts(), table);
}

@Override
public void deleteImport(Import imp) {

DatasetId id = imp.getTable().getDataset().getId();
final DatasetId id = imp.getTable().getDataset().getId();
final DistributedNamespace namespace = datasetRegistry.get(id);

clearDependentConcepts(namespace.getStorage().getAllConcepts(), imp.getTable());
@@ -50,16 +50,15 @@
@FieldNameConstants
@Getter
@Setter
@ToString(of = {"numberOfEvents", "stores"}, callSuper = true)
@ToString(onlyExplicitlyIncluded = true, callSuper = true)
@AllArgsConstructor
@RequiredArgsConstructor(onConstructor_ = {@JsonCreator}, access = AccessLevel.PROTECTED)
public class Bucket extends IdentifiableImpl<BucketId> implements NamespacedIdentifiable<BucketId> {

@Min(0)
private final int bucket;

@Min(0)
private final int numberOfEvents;
@ToString.Include
@JsonManagedReference
@Setter(AccessLevel.PROTECTED)
private ColumnStore[] stores;
@@ -78,6 +77,12 @@ public class Bucket extends IdentifiableImpl<BucketId> implements NamespacedIden
private final Import imp;


@JsonIgnore
@ToString.Include
public int getNumberOfEvents(){
return ends.values().intStream().max().orElse(0);
}

@JsonIgnore
@ValidationMethod(message = "Number of events does not match the length of some stores.")
public boolean isNumberOfEventsEqualsNumberOfStores() {
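
numberOfEvents is no longer a stored field; it is derived as the largest per-entity end offset in the bucket. A tiny illustration, assuming the fastutil version in use provides IntCollection.intStream() (the diff above already relies on it); the class name is chosen here for the example.

import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;

public final class EndsSketch {

	public static void main(String[] args) {
		// Per-entity end offsets: entity A's events end at 10, entity B's at 25,
		// so the bucket spans 25 events in total.
		final Object2IntMap<String> ends = new Object2IntOpenHashMap<>();
		ends.put("A", 10);
		ends.put("B", 25);

		System.out.println(ends.values().intStream().max().orElse(0)); // prints 25
	}
}
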
@@ -20,7 +20,7 @@ public class EmptyBucket extends Bucket {
private static final EmptyBucket Instance = new EmptyBucket();

public EmptyBucket() {
super(0, 0, Object2IntMaps.emptyMap(), Object2IntMaps.emptyMap(), null);
super(0, Object2IntMaps.emptyMap(), Object2IntMaps.emptyMap(), null);
this.setStores(new ColumnStore[0]);
}


This file was deleted.
