Skip to content

Commit

Permalink
make rsp object part of the processing state
Browse files Browse the repository at this point in the history
  • Loading branch information
Yao Yue committed Jan 28, 2017
1 parent 274df8f commit 2d6869f
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 26 deletions.
4 changes: 2 additions & 2 deletions config/twemcache_test.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
#slab_profile: 1024 2048 4096 8192 16384 32768 65536 131072 262144 524288

debug_log_level: 6
debug_log_file: twemcache.log
debug_log_nbuf: 16384
# debug_log_file: twemcache.log
# debug_log_nbuf: 16384

klog_file: twemcache.cmd
klog_backup: twemcache.cmd.old
Expand Down
1 change: 1 addition & 0 deletions src/protocol/data/memcache/response.c
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ response_return_all(struct response **response)

struct response *nr, *rsp = *response;

nr = STAILQ_NEXT(rsp, next);
while (rsp != NULL) {
nr = STAILQ_NEXT(rsp, next);
response_return(&rsp);
Expand Down
94 changes: 70 additions & 24 deletions src/server/twemcache/data/process.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ typedef enum put_rstatus {
PUT_ERROR,
} put_rstatus_t;

/* the data pointer in the process functions is of type `struct data **' */
struct data {
struct request *req;
struct response *rsp;
};

static bool process_init = false;
static process_metrics_st *process_metrics = NULL;
static bool allow_flush = ALLOW_FLUSH;
Expand Down Expand Up @@ -596,36 +602,65 @@ process_request(struct response *rsp, struct request *req)
}

static inline void
_cleanup(struct request *req, struct response **rsp)
_cleanup(struct request *req, struct response *rsp)
{
struct response *nr = STAILQ_NEXT(rsp, next);

/* reset req, return rsp: this is likely to change when partial write needs
* to be supported
*/
request_reset(req);
response_return_all(rsp);
/* return all but the first response */
if (nr != NULL) {
response_return_all(&nr);
}
response_reset(rsp);
}

static inline struct data *
_data_create(void)
{
struct data *data;
struct request *req;
struct response *rsp;

req = request_borrow();
rsp = response_borrow();
data = cc_alloc(sizeof(struct data));

if (req == NULL || rsp == NULL || data == NULL) {
request_return(&req);
response_return(&rsp);
cc_free(data); /* cc_free(data) is a macro that sets data to NULL */
} else {
data->req = req;
data->rsp = rsp;
}

return data;
}

int
twemcache_process_read(struct buf **rbuf, struct buf **wbuf, void **data)
{
parse_rstatus_t status;
struct data *state;
struct request *req;
struct response *rsp;

log_verb("post-read processing");

if (*data == NULL) { /* *data is a request pointer, one per sock_buf */
if ((*data = request_borrow()) == NULL) {
/* deal with the stateful part: request and response */
if (*data == NULL) {
if ((*data = _data_create()) == NULL) {
/* TODO(yao): simply return for now, better to respond with OOM */
log_error("cannot acquire request: OOM");
log_error("cannot process request: OOM");
INCR(process_metrics, process_ex);

return -1;
}
}
state = (struct data *)*data;
req = state->req;
rsp = state->rsp;

req = *data;
/* keep parse-process-compose until running out of data in rbuf */
while (buf_rsize(*rbuf) > 0) {
struct response *nr;
Expand All @@ -649,6 +684,10 @@ twemcache_process_read(struct buf **rbuf, struct buf **wbuf, void **data)
return -1;
}

if (req->swallow) { /* skip to the end of current request */
continue;
}

/* stage 2: processing- check for quit, allocate response(s), process */

/* quit is special, no response expected */
Expand All @@ -658,56 +697,54 @@ twemcache_process_read(struct buf **rbuf, struct buf **wbuf, void **data)
}

/* find cardinality of the request and get enough response objects */
card = array_nelem(req->keys);
card = array_nelem(req->keys) - 1; /* we already have one in rsp */
if (req->type == REQ_GET || req->type == REQ_GETS) {
/* extra response object for the "END" line after values */
card++;
}
for (i = 0, rsp = response_borrow(), nr = rsp;
for (i = 0, nr = rsp;
i < card;
i++, STAILQ_NEXT(nr, next) = response_borrow(), nr =
STAILQ_NEXT(nr, next)) {
if (nr == NULL) {
log_error("cannot acquire response: OOM");
INCR(process_metrics, process_ex);
_cleanup(req, &rsp);
_cleanup(req, rsp);
return -1;
}
}

/* actual processing & command logging */
if (req->swallow) { /* skip to the end of current request */
continue;
}

/* actual processing */
process_request(rsp, req);
if (req->partial) { /* implies end of rbuf w/o complete processing */
/* in this case, do not attempt to log or write response */
buf_lshift(*rbuf);
return 0;
}

klog_write(req, rsp);

/* stage 3: write response(s) if necessary */

/* noreply means no need to write to buffers */
card++;
if (!req->noreply) {
nr = rsp;
if (req->type == REQ_GET || req->type == REQ_GETS) {
/* for get/gets, card is determined by number of values */
card = req->nfound + 1;
} /* no need to update for other req types- card remains 1 */
}
for (i = 0; i < card; nr = STAILQ_NEXT(nr, next), ++i) {
if (compose_rsp(wbuf, nr) < 0) {
log_error("composing rsp erred");
INCR(process_metrics, process_ex);
_cleanup(req, &rsp);
_cleanup(req, rsp);
return -1;
}
}
}

_cleanup(req, &rsp);
/* logging, clean-up */
klog_write(req, rsp);
_cleanup(req, rsp);
}

return 0;
Expand All @@ -719,6 +756,8 @@ twemcache_process_write(struct buf **rbuf, struct buf **wbuf, void **data)
{
log_verb("post-write processing");

buf_lshift(*rbuf);
buf_lshift(*wbuf);
dbuf_shrink(rbuf);
dbuf_shrink(wbuf);

Expand All @@ -729,6 +768,10 @@ twemcache_process_write(struct buf **rbuf, struct buf **wbuf, void **data)
int
twemcache_process_error(struct buf **rbuf, struct buf **wbuf, void **data)
{
struct data *state = (struct data *)*data;
struct request *req;
struct response *rsp;

log_verb("post-error processing");

/* normalize buffer size */
Expand All @@ -739,11 +782,14 @@ twemcache_process_error(struct buf **rbuf, struct buf **wbuf, void **data)

/* release request data & associated reserved data */
if (*data != NULL) {
struct request *req = (struct request *)*data;
req = state->req;
rsp = state->rsp;
if (req->reserved != NULL) {
item_release((struct item **)&req->reserved);
}
request_return((struct request **)data);
request_return(&req);
response_return_all(&rsp);
cc_free(state);
}

return 0;
Expand Down

0 comments on commit 2d6869f

Please sign in to comment.