Skip to content

Commit

Permalink
Cherry pick v3.1.0 (0412-0413) (#4145)
Browse files Browse the repository at this point in the history
* Add SetUsageMessage For daemons (#4134)

* fix the wrong regex pattern of scientific notation (#4136)

* fix updatePart (#4139)

* fix updatePart

* fix format check

Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>

* show DATA_BALANCE in job list (#4138)

Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>

* fix disk fault recovery (#4131)

* fix disk fault recovery

* add ut

Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>

* Refactor findpath (#4095)

* optimize bfs

* optimizer allpath

* optimizer multi-shortestpath

* optimizer multi shortest path

* fix validate unit test

* add some comment

* fix error

* fix bfs error

* add comment

* delete conjunct

* add findpath unit test

* delete useless file

* delete log

* remove check

* multi thread executor

* single shortest multi thread

* add some testcases

* add gflags

* fix bfs error

* address comment

* Just report errorof property pruner in debug mode (#4142)

Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>

* remove Atomic Edge (#4127)

* try to remove Atomic Edge

* remove some space

* remove zone

Co-authored-by: Doodle <13706157+critical27@users.noreply.github.com>

* fix show tag/edge index status (#4148)

* fix multiple match (#4143)

Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>

* Refactor the process of symbols in optimizer. (#4146)

* Refactor the process of symbols in optimizer.

* Fix variable shadowing.

* Collect boundary from MatchResult directly.

* Check variable in TransformResult don't referenced by outside plan
nodes.

Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>

Co-authored-by: Alex Xing <90179377+SuperYoko@users.noreply.github.com>
Co-authored-by: jie.wang <38901892+jievince@users.noreply.github.com>
Co-authored-by: Doodle <13706157+critical27@users.noreply.github.com>
Co-authored-by: liwenhui-soul <38217397+liwenhui-soul@users.noreply.github.com>
Co-authored-by: panda-sheep <59197347+panda-sheep@users.noreply.github.com>
Co-authored-by: jimingquan <mingquan.ji@vesoft.com>
Co-authored-by: lionel.liu@vesoft.com <52276794+liuyu85cn@users.noreply.github.com>
Co-authored-by: kyle.cao <kyle.cao@vesoft.com>
Co-authored-by: shylock <33566796+Shylock-Hg@users.noreply.github.com>
  • Loading branch information
10 people authored Apr 13, 2022
1 parent 50159ab commit 85b77a5
Show file tree
Hide file tree
Showing 85 changed files with 2,914 additions and 4,144 deletions.
1 change: 1 addition & 0 deletions src/daemons/GraphDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ DECLARE_bool(containerized);

int main(int argc, char *argv[]) {
google::SetVersionString(nebula::versionString());
google::SetUsageMessage("Usage: " + std::string(argv[0]) + " [options]");
if (argc == 1) {
printHelp(argv[0]);
return EXIT_FAILURE;
Expand Down
1 change: 1 addition & 0 deletions src/daemons/MetaDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ extern Status setupBreakpad();

int main(int argc, char* argv[]) {
google::SetVersionString(nebula::versionString());
google::SetUsageMessage("Usage: " + std::string(argv[0]) + " [options]");
// Detect if the server has already been started
// Check pid before glog init, in case of user may start daemon twice
// the 2nd will make the 1st failed to output log anymore
Expand Down
1 change: 1 addition & 0 deletions src/daemons/StandAloneDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ DEFINE_int32(meta_port, 45500, "Meta daemon listening port");

int main(int argc, char *argv[]) {
google::SetVersionString(nebula::versionString());
google::SetUsageMessage("Usage: " + std::string(argv[0]) + " [options]");
gflags::ParseCommandLineFlags(&argc, &argv, false);

if (argc == 1) {
Expand Down
1 change: 1 addition & 0 deletions src/daemons/StorageDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ extern Status setupBreakpad();

int main(int argc, char *argv[]) {
google::SetVersionString(nebula::versionString());
google::SetUsageMessage("Usage: " + std::string(argv[0]) + " [options]");
// Detect if the server has already been started
// Check pid before glog init, in case of user may start daemon twice
// the 2nd will make the 1st failed to output log anymore
Expand Down
1 change: 1 addition & 0 deletions src/graph/context/Symbols.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ SymbolTable::SymbolTable(ObjectPool* objPool) {

Variable* SymbolTable::newVariable(std::string name) {
VLOG(1) << "New variable for: " << name;
DCHECK(vars_.find(name) == vars_.end());
auto* variable = objPool_->makeAndAdd<Variable>(name);
addVar(std::move(name), variable);
return variable;
Expand Down
3 changes: 1 addition & 2 deletions src/graph/executor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ nebula_add_library(
query/TraverseExecutor.cpp
query/AppendVerticesExecutor.cpp
query/RollUpApplyExecutor.cpp
algo/ConjunctPathExecutor.cpp
algo/BFSShortestPathExecutor.cpp
algo/ProduceSemiShortestPathExecutor.cpp
algo/MultiShortestPathExecutor.cpp
algo/ProduceAllPathsExecutor.cpp
algo/CartesianProductExecutor.cpp
algo/SubgraphExecutor.cpp
Expand Down
10 changes: 3 additions & 7 deletions src/graph/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,8 @@
#include "graph/executor/admin/ZoneExecutor.h"
#include "graph/executor/algo/BFSShortestPathExecutor.h"
#include "graph/executor/algo/CartesianProductExecutor.h"
#include "graph/executor/algo/ConjunctPathExecutor.h"
#include "graph/executor/algo/MultiShortestPathExecutor.h"
#include "graph/executor/algo/ProduceAllPathsExecutor.h"
#include "graph/executor/algo/ProduceSemiShortestPathExecutor.h"
#include "graph/executor/algo/SubgraphExecutor.h"
#include "graph/executor/logic/ArgumentExecutor.h"
#include "graph/executor/logic/LoopExecutor.h"
Expand Down Expand Up @@ -447,11 +446,8 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) {
case PlanNode::Kind::kBFSShortest: {
return pool->add(new BFSShortestPathExecutor(node, qctx));
}
case PlanNode::Kind::kProduceSemiShortestPath: {
return pool->add(new ProduceSemiShortestPathExecutor(node, qctx));
}
case PlanNode::Kind::kConjunctPath: {
return pool->add(new ConjunctPathExecutor(node, qctx));
case PlanNode::Kind::kMultiShortestPath: {
return pool->add(new MultiShortestPathExecutor(node, qctx));
}
case PlanNode::Kind::kProduceAllPaths: {
return pool->add(new ProduceAllPathsExecutor(node, qctx));
Expand Down
7 changes: 0 additions & 7 deletions src/graph/executor/admin/SpaceExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ folly::Future<Status> DescSpaceExecutor::execute() {
"Charset",
"Collate",
"Vid Type",
"Atomic Edge",
"Comment"};
Row row;
row.values.emplace_back(spaceId);
Expand All @@ -86,12 +85,6 @@ folly::Future<Status> DescSpaceExecutor::execute() {
row.values.emplace_back(properties.get_charset_name());
row.values.emplace_back(properties.get_collate_name());
row.values.emplace_back(SchemaUtil::typeToString(properties.get_vid_type()));
bool sAtomicEdge{false};
if (properties.isolation_level_ref().has_value() &&
(*properties.isolation_level_ref() == meta::cpp2::IsolationLevel::TOSS)) {
sAtomicEdge = true;
}
row.values.emplace_back(sAtomicEdge);

if (properties.comment_ref().has_value()) {
row.values.emplace_back(*properties.comment_ref());
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/admin/SubmitJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData(
uint32_t total = paras.size() - index - 1, succeeded = 0, failed = 0, inProgress = 0,
invalid = 0;
v.emplace_back(Row({jd.get_job_id(),
apache::thrift::util::enumNameSafe(jd.get_type()),
apache::thrift::util::enumNameSafe(meta::cpp2::JobType::DATA_BALANCE),
apache::thrift::util::enumNameSafe(jd.get_status()),
convertJobTimestampToDateTime(jd.get_start_time()).toString(),
convertJobTimestampToDateTime(jd.get_stop_time()).toString(),
Expand Down
239 changes: 202 additions & 37 deletions src/graph/executor/algo/BFSShortestPathExecutor.cpp
Original file line number Diff line number Diff line change
@@ -1,52 +1,217 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
// Copyright (c) 2022 vesoft inc. All rights reserved.
//
// This source code is licensed under Apache 2.0 License.

#include "graph/executor/algo/BFSShortestPathExecutor.h"

#include "graph/planner/plan/Algo.h"

DECLARE_int32(num_operator_threads);
namespace nebula {
namespace graph {
folly::Future<Status> BFSShortestPathExecutor::execute() {
SCOPED_TIMER(&execTime_);
auto* bfs = asNode<BFSShortestPath>(node());
auto iter = ectx_->getResult(bfs->inputVar()).iter();
VLOG(1) << "current: " << node()->outputVar();
VLOG(1) << "input: " << bfs->inputVar();
pathNode_ = asNode<BFSShortestPath>(node());

if (step_ == 1) {
allRightEdges_.emplace_back();
auto& currentEdges = allRightEdges_.back();
auto rIter = ectx_->getResult(pathNode_->rightVidVar()).iter();
std::unordered_set<Value> rightVids;
for (; rIter->valid(); rIter->next()) {
auto& vid = rIter->getColumn(0);
if (rightVids.emplace(vid).second) {
Edge dummy;
currentEdges.emplace(vid, std::move(dummy));
}
}
}

std::vector<folly::Future<Status>> futures;
auto leftFuture = folly::via(runner(), [this]() { return buildPath(false); });
auto rightFuture = folly::via(runner(), [this]() { return buildPath(true); });
futures.emplace_back(std::move(leftFuture));
futures.emplace_back(std::move(rightFuture));

return folly::collect(futures)
.via(runner())
.thenValue([this](auto&& status) {
UNUSED(status);
return conjunctPath();
})
.thenValue([this](auto&& status) {
UNUSED(status);
step_++;
DataSet ds;
ds.colNames = pathNode_->colNames();
ds.rows.swap(currentDs_.rows);
return finish(ResultBuilder().value(Value(std::move(ds))).build());
});
}

Status BFSShortestPathExecutor::buildPath(bool reverse) {
auto iter = reverse ? ectx_->getResult(pathNode_->rightInputVar()).iter()
: ectx_->getResult(pathNode_->leftInputVar()).iter();
DCHECK(!!iter);
auto& visitedVids = reverse ? rightVisitedVids_ : leftVisitedVids_;
auto& allEdges = reverse ? allRightEdges_ : allLeftEdges_;
allEdges.emplace_back();
auto& currentEdges = allEdges.back();

auto iterSize = iter->size();
visitedVids.reserve(visitedVids.size() + iterSize);

std::unordered_set<Value> uniqueDst;
uniqueDst.reserve(iterSize);
DataSet nextStepVids;
nextStepVids.colNames = {nebula::kVid};
if (step_ == 1) {
for (; iter->valid(); iter->next()) {
auto edgeVal = iter->getEdge();
if (UNLIKELY(!edgeVal.isEdge())) {
continue;
}
auto& edge = edgeVal.getEdge();
auto dst = edge.dst;
visitedVids.emplace(edge.src);
if (uniqueDst.emplace(dst).second) {
nextStepVids.rows.emplace_back(Row({dst}));
}
currentEdges.emplace(std::move(dst), std::move(edge));
}
} else {
for (; iter->valid(); iter->next()) {
auto edgeVal = iter->getEdge();
if (UNLIKELY(!edgeVal.isEdge())) {
continue;
}
auto& edge = edgeVal.getEdge();
auto dst = edge.dst;
if (visitedVids.find(dst) != visitedVids.end()) {
continue;
}
if (uniqueDst.emplace(dst).second) {
nextStepVids.rows.emplace_back(Row({dst}));
}
currentEdges.emplace(std::move(dst), std::move(edge));
}
}
// set nextVid
const auto& nextVidVar = reverse ? pathNode_->rightVidVar() : pathNode_->leftVidVar();
ectx_->setResult(nextVidVar, ResultBuilder().value(std::move(nextStepVids)).build());

visitedVids.insert(std::make_move_iterator(uniqueDst.begin()),
std::make_move_iterator(uniqueDst.end()));
return Status::OK();
}

folly::Future<Status> BFSShortestPathExecutor::conjunctPath() {
const auto& leftEdges = allLeftEdges_.back();
const auto& preRightEdges = allRightEdges_[step_ - 1];
std::unordered_set<Value> meetVids;
bool oddStep = true;
for (const auto& edge : leftEdges) {
if (preRightEdges.find(edge.first) != preRightEdges.end()) {
meetVids.emplace(edge.first);
}
}
if (meetVids.empty() && step_ * 2 <= pathNode_->steps()) {
const auto& rightEdges = allRightEdges_.back();
for (const auto& edge : leftEdges) {
if (rightEdges.find(edge.first) != rightEdges.end()) {
meetVids.emplace(edge.first);
oddStep = false;
}
}
}
if (meetVids.empty()) {
return Status::OK();
}
size_t i = 0;
size_t totalSize = meetVids.size();
size_t batchSize = totalSize / static_cast<size_t>(FLAGS_num_operator_threads);
std::vector<Value> batchVids;
batchVids.reserve(batchSize);
std::vector<folly::Future<DataSet>> futures;
for (auto& vid : meetVids) {
batchVids.push_back(vid);
if (i == totalSize - 1 || batchVids.size() == batchSize) {
auto future = folly::via(runner(), [this, vids = std::move(batchVids), oddStep]() {
return doConjunct(vids, oddStep);
});
futures.emplace_back(std::move(future));
}
i++;
}

return folly::collect(futures).via(runner()).thenValue([this](auto&& resps) {
for (auto& resp : resps) {
currentDs_.append(std::move(resp));
}
return Status::OK();
});
}

DataSet BFSShortestPathExecutor::doConjunct(const std::vector<Value>& meetVids,
bool oddStep) const {
DataSet ds;
ds.colNames = node()->colNames();
std::unordered_multimap<Value, Value> interim;

for (; iter->valid(); iter->next()) {
auto edgeVal = iter->getEdge();
if (!edgeVal.isEdge()) {
continue;
}
auto& edge = edgeVal.getEdge();
auto visited = visited_.find(edge.dst) != visited_.end();
if (visited) {
continue;
}

// save the starts.
visited_.emplace(edge.src);
VLOG(1) << "dst: " << edge.dst << " edge: " << edge;
interim.emplace(edge.dst, std::move(edgeVal));
}
for (auto& kv : interim) {
auto dst = std::move(kv.first);
auto edge = std::move(kv.second);
Row row;
row.values.emplace_back(dst);
row.values.emplace_back(std::move(edge));
ds.rows.emplace_back(std::move(row));
visited_.emplace(dst);
}
return finish(ResultBuilder().value(Value(std::move(ds))).build());
auto leftPaths = createPath(meetVids, false, oddStep);
auto rightPaths = createPath(meetVids, true, oddStep);
for (auto& leftPath : leftPaths) {
auto range = rightPaths.equal_range(leftPath.first);
for (auto& rightPath = range.first; rightPath != range.second; ++rightPath) {
Path result = leftPath.second;
result.reverse();
result.append(rightPath->second);
Row row;
row.emplace_back(std::move(result));
ds.rows.emplace_back(std::move(row));
}
}
return ds;
}

std::unordered_multimap<Value, Path> BFSShortestPathExecutor::createPath(
std::vector<Value> meetVids, bool reverse, bool oddStep) const {
std::unordered_multimap<Value, Path> result;
auto& allEdges = reverse ? allRightEdges_ : allLeftEdges_;
for (auto& meetVid : meetVids) {
Path start;
start.src = Vertex(meetVid, {});
std::vector<Path> interimPaths = {std::move(start)};
auto iter = (reverse && oddStep) ? allEdges.rbegin() + 1 : allEdges.rbegin();
auto end = reverse ? allEdges.rend() - 1 : allEdges.rend();
if (iter == end) {
result.emplace(meetVid, std::move(start));
return result;
}
for (; iter != end; ++iter) {
std::vector<Path> temp;
for (auto& interimPath : interimPaths) {
Value id;
if (interimPath.steps.empty()) {
id = interimPath.src.vid;
} else {
id = interimPath.steps.back().dst.vid;
}
auto range = iter->equal_range(id);
for (auto edgeIter = range.first; edgeIter != range.second; ++edgeIter) {
Path p = interimPath;
auto& edge = edgeIter->second;
p.steps.emplace_back(Step(Vertex(edge.src, {}), -edge.type, edge.name, edge.ranking, {}));

if (iter == end - 1) {
result.emplace(p.src.vid, std::move(p));
} else {
temp.emplace_back(std::move(p));
}
} // edgeIter
} // interimPath
interimPaths = std::move(temp);
}
}
return result;
}

} // namespace graph
} // namespace nebula
Loading

0 comments on commit 85b77a5

Please sign in to comment.