Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track ColumnChunk allocations through the BufferManager #3743

Merged
merged 1 commit into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ add_subdirectory(third_party)
if(${BUILD_KUZU})
add_definitions(-DKUZU_ROOT_DIRECTORY="${PROJECT_SOURCE_DIR}")
add_definitions(-DKUZU_CMAKE_VERSION="${CMAKE_PROJECT_VERSION}")
add_definitions(-DKUZU_EXTENSION_VERSION="0.5.2.2")
add_definitions(-DKUZU_EXTENSION_VERSION="0.5.2.4")
add_definitions(-DKUZU_PAGE_SIZE_LOG2=${PAGE_SIZE_LOG2})
add_definitions(-DKUZU_VECTOR_CAPACITY_LOG2=${VECTOR_CAPACITY_LOG2})

Expand Down
21 changes: 12 additions & 9 deletions src/graph/on_disk_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "common/vector/value_vector.h"
#include "graph/graph.h"
#include "main/client_context.h"
#include "storage/buffer_manager/memory_manager.h"
#include "storage/storage_manager.h"
#include "storage/store/rel_table.h"

Expand All @@ -17,23 +18,25 @@
namespace kuzu {
namespace graph {

static std::unique_ptr<RelTableScanState> getRelScanState(RelDataDirection direction,
ValueVector* srcVector, ValueVector* dstVector) {
static std::unique_ptr<RelTableScanState> getRelScanState(MemoryManager& memoryManager,

Check warning on line 21 in src/graph/on_disk_graph.cpp

View check run for this annotation

Codecov / codecov/patch

src/graph/on_disk_graph.cpp#L21

Added line #L21 was not covered by tests
RelDataDirection direction, ValueVector* srcVector, ValueVector* dstVector) {
// Empty columnIDs since we do not scan any rel property.
auto columnIDs = std::vector<column_id_t>{};
auto columns = std::vector<Column*>{};
// TODO(Guodong): FIX-ME.
auto scanState =
std::make_unique<RelTableScanState>(columnIDs, columns, nullptr, nullptr, direction);
auto scanState = std::make_unique<RelTableScanState>(memoryManager, columnIDs, columns, nullptr,
nullptr, direction);

Check warning on line 28 in src/graph/on_disk_graph.cpp

View check run for this annotation

Codecov / codecov/patch

src/graph/on_disk_graph.cpp#L27-L28

Added lines #L27 - L28 were not covered by tests
scanState->boundNodeIDVector = srcVector;
scanState->outputVectors.push_back(dstVector);
return scanState;
}

OnDiskGraphScanState::OnDiskGraphScanState(ValueVector* srcNodeIDVector,
ValueVector* dstNodeIDVector) {
fwdScanState = getRelScanState(RelDataDirection::FWD, srcNodeIDVector, dstNodeIDVector);
bwdScanState = getRelScanState(RelDataDirection::BWD, srcNodeIDVector, dstNodeIDVector);
OnDiskGraphScanState::OnDiskGraphScanState(MemoryManager& memoryManager,
ValueVector* srcNodeIDVector, ValueVector* dstNodeIDVector) {

Check warning on line 35 in src/graph/on_disk_graph.cpp

View check run for this annotation

Codecov / codecov/patch

src/graph/on_disk_graph.cpp#L34-L35

Added lines #L34 - L35 were not covered by tests
fwdScanState =
getRelScanState(memoryManager, RelDataDirection::FWD, srcNodeIDVector, dstNodeIDVector);

Check warning on line 37 in src/graph/on_disk_graph.cpp

View check run for this annotation

Codecov / codecov/patch

src/graph/on_disk_graph.cpp#L37

Added line #L37 was not covered by tests
bwdScanState =
getRelScanState(memoryManager, RelDataDirection::BWD, srcNodeIDVector, dstNodeIDVector);

Check warning on line 39 in src/graph/on_disk_graph.cpp

View check run for this annotation

Codecov / codecov/patch

src/graph/on_disk_graph.cpp#L39

Added line #L39 was not covered by tests
}

OnDiskGraphScanStates::OnDiskGraphScanStates(std::span<table_id_t> tableIDs, MemoryManager* mm) {
Expand All @@ -47,7 +50,7 @@

for (auto tableID : tableIDs) {
scanStates.emplace_back(std::make_pair(tableID,
OnDiskGraphScanState(srcNodeIDVector.get(), dstNodeIDVector.get())));
OnDiskGraphScanState(*mm, srcNodeIDVector.get(), dstNodeIDVector.get())));

Check warning on line 53 in src/graph/on_disk_graph.cpp

View check run for this annotation

Codecov / codecov/patch

src/graph/on_disk_graph.cpp#L53

Added line #L53 was not covered by tests
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ struct BufferPoolConstants {
#else
static constexpr uint64_t DEFAULT_VM_REGION_MAX_SIZE = static_cast<uint64_t>(1) << 43; // (8TB)
#endif
static constexpr uint64_t DEFAULT_BUFFER_POOL_SIZE_FOR_TESTING = 1ull << 26; // (64MB)
static constexpr uint64_t DEFAULT_BUFFER_POOL_SIZE_FOR_TESTING = 1ull << 28; // (256MB)
};

struct StorageConstants {
Expand Down
12 changes: 12 additions & 0 deletions src/include/common/serializer/deserializer.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
Expand Down Expand Up @@ -86,6 +87,17 @@ class Deserializer {
}
}

template<typename T>
void deserializeVectorOfPtrs(std::vector<std::unique_ptr<T>>& values,
std::function<std::unique_ptr<T>(Deserializer&)> deserializeFunc) {
uint64_t vectorSize;
deserializeValue(vectorSize);
values.resize(vectorSize);
for (auto i = 0u; i < vectorSize; i++) {
values[i] = deserializeFunc(*this);
}
}

template<typename T>
void deserializeUnorderedSet(std::unordered_set<T>& values) {
uint64_t setSize;
Expand Down
8 changes: 5 additions & 3 deletions src/include/graph/on_disk_graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,21 @@
#include "common/vector/value_vector.h"
#include "graph.h"
#include "graph_entry.h"
#include "storage/buffer_manager/memory_manager.h"
#include "storage/store/node_table.h"
#include "storage/store/rel_table.h"

namespace kuzu {
namespace storage {
class MemoryManager;
}
namespace graph {

struct OnDiskGraphScanState {
std::unique_ptr<storage::RelTableScanState> fwdScanState;
std::unique_ptr<storage::RelTableScanState> bwdScanState;

explicit OnDiskGraphScanState(common::ValueVector* srcNodeIDVector,
common::ValueVector* dstNodeIDVector);
explicit OnDiskGraphScanState(storage::MemoryManager& memoryManager,
common::ValueVector* srcNodeIDVector, common::ValueVector* dstNodeIDVector);
};

class OnDiskGraphScanStates : public GraphScanState {
Expand Down
13 changes: 10 additions & 3 deletions src/include/processor/operator/partitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ namespace kuzu {
namespace storage {
class NodeTable;
class RelTable;
class MemoryManager;
} // namespace storage

namespace transaction {
class Transaction;
}
namespace processor {

using partitioner_func_t =
Expand Down Expand Up @@ -39,6 +42,10 @@ struct PartitionerSharedState {
storage::NodeTable* srcNodeTable;
storage::NodeTable* dstNodeTable;
storage::RelTable* relTable;
storage::MemoryManager& mm;

explicit PartitionerSharedState(storage::MemoryManager& mm)
: mtx{}, srcNodeTable{nullptr}, dstNodeTable{nullptr}, mm{mm} {}

// FIXME(Guodong): we should not maintain maxNodeOffsets.
std::vector<common::offset_t> maxNodeOffsets; // max node offset in each direction.
Expand Down Expand Up @@ -159,8 +166,8 @@ class Partitioner final : public Sink {
const std::shared_ptr<common::DataChunkState>& state) const;
// TODO: For now, RelBatchInsert will guarantee all data are inside one data chunk. Should be
// generalized to resultSet later if needed.
void copyDataToPartitions(common::partition_idx_t partitioningIdx,
common::DataChunk chunkToCopyFrom) const;
void copyDataToPartitions(storage::MemoryManager& memoryManager,
common::partition_idx_t partitioningIdx, common::DataChunk chunkToCopyFrom) const;

private:
PartitionerDataInfo dataInfo;
Expand Down
10 changes: 7 additions & 3 deletions src/include/processor/operator/persistent/batch_insert.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
#include "storage/store/table.h"

namespace kuzu {
namespace storage {
class MemoryManager;
}
namespace processor {

struct BatchInsertInfo {
Expand Down Expand Up @@ -39,17 +42,18 @@ struct BatchInsertSharedState {
storage::Table* table;
std::shared_ptr<FactorizedTable> fTable;
storage::WAL* wal;
storage::MemoryManager* mm;

BatchInsertSharedState(storage::Table* table, std::shared_ptr<FactorizedTable> fTable,
storage::WAL* wal)
storage::WAL* wal, storage::MemoryManager* mm)
: numRows{0}, numErroredRows(std::make_shared<common::row_idx_t>(0)), table{table},
fTable{std::move(fTable)}, wal{wal} {};
fTable{std::move(fTable)}, wal{wal}, mm{mm} {};
BatchInsertSharedState(const BatchInsertSharedState& other) = delete;

virtual ~BatchInsertSharedState() = default;

std::unique_ptr<BatchInsertSharedState> copy() const {
auto result = std::make_unique<BatchInsertSharedState>(table, fTable, wal);
auto result = std::make_unique<BatchInsertSharedState>(table, fTable, wal, mm);
result->numRows.store(numRows.load());
return result;
}
Expand Down
17 changes: 11 additions & 6 deletions src/include/processor/operator/persistent/node_batch_insert.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
#include "storage/store/chunked_node_group.h"

namespace kuzu {
namespace storage {
class MemoryManager;
}
namespace transaction {
class Transaction;
} // namespace transaction
Expand Down Expand Up @@ -71,8 +74,9 @@ struct NodeBatchInsertSharedState final : BatchInsertSharedState {
std::unique_ptr<storage::ChunkedNodeGroup> sharedNodeGroup;

NodeBatchInsertSharedState(storage::Table* table, common::column_id_t pkColumnID,
common::LogicalType pkType, std::shared_ptr<FactorizedTable> fTable, storage::WAL* wal)
: BatchInsertSharedState{table, std::move(fTable), wal}, pkColumnID{pkColumnID},
common::LogicalType pkType, std::shared_ptr<FactorizedTable> fTable, storage::WAL* wal,
storage::MemoryManager* mm)
: BatchInsertSharedState{table, std::move(fTable), wal, mm}, pkColumnID{pkColumnID},
pkType{std::move(pkType)}, readerSharedState{nullptr}, distinctSharedState{nullptr},
sharedNodeGroup{nullptr} {}

Expand Down Expand Up @@ -117,16 +121,17 @@ class NodeBatchInsert final : public BatchInsert {
// written
void writeAndResetNodeGroup(transaction::Transaction* transaction,
std::unique_ptr<storage::ChunkedNodeGroup>& nodeGroup,
std::optional<IndexBuilder>& indexBuilder) const;
std::optional<IndexBuilder>& indexBuilder, storage::MemoryManager* mm) const;

private:
void appendIncompleteNodeGroup(transaction::Transaction* transaction,
std::unique_ptr<storage::ChunkedNodeGroup> localNodeGroup,
std::optional<IndexBuilder>& indexBuilder) const;
void clearToIndex(std::unique_ptr<storage::ChunkedNodeGroup>& nodeGroup,
std::optional<IndexBuilder>& indexBuilder, storage::MemoryManager* mm) const;
void clearToIndex(storage::MemoryManager* mm,
std::unique_ptr<storage::ChunkedNodeGroup>& nodeGroup,
common::offset_t startIndexInGroup) const;

void copyToNodeGroup(transaction::Transaction* transaction) const;
void copyToNodeGroup(transaction::Transaction* transaction, storage::MemoryManager* mm) const;
};

} // namespace processor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace kuzu {
namespace storage {
class CSRNodeGroup;
struct ChunkedCSRHeader;
class MemoryManager;
} // namespace storage

namespace processor {
Expand Down
5 changes: 4 additions & 1 deletion src/include/processor/operator/scan/scan_rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
#include "storage/store/rel_table.h"

namespace kuzu {
namespace storage {
class MemoryManager;
}
namespace processor {

struct ScanRelTableInfo {
Expand All @@ -25,7 +28,7 @@ struct ScanRelTableInfo {
columnIDs{std::move(columnIDs)}, columnPredicates{std::move(columnPredicates)} {}
EXPLICIT_COPY_DEFAULT_MOVE(ScanRelTableInfo);

void initScanState();
void initScanState(storage::MemoryManager& memoryManager);

private:
ScanRelTableInfo(const ScanRelTableInfo& other)
Expand Down
2 changes: 1 addition & 1 deletion src/include/storage/buffer_manager/buffer_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ class EvictionQueue {
*/

class BufferManager {
friend class MemoryAllocator;
friend class FileHandle;
friend class MemoryManager;

public:
BufferManager(uint64_t bufferPoolSize, uint64_t maxDBSize);
Expand Down
67 changes: 26 additions & 41 deletions src/include/storage/buffer_manager/memory_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,80 +20,65 @@ class VirtualFileSystem;

namespace storage {

class MemoryAllocator;
class MemoryManager;
class FileHandle;
class BufferManager;

class KUZU_API MemoryBuffer {
public:
MemoryBuffer(MemoryAllocator* allocator, common::page_idx_t blockIdx, uint8_t* buffer,
MemoryBuffer(MemoryManager* mm, common::page_idx_t blockIdx, uint8_t* buffer,
uint64_t size = common::TEMP_PAGE_SIZE);
~MemoryBuffer();
DELETE_COPY_AND_MOVE(MemoryBuffer);

uint8_t* getData() const { return buffer.data(); }

public:
std::span<uint8_t> buffer;
common::page_idx_t pageIdx;
MemoryAllocator* allocator;
};

class KUZU_API MemoryAllocator {
friend class MemoryBuffer;

public:
MemoryAllocator(BufferManager* bm, common::VirtualFileSystem* vfs,
main::ClientContext* context);

~MemoryAllocator();

std::unique_ptr<MemoryBuffer> allocateBuffer(bool initializeToZero, uint64_t size);
common::page_offset_t getPageSize() const { return pageSize; }

private:
void freeBlock(common::page_idx_t pageIdx, std::span<uint8_t> buffer);

private:
FileHandle* fh;
BufferManager* bm;
common::page_offset_t pageSize;
std::stack<common::page_idx_t> freePages;
std::mutex allocatorLock;
MemoryManager* mm;
};

/*
* The Memory Manager (MM) is used for allocating/reclaiming intermediate memory blocks.
* It can allocate a memory buffer of size TEMP_PAGE from the buffer manager backed by a
* FileHandle with temp in-mem file.
* It can allocate a memory buffer of size PAGE_256KB from the buffer manager backed by a
* BMFileHandle with temp in-mem file.
*
* Internally, MM uses a MemoryAllocator. The MemoryAllocator is holding the FileHandle backed by
* a temp in-mem file, and responsible for allocating/reclaiming memory buffers of its size class
* from the buffer manager. The MemoryAllocator keeps track of free pages in the FileHandle, so
* that it can reuse those freed pages without allocating new pages. The MemoryAllocator is
* The MemoryManager holds a BMFileHandle backed by
* a temp in-mem file, and is responsible for allocating/reclaiming memory buffers of its size class
* from the buffer manager. The MemoryManager keeps track of free pages in the BMFileHandle, so
* that it can reuse those freed pages without allocating new pages. The MemoryManager is
* thread-safe, so that multiple threads can allocate/reclaim memory blocks with the same size class
* at the same time.
*
* MM will return a MemoryBuffer to the caller, which is a wrapper of the allocated memory block,
* and it will automatically call its allocator to reclaim the memory block when it is destroyed.
*/
class KUZU_API MemoryManager {
friend class MemoryBuffer;

public:
explicit MemoryManager(BufferManager* bm, common::VirtualFileSystem* vfs,
main::ClientContext* context)
: bm{bm} {
allocator = std::make_unique<MemoryAllocator>(bm, vfs, context);
}
MemoryManager(BufferManager* bm, common::VirtualFileSystem* vfs, main::ClientContext* context);

~MemoryManager();

std::unique_ptr<MemoryBuffer> mallocBuffer(bool initializeToZero, uint64_t size);
std::unique_ptr<MemoryBuffer> allocateBuffer(bool initializeToZero = false,
uint64_t size = common::TEMP_PAGE_SIZE) {
return allocator->allocateBuffer(initializeToZero, size);
}
uint64_t size = common::TEMP_PAGE_SIZE);
common::page_offset_t getPageSize() const { return pageSize; }

BufferManager* getBufferManager() const { return bm; }

private:
void freeBlock(common::page_idx_t pageIdx, std::span<uint8_t> buffer);

private:
FileHandle* fh;
BufferManager* bm;
std::unique_ptr<MemoryAllocator> allocator;
common::page_offset_t pageSize;
std::stack<common::page_idx_t> freePages;
std::mutex allocatorLock;
};

} // namespace storage
} // namespace kuzu
1 change: 1 addition & 0 deletions src/include/storage/local_storage/local_node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace storage {

class ChunkedNodeGroup;
struct TableScanState;
class MemoryManager;

struct TableReadState;
class LocalNodeTable final : public LocalTable {
Expand Down
3 changes: 2 additions & 1 deletion src/include/storage/local_storage/local_rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

namespace kuzu {
namespace storage {
class MemoryManager;

static constexpr common::column_id_t LOCAL_BOUND_NODE_ID_COLUMN_ID = 0;
static constexpr common::column_id_t LOCAL_NBR_NODE_ID_COLUMN_ID = 1;
Expand Down Expand Up @@ -64,7 +65,7 @@ class LocalRelTable final : public LocalTable {
common::column_id_t columnID);

private:
common::row_idx_t findMatchingRow(common::offset_t srcNodeOffset,
common::row_idx_t findMatchingRow(MemoryManager& memoryManager, common::offset_t srcNodeOffset,
common::offset_t dstNodeOffset, common::offset_t relOffset);

private:
Expand Down
Loading