Skip to content

Commit

Permalink
fix VertexHolder::getDefaultProp performance issue. (vesoft-inc#2249)
Browse files Browse the repository at this point in the history
* fix VertexHolder::getDefaultProp performance issue.

* rewrite row reader to avoid malloc

Co-authored-by: trippli <trippli@tencent.com>
Co-authored-by: dangleptr <37216992+dangleptr@users.noreply.github.com>
  • Loading branch information
3 people authored Jul 30, 2020
1 parent 311d494 commit 95abc21
Show file tree
Hide file tree
Showing 18 changed files with 116 additions and 76 deletions.
43 changes: 18 additions & 25 deletions src/dataman/RowReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,67 +113,60 @@ RowReader::Iterator::operator bool() const {
* class RowReader
*
********************************************/
// static
std::unique_ptr<RowReader> RowReader::getTagPropReader(
RowReader RowReader::getTagPropReader(
meta::SchemaManager* schemaMan,
folly::StringPiece row,
GraphSpaceID space,
TagID tag) {
if (schemaMan == nullptr) {
LOG(ERROR) << "schemaMan should not be nullptr!";
return nullptr;
return RowReader();
}
int32_t ver = getSchemaVer(row);
if (ver >= 0) {
auto schema = schemaMan->getTagSchema(space, tag, ver);
if (schema == nullptr) {
return nullptr;
return RowReader();
}
return std::unique_ptr<RowReader>(new RowReader(
row,
schema));
return RowReader(row, schema);
} else {
LOG(WARNING) << "Invalid schema version in the row data!";
return nullptr;
return RowReader();
}
}


// static
std::unique_ptr<RowReader> RowReader::getEdgePropReader(
meta::SchemaManager* schemaMan,
folly::StringPiece row,
GraphSpaceID space,
EdgeType edge) {
RowReader RowReader::getEdgePropReader(
meta::SchemaManager* schemaMan,
folly::StringPiece row,
GraphSpaceID space,
EdgeType edge) {
if (schemaMan == nullptr) {
LOG(ERROR) << "schemaMan should not be nullptr!";
return nullptr;
return RowReader();
}
int32_t ver = getSchemaVer(row);
if (ver >= 0) {
auto schema = schemaMan->getEdgeSchema(space, edge, ver);
if (schema == nullptr) {
return nullptr;
return RowReader();
}
return std::unique_ptr<RowReader>(new RowReader(
row,
schema));
return RowReader(row, schema);
} else {
LOG(WARNING) << "Invalid schema version in the row data!";
return nullptr;
return RowReader();
}
}

// static
std::unique_ptr<RowReader> RowReader::getRowReader(
folly::StringPiece row,
std::shared_ptr<const meta::SchemaProviderIf> schema) {
RowReader RowReader::getRowReader(
folly::StringPiece row,
std::shared_ptr<const meta::SchemaProviderIf> schema) {
SchemaVer ver = getSchemaVer(row);
CHECK_EQ(ver, schema->getVersion());
return std::unique_ptr<RowReader>(new RowReader(row, std::move(schema)));
return RowReader(row, std::move(schema));
}


// static
int32_t RowReader::getSchemaVer(folly::StringPiece row) {
const uint8_t* it = reinterpret_cast<const uint8_t*>(row.begin());
Expand Down
60 changes: 54 additions & 6 deletions src/dataman/RowReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,22 +72,31 @@ class RowReader {


public:
static std::unique_ptr<RowReader> getTagPropReader(
RowReader(const RowReader&) = delete;
RowReader& operator=(const RowReader&) = delete;
RowReader(RowReader&& x) = default;
RowReader& operator=(RowReader&& x) = default;

static RowReader getTagPropReader(
meta::SchemaManager* schemaMan,
folly::StringPiece row,
GraphSpaceID space,
TagID tag);

static std::unique_ptr<RowReader> getEdgePropReader(
static RowReader getEdgePropReader(
meta::SchemaManager* schemaMan,
folly::StringPiece row,
GraphSpaceID space,
EdgeType edge);

static std::unique_ptr<RowReader> getRowReader(
static RowReader getRowReader(
folly::StringPiece row,
std::shared_ptr<const meta::SchemaProviderIf> schema);

// Factory for a reader that holds no row: returns a value-initialized
// RowReader. Needed because the default constructor is private, so outside
// code cannot spell `RowReader()` itself.
// NOTE(review): such a reader presumably compares equal to nullptr because
// its data_ has a null data() pointer — confirm against data_'s type.
static RowReader getEmptyRowReader() {
return RowReader();
}

static StatusOr<VariantType> getDefaultProp(const meta::SchemaProviderIf* schema,
const std::string& prop) {
auto& vType = schema->getFieldType(prop);
Expand Down Expand Up @@ -242,9 +251,6 @@ class RowReader {
return ResultType::E_DATA_INVALID;
}
}

virtual ~RowReader() = default;

SchemaVer schemaVer() const noexcept;
int32_t numFields() const noexcept;

Expand Down Expand Up @@ -286,6 +292,47 @@ class RowReader {
return data_;
}

// Boolean test in the style of a smart pointer: true exactly when this
// reader does not compare equal to nullptr.
// NOTE(review): the conversion is implicit (not `explicit`), so a RowReader
// also converts in non-boolean contexts — confirm that is intended.
operator bool() const noexcept {
    const bool isNull = operator==(nullptr);
    return !isNull;
}

// Null comparison in the style of a smart pointer: true when data_.data()
// converts to false, i.e. the reader refers to no row buffer.
bool operator==(std::nullptr_t) const noexcept {
    const bool hasData = static_cast<bool>(data_.data());
    return !hasData;
}

// Equality between two readers. Only the data_ members are compared, using
// data_'s own operator==; no other member takes part in the comparison.
bool operator==(const RowReader& x) const noexcept {
    const auto& lhs = data_;
    const auto& rhs = x.data_;
    return lhs == rhs;
}

// Non-null test in the style of a smart pointer: true when data_.data()
// converts to true, i.e. the reader refers to a row buffer.
bool operator!=(std::nullptr_t) const noexcept {
    if (data_.data()) {
        return true;
    }
    return false;
}

// Inequality between two readers. Only the data_ members are compared,
// using data_'s own operator!=; no other member takes part.
bool operator!=(const RowReader& x) const noexcept {
    const auto& lhs = data_;
    const auto& rhs = x.data_;
    return lhs != rhs;
}

// Pointer-style member access so that `reader->member` works directly on a
// RowReader value. The object itself is the pointee; constness of *this is
// cast away, so a mutable pointer is returned even from a const reader.
RowReader* operator->() const noexcept {
    return const_cast<RowReader*>(this);
}

// Smart-pointer style accessor: the "pointee" is this object itself.
// The const overload casts away constness and hands back a mutable pointer.
RowReader* get() const noexcept {
    auto* self = const_cast<RowReader*>(this);
    return self;
}

// Non-const overload: returns the address of this object unchanged.
RowReader* get() noexcept {
    RowReader* self = this;
    return self;
}

// Pointer-style dereference: yields a mutable reference to this very
// object, even when invoked on a const reader (constness is cast away).
RowReader& operator*() const noexcept {
    RowReader* self = const_cast<RowReader*>(this);
    return *self;
}

// Puts this reader back into the empty state, in place.
// The current object is destroyed explicitly and a fresh RowReader is then
// default-constructed in the same storage with placement new. The order
// matters: the old state must be torn down before the storage is reused.
// NOTE(review): the function is noexcept, so this relies on RowReader()
// never throwing; placement new also needs <new> to be in scope — confirm.
void reset() noexcept {
// Run the destructor on the existing state.
this->~RowReader();
// Rebuild an empty reader at the same address.
new(this) RowReader();
}

// TODO getPath(const std::string& name) const noexcept;
// TODO getPath(int64_t index) const noexcept;
// TODO getList(const std::string& name) const noexcept;
Expand All @@ -308,6 +355,7 @@ class RowReader {
mutable std::vector<int64_t> offsets_;

private:
RowReader() = default;
RowReader(folly::StringPiece row,
std::shared_ptr<const meta::SchemaProviderIf> schema);

Expand Down
3 changes: 2 additions & 1 deletion src/dataman/RowSetReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#ifndef DATAMAN_ROWSETREADER_H_
#define DATAMAN_ROWSETREADER_H_

#include "RowReader.h"
#include "base/Base.h"
#include "gen-cpp2/storage_types.h"
#include "meta/SchemaProviderIf.h"
Expand All @@ -33,7 +34,7 @@ class RowSetReader {
// The total length of the encoded row set
const folly::StringPiece& data_;

std::unique_ptr<RowReader> reader_;
RowReader reader_ = RowReader::getEmptyRowReader();
// The offset of the current row
int64_t offset_;
// The length of the current row
Expand Down
4 changes: 2 additions & 2 deletions src/dataman/RowUpdater.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace nebula {
using folly::hash::SpookyHashV2;
using nebula::meta::SchemaProviderIf;

RowUpdater::RowUpdater(std::unique_ptr<RowReader> reader,
RowUpdater::RowUpdater(RowReader reader,
std::shared_ptr<const meta::SchemaProviderIf> schema)
: schema_(std::move(schema))
, reader_(std::move(reader)) {
Expand All @@ -22,7 +22,7 @@ RowUpdater::RowUpdater(std::unique_ptr<RowReader> reader,

RowUpdater::RowUpdater(std::shared_ptr<const meta::SchemaProviderIf> schema)
: schema_(std::move(schema))
, reader_(nullptr) {
, reader_(RowReader::getEmptyRowReader()) {
CHECK(!!schema_);
}

Expand Down
4 changes: 2 additions & 2 deletions src/dataman/RowUpdater.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class RowUpdater {
// reader holds the original data
// schema is the writer schema, which means the updated data will be encoded
// using this schema
RowUpdater(std::unique_ptr<RowReader> reader,
RowUpdater(RowReader reader,
std::shared_ptr<const meta::SchemaProviderIf> schema);
explicit RowUpdater(std::shared_ptr<const meta::SchemaProviderIf> schema);

Expand Down Expand Up @@ -79,7 +79,7 @@ class RowUpdater {

private:
std::shared_ptr<const meta::SchemaProviderIf> schema_;
std::unique_ptr<RowReader> reader_;
RowReader reader_ = RowReader::getEmptyRowReader();
// Hash64(field_name) => value
std::unordered_map<uint64_t, FieldValue> updatedFields_;
};
Expand Down
4 changes: 2 additions & 2 deletions src/graph/FetchVerticesExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ void FetchVerticesExecutor::processResult(RpcResponse &&result) {
finishExecution(std::move(rsWriter));
return;
}
std::unordered_map<VertexID, std::map<TagID, std::unique_ptr<RowReader>>> dataMap;
std::unordered_map<VertexID, std::map<TagID, RowReader>> dataMap;
dataMap.reserve(num);

std::unordered_map<TagID, std::shared_ptr<const meta::SchemaProviderIf>> tagSchemaMap;
Expand Down Expand Up @@ -407,7 +407,7 @@ void FetchVerticesExecutor::processResult(RpcResponse &&result) {
}
auto vschema = tagSchemaMap[tagId];
auto vreader = RowReader::getRowReader(data, vschema);
dataMap[vid][tagId] = std::move(vreader);
dataMap[vid].emplace(std::make_pair(tagId, std::move(vreader)));
tagIdSet.insert(tagId);
}
}
Expand Down
38 changes: 17 additions & 21 deletions src/graph/GoExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,7 @@ void GoExecutor::fetchVertexProps(std::vector<VertexID> ids) {
auto returns = status.value();
auto future = ectx()->getStorageClient()->getVertexProps(spaceId, ids, returns);
auto *runner = ectx()->rctx()->runner();
auto cb = [this] (auto &&result) mutable {
auto cb = [this, ectx = ectx()] (auto &&result) mutable {
auto completeness = result.completeness();
if (completeness == 0) {
doError(Status::Error("Get dest props failed"));
Expand All @@ -939,7 +939,7 @@ void GoExecutor::fetchVertexProps(std::vector<VertexID> ids) {
}
}
if (vertexHolder_ == nullptr) {
vertexHolder_ = std::make_unique<VertexHolder>();
vertexHolder_ = std::make_unique<VertexHolder>(ectx);
}
for (auto &resp : result.responses()) {
vertexHolder_->add(resp);
Expand Down Expand Up @@ -1222,7 +1222,7 @@ bool GoExecutor::processFinalResult(Callback cb) const {
return index_->getColumnWithRow(*inputRow, prop);
};

std::unique_ptr<RowReader> reader;
RowReader reader = RowReader::getEmptyRowReader();
if (currEdgeSchema) {
reader = RowReader::getRowReader(edge.props, currEdgeSchema);
}
Expand Down Expand Up @@ -1315,28 +1315,24 @@ bool GoExecutor::processFinalResult(Callback cb) const {
return true;
}

OptVariantType GoExecutor::VertexHolder::getDefaultProp(TagID tid, const std::string &prop) const {
for (auto it = data_.cbegin(); it != data_.cend(); ++it) {
auto it2 = it->second.find(tid);
if (it2 != it->second.cend()) {
return RowReader::getDefaultProp(std::get<0>(it2->second).get(), prop);
}
OptVariantType GoExecutor::VertexHolder::getDefaultProp(
TagID tagId, const std::string &prop) const {
auto space = ectx_->rctx()->session()->space();
auto schema = ectx_->schemaManager()->getTagSchema(space, tagId);
if (schema == nullptr) {
return Status::Error("No tag schema for tagId %d", tagId);
}


return Status::Error("Unknown property: `%s'", prop.c_str());
return RowReader::getDefaultProp(schema.get(), prop);
}

SupportedType GoExecutor::VertexHolder::getDefaultPropType(TagID tid,
const std::string &prop) const {
for (auto it = data_.cbegin(); it != data_.cend(); ++it) {
auto it2 = it->second.find(tid);
if (it2 != it->second.cend()) {
return std::get<0>(it2->second)->getFieldType(prop).type;
}
SupportedType GoExecutor::VertexHolder::getDefaultPropType(
TagID tagId, const std::string &prop) const {
auto space = ectx_->rctx()->session()->space();
auto schema = ectx_->schemaManager()->getTagSchema(space, tagId);
if (schema == nullptr) {
return nebula::cpp2::SupportedType::UNKNOWN;
}

return nebula::cpp2::SupportedType::UNKNOWN;
return schema->getFieldType(prop).type;
}

OptVariantType GoExecutor::VertexHolder::get(VertexID id, TagID tid,
Expand Down
2 changes: 2 additions & 0 deletions src/graph/GoExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ class GoExecutor final : public TraverseExecutor {
*/
class VertexHolder final {
public:
explicit VertexHolder(ExecutionContext* ectx) : ectx_(ectx) { }
OptVariantType getDefaultProp(TagID tid, const std::string &prop) const;
OptVariantType get(VertexID id, TagID tid, const std::string &prop) const;
void add(const storage::cpp2::QueryResponse &resp);
Expand All @@ -184,6 +185,7 @@ class GoExecutor final : public TraverseExecutor {
private:
using VData = std::tuple<std::shared_ptr<ResultSchemaProvider>, std::string>;
std::unordered_map<VertexID, std::unordered_map<TagID, VData>> data_;
ExecutionContext* ectx_{nullptr};
};

class VertexBackTracker final {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/mutate/AddEdgesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ std::string AddEdgesProcessor::addEdges(int64_t version, PartitionID partId,
});
for (auto& e : newEdges) {
std::string val;
std::unique_ptr<RowReader> nReader;
RowReader nReader = RowReader::getEmptyRowReader();
auto edgeType = NebulaKeyUtils::getEdgeType(e.first);
for (auto& index : indexes_) {
if (edgeType == index->get_schema_id().get_edge_type()) {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/mutate/AddVerticesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ std::string AddVerticesProcessor::addVertices(int64_t version, PartitionID partI

for (auto& v : newVertices) {
std::string val;
std::unique_ptr<RowReader> nReader;
RowReader nReader = RowReader::getEmptyRowReader();
auto tagId = NebulaKeyUtils::getTagId(v.first);
auto vId = NebulaKeyUtils::getVertexId(v.first);
for (auto& index : indexes_) {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/mutate/DeleteEdgesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ DeleteEdgesProcessor::deleteEdges(GraphSpaceID spaceId,
* just get the latest version edge for index.
*/
if (isLatestVE) {
std::unique_ptr<RowReader> reader;
RowReader reader = RowReader::getEmptyRowReader();
for (auto& index : indexes_) {
auto indexId = index->get_index_id();
if (type == index->get_schema_id().get_edge_type()) {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/mutate/DeleteVerticesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ DeleteVerticesProcessor::deleteVertices(GraphSpaceID spaceId,
* Using newlyVertexId to identify if it is the latest version
*/
if (latestVVId != tagId) {
std::unique_ptr<RowReader> reader;
RowReader reader = RowReader::getEmptyRowReader();
for (auto& index : indexes_) {
auto indexId = index->get_index_id();
if (index->get_schema_id().get_tag_id() == tagId) {
Expand Down
3 changes: 2 additions & 1 deletion src/storage/mutate/UpdateEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,8 @@ folly::Optional<std::string> UpdateEdgeProcessor::updateAndWriteBack(PartitionID
auto nVal = std::move(status.value());
// TODO(heng) we don't update the index for reverse edge.
if (!indexes_.empty() && edgeKey.edge_type > 0) {
std::unique_ptr<RowReader> reader, rReader;
RowReader reader = RowReader::getEmptyRowReader();
RowReader rReader = RowReader::getEmptyRowReader();
for (auto& index : indexes_) {
auto indexId = index->get_index_id();
if (index->get_schema_id().get_edge_type() == edgeKey.edge_type) {
Expand Down
3 changes: 2 additions & 1 deletion src/storage/mutate/UpdateVertexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,8 @@ folly::Optional<std::string> UpdateVertexProcessor::updateAndWriteBack(const Par
}
auto nVal = std::move(status.value());
if (!indexes_.empty()) {
std::unique_ptr<RowReader> reader, oReader;
RowReader reader = RowReader::getEmptyRowReader();
RowReader oReader = RowReader::getEmptyRowReader();
for (auto &index : indexes_) {
if (index->get_schema_id().get_tag_id() == u.first) {
if (!(u.second->kv.second.empty())) {
Expand Down
Loading

0 comments on commit 95abc21

Please sign in to comment.