Skip to content

Commit

Permalink
adapt to new scan interface
Browse files Browse the repository at this point in the history
  • Loading branch information
jievince committed Nov 19, 2021
1 parent 4784f29 commit 38ff016
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 11 deletions.
20 changes: 13 additions & 7 deletions src/sclient/ScanEdgeIter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@ bool ScanEdgeIter::hasNext() { return hasNext_; }
ScanEdgeIter::~ScanEdgeIter() { delete req_; }

DataSet ScanEdgeIter::next() {
if (!hasNext_) {
if (!hasNext()) {
LOG(ERROR) << "hasNext() == false !";
return DataSet();
}
DCHECK(!!req_);
req_->set_cursor(std::move(nextCursor_));
auto partCursorMapReq = req_->get_parts();
DCHECK_EQ(partCursorMapReq.size(), 1);
partCursorMapReq.begin()->second.set_next_cursor(nextCursor_);
req_->set_parts(partCursorMapReq);
auto r = client_->doScanEdge(*req_);
if (!r.first) {
LOG(ERROR) << "Scan edge failed";
Expand All @@ -39,11 +42,14 @@ DataSet ScanEdgeIter::next() {
this->hasNext_ = false;
return DataSet();
}
this->hasNext_ = scanResponse.get_has_next();
std::string* cursor = scanResponse.get_next_cursor();
if (cursor) {
this->nextCursor_ = *cursor;
}
auto partCursorMapResp = scanResponse.get_cursors();
DCHECK_EQ(partCursorMapResp.size(), 1);
auto scanCursor = partCursorMapResp.begin()->second;
hasNext_ = scanCursor.get_has_next();
LOG(INFO) << "hasNext: " << hasNext_;
nextCursor_ = *scanCursor.get_next_cursor();
LOG(INFO) << "nextCursor_: " << nextCursor_;

return scanResponse.get_edge_data();
}

Expand Down
16 changes: 12 additions & 4 deletions src/sclient/StorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,14 @@ ScanEdgeIter StorageClient::scanEdgeWithPart(std::string spaceName,

auto* req = new storage::cpp2::ScanEdgeRequest;
req->set_space_id(spaceId);
// req->set_parts(std::unordered_map<PartitionID, storage::cpp2::ScanCursor>{{partId, ""}});
req->set_part_id(partId);
req->set_cursor("");
// old interface
// req->set_part_id(partId);
// req->set_cursor("");
// new interface
storage::cpp2::ScanCursor scanCursor;
scanCursor.set_next_cursor("");
req->set_parts(std::unordered_map<PartitionID, storage::cpp2::ScanCursor>{
{partId, scanCursor}});
req->set_return_columns(returnCols);
req->set_limit(limit);
req->set_start_time(startTime);
Expand All @@ -80,7 +85,10 @@ ScanEdgeIter StorageClient::scanEdgeWithPart(std::string spaceName,
std::pair<bool, storage::cpp2::ScanEdgeResponse> StorageClient::doScanEdge(
const storage::cpp2::ScanEdgeRequest& req) {
std::pair<HostAddr, storage::cpp2::ScanEdgeRequest> request;
auto host = mClient_->getPartLeaderFromCache(req.get_space_id(), req.get_part_id());
auto partCursorMap = req.get_parts();
DCHECK_EQ(partCursorMap.size(), 1);
PartitionID partId = partCursorMap.begin()->first;
auto host = mClient_->getPartLeaderFromCache(req.get_space_id(), partId);
if (!host.first) {
return {false, storage::cpp2::ScanEdgeResponse()};
}
Expand Down

0 comments on commit 38ff016

Please sign in to comment.