Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow multiple values in both sarray_insert/remove #241

Merged
merged 2 commits into from
Aug 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/protocol/data/resp/cmd_sarray.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
ACTION( REQ_SARRAY_FIND, "SArray.find", 3, 0 )\
ACTION( REQ_SARRAY_GET, "SArray.get", 2, 2 )\
ACTION( REQ_SARRAY_INSERT, "SArray.insert", 3, -1 )\
ACTION( REQ_SARRAY_REMOVE, "SArray.remove", 3, 0 )\
ACTION( REQ_SARRAY_REMOVE, "SArray.remove", 3, -1 )\
ACTION( REQ_SARRAY_TRUNCATE, "SArray.truncate", 3, 0 )

typedef enum sarray_elem {
Expand Down
208 changes: 127 additions & 81 deletions src/server/rds/data/cmd_sarray.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
#include <cc_debug.h>
#include <cc_mm.h>

/* TODO(yao): make MAX_NVAL configurable */
#define MAX_NVAL 255 /* max no. of values to insert/remove in one request */
static uint64_t vals[MAX_NVAL];


static inline struct item *
_add_key(struct response *rsp, struct bstring *key)
Expand Down Expand Up @@ -180,11 +184,7 @@ cmd_sarray_len(struct response *rsp, const struct request *req,
}

nentry = sarray_nentry((sarray_p)item_data(it));

rsp->type = reply->type = ELEM_INT;
reply->num = (int64_t)nentry;
log_verb("command '%.*s' '%.*s' succeeded, sarray length %u",
cmd->bstr.len, cmd->bstr.data, key->len, key->data, nentry);
compose_rsp_numeric(rsp, reply, cmd, key, (int64_t)nentry);
}

void
Expand Down Expand Up @@ -257,11 +257,14 @@ cmd_sarray_get(struct response *rsp, const struct request *req, const struct
struct element *reply = (struct element *)array_push(rsp->token);
struct bstring *key;
struct item *it;
int64_t idx;
int64_t idx = 0, cnt = 1;
uint64_t val;
uint32_t narg, nentry, nreturned = 0;
int32_t incr;
sarray_rstatus_e status;

ASSERT(array_nelem(req->token) == cmd->narg);
narg = array_nelem(req->token);
ASSERT(narg >= cmd->narg);

INCR(process_metrics, sarray_get);

Expand All @@ -271,12 +274,6 @@ cmd_sarray_get(struct response *rsp, const struct request *req, const struct

return;
}
if (!req_get_int(&idx, req, SARRAY_IDX)) {
compose_rsp_client_err(rsp, reply, cmd, key);
INCR(process_metrics, sarray_get_ex);

return;
}

it = item_get(key);
if (it == NULL) {
Expand All @@ -286,26 +283,55 @@ cmd_sarray_get(struct response *rsp, const struct request *req, const struct
return;
}

status = sarray_value(&val, (sarray_p)item_data(it), (uint32_t)idx);
switch (status) {
case SARRAY_OK:
rsp->type = reply->type = ELEM_INT;
reply->num = (int64_t)val;
log_verb("command '%.*s' '%.*s' succeeded, index %"PRIu32" has value "
PRIu64, cmd->bstr.len, cmd->bstr.data, key->len, key->data, idx,
val);
INCR(process_metrics, sarray_get_ok);
nentry = sarray_nentry((sarray_p)item_data(it));

break;
case SARRAY_EOOB:
compose_rsp_oob(rsp, reply, cmd, key, idx);
INCR(process_metrics, sarray_get_oob);
if (narg > cmd->narg && !req_get_int(&idx, req, SARRAY_IDX)) {
compose_rsp_client_err(rsp, reply, cmd, key);
INCR(process_metrics, sarray_get_ex);

break;
default:
compose_rsp_server_err(rsp, reply, cmd, key);
NOT_REACHED();
return;
}

if (idx < 0) {
idx += nentry;
if (idx < 0) {
idx = 0;
}
} else {
if (idx > nentry) {
idx = nentry;
}
}

if (narg > cmd->narg + 1 && !req_get_int(&cnt, req, SARRAY_ICNT)) {
compose_rsp_client_err(rsp, reply, cmd, key);
INCR(process_metrics, sarray_get_ex);

return;
}
/* cnt < 0 means return in reverse order */
if (cnt > 0) {
incr = 1;
nreturned = MIN(nentry - idx, cnt);
} else {
incr = -1;
nreturned = MIN(idx + 1, -cnt);
}

/* write the array header */
rsp->type = ELEM_ARRAY;
for (; nreturned > 0; nreturned--, idx += incr) {
status = sarray_value(&val, (sarray_p)item_data(it), (uint32_t)idx);
ASSERT(status == SARRAY_OK);
reply->type = ELEM_INT;
reply->num = val;
reply = (struct element *)array_push(rsp->token);
}
array_pop(rsp->token);

INCR(process_metrics, sarray_get_ok);
log_verb("command '%.*s' '%.*s' succeeded, returning %"PRIu32" elements",
cmd->bstr.len, cmd->bstr.data, key->len, key->data, array_nelem(rsp->token));
}

void
Expand All @@ -315,12 +341,12 @@ cmd_sarray_insert(struct response *rsp, const struct request *req, const struct
struct element *reply = (struct element *)array_push(rsp->token);
struct bstring *key;
struct item *it;
uint32_t delta = 0;
uint32_t nval = 0, ninserted = 0, delta;
int64_t val;
sarray_p sa;
sarray_rstatus_e status;

ASSERT(array_nelem(req->token) == cmd->narg);
ASSERT(array_nelem(req->token) >= cmd->narg);

INCR(process_metrics, sarray_insert);

Expand All @@ -330,12 +356,6 @@ cmd_sarray_insert(struct response *rsp, const struct request *req, const struct

return;
}
if (!req_get_int(&val, req, SARRAY_VAL)) {
compose_rsp_client_err(rsp, reply, cmd, key);
INCR(process_metrics, sarray_insert_ex);

return;
}

it = item_get(key);
if (it == NULL) {
Expand All @@ -345,7 +365,19 @@ cmd_sarray_insert(struct response *rsp, const struct request *req, const struct
return;
}

delta = sarray_esize((sarray_p)item_data(it));
/* parse and store all values to be inserted in array vals */
for (uint32_t i = SARRAY_VAL; i < array_nelem(req->token); ++i, ++nval) {
if (!req_get_int(&val, req, i)) {
compose_rsp_client_err(rsp, reply, cmd, key);
INCR(process_metrics, sarray_insert_ex);

return;
} else {
vals[nval] = (uint64_t)val;
}
}

delta = sarray_esize((sarray_p)item_data(it)) * nval;
if (_realloc_key(&it, key, delta) != ITEM_OK) {
compose_rsp_storage_err(rsp, reply, cmd, key);
INCR(process_metrics, sarray_insert_ex);
Expand All @@ -354,23 +386,24 @@ cmd_sarray_insert(struct response *rsp, const struct request *req, const struct
}

sa = (sarray_p)item_data(it);
status = sarray_insert(sa, (uint64_t)val);

if (status == SARRAY_EINVALID) {
compose_rsp_client_err(rsp, reply, cmd, key);
INCR(process_metrics, sarray_insert_ex);

return;
}
if (status == SARRAY_EDUP) {
compose_rsp_noop(rsp, reply, cmd, key);
INCR(process_metrics, sarray_insert_noop);
for (uint32_t i = 0; i < nval; ++i) {
status = sarray_insert(sa, vals[i]);
if (status == SARRAY_EINVALID) {
compose_rsp_client_err(rsp, reply, cmd, key);
INCR(process_metrics, sarray_insert_ex);
return;
}

return;
if (status == SARRAY_EDUP) {
compose_rsp_noop(rsp, reply, cmd, key);
INCR(process_metrics, sarray_insert_noop);
} else {
INCR(process_metrics, sarray_insert_ok);
ninserted++;
}
}

compose_rsp_ok(rsp, reply, cmd, key);
INCR(process_metrics, sarray_insert_ok);
compose_rsp_numeric(rsp, reply, cmd, key, (int64_t)ninserted);
}

void
Expand All @@ -380,6 +413,7 @@ cmd_sarray_remove(struct response *rsp, const struct request *req, const struct
struct element *reply = (struct element *)array_push(rsp->token);
struct bstring *key;
struct item *it;
uint32_t nval = 0, nremoved = 0;
int64_t val;
sarray_p sa;
sarray_rstatus_e status;
Expand All @@ -394,12 +428,6 @@ cmd_sarray_remove(struct response *rsp, const struct request *req, const struct

return;
}
if (!req_get_int(&val, req, SARRAY_VAL)) {
compose_rsp_client_err(rsp, reply, cmd, key);
INCR(process_metrics, sarray_remove_ex);

return;
}

it = item_get(key);
if (it == NULL) {
Expand All @@ -409,31 +437,49 @@ cmd_sarray_remove(struct response *rsp, const struct request *req, const struct
return;
}

sa = (sarray_p)item_data(it);
status = sarray_remove(sa, val);
/* parse and store all values to be inserted in array vals */
for (uint32_t i = SARRAY_VAL; i < array_nelem(req->token); ++i) {
if (!req_get_int(&val, req, i)) {
compose_rsp_client_err(rsp, reply, cmd, key);
INCR(process_metrics, sarray_remove_ex);

switch (status) {
case SARRAY_OK:
/* TODO: should we try to "fit" to a smaller item here? */
compose_rsp_ok(rsp, reply, cmd, key);
INCR(process_metrics, sarray_remove_ok);

break;
case SARRAY_ENOTFOUND:
compose_rsp_noop(rsp, reply, cmd, key);
INCR(process_metrics, sarray_remove_noop);

break;
case SARRAY_EINVALID:
/* client error, bad argument */
compose_rsp_client_err(rsp, reply, cmd, key);
INCR(process_metrics, sarray_remove_ex);
return;
} else {
vals[nval] = (uint64_t)val;
nval++;
}
}
/* TODO: should we try to "fit" to a smaller item here? */

break;
default:
compose_rsp_server_err(rsp, reply, cmd, key);
NOT_REACHED();
sa = (sarray_p)item_data(it);
for (uint32_t i = 0; i < nval; ++i) {
status = sarray_remove(sa, vals[i]);
switch (status) {
case SARRAY_OK:
nremoved++;
INCR(process_metrics, sarray_remove_ok);

break;
case SARRAY_ENOTFOUND:
compose_rsp_noop(rsp, reply, cmd, key);
INCR(process_metrics, sarray_remove_noop);

break;
case SARRAY_EINVALID:
/* client error, bad argument */
compose_rsp_client_err(rsp, reply, cmd, key);
INCR(process_metrics, sarray_remove_ex);

return;
default:
compose_rsp_server_err(rsp, reply, cmd, key);
INCR(process_metrics, sarray_remove_ex);
NOT_REACHED();
return;
}
}

compose_rsp_numeric(rsp, reply, cmd, key, (int64_t)nremoved);
}

void
Expand Down
2 changes: 1 addition & 1 deletion src/server/rds/data/cmd_sarray.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

/* name type description */
#define PROCESS_SARRAY_METRIC(ACTION) \
#define PROCESS_SARRAY_METRIC(ACTION) \
ACTION( sarray_create, METRIC_COUNTER, "# sarray create requests" )\
ACTION( sarray_create_exist, METRIC_COUNTER, "# sarray already exist" )\
ACTION( sarray_create_ok, METRIC_COUNTER, "# sarray stored" )\
Expand Down
10 changes: 10 additions & 0 deletions src/server/rds/data/shared.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ compose_rsp_noop(struct response *rsp, struct element *reply,
cmd->bstr.len, cmd->bstr.data, key->len, key->data);
}

static inline void
compose_rsp_numeric(struct response *rsp, struct element *reply,
const struct command *cmd, const struct bstring *key, int64_t num)
{
rsp->type = reply->type = ELEM_INT;
reply->num = num;
log_verb("command '%.*s' '%.*s' succeeded, response value is %"PRIi64,
cmd->bstr.len, cmd->bstr.data, key->len, key->data, num);
}

static inline void
compose_rsp_client_err(struct response *rsp, struct element *reply,
const struct command *cmd, const struct bstring *key)
Expand Down