Skip to content

Commit

Permalink
Decompressor::Iterator - define input_iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
battlmonstr committed Mar 15, 2024
1 parent 39ad8e4 commit 2a27986
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 27 deletions.
1 change: 1 addition & 0 deletions .clang-tidy
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Checks: >
-bugprone-unchecked-optional-access,
-bugprone-unused-raii,
cert-*,
-cert-dcl21-cpp,
-cert-err58-cpp,
-clang-analyzer-*,
clang-diagnostic-*,
Expand Down
79 changes: 75 additions & 4 deletions silkworm/db/snapshots/seg/decompressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "decompressor.hpp"

#include <bitset>
#include <limits>
#include <stdexcept>
#include <utility>
#include <vector>
Expand Down Expand Up @@ -300,6 +301,39 @@ std::ostream& operator<<(std::ostream& out, const PositionTable& pt) {
return out;
}

//! Scoped switch of the access-pattern advice given for a memory-mapped file.
//! The constructor applies `new_mode`; the destructor applies `old_mode`.
//! Only a reference to the file is stored, so the file must outlive the guard.
class Decompressor::ReadModeGuard {
  public:
    //! \param file the mapped file whose advice is changed
    //! \param new_mode the mode applied immediately
    //! \param old_mode the mode applied again when the guard is destroyed
    ReadModeGuard(
        const MemoryMappedFile& file,
        Decompressor::ReadMode new_mode,
        Decompressor::ReadMode old_mode)
        : file_(file),
          old_mode_(old_mode) {
        set_mode(new_mode);
    }

    // NOTE(review): if the advise_* calls can throw, an exception escaping here terminates
    // the program because destructors are implicitly noexcept; confirm against MemoryMappedFile.
    virtual ~ReadModeGuard() {
        set_mode(old_mode_);
    }

    // Each guard restores the old mode exactly once. An implicit copy would restore it a
    // second time, possibly while other iterators still rely on the new mode, so copying
    // (and therefore moving) is disabled. Share a guard through std::shared_ptr instead.
    ReadModeGuard(const ReadModeGuard&) = delete;
    ReadModeGuard& operator=(const ReadModeGuard&) = delete;

  private:
    //! Forward the requested mode to the matching MemoryMappedFile::advise_* call
    void set_mode(Decompressor::ReadMode mode) {
        switch (mode) {
            case ReadMode::kNormal:
                file_.advise_normal();
                break;
            case ReadMode::kRandom:
                file_.advise_random();
                break;
            case ReadMode::kSequential:
                file_.advise_sequential();
                break;
        }
    }

    const MemoryMappedFile& file_;
    Decompressor::ReadMode old_mode_;
};

//! Store the path of the compressed file and the optional memory-mapped region to use for it.
//! The constructor body is empty: nothing is opened or read here.
Decompressor::Decompressor(
    std::filesystem::path compressed_path,
    std::optional<MemoryMappedRegion> compressed_region)
    : compressed_path_(std::move(compressed_path)),
      compressed_region_{compressed_region} {
}

Expand Down Expand Up @@ -349,12 +383,22 @@ void Decompressor::open() {

//! Invoke `fn` with an iterator over the compressed data and return whatever `fn` returns.
//! The decompressor must be open: `ensure` checks that compressed_file_ is set.
// NOTE(review): this listing is a rendered diff without +/- markers. The three statements from
// advise_sequential() through `Iterator it{this};` look like the removed pre-change lines, and the
// ReadModeGuard-based pair that follows looks like their replacement (both declare `it`).
// Confirm against the repository before relying on either half.
bool Decompressor::read_ahead(ReadAheadFuncRef fn) {
ensure(bool(compressed_file_), "decompressor closed, call open first");
compressed_file_->advise_sequential();
[[maybe_unused]] auto _ = gsl::finally([&]() { compressed_file_->advise_random(); });
Iterator it{this};
// The guard requests sequential mode now and random mode again once the last iterator copy holding it is gone.
auto read_mode_guard = std::make_shared<ReadModeGuard>(*compressed_file_, ReadMode::kSequential, ReadMode::kRandom);
Iterator it{this, std::move(read_mode_guard)};
return fn(it);
}

//! Iterator positioned on the first word of the data stream, or end() when there is no word.
//! The returned iterator carries a ReadModeGuard that requests sequential access while any
//! copy of it is alive and random access afterwards.
Decompressor::Iterator Decompressor::begin() {
    ensure(bool(compressed_file_), "decompressor closed, call open first");
    Iterator first{
        this,
        std::make_shared<ReadModeGuard>(*compressed_file_, ReadMode::kSequential, ReadMode::kRandom)};
    if (!first.has_next()) {
        // Nothing to read: `first` (and with it the guard) is dropped on return.
        return end();
    }
    // Load the first word so that dereferencing the result is immediately valid.
    ++first;
    return first;
}

//! Reset compressed_file_. Afterwards read_ahead() and begin() fail their
//! `ensure(bool(compressed_file_), ...)` check ("decompressor closed, call open first").
void Decompressor::close() {
compressed_file_.reset();
}
Expand Down Expand Up @@ -462,7 +506,11 @@ void Decompressor::read_positions(ByteView dict) {
SILK_TRACE << *position_dict_;
}

Decompressor::Iterator::Iterator(const Decompressor* decoder) : decoder_(decoder) {}
Decompressor::Iterator::Iterator(
const Decompressor* decoder,
std::shared_ptr<ReadModeGuard> read_mode_guard)
: decoder_(decoder),
read_mode_guard_(std::move(read_mode_guard)) {}

ByteView Decompressor::Iterator::data() const {
return ByteView{decoder_->words_start_, decoder_->words_length_};
Expand Down Expand Up @@ -790,4 +838,27 @@ uint16_t Decompressor::Iterator::next_code(std::size_t bit_length) {
return code;
}

//! Pre-increment: extract the next word into current_word_, or turn into the end iterator
//! when has_next() reports nothing left.
Decompressor::Iterator& Decompressor::Iterator::operator++() {
    if (!has_next()) {
        // Assigning the end iterator also drops this iterator's read mode guard.
        *this = make_end(decoder_);
        return *this;
    }

    current_word_.clear();
    next(current_word_);
    return *this;
}

//! Two iterators are equal when they refer to the same decoder and stand at the same
//! word offset and bit position. The extracted word and the read mode guard are not compared.
bool operator==(const Decompressor::Iterator& lhs, const Decompressor::Iterator& rhs) {
    if (lhs.decoder_ != rhs.decoder_) {
        return false;
    }
    if (lhs.word_offset_ != rhs.word_offset_) {
        return false;
    }
    return lhs.bit_position_ == rhs.bit_position_;
}

//! Build the end iterator for `decoder`: no read mode guard, and both position fields
//! set to the maximum value of uint64_t and uint8_t respectively.
Decompressor::Iterator Decompressor::Iterator::make_end(const Decompressor* decoder) {
    constexpr auto kEndWordOffset = std::numeric_limits<uint64_t>::max();
    constexpr auto kEndBitPosition = std::numeric_limits<uint8_t>::max();

    Iterator sentinel{decoder, {}};
    sentinel.word_offset_ = kEndWordOffset;
    sentinel.bit_position_ = kEndBitPosition;
    return sentinel;
}

} // namespace silkworm::snapshots::seg
41 changes: 39 additions & 2 deletions silkworm/db/snapshots/seg/decompressor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
#include <array>
#include <cstddef>
#include <filesystem>
#include <functional>
#include <iterator>
#include <memory>
#include <ostream>
#include <span>
#include <string>
#include <utility>
#include <vector>

#include <absl/functional/function_ref.h>
Expand Down Expand Up @@ -177,10 +179,18 @@ class Decompressor {
//! The max number of positions in decoding tables
constexpr static std::size_t kMaxTablePositions = (1 << DecodingTable::kMaxTableBitLength) * 100;

//! Access-pattern mode requested for the compressed file.
//! ReadModeGuard maps each value to the MemoryMappedFile::advise_* call of the same name.
enum class ReadMode : uint8_t {
//! Applied with MemoryMappedFile::advise_normal()
kNormal,
//! Applied with MemoryMappedFile::advise_random()
kRandom,
//! Applied with MemoryMappedFile::advise_sequential()
kSequential,
};

class ReadModeGuard;

//! Read-only access to the file data stream
class Iterator {
public:
explicit Iterator(const Decompressor* decoder);
Iterator(const Decompressor* decoder, std::shared_ptr<ReadModeGuard> read_mode_guard);

[[nodiscard]] std::size_t data_size() const { return decoder_->words_length_; }

Expand Down Expand Up @@ -211,6 +221,25 @@ class Decompressor {
//! Reset to the specified offset in the data stream
void reset(uint64_t data_offset);

//! input_iterator concept boilerplate

//! Single-pass iterator: a word is extracted on each increment
using iterator_category = std::input_iterator_tag;
//! A signed integer type, not `void`: std::distance, std::count and similar algorithms
//! return difference_type, and the C++20 iterator concepts require it to be signed-integer-like.
using difference_type = std::ptrdiff_t;
//! The element type is a whole extracted word
using value_type = Bytes;
using pointer = value_type*;
using reference = value_type&;

//! Mutable reference to current_word_, the word filled in by the latest increment
reference operator*() { return current_word_; }
//! Pointer to the same current_word_ object
pointer operator->() { return &current_word_; }

//! Post-increment: advance this iterator and return a copy of the state it had before advancing
Iterator operator++(int) {
    Iterator previous{*this};
    ++(*this);
    return previous;
}
Iterator& operator++();

//! Defaulted inequality comparison
friend bool operator!=(const Iterator& lhs, const Iterator& rhs) = default;
//! Equality comparison; only declared here, the definition is out of line
friend bool operator==(const Iterator& lhs, const Iterator& rhs);

static Iterator make_end(const Decompressor* decoder);

private:
//! View on the whole data stream.
[[nodiscard]] inline ByteView data() const;
Expand All @@ -232,6 +261,11 @@ class Decompressor {

//! Bit position [0..7] in current word of the data file
uint8_t bit_position_{0};

//! Last extracted word
Bytes current_word_;

std::shared_ptr<ReadModeGuard> read_mode_guard_;
};

using ReadAheadFuncRef = absl::FunctionRef<bool(Iterator)>;
Expand Down Expand Up @@ -261,7 +295,10 @@ class Decompressor {
bool read_ahead(ReadAheadFuncRef fn);

//! Get an iterator to the compressed data
[[nodiscard]] Iterator make_iterator() const { return Iterator{this}; }
[[nodiscard]] Iterator make_iterator() const { return Iterator{this, {}}; }

Iterator begin();
Iterator end() const { return Iterator::make_end(this); }

void close();

Expand Down
12 changes: 3 additions & 9 deletions silkworm/db/snapshots/seg/seg_zip.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,9 @@ void seg_unzip(const std::filesystem::path& path) {
out_path.replace_extension("idt");
RawWordsStream words{out_path, RawWordsStream::OpenMode::kCreate, 1_Mebi};

decompressor.read_ahead([&](Decompressor::Iterator it) -> bool {
Bytes word;
while (it.has_next()) {
word.clear();
it.next(word);
words.write_word(word);
}
return true;
});
for (auto& word : decompressor) {
words.write_word(word);
}
}

} // namespace silkworm::snapshots::seg
14 changes: 7 additions & 7 deletions silkworm/infra/common/memory_mapped_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,13 @@ void MemoryMappedFile::map_existing(bool read_only) {
fd = INVALID_HANDLE_VALUE;
}

void MemoryMappedFile::advise_normal() {
void MemoryMappedFile::advise_normal() const {
}

void MemoryMappedFile::advise_random() {
void MemoryMappedFile::advise_random() const {
}

void MemoryMappedFile::advise_sequential() {
void MemoryMappedFile::advise_sequential() const {
}

void* MemoryMappedFile::mmap(FileDescriptor fd, size_t size, bool read_only) {
Expand Down Expand Up @@ -147,15 +147,15 @@ void MemoryMappedFile::map_existing(bool read_only) {
region_ = {address, size};
}

void MemoryMappedFile::advise_normal() {
void MemoryMappedFile::advise_normal() const {
advise(MADV_NORMAL);
}

void MemoryMappedFile::advise_random() {
void MemoryMappedFile::advise_random() const {
advise(MADV_RANDOM);
}

void MemoryMappedFile::advise_sequential() {
void MemoryMappedFile::advise_sequential() const {
advise(MADV_SEQUENTIAL);
}

Expand All @@ -179,7 +179,7 @@ void MemoryMappedFile::unmap() {
}
}

void MemoryMappedFile::advise(int advice) {
void MemoryMappedFile::advise(int advice) const {
const int result = ::madvise(region_.data(), region_.size(), advice);
if (result == -1) {
// Ignore not implemented in kernel error because it still works (from Erigon)
Expand Down
8 changes: 4 additions & 4 deletions silkworm/infra/common/memory_mapped_file.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ class MemoryMappedFile {
return std::filesystem::last_write_time(path_);
}

void advise_normal();
void advise_random();
void advise_sequential();
void advise_normal() const;
void advise_random() const;
void advise_sequential() const;

private:
void map_existing(bool read_only);
Expand All @@ -126,7 +126,7 @@ class MemoryMappedFile {
HANDLE file_ = nullptr;
HANDLE mapping_ = nullptr;
#else
void advise(int advice);
void advise(int advice) const;
#endif
};

Expand Down
2 changes: 1 addition & 1 deletion silkworm/sentry/common/random.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ std::list<T*> random_list_items(std::list<T>& l, size_t max_count) {

BackInsertPtrIterator& operator*() { return *this; }
BackInsertPtrIterator& operator++() { return *this; }
BackInsertPtrIterator operator++(int) { return *this; } // NOLINT(cert-dcl21-cpp)
BackInsertPtrIterator operator++(int) { return *this; }

private:
std::list<T*>* container_;
Expand Down

0 comments on commit 2a27986

Please sign in to comment.