-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
Copy pathThriftClientManager-inl.h
100 lines (91 loc) · 3.74 KB
/
ThriftClientManager-inl.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/* Copyright (c) 2018 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
#pragma once
#include <folly/io/async/AsyncSSLSocket.h>
#include <folly/io/async/AsyncSocket.h>
#include <folly/system/ThreadName.h>
#include <thrift/lib/cpp2/async/HeaderClientChannel.h>
#include "common/network/NetworkUtils.h"
#include "common/ssl/SSLConfig.h"
DECLARE_int32(conn_timeout_ms);
namespace nebula {
namespace thrift {
template <class ClientType>
std::shared_ptr<ClientType> ThriftClientManager<ClientType>::client(const HostAddr& host,
folly::EventBase* evb,
bool compatibility,
uint32_t timeout) {
if (evb == nullptr) {
evb = folly::EventBaseManager::get()->getEventBase();
}
// Get client from client manager if it is ok.
auto it = clientMap_->find(std::make_pair(host, evb));
if (it != clientMap_->end()) {
do {
auto channel = dynamic_cast<apache::thrift::HeaderClientChannel*>(it->second->getChannel());
if (channel == nullptr || !channel->good()) {
// Remove bad connection to create a new one.
clientMap_->erase(it);
VLOG(2) << "Invalid Channel: " << channel << " for host: " << host;
break;
}
auto transport = dynamic_cast<folly::AsyncSocket*>(channel->getTransport());
if (transport == nullptr || transport->hangup()) {
clientMap_->erase(it);
VLOG(2) << "Transport is closed by peers " << transport << " for host: " << host;
break;
}
VLOG(2) << "Getting a client to " << host;
return it->second;
} while (false);
}
// Need to create a new client and insert it to client map.
VLOG(2) << "There is no existing client to " << host << ", trying to create one";
static thread_local int connectionCount = 0;
/*
* TODO(liuyu): folly said 'resolve' may take second to finish
* if this really happen, we will add a cache here.
* */
HostAddr resolved = host;
if (!folly::IPAddress::validate(resolved.host)) {
try {
folly::SocketAddress socketAddr(resolved.host, resolved.port, true);
std::ostringstream oss;
oss << "resolve " << resolved << " as ";
resolved.host = socketAddr.getAddressStr();
oss << resolved;
LOG(INFO) << oss.str();
} catch (const std::exception& e) {
// if we resolve failed, just return a connection, we will retry later
LOG(ERROR) << e.what();
}
}
VLOG(2) << "Connecting to " << host << " for " << ++connectionCount << " times";
std::shared_ptr<folly::AsyncSocket> socket;
evb->runImmediatelyOrRunInEventBaseThreadAndWait([this, &socket, evb, resolved]() {
if (enableSSL_) {
socket = folly::AsyncSSLSocket::newSocket(nebula::createSSLContext(), evb);
socket->connect(nullptr, resolved.host, resolved.port, FLAGS_conn_timeout_ms);
} else {
socket =
folly::AsyncSocket::newSocket(evb, resolved.host, resolved.port, FLAGS_conn_timeout_ms);
}
});
auto headerClientChannel = apache::thrift::HeaderClientChannel::newChannel(socket);
if (timeout > 0) {
headerClientChannel->setTimeout(timeout);
}
if (compatibility) {
headerClientChannel->setProtocolId(apache::thrift::protocol::T_BINARY_PROTOCOL);
headerClientChannel->setClientType(THRIFT_UNFRAMED_DEPRECATED);
}
std::shared_ptr<ClientType> client(
new ClientType(std::move(headerClientChannel)),
[evb](auto* p) { evb->runImmediatelyOrRunInEventBaseThreadAndWait([p] { delete p; }); });
clientMap_->emplace(std::make_pair(host, evb), client);
return client;
}
} // namespace thrift
} // namespace nebula