Skip to content

Commit

Permalink
feat: streaming: fasta: process as a batch per partition
Browse files Browse the repository at this point in the history
Use the temporary `next`-based interface for now.

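- change the helper functions to take only the entries vector (`std::vector<QueryResultEntry>`) instead of the whole `QueryResult`
- turn the `Fasta::addSequencesToResultsForPartition` method into a free function in fasta.cpp
- add a hack to action.cpp that skips sorting (and offset/limit) for streaming, i.e. non-materialized, results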

Refs: #112
  • Loading branch information
pflanze committed Jul 8, 2024
1 parent 8b24478 commit 66bfdff
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 38 deletions.
2 changes: 2 additions & 0 deletions include/silo/query_engine/actions/action.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class Action {
virtual void validateOrderByFields(const Database& database) const = 0;

/// Runs this action and returns its `QueryResult`.
///
/// Pure virtual: every concrete action supplies its own implementation.
[[nodiscard]] virtual QueryResult execute(
/// Lifetime: `database` must stay valid until the query result has been
/// delivered (and the lock inside `FixedDatabase` is released). A result
/// may still refer to the database after `execute` returns.
const Database& database,
/// Filter results selecting the rows to act on.
/// NOTE(review): presumably one entry per database partition, in
/// partition order — confirm against the callers.
std::vector<OperatorResult> bitmap_filter
) const = 0;
Expand Down
7 changes: 0 additions & 7 deletions include/silo/query_engine/actions/fasta.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,6 @@ class Fasta : public Action {
std::vector<OperatorResult> bitmap_filter
) const override;

void addSequencesToResultsForPartition(
QueryResult& results,
const silo::DatabasePartition& database_partition,
const OperatorResult& bitmap,
const std::string& primary_key_column
) const;

public:
explicit Fasta(std::vector<std::string>&& sequence_names);
};
Expand Down
7 changes: 5 additions & 2 deletions src/silo/query_engine/actions/action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,11 @@ QueryResult Action::executeAndOrder(
if (offset.has_value() && offset.value() >= result.entriesMut().size()) {
return {};
}
applySort(result);
applyOffsetAndLimit(result);
// XX HACK
if (result.isMaterialized()) {
applySort(result);
applyOffsetAndLimit(result); // XXXX
}
return result;
}

Expand Down
57 changes: 34 additions & 23 deletions src/silo/query_engine/actions/fasta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <fmt/format.h>
#include <spdlog/spdlog.h>
#include <duckdb.hpp>
#include <memory>
#include <nlohmann/json.hpp>

#include "silo/database.h"
Expand Down Expand Up @@ -94,7 +95,7 @@ std::string getTableQuery(
}

void addSequencesFromResultTableToJson(
QueryResult& results,
std::vector<QueryResultEntry>& results,
duckdb::Connection& connection,
const std::string& result_table_name,
const std::vector<std::string>& sequence_names,
Expand All @@ -116,28 +117,29 @@ void addSequencesFromResultTableToJson(
table_reader.loadTable();
std::optional<std::string> genome_buffer;

const size_t start_of_partition_in_result = results.entriesMut().size() - number_of_values;
const size_t end_of_partition_in_result = results.entriesMut().size();
const size_t start_of_partition_in_result = results.size() - number_of_values;
const size_t end_of_partition_in_result = results.size();
for (size_t idx = start_of_partition_in_result; idx < end_of_partition_in_result; idx++) {
auto current_key = table_reader.next(genome_buffer);
assert(current_key.has_value());
if (genome_buffer.has_value()) {
results.entriesMut().at(idx).fields.emplace(sequence_name, *genome_buffer);
results.at(idx).fields.emplace(sequence_name, *genome_buffer);
} else {
results.entriesMut().at(idx).fields.emplace(sequence_name, std::nullopt);
results.at(idx).fields.emplace(sequence_name, std::nullopt);
}
}
}
}

} // namespace

void Fasta::addSequencesToResultsForPartition(
QueryResult& results,
void addSequencesToResultsForPartition(
std::vector<std::string>& sequence_names,
std::vector<QueryResultEntry>& results,
const DatabasePartition& database_partition,
const OperatorResult& bitmap,
const std::string& primary_key_column
) const {
) {
duckdb::DuckDB duck_db;
duckdb::Connection connection(duck_db);

Expand Down Expand Up @@ -194,7 +196,7 @@ void Fasta::addSequencesToResultsForPartition(
// Add the primary key to the result
QueryResultEntry entry;
entry.fields.emplace(primary_key_column, primary_key.value());
results.entriesMut().emplace_back(std::move(entry));
results.emplace_back(std::move(entry));

appender.EndRow();
appender.Flush();
Expand Down Expand Up @@ -231,8 +233,6 @@ QueryResult Fasta::execute(const Database& database, std::vector<OperatorResult>
);
}

const std::string& primary_key_column = database.database_config.schema.primary_key;

size_t total_count = 0;
for (auto& filter : bitmap_filter) {
total_count += filter->cardinality();
Expand All @@ -242,18 +242,29 @@ QueryResult Fasta::execute(const Database& database, std::vector<OperatorResult>
fmt::format("Fasta action currently limited to {} sequences", SEQUENCE_LIMIT)
);

QueryResult results;
results.entriesMut().reserve(total_count);

for (uint32_t partition_index = 0; partition_index < database.partitions.size();
++partition_index) {
const auto& database_partition = database.partitions[partition_index];
const auto& bitmap = bitmap_filter[partition_index];

addSequencesToResultsForPartition(results, database_partition, bitmap, primary_key_column);
}

return results;
uint32_t partition_index = 0;
return QueryResult{[bitmap_filter =
std::make_shared<std::vector<OperatorResult>>(std::move(bitmap_filter)),
&database,
partition_index,
sequence_names =
sequence_names](std::vector<QueryResultEntry>& results) mutable {
while (partition_index < database.partitions.size()) {
SPDLOG_DEBUG(
"fasta closure for partition_index {}/{}", partition_index, database.partitions.size()
);
const auto& database_partition = database.partitions[partition_index];
const auto& bitmap = (*bitmap_filter)[partition_index];
const std::string& primary_key_column = database.database_config.schema.primary_key;
addSequencesToResultsForPartition(
sequence_names, results, database_partition, bitmap, primary_key_column
);
++partition_index;
if (!results.empty()) {
return;
}
}
}};
}

// NOLINTNEXTLINE(readability-identifier-naming)
Expand Down
8 changes: 5 additions & 3 deletions src/silo_api/query_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "silo_api/error_request_handler.h"

namespace silo_api {
using silo::query_engine::QueryResultEntry;

QueryHandler::QueryHandler(silo_api::DatabaseMutex& database_mutex)
: database_mutex(database_mutex) {}
Expand All @@ -34,14 +35,15 @@ void QueryHandler::post(
try {
const auto fixed_database = database_mutex.getDatabase();

const auto query_result = fixed_database.database.executeQuery(query);
auto query_result = fixed_database.database.executeQuery(query);

response.set("data-version", fixed_database.database.getDataVersionTimestamp().value);

response.setContentType("application/x-ndjson");
std::ostream& out_stream = response.send();
for (const auto& entry : query_result.entries()) {
out_stream << nlohmann::json(entry) << '\n';
std::optional<std::reference_wrapper<const QueryResultEntry>> entry;
while ((entry = query_result.next())) {
out_stream << nlohmann::json(*entry) << '\n';
}
} catch (const silo::QueryParseException& ex) {
response.setContentType("application/json");
Expand Down
5 changes: 2 additions & 3 deletions src/silo_api/request_handler_factory.test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,9 @@ TEST_F(RequestHandlerTestFixture, handlesPostQueryRequest) {
};
std::vector<silo::query_engine::QueryResultEntry> tmp{{fields1}, {fields2}};
silo::query_engine::QueryResult query_result{std::move(tmp)};
EXPECT_CALL(database_mutex.mock_database, executeQuery)
.WillRepeatedly(testing::Return(std::move(query_result)));
EXPECT_CALL(database_mutex.mock_database, executeQuery).WillOnce(testing::Return(query_result));
EXPECT_CALL(database_mutex.mock_database, getDataVersionTimestamp)
.WillRepeatedly(testing::Return(silo::DataVersion::Timestamp::fromString("1234").value()));
.WillOnce(testing::Return(silo::DataVersion::Timestamp::fromString("1234").value()));

request.setMethod("POST");
request.setURI("/query");
Expand Down

0 comments on commit 66bfdff

Please sign in to comment.