diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 68344fe..ad05812 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -71,6 +71,7 @@ add_library(CubeObjs OBJECT
     "src/cubes.cpp"
     "src/rotations.cpp"
     "src/newCache.cpp"
+    "src/cubeSwapSet.cpp"
 )
 ConfigureTarget(CubeObjs)
diff --git a/cpp/include/cubeSwapSet.hpp b/cpp/include/cubeSwapSet.hpp
new file mode 100644
index 0000000..b7a2e5f
--- /dev/null
+++ b/cpp/include/cubeSwapSet.hpp
@@ -0,0 +1,178 @@
+#pragma once
+#ifndef OPENCUBES_CUBE_DISKSWAP_SET_HPP
+#define OPENCUBES_CUBE_DISKSWAP_SET_HPP
+
+#include <algorithm>
+#include <atomic>
+#include <filesystem>
+#include <memory>
+#include <mutex>
+#include <unordered_set>
+
+#include "cube.hpp"
+#include "mapped_file.hpp"
+
+/**
+ * Implement std::unordered_set<> that stores element data in a file.
+ *
+ * Cubes stored with size N in the set have constant cost of RAM memory:
+ * Only the std::unordered_set<> itself and the internal nodes are stored in RAM.
+ * The element *data* (i.e. XYZ data) is stored in the file.
+ * The performance cost is that each time the element is accessed
+ * the data has to be read back from the file.
+ * (Iterating the entire CubeSwapSet involves reading the entire backing file)
+ *
+ * Clearing the CubeSwapSet does not release the backing file space managed by CubeStorage.
+ * Call to CubeStorage::discard() is required after clearing or destructing
+ * the CubeSwapSet instance to cleanup the file.
+ * Elements cannot be removed one-by-one.
+ */
+class CubeStorage;
+
+/**
+ * Overlay that reads the cube data from the backing file.
+ * CubePtr needs its associated CubeStorage instance to be able to
+ * access its contents with CubePtr::get()
+ * The associated CubeStorage owning the CubePtr
+ * should always be available where CubePtr is used.
+ */
+class CubePtr {
+   protected:
+    mapped::seekoff_t m_seek = 0;
+
+   public:
+    explicit CubePtr(mapped::seekoff_t offset) : m_seek(offset) {}
+    CubePtr(const CubePtr& c) : m_seek(c.m_seek) {}
+
+    /**
+     * Get the Cube pointed by this instance.
+     */
+    Cube get(const CubeStorage& storage) const;
+
+    template <typename Itr>
+    void copyout(const CubeStorage& storage, size_t n, Itr out) const {
+        auto tmp = get(storage);
+        std::copy_n(tmp.begin(), n, out);
+    }
+
+    mapped::seekoff_t seek() const { return m_seek; }
+};
+
+/**
+ * Stateful comparator for CubePtr
+ */
+class CubePtrEqual {
+   protected:
+    const CubeStorage* m_storage = nullptr;
+
+   public:
+    // C++20 feature:
+    using is_transparent = void;
+
+    CubePtrEqual(const CubeStorage* ctx) : m_storage(ctx) {}
+    CubePtrEqual(const CubePtrEqual& ctx) : m_storage(ctx.m_storage) {}
+
+    bool operator()(const CubePtr& a, const CubePtr& b) const { return a.get(*m_storage) == b.get(*m_storage); }
+
+    bool operator()(const Cube& a, const CubePtr& b) const { return a == b.get(*m_storage); }
+
+    bool operator()(const CubePtr& a, const Cube& b) const { return a.get(*m_storage) == b; }
+};
+
+class CubePtrHash {
+   protected:
+    const CubeStorage* m_storage = nullptr;
+
+   public:
+    // C++20 feature:
+    using is_transparent = void;
+    using transparent_key_equal = CubePtrEqual;
+
+    CubePtrHash(const CubeStorage* ctx) : m_storage(ctx) {}
+    CubePtrHash(const CubePtrHash& ctx) : m_storage(ctx.m_storage) {}
+
+    size_t operator()(const Cube& x) const {
+        std::size_t seed = x.size();
+        for (auto& p : x) {
+            auto x = HashXYZ()(p);
+            seed ^= x + 0x9e3779b9 + (seed << 6) + (seed >> 2);
+        }
+        return seed;
+    }
+
+    size_t operator()(const CubePtr& x) const {
+        auto cube = x.get(*m_storage);
+        std::size_t seed = cube.size();
+        for (auto& p : cube) {
+            auto x = HashXYZ()(p);
+            seed ^= x + 0x9e3779b9 + (seed << 6) + (seed >> 2);
+        }
+        return seed;
+    }
+};
+
+class CubeStorage {
+   protected:
+    std::mutex m_mtx;
+    std::filesystem::path m_fpath;
+    std::shared_ptr<mapped::file> m_file;
+    std::unique_ptr<mapped::region> m_map;
+
+    static std::atomic<int> m_init_num;
+    const size_t m_cube_size;
+    mapped::seekoff_t m_prev_seek = 0;
+    mapped::seekoff_t m_alloc_seek = 0;
+
+   public:
+    /**
+     * Initialize Cube file storage
+     * @param path directory
where to store the backing file. + * @param n The storage is reserved in n sized chunks. + * This should be equal to Cube::size() that are passed into allocate() + * as no other allocation size is supported. + * @note the backing file creation is delayed until allocate() is called first time. + */ + CubeStorage(std::filesystem::path path, size_t n); + ~CubeStorage(); + + // not copyable + CubeStorage(const CubeStorage&) = delete; + CubeStorage& operator=(const CubeStorage&) = delete; + // move constructible: but only if no allocations exists + CubeStorage(CubeStorage&& mv); + CubeStorage& operator=(CubeStorage&& mv) = delete; + + size_t cubeSize() const { return m_cube_size; } + + /** + * Store Cube data into the backing file. + * Returns CubePtr that can be inserted into CubeSwapSet. + * @note cube.size() must be equal to this->cubeSize() + */ + CubePtr allocate(const Cube& cube); + + /** + * Revert the effect of last allocate() + */ + void cancel_allocation(); + + /** + * Retrieve the cube data from the backing file. + */ + Cube read(const CubePtr& x) const; + + /** + * Drop all stored data. + * Shrinks the backing file to zero size and deletes it. + */ + void discard(); +}; + +/** + * CubeStorage enabled std::unordered_set<> + * + * The CubeSwapSet must be constructed with already initialized + * stateful instances of CubePtrEqual and CubePtrHash functors + * that resolve the CubePtr instance using the CubeStorage instance. 
+ */ +using CubeSwapSet = std::unordered_set; + +#endif \ No newline at end of file diff --git a/cpp/include/hashes.hpp b/cpp/include/hashes.hpp index 7234bd3..79bedfb 100644 --- a/cpp/include/hashes.hpp +++ b/cpp/include/hashes.hpp @@ -3,12 +3,15 @@ #define OPENCUBES_HASHES_HPP #include #include +#include +#include #include #include #include #include #include "cube.hpp" +#include "cubeSwapSet.hpp" #include "utils.hpp" struct HashCube { @@ -27,24 +30,30 @@ using CubeSet = std::unordered_set>; class Subsubhashy { protected: - CubeSet set; + CubeStorage set_storage; + CubeSwapSet set; mutable std::shared_mutex set_mutex; public: + explicit Subsubhashy(std::filesystem::path path, size_t n) : set_storage(path, n), set(1, CubePtrHash(&set_storage), CubePtrEqual(&set_storage)) {} + template void insert(CubeT &&c) { std::lock_guard lock(set_mutex); - set.emplace(std::forward(c)); + auto [itr, isnew] = set.emplace(set_storage.allocate(std::forward(c))); + if (!isnew) { + set_storage.cancel_allocation(); + } } +#if __cplusplus > 201703L +// todo: need C++17 equivalent for *generic* +// contains() or find() that accepts both Cube and CubePtr types bool contains(const Cube &c) const { std::shared_lock lock(set_mutex); - auto itr = set.find(c); - if (itr != set.end()) { - return true; - } - return false; + return set.contains(c); } +#endif auto size() const { std::shared_lock lock(set_mutex); @@ -57,27 +66,45 @@ class Subsubhashy { set.reserve(1); } + // Get CubeStorage instance. + // [this->begin(), this->end()] iterated CubePtr's + // Can be resolved with CubePtr::get(this->storage()) + // that returns copy of the data as Cube. 
+ const CubeStorage &storage() const { return set_storage; } + auto begin() const { return set.begin(); } auto end() const { return set.end(); } auto begin() { return set.begin(); } auto end() { return set.end(); } }; -template class Subhashy { protected: - std::array byhash; + std::deque byhash; public: + Subhashy(int NUM, size_t N, std::filesystem::path path) { + for (int i = 0; i < NUM; ++i) { + byhash.emplace_back(path, N); + } + } + template void insert(CubeT &&c) { HashCube hash; - auto idx = hash(c) % NUM; + auto idx = hash(c) % byhash.size(); auto &set = byhash[idx]; - if (!set.contains(c)) set.insert(std::forward(c)); +#if __cplusplus > 201703L + if (set.contains(c)) return; +#endif + set.insert(std::forward(c)); // printf("new size %ld\n\r", byshape[shape].size()); } + void clear() { + for (auto &set : byhash) set.clear(); + } + auto size() const { size_t sum = 0; for (auto &set : byhash) { @@ -95,7 +122,9 @@ class Subhashy { class Hashy { protected: - std::map> byshape; + std::map byshape; + std::filesystem::path base_path; + int N; mutable std::shared_mutex set_mutex; public: @@ -111,24 +140,41 @@ class Hashy { return out; } + explicit Hashy(std::string path = ".") : base_path(path) {} + void init(int n) { // create all subhashy which will be needed for N - std::lock_guard lock(set_mutex); - for (auto s : generateShapes(n)) byshape[s].size(); + N = n; + for (auto s : generateShapes(n)) { + initSubHashy(n, s); + } std::printf("%ld sets by shape for N=%d\n\r", byshape.size(), n); } - Subhashy<32> &at(XYZ shape) { + Subhashy &initSubHashy(int n, XYZ s) { + assert(N == n); + + auto itr = byshape.find(s); + if (itr == byshape.end()) { + auto [itr, isnew] = byshape.emplace(s, Subhashy(32, n, base_path)); + assert(isnew); + itr->second.size(); + return itr->second; + } else { + return itr->second; + } + } + + Subhashy &at(XYZ shape) { std::shared_lock lock(set_mutex); auto itr = byshape.find(shape); if (itr != byshape.end()) { return itr->second; } - 
lock.unlock(); - // Not sure if this is supposed to happen normally - // if init() creates all subhashys required. - std::lock_guard elock(set_mutex); - return byshape[shape]; + // should never get here... + std::printf("BUG: missing shape [%2d %2d %2d]:\n\r", shape.x(), shape.y(), shape.z()); + std::abort(); + return *((Subhashy *)0); } template diff --git a/cpp/include/newCache.hpp b/cpp/include/newCache.hpp index 1d62940..b9705ce 100644 --- a/cpp/include/newCache.hpp +++ b/cpp/include/newCache.hpp @@ -167,9 +167,8 @@ class FlatCache : public ICache { for (auto& [shape, set] : hashes) { auto begin = allXYZs.data() + allXYZs.size(); for (auto& subset : set) { - for (auto& cube : subset) - // allXYZs.emplace_back(allXYZs.end(), subset.set.begin(), subset.set.end()); - std::copy(cube.begin(), cube.end(), std::back_inserter(allXYZs)); + for (auto& cubeptr : subset) + cubeptr.copyout(subset.storage(), n, std::back_inserter(allXYZs)); } auto end = allXYZs.data() + allXYZs.size(); // std::printf(" SR %p %p\n", (void*)begin, (void*)end); diff --git a/cpp/src/cubeSwapSet.cpp b/cpp/src/cubeSwapSet.cpp new file mode 100644 index 0000000..1f391ff --- /dev/null +++ b/cpp/src/cubeSwapSet.cpp @@ -0,0 +1,121 @@ +#include "cubeSwapSet.hpp" + +#include + +std::atomic CubeStorage::m_init_num(0); + +CubeStorage::CubeStorage(std::filesystem::path path, size_t n) : m_cube_size(n) { + // Generate file name: + m_fpath = path / ("storage_" + std::to_string(m_init_num.fetch_add(1)) + ".bin"); +} + +CubeStorage::~CubeStorage() { discard(); } + +CubeStorage::CubeStorage(CubeStorage&& mv) + : m_fpath(std::move(mv.m_fpath)), m_file(std::move(mv.m_file)), m_map(std::move(mv.m_map)), m_cube_size(mv.m_cube_size), m_alloc_seek(mv.m_alloc_seek) { + // no allocations can exist in the moved from object: + assert(m_alloc_seek == 0); +} + +CubePtr CubeStorage::allocate(const Cube& cube) { + std::lock_guard lock(m_mtx); + + if (!m_file) { + using namespace mapped; + // file not open yet. 
+        m_file = std::make_shared<file>();
+        if (m_file->openrw(m_fpath.c_str(), 0, file::CREATE | file::RESIZE | file::FSTUNE)) {
+            std::printf("CubeStorage::allocate() ERROR: Failed to create backing file: %s\n", m_fpath.c_str());
+            std::abort();
+        }
+        // Map some data.
+        // todo: mapped::file could provide following:
+        //   m_file->readAt(offset,size,datain)
+        //   m_file->writeAt(offset,size,dataout)
+        // so that we don't need this mapping for I/O.
+        // However the mapped::region::readAt() will be faster if
+        // the area fits in the region window and is accessed multiple times.
+        m_map = std::make_unique<region>(m_file, 0, PAGE_SIZE);
+    }
+
+    if (m_cube_size != cube.size()) {
+        std::printf("CubeStorage::allocate() ERROR: Cube size different than initialized");
+        std::abort();
+    }
+
+    m_map->writeAt(m_alloc_seek, m_cube_size * sizeof(XYZ), cube.data());
+
+    auto fpos = m_alloc_seek;
+    m_prev_seek = m_alloc_seek;
+    m_alloc_seek += m_cube_size * sizeof(XYZ);
+
+    return CubePtr(fpos);
+}
+
+void CubeStorage::cancel_allocation() {
+    std::lock_guard lock(m_mtx);
+    // last allocation was mistake.
+    if (m_alloc_seek >= m_cube_size * sizeof(XYZ)) m_alloc_seek -= m_cube_size * sizeof(XYZ);
+
+    // allocate() -> cancel_allocation() must be serialized:
+    assert(m_alloc_seek == m_prev_seek);
+}
+
+Cube CubeStorage::read(const CubePtr& x) const {
+    // todo: How to speed up:
+    // Option 1:
+    // Memory-map the file in 2 MiB aligned chunks:
+    // This would speed up reading the same data multiple times.
+    // Chunk is mapped by rounding down the x.seek() to multiple of 2MiB
+    // and creating 2MiB sized mapping at that file offset.
+    // Caching the last file offset used we could detect
+    // when we have to do jump() to the next "reading window".
+    // -Plus: let the kernel do the caching for us.
+    // -Plus: no memory overhead.
+    // -Minus: if implemented with just single memory-map per CubeStorage
+    //         threads can fight about what chunk is currently mapped.
+ // Option 2: + // Implement fine-grained read-cache with: + // std::unordered_map + // And begin evicting them once the cache is full using + // cache eviction policy. (E.g. least-recently-used LRU) + // The cache should be made to be thread local + // so it won't interfere with other workers. + // -Plus: We decide how much data to keep in memory + // -Plus: No need to remap the memory. + // -Minus: complicated to implement. + Cube tmp(m_cube_size); + m_map->readAt(x.seek(), m_cube_size * sizeof(XYZ), tmp.data()); + return tmp; +} + +void CubeStorage::discard() { + std::lock_guard lock(m_mtx); + + if (m_file) { + // avoid flushing any more data to disk: + m_map->discard(0, m_map->regionSize()); + m_map.reset(); + m_file->truncate(0); + m_file.reset(); + m_alloc_seek = 0; + + // Try remove the file created... + std::error_code ec; + auto stat = std::filesystem::status(m_fpath, ec); + if (!ec && std::filesystem::is_regular_file(stat)) { + if (!std::filesystem::remove(m_fpath, ec)) { + std::printf("WARN: failed to remove file: %s", m_fpath.c_str()); + } + } else { + std::printf("WARN: failed to get file status: %s", m_fpath.c_str()); + } + } +} + +Cube CubePtr::get(const CubeStorage& storage) const { + // CubePtr::get() is really just an convenience function... + // However this cannot be implemented in the header file because + // CubeStorage definition is not known. 
+ return storage.read(*this); +} diff --git a/cpp/src/cubes.cpp b/cpp/src/cubes.cpp index 6b60085..89b4e12 100644 --- a/cpp/src/cubes.cpp +++ b/cpp/src/cubes.cpp @@ -2,13 +2,13 @@ #include #include +#include #include +#include #include #include #include #include -#include -#include #include "cube.hpp" #include "hashes.hpp" @@ -29,11 +29,7 @@ struct Workset { XYZ targetShape, shape, expandDim; bool notSameShape; Workset(Hashy &hashes, XYZ targetShape, XYZ shape, XYZ expandDim, bool notSameShape) - : hashes(hashes) - , targetShape(targetShape) - , shape(shape) - , expandDim(expandDim) - , notSameShape(notSameShape) {} + : hashes(hashes), targetShape(targetShape), shape(shape), expandDim(expandDim), notSameShape(notSameShape) {} void setRange(ShapeRange &data) { _begin_total = data.begin(); @@ -139,7 +135,7 @@ struct Workset { struct Worker { std::shared_ptr ws; int id; - int state = 3; // 1 == completed/waiting for job, 2 == processing, 3 == job assigned. + int state = 3; // 1 == completed/waiting for job, 2 == processing, 3 == job assigned. std::mutex mtx; std::condition_variable cond; std::condition_variable cond2; @@ -156,7 +152,7 @@ struct Worker { void launch(std::shared_ptr ws_) { std::unique_lock lock(mtx); - while(state > 1) { + while (state != 1) { cond2.wait(lock); } ws = ws_; @@ -166,7 +162,7 @@ struct Worker { void sync() { std::unique_lock lock(mtx); - while(state > 1) { + while (state != 1) { cond2.wait(lock); } ws.reset(); @@ -175,13 +171,11 @@ struct Worker { void run() { std::unique_lock lock(mtx); std::printf("thread nro. 
%d started.\n", id); - while(state) { + while (state) { state = 1; cond2.notify_one(); - while(state == 1) - cond.wait(lock); - if(!state) - return; + while (state == 1) cond.wait(lock); + if (!state) return; state = 2; // std::printf("start %d\n", id); auto subset = ws->getPart(); @@ -207,7 +201,7 @@ FlatCache gen(int n, int threads, bool use_cache, bool write_cache, bool split_c if (!std::filesystem::is_directory(base_path)) { std::filesystem::create_directory(base_path); } - Hashy hashes; + Hashy hashes(base_path); if (n < 1) return {}; else if (n == 1) { @@ -248,12 +242,13 @@ FlatCache gen(int n, int threads, bool use_cache, bool write_cache, bool split_c auto start = std::chrono::steady_clock::now(); uint32_t totalOutputShapes = hashes.numShapes(); uint32_t outShapeCount = 0; - auto prevShapes = Hashy::generateShapes(n - 1); - for (auto &tup : hashes) { + + for (const auto &tup : hashes) { outShapeCount++; XYZ targetShape = tup.first; std::printf("process output shape %3d/%d [%2d %2d %2d]\n\r", outShapeCount, totalOutputShapes, targetShape.x(), targetShape.y(), targetShape.z()); + for (uint32_t sid = 0; sid < prevShapes.size(); ++sid) { auto &shape = prevShapes[sid]; int diffx = targetShape.x() - shape.x(); @@ -289,7 +284,7 @@ FlatCache gen(int n, int threads, bool use_cache, bool write_cache, bool split_c ws->setRange(s); // Wait for jobs to complete. - for (auto& thr : workers) { + for (auto &thr : workers) { thr.sync(); } std::printf(" shape %d %d %d\n\r", shape.x(), shape.y(), shape.z()); @@ -297,25 +292,23 @@ FlatCache gen(int n, int threads, bool use_cache, bool write_cache, bool split_c // Because the workset is held by shared_ptr // main thread can do above preparation work in parallel // while the jobs are running. - for (auto& thr : workers) { + for (auto &thr : workers) { thr.launch(ws); } } // Wait for jobs to complete. 
- for (auto& thr : workers) { + for (auto &thr : workers) { thr.sync(); } std::printf(" num: %lu\n\r", hashes.at(targetShape).size()); totalSum += hashes.at(targetShape).size(); if (write_cache && split_cache) { cw.save(base_path + "cubes_" + std::to_string(n) + "_" + std::to_string(targetShape.x()) + "-" + std::to_string(targetShape.y()) + "-" + - std::to_string(targetShape.z()) + ".bin", - hashes, n); + std::to_string(targetShape.z()) + ".bin", + hashes, n); } if (split_cache) { - for (auto &subset : hashes.at(targetShape)) { - subset.clear(); - } + hashes.at(targetShape).clear(); } } diff --git a/cpp/src/newCache.cpp b/cpp/src/newCache.cpp index d54b057..cfb078a 100644 --- a/cpp/src/newCache.cpp +++ b/cpp/src/newCache.cpp @@ -1,4 +1,5 @@ #include "newCache.hpp" +#include "cubeSwapSet.hpp" #include @@ -201,11 +202,11 @@ void CacheWriter::save(std::string path, Hashy &hashes, uint8_t n) { auto xyz = std::make_shared>(file_, (*shapeEntry)[0].offset, num_cubes * n); auto put = xyz->get(); - auto copyrange = [n](CubeSet::iterator itr, CubeSet::iterator end, XYZ *dest) -> void { + auto copyrange = [n](const CubeStorage& storage, CubeSwapSet::iterator itr, CubeSwapSet::iterator end, XYZ *dest) -> void { while (itr != end) { static_assert(sizeof(XYZ) == XYZ_SIZE); - assert(itr->size() == n); - itr->copyout(n, dest); + assert(storage.cubeSize() == n); + itr->copyout(storage, n, dest); dest += n; ++itr; } @@ -233,14 +234,14 @@ void CacheWriter::save(std::string path, Hashy &hashes, uint8_t n) { std::flush(std::cout); std::lock_guard lock(m_mtx); - m_copy.emplace_back(std::bind(copyrange, start, itr, dest)); + m_copy.emplace_back(std::bind(copyrange, std::ref(subset.storage()), start, itr, dest)); ++m_num_copys; m_run.notify_all(); } // copy remainder, if any. 
if (dist) { std::lock_guard lock(m_mtx); - m_copy.emplace_back(std::bind(copyrange, itr, subset.end(), put)); + m_copy.emplace_back(std::bind(copyrange, std::ref(subset.storage()), itr, subset.end(), put)); ++m_num_copys; m_run.notify_all(); put += n * dist;