From 64278c8e1bfd81cca15d2d21a2613fee38947670 Mon Sep 17 00:00:00 2001 From: Jarmo Tiitto Date: Wed, 16 Aug 2023 22:20:25 +0300 Subject: [PATCH] Hashy CubeSwapper Implement way to temporally dump the cube data into disk storage in order to save system memory. For `./cubes -n 13 -w -s -u` run heaptrack tool reports: - total runtime: 26min 18s - peak RSS: 2.4 Gb - peak heap memory: 978 Mb This confirms that only the std::unordered_set<> internal nodes (and the lookup array) are kept in memory. Slow down is expected as accessing an element reads it from the disk. The swap files are named as `storage_.bin` in the cache folder. These files are normally deleted as soon as they are no longer needed. Important!! the process can open so many files simultaneously that the system NOFILE limit is reached. This limit should be raised with `ulimit -n 128000` to avoid terminating the program. The minimum number for open file handles is at least: * 32 - CubeSwapSet is specialized std::unordered_set<> that stores the cube data in a file. - CubeStorage acts as pseudo allocator for the cube data. - CubePtr is the key type inserted in to CubeSwapSet. This only an 64-bit offset into the backing file and CubePtr is owned by CubeStorage that created it. - CubePtr::get(const CubeStorage&) reads out the Cube from the storage. Hashy users are adapted to use it where needed. - Clearing Hashy is now quite fast because there is no memory to be freed for CubePtrs. SubsubHashy::clear() simply deletes the data and the backing file. - Compiling in C++20 mode enables speed up by allowing SubsubHashy::contains() to work with Cube and CubePtr types. Signed-off-by: JATothrim --- cpp/CMakeLists.txt | 1 + cpp/include/cubeSwapSet.hpp | 178 ++++++++++++++++++++++++++++++++++++ cpp/include/hashes.hpp | 86 +++++++++++++---- cpp/include/newCache.hpp | 5 +- cpp/src/cubeSwapSet.cpp | 121 ++++++++++++++++++++++++ cpp/src/cubes.cpp | 45 ++++----- cpp/src/newCache.cpp | 11 ++- 7 files changed, 393 insertions(+), 54 deletions(-) create mode 100644 cpp/include/cubeSwapSet.hpp create mode 100644 cpp/src/cubeSwapSet.cpp diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 68344fe..ad05812 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -71,6 +71,7 @@ add_library(CubeObjs OBJECT "src/cubes.cpp" "src/rotations.cpp" "src/newCache.cpp" + "src/cubeSwapSet.cpp" ) ConfigureTarget(CubeObjs) diff --git a/cpp/include/cubeSwapSet.hpp b/cpp/include/cubeSwapSet.hpp new file mode 100644 index 0000000..b7a2e5f --- /dev/null +++ b/cpp/include/cubeSwapSet.hpp @@ -0,0 +1,178 @@ +#pragma once +#ifndef OPENCUBES_CUBE_DISKSWAP_SET_HPP +#define OPENCUBES_CUBE_DISKSWAP_SET_HPP + +#include +#include +#include +#include +#include +#include + +#include "cube.hpp" +#include "mapped_file.hpp" + +/** + * Implement std::unordered_set<> that stores element data in a file. + * + * Cubes stored with size N in the set have constant cost of RAM memory: + * Only the std::unordered_set<> itself and the internals nodes are stored in RAM. + * The element *data* (i.e. XYZ data) is stored in the file. + * The performance cost is that each time the element is accessed + * the data has to be read back from the file. + * (Iterating the entire CubeSwapSet involves reading the entire backing file) + * + * Clearing the CubeSwapSet does not release the backing file space managed by CubeStorage. + * Call to CubeStorage::discard() is required after clearing or destructing + * the CubeSwapSet instance to cleanup the file. + * Elements cannot be removed one-by-one. + */ +class CubeStorage; + +/** + * Overlay that reads the cube data from the backing file. + * CubePtr needs its associated CubeStorage instance to be able to + * access its contents with CubePtr::get() + * The associated CubeStorage owning the CubePtr + * should always be available where CubePtr is used. + */ +class CubePtr { + protected: + mapped::seekoff_t m_seek = 0; + + public: + explicit CubePtr(mapped::seekoff_t offset) : m_seek(offset) {} + CubePtr(const CubePtr& c) : m_seek(c.m_seek) {} + + /** + * Get the Cube pointed by this instance. + */ + Cube get(const CubeStorage& storage) const; + + template + void copyout(const CubeStorage& storage, size_t n, Itr out) const { + auto tmp = get(storage); + std::copy_n(tmp.begin(), n, out); + } + + mapped::seekoff_t seek() const { return m_seek; } +}; + +/** + * Stateful comparator for Cubeptr + */ +class CubePtrEqual { + protected: + const CubeStorage* m_storage = nullptr; + public: + // C++20 feature: + using is_transparent = void; + + CubePtrEqual(const CubeStorage* ctx) : m_storage(ctx) {} + CubePtrEqual(const CubePtrEqual& ctx) : m_storage(ctx.m_storage) {} + + bool operator()(const CubePtr& a, const CubePtr& b) const { return a.get(*m_storage) == b.get(*m_storage); } + + bool operator()(const Cube& a, const CubePtr& b) const { return a == b.get(*m_storage); } + + bool operator()(const CubePtr& a, const Cube& b) const { return a.get(*m_storage) == b; } +}; + +class CubePtrHash { + protected: + const CubeStorage* m_storage = nullptr; + public: + // C++20 feature: + using is_transparent = void; + using transparent_key_equal = CubePtrEqual; + + CubePtrHash(const CubeStorage* ctx) : m_storage(ctx) {} + CubePtrHash(const CubePtrHash& ctx) : m_storage(ctx.m_storage) {} + + size_t operator()(const Cube& x) const { + std::size_t seed = x.size(); + for (auto& p : x) { + auto x = HashXYZ()(p); + seed ^= x + 0x9e3779b9 + (seed << 6) + (seed >> 2); + } + return seed; + } + + size_t operator()(const CubePtr& x) const { + auto cube = x.get(*m_storage); + std::size_t seed = cube.size(); + for (auto& p : cube) { + auto x = HashXYZ()(p); + seed ^= x + 0x9e3779b9 + (seed << 6) + (seed >> 2); + } + return seed; + } +}; + +class CubeStorage { + protected: + std::mutex m_mtx; + std::filesystem::path m_fpath; + std::shared_ptr m_file; + std::unique_ptr m_map; + + static std::atomic m_init_num; + const size_t m_cube_size; + mapped::seekoff_t m_prev_seek = 0; + mapped::seekoff_t m_alloc_seek = 0; + + public: + /** + * Initialize Cube file storage + * @param fname directory where to store the backing file. + * @param n The storage is reserved in n sized chunks. + * This should be equal to Cube::size() that are passed into allocate() + * as no other allocation size is supported. + * @note the backing file creation is delayed until allocate() is called first time. + */ + CubeStorage(std::filesystem::path path, size_t n); + ~CubeStorage(); + + // not copyable + CubeStorage(const CubeStorage&) = delete; + CubeStorage& operator=(const CubeStorage&) = delete; + // move constructible: but only if no allocations exists + CubeStorage(CubeStorage&& mv); + CubeStorage& operator=(CubeStorage&& mv) = delete; + + size_t cubeSize() const { return m_cube_size; } + + /** + * Store Cube data into the backing file. + * Returns CubePtr that can be inserted into CubeSwapSet. + * @note cube.size() must be equal to this->cubeSize() + */ + CubePtr allocate(const Cube& cube); + + /** + * Revert the effect of last allocate() + */ + void cancel_allocation(); + + /** + * Retrieve the cube data from the backing file. + */ + Cube read(const CubePtr& x) const; + + /** + * Drop all stored data. + * Shrinks the backing file to zero size and deletes it. + */ + void discard(); +}; + +/** + * CubeStorage enabled std::unordered_set<> + * + * The CubeSwapSet must be constructed with already initialized + * stateful instances of CubePtrEqual and CubePtrHash functors + * that resolve the CubePtr instance using the CubeStorage instance. + */ +using CubeSwapSet = std::unordered_set; + +#endif \ No newline at end of file diff --git a/cpp/include/hashes.hpp b/cpp/include/hashes.hpp index 7234bd3..79bedfb 100644 --- a/cpp/include/hashes.hpp +++ b/cpp/include/hashes.hpp @@ -3,12 +3,15 @@ #define OPENCUBES_HASHES_HPP #include #include +#include +#include #include #include #include #include #include "cube.hpp" +#include "cubeSwapSet.hpp" #include "utils.hpp" struct HashCube { @@ -27,24 +30,30 @@ using CubeSet = std::unordered_set>; class Subsubhashy { protected: - CubeSet set; + CubeStorage set_storage; + CubeSwapSet set; mutable std::shared_mutex set_mutex; public: + explicit Subsubhashy(std::filesystem::path path, size_t n) : set_storage(path, n), set(1, CubePtrHash(&set_storage), CubePtrEqual(&set_storage)) {} + template void insert(CubeT &&c) { std::lock_guard lock(set_mutex); - set.emplace(std::forward(c)); + auto [itr, isnew] = set.emplace(set_storage.allocate(std::forward(c))); + if (!isnew) { + set_storage.cancel_allocation(); + } } +#if __cplusplus > 201703L +// todo: need C++17 equivalent for *generic* +// contains() or find() that accepts both Cube and CubePtr types bool contains(const Cube &c) const { std::shared_lock lock(set_mutex); - auto itr = set.find(c); - if (itr != set.end()) { - return true; - } - return false; + return set.contains(c); } +#endif auto size() const { std::shared_lock lock(set_mutex); @@ -57,27 +66,45 @@ class Subsubhashy { set.reserve(1); } + // Get CubeStorage instance. + // [this->begin(), this->end()] iterated CubePtr's + // Can be resolved with CubePtr::get(this->storage()) + // that returns copy of the data as Cube. + const CubeStorage &storage() const { return set_storage; } + auto begin() const { return set.begin(); } auto end() const { return set.end(); } auto begin() { return set.begin(); } auto end() { return set.end(); } }; -template class Subhashy { protected: - std::array byhash; + std::deque byhash; public: + Subhashy(int NUM, size_t N, std::filesystem::path path) { + for (int i = 0; i < NUM; ++i) { + byhash.emplace_back(path, N); + } + } + template void insert(CubeT &&c) { HashCube hash; - auto idx = hash(c) % NUM; + auto idx = hash(c) % byhash.size(); auto &set = byhash[idx]; - if (!set.contains(c)) set.insert(std::forward(c)); +#if __cplusplus > 201703L + if (set.contains(c)) return; +#endif + set.insert(std::forward(c)); // printf("new size %ld\n\r", byshape[shape].size()); } + void clear() { + for (auto &set : byhash) set.clear(); + } + auto size() const { size_t sum = 0; for (auto &set : byhash) { @@ -95,7 +122,9 @@ class Subhashy { class Hashy { protected: - std::map> byshape; + std::map byshape; + std::filesystem::path base_path; + int N; mutable std::shared_mutex set_mutex; public: @@ -111,24 +140,41 @@ class Hashy { return out; } + explicit Hashy(std::string path = ".") : base_path(path) {} + void init(int n) { // create all subhashy which will be needed for N - std::lock_guard lock(set_mutex); - for (auto s : generateShapes(n)) byshape[s].size(); + N = n; + for (auto s : generateShapes(n)) { + initSubHashy(n, s); + } std::printf("%ld sets by shape for N=%d\n\r", byshape.size(), n); } - Subhashy<32> &at(XYZ shape) { + Subhashy &initSubHashy(int n, XYZ s) { + assert(N == n); + + auto itr = byshape.find(s); + if (itr == byshape.end()) { + auto [itr, isnew] = byshape.emplace(s, Subhashy(32, n, base_path)); + assert(isnew); + itr->second.size(); + return itr->second; + } else { + return itr->second; + } + } + + Subhashy &at(XYZ shape) { std::shared_lock lock(set_mutex); auto itr = byshape.find(shape); if (itr != byshape.end()) { return itr->second; } - lock.unlock(); - // Not sure if this is supposed to happen normally - // if init() creates all subhashys required. - std::lock_guard elock(set_mutex); - return byshape[shape]; + // should never get here... + std::printf("BUG: missing shape [%2d %2d %2d]:\n\r", shape.x(), shape.y(), shape.z()); + std::abort(); + return *((Subhashy *)0); } template diff --git a/cpp/include/newCache.hpp b/cpp/include/newCache.hpp index 1d62940..b9705ce 100644 --- a/cpp/include/newCache.hpp +++ b/cpp/include/newCache.hpp @@ -167,9 +167,8 @@ class FlatCache : public ICache { for (auto& [shape, set] : hashes) { auto begin = allXYZs.data() + allXYZs.size(); for (auto& subset : set) { - for (auto& cube : subset) - // allXYZs.emplace_back(allXYZs.end(), subset.set.begin(), subset.set.end()); - std::copy(cube.begin(), cube.end(), std::back_inserter(allXYZs)); + for (auto& cubeptr : subset) + cubeptr.copyout(subset.storage(), n, std::back_inserter(allXYZs)); } auto end = allXYZs.data() + allXYZs.size(); // std::printf(" SR %p %p\n", (void*)begin, (void*)end); diff --git a/cpp/src/cubeSwapSet.cpp b/cpp/src/cubeSwapSet.cpp new file mode 100644 index 0000000..1f391ff --- /dev/null +++ b/cpp/src/cubeSwapSet.cpp @@ -0,0 +1,121 @@ +#include "cubeSwapSet.hpp" + +#include + +std::atomic CubeStorage::m_init_num(0); + +CubeStorage::CubeStorage(std::filesystem::path path, size_t n) : m_cube_size(n) { + // Generate file name: + m_fpath = path / ("storage_" + std::to_string(m_init_num.fetch_add(1)) + ".bin"); +} + +CubeStorage::~CubeStorage() { discard(); } + +CubeStorage::CubeStorage(CubeStorage&& mv) + : m_fpath(std::move(mv.m_fpath)), m_file(std::move(mv.m_file)), m_map(std::move(mv.m_map)), m_cube_size(mv.m_cube_size), m_alloc_seek(mv.m_alloc_seek) { + // no allocations can exist in the moved from object: + assert(m_alloc_seek == 0); +} + +CubePtr CubeStorage::allocate(const Cube& cube) { + std::lock_guard lock(m_mtx); + + if (!m_file) { + using namespace mapped; + // file not open yet. + m_file = std::make_shared(); + if (m_file->openrw(m_fpath.c_str(), 0, file::CREATE | file::RESIZE | file::FSTUNE)) { + std::printf("CubeStorage::allocate() ERROR: Failed to create backing file: %s\n", m_fpath.c_str()); + std::abort(); + } + // Map some data. + // todo: mapped::file could provide following: + // m_file->readAt(offset,size,datain) + // m_file->writeAt(offset,size,dataout) + // so that we don't need this mapping for I/O. + // However the mapped::region::readAt() will be faster if + // the area fits in the region window and is accessed multiple times. + m_map = std::make_unique(m_file, 0, PAGE_SIZE); + } + + if (m_cube_size != cube.size()) { + std::printf("CubeStorage::allocate() ERROR: Cube size different than initialized"); + std::abort(); + } + + m_map->writeAt(m_alloc_seek, m_cube_size * sizeof(XYZ), cube.data()); + + auto fpos = m_alloc_seek; + m_prev_seek = m_alloc_seek; + m_alloc_seek += m_cube_size * sizeof(XYZ); + + return CubePtr(fpos); +} + +void CubeStorage::cancel_allocation() { + std::lock_guard lock(m_mtx); + // last allocation was mistake. + if (m_alloc_seek >= m_cube_size * sizeof(XYZ)) m_alloc_seek -= m_cube_size * sizeof(XYZ); + + // allocate() -> cancel_allocation() must be serialized: + assert(m_alloc_seek == m_prev_seek); +} + +Cube CubeStorage::read(const CubePtr& x) const { + // todo: How to speed up: + // Option 1: + // Memory-map the file in 2 MiB aligned chunks: + // This would speed up reading the same data multiple times. + // Chunk is mapped by rounding down the x.seek() to multiple of 2MiB + // and creating 2MiB sized mapping at that file offset. + // Caching the last file offset used we could detect + // when we have do do jump() to the next "reading window". + // -Plus: let the kernel do the caching for us. + // -Plus: no memory overhead. + // -Minus: if implemented with just single memory-map per CubeStorage + // threads can fight about what chunk is currently mapped. + // Option 2: + // Implement fine-grained read-cache with: + // std::unordered_map + // And begin evicting them once the cache is full using + // cache eviction policy. (E.g. least-recently-used LRU) + // The cache should be made to be thread local + // so it won't interfere with other workers. + // -Plus: We decide how much data to keep in memory + // -Plus: No need to remap the memory. + // -Minus: complicated to implement. + Cube tmp(m_cube_size); + m_map->readAt(x.seek(), m_cube_size * sizeof(XYZ), tmp.data()); + return tmp; +} + +void CubeStorage::discard() { + std::lock_guard lock(m_mtx); + + if (m_file) { + // avoid flushing any more data to disk: + m_map->discard(0, m_map->regionSize()); + m_map.reset(); + m_file->truncate(0); + m_file.reset(); + m_alloc_seek = 0; + + // Try remove the file created... + std::error_code ec; + auto stat = std::filesystem::status(m_fpath, ec); + if (!ec && std::filesystem::is_regular_file(stat)) { + if (!std::filesystem::remove(m_fpath, ec)) { + std::printf("WARN: failed to remove file: %s", m_fpath.c_str()); + } + } else { + std::printf("WARN: failed to get file status: %s", m_fpath.c_str()); + } + } +} + +Cube CubePtr::get(const CubeStorage& storage) const { + // CubePtr::get() is really just an convenience function... + // However this cannot be implemented in the header file because + // CubeStorage definition is not known. + return storage.read(*this); +} diff --git a/cpp/src/cubes.cpp b/cpp/src/cubes.cpp index 6b60085..89b4e12 100644 --- a/cpp/src/cubes.cpp +++ b/cpp/src/cubes.cpp @@ -2,13 +2,13 @@ #include #include +#include #include +#include #include #include #include #include -#include -#include #include "cube.hpp" #include "hashes.hpp" @@ -29,11 +29,7 @@ struct Workset { XYZ targetShape, shape, expandDim; bool notSameShape; Workset(Hashy &hashes, XYZ targetShape, XYZ shape, XYZ expandDim, bool notSameShape) - : hashes(hashes) - , targetShape(targetShape) - , shape(shape) - , expandDim(expandDim) - , notSameShape(notSameShape) {} + : hashes(hashes), targetShape(targetShape), shape(shape), expandDim(expandDim), notSameShape(notSameShape) {} void setRange(ShapeRange &data) { _begin_total = data.begin(); @@ -139,7 +135,7 @@ struct Workset { struct Worker { std::shared_ptr ws; int id; - int state = 3; // 1 == completed/waiting for job, 2 == processing, 3 == job assigned. + int state = 3; // 1 == completed/waiting for job, 2 == processing, 3 == job assigned. std::mutex mtx; std::condition_variable cond; std::condition_variable cond2; @@ -156,7 +152,7 @@ struct Worker { void launch(std::shared_ptr ws_) { std::unique_lock lock(mtx); - while(state > 1) { + while (state != 1) { cond2.wait(lock); } ws = ws_; @@ -166,7 +162,7 @@ struct Worker { void sync() { std::unique_lock lock(mtx); - while(state > 1) { + while (state != 1) { cond2.wait(lock); } ws.reset(); @@ -175,13 +171,11 @@ struct Worker { void run() { std::unique_lock lock(mtx); std::printf("thread nro. %d started.\n", id); - while(state) { + while (state) { state = 1; cond2.notify_one(); - while(state == 1) - cond.wait(lock); - if(!state) - return; + while (state == 1) cond.wait(lock); + if (!state) return; state = 2; // std::printf("start %d\n", id); auto subset = ws->getPart(); @@ -207,7 +201,7 @@ FlatCache gen(int n, int threads, bool use_cache, bool write_cache, bool split_c if (!std::filesystem::is_directory(base_path)) { std::filesystem::create_directory(base_path); } - Hashy hashes; + Hashy hashes(base_path); if (n < 1) return {}; else if (n == 1) { @@ -248,12 +242,13 @@ FlatCache gen(int n, int threads, bool use_cache, bool write_cache, bool split_c auto start = std::chrono::steady_clock::now(); uint32_t totalOutputShapes = hashes.numShapes(); uint32_t outShapeCount = 0; - auto prevShapes = Hashy::generateShapes(n - 1); - for (auto &tup : hashes) { + + for (const auto &tup : hashes) { outShapeCount++; XYZ targetShape = tup.first; std::printf("process output shape %3d/%d [%2d %2d %2d]\n\r", outShapeCount, totalOutputShapes, targetShape.x(), targetShape.y(), targetShape.z()); + for (uint32_t sid = 0; sid < prevShapes.size(); ++sid) { auto &shape = prevShapes[sid]; int diffx = targetShape.x() - shape.x(); @@ -289,7 +284,7 @@ FlatCache gen(int n, int threads, bool use_cache, bool write_cache, bool split_c ws->setRange(s); // Wait for jobs to complete. - for (auto& thr : workers) { + for (auto &thr : workers) { thr.sync(); } std::printf(" shape %d %d %d\n\r", shape.x(), shape.y(), shape.z()); @@ -297,25 +292,23 @@ FlatCache gen(int n, int threads, bool use_cache, bool write_cache, bool split_c // Because the workset is held by shared_ptr // main thread can do above preparation work in parallel // while the jobs are running. - for (auto& thr : workers) { + for (auto &thr : workers) { thr.launch(ws); } } // Wait for jobs to complete. - for (auto& thr : workers) { + for (auto &thr : workers) { thr.sync(); } std::printf(" num: %lu\n\r", hashes.at(targetShape).size()); totalSum += hashes.at(targetShape).size(); if (write_cache && split_cache) { cw.save(base_path + "cubes_" + std::to_string(n) + "_" + std::to_string(targetShape.x()) + "-" + std::to_string(targetShape.y()) + "-" + - std::to_string(targetShape.z()) + ".bin", - hashes, n); + std::to_string(targetShape.z()) + ".bin", + hashes, n); } if (split_cache) { - for (auto &subset : hashes.at(targetShape)) { - subset.clear(); - } + hashes.at(targetShape).clear(); } } diff --git a/cpp/src/newCache.cpp b/cpp/src/newCache.cpp index d54b057..cfb078a 100644 --- a/cpp/src/newCache.cpp +++ b/cpp/src/newCache.cpp @@ -1,4 +1,5 @@ #include "newCache.hpp" +#include "cubeSwapSet.hpp" #include @@ -201,11 +202,11 @@ void CacheWriter::save(std::string path, Hashy &hashes, uint8_t n) { auto xyz = std::make_shared>(file_, (*shapeEntry)[0].offset, num_cubes * n); auto put = xyz->get(); - auto copyrange = [n](CubeSet::iterator itr, CubeSet::iterator end, XYZ *dest) -> void { + auto copyrange = [n](const CubeStorage& storage, CubeSwapSet::iterator itr, CubeSwapSet::iterator end, XYZ *dest) -> void { while (itr != end) { static_assert(sizeof(XYZ) == XYZ_SIZE); - assert(itr->size() == n); - itr->copyout(n, dest); + assert(storage.cubeSize() == n); + itr->copyout(storage, n, dest); dest += n; ++itr; } @@ -233,14 +234,14 @@ void CacheWriter::save(std::string path, Hashy &hashes, uint8_t n) { std::flush(std::cout); std::lock_guard lock(m_mtx); - m_copy.emplace_back(std::bind(copyrange, start, itr, dest)); + m_copy.emplace_back(std::bind(copyrange, std::ref(subset.storage()), start, itr, dest)); ++m_num_copys; m_run.notify_all(); } // copy remainder, if any. if (dist) { std::lock_guard lock(m_mtx); - m_copy.emplace_back(std::bind(copyrange, itr, subset.end(), put)); + m_copy.emplace_back(std::bind(copyrange, std::ref(subset.storage()), itr, subset.end(), put)); ++m_num_copys; m_run.notify_all(); put += n * dist;