diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml index 28e3c1a039f..f2081951a4d 100644 --- a/.github/workflows/nightly.yml +++ b/.github/workflows/nightly.yml @@ -82,6 +82,7 @@ jobs: tags: | vesoft/nebula-graphd:nightly target: graphd + cache-to: type=local,dest=/tmp/buildx-cache,mode=max push: true - uses: docker/build-push-action@v2 with: @@ -91,6 +92,7 @@ jobs: tags: | vesoft/nebula-storaged:nightly target: storaged + cache-from: type=local,src=/tmp/buildx-cache push: true - uses: docker/build-push-action@v2 with: @@ -100,6 +102,7 @@ jobs: tags: | vesoft/nebula-metad:nightly target: metad + cache-from: type=local,src=/tmp/buildx-cache push: true - uses: docker/build-push-action@v2 with: @@ -109,7 +112,11 @@ jobs: tags: | vesoft/nebula-tools:nightly target: tools + cache-from: type=local,src=/tmp/buildx-cache push: true + - name: delete the cache + run: | + rm -rf /tmp/buildx-cache coverage: name: coverage diff --git a/.github/workflows/rc.yml b/.github/workflows/rc.yml index 48014a99182..01a82d273ea 100644 --- a/.github/workflows/rc.yml +++ b/.github/workflows/rc.yml @@ -117,6 +117,7 @@ jobs: ${{ secrets.HARBOR_REGISTRY }}/vesoft/nebula-graphd:${{ steps.tagname.outputs.majorver }} ${{ steps.docker.outputs.graphdTag }} target: graphd + cache-to: type=local,dest=/tmp/buildx-cache,mode=max push: true build-args: | BRANCH=${{ steps.tagname.outputs.tag }} @@ -128,11 +129,12 @@ jobs: file: ./docker/Dockerfile platforms: linux/amd64,linux/arm64 tags: | - ${{ secrets.HARBOR_REGISTRY }}/vesoft/nebula-storaged${{ steps.tagname.outputs.tag }} + ${{ secrets.HARBOR_REGISTRY }}/vesoft/nebula-storaged:${{ steps.tagname.outputs.tag }} ${{ secrets.HARBOR_REGISTRY }}/vesoft/nebula-storaged:${{ steps.tagname.outputs.majorver }} ${{ steps.docker.outputs.storagedTag }} target: storaged push: true + cache-from: type=local,src=/tmp/buildx-cache build-args: | BRANCH=${{ steps.tagname.outputs.tag }} VERSION=${{ steps.tagname.outputs.tagnum }} @@ -143,11 +145,12 @@ jobs: file: ./docker/Dockerfile platforms: linux/amd64,linux/arm64 tags: | - ${{ secrets.HARBOR_REGISTRY }}/vesoft/nebula-metad${{ steps.tagname.outputs.tag }} + ${{ secrets.HARBOR_REGISTRY }}/vesoft/nebula-metad:${{ steps.tagname.outputs.tag }} ${{ secrets.HARBOR_REGISTRY }}/vesoft/nebula-metad:${{ steps.tagname.outputs.majorver }} ${{ steps.docker.outputs.metadTag }} target: metad push: true + cache-from: type=local,src=/tmp/buildx-cache build-args: | BRANCH=${{ steps.tagname.outputs.tag }} VERSION=${{ steps.tagname.outputs.tagnum }} @@ -158,14 +161,18 @@ jobs: file: ./docker/Dockerfile platforms: linux/amd64,linux/arm64 tags: | - ${{ secrets.HARBOR_REGISTRY }}/vesoft/nebula-tools${{ steps.tagname.outputs.tag }} + ${{ secrets.HARBOR_REGISTRY }}/vesoft/nebula-tools:${{ steps.tagname.outputs.tag }} ${{ secrets.HARBOR_REGISTRY }}/vesoft/nebula-tools:${{ steps.tagname.outputs.majorver }} ${{ steps.docker.outputs.toolsTag }} target: tools push: true + cache-from: type=local,src=/tmp/buildx-cache build-args: | BRANCH=${{ steps.tagname.outputs.tag }} VERSION=${{ steps.tagname.outputs.tagnum }} + - name: delete the cache + run: | + rm -rf /tmp/buildx-cache test: name: test needs: package diff --git a/src/common/base/ObjectPool.h b/src/common/base/ObjectPool.h index 081cfcb3db5..acd20d73986 100644 --- a/src/common/base/ObjectPool.h +++ b/src/common/base/ObjectPool.h @@ -38,7 +38,9 @@ class ObjectPool final : private boost::noncopyable, private cpp::NonMovable { template T *makeAndAdd(Args &&... args) { + lock_.lock(); void *ptr = arena_.allocateAligned(sizeof(T)); + lock_.unlock(); return add(new (ptr) T(std::forward(args)...)); } diff --git a/src/daemons/MetaDaemon.cpp b/src/daemons/MetaDaemon.cpp index 5664bf2fac9..4c21c118a14 100644 --- a/src/daemons/MetaDaemon.cpp +++ b/src/daemons/MetaDaemon.cpp @@ -176,11 +176,16 @@ int main(int argc, char* argv[]) { } if (nebula::value(ret) == localhost) { LOG(INFO) << "Check and init root user"; - if (!nebula::meta::RootUserMan::isUserExists(gKVStore.get())) { - if (!nebula::meta::RootUserMan::initRootUser(gKVStore.get())) { - LOG(ERROR) << "Init root user failed"; - return EXIT_FAILURE; - } + auto checkRet = nebula::meta::RootUserMan::isGodExists(gKVStore.get()); + if (!nebula::ok(checkRet)) { + auto retCode = nebula::error(checkRet); + LOG(ERROR) << "Parser God Role error:" << apache::thrift::util::enumNameSafe(retCode); + return EXIT_FAILURE; + } + auto existGod = nebula::value(checkRet); + if (!existGod && !nebula::meta::RootUserMan::initRootUser(gKVStore.get())) { + LOG(ERROR) << "Init root user failed"; + return EXIT_FAILURE; } } } diff --git a/src/daemons/StandAloneDaemon.cpp b/src/daemons/StandAloneDaemon.cpp index 4b17749856b..5b519f726f5 100644 --- a/src/daemons/StandAloneDaemon.cpp +++ b/src/daemons/StandAloneDaemon.cpp @@ -234,11 +234,16 @@ int main(int argc, char *argv[]) { } if (nebula::value(ret) == metaLocalhost) { LOG(INFO) << "Check and init root user"; - if (!nebula::meta::RootUserMan::isUserExists(gMetaKVStore.get())) { - if (!nebula::meta::RootUserMan::initRootUser(gMetaKVStore.get())) { - LOG(ERROR) << "Init root user failed"; - return; - } + auto checkRet = nebula::meta::RootUserMan::isGodExists(gMetaKVStore.get()); + if (!nebula::ok(checkRet)) { + auto retCode = nebula::error(checkRet); + LOG(ERROR) << "Parser God Role error:" << apache::thrift::util::enumNameSafe(retCode); + return; + } + auto existGod = nebula::value(checkRet); + if (!existGod && !nebula::meta::RootUserMan::initRootUser(gMetaKVStore.get())) { + LOG(ERROR) << "Init root user failed"; + return; } } } diff --git a/src/graph/executor/algo/BatchShortestPath.cpp b/src/graph/executor/algo/BatchShortestPath.cpp index cf7c8591ac0..f01e0af2a6e 100644 --- a/src/graph/executor/algo/BatchShortestPath.cpp +++ b/src/graph/executor/algo/BatchShortestPath.cpp @@ -135,7 +135,7 @@ folly::Future BatchShortestPath::getNeighbors(size_t rowNum, size_t step nullptr) .via(qctx_->rctx()->runner()) .thenValue([this, rowNum, reverse, stepNum, getNbrTime](auto&& resp) { - addStats(resp, stats_, stepNum, getNbrTime.elapsedInUSec(), reverse); + addStats(resp, stepNum, getNbrTime.elapsedInUSec(), reverse); return buildPath(rowNum, std::move(resp), reverse); }); } diff --git a/src/graph/executor/algo/ShortestPathBase.cpp b/src/graph/executor/algo/ShortestPathBase.cpp index 876e52d352b..ea4b7645eff 100644 --- a/src/graph/executor/algo/ShortestPathBase.cpp +++ b/src/graph/executor/algo/ShortestPathBase.cpp @@ -38,7 +38,7 @@ folly::Future> ShortestPathBase::getMeetVidsProps( nullptr) .via(qctx_->rctx()->runner()) .thenValue([this, getPropsTime](PropRpcResponse&& resp) { - addStats(resp, stats_, getPropsTime.elapsedInUSec()); + addStats(resp, getPropsTime.elapsedInUSec()); return handlePropResp(std::move(resp)); }); } @@ -168,7 +168,6 @@ Status ShortestPathBase::handleErrorCode(nebula::cpp2::ErrorCode code, Partition } void ShortestPathBase::addStats(RpcResponse& resp, - std::unordered_map* stats, size_t stepNum, int64_t timeInUSec, bool reverse) const { @@ -193,15 +192,17 @@ void ShortestPathBase::addStats(RpcResponse& resp, } ss << "\n}"; if (reverse) { - stats->emplace(folly::sformat("reverse step {}", stepNum), ss.str()); + statsLock_.lock(); + stats_->emplace(folly::sformat("reverse step {}", stepNum), ss.str()); + statsLock_.unlock(); } else { - stats->emplace(folly::sformat("step {}", stepNum), ss.str()); + statsLock_.lock(); + stats_->emplace(folly::sformat("step {}", stepNum), ss.str()); + statsLock_.unlock(); } } -void ShortestPathBase::addStats(PropRpcResponse& resp, - std::unordered_map* stats, - int64_t timeInUSec) const { +void ShortestPathBase::addStats(PropRpcResponse& resp, int64_t timeInUSec) const { auto& hostLatency = resp.hostLatency(); std::stringstream ss; ss << "{\n"; @@ -217,7 +218,7 @@ void ShortestPathBase::addStats(PropRpcResponse& resp, ss << "\n}"; } ss << "\n}"; - stats->emplace(folly::sformat("get_prop "), ss.str()); + stats_->emplace(folly::sformat("get_prop "), ss.str()); } } // namespace graph diff --git a/src/graph/executor/algo/ShortestPathBase.h b/src/graph/executor/algo/ShortestPathBase.h index 7f80336260f..bc7ece02df2 100644 --- a/src/graph/executor/algo/ShortestPathBase.h +++ b/src/graph/executor/algo/ShortestPathBase.h @@ -43,15 +43,9 @@ class ShortestPathBase { Status handleErrorCode(nebula::cpp2::ErrorCode code, PartitionID partId) const; - void addStats(RpcResponse& resp, - std::unordered_map* stats, - size_t stepNum, - int64_t timeInUSec, - bool reverse) const; + void addStats(RpcResponse& resp, size_t stepNum, int64_t timeInUSec, bool reverse) const; - void addStats(PropRpcResponse& resp, - std::unordered_map* stats, - int64_t timeInUSec) const; + void addStats(PropRpcResponse& resp, int64_t timeInUSec) const; template StatusOr handleCompleteness(const storage::StorageRpcResponse& rpcResp, @@ -86,6 +80,7 @@ class ShortestPathBase { std::unordered_map* stats_{nullptr}; size_t maxStep_; bool singleShortest_{true}; + folly::SpinLock statsLock_; std::vector resultDs_; std::vector leftVids_; diff --git a/src/graph/executor/algo/SingleShortestPath.cpp b/src/graph/executor/algo/SingleShortestPath.cpp index eb862e7967a..379a4a358ba 100644 --- a/src/graph/executor/algo/SingleShortestPath.cpp +++ b/src/graph/executor/algo/SingleShortestPath.cpp @@ -108,7 +108,7 @@ folly::Future SingleShortestPath::getNeighbors(size_t rowNum, nullptr) .via(qctx_->rctx()->runner()) .thenValue([this, rowNum, stepNum, getNbrTime, reverse](auto&& resp) { - addStats(resp, stats_, stepNum, getNbrTime.elapsedInUSec(), reverse); + addStats(resp, stepNum, getNbrTime.elapsedInUSec(), reverse); return buildPath(rowNum, std::move(resp), reverse); }); } @@ -264,6 +264,9 @@ folly::Future SingleShortestPath::buildEvenPath(size_t rowNum, return false; } for (auto& meetVertex : vertices) { + if (!meetVertex.isVertex()) { + continue; + } auto meetVid = meetVertex.getVertex().vid; auto leftPaths = createLeftPath(rowNum, meetVid); auto rightPaths = createRightPath(rowNum, meetVid, false); diff --git a/src/graph/optimizer/rule/CollapseProjectRule.cpp b/src/graph/optimizer/rule/CollapseProjectRule.cpp index 726ef22c6ff..aa72ba991eb 100644 --- a/src/graph/optimizer/rule/CollapseProjectRule.cpp +++ b/src/graph/optimizer/rule/CollapseProjectRule.cpp @@ -75,6 +75,7 @@ StatusOr CollapseProjectRule::transform( // 2. find link according to propRefNames and colNames in ProjBelow std::unordered_map rewriteMap; auto colNames = projBelow->colNames(); + DCHECK_EQ(colNames.size(), colsBelow.size()); for (size_t i = 0; i < colNames.size(); ++i) { if (uniquePropRefNames.count(colNames[i])) { auto colExpr = colsBelow[i]->expr(); diff --git a/src/graph/optimizer/rule/GetEdgesTransformAppendVerticesLimitRule.cpp b/src/graph/optimizer/rule/GetEdgesTransformAppendVerticesLimitRule.cpp index 22cad3311ec..f12f346b673 100644 --- a/src/graph/optimizer/rule/GetEdgesTransformAppendVerticesLimitRule.cpp +++ b/src/graph/optimizer/rule/GetEdgesTransformAppendVerticesLimitRule.cpp @@ -85,7 +85,6 @@ StatusOr GetEdgesTransformAppendVerticesLimitRule::tra auto newLimit = limit->clone(); auto newLimitGroup = OptGroup::create(ctx); auto newLimitGroupNode = newLimitGroup->makeGroupNode(newLimit); - newLimit->setOutputVar(limit->outputVar()); newProject->setInputVar(newLimit->outputVar()); newProjectGroupNode->dependsOn(newLimitGroup); @@ -105,8 +104,8 @@ StatusOr GetEdgesTransformAppendVerticesLimitRule::tra auto newAppendVertices = appendVertices->clone(); auto newAppendVerticesGroup = OptGroup::create(ctx); auto colSize = appendVertices->colNames().size(); - newAppendVertices->setOutputVar(appendVertices->outputVar()); newLimit->setInputVar(newAppendVertices->outputVar()); + newLimit->setColNames(newAppendVertices->colNames()); // Limit keep column names same with input newAppendVertices->setColNames( {appendVertices->colNames()[colSize - 2], appendVertices->colNames()[colSize - 1]}); auto newAppendVerticesGroupNode = newAppendVerticesGroup->makeGroupNode(newAppendVertices); @@ -123,19 +122,19 @@ StatusOr GetEdgesTransformAppendVerticesLimitRule::tra auto *newProj = GetEdgesTransformUtils::projectEdges(qctx, newScanEdges, traverse->colNames().back()); newProj->setInputVar(newScanEdges->outputVar()); - newProj->setOutputVar(newAppendVertices->inputVar()); newProj->setColNames({traverse->colNames().back()}); auto newProjGroup = OptGroup::create(ctx); auto newProjGroupNode = newProjGroup->makeGroupNode(newProj); newAppendVerticesGroupNode->dependsOn(newProjGroup); + newAppendVertices->setInputVar(newProj->outputVar()); newProjGroupNode->dependsOn(newScanEdgesGroup); for (auto dep : scanVerticesGroupNode->dependencies()) { newScanEdgesGroupNode->dependsOn(dep); } TransformResult result; - result.eraseCurr = true; + result.eraseAll = true; result.newGroupNodes.emplace_back(newProjectGroupNode); return result; } diff --git a/src/graph/optimizer/rule/GetEdgesTransformRule.cpp b/src/graph/optimizer/rule/GetEdgesTransformRule.cpp index aa86c1438e2..5df8b26744f 100644 --- a/src/graph/optimizer/rule/GetEdgesTransformRule.cpp +++ b/src/graph/optimizer/rule/GetEdgesTransformRule.cpp @@ -72,8 +72,8 @@ StatusOr GetEdgesTransformRule::transform( auto project = static_cast(projectGroupNode->node()); auto newProject = project->clone(); - auto newProjectGroupNode = OptGroupNode::create(ctx, newProject, projectGroupNode->group()); newProject->setOutputVar(project->outputVar()); + auto newProjectGroupNode = OptGroupNode::create(ctx, newProject, projectGroupNode->group()); auto limitGroupNode = matched.dependencies.front().node; auto limit = static_cast(limitGroupNode->node()); @@ -85,11 +85,11 @@ StatusOr GetEdgesTransformRule::transform( auto newLimit = limit->clone(); auto newLimitGroup = OptGroup::create(ctx); - newLimit->setOutputVar(limit->outputVar()); auto newLimitGroupNode = newLimitGroup->makeGroupNode(newLimit); newProjectGroupNode->dependsOn(newLimitGroup); + newProject->setInputVar(newLimit->outputVar()); auto *newScanEdges = GetEdgesTransformUtils::traverseToScanEdges(traverse, limit->count(qctx)); if (newScanEdges == nullptr) { @@ -100,14 +100,15 @@ StatusOr GetEdgesTransformRule::transform( auto *newProj = GetEdgesTransformUtils::projectEdges(qctx, newScanEdges, traverse->colNames().back()); - newProj->setInputVar(newScanEdges->outputVar()); - newProj->setOutputVar(traverse->outputVar()); newProj->setColNames({traverse->colNames().back()}); auto newProjGroup = OptGroup::create(ctx); auto newProjGroupNode = newProjGroup->makeGroupNode(newProj); newLimitGroupNode->dependsOn(newProjGroup); + newLimit->setInputVar(newProj->outputVar()); + newLimit->setColNames(newProj->colNames()); // Limit keep column names same with input newProjGroupNode->dependsOn(newScanEdgesGroup); + newProj->setInputVar(newScanEdges->outputVar()); newScanEdgesGroupNode->setDeps(scanVerticesGroupNode->dependencies()); TransformResult result; diff --git a/src/graph/optimizer/rule/GetEdgesTransformUtils.h b/src/graph/optimizer/rule/GetEdgesTransformUtils.h index 0a706cddee7..15c5f98fac3 100644 --- a/src/graph/optimizer/rule/GetEdgesTransformUtils.h +++ b/src/graph/optimizer/rule/GetEdgesTransformUtils.h @@ -51,6 +51,7 @@ class GetEdgesTransformUtils final { return scanEdges; } + // [EDGE] AS static graph::Project *projectEdges(graph::QueryContext *qctx, PlanNode *input, const std::string &colName) { diff --git a/src/meta/RootUserMan.h b/src/meta/RootUserMan.h index fbc45b1fd16..c8bde933203 100644 --- a/src/meta/RootUserMan.h +++ b/src/meta/RootUserMan.h @@ -20,6 +20,26 @@ namespace meta { * */ class RootUserMan { public: + static ErrorOr isGodExists(kvstore::KVStore* kv) { + auto rolePrefix = MetaKeyUtils::roleSpacePrefix(kDefaultSpaceId); + std::unique_ptr iter; + auto code = kv->prefix(kDefaultSpaceId, kDefaultPartId, rolePrefix, &iter, false); + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { + return code; + } + while (iter->valid()) { + auto val = iter->val(); + auto type = *reinterpret_cast(val.begin()); + if (type == meta::cpp2::RoleType::GOD) { + LOG(INFO) << "God user exists"; + return true; + } + iter->next(); + } + LOG(INFO) << "God user is not exists"; + return false; + } + static bool isUserExists(kvstore::KVStore* kv) { auto userKey = MetaKeyUtils::userKey("root"); std::string val; diff --git a/src/meta/processors/job/JobManager.cpp b/src/meta/processors/job/JobManager.cpp index d3e32a4b078..4e024fe4f04 100644 --- a/src/meta/processors/job/JobManager.cpp +++ b/src/meta/processors/job/JobManager.cpp @@ -130,7 +130,12 @@ void JobManager::scheduleThread() { auto jobOp = std::get<0>(opJobId); auto jodId = std::get<1>(opJobId); auto spaceId = std::get<2>(opJobId); - std::lock_guard lk(muJobFinished_[spaceId]); + auto iter = muJobFinished_.find(spaceId); + if (iter == muJobFinished_.end()) { + iter = muJobFinished_.emplace(spaceId, std::make_unique()).first; + } + std::lock_guard lk(*(iter->second)); + auto jobDescRet = JobDescription::loadJobDescription(spaceId, jodId, kvStore_); if (!nebula::ok(jobDescRet)) { LOG(INFO) << "Load an invalid job from space " << spaceId << " jodId " << jodId; @@ -189,7 +194,11 @@ bool JobManager::runJobInternal(const JobDescription& jobDesc, JbOp op) { jobExec->setFinishCallBack([this, jobDesc](meta::cpp2::JobStatus status) { if (status == meta::cpp2::JobStatus::STOPPED) { auto space = jobDesc.getSpace(); - std::lock_guard lkg(muJobFinished_[space]); + auto iter = muJobFinished_.find(space); + if (iter == muJobFinished_.end()) { + iter = muJobFinished_.emplace(space, std::make_unique()).first; + } + std::lock_guard lk(*(iter->second)); cleanJob(jobDesc.getJobId()); return nebula::cpp2::ErrorCode::SUCCEEDED; } else { @@ -226,7 +235,12 @@ nebula::cpp2::ErrorCode JobManager::jobFinished(GraphSpaceID spaceId, jobId, apache::thrift::util::enumNameSafe(jobStatus)); // normal job finish may race to job stop - std::lock_guard lk(muJobFinished_[spaceId]); + auto mutexIter = muJobFinished_.find(spaceId); + if (mutexIter == muJobFinished_.end()) { + mutexIter = muJobFinished_.emplace(spaceId, std::make_unique()).first; + } + std::lock_guard lk(*(mutexIter->second)); + auto optJobDescRet = JobDescription::loadJobDescription(spaceId, jobId, kvStore_); if (!nebula::ok(optJobDescRet)) { LOG(INFO) << folly::sformat("Load job failed, spaceId={} jobId={}", spaceId, jobId); @@ -289,7 +303,7 @@ nebula::cpp2::ErrorCode JobManager::jobFinished(GraphSpaceID spaceId, cleanJob(jobId); return nebula::cpp2::ErrorCode::SUCCEEDED; } - std::unique_ptr& jobExec = it->second; + auto jobExec = it->second.get(); if (jobStatus == cpp2::JobStatus::STOPPED) { jobExec->stop(); if (!jobExec->isMetaJob()) { @@ -358,7 +372,12 @@ nebula::cpp2::ErrorCode JobManager::reportTaskFinish(const cpp2::ReportTaskReq& } // because the last task will update the job's status // tasks should report once a time - std::lock_guard lk(muReportFinish_[spaceId]); + auto iter = muReportFinish_.find(spaceId); + if (iter == muReportFinish_.end()) { + iter = muReportFinish_.emplace(spaceId, std::make_unique()).first; + } + std::lock_guard lk(*(iter->second)); + auto tasksRet = getAllTasks(spaceId, jobId); if (!nebula::ok(tasksRet)) { return nebula::error(tasksRet); diff --git a/src/meta/processors/job/JobManager.h b/src/meta/processors/job/JobManager.h index f0e0008faaa..355ee5a5b6b 100644 --- a/src/meta/processors/job/JobManager.h +++ b/src/meta/processors/job/JobManager.h @@ -288,18 +288,18 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable { // Identify whether the current space is running a job folly::ConcurrentHashMap> spaceRunningJobs_; - std::map> runningJobs_; + folly::ConcurrentHashMap> runningJobs_; // The job in running or queue folly::ConcurrentHashMap inFlightJobs_; std::thread bgThread_; nebula::kvstore::KVStore* kvStore_{nullptr}; AdminClient* adminClient_{nullptr}; - std::map muReportFinish_; + folly::ConcurrentHashMap> muReportFinish_; // Start & stop & finish a job need mutual exclusion // The reason of using recursive_mutex is that, it's possible for a meta job try to get this lock // in finish-callback in the same thread with runJobInternal - std::map muJobFinished_; + folly::ConcurrentHashMap> muJobFinished_; std::atomic status_ = JbmgrStatus::NOT_START; }; diff --git a/src/meta/test/GetStatsTest.cpp b/src/meta/test/GetStatsTest.cpp index 0d7f2abcdd0..d1b00497ff1 100644 --- a/src/meta/test/GetStatsTest.cpp +++ b/src/meta/test/GetStatsTest.cpp @@ -67,7 +67,12 @@ struct JobCallBack { item.space_vertices_ref() = 2 * n_; item.space_edges_ref() = 2 * n_; req.stats_ref() = item; - jobMgr_->muJobFinished_[spaceId_].unlock(); + auto mutexIter = jobMgr_->muJobFinished_.find(spaceId_); + if (mutexIter == jobMgr_->muJobFinished_.end()) { + mutexIter = + jobMgr_->muJobFinished_.emplace(spaceId_, std::make_unique()).first; + } + mutexIter->second->unlock(); jobMgr_->reportTaskFinish(req); return folly::Future(Status::OK()); } diff --git a/src/meta/test/JobManagerTest.cpp b/src/meta/test/JobManagerTest.cpp index 48761f3223c..ef694eb3cd7 100644 --- a/src/meta/test/JobManagerTest.cpp +++ b/src/meta/test/JobManagerTest.cpp @@ -639,6 +639,31 @@ TEST_F(JobManagerTest, RecoverJob) { } } +TEST_F(JobManagerTest, ConcurrentHashMapTest) { + folly::ConcurrentHashMap> m; + + auto t1 = std::thread([&m] { + for (int i = 0; i < 100000; i++) { + auto itr = m.find(i * 2); + if (itr == m.end()) { + m.emplace(i * 2, std::make_unique()); + } + } + }); + + auto t2 = std::thread([&m] { + for (int i = 0; i < 100000; i++) { + auto itr = m.find(i * 2 + 1); + if (itr == m.end()) { + m.emplace(i * 2 + 1, std::make_unique()); + } + } + }); + + t1.join(); + t2.join(); +} + TEST(JobDescriptionTest, Ctor) { GraphSpaceID spaceId = 1; JobID jobId1 = 1; diff --git a/src/storage/mutate/AddEdgesProcessor.cpp b/src/storage/mutate/AddEdgesProcessor.cpp index 201d30ceba9..f0d16bd8cfc 100644 --- a/src/storage/mutate/AddEdgesProcessor.cpp +++ b/src/storage/mutate/AddEdgesProcessor.cpp @@ -195,16 +195,16 @@ void AddEdgesProcessor::doProcessWithIndex(const cpp2::AddEdgesRequest& req) { kvs.emplace_back(std::move(key), std::move(encode.value())); } - auto atomicOp = - [partId, data = std::move(kvs), this]() mutable -> kvstore::MergeableAtomicOpResult { - return addEdgesWithIndex(partId, std::move(data)); - }; - - auto cb = [partId, this](nebula::cpp2::ErrorCode ec) { handleAsync(spaceId_, partId, ec); }; - if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { handleAsync(spaceId_, partId, code); } else { + stats::StatsManager::addValue(kNumEdgesInserted, kvs.size()); + auto atomicOp = + [partId, data = std::move(kvs), this]() mutable -> kvstore::MergeableAtomicOpResult { + return addEdgesWithIndex(partId, std::move(data)); + }; + auto cb = [partId, this](nebula::cpp2::ErrorCode ec) { handleAsync(spaceId_, partId, ec); }; + env_->kvstore_->asyncAtomicOp(spaceId_, partId, std::move(atomicOp), std::move(cb)); } } diff --git a/src/storage/mutate/AddVerticesProcessor.cpp b/src/storage/mutate/AddVerticesProcessor.cpp index 2302e6d7ce5..3f598fcad12 100644 --- a/src/storage/mutate/AddVerticesProcessor.cpp +++ b/src/storage/mutate/AddVerticesProcessor.cpp @@ -195,6 +195,7 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { handleAsync(spaceId_, partId, code); } else { + stats::StatsManager::addValue(kNumVerticesInserted, verticeData.size()); auto atomicOp = [=, tags = std::move(tags), vertices = std::move(verticeData)]() mutable { return addVerticesWithIndex(partId, tags, vertices); }; diff --git a/src/webservice/GetFlagsHandler.cpp b/src/webservice/GetFlagsHandler.cpp index ad8ea573c56..35db5d03f78 100644 --- a/src/webservice/GetFlagsHandler.cpp +++ b/src/webservice/GetFlagsHandler.cpp @@ -22,7 +22,7 @@ using proxygen::ResponseBuilder; using proxygen::UpgradeProtocol; void GetFlagsHandler::onRequest(std::unique_ptr headers) noexcept { - if (headers->getMethod().value() != HTTPMethod::GET) { + if (!headers->getMethod() || headers->getMethod().value() != HTTPMethod::GET) { // Unsupported method err_ = HttpCode::E_UNSUPPORTED_METHOD; return; diff --git a/src/webservice/GetStatsHandler.cpp b/src/webservice/GetStatsHandler.cpp index cc75b71b66f..4bb61d81461 100644 --- a/src/webservice/GetStatsHandler.cpp +++ b/src/webservice/GetStatsHandler.cpp @@ -24,7 +24,7 @@ using proxygen::ResponseBuilder; using proxygen::UpgradeProtocol; void GetStatsHandler::onRequest(std::unique_ptr headers) noexcept { - if (headers->getMethod().value() != HTTPMethod::GET) { + if (!headers->getMethod() || headers->getMethod().value() != HTTPMethod::GET) { // Unsupported method err_ = HttpCode::E_UNSUPPORTED_METHOD; return; diff --git a/src/webservice/Router.cpp b/src/webservice/Router.cpp index 12243d0bb8c..18741463750 100644 --- a/src/webservice/Router.cpp +++ b/src/webservice/Router.cpp @@ -104,6 +104,9 @@ proxygen::RequestHandler *Route::generateHandler(const std::string &path) const proxygen::RequestHandler *Router::dispatch(const proxygen::HTTPMessage *msg) const { for (Route *r = head_; r != nullptr; r = r->next()) { + if (!msg->getMethod()) { + break; + } if (r->matches(msg->getMethod().value(), msg->getPath())) { return r->generateHandler(msg->getPath()); } diff --git a/src/webservice/SetFlagsHandler.cpp b/src/webservice/SetFlagsHandler.cpp index 26b6f621232..dcb7506a970 100644 --- a/src/webservice/SetFlagsHandler.cpp +++ b/src/webservice/SetFlagsHandler.cpp @@ -21,7 +21,7 @@ using proxygen::ResponseBuilder; using proxygen::UpgradeProtocol; void SetFlagsHandler::onRequest(std::unique_ptr headers) noexcept { - if (headers->getMethod().value() != HTTPMethod::PUT) { + if (!headers->getMethod() || headers->getMethod().value() != HTTPMethod::PUT) { // Unsupported method err_ = HttpCode::E_UNSUPPORTED_METHOD; return; diff --git a/src/webservice/StatusHandler.cpp b/src/webservice/StatusHandler.cpp index a2ffb9c76b4..cae194362bd 100644 --- a/src/webservice/StatusHandler.cpp +++ b/src/webservice/StatusHandler.cpp @@ -20,7 +20,7 @@ using proxygen::ResponseBuilder; using proxygen::UpgradeProtocol; void StatusHandler::onRequest(std::unique_ptr headers) noexcept { - if (headers->getMethod().value() != HTTPMethod::GET) { + if (!headers->getMethod() || headers->getMethod().value() != HTTPMethod::GET) { // Unsupported method err_ = HttpCode::E_UNSUPPORTED_METHOD; return;