Skip to content

Commit

Permalink
optimize keepalive code & add UT
Browse files Browse the repository at this point in the history
  • Loading branch information
chenguangming committed Jan 30, 2023
1 parent 0c4ad71 commit 3dacaa5
Show file tree
Hide file tree
Showing 5 changed files with 559 additions and 100 deletions.
38 changes: 27 additions & 11 deletions src/brpc/input_messenger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ BRPC_VALIDATE_GFLAG(log_connection_close, PassValidate);
DEFINE_bool(socket_keepalive, false,
"Enable keepalive of sockets if this value is true");

DEFINE_int32(socket_keepidle_s, -1,
DEFINE_int32(socket_keepalive_idle_s, -1,
"Set idle time of sockets before keepalive if this value is positive");

DEFINE_int32(socket_keepintvl_s, -1,
DEFINE_int32(socket_keepalive_interval_s, -1,
"Set interval of sockets between keepalives if this value is positive");

DEFINE_int32(socket_keepcnt, -1,
DEFINE_int32(socket_keepalive_count, -1,
"Set number of keepalives of sockets before close if this value is positive");

DECLARE_bool(usercode_in_pthread);
Expand Down Expand Up @@ -486,10 +486,14 @@ int InputMessenger::Create(const butil::EndPoint& remote_side,
options.user = this;
options.on_edge_triggered_events = OnNewMessages;
options.health_check_interval_s = health_check_interval_s;
options.keepalive = FLAGS_socket_keepalive;
options.keepidle_s = FLAGS_socket_keepidle_s;
options.keepintvl_s = FLAGS_socket_keepintvl_s;
options.keepcnt = FLAGS_socket_keepcnt;
if (FLAGS_socket_keepalive) {
options.mutable_keepalive_options()->keepalive_idle_s
= FLAGS_socket_keepalive_idle_s;
options.mutable_keepalive_options()->keepalive_interval_s
= FLAGS_socket_keepalive_interval_s;
options.mutable_keepalive_options()->keepalive_count
= FLAGS_socket_keepalive_count;
}
return Socket::Create(options, id);
}

Expand All @@ -505,10 +509,22 @@ int InputMessenger::Create(SocketOptions options, SocketId* id) {
#endif
options.on_edge_triggered_events = OnNewMessages;
}
options.keepalive = FLAGS_socket_keepalive;
options.keepidle_s = FLAGS_socket_keepidle_s;
options.keepintvl_s = FLAGS_socket_keepintvl_s;
options.keepcnt = FLAGS_socket_keepcnt;
// Enable keepalive by options or Gflag.
// Priority: options > Gflag.
if (options.has_keepalive_options() || FLAGS_socket_keepalive) {
if (options.mutable_keepalive_options()->keepalive_idle_s <= 0) {
options.mutable_keepalive_options()->keepalive_idle_s
= FLAGS_socket_keepalive_idle_s;
}
if (options.mutable_keepalive_options()->keepalive_interval_s <= 0) {
options.mutable_keepalive_options()->keepalive_interval_s
= FLAGS_socket_keepalive_interval_s;
}
if (options.mutable_keepalive_options()->keepalive_count <= 0) {
options.mutable_keepalive_options()->keepalive_count
= FLAGS_socket_keepalive_count;
}
}
return Socket::Create(options, id);
}

Expand Down
178 changes: 109 additions & 69 deletions src/brpc/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -465,10 +465,6 @@ Socket::Socket(Forbidden)
, _stream_set(NULL)
, _total_streams_unconsumed_size(0)
, _ninflight_app_health_check(0)
, _keepalive(false)
, _keepidle_s(-1)
, _keepintvl_s(-1)
, _keepcnt(-1)
{
CreateVarsOnce();
pthread_mutex_init(&_id_wait_list_mutex, NULL);
Expand Down Expand Up @@ -583,80 +579,80 @@ int Socket::ResetFileDescriptor(int fd) {
}
}

do {
if (!_keepalive) {
break;
SetKeepalive(fd);

if (_on_edge_triggered_events) {
if (GetGlobalEventDispatcher(fd).AddConsumer(id(), fd) != 0) {
PLOG(ERROR) << "Fail to add SocketId=" << id()
<< " into EventDispatcher";
_fd.store(-1, butil::memory_order_release);
return -1;
}
}
return 0;
}

{
int keep_alive = 1;
socklen_t size = sizeof(keep_alive);
if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &keep_alive, size)!=0) {
LOG(FATAL) << "Fail to set keepalive of fd=" << fd;
break;
}
void Socket::SetKeepalive(int fd) {
if (!_keepalive_options) {
return;
} else {
int keep_alive = 1;
socklen_t size = sizeof(keep_alive);
if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &keep_alive, size)!=0) {
LOG(FATAL) << "Fail to set keepalive of fd=" << fd;
return;
}
}

#if defined(OS_LINUX)
if (_keepidle_s > 0) {
int keepidle_s = _keepidle_s;
socklen_t size = sizeof(keepidle_s);
if (setsockopt(fd, SOL_TCP, TCP_KEEPIDLE, &keepidle_s, size) < 0) {
LOG(FATAL) << "Fail to set keepidle of fd=" << fd;
}
if (_keepalive_options->keepalive_idle_s > 0) {
socklen_t size = sizeof(_keepalive_options->keepalive_idle_s);
if (setsockopt(fd, SOL_TCP, TCP_KEEPIDLE,
&_keepalive_options->keepalive_idle_s, size) < 0) {
LOG(FATAL) << "Fail to set keepidle of fd=" << fd;
}
}

if (_keepintvl_s > 0) {
int keepintvl_s = _keepintvl_s;
socklen_t size = sizeof(keepintvl_s);
if (setsockopt(fd, SOL_TCP, TCP_KEEPINTVL, &keepintvl_s, size) < 0) {
LOG(FATAL) << "Fail to set keepintvl of fd=" << fd;
}
if (_keepalive_options->keepalive_interval_s > 0) {
socklen_t size = sizeof(_keepalive_options->keepalive_interval_s);
if (setsockopt(fd, SOL_TCP, TCP_KEEPINTVL,
&_keepalive_options->keepalive_interval_s, size) < 0) {
LOG(FATAL) << "Fail to set keepintvl of fd=" << fd;
}
}

if (_keepcnt > 0) {
int keepcnt = _keepcnt;
socklen_t size = sizeof(keepcnt);
if (setsockopt(fd, SOL_TCP, TCP_KEEPCNT, &keepcnt, size) < 0) {
LOG(FATAL) << "Fail to set keepcnt of fd=" << fd;
}
if (_keepalive_options->keepalive_count > 0) {
socklen_t size = sizeof(_keepalive_options->keepalive_count);
if (setsockopt(fd, SOL_TCP, TCP_KEEPCNT,
&_keepalive_options->keepalive_count, size) < 0) {
LOG(FATAL) << "Fail to set keepcnt of fd=" << fd;
}
}
#elif defined(OS_MACOSX)
if (_keepidle_s > 0) {
int keepidle_s = _keepidle_s;
socklen_t size = sizeof(keepidle_s);
if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPALIVE, &keepidle_s, size) < 0) {
LOG(FATAL) << "Fail to set keepidle of fd=" << fd;
}
}

if (_keepintvl_s > 0) {
int keepintvl_s = _keepintvl_s;
socklen_t size = sizeof(keepintvl_s);
if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &keepintvl_s, size) < 0) {
LOG(FATAL) << "Fail to set keepintvl of fd=" << fd;
}
if (_keepalive_options->keepalive_idle_s > 0) {
socklen_t size = sizeof(_keepalive_options->keepalive_idle_s);
if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPALIVE,
&_keepalive_options->keepalive_idle_s, size) < 0) {
LOG(FATAL) << "Fail to set keepidle of fd=" << fd;
}
}

if (_keepcnt > 0) {
int keepcnt = _keepcnt;
socklen_t size = sizeof(keepcnt);
if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &keepcnt, size) < 0) {
LOG(FATAL) << "Fail to set keepcnt of fd=" << fd;
}
if (_keepalive_options->keepalive_interval_s > 0) {
socklen_t size = sizeof(_keepalive_options->keepalive_interval_s);
if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL,
&_keepalive_options->keepalive_interval_s, size) < 0) {
LOG(FATAL) << "Fail to set keepintvl of fd=" << fd;
}
#endif
} while (false);
}

if (_on_edge_triggered_events) {
if (GetGlobalEventDispatcher(fd).AddConsumer(id(), fd) != 0) {
PLOG(ERROR) << "Fail to add SocketId=" << id()
<< " into EventDispatcher";
_fd.store(-1, butil::memory_order_release);
return -1;
if (_keepalive_options->keepalive_count > 0) {
socklen_t size = sizeof(_keepalive_options->keepalive_count);
if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT,
&_keepalive_options->keepalive_count, size) < 0) {
LOG(FATAL) << "Fail to set keepcnt of fd=" << fd;
}
}
return 0;
#endif
}

// SocketId = 32-bit version + 32-bit slot.
Expand Down Expand Up @@ -745,10 +741,7 @@ int Socket::Create(const SocketOptions& options, SocketId* id) {
}
m->_last_writetime_us.store(cpuwide_now, butil::memory_order_relaxed);
m->_unwritten_bytes.store(0, butil::memory_order_relaxed);
m->_keepalive = options.keepalive;
m->_keepidle_s = options.keepidle_s;
m->_keepintvl_s = options.keepintvl_s;
m->_keepcnt = options.keepcnt;
m->_keepalive_options = options.shared_keepalibe_options();
CHECK(NULL == m->_write_head.load(butil::memory_order_relaxed));
// Must be last one! Internal fields of this Socket may be access
// just after calling ResetFileDescriptor.
Expand Down Expand Up @@ -2340,10 +2333,57 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) {
Print(os, ptr->_ssl_session, "\n ");
os << "\n}";
}
os << "\nkeepalive=" << ptr->_keepalive
<< "\ntcp_keepalive_time=" << ptr->_keepidle_s
<< "\ntcp_keepalive_intvl=" << ptr->_keepintvl_s
<< "\ntcp_keepalive_probes=" << ptr->_keepcnt;

{
int keepalive = 0;
socklen_t len = sizeof(keepalive);
if (getsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive, &len) == 0) {
os << "\nkeepalive=" << keepalive;
}
}

{
int keepidle = 0;
socklen_t len = sizeof(keepidle);
#if defined(OS_MACOSX)
if (getsockopt(fd, IPPROTO_TCP, TCP_KEEPALIVE, &keepidle, &len) == 0) {
os << "\ntcp_keepalive_time=" << keepidle;
}
#elif defined(OS_LINUX)
if (getsockopt(fd, SOL_TCP, TCP_KEEPIDLE, &keepidle, &len) == 0) {
os << "\ntcp_keepalive_time=" << keepidle;
}
#endif
}

{
int keepintvl = 0;
socklen_t len = sizeof(keepintvl);
#if defined(OS_MACOSX)
if (getsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &keepintvl, &len) == 0) {
os << "\ntcp_keepalive_intvl=" << keepintvl;
}
#elif defined(OS_LINUX)
if (getsockopt(fd, SOL_TCP, TCP_KEEPINTVL, &keepintvl, &len) == 0) {
os << "\ntcp_keepalive_intvl=" << keepintvl;
}
#endif
}

{
int keepcnt = 0;
socklen_t len = sizeof(keepcnt);
#if defined(OS_MACOSX)
if (getsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &keepcnt, &len) == 0) {
os << "\ntcp_keepalive_probes=" << keepcnt;
}
#elif defined(OS_LINUX)
if (getsockopt(fd, SOL_TCP, TCP_KEEPCNT, &keepcnt, &len) == 0) {
os << "\ntcp_keepalive_probes=" << keepcnt;
}
#endif
}

#if defined(OS_MACOSX)
struct tcp_connection_info ti;
socklen_t len = sizeof(ti);
Expand Down
60 changes: 44 additions & 16 deletions src/brpc/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,20 @@ struct SocketSSLContext {
std::string sni_name; // useful for clients
};

struct SocketKeepaliveOptions {
SocketKeepaliveOptions()
: keepalive_idle_s(-1)
, keepalive_interval_s(-1)
, keepalive_count(-1)
{}
// Start keeplives after this period.
int keepalive_idle_s;
// Interval between keepalives.
int keepalive_interval_s;
// Number of keepalives before death.
int keepalive_count;
};

// TODO: Comment fields
struct SocketOptions {
SocketOptions();
Expand All @@ -199,14 +213,30 @@ struct SocketOptions {
// The created socket will set parsing_context with this value.
Destroyable* initial_parsing_context;

// Enable TCP keepalive or not.
bool keepalive;
// Start keeplives after this period.
int keepidle_s;
// Interval between keepalives.
int keepintvl_s;
// Number of keepalives before death.
int keepcnt;
// Socket keepalive related options.
// Refer to `SocketKeepaliveOptions' for details
void enable_keepalive() {
if (!_keepalive_options) {
_keepalive_options.reset(new SocketKeepaliveOptions);
}
}
bool has_keepalive_options() { return _keepalive_options != NULL; }
const SocketKeepaliveOptions& keepalive_options() const {
return *_keepalive_options;
}
SocketKeepaliveOptions* mutable_keepalive_options() {
enable_keepalive();
return _keepalive_options.get();
}
std::shared_ptr<SocketKeepaliveOptions>
shared_keepalibe_options() const {
return _keepalive_options;
}

private:
// SocketKeepaliveOptions is not often used, allocate it on heap to
// prevent SocketKeepaliveOptions from being bloated in most cases.
std::shared_ptr<SocketKeepaliveOptions> _keepalive_options;
};

// Abstractions on reading from and writing into file descriptors.
Expand Down Expand Up @@ -621,6 +651,8 @@ friend void DereferenceSocket(Socket*);

int ResetFileDescriptor(int fd);

void SetKeepalive(int fd);

// Wait until nref hits `expected_nref' and reset some internal resources.
int WaitAndReset(int32_t expected_nref);

Expand Down Expand Up @@ -883,14 +915,10 @@ friend void DereferenceSocket(Socket*);

butil::atomic<int64_t> _ninflight_app_health_check;

// Enable TCP keepalive or not.
bool _keepalive;
// Start keeplives after this period.
int _keepidle_s;
// Interval between keepalives.
int _keepintvl_s;
// Number of keepalives before death.
int _keepcnt;
// Socket keepalive related options.
// Refer to `SocketKeepaliveOptions' for details.
// non-NULL means that keepalive is on.
std::shared_ptr<SocketKeepaliveOptions> _keepalive_options;
};

} // namespace brpc
Expand Down
4 changes: 0 additions & 4 deletions src/brpc/socket_inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,6 @@ inline SocketOptions::SocketOptions()
, conn(NULL)
, app_connect(NULL)
, initial_parsing_context(NULL)
, keepalive(false)
, keepidle_s(-1)
, keepintvl_s(-1)
, keepcnt(-1)
{}

inline int Socket::Dereference() {
Expand Down
Loading

0 comments on commit 3dacaa5

Please sign in to comment.