From caf4c18fef4261363b1f8d0e2a0a4d537d995532 Mon Sep 17 00:00:00 2001 From: Bright Chen Date: Sun, 3 Mar 2024 21:50:36 +0800 Subject: [PATCH] EventDispatcher supports various IO types --- src/brpc/acceptor.cpp | 2 +- src/brpc/details/health_check.cpp | 3 +- src/brpc/event_dispatcher.cpp | 18 + src/brpc/event_dispatcher.h | 211 +++++++- src/brpc/event_dispatcher_epoll.cpp | 85 ++-- src/brpc/event_dispatcher_kqueue.cpp | 82 ++-- src/brpc/socket.cpp | 592 ++++++++++------------ src/brpc/socket.h | 107 ++-- src/brpc/socket_id.h | 16 +- src/brpc/socket_inl.h | 193 +------- src/brpc/versioned_ref_with_id.h | 624 ++++++++++++++++++++++++ src/butil/memory/scope_guard.h | 12 +- src/butil/type_traits.h | 36 +- test/brpc_event_dispatcher_unittest.cpp | 266 +++++++++- test/brpc_load_balancer_unittest.cpp | 2 +- test/brpc_redis_unittest.cpp | 4 +- 16 files changed, 1531 insertions(+), 722 deletions(-) create mode 100644 src/brpc/versioned_ref_with_id.h diff --git a/src/brpc/acceptor.cpp b/src/brpc/acceptor.cpp index d265725849..e8e6dcbeb6 100644 --- a/src/brpc/acceptor.cpp +++ b/src/brpc/acceptor.cpp @@ -321,7 +321,7 @@ void Acceptor::OnNewConnectionsUntilEAGAIN(Socket* acception) { // Always add this socket into `_socket_map' whether it // has been `SetFailed' or not, whether `Acceptor' is // running or not. Otherwise, `Acceptor::BeforeRecycle' - // may be called (inside Socket::OnRecycle) after `Acceptor' + // may be called (inside Socket::BeforeRecycled) after `Acceptor' // has been destroyed am->_socket_map.insert(socket_id, ConnectStatistics()); } diff --git a/src/brpc/details/health_check.cpp b/src/brpc/details/health_check.cpp index ea6e78747e..810082552d 100644 --- a/src/brpc/details/health_check.cpp +++ b/src/brpc/details/health_check.cpp @@ -210,7 +210,8 @@ bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) { ptr->_ninflight_app_health_check.fetch_add( 1, butil::memory_order_relaxed); } - ptr->Revive(); + // See comments above. 
+ ptr->Revive(2/*note*/); ptr->_hc_count = 0; if (!FLAGS_health_check_path.empty()) { HealthCheckManager::StartCheck(_id, ptr->_health_check_interval_s); diff --git a/src/brpc/event_dispatcher.cpp b/src/brpc/event_dispatcher.cpp index 689e80da7a..53495ea6d7 100644 --- a/src/brpc/event_dispatcher.cpp +++ b/src/brpc/event_dispatcher.cpp @@ -71,6 +71,24 @@ EventDispatcher& GetGlobalEventDispatcher(int fd, bthread_tag_t tag) { return g_edisp[tag * FLAGS_event_dispatcher_num + index]; } +int IOEventData::OnCreated(const IOEventDataOptions& options) { + if (!options.input_cb) { + LOG(ERROR) << "Invalid input_cb=NULL"; + return -1; + } + if (!options.output_cb) { + LOG(ERROR) << "Invalid output_cb=NULL"; + return -1; + } + + _options = options; + return 0; +} + +void IOEventData::BeforeRecycled() { + _options = { NULL, NULL, NULL }; +} + } // namespace brpc #if defined(OS_LINUX) diff --git a/src/brpc/event_dispatcher.h b/src/brpc/event_dispatcher.h index 0de74df650..51c404e2a0 100644 --- a/src/brpc/event_dispatcher.h +++ b/src/brpc/event_dispatcher.h @@ -21,16 +21,78 @@ #include "butil/macros.h" // DISALLOW_COPY_AND_ASSIGN #include "bthread/types.h" // bthread_t, bthread_attr_t -#include "brpc/socket.h" // Socket, SocketId +#include "brpc/versioned_ref_with_id.h" namespace brpc { +// Unique identifier of an IOEventData. +// Users shall store IOEventDataId instead of IOEventData and call IOEventData::Address() +// to convert the identifier to a unique_ptr at each access. Whenever a +// unique_ptr is not destructed, the enclosed IOEventData will not be recycled. +typedef VRefId IOEventDataId; + +const VRefId INVALID_IO_EVENT_DATA_ID = INVALID_VREF_ID; + +class IOEventData; + +typedef VersionedRefWithIdUniquePtr EventDataUniquePtr; + +// User callback type of input event and output event. 
+typedef int (*InputEventCallback) (void* id, uint32_t events, + const bthread_attr_t& thread_attr); +typedef InputEventCallback OutputEventCallback; + +struct IOEventDataOptions { + // Callback for input event. + InputEventCallback input_cb; + // Callback for output event. + OutputEventCallback output_cb; + // User data. + void* user_data; +}; + +// EventDispatcher finds IOEventData by IOEventDataId which is +// stored in epoll/kqueue data, and calls input/output event callback, +// so EventDispatcher supports various IO types, such as socket, +// pipe, eventfd, timerfd, etc. +class IOEventData : public VersionedRefWithId { +public: + explicit IOEventData(Forbidden f) + : VersionedRefWithId(f) + , _options{ NULL, NULL, NULL } {} + + DISALLOW_COPY_AND_ASSIGN(IOEventData); + + int CallInputEventCallback(uint32_t events, + const bthread_attr_t& thread_attr) { + return _options.input_cb(_options.user_data, events, thread_attr); + } + + int CallOutputEventCallback(uint32_t events, + const bthread_attr_t& thread_attr) { + return _options.output_cb(_options.user_data, events, thread_attr); + } + +private: +friend class VersionedRefWithId; + + int OnCreated(const IOEventDataOptions& options); + void BeforeRecycled(); + + IOEventDataOptions _options; +}; + +namespace rdma { +class RdmaEndpoint; +} + // Dispatch edge-triggered events of file descriptors to consumers // running in separate bthreads. class EventDispatcher { friend class Socket; friend class rdma::RdmaEndpoint; +template friend class IOEvent; public: EventDispatcher(); @@ -40,7 +102,7 @@ friend class rdma::RdmaEndpoint; // Use |*consumer_thread_attr| (if it's not NULL) as the attribute to // create bthreads running user callbacks. // Returns 0 on success, -1 otherwise. 
- virtual int Start(const bthread_attr_t* consumer_thread_attr); + virtual int Start(const bthread_attr_t* thread_attr); // True iff this dispatcher is running in a bthread bool Running() const; @@ -57,19 +119,19 @@ friend class rdma::RdmaEndpoint; // When the file descriptor is removed from internal epoll, the Socket // will be dereferenced once additionally. // Returns 0 on success, -1 otherwise. - int AddConsumer(SocketId socket_id, int fd); + int AddConsumer(IOEventDataId event_data_id, int fd); // Watch EPOLLOUT event on `fd' into epoll device. If `pollin' is // true, EPOLLIN event will also be included and EPOLL_CTL_MOD will // be used instead of EPOLL_CTL_ADD. When event arrives, // `Socket::HandleEpollOut' will be called with `socket_id' // Returns 0 on success, -1 otherwise and errno is set - int RegisterEvent(SocketId socket_id, int fd, bool pollin); + int RegisterEvent(IOEventDataId event_data_id, int fd, bool pollin); // Remove EPOLLOUT event on `fd'. If `pollin' is true, EPOLLIN event // will be kept and EPOLL_CTL_MOD will be used instead of EPOLL_CTL_DEL // Returns 0 on success, -1 otherwise and errno is set - int UnregisterEvent(SocketId socket_id, int fd, bool pollin); + int UnregisterEvent(IOEventDataId event_data_id, int fd, bool pollin); private: DISALLOW_COPY_AND_ASSIGN(EventDispatcher); @@ -83,8 +145,33 @@ friend class rdma::RdmaEndpoint; // Remove the file descriptor `fd' from epoll. int RemoveConsumer(int fd); - // The epoll to watch events. - int _epfd; + // Call user callback of input event and output event. + template + static int OnEvent(IOEventDataId event_data_id, uint32_t events, + const bthread_attr_t& thread_attr) { + EventDataUniquePtr data; + if (IOEventData::Address(event_data_id, &data) != 0) { + return -1; + } + return IsInputEvent ? 
+ data->CallInputEventCallback(events, thread_attr) : + data->CallOutputEventCallback(events, thread_attr); + } + + static int CallInputEventCallback(IOEventDataId event_data_id, + uint32_t events, + const bthread_attr_t& thread_attr) { + return OnEvent(event_data_id, events, thread_attr); + } + + static int CallOutputEventCallback(IOEventDataId event_data_id, + uint32_t events, + const bthread_attr_t& thread_attr) { + return OnEvent(event_data_id, events, thread_attr); + } + + // The epoll/kqueue fd to watch events. + int _event_dispatcher_fd; // false unless Stop() is called. volatile bool _stop; @@ -93,7 +180,7 @@ friend class rdma::RdmaEndpoint; bthread_t _tid; // The attribute of bthreads calling user callbacks. - bthread_attr_t _consumer_thread_attr; + bthread_attr_t _thread_attr; // Pipe fds to wakeup EventDispatcher from `epoll_wait' in order to quit int _wakeup_fds[2]; @@ -101,6 +188,114 @@ friend class rdma::RdmaEndpoint; EventDispatcher& GetGlobalEventDispatcher(int fd, bthread_tag_t tag); +// IOEvent class manages the IO events of a file descriptor conveniently. +template +class IOEvent { +public: + IOEvent() + : _init(false) + , _event_data_id(INVALID_IO_EVENT_DATA_ID) + , _bthread_tag(bthread_self_tag()) {} + + ~IOEvent() { Reset(); } + + DISALLOW_COPY_AND_ASSIGN(IOEvent); + + int Init(void* user_data) { + if (_init) { + LOG(WARNING) << "IOEvent has been initialized"; + return 0; + } + IOEventDataOptions options{ OnInputEvent, OnOutputEvent, user_data }; + if (IOEventData::Create(&_event_data_id, options) != 0) { + LOG(ERROR) << "Fail to create EventData"; + return -1; + } + _init = true; + return 0; + } + + void Reset() { + if (_init) { + IOEventData::SetFailedById(_event_data_id); + _init = false; + } + } + + // See comments of `EventDispatcher::AddConsumer'. 
+ int AddConsumer(int fd) { + if (!_init) { + LOG(ERROR) << "IOEvent has not been initialized"; + return -1; + } + return GetGlobalEventDispatcher(fd, _bthread_tag) + .AddConsumer(_event_data_id, fd); + } + + // See comments of `EventDispatcher::RemoveConsumer'. + int RemoveConsumer(int fd) { + if (!_init) { + LOG(ERROR) << "IOEvent has not been initialized"; + return -1; + } + return GetGlobalEventDispatcher(fd, _bthread_tag).RemoveConsumer(fd); + } + + // See comments of `EventDispatcher::RegisterEvent'. + int RegisterEvent(int fd, bool pollin) { + if (!_init) { + LOG(ERROR) << "IOEvent has not been initialized"; + return -1; + } + return GetGlobalEventDispatcher(fd, _bthread_tag) + .RegisterEvent(_event_data_id, fd, pollin); + } + + // See comments of `EventDispatcher::UnregisterEvent'. + int UnregisterEvent(int fd, bool pollin) { + if (!_init) { + LOG(ERROR) << "IOEvent has not been initialized"; + return -1; + } + return GetGlobalEventDispatcher(fd, _bthread_tag) + .UnregisterEvent(_event_data_id, fd, pollin); + } + + void set_bthread_tag(bthread_tag_t bthread_tag) { + _bthread_tag = bthread_tag; + } + bthread_tag_t bthread_tag() const { + return _bthread_tag; + } + +private: + // Generic callback to handle input event. + static int OnInputEvent(void* user_data, uint32_t events, + const bthread_attr_t& thread_attr) { + static_assert( + butil::is_result_int::value, + "T::OnInputEvent signature mismatch"); + return T::OnInputEvent(user_data, events, thread_attr); + } + + // Generic callback to handle output event. 
+ static int OnOutputEvent(void* user_data, uint32_t events, + const bthread_attr_t& thread_attr) { + static_assert( + butil::is_result_int::value, + "T::OnOutputEvent signature mismatch"); + return T::OnOutputEvent(user_data, events, thread_attr); + } + + bool _init; + IOEventDataId _event_data_id; + bthread_tag_t _bthread_tag; +}; + } // namespace brpc diff --git a/src/brpc/event_dispatcher_epoll.cpp b/src/brpc/event_dispatcher_epoll.cpp index 1ac7647d97..64717b1623 100644 --- a/src/brpc/event_dispatcher_epoll.cpp +++ b/src/brpc/event_dispatcher_epoll.cpp @@ -23,17 +23,16 @@ namespace brpc { EventDispatcher::EventDispatcher() - : _epfd(-1) + : _event_dispatcher_fd(-1) , _stop(false) , _tid(0) - , _consumer_thread_attr(BTHREAD_ATTR_NORMAL) -{ - _epfd = epoll_create(1024 * 1024); - if (_epfd < 0) { + , _thread_attr(BTHREAD_ATTR_NORMAL) { + _event_dispatcher_fd = epoll_create(1024 * 1024); + if (_event_dispatcher_fd < 0) { PLOG(FATAL) << "Fail to create epoll"; return; } - CHECK_EQ(0, butil::make_close_on_exec(_epfd)); + CHECK_EQ(0, butil::make_close_on_exec(_event_dispatcher_fd)); _wakeup_fds[0] = -1; _wakeup_fds[1] = -1; @@ -46,9 +45,9 @@ EventDispatcher::EventDispatcher() EventDispatcher::~EventDispatcher() { Stop(); Join(); - if (_epfd >= 0) { - close(_epfd); - _epfd = -1; + if (_event_dispatcher_fd >= 0) { + close(_event_dispatcher_fd); + _event_dispatcher_fd = -1; } if (_wakeup_fds[0] > 0) { close(_wakeup_fds[0]); @@ -57,7 +56,7 @@ EventDispatcher::~EventDispatcher() { } int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) { - if (_epfd < 0) { + if (_event_dispatcher_fd < 0) { LOG(FATAL) << "epoll was not created"; return -1; } @@ -68,22 +67,21 @@ int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) { return -1; } - // Set _consumer_thread_attr before creating epoll thread to make sure + // Set _thread_attr before creating epoll thread to make sure // everyting seems sane to the thread. 
- _consumer_thread_attr = (consumer_thread_attr ? - *consumer_thread_attr : BTHREAD_ATTR_NORMAL); + _thread_attr = consumer_thread_attr ? + *consumer_thread_attr : BTHREAD_ATTR_NORMAL; - //_consumer_thread_attr is used in StartInputEvent(), assign flag NEVER_QUIT to it will cause new bthread + //_thread_attr is used in StartInputEvent(), assign flag NEVER_QUIT to it will cause new bthread // that created by epoll_wait() never to quit. - bthread_attr_t epoll_thread_attr = _consumer_thread_attr | BTHREAD_NEVER_QUIT; + bthread_attr_t epoll_thread_attr = _thread_attr | BTHREAD_NEVER_QUIT; // Polling thread uses the same attr for consumer threads (NORMAL right // now). Previously, we used small stack (32KB) which may be overflowed // when the older comlog (e.g. 3.1.85) calls com_openlog_r(). Since this // is also a potential issue for consumer threads, using the same attr // should be a reasonable solution. - int rc = bthread_start_background( - &_tid, &epoll_thread_attr, RunThis, this); + int rc = bthread_start_background(&_tid, &epoll_thread_attr, RunThis, this); if (rc) { LOG(FATAL) << "Fail to create epoll thread: " << berror(rc); return -1; @@ -92,15 +90,15 @@ int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) { } bool EventDispatcher::Running() const { - return !_stop && _epfd >= 0 && _tid != 0; + return !_stop && _event_dispatcher_fd >= 0 && _tid != 0; } void EventDispatcher::Stop() { _stop = true; - if (_epfd >= 0) { + if (_event_dispatcher_fd >= 0) { epoll_event evt = { EPOLLOUT, { NULL } }; - epoll_ctl(_epfd, EPOLL_CTL_ADD, _wakeup_fds[1], &evt); + epoll_ctl(_event_dispatcher_fd, EPOLL_CTL_ADD, _wakeup_fds[1], &evt); } } @@ -111,62 +109,62 @@ void EventDispatcher::Join() { } } -int EventDispatcher::RegisterEvent(SocketId socket_id, int fd, bool pollin) { - if (_epfd < 0) { +int EventDispatcher::RegisterEvent(IOEventDataId event_data_id, + int fd, bool pollin) { + if (_event_dispatcher_fd < 0) { errno = EINVAL; return -1; } epoll_event evt; 
- evt.data.u64 = socket_id; + evt.data.u64 = event_data_id; evt.events = EPOLLOUT | EPOLLET; #ifdef BRPC_SOCKET_HAS_EOF evt.events |= has_epollrdhup; #endif if (pollin) { evt.events |= EPOLLIN; - if (epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &evt) < 0) { + if (epoll_ctl(_event_dispatcher_fd, EPOLL_CTL_MOD, fd, &evt) < 0) { // This fd has been removed from epoll via `RemoveConsumer', // in which case errno will be ENOENT return -1; } } else { - if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt) < 0) { + if (epoll_ctl(_event_dispatcher_fd, EPOLL_CTL_ADD, fd, &evt) < 0) { return -1; } } return 0; } -int EventDispatcher::UnregisterEvent(SocketId socket_id, - int fd, bool pollin) { +int EventDispatcher::UnregisterEvent(IOEventDataId event_data_id, + int fd, bool pollin) { if (pollin) { epoll_event evt; - evt.data.u64 = socket_id; + evt.data.u64 = event_data_id; evt.events = EPOLLIN | EPOLLET; #ifdef BRPC_SOCKET_HAS_EOF evt.events |= has_epollrdhup; #endif - return epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &evt); + return epoll_ctl(_event_dispatcher_fd, EPOLL_CTL_MOD, fd, &evt); } else { - return epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL); + return epoll_ctl(_event_dispatcher_fd, EPOLL_CTL_DEL, fd, NULL); } return -1; } -int EventDispatcher::AddConsumer(SocketId socket_id, int fd) { - if (_epfd < 0) { +int EventDispatcher::AddConsumer(IOEventDataId event_data_id, int fd) { + if (_event_dispatcher_fd < 0) { errno = EINVAL; return -1; } epoll_event evt; + evt.data.u64 = event_data_id; evt.events = EPOLLIN | EPOLLET; - evt.data.u64 = socket_id; #ifdef BRPC_SOCKET_HAS_EOF evt.events |= has_epollrdhup; #endif - return epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt); - return -1; + return epoll_ctl(_event_dispatcher_fd, EPOLL_CTL_ADD, fd, &evt); } int EventDispatcher::RemoveConsumer(int fd) { @@ -180,8 +178,8 @@ int EventDispatcher::RemoveConsumer(int fd) { // from epoll again! 
If the fd was level-triggered and there's data left, // epoll_wait will keep returning events of the fd continuously, making // program abnormal. - if (epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL) < 0) { - PLOG(WARNING) << "Fail to remove fd=" << fd << " from epfd=" << _epfd; + if (epoll_ctl(_event_dispatcher_fd, EPOLL_CTL_DEL, fd, NULL) < 0) { + PLOG(WARNING) << "Fail to remove fd=" << fd << " from epfd=" << _event_dispatcher_fd; return -1; } return 0; @@ -197,12 +195,12 @@ void EventDispatcher::Run() { epoll_event e[32]; #ifdef BRPC_ADDITIONAL_EPOLL // Performance downgrades in examples. - int n = epoll_wait(_epfd, e, ARRAY_SIZE(e), 0); + int n = epoll_wait(_event_dispatcher_fd, e, ARRAY_SIZE(e), 0); if (n == 0) { - n = epoll_wait(_epfd, e, ARRAY_SIZE(e), -1); + n = epoll_wait(_event_dispatcher_fd, e, ARRAY_SIZE(e), -1); } #else - const int n = epoll_wait(_epfd, e, ARRAY_SIZE(e), -1); + const int n = epoll_wait(_event_dispatcher_fd, e, ARRAY_SIZE(e), -1); #endif if (_stop) { // epoll_ctl/epoll_wait should have some sort of memory fencing @@ -215,7 +213,7 @@ void EventDispatcher::Run() { // We've checked _stop, no wake-up will be missed. continue; } - PLOG(FATAL) << "Fail to epoll_wait epfd=" << _epfd; + PLOG(FATAL) << "Fail to epoll_wait epfd=" << _event_dispatcher_fd; break; } for (int i = 0; i < n; ++i) { @@ -225,14 +223,13 @@ void EventDispatcher::Run() { #endif ) { // We don't care about the return value. - Socket::StartInputEvent(e[i].data.u64, e[i].events, - _consumer_thread_attr); + CallInputEventCallback(e[i].data.u64, e[i].events, _thread_attr); } } for (int i = 0; i < n; ++i) { if (e[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) { // We don't care about the return value. 
- Socket::HandleEpollOut(e[i].data.u64); + CallOutputEventCallback(e[i].data.u64, e[i].events, _thread_attr); } } } diff --git a/src/brpc/event_dispatcher_kqueue.cpp b/src/brpc/event_dispatcher_kqueue.cpp index fa52a20450..97ad29bba6 100644 --- a/src/brpc/event_dispatcher_kqueue.cpp +++ b/src/brpc/event_dispatcher_kqueue.cpp @@ -23,17 +23,16 @@ namespace brpc { EventDispatcher::EventDispatcher() - : _epfd(-1) + : _event_dispatcher_fd(-1) , _stop(false) , _tid(0) - , _consumer_thread_attr(BTHREAD_ATTR_NORMAL) -{ - _epfd = kqueue(); - if (_epfd < 0) { + , _thread_attr(BTHREAD_ATTR_NORMAL) { + _event_dispatcher_fd = kqueue(); + if (_event_dispatcher_fd < 0) { PLOG(FATAL) << "Fail to create kqueue"; return; } - CHECK_EQ(0, butil::make_close_on_exec(_epfd)); + CHECK_EQ(0, butil::make_close_on_exec(_event_dispatcher_fd)); _wakeup_fds[0] = -1; _wakeup_fds[1] = -1; @@ -46,9 +45,9 @@ EventDispatcher::EventDispatcher() EventDispatcher::~EventDispatcher() { Stop(); Join(); - if (_epfd >= 0) { - close(_epfd); - _epfd = -1; + if (_event_dispatcher_fd >= 0) { + close(_event_dispatcher_fd); + _event_dispatcher_fd = -1; } if (_wakeup_fds[0] > 0) { close(_wakeup_fds[0]); @@ -56,8 +55,8 @@ EventDispatcher::~EventDispatcher() { } } -int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) { - if (_epfd < 0) { +int EventDispatcher::Start(const bthread_attr_t* thread_attr) { + if (_event_dispatcher_fd < 0) { LOG(FATAL) << "kqueue was not created"; return -1; } @@ -68,14 +67,13 @@ int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) { return -1; } - // Set _consumer_thread_attr before creating kqueue thread to make sure + // Set _thread_attr before creating kqueue thread to make sure // everyting seems sane to the thread. - _consumer_thread_attr = (consumer_thread_attr ? - *consumer_thread_attr : BTHREAD_ATTR_NORMAL); + _thread_attr = (thread_attr ? 
*thread_attr : BTHREAD_ATTR_NORMAL); - //_consumer_thread_attr is used in StartInputEvent(), assign flag NEVER_QUIT to it will cause new bthread + //_thread_attr is used in StartInputEvent(), assign flag NEVER_QUIT to it will cause new bthread // that created by kevent() never to quit. - bthread_attr_t kqueue_thread_attr = _consumer_thread_attr | BTHREAD_NEVER_QUIT; + bthread_attr_t kqueue_thread_attr = _thread_attr | BTHREAD_NEVER_QUIT; // Polling thread uses the same attr for consumer threads (NORMAL right // now). Previously, we used small stack (32KB) which may be overflowed @@ -92,17 +90,17 @@ int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) { } bool EventDispatcher::Running() const { - return !_stop && _epfd >= 0 && _tid != 0; + return !_stop && _event_dispatcher_fd >= 0 && _tid != 0; } void EventDispatcher::Stop() { _stop = true; - if (_epfd >= 0) { + if (_event_dispatcher_fd >= 0) { struct kevent kqueue_event; EV_SET(&kqueue_event, _wakeup_fds[1], EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, NULL); - kevent(_epfd, &kqueue_event, 1, NULL, 0, NULL); + kevent(_event_dispatcher_fd, &kqueue_event, 1, NULL, 0, NULL); } } @@ -113,8 +111,9 @@ void EventDispatcher::Join() { } } -int EventDispatcher::RegisterEvent(SocketId socket_id, int fd, bool pollin) { - if (_epfd < 0) { +int EventDispatcher::RegisterEvent(IOEventDataId event_data_id, + int fd, bool pollin) { + if (_event_dispatcher_fd < 0) { errno = EINVAL; return -1; } @@ -122,44 +121,44 @@ int EventDispatcher::RegisterEvent(SocketId socket_id, int fd, bool pollin) { struct kevent evt; //TODO(zhujiashun): add EV_EOF EV_SET(&evt, fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR, - 0, 0, (void*)socket_id); - if (kevent(_epfd, &evt, 1, NULL, 0, NULL) < 0) { + 0, 0, (void*)event_data_id); + if (kevent(_event_dispatcher_fd, &evt, 1, NULL, 0, NULL) < 0) { return -1; } if (pollin) { EV_SET(&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR, - 0, 0, (void*)socket_id); - if (kevent(_epfd, &evt, 1, 
NULL, 0, NULL) < 0) { + 0, 0, (void*)event_data_id); + if (kevent(_event_dispatcher_fd, &evt, 1, NULL, 0, NULL) < 0) { return -1; } } return 0; } -int EventDispatcher::UnregisterEvent(SocketId socket_id, - int fd, bool pollin) { +int EventDispatcher::UnregisterEvent(IOEventDataId event_data_id, + int fd, bool pollin) { struct kevent evt; EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); - if (kevent(_epfd, &evt, 1, NULL, 0, NULL) < 0) { + if (kevent(_event_dispatcher_fd, &evt, 1, NULL, 0, NULL) < 0) { return -1; } if (pollin) { EV_SET(&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR, - 0, 0, (void*)socket_id); - return kevent(_epfd, &evt, 1, NULL, 0, NULL); + 0, 0, (void*)event_data_id); + return kevent(_event_dispatcher_fd, &evt, 1, NULL, 0, NULL); } return 0; } -int EventDispatcher::AddConsumer(SocketId socket_id, int fd) { - if (_epfd < 0) { +int EventDispatcher::AddConsumer(IOEventDataId event_data_id, int fd) { + if (_event_dispatcher_fd < 0) { errno = EINVAL; return -1; } struct kevent evt; EV_SET(&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR, - 0, 0, (void*)socket_id); - return kevent(_epfd, &evt, 1, NULL, 0, NULL); + 0, 0, (void*)event_data_id); + return kevent(_event_dispatcher_fd, &evt, 1, NULL, 0, NULL); } int EventDispatcher::RemoveConsumer(int fd) { @@ -175,9 +174,9 @@ int EventDispatcher::RemoveConsumer(int fd) { // program abnormal. 
struct kevent evt; EV_SET(&evt, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); - kevent(_epfd, &evt, 1, NULL, 0, NULL); + kevent(_event_dispatcher_fd, &evt, 1, NULL, 0, NULL); EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); - kevent(_epfd, &evt, 1, NULL, 0, NULL); + kevent(_event_dispatcher_fd, &evt, 1, NULL, 0, NULL); return 0; } @@ -189,7 +188,7 @@ void* EventDispatcher::RunThis(void* arg) { void EventDispatcher::Run() { while (!_stop) { struct kevent e[32]; - int n = kevent(_epfd, NULL, 0, e, ARRAY_SIZE(e), NULL); + int n = kevent(_event_dispatcher_fd, NULL, 0, e, ARRAY_SIZE(e), NULL); if (_stop) { // EV_SET/kevent should have some sort of memory fencing // guaranteeing that we(after kevent) see _stop set before @@ -201,20 +200,21 @@ void EventDispatcher::Run() { // We've checked _stop, no wake-up will be missed. continue; } - PLOG(FATAL) << "Fail to kqueue epfd=" << _epfd; + PLOG(FATAL) << "Fail to kqueue epfd=" << _event_dispatcher_fd; break; } for (int i = 0; i < n; ++i) { if ((e[i].flags & EV_ERROR) || e[i].filter == EVFILT_READ) { // We don't care about the return value. - Socket::StartInputEvent((SocketId)e[i].udata, e[i].filter, - _consumer_thread_attr); + CallInputEventCallback((IOEventDataId)e[i].udata, + e[i].filter, _thread_attr); } } for (int i = 0; i < n; ++i) { if ((e[i].flags & EV_ERROR) || e[i].filter == EVFILT_WRITE) { // We don't care about the return value. 
- Socket::HandleEpollOut((SocketId)e[i].udata); + CallOutputEventCallback((IOEventDataId)e[i].udata, + e[i].filter, _thread_attr); } } } diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index 447bac5fe7..85aa150739 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -35,6 +35,7 @@ #include "butil/logging.h" // CHECK #include "butil/macros.h" #include "butil/class_name.h" // butil::class_name +#include "butil/memory/scope_guard.h" #include "brpc/log.h" #include "brpc/reloadable_flags.h" // BRPC_VALIDATE_GFLAG #include "brpc/errno.pb.h" @@ -447,9 +448,9 @@ class Socket::EpollOutRequest : public SocketUser { static const uint64_t AUTH_FLAG = (1ul << 32); -Socket::Socket(Forbidden) +Socket::Socket(Forbidden f) // must be even because Address() relies on evenness of version - : _versioned_ref(0) + : VersionedRefWithId(f) , _shared_part(NULL) , _nevent(0) , _keytable_pool(NULL) @@ -459,7 +460,6 @@ Socket::Socket(Forbidden) , _on_edge_triggered_events(NULL) , _user(NULL) , _conn(NULL) - , _this_id(0) , _preferred_index(-1) , _hc_count(0) , _last_msg_size(0) @@ -483,7 +483,6 @@ Socket::Socket(Forbidden) , _overcrowded(false) , _fail_me_at_server_stop(false) , _logoff_flag(false) - , _additional_ref_status(REF_USING) , _error_code(0) , _pipeline_q(NULL) , _last_writetime_us(0) @@ -494,8 +493,7 @@ Socket::Socket(Forbidden) , _stream_set(NULL) , _total_streams_unconsumed_size(0) , _ninflight_app_health_check(0) - , _http_request_method(HTTP_METHOD_GET) -{ + , _http_request_method(HTTP_METHOD_GET) { CreateVarsOnce(); pthread_mutex_init(&_id_wait_list_mutex, NULL); _epollout_butex = bthread::butex_create_checked >(); @@ -621,7 +619,7 @@ int Socket::ResetFileDescriptor(int fd) { EnableKeepaliveIfNeeded(fd); if (_on_edge_triggered_events) { - if (GetGlobalEventDispatcher(fd, _bthread_tag).AddConsumer(id(), fd) != 0) { + if (_io_event.AddConsumer(fd) != 0) { PLOG(ERROR) << "Fail to add SocketId=" << id() << " into EventDispatcher"; _fd.store(-1, 
butil::memory_order_release); @@ -698,116 +696,260 @@ void Socket::EnableKeepaliveIfNeeded(int fd) { // version: from version part of _versioned_nref, must be an EVEN number. // slot: designated by ResourcePool. int Socket::Create(const SocketOptions& options, SocketId* id) { - butil::ResourceId slot; - Socket* const m = butil::get_resource(&slot, Forbidden()); - if (m == NULL) { - LOG(FATAL) << "Fail to get_resource"; + return VersionedRefWithId::Create(id, options); +} + +int Socket::OnCreated(const SocketOptions& options) { + if (_io_event.Init((void*)id()) != 0) { + LOG(ERROR) << "Fail to init IOEvent"; + SetFailed(ENOMEM, "%s", "Fail to init IOEvent"); return -1; } + _io_event.set_bthread_tag(options.bthread_tag); + auto guard = butil::MakeScopeGuard([this] { + _io_event.Reset(); + }); + g_vars->nsocket << 1; - CHECK(NULL == m->_shared_part.load(butil::memory_order_relaxed)); - m->_nevent.store(0, butil::memory_order_relaxed); - m->_keytable_pool = options.keytable_pool; - m->_tos = 0; - m->_remote_side = options.remote_side; - m->_on_edge_triggered_events = options.on_edge_triggered_events; - m->_user = options.user; - m->_conn = options.conn; - m->_app_connect = options.app_connect; - // nref can be non-zero due to concurrent AddressSocket(). - // _this_id will only be used in destructor/Destroy of referenced - // slots, which is safe and properly fenced. Although it's better - // to put the id into SocketUniquePtr. 
- m->_this_id = MakeSocketId( - VersionOfVRef(m->_versioned_ref.fetch_add( - 1, butil::memory_order_release)), slot); - m->_preferred_index = -1; - m->_hc_count = 0; - CHECK(m->_read_buf.empty()); + CHECK(NULL == _shared_part.load(butil::memory_order_relaxed)); + _nevent.store(0, butil::memory_order_relaxed); + _keytable_pool = options.keytable_pool; + _tos = 0; + _remote_side = options.remote_side; + _on_edge_triggered_events = options.on_edge_triggered_events; + _user = options.user; + _conn = options.conn; + _app_connect = options.app_connect; + _preferred_index = -1; + _hc_count = 0; + CHECK(_read_buf.empty()); const int64_t cpuwide_now = butil::cpuwide_time_us(); - m->_last_readtime_us.store(cpuwide_now, butil::memory_order_relaxed); - m->reset_parsing_context(options.initial_parsing_context); - m->_correlation_id = 0; - m->_health_check_interval_s = options.health_check_interval_s; - m->_is_hc_related_ref_held = false; - m->_hc_started.store(false, butil::memory_order_relaxed); - m->_ninprocess.store(1, butil::memory_order_relaxed); - m->_auth_flag_error.store(0, butil::memory_order_relaxed); - const int rc2 = bthread_id_create(&m->_auth_id, NULL, NULL); + _last_readtime_us.store(cpuwide_now, butil::memory_order_relaxed); + reset_parsing_context(options.initial_parsing_context); + _correlation_id = 0; + _health_check_interval_s = options.health_check_interval_s; + _is_hc_related_ref_held = false; + _hc_started.store(false, butil::memory_order_relaxed); + _ninprocess.store(1, butil::memory_order_relaxed); + _auth_flag_error.store(0, butil::memory_order_relaxed); + const int rc2 = bthread_id_create(&_auth_id, NULL, NULL); if (rc2) { LOG(ERROR) << "Fail to create auth_id: " << berror(rc2); - m->SetFailed(rc2, "Fail to create auth_id: %s", berror(rc2)); + SetFailed(rc2, "Fail to create auth_id: %s", berror(rc2)); return -1; } - m->_force_ssl = options.force_ssl; + _force_ssl = options.force_ssl; // Disable SSL check if there is no SSL context - m->_ssl_state = 
(options.initial_ssl_ctx == NULL ? SSL_OFF : SSL_UNKNOWN); - m->_ssl_session = NULL; - m->_ssl_ctx = options.initial_ssl_ctx; + _ssl_state = (options.initial_ssl_ctx == NULL ? SSL_OFF : SSL_UNKNOWN); + _ssl_session = NULL; + _ssl_ctx = options.initial_ssl_ctx; #if BRPC_WITH_RDMA - CHECK(m->_rdma_ep == NULL); + CHECK(_rdma_ep == NULL); if (options.use_rdma) { - m->_rdma_ep = new (std::nothrow)rdma::RdmaEndpoint(m); - if (!m->_rdma_ep) { + _rdma_ep = new (std::nothrow)rdma::RdmaEndpoint(this); + if (!_rdma_ep) { const int saved_errno = errno; PLOG(ERROR) << "Fail to create RdmaEndpoint"; - m->SetFailed(saved_errno, "Fail to create RdmaEndpoint: %s", + SetFailed(saved_errno, "Fail to create RdmaEndpoint: %s", berror(saved_errno)); return -1; } - m->_rdma_state = RDMA_UNKNOWN; + _rdma_state = RDMA_UNKNOWN; } else { - m->_rdma_state = RDMA_OFF; + _rdma_state = RDMA_OFF; } #endif - m->_connection_type_for_progressive_read = CONNECTION_TYPE_UNKNOWN; - m->_controller_released_socket.store(false, butil::memory_order_relaxed); - m->_overcrowded = false; - // May be non-zero for RTMP connections. - m->_fail_me_at_server_stop = false; - m->_logoff_flag.store(false, butil::memory_order_relaxed); - m->_additional_ref_status.store(REF_USING, butil::memory_order_relaxed); - m->_error_code = 0; - m->_error_text.clear(); - m->_agent_socket_id.store(INVALID_SOCKET_ID, butil::memory_order_relaxed); - m->_total_streams_unconsumed_size.store(0, butil::memory_order_relaxed); - m->_ninflight_app_health_check.store(0, butil::memory_order_relaxed); + _connection_type_for_progressive_read = CONNECTION_TYPE_UNKNOWN; + _controller_released_socket.store(false, butil::memory_order_relaxed); + _overcrowded = false; + // Maybe non-zero for RTMP connections. 
+ _fail_me_at_server_stop = false; + _logoff_flag.store(false, butil::memory_order_relaxed); + _error_code = 0; + _agent_socket_id.store(INVALID_SOCKET_ID, butil::memory_order_relaxed); + _total_streams_unconsumed_size.store(0, butil::memory_order_relaxed); + _ninflight_app_health_check.store(0, butil::memory_order_relaxed); // NOTE: last two params are useless in bthread > r32787 - const int rc = bthread_id_list_init(&m->_id_wait_list, 512, 512); + const int rc = bthread_id_list_init(&_id_wait_list, 512, 512); if (rc) { LOG(ERROR) << "Fail to init _id_wait_list: " << berror(rc); - m->SetFailed(rc, "Fail to init _id_wait_list: %s", berror(rc)); + SetFailed(rc, "Fail to init _id_wait_list: %s", berror(rc)); return -1; } - m->_last_writetime_us.store(cpuwide_now, butil::memory_order_relaxed); - m->_unwritten_bytes.store(0, butil::memory_order_relaxed); - m->_keepalive_options = options.keepalive_options; - m->_bthread_tag = options.bthread_tag; - CHECK(NULL == m->_write_head.load(butil::memory_order_relaxed)); - m->_is_write_shutdown = false; - // Must be last one! Internal fields of this Socket may be access + _last_writetime_us.store(cpuwide_now, butil::memory_order_relaxed); + _unwritten_bytes.store(0, butil::memory_order_relaxed); + _keepalive_options = options.keepalive_options; + CHECK(NULL == _write_head.load(butil::memory_order_relaxed)); + _is_write_shutdown = false; + // Must be the last one! Internal fields of this Socket may be accessed // just after calling ResetFileDescriptor. 
- if (m->ResetFileDescriptor(options.fd) != 0) { + if (ResetFileDescriptor(options.fd) != 0) { const int saved_errno = errno; PLOG(ERROR) << "Fail to ResetFileDescriptor"; - m->SetFailed(saved_errno, "Fail to ResetFileDescriptor: %s", + SetFailed(saved_errno, "Fail to ResetFileDescriptor: %s", berror(saved_errno)); return -1; } - *id = m->_this_id; + guard.dismiss(); + return 0; } +void Socket::BeforeRecycled() { + const bool create_by_connect = CreatedByConnect(); + if (_app_connect) { + std::shared_ptr tmp; + _app_connect.swap(tmp); + tmp->StopConnect(this); + } + if (_conn) { + SocketConnection* const saved_conn = _conn; + _conn = NULL; + saved_conn->BeforeRecycle(this); + } + if (_user) { + SocketUser* const saved_user = _user; + _user = NULL; + saved_user->BeforeRecycle(this); + } + SharedPart* sp = _shared_part.exchange(NULL, butil::memory_order_acquire); + if (sp) { + sp->RemoveRefManually(); + } + + const int prev_fd = _fd.exchange(-1, butil::memory_order_relaxed); + if (ValidFileDescriptor(prev_fd)) { + if (_on_edge_triggered_events != NULL) { + _io_event.RemoveConsumer(prev_fd); + } + close(prev_fd); + if (create_by_connect) { + g_vars->channel_conn << -1; + } + } + _io_event.Reset(); + +#if BRPC_WITH_RDMA + if (_rdma_ep) { + delete _rdma_ep; + _rdma_ep = NULL; + _rdma_state = RDMA_UNKNOWN; + } +#endif + + reset_parsing_context(NULL); + _read_buf.clear(); + + _auth_flag_error.store(0, butil::memory_order_relaxed); + bthread_id_error(_auth_id, 0); + + bthread_id_list_destroy(&_id_wait_list); + + if (_ssl_session) { + SSL_free(_ssl_session); + _ssl_session = NULL; + } + + _ssl_ctx = NULL; + + delete _pipeline_q; + _pipeline_q = NULL; + + delete _auth_context; + _auth_context = NULL; + + delete _stream_set; + _stream_set = NULL; + + const SocketId asid = _agent_socket_id.load(butil::memory_order_relaxed); + if (asid != INVALID_SOCKET_ID) { + SocketUniquePtr ptr; + if (Socket::Address(asid, &ptr) == 0) { + ptr->ReleaseAdditionalReference(); + } + } + 
g_vars->nsocket << -1; +} + +void Socket::OnFailed(int error_code, const std::string& error_text) { + // Update _error_text + pthread_mutex_lock(&_id_wait_list_mutex); + _error_code = error_code; + _error_text = error_text; + pthread_mutex_unlock(&_id_wait_list_mutex); + + // Do health-checking even if we're not connected before, needed + // by Channel to revive never-connected socket when server side + // comes online. + if (HCEnabled()) { + bool expect = false; + if (_hc_started.compare_exchange_strong(expect, + true, + butil::memory_order_relaxed, + butil::memory_order_relaxed)) { + GetOrNewSharedPart()->circuit_breaker.MarkAsBroken(); + StartHealthCheck(id(), + GetOrNewSharedPart()->circuit_breaker.isolation_duration_ms()); + } else { + // No need to run 2 health checking at the same time. + RPC_VLOG << "There is already a health checking running " + "for SocketId=" << id(); + } + } + // Wake up all threads waiting on EPOLLOUT when closing fd + _epollout_butex->fetch_add(1, butil::memory_order_relaxed); + bthread::butex_wake_all(_epollout_butex); + + // Wake up all unresponded RPC. + CHECK_EQ(0, bthread_id_list_reset2_pthreadsafe( + &_id_wait_list, error_code, error_text, + &_id_wait_list_mutex)); + ResetAllStreams(error_code, error_text); + // _app_connect shouldn't be set to NULL in SetFailed otherwise + // HC is always not supported. 
+ // FIXME: Design a better interface for AppConnect + // if (_app_connect) { + // AppConnect* const saved_app_connect = _app_connect; + // _app_connect = NULL; + // saved_app_connect->StopConnect(this); + // } +} + +void Socket::AfterRevived() { + if (_user) { + _user->AfterRevived(this); + } else { + LOG(INFO) << "Revived " << description() << " (Connectable)"; + } +} + +std::string Socket::OnDescription() const { + // NOTE: The output of `description()' should be consistent with operator<<() + std::string result; + result.reserve(64); + const int saved_fd = fd(); + if (saved_fd >= 0) { + butil::string_appendf(&result, "fd=%d", saved_fd); + } + butil::string_appendf(&result, " addr=%s", + butil::endpoint2str(remote_side()).c_str()); + const int local_port = local_side().port; + if (local_port > 0) { + butil::string_appendf(&result, ":%d", local_port); + } + return result; +} + int Socket::WaitAndReset(int32_t expected_nref) { - const uint32_t id_ver = VersionOfSocketId(_this_id); + const uint32_t id_ver = VersionOfVRefId(id()); uint64_t vref; // Wait until nref == expected_nref. - while (1) { + while (true) { // The acquire fence pairs with release fence in Dereference to avoid // inconsistent states to be seen by others. - vref = _versioned_ref.load(butil::memory_order_acquire); + vref = versioned_ref(); if (VersionOfVRef(vref) != id_ver + 1) { - LOG(WARNING) << "SocketId=" << _this_id << " is already alive or recycled"; + LOG(WARNING) << "SocketId=" << id() << " is already alive or recycled"; return -1; } if (NRefOfVRef(vref) > expected_nref) { @@ -816,7 +958,7 @@ int Socket::WaitAndReset(int32_t expected_nref) { return -1; } } else if (NRefOfVRef(vref) < expected_nref) { - RPC_VLOG << "SocketId=" << _this_id + RPC_VLOG << "SocketId=" << id() << " was abandoned during health checking"; return -1; } else { @@ -824,7 +966,7 @@ int Socket::WaitAndReset(int32_t expected_nref) { // so no need to do health checking. 
if (!_is_hc_related_ref_held) { RPC_VLOG << "Nobody holds a health-checking-related reference" - << " for SocketId=" << _this_id; + << " for SocketId=" << id(); return -1; } @@ -836,7 +978,7 @@ int Socket::WaitAndReset(int32_t expected_nref) { const int prev_fd = _fd.exchange(-1, butil::memory_order_relaxed); if (ValidFileDescriptor(prev_fd)) { if (_on_edge_triggered_events != NULL) { - GetGlobalEventDispatcher(prev_fd, _bthread_tag).RemoveConsumer(prev_fd); + _io_event.RemoveConsumer(prev_fd); } close(prev_fd); if (CreatedByConnect()) { @@ -892,59 +1034,6 @@ int Socket::WaitAndReset(int32_t expected_nref) { return 0; } -// We don't care about the return value of Revive. -void Socket::Revive() { - const uint32_t id_ver = VersionOfSocketId(_this_id); - uint64_t vref = _versioned_ref.load(butil::memory_order_relaxed); - _additional_ref_status.store(REF_REVIVING, butil::memory_order_relaxed); - while (1) { - CHECK_EQ(id_ver + 1, VersionOfVRef(vref)); - - int32_t nref = NRefOfVRef(vref); - if (nref <= 1) { - // Set status to REF_RECYLED since no one uses this socket - _additional_ref_status.store(REF_RECYCLED, butil::memory_order_relaxed); - CHECK_EQ(1, nref); - LOG(WARNING) << *this << " was abandoned during revival"; - return; - } - // +1 is the additional ref added in Create(). TODO(gejun): we should - // remove this additional nref someday. 
- if (_versioned_ref.compare_exchange_weak( - vref, MakeVRef(id_ver, nref + 1/*note*/), - butil::memory_order_release, - butil::memory_order_relaxed)) { - // Set status to REF_USING since we add additional ref again - _additional_ref_status.store(REF_USING, butil::memory_order_relaxed); - if (_user) { - _user->AfterRevived(this); - } else { - LOG(INFO) << "Revived " << *this << " (Connectable)"; - } - return; - } - } -} - -int Socket::ReleaseAdditionalReference() { - do { - AdditionalRefStatus expect = REF_USING; - if (_additional_ref_status.compare_exchange_strong( - expect, - REF_RECYCLED, - butil::memory_order_relaxed, - butil::memory_order_relaxed)) { - return Dereference(); - } - - if (expect == REF_REVIVING) { // sched_yield to wait until status is not REF_REVIVING - sched_yield(); - } else { - return -1; // REF_RECYCLED - } - } while (1); -} - void Socket::AddRecentError() { SharedPart* sp = GetSharedPart(); if (sp) { @@ -968,87 +1057,6 @@ int Socket::isolated_times() const { return 0; } -int Socket::SetFailed(int error_code, const char* error_fmt, ...) { - if (error_code == 0) { - CHECK(false) << "error_code is 0"; - error_code = EFAILEDSOCKET; - } - const uint32_t id_ver = VersionOfSocketId(_this_id); - uint64_t vref = _versioned_ref.load(butil::memory_order_relaxed); - for (;;) { // need iteration to retry compare_exchange_strong - if (VersionOfVRef(vref) != id_ver) { - return -1; - } - // Try to set version=id_ver+1 (to make later Address() return NULL), - // retry on fail. 
- if (_versioned_ref.compare_exchange_strong( - vref, MakeVRef(id_ver + 1, NRefOfVRef(vref)), - butil::memory_order_release, - butil::memory_order_relaxed)) { - // Update _error_text - std::string error_text; - if (error_fmt != NULL) { - va_list ap; - va_start(ap, error_fmt); - butil::string_vprintf(&error_text, error_fmt, ap); - va_end(ap); - } - pthread_mutex_lock(&_id_wait_list_mutex); - _error_code = error_code; - _error_text = error_text; - pthread_mutex_unlock(&_id_wait_list_mutex); - - // Do health-checking even if we're not connected before, needed - // by Channel to revive never-connected socket when server side - // comes online. - if (HCEnabled()) { - bool expect = false; - if (_hc_started.compare_exchange_strong(expect, - true, - butil::memory_order_relaxed, - butil::memory_order_relaxed)) { - GetOrNewSharedPart()->circuit_breaker.MarkAsBroken(); - StartHealthCheck(id(), - GetOrNewSharedPart()->circuit_breaker.isolation_duration_ms()); - } else { - // No need to run 2 health checking at the same time. - RPC_VLOG << "There is already a health checking running " - "for SocketId=" << _this_id; - } - } - // Wake up all threads waiting on EPOLLOUT when closing fd - _epollout_butex->fetch_add(1, butil::memory_order_relaxed); - bthread::butex_wake_all(_epollout_butex); - - // Wake up all unresponded RPC. - CHECK_EQ(0, bthread_id_list_reset2_pthreadsafe( - &_id_wait_list, error_code, error_text, - &_id_wait_list_mutex)); - - ResetAllStreams(error_code, error_text); - // _app_connect shouldn't be set to NULL in SetFailed otherwise - // HC is always not supported. - // FIXME: Design a better interface for AppConnect - // if (_app_connect) { - // AppConnect* const saved_app_connect = _app_connect; - // _app_connect = NULL; - // saved_app_connect->StopConnect(this); - // } - - // Deref additionally which is added at creation so that this - // Socket's reference will hit 0(recycle) when no one addresses it. 
- ReleaseAdditionalReference(); - // NOTE: This Socket may be recycled at this point, don't - // touch anything. - return 0; - } - } -} - -int Socket::SetFailed() { - return SetFailed(EFAILEDSOCKET, NULL); -} - void Socket::FeedbackCircuitBreaker(int error_code, int64_t latency_us) { if (!GetOrNewSharedPart()->circuit_breaker.OnCallEnd(error_code, latency_us)) { if (SetFailed(main_socket_id()) == 0) { @@ -1075,12 +1083,29 @@ int Socket::ReleaseReferenceIfIdle(int idle_seconds) { return ReleaseAdditionalReference(); } + +int Socket::SetFailed() { + return SetFailed(EFAILEDSOCKET, NULL); +} + +int Socket::SetFailed(int error_code, const char* error_fmt, ...) { + std::string error_text; + if (error_fmt != NULL) { + va_list ap; + va_start(ap, error_fmt); + butil::string_vprintf(&error_text, error_fmt, ap); + va_end(ap); + } + return VersionedRefWithId::SetFailed(error_code, error_text); +} + int Socket::SetFailed(SocketId id) { SocketUniquePtr ptr; if (Address(id, &ptr) != 0) { return -1; } - return ptr->SetFailed(); + + return ptr->SetFailed(EFAILEDSOCKET, NULL); } void Socket::NotifyOnFailed(bthread_id_t id) { @@ -1101,16 +1126,16 @@ void Socket::NotifyOnFailed(bthread_id_t id) { // For unit-test. 
int Socket::Status(SocketId id, int32_t* nref) { - const butil::ResourceId slot = SlotOfSocketId(id); + const butil::ResourceId slot = SlotOfVRefId(id); Socket* const m = address_resource(slot); if (m != NULL) { - const uint64_t vref = m->_versioned_ref.load(butil::memory_order_relaxed); - if (VersionOfVRef(vref) == VersionOfSocketId(id)) { + const uint64_t vref = m->versioned_ref(); + if (VersionOfVRef(vref) == VersionOfVRefId(id)) { if (nref) { *nref = NRefOfVRef(vref); } return 0; - } else if (VersionOfVRef(vref) == VersionOfSocketId(id) + 1) { + } else if (VersionOfVRef(vref) == VersionOfVRefId(id) + 1) { if (nref) { *nref = NRefOfVRef(vref); } @@ -1120,81 +1145,6 @@ int Socket::Status(SocketId id, int32_t* nref) { return -1; } -void Socket::OnRecycle() { - const bool create_by_connect = CreatedByConnect(); - if (_app_connect) { - std::shared_ptr tmp; - _app_connect.swap(tmp); - tmp->StopConnect(this); - } - if (_conn) { - SocketConnection* const saved_conn = _conn; - _conn = NULL; - saved_conn->BeforeRecycle(this); - } - if (_user) { - SocketUser* const saved_user = _user; - _user = NULL; - saved_user->BeforeRecycle(this); - } - SharedPart* sp = _shared_part.exchange(NULL, butil::memory_order_acquire); - if (sp) { - sp->RemoveRefManually(); - } - const int prev_fd = _fd.exchange(-1, butil::memory_order_relaxed); - if (ValidFileDescriptor(prev_fd)) { - if (_on_edge_triggered_events != NULL) { - GetGlobalEventDispatcher(prev_fd, _bthread_tag).RemoveConsumer(prev_fd); - } - close(prev_fd); - if (create_by_connect) { - g_vars->channel_conn << -1; - } - } - -#if BRPC_WITH_RDMA - if (_rdma_ep) { - delete _rdma_ep; - _rdma_ep = NULL; - _rdma_state = RDMA_UNKNOWN; - } -#endif - - reset_parsing_context(NULL); - _read_buf.clear(); - - _auth_flag_error.store(0, butil::memory_order_relaxed); - bthread_id_error(_auth_id, 0); - - bthread_id_list_destroy(&_id_wait_list); - - if (_ssl_session) { - SSL_free(_ssl_session); - _ssl_session = NULL; - } - - _ssl_ctx = NULL; - - 
delete _pipeline_q; - _pipeline_q = NULL; - - delete _auth_context; - _auth_context = NULL; - - delete _stream_set; - _stream_set = NULL; - - const SocketId asid = _agent_socket_id.load(butil::memory_order_relaxed); - if (asid != INVALID_SOCKET_ID) { - SocketUniquePtr ptr; - if (Socket::Address(asid, &ptr) == 0) { - ptr->ReleaseAdditionalReference(); - } - } - - g_vars->nsocket << -1; -} - void* Socket::ProcessEvent(void* arg) { // the enclosed Socket is valid and free to access inside this function. SocketUniquePtr s(static_cast(arg)); @@ -1272,8 +1222,7 @@ int Socket::WaitEpollOut(int fd, bool pollin, const timespec* abstime) { // Do not need to check addressable since it will be called by // health checker which called `SetFailed' before const int expected_val = _epollout_butex->load(butil::memory_order_relaxed); - EventDispatcher& edisp = GetGlobalEventDispatcher(fd, _bthread_tag); - if (edisp.RegisterEvent(id(), fd, pollin) != 0) { + if (_io_event.RegisterEvent(fd, pollin) != 0) { return -1; } @@ -1285,7 +1234,7 @@ int Socket::WaitEpollOut(int fd, bool pollin, const timespec* abstime) { } // Ignore return value since `fd' might have been removed // by `RemoveConsumer' in `SetFailed' - butil::ignore_result(edisp.UnregisterEvent(id(), fd, pollin)); + butil::ignore_result(_io_event.UnregisterEvent(fd, pollin)); errno = saved_errno; // Could be writable or spurious wakeup (by former epollout) return rc; @@ -1333,7 +1282,7 @@ int Socket::Connect(const timespec* abstime, // be added into epoll device soon SocketId connect_id; SocketOptions options; - options.bthread_tag = _bthread_tag; + options.bthread_tag = _io_event.bthread_tag(); options.user = req; if (Socket::Create(options, &connect_id) != 0) { LOG(FATAL) << "Fail to create Socket"; @@ -1348,8 +1297,7 @@ int Socket::Connect(const timespec* abstime, // Add `sockfd' into epoll so that `HandleEpollOutRequest' will // be called with `req' when epoll event reaches - if (GetGlobalEventDispatcher(sockfd, 
_bthread_tag).RegisterEvent(connect_id, sockfd, false) != - 0) { + if (s->_io_event.RegisterEvent(sockfd, false) != 0) { const int saved_errno = errno; PLOG(WARNING) << "Fail to add fd=" << sockfd << " into epoll"; s->SetFailed(saved_errno, "Fail to add fd=%d into epoll: %s", @@ -1417,7 +1365,7 @@ int Socket::ConnectIfNot(const timespec* abstime, WriteRequest* req) { return 0; } // Set tag for client side socket - _bthread_tag = bthread_self_tag(); + _io_event.set_bthread_tag(bthread_self_tag()); // Have to hold a reference for `req' SocketUniquePtr s; ReAddress(&s); @@ -1440,7 +1388,9 @@ void Socket::WakeAsEpollOut() { bthread::butex_wake_except(_epollout_butex, 0); } -int Socket::HandleEpollOut(SocketId id) { +int Socket::OnOutputEvent(void* user_data, uint32_t, + const bthread_attr_t&) { + auto id = reinterpret_cast(user_data); SocketUniquePtr s; // Since Sockets might have been `SetFailed' before they were // added into epoll, these sockets miss the signal inside @@ -1486,7 +1436,7 @@ int Socket::HandleEpollOutRequest(int error_code, EpollOutRequest* req) { } // We've got the right to call user callback // The timer will be removed inside destructor of EpollOutRequest - GetGlobalEventDispatcher(req->fd, _bthread_tag).UnregisterEvent(id(), req->fd, false); + butil::ignore_result(_io_event.UnregisterEvent(req->fd, false)); return req->on_epollout_event(req->fd, error_code, req->data); } @@ -2229,8 +2179,9 @@ AuthContext* Socket::mutable_auth_context() { return _auth_context; } -int Socket::StartInputEvent(SocketId id, uint32_t events, - const bthread_attr_t& thread_attr) { +int Socket::OnInputEvent(void* user_data, uint32_t events, + const bthread_attr_t& thread_attr) { + auto id = reinterpret_cast(user_data); SocketUniquePtr s; if (Address(id, &s) < 0) { return -1; @@ -2324,7 +2275,7 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) { // } os << "# This is a broken Socket\n"; } - const uint64_t vref = 
ptr->_versioned_ref.load(butil::memory_order_relaxed); + const uint64_t vref = ptr->versioned_ref(); size_t npipelined = 0; size_t idsizes[4]; size_t nidsize = 0; @@ -2384,7 +2335,7 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) { << "\nlocal_side=" << ptr->_local_side << "\non_et_events=" << (void*)ptr->_on_edge_triggered_events << "\nuser=" << ShowObject(ptr->_user) - << "\nthis_id=" << ptr->_this_id + << "\nthis_id=" << ptr->id() << "\npreferred_index=" << preferred_index; InputMessenger* messenger = dynamic_cast(ptr->user()); if (messenger != NULL) { @@ -2429,7 +2380,7 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) { << "\nauth_context=" << ptr->_auth_context << "\nlogoff_flag=" << ptr->_logoff_flag.load(butil::memory_order_relaxed) << "\n_additional_ref_status=" - << ptr->_additional_ref_status.load(butil::memory_order_relaxed) + << ptr->additional_ref_status() << "\ntotal_streams_buffer_size=" << ptr->_total_streams_unconsumed_size.load(butil::memory_order_relaxed) << "\nninflight_app_health_check=" @@ -2570,7 +2521,7 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) { ptr->_rdma_ep->DebugInfo(os); } #endif - { os << "\nbthread_tag=" << ptr->_bthread_tag; } + { os << "\nbthread_tag=" << ptr->_io_event.bthread_tag(); } } int Socket::CheckHealth() { @@ -2620,7 +2571,7 @@ void Socket::ResetAllStreams(int error_code, const std::string& error_text) { if (_stream_set != NULL) { // Not delete _stream_set because there are likely more streams added // after reviving if the Socket is still in use, or it is to be deleted in - // OnRecycle() + // BeforeRecycled() saved_stream_set.swap(*_stream_set); } _stream_mutex.unlock(); @@ -3000,25 +2951,6 @@ void Socket::OnProgressiveReadCompleted() { } } -std::string Socket::description() const { - // NOTE: The output should be consistent with operator<<() - std::string result; - result.reserve(64); - butil::string_appendf(&result, "Socket{id=%" PRIu64, id()); - const int saved_fd = fd(); - if 
(saved_fd >= 0) { - butil::string_appendf(&result, " fd=%d", saved_fd); - } - butil::string_appendf(&result, " addr=%s", - butil::endpoint2str(remote_side()).c_str()); - const int local_port = local_side().port; - if (local_port > 0) { - butil::string_appendf(&result, ":%d", local_port); - } - butil::string_appendf(&result, "} (0x%p)", this); - return result; -} - SocketSSLContext::SocketSSLContext() : raw_ctx(NULL) {} diff --git a/src/brpc/socket.h b/src/brpc/socket.h index 97ce568522..6d9c8f113c 100644 --- a/src/brpc/socket.h +++ b/src/brpc/socket.h @@ -38,7 +38,9 @@ #include "brpc/socket_id.h" // SocketId #include "brpc/socket_message.h" // SocketMessagePtr #include "bvar/bvar.h" -#include "http_method.h" +#include "brpc/http_method.h" +#include "brpc/event_dispatcher.h" +#include "brpc/versioned_ref_with_id.h" namespace brpc { namespace policy { @@ -282,7 +284,7 @@ struct SocketOptions { // Abstractions on reading from and writing into file descriptors. // NOTE: accessed by multiple threads(frequently), align it by cacheline. -class BAIDU_CACHELINE_ALIGNMENT/*note*/ Socket { +class BAIDU_CACHELINE_ALIGNMENT/*note*/ Socket : public VersionedRefWithId { friend class EventDispatcher; friend class InputMessenger; friend class Acceptor; @@ -299,16 +301,18 @@ friend class HealthCheckTask; friend class OnAppHealthCheckDone; friend class HealthCheckManager; friend class policy::H2GlobalStreamCreator; +friend class VersionedRefWithId; +friend class IOEvent; +friend void DereferenceSocket(Socket*); class SharedPart; - struct Forbidden {}; struct WriteRequest; public: const static int STREAM_FAKE_FD = INT_MAX; // NOTE: User cannot create Socket from constructor. Use Create() // instead. It's public just because of requirement of ResourcePool. - Socket(Forbidden); - ~Socket(); + explicit Socket(Forbidden); + ~Socket() override; // Write `msg' into this Socket and clear it. The `msg' should be an // intact request or response. 
To prevent messages from interleaving @@ -419,9 +423,6 @@ friend class policy::H2GlobalStreamCreator; // After health checking is complete, set _hc_started to false. void AfterHCCompleted() { _hc_started.store(false, butil::memory_order_relaxed); } - // The unique identifier. - SocketId id() const { return _this_id; } - // `user' parameter passed to Create(). SocketUser* user() const { return _user; } @@ -446,24 +447,9 @@ friend class policy::H2GlobalStreamCreator; AuthContext* mutable_auth_context(); // Create a Socket according to `options', put the identifier into `id'. - // Returns 0 on sucess, -1 otherwise. + // Returns 0 on success, -1 otherwise. static int Create(const SocketOptions& options, SocketId* id); - // Place the Socket associated with identifier `id' into unique_ptr `ptr', - // which will be released automatically when out of scope (w/o explicit - // std::move). User can still access `ptr' after calling ptr->SetFailed() - // before release of `ptr'. - // This function is wait-free. - // Returns 0 on success, -1 when the Socket was SetFailed(). - static int Address(SocketId id, SocketUniquePtr* ptr); - - // Re-address current socket into `ptr'. - // Always succeed even if this socket is failed. - void ReAddress(SocketUniquePtr* ptr); - - // Returns 0 on success, 1 on failed socket, -1 on recycled. - static int AddressFailedAsWell(SocketId id, SocketUniquePtr* ptr); - // Mark this Socket or the Socket associated with `id' as failed. // Any later Address() of the identifier shall return NULL unless the // Socket was revivied by StartHealthCheck. 
The Socket is NOT recycled @@ -486,20 +472,10 @@ friend class policy::H2GlobalStreamCreator; void FeedbackCircuitBreaker(int error_code, int64_t latency_us); - bool Failed() const; - - bool DidReleaseAdditionalRereference() const { - return _additional_ref_status.load(butil::memory_order_relaxed) == REF_RECYCLED; - } - // Notify `id' object (by calling bthread_id_error) when this Socket // has been `SetFailed'. If it already has, notify `id' immediately void NotifyOnFailed(bthread_id_t id); - // Release the additional reference which added inside `Create' - // before so that `Socket' will be recycled automatically once - // on one is addressing it. - int ReleaseAdditionalReference(); // `ReleaseAdditionalReference' this Socket iff it has no data // transmission during the last `idle_seconds' int ReleaseReferenceIfIdle(int idle_seconds); @@ -515,8 +491,8 @@ friend class policy::H2GlobalStreamCreator; // Start to process edge-triggered events from the fd. // This function does not block caller. - static int StartInputEvent(SocketId id, uint32_t events, - const bthread_attr_t& thread_attr); + static int OnInputEvent(void* user_data, uint32_t events, + const bthread_attr_t& thread_attr); static const int PROGRESS_INIT = 1; bool MoreReadEvents(int* progress); @@ -654,9 +630,6 @@ friend class policy::H2GlobalStreamCreator; _last_writetime_us.load(butil::memory_order_relaxed)); } - // A brief description of this socket, consistent with os << *this - std::string description() const; - // Returns true if the remote side is overcrowded. bool is_overcrowded() const { return _overcrowded; } @@ -678,8 +651,20 @@ friend class policy::H2GlobalStreamCreator; int ConductError(bthread_id_t); int StartWrite(WriteRequest*, const WriteOptions&); - int Dereference(); -friend void DereferenceSocket(Socket*); + // Create a Socket according to `options', put the identifier into `id'. + // Returns 0 on success, -1 otherwise. 
+ int OnCreated(const SocketOptions& options); + + // Called before returning to pool. + void BeforeRecycled(); + + void OnFailed(int error_code, const std::string& error_text); + + // Make this socket addressable again. + void AfterRevived(); + + std::string OnDescription() const; + static int Status(SocketId, int32_t* nref = NULL); // for unit-test. @@ -701,9 +686,6 @@ friend void DereferenceSocket(Socket*); // success, -1 otherwise and errno is set ssize_t DoWrite(WriteRequest* req); - // Called before returning to pool. - void OnRecycle(); - // [Not thread-safe] Wait for EPOLLOUT event on `fd'. If `pollin' is // true, EPOLLIN event will also be included and EPOLL_CTL_MOD will // be used instead of EPOLL_CTL_ADD. Note that spurious wakeups may @@ -735,9 +717,6 @@ friend void DereferenceSocket(Socket*); // Wait until nref hits `expected_nref' and reset some internal resources. int WaitAndReset(int32_t expected_nref); - // Make this socket addressable again. - void Revive(); - static void* ProcessEvent(void*); static void* KeepWrite(void*); @@ -755,8 +734,9 @@ friend void DereferenceSocket(Socket*); // Try to wake socket just like epollout has arrived void WakeAsEpollOut(); - // Generic callback for Socket to handle epollout event - static int HandleEpollOut(SocketId socket_id); + // Generic callback for Socket to handle output event. + static int OnOutputEvent(void* user_data, uint32_t, + const bthread_attr_t&); class EpollOutRequest; // Callback to handle epollout event whose request data @@ -811,16 +791,6 @@ friend void DereferenceSocket(Socket*); void CancelUnwrittenBytes(size_t bytes); private: - // unsigned 32-bit version + signed 32-bit referenced-count. - // Meaning of version: - // * Created version: no SetFailed() is called on the Socket yet. Must be - // same evenness with initial _versioned_ref because during lifetime of - // a Socket on the slot, the version is added with 1 twice. This is - // also the version encoded in SocketId. 
- // * Failed version: = created version + 1, SetFailed()-ed but returned. - // * Other versions: the socket is already recycled. - butil::atomic _versioned_ref; - // In/Out bytes/messages, SocketPool etc // _shared_part is shared by a main socket and all its pooled sockets. // Can't use intrusive_ptr because the creation is based on optimistic @@ -838,7 +808,6 @@ friend void DereferenceSocket(Socket*); // [ Set in ResetFileDescriptor ] butil::atomic _fd; // -1 when not connected. - bthread_tag_t _bthread_tag; // bthread tag of this socket int _tos; // Type of service which is actually only 8bits. int64_t _reset_fd_real_us; // When _fd was reset, in microseconds. @@ -864,8 +833,7 @@ friend void DereferenceSocket(Socket*); // Initialized by SocketOptions.app_connect. std::shared_ptr _app_connect; - // Identifier of this Socket in ResourcePool - SocketId _this_id; + IOEvent _io_event; // last chosen index of the protocol as a heuristic value to avoid // iterating all protocol handlers each time. @@ -953,21 +921,6 @@ friend void DereferenceSocket(Socket*); // Set by SetLogOff butil::atomic _logoff_flag; - // Status flag used to mark that - enum AdditionalRefStatus { - REF_USING, // additional reference has been increased - REF_REVIVING, // additional reference is increasing - REF_RECYCLED // additional reference has been decreased - }; - - // Indicates whether additional reference has increased, - // decreased, or is increasing. 
- // additional ref status: - // `Socket'、`Create': REF_USING - // `SetFailed': REF_USING -> REF_RECYCLED - // `Revive' REF_RECYCLED -> REF_REVIVING -> REF_USING - butil::atomic _additional_ref_status; - // Concrete error information from SetFailed() // Accesses to these 2 fields(especially _error_text) must be protected // by _id_wait_list_mutex diff --git a/src/brpc/socket_id.h b/src/brpc/socket_id.h index 720432473f..e000c98b69 100644 --- a/src/brpc/socket_id.h +++ b/src/brpc/socket_id.h @@ -22,9 +22,7 @@ // To brpc developers: This is a header included by user, don't depend // on internal structures, use opaque pointers instead. -#include // uint64_t -#include "butil/unique_ptr.h" // std::unique_ptr - +#include "brpc/versioned_ref_with_id.h" namespace brpc { @@ -32,21 +30,25 @@ namespace brpc { // Users shall store SocketId instead of Sockets and call Socket::Address() // to convert the identifier to an unique_ptr at each access. Whenever a // unique_ptr is not destructed, the enclosed Socket will not be recycled. -typedef uint64_t SocketId; +typedef VRefId SocketId; -const SocketId INVALID_SOCKET_ID = (SocketId)-1; +const SocketId INVALID_SOCKET_ID = INVALID_VREF_ID; class Socket; extern void DereferenceSocket(Socket*); -struct SocketDeleter { +// Explicit (full) template specialization to ignore compiler error, +// because Socket is an incomplete type where only this header is included. +template<> +struct VersionedRefWithIdDeleter { void operator()(Socket* m) const { DereferenceSocket(m); } }; -typedef std::unique_ptr SocketUniquePtr; +typedef VersionedRefWithIdUniquePtr SocketUniquePtr; + } // namespace brpc diff --git a/src/brpc/socket_inl.h b/src/brpc/socket_inl.h index 6af9e8f19f..a8ff3ce8ff 100644 --- a/src/brpc/socket_inl.h +++ b/src/brpc/socket_inl.h @@ -23,35 +23,6 @@ namespace brpc { -// Utility functions to combine and extract SocketId. 
-BUTIL_FORCE_INLINE SocketId -MakeSocketId(uint32_t version, butil::ResourceId slot) { - return SocketId((((uint64_t)version) << 32) | slot.value); -} - -BUTIL_FORCE_INLINE butil::ResourceId SlotOfSocketId(SocketId sid) { - butil::ResourceId id = { (sid & 0xFFFFFFFFul) }; - return id; -} - -BUTIL_FORCE_INLINE uint32_t VersionOfSocketId(SocketId sid) { - return (uint32_t)(sid >> 32); -} - -// Utility functions to combine and extract Socket::_versioned_ref -BUTIL_FORCE_INLINE uint32_t VersionOfVRef(uint64_t vref) { - return (uint32_t)(vref >> 32); -} - -BUTIL_FORCE_INLINE int32_t NRefOfVRef(uint64_t vref) { - return (int32_t)(vref & 0xFFFFFFFFul); -} - -BUTIL_FORCE_INLINE uint64_t MakeVRef(uint32_t version, int32_t nref) { - // 1: Intended conversion to uint32_t, nref=-1 is 00000000FFFFFFFF - return (((uint64_t)version) << 32) | (uint32_t/*1*/)nref; -} - inline SocketOptions::SocketOptions() : fd(-1) , user(NULL) @@ -63,169 +34,7 @@ inline SocketOptions::SocketOptions() , conn(NULL) , app_connect(NULL) , initial_parsing_context(NULL) - , bthread_tag(BTHREAD_TAG_DEFAULT) -{} - -inline int Socket::Dereference() { - const SocketId id = _this_id; - const uint64_t vref = _versioned_ref.fetch_sub( - 1, butil::memory_order_release); - const int32_t nref = NRefOfVRef(vref); - if (nref > 1) { - return 0; - } - if (__builtin_expect(nref == 1, 1)) { - const uint32_t ver = VersionOfVRef(vref); - const uint32_t id_ver = VersionOfSocketId(id); - // Besides first successful SetFailed() adds 1 to version, one of - // those dereferencing nref from 1->0 adds another 1 to version. - // Notice "one of those": The wait-free Address() may make ref of a - // version-unmatched slot change from 1 to 0 for mutiple times, we - // have to use version as a guard variable to prevent returning the - // Socket to pool more than once. 
- // - // Note: `ver == id_ver' means this socket has been `SetRecycle' - // before rather than `SetFailed'; `ver == ide_ver+1' means we - // had `SetFailed' this socket before. We should destroy the - // socket under both situation - if (__builtin_expect(ver == id_ver || ver == id_ver + 1, 1)) { - // sees nref:1->0, try to set version=id_ver+2,--nref. - // No retry: if version changes, the slot is already returned by - // another one who sees nref:1->0 concurrently; if nref changes, - // which must be non-zero, the slot will be returned when - // nref changes from 1->0 again. - // Example: - // SetFailed(): --nref, sees nref:1->0 (1) - // try to set version=id_ver+2 (2) - // Address(): ++nref, unmatched version (3) - // --nref, sees nref:1->0 (4) - // try to set version=id_ver+2 (5) - // 1,2,3,4,5 or 1,3,4,2,5: - // SetFailed() succeeds, Address() fails at (5). - // 1,3,2,4,5: SetFailed() fails with (2), the slot will be - // returned by (5) of Address() - // 1,3,4,5,2: SetFailed() fails with (2), the slot is already - // returned by (5) of Address(). - uint64_t expected_vref = vref - 1; - if (_versioned_ref.compare_exchange_strong( - expected_vref, MakeVRef(id_ver + 2, 0), - butil::memory_order_acquire, - butil::memory_order_relaxed)) { - OnRecycle(); - return_resource(SlotOfSocketId(id)); - return 1; - } - return 0; - } - LOG(FATAL) << "Invalid SocketId=" << id; - return -1; - } - LOG(FATAL) << "Over dereferenced SocketId=" << id; - return -1; -} - -inline int Socket::Address(SocketId id, SocketUniquePtr* ptr) { - const butil::ResourceId slot = SlotOfSocketId(id); - Socket* const m = address_resource(slot); - if (__builtin_expect(m != NULL, 1)) { - // acquire fence makes sure this thread sees latest changes before - // Dereference() or Revive(). 
- const uint64_t vref1 = m->_versioned_ref.fetch_add( - 1, butil::memory_order_acquire); - const uint32_t ver1 = VersionOfVRef(vref1); - if (ver1 == VersionOfSocketId(id)) { - ptr->reset(m); - return 0; - } - - const uint64_t vref2 = m->_versioned_ref.fetch_sub( - 1, butil::memory_order_release); - const int32_t nref = NRefOfVRef(vref2); - if (nref > 1) { - return -1; - } else if (__builtin_expect(nref == 1, 1)) { - const uint32_t ver2 = VersionOfVRef(vref2); - if ((ver2 & 1)) { - if (ver1 == ver2 || ver1 + 1 == ver2) { - uint64_t expected_vref = vref2 - 1; - if (m->_versioned_ref.compare_exchange_strong( - expected_vref, MakeVRef(ver2 + 1, 0), - butil::memory_order_acquire, - butil::memory_order_relaxed)) { - m->OnRecycle(); - return_resource(SlotOfSocketId(id)); - } - } else { - CHECK(false) << "ref-version=" << ver1 - << " unref-version=" << ver2; - } - } else { - CHECK_EQ(ver1, ver2); - // Addressed a free slot. - } - } else { - CHECK(false) << "Over dereferenced SocketId=" << id; - } - } - return -1; -} - -inline void Socket::ReAddress(SocketUniquePtr* ptr) { - _versioned_ref.fetch_add(1, butil::memory_order_acquire); - ptr->reset(this); -} - -inline int Socket::AddressFailedAsWell(SocketId id, SocketUniquePtr* ptr) { - const butil::ResourceId slot = SlotOfSocketId(id); - Socket* const m = address_resource(slot); - if (__builtin_expect(m != NULL, 1)) { - const uint64_t vref1 = m->_versioned_ref.fetch_add( - 1, butil::memory_order_acquire); - const uint32_t ver1 = VersionOfVRef(vref1); - if (ver1 == VersionOfSocketId(id)) { - ptr->reset(m); - return 0; - } - if (ver1 == VersionOfSocketId(id) + 1) { - ptr->reset(m); - return 1; - } - - const uint64_t vref2 = m->_versioned_ref.fetch_sub( - 1, butil::memory_order_release); - const int32_t nref = NRefOfVRef(vref2); - if (nref > 1) { - return -1; - } else if (__builtin_expect(nref == 1, 1)) { - const uint32_t ver2 = VersionOfVRef(vref2); - if ((ver2 & 1)) { - if (ver1 == ver2 || ver1 + 1 == ver2) { - uint64_t 
expected_vref = vref2 - 1; - if (m->_versioned_ref.compare_exchange_strong( - expected_vref, MakeVRef(ver2 + 1, 0), - butil::memory_order_acquire, - butil::memory_order_relaxed)) { - m->OnRecycle(); - return_resource(slot); - } - } else { - CHECK(false) << "ref-version=" << ver1 - << " unref-version=" << ver2; - } - } else { - // Addressed a free slot. - } - } else { - CHECK(false) << "Over dereferenced SocketId=" << id; - } - } - return -1; -} - -inline bool Socket::Failed() const { - return VersionOfVRef(_versioned_ref.load(butil::memory_order_relaxed)) - != VersionOfSocketId(_this_id); -} + , bthread_tag(BTHREAD_TAG_DEFAULT) {} inline bool Socket::MoreReadEvents(int* progress) { // Fail to CAS means that new events arrived. diff --git a/src/brpc/versioned_ref_with_id.h b/src/brpc/versioned_ref_with_id.h new file mode 100644 index 0000000000..38141f3dbc --- /dev/null +++ b/src/brpc/versioned_ref_with_id.h @@ -0,0 +1,624 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ + +#ifndef BRPC_VERSIONED_REF_WITH_ID_H +#define BRPC_VERSIONED_REF_WITH_ID_H + +#include +#include "butil/resource_pool.h" +#include "butil/class_name.h" +#include "butil/logging.h" +#include "bthread/bthread.h" +#include "brpc/errno.pb.h" + +namespace brpc { + +// Unique identifier of a T object. +typedef uint64_t VRefId; + +const VRefId INVALID_VREF_ID = (VRefId)-1; + +template +class VersionedRefWithId; + +template +void DereferenceVersionedRefWithId(T* r); + +template +struct VersionedRefWithIdDeleter { + void operator()(T* r) const { + DereferenceVersionedRefWithId(r); + } +}; + +template +using VersionedRefWithIdUniquePtr = + std::unique_ptr>; + +// Utility functions to combine and extract VRefId. +template +BUTIL_FORCE_INLINE VRefId MakeVRefId(uint32_t version, + butil::ResourceId slot) { + return VRefId((((uint64_t)version) << 32) | slot.value); +} + +template +BUTIL_FORCE_INLINE butil::ResourceId SlotOfVRefId(VRefId vref_id) { + return { (vref_id & 0xFFFFFFFFul) }; +} + +BUTIL_FORCE_INLINE uint32_t VersionOfVRefId(VRefId vref_id) { + return (uint32_t)(vref_id >> 32); +} + +// Utility functions to combine and extract _versioned_ref +BUTIL_FORCE_INLINE uint32_t VersionOfVRef(uint64_t vref) { + return (uint32_t)(vref >> 32); +} + +BUTIL_FORCE_INLINE int32_t NRefOfVRef(uint64_t vref) { + return (int32_t)(vref & 0xFFFFFFFFul); +} + +BUTIL_FORCE_INLINE uint64_t MakeVRef(uint32_t version, int32_t nref) { + // 1: Intended conversion to uint32_t, nref=-1 is 00000000FFFFFFFF + return (((uint64_t)version) << 32) | (uint32_t/*1*/)nref; +} + + +template +typename std::enable_if::value, Ret>::type ReturnEmpty() { + return Ret{}; +} + +template +typename std::enable_if::value, Ret>::type ReturnEmpty() {} + +// Call func_name of class_type if class_type implements func_name, +// otherwise call default function. 
+#define WRAPPER_OF(class_type, func_name, return_type) \ + struct func_name ## Wrapper { \ + template \ + static auto Test(int) -> decltype( \ + std::declval().func_name(std::declval()...), std::true_type()); \ + template \ + static auto Test(...) -> std::false_type; \ + \ + template \ + typename std::enable_if(0))::value, return_type>::type \ + Call(class_type* obj, Args... args) { \ + BAIDU_CASSERT((butil::is_result_same< \ + return_type, decltype(&T::func_name), T, Args...>::value), \ + "Params or return type mismatch"); \ + return obj->func_name(std::forward(args)...); \ + } \ + \ + template \ + typename std::enable_if(0))::value, return_type>::type \ + Call(class_type* obj, Args... args) { \ + return ReturnEmpty(); \ + } \ + } + +#define WRAPPER_CALL(func_name, obj, ...) func_name ## Wrapper().Call(obj, ## __VA_ARGS__) + +// VersionedRefWithId is an efficient data structure, which can be find +// in O(1)-time by VRefId. +// Users shall call VersionedRefWithId::Create() to create T, +// store VRefId instead of T and call VersionedRefWithId::Address() +// to convert the identifier to an unique_ptr at each access. Whenever +// a unique_ptr is not destructed, the enclosed T will not be recycled. +// +// CRTP +// Derived classes implement 6 functions : +// 1. (required) int OnCreated(Args... args) : +// Will be called in Create() to initialize T init when T is created successfully. +// If initialization fails, return non-zero. VersionedRefWithId will be `SetFailed' +// and Create() returns non-zero. +// 2. (required) void BeforeRecycled() : +// Will be called in Dereference() before T is recycled. +// 3. (optional) void OnFailed(Args... args) : +// Will be called in SetFailed() when VersionedRefWithId is set failed successfully. +// 4. (optional) void BeforeAdditionalRefReleased() : +// Will be called in ReleaseAdditionalReference() before additional ref is released. +// 5. 
(optional) void AfterRevived() : +// Will be called in Revive() when VersionedRefWithId is revived. +// 6. (optional) std::string OnDescription() const : +// Will be called in description(). +// +// Example usage: +// +// class UserData : public brpc::VersionedRefWithId { +// public: +// explicit UserData(Forbidden f) +// : brpc::VersionedRefWithId(f) +// , _count(0) {} + +// void Add(int c) { +// _count.fetch_add(c, butil::memory_order_relaxed); +// } +// void Sub(int c) { +// _count.fetch_sub(c, butil::memory_order_relaxed); +// } +// private: +// friend class brpc::VersionedRefWithId; +// +// int OnCreated() { +// _count.store(1, butil::memory_order_relaxed); +// return 0; +// } +// void OnFailed(int error_code, const std::string& error_text) { +// _count.fetch_sub(1, butil::memory_order_relaxed); +// } +// void BeforeRecycled() { +// _count.store(0, butil::memory_order_relaxed); +// } +// +// butil::atomic _count; +// }; +// +// typedef brpc::VRefId UserDataId; +// const brpc::VRefId INVALID_EVENT_DATA_ID = brpc::INVALID_VREF_ID; +// typedef brpc::VersionedRefWithIdUniquePtr UserDataUniquePtr; +// +// And to call methods on UserData: +// UserDataId id; +// if (UserData::Create(&id) != 0) { +// LOG(ERROR) << "Fail to create UserData"; +// return; +// } +// UserDataUniquePtr user_data; +// if (UserData::Address(id, &user_data) != 0) { +// LOG(ERROR) << "Fail to address UserDataId=" << id; +// return; +// } +// user_data->Add(10); +// user_data->SetFailed(); +// UserData::SetFailedById(id); +// +template +class VersionedRefWithId { +protected: + struct Forbidden {}; + +public: + explicit VersionedRefWithId(Forbidden) + // Must be even because Address() relies on evenness of version. + : _versioned_ref(0) + , _this_id(0) + , _additional_ref_status(ADDITIONAL_REF_USING) {} + + virtual ~VersionedRefWithId() = default; + DISALLOW_COPY_AND_ASSIGN(VersionedRefWithId); + + // Create a VersionedRefWithId, put the identifier into `id'.
+ // `args' will be passed to OnCreated() directly. + // Returns 0 on success, -1 otherwise. + template + static int Create(VRefId* id, Args... args); + + // Place the VersionedRefWithId associated with identifier `id' into + // unique_ptr `ptr', which will be released automatically when out + // of scope (w/o explicit std::move). User can still access `ptr' + // after calling ptr->SetFailed() before release of `ptr'. + // This function is wait-free. + // Returns 0 on success, -1 when the Socket was SetFailed(). + static int Address(VRefId id, VersionedRefWithIdUniquePtr* ptr); + + // Returns 0 on success, 1 on failed socket, -1 on recycled. + static int AddressFailedAsWell(VRefId id, VersionedRefWithIdUniquePtr* ptr); + + // Re-address current VersionedRefWithId into `ptr'. + // Always succeed even if this socket is failed. + void ReAddress(VersionedRefWithIdUniquePtr* ptr); + + // Returns signed 32-bit referenced-count. + int32_t nref() const { + return NRefOfVRef(_versioned_ref.load(butil::memory_order_relaxed)); + } + + // Mark this VersionedRefWithId or the VersionedRefWithId associated + // with `id' as failed. + // Any later Address() of the identifier shall return NULL. The + // VersionedRefWithId is NOT recycled after calling this function, + // instead it will be recycled when no one references it. Internal + // fields of the Socket are still accessible after calling this + // function. Calling SetFailed() of a VersionedRefWithId more than + // once is OK. + // T::OnFailed() will be called when SetFailed() successfully. + // This function is lock-free. + // Returns -1 when the Socket was already SetFailed(), 0 otherwise. + template + static int SetFailedById(VRefId id, Args... args); + + template + int SetFailed(Args... 
args); + + bool Failed() const { + return VersionOfVRef(_versioned_ref.load(butil::memory_order_relaxed)) + != VersionOfVRefId(_this_id); + } + + // Release the additional reference which was added inside `Create' + // before so that `VersionedRefWithId' will be recycled automatically + // once no one is addressing it. + int ReleaseAdditionalReference(); + + VRefId id() const { return _this_id; } + + // A brief description. + std::string description() const; + +protected: +friend void DereferenceVersionedRefWithId<>(T* r); + + // Status flag used to mark that + enum AdditionalRefStatus { + // 1. Additional reference has been increased; + ADDITIONAL_REF_USING, + // 2. Additional reference is increasing; + ADDITIONAL_REF_REVIVING, + // 3. Additional reference has been decreased. + ADDITIONAL_REF_RECYCLED + }; + + AdditionalRefStatus additional_ref_status() const { + return _additional_ref_status.load(butil::memory_order_relaxed); + } + + uint64_t versioned_ref() const { + // The acquire fence pairs with release fence in Dereference to avoid + // inconsistent states to be seen by others. + return _versioned_ref.load(butil::memory_order_acquire); + } + + template + int SetFailedImpl(Args... args); + + // Release the reference. If no one is addressing this VersionedRefWithId, + // it will be recycled automatically and T::BeforeRecycled() will be called. + int Dereference(); + + // Make this socket addressable again. + // If nref is less than `at_least_nref', VersionedRefWithId was + // abandoned during revival and cannot be revived. + void Revive(int32_t at_least_nref); + +private: + typedef butil::ResourceId resource_id_t; + + // 1. When `failed_as_well=true', returns 0 on success, + // 1 on failed socket, -1 on recycled. + // 2. When `failed_as_well=false', returns 0 on success, + // -1 when the Socket was SetFailed(). + static int AddressImpl(VRefId id, bool failed_as_well, + VersionedRefWithIdUniquePtr* ptr); + + // Callback wrapper of Derived classes.
+ WRAPPER_OF(T, OnFailed, void); + WRAPPER_OF(T, BeforeAdditionalRefReleased, void); + WRAPPER_OF(T, AfterRevived, void); + WRAPPER_OF(T, OnDescription, std::string); + + // unsigned 32-bit version + signed 32-bit referenced-count. + // Meaning of version: + // * Created version: no SetFailed() is called on the VersionedRefWithId yet. + // Must be same evenness with initial _versioned_ref because during lifetime + // of a VersionedRefWithId on the slot, the version is added with 1 twice. + // This is also the version encoded in VRefId. + // * Failed version: = created version + 1, SetFailed()-ed but returned. + // * Other versions: the socket is already recycled. + butil::atomic BAIDU_CACHELINE_ALIGNMENT _versioned_ref; + // The unique identifier. + VRefId _this_id; + // Indicates whether additional reference has increased, + // decreased, or is increasing. + // additional ref status: + // `Socket'、`Create': REF_USING + // `SetFailed': REF_USING -> REF_RECYCLED + // `Revive' REF_RECYCLED -> REF_REVIVING -> REF_USING + butil::atomic _additional_ref_status; +}; + +template +void DereferenceVersionedRefWithId(T* r) { + if (r) { + static_cast*>(r)->Dereference(); + } +} + +template +template +int VersionedRefWithId::Create(VRefId* id, Args... args) { + resource_id_t slot; + T* const t = butil::get_resource(&slot, Forbidden()); + if (t == NULL) { + LOG(FATAL) << "Fail to get_resource<" + << butil::class_name_str() << ">"; + return -1; + } + // nref can be non-zero due to concurrent Address(). + // _this_id will only be used in destructor/Destroy of referenced + // slots, which is safe and properly fenced. Although it's better + // to put the id into VersionedRefWithIdUniquePtr. 
+ VersionedRefWithId* const vref_with_id = t; + vref_with_id->_this_id = MakeVRefId( + VersionOfVRef(vref_with_id->_versioned_ref.fetch_add( + 1, butil::memory_order_release)), slot); + vref_with_id->_additional_ref_status.store( + ADDITIONAL_REF_USING, butil::memory_order_relaxed); + BAIDU_CASSERT((butil::is_result_int::value), + "T::OnCreated must accept Args params and return int"); + // At last, call T::OnCreated() to initialize the T object. + if (t->OnCreated(std::forward(args)...) != 0) { + vref_with_id->SetFailed(); + // NOTE: This object may be recycled at this point, + // don't touch anything. + return -1; + } + *id = vref_with_id->_this_id; + return 0; +} + +template +int VersionedRefWithId::Address( + VRefId id, VersionedRefWithIdUniquePtr* ptr) { + return AddressImpl(id, false, ptr); +} + +template +int VersionedRefWithId::AddressFailedAsWell( + VRefId id, VersionedRefWithIdUniquePtr* ptr) { + return AddressImpl(id, true, ptr); +} + +template +int VersionedRefWithId::AddressImpl( + VRefId id, bool failed_as_well, VersionedRefWithIdUniquePtr* ptr) { + const resource_id_t slot = SlotOfVRefId(id); + T* const t = address_resource(slot); + if (__builtin_expect(t != NULL, 1)) { + // acquire fence makes sure this thread sees latest changes before + // Dereference() or Revive(). 
+ VersionedRefWithId* const vref_with_id = t; + const uint64_t vref1 = vref_with_id->_versioned_ref.fetch_add( + 1, butil::memory_order_acquire); + const uint32_t ver1 = VersionOfVRef(vref1); + if (ver1 == VersionOfVRefId(id)) { + ptr->reset(t); + return 0; + } + if (failed_as_well && ver1 == VersionOfVRefId(id) + 1) { + ptr->reset(t); + return 1; + } + + const uint64_t vref2 = vref_with_id->_versioned_ref.fetch_sub( + 1, butil::memory_order_release); + const int32_t nref = NRefOfVRef(vref2); + if (nref > 1) { + return -1; + } else if (__builtin_expect(nref == 1, 1)) { + const uint32_t ver2 = VersionOfVRef(vref2); + if ((ver2 & 1)) { + if (ver1 == ver2 || ver1 + 1 == ver2) { + uint64_t expected_vref = vref2 - 1; + if (vref_with_id->_versioned_ref.compare_exchange_strong( + expected_vref, MakeVRef(ver2 + 1, 0), + butil::memory_order_acquire, + butil::memory_order_relaxed)) { + BAIDU_CASSERT((butil::is_result_void< + decltype(&T::BeforeRecycled), T>::value), + "T::BeforeRecycled must accept Args params" + " and return void"); + t->BeforeRecycled(); + return_resource(slot); + } + } else { + CHECK(false) << "ref-version=" << ver1 + << " unref-version=" << ver2; + } + } else { + // Addressed a free slot. + } + } else { + CHECK(false) << "Over dereferenced SocketId=" << id; + } + } + return -1; +} + +template +void VersionedRefWithId::ReAddress(VersionedRefWithIdUniquePtr* ptr) { + _versioned_ref.fetch_add(1, butil::memory_order_acquire); + ptr->reset(static_cast(this)); +} + +template +template +int VersionedRefWithId::SetFailedById(VRefId id, Args... args) { + VersionedRefWithIdUniquePtr ptr; + if (Address(id, &ptr) != 0) { + return -1; + } + return ptr->SetFailed(std::forward(args)...); +} + +template +template +int VersionedRefWithId::SetFailed(Args... args) { + return SetFailedImpl(std::forward(args)...); +} + +template +template +int VersionedRefWithId::SetFailedImpl(Args... 
args) { + const uint32_t id_ver = VersionOfVRefId(_this_id); + uint64_t vref = _versioned_ref.load(butil::memory_order_relaxed); + for (;;) { + if (VersionOfVRef(vref) != id_ver) { + return -1; + } + // Try to set version=id_ver+1 (to make later address() return NULL), + // retry on fail. + if (_versioned_ref.compare_exchange_strong( + vref, MakeVRef(id_ver + 1, NRefOfVRef(vref)), + butil::memory_order_release, + butil::memory_order_relaxed)) { + // Call T::OnFailed() to notify the failure of T. + WRAPPER_CALL(OnFailed, static_cast(this), std::forward(args)...); + // Deref additionally which is added at creation so that this + // queue's reference will hit 0(recycle) when no one addresses it. + ReleaseAdditionalReference(); + // NOTE: This object may be recycled at this point, don't + // touch anything. + return 0; + } + } +} + +template +int VersionedRefWithId::ReleaseAdditionalReference() { + do { + AdditionalRefStatus expect = ADDITIONAL_REF_USING; + if (_additional_ref_status.compare_exchange_strong( + expect, ADDITIONAL_REF_RECYCLED, + butil::memory_order_relaxed, + butil::memory_order_relaxed)) { + BeforeAdditionalRefReleasedWrapper(); + WRAPPER_CALL(BeforeAdditionalRefReleased, static_cast(this)); + return Dereference(); + } + + if (expect == ADDITIONAL_REF_REVIVING) { + // sched_yield to wait until status is not REF_REVIVING. + sched_yield(); + } else { + return -1; // REF_RECYCLED + } + } while (true); +} + +template +int VersionedRefWithId::Dereference() { + const VRefId id = _this_id; + const uint64_t vref = _versioned_ref.fetch_sub( + 1, butil::memory_order_release); + const int32_t nref = NRefOfVRef(vref); + if (nref > 1) { + return 0; + } + if (__builtin_expect(nref == 1, 1)) { + const uint32_t ver = VersionOfVRef(vref); + const uint32_t id_ver = VersionOfVRefId(id); + // Besides first successful SetFailed() adds 1 to version, one of + // those dereferencing nref from 1->0 adds another 1 to version. 
+ // Notice "one of those": The wait-free Address() may make ref of a + // version-unmatched slot change from 1 to 0 for multiple times, we + // have to use version as a guard variable to prevent returning the + // VersionedRefWithId to pool more than once. + // + // Note: `ver == id_ver' means this VersionedRefWithId has been `SetRecycle' + // before rather than `SetFailed'; `ver == id_ver+1' means we + // had `SetFailed' this socket before. We should destroy the + // socket under both situations + if (__builtin_expect(ver == id_ver || ver == id_ver + 1, 1)) { + // sees nref:1->0, try to set version=id_ver+2,--nref. + // No retry: if version changes, the slot is already returned by + // another one who sees nref:1->0 concurrently; if nref changes, + // which must be non-zero, the slot will be returned when + // nref changes from 1->0 again. + // Example: + // SetFailed(): --nref, sees nref:1->0 (1) + // try to set version=id_ver+2 (2) + // Address(): ++nref, unmatched version (3) + // --nref, sees nref:1->0 (4) + // try to set version=id_ver+2 (5) + // 1,2,3,4,5 or 1,3,4,2,5: + // SetFailed() succeeds, Address() fails at (5). + // 1,3,2,4,5: SetFailed() fails with (2), the slot will be + // returned by (5) of Address() + // 1,3,4,5,2: SetFailed() fails with (2), the slot is already + // returned by (5) of Address().
+ uint64_t expected_vref = vref - 1; + if (_versioned_ref.compare_exchange_strong( + expected_vref, MakeVRef(id_ver + 2, 0), + butil::memory_order_acquire, + butil::memory_order_relaxed)) { + static_cast(this)->BeforeRecycled(); + return_resource(SlotOfVRefId(id)); + return 1; + } + return 0; + } + LOG(FATAL) << "Invalid VRefId=" << id; + return -1; + } + LOG(FATAL) << "Over dereferenced VRefId=" << id; + return -1; +} + +template +void VersionedRefWithId::Revive(int32_t at_least_nref) { + const uint32_t id_ver = VersionOfVRefId(_this_id); + uint64_t vref = _versioned_ref.load(butil::memory_order_relaxed); + _additional_ref_status.store( + ADDITIONAL_REF_REVIVING, butil::memory_order_relaxed); + while (true) { + CHECK_EQ(id_ver + 1, VersionOfVRef(vref)) << "id=" << id(); + + int32_t nref = NRefOfVRef(vref); + if (nref < at_least_nref) { + // Set the status to REF_RECYCLED since no one uses this socket + _additional_ref_status.store( + ADDITIONAL_REF_RECYCLED, butil::memory_order_relaxed); + CHECK_EQ(1, nref); + LOG(WARNING) << description() << " was abandoned during revival"; + return; + } + // +1 is the additional ref added in Create(). TODO(gejun): we should + // remove this additional nref someday.
+ if (_versioned_ref.compare_exchange_weak( + vref, MakeVRef(id_ver, nref + 1/*note*/), + butil::memory_order_release, + butil::memory_order_relaxed)) { + // Set the status to REF_USING since we add additional ref again + _additional_ref_status.store( + ADDITIONAL_REF_USING, butil::memory_order_relaxed); + WRAPPER_CALL(AfterRevived, static_cast(this)); + return; + } + } +} + +template +std::string VersionedRefWithId::description() const { + std::string result; + result.reserve(128); + butil::string_appendf(&result, "Socket{id=%" PRIu64, id()); + result.append(WRAPPER_CALL( + OnDescription, const_cast(static_cast(this)))); + butil::string_appendf(&result, "} (0x%p)", this); + return result; +} + +} // namespace brpc + +#endif // BRPC_VERSIONED_REF_WITH_ID_H diff --git a/src/butil/memory/scope_guard.h b/src/butil/memory/scope_guard.h index 1771683f68..433a4b9971 100644 --- a/src/butil/memory/scope_guard.h +++ b/src/butil/memory/scope_guard.h @@ -19,18 +19,12 @@ #define BRPC_SCOPED_GUARD_H #include +#include "butil/type_traits.h" namespace butil { -// Whether a no-argument callable returns void. 
-template -struct returns_void_t - : public std::is_same()())> -{}; - template::value>::type> + typename = std::enable_if::value>> class ScopeGuard; template @@ -42,7 +36,7 @@ template class ScopeGuard { public: ScopeGuard(ScopeGuard&& other) noexcept - :_callback(std::move(other._callback)) + : _callback(std::move(other._callback)) , _dismiss(other._dismiss) { other.dismiss(); } diff --git a/src/butil/type_traits.h b/src/butil/type_traits.h index 5f342db3d9..3cf8473f56 100644 --- a/src/butil/type_traits.h +++ b/src/butil/type_traits.h @@ -341,7 +341,7 @@ template struct is_enum_impl : false_type { }; template struct is_enum : internal::is_enum_impl< - is_same::value || + is_same::value || is_integral::value || is_floating_point::value || is_reference::value || @@ -351,6 +351,40 @@ template struct is_enum : is_enum { }; template struct is_enum : is_enum { }; template struct is_enum : is_enum { }; +// Deduces the return type of an INVOKE expression +// at compile time. +// If the callable is non-static member function, +// the first argument should be the class type. +#if (__cplusplus >= 201703L) +// std::result_of is deprecated in C++17 and removed in C++20, +// use std::invoke_result instead. +template +struct result_of; +template +struct result_of : std::invoke_result {}; +#elif (__cplusplus >= 201103L) +template +using result_of = std::result_of; +#else +#error Only C++11 or later is supported. +#endif + +template +using result_of_t = typename result_of::type; + +// Whether a callable returns type which is same as ReturnType. +template +struct is_result_same + : public butil::is_same> {}; + +// Whether a callable returns void. +template +struct is_result_void : public is_result_same {}; + +// Whether a callable returns int. 
+template +struct is_result_int : public is_result_same {}; + } // namespace butil #endif // BUTIL_TYPE_TRAITS_H diff --git a/test/brpc_event_dispatcher_unittest.cpp b/test/brpc_event_dispatcher_unittest.cpp index d8f6984296..185e9f2dc8 100644 --- a/test/brpc_event_dispatcher_unittest.cpp +++ b/test/brpc_event_dispatcher_unittest.cpp @@ -27,18 +27,19 @@ #include "butil/time.h" #include "butil/macros.h" #include "butil/fd_utility.h" +#include "butil/memory/scope_guard.h" +#include "bthread/bthread.h" #include "brpc/event_dispatcher.h" +#include "brpc/socket.h" #include "brpc/details/has_epollrdhup.h" +#include "brpc/versioned_ref_with_id.h" class EventDispatcherTest : public ::testing::Test{ protected: - EventDispatcherTest(){ - }; - virtual ~EventDispatcherTest(){}; - virtual void SetUp() { - }; - virtual void TearDown() { - }; + EventDispatcherTest() = default; + ~EventDispatcherTest() override = default; + void SetUp() override {} + void TearDown() override {} }; TEST_F(EventDispatcherTest, has_epollrdhup) { @@ -52,6 +53,128 @@ TEST_F(EventDispatcherTest, versioned_ref) { ASSERT_EQ(brpc::MakeVRef(1, 1), versioned_ref); } +struct UserData; + +UserData* g_user_data = NULL; + +struct UserData : public brpc::VersionedRefWithId { + explicit UserData(Forbidden f) + : brpc::VersionedRefWithId(f) + , count(0) + , _additional_ref_released(false) {} + + int OnCreated() { + count.store(1, butil::memory_order_relaxed); + _additional_ref_released = false; + g_user_data = this; + return 0; + } + + void BeforeRecycled() { + count.store(0, butil::memory_order_relaxed); + g_user_data = NULL; + } + + void BeforeAdditionalRefReleased() { + _additional_ref_released = true; + } + + void OnFailed() { + count.fetch_sub(1, butil::memory_order_relaxed); + } + + void AfterRevived() { + count.fetch_add(1, butil::memory_order_relaxed); + _additional_ref_released = false; + } + + butil::atomic count; + bool _additional_ref_released; +}; + +// Unique identifier of a UserData. 
+// Users shall store UserDataId instead of UserData and call UserData::Address() +// to convert the identifier to an unique_ptr at each access. Whenever a +// unique_ptr is not destructed, the enclosed UserData will not be recycled. +typedef brpc::VRefId UserDataId; + +const brpc::VRefId INVALID_EVENT_DATA_ID = brpc::INVALID_VREF_ID; + +typedef brpc::VersionedRefWithIdUniquePtr UserDataUniquePtr; + +volatile bool vref_thread_stop = false; +butil::atomic g_count(1); + +void TestVRef(UserDataId id) { + UserDataUniquePtr ptr; + ASSERT_EQ(0, UserData::Address(id, &ptr)); + ptr->count.fetch_add(1, butil::memory_order_relaxed); + g_count.fetch_add(1, butil::memory_order_relaxed); +} + +void* VRefThread(void* arg) { + auto id = (UserDataId)arg; + while (!vref_thread_stop) { + TestVRef(id); + } + return NULL; +} + +TEST_F(EventDispatcherTest, versioned_ref_with_id) { + UserDataId id = INVALID_EVENT_DATA_ID; + ASSERT_EQ(0, UserData::Create(&id)); + ASSERT_NE(INVALID_EVENT_DATA_ID, id); + UserDataUniquePtr ptr; + ASSERT_EQ(0, UserData::Address(id, &ptr)); + ASSERT_EQ(2, ptr->nref()); + ASSERT_FALSE(ptr->Failed()); + ASSERT_EQ(1, ptr->count); + ASSERT_EQ(g_user_data, ptr.get()); + { + UserDataUniquePtr temp_ptr; + ASSERT_EQ(0, UserData::Address(id, &temp_ptr)); + ASSERT_EQ(ptr, temp_ptr); + ASSERT_EQ(3, ptr->nref()); + } + + const size_t thread_num = 8; + pthread_t tid[thread_num]; + for (auto& i : tid) { + ASSERT_EQ(0, pthread_create(&i, NULL, VRefThread, (void*)id)); + } + + sleep(2); + + vref_thread_stop = true; + for (const auto i : tid) { + pthread_join(i, NULL); + } + + ASSERT_EQ(2, ptr->nref()); + ASSERT_EQ(g_count, ptr->count); + ASSERT_EQ(0, ptr->SetFailed()); + ASSERT_TRUE(ptr->Failed()); + ASSERT_EQ(g_count - 1, ptr->count); + // Additional reference has been released. 
+ ASSERT_TRUE(ptr->_additional_ref_released); + ASSERT_EQ(1, ptr->nref()); + { + UserDataUniquePtr temp_ptr; + ASSERT_EQ(1, UserData::AddressFailedAsWell(id, &temp_ptr)); + ASSERT_EQ(ptr, temp_ptr); + ASSERT_EQ(2, ptr->nref()); + } + ptr->Revive(1); + ASSERT_EQ(2, ptr->nref()); + ASSERT_EQ(g_count, ptr->count); + ASSERT_FALSE(ptr->_additional_ref_released); + ASSERT_EQ(0, UserData::SetFailedById(id)); + ASSERT_EQ(g_count - 1, ptr->count); + ptr.reset(); + ASSERT_EQ(nullptr, ptr); + ASSERT_EQ(nullptr, g_user_data); +} + std::vector err_fd; pthread_mutex_t err_fd_mutex = PTHREAD_MUTEX_INITIALIZER; @@ -79,7 +202,7 @@ struct BAIDU_CACHELINE_ALIGNMENT SocketExtra : public brpc::SocketUser { times = 0; } - virtual void BeforeRecycle(brpc::Socket* m) { + void BeforeRecycle(brpc::Socket* m) override { pthread_mutex_lock(&rel_fd_mutex); rel_fd.push_back(m->fd()); pthread_mutex_unlock(&rel_fd_mutex); @@ -267,3 +390,130 @@ TEST_F(EventDispatcherTest, dispatch_tasks) { ASSERT_EQ(NCLIENT, info.free_item_num - old_info.free_item_num); #endif } + +// Unique identifier of a EventPipe. +// Users shall store EventFDId instead of EventPipe and call EventPipe::Address() +// to convert the identifier to an unique_ptr at each access. Whenever a +// unique_ptr is not destructed, the enclosed EventPipe will not be recycled. 
+typedef brpc::VRefId EventPipeId; + +const brpc::VRefId INVALID_EVENT_PIPE_ID = brpc::INVALID_VREF_ID; + +class EventPipe; +typedef brpc::VersionedRefWithIdUniquePtr EventPipeUniquePtr; + + +class EventPipe : public brpc::VersionedRefWithId { +public: + explicit EventPipe(Forbidden f) + : brpc::VersionedRefWithId(f) + , _pipe_fds{-1, -1} + , _input_event_count(0) + {} + + int Notify() { + char c = 0; + if (write(_pipe_fds[1], &c, 1) != 1) { + PLOG(ERROR) << "Fail to write to _pipe_fds[1]"; + return -1; + } + return 0; + } + +private: +friend class VersionedRefWithId; +friend class brpc::IOEvent; + + int OnCreated() { + if (pipe(_pipe_fds)) { + PLOG(FATAL) << "Fail to create _pipe_fds"; + return -1; + } + if (_io_event.Init((void*)id()) != 0) { + LOG(ERROR) << "Fail to init IOEvent"; + return -1; + } + _io_event.set_bthread_tag(bthread_self_tag()); + if (_io_event.AddConsumer(_pipe_fds[0]) != 0) { + PLOG(ERROR) << "Fail to add SocketId=" << id() + << " into EventDispatcher"; + return -1; + } + + + _input_event_count = 0; + return 0; + } + + void BeforeRecycled() { + brpc::GetGlobalEventDispatcher(_pipe_fds[0], bthread_self_tag()) + .RemoveConsumer(_pipe_fds[0]); + _io_event.Reset(); + if (_pipe_fds[0] >= 0) { + close(_pipe_fds[0]); + } + if (_pipe_fds[1] >= 0) { + close(_pipe_fds[1]); + } + } + + static int OnInputEvent(void* user_data, uint32_t, + const bthread_attr_t&) { + auto id = reinterpret_cast(user_data); + EventPipeUniquePtr ptr; + if (EventPipe::Address(id, &ptr) != 0) { + LOG(WARNING) << "Fail to address EventPipe"; + return -1; + } + + char buf[1024]; + ssize_t nr = read(ptr->_pipe_fds[0], &buf, arraysize(buf)); + if (nr <= 0) { + if (errno == EAGAIN) { + return 0; + } else { + PLOG(WARNING) << "Fail to read from _pipe_fds[0]"; + ptr->SetFailed(); + return -1; + } + } + + ptr->_input_event_count += nr; + return 0; + } + + static int OnOutputEvent(void*, uint32_t, + const bthread_attr_t&) { + EXPECT_TRUE(false) << "Should not be called"; + return 0; + } 
+ + brpc::IOEvent _io_event; + int _pipe_fds[2]; + + size_t _input_event_count; +}; + +TEST_F(EventDispatcherTest, customize_dispatch_task) { + EventPipeId id = INVALID_EVENT_PIPE_ID; + ASSERT_EQ(0, EventPipe::Create(&id)); + ASSERT_NE(INVALID_EVENT_PIPE_ID, id); + EventPipeUniquePtr ptr; + ASSERT_EQ(0, EventPipe::Address(id, &ptr)); + ASSERT_EQ(2, ptr->nref()); + ASSERT_FALSE(ptr->Failed()); + + ASSERT_EQ((size_t)0, ptr->_input_event_count); + const size_t N = 10000; + for (size_t i = 0; i < N; ++i) { + ASSERT_EQ(0, ptr->Notify()); + } + usleep(1000 * 50); + ASSERT_EQ(N, ptr->_input_event_count); + + ASSERT_EQ(0, ptr->SetFailed()); + ASSERT_TRUE(ptr->Failed()); + ptr.reset(); + ASSERT_EQ(nullptr, ptr); + ASSERT_NE(0, EventPipe::Address(id, &ptr)); +} diff --git a/test/brpc_load_balancer_unittest.cpp b/test/brpc_load_balancer_unittest.cpp index d98c11b5df..14b0131cf4 100644 --- a/test/brpc_load_balancer_unittest.cpp +++ b/test/brpc_load_balancer_unittest.cpp @@ -1107,7 +1107,7 @@ TEST_F(LoadBalancerTest, revived_from_all_failed_sanity) { { brpc::SocketUniquePtr dummy_ptr; ASSERT_EQ(1, brpc::Socket::AddressFailedAsWell(ptr[0]->id(), &dummy_ptr)); - dummy_ptr->Revive(); + dummy_ptr->Revive(2); } bthread_usleep(brpc::FLAGS_detect_available_server_interval_ms * 1000); // After one server is revived, the reject rate should be 50% diff --git a/test/brpc_redis_unittest.cpp b/test/brpc_redis_unittest.cpp index 1176676c95..573ab2ed5b 100644 --- a/test/brpc_redis_unittest.cpp +++ b/test/brpc_redis_unittest.cpp @@ -1080,7 +1080,7 @@ class MultiCommandHandler : public brpc::RedisCommandHandler { brpc::RedisCommandHandlerResult Run(const std::vector& args, brpc::RedisReply* output, - bool flush_batched) { + bool flush_batched) override { output->SetStatus("OK"); return brpc::REDIS_CMD_CONTINUE; } @@ -1093,7 +1093,7 @@ class MultiCommandHandler : public brpc::RedisCommandHandler { public: brpc::RedisCommandHandlerResult Run(const std::vector& args, brpc::RedisReply* output, - 
bool flush_batched) { + bool flush_batched) override { if (args[0] == "multi") { output->SetError("ERR duplicate multi"); return brpc::REDIS_CMD_CONTINUE;