Skip to content

Commit

Permalink
feat: improve handling of LOADING response
Browse files Browse the repository at this point in the history
When a Redis node is loading data (e.g. after restart), it responds with LOADING
errors. This makes the client more resilient during Redis node restarts and
initial data loading phases.

Signed-off-by: Ernesto Alejandro Santana Hidalgo <ernesto.alejandrosantana@gmail.com>
  • Loading branch information
nesty92 committed Oct 29, 2024
1 parent 6b95467 commit 0d2c239
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 8 deletions.
20 changes: 13 additions & 7 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (c *singleClient) Do(ctx context.Context, cmd Completed) (resp RedisResult)
attempts := 1
retry:
resp = c.conn.Do(ctx, cmd)
if c.retry && cmd.IsReadOnly() && c.isRetryable(resp.NonRedisError(), ctx) {
if c.retry && cmd.IsReadOnly() && c.isRetryable(resp.Error(), ctx) {
shouldRetry := c.retryHandler.WaitOrSkipRetry(
ctx, attempts, cmd, resp.Error(),
)
Expand Down Expand Up @@ -88,7 +88,7 @@ retry:
resps = c.conn.DoMulti(ctx, multi...).s
if c.retry && allReadOnly(multi) {
for i, resp := range resps {
if c.isRetryable(resp.NonRedisError(), ctx) {
if c.isRetryable(resp.Error(), ctx) {
shouldRetry := c.retryHandler.WaitOrSkipRetry(
ctx, attempts, multi[i], resp.Error(),
)
Expand Down Expand Up @@ -116,7 +116,7 @@ retry:
resps = c.conn.DoMultiCache(ctx, multi...).s
if c.retry {
for i, resp := range resps {
if c.isRetryable(resp.NonRedisError(), ctx) {
if c.isRetryable(resp.Error(), ctx) {
shouldRetry := c.retryHandler.WaitOrSkipRetry(
ctx, attempts, Completed(multi[i].Cmd), resp.Error(),
)
Expand All @@ -139,7 +139,7 @@ func (c *singleClient) DoCache(ctx context.Context, cmd Cacheable, ttl time.Dura
attempts := 1
retry:
resp = c.conn.DoCache(ctx, cmd, ttl)
if c.retry && c.isRetryable(resp.NonRedisError(), ctx) {
if c.retry && c.isRetryable(resp.Error(), ctx) {
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, Completed(cmd), resp.Error())
if shouldRetry {
attempts++
Expand Down Expand Up @@ -214,7 +214,7 @@ retry:
return newErrResult(err)
}
resp = c.wire.Do(ctx, cmd)
if c.retry && cmd.IsReadOnly() && isRetryable(resp.NonRedisError(), c.wire, ctx) {
if c.retry && cmd.IsReadOnly() && isRetryable(resp.Error(), c.wire, ctx) {
shouldRetry := c.retryHandler.WaitOrSkipRetry(
ctx, attempts, cmd, resp.Error(),
)
Expand Down Expand Up @@ -244,7 +244,7 @@ retry:
}
resp = c.wire.DoMulti(ctx, multi...).s
for i, cmd := range multi {
if retryable && isRetryable(resp[i].NonRedisError(), c.wire, ctx) {
if retryable && isRetryable(resp[i].Error(), c.wire, ctx) {
shouldRetry := c.retryHandler.WaitOrSkipRetry(
ctx, attempts, multi[i], resp[i].Error(),
)
Expand Down Expand Up @@ -312,7 +312,13 @@ func (c *dedicatedSingleClient) release() {
}

func (c *singleClient) isRetryable(err error, ctx context.Context) bool {
return err != nil && err != ErrDoCacheAborted && atomic.LoadUint32(&c.stop) == 0 && ctx.Err() == nil
if err == nil || err == ErrDoCacheAborted || atomic.LoadUint32(&c.stop) != 0 || ctx.Err() != nil {
return false
}
if redisErr, ok := err.(*RedisError); ok {
return redisErr.IsLoading()
}
return true
}

func isRetryable(err error, w wire, ctx context.Context) bool {
Expand Down
165 changes: 165 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1239,6 +1239,171 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) {
})
}

func TestSingleClientLoadingRetry(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())

setup := func() (*singleClient, *mockConn) {
m := &mockConn{}
client, err := newSingleClient(
&ClientOption{InitAddress: []string{""}},
m,
func(dst string, opt *ClientOption) conn { return m },
newRetryer(defaultRetryDelayFn),
)
if err != nil {
t.Fatalf("unexpected err %v", err)
}
return client, m
}

t.Run("Do Retry on Loading", func(t *testing.T) {
client, m := setup()
attempts := 0
m.DoFn = func(cmd Completed) RedisResult {
attempts++
if attempts == 1 {
return newResult(RedisMessage{typ: '-', string: "LOADING Redis is loading the dataset in memory"}, nil)
}
return newResult(RedisMessage{typ: '+', string: "OK"}, nil)
}

if v, err := client.Do(context.Background(), client.B().Get().Key("test").Build()).ToString(); err != nil || v != "OK" {
t.Fatalf("unexpected response %v %v", v, err)
}
if attempts != 2 {
t.Fatalf("expected 2 attempts, got %v", attempts)
}
})

t.Run("Do not retry on non-loading errors", func(t *testing.T) {
client, m := setup()
attempts := 0
m.DoFn = func(cmd Completed) RedisResult {
attempts++
if attempts == 1 {
return newResult(RedisMessage{typ: '-', string: "ERR some other error"}, nil)
}
return newResult(RedisMessage{typ: '+', string: "OK"}, nil)
}

if err := client.Do(context.Background(), client.B().Get().Key("test").Build()).Error(); err == nil {
t.Fatal("expected error but got nil")
}
if attempts != 1 {
t.Fatalf("unexpected attempts %v, expected no retry", attempts)
}
})

t.Run("DoMulti Retry on Loading", func(t *testing.T) {
client, m := setup()
attempts := 0
m.DoMultiFn = func(multi ...Completed) *redisresults {
attempts++
if attempts == 1 {
return &redisresults{s: []RedisResult{newResult(RedisMessage{typ: '-', string: "LOADING Redis is loading the dataset in memory"}, nil)}}
}
return &redisresults{s: []RedisResult{newResult(RedisMessage{typ: '+', string: "OK"}, nil)}}
}

cmd := client.B().Get().Key("test").Build()
resps := client.DoMulti(context.Background(), cmd)
if len(resps) != 1 {
t.Fatalf("unexpected response length %v", len(resps))
}
if v, err := resps[0].ToString(); err != nil || v != "OK" {
t.Fatalf("unexpected response %v %v", v, err)
}
})

t.Run("DoCache Retry on Loading", func(t *testing.T) {
client, m := setup()
attempts := 0
m.DoCacheFn = func(cmd Cacheable, ttl time.Duration) RedisResult {
attempts++
if attempts == 1 {
return newResult(RedisMessage{typ: '-', string: "LOADING Redis is loading the dataset in memory"}, nil)
}
return newResult(RedisMessage{typ: '+', string: "OK"}, nil)
}

cmd := client.B().Get().Key("test").Cache()
if v, err := client.DoCache(context.Background(), cmd, time.Minute).ToString(); err != nil || v != "OK" {
t.Fatalf("unexpected response %v %v", v, err)
}
})

t.Run("DoMultiCache Retry on Loading", func(t *testing.T) {
client, m := setup()
attempts := 0
m.DoMultiCacheFn = func(multi ...CacheableTTL) *redisresults {
attempts++
if attempts == 1 {
return &redisresults{s: []RedisResult{newResult(RedisMessage{typ: '-', string: "LOADING Redis is loading the dataset in memory"}, nil)}}
}
return &redisresults{s: []RedisResult{newResult(RedisMessage{typ: '+', string: "OK"}, nil)}}
}

cmd := client.B().Get().Key("test").Cache()
resps := client.DoMultiCache(context.Background(), CT(cmd, time.Minute))
if len(resps) != 1 {
t.Fatalf("unexpected response length %v", len(resps))
}
if v, err := resps[0].ToString(); err != nil || v != "OK" {
t.Fatalf("unexpected response %v %v", v, err)
}
})

t.Run("Dedicated Do Retry on Loading", func(t *testing.T) {
client, m := setup()
attempts := 0
m.DoFn = func(cmd Completed) RedisResult {
attempts++
if attempts == 1 {
return newResult(RedisMessage{typ: '-', string: "LOADING Redis is loading the dataset in memory"}, nil)
}
return newResult(RedisMessage{typ: '+', string: "OK"}, nil)
}
m.AcquireFn = func() wire { return &mockWire{DoFn: m.DoFn} }

err := client.Dedicated(func(c DedicatedClient) error {
if v, err := c.Do(context.Background(), c.B().Get().Key("test").Build()).ToString(); err != nil || v != "OK" {
t.Fatalf("unexpected response %v %v", v, err)
}
return nil
})
if err != nil {
t.Fatalf("unexpected err %v", err)
}
})

t.Run("Dedicated DoMulti Retry on Loading", func(t *testing.T) {
client, m := setup()
attempts := 0
m.DoMultiFn = func(multi ...Completed) *redisresults {
attempts++
if attempts == 1 {
return &redisresults{s: []RedisResult{newResult(RedisMessage{typ: '-', string: "LOADING Redis is loading the dataset in memory"}, nil)}}
}
return &redisresults{s: []RedisResult{newResult(RedisMessage{typ: '+', string: "OK"}, nil)}}
}
m.AcquireFn = func() wire { return &mockWire{DoMultiFn: m.DoMultiFn} }

err := client.Dedicated(func(c DedicatedClient) error {
resps := c.DoMulti(context.Background(), c.B().Get().Key("test").Build())
if len(resps) != 1 {
t.Fatalf("unexpected response length %v", len(resps))
}
if v, err := resps[0].ToString(); err != nil || v != "OK" {
t.Fatalf("unexpected response %v %v", v, err)
}
return nil
})
if err != nil {
t.Fatalf("unexpected err %v", err)
}
})
}

func BenchmarkSingleClient_DoCache(b *testing.B) {
ctx := context.Background()
client, err := NewClient(ClientOption{InitAddress: []string{"127.0.0.1:6379"}, Dialer: net.Dialer{KeepAlive: -1}})
Expand Down
1 change: 1 addition & 0 deletions hack/cmds/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -1179,6 +1179,7 @@ var readOnlyCMDs = map[string]bool{
"objectrefcount": false,
"pexpiretime": false,
"pfcount": false,
"ping": false,
"pttl": false,
"pubsubchannels": false,
"pubsubnumpat": false,
Expand Down
2 changes: 1 addition & 1 deletion internal/cmds/gen_connection.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ func (r *RedisError) IsTryAgain() bool {
return strings.HasPrefix(r.string, "TRYAGAIN")
}

// IsLoading checks if it is a redis LOADING message
func (r *RedisError) IsLoading() bool {
return strings.HasPrefix(r.string, "LOADING")
}

// IsClusterDown checks if it is a redis CLUSTERDOWN message and returns ask address.
func (r *RedisError) IsClusterDown() bool {
return strings.HasPrefix(r.string, "CLUSTERDOWN")
Expand Down

0 comments on commit 0d2c239

Please sign in to comment.