diff --git a/src/clients/storage/StorageClientBase-inl.h b/src/clients/storage/StorageClientBase-inl.h index 79435d2252a..dd319fcc9f6 100644 --- a/src/clients/storage/StorageClientBase-inl.h +++ b/src/clients/storage/StorageClientBase-inl.h @@ -357,7 +357,6 @@ StorageClientBase::getHostPartsWithCursor(GraphSpaceID spaceId) cons // TODO support cursor cpp2::ScanCursor c; - c.set_has_next(false); auto parts = status.value(); for (auto partId = 1; partId <= parts; partId++) { auto leader = getLeader(spaceId, partId); diff --git a/src/interface/storage.thrift b/src/interface/storage.thrift index 07c56e2b2fa..405114afc12 100644 --- a/src/interface/storage.thrift +++ b/src/interface/storage.thrift @@ -561,9 +561,8 @@ struct LookupAndTraverseRequest { */ struct ScanCursor { - 3: bool has_next, // next start key of scan, only valid when has_next is true - 4: optional binary next_cursor, + 1: optional binary next_cursor, } struct ScanVertexRequest { diff --git a/src/storage/exec/ScanNode.h b/src/storage/exec/ScanNode.h index 22425dcb90b..6602752b5c2 100644 --- a/src/storage/exec/ScanNode.h +++ b/src/storage/exec/ScanNode.h @@ -92,10 +92,7 @@ class ScanVertexPropNode : public QueryNode { cpp2::ScanCursor c; if (iter->valid()) { - c.set_has_next(true); c.set_next_cursor(iter->key().str()); - } else { - c.set_has_next(false); } cursors_->emplace(partId, std::move(c)); return nebula::cpp2::ErrorCode::SUCCEEDED; @@ -246,10 +243,7 @@ class ScanEdgePropNode : public QueryNode { cpp2::ScanCursor c; if (iter->valid()) { - c.set_has_next(true); c.set_next_cursor(iter->key().str()); - } else { - c.set_has_next(false); } cursors_->emplace(partId, std::move(c)); return nebula::cpp2::ErrorCode::SUCCEEDED; diff --git a/src/storage/query/ScanEdgeProcessor.cpp b/src/storage/query/ScanEdgeProcessor.cpp index 29b9d3cce33..7a8e99884ad 100644 --- a/src/storage/query/ScanEdgeProcessor.cpp +++ b/src/storage/query/ScanEdgeProcessor.cpp @@ -136,7 +136,8 @@ void ScanEdgeProcessor::runInSingleThread(const cpp2::ScanEdgeRequest& req) { auto partId = partEntry.first; auto cursor = partEntry.second; - auto ret = plan.go(partId, cursor.get_has_next() ? *cursor.get_next_cursor() : ""); + auto ret = plan.go( + partId, cursor.next_cursor_ref().has_value() ? cursor.next_cursor_ref().value() : ""); if (ret != nebula::cpp2::ErrorCode::SUCCEEDED && failedParts.find(partId) == failedParts.end()) { failedParts.emplace(partId); @@ -158,12 +159,13 @@ void ScanEdgeProcessor::runInMultipleThread(const cpp2::ScanEdgeRequest& req) { size_t i = 0; std::vector>> futures; for (const auto& [partId, cursor] : req.get_parts()) { - futures.emplace_back(runInExecutor(&contexts_[i], - &results_[i], - &cursorsOfPart_[i], - partId, - cursor.get_has_next() ? *cursor.get_next_cursor() : "", - &expCtxs_[i])); + futures.emplace_back( + runInExecutor(&contexts_[i], + &results_[i], + &cursorsOfPart_[i], + partId, + cursor.next_cursor_ref().has_value() ? cursor.next_cursor_ref().value() : "", + &expCtxs_[i])); i++; } diff --git a/src/storage/query/ScanVertexProcessor.cpp b/src/storage/query/ScanVertexProcessor.cpp index c8624a0d490..6c210f3c0dd 100644 --- a/src/storage/query/ScanVertexProcessor.cpp +++ b/src/storage/query/ScanVertexProcessor.cpp @@ -140,7 +140,8 @@ void ScanVertexProcessor::runInSingleThread(const cpp2::ScanVertexRequest& req) auto partId = partEntry.first; auto cursor = partEntry.second; - auto ret = plan.go(partId, cursor.get_has_next() ? *cursor.get_next_cursor() : ""); + auto ret = plan.go( + partId, cursor.next_cursor_ref().has_value() ? cursor.next_cursor_ref().value() : ""); if (ret != nebula::cpp2::ErrorCode::SUCCEEDED && failedParts.find(partId) == failedParts.end()) { failedParts.emplace(partId); @@ -162,12 +163,13 @@ void ScanVertexProcessor::runInMultipleThread(const cpp2::ScanVertexRequest& req size_t i = 0; std::vector>> futures; for (const auto& [partId, cursor] : req.get_parts()) { - futures.emplace_back(runInExecutor(&contexts_[i], - &results_[i], - &cursorsOfPart_[i], - partId, - cursor.get_has_next() ? *cursor.get_next_cursor() : "", - &expCtxs_[i])); + futures.emplace_back( + runInExecutor(&contexts_[i], + &results_[i], + &cursorsOfPart_[i], + partId, + cursor.next_cursor_ref().has_value() ? cursor.next_cursor_ref().value() : "", + &expCtxs_[i])); i++; } diff --git a/src/storage/test/ScanEdgeTest.cpp b/src/storage/test/ScanEdgeTest.cpp index 320361726e9..3b6b30cd6d0 100644 --- a/src/storage/test/ScanEdgeTest.cpp +++ b/src/storage/test/ScanEdgeTest.cpp @@ -28,8 +28,9 @@ cpp2::ScanEdgeRequest buildRequest( CHECK_EQ(partIds.size(), cursors.size()); std::unordered_map parts; for (std::size_t i = 0; i < partIds.size(); ++i) { - c.set_has_next(!cursors[i].empty()); - c.set_next_cursor(cursors[i]); + if (!cursors[i].empty()) { + c.set_next_cursor(cursors[i]); + } parts.emplace(partIds[i], c); } req.set_parts(std::move(parts)); @@ -168,7 +169,7 @@ TEST(ScanEdgeTest, CursorTest) { ASSERT_EQ(0, resp.result.failed_parts.size()); checkResponse(*resp.props_ref(), edge, edge.second.size(), totalRowCount); - hasNext = resp.get_cursors().at(partId).get_has_next(); + hasNext = resp.get_cursors().at(partId).next_cursor_ref().has_value(); if (hasNext) { CHECK(resp.get_cursors().at(partId).next_cursor_ref().has_value()); cursor = *resp.get_cursors().at(partId).next_cursor_ref(); @@ -195,7 +196,7 @@ TEST(ScanEdgeTest, CursorTest) { ASSERT_EQ(0, resp.result.failed_parts.size()); checkResponse(*resp.props_ref(), edge, edge.second.size(), totalRowCount); - hasNext = resp.get_cursors().at(partId).get_has_next(); + hasNext = resp.get_cursors().at(partId).next_cursor_ref().has_value(); if (hasNext) { CHECK(resp.get_cursors().at(partId).next_cursor_ref().has_value()); cursor = *resp.get_cursors().at(partId).next_cursor_ref(); diff --git a/src/storage/test/ScanVertexTest.cpp b/src/storage/test/ScanVertexTest.cpp index 09653b7d1bf..6f232ef25df 100644 --- a/src/storage/test/ScanVertexTest.cpp +++ b/src/storage/test/ScanVertexTest.cpp @@ -28,8 +28,9 @@ cpp2::ScanVertexRequest buildRequest( CHECK_EQ(partIds.size(), cursors.size()); std::unordered_map parts; for (std::size_t i = 0; i < partIds.size(); ++i) { - c.set_has_next(!cursors[i].empty()); - c.set_next_cursor(cursors[i]); + if (!cursors[i].empty()) { + c.set_next_cursor(cursors[i]); + } parts.emplace(partIds[i], c); } req.set_parts(std::move(parts)); @@ -183,7 +184,7 @@ TEST(ScanVertexTest, CursorTest) { ASSERT_EQ(0, resp.result.failed_parts.size()); checkResponse(*resp.props_ref(), tag, tag.second.size() + 1 /* kVid */, totalRowCount); - hasNext = resp.get_cursors().at(partId).get_has_next(); + hasNext = resp.get_cursors().at(partId).next_cursor_ref().has_value(); if (hasNext) { CHECK(resp.get_cursors().at(partId).next_cursor_ref()); cursor = *resp.get_cursors().at(partId).next_cursor_ref(); @@ -209,7 +210,7 @@ TEST(ScanVertexTest, CursorTest) { ASSERT_EQ(0, resp.result.failed_parts.size()); checkResponse(*resp.props_ref(), tag, tag.second.size() + 1 /* kVid */, totalRowCount); - hasNext = resp.get_cursors().at(partId).get_has_next(); + hasNext = resp.get_cursors().at(partId).next_cursor_ref().has_value(); if (hasNext) { CHECK(resp.get_cursors().at(partId).next_cursor_ref()); cursor = *resp.get_cursors().at(partId).next_cursor_ref();