Skip to content

Commit

Permalink
PROTON-2818: Move epoll proactor connection logic to a task thread.
Browse files Browse the repository at this point in the history
  • Loading branch information
Cliff Jansen committed May 12, 2024
1 parent 2406d43 commit 36fe831
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 44 deletions.
1 change: 1 addition & 0 deletions c/src/proactor/epoll-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ typedef struct pconnection_t {
bool server; /* accept, not connect */
bool tick_pending;
bool queued_disconnect; /* deferred from pn_proactor_disconnect() */
bool first_schedule;
pn_condition_t *disconnect_condition;
// Following values only changed by (sole) working task:
uint32_t current_arm; // active epoll io events
Expand Down
57 changes: 36 additions & 21 deletions c/src/proactor/epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,7 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con
pc->wbuf_current = NULL;
pc->hog_count = 0;
pc->batch.next_event = pconnection_batch_next;
pc->first_schedule = false;

if (server) {
pn_transport_set_server(pc->driver.transport);
Expand Down Expand Up @@ -1122,6 +1123,7 @@ static void write_flush(pconnection_t *pc) {

static void pconnection_connected_lh(pconnection_t *pc);
static void pconnection_maybe_connect_lh(pconnection_t *pc);
static bool pconnection_first_connect_lh(pconnection_t *pc);

static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool sched_ready, bool topup) {
bool waking = false;
Expand All @@ -1139,6 +1141,17 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
}
if (sched_ready) schedule_done(&pc->task);

if (pc->first_schedule) {
pc->first_schedule = false;
assert(!topup && !events);
if (!pc->queued_disconnect) {
if (pconnection_first_connect_lh(pc)) {
unlock(&pc->task.mutex);
return NULL;
}
}
}

if (topup) {
// Only called by the batch owner. Does not loop, just "tops up"
// once. May be back depending on hog_count.
Expand Down Expand Up @@ -1396,6 +1409,7 @@ static void pconnection_maybe_connect_lh(pconnection_t *pc) {

int pgetaddrinfo(const char *host, const char *port, int flags, struct addrinfo **res)
{
// NOTE: getaddrinfo can block on DNS lookup (PROTON-2812).
struct addrinfo hints = { 0 };
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
Expand All @@ -1416,7 +1430,27 @@ bool schedule_if_inactive(pn_proactor_t *p) {
return false;
}

// Deferred first connection step for a pconnection: resolve the peer address
// and start the first connect attempt.
// Call from pconnection_process with task lock held.  The lock is released
// around the address lookup and re-acquired before this function returns.
// Return true if the socket is connecting and there are no Proton events to
// deliver (no queued disconnect, no pending task wake); false otherwise,
// including when the address lookup fails.
static bool pconnection_first_connect_lh(pconnection_t *pc) {
  // The lookup can block, so the task lock is not held across it.
  unlock(&pc->task.mutex);
  // TODO: move this step to a separate worker thread that scales in response to multiple blocking DNS lookups.
  int lookup_err = pgetaddrinfo(pc->host, pc->port, 0, &pc->addrinfo);
  lock(&pc->task.mutex);

  if (lookup_err) {
    // Record the resolver failure; the caller continues normal processing.
    psocket_gai_error(&pc->psocket, lookup_err, "connect to ");
    return false;
  }

  pc->ai = pc->addrinfo;
  pconnection_maybe_connect_lh(pc); /* Start connection attempts */

  // Evaluated left to right with short-circuiting, same order as before.
  return pc->psocket.epoll_io.fd != -1
      && !pc->queued_disconnect
      && !pni_task_wake_pending(&pc->task);
}

void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t *t, const char *addr) {
// Called from an arbitrary thread. Do setup prior to getaddrinfo, then switch to a worker thread.
size_t addrlen = strlen(addr);
pconnection_t *pc = (pconnection_t*) malloc(sizeof(pconnection_t)+addrlen);
assert(pc); // TODO: memory safety
Expand All @@ -1430,27 +1464,8 @@ void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t *
lock(&pc->task.mutex);
proactor_add(&pc->task);
pn_connection_open(pc->driver.connection); /* Auto-open */

bool notify = false;

if (pc->disconnected) {
notify = schedule(&pc->task); /* Error during initialization */
} else {
int gai_error = pgetaddrinfo(pc->host, pc->port, 0, &pc->addrinfo);
if (!gai_error) {
pn_connection_open(pc->driver.connection); /* Auto-open */
pc->ai = pc->addrinfo;
pconnection_maybe_connect_lh(pc); /* Start connection attempts */
if (pc->disconnected) notify = schedule(&pc->task);
} else {
psocket_gai_error(&pc->psocket, gai_error, "connect to ");
notify = schedule(&pc->task);
lock(&p->task.mutex);
notify |= schedule_if_inactive(p);
unlock(&p->task.mutex);
}
}
/* We need to issue INACTIVE on immediate failure */
pc->first_schedule = true; // Resume connection setup when next scheduled.
bool notify = schedule(&pc->task);
unlock(&pc->task.mutex);
if (notify) notify_poller(p);
}
Expand Down
69 changes: 47 additions & 22 deletions c/src/proactor/epoll_raw_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ struct praw_connection_t {
bool disconnected;
bool hup_detected;
bool read_check;
bool first_schedule;
char *taddr;
};

static void psocket_error(praw_connection_t *rc, int err, const char* msg) {
Expand Down Expand Up @@ -145,6 +147,8 @@ static void praw_connection_init(praw_connection_t *prc, pn_proactor_t *p, pn_ra

prc->connected = false;
prc->disconnected = false;
prc->first_schedule = false;
prc->taddr = NULL;
prc->batch.next_event = pni_raw_batch_next;

pmutex_init(&prc->rearm_mutex);
Expand All @@ -163,6 +167,7 @@ static void praw_connection_cleanup(praw_connection_t *prc) {
task_finalize(&prc->task);
if (prc->addrinfo)
freeaddrinfo(prc->addrinfo);
free(prc->taddr);
free(prc);
}
// else proactor_disconnect logic owns prc and its final free
Expand All @@ -177,39 +182,48 @@ pn_raw_connection_t *pn_raw_connection(void) {
return &conn->raw_connection;
}

void pn_proactor_raw_connect(pn_proactor_t *p, pn_raw_connection_t *rc, const char *addr) {
assert(rc);
praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection);
praw_connection_init(prc, p, rc);
// TODO: check case of proactor shutting down

lock(&prc->task.mutex);
proactor_add(&prc->task);

bool notify = false;

// Call from pni_raw_connection_process with task lock held.
// Return true if the socket is connecting and there are no Proton events to deliver.
static bool praw_connection_first_connect_lh(praw_connection_t *prc) {
const char *host;
const char *port;
size_t addrlen = strlen(addr);
char *addr_buf = (char*) alloca(addrlen+1);
pni_parse_addr(addr, addr_buf, addrlen+1, &host, &port);

unlock(&prc->task.mutex);
size_t addrlen = strlen(prc->taddr);
char *addr_buf = (char*) alloca(addrlen+1);
pni_parse_addr(prc->taddr, addr_buf, addrlen+1, &host, &port);
// TODO: move this step to a separate worker thread that scales in response to multiple blocking DNS lookups.
int gai_error = pgetaddrinfo(host, port, 0, &prc->addrinfo);
lock(&prc->task.mutex);

if (!gai_error) {
prc->ai = prc->addrinfo;
praw_connection_maybe_connect_lh(prc); /* Start connection attempts */
if (prc->disconnected) notify = schedule(&prc->task);
if (prc->psocket.epoll_io.fd != -1 && !pni_task_wake_pending(&prc->task))
return true;
} else {
psocket_gai_error(prc, gai_error, "connect to ", addr);
prc->disconnected = true;
notify = schedule(&prc->task);
lock(&p->task.mutex);
notify |= schedule_if_inactive(p);
unlock(&p->task.mutex);
psocket_gai_error(prc, gai_error, "connect to ", prc->taddr);
}
return false;
}

/* We need to issue INACTIVE on immediate failure */
// Start an outbound raw connection to addr using proactor p.
// Called from an arbitrary thread.  Do setup prior to getaddrinfo, then switch
// to a worker thread: a private copy of addr is stored in prc->taddr and
// first_schedule is set so connection setup resumes when the task is next
// scheduled.
void pn_proactor_raw_connect(pn_proactor_t *p, pn_raw_connection_t *rc, const char *addr) {
  assert(rc);
  praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection);
  praw_connection_init(prc, p, rc);
  // TODO: check case of proactor shutting down

  lock(&prc->task.mutex);

  // Copy the address including its terminating NUL for use after this call returns.
  size_t taddr_size = strlen(addr) + 1;
  prc->taddr = (char*) malloc(taddr_size);
  assert(prc->taddr); // TODO: memory safety
  memcpy(prc->taddr, addr, taddr_size);

  prc->first_schedule = true; // Resume connection setup when next scheduled.
  proactor_add(&prc->task);
  bool wake_poller = schedule(&prc->task);
  unlock(&prc->task.mutex);

  if (wake_poller) {
    notify_poller(p);
  }
}

Expand Down Expand Up @@ -394,6 +408,16 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool
}
int events = io_events;
int fd = rc->psocket.epoll_io.fd;

if (rc->first_schedule) {
rc->first_schedule = false;
assert(!events); // No socket yet.
assert(!rc->connected);
if (praw_connection_first_connect_lh(rc)) {
unlock(&rc->task.mutex);
return NULL;
}
}
if (!rc->connected) {
if (events & (EPOLLHUP | EPOLLERR)) {
praw_connection_maybe_connect_lh(rc);
Expand All @@ -413,6 +437,7 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool
}
if (events & EPOLLOUT)
praw_connection_connected_lh(rc);

unlock(&rc->task.mutex);
return &rc->batch;
}
Expand Down
1 change: 0 additions & 1 deletion c/tests/raw_wake_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,6 @@ TEST_CASE("proactor_raw_connection_wake") {
pn_proactor_raw_connect(pn_listener_proactor(l), rc, addr.c_str());


REQUIRE_RUN(p, PN_LISTENER_ACCEPT);
REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS);
REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS);
CHECK(pn_proactor_get(p) == NULL); /* idle */
Expand Down

0 comments on commit 36fe831

Please sign in to comment.