Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feature: migrate tools support pika v3.5.0
Browse files Browse the repository at this point in the history
pro-spild committed Jan 6, 2025
1 parent cd92a46 commit e744786
Showing 16 changed files with 1,228 additions and 3 deletions.
10 changes: 10 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
@@ -289,6 +289,16 @@ sync-window-size : 9000
# Supported Units [K|M|G]. Its default unit is in [bytes] and its default value is 268435456(256MB). The value range is [64MB, 1GB].
max-conn-rbuf-size : 268435456

###################
## Migrate Settings
###################

target-redis-host : 127.0.0.1
target-redis-port : 6379
target-redis-pwd :

sync-batch-num : 100
redis-sender-num : 10

#######################################################################E#######
#! Critical Settings !#
66 changes: 66 additions & 0 deletions include/migrator_thread.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#ifndef MIGRATOR_THREAD_H_
#define MIGRATOR_THREAD_H_

#include <iostream>
#include <mutex>

#include "storage/storage.h"
#include "net/include/redis_cli.h"

#include "include/pika_sender.h"

class MigratorThread : public net::Thread {
public:
MigratorThread(std::shared_ptr<storage::Storage> storage_, std::vector<std::shared_ptr<PikaSender>> *senders, int type, int thread_num) :
storage_(storage_),
should_exit_(false),
senders_(senders),
type_(type),
thread_num_(thread_num),
thread_index_(0),
num_(0) {
}

virtual ~ MigratorThread();

int64_t num() {
std::lock_guard<std::mutex> l(num_mutex_);
return num_;
}

void Stop() {
should_exit_ = true;
}

private:
void PlusNum() {
std::lock_guard<std::mutex> l(num_mutex_);
++num_;
}

void DispatchKey(const std::string &command, const std::string& key = "");

void MigrateDB();
void MigrateStringsDB();
void MigrateListsDB();
void MigrateHashesDB();
void MigrateSetsDB();
void MigrateZsetsDB();

virtual void *ThreadMain();

private:
std::shared_ptr<storage::Storage> storage_;
bool should_exit_;

std::vector<std::shared_ptr<PikaSender>> *senders_;
int type_;
int thread_num_;
int thread_index_;

int64_t num_;
std::mutex num_mutex_;
};

#endif

15 changes: 15 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
@@ -346,6 +346,14 @@ class PikaConf : public pstd::BaseConf {
int max_conn_rbuf_size() { return max_conn_rbuf_size_.load(); }
int consensus_level() { return consensus_level_.load(); }
int replication_num() { return replication_num_.load(); }

std::string target_redis_host() { return target_redis_host_; }
int target_redis_port() { return target_redis_port_; }
std::string target_redis_pwd() { return target_redis_pwd_; }
int sync_batch_num() { return sync_batch_num_; }
int redis_sender_num() { return redis_sender_num_; }


int rate_limiter_mode() {
std::shared_lock l(rwlock_);
return rate_limiter_mode_;
@@ -925,6 +933,13 @@ class PikaConf : public pstd::BaseConf {
std::map<std::string, std::string> diff_commands_;
void TryPushDiffCommands(const std::string& command, const std::string& value);

// migrate configure items
std::string target_redis_host_;
int target_redis_port_;
std::string target_redis_pwd_;
int sync_batch_num_;
int redis_sender_num_;

//
// Critical configure items
//
1 change: 1 addition & 0 deletions include/pika_repl_bgworker.h
Original file line number Diff line number Diff line change
@@ -47,6 +47,7 @@ class PikaReplBgWorker {
net::BGThread bg_thread_;
static int HandleWriteBinlog(net::RedisParser* parser, const net::RedisCmdArgsType& argv);
static void ParseBinlogOffset(const InnerMessage::BinlogOffset& pb_offset, LogOffset* offset);
static void ParseAndSendPikaCommand(const std::shared_ptr<Cmd>& c_ptr);
};

#endif // PIKA_REPL_BGWROKER_H_
43 changes: 43 additions & 0 deletions include/pika_sender.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#ifndef PIKA_SENDER_H_
#define PIKA_SENDER_H_

#include <atomic>
#include <thread>
#include <chrono>
#include <iostream>
#include <queue>

#include "net/include/bg_thread.h"
#include "net/include/net_cli.h"
#include "net/include/redis_cli.h"

class PikaSender : public net::Thread {
public:
PikaSender(std::string ip, int64_t port, std::string password);
virtual ~PikaSender();
void LoadKey(const std::string &cmd);
void Stop();

int64_t elements() { return elements_; }

void SendCommand(std::string &command, const std::string &key);
int QueueSize();
void ConnectRedis();

private:
net::NetCli *cli_;
pstd::CondVar wsignal_;
pstd::CondVar rsignal_;
std::mutex signal_mutex;
std::mutex keys_queue_mutex_;
std::queue<std::string> keys_queue_;
std::string ip_;
int port_;
std::string password_;
std::atomic<bool> should_exit_;
int64_t elements_;

virtual void *ThreadMain();
};

#endif
12 changes: 12 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
@@ -46,6 +46,7 @@
#include "include/pika_statistic.h"
#include "include/pika_transaction.h"
#include "include/rsync_server.h"
#include "include/redis_sender.h"

extern std::unique_ptr<PikaConf> g_pika_conf;

@@ -309,6 +310,12 @@ class PikaServer : public pstd::noncopyable {

pstd::Status GetCmdRouting(std::vector<net::RedisCmdArgsType>& redis_cmds, std::vector<Node>* dst, bool* all_local);

/*
* migrate used
*/
int SendRedisCommand(const std::string& command, const std::string& key);
void RetransmitData(const std::string& path);

// info debug use
void ServerStatus(std::string* info);

@@ -617,6 +624,11 @@ class PikaServer : public pstd::noncopyable {
*/
std::unique_ptr<PikaAuxiliaryThread> pika_auxiliary_thread_;

/*
* migrate to redis used
*/
std::vector<std::unique_ptr<RedisSender>> redis_senders_;

/*
* Async slotsMgrt use
*/
52 changes: 52 additions & 0 deletions include/redis_sender.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#ifndef REDIS_SENDER_H_
#define REDIS_SENDER_H_

#include <atomic>
#include <thread>
#include <chrono>
#include <iostream>
#include <queue>

#include "pika_repl_bgworker.h"
#include "net/include/net_cli.h"
#include "net/include/redis_cli.h"

class RedisSender : public net::Thread {
public:
RedisSender(int id, std::string ip, int64_t port, std::string password);
virtual ~RedisSender();
void Stop(void);
int64_t elements() {
return elements_;
}

void SendRedisCommand(const std::string &command);

private:
int SendCommand(std::string &command);
void ConnectRedis();
size_t commandQueueSize() {
std::lock_guard l(keys_mutex_);
return commands_queue_.size();
}

private:
int id_;
std::shared_ptr<net::NetCli> cli_;
pstd::CondVar rsignal_;
pstd::CondVar wsignal_;
pstd::Mutex signal_mutex_;
pstd::Mutex keys_mutex_;
std::queue<std::string> commands_queue_;
std::string ip_;
int port_;
std::string password_;
bool should_exit_;
int32_t cnt_;
int64_t elements_;
std::atomic<time_t> last_write_time_;

virtual void *ThreadMain();
};

#endif
43 changes: 43 additions & 0 deletions pika-migrate.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
## Pika3.5到Redis迁移工具

### 适用版本:
Pika 3.5, 单机模式且只支持单db

### 功能
将Pika中的数据在线迁移到Pika、Redis(支持全量、增量同步)

### 开发背景:
之前Pika项目官方提供的pika\_to\_redis工具仅支持离线将Pika的DB中的数据迁移到Pika、Redis, 且无法增量同步, 该工具实际上就是一个特殊的Pika, 只不过成为从库之后, 内部会将从主库获取到的数据转发给Redis,同时并支持增量同步, 实现热迁功能.

### 热迁原理
1. pika-port通过dbsync请求获取主库当前全量db数据, 以及当前db数据所对应的binlog点位
2. 获取到主库当前全量db数据之后, 扫描db, 将db中的数据转发给Redis
3. 通过之前获取的binlog的点位向主库进行增量同步, 在增量同步的过程中, 将从主库获取到的binlog重组成Redis命令, 转发给Redis

### 新增配置项
```cpp
###################
## Migrate Settings
###################

target-redis-host : 127.0.0.1
target-redis-port : 6379
target-redis-pwd : abc

sync-batch-num : 100
redis-sender-num : 10
```

### 步骤
1. 考虑到在pika-port在将全量数据写入到Redis这段时间可能耗时很长, 导致主库原先binlog点位已经被清理, 我们首先在主库上执行`config set expire-logs-nums 10000`, 让主库保留10000个Binlog文件(Binlog文件占用磁盘空间, 可以根据实际情况确定保留binlog的数量), 确保后续该工具请求增量同步的时候, 对应的Binlog文件还存在.
2. 修改该工具配置文件的`target-redis-host, target-redis-port, target-redis-pwd, sync-batch-num, redis-sender-num`配置项(`sync-batch-num`是该工具接收到主库的全量数据之后, 为了提升转发效率, 将`sync-batch-num`个数据一起打包发送给Redis, 此外该工具内部可以指定`redis-sender-num`个线程用于转发命令, 命令通过Key的哈希值被分配到不同的线程中, 所以无需担心多线程发送导致的数据错乱的问题)
3. 使用`pika -c pika.conf`命令启动该工具, 查看日志是否有报错信息
4. 向该工具执行`slaveof ip port force`向主库请求同步, 观察是否有报错信息
5. 在确认主从关系建立成功之后(此时pika-port同时也在向目标Redis转发数据了)通过向主库执行`info Replication`查看主从同步延迟(可在主库写入一个特殊的Key, 然后看在Redis测是否可以立马获取到, 来判断是否数据已经基本同步完毕)

### 注意事项
1. Pika支持不同数据结构采用同名Key, 但是Redis不支持, 所以在有同Key数据的场景下, 以第一个迁移到Redis数据结构为准, 其他同Key数据结构会丢失
2. 该工具只支持热迁移单机模式下, 并且只采用单DB版本的Pika, 如果是集群模式, 或者是多DB场景, 工具会报错并且退出.
3. 为了避免由于主库Binlog被清理导致该工具触发多次全量同步向Redis写入脏数据, 工具自身做了保护, 在第二次触发全量同步时会报错退出.


466 changes: 466 additions & 0 deletions src/migrator_thread.cc

Large diffs are not rendered by default.

22 changes: 22 additions & 0 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
@@ -207,6 +207,12 @@ int PikaConf::Load() {

if (classic_mode_.load()) {
GetConfInt("databases", &databases_);

// pika-migrate-tool only support 1 db
if (databases_ != 1) {
LOG(FATAL) << "pika-migrate-tool only support 1 db";
}

if (databases_ < 1 || databases_ > MAX_DB_NUM) {
LOG(FATAL) << "config databases error, limit [1 ~ 8], the actual is: " << databases_;
}
@@ -627,6 +633,22 @@ int PikaConf::Load() {
sync_window_size_.store(tmp_sync_window_size);
}

// redis-migrate conifg args
target_redis_host_ = "127.0.0.1";
GetConfStr("target-redis-host", &target_redis_host_);

target_redis_port_ = 6379;
GetConfInt("target-redis-port", &target_redis_port_);

target_redis_pwd_ = "";
GetConfStr("target-redis-pwd" , &target_redis_pwd_);

sync_batch_num_ = 100;
GetConfInt("sync-batch-num", &sync_batch_num_);

redis_sender_num_ = 8;
GetConfInt("redis-sender-num", &redis_sender_num_);

// max conn rbuf size
int tmp_max_conn_rbuf_size = PIKA_MAX_CONN_RBUF;
GetConfIntHuman("max-conn-rbuf-size", &tmp_max_conn_rbuf_size);
3 changes: 3 additions & 0 deletions src/pika_db.cc
Original file line number Diff line number Diff line change
@@ -473,6 +473,9 @@ bool DB::TryUpdateMasterOffset() {
<< ", master_ip: " << master_ip << ", master_port: " << master_port << ", filenum: " << filenum
<< ", offset: " << offset << ", term: " << term << ", index: " << index;

// Retransmit Data to target redis
g_pika_server->RetransmitData(dbsync_path_);

pstd::DeleteFile(info_path);
if (!ChangeDb(dbsync_path_)) {
LOG(WARNING) << "DB: " << db_name_ << ", Failed to change db";
32 changes: 32 additions & 0 deletions src/pika_repl_bgworker.cc
Original file line number Diff line number Diff line change
@@ -231,6 +231,7 @@ void PikaReplBgWorker::WriteDBInSyncWay(const std::shared_ptr<Cmd>& c_ptr) {
&& PIKA_CACHE_NONE != g_pika_conf->cache_mode()
&& c_ptr->GetDB()->cache()->CacheStatus() == PIKA_CACHE_STATUS_OK) {
if (c_ptr->is_write()) {
ParseAndSendPikaCommand(c_ptr);
c_ptr->DoThroughDB();
if (c_ptr->IsNeedUpdateCache()) {
c_ptr->DoUpdateCache();
@@ -239,6 +240,7 @@ void PikaReplBgWorker::WriteDBInSyncWay(const std::shared_ptr<Cmd>& c_ptr) {
LOG(WARNING) << "It is impossbile to reach here";
}
} else {
ParseAndSendPikaCommand(c_ptr);
c_ptr->Do();
}
if (!c_ptr->IsSuspend()) {
@@ -274,3 +276,33 @@ void PikaReplBgWorker::WriteDBInSyncWay(const std::shared_ptr<Cmd>& c_ptr) {
}
}
}

void PikaReplBgWorker::ParseAndSendPikaCommand(const std::shared_ptr<Cmd>& c_ptr) {
const PikaCmdArgsType& argv = c_ptr->argv();
if (!strcasecmp(argv[0].data(), "pksetexat")) {
if (argv.size() != 4) {
LOG(WARNING) << "find invaild command, command size: " << argv.size();
return;
} else {
std::string key = argv[1];
int timestamp = std::atoi(argv[2].data());
std::string value = argv[3];

int seconds = timestamp - time(NULL);
PikaCmdArgsType tmp_argv;
tmp_argv.push_back("setex");
tmp_argv.push_back(key);
tmp_argv.push_back(std::to_string(seconds));
tmp_argv.push_back(value);

std::string command;
net::SerializeRedisCommand(tmp_argv, &command);
g_pika_server->SendRedisCommand(command, key);
}
} else {
std::string key = argv.size() >= 2 ? argv[1] : argv[0];
std::string command;
net::SerializeRedisCommand(argv, &command);
g_pika_server->SendRedisCommand(command, key);
}
}
9 changes: 7 additions & 2 deletions src/pika_repl_client_conn.cc
Original file line number Diff line number Diff line change
@@ -154,6 +154,7 @@ void PikaReplClientConn::HandleMetaSyncResponse(void* arg) {
LOG(INFO) << "Finish to handle meta sync response";
}

static bool alerady_full_sync = false;
void PikaReplClientConn::HandleDBSyncResponse(void* arg) {
std::unique_ptr<ReplClientTaskArg> task_arg(static_cast<ReplClientTaskArg*>(arg));
std::shared_ptr<net::PbConn> conn = task_arg->conn;
@@ -178,12 +179,16 @@ void PikaReplClientConn::HandleDBSyncResponse(void* arg) {
return;
}

slave_db->SetMasterSessionId(session_id);
if (!alerady_full_sync) {
alerady_full_sync = true;
} else {
LOG(FATAL) << "DBSyncResponse should only be called once";
}

slave_db->SetMasterSessionId(session_id);
slave_db->StopRsync();
slave_db->SetReplState(ReplState::kWaitDBSync);
LOG(INFO) << "DB: " << db_name << " Need Wait To Sync";

//now full sync is starting, add an unfinished full sync count
g_pika_conf->AddInternalUsedUnfinishedFullSync(slave_db->DBName());
}
173 changes: 173 additions & 0 deletions src/pika_sender.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#include "include/pika_sender.h"

#include <glog/logging.h>

PikaSender::PikaSender(std::string ip, int64_t port, std::string password):
cli_(NULL),
ip_(ip),
port_(port),
password_(password),
should_exit_(false),
elements_(0)
{
}

PikaSender::~PikaSender() {
}

int PikaSender::QueueSize() {
std::lock_guard<std::mutex> lock(keys_queue_mutex_);
return keys_queue_.size();
}

void PikaSender::Stop() {
should_exit_.store(true);
wsignal_.notify_all();
rsignal_.notify_all();
LOG(INFO) << "PikaSender received stop signal";
}

void PikaSender::ConnectRedis() {
while (cli_ == NULL) {
// Connect to redis
cli_ = net::NewRedisCli();
cli_->set_connect_timeout(1000);
pstd::Status s = cli_->Connect(ip_, port_);
if (!s.ok()) {
delete cli_;
cli_ = NULL;
LOG(WARNING) << "Can not connect to " << ip_ << ":" << port_ << ", status: " << s.ToString();
continue;
} else {
// Connect success

// Authentication
if (!password_.empty()) {
net::RedisCmdArgsType argv, resp;
std::string cmd;

argv.push_back("AUTH");
argv.push_back(password_);
net::SerializeRedisCommand(argv, &cmd);
pstd::Status s = cli_->Send(&cmd);

if (s.ok()) {
s = cli_->Recv(&resp);
if (resp[0] == "OK") {
} else {
LOG(FATAL) << "Connect to redis(" << ip_ << ":" << port_ << ") Invalid password";
cli_->Close();
delete cli_;
cli_ = NULL;
should_exit_ = true;
return;
}
} else {
LOG(WARNING) << "send auth failed: " << s.ToString();
cli_->Close();
delete cli_;
cli_ = NULL;
continue;
}
} else {
// If forget to input password
net::RedisCmdArgsType argv, resp;
std::string cmd;

argv.push_back("PING");
net::SerializeRedisCommand(argv, &cmd);
pstd::Status s = cli_->Send(&cmd);

if (s.ok()) {
s = cli_->Recv(&resp);
if (s.ok()) {
if (resp[0] == "NOAUTH Authentication required.") {
LOG(FATAL) << "Ping redis(" << ip_ << ":" << port_ << ") NOAUTH Authentication required";
cli_->Close();
delete cli_;
cli_ = NULL;
should_exit_ = true;
return;
}
} else {
LOG(WARNING) << "Recv failed: " << s.ToString();
cli_->Close();
delete cli_;
cli_ = NULL;
}
}
}
}
}
}

void PikaSender::LoadKey(const std::string &key) {
std::unique_lock lock(signal_mutex);
wsignal_.wait(lock, [this]() { return keys_queue_.size() < 100000 || should_exit_; });
if(!should_exit_) {
std::lock_guard<std::mutex> lock(keys_queue_mutex_);
keys_queue_.push(key);
rsignal_.notify_one();
}
}

void PikaSender::SendCommand(std::string &command, const std::string &key) {
// Send command
pstd::Status s = cli_->Send(&command);
if (!s.ok()) {
elements_--;
LoadKey(key);
cli_->Close();
LOG(INFO) << s.ToString().data();
delete cli_;
cli_ = NULL;
ConnectRedis();
}else{
cli_->Recv(NULL);
}
}

void *PikaSender::ThreadMain() {
LOG(INFO) << "Start sender thread...";

if (cli_ == NULL) {
ConnectRedis();
}

while (!should_exit_ || QueueSize() != 0) {
std::string command;

std::unique_lock lock(signal_mutex);
rsignal_.wait(lock, [this]() { return !QueueSize() == 0 || should_exit_; });
if (QueueSize() == 0 && should_exit_) {
return NULL;
}
lock.unlock();

std::string key;
{
std::lock_guard<std::mutex> lock(keys_queue_mutex_);
key = keys_queue_.front();
elements_++;
keys_queue_.pop();
}
wsignal_.notify_one();
SendCommand(key, key);

}


if (cli_) {
cli_->Close();
delete cli_;
cli_ = NULL;
}
LOG(INFO) << "PikaSender thread complete";
return NULL;
}

96 changes: 95 additions & 1 deletion src/pika_server.cc
Original file line number Diff line number Diff line change
@@ -24,6 +24,8 @@
#include "include/pika_monotonic_time.h"
#include "include/pika_rm.h"
#include "include/pika_server.h"
#include "include/pika_sender.h"
#include "include/migrator_thread.h"

using pstd::Status;
extern PikaServer* g_pika_server;
@@ -101,6 +103,15 @@ PikaServer::PikaServer()
}
}

// Create redis sender
for (int i = 0; i < g_pika_conf->redis_sender_num(); ++i) {
redis_senders_.emplace_back(std::make_unique<RedisSender>(int(i),
g_pika_conf->target_redis_host(),
g_pika_conf->target_redis_port(),
g_pika_conf->target_redis_pwd()));
}


acl_ = std::make_unique<::Acl>();
SetSlowCmdThreadPoolFlag(g_pika_conf->slow_cmd_pool());
bgsave_thread_.set_thread_name("PikaServer::bgsave_thread_");
@@ -129,7 +140,10 @@ PikaServer::~PikaServer() {
bgsave_thread_.StopThread();
key_scan_thread_.StopThread();
pika_migrate_thread_->StopThread();

for (size_t i = 0; i < redis_senders_.size(); ++i) {
redis_senders_[i]->Stop();
}
redis_senders_.clear();
dbs_.clear();

LOG(INFO) << "PikaServer " << pthread_self() << " exit!!!";
@@ -209,6 +223,15 @@ void PikaServer::Start() {
<< (ret == net::kCreateThreadError ? ": create thread error " : ": other error");
}

for (size_t i = 0; i < redis_senders_.size(); ++i) {
ret = redis_senders_[i]->StartThread();
if (ret != net::kSuccess) {
dbs_.clear();
LOG(FATAL) << "Start RedisSender Error: " << ret
<< (ret == net::kCreateThreadError ? ": create thread error " : ": other error");
}
}

time(&start_time_s_);
LOG(INFO) << "Pika Server going to start";
rsync_server_->Start();
@@ -1542,6 +1565,77 @@ Status PikaServer::GetCmdRouting(std::vector<net::RedisCmdArgsType>& redis_cmds,
return Status::OK();
}

int PikaServer::SendRedisCommand(const std::string& command, const std::string& key) {
// Send command
size_t idx = std::hash<std::string>()(key) % redis_senders_.size();
redis_senders_[idx]->SendRedisCommand(command);
return 0;
}

void PikaServer::RetransmitData(const std::string& path) {
std::shared_ptr<storage::Storage> storage_ = std::make_shared<storage::Storage>();
rocksdb::Status s = storage_->Open(g_pika_server->storage_options(), path);

if (!s.ok()) {
LOG(FATAL) << "open received database error: " << s.ToString();
return;
}

// Init SenderThread
int thread_num = g_pika_conf->redis_sender_num();
std::string target_host = g_pika_conf->target_redis_host();
int target_port = g_pika_conf->target_redis_port();
std::string target_pwd = g_pika_conf->target_redis_pwd();

LOG(INFO) << "open received database success, start retransmit data to redis("
<< target_host << ":" << target_port << ")";


std::vector<std::shared_ptr<PikaSender>> pika_senders;
std::vector<std::shared_ptr<MigratorThread>> migrators;

for (int i = 0; i < thread_num; i++) {
pika_senders.emplace_back(std::make_shared<PikaSender>(target_host, target_port, target_pwd));
}
migrators.emplace_back(std::make_shared<MigratorThread>(storage_, &pika_senders, storage::kStrings, thread_num));
migrators.emplace_back(std::make_shared<MigratorThread>(storage_, &pika_senders, storage::kLists, thread_num));
migrators.emplace_back(std::make_shared<MigratorThread>(storage_, &pika_senders, storage::kHashes, thread_num));
migrators.emplace_back(std::make_shared<MigratorThread>(storage_, &pika_senders, storage::kSets, thread_num));
migrators.emplace_back(std::make_shared<MigratorThread>(storage_, &pika_senders, storage::kZSets, thread_num));

for (size_t i = 0; i < pika_senders.size(); i++) {
pika_senders[i]->StartThread();
}
for (size_t i = 0; i < migrators.size(); i++) {
migrators[i]->StartThread();
}

for (size_t i = 0; i < migrators.size(); i++) {
migrators[i]->JoinThread();
}
for (size_t i = 0; i < pika_senders.size(); i++) {
pika_senders[i]->Stop();
}
for (size_t i = 0; i < pika_senders.size(); i++) {
pika_senders[i]->JoinThread();
}

int64_t replies = 0, records = 0;
for (size_t i = 0; i < migrators.size(); i++) {
records += migrators[i]->num();
}
migrators.clear();
for (size_t i = 0; i < pika_senders.size(); i++) {
replies += pika_senders[i]->elements();
}
pika_senders.clear();

LOG(INFO) << "=============== Retransmit Finish =====================";
LOG(INFO) << "Total records : " << records << " have been Scaned";
LOG(INFO) << "Total replies : " << replies << " received from redis server";
LOG(INFO) << "=======================================================";
}

void PikaServer::ServerStatus(std::string* info) {
std::stringstream tmp_stream;
size_t q_size = ClientProcessorThreadPoolCurQueueSize();
188 changes: 188 additions & 0 deletions src/redis_sender.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.


#include "include/redis_sender.h"

#include <time.h>
#include <unistd.h>

#include <glog/logging.h>

static time_t kCheckDiff = 1;

RedisSender::RedisSender(int id, std::string ip, int64_t port, std::string password):
id_(id),
cli_(NULL),
ip_(ip),
port_(port),
password_(password),
should_exit_(false),
cnt_(0),
elements_(0) {

last_write_time_ = ::time(NULL);
}

RedisSender::~RedisSender() {
LOG(INFO) << "RedisSender thread " << id_ << " exit!!!";
}

void RedisSender::ConnectRedis() {
while (cli_ == NULL) {
// Connect to redis
cli_ = std::shared_ptr<net::NetCli>(net::NewRedisCli());
cli_->set_connect_timeout(1000);
cli_->set_recv_timeout(10000);
cli_->set_send_timeout(10000);
pstd::Status s = cli_->Connect(ip_, port_);
if (!s.ok()) {
LOG(WARNING) << "Can not connect to " << ip_ << ":" << port_ << ", status: " << s.ToString();
cli_ = NULL;
sleep(3);
continue;
} else {
// Connect success
LOG(INFO) << "RedisSender thread " << id_ << "Connect to redis(" << ip_ << ":" << port_ << ") success";
// Authentication
if (!password_.empty()) {
net::RedisCmdArgsType argv, resp;
std::string cmd;

argv.push_back("AUTH");
argv.push_back(password_);
net::SerializeRedisCommand(argv, &cmd);
pstd::Status s = cli_->Send(&cmd);

if (s.ok()) {
s = cli_->Recv(&resp);
if (resp[0] == "OK") {
} else {
LOG(FATAL) << "Connect to redis(" << ip_ << ":" << port_ << ") Invalid password";
cli_->Close();
cli_ = NULL;
should_exit_ = true;
return;
}
} else {
LOG(WARNING) << "send auth failed: " << s.ToString();
cli_->Close();
cli_ = NULL;
continue;
}
} else {
// If forget to input password
net::RedisCmdArgsType argv, resp;
std::string cmd;

argv.push_back("PING");
net::SerializeRedisCommand(argv, &cmd);
pstd::Status s = cli_->Send(&cmd);

if (s.ok()) {
s = cli_->Recv(&resp);
if (s.ok()) {
if (resp[0] == "NOAUTH Authentication required.") {
LOG(FATAL) << "Ping redis(" << ip_ << ":" << port_ << ") NOAUTH Authentication required";
cli_->Close();
cli_ = NULL;
should_exit_ = true;
return;
}
} else {
LOG(WARNING) << s.ToString();
cli_->Close();
cli_ = NULL;
}
}
}
}
}
}

void RedisSender::Stop() {
set_should_stop();
should_exit_ = true;
rsignal_.notify_all();
wsignal_.notify_all();
}

void RedisSender::SendRedisCommand(const std::string &command) {
std::unique_lock lock(signal_mutex_);
wsignal_.wait(lock, [this]() { return commandQueueSize() < 100000; });
if (!should_exit_) {
std::lock_guard l(keys_mutex_);
commands_queue_.push(command);
rsignal_.notify_one();
}
}

int RedisSender::SendCommand(std::string &command) {
time_t now = ::time(NULL);
if (kCheckDiff < now - last_write_time_) {
int ret = cli_->CheckAliveness();
if (ret < 0) {
cli_ = NULL;
ConnectRedis();
}
last_write_time_ = now;
}

// Send command
int idx = 0;
do {
pstd::Status s = cli_->Send(&command);
if (s.ok()) {
cli_->Recv(NULL);
return 0;
}

cli_->Close();
cli_ = NULL;
ConnectRedis();
} while(++idx < 3);
LOG(WARNING) << "RedisSender " << id_ << " fails to send redis command " << command << ", times: " << idx << ", error: " << "send command failed";
return -1;
}

void *RedisSender::ThreadMain() {
LOG(INFO) << "Start redis sender " << id_ << " thread...";
// sleep(15);

ConnectRedis();

while (!should_exit_) {
std::unique_lock lock(signal_mutex_);
while (commandQueueSize() == 0 && !should_exit_) {
rsignal_.wait_for(lock, std::chrono::milliseconds(100));
}

if (should_exit_) {
break;
}

if (commandQueueSize() == 0) {
continue;
}

// get redis command
std::string command;
{
std::lock_guard l(keys_mutex_);
command = commands_queue_.front();
elements_++;
commands_queue_.pop();
}

wsignal_.notify_one();
SendCommand(command);

}

LOG(INFO) << "RedisSender thread " << id_ << " complete";
cli_ = NULL;
return NULL;
}

0 comments on commit e744786

Please sign in to comment.