Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug where last few bytes on socket go unread #642

Merged
merged 18 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 17 additions & 13 deletions source/posix/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -1668,6 +1668,23 @@ static void s_on_socket_io_event(
* subscribed is set to false. */
aws_ref_count_acquire(&socket_impl->internal_refcount);

/* NOTE: READABLE|WRITABLE|HANG_UP events might arrive simultaneously
* (e.g. peer sends last few bytes and immediately hangs up).
* Notify user of READABLE|WRITABLE events first, so they try to read any remaining bytes. */

if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_READABLE) {
AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: is readable", (void *)socket, socket->io_handle.data.fd);
if (socket->readable_fn) {
socket->readable_fn(socket, AWS_OP_SUCCESS, socket->readable_user_data);
}
}
/* if socket closed in between these branches, the currently_subscribed will be false and socket_impl will not
* have been cleaned up, so this next branch is safe. */
if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_WRITABLE) {
AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: is writable", (void *)socket, socket->io_handle.data.fd);
s_process_socket_write_requests(socket, NULL);
}

if (events & AWS_IO_EVENT_TYPE_REMOTE_HANG_UP || events & AWS_IO_EVENT_TYPE_CLOSED) {
aws_raise_error(AWS_IO_SOCKET_CLOSED);
AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: closed remotely", (void *)socket, socket->io_handle.data.fd);
Expand All @@ -1688,19 +1705,6 @@ static void s_on_socket_io_event(
goto end_check;
}

if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_READABLE) {
AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: is readable", (void *)socket, socket->io_handle.data.fd);
if (socket->readable_fn) {
socket->readable_fn(socket, AWS_OP_SUCCESS, socket->readable_user_data);
}
}
/* if socket closed in between these branches, the currently_subscribed will be false and socket_impl will not
* have been cleaned up, so this next branch is safe. */
if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_WRITABLE) {
AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: is writable", (void *)socket, socket->io_handle.data.fd);
s_process_socket_write_requests(socket, NULL);
}

end_check:
aws_ref_count_release(&socket_impl->internal_refcount);
}
Expand Down
37 changes: 21 additions & 16 deletions source/socket_channel_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ static void s_on_readable_notification(struct aws_socket *socket, int error_code
*/
static void s_do_read(struct socket_handler *socket_handler) {

if (socket_handler->shutdown_in_progress) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function didn't have any bugs. I'm just simplifying the code.

When this code was written, shutdown_in_progress could change half-way through the function, but since 2019 shutdown is always queued, so moving this check to the top of the function, instead of having it at 3 separate parts of a complicated flow

return;
}

size_t downstream_window = aws_channel_slot_downstream_read_window(socket_handler->slot);
size_t max_to_read =
downstream_window > socket_handler->max_rw_size ? socket_handler->max_rw_size : downstream_window;
Expand All @@ -139,17 +143,20 @@ static void s_do_read(struct socket_handler *socket_handler) {

size_t total_read = 0;
size_t read = 0;
while (total_read < max_to_read && !socket_handler->shutdown_in_progress) {
int last_error = 0;
while (total_read < max_to_read) {
size_t iter_max_read = max_to_read - total_read;

struct aws_io_message *message = aws_channel_acquire_message_from_pool(
socket_handler->slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, iter_max_read);

if (!message) {
last_error = aws_last_error();
break;
}

if (aws_socket_read(socket_handler->socket, &message->message_data, &read)) {
last_error = aws_last_error();
aws_mem_release(message->allocator, message);
break;
}
Expand All @@ -162,6 +169,7 @@ static void s_do_read(struct socket_handler *socket_handler) {
(unsigned long long)read);

if (aws_channel_slot_send_message(socket_handler->slot, message, AWS_CHANNEL_DIR_READ)) {
last_error = aws_last_error();
aws_mem_release(message->allocator, message);
break;
}
Expand All @@ -170,30 +178,29 @@ static void s_do_read(struct socket_handler *socket_handler) {
AWS_LOGF_TRACE(
AWS_LS_IO_SOCKET_HANDLER,
"id=%p: total read on this tick %llu",
(void *)&socket_handler->slot->handler,
(void *)socket_handler->slot->handler,
(unsigned long long)total_read);

socket_handler->stats.bytes_read += total_read;

/* resubscribe as long as there's no error, just return if we're in a would block scenario. */
if (total_read < max_to_read) {
int last_error = aws_last_error();
AWS_ASSERT(last_error != 0);

if (last_error != AWS_IO_READ_WOULD_BLOCK && !socket_handler->shutdown_in_progress) {
if (last_error != AWS_IO_READ_WOULD_BLOCK) {
aws_channel_shutdown(socket_handler->slot->channel, last_error);
} else {
AWS_LOGF_TRACE(
AWS_LS_IO_SOCKET_HANDLER,
"id=%p: out of data to read on socket. "
"Waiting on event-loop notification.",
(void *)socket_handler->slot->handler);
}

AWS_LOGF_TRACE(
AWS_LS_IO_SOCKET_HANDLER,
"id=%p: out of data to read on socket. "
"Waiting on event-loop notification.",
(void *)socket_handler->slot->handler);
return;
}
/* in this case, everything was fine, but there's still pending reads. We need to schedule a task to do the read
* again. */
if (!socket_handler->shutdown_in_progress && total_read == socket_handler->max_rw_size &&
!socket_handler->read_task_storage.task_fn) {
if (total_read == socket_handler->max_rw_size && !socket_handler->read_task_storage.task_fn) {

AWS_LOGF_TRACE(
AWS_LS_IO_SOCKET_HANDLER,
Expand All @@ -210,6 +217,8 @@ static void s_do_read(struct socket_handler *socket_handler) {
* If an error, start the channel shutdown process. */
static void s_on_readable_notification(struct aws_socket *socket, int error_code, void *user_data) {
(void)socket;
/* TODO: explain */
(void)error_code;

struct socket_handler *socket_handler = user_data;
AWS_LOGF_TRACE(AWS_LS_IO_SOCKET_HANDLER, "id=%p: socket is now readable", (void *)socket_handler->slot->handler);
Expand All @@ -219,10 +228,6 @@ static void s_on_readable_notification(struct aws_socket *socket, int error_code
* sure we read the ALERT, otherwise, we'll end up telling the user that the channel shutdown because of a socket
* closure, when in reality it was a TLS error */
s_do_read(socket_handler);

if (error_code && !socket_handler->shutdown_in_progress) {
aws_channel_shutdown(socket_handler->slot->channel, error_code);
}
Comment on lines -223 to -225
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only line that truly HAD to change to fix this bug

}

/* Either the result of a context switch (for fairness in the event loop), or a window update. */
Expand Down
3 changes: 2 additions & 1 deletion source/windows/iocp/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,7 @@ static int s_determine_socket_error(int error) {
case IO_STATUS_TIMEOUT:
return AWS_IO_SOCKET_TIMEOUT;
case IO_PIPE_BROKEN:
case ERROR_BROKEN_PIPE:
return AWS_IO_SOCKET_CLOSED;
case STATUS_INVALID_ADDRESS_COMPONENT:
case WSAEADDRNOTAVAIL:
Expand Down Expand Up @@ -2970,7 +2971,7 @@ static int s_tcp_read(struct aws_socket *socket, struct aws_byte_buf *buffer, si

AWS_LOGF_ERROR(
AWS_LS_IO_SOCKET,
"id=%p handle=%p: ReadFile() failed with error %d",
"id=%p handle=%p: recv() failed with error %d",
(void *)socket,
(void *)socket->io_handle.data.handle,
error);
Expand Down
22 changes: 22 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,28 @@ add_test_case(pem_sanitize_wrong_format_rejected)

add_test_case(socket_handler_echo_and_backpressure)
add_test_case(socket_handler_close)
add_test_case(socket_handler_read_to_eof_after_peer_hangup)
# These tests fail on Windows due to some bug in our server code where, if the socket is closed
# immediately after data is written, that data does not flush cleanly to the client.
# I've lost days to this bug, and no one is using our Windows server funcionality,
# so disabling these tests on Windows and moving along for now.
# I tried the following:
# 1) Wrote 2 simple standalone Windows programs, server and client, using simple synchronous socket code.
# WORKED PERFECTLY. So it's not a fundamental issue with Windows.
# 2) Commented out server part of this failing test, and used the simple standalone server instead.
# WORKED PERFECTLY. So it's not a problem with our actual client code.
# 3) Copy/pasted the simple standlone server code into this test, and used that instead of our actual server code.
# WORKED PERFECTLY. So it's not a problem with the server and client sockets being in the same process.
# 4) Commented out the client part of this failing test, and used the simple standalone client instead.
# FAILED. The standalone client got WSAECONNRESET (Connection reset by peer) before receiving all the data.
# So it's something with our complicated non-blocking server code.
# The last interesting thing I noticed before giving up was: we call shutdown() immediately
# before calling closesocket() but shutdown() gets error WSAENOTCONN, even
# though, at that moment, the socket should be connected just fine.
if(NOT WIN32)
add_net_test_case(socket_handler_ipv4_read_to_eof_after_peer_hangup)
add_net_test_case(socket_handler_ipv6_read_to_eof_after_peer_hangup)
endif()
add_test_case(socket_pinned_event_loop)
add_net_test_case(socket_pinned_event_loop_dns_failure)

Expand Down
50 changes: 34 additions & 16 deletions tests/read_write_test_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -190,42 +190,60 @@ struct rw_handler_write_task_args {
struct aws_channel_slot *slot;
struct aws_byte_buf *buffer;
struct aws_channel_task task;
aws_channel_on_message_write_completed_fn *on_completion;
void *user_data;
};

static void s_rw_handler_write_task(struct aws_channel_task *task, void *arg, enum aws_task_status task_status) {
(void)task;
(void)task_status;
struct rw_handler_write_task_args *write_task_args = arg;
static void s_rw_handler_write_now(
struct aws_channel_slot *slot,
struct aws_byte_buf *buffer,
aws_channel_on_message_write_completed_fn *on_completion,
void *user_data) {

struct aws_io_message *msg = aws_channel_acquire_message_from_pool(
write_task_args->slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, write_task_args->buffer->len);
struct aws_io_message *msg =
aws_channel_acquire_message_from_pool(slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, buffer->len);

msg->on_completion = on_completion;
msg->user_data = user_data;

struct aws_byte_cursor write_buffer = aws_byte_cursor_from_buf(write_task_args->buffer);
aws_byte_buf_append(&msg->message_data, &write_buffer);
struct aws_byte_cursor write_buffer = aws_byte_cursor_from_buf(buffer);
AWS_FATAL_ASSERT(aws_byte_buf_append(&msg->message_data, &write_buffer) == AWS_OP_SUCCESS);

aws_channel_slot_send_message(write_task_args->slot, msg, AWS_CHANNEL_DIR_WRITE);
AWS_FATAL_ASSERT(aws_channel_slot_send_message(slot, msg, AWS_CHANNEL_DIR_WRITE) == AWS_OP_SUCCESS);
}

static void s_rw_handler_write_task(struct aws_channel_task *task, void *arg, enum aws_task_status task_status) {
(void)task;
(void)task_status;
struct rw_handler_write_task_args *write_task_args = arg;
s_rw_handler_write_now(
write_task_args->slot, write_task_args->buffer, write_task_args->on_completion, write_task_args->user_data);
aws_mem_release(write_task_args->handler->alloc, write_task_args);
}

void rw_handler_write(struct aws_channel_handler *handler, struct aws_channel_slot *slot, struct aws_byte_buf *buffer) {
rw_handler_write_with_callback(handler, slot, buffer, NULL /*on_completion*/, NULL /*user_data*/);
}

void rw_handler_write_with_callback(
struct aws_channel_handler *handler,
struct aws_channel_slot *slot,
struct aws_byte_buf *buffer,
aws_channel_on_message_write_completed_fn *on_completion,
void *user_data) {

struct rw_test_handler_impl *handler_impl = handler->impl;

if (!handler_impl->event_loop_driven || aws_channel_thread_is_callers_thread(slot->channel)) {
struct aws_io_message *msg =
aws_channel_acquire_message_from_pool(slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, buffer->len);

struct aws_byte_cursor write_buffer = aws_byte_cursor_from_buf(buffer);
aws_byte_buf_append(&msg->message_data, &write_buffer);

aws_channel_slot_send_message(slot, msg, AWS_CHANNEL_DIR_WRITE);
s_rw_handler_write_now(slot, buffer, on_completion, user_data);
} else {
struct rw_handler_write_task_args *write_task_args =
aws_mem_acquire(handler->alloc, sizeof(struct rw_handler_write_task_args));
write_task_args->handler = handler;
write_task_args->buffer = buffer;
write_task_args->slot = slot;
write_task_args->on_completion = on_completion;
write_task_args->user_data = user_data;
aws_channel_task_init(&write_task_args->task, s_rw_handler_write_task, write_task_args, "rw_handler_write");

aws_channel_schedule_task_now(slot->channel, &write_task_args->task);
Expand Down
7 changes: 7 additions & 0 deletions tests/read_write_test_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ void rw_handler_enable_wait_on_destroy(

void rw_handler_write(struct aws_channel_handler *handler, struct aws_channel_slot *slot, struct aws_byte_buf *buffer);

void rw_handler_write_with_callback(
struct aws_channel_handler *handler,
struct aws_channel_slot *slot,
struct aws_byte_buf *buffer,
aws_channel_on_message_write_completed_fn *on_completion,
void *user_data);

void rw_handler_trigger_read(struct aws_channel_handler *handler, struct aws_channel_slot *slot);

bool rw_handler_shutdown_called(struct aws_channel_handler *handler);
Expand Down
Loading
Loading