Skip to content

Commit

Permalink
Merge pull request #52 from ikenchina/dev/rdb_command
Browse files Browse the repository at this point in the history
a new command
  • Loading branch information
ikenchina authored Aug 16, 2024
2 parents d9d68f1 + 9efb809 commit 8b038ec
Show file tree
Hide file tree
Showing 37 changed files with 1,274 additions and 840 deletions.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Translations: [English](README.md) | [简体中文](README_ZH.md)
- [Overview](#overview)
- [Features](#features)
- [Real-time Data Synchronization](#real-time-data-synchronization)
- [Load RDB into redis](#load-rdb-into-redis)
- [Other Features](#other-features)
- [Product Comparison](#product-comparison)
- [Technical Implementation](#technical-implementation)
Expand Down Expand Up @@ -65,6 +66,11 @@ The feature matrix of `redis-GunYu` for real-time synchronization
- Redis topology: Real-time monitoring of topology changes in the source and target Redis (e.g., adding/removing nodes, master-slave switch, etc.), to change consistency strategies and adjust other functional strategies


### Load RDB into redis

Load a RDB file to a running redis server or cluster. Refers to [RDB](rdb_en.md)


### Other Features

Additional features are currently under development.
Expand Down Expand Up @@ -118,6 +124,8 @@ This generates the `redisGunYu` binary file locally.
```
./redisGunYu -conf ./config.yaml
```
Default command is sync, you can run other command with `-cmd=command name`.


**Start with command line arguments**
```
Expand Down
20 changes: 17 additions & 3 deletions README_ZH.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
- [简介](#简介)
- [特性](#特性)
- [数据实时同步](#数据实时同步)
- [RDB导入到redis](#rdb导入到redis)
- [其他](#其他)
- [产品比较](#产品比较)
- [实现](#实现)
Expand Down Expand Up @@ -71,6 +72,14 @@




### RDB导入到redis

此功能是解析RDB文件,然后将数据回放到正在运行的redis中,可以对RDB文件进行过滤。




### 其他

其他功能,仍在开发中。
Expand All @@ -79,7 +88,7 @@

## 产品比较

从产品需求上,对redis-GunYu和几个主流工具进行比较
从产品需求上,对redis-GunYu和几个主流工具的实时同步功能进行比较

功能点 | redis-shake/v2 | DTS | xpipe | redis-GunYu
-- | -- | -- | -- | --
Expand All @@ -94,7 +103,7 @@

## 实现

`redis-GunYu`的技术实现如图所示,具体技术原理请见[技术实现](docs/tech_zh.md)
`redis-GunYu`同步功能的技术实现如图所示,具体技术原理请见[技术实现](docs/tech_zh.md)

<img src="docs/imgs/sync.png" width = "400" height = "150" alt="架构图" align=center />

Expand Down Expand Up @@ -128,10 +137,15 @@ make

### 使用

我们以使用同步功能为例


**配置文件的方式启动**
```
./redisGunYu -conf ./config.yaml
./redisGunYu -conf ./config.yaml -cmd=sync
```
`-cmd=sync` 可忽略


**命令行传递参数的方式启动**
```
Expand Down
73 changes: 66 additions & 7 deletions cmd/rdb.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"bufio"
"context"
"errors"
"fmt"
Expand All @@ -9,8 +10,13 @@ import (
"sync/atomic"

"github.com/mgtv-tech/redis-GunYu/config"
"github.com/mgtv-tech/redis-GunYu/pkg/io/pipe"
"github.com/mgtv-tech/redis-GunYu/pkg/rdb"
"github.com/mgtv-tech/redis-GunYu/pkg/redis"
"github.com/mgtv-tech/redis-GunYu/pkg/store"
usync "github.com/mgtv-tech/redis-GunYu/pkg/sync"
"github.com/mgtv-tech/redis-GunYu/pkg/util"
"github.com/mgtv-tech/redis-GunYu/syncer"
)

type RdbCmd struct {
Expand All @@ -36,24 +42,25 @@ func (sc *RdbCmd) Stop() error {
}

func (rc *RdbCmd) Run() error {
action := config.GetFlag().RdbCmd.RdbAction
action := config.GetRdbCmdConfig().Action
switch action {
case "print":
util.PanicIfErr(rc.Print())
util.PanicIfErr(rc.Print(config.GetRdbCmdConfig().RdbPath, &config.GetRdbCmdConfig().Print))
case "load":
util.PanicIfErr(rc.Load(config.GetRdbCmdConfig().RdbPath, &config.GetRdbCmdConfig().Load))
default:
panic(fmt.Errorf("unknown action : %s", action))
}
return nil
}

func (rc *RdbCmd) Print() error {
rdbFn := config.GetFlag().RdbCmd.RdbPath
file, err := os.OpenFile(rdbFn, os.O_RDONLY, 0777)
func (rc *RdbCmd) Print(rdbPath string, cfg *config.RdbCmdPrint) error {
file, err := os.OpenFile(rdbPath, os.O_RDONLY, 0777)
if err != nil {
return err
}
var stat atomic.Int64
pipe := redis.ParseRdb(file, &stat, config.RdbPipeSize, "7.2") // @TODO version
pipe := rdb.ParseRdb(file, &stat, config.RdbPipeSize, rdb.WithTargetRedisVersion("7.2"), rdb.WithFunctionExists("flush")) // @TODO version
for {
select {
case e, ok := <-pipe:
Expand All @@ -67,7 +74,7 @@ func (rc *RdbCmd) Print() error {
return e.Err
}
fmt.Printf("db(%d), key(%s), value(%s)\n", e.DB, e.Key, e.Value())
if config.GetFlag().RdbCmd.ToCmd {
if cfg.ToCmd {
e.ObjectParser.ExecCmd(func(cmd string, args ...interface{}) error {
params := []interface{}{}
for _, arg := range args {
Expand All @@ -87,3 +94,55 @@ func (rc *RdbCmd) Print() error {
}
}
}

func (rc *RdbCmd) Load(rdbPath string, cfg *config.RdbCmdLoad) error {

readBufSize := 10 * 1024 * 1024
piper, pipew := pipe.NewSize(readBufSize)
buf := bufio.NewReaderSize(piper, readBufSize)

rdbRd, err := store.NewRdbReaderFromFile(pipew, rdbPath, false)
if err != nil {
return err
}

err = redis.FixVersion(cfg.Redis)
if err != nil {
return err
}
if err = redis.FixTopology(cfg.Redis); err != nil {
return err
}

reader := store.NewReader(buf, rdbRd, nil, 0, rdbRd.Size(), "")
reader.Start(usync.NewWaitCloserFromContext(rc.ctx, nil))

outputCfg := syncer.RedisOutputConfig{
InputName: "rdb_replay",
RunId: "",
CanTransaction: false,
Redis: *cfg.Redis,
EnableResumeFromBreakPoint: false,
ReplaceHashTag: cfg.Replay.ReplaceHashTag,
KeyExists: cfg.Replay.KeyExists,
KeyExistsLog: cfg.Replay.KeyExistsLog,
FunctionExists: cfg.Replay.FunctionExists,
MaxProtoBulkLen: cfg.Replay.MaxProtoBulkLen,
TargetDb: cfg.Replay.TargetDb,
TargetDbMap: cfg.Replay.TargetDbMap,
BatchCmdCount: cfg.Replay.BatchCmdCount,
BatchTicker: cfg.Replay.BatchTicker,
BatchBufferSize: cfg.Replay.BatchBufferSize,
KeepaliveTicker: cfg.Replay.KeepaliveTicker,
ReplayRdbParallel: cfg.Replay.ReplayRdbParallel,
ReplayRdbEnableRestore: *cfg.Replay.ReplayRdbEnableRestore,
UpdateCheckpointTicker: cfg.Replay.UpdateCheckpointTicker,
Stats: cfg.Replay.Stats,
Filter: cfg.Filter,
SyncDelayTestKey: "",
}

output := syncer.NewRedisOutput(outputCfg)

return output.SendRdb(rc.ctx, reader)
}
Loading

0 comments on commit 8b038ec

Please sign in to comment.