Skip to content
This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

Tuning GetNeighbors perf #103

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion resources/gflags.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
{
"MUTABLE": [
"max_edge_returned_per_vertex",
"minloglevel",
"v",
"heartbeat_interval_secs",
Expand Down
2 changes: 1 addition & 1 deletion src/codec/NebulaCodecImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ NebulaCodecImpl::decode(std::string encoded,

folly::StringPiece piece;
ResultType code;
auto reader = RowReader::getRowReader(encoded, schema);
auto reader = RowReaderWrapper::getRowReader(encoded, schema);
std::unordered_map<std::string, Value> result;
for (size_t index = 0; index < schema->getNumFields(); index++) {
auto field = schema->getFieldName(index);
Expand Down
173 changes: 0 additions & 173 deletions src/codec/RowReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,6 @@ Value RowReader::Cell::value() const noexcept {
* class RowReader::Iterator
*
********************************************/
RowReader::Iterator::Iterator(Iterator&& iter)
: reader_(iter.reader_)
, cell_(std::move(iter.cell_))
, index_(iter.index_) {
}


void RowReader::Iterator::operator=(Iterator&& rhs) {
reader_ = rhs.reader_;
cell_ = std::move(rhs.cell_);
index_ = rhs.index_;
}


bool RowReader::Iterator::operator==(const Iterator& rhs) const noexcept {
return reader_ == rhs.reader_ && index_ == rhs.index_;
Expand Down Expand Up @@ -68,166 +55,6 @@ RowReader::Iterator& RowReader::Iterator::operator++() {
*
********************************************/

// static
std::unique_ptr<RowReader> RowReader::getTagPropReader(
meta::SchemaManager* schemaMan,
GraphSpaceID space,
TagID tag,
folly::StringPiece row) {
auto reader = std::make_unique<RowReaderWrapper>();
if (reader->resetTagPropReader(schemaMan, space, tag, row)) {
return reader;
}
LOG(ERROR) << "Failed to initiate the reader, most likely the data"
"is corrupted. The data is ["
<< toHexStr(row)
<< "]";
return std::unique_ptr<RowReader>();
}


// static
std::unique_ptr<RowReader> RowReader::getEdgePropReader(
meta::SchemaManager* schemaMan,
GraphSpaceID space,
EdgeType edge,
folly::StringPiece row) {
auto reader = std::make_unique<RowReaderWrapper>();
if (reader->resetEdgePropReader(schemaMan, space, edge, row)) {
return reader;
}
LOG(ERROR) << "Failed to initiate the reader, most likely the data"
"is corrupted. The data is ["
<< toHexStr(row)
<< "]";
return std::unique_ptr<RowReader>();
}

// static
std::unique_ptr<RowReader> RowReader::getRowReader(
const meta::SchemaProviderIf* schema,
folly::StringPiece row) {
auto reader = std::make_unique<RowReaderWrapper>();
SchemaVer schemaVer;
int32_t readerVer;
RowReaderWrapper::getVersions(row, schemaVer, readerVer);
if (schemaVer != schema->getVersion()) {
return std::unique_ptr<RowReader>();
}
if (reader->reset(schema, row, readerVer)) {
return reader;
} else {
LOG(ERROR) << "Failed to initiate the reader, most likely the data"
"is corrupted. The data is ["
<< toHexStr(row)
<< "]";
return std::unique_ptr<RowReader>();
}
}

// static
std::unique_ptr<RowReader> RowReader::getRowReader(
const std::vector<std::shared_ptr<const meta::NebulaSchemaProvider>>& schemas,
folly::StringPiece row) {
auto reader = std::make_unique<RowReaderWrapper>();
SchemaVer schemaVer;
int32_t readerVer;
RowReaderWrapper::getVersions(row, schemaVer, readerVer);
if (static_cast<size_t>(schemaVer) >= schemas.size()) {
return std::unique_ptr<RowReader>();
}
// the schema is stored from oldest to newest, so just use version as idx
if (schemaVer != schemas[schemaVer]->getVersion()) {
return std::unique_ptr<RowReader>();
}
if (reader->reset(schemas[schemaVer].get(), row, readerVer)) {
return reader;
} else {
LOG(ERROR) << "Failed to initiate the reader, most likely the data"
"is corrupted. The data is ["
<< toHexStr(row)
<< "]";
return std::unique_ptr<RowReader>();
}
}

bool RowReader::resetTagPropReader(
meta::SchemaManager* schemaMan,
GraphSpaceID space,
TagID tag,
folly::StringPiece row) {
if (schemaMan == nullptr) {
LOG(ERROR) << "schemaMan should not be nullptr!";
return false;
}
SchemaVer schemaVer;
int32_t readerVer;
RowReaderWrapper::getVersions(row, schemaVer, readerVer);
if (schemaVer >= 0) {
auto schema = schemaMan->getTagSchema(space, tag, schemaVer);
if (schema == nullptr) {
return false;
}
return reset(schema.get(), row, readerVer);
} else {
LOG(WARNING) << "Invalid schema version in the row data!";
return false;
}
}

bool RowReader::resetEdgePropReader(
meta::SchemaManager* schemaMan,
GraphSpaceID space,
EdgeType edge,
folly::StringPiece row) {
if (schemaMan == nullptr) {
LOG(ERROR) << "schemaMan should not be nullptr!";
return false;
}
SchemaVer schemaVer;
int32_t readerVer;
RowReaderWrapper::getVersions(row, schemaVer, readerVer);
if (schemaVer >= 0) {
auto schema = schemaMan->getEdgeSchema(space, edge, schemaVer);
if (schema == nullptr) {
return false;
}
return reset(schema.get(), row, readerVer);
} else {
LOG(WARNING) << "Invalid schema version in the row data!";
return false;
}
}

bool RowReader::reset(meta::SchemaProviderIf const* schema,
folly::StringPiece row) noexcept {
if (schema == nullptr) {
return false;
}
SchemaVer schemaVer;
int32_t readerVer;
RowReaderWrapper::getVersions(row, schemaVer, readerVer);
if (schemaVer != schema->getVersion()) {
return false;
}
return reset(schema, row, readerVer);
}

bool RowReader::reset(const std::vector<std::shared_ptr<const meta::NebulaSchemaProvider>>& schemas,
folly::StringPiece row) noexcept {
SchemaVer schemaVer;
int32_t readerVer;
RowReaderWrapper::getVersions(row, schemaVer, readerVer);
if (static_cast<size_t>(schemaVer) >= schemas.size()) {
return false;
}
// the schema is stored from oldest to newest, so just use version as idx
if (schemaVer != schemas[schemaVer]->getVersion()) {
return false;
}
return reset(schemas[schemaVer].get(), row, readerVer);
}

bool RowReader::resetImpl(meta::SchemaProviderIf const* schema,
folly::StringPiece row) noexcept {
schema_ = schema;
Expand Down
50 changes: 0 additions & 50 deletions src/codec/RowReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@ class RowReader {
friend class Cell;
friend class RowReader;
public:
Iterator(Iterator&& iter);

void operator=(Iterator&& rhs);

const Cell& operator*() const noexcept;
const Cell* operator->() const noexcept;

Expand Down Expand Up @@ -70,48 +66,6 @@ class RowReader {


public:
static std::unique_ptr<RowReader> getTagPropReader(
meta::SchemaManager* schemaMan,
GraphSpaceID space,
TagID tag,
folly::StringPiece row);

static std::unique_ptr<RowReader> getEdgePropReader(
meta::SchemaManager* schemaMan,
GraphSpaceID space,
EdgeType edge,
folly::StringPiece row);

static std::unique_ptr<RowReader> getRowReader(
meta::SchemaProviderIf const* schema,
folly::StringPiece row);

// notice: the schemas are from oldest to newest,
// usually from getAllVerTagSchema or getAllVerEdgeSchema in SchemaMan
static std::unique_ptr<RowReader> getRowReader(
const std::vector<std::shared_ptr<const meta::NebulaSchemaProvider>>& schemas,
folly::StringPiece row);

bool resetTagPropReader(
meta::SchemaManager* schemaMan,
GraphSpaceID space,
TagID tag,
folly::StringPiece row);

bool resetEdgePropReader(
meta::SchemaManager* schemaMan,
GraphSpaceID space,
EdgeType edge,
folly::StringPiece row);

bool reset(meta::SchemaProviderIf const* schema,
folly::StringPiece row) noexcept;

// notice: the schemas are from oldest to newest,
// usually from getAllVerTagSchema or getAllVerEdgeSchema in SchemaMan
bool reset(const std::vector<std::shared_ptr<const meta::NebulaSchemaProvider>>& schemas,
folly::StringPiece row) noexcept;

virtual ~RowReader() = default;

virtual Value getValueByName(const std::string& prop) const noexcept = 0;
Expand Down Expand Up @@ -156,10 +110,6 @@ class RowReader {
virtual bool resetImpl(meta::SchemaProviderIf const* schema,
folly::StringPiece row) noexcept;

virtual bool reset(meta::SchemaProviderIf const* schema,
folly::StringPiece row,
int32_t readerVer) noexcept = 0;

private:
Iterator endIter_;
};
Expand Down
1 change: 0 additions & 1 deletion src/codec/RowReaderV1.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ bool RowReaderV1::resetImpl(meta::SchemaProviderIf const* schema,
}
}


bool RowReaderV1::processHeader(folly::StringPiece row) {
const uint8_t* it = reinterpret_cast<const uint8_t*>(row.begin());
if (reinterpret_cast<const char*>(it) == row.end()) {
Expand Down
4 changes: 0 additions & 4 deletions src/codec/RowReaderV1.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@ class RowReaderV1 : public RowReader {
return headerLen_;
}

bool reset(meta::SchemaProviderIf const*, folly::StringPiece, int32_t) noexcept override {
LOG(FATAL) << "Not implemented";
}

protected:
bool resetImpl(meta::SchemaProviderIf const* schema, folly::StringPiece row)
noexcept override;
Expand Down
4 changes: 0 additions & 4 deletions src/codec/RowReaderV2.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@ class RowReaderV2 : public RowReader {
return headerLen_;
}

bool reset(meta::SchemaProviderIf const*, folly::StringPiece, int32_t) noexcept override {
LOG(FATAL) << "Not implemented";
}

protected:
bool resetImpl(meta::SchemaProviderIf const* schema, folly::StringPiece row)
noexcept override;
Expand Down
Loading