diff --git a/src/meta/MetaServiceHandler.h b/src/meta/MetaServiceHandler.h index 1a66097c08c..757e990017d 100644 --- a/src/meta/MetaServiceHandler.h +++ b/src/meta/MetaServiceHandler.h @@ -11,6 +11,8 @@ #include "kvstore/KVStore.h" #include "meta/processors/admin/AdminClient.h" #include "meta/processors/admin/HBProcessor.h" +#include "meta/processors/job/AdminJobProcessor.h" +#include "meta/processors/job/JobManager.h" namespace nebula { namespace meta { @@ -23,6 +25,8 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf { // Initialize counters kHBCounters.init(); + kNumActiveJobs = stats::StatsManager::registerStats("num_active_jobs", "sum"); + kNumRunningJobs = stats::StatsManager::registerStats("num_running_jobs", "sum"); } /** diff --git a/src/meta/processors/job/AdminJobProcessor.cpp b/src/meta/processors/job/AdminJobProcessor.cpp index 950a34d18dc..dfbdf9af4f0 100644 --- a/src/meta/processors/job/AdminJobProcessor.cpp +++ b/src/meta/processors/job/AdminJobProcessor.cpp @@ -6,11 +6,13 @@ #include "meta/processors/job/AdminJobProcessor.h" #include "common/base/StatusOr.h" +#include "common/stats/StatsManager.h" #include "meta/processors/job/JobDescription.h" #include "meta/processors/job/JobManager.h" namespace nebula { namespace meta { +stats::CounterId kNumActiveJobs; void AdminJobProcessor::process(const cpp2::AdminJobReq& req) { cpp2::AdminJobResult result; @@ -59,6 +61,7 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) { errorCode = jobMgr->addJob(jobDesc, adminClient_); if (errorCode == nebula::cpp2::ErrorCode::SUCCEEDED) { result.set_job_id(nebula::value(jobId)); + stats::StatsManager::addValue(kNumActiveJobs); } break; } @@ -121,6 +124,7 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) { auto ret = jobMgr->recoverJob(spaceName, adminClient_, jobIds); if (nebula::ok(ret)) { result.set_recovered_job_num(nebula::value(ret)); + stats::StatsManager::addValue(kNumActiveJobs); } else { errorCode = nebula::error(ret); } diff --git a/src/meta/processors/job/AdminJobProcessor.h b/src/meta/processors/job/AdminJobProcessor.h index 589e81a7fa4..3a920f01358 100644 --- a/src/meta/processors/job/AdminJobProcessor.h +++ b/src/meta/processors/job/AdminJobProcessor.h @@ -6,11 +6,13 @@ #ifndef META_ADMINJOBPROCESSOR_H_ #define META_ADMINJOBPROCESSOR_H_ +#include "common/stats/StatsManager.h" #include "meta/processors/BaseProcessor.h" #include "meta/processors/admin/AdminClient.h" namespace nebula { namespace meta { +extern stats::CounterId kNumActiveJobs; class AdminJobProcessor : public BaseProcessor { public: diff --git a/src/meta/processors/job/JobManager.cpp b/src/meta/processors/job/JobManager.cpp index 38f2bff019a..5a6ad9296ca 100644 --- a/src/meta/processors/job/JobManager.cpp +++ b/src/meta/processors/job/JobManager.cpp @@ -34,7 +34,7 @@ using nebula::kvstore::KVIterator; namespace nebula { namespace meta { -stats::CounterId kNumRunningJobs = stats::StatsManager::registerStats("num_running_jobs", "sum"); +stats::CounterId kNumRunningJobs; JobManager* JobManager::getInstance() { static JobManager inst; diff --git a/src/storage/GraphStorageServiceHandler.cpp b/src/storage/GraphStorageServiceHandler.cpp index 7f7f027af5e..8fa037a65eb 100644 --- a/src/storage/GraphStorageServiceHandler.cpp +++ b/src/storage/GraphStorageServiceHandler.cpp @@ -63,6 +63,12 @@ GraphStorageServiceHandler::GraphStorageServiceHandler(StorageEnv* env) : env_(e kPutCounters.init("kv_put"); kGetCounters.init("kv_get"); kRemoveCounters.init("kv_remove"); + + kNumVerticesInserted = stats::StatsManager::registerStats("num_vertices_inserted", "rate, sum"); + kNumEdgesInserted = stats::StatsManager::registerStats("num_edges_inserted", "rate, sum"); + kNumEdgesDeleted = stats::StatsManager::registerStats("num_edges_deleted", "rate, sum"); + kNumTagsDeleted = stats::StatsManager::registerStats("num_tags_deleted", "rate, sum"); + kNumVerticesDeleted = stats::StatsManager::registerStats("num_vertices_deleted", "rate, sum"); } // Vertice section diff --git a/src/storage/mutate/AddEdgesProcessor.cpp b/src/storage/mutate/AddEdgesProcessor.cpp index 0db34b3cfd5..d3741ffda2c 100644 --- a/src/storage/mutate/AddEdgesProcessor.cpp +++ b/src/storage/mutate/AddEdgesProcessor.cpp @@ -8,6 +8,7 @@ #include #include "codec/RowWriterV2.h" +#include "common/stats/StatsManager.h" #include "common/time/WallClock.h" #include "common/utils/IndexKeyUtils.h" #include "common/utils/NebulaKeyUtils.h" @@ -17,6 +18,7 @@ namespace nebula { namespace storage { ProcessorCounters kAddEdgesCounters; +stats::CounterId kNumEdgesInserted; void AddEdgesProcessor::process(const cpp2::AddEdgesRequest& req) { spaceId_ = req.get_space_id(); @@ -140,6 +142,7 @@ void AddEdgesProcessor::doProcess(const cpp2::AddEdgesRequest& req) { }); } else { doPut(spaceId_, partId, std::move(data)); + stats::StatsManager::addValue(kNumEdgesInserted, data.size()); } } } @@ -297,6 +300,7 @@ void AddEdgesProcessor::doProcessWithIndex(const cpp2::AddEdgesRequest& req) { break; } batchHolder->put(std::move(key), std::move(retEnc.value())); + stats::StatsManager::addValue(kNumEdgesInserted); } if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { env_->edgesML_->unlockBatch(dummyLock); diff --git a/src/storage/mutate/AddEdgesProcessor.h b/src/storage/mutate/AddEdgesProcessor.h index dcb44d2a59b..5ae001f4c5a 100644 --- a/src/storage/mutate/AddEdgesProcessor.h +++ b/src/storage/mutate/AddEdgesProcessor.h @@ -7,6 +7,7 @@ #define STORAGE_MUTATE_ADDEDGESPROCESSOR_H_ #include "common/base/Base.h" +#include "common/stats/StatsManager.h" #include "kvstore/LogEncoder.h" #include "storage/BaseProcessor.h" #include "storage/StorageFlags.h" @@ -15,6 +16,7 @@ namespace nebula { namespace storage { extern ProcessorCounters kAddEdgesCounters; +extern stats::CounterId kNumEdgesInserted; class AddEdgesProcessor : public BaseProcessor { friend class TransactionManager; diff --git a/src/storage/mutate/AddVerticesProcessor.cpp b/src/storage/mutate/AddVerticesProcessor.cpp index 055c4309538..22265cfdf1c 100644 --- a/src/storage/mutate/AddVerticesProcessor.cpp +++ b/src/storage/mutate/AddVerticesProcessor.cpp @@ -8,6 +8,7 @@ #include #include "codec/RowWriterV2.h" +#include "common/stats/StatsManager.h" #include "common/time/WallClock.h" #include "common/utils/IndexKeyUtils.h" #include "common/utils/NebulaKeyUtils.h" @@ -18,6 +19,7 @@ namespace nebula { namespace storage { ProcessorCounters kAddVerticesCounters; +stats::CounterId kNumVerticesInserted; void AddVerticesProcessor::process(const cpp2::AddVerticesRequest& req) { spaceId_ = req.get_space_id(); @@ -126,6 +128,7 @@ void AddVerticesProcessor::doProcess(const cpp2::AddVerticesRequest& req) { handleAsync(spaceId_, partId, code); } else { doPut(spaceId_, partId, std::move(data)); + stats::StatsManager::addValue(kNumVerticesInserted, data.size()); } } } @@ -280,6 +283,7 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re * step 3 , Insert new vertex data */ batchHolder->put(std::move(key), std::move(retEnc.value())); + stats::StatsManager::addValue(kNumVerticesInserted); } if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { break; diff --git a/src/storage/mutate/AddVerticesProcessor.h b/src/storage/mutate/AddVerticesProcessor.h index 01047551433..77c39384b5b 100644 --- a/src/storage/mutate/AddVerticesProcessor.h +++ b/src/storage/mutate/AddVerticesProcessor.h @@ -16,6 +16,7 @@ namespace nebula { namespace storage { extern ProcessorCounters kAddVerticesCounters; +extern stats::CounterId kNumVerticesInserted; class AddVerticesProcessor : public BaseProcessor { public: diff --git a/src/storage/mutate/DeleteEdgesProcessor.cpp b/src/storage/mutate/DeleteEdgesProcessor.cpp index 0c4964bf71b..06977c4f351 100644 --- a/src/storage/mutate/DeleteEdgesProcessor.cpp +++ b/src/storage/mutate/DeleteEdgesProcessor.cpp @@ -7,6 +7,7 @@ #include +#include "common/stats/StatsManager.h" #include "common/utils/IndexKeyUtils.h" #include "common/utils/NebulaKeyUtils.h" #include "common/utils/OperationKeyUtils.h" @@ -15,6 +16,7 @@ namespace nebula { namespace storage { ProcessorCounters kDelEdgesCounters; +stats::CounterId kNumEdgesDeleted; void DeleteEdgesProcessor::process(const cpp2::DeleteEdgesRequest& req) { spaceId_ = req.get_space_id(); @@ -76,6 +78,7 @@ void DeleteEdgesProcessor::process(const cpp2::DeleteEdgesRequest& req) { continue; } doRemove(spaceId_, partId, std::move(keys)); + stats::StatsManager::addValue(kNumEdgesDeleted, keys.size()); } } else { for (auto& part : partEdges) { @@ -185,6 +188,7 @@ ErrorOr DeleteEdgesProcessor::deleteEdges( } } batchHolder->remove(std::move(key)); + stats::StatsManager::addValue(kNumEdgesDeleted); } else if (ret == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) { continue; } else { diff --git a/src/storage/mutate/DeleteEdgesProcessor.h b/src/storage/mutate/DeleteEdgesProcessor.h index 9e240b624f9..8bcd7553409 100644 --- a/src/storage/mutate/DeleteEdgesProcessor.h +++ b/src/storage/mutate/DeleteEdgesProcessor.h @@ -14,6 +14,7 @@ namespace nebula { namespace storage { extern ProcessorCounters kDelEdgesCounters; +extern stats::CounterId kNumEdgesDeleted; class DeleteEdgesProcessor : public BaseProcessor { public: diff --git a/src/storage/mutate/DeleteTagsProcessor.cpp b/src/storage/mutate/DeleteTagsProcessor.cpp index af414c55ba4..51d544a621a 100644 --- a/src/storage/mutate/DeleteTagsProcessor.cpp +++ b/src/storage/mutate/DeleteTagsProcessor.cpp @@ -5,6 +5,7 @@ #include "storage/mutate/DeleteTagsProcessor.h" +#include "common/stats/StatsManager.h" #include "common/utils/IndexKeyUtils.h" #include "common/utils/NebulaKeyUtils.h" #include "common/utils/OperationKeyUtils.h" @@ -14,6 +15,7 @@ namespace nebula { namespace storage { ProcessorCounters kDelTagsCounters; +stats::CounterId kNumTagsDeleted; void DeleteTagsProcessor::process(const cpp2::DeleteTagsRequest& req) { spaceId_ = req.get_space_id(); @@ -60,6 +62,7 @@ void DeleteTagsProcessor::process(const cpp2::DeleteTagsRequest& req) { } } doRemove(spaceId_, partId, std::move(keys)); + stats::StatsManager::addValue(kNumTagsDeleted, keys.size()); } } else { for (const auto& part : parts) { @@ -147,6 +150,7 @@ ErrorOr DeleteTagsProcessor::deleteTags( } } batchHolder->remove(std::move(key)); + stats::StatsManager::addValue(kNumTagsDeleted); } } return encodeBatchValue(batchHolder->getBatch()); diff --git a/src/storage/mutate/DeleteTagsProcessor.h b/src/storage/mutate/DeleteTagsProcessor.h index 5c473f16b66..37128a3cea8 100644 --- a/src/storage/mutate/DeleteTagsProcessor.h +++ b/src/storage/mutate/DeleteTagsProcessor.h @@ -15,6 +15,7 @@ namespace nebula { namespace storage { extern ProcessorCounters kDelTagsCounters; +extern stats::CounterId kNumTagsDeleted; class DeleteTagsProcessor : public BaseProcessor { public: diff --git a/src/storage/mutate/DeleteVerticesProcessor.cpp b/src/storage/mutate/DeleteVerticesProcessor.cpp index 570cbb0672a..32b732efbd9 100644 --- a/src/storage/mutate/DeleteVerticesProcessor.cpp +++ b/src/storage/mutate/DeleteVerticesProcessor.cpp @@ -14,6 +14,7 @@ namespace nebula { namespace storage { ProcessorCounters kDelVerticesCounters; +stats::CounterId kNumVerticesDeleted; void DeleteVerticesProcessor::process(const cpp2::DeleteVerticesRequest& req) { spaceId_ = req.get_space_id(); @@ -80,6 +81,7 @@ void DeleteVerticesProcessor::process(const cpp2::DeleteVerticesRequest& req) { continue; } doRemove(spaceId_, partId, std::move(keys)); + stats::StatsManager::addValue(kNumVerticesDeleted, keys.size()); } } else { for (auto& pv : partVertices) { @@ -171,6 +173,7 @@ ErrorOr DeleteVerticesProcessor::deleteVer } } batchHolder->remove(key.str()); + stats::StatsManager::addValue(kNumVerticesDeleted); iter->next(); } } diff --git a/src/storage/mutate/DeleteVerticesProcessor.h b/src/storage/mutate/DeleteVerticesProcessor.h index a5ec2f0b678..be326819fa9 100644 --- a/src/storage/mutate/DeleteVerticesProcessor.h +++ b/src/storage/mutate/DeleteVerticesProcessor.h @@ -15,6 +15,7 @@ namespace nebula { namespace storage { extern ProcessorCounters kDelVerticesCounters; +extern stats::CounterId kNumVerticesDeleted; class DeleteVerticesProcessor : public BaseProcessor { public: