diff --git a/README-CN.md b/README-CN.md index fc2d5231815..7a2fcd7fc3b 100644 --- a/README-CN.md +++ b/README-CN.md @@ -8,21 +8,16 @@ WeiXin - - Zhihu - - - SegmentFault - Sina Weibo - nebula star + GitHub stars - nebula fork + GitHub forks +

# NebulaGraph 是什么? @@ -108,13 +103,18 @@ NebulaGraph内核 1.x 与 2.x 数据格式、通信协议、客户端等均双 * 访问[官网](http://nebula-graph.com.cn/) * [![WeiXin](https://img.shields.io/badge/WeChat-%E5%BE%AE%E4%BF%A1-brightgreen)](https://user-images.githubusercontent.com/38887077/67449282-4362b300-f64c-11e9-878f-7efc373e5e55.jpg) * [![Sina Weibo](https://img.shields.io/badge/Weibo-%E5%BE%AE%E5%8D%9A-red)](https://weibo.com/p/1006067122684542/home?from=page_100606&mod=TAB#place) -* [知乎](https://www.zhihu.com/org/nebulagraph/activities) -* [SegmentFault](https://segmentfault.com/t/nebula) * Email: info@vesoft.com ## 加入 NebulaGraph 社区 -[![Discussions](https://img.shields.io/badge/GitHub_Discussion-000000?style=for-the-badge&logo=github&logoColor=white)](https://github.com/vesoft-inc/nebula/discussions) [![Discourse](https://img.shields.io/badge/中文论坛-4285F4?style=for-the-badge&logo=discourse&logoColor=white)](https://discuss.nebula-graph.com.cn/) [![Slack](https://img.shields.io/badge/Slack-9F2B68?style=for-the-badge&logo=slack&logoColor=white)](https://join.slack.com/t/nebulagraph/shared_invite/zt-7ybejuqa-NCZBroh~PCh66d9kOQj45g) [![Tencent_Meeting](https://img.shields.io/badge/腾讯会议-2D8CFF?style=for-the-badge&logo=googlemeet&logoColor=white)](https://meeting.tencent.com/dm/F8NX1aRZ8PQv) [![Google Calendar](https://img.shields.io/badge/Calander-4285F4?style=for-the-badge&logo=google&logoColor=white)](https://calendar.google.com/calendar/u/0?cid=Z29mbGttamM3ZTVlZ2hpazI2cmNlNXVnZThAZ3JvdXAuY2FsZW5kYXIuZ29vZ2xlLmNvbQ) [![Meetup](https://img.shields.io/badge/Meetup-FF0000?style=for-the-badge&logo=meetup&logoColor=white)](https://www.meetup.com/nebulagraph/events/287180186?utm_medium=referral&utm_campaign=share-btn_savedevents_share_modal&utm_source=link) [![Meeting Archive](https://img.shields.io/badge/Community_wiki-808080?style=for-the-badge&logo=readthedocs&logoColor=white)](https://github.com/vesoft-inc/nebula-community/wiki) + + +| 加入 NebulaGraph 社区 | 加入方式 | +| ----------------------- | ------------------------------------------------------------ | +| 微信群 | [![WeChat Group](https://img.shields.io/badge/微信群-000000?style=for-the-badge&logo=wechat)](https://wj.qq.com/s2/8321168/8e2f/) | +| 提问 | [![Discourse](https://img.shields.io/badge/中文论坛-4285F4?style=for-the-badge&logo=discourse&logoColor=white)](https://discuss.nebula-graph.com.cn/) [![Stack Overflow](https://img.shields.io/badge/Stack%20Overflow-nebula--graph-orange?style=for-the-badge&logo=stack-overflow&logoColor=white)](https://stackoverflow.com/questions/tagged/nebula-graph) [![Discussions](https://img.shields.io/badge/GitHub_Discussion-000000?style=for-the-badge&logo=github&logoColor=white)](https://github.com/vesoft-inc/nebula/discussions) | +| 聊天 | [![Chat History](https://img.shields.io/badge/Community%20Chat-000000?style=for-the-badge&logo=discord&logoColor=white)](https://community-chat.nebula-graph.io/) [![Slack](https://img.shields.io/badge/Slack-9F2B68?style=for-the-badge&logo=slack&logoColor=white)](https://join.slack.com/t/nebulagraph/shared_invite/zt-7ybejuqa-NCZBroh~PCh66d9kOQj45g) | +| NebulaGraph Meetup 活动 | [![Tencent_Meeting](https://img.shields.io/badge/腾讯会议-2D8CFF?style=for-the-badge&logo=googlemeet&logoColor=white)](https://meeting.tencent.com/dm/F8NX1aRZ8PQv) [![Google Calendar](https://img.shields.io/badge/Calander-4285F4?style=for-the-badge&logo=google&logoColor=white)](https://calendar.google.com/calendar/u/0?cid=Z29mbGttamM3ZTVlZ2hpazI2cmNlNXVnZThAZ3JvdXAuY2FsZW5kYXIuZ29vZ2xlLmNvbQ) [![Zoom](https://img.shields.io/badge/Zoom-2D8CFF?style=for-the-badge&logo=zoom&logoColor=white)](https://us02web.zoom.us/meeting/register/tZ0rcuypqDMvGdLuIm4VprTlx96wrEf062SH) [![Meetup](https://img.shields.io/badge/Meetup-FF0000?style=for-the-badge&logo=meetup&logoColor=white)](https://www.meetup.com/nebulagraph/events/) [![Meeting Archive](https://img.shields.io/badge/Meeting_Archive-808080?style=for-the-badge&logo=readthedocs&logoColor=white)](https://github.com/vesoft-inc/nebula-community/wiki) |
diff --git a/README.md b/README.md index 2ef22e51cb3..c525b32996a 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,9 @@
A distributed, scalable, lightning-fast graph database

+ + Stack Overflow + code coverage @@ -107,15 +110,21 @@ You can also freely deploy **NebulaGraph** as a back-end service to support your ## Contact +* [Community Chat](https://community-chat.nebula-graph.io/) * [Slack Channel](https://join.slack.com/t/nebulagraph/shared_invite/zt-7ybejuqa-NCZBroh~PCh66d9kOQj45g) -* [Stack Overflow](https://stackoverflow.com/questions/tagged/nebulagraph) +* [Stack Overflow](https://stackoverflow.com/questions/tagged/nebula-graph) * Twitter: [@NebulaGraph](https://twitter.com/NebulaGraph) -* [LinkedIn Page](https://www.linkedin.com/company/vesoft-nebula-graph) +* [LinkedIn Page](https://www.linkedin.com/company/nebula-graph/) * Email: info@vesoft.com ## Community -[![Discussions](https://img.shields.io/badge/GitHub_Discussion-000000?style=for-the-badge&logo=github&logoColor=white)](https://github.com/vesoft-inc/nebula/discussions) [![Slack](https://img.shields.io/badge/Slack-9F2B68?style=for-the-badge&logo=slack&logoColor=white)](https://join.slack.com/t/nebulagraph/shared_invite/zt-7ybejuqa-NCZBroh~PCh66d9kOQj45g) [![Zoom](https://img.shields.io/badge/Zoom-2D8CFF?style=for-the-badge&logo=zoom&logoColor=white)](https://us02web.zoom.us/meeting/register/tZ0rcuypqDMvGdLuIm4VprTlx96wrEf062SH) [![Google Calendar](https://img.shields.io/badge/Calander-4285F4?style=for-the-badge&logo=google&logoColor=white)](https://calendar.google.com/calendar/u/0?cid=Z29mbGttamM3ZTVlZ2hpazI2cmNlNXVnZThAZ3JvdXAuY2FsZW5kYXIuZ29vZ2xlLmNvbQ) [![Meetup](https://img.shields.io/badge/Meetup-FF0000?style=for-the-badge&logo=meetup&logoColor=white)](https://www.meetup.com/nebulagraph/events/287180186?utm_medium=referral&utm_campaign=share-btn_savedevents_share_modal&utm_source=link) [![Meeting Archive](https://img.shields.io/badge/Community_wiki-808080?style=for-the-badge&logo=readthedocs&logoColor=white)](https://github.com/vesoft-inc/nebula-community/wiki) [![Discourse](https://img.shields.io/badge/中文论坛-4285F4?style=for-the-badge&logo=discourse&logoColor=white)](https://discuss.nebula-graph.com.cn/) [![Tencent_Meeting](https://img.shields.io/badge/腾讯会议-2D8CFF?style=for-the-badge&logo=googlemeet&logoColor=white)](https://meeting.tencent.com/dm/F8NX1aRZ8PQv) +| Join NebulaGraph Community | Where to Find us | +| ----------------------------------- | ------------------------------------------------------------ | +| Asking Questions | [![Stack Overflow](https://img.shields.io/badge/Stack%20Overflow-nebula--graph-orange?style=for-the-badge&logo=stack-overflow&logoColor=white)](https://stackoverflow.com/questions/tagged/nebula-graph) [![Discussions](https://img.shields.io/badge/GitHub_Discussion-000000?style=for-the-badge&logo=github&logoColor=white)](https://github.com/vesoft-inc/nebula/discussions) | +| Chat with Community Members | [![Chat History](https://img.shields.io/badge/Community%20Chat-000000?style=for-the-badge&logo=discord&logoColor=white)](https://community-chat.nebula-graph.io/) [![Slack](https://img.shields.io/badge/Slack-9F2B68?style=for-the-badge&logo=slack&logoColor=white)](https://join.slack.com/t/nebulagraph/shared_invite/zt-7ybejuqa-NCZBroh~PCh66d9kOQj45g) | +| NebulaGraph Meetup | [![Google Calendar](https://img.shields.io/badge/Calander-4285F4?style=for-the-badge&logo=google&logoColor=white)](https://calendar.google.com/calendar/u/0?cid=Z29mbGttamM3ZTVlZ2hpazI2cmNlNXVnZThAZ3JvdXAuY2FsZW5kYXIuZ29vZ2xlLmNvbQ) [![Zoom](https://img.shields.io/badge/Zoom-2D8CFF?style=for-the-badge&logo=zoom&logoColor=white)](https://us02web.zoom.us/meeting/register/tZ0rcuypqDMvGdLuIm4VprTlx96wrEf062SH) [![Meetup](https://img.shields.io/badge/Meetup-FF0000?style=for-the-badge&logo=meetup&logoColor=white)](https://www.meetup.com/nebulagraph/events/) [![Meeting Archive](https://img.shields.io/badge/Meeting_Archive-808080?style=for-the-badge&logo=readthedocs&logoColor=white)](https://github.com/vesoft-inc/nebula-community/wiki) | +| Chat, Asking, or Meeting in Chinese | [![WeChat Group](https://img.shields.io/badge/WeChat_Group-000000?style=for-the-badge&logo=wechat)](https://wj.qq.com/s2/8321168/8e2f/) [![Tencent_Meeting](https://img.shields.io/badge/腾讯会议-2D8CFF?style=for-the-badge&logo=googlemeet&logoColor=white)](https://meeting.tencent.com/dm/F8NX1aRZ8PQv) [![Discourse](https://img.shields.io/badge/中文论坛-4285F4?style=for-the-badge&logo=discourse&logoColor=white)](https://discuss.nebula-graph.com.cn/) |
diff --git a/cmake/nebula/SanitizerConfig.cmake b/cmake/nebula/SanitizerConfig.cmake index 28b5d500ef4..9f610e819a3 100644 --- a/cmake/nebula/SanitizerConfig.cmake +++ b/cmake/nebula/SanitizerConfig.cmake @@ -15,6 +15,7 @@ if(ENABLE_ASAN) add_compile_options(-fsanitize=address) add_compile_options(-g) add_compile_options(-fno-omit-frame-pointer) + add_definitions(-DENABLE_ASAN) set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=address") endif() diff --git a/src/clients/storage/StorageClientBase-inl.h b/src/clients/storage/StorageClientBase-inl.h index 84fef19a840..beb8d25d958 100644 --- a/src/clients/storage/StorageClientBase-inl.h +++ b/src/clients/storage/StorageClientBase-inl.h @@ -101,16 +101,21 @@ StorageClientBase::collectResponse( return folly::collectAll(respFutures) .deferValue([this, requests = std::move(requests), totalLatencies, hosts]( std::vector>>&& resps) { + // throw in MemoryCheckGuard verified memory::MemoryCheckGuard guard; StorageRpcResponse rpcResp(resps.size()); for (size_t i = 0; i < resps.size(); i++) { auto& host = hosts->at(i); - auto& tryResp = resps[i]; - std::optional errMsg; + folly::Try>& tryResp = resps[i]; if (tryResp.hasException()) { - errMsg = std::string(tryResp.exception().what().c_str()); + std::string errMsg = tryResp.exception().what().toStdString(); + rpcResp.markFailure(); + LOG(ERROR) << "There some RPC errors: " << errMsg; + auto req = requests.at(host); + auto parts = getReqPartsId(req); + rpcResp.appendFailedParts(parts, nebula::cpp2::ErrorCode::E_RPC_FAILURE); } else { - auto status = std::move(tryResp).value(); + StatusOr status = std::move(tryResp).value(); if (status.ok()) { auto resp = std::move(status).value(); auto result = resp.get_result(); @@ -128,17 +133,18 @@ StorageClientBase::collectResponse( // Keep the response rpcResp.addResponse(std::move(resp)); } else { - errMsg = std::move(status).status().message(); + rpcResp.markFailure(); + Status s = std::move(status).status(); + nebula::cpp2::ErrorCode errorCode = + s.code() == Status::Code::kGraphMemoryExceeded + ? nebula::cpp2::ErrorCode::E_GRAPH_MEMORY_EXCEEDED + : nebula::cpp2::ErrorCode::E_RPC_FAILURE; + LOG(ERROR) << "There some RPC errors: " << s.message(); + auto req = requests.at(host); + auto parts = getReqPartsId(req); + rpcResp.appendFailedParts(parts, errorCode); } } - - if (errMsg) { - rpcResp.markFailure(); - LOG(ERROR) << "There some RPC errors: " << errMsg.value(); - auto req = requests.at(host); - auto parts = getReqPartsId(req); - rpcResp.appendFailedParts(parts, nebula::cpp2::ErrorCode::E_RPC_FAILURE); - } } return rpcResp; @@ -160,12 +166,16 @@ folly::Future> StorageClientBaseclient(host, evb, false, FLAGS_storage_client_timeout_ms); + // Encoding invoke Cpp2Ops::write the request to protocol is in current thread, + // do not need to turn on in Cpp2Ops::write return remoteFunc(client.get(), request); }) .thenValue([spaceId, this](Response&& resp) mutable -> StatusOr { + // MemoryTrackerVerified memory::MemoryCheckGuard guard; auto& result = resp.get_result(); for (auto& part : result.get_failed_parts()) { @@ -196,14 +206,12 @@ folly::Future> StorageClientBase{}, - [](const std::bad_alloc&) { - return folly::makeFuture>(std::bad_alloc()); - }) - .thenError(folly::tag_t{}, - [](const std::exception& e) { - return folly::makeFuture>(std::runtime_error(e.what())); - }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc&) { + return folly::makeFuture>(Status::GraphMemoryExceeded( + "(%d)", static_cast(nebula::cpp2::ErrorCode::E_GRAPH_MEMORY_EXCEEDED))); + }) .thenError([request, host, spaceId, this]( folly::exception_wrapper&& exWrapper) mutable -> StatusOr { stats::StatsManager::addValue(kNumRpcSentToStoragedFailed); diff --git a/src/codec/test/CMakeLists.txt b/src/codec/test/CMakeLists.txt index 9b4e1272096..8c511451eaf 100644 --- a/src/codec/test/CMakeLists.txt +++ b/src/codec/test/CMakeLists.txt @@ -22,6 +22,8 @@ set(CODEC_TEST_LIBS $ $ $ + $ + $ $ $ $ diff --git a/src/common/base/Status.cpp b/src/common/base/Status.cpp index d80c31cf370..51d4f8698ac 100644 --- a/src/common/base/Status.cpp +++ b/src/common/base/Status.cpp @@ -67,10 +67,14 @@ const char *Status::toString(Code code) { return "StatementEmpty: "; case kSemanticError: return "SemanticError: "; + case kGraphMemoryExceeded: + return "GraphMemoryExceeded: "; case kKeyNotFound: return "KeyNotFound: "; case kPartialSuccess: return "PartialSuccess: "; + case kStorageMemoryExceeded: + return "StorageMemoryExceeded: "; case kSpaceNotFound: return "SpaceNotFound: "; case kHostNotFound: diff --git a/src/common/base/Status.h b/src/common/base/Status.h index 2783cb7a70b..598320ad675 100644 --- a/src/common/base/Status.h +++ b/src/common/base/Status.h @@ -119,12 +119,15 @@ class Status final { // Graph engine errors STATUS_GENERATOR(SyntaxError); STATUS_GENERATOR(SemanticError); + STATUS_GENERATOR(GraphMemoryExceeded); + // Nothing is executed When command is comment STATUS_GENERATOR(StatementEmpty); // Storage engine errors STATUS_GENERATOR(KeyNotFound); STATUS_GENERATOR(PartialSuccess); + STATUS_GENERATOR(StorageMemoryExceeded); // Meta engine errors // TODO(dangleptr) we could use ErrorOr to replace SpaceNotFound here. @@ -166,9 +169,11 @@ class Status final { kSyntaxError = 201, kStatementEmpty = 202, kSemanticError = 203, + kGraphMemoryExceeded = 204, // 3xx, for storage engine errors kKeyNotFound = 301, kPartialSuccess = 302, + kStorageMemoryExceeded = 303, // 4xx, for meta service errors kSpaceNotFound = 404, kHostNotFound = 405, diff --git a/src/common/base/test/CMakeLists.txt b/src/common/base/test/CMakeLists.txt index 4963639cdf3..e644906d920 100644 --- a/src/common/base/test/CMakeLists.txt +++ b/src/common/base/test/CMakeLists.txt @@ -110,8 +110,10 @@ nebula_add_executable( $ $ $ + $ $ $ + $ $ $ $ diff --git a/src/common/datatypes/DataSetOps-inl.h b/src/common/datatypes/DataSetOps-inl.h index 2290df8bd75..89207de8497 100644 --- a/src/common/datatypes/DataSetOps-inl.h +++ b/src/common/datatypes/DataSetOps-inl.h @@ -13,6 +13,7 @@ #include "common/base/Base.h" #include "common/datatypes/CommonCpp2Ops.h" #include "common/datatypes/DataSet.h" +#include "common/memory/MemoryTracker.h" namespace apache { namespace thrift { @@ -47,7 +48,10 @@ inline constexpr protocol::TType Cpp2Ops::thriftType() { template uint32_t Cpp2Ops::write(Protocol* proto, nebula::DataSet const* obj) { + // we do not turn on memory tracker here, when the DataSet object is creating & inserting, it is + // in Processor::process(), where memory tracker is turned on. so we think that is enough. uint32_t xfer = 0; + xfer += proto->writeStructBegin("DataSet"); xfer += proto->writeFieldBegin("column_names", protocol::T_LIST, 1); @@ -62,11 +66,20 @@ uint32_t Cpp2Ops::write(Protocol* proto, nebula::DataSet const* xfer += proto->writeFieldStop(); xfer += proto->writeStructEnd(); + return xfer; } template void Cpp2Ops::read(Protocol* proto, nebula::DataSet* obj) { + // memory usage during decode a StorageResponse should be mostly occupied + // by DataSet (see interface/storage.thrift), turn on memory check here. + // + // MemoryTrackerVerified: + // throw std::bad_alloc has verified, can be captured in + // StorageClientBase::getResponse's onError + nebula::memory::MemoryCheckGuard guard; + apache::thrift::detail::ProtocolReaderStructReadState readState; readState.readStructBegin(proto); diff --git a/src/common/datatypes/ValueOps-inl.h b/src/common/datatypes/ValueOps-inl.h index d9ab04320f4..c58cb0a5261 100644 --- a/src/common/datatypes/ValueOps-inl.h +++ b/src/common/datatypes/ValueOps-inl.h @@ -103,6 +103,7 @@ template uint32_t Cpp2Ops::write(Protocol* proto, nebula::Value const* obj) { uint32_t xfer = 0; xfer += proto->writeStructBegin("Value"); + // MemoryTrackerVerified: throw bad_alloc verified switch (obj->type()) { case nebula::Value::Type::NULLVALUE: { diff --git a/src/common/datatypes/test/CMakeLists.txt b/src/common/datatypes/test/CMakeLists.txt index 0fdc5fde32c..f31731cf58b 100644 --- a/src/common/datatypes/test/CMakeLists.txt +++ b/src/common/datatypes/test/CMakeLists.txt @@ -43,6 +43,7 @@ nebula_add_test( $ $ $ + $ $ $ $ @@ -115,6 +116,7 @@ nebula_add_test( $ $ $ + $ $ $ LIBRARIES diff --git a/src/common/expression/test/CMakeLists.txt b/src/common/expression/test/CMakeLists.txt index b2884aecd67..d2c8b304859 100644 --- a/src/common/expression/test/CMakeLists.txt +++ b/src/common/expression/test/CMakeLists.txt @@ -128,6 +128,7 @@ nebula_add_executable( $ $ $ + $ $ $ $ @@ -153,6 +154,7 @@ nebula_add_executable( $ $ $ + $ $ $ $ diff --git a/src/common/geo/test/CMakeLists.txt b/src/common/geo/test/CMakeLists.txt index 932fffb14e6..5b37c79ffb9 100644 --- a/src/common/geo/test/CMakeLists.txt +++ b/src/common/geo/test/CMakeLists.txt @@ -30,6 +30,9 @@ nebula_add_test( $ $ $ + $ + $ + $ $ $ $ diff --git a/src/common/graph/tests/CMakeLists.txt b/src/common/graph/tests/CMakeLists.txt index cba41ef98ae..83465cf8f09 100644 --- a/src/common/graph/tests/CMakeLists.txt +++ b/src/common/graph/tests/CMakeLists.txt @@ -11,6 +11,9 @@ nebula_add_test( $ $ $ + $ + $ + $ $ $ $ diff --git a/src/common/id/test/CMakeLists.txt b/src/common/id/test/CMakeLists.txt index bcd640bd625..afd71499bdc 100644 --- a/src/common/id/test/CMakeLists.txt +++ b/src/common/id/test/CMakeLists.txt @@ -28,6 +28,7 @@ nebula_add_executable( $ $ $ + $ $ $ $ @@ -67,6 +68,7 @@ nebula_add_test( $ $ $ + $ $ $ $ diff --git a/src/common/memory/MemoryTracker.cpp b/src/common/memory/MemoryTracker.cpp index d7bc96432a4..548e4aacb38 100644 --- a/src/common/memory/MemoryTracker.cpp +++ b/src/common/memory/MemoryTracker.cpp @@ -38,6 +38,10 @@ void MemoryTracker::free(int64_t size) { MemoryStats::instance().free(size); } +bool MemoryTracker::isOn() { + return MemoryStats::instance().throwOnMemoryExceeded(); +} + void MemoryTracker::allocImpl(int64_t size, bool) { MemoryStats::instance().alloc(size); } diff --git a/src/common/memory/MemoryTracker.h b/src/common/memory/MemoryTracker.h index 1d2f344d56f..f42d5223ad9 100644 --- a/src/common/memory/MemoryTracker.h +++ b/src/common/memory/MemoryTracker.h @@ -138,6 +138,11 @@ class MemoryStats { threadMemoryStats_.throwOnMemoryExceeded = false; } + // return true if current thread's throwOnMemoryExceeded' + static bool throwOnMemoryExceeded() { + return threadMemoryStats_.throwOnMemoryExceeded; + } + private: inline ALWAYS_INLINE void allocGlobal(int64_t size) { int64_t willBe = size + used_.fetch_add(size, std::memory_order_relaxed); @@ -182,6 +187,9 @@ struct MemoryTracker { /// This function should be called after memory deallocation. static void free(int64_t size); + /// Test state of memory tracker, return true if memory tracker is turned on, otherwise false. + static bool isOn(); + private: static void allocImpl(int64_t size, bool throw_if_memory_exceeded); }; diff --git a/src/common/memory/MemoryUtils.cpp b/src/common/memory/MemoryUtils.cpp index aded2dd96cf..1d7170baffb 100644 --- a/src/common/memory/MemoryUtils.cpp +++ b/src/common/memory/MemoryUtils.cpp @@ -119,7 +119,8 @@ StatusOr MemoryUtils::hitsHighWatermark() { } // MemoryStats depends on jemalloc -#if ENABLE_JEMALLOC +#ifdef ENABLE_JEMALLOC +#ifndef ENABLE_ASAN // set MemoryStats limit (MemoryTracker track-able memory) int64_t trackable = total - FLAGS_memory_tracker_untracked_reserved_memory_mb * MiB; if (trackable > 0) { @@ -134,16 +135,9 @@ StatusOr MemoryUtils::hitsHighWatermark() { if (FLAGS_memory_purge_enabled) { int64_t now = time::WallClock::fastNowInSec(); if (now - kLastPurge_ > FLAGS_memory_purge_interval_seconds) { - // mallctl seems has issue with address_sanitizer, do purge only when address_sanitizer is off -#if defined(__clang__) -#if defined(__has_feature) -#if not __has_feature(address_sanitizer) - mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", nullptr, nullptr, nullptr, 0); -#endif -#endif -#else // gcc + // jemalloc seems has issue with address_sanitizer, do purge only when address_sanitizer is + // off mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", nullptr, nullptr, nullptr, 0); -#endif kLastPurge_ = now; } } @@ -173,7 +167,7 @@ StatusOr MemoryUtils::hitsHighWatermark() { kLastPrintMemoryTrackerStats_ = now; } } - +#endif #endif auto hits = (1 - available / total) > FLAGS_system_memory_high_watermark_ratio; diff --git a/src/common/memory/NewDelete.cpp b/src/common/memory/NewDelete.cpp index 4ff129cf331..5bc67076553 100644 --- a/src/common/memory/NewDelete.cpp +++ b/src/common/memory/NewDelete.cpp @@ -11,23 +11,16 @@ /// Two condition need check before MemoryTracker is on /// 1. jemalloc is used /// MemoryTracker need jemalloc API to get accurate size of alloc/free memory. -#if ENABLE_JEMALLOC /// 2. address_sanitizer is off /// sanitizer has already override the new/delete operator, /// only override new/delete operator only when address_sanitizer is off -#if defined(__clang__) -#if defined(__has_feature) -#if not __has_feature(address_sanitizer) +#ifdef ENABLE_JEMALLOC +#ifndef ENABLE_ASAN #define ENABLE_MEMORY_TRACKER #endif #endif -#else // gcc -#define ENABLE_MEMORY_TRACKER -#endif -#endif - -#if defined(ENABLE_MEMORY_TRACKER) +#ifdef ENABLE_MEMORY_TRACKER /// new void *operator new(std::size_t size) { nebula::memory::trackMemory(size); diff --git a/src/common/utils/test/CMakeLists.txt b/src/common/utils/test/CMakeLists.txt index afbdc3e9de4..63727036237 100644 --- a/src/common/utils/test/CMakeLists.txt +++ b/src/common/utils/test/CMakeLists.txt @@ -21,6 +21,9 @@ nebula_add_test( $ $ $ + $ + $ + $ $ $ LIBRARIES @@ -47,6 +50,9 @@ nebula_add_test( $ $ $ + $ + $ + $ $ $ LIBRARIES @@ -76,6 +82,9 @@ nebula_add_test( $ $ $ + $ + $ + $ $ $ LIBRARIES @@ -94,6 +103,8 @@ nebula_add_test( $ $ $ + $ + $ $ $ $ diff --git a/src/graph/executor/Executor.cpp b/src/graph/executor/Executor.cpp index e3d62265b8b..313d8cd7f59 100644 --- a/src/graph/executor/Executor.cpp +++ b/src/graph/executor/Executor.cpp @@ -603,6 +603,8 @@ Status Executor::open() { } Status Executor::close() { + // MemoryTrackerVerified + ProfilingStats stats; stats.totalDurationInUs = totalDuration_.elapsedInUSec(); stats.rows = numRows_; @@ -725,6 +727,7 @@ bool Executor::movable(const Variable *var) { } Status Executor::finish(Result &&result) { + // MemoryTrackerVerified if (!FLAGS_enable_lifetime_optimize || node()->outputVarPtr()->userCount.load(std::memory_order_relaxed) != 0) { numRows_ = result.size(); diff --git a/src/graph/executor/Executor.h b/src/graph/executor/Executor.h index 670eb4df302..ba9f90970cd 100644 --- a/src/graph/executor/Executor.h +++ b/src/graph/executor/Executor.h @@ -77,8 +77,8 @@ class Executor : private boost::noncopyable, private cpp::NonMovable { folly::Future error(Status status) const; static Status memoryExceededStatus() { - return Status::Error("Graph Error: GRAPH_MEMORY_EXCEEDED(%d)", - static_cast(nebula::cpp2::ErrorCode::E_GRAPH_MEMORY_EXCEEDED)); + return Status::GraphMemoryExceeded( + "(%d)", static_cast(nebula::cpp2::ErrorCode::E_GRAPH_MEMORY_EXCEEDED)); } protected: @@ -159,6 +159,7 @@ auto Executor::runMultiJobs(ScatterFunc &&scatter, GatherFunc &&gather, Iterator futures.emplace_back(folly::via( runner(), [begin, end, tmpIter = iter->copy(), f = std::move(scatter)]() mutable -> ScatterResult { + // MemoryTrackerVerified memory::MemoryCheckGuard guard; // Since not all iterators are linear, so iterates to the begin pos size_t tmp = 0; @@ -173,17 +174,7 @@ auto Executor::runMultiJobs(ScatterFunc &&scatter, GatherFunc &&gather, Iterator } // Gather all results and do post works - return folly::collect(futures) - .via(runner()) - .thenValue(std::move(gather)) - .thenError(folly::tag_t{}, - [](const std::bad_alloc &) { - return folly::makeFuture(std::runtime_error( - "Memory Limit Exceeded, " + memory::MemoryStats::instance().toString())); - }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); - }); + return folly::collect(futures).via(runner()).thenValue(std::move(gather)); } } // namespace graph } // namespace nebula diff --git a/src/graph/executor/StorageAccessExecutor.h b/src/graph/executor/StorageAccessExecutor.h index 18ddbe83e40..a3881325f7c 100644 --- a/src/graph/executor/StorageAccessExecutor.h +++ b/src/graph/executor/StorageAccessExecutor.h @@ -131,9 +131,11 @@ class StorageAccessExecutor : public Executor { "Storage Error: Part {} raft buffer is full. Please retry later.", partId)); case nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED: return Status::Error("Storage Error: Atomic operation failed."); + // E_GRAPH_MEMORY_EXCEEDED may happen during rpc response deserialize. + case nebula::cpp2::ErrorCode::E_GRAPH_MEMORY_EXCEEDED: + return Status::GraphMemoryExceeded("(%d)", static_cast(code)); case nebula::cpp2::ErrorCode::E_STORAGE_MEMORY_EXCEEDED: - return Status::Error("Storage Error: STORAGE_MEMORY_EXCEEDED(%d)", - static_cast(code)); + return Status::StorageMemoryExceeded("(%d)", static_cast(code)); default: auto status = Status::Error("Storage Error: part: %d, error: %s(%d).", partId, diff --git a/src/graph/executor/algo/BFSShortestPathExecutor.cpp b/src/graph/executor/algo/BFSShortestPathExecutor.cpp index 7cb4fda3447..cf9faafa0d9 100644 --- a/src/graph/executor/algo/BFSShortestPathExecutor.cpp +++ b/src/graph/executor/algo/BFSShortestPathExecutor.cpp @@ -10,6 +10,8 @@ DECLARE_int32(num_operator_threads); namespace nebula { namespace graph { folly::Future BFSShortestPathExecutor::execute() { + // MemoryTrackerVerified + SCOPED_TIMER(&execTime_); pathNode_ = asNode(node()); terminateEarlyVar_ = pathNode_->terminateEarlyVar(); @@ -30,10 +32,12 @@ folly::Future BFSShortestPathExecutor::execute() { std::vector> futures; auto leftFuture = folly::via(runner(), [this]() { + // MemoryTrackerVerified memory::MemoryCheckGuard guard; return buildPath(false); }); auto rightFuture = folly::via(runner(), [this]() { + // MemoryTrackerVerified memory::MemoryCheckGuard guard; return buildPath(true); }); @@ -111,6 +115,7 @@ Status BFSShortestPathExecutor::buildPath(bool reverse) { currentEdges.emplace(std::move(dst), std::move(edge)); } } + // set nextVid const auto& nextVidVar = reverse ? pathNode_->rightVidVar() : pathNode_->leftVidVar(); ectx_->setResult(nextVidVar, ResultBuilder().value(std::move(nextStepVids)).build()); @@ -170,12 +175,6 @@ folly::Future BFSShortestPathExecutor::conjunctPath() { currentDs_.append(std::move(resp)); } return Status::OK(); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc&) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception& e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } diff --git a/src/graph/executor/algo/BatchShortestPath.cpp b/src/graph/executor/algo/BatchShortestPath.cpp index 2400364b592..95ac030019d 100644 --- a/src/graph/executor/algo/BatchShortestPath.cpp +++ b/src/graph/executor/algo/BatchShortestPath.cpp @@ -15,6 +15,7 @@ namespace graph { folly::Future BatchShortestPath::execute(const HashSet& startVids, const HashSet& endVids, DataSet* result) { + // MemoryTrackerVerified size_t rowSize = init(startVids, endVids); std::vector> futures; futures.reserve(rowSize); @@ -22,26 +23,18 @@ folly::Future BatchShortestPath::execute(const HashSet& startVids, resultDs_[rowNum].colNames = pathNode_->colNames(); futures.emplace_back(shortestPath(rowNum, 1)); } - return folly::collect(futures) - .via(qctx_->rctx()->runner()) - .thenValue([this, result](auto&& resps) { - memory::MemoryCheckGuard guard; - for (auto& resp : resps) { - NG_RETURN_IF_ERROR(resp); - } - result->colNames = pathNode_->colNames(); - for (auto& ds : resultDs_) { - result->append(std::move(ds)); - } - return Status::OK(); - }) - .thenError(folly::tag_t{}, - [](const std::bad_alloc&) { - return folly::makeFuture(Executor::memoryExceededStatus()); - }) - .thenError(folly::tag_t{}, [](const std::exception& e) { - return folly::makeFuture(std::runtime_error(e.what())); - }); + return folly::collect(futures).via(runner()).thenValue([this, result](auto&& resps) { + // MemoryTrackerVerified + memory::MemoryCheckGuard guard; + for (auto& resp : resps) { + NG_RETURN_IF_ERROR(resp); + } + result->colNames = pathNode_->colNames(); + for (auto& ds : resultDs_) { + result->append(std::move(ds)); + } + return Status::OK(); + }); } size_t BatchShortestPath::init(const HashSet& startVids, const HashSet& endVids) { @@ -106,8 +99,9 @@ folly::Future BatchShortestPath::shortestPath(size_t rowNum, size_t step futures.emplace_back(getNeighbors(rowNum, stepNum, false)); futures.emplace_back(getNeighbors(rowNum, stepNum, true)); return folly::collect(futures) - .via(qctx_->rctx()->runner()) + .via(runner()) .thenValue([this, rowNum, stepNum](auto&& resps) { + // MemoryTrackerVerified memory::MemoryCheckGuard guard; for (auto& resp : resps) { if (!resp.ok()) { @@ -116,6 +110,10 @@ folly::Future BatchShortestPath::shortestPath(size_t rowNum, size_t step } return handleResponse(rowNum, stepNum); }) + // This thenError is necessary to catch bad_alloc, seems the returned future + // is related to two routines: getNeighbors, handleResponse, each of them launch some task in + // separate thread, if any one of routine throw bad_alloc, fail the query, will cause another + // to run on a maybe already released BatchShortestPath object .thenError(folly::tag_t{}, [](const std::bad_alloc&) { return folly::makeFuture(Executor::memoryExceededStatus()); @@ -151,18 +149,12 @@ folly::Future BatchShortestPath::getNeighbors(size_t rowNum, size_t step -1, nullptr, nullptr) - .via(qctx_->rctx()->runner()) + .via(runner()) .thenValue([this, rowNum, reverse, stepNum, getNbrTime](auto&& resp) { + // MemoryTrackerVerified memory::MemoryCheckGuard guard; addStats(resp, stepNum, getNbrTime.elapsedInUSec(), reverse); return buildPath(rowNum, std::move(resp), reverse); - }) - .thenError(folly::tag_t{}, - [](const std::bad_alloc&) { - return folly::makeFuture(Executor::memoryExceededStatus()); - }) - .thenError(folly::tag_t{}, [](const std::exception& e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -281,14 +273,19 @@ Status BatchShortestPath::doBuildPath(size_t rowNum, GetNeighborsIter* iter, boo folly::Future BatchShortestPath::handleResponse(size_t rowNum, size_t stepNum) { return folly::makeFuture(Status::OK()) - .via(qctx_->rctx()->runner()) + .via(runner()) .thenValue([this, rowNum](auto&& status) { + // MemoryTrackerVerified memory::MemoryCheckGuard guard; + // odd step UNUSED(status); return conjunctPath(rowNum, true); }) .thenValue([this, rowNum, stepNum](auto&& terminate) { + // MemoryTrackerVerified + memory::MemoryCheckGuard guard; + // even Step if (terminate || stepNum * 2 > maxStep_) { return folly::makeFuture(true); @@ -296,6 +293,9 @@ folly::Future BatchShortestPath::handleResponse(size_t rowNum, size_t st return conjunctPath(rowNum, false); }) .thenValue([this, rowNum, stepNum](auto&& result) { + // MemoryTrackerVerified + memory::MemoryCheckGuard guard; + if (result || stepNum * 2 >= maxStep_) { return folly::makeFuture(Status::OK()); } @@ -319,13 +319,6 @@ folly::Future BatchShortestPath::handleResponse(size_t rowNum, size_t st leftPathMap.clear(); rightPathMap.clear(); return shortestPath(rowNum, stepNum + 1); - }) - .thenError(folly::tag_t{}, - [](const std::bad_alloc&) { - return folly::makeFuture(Executor::memoryExceededStatus()); - }) - .thenError(folly::tag_t{}, [](const std::exception& e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -379,64 +372,61 @@ folly::Future BatchShortestPath::conjunctPath(size_t rowNum, bool oddStep) } auto future = getMeetVids(rowNum, oddStep, meetVids); - return future.via(qctx_->rctx()->runner()) - .thenValue([this, rowNum, oddStep](auto&& vertices) { - memory::MemoryCheckGuard guard; - if (vertices.empty()) { - return false; - } - robin_hood::unordered_flat_map> verticesMap; - for (auto& vertex : vertices) { - verticesMap[vertex.getVertex().vid] = std::move(vertex); + return future.via(runner()).thenValue([this, rowNum, oddStep](auto&& vertices) { + // MemoryTrackerVerified + memory::MemoryCheckGuard guard; + + if (vertices.empty()) { + return false; + } + robin_hood::unordered_flat_map> verticesMap; + for (auto& vertex : vertices) { + verticesMap[vertex.getVertex().vid] = std::move(vertex); + } + auto& terminationMap = terminationMaps_[rowNum]; + auto& leftPathMaps = currentLeftPathMaps_[rowNum]; + auto& rightPathMaps = oddStep ? preRightPathMaps_[rowNum] : currentRightPathMaps_[rowNum]; + for (const auto& leftPathMap : leftPathMaps) { + auto findCommonVid = rightPathMaps.find(leftPathMap.first); + if (findCommonVid == rightPathMaps.end()) { + continue; + } + auto findCommonVertex = verticesMap.find(findCommonVid->first); + if (findCommonVertex == verticesMap.end()) { + continue; + } + auto& rightPaths = findCommonVid->second; + for (const auto& srcPaths : leftPathMap.second) { + auto range = terminationMap.equal_range(srcPaths.first); + if (range.first == range.second) { + continue; } - auto& terminationMap = terminationMaps_[rowNum]; - auto& leftPathMaps = currentLeftPathMaps_[rowNum]; - auto& rightPathMaps = oddStep ? preRightPathMaps_[rowNum] : currentRightPathMaps_[rowNum]; - for (const auto& leftPathMap : leftPathMaps) { - auto findCommonVid = rightPathMaps.find(leftPathMap.first); - if (findCommonVid == rightPathMaps.end()) { - continue; - } - auto findCommonVertex = verticesMap.find(findCommonVid->first); - if (findCommonVertex == verticesMap.end()) { - continue; - } - auto& rightPaths = findCommonVid->second; - for (const auto& srcPaths : leftPathMap.second) { - auto range = terminationMap.equal_range(srcPaths.first); - if (range.first == range.second) { - continue; - } - for (const auto& dstPaths : rightPaths) { - for (auto found = range.first; found != range.second; ++found) { - if (found->second.first == dstPaths.first) { - if (singleShortest_ && !found->second.second) { - break; - } - doConjunctPath( - srcPaths.second, dstPaths.second, findCommonVertex->second, rowNum); - found->second.second = false; - } + for (const auto& dstPaths : rightPaths) { + for (auto found = range.first; found != range.second; ++found) { + if (found->second.first == dstPaths.first) { + if (singleShortest_ && !found->second.second) { + break; } + doConjunctPath(srcPaths.second, dstPaths.second, findCommonVertex->second, rowNum); + found->second.second = false; } } } - // update terminationMap - for (auto iter = terminationMap.begin(); iter != terminationMap.end();) { - if (!iter->second.second) { - iter = terminationMap.erase(iter); - } else { - ++iter; - } - } - if (terminationMap.empty()) { - return true; - } - return false; - }) - .thenError(folly::tag_t{}, [](const std::exception& e) { - return folly::makeFuture(std::runtime_error(e.what())); - }); + } + } + // update terminationMap + for (auto iter = terminationMap.begin(); iter != terminationMap.end();) { + if (!iter->second.second) { + iter = terminationMap.erase(iter); + } else { + ++iter; + } + } + if (terminationMap.empty()) { + return true; + } + return false; + }); } void BatchShortestPath::doConjunctPath(const std::vector& leftPaths, diff --git a/src/graph/executor/algo/MultiShortestPathExecutor.cpp b/src/graph/executor/algo/MultiShortestPathExecutor.cpp index f77dc4d9d59..d9f0fddff15 100644 --- a/src/graph/executor/algo/MultiShortestPathExecutor.cpp +++ b/src/graph/executor/algo/MultiShortestPathExecutor.cpp @@ -46,6 +46,7 @@ folly::Future MultiShortestPathExecutor::execute() { return conjunctPath(false); }) .thenValue([this](auto&& resp) { + memory::MemoryCheckGuard guard; UNUSED(resp); preRightPaths_ = rightPaths_; // update history @@ -311,30 +312,25 @@ folly::Future MultiShortestPathExecutor::conjunctPath(bool oddStep) { futures.emplace_back(std::move(future)); } - return folly::collect(futures) - .via(runner()) - .thenValue([this](auto&& resps) { - memory::MemoryCheckGuard guard; - for (auto& resp : resps) { - currentDs_.append(std::move(resp)); - } + return folly::collect(futures).via(runner()).thenValue([this](auto&& resps) { + memory::MemoryCheckGuard guard; + for (auto& resp : resps) { + currentDs_.append(std::move(resp)); + } - for (auto iter = terminationMap_.begin(); iter != terminationMap_.end();) { - if (!iter->second.second) { - iter = terminationMap_.erase(iter); - } else { - ++iter; - } - } - if (terminationMap_.empty()) { - ectx_->setValue(terminationVar_, true); - return true; - } - return false; - }) - .thenError(folly::tag_t{}, [](const std::exception& e) { - return folly::makeFuture(std::runtime_error(e.what())); - }); + for (auto iter = terminationMap_.begin(); iter != terminationMap_.end();) { + if (!iter->second.second) { + iter = terminationMap_.erase(iter); + } else { + ++iter; + } + } + if (terminationMap_.empty()) { + ectx_->setValue(terminationVar_, true); + return true; + } + return false; + }); } void MultiShortestPathExecutor::setNextStepVid(const Interims& paths, const string& var) { diff --git a/src/graph/executor/algo/ProduceAllPathsExecutor.cpp b/src/graph/executor/algo/ProduceAllPathsExecutor.cpp index 44e771ab7cd..f2b709a000f 100644 --- a/src/graph/executor/algo/ProduceAllPathsExecutor.cpp +++ b/src/graph/executor/algo/ProduceAllPathsExecutor.cpp @@ -28,10 +28,12 @@ folly::Future ProduceAllPathsExecutor::execute() { } std::vector> futures; auto leftFuture = folly::via(runner(), [this]() { + // MemoryTrackerVerified memory::MemoryCheckGuard guard; return buildPath(false); }); auto rightFuture = folly::via(runner(), [this]() { + // MemoryTrackerVerified memory::MemoryCheckGuard guard; return buildPath(true); }); @@ -156,12 +158,14 @@ folly::Future ProduceAllPathsExecutor::conjunctPath() { auto endIter = leftIter; endIter++; auto oddStepFuture = folly::via(runner(), [this, startIter, endIter]() { + // MemoryTrackerVerified memory::MemoryCheckGuard guard; return doConjunct(startIter, endIter, true); }); futures.emplace_back(std::move(oddStepFuture)); if (step_ * 2 <= pathNode_->steps()) { auto evenStepFuture = folly::via(runner(), [this, startIter, endIter]() { + // MemoryTrackerVerified memory::MemoryCheckGuard guard; return doConjunct(startIter, endIter, false); }); @@ -175,12 +179,14 @@ folly::Future ProduceAllPathsExecutor::conjunctPath() { if (i != 0) { auto endIter = leftPaths_.end(); auto oddStepFuture = folly::via(runner(), [this, startIter, endIter]() { + // MemoryTrackerVerified memory::MemoryCheckGuard guard; return doConjunct(startIter, endIter, true); }); futures.emplace_back(std::move(oddStepFuture)); if (step_ * 2 <= pathNode_->steps()) { auto evenStepFuture = folly::via(runner(), [this, startIter, endIter]() { + // MemoryTrackerVerified memory::MemoryCheckGuard guard; return doConjunct(startIter, endIter, false); }); @@ -200,12 +206,6 @@ folly::Future ProduceAllPathsExecutor::conjunctPath() { leftPaths_.clear(); rightPaths_.clear(); return Status::OK(); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc&) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception& e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } diff --git a/src/graph/executor/algo/ShortestPathBase.cpp b/src/graph/executor/algo/ShortestPathBase.cpp index 9bc82ed0ed9..e809f27f72b 100644 --- a/src/graph/executor/algo/ShortestPathBase.cpp +++ b/src/graph/executor/algo/ShortestPathBase.cpp @@ -197,5 +197,12 @@ void ShortestPathBase::addStats(PropRpcResponse& resp, int64_t timeInUSec) const statsLock_.unlock(); } +folly::Executor* ShortestPathBase::runner() const { + if (!qctx_ || !qctx_->rctx() || !qctx_->rctx()->runner()) { + return &folly::InlineExecutor::instance(); + } + return qctx_->rctx()->runner(); +} + } // namespace graph } // namespace nebula diff --git a/src/graph/executor/algo/ShortestPathBase.h b/src/graph/executor/algo/ShortestPathBase.h index b52767a9d6e..d107f7fbbcd 100644 --- a/src/graph/executor/algo/ShortestPathBase.h +++ b/src/graph/executor/algo/ShortestPathBase.h @@ -51,6 +51,8 @@ class ShortestPathBase { void addStats(PropRpcResponse& resp, int64_t timeInUSec) const; + folly::Executor* runner() const; + template StatusOr handleCompleteness(const storage::StorageRpcResponse& rpcResp, bool isPartialSuccessAccepted) const { diff --git a/src/graph/executor/algo/ShortestPathExecutor.cpp b/src/graph/executor/algo/ShortestPathExecutor.cpp index d8269ae074f..98381a9e72a 100644 --- a/src/graph/executor/algo/ShortestPathExecutor.cpp +++ b/src/graph/executor/algo/ShortestPathExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/algo/ShortestPathExecutor.h" +#include "common/memory/MemoryTracker.h" #include "graph/executor/algo/BatchShortestPath.h" #include "graph/executor/algo/SingleShortestPath.h" #include "graph/service/GraphFlags.h" @@ -15,6 +16,7 @@ DEFINE_uint32(num_path_thread, 0, "number of concurrent threads when do shortest namespace nebula { namespace graph { folly::Future ShortestPathExecutor::execute() { + // MemoryTrackerVerified SCOPED_TIMER(&execTime_); if (FLAGS_num_path_thread == 0) { FLAGS_num_path_thread = get_nprocs_conf(); diff --git a/src/graph/executor/algo/SingleShortestPath.cpp b/src/graph/executor/algo/SingleShortestPath.cpp index 69d3dba483a..fd509d765f6 100644 --- a/src/graph/executor/algo/SingleShortestPath.cpp +++ b/src/graph/executor/algo/SingleShortestPath.cpp @@ -21,26 +21,17 @@ folly::Future SingleShortestPath::execute(const HashSet& startVids, resultDs_[rowNum].colNames = pathNode_->colNames(); futures.emplace_back(shortestPath(rowNum, 1)); } - return folly::collect(futures) - .via(qctx_->rctx()->runner()) - .thenValue([this, result](auto&& resps) { - memory::MemoryCheckGuard guard; - for (auto& resp : resps) { - NG_RETURN_IF_ERROR(resp); - } - result->colNames = pathNode_->colNames(); - for (auto& ds : resultDs_) { - result->append(std::move(ds)); - } - return Status::OK(); - }) - .thenError(folly::tag_t{}, - [](const std::bad_alloc&) { - return folly::makeFuture(Executor::memoryExceededStatus()); - }) - .thenError(folly::tag_t{}, [](const std::exception& e) { - return folly::makeFuture(std::runtime_error(e.what())); - }); + return folly::collect(futures).via(runner()).thenValue([this, result](auto&& resps) { + memory::MemoryCheckGuard guard; + for (auto& resp : resps) { + NG_RETURN_IF_ERROR(resp); + } + result->colNames = pathNode_->colNames(); + for (auto& ds : resultDs_) { + result->append(std::move(ds)); + } + return Status::OK(); + }); } void SingleShortestPath::init(const HashSet& startVids, const HashSet& endVids, size_t rowSize) { @@ -75,7 +66,7 @@ folly::Future SingleShortestPath::shortestPath(size_t rowNum, size_t ste futures.emplace_back(getNeighbors(rowNum, stepNum, false)); futures.emplace_back(getNeighbors(rowNum, stepNum, true)); return folly::collect(futures) - .via(qctx_->rctx()->runner()) + .via(runner()) .thenValue([this, rowNum, stepNum](auto&& resps) { memory::MemoryCheckGuard guard; for (auto& resp : resps) { @@ -122,18 +113,11 @@ folly::Future SingleShortestPath::getNeighbors(size_t rowNum, -1, nullptr, nullptr) - .via(qctx_->rctx()->runner()) + .via(runner()) .thenValue([this, rowNum, stepNum, getNbrTime, reverse](auto&& resp) { memory::MemoryCheckGuard guard; addStats(resp, stepNum, getNbrTime.elapsedInUSec(), reverse); return buildPath(rowNum, std::move(resp), reverse); - }) - .thenError(folly::tag_t{}, - [](const std::bad_alloc&) { - return folly::makeFuture(Executor::memoryExceededStatus()); - }) - .thenError(folly::tag_t{}, [](const std::exception& e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -198,13 +182,16 @@ Status SingleShortestPath::doBuildPath(size_t rowNum, GetNeighborsIter* iter, bo folly::Future SingleShortestPath::handleResponse(size_t rowNum, size_t stepNum) { return folly::makeFuture(Status::OK()) - .via(qctx_->rctx()->runner()) + .via(runner()) .thenValue([this, rowNum, stepNum](auto&& status) { + // MemoryTrackerVerified memory::MemoryCheckGuard guard; + UNUSED(status); return conjunctPath(rowNum, stepNum); }) .thenValue([this, rowNum, stepNum](auto&& result) { + memory::MemoryCheckGuard guard; if (result || stepNum * 2 >= maxStep_) { return folly::makeFuture(Status::OK()); } @@ -214,13 +201,6 @@ folly::Future SingleShortestPath::handleResponse(size_t rowNum, size_t s return folly::makeFuture(Status::OK()); } return shortestPath(rowNum, stepNum + 1); - }) - .thenError(folly::tag_t{}, - [](const std::bad_alloc&) { - return folly::makeFuture(Executor::memoryExceededStatus()); - }) - .thenError(folly::tag_t{}, [](const std::exception& e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -281,38 +261,36 @@ void SingleShortestPath::buildOddPath(size_t rowNum, const std::vector& m folly::Future SingleShortestPath::buildEvenPath(size_t rowNum, const std::vector& meetVids) { auto future = getMeetVidsProps(meetVids); - return future.via(qctx_->rctx()->runner()) - .thenValue([this, rowNum](auto&& vertices) { - memory::MemoryCheckGuard guard; - if (vertices.empty()) { - return false; - } - for (auto& meetVertex : vertices) { - if (!meetVertex.isVertex()) { - continue; - } - auto meetVid = meetVertex.getVertex().vid; - auto leftPaths = createLeftPath(rowNum, meetVid); - auto rightPaths = createRightPath(rowNum, meetVid, false); - for (auto& leftPath : leftPaths) { - for (auto& rightPath : rightPaths) { - Row path = leftPath; - auto& steps = path.values.back().mutableList().values; - steps.emplace_back(meetVertex); - steps.insert(steps.end(), rightPath.values.begin(), rightPath.values.end() - 1); - path.emplace_back(rightPath.values.back()); - resultDs_[rowNum].rows.emplace_back(std::move(path)); - if (singleShortest_) { - return true; - } - } + return future.via(runner()).thenValue([this, rowNum](auto&& vertices) { + // MemoryTrackerVerified + memory::MemoryCheckGuard guard; + + if (vertices.empty()) { + return false; + } + for (auto& meetVertex : vertices) { + if (!meetVertex.isVertex()) { + continue; + } + auto meetVid = meetVertex.getVertex().vid; + auto leftPaths = createLeftPath(rowNum, meetVid); + auto rightPaths = createRightPath(rowNum, meetVid, false); + for (auto& leftPath : leftPaths) { + for (auto& rightPath : rightPaths) { + Row path = leftPath; + auto& steps = path.values.back().mutableList().values; + steps.emplace_back(meetVertex); + steps.insert(steps.end(), rightPath.values.begin(), rightPath.values.end() - 1); + path.emplace_back(rightPath.values.back()); + resultDs_[rowNum].rows.emplace_back(std::move(path)); + if (singleShortest_) { + return true; } } - return true; - }) - .thenError(folly::tag_t{}, [](const std::exception& e) { - return folly::makeFuture(std::runtime_error(e.what())); - }); + } + } + return true; + }); } std::vector SingleShortestPath::createLeftPath(size_t rowNum, const Value& meetVid) { diff --git a/src/graph/executor/algo/SubgraphExecutor.cpp b/src/graph/executor/algo/SubgraphExecutor.cpp index e51f4853c0c..cb3e9ca7634 100644 --- a/src/graph/executor/algo/SubgraphExecutor.cpp +++ b/src/graph/executor/algo/SubgraphExecutor.cpp @@ -53,6 +53,7 @@ folly::Future SubgraphExecutor::getNeighbors() { currentStep_ == 1 ? nullptr : subgraph_->tagFilter()) .via(runner()) .thenValue([this, getNbrTime](RpcResponse&& resp) mutable { + // MemoryTrackerVerified memory::MemoryCheckGuard guard; otherStats_.emplace("total_rpc_time", folly::sformat("{}(us)", getNbrTime.elapsedInUSec())); auto& hostLatency = resp.hostLatency(); @@ -67,12 +68,6 @@ folly::Future SubgraphExecutor::getNeighbors() { } vids_.clear(); return handleResponse(std::move(resp)); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc&) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception& e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } diff --git a/src/graph/executor/logic/ArgumentExecutor.cpp b/src/graph/executor/logic/ArgumentExecutor.cpp index eab46383f17..1e74f3ba165 100644 --- a/src/graph/executor/logic/ArgumentExecutor.cpp +++ b/src/graph/executor/logic/ArgumentExecutor.cpp @@ -13,6 +13,7 @@ ArgumentExecutor::ArgumentExecutor(const PlanNode *node, QueryContext *qctx) : Executor("ArgumentExecutor", node, qctx) {} folly::Future ArgumentExecutor::execute() { + // MemoryTrackerVerified auto *argNode = asNode(node()); auto &alias = argNode->getAlias(); auto iter = ectx_->getResult(argNode->inputVar()).iter(); diff --git a/src/graph/executor/maintain/EdgeExecutor.cpp b/src/graph/executor/maintain/EdgeExecutor.cpp index e198624af6a..1c8a3151f44 100644 --- a/src/graph/executor/maintain/EdgeExecutor.cpp +++ b/src/graph/executor/maintain/EdgeExecutor.cpp @@ -21,18 +21,13 @@ folly::Future CreateEdgeExecutor::execute() { .via(runner()) .thenValue([ceNode, spaceId](StatusOr resp) { memory::MemoryCheckGuard guard; + // throw in MemoryCheckGuard verified if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId << ", Create edge `" << ceNode->getName() << "' failed: " << resp.status(); return resp.status(); } return Status::OK(); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -47,6 +42,7 @@ folly::Future DescEdgeExecutor::execute() { .via(runner()) .thenValue([this, deNode, spaceId](StatusOr resp) { memory::MemoryCheckGuard guard; + // MemoryTrackerVerified if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId << ", Desc edge `" << deNode->getName() << "' failed: " << resp.status(); @@ -62,12 +58,6 @@ folly::Future DescEdgeExecutor::execute() { .value(Value(std::move(ret).value())) .iter(Iterator::Kind::kDefault) .build()); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -82,18 +72,13 @@ folly::Future DropEdgeExecutor::execute() { .via(runner()) .thenValue([deNode, spaceId](StatusOr resp) { memory::MemoryCheckGuard guard; + // MemoryTrackerVerified if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId << ", Drop edge `" << deNode->getName() << "' failed: " << resp.status(); return resp.status(); } return Status::OK(); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -128,12 +113,6 @@ folly::Future ShowEdgesExecutor::execute() { .value(Value(std::move(dataSet))) .iter(Iterator::Kind::kDefault) .build()); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -161,12 +140,6 @@ folly::Future ShowCreateEdgeExecutor::execute() { } return finish( ResultBuilder().value(std::move(ret).value()).iter(Iterator::Kind::kDefault).build()); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -187,12 +160,6 @@ folly::Future AlterEdgeExecutor::execute() { return resp.status(); } return finish(ResultBuilder().value(Value()).iter(Iterator::Kind::kDefault).build()); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } } // namespace graph diff --git a/src/graph/executor/maintain/EdgeIndexExecutor.cpp b/src/graph/executor/maintain/EdgeIndexExecutor.cpp index 38d6a9f3087..00f1f2e9d20 100644 --- a/src/graph/executor/maintain/EdgeIndexExecutor.cpp +++ b/src/graph/executor/maintain/EdgeIndexExecutor.cpp @@ -27,6 +27,7 @@ folly::Future CreateEdgeIndexExecutor::execute() { .via(runner()) .thenValue([ceiNode, spaceId](StatusOr resp) { memory::MemoryCheckGuard guard; + // MemoryTrackerVerified if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId << ", Create index `" << ceiNode->getIndexName() << "' at edge: `" << ceiNode->getSchemaName() @@ -34,12 +35,6 @@ folly::Future CreateEdgeIndexExecutor::execute() { return resp.status(); } return Status::OK(); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -60,12 +55,6 @@ folly::Future DropEdgeIndexExecutor::execute() { return resp.status(); } return Status::OK(); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -94,12 +83,6 @@ folly::Future DescEdgeIndexExecutor::execute() { } return finish( ResultBuilder().value(std::move(ret).value()).iter(Iterator::Kind::kDefault).build()); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -127,12 +110,6 @@ folly::Future ShowCreateEdgeIndexExecutor::execute() { } return finish( ResultBuilder().value(std::move(ret).value()).iter(Iterator::Kind::kDefault).build()); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -190,12 +167,6 @@ folly::Future ShowEdgeIndexesExecutor::execute() { .value(Value(std::move(dataSet))) .iter(Iterator::Kind::kDefault) .build()); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -229,12 +200,6 @@ folly::Future ShowEdgeIndexStatusExecutor::execute() { .value(Value(std::move(dataSet))) .iter(Iterator::Kind::kDefault) .build()); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } diff --git a/src/graph/executor/maintain/FTIndexExecutor.cpp b/src/graph/executor/maintain/FTIndexExecutor.cpp index 06d08f91d07..2d174fcc025 100644 --- a/src/graph/executor/maintain/FTIndexExecutor.cpp +++ b/src/graph/executor/maintain/FTIndexExecutor.cpp @@ -26,12 +26,6 @@ folly::Future CreateFTIndexExecutor::execute() { return resp.status(); } return Status::OK(); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -59,67 +53,51 @@ folly::Future DropFTIndexExecutor::execute() { LOG(WARNING) << "Drop fulltext index '" << inode->getName() << "' failed: " << ftRet; } return Status::OK(); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } folly::Future ShowFTIndexesExecutor::execute() { SCOPED_TIMER(&execTime_); auto spaceId = qctx()->rctx()->session()->space().id; - return qctx() - ->getMetaClient() - ->listFTIndexes() - .via(runner()) - .thenValue( - [this, spaceId](StatusOr> resp) { - memory::MemoryCheckGuard guard; - if (!resp.ok()) { - LOG(WARNING) << "SpaceId: " << spaceId << ", Show fulltext indexes failed" - << resp.status(); - return resp.status(); - } + return qctx()->getMetaClient()->listFTIndexes().via(runner()).thenValue( + [this, spaceId](StatusOr> resp) { + memory::MemoryCheckGuard guard; + if (!resp.ok()) { + LOG(WARNING) << "SpaceId: " << spaceId << ", Show fulltext indexes failed" + << resp.status(); + return resp.status(); + } - auto indexes = std::move(resp).value(); - DataSet dataSet; - dataSet.colNames = {"Name", "Schema Type", "Schema Name", "Fields"}; - for (auto &index : indexes) { - if (index.second.get_space_id() != spaceId) { - continue; - } - auto shmId = index.second.get_depend_schema(); - auto isEdge = shmId.getType() == nebula::cpp2::SchemaID::Type::edge_type; - auto shmNameRet = - isEdge ? this->qctx_->schemaMng()->toEdgeName(spaceId, shmId.get_edge_type()) - : this->qctx_->schemaMng()->toTagName(spaceId, shmId.get_tag_id()); - if (!shmNameRet.ok()) { - LOG(WARNING) << "SpaceId: " << spaceId - << ", Get schema name failed: " << shmNameRet.status(); - return shmNameRet.status(); - } - std::string fields; - folly::join(", ", index.second.get_fields(), fields); - Row row; - row.values.emplace_back(index.first); - row.values.emplace_back(isEdge ? "Edge" : "Tag"); - row.values.emplace_back(std::move(shmNameRet).value()); - row.values.emplace_back(std::move(fields)); - dataSet.rows.emplace_back(std::move(row)); - } - return finish(ResultBuilder() - .value(Value(std::move(dataSet))) - .iter(Iterator::Kind::kDefault) - .build()); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); + auto indexes = std::move(resp).value(); + DataSet dataSet; + dataSet.colNames = {"Name", "Schema Type", "Schema Name", "Fields"}; + for (auto &index : indexes) { + if (index.second.get_space_id() != spaceId) { + continue; + } + auto shmId = index.second.get_depend_schema(); + auto isEdge = shmId.getType() == nebula::cpp2::SchemaID::Type::edge_type; + auto shmNameRet = + isEdge ? this->qctx_->schemaMng()->toEdgeName(spaceId, shmId.get_edge_type()) + : this->qctx_->schemaMng()->toTagName(spaceId, shmId.get_tag_id()); + if (!shmNameRet.ok()) { + LOG(WARNING) << "SpaceId: " << spaceId + << ", Get schema name failed: " << shmNameRet.status(); + return shmNameRet.status(); + } + std::string fields; + folly::join(", ", index.second.get_fields(), fields); + Row row; + row.values.emplace_back(index.first); + row.values.emplace_back(isEdge ? "Edge" : "Tag"); + row.values.emplace_back(std::move(shmNameRet).value()); + row.values.emplace_back(std::move(fields)); + dataSet.rows.emplace_back(std::move(row)); + } + return finish(ResultBuilder() + .value(Value(std::move(dataSet))) + .iter(Iterator::Kind::kDefault) + .build()); }); } diff --git a/src/graph/executor/maintain/TagExecutor.cpp b/src/graph/executor/maintain/TagExecutor.cpp index 5beb4c0b1b6..5dd58b2e6fa 100644 --- a/src/graph/executor/maintain/TagExecutor.cpp +++ b/src/graph/executor/maintain/TagExecutor.cpp @@ -27,12 +27,6 @@ folly::Future CreateTagExecutor::execute() { return resp.status(); } return Status::OK(); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -60,12 +54,6 @@ folly::Future DescTagExecutor::execute() { } return finish( ResultBuilder().value(std::move(ret).value()).iter(Iterator::Kind::kDefault).build()); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -86,12 +74,6 @@ folly::Future DropTagExecutor::execute() { return resp.status(); } return Status::OK(); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -99,11 +81,8 @@ folly::Future ShowTagsExecutor::execute() { SCOPED_TIMER(&execTime_); auto spaceId = qctx()->rctx()->session()->space().id; - return qctx() - ->getMetaClient() - ->listTagSchemas(spaceId) - .via(runner()) - .thenValue([this, spaceId](StatusOr> resp) { + return qctx()->getMetaClient()->listTagSchemas(spaceId).via(runner()).thenValue( + [this, spaceId](StatusOr> resp) { memory::MemoryCheckGuard guard; if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId << ", Show tags failed: " << resp.status(); @@ -126,12 +105,6 @@ folly::Future ShowTagsExecutor::execute() { .value(Value(std::move(dataSet))) .iter(Iterator::Kind::kDefault) .build()); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -159,12 +132,6 @@ folly::Future ShowCreateTagExecutor::execute() { } return finish( ResultBuilder().value(std::move(ret).value()).iter(Iterator::Kind::kDefault).build()); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -185,12 +152,6 @@ folly::Future AlterTagExecutor::execute() { return resp.status(); } return Status::OK(); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } } // namespace graph diff --git a/src/graph/executor/maintain/TagIndexExecutor.cpp b/src/graph/executor/maintain/TagIndexExecutor.cpp index ad14dcd3788..41909588cb3 100644 --- a/src/graph/executor/maintain/TagIndexExecutor.cpp +++ b/src/graph/executor/maintain/TagIndexExecutor.cpp @@ -34,12 +34,6 @@ folly::Future CreateTagIndexExecutor::execute() { return resp.status(); } return Status::OK(); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -60,12 +54,6 @@ folly::Future DropTagIndexExecutor::execute() { return resp.status(); } return Status::OK(); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -94,12 +82,6 @@ folly::Future DescTagIndexExecutor::execute() { } return finish( ResultBuilder().value(std::move(ret).value()).iter(Iterator::Kind::kDefault).build()); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -127,12 +109,6 @@ folly::Future ShowCreateTagIndexExecutor::execute() { } return finish( ResultBuilder().value(std::move(ret).value()).iter(Iterator::Kind::kDefault).build()); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -141,11 +117,8 @@ folly::Future ShowTagIndexesExecutor::execute() { auto *iNode = asNode(node()); const auto &bySchema = iNode->name(); auto spaceId = qctx()->rctx()->session()->space().id; - return qctx() - ->getMetaClient() - ->listTagIndexes(spaceId) - .via(runner()) - .thenValue([this, spaceId, bySchema](StatusOr> resp) { + return qctx()->getMetaClient()->listTagIndexes(spaceId).via(runner()).thenValue( + [this, spaceId, bySchema](StatusOr> resp) { memory::MemoryCheckGuard guard; if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId << ", Show tag indexes failed" << resp.status(); @@ -191,12 +164,6 @@ folly::Future ShowTagIndexesExecutor::execute() { .value(Value(std::move(dataSet))) .iter(Iterator::Kind::kDefault) .build()); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -204,11 +171,8 @@ folly::Future ShowTagIndexStatusExecutor::execute() { SCOPED_TIMER(&execTime_); auto spaceId = qctx()->rctx()->session()->space().id; - return qctx() - ->getMetaClient() - ->listTagIndexStatus(spaceId) - .via(runner()) - .thenValue([this, spaceId](StatusOr> resp) { + return qctx()->getMetaClient()->listTagIndexStatus(spaceId).via(runner()).thenValue( + [this, spaceId](StatusOr> resp) { memory::MemoryCheckGuard guard; if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId @@ -230,12 +194,6 @@ folly::Future ShowTagIndexStatusExecutor::execute() { .value(Value(std::move(dataSet))) .iter(Iterator::Kind::kDefault) .build()); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } diff --git a/src/graph/executor/mutate/DeleteExecutor.cpp b/src/graph/executor/mutate/DeleteExecutor.cpp index 71fee56ec86..15ef49a36b8 100644 --- a/src/graph/executor/mutate/DeleteExecutor.cpp +++ b/src/graph/executor/mutate/DeleteExecutor.cpp @@ -73,12 +73,6 @@ folly::Future DeleteVerticesExecutor::deleteVertices() { SCOPED_TIMER(&execTime_); NG_RETURN_IF_ERROR(handleCompleteness(resp, false)); return Status::OK(); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc&) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception& e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -135,12 +129,6 @@ folly::Future DeleteTagsExecutor::deleteTags() { SCOPED_TIMER(&execTime_); NG_RETURN_IF_ERROR(handleCompleteness(resp, false)); return Status::OK(); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc&) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception& e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -228,12 +216,6 @@ folly::Future DeleteEdgesExecutor::deleteEdges() { SCOPED_TIMER(&execTime_); NG_RETURN_IF_ERROR(handleCompleteness(resp, false)); return Status::OK(); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc&) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception& e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } } // namespace graph diff --git a/src/graph/executor/mutate/InsertExecutor.cpp b/src/graph/executor/mutate/InsertExecutor.cpp index f41e867b39e..7fe31435d65 100644 --- a/src/graph/executor/mutate/InsertExecutor.cpp +++ b/src/graph/executor/mutate/InsertExecutor.cpp @@ -40,12 +40,6 @@ folly::Future InsertVerticesExecutor::insertVertices() { SCOPED_TIMER(&execTime_); NG_RETURN_IF_ERROR(handleCompleteness(resp, false)); return Status::OK(); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -77,12 +71,6 @@ folly::Future InsertEdgesExecutor::insertEdges() { SCOPED_TIMER(&execTime_); NG_RETURN_IF_ERROR(handleCompleteness(resp, false)); return Status::OK(); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } } // namespace graph diff --git a/src/graph/executor/mutate/UpdateExecutor.cpp b/src/graph/executor/mutate/UpdateExecutor.cpp index 89d6519cefe..3b820ac471a 100644 --- a/src/graph/executor/mutate/UpdateExecutor.cpp +++ b/src/graph/executor/mutate/UpdateExecutor.cpp @@ -82,12 +82,6 @@ folly::Future UpdateVertexExecutor::execute() { .build()); } return Status::OK(); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -140,12 +134,6 @@ folly::Future UpdateEdgeExecutor::execute() { .build()); } return Status::OK(); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } } // namespace graph diff --git a/src/graph/executor/query/AggregateExecutor.cpp b/src/graph/executor/query/AggregateExecutor.cpp index 5c9fb046603..d9f8b5b5e30 100644 --- a/src/graph/executor/query/AggregateExecutor.cpp +++ b/src/graph/executor/query/AggregateExecutor.cpp @@ -11,6 +11,7 @@ namespace graph { folly::Future AggregateExecutor::execute() { SCOPED_TIMER(&execTime_); + // MemoryTrackerVerified auto* agg = asNode(node()); auto groupKeys = agg->groupKeys(); auto groupItems = agg->groupItems(); diff --git a/src/graph/executor/query/AppendVerticesExecutor.cpp b/src/graph/executor/query/AppendVerticesExecutor.cpp index 02bcd7bf81c..ae420b28ad4 100644 --- a/src/graph/executor/query/AppendVerticesExecutor.cpp +++ b/src/graph/executor/query/AppendVerticesExecutor.cpp @@ -61,6 +61,7 @@ folly::Future AppendVerticesExecutor::appendVertices() { otherStats_.emplace("total_rpc", folly::sformat("{}(us)", getPropsTime.elapsedInUSec())); }) .thenValue([this](StorageRpcResponse &&rpcResp) { + // MemoryTrackerVerified memory::MemoryCheckGuard guard; SCOPED_TIMER(&execTime_); addStats(rpcResp, otherStats_); @@ -69,12 +70,6 @@ folly::Future AppendVerticesExecutor::appendVertices() { } else { return handleRespMultiJobs(std::move(rpcResp)); } - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } diff --git a/src/graph/executor/query/DedupExecutor.cpp b/src/graph/executor/query/DedupExecutor.cpp index 8e106f3537c..f569545fbe4 100644 --- a/src/graph/executor/query/DedupExecutor.cpp +++ b/src/graph/executor/query/DedupExecutor.cpp @@ -10,7 +10,9 @@ namespace nebula { namespace graph { folly::Future DedupExecutor::execute() { + // MemoryTrackerVerified SCOPED_TIMER(&execTime_); + auto* dedup = asNode(node()); DCHECK(!dedup->inputVar().empty()); Result result = ectx_->getResult(dedup->inputVar()); diff --git a/src/graph/executor/query/FilterExecutor.cpp b/src/graph/executor/query/FilterExecutor.cpp index 4f2e7334326..1b7a5f39fad 100644 --- a/src/graph/executor/query/FilterExecutor.cpp +++ b/src/graph/executor/query/FilterExecutor.cpp @@ -29,11 +29,14 @@ folly::Future FilterExecutor::execute() { ds.rows.reserve(iter->size()); auto scatter = [this]( size_t begin, size_t end, Iterator *tmpIter) mutable -> StatusOr { + // MemoryTrackerVerified + DCHECK(memory::MemoryTracker::isOn()) << "MemoryTracker is off"; return handleJob(begin, end, tmpIter); }; auto gather = [this, result = std::move(ds), kind = iter->kind()](auto &&results) mutable -> Status { + // MemoryTrackerVerified memory::MemoryCheckGuard guard; for (auto &r : results) { if (!r.ok()) { diff --git a/src/graph/executor/query/GetDstBySrcExecutor.cpp b/src/graph/executor/query/GetDstBySrcExecutor.cpp index e694d43971f..5635d8106ef 100644 --- a/src/graph/executor/query/GetDstBySrcExecutor.cpp +++ b/src/graph/executor/query/GetDstBySrcExecutor.cpp @@ -61,12 +61,6 @@ folly::Future GetDstBySrcExecutor::execute() { otherStats_.emplace(folly::sformat("resp[{}]", i), folly::toPrettyJson(info)); } return handleResponse(resp, this->gd_->colNames()); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc&) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception& e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } diff --git a/src/graph/executor/query/GetEdgesExecutor.cpp b/src/graph/executor/query/GetEdgesExecutor.cpp index 98306c39d51..eade60d9e2b 100644 --- a/src/graph/executor/query/GetEdgesExecutor.cpp +++ b/src/graph/executor/query/GetEdgesExecutor.cpp @@ -105,12 +105,6 @@ folly::Future GetEdgesExecutor::getEdges() { SCOPED_TIMER(&execTime_); addStats(rpcResp, otherStats_); return handleResp(std::move(rpcResp), ge->colNames()); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } diff --git a/src/graph/executor/query/GetNeighborsExecutor.cpp b/src/graph/executor/query/GetNeighborsExecutor.cpp index 0b69967c41e..ae50cc283e8 100644 --- a/src/graph/executor/query/GetNeighborsExecutor.cpp +++ b/src/graph/executor/query/GetNeighborsExecutor.cpp @@ -75,12 +75,6 @@ folly::Future GetNeighborsExecutor::execute() { otherStats_.emplace(folly::sformat("resp[{}]", i), folly::toPrettyJson(info)); } return handleResponse(resp); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc&) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception& e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } diff --git a/src/graph/executor/query/GetVerticesExecutor.cpp b/src/graph/executor/query/GetVerticesExecutor.cpp index 3906d3c564e..0d250044d9a 100644 --- a/src/graph/executor/query/GetVerticesExecutor.cpp +++ b/src/graph/executor/query/GetVerticesExecutor.cpp @@ -55,12 +55,6 @@ folly::Future GetVerticesExecutor::getVertices() { SCOPED_TIMER(&execTime_); addStats(rpcResp, otherStats_); return handleResp(std::move(rpcResp), gv->colNames()); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } diff --git a/src/graph/executor/query/IndexScanExecutor.cpp b/src/graph/executor/query/IndexScanExecutor.cpp index e2bd6a65d9f..8bbfa8addd5 100644 --- a/src/graph/executor/query/IndexScanExecutor.cpp +++ b/src/graph/executor/query/IndexScanExecutor.cpp @@ -42,15 +42,10 @@ folly::Future IndexScanExecutor::indexScan() { lookup->limit(qctx_)) .via(runner()) .thenValue([this](StorageRpcResponse &&rpcResp) { + // MemoryTrackerVerified memory::MemoryCheckGuard guard; addStats(rpcResp, otherStats_); return handleResp(std::move(rpcResp)); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } diff --git a/src/graph/executor/query/ProjectExecutor.cpp b/src/graph/executor/query/ProjectExecutor.cpp index 3f54a1daa6b..1fd85a08e64 100644 --- a/src/graph/executor/query/ProjectExecutor.cpp +++ b/src/graph/executor/query/ProjectExecutor.cpp @@ -11,6 +11,7 @@ namespace nebula { namespace graph { folly::Future ProjectExecutor::execute() { + // throw std::bad_alloc in MemoryCheckGuard verified SCOPED_TIMER(&execTime_); auto *project = asNode(node()); auto iter = ectx_->getResult(project->inputVar()).iter(); diff --git a/src/graph/executor/query/ScanEdgesExecutor.cpp b/src/graph/executor/query/ScanEdgesExecutor.cpp index 4262ead8b72..769567620b2 100644 --- a/src/graph/executor/query/ScanEdgesExecutor.cpp +++ b/src/graph/executor/query/ScanEdgesExecutor.cpp @@ -45,12 +45,6 @@ folly::Future ScanEdgesExecutor::scanEdges() { SCOPED_TIMER(&execTime_); addStats(rpcResp, otherStats_); return handleResp(std::move(rpcResp), {}); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } diff --git a/src/graph/executor/query/ScanVerticesExecutor.cpp b/src/graph/executor/query/ScanVerticesExecutor.cpp index 9cd80e7011f..ba15ced7412 100644 --- a/src/graph/executor/query/ScanVerticesExecutor.cpp +++ b/src/graph/executor/query/ScanVerticesExecutor.cpp @@ -46,12 +46,6 @@ folly::Future ScanVerticesExecutor::scanVertices() { SCOPED_TIMER(&execTime_); addStats(rpcResp, otherStats_); return handleResp(std::move(rpcResp), sv->colNames()); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception &e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } diff --git a/src/graph/executor/query/TraverseExecutor.cpp b/src/graph/executor/query/TraverseExecutor.cpp index 298c4eb7463..850820bf1fa 100644 --- a/src/graph/executor/query/TraverseExecutor.cpp +++ b/src/graph/executor/query/TraverseExecutor.cpp @@ -5,6 +5,7 @@ #include "graph/executor/query/TraverseExecutor.h" #include "clients/storage/StorageClient.h" +#include "common/memory/MemoryTracker.h" #include "graph/service/GraphFlags.h" #include "graph/util/SchemaUtil.h" #include "graph/util/Utils.h" @@ -100,17 +101,12 @@ folly::Future TraverseExecutor::getNeighbors() { currentStep_ == 1 ? traverse_->tagFilter() : nullptr) .via(runner()) .thenValue([this, getNbrTime](StorageRpcResponse&& resp) mutable { + // MemoryTrackerVerified memory::MemoryCheckGuard guard; vids_.clear(); SCOPED_TIMER(&execTime_); addStats(resp, getNbrTime.elapsedInUSec()); return handleResponse(std::move(resp)); - }) - .thenError( - folly::tag_t{}, - [](const std::bad_alloc&) { return folly::makeFuture(memoryExceededStatus()); }) - .thenError(folly::tag_t{}, [](const std::exception& e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -302,6 +298,9 @@ folly::Future TraverseExecutor::buildPathMultiJobs(size_t minStep, size_ auto scatter = [this, minStep, maxStep]( size_t begin, size_t end, Iterator* tmpIter) mutable -> std::vector { + // outside caller should already turn on throwOnMemoryExceeded + DCHECK(memory::MemoryTracker::isOn()) << "MemoryTracker is off"; + // MemoryTrackerVerified std::vector rows; for (; tmpIter->valid() && begin++ < end; tmpIter->next()) { auto& initVertex = tmpIter->getColumn(0); @@ -316,6 +315,7 @@ folly::Future TraverseExecutor::buildPathMultiJobs(size_t minStep, size_ }; auto gather = [this](std::vector> resp) mutable -> Status { + // MemoryTrackerVerified memory::MemoryCheckGuard guard; for (auto& rows : resp) { if (rows.empty()) { diff --git a/src/graph/scheduler/AsyncMsgNotifyBasedScheduler.cpp b/src/graph/scheduler/AsyncMsgNotifyBasedScheduler.cpp index 8e4f17f171e..d2d121378bf 100644 --- a/src/graph/scheduler/AsyncMsgNotifyBasedScheduler.cpp +++ b/src/graph/scheduler/AsyncMsgNotifyBasedScheduler.cpp @@ -16,15 +16,15 @@ AsyncMsgNotifyBasedScheduler::AsyncMsgNotifyBasedScheduler(QueryContext* qctx) : } folly::Future AsyncMsgNotifyBasedScheduler::schedule() { - auto root = qctx_->plan()->root(); - if (FLAGS_enable_lifetime_optimize) { - // special for root - root->outputVarPtr()->userCount.store(std::numeric_limits::max(), - std::memory_order_relaxed); - analyzeLifetime(root); - } - auto executor = Executor::create(root, qctx_); - return doSchedule(executor); + auto root = qctx_->plan()->root(); + if (FLAGS_enable_lifetime_optimize) { + // special for root + root->outputVarPtr()->userCount.store(std::numeric_limits::max(), + std::memory_order_relaxed); + analyzeLifetime(root); + } + auto executor = Executor::create(root, qctx_); + return doSchedule(executor); } folly::Future AsyncMsgNotifyBasedScheduler::doSchedule(Executor* root) const { @@ -82,6 +82,14 @@ folly::Future AsyncMsgNotifyBasedScheduler::doSchedule(Executor* root) c auto currentExePromises = std::move(currentPromisesFound->second); scheduleExecutor(std::move(currentExeFutures), exe, runner) + // This is the root catch of bad_alloc for Executors, + // all chained returned future is checked here + .thenError( + folly::tag_t{}, + [](const std::bad_alloc&) { + return folly::makeFuture(Status::GraphMemoryExceeded( + "(%d)", static_cast(nebula::cpp2::ErrorCode::E_GRAPH_MEMORY_EXCEEDED))); + }) .thenTry([this, pros = std::move(currentExePromises)](auto&& t) mutable { // any exception or status not ok handled with notifyError if (t.hasException()) { @@ -135,6 +143,7 @@ folly::Future AsyncMsgNotifyBasedScheduler::runSelect( return execute(select); }) .thenValue([select, this](auto&& selectStatus) mutable -> folly::Future { + memory::MemoryCheckGuard guard; NG_RETURN_IF_ERROR(selectStatus); auto val = qctx_->ectx()->getValue(select->node()->outputVar()); if (!val.isBool()) { @@ -147,45 +156,22 @@ folly::Future AsyncMsgNotifyBasedScheduler::runSelect( return doSchedule(select->thenBody()); } return doSchedule(select->elseBody()); - }) - .thenError(folly::tag_t{}, - [](const std::bad_alloc&) { - return folly::makeFuture(Executor::memoryExceededStatus()); - }) - .thenError(folly::tag_t{}, [](const std::exception& e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } folly::Future AsyncMsgNotifyBasedScheduler::runExecutor( std::vector>&& futures, Executor* exe, folly::Executor* runner) const { - return folly::collect(futures) - .via(runner) - .thenValue([exe, this](auto&& t) mutable -> folly::Future { + return folly::collect(futures).via(runner).thenValue( + [exe, this](auto&& t) mutable -> folly::Future { NG_RETURN_IF_ERROR(checkStatus(std::move(t))); // Execute in current thread. return execute(exe); - }) - .thenError(folly::tag_t{}, - [](const std::bad_alloc&) { - return folly::makeFuture(Executor::memoryExceededStatus()); - }) - .thenError(folly::tag_t{}, [](const std::exception& e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } folly::Future AsyncMsgNotifyBasedScheduler::runLeafExecutor(Executor* exe, folly::Executor* runner) const { - return std::move(execute(exe)) - .via(runner) - .thenError(folly::tag_t{}, - [](const std::bad_alloc&) { - return folly::makeFuture(Executor::memoryExceededStatus()); - }) - .thenError(folly::tag_t{}, [](const std::exception& e) { - return folly::makeFuture(std::runtime_error(e.what())); - }); + return std::move(execute(exe)).via(runner); } folly::Future AsyncMsgNotifyBasedScheduler::runLoop( @@ -199,6 +185,7 @@ folly::Future AsyncMsgNotifyBasedScheduler::runLoop( return execute(loop); }) .thenValue([loop, runner, this](auto&& loopStatus) mutable -> folly::Future { + memory::MemoryCheckGuard guard; NG_RETURN_IF_ERROR(loopStatus); auto val = qctx_->ectx()->getValue(loop->node()->outputVar()); if (!val.isBool()) { @@ -212,13 +199,6 @@ folly::Future AsyncMsgNotifyBasedScheduler::runLoop( std::vector> fs; fs.emplace_back(doSchedule(loop->loopBody())); return runLoop(std::move(fs), loop, runner); - }) - .thenError(folly::tag_t{}, - [](const std::bad_alloc&) { - return folly::makeFuture(Executor::memoryExceededStatus()); - }) - .thenError(folly::tag_t{}, [](const std::exception& e) { - return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -245,22 +225,16 @@ void AsyncMsgNotifyBasedScheduler::notifyError(std::vector AsyncMsgNotifyBasedScheduler::execute(Executor* executor) const { + memory::MemoryCheckGuard guard1; auto status = executor->open(); if (!status.ok()) { return executor->error(std::move(status)); } - return executor->execute() - .thenValue([executor](Status s) { - NG_RETURN_IF_ERROR(s); - return executor->close(); - }) - .thenError(folly::tag_t{}, - [](const std::bad_alloc&) { - return folly::makeFuture(Executor::memoryExceededStatus()); - }) - .thenError(folly::tag_t{}, [](const std::exception& e) { - return folly::makeFuture(std::runtime_error(e.what())); - }); + return executor->execute().thenValue([executor](Status s) { + memory::MemoryCheckGuard guard2; + NG_RETURN_IF_ERROR(s); + return executor->close(); + }); } } // namespace graph diff --git a/src/graph/service/QueryInstance.cpp b/src/graph/service/QueryInstance.cpp index 63bd78b7714..9e4d9f6406c 100644 --- a/src/graph/service/QueryInstance.cpp +++ b/src/graph/service/QueryInstance.cpp @@ -67,7 +67,8 @@ void QueryInstance::execute() { .thenError(folly::tag_t{}, [this](const std::exception &e) { onError(Status::Error("%s", e.what())); }); } catch (std::bad_alloc &e) { - onError(Executor::memoryExceededStatus()); + onError(Status::GraphMemoryExceeded( + "(%d)", static_cast(nebula::cpp2::ErrorCode::E_GRAPH_MEMORY_EXCEEDED))); } catch (std::exception &e) { onError(Status::Error("%s", e.what())); } catch (...) { @@ -191,6 +192,8 @@ void QueryInstance::onError(Status status) { case Status::Code::kUserNotFound: case Status::Code::kListenerNotFound: case Status::Code::kSessionNotFound: + case Status::Code::kGraphMemoryExceeded: + case Status::Code::kStorageMemoryExceeded: rctx->resp().errorCode = ErrorCode::E_EXECUTION_ERROR; break; } diff --git a/src/kvstore/listener/elasticsearch/ESListener.cpp b/src/kvstore/listener/elasticsearch/ESListener.cpp index cdc4fe5205d..66927c8c5da 100644 --- a/src/kvstore/listener/elasticsearch/ESListener.cpp +++ b/src/kvstore/listener/elasticsearch/ESListener.cpp @@ -95,6 +95,9 @@ void ESListener::pickTagAndEdgeData(BatchLogType type, } auto field = index.second.get_fields().front(); auto v = reader->getValueByName(field); + if (v.type() == Value::Type::NULLVALUE) { + continue; + } if (v.type() != Value::Type::STRING) { LOG(ERROR) << "Can't create fulltext index on type " << v.type(); } @@ -125,6 +128,9 @@ void ESListener::pickTagAndEdgeData(BatchLogType type, } auto field = index.second.get_fields().front(); auto v = reader->getValueByName(field); + if (v.type() == Value::Type::NULLVALUE) { + continue; + } if (v.type() != Value::Type::STRING) { LOG(ERROR) << "Can't create fulltext index on type " << v.type(); } diff --git a/src/kvstore/listener/test/CMakeLists.txt b/src/kvstore/listener/test/CMakeLists.txt index 1041cfd17e9..09ce4aba2ae 100644 --- a/src/kvstore/listener/test/CMakeLists.txt +++ b/src/kvstore/listener/test/CMakeLists.txt @@ -23,6 +23,7 @@ set(LISTENER_TEST_LIBS $ $ $ + $ $ $ $ diff --git a/src/kvstore/raftex/test/CMakeLists.txt b/src/kvstore/raftex/test/CMakeLists.txt index fc47919be15..4d0d61e8d36 100644 --- a/src/kvstore/raftex/test/CMakeLists.txt +++ b/src/kvstore/raftex/test/CMakeLists.txt @@ -14,6 +14,7 @@ set(RAFTEX_TEST_LIBS $ $ $ + $ $ $ $ diff --git a/src/kvstore/test/CMakeLists.txt b/src/kvstore/test/CMakeLists.txt index 5db2e891e36..129995acbfb 100644 --- a/src/kvstore/test/CMakeLists.txt +++ b/src/kvstore/test/CMakeLists.txt @@ -23,6 +23,7 @@ set(KVSTORE_TEST_LIBS $ $ $ + $ $ $ $ diff --git a/src/kvstore/wal/test/CMakeLists.txt b/src/kvstore/wal/test/CMakeLists.txt index bb4c77976d6..263afd373c3 100644 --- a/src/kvstore/wal/test/CMakeLists.txt +++ b/src/kvstore/wal/test/CMakeLists.txt @@ -10,6 +10,7 @@ set(WAL_TEST_LIBS $ $ $ + $ $ $ $ diff --git a/src/meta/processors/job/StorageJobExecutor.cpp b/src/meta/processors/job/StorageJobExecutor.cpp index 122f3e8b9c1..1934c7aedec 100644 --- a/src/meta/processors/job/StorageJobExecutor.cpp +++ b/src/meta/processors/job/StorageJobExecutor.cpp @@ -162,6 +162,7 @@ folly::Future StorageJobExecutor::execute() { // write all tasks first. std::vector data; + std::vector allTasks; for (auto i = 0U; i != addresses.size(); ++i) { TaskDescription task(space_, jobId_, i, addresses[i].first); auto taskKey = MetaKeyUtils::taskKey(task.getSpace(), task.getJobId(), task.getTaskId()); @@ -170,6 +171,7 @@ folly::Future StorageJobExecutor::execute() { task.getStartTime(), task.getStopTime(), task.getErrorCode()); + allTasks.emplace_back(std::move(task)); data.emplace_back(std::move(taskKey), std::move(taskVal)); } @@ -187,24 +189,59 @@ folly::Future StorageJobExecutor::execute() { } std::vector> futures; + futures.reserve(addresses.size()); for (auto& address : addresses) { // Will convert StorageAddr to AdminAddr in AdminClient futures.emplace_back(executeInternal(std::move(address.first), std::move(address.second))); } auto tries = folly::collectAll(std::move(futures)).get(); - for (auto& t : tries) { - if (t.hasException()) { - LOG(INFO) << t.exception().what(); + std::vector failedTasks; + for (size_t i = 0; i < tries.size(); i++) { + auto getFaildTask = [&](size_t taskId, nebula::cpp2::ErrorCode ec) { + auto task = allTasks[taskId]; + task.setStatus(cpp2::JobStatus::FAILED); + task.setErrorCode(ec); + return task; + }; + // taks id have the same index with address and futures. + if (tries[i].hasException()) { + LOG(INFO) << tries[i].exception().what(); rc = nebula::cpp2::ErrorCode::E_RPC_FAILURE; + failedTasks.emplace_back(getFaildTask(i, rc)); continue; } - if (!t.value().ok()) { - LOG(INFO) << t.value().toString(); + if (!tries[i].value().ok()) { + LOG(INFO) << tries[i].value().toString(); rc = nebula::cpp2::ErrorCode::E_RPC_FAILURE; + failedTasks.emplace_back(getFaildTask(i, rc)); continue; } } + + if (!failedTasks.empty()) { + // write all tasks first. + std::vector faildKV; + for (auto task : failedTasks) { + auto taskKey = MetaKeyUtils::taskKey(task.getSpace(), task.getJobId(), task.getTaskId()); + auto taskVal = MetaKeyUtils::taskVal(task.getHost(), + task.getStatus(), + task.getStartTime(), + task.getStopTime(), + task.getErrorCode()); + faildKV.emplace_back(std::move(taskKey), std::move(taskVal)); + } + + kvstore_->asyncMultiPut( + kDefaultSpaceId, kDefaultPartId, std::move(faildKV), [&](nebula::cpp2::ErrorCode code) { + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(INFO) << "Some task is failed, but failed to set task status due to kv error:" + << apache::thrift::util::enumNameSafe(code); + } + baton.post(); + }); + baton.wait(); + } return rc; } diff --git a/src/parser/scanner.lex b/src/parser/scanner.lex index a6fedde4a68..cbc40b42b17 100644 --- a/src/parser/scanner.lex +++ b/src/parser/scanner.lex @@ -35,7 +35,8 @@ static constexpr size_t MAX_STRING = 4096; %x LB_STR %x COMMENT -blank_without_newline ([ \t\r\xa0]) +nbsp (\xc2\xa0) +blank_without_newline ([ \t\r]|{nbsp}) blank ({blank_without_newline}|[\n]) blanks ({blank}+) @@ -57,17 +58,19 @@ HEX ([0-9a-fA-F]) OCT ([0-7]) IP_OCTET ([0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5]) -U [\x80-\xbf] -U2 [\xc2-\xdf] +U [\x80-\x9f\xa1-\xbf] +UA0 \xa0 +U2 [\xc3-\xdf] +UC2 \xc2 U3 [\xe0-\xee] U4 [\xf0-\xf4] -CHINESE {U2}{U}|{U3}{U}{U}|{U4}{U}{U}{U} +CHINESE {U2}{UA0}|{UC2}{U}|{U2}{U}|{U3}{U}{U}|{U4}{U}{U}{U} CN_EN {CHINESE}|[a-zA-Z] CN_EN_NUM {CHINESE}|[_a-zA-Z0-9] LABEL {CN_EN}{CN_EN_NUM}* U3_FULL_WIDTH [\xe0-\xef] -CHINESE_FULL_WIDTH {U2}{U}|{U3_FULL_WIDTH}{U}{U}|{U4}{U}{U}{U} +CHINESE_FULL_WIDTH {U2}{UA0}|{UC2}{U}|{U2}{U}|{U3_FULL_WIDTH}{U}{U}|{U4}{U}{U}{U} CN_EN_FULL_WIDTH {CHINESE_FULL_WIDTH}|[a-zA-Z] CN_EN_NUM_FULL_WIDTH {CHINESE_FULL_WIDTH}|[_a-zA-Z0-9 ] LABEL_FULL_WIDTH {CN_EN_FULL_WIDTH}{CN_EN_NUM_FULL_WIDTH}* diff --git a/src/parser/test/ParserTest.cpp b/src/parser/test/ParserTest.cpp index 0d25a607c65..9bc976cc1a0 100644 --- a/src/parser/test/ParserTest.cpp +++ b/src/parser/test/ParserTest.cpp @@ -3350,4 +3350,18 @@ TEST_F(ParserTest, TestShowSentenceWithPipe) { ASSERT_TRUE(result.ok()) << result.status(); } } + +TEST_F(ParserTest, TestSpecialWhiteSpaceChar) { + { + std::string query = "SHOW\xC2\xA0SPACES"; + auto result = parse(query); + ASSERT_TRUE(result.ok()) << result.status(); + } + { + std::string query = "SHOW \xC2\xA0SPACES\xC2\xA0"; + auto result = parse(query); + ASSERT_TRUE(result.ok()) << result.status(); + } +} + } // namespace nebula diff --git a/src/parser/test/ScannerTest.cpp b/src/parser/test/ScannerTest.cpp index f1b0675166e..6e6c4a32e51 100644 --- a/src/parser/test/ScannerTest.cpp +++ b/src/parser/test/ScannerTest.cpp @@ -533,7 +533,7 @@ TEST(Scanner, Basic) { CHECK_SEMANTIC_VALUE("label", TokenType::LABEL, "label"), CHECK_SEMANTIC_VALUE("label123", TokenType::LABEL, "label123"), // \xA0 is white space in UTF-8 too - CHECK_SEMANTIC_VALUE("\xA0" + CHECK_SEMANTIC_VALUE("\xC2\xA0" "abc", TokenType::LABEL, "abc"), diff --git a/src/storage/BaseProcessor.h b/src/storage/BaseProcessor.h index 8dd2975cf7a..b94fc723f08 100644 --- a/src/storage/BaseProcessor.h +++ b/src/storage/BaseProcessor.h @@ -30,11 +30,14 @@ class BaseProcessor { virtual ~BaseProcessor() = default; + void memoryExceeded() { + memoryExceeded_ = true; + } + folly::Future getFuture() { return promise_.getFuture(); } - protected: virtual void onFinished() { if (counters_) { stats::StatsManager::addValue(counters_->numCalls_); @@ -83,6 +86,7 @@ class BaseProcessor { delete this; } + protected: nebula::cpp2::ErrorCode getSpaceVidLen(GraphSpaceID spaceId) { auto len = this->env_->schemaMan_->getSpaceVidLen(spaceId); if (!len.ok()) { @@ -160,6 +164,31 @@ class BaseProcessor { bool memoryExceeded_{false}; }; +/// Helper class wrap the passed in Func in a MemoryTracker turned on scope. +template +struct MemoryCheckScope { + MemoryCheckScope(BaseProcessor* processor, Func f) + : processor_(processor), f_(std::move(f)) {} + + ~MemoryCheckScope() { + memory::MemoryCheckGuard guard; + try { + f_(); + } catch (std::bad_alloc& e) { + processor_->memoryExceeded(); + processor_->onError(); + } catch (std::exception& e) { + LOG(ERROR) << e.what(); + processor_->onError(); + } catch (...) { + processor_->onError(); + } + } + + BaseProcessor* processor_; + Func f_; +}; + } // namespace storage } // namespace nebula diff --git a/src/storage/GraphStorageServiceHandler.cpp b/src/storage/GraphStorageServiceHandler.cpp index d9e6688f0c0..7d0d18971d5 100644 --- a/src/storage/GraphStorageServiceHandler.cpp +++ b/src/storage/GraphStorageServiceHandler.cpp @@ -26,10 +26,25 @@ #include "storage/transaction/ChainDeleteEdgesGroupProcessor.h" #include "storage/transaction/ChainUpdateEdgeLocalProcessor.h" +// Processor::process's root memory check is turn on here. +// if the call stack in current thread, +// Processors DO NOT NEED handle error in their logic. +// else (do some work in another thread) +// Processors need handle error in that thread by itself #define RETURN_FUTURE(processor) \ memory::MemoryCheckGuard guard; \ auto f = processor->getFuture(); \ - processor->process(req); \ + try { \ + processor->process(req); \ + } catch (std::bad_alloc & e) { \ + processor->memoryExceeded(); \ + processor->onError(); \ + } catch (std::exception & e) { \ + LOG(ERROR) << e.what(); \ + processor->onError(); \ + } catch (...) { \ + processor->onError(); \ + } \ return f; namespace nebula { diff --git a/src/storage/index/LookupProcessor.cpp b/src/storage/index/LookupProcessor.cpp index 12c2e239cca..975cc348794 100644 --- a/src/storage/index/LookupProcessor.cpp +++ b/src/storage/index/LookupProcessor.cpp @@ -28,20 +28,11 @@ ProcessorCounters kLookupCounters; // print Plan for debug inline void printPlan(IndexNode* node, int tab = 0); void LookupProcessor::process(const cpp2::LookupIndexRequest& req) { - try { - if (executor_ != nullptr) { - executor_->add([req, this]() { this->doProcess(req); }); - } else { - doProcess(req); - } - } catch (std::bad_alloc& e) { - memoryExceeded_ = true; - onError(); - } catch (std::exception& e) { - LOG(ERROR) << e.what(); - onError(); - } catch (...) { - onError(); + if (executor_ != nullptr) { + executor_->add( + [this, req]() { MemoryCheckScope wrapper(this, [this, req] { this->doProcess(req); }); }); + } else { + doProcess(req); } } diff --git a/src/storage/kv/GetProcessor.cpp b/src/storage/kv/GetProcessor.cpp index 22f507cec94..97a51efc1d4 100644 --- a/src/storage/kv/GetProcessor.cpp +++ b/src/storage/kv/GetProcessor.cpp @@ -13,53 +13,42 @@ namespace storage { ProcessorCounters kGetCounters; void GetProcessor::process(const cpp2::KVGetRequest& req) { - try { - CHECK_NOTNULL(env_->kvstore_); - GraphSpaceID spaceId = req.get_space_id(); - bool returnPartly = req.get_return_partly(); - - std::unordered_map pairs; - size_t size = 0; - for (auto& part : req.get_parts()) { - size += part.second.size(); - } - pairs.reserve(size); - - for (auto& part : req.get_parts()) { - auto partId = part.first; - auto& keys = part.second; - std::vector kvKeys; - kvKeys.reserve(part.second.size()); - std::transform( - keys.begin(), keys.end(), std::back_inserter(kvKeys), [partId](const auto& key) { - return NebulaKeyUtils::kvKey(partId, key); - }); - std::vector values; - auto ret = env_->kvstore_->multiGet(spaceId, partId, kvKeys, &values); - if ((ret.first == nebula::cpp2::ErrorCode::SUCCEEDED) || - (ret.first == nebula::cpp2::ErrorCode::E_PARTIAL_RESULT && returnPartly)) { - auto& status = ret.second; - for (size_t i = 0; i < kvKeys.size(); i++) { - if (status[i].ok()) { - pairs.emplace(keys[i], values[i]); - } + CHECK_NOTNULL(env_->kvstore_); + GraphSpaceID spaceId = req.get_space_id(); + bool returnPartly = req.get_return_partly(); + + std::unordered_map pairs; + size_t size = 0; + for (auto& part : req.get_parts()) { + size += part.second.size(); + } + pairs.reserve(size); + + for (auto& part : req.get_parts()) { + auto partId = part.first; + auto& keys = part.second; + std::vector kvKeys; + kvKeys.reserve(part.second.size()); + std::transform(keys.begin(), keys.end(), std::back_inserter(kvKeys), [partId](const auto& key) { + return NebulaKeyUtils::kvKey(partId, key); + }); + std::vector values; + auto ret = env_->kvstore_->multiGet(spaceId, partId, kvKeys, &values); + if ((ret.first == nebula::cpp2::ErrorCode::SUCCEEDED) || + (ret.first == nebula::cpp2::ErrorCode::E_PARTIAL_RESULT && returnPartly)) { + auto& status = ret.second; + for (size_t i = 0; i < kvKeys.size(); i++) { + if (status[i].ok()) { + pairs.emplace(keys[i], values[i]); } - } else { - handleErrorCode(ret.first, spaceId, partId); } + } else { + handleErrorCode(ret.first, spaceId, partId); } - - resp_.key_values_ref() = std::move(pairs); - this->onFinished(); - } catch (std::bad_alloc& e) { - memoryExceeded_ = true; - onError(); - } catch (std::exception& e) { - LOG(ERROR) << e.what(); - onError(); - } catch (...) { - onError(); } + + resp_.key_values_ref() = std::move(pairs); + this->onFinished(); } } // namespace storage diff --git a/src/storage/kv/PutProcessor.cpp b/src/storage/kv/PutProcessor.cpp index 4f97feada50..24ef9a9dd25 100644 --- a/src/storage/kv/PutProcessor.cpp +++ b/src/storage/kv/PutProcessor.cpp @@ -13,29 +13,19 @@ namespace storage { ProcessorCounters kPutCounters; void PutProcessor::process(const cpp2::KVPutRequest& req) { - try { - CHECK_NOTNULL(env_->kvstore_); - const auto& pairs = req.get_parts(); - auto space = req.get_space_id(); - callingNum_ = pairs.size(); + CHECK_NOTNULL(env_->kvstore_); + const auto& pairs = req.get_parts(); + auto space = req.get_space_id(); + callingNum_ = pairs.size(); - std::for_each(pairs.begin(), pairs.end(), [&](auto& value) { - auto part = value.first; - std::vector data; - for (auto& pair : value.second) { - data.emplace_back(std::move(NebulaKeyUtils::kvKey(part, pair.key)), std::move(pair.value)); - } - doPut(space, part, std::move(data)); - }); - } catch (std::bad_alloc& e) { - memoryExceeded_ = true; - onError(); - } catch (std::exception& e) { - LOG(ERROR) << e.what(); - onError(); - } catch (...) { - onError(); - } + std::for_each(pairs.begin(), pairs.end(), [&](auto& value) { + auto part = value.first; + std::vector data; + for (auto& pair : value.second) { + data.emplace_back(std::move(NebulaKeyUtils::kvKey(part, pair.key)), std::move(pair.value)); + } + doPut(space, part, std::move(data)); + }); } } // namespace storage diff --git a/src/storage/kv/RemoveProcessor.cpp b/src/storage/kv/RemoveProcessor.cpp index f074de0b123..3588e5b95e6 100644 --- a/src/storage/kv/RemoveProcessor.cpp +++ b/src/storage/kv/RemoveProcessor.cpp @@ -13,29 +13,19 @@ namespace storage { ProcessorCounters kRemoveCounters; void RemoveProcessor::process(const cpp2::KVRemoveRequest& req) { - try { - CHECK_NOTNULL(env_->kvstore_); - const auto& pairs = req.get_parts(); - auto space = req.get_space_id(); - callingNum_ = pairs.size(); + CHECK_NOTNULL(env_->kvstore_); + const auto& pairs = req.get_parts(); + auto space = req.get_space_id(); + callingNum_ = pairs.size(); - std::for_each(pairs.begin(), pairs.end(), [&](auto& value) { - auto part = value.first; - std::vector keys; - for (auto& key : value.second) { - keys.emplace_back(std::move(NebulaKeyUtils::kvKey(part, key))); - } - doRemove(space, part, std::move(keys)); - }); - } catch (std::bad_alloc& e) { - memoryExceeded_ = true; - onError(); - } catch (std::exception& e) { - LOG(ERROR) << e.what(); - onError(); - } catch (...) { - onError(); - } + std::for_each(pairs.begin(), pairs.end(), [&](auto& value) { + auto part = value.first; + std::vector keys; + for (auto& key : value.second) { + keys.emplace_back(std::move(NebulaKeyUtils::kvKey(part, key))); + } + doRemove(space, part, std::move(keys)); + }); } } // namespace storage diff --git a/src/storage/mutate/AddEdgesProcessor.cpp b/src/storage/mutate/AddEdgesProcessor.cpp index 47ca51a33cc..9bd0a3d21ab 100644 --- a/src/storage/mutate/AddEdgesProcessor.cpp +++ b/src/storage/mutate/AddEdgesProcessor.cpp @@ -21,53 +21,43 @@ namespace storage { ProcessorCounters kAddEdgesCounters; void AddEdgesProcessor::process(const cpp2::AddEdgesRequest& req) { - try { - spaceId_ = req.get_space_id(); - ifNotExists_ = req.get_if_not_exists(); - const auto& partEdges = req.get_parts(); - - CHECK_NOTNULL(env_->schemaMan_); - auto ret = env_->schemaMan_->getSpaceVidLen(spaceId_); - if (!ret.ok()) { - LOG(ERROR) << ret.status(); - for (auto& part : partEdges) { - pushResultCode(nebula::cpp2::ErrorCode::E_INVALID_SPACEVIDLEN, part.first); - } - onFinished(); - return; + spaceId_ = req.get_space_id(); + ifNotExists_ = req.get_if_not_exists(); + const auto& partEdges = req.get_parts(); + + CHECK_NOTNULL(env_->schemaMan_); + auto ret = env_->schemaMan_->getSpaceVidLen(spaceId_); + if (!ret.ok()) { + LOG(ERROR) << ret.status(); + for (auto& part : partEdges) { + pushResultCode(nebula::cpp2::ErrorCode::E_INVALID_SPACEVIDLEN, part.first); } + onFinished(); + return; + } - spaceVidLen_ = ret.value(); - callingNum_ = partEdges.size(); + spaceVidLen_ = ret.value(); + callingNum_ = partEdges.size(); - CHECK_NOTNULL(env_->indexMan_); - auto iRet = env_->indexMan_->getEdgeIndexes(spaceId_); - if (!iRet.ok()) { - LOG(ERROR) << iRet.status(); - for (auto& part : partEdges) { - pushResultCode(nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND, part.first); - } - onFinished(); - return; + CHECK_NOTNULL(env_->indexMan_); + auto iRet = env_->indexMan_->getEdgeIndexes(spaceId_); + if (!iRet.ok()) { + LOG(ERROR) << iRet.status(); + for (auto& part : partEdges) { + pushResultCode(nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND, part.first); } - indexes_ = std::move(iRet).value(); - ignoreExistedIndex_ = req.get_ignore_existed_index(); + onFinished(); + return; + } + indexes_ = std::move(iRet).value(); + ignoreExistedIndex_ = req.get_ignore_existed_index(); - CHECK_NOTNULL(env_->kvstore_); + CHECK_NOTNULL(env_->kvstore_); - if (indexes_.empty()) { - doProcess(req); - } else { - doProcessWithIndex(req); - } - } catch (std::bad_alloc& e) { - memoryExceeded_ = true; - onError(); - } catch (std::exception& e) { - LOG(ERROR) << e.what(); - onError(); - } catch (...) { - onError(); + if (indexes_.empty()) { + doProcess(req); + } else { + doProcessWithIndex(req); } } diff --git a/src/storage/mutate/AddVerticesProcessor.cpp b/src/storage/mutate/AddVerticesProcessor.cpp index 22a94e2948e..0cdd04b769a 100644 --- a/src/storage/mutate/AddVerticesProcessor.cpp +++ b/src/storage/mutate/AddVerticesProcessor.cpp @@ -22,51 +22,40 @@ namespace storage { ProcessorCounters kAddVerticesCounters; void AddVerticesProcessor::process(const cpp2::AddVerticesRequest& req) { - try { - memory::MemoryCheckGuard guard; - spaceId_ = req.get_space_id(); - const auto& partVertices = req.get_parts(); - ifNotExists_ = req.get_if_not_exists(); - CHECK_NOTNULL(env_->schemaMan_); - auto ret = env_->schemaMan_->getSpaceVidLen(spaceId_); - if (!ret.ok()) { - LOG(ERROR) << ret.status(); - for (auto& part : partVertices) { - pushResultCode(nebula::cpp2::ErrorCode::E_INVALID_SPACEVIDLEN, part.first); - } - onFinished(); - return; + spaceId_ = req.get_space_id(); + const auto& partVertices = req.get_parts(); + ifNotExists_ = req.get_if_not_exists(); + CHECK_NOTNULL(env_->schemaMan_); + auto ret = env_->schemaMan_->getSpaceVidLen(spaceId_); + if (!ret.ok()) { + LOG(ERROR) << ret.status(); + for (auto& part : partVertices) { + pushResultCode(nebula::cpp2::ErrorCode::E_INVALID_SPACEVIDLEN, part.first); } - spaceVidLen_ = ret.value(); - callingNum_ = partVertices.size(); - - CHECK_NOTNULL(env_->indexMan_); - auto iRet = env_->indexMan_->getTagIndexes(spaceId_); - if (!iRet.ok()) { - LOG(ERROR) << iRet.status(); - for (auto& part : partVertices) { - pushResultCode(nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND, part.first); - } - onFinished(); - return; + onFinished(); + return; + } + spaceVidLen_ = ret.value(); + callingNum_ = partVertices.size(); + + CHECK_NOTNULL(env_->indexMan_); + auto iRet = env_->indexMan_->getTagIndexes(spaceId_); + if (!iRet.ok()) { + LOG(ERROR) << iRet.status(); + for (auto& part : partVertices) { + pushResultCode(nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND, part.first); } - indexes_ = std::move(iRet).value(); - ignoreExistedIndex_ = req.get_ignore_existed_index(); + onFinished(); + return; + } + indexes_ = std::move(iRet).value(); + ignoreExistedIndex_ = req.get_ignore_existed_index(); - CHECK_NOTNULL(env_->kvstore_); - if (indexes_.empty()) { - doProcess(req); - } else { - doProcessWithIndex(req); - } - } catch (std::bad_alloc& e) { - memoryExceeded_ = true; - onError(); - } catch (std::exception& e) { - LOG(ERROR) << e.what(); - onError(); - } catch (...) { - onError(); + CHECK_NOTNULL(env_->kvstore_); + if (indexes_.empty()) { + doProcess(req); + } else { + doProcessWithIndex(req); } } diff --git a/src/storage/mutate/DeleteEdgesProcessor.cpp b/src/storage/mutate/DeleteEdgesProcessor.cpp index eacfe721c6b..944e25bd69a 100644 --- a/src/storage/mutate/DeleteEdgesProcessor.cpp +++ b/src/storage/mutate/DeleteEdgesProcessor.cpp @@ -20,153 +20,140 @@ namespace storage { ProcessorCounters kDelEdgesCounters; void DeleteEdgesProcessor::process(const cpp2::DeleteEdgesRequest& req) { - try { - spaceId_ = req.get_space_id(); - const auto& partEdges = req.get_parts(); - - CHECK_NOTNULL(env_->schemaMan_); - auto ret = env_->schemaMan_->getSpaceVidLen(spaceId_); - if (!ret.ok()) { - LOG(ERROR) << ret.status(); - for (auto& part : partEdges) { - pushResultCode(nebula::cpp2::ErrorCode::E_INVALID_SPACEVIDLEN, part.first); - } - onFinished(); - return; + spaceId_ = req.get_space_id(); + const auto& partEdges = req.get_parts(); + + CHECK_NOTNULL(env_->schemaMan_); + auto ret = env_->schemaMan_->getSpaceVidLen(spaceId_); + if (!ret.ok()) { + LOG(ERROR) << ret.status(); + for (auto& part : partEdges) { + pushResultCode(nebula::cpp2::ErrorCode::E_INVALID_SPACEVIDLEN, part.first); } - spaceVidLen_ = ret.value(); - callingNum_ = partEdges.size(); - - CHECK_NOTNULL(env_->indexMan_); - auto iRet = env_->indexMan_->getEdgeIndexes(spaceId_); - if (!iRet.ok()) { - LOG(ERROR) << iRet.status(); - for (auto& part : partEdges) { - pushResultCode(nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND, part.first); - } - onFinished(); - return; + onFinished(); + return; + } + spaceVidLen_ = ret.value(); + callingNum_ = partEdges.size(); + + CHECK_NOTNULL(env_->indexMan_); + auto iRet = env_->indexMan_->getEdgeIndexes(spaceId_); + if (!iRet.ok()) { + LOG(ERROR) << iRet.status(); + for (auto& part : partEdges) { + pushResultCode(nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND, part.first); } - indexes_ = std::move(iRet).value(); - - CHECK_NOTNULL(env_->kvstore_); - if (indexes_.empty()) { - // Operate every part, the graph layer guarantees the unique of the edgeKey - for (auto& part : partEdges) { - std::vector keys; - keys.reserve(32); - auto partId = part.first; - auto code = nebula::cpp2::ErrorCode::SUCCEEDED; - for (auto& edgeKey : part.second) { - if (!NebulaKeyUtils::isValidVidLen( - spaceVidLen_, edgeKey.src_ref()->getStr(), edgeKey.dst_ref()->getStr())) { - LOG(ERROR) << "Space " << spaceId_ << " vertex length invalid, " - << "space vid len: " << spaceVidLen_ - << ", edge srcVid: " << *edgeKey.src_ref() - << " dstVid: " << *edgeKey.dst_ref(); - code = nebula::cpp2::ErrorCode::E_INVALID_VID; - break; - } - // todo(doodle): delete lock in toss - auto edge = NebulaKeyUtils::edgeKey(spaceVidLen_, - partId, - edgeKey.src_ref()->getStr(), - *edgeKey.edge_type_ref(), - *edgeKey.ranking_ref(), - edgeKey.dst_ref()->getStr()); - keys.emplace_back(edge.data(), edge.size()); - } - if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { - handleAsync(spaceId_, partId, code); - continue; + onFinished(); + return; + } + indexes_ = std::move(iRet).value(); + + CHECK_NOTNULL(env_->kvstore_); + if (indexes_.empty()) { + // Operate every part, the graph layer guarantees the unique of the edgeKey + for (auto& part : partEdges) { + std::vector keys; + keys.reserve(32); + auto partId = part.first; + auto code = nebula::cpp2::ErrorCode::SUCCEEDED; + for (auto& edgeKey : part.second) { + if (!NebulaKeyUtils::isValidVidLen( + spaceVidLen_, edgeKey.src_ref()->getStr(), edgeKey.dst_ref()->getStr())) { + LOG(ERROR) << "Space " << spaceId_ << " vertex length invalid, " + << "space vid len: " << spaceVidLen_ << ", edge srcVid: " << *edgeKey.src_ref() + << " dstVid: " << *edgeKey.dst_ref(); + code = nebula::cpp2::ErrorCode::E_INVALID_VID; + break; } + // todo(doodle): delete lock in toss + auto edge = NebulaKeyUtils::edgeKey(spaceVidLen_, + partId, + edgeKey.src_ref()->getStr(), + *edgeKey.edge_type_ref(), + *edgeKey.ranking_ref(), + edgeKey.dst_ref()->getStr()); + keys.emplace_back(edge.data(), edge.size()); + } + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { + handleAsync(spaceId_, partId, code); + continue; + } - HookFuncPara para; - if (tossHookFunc_) { - para.keys.emplace(&keys); - (*tossHookFunc_)(para); - } - if (para.result) { - env_->kvstore_->asyncAppendBatch( - spaceId_, - partId, - std::move(para.result.value()), - [partId, this](nebula::cpp2::ErrorCode rc) { handleAsync(spaceId_, partId, rc); }); - } else { - doRemove(spaceId_, partId, std::move(keys)); - stats::StatsManager::addValue(kNumEdgesDeleted, keys.size()); - } + HookFuncPara para; + if (tossHookFunc_) { + para.keys.emplace(&keys); + (*tossHookFunc_)(para); } - } else { - for (auto& part : partEdges) { - IndexCountWrapper wrapper(env_); - auto partId = part.first; - std::vector dummyLock; - dummyLock.reserve(part.second.size()); - - nebula::cpp2::ErrorCode err = nebula::cpp2::ErrorCode::SUCCEEDED; - for (const auto& edgeKey : part.second) { - if (!NebulaKeyUtils::isValidVidLen( - spaceVidLen_, edgeKey.src_ref()->getStr(), edgeKey.dst_ref()->getStr())) { - LOG(ERROR) << "Space " << spaceId_ << " vertex length invalid, " - << "space vid len: " << spaceVidLen_ - << ", edge srcVid: " << *edgeKey.src_ref() - << " dstVid: " << *edgeKey.dst_ref(); - err = nebula::cpp2::ErrorCode::E_INVALID_VID; - break; - } - auto l = std::make_tuple(spaceId_, - partId, - edgeKey.src_ref()->getStr(), - *edgeKey.edge_type_ref(), - *edgeKey.ranking_ref(), - edgeKey.dst_ref()->getStr()); - if (std::find(dummyLock.begin(), dummyLock.end(), l) == dummyLock.end()) { - if (!env_->edgesML_->try_lock(l)) { - LOG(ERROR) << folly::sformat("The edge locked : src {}, type {}, tank {}, dst {}", - edgeKey.src_ref()->getStr(), - *edgeKey.edge_type_ref(), - *edgeKey.ranking_ref(), - edgeKey.dst_ref()->getStr()); - err = nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR; - break; - } - dummyLock.emplace_back(std::move(l)); - } - } - if (err != nebula::cpp2::ErrorCode::SUCCEEDED) { - env_->edgesML_->unlockBatch(dummyLock); - handleAsync(spaceId_, partId, err); - continue; - } - auto batch = deleteEdges(partId, std::move(part.second)); - if (!nebula::ok(batch)) { - env_->edgesML_->unlockBatch(dummyLock); - handleAsync(spaceId_, partId, nebula::error(batch)); - continue; - } - DCHECK(!nebula::value(batch).empty()); - nebula::MemoryLockGuard lg(env_->edgesML_.get(), std::move(dummyLock), false, false); + if (para.result) { env_->kvstore_->asyncAppendBatch( spaceId_, partId, - std::move(nebula::value(batch)), - [l = std::move(lg), icw = std::move(wrapper), partId, this]( - nebula::cpp2::ErrorCode code) { - UNUSED(l); - UNUSED(icw); - handleAsync(spaceId_, partId, code); - }); + std::move(para.result.value()), + [partId, this](nebula::cpp2::ErrorCode rc) { handleAsync(spaceId_, partId, rc); }); + } else { + doRemove(spaceId_, partId, std::move(keys)); + stats::StatsManager::addValue(kNumEdgesDeleted, keys.size()); + } + } + } else { + for (auto& part : partEdges) { + IndexCountWrapper wrapper(env_); + auto partId = part.first; + std::vector dummyLock; + dummyLock.reserve(part.second.size()); + + nebula::cpp2::ErrorCode err = nebula::cpp2::ErrorCode::SUCCEEDED; + for (const auto& edgeKey : part.second) { + if (!NebulaKeyUtils::isValidVidLen( + spaceVidLen_, edgeKey.src_ref()->getStr(), edgeKey.dst_ref()->getStr())) { + LOG(ERROR) << "Space " << spaceId_ << " vertex length invalid, " + << "space vid len: " << spaceVidLen_ << ", edge srcVid: " << *edgeKey.src_ref() + << " dstVid: " << *edgeKey.dst_ref(); + err = nebula::cpp2::ErrorCode::E_INVALID_VID; + break; + } + auto l = std::make_tuple(spaceId_, + partId, + edgeKey.src_ref()->getStr(), + *edgeKey.edge_type_ref(), + *edgeKey.ranking_ref(), + edgeKey.dst_ref()->getStr()); + if (std::find(dummyLock.begin(), dummyLock.end(), l) == dummyLock.end()) { + if (!env_->edgesML_->try_lock(l)) { + LOG(ERROR) << folly::sformat("The edge locked : src {}, type {}, tank {}, dst {}", + edgeKey.src_ref()->getStr(), + *edgeKey.edge_type_ref(), + *edgeKey.ranking_ref(), + edgeKey.dst_ref()->getStr()); + err = nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR; + break; + } + dummyLock.emplace_back(std::move(l)); + } + } + if (err != nebula::cpp2::ErrorCode::SUCCEEDED) { + env_->edgesML_->unlockBatch(dummyLock); + handleAsync(spaceId_, partId, err); + continue; + } + auto batch = deleteEdges(partId, std::move(part.second)); + if (!nebula::ok(batch)) { + env_->edgesML_->unlockBatch(dummyLock); + handleAsync(spaceId_, partId, nebula::error(batch)); + continue; } + DCHECK(!nebula::value(batch).empty()); + nebula::MemoryLockGuard lg(env_->edgesML_.get(), std::move(dummyLock), false, false); + env_->kvstore_->asyncAppendBatch(spaceId_, + partId, + std::move(nebula::value(batch)), + [l = std::move(lg), icw = std::move(wrapper), partId, this]( + nebula::cpp2::ErrorCode code) { + UNUSED(l); + UNUSED(icw); + handleAsync(spaceId_, partId, code); + }); } - } catch (std::bad_alloc& e) { - memoryExceeded_ = true; - onError(); - } catch (std::exception& e) { - LOG(ERROR) << e.what(); - onError(); - } catch (...) { - onError(); } } diff --git a/src/storage/mutate/DeleteTagsProcessor.cpp b/src/storage/mutate/DeleteTagsProcessor.cpp index 9b189fef728..7ffc6f28696 100644 --- a/src/storage/mutate/DeleteTagsProcessor.cpp +++ b/src/storage/mutate/DeleteTagsProcessor.cpp @@ -18,87 +18,76 @@ namespace storage { ProcessorCounters kDelTagsCounters; void DeleteTagsProcessor::process(const cpp2::DeleteTagsRequest& req) { - try { - spaceId_ = req.get_space_id(); - const auto& parts = req.get_parts(); + spaceId_ = req.get_space_id(); + const auto& parts = req.get_parts(); - CHECK_NOTNULL(env_->schemaMan_); - auto ret = env_->schemaMan_->getSpaceVidLen(spaceId_); - if (!ret.ok()) { - LOG(ERROR) << ret.status(); - for (auto& part : parts) { - pushResultCode(nebula::cpp2::ErrorCode::E_INVALID_SPACEVIDLEN, part.first); - } - onFinished(); - return; + CHECK_NOTNULL(env_->schemaMan_); + auto ret = env_->schemaMan_->getSpaceVidLen(spaceId_); + if (!ret.ok()) { + LOG(ERROR) << ret.status(); + for (auto& part : parts) { + pushResultCode(nebula::cpp2::ErrorCode::E_INVALID_SPACEVIDLEN, part.first); } - spaceVidLen_ = ret.value(); - callingNum_ = parts.size(); + onFinished(); + return; + } + spaceVidLen_ = ret.value(); + callingNum_ = parts.size(); - CHECK_NOTNULL(env_->indexMan_); - auto iRet = env_->indexMan_->getTagIndexes(spaceId_); - if (!iRet.ok()) { - LOG(ERROR) << iRet.status(); - for (auto& part : parts) { - pushResultCode(nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND, part.first); - } - onFinished(); - return; + CHECK_NOTNULL(env_->indexMan_); + auto iRet = env_->indexMan_->getTagIndexes(spaceId_); + if (!iRet.ok()) { + LOG(ERROR) << iRet.status(); + for (auto& part : parts) { + pushResultCode(nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND, part.first); } - indexes_ = std::move(iRet).value(); + onFinished(); + return; + } + indexes_ = std::move(iRet).value(); - CHECK_NOTNULL(env_->kvstore_); - if (indexes_.empty()) { - std::vector keys; - keys.reserve(32); - for (const auto& part : parts) { - auto partId = part.first; - const auto& delTags = part.second; - keys.clear(); - for (const auto& entry : delTags) { - const auto& vId = entry.get_id().getStr(); - for (const auto& tagId : entry.get_tags()) { - auto key = NebulaKeyUtils::tagKey(spaceVidLen_, partId, vId, tagId); - keys.emplace_back(std::move(key)); - } + CHECK_NOTNULL(env_->kvstore_); + if (indexes_.empty()) { + std::vector keys; + keys.reserve(32); + for (const auto& part : parts) { + auto partId = part.first; + const auto& delTags = part.second; + keys.clear(); + for (const auto& entry : delTags) { + const auto& vId = entry.get_id().getStr(); + for (const auto& tagId : entry.get_tags()) { + auto key = NebulaKeyUtils::tagKey(spaceVidLen_, partId, vId, tagId); + keys.emplace_back(std::move(key)); } - doRemove(spaceId_, partId, std::move(keys)); - stats::StatsManager::addValue(kNumTagsDeleted, keys.size()); } - } else { - for (const auto& part : parts) { - IndexCountWrapper wrapper(env_); - auto partId = part.first; - std::vector lockedKeys; - auto batch = deleteTags(partId, part.second, lockedKeys); - if (!nebula::ok(batch)) { - env_->verticesML_->unlockBatch(lockedKeys); - handleAsync(spaceId_, partId, nebula::error(batch)); - continue; - } - // keys has been locked in deleteTags - nebula::MemoryLockGuard lg( - env_->verticesML_.get(), std::move(lockedKeys), false, false); - env_->kvstore_->asyncAppendBatch( - spaceId_, - partId, - std::move(nebula::value(batch)), - [l = std::move(lg), icw = std::move(wrapper), partId, this]( - nebula::cpp2::ErrorCode code) { - UNUSED(l); - UNUSED(icw); - handleAsync(spaceId_, partId, code); - }); + doRemove(spaceId_, partId, std::move(keys)); + stats::StatsManager::addValue(kNumTagsDeleted, keys.size()); + } + } else { + for (const auto& part : parts) { + IndexCountWrapper wrapper(env_); + auto partId = part.first; + std::vector lockedKeys; + auto batch = deleteTags(partId, part.second, lockedKeys); + if (!nebula::ok(batch)) { + env_->verticesML_->unlockBatch(lockedKeys); + handleAsync(spaceId_, partId, nebula::error(batch)); + continue; } + // keys has been locked in deleteTags + nebula::MemoryLockGuard lg( + env_->verticesML_.get(), std::move(lockedKeys), false, false); + env_->kvstore_->asyncAppendBatch(spaceId_, + partId, + std::move(nebula::value(batch)), + [l = std::move(lg), icw = std::move(wrapper), partId, this]( + nebula::cpp2::ErrorCode code) { + UNUSED(l); + UNUSED(icw); + handleAsync(spaceId_, partId, code); + }); } - } catch (std::bad_alloc& e) { - memoryExceeded_ = true; - onError(); - } catch (std::exception& e) { - LOG(ERROR) << e.what(); - onError(); - } catch (...) { - onError(); } } diff --git a/src/storage/mutate/DeleteVerticesProcessor.cpp b/src/storage/mutate/DeleteVerticesProcessor.cpp index 3dbc6ae7060..63a21f2ad20 100644 --- a/src/storage/mutate/DeleteVerticesProcessor.cpp +++ b/src/storage/mutate/DeleteVerticesProcessor.cpp @@ -17,107 +17,95 @@ namespace storage { ProcessorCounters kDelVerticesCounters; void DeleteVerticesProcessor::process(const cpp2::DeleteVerticesRequest& req) { - try { - spaceId_ = req.get_space_id(); - const auto& partVertices = req.get_parts(); + spaceId_ = req.get_space_id(); + const auto& partVertices = req.get_parts(); - CHECK_NOTNULL(env_->schemaMan_); - auto ret = env_->schemaMan_->getSpaceVidLen(spaceId_); - if (!ret.ok()) { - LOG(ERROR) << ret.status(); - for (auto& part : partVertices) { - pushResultCode(nebula::cpp2::ErrorCode::E_INVALID_SPACEVIDLEN, part.first); - } - onFinished(); - return; + CHECK_NOTNULL(env_->schemaMan_); + auto ret = env_->schemaMan_->getSpaceVidLen(spaceId_); + if (!ret.ok()) { + LOG(ERROR) << ret.status(); + for (auto& part : partVertices) { + pushResultCode(nebula::cpp2::ErrorCode::E_INVALID_SPACEVIDLEN, part.first); } - spaceVidLen_ = ret.value(); - callingNum_ = partVertices.size(); + onFinished(); + return; + } + spaceVidLen_ = ret.value(); + callingNum_ = partVertices.size(); - CHECK_NOTNULL(env_->indexMan_); - auto iRet = env_->indexMan_->getTagIndexes(spaceId_); - if (!iRet.ok()) { - LOG(ERROR) << iRet.status(); - for (auto& part : partVertices) { - pushResultCode(nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND, part.first); - } - onFinished(); - return; + CHECK_NOTNULL(env_->indexMan_); + auto iRet = env_->indexMan_->getTagIndexes(spaceId_); + if (!iRet.ok()) { + LOG(ERROR) << iRet.status(); + for (auto& part : partVertices) { + pushResultCode(nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND, part.first); } - indexes_ = std::move(iRet).value(); + onFinished(); + return; + } + indexes_ = std::move(iRet).value(); - CHECK_NOTNULL(env_->kvstore_); - if (indexes_.empty()) { - // Operate every part, the graph layer guarantees the unique of the vid - std::vector keys; - keys.reserve(32); - for (auto& part : partVertices) { - auto partId = part.first; - const auto& vertexIds = part.second; - keys.clear(); - auto code = nebula::cpp2::ErrorCode::SUCCEEDED; - for (auto& vid : vertexIds) { - if (!NebulaKeyUtils::isValidVidLen(spaceVidLen_, vid.getStr())) { - LOG(ERROR) << "Space " << spaceId_ << ", vertex length invalid, " - << " space vid len: " << spaceVidLen_ << ", vid is " << vid; - code = nebula::cpp2::ErrorCode::E_INVALID_VID; - break; - } - keys.emplace_back(NebulaKeyUtils::vertexKey(spaceVidLen_, partId, vid.getStr())); - auto prefix = NebulaKeyUtils::tagPrefix(spaceVidLen_, partId, vid.getStr()); - std::unique_ptr iter; - code = env_->kvstore_->prefix(spaceId_, partId, prefix, &iter); - if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { - VLOG(3) << "Error! ret = " << static_cast(code) << ", spaceID " << spaceId_; - break; - } - while (iter->valid()) { - auto key = iter->key(); - keys.emplace_back(key.str()); - iter->next(); - } + CHECK_NOTNULL(env_->kvstore_); + if (indexes_.empty()) { + // Operate every part, the graph layer guarantees the unique of the vid + std::vector keys; + keys.reserve(32); + for (auto& part : partVertices) { + auto partId = part.first; + const auto& vertexIds = part.second; + keys.clear(); + auto code = nebula::cpp2::ErrorCode::SUCCEEDED; + for (auto& vid : vertexIds) { + if (!NebulaKeyUtils::isValidVidLen(spaceVidLen_, vid.getStr())) { + LOG(ERROR) << "Space " << spaceId_ << ", vertex length invalid, " + << " space vid len: " << spaceVidLen_ << ", vid is " << vid; + code = nebula::cpp2::ErrorCode::E_INVALID_VID; + break; } + keys.emplace_back(NebulaKeyUtils::vertexKey(spaceVidLen_, partId, vid.getStr())); + auto prefix = NebulaKeyUtils::tagPrefix(spaceVidLen_, partId, vid.getStr()); + std::unique_ptr iter; + code = env_->kvstore_->prefix(spaceId_, partId, prefix, &iter); if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { - handleAsync(spaceId_, partId, code); - continue; + VLOG(3) << "Error! ret = " << static_cast(code) << ", spaceID " << spaceId_; + break; } - doRemove(spaceId_, partId, std::move(keys)); - stats::StatsManager::addValue(kNumVerticesDeleted, keys.size()); - } - } else { - for (auto& pv : partVertices) { - IndexCountWrapper wrapper(env_); - auto partId = pv.first; - std::vector dummyLock; - auto batch = deleteVertices(partId, std::move(pv).second, dummyLock); - if (!nebula::ok(batch)) { - env_->verticesML_->unlockBatch(dummyLock); - handleAsync(spaceId_, partId, nebula::error(batch)); - continue; + while (iter->valid()) { + auto key = iter->key(); + keys.emplace_back(key.str()); + iter->next(); } - DCHECK(!nebula::value(batch).empty()); - nebula::MemoryLockGuard lg( - env_->verticesML_.get(), std::move(dummyLock), false, false); - env_->kvstore_->asyncAppendBatch( - spaceId_, - partId, - std::move(nebula::value(batch)), - [l = std::move(lg), icw = std::move(wrapper), partId, this]( - nebula::cpp2::ErrorCode code) { - UNUSED(l); - UNUSED(icw); - handleAsync(spaceId_, partId, code); - }); } + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { + handleAsync(spaceId_, partId, code); + continue; + } + doRemove(spaceId_, partId, std::move(keys)); + stats::StatsManager::addValue(kNumVerticesDeleted, keys.size()); + } + } else { + for (auto& pv : partVertices) { + IndexCountWrapper wrapper(env_); + auto partId = pv.first; + std::vector dummyLock; + auto batch = deleteVertices(partId, std::move(pv).second, dummyLock); + if (!nebula::ok(batch)) { + env_->verticesML_->unlockBatch(dummyLock); + handleAsync(spaceId_, partId, nebula::error(batch)); + continue; + } + DCHECK(!nebula::value(batch).empty()); + nebula::MemoryLockGuard lg(env_->verticesML_.get(), std::move(dummyLock), false, false); + env_->kvstore_->asyncAppendBatch(spaceId_, + partId, + std::move(nebula::value(batch)), + [l = std::move(lg), icw = std::move(wrapper), partId, this]( + nebula::cpp2::ErrorCode code) { + UNUSED(l); + UNUSED(icw); + handleAsync(spaceId_, partId, code); + }); } - } catch (std::bad_alloc& e) { - memoryExceeded_ = true; - onError(); - } catch (std::exception& e) { - LOG(ERROR) << e.what(); - onError(); - } catch (...) { - onError(); } } diff --git a/src/storage/mutate/UpdateEdgeProcessor.cpp b/src/storage/mutate/UpdateEdgeProcessor.cpp index 6e6d1db6d2a..599f4e20937 100644 --- a/src/storage/mutate/UpdateEdgeProcessor.cpp +++ b/src/storage/mutate/UpdateEdgeProcessor.cpp @@ -19,23 +19,11 @@ namespace storage { ProcessorCounters kUpdateEdgeCounters; void UpdateEdgeProcessor::process(const cpp2::UpdateEdgeRequest& req) { - try { - if (executor_ != nullptr) { - executor_->add([req, this]() { - memory::MemoryCheckGuard guard; - this->doProcess(req); - }); - } else { - doProcess(req); - } - } catch (std::bad_alloc& e) { - memoryExceeded_ = true; - onError(); - } catch (std::exception& e) { - LOG(ERROR) << e.what(); - onError(); - } catch (...) { - onError(); + if (executor_ != nullptr) { + executor_->add( + [this, req]() { MemoryCheckScope wrapper(this, [this, req] { this->doProcess(req); }); }); + } else { + doProcess(req); } } diff --git a/src/storage/mutate/UpdateVertexProcessor.cpp b/src/storage/mutate/UpdateVertexProcessor.cpp index 757adb9d6d0..c5fabcff81d 100644 --- a/src/storage/mutate/UpdateVertexProcessor.cpp +++ b/src/storage/mutate/UpdateVertexProcessor.cpp @@ -19,23 +19,11 @@ namespace storage { ProcessorCounters kUpdateVertexCounters; void UpdateVertexProcessor::process(const cpp2::UpdateVertexRequest& req) { - try { - if (executor_ != nullptr) { - executor_->add([req, this]() { - memory::MemoryCheckGuard guard; - this->doProcess(req); - }); - } else { - doProcess(req); - } - } catch (std::bad_alloc& e) { - memoryExceeded_ = true; - onError(); - } catch (std::exception& e) { - LOG(ERROR) << e.what(); - onError(); - } catch (...) { - onError(); + if (executor_ != nullptr) { + executor_->add( + [this, req]() { MemoryCheckScope wrapper(this, [this, req] { this->doProcess(req); }); }); + } else { + doProcess(req); } } diff --git a/src/storage/query/GetDstBySrcProcessor.cpp b/src/storage/query/GetDstBySrcProcessor.cpp index 11240738108..f4a1ce38029 100644 --- a/src/storage/query/GetDstBySrcProcessor.cpp +++ b/src/storage/query/GetDstBySrcProcessor.cpp @@ -21,20 +21,11 @@ namespace storage { ProcessorCounters kGetDstBySrcCounters; void GetDstBySrcProcessor::process(const cpp2::GetDstBySrcRequest& req) { - try { - if (executor_ != nullptr) { - executor_->add([req, this]() { this->doProcess(req); }); - } else { - doProcess(req); - } - } catch (std::bad_alloc& e) { - memoryExceeded_ = true; - onError(); - } catch (std::exception& e) { - LOG(ERROR) << e.what(); - onError(); - } catch (...) { - onError(); + if (executor_ != nullptr) { + executor_->add( + [this, req]() { MemoryCheckScope wrapper(this, [this, req] { this->doProcess(req); }); }); + } else { + doProcess(req); } } diff --git a/src/storage/query/GetNeighborsProcessor.cpp b/src/storage/query/GetNeighborsProcessor.cpp index 9e8b5dd30ff..07c68d6a62a 100644 --- a/src/storage/query/GetNeighborsProcessor.cpp +++ b/src/storage/query/GetNeighborsProcessor.cpp @@ -21,20 +21,11 @@ namespace storage { ProcessorCounters kGetNeighborsCounters; void GetNeighborsProcessor::process(const cpp2::GetNeighborsRequest& req) { - try { - if (executor_ != nullptr) { - executor_->add([req, this]() { this->doProcess(req); }); - } else { - doProcess(req); - } - } catch (std::bad_alloc& e) { - memoryExceeded_ = true; - onError(); - } catch (std::exception& e) { - LOG(ERROR) << e.what(); - onError(); - } catch (...) { - onError(); + if (executor_ != nullptr) { + executor_->add( + [this, req]() { MemoryCheckScope wrapper(this, [this, req] { this->doProcess(req); }); }); + } else { + doProcess(req); } } diff --git a/src/storage/query/GetPropProcessor.cpp b/src/storage/query/GetPropProcessor.cpp index 21f7bda3d3d..81dfb5ef7e2 100644 --- a/src/storage/query/GetPropProcessor.cpp +++ b/src/storage/query/GetPropProcessor.cpp @@ -14,20 +14,11 @@ namespace storage { ProcessorCounters kGetPropCounters; void GetPropProcessor::process(const cpp2::GetPropRequest& req) { - try { - if (executor_ != nullptr) { - executor_->add([req, this]() { this->doProcess(req); }); - } else { - doProcess(req); - } - } catch (std::bad_alloc& e) { - memoryExceeded_ = true; - onError(); - } catch (std::exception& e) { - LOG(ERROR) << e.what(); - onError(); - } catch (...) { - onError(); + if (executor_ != nullptr) { + executor_->add( + [this, req]() { MemoryCheckScope wrapper(this, [this, req] { this->doProcess(req); }); }); + } else { + doProcess(req); } } diff --git a/src/storage/query/ScanEdgeProcessor.cpp b/src/storage/query/ScanEdgeProcessor.cpp index 717b1924662..c49d72906f5 100644 --- a/src/storage/query/ScanEdgeProcessor.cpp +++ b/src/storage/query/ScanEdgeProcessor.cpp @@ -16,20 +16,11 @@ namespace storage { ProcessorCounters kScanEdgeCounters; void ScanEdgeProcessor::process(const cpp2::ScanEdgeRequest& req) { - try { - if (executor_ != nullptr) { - executor_->add([req, this]() { this->doProcess(req); }); - } else { - doProcess(req); - } - } catch (std::bad_alloc& e) { - memoryExceeded_ = true; - onError(); - } catch (std::exception& e) { - LOG(ERROR) << e.what(); - onError(); - } catch (...) { - onError(); + if (executor_ != nullptr) { + executor_->add( + [this, req]() { MemoryCheckScope wrapper(this, [this, req] { this->doProcess(req); }); }); + } else { + doProcess(req); } } diff --git a/src/storage/query/ScanVertexProcessor.cpp b/src/storage/query/ScanVertexProcessor.cpp index 429f514bdb5..6465b11de4b 100644 --- a/src/storage/query/ScanVertexProcessor.cpp +++ b/src/storage/query/ScanVertexProcessor.cpp @@ -18,20 +18,11 @@ namespace storage { ProcessorCounters kScanVertexCounters; void ScanVertexProcessor::process(const cpp2::ScanVertexRequest& req) { - try { - if (executor_ != nullptr) { - executor_->add([req, this]() { this->doProcess(req); }); - } else { - doProcess(req); - } - } catch (std::bad_alloc& e) { - memoryExceeded_ = true; - onError(); - } catch (std::exception& e) { - LOG(ERROR) << e.what(); - onError(); - } catch (...) { - onError(); + if (executor_ != nullptr) { + executor_->add( + [this, req]() { MemoryCheckScope wrapper(this, [this, req] { this->doProcess(req); }); }); + } else { + doProcess(req); } } diff --git a/tests/Makefile b/tests/Makefile index 5200dc86219..8e33ae1efc8 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -15,7 +15,7 @@ BUILD_DIR ?= $(CURR_DIR)/../build DEBUG ?= true J ?= 10 ENABLE_FT_INDEX ?= false -ES_ADDRESS ?= "locahost:9200" +ES_ADDRESS ?= 127.0.0.1:9200 ENABLE_SSL ?= false ENABLE_GRAPH_SSL ?= false ENABLE_META_SSL ?= false diff --git a/tests/tck/features/basic/Parser.feature b/tests/tck/features/basic/Parser.feature new file mode 100644 index 00000000000..021e2ffa475 --- /dev/null +++ b/tests/tck/features/basic/Parser.feature @@ -0,0 +1,16 @@ +# Copyright (c) 2022 vesoft inc. All rights reserved. +# +# This source code is licensed under Apache 2.0 License. +Feature: Parser + + Scenario: Test special white space character + When executing query: + """ + SHOW  SPACES + """ + Then the execution should be successful + When executing query: + """ + RETURN  1 + """ + Then the execution should be successful