Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vacuum dropped columns during checkpoint #4074

Merged
merged 1 commit into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/catalog/catalog_entry/rel_table_catalog_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,15 @@ column_id_t RelTableCatalogEntry::getColumnID(property_id_t propertyID) const {
bool RelTableCatalogEntry::isSingleMultiplicity(RelDataDirection direction) const {
return getMultiplicity(direction) == RelMultiplicity::ONE;
}

RelMultiplicity RelTableCatalogEntry::getMultiplicity(RelDataDirection direction) const {
return direction == RelDataDirection::FWD ? dstMultiplicity : srcMultiplicity;
}

table_id_t RelTableCatalogEntry::getBoundTableID(RelDataDirection relDirection) const {
return relDirection == RelDataDirection::FWD ? srcTableID : dstTableID;
}

table_id_t RelTableCatalogEntry::getNbrTableID(RelDataDirection relDirection) const {
return relDirection == RelDataDirection::FWD ? dstTableID : srcTableID;
}
Expand Down
40 changes: 21 additions & 19 deletions src/catalog/catalog_entry/table_catalog_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ bool TableCatalogEntry::containProperty(const std::string& propertyName) const {
[&propertyName](const auto& property) { return property.getName() == propertyName; });
}

common::property_id_t TableCatalogEntry::getPropertyID(const std::string& propertyName) const {
property_id_t TableCatalogEntry::getPropertyID(const std::string& propertyName) const {
auto it = std::find_if(properties.begin(), properties.end(),
[&propertyName](const auto& property) { return property.getName() == propertyName; });
KU_ASSERT(it != properties.end());
return it->getPropertyID();
}

const Property* TableCatalogEntry::getProperty(common::property_id_t propertyID) const {
const Property* TableCatalogEntry::getProperty(property_id_t propertyID) const {
auto it = std::find_if(properties.begin(), properties.end(),
[&propertyID](const auto& property) { return property.getPropertyID() == propertyID; });
KU_ASSERT(it != properties.end());
Expand All @@ -73,41 +73,43 @@ uint32_t TableCatalogEntry::getPropertyPos(common::property_id_t propertyID) con
return it - properties.begin();
}

common::column_id_t TableCatalogEntry::getColumnID(const common::property_id_t propertyID) const {
column_id_t TableCatalogEntry::getColumnID(const property_id_t propertyID) const {
auto it = std::find_if(properties.begin(), properties.end(),
[&propertyID](const auto& property) { return property.getPropertyID() == propertyID; });
KU_ASSERT(it != properties.end());
return it->getColumnID();
}

bool TableCatalogEntry::containPropertyType(const common::LogicalType& logicalType) const {
return std::any_of(properties.begin(), properties.end(),
[&logicalType](const Property& property) { return property.getDataType() == logicalType; });
}

void TableCatalogEntry::addProperty(std::string propertyName, common::LogicalType dataType,
void TableCatalogEntry::addProperty(std::string propertyName, LogicalType dataType,
std::unique_ptr<parser::ParsedExpression> defaultExpr) {
properties.emplace_back(std::move(propertyName), std::move(dataType), std::move(defaultExpr),
nextPID++, nextColumnID++, tableID);
}

void TableCatalogEntry::dropProperty(common::property_id_t propertyID) {
void TableCatalogEntry::dropProperty(property_id_t propertyID) {
properties.erase(std::remove_if(properties.begin(), properties.end(),
[propertyID](const Property& property) {
return property.getPropertyID() == propertyID;
}),
properties.end());
}

void TableCatalogEntry::renameProperty(common::property_id_t propertyID,
const std::string& newName) {
void TableCatalogEntry::renameProperty(property_id_t propertyID, const std::string& newName) {
auto it = std::find_if(properties.begin(), properties.end(),
[&propertyID](const auto& property) { return property.getPropertyID() == propertyID; });
KU_ASSERT(it != properties.end());
it->rename(newName);
}

void TableCatalogEntry::serialize(common::Serializer& serializer) const {
void TableCatalogEntry::resetColumnIDs() {
auto columnID = 0u;
for (auto& property : properties) {
property.setColumnID(columnID++);
}
nextColumnID = columnID;
}

void TableCatalogEntry::serialize(Serializer& serializer) const {
CatalogEntry::serialize(serializer);
serializer.write(tableID);
serializer.serializeVector(properties);
Expand All @@ -116,13 +118,13 @@ void TableCatalogEntry::serialize(common::Serializer& serializer) const {
serializer.write(nextColumnID);
}

std::unique_ptr<TableCatalogEntry> TableCatalogEntry::deserialize(
common::Deserializer& deserializer, CatalogEntryType type) {
common::table_id_t tableID;
std::unique_ptr<TableCatalogEntry> TableCatalogEntry::deserialize(Deserializer& deserializer,
CatalogEntryType type) {
table_id_t tableID;
std::vector<Property> properties;
std::string comment;
common::property_id_t nextPID;
common::column_id_t nextColumnID;
property_id_t nextPID;
column_id_t nextColumnID;
deserializer.deserializeValue(tableID);
deserializer.deserializeVector(properties);
deserializer.deserializeValue(comment);
Expand Down Expand Up @@ -167,7 +169,7 @@ void TableCatalogEntry::copyFrom(const CatalogEntry& other) {
binder::BoundCreateTableInfo TableCatalogEntry::getBoundCreateTableInfo(
transaction::Transaction* transaction) const {
auto extraInfo = getBoundExtraCreateInfo(transaction);
return BoundCreateTableInfo(getTableType(), name, common::ConflictAction::ON_CONFLICT_THROW,
return BoundCreateTableInfo(getTableType(), name, ConflictAction::ON_CONFLICT_THROW,
std::move(extraInfo));
}

Expand Down
2 changes: 1 addition & 1 deletion src/include/catalog/catalog_entry/sequence_catalog_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
#include <mutex>

#include "binder/ddl/bound_create_sequence_info.h"
#include "catalog/property.h"
#include "catalog_entry.h"
#include "common/vector/value_vector.h"

namespace kuzu {
namespace binder {
Expand Down
4 changes: 2 additions & 2 deletions src/include/catalog/catalog_entry/table_catalog_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ class KUZU_API TableCatalogEntry : public CatalogEntry {
std::string getComment() const { return comment; }
void setComment(std::string newComment) { comment = std::move(newComment); }
virtual bool isParent(common::table_id_t /*tableID*/) { return false; };
// TODO(Guodong/Ziyi): This function should be removed. Instead we should use CatalogEntryType.
virtual common::TableType getTableType() const = 0;
virtual function::TableFunction getScanFunction() { KU_UNREACHABLE; }
binder::BoundAlterInfo* getAlterInfo() const { return alterInfo.get(); }
Expand All @@ -56,16 +55,17 @@ class KUZU_API TableCatalogEntry : public CatalogEntry {
//===--------------------------------------------------------------------===//
uint32_t getNumProperties() const { return properties.size(); }
const std::vector<Property>& getPropertiesRef() const { return properties; }
std::vector<Property>& getPropertiesUnsafe() { return properties; }
bool containProperty(const std::string& propertyName) const;
common::property_id_t getPropertyID(const std::string& propertyName) const;
const Property* getProperty(common::property_id_t propertyID) const;
uint32_t getPropertyPos(common::property_id_t propertyID) const;
virtual common::column_id_t getColumnID(common::property_id_t propertyID) const;
bool containPropertyType(const common::LogicalType& logicalType) const;
void addProperty(std::string propertyName, common::LogicalType dataType,
std::unique_ptr<parser::ParsedExpression> defaultExpr);
void dropProperty(common::property_id_t propertyID);
void renameProperty(common::property_id_t propertyID, const std::string& newName);
void resetColumnIDs();

//===--------------------------------------------------------------------===//
// serialization & deserialization
Expand Down
4 changes: 2 additions & 2 deletions src/include/catalog/property.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

#include "common/copy_constructors.h"
#include "common/types/types.h"
#include "common/types/value/value.h"
#include "parser/expression/parsed_expression.h"

namespace kuzu {
Expand All @@ -29,6 +28,7 @@ class Property {

std::string getName() const { return name; }

void setColumnID(common::column_id_t columnID) { this->columnID = columnID; }
const common::LogicalType& getDataType() const { return dataType; }
common::property_id_t getPropertyID() const { return propertyID; }
common::column_id_t getColumnID() const { return columnID; }
Expand All @@ -40,7 +40,7 @@ class Property {
void serialize(common::Serializer& serializer) const;
static Property deserialize(common::Deserializer& deserializer);

static std::string toCypher(const std::vector<kuzu::catalog::Property>& properties);
static std::string toCypher(const std::vector<Property>& properties);

private:
Property(const Property& other)
Expand Down
2 changes: 2 additions & 0 deletions src/include/storage/store/chunked_node_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class ChunkedNodeGroup {

ChunkedNodeGroup(std::vector<std::unique_ptr<ColumnChunk>> chunks,
common::row_idx_t startRowIdx, NodeGroupDataFormat format = NodeGroupDataFormat::REGULAR);
ChunkedNodeGroup(ChunkedNodeGroup& base,
const std::vector<common::column_id_t>& selectedColumns);
ChunkedNodeGroup(const std::vector<common::LogicalType>& columnTypes, bool enableCompression,
uint64_t capacity, common::row_idx_t startRowIdx, ResidencyState residencyState,
NodeGroupDataFormat format = NodeGroupDataFormat::REGULAR);
Expand Down
3 changes: 3 additions & 0 deletions src/include/storage/store/csr_chunked_node_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ class ChunkedCSRNodeGroup final : public ChunkedNodeGroup {
: ChunkedNodeGroup{columnTypes, enableCompression, capacity, startOffset, residencyState,
NodeGroupDataFormat::CSR},
csrHeader{enableCompression, common::StorageConstants::NODE_GROUP_SIZE, residencyState} {}
ChunkedCSRNodeGroup(ChunkedCSRNodeGroup& base,
const std::vector<common::column_id_t>& selectedColumns)
: ChunkedNodeGroup{base, selectedColumns}, csrHeader{std::move(base.csrHeader)} {}
ChunkedCSRNodeGroup(ChunkedCSRHeader csrHeader,
std::vector<std::unique_ptr<ColumnChunk>> chunks, common::row_idx_t startRowIdx)
: ChunkedNodeGroup{std::move(chunks), startRowIdx, NodeGroupDataFormat::CSR},
Expand Down
4 changes: 2 additions & 2 deletions src/include/storage/store/csr_node_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ struct CSRNodeGroupCheckpointState final : NodeGroupCheckpointState {
std::unique_ptr<ChunkedCSRHeader> newHeader;

CSRNodeGroupCheckpointState(std::vector<common::column_id_t> columnIDs,
std::vector<Column*> columns, BMFileHandle& dataFH, MemoryManager* mm, Column* csrOffsetCol,
Column* csrLengthCol)
std::vector<std::unique_ptr<Column>> columns, BMFileHandle& dataFH, MemoryManager* mm,
Column* csrOffsetCol, Column* csrLengthCol)
: NodeGroupCheckpointState{std::move(columnIDs), std::move(columns), dataFH, mm},
csrOffsetColumn{csrOffsetCol}, csrLengthColumn{csrLengthCol} {}
};
Expand Down
4 changes: 2 additions & 2 deletions src/include/storage/store/node_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ struct NodeGroupScanState {
class MemoryManager;
struct NodeGroupCheckpointState {
std::vector<common::column_id_t> columnIDs;
std::vector<Column*> columns;
std::vector<std::unique_ptr<Column>> columns;
BMFileHandle& dataFH;
MemoryManager* mm;

NodeGroupCheckpointState(std::vector<common::column_id_t> columnIDs,
std::vector<Column*> columns, BMFileHandle& dataFH, MemoryManager* mm)
std::vector<std::unique_ptr<Column>> columns, BMFileHandle& dataFH, MemoryManager* mm)
: columnIDs{std::move(columnIDs)}, columns{std::move(columns)}, dataFH{dataFH}, mm{mm} {}
virtual ~NodeGroupCheckpointState() = default;

Expand Down
8 changes: 1 addition & 7 deletions src/include/storage/store/node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

#include <cstdint>

#include "common/exception/not_implemented.h"
#include "common/types/types.h"
#include "storage/index/hash_index.h"
#include "storage/store/node_group_collection.h"
Expand Down Expand Up @@ -115,10 +114,6 @@ class NodeTable final : public Table {

void addColumn(transaction::Transaction* transaction,
TableAddColumnState& addColumnState) override;
void dropColumn(common::column_id_t) override {
throw common::NotImplementedException("dropColumn is not implemented yet.");
}

bool isVisible(const transaction::Transaction* transaction, common::offset_t offset) const;

bool lookupPK(const transaction::Transaction* transaction, common::ValueVector* keyVector,
Expand Down Expand Up @@ -150,8 +145,7 @@ class NodeTable final : public Table {
transaction::Transaction* transaction, ChunkedNodeGroup& chunkedGroup);

void commit(transaction::Transaction* transaction, LocalTable* localTable) override;
void rollback(LocalTable* localTable) override;
void checkpoint(common::Serializer& ser) override;
void checkpoint(common::Serializer& ser, catalog::TableCatalogEntry* tableEntry) override;

common::node_group_idx_t getNumCommittedNodeGroups() const {
return nodeGroups->getNumNodeGroups();
Expand Down
8 changes: 1 addition & 7 deletions src/include/storage/store/rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,6 @@ class RelTable final : public Table {

void addColumn(transaction::Transaction* transaction,
TableAddColumnState& addColumnState) override;
void dropColumn(common::column_id_t) override {
// TODO(Guodong): Rework this.
// fwdRelTableData->dropColumn(columnID);
// bwdRelTableData->dropColumn(columnID);
}
Column* getCSROffsetColumn(common::RelDataDirection direction) const {
return direction == common::RelDataDirection::FWD ? fwdRelTableData->getCSROffsetColumn() :
bwdRelTableData->getCSROffsetColumn();
Expand All @@ -164,8 +159,7 @@ class RelTable final : public Table {
common::RelDataDirection direction) const;

void commit(transaction::Transaction* transaction, LocalTable* localTable) override;
void rollback(LocalTable* localTable) override;
void checkpoint(common::Serializer& ser) override;
void checkpoint(common::Serializer& ser, catalog::TableCatalogEntry* tableEntry) override;

common::row_idx_t getNumRows() override { return nextRelOffset; }

Expand Down
2 changes: 1 addition & 1 deletion src/include/storage/store/rel_table_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class RelTableData {
return numRows;
}

void checkpoint() const;
void checkpoint(const std::vector<common::column_id_t>& columnIDs);

void serialize(common::Serializer& serializer) const;

Expand Down
5 changes: 2 additions & 3 deletions src/include/storage/store/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,10 @@ class Table {

virtual void addColumn(transaction::Transaction* transaction,
TableAddColumnState& addColumnState) = 0;
virtual void dropColumn(common::column_id_t columnID) = 0;
void dropColumn() { setHasChanges(); }

virtual void commit(transaction::Transaction* transaction, LocalTable* localTable) = 0;
virtual void rollback(LocalTable* localTable) = 0;
virtual void checkpoint(common::Serializer& ser) = 0;
virtual void checkpoint(common::Serializer& ser, catalog::TableCatalogEntry* tableEntry) = 0;

virtual common::row_idx_t getNumRows() = 0;

Expand Down
3 changes: 3 additions & 0 deletions src/processor/operator/ddl/alter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ void Alter::executeDDLInternal(ExecutionContext* context) {
auto storageManager = context->clientContext->getStorageManager();
storage::TableAddColumnState state{*addedProp, *defaultValueEvaluator};
storageManager->getTable(info.tableID)->addColumn(context->clientContext->getTx(), state);
} else if (info.alterType == common::AlterType::DROP_PROPERTY) {
auto storageManager = context->clientContext->getStorageManager();
storageManager->getTable(info.tableID)->dropColumn();
}
}

Expand Down
3 changes: 1 addition & 2 deletions src/storage/local_storage/local_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ void LocalStorage::commit() {

void LocalStorage::rollback() {
for (auto& [tableID, localTable] : tables) {
const auto table = clientContext.getStorageManager()->getTable(tableID);
table->rollback(localTable.get());
localTable->clear();
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/storage/storage_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,15 +231,15 @@ void StorageManager::checkpoint(main::ClientContext& clientContext) {
stringFormat("Checkpoint failed: table {} not found in storage manager.",
tableEntry->getName()));
}
tables.at(tableEntry->getTableID())->checkpoint(ser);
tables.at(tableEntry->getTableID())->checkpoint(ser, tableEntry);
}
for (const auto tableEntry : relTableEntries) {
if (!tables.contains(tableEntry->getTableID())) {
throw RuntimeException(
stringFormat("Checkpoint failed: table {} not found in storage manager.",
tableEntry->getName()));
}
tables.at(tableEntry->getTableID())->checkpoint(ser);
tables.at(tableEntry->getTableID())->checkpoint(ser, tableEntry);
}
writer->flush();
writer->sync();
Expand Down
11 changes: 11 additions & 0 deletions src/storage/store/chunked_node_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,17 @@ ChunkedNodeGroup::ChunkedNodeGroup(std::vector<std::unique_ptr<ColumnChunk>> chu
}
}

ChunkedNodeGroup::ChunkedNodeGroup(ChunkedNodeGroup& base,
const std::vector<column_id_t>& selectedColumns)
: format{base.format}, residencyState{base.residencyState}, startRowIdx{base.startRowIdx},
capacity{base.capacity}, numRows{base.numRows.load()} {
chunks.reserve(selectedColumns.size());
for (const auto columnID : selectedColumns) {
KU_ASSERT(columnID < base.getNumColumns());
chunks.push_back(base.moveColumnChunk(columnID));
}
}

ChunkedNodeGroup::ChunkedNodeGroup(const std::vector<LogicalType>& columnTypes,
bool enableCompression, uint64_t capacity, row_idx_t startRowIdx, ResidencyState residencyState,
NodeGroupDataFormat format)
Expand Down
12 changes: 9 additions & 3 deletions src/storage/store/csr_node_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,12 @@
// No csr regions need to be checkpointed, meaning nothing is updated or deleted.
// We should reset the version and update info of the persistent chunked group.
persistentChunkGroup->resetVersionAndUpdateInfo();
if (csrState.columnIDs.size() != persistentChunkGroup->getNumColumns()) {
// The column set of the node group has changed. We need to re-create the persistent
// chunked group.
persistentChunkGroup = std::make_unique<ChunkedCSRNodeGroup>(
persistentChunkGroup->cast<ChunkedCSRNodeGroup>(), csrState.columnIDs);

Check warning on line 352 in src/storage/store/csr_node_group.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/store/csr_node_group.cpp#L351-L352

Added lines #L351 - L352 were not covered by tests
}
return;
}
if (regionsToCheckpoint.size() == 1 &&
Expand All @@ -353,9 +359,7 @@
redistributeCSRRegions(csrState, leafRegions);
} else {
for (auto& region : regionsToCheckpoint) {
// if (region.hasDeletionsOrInsertions()) {
csrState.newHeader->populateRegionCSROffsets(region, *csrState.oldHeader);
// }
// The left node offset of a region should always maintain stable across length and
// offset changes.
KU_ASSERT(csrState.oldHeader->getStartCSROffset(region.leftNodeOffset) ==
Expand All @@ -367,6 +371,8 @@
checkpointColumn(lock, columnID, csrState, regionsToCheckpoint);
}
checkpointCSRHeaderColumns(csrState);
persistentChunkGroup = std::make_unique<ChunkedCSRNodeGroup>(
persistentChunkGroup->cast<ChunkedCSRNodeGroup>(), csrState.columnIDs);
finalizeCheckpoint(lock);
}

Expand Down Expand Up @@ -429,7 +435,7 @@
numOldRowsInRegion, false, ResidencyState::IN_MEMORY);
ChunkState chunkState;
const auto& persistentChunk = persistentChunkGroup->getColumnChunk(columnID);
chunkState.column = csrState.columns[columnID];
chunkState.column = csrState.columns[columnID].get();
persistentChunk.initializeScanState(chunkState);
persistentChunk.scanCommitted<ResidencyState::ON_DISK>(&DUMMY_CHECKPOINT_TRANSACTION,
chunkState, *oldChunkWithUpdates, leftCSROffset, numOldRowsInRegion);
Expand Down
Loading
Loading