Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix VertexHolder::getDefaultProp performance issue. #2249

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 18 additions & 25 deletions src/dataman/RowReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,67 +113,60 @@ RowReader::Iterator::operator bool() const {
* class RowReader
*
********************************************/
// static
std::unique_ptr<RowReader> RowReader::getTagPropReader(
RowReader RowReader::getTagPropReader(
meta::SchemaManager* schemaMan,
folly::StringPiece row,
GraphSpaceID space,
TagID tag) {
if (schemaMan == nullptr) {
LOG(ERROR) << "schemaMan should not be nullptr!";
return nullptr;
return RowReader();
}
int32_t ver = getSchemaVer(row);
if (ver >= 0) {
auto schema = schemaMan->getTagSchema(space, tag, ver);
if (schema == nullptr) {
return nullptr;
return RowReader();
}
return std::unique_ptr<RowReader>(new RowReader(
row,
schema));
return RowReader(row, schema);
} else {
LOG(WARNING) << "Invalid schema version in the row data!";
return nullptr;
return RowReader();
}
}


// static
std::unique_ptr<RowReader> RowReader::getEdgePropReader(
meta::SchemaManager* schemaMan,
folly::StringPiece row,
GraphSpaceID space,
EdgeType edge) {
RowReader RowReader::getEdgePropReader(
meta::SchemaManager* schemaMan,
folly::StringPiece row,
GraphSpaceID space,
EdgeType edge) {
if (schemaMan == nullptr) {
LOG(ERROR) << "schemaMan should not be nullptr!";
return nullptr;
return RowReader();
}
int32_t ver = getSchemaVer(row);
if (ver >= 0) {
auto schema = schemaMan->getEdgeSchema(space, edge, ver);
if (schema == nullptr) {
return nullptr;
return RowReader();
}
return std::unique_ptr<RowReader>(new RowReader(
row,
schema));
return RowReader(row, schema);
} else {
LOG(WARNING) << "Invalid schema version in the row data!";
return nullptr;
return RowReader();
}
}

// static
std::unique_ptr<RowReader> RowReader::getRowReader(
folly::StringPiece row,
std::shared_ptr<const meta::SchemaProviderIf> schema) {
RowReader RowReader::getRowReader(
folly::StringPiece row,
std::shared_ptr<const meta::SchemaProviderIf> schema) {
SchemaVer ver = getSchemaVer(row);
CHECK_EQ(ver, schema->getVersion());
return std::unique_ptr<RowReader>(new RowReader(row, std::move(schema)));
return RowReader(row, std::move(schema));
}


// static
int32_t RowReader::getSchemaVer(folly::StringPiece row) {
const uint8_t* it = reinterpret_cast<const uint8_t*>(row.begin());
Expand Down
60 changes: 54 additions & 6 deletions src/dataman/RowReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,22 +72,31 @@ class RowReader {


public:
static std::unique_ptr<RowReader> getTagPropReader(
RowReader(const RowReader&) = delete;
RowReader& operator=(const RowReader&) = delete;
RowReader(RowReader&& x) = default;
RowReader& operator=(RowReader&& x) = default;

static RowReader getTagPropReader(
meta::SchemaManager* schemaMan,
folly::StringPiece row,
GraphSpaceID space,
TagID tag);

static std::unique_ptr<RowReader> getEdgePropReader(
static RowReader getEdgePropReader(
meta::SchemaManager* schemaMan,
folly::StringPiece row,
GraphSpaceID space,
EdgeType edge);

static std::unique_ptr<RowReader> getRowReader(
static RowReader getRowReader(
folly::StringPiece row,
std::shared_ptr<const meta::SchemaProviderIf> schema);

static RowReader getEmptyRowReader() {
return RowReader();
}

static StatusOr<VariantType> getDefaultProp(const meta::SchemaProviderIf* schema,
const std::string& prop) {
auto& vType = schema->getFieldType(prop);
Expand Down Expand Up @@ -242,9 +251,6 @@ class RowReader {
return ResultType::E_DATA_INVALID;
}
}

virtual ~RowReader() = default;

SchemaVer schemaVer() const noexcept;
int32_t numFields() const noexcept;

Expand Down Expand Up @@ -286,6 +292,47 @@ class RowReader {
return data_;
}

operator bool() const noexcept {
return operator!=(nullptr);
}

bool operator==(std::nullptr_t) const noexcept {
return !data_.data();
}

bool operator==(const RowReader& x) const noexcept {
return data_ == x.data_;
}

bool operator!=(std::nullptr_t) const noexcept {
return static_cast<bool>(data_.data());
}

bool operator!=(const RowReader& x) const noexcept {
return data_ != x.data_;
}

RowReader* operator->() const noexcept {
return get();
}

RowReader* get() const noexcept {
return const_cast<RowReader*>(this);
}

RowReader* get() noexcept {
return this;
}

RowReader& operator*() const noexcept {
return *get();
}

void reset() noexcept {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we dont need reset any more. But never mind, leave it here.

this->~RowReader();
new(this) RowReader();
}

// TODO getPath(const std::string& name) const noexcept;
// TODO getPath(int64_t index) const noexcept;
// TODO getList(const std::string& name) const noexcept;
Expand All @@ -308,6 +355,7 @@ class RowReader {
mutable std::vector<int64_t> offsets_;

private:
RowReader() = default;
RowReader(folly::StringPiece row,
std::shared_ptr<const meta::SchemaProviderIf> schema);

Expand Down
3 changes: 2 additions & 1 deletion src/dataman/RowSetReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#ifndef DATAMAN_ROWSETREADER_H_
#define DATAMAN_ROWSETREADER_H_

#include "RowReader.h"
#include "base/Base.h"
#include "gen-cpp2/storage_types.h"
#include "meta/SchemaProviderIf.h"
Expand All @@ -33,7 +34,7 @@ class RowSetReader {
// The total length of the encoded row set
const folly::StringPiece& data_;

std::unique_ptr<RowReader> reader_;
RowReader reader_ = RowReader::getEmptyRowReader();
// The offset of the current row
int64_t offset_;
// The length of the current row
Expand Down
4 changes: 2 additions & 2 deletions src/dataman/RowUpdater.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace nebula {
using folly::hash::SpookyHashV2;
using nebula::meta::SchemaProviderIf;

RowUpdater::RowUpdater(std::unique_ptr<RowReader> reader,
RowUpdater::RowUpdater(RowReader reader,
std::shared_ptr<const meta::SchemaProviderIf> schema)
: schema_(std::move(schema))
, reader_(std::move(reader)) {
Expand All @@ -22,7 +22,7 @@ RowUpdater::RowUpdater(std::unique_ptr<RowReader> reader,

RowUpdater::RowUpdater(std::shared_ptr<const meta::SchemaProviderIf> schema)
: schema_(std::move(schema))
, reader_(nullptr) {
, reader_(RowReader::getEmptyRowReader()) {
CHECK(!!schema_);
}

Expand Down
4 changes: 2 additions & 2 deletions src/dataman/RowUpdater.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class RowUpdater {
// reader holds the original data
// schema is the writer schema, which means the updated data will be encoded
// using this schema
RowUpdater(std::unique_ptr<RowReader> reader,
RowUpdater(RowReader reader,
std::shared_ptr<const meta::SchemaProviderIf> schema);
explicit RowUpdater(std::shared_ptr<const meta::SchemaProviderIf> schema);

Expand Down Expand Up @@ -79,7 +79,7 @@ class RowUpdater {

private:
std::shared_ptr<const meta::SchemaProviderIf> schema_;
std::unique_ptr<RowReader> reader_;
RowReader reader_ = RowReader::getEmptyRowReader();
// Hash64(field_name) => value
std::unordered_map<uint64_t, FieldValue> updatedFields_;
};
Expand Down
4 changes: 2 additions & 2 deletions src/graph/FetchVerticesExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ void FetchVerticesExecutor::processResult(RpcResponse &&result) {
finishExecution(std::move(rsWriter));
return;
}
std::unordered_map<VertexID, std::map<TagID, std::unique_ptr<RowReader>>> dataMap;
std::unordered_map<VertexID, std::map<TagID, RowReader>> dataMap;
dataMap.reserve(num);

std::unordered_map<TagID, std::shared_ptr<const meta::SchemaProviderIf>> tagSchemaMap;
Expand Down Expand Up @@ -407,7 +407,7 @@ void FetchVerticesExecutor::processResult(RpcResponse &&result) {
}
auto vschema = tagSchemaMap[tagId];
auto vreader = RowReader::getRowReader(data, vschema);
dataMap[vid][tagId] = std::move(vreader);
dataMap[vid].emplace(std::make_pair(tagId, std::move(vreader)));
tagIdSet.insert(tagId);
}
}
Expand Down
38 changes: 17 additions & 21 deletions src/graph/GoExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,7 @@ void GoExecutor::fetchVertexProps(std::vector<VertexID> ids) {
auto returns = status.value();
auto future = ectx()->getStorageClient()->getVertexProps(spaceId, ids, returns);
auto *runner = ectx()->rctx()->runner();
auto cb = [this] (auto &&result) mutable {
auto cb = [this, ectx = ectx()] (auto &&result) mutable {
auto completeness = result.completeness();
if (completeness == 0) {
doError(Status::Error("Get dest props failed"));
Expand All @@ -939,7 +939,7 @@ void GoExecutor::fetchVertexProps(std::vector<VertexID> ids) {
}
}
if (vertexHolder_ == nullptr) {
vertexHolder_ = std::make_unique<VertexHolder>();
vertexHolder_ = std::make_unique<VertexHolder>(ectx);
}
for (auto &resp : result.responses()) {
vertexHolder_->add(resp);
Expand Down Expand Up @@ -1222,7 +1222,7 @@ bool GoExecutor::processFinalResult(Callback cb) const {
return index_->getColumnWithRow(*inputRow, prop);
};

std::unique_ptr<RowReader> reader;
RowReader reader = RowReader::getEmptyRowReader();
if (currEdgeSchema) {
reader = RowReader::getRowReader(edge.props, currEdgeSchema);
}
Expand Down Expand Up @@ -1315,28 +1315,24 @@ bool GoExecutor::processFinalResult(Callback cb) const {
return true;
}

OptVariantType GoExecutor::VertexHolder::getDefaultProp(TagID tid, const std::string &prop) const {
for (auto it = data_.cbegin(); it != data_.cend(); ++it) {
auto it2 = it->second.find(tid);
if (it2 != it->second.cend()) {
return RowReader::getDefaultProp(std::get<0>(it2->second).get(), prop);
}
OptVariantType GoExecutor::VertexHolder::getDefaultProp(
Copy link
Contributor

@dangleptr dangleptr Jul 27, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, we don't need "getDefaultProp" and "getDefaultPropType" inside VertexHolder,

You could use RowReader::getDefaultProp directly.

FYI. You could pass schemaManager into VertexHolder directly.

TagID tagId, const std::string &prop) const {
auto space = ectx_->rctx()->session()->space();
auto schema = ectx_->schemaManager()->getTagSchema(space, tagId);
if (schema == nullptr) {
return Status::Error("No tag schema for tagId %d", tagId);
}


return Status::Error("Unknown property: `%s'", prop.c_str());
return RowReader::getDefaultProp(schema.get(), prop);
}

SupportedType GoExecutor::VertexHolder::getDefaultPropType(TagID tid,
const std::string &prop) const {
for (auto it = data_.cbegin(); it != data_.cend(); ++it) {
auto it2 = it->second.find(tid);
if (it2 != it->second.cend()) {
return std::get<0>(it2->second)->getFieldType(prop).type;
}
SupportedType GoExecutor::VertexHolder::getDefaultPropType(
TagID tagId, const std::string &prop) const {
auto space = ectx_->rctx()->session()->space();
auto schema = ectx_->schemaManager()->getTagSchema(space, tagId);
if (schema == nullptr) {
return nebula::cpp2::SupportedType::UNKNOWN;
}

return nebula::cpp2::SupportedType::UNKNOWN;
return schema->getFieldType(prop).type;
}

OptVariantType GoExecutor::VertexHolder::get(VertexID id, TagID tid,
Expand Down
2 changes: 2 additions & 0 deletions src/graph/GoExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ class GoExecutor final : public TraverseExecutor {
*/
class VertexHolder final {
public:
explicit VertexHolder(ExecutionContext* ectx) : ectx_(ectx) { }
OptVariantType getDefaultProp(TagID tid, const std::string &prop) const;
OptVariantType get(VertexID id, TagID tid, const std::string &prop) const;
void add(const storage::cpp2::QueryResponse &resp);
Expand All @@ -184,6 +185,7 @@ class GoExecutor final : public TraverseExecutor {
private:
using VData = std::tuple<std::shared_ptr<ResultSchemaProvider>, std::string>;
std::unordered_map<VertexID, std::unordered_map<TagID, VData>> data_;
ExecutionContext* ectx_{nullptr};
};

class VertexBackTracker final {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/mutate/AddEdgesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ std::string AddEdgesProcessor::addEdges(int64_t version, PartitionID partId,
});
for (auto& e : newEdges) {
std::string val;
std::unique_ptr<RowReader> nReader;
RowReader nReader = RowReader::getEmptyRowReader();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'd better use pointer of RowReader this place. Because there are lots of check null for rowReader in the code.

Copy link
Collaborator Author

@xuguruogu xuguruogu Jul 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bool operator==(nullptr_t) const noexcept {
return !data_.data();
}
bool operator==(const RowReader& x) const noexcept {
return data_ == x.data_;
}
bool operator!=(nullptr_t) const noexcept {
return (bool)data_.data();
}
bool operator!=(const RowReader& x) const noexcept {
return data_ != x.data_;
}

Solve it with CPP magic. This can really help to improve performance, the total batch computing costs reduces from 13min to 5min, for about 2.6X improvement.

Access data set from the stack is much faster than from heap, for CPU hardware cache optimization.

auto edgeType = NebulaKeyUtils::getEdgeType(e.first);
for (auto& index : indexes_) {
if (edgeType == index->get_schema_id().get_edge_type()) {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/mutate/AddVerticesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ std::string AddVerticesProcessor::addVertices(int64_t version, PartitionID partI

for (auto& v : newVertices) {
std::string val;
std::unique_ptr<RowReader> nReader;
RowReader nReader = RowReader::getEmptyRowReader();
auto tagId = NebulaKeyUtils::getTagId(v.first);
auto vId = NebulaKeyUtils::getVertexId(v.first);
for (auto& index : indexes_) {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/mutate/DeleteEdgesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ DeleteEdgesProcessor::deleteEdges(GraphSpaceID spaceId,
* just get the latest version edge for index.
*/
if (isLatestVE) {
std::unique_ptr<RowReader> reader;
RowReader reader = RowReader::getEmptyRowReader();
for (auto& index : indexes_) {
auto indexId = index->get_index_id();
if (type == index->get_schema_id().get_edge_type()) {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/mutate/DeleteVerticesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ DeleteVerticesProcessor::deleteVertices(GraphSpaceID spaceId,
* Using newlyVertexId to identify if it is the latest version
*/
if (latestVVId != tagId) {
std::unique_ptr<RowReader> reader;
RowReader reader = RowReader::getEmptyRowReader();
for (auto& index : indexes_) {
auto indexId = index->get_index_id();
if (index->get_schema_id().get_tag_id() == tagId) {
Expand Down
3 changes: 2 additions & 1 deletion src/storage/mutate/UpdateEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,8 @@ folly::Optional<std::string> UpdateEdgeProcessor::updateAndWriteBack(PartitionID
auto nVal = std::move(status.value());
// TODO(heng) we don't update the index for reverse edge.
if (!indexes_.empty() && edgeKey.edge_type > 0) {
std::unique_ptr<RowReader> reader, rReader;
RowReader reader = RowReader::getEmptyRowReader();
RowReader rReader = RowReader::getEmptyRowReader();
for (auto& index : indexes_) {
auto indexId = index->get_index_id();
if (index->get_schema_id().get_edge_type() == edgeKey.edge_type) {
Expand Down
3 changes: 2 additions & 1 deletion src/storage/mutate/UpdateVertexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,8 @@ folly::Optional<std::string> UpdateVertexProcessor::updateAndWriteBack(const Par
}
auto nVal = std::move(status.value());
if (!indexes_.empty()) {
std::unique_ptr<RowReader> reader, oReader;
RowReader reader = RowReader::getEmptyRowReader();
RowReader oReader = RowReader::getEmptyRowReader();
for (auto &index : indexes_) {
if (index->get_schema_id().get_tag_id() == u.first) {
if (!(u.second->kv.second.empty())) {
Expand Down
Loading