Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove extra field. #3427

Merged
merged 10 commits into from
Dec 15, 2021
1 change: 0 additions & 1 deletion src/clients/storage/StorageClientBase-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,6 @@ StorageClientBase<ClientType>::getHostPartsWithCursor(GraphSpaceID spaceId) cons

// TODO support cursor
cpp2::ScanCursor c;
c.set_has_next(false);
auto parts = status.value();
for (auto partId = 1; partId <= parts; partId++) {
auto leader = getLeader(spaceId, partId);
Expand Down
3 changes: 1 addition & 2 deletions src/interface/storage.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -561,9 +561,8 @@ struct LookupAndTraverseRequest {
*/

struct ScanCursor {
3: bool has_next,
// next start key of scan, only valid when has_next is true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how client knows the scan has get to end?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When 'next_cursor' is null.

4: optional binary next_cursor,
1: optional binary next_cursor,
}

struct ScanVertexRequest {
Expand Down
6 changes: 0 additions & 6 deletions src/storage/exec/ScanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,7 @@ class ScanVertexPropNode : public QueryNode<Cursor> {

cpp2::ScanCursor c;
if (iter->valid()) {
c.set_has_next(true);
c.set_next_cursor(iter->key().str());
} else {
c.set_has_next(false);
}
cursors_->emplace(partId, std::move(c));
return nebula::cpp2::ErrorCode::SUCCEEDED;
Expand Down Expand Up @@ -246,10 +243,7 @@ class ScanEdgePropNode : public QueryNode<Cursor> {

cpp2::ScanCursor c;
if (iter->valid()) {
c.set_has_next(true);
c.set_next_cursor(iter->key().str());
} else {
c.set_has_next(false);
}
cursors_->emplace(partId, std::move(c));
return nebula::cpp2::ErrorCode::SUCCEEDED;
Expand Down
16 changes: 9 additions & 7 deletions src/storage/query/ScanEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ void ScanEdgeProcessor::runInSingleThread(const cpp2::ScanEdgeRequest& req) {
auto partId = partEntry.first;
auto cursor = partEntry.second;

auto ret = plan.go(partId, cursor.get_has_next() ? *cursor.get_next_cursor() : "");
auto ret = plan.go(
partId, cursor.next_cursor_ref().has_value() ? cursor.next_cursor_ref().value() : "");
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED &&
failedParts.find(partId) == failedParts.end()) {
failedParts.emplace(partId);
Expand All @@ -157,12 +158,13 @@ void ScanEdgeProcessor::runInMultipleThread(const cpp2::ScanEdgeRequest& req) {
size_t i = 0;
std::vector<folly::Future<std::pair<nebula::cpp2::ErrorCode, PartitionID>>> futures;
for (const auto& [partId, cursor] : req.get_parts()) {
futures.emplace_back(runInExecutor(&contexts_[i],
&results_[i],
&cursorsOfPart_[i],
partId,
cursor.get_has_next() ? *cursor.get_next_cursor() : "",
&expCtxs_[i]));
futures.emplace_back(
runInExecutor(&contexts_[i],
&results_[i],
&cursorsOfPart_[i],
partId,
cursor.next_cursor_ref().has_value() ? cursor.next_cursor_ref().value() : "",
&expCtxs_[i]));
i++;
}

Expand Down
16 changes: 9 additions & 7 deletions src/storage/query/ScanVertexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ void ScanVertexProcessor::runInSingleThread(const cpp2::ScanVertexRequest& req)
auto partId = partEntry.first;
auto cursor = partEntry.second;

auto ret = plan.go(partId, cursor.get_has_next() ? *cursor.get_next_cursor() : "");
auto ret = plan.go(
partId, cursor.next_cursor_ref().has_value() ? cursor.next_cursor_ref().value() : "");
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED &&
failedParts.find(partId) == failedParts.end()) {
failedParts.emplace(partId);
Expand All @@ -159,12 +160,13 @@ void ScanVertexProcessor::runInMultipleThread(const cpp2::ScanVertexRequest& req
size_t i = 0;
std::vector<folly::Future<std::pair<nebula::cpp2::ErrorCode, PartitionID>>> futures;
for (const auto& [partId, cursor] : req.get_parts()) {
futures.emplace_back(runInExecutor(&contexts_[i],
&results_[i],
&cursorsOfPart_[i],
partId,
cursor.get_has_next() ? *cursor.get_next_cursor() : "",
&expCtxs_[i]));
futures.emplace_back(
runInExecutor(&contexts_[i],
&results_[i],
&cursorsOfPart_[i],
partId,
cursor.next_cursor_ref().has_value() ? cursor.next_cursor_ref().value() : "",
&expCtxs_[i]));
i++;
}

Expand Down
9 changes: 5 additions & 4 deletions src/storage/test/ScanEdgeTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ cpp2::ScanEdgeRequest buildRequest(std::vector<PartitionID> partIds,
CHECK_EQ(partIds.size(), cursors.size());
std::unordered_map<PartitionID, cpp2::ScanCursor> parts;
for (std::size_t i = 0; i < partIds.size(); ++i) {
c.set_has_next(!cursors[i].empty());
c.set_next_cursor(cursors[i]);
if (!cursors[i].empty()) {
c.set_next_cursor(cursors[i]);
}
parts.emplace(partIds[i], c);
}
req.set_parts(std::move(parts));
Expand Down Expand Up @@ -163,7 +164,7 @@ TEST(ScanEdgeTest, CursorTest) {

ASSERT_EQ(0, resp.result.failed_parts.size());
checkResponse(*resp.edge_data_ref(), edge, edge.second.size(), totalRowCount);
hasNext = resp.get_cursors().at(partId).get_has_next();
hasNext = resp.get_cursors().at(partId).next_cursor_ref().has_value();
if (hasNext) {
CHECK(resp.get_cursors().at(partId).next_cursor_ref().has_value());
cursor = *resp.get_cursors().at(partId).next_cursor_ref();
Expand All @@ -190,7 +191,7 @@ TEST(ScanEdgeTest, CursorTest) {

ASSERT_EQ(0, resp.result.failed_parts.size());
checkResponse(*resp.edge_data_ref(), edge, edge.second.size(), totalRowCount);
hasNext = resp.get_cursors().at(partId).get_has_next();
hasNext = resp.get_cursors().at(partId).next_cursor_ref().has_value();
if (hasNext) {
CHECK(resp.get_cursors().at(partId).next_cursor_ref().has_value());
cursor = *resp.get_cursors().at(partId).next_cursor_ref();
Expand Down
9 changes: 5 additions & 4 deletions src/storage/test/ScanVertexTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ cpp2::ScanVertexRequest buildRequest(
CHECK_EQ(partIds.size(), cursors.size());
std::unordered_map<PartitionID, cpp2::ScanCursor> parts;
for (std::size_t i = 0; i < partIds.size(); ++i) {
c.set_has_next(!cursors[i].empty());
c.set_next_cursor(cursors[i]);
if (!cursors[i].empty()) {
c.set_next_cursor(cursors[i]);
}
parts.emplace(partIds[i], c);
}
req.set_parts(std::move(parts));
Expand Down Expand Up @@ -184,7 +185,7 @@ TEST(ScanVertexTest, CursorTest) {
ASSERT_EQ(0, resp.result.failed_parts.size());
checkResponse(
*resp.vertex_data_ref(), tag, tag.second.size() + 1 /* kVid */, totalRowCount);
hasNext = resp.get_cursors().at(partId).get_has_next();
hasNext = resp.get_cursors().at(partId).next_cursor_ref().has_value();
if (hasNext) {
CHECK(resp.get_cursors().at(partId).next_cursor_ref());
cursor = *resp.get_cursors().at(partId).next_cursor_ref();
Expand All @@ -211,7 +212,7 @@ TEST(ScanVertexTest, CursorTest) {
ASSERT_EQ(0, resp.result.failed_parts.size());
checkResponse(
*resp.vertex_data_ref(), tag, tag.second.size() + 1 /* kVid */, totalRowCount);
hasNext = resp.get_cursors().at(partId).get_has_next();
hasNext = resp.get_cursors().at(partId).next_cursor_ref().has_value();
if (hasNext) {
CHECK(resp.get_cursors().at(partId).next_cursor_ref());
cursor = *resp.get_cursors().at(partId).next_cursor_ref();
Expand Down