Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add monitor feature #653

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
4 changes: 3 additions & 1 deletion src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ nutcracker_SOURCES = \
nc_array.c nc_array.h \
nc_util.c nc_util.h \
nc_queue.h \
nc_monitor.c nc_monitor.h \
nc.c

nutcracker_LDADD = $(top_builddir)/src/hashkit/libhashkit.a
Expand Down Expand Up @@ -81,7 +82,8 @@ test_all_SOURCES = test_all.c \
nc_string.c nc_string.h \
nc_array.c nc_array.h \
nc_util.c nc_util.h \
nc_queue.h
nc_queue.h \
nc_monitor.c nc_monitor.h

test_all_LDADD = $(top_builddir)/src/hashkit/libhashkit.a
test_all_LDADD += $(top_builddir)/src/proto/libproto.a
Expand Down
19 changes: 19 additions & 0 deletions src/nc_array.c
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,22 @@ array_each(const struct array *a, array_each_t func, void *data)

return NC_OK;
}

rstatus_t
array_del(struct array *a, uint32_t idx)
{
uint8_t *pos = NULL;
uint64_t len = 0;

if (a->nelem == 0 || idx >= a->nelem) {
return NC_ERROR;
}

pos = (uint8_t*)a->elem + (a->size * idx);
len = (a->nelem - idx - 1) * a->size;

memmove(pos, pos + a->size, (size_t)len);
a->nelem--;

return NC_OK;
}
1 change: 1 addition & 0 deletions src/nc_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,6 @@ void *array_top(const struct array *a);
void array_swap(struct array *a, struct array *b);
void array_sort(struct array *a, array_compare_t compare);
rstatus_t array_each(const struct array *a, array_each_t func, void *data);
rstatus_t array_del(struct array *a, uint32_t idx);

#endif
5 changes: 5 additions & 0 deletions src/nc_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ client_close(struct context *ctx, struct conn *conn)

client_close_stats(ctx, conn->owner, conn->err, conn->eof);

/* when client close, if conn in monitor, delete it */
if (conn->monitor_client) {
del_from_monitor(conn);
}

if (conn->sd < 0) {
conn->unref(conn);
conn_put(conn);
Expand Down
14 changes: 14 additions & 0 deletions src/nc_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <nc_conf.h>
#include <nc_server.h>
#include <proto/nc_proto.h>
#include <nc_monitor.h>

#define DEFINE_ACTION(_hash, _name) string(#_name),
static const struct string hash_strings[] = {
Expand Down Expand Up @@ -110,6 +111,10 @@ static const struct command conf_commands[] = {
conf_add_server,
offsetof(struct conf_pool, server) },

{ string("enable_monitor"),
conf_set_bool,
offsetof(struct conf_pool, enable_monitor) },

null_command
};

Expand Down Expand Up @@ -225,6 +230,8 @@ conf_pool_init(struct conf_pool *cp, const struct string *name)
return status;
}

cp->enable_monitor = CONF_UNSET_NUM;

log_debug(LOG_VVERB, "init conf pool %p, '%.*s'", cp, name->len, name->data);

return NC_OK;
Expand Down Expand Up @@ -311,6 +318,9 @@ conf_pool_each_transform(void *elem, void *data)
return status;
}

sp->enable_monitor = cp->enable_monitor ? 1 : 0;
monitor_init(sp);

log_debug(LOG_VERB, "transform to pool %"PRIu32" '%.*s'", sp->idx,
sp->name.len, sp->name.data);

Expand Down Expand Up @@ -1282,6 +1292,10 @@ conf_validate_pool(struct conf *cf, struct conf_pool *cp)
return status;
}

if (cp->enable_monitor == CONF_UNSET_NUM) {
cp->enable_monitor = CONF_DEFAULT_ENABLE_MONITOR;
}

cp->valid = 1;

return NC_OK;
Expand Down
2 changes: 2 additions & 0 deletions src/nc_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
#define CONF_DEFAULT_SERVER_CONNECTIONS 1
#define CONF_DEFAULT_KETAMA_PORT 11211
#define CONF_DEFAULT_TCPKEEPALIVE false
#define CONF_DEFAULT_ENABLE_MONITOR false

struct conf_listen {
struct string pname; /* listen: as "hostname:port" */
Expand Down Expand Up @@ -94,6 +95,7 @@ struct conf_pool {
int server_retry_timeout; /* server_retry_timeout: in msec */
int server_failure_limit; /* server_failure_limit: */
struct array server; /* servers: conf_server[] */
int enable_monitor; /* enable_monitor: */
unsigned valid:1; /* valid? */
};

Expand Down
1 change: 1 addition & 0 deletions src/nc_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ _conn_get(void)
conn->done = 0;
conn->redis = 0;
conn->authenticated = 0;
conn->monitor_client = 0;

ntotal_conn++;
ncurr_conn++;
Expand Down
1 change: 1 addition & 0 deletions src/nc_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ struct conn {
unsigned done:1; /* done? aka close? */
unsigned redis:1; /* redis? */
unsigned authenticated:1; /* authenticated? */
unsigned monitor_client:1;/* monitor client? */
};

TAILQ_HEAD(conn_tqh, conn);
Expand Down
1 change: 1 addition & 0 deletions src/nc_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ struct event_base;
#include <nc_message.h>
#include <nc_connection.h>
#include <nc_server.h>
#include <nc_monitor.h>

struct context {
uint32_t id; /* unique context id */
Expand Down
26 changes: 26 additions & 0 deletions src/nc_message.c
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ _msg_get(void)
msg->fdone = 0;
msg->swallow = 0;
msg->redis = 0;
msg->monitor = 0;

return msg;
}
Expand Down Expand Up @@ -910,3 +911,28 @@ bool msg_set_placeholder_key(struct msg *r)
return true;
}

rstatus_t
msg_append_full(struct msg *msg, uint8_t *pos, size_t n)
{
struct mbuf *mbuf = NULL;
size_t cidx = 0;
size_t mbsize = 0;
size_t clen = 0;

do {
mbuf = msg_ensure_mbuf(msg, n);
if (mbuf == NULL) {
return NC_ENOMEM;
}

mbsize = mbuf_size(mbuf);

clen = n > mbsize ? mbsize : n;
mbuf_copy(mbuf, pos+cidx, clen);
cidx += clen;
msg->mlen += (uint32_t)clen;
n -= clen;
} while(n);

return NC_OK;
}
5 changes: 5 additions & 0 deletions src/nc_message.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ typedef enum msg_parse_result {
ACTION( REQ_MC_TOUCH ) /* memcache touch request */ \
ACTION( REQ_MC_QUIT ) /* memcache quit request */ \
ACTION( REQ_MC_VERSION ) /* memcache version request */ \
ACTION( REQ_MC_MONITOR ) /* memcache monitor request, only used for proxy */ \
ACTION( RSP_MC_NUM ) /* memcache arithmetic response */ \
ACTION( RSP_MC_STORED ) /* memcache cas and storage response */ \
ACTION( RSP_MC_NOT_STORED ) \
Expand Down Expand Up @@ -202,6 +203,7 @@ typedef enum msg_parse_result {
ACTION( REQ_REDIS_SELECT) /* only during init */ \
ACTION( REQ_REDIS_COMMAND) /* Sent to random server for redis-cli completions*/ \
ACTION( REQ_REDIS_LOLWUT) /* Vitally important */ \
ACTION( REQ_REDIS_MONITOR) /* monitor */ \
ACTION( RSP_REDIS_STATUS ) /* redis response */ \
ACTION( RSP_REDIS_ERROR ) \
ACTION( RSP_REDIS_ERROR_ERR ) \
Expand Down Expand Up @@ -300,6 +302,7 @@ struct msg {
unsigned fdone:1; /* all fragments are done? */
unsigned swallow:1; /* swallow response? */
unsigned redis:1; /* redis? */
unsigned monitor:1; /* monitor comamnd? */
};

TAILQ_HEAD(msg_tqh, msg);
Expand Down Expand Up @@ -350,4 +353,6 @@ void rsp_recv_done(struct context *ctx, struct conn *conn, struct msg *msg, stru
struct msg *rsp_send_next(struct context *ctx, struct conn *conn);
void rsp_send_done(struct context *ctx, struct conn *conn, struct msg *msg);

rstatus_t msg_append_full(struct msg *msg, uint8_t *pos, size_t n);

#endif
174 changes: 174 additions & 0 deletions src/nc_monitor.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* twemproxy - A fast and lightweight proxy for memcached protocol.
*
* Copyright (C) 2021, wei huang <wei.kukey@gmail.com>
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <nc_monitor.h>
#include <nc_array.h>

void
monitor_init(struct server_pool *sp)
{
if (sp->enable_monitor) {
array_init(&sp->monitor_conns, CONF_DEFAULT_ARRAY_MONITOR_NUM, sizeof(struct conn *));
}
}

void
monitor_deinit(struct server_pool *sp)
{
struct array *monitor_conns = &sp->monitor_conns;

ASSERT(monitor_conns != NULL);

if (sp->enable_monitor) {
while (array_n(monitor_conns) > 0) {
array_pop(monitor_conns);
}
array_deinit(monitor_conns);
}
}

rstatus_t
add_to_monitor(struct conn *c)
{
struct server_pool *sp = c->owner;

ASSERT(c->client && sp != NULL);

struct conn **monitor = array_push(&sp->monitor_conns);
if (monitor == NULL) {
return NC_ENOMEM;
}
c->monitor_client = 1;
*monitor = c;

return NC_OK;
}

void
del_from_monitor(struct conn *c)
{
uint32_t i;
struct conn **tmp_conn = NULL;
struct server_pool *sp = c->owner;
struct array *a = NULL;

ASSERT(c->client && c->monitor_client);
ASSERT(sp != NULL);

a = &sp->monitor_conns;
for (i = 0; i < array_n(a); i++) {
tmp_conn = array_get(a, i);
if (*tmp_conn == c) {
array_del(a, i);
break;
}
}
}

struct monitor_data
{
struct msg *m;
struct conn *c;
struct context *ctx;
struct string *d;
};

static int
monitor_callback(void *conn, void *data)
{
struct monitor_data *mdata = data;
struct conn **monitor_conn = conn;
struct conn *req_c = *monitor_conn;

struct msg *req = req_get(req_c);
if (req == NULL) {
return NC_ENOMEM;
}
struct msg *rsp = msg_get(req_c, 0, mdata->c->redis);
if (rsp == NULL) {
msg_put(req);
return NC_ENOMEM;
}

req->peer = rsp;
rsp->peer = req;

req->done = 1;
rsp->done = 1;

if (msg_append_full(rsp, mdata->d->data, mdata->d->len) != NC_OK) {
msg_put(req);
msg_put(rsp);
return NC_ENOMEM;
}
req_c->enqueue_outq(mdata->ctx, req_c, req);
if (event_add_out(mdata->ctx->evb, req_c) != NC_OK) {
req_c->err = errno;
req_c->dequeue_outq(mdata->ctx, req_c, req);
msg_put(req);
msg_put(rsp);
return NC_ERROR;
}

return NC_OK;
}

rstatus_t rsp_send_monitor_msg(struct context *ctx, struct conn *c, struct msg *m)
{
ASSERT(c->client);

struct server_pool *sp = c->owner;
struct string monitor_message = null_string;
struct monitor_data mdata = {0};
mdata.m = m;
mdata.c = c;
mdata.d = &monitor_message;
mdata.ctx = ctx;
struct keypos kpos = {0};
struct keypos *tmp_kpos = NULL;

/* Only redis command command has a fake key. */
if (m->type != MSG_REQ_REDIS_COMMAND) {
tmp_kpos = array_get(m->keys, 0);

kpos.start = tmp_kpos->start;
kpos.end = tmp_kpos->end;
}

if (c->redis) {
string_printf(&monitor_message, "+%ld.%06ld [%s] command=%s key0=%.*s\r\n",
m->start_ts/1000000, m->start_ts%1000000,
nc_unresolve_peer_desc(c->sd),
(msg_type_string(m->type))->data,
kpos.end - kpos.start, kpos.start);

} else {
/* This monitor protocol only for twemproxy. */
string_printf(&monitor_message, "MONITOR\r\n%ld.%06ld [%s] command=%s key0=%.*s\r\nEND\r\n",
m->start_ts/1000000, m->start_ts%1000000,
nc_unresolve_peer_desc(c->sd),
(msg_type_string(m->type))->data,
kpos.end - kpos.start, kpos.start);
}

array_each(&sp->monitor_conns, monitor_callback, &mdata);

string_deinit(&monitor_message);
return NC_OK;
}
Loading