Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jievince committed Dec 6, 2021
1 parent 9b81ef7 commit 0148a8c
Show file tree
Hide file tree
Showing 22 changed files with 89 additions and 94 deletions.
20 changes: 20 additions & 0 deletions include/nebula/mclient/MConfig.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#pragma once

#include <cstdint>
#include <string>

namespace nebula {

struct MConfig {
int32_t connTimeoutInMs_{1000}; // It's as same as FLAGS_conn_timeout_ms in nebula
int32_t clientTimeoutInMs_{60 * 1000}; // It's as same as FLAG_meta_client_timeout_ms in nebula
bool enableSSL_{false};
std::string CAPath_;
};

} // namespace nebula
6 changes: 4 additions & 2 deletions include/nebula/mclient/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

#include "common/datatypes/HostAddr.h"
#include "common/thrift/ThriftTypes.h"
#include "nebula/sclient/ScanEdgeIter.h"
#include "nebula/mclient/MConfig.h"

struct pair_hash {
template <class T1, class T2>
Expand Down Expand Up @@ -63,7 +63,8 @@ using SpaceEdgeNameTypeMap =

class MetaClient {
public:
explicit MetaClient(const std::vector<std::string> &metaAddrs);
explicit MetaClient(const std::vector<std::string> &metaAddrs,
const MConfig &mConfig = MConfig{});

~MetaClient();

Expand Down Expand Up @@ -103,6 +104,7 @@ class MetaClient {

private:
std::vector<HostAddr> metaAddrs_;
MConfig mConfig_;
SpaceNameIdMap spaceIndexByName_;
SpaceEdgeNameTypeMap spaceEdgeIndexByName_;
std::unordered_map<std::pair<GraphSpaceID, PartitionID>, HostAddr, pair_hash> spacePartLeaderMap_;
Expand Down
12 changes: 0 additions & 12 deletions include/nebula/sclient/Init.h

This file was deleted.

21 changes: 21 additions & 0 deletions include/nebula/sclient/SConfig.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#pragma once

#include <cstdint>
#include <string>

namespace nebula {

struct SConfig {
int32_t connTimeoutInMs_{1000}; // in ms. It's as same as FLAGS_conn_timeout_ms in nebula
int32_t clientTimeoutInMs_{60 *
1000}; // in ms. It's as same as FLAG_meta_client_timeout_ms in nebula
bool enableSSL_{false};
std::string CAPath_;
};

} // namespace nebula
1 change: 0 additions & 1 deletion include/nebula/sclient/ScanEdgeIter.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ struct ScanEdgeIter {
storage::cpp2::ScanEdgeRequest* req_;
bool hasNext_;
std::string nextCursor_;
bool firstScan_{true};
};

} // namespace nebula
8 changes: 6 additions & 2 deletions include/nebula/sclient/StorageClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "common/datatypes/HostAddr.h"
#include "common/thrift/ThriftTypes.h"
#include "nebula/mclient/MetaClient.h"
#include "nebula/sclient/SConfig.h"
#include "nebula/sclient/ScanEdgeIter.h"

namespace folly {
Expand Down Expand Up @@ -51,15 +52,17 @@ class ScanEdgeResponse;
} // namespace cpp2
} // namespace storage

#define DEFAULT_LIMIT 1000
#define DEFAULT_LIMIT std::numeric_limits<int64_t>::max()
#define DEFAULT_START_TIME 0
#define DEFAULT_END_TIME std::numeric_limits<int64_t>::max()

class StorageClient {
friend struct ScanEdgeIter;

public:
explicit StorageClient(const std::vector<std::string>& metaAddrs);
explicit StorageClient(const std::vector<std::string>& metaAddrs,
const MConfig& mConfig = MConfig{},
const SConfig& sConfig = SConfig{});

~StorageClient();

Expand Down Expand Up @@ -88,6 +91,7 @@ class StorageClient {
folly::Promise<std::pair<bool, Response>> pro);

std::unique_ptr<MetaClient> mClient_;
SConfig sConfig_;
std::shared_ptr<folly::IOThreadPoolExecutor> ioExecutor_;
std::shared_ptr<thrift::ThriftClientManager<storage::cpp2::GraphStorageServiceAsyncClient>>
clientsMan_;
Expand Down
2 changes: 1 addition & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ set(NEBULA_COMMON_SOURCES
geo/io/wkb/WKBWriter.cpp
geo/io/wkb/ByteOrderDataIOStream.cpp
Init.cpp
SSLConfig.cpp
)

set(NEBULA_CLIENT_SOURCES
client/Connection.cpp
client/ConnectionPool.cpp
client/Session.cpp
client/SSLConfig.cpp
)

set(NEBULA_MCLIENT_SOURCES
Expand Down
44 changes: 0 additions & 44 deletions src/ClientTest.h

This file was deleted.

File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion src/client/Connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
#include <memory>
#include <stdexcept>

#include "./SSLConfig.h"
#include "../SSLConfig.h"
#include "interface/gen-cpp2/GraphServiceAsyncClient.h"

namespace nebula {
Expand Down
2 changes: 1 addition & 1 deletion src/client/tests/AddressTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#include <nebula/client/ConnectionPool.h>
#include <nebula/client/Session.h>

#include "../../ClientTest.h"
#include "./ClientTest.h"

// Require a nebula server could access

Expand Down
2 changes: 1 addition & 1 deletion src/client/tests/ConfigTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#include <nebula/client/ConnectionPool.h>
#include <nebula/client/Session.h>

#include "../../ClientTest.h"
#include "./ClientTest.h"

// Require a nebula server could access

Expand Down
2 changes: 1 addition & 1 deletion src/client/tests/ConnectionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#include <gtest/gtest.h>
#include <nebula/client/Connection.h>

#include "../../ClientTest.h"
#include "./ClientTest.h"

// Require a nebula server could access

Expand Down
2 changes: 1 addition & 1 deletion src/client/tests/SessionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#include <chrono>
#include <thread>

#include "../../ClientTest.h"
#include "./ClientTest.h"

// Require a nebula server could access

Expand Down
2 changes: 1 addition & 1 deletion src/client/tests/TimezoneConversionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include <chrono>
#include <thread>

#include "../../ClientTest.h"
#include "./ClientTest.h"

// Require a nebula server could access

Expand Down
10 changes: 5 additions & 5 deletions src/mclient/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,19 @@

namespace nebula {

MetaClient::MetaClient(const std::vector<std::string>& metaAddrs) {
MetaClient::MetaClient(const std::vector<std::string>& metaAddrs, const MConfig& mConfig) {
for (const auto& addr : metaAddrs) {
std::vector<std::string> ip_port;
folly::split(':', addr, ip_port, true);
CHECK(ip_port.size() == 2) << "meta server addr " << addr << " is illegal";
metaAddrs_.emplace_back(ip_port[0], folly::to<int32_t>(ip_port[1]));
}
CHECK(!metaAddrs_.empty()) << "metaAddrs_ is empty";
mConfig_ = mConfig;

ioExecutor_ = std::make_shared<folly::IOThreadPoolExecutor>(std::thread::hardware_concurrency());
clientsMan_ =
std::make_shared<thrift::ThriftClientManager<meta::cpp2::MetaServiceAsyncClient>>(false);
clientsMan_ = std::make_shared<thrift::ThriftClientManager<meta::cpp2::MetaServiceAsyncClient>>(
mConfig_.connTimeoutInMs_, mConfig_.enableSSL_, mConfig_.CAPath_);
bool b = loadData(); // load data into cache
if (!b) {
LOG(ERROR) << "load data failed";
Expand Down Expand Up @@ -199,8 +200,7 @@ void MetaClient::getResponse(Request req,
respGen = std::move(respGen),
pro = std::move(pro),
this]() mutable {
auto client = clientsMan_->client(
host, evb, false, 60 * 1000); // FLAGS_meta_client_timeout_ms
auto client = clientsMan_->client(host, evb, false, mConfig_.clientTimeoutInMs_);
LOG(INFO) << "Send request to meta " << host;
remoteFunc(client, req)
.via(evb)
Expand Down
5 changes: 2 additions & 3 deletions src/mclient/tests/MetaClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,16 @@
#include <nebula/client/ConnectionPool.h>
#include <nebula/client/Session.h>
#include <nebula/mclient/MetaClient.h>
#include <nebula/sclient/Init.h>

#include "../../ClientTest.h"
#include "./MClientTest.h"
#include "common/datatypes/HostAddr.h"
#include "common/thrift/ThriftTypes.h"

// Require a nebula server could access

#define kServerHost "127.0.0.1"

class MetaClientTest : public ClientTest {
class MetaClientTest : public MClientTest {
protected:
static void prepare() {
nebula::ConnectionPool pool;
Expand Down
12 changes: 7 additions & 5 deletions src/sclient/StorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@

namespace nebula {

StorageClient::StorageClient(const std::vector<std::string>& metaAddrs) {
mClient_ = std::make_unique<MetaClient>(metaAddrs);
StorageClient::StorageClient(const std::vector<std::string>& metaAddrs,
const MConfig& mConfig,
const SConfig& sConfig) {
mClient_ = std::make_unique<MetaClient>(metaAddrs, mConfig);
sConfig_ = sConfig;
ioExecutor_ = std::make_shared<folly::IOThreadPoolExecutor>(std::thread::hardware_concurrency());
clientsMan_ =
std::make_shared<thrift::ThriftClientManager<storage::cpp2::GraphStorageServiceAsyncClient>>(
false);
sConfig.connTimeoutInMs_, sConfig.enableSSL_, sConfig.CAPath_);
}

StorageClient::~StorageClient() = default;
Expand Down Expand Up @@ -118,8 +121,7 @@ void StorageClient::getResponse(std::pair<HostAddr, Request>&& request,
pro = std::move(pro),
this]() mutable {
auto host = request.first;
auto client = clientsMan_->client(
host, evb, false, 60 * 1000); // FLAGS_storage_client_timeout_ms
auto client = clientsMan_->client(host, evb, false, sConfig_.clientTimeoutInMs_);
LOG(INFO) << "Send request to storage " << host;
remoteFunc(client.get(), request.second)
.via(evb)
Expand Down
6 changes: 3 additions & 3 deletions src/sclient/tests/StorageClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* This source code is licensed under Apache 2.0 License.
*/

#include <common/Init.h>
#include <common/datatypes/DataSet.h>
#include <folly/synchronization/Baton.h>
#include <glog/logging.h>
Expand All @@ -11,17 +12,16 @@
#include <nebula/client/ConnectionPool.h>
#include <nebula/client/Session.h>
#include <nebula/mclient/MetaClient.h>
#include <nebula/sclient/Init.h>
#include <nebula/sclient/ScanEdgeIter.h>
#include <nebula/sclient/StorageClient.h>

#include "../../ClientTest.h"
#include "./SClientTest.h"

// Require a nebula server could access

#define kServerHost "127.0.0.1"

class StorageClientTest : public ClientTest {
class StorageClientTest : public SClientTest {
protected:
static void prepare() {
nebula::ConnectionPool pool;
Expand Down
17 changes: 9 additions & 8 deletions src/thrift/ThriftClientManager-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include <folly/system/ThreadName.h>
#include <thrift/lib/cpp2/async/HeaderClientChannel.h>

#include "../SSLConfig.h"

namespace nebula {
namespace thrift {

Expand Down Expand Up @@ -67,14 +69,13 @@ std::shared_ptr<ClientType> ThriftClientManager<ClientType>::client(const HostAd

VLOG(2) << "Connecting to " << host << " for " << ++connectionCount << " times";
std::shared_ptr<folly::AsyncSocket> socket;
evb->runImmediatelyOrRunInEventBaseThreadAndWait([&socket, evb, resolved]() {
// if (enableSSL_) {
// socket = folly::AsyncSSLSocket::newSocket(nebula::createSSLContext(), evb);
// socket->connect(nullptr, resolved.host, resolved.port, 1000); // FLAGS_conn_timeout_ms
// } else {
socket = folly::AsyncSocket::newSocket(
evb, resolved.host, resolved.port, 1000); // FLAGS_conn_timeout_ms
// }
evb->runImmediatelyOrRunInEventBaseThreadAndWait([this, &socket, evb, resolved]() {
if (enableSSL_) {
socket = folly::AsyncSSLSocket::newSocket(nebula::createSSLContext(CAPath_), evb);
socket->connect(nullptr, resolved.host, resolved.port, connTimeoutInMs_);
} else {
socket = folly::AsyncSocket::newSocket(evb, resolved.host, resolved.port, connTimeoutInMs_);
}
});
auto headerClientChannel = apache::thrift::HeaderClientChannel::newChannel(socket);
if (timeout > 0) {
Expand Down
Loading

0 comments on commit 0148a8c

Please sign in to comment.