feat: add cluster topology awareness #638

Merged · Sep 25, 2024 · 15 commits
164 changes: 141 additions & 23 deletions cluster.go
@@ -18,6 +18,7 @@ import (
// ErrNoSlot indicates that no redis node owns the key slot.
var ErrNoSlot = errors.New("the slot has no redis node")
var ErrReplicaOnlyConflict = errors.New("ReplicaOnly conflicts with SendToReplicas option")
var ErrInvalidScanInterval = errors.New("scan interval must be greater than or equal to 0")

type clusterClient struct {
pslots [16384]conn
@@ -31,6 +32,7 @@ type clusterClient struct {
stop uint32
cmd Builder
retry bool
stopCh chan struct{}
}

// NOTE: connrole and conn must be initialized at the same time
@@ -46,6 +48,7 @@ func newClusterClient(opt *ClientOption, connFn connFn) (*clusterClient, error)
opt: opt,
conns: make(map[string]connrole),
retry: !opt.DisableRetry,
stopCh: make(chan struct{}),
}

if opt.ReplicaOnly && opt.SendToReplicas != nil {
@@ -74,6 +77,12 @@ func newClusterClient(opt *ClientOption, connFn connFn) (*clusterClient, error)
return client, err
}

if opt.ClusterOption.ScanInterval > 0 {
go client.runClusterTopologyRefreshment()
} else if opt.ClusterOption.ScanInterval < 0 {
return nil, ErrInvalidScanInterval
}

return client, nil
}
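
Usage note (not part of the diff): a minimal sketch of enabling the periodic topology scan from the caller's side. It assumes the github.com/redis/rueidis import path and that the exported ClusterOption struct carries the ScanInterval field introduced above; treat these names as illustrative rather than final API.

package main

import (
	"time"

	"github.com/redis/rueidis"
)

func main() {
	client, err := rueidis.NewClient(rueidis.ClientOption{
		InitAddress: []string{"127.0.0.1:7001"},
		ClusterOption: rueidis.ClusterOption{
			// Re-scan the cluster topology every 10 minutes.
			// 0 keeps the periodic scan disabled; a negative value
			// is rejected with ErrInvalidScanInterval.
			ScanInterval: 10 * time.Minute,
		},
	})
	if err != nil {
		panic(err)
	}
	defer client.Close()
}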

@@ -120,6 +129,10 @@ func (c *clusterClient) lazyRefresh() {
c.sc.LazyDo(time.Second, c._refresh)
}

func (c *clusterClient) lazyConditionalRefresh() {
c.sc.LazyDo(time.Second, c.conditionalRefresh)
}

type clusterslots struct {
addr string
reply RedisResult
@@ -144,6 +157,38 @@ func getClusterSlots(c conn, timeout time.Duration) clusterslots {
}

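// _refresh fetches the latest cluster topology, dials a connection for every
// reported node (replicas are dialed with rOpt when it is set), and
// unconditionally replaces the topology cache.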
func (c *clusterClient) _refresh() (err error) {
result, err := c.getClusterTopology()
if err != nil {
return err
}

groups := result.parse(c.opt.TLSConfig != nil)
conns := make(map[string]connrole, len(groups))
for master, g := range groups {
conns[master] = connrole{conn: c.connFn(master, c.opt), replica: false}
for _, addr := range g.nodes[1:] {
if c.rOpt != nil {
conns[addr] = connrole{conn: c.connFn(addr, c.rOpt), replica: true}
} else {
conns[addr] = connrole{conn: c.connFn(addr, c.opt), replica: true}
}
}
}
// make sure InitAddress is always present
for _, addr := range c.opt.InitAddress {
if _, ok := conns[addr]; !ok {
conns[addr] = connrole{
conn: c.connFn(addr, c.opt),
}
}
}

c.updateClusterTopologyCache(conns, groups)

return nil
}

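// getClusterTopology gathers CLUSTER SLOTS/CLUSTER SHARDS replies from the
// currently known connections, batching the requests four connections at a time.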
func (c *clusterClient) getClusterTopology() (result clusterslots, err error) {
c.mu.RLock()
results := make(chan clusterslots, len(c.conns))
pending := make([]conn, 0, len(c.conns))
@@ -152,7 +197,6 @@ func (c *clusterClient) _refresh() (err error) {
}
c.mu.RUnlock()

var result clusterslots
for i := 0; i < cap(results); i++ {
if i&3 == 0 { // batch CLUSTER SLOTS/CLUSTER SHARDS for every 4 connections
for j := i; j < i+4 && j < len(pending); j++ {
@@ -168,31 +212,16 @@ func (c *clusterClient) _refresh() (err error) {
}
}
if err != nil {
return err
return
}
pending = nil

groups := result.parse(c.opt.TLSConfig != nil)
conns := make(map[string]connrole, len(groups))
for master, g := range groups {
conns[master] = connrole{conn: c.connFn(master, c.opt), replica: false}
for _, addr := range g.nodes[1:] {
if c.rOpt != nil {
conns[addr] = connrole{conn: c.connFn(addr, c.rOpt), replica: true}
} else {
conns[addr] = connrole{conn: c.connFn(addr, c.opt), replica: true}
}
}
}
// make sure InitAddress always be present
for _, addr := range c.opt.InitAddress {
if _, ok := conns[addr]; !ok {
conns[addr] = connrole{
conn: c.connFn(addr, c.opt),
}
}
}
return
}

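// updateClusterTopologyCache swaps in the freshly built connection map and slot
// assignments, and asynchronously closes connections that no longer belong to
// the cluster.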
func (c *clusterClient) updateClusterTopologyCache(
conns map[string]connrole, groups map[string]group,
) {
var removes []conn

c.mu.RLock()
@@ -262,6 +291,79 @@ func (c *clusterClient) _refresh() (err error) {
}
}(removes)
}
}

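// conditionalRefresh re-reads the topology but only rebuilds connections and
// the cache when the set of node addresses or their master/replica roles has
// changed since the last refresh.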
func (c *clusterClient) conditionalRefresh() (err error) {
result, err := c.getClusterTopology()
if err != nil {
return err
}

groups := result.parse(c.opt.TLSConfig != nil)

// we need to check whether the new topology differs from the current one
// so that we don't re-create the connections prematurely.
conns := make(map[string]connrole, len(groups))
for master, g := range groups {
conns[master] = connrole{replica: false}
for _, addr := range g.nodes[1:] {
conns[addr] = connrole{replica: true}
}
}
// make sure InitAddress is always present
for _, addr := range c.opt.InitAddress {
if _, ok := conns[addr]; !ok {
conns[addr] = connrole{}
}
}

isChanged := false
c.mu.RLock()
// check if the new topology is different from the current one
for addr, cc := range conns {
old, ok := c.conns[addr]
if !ok || old.replica != cc.replica {
isChanged = true
break
}
}
// check if the current topology is different from the new one
if !isChanged {
for addr := range c.conns {
if _, ok := conns[addr]; !ok {
isChanged = true
break
}
}
}
c.mu.RUnlock()

if !isChanged {
return nil
}

for addr, cc := range conns {
if cc.replica {
if c.rOpt != nil {
cc.conn = c.connFn(addr, c.rOpt)
} else {
cc.conn = c.connFn(addr, c.opt)
}
} else {
cc.conn = c.connFn(addr, c.opt)
}

conns[addr] = connrole{
conn: cc.conn,
replica: cc.replica,
}
}

c.updateClusterTopologyCache(conns, groups)

return nil
}
@@ -358,6 +460,19 @@ func parseShards(shards RedisMessage, defaultAddr string, tls bool) map[string]g
return groups
}

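// runClusterTopologyRefreshment runs in its own goroutine and triggers a
// conditional refresh every ScanInterval until the client is closed.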
func (c *clusterClient) runClusterTopologyRefreshment() {
ticker := time.NewTicker(c.opt.ClusterOption.ScanInterval)
defer ticker.Stop()
for {
select {
case <-c.stopCh:
return
case <-ticker.C:
c.lazyConditionalRefresh()
}
}
}

func (c *clusterClient) _pick(slot uint16, toReplica bool) (p conn) {
c.mu.RLock()
if slot == cmds.InitSlot {
@@ -1018,7 +1133,10 @@ func (c *clusterClient) Nodes() map[string]Client {
}

func (c *clusterClient) Close() {
atomic.StoreUint32(&c.stop, 1)
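// close stopCh exactly once so the topology refreshment goroutine exits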
if atomic.CompareAndSwapUint32(&c.stop, 0, 1) {
close(c.stopCh)
}

c.mu.RLock()
for _, cc := range c.conns {
go cc.conn.Close()