Skip to content

Commit

Permalink
feat: build metadata in parallel to sequences. Do not create unaligne…
Browse files Browse the repository at this point in the history
…d sequence tables in preprocessing, rather hive-partition them directly to disk. Better (debug-)logging
  • Loading branch information
Taepper authored and JonasKellerer committed Feb 5, 2024
1 parent ba54196 commit c1cdfeb
Show file tree
Hide file tree
Showing 18 changed files with 512 additions and 310 deletions.
6 changes: 5 additions & 1 deletion include/silo/preprocessing/preprocessing_database.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#pragma once

#include <filesystem>
#include <memory>
#include <optional>
#include <string>

#include <duckdb.hpp>
Expand All @@ -24,10 +26,12 @@ class PreprocessingDatabase {
duckdb::Connection connection;

public:
PreprocessingDatabase(const std::string& backing_file);
PreprocessingDatabase(const std::optional<std::filesystem::path>& backing_file);

duckdb::Connection& getConnection();

void refreshConnection();

Partitions getPartitionDescriptor();

static void registerSequences(const silo::ReferenceGenomes& reference_genomes);
Expand Down
50 changes: 44 additions & 6 deletions include/silo/preprocessing/preprocessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ class PangoLineageAliasLookup;

namespace preprocessing {

class SequenceInfo;

class Preprocessor {
PreprocessingConfig preprocessing_config;
config::DatabaseConfig database_config;
Expand All @@ -24,18 +26,36 @@ class Preprocessor {
Database preprocess();

private:
void buildTablesFromNdjsonInput(
const std::filesystem::path& file_name,
const ReferenceGenomes& reference_genomes
);
void buildTablesFromNdjsonInput(const std::filesystem::path& file_name);
void buildMetadataTableFromFile(const std::filesystem::path& metadata_filename);

void buildPartitioningTable();
void buildPartitioningTableByColumn(const std::string& partition_by_field);
void buildEmptyPartitioning();

void createSequenceViews(const ReferenceGenomes& reference_genomes);
void createPartitionedSequenceTables(const ReferenceGenomes& reference_genomes);
void createPartitionedSequenceTablesFromNdjson(
const std::filesystem::path& file_name,
const ReferenceGenomes& reference_genomes
);
void createAlignedPartitionedSequenceViews(
const std::filesystem::path& file_name,
const ReferenceGenomes& reference_genomes,
const SequenceInfo& sequence_info,
const std::string& partition_by_select,
const std::string& partition_by_where
);
void createUnalignedPartitionedSequenceFiles(
const std::filesystem::path& file_name,
const ReferenceGenomes& reference_genomes,
const std::string& partition_by_select,
const std::string& partition_by_where
);
void createUnalignedPartitionedSequenceFile(
const std::string& seq_name,
const std::string& table_sql
);

void createPartitionedSequenceTablesFromSequenceFiles(const ReferenceGenomes& reference_genomes);
void createPartitionedTableForSequence(
const std::string& sequence_name,
const std::string& reference_sequence,
Expand All @@ -50,6 +70,24 @@ class Preprocessor {
const silo::PangoLineageAliasLookup& alias_key,
const std::filesystem::path& intermediate_results_directory
);

void buildMetadataStore(
Database& database,
const preprocessing::Partitions& partition_descriptor,
const std::string& order_by_clause
);
void buildNucleotideSequenceStore(
Database& database,
const preprocessing::Partitions& partition_descriptor,
const ReferenceGenomes& reference_genomes,
const std::string& order_by_clause
);
void buildAminoAcidSequenceStore(
Database& database,
const preprocessing::Partitions& partition_descriptor,
const ReferenceGenomes& reference_genomes,
const std::string& order_by_clause
);
};
} // namespace preprocessing
} // namespace silo
8 changes: 7 additions & 1 deletion include/silo/preprocessing/sequence_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@ class SequenceInfo {
public:
SequenceInfo(const silo::ReferenceGenomes& reference_genomes);

std::vector<std::string> getSequenceSelects();
std::vector<std::string> getAlignedSequenceSelects() const;

static std::string getNucleotideSequenceSelect(const std::string& seq_name);

static std::string getUnalignedSequenceSelect(const std::string& seq_name);

static std::string getAminoAcidSequenceSelect(const std::string& seq_name);

void validate(duckdb::Connection& connection, const std::filesystem::path& input_filename) const;
};
Expand Down
3 changes: 0 additions & 3 deletions include/silo/storage/database_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,6 @@ class DatabasePartition {
for(auto& [name, store] : aa_sequences){
archive & store;
}
for(auto& [name, store] : unaligned_nuc_sequences){
archive & store;
}
archive & sequence_count;
// clang-format on
}
Expand Down
19 changes: 9 additions & 10 deletions include/silo/storage/unaligned_sequence_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,17 @@ class ZstdFastaTableReader;
/// Per-partition part of an unaligned sequence store.
/// Only declarations are visible here; the member functions are defined elsewhere.
class UnalignedSequenceStorePartition {
friend class boost::serialization::access;

// Boost.Serialization hook: the only field archived is sequence_count.
template <class Archive>
void serialize(Archive& archive, [[maybe_unused]] const uint32_t version) {
archive & sequence_count;
}
// SQL text kept per partition; presumably the value handed out by getReadSQL() -- confirm
// against the definition.
std::string sql_for_reading_file;

public:
std::filesystem::path file_name;
// Reference member: the referenced dictionary string is not owned by this object and must
// outlive it.
std::string& compression_dictionary;
uint32_t sequence_count = 0;
const std::string& compression_dictionary;

// The constructor receives the dictionary by reference, so the caller keeps ownership of it.
explicit UnalignedSequenceStorePartition(
std::filesystem::path file_name,
std::string& compression_dictionary
std::string sql_for_reading_file,
const std::string& compression_dictionary
);

// Takes a table reader as input and returns a size_t; presumably the number of sequences
// read -- confirm against the definition.
size_t fill(silo::ZstdFastaTableReader& input);
// Const accessor returning a SQL string by value.
std::string getReadSQL() const;
};

class UnalignedSequenceStore {
Expand All @@ -39,6 +34,10 @@ class UnalignedSequenceStore {
std::filesystem::path folder_path;
std::string compression_dictionary;

private:
std::filesystem::path partitionFilename(size_t partition_id) const;

public:
void saveFolder(const std::filesystem::path& save_location) const;

explicit UnalignedSequenceStore(
Expand Down
2 changes: 2 additions & 0 deletions include/silo/zstdfasta/zstdfasta_table_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class ZstdFastaTableReader {

void copyTableTo(std::string_view file_name);

void copyTableToPartitioned(std::string_view file_name, std::string_view partition_key);

size_t lineCount();
};
} // namespace silo
2 changes: 1 addition & 1 deletion src/silo/config/database_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ std::optional<DatabaseMetadata> DatabaseConfig::getMetadata(const std::string& n

/// Encodes this DatabaseConfig as a YAML node and writes the dumped YAML text to config_path.
/// @param config_path file that the YAML output is written to
void DatabaseConfig::writeConfig(const std::filesystem::path& config_path) const {
const YAML::Node node = YAML::convert<DatabaseConfig>::encode(*this);
// Log the destination path before the file is opened.
SPDLOG_INFO("Writing database config to {}", config_path.string());
SPDLOG_DEBUG("Writing database config to {}", config_path.string());
// NOTE(review): the stream state is not checked after opening or writing, so a failure to
// create or write the file is not reported from here.
std::ofstream out_file(config_path);
out_file << YAML::Dump(node);
}
Expand Down
19 changes: 16 additions & 3 deletions src/silo/database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ void Database::saveDatabaseState(const std::filesystem::path& save_directory) {

const std::filesystem::path versioned_save_directory =
save_directory / getDataVersion().toString();
SPDLOG_INFO("Saving database to '{}'", versioned_save_directory.string());

if (std::filesystem::exists(versioned_save_directory)) {
auto error = fmt::format(
Expand All @@ -388,6 +389,8 @@ void Database::saveDatabaseState(const std::filesystem::path& save_directory) {

std::filesystem::create_directory(versioned_save_directory);

SPDLOG_INFO("Saving database config and schema");

const std::filesystem::path database_config_filename =
versioned_save_directory / "database_config.yaml";
database_config.writeConfig(database_config_filename);
Expand All @@ -406,6 +409,8 @@ void Database::saveDatabaseState(const std::filesystem::path& save_directory) {
::boost::archive::binary_oarchive column_archive(column_file);
column_archive << columns;

SPDLOG_INFO("Saving database sequence schema");

auto nuc_sequences_map = getNucSequences();
std::ofstream nuc_sequences_file =
openOutputFileOrThrow(versioned_save_directory / "nuc_sequences.silo");
Expand All @@ -418,8 +423,15 @@ void Database::saveDatabaseState(const std::filesystem::path& save_directory) {
::boost::archive::binary_oarchive aa_sequences_archive(aa_sequences_file);
aa_sequences_archive << aa_sequences_map;

SPDLOG_INFO("Saving unaligned sequence data");

for (auto& [name, store] : unaligned_nuc_sequences) {
store.saveFolder(versioned_save_directory / name);
const std::filesystem::path unaligned_sequence_directory =
versioned_save_directory / ("unaligned_nuc_" + name);
SPDLOG_DEBUG(
"Saving unaligned sequence {} to folder '{}'", name, unaligned_sequence_directory.string()
);
store.saveFolder(unaligned_sequence_directory);
}

std::vector<std::ofstream> partition_archives;
Expand Down Expand Up @@ -639,8 +651,9 @@ void Database::initializeNucSequences(
}
SPDLOG_TRACE("initializing unaligned nucleotide sequences");
for (const auto& [nuc_name, reference_sequence] : reference_sequences) {
const std::filesystem::path sequence_directory = intermediate_results_directory / nuc_name;
create_directory(sequence_directory);
const std::filesystem::path sequence_directory =
intermediate_results_directory / ("unaligned_nuc_" + nuc_name);
std::filesystem::create_directory(sequence_directory);
if (!std::filesystem::is_directory(sequence_directory)) {
SPDLOG_TRACE(
"Sequence directory for unaligned sequences {} could not be created.",
Expand Down
2 changes: 1 addition & 1 deletion src/silo/preprocessing/metadata_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ std::vector<std::string> MetadataInfo::getMetadataSelects() const {
std::vector<std::string> ret;
ret.reserve(metadata_selects.size());
for (const auto& [field, select] : metadata_selects) {
ret.push_back(fmt::format(R"({} as "{}")", select, field));
ret.push_back(fmt::format(R"({} AS "{}")", select, field));
}
return ret;
}
Expand Down
14 changes: 10 additions & 4 deletions src/silo/preprocessing/preprocessing_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

using duckdb::BigIntValue;
using duckdb::BinaryExecutor;
using duckdb::Connection;
using duckdb::DataChunk;
using duckdb::ExpressionState;
using duckdb::ListValue;
Expand Down Expand Up @@ -100,11 +99,14 @@ std::unordered_map<std::string_view, tbb::enumerable_thread_specific<silo::ZstdC

namespace silo::preprocessing {

PreprocessingDatabase::PreprocessingDatabase(const std::string& backing_file)
: duck_db(backing_file),
PreprocessingDatabase::PreprocessingDatabase(
const std::optional<std::filesystem::path>& backing_file
)
: duck_db(backing_file.value_or(":memory:")),
connection(duck_db) {
query("PRAGMA default_null_order='NULLS FIRST';");
query("SET preserve_insertion_order=FALSE;");
query("SET memory_limit='50 GB';");

connection.CreateVectorizedFunction(
std::string(COMPRESS_NUC),
Expand Down Expand Up @@ -135,10 +137,14 @@ void PreprocessingDatabase::registerSequences(const silo::ReferenceGenomes& refe
Compressors::initialize(reference_genomes);
}

/// Returns a mutable reference to the connection member of this object.
/// The reference refers to the member itself, so it is only usable while this
/// PreprocessingDatabase is alive.
Connection& PreprocessingDatabase::getConnection() {
duckdb::Connection& PreprocessingDatabase::getConnection() {
return connection;
}

/// Replaces the connection member with a newly constructed duckdb::Connection on the same
/// duck_db instance.
/// NOTE(review): references previously obtained via getConnection() still refer to the member,
/// which now holds the new connection -- confirm whether any state tied to the old connection
/// is expected to survive.
void PreprocessingDatabase::refreshConnection() {
connection = duckdb::Connection{duck_db};
}

preprocessing::Partitions PreprocessingDatabase::getPartitionDescriptor() {
auto partition_descriptor_from_sql =
connection.Query("SELECT partition_id, count FROM partitioning ORDER BY partition_id");
Expand Down
Loading

0 comments on commit c1cdfeb

Please sign in to comment.