Skip to content

Commit

Permalink
adding redis binary (barebone) (#166)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yao Yue authored May 19, 2018
1 parent 1196b58 commit 2e8c7b6
Show file tree
Hide file tree
Showing 17 changed files with 733 additions and 1 deletion.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ option(HAVE_LOGGING "logging enabled by default" ON)
option(HAVE_STATS "stats enabled by default" ON)

option(TARGET_PINGSERVER "build pingserver binary" ON)
option(TARGET_REDIS "build redis binary" ON)
option(TARGET_SLIMREDIS "build slimredis binary" ON)
option(TARGET_SLIMCACHE "build slimcache binary" ON)
option(TARGET_TWEMCACHE "build twemcache binary" ON)
Expand Down
4 changes: 4 additions & 0 deletions src/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ if(TARGET_PINGSERVER)
add_subdirectory(pingserver)
endif()

if(TARGET_REDIS)
add_subdirectory(redis)
endif()

if(TARGET_SLIMREDIS)
add_subdirectory(slimredis)
endif()
Expand Down
30 changes: 30 additions & 0 deletions src/server/redis/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
add_subdirectory(admin)
add_subdirectory(data)

set(SOURCE
${SOURCE}
main.c
setting.c
stats.c)

set(MODULES
core
ds_bitmap
protocol_admin
protocol_redis
slab
time
util)

set(LIBS
ccommon-static
${CMAKE_THREAD_LIBS_INIT})

set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}/_bin)
set(TARGET_NAME ${PROJECT_NAME}_redis)

add_executable(${TARGET_NAME} ${SOURCE})
target_link_libraries(${TARGET_NAME} ${MODULES} ${LIBS})

install(TARGETS ${TARGET_NAME} RUNTIME DESTINATION bin)
add_dependencies(service ${TARGET_NAME})
4 changes: 4 additions & 0 deletions src/server/redis/admin/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
set(SOURCE
${SOURCE}
${CMAKE_CURRENT_SOURCE_DIR}/process.c
PARENT_SCOPE)
80 changes: 80 additions & 0 deletions src/server/redis/admin/process.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
#include "process.h"

#include "protocol/admin/admin_include.h"
#include "util/procinfo.h"

#include <cc_mm.h>
#include <cc_print.h>

#define REDIS_ADMIN_MODULE_NAME "redis::admin"

extern struct stats stats;
extern unsigned int nmetric;

static bool admin_init = false;
static char *buf = NULL;
static size_t cap;

void
admin_process_setup(void)
{
log_info("set up the %s module", REDIS_ADMIN_MODULE_NAME);
if (admin_init) {
log_warn("%s has already been setup, overwrite",
REDIS_ADMIN_MODULE_NAME);
}

cap = nmetric * METRIC_PRINT_LEN;
buf = cc_alloc(cap);
/* TODO: check return status of cc_alloc */

admin_init = true;
}

void
admin_process_teardown(void)
{
log_info("tear down the %s module", REDIS_ADMIN_MODULE_NAME);
if (!admin_init) {
log_warn("%s has never been setup", REDIS_ADMIN_MODULE_NAME);
}

admin_init = false;
}

static void
_admin_stats_default(struct response *rsp, struct request *req)
{
procinfo_update();
rsp->data.data = buf;
rsp->data.len = print_stats(buf, cap, (struct metric *)&stats, nmetric);
}

static void
_admin_stats(struct response *rsp, struct request *req)
{
if (bstring_empty(&req->arg)) {
_admin_stats_default(rsp, req);
return;
} else {
rsp->type = RSP_INVALID;
}
}

void
admin_process_request(struct response *rsp, struct request *req)
{
rsp->type = RSP_GENERIC;

switch (req->type) {
case REQ_STATS:
_admin_stats(rsp, req);
break;
case REQ_VERSION:
rsp->data = str2bstr(VERSION_PRINTED);
break;
default:
rsp->type = RSP_INVALID;
break;
}
}
4 changes: 4 additions & 0 deletions src/server/redis/admin/process.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#pragma once

void admin_process_setup(void);
void admin_process_teardown(void);
5 changes: 5 additions & 0 deletions src/server/redis/data/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
set(SOURCE
${SOURCE}
${CMAKE_CURRENT_SOURCE_DIR}/process.c
${CMAKE_CURRENT_SOURCE_DIR}/cmd_misc.c
PARENT_SCOPE)
30 changes: 30 additions & 0 deletions src/server/redis/data/cmd_misc.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#include "process.h"

#include "protocol/data/redis_include.h"
#include <cc_array.h>
#include <cc_debug.h>


bool allow_flush = ALLOW_FLUSH;

void
cmd_ping(struct response *rsp, struct request *req, struct command *cmd)
{
struct element *el = NULL;

el = array_push(rsp->token);
ASSERT(el != NULL); /* cannot fail because we preallocate tokens */

if (cmd->nopt == 0) { /* no additional argument, respond pong */
rsp->type = ELEM_STR;
el->type = ELEM_STR;
el->bstr = str2bstr(RSP_PONG);
} else { /* behave as echo, use bulk string */
struct element *arg = (struct element *)array_get(req->token, 1);
rsp->type = ELEM_BULK;
el->type = ELEM_BULK;
el->bstr = arg->bstr;
}

INCR(process_metrics, ping);
}
13 changes: 13 additions & 0 deletions src/server/redis/data/cmd_misc.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#pragma once

/* name type description */
#define PROCESS_MISC_METRIC(ACTION) \
ACTION( flushall, METRIC_COUNTER, "# flushall requests" )\
ACTION( ping, METRIC_COUNTER, "# ping requests" )

struct request;
struct response;
struct command;

/* cmd_* functions must be command_fn (process.c) compatible */
void cmd_ping(struct response *rsp, struct request *req, struct command *cmd);
187 changes: 187 additions & 0 deletions src/server/redis/data/process.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
#include "process.h"

#include "protocol/data/redis_include.h"

#include <buffer/cc_dbuf.h>
#include <cc_debug.h>
#include <cc_print.h>

#define REDIS_PROCESS_MODULE_NAME "redis::process"

#define OVERSIZE_ERR_MSG "oversized value, cannot be stored"
#define OOM_ERR_MSG "server is out of memory"
#define CMD_ERR_MSG "command not supported"
#define OTHER_ERR_MSG "unknown server error"


typedef void (* command_fn)(struct response *, struct request *, struct command *cmd);
static command_fn command_registry[REQ_SENTINEL];

static bool process_init = false;
process_metrics_st *process_metrics = NULL;

void
process_setup(process_options_st *options, process_metrics_st *metrics)
{
log_info("set up the %s module", REDIS_PROCESS_MODULE_NAME);

if (process_init) {
log_warn("%s has already been setup, overwrite",
REDIS_PROCESS_MODULE_NAME);
}

process_metrics = metrics;

if (options != NULL) {
allow_flush = option_bool(&options->allow_flush);
}

command_registry[REQ_PING] = cmd_ping;

process_init = true;
}

void
process_teardown(void)
{
log_info("tear down the %s module", REDIS_PROCESS_MODULE_NAME);
if (!process_init) {
log_warn("%s has never been setup", REDIS_PROCESS_MODULE_NAME);
}

command_registry[REQ_PING] = cmd_ping;

allow_flush = ALLOW_FLUSH;
process_metrics = NULL;
process_init = false;
}


void
process_request(struct response *rsp, struct request *req)
{
struct command cmd;
command_fn func = command_registry[req->type];

if (func == NULL) {
struct element *reply = (struct element *)array_push(rsp->token);
log_warn("command is recognized but not implemented");

rsp->type = reply->type = ELEM_ERR;
reply->bstr = str2bstr(RSP_ERR_NOSUPPORT);
INCR(process_metrics, process_ex);

return;
}

cmd = command_table[req->type];
cmd.nopt = req->token->nelem - cmd.narg;

log_verb("processing command '%.*s' with %d optional arguments",
cmd.bstr.len, cmd.bstr.data, cmd.nopt);
func(rsp, req, &cmd);
}

int
redis_process_read(struct buf **rbuf, struct buf **wbuf, void **data)
{
parse_rstatus_t status;
struct request *req; /* data should be NULL or hold a req pointer */
struct response *rsp;

req = request_borrow();
rsp = response_borrow();
if (req == NULL || rsp == NULL) {
goto error;
}

/* keep parse-process-compose until running out of data in rbuf */
while (buf_rsize(*rbuf) > 0) {
request_reset(req);
response_reset(rsp);

/* stage 1: parsing */
log_verb("%"PRIu32" bytes left", buf_rsize(*rbuf));

status = parse_req(req, *rbuf);
if (status == PARSE_EUNFIN) {
buf_lshift(*rbuf);
goto done;
}
if (status != PARSE_OK) {
/* parsing errors are all client errors, since we don't know
* how to recover from client errors in this condition (we do not
* have a valid request so we don't know where the invalid request
* ends), we should close the connection
*/
log_warn("illegal request received, status: %d", status);
INCR(process_metrics, process_ex);
INCR(process_metrics, process_client_ex);
goto error;
}

/* stage 2: processing- check for quit, allocate response(s), process */

/* quit is special, no response expected */
if (req->type == REQ_QUIT) {
log_info("peer called quit");
goto error;
}

/* actual processing */
process_request(rsp, req);

/* stage 3: write response(s) if necessary */

/* noreply means no need to write to buffers */
if (compose_rsp(wbuf, rsp) < 0) {
log_error("composing rsp erred");
INCR(process_metrics, process_ex);
INCR(process_metrics, process_server_ex);
goto error;
}

/* logging, clean-up */
}

done:
request_return(&req);
response_return(&rsp);

return 0;

error:
request_return(&req);
response_return(&rsp);

return -1;
}


int
redis_process_write(struct buf **rbuf, struct buf **wbuf, void **data)
{
log_verb("post-write processing");

buf_lshift(*rbuf);
dbuf_shrink(rbuf);
buf_lshift(*wbuf);
dbuf_shrink(wbuf);

return 0;
}


int
redis_process_error(struct buf **rbuf, struct buf **wbuf, void **data)
{
log_verb("post-error processing");

/* normalize buffer size */
buf_reset(*rbuf);
dbuf_shrink(rbuf);
buf_reset(*wbuf);
dbuf_shrink(wbuf);

return 0;
}
Loading

0 comments on commit 2e8c7b6

Please sign in to comment.