Skip to content

Commit

Permalink
EventDispatcher supports various IO types
Browse files Browse the repository at this point in the history
  • Loading branch information
chenBright committed Mar 4, 2024
1 parent 24fc31e commit 9b51e09
Show file tree
Hide file tree
Showing 10 changed files with 970 additions and 170 deletions.
54 changes: 54 additions & 0 deletions src/brpc/event_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,60 @@ EventDispatcher& GetGlobalEventDispatcher(int fd, bthread_tag_t tag) {
return g_edisp[tag * FLAGS_event_dispatcher_num + index];
}

int EventData::OnCreate(uint64_t user_id,
brpc::InputEventCallback input_cb,
brpc::OutputEventCallback output_cb) {
if (user_id == INVALID_VREF_ID) {
LOG(ERROR) << "Invalid user_id=-1";
return -1;
}
if (!input_cb) {
LOG(ERROR) << "Invalid input_cb=NULL";
return -1;
}
if (!output_cb) {
LOG(ERROR) << "Invalid output_cb=NULL";
return -1;
}

_user_id = user_id;
_input_cb = input_cb;
_output_cb = output_cb;
return 0;
}

void EventData::OnRecycle() {
_user_id = INVALID_VREF_ID;
_input_cb = NULL;
_output_cb = NULL;
}

void MakeEventDataIdInvalid(EventDataId& id) {
EventData::SetFailed(id);
id = INVALID_EVENT_DATA_ID;
}

int EventDispatcher::CallInputEventCallback(EventDataId event_data_id,
uint32_t events,
const bthread_attr_t& thread_attr) {
EventDataUniquePtr data;
if (EventData::Address(event_data_id, &data) != 0) {
return -1;
}
data->CallInputEventCallback(events, thread_attr);
return 0;
}

int EventDispatcher::CallOutputEventCallback(EventDataId event_data_id,
uint32_t events,
const bthread_attr_t& thread_attr) {
EventDataUniquePtr data;
if (EventData::Address(event_data_id, &data) != 0) {
return -1;
}
return data->CallOutputEventCallback(events, thread_attr);
}

} // namespace brpc

#if defined(OS_LINUX)
Expand Down
84 changes: 76 additions & 8 deletions src/brpc/event_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,71 @@

#include "butil/macros.h" // DISALLOW_COPY_AND_ASSIGN
#include "bthread/types.h" // bthread_t, bthread_attr_t
#include "brpc/socket.h" // Socket, SocketId
#include "brpc/versioned_ref_with_id.h"


namespace brpc {

// Unique identifier of a EventData.
// Users shall store EventDataId instead of EventData and call EventData::Address()
// to convert the identifier to an unique_ptr at each access. Whenever a
// unique_ptr is not destructed, the enclosed EventData will not be recycled.
typedef VRefId EventDataId;

const VRefId INVALID_EVENT_DATA_ID = INVALID_VREF_ID;

class EventData;

typedef VersionedRefWithIdUniquePtr<EventData> EventDataUniquePtr;

// User callback type of input event and output event.
typedef int (*InputEventCallback) (VRefId id, uint32_t events,
const bthread_attr_t& thread_attr);
typedef InputEventCallback OutputEventCallback;

// EventDispatcher finds EventData by EventDataId which is
// stored in epoll/kqueue data, and calls its callback, so
// EventDispatcher supports various IO types, such as socket,
// pipe, eventfd, timerfd, etc.
class EventData : public VersionedRefWithId<EventData> {
public:
explicit EventData(Forbidden f)
:VersionedRefWithId<EventData>(f)
, _user_id(INVALID_VREF_ID)
, _input_cb(NULL)
, _output_cb(NULL)
{}

int CallInputEventCallback(uint32_t events,
const bthread_attr_t& thread_attr) {
return _input_cb(_user_id, events, thread_attr);
}

int CallOutputEventCallback(uint32_t events,
const bthread_attr_t& thread_attr) {
return _output_cb(_user_id, events, thread_attr);
}

private:
friend class VersionedRefWithId<EventData>;

int OnCreate(uint64_t user_id,
InputEventCallback input_cb,
OutputEventCallback output_cb);
void OnFailed() {}
void OnRecycle();

uint64_t _user_id;
InputEventCallback _input_cb;
OutputEventCallback _output_cb;
};

void MakeEventDataIdInvalid(EventDataId& id);

namespace rdma {
class RdmaEndpoint;
}

// Dispatch edge-triggered events of file descriptors to consumers
// running in separate bthreads.
class EventDispatcher {
Expand All @@ -40,7 +100,7 @@ friend class rdma::RdmaEndpoint;
// Use |*consumer_thread_attr| (if it's not NULL) as the attribute to
// create bthreads running user callbacks.
// Returns 0 on success, -1 otherwise.
virtual int Start(const bthread_attr_t* consumer_thread_attr);
virtual int Start(const bthread_attr_t* thread_attr);

// True iff this dispatcher is running in a bthread
bool Running() const;
Expand All @@ -57,19 +117,19 @@ friend class rdma::RdmaEndpoint;
// When the file descriptor is removed from internal epoll, the Socket
// will be dereferenced once additionally.
// Returns 0 on success, -1 otherwise.
int AddConsumer(SocketId socket_id, int fd);
int AddConsumer(EventDataId event_data_id, int fd);

// Watch EPOLLOUT event on `fd' into epoll device. If `pollin' is
// true, EPOLLIN event will also be included and EPOLL_CTL_MOD will
// be used instead of EPOLL_CTL_ADD. When event arrives,
// `Socket::HandleEpollOut' will be called with `socket_id'
// Returns 0 on success, -1 otherwise and errno is set
int RegisterEvent(SocketId socket_id, int fd, bool pollin);
int RegisterEvent(EventDataId event_data_id, int fd, bool pollin);

// Remove EPOLLOUT event on `fd'. If `pollin' is true, EPOLLIN event
// will be kept and EPOLL_CTL_MOD will be used instead of EPOLL_CTL_DEL
// Returns 0 on success, -1 otherwise and errno is set
int UnregisterEvent(SocketId socket_id, int fd, bool pollin);
int UnregisterEvent(EventDataId event_data_id, int fd, bool pollin);

private:
DISALLOW_COPY_AND_ASSIGN(EventDispatcher);
Expand All @@ -83,8 +143,16 @@ friend class rdma::RdmaEndpoint;
// Remove the file descriptor `fd' from epoll.
int RemoveConsumer(int fd);

// The epoll to watch events.
int _epfd;
// Call user callback of input event and output event.
static int CallInputEventCallback(EventDataId event_data_id,
uint32_t events,
const bthread_attr_t& thread_attr);
static int CallOutputEventCallback(EventDataId event_data_id,
uint32_t events,
const bthread_attr_t& thread_attr);

// The epoll/kqueue fd to watch events.
int _event_dispatcher_fd;

// false unless Stop() is called.
volatile bool _stop;
Expand All @@ -93,7 +161,7 @@ friend class rdma::RdmaEndpoint;
bthread_t _tid;

// The attribute of bthreads calling user callbacks.
bthread_attr_t _consumer_thread_attr;
bthread_attr_t _thread_attr;

// Pipe fds to wakeup EventDispatcher from `epoll_wait' in order to quit
int _wakeup_fds[2];
Expand Down
Loading

0 comments on commit 9b51e09

Please sign in to comment.