Skip to content

Commit

Permalink
feat: streaming: fasta_aligned: process in sub-partition batches
Browse files Browse the repository at this point in the history
- Let fasta_aligned.cpp have its own copy of PARTITION_CHUNK_SIZE, since
  the two values may need to be changed independently.

- Mostly the same as fasta.cpp, but using Range a bit more here: directly
  for tracking/stopping the iteration in the inner loop, and by mutating
  the range in the QueryResult closure context directly via mutDrop().

- Reduced batch size. No reason to keep it large here.

Refs: #112
  • Loading branch information
pflanze committed Jul 8, 2024
1 parent 3d19469 commit e87d927
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 32 deletions.
1 change: 1 addition & 0 deletions src/silo/query_engine/actions/fasta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ uint32_t addSequencesToResultsForPartition(
return last_row_id;
}

// Partition chunk size for fasta.cpp. fasta_aligned.cpp deliberately defines
// its own PARTITION_CHUNK_SIZE so that the two values can be changed
// independently of each other.
constexpr size_t PARTITION_CHUNK_SIZE = 10000;

} // namespace
Expand Down
106 changes: 74 additions & 32 deletions src/silo/query_engine/actions/fasta_aligned.cpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
#include "silo/query_engine/actions/fasta_aligned.h"

#include <iostream>
#include <map>
#include <optional>
#include <utility>

#include <fmt/format.h>
#include <oneapi/tbb/blocked_range.h>
#include <oneapi/tbb/parallel_for.h>
#include <silo/common/numbers.h>
#include <silo/common/range.h>
#include <spdlog/spdlog.h>
#include <nlohmann/json.hpp>

#include "silo/common/aa_symbols.h"
Expand Down Expand Up @@ -81,6 +85,10 @@ std::string reconstructSequence(
}
return reconstructed_sequence;
}

// Maximum number of result rows produced per batch by FastaAligned::execute;
// it is the count passed to Range::take() / Range::mutDrop() there.
// fasta.cpp deliberately defines its own PARTITION_CHUNK_SIZE so that the two
// values can be changed independently of each other.
constexpr size_t PARTITION_CHUNK_SIZE = 100;

} // namespace

QueryResult FastaAligned::execute(
Expand All @@ -103,42 +111,76 @@ QueryResult FastaAligned::execute(
}
}

size_t total_count = 0;
for (auto& filter : bitmap_filter) {
total_count += filter->cardinality();
}
CHECK_SILO_QUERY(
total_count < 10001, "FastaAligned action currently limited to 10000 sequences"
);

QueryResult results;
for (uint32_t partition_index = 0; partition_index < database.partitions.size();
++partition_index) {
const auto& database_partition = database.partitions[partition_index];
const auto& bitmap = bitmap_filter[partition_index];
for (const uint32_t row_id : *bitmap) {
QueryResultEntry entry;
std::string primary_key_column = database.database_config.schema.primary_key;
entry.fields.emplace(
std::move(primary_key_column),
database_partition.columns.getValue(primary_key_column, row_id)
);
for (const auto& nuc_sequence_name : nuc_sequence_names) {
const auto& sequence_store = database_partition.nuc_sequences.at(nuc_sequence_name);
entry.fields.emplace(
nuc_sequence_name, reconstructSequence<Nucleotide>(sequence_store, row_id)
);
uint32_t partition_index = 0;
std::optional<Range<uint32_t>> remaining_result_row_indices{};
return QueryResult{[nuc_sequence_names = std::move(nuc_sequence_names),
aa_sequence_names = std::move(aa_sequence_names),
bitmap_filter =
std::make_shared<std::vector<OperatorResult>>(std::move(bitmap_filter)),
remaining_result_row_indices,
&database,
partition_index](std::vector<QueryResultEntry>& results) mutable {
for (; partition_index < database.partitions.size();
++partition_index, remaining_result_row_indices = {}) {
auto& bitmap = (*bitmap_filter)[partition_index];
if (!remaining_result_row_indices) {
remaining_result_row_indices = {{0, uint64ToUint32(bitmap->cardinality())}};
}
for (const auto& aa_sequence_name : aa_sequence_names) {
const auto& aa_store = database_partition.aa_sequences.at(aa_sequence_name);
entry.fields.emplace(
aa_sequence_name, reconstructSequence<AminoAcid>(aa_store, row_id)

// The range of results to fully process in this batch
Range<uint32_t> result_row_indices =
remaining_result_row_indices->take(PARTITION_CHUNK_SIZE);
// Remove the same range from the result rows that need to be
// created for the current partition
remaining_result_row_indices->mutDrop(PARTITION_CHUNK_SIZE);

if (!result_row_indices.isEmpty()) {
SPDLOG_TRACE(
"FastaAligned::execute: refill QueryResult for partition_index {}/{}, {}/{}",
partition_index,
database.partitions.size(),
result_row_indices.toString(),
remaining_result_row_indices->beyondLast()
);

const auto& database_partition = database.partitions[partition_index];
for (const uint32_t row_id : *bitmap) {
QueryResultEntry entry;
std::string primary_key_column = database.database_config.schema.primary_key;
entry.fields.emplace(
std::move(primary_key_column),
database_partition.columns.getValue(primary_key_column, row_id)
);
for (const auto& nuc_sequence_name : nuc_sequence_names) {
const auto& sequence_store =
database_partition.nuc_sequences.at(nuc_sequence_name);
entry.fields.emplace(
nuc_sequence_name, reconstructSequence<Nucleotide>(sequence_store, row_id)
);
}
for (const auto& aa_sequence_name : aa_sequence_names) {
const auto& aa_store = database_partition.aa_sequences.at(aa_sequence_name);
entry.fields.emplace(
aa_sequence_name, reconstructSequence<AminoAcid>(aa_store, row_id)
);
}
results.emplace_back(entry);

result_row_indices.mutRest();
if (result_row_indices.isEmpty()) {
// Finished the batch. Remove processed `row_id`s;
// we already removed the corresponding result
// indices from `remaining_result_row_indices`.
bitmap->removeRange(0, row_id + 1);
return;
}
}
std::cerr << "ERROR: ran out of bitmap before finishing result_row_indices\n"
<< std::flush;
abort();
}
results.entriesMut().emplace_back(entry);
}
}
return results;
}};
}

// NOLINTNEXTLINE(readability-identifier-naming)
Expand Down

0 comments on commit e87d927

Please sign in to comment.