Skip to content

Commit

Permalink
feat: serialization of partition descriptor to json
Browse files Browse the repository at this point in the history
  • Loading branch information
Taepper committed Jul 20, 2023
1 parent 6ca14d7 commit 472c1da
Show file tree
Hide file tree
Showing 5 changed files with 228 additions and 48 deletions.
1 change: 1 addition & 0 deletions .clang-tidy
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Checks: >
-google-readability-avoid-underscore-in-googletest-name,
-abseil-string-find-str-contains,
-bugprone-easily-swappable-parameters,
-readability-magic-numbers
# TODO(someone): clean up misc-non-private-member-variables-in-classes and add option back in
# Not using google-readability-avoid-underscore-in-googletest-name because it also fails for test_name
# Not using abseil-string-find-str-contains because we don't want to include more libraries
Expand Down
33 changes: 23 additions & 10 deletions include/silo/preprocessing/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,35 @@
#include <string>
#include <vector>

#include <nlohmann/json_fwd.hpp>

namespace boost::serialization {
class access;
}

namespace silo::preprocessing {

class PangoLineageCounts;
class Partition;

/// A chunk groups pango lineages that are stored and processed together.
/// Chunks are the unit of partitioning; they are produced by merging lineages
/// with common prefixes (see mergePangosToChunks).
/// NOTE(review): the diff residue line `struct Chunk {` duplicated the class
/// header and left mismatched braces; only the `class` declaration is kept.
class Chunk {
   friend class Partition;

   /// Boost serialization hook; archives all fields in a fixed order.
   template <class Archive>
   [[maybe_unused]] void serialize(Archive& archive, [[maybe_unused]] const uint32_t version) {
      // clang-format off
      archive& prefix;
      archive& count_of_sequences;
      archive& offset;
      archive& pango_lineages;
      // clang-format on
   }
   std::string prefix;                       // common pango prefix of all lineages in this chunk
   uint32_t count_of_sequences;              // total sequence count across all lineages
   uint32_t offset;                          // offset of this chunk within its partition
   std::vector<std::string> pango_lineages;  // the lineages grouped into this chunk

  public:
   /// Create a chunk holding a single lineage with the given sequence count.
   Chunk(std::string_view lineage, uint32_t count);
   /// Create a chunk from several lineages; throws if `lineages` is empty.
   Chunk(std::vector<std::string>&& lineages, uint32_t count);

   /// Merge another chunk into this one (prefix, count and lineages).
   void addChunk(Chunk&& other);

   std::string_view getPrefix() const;
   uint32_t getCountOfSequences() const;
   uint32_t getOffset() const;
   const std::vector<std::string>& getPangoLineages() const;
};

class Partition {
Expand Down Expand Up @@ -63,6 +74,8 @@ class Partitions {

void save(std::ostream& output_file) const;

static Partitions load(std::istream& input_file);

[[nodiscard]] const std::vector<Partition>& getPartitions() const;

[[nodiscard]] const std::vector<PartitionChunk>& getPartitionChunks() const;
Expand Down
139 changes: 103 additions & 36 deletions src/silo/preprocessing/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@
#include <tuple>
#include <utility>

#include <nlohmann/json.hpp>

#include "silo/persistence/exception.h"
#include "silo/preprocessing/pango_lineage_count.h"

namespace silo::preprocessing {

std::string commonPangoPrefix(const std::string& lineage1, const std::string& lineage2) {
std::string commonPangoPrefix(std::string_view lineage1, std::string_view lineage2) {
std::string prefix;
// Buffer until it reaches another .
std::string buffer;
Expand Down Expand Up @@ -45,11 +47,8 @@ std::vector<silo::preprocessing::Chunk> mergePangosToChunks(
) {
// Initialize chunks such that every chunk is just a pango_lineage
std::list<Chunk> chunks;
for (const auto& count : pango_lineage_counts) {
std::vector<std::string> pango_lineages;
pango_lineages.push_back(count.pango_lineage);
const Chunk tmp = {count.pango_lineage, count.count_of_sequences, 0, pango_lineages};
chunks.emplace_back(tmp);
for (const auto& [lineage, count] : pango_lineage_counts) {
chunks.emplace_back(lineage, count);
}
// We want to prioritise merging of more closely related chunks.
// Therefore, we first merge the chunks, with longer matching prefixes.
Expand All @@ -64,20 +63,15 @@ std::vector<silo::preprocessing::Chunk> mergePangosToChunks(
for (uint32_t len = max_len; len > 0; len--) {
for (auto it = chunks.begin(); it != chunks.end() && std::next(it) != chunks.end();) {
auto&& [pango1, pango2] = std::tie(*it, *std::next(it));
std::string const common_prefix = commonPangoPrefix(pango1.prefix, pango2.prefix);
const std::string common_prefix =
commonPangoPrefix(pango1.getPrefix(), pango2.getPrefix());
// We only look at possible merges with a common_prefix length of #len
const bool one_chunk_is_very_small =
pango1.count_of_sequences < min_size || pango2.count_of_sequences < min_size;
const bool both_chunks_still_want_to_grow =
pango1.count_of_sequences < target_size && pango2.count_of_sequences < target_size;
pango1.getCountOfSequences() < min_size || pango2.getCountOfSequences() < min_size;
const bool both_chunks_still_want_to_grow = pango1.getCountOfSequences() < target_size &&
pango2.getCountOfSequences() < target_size;
if (common_prefix.size() == len && (one_chunk_is_very_small || both_chunks_still_want_to_grow)) {
pango2.prefix = common_prefix;
pango2.count_of_sequences += pango1.count_of_sequences;
pango2.pango_lineages.insert(
pango2.pango_lineages.end(),
pango1.pango_lineages.begin(),
pango1.pango_lineages.end()
);
pango2.addChunk(std::move(pango1));

// We merged pango1 into pango2 -> Now delete pango1
// Do not need to increment, because erase will make it automatically point to next
Expand All @@ -99,7 +93,7 @@ silo::preprocessing::Partition::Partition(std::vector<Chunk>&& chunks_)
uint32_t running_total = 0;
for (Chunk& chunk : chunks) {
chunk.offset = running_total;
running_total += chunk.count_of_sequences;
running_total += chunk.getCountOfSequences();
}
sequence_count = running_total;
}
Expand All @@ -118,16 +112,16 @@ silo::preprocessing::Partitions::Partitions(std::vector<Partition> partitions_)
for (uint32_t chunk_id = 0, limit2 = part.getChunks().size(); chunk_id < limit2; ++chunk_id) {
const auto& chunk = part.getChunks()[chunk_id];
partition_chunks.emplace_back(preprocessing::PartitionChunk{
part_id, chunk_id, chunk.count_of_sequences});
part_id, chunk_id, chunk.getCountOfSequences()});
}
}

for (uint32_t i = 0, limit = partitions.size(); i < limit; ++i) {
const auto& part = partitions[i];
for (uint32_t j = 0, limit2 = part.getChunks().size(); j < limit2; ++j) {
const auto& chunk = part.getChunks()[j];
for (const auto& pango : chunk.pango_lineages) {
pango_to_chunk[pango] = {i, j, chunk.count_of_sequences};
for (const auto& pango : chunk.getPangoLineages()) {
pango_to_chunk[pango] = {i, j, chunk.getCountOfSequences()};
}
}
}
Expand Down Expand Up @@ -164,11 +158,15 @@ Partitions buildPartitions(
partitions.emplace_back(std::move(chunks));
} else if (arch == Architecture::SINGLE_SINGLE) {
// Merge pango_lineages, such that all lineages are in one chunk
Chunk chunk;
for (const auto& pango : pango_lineage_counts.pango_lineage_counts) {
chunk.pango_lineages.push_back(pango.pango_lineage);
if (!pango_lineage_counts.pango_lineage_counts.empty()) {
auto it = pango_lineage_counts.pango_lineage_counts.begin();
Chunk chunk{it->pango_lineage, it->count_of_sequences};
it++;
for (; it != pango_lineage_counts.pango_lineage_counts.end(); ++it) {
chunk.addChunk({it->pango_lineage, it->count_of_sequences});
}
partitions.emplace_back(std::vector<Chunk>{{chunk}});
}
partitions.emplace_back(std::vector<Chunk>{{chunk}});
}
return Partitions{partitions};
}
Expand All @@ -190,6 +188,47 @@ bool PartitionChunk::operator==(const PartitionChunk& other) const {
return partition == other.partition && chunk == other.chunk && size == other.size;
}

// Build a chunk that contains exactly one pango lineage. The chunk's prefix
// is the lineage itself; the offset starts at zero and is assigned later by
// Partition's constructor.
Chunk::Chunk(std::string_view lineage, uint32_t count)
    : prefix(lineage),
      count_of_sequences(count),
      offset(0) {
   pango_lineages.emplace_back(lineage);
}

// Build a chunk from a batch of pango lineages. The lineages are sorted so
// the chunk's prefix can be derived from the lexicographically smallest and
// largest entries; the offset is assigned later by Partition's constructor.
// Throws std::runtime_error if the lineage list is empty.
Chunk::Chunk(std::vector<std::string>&& lineages, uint32_t count)
    : count_of_sequences(count),
      offset(0),
      // Actually take ownership: the previous `pango_lineages(lineages)`
      // copied the vector despite the rvalue-reference parameter.
      pango_lineages(std::move(lineages)) {
   // Check the member, not the (now moved-from) parameter.
   if (pango_lineages.empty()) {
      throw std::runtime_error("Empty chunks should be impossible to create by design.");
   }
   std::sort(pango_lineages.begin(), pango_lineages.end());
   prefix = commonPangoPrefix(pango_lineages.front(), pango_lineages.back());
}

// Merge another chunk into this one: widen the prefix to the common pango
// prefix, accumulate the sequence count and take over the other chunk's
// lineages. `other` is left in a moved-from state; its callers discard it
// immediately after the merge.
void Chunk::addChunk(Chunk&& other) {
   prefix = commonPangoPrefix(prefix, other.getPrefix());
   count_of_sequences += other.count_of_sequences;
   // Move the lineage strings instead of copying them — the parameter is an
   // rvalue reference, so the contents may be pilfered.
   pango_lineages.insert(
      pango_lineages.end(),
      std::make_move_iterator(other.pango_lineages.begin()),
      std::make_move_iterator(other.pango_lineages.end())
   );
}

// Common pango prefix shared by all lineages in this chunk.
std::string_view Chunk::getPrefix() const {
   return prefix;
}

// Total number of sequences across all lineages of this chunk.
uint32_t Chunk::getCountOfSequences() const {
   return count_of_sequences;
}

// Offset of this chunk's first sequence within its partition; set by
// Partition's constructor.
uint32_t Chunk::getOffset() const {
   return offset;
}

// The pango lineages grouped into this chunk.
const std::vector<std::string>& Chunk::getPangoLineages() const {
   return pango_lineages;
}

} // namespace silo::preprocessing

std::size_t std::hash<silo::preprocessing::PartitionChunk>::operator()(
Expand All @@ -200,19 +239,47 @@ std::size_t std::hash<silo::preprocessing::PartitionChunk>::operator()(
(hash<uint32_t>()(partition_chunk.chunk) >> 2);
}

// Teach nlohmann::json how to (de)serialize a Chunk. An adl_serializer
// specialization is used because Chunk declares no default constructor;
// from_json must build it via the lineages+count constructor.
template <>
struct nlohmann::adl_serializer<silo::preprocessing::Chunk> {
   // NOLINTNEXTLINE(readability-identifier-naming)
   static silo::preprocessing::Chunk from_json(const nlohmann::json& js_object) {
      return silo::preprocessing::Chunk{
         js_object["lineages"].template get<std::vector<std::string>>(),
         js_object["count"].template get<uint32_t>()};
   }

   // Take the chunk by const reference: the previous by-value parameter
   // copied the chunk (and all its lineage strings) on every serialization.
   // NOLINTNEXTLINE(readability-identifier-naming)
   static void to_json(nlohmann::json& js_object, const silo::preprocessing::Chunk& chunk) {
      js_object["lineages"] = chunk.getPangoLineages();
      js_object["count"] = chunk.getCountOfSequences();
   }
};

// Teach nlohmann::json how to (de)serialize a Partition.
// A Partition is represented as a plain JSON array of its chunks.
template <>
struct nlohmann::adl_serializer<silo::preprocessing::Partition> {
   // NOLINTNEXTLINE(readability-identifier-naming)
   static silo::preprocessing::Partition from_json(const nlohmann::json& js_object) {
      return silo::preprocessing::Partition{
         js_object.template get<std::vector<silo::preprocessing::Chunk>>()};
   }

   // Take the partition by const reference: the previous by-value parameter
   // copied the whole partition (all chunks) on every serialization.
   // NOLINTNEXTLINE(readability-identifier-naming)
   static void to_json(nlohmann::json& js_object, const silo::preprocessing::Partition& partition) {
      js_object = partition.getChunks();
   }
};

namespace silo::preprocessing {

void Partitions::save(std::ostream& output_file) const {
for (const auto& partition : partitions) {
output_file << "P\t" << partition.getChunks().size() << '\t' << partition.getSequenceCount()
<< '\n';
for (const auto& chunk : partition.getChunks()) {
output_file << "C\t" << chunk.prefix << '\t' << chunk.pango_lineages.size() << '\t'
<< chunk.count_of_sequences << '\t' << chunk.offset << '\n';
for (const auto& pango_lineage : chunk.pango_lineages) {
output_file << "L\t" << pango_lineage << '\n';
}
}
}
const nlohmann::json json(partitions);
output_file << json.dump(4) << std::endl;
}

// Deserialize a Partitions object from the JSON produced by Partitions::save.
// Throws nlohmann::json::parse_error on malformed input.
Partitions Partitions::load(std::istream& input_file) {
   const nlohmann::json json = nlohmann::json::parse(input_file);
   // Pass the vector as a prvalue so Partitions' by-value constructor can
   // move it; the previous `const std::vector` local forced a full copy.
   return Partitions{json.get<std::vector<Partition>>()};
}
} // namespace silo::preprocessing
99 changes: 99 additions & 0 deletions src/silo/preprocessing/partition.test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#include "silo/preprocessing/partition.h"

#include "silo/preprocessing/preprocessing_config.h"

#include <gtest/gtest.h>
#include <yaml-cpp/yaml.h>
#include <fstream>

using silo::preprocessing::Chunk;
using silo::preprocessing::Partition;
using silo::preprocessing::Partitions;

// Construct a small Partitions fixture: two partitions with two chunks each.
// Lineage lists are deliberately unsorted; the Chunk constructor sorts them.
Partitions createSimplePartitionsObject() {
   std::vector<Chunk> first_partition_chunks;
   first_partition_chunks.emplace_back(std::vector<std::string>{{"A.1", "A.3", "B.1", "A.2"}}, 8);
   first_partition_chunks.emplace_back(std::vector<std::string>{{"B.2", "C.3", "C.1", "C.2"}}, 11123);
   Partition first_partition(std::move(first_partition_chunks));

   std::vector<Chunk> second_partition_chunks;
   second_partition_chunks.emplace_back(
      std::vector<std::string>{{"XY.1", "XY.3", "XY.A.A.A.3", "XY.2312"}}, 123
   );
   second_partition_chunks.emplace_back(
      std::vector<std::string>{{"XT.1", "XT.3", "XTA.A.3", "XT.2312"}}, 512
   );
   Partition second_partition(std::move(second_partition_chunks));

   return Partitions{{std::move(first_partition), std::move(second_partition)}};
}

// Expected on-disk contents for the fixture from createSimplePartitionsObject():
// nlohmann json.dump(4) output plus the trailing newline added by save().
// Lineages appear sorted because the Chunk constructor sorts them.
std::string getExpectedSimplePartitionsDump() {
   return "[\n"
          " [\n"
          " {\n"
          " \"count\": 8,\n"
          " \"lineages\": [\n"
          " \"A.1\",\n"
          " \"A.2\",\n"
          " \"A.3\",\n"
          " \"B.1\"\n"
          " ]\n"
          " },\n"
          " {\n"
          " \"count\": 11123,\n"
          " \"lineages\": [\n"
          " \"B.2\",\n"
          " \"C.1\",\n"
          " \"C.2\",\n"
          " \"C.3\"\n"
          " ]\n"
          " }\n"
          " ],\n"
          " [\n"
          " {\n"
          " \"count\": 123,\n"
          " \"lineages\": [\n"
          " \"XY.1\",\n"
          " \"XY.2312\",\n"
          " \"XY.3\",\n"
          " \"XY.A.A.A.3\"\n"
          " ]\n"
          " },\n"
          " {\n"
          " \"count\": 512,\n"
          " \"lineages\": [\n"
          " \"XT.1\",\n"
          " \"XT.2312\",\n"
          " \"XT.3\",\n"
          " \"XTA.A.3\"\n"
          " ]\n"
          " }\n"
          " ]\n"
          "]\n";
}

// Serializing the fixture must produce exactly the expected JSON dump.
TEST(Partitions, shouldSaveSimpleConfig) {
   const Partitions under_test = createSimplePartitionsObject();

   // Serialize to a file on disk ...
   std::ofstream out_file("output/test.partitions");
   under_test.save(out_file);
   out_file.close();

   // ... then read the whole file back and compare against the expected dump.
   const std::ifstream in_file("output/test.partitions");
   std::stringstream read_back;
   read_back << in_file.rdbuf();

   ASSERT_EQ(read_back.str(), getExpectedSimplePartitionsDump());
}

// Round-tripping the fixture through save() and load() must preserve the
// derived partition-chunk metadata.
TEST(Partitions, shouldSaveAndLoadSimpleConfig) {
   const Partitions partitions = createSimplePartitionsObject();
   std::ofstream out_file("output/test.partitions");
   partitions.save(out_file);
   out_file.close();
   std::ifstream in_file("output/test.partitions");

   const Partitions under_test = Partitions::load(in_file);
   // Bind by const reference: getPartitionChunks() returns a const&, and the
   // previous plain `auto` copied both vectors.
   const auto& partition_chunks = partitions.getPartitionChunks();
   const auto& partition_chunks_under_test = under_test.getPartitionChunks();
   ASSERT_EQ(partition_chunks.size(), partition_chunks_under_test.size());
   for (size_t i = 0; i < partition_chunks.size(); ++i) {
      ASSERT_EQ(partition_chunks[i], partition_chunks_under_test[i]);
   }
}
4 changes: 2 additions & 2 deletions src/silo/query_engine/filter_expressions/date_between.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ std::vector<silo::query_engine::operators::RangeSelection::Range> DateBetween::

const auto* base = date_column.getValues().data();
for (const auto& chunk : chunks) {
const auto* begin = &date_column.getValues()[chunk.offset];
const auto* end = &date_column.getValues()[chunk.offset + chunk.count_of_sequences];
const auto* begin = &date_column.getValues()[chunk.getOffset()];
const auto* end = &date_column.getValues()[chunk.getOffset() + chunk.getCountOfSequences()];
// If lower bound is empty we use 1 as the lower-bound, as 0 represents NULL values
const auto* lower = std::lower_bound(begin, end, date_from.value_or(1));
const uint32_t lower_index = lower - base;
Expand Down

0 comments on commit 472c1da

Please sign in to comment.