Skip to content

Commit

Permalink
[Level Events] manage level events registration mask (envoyproxy#13787)
Browse files Browse the repository at this point in the history
Signed-off-by: Sotiris Nanopoulos <sonanopo@microsoft.com>
Signed-off-by: Qin Qin <qqin@google.com>
  • Loading branch information
Sotiris Nanopoulos authored and qqustc committed Nov 24, 2020
1 parent c0830ab commit 7338588
Show file tree
Hide file tree
Showing 18 changed files with 254 additions and 54 deletions.
6 changes: 6 additions & 0 deletions include/envoy/api/io_error.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ template <typename ReturnValue> struct IoCallResult {
*/
bool ok() const {
  // Success is represented by the absence of an error object.
  return !err_;
}

/**
* This return code is frequent enough that we have a separate function to check.
* @return true if the system call failed because the socket would block.
*/
bool wouldBlock() const {
  // A successful call carries no error, so it cannot have blocked.
  if (ok()) {
    return false;
  }
  return err_->getErrorCode() == IoError::IoErrorCode::Again;
}

// TODO(danzh): rename it to be more meaningful, i.e. return_value_.
ReturnValue rc_;
IoErrorPtr err_;
Expand Down
45 changes: 39 additions & 6 deletions include/envoy/event/file_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,38 @@ struct FileReadyType {
static const uint32_t Closed = 0x4;
};

enum class FileTriggerType { Level, Edge };
// Describes how readiness notifications for a file event are triggered.
enum class FileTriggerType {
// Level-triggered events; see epoll(7) ("man 7 epoll") for the semantics.
// They are used on all platforms for DNS and TCP listeners.
Level,
// Edge-triggered events; see epoll(7) ("man 7 epoll") for the semantics.
// They are used on all platforms that support Edge triggering as the default trigger type.
Edge,
// These are synthetic edge events managed by Envoy. They are based on level events and when they
// are activated they are immediately disabled. This makes them behave like Edge events. It is
// then the responsibility of the consumer of the event to reactivate the event when the socket
// operation would block.
//
// Their main application in Envoy is for Win32 which does not support edge-triggered events. They
// should be used in Win32 instead of level events. They can only be used in platforms where
// `PlatformDefaultTriggerType` is `FileTriggerType::EmulatedEdge`.
EmulatedEdge
};

static constexpr FileTriggerType PlatformDefaultTriggerType
#ifdef WIN32
// Libevent only supports Level trigger on Windows.
{FileTriggerType::Level};
// For POSIX developers to get the Windows behavior of file events
// you need to add the following definition:
// `FORCE_LEVEL_EVENTS`
// You can do this with bazel if you add the following build/test options
// `--copt="-DFORCE_LEVEL_EVENTS"`
// Picks, at compile time, the trigger type that file events use by default on this build.
// @return FileTriggerType::EmulatedEdge when WIN32 or FORCE_LEVEL_EVENTS is defined, and
//         FileTriggerType::Edge otherwise.
constexpr FileTriggerType determinePlatformPreferredEventType() {
#if defined(WIN32) || defined(FORCE_LEVEL_EVENTS)
  // Edge events are emulated on top of level events in this configuration.
  return FileTriggerType::EmulatedEdge;
#else
  return FileTriggerType::Edge;
#endif
}

static constexpr FileTriggerType PlatformDefaultTriggerType = determinePlatformPreferredEventType();

/**
* Callback invoked when a FileEvent is ready for reading or writing.
Expand All @@ -53,6 +76,16 @@ class FileEvent {
* registered events and fire callbacks when they are active.
*/
virtual void setEnabled(uint32_t events) PURE;

/**
 * Add a single event to the event registration mask.
 * As the name indicates, this is meant for events that use FileTriggerType::EmulatedEdge;
 * FileEventImpl ignores the call for any other trigger type.
 * @param event supplies the FileReadyType event to add to the registration mask.
 */
virtual void registerEventIfEmulatedEdge(uint32_t event) PURE;

/**
 * Remove a single event from the event registration mask.
 * As the name indicates, this is meant for events that use FileTriggerType::EmulatedEdge;
 * FileEventImpl ignores the call for any other trigger type.
 * @param event supplies the FileReadyType event to remove from the registration mask.
 */
virtual void unregisterEventIfEmulatedEdge(uint32_t event) PURE;
};

using FileEventPtr = std::unique_ptr<FileEvent>;
Expand Down
74 changes: 66 additions & 8 deletions source/common/event/file_event_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace Event {

FileEventImpl::FileEventImpl(DispatcherImpl& dispatcher, os_fd_t fd, FileReadyCb cb,
FileTriggerType trigger, uint32_t events)
: cb_(cb), fd_(fd), trigger_(trigger),
: cb_(cb), fd_(fd), trigger_(trigger), enabled_events_(events),
activation_cb_(dispatcher.createSchedulableCallback([this]() {
ASSERT(injected_activation_events_ != 0);
mergeInjectedEventsAndRunCb(0);
Expand All @@ -21,9 +21,13 @@ FileEventImpl::FileEventImpl(DispatcherImpl& dispatcher, os_fd_t fd, FileReadyCb
// an OOM condition and just crash.
RELEASE_ASSERT(SOCKET_VALID(fd), "");
#ifdef WIN32
RELEASE_ASSERT(trigger_ == FileTriggerType::Level,
"libevent does not support edge triggers on Windows");
ASSERT(trigger_ != FileTriggerType::Edge, "libevent does not support edge triggers on Windows");
#endif
if constexpr (PlatformDefaultTriggerType != FileTriggerType::EmulatedEdge) {
ASSERT(trigger_ != FileTriggerType::EmulatedEdge,
"Cannot use EmulatedEdge events if they are not the default platform type");
}

assignEvents(events, &dispatcher.base());
event_add(&raw_event_, nullptr);
}
Expand All @@ -48,9 +52,10 @@ void FileEventImpl::activate(uint32_t events) {

void FileEventImpl::assignEvents(uint32_t events, event_base* base) {
ASSERT(base != nullptr);
enabled_events_ = events;
event_assign(
&raw_event_, base, fd_,
EV_PERSIST | (trigger_ == FileTriggerType::Level ? 0 : EV_ET) |
EV_PERSIST | (trigger_ == FileTriggerType::Edge ? EV_ET : 0) |
(events & FileReadyType::Read ? EV_READ : 0) |
(events & FileReadyType::Write ? EV_WRITE : 0) |
(events & FileReadyType::Closed ? EV_CLOSED : 0),
Expand All @@ -75,6 +80,16 @@ void FileEventImpl::assignEvents(uint32_t events, event_base* base) {
this);
}

void FileEventImpl::updateEvents(uint32_t events) {
if (events == enabled_events_) {
return;
}
auto* base = event_get_base(&raw_event_);
event_del(&raw_event_);
assignEvents(events, base);
event_add(&raw_event_, nullptr);
}

void FileEventImpl::setEnabled(uint32_t events) {
if (injected_activation_events_ != 0) {
// Clear pending events on updates to the fd event mask to avoid delivering events that are no
Expand All @@ -84,19 +99,62 @@ void FileEventImpl::setEnabled(uint32_t events) {
injected_activation_events_ = 0;
activation_cb_->cancel();
}
updateEvents(events);
}

auto* base = event_get_base(&raw_event_);
event_del(&raw_event_);
assignEvents(events, base);
event_add(&raw_event_, nullptr);
// Clears `event` from the enabled event mask, but only for EmulatedEdge file events.
// `event` may only contain Read and/or Write bits.
void FileEventImpl::unregisterEventIfEmulatedEdge(uint32_t event) {
  // On platforms whose default is not EmulatedEdge this body is discarded at compile time.
  if constexpr (PlatformDefaultTriggerType == FileTriggerType::EmulatedEdge) {
    ASSERT((event & (FileReadyType::Read | FileReadyType::Write)) == event);
    if (trigger_ != FileTriggerType::EmulatedEdge) {
      return;
    }
    updateEvents(enabled_events_ & ~event);
  }
}

// Adds `event` to the enabled event mask, but only for EmulatedEdge file events.
// `event` may only contain Read and/or Write bits.
void FileEventImpl::registerEventIfEmulatedEdge(uint32_t event) {
  // On platforms whose default is not EmulatedEdge this body is discarded at compile time.
  if constexpr (PlatformDefaultTriggerType == FileTriggerType::EmulatedEdge) {
    ASSERT((event & (FileReadyType::Read | FileReadyType::Write)) == event);
    if (trigger_ != FileTriggerType::EmulatedEdge) {
      return;
    }

    const bool read_requested = (event & FileReadyType::Read) != 0;
    const bool closed_enabled = (enabled_events_ & FileReadyType::Closed) != 0;
    uint32_t updated_mask = enabled_events_ | event;
    if (read_requested && closed_enabled) {
      // Read and early close are never requested together, so Read is left out while Closed
      // is enabled.
      updated_mask &= ~FileReadyType::Read;
    }
    updateEvents(updated_mask);
  }
}

// Combines `events` with any events injected through activate() that are still pending, disables
// the delivered Read/Write events for EmulatedEdge file events, and then invokes the callback.
void FileEventImpl::mergeInjectedEventsAndRunCb(uint32_t events) {
  const bool has_pending_injected_events = injected_activation_events_ != 0;
  if (has_pending_injected_events) {
    // TODO(antoniovicente) remove this adjustment to activation events once ConnectionImpl can
    // handle Read and Close events delivered together.
    if constexpr (PlatformDefaultTriggerType == FileTriggerType::EmulatedEdge) {
      const bool closed_reported = (events & FileReadyType::Closed) != 0;
      const bool read_injected = (injected_activation_events_ & FileReadyType::Read) != 0;
      if (closed_reported && read_injected) {
        // Read and early close are never requested together; when close is reported it is kept
        // and the injected Read is dropped.
        injected_activation_events_ &= ~FileReadyType::Read;
      }
    }

    // Fold the injected events into the delivered set and reset the pending-injection state.
    events |= injected_activation_events_;
    injected_activation_events_ = 0;
    activation_cb_->cancel();
  }

  // TODO(davinci26): This can be optimized further in (w)epoll backends using the `EPOLLONESHOT`
  // flag. With this flag `EPOLLIN`/`EPOLLOUT` are automatically disabled when the event is
  // activated.
  if constexpr (PlatformDefaultTriggerType == FileTriggerType::EmulatedEdge) {
    if (trigger_ == FileTriggerType::EmulatedEdge) {
      // Only Read and Write take part in the emulated-edge registration mask.
      const uint32_t fired_io_events = events & (FileReadyType::Read | FileReadyType::Write);
      unregisterEventIfEmulatedEdge(fired_io_events);
    }
  }

  cb_(events);
}

Expand Down
5 changes: 5 additions & 0 deletions source/common/event/file_event_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,19 @@ class FileEventImpl : public FileEvent, ImplBase {
// Event::FileEvent
void activate(uint32_t events) override;
void setEnabled(uint32_t events) override;
void unregisterEventIfEmulatedEdge(uint32_t event) override;
void registerEventIfEmulatedEdge(uint32_t event) override;

private:
void assignEvents(uint32_t events, event_base* base);
void mergeInjectedEventsAndRunCb(uint32_t events);
void updateEvents(uint32_t events);

FileReadyCb cb_;
os_fd_t fd_;
FileTriggerType trigger_;
// Enabled events for this fd.
uint32_t enabled_events_;

// Injected FileReadyType events that were scheduled by recent calls to activate() and are pending
// delivery.
Expand Down
7 changes: 3 additions & 4 deletions source/common/event/libevent_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,9 @@ class LibeventScheduler : public Scheduler, public CallbackScheduler {

static constexpr int flagsBasedOnEventType() {
if constexpr (Event::PlatformDefaultTriggerType == FileTriggerType::Level) {
// On Windows, EVLOOP_NONBLOCK will cause the libevent event_base_loop to run forever.
// This is because libevent only supports level triggering on Windows, and so the write
// event callbacks will trigger every time through the loop. Adding EVLOOP_ONCE ensures the
// loop will run at most once
// With level events, EVLOOP_NONBLOCK will cause the libevent event_base_loop to run
// forever. This is because the write event callbacks will trigger every time through the
// loop. Adding EVLOOP_ONCE ensures the loop will run at most once
return EVLOOP_NONBLOCK | EVLOOP_ONCE;
}
return EVLOOP_NONBLOCK;
Expand Down
92 changes: 85 additions & 7 deletions source/common/network/io_socket_handle_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,18 @@ Api::IoCallUint64Result IoSocketHandleImpl::readv(uint64_t max_length, Buffer::R
num_bytes_to_read += slice_length;
}
ASSERT(num_bytes_to_read <= max_length);
return sysCallResultToIoCallResult(Api::OsSysCallsSingleton::get().readv(
auto result = sysCallResultToIoCallResult(Api::OsSysCallsSingleton::get().readv(
fd_, iov.begin(), static_cast<int>(num_slices_to_read)));

// Emulated edge events need to be registered if the socket operation did not complete
// because the socket would block.
if constexpr (Event::PlatformDefaultTriggerType == Event::FileTriggerType::EmulatedEdge) {
// Some tests try to read without initializing the file_event.
if (result.wouldBlock() && file_event_) {
file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Read);
}
}
return result;
}

Api::IoCallUint64Result IoSocketHandleImpl::read(Buffer::Instance& buffer, uint64_t max_length) {
Expand All @@ -103,6 +113,15 @@ Api::IoCallUint64Result IoSocketHandleImpl::read(Buffer::Instance& buffer, uint6
bytes_to_commit -= slices[i].len_;
}
buffer.commit(slices, num_slices);

// Emulated edge events need to be registered if the socket operation did not complete
// because the socket would block.
if constexpr (Event::PlatformDefaultTriggerType == Event::FileTriggerType::EmulatedEdge) {
// Some tests try to read without initializing the file_event.
if (result.wouldBlock() && file_event_) {
file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Read);
}
}
return result;
}

Expand All @@ -120,8 +139,18 @@ Api::IoCallUint64Result IoSocketHandleImpl::writev(const Buffer::RawSlice* slice
if (num_slices_to_write == 0) {
return Api::ioCallUint64ResultNoError();
}
return sysCallResultToIoCallResult(
auto result = sysCallResultToIoCallResult(
Api::OsSysCallsSingleton::get().writev(fd_, iov.begin(), num_slices_to_write));

// Emulated edge events need to be registered if the socket operation did not complete
// because the socket would block.
if constexpr (Event::PlatformDefaultTriggerType == Event::FileTriggerType::EmulatedEdge) {
// Some tests try to write without initializing the file_event.
if (result.wouldBlock() && file_event_) {
file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Write);
}
}
return result;
}

Api::IoCallUint64Result IoSocketHandleImpl::write(Buffer::Instance& buffer) {
Expand All @@ -131,6 +160,15 @@ Api::IoCallUint64Result IoSocketHandleImpl::write(Buffer::Instance& buffer) {
if (result.ok() && result.rc_ > 0) {
buffer.drain(static_cast<uint64_t>(result.rc_));
}

// Emulated edge events need to be registered if the socket operation did not complete
// because the socket would block.
if constexpr (Event::PlatformDefaultTriggerType == Event::FileTriggerType::EmulatedEdge) {
// Some tests try to write without initializing the file_event.
if (result.wouldBlock() && file_event_) {
file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Write);
}
}
return result;
}

Expand Down Expand Up @@ -168,7 +206,15 @@ Api::IoCallUint64Result IoSocketHandleImpl::sendmsg(const Buffer::RawSlice* slic
message.msg_control = nullptr;
message.msg_controllen = 0;
const Api::SysCallSizeResult result = os_syscalls.sendmsg(fd_, &message, flags);
return sysCallResultToIoCallResult(result);
auto io_result = sysCallResultToIoCallResult(result);
// Emulated edge events need to be registered if the socket operation did not complete
// because the socket would block.
if constexpr (Event::PlatformDefaultTriggerType == Event::FileTriggerType::EmulatedEdge) {
if (io_result.wouldBlock() && file_event_) {
file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Write);
}
}
return io_result;
} else {
const size_t space_v6 = CMSG_SPACE(sizeof(in6_pktinfo));
const size_t space_v4 = CMSG_SPACE(sizeof(in_pktinfo));
Expand Down Expand Up @@ -210,7 +256,15 @@ Api::IoCallUint64Result IoSocketHandleImpl::sendmsg(const Buffer::RawSlice* slic
*(reinterpret_cast<absl::uint128*>(pktinfo->ipi6_addr.s6_addr)) = self_ip->ipv6()->address();
}
const Api::SysCallSizeResult result = os_syscalls.sendmsg(fd_, &message, flags);
return sysCallResultToIoCallResult(result);
auto io_result = sysCallResultToIoCallResult(result);
// Emulated edge events need to be registered if the socket operation did not complete
// because the socket would block.
if constexpr (Event::PlatformDefaultTriggerType == Event::FileTriggerType::EmulatedEdge) {
if (io_result.wouldBlock() && file_event_) {
file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Write);
}
}
return io_result;
}
}

Expand Down Expand Up @@ -298,7 +352,15 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmsg(Buffer::RawSlice* slices,
hdr.msg_controllen = cmsg_space_;
const Api::SysCallSizeResult result = Api::OsSysCallsSingleton::get().recvmsg(fd_, &hdr, 0);
if (result.rc_ < 0) {
return sysCallResultToIoCallResult(result);
auto io_result = sysCallResultToIoCallResult(result);
// Emulated edge events need to be registered if the socket operation did not complete
// because the socket would block.
if constexpr (Event::PlatformDefaultTriggerType == Event::FileTriggerType::EmulatedEdge) {
if (io_result.wouldBlock() && file_event_) {
file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Read);
}
}
return io_result;
}

RELEASE_ASSERT((hdr.msg_flags & MSG_CTRUNC) == 0,
Expand Down Expand Up @@ -381,7 +443,15 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmmsg(RawSliceArrays& slices, uin
fd_, mmsg_hdr.data(), num_packets_per_mmsg_call, MSG_TRUNC | MSG_WAITFORONE, nullptr);

if (result.rc_ <= 0) {
return sysCallResultToIoCallResult(result);
auto io_result = sysCallResultToIoCallResult(result);
// Emulated edge events need to be registered if the socket operation did not complete
// because the socket would block.
if constexpr (Event::PlatformDefaultTriggerType == Event::FileTriggerType::EmulatedEdge) {
if (io_result.wouldBlock() && file_event_) {
file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Read);
}
}
return io_result;
}

int num_packets_read = result.rc_;
Expand Down Expand Up @@ -435,7 +505,15 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmmsg(RawSliceArrays& slices, uin
// Receives up to `length` bytes from the socket into `buffer` via the OS recv() wrapper.
// @param buffer destination memory handed to recv().
// @param length number of bytes handed to recv() as the buffer size.
// @param flags flags forwarded unchanged to recv().
// @return the system call result converted to an IoCallUint64Result.
Api::IoCallUint64Result IoSocketHandleImpl::recv(void* buffer, size_t length, int flags) {
  const Api::SysCallSizeResult result =
      Api::OsSysCallsSingleton::get().recv(fd_, buffer, length, flags);
  auto io_result = sysCallResultToIoCallResult(result);
  // Emulated edge events need to be registered if the socket operation did not complete
  // because the socket would block.
  if constexpr (Event::PlatformDefaultTriggerType == Event::FileTriggerType::EmulatedEdge) {
    // file_event_ is checked because it may not have been initialized.
    if (io_result.wouldBlock() && file_event_) {
      file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Read);
    }
  }
  return io_result;
}

bool IoSocketHandleImpl::supportsMmsg() const {
Expand Down
Loading

0 comments on commit 7338588

Please sign in to comment.