Skip to content

Commit

Permalink
Merge branch 'master' into feature/bison-datetime-parser
Browse files Browse the repository at this point in the history
  • Loading branch information
Shylock-Hg authored Dec 10, 2021
2 parents 828cc3e + 9a0fbb6 commit 7a39eff
Show file tree
Hide file tree
Showing 79 changed files with 2,017 additions and 191 deletions.
2 changes: 1 addition & 1 deletion README-CN.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<p align="center">
<img src="https://nebula-graph.io/img/nav-nebula-logo.png"/>
<img src="https://nebula-website-cn.oss-cn-hangzhou.aliyuncs.com/nebula-website/images/nebulagraph-logo.png"/>
<br>中文 | <a href="README.md">English</a>
<br>世界上唯一能够容纳千亿个顶点和万亿条边,并提供毫秒级查询延时的图数据库解决方案<br>
</p>
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<p align="center">
<img src="https://nebula-graph.io/img/nav-nebula-logo.png"/>
<img src="https://nebula-website-cn.oss-cn-hangzhou.aliyuncs.com/nebula-website/images/nebulagraph-logo.png"/>
<br> English | <a href="README-CN.md">中文</a>
<br>A distributed, scalable, lightning-fast graph database<br>
</p>
Expand Down
10 changes: 5 additions & 5 deletions src/clients/storage/StorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -558,15 +558,15 @@ StorageRpcRespFuture<cpp2::GetNeighborsResponse> StorageClient::lookupAndTravers
});
}

StorageRpcRespFuture<cpp2::ScanEdgeResponse> StorageClient::scanEdge(
StorageRpcRespFuture<cpp2::ScanResponse> StorageClient::scanEdge(
const CommonRequestParam& param,
const cpp2::EdgeProp& edgeProp,
const std::vector<cpp2::EdgeProp>& edgeProp,
int64_t limit,
const Expression* filter) {
std::unordered_map<HostAddr, cpp2::ScanEdgeRequest> requests;
auto status = getHostPartsWithCursor(param.space);
if (!status.ok()) {
return folly::makeFuture<StorageRpcResponse<cpp2::ScanEdgeResponse>>(
return folly::makeFuture<StorageRpcResponse<cpp2::ScanResponse>>(
std::runtime_error(status.status().toString()));
}
auto& clusters = status.value();
Expand All @@ -589,15 +589,15 @@ StorageRpcRespFuture<cpp2::ScanEdgeResponse> StorageClient::scanEdge(
const cpp2::ScanEdgeRequest& r) { return client->future_scanEdge(r); });
}

StorageRpcRespFuture<cpp2::ScanVertexResponse> StorageClient::scanVertex(
StorageRpcRespFuture<cpp2::ScanResponse> StorageClient::scanVertex(
const CommonRequestParam& param,
const std::vector<cpp2::VertexProp>& vertexProp,
int64_t limit,
const Expression* filter) {
std::unordered_map<HostAddr, cpp2::ScanVertexRequest> requests;
auto status = getHostPartsWithCursor(param.space);
if (!status.ok()) {
return folly::makeFuture<StorageRpcResponse<cpp2::ScanVertexResponse>>(
return folly::makeFuture<StorageRpcResponse<cpp2::ScanResponse>>(
std::runtime_error(status.status().toString()));
}
auto& clusters = status.value();
Expand Down
10 changes: 5 additions & 5 deletions src/clients/storage/StorageClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,12 @@ class StorageClient : public StorageClientBase<cpp2::GraphStorageServiceAsyncCli
StorageRpcRespFuture<cpp2::GetNeighborsResponse> lookupAndTraverse(
const CommonRequestParam& param, cpp2::IndexSpec indexSpec, cpp2::TraverseSpec traverseSpec);

StorageRpcRespFuture<cpp2::ScanEdgeResponse> scanEdge(const CommonRequestParam& param,
const cpp2::EdgeProp& vertexProp,
int64_t limit,
const Expression* filter);
StorageRpcRespFuture<cpp2::ScanResponse> scanEdge(const CommonRequestParam& param,
const std::vector<cpp2::EdgeProp>& vertexProp,
int64_t limit,
const Expression* filter);

StorageRpcRespFuture<cpp2::ScanVertexResponse> scanVertex(
StorageRpcRespFuture<cpp2::ScanResponse> scanVertex(
const CommonRequestParam& param,
const std::vector<cpp2::VertexProp>& vertexProp,
int64_t limit,
Expand Down
18 changes: 9 additions & 9 deletions src/common/graph/ExecutionResponseOps-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ struct TccStructTraits<::nebula::ExecutionResponse> {
_ftype = apache::thrift::protocol::T_I32;
} else if (_fname == "latency_in_us") {
fid = 2;
_ftype = apache::thrift::protocol::T_I32;
_ftype = apache::thrift::protocol::T_I64;
} else if (_fname == "data") {
fid = 3;
_ftype = apache::thrift::protocol::T_STRUCT;
Expand Down Expand Up @@ -75,9 +75,9 @@ uint32_t Cpp2Ops<::nebula::ExecutionResponse>::write(Protocol* proto,
::nebula::ErrorCode>::write(*proto,
obj->errorCode);
xfer += proto->writeFieldEnd();
xfer += proto->writeFieldBegin("latency_in_us", apache::thrift::protocol::T_I32, 2);
xfer += proto->writeFieldBegin("latency_in_us", apache::thrift::protocol::T_I64, 2);
xfer += ::apache::thrift::detail::pm::protocol_methods<::apache::thrift::type_class::integral,
int32_t>::write(*proto, obj->latencyInUs);
int64_t>::write(*proto, obj->latencyInUs);
xfer += proto->writeFieldEnd();
if (obj->data != nullptr) {
xfer += proto->writeFieldBegin("data", apache::thrift::protocol::T_STRUCT, 3);
Expand Down Expand Up @@ -134,7 +134,7 @@ _readField_error_code : {
}
_readField_latency_in_us : {
::apache::thrift::detail::pm::protocol_methods<::apache::thrift::type_class::integral,
int32_t>::read(*proto, obj->latencyInUs);
int64_t>::read(*proto, obj->latencyInUs);
isset_latency_in_us = true;
}

Expand Down Expand Up @@ -216,7 +216,7 @@ _readField_comment : {
}
}
case 2: {
if (LIKELY(_readState.fieldType == apache::thrift::protocol::T_I32)) {
if (LIKELY(_readState.fieldType == apache::thrift::protocol::T_I64)) {
goto _readField_latency_in_us;
} else {
goto _skip;
Expand Down Expand Up @@ -276,9 +276,9 @@ uint32_t Cpp2Ops<::nebula::ExecutionResponse>::serializedSize(
xfer += ::apache::thrift::detail::pm::protocol_methods<
::apache::thrift::type_class::enumeration,
::nebula::ErrorCode>::serializedSize<false>(*proto, obj->errorCode);
xfer += proto->serializedFieldSize("latency_in_us", apache::thrift::protocol::T_I32, 2);
xfer += proto->serializedFieldSize("latency_in_us", apache::thrift::protocol::T_I64, 2);
xfer += ::apache::thrift::detail::pm::
protocol_methods<::apache::thrift::type_class::integral, int32_t>::serializedSize<false>(
protocol_methods<::apache::thrift::type_class::integral, int64_t>::serializedSize<false>(
*proto, obj->latencyInUs);
if (obj->data != nullptr) {
xfer += proto->serializedFieldSize("data", apache::thrift::protocol::T_STRUCT, 3);
Expand Down Expand Up @@ -314,9 +314,9 @@ uint32_t Cpp2Ops<::nebula::ExecutionResponse>::serializedSizeZC(
xfer += ::apache::thrift::detail::pm::protocol_methods<
::apache::thrift::type_class::enumeration,
::nebula::ErrorCode>::serializedSize<false>(*proto, obj->errorCode);
xfer += proto->serializedFieldSize("latency_in_us", apache::thrift::protocol::T_I32, 2);
xfer += proto->serializedFieldSize("latency_in_us", apache::thrift::protocol::T_I64, 2);
xfer += ::apache::thrift::detail::pm::
protocol_methods<::apache::thrift::type_class::integral, int32_t>::serializedSize<false>(
protocol_methods<::apache::thrift::type_class::integral, int64_t>::serializedSize<false>(
*proto, obj->latencyInUs);
if (obj->data != nullptr) {
xfer += proto->serializedFieldSize("data", apache::thrift::protocol::T_STRUCT, 3);
Expand Down
2 changes: 1 addition & 1 deletion src/common/graph/Response.h
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ struct ExecutionResponse {
}

ErrorCode errorCode{ErrorCode::SUCCEEDED};
int32_t latencyInUs{0};
int64_t latencyInUs{0};
std::unique_ptr<nebula::DataSet> data{nullptr};
std::unique_ptr<std::string> spaceName{nullptr};
std::unique_ptr<std::string> errorMsg{nullptr};
Expand Down
13 changes: 10 additions & 3 deletions src/common/memory/MemoryUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,25 @@ StatusOr<bool> MemoryUtils::hitsHighWatermark() {
}
double available = 0.0, total = 0.0;
if (FLAGS_containerized) {
FileUtils::FileLineIterator iter("/sys/fs/cgroup/memory/memory.stat", &reTotalCache);
bool cgroupsv2 = FileUtils::exist("/sys/fs/cgroup/cgroup.controllers");
std::string statPath =
cgroupsv2 ? "/sys/fs/cgroup/memory.stat" : "/sys/fs/cgroup/memory/memory.stat";
FileUtils::FileLineIterator iter(statPath, &reTotalCache);
uint64_t cacheSize = 0;
for (; iter.valid(); ++iter) {
auto& sm = iter.matched();
cacheSize += std::stoul(sm[2].str(), NULL);
}

auto limitStatus = MemoryUtils::readSysContents("/sys/fs/cgroup/memory/memory.limit_in_bytes");
std::string limitPath =
cgroupsv2 ? "/sys/fs/cgroup/memory.max" : "/sys/fs/cgroup/memory/memory.limit_in_bytes";
auto limitStatus = MemoryUtils::readSysContents(limitPath);
NG_RETURN_IF_ERROR(limitStatus);
uint64_t limitInBytes = std::move(limitStatus).value();

auto usageStatus = MemoryUtils::readSysContents("/sys/fs/cgroup/memory/memory.usage_in_bytes");
std::string usagePath =
cgroupsv2 ? "/sys/fs/cgroup/memory.current" : "/sys/fs/cgroup/memory/memory.usage_in_bytes";
auto usageStatus = MemoryUtils::readSysContents(usagePath);
NG_RETURN_IF_ERROR(usageStatus);
uint64_t usageInBytes = std::move(usageStatus).value();

Expand Down
12 changes: 12 additions & 0 deletions src/common/meta/SchemaManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,17 @@ StatusOr<std::pair<bool, int32_t>> SchemaManager::getSchemaIDByName(GraphSpaceID
return Status::Error("Schema not exist: %s", schemaName.str().c_str());
}

StatusOr<std::unordered_map<TagID, std::string>> SchemaManager::getAllTags(GraphSpaceID space) {
std::unordered_map<TagID, std::string> tags;
auto tagSchemas = getAllLatestVerTagSchema(space);
NG_RETURN_IF_ERROR(tagSchemas);
for (auto& tagSchema : tagSchemas.value()) {
auto tagName = toTagName(space, tagSchema.first);
NG_RETURN_IF_ERROR(tagName);
tags.emplace(tagSchema.first, tagName.value());
}
return tags;
}

} // namespace meta
} // namespace nebula
2 changes: 2 additions & 0 deletions src/common/meta/SchemaManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ class SchemaManager {

virtual StatusOr<std::vector<std::string>> getAllEdge(GraphSpaceID space) = 0;

StatusOr<std::unordered_map<TagID, std::string>> getAllTags(GraphSpaceID space);

// get all version of all tag schema
virtual StatusOr<TagSchemas> getAllVerTagSchema(GraphSpaceID space) = 0;

Expand Down
37 changes: 32 additions & 5 deletions src/common/utils/MetaKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1174,18 +1174,43 @@ GraphSpaceID MetaKeyUtils::parseLocalIdSpace(folly::StringPiece rawData) {
return *reinterpret_cast<const GraphSpaceID*>(rawData.data() + offset);
}

GraphSpaceID MetaKeyUtils::parseDiskPartsSpace(folly::StringPiece rawData) {
/**
* diskPartsKey = kDiskPartsTable + len(serialized(hostAddr)) + serialized(hostAddr) + path
*/

HostAddr MetaKeyUtils::parseDiskPartsHost(const folly::StringPiece& rawData) {
auto offset = kDiskPartsTable.size();
return *reinterpret_cast<const GraphSpaceID*>(rawData.data() + offset);
auto hostAddrLen = *reinterpret_cast<const size_t*>(rawData.begin() + offset);
offset += sizeof(size_t);
std::string hostAddrStr(rawData.data() + offset, hostAddrLen);
return deserializeHostAddr(hostAddrStr);
}

GraphSpaceID MetaKeyUtils::parseDiskPartsSpace(const folly::StringPiece& rawData) {
auto offset = kDiskPartsTable.size();
size_t hostAddrLen = *reinterpret_cast<const size_t*>(rawData.begin() + offset);
offset += sizeof(size_t) + hostAddrLen;
return *reinterpret_cast<const GraphSpaceID*>(rawData.begin() + offset);
}

std::string MetaKeyUtils::parseDiskPartsPath(const folly::StringPiece& rawData) {
auto offset = kDiskPartsTable.size();
size_t hostAddrLen = *reinterpret_cast<const size_t*>(rawData.begin() + offset);
offset += sizeof(size_t) + hostAddrLen + sizeof(GraphSpaceID);
std::string path(rawData.begin() + offset, rawData.size() - offset);
return path;
}

std::string MetaKeyUtils::diskPartsPrefix() { return kDiskPartsTable; }

std::string MetaKeyUtils::diskPartsPrefix(HostAddr addr) {
std::string key;
std::string hostStr = serializeHostAddr(addr);
key.reserve(kDiskPartsTable.size() + hostStr.size());
key.append(kDiskPartsTable.data(), kDiskPartsTable.size()).append(hostStr.data(), hostStr.size());
size_t hostAddrLen = hostStr.size();
key.reserve(kDiskPartsTable.size() + sizeof(size_t) + hostStr.size());
key.append(kDiskPartsTable.data(), kDiskPartsTable.size())
.append(reinterpret_cast<const char*>(&hostAddrLen), sizeof(size_t))
.append(hostStr.data(), hostStr.size());
return key;
}

Expand All @@ -1198,7 +1223,9 @@ std::string MetaKeyUtils::diskPartsPrefix(HostAddr addr, GraphSpaceID spaceId) {
return key;
}

std::string MetaKeyUtils::diskPartsKey(HostAddr addr, GraphSpaceID spaceId, std::string path) {
std::string MetaKeyUtils::diskPartsKey(HostAddr addr,
GraphSpaceID spaceId,
const std::string& path) {
std::string key;
std::string prefix = diskPartsPrefix(addr, spaceId);
key.reserve(prefix.size() + path.size());
Expand Down
8 changes: 6 additions & 2 deletions src/common/utils/MetaKeyUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -382,15 +382,19 @@ class MetaKeyUtils final {

static std::unordered_map<std::string, std::pair<std::string, bool>> getSystemTableMaps();

static GraphSpaceID parseDiskPartsSpace(folly::StringPiece rawData);
static GraphSpaceID parseDiskPartsSpace(const folly::StringPiece& rawData);

static HostAddr parseDiskPartsHost(const folly::StringPiece& rawData);

static std::string parseDiskPartsPath(const folly::StringPiece& rawData);

static std::string diskPartsPrefix();

static std::string diskPartsPrefix(HostAddr addr);

static std::string diskPartsPrefix(HostAddr addr, GraphSpaceID spaceId);

static std::string diskPartsKey(HostAddr addr, GraphSpaceID spaceId, std::string path);
static std::string diskPartsKey(HostAddr addr, GraphSpaceID spaceId, const std::string& path);

static std::string diskPartsVal(const meta::cpp2::PartitionList& partList);

Expand Down
15 changes: 14 additions & 1 deletion src/common/utils/NebulaKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,20 @@ std::string NebulaKeyUtils::edgeKey(size_t vIdLen,
.append(1, ev);
return key;
}

// static
std::string NebulaKeyUtils::vertexKey(size_t vIdLen,
PartitionID partId,
const VertexID& vId,
char pad) {
CHECK_GE(vIdLen, vId.size());
int32_t item = (partId << kPartitionOffset) | static_cast<uint32_t>(NebulaKeyType::kVertex);
std::string key;
key.reserve(kTagLen + vIdLen);
key.append(reinterpret_cast<const char*>(&item), sizeof(int32_t))
.append(vId.data(), vId.size())
.append(vIdLen - vId.size(), pad);
return key;
}
// static
std::string NebulaKeyUtils::systemCommitKey(PartitionID partId) {
int32_t item = (partId << kPartitionOffset) | static_cast<uint32_t>(NebulaKeyType::kSystem);
Expand Down
9 changes: 6 additions & 3 deletions src/common/utils/NebulaKeyUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class NebulaKeyUtils final {
static std::string lastKey(const std::string& prefix, size_t count);

/**
* Generate vertex key for kv store
* Generate tag key for kv store
* */
static std::string tagKey(
size_t vIdLen, PartitionID partId, const VertexID& vId, TagID tagId, char pad = '\0');
Expand All @@ -65,15 +65,18 @@ class NebulaKeyUtils final {
EdgeRanking rank,
const VertexID& dstId,
EdgeVerPlaceHolder ev = 1);

static std::string vertexKey(size_t vIdLen,
PartitionID partId,
const VertexID& vId,
char pad = '\0');
static std::string systemCommitKey(PartitionID partId);

static std::string systemPartKey(PartitionID partId);

static std::string kvKey(PartitionID partId, const folly::StringPiece& name);

/**
* Prefix for vertex
* Prefix for tag
* */
static std::string tagPrefix(size_t vIdLen, PartitionID partId, const VertexID& vId, TagID tagId);

Expand Down
6 changes: 3 additions & 3 deletions src/common/utils/Types.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ enum class NebulaKeyType : uint32_t {
kSystem = 0x00000004,
kOperation = 0x00000005,
kKeyValue = 0x00000006,
// kVertex = 0x00000007,
kVertex = 0x00000007,
};

enum class NebulaSystemKeyType : uint32_t {
Expand All @@ -41,10 +41,10 @@ static typename std::enable_if<std::is_integral<T>::value, T>::type readInt(cons
return *reinterpret_cast<const T*>(data);
}

// size of vertex key except vertexId
// size of tag key except vertexId
static constexpr int32_t kTagLen = sizeof(PartitionID) + sizeof(TagID);

// size of vertex key except srcId and dstId
// size of tag key except srcId and dstId
static constexpr int32_t kEdgeLen =
sizeof(PartitionID) + sizeof(EdgeType) + sizeof(EdgeRanking) + sizeof(EdgeVerPlaceHolder);

Expand Down
11 changes: 11 additions & 0 deletions src/common/utils/test/MetaKeyUtilsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,17 @@ TEST(MetaKeyUtilsTest, ZoneTest) {
ASSERT_EQ(nodes, MetaKeyUtils::parseZoneHosts(zoneValue));
}

TEST(MetaKeyUtilsTest, DiskPathsTest) {
HostAddr addr{"192.168.0.1", 1234};
GraphSpaceID spaceId = 1;
std::string path = "/data/storage/test_part1";

auto diskPartsKey = MetaKeyUtils::diskPartsKey(addr, spaceId, path);
ASSERT_EQ(addr, MetaKeyUtils::parseDiskPartsHost(diskPartsKey));
ASSERT_EQ(spaceId, MetaKeyUtils::parseDiskPartsSpace(diskPartsKey));
ASSERT_EQ(path, MetaKeyUtils::parseDiskPartsPath(diskPartsKey));
}

} // namespace nebula

int main(int argc, char** argv) {
Expand Down
2 changes: 2 additions & 0 deletions src/graph/context/ast/CypherAstContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ struct ScanInfo {
std::vector<IndexID> indexIds;
// use for seek by edge only
MatchEdge::Direction direction{MatchEdge::Direction::OUT_EDGE};
// use for scan seek
bool anyLabel{false};
};

struct CypherClauseContextBase : AstContext {
Expand Down
2 changes: 2 additions & 0 deletions src/graph/executor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ nebula_add_library(
query/InnerJoinExecutor.cpp
query/IndexScanExecutor.cpp
query/AssignExecutor.cpp
query/ScanVerticesExecutor.cpp
query/ScanEdgesExecutor.cpp
query/TraverseExecutor.cpp
query/AppendVerticesExecutor.cpp
algo/ConjunctPathExecutor.cpp
Expand Down
Loading

0 comments on commit 7a39eff

Please sign in to comment.