Skip to content

Commit

Permalink
EventDispatcher supports various IO types (#2560)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenBright authored Jun 3, 2024
1 parent 976c588 commit 122355a
Show file tree
Hide file tree
Showing 16 changed files with 1,531 additions and 723 deletions.
2 changes: 1 addition & 1 deletion src/brpc/acceptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ void Acceptor::OnNewConnectionsUntilEAGAIN(Socket* acception) {
// Always add this socket into `_socket_map' whether it
// has been `SetFailed' or not, whether `Acceptor' is
// running or not. Otherwise, `Acceptor::BeforeRecycle'
// may be called (inside Socket::OnRecycle) after `Acceptor'
// may be called (inside Socket::BeforeRecycled) after `Acceptor'
// has been destroyed
am->_socket_map.insert(socket_id, ConnectStatistics());
}
Expand Down
3 changes: 2 additions & 1 deletion src/brpc/details/health_check.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) {
ptr->_ninflight_app_health_check.fetch_add(
1, butil::memory_order_relaxed);
}
ptr->Revive();
// See comments above.
ptr->Revive(2/*note*/);
ptr->_hc_count = 0;
if (!FLAGS_health_check_path.empty()) {
HealthCheckManager::StartCheck(_id, ptr->_health_check_interval_s);
Expand Down
18 changes: 18 additions & 0 deletions src/brpc/event_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,24 @@ EventDispatcher& GetGlobalEventDispatcher(int fd, bthread_tag_t tag) {
return g_edisp[tag * FLAGS_event_dispatcher_num + index];
}

int IOEventData::OnCreated(const IOEventDataOptions& options) {
if (!options.input_cb) {
LOG(ERROR) << "Invalid input_cb=NULL";
return -1;
}
if (!options.output_cb) {
LOG(ERROR) << "Invalid output_cb=NULL";
return -1;
}

_options = options;
return 0;
}

void IOEventData::BeforeRecycled() {
_options = { NULL, NULL, NULL };
}

} // namespace brpc

#if defined(OS_LINUX)
Expand Down
211 changes: 203 additions & 8 deletions src/brpc/event_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,78 @@

#include "butil/macros.h" // DISALLOW_COPY_AND_ASSIGN
#include "bthread/types.h" // bthread_t, bthread_attr_t
#include "brpc/socket.h" // Socket, SocketId
#include "brpc/versioned_ref_with_id.h"


namespace brpc {

// Unique identifier of a IOEventData.
// Users shall store EventDataId instead of EventData and call EventData::Address()
// to convert the identifier to an unique_ptr at each access. Whenever a
// unique_ptr is not destructed, the enclosed EventData will not be recycled.
typedef VRefId IOEventDataId;

const VRefId INVALID_IO_EVENT_DATA_ID = INVALID_VREF_ID;

class IOEventData;

typedef VersionedRefWithIdUniquePtr<IOEventData> EventDataUniquePtr;

// User callback type of input event and output event.
typedef int (*InputEventCallback) (void* id, uint32_t events,
const bthread_attr_t& thread_attr);
typedef InputEventCallback OutputEventCallback;

struct IOEventDataOptions {
// Callback for input event.
InputEventCallback input_cb;
// Callback for output event.
OutputEventCallback output_cb;
// User data.
void* user_data;
};

// EventDispatcher finds IOEventData by IOEventDataId which is
// stored in epoll/kqueue data, and calls input/output event callback,
// so EventDispatcher supports various IO types, such as socket,
// pipe, eventfd, timerfd, etc.
class IOEventData : public VersionedRefWithId<IOEventData> {
public:
explicit IOEventData(Forbidden f)
: VersionedRefWithId<IOEventData>(f)
, _options{ NULL, NULL, NULL } {}

DISALLOW_COPY_AND_ASSIGN(IOEventData);

int CallInputEventCallback(uint32_t events,
const bthread_attr_t& thread_attr) {
return _options.input_cb(_options.user_data, events, thread_attr);
}

int CallOutputEventCallback(uint32_t events,
const bthread_attr_t& thread_attr) {
return _options.output_cb(_options.user_data, events, thread_attr);
}

private:
friend class VersionedRefWithId<IOEventData>;

int OnCreated(const IOEventDataOptions& options);
void BeforeRecycled();

IOEventDataOptions _options;
};

namespace rdma {
class RdmaEndpoint;
}

// Dispatch edge-triggered events of file descriptors to consumers
// running in separate bthreads.
class EventDispatcher {
friend class Socket;
friend class rdma::RdmaEndpoint;
template <typename T> friend class IOEvent;
public:
EventDispatcher();

Expand All @@ -40,7 +102,7 @@ friend class rdma::RdmaEndpoint;
// Use |*consumer_thread_attr| (if it's not NULL) as the attribute to
// create bthreads running user callbacks.
// Returns 0 on success, -1 otherwise.
virtual int Start(const bthread_attr_t* consumer_thread_attr);
virtual int Start(const bthread_attr_t* thread_attr);

// True iff this dispatcher is running in a bthread
bool Running() const;
Expand All @@ -57,19 +119,19 @@ friend class rdma::RdmaEndpoint;
// When the file descriptor is removed from internal epoll, the Socket
// will be dereferenced once additionally.
// Returns 0 on success, -1 otherwise.
int AddConsumer(SocketId socket_id, int fd);
int AddConsumer(IOEventDataId event_data_id, int fd);

// Watch EPOLLOUT event on `fd' into epoll device. If `pollin' is
// true, EPOLLIN event will also be included and EPOLL_CTL_MOD will
// be used instead of EPOLL_CTL_ADD. When event arrives,
// `Socket::HandleEpollOut' will be called with `socket_id'
// Returns 0 on success, -1 otherwise and errno is set
int RegisterEvent(SocketId socket_id, int fd, bool pollin);
int RegisterEvent(IOEventDataId event_data_id, int fd, bool pollin);

// Remove EPOLLOUT event on `fd'. If `pollin' is true, EPOLLIN event
// will be kept and EPOLL_CTL_MOD will be used instead of EPOLL_CTL_DEL
// Returns 0 on success, -1 otherwise and errno is set
int UnregisterEvent(SocketId socket_id, int fd, bool pollin);
int UnregisterEvent(IOEventDataId event_data_id, int fd, bool pollin);

private:
DISALLOW_COPY_AND_ASSIGN(EventDispatcher);
Expand All @@ -83,8 +145,33 @@ friend class rdma::RdmaEndpoint;
// Remove the file descriptor `fd' from epoll.
int RemoveConsumer(int fd);

// The epoll to watch events.
int _epfd;
// Call user callback of input event and output event.
template<bool IsInputEvent>
static int OnEvent(IOEventDataId event_data_id, uint32_t events,
const bthread_attr_t& thread_attr) {
EventDataUniquePtr data;
if (IOEventData::Address(event_data_id, &data) != 0) {
return -1;
}
return IsInputEvent ?
data->CallInputEventCallback(events, thread_attr) :
data->CallOutputEventCallback(events, thread_attr);
}

static int CallInputEventCallback(IOEventDataId event_data_id,
uint32_t events,
const bthread_attr_t& thread_attr) {
return OnEvent<true>(event_data_id, events, thread_attr);
}

static int CallOutputEventCallback(IOEventDataId event_data_id,
uint32_t events,
const bthread_attr_t& thread_attr) {
return OnEvent<false>(event_data_id, events, thread_attr);
}

// The epoll/kqueue fd to watch events.
int _event_dispatcher_fd;

// false unless Stop() is called.
volatile bool _stop;
Expand All @@ -93,14 +180,122 @@ friend class rdma::RdmaEndpoint;
bthread_t _tid;

// The attribute of bthreads calling user callbacks.
bthread_attr_t _consumer_thread_attr;
bthread_attr_t _thread_attr;

// Pipe fds to wakeup EventDispatcher from `epoll_wait' in order to quit
int _wakeup_fds[2];
};

EventDispatcher& GetGlobalEventDispatcher(int fd, bthread_tag_t tag);

// IOEvent class manages the IO events of a file descriptor conveniently.
template <typename T>
class IOEvent {
public:
IOEvent()
: _init(false)
, _event_data_id(INVALID_IO_EVENT_DATA_ID)
, _bthread_tag(bthread_self_tag()) {}

~IOEvent() { Reset(); }

DISALLOW_COPY_AND_ASSIGN(IOEvent);

int Init(void* user_data) {
if (_init) {
LOG(WARNING) << "IOEvent has been initialized";
return 0;
}
IOEventDataOptions options{ OnInputEvent, OnOutputEvent, user_data };
if (IOEventData::Create(&_event_data_id, options) != 0) {
LOG(ERROR) << "Fail to create EventData";
return -1;
}
_init = true;
return 0;
}

void Reset() {
if (_init) {
IOEventData::SetFailedById(_event_data_id);
_init = false;
}
}

// See comments of `EventDispatcher::AddConsumer'.
int AddConsumer(int fd) {
if (!_init) {
LOG(ERROR) << "IOEvent has not been initialized";
return -1;
}
return GetGlobalEventDispatcher(fd, _bthread_tag)
.AddConsumer(_event_data_id, fd);
}

// See comments of `EventDispatcher::RemoveConsumer'.
int RemoveConsumer(int fd) {
if (!_init) {
LOG(ERROR) << "IOEvent has not been initialized";
return -1;
}
return GetGlobalEventDispatcher(fd, _bthread_tag).RemoveConsumer(fd);
}

// See comments of `EventDispatcher::RegisterEvent'.
int RegisterEvent(int fd, bool pollin) {
if (!_init) {
LOG(ERROR) << "IOEvent has not been initialized";
return -1;
}
return GetGlobalEventDispatcher(fd, _bthread_tag)
.RegisterEvent(_event_data_id, fd, pollin);
}

// See comments of `EventDispatcher::UnregisterEvent'.
int UnregisterEvent(int fd, bool pollin) {
if (!_init) {
LOG(ERROR) << "IOEvent has not been initialized";
return -1;
}
return GetGlobalEventDispatcher(fd, _bthread_tag)
.UnregisterEvent(_event_data_id, fd, pollin);
}

void set_bthread_tag(bthread_tag_t bthread_tag) {
_bthread_tag = bthread_tag;
}
bthread_tag_t bthread_tag() const {
return _bthread_tag;
}

private:
// Generic callback to handle input event.
static int OnInputEvent(void* user_data, uint32_t events,
const bthread_attr_t& thread_attr) {
static_assert(
butil::is_result_int<decltype(&T::OnInputEvent),
void*, uint32_t,
bthread_attr_t>::value,
"T::OnInputEvent signature mismatch");
return T::OnInputEvent(user_data, events, thread_attr);
}

// Generic callback to handle output event.
static int OnOutputEvent(void* user_data, uint32_t events,
const bthread_attr_t& thread_attr) {
static_assert(
butil::is_result_int<decltype(&T::OnOutputEvent),
void*, uint32_t,
bthread_attr_t>::value,
"T::OnInputEvent signature mismatch");
return T::OnOutputEvent(user_data, events, thread_attr);
}

bool _init;
IOEventDataId _event_data_id;
bthread_tag_t _bthread_tag;
};

} // namespace brpc


Expand Down
Loading

0 comments on commit 122355a

Please sign in to comment.