DO NOT MERGE: make sure event loops properly use the deferment api. #598

Draft · wants to merge 5 commits into main
6 changes: 6 additions & 0 deletions source/bsd/kqueue_event_loop.c
@@ -919,6 +919,10 @@ static void aws_event_loop_thread(void *user_data) {
handle_data->events_this_loop |= event_flags;
}

/* Enter the deferment boundary before handling IO events so that tasks scheduled while handling
 * the IO are not executed immediately in this tick's scheduler run. */
aws_task_scheduler_enter_deferment_boundary(&impl->thread_data.scheduler);

/* Invoke each handle's event callback (unless the handle has been unsubscribed) */
for (int i = 0; i < num_io_handle_events; ++i) {
struct handle_data *handle_data = io_handle_events[i];
@@ -947,6 +951,8 @@ static void aws_event_loop_thread(void *user_data) {
will not be run. That's ok, we'll handle them next time around. */
AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: running scheduled tasks.", (void *)event_loop);
aws_task_scheduler_run_all(&impl->thread_data.scheduler, now_ns);
/* Exit the deferment boundary so that all deferred tasks will be executed in the next scheduler run. */
aws_task_scheduler_exit_deferment_boundary(&impl->thread_data.scheduler);

/* Set timeout for next kevent() call.
* If clock fails, or scheduler has no tasks, use default timeout */
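
Note for reviewers: the enter/exit calls above belong to the deferment API this PR relies on, and the scheduler-side implementation is not part of this diff. The sketch below is only an illustration of the semantics being assumed here, using made-up struct and function names (sketch_*): while a boundary is open, "schedule now" requests are parked instead of becoming immediately runnable, and exiting the boundary releases them for the next run.

/* Illustrative sketch only; the real aws_task_scheduler fields and function names may differ. */
#include <stdbool.h>
#include <aws/common/linked_list.h>

struct sketch_task {
    void (*fn)(void *arg);
    void *arg;
    struct aws_linked_list_node node;
};

struct sketch_scheduler {
    struct aws_linked_list runnable; /* tasks eligible for the current run */
    struct aws_linked_list deferred; /* tasks scheduled while a boundary is open */
    bool in_deferment_boundary;
};

static void sketch_scheduler_init(struct sketch_scheduler *s) {
    aws_linked_list_init(&s->runnable);
    aws_linked_list_init(&s->deferred);
    s->in_deferment_boundary = false;
}

static void sketch_enter_deferment_boundary(struct sketch_scheduler *s) {
    s->in_deferment_boundary = true;
}

static void sketch_schedule_now(struct sketch_scheduler *s, struct sketch_task *task) {
    /* While IO callbacks run inside the boundary, park the task so this tick's run cannot pick it up. */
    struct aws_linked_list *target = s->in_deferment_boundary ? &s->deferred : &s->runnable;
    aws_linked_list_push_back(target, &task->node);
}

static void sketch_run_all(struct sketch_scheduler *s) {
    while (!aws_linked_list_empty(&s->runnable)) {
        struct aws_linked_list_node *n = aws_linked_list_pop_front(&s->runnable);
        struct sketch_task *task = AWS_CONTAINER_OF(n, struct sketch_task, node);
        task->fn(task->arg);
    }
}

static void sketch_exit_deferment_boundary(struct sketch_scheduler *s) {
    s->in_deferment_boundary = false;
    /* Everything parked during this tick becomes runnable for the next tick. */
    while (!aws_linked_list_empty(&s->deferred)) {
        aws_linked_list_push_back(&s->runnable, aws_linked_list_pop_front(&s->deferred));
    }
}

With those semantics, the per-tick ordering used in each event loop (enter boundary, run IO callbacks, run_all, exit boundary) means a task scheduled from an IO callback always waits until the following tick.
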
42 changes: 35 additions & 7 deletions source/linux/epoll_event_loop.c
@@ -100,6 +100,8 @@ struct epoll_event_data {
aws_event_loop_on_event_fn *on_event;
void *user_data;
struct aws_task cleanup_task;
int event_type_mask;
struct aws_linked_list_node node;
bool is_subscribed; /* false when handle is unsubscribed, but this struct hasn't been cleaned up yet */
};

@@ -407,6 +409,7 @@ static int s_subscribe_to_io_events(
epoll_event_data->handle = handle;
epoll_event_data->on_event = on_event;
epoll_event_data->is_subscribed = true;
aws_linked_list_node_reset(&epoll_event_data->node);

/*everyone is always registered for edge-triggered, hang up, remote hang up, errors. */
uint32_t event_mask = EPOLLET | EPOLLHUP | EPOLLRDHUP | EPOLLERR;
@@ -615,37 +618,59 @@ static void aws_event_loop_thread(void *args) {

AWS_LOGF_TRACE(
AWS_LS_IO_EVENT_LOOP, "id=%p: wake up with %d events to process.", (void *)event_loop, event_count);

struct aws_linked_list deduped_events;
aws_linked_list_init(&deduped_events);

for (int i = 0; i < event_count; ++i) {
struct epoll_event_data *event_data = (struct epoll_event_data *)events[i].data.ptr;

int event_mask = 0;
/* only enqueue this handle once; this handles the case where the same fd has multiple events on it. */
if (event_data->node.next == NULL) {
event_data->event_type_mask = 0;
aws_linked_list_push_back(&deduped_events, &event_data->node);
}

if (events[i].events & EPOLLIN) {
event_mask |= AWS_IO_EVENT_TYPE_READABLE;
event_data->event_type_mask |= AWS_IO_EVENT_TYPE_READABLE;
}

if (events[i].events & EPOLLOUT) {
event_mask |= AWS_IO_EVENT_TYPE_WRITABLE;
event_data->event_type_mask |= AWS_IO_EVENT_TYPE_WRITABLE;
}

if (events[i].events & EPOLLRDHUP) {
event_mask |= AWS_IO_EVENT_TYPE_REMOTE_HANG_UP;
event_data->event_type_mask |= AWS_IO_EVENT_TYPE_REMOTE_HANG_UP;
}

if (events[i].events & EPOLLHUP) {
event_mask |= AWS_IO_EVENT_TYPE_CLOSED;
event_data->event_type_mask |= AWS_IO_EVENT_TYPE_CLOSED;
}

if (events[i].events & EPOLLERR) {
event_mask |= AWS_IO_EVENT_TYPE_ERROR;
event_data->event_type_mask |= AWS_IO_EVENT_TYPE_ERROR;
}
}

/* Enter the deferment boundary before handling IO events so that tasks scheduled while handling
 * the IO are not executed immediately in this tick's scheduler run. */
aws_task_scheduler_enter_deferment_boundary(&epoll_loop->scheduler);

/* each entry in deduped_events is now unique per fd */
while (!aws_linked_list_empty(&deduped_events)) {
struct aws_linked_list_node *deduped_node = aws_linked_list_pop_front(&deduped_events);
struct epoll_event_data *event_data = AWS_CONTAINER_OF(deduped_node, struct epoll_event_data, node);
aws_linked_list_node_reset(&event_data->node);

if (event_data->is_subscribed) {
AWS_LOGF_TRACE(
AWS_LS_IO_EVENT_LOOP,
"id=%p: activity on fd %d, invoking handler.",
(void *)event_loop,
event_data->handle->data.fd);
event_data->on_event(event_loop, event_data->handle, event_mask, event_data->user_data);
event_data->on_event(
event_loop, event_data->handle, event_data->event_type_mask, event_data->user_data);
event_data->event_type_mask = 0;
}
}

@@ -658,6 +683,9 @@ static void aws_event_loop_thread(void *args) {
AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: running scheduled tasks.", (void *)event_loop);
aws_task_scheduler_run_all(&epoll_loop->scheduler, now_ns);

/* Exit the deferment boundary so that all deferred tasks will be executed in the next scheduler run. */
aws_task_scheduler_exit_deferment_boundary(&epoll_loop->scheduler);

/* set timeout for next epoll_wait() call.
* if clock fails, or scheduler has no tasks, use default timeout */
bool use_default_timeout = false;
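
One non-obvious piece of the epoll change above is how deduplication works: the intrusive list node embedded in epoll_event_data doubles as a membership flag (node.next == NULL means "not yet enqueued this tick"), so an fd that shows up in several epoll_event entries gets a single callback with an OR-ed event mask. The sketch below restates that pattern in isolation with hypothetical names; it is not the code from the diff.

#include <aws/common/linked_list.h>

struct sketch_handle {
    int fd;
    int event_type_mask;              /* accumulated AWS_IO_EVENT_TYPE_* bits for this tick */
    struct aws_linked_list_node node; /* next == NULL means "not currently in the deduped list" */
};

static void sketch_accumulate(struct aws_linked_list *deduped, struct sketch_handle *handle, int event_bits) {
    if (handle->node.next == NULL) {
        /* first time we see this fd this tick: reset the mask and enqueue it exactly once */
        handle->event_type_mask = 0;
        aws_linked_list_push_back(deduped, &handle->node);
    }
    handle->event_type_mask |= event_bits;
}

static void sketch_dispatch(struct aws_linked_list *deduped) {
    while (!aws_linked_list_empty(deduped)) {
        struct aws_linked_list_node *n = aws_linked_list_pop_front(deduped);
        struct sketch_handle *handle = AWS_CONTAINER_OF(n, struct sketch_handle, node);
        /* reset so next tick's "node.next == NULL" check works again */
        aws_linked_list_node_reset(&handle->node);
        /* ... invoke the handle's on_event callback with handle->event_type_mask here ... */
        handle->event_type_mask = 0;
    }
}
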
18 changes: 14 additions & 4 deletions source/socket_channel_handler.c
@@ -120,7 +120,7 @@ static void s_on_readable_notification(struct aws_socket *socket, int error_code
* or schedule a task to enforce fairness for other sockets in the event loop if we read up to the max
* read per event loop tick.
*/
static void s_do_read(struct socket_handler *socket_handler) {
static void s_do_read(struct socket_handler *socket_handler, bool from_task) {

size_t downstream_window = aws_channel_slot_downstream_read_window(socket_handler->slot);
size_t max_to_read =
@@ -134,6 +134,16 @@ static void s_do_read(struct socket_handler *socket_handler) {
(unsigned long long)max_to_read);

if (max_to_read == 0) {
if (from_task) {
AWS_LOGF_TRACE(
AWS_LS_IO_SOCKET_HANDLER,
"id=%p: do_read call was called from a task, but we've exceeded the available channel window."
"Scheduling another read task to avoid missing edge-triggers",
(void *)socket_handler->slot->handler);
aws_channel_task_init(
&socket_handler->read_task_storage, s_read_task, socket_handler, "socket_handler_re_read");
aws_channel_schedule_task_now(socket_handler->slot->channel, &socket_handler->read_task_storage);
}
return;
}

@@ -218,7 +228,7 @@ static void s_on_readable_notification(struct aws_socket *socket, int error_code
* then immediately closes the socket. On some platforms, we'll never see the readable flag. So we want to make
* sure we read the ALERT, otherwise, we'll end up telling the user that the channel shutdown because of a socket
* closure, when in reality it was a TLS error */
s_do_read(socket_handler);
s_do_read(socket_handler, false);

if (error_code && !socket_handler->shutdown_in_progress) {
aws_channel_shutdown(socket_handler->slot->channel, error_code);
@@ -232,7 +242,7 @@ static void s_read_task(struct aws_channel_task *task, void *arg, aws_task_statu

if (status == AWS_TASK_STATUS_RUN_READY) {
struct socket_handler *socket_handler = arg;
s_do_read(socket_handler);
s_do_read(socket_handler, true);
}
}

@@ -358,7 +368,7 @@ static void s_gather_statistics(struct aws_channel_handler *handler, struct aws_
static void s_trigger_read(struct aws_channel_handler *handler) {
struct socket_handler *socket_handler = (struct socket_handler *)handler->impl;

s_do_read(socket_handler);
s_do_read(socket_handler, false);
}

static struct aws_channel_handler_vtable s_vtable = {
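
The from_task flag added above matters because the sockets are edge-triggered: readability is reported once, so if a read task drains nothing because the channel window is closed, nothing will re-notify the handler about the data already buffered in the kernel. The sketch below restates that control flow with hypothetical stand-ins for the real handler types; in the actual code the re-arm is done with aws_channel_task_init and aws_channel_schedule_task_now.

#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for the real socket handler pieces. */
struct sketch_socket_handler {
    size_t downstream_window; /* bytes the downstream slot can currently accept */
    bool read_task_scheduled; /* stands in for the real channel task scheduling */
};

static void sketch_schedule_read_task(struct sketch_socket_handler *handler) {
    handler->read_task_scheduled = true;
}

static void sketch_do_read(struct sketch_socket_handler *handler, bool from_task) {
    size_t max_to_read = handler->downstream_window;
    if (max_to_read == 0) {
        if (from_task) {
            /* Edge-triggered IO only notifies once; re-arm ourselves so buffered
             * data is not stranded when the window reopens. */
            sketch_schedule_read_task(handler);
        }
        return;
    }
    /* ... read up to max_to_read bytes from the socket and pass them downstream ... */
}
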
7 changes: 7 additions & 0 deletions source/windows/iocp/iocp_event_loop.c
@@ -699,6 +699,10 @@ static void aws_event_loop_thread(void *user_data) {

aws_event_loop_register_tick_start(event_loop);

/* Enter the deferment boundary before handling IO events so that tasks scheduled while handling
 * the IO are not executed immediately in this tick's scheduler run. */
aws_task_scheduler_enter_deferment_boundary(&impl->thread_data.scheduler);

if (has_completion_entries) {
AWS_LOGF_TRACE(
AWS_LS_IO_EVENT_LOOP,
@@ -746,6 +750,9 @@ static void aws_event_loop_thread(void *user_data) {
AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: running scheduled tasks.", (void *)event_loop);
aws_task_scheduler_run_all(&impl->thread_data.scheduler, now_ns);

/* Exit the deferment boundary so that all deferred tasks will be executed in the next scheduler run. */
aws_task_scheduler_exit_deferment_boundary(&impl->thread_data.scheduler);

/* Set timeout for next GetQueuedCompletionStatus() call.
* If clock fails, or scheduler has no tasks, use default timeout */
bool use_default_timeout = false;