diff --git a/src/dataman/RowReader.cpp b/src/dataman/RowReader.cpp index 0bc5a731138..e4db5d4f9e2 100644 --- a/src/dataman/RowReader.cpp +++ b/src/dataman/RowReader.cpp @@ -113,67 +113,60 @@ RowReader::Iterator::operator bool() const { * class RowReader * ********************************************/ -// static -std::unique_ptr RowReader::getTagPropReader( +RowReader RowReader::getTagPropReader( meta::SchemaManager* schemaMan, folly::StringPiece row, GraphSpaceID space, TagID tag) { if (schemaMan == nullptr) { LOG(ERROR) << "schemaMan should not be nullptr!"; - return nullptr; + return RowReader(); } int32_t ver = getSchemaVer(row); if (ver >= 0) { auto schema = schemaMan->getTagSchema(space, tag, ver); if (schema == nullptr) { - return nullptr; + return RowReader(); } - return std::unique_ptr(new RowReader( - row, - schema)); + return RowReader(row, schema); } else { LOG(WARNING) << "Invalid schema version in the row data!"; - return nullptr; + return RowReader(); } } - // static -std::unique_ptr RowReader::getEdgePropReader( - meta::SchemaManager* schemaMan, - folly::StringPiece row, - GraphSpaceID space, - EdgeType edge) { +RowReader RowReader::getEdgePropReader( + meta::SchemaManager* schemaMan, + folly::StringPiece row, + GraphSpaceID space, + EdgeType edge) { if (schemaMan == nullptr) { LOG(ERROR) << "schemaMan should not be nullptr!"; - return nullptr; + return RowReader(); } int32_t ver = getSchemaVer(row); if (ver >= 0) { auto schema = schemaMan->getEdgeSchema(space, edge, ver); if (schema == nullptr) { - return nullptr; + return RowReader(); } - return std::unique_ptr(new RowReader( - row, - schema)); + return RowReader(row, schema); } else { LOG(WARNING) << "Invalid schema version in the row data!"; - return nullptr; + return RowReader(); } } // static -std::unique_ptr RowReader::getRowReader( - folly::StringPiece row, - std::shared_ptr schema) { +RowReader RowReader::getRowReader( + folly::StringPiece row, + std::shared_ptr schema) { SchemaVer ver = getSchemaVer(row); CHECK_EQ(ver, schema->getVersion()); - return std::unique_ptr(new RowReader(row, std::move(schema))); + return RowReader(row, std::move(schema)); } - // static int32_t RowReader::getSchemaVer(folly::StringPiece row) { const uint8_t* it = reinterpret_cast(row.begin()); diff --git a/src/dataman/RowReader.h b/src/dataman/RowReader.h index b2bfb876e34..4f796a6fc82 100644 --- a/src/dataman/RowReader.h +++ b/src/dataman/RowReader.h @@ -72,22 +72,31 @@ class RowReader { public: - static std::unique_ptr getTagPropReader( + RowReader(const RowReader&) = delete; + RowReader& operator=(const RowReader&) = delete; + RowReader(RowReader&& x) = default; + RowReader& operator=(RowReader&& x) = default; + + static RowReader getTagPropReader( meta::SchemaManager* schemaMan, folly::StringPiece row, GraphSpaceID space, TagID tag); - static std::unique_ptr getEdgePropReader( + static RowReader getEdgePropReader( meta::SchemaManager* schemaMan, folly::StringPiece row, GraphSpaceID space, EdgeType edge); - static std::unique_ptr getRowReader( + static RowReader getRowReader( folly::StringPiece row, std::shared_ptr schema); + static RowReader getEmptyRowReader() { + return RowReader(); + } + static StatusOr getDefaultProp(const meta::SchemaProviderIf* schema, const std::string& prop) { auto& vType = schema->getFieldType(prop); @@ -242,9 +251,6 @@ class RowReader { return ResultType::E_DATA_INVALID; } } - - virtual ~RowReader() = default; - SchemaVer schemaVer() const noexcept; int32_t numFields() const noexcept; @@ -286,6 +292,47 @@ class RowReader { return data_; } + operator bool() const noexcept { + return operator!=(nullptr); + } + + bool operator==(std::nullptr_t) const noexcept { + return !data_.data(); + } + + bool operator==(const RowReader& x) const noexcept { + return data_ == x.data_; + } + + bool operator!=(std::nullptr_t) const noexcept { + return static_cast(data_.data()); + } + + bool operator!=(const RowReader& x) const noexcept { + return data_ != x.data_; + } + + RowReader* operator->() const noexcept { + return get(); + } + + RowReader* get() const noexcept { + return const_cast(this); + } + + RowReader* get() noexcept { + return this; + } + + RowReader& operator*() const noexcept { + return *get(); + } + + void reset() noexcept { + this->~RowReader(); + new(this) RowReader(); + } + // TODO getPath(const std::string& name) const noexcept; // TODO getPath(int64_t index) const noexcept; // TODO getList(const std::string& name) const noexcept; @@ -308,6 +355,7 @@ class RowReader { mutable std::vector offsets_; private: + RowReader() = default; RowReader(folly::StringPiece row, std::shared_ptr schema); diff --git a/src/dataman/RowSetReader.h b/src/dataman/RowSetReader.h index c84aaaa65bd..3a30a0fb460 100644 --- a/src/dataman/RowSetReader.h +++ b/src/dataman/RowSetReader.h @@ -7,6 +7,7 @@ #ifndef DATAMAN_ROWSETREADER_H_ #define DATAMAN_ROWSETREADER_H_ +#include "RowReader.h" #include "base/Base.h" #include "gen-cpp2/storage_types.h" #include "meta/SchemaProviderIf.h" @@ -33,7 +34,7 @@ class RowSetReader { // The total length of the encoded row set const folly::StringPiece& data_; - std::unique_ptr reader_; + RowReader reader_ = RowReader::getEmptyRowReader(); // The offset of the current row int64_t offset_; // The length of the current row diff --git a/src/dataman/RowUpdater.cpp b/src/dataman/RowUpdater.cpp index 853a6865233..5546b10123c 100644 --- a/src/dataman/RowUpdater.cpp +++ b/src/dataman/RowUpdater.cpp @@ -12,7 +12,7 @@ namespace nebula { using folly::hash::SpookyHashV2; using nebula::meta::SchemaProviderIf; -RowUpdater::RowUpdater(std::unique_ptr reader, +RowUpdater::RowUpdater(RowReader reader, std::shared_ptr schema) : schema_(std::move(schema)) , reader_(std::move(reader)) { @@ -22,7 +22,7 @@ RowUpdater::RowUpdater(std::unique_ptr reader, RowUpdater::RowUpdater(std::shared_ptr schema) : schema_(std::move(schema)) - , reader_(nullptr) { + , reader_(RowReader::getEmptyRowReader()) { CHECK(!!schema_); } diff --git a/src/dataman/RowUpdater.h b/src/dataman/RowUpdater.h index ab1b3887bb2..708785023eb 100644 --- a/src/dataman/RowUpdater.h +++ b/src/dataman/RowUpdater.h @@ -27,7 +27,7 @@ class RowUpdater { // reader holds the original data // schema is the writer schema, which means the updated data will be encoded // using this schema - RowUpdater(std::unique_ptr reader, + RowUpdater(RowReader reader, std::shared_ptr schema); explicit RowUpdater(std::shared_ptr schema); @@ -79,7 +79,7 @@ class RowUpdater { private: std::shared_ptr schema_; - std::unique_ptr reader_; + RowReader reader_ = RowReader::getEmptyRowReader(); // Hash64(field_name) => value std::unordered_map updatedFields_; }; diff --git a/src/graph/FetchVerticesExecutor.cpp b/src/graph/FetchVerticesExecutor.cpp index ddd0f043777..b6ef4b54117 100644 --- a/src/graph/FetchVerticesExecutor.cpp +++ b/src/graph/FetchVerticesExecutor.cpp @@ -364,7 +364,7 @@ void FetchVerticesExecutor::processResult(RpcResponse &&result) { finishExecution(std::move(rsWriter)); return; } - std::unordered_map>> dataMap; + std::unordered_map> dataMap; dataMap.reserve(num); std::unordered_map> tagSchemaMap; @@ -407,7 +407,7 @@ void FetchVerticesExecutor::processResult(RpcResponse &&result) { } auto vschema = tagSchemaMap[tagId]; auto vreader = RowReader::getRowReader(data, vschema); - dataMap[vid][tagId] = std::move(vreader); + dataMap[vid].emplace(std::make_pair(tagId, std::move(vreader))); tagIdSet.insert(tagId); } } diff --git a/src/graph/GoExecutor.cpp b/src/graph/GoExecutor.cpp index d73c191e887..cf8a61b0555 100644 --- a/src/graph/GoExecutor.cpp +++ b/src/graph/GoExecutor.cpp @@ -926,7 +926,7 @@ void GoExecutor::fetchVertexProps(std::vector ids) { auto returns = status.value(); auto future = ectx()->getStorageClient()->getVertexProps(spaceId, ids, returns); auto *runner = ectx()->rctx()->runner(); - auto cb = [this] (auto &&result) mutable { + auto cb = [this, ectx = ectx()] (auto &&result) mutable { auto completeness = result.completeness(); if (completeness == 0) { doError(Status::Error("Get dest props failed")); @@ -939,7 +939,7 @@ void GoExecutor::fetchVertexProps(std::vector ids) { } } if (vertexHolder_ == nullptr) { - vertexHolder_ = std::make_unique(); + vertexHolder_ = std::make_unique(ectx); } for (auto &resp : result.responses()) { vertexHolder_->add(resp); @@ -1222,7 +1222,7 @@ bool GoExecutor::processFinalResult(Callback cb) const { return index_->getColumnWithRow(*inputRow, prop); }; - std::unique_ptr reader; + RowReader reader = RowReader::getEmptyRowReader(); if (currEdgeSchema) { reader = RowReader::getRowReader(edge.props, currEdgeSchema); } @@ -1315,28 +1315,24 @@ bool GoExecutor::processFinalResult(Callback cb) const { return true; } -OptVariantType GoExecutor::VertexHolder::getDefaultProp(TagID tid, const std::string &prop) const { - for (auto it = data_.cbegin(); it != data_.cend(); ++it) { - auto it2 = it->second.find(tid); - if (it2 != it->second.cend()) { - return RowReader::getDefaultProp(std::get<0>(it2->second).get(), prop); - } +OptVariantType GoExecutor::VertexHolder::getDefaultProp( + TagID tagId, const std::string &prop) const { + auto space = ectx_->rctx()->session()->space(); + auto schema = ectx_->schemaManager()->getTagSchema(space, tagId); + if (schema == nullptr) { + return Status::Error("No tag schema for tagId %d", tagId); } - - - return Status::Error("Unknown property: `%s'", prop.c_str()); + return RowReader::getDefaultProp(schema.get(), prop); } -SupportedType GoExecutor::VertexHolder::getDefaultPropType(TagID tid, - const std::string &prop) const { - for (auto it = data_.cbegin(); it != data_.cend(); ++it) { - auto it2 = it->second.find(tid); - if (it2 != it->second.cend()) { - return std::get<0>(it2->second)->getFieldType(prop).type; - } +SupportedType GoExecutor::VertexHolder::getDefaultPropType( + TagID tagId, const std::string &prop) const { + auto space = ectx_->rctx()->session()->space(); + auto schema = ectx_->schemaManager()->getTagSchema(space, tagId); + if (schema == nullptr) { + return nebula::cpp2::SupportedType::UNKNOWN; } - - return nebula::cpp2::SupportedType::UNKNOWN; + return schema->getFieldType(prop).type; } OptVariantType GoExecutor::VertexHolder::get(VertexID id, TagID tid, diff --git a/src/graph/GoExecutor.h b/src/graph/GoExecutor.h index 283c9423aa6..0fd66973e0c 100644 --- a/src/graph/GoExecutor.h +++ b/src/graph/GoExecutor.h @@ -175,6 +175,7 @@ class GoExecutor final : public TraverseExecutor { */ class VertexHolder final { public: + explicit VertexHolder(ExecutionContext* ectx) : ectx_(ectx) { } OptVariantType getDefaultProp(TagID tid, const std::string &prop) const; OptVariantType get(VertexID id, TagID tid, const std::string &prop) const; void add(const storage::cpp2::QueryResponse &resp); @@ -184,6 +185,7 @@ class GoExecutor final : public TraverseExecutor { private: using VData = std::tuple, std::string>; std::unordered_map> data_; + ExecutionContext* ectx_{nullptr}; }; class VertexBackTracker final { diff --git a/src/storage/mutate/AddEdgesProcessor.cpp b/src/storage/mutate/AddEdgesProcessor.cpp index e16f2bedea9..83ef882d6e8 100644 --- a/src/storage/mutate/AddEdgesProcessor.cpp +++ b/src/storage/mutate/AddEdgesProcessor.cpp @@ -87,7 +87,7 @@ std::string AddEdgesProcessor::addEdges(int64_t version, PartitionID partId, }); for (auto& e : newEdges) { std::string val; - std::unique_ptr nReader; + RowReader nReader = RowReader::getEmptyRowReader(); auto edgeType = NebulaKeyUtils::getEdgeType(e.first); for (auto& index : indexes_) { if (edgeType == index->get_schema_id().get_edge_type()) { diff --git a/src/storage/mutate/AddVerticesProcessor.cpp b/src/storage/mutate/AddVerticesProcessor.cpp index 814bb4318b5..10e0083b85a 100644 --- a/src/storage/mutate/AddVerticesProcessor.cpp +++ b/src/storage/mutate/AddVerticesProcessor.cpp @@ -103,7 +103,7 @@ std::string AddVerticesProcessor::addVertices(int64_t version, PartitionID partI for (auto& v : newVertices) { std::string val; - std::unique_ptr nReader; + RowReader nReader = RowReader::getEmptyRowReader(); auto tagId = NebulaKeyUtils::getTagId(v.first); auto vId = NebulaKeyUtils::getVertexId(v.first); for (auto& index : indexes_) { diff --git a/src/storage/mutate/DeleteEdgesProcessor.cpp b/src/storage/mutate/DeleteEdgesProcessor.cpp index d74dca77ffe..589bc7babda 100644 --- a/src/storage/mutate/DeleteEdgesProcessor.cpp +++ b/src/storage/mutate/DeleteEdgesProcessor.cpp @@ -98,7 +98,7 @@ DeleteEdgesProcessor::deleteEdges(GraphSpaceID spaceId, * just get the latest version edge for index. */ if (isLatestVE) { - std::unique_ptr reader; + RowReader reader = RowReader::getEmptyRowReader(); for (auto& index : indexes_) { auto indexId = index->get_index_id(); if (type == index->get_schema_id().get_edge_type()) { diff --git a/src/storage/mutate/DeleteVerticesProcessor.cpp b/src/storage/mutate/DeleteVerticesProcessor.cpp index d90574e5272..2e84ff9d760 100644 --- a/src/storage/mutate/DeleteVerticesProcessor.cpp +++ b/src/storage/mutate/DeleteVerticesProcessor.cpp @@ -119,7 +119,7 @@ DeleteVerticesProcessor::deleteVertices(GraphSpaceID spaceId, * Using newlyVertexId to identify if it is the latest version */ if (latestVVId != tagId) { - std::unique_ptr reader; + RowReader reader = RowReader::getEmptyRowReader(); for (auto& index : indexes_) { auto indexId = index->get_index_id(); if (index->get_schema_id().get_tag_id() == tagId) { diff --git a/src/storage/mutate/UpdateEdgeProcessor.cpp b/src/storage/mutate/UpdateEdgeProcessor.cpp index feef1c4e504..38adf96c4a7 100644 --- a/src/storage/mutate/UpdateEdgeProcessor.cpp +++ b/src/storage/mutate/UpdateEdgeProcessor.cpp @@ -403,7 +403,8 @@ folly::Optional UpdateEdgeProcessor::updateAndWriteBack(PartitionID auto nVal = std::move(status.value()); // TODO(heng) we don't update the index for reverse edge. if (!indexes_.empty() && edgeKey.edge_type > 0) { - std::unique_ptr reader, rReader; + RowReader reader = RowReader::getEmptyRowReader(); + RowReader rReader = RowReader::getEmptyRowReader(); for (auto& index : indexes_) { auto indexId = index->get_index_id(); if (index->get_schema_id().get_edge_type() == edgeKey.edge_type) { diff --git a/src/storage/mutate/UpdateVertexProcessor.cpp b/src/storage/mutate/UpdateVertexProcessor.cpp index 4a8df8fb03d..3a22af5b426 100644 --- a/src/storage/mutate/UpdateVertexProcessor.cpp +++ b/src/storage/mutate/UpdateVertexProcessor.cpp @@ -410,7 +410,8 @@ folly::Optional UpdateVertexProcessor::updateAndWriteBack(const Par } auto nVal = std::move(status.value()); if (!indexes_.empty()) { - std::unique_ptr reader, oReader; + RowReader reader = RowReader::getEmptyRowReader(); + RowReader oReader = RowReader::getEmptyRowReader(); for (auto &index : indexes_) { if (index->get_schema_id().get_tag_id() == u.first) { if (!(u.second->kv.second.empty())) { diff --git a/src/storage/query/QueryBaseProcessor.h b/src/storage/query/QueryBaseProcessor.h index 4409a04d80e..98c615e31df 100644 --- a/src/storage/query/QueryBaseProcessor.h +++ b/src/storage/query/QueryBaseProcessor.h @@ -26,9 +26,7 @@ const std::unordered_map kPropsInKey_ = {"_rank", PropContext::PropInKeyType::RANK} }; -using EdgeProcessor - = std::function reader, - folly::StringPiece key)>; +using EdgeProcessor = std::function; struct Bucket { std::vector> vertices_; }; diff --git a/src/storage/query/QueryBaseProcessor.inl b/src/storage/query/QueryBaseProcessor.inl index 9c256cfbd77..b0a6ee447bc 100644 --- a/src/storage/query/QueryBaseProcessor.inl +++ b/src/storage/query/QueryBaseProcessor.inl @@ -516,7 +516,7 @@ kvstore::ResultCode QueryBaseProcessor::collectEdgeProps( } lastRank = rank; lastDstId = dstId; - std::unique_ptr reader; + RowReader reader = RowReader::getEmptyRowReader(); if ((!onlyStructure || retTTL.has_value()) && !val.empty()) { reader = RowReader::getEdgePropReader(this->schemaMan_, val, diff --git a/src/storage/query/QueryBoundProcessor.cpp b/src/storage/query/QueryBoundProcessor.cpp index cabc6219da6..5ada25b46ea 100644 --- a/src/storage/query/QueryBoundProcessor.cpp +++ b/src/storage/query/QueryBoundProcessor.cpp @@ -35,8 +35,7 @@ kvstore::ResultCode QueryBoundProcessor::processEdgeImpl(const PartitionID partI edges.reserve(FLAGS_reserved_edges_one_vertex); auto ret = collectEdgeProps( partId, vId, edgeType, &fcontext, - [&, this](std::unique_ptr reader, - folly::StringPiece k) { + [&, this](RowReader reader, folly::StringPiece k) { cpp2::IdAndProp edge; if (!onlyStructure) { RowWriter writer(currEdgeSchema); @@ -112,11 +111,12 @@ kvstore::ResultCode QueryBoundProcessor::processEdgeSampling(const PartitionID p CHECK(!onlyVertexProps_); auto ret = collectEdgeProps( partId, vId, edgeType, &fcontext, - [&](std::unique_ptr reader, - folly::StringPiece k) { + [&](RowReader reader, folly::StringPiece k) { sampler->sampling( - std::make_tuple(edgeType, k.str(), std::move(reader), - currEdgeSchema, props)); + std::make_tuple( + edgeType, k.str(), + std::make_unique(std::move(reader)), + currEdgeSchema, props)); }); if (ret != kvstore::ResultCode::SUCCEEDED) { return ret; diff --git a/src/storage/query/QueryStatsProcessor.cpp b/src/storage/query/QueryStatsProcessor.cpp index 2a7c4fc9c85..d127bc572ed 100644 --- a/src/storage/query/QueryStatsProcessor.cpp +++ b/src/storage/query/QueryStatsProcessor.cpp @@ -86,7 +86,7 @@ kvstore::ResultCode QueryStatsProcessor::processVertex(PartitionID partId, auto& props = ec.second; if (!props.empty()) { auto r = this->collectEdgeProps(partId, vId, edgeType, &fcontext, - [&, this](std::unique_ptr reader, + [&, this](RowReader reader, folly::StringPiece key) { this->collectProps( reader.get(), key, props, &fcontext,