Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
jievince committed Nov 22, 2021
1 parent 845425f commit 130a666
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 11 deletions.
1 change: 1 addition & 0 deletions include/nebula/sclient/ScanEdgeIter.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ struct ScanEdgeIter {
storage::cpp2::ScanEdgeRequest* req_;
bool hasNext_;
std::string nextCursor_;
bool firstScan_{true};
};

} // namespace nebula
2 changes: 1 addition & 1 deletion include/nebula/sclient/StorageClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class ScanEdgeResponse;
#define DEFAULT_END_TIME std::numeric_limits<int64_t>::max()

class StorageClient {
friend class ScanEdgeIter;
friend struct ScanEdgeIter;
public:
explicit StorageClient(const std::vector<std::string>& metaAddrs);

Expand Down
2 changes: 1 addition & 1 deletion src/mclient/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ void MetaClient::getResponse(Request req,
LOG(INFO) << "Send request to meta " << host;
remoteFunc(client, req)
.via(evb)
.then([host, respGen = std::move(respGen), pro = std::move(pro), this](
.then([host, respGen = std::move(respGen), pro = std::move(pro)](
folly::Try<RpcResponse>&& t) mutable {
// exception occurred during RPC
if (t.hasException()) {
Expand Down
23 changes: 22 additions & 1 deletion src/mclient/tests/MetaClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
#include <gtest/gtest.h>
#include <nebula/mclient/MetaClient.h>
#include <nebula/sclient/Init.h>
// #include <nebula/client/Config.h>
// #include <nebula/client/ConnectionPool.h>
// #include <nebula/client/Init.h>
// #include <nebula/client/Session.h>

#include "../../ClientTest.h"
#include "common/datatypes/HostAddr.h"
Expand All @@ -20,6 +24,22 @@

class MetaClientTest : public ClientTest {
protected:
// static void prepare() {
// nebula::ConnectionPool pool;
// pool.init({kServerHost ":38996"}, nebula::Config{});
// auto session = pool.getSession("root", "nebula");
// ASSERT_TRUE(session.valid());
// // ping
// EXPECT_TRUE(session.ping());
// // execute
// auto result = session.execute("CREATE SPACE meta_client_test(vid_type=FIXED_STRING(8),
// partition_num=3);USE meta_client_test"); ASSERT_EQ(result.errorCode,
// nebula::ErrorCode::SUCCEEDED);

// auto result2 = session.execute("CREATE EDGE like(likeness int)");
// ASSERT_EQ(result2.errorCode, nebula::ErrorCode::SUCCEEDED);
// }

static void runOnce(nebula::MetaClient &c) {
auto ret = c.getSpaceIdByNameFromCache("nba");
ASSERT_TRUE(ret.first);
Expand All @@ -40,7 +60,7 @@ class MetaClientTest : public ClientTest {
for (auto partId : parts) {
LOG(INFO) << partId << ",";
}
EXPECT_GT(parts.size(), 0);
EXPECT_EQ(parts, (std::vector<nebula::PartitionID>{1, 2, 3}));

auto ret4 = c.getPartLeaderFromCache(spaceId, 1);
ASSERT_TRUE(ret4.first);
Expand All @@ -50,6 +70,7 @@ class MetaClientTest : public ClientTest {

TEST_F(MetaClientTest, Basic) {
nebula::MetaClient c({kServerHost ":45996"});
// prepare();
runOnce(c);
}

Expand Down
12 changes: 8 additions & 4 deletions src/sclient/ScanEdgeIter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,14 @@ DataSet ScanEdgeIter::next() {
return DataSet();
}
DCHECK(!!req_);
auto partCursorMapReq = req_->get_parts();
DCHECK_EQ(partCursorMapReq.size(), 1);
partCursorMapReq.begin()->second.set_next_cursor(nextCursor_);
req_->set_parts(partCursorMapReq);
if (firstScan_) {
firstScan_ = false;
} else {
auto partCursorMapReq = req_->get_parts();
DCHECK_EQ(partCursorMapReq.size(), 1);
partCursorMapReq.begin()->second.set_next_cursor(nextCursor_);
req_->set_parts(partCursorMapReq);
}
auto r = client_->doScanEdge(*req_);
if (!r.first) {
LOG(ERROR) << "Scan edge failed";
Expand Down
5 changes: 2 additions & 3 deletions src/sclient/StorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ ScanEdgeIter StorageClient::scanEdgeWithPart(std::string spaceName,
// req->set_cursor("");
// new interface
storage::cpp2::ScanCursor scanCursor;
scanCursor.set_next_cursor("");
scanCursor.set_has_next(false);
req->set_parts(std::unordered_map<PartitionID, storage::cpp2::ScanCursor>{
{partId, scanCursor}});
req->set_return_columns(returnCols);
Expand Down Expand Up @@ -120,11 +120,10 @@ void StorageClient::getResponse(std::pair<HostAddr, Request>&& request,
auto host = request.first;
auto client =
clientsMan_->client(host, evb, false, 60 * 1000); // FLAGS_storage_client_timeout_ms
auto spaceId = request.second.get_space_id();
LOG(INFO) << "Send request to storage " << host;
remoteFunc(client.get(), request.second)
.via(evb)
.then([spaceId, pro = std::move(pro), host, this](folly::Try<Response>&& t) mutable {
.then([pro = std::move(pro), host](folly::Try<Response>&& t) mutable {
// exception occurred during RPC
if (t.hasException()) {
LOG(ERROR) << "Send request to " << host << " failed";
Expand Down
2 changes: 1 addition & 1 deletion src/thrift/ThriftClientManager-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ std::shared_ptr<ClientType> ThriftClientManager<ClientType>::client(const HostAd

VLOG(2) << "Connecting to " << host << " for " << ++connectionCount << " times";
std::shared_ptr<folly::AsyncSocket> socket;
evb->runImmediatelyOrRunInEventBaseThreadAndWait([this, &socket, evb, resolved]() {
evb->runImmediatelyOrRunInEventBaseThreadAndWait([&socket, evb, resolved]() {
// if (enableSSL_) {
// socket = folly::AsyncSSLSocket::newSocket(nebula::createSSLContext(), evb);
// socket->connect(nullptr, resolved.host, resolved.port, 1000); // FLAGS_conn_timeout_ms
Expand Down

0 comments on commit 130a666

Please sign in to comment.