Skip to content

Commit

Permalink
Apply zone map to rel scan (kuzudb#3573)
Browse files Browse the repository at this point in the history
* Apply zone map to scan rel table

* Run clang-format

---------

Co-authored-by: CI Bot <andyfengHKU@users.noreply.github.com>
  • Loading branch information
andyfengHKU and andyfengHKU authored Jun 5, 2024
1 parent b95c1f9 commit 5938561
Show file tree
Hide file tree
Showing 63 changed files with 383 additions and 292 deletions.
6 changes: 3 additions & 3 deletions src/binder/query/query_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,11 +242,11 @@ void QueryGraphCollection::finalize() {
std::vector<QueryGraph> QueryGraphCollection::mergeGraphs(common::idx_t baseGraphIdx) {
KU_ASSERT(baseGraphIdx < queryGraphs.size());
auto baseGraph = std::move(queryGraphs[baseGraphIdx]);
std::unordered_set<common::vector_idx_t> mergedGraphIndices;
std::unordered_set<common::idx_t> mergedGraphIndices;
mergedGraphIndices.insert(baseGraphIdx);
while (true) {
// find graph to merge
common::vector_idx_t graphToMergeIdx = common::INVALID_VECTOR_IDX;
common::idx_t graphToMergeIdx = common::INVALID_IDX;
for (auto i = 0u; i < queryGraphs.size(); ++i) {
if (mergedGraphIndices.contains(i)) { // graph has been merged.
continue;
Expand All @@ -256,7 +256,7 @@ std::vector<QueryGraph> QueryGraphCollection::mergeGraphs(common::idx_t baseGrap
break;
}
}
if (graphToMergeIdx == common::INVALID_VECTOR_IDX) { // No graph can be merged. Terminate.
if (graphToMergeIdx == common::INVALID_IDX) { // No graph can be merged. Terminate.
break;
}
// Perform merge
Expand Down
2 changes: 1 addition & 1 deletion src/function/table/call/storage_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace function {

struct StorageInfoLocalState final : public TableFuncLocalState {
std::unique_ptr<DataChunkCollection> dataChunkCollection;
vector_idx_t currChunkIdx;
idx_t currChunkIdx;

explicit StorageInfoLocalState(storage::MemoryManager* mm) : currChunkIdx{0} {
dataChunkCollection = std::make_unique<DataChunkCollection>(mm);
Expand Down
2 changes: 1 addition & 1 deletion src/include/binder/expression/node_rel_expression.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class NodeOrRelExpression : public Expression {
// A pattern may bind to multiple tables.
std::vector<common::table_id_t> tableIDs;
// Index over propertyExprs on property name.
std::unordered_map<std::string, common::vector_idx_t> propertyNameToIdx;
std::unordered_map<std::string, common::idx_t> propertyNameToIdx;
// Property expressions with order (aligned with catalog).
std::vector<std::unique_ptr<Expression>> propertyExprs;
std::shared_ptr<Expression> labelExpression;
Expand Down
3 changes: 1 addition & 2 deletions src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ using property_id_t = uint32_t;
constexpr property_id_t INVALID_PROPERTY_ID = UINT32_MAX;
using column_id_t = property_id_t;
constexpr column_id_t INVALID_COLUMN_ID = INVALID_PROPERTY_ID;
using vector_idx_t = uint32_t;
// 32-bit unsigned index type.
using idx_t = uint32_t;
constexpr vector_idx_t INVALID_VECTOR_IDX = UINT32_MAX;
// Sentinel meaning "no valid index". Declared constexpr, like the neighbouring INVALID_*
// constants, so it is guaranteed to be a compile-time constant.
constexpr idx_t INVALID_IDX = UINT32_MAX;
using block_idx_t = uint64_t;
constexpr block_idx_t INVALID_BLOCK_IDX = UINT64_MAX;
using struct_field_idx_t = uint8_t;
Expand Down
3 changes: 1 addition & 2 deletions src/include/common/vector/auxiliary_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ class StructAuxiliaryBuffer : public AuxiliaryBuffer {
public:
StructAuxiliaryBuffer(const LogicalType& type, storage::MemoryManager* memoryManager);

inline void referenceChildVector(vector_idx_t idx,
std::shared_ptr<ValueVector> vectorToReference) {
inline void referenceChildVector(idx_t idx, std::shared_ptr<ValueVector> vectorToReference) {
childrenVectors[idx] = std::move(vectorToReference);
}
inline const std::vector<std::shared_ptr<ValueVector>>& getFieldVectors() const {
Expand Down
4 changes: 2 additions & 2 deletions src/include/function/path/vector_path_functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ struct RelsFunction {
};

struct PropertiesBindData : public FunctionBindData {
common::vector_idx_t childIdx;
common::idx_t childIdx;

PropertiesBindData(std::unique_ptr<common::LogicalType> dataType, common::vector_idx_t childIdx)
PropertiesBindData(std::unique_ptr<common::LogicalType> dataType, common::idx_t childIdx)
: FunctionBindData{std::move(dataType)}, childIdx{childIdx} {}
};

Expand Down
5 changes: 2 additions & 3 deletions src/include/function/struct/vector_struct_functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@ struct StructPackFunctions {
};

struct StructExtractBindData : public FunctionBindData {
common::vector_idx_t childIdx;
common::idx_t childIdx;

StructExtractBindData(std::unique_ptr<common::LogicalType> dataType,
common::vector_idx_t childIdx)
StructExtractBindData(std::unique_ptr<common::LogicalType> dataType, common::idx_t childIdx)
: FunctionBindData{std::move(dataType)}, childIdx{childIdx} {}
};

Expand Down
3 changes: 3 additions & 0 deletions src/include/main/client_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ struct ClientConfig {
std::string fileSearchPath;
// If using semi mask in join.
bool enableSemiMask;
// If using zone map in scan.
bool enableZoneMap;
// Number of threads for execution.
uint64_t numThreads;
// Timeout (milliseconds).
Expand All @@ -36,6 +38,7 @@ struct ClientConfigDefault {
static constexpr uint64_t TIMEOUT_IN_MS = 0;
static constexpr uint32_t VAR_LENGTH_MAX_DEPTH = 30;
static constexpr bool ENABLE_SEMI_MASK = true;
static constexpr bool ENABLE_ZONEMAP = true;
static constexpr bool ENABLE_PROGRESS_BAR = false;
static constexpr uint64_t SHOW_PROGRESS_AFTER = 1000;
static constexpr common::PathSemantic RECURSIVE_PATTERN_SEMANTIC = common::PathSemantic::WALK;
Expand Down
12 changes: 12 additions & 0 deletions src/include/main/settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,18 @@ struct EnableSemiMaskSetting {
}
};

// Client setting "enable_zone_map": a BOOL option backed by the client config's
// enableZoneMap field.
struct EnableZoneMapSetting {
    static constexpr const char* name = "enable_zone_map";
    static constexpr const common::LogicalTypeID inputType = common::LogicalTypeID::BOOL;

    // Checks that `parameter` has type `inputType`, then stores its bool value in the
    // context's client config.
    static void setContext(ClientContext* context, const common::Value& parameter) {
        parameter.validateType(inputType);
        const bool enabled = parameter.getValue<bool>();
        context->getClientConfigUnsafe()->enableZoneMap = enabled;
    }

    // Returns the current enableZoneMap flag wrapped in a Value.
    static common::Value getSetting(ClientContext* context) {
        const bool enabled = context->getClientConfig()->enableZoneMap;
        return common::Value(enabled);
    }
};

struct HomeDirectorySetting {
static constexpr const char* name = "home_directory";
static constexpr const common::LogicalTypeID inputType = common::LogicalTypeID::STRING;
Expand Down
4 changes: 3 additions & 1 deletion src/include/optimizer/filter_push_down_optimizer.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,15 @@ class FilterPushDownOptimizer {
// Filter(a.ID=b.ID)
// CrossProduct to HashJoin
// S(a) S(b) S(a) S(b)
// This is a temporary solution in the absence of a generic hash join operator.
std::shared_ptr<planner::LogicalOperator> visitCrossProductReplace(
const std::shared_ptr<planner::LogicalOperator>& op);

// Push FILTER into SCAN_NODE_TABLE, and turn index lookup into INDEX_SCAN.
std::shared_ptr<planner::LogicalOperator> visitScanNodeTableReplace(
const std::shared_ptr<planner::LogicalOperator>& op);
// Push Filter into EXTEND.
std::shared_ptr<planner::LogicalOperator> visitExtendReplace(
const std::shared_ptr<planner::LogicalOperator>& op);

// Finish the current push down optimization by applying the remaining predicates as a single filter.
// And heuristically reorder equality predicates first in the filter.
Expand Down
1 change: 1 addition & 0 deletions src/include/planner/operator/extend/base_logical_extend.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class BaseLogicalExtend : public LogicalOperator {
std::shared_ptr<binder::NodeExpression> getBoundNode() const { return boundNode; }
std::shared_ptr<binder::NodeExpression> getNbrNode() const { return nbrNode; }
std::shared_ptr<binder::RelExpression> getRel() const { return rel; }
bool isRecursive() const { return rel->isRecursive(); }
common::ExtendDirection getDirection() const { return direction; }

bool extendFromSourceNode() const {
Expand Down
13 changes: 9 additions & 4 deletions src/include/planner/operator/extend/logical_extend.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "planner/operator/extend/base_logical_extend.h"
#include "storage/predicate/column_predicate.h"

namespace kuzu {
namespace planner {
Expand All @@ -21,14 +22,18 @@ class LogicalExtend : public BaseLogicalExtend {
void computeFlatSchema() override;

binder::expression_vector getProperties() const { return properties; }

std::unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalExtend>(boundNode, nbrNode, rel, direction, properties,
hasAtMostOneNbr, children[0]->copy());
void setPropertyPredicates(std::vector<storage::ColumnPredicateSet> predicates) {
propertyPredicates = std::move(predicates);
}
const std::vector<storage::ColumnPredicateSet>& getPropertyPredicates() const {
return propertyPredicates;
}

std::unique_ptr<LogicalOperator> copy() override;

private:
binder::expression_vector properties;
std::vector<storage::ColumnPredicateSet> propertyPredicates;
bool hasAtMostOneNbr;
};

Expand Down
4 changes: 2 additions & 2 deletions src/include/planner/operator/logical_partitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ class LogicalPartitioner : public LogicalOperator {

std::string getExpressionsForPrinting() const final;

inline common::vector_idx_t getNumInfos() { return infos.size(); }
inline LogicalPartitionerInfo* getInfo(common::vector_idx_t idx) {
inline common::idx_t getNumInfos() { return infos.size(); }
inline LogicalPartitionerInfo* getInfo(common::idx_t idx) {
KU_ASSERT(idx < infos.size());
return infos[idx].get();
}
Expand Down
2 changes: 1 addition & 1 deletion src/include/planner/subplans_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class SubgraphPlans {
uint64_t maxCost = UINT64_MAX;
binder::expression_vector nodeIDsToEncode;
std::vector<std::unique_ptr<LogicalPlan>> plans;
std::unordered_map<std::bitset<binder::MAX_NUM_QUERY_VARIABLES>, common::vector_idx_t>
std::unordered_map<std::bitset<binder::MAX_NUM_QUERY_VARIABLES>, common::idx_t>
encodedPlan2PlanIdx;
};

Expand Down
8 changes: 4 additions & 4 deletions src/include/processor/data_pos.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
namespace kuzu {
namespace processor {

using data_chunk_pos_t = common::vector_idx_t;
constexpr data_chunk_pos_t INVALID_DATA_CHUNK_POS = common::INVALID_VECTOR_IDX;
using value_vector_pos_t = common::vector_idx_t;
constexpr value_vector_pos_t INVALID_VALUE_VECTOR_POS = common::INVALID_VECTOR_IDX;
using data_chunk_pos_t = common::idx_t;
constexpr data_chunk_pos_t INVALID_DATA_CHUNK_POS = common::INVALID_IDX;
using value_vector_pos_t = common::idx_t;
constexpr value_vector_pos_t INVALID_VALUE_VECTOR_POS = common::INVALID_IDX;

struct DataPos {
data_chunk_pos_t dataChunkPos;
Expand Down
4 changes: 2 additions & 2 deletions src/include/processor/operator/order_by/top_k.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ class TopKBuffer {

bool compareBoundaryValue(const std::vector<common::ValueVector*>& keyVectors);

bool compareFlatKeys(common::vector_idx_t vectorIdxToCompare,
bool compareFlatKeys(common::idx_t vectorIdxToCompare,
const std::vector<common::ValueVector*> keyVectors);

void compareUnflatKeys(common::vector_idx_t vectorIdxToCompare,
void compareUnflatKeys(common::idx_t vectorIdxToCompare,
const std::vector<common::ValueVector*> keyVectors);

static void appendSelState(common::SelectionVector* selVector,
Expand Down
4 changes: 2 additions & 2 deletions src/include/processor/operator/partitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ struct PartitionerSharedState {
: columnTypes{std::move(columnTypes)} {}

void initialize(std::vector<std::unique_ptr<PartitioningInfo>>& infos);
common::partition_idx_t getNextPartition(common::vector_idx_t partitioningIdx);
common::partition_idx_t getNextPartition(common::idx_t partitioningIdx);
void resetState();
void merge(std::vector<std::unique_ptr<PartitioningBuffer>> localPartitioningStates);

inline const storage::ChunkedNodeGroupCollection& getPartitionBuffer(
common::vector_idx_t partitioningIdx, common::partition_idx_t partitionIdx) const {
common::idx_t partitioningIdx, common::partition_idx_t partitionIdx) const {
KU_ASSERT(partitioningIdx < partitioningBuffers.size());
KU_ASSERT(partitionIdx < partitioningBuffers[partitioningIdx]->partitions.size());
return partitioningBuffers[partitioningIdx]->partitions[partitionIdx];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ struct NodeBatchInsertInfo final : public BatchInsertInfo {
struct NodeBatchInsertSharedState final : public BatchInsertSharedState {
// Primary key info
storage::PrimaryKeyIndex* pkIndex;
common::vector_idx_t pkColumnIdx;
common::idx_t pkColumnIdx;
common::LogicalType pkType;
std::optional<IndexBuilder> globalIndexBuilder = std::nullopt;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class BaseBFSState {

inline void finalizeCurrentLevel() { moveNextLevelAsCurrentLevel(); }
inline size_t getNumFrontiers() const { return frontiers.size(); }
inline Frontier* getFrontier(common::vector_idx_t idx) const { return frontiers[idx].get(); }
inline Frontier* getFrontier(common::idx_t idx) const { return frontiers[idx].get(); }

protected:
inline bool isCurrentFrontierEmpty() const { return currentFrontier->nodeIDs.empty(); }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class DstNodeWithMultiplicityScanner : public BaseFrontierScanner {
*/
struct FrontiersScanner {
std::vector<std::unique_ptr<BaseFrontierScanner>> scanners;
common::vector_idx_t cursor;
common::idx_t cursor;

explicit FrontiersScanner(std::vector<std::unique_ptr<BaseFrontierScanner>> scanners)
: scanners{std::move(scanners)}, cursor{0} {}
Expand Down
27 changes: 14 additions & 13 deletions src/include/processor/operator/scan/scan_multi_rel_tables.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ class RelTableCollectionScanner {
friend class ScanMultiRelTable;

public:
explicit RelTableCollectionScanner(std::vector<std::unique_ptr<ScanRelTableInfo>> scanInfos)
: scanInfos{std::move(scanInfos)} {}
explicit RelTableCollectionScanner(std::vector<ScanRelTableInfo> relInfos)
: relInfos{std::move(relInfos)} {}
EXPLICIT_COPY_DEFAULT_MOVE(RelTableCollectionScanner);

bool empty() const { return relInfos.empty(); }

void resetState() {
currentTableIdx = 0;
Expand All @@ -33,29 +36,27 @@ class RelTableCollectionScanner {

bool scan(const common::SelectionVector& selVector, transaction::Transaction* transaction);

std::unique_ptr<RelTableCollectionScanner> clone() const;
private:
RelTableCollectionScanner(const RelTableCollectionScanner& other)
: relInfos{copyVector(other.relInfos)} {}

private:
std::vector<std::unique_ptr<ScanRelTableInfo>> scanInfos;
std::vector<std::unique_ptr<storage::RelTableScanState>> readStates;
std::vector<ScanRelTableInfo> relInfos;
std::vector<bool> directionValues;
common::ValueVector* directionVector = nullptr;
uint32_t currentTableIdx = UINT32_MAX;
common::idx_t currentTableIdx = common::INVALID_IDX;
uint32_t nextTableIdx = 0;
};

class ScanMultiRelTable : public ScanTable {
static constexpr PhysicalOperatorType type_ = PhysicalOperatorType::SCAN_REL_TABLE;
using node_table_id_scanner_map_t =
std::unordered_map<common::table_id_t, std::unique_ptr<RelTableCollectionScanner>>;

public:
ScanMultiRelTable(ScanTableInfo info, DirectionInfo directionInfo,
node_table_id_scanner_map_t scannerPerNodeTable, std::unique_ptr<PhysicalOperator> child,
uint32_t id, const std::string& paramsString)
common::table_id_map_t<RelTableCollectionScanner> scanners,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
: ScanTable{type_, std::move(info), std::move(child), id, paramsString},
directionInfo{std::move(directionInfo)},
scannerPerNodeTable{std::move(scannerPerNodeTable)} {}
directionInfo{std::move(directionInfo)}, scanners{std::move(scanners)} {}

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final;

Expand All @@ -69,7 +70,7 @@ class ScanMultiRelTable : public ScanTable {

private:
DirectionInfo directionInfo;
node_table_id_scanner_map_t scannerPerNodeTable;
common::table_id_map_t<RelTableCollectionScanner> scanners;
RelTableCollectionScanner* currentScanner = nullptr;
};

Expand Down
7 changes: 4 additions & 3 deletions src/include/processor/operator/scan/scan_node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ struct ScanNodeTableInfo {
columnPredicates{std::move(columnPredicates)} {}
EXPLICIT_COPY_DEFAULT_MOVE(ScanNodeTableInfo);

void initScanState();

private:
ScanNodeTableInfo(const ScanNodeTableInfo& other)
: table{other.table}, columnIDs{other.columnIDs},
Expand All @@ -63,7 +65,7 @@ class ScanNodeTable final : public ScanTable {

bool getNextTuplesInternal(ExecutionContext* context) override;

const ScanNodeTableSharedState& getSharedState(common::vector_idx_t idx) const {
const ScanNodeTableSharedState& getSharedState(common::idx_t idx) const {
KU_ASSERT(idx < sharedStates.size());
return *sharedStates[idx];
}
Expand All @@ -74,8 +76,7 @@ class ScanNodeTable final : public ScanTable {
void initGlobalStateInternal(ExecutionContext* context) override;

private:
common::vector_idx_t currentTableIdx;
// TODO(Guodong): Refactor following three fields into a vector of structs.
common::idx_t currentTableIdx;
std::vector<ScanNodeTableInfo> nodeInfos;
std::vector<std::shared_ptr<ScanNodeTableSharedState>> sharedStates;
};
Expand Down
Loading

0 comments on commit 5938561

Please sign in to comment.