Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

使用 master 分支最新代码的异步接口时,写入大量命令时会 coredump,请帮忙看下啥原因导致的,谢谢! #26

Closed
helloBingGeGe opened this issue Dec 2, 2016 · 19 comments

Comments

@helloBingGeGe
Copy link

helloBingGeGe commented Dec 2, 2016

#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <hircluster.h>
#include <adapters/libevent.h>

const char* get_redis_reply_type(int type)
{
    static const char *REDIS_REPLY_TYPE[] = {
        "ERROR_UNDEFINED_REDIS_REPLY_TYPE",
        "REDIS_REPLY_STRING",
        "REDIS_REPLY_ARRAY",
        "REDIS_REPLY_INTEGER",
        "REDIS_REPLY_NIL",
        "REDIS_REPLY_STATUS",
        "REDIS_REPLY_ERROR"
    };

    if(type >= REDIS_REPLY_STRING && type <= REDIS_REPLY_ERROR)
    {
        return REDIS_REPLY_TYPE[type];
    }
    else
        return REDIS_REPLY_TYPE[0];
}

/*
 * Print one line describing a reply (or its absence) and optionally free it.
 *
 * reply: reply to report; may be NULL.
 * cmd:   label printed alongside the reply; NULL is printed as "NULL".
 * free:  non-zero to release `reply` with freeReplyObject() afterwards.
 *        Pass 0 for replies delivered to async callbacks: those are freed
 *        automatically by the library.
 */
void check_redis_reply(redisReply *reply, const char *cmd, int free)
{
    const char *label = (cmd == NULL) ? "NULL" : cmd;

    if(reply == NULL)
    {
        printf("exec : [%s] reply is null\n", label);
        return;
    }

    /*
     * reply->str is NULL for replies that carry no string (e.g. integer,
     * nil, array); passing NULL to %s is undefined behaviour.
     */
    printf("exec : [%s], reply->type = [%d:%s], reply->str = [%s]\n",
            label, reply->type, get_redis_reply_type(reply->type),
            reply->str == NULL ? "NULL" : reply->str);
    if(free)
        freeReplyObject(reply);
}

/* Connect callback for the cluster async context: logs success or the error. */
void connectCallback(const redisAsyncContext *c, int status)
{
    if(status == REDIS_OK)
    {
        printf("connect ok !\n");
        return;
    }

    printf("connect error : [%s]\n", c->errstr);
}

/* Disconnect callback for the cluster async context: logs a clean or failed disconnect. */
void disconnectCallback(const redisAsyncContext *c, int status)
{
    if(status == REDIS_OK)
    {
        printf("disconnect ok !\n");
        return;
    }

    printf("disconnect error : [%s]\n", c->errstr);
}

/*
 * Reply callback for redisClusterAsyncCommand(); `cmd` is the privdata label
 * (may be NULL). The reply is freed automatically by the library, so it must
 * not be freed here; hence check_redis_reply() is called with free == 0.
 */
void getCallback(redisClusterAsyncContext *acc, void *reply, void *cmd)
{
    redisReply *r = reply;
    const char *label = cmd;

    (void)acc;
    check_redis_reply(r, label, 0);
}

/*
 * Event-loop thread entry point.
 *
 * Creates a libevent base and connects a cluster async context to
 * 127.0.0.1:6380. It then attaches the context to the base, queues one
 * initial command, publishes the context through `c` (a
 * redisClusterAsyncContext **) and runs the loop.
 *
 * NOTE(review): the published context must only be used from this thread
 * while the loop runs. Issuing commands on it from another thread races with
 * event_base_dispatch(), which is the crash discussed in this issue.
 *
 * Always returns NULL. On setup failure nothing is published, so a caller
 * that polls *c for non-NULL keeps waiting.
 */
void* event_loop_thread(void* c)
{
    struct event_base *base = event_base_new();
    if (base == NULL) {
        printf("event_base_new failed !\n");
        return NULL;
    }

    redisClusterAsyncContext *acc = redisClusterAsyncConnect("127.0.0.1:6380", HIRCLUSTER_FLAG_NULL);
    if (acc == NULL) {
        /* Guard against a NULL context before reading acc->err. */
        printf("async connect error : [NULL context]\n");
        event_base_free(base);
        return NULL;
    }
    if (acc->err) {
        printf("async connect error : [%s]\n", acc->errstr);
        redisClusterAsyncFree(acc);
        event_base_free(base);
        return NULL;
    }

    redisClusterLibeventAttach(acc, base);
    redisClusterAsyncSetConnectCallback(acc, connectCallback);
    redisClusterAsyncSetDisconnectCallback(acc, disconnectCallback);

    /* An event must be registered before event_base_dispatch(), otherwise dispatch fails. */
    if(redisClusterAsyncCommand(acc, getCallback, "first", "set %d %d", 0, 0) != REDIS_OK)
        printf("redisClusterAsyncCommand error : [%d:%s]\n", acc->err, acc->errstr);

    *(redisClusterAsyncContext**)c = acc;

    int ret = event_base_dispatch(base);
    printf("event loop error ! [%d]\n", ret);
    return NULL;
}

int main()
{
    printf("------test async------\n");

    pthread_t tid;
    redisClusterAsyncContext *acc = NULL;
    if(pthread_create(&tid, NULL, event_loop_thread, &acc) != 0)
    {
        printf("pthread_create error : [%s]\n", strerror(errno));
        return -1;
    }

    while(acc == NULL)
    {
        sleep(1);
    }

    unsigned int i=0;
    while(1)
    {
        if(redisClusterAsyncCommand(acc, getCallback, NULL, "set %d %d", i, i) != REDIS_OK)
            printf("redisClusterAsyncCommand error !\n");
        ++i;
        usleep(1000);
    }
    
    pause();
    return 0;
}

运行大概30s后内存就开始暴涨了 --!

@deep011
Copy link
Contributor

deep011 commented Dec 5, 2016

acc不能被两个线程一起访问的

@deep011 deep011 closed this as completed Dec 5, 2016
@helloBingGeGe
Copy link
Author

@deep011 请教下, eventloop 会阻塞当前线程, 那如果想在多线程中用 acc 的话异步接口岂不是没法用了? 你们生产用的是同步接口还是异步的? 如果是异步是怎么做到的呢,烦请指点下,谢谢!

@deep011
Copy link
Contributor

deep011 commented Dec 5, 2016

一个单独的线程运行eventloop,并且监控请求队列。其他线程把请求放入该队列中,eventloop线程从队列中拿出后执行。

@helloBingGeGe
Copy link
Author

@deep011 这个监控队列是通过 libevent 的信号事件吗?有什么地方可以找到例子吗? 如果是频繁向redis写入很多数据,这种模式是否高效?

@deep011
Copy link
Contributor

deep011 commented Dec 5, 2016

异步模式肯定是高效的。你可以在eventloop线程中设置一个readable事件,这个事件被触发的时候,就可以执行相应的redis异步命令了。

@helloBingGeGe
Copy link
Author

helloBingGeGe commented Dec 5, 2016

#include <time.h>
#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <signal.h>
#include <pthread.h>
#include <hircluster.h>
#include <event2/thread.h>
#include <adapters/libevent.h>

#include <deque>
#include <string>
#include "ThreadLock.h"
using namespace std;

// Lock guarding g_queue; main() allocates it before starting any thread.
ThreadLock *g_lock = NULL;
// Commands waiting to be issued on the event-loop thread (filled by test_thread).
std::deque<std::string> g_queue;
// Event whose callback drains g_queue. It is assigned to the base in
// event_loop_thread and activated by the producer via event_active().
struct event g_write_reids_event;

const char* get_redis_reply_type(int type)
{
    static const char *REDIS_REPLY_TYPE[] = {
        "ERROR_UNDEFINED_REDIS_REPLY_TYPE",
        "REDIS_REPLY_STRING",
        "REDIS_REPLY_ARRAY",
        "REDIS_REPLY_INTEGER",
        "REDIS_REPLY_NIL",
        "REDIS_REPLY_STATUS",
        "REDIS_REPLY_ERROR"
    };

    if(type >= REDIS_REPLY_STRING && type <= REDIS_REPLY_ERROR)
    {
        return REDIS_REPLY_TYPE[type];
    }
    else
        return REDIS_REPLY_TYPE[0];
}

// Logs one reply line stamped with the current local time, optionally freeing it.
//   reply: reply to report; NULL is reported as "reply is null".
//   cmd:   label printed with the line; NULL prints as "NULL".
//   free:  non-zero to release `reply` with freeReplyObject(). Pass 0 from
//          async callbacks, whose replies are freed automatically.
void check_redis_reply(redisReply *reply, const char* cmd, int free)
{
    const char *label = (cmd == NULL) ? "NULL" : cmd;

    if(reply == NULL)
    {
        printf("exec : [%s] reply is null\n", label);
        return;
    }

    struct timeval tv;
    gettimeofday(&tv, NULL);
    time_t now = tv.tv_sec;
    // localtime_r() fills a caller-owned struct instead of localtime()'s shared
    // static buffer. On failure, print zeros instead of dereferencing NULL.
    struct tm local;
    if(localtime_r(&now, &local) == NULL)
        memset(&local, 0, sizeof(local));

    // reply->str is NULL for replies that carry no string (e.g. integer, nil,
    // array); passing NULL to %s is undefined behaviour.
    printf("exec : [%s], reply->type = [%d:%s], reply->str = [%s], time=[%d:%d:%d]\n",
            label, reply->type, get_redis_reply_type(reply->type),
            reply->str == NULL ? "NULL" : reply->str,
            local.tm_hour, local.tm_min, local.tm_sec);
    if(free)
        freeReplyObject(reply);
}

// Connect callback for the cluster async context: logs success or the error text.
void connectCallback(const redisAsyncContext *c, int status)
{
    if(status == REDIS_OK)
    {
        printf("connect ok !\n");
        return;
    }

    printf("connect error : [%s]\n", c->errstr);
}

// Disconnect callback for the cluster async context: logs a clean or failed disconnect.
void disconnectCallback(const redisAsyncContext *c, int status)
{
    if(status == REDIS_OK)
    {
        printf("disconnect ok !\n");
        return;
    }

    printf("disconnect error : [%s]\n", c->errstr);
}

// Reply callback for commands issued by redisClusterAsyncCommand_cb.
//
// `cmd` is a heap-allocated std::string copy of the command text. It was
// created by redisClusterAsyncCommand_cb for exactly this call, so this
// callback owns it and deletes it. The reply itself is freed automatically,
// so it is printed with free == 0.
//
// NOTE(review): this assumes the cluster library invokes the callback once per
// accepted command, including with a NULL reply when a connection is torn
// down. If some path skips the callback, that command's copy leaks.
void getCallback(redisClusterAsyncContext *acc, void *reply, void *cmd)
{
    (void)acc;
    std::string *cmd_text = static_cast<std::string*>(cmd);
    check_redis_reply(static_cast<redisReply*>(reply),
                      cmd_text != NULL ? cmd_text->c_str() : NULL, 0);
    delete cmd_text;
}

// SIGINT handler registered on the event base: reports the signal and exits the process.
static void
signal_cb(evutil_socket_t fd, short event, void *arg)
{
    struct event *signal = (struct event*)arg;
    printf("%s: got signal %d\n", __func__, EVENT_SIGNAL(signal));
    exit(0);
}

// Callback of g_write_reids_event: drains g_queue and issues every queued
// command on the cluster async context passed as `arg`. It runs on the
// event-loop thread, the only thread that should touch that context.
static void
redisClusterAsyncCommand_cb(evutil_socket_t fd, short event, void *arg)
{
    (void)fd;
    (void)event;
    redisClusterAsyncContext *acc = (redisClusterAsyncContext *)arg;

    // Take the whole backlog in one short critical section, so producers are
    // not blocked while commands are formatted and queued on the sockets.
    std::deque<std::string> pending;
    {
        GuardLock gl(g_lock);
        pending.swap(g_queue);
    }

    for(std::deque<std::string>::const_iterator it = pending.begin(); it != pending.end(); ++it)
    {
        // The reply arrives after `pending` is destroyed, so the callback gets
        // its own copy. The old privdata, g_queue.front().c_str(), dangled as
        // soon as pop_front() ran.
        std::string *cmd_copy = new std::string(*it);

        // The command text is used as the format string, so it must not contain
        // '%'. That holds for the "set %u %u" strings test_thread builds.
        if(redisClusterAsyncCommand(acc, getCallback, cmd_copy, it->c_str()) != REDIS_OK)
        {
            printf("redisClusterAsyncCommand [%s] error !\n", it->c_str());
            // NOTE(review): on REDIS_ERR the callback is not queued (confirm in
            // hircluster.c), so the copy is still ours to release.
            delete cmd_copy;
        }
    }
}

void* event_loop_thread(void* c)
{
    if(-1 == evthread_use_pthreads())
        printf("use thread-safe event failed !\n");

    struct event_base *base = event_base_new();

    redisClusterAsyncContext *acc = redisClusterAsyncConnect("127.0.0.1:6380", HIRCLUSTER_FLAG_NULL);

    if (acc->err) {
        printf("async connect error : [%s]\n", acc->errstr);
        return NULL;
    }

    redisClusterLibeventAttach(acc, base);
    redisClusterAsyncSetConnectCallback(acc, connectCallback);
    redisClusterAsyncSetDisconnectCallback(acc, disconnectCallback);

    // 注册事件回调异步写redis函数
    event_assign(&g_write_reids_event, base, -1, EV_PERSIST, redisClusterAsyncCommand_cb, acc);
    event_add(&g_write_reids_event, NULL);

    // 注册退出事件
    struct event signal_int;
    event_assign(&signal_int, base, SIGINT, EV_SIGNAL|EV_PERSIST, signal_cb,
            &signal_int);
    event_add(&signal_int, NULL);

    *(redisClusterAsyncContext**)c = acc;

    int ret = event_base_dispatch(base);
    event_base_free(base);
    printf("event loop error ! [%d]\n", ret);
}

void* test_thread(void* v)
{
    unsigned int i=0;
    while(1)
    {
        char cmd[32] = {0};
        snprintf(cmd, sizeof(cmd), "set %u %u", i, i);

        {
                GuardLock gl(g_lock);
                g_queue.push_back(cmd);
        }
        event_active(&g_write_reids_event, i, i);
        ++i;

        usleep(100);
    }
}

// Starts the event-loop thread, waits until it publishes the cluster context,
// then starts the producer thread and sleeps until a signal arrives.
int main()
{
    // A write to a closed connection must return an error, not kill the process.
    signal(SIGPIPE, SIG_IGN);

    g_lock = new ThreadLock();

    pthread_t tid;
    redisClusterAsyncContext *acc = NULL;
    int rc = pthread_create(&tid, NULL, event_loop_thread, &acc);
    if(rc != 0)
    {
        // pthread_create reports failure through its return value, not errno.
        printf("pthread_create error : [%s]\n", strerror(rc));
        return -1;
    }

    // Wait for event_loop_thread to finish setup.
    // NOTE(review): this polls a plain pointer written by another thread, and
    // it spins forever if event_loop_thread's setup fails.
    while(acc == NULL)
    {
        sleep(1);
    }

    pthread_t tid1;
    rc = pthread_create(&tid1, NULL, test_thread, NULL);
    if(rc != 0)
    {
        printf("pthread_create error : [%s]\n", strerror(rc));
        return -1;
    }

    pause();
    return 0;
}

@deep011 大神请问下是否像这样多个线程共享一个队列,然后 event_loop 单线程地处理队列中的命令吗? 这样多线程频繁地写入,锁竞争太大了吧, 我在我的2核虚拟机上每秒只能处理2000笔。
测试又发现写到大概150多万条的时候 coredump 了。。。

Core was generated by `./a.out'.
Program terminated with signal 11, Segmentation fault.
#0  0x000000000040b585 in dictSdsKeyCompare (privdata=0x0, key1=0x7f320c1266d8, key2=0x7f3200000000) at hircluster.c:78
78          if (l1 != l2) return 0;
Missing separate debuginfos, use: debuginfo-install glibc-2.12-1.192.el6.x86_64 libgcc-4.4.7-17.el6.x86_64 libstdc++-4.4.7-17.el6.x86_64
(gdb) bt
#0  0x000000000040b585 in dictSdsKeyCompare (privdata=0x0, key1=0x7f320c1266d8, key2=0x7f3200000000) at hircluster.c:78
#1  0x000000000040aa7c in dictFind (ht=0x7f320c16d440, key=0x7f320c1266d8) at dict.c:251
#2  0x000000000040f418 in cluster_nodes_swap_ctx (cc=0x7f3212451010, ip=<value optimized out>, port=<value optimized out>)
    at hircluster.c:626
#3  cluster_update_route_by_addr (cc=0x7f3212451010, ip=<value optimized out>, port=<value optimized out>) at hircluster.c:1406
#4  0x000000000040f6ca in cluster_update_route (cc=0x7f3212451010) at hircluster.c:1910
#5  0x000000000040fc5b in redisClusterAsyncCallback (ac=<value optimized out>, r=0x0, privdata=0x7f320c073cc0) at hircluster.c:4294
#6  0x00000000004081bd in __redisRunCallback (ac=0x7f320c0e8670) at async.c:269
#7  __redisAsyncFree (ac=0x7f320c0e8670) at async.c:283
#8  0x00000000004092bc in __redisAsyncDisconnect (ac=0x7f320c0e8670) at async.c:348
#9  redisProcessCallbacks (ac=0x7f320c0e8670) at async.c:487
#10 0x00000000004020d0 in redisLibeventReadEvent(int, short, void*) ()
#11 0x00007f3212032f0c in event_process_active_single_queue (base=0x7f320c000980, flags=0) at event.c:1368
#12 event_process_active (base=0x7f320c000980, flags=0) at event.c:1438
#13 event_base_loop (base=0x7f320c000980, flags=0) at event.c:1639
#14 0x00000000004027f3 in event_loop_thread(void*) ()
#15 0x00007f32110d5aa1 in start_thread () from /lib64/libpthread.so.0
#16 0x00007f32113d3aad in clone () from /lib64/libc.so.6

@helloBingGeGe
Copy link
Author

helloBingGeGe commented Dec 6, 2016

127.0.0.1:6380> cluster nodes
907d20a754a99ecd4daae47fbc4c1ff25e988a3f 127.0.0.1:6382 slave d5d9401de32f0d19f923289e264b4190c272b0d0 1481010415359 1481010401565 8 connected
5c19bf9e08f9c9c730a57c0bb6fcbddf60ab6480 127.0.0.1:6380 myself,slave 9dbc28ed8b4cf78227876dcac5120eb4d922ed13 0 0 1 connected
d5d9401de32f0d19f923289e264b4190c272b0d0 127.0.0.1:6385 master,fail? - 1481010399557 1481010397686 8 disconnected 10923-16383
9dbc28ed8b4cf78227876dcac5120eb4d922ed13 127.0.0.1:6383 master,fail? - 1481010399557 1481010397686 9 disconnected 0-5460
51a8a647e88155f5b134a982e8aa26064e6c6d28 127.0.0.1:6384 master - 1481010415359 1481010401565 7 connected 5461-10922
d53f74b2e85e1d5e0f456dcdfc4583695fb92b91 127.0.0.1:6381 slave,fail 51a8a647e88155f5b134a982e8aa26064e6c6d28 1481010393671 1481010393073 7 disconnected
(10.34s)

127.0.0.1:6380> cluster info
cluster_state:fail
cluster_slots_assigned:16384
cluster_slots_ok:5462
cluster_slots_pfail:10922
cluster_slots_fail:0
cluster_known_nodes:6
cluster_size:3
cluster_current_epoch:30
cluster_my_epoch:27
cluster_stats_messages_sent:73692
cluster_stats_messages_received:73138

@deep011 发现每次快 coredump 的时候(写入大概150多万条命令), redis-cluster 就极不稳定, 如上面的 cluster nodes 命令所示, 然后部分redis节点就再也无法写入数据, 接着应用程序就会出现上面的崩溃。

@deep011
Copy link
Contributor

deep011 commented Dec 6, 2016

你的集群设置timeout了吗?

@helloBingGeGe
Copy link
Author

用的 create-cluster 脚本:

Settings

PORT=6379
TIMEOUT=2000
NODES=6
REPLICAS=1

@deep011
Copy link
Contributor

deep011 commented Dec 6, 2016

  1. 看看你cluster的cluster-require-full-coverage设置的是什么?如果是yes的话,要改成no;
  2. hiredis-vip的异步api效率很高的,qps跑到几万很easy,你的qps不高,主要是你程序使用的问题;
  3. 控制命令的发送量,并不是发送的越快,效率就越高,要看发送了多少命令,有多少命令还没有收到回复,要控制没有收到回复的命令数量,如果命令积攒的太多没有回复,反而大大降低你的qps。

@helloBingGeGe
Copy link
Author

helloBingGeGe commented Dec 6, 2016

@deep011

  1. 已改为no
  2. 能指出下哪里的问题吗 - -!
  3. 看了下响应很快但是写到150w的时候有响应就很慢了, redis-server 都是 D 了
    image

@deep011
Copy link
Contributor

deep011 commented Dec 6, 2016

@deep011
Copy link
Contributor

deep011 commented Dec 6, 2016

另外,请下载最新的master代码。

@helloBingGeGe
Copy link
Author

@deep011 非常感谢!我立马试试

@helloBingGeGe
Copy link
Author

helloBingGeGe commented Dec 6, 2016

image
image
image
image

@deep011 为什么也是写到150w的时候出现了大量的 null reply,写入的速度也是越来越慢,redis-server 像是被写死了后又自动重启了 - -!

@deep011
Copy link
Contributor

deep011 commented Dec 7, 2016

你集群有问题,你把redis-cluster-tool装上先。

@helloBingGeGe
Copy link
Author

helloBingGeGe commented Dec 7, 2016

@deep011 谢谢你的耐心解答, 找出问题原因了, 是我虚拟机只有 512M 的内存, 导致写到150w的时候,内存被耗光了。。。

@dandyhuang
Copy link

@deep011 谢谢你的耐心解答, 找出问题原因了, 是我虚拟机只有 512M 的内存, 导致写到150w的时候,内存被耗光了。。。
请教一下,你这边是用的大佬给的 https://github.com/vipshop/hiredis-vip/wiki/test_hiredisvip_async_ae.c 例子来封装的吗,不是很想用这个

@dandyhuang
Copy link

异步模式肯定是高效的。你可以在eventloop线程中设置一个readable事件,这个事件被触发的时候,就可以执行相应的redis异步命令了。

大佬这个一定要设置一个readable事件吗,不能调用完redisClusterAsyncCommand后在 aeMain里头获取回调吗?我是这样处理的,但回调没有被调用。

#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <iostream>
#include <thread>
#include <vector>

#ifdef __cplusplus
extern "C" {
#endif
#include "adapters/ae.h"
#include "async.h"
#include "hircluster.h"
#include "hiredis.h"
#ifdef __cplusplus
}
#endif

// Reply callback for the GETs issued by thread_fun's worker loop.
// `privdata` is a short label string identifying the request. A NULL reply
// (e.g. the connection went away) is ignored.
void getCallback(redisClusterAsyncContext* c, void* r, void* privdata) {
  (void)c;
  redisReply* reply = (redisReply*)r;
  if (reply == NULL) return;
  // GET on a missing key yields a nil reply whose str is NULL; passing NULL to
  // %s is undefined behaviour.
  printf("argv[%s]: %s\n", (char*)privdata, reply->str != NULL ? reply->str : "(nil)");
}

// Connect callback: logs success, or the error text on failure.
void connectCallback(const redisAsyncContext* c, int status) {
  if (status == REDIS_OK) {
    printf("Connected...\n");
    return;
  }
  printf("Error: %s\n", c->errstr);
}

// Disconnect callback: logs a clean disconnect, or the error text on failure.
void disconnectCallback(const redisAsyncContext* c, int status) {
  if (status == REDIS_OK) {
    printf("Disconnected...\n");
    return;
  }
  printf("Error: %s\n", c->errstr);
}

void* thread_fun(void* data) {
  aeEventLoop* loop = aeCreateEventLoop(64);
  int flags = HIRCLUSTER_FLAG_NULL;
  printf("address: %s\n", (char*)data);
  std::string address = (const char*)data;
  redisClusterAsyncContext* c = redisClusterAsyncConnect(address.c_str(), flags);
  if (c->err) {
    /* Let *c leak for now... */
    printf("Error: %s\n", c->errstr);
    return NULL;
  }
  redisClusterAeAttach(loop, c);
  redisClusterAsyncSetConnectCallback(c, connectCallback);
  redisClusterAsyncSetDisconnectCallback(c, disconnectCallback);
  // aeMain(loop);
  // redisAsyncContext* c = (redisAsyncContext*)data;
  loop->stop = 0;
  while (!loop->stop) {
    redisClusterAsyncCommand(c, getCallback, (char*)"1", "GET key");
    redisClusterAsyncCommand(c, getCallback, (char*)"1", "GET key");
    redisClusterAsyncCommand(c, getCallback, (char*)"2", "GET key");
    redisClusterAsyncCommand(c, getCallback, (char*)"3", "GET key");
    redisClusterAsyncCommand(c, getCallback, (char*)"4", "GET key");
    redisClusterAsyncCommand(c, getCallback, (char*)"5", "GET key");
    redisClusterAsyncCommand(c, getCallback, (char*)"6", "GET key");
    redisClusterAsyncCommand(c, getCallback, (char*)"7", "GET key");
    redisClusterAsyncCommand(c, getCallback, (char*)"8", "GET key");
    redisClusterAsyncCommand(c, getCallback, (char*)"9", "GET key");
    // CMessageCollector::GetInstance()->SetResponseMessage(reply);
    if (loop->beforesleep != NULL) {
      loop->beforesleep(loop);
    }
    printf("thread_fun get\n");
    sleep(1);
    aeProcessEvents(loop, AE_ALL_EVENTS | AE_DONT_WAIT);
  }
}

// Starts kThreadCount worker threads. Each gets its own event loop and cluster
// connection to argv[1]. Then waits for all of them.
int main(int argc, char** argv) {
  signal(SIGPIPE, SIG_IGN);

  // thread_fun needs the address; argv[1] does not exist when argc < 2.
  if (argc < 2) {
    printf("usage: %s <host:port>\n", argc > 0 ? argv[0] : "prog");
    return 1;
  }

  const int kThreadCount = 5;
  std::vector<std::thread> threads;
  threads.reserve(kThreadCount);
  for (int i = 0; i < kThreadCount; ++i) {
    threads.emplace_back(thread_fun, argv[1]);
  }

  for (auto& thread : threads) {
    thread.join();
  }
  return 0;
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants