diff --git a/src/brpc/acceptor.cpp b/src/brpc/acceptor.cpp index d265725849..e8e6dcbeb6 100644 --- a/src/brpc/acceptor.cpp +++ b/src/brpc/acceptor.cpp @@ -321,7 +321,7 @@ void Acceptor::OnNewConnectionsUntilEAGAIN(Socket* acception) { // Always add this socket into `_socket_map' whether it // has been `SetFailed' or not, whether `Acceptor' is // running or not. Otherwise, `Acceptor::BeforeRecycle' - // may be called (inside Socket::OnRecycle) after `Acceptor' + // may be called (inside Socket::BeforeRecycled) after `Acceptor' // has been destroyed am->_socket_map.insert(socket_id, ConnectStatistics()); } diff --git a/src/brpc/details/health_check.cpp b/src/brpc/details/health_check.cpp index ab838cc8b2..a8074f9255 100644 --- a/src/brpc/details/health_check.cpp +++ b/src/brpc/details/health_check.cpp @@ -213,7 +213,8 @@ bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) { ptr->_ninflight_app_health_check.fetch_add( 1, butil::memory_order_relaxed); } - ptr->Revive(); + // See comments above. + ptr->Revive(2/*note*/); ptr->_hc_count = 0; if (!FLAGS_health_check_path.empty()) { HealthCheckManager::StartCheck(_id, ptr->_health_check_interval_s); diff --git a/src/brpc/event_dispatcher.cpp b/src/brpc/event_dispatcher.cpp index 3d5b626cb6..067bfb4da6 100644 --- a/src/brpc/event_dispatcher.cpp +++ b/src/brpc/event_dispatcher.cpp @@ -71,34 +71,30 @@ EventDispatcher& GetGlobalEventDispatcher(int fd, bthread_tag_t tag) { return g_edisp[tag * FLAGS_event_dispatcher_num + index]; } -int EventData::OnCreate(const EventDataOptions* options) { - if (!options) { - LOG(ERROR) << "options is NULL"; - return -1; - } - if (options->user_id == INVALID_VREF_ID) { +int EventData::OnCreated(const EventDataOptions& options) { + if (options.user_id == INVALID_VREF_ID) { LOG(ERROR) << "Invalid user_id=-1"; return -1; } - if (!options->input_cb) { + if (!options.input_cb) { LOG(ERROR) << "Invalid input_cb=NULL"; return -1; } - if (!options->output_cb) { + if (!options.output_cb) { LOG(ERROR) << "Invalid output_cb=NULL"; return -1; } - _options = *options; + _options = options; return 0; } -void EventData::OnRecycle() { +void EventData::BeforeRecycled() { _options = {INVALID_EVENT_DATA_ID, NULL, NULL}; } void MakeEventDataIdInvalid(EventDataId& id) { - EventData::SetFailed(id); + EventData::SetFailedById(id); id = INVALID_EVENT_DATA_ID; } diff --git a/src/brpc/event_dispatcher.h b/src/brpc/event_dispatcher.h index a5ec29e3c6..ad4f13df0c 100644 --- a/src/brpc/event_dispatcher.h +++ b/src/brpc/event_dispatcher.h @@ -56,12 +56,11 @@ struct EventDataOptions { // stored in epoll/kqueue data, and calls its callback, so // EventDispatcher supports various IO types, such as socket, // pipe, eventfd, timerfd, etc. -class EventData : public VersionedRefWithId { +class EventData : public VersionedRefWithId { public: explicit EventData(Forbidden f) - : VersionedRefWithId(f) - , _options{INVALID_EVENT_DATA_ID, NULL, NULL} - {} + : VersionedRefWithId(f) + , _options{INVALID_EVENT_DATA_ID, NULL, NULL} {} DISALLOW_COPY_AND_ASSIGN(EventData); @@ -76,8 +75,10 @@ class EventData : public VersionedRefWithId { } private: - int OnCreate(const EventDataOptions* options) override; - void OnRecycle() override; +friend class VersionedRefWithId; + + int OnCreated(const EventDataOptions& options); + void BeforeRecycled(); EventDataOptions _options; }; diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index c2550a7d5a..7494b8bf8a 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -422,9 +422,9 @@ class Socket::EpollOutRequest : public SocketUser { static const uint64_t AUTH_FLAG = (1ul << 32); -Socket::Socket(Forbidden) +Socket::Socket(Forbidden f) // must be even because Address() relies on evenness of version - : _versioned_ref(0) + : VersionedRefWithId(f) , _shared_part(NULL) , _nevent(0) , _keytable_pool(NULL) @@ -434,7 +434,6 @@ Socket::Socket(Forbidden) , _on_edge_triggered_events(NULL) , _user(NULL) , _conn(NULL) - , _this_id(0) , _event_data_id(0) , _preferred_index(-1) , _hc_count(0) @@ -459,7 +458,6 @@ Socket::Socket(Forbidden) , _overcrowded(false) , _fail_me_at_server_stop(false) , _logoff_flag(false) - , _additional_ref_status(REF_USING) , _error_code(0) , _pipeline_q(NULL) , _last_writetime_us(0) @@ -663,128 +661,264 @@ void Socket::EnableKeepaliveIfNeeded(int fd) { // version: from version part of _versioned_nref, must be an EVEN number. // slot: designated by ResourcePool. int Socket::Create(const SocketOptions& options, SocketId* id) { - butil::ResourceId slot; - Socket* const m = butil::get_resource(&slot, Forbidden()); - if (m == NULL) { - LOG(FATAL) << "Fail to get_resource"; - return -1; - } - // nref can be non-zero due to concurrent AddressSocket(). - // _this_id will only be used in destructor/Destroy of referenced - // slots, which is safe and properly fenced. Although it's better - // to put the id into SocketUniquePtr. - m->_this_id = MakeSocketId( - VersionOfVRef(m->_versioned_ref.fetch_add( - 1, butil::memory_order_release)), slot); + return VersionedRefWithId::Create(id, options); +} + +int Socket::OnCreated(const SocketOptions& options) { EventDataOptions event_data_options{ - m->_this_id, StartInputEvent, HandleEpollOut + id(), StartInputEvent, HandleEpollOut }; - if (EventData::Create(&event_data_options, &m->_event_data_id) !=0) { + if (EventData::Create(&_event_data_id, event_data_options) !=0) { LOG(ERROR) << "Fail to create EventDispatcherData"; return -1; } - auto guard = butil::MakeScopeGuard([m] { - MakeEventDataIdInvalid(m->_event_data_id); + auto guard = butil::MakeScopeGuard([this] { + MakeEventDataIdInvalid(_event_data_id); }); g_vars->nsocket << 1; - CHECK(NULL == m->_shared_part.load(butil::memory_order_relaxed)); - m->_nevent.store(0, butil::memory_order_relaxed); - m->_keytable_pool = options.keytable_pool; - m->_tos = 0; - m->_remote_side = options.remote_side; - m->_on_edge_triggered_events = options.on_edge_triggered_events; - m->_user = options.user; - m->_conn = options.conn; - m->_app_connect = options.app_connect; - m->_preferred_index = -1; - m->_hc_count = 0; - CHECK(m->_read_buf.empty()); + CHECK(NULL == _shared_part.load(butil::memory_order_relaxed)); + _nevent.store(0, butil::memory_order_relaxed); + _keytable_pool = options.keytable_pool; + _tos = 0; + _remote_side = options.remote_side; + _on_edge_triggered_events = options.on_edge_triggered_events; + _user = options.user; + _conn = options.conn; + _app_connect = options.app_connect; + _preferred_index = -1; + _hc_count = 0; + CHECK(_read_buf.empty()); const int64_t cpuwide_now = butil::cpuwide_time_us(); - m->_last_readtime_us.store(cpuwide_now, butil::memory_order_relaxed); - m->reset_parsing_context(options.initial_parsing_context); - m->_correlation_id = 0; - m->_health_check_interval_s = options.health_check_interval_s; - m->_is_hc_related_ref_held = false; - m->_hc_started.store(false, butil::memory_order_relaxed); - m->_ninprocess.store(1, butil::memory_order_relaxed); - m->_auth_flag_error.store(0, butil::memory_order_relaxed); - const int rc2 = bthread_id_create(&m->_auth_id, NULL, NULL); + _last_readtime_us.store(cpuwide_now, butil::memory_order_relaxed); + reset_parsing_context(options.initial_parsing_context); + _correlation_id = 0; + _health_check_interval_s = options.health_check_interval_s; + _is_hc_related_ref_held = false; + _hc_started.store(false, butil::memory_order_relaxed); + _ninprocess.store(1, butil::memory_order_relaxed); + _auth_flag_error.store(0, butil::memory_order_relaxed); + const int rc2 = bthread_id_create(&_auth_id, NULL, NULL); if (rc2) { LOG(ERROR) << "Fail to create auth_id: " << berror(rc2); - m->SetFailed(rc2, "Fail to create auth_id: %s", berror(rc2)); + SetFailed(rc2, "Fail to create auth_id: %s", berror(rc2)); return -1; } - m->_force_ssl = options.force_ssl; + _force_ssl = options.force_ssl; // Disable SSL check if there is no SSL context - m->_ssl_state = (options.initial_ssl_ctx == NULL ? SSL_OFF : SSL_UNKNOWN); - m->_ssl_session = NULL; - m->_ssl_ctx = options.initial_ssl_ctx; + _ssl_state = (options.initial_ssl_ctx == NULL ? SSL_OFF : SSL_UNKNOWN); + _ssl_session = NULL; + _ssl_ctx = options.initial_ssl_ctx; #if BRPC_WITH_RDMA - CHECK(m->_rdma_ep == NULL); + CHECK(_rdma_ep == NULL); if (options.use_rdma) { - m->_rdma_ep = new (std::nothrow)rdma::RdmaEndpoint(m); - if (!m->_rdma_ep) { + _rdma_ep = new (std::nothrow)rdma::RdmaEndpoint(m); + if (!_rdma_ep) { const int saved_errno = errno; PLOG(ERROR) << "Fail to create RdmaEndpoint"; - m->SetFailed(saved_errno, "Fail to create RdmaEndpoint: %s", + SetFailed(saved_errno, "Fail to create RdmaEndpoint: %s", berror(saved_errno)); return -1; } - m->_rdma_state = RDMA_UNKNOWN; + _rdma_state = RDMA_UNKNOWN; } else { - m->_rdma_state = RDMA_OFF; + _rdma_state = RDMA_OFF; } #endif - m->_connection_type_for_progressive_read = CONNECTION_TYPE_UNKNOWN; - m->_controller_released_socket.store(false, butil::memory_order_relaxed); - m->_overcrowded = false; - // May be non-zero for RTMP connections. - m->_fail_me_at_server_stop = false; - m->_logoff_flag.store(false, butil::memory_order_relaxed); - m->_additional_ref_status.store(REF_USING, butil::memory_order_relaxed); - m->_error_code = 0; - m->_error_text.clear(); - m->_agent_socket_id.store(INVALID_SOCKET_ID, butil::memory_order_relaxed); - m->_total_streams_unconsumed_size.store(0, butil::memory_order_relaxed); - m->_ninflight_app_health_check.store(0, butil::memory_order_relaxed); + _connection_type_for_progressive_read = CONNECTION_TYPE_UNKNOWN; + _controller_released_socket.store(false, butil::memory_order_relaxed); + _overcrowded = false; + // Maybe non-zero for RTMP connections. + _fail_me_at_server_stop = false; + _logoff_flag.store(false, butil::memory_order_relaxed); + _error_code = 0; + _agent_socket_id.store(INVALID_SOCKET_ID, butil::memory_order_relaxed); + _total_streams_unconsumed_size.store(0, butil::memory_order_relaxed); + _ninflight_app_health_check.store(0, butil::memory_order_relaxed); // NOTE: last two params are useless in bthread > r32787 - const int rc = bthread_id_list_init(&m->_id_wait_list, 512, 512); + const int rc = bthread_id_list_init(&_id_wait_list, 512, 512); if (rc) { LOG(ERROR) << "Fail to init _id_wait_list: " << berror(rc); - m->SetFailed(rc, "Fail to init _id_wait_list: %s", berror(rc)); + SetFailed(rc, "Fail to init _id_wait_list: %s", berror(rc)); return -1; } - m->_last_writetime_us.store(cpuwide_now, butil::memory_order_relaxed); - m->_unwritten_bytes.store(0, butil::memory_order_relaxed); - m->_keepalive_options = options.keepalive_options; - m->_bthread_tag = options.bthread_tag; - CHECK(NULL == m->_write_head.load(butil::memory_order_relaxed)); - // Must be last one! Internal fields of this Socket may be access + _last_writetime_us.store(cpuwide_now, butil::memory_order_relaxed); + _unwritten_bytes.store(0, butil::memory_order_relaxed); + _keepalive_options = options.keepalive_options; + _bthread_tag = options.bthread_tag; + CHECK(NULL == _write_head.load(butil::memory_order_relaxed)); + // Must be the last one! Internal fields of this Socket may be accessed // just after calling ResetFileDescriptor. - if (m->ResetFileDescriptor(options.fd) != 0) { + if (ResetFileDescriptor(options.fd) != 0) { const int saved_errno = errno; PLOG(ERROR) << "Fail to ResetFileDescriptor"; - m->SetFailed(saved_errno, "Fail to ResetFileDescriptor: %s", + SetFailed(saved_errno, "Fail to ResetFileDescriptor: %s", berror(saved_errno)); return -1; } guard.dismiss(); - *id = m->_this_id; return 0; } +void Socket::BeforeRecycled() { + const bool create_by_connect = CreatedByConnect(); + if (_app_connect) { + std::shared_ptr tmp; + _app_connect.swap(tmp); + tmp->StopConnect(this); + } + if (_conn) { + SocketConnection* const saved_conn = _conn; + _conn = NULL; + saved_conn->BeforeRecycle(this); + } + if (_user) { + SocketUser* const saved_user = _user; + _user = NULL; + saved_user->BeforeRecycle(this); + } + SharedPart* sp = _shared_part.exchange(NULL, butil::memory_order_acquire); + if (sp) { + sp->RemoveRefManually(); + } + + // Recycle `_event_data_id'. + MakeEventDataIdInvalid(_event_data_id); + + const int prev_fd = _fd.exchange(-1, butil::memory_order_relaxed); + if (ValidFileDescriptor(prev_fd)) { + if (_on_edge_triggered_events != NULL) { + GetGlobalEventDispatcher(prev_fd, _bthread_tag).RemoveConsumer(prev_fd); + } + close(prev_fd); + if (create_by_connect) { + g_vars->channel_conn << -1; + } + } + +#if BRPC_WITH_RDMA + if (_rdma_ep) { + delete _rdma_ep; + _rdma_ep = NULL; + _rdma_state = RDMA_UNKNOWN; + } +#endif + + reset_parsing_context(NULL); + _read_buf.clear(); + + _auth_flag_error.store(0, butil::memory_order_relaxed); + bthread_id_error(_auth_id, 0); + + bthread_id_list_destroy(&_id_wait_list); + + if (_ssl_session) { + SSL_free(_ssl_session); + _ssl_session = NULL; + } + + _ssl_ctx = NULL; + + delete _pipeline_q; + _pipeline_q = NULL; + + delete _auth_context; + _auth_context = NULL; + + delete _stream_set; + _stream_set = NULL; + + const SocketId asid = _agent_socket_id.load(butil::memory_order_relaxed); + if (asid != INVALID_SOCKET_ID) { + SocketUniquePtr ptr; + if (Socket::Address(asid, &ptr) == 0) { + ptr->ReleaseAdditionalReference(); + } + } + g_vars->nsocket << -1; +} + +void Socket::OnFailed(int error_code, const std::string& error_text) { + // Update _error_text + pthread_mutex_lock(&_id_wait_list_mutex); + _error_code = error_code; + _error_text = error_text; + pthread_mutex_unlock(&_id_wait_list_mutex); + + // Do health-checking even if we're not connected before, needed + // by Channel to revive never-connected socket when server side + // comes online. + if (HCEnabled()) { + bool expect = false; + if (_hc_started.compare_exchange_strong(expect, + true, + butil::memory_order_relaxed, + butil::memory_order_relaxed)) { + GetOrNewSharedPart()->circuit_breaker.MarkAsBroken(); + StartHealthCheck(id(), + GetOrNewSharedPart()->circuit_breaker.isolation_duration_ms()); + } else { + // No need to run 2 health checking at the same time. + RPC_VLOG << "There is already a health checking running " + "for SocketId=" << id(); + } + } + // Wake up all threads waiting on EPOLLOUT when closing fd + _epollout_butex->fetch_add(1, butil::memory_order_relaxed); + bthread::butex_wake_all(_epollout_butex); + + // Wake up all unresponded RPC. + CHECK_EQ(0, bthread_id_list_reset2_pthreadsafe( + &_id_wait_list, error_code, error_text, + &_id_wait_list_mutex)); + + ResetAllStreams(); + // _app_connect shouldn't be set to NULL in SetFailed otherwise + // HC is always not supported. + // FIXME: Design a better interface for AppConnect + // if (_app_connect) { + // AppConnect* const saved_app_connect = _app_connect; + // _app_connect = NULL; + // saved_app_connect->StopConnect(this); + // } +} + +void Socket::AfterRevived() { + if (_user) { + _user->AfterRevived(this); + } else { + LOG(INFO) << "Revived " << description() << " (Connectable)"; + } +} + +std::string Socket::OnDescription() const { + // NOTE: The output of `description()' should be consistent with operator<<() + std::string result; + result.reserve(64); + const int saved_fd = fd(); + if (saved_fd >= 0) { + butil::string_appendf(&result, "fd=%d", saved_fd); + } + butil::string_appendf(&result, " addr=%s", + butil::endpoint2str(remote_side()).c_str()); + const int local_port = local_side().port; + if (local_port > 0) { + butil::string_appendf(&result, ":%d", local_port); + } + return result; +} + int Socket::WaitAndReset(int32_t expected_nref) { - const uint32_t id_ver = VersionOfSocketId(_this_id); + const uint32_t id_ver = VersionOfVRefId(id()); uint64_t vref; // Wait until nref == expected_nref. - while (1) { + while (true) { // The acquire fence pairs with release fence in Dereference to avoid // inconsistent states to be seen by others. - vref = _versioned_ref.load(butil::memory_order_acquire); + vref = versioned_ref(); if (VersionOfVRef(vref) != id_ver + 1) { - LOG(WARNING) << "SocketId=" << _this_id << " is already alive or recycled"; + LOG(WARNING) << "SocketId=" << id() << " is already alive or recycled"; return -1; } if (NRefOfVRef(vref) > expected_nref) { @@ -793,7 +927,7 @@ int Socket::WaitAndReset(int32_t expected_nref) { return -1; } } else if (NRefOfVRef(vref) < expected_nref) { - RPC_VLOG << "SocketId=" << _this_id + RPC_VLOG << "SocketId=" << id() << " was abandoned during health checking"; return -1; } else { @@ -801,7 +935,7 @@ int Socket::WaitAndReset(int32_t expected_nref) { // so no need to do health checking. if (!_is_hc_related_ref_held) { RPC_VLOG << "Nobody holds a health-checking-related reference" - << " for SocketId=" << _this_id; + << " for SocketId=" << id(); return -1; } @@ -869,59 +1003,6 @@ int Socket::WaitAndReset(int32_t expected_nref) { return 0; } -// We don't care about the return value of Revive. -void Socket::Revive() { - const uint32_t id_ver = VersionOfSocketId(_this_id); - uint64_t vref = _versioned_ref.load(butil::memory_order_relaxed); - _additional_ref_status.store(REF_REVIVING, butil::memory_order_relaxed); - while (1) { - CHECK_EQ(id_ver + 1, VersionOfVRef(vref)); - - int32_t nref = NRefOfVRef(vref); - if (nref <= 1) { - // Set status to REF_RECYLED since no one uses this socket - _additional_ref_status.store(REF_RECYCLED, butil::memory_order_relaxed); - CHECK_EQ(1, nref); - LOG(WARNING) << *this << " was abandoned during revival"; - return; - } - // +1 is the additional ref added in Create(). TODO(gejun): we should - // remove this additional nref someday. - if (_versioned_ref.compare_exchange_weak( - vref, MakeVRef(id_ver, nref + 1/*note*/), - butil::memory_order_release, - butil::memory_order_relaxed)) { - // Set status to REF_USING since we add additional ref again - _additional_ref_status.store(REF_USING, butil::memory_order_relaxed); - if (_user) { - _user->AfterRevived(this); - } else { - LOG(INFO) << "Revived " << *this << " (Connectable)"; - } - return; - } - } -} - -int Socket::ReleaseAdditionalReference() { - do { - AdditionalRefStatus expect = REF_USING; - if (_additional_ref_status.compare_exchange_strong( - expect, - REF_RECYCLED, - butil::memory_order_relaxed, - butil::memory_order_relaxed)) { - return Dereference(); - } - - if (expect == REF_REVIVING) { // sched_yield to wait until status is not REF_REVIVING - sched_yield(); - } else { - return -1; // REF_RECYCLED - } - } while (1); -} - void Socket::AddRecentError() { SharedPart* sp = GetSharedPart(); if (sp) { @@ -945,87 +1026,6 @@ int Socket::isolated_times() const { return 0; } -int Socket::SetFailed(int error_code, const char* error_fmt, ...) { - if (error_code == 0) { - CHECK(false) << "error_code is 0"; - error_code = EFAILEDSOCKET; - } - const uint32_t id_ver = VersionOfSocketId(_this_id); - uint64_t vref = _versioned_ref.load(butil::memory_order_relaxed); - for (;;) { // need iteration to retry compare_exchange_strong - if (VersionOfVRef(vref) != id_ver) { - return -1; - } - // Try to set version=id_ver+1 (to make later Address() return NULL), - // retry on fail. - if (_versioned_ref.compare_exchange_strong( - vref, MakeVRef(id_ver + 1, NRefOfVRef(vref)), - butil::memory_order_release, - butil::memory_order_relaxed)) { - // Update _error_text - std::string error_text; - if (error_fmt != NULL) { - va_list ap; - va_start(ap, error_fmt); - butil::string_vprintf(&error_text, error_fmt, ap); - va_end(ap); - } - pthread_mutex_lock(&_id_wait_list_mutex); - _error_code = error_code; - _error_text = error_text; - pthread_mutex_unlock(&_id_wait_list_mutex); - - // Do health-checking even if we're not connected before, needed - // by Channel to revive never-connected socket when server side - // comes online. - if (HCEnabled()) { - bool expect = false; - if (_hc_started.compare_exchange_strong(expect, - true, - butil::memory_order_relaxed, - butil::memory_order_relaxed)) { - GetOrNewSharedPart()->circuit_breaker.MarkAsBroken(); - StartHealthCheck(id(), - GetOrNewSharedPart()->circuit_breaker.isolation_duration_ms()); - } else { - // No need to run 2 health checking at the same time. - RPC_VLOG << "There is already a health checking running " - "for SocketId=" << _this_id; - } - } - // Wake up all threads waiting on EPOLLOUT when closing fd - _epollout_butex->fetch_add(1, butil::memory_order_relaxed); - bthread::butex_wake_all(_epollout_butex); - - // Wake up all unresponded RPC. - CHECK_EQ(0, bthread_id_list_reset2_pthreadsafe( - &_id_wait_list, error_code, error_text, - &_id_wait_list_mutex)); - - ResetAllStreams(); - // _app_connect shouldn't be set to NULL in SetFailed otherwise - // HC is always not supported. - // FIXME: Design a better interface for AppConnect - // if (_app_connect) { - // AppConnect* const saved_app_connect = _app_connect; - // _app_connect = NULL; - // saved_app_connect->StopConnect(this); - // } - - // Deref additionally which is added at creation so that this - // Socket's reference will hit 0(recycle) when no one addresses it. - ReleaseAdditionalReference(); - // NOTE: This Socket may be recycled at this point, don't - // touch anything. - return 0; - } - } -} - -int Socket::SetFailed() { - return SetFailed(EFAILEDSOCKET, NULL); -} - void Socket::FeedbackCircuitBreaker(int error_code, int64_t latency_us) { if (!GetOrNewSharedPart()->circuit_breaker.OnCallEnd(error_code, latency_us)) { if (SetFailed(main_socket_id()) == 0) { @@ -1052,12 +1052,29 @@ int Socket::ReleaseReferenceIfIdle(int idle_seconds) { return ReleaseAdditionalReference(); } + +int Socket::SetFailed() { + return SetFailed(EFAILEDSOCKET, NULL); +} + +int Socket::SetFailed(int error_code, const char* error_fmt, ...) { + std::string error_text; + if (error_fmt != NULL) { + va_list ap; + va_start(ap, error_fmt); + butil::string_vprintf(&error_text, error_fmt, ap); + va_end(ap); + } + return VersionedRefWithId::SetFailed(error_code, error_text); +} + int Socket::SetFailed(SocketId id) { SocketUniquePtr ptr; if (Address(id, &ptr) != 0) { return -1; } - return ptr->SetFailed(); + + return ptr->SetFailed(EFAILEDSOCKET, NULL); } void Socket::NotifyOnFailed(bthread_id_t id) { @@ -1078,16 +1095,16 @@ void Socket::NotifyOnFailed(bthread_id_t id) { // For unit-test. int Socket::Status(SocketId id, int32_t* nref) { - const butil::ResourceId slot = SlotOfSocketId(id); + const butil::ResourceId slot = SlotOfVRefId(id); Socket* const m = address_resource(slot); if (m != NULL) { - const uint64_t vref = m->_versioned_ref.load(butil::memory_order_relaxed); - if (VersionOfVRef(vref) == VersionOfSocketId(id)) { + const uint64_t vref = m->versioned_ref(); + if (VersionOfVRef(vref) == VersionOfVRefId(id)) { if (nref) { *nref = NRefOfVRef(vref); } return 0; - } else if (VersionOfVRef(vref) == VersionOfSocketId(id) + 1) { + } else if (VersionOfVRef(vref) == VersionOfVRefId(id) + 1) { if (nref) { *nref = NRefOfVRef(vref); } @@ -1097,85 +1114,6 @@ int Socket::Status(SocketId id, int32_t* nref) { return -1; } -void Socket::OnRecycle() { - const bool create_by_connect = CreatedByConnect(); - if (_app_connect) { - std::shared_ptr tmp; - _app_connect.swap(tmp); - tmp->StopConnect(this); - } - if (_conn) { - SocketConnection* const saved_conn = _conn; - _conn = NULL; - saved_conn->BeforeRecycle(this); - } - if (_user) { - SocketUser* const saved_user = _user; - _user = NULL; - saved_user->BeforeRecycle(this); - } - SharedPart* sp = _shared_part.exchange(NULL, butil::memory_order_acquire); - if (sp) { - sp->RemoveRefManually(); - } - - // Recycle `_event_data_id'. - MakeEventDataIdInvalid(_event_data_id); - - const int prev_fd = _fd.exchange(-1, butil::memory_order_relaxed); - if (ValidFileDescriptor(prev_fd)) { - if (_on_edge_triggered_events != NULL) { - GetGlobalEventDispatcher(prev_fd, _bthread_tag).RemoveConsumer(prev_fd); - } - close(prev_fd); - if (create_by_connect) { - g_vars->channel_conn << -1; - } - } - -#if BRPC_WITH_RDMA - if (_rdma_ep) { - delete _rdma_ep; - _rdma_ep = NULL; - _rdma_state = RDMA_UNKNOWN; - } -#endif - - reset_parsing_context(NULL); - _read_buf.clear(); - - _auth_flag_error.store(0, butil::memory_order_relaxed); - bthread_id_error(_auth_id, 0); - - bthread_id_list_destroy(&_id_wait_list); - - if (_ssl_session) { - SSL_free(_ssl_session); - _ssl_session = NULL; - } - - _ssl_ctx = NULL; - - delete _pipeline_q; - _pipeline_q = NULL; - - delete _auth_context; - _auth_context = NULL; - - delete _stream_set; - _stream_set = NULL; - - const SocketId asid = _agent_socket_id.load(butil::memory_order_relaxed); - if (asid != INVALID_SOCKET_ID) { - SocketUniquePtr ptr; - if (Socket::Address(asid, &ptr) == 0) { - ptr->ReleaseAdditionalReference(); - } - } - - g_vars->nsocket << -1; -} - void* Socket::ProcessEvent(void* arg) { // the enclosed Socket is valid and free to access inside this function. SocketUniquePtr s(static_cast(arg)); @@ -1574,7 +1512,6 @@ inline int SetError(bthread_id_t id_wait, int ec) { } int Socket::ConductError(bthread_id_t id_wait) { - pthread_mutex_lock(&_id_wait_list_mutex); if (Failed()) { const int error_code = non_zero_error_code(); if (id_wait != INVALID_BTHREAD_ID) { @@ -2275,7 +2212,7 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) { // } os << "# This is a broken Socket\n"; } - const uint64_t vref = ptr->_versioned_ref.load(butil::memory_order_relaxed); + const uint64_t vref = ptr->versioned_ref(); size_t npipelined = 0; size_t idsizes[4]; size_t nidsize = 0; @@ -2335,7 +2272,7 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) { << "\nlocal_side=" << ptr->_local_side << "\non_et_events=" << (void*)ptr->_on_edge_triggered_events << "\nuser=" << ShowObject(ptr->_user) - << "\nthis_id=" << ptr->_this_id + << "\nthis_id=" << ptr->id() << "\npreferred_index=" << preferred_index; InputMessenger* messenger = dynamic_cast(ptr->user()); if (messenger != NULL) { @@ -2380,7 +2317,7 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) { << "\nauth_context=" << ptr->_auth_context << "\nlogoff_flag=" << ptr->_logoff_flag.load(butil::memory_order_relaxed) << "\n_additional_ref_status=" - << ptr->_additional_ref_status.load(butil::memory_order_relaxed) + << ptr->additional_ref_status() << "\ntotal_streams_buffer_size=" << ptr->_total_streams_unconsumed_size.load(butil::memory_order_relaxed) << "\nninflight_app_health_check=" @@ -2569,7 +2506,7 @@ void Socket::ResetAllStreams() { if (_stream_set != NULL) { // Not delete _stream_set because there are likely more streams added // after reviving if the Socket is still in use, or it is to be deleted in - // OnRecycle() + // BeforeRecycled() saved_stream_set.swap(*_stream_set); } _stream_mutex.unlock(); @@ -2949,25 +2886,6 @@ void Socket::OnProgressiveReadCompleted() { } } -std::string Socket::description() const { - // NOTE: The output should be consistent with operator<<() - std::string result; - result.reserve(64); - butil::string_appendf(&result, "Socket{id=%" PRIu64, id()); - const int saved_fd = fd(); - if (saved_fd >= 0) { - butil::string_appendf(&result, " fd=%d", saved_fd); - } - butil::string_appendf(&result, " addr=%s", - butil::endpoint2str(remote_side()).c_str()); - const int local_port = local_side().port; - if (local_port > 0) { - butil::string_appendf(&result, ":%d", local_port); - } - butil::string_appendf(&result, "} (0x%p)", this); - return result; -} - SocketSSLContext::SocketSSLContext() : raw_ctx(NULL) {} diff --git a/src/brpc/socket.h b/src/brpc/socket.h index 13d9a1773d..e2de5419ba 100644 --- a/src/brpc/socket.h +++ b/src/brpc/socket.h @@ -40,6 +40,7 @@ #include "bvar/bvar.h" #include "brpc/http_method.h" #include "brpc/event_dispatcher.h" +#include "brpc/versioned_ref_with_id.h" namespace brpc { namespace policy { @@ -227,7 +228,7 @@ struct SocketOptions { // Abstractions on reading from and writing into file descriptors. // NOTE: accessed by multiple threads(frequently), align it by cacheline. -class BAIDU_CACHELINE_ALIGNMENT/*note*/ Socket { +class BAIDU_CACHELINE_ALIGNMENT/*note*/ Socket : public VersionedRefWithId { friend class EventDispatcher; friend class InputMessenger; friend class Acceptor; @@ -244,16 +245,17 @@ friend class HealthCheckTask; friend class OnAppHealthCheckDone; friend class HealthCheckManager; friend class policy::H2GlobalStreamCreator; +friend class VersionedRefWithId; +friend void DereferenceSocket(Socket*); class SharedPart; - struct Forbidden {}; struct WriteRequest; public: const static int STREAM_FAKE_FD = INT_MAX; // NOTE: User cannot create Socket from constructor. Use Create() // instead. It's public just because of requirement of ResourcePool. - Socket(Forbidden); - ~Socket(); + explicit Socket(Forbidden); + ~Socket() override; // Write `msg' into this Socket and clear it. The `msg' should be an // intact request or response. To prevent messages from interleaving @@ -343,9 +345,6 @@ friend class policy::H2GlobalStreamCreator; // After health checking is complete, set _hc_started to false. void AfterHCCompleted() { _hc_started.store(false, butil::memory_order_relaxed); } - // The unique identifier. - SocketId id() const { return _this_id; } - // `user' parameter passed to Create(). SocketUser* user() const { return _user; } @@ -373,21 +372,6 @@ friend class policy::H2GlobalStreamCreator; // Returns 0 on success, -1 otherwise. static int Create(const SocketOptions& options, SocketId* id); - // Place the Socket associated with identifier `id' into unique_ptr `ptr', - // which will be released automatically when out of scope (w/o explicit - // std::move). User can still access `ptr' after calling ptr->SetFailed() - // before release of `ptr'. - // This function is wait-free. - // Returns 0 on success, -1 when the Socket was SetFailed(). - static int Address(SocketId id, SocketUniquePtr* ptr); - - // Re-address current socket into `ptr'. - // Always succeed even if this socket is failed. - void ReAddress(SocketUniquePtr* ptr); - - // Returns 0 on success, 1 on failed socket, -1 on recycled. - static int AddressFailedAsWell(SocketId id, SocketUniquePtr* ptr); - // Mark this Socket or the Socket associated with `id' as failed. // Any later Address() of the identifier shall return NULL unless the // Socket was revivied by StartHealthCheck. The Socket is NOT recycled @@ -410,20 +394,10 @@ friend class policy::H2GlobalStreamCreator; void FeedbackCircuitBreaker(int error_code, int64_t latency_us); - bool Failed() const; - - bool DidReleaseAdditionalRereference() const { - return _additional_ref_status.load(butil::memory_order_relaxed) == REF_RECYCLED; - } - // Notify `id' object (by calling bthread_id_error) when this Socket // has been `SetFailed'. If it already has, notify `id' immediately void NotifyOnFailed(bthread_id_t id); - // Release the additional reference which added inside `Create' - // before so that `Socket' will be recycled automatically once - // on one is addressing it. - int ReleaseAdditionalReference(); // `ReleaseAdditionalReference' this Socket iff it has no data // transmission during the last `idle_seconds' int ReleaseReferenceIfIdle(int idle_seconds); @@ -578,9 +552,6 @@ friend class policy::H2GlobalStreamCreator; _last_writetime_us.load(butil::memory_order_relaxed)); } - // A brief description of this socket, consistent with os << *this - std::string description() const; - // Returns true if the remote side is overcrowded. bool is_overcrowded() const { return _overcrowded; } @@ -602,8 +573,20 @@ friend class policy::H2GlobalStreamCreator; int ConductError(bthread_id_t); int StartWrite(WriteRequest*, const WriteOptions&); - int Dereference(); -friend void DereferenceSocket(Socket*); + // Create a Socket according to `options', put the identifier into `id'. + // Returns 0 on success, -1 otherwise. + int OnCreated(const SocketOptions& options); + + // Called before returning to pool. + void BeforeRecycled(); + + void OnFailed(int error_code, const std::string& error_text); + + // Make this socket addressable again. + void AfterRevived(); + + std::string OnDescription() const; + static int Status(SocketId, int32_t* nref = NULL); // for unit-test. @@ -625,15 +608,6 @@ friend void DereferenceSocket(Socket*); // success, -1 otherwise and errno is set ssize_t DoWrite(WriteRequest* req); - // 1. When `failed_ad_well=true', returns 0 on success, - // 1 on failed socket, -1 on recycled. - // 2. When `failed_ad_well=true', returns 0 on success, - // -1 when the Socket was SetFailed(). - static int AddressImpl(SocketId id, bool failed_as_well, SocketUniquePtr* ptr); - - // Called before returning to pool. - void OnRecycle(); - // [Not thread-safe] Wait for EPOLLOUT event on `fd'. If `pollin' is // true, EPOLLIN event will also be included and EPOLL_CTL_MOD will // be used instead of EPOLL_CTL_ADD. Note that spurious wakeups may @@ -665,9 +639,6 @@ friend void DereferenceSocket(Socket*); // Wait until nref hits `expected_nref' and reset some internal resources. int WaitAndReset(int32_t expected_nref); - // Make this socket addressable again. - void Revive(); - static void* ProcessEvent(void*); static void* KeepWrite(void*); @@ -744,16 +715,6 @@ friend void DereferenceSocket(Socket*); void CancelUnwrittenBytes(size_t bytes); private: - // unsigned 32-bit version + signed 32-bit referenced-count. - // Meaning of version: - // * Created version: no SetFailed() is called on the Socket yet. Must be - // same evenness with initial _versioned_ref because during lifetime of - // a Socket on the slot, the version is added with 1 twice. This is - // also the version encoded in SocketId. - // * Failed version: = created version + 1, SetFailed()-ed but returned. - // * Other versions: the socket is already recycled. - butil::atomic _versioned_ref; - // In/Out bytes/messages, SocketPool etc // _shared_part is shared by a main socket and all its pooled sockets. // Can't use intrusive_ptr because the creation is based on optimistic @@ -797,9 +758,6 @@ friend void DereferenceSocket(Socket*); // Initialized by SocketOptions.app_connect. std::shared_ptr _app_connect; - // Identifier of this Socket in ResourcePool. - SocketId _this_id; - // Identifier of EventData in ResourcePool. EventDataId _event_data_id; @@ -889,21 +847,6 @@ friend void DereferenceSocket(Socket*); // Set by SetLogOff butil::atomic _logoff_flag; - // Status flag used to mark that - enum AdditionalRefStatus { - REF_USING, // additional reference has been increased - REF_REVIVING, // additional reference is increasing - REF_RECYCLED // additional reference has been decreased - }; - - // Indicates whether additional reference has increased, - // decreased, or is increasing. - // additional ref status: - // `Socket'、`Create': REF_USING - // `SetFailed': REF_USING -> REF_RECYCLED - // `Revive' REF_RECYCLED -> REF_REVIVING -> REF_USING - butil::atomic _additional_ref_status; - // Concrete error information from SetFailed() // Accesses to these 2 fields(especially _error_text) must be protected // by _id_wait_list_mutex diff --git a/src/brpc/socket_id.h b/src/brpc/socket_id.h index 720432473f..e000c98b69 100644 --- a/src/brpc/socket_id.h +++ b/src/brpc/socket_id.h @@ -22,9 +22,7 @@ // To brpc developers: This is a header included by user, don't depend // on internal structures, use opaque pointers instead. -#include // uint64_t -#include "butil/unique_ptr.h" // std::unique_ptr - +#include "brpc/versioned_ref_with_id.h" namespace brpc { @@ -32,21 +30,25 @@ namespace brpc { // Users shall store SocketId instead of Sockets and call Socket::Address() // to convert the identifier to an unique_ptr at each access. Whenever a // unique_ptr is not destructed, the enclosed Socket will not be recycled. -typedef uint64_t SocketId; +typedef VRefId SocketId; -const SocketId INVALID_SOCKET_ID = (SocketId)-1; +const SocketId INVALID_SOCKET_ID = INVALID_VREF_ID; class Socket; extern void DereferenceSocket(Socket*); -struct SocketDeleter { +// Explicit (full) template specialization to ignore compiler error, +// because Socket is an incomplete type where only this header is included. +template<> +struct VersionedRefWithIdDeleter { void operator()(Socket* m) const { DereferenceSocket(m); } }; -typedef std::unique_ptr SocketUniquePtr; +typedef VersionedRefWithIdUniquePtr SocketUniquePtr; + } // namespace brpc diff --git a/src/brpc/socket_inl.h b/src/brpc/socket_inl.h index b26d52152b..a8ff3ce8ff 100644 --- a/src/brpc/socket_inl.h +++ b/src/brpc/socket_inl.h @@ -23,20 +23,6 @@ namespace brpc { -// Utility functions to combine and extract SocketId. -BUTIL_FORCE_INLINE SocketId -MakeSocketId(uint32_t version, butil::ResourceId slot) { - return MakeVRefId(version, slot); -} - -BUTIL_FORCE_INLINE butil::ResourceId SlotOfSocketId(SocketId sid) { - return SlotOfVRefId(sid); -} - -BUTIL_FORCE_INLINE uint32_t VersionOfSocketId(SocketId sid) { - return VersionOfVRefId(sid); -} - inline SocketOptions::SocketOptions() : fd(-1) , user(NULL) @@ -48,135 +34,7 @@ inline SocketOptions::SocketOptions() , conn(NULL) , app_connect(NULL) , initial_parsing_context(NULL) - , bthread_tag(BTHREAD_TAG_DEFAULT) -{} - -inline int Socket::Dereference() { - const SocketId id = _this_id; - const uint64_t vref = _versioned_ref.fetch_sub( - 1, butil::memory_order_release); - const int32_t nref = NRefOfVRef(vref); - if (nref > 1) { - return 0; - } - if (__builtin_expect(nref == 1, 1)) { - const uint32_t ver = VersionOfVRef(vref); - const uint32_t id_ver = VersionOfSocketId(id); - // Besides first successful SetFailed() adds 1 to version, one of - // those dereferencing nref from 1->0 adds another 1 to version. - // Notice "one of those": The wait-free Address() may make ref of a - // version-unmatched slot change from 1 to 0 for mutiple times, we - // have to use version as a guard variable to prevent returning the - // Socket to pool more than once. - // - // Note: `ver == id_ver' means this socket has been `SetRecycle' - // before rather than `SetFailed'; `ver == ide_ver+1' means we - // had `SetFailed' this socket before. We should destroy the - // socket under both situation - if (__builtin_expect(ver == id_ver || ver == id_ver + 1, 1)) { - // sees nref:1->0, try to set version=id_ver+2,--nref. - // No retry: if version changes, the slot is already returned by - // another one who sees nref:1->0 concurrently; if nref changes, - // which must be non-zero, the slot will be returned when - // nref changes from 1->0 again. - // Example: - // SetFailed(): --nref, sees nref:1->0 (1) - // try to set version=id_ver+2 (2) - // Address(): ++nref, unmatched version (3) - // --nref, sees nref:1->0 (4) - // try to set version=id_ver+2 (5) - // 1,2,3,4,5 or 1,3,4,2,5: - // SetFailed() succeeds, Address() fails at (5). - // 1,3,2,4,5: SetFailed() fails with (2), the slot will be - // returned by (5) of Address() - // 1,3,4,5,2: SetFailed() fails with (2), the slot is already - // returned by (5) of Address(). - uint64_t expected_vref = vref - 1; - if (_versioned_ref.compare_exchange_strong( - expected_vref, MakeVRef(id_ver + 2, 0), - butil::memory_order_acquire, - butil::memory_order_relaxed)) { - OnRecycle(); - return_resource(SlotOfSocketId(id)); - return 1; - } - return 0; - } - LOG(FATAL) << "Invalid SocketId=" << id; - return -1; - } - LOG(FATAL) << "Over dereferenced SocketId=" << id; - return -1; -} - -inline int Socket::Address(SocketId id, SocketUniquePtr* ptr) { - return AddressImpl(id ,false, ptr); -} - -inline void Socket::ReAddress(SocketUniquePtr* ptr) { - _versioned_ref.fetch_add(1, butil::memory_order_acquire); - ptr->reset(this); -} - -inline int Socket::AddressFailedAsWell(SocketId id, SocketUniquePtr* ptr) { - return AddressImpl(id, true, ptr); -} - -inline int Socket::AddressImpl(SocketId id, - bool failed_as_well, - SocketUniquePtr* ptr) { - const butil::ResourceId slot = SlotOfSocketId(id); - Socket* const m = address_resource(slot); - if (__builtin_expect(m != NULL, 1)) { - // acquire fence makes sure this thread sees latest changes before - // Dereference() or Revive(). - const uint64_t vref1 = m->_versioned_ref.fetch_add( - 1, butil::memory_order_acquire); - const uint32_t ver1 = VersionOfVRef(vref1); - if (ver1 == VersionOfSocketId(id)) { - ptr->reset(m); - return 0; - } - if (failed_as_well && ver1 == VersionOfSocketId(id) + 1) { - ptr->reset(m); - return 1; - } - - const uint64_t vref2 = m->_versioned_ref.fetch_sub( - 1, butil::memory_order_release); - const int32_t nref = NRefOfVRef(vref2); - if (nref > 1) { - return -1; - } else if (__builtin_expect(nref == 1, 1)) { - const uint32_t ver2 = VersionOfVRef(vref2); - if ((ver2 & 1)) { - if (ver1 == ver2 || ver1 + 1 == ver2) { - uint64_t expected_vref = vref2 - 1; - if (m->_versioned_ref.compare_exchange_strong( - expected_vref, MakeVRef(ver2 + 1, 0), - butil::memory_order_acquire, - butil::memory_order_relaxed)) { - m->OnRecycle(); - return_resource(slot); - } - } else { - CHECK(false) << "ref-version=" << ver1 - << " unref-version=" << ver2; - } - } else { - // Addressed a free slot. - } - } else { - CHECK(false) << "Over dereferenced SocketId=" << id; - } - } - return -1; -} - -inline bool Socket::Failed() const { - return VersionOfVRef(_versioned_ref.load(butil::memory_order_relaxed)) - != VersionOfSocketId(_this_id); -} + , bthread_tag(BTHREAD_TAG_DEFAULT) {} inline bool Socket::MoreReadEvents(int* progress) { // Fail to CAS means that new events arrived. diff --git a/src/brpc/versioned_ref_with_id.h b/src/brpc/versioned_ref_with_id.h index 53b78be420..2acb421c85 100644 --- a/src/brpc/versioned_ref_with_id.h +++ b/src/brpc/versioned_ref_with_id.h @@ -24,6 +24,7 @@ #include "butil/class_name.h" #include "butil/logging.h" #include "bthread/bthread.h" +#include "brpc/errno.pb.h" namespace brpc { @@ -32,7 +33,7 @@ typedef uint64_t VRefId; const VRefId INVALID_VREF_ID = (VRefId)-1; -template +template class VersionedRefWithId; template @@ -79,6 +80,45 @@ BUTIL_FORCE_INLINE uint64_t MakeVRef(uint32_t version, int32_t nref) { return (((uint64_t)version) << 32) | (uint32_t/*1*/)nref; } + +template +typename std::enable_if::value, Ret>::type ReturnEmpty() { + return Ret{}; +} + +template +typename std::enable_if::value, Ret>::type ReturnEmpty() {} + +// Call func_name of class_type if class_type implements func_name, +// otherwise call default function. +#define WRAPPER_OF(class_type, func_name, return_type) \ + struct func_name ## Wrapper { \ + template \ + static auto Test(int) -> decltype( \ + std::declval().func_name(std::declval()...), std::true_type()); \ + template \ + static auto Test(...) -> std::false_type; \ + \ + template \ + typename std::enable_if(0))::value, return_type>::type \ + Call(class_type* obj, Args... args) { \ + static_assert(butil::is_class_func_return_same< \ + return_type, T, decltype(&T::func_name), Args...>::value, \ + "Return type mismatch"); \ + return obj->func_name(std::forward(args)...); \ + } \ + \ + template \ + typename std::enable_if(0))::value, return_type>::type \ + Call(class_type* obj, Args... args) { \ + return ReturnEmpty(); \ + } \ + } + +#define WRAPPER_CALL(func_name, obj, ...) func_name ## Wrapper().Call(obj, ## __VA_ARGS__) + // VersionedRefWithId is an efficient data structure, which can be find // in O(1)-time by VRefId. // Users shall call VersionedRefWithId::Create() to create T, @@ -86,25 +126,26 @@ BUTIL_FORCE_INLINE uint64_t MakeVRef(uint32_t version, int32_t nref) { // to convert the identifier to an unique_ptr at each access. Whenever // a unique_ptr is not destructed, the enclosed T will not be recycled. // -// The implementation is the same as `Socket', but there is no health -// check and revive mechanism. // -// Derived classes implement 4 virtual functions which are guaranteed -// to only be called once: -// 1. (required) int OnCreate(const CreateOptions* options) : +// Derived classes implement 6 functions : +// 1. (required) int OnCreated(Args... args) : // Will be called in Create() to initialize T init when T is created successfully. // If initialization fails, return non-zero. VersionedRefWithId will be `SetFailed' // and Create() returns non-zero. -// 2. (required) void OnRecycle() : -// Will be called in Dereference() when T is being recycled. -// 3. (optional) void OnFailed() : -// Will be called in SetFailed() when T is set failed successfully. -// 4. (optional) void OnAdditionalRefReleased() : -// Will be called in ReleaseAdditionalReference(). +// 2. (required) void BeforeRecycled() : +// Will be called in Dereference() before T is recycled. +// 3. (optional) void OnFailed(Args... args) : +// Will be called in SetFailed() when VersionedRefWithId is set failed successfully. +// 4. (optional) void BeforeAdditionalRefReleased() : +// Will be called in ReleaseAdditionalReference() before additional ref is released. +// 5. (optional) void AfterRevived() : +// Will be called in Revive() When VersionedRefWithId is revived. +// 6. (optional) std::string OnDescription() const : +// Will be called in description(). // // Example usage: // -// class UserData : public brpc::VersionedRefWithId { +// class UserData : public brpc::VersionedRefWithId { // public: // explicit UserData(Forbidden f) // : brpc::VersionedRefWithId(f) @@ -117,14 +158,16 @@ BUTIL_FORCE_INLINE uint64_t MakeVRef(uint32_t version, int32_t nref) { // _count.fetch_sub(c, butil::memory_order_relaxed); // } // private: -// int OnCreate(const void*) override { +// friend class brpc::VersionedRefWithId; +// +// int OnCreated() { // _count.store(1, butil::memory_order_relaxed); // return 0; // } -// void OnFailed() override { +// void OnFailed(int error_code, const std::string& error_text) { // _count.fetch_sub(1, butil::memory_order_relaxed); // } -// void OnRecycle() override { +// void BeforeRecycled() { // _count.store(0, butil::memory_order_relaxed); // } // @@ -137,7 +180,7 @@ BUTIL_FORCE_INLINE uint64_t MakeVRef(uint32_t version, int32_t nref) { // // And to call methods on UserData: // UserDataId id; -// if (UserData::Create(NULL, &id) ! =0) { +// if (UserData::Create(&id) ! =0) { // LOG(ERROR) << "Fail to create UserData"; // return; // } @@ -148,27 +191,28 @@ BUTIL_FORCE_INLINE uint64_t MakeVRef(uint32_t version, int32_t nref) { // } // user_data->Add(10); // user_data->SetFailed(); -// UserData::SetFailed(id); +// UserData::SetFailedById(id); // -template +template class VersionedRefWithId { protected: struct Forbidden {}; public: explicit VersionedRefWithId(Forbidden) + // Must be even because Address() relies on evenness of version. : _versioned_ref(0) , _this_id(0) - , _recycle_flag(false) - {} + , _additional_ref_status(ADDITIONAL_REF_USING) {} virtual ~VersionedRefWithId() = default; DISALLOW_COPY_AND_ASSIGN(VersionedRefWithId); // Create a VersionedRefWithId, put the identifier into `id'. - // `options' will be passed to OnCreate() directly. + // `args' will be passed to OnCreated() directly. // Returns 0 on success, -1 otherwise. - static int Create(const CreateOptions* options, VRefId* id); + template + static int Create(VRefId* id, Args... args); // Place the VersionedRefWithId associated with identifier `id' into // unique_ptr `ptr', which will be released automatically when out @@ -201,8 +245,11 @@ class VersionedRefWithId { // T::OnFailed() will be called when SetFailed() successfully. // This function is lock-free. // Returns -1 when the Socket was already SetFailed(), 0 otherwise. - int SetFailed(); - static int SetFailed(VRefId id); + template + static int SetFailedById(VRefId id, Args... args); + + template + int SetFailed(Args... args); bool Failed() const { return VersionOfVRef(_versioned_ref.load(butil::memory_order_relaxed)) @@ -212,32 +259,63 @@ class VersionedRefWithId { // Release the additional reference which added inside `Create' // before so that `VersionedRefWithId' will be recycled automatically // once on one is addressing it. - void ReleaseAdditionalReference(); + int ReleaseAdditionalReference(); VRefId id() const { return _this_id; } + // A brief description. + std::string description() const; + protected: friend void DereferenceVersionedRefWithId<>(T* r); - // 1. When `failed_ad_well=true', returns 0 on success, - // 1 on failed socket, -1 on recycled. - // 2. When `failed_ad_well=true', returns 0 on success, - // -1 when the Socket was SetFailed(). - static int AddressImpl(VRefId id, bool failed_as_well, - VersionedRefWithIdUniquePtr* ptr); + // Status flag used to mark that + enum AdditionalRefStatus { + // 1. Additional reference has been increased; + ADDITIONAL_REF_USING, + // 2. Additional reference is increasing; + ADDITIONAL_REF_REVIVING, + // 3. Additional reference has been decreased. + ADDITIONAL_REF_RECYCLED + }; + + AdditionalRefStatus additional_ref_status() const { + return _additional_ref_status.load(butil::memory_order_relaxed); + } + + uint64_t versioned_ref() const { + // The acquire fence pairs with release fence in Dereference to avoid + // inconsistent states to be seen by others. + return _versioned_ref.load(butil::memory_order_acquire); + } + + template + int SetFailedImpl(Args... args); // Release the reference. If no one is addressing this VersionedRefWithId, - // it will be recycled automatically and T::OnRecycle() will be called. + // it will be recycled automatically and T::BeforeRecycled() will be called. int Dereference(); + // Make this socket addressable again. + // If nref is less than `at_least_nref', VersionedRefWithId was + // abandoned during revival and cannot be revived. + void Revive(int32_t at_least_nref); + private: typedef butil::ResourceId resource_id_t; - // See comments of VersionedRefWithId above. - virtual int OnCreate(const CreateOptions* options) = 0; - virtual void OnRecycle() = 0; - virtual void OnFailed() {} - virtual void OnAdditionalRefReleased() {} + // 1. When `failed_as_well=true', returns 0 on success, + // 1 on failed socket, -1 on recycled. + // 2. When `failed_as_well=true', returns 0 on success, + // -1 when the Socket was SetFailed(). + static int AddressImpl(VRefId id, bool failed_as_well, + VersionedRefWithIdUniquePtr* ptr); + + // Callback wrapper of Derived classes. + WRAPPER_OF(T, OnFailed, void); + WRAPPER_OF(T, BeforeAdditionalRefReleased, void); + WRAPPER_OF(T, AfterRevived, void); + WRAPPER_OF(T, OnDescription, std::string); // unsigned 32-bit version + signed 32-bit referenced-count. // Meaning of version: @@ -250,22 +328,25 @@ friend void DereferenceVersionedRefWithId<>(T* r); butil::atomic BAIDU_CACHELINE_ALIGNMENT _versioned_ref; // The unique identifier. VRefId _this_id; - // Flag used to mark whether additional reference - // has been decreased by either `SetFailed'. - butil::atomic _recycle_flag; + // Indicates whether additional reference has increased, + // decreased, or is increasing. + // additional ref status: + // `Socket'、`Create': REF_USING + // `SetFailed': REF_USING -> REF_RECYCLED + // `Revive' REF_RECYCLED -> REF_REVIVING -> REF_USING + butil::atomic _additional_ref_status; }; - template void DereferenceVersionedRefWithId(T* r) { if (r) { - r->Dereference(); + static_cast*>(r)->Dereference(); } } -template -int VersionedRefWithId::Create(const CreateOptions* options, - VRefId* id) { +template +template +int VersionedRefWithId::Create(VRefId* id, Args... args) { resource_id_t slot; T* const t = butil::get_resource(&slot, Forbidden()); if (t == NULL) { @@ -277,43 +358,47 @@ int VersionedRefWithId::Create(const CreateOptions* options, // _this_id will only be used in destructor/Destroy of referenced // slots, which is safe and properly fenced. Although it's better // to put the id into VersionedRefWithIdUniquePtr. - VersionedRefWithId* const vref_with_id = t; + VersionedRefWithId* const vref_with_id = t; vref_with_id->_this_id = MakeVRefId( VersionOfVRef(vref_with_id->_versioned_ref.fetch_add( 1, butil::memory_order_release)), slot); - vref_with_id->_recycle_flag.store(false, butil::memory_order_relaxed); - // At last, call T::OnCreate() to initialize the T object. - if (vref_with_id->OnCreate(options) != 0) { + vref_with_id->_additional_ref_status.store( + ADDITIONAL_REF_USING, butil::memory_order_relaxed); + static_assert(butil::is_class_func_return_int< + T, decltype(&T::OnCreated), Args...>::value, + "T::OnCreated must return int"); + // At last, call T::OnCreated() to initialize the T object. + if (t->OnCreated(std::forward(args)...) != 0) { vref_with_id->SetFailed(); - // NOTE: This object may be recycled at this point, don't - // touch anything. + // NOTE: This object may be recycled at this point, + // don't touch anything. return -1; } *id = vref_with_id->_this_id; return 0; } -template -int VersionedRefWithId::Address( +template +int VersionedRefWithId::Address( VRefId id, VersionedRefWithIdUniquePtr* ptr) { return AddressImpl(id, false, ptr); } -template -int VersionedRefWithId::AddressFailedAsWell( +template +int VersionedRefWithId::AddressFailedAsWell( VRefId id, VersionedRefWithIdUniquePtr* ptr) { return AddressImpl(id, true, ptr); } -template -int VersionedRefWithId::AddressImpl( +template +int VersionedRefWithId::AddressImpl( VRefId id, bool failed_as_well, VersionedRefWithIdUniquePtr* ptr) { const resource_id_t slot = SlotOfVRefId(id); T* const t = address_resource(slot); if (__builtin_expect(t != NULL, 1)) { // acquire fence makes sure this thread sees latest changes before // Dereference() or Revive(). - VersionedRefWithId* const vref_with_id = t; + VersionedRefWithId* const vref_with_id = t; const uint64_t vref1 = vref_with_id->_versioned_ref.fetch_add( 1, butil::memory_order_acquire); const uint32_t ver1 = VersionOfVRef(vref1); @@ -340,7 +425,10 @@ int VersionedRefWithId::AddressImpl( expected_vref, MakeVRef(ver2 + 1, 0), butil::memory_order_acquire, butil::memory_order_relaxed)) { - vref_with_id->OnRecycle(); + static_assert(butil::is_class_func_return_void< + T, decltype(&T::BeforeRecycled)>::value, + "T::BeforeRecycled must return void"); + t->BeforeRecycled(); return_resource(slot); } } else { @@ -357,14 +445,31 @@ int VersionedRefWithId::AddressImpl( return -1; } -template -void VersionedRefWithId::ReAddress(VersionedRefWithIdUniquePtr* ptr) { +template +void VersionedRefWithId::ReAddress(VersionedRefWithIdUniquePtr* ptr) { _versioned_ref.fetch_add(1, butil::memory_order_acquire); ptr->reset(static_cast(this)); } -template -int VersionedRefWithId::SetFailed() { +template +template +int VersionedRefWithId::SetFailedById(VRefId id, Args... args) { + VersionedRefWithIdUniquePtr ptr; + if (Address(id, &ptr) != 0) { + return -1; + } + return ptr->SetFailed(std::forward(args)...); +} + +template +template +int VersionedRefWithId::SetFailed(Args... args) { + return SetFailedImpl(std::forward(args)...); +} + +template +template +int VersionedRefWithId::SetFailedImpl(Args... args) { const uint32_t id_ver = VersionOfVRefId(_this_id); uint64_t vref = _versioned_ref.load(butil::memory_order_relaxed); for (;;) { @@ -374,11 +479,11 @@ int VersionedRefWithId::SetFailed() { // Try to set version=id_ver+1 (to make later address() return NULL), // retry on fail. if (_versioned_ref.compare_exchange_strong( - vref, MakeVRef(id_ver + 1, NRefOfVRef(vref)), - butil::memory_order_release, - butil::memory_order_relaxed)) { + vref, MakeVRef(id_ver + 1, NRefOfVRef(vref)), + butil::memory_order_release, + butil::memory_order_relaxed)) { // Call T::OnFailed() to notify the failure of T. - OnFailed(); + WRAPPER_CALL(OnFailed, static_cast(this), std::forward(args)...); // Deref additionally which is added at creation so that this // queue's reference will hit 0(recycle) when no one addresses it. ReleaseAdditionalReference(); @@ -389,25 +494,30 @@ int VersionedRefWithId::SetFailed() { } } -template -int VersionedRefWithId::SetFailed(VRefId id) { - VersionedRefWithIdUniquePtr ptr; - if (Address(id, &ptr) != 0) { - return -1; - } - return ptr->SetFailed(); -} +template +int VersionedRefWithId::ReleaseAdditionalReference() { + do { + AdditionalRefStatus expect = ADDITIONAL_REF_USING; + if (_additional_ref_status.compare_exchange_strong( + expect, ADDITIONAL_REF_RECYCLED, + butil::memory_order_relaxed, + butil::memory_order_relaxed)) { + BeforeAdditionalRefReleasedWrapper(); + WRAPPER_CALL(BeforeAdditionalRefReleased, static_cast(this)); + return Dereference(); + } -template -void VersionedRefWithId::ReleaseAdditionalReference() { - if (!_recycle_flag.exchange(true,butil::memory_order_relaxed)) { - OnAdditionalRefReleased(); - Dereference(); - } + if (expect == ADDITIONAL_REF_REVIVING) { + // sched_yield to wait until status is not REF_REVIVING. + sched_yield(); + } else { + return -1; // REF_RECYCLED + } + } while (true); } -template -int VersionedRefWithId::Dereference() { +template +int VersionedRefWithId::Dereference() { const VRefId id = _this_id; const uint64_t vref = _versioned_ref.fetch_sub( 1, butil::memory_order_release); @@ -452,19 +562,63 @@ int VersionedRefWithId::Dereference() { expected_vref, MakeVRef(id_ver + 2, 0), butil::memory_order_acquire, butil::memory_order_relaxed)) { - OnRecycle(); + static_cast(this)->BeforeRecycled(); return_resource(SlotOfVRefId(id)); return 1; } return 0; } - LOG(FATAL) << "Invalid SocketId=" << id; + LOG(FATAL) << "Invalid VRefId=" << id; return -1; } - LOG(FATAL) << "Over dereferenced SocketId=" << id; + LOG(FATAL) << "Over dereferenced VRefId=" << id; return -1; } +template +void VersionedRefWithId::Revive(int32_t at_least_nref) { + const uint32_t id_ver = VersionOfVRefId(_this_id); + uint64_t vref = _versioned_ref.load(butil::memory_order_relaxed); + _additional_ref_status.store( + ADDITIONAL_REF_REVIVING, butil::memory_order_relaxed); + while (true) { + CHECK_EQ(id_ver + 1, VersionOfVRef(vref)) << "id=" << id(); + + int32_t nref = NRefOfVRef(vref); + if (nref < at_least_nref) { + // Set the status to REF_RECYLED since no one uses this socket + _additional_ref_status.store( + ADDITIONAL_REF_RECYCLED, butil::memory_order_relaxed); + CHECK_EQ(1, nref); + LOG(WARNING) << description() << " was abandoned during revival"; + return; + } + // +1 is the additional ref added in Create(). TODO(gejun): we should + // remove this additional nref someday. + if (_versioned_ref.compare_exchange_weak( + vref, MakeVRef(id_ver, nref + 1/*note*/), + butil::memory_order_release, + butil::memory_order_relaxed)) { + // Set the status to REF_USING since we add additional ref again + _additional_ref_status.store( + ADDITIONAL_REF_USING, butil::memory_order_relaxed); + WRAPPER_CALL(AfterRevived, static_cast(this)); + return; + } + } +} + +template +std::string VersionedRefWithId::description() const { + std::string result; + result.reserve(128); + butil::string_appendf(&result, "Socket{id=%" PRIu64, id()); + result.append(WRAPPER_CALL( + OnDescription, const_cast(static_cast(this)))); + butil::string_appendf(&result, "} (0x%p)", this); + return result; } +} // namespace brpc + #endif // BRPC_VERSIONED_REF_WITH_ID_H diff --git a/src/butil/memory/scope_guard.h b/src/butil/memory/scope_guard.h index 8e8c568912..1755783154 100644 --- a/src/butil/memory/scope_guard.h +++ b/src/butil/memory/scope_guard.h @@ -19,18 +19,13 @@ #define BRPC_SCOPED_GUARD_H #include +#include "butil/type_traits.h" namespace butil { -// Whether a no-argument callable returns void. -template -struct returns_void_t - : public std::is_same()())> -{}; - template::value>::type> + is_callable_return_void::value>> class ScopeGuard; template diff --git a/src/butil/type_traits.h b/src/butil/type_traits.h index 5f342db3d9..67bf029a7a 100644 --- a/src/butil/type_traits.h +++ b/src/butil/type_traits.h @@ -341,7 +341,7 @@ template struct is_enum_impl : false_type { }; template struct is_enum : internal::is_enum_impl< - is_same::value || + is_same::value || is_integral::value || is_floating_point::value || is_reference::value || @@ -351,6 +351,34 @@ template struct is_enum : is_enum { }; template struct is_enum : is_enum { }; template struct is_enum : is_enum { }; +// Whether a callable returns type which is same as ReturnType. +template +struct is_callable_return_same + : public butil::is_same()(std::declval()...))> {}; + +// Whether a callable returns void. +template +struct is_callable_return_void + : public is_callable_return_same {}; + +// Whether a function of class returns type which is same as ReturnType. +template +struct is_class_func_return_same + : public butil::is_same().*std::declval())(std::declval()...))> {}; + +// Whether a function of class returns int. +template +struct is_class_func_return_int + : public is_class_func_return_same {}; + +// Whether a function of class returns void. +template +struct is_class_func_return_void + : public is_class_func_return_same {}; + + } // namespace butil #endif // BUTIL_TYPE_TRAITS_H diff --git a/test/brpc_event_dispatcher_unittest.cpp b/test/brpc_event_dispatcher_unittest.cpp index d061d0016e..0335e1d0a0 100644 --- a/test/brpc_event_dispatcher_unittest.cpp +++ b/test/brpc_event_dispatcher_unittest.cpp @@ -53,30 +53,41 @@ TEST_F(EventDispatcherTest, versioned_ref) { ASSERT_EQ(brpc::MakeVRef(1, 1), versioned_ref); } -struct UserData : public brpc::VersionedRefWithId { +struct UserData; + +UserData* g_user_data = NULL; + +struct UserData : public brpc::VersionedRefWithId { explicit UserData(Forbidden f) - : brpc::VersionedRefWithId(f) + : brpc::VersionedRefWithId(f) , count(0) , _additional_ref_released(false) {} - int OnCreate(const void*) override { + int OnCreated() { count.store(1, butil::memory_order_relaxed); _additional_ref_released = false; + g_user_data = this; return 0; } - void OnRecycle() override { + void BeforeRecycled() { count.store(0, butil::memory_order_relaxed); + g_user_data = NULL; } - void OnAdditionalRefReleased() override { + void BeforeAdditionalRefReleased() { _additional_ref_released = true; } - void OnFailed() override { + void OnFailed() { count.fetch_sub(1, butil::memory_order_relaxed); } + void AfterRevived() { + count.fetch_add(1, butil::memory_order_relaxed); + _additional_ref_released = false; + } + butil::atomic count; bool _additional_ref_released; }; @@ -111,13 +122,14 @@ void* VRefThread(void* arg) { TEST_F(EventDispatcherTest, versioned_ref_with_id) { UserDataId id = INVALID_EVENT_DATA_ID; - ASSERT_EQ(0, UserData::Create(NULL, &id)); + ASSERT_EQ(0, UserData::Create(&id)); ASSERT_NE(INVALID_EVENT_DATA_ID, id); UserDataUniquePtr ptr; ASSERT_EQ(0, UserData::Address(id, &ptr)); ASSERT_EQ(2, ptr->nref()); ASSERT_FALSE(ptr->Failed()); ASSERT_EQ(1, ptr->count); + ASSERT_EQ(g_user_data, ptr.get()); { UserDataUniquePtr temp_ptr; ASSERT_EQ(0, UserData::Address(id, &temp_ptr)); @@ -152,9 +164,15 @@ TEST_F(EventDispatcherTest, versioned_ref_with_id) { ASSERT_EQ(ptr, temp_ptr); ASSERT_EQ(2, ptr->nref()); } + ptr->Revive(1); + ASSERT_EQ(2, ptr->nref()); + ASSERT_EQ(g_count, ptr->count); + ASSERT_FALSE(ptr->_additional_ref_released); + ASSERT_EQ(0, UserData::SetFailedById(id)); + ASSERT_EQ(g_count - 1, ptr->count); ptr.reset(); ASSERT_EQ(nullptr, ptr); - ASSERT_NE(0, UserData::Address(id, &ptr)); + ASSERT_EQ(nullptr, g_user_data); } std::vector err_fd; @@ -385,10 +403,10 @@ class EventPipe; typedef brpc::VersionedRefWithIdUniquePtr EventPipeUniquePtr; -class EventPipe : public brpc::VersionedRefWithId { +class EventPipe : public brpc::VersionedRefWithId { public: explicit EventPipe(Forbidden f) - : brpc::VersionedRefWithId(f) + : brpc::VersionedRefWithId(f) , _event_data_id(INVALID_EVENT_PIPE_ID) , _pipe_fds{-1, -1} , _input_event_count(0) @@ -404,13 +422,15 @@ class EventPipe : public brpc::VersionedRefWithId { } private: - int OnCreate(const void*) override { +friend class VersionedRefWithId; + + int OnCreated() { brpc::EventDataOptions event_data_options { id(), HandleEpollIn, HandleEpollOut }; - if (brpc::EventData::Create(&event_data_options, &_event_data_id) != 0) { + if (brpc::EventData::Create(&_event_data_id, event_data_options) != 0) { LOG(ERROR) << "Fail to create EventData"; return -1; } @@ -434,7 +454,7 @@ class EventPipe : public brpc::VersionedRefWithId { return 0; } - void OnRecycle() override { + void BeforeRecycled() { brpc::MakeEventDataIdInvalid(_event_data_id); brpc::GetGlobalEventDispatcher(_pipe_fds[0], bthread_self_tag()) .RemoveConsumer(_pipe_fds[0]); @@ -484,7 +504,7 @@ class EventPipe : public brpc::VersionedRefWithId { TEST_F(EventDispatcherTest, customize_dispatch_task) { EventPipeId id = INVALID_EVENT_PIPE_ID; - ASSERT_EQ(0, EventPipe::Create(NULL, &id)); + ASSERT_EQ(0, EventPipe::Create(&id)); ASSERT_NE(INVALID_EVENT_PIPE_ID, id); EventPipeUniquePtr ptr; ASSERT_EQ(0, EventPipe::Address(id, &ptr)); diff --git a/test/brpc_load_balancer_unittest.cpp b/test/brpc_load_balancer_unittest.cpp index d98c11b5df..14b0131cf4 100644 --- a/test/brpc_load_balancer_unittest.cpp +++ b/test/brpc_load_balancer_unittest.cpp @@ -1107,7 +1107,7 @@ TEST_F(LoadBalancerTest, revived_from_all_failed_sanity) { { brpc::SocketUniquePtr dummy_ptr; ASSERT_EQ(1, brpc::Socket::AddressFailedAsWell(ptr[0]->id(), &dummy_ptr)); - dummy_ptr->Revive(); + dummy_ptr->Revive(2); } bthread_usleep(brpc::FLAGS_detect_available_server_interval_ms * 1000); // After one server is revived, the reject rate should be 50% diff --git a/test/brpc_redis_unittest.cpp b/test/brpc_redis_unittest.cpp index 1176676c95..573ab2ed5b 100644 --- a/test/brpc_redis_unittest.cpp +++ b/test/brpc_redis_unittest.cpp @@ -1080,7 +1080,7 @@ class MultiCommandHandler : public brpc::RedisCommandHandler { brpc::RedisCommandHandlerResult Run(const std::vector& args, brpc::RedisReply* output, - bool flush_batched) { + bool flush_batched) override { output->SetStatus("OK"); return brpc::REDIS_CMD_CONTINUE; } @@ -1093,7 +1093,7 @@ class MultiCommandHandler : public brpc::RedisCommandHandler { public: brpc::RedisCommandHandlerResult Run(const std::vector& args, brpc::RedisReply* output, - bool flush_batched) { + bool flush_batched) override { if (args[0] == "multi") { output->SetError("ERR duplicate multi"); return brpc::REDIS_CMD_CONTINUE;