Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve handling of LOADING response #657

Merged
merged 2 commits into from
Nov 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (c *singleClient) Do(ctx context.Context, cmd Completed) (resp RedisResult)
attempts := 1
retry:
resp = c.conn.Do(ctx, cmd)
if c.retry && cmd.IsReadOnly() && c.isRetryable(resp.NonRedisError(), ctx) {
if c.retry && cmd.IsReadOnly() && c.isRetryable(resp.Error(), ctx) {
shouldRetry := c.retryHandler.WaitOrSkipRetry(
ctx, attempts, cmd, resp.Error(),
)
Expand Down Expand Up @@ -88,7 +88,7 @@ retry:
resps = c.conn.DoMulti(ctx, multi...).s
if c.retry && allReadOnly(multi) {
for i, resp := range resps {
if c.isRetryable(resp.NonRedisError(), ctx) {
if c.isRetryable(resp.Error(), ctx) {
shouldRetry := c.retryHandler.WaitOrSkipRetry(
ctx, attempts, multi[i], resp.Error(),
)
Expand Down Expand Up @@ -116,7 +116,7 @@ retry:
resps = c.conn.DoMultiCache(ctx, multi...).s
if c.retry {
for i, resp := range resps {
if c.isRetryable(resp.NonRedisError(), ctx) {
if c.isRetryable(resp.Error(), ctx) {
shouldRetry := c.retryHandler.WaitOrSkipRetry(
ctx, attempts, Completed(multi[i].Cmd), resp.Error(),
)
Expand All @@ -139,7 +139,7 @@ func (c *singleClient) DoCache(ctx context.Context, cmd Cacheable, ttl time.Dura
attempts := 1
retry:
resp = c.conn.DoCache(ctx, cmd, ttl)
if c.retry && c.isRetryable(resp.NonRedisError(), ctx) {
if c.retry && c.isRetryable(resp.Error(), ctx) {
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, Completed(cmd), resp.Error())
if shouldRetry {
attempts++
Expand Down Expand Up @@ -214,7 +214,7 @@ retry:
return newErrResult(err)
}
resp = c.wire.Do(ctx, cmd)
if c.retry && cmd.IsReadOnly() && isRetryable(resp.NonRedisError(), c.wire, ctx) {
if c.retry && cmd.IsReadOnly() && isRetryable(resp.Error(), c.wire, ctx) {
shouldRetry := c.retryHandler.WaitOrSkipRetry(
ctx, attempts, cmd, resp.Error(),
)
Expand Down Expand Up @@ -244,7 +244,7 @@ retry:
}
resp = c.wire.DoMulti(ctx, multi...).s
for i, cmd := range multi {
if retryable && isRetryable(resp[i].NonRedisError(), c.wire, ctx) {
if retryable && isRetryable(resp[i].Error(), c.wire, ctx) {
shouldRetry := c.retryHandler.WaitOrSkipRetry(
ctx, attempts, multi[i], resp[i].Error(),
)
Expand Down Expand Up @@ -312,11 +312,23 @@ func (c *dedicatedSingleClient) release() {
}

func (c *singleClient) isRetryable(err error, ctx context.Context) bool {
return err != nil && err != ErrDoCacheAborted && atomic.LoadUint32(&c.stop) == 0 && ctx.Err() == nil
if err == nil || err == Nil || err == ErrDoCacheAborted || atomic.LoadUint32(&c.stop) != 0 || ctx.Err() != nil {
return false
}
if err, ok := err.(*RedisError); ok {
return err.IsLoading()
}
return true
}

func isRetryable(err error, w wire, ctx context.Context) bool {
return err != nil && w.Error() == nil && ctx.Err() == nil
if err == nil || err == Nil || w.Error() != nil || ctx.Err() != nil {
return false
}
if err, ok := err.(*RedisError); ok {
return err.IsLoading()
}
return true
}

func allReadOnly(multi []Completed) bool {
Expand Down
165 changes: 165 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1239,6 +1239,171 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) {
})
}

func TestSingleClientLoadingRetry(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())

setup := func() (*singleClient, *mockConn) {
m := &mockConn{}
client, err := newSingleClient(
&ClientOption{InitAddress: []string{""}},
m,
func(dst string, opt *ClientOption) conn { return m },
newRetryer(defaultRetryDelayFn),
)
if err != nil {
t.Fatalf("unexpected err %v", err)
}
return client, m
}

t.Run("Do Retry on Loading", func(t *testing.T) {
client, m := setup()
attempts := 0
m.DoFn = func(cmd Completed) RedisResult {
attempts++
if attempts == 1 {
return newResult(RedisMessage{typ: '-', string: "LOADING Redis is loading the dataset in memory"}, nil)
}
return newResult(RedisMessage{typ: '+', string: "OK"}, nil)
}

if v, err := client.Do(context.Background(), client.B().Get().Key("test").Build()).ToString(); err != nil || v != "OK" {
t.Fatalf("unexpected response %v %v", v, err)
}
if attempts != 2 {
t.Fatalf("expected 2 attempts, got %v", attempts)
}
})

t.Run("Do not retry on non-loading errors", func(t *testing.T) {
client, m := setup()
attempts := 0
m.DoFn = func(cmd Completed) RedisResult {
attempts++
if attempts == 1 {
return newResult(RedisMessage{typ: '-', string: "ERR some other error"}, nil)
}
return newResult(RedisMessage{typ: '+', string: "OK"}, nil)
}

if err := client.Do(context.Background(), client.B().Get().Key("test").Build()).Error(); err == nil {
t.Fatal("expected error but got nil")
}
if attempts != 1 {
t.Fatalf("unexpected attempts %v, expected no retry", attempts)
}
})

t.Run("DoMulti Retry on Loading", func(t *testing.T) {
client, m := setup()
attempts := 0
m.DoMultiFn = func(multi ...Completed) *redisresults {
attempts++
if attempts == 1 {
return &redisresults{s: []RedisResult{newResult(RedisMessage{typ: '-', string: "LOADING Redis is loading the dataset in memory"}, nil)}}
}
return &redisresults{s: []RedisResult{newResult(RedisMessage{typ: '+', string: "OK"}, nil)}}
}

cmd := client.B().Get().Key("test").Build()
resps := client.DoMulti(context.Background(), cmd)
if len(resps) != 1 {
t.Fatalf("unexpected response length %v", len(resps))
}
if v, err := resps[0].ToString(); err != nil || v != "OK" {
t.Fatalf("unexpected response %v %v", v, err)
}
})

t.Run("DoCache Retry on Loading", func(t *testing.T) {
client, m := setup()
attempts := 0
m.DoCacheFn = func(cmd Cacheable, ttl time.Duration) RedisResult {
attempts++
if attempts == 1 {
return newResult(RedisMessage{typ: '-', string: "LOADING Redis is loading the dataset in memory"}, nil)
}
return newResult(RedisMessage{typ: '+', string: "OK"}, nil)
}

cmd := client.B().Get().Key("test").Cache()
if v, err := client.DoCache(context.Background(), cmd, time.Minute).ToString(); err != nil || v != "OK" {
t.Fatalf("unexpected response %v %v", v, err)
}
})

t.Run("DoMultiCache Retry on Loading", func(t *testing.T) {
client, m := setup()
attempts := 0
m.DoMultiCacheFn = func(multi ...CacheableTTL) *redisresults {
attempts++
if attempts == 1 {
return &redisresults{s: []RedisResult{newResult(RedisMessage{typ: '-', string: "LOADING Redis is loading the dataset in memory"}, nil)}}
}
return &redisresults{s: []RedisResult{newResult(RedisMessage{typ: '+', string: "OK"}, nil)}}
}

cmd := client.B().Get().Key("test").Cache()
resps := client.DoMultiCache(context.Background(), CT(cmd, time.Minute))
if len(resps) != 1 {
t.Fatalf("unexpected response length %v", len(resps))
}
if v, err := resps[0].ToString(); err != nil || v != "OK" {
t.Fatalf("unexpected response %v %v", v, err)
}
})

t.Run("Dedicated Do Retry on Loading", func(t *testing.T) {
client, m := setup()
attempts := 0
m.DoFn = func(cmd Completed) RedisResult {
attempts++
if attempts == 1 {
return newResult(RedisMessage{typ: '-', string: "LOADING Redis is loading the dataset in memory"}, nil)
}
return newResult(RedisMessage{typ: '+', string: "OK"}, nil)
}
m.AcquireFn = func() wire { return &mockWire{DoFn: m.DoFn} }

err := client.Dedicated(func(c DedicatedClient) error {
if v, err := c.Do(context.Background(), c.B().Get().Key("test").Build()).ToString(); err != nil || v != "OK" {
t.Fatalf("unexpected response %v %v", v, err)
}
return nil
})
if err != nil {
t.Fatalf("unexpected err %v", err)
}
})

t.Run("Dedicated DoMulti Retry on Loading", func(t *testing.T) {
client, m := setup()
attempts := 0
m.DoMultiFn = func(multi ...Completed) *redisresults {
attempts++
if attempts == 1 {
return &redisresults{s: []RedisResult{newResult(RedisMessage{typ: '-', string: "LOADING Redis is loading the dataset in memory"}, nil)}}
}
return &redisresults{s: []RedisResult{newResult(RedisMessage{typ: '+', string: "OK"}, nil)}}
}
m.AcquireFn = func() wire { return &mockWire{DoMultiFn: m.DoMultiFn} }

err := client.Dedicated(func(c DedicatedClient) error {
resps := c.DoMulti(context.Background(), c.B().Get().Key("test").Build())
if len(resps) != 1 {
t.Fatalf("unexpected response length %v", len(resps))
}
if v, err := resps[0].ToString(); err != nil || v != "OK" {
t.Fatalf("unexpected response %v %v", v, err)
}
return nil
})
if err != nil {
t.Fatalf("unexpected err %v", err)
}
})
}

func BenchmarkSingleClient_DoCache(b *testing.B) {
ctx := context.Background()
client, err := NewClient(ClientOption{InitAddress: []string{"127.0.0.1:6379"}, Dialer: net.Dialer{KeepAlive: -1}})
Expand Down
4 changes: 2 additions & 2 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1123,13 +1123,13 @@ func (c *clusterClient) Close() {
}

func (c *clusterClient) shouldRefreshRetry(err error, ctx context.Context) (addr string, mode RedirectMode) {
if err != nil && atomic.LoadUint32(&c.stop) == 0 {
if err != nil && err != Nil && err != ErrDoCacheAborted && atomic.LoadUint32(&c.stop) == 0 {
if err, ok := err.(*RedisError); ok {
if addr, ok = err.IsMoved(); ok {
mode = RedirectMove
} else if addr, ok = err.IsAsk(); ok {
mode = RedirectAsk
} else if err.IsClusterDown() || err.IsTryAgain() {
} else if err.IsClusterDown() || err.IsTryAgain() || err.IsLoading() {
mode = RedirectRetry
}
} else if ctx.Err() == nil {
Expand Down
Loading
Loading