Skip to content

Commit

Permalink
Track ColumnChunk allocations through the BufferManager
Browse files Browse the repository at this point in the history
  • Loading branch information
benjaminwinger committed Sep 5, 2024
1 parent d18721d commit 510fa03
Show file tree
Hide file tree
Showing 84 changed files with 875 additions and 677 deletions.
21 changes: 12 additions & 9 deletions src/graph/on_disk_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "common/vector/value_vector.h"
#include "graph/graph.h"
#include "main/client_context.h"
#include "storage/buffer_manager/memory_manager.h"
#include "storage/storage_manager.h"
#include "storage/store/rel_table.h"

Expand All @@ -17,23 +18,25 @@ using namespace kuzu::common;
namespace kuzu {
namespace graph {

static std::unique_ptr<RelTableScanState> getRelScanState(RelDataDirection direction,
ValueVector* srcVector, ValueVector* dstVector) {
static std::unique_ptr<RelTableScanState> getRelScanState(MemoryManager& memoryManager,

Check warning on line 21 in src/graph/on_disk_graph.cpp

View check run for this annotation

Codecov / codecov/patch

src/graph/on_disk_graph.cpp#L21

Added line #L21 was not covered by tests
RelDataDirection direction, ValueVector* srcVector, ValueVector* dstVector) {
// Empty columnIDs since we do not scan any rel property.
auto columnIDs = std::vector<column_id_t>{};
auto columns = std::vector<Column*>{};
// TODO(Guodong): FIX-ME.
auto scanState =
std::make_unique<RelTableScanState>(columnIDs, columns, nullptr, nullptr, direction);
auto scanState = std::make_unique<RelTableScanState>(memoryManager, columnIDs, columns, nullptr,
nullptr, direction);

Check warning on line 28 in src/graph/on_disk_graph.cpp

View check run for this annotation

Codecov / codecov/patch

src/graph/on_disk_graph.cpp#L27-L28

Added lines #L27 - L28 were not covered by tests
scanState->boundNodeIDVector = srcVector;
scanState->outputVectors.push_back(dstVector);
return scanState;
}

OnDiskGraphScanState::OnDiskGraphScanState(ValueVector* srcNodeIDVector,
ValueVector* dstNodeIDVector) {
fwdScanState = getRelScanState(RelDataDirection::FWD, srcNodeIDVector, dstNodeIDVector);
bwdScanState = getRelScanState(RelDataDirection::BWD, srcNodeIDVector, dstNodeIDVector);
OnDiskGraphScanState::OnDiskGraphScanState(MemoryManager& memoryManager,
ValueVector* srcNodeIDVector, ValueVector* dstNodeIDVector) {

Check warning on line 35 in src/graph/on_disk_graph.cpp

View check run for this annotation

Codecov / codecov/patch

src/graph/on_disk_graph.cpp#L34-L35

Added lines #L34 - L35 were not covered by tests
fwdScanState =
getRelScanState(memoryManager, RelDataDirection::FWD, srcNodeIDVector, dstNodeIDVector);

Check warning on line 37 in src/graph/on_disk_graph.cpp

View check run for this annotation

Codecov / codecov/patch

src/graph/on_disk_graph.cpp#L37

Added line #L37 was not covered by tests
bwdScanState =
getRelScanState(memoryManager, RelDataDirection::BWD, srcNodeIDVector, dstNodeIDVector);

Check warning on line 39 in src/graph/on_disk_graph.cpp

View check run for this annotation

Codecov / codecov/patch

src/graph/on_disk_graph.cpp#L39

Added line #L39 was not covered by tests
}

OnDiskGraphScanStates::OnDiskGraphScanStates(std::span<table_id_t> tableIDs, MemoryManager* mm) {
Expand All @@ -47,7 +50,7 @@ OnDiskGraphScanStates::OnDiskGraphScanStates(std::span<table_id_t> tableIDs, Mem

for (auto tableID : tableIDs) {
scanStates.emplace_back(std::make_pair(tableID,
OnDiskGraphScanState(srcNodeIDVector.get(), dstNodeIDVector.get())));
OnDiskGraphScanState(*mm, srcNodeIDVector.get(), dstNodeIDVector.get())));

Check warning on line 53 in src/graph/on_disk_graph.cpp

View check run for this annotation

Codecov / codecov/patch

src/graph/on_disk_graph.cpp#L53

Added line #L53 was not covered by tests
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ struct BufferPoolConstants {
#else
static constexpr uint64_t DEFAULT_VM_REGION_MAX_SIZE = static_cast<uint64_t>(1) << 43; // (8TB)
#endif
static constexpr uint64_t DEFAULT_BUFFER_POOL_SIZE_FOR_TESTING = 1ull << 26; // (64MB)
static constexpr uint64_t DEFAULT_BUFFER_POOL_SIZE_FOR_TESTING = 1ull << 28; // (256MB)
};

struct StorageConstants {
Expand Down
12 changes: 12 additions & 0 deletions src/include/common/serializer/deserializer.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
Expand Down Expand Up @@ -86,6 +87,17 @@ class Deserializer {
}
}

/**
 * Deserializes a vector of owning pointers.
 *
 * Reads a uint64_t element count via deserializeValue, resizes `values` to that count, and
 * then fills every slot, in order, with the result of calling `deserializeFunc` on this
 * deserializer.
 *
 * @param values Output vector; resized to the element count and overwritten slot by slot.
 * @param deserializeFunc Callback invoked once per element with this deserializer; its return
 *        value becomes the element stored in the corresponding slot.
 */
template<typename T>
void deserializeVectorOfPtrs(std::vector<std::unique_ptr<T>>& values,
    std::function<std::unique_ptr<T>(Deserializer&)> deserializeFunc) {
    uint64_t vectorSize = 0;
    deserializeValue(vectorSize);
    values.resize(vectorSize);
    // Walk the elements directly instead of using a 32-bit `unsigned` index: an index narrower
    // than the 64-bit count would wrap around before reaching counts above UINT_MAX and the loop
    // would never terminate.
    for (auto& value : values) {
        value = deserializeFunc(*this);
    }
}

template<typename T>
void deserializeUnorderedSet(std::unordered_set<T>& values) {
uint64_t setSize;
Expand Down
8 changes: 5 additions & 3 deletions src/include/graph/on_disk_graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,21 @@
#include "common/vector/value_vector.h"
#include "graph.h"
#include "graph_entry.h"
#include "storage/buffer_manager/memory_manager.h"
#include "storage/store/node_table.h"
#include "storage/store/rel_table.h"

namespace kuzu {
namespace storage {
class MemoryManager;
}
namespace graph {

struct OnDiskGraphScanState {
std::unique_ptr<storage::RelTableScanState> fwdScanState;
std::unique_ptr<storage::RelTableScanState> bwdScanState;

explicit OnDiskGraphScanState(common::ValueVector* srcNodeIDVector,
common::ValueVector* dstNodeIDVector);
explicit OnDiskGraphScanState(storage::MemoryManager& memoryManager,
common::ValueVector* srcNodeIDVector, common::ValueVector* dstNodeIDVector);
};

class OnDiskGraphScanStates : public GraphScanState {
Expand Down
13 changes: 10 additions & 3 deletions src/include/processor/operator/partitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ namespace kuzu {
namespace storage {
class NodeTable;
class RelTable;
class MemoryManager;
} // namespace storage

namespace transaction {
class Transaction;
}
namespace processor {

using partitioner_func_t =
Expand Down Expand Up @@ -39,6 +42,10 @@ struct PartitionerSharedState {
storage::NodeTable* srcNodeTable;
storage::NodeTable* dstNodeTable;
storage::RelTable* relTable;
storage::MemoryManager& mm;

explicit PartitionerSharedState(storage::MemoryManager& mm)
: mtx{}, srcNodeTable{nullptr}, dstNodeTable{nullptr}, mm{mm} {}

// FIXME(Guodong): we should not maintain maxNodeOffsets.
std::vector<common::offset_t> maxNodeOffsets; // max node offset in each direction.
Expand Down Expand Up @@ -159,8 +166,8 @@ class Partitioner final : public Sink {
const std::shared_ptr<common::DataChunkState>& state) const;
// TODO: For now, RelBatchInsert will guarantee all data are inside one data chunk. Should be
// generalized to resultSet later if needed.
void copyDataToPartitions(common::partition_idx_t partitioningIdx,
common::DataChunk chunkToCopyFrom) const;
void copyDataToPartitions(storage::MemoryManager& memoryManager,
common::partition_idx_t partitioningIdx, common::DataChunk chunkToCopyFrom) const;

private:
PartitionerDataInfo dataInfo;
Expand Down
10 changes: 7 additions & 3 deletions src/include/processor/operator/persistent/batch_insert.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
#include "storage/store/table.h"

namespace kuzu {
namespace storage {
class MemoryManager;
}
namespace processor {

struct BatchInsertInfo {
Expand Down Expand Up @@ -39,17 +42,18 @@ struct BatchInsertSharedState {
storage::Table* table;
std::shared_ptr<FactorizedTable> fTable;
storage::WAL* wal;
storage::MemoryManager* mm;

BatchInsertSharedState(storage::Table* table, std::shared_ptr<FactorizedTable> fTable,
storage::WAL* wal)
storage::WAL* wal, storage::MemoryManager* mm)
: numRows{0}, numErroredRows(std::make_shared<common::row_idx_t>(0)), table{table},
fTable{std::move(fTable)}, wal{wal} {};
fTable{std::move(fTable)}, wal{wal}, mm{mm} {};
BatchInsertSharedState(const BatchInsertSharedState& other) = delete;

virtual ~BatchInsertSharedState() = default;

std::unique_ptr<BatchInsertSharedState> copy() const {
auto result = std::make_unique<BatchInsertSharedState>(table, fTable, wal);
auto result = std::make_unique<BatchInsertSharedState>(table, fTable, wal, mm);
result->numRows.store(numRows.load());
return result;
}
Expand Down
17 changes: 11 additions & 6 deletions src/include/processor/operator/persistent/node_batch_insert.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
#include "storage/store/chunked_node_group.h"

namespace kuzu {
namespace storage {
class MemoryManager;
}
namespace transaction {
class Transaction;
} // namespace transaction
Expand Down Expand Up @@ -71,8 +74,9 @@ struct NodeBatchInsertSharedState final : BatchInsertSharedState {
std::unique_ptr<storage::ChunkedNodeGroup> sharedNodeGroup;

NodeBatchInsertSharedState(storage::Table* table, common::column_id_t pkColumnID,
common::LogicalType pkType, std::shared_ptr<FactorizedTable> fTable, storage::WAL* wal)
: BatchInsertSharedState{table, std::move(fTable), wal}, pkColumnID{pkColumnID},
common::LogicalType pkType, std::shared_ptr<FactorizedTable> fTable, storage::WAL* wal,
storage::MemoryManager* mm)
: BatchInsertSharedState{table, std::move(fTable), wal, mm}, pkColumnID{pkColumnID},
pkType{std::move(pkType)}, readerSharedState{nullptr}, distinctSharedState{nullptr},
sharedNodeGroup{nullptr} {}

Expand Down Expand Up @@ -117,16 +121,17 @@ class NodeBatchInsert final : public BatchInsert {
// written
void writeAndResetNodeGroup(transaction::Transaction* transaction,
std::unique_ptr<storage::ChunkedNodeGroup>& nodeGroup,
std::optional<IndexBuilder>& indexBuilder) const;
std::optional<IndexBuilder>& indexBuilder, storage::MemoryManager* mm) const;

private:
void appendIncompleteNodeGroup(transaction::Transaction* transaction,
std::unique_ptr<storage::ChunkedNodeGroup> localNodeGroup,
std::optional<IndexBuilder>& indexBuilder) const;
void clearToIndex(std::unique_ptr<storage::ChunkedNodeGroup>& nodeGroup,
std::optional<IndexBuilder>& indexBuilder, storage::MemoryManager* mm) const;
void clearToIndex(storage::MemoryManager* mm,
std::unique_ptr<storage::ChunkedNodeGroup>& nodeGroup,
common::offset_t startIndexInGroup) const;

void copyToNodeGroup(transaction::Transaction* transaction) const;
void copyToNodeGroup(transaction::Transaction* transaction, storage::MemoryManager* mm) const;
};

} // namespace processor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace kuzu {
namespace storage {
class CSRNodeGroup;
struct ChunkedCSRHeader;
class MemoryManager;
} // namespace storage

namespace processor {
Expand Down
5 changes: 4 additions & 1 deletion src/include/processor/operator/scan/scan_rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
#include "storage/store/rel_table.h"

namespace kuzu {
namespace storage {
class MemoryManager;
}
namespace processor {

struct ScanRelTableInfo {
Expand All @@ -25,7 +28,7 @@ struct ScanRelTableInfo {
columnIDs{std::move(columnIDs)}, columnPredicates{std::move(columnPredicates)} {}
EXPLICIT_COPY_DEFAULT_MOVE(ScanRelTableInfo);

void initScanState();
void initScanState(storage::MemoryManager& memoryManager);

private:
ScanRelTableInfo(const ScanRelTableInfo& other)
Expand Down
2 changes: 1 addition & 1 deletion src/include/storage/buffer_manager/buffer_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ class EvictionQueue {
*/

class BufferManager {
friend class MemoryAllocator;
friend class FileHandle;
friend class MemoryManager;

public:
BufferManager(uint64_t bufferPoolSize, uint64_t maxDBSize);
Expand Down
68 changes: 27 additions & 41 deletions src/include/storage/buffer_manager/memory_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,77 +20,63 @@ class VirtualFileSystem;

namespace storage {

class MemoryAllocator;
class MemoryManager;
class FileHandle;
class BufferManager;

class MemoryBuffer {
public:
MemoryBuffer(MemoryAllocator* allocator, common::page_idx_t blockIdx, uint8_t* buffer,
MemoryBuffer(MemoryManager* mm, common::page_idx_t blockIdx, uint8_t* buffer,
uint64_t size = common::TEMP_PAGE_SIZE);
~MemoryBuffer();
DELETE_COPY_AND_MOVE(MemoryBuffer);

public:
std::span<uint8_t> buffer;
common::page_idx_t pageIdx;
MemoryAllocator* allocator;
};

class MemoryAllocator {
friend class MemoryBuffer;

public:
MemoryAllocator(BufferManager* bm, common::VirtualFileSystem* vfs,
main::ClientContext* context);

~MemoryAllocator();

std::unique_ptr<MemoryBuffer> allocateBuffer(bool initializeToZero, uint64_t size);
inline common::page_offset_t getPageSize() const { return pageSize; }

private:
void freeBlock(common::page_idx_t pageIdx, std::span<uint8_t> buffer);

private:
FileHandle* fh;
BufferManager* bm;
common::page_offset_t pageSize;
std::stack<common::page_idx_t> freePages;
std::mutex allocatorLock;
MemoryManager* mm;
};

/*
* The Memory Manager (MM) is used for allocating/reclaiming intermediate memory blocks.
* It can allocate a memory buffer of size TEMP_PAGE from the buffer manager backed by a
* FileHandle with temp in-mem file.
* It can allocate a memory buffer of size PAGE_256KB from the buffer manager backed by a
* BMFileHandle with temp in-mem file.
*
* Internally, MM uses a MemoryAllocator. The MemoryAllocator is holding the FileHandle backed by
* a temp in-mem file, and responsible for allocating/reclaiming memory buffers of its size class
* from the buffer manager. The MemoryAllocator keeps track of free pages in the FileHandle, so
* that it can reuse those freed pages without allocating new pages. The MemoryAllocator is
* The MemoryManager holds a BMFileHandle backed by
* a temp in-mem file, and is responsible for allocating/reclaiming memory buffers of its size class
* from the buffer manager. The MemoryManager keeps track of free pages in the BMFileHandle, so
* that it can reuse those freed pages without allocating new pages. The MemoryManager is
* thread-safe, so that multiple threads can allocate/reclaim memory blocks with the same size class
* at the same time.
*
* MM will return a MemoryBuffer to the caller, which is a wrapper of the allocated memory block,
* and it will automatically call back into the MemoryManager it was allocated from to reclaim the
* memory block when it is destroyed.
*/
class MemoryManager {
friend class MemoryBuffer;

public:
explicit MemoryManager(BufferManager* bm, common::VirtualFileSystem* vfs,
main::ClientContext* context)
: bm{bm} {
allocator = std::make_unique<MemoryAllocator>(bm, vfs, context);
}
MemoryManager(BufferManager* bm, common::VirtualFileSystem* vfs, main::ClientContext* context);

~MemoryManager();

std::unique_ptr<MemoryBuffer> mallocBuffer(bool initializeToZero, uint64_t size);
std::unique_ptr<MemoryBuffer> allocateBuffer(bool initializeToZero = false,
uint64_t size = common::TEMP_PAGE_SIZE) {
return allocator->allocateBuffer(initializeToZero, size);
}
uint64_t size = common::TEMP_PAGE_SIZE);
inline common::page_offset_t getPageSize() const { return pageSize; }

BufferManager* getBufferManager() const { return bm; }

private:
void freeBlock(common::page_idx_t pageIdx, std::span<uint8_t> buffer);

private:
FileHandle* fh;
BufferManager* bm;
std::unique_ptr<MemoryAllocator> allocator;
common::page_offset_t pageSize;
std::stack<common::page_idx_t> freePages;
std::mutex allocatorLock;
};

} // namespace storage
} // namespace kuzu
1 change: 1 addition & 0 deletions src/include/storage/local_storage/local_node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace storage {

class ChunkedNodeGroup;
struct TableScanState;
class MemoryManager;

struct TableReadState;
class LocalNodeTable final : public LocalTable {
Expand Down
3 changes: 2 additions & 1 deletion src/include/storage/local_storage/local_rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

namespace kuzu {
namespace storage {
class MemoryManager;

static constexpr common::column_id_t LOCAL_BOUND_NODE_ID_COLUMN_ID = 0;
static constexpr common::column_id_t LOCAL_NBR_NODE_ID_COLUMN_ID = 1;
Expand Down Expand Up @@ -64,7 +65,7 @@ class LocalRelTable final : public LocalTable {
common::column_id_t columnID);

private:
common::row_idx_t findMatchingRow(common::offset_t srcNodeOffset,
common::row_idx_t findMatchingRow(MemoryManager& memoryManager, common::offset_t srcNodeOffset,
common::offset_t dstNodeOffset, common::offset_t relOffset);

private:
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/local_storage/local_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class Transaction;
} // namespace transaction

namespace storage {
class MemoryManager;

using offset_to_row_idx_t = std::map<common::offset_t, common::row_idx_t>;
using offset_to_row_idx_vec_t = std::map<common::offset_t, std::vector<common::row_idx_t>>;
Expand Down
Loading

0 comments on commit 510fa03

Please sign in to comment.