From 130a666b27b23af214a516afd28cde301ba933e3 Mon Sep 17 00:00:00 2001 From: jievince <38901892+jievince@users.noreply.github.com> Date: Fri, 19 Nov 2021 18:14:28 +0800 Subject: [PATCH] fix --- include/nebula/sclient/ScanEdgeIter.h | 1 + include/nebula/sclient/StorageClient.h | 2 +- src/mclient/MetaClient.cpp | 2 +- src/mclient/tests/MetaClientTest.cpp | 23 ++++++++++++++++++++++- src/sclient/ScanEdgeIter.cpp | 12 ++++++++---- src/sclient/StorageClient.cpp | 5 ++--- src/thrift/ThriftClientManager-inl.h | 2 +- 7 files changed, 36 insertions(+), 11 deletions(-) diff --git a/include/nebula/sclient/ScanEdgeIter.h b/include/nebula/sclient/ScanEdgeIter.h index 7bdd521a..7ec7170b 100644 --- a/include/nebula/sclient/ScanEdgeIter.h +++ b/include/nebula/sclient/ScanEdgeIter.h @@ -35,6 +35,7 @@ struct ScanEdgeIter { storage::cpp2::ScanEdgeRequest* req_; bool hasNext_; std::string nextCursor_; + bool firstScan_{true}; }; } // namespace nebula diff --git a/include/nebula/sclient/StorageClient.h b/include/nebula/sclient/StorageClient.h index f5cac2d6..7fc2efc4 100644 --- a/include/nebula/sclient/StorageClient.h +++ b/include/nebula/sclient/StorageClient.h @@ -56,7 +56,7 @@ class ScanEdgeResponse; #define DEFAULT_END_TIME std::numeric_limits::max() class StorageClient { -friend class ScanEdgeIter; +friend struct ScanEdgeIter; public: explicit StorageClient(const std::vector& metaAddrs); diff --git a/src/mclient/MetaClient.cpp b/src/mclient/MetaClient.cpp index de7677d5..717c9ef7 100644 --- a/src/mclient/MetaClient.cpp +++ b/src/mclient/MetaClient.cpp @@ -198,7 +198,7 @@ void MetaClient::getResponse(Request req, LOG(INFO) << "Send request to meta " << host; remoteFunc(client, req) .via(evb) - .then([host, respGen = std::move(respGen), pro = std::move(pro), this]( + .then([host, respGen = std::move(respGen), pro = std::move(pro)]( folly::Try&& t) mutable { // exception occurred during RPC if (t.hasException()) { diff --git a/src/mclient/tests/MetaClientTest.cpp b/src/mclient/tests/MetaClientTest.cpp index 17fb4cd9..f26fec59 100644 --- a/src/mclient/tests/MetaClientTest.cpp +++ b/src/mclient/tests/MetaClientTest.cpp @@ -9,6 +9,10 @@ #include #include #include +// #include +// #include +// #include +// #include #include "../../ClientTest.h" #include "common/datatypes/HostAddr.h" @@ -20,6 +24,22 @@ class MetaClientTest : public ClientTest { protected: + // static void prepare() { + // nebula::ConnectionPool pool; + // pool.init({kServerHost ":38996"}, nebula::Config{}); + // auto session = pool.getSession("root", "nebula"); + // ASSERT_TRUE(session.valid()); + // // ping + // EXPECT_TRUE(session.ping()); + // // execute + // auto result = session.execute("CREATE SPACE meta_client_test(vid_type=FIXED_STRING(8), + // partition_num=3);USE meta_client_test"); ASSERT_EQ(result.errorCode, + // nebula::ErrorCode::SUCCEEDED); + + // auto result2 = session.execute("CREATE EDGE like(likeness int)"); + // ASSERT_EQ(result2.errorCode, nebula::ErrorCode::SUCCEEDED); + // } + static void runOnce(nebula::MetaClient &c) { auto ret = c.getSpaceIdByNameFromCache("nba"); ASSERT_TRUE(ret.first); @@ -40,7 +60,7 @@ class MetaClientTest : public ClientTest { for (auto partId : parts) { LOG(INFO) << partId << ","; } - EXPECT_GT(parts.size(), 0); + EXPECT_EQ(parts, (std::vector{1, 2, 3})); auto ret4 = c.getPartLeaderFromCache(spaceId, 1); ASSERT_TRUE(ret4.first); @@ -50,6 +70,7 @@ class MetaClientTest : public ClientTest { TEST_F(MetaClientTest, Basic) { nebula::MetaClient c({kServerHost ":45996"}); + // prepare(); runOnce(c); } diff --git a/src/sclient/ScanEdgeIter.cpp b/src/sclient/ScanEdgeIter.cpp index a8b0ee53..7d075057 100644 --- a/src/sclient/ScanEdgeIter.cpp +++ b/src/sclient/ScanEdgeIter.cpp @@ -25,10 +25,14 @@ DataSet ScanEdgeIter::next() { return DataSet(); } DCHECK(!!req_); - auto partCursorMapReq = req_->get_parts(); - DCHECK_EQ(partCursorMapReq.size(), 1); - partCursorMapReq.begin()->second.set_next_cursor(nextCursor_); - req_->set_parts(partCursorMapReq); + if (firstScan_) { + firstScan_ = false; + } else { + auto partCursorMapReq = req_->get_parts(); + DCHECK_EQ(partCursorMapReq.size(), 1); + partCursorMapReq.begin()->second.set_next_cursor(nextCursor_); + req_->set_parts(partCursorMapReq); + } auto r = client_->doScanEdge(*req_); if (!r.first) { LOG(ERROR) << "Scan edge failed"; diff --git a/src/sclient/StorageClient.cpp b/src/sclient/StorageClient.cpp index 33e5654f..b648599d 100644 --- a/src/sclient/StorageClient.cpp +++ b/src/sclient/StorageClient.cpp @@ -68,7 +68,7 @@ ScanEdgeIter StorageClient::scanEdgeWithPart(std::string spaceName, // req->set_cursor(""); // new interface storage::cpp2::ScanCursor scanCursor; - scanCursor.set_next_cursor(""); + scanCursor.set_has_next(false); req->set_parts(std::unordered_map{ {partId, scanCursor}}); req->set_return_columns(returnCols); @@ -120,11 +120,10 @@ void StorageClient::getResponse(std::pair&& request, auto host = request.first; auto client = clientsMan_->client(host, evb, false, 60 * 1000); // FLAGS_storage_client_timeout_ms - auto spaceId = request.second.get_space_id(); LOG(INFO) << "Send request to storage " << host; remoteFunc(client.get(), request.second) .via(evb) - .then([spaceId, pro = std::move(pro), host, this](folly::Try&& t) mutable { + .then([pro = std::move(pro), host](folly::Try&& t) mutable { // exception occurred during RPC if (t.hasException()) { LOG(ERROR) << "Send request to " << host << " failed"; diff --git a/src/thrift/ThriftClientManager-inl.h b/src/thrift/ThriftClientManager-inl.h index 9bba35c1..022b5d53 100644 --- a/src/thrift/ThriftClientManager-inl.h +++ b/src/thrift/ThriftClientManager-inl.h @@ -67,7 +67,7 @@ std::shared_ptr ThriftClientManager::client(const HostAd VLOG(2) << "Connecting to " << host << " for " << ++connectionCount << " times"; std::shared_ptr socket; - evb->runImmediatelyOrRunInEventBaseThreadAndWait([this, &socket, evb, resolved]() { + evb->runImmediatelyOrRunInEventBaseThreadAndWait([&socket, evb, resolved]() { // if (enableSSL_) { // socket = folly::AsyncSSLSocket::newSocket(nebula::createSSLContext(), evb); // socket->connect(nullptr, resolved.host, resolved.port, 1000); // FLAGS_conn_timeout_ms