Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFC: ipcc: Add an async connect API #450

Merged
merged 1 commit into from
Jan 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions include/qb/qbipcc.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,36 @@ typedef struct qb_ipcc_connection qb_ipcc_connection_t;
qb_ipcc_connection_t*
qb_ipcc_connect(const char *name, size_t max_msg_size);

/**
* Asynchronously connect to an IPC service
* @param name name of the service.
* @param max_msg_size biggest msg size.
* @param connect_fd return FD to continue connection with
* @return NULL (error: see errno) or a connection object.
*
* qb_ipcc_connect_async() returns a connection FD which
* should be used added to the application's mainloop - when it is
* active, qb_ipcc_connect_continue() should be called for the
* connection to be finalised.
* NOTE: This is NOT the same FD that is used for normal applicaion
* polling. qb_ipcc_fd_get() must still be called once the connection
* is established.
*/
qb_ipcc_connection_t *
qb_ipcc_connect_async(const char *name, size_t max_msg_size, int *connect_fd);

/**
* Finish up an asynchonous IPC connection
* @param c connection handle as returned from qb_ipcc_connect_async()
* @return 0 or -errno.
*
* Finishes up a connection that was initiated by qb_ipcc_connect_async(),
* this should only be called when the fd returned by qb_ipcc_connect_async()
* becomes active, usually as a callback in the application's main loop.
*/
int
qb_ipcc_connect_continue(struct qb_ipcc_connection * c);

/**
* Test kernel dgram socket buffers to verify the largest size up
* to the max_msg_size value a single msg can be. Rounds down to the
Expand Down
3 changes: 2 additions & 1 deletion lib/ipc_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ struct qb_ipcc_connection {
};

int32_t qb_ipcc_us_setup_connect(struct qb_ipcc_connection *c,
struct qb_ipc_connection_response *r);
struct qb_ipc_connection_response *r);
int qb_ipcc_setup_connect_continue(struct qb_ipcc_connection *c, struct qb_ipc_connection_response *response);
ssize_t qb_ipc_us_send(struct qb_ipc_one_way *one_way, const void *msg, size_t len);
ssize_t qb_ipc_us_recv(struct qb_ipc_one_way *one_way, void *msg, size_t len, int32_t timeout);
int32_t qb_ipc_us_ready(struct qb_ipc_one_way *ow_data, struct qb_ipc_one_way *ow_conn,
Expand Down
16 changes: 13 additions & 3 deletions lib/ipc_setup.c
Original file line number Diff line number Diff line change
Expand Up @@ -446,9 +446,7 @@ qb_ipcc_us_setup_connect(struct qb_ipcc_connection *c,
{
int32_t res;
struct qb_ipc_connection_request request;
struct ipc_auth_data *data;
#ifdef QB_LINUX
int off = 0;
int on = 1;
#endif

Expand All @@ -471,13 +469,24 @@ qb_ipcc_us_setup_connect(struct qb_ipcc_connection *c,
return res;
}

/* ... To be continued ... (when the FD is active) */
return 0;
}

/* Called from ipcc_connect_continue() when async connect socket is active */
int qb_ipcc_setup_connect_continue(struct qb_ipcc_connection *c, struct qb_ipc_connection_response *r)
{
struct ipc_auth_data *data;
int32_t res;
#ifdef QB_LINUX
int off = 0;
#endif
data = init_ipc_auth_data(c->setup.u.us.sock, sizeof(struct qb_ipc_connection_response));
if (data == NULL) {
qb_ipcc_us_sock_close(c->setup.u.us.sock);
return -ENOMEM;
}

qb_ipc_us_ready(&c->setup, NULL, -1, POLLIN);
res = qb_ipc_us_recv_msghdr(data);

#ifdef QB_LINUX
Expand All @@ -498,6 +507,7 @@ qb_ipcc_us_setup_connect(struct qb_ipcc_connection *c,
c->server_pid = data->ugp.pid;

destroy_ipc_auth_data(data);

return r->hdr.error;
}

Expand Down
69 changes: 67 additions & 2 deletions lib/ipcc.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,70 @@ qb_ipcc_connect(const char *name, size_t max_msg_size)
if (res < 0) {
goto disconnect_and_cleanup;
}
qb_ipc_us_ready(&c->setup, NULL, -1, POLLIN);
res = qb_ipcc_connect_continue(c);
if (res != 0) {
/* qb_ipcc_connect_continue() has cleaned up for us */
errno = -res;
return NULL;
}

return c;

disconnect_and_cleanup:
if (c->setup.u.us.sock >= 0) {
qb_ipcc_us_sock_close(c->setup.u.us.sock);
}
free(c->receive_buf);
free(c);
errno = -res;
return NULL;
}

qb_ipcc_connection_t *
qb_ipcc_connect_async(const char *name, size_t max_msg_size, int *connect_fd)
{
int32_t res;
qb_ipcc_connection_t *c = NULL;
struct qb_ipc_connection_response response;

c = calloc(1, sizeof(struct qb_ipcc_connection));
if (c == NULL) {
return NULL;
}

c->setup.max_msg_size = QB_MAX(max_msg_size,
sizeof(struct qb_ipc_connection_response));
(void)strlcpy(c->name, name, NAME_MAX);
res = qb_ipcc_us_setup_connect(c, &response);
if (res < 0) {
goto disconnect_and_cleanup;
}

*connect_fd = c->setup.u.us.sock;
return c;

disconnect_and_cleanup:
if (c->setup.u.us.sock >= 0) {
qb_ipcc_us_sock_close(c->setup.u.us.sock);
}
free(c->receive_buf);
free(c);
errno = -res;
return NULL;
}

int qb_ipcc_connect_continue(struct qb_ipcc_connection * c)
{
struct qb_ipc_connection_response response;
int32_t res;

/* Finish up the authentication part */
res = qb_ipcc_setup_connect_continue(c, &response);
if (res != 0) {
goto disconnect_and_cleanup;
}

c->response.type = response.connection_type;
c->request.type = response.connection_type;
c->event.type = response.connection_type;
Expand Down Expand Up @@ -79,7 +143,7 @@ qb_ipcc_connect(const char *name, size_t max_msg_size)
goto disconnect_and_cleanup;
}
c->is_connected = QB_TRUE;
return c;
return 0;

disconnect_and_cleanup:
if (c->setup.u.us.sock >= 0) {
Expand All @@ -88,7 +152,8 @@ qb_ipcc_connect(const char *name, size_t max_msg_size)
free(c->receive_buf);
free(c);
errno = -res;
return NULL;
return -res;

}

static int32_t
Expand Down
80 changes: 80 additions & 0 deletions tests/check_ipc.c
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,62 @@ send_and_check(int32_t req_id, uint32_t size,
return res;
}


static int32_t
process_async_connect(int32_t fd, int32_t revents, void *data)
{
qb_loop_t *cl = (qb_loop_t *)data;
int res;

res = qb_ipcc_connect_continue(conn);
ck_assert_int_eq(res, 0);
qb_loop_stop(cl);
return 0;
}
static void test_ipc_connect_async(void)
{
struct qb_ipc_request_header req_header;
struct qb_ipc_response_header res_header;
int32_t res;
pid_t pid;
uint32_t max_size = MAX_MSG_SIZE;
int connect_fd;
struct iovec iov[1];
static qb_loop_t *cl;

pid = run_function_in_new_process("server", run_ipc_server, NULL);
ck_assert(pid != -1);

conn = qb_ipcc_connect_async(ipc_name, max_size, &connect_fd);
ck_assert(conn != NULL);

cl = qb_loop_create();
res = qb_loop_poll_add(cl, QB_LOOP_MED,
connect_fd, POLLIN,
cl, process_async_connect);
ck_assert_int_eq(res, 0);
qb_loop_run(cl);

/* Send some data */
req_header.id = IPC_MSG_REQ_TX_RX;
req_header.size = sizeof(struct qb_ipc_request_header);

iov[0].iov_len = req_header.size;
iov[0].iov_base = &req_header;

res = qb_ipcc_sendv_recv(conn, iov, 1,
&res_header,
sizeof(struct qb_ipc_response_header), 5000);

ck_assert_int_ge(res, 0);

request_server_exit();
verify_graceful_stop(pid);


qb_ipcc_disconnect(conn);
}

static void
test_ipc_txrx_timeout(void)
{
Expand Down Expand Up @@ -1226,6 +1282,7 @@ START_TEST(test_ipc_txrx_shm_timeout)
}
END_TEST


START_TEST(test_ipc_txrx_us_timeout)
{
qb_enter();
Expand All @@ -1236,6 +1293,25 @@ START_TEST(test_ipc_txrx_us_timeout)
}
END_TEST

START_TEST(test_ipc_shm_connect_async)
{
qb_enter();
ipc_type = QB_IPC_SHM;
set_ipc_name(__func__);
test_ipc_connect_async();
qb_leave();
}
END_TEST

START_TEST(test_ipc_us_connect_async)
{
qb_enter();
ipc_type = QB_IPC_SHM;
set_ipc_name(__func__);
test_ipc_connect_async();
qb_leave();
}
END_TEST

START_TEST(test_ipc_txrx_shm_getauth)
{
Expand Down Expand Up @@ -2277,6 +2353,8 @@ make_shm_suite(void)
TCase *tc;
Suite *s = suite_create("shm");

add_tcase(s, tc, test_ipc_shm_connect_async, 7);

add_tcase(s, tc, test_ipc_txrx_shm_getauth, 7);
add_tcase(s, tc, test_ipc_txrx_shm_timeout, 28);
add_tcase(s, tc, test_ipc_server_fail_shm, 7);
Expand Down Expand Up @@ -2308,6 +2386,8 @@ make_soc_suite(void)
Suite *s = suite_create("socket");
TCase *tc;

add_tcase(s, tc, test_ipc_us_connect_async, 7);

add_tcase(s, tc, test_ipc_txrx_us_getauth, 7);
add_tcase(s, tc, test_ipc_txrx_us_timeout, 28);
/* Commented out for the moment as space in /dev/shm on the CI machines
Expand Down