diff --git a/CMakeLists.txt b/CMakeLists.txt index 10dc97edc..86fe26eda 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -34,6 +34,7 @@ option(HAVE_ASSERT_PANIC "assert_panic disabled by default" OFF) option(HAVE_LOGGING "logging enabled by default" ON) option(HAVE_STATS "stats enabled by default" ON) option(TARGET_PINGSERVER "build pingserver binary" ON) +option(TARGET_SLIMREDIS "build slimredis binary" ON) option(TARGET_SLIMCACHE "build slimcache binary" ON) option(TARGET_TWEMCACHE "build twemcache binary" ON) option(COVERAGE "code coverage" OFF) diff --git a/config/test.conf b/config/test.conf new file mode 100644 index 000000000..43f2c5473 --- /dev/null +++ b/config/test.conf @@ -0,0 +1 @@ +debug_log_level: 6 diff --git a/config/twemcache_test.conf b/config/twemcache_test.conf deleted file mode 100644 index 3744290e7..000000000 --- a/config/twemcache_test.conf +++ /dev/null @@ -1,12 +0,0 @@ -# if slab profile is specified, then the profile wil be explicitly set -# otherwise, slab profile will automatically be generated by using growth factor, slab size, and chunk size -#slab_profile: 1024 2048 4096 8192 16384 32768 65536 131072 262144 524288 - -debug_log_level: 6 -# debug_log_file: twemcache.log -# debug_log_nbuf: 16384 - -klog_file: twemcache.cmd -klog_backup: twemcache.cmd.old -klog_sample: 1 -klog_max: 100 diff --git a/src/protocol/data/redis/cmd_hash.h b/src/protocol/data/redis/cmd_hash.h index a5c762c44..d98d85875 100644 --- a/src/protocol/data/redis/cmd_hash.h +++ b/src/protocol/data/redis/cmd_hash.h @@ -1,28 +1,23 @@ #pragma once -/* - * Note: negative # of arguments means variable number of arguments: - * e.g. `-2' means at least two arguments. This notation is inherited from - * the original Redis server implementation. - */ /* type string # of args */ #define REQ_HASH(ACTION) \ - ACTION( REQ_HDEL, "hdel", -3 )\ - ACTION( REQ_HDELALL, "hdelall", 2 )\ - ACTION( REQ_HEXISTS, "hexists", 3 )\ - ACTION( REQ_HGET, "hget", 3 )\ - ACTION( REQ_HGETALL, "hgetall", 2 )\ - ACTION( REQ_HINCRBY, "hincrby", 4 )\ - ACTION( REQ_HINCRBYFLOAT, "hincrbyfloat", 4 )\ - ACTION( REQ_HKEYS, "hkeys", 2 )\ - ACTION( REQ_HLEN, "hlen", 2 )\ - ACTION( REQ_HMGET, "hmget", -3 )\ - ACTION( REQ_HMSET, "hmset", -4 )\ - ACTION( REQ_HSET, "hset", 4 )\ - ACTION( REQ_HSETNX, "hsetnx", 4 )\ - ACTION( REQ_HSTRLEN, "hstrlen", 3 )\ - ACTION( REQ_HVALS, "hvals", 2 )\ - ACTION( REQ_HSCAN, "hscan", -3 ) + ACTION( REQ_HDEL, "hdel", 3, -1 )\ + ACTION( REQ_HDELALL, "hdelall", 2, 0 )\ + ACTION( REQ_HEXISTS, "hexists", 3, 0 )\ + ACTION( REQ_HGET, "hget", 3, 0 )\ + ACTION( REQ_HGETALL, "hgetall", 2, 0 )\ + ACTION( REQ_HINCRBY, "hincrby", 4, 0 )\ + ACTION( REQ_HINCRBYFLOAT, "hincrbyfloat", 4, 0 )\ + ACTION( REQ_HKEYS, "hkeys", 2, 0 )\ + ACTION( REQ_HLEN, "hlen", 2, 0 )\ + ACTION( REQ_HMGET, "hmget", 3, -1 )\ + ACTION( REQ_HMSET, "hmset", 4, -1 )\ + ACTION( REQ_HSET, "hset", 4, 0 )\ + ACTION( REQ_HSETNX, "hsetnx", 4, 0 )\ + ACTION( REQ_HSTRLEN, "hstrlen", 3, 0 )\ + ACTION( REQ_HVALS, "hvals", 2, 0 )\ + ACTION( REQ_HSCAN, "hscan", 3, 0 ) /* "hlen KEY" == "*2\r\n$4\r\nhlen\r\n$3\r\nKEY\r\n" */ diff --git a/src/protocol/data/redis/cmd_misc.h b/src/protocol/data/redis/cmd_misc.h index 44b94265d..1bff28c1b 100644 --- a/src/protocol/data/redis/cmd_misc.h +++ b/src/protocol/data/redis/cmd_misc.h @@ -1,6 +1,7 @@ #pragma once -/* type string # of args */ -#define REQ_MISC(ACTION) \ - ACTION( REQ_PING, "ping", -1 )\ - ACTION( REQ_QUIT, "quit", 1 ) +/* type string #arg#opt */ +#define REQ_MISC(ACTION) \ + ACTION( REQ_FLUSHALL, "flushall", 1, 0 )\ + ACTION( REQ_PING, "ping", 1, 1 )\ + ACTION( REQ_QUIT, "quit", 1, 0 ) diff --git a/src/protocol/data/redis/cmd_zset.h b/src/protocol/data/redis/cmd_zset.h index 772069bc6..90004b6ad 100644 --- a/src/protocol/data/redis/cmd_zset.h +++ b/src/protocol/data/redis/cmd_zset.h @@ -1,25 +1,25 @@ #pragma once /* type string # of args */ -#define REQ_ZSET(ACTION) \ - ACTION( REQ_ZADD, "zadd", -4 )\ - ACTION( REQ_ZINCRBY, "zincrby", 4 )\ - ACTION( REQ_ZREM, "zrem", -3 )\ - ACTION( REQ_ZREMRANGEBYSCORE, "zremrangebyscore", 4 )\ - ACTION( REQ_ZREMRANGEBYRANK, "zremrangebyrank", 4 )\ - ACTION( REQ_ZREMRANGEBYLEX, "zremrangebylex", 4 )\ - ACTION( REQ_ZUNIONSTORE, "zunionstore", -4 )\ - ACTION( REQ_ZINTERSTORE, "zinterstore", -4 )\ - ACTION( REQ_ZRANGE, "zrange", -4 )\ - ACTION( REQ_ZRANGEBYSCORE, "zrangebyscore", -4 )\ - ACTION( REQ_ZREVRANGEBYSCORE, "zrevrangebyscore", -4 )\ - ACTION( REQ_ZRANGEBYLEX, "zrangebylex", -4 )\ - ACTION( REQ_ZREVRANGEBYLEX, "zrevrangebylex", -4 )\ - ACTION( REQ_ZCOUNT, "zcount", 4 )\ - ACTION( REQ_ZLEXCOUNT, "zlexcount", 4 )\ - ACTION( REQ_ZREVRANGE, "zrevrange", -4 )\ - ACTION( REQ_ZCARD, "zcard", 2 )\ - ACTION( REQ_ZSCORE, "zscore", 3 )\ - ACTION( REQ_ZRANK, "zrank", 3 )\ - ACTION( REQ_ZREVRANK, "zrevrank", 3 )\ - ACTION( REQ_ZSCAN, "zscan", -3 ) +#define REQ_ZSET(ACTION) \ + ACTION( REQ_ZADD, "zadd", 4, -1 )\ + ACTION( REQ_ZINCRBY, "zincrby", 4, 0 )\ + ACTION( REQ_ZREM, "zrem", 3, -1 )\ + ACTION( REQ_ZREMRANGEBYSCORE, "zremrangebyscore", 4, 0 )\ + ACTION( REQ_ZREMRANGEBYRANK, "zremrangebyrank", 4, 0 )\ + ACTION( REQ_ZREMRANGEBYLEX, "zremrangebylex", 4, 0 )\ + ACTION( REQ_ZUNIONSTORE, "zunionstore", 4, -1 )\ + ACTION( REQ_ZINTERSTORE, "zinterstore", 4, -1 )\ + ACTION( REQ_ZRANGE, "zrange", 4, -1 )\ + ACTION( REQ_ZRANGEBYSCORE, "zrangebyscore", 4, -1 )\ + ACTION( REQ_ZREVRANGEBYSCORE, "zrevrangebyscore", 4, -1 )\ + ACTION( REQ_ZRANGEBYLEX, "zrangebylex", 4, -1 )\ + ACTION( REQ_ZREVRANGEBYLEX, "zrevrangebylex", 4, -1 )\ + ACTION( REQ_ZCOUNT, "zcount", 4, 0 )\ + ACTION( REQ_ZLEXCOUNT, "zlexcount", 4, 0 )\ + ACTION( REQ_ZREVRANGE, "zrevrange", 4, -1 )\ + ACTION( REQ_ZCARD, "zcard", 2, 0 )\ + ACTION( REQ_ZSCORE, "zscore", 3, 0 )\ + ACTION( REQ_ZRANK, "zrank", 3, 0 )\ + ACTION( REQ_ZREVRANK, "zrevrank", 3, 0 )\ + ACTION( REQ_ZSCAN, "zscan", 3, -1 ) diff --git a/src/protocol/data/redis/compose.h b/src/protocol/data/redis/compose.h index 0b15f8c42..03000b9e0 100644 --- a/src/protocol/data/redis/compose.h +++ b/src/protocol/data/redis/compose.h @@ -1,6 +1,7 @@ #pragma once -#include +#include "token.h" + #include #include @@ -24,13 +25,6 @@ typedef struct { COMPOSE_RSP_METRIC(METRIC_DECLARE) } compose_rsp_metrics_st; -typedef enum compose_rstatus { - COMPOSE_OK = 0, - COMPOSE_EUNFIN = -1, - COMPOSE_ENOMEM = -2, - COMPOSE_EINVALID = -3, - COMPOSE_EOTHER = -4, -} compose_rstatus_t; struct request; struct response; diff --git a/src/protocol/data/redis/parse.c b/src/protocol/data/redis/parse.c index 28b52f54c..c1b3fdc85 100644 --- a/src/protocol/data/redis/parse.c +++ b/src/protocol/data/redis/parse.c @@ -72,9 +72,9 @@ _parse_cmd(struct request *req) /* check narg */ cmd = command_table[type]; narg = req->token->nelem; - if ((cmd.narg >= 0 && cmd.narg != narg) || narg + cmd.narg < 0) { - log_warn("wrong number of arguments for '%.*s': %d expected, %d given", - cmd.bstr.len, cmd.bstr.data, cmd.narg, narg); + if (narg < cmd.narg || narg > (cmd.narg + cmd.nopt)) { + log_warn("wrong # of arguments for '%.*s': %d+[%d] expected, %d given", + cmd.bstr.len, cmd.bstr.data, cmd.narg, cmd.nopt, narg); return PARSE_EINVALID; } @@ -121,8 +121,8 @@ parse_req(struct request *req, struct buf *buf) } el = array_push(req->token); status = parse_element(el, buf); + log_verb("parse element returned status %d", status); if (status != PARSE_OK) { - log_verb("parse element returned status %d", status); request_reset(req); buf->rpos = old_rpos; return status; @@ -131,6 +131,7 @@ parse_req(struct request *req, struct buf *buf) } status = _parse_cmd(req); + log_verb("parse command returned status %d", status); if (status != PARSE_OK) { buf->rpos = old_rpos; return status; diff --git a/src/protocol/data/redis/parse.h b/src/protocol/data/redis/parse.h index ce33167d3..50c7bebff 100644 --- a/src/protocol/data/redis/parse.h +++ b/src/protocol/data/redis/parse.h @@ -2,8 +2,8 @@ #include "request.h" #include "response.h" +#include "token.h" -#include #include #include @@ -33,14 +33,6 @@ typedef struct { PARSE_RSP_METRIC(METRIC_DECLARE) } parse_rsp_metrics_st; -typedef enum parse_rstatus { - PARSE_OK = 0, - PARSE_EUNFIN = -1, - PARSE_EEMPTY = -2, - PARSE_EOVERSIZE = -3, - PARSE_EINVALID = -4, - PARSE_EOTHER = -5, -} parse_rstatus_t; void parse_setup(parse_req_metrics_st *req, parse_rsp_metrics_st *rsp); void parse_teardown(void); diff --git a/src/protocol/data/redis/request.c b/src/protocol/data/redis/request.c index 2a38e5c46..3c5c7da2a 100644 --- a/src/protocol/data/redis/request.c +++ b/src/protocol/data/redis/request.c @@ -11,11 +11,13 @@ static bool request_init = false; static request_metrics_st *request_metrics = NULL; -struct command command_table[REQ_SENTINEL]; -#define CMD_INIT(_type, _str, _narg) \ - { .type = _type, .bstr = { sizeof(_str) - 1, (_str) }, .narg = _narg }, +#define CMD_INIT(_type, _str, _narg, _nopt) {\ + .type = _type, \ + .bstr = { sizeof(_str) - 1, (_str) }, \ + .narg = _narg, \ + .nopt = _nopt }, struct command command_table[REQ_SENTINEL] = { - { .type = REQ_UNKNOWN, .bstr = { 0, NULL }, .narg = 0 }, + { .type = REQ_UNKNOWN, .bstr = { 0, NULL }, .narg = 0, .nopt = 0 }, REQ_HASH(CMD_INIT) REQ_ZSET(CMD_INIT) REQ_MISC(CMD_INIT) @@ -189,7 +191,13 @@ request_setup(request_options_st *options, request_metrics_st *metrics) request_metrics = metrics; if (options != NULL) { + int i; ntoken = option_uint(&options->request_ntoken); + for (i = 1; i < REQ_SENTINEL; i++) { /* update nopt based on ntoken */ + if (command_table[i].nopt == -1) { + command_table[i].nopt = ntoken - command_table[i].narg; + } + } max = option_uint(&options->request_poolsize); } request_pool_create(max); diff --git a/src/protocol/data/redis/request.h b/src/protocol/data/redis/request.h index d5fb5f938..2459644b2 100644 --- a/src/protocol/data/redis/request.h +++ b/src/protocol/data/redis/request.h @@ -3,6 +3,7 @@ #include "cmd_hash.h" #include "cmd_misc.h" #include "cmd_zset.h" +#include "token.h" #include #include @@ -18,7 +19,7 @@ /* name type default description */ #define REQUEST_OPTION(ACTION) \ - ACTION( request_ntoken, OPTION_TYPE_UINT, REQ_NTOKEN, "# tokens in request")\ + ACTION( request_ntoken, OPTION_TYPE_UINT, REQ_NTOKEN, "# tokens in req" )\ ACTION( request_poolsize, OPTION_TYPE_UINT, REQ_POOLSIZE, "request pool size") typedef struct { @@ -38,7 +39,7 @@ typedef struct { REQUEST_METRIC(METRIC_DECLARE) } request_metrics_st; -#define GET_TYPE(_type, _str, narg) _type, +#define GET_TYPE(_type, _str, narg, nopt) _type, typedef enum cmd_type { REQ_UNKNOWN, REQ_HASH(GET_TYPE) @@ -48,10 +49,19 @@ typedef enum cmd_type { } cmd_type_e; #undef GET_TYPE +/* + * Note: though redis supports unboudned number of variables in some commands, + * implementation cannot operate with performance guarantee when this number + * gets too big. It also introduces uncertainty around resources. Therefore, we + * are limiting it to REQ_NTOKEN minus the # required args. For each command, if + * the # of optional arguments is declared as -1, (req_ntoken - narg) will be + * used to enforce argument limits. + */ struct command { cmd_type_e type; struct bstring bstr; - int32_t narg; + int32_t narg; /* number of required arguments, including verb */ + int32_t nopt; /* number of optional arguments */ }; extern struct command command_table[REQ_SENTINEL]; @@ -65,7 +75,7 @@ struct request { bool cerror; /* client error */ cmd_type_e type; - struct array *token; /* array elements are tokens */ + struct array *token; /* member type: `struct element' */ }; void request_setup(request_options_st *options, request_metrics_st *metrics); diff --git a/src/protocol/data/redis/response.c b/src/protocol/data/redis/response.c index 0d1ee49ce..ee840b18d 100644 --- a/src/protocol/data/redis/response.c +++ b/src/protocol/data/redis/response.c @@ -24,6 +24,8 @@ response_reset(struct response *rsp) STAILQ_NEXT(rsp, next) = NULL; rsp->free = false; + rsp->serror = false; + rsp->type = ELEM_UNKNOWN; rsp->nil = false; rsp->token->nelem = 0; diff --git a/src/protocol/data/redis/response.h b/src/protocol/data/redis/response.h index 7faa5efb2..cdaa32cf7 100644 --- a/src/protocol/data/redis/response.h +++ b/src/protocol/data/redis/response.h @@ -1,5 +1,7 @@ #pragma once +#include "token.h" + #include #include #include @@ -8,7 +10,7 @@ #include #include -#define RSP_NTOKEN 255 /* # tokens in a command */ +#define RSP_NTOKEN 127 /* # tokens in a response */ #define RSP_POOLSIZE 0 /* name type default description */ @@ -39,6 +41,7 @@ typedef struct { * - a RSP_NUMERIC type that doesn't have a corresponding message body. */ #define RSP_STR_OK "OK" +#define RSP_PONG "pong" /* * NOTE(yao): we store fields as location in rbuf, this assumes the data will @@ -50,9 +53,11 @@ struct response { STAILQ_ENTRY(response) next; /* allow response pooling/chaining */ bool free; - int type; - bool nil; - struct array *token; /* array elements are tokens */ + bool serror; /* server error */ + + element_type_e type; /* only array can have >1 token */ + bool nil; /* null array or null bulk string */ + struct array *token; /* member type: `struct element' */ }; void response_setup(response_options_st *options, response_metrics_st *metrics); diff --git a/src/protocol/data/redis/token.c b/src/protocol/data/redis/token.c index 086ea675b..0fd608eaf 100644 --- a/src/protocol/data/redis/token.c +++ b/src/protocol/data/redis/token.c @@ -3,6 +3,8 @@ #include "request.h" #include "response.h" +#include +#include #include #include #include diff --git a/src/protocol/data/redis/token.h b/src/protocol/data/redis/token.h index adf779175..7fffcb358 100644 --- a/src/protocol/data/redis/token.h +++ b/src/protocol/data/redis/token.h @@ -47,13 +47,27 @@ * last element. */ -#include "parse.h" -#include "compose.h" - #include #include #include +typedef enum parse_rstatus { + PARSE_OK = 0, + PARSE_EUNFIN = -1, + PARSE_EEMPTY = -2, + PARSE_EOVERSIZE = -3, + PARSE_EINVALID = -4, + PARSE_EOTHER = -5, +} parse_rstatus_t; + +typedef enum compose_rstatus { + COMPOSE_OK = 0, + COMPOSE_EUNFIN = -1, + COMPOSE_ENOMEM = -2, + COMPOSE_EINVALID = -3, + COMPOSE_EOTHER = -4, +} compose_rstatus_t; + /* array is not a basic element type */ typedef enum element_type { ELEM_UNKNOWN = 0, diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 9dff0246e..f2bfc8407 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -2,6 +2,10 @@ if(TARGET_PINGSERVER) add_subdirectory(pingserver) endif() +if(TARGET_REDIS) + add_subdirectory(slimredis) +endif() + if(TARGET_SLIMCACHE) add_subdirectory(slimcache) endif() diff --git a/src/server/slimredis/CMakeLists.txt b/src/server/slimredis/CMakeLists.txt new file mode 100644 index 000000000..b5db5ff6a --- /dev/null +++ b/src/server/slimredis/CMakeLists.txt @@ -0,0 +1,29 @@ +add_subdirectory(admin) +add_subdirectory(data) + +set(SOURCE + ${SOURCE} + main.c + setting.c + stats.c) + +set(MODULES + core + protocol_admin + protocol_redis + slab + time + util) + +set(LIBS + ccommon-static + ${CMAKE_THREAD_LIBS_INIT}) + +set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}/_bin) +set(TARGET_NAME ${PROJECT_NAME}_slimredis) + +add_executable(${TARGET_NAME} ${SOURCE}) +target_link_libraries(${TARGET_NAME} ${MODULES} ${LIBS}) + +install(TARGETS ${TARGET_NAME} RUNTIME DESTINATION bin) +add_dependencies(service ${TARGET_NAME}) diff --git a/src/server/slimredis/admin/CMakeLists.txt b/src/server/slimredis/admin/CMakeLists.txt new file mode 100644 index 000000000..31a7e65f3 --- /dev/null +++ b/src/server/slimredis/admin/CMakeLists.txt @@ -0,0 +1,4 @@ +set(SOURCE + ${SOURCE} + ${CMAKE_CURRENT_SOURCE_DIR}/process.c + PARENT_SCOPE) diff --git a/src/server/slimredis/admin/process.c b/src/server/slimredis/admin/process.c new file mode 100644 index 000000000..8cf1ca3a8 --- /dev/null +++ b/src/server/slimredis/admin/process.c @@ -0,0 +1,80 @@ +#include "process.h" + +#include "protocol/admin/admin_include.h" +#include "util/procinfo.h" + +#include +#include + +#define SLIMREDIS_ADMIN_MODULE_NAME "slimredis::admin" + +extern struct stats stats; +extern unsigned int nmetric; + +static bool admin_init = false; +static char *buf = NULL; +static size_t cap; + +void +admin_process_setup(void) +{ + log_info("set up the %s module", SLIMREDIS_ADMIN_MODULE_NAME); + if (admin_init) { + log_warn("%s has already been setup, overwrite", + SLIMREDIS_ADMIN_MODULE_NAME); + } + + cap = nmetric * METRIC_PRINT_LEN; + buf = cc_alloc(cap); + /* TODO: check return status of cc_alloc */ + + admin_init = true; +} + +void +admin_process_teardown(void) +{ + log_info("tear down the %s module", SLIMREDIS_ADMIN_MODULE_NAME); + if (!admin_init) { + log_warn("%s has never been setup", SLIMREDIS_ADMIN_MODULE_NAME); + } + + admin_init = false; +} + +static void +_admin_stats_default(struct response *rsp, struct request *req) +{ + procinfo_update(); + rsp->data.data = buf; + rsp->data.len = print_stats(buf, cap, (struct metric *)&stats, nmetric); +} + +static void +_admin_stats(struct response *rsp, struct request *req) +{ + if (bstring_empty(&req->arg)) { + _admin_stats_default(rsp, req); + return; + } else { + rsp->type = RSP_INVALID; + } +} + +void +admin_process_request(struct response *rsp, struct request *req) +{ + rsp->type = RSP_GENERIC; + + switch (req->type) { + case REQ_STATS: + _admin_stats(rsp, req); + break; + case REQ_VERSION: + rsp->data = str2bstr(VERSION_PRINTED); + break; + default: + rsp->type = RSP_INVALID; + break; + } +} diff --git a/src/server/slimredis/admin/process.h b/src/server/slimredis/admin/process.h new file mode 100644 index 000000000..361230004 --- /dev/null +++ b/src/server/slimredis/admin/process.h @@ -0,0 +1,4 @@ +#pragma once + +void admin_process_setup(void); +void admin_process_teardown(void); diff --git a/src/server/slimredis/data/CMakeLists.txt b/src/server/slimredis/data/CMakeLists.txt new file mode 100644 index 000000000..71e5e1565 --- /dev/null +++ b/src/server/slimredis/data/CMakeLists.txt @@ -0,0 +1,5 @@ +set(SOURCE + ${SOURCE} + ${CMAKE_CURRENT_SOURCE_DIR}/process.c + ${CMAKE_CURRENT_SOURCE_DIR}/cmd_misc.c + PARENT_SCOPE) diff --git a/src/server/slimredis/data/cmd_misc.c b/src/server/slimredis/data/cmd_misc.c new file mode 100644 index 000000000..3c67072ea --- /dev/null +++ b/src/server/slimredis/data/cmd_misc.c @@ -0,0 +1,30 @@ +#include "process.h" + +#include "protocol/data/redis_include.h" +#include +#include + + +bool allow_flush = ALLOW_FLUSH; + +void +cmd_ping(struct response *rsp, struct request *req, struct command *cmd) +{ + struct element *el = NULL; + + el = array_push(rsp->token); + ASSERT(el != NULL); /* cannot fail because we preallocate tokens */ + + if (cmd->nopt == 0) { /* no additional argument, respond pong */ + rsp->type = ELEM_STR; + el->type = ELEM_STR; + el->bstr = str2bstr(RSP_PONG); + } else { /* behave as echo, use bulk string */ + struct element *arg = (struct element *)array_get(req->token, 1); + rsp->type = ELEM_BULK; + el->type = ELEM_BULK; + el->bstr = arg->bstr; + } + + INCR(process_metrics, ping); +} diff --git a/src/server/slimredis/data/cmd_misc.h b/src/server/slimredis/data/cmd_misc.h new file mode 100644 index 000000000..ca2410717 --- /dev/null +++ b/src/server/slimredis/data/cmd_misc.h @@ -0,0 +1,13 @@ +#pragma once + +/* name type description */ +#define PROCESS_MISC_METRIC(ACTION) \ + ACTION( flushall, METRIC_COUNTER, "# flushall requests" )\ + ACTION( ping, METRIC_COUNTER, "# ping requests" ) + +struct request; +struct response; +struct command; + +/* cmd_* functions must be command_fn (process.c) compatible */ +void cmd_ping(struct response *rsp, struct request *req, struct command *cmd); diff --git a/src/server/slimredis/data/process.c b/src/server/slimredis/data/process.c new file mode 100644 index 000000000..0f8c13173 --- /dev/null +++ b/src/server/slimredis/data/process.c @@ -0,0 +1,176 @@ +#include "process.h" + +#include "protocol/data/redis_include.h" + +#include +#include +#include + +#define SLIMREDIS_PROCESS_MODULE_NAME "slimredis::process" + +#define OVERSIZE_ERR_MSG "oversized value, cannot be stored" +#define OOM_ERR_MSG "server is out of memory" +#define CMD_ERR_MSG "command not supported" +#define OTHER_ERR_MSG "unknown server error" + + +typedef void (* command_fn)(struct response *, struct request *, struct command *cmd); +static command_fn command_registry[REQ_SENTINEL]; + +static bool process_init = false; +process_metrics_st *process_metrics = NULL; + +void +process_setup(process_options_st *options, process_metrics_st *metrics) +{ + log_info("set up the %s module", SLIMREDIS_PROCESS_MODULE_NAME); + + if (process_init) { + log_warn("%s has already been setup, overwrite", + SLIMREDIS_PROCESS_MODULE_NAME); + } + + process_metrics = metrics; + + if (options != NULL) { + allow_flush = option_bool(&options->allow_flush); + } + + command_registry[REQ_PING] = cmd_ping; + + process_init = true; +} + +void +process_teardown(void) +{ + log_info("tear down the %s module", SLIMREDIS_PROCESS_MODULE_NAME); + if (!process_init) { + log_warn("%s has never been setup", SLIMREDIS_PROCESS_MODULE_NAME); + } + + command_registry[REQ_PING] = cmd_ping; + + allow_flush = ALLOW_FLUSH; + process_metrics = NULL; + process_init = false; +} + + +void +process_request(struct response *rsp, struct request *req) +{ + struct command cmd; + command_fn func = command_registry[req->type]; + + cmd = command_table[req->type]; + cmd.nopt = req->token->nelem - cmd.narg; + + log_verb("processing command '%.*s' with %d optional arguments", + cmd.bstr.len, cmd.bstr.data, cmd.nopt); + func(rsp, req, &cmd); +} + +int +slimredis_process_read(struct buf **rbuf, struct buf **wbuf, void **data) +{ + parse_rstatus_t status; + struct request *req; /* data should be NULL or hold a req pointer */ + struct response *rsp; + + req = request_borrow(); + rsp = response_borrow(); + if (req == NULL || rsp == NULL) { + goto error; + } + + /* keep parse-process-compose until running out of data in rbuf */ + while (buf_rsize(*rbuf) > 0) { + request_reset(req); + response_reset(rsp); + + /* stage 1: parsing */ + log_verb("%"PRIu32" bytes left", buf_rsize(*rbuf)); + + status = parse_req(req, *rbuf); + if (status == PARSE_EUNFIN) { + buf_lshift(*rbuf); + goto done; + } + if (status != PARSE_OK) { + /* parsing errors are all client errors, since we don't know + * how to recover from client errors in this condition (we do not + * have a valid request so we don't know where the invalid request + * ends), we should close the connection + */ + log_warn("illegal request received, status: %d", status); + INCR(process_metrics, process_ex); + INCR(process_metrics, process_client_ex); + goto error; + } + + /* stage 2: processing- check for quit, allocate response(s), process */ + + /* quit is special, no response expected */ + if (req->type == REQ_QUIT) { + log_info("peer called quit"); + goto error; + } + + /* actual processing */ + process_request(rsp, req); + + /* stage 3: write response(s) if necessary */ + + /* noreply means no need to write to buffers */ + if (compose_rsp(wbuf, rsp) < 0) { + log_error("composing rsp erred"); + INCR(process_metrics, process_ex); + INCR(process_metrics, process_server_ex); + goto error; + } + + /* logging, clean-up */ + } + +done: + request_return(&req); + response_return(&rsp); + + return 0; + +error: + request_return(&req); + response_return(&rsp); + + return -1; +} + + +int +slimredis_process_write(struct buf **rbuf, struct buf **wbuf, void **data) +{ + log_verb("post-write processing"); + + buf_lshift(*rbuf); + dbuf_shrink(rbuf); + buf_lshift(*wbuf); + dbuf_shrink(wbuf); + + return 0; +} + + +int +slimredis_process_error(struct buf **rbuf, struct buf **wbuf, void **data) +{ + log_verb("post-error processing"); + + /* normalize buffer size */ + buf_reset(*rbuf); + dbuf_shrink(rbuf); + buf_reset(*wbuf); + dbuf_shrink(wbuf); + + return 0; +} diff --git a/src/server/slimredis/data/process.h b/src/server/slimredis/data/process.h new file mode 100644 index 000000000..fb110e90f --- /dev/null +++ b/src/server/slimredis/data/process.h @@ -0,0 +1,39 @@ +#pragma once + +#include "cmd_misc.h" + +#include +#include +#include + +#define ALLOW_FLUSH false + +/* name type default description */ +#define PROCESS_OPTION(ACTION) \ + ACTION( allow_flush, OPTION_TYPE_BOOL, ALLOW_FLUSH, "allow flushing on the data port" ) + +typedef struct { + PROCESS_OPTION(OPTION_DECLARE) +} process_options_st; + +/* name type description */ +#define PROCESS_METRIC(ACTION) \ + ACTION( process_req, METRIC_COUNTER, "# requests processed" )\ + ACTION( process_ex, METRIC_COUNTER, "# processing error" )\ + ACTION( process_client_ex, METRIC_COUNTER, "# internal error" )\ + ACTION( process_server_ex, METRIC_COUNTER, "# internal error" ) + +typedef struct { + PROCESS_METRIC(METRIC_DECLARE) + PROCESS_MISC_METRIC(METRIC_DECLARE) +} process_metrics_st; + +extern process_metrics_st *process_metrics; +extern bool allow_flush; + +void process_setup(process_options_st *options, process_metrics_st *metrics); +void process_teardown(void); + +int slimredis_process_read(struct buf **rbuf, struct buf **wbuf, void **data); +int slimredis_process_write(struct buf **rbuf, struct buf **wbuf, void **data); +int slimredis_process_error(struct buf **rbuf, struct buf **wbuf, void **data); diff --git a/src/server/slimredis/main.c b/src/server/slimredis/main.c new file mode 100644 index 000000000..308ac717a --- /dev/null +++ b/src/server/slimredis/main.c @@ -0,0 +1,202 @@ +#include "admin/process.h" +#include "setting.h" +#include "stats.h" + +#include "time/time.h" +#include "util/util.h" + +#include + +#include +#include +#include +#include +#include + +struct data_processor worker_processor = { + slimredis_process_read, + slimredis_process_write, + slimredis_process_error, +}; + +static void +show_usage(void) +{ + log_stdout( + "Usage:" CRLF + " pelikan_slimredis [option|config]" CRLF + ); + log_stdout( + "Description:" CRLF + " pelikan_slimredis is one of the unified cache backends. " CRLF + " It uses managed storage backends to cache key/val pairs. " CRLF + " It speaks the memcached ASCII protocol and supports almost " CRLF + " all ASCII memcached commands." CRLF + ); + log_stdout( + "Command-line options:" CRLF + " -h, --help show this message" CRLF + " -v, --version show version number" CRLF + " -c, --config list & describe all options in config" CRLF + " -s, --stats list & describe all metrics in stats" CRLF + ); + log_stdout( + "Example:" CRLF + " pelikan_slimredis slimredis.conf" CRLF CRLF + "Sample config files can be found under the config dir." CRLF + ); +} + +static void +teardown(void) +{ + core_worker_teardown(); + core_server_teardown(); + core_admin_teardown(); + admin_process_teardown(); + process_teardown(); + compose_teardown(); + parse_teardown(); + response_teardown(); + request_teardown(); + procinfo_teardown(); + time_teardown(); + + timing_wheel_teardown(); + tcp_teardown(); + sockio_teardown(); + event_teardown(); + dbuf_teardown(); + buf_teardown(); + + debug_teardown(); + log_teardown(); +} + +static void +setup(void) +{ + char *fname = NULL; + uint64_t intvl; + + if (atexit(teardown) != 0) { + log_stderr("cannot register teardown procedure with atexit()"); + exit(EX_OSERR); /* only failure comes from NOMEM */ + } + + /* Setup logging first */ + log_setup(&stats.log); + if (debug_setup(&setting.debug) != CC_OK) { + log_stderr("debug log setup failed"); + exit(EX_CONFIG); + } + + /* setup top-level application options */ + if (option_bool(&setting.slimredis.daemonize)) { + daemonize(); + } + fname = option_str(&setting.slimredis.pid_filename); + if (fname != NULL) { + /* to get the correct pid, call create_pidfile after daemonize */ + create_pidfile(fname); + } + + /* setup library modules */ + buf_setup(&setting.buf, &stats.buf); + dbuf_setup(&setting.dbuf, &stats.dbuf); + event_setup(&stats.event); + sockio_setup(&setting.sockio, &stats.sockio); + tcp_setup(&setting.tcp, &stats.tcp); + timing_wheel_setup(&stats.timing_wheel); + + /* setup pelikan modules */ + time_setup(); + procinfo_setup(&stats.procinfo); + request_setup(&setting.request, &stats.request); + response_setup(&setting.response, &stats.response); + parse_setup(&stats.parse_req, NULL); + compose_setup(NULL, &stats.compose_rsp); + process_setup(&setting.process, &stats.process); + admin_process_setup(); + core_admin_setup(&setting.admin); + core_server_setup(&setting.server, &stats.server); + core_worker_setup(&setting.worker, &stats.worker); + + /* adding recurring events to maintenance/admin thread */ + intvl = option_uint(&setting.slimredis.dlog_intvl); + if (core_admin_register(intvl, debug_log_flush, NULL) == NULL) { + log_stderr("Could not register timed event to flush debug log"); + goto error; + } + + return; + +error: + if (fname != NULL) { + remove_pidfile(fname); + } + + /* since we registered teardown with atexit, it'll be called upon exit */ + exit(EX_CONFIG); +} + +int +main(int argc, char **argv) +{ + rstatus_i status = CC_OK;; + FILE *fp = NULL; + + if (argc > 2) { + show_usage(); + exit(EX_USAGE); + } + + if (argc == 1) { + log_stderr("launching server with default values."); + } else { + /* argc == 2 */ + if (strcmp(argv[1], "-h") == 0 || strcmp(argv[1], "--help") == 0) { + show_usage(); + exit(EX_OK); + } + if (strcmp(argv[1], "-v") == 0 || strcmp(argv[1], "--version") == 0) { + show_version(); + exit(EX_OK); + } + if (strcmp(argv[1], "-c") == 0 || strcmp(argv[1], "--config") == 0) { + option_describe_all((struct option *)&setting, nopt); + exit(EX_OK); + } + if (strcmp(argv[1], "-s") == 0 || strcmp(argv[1], "--stats") == 0) { + metric_describe_all((struct metric *)&stats, nmetric); + exit(EX_OK); + } + fp = fopen(argv[1], "r"); + if (fp == NULL) { + log_stderr("cannot open config: incorrect path or doesn't exist"); + exit(EX_DATAERR); + } + } + + if (option_load_default((struct option *)&setting, nopt) != CC_OK) { + log_stderr("failed to load default option values"); + exit(EX_CONFIG); + } + + if (fp != NULL) { + log_stderr("load config from %s", argv[1]); + status = option_load_file(fp, (struct option *)&setting, nopt); + fclose(fp); + } + if (status != CC_OK) { + log_stderr("failed to load config"); + exit(EX_DATAERR); + } + + setup(); + option_print_all((struct option *)&setting, nopt); + + core_run(&worker_processor); + + exit(EX_OK); +} diff --git a/src/server/slimredis/setting.c b/src/server/slimredis/setting.c new file mode 100644 index 000000000..c5444c24a --- /dev/null +++ b/src/server/slimredis/setting.c @@ -0,0 +1,19 @@ +#include "setting.h" + +struct setting setting = { + { SLIMREDIS_OPTION(OPTION_INIT) }, + { ADMIN_OPTION(OPTION_INIT) }, + { SERVER_OPTION(OPTION_INIT) }, + { WORKER_OPTION(OPTION_INIT) }, + { PROCESS_OPTION(OPTION_INIT) }, + { REQUEST_OPTION(OPTION_INIT) }, + { RESPONSE_OPTION(OPTION_INIT) }, + { ARRAY_OPTION(OPTION_INIT) }, + { BUF_OPTION(OPTION_INIT) }, + { DBUF_OPTION(OPTION_INIT) }, + { DEBUG_OPTION(OPTION_INIT) }, + { SOCKIO_OPTION(OPTION_INIT) }, + { TCP_OPTION(OPTION_INIT) }, +}; + +unsigned int nopt = OPTION_CARDINALITY(struct setting); diff --git a/src/server/slimredis/setting.h b/src/server/slimredis/setting.h new file mode 100644 index 000000000..68f7a9cd1 --- /dev/null +++ b/src/server/slimredis/setting.h @@ -0,0 +1,47 @@ +#pragma once + +#include "data/process.h" + +#include "core/core.h" +#include "protocol/data/redis_include.h" + +#include +#include +#include +#include +#include +#include +#include + +/* option related */ +/* name type default description */ +#define SLIMREDIS_OPTION(ACTION) \ + ACTION( daemonize, OPTION_TYPE_BOOL, false, "daemonize the process" )\ + ACTION( pid_filename, OPTION_TYPE_STR, NULL, "file storing the pid" )\ + ACTION( dlog_intvl, OPTION_TYPE_UINT, 500, "debug log flush interval(ms)" ) + +typedef struct { + SLIMREDIS_OPTION(OPTION_DECLARE) +} slimredis_options_st; + +struct setting { + /* top-level */ + slimredis_options_st slimredis; + /* application modules */ + admin_options_st admin; + server_options_st server; + worker_options_st worker; + process_options_st process; + request_options_st request; + response_options_st response; + /* ccommon libraries */ + array_options_st array; + buf_options_st buf; + dbuf_options_st dbuf; + debug_options_st debug; + sockio_options_st sockio; + tcp_options_st tcp; +}; + +extern struct setting setting; +extern unsigned int nopt; diff --git a/src/server/slimredis/stats.c b/src/server/slimredis/stats.c new file mode 100644 index 000000000..1c86a710a --- /dev/null +++ b/src/server/slimredis/stats.c @@ -0,0 +1,21 @@ +#include "stats.h" + +struct stats stats = { + { PROCINFO_METRIC(METRIC_INIT) }, + { PROCESS_METRIC(METRIC_INIT) }, + { PARSE_REQ_METRIC(METRIC_INIT) }, + { COMPOSE_RSP_METRIC(METRIC_INIT) }, + { REQUEST_METRIC(METRIC_INIT) }, + { RESPONSE_METRIC(METRIC_INIT) }, + { CORE_SERVER_METRIC(METRIC_INIT) }, + { CORE_WORKER_METRIC(METRIC_INIT) }, + { BUF_METRIC(METRIC_INIT) }, + { DBUF_METRIC(METRIC_INIT) }, + { EVENT_METRIC(METRIC_INIT) }, + { LOG_METRIC(METRIC_INIT) }, + { SOCKIO_METRIC(METRIC_INIT) }, + { TCP_METRIC(METRIC_INIT) }, + { TIMING_WHEEL_METRIC(METRIC_INIT) }, +}; + +unsigned int nmetric = METRIC_CARDINALITY(stats); diff --git a/src/server/slimredis/stats.h b/src/server/slimredis/stats.h new file mode 100644 index 000000000..5c458fd4c --- /dev/null +++ b/src/server/slimredis/stats.h @@ -0,0 +1,38 @@ +#pragma once + +#include "data/process.h" + +#include "core/core.h" +#include "protocol/data/redis_include.h" +#include "util/procinfo.h" + +#include +#include +#include +#include +#include +#include