Skip to content

Commit

Permalink
Track ColumnChunk allocations through the BufferManager
Browse files Browse the repository at this point in the history
  • Loading branch information
benjaminwinger committed Aug 21, 2024
1 parent ec2419f commit ecae6e3
Show file tree
Hide file tree
Showing 71 changed files with 802 additions and 605 deletions.
21 changes: 12 additions & 9 deletions src/graph/on_disk_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "common/vector/value_vector.h"
#include "graph/graph.h"
#include "main/client_context.h"
#include "storage/buffer_manager/memory_manager.h"
#include "storage/storage_manager.h"
#include "storage/store/rel_table.h"
// #include "storage/store/rel_table_data.h"
Expand All @@ -20,23 +21,25 @@ using namespace kuzu::common;
namespace kuzu {
namespace graph {

// Builds a RelTableScanState for `direction` that scans no rel properties (the column ID and
// column lists are left empty). `srcVector` is stored as the state's boundNodeIDVector and
// `dstVector` is appended as its single output vector.
static std::unique_ptr<RelTableScanState> getRelScanState(RelDataDirection direction,
ValueVector* srcVector, ValueVector* dstVector) {
static std::unique_ptr<RelTableScanState> getRelScanState(MemoryManager& memoryManager,
RelDataDirection direction, ValueVector* srcVector, ValueVector* dstVector) {
// Empty columnIDs since we do not scan any rel property.
auto columnIDs = std::vector<column_id_t>{};
auto columns = std::vector<Column*>{};
// TODO(Guodong): FIX-ME.
// NOTE(review): the two nullptr arguments are forwarded as-is; what they stand for is defined by
// the RelTableScanState constructor, which is not visible here.
auto scanState =
std::make_unique<RelTableScanState>(columnIDs, columns, nullptr, nullptr, direction);
auto scanState = std::make_unique<RelTableScanState>(memoryManager, columnIDs, columns, nullptr,
nullptr, direction);
scanState->boundNodeIDVector = srcVector;
scanState->outputVectors.push_back(dstVector);
return scanState;
}

// Creates one rel scan state per direction (FWD and BWD) via getRelScanState. Both states are
// given the same source and destination node ID vectors.
OnDiskGraphScanState::OnDiskGraphScanState(ValueVector* srcNodeIDVector,
ValueVector* dstNodeIDVector) {
fwdScanState = getRelScanState(RelDataDirection::FWD, srcNodeIDVector, dstNodeIDVector);
bwdScanState = getRelScanState(RelDataDirection::BWD, srcNodeIDVector, dstNodeIDVector);
OnDiskGraphScanState::OnDiskGraphScanState(MemoryManager& memoryManager,
ValueVector* srcNodeIDVector, ValueVector* dstNodeIDVector) {
fwdScanState =
getRelScanState(memoryManager, RelDataDirection::FWD, srcNodeIDVector, dstNodeIDVector);
bwdScanState =
getRelScanState(memoryManager, RelDataDirection::BWD, srcNodeIDVector, dstNodeIDVector);
}

OnDiskGraphScanStates::OnDiskGraphScanStates(std::span<table_id_t> tableIDs, MemoryManager* mm) {
Expand All @@ -50,7 +53,7 @@ OnDiskGraphScanStates::OnDiskGraphScanStates(std::span<table_id_t> tableIDs, Mem

for (auto tableID : tableIDs) {
scanStates.emplace_back(std::make_pair(tableID,
OnDiskGraphScanState(srcNodeIDVector.get(), dstNodeIDVector.get())));
OnDiskGraphScanState(*mm, srcNodeIDVector.get(), dstNodeIDVector.get())));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ struct BufferPoolConstants {
static constexpr uint64_t DEFAULT_VM_REGION_MAX_SIZE = static_cast<uint64_t>(1) << 43; // (8TB)
#endif

static constexpr uint64_t DEFAULT_BUFFER_POOL_SIZE_FOR_TESTING = 1ull << 26; // (64MB)
static constexpr uint64_t DEFAULT_BUFFER_POOL_SIZE_FOR_TESTING = 1ull << 28; // (256MB)
};

struct StorageConstants {
Expand Down
12 changes: 12 additions & 0 deletions src/include/common/serializer/deserializer.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
Expand Down Expand Up @@ -86,6 +87,17 @@ class Deserializer {
}
}

// Reads a vector of owned pointers: first the element count (a uint64_t), then each element in
// order by invoking deserializeFunc on this deserializer. `values` is resized to the element
// count and every slot is overwritten.
template<typename T>
void deserializeVectorOfPtrs(std::vector<std::unique_ptr<T>>& values,
    std::function<std::unique_ptr<T>(Deserializer&)> deserializeFunc) {
    uint64_t vectorSize = 0;
    deserializeValue(vectorSize);
    values.resize(vectorSize);
    // The counter uses the same 64-bit type as the size; a 32-bit counter would wrap around
    // before reaching a size above UINT_MAX and the loop would never terminate.
    for (uint64_t i = 0; i < vectorSize; i++) {
        values[i] = deserializeFunc(*this);
    }
}

template<typename T>
void deserializeUnorderedSet(std::unordered_set<T>& values) {
uint64_t setSize;
Expand Down
8 changes: 5 additions & 3 deletions src/include/graph/on_disk_graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,21 @@
#include "common/vector/value_vector.h"
#include "graph.h"
#include "graph_entry.h"
#include "storage/buffer_manager/memory_manager.h"
#include "storage/store/node_table.h"
#include "storage/store/rel_table.h"

namespace kuzu {
namespace storage {
class MemoryManager;
}
namespace graph {

// Owns two rel-table scan states, held as fwdScanState and bwdScanState. The node ID vectors
// are passed to the constructor as raw pointers.
struct OnDiskGraphScanState {
std::unique_ptr<storage::RelTableScanState> fwdScanState;
std::unique_ptr<storage::RelTableScanState> bwdScanState;

explicit OnDiskGraphScanState(common::ValueVector* srcNodeIDVector,
common::ValueVector* dstNodeIDVector);
explicit OnDiskGraphScanState(storage::MemoryManager& memoryManager,
common::ValueVector* srcNodeIDVector, common::ValueVector* dstNodeIDVector);
};

class OnDiskGraphScanStates : public GraphScanState {
Expand Down
13 changes: 10 additions & 3 deletions src/include/processor/operator/partitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ namespace kuzu {
namespace storage {
class NodeTable;
class RelTable;
class MemoryManager;
} // namespace storage

namespace transaction {
class Transaction;
}
namespace processor {

using partitioner_func_t =
Expand Down Expand Up @@ -39,6 +42,10 @@ struct PartitionerSharedState {
storage::NodeTable* srcNodeTable;
storage::NodeTable* dstNodeTable;
storage::RelTable* relTable;
storage::MemoryManager& mm;

// Binds the shared state to the memory manager. All three table pointers start out null so
// that a table which has not been assigned yet is never read as an indeterminate pointer.
explicit PartitionerSharedState(storage::MemoryManager& mm)
    : mtx{}, srcNodeTable{nullptr}, dstNodeTable{nullptr}, relTable{nullptr}, mm{mm} {}

// FIXME(Guodong): we should not maintain maxNodeOffsets.
std::vector<common::offset_t> maxNodeOffsets; // max node offset in each direction.
Expand Down Expand Up @@ -159,8 +166,8 @@ class Partitioner final : public Sink {
const std::shared_ptr<common::DataChunkState>& state) const;
// TODO: For now, RelBatchInsert will guarantee all data are inside one data chunk. Should be
// generalized to resultSet later if needed.
void copyDataToPartitions(common::partition_idx_t partitioningIdx,
common::DataChunk chunkToCopyFrom) const;
void copyDataToPartitions(storage::MemoryManager& memoryManager,
common::partition_idx_t partitioningIdx, common::DataChunk chunkToCopyFrom) const;

private:
PartitionerDataInfo dataInfo;
Expand Down
10 changes: 7 additions & 3 deletions src/include/processor/operator/persistent/batch_insert.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
#include "storage/store/table.h"

namespace kuzu {
namespace storage {
class MemoryManager;
}
namespace processor {

struct BatchInsertInfo {
Expand All @@ -30,16 +33,17 @@ struct BatchInsertSharedState {
storage::Table* table;
std::shared_ptr<FactorizedTable> fTable;
storage::WAL* wal;
storage::MemoryManager* mm;

BatchInsertSharedState(storage::Table* table, std::shared_ptr<FactorizedTable> fTable,
storage::WAL* wal)
: numRows{0}, table{table}, fTable{std::move(fTable)}, wal{wal} {};
storage::WAL* wal, storage::MemoryManager* mm)
: numRows{0}, table{table}, fTable{std::move(fTable)}, wal{wal}, mm{mm} {};
BatchInsertSharedState(const BatchInsertSharedState& other) = delete;

virtual ~BatchInsertSharedState() = default;

// Returns a new BatchInsertSharedState constructed from this object's members, then copies the
// current numRows value into it with load()/store(). The result's static type is the base
// BatchInsertSharedState.
std::unique_ptr<BatchInsertSharedState> copy() const {
auto result = std::make_unique<BatchInsertSharedState>(table, fTable, wal);
auto result = std::make_unique<BatchInsertSharedState>(table, fTable, wal, mm);
result->numRows.store(numRows.load());
return result;
}
Expand Down
19 changes: 13 additions & 6 deletions src/include/processor/operator/persistent/node_batch_insert.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,15 @@
#include "processor/operator/persistent/batch_insert.h"
#include "processor/operator/persistent/index_builder.h"
#include "processor/operator/table_function_call.h"
#include "processor/operator/transaction.h"
#include "storage/store/chunked_node_group.h"
#include "storage/store/node_table.h"
#include "transaction/transaction.h"

namespace kuzu {
namespace storage {
class MemoryManager;
}
namespace transaction {
class Transaction;
} // namespace transaction
Expand Down Expand Up @@ -72,8 +77,9 @@ struct NodeBatchInsertSharedState final : BatchInsertSharedState {
std::unique_ptr<storage::ChunkedNodeGroup> sharedNodeGroup;

NodeBatchInsertSharedState(storage::Table* table, common::column_id_t pkColumnID,
common::LogicalType pkType, std::shared_ptr<FactorizedTable> fTable, storage::WAL* wal)
: BatchInsertSharedState{table, std::move(fTable), wal}, pkColumnID{pkColumnID},
common::LogicalType pkType, std::shared_ptr<FactorizedTable> fTable, storage::WAL* wal,
storage::MemoryManager* mm)
: BatchInsertSharedState{table, std::move(fTable), wal, mm}, pkColumnID{pkColumnID},
pkType{std::move(pkType)}, readerSharedState{nullptr}, distinctSharedState{nullptr},
sharedNodeGroup{nullptr} {}

Expand Down Expand Up @@ -116,16 +122,17 @@ class NodeBatchInsert final : public BatchInsert {
// written
void writeAndResetNodeGroup(transaction::Transaction* transaction,
std::unique_ptr<storage::ChunkedNodeGroup>& nodeGroup,
std::optional<IndexBuilder>& indexBuilder) const;
std::optional<IndexBuilder>& indexBuilder, storage::MemoryManager* mm) const;

private:
void appendIncompleteNodeGroup(transaction::Transaction* transaction,
std::unique_ptr<storage::ChunkedNodeGroup> localNodeGroup,
std::optional<IndexBuilder>& indexBuilder) const;
void clearToIndex(std::unique_ptr<storage::ChunkedNodeGroup>& nodeGroup,
std::optional<IndexBuilder>& indexBuilder, storage::MemoryManager* mm) const;
void clearToIndex(storage::MemoryManager* mm,
std::unique_ptr<storage::ChunkedNodeGroup>& nodeGroup,
common::offset_t startIndexInGroup) const;

void copyToNodeGroup(transaction::Transaction* transaction) const;
void copyToNodeGroup(transaction::Transaction* transaction, storage::MemoryManager* mm) const;
};

} // namespace processor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace kuzu {
namespace storage {
class CSRNodeGroup;
struct ChunkedCSRHeader;
class MemoryManager;
} // namespace storage

namespace processor {
Expand Down
5 changes: 4 additions & 1 deletion src/include/processor/operator/scan/scan_rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
#include "storage/store/rel_table.h"

namespace kuzu {
namespace storage {
class MemoryManager;
}
namespace processor {

struct ScanRelTableInfo {
Expand All @@ -25,7 +28,7 @@ struct ScanRelTableInfo {
columnIDs{std::move(columnIDs)}, columnPredicates{std::move(columnPredicates)} {}
EXPLICIT_COPY_DEFAULT_MOVE(ScanRelTableInfo);

void initScanState();
void initScanState(storage::MemoryManager& memoryManager);

private:
ScanRelTableInfo(const ScanRelTableInfo& other)
Expand Down
2 changes: 1 addition & 1 deletion src/include/storage/buffer_manager/buffer_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ class EvictionQueue {
*/

class BufferManager {
friend class MemoryAllocator;
friend class BMFileHandle;
friend class MemoryManager;

public:
BufferManager(uint64_t bufferPoolSize, uint64_t maxDBSize);
Expand Down
64 changes: 25 additions & 39 deletions src/include/storage/buffer_manager/memory_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,77 +20,63 @@ class VirtualFileSystem;

namespace storage {

class MemoryAllocator;
class MemoryManager;
class BMFileHandle;
class BufferManager;

class MemoryBuffer {
public:
MemoryBuffer(MemoryAllocator* allocator, common::page_idx_t blockIdx, uint8_t* buffer,
MemoryBuffer(MemoryManager* mm, common::page_idx_t blockIdx, uint8_t* buffer,
uint64_t size = common::BufferPoolConstants::PAGE_256KB_SIZE);
~MemoryBuffer();
DELETE_COPY_AND_MOVE(MemoryBuffer);

public:
std::span<uint8_t> buffer;
common::page_idx_t pageIdx;
MemoryAllocator* allocator;
};

class MemoryAllocator {
friend class MemoryBuffer;

public:
MemoryAllocator(BufferManager* bm, common::VirtualFileSystem* vfs,
main::ClientContext* context);

~MemoryAllocator();

std::unique_ptr<MemoryBuffer> allocateBuffer(bool initializeToZero, uint64_t size);
inline common::page_offset_t getPageSize() const { return pageSize; }

private:
void freeBlock(common::page_idx_t pageIdx, std::span<uint8_t> buffer);

private:
BMFileHandle* fh;
BufferManager* bm;
common::page_offset_t pageSize;
std::stack<common::page_idx_t> freePages;
std::mutex allocatorLock;
MemoryManager* mm;
};

/*
* The Memory Manager (MM) is used for allocating/reclaiming intermediate memory blocks.
* It can allocate a memory buffer of size PAGE_256KB from the buffer manager backed by a
* BMFileHandle with temp in-mem file.
*
* Internally, MM uses a MemoryAllocator. The MemoryAllocator is holding the BMFileHandle backed by
* a temp in-mem file, and responsible for allocating/reclaiming memory buffers of its size class
* from the buffer manager. The MemoryAllocator keeps track of free pages in the BMFileHandle, so
* that it can reuse those freed pages without allocating new pages. The MemoryAllocator is
* The MemoryManager holds a BMFileHandle backed by
* a temp in-mem file, and is responsible for allocating/reclaiming memory buffers of its size class
* from the buffer manager. The MemoryManager keeps track of free pages in the BMFileHandle, so
* that it can reuse those freed pages without allocating new pages. The MemoryManager is
* thread-safe, so that multiple threads can allocate/reclaim memory blocks with the same size class
* at the same time.
*
* MM will return a MemoryBuffer to the caller, which is a wrapper of the allocated memory block,
* and it will automatically hand the memory block back to the MM when it is destroyed.
*/
class MemoryManager {
// NOTE(review): presumably befriended so MemoryBuffer can reach the private freeBlock() —
// confirm against the MemoryBuffer destructor in the .cpp.
friend class MemoryBuffer;

public:
explicit MemoryManager(BufferManager* bm, common::VirtualFileSystem* vfs,
main::ClientContext* context)
: bm{bm} {
allocator = std::make_unique<MemoryAllocator>(bm, vfs, context);
}
MemoryManager(BufferManager* bm, common::VirtualFileSystem* vfs, main::ClientContext* context);

~MemoryManager();

std::unique_ptr<MemoryBuffer> mallocBuffer(bool initializeToZero, uint64_t size);
// Both parameters are defaulted: initializeToZero defaults to false and size defaults to
// BufferPoolConstants::PAGE_256KB_SIZE.
std::unique_ptr<MemoryBuffer> allocateBuffer(bool initializeToZero = false,
uint64_t size = common::BufferPoolConstants::PAGE_256KB_SIZE) {
return allocator->allocateBuffer(initializeToZero, size);
}
uint64_t size = common::BufferPoolConstants::PAGE_256KB_SIZE);
inline common::page_offset_t getPageSize() const { return pageSize; }

// Returns the buffer manager pointer supplied at construction.
BufferManager* getBufferManager() const { return bm; }

private:
void freeBlock(common::page_idx_t pageIdx, std::span<uint8_t> buffer);

private:
BMFileHandle* fh;
BufferManager* bm;
std::unique_ptr<MemoryAllocator> allocator;
common::page_offset_t pageSize;
// Page indices kept for reuse.
// NOTE(review): presumably guarded by allocatorLock — confirm in the .cpp.
std::stack<common::page_idx_t> freePages;
std::mutex allocatorLock;
};

} // namespace storage
} // namespace kuzu
1 change: 1 addition & 0 deletions src/include/storage/local_storage/local_node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace storage {

class ChunkedNodeGroup;
struct TableScanState;
class MemoryManager;

struct TableReadState;
class LocalNodeTable final : public LocalTable {
Expand Down
3 changes: 2 additions & 1 deletion src/include/storage/local_storage/local_rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

namespace kuzu {
namespace storage {
class MemoryManager;

static constexpr common::column_id_t LOCAL_BOUND_NODE_ID_COLUMN_ID = 0;
static constexpr common::column_id_t LOCAL_NBR_NODE_ID_COLUMN_ID = 1;
Expand Down Expand Up @@ -64,7 +65,7 @@ class LocalRelTable final : public LocalTable {
common::column_id_t columnID);

private:
common::row_idx_t findMatchingRow(common::offset_t srcNodeOffset,
common::row_idx_t findMatchingRow(MemoryManager& memoryManager, common::offset_t srcNodeOffset,
common::offset_t dstNodeOffset, common::offset_t relOffset);

private:
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/local_storage/local_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class Transaction;
} // namespace transaction

namespace storage {
class MemoryManager;

using offset_to_row_idx_t = std::map<common::offset_t, common::row_idx_t>;
using offset_to_row_idx_vec_t = std::map<common::offset_t, std::vector<common::row_idx_t>>;
Expand Down
Loading

0 comments on commit ecae6e3

Please sign in to comment.