diff --git a/src/io_posix.c b/src/io_posix.c index 33517354..74aed173 100644 --- a/src/io_posix.c +++ b/src/io_posix.c @@ -104,6 +104,8 @@ udx__on_writable (udx_socket_t *socket) { #ifdef UDX_PLATFORM_HAS_SENDMMSG bool finished = false; + assert((socket->status & UDX_SOCKET_CLOSING_HANDLES) == 0); + while (!finished) { udx_packet_t *batch[UDX_SENDMMSG_BATCH_SIZE]; struct mmsghdr h[UDX_SENDMMSG_BATCH_SIZE]; @@ -180,7 +182,7 @@ udx__on_writable (udx_socket_t *socket) { udx__confirm_packet(batch[i]); } - if (rc == UV_EAGAIN) { + if (rc == UV_EAGAIN || socket->status & UDX_SOCKET_CLOSING_HANDLES) { finished = true; } } @@ -209,6 +211,9 @@ udx__on_writable (udx_socket_t *socket) { // todo: set in confirm packet with uv_now() pkt->time_sent = uv_now(socket->udx->loop); udx__confirm_packet(pkt); + if (socket->status & UDX_SOCKET_CLOSING_HANDLES) { + break; + } } #endif } diff --git a/src/io_win.c b/src/io_win.c index a9e8f75b..a541156b 100644 --- a/src/io_win.c +++ b/src/io_win.c @@ -85,6 +85,7 @@ udx__recvmsg (udx_socket_t *socket, uv_buf_t *buf, struct sockaddr *addr, int ad void udx__on_writable (udx_socket_t *socket) { + assert((socket->status & UDX_SOCKET_CLOSING_HANDLES) == 0); while (true) { udx_packet_t *pkt = udx__shift_packet(socket); if (pkt == NULL) break; @@ -108,5 +109,8 @@ udx__on_writable (udx_socket_t *socket) { } pkt->time_sent = uv_now(socket->udx->loop); udx__confirm_packet(pkt); + if (socket->status & UDX_SOCKET_CLOSING_HANDLES) { + break; + } } } diff --git a/src/udx.c b/src/udx.c index 365ec0a8..01b13840 100644 --- a/src/udx.c +++ b/src/udx.c @@ -1598,7 +1598,7 @@ on_uv_poll (uv_poll_t *handle, int status, int events) { buf.base = (char *) &b; buf.len = 2048; - while ((size = udx__recvmsg(socket, &buf, (struct sockaddr *) &addr, addr_len)) >= 0) { + while (!(socket->status & UDX_SOCKET_CLOSING_HANDLES) && (size = udx__recvmsg(socket, &buf, (struct sockaddr *) &addr, addr_len)) >= 0) { if (!process_packet(socket, b, size, (struct sockaddr *) &addr) && socket->on_recv != NULL) { buf.len = size; @@ -1613,7 +1613,7 @@ on_uv_poll (uv_poll_t *handle, int status, int events) { } } - if (events & UV_WRITABLE) { + if (events & UV_WRITABLE && !(socket->status & UDX_SOCKET_CLOSING_HANDLES)) { if (events & UV_READABLE) { // compensate for potentially long-running read callbacks uv_update_time(handle->loop);