From c1cdfeb3cb25e8bcbd5e095e5dbc0f78ae35492f Mon Sep 17 00:00:00 2001 From: Alexander Taepper Date: Thu, 1 Feb 2024 20:17:15 +0100 Subject: [PATCH] feat: build metadata in parallel to sequences. Do not create unaligned sequence tables in preprocessing, rather hive-partition them directly to disk. Better (debug-)logging --- .../preprocessing/preprocessing_database.h | 6 +- include/silo/preprocessing/preprocessor.h | 50 +- include/silo/preprocessing/sequence_info.h | 8 +- include/silo/storage/database_partition.h | 3 - .../silo/storage/unaligned_sequence_store.h | 19 +- .../silo/zstdfasta/zstdfasta_table_reader.h | 2 + src/silo/config/database_config.cpp | 2 +- src/silo/database.cpp | 19 +- src/silo/preprocessing/metadata_info.cpp | 2 +- .../preprocessing/preprocessing_database.cpp | 14 +- src/silo/preprocessing/preprocessor.cpp | 548 +++++++++++------- src/silo/preprocessing/preprocessor.test.cpp | 2 +- src/silo/preprocessing/sequence_info.cpp | 61 +- src/silo/query_engine/actions/fasta.cpp | 16 +- src/silo/storage/reference_genomes.test.cpp | 14 +- src/silo/storage/unaligned_sequence_store.cpp | 27 +- src/silo/zstdfasta/zstdfasta_table_reader.cpp | 13 + src/silo_api/api.cpp | 16 +- 18 files changed, 512 insertions(+), 310 deletions(-) diff --git a/include/silo/preprocessing/preprocessing_database.h b/include/silo/preprocessing/preprocessing_database.h index 2968b7d22..377a87a99 100644 --- a/include/silo/preprocessing/preprocessing_database.h +++ b/include/silo/preprocessing/preprocessing_database.h @@ -1,6 +1,8 @@ #pragma once +#include #include +#include #include #include @@ -24,10 +26,12 @@ class PreprocessingDatabase { duckdb::Connection connection; public: - PreprocessingDatabase(const std::string& backing_file); + PreprocessingDatabase(const std::optional& backing_file); duckdb::Connection& getConnection(); + void refreshConnection(); + Partitions getPartitionDescriptor(); static void registerSequences(const silo::ReferenceGenomes& reference_genomes); diff --git a/include/silo/preprocessing/preprocessor.h b/include/silo/preprocessing/preprocessor.h index 685914224..9e7dc3860 100644 --- a/include/silo/preprocessing/preprocessor.h +++ b/include/silo/preprocessing/preprocessor.h @@ -10,6 +10,8 @@ class PangoLineageAliasLookup; namespace preprocessing { +class SequenceInfo; + class Preprocessor { PreprocessingConfig preprocessing_config; config::DatabaseConfig database_config; @@ -24,18 +26,36 @@ class Preprocessor { Database preprocess(); private: - void buildTablesFromNdjsonInput( - const std::filesystem::path& file_name, - const ReferenceGenomes& reference_genomes - ); + void buildTablesFromNdjsonInput(const std::filesystem::path& file_name); void buildMetadataTableFromFile(const std::filesystem::path& metadata_filename); void buildPartitioningTable(); void buildPartitioningTableByColumn(const std::string& partition_by_field); void buildEmptyPartitioning(); - void createSequenceViews(const ReferenceGenomes& reference_genomes); - void createPartitionedSequenceTables(const ReferenceGenomes& reference_genomes); + void createPartitionedSequenceTablesFromNdjson( + const std::filesystem::path& file_name, + const ReferenceGenomes& reference_genomes + ); + void createAlignedPartitionedSequenceViews( + const std::filesystem::path& file_name, + const ReferenceGenomes& reference_genomes, + const SequenceInfo& sequence_info, + const std::string& partition_by_select, + const std::string& partition_by_where + ); + void createUnalignedPartitionedSequenceFiles( + const std::filesystem::path& file_name, + const ReferenceGenomes& reference_genomes, + const std::string& partition_by_select, + const std::string& partition_by_where + ); + void createUnalignedPartitionedSequenceFile( + const std::string& seq_name, + const std::string& table_sql + ); + + void createPartitionedSequenceTablesFromSequenceFiles(const ReferenceGenomes& reference_genomes); void createPartitionedTableForSequence( const std::string& sequence_name, const std::string& reference_sequence, @@ -50,6 +70,24 @@ class Preprocessor { const silo::PangoLineageAliasLookup& alias_key, const std::filesystem::path& intermediate_results_directory ); + + void buildMetadataStore( + Database& database, + const preprocessing::Partitions& partition_descriptor, + const std::string& order_by_clause + ); + void buildNucleotideSequenceStore( + Database& database, + const preprocessing::Partitions& partition_descriptor, + const ReferenceGenomes& reference_genomes, + const std::string& order_by_clause + ); + void buildAminoAcidSequenceStore( + Database& database, + const preprocessing::Partitions& partition_descriptor, + const ReferenceGenomes& reference_genomes, + const std::string& order_by_clause + ); }; } // namespace preprocessing } // namespace silo diff --git a/include/silo/preprocessing/sequence_info.h b/include/silo/preprocessing/sequence_info.h index 46f21962a..613dd7fe7 100644 --- a/include/silo/preprocessing/sequence_info.h +++ b/include/silo/preprocessing/sequence_info.h @@ -25,7 +25,13 @@ class SequenceInfo { public: SequenceInfo(const silo::ReferenceGenomes& reference_genomes); - std::vector getSequenceSelects(); + std::vector getAlignedSequenceSelects() const; + + static std::string getNucleotideSequenceSelect(const std::string& seq_name); + + static std::string getUnalignedSequenceSelect(const std::string& seq_name); + + static std::string getAminoAcidSequenceSelect(const std::string& seq_name); void validate(duckdb::Connection& connection, const std::filesystem::path& input_filename) const; }; diff --git a/include/silo/storage/database_partition.h b/include/silo/storage/database_partition.h index 1ba22fff3..1a01264f0 100644 --- a/include/silo/storage/database_partition.h +++ b/include/silo/storage/database_partition.h @@ -59,9 +59,6 @@ class DatabasePartition { for(auto& [name, store] : aa_sequences){ archive & store; } - for(auto& [name, store] : unaligned_nuc_sequences){ - archive & store; - } archive & sequence_count; // clang-format on } diff --git a/include/silo/storage/unaligned_sequence_store.h b/include/silo/storage/unaligned_sequence_store.h index c38cdcc93..19c046574 100644 --- a/include/silo/storage/unaligned_sequence_store.h +++ b/include/silo/storage/unaligned_sequence_store.h @@ -15,22 +15,17 @@ class ZstdFastaTableReader; class UnalignedSequenceStorePartition { friend class boost::serialization::access; - template - void serialize(Archive& archive, [[maybe_unused]] const uint32_t version) { - archive & sequence_count; - } + std::string sql_for_reading_file; public: - std::filesystem::path file_name; - std::string& compression_dictionary; - uint32_t sequence_count = 0; + const std::string& compression_dictionary; explicit UnalignedSequenceStorePartition( - std::filesystem::path file_name, - std::string& compression_dictionary + std::string sql_for_reading_file, + const std::string& compression_dictionary ); - size_t fill(silo::ZstdFastaTableReader& input); + std::string getReadSQL() const; }; class UnalignedSequenceStore { @@ -39,6 +34,10 @@ class UnalignedSequenceStore { std::filesystem::path folder_path; std::string compression_dictionary; + private: + std::filesystem::path partitionFilename(size_t partition_id) const; + + public: void saveFolder(const std::filesystem::path& save_location) const; explicit UnalignedSequenceStore( diff --git a/include/silo/zstdfasta/zstdfasta_table_reader.h b/include/silo/zstdfasta/zstdfasta_table_reader.h index 74f7ca9b2..227a590a1 100644 --- a/include/silo/zstdfasta/zstdfasta_table_reader.h +++ b/include/silo/zstdfasta/zstdfasta_table_reader.h @@ -56,6 +56,8 @@ class ZstdFastaTableReader { void copyTableTo(std::string_view file_name); + void copyTableToPartitioned(std::string_view file_name, std::string_view partition_key); + size_t lineCount(); }; } // namespace silo diff --git a/src/silo/config/database_config.cpp b/src/silo/config/database_config.cpp index 75df24c9c..9ca0ce73e 100644 --- a/src/silo/config/database_config.cpp +++ b/src/silo/config/database_config.cpp @@ -211,7 +211,7 @@ std::optional DatabaseConfig::getMetadata(const std::string& n void DatabaseConfig::writeConfig(const std::filesystem::path& config_path) const { const YAML::Node node = YAML::convert::encode(*this); - SPDLOG_INFO("Writing database config to {}", config_path.string()); + SPDLOG_DEBUG("Writing database config to {}", config_path.string()); std::ofstream out_file(config_path); out_file << YAML::Dump(node); } diff --git a/src/silo/database.cpp b/src/silo/database.cpp index ccc827441..f92a7ede0 100644 --- a/src/silo/database.cpp +++ b/src/silo/database.cpp @@ -375,6 +375,7 @@ void Database::saveDatabaseState(const std::filesystem::path& save_directory) { const std::filesystem::path versioned_save_directory = save_directory / getDataVersion().toString(); + SPDLOG_INFO("Saving database to '{}'", versioned_save_directory.string()); if (std::filesystem::exists(versioned_save_directory)) { auto error = fmt::format( @@ -388,6 +389,8 @@ void Database::saveDatabaseState(const std::filesystem::path& save_directory) { std::filesystem::create_directory(versioned_save_directory); + SPDLOG_INFO("Saving database config and schema"); + const std::filesystem::path database_config_filename = versioned_save_directory / "database_config.yaml"; database_config.writeConfig(database_config_filename); @@ -406,6 +409,8 @@ void Database::saveDatabaseState(const std::filesystem::path& save_directory) { ::boost::archive::binary_oarchive column_archive(column_file); column_archive << columns; + SPDLOG_INFO("Saving database sequence schema"); + auto nuc_sequences_map = getNucSequences(); std::ofstream nuc_sequences_file = openOutputFileOrThrow(versioned_save_directory / "nuc_sequences.silo"); @@ -418,8 +423,15 @@ void Database::saveDatabaseState(const std::filesystem::path& save_directory) { ::boost::archive::binary_oarchive aa_sequences_archive(aa_sequences_file); aa_sequences_archive << aa_sequences_map; + SPDLOG_INFO("Saving unaligned sequence data"); + for (auto& [name, store] : unaligned_nuc_sequences) { - store.saveFolder(versioned_save_directory / name); + const std::filesystem::path unaligned_sequence_directory = + versioned_save_directory / ("unaligned_nuc_" + name); + SPDLOG_DEBUG( + "Saving unaligned sequence {} to folder '{}'", name, unaligned_sequence_directory.string() + ); + store.saveFolder(unaligned_sequence_directory); } std::vector partition_archives; @@ -639,8 +651,9 @@ void Database::initializeNucSequences( } SPDLOG_TRACE("initializing unaligned nucleotide sequences"); for (const auto& [nuc_name, reference_sequence] : reference_sequences) { - const std::filesystem::path sequence_directory = intermediate_results_directory / nuc_name; - create_directory(sequence_directory); + const std::filesystem::path sequence_directory = + intermediate_results_directory / ("unaligned_nuc_" + nuc_name); + std::filesystem::create_directory(sequence_directory); if (!std::filesystem::is_directory(sequence_directory)) { SPDLOG_TRACE( "Sequence directory for unaligned sequences {} could not be created.", diff --git a/src/silo/preprocessing/metadata_info.cpp b/src/silo/preprocessing/metadata_info.cpp index 40d74fa8e..d2e980404 100644 --- a/src/silo/preprocessing/metadata_info.cpp +++ b/src/silo/preprocessing/metadata_info.cpp @@ -176,7 +176,7 @@ std::vector MetadataInfo::getMetadataSelects() const { std::vector ret; ret.reserve(metadata_selects.size()); for (const auto& [field, select] : metadata_selects) { - ret.push_back(fmt::format(R"({} as "{}")", select, field)); + ret.push_back(fmt::format(R"({} AS "{}")", select, field)); } return ret; } diff --git a/src/silo/preprocessing/preprocessing_database.cpp b/src/silo/preprocessing/preprocessing_database.cpp index b432c3953..e621c986e 100644 --- a/src/silo/preprocessing/preprocessing_database.cpp +++ b/src/silo/preprocessing/preprocessing_database.cpp @@ -20,7 +20,6 @@ using duckdb::BigIntValue; using duckdb::BinaryExecutor; -using duckdb::Connection; using duckdb::DataChunk; using duckdb::ExpressionState; using duckdb::ListValue; @@ -100,11 +99,14 @@ std::unordered_map& backing_file +) + : duck_db(backing_file.value_or(":memory:")), connection(duck_db) { query("PRAGMA default_null_order='NULLS FIRST';"); query("SET preserve_insertion_order=FALSE;"); + query("SET memory_limit='50 GB';"); connection.CreateVectorizedFunction( std::string(COMPRESS_NUC), @@ -135,10 +137,14 @@ void PreprocessingDatabase::registerSequences(const silo::ReferenceGenomes& refe Compressors::initialize(reference_genomes); } -Connection& PreprocessingDatabase::getConnection() { +duckdb::Connection& PreprocessingDatabase::getConnection() { return connection; } +void PreprocessingDatabase::refreshConnection() { + connection = duckdb::Connection{duck_db}; +} + preprocessing::Partitions PreprocessingDatabase::getPartitionDescriptor() { auto partition_descriptor_from_sql = connection.Query("SELECT partition_id, count FROM partitioning ORDER BY partition_id"); diff --git a/src/silo/preprocessing/preprocessor.cpp b/src/silo/preprocessing/preprocessor.cpp index 811c1289b..1b9787d9f 100644 --- a/src/silo/preprocessing/preprocessor.cpp +++ b/src/silo/preprocessing/preprocessor.cpp @@ -23,14 +23,12 @@ namespace silo::preprocessing { Preprocessor::Preprocessor( - preprocessing::PreprocessingConfig preprocessing_config, - config::DatabaseConfig database_config + preprocessing::PreprocessingConfig preprocessing_config_, + config::DatabaseConfig database_config_ ) - : preprocessing_config(std::move(preprocessing_config)), - database_config(std::move(database_config)), - preprocessing_db( - this->preprocessing_config.getPreprocessingDatabaseLocation().value_or(":memory:") - ) {} + : preprocessing_config(std::move(preprocessing_config_)), + database_config(std::move(database_config_)), + preprocessing_db(preprocessing_config.getPreprocessingDatabaseLocation()) {} Database Preprocessor::preprocess() { SPDLOG_INFO("preprocessing - reading reference genome"); @@ -52,11 +50,11 @@ Database Preprocessor::preprocess() { "preprocessing - building preprocessing tables from ndjson input '{}'", ndjson_input_filename.value().string() ); - buildTablesFromNdjsonInput(ndjson_input_filename.value(), reference_genomes); + buildTablesFromNdjsonInput(ndjson_input_filename.value()); SPDLOG_DEBUG("preprocessing - building partitioning tables"); buildPartitioningTable(); SPDLOG_DEBUG("preprocessing - creating compressed sequence views for building SILO"); - createSequenceViews(reference_genomes); + createPartitionedSequenceTablesFromNdjson(ndjson_input_filename.value(), reference_genomes); } else { SPDLOG_INFO("preprocessing - classic metadata file pipeline chosen"); SPDLOG_DEBUG( @@ -67,7 +65,7 @@ Database Preprocessor::preprocess() { SPDLOG_DEBUG("preprocessing - building partitioning tables"); buildPartitioningTable(); SPDLOG_DEBUG("preprocessing - creating partitioned sequence tables for building SILO"); - createPartitionedSequenceTables(reference_genomes); + createPartitionedSequenceTablesFromSequenceFiles(reference_genomes); } SPDLOG_INFO("preprocessing - finished initial loading of data"); @@ -78,6 +76,7 @@ Database Preprocessor::preprocess() { SPDLOG_INFO("preprocessing - building database"); + preprocessing_db.refreshConnection(); return buildDatabase( partition_descriptor, reference_genomes, @@ -87,10 +86,7 @@ Database Preprocessor::preprocess() { ); } -void Preprocessor::buildTablesFromNdjsonInput( - const std::filesystem::path& file_name, - const ReferenceGenomes& reference_genomes -) { +void Preprocessor::buildTablesFromNdjsonInput(const std::filesystem::path& file_name) { if (!std::filesystem::exists(file_name)) { throw silo::preprocessing::PreprocessingException( fmt::format("The specified input file {} does not exist.", file_name.string()) @@ -102,33 +98,38 @@ void Preprocessor::buildTablesFromNdjsonInput( ); } - SequenceInfo sequence_info(reference_genomes); - sequence_info.validate(preprocessing_db.getConnection(), file_name); - + SPDLOG_DEBUG("build - validating metadata file '{}' with config", file_name.string()); const auto metadata_info = MetadataInfo::validateFromNdjsonFile(file_name, database_config); - PreprocessingDatabase::registerSequences(reference_genomes); - (void)preprocessing_db.query(fmt::format( R"-( - CREATE OR REPLACE TABLE preprocessing_table AS SELECT {}, {} - FROM '{}' - WHERE metadata.{} is not null; + CREATE OR REPLACE TABLE metadata_table AS + SELECT {} + FROM '{}'; )-", boost::join(metadata_info.getMetadataSelects(), ","), - boost::join(sequence_info.getSequenceSelects(), ","), - file_name.string(), - database_config.schema.primary_key + file_name.string() )); - (void)preprocessing_db.query(fmt::format( + auto null_primary_key_result = preprocessing_db.query(fmt::format( R"-( - create or replace view metadata_table as - select {} - from preprocessing_table; + SELECT {0} FROM metadata_table + WHERE {0} IS NULL; )-", - boost::join(metadata_info.getMetadataFields(), ",") + database_config.schema.primary_key )); + if (null_primary_key_result->RowCount() > 0) { + const std::string error_message = fmt::format( + "Error, there are {} primary keys that are NULL", + null_primary_key_result->RowCount(), + file_name.string() + ); + SPDLOG_ERROR(error_message); + if (null_primary_key_result->RowCount() <= 10) { + SPDLOG_ERROR(null_primary_key_result->ToString()); + } + throw silo::preprocessing::PreprocessingException(error_message); + } } void Preprocessor::buildMetadataTableFromFile(const std::filesystem::path& metadata_filename) { @@ -136,9 +137,9 @@ void Preprocessor::buildMetadataTableFromFile(const std::filesystem::path& metad MetadataInfo::validateFromMetadataFile(metadata_filename, database_config); (void)preprocessing_db.query(fmt::format( - "create or replace table metadata_table as\n" - "select {}\n" - "from '{}';", + "CREATE OR REPLACE TABLE metadata_table AS\n" + "SELECT {}\n" + "FROM '{}';", boost::join(metadata_info.getMetadataSelects(), ","), metadata_filename.string() )); @@ -162,10 +163,9 @@ void Preprocessor::buildPartitioningTableByColumn(const std::string& partition_b (void)preprocessing_db.query(fmt::format( R"-( -create -or replace table partition_keys as -select row_number() over () - 1 as id, partition_key, count -from (SELECT {} as partition_key, COUNT(*) as count +CREATE OR REPLACE TABLE partition_keys AS +SELECT row_number() OVER () - 1 AS id, partition_key, count +FROM (SELECT {} AS partition_key, COUNT(*) AS count FROM metadata_table GROUP BY partition_key ORDER BY partition_key); @@ -176,53 +176,51 @@ from (SELECT {} as partition_key, COUNT(*) as count // create Recursive Hierarchical Partitioning By Partition Field (void)preprocessing_db.query( R"-( -create or replace table partitioning as -with recursive - allowed_count(allowed_count) as (select sum(count) / 32 from partition_keys), - grouped_partition_keys(from_id, to_id, count) as - (select id, id, count - from partition_keys - where id = 0 - union all - select case when l1.count <= allowed_count then l1.from_id else l2.id end, +CREATE OR REPLACE TABLE partitioning AS +WITH RECURSIVE + allowed_count(allowed_count) AS (SELECT sum(count) / 32 FROM partition_keys), + grouped_partition_keys(from_id, to_id, count) AS + (SELECT id, id, count + FROM partition_keys + WHERE id = 0 + UNION ALL + SELECT CASE WHEN l1.count <= allowed_count THEN l1.from_id ELSE l2.id END, l2.id, - case when l1.count <= allowed_count - then l1.count + l2.count - else l2.count end - from grouped_partition_keys l1, + CASE WHEN l1.count <= allowed_count + THEN l1.count + l2.count + ELSE l2.count END + FROM grouped_partition_keys l1, partition_keys l2, allowed_count -where l1.to_id + 1 = l2.id) -select row_number() over () - 1 as partition_id, from_id, to_id, count -from (select from_id, max(to_id) as to_id, max(count) as count - from grouped_partition_keys - group by from_id) +WHERE l1.to_id + 1 = l2.id) +SELECT row_number() OVER () - 1 AS partition_id, from_id, to_id, count +FROM (SELECT from_id, MAX(to_id) AS to_id, MAX(count) AS count + FROM grouped_partition_keys + GROUP BY from_id) )-" ); (void)preprocessing_db.query( R"-( -create -or replace table partition_key_to_partition as -select partition_keys.partition_key as partition_key, - partitioning.partition_id as partition_id -from partition_keys, +CREATE OR REPLACE TABLE partition_key_to_partition AS +SELECT partition_keys.partition_key AS partition_key, + partitioning.partition_id AS partition_id +FROM partition_keys, partitioning -where partition_keys.id >= partitioning.from_id +WHERE partition_keys.id >= partitioning.from_id AND partition_keys.id <= partitioning.to_id; )-" ); (void)preprocessing_db.query(fmt::format( R"-( -create -or replace view partitioned_metadata as -select partitioning.partition_id as partition_id, metadata_table.* -from partition_keys, +CREATE OR REPLACE VIEW partitioned_metadata AS +SELECT partitioning.partition_id AS partition_id, metadata_table.* +FROM partition_keys, partitioning, metadata_table -where (metadata_table.{0} = partition_keys.partition_key or (metadata_table.{0} is null -and partition_keys.partition_key is null)) +WHERE (metadata_table.{0} = partition_keys.partition_key OR (metadata_table.{0} IS NULL +AND partition_keys.partition_key IS NULL)) AND partition_keys.id >= partitioning.from_id AND partition_keys.id <= partitioning.to_id; )-", @@ -238,94 +236,153 @@ void Preprocessor::buildEmptyPartitioning() { (void)preprocessing_db.query( R"-( -create or replace table partitioning as -select 0::bigint as partition_id, 0::bigint as from_id, 0::bigint as to_id, count(*) as count -from metadata_table; +CREATE OR REPLACE TABLE partitioning AS +SELECT 0::bigint AS partition_id, 0::bigint AS from_id, 0::bigint AS to_id, count(*) AS count +FROM metadata_table; )-" ); (void)preprocessing_db.query( - "create or replace table partition_key_to_partition as\n" - "select 0::bigint as partition_key, 0::bigint as partition_id;" + "CREATE OR REPLACE TABLE partition_key_to_partition AS\n" + "SELECT 0::bigint AS partition_key, 0::bigint AS partition_id;" ); (void)preprocessing_db.query( - "create\n" - "or replace view partitioned_metadata as\n" - "select 0::bigint as partition_id, metadata_table.*\n" - "from metadata_table;" + "CREATE OR REPLACE VIEW partitioned_metadata AS\n" + "SELECT 0::bigint AS partition_id, metadata_table.*\n" + "FROM metadata_table;" ); } -void Preprocessor::createSequenceViews(const ReferenceGenomes& reference_genomes) { - std::string order_by_select = - ", " + database_config.schema.primary_key + " as " + database_config.schema.primary_key; - if (database_config.schema.date_to_sort_by.has_value()) { - order_by_select += ", " + database_config.schema.date_to_sort_by.value() + " as " + - database_config.schema.date_to_sort_by.value(); - } - std::string partition_by_where; +void Preprocessor::createPartitionedSequenceTablesFromNdjson( + const std::filesystem::path& file_name, + const ReferenceGenomes& reference_genomes +) { + const SequenceInfo sequence_info(reference_genomes); + sequence_info.validate(preprocessing_db.getConnection(), file_name); + + PreprocessingDatabase::registerSequences(reference_genomes); + std::string partition_by_select; + std::string partition_by_where; if (database_config.schema.partition_by.has_value()) { - partition_by_select = "partition_key_to_partition.partition_id as partition_id"; + partition_by_select = "partition_key_to_partition.partition_id AS partition_id"; partition_by_where = fmt::format( - "where (preprocessing_table.{0} = partition_key_to_partition.partition_key) or " - "(preprocessing_table.{0} is null and " - "partition_key_to_partition.partition_key is null)", + "WHERE (metadata.\"{0}\" = partition_key_to_partition.partition_key) OR " + "(metadata.\"{0}\" IS NULL AND " + "partition_key_to_partition.partition_key IS NULL)", database_config.schema.partition_by.value() ); } else { - partition_by_select = "0 as partition_id"; + partition_by_select = "0 AS partition_id"; partition_by_where = ""; } + createUnalignedPartitionedSequenceFiles( + file_name, reference_genomes, partition_by_select, partition_by_where + ); + + createAlignedPartitionedSequenceViews( + file_name, reference_genomes, sequence_info, partition_by_select, partition_by_where + ); +} + +void Preprocessor::createAlignedPartitionedSequenceViews( + const std::filesystem::path& file_name, + const ReferenceGenomes& reference_genomes, + const SequenceInfo& sequence_info, + const std::string& partition_by_select, + const std::string& partition_by_where +) { + std::string order_by_select = ", metadata.\"" + database_config.schema.primary_key + "\" AS \"" + + database_config.schema.primary_key + "\""; + std::string order_by_fields = ", \"" + database_config.schema.primary_key + "\""; + if (database_config.schema.date_to_sort_by.has_value()) { + order_by_select += ", metadata.\"" + database_config.schema.date_to_sort_by.value() + + "\" AS \"" + database_config.schema.date_to_sort_by.value() + "\""; + order_by_fields += ", \"" + database_config.schema.date_to_sort_by.value() + "\""; + } + + (void)preprocessing_db.query(fmt::format( + "CREATE OR REPLACE TABLE sequence_table AS\n" + "SELECT metadata.{} AS key, {}," + "{}" + "{} \n" + "FROM '{}', partition_key_to_partition " + "{};", + database_config.schema.primary_key, + boost::join(sequence_info.getAlignedSequenceSelects(), ","), + partition_by_select, + order_by_select, + file_name.string(), + partition_by_where + )); + for (const auto& [seq_name, _] : reference_genomes.raw_nucleotide_sequences) { (void)preprocessing_db.query(fmt::format( - "create or replace view nuc_{0} as\n" - "select {1} as key, nuc_{0} as sequence," - "{2}" - "{3} \n" - "from preprocessing_table, partition_key_to_partition " - "{4};", + "CREATE OR REPLACE VIEW nuc_{0} AS\n" + "SELECT key, nuc_{0} AS sequence, partition_id" + "{1}" + "FROM sequence_table;", seq_name, - database_config.schema.primary_key, - partition_by_select, - order_by_select, - partition_by_where - )); - (void)preprocessing_db.query(fmt::format( - "create or replace view unaligned_nuc_{0} as\n" - "select {1} as key, unaligned_nuc_{0} as sequence," - "{2}" - "{3} \n" - "from preprocessing_table, partition_key_to_partition " - "{4};", - seq_name, - database_config.schema.primary_key, - partition_by_select, - order_by_select, - partition_by_where + order_by_fields )); } for (const auto& [seq_name, _] : reference_genomes.raw_aa_sequences) { (void)preprocessing_db.query(fmt::format( - "create or replace view gene_{0} as\n" - "select {1} as key, gene_{0} as sequence, " - "{2}\n" - "{3} \n" - "from preprocessing_table, partition_key_to_partition " - "{4};", + "CREATE OR REPLACE VIEW gene_{0} AS\n" + "SELECT key, gene_{0} AS sequence, partition_id" + "{1}" + "FROM sequence_table;", seq_name, + order_by_fields + )); + } +} + +void Preprocessor::createUnalignedPartitionedSequenceFiles( + const std::filesystem::path& file_name, + const ReferenceGenomes& reference_genomes, + const std::string& partition_by_select, + const std::string& partition_by_where +) { + for (const auto& [seq_name, _] : reference_genomes.raw_nucleotide_sequences) { + const std::string table_sql = fmt::format( + "SELECT metadata.\"{}\" AS key, {}," + "{} \n" + "FROM '{}', partition_key_to_partition " + "{}", database_config.schema.primary_key, + SequenceInfo::getUnalignedSequenceSelect(seq_name), partition_by_select, - order_by_select, + file_name.string(), partition_by_where - )); + ); + createUnalignedPartitionedSequenceFile(seq_name, table_sql); } } -void Preprocessor::createPartitionedSequenceTables(const ReferenceGenomes& reference_genomes) { +void Preprocessor::createUnalignedPartitionedSequenceFile( + const std::string& seq_name, + const std::string& table_sql +) { + const std::filesystem::path save_location = + preprocessing_config.getIntermediateResultsDirectory() / ("unaligned_nuc_" + seq_name); + preprocessing_db.query(fmt::format( + "COPY ({}) TO '{}' (FORMAT PARQUET, PARTITION_BY ({}), OVERWRITE_OR_IGNORE);", + table_sql, + save_location.string(), + "partition_id" + )); + preprocessing_db.query("VACUUM;"); +} + +void Preprocessor::createPartitionedSequenceTablesFromSequenceFiles( + const ReferenceGenomes& reference_genomes +) { + PreprocessingDatabase::registerSequences(reference_genomes); + for (const auto& [sequence_name, reference_sequence] : reference_genomes.raw_nucleotide_sequences) { createPartitionedTableForSequence( @@ -335,13 +392,25 @@ void Preprocessor::createPartitionedSequenceTables(const ReferenceGenomes& refer .replace_extension(silo::preprocessing::FASTA_EXTENSION), "nuc_" ); - createPartitionedTableForSequence( - sequence_name, + + preprocessing_db.generateSequenceTableFromFasta( + "unaligned_tmp", reference_sequence, preprocessing_config.getUnalignedNucFilenameNoExtension(sequence_name) - .replace_extension(silo::preprocessing::FASTA_EXTENSION), - "unaligned_nuc_" + .replace_extension(silo::preprocessing::FASTA_EXTENSION) + ); + createUnalignedPartitionedSequenceFile( + sequence_name, + fmt::format( + "SELECT unaligned_tmp.key AS key, unaligned_tmp.sequence AS unaligned_nuc_{}, " + "partitioned_metadata.partition_id AS partition_id " + "FROM unaligned_tmp RIGHT JOIN partitioned_metadata " + "ON unaligned_tmp.key = partitioned_metadata.{} ", + sequence_name, + database_config.schema.primary_key + ) ); + preprocessing_db.query("DROP TABLE IF EXISTS unaligned_tmp;"); } for (const auto& [sequence_name, reference_sequence] : reference_genomes.raw_aa_sequences) { @@ -361,10 +430,10 @@ void Preprocessor::createPartitionedTableForSequence( const std::filesystem::path& filename, const std::string& table_prefix ) { - std::string order_by_select = ", raw.key as " + database_config.schema.primary_key; + std::string order_by_select = ", raw.key AS " + database_config.schema.primary_key; if (database_config.schema.date_to_sort_by.has_value()) { order_by_select += ", partitioned_metadata." + - database_config.schema.date_to_sort_by.value() + " as " + + database_config.schema.date_to_sort_by.value() + " AS " + database_config.schema.date_to_sort_by.value(); } @@ -375,12 +444,12 @@ void Preprocessor::createPartitionedTableForSequence( (void)preprocessing_db.query(fmt::format( R"-( - create or replace view {} as - select key, sequence, - partitioned_metadata.partition_id as partition_id + CREATE OR REPLACE VIEW {} AS + SELECT key, sequence, + partitioned_metadata.partition_id AS partition_id {} - from {} as raw right join partitioned_metadata - on raw.key = partitioned_metadata.{}; + FROM {} AS raw RIGHT JOIN partitioned_metadata + ON raw.key = partitioned_metadata.{}; )-", table_name, order_by_select, @@ -414,23 +483,73 @@ Database Preprocessor::buildDatabase( database.initializeNucSequences(reference_genomes.nucleotide_sequences); database.initializeAASequences(reference_genomes.aa_sequences); - SPDLOG_INFO("build - building metadata store"); - - for (size_t partition_id = 0; partition_id < partition_descriptor.getPartitions().size(); - ++partition_id) { - const auto& part = partition_descriptor.getPartitions()[partition_id]; - for (size_t chunk_index = 0; chunk_index < part.getPartitionChunks().size(); - ++chunk_index) { - const uint32_t sequences_added = database.partitions[partition_id].columns.fill( - preprocessing_db.getConnection(), partition_id, order_by_clause, database_config - ); - database.partitions[partition_id].sequence_count += sequences_added; - } - SPDLOG_INFO("build - finished columns for partition {}", partition_id); - } + tbb::task_group tasks; + + tasks.run([&]() { + SPDLOG_INFO("build - building metadata store in parallel"); + + buildMetadataStore(database, partition_descriptor, order_by_clause); + + SPDLOG_INFO("build - finished metadata store"); + }); + + tasks.run([&]() { + SPDLOG_INFO("build - building nucleotide sequence stores"); + buildNucleotideSequenceStore( + database, partition_descriptor, reference_genomes, order_by_clause + ); + SPDLOG_INFO("build - finished nucleotide sequence stores"); + + SPDLOG_INFO("build - building amino acid sequence stores"); + buildAminoAcidSequenceStore( + database, partition_descriptor, reference_genomes, order_by_clause + ); + SPDLOG_INFO("build - finished amino acid sequence stores"); + }); + + tasks.wait(); + + SPDLOG_INFO("build - finalizing insertion indexes"); + database.finalizeInsertionIndexes(); + } + + SPDLOG_INFO("Build took {} ms", micros); + SPDLOG_INFO("database info: {}", database.getDatabaseInfo()); + + database.validate(); + + return database; +} - SPDLOG_INFO("build - building sequence stores"); +void Preprocessor::buildMetadataStore( + Database& database, + const preprocessing::Partitions& partition_descriptor, + const std::string& order_by_clause +) { + for (size_t partition_id = 0; partition_id < partition_descriptor.getPartitions().size(); + ++partition_id) { + const auto& part = partition_descriptor.getPartitions()[partition_id]; + for (size_t chunk_index = 0; chunk_index < part.getPartitionChunks().size(); ++chunk_index) { + const uint32_t sequences_added = + database.partitions.at(partition_id) + .columns.fill( + preprocessing_db.getConnection(), partition_id, order_by_clause, database_config + ); + database.partitions.at(partition_id).sequence_count += sequences_added; + } + SPDLOG_INFO("build - finished columns for partition {}", partition_id); + } +} +void Preprocessor::buildNucleotideSequenceStore( + Database& database, + const preprocessing::Partitions& partition_descriptor, + const ReferenceGenomes& reference_genomes, + const std::string& order_by_clause +) { + for (const auto& pair : reference_genomes.raw_nucleotide_sequences) { + const std::string& nuc_name = pair.first; + const std::string& reference_sequence = pair.second; tbb::parallel_for( tbb::blocked_range(0, partition_descriptor.getPartitions().size()), [&](const auto& local) { @@ -439,86 +558,73 @@ Database Preprocessor::buildDatabase( const auto& part = partition_descriptor.getPartitions()[partition_index]; for (size_t chunk_index = 0; chunk_index < part.getPartitionChunks().size(); ++chunk_index) { - for (const auto& [nuc_name, reference_sequence] : - reference_genomes.raw_nucleotide_sequences) { - { - SPDLOG_DEBUG( - "build - building aligned sequence store for nucleotide sequence {} and " - "partition {}", - nuc_name, - partition_index - ); - - silo::ZstdFastaTableReader sequence_input( - preprocessing_db.getConnection(), - "nuc_" + nuc_name, - reference_sequence, - "sequence", - fmt::format("partition_id = {}", partition_index), - order_by_clause - ); - database.partitions[partition_index].nuc_sequences.at(nuc_name).fill( - sequence_input - ); - } - - { - SPDLOG_DEBUG( - "build - building unaligned sequence store for nucleotide sequence {} " - "and " - "partition {}", - nuc_name, - partition_index - ); - - silo::ZstdFastaTableReader unaligned_sequence_input( - preprocessing_db.getConnection(), - "unaligned_nuc_" + nuc_name, - reference_sequence, - "sequence", - fmt::format("partition_id = {}", partition_index), - order_by_clause - ); - database.partitions[partition_index] - .unaligned_nuc_sequences.at(nuc_name) - .fill(unaligned_sequence_input); - } - } - for (const auto& [aa_name, reference_sequence] : - reference_genomes.raw_aa_sequences) { - SPDLOG_DEBUG( - "build - building sequence store for amino acid sequence {} and partition " - "{}", - aa_name, - partition_index - ); - - silo::ZstdFastaTableReader sequence_input( - preprocessing_db.getConnection(), - "gene_" + aa_name, - reference_sequence, - "sequence", - fmt::format("partition_id = {}", partition_index), - order_by_clause - ); - database.partitions[partition_index].aa_sequences.at(aa_name).fill( - sequence_input - ); - } + SPDLOG_DEBUG( + "build - building aligned sequence store for nucleotide " + "sequence {} and partition {}", + nuc_name, + partition_index + ); + + silo::ZstdFastaTableReader sequence_input( + preprocessing_db.getConnection(), + "nuc_" + nuc_name, + reference_sequence, + "sequence", + fmt::format("partition_id = {}", partition_index), + order_by_clause + ); + database.partitions.at(partition_index) + .nuc_sequences.at(nuc_name) + .fill(sequence_input); } - SPDLOG_INFO("build - finished sequences for partition {}", partition_index); } } ); - database.finalizeInsertionIndexes(); + SPDLOG_INFO("build - finished nucleotide sequence {}", nuc_name); } +} - SPDLOG_INFO("Build took {} ms", micros); - SPDLOG_INFO("database info: {}", database.getDatabaseInfo()); - - database.validate(); - - return database; +void Preprocessor::buildAminoAcidSequenceStore( + silo::Database& database, + const preprocessing::Partitions& partition_descriptor, + const silo::ReferenceGenomes& reference_genomes, + const std::string& order_by_clause +) { + for (const auto& pair : reference_genomes.raw_aa_sequences) { + const std::string& aa_name = pair.first; + const std::string& reference_sequence = pair.second; + tbb::parallel_for( + tbb::blocked_range(0, partition_descriptor.getPartitions().size()), + [&](const auto& local) { + for (auto partition_index = local.begin(); partition_index != local.end(); + ++partition_index) { + const auto& part = partition_descriptor.getPartitions()[partition_index]; + for (size_t chunk_index = 0; chunk_index < part.getPartitionChunks().size(); + ++chunk_index) { + SPDLOG_DEBUG( + "build - building sequence store for amino acid " + "sequence {} and partition {}", + aa_name, + partition_index + ); + + silo::ZstdFastaTableReader sequence_input( + preprocessing_db.getConnection(), + "gene_" + aa_name, + reference_sequence, + "sequence", + fmt::format("partition_id = {}", partition_index), + order_by_clause + ); + database.partitions.at(partition_index) + .aa_sequences.at(aa_name) + .fill(sequence_input); + } + } + } + ); + SPDLOG_INFO("build - finished amino acid sequence {}", aa_name); + } } } // namespace silo::preprocessing diff --git a/src/silo/preprocessing/preprocessor.test.cpp b/src/silo/preprocessing/preprocessor.test.cpp index bcd176bf7..b09e4583b 100644 --- a/src/silo/preprocessing/preprocessor.test.cpp +++ b/src/silo/preprocessing/preprocessor.test.cpp @@ -112,7 +112,7 @@ TEST_P(PreprocessorTestFixture, shouldProcessDataSetWithMissingSequences) { const auto database_info = database.getDatabaseInfo(); - EXPECT_GT(database_info.total_size, 0); + EXPECT_GT(database_info.total_size, 0UL); EXPECT_EQ(database_info.sequence_count, scenario.expected_sequence_count); const silo::query_engine::QueryEngine query_engine(database); diff --git a/src/silo/preprocessing/sequence_info.cpp b/src/silo/preprocessing/sequence_info.cpp index 3355a7ccb..58902b2be 100644 --- a/src/silo/preprocessing/sequence_info.cpp +++ b/src/silo/preprocessing/sequence_info.cpp @@ -17,48 +17,55 @@ SequenceInfo::SequenceInfo(const silo::ReferenceGenomes& reference_genomes) { } } -std::vector SequenceInfo::getSequenceSelects() { +std::vector SequenceInfo::getAlignedSequenceSelects() const { std::vector sequence_selects; sequence_selects.reserve(nuc_sequence_names.size() + aa_sequence_names.size()); for (const std::string& name : nuc_sequence_names) { - sequence_selects.emplace_back(fmt::format( - "{0}(alignedNucleotideSequences.{1}, " - "'{1}') as nuc_{1}", - preprocessing::PreprocessingDatabase::COMPRESS_NUC, - name - )); - sequence_selects.emplace_back(fmt::format( - "{0}(unalignedNucleotideSequences.{1}, " - "'{1}') as unaligned_nuc_{1}", - preprocessing::PreprocessingDatabase::COMPRESS_NUC, - name - )); + sequence_selects.emplace_back(getNucleotideSequenceSelect(name)); } for (const std::string& name : aa_sequence_names) { - sequence_selects.emplace_back(fmt::format( - "{0}(alignedAminoAcidSequences.{1}, " - "'{1}') as gene_{1}", - preprocessing::PreprocessingDatabase::COMPRESS_AA, - name - )); + sequence_selects.emplace_back(getAminoAcidSequenceSelect(name)); } return sequence_selects; } +std::string SequenceInfo::getNucleotideSequenceSelect(const std::string& seq_name) { + return fmt::format( + "{0}(alignedNucleotideSequences.{1}, '{1}') AS nuc_{1}", + preprocessing::PreprocessingDatabase::COMPRESS_NUC, + seq_name + ); +} + +std::string SequenceInfo::getUnalignedSequenceSelect(const std::string& seq_name) { + return fmt::format( + "{0}(unalignedNucleotideSequences.{1}, '{1}') AS unaligned_nuc_{1}", + preprocessing::PreprocessingDatabase::COMPRESS_NUC, + seq_name + ); +} + +std::string SequenceInfo::getAminoAcidSequenceSelect(const std::string& seq_name) { + return fmt::format( + "{0}(alignedAminoAcidSequences.{1}, '{1}') AS gene_{1}", + preprocessing::PreprocessingDatabase::COMPRESS_AA, + seq_name + ); +} + void SequenceInfo::validate( duckdb::Connection& connection, const std::filesystem::path& input_filename ) const { auto result = connection.Query(fmt::format( "SELECT json_keys(alignedNucleotideSequences), json_keys(alignedAminoAcidSequences) " - "FROM " - "'{}' LIMIT 1; ", + "FROM '{}' LIMIT 1; ", input_filename.string() )); if (result->HasError()) { throw silo::preprocessing::PreprocessingException( - "Preprocessing exception when retrieving the fields 'alignedNucleotideSequences' and " - "'alignedAminoAcidSequences', duckdb threw with error: " + + "Preprocessing exception when retrieving the fields 'alignedNucleotideSequences' " + "and 'alignedAminoAcidSequences', duckdb threw with error: " + result->GetError() ); } @@ -91,8 +98,8 @@ void SequenceInfo::validate( == nuc_sequence_names_to_validate.end()) { // TODO(#220) handle the cases when segments are left out appropriately throw silo::preprocessing::PreprocessingException(fmt::format( - "The aligned nucleotide sequence {} which is contained in the reference sequences is " - "not contained in the input file {}.", + "The aligned nucleotide sequence {} which is contained in the reference " + "sequences is not contained in the input file {}.", name, input_filename.string() )); @@ -112,8 +119,8 @@ void SequenceInfo::validate( if (std::find(aa_sequence_names_to_validate.begin(), aa_sequence_names_to_validate.end(), name) == aa_sequence_names_to_validate.end()) { throw silo::preprocessing::PreprocessingException(fmt::format( - "The aligned amino acid sequence {} which is contained in the reference sequences is " - "not contained in the input file {}.", + "The aligned amino acid sequence {} which is contained in the reference " + "sequences is not contained in the input file {}.", name, input_filename.string() )); diff --git a/src/silo/query_engine/actions/fasta.cpp b/src/silo/query_engine/actions/fasta.cpp index 2ba9fccb0..6dd6d51b1 100644 --- a/src/silo/query_engine/actions/fasta.cpp +++ b/src/silo/query_engine/actions/fasta.cpp @@ -62,20 +62,22 @@ std::string getTableQuery( const DatabasePartition& database_partition, const std::string& key_table_name ) { - std::vector file_names; - file_names.reserve(sequence_names.size()); + std::vector read_file_sqls; + read_file_sqls.reserve(sequence_names.size()); for (const auto& sequence_name : sequence_names) { - file_names.emplace_back(database_partition.unaligned_nuc_sequences.at(sequence_name).file_name + read_file_sqls.emplace_back( + database_partition.unaligned_nuc_sequences.at(sequence_name).getReadSQL() ); } std::string select_clause; std::string table_clause; std::string join_clause; - for (size_t idx = 0; idx < file_names.size(); idx++) { - const auto& file_name = file_names.at(idx); - select_clause += fmt::format(", t{0}.sequence as t{0}_sequence", idx); - table_clause += fmt::format(", '{}' t{}", file_name.string(), idx); + for (size_t idx = 0; idx < read_file_sqls.size(); idx++) { + const auto& sql_to_read_file = read_file_sqls.at(idx); + select_clause += + fmt::format(", t{0}.unaligned_nuc_{1} as t{0}_sequence", idx, sequence_names.at(idx)); + table_clause += fmt::format(", ({}) t{}", sql_to_read_file.string(), idx); join_clause += fmt::format(" AND key_table.key = t{}.key", idx); } diff --git a/src/silo/storage/reference_genomes.test.cpp b/src/silo/storage/reference_genomes.test.cpp index d615faf54..c3bc472aa 100644 --- a/src/silo/storage/reference_genomes.test.cpp +++ b/src/silo/storage/reference_genomes.test.cpp @@ -9,23 +9,23 @@ TEST(ReferenceGenome, readFromFile) { auto under_test = silo::ReferenceGenomes::readFromFile("testBaseData/exampleDataset/reference_genomes.json"); - ASSERT_EQ(under_test.nucleotide_sequences.size(), 2); - ASSERT_EQ(under_test.aa_sequences.size(), 12); + ASSERT_EQ(under_test.nucleotide_sequences.size(), 2UL); + ASSERT_EQ(under_test.aa_sequences.size(), 12UL); - ASSERT_EQ(under_test.nucleotide_sequences.at("main").size(), 29903); + ASSERT_EQ(under_test.nucleotide_sequences.at("main").size(), 29903UL); ASSERT_EQ(under_test.nucleotide_sequences.at("main").at(0), silo::Nucleotide::Symbol::A); - ASSERT_EQ(under_test.nucleotide_sequences.at("testSecondSequence").size(), 4); + ASSERT_EQ(under_test.nucleotide_sequences.at("testSecondSequence").size(), 4UL); ASSERT_EQ( under_test.nucleotide_sequences.at("testSecondSequence").at(1), silo::Nucleotide::Symbol::C ); - ASSERT_EQ(under_test.aa_sequences.at("S").size(), 1274); + ASSERT_EQ(under_test.aa_sequences.at("S").size(), 1274UL); ASSERT_EQ(under_test.aa_sequences.at("S").at(3), silo::AminoAcid::Symbol::F); - ASSERT_EQ(under_test.aa_sequences.at("ORF1a").size(), 4401); + ASSERT_EQ(under_test.aa_sequences.at("ORF1a").size(), 4401UL); ASSERT_EQ(under_test.aa_sequences.at("ORF1a").at(10), silo::AminoAcid::Symbol::K); - ASSERT_EQ(under_test.aa_sequences.at("ORF9b").size(), 98); + ASSERT_EQ(under_test.aa_sequences.at("ORF9b").size(), 98UL); ASSERT_EQ(under_test.aa_sequences.at("ORF9b").at(10), silo::AminoAcid::Symbol::A); } diff --git a/src/silo/storage/unaligned_sequence_store.cpp b/src/silo/storage/unaligned_sequence_store.cpp index 4949fb703..88548d695 100644 --- a/src/silo/storage/unaligned_sequence_store.cpp +++ b/src/silo/storage/unaligned_sequence_store.cpp @@ -14,23 +14,19 @@ #include "silo/common/format_number.h" #include "silo/common/nucleotide_symbols.h" #include "silo/common/symbol_map.h" +#include "silo/persistence/exception.h" #include "silo/preprocessing/preprocessing_exception.h" #include "silo/zstdfasta/zstdfasta_table_reader.h" silo::UnalignedSequenceStorePartition::UnalignedSequenceStorePartition( - std::filesystem::path file_name, - std::string& compression_dictionary + std::string sql_for_reading_file, + const std::string& compression_dictionary ) - : file_name(std::move(file_name)), + : sql_for_reading_file(std::move(sql_for_reading_file)), compression_dictionary(compression_dictionary) {} -size_t silo::UnalignedSequenceStorePartition::fill(silo::ZstdFastaTableReader& input) { - const size_t line_count = input.lineCount(); - - input.copyTableTo(file_name.string()); - - sequence_count += line_count; - return line_count; +std::string silo::UnalignedSequenceStorePartition::getReadSQL() const { + return sql_for_reading_file; } silo::UnalignedSequenceStore::UnalignedSequenceStore( @@ -41,11 +37,18 @@ silo::UnalignedSequenceStore::UnalignedSequenceStore( compression_dictionary(std::move(compression_dictionary)) {} silo::UnalignedSequenceStorePartition& silo::UnalignedSequenceStore::createPartition() { + const size_t partition_id = partitions.size(); return partitions.emplace_back( - folder_path / fmt::format("P{}.parquet", partitions.size()), compression_dictionary + fmt::format( + "SELECT * FROM read_parquet('{}/*/*.parquet', hive_partitioning = 1) " + "WHERE partition_id = {}", + folder_path.string(), + partition_id + ), + compression_dictionary ); } void silo::UnalignedSequenceStore::saveFolder(const std::filesystem::path& save_location) const { - std::filesystem::copy(folder_path, save_location); + std::filesystem::copy(folder_path, save_location, std::filesystem::copy_options::recursive); } diff --git a/src/silo/zstdfasta/zstdfasta_table_reader.cpp b/src/silo/zstdfasta/zstdfasta_table_reader.cpp index 57ee7dcd0..926bece40 100644 --- a/src/silo/zstdfasta/zstdfasta_table_reader.cpp +++ b/src/silo/zstdfasta/zstdfasta_table_reader.cpp @@ -138,6 +138,19 @@ void silo::ZstdFastaTableReader::copyTableTo(std::string_view file_name) { } } +void silo::ZstdFastaTableReader::copyTableToPartitioned( + std::string_view file_name, + std::string_view partition_key +) { + query_result = + connection.Query(fmt::format("COPY ({}) to '{}' ;", getTableQuery(), file_name, partition_key) + ); + if (query_result->HasError()) { + SPDLOG_ERROR("Error when executing SQL " + query_result->GetError()); + throw preprocessing::PreprocessingException("Error when SQL " + query_result->GetError()); + } +} + size_t silo::ZstdFastaTableReader::lineCount() { query_result = connection.Query(fmt::format("SELECT COUNT(*) FROM ({})", getTableQuery())); if (query_result->HasError()) { diff --git a/src/silo_api/api.cpp b/src/silo_api/api.cpp index 8fa5d1dd1..0868e241f 100644 --- a/src/silo_api/api.cpp +++ b/src/silo_api/api.cpp @@ -201,16 +201,22 @@ class SiloServer : public Poco::Util::ServerApplication { return Application::EXIT_OK; }; + silo::Database runPreprocessor( + const silo::preprocessing::PreprocessingConfig& preprocessing_config + ) { + auto database_config = databaseConfig(config()); + + auto preprocessor = silo::preprocessing::Preprocessor(preprocessing_config, database_config); + + return preprocessor.preprocess(); + } + int handlePreprocessing() { SPDLOG_INFO("Starting SILO preprocessing"); try { const auto preprocessing_config = preprocessingConfig(config()); - auto database_config = databaseConfig(config()); - - auto preprocessor = - silo::preprocessing::Preprocessor(preprocessing_config, database_config); - auto database = preprocessor.preprocess(); + auto database = runPreprocessor(preprocessing_config); database.saveDatabaseState(preprocessing_config.getOutputDirectory()); } catch (const std::exception& ex) {