Skip to content

Commit

Permalink
release v1.4 support cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
vinllen committed Jul 23, 2019
1 parent ffec501 commit ae5c51b
Show file tree
Hide file tree
Showing 12 changed files with 513 additions and 257 deletions.
3 changes: 3 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
2019-07-23 Alibaba Cloud.
* VERSION: 1.4.0
* FEATURE: support cluster.
2019-06-23 Alibaba Cloud.
* VERSION: 1.2.4
* BUGFIX: fix bug of stat print.
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ Redis-full-check fetches keys from source and then checks these keys exist on th
* source->RedisFullCheck->target
* target->RedisFullCheck->source

# supports
standalone, cluster, proxy(aliyun-cluster, tencent-cluster)

# Code branch rules
Version rules: a.b.c.<br>

Expand Down
55 changes: 42 additions & 13 deletions src/full_check/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,31 @@ import (
"full_check/common"

"github.com/garyburd/redigo/redis"
redigoCluster "github.com/vinllen/redis-go-cluster"
)

var (
emptyError = errors.New("empty")
)

type RedisHost struct {
Addr string
Password string
TimeoutMs uint64
Role string // "source" or "target"
Authtype string // "auth" or "adminauth"
DBType int
Addr []string
Password string
TimeoutMs uint64
Role string // "source" or "target"
Authtype string // "auth" or "adminauth"
DBType int
DBFilterList map[int]struct{} // whitelist
}

func (p RedisHost) String() string {
return fmt.Sprintf("%s redis addr: %s", p.Role, p.Addr)
}

func (p RedisHost) IsCluster() bool {
return p.DBType == common.TypeCluster
}

type RedisClient struct {
redisHost RedisHost
db int32
Expand Down Expand Up @@ -79,24 +85,47 @@ func (p *RedisClient) CheckHandleNetError(err error) bool {
func (p *RedisClient) Connect() error {
var err error
if p.conn == nil {
if p.redisHost.TimeoutMs == 0 {
p.conn, err = redis.Dial("tcp", p.redisHost.Addr)
if p.redisHost.IsCluster() == false {
// single db or proxy
if p.redisHost.TimeoutMs == 0 {
p.conn, err = redis.Dial("tcp", p.redisHost.Addr[0])
} else {
p.conn, err = redis.DialTimeout("tcp", p.redisHost.Addr[0], time.Millisecond*time.Duration(p.redisHost.TimeoutMs),
time.Millisecond*time.Duration(p.redisHost.TimeoutMs), time.Millisecond*time.Duration(p.redisHost.TimeoutMs))
}
} else {
p.conn, err = redis.DialTimeout("tcp", p.redisHost.Addr, time.Millisecond*time.Duration(p.redisHost.TimeoutMs),
time.Millisecond*time.Duration(p.redisHost.TimeoutMs), time.Millisecond*time.Duration(p.redisHost.TimeoutMs))
// cluster

cluster, err := redigoCluster.NewCluster(
&redigoCluster.Options{
StartNodes: p.redisHost.Addr,
ConnTimeout: time.Duration(p.redisHost.TimeoutMs) * time.Millisecond,
ReadTimeout: 0,
WriteTimeout: 0,
KeepAlive: 16,
AliveTime: 60 * time.Second,
Password: p.redisHost.Password,
})
if err == nil {
p.conn = common.NewClusterConn(cluster, 0)
}
}
if err != nil {
return err
}

if len(p.redisHost.Password) != 0 {
_, err = p.conn.Do(p.redisHost.Authtype, p.redisHost.Password)
if err != nil {
return err
}
}
_, err = p.conn.Do("select", p.db)
if err != nil {
return err

if p.redisHost.DBType != common.TypeCluster {
_, err = p.conn.Do("select", p.db)
if err != nil {
return err
}
}
} // p.conn == nil
return nil
Expand Down
49 changes: 35 additions & 14 deletions src/full_check/client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,42 @@ import (
* map[int32]int64: logical db node map.
* []string: physical db nodes.
*/
func (p *RedisClient) FetchBaseInfo() (map[int32]int64, []string, error) {
// get keyspace
keyspaceContent, err := p.Do("info", "Keyspace")
if err != nil {
return nil, nil, fmt.Errorf("get keyspace failed[%v]", err)
}
func (p *RedisClient) FetchBaseInfo(isCluster bool) (map[int32]int64, []string, error) {
var logicalDBMap map[int32]int64

// parse to map
logicalDBMap, err := common.ParseKeyspace(keyspaceContent.([]byte))
if err != nil {
return nil, nil, fmt.Errorf("parse keyspace failed[%v]", err)
}
if !isCluster {
// get keyspace
keyspaceContent, err := p.Do("info", "Keyspace")
if err != nil {
return nil, nil, fmt.Errorf("get keyspace failed[%v]", err)
}

// parse to map
logicalDBMap, err = common.ParseKeyspace(keyspaceContent.([]byte))
if err != nil {
return nil, nil, fmt.Errorf("parse keyspace failed[%v]", err)
}

// set to 1 logical db if source is tencentProxy and map length is null
if len(logicalDBMap) == 0 && p.redisHost.DBType == common.TypeTencentProxy {
// set to 1 logical db if source is tencentProxy and map length is null
if len(logicalDBMap) == 0 && p.redisHost.DBType == common.TypeTencentProxy {
logicalDBMap[0] = 0
}
} else {
// is cluster
logicalDBMap = make(map[int32]int64)
logicalDBMap[0] = 0
}

// remove db that isn't in DBFilterList(white list)
if len(p.redisHost.DBFilterList) != 0 {
for key := range logicalDBMap {
if _, ok := p.redisHost.DBFilterList[int(key)]; !ok {
delete(logicalDBMap, key)
}
}
}


physicalDBList := make([]string, 0)
// get db list
switch p.redisHost.DBType {
Expand All @@ -49,7 +67,7 @@ func (p *RedisClient) FetchBaseInfo() (map[int32]int64, []string, error) {
return nil, nil, fmt.Errorf("source node count[%v] illegal", count)
} else {
for id := int64(0); id < count; id++ {
physicalDBList[id] = fmt.Sprintf("%v", id)
physicalDBList = append(physicalDBList, fmt.Sprintf("%v", id))
}
}
case common.TypeTencentProxy:
Expand All @@ -61,6 +79,9 @@ func (p *RedisClient) FetchBaseInfo() (map[int32]int64, []string, error) {
case common.TypeDB:
// do nothing
physicalDBList = append(physicalDBList, "meaningless")
case common.TypeCluster:
// equal to the source ip list
physicalDBList = p.redisHost.Addr
default:
return nil, nil, fmt.Errorf("unknown redis db type[%v]", p.redisHost.DBType)
}
Expand Down
101 changes: 101 additions & 0 deletions src/full_check/common/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package common

import (
redigoCluster "github.com/vinllen/redis-go-cluster"
redigo "github.com/garyburd/redigo/redis"
)

const(
RecvChanSize = 4096
)

/* implement redigo.Conn(https://github.com/garyburd/redigo)
* Embed redis-go-cluster(https://github.com/chasex/redis-go-cluster)
* The reason I create this struct is that redis-go-cluster isn't fulfill redigo.Conn
* interface. So I implement "Err", "Send", "Flush" and "Receive" interfaces.
*/
type ClusterConn struct {
client *redigoCluster.Cluster
recvChan chan reply
batcher *redigoCluster.Batch
}

type reply struct {
answer interface{}
err error
}

func NewClusterConn(clusterClient *redigoCluster.Cluster, recvChanSize int) redigo.Conn {
if recvChanSize == 0 {
recvChanSize = RecvChanSize
}

return &ClusterConn{
client: clusterClient,
recvChan: make(chan reply, recvChanSize),
}
}

func (cc *ClusterConn) Close() error {
cc.client.Close()
return nil
}

func (cc *ClusterConn) Err() error {
return nil
}

func (cc *ClusterConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
return cc.client.Do(commandName, args...)
}

// just add into batcher
func (cc *ClusterConn) Send(commandName string, args ...interface{}) error {
if cc.batcher == nil {
cc.batcher = cc.client.NewBatch()
}
return cc.batcher.Put(commandName, args...)
}

// send batcher and put the return into recvChan
func (cc *ClusterConn) Flush() error {
ret, err := cc.client.RunBatch(cc.batcher)
defer func() {
cc.batcher = nil // reset batcher
}()

if err != nil {
cc.recvChan <- reply{
answer: nil,
err: err,
}

return err
}

// for redis-go-cluster driver, "Receive" function returns all the replies once flushed.
// However, this action is different with redigo driver that "Receive" only returns 1
// reply each time.

retLength := len(ret)
availableSize := cap(cc.recvChan) - len(cc.recvChan)
if availableSize < retLength {
Logger.Warnf("available channel size[%v] less than current returned batch size[%v]", availableSize, retLength)
}
// Logger.Debugf("cluster flush batch with size[%v], return replies size[%v]", cc.batcher.GetBatchSize(), retLength)

for _, ele := range ret {
cc.recvChan <- reply{
answer: ele,
err: err,
}
}

return err
}

// read recvChan
func (cc *ClusterConn) Receive() (reply interface{}, err error) {
ret := <- cc.recvChan
return ret.answer, ret.err
}
7 changes: 5 additions & 2 deletions src/full_check/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@ const (

// db type
TypeDB = 0 // db
TypeAliyunProxy = 1 // aliyun proxy
TypeTencentProxy = 2 // tencent cloud proxy
TypeCluster = 1
TypeAliyunProxy = 2 // aliyun proxy
TypeTencentProxy = 3 // tencent cloud proxy

TypeMaster = "master"
TypeSlave = "slave"
TypeAll = "all"

Splitter = ";"
)

var (
Expand Down
26 changes: 26 additions & 0 deletions src/full_check/common/mix.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package common

import (
"bytes"
"strings"
"strconv"
)

func Min(a, b int) int {
Expand All @@ -24,4 +26,28 @@ func ParseInfo(content []byte) map[string]string {
result[string(items[0])] = string(items[1])
}
return result
}

func FilterDBList(dbs string) map[int]struct{} {
ret := make(map[int]struct{})
// empty
if dbs == "-1" {
return ret
}

// empty
dbList := strings.Split(dbs, Splitter)
if len(dbList) == 0 {
return ret
}

for _, ele := range dbList {
val, err := strconv.Atoi(ele)
if err != nil {
panic(err)
}

ret[val] = struct{}{}
}
return ret
}
50 changes: 26 additions & 24 deletions src/full_check/configure/conf.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,30 @@
package conf

var Opts struct {
SourceAddr string `short:"s" long:"source" value-name:"SOURCE" description:"Set host:port of source redis."`
SourcePassword string `short:"p" long:"sourcepassword" value-name:"Password" description:"Set source redis password"`
SourceAuthType string `long:"sourceauthtype" value-name:"AUTH-TYPE" default:"auth" description:"useless for opensource redis, valid value:auth/adminauth" `
SourceDBType int `long:"sourcedbtype" default:"0" description:"0: db, 1: aliyun proxy, 2: tencent proxy"`
TargetAddr string `short:"t" long:"target" value-name:"TARGET" description:"Set host:port of target redis."`
TargetPassword string `short:"a" long:"targetpassword" value-name:"Password" description:"Set target redis password"`
TargetAuthType string `long:"targetauthtype" value-name:"AUTH-TYPE" default:"auth" description:"useless for opensource redis, valid value:auth/adminauth" `
// TargetDBType int `long:"targetdbtype" default:"1" description:"1: db, 2: aliyun proxy 3: tencent proxy"`
ResultDBFile string `short:"d" long:"db" value-name:"Sqlite3-DB-FILE" default:"result.db" description:"sqlite3 db file for store result. If exist, it will be removed and a new file is created."`
CompareTimes string `long:"comparetimes" value-name:"COUNT" default:"3" description:"Total compare count, at least 1. In the first round, all keys will be compared. The subsequent rounds of the comparison will be done on the previous results."`
CompareMode int `short:"m" long:"comparemode" default:"2" description:"compare mode, 1: compare full value, 2: only compare value length, 3: only compare keys outline, 4: compare full value, but only compare value length when meets big key"`
Id string `long:"id" default:"unknown" description:"used in metric, run id"`
JobId string `long:"jobid" default:"unknown" description:"used in metric, job id"`
TaskId string `long:"taskid" default:"unknown" description:"used in metric, task id"`
Qps int `short:"q" long:"qps" default:"15000" description:"max batch qps limit: e.g., if qps is 10, full-check fetches 10 * $batch keys every second"`
Interval int `long:"interval" value-name:"Second" default:"5" description:"The time interval for each round of comparison(Second)"`
BatchCount string `long:"batchcount" value-name:"COUNT" default:"256" description:"the count of key/field per batch compare, valid value [1, 10000]"`
Parallel int `long:"parallel" value-name:"COUNT" default:"5" description:"concurrent goroutine number for comparison, valid value [1, 100]"`
LogFile string `long:"log" value-name:"FILE" description:"log file, if not specified, log is put to console"`
ResultFile string `long:"result" value-name:"FILE" description:"store all diff result, format is 'db\tdiff-type\tkey\tfield'"`
MetricPrint bool `long:"metric" value-name:"BOOL" description:"print metric in log"`
BigKeyThreshold int64 `long:"bigkeythreshold" value-name:"COUNT" default:"16384"`
FilterList string `short:"f" long:"filterlist" value-name:"FILTER" default:"" description:"if the filter list isn't empty, all elements in list will be synced. The input should be split by '|'. The end of the string is followed by a * to indicate a prefix match, otherwise it is a full match. e.g.: 'abc*|efg|m*' matches 'abc', 'abc1', 'efg', 'm', 'mxyz', but 'efgh', 'p' aren't'"`
Version bool `short:"v" long:"version"`
SourceAddr string `short:"s" long:"source" value-name:"SOURCE" description:"Set host:port of source redis. If db type is cluster, split by semicolon(;'), e.g., 10.1.1.1:1000;10.2.2.2:2000;10.3.3.3:3000. Only need to give a role in the master or slave."`
SourcePassword string `short:"p" long:"sourcepassword" value-name:"Password" description:"Set source redis password"`
SourceAuthType string `long:"sourceauthtype" value-name:"AUTH-TYPE" default:"auth" description:"useless for opensource redis, valid value:auth/adminauth" `
SourceDBType int `long:"sourcedbtype" default:"0" description:"0: db, 1: cluster 2: aliyun proxy, 3: tencent proxy"`
SourceDBFilterList string `long:"sourcedbfilterlist" default:"-1" description:"db white list that need to be compared, -1 means fetch all, \"0;5;15\" means fetch db 0, 5, and 15"`
TargetAddr string `short:"t" long:"target" value-name:"TARGET" description:"Set host:port of target redis. If db type is cluster, split by semicolon(;'), e.g., 10.1.1.1:1000;10.2.2.2:2000;10.3.3.3:3000. Only need to give a role in the master or slave."`
TargetPassword string `short:"a" long:"targetpassword" value-name:"Password" description:"Set target redis password"`
TargetAuthType string `long:"targetauthtype" value-name:"AUTH-TYPE" default:"auth" description:"useless for opensource redis, valid value:auth/adminauth" `
TargetDBType int `long:"targetdbtype" default:"0" description:"0: db, 1: cluster 2: aliyun proxy 3: tencent proxy"`
TargetDBFilterList string `long:"targetdbfilterlist" default:"-1" description:"db white list that need to be compared, -1 means fetch all, \"0;5;15\" means fetch db 0, 5, and 15"`
ResultDBFile string `short:"d" long:"db" value-name:"Sqlite3-DB-FILE" default:"result.db" description:"sqlite3 db file for store result. If exist, it will be removed and a new file is created."`
CompareTimes string `long:"comparetimes" value-name:"COUNT" default:"3" description:"Total compare count, at least 1. In the first round, all keys will be compared. The subsequent rounds of the comparison will be done on the previous results."`
CompareMode int `short:"m" long:"comparemode" default:"2" description:"compare mode, 1: compare full value, 2: only compare value length, 3: only compare keys outline, 4: compare full value, but only compare value length when meets big key"`
Id string `long:"id" default:"unknown" description:"used in metric, run id"`
JobId string `long:"jobid" default:"unknown" description:"used in metric, job id"`
TaskId string `long:"taskid" default:"unknown" description:"used in metric, task id"`
Qps int `short:"q" long:"qps" default:"15000" description:"max batch qps limit: e.g., if qps is 10, full-check fetches 10 * $batch keys every second"`
Interval int `long:"interval" value-name:"Second" default:"5" description:"The time interval for each round of comparison(Second)"`
BatchCount string `long:"batchcount" value-name:"COUNT" default:"256" description:"the count of key/field per batch compare, valid value [1, 10000]"`
Parallel int `long:"parallel" value-name:"COUNT" default:"5" description:"concurrent goroutine number for comparison, valid value [1, 100]"`
LogFile string `long:"log" value-name:"FILE" description:"log file, if not specified, log is put to console"`
ResultFile string `long:"result" value-name:"FILE" description:"store all diff result, format is 'db\tdiff-type\tkey\tfield'"`
MetricPrint bool `long:"metric" value-name:"BOOL" description:"print metric in log"`
BigKeyThreshold int64 `long:"bigkeythreshold" value-name:"COUNT" default:"16384"`
FilterList string `short:"f" long:"filterlist" value-name:"FILTER" default:"" description:"if the filter list isn't empty, all elements in list will be synced. The input should be split by '|'. The end of the string is followed by a * to indicate a prefix match, otherwise it is a full match. e.g.: 'abc*|efg|m*' matches 'abc', 'abc1', 'efg', 'm', 'mxyz', but 'efgh', 'p' aren't'"`
Version bool `short:"v" long:"version"`
}
Loading

0 comments on commit ae5c51b

Please sign in to comment.