Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add cluster topology awareness #638

Merged
merged 15 commits into from
Sep 25, 2024
61 changes: 60 additions & 1 deletion cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
// ErrNoSlot indicates that there is no redis node owns the key slot.
var ErrNoSlot = errors.New("the slot has no redis node")
var ErrReplicaOnlyConflict = errors.New("ReplicaOnly conflicts with SendToReplicas option")
var ErrInvalidScanInterval = errors.New("scan interval must be greater than or equal to 0")

type clusterClient struct {
pslots [16384]conn
Expand All @@ -31,6 +32,7 @@ type clusterClient struct {
stop uint32
cmd Builder
retry bool
stopCh chan struct{}
}

// NOTE: connrole and conn must be initialized at the same time
Expand All @@ -46,6 +48,7 @@ func newClusterClient(opt *ClientOption, connFn connFn) (*clusterClient, error)
opt: opt,
conns: make(map[string]connrole),
retry: !opt.DisableRetry,
stopCh: make(chan struct{}),
}

if opt.ReplicaOnly && opt.SendToReplicas != nil {
Expand Down Expand Up @@ -74,6 +77,12 @@ func newClusterClient(opt *ClientOption, connFn connFn) (*clusterClient, error)
return client, err
}

if opt.ClusterOption.ScanInterval > 0 {
go client.runClusterTopologyRefreshment()
} else if opt.ClusterOption.ScanInterval < 0 {
return nil, ErrInvalidScanInterval
}

return client, nil
}

Expand Down Expand Up @@ -193,6 +202,40 @@ func (c *clusterClient) _refresh() (err error) {
}
}

shouldRefresh := false
c.mu.RLock()
// check if the new topology is different from the current one
for addr, cc := range conns {
old, ok := c.conns[addr]
if !ok || old.replica != cc.replica {
shouldRefresh = true
break
}
}
// check if the current topology is different from the new one
if !shouldRefresh {
for addr := range c.conns {
if _, ok := conns[addr]; !ok {
shouldRefresh = true
break
}
}
}
// check if cluster client is initialized.
if !shouldRefresh {
for _, cc := range c.pslots {
if cc == nil {
shouldRefresh = true
break
}
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this check really necessary? Also, it is valid that a living cluster has some missing slots.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, yes. you are right.
9f886c5

c.mu.RUnlock()

if !shouldRefresh {
return nil
}

var removes []conn

c.mu.RLock()
Expand Down Expand Up @@ -358,6 +401,19 @@ func parseShards(shards RedisMessage, defaultAddr string, tls bool) map[string]g
return groups
}

// runClusterTopologyRefreshment periodically triggers a lazy topology refresh
// every ClusterOption.ScanInterval until the client's stop channel is closed.
// It is intended to run in its own goroutine, started from newClusterClient.
func (c *clusterClient) runClusterTopologyRefreshment() {
	tk := time.NewTicker(c.opt.ClusterOption.ScanInterval)
	defer tk.Stop() // release the ticker's resources once the loop exits
	for {
		select {
		case <-tk.C:
			c.lazyRefresh()
		case <-c.stopCh: // closed by Close(); terminates the goroutine
			return
		}
	}
}

func (c *clusterClient) _pick(slot uint16, toReplica bool) (p conn) {
c.mu.RLock()
if slot == cmds.InitSlot {
Expand Down Expand Up @@ -1018,7 +1074,10 @@ func (c *clusterClient) Nodes() map[string]Client {
}

func (c *clusterClient) Close() {
atomic.StoreUint32(&c.stop, 1)
if atomic.CompareAndSwapUint32(&c.stop, 0, 1) {
close(c.stopCh)
}

c.mu.RLock()
for _, cc := range c.conns {
go cc.conn.Close()
Expand Down
Loading
Loading