-
Notifications
You must be signed in to change notification settings - Fork 173
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
使用 master 分支最新代码的异步接口时,写入大量命令时会 coredump,请帮忙看下啥原因导致的,谢谢! #26
Comments
acc不能被两个线程一起访问的 |
@deep011 请教下, evetloop 会阻塞当前线程, 那如果想在多线程中用 acc 的话异步接口岂不是没法用了? 你们生产用的是同步接口还是异步的? 如果是异步是怎么做到的呢,烦请请指点下,谢谢! |
一个单独的线程运行evetloop,并且监控请求队列。其他线程把请求放入该队列中,evetloop线程从队列中拿出后执行。 |
@deep011 这个监控队列是通过 libevent 的信号事件吗?有什么地方可以找到例子吗? 如果是频繁向redis写入很多数据,这种模式是否高效? |
异步模式肯定是高效的。你可以在evetloop线程中设置一个readable事件,这个事件被触发的时候,就可以执行响应的redis异步命令了。 |
@deep011 大神请问下是否像这样多个线程共享一个队列,然后 event_loop 单线程的处理队列中的命令吗? 这样多线程频繁的写入,锁竞争太大了吧, 我在我的2核虚拟机上每秒只能处理2000笔。
|
@deep011 发现每次快 coredump 的时候(写入大概150多万条命令), redis-cluster 就极不稳定, 如上面的 cluster nodes 命令所示, 然后部分redis节点就再也无法写入数据, 接着应用程序就会出现上面的崩溃。 |
你的集群设置timeout了吗? |
用的 create-cluster 脚本: SettingsPORT=6379 |
|
另外,请下载最新的master代码。 |
@deep011 非常感谢!我立马试试 |
@deep011 为什么也是写到150w的时候出现了大量的 null reply,写入的速度也是越来越慢,reids-server 像是被写死了后又自动重启了 - -! |
你集群有问题,你把redis-cluster-tool装上先。 |
@deep011 谢谢你的耐心解答, 找出问题原因了, 是我虚拟机只有 512M 的内存, 导致写到150w的时候,内存被耗光了。。。 |
|
大佬这个一定要设置一个readable事件吗,不能调用完redisClusterAsyncCommand后在 aeMain里头,获取回掉吗,我这样处理。 回掉没有调用 #include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <iostream>
#include <thread>
#include <vector>
#ifdef __cplusplus
extern "C" {
#endif
#include "adapters/ae.h"
#include "async.h"
#include "hircluster.h"
#include "hiredis.h"
#ifdef __cplusplus
}
#endif
/* Put event loop in the global scope, so it can be explicitly stopped */
void getCallback(redisClusterAsyncContext* c, void* r, void* privdata) {
redisReply* reply = (redisReply*)r;
if (reply == NULL) return;
printf("argv[%s]: %s\n", (char*)privdata, reply->str);
/* Disconnect after receiving the reply to GET */
// redisAsyncDisconnect(c);
}
void connectCallback(const redisAsyncContext* c, int status) {
if (status != REDIS_OK) {
printf("Error: %s\n", c->errstr);
// aeStop(loop);
return;
}
printf("Connected...\n");
}
void disconnectCallback(const redisAsyncContext* c, int status) {
if (status != REDIS_OK) {
printf("Error: %s\n", c->errstr);
// aeStop(loop);
return;
}
printf("Disconnected...\n");
// aeStop(loop);
}
void* thread_fun(void* data) {
aeEventLoop* loop = aeCreateEventLoop(64);
int flags = HIRCLUSTER_FLAG_NULL;
printf("address: %s\n", (char*)data);
std::string address = (const char*)data;
redisClusterAsyncContext* c = redisClusterAsyncConnect(address.c_str(), flags);
if (c->err) {
/* Let *c leak for now... */
printf("Error: %s\n", c->errstr);
return NULL;
}
redisClusterAeAttach(loop, c);
redisClusterAsyncSetConnectCallback(c, connectCallback);
redisClusterAsyncSetDisconnectCallback(c, disconnectCallback);
// aeMain(loop);
// redisAsyncContext* c = (redisAsyncContext*)data;
loop->stop = 0;
while (!loop->stop) {
redisClusterAsyncCommand(c, getCallback, (char*)"1", "GET key");
redisClusterAsyncCommand(c, getCallback, (char*)"1", "GET key");
redisClusterAsyncCommand(c, getCallback, (char*)"2", "GET key");
redisClusterAsyncCommand(c, getCallback, (char*)"3", "GET key");
redisClusterAsyncCommand(c, getCallback, (char*)"4", "GET key");
redisClusterAsyncCommand(c, getCallback, (char*)"5", "GET key");
redisClusterAsyncCommand(c, getCallback, (char*)"6", "GET key");
redisClusterAsyncCommand(c, getCallback, (char*)"7", "GET key");
redisClusterAsyncCommand(c, getCallback, (char*)"8", "GET key");
redisClusterAsyncCommand(c, getCallback, (char*)"9", "GET key");
// CMessageCollector::GetInstance()->SetResponseMessage(reply);
if (loop->beforesleep != NULL) {
loop->beforesleep(loop);
}
printf("thread_fun get\n");
sleep(1);
aeProcessEvents(loop, AE_ALL_EVENTS | AE_DONT_WAIT);
}
}
int main(int argc, char** argv) {
signal(SIGPIPE, SIG_IGN);
std::vector<std::thread> threads;
for (int i = 0; i < 5; ++i) {
threads.push_back(std::thread(thread_fun, argv[1]));
}
// redisClusterAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc - 1], strlen(argv[argc - 1]));
// aeMain(loop);
for (auto& thread : threads) {
thread.join();
}
return 0;
} |
运行大概30s后内存就开始暴涨了 --!
The text was updated successfully, but these errors were encountered: