Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: add more retry strategy (s3.ReadFile: body reader / pushBackup: backoffer) (#50541) #50707

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 13 additions & 48 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1387,57 +1387,22 @@
var errReset error
var errBackup error

for retry := 0; retry < backupRetryTimes; retry++ {
logutil.CL(ctx).Info("try backup",
zap.Int("retry time", retry),
)
retry := -1
return utils.WithRetry(ctx, func() error {
retry += 1
if retry != 0 {
client, errReset = resetFn()
if errReset != nil {
return errors.Annotatef(errReset, "failed to reset backup connection on store:%d "+
"please check the tikv status", storeID)
}

Check warning on line 1398 in br/pkg/backup/client.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/backup/client.go#L1390-L1398

Added lines #L1390 - L1398 were not covered by tests
}
logutil.CL(ctx).Info("try backup", zap.Int("retry time", retry))

Check warning on line 1400 in br/pkg/backup/client.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/backup/client.go#L1400

Added line #L1400 was not covered by tests
errBackup = doSendBackup(ctx, client, req, respFn)
if errBackup != nil {
if isRetryableError(errBackup) {
time.Sleep(3 * time.Second)
client, errReset = resetFn()
if errReset != nil {
return errors.Annotatef(errReset, "failed to reset backup connection on store:%d "+
"please check the tikv status", storeID)
}
continue
}
logutil.CL(ctx).Error("fail to backup", zap.Uint64("StoreID", storeID), zap.Int("retry", retry))
return berrors.ErrFailedToConnect.Wrap(errBackup).GenWithStack("failed to create backup stream to store %d", storeID)
}
// finish backup
break
}
return nil
}

// gRPC communication cancelled with connection closing
const (
gRPC_Cancel = "the client connection is closing"
)

// isRetryableError represents whether we should retry reset grpc connection.
func isRetryableError(err error) bool {
// some errors can be retried
// https://github.com/pingcap/tidb/issues/34350
switch status.Code(err) {
case codes.Unavailable, codes.DeadlineExceeded,
codes.ResourceExhausted, codes.Aborted, codes.Internal:
{
log.Warn("backup met some errors, these errors can be retry 5 times", zap.Error(err))
return true
}
}

// At least, there are two possible cancel() call,
// one from backup range, another from gRPC, here we retry when gRPC cancel with connection closing
if status.Code(err) == codes.Canceled {
if s, ok := status.FromError(err); ok {
if strings.Contains(s.Message(), gRPC_Cancel) {
log.Warn("backup met grpc cancel error, this errors can be retry 5 times", zap.Error(err))
return true
}
}
}
return false
return nil

Check warning on line 1406 in br/pkg/backup/client.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/backup/client.go#L1406

Added line #L1406 was not covered by tests
}, utils.NewBackupSSTBackoffer())
}
2 changes: 1 addition & 1 deletion br/pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ go_test(
],
embed = [":storage"],
flaky = True,
shard_count = 49,
shard_count = 50,
deps = [
"//br/pkg/mock",
"@com_github_aws_aws_sdk_go//aws",
Expand Down
53 changes: 38 additions & 15 deletions br/pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,22 +519,41 @@

// ReadFile reads the file from the storage and returns the contents.
func (rs *S3Storage) ReadFile(ctx context.Context, file string) ([]byte, error) {
input := &s3.GetObjectInput{
Bucket: aws.String(rs.options.Bucket),
Key: aws.String(rs.options.Prefix + file),
}
result, err := rs.svc.GetObjectWithContext(ctx, input)
if err != nil {
return nil, errors.Annotatef(err,
"failed to read s3 file, file info: input.bucket='%s', input.key='%s'",
*input.Bucket, *input.Key)
}
defer result.Body.Close()
data, err := io.ReadAll(result.Body)
if err != nil {
return nil, errors.Trace(err)
var (
data []byte
readErr error
)
for retryCnt := 0; retryCnt < maxErrorRetries; retryCnt += 1 {
input := &s3.GetObjectInput{
Bucket: aws.String(rs.options.Bucket),
Key: aws.String(rs.options.Prefix + file),
}
result, err := rs.svc.GetObjectWithContext(ctx, input)
if err != nil {
return nil, errors.Annotatef(err,
"failed to read s3 file, file info: input.bucket='%s', input.key='%s'",
*input.Bucket, *input.Key)
}
data, readErr = io.ReadAll(result.Body)
// close the body of response since data has been already read out
result.Body.Close()
// for unit test
failpoint.Inject("read-s3-body-failed", func(_ failpoint.Value) {
log.Info("original error", zap.Error(readErr))
readErr = errors.Errorf("read: connection reset by peer")
})
if readErr != nil {
if isDeadlineExceedError(readErr) || isCancelError(readErr) {
return nil, errors.Annotatef(readErr, "failed to read body from get object result, file info: input.bucket='%s', input.key='%s', retryCnt='%d'",
*input.Bucket, *input.Key, retryCnt)
}

Check warning on line 549 in br/pkg/storage/s3.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/storage/s3.go#L547-L549

Added lines #L547 - L549 were not covered by tests
continue
}
return data, nil
}
return data, nil
// retry too much, should be failed
return nil, errors.Annotatef(readErr, "failed to read body from get object result (retry too much), file info: input.bucket='%s', input.key='%s'",
rs.options.Bucket, rs.options.Prefix+file)

Check warning on line 556 in br/pkg/storage/s3.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/storage/s3.go#L555-L556

Added lines #L555 - L556 were not covered by tests
}

// DeleteFile delete the file in s3 storage
Expand Down Expand Up @@ -993,6 +1012,10 @@
client.DefaultRetryer
}

func isCancelError(err error) bool {
return strings.Contains(err.Error(), "context canceled")
}

func isDeadlineExceedError(err error) bool {
// TODO find a better way.
// Known challenges:
Expand Down
38 changes: 38 additions & 0 deletions br/pkg/storage/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"net/http"
"net/http/httptest"
"os"
"strings"
"sync"
"testing"

Expand Down Expand Up @@ -1358,3 +1359,40 @@ func TestRetryError(t *testing.T) {
require.NoError(t, err)
require.Equal(t, count, int32(2))
}

func TestS3ReadFileRetryable(t *testing.T) {
s := createS3Suite(t)
ctx := aws.BackgroundContext()
errMsg := "just some unrelated error"
expectedErr := errors.New(errMsg)

s.s3.EXPECT().
GetObjectWithContext(ctx, gomock.Any()).
DoAndReturn(func(_ context.Context, input *s3.GetObjectInput, opt ...request.Option) (*s3.GetObjectOutput, error) {
require.Equal(t, "bucket", aws.StringValue(input.Bucket))
require.Equal(t, "prefix/file", aws.StringValue(input.Key))
return &s3.GetObjectOutput{
Body: io.NopCloser(bytes.NewReader([]byte("test"))),
}, nil
})
s.s3.EXPECT().
GetObjectWithContext(ctx, gomock.Any()).
DoAndReturn(func(_ context.Context, input *s3.GetObjectInput, opt ...request.Option) (*s3.GetObjectOutput, error) {
require.Equal(t, "bucket", aws.StringValue(input.Bucket))
require.Equal(t, "prefix/file", aws.StringValue(input.Key))
return &s3.GetObjectOutput{
Body: io.NopCloser(bytes.NewReader([]byte("test"))),
}, nil
})
s.s3.EXPECT().
GetObjectWithContext(ctx, gomock.Any()).
Return(nil, expectedErr)

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/storage/read-s3-body-failed", "2*return(true)"))
defer func() {
failpoint.Disable("github.com/pingcap/tidb/br/pkg/storage/read-s3-body-failed")
}()
_, err := s.storage.ReadFile(ctx, "file")
require.Error(t, err)
require.True(t, strings.Contains(err.Error(), errMsg))
}
2 changes: 1 addition & 1 deletion br/pkg/utils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ go_test(
],
embed = [":utils"],
flaky = True,
shard_count = 37,
shard_count = 38,
deps = [
"//br/pkg/errors",
"//br/pkg/metautil",
Expand Down
33 changes: 32 additions & 1 deletion br/pkg/utils/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"context"
"database/sql"
"io"
"strings"
"time"

"github.com/pingcap/errors"
Expand All @@ -26,6 +27,10 @@
downloadSSTWaitInterval = 1 * time.Second
downloadSSTMaxWaitInterval = 4 * time.Second

backupSSTRetryTimes = 5
backupSSTWaitInterval = 2 * time.Second
backupSSTMaxWaitInterval = 3 * time.Second

resetTSRetryTime = 16
resetTSWaitInterval = 50 * time.Millisecond
resetTSMaxWaitInterval = 500 * time.Millisecond
Expand All @@ -42,8 +47,21 @@
ChecksumRetryTime = 8
ChecksumWaitInterval = 1 * time.Second
ChecksumMaxWaitInterval = 30 * time.Second

gRPC_Cancel = "the client connection is closing"
)

// At least, there are two possible cancel() call,
// one from go context, another from gRPC, here we retry when gRPC cancel with connection closing
func isGRPCCancel(err error) bool {
if s, ok := status.FromError(err); ok {
if strings.Contains(s.Message(), gRPC_Cancel) {
return true
}

Check warning on line 60 in br/pkg/utils/backoff.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/utils/backoff.go#L59-L60

Added lines #L59 - L60 were not covered by tests
}
return false
}

// RetryState is the mutable state needed for retrying.
// It likes the `utils.Backoffer`, but more fundamental:
// this only control the backoff time and knows nothing about what error happens.
Expand Down Expand Up @@ -143,6 +161,11 @@
return NewBackoffer(downloadSSTRetryTimes, downloadSSTWaitInterval, downloadSSTMaxWaitInterval, errContext)
}

func NewBackupSSTBackoffer() Backoffer {
errContext := NewErrorContext("backup sst", 3)
return NewBackoffer(backupSSTRetryTimes, backupSSTWaitInterval, backupSSTMaxWaitInterval, errContext)
}

func (bo *importerBackoffer) NextBackoff(err error) time.Duration {
log.Warn("retry to import ssts", zap.Int("attempt", bo.attempt), zap.Error(err))
// we don't care storeID here.
Expand All @@ -162,9 +185,17 @@
bo.attempt = 0
default:
switch status.Code(e) {
case codes.Unavailable, codes.Aborted:
case codes.Unavailable, codes.Aborted, codes.DeadlineExceeded, codes.ResourceExhausted, codes.Internal:

Check warning on line 188 in br/pkg/utils/backoff.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/utils/backoff.go#L188

Added line #L188 was not covered by tests
bo.delayTime = 2 * bo.delayTime
bo.attempt--
case codes.Canceled:
if isGRPCCancel(err) {
bo.delayTime = 2 * bo.delayTime
bo.attempt--

Check warning on line 194 in br/pkg/utils/backoff.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/utils/backoff.go#L193-L194

Added lines #L193 - L194 were not covered by tests
} else {
bo.delayTime = 0
bo.attempt = 0
}
default:
// Unexpected error
bo.delayTime = 0
Expand Down
19 changes: 19 additions & 0 deletions br/pkg/utils/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,3 +178,22 @@ func TestNewDownloadSSTBackofferWithCancel(t *testing.T) {
context.Canceled,
}, multierr.Errors(err))
}

func TestNewBackupSSTBackofferWithCancel(t *testing.T) {
var counter int
backoffer := utils.NewBackupSSTBackoffer()
err := utils.WithRetry(context.Background(), func() error {
defer func() { counter++ }()
if counter == 3 {
return context.Canceled
}
return berrors.ErrKVIngestFailed
}, backoffer)
require.Equal(t, 4, counter)
require.Equal(t, []error{
berrors.ErrKVIngestFailed,
berrors.ErrKVIngestFailed,
berrors.ErrKVIngestFailed,
context.Canceled,
}, multierr.Errors(err))
}
1 change: 1 addition & 0 deletions br/pkg/utils/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var retryableServerError = []string{
"body write aborted",
"error during dispatch",
"put object timeout",
"timeout after",
"internalerror",
"not read from or written to within the timeout period",
"<code>requesttimeout</code>",
Expand Down
2 changes: 1 addition & 1 deletion br/tests/br_full/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ test_log="${TEST_DIR}/${DB}_test.log"
error_str="not read from or written to within the timeout period"
unset BR_LOG_TO_TERM

export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/backup/backup-storage-error=1*return(\"connection refused\")->1*return(\"InternalError\");github.com/pingcap/tidb/br/pkg/backup/backup-timeout-error=1*return(\"<Code>RequestTimeout</Code>\")->1*return(\"not read from or written to within the timeout period\")->1*return(\"<Code>InvalidPart</Code>\")->1*return(\"end of file before message length reached\")"
export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/backup/backup-storage-error=1*return(\"connection refused\")->1*return(\"InternalError\");github.com/pingcap/tidb/br/pkg/backup/backup-timeout-error=1*return(\"<Code>RequestTimeout</Code>\")->1*return(\"not read from or written to within the timeout period\")->1*return(\"<Code>InvalidPart</Code>\")->1*return(\"end of file before message length reached\")->1*return(\"timeout after\")"
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB-lz4" --concurrency 4 --compression lz4 --log-file $test_log
export GO_FAILPOINTS=""
size_lz4=$(du -d 0 $TEST_DIR/$DB-lz4 | awk '{print $1}')
Expand Down
Loading