Redesigned preprocessing #215

Merged
merged 80 commits into from Dec 18, 2023
Commits
57e11ee
first test
Taepper Aug 29, 2023
b09a631
tmp
Taepper Sep 4, 2023
8c354f6
tmp2
Taepper Sep 5, 2023
9e20e74
wip
Taepper Sep 8, 2023
56da99c
WIP
Taepper Sep 11, 2023
23e676b
WIP
Taepper Sep 12, 2023
9fbe59c
WIP
Taepper Sep 14, 2023
7aa27da
draft for ndjson preprocessing, that should also be ready for further…
Taepper Sep 18, 2023
6fb0f87
fix
Taepper Sep 18, 2023
04625d6
fix concurrency issue
Taepper Sep 18, 2023
b62675c
revert extensive logging
Taepper Sep 18, 2023
0b775b8
integration fixes
Taepper Sep 18, 2023
1e58a52
fix memory leaks
Taepper Sep 19, 2023
9d00745
refactoring
Taepper Sep 19, 2023
1f49096
supply primary_key
Taepper Sep 19, 2023
43960cf
polishing
Taepper Sep 19, 2023
3472280
filename robustness
Taepper Sep 20, 2023
9c4688f
Reworking preprocessing without csv reader
Taepper Sep 22, 2023
e9899f5
wip
Taepper Oct 2, 2023
c4ea65b
WIP
Taepper Oct 18, 2023
a2cd1ec
Running end to end
Taepper Oct 23, 2023
86aca65
Running end to end and few tests
Taepper Oct 23, 2023
9ee8e28
Missing data types
Taepper Oct 23, 2023
7def798
Apply partitioning to sequence tables
Taepper Oct 23, 2023
3d43f1d
Update Dockerfile
Taepper Oct 23, 2023
93dee6c
Alpine version less specific
Taepper Oct 23, 2023
4e244e7
Remove pango partitioning logic
Taepper Oct 23, 2023
707832c
Adhoc fix of zstd table reader test
Taepper Oct 23, 2023
fab6b7f
update test to reflect new preprocessing_config
Taepper Oct 23, 2023
a7193c3
correctly deal with all Null values when building from duckdb
Taepper Oct 23, 2023
b130082
disable database test because of missing backwards compatibility
Taepper Oct 23, 2023
90169fc
Disable metadata validator until backwards compatibility reestablished
Taepper Oct 23, 2023
2c2354e
fix: specifying apk versions
fengelniederhammer Oct 23, 2023
d8f9614
wip
Taepper Oct 23, 2023
bce7fe2
fix includes
Taepper Oct 23, 2023
279bcba
various logging and error handling
Taepper Oct 23, 2023
9864d9f
various error handling and do not die when duckdb inferred SQL values…
Taepper Oct 23, 2023
43d33a3
More error handling and logging
Taepper Oct 23, 2023
36617bb
support insertions
Taepper Oct 23, 2023
66f2a65
statically linked duckdb
Taepper Oct 23, 2023
e43faca
add ordering of tables by parameter predicate
Taepper Oct 24, 2023
dedaccb
fix bug of not resetting row when fetching new chunk
Taepper Oct 24, 2023
b244d8a
fix duckdb sort order, sort sequences and make one test deterministic
Taepper Oct 24, 2023
f93b1e3
reintroduce unit tests, make 2 more tests deterministic and a bugfix …
Taepper Oct 24, 2023
13c8ab6
update endToEnd info test numbers
Taepper Oct 24, 2023
44fc0ea
improved error messages
Taepper Oct 24, 2023
ebb67f3
catch all block for logging around preprocessing
Taepper Oct 24, 2023
3bcad8b
introduce limit of 10000 for FastaAligned action
Taepper Oct 24, 2023
724dea0
Display error when loading data
Taepper Oct 24, 2023
ea2bd09
Additional check for float column in FloatBetween
Taepper Oct 24, 2023
7cb7738
Save preprocessing duckdb in output directory
Taepper Oct 24, 2023
d83b03f
More trace logging in detailed db info
Taepper Oct 24, 2023
3d3944c
exit > 0 when an error happens in preprocessing
fengelniederhammer Nov 3, 2023
6b7c9e9
refactor: preprocessing into own directory while encapsulating logic …
Taepper Nov 20, 2023
4782ca7
CI: update versions
Taepper Nov 20, 2023
7772ae3
feat: add more tests, make less flaky and viable with large dataset
Taepper Nov 24, 2023
5e3f7bd
Alphabetical dependency order
Taepper Dec 4, 2023
b256bcf
cleaning up unused files and some code edits
Taepper Dec 4, 2023
fc9a063
Better error return codes
Taepper Dec 6, 2023
60c80c2
Remove refactored code
Taepper Dec 6, 2023
67c9ac5
Remove catch blocks with identical behavior
Taepper Dec 6, 2023
a33ac0d
Remove using directives
Taepper Dec 6, 2023
f72dfd5
Code refactoring
Taepper Dec 6, 2023
fa99325
Remove unused functions
Taepper Dec 6, 2023
a677a29
Split up buildPartitioningTable function
Taepper Dec 6, 2023
cbfbc2c
More concise typing
Taepper Dec 6, 2023
a1fbba8
Remove outdated TODO item
Taepper Dec 6, 2023
1ef4a76
Fail earlier with malformed database config
Taepper Dec 6, 2023
4926906
Refactor preprocessing
Taepper Dec 6, 2023
2867c4f
Code edits
Taepper Dec 11, 2023
61bfa07
Remove obsolete partitioning, preprocessing_config and MetadataReader…
Taepper Dec 12, 2023
5ad66ec
More centralized db logging and error checking, clearer control-flow …
Taepper Dec 13, 2023
5393742
Add ndjson dataset with identical data for endToEndTests
Taepper Dec 13, 2023
274e14d
Split up database validation step after build
Taepper Dec 13, 2023
15b8f17
Extra imports
Taepper Dec 13, 2023
2fea9b2
MetadataInfo proper constructor
Taepper Dec 15, 2023
7f57c4c
Code edits
Taepper Dec 15, 2023
37a1294
Minor polishing edits
Taepper Dec 15, 2023
fabf265
ci: also execute e2e tests with NDJSON as preprocessing input
fengelniederhammer Dec 18, 2023
619035b
Last edits
Taepper Dec 18, 2023
13 changes: 12 additions & 1 deletion include/silo/preprocessing/preprocessing_database.h
@@ -14,6 +11,11 @@ namespace preprocessing {
class Partitions;

class PreprocessingDatabase {
public:
static constexpr std::string_view COMPRESS_NUC = "compressNuc";
static constexpr std::string_view COMPRESS_AA = "compressAA";

private:
duckdb::DuckDB duck_db;
duckdb::Connection connection;

@@ -24,7 +29,7 @@ class PreprocessingDatabase {

Partitions getPartitionDescriptor();

void registerSequences(const silo::ReferenceGenomes& reference_genomes);
static void registerSequences(const silo::ReferenceGenomes& reference_genomes);

std::unique_ptr<duckdb::MaterializedQueryResult> query(std::string sql_query);

@@ -35,5 +40,11 @@
);
};

std::vector<std::string> extractStringListValue(
duckdb::MaterializedQueryResult& result,
size_t row,
size_t column
);

} // namespace preprocessing
} // namespace silo
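The new `COMPRESS_NUC`/`COMPRESS_AA` constants are `constexpr std::string_view` class members, so call sites that need an owning string (such as the `CreateVectorizedFunction` registration in `preprocessing_database.cpp`) convert explicitly with `std::string(...)`. A small sketch of that pattern, with a hypothetical `Registry` class standing in for `PreprocessingDatabase`:

```cpp
#include <string>
#include <string_view>

// A class-scoped constexpr string_view constant: zero-cost to reference,
// but must be copied into a std::string for APIs that take owning strings.
class Registry {
  public:
   static constexpr std::string_view COMPRESS_NUC = "compressNuc";
};

std::string functionName() {
   // Explicit conversion, as done at the CreateVectorizedFunction call site.
   return std::string(Registry::COMPRESS_NUC);
}
```

Centralizing the name in one constant removes the duplicated `"compressNuc"` literals that previously had to stay in sync across files.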
32 changes: 8 additions & 24 deletions src/silo/preprocessing/metadata_info.cpp
@@ -6,29 +6,13 @@
#include <duckdb.hpp>

#include "silo/config/database_config.h"
#include "silo/preprocessing/preprocessing_database.h"
#include "silo/preprocessing/preprocessing_exception.h"

namespace {

std::vector<std::string> extractStringListValue(
duckdb::MaterializedQueryResult& result,
size_t row,
size_t column
) {
std::vector<std::string> return_value;
const duckdb::Value tmp_value = result.GetValue(column, row);
std::vector<duckdb::Value> child_values = duckdb::ListValue::GetChildren(tmp_value);
std::transform(
child_values.begin(),
child_values.end(),
std::back_inserter(return_value),
[](const duckdb::Value& value) { return value.GetValue<std::string>(); }
);
return return_value;
}

std::unordered_map<std::string, std::string> validateFieldsAgainstConfig(
const std::unordered_map<std::string, std::string> found_metadata_fields,
const std::unordered_map<std::string, std::string>& found_metadata_fields,
const silo::config::DatabaseConfig& database_config
) {
std::vector<std::string> config_metadata_fields;
@@ -40,10 +24,10 @@
);

std::unordered_map<std::string, std::string> validated_metadata_fields;
for (const auto& pair : found_metadata_fields) {
if (std::find(config_metadata_fields.begin(), config_metadata_fields.end(), pair.first)
for (const auto& [field_name, access_path] : found_metadata_fields) {
if (std::find(config_metadata_fields.begin(), config_metadata_fields.end(), field_name)
!= config_metadata_fields.end()) {
validated_metadata_fields.emplace(pair);
validated_metadata_fields.emplace(field_name, access_path);
}
}
for (const std::string& name : config_metadata_fields) {
@@ -57,8 +41,8 @@ std::unordered_map<std::string, std::string> validateFieldsAgainstConfig(
}

std::string metadata_field_string;
for (const auto& [field, select] : validated_metadata_fields) {
metadata_field_string += "'" + field + "' with selection '" + select + "',";
for (const auto& [field_name, select] : validated_metadata_fields) {
metadata_field_string += "'" + field_name + "' with selection '" + select + "',";
}
SPDLOG_TRACE("Found metadata fields: " + metadata_field_string);
return validated_metadata_fields;
@@ -167,7 +151,7 @@ MetadataInfo MetadataInfo::validateFromNdjsonFile(
}

std::unordered_map<std::string, std::string> metadata_fields_to_validate;
for (const std::string& metadata_field : extractStringListValue(*result, 0, 0)) {
for (const std::string& metadata_field : preprocessing::extractStringListValue(*result, 0, 0)) {
metadata_fields_to_validate[metadata_field] = "metadata." + metadata_field;
}
detectInsertionLists(ndjson_file, metadata_fields_to_validate);
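The refactor above switches `validateFieldsAgainstConfig` from iterating `pair`s to structured bindings, keeping only found metadata fields that also appear in the database config. A minimal self-contained sketch of that filtering step (hypothetical `keepConfiguredFields` helper; the real function also raises a `PreprocessingException` for configured fields that were not found):

```cpp
#include <algorithm>
#include <string>
#include <unordered_map>
#include <vector>

// Keep only those found fields whose names also appear in the configured
// field list, mirroring the structured-bindings loop in the diff above.
std::unordered_map<std::string, std::string> keepConfiguredFields(
    const std::unordered_map<std::string, std::string>& found_fields,
    const std::vector<std::string>& configured_fields
) {
   std::unordered_map<std::string, std::string> validated;
   for (const auto& [field_name, access_path] : found_fields) {
      if (std::find(configured_fields.begin(), configured_fields.end(), field_name)
          != configured_fields.end()) {
         validated.emplace(field_name, access_path);
      }
   }
   return validated;
}
```

Structured bindings give the loop body meaningful names (`field_name`, `access_path`) instead of opaque `pair.first`/`pair.second`, which is the readability win this commit is after.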
27 changes: 22 additions & 5 deletions src/silo/preprocessing/preprocessing_database.cpp
@@ -119,14 +119,14 @@ PreprocessingDatabase::PreprocessingDatabase(const std::string& backing_file)
}

connection.CreateVectorizedFunction(
"compressNuc",
std::string(COMPRESS_NUC),
{duckdb::LogicalType::VARCHAR, duckdb::LogicalType::VARCHAR},
duckdb::LogicalType::BLOB,
Compressors::compressNuc
);

connection.CreateVectorizedFunction(
"compressAA",
std::string(COMPRESS_AA),
{duckdb::LogicalType::VARCHAR, duckdb::LogicalType::VARCHAR},
duckdb::LogicalType::BLOB,
Compressors::compressAA
@@ -161,7 +161,7 @@ preprocessing::Partitions PreprocessingDatabase::getPartitionDescriptor() {
it != partition_descriptor_from_sql->end();
++it) {
const duckdb::Value db_partition_id = it.current_row.GetValue<duckdb::Value>(0);
int64_t partition_id_int = duckdb::BigIntValue::Get(db_partition_id);
const int64_t partition_id_int = duckdb::BigIntValue::Get(db_partition_id);
if (partition_id_int != check_partition_id_sorted_and_contiguous) {
throw PreprocessingException(
"The partition IDs produced by the preprocessing are not sorted, not starting from 0 "
@@ -172,7 +172,7 @@
uint32_t partition_id = partition_id_int;

const duckdb::Value db_partition_size = it.current_row.GetValue<duckdb::Value>(1);
int64_t partition_size_bigint = duckdb::BigIntValue::Get(db_partition_size);
const int64_t partition_size_bigint = duckdb::BigIntValue::Get(db_partition_size);
if (partition_size_bigint <= 0) {
throw PreprocessingException("Non-positive partition size encountered.");
}
@@ -182,7 +182,7 @@
);
}

uint32_t partition_size = static_cast<uint32_t>(partition_size_bigint);
const auto partition_size = static_cast<uint32_t>(partition_size_bigint);

partitions.emplace_back(std::vector<preprocessing::PartitionChunk>{
{partition_id, 0, partition_size, 0}});
@@ -200,4 +200,21 @@ void PreprocessingDatabase::generateNucSequenceTable(
ZstdFastaTable::generate(connection, table_name, fasta_reader, reference_sequence);
}

std::vector<std::string> extractStringListValue(
duckdb::MaterializedQueryResult& result,
size_t row,
size_t column
) {
std::vector<std::string> return_value;
const duckdb::Value tmp_value = result.GetValue(column, row);
std::vector<duckdb::Value> child_values = duckdb::ListValue::GetChildren(tmp_value);
std::transform(
child_values.begin(),
child_values.end(),
std::back_inserter(return_value),
[](const duckdb::Value& value) { return value.GetValue<std::string>(); }
);
return return_value;
}

} // namespace silo::preprocessing
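The relocated `extractStringListValue` flattens a DuckDB `LIST` value into a `std::vector<std::string>` using `std::transform` with a `back_inserter`. A self-contained stand-in for that pattern, using plain `int`s in place of `duckdb::Value` since the real child values come from `duckdb::ListValue::GetChildren`:

```cpp
#include <algorithm>
#include <iterator>
#include <string>
#include <vector>

// Convert each child value of a list into its string form, appending the
// results through a back_inserter, as extractStringListValue does above.
std::vector<std::string> toStringList(const std::vector<int>& child_values) {
   std::vector<std::string> return_value;
   std::transform(
       child_values.begin(),
       child_values.end(),
       std::back_inserter(return_value),
       [](int value) { return std::to_string(value); }
   );
   return return_value;
}
```

Moving the helper into the `silo::preprocessing` namespace (declared in the header hunk above) lets `metadata_info.cpp` and `sequence_info.cpp` share one definition instead of each carrying an anonymous-namespace copy.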
29 changes: 6 additions & 23 deletions src/silo/preprocessing/sequence_info.cpp
@@ -7,25 +7,6 @@
#include "silo/preprocessing/preprocessing_exception.h"
#include "silo/storage/reference_genomes.h"

namespace {
std::vector<std::string> extractStringListValue(
duckdb::MaterializedQueryResult& result,
size_t row,
size_t column
) {
std::vector<std::string> return_value;
const duckdb::Value tmp_value = result.GetValue(column, row);
std::vector<duckdb::Value> child_values = duckdb::ListValue::GetChildren(tmp_value);
std::transform(
child_values.begin(),
child_values.end(),
std::back_inserter(return_value),
[](const duckdb::Value& value) { return value.GetValue<std::string>(); }
);
return return_value;
}
} // namespace

namespace silo::preprocessing {

SequenceInfo::SequenceInfo(const silo::ReferenceGenomes& reference_genomes) {
@@ -49,15 +30,17 @@ std::vector<std::string> SequenceInfo::getSequenceSelects() {
std::vector<std::string> sequence_selects;
for (const std::string& name : nuc_sequence_names) {
sequence_selects.emplace_back(fmt::format(
"compressNuc(alignedNucleotideSequences.{0}, "
"'{0}') as nuc_{0}",
"{0}(alignedNucleotideSequences.{1}, "
"'{1}') as nuc_{1}",
preprocessing::PreprocessingDatabase::COMPRESS_NUC,
name
));
}
for (const std::string& name : aa_sequence_names) {
sequence_selects.emplace_back(fmt::format(
"compressAA(alignedAminoAcidSequences.{0}, "
"'{0}') as gene_{0}",
"{0}(alignedAminoAcidSequences.{1}, "
"'{1}') as gene_{1}",
preprocessing::PreprocessingDatabase::COMPRESS_AA,
name
));
}
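The `getSequenceSelects` change replaces the hardcoded `compressNuc`/`compressAA` literals with the `COMPRESS_NUC`/`COMPRESS_AA` constants, threading the function name into `fmt::format` as positional argument `{0}` and the sequence name as `{1}`. A rough stand-in that builds the same SELECT fragment with plain string concatenation (the `"compressNuc"` value and the sequence name `"main"` are assumptions for illustration):

```cpp
#include <string>
#include <string_view>

constexpr std::string_view COMPRESS_NUC = "compressNuc";  // mirrors the class constant

// Mirrors the fmt::format call in getSequenceSelects(): produces
// `compressNuc(alignedNucleotideSequences.<name>, '<name>') as nuc_<name>`.
std::string nucSequenceSelect(const std::string& name) {
   return std::string(COMPRESS_NUC) + "(alignedNucleotideSequences." + name +
          ", '" + name + "') as nuc_" + name;
}
```

Repeating `{1}` three times in the real format string is what lets a single `name` argument fill the column reference, the quoted literal, and the alias at once.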