Skip to content

Commit

Permalink
add some data related counters
Browse files Browse the repository at this point in the history
  • Loading branch information
jievince committed Dec 15, 2021
1 parent 1b26800 commit 8e5ecd0
Show file tree
Hide file tree
Showing 15 changed files with 42 additions and 1 deletion.
4 changes: 4 additions & 0 deletions src/meta/MetaServiceHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include "kvstore/KVStore.h"
#include "meta/processors/admin/AdminClient.h"
#include "meta/processors/admin/HBProcessor.h"
#include "meta/processors/job/AdminJobProcessor.h"
#include "meta/processors/job/JobManager.h"

namespace nebula {
namespace meta {
Expand All @@ -23,6 +25,8 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf {

// Initialize counters
kHBCounters.init();
kNumActiveJobs = stats::StatsManager::registerStats("num_active_jobs", "sum");
kNumRunningJobs = stats::StatsManager::registerStats("num_running_jobs", "sum");
}

/**
Expand Down
4 changes: 4 additions & 0 deletions src/meta/processors/job/AdminJobProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
#include "meta/processors/job/AdminJobProcessor.h"

#include "common/base/StatusOr.h"
#include "common/stats/StatsManager.h"
#include "meta/processors/job/JobDescription.h"
#include "meta/processors/job/JobManager.h"

namespace nebula {
namespace meta {
stats::CounterId kNumActiveJobs;

void AdminJobProcessor::process(const cpp2::AdminJobReq& req) {
cpp2::AdminJobResult result;
Expand Down Expand Up @@ -59,6 +61,7 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) {
errorCode = jobMgr->addJob(jobDesc, adminClient_);
if (errorCode == nebula::cpp2::ErrorCode::SUCCEEDED) {
result.set_job_id(nebula::value(jobId));
stats::StatsManager::addValue(kNumActiveJobs);
}
break;
}
Expand Down Expand Up @@ -121,6 +124,7 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) {
auto ret = jobMgr->recoverJob(spaceName, adminClient_, jobIds);
if (nebula::ok(ret)) {
result.set_recovered_job_num(nebula::value(ret));
stats::StatsManager::addValue(kNumActiveJobs);
} else {
errorCode = nebula::error(ret);
}
Expand Down
2 changes: 2 additions & 0 deletions src/meta/processors/job/AdminJobProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
#ifndef META_ADMINJOBPROCESSOR_H_
#define META_ADMINJOBPROCESSOR_H_

#include "common/stats/StatsManager.h"
#include "meta/processors/BaseProcessor.h"
#include "meta/processors/admin/AdminClient.h"

namespace nebula {
namespace meta {
extern stats::CounterId kNumActiveJobs;

class AdminJobProcessor : public BaseProcessor<cpp2::AdminJobResp> {
public:
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/job/JobManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ using nebula::kvstore::KVIterator;

namespace nebula {
namespace meta {
stats::CounterId kNumRunningJobs = stats::StatsManager::registerStats("num_running_jobs", "sum");
stats::CounterId kNumRunningJobs;

JobManager* JobManager::getInstance() {
static JobManager inst;
Expand Down
6 changes: 6 additions & 0 deletions src/storage/GraphStorageServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ GraphStorageServiceHandler::GraphStorageServiceHandler(StorageEnv* env) : env_(e
kPutCounters.init("kv_put");
kGetCounters.init("kv_get");
kRemoveCounters.init("kv_remove");

kNumVerticesInserted = stats::StatsManager::registerStats("num_vertices_inserted", "rate, sum");
kNumEdgesInserted = stats::StatsManager::registerStats("num_edges_inserted", "rate, sum");
kNumEdgesDeleted = stats::StatsManager::registerStats("num_edges_deleted", "rate, sum");
kNumTagsDeleted = stats::StatsManager::registerStats("num_tags_deleted", "rate, sum");
kNumVerticesDeleted = stats::StatsManager::registerStats("num_vertices_deleted", "rate, sum");
}

// Vertice section
Expand Down
4 changes: 4 additions & 0 deletions src/storage/mutate/AddEdgesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <algorithm>

#include "codec/RowWriterV2.h"
#include "common/stats/StatsManager.h"
#include "common/time/WallClock.h"
#include "common/utils/IndexKeyUtils.h"
#include "common/utils/NebulaKeyUtils.h"
Expand All @@ -17,6 +18,7 @@ namespace nebula {
namespace storage {

ProcessorCounters kAddEdgesCounters;
stats::CounterId kNumEdgesInserted;

void AddEdgesProcessor::process(const cpp2::AddEdgesRequest& req) {
spaceId_ = req.get_space_id();
Expand Down Expand Up @@ -140,6 +142,7 @@ void AddEdgesProcessor::doProcess(const cpp2::AddEdgesRequest& req) {
});
} else {
doPut(spaceId_, partId, std::move(data));
stats::StatsManager::addValue(kNumEdgesInserted, data.size());
}
}
}
Expand Down Expand Up @@ -297,6 +300,7 @@ void AddEdgesProcessor::doProcessWithIndex(const cpp2::AddEdgesRequest& req) {
break;
}
batchHolder->put(std::move(key), std::move(retEnc.value()));
stats::StatsManager::addValue(kNumEdgesInserted);
}
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
env_->edgesML_->unlockBatch(dummyLock);
Expand Down
2 changes: 2 additions & 0 deletions src/storage/mutate/AddEdgesProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#define STORAGE_MUTATE_ADDEDGESPROCESSOR_H_

#include "common/base/Base.h"
#include "common/stats/StatsManager.h"
#include "kvstore/LogEncoder.h"
#include "storage/BaseProcessor.h"
#include "storage/StorageFlags.h"
Expand All @@ -15,6 +16,7 @@ namespace nebula {
namespace storage {

extern ProcessorCounters kAddEdgesCounters;
extern stats::CounterId kNumEdgesInserted;

class AddEdgesProcessor : public BaseProcessor<cpp2::ExecResponse> {
friend class TransactionManager;
Expand Down
4 changes: 4 additions & 0 deletions src/storage/mutate/AddVerticesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <algorithm>

#include "codec/RowWriterV2.h"
#include "common/stats/StatsManager.h"
#include "common/time/WallClock.h"
#include "common/utils/IndexKeyUtils.h"
#include "common/utils/NebulaKeyUtils.h"
Expand All @@ -18,6 +19,7 @@ namespace nebula {
namespace storage {

ProcessorCounters kAddVerticesCounters;
stats::CounterId kNumVerticesInserted;

void AddVerticesProcessor::process(const cpp2::AddVerticesRequest& req) {
spaceId_ = req.get_space_id();
Expand Down Expand Up @@ -126,6 +128,7 @@ void AddVerticesProcessor::doProcess(const cpp2::AddVerticesRequest& req) {
handleAsync(spaceId_, partId, code);
} else {
doPut(spaceId_, partId, std::move(data));
stats::StatsManager::addValue(kNumVerticesInserted, data.size());
}
}
}
Expand Down Expand Up @@ -280,6 +283,7 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re
* step 3 , Insert new vertex data
*/
batchHolder->put(std::move(key), std::move(retEnc.value()));
stats::StatsManager::addValue(kNumVerticesInserted);
}
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
break;
Expand Down
1 change: 1 addition & 0 deletions src/storage/mutate/AddVerticesProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ namespace nebula {
namespace storage {

extern ProcessorCounters kAddVerticesCounters;
extern stats::CounterId kNumVerticesInserted;

class AddVerticesProcessor : public BaseProcessor<cpp2::ExecResponse> {
public:
Expand Down
4 changes: 4 additions & 0 deletions src/storage/mutate/DeleteEdgesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include <algorithm>

#include "common/stats/StatsManager.h"
#include "common/utils/IndexKeyUtils.h"
#include "common/utils/NebulaKeyUtils.h"
#include "common/utils/OperationKeyUtils.h"
Expand All @@ -15,6 +16,7 @@ namespace nebula {
namespace storage {

ProcessorCounters kDelEdgesCounters;
stats::CounterId kNumEdgesDeleted;

void DeleteEdgesProcessor::process(const cpp2::DeleteEdgesRequest& req) {
spaceId_ = req.get_space_id();
Expand Down Expand Up @@ -76,6 +78,7 @@ void DeleteEdgesProcessor::process(const cpp2::DeleteEdgesRequest& req) {
continue;
}
doRemove(spaceId_, partId, std::move(keys));
stats::StatsManager::addValue(kNumEdgesDeleted, keys.size());
}
} else {
for (auto& part : partEdges) {
Expand Down Expand Up @@ -185,6 +188,7 @@ ErrorOr<nebula::cpp2::ErrorCode, std::string> DeleteEdgesProcessor::deleteEdges(
}
}
batchHolder->remove(std::move(key));
stats::StatsManager::addValue(kNumEdgesDeleted);
} else if (ret == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
continue;
} else {
Expand Down
1 change: 1 addition & 0 deletions src/storage/mutate/DeleteEdgesProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ namespace nebula {
namespace storage {

extern ProcessorCounters kDelEdgesCounters;
extern stats::CounterId kNumEdgesDeleted;

class DeleteEdgesProcessor : public BaseProcessor<cpp2::ExecResponse> {
public:
Expand Down
4 changes: 4 additions & 0 deletions src/storage/mutate/DeleteTagsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "storage/mutate/DeleteTagsProcessor.h"

#include "common/stats/StatsManager.h"
#include "common/utils/IndexKeyUtils.h"
#include "common/utils/NebulaKeyUtils.h"
#include "common/utils/OperationKeyUtils.h"
Expand All @@ -14,6 +15,7 @@ namespace nebula {
namespace storage {

ProcessorCounters kDelTagsCounters;
stats::CounterId kNumTagsDeleted;

void DeleteTagsProcessor::process(const cpp2::DeleteTagsRequest& req) {
spaceId_ = req.get_space_id();
Expand Down Expand Up @@ -60,6 +62,7 @@ void DeleteTagsProcessor::process(const cpp2::DeleteTagsRequest& req) {
}
}
doRemove(spaceId_, partId, std::move(keys));
stats::StatsManager::addValue(kNumTagsDeleted, keys.size());
}
} else {
for (const auto& part : parts) {
Expand Down Expand Up @@ -147,6 +150,7 @@ ErrorOr<nebula::cpp2::ErrorCode, std::string> DeleteTagsProcessor::deleteTags(
}
}
batchHolder->remove(std::move(key));
stats::StatsManager::addValue(kNumTagsDeleted);
}
}
return encodeBatchValue(batchHolder->getBatch());
Expand Down
1 change: 1 addition & 0 deletions src/storage/mutate/DeleteTagsProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ namespace nebula {
namespace storage {

extern ProcessorCounters kDelTagsCounters;
extern stats::CounterId kNumTagsDeleted;

class DeleteTagsProcessor : public BaseProcessor<cpp2::ExecResponse> {
public:
Expand Down
3 changes: 3 additions & 0 deletions src/storage/mutate/DeleteVerticesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ namespace nebula {
namespace storage {

ProcessorCounters kDelVerticesCounters;
stats::CounterId kNumVerticesDeleted;

void DeleteVerticesProcessor::process(const cpp2::DeleteVerticesRequest& req) {
spaceId_ = req.get_space_id();
Expand Down Expand Up @@ -80,6 +81,7 @@ void DeleteVerticesProcessor::process(const cpp2::DeleteVerticesRequest& req) {
continue;
}
doRemove(spaceId_, partId, std::move(keys));
stats::StatsManager::addValue(kNumVerticesDeleted, keys.size());
}
} else {
for (auto& pv : partVertices) {
Expand Down Expand Up @@ -171,6 +173,7 @@ ErrorOr<nebula::cpp2::ErrorCode, std::string> DeleteVerticesProcessor::deleteVer
}
}
batchHolder->remove(key.str());
stats::StatsManager::addValue(kNumVerticesDeleted);
iter->next();
}
}
Expand Down
1 change: 1 addition & 0 deletions src/storage/mutate/DeleteVerticesProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ namespace nebula {
namespace storage {

extern ProcessorCounters kDelVerticesCounters;
extern stats::CounterId kNumVerticesDeleted;

class DeleteVerticesProcessor : public BaseProcessor<cpp2::ExecResponse> {
public:
Expand Down

0 comments on commit 8e5ecd0

Please sign in to comment.