Skip to content

Commit

Permalink
Merge pull request #3082 from kuzudb/fix-node-insert
Browse files Browse the repository at this point in the history
Fix node insert
  • Loading branch information
ray6080 authored Mar 19, 2024
2 parents c3decc2 + c39704d commit 8b2c768
Show file tree
Hide file tree
Showing 9 changed files with 21,224 additions and 40 deletions.
12 changes: 6 additions & 6 deletions src/binder/bind/ddl/bind_create_rdf_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,19 @@ BoundCreateTableInfo Binder::bindCreateRdfGraphInfo(const CreateTableInfo* info)
std::vector<PropertyInfo> resourceTripleProperties;
resourceTripleProperties.emplace_back(InternalKeyword::ID, *LogicalType::INTERNAL_ID());
resourceTripleProperties.emplace_back(std::string(rdf::PID), *LogicalType::INTERNAL_ID());
auto boundResourceTripleExtraInfo = std::make_unique<BoundExtraCreateRelTableInfo>(
common::RelMultiplicity::MANY, common::RelMultiplicity::MANY, INVALID_TABLE_ID,
INVALID_TABLE_ID, std::move(resourceTripleProperties));
auto boundResourceTripleExtraInfo =
std::make_unique<BoundExtraCreateRelTableInfo>(RelMultiplicity::MANY, RelMultiplicity::MANY,
INVALID_TABLE_ID, INVALID_TABLE_ID, std::move(resourceTripleProperties));
auto boundResourceTripleCreateInfo = BoundCreateTableInfo(
TableType::REL, resourceTripleTableName, std::move(boundResourceTripleExtraInfo));
// Literal triple table.
auto literalTripleTableName = RDFGraphCatalogEntry::getLiteralTripleTableName(rdfGraphName);
std::vector<PropertyInfo> literalTripleProperties;
literalTripleProperties.emplace_back(InternalKeyword::ID, *LogicalType::INTERNAL_ID());
literalTripleProperties.emplace_back(std::string(rdf::PID), *LogicalType::INTERNAL_ID());
auto boundLiteralTripleExtraInfo = std::make_unique<BoundExtraCreateRelTableInfo>(
common::RelMultiplicity::MANY, common::RelMultiplicity::MANY, INVALID_TABLE_ID,
INVALID_TABLE_ID, std::move(literalTripleProperties));
auto boundLiteralTripleExtraInfo =
std::make_unique<BoundExtraCreateRelTableInfo>(RelMultiplicity::MANY, RelMultiplicity::MANY,
INVALID_TABLE_ID, INVALID_TABLE_ID, std::move(literalTripleProperties));
auto boundLiteralTripleCreateInfo = BoundCreateTableInfo(
TableType::REL, literalTripleTableName, std::move(boundLiteralTripleExtraInfo));
// Rdf table.
Expand Down
19 changes: 9 additions & 10 deletions src/catalog/catalog_entry/rel_table_catalog_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ namespace kuzu {
namespace catalog {

RelTableCatalogEntry::RelTableCatalogEntry(std::string name, table_id_t tableID,
common::RelMultiplicity srcMultiplicity, common::RelMultiplicity dstMultiplicity,
table_id_t srcTableID, table_id_t dstTableID)
RelMultiplicity srcMultiplicity, RelMultiplicity dstMultiplicity, table_id_t srcTableID,
table_id_t dstTableID)
: TableCatalogEntry{CatalogEntryType::REL_TABLE_ENTRY, std::move(name), tableID},
srcMultiplicity{srcMultiplicity}, dstMultiplicity{dstMultiplicity}, srcTableID{srcTableID},
dstTableID{dstTableID} {}
Expand All @@ -30,14 +30,13 @@ column_id_t RelTableCatalogEntry::getColumnID(property_id_t propertyID) const {
auto it = std::find_if(properties.begin(), properties.end(),
[&propertyID](const auto& property) { return property.getPropertyID() == propertyID; });
// Skip the first column in the rel table, which is reserved for nbrID.
return it == properties.end() ? common::INVALID_COLUMN_ID :
std::distance(properties.begin(), it) + 1;
return it == properties.end() ? INVALID_COLUMN_ID : std::distance(properties.begin(), it) + 1;
}

bool RelTableCatalogEntry::isSingleMultiplicity(RelDataDirection direction) const {
return getMultiplicity(direction) == common::RelMultiplicity::ONE;
return getMultiplicity(direction) == RelMultiplicity::ONE;
}
common::RelMultiplicity RelTableCatalogEntry::getMultiplicity(RelDataDirection direction) const {
RelMultiplicity RelTableCatalogEntry::getMultiplicity(RelDataDirection direction) const {
return direction == RelDataDirection::FWD ? dstMultiplicity : srcMultiplicity;
}
table_id_t RelTableCatalogEntry::getBoundTableID(RelDataDirection relDirection) const {
Expand All @@ -57,8 +56,8 @@ void RelTableCatalogEntry::serialize(Serializer& serializer) const {

std::unique_ptr<RelTableCatalogEntry> RelTableCatalogEntry::deserialize(
Deserializer& deserializer) {
common::RelMultiplicity srcMultiplicity;
common::RelMultiplicity dstMultiplicity;
RelMultiplicity srcMultiplicity;
RelMultiplicity dstMultiplicity;
table_id_t srcTableID;
table_id_t dstTableID;
deserializer.deserializeValue(srcMultiplicity);
Expand All @@ -85,8 +84,8 @@ std::string RelTableCatalogEntry::toCypher(main::ClientContext* clientContext) c
ss << "CREATE REL TABLE " << getName() << "( FROM " << srcTableName << " TO " << dstTableName
<< ", ";
Property::toCypher(getPropertiesRef(), ss);
auto srcMultiStr = srcMultiplicity == common::RelMultiplicity::MANY ? "MANY" : "ONE";
auto dstMultiStr = dstMultiplicity == common::RelMultiplicity::MANY ? "MANY" : "ONE";
auto srcMultiStr = srcMultiplicity == RelMultiplicity::MANY ? "MANY" : "ONE";
auto dstMultiStr = dstMultiplicity == RelMultiplicity::MANY ? "MANY" : "ONE";
ss << srcMultiStr << "_" << dstMultiStr << ");";
return ss.str();
}
Expand Down
2 changes: 1 addition & 1 deletion src/include/binder/ddl/bound_create_table_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include "common/types/types.h"

namespace kuzu {
namespace catalog {
namespace common {
enum class RelMultiplicity : uint8_t;
}
namespace binder {
Expand Down
5 changes: 4 additions & 1 deletion src/include/storage/buffer_manager/buffer_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,10 @@ class BufferManager {
BMFileHandle* fileHandle, common::page_idx_t pageIdx, PageState* pageState);

inline uint64_t reserveUsedMemory(uint64_t size) { return usedMemory.fetch_add(size); }
inline uint64_t freeUsedMemory(uint64_t size) { return usedMemory.fetch_sub(size); }
inline uint64_t freeUsedMemory(uint64_t size) {
KU_ASSERT(usedMemory.load() >= size);
return usedMemory.fetch_sub(size);
}

inline uint8_t* getFrame(BMFileHandle& fileHandle, common::page_idx_t pageIdx) {
return vmRegions[fileHandle.getPageSizeClass()]->getFrame(fileHandle.getFrameIdx(pageIdx));
Expand Down
6 changes: 5 additions & 1 deletion src/storage/buffer_manager/buffer_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ void BufferManager::removeFilePagesFromFrames(BMFileHandle& fileHandle) {

void BufferManager::flushAllDirtyPagesInFrames(BMFileHandle& fileHandle) {
for (auto pageIdx = 0u; pageIdx < fileHandle.getNumPages(); ++pageIdx) {
removePageFromFrame(fileHandle, pageIdx, true /* flush */);
flushIfDirtyWithoutLock(fileHandle, pageIdx);
}
}

Expand All @@ -357,11 +357,15 @@ void BufferManager::removePageFromFrameIfNecessary(BMFileHandle& fileHandle, pag
void BufferManager::removePageFromFrame(
BMFileHandle& fileHandle, page_idx_t pageIdx, bool shouldFlush) {
auto pageState = fileHandle.getPageState(pageIdx);
if (PageState::getState(pageState->getStateAndVersion()) == PageState::EVICTED) {
return;
}
pageState->spinLock(pageState->getStateAndVersion());
if (shouldFlush) {
flushIfDirtyWithoutLock(fileHandle, pageIdx);
}
releaseFrameForPage(fileHandle, pageIdx);
freeUsedMemory(fileHandle.getPageSize());
pageState->resetToEvicted();
}

Expand Down
37 changes: 22 additions & 15 deletions src/storage/storage_structure/db_file_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,29 @@ WALPageIdxAndFrame createWALVersionIfNecessaryAndPinPage(page_idx_t originalPage
page_idx_t pageIdxInWAL;
uint8_t* walFrame;
fileHandle.acquireWALPageIdxLock(originalPageIdx);
if (fileHandle.hasWALPageVersionNoWALPageIdxLock(originalPageIdx)) {
pageIdxInWAL = fileHandle.getWALPageIdxNoWALPageIdxLock(originalPageIdx);
walFrame = bufferManager.pin(
*wal.fileHandle, pageIdxInWAL, BufferManager::PageReadPolicy::READ_PAGE);
} else {
pageIdxInWAL =
wal.logPageUpdateRecord(dbFileID, originalPageIdx /* pageIdxInOriginalFile */);
walFrame = bufferManager.pin(
*wal.fileHandle, pageIdxInWAL, BufferManager::PageReadPolicy::DONT_READ_PAGE);
if (!insertingNewPage) {
bufferManager.optimisticRead(fileHandle, originalPageIdx, [&](uint8_t* frame) -> void {
memcpy(walFrame, frame, BufferPoolConstants::PAGE_4KB_SIZE);
});
try {
if (fileHandle.hasWALPageVersionNoWALPageIdxLock(originalPageIdx)) {
pageIdxInWAL = fileHandle.getWALPageIdxNoWALPageIdxLock(originalPageIdx);
walFrame = bufferManager.pin(
*wal.fileHandle, pageIdxInWAL, BufferManager::PageReadPolicy::READ_PAGE);
} else {
pageIdxInWAL =
wal.logPageUpdateRecord(dbFileID, originalPageIdx /* pageIdxInOriginalFile */);
walFrame = bufferManager.pin(
*wal.fileHandle, pageIdxInWAL, BufferManager::PageReadPolicy::DONT_READ_PAGE);
if (!insertingNewPage) {
bufferManager.optimisticRead(
fileHandle, originalPageIdx, [&](uint8_t* frame) -> void {
memcpy(walFrame, frame, BufferPoolConstants::PAGE_4KB_SIZE);
});
}
fileHandle.setWALPageIdxNoLock(
originalPageIdx /* pageIdxInOriginalFile */, pageIdxInWAL);
wal.fileHandle->setLockedPageDirty(pageIdxInWAL);
}
fileHandle.setWALPageIdxNoLock(originalPageIdx /* pageIdxInOriginalFile */, pageIdxInWAL);
wal.fileHandle->setLockedPageDirty(pageIdxInWAL);
} catch (Exception& e) {
fileHandle.releaseWALPageIdxLock(originalPageIdx);
throw;
}
return {originalPageIdx, pageIdxInWAL, walFrame};
}
Expand Down
12 changes: 6 additions & 6 deletions src/storage/store/rel_table_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ bool RelDataReadState::trySwitchToLocalStorage() {
return false;
}

bool RelDataReadState::hasMoreToRead(transaction::Transaction* transaction) {
bool RelDataReadState::hasMoreToRead(Transaction* transaction) {
if (transaction->isWriteTransaction()) {
if (readFromLocalStorage) {
// Already read from local storage. Check if there are more in local storage.
Expand Down Expand Up @@ -250,18 +250,18 @@ void RelTableData::lookup(Transaction* /*transaction*/, TableReadState& /*readSt
KU_ASSERT(false);
}

void RelTableData::insert(transaction::Transaction* transaction, ValueVector* srcNodeIDVector,
void RelTableData::insert(Transaction* transaction, ValueVector* srcNodeIDVector,
ValueVector* dstNodeIDVector, const std::vector<ValueVector*>& propertyVectors) {
auto localTableData = transaction->getLocalStorage()->getOrCreateLocalTableData(
tableID, columns, TableType::REL, getDataIdxFromDirection(direction), multiplicity);
auto checkPersistent =
localTableData->insert({srcNodeIDVector, dstNodeIDVector}, propertyVectors);
if (checkPersistent && multiplicity == common::RelMultiplicity::ONE) {
if (checkPersistent && multiplicity == RelMultiplicity::ONE) {
checkRelMultiplicityConstraint(transaction, srcNodeIDVector);
}
}

void RelTableData::update(transaction::Transaction* transaction, column_id_t columnID,
void RelTableData::update(Transaction* transaction, column_id_t columnID,
ValueVector* srcNodeIDVector, ValueVector* relIDVector, ValueVector* propertyVector) {
KU_ASSERT(columnID < columns.size() && columnID != REL_ID_COLUMN_ID);
auto localTableData = transaction->getLocalStorage()->getOrCreateLocalTableData(
Expand All @@ -278,7 +278,7 @@ bool RelTableData::delete_(

void RelTableData::checkRelMultiplicityConstraint(
Transaction* transaction, ValueVector* srcNodeIDVector) const {
KU_ASSERT(srcNodeIDVector->state->isFlat() && multiplicity == common::RelMultiplicity::ONE);
KU_ASSERT(srcNodeIDVector->state->isFlat() && multiplicity == RelMultiplicity::ONE);
auto nodeIDPos = srcNodeIDVector->state->selVector->selectedPositions[0];
auto nodeOffset = srcNodeIDVector->getValue<nodeID_t>(nodeIDPos).offset;
if (checkIfNodeHasRels(transaction, nodeOffset)) {
Expand Down Expand Up @@ -963,7 +963,7 @@ void RelTableData::prepareCommitNodeGroup(
}

LocalRelNG* RelTableData::getLocalNodeGroup(
transaction::Transaction* transaction, node_group_idx_t nodeGroupIdx) {
Transaction* transaction, node_group_idx_t nodeGroupIdx) {
auto localTableData = transaction->getLocalStorage()->getLocalTableData(
tableID, getDataIdxFromDirection(direction));
LocalRelNG* localNodeGroup = nullptr;
Expand Down
Loading

0 comments on commit 8b2c768

Please sign in to comment.