Skip to content

Commit

Permalink
Opt Socket health checking
Browse files Browse the repository at this point in the history
  • Loading branch information
chenBright committed Apr 28, 2024
1 parent f98ecb9 commit 103e5f0
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 36 deletions.
2 changes: 1 addition & 1 deletion src/brpc/rtmp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1087,7 +1087,7 @@ class RtmpSocketCreator : public SocketCreator {
: _connect_options(connect_options) {
}

int CreateSocket(const SocketOptions& opt, SocketId* id) {
int CreateSocket(const SocketOptions& opt, SocketId* id) override {
SocketOptions sock_opt = opt;
sock_opt.app_connect = std::make_shared<RtmpConnect>();
sock_opt.initial_parsing_context = new policy::RtmpContext(&_connect_options, NULL);
Expand Down
20 changes: 12 additions & 8 deletions src/brpc/selective_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ friend class SubDone;
ChannelBalancer::~ChannelBalancer() {
for (ChannelToIdMap::iterator
it = _chan_map.begin(); it != _chan_map.end(); ++it) {
SocketUniquePtr ptr(it->second); // Dereference
it->second->ReleaseAdditionalReference();
it->second->ReleaseHCRelatedReference();
}
_chan_map.clear();
}
Expand Down Expand Up @@ -196,15 +196,21 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel,
return -1;
}
SocketUniquePtr ptr;
CHECK_EQ(0, Socket::Address(sock_id, &ptr));
int rc = Socket::AddressFailedAsWell(sock_id, &ptr);
if (rc < 0 || (rc > 0 && !ptr->HCEnabled())) {
LOG(FATAL) << "Fail to address SocketId=" << sock_id;
return -1;
}
if (!AddServer(ServerId(sock_id))) {
LOG(ERROR) << "Duplicated sub_channel=" << sub_channel;
// sub_chan will be deleted when the socket is recycled.
ptr->SetFailed();
// Cancel health checking.
ptr->ReleaseHCRelatedReference();
return -1;
}
ptr->SetHCRelatedRefHeld(); // set held status
_chan_map[sub_channel]= ptr.release(); // Add reference.
// The health-check-related reference has been held on created.
_chan_map[sub_channel]= ptr.get();
if (handle) {
*handle = sock_id;
}
Expand All @@ -223,13 +229,11 @@ void ChannelBalancer::RemoveAndDestroyChannel(SelectiveChannel::ChannelHandle ha
BAIDU_SCOPED_LOCK(_mutex);
CHECK_EQ(1UL, _chan_map.erase(sub->chan));
}
{
ptr->SetHCRelatedRefReleased(); // set released status to cancel health checking
SocketUniquePtr ptr2(ptr.get()); // Dereference.
}
if (rc == 0) {
ptr->ReleaseAdditionalReference();
}
// Cancel health checking.
ptr->ReleaseHCRelatedReference();
}
}

Expand Down
18 changes: 15 additions & 3 deletions src/brpc/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -788,9 +788,7 @@ int Socket::Create(const SocketOptions& options, SocketId* id) {
m->_keepalive_options = options.keepalive_options;
m->_bthread_tag = options.bthread_tag;
CHECK(NULL == m->_write_head.load(butil::memory_order_relaxed));
<<<<<<< HEAD
m->_is_write_shutdown = false;
=======
int fd = options.fd;
if (!m->ValidFileDescriptor(fd) && options.connect_on_create) {
// Connect on create.
Expand All @@ -804,7 +802,6 @@ int Socket::Create(const SocketOptions& options, SocketId* id) {
return -1;
}
}
>>>>>>> 6bcd2619 (Support connect on socket create)
// Must be last one! Internal fields of this Socket may be access
// just after calling ResetFileDescriptor.
if (m->ResetFileDescriptor(fd) != 0) {
Expand All @@ -814,10 +811,25 @@ int Socket::Create(const SocketOptions& options, SocketId* id) {
berror(saved_errno));
return -1;
}
m->HoldHCRelatedRef();
*id = m->_this_id;
return 0;
}

void Socket::HoldHCRelatedRef() {
if (_health_check_interval_s > 0) {
_is_hc_related_ref_held = true;
_versioned_ref.fetch_add(1, butil::memory_order_release);
}
}

void Socket::ReleaseHCRelatedReference() {
if (_health_check_interval_s > 0) {
_is_hc_related_ref_held = false;
Dereference();
}
}

int Socket::WaitAndReset(int32_t expected_nref) {
const uint32_t id_ver = VersionOfSocketId(_this_id);
uint64_t vref;
Expand Down
17 changes: 10 additions & 7 deletions src/brpc/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -413,16 +413,15 @@ friend class policy::H2GlobalStreamCreator;

// True if health checking is enabled.
bool HCEnabled() const {
// This fence makes sure that we see change of
// `_is_hc_related_ref_held' before changing `_versioned_ref.
butil::atomic_thread_fence(butil::memory_order_acquire);
return _health_check_interval_s > 0 && _is_hc_related_ref_held;
}

// When someone holds a health-checking-related reference,
// this function need to be called to make health checking run normally.
void SetHCRelatedRefHeld() { _is_hc_related_ref_held = true; }
// When someone releases the health-checking-related reference,
// this function need to be called to cancel health checking.
void SetHCRelatedRefReleased() { _is_hc_related_ref_held = false; }
bool IsHCRelatedRefHeld() const { return _is_hc_related_ref_held; }
// Release the health-checking-related
// reference which is held on created.
void ReleaseHCRelatedReference();

// After health checking is complete, set _hc_started to false.
void AfterHCCompleted() { _hc_started.store(false, butil::memory_order_relaxed); }
Expand Down Expand Up @@ -686,6 +685,10 @@ friend class policy::H2GlobalStreamCreator;
int ConductError(bthread_id_t);
int StartWrite(WriteRequest*, const WriteOptions&);

// Hold the health-checking-related
// reference on created.
void HoldHCRelatedRef();

int Dereference();
friend void DereferenceSocket(Socket*);

Expand Down
14 changes: 6 additions & 8 deletions src/brpc/socket_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ static butil::static_atomic<SocketMap*> g_socket_map = BUTIL_STATIC_ATOMIC_INIT(

class GlobalSocketCreator : public SocketCreator {
public:
int CreateSocket(const SocketOptions& opt, SocketId* id) {
int CreateSocket(const SocketOptions& opt, SocketId* id) override {
SocketOptions sock_opt = opt;
sock_opt.health_check_interval_s = FLAGS_health_check_interval;
return get_client_side_messenger()->Create(sock_opt, id);
Expand Down Expand Up @@ -237,8 +237,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id,
return 0;
}
// A socket w/o HC is failed (permanently), replace it.
sc->socket->SetHCRelatedRefReleased(); // set released status to cancel health checking
SocketUniquePtr ptr(sc->socket); // Remove the ref added at insertion.
sc->socket->ReleaseHCRelatedReference();
_map.erase(key); // in principle, we can override the entry in map w/o
// removing and inserting it again. But this would make error branches
// below have to remove the entry before returning, which is
Expand All @@ -258,12 +257,12 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id,
// use SocketUniquePtr which cannot put into containers before c++11.
// The ref will be removed at entry's removal.
SocketUniquePtr ptr;
if (Socket::Address(tmp_id, &ptr) != 0) {
int rc = Socket::AddressFailedAsWell(tmp_id, &ptr);
if (rc < 0 || (rc > 0 && !ptr->HCEnabled())) {
LOG(FATAL) << "Fail to address SocketId=" << tmp_id;
return -1;
}
ptr->SetHCRelatedRefHeld(); // set held status
SingleConnection new_sc = { 1, ptr.release(), 0 };
SingleConnection new_sc = { 1, ptr.get(), 0 };
_map[key] = new_sc;
*id = tmp_id;
mu.unlock();
Expand Down Expand Up @@ -301,8 +300,7 @@ void SocketMap::RemoveInternal(const SocketMapKey& key,
_map.erase(key);
mu.unlock();
s->ReleaseAdditionalReference(); // release extra ref
s->SetHCRelatedRefReleased(); // set released status to cancel health checking
SocketUniquePtr ptr(s); // Dereference
s->ReleaseHCRelatedReference();
}
}
}
Expand Down
21 changes: 12 additions & 9 deletions test/brpc_socket_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,6 @@ TEST_F(SocketTest, not_health_check_when_nref_hits_0) {
{
brpc::SocketUniquePtr s;
ASSERT_EQ(0, brpc::Socket::Address(id, &s));
s->SetHCRelatedRefHeld(); // set held status
global_sock = s.get();
ASSERT_TRUE(s.get());
ASSERT_EQ(-1, s->fd());
Expand Down Expand Up @@ -542,6 +541,7 @@ TEST_F(SocketTest, not_health_check_when_nref_hits_0) {
#endif
ASSERT_TRUE(src.empty());
ASSERT_EQ(-1, s->fd());
s->ReleaseHCRelatedReference();
}
// StartHealthCheck is possibly still running. Spin until global_sock
// is NULL(set in CheckRecycle::BeforeRecycle). Notice that you should
Expand Down Expand Up @@ -650,12 +650,14 @@ TEST_F(SocketTest, health_check) {
options.user = new CheckRecycle;
options.health_check_interval_s = kCheckInteval/*s*/;
ASSERT_EQ(0, brpc::Socket::Create(options, &id));
brpc::SocketUniquePtr s;
ASSERT_EQ(0, brpc::Socket::Address(id, &s));

s->SetHCRelatedRefHeld(); // set held status
global_sock = s.get();
ASSERT_TRUE(s.get());
brpc::Socket* s = NULL;
{
brpc::SocketUniquePtr ptr;
ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
s = ptr.get();
}
global_sock = s;
ASSERT_NE(nullptr, s);
ASSERT_EQ(-1, s->fd());
ASSERT_EQ(point, s->remote_side());
ASSERT_EQ(id, s->id());
Expand Down Expand Up @@ -763,7 +765,7 @@ TEST_F(SocketTest, health_check) {
ASSERT_NE(0, ptr->fd());
}

s.release()->Dereference();
s->ReleaseHCRelatedReference();

// Must stop messenger before SetFailed the id otherwise StartHealthCheck
// still has chance to get reconnected and revive the id.
Expand All @@ -779,7 +781,8 @@ TEST_F(SocketTest, health_check) {
bthread_usleep(1000);
ASSERT_LT(butil::gettimeofday_us(), start_time + 1000000L);
}
ASSERT_EQ(-1, brpc::Socket::Status(id));
nref = 0;
ASSERT_EQ(-1, brpc::Socket::Status(id, &nref)) << "nref=" << nref;
// The id is invalid.
brpc::SocketUniquePtr ptr;
ASSERT_EQ(-1, brpc::Socket::Address(id, &ptr));
Expand Down

0 comments on commit 103e5f0

Please sign in to comment.