From e376757be5ec77e6c4dc94edc358f07dab92f957 Mon Sep 17 00:00:00 2001 From: Andrew Bell Date: Tue, 17 Nov 2020 10:19:06 -0500 Subject: [PATCH] Working code. --- epf/Cell.cpp | 6 ++---- epf/Epf.cpp | 13 ++++++------- epf/FileProcessor.cpp | 2 +- epf/FileProcessor.hpp | 2 +- epf/Writer.cpp | 3 +++ 5 files changed, 13 insertions(+), 13 deletions(-) diff --git a/epf/Cell.cpp b/epf/Cell.cpp index fa5a8cc..400493a 100644 --- a/epf/Cell.cpp +++ b/epf/Cell.cpp @@ -24,9 +24,7 @@ void Cell::initialize() m_buf = m_writer->bufferCache().fetch(); m_pos = m_buf->data(); - // The end position is one less than the buffer size to allow for - // speculative writes into the cell. See FileProcessor for details. - m_endPos = m_pos + m_pointSize * ((BufSize / m_pointSize) - 1); + m_endPos = m_pos + m_pointSize * (BufSize / m_pointSize); } void Cell::write() @@ -43,7 +41,7 @@ void Cell::write() void Cell::advance() { m_pos += m_pointSize; - if (m_pos > m_endPos) + if (m_pos >= m_endPos) { write(); initialize(); diff --git a/epf/Epf.cpp b/epf/Epf.cpp index 298d4f7..8ad9a05 100644 --- a/epf/Epf.cpp +++ b/epf/Epf.cpp @@ -155,8 +155,6 @@ void Epf::run(const std::vector& options) // Make a writer with 4 threads. m_writer.reset(new Writer(m_outputDir, 4)); - std::vector> processors; - // Sort file infos so the largest files come first. This helps to make sure we don't delay // processing big files that take the longest (use threads more efficiently). std::sort(fileInfos.begin(), fileInfos.end(), [](const FileInfo& f1, const FileInfo& f2) @@ -164,11 +162,12 @@ void Epf::run(const std::vector& options) // Add the files to the processing pool for (const FileInfo& fi : fileInfos) { - std::unique_ptr fp( - new FileProcessor(fi, layout->pointSize(), m_grid, m_writer.get())); - std::function processor = std::bind(&FileProcessor::operator(), fp.get()); - m_pool.add(processor); - processors.push_back(std::move(fp)); + int pointSize = layout->pointSize(); + m_pool.add([&fi, pointSize, this]() + { + FileProcessor fp(fi, pointSize, m_grid, m_writer.get()); + fp.run(); + }); } // Wait for all the processors to finish and restart. diff --git a/epf/FileProcessor.cpp b/epf/FileProcessor.cpp index d045301..ac720c9 100644 --- a/epf/FileProcessor.cpp +++ b/epf/FileProcessor.cpp @@ -30,7 +30,7 @@ FileProcessor::FileProcessor(const FileInfo& fi, size_t pointSize, const Grid& g m_fi(fi), m_cellMgr(pointSize, writer), m_grid(grid), m_cnt(++m_totalCnt) {} -void FileProcessor::operator()() +void FileProcessor::run() { Options opts; opts.add("filename", m_fi.filename); diff --git a/epf/FileProcessor.hpp b/epf/FileProcessor.hpp index a25b714..266a8c6 100644 --- a/epf/FileProcessor.hpp +++ b/epf/FileProcessor.hpp @@ -29,7 +29,7 @@ class FileProcessor FileProcessor(const FileInfo& fi, size_t pointSize, const Grid& grid, Writer *writer); Cell *getCell(const VoxelKey& key); - void operator()(); + void run(); private: FileInfo m_fi; diff --git a/epf/Writer.cpp b/epf/Writer.cpp index a96e0c3..a109420 100644 --- a/epf/Writer.cpp +++ b/epf/Writer.cpp @@ -114,6 +114,9 @@ void Writer::run() // Remove the key from the active key list. std::ofstream out(path(wd.key), std::ios::app | std::ios::binary); out.write(reinterpret_cast(wd.data->data()), wd.data->size()); + out.close(); + if (!out) + throw Error("Failure writing to '" + path(wd.key) + "'."); m_bufferCache.replace(std::move(wd.data)); std::lock_guard lock(m_mutex);