Skip to content
This repository has been archived by the owner on Aug 19, 2021. It is now read-only.

Commit

Permalink
Moved to segmented dispatch
Browse files Browse the repository at this point in the history
Before, the dispatch function iterated over the events, calling
the callbacks as it removed the events from the queue. Now the
dispatch function dequeues all pending events before dispatching
callbacks.

This removes the need for a break event, allows better cache
coherency, and results in a better organization of the dispatch
function.

The only downsides are a longer startup in dispatch and increased
jitter, although the latter could be fixed by adding fabricated
unlock points during the linked-list traversal.

Notable performance impact (make prof):
equeue_dispatch_prof: 466 cycles (+25%)
equeue_alloc_many_prof: 79 cycles (-12%)
equeue_post_many_prof: 7681 cycles (+33%)
equeue_post_future_many_prof: 7612 cycles (+35%)
equeue_dispatch_many_prof: 8353 cycles (+65%)

It's interesting to note that there is a decrease in performance for
the alloc_many test; this may just be because moving the equeue
members around caused the slab info to cross a cache boundary.
The alloc_many test is already very fast (~3x the time for a
mutex lock).
  • Loading branch information
geky committed Jul 30, 2016
1 parent 7937a76 commit c4d4839
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 47 deletions.
136 changes: 91 additions & 45 deletions equeue.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,7 @@ int equeue_create_inplace(equeue_t *q, size_t size, void *buffer) {
q->slab.data = buffer;

q->queue = 0;
q->break_ = (struct equeue_event){
.id = 0,
.period = -1,
};
q->breaks = 0;

int err;
err = equeue_sema_create(&q->eventsema);
Expand Down Expand Up @@ -172,7 +169,7 @@ static void equeue_enqueue(equeue_t *q, struct equeue_event *e, unsigned ms) {
while (*p && equeue_tickdiff((*p)->target, e->target) <= 0) {
p = &(*p)->next;
}

e->ref = p;
e->next = *p;
if (*p) {
Expand All @@ -181,102 +178,151 @@ static void equeue_enqueue(equeue_t *q, struct equeue_event *e, unsigned ms) {
*p = e;
}

// Splice a single event out of the pending queue in O(1).
// Requires q->queuelock to be held by the caller; e must currently be
// linked into the queue (e->ref points at the predecessor's next slot).
static void equeue_unqueue(equeue_t *q, struct equeue_event *e) {
    // fix up the successor's back-reference, then unlink e
    if (e->next) {
        e->next->ref = e->ref;
    }
    *e->ref = e->next;
}

// Detach the entire run of events whose target time has passed and return
// it as a singly-linked list (still chained through ->next), leaving only
// not-yet-due events on q->queue.
//
// target:   current tick count; events with target <= this are due
// deadline: out-parameter, set to the tick delta until the next pending
//           event ONLY when such an event exists (left untouched otherwise,
//           so callers must pre-initialize it, e.g. to -1)
//
// Returns the head of the detached list, or 0 if nothing is due.
// Caller must hold q->queuelock.
static struct equeue_event *equeue_dequeue(
        equeue_t *q, unsigned target, int *deadline) {
    struct equeue_event *head = q->queue;
    // fast path: empty queue or earliest event not due yet
    if (!head || equeue_tickdiff(head->target, target) > 0) {
        return 0;
    }

    // walk past every due event; the queue is sorted by target, so the
    // first not-due event marks the split point and the next deadline
    struct equeue_event **p = &q->queue;
    while (*p) {
        int diff = equeue_tickdiff((*p)->target, target);
        if (diff > 0) {
            *deadline = diff;
            break;
        }

        p = &(*p)->next;
    }

    // the remainder becomes the new queue head; patch its back-reference
    if (*p) {
        (*p)->ref = &q->queue;
    }
    q->queue = *p;
    *p = 0;

    return head;
}

// Post a previously-allocated event (p points just past its header) onto
// the queue with callback cb. Returns an id that encodes both the event's
// generation counter and its buffer offset, usable with equeue_cancel.
int equeue_post(equeue_t *q, void (*cb)(void*), void *p) {
    struct equeue_event *e = (struct equeue_event*)p - 1;
    // id = (generation << npw2) | offset-in-buffer; offset fits in the
    // low npw2 bits because the buffer size is a power of two
    int id = (e->id << q->npw2) | ((unsigned char *)e - q->buffer);
    e->cb = cb;

    // NOTE(review): a negative target appears to mean "do not schedule" —
    // the event is released immediately and only its id is returned;
    // confirm against equeue_call/equeue_alloc, which set e->target
    if (e->target < 0) {
        equeue_dealloc(q, e+1);
        return id;
    }

    equeue_mutex_lock(&q->queuelock);
    equeue_enqueue(q, e, e->target);
    equeue_mutex_unlock(&q->queuelock);

    // wake a dispatcher that may be sleeping in equeue_sema_wait
    equeue_sema_release(&q->eventsema);
    return id;
}

// Cancel a pending event by id. Safe to race with dispatch: events that
// are currently in-flight (dispatch negates their id while dispatching)
// get their callback and period cleared so the dispatcher drops them;
// events still queued are unlinked and freed here.
void equeue_cancel(equeue_t *q, int id) {
    // recover the event header from the offset encoded in the id's low bits
    struct equeue_event *e = (struct equeue_event *)
            &q->buffer[id & ((1 << q->npw2)-1)];

    equeue_mutex_lock(&q->queuelock);
    // in-flight event: its id was negated by dispatch; neutralize it
    // NOTE(review): `-id >> q->npw2` parses as `(-id) >> q->npw2` (unary
    // minus binds tighter than shift) — confirm this matches the negation
    // scheme used in equeue_dispatch
    if (e->id == -id >> q->npw2) {
        e->cb = 0;
        e->period = -1;
    }

    // stale id (event already recycled): nothing to do
    if (e->id != id >> q->npw2) {
        equeue_mutex_unlock(&q->queuelock);
        return;
    }

    equeue_unqueue(q, e);
    equeue_mutex_unlock(&q->queuelock);

    equeue_dealloc(q, e+1);
}

// Request that one dispatcher return from equeue_dispatch. Each call
// increments a counter consumed by exactly one dispatch loop iteration,
// replacing the old sentinel "break event" mechanism.
void equeue_break(equeue_t *q) {
    equeue_mutex_lock(&q->queuelock);
    q->breaks++;
    equeue_mutex_unlock(&q->queuelock);
    // wake a dispatcher sleeping on the semaphore so it notices the break
    equeue_sema_release(&q->eventsema);
}

// Dispatch events for ms milliseconds (ms < 0 dispatches forever).
// Segmented dispatch: each pass detaches the whole run of due events
// under the lock, then invokes their callbacks outside the lock.
void equeue_dispatch(equeue_t *q, int ms) {
    unsigned timeout = equeue_tick() + ms;

    while (1) {
        // collect all the available events and next deadline
        struct equeue_event *es = 0;
        int deadline = -1;

        if (q->queue) {
            equeue_mutex_lock(&q->queuelock);
            es = equeue_dequeue(q, equeue_tick(), &deadline);

            // mark events as in-flight by negating their generation id so
            // equeue_cancel can tell they are being dispatched
            for (struct equeue_event *e = es; e; e = e->next) {
                e->id = -e->id;
            }
            equeue_mutex_unlock(&q->queuelock);
        }

        // dispatch events
        while (es) {
            struct equeue_event *e = es;
            es = e->next;

            // actually dispatch the callbacks; cb may have been cleared
            // by a concurrent equeue_cancel on an in-flight event
            void (*cb)(void *) = e->cb;
            if (cb) {
                cb(e + 1);
            }

            // undirty the id and either dealloc or reenqueue periodic events
            e->id = -e->id;
            if (e->period >= 0) {
                // requeue periodic tasks to avoid race conditions
                // in equeue_cancel
                equeue_mutex_lock(&q->queuelock);
                equeue_enqueue(q, e, e->period);
                equeue_mutex_unlock(&q->queuelock);
            } else {
                equeue_dealloc(q, e+1);
            }
        }

        // check if we should stop dispatching soon
        if (ms >= 0) {
            int diff = equeue_tickdiff(timeout, equeue_tick());
            if (diff <= 0) {
                return;
            }

            // don't sleep past the dispatch timeout
            if (deadline < 0 || diff < deadline) {
                deadline = diff;
            }
        }

        // wait for events (deadline < 0 means wait indefinitely)
        equeue_sema_wait(&q->eventsema, deadline);

        // check if we were notified to break out of dispatch
        if (q->breaks) {
            equeue_mutex_lock(&q->queuelock);
            if (q->breaks > 0) {
                q->breaks--;
                equeue_mutex_unlock(&q->queuelock);
                return;
            }
            equeue_mutex_unlock(&q->queuelock);
        }
    }
}

Expand Down
3 changes: 1 addition & 2 deletions equeue.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ struct equeue_event {

typedef struct equeue {
struct equeue_event *queue;
int breaks;

unsigned char *buffer;
unsigned npw2;
Expand All @@ -56,8 +57,6 @@ typedef struct equeue {
unsigned char *data;
} slab;

struct equeue_event break_;

equeue_sema_t eventsema;
equeue_mutex_t queuelock;
equeue_mutex_t memlock;
Expand Down

0 comments on commit c4d4839

Please sign in to comment.