Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: migrate tools support pika v3.5.0 #2984

Open
wants to merge 1 commit into
base: 3.5
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,16 @@ sync-window-size : 9000
# Supported Units [K|M|G]. Its default unit is in [bytes] and its default value is 268435456(256MB). The value range is [64MB, 1GB].
max-conn-rbuf-size : 268435456

###################
## Migrate Settings
###################

target-redis-host : 127.0.0.1
target-redis-port : 6379
target-redis-pwd :

sync-batch-num : 100
redis-sender-num : 10

#######################################################################E#######
#! Critical Settings !#
Expand Down
66 changes: 66 additions & 0 deletions include/migrator_thread.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#ifndef MIGRATOR_THREAD_H_
#define MIGRATOR_THREAD_H_

#include <iostream>
#include <mutex>

#include "storage/storage.h"
#include "net/include/redis_cli.h"

#include "include/pika_sender.h"

class MigratorThread : public net::Thread {
public:
MigratorThread(std::shared_ptr<storage::Storage> storage_, std::vector<std::shared_ptr<PikaSender>> *senders, int type, int thread_num) :
storage_(storage_),
should_exit_(false),
senders_(senders),
type_(type),
thread_num_(thread_num),
thread_index_(0),
num_(0) {
}

virtual ~ MigratorThread();

int64_t num() {
std::lock_guard<std::mutex> l(num_mutex_);
return num_;
}

void Stop() {
should_exit_ = true;
}

private:
void PlusNum() {
std::lock_guard<std::mutex> l(num_mutex_);
++num_;
}

void DispatchKey(const std::string &command, const std::string& key = "");

void MigrateDB();
void MigrateStringsDB();
void MigrateListsDB();
void MigrateHashesDB();
void MigrateSetsDB();
void MigrateZsetsDB();

virtual void *ThreadMain();

private:
std::shared_ptr<storage::Storage> storage_;
bool should_exit_;

std::vector<std::shared_ptr<PikaSender>> *senders_;
int type_;
int thread_num_;
int thread_index_;

int64_t num_;
std::mutex num_mutex_;
};

#endif

15 changes: 15 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,14 @@ class PikaConf : public pstd::BaseConf {
int max_conn_rbuf_size() { return max_conn_rbuf_size_.load(); }
int consensus_level() { return consensus_level_.load(); }
int replication_num() { return replication_num_.load(); }

std::string target_redis_host() { return target_redis_host_; }
int target_redis_port() { return target_redis_port_; }
std::string target_redis_pwd() { return target_redis_pwd_; }
int sync_batch_num() { return sync_batch_num_; }
int redis_sender_num() { return redis_sender_num_; }


int rate_limiter_mode() {
std::shared_lock l(rwlock_);
return rate_limiter_mode_;
Expand Down Expand Up @@ -925,6 +933,13 @@ class PikaConf : public pstd::BaseConf {
std::map<std::string, std::string> diff_commands_;
void TryPushDiffCommands(const std::string& command, const std::string& value);

// migrate configure items
std::string target_redis_host_;
int target_redis_port_;
std::string target_redis_pwd_;
int sync_batch_num_;
int redis_sender_num_;

//
// Critical configure items
//
Expand Down
1 change: 1 addition & 0 deletions include/pika_repl_bgworker.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class PikaReplBgWorker {
net::BGThread bg_thread_;
static int HandleWriteBinlog(net::RedisParser* parser, const net::RedisCmdArgsType& argv);
static void ParseBinlogOffset(const InnerMessage::BinlogOffset& pb_offset, LogOffset* offset);
static void ParseAndSendPikaCommand(const std::shared_ptr<Cmd>& c_ptr);
};

#endif // PIKA_REPL_BGWROKER_H_
43 changes: 43 additions & 0 deletions include/pika_sender.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#ifndef PIKA_SENDER_H_
#define PIKA_SENDER_H_

#include <atomic>
#include <thread>
#include <chrono>
#include <iostream>
#include <queue>

#include "net/include/bg_thread.h"
#include "net/include/net_cli.h"
#include "net/include/redis_cli.h"

class PikaSender : public net::Thread {
public:
PikaSender(std::string ip, int64_t port, std::string password);
virtual ~PikaSender();
void LoadKey(const std::string &cmd);
void Stop();

int64_t elements() { return elements_; }

void SendCommand(std::string &command, const std::string &key);
int QueueSize();
void ConnectRedis();

private:
net::NetCli *cli_;
pstd::CondVar wsignal_;
pstd::CondVar rsignal_;
std::mutex signal_mutex;
std::mutex keys_queue_mutex_;
std::queue<std::string> keys_queue_;
std::string ip_;
int port_;
std::string password_;
std::atomic<bool> should_exit_;
int64_t elements_;

virtual void *ThreadMain();
};

#endif
12 changes: 12 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "include/pika_statistic.h"
#include "include/pika_transaction.h"
#include "include/rsync_server.h"
#include "include/redis_sender.h"

extern std::unique_ptr<PikaConf> g_pika_conf;

Expand Down Expand Up @@ -309,6 +310,12 @@ class PikaServer : public pstd::noncopyable {

pstd::Status GetCmdRouting(std::vector<net::RedisCmdArgsType>& redis_cmds, std::vector<Node>* dst, bool* all_local);

/*
* migrate used
*/
int SendRedisCommand(const std::string& command, const std::string& key);
void RetransmitData(const std::string& path);

// info debug use
void ServerStatus(std::string* info);

Expand Down Expand Up @@ -617,6 +624,11 @@ class PikaServer : public pstd::noncopyable {
*/
std::unique_ptr<PikaAuxiliaryThread> pika_auxiliary_thread_;

/*
* migrate to redis used
*/
std::vector<std::unique_ptr<RedisSender>> redis_senders_;

/*
* Async slotsMgrt use
*/
Expand Down
52 changes: 52 additions & 0 deletions include/redis_sender.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#ifndef REDIS_SENDER_H_
#define REDIS_SENDER_H_

#include <atomic>
#include <thread>
#include <chrono>
#include <iostream>
#include <queue>

#include "pika_repl_bgworker.h"
#include "net/include/net_cli.h"
#include "net/include/redis_cli.h"

class RedisSender : public net::Thread {
public:
RedisSender(int id, std::string ip, int64_t port, std::string password);
virtual ~RedisSender();
void Stop(void);
int64_t elements() {
return elements_;
}

void SendRedisCommand(const std::string &command);

private:
int SendCommand(std::string &command);
void ConnectRedis();
size_t commandQueueSize() {
std::lock_guard l(keys_mutex_);
return commands_queue_.size();
}

private:
int id_;
std::shared_ptr<net::NetCli> cli_;
pstd::CondVar rsignal_;
pstd::CondVar wsignal_;
pstd::Mutex signal_mutex_;
pstd::Mutex keys_mutex_;
std::queue<std::string> commands_queue_;
std::string ip_;
int port_;
std::string password_;
bool should_exit_;
int32_t cnt_;
int64_t elements_;
std::atomic<time_t> last_write_time_;

virtual void *ThreadMain();
};

#endif
43 changes: 43 additions & 0 deletions pika-migrate.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
## Pika3.5到Redis迁移工具

### 适用版本:
Pika 3.5, 单机模式且只支持单db

### 功能
将Pika中的数据在线迁移到Pika、Redis(支持全量、增量同步)

### 开发背景:
之前Pika项目官方提供的pika\_to\_redis工具仅支持离线将Pika的DB中的数据迁移到Pika、Redis, 且无法增量同步, 该工具实际上就是一个特殊的Pika, 只不过成为从库之后, 内部会将从主库获取到的数据转发给Redis,同时并支持增量同步, 实现热迁功能.

### 热迁原理
1. pika-port通过dbsync请求获取主库当前全量db数据, 以及当前db数据所对应的binlog点位
2. 获取到主库当前全量db数据之后, 扫描db, 将db中的数据转发给Redis
3. 通过之前获取的binlog的点位向主库进行增量同步, 在增量同步的过程中, 将从主库获取到的binlog重组成Redis命令, 转发给Redis

### 新增配置项
```cpp
###################
## Migrate Settings
###################

target-redis-host : 127.0.0.1
target-redis-port : 6379
target-redis-pwd : abc

sync-batch-num : 100
redis-sender-num : 10
```

### 步骤
1. 考虑到在pika-port在将全量数据写入到Redis这段时间可能耗时很长, 导致主库原先binlog点位已经被清理, 我们首先在主库上执行`config set expire-logs-nums 10000`, 让主库保留10000个Binlog文件(Binlog文件占用磁盘空间, 可以根据实际情况确定保留binlog的数量), 确保后续该工具请求增量同步的时候, 对应的Binlog文件还存在.
2. 修改该工具配置文件的`target-redis-host, target-redis-port, target-redis-pwd, sync-batch-num, redis-sender-num`配置项(`sync-batch-num`是该工具接收到主库的全量数据之后, 为了提升转发效率, 将`sync-batch-num`个数据一起打包发送给Redis, 此外该工具内部可以指定`redis-sender-num`个线程用于转发命令, 命令通过Key的哈希值被分配到不同的线程中, 所以无需担心多线程发送导致的数据错乱的问题)
3. 使用`pika -c pika.conf`命令启动该工具, 查看日志是否有报错信息
4. 向该工具执行`slaveof ip port force`向主库请求同步, 观察是否有报错信息
5. 在确认主从关系建立成功之后(此时pika-port同时也在向目标Redis转发数据了)通过向主库执行`info Replication`查看主从同步延迟(可在主库写入一个特殊的Key, 然后看在Redis测是否可以立马获取到, 来判断是否数据已经基本同步完毕)

### 注意事项
1. Pika支持不同数据结构采用同名Key, 但是Redis不支持, 所以在有同Key数据的场景下, 以第一个迁移到Redis数据结构为准, 其他同Key数据结构会丢失
2. 该工具只支持热迁移单机模式下, 并且只采用单DB版本的Pika, 如果是集群模式, 或者是多DB场景, 工具会报错并且退出.
3. 为了避免由于主库Binlog被清理导致该工具触发多次全量同步向Redis写入脏数据, 工具自身做了保护, 在第二次触发全量同步时会报错退出.


Loading
Loading