diff --git a/src/brpc/rtmp.cpp b/src/brpc/rtmp.cpp index ae6eb6ad95..4913881cb1 100644 --- a/src/brpc/rtmp.cpp +++ b/src/brpc/rtmp.cpp @@ -1087,7 +1087,7 @@ class RtmpSocketCreator : public SocketCreator { : _connect_options(connect_options) { } - int CreateSocket(const SocketOptions& opt, SocketId* id) { + int CreateSocket(const SocketOptions& opt, SocketId* id) override { SocketOptions sock_opt = opt; sock_opt.app_connect = std::make_shared(); sock_opt.initial_parsing_context = new policy::RtmpContext(&_connect_options, NULL); diff --git a/src/brpc/selective_channel.cpp b/src/brpc/selective_channel.cpp index 9ad5f9a0cb..5a81582108 100644 --- a/src/brpc/selective_channel.cpp +++ b/src/brpc/selective_channel.cpp @@ -158,8 +158,8 @@ friend class SubDone; ChannelBalancer::~ChannelBalancer() { for (ChannelToIdMap::iterator it = _chan_map.begin(); it != _chan_map.end(); ++it) { - SocketUniquePtr ptr(it->second); // Dereference it->second->ReleaseAdditionalReference(); + it->second->ReleaseHCRelatedReference(); } _chan_map.clear(); } @@ -196,15 +196,21 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel, return -1; } SocketUniquePtr ptr; - CHECK_EQ(0, Socket::Address(sock_id, &ptr)); + int rc = Socket::AddressFailedAsWell(sock_id, &ptr); + if (rc < 0 || (rc > 0 && !ptr->HCEnabled())) { + LOG(FATAL) << "Fail to address SocketId=" << sock_id; + return -1; + } if (!AddServer(ServerId(sock_id))) { LOG(ERROR) << "Duplicated sub_channel=" << sub_channel; // sub_chan will be deleted when the socket is recycled. ptr->SetFailed(); + // Cancel health checking. + ptr->ReleaseHCRelatedReference(); return -1; } - ptr->SetHCRelatedRefHeld(); // set held status - _chan_map[sub_channel]= ptr.release(); // Add reference. + // The health-check-related reference has been held on created. + _chan_map[sub_channel]= ptr.get(); if (handle) { *handle = sock_id; } @@ -223,13 +229,11 @@ void ChannelBalancer::RemoveAndDestroyChannel(SelectiveChannel::ChannelHandle ha BAIDU_SCOPED_LOCK(_mutex); CHECK_EQ(1UL, _chan_map.erase(sub->chan)); } - { - ptr->SetHCRelatedRefReleased(); // set released status to cancel health checking - SocketUniquePtr ptr2(ptr.get()); // Dereference. - } if (rc == 0) { ptr->ReleaseAdditionalReference(); } + // Cancel health checking. + ptr->ReleaseHCRelatedReference(); } } diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index 95d1c7ed4e..91fcd9b8f7 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -772,10 +772,25 @@ int Socket::Create(const SocketOptions& options, SocketId* id) { berror(saved_errno)); return -1; } + m->HoldHCRelatedRef(); *id = m->_this_id; return 0; } +void Socket::HoldHCRelatedRef() { + if (_health_check_interval_s > 0) { + _is_hc_related_ref_held = true; + _versioned_ref.fetch_add(1, butil::memory_order_release); + } +} + +void Socket::ReleaseHCRelatedReference() { + if (_health_check_interval_s > 0) { + _is_hc_related_ref_held = false; + Dereference(); + } +} + int Socket::WaitAndReset(int32_t expected_nref) { const uint32_t id_ver = VersionOfSocketId(_this_id); uint64_t vref; diff --git a/src/brpc/socket.h b/src/brpc/socket.h index 3c389087dc..b705cbcff6 100644 --- a/src/brpc/socket.h +++ b/src/brpc/socket.h @@ -336,16 +336,15 @@ friend class policy::H2GlobalStreamCreator; // True if health checking is enabled. bool HCEnabled() const { + // This fence makes sure that we see change of + // `_is_hc_related_ref_held' before changing `_versioned_ref. + butil::atomic_thread_fence(butil::memory_order_acquire); return _health_check_interval_s > 0 && _is_hc_related_ref_held; } - // When someone holds a health-checking-related reference, - // this function need to be called to make health checking run normally. - void SetHCRelatedRefHeld() { _is_hc_related_ref_held = true; } - // When someone releases the health-checking-related reference, - // this function need to be called to cancel health checking. - void SetHCRelatedRefReleased() { _is_hc_related_ref_held = false; } - bool IsHCRelatedRefHeld() const { return _is_hc_related_ref_held; } + // Release the health-checking-related + // reference which is held on created. + void ReleaseHCRelatedReference(); // After health checking is complete, set _hc_started to false. void AfterHCCompleted() { _hc_started.store(false, butil::memory_order_relaxed); } @@ -609,6 +608,10 @@ friend class policy::H2GlobalStreamCreator; int ConductError(bthread_id_t); int StartWrite(WriteRequest*, const WriteOptions&); + // Hold the health-checking-related + // reference on created. + void HoldHCRelatedRef(); + int Dereference(); friend void DereferenceSocket(Socket*); diff --git a/src/brpc/socket_map.cpp b/src/brpc/socket_map.cpp index 774bf5a749..4dc76e5494 100644 --- a/src/brpc/socket_map.cpp +++ b/src/brpc/socket_map.cpp @@ -58,7 +58,7 @@ static butil::static_atomic g_socket_map = BUTIL_STATIC_ATOMIC_INIT( class GlobalSocketCreator : public SocketCreator { public: - int CreateSocket(const SocketOptions& opt, SocketId* id) { + int CreateSocket(const SocketOptions& opt, SocketId* id) override { SocketOptions sock_opt = opt; sock_opt.health_check_interval_s = FLAGS_health_check_interval; return get_client_side_messenger()->Create(sock_opt, id); @@ -237,8 +237,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id, return 0; } // A socket w/o HC is failed (permanently), replace it. - sc->socket->SetHCRelatedRefReleased(); // set released status to cancel health checking - SocketUniquePtr ptr(sc->socket); // Remove the ref added at insertion. + sc->socket->ReleaseHCRelatedReference(); _map.erase(key); // in principle, we can override the entry in map w/o // removing and inserting it again. But this would make error branches // below have to remove the entry before returning, which is @@ -258,12 +257,12 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id, // use SocketUniquePtr which cannot put into containers before c++11. // The ref will be removed at entry's removal. SocketUniquePtr ptr; - if (Socket::Address(tmp_id, &ptr) != 0) { + int rc = Socket::AddressFailedAsWell(tmp_id, &ptr); + if (rc < 0 || (rc > 0 && !ptr->HCEnabled())) { LOG(FATAL) << "Fail to address SocketId=" << tmp_id; return -1; } - ptr->SetHCRelatedRefHeld(); // set held status - SingleConnection new_sc = { 1, ptr.release(), 0 }; + SingleConnection new_sc = { 1, ptr.get(), 0 }; _map[key] = new_sc; *id = tmp_id; mu.unlock(); @@ -301,8 +300,7 @@ void SocketMap::RemoveInternal(const SocketMapKey& key, _map.erase(key); mu.unlock(); s->ReleaseAdditionalReference(); // release extra ref - s->SetHCRelatedRefReleased(); // set released status to cancel health checking - SocketUniquePtr ptr(s); // Dereference + s->ReleaseHCRelatedReference(); } } } diff --git a/test/brpc_socket_unittest.cpp b/test/brpc_socket_unittest.cpp index d225873531..8a5addb460 100644 --- a/test/brpc_socket_unittest.cpp +++ b/test/brpc_socket_unittest.cpp @@ -504,7 +504,6 @@ TEST_F(SocketTest, not_health_check_when_nref_hits_0) { { brpc::SocketUniquePtr s; ASSERT_EQ(0, brpc::Socket::Address(id, &s)); - s->SetHCRelatedRefHeld(); // set held status global_sock = s.get(); ASSERT_TRUE(s.get()); ASSERT_EQ(-1, s->fd()); @@ -542,6 +541,7 @@ TEST_F(SocketTest, not_health_check_when_nref_hits_0) { #endif ASSERT_TRUE(src.empty()); ASSERT_EQ(-1, s->fd()); + s->ReleaseHCRelatedReference(); } // StartHealthCheck is possibly still running. Spin until global_sock // is NULL(set in CheckRecycle::BeforeRecycle). Notice that you should @@ -650,12 +650,14 @@ TEST_F(SocketTest, health_check) { options.user = new CheckRecycle; options.health_check_interval_s = kCheckInteval/*s*/; ASSERT_EQ(0, brpc::Socket::Create(options, &id)); - brpc::SocketUniquePtr s; - ASSERT_EQ(0, brpc::Socket::Address(id, &s)); - - s->SetHCRelatedRefHeld(); // set held status - global_sock = s.get(); - ASSERT_TRUE(s.get()); + brpc::Socket* s = NULL; + { + brpc::SocketUniquePtr ptr; + ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)); + s = ptr.get(); + } + global_sock = s; + ASSERT_NE(nullptr, s); ASSERT_EQ(-1, s->fd()); ASSERT_EQ(point, s->remote_side()); ASSERT_EQ(id, s->id()); @@ -763,7 +765,7 @@ TEST_F(SocketTest, health_check) { ASSERT_NE(0, ptr->fd()); } - s.release()->Dereference(); + s->ReleaseHCRelatedReference(); // Must stop messenger before SetFailed the id otherwise StartHealthCheck // still has chance to get reconnected and revive the id. @@ -779,7 +781,8 @@ TEST_F(SocketTest, health_check) { bthread_usleep(1000); ASSERT_LT(butil::gettimeofday_us(), start_time + 1000000L); } - ASSERT_EQ(-1, brpc::Socket::Status(id)); + nref = 0; + ASSERT_EQ(-1, brpc::Socket::Status(id, &nref)) << "nref=" << nref; // The id is invalid. brpc::SocketUniquePtr ptr; ASSERT_EQ(-1, brpc::Socket::Address(id, &ptr));