Skip to content

Commit

Permalink
optimize keepalive code
Browse files Browse the repository at this point in the history
  • Loading branch information
chenguangming committed Jan 31, 2023
1 parent 3dacaa5 commit 64ba10b
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 67 deletions.
24 changes: 14 additions & 10 deletions src/brpc/input_messenger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -487,11 +487,12 @@ int InputMessenger::Create(const butil::EndPoint& remote_side,
options.on_edge_triggered_events = OnNewMessages;
options.health_check_interval_s = health_check_interval_s;
if (FLAGS_socket_keepalive) {
options.mutable_keepalive_options()->keepalive_idle_s
options.keepalive_options = std::make_shared<SocketKeepaliveOptions>();
options.keepalive_options->keepalive_idle_s
= FLAGS_socket_keepalive_idle_s;
options.mutable_keepalive_options()->keepalive_interval_s
options.keepalive_options->keepalive_interval_s
= FLAGS_socket_keepalive_interval_s;
options.mutable_keepalive_options()->keepalive_count
options.keepalive_options->keepalive_count
= FLAGS_socket_keepalive_count;
}
return Socket::Create(options, id);
Expand All @@ -511,17 +512,20 @@ int InputMessenger::Create(SocketOptions options, SocketId* id) {
}
// Enable keepalive by options or Gflag.
// Priority: options > Gflag.
if (options.has_keepalive_options() || FLAGS_socket_keepalive) {
if (options.mutable_keepalive_options()->keepalive_idle_s <= 0) {
options.mutable_keepalive_options()->keepalive_idle_s
while (options.keepalive_options || FLAGS_socket_keepalive) {
if (!options.keepalive_options) {
options.keepalive_options = std::make_shared<SocketKeepaliveOptions>();
}
if (options.keepalive_options->keepalive_idle_s <= 0) {
options.keepalive_options->keepalive_idle_s
= FLAGS_socket_keepalive_idle_s;
}
if (options.mutable_keepalive_options()->keepalive_interval_s <= 0) {
options.mutable_keepalive_options()->keepalive_interval_s
if (options.keepalive_options->keepalive_interval_s <= 0) {
options.keepalive_options->keepalive_interval_s
= FLAGS_socket_keepalive_interval_s;
}
if (options.mutable_keepalive_options()->keepalive_count <= 0) {
options.mutable_keepalive_options()->keepalive_count
if (options.keepalive_options->keepalive_count <= 0) {
options.keepalive_options->keepalive_count
= FLAGS_socket_keepalive_count;
}
}
Expand Down
64 changes: 31 additions & 33 deletions src/brpc/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -557,29 +557,27 @@ int Socket::ResetFileDescriptor(int fd) {
// OK to fail, namely unix domain socket does not support this.
butil::make_no_delay(fd);
if (_tos > 0 &&
setsockopt(fd, IPPROTO_IP, IP_TOS, &_tos, sizeof(_tos)) < 0) {
PLOG(FATAL) << "Fail to set tos of fd=" << fd << " to " << _tos;
setsockopt(fd, IPPROTO_IP, IP_TOS, &_tos, sizeof(_tos)) != 0) {
PLOG(ERROR) << "Fail to set tos of fd=" << fd << " to " << _tos;
}

if (FLAGS_socket_send_buffer_size > 0) {
int buff_size = FLAGS_socket_send_buffer_size;
socklen_t size = sizeof(buff_size);
if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buff_size, size) != 0) {
PLOG(FATAL) << "Fail to set sndbuf of fd=" << fd << " to "
if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buff_size, sizeof(buff_size)) != 0) {
PLOG(ERROR) << "Fail to set sndbuf of fd=" << fd << " to "
<< buff_size;
}
}

if (FLAGS_socket_recv_buffer_size > 0) {
int buff_size = FLAGS_socket_recv_buffer_size;
socklen_t size = sizeof(buff_size);
if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &buff_size, size) != 0) {
PLOG(FATAL) << "Fail to set rcvbuf of fd=" << fd << " to "
if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &buff_size, sizeof(buff_size)) != 0) {
PLOG(ERROR) << "Fail to set rcvbuf of fd=" << fd << " to "
<< buff_size;
}
}

SetKeepalive(fd);
EnableKeepaliveIfNeeded(fd);

if (_on_edge_triggered_events) {
if (GetGlobalEventDispatcher(fd).AddConsumer(id(), fd) != 0) {
Expand All @@ -592,64 +590,64 @@ int Socket::ResetFileDescriptor(int fd) {
return 0;
}

void Socket::SetKeepalive(int fd) {
void Socket::EnableKeepaliveIfNeeded(int fd) {
if (!_keepalive_options) {
return;
} else {
int keep_alive = 1;
socklen_t size = sizeof(keep_alive);
if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &keep_alive, size)!=0) {
LOG(FATAL) << "Fail to set keepalive of fd=" << fd;
return;
}
}

int keepalive = 1;
if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive,
sizeof(keepalive)) != 0) {
PLOG(ERROR) << "Fail to set keepalive of fd=" << fd;
return;
}

#if defined(OS_LINUX)
if (_keepalive_options->keepalive_idle_s > 0) {
socklen_t size = sizeof(_keepalive_options->keepalive_idle_s);
if (setsockopt(fd, SOL_TCP, TCP_KEEPIDLE,
&_keepalive_options->keepalive_idle_s, size) < 0) {
&_keepalive_options->keepalive_idle_s,
sizeof(_keepalive_options->keepalive_idle_s)) != 0) {
LOG(FATAL) << "Fail to set keepidle of fd=" << fd;
}
}

if (_keepalive_options->keepalive_interval_s > 0) {
socklen_t size = sizeof(_keepalive_options->keepalive_interval_s);
if (setsockopt(fd, SOL_TCP, TCP_KEEPINTVL,
&_keepalive_options->keepalive_interval_s, size) < 0) {
&_keepalive_options->keepalive_interval_s,
sizeof(_keepalive_options->keepalive_interval_s)) != 0) {
LOG(FATAL) << "Fail to set keepintvl of fd=" << fd;
}
}

if (_keepalive_options->keepalive_count > 0) {
socklen_t size = sizeof(_keepalive_options->keepalive_count);
if (setsockopt(fd, SOL_TCP, TCP_KEEPCNT,
&_keepalive_options->keepalive_count, size) < 0) {
&_keepalive_options->keepalive_count,
sizeof(_keepalive_options->keepalive_count)) != 0) {
LOG(FATAL) << "Fail to set keepcnt of fd=" << fd;
}
}
#elif defined(OS_MACOSX)
if (_keepalive_options->keepalive_idle_s > 0) {
socklen_t size = sizeof(_keepalive_options->keepalive_idle_s);
if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPALIVE,
&_keepalive_options->keepalive_idle_s, size) < 0) {
LOG(FATAL) << "Fail to set keepidle of fd=" << fd;
&_keepalive_options->keepalive_idle_s,
sizeof(_keepalive_options->keepalive_idle_s)) != 0) {
PLOG(ERROR) << "Fail to set keepidle of fd=" << fd;
}
}

if (_keepalive_options->keepalive_interval_s > 0) {
socklen_t size = sizeof(_keepalive_options->keepalive_interval_s);
if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL,
&_keepalive_options->keepalive_interval_s, size) < 0) {
LOG(FATAL) << "Fail to set keepintvl of fd=" << fd;
&_keepalive_options->keepalive_interval_s,
sizeof(_keepalive_options->keepalive_interval_s)) != 0) {
PLOG(ERROR) << "Fail to set keepintvl of fd=" << fd;
}
}

if (_keepalive_options->keepalive_count > 0) {
socklen_t size = sizeof(_keepalive_options->keepalive_count);
if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT,
&_keepalive_options->keepalive_count, size) < 0) {
LOG(FATAL) << "Fail to set keepcnt of fd=" << fd;
&_keepalive_options->keepalive_count,
sizeof(_keepalive_options->keepalive_count)) != 0) {
PLOG(ERROR) << "Fail to set keepcnt of fd=" << fd;
}
}
#endif
Expand Down Expand Up @@ -741,7 +739,7 @@ int Socket::Create(const SocketOptions& options, SocketId* id) {
}
m->_last_writetime_us.store(cpuwide_now, butil::memory_order_relaxed);
m->_unwritten_bytes.store(0, butil::memory_order_relaxed);
m->_keepalive_options = options.shared_keepalibe_options();
m->_keepalive_options = options.keepalive_options;
CHECK(NULL == m->_write_head.load(butil::memory_order_relaxed));
// Must be last one! Internal fields of this Socket may be access
// just after calling ResetFileDescriptor.
Expand Down
27 changes: 3 additions & 24 deletions src/brpc/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,29 +214,8 @@ struct SocketOptions {
Destroyable* initial_parsing_context;

// Socket keepalive related options.
// Refer to `SocketKeepaliveOptions' for details
void enable_keepalive() {
if (!_keepalive_options) {
_keepalive_options.reset(new SocketKeepaliveOptions);
}
}
bool has_keepalive_options() { return _keepalive_options != NULL; }
const SocketKeepaliveOptions& keepalive_options() const {
return *_keepalive_options;
}
SocketKeepaliveOptions* mutable_keepalive_options() {
enable_keepalive();
return _keepalive_options.get();
}
std::shared_ptr<SocketKeepaliveOptions>
shared_keepalibe_options() const {
return _keepalive_options;
}

private:
// SocketKeepaliveOptions is not often used, allocate it on heap to
// prevent SocketKeepaliveOptions from being bloated in most cases.
std::shared_ptr<SocketKeepaliveOptions> _keepalive_options;
// Refer to `SocketKeepaliveOptions' for details.
std::shared_ptr<SocketKeepaliveOptions> keepalive_options;
};

// Abstractions on reading from and writing into file descriptors.
Expand Down Expand Up @@ -651,7 +630,7 @@ friend void DereferenceSocket(Socket*);

int ResetFileDescriptor(int fd);

void SetKeepalive(int fd);
void EnableKeepaliveIfNeeded(int fd);

// Wait until nref hits `expected_nref' and reset some internal resources.
int WaitAndReset(int32_t expected_nref);
Expand Down

0 comments on commit 64ba10b

Please sign in to comment.