Skip to content

Commit

Permalink
Rework resultset descriptor
Browse files Browse the repository at this point in the history
  • Loading branch information
andyfengHKU committed May 22, 2023
1 parent ad3741e commit 977e464
Show file tree
Hide file tree
Showing 17 changed files with 93 additions and 110 deletions.
6 changes: 3 additions & 3 deletions src/include/processor/mapper/plan_mapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class PlanMapper {
std::unique_ptr<PhysicalOperator> mapLogicalRenamePropertyToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<ResultCollector> appendResultCollector(
const binder::expression_vector& expressionsToCollect, const planner::Schema& schema,
const binder::expression_vector& expressionsToCollect, planner::Schema* schema,
std::unique_ptr<PhysicalOperator> prevOperator);

inline uint32_t getOperatorID() { return physicalOperatorID++; }
Expand All @@ -117,8 +117,8 @@ class PlanMapper {
const binder::expression_vector& dependentKeyExpressions,
std::vector<std::unique_ptr<function::AggregateFunction>> aggregateFunctions,
std::vector<std::unique_ptr<AggregateInputInfo>> aggregateInputInfos,
std::vector<DataPos> aggregatesOutputPos, const planner::Schema& inSchema,
const planner::Schema& outSchema, std::unique_ptr<PhysicalOperator> prevOperator,
std::vector<DataPos> aggregatesOutputPos, planner::Schema* inSchema,
planner::Schema* outSchema, std::unique_ptr<PhysicalOperator> prevOperator,
const std::string& paramsString);

static void mapAccHashJoin(PhysicalOperator* probe);
Expand Down
13 changes: 7 additions & 6 deletions src/include/processor/result/result_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@

#include "common/data_chunk/data_chunk.h"
#include "processor/data_pos.h"
#include "result_set_descriptor.h"

namespace kuzu {
namespace processor {

class ResultSet {
public:
struct ResultSet {
uint64_t multiplicity;
std::vector<std::shared_ptr<common::DataChunk>> dataChunks;

explicit ResultSet(uint32_t numDataChunks) : multiplicity{1}, dataChunks(numDataChunks) {}
static std::unique_ptr<ResultSet> getResultSet(
ResultSetDescriptor* resultSetDescriptor, storage::MemoryManager* memoryManager);

inline void insert(uint32_t pos, std::shared_ptr<common::DataChunk> dataChunk) {
assert(dataChunks.size() > pos);
Expand All @@ -32,10 +37,6 @@ class ResultSet {

uint64_t getNumTuplesWithoutMultiplicity(
const std::unordered_set<uint32_t>& dataChunksPosInScope);

public:
uint64_t multiplicity;
std::vector<std::shared_ptr<common::DataChunk>> dataChunks;
};

} // namespace processor
Expand Down
59 changes: 18 additions & 41 deletions src/include/processor/result/result_set_descriptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,59 +4,36 @@
#include <unordered_map>

#include "common/assert.h"
#include "planner/logical_plan/logical_operator/schema.h"
#include "processor/data_pos.h"

namespace kuzu {
namespace processor {
namespace planner {
class Schema;
} // namespace planner

class DataChunkDescriptor {
public:
DataChunkDescriptor() = default;
DataChunkDescriptor(const DataChunkDescriptor& other)
: singleState{other.singleState},
expressionNameToValueVectorPosMap{other.expressionNameToValueVectorPosMap},
expressions{other.expressions} {}
~DataChunkDescriptor() = default;
namespace processor {

inline void setSingleState() { singleState = true; }
inline bool isSingleState() const { return singleState; }
struct DataChunkDescriptor {
bool isSingleState;
std::vector<common::LogicalType> logicalTypes;

inline uint32_t getNumValueVectors() const { return expressions.size(); }
explicit DataChunkDescriptor(bool isSingleState) : isSingleState{isSingleState} {}
DataChunkDescriptor(const DataChunkDescriptor& other)
: isSingleState{other.isSingleState}, logicalTypes(other.logicalTypes) {}

inline void addExpression(std::shared_ptr<binder::Expression> expression) {
expressionNameToValueVectorPosMap.insert({expression->getUniqueName(), expressions.size()});
expressions.push_back(std::move(expression));
}
inline std::shared_ptr<binder::Expression> getExpression(uint32_t idx) const {
return expressions[idx];
inline std::unique_ptr<DataChunkDescriptor> copy() const {
return std::make_unique<DataChunkDescriptor>(*this);
}

private:
bool singleState = false;
std::unordered_map<std::string, uint32_t> expressionNameToValueVectorPosMap;
binder::expression_vector expressions;
};

class ResultSetDescriptor {
public:
explicit ResultSetDescriptor(const planner::Schema& schema);
ResultSetDescriptor(const ResultSetDescriptor& other);
~ResultSetDescriptor() = default;

inline uint32_t getNumDataChunks() const { return dataChunkDescriptors.size(); }

inline DataChunkDescriptor* getDataChunkDescriptor(uint32_t pos) const {
return dataChunkDescriptors[pos].get();
}
struct ResultSetDescriptor {
std::vector<std::unique_ptr<DataChunkDescriptor>> dataChunkDescriptors;

inline std::unique_ptr<ResultSetDescriptor> copy() const {
return std::make_unique<ResultSetDescriptor>(*this);
}
ResultSetDescriptor(std::vector<std::unique_ptr<DataChunkDescriptor>> dataChunkDescriptors)
: dataChunkDescriptors{std::move(dataChunkDescriptors)} {}
static std::unique_ptr<ResultSetDescriptor> getResultSetDescriptor(planner::Schema* schema);

private:
std::unordered_map<std::string, uint32_t> expressionNameToDataChunkPosMap;
std::vector<std::unique_ptr<DataChunkDescriptor>> dataChunkDescriptors;
std::unique_ptr<ResultSetDescriptor> copy() const;
};

} // namespace processor
Expand Down
1 change: 1 addition & 0 deletions src/processor/mapper/expression_mapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "expression_evaluator/function_evaluator.h"
#include "expression_evaluator/literal_evaluator.h"
#include "expression_evaluator/reference_evaluator.h"
#include "planner/logical_plan/logical_operator/schema.h"

using namespace kuzu::binder;
using namespace kuzu::common;
Expand Down
2 changes: 1 addition & 1 deletion src/processor/mapper/map_accumulate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalAccumulateToPhysical(
// append result collector
auto prevOperator = mapLogicalOperatorToPhysical(logicalAccumulate->getChild(0));
auto expressions = logicalAccumulate->getExpressions();
auto resultCollector = appendResultCollector(expressions, *inSchema, std::move(prevOperator));
auto resultCollector = appendResultCollector(expressions, inSchema, std::move(prevOperator));
// append factorized table scan
std::vector<DataPos> outDataPoses;
std::vector<uint32_t> colIndicesToScan;
Expand Down
24 changes: 12 additions & 12 deletions src/processor/mapper/map_aggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalAggregateToPhysical(
if (logicalAggregate.hasKeyExpressions()) {
return createHashAggregate(logicalAggregate.getKeyExpressions(),
logicalAggregate.getDependentKeyExpressions(), std::move(aggregateFunctions),
std::move(aggregateInputInfos), std::move(aggregatesOutputPos), *inSchema, *outSchema,
std::move(aggregateInputInfos), std::move(aggregatesOutputPos), inSchema, outSchema,
std::move(prevOperator), paramsString);
} else {
auto sharedState = make_shared<SimpleAggregateSharedState>(aggregateFunctions);
auto aggregate =
make_unique<SimpleAggregate>(std::make_unique<ResultSetDescriptor>(*inSchema),
make_unique<SimpleAggregate>(ResultSetDescriptor::getResultSetDescriptor(inSchema),
sharedState, std::move(aggregateFunctions), std::move(aggregateInputInfos),
std::move(prevOperator), getOperatorID(), paramsString);
return make_unique<SimpleAggregateScan>(
Expand All @@ -99,16 +99,16 @@ std::unique_ptr<PhysicalOperator> PlanMapper::createHashAggregate(
const binder::expression_vector& dependentKeyExpressions,
std::vector<std::unique_ptr<function::AggregateFunction>> aggregateFunctions,
std::vector<std::unique_ptr<AggregateInputInfo>> aggregateInputInfos,
std::vector<DataPos> aggregatesOutputPos, const planner::Schema& inSchema,
const planner::Schema& outSchema, std::unique_ptr<PhysicalOperator> prevOperator,
const std::string& paramsString) {
std::vector<DataPos> aggregatesOutputPos, planner::Schema* inSchema, planner::Schema* outSchema,
std::unique_ptr<PhysicalOperator> prevOperator, const std::string& paramsString) {
auto sharedState = make_shared<HashAggregateSharedState>(aggregateFunctions);
auto flatKeyExpressions = getKeyExpressions(keyExpressions, inSchema, true /* isFlat */);
auto unFlatKeyExpressions = getKeyExpressions(keyExpressions, inSchema, false /* isFlat */);
auto aggregate = make_unique<HashAggregate>(std::make_unique<ResultSetDescriptor>(inSchema),
sharedState, getExpressionsDataPos(flatKeyExpressions, inSchema),
getExpressionsDataPos(unFlatKeyExpressions, inSchema),
getExpressionsDataPos(dependentKeyExpressions, inSchema), std::move(aggregateFunctions),
auto flatKeyExpressions = getKeyExpressions(keyExpressions, *inSchema, true /* isFlat */);
auto unFlatKeyExpressions = getKeyExpressions(keyExpressions, *inSchema, false /* isFlat */);
auto aggregate = make_unique<HashAggregate>(
ResultSetDescriptor::getResultSetDescriptor(inSchema), sharedState,
getExpressionsDataPos(flatKeyExpressions, *inSchema),
getExpressionsDataPos(unFlatKeyExpressions, *inSchema),
getExpressionsDataPos(dependentKeyExpressions, *inSchema), std::move(aggregateFunctions),
std::move(aggregateInputInfos), std::move(prevOperator), getOperatorID(), paramsString);
binder::expression_vector outputExpressions;
outputExpressions.insert(
Expand All @@ -118,7 +118,7 @@ std::unique_ptr<PhysicalOperator> PlanMapper::createHashAggregate(
outputExpressions.insert(
outputExpressions.end(), dependentKeyExpressions.begin(), dependentKeyExpressions.end());
return std::make_unique<HashAggregateScan>(sharedState,
getExpressionsDataPos(outputExpressions, outSchema), std::move(aggregatesOutputPos),
getExpressionsDataPos(outputExpressions, *outSchema), std::move(aggregatesOutputPos),
std::move(aggregate), getOperatorID(), paramsString);
}

Expand Down
2 changes: 1 addition & 1 deletion src/processor/mapper/map_cross_product.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalCrossProductToPhysical(
auto buildSideSchema = logicalCrossProduct->getBuildSideSchema();
auto buildSidePrevOperator = mapLogicalOperatorToPhysical(logicalCrossProduct->getChild(1));
auto resultCollector = appendResultCollector(buildSideSchema->getExpressionsInScope(),
*buildSideSchema, std::move(buildSidePrevOperator));
buildSideSchema, std::move(buildSidePrevOperator));
// map probe side
auto probeSidePrevOperator = mapLogicalOperatorToPhysical(logicalCrossProduct->getChild(0));
std::vector<DataPos> outVecPos;
Expand Down
2 changes: 1 addition & 1 deletion src/processor/mapper/map_distinct.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalDistinctToPhysical(
std::vector<DataPos> emptyAggregatesOutputPos;
return createHashAggregate(logicalDistinct.getKeyExpressions(),
logicalDistinct.getDependentKeyExpressions(), std::move(emptyAggFunctions),
std::move(emptyAggInputInfos), std::move(emptyAggregatesOutputPos), *inSchema, *outSchema,
std::move(emptyAggInputInfos), std::move(emptyAggregatesOutputPos), inSchema, outSchema,
std::move(prevOperator), logicalDistinct.getExpressionsForPrinting());
}

Expand Down
2 changes: 1 addition & 1 deletion src/processor/mapper/map_extend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalRecursiveExtendToPhysica
auto relTableID = rel->getSingleTableID();

auto expressions = inSchema->getExpressionsInScope();
auto resultCollector = appendResultCollector(expressions, *inSchema, std::move(prevOperator));
auto resultCollector = appendResultCollector(expressions, inSchema, std::move(prevOperator));
auto sharedInputFTable = resultCollector->getSharedState();
std::vector<DataPos> outDataPoses;
std::vector<uint32_t> colIndicesToScan;
Expand Down
6 changes: 3 additions & 3 deletions src/processor/mapper/map_hash_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalHashJoinToPhysical(
}
auto sharedState = std::make_shared<HashJoinSharedState>();
// create hashJoin build
auto hashJoinBuild =
make_unique<HashJoinBuild>(std::make_unique<ResultSetDescriptor>(*buildSchema), sharedState,
buildDataInfo, std::move(buildSidePrevOperator), getOperatorID(), paramsString);
auto hashJoinBuild = make_unique<HashJoinBuild>(
ResultSetDescriptor::getResultSetDescriptor(buildSchema), sharedState, buildDataInfo,
std::move(buildSidePrevOperator), getOperatorID(), paramsString);
// create hashJoin probe
ProbeDataInfo probeDataInfo(probeKeysDataPos, probePayloadsOutPos);
if (hashJoin->getJoinType() == common::JoinType::MARK) {
Expand Down
2 changes: 1 addition & 1 deletion src/processor/mapper/map_intersect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalIntersectToPhysical(
auto sharedState = std::make_shared<IntersectSharedState>();
sharedStates.push_back(sharedState);
children[i] = make_unique<IntersectBuild>(
std::make_unique<ResultSetDescriptor>(*buildSchema), sharedState, buildDataInfo,
ResultSetDescriptor::getResultSetDescriptor(buildSchema), sharedState, buildDataInfo,
std::move(buildSidePrevOperator), getOperatorID(), keyNodeID->toString());
IntersectDataInfo info{DataPos(outSchema->getExpressionPos(*keyNodeID)), payloadsDataPos};
intersectDataInfos.push_back(info);
Expand Down
2 changes: 1 addition & 1 deletion src/processor/mapper/map_order_by.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalOrderByToPhysical(
auto orderBySharedState = std::make_shared<SharedFactorizedTablesAndSortedKeyBlocks>();

auto orderBy =
make_unique<OrderBy>(std::make_unique<ResultSetDescriptor>(*inSchema), orderByDataInfo,
make_unique<OrderBy>(ResultSetDescriptor::getResultSetDescriptor(inSchema), orderByDataInfo,
orderBySharedState, std::move(prevOperator), getOperatorID(), paramsString);
auto dispatcher = std::make_shared<KeyBlockMergeTaskDispatcher>();
auto orderByMerge = make_unique<OrderByMerge>(orderBySharedState, std::move(dispatcher),
Expand Down
2 changes: 1 addition & 1 deletion src/processor/mapper/map_union.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalUnionAllToPhysical(
auto childSchema = logicalUnionAll.getSchemaBeforeUnion(i);
auto prevOperator = mapLogicalOperatorToPhysical(child);
auto resultCollector = appendResultCollector(
childSchema->getExpressionsInScope(), *childSchema, std::move(prevOperator));
childSchema->getExpressionsInScope(), childSchema, std::move(prevOperator));
resultCollectorSharedStates.push_back(resultCollector->getSharedState());
prevOperators.push_back(std::move(resultCollector));
}
Expand Down
10 changes: 5 additions & 5 deletions src/processor/mapper/plan_mapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ std::unique_ptr<PhysicalPlan> PlanMapper::mapLogicalPlanToPhysical(LogicalPlan*
auto lastOperator = mapLogicalOperatorToPhysical(logicalPlan->getLastOperator());
if (!StatementTypeUtils::isCopyCSV(statementType)) {
lastOperator = appendResultCollector(
expressionsToCollect, *logicalPlan->getSchema(), std::move(lastOperator));
expressionsToCollect, logicalPlan->getSchema(), std::move(lastOperator));
}
return make_unique<PhysicalPlan>(std::move(lastOperator));
}
Expand Down Expand Up @@ -145,19 +145,19 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalOperatorToPhysical(
}

std::unique_ptr<ResultCollector> PlanMapper::appendResultCollector(
const binder::expression_vector& expressionsToCollect, const Schema& schema,
const binder::expression_vector& expressionsToCollect, Schema* schema,
std::unique_ptr<PhysicalOperator> prevOperator) {
std::vector<std::pair<DataPos, LogicalType>> payloadsPosAndType;
std::vector<bool> isPayloadFlat;
for (auto& expression : expressionsToCollect) {
auto expressionName = expression->getUniqueName();
auto dataPos = DataPos(schema.getExpressionPos(*expression));
auto isFlat = schema.getGroup(expressionName)->isFlat();
auto dataPos = DataPos(schema->getExpressionPos(*expression));
auto isFlat = schema->getGroup(expressionName)->isFlat();
payloadsPosAndType.emplace_back(dataPos, expression->dataType);
isPayloadFlat.push_back(isFlat);
}
auto sharedState = std::make_shared<FTableSharedState>();
return make_unique<ResultCollector>(std::make_unique<ResultSetDescriptor>(schema),
return make_unique<ResultCollector>(ResultSetDescriptor::getResultSetDescriptor(schema),
payloadsPosAndType, isPayloadFlat, sharedState, std::move(prevOperator), getOperatorID(),
binder::ExpressionUtil::toString(expressionsToCollect));
}
Expand Down
19 changes: 1 addition & 18 deletions src/processor/processor_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,7 @@ std::unique_ptr<ResultSet> ProcessorTask::populateResultSet(
// Some pipeline does not need a resultSet, e.g. OrderByMerge
return nullptr;
}
auto numDataChunks = resultSetDescriptor->getNumDataChunks();
auto resultSet = std::make_unique<ResultSet>(numDataChunks);
for (auto i = 0u; i < numDataChunks; ++i) {
auto dataChunkDescriptor = resultSetDescriptor->getDataChunkDescriptor(i);
auto numValueVectors = dataChunkDescriptor->getNumValueVectors();
auto dataChunk = std::make_unique<common::DataChunk>(numValueVectors);
if (dataChunkDescriptor->isSingleState()) {
dataChunk->state = common::DataChunkState::getSingleValueDataChunkState();
}
for (auto j = 0u; j < dataChunkDescriptor->getNumValueVectors(); ++j) {
auto expression = dataChunkDescriptor->getExpression(j);
auto vector =
std::make_shared<common::ValueVector>(expression->dataType, memoryManager);
dataChunk->insert(j, std::move(vector));
}
resultSet->insert(i, std::move(dataChunk));
}
return resultSet;
return ResultSet::getResultSet(resultSetDescriptor, memoryManager);
}

} // namespace processor
Expand Down
21 changes: 21 additions & 0 deletions src/processor/result/result_set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,27 @@
namespace kuzu {
namespace processor {

std::unique_ptr<ResultSet> ResultSet::getResultSet(
ResultSetDescriptor* resultSetDescriptor, storage::MemoryManager* memoryManager) {
auto numDataChunks = resultSetDescriptor->dataChunkDescriptors.size();
auto resultSet = std::make_unique<ResultSet>(numDataChunks);
for (auto i = 0u; i < numDataChunks; ++i) {
auto dataChunkDescriptor = resultSetDescriptor->dataChunkDescriptors[i].get();
auto numValueVectors = dataChunkDescriptor->logicalTypes.size();
auto dataChunk = std::make_unique<common::DataChunk>(numValueVectors);
if (dataChunkDescriptor->isSingleState) {
dataChunk->state = common::DataChunkState::getSingleValueDataChunkState();
}
for (auto j = 0u; j < numValueVectors; ++j) {
auto vector = std::make_shared<common::ValueVector>(
dataChunkDescriptor->logicalTypes[j], memoryManager);
dataChunk->insert(j, std::move(vector));
}
resultSet->insert(i, std::move(dataChunk));
}
return resultSet;
}

uint64_t ResultSet::getNumTuplesWithoutMultiplicity(
const std::unordered_set<uint32_t>& dataChunksPosInScope) {
assert(!dataChunksPosInScope.empty());
Expand Down
Loading

0 comments on commit 977e464

Please sign in to comment.