Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

iocp: fix crash, GetQueuedCompletionStatus() write freed WSAOVERLAPPED memory #4136

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 124 additions & 25 deletions pjlib/src/pj/ioqueue_winnt.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
#include <pj/errno.h>
#include <pj/compat/socket.h>

#define THIS_FILE "iocp"

typedef BOOL (WINAPI *FnCancelIoEx)(HANDLE hFile, LPOVERLAPPED lpOverlapped);
static FnCancelIoEx fnCancelIoEx = NULL;

#if defined(PJ_HAS_WINSOCK2_H) && PJ_HAS_WINSOCK2_H != 0
# include <winsock2.h>
Expand Down Expand Up @@ -92,6 +96,14 @@ union operation_key
#endif
};

struct pending_op
{
PJ_DECL_LIST_MEMBER(struct pending_op);
union operation_key pending_key;
pj_ioqueue_op_key_t *op_key;
pj_pool_t *pool;
};

/* Type of handle in the key. */
enum handle_type
{
Expand Down Expand Up @@ -125,16 +137,18 @@ struct pj_ioqueue_key_t
pj_atomic_t *ref_count;
pj_bool_t closing;
pj_time_val free_time;
pj_mutex_t *mutex;
#endif

pj_mutex_t *mutex;
struct pending_op pending_list;
};

/*
* IO Queue structure.
*/
struct pj_ioqueue_t
{
pj_pool_t *pool;
pj_ioqueue_cfg cfg;
HANDLE iocp;
pj_lock_t *lock;
Expand Down Expand Up @@ -375,6 +389,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_create2(pj_pool_t *pool,

/* Create IOCP */
ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue));
ioqueue->pool = pool;
if (cfg)
pj_memcpy(&ioqueue->cfg, cfg, sizeof(*cfg));
else
Expand Down Expand Up @@ -409,7 +424,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_create2(pj_pool_t *pool,
pj_ioqueue_key_t *key;

key = pj_pool_alloc(pool, sizeof(pj_ioqueue_key_t));

pj_list_init(&key->pending_list);
rc = pj_atomic_create(pool, 0, &key->ref_count);
if (rc != PJ_SUCCESS) {
key = ioqueue->free_list.next;
Expand Down Expand Up @@ -577,6 +592,8 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock2(pj_pool_t *pool,

#else
rec = (pj_ioqueue_key_t *)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
pj_mutex_create_recursive(pool, "ioqkey", rec->mutex);
pj_list_init(&rec->pending_list);
#endif

/* Build the key for this socket. */
Expand Down Expand Up @@ -698,6 +715,49 @@ static void decrement_counter(pj_ioqueue_key_t *key)
}
#endif

static struct pending_op *alloc_pending_op(pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key)
{
pj_pool_t *pool;
struct pending_op *op;

pool = pj_pool_create(key->ioqueue->pool->factory, "pending_op%p", 512, 512, NULL);
PJ_ASSERT_RETURN(pool, NULL);
op = PJ_POOL_ZALLOC_T(pool, struct pending_op);
op->pool = pool;
op->op_key = op_key;

pj_mutex_lock(key->mutex);
pj_list_push_back(&key->pending_list, op);
pj_mutex_unlock(key->mutex);
return op;
}

static void release_pending_op(pj_ioqueue_key_t *key, struct pending_op *op)
{
if (!key || !op)
return;
pj_mutex_lock(key->mutex);
pj_list_erase(op);
pj_mutex_unlock(key->mutex);
pj_pool_release(op->pool);
}

static void cancel_all_pending_op(pj_ioqueue_key_t *key)
{
if (!fnCancelIoEx)
fnCancelIoEx = (FnCancelIoEx)GetProcAddress(GetModuleHandle(PJ_T("Kernel32.dll")), "CancelIoEx");
if (fnCancelIoEx) {
struct pending_op *op;
pj_mutex_lock(key->mutex);
for (op = key->pending_list.next; op != &key->pending_list; op = op->next) {
BOOL rc = fnCancelIoEx(key->ioqueue->iocp, (LPOVERLAPPED)&op->pending_key);
if (rc)
PJ_PERROR(2, (THIS_FILE, PJ_RETURN_OS_ERROR(GetLastError()), "cancel io error"));
}
pj_mutex_unlock(key->mutex);
}
}

/*
* Poll the I/O Completion Port, execute callback,
* and return the key and bytes transferred of the last operation.
Expand All @@ -711,11 +771,15 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout,
pj_ioqueue_key_t *key;
pj_ssize_t size_status = -1;
BOOL rcGetQueued;
struct pending_op *xpending_op = NULL;
pj_ioqueue_op_key_t *op_key = NULL;

/* Poll for completion status. */
rcGetQueued = GetQueuedCompletionStatus(hIocp, &dwBytesTransferred,
&dwKey, (OVERLAPPED**)&pOv,
dwTimeout);
if (!rcGetQueued && pOv)
PJ_PERROR(4, (THIS_FILE, PJ_STATUS_FROM_OS(GetLastError()), "GetQueuedCompletionStatus() error dwKey:%p, pOv:%p", (void *)dwKey, pOv));

/* The return value is:
* - nonzero if event was dequeued.
Expand All @@ -735,10 +799,26 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout,
if (p_key)
*p_key = key;

switch(pOv->operation)
{
case PJ_IOQUEUE_OP_RECV:
case PJ_IOQUEUE_OP_RECV_FROM:
case PJ_IOQUEUE_OP_SEND:
case PJ_IOQUEUE_OP_SEND_TO:
case PJ_IOQUEUE_OP_ACCEPT:
xpending_op = (struct pending_op *)((char *)pOv - offsetof(struct pending_op, pending_key));
op_key = xpending_op->op_key;
break;
default:
break;
}

#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* We shouldn't call callbacks if key is quitting. */
if (key->closing)
if (key->closing) {
release_pending_op(key, xpending_op);
return PJ_TRUE;
}

/* If concurrency is disabled, lock the key
* (and save the lock status to local var since app may change
Expand All @@ -755,6 +835,7 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout,
if (has_lock) {
pj_mutex_unlock(key->mutex);
}
release_pending_op(key, xpending_op);
return PJ_TRUE;
}

Expand All @@ -773,16 +854,14 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout,
case PJ_IOQUEUE_OP_RECV_FROM:
pOv->operation = 0;
if (key->cb.on_read_complete)
key->cb.on_read_complete(key, (pj_ioqueue_op_key_t*)pOv,
size_status);
key->cb.on_read_complete(key, op_key, size_status);
break;
case PJ_IOQUEUE_OP_WRITE:
case PJ_IOQUEUE_OP_SEND:
case PJ_IOQUEUE_OP_SEND_TO:
pOv->operation = 0;
if (key->cb.on_write_complete)
key->cb.on_write_complete(key, (pj_ioqueue_op_key_t*)pOv,
size_status);
key->cb.on_write_complete(key, op_key, size_status);
break;
#if PJ_HAS_TCP
case PJ_IOQUEUE_OP_ACCEPT:
Expand All @@ -802,9 +881,7 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout,
status = PJ_RETURN_OS_ERROR(dwError);
}

key->cb.on_accept_complete(key, (pj_ioqueue_op_key_t*)pOv,
newsock, status);

key->cb.on_accept_complete(key, op_key, newsock, status);
}
break;
case PJ_IOQUEUE_OP_CONNECT:
Expand All @@ -820,6 +897,7 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout,
pj_mutex_unlock(key->mutex);
#endif

release_pending_op(key, xpending_op);
return PJ_TRUE;
}

Expand Down Expand Up @@ -874,7 +952,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
#else
PJ_UNUSED_ARG(has_lock);
#endif


/* Cancel all penging I/O operations in order to free all pending memory */
cancel_all_pending_op(key);

/* Close handle (the only way to disassociate handle from IOCP).
* We also need to close handle to make sure that no further events
* will come to the handle.
Expand Down Expand Up @@ -1028,6 +1109,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
DWORD bytesRead;
DWORD dwFlags = 0;
union operation_key *op_key_rec;
struct pending_op *op;

PJ_CHECK_STACK();
PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
Expand Down Expand Up @@ -1062,6 +1144,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
}
}

op = alloc_pending_op(key, op_key);
memcpy(&op->pending_key, op_key_rec, sizeof(union operation_key));
op_key_rec = &op->pending_key;

dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

/*
Expand All @@ -1079,6 +1165,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
DWORD dwStatus = WSAGetLastError();
if (dwStatus!=WSA_IO_PENDING) {
*length = -1;
release_pending_op(key, op);
return PJ_STATUS_FROM_OS(dwStatus);
}
}
Expand All @@ -1104,6 +1191,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
DWORD bytesRead;
DWORD dwFlags = 0;
union operation_key *op_key_rec;
struct pending_op *op;

PJ_CHECK_STACK();
PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL);
Expand Down Expand Up @@ -1138,6 +1226,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
}
}

op = alloc_pending_op(key, op_key);
memcpy(&op->pending_key, op_key_rec, sizeof(union operation_key));
op_key_rec = &op->pending_key;

dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

/*
Expand All @@ -1155,10 +1247,11 @@ PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
DWORD dwStatus = WSAGetLastError();
if (dwStatus!=WSA_IO_PENDING) {
*length = -1;
release_pending_op(key, op);
return PJ_STATUS_FROM_OS(dwStatus);
}
}

/* Pending operation has been scheduled. */
return PJ_EPENDING;
}
Expand Down Expand Up @@ -1195,6 +1288,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
DWORD bytesWritten;
DWORD dwFlags;
union operation_key *op_key_rec;
struct pending_op *op;

PJ_CHECK_STACK();
PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL);
Expand Down Expand Up @@ -1231,6 +1325,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
}
}

op = alloc_pending_op(key, op_key);
memcpy(&op->pending_key, op_key_rec, sizeof(union operation_key));
op_key_rec = &op->pending_key;

dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

/*
Expand All @@ -1246,8 +1344,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
&op_key_rec->overlapped.overlapped, NULL);
if (rc == SOCKET_ERROR) {
DWORD dwStatus = WSAGetLastError();
if (dwStatus!=WSA_IO_PENDING)
if (dwStatus!=WSA_IO_PENDING) {
release_pending_op(key, op);
return PJ_STATUS_FROM_OS(dwStatus);
}
}

/* Asynchronous operation successfully submitted. */
Expand All @@ -1273,6 +1373,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
pj_status_t status;
union operation_key *op_key_rec;
SOCKET sock;
struct pending_op *op;

PJ_CHECK_STACK();
PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
Expand Down Expand Up @@ -1323,12 +1424,15 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
* No connection is immediately available.
* Must schedule an asynchronous operation.
*/
op_key_rec = (union operation_key*)op_key->internal__;

op = alloc_pending_op(key, op_key);
op_key_rec = &op->pending_key;

status = pj_sock_socket(pj_AF_INET(), pj_SOCK_STREAM(), 0,
&op_key_rec->accept.newsock);
if (status != PJ_SUCCESS)
if (status != PJ_SUCCESS) {
release_pending_op(key, op);
return status;
}

op_key_rec->accept.operation = PJ_IOQUEUE_OP_ACCEPT;
op_key_rec->accept.addrlen = addrlen;
Expand All @@ -1346,11 +1450,14 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,

if (rc == TRUE) {
ioqueue_on_accept_complete(key, &op_key_rec->accept);
release_pending_op(key, op);
return PJ_SUCCESS;
} else {
DWORD dwStatus = WSAGetLastError();
if (dwStatus!=WSA_IO_PENDING)
if (dwStatus!=WSA_IO_PENDING) {
release_pending_op(key, op);
return PJ_STATUS_FROM_OS(dwStatus);
}
}

/* Asynchronous Accept() has been submitted. */
Expand Down Expand Up @@ -1493,20 +1600,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key,

PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key)
{
#if PJ_IOQUEUE_HAS_SAFE_UNREG
return pj_mutex_lock(key->mutex);
#else
PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP);
#endif
}

PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key)
{
#if PJ_IOQUEUE_HAS_SAFE_UNREG
return pj_mutex_unlock(key->mutex);
#else
PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP);
#endif
}

PJ_DEF(pj_oshandle_t) pj_ioqueue_get_os_handle( pj_ioqueue_t *ioqueue )
Expand Down
Loading