Skip to content

Commit

Permalink
Remove extra field.
Browse files Browse the repository at this point in the history
  • Loading branch information
Shylock-Hg committed Dec 8, 2021
1 parent e319615 commit ca55560
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 34 deletions.
1 change: 0 additions & 1 deletion src/clients/storage/StorageClientBase-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,6 @@ StorageClientBase<ClientType>::getHostPartsWithCursor(GraphSpaceID spaceId) cons

// TODO support cursor
cpp2::ScanCursor c;
c.set_has_next(false);
auto parts = status.value();
for (auto partId = 1; partId <= parts; partId++) {
auto leader = getLeader(spaceId, partId);
Expand Down
3 changes: 1 addition & 2 deletions src/interface/storage.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -561,9 +561,8 @@ struct LookupAndTraverseRequest {
*/

struct ScanCursor {
3: bool has_next,
// next start key of scan, only valid when has_next is true
4: optional binary next_cursor,
1: optional binary next_cursor,
}

struct ScanVertexRequest {
Expand Down
4 changes: 1 addition & 3 deletions src/mock/MockCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ class MockCluster {

void startMeta(const std::string& rootPath, HostAddr addr = HostAddr("127.0.0.1", 0));

void startStorage(HostAddr addr,
const std::string& rootPath,
SchemaVer schemaVerCount = 1);
void startStorage(HostAddr addr, const std::string& rootPath, SchemaVer schemaVerCount = 1);

/**
* Init a meta client connect to current meta server.
Expand Down
6 changes: 0 additions & 6 deletions src/storage/exec/ScanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,7 @@ class ScanVertexPropNode : public QueryNode<Cursor> {

cpp2::ScanCursor c;
if (iter->valid()) {
c.set_has_next(true);
c.set_next_cursor(iter->key().str());
} else {
c.set_has_next(false);
}
cursors_->emplace(partId, std::move(c));
return nebula::cpp2::ErrorCode::SUCCEEDED;
Expand Down Expand Up @@ -246,10 +243,7 @@ class ScanEdgePropNode : public QueryNode<Cursor> {

cpp2::ScanCursor c;
if (iter->valid()) {
c.set_has_next(true);
c.set_next_cursor(iter->key().str());
} else {
c.set_has_next(false);
}
cursors_->emplace(partId, std::move(c));
return nebula::cpp2::ErrorCode::SUCCEEDED;
Expand Down
16 changes: 9 additions & 7 deletions src/storage/query/ScanEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ void ScanEdgeProcessor::runInSingleThread(const cpp2::ScanEdgeRequest& req) {
auto partId = partEntry.first;
auto cursor = partEntry.second;

auto ret = plan.go(partId, cursor.get_has_next() ? *cursor.get_next_cursor() : "");
auto ret = plan.go(
partId, cursor.next_cursor_ref().has_value() ? cursor.next_cursor_ref().value() : "");
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED &&
failedParts.find(partId) == failedParts.end()) {
failedParts.emplace(partId);
Expand All @@ -157,12 +158,13 @@ void ScanEdgeProcessor::runInMultipleThread(const cpp2::ScanEdgeRequest& req) {
size_t i = 0;
std::vector<folly::Future<std::pair<nebula::cpp2::ErrorCode, PartitionID>>> futures;
for (const auto& [partId, cursor] : req.get_parts()) {
futures.emplace_back(runInExecutor(&contexts_[i],
&results_[i],
&cursorsOfPart_[i],
partId,
cursor.get_has_next() ? *cursor.get_next_cursor() : "",
&expCtxs_[i]));
futures.emplace_back(
runInExecutor(&contexts_[i],
&results_[i],
&cursorsOfPart_[i],
partId,
cursor.next_cursor_ref().has_value() ? cursor.next_cursor_ref().value() : "",
&expCtxs_[i]));
i++;
}

Expand Down
16 changes: 9 additions & 7 deletions src/storage/query/ScanVertexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ void ScanVertexProcessor::runInSingleThread(const cpp2::ScanVertexRequest& req)
auto partId = partEntry.first;
auto cursor = partEntry.second;

auto ret = plan.go(partId, cursor.get_has_next() ? *cursor.get_next_cursor() : "");
auto ret = plan.go(
partId, cursor.next_cursor_ref().has_value() ? cursor.next_cursor_ref().value() : "");
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED &&
failedParts.find(partId) == failedParts.end()) {
failedParts.emplace(partId);
Expand All @@ -159,12 +160,13 @@ void ScanVertexProcessor::runInMultipleThread(const cpp2::ScanVertexRequest& req
size_t i = 0;
std::vector<folly::Future<std::pair<nebula::cpp2::ErrorCode, PartitionID>>> futures;
for (const auto& [partId, cursor] : req.get_parts()) {
futures.emplace_back(runInExecutor(&contexts_[i],
&results_[i],
&cursorsOfPart_[i],
partId,
cursor.get_has_next() ? *cursor.get_next_cursor() : "",
&expCtxs_[i]));
futures.emplace_back(
runInExecutor(&contexts_[i],
&results_[i],
&cursorsOfPart_[i],
partId,
cursor.next_cursor_ref().has_value() ? cursor.next_cursor_ref().value() : "",
&expCtxs_[i]));
i++;
}

Expand Down
9 changes: 5 additions & 4 deletions src/storage/test/ScanEdgeTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ cpp2::ScanEdgeRequest buildRequest(std::vector<PartitionID> partIds,
CHECK_EQ(partIds.size(), cursors.size());
std::unordered_map<PartitionID, cpp2::ScanCursor> parts;
for (std::size_t i = 0; i < partIds.size(); ++i) {
c.set_has_next(!cursors[i].empty());
c.set_next_cursor(cursors[i]);
if (!cursors[i].empty()) {
c.set_next_cursor(cursors[i]);
}
parts.emplace(partIds[i], c);
}
req.set_parts(std::move(parts));
Expand Down Expand Up @@ -163,7 +164,7 @@ TEST(ScanEdgeTest, CursorTest) {

ASSERT_EQ(0, resp.result.failed_parts.size());
checkResponse(*resp.edge_data_ref(), edge, edge.second.size(), totalRowCount);
hasNext = resp.get_cursors().at(partId).get_has_next();
hasNext = resp.get_cursors().at(partId).next_cursor_ref().has_value();
if (hasNext) {
CHECK(resp.get_cursors().at(partId).next_cursor_ref().has_value());
cursor = *resp.get_cursors().at(partId).next_cursor_ref();
Expand All @@ -190,7 +191,7 @@ TEST(ScanEdgeTest, CursorTest) {

ASSERT_EQ(0, resp.result.failed_parts.size());
checkResponse(*resp.edge_data_ref(), edge, edge.second.size(), totalRowCount);
hasNext = resp.get_cursors().at(partId).get_has_next();
hasNext = resp.get_cursors().at(partId).next_cursor_ref().has_value();
if (hasNext) {
CHECK(resp.get_cursors().at(partId).next_cursor_ref().has_value());
cursor = *resp.get_cursors().at(partId).next_cursor_ref();
Expand Down
9 changes: 5 additions & 4 deletions src/storage/test/ScanVertexTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ cpp2::ScanVertexRequest buildRequest(
CHECK_EQ(partIds.size(), cursors.size());
std::unordered_map<PartitionID, cpp2::ScanCursor> parts;
for (std::size_t i = 0; i < partIds.size(); ++i) {
c.set_has_next(!cursors[i].empty());
c.set_next_cursor(cursors[i]);
if (!cursors[i].empty()) {
c.set_next_cursor(cursors[i]);
}
parts.emplace(partIds[i], c);
}
req.set_parts(std::move(parts));
Expand Down Expand Up @@ -184,7 +185,7 @@ TEST(ScanVertexTest, CursorTest) {
ASSERT_EQ(0, resp.result.failed_parts.size());
checkResponse(
*resp.vertex_data_ref(), tag, tag.second.size() + 1 /* kVid */, totalRowCount);
hasNext = resp.get_cursors().at(partId).get_has_next();
hasNext = resp.get_cursors().at(partId).next_cursor_ref().has_value();
if (hasNext) {
CHECK(resp.get_cursors().at(partId).next_cursor_ref());
cursor = *resp.get_cursors().at(partId).next_cursor_ref();
Expand All @@ -211,7 +212,7 @@ TEST(ScanVertexTest, CursorTest) {
ASSERT_EQ(0, resp.result.failed_parts.size());
checkResponse(
*resp.vertex_data_ref(), tag, tag.second.size() + 1 /* kVid */, totalRowCount);
hasNext = resp.get_cursors().at(partId).get_has_next();
hasNext = resp.get_cursors().at(partId).next_cursor_ref().has_value();
if (hasNext) {
CHECK(resp.get_cursors().at(partId).next_cursor_ref());
cursor = *resp.get_cursors().at(partId).next_cursor_ref();
Expand Down

0 comments on commit ca55560

Please sign in to comment.