Skip to content

Commit

Permalink
rewrite row reader to avoid malloc
Browse files Browse the repository at this point in the history
  • Loading branch information
xuguruogu committed Jul 29, 2020
1 parent 2c64389 commit bf264be
Show file tree
Hide file tree
Showing 17 changed files with 98 additions and 56 deletions.
43 changes: 18 additions & 25 deletions src/dataman/RowReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,67 +113,60 @@ RowReader::Iterator::operator bool() const {
* class RowReader
*
********************************************/
// static
std::unique_ptr<RowReader> RowReader::getTagPropReader(
RowReader RowReader::getTagPropReader(
meta::SchemaManager* schemaMan,
folly::StringPiece row,
GraphSpaceID space,
TagID tag) {
if (schemaMan == nullptr) {
LOG(ERROR) << "schemaMan should not be nullptr!";
return nullptr;
return RowReader();
}
int32_t ver = getSchemaVer(row);
if (ver >= 0) {
auto schema = schemaMan->getTagSchema(space, tag, ver);
if (schema == nullptr) {
return nullptr;
return RowReader();
}
return std::unique_ptr<RowReader>(new RowReader(
row,
schema));
return RowReader(row, schema);
} else {
LOG(WARNING) << "Invalid schema version in the row data!";
return nullptr;
return RowReader();
}
}


// static
std::unique_ptr<RowReader> RowReader::getEdgePropReader(
meta::SchemaManager* schemaMan,
folly::StringPiece row,
GraphSpaceID space,
EdgeType edge) {
RowReader RowReader::getEdgePropReader(
meta::SchemaManager* schemaMan,
folly::StringPiece row,
GraphSpaceID space,
EdgeType edge) {
if (schemaMan == nullptr) {
LOG(ERROR) << "schemaMan should not be nullptr!";
return nullptr;
return RowReader();
}
int32_t ver = getSchemaVer(row);
if (ver >= 0) {
auto schema = schemaMan->getEdgeSchema(space, edge, ver);
if (schema == nullptr) {
return nullptr;
return RowReader();
}
return std::unique_ptr<RowReader>(new RowReader(
row,
schema));
return RowReader(row, schema);
} else {
LOG(WARNING) << "Invalid schema version in the row data!";
return nullptr;
return RowReader();
}
}

// static
std::unique_ptr<RowReader> RowReader::getRowReader(
folly::StringPiece row,
std::shared_ptr<const meta::SchemaProviderIf> schema) {
RowReader RowReader::getRowReader(
folly::StringPiece row,
std::shared_ptr<const meta::SchemaProviderIf> schema) {
SchemaVer ver = getSchemaVer(row);
CHECK_EQ(ver, schema->getVersion());
return std::unique_ptr<RowReader>(new RowReader(row, std::move(schema)));
return RowReader(row, std::move(schema));
}


// static
int32_t RowReader::getSchemaVer(folly::StringPiece row) {
const uint8_t* it = reinterpret_cast<const uint8_t*>(row.begin());
Expand Down
60 changes: 54 additions & 6 deletions src/dataman/RowReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,22 +72,31 @@ class RowReader {


public:
static std::unique_ptr<RowReader> getTagPropReader(
RowReader(const RowReader&) = delete;
RowReader& operator=(const RowReader&) = delete;
RowReader(RowReader&& x) = default;
RowReader& operator=(RowReader&& x) = default;

static RowReader getTagPropReader(
meta::SchemaManager* schemaMan,
folly::StringPiece row,
GraphSpaceID space,
TagID tag);

static std::unique_ptr<RowReader> getEdgePropReader(
static RowReader getEdgePropReader(
meta::SchemaManager* schemaMan,
folly::StringPiece row,
GraphSpaceID space,
EdgeType edge);

static std::unique_ptr<RowReader> getRowReader(
static RowReader getRowReader(
folly::StringPiece row,
std::shared_ptr<const meta::SchemaProviderIf> schema);

static RowReader getEmptyRowReader() {
return RowReader();
}

static StatusOr<VariantType> getDefaultProp(const meta::SchemaProviderIf* schema,
const std::string& prop) {
auto& vType = schema->getFieldType(prop);
Expand Down Expand Up @@ -242,9 +251,6 @@ class RowReader {
return ResultType::E_DATA_INVALID;
}
}

virtual ~RowReader() = default;

SchemaVer schemaVer() const noexcept;
int32_t numFields() const noexcept;

Expand Down Expand Up @@ -286,6 +292,47 @@ class RowReader {
return data_;
}

operator bool() const noexcept {
return operator!=(nullptr);
}

bool operator==(nullptr_t) const noexcept {
return !data_.data();
}

bool operator==(const RowReader& x) const noexcept {
return data_ == x.data_;
}

bool operator!=(nullptr_t) const noexcept {
return (bool)data_.data();
}

bool operator!=(const RowReader& x) const noexcept {
return data_ != x.data_;
}

RowReader* operator->() const noexcept {
return get();
}

RowReader* get() const noexcept {
return const_cast<RowReader*>(this);
}

RowReader* get() noexcept {
return this;
}

RowReader& operator*() const noexcept {
return *get();
}

void reset() noexcept {
this->~RowReader();
new(this) RowReader();
}

// TODO getPath(const std::string& name) const noexcept;
// TODO getPath(int64_t index) const noexcept;
// TODO getList(const std::string& name) const noexcept;
Expand All @@ -308,6 +355,7 @@ class RowReader {
mutable std::vector<int64_t> offsets_;

private:
RowReader() = default;
RowReader(folly::StringPiece row,
std::shared_ptr<const meta::SchemaProviderIf> schema);

Expand Down
3 changes: 2 additions & 1 deletion src/dataman/RowSetReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#ifndef DATAMAN_ROWSETREADER_H_
#define DATAMAN_ROWSETREADER_H_

#include "RowReader.h"
#include "base/Base.h"
#include "gen-cpp2/storage_types.h"
#include "meta/SchemaProviderIf.h"
Expand All @@ -33,7 +34,7 @@ class RowSetReader {
// The total length of the encoded row set
const folly::StringPiece& data_;

std::unique_ptr<RowReader> reader_;
RowReader reader_ = RowReader::getEmptyRowReader();
// The offset of the current row
int64_t offset_;
// The length of the current row
Expand Down
4 changes: 2 additions & 2 deletions src/dataman/RowUpdater.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace nebula {
using folly::hash::SpookyHashV2;
using nebula::meta::SchemaProviderIf;

RowUpdater::RowUpdater(std::unique_ptr<RowReader> reader,
RowUpdater::RowUpdater(RowReader reader,
std::shared_ptr<const meta::SchemaProviderIf> schema)
: schema_(std::move(schema))
, reader_(std::move(reader)) {
Expand All @@ -22,7 +22,7 @@ RowUpdater::RowUpdater(std::unique_ptr<RowReader> reader,

RowUpdater::RowUpdater(std::shared_ptr<const meta::SchemaProviderIf> schema)
: schema_(std::move(schema))
, reader_(nullptr) {
, reader_(RowReader::getEmptyRowReader()) {
CHECK(!!schema_);
}

Expand Down
4 changes: 2 additions & 2 deletions src/dataman/RowUpdater.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class RowUpdater {
// reader holds the original data
// schema is the writer schema, which means the updated data will be encoded
// using this schema
RowUpdater(std::unique_ptr<RowReader> reader,
RowUpdater(RowReader reader,
std::shared_ptr<const meta::SchemaProviderIf> schema);
explicit RowUpdater(std::shared_ptr<const meta::SchemaProviderIf> schema);

Expand Down Expand Up @@ -79,7 +79,7 @@ class RowUpdater {

private:
std::shared_ptr<const meta::SchemaProviderIf> schema_;
std::unique_ptr<RowReader> reader_;
RowReader reader_ = RowReader::getEmptyRowReader();
// Hash64(field_name) => value
std::unordered_map<uint64_t, FieldValue> updatedFields_;
};
Expand Down
4 changes: 2 additions & 2 deletions src/graph/FetchVerticesExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ void FetchVerticesExecutor::processResult(RpcResponse &&result) {
finishExecution(std::move(rsWriter));
return;
}
std::unordered_map<VertexID, std::map<TagID, std::unique_ptr<RowReader>>> dataMap;
std::unordered_map<VertexID, std::map<TagID, RowReader>> dataMap;
dataMap.reserve(num);

std::unordered_map<TagID, std::shared_ptr<const meta::SchemaProviderIf>> tagSchemaMap;
Expand Down Expand Up @@ -407,7 +407,7 @@ void FetchVerticesExecutor::processResult(RpcResponse &&result) {
}
auto vschema = tagSchemaMap[tagId];
auto vreader = RowReader::getRowReader(data, vschema);
dataMap[vid][tagId] = std::move(vreader);
dataMap[vid].emplace(std::make_pair(tagId, std::move(vreader)));
tagIdSet.insert(tagId);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/graph/GoExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1222,7 +1222,7 @@ bool GoExecutor::processFinalResult(Callback cb) const {
return index_->getColumnWithRow(*inputRow, prop);
};

std::unique_ptr<RowReader> reader;
RowReader reader = RowReader::getEmptyRowReader();
if (currEdgeSchema) {
reader = RowReader::getRowReader(edge.props, currEdgeSchema);
}
Expand Down
2 changes: 1 addition & 1 deletion src/storage/mutate/AddEdgesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ std::string AddEdgesProcessor::addEdges(int64_t version, PartitionID partId,
});
for (auto& e : newEdges) {
std::string val;
std::unique_ptr<RowReader> nReader;
RowReader nReader = RowReader::getEmptyRowReader();
auto edgeType = NebulaKeyUtils::getEdgeType(e.first);
for (auto& index : indexes_) {
if (edgeType == index->get_schema_id().get_edge_type()) {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/mutate/AddVerticesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ std::string AddVerticesProcessor::addVertices(int64_t version, PartitionID partI

for (auto& v : newVertices) {
std::string val;
std::unique_ptr<RowReader> nReader;
RowReader nReader = RowReader::getEmptyRowReader();
auto tagId = NebulaKeyUtils::getTagId(v.first);
auto vId = NebulaKeyUtils::getVertexId(v.first);
for (auto& index : indexes_) {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/mutate/DeleteEdgesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ DeleteEdgesProcessor::deleteEdges(GraphSpaceID spaceId,
* just get the latest version edge for index.
*/
if (isLatestVE) {
std::unique_ptr<RowReader> reader;
RowReader reader = RowReader::getEmptyRowReader();
for (auto& index : indexes_) {
auto indexId = index->get_index_id();
if (type == index->get_schema_id().get_edge_type()) {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/mutate/DeleteVerticesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ DeleteVerticesProcessor::deleteVertices(GraphSpaceID spaceId,
* Using newlyVertexId to identify if it is the latest version
*/
if (latestVVId != tagId) {
std::unique_ptr<RowReader> reader;
RowReader reader = RowReader::getEmptyRowReader();
for (auto& index : indexes_) {
auto indexId = index->get_index_id();
if (index->get_schema_id().get_tag_id() == tagId) {
Expand Down
3 changes: 2 additions & 1 deletion src/storage/mutate/UpdateEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,8 @@ folly::Optional<std::string> UpdateEdgeProcessor::updateAndWriteBack(PartitionID
auto nVal = std::move(status.value());
// TODO(heng) we don't update the index for reverse edge.
if (!indexes_.empty() && edgeKey.edge_type > 0) {
std::unique_ptr<RowReader> reader, rReader;
RowReader reader = RowReader::getEmptyRowReader();
RowReader rReader = RowReader::getEmptyRowReader();
for (auto& index : indexes_) {
auto indexId = index->get_index_id();
if (index->get_schema_id().get_edge_type() == edgeKey.edge_type) {
Expand Down
3 changes: 2 additions & 1 deletion src/storage/mutate/UpdateVertexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,8 @@ folly::Optional<std::string> UpdateVertexProcessor::updateAndWriteBack(const Par
}
auto nVal = std::move(status.value());
if (!indexes_.empty()) {
std::unique_ptr<RowReader> reader, oReader;
RowReader reader = RowReader::getEmptyRowReader();
RowReader oReader = RowReader::getEmptyRowReader();
for (auto &index : indexes_) {
if (index->get_schema_id().get_tag_id() == u.first) {
if (!(u.second->kv.second.empty())) {
Expand Down
4 changes: 1 addition & 3 deletions src/storage/query/QueryBaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ const std::unordered_map<std::string, PropContext::PropInKeyType> kPropsInKey_ =
{"_rank", PropContext::PropInKeyType::RANK}
};

using EdgeProcessor
= std::function<void(std::unique_ptr<RowReader> reader,
folly::StringPiece key)>;
using EdgeProcessor = std::function<void(RowReader reader, folly::StringPiece key)>;
struct Bucket {
std::vector<std::pair<PartitionID, VertexID>> vertices_;
};
Expand Down
2 changes: 1 addition & 1 deletion src/storage/query/QueryBaseProcessor.inl
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ kvstore::ResultCode QueryBaseProcessor<REQ, RESP>::collectEdgeProps(
}
lastRank = rank;
lastDstId = dstId;
std::unique_ptr<RowReader> reader;
RowReader reader = RowReader::getEmptyRowReader();
if ((!onlyStructure || retTTL.has_value()) && !val.empty()) {
reader = RowReader::getEdgePropReader(this->schemaMan_,
val,
Expand Down
12 changes: 6 additions & 6 deletions src/storage/query/QueryBoundProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ kvstore::ResultCode QueryBoundProcessor::processEdgeImpl(const PartitionID partI
edges.reserve(FLAGS_reserved_edges_one_vertex);
auto ret = collectEdgeProps(
partId, vId, edgeType, &fcontext,
[&, this](std::unique_ptr<RowReader> reader,
folly::StringPiece k) {
[&, this](RowReader reader, folly::StringPiece k) {
cpp2::IdAndProp edge;
if (!onlyStructure) {
RowWriter writer(currEdgeSchema);
Expand Down Expand Up @@ -112,11 +111,12 @@ kvstore::ResultCode QueryBoundProcessor::processEdgeSampling(const PartitionID p
CHECK(!onlyVertexProps_);
auto ret = collectEdgeProps(
partId, vId, edgeType, &fcontext,
[&](std::unique_ptr<RowReader> reader,
folly::StringPiece k) {
[&](RowReader reader, folly::StringPiece k) {
sampler->sampling(
std::make_tuple(edgeType, k.str(), std::move(reader),
currEdgeSchema, props));
std::make_tuple(
edgeType, k.str(),
std::make_unique<RowReader>(std::move(reader)),
currEdgeSchema, props));
});
if (ret != kvstore::ResultCode::SUCCEEDED) {
return ret;
Expand Down
2 changes: 1 addition & 1 deletion src/storage/query/QueryStatsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ kvstore::ResultCode QueryStatsProcessor::processVertex(PartitionID partId,
auto& props = ec.second;
if (!props.empty()) {
auto r = this->collectEdgeProps(partId, vId, edgeType, &fcontext,
[&, this](std::unique_ptr<RowReader> reader,
[&, this](RowReader reader,
folly::StringPiece key) {
this->collectProps(
reader.get(), key, props, &fcontext,
Expand Down

0 comments on commit bf264be

Please sign in to comment.