From e87d92720a032cd4c59493cc7d6a5f4cf65ef516 Mon Sep 17 00:00:00 2001 From: Christian Jaeger Date: Tue, 2 Jul 2024 23:16:41 +0200 Subject: [PATCH] feat: streaming: fasta_aligned: process in sub-partition batches - Let it have its own copy of PARTITION_CHUNK_SIZE, since the two values may need to be changed independently. - Mostly same as fasta.cpp, but using Range a bit more here, directly for tracking/stopping the iteration in the inner loop. And mutating the range in the QueryResult closure context directly via mutDrop(). - Reduced batch size. No reason to keep it large here. Refs: #112 --- src/silo/query_engine/actions/fasta.cpp | 1 + .../query_engine/actions/fasta_aligned.cpp | 106 ++++++++++++------ 2 files changed, 75 insertions(+), 32 deletions(-) diff --git a/src/silo/query_engine/actions/fasta.cpp b/src/silo/query_engine/actions/fasta.cpp index db7ff73ce..0623dc029 100644 --- a/src/silo/query_engine/actions/fasta.cpp +++ b/src/silo/query_engine/actions/fasta.cpp @@ -239,6 +239,7 @@ uint32_t addSequencesToResultsForPartition( return last_row_id; } +// Note: fasta_aligned.cpp has its own PARTITION_CHUNK_SIZE const size_t PARTITION_CHUNK_SIZE = 10000; } // namespace diff --git a/src/silo/query_engine/actions/fasta_aligned.cpp b/src/silo/query_engine/actions/fasta_aligned.cpp index 5c130fa7a..fc7567d52 100644 --- a/src/silo/query_engine/actions/fasta_aligned.cpp +++ b/src/silo/query_engine/actions/fasta_aligned.cpp @@ -1,5 +1,6 @@ #include "silo/query_engine/actions/fasta_aligned.h" +#include #include #include #include @@ -7,6 +8,9 @@ #include #include #include +#include +#include +#include #include #include "silo/common/aa_symbols.h" @@ -81,6 +85,10 @@ std::string reconstructSequence( } return reconstructed_sequence; } + +// Note: fasta.cpp has its own PARTITION_CHUNK_SIZE +const size_t PARTITION_CHUNK_SIZE = 100; + } // namespace QueryResult FastaAligned::execute( @@ -103,42 +111,76 @@ QueryResult FastaAligned::execute( } } - size_t total_count = 0; - for (auto& filter : bitmap_filter) { - total_count += filter->cardinality(); - } - CHECK_SILO_QUERY( - total_count < 10001, "FastaAligned action currently limited to 10000 sequences" - ); - - QueryResult results; - for (uint32_t partition_index = 0; partition_index < database.partitions.size(); - ++partition_index) { - const auto& database_partition = database.partitions[partition_index]; - const auto& bitmap = bitmap_filter[partition_index]; - for (const uint32_t row_id : *bitmap) { - QueryResultEntry entry; - std::string primary_key_column = database.database_config.schema.primary_key; - entry.fields.emplace( - std::move(primary_key_column), - database_partition.columns.getValue(primary_key_column, row_id) - ); - for (const auto& nuc_sequence_name : nuc_sequence_names) { - const auto& sequence_store = database_partition.nuc_sequences.at(nuc_sequence_name); - entry.fields.emplace( - nuc_sequence_name, reconstructSequence(sequence_store, row_id) - ); + uint32_t partition_index = 0; + std::optional> remaining_result_row_indices{}; + return QueryResult{[nuc_sequence_names = std::move(nuc_sequence_names), + aa_sequence_names = std::move(aa_sequence_names), + bitmap_filter = + std::make_shared>(std::move(bitmap_filter)), + remaining_result_row_indices, + &database, + partition_index](std::vector& results) mutable { + for (; partition_index < database.partitions.size(); + ++partition_index, remaining_result_row_indices = {}) { + auto& bitmap = (*bitmap_filter)[partition_index]; + if (!remaining_result_row_indices) { + remaining_result_row_indices = {{0, uint64ToUint32(bitmap->cardinality())}}; } - for (const auto& aa_sequence_name : aa_sequence_names) { - const auto& aa_store = database_partition.aa_sequences.at(aa_sequence_name); - entry.fields.emplace( - aa_sequence_name, reconstructSequence(aa_store, row_id) + + // The range of results to fully process in this batch + Range result_row_indices = + remaining_result_row_indices->take(PARTITION_CHUNK_SIZE); + // Remove the same range from the result rows that need to be + // created for the current partition + remaining_result_row_indices->mutDrop(PARTITION_CHUNK_SIZE); + + if (!result_row_indices.isEmpty()) { + SPDLOG_TRACE( + "FastaAligned::execute: refill QueryResult for partition_index {}/{}, {}/{}", + partition_index, + database.partitions.size(), + result_row_indices.toString(), + remaining_result_row_indices->beyondLast() ); + + const auto& database_partition = database.partitions[partition_index]; + for (const uint32_t row_id : *bitmap) { + QueryResultEntry entry; + std::string primary_key_column = database.database_config.schema.primary_key; + entry.fields.emplace( + std::move(primary_key_column), + database_partition.columns.getValue(primary_key_column, row_id) + ); + for (const auto& nuc_sequence_name : nuc_sequence_names) { + const auto& sequence_store = + database_partition.nuc_sequences.at(nuc_sequence_name); + entry.fields.emplace( + nuc_sequence_name, reconstructSequence(sequence_store, row_id) + ); + } + for (const auto& aa_sequence_name : aa_sequence_names) { + const auto& aa_store = database_partition.aa_sequences.at(aa_sequence_name); + entry.fields.emplace( + aa_sequence_name, reconstructSequence(aa_store, row_id) + ); + } + results.emplace_back(entry); + + result_row_indices.mutRest(); + if (result_row_indices.isEmpty()) { + // Finished the batch. Remove processed `row_id`s; + // we already removed the corresponding result + // indices from `remaining_result_row_indices`. + bitmap->removeRange(0, row_id + 1); + return; + } + } + std::cerr << "ERROR: ran out of bitmap before finishing result_row_indices\n" + << std::flush; + abort(); } - results.entriesMut().emplace_back(entry); } - } - return results; + }}; } // NOLINTNEXTLINE(readability-identifier-naming)