diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go
index 9e6af11b9d326..fff4ae0f84590 100644
--- a/br/pkg/backup/client.go
+++ b/br/pkg/backup/client.go
@@ -1437,57 +1437,22 @@ func SendBackup(
var errReset error
var errBackup error
- for retry := 0; retry < backupRetryTimes; retry++ {
- logutil.CL(ctx).Info("try backup",
- zap.Int("retry time", retry),
- )
+ retry := -1
+ return utils.WithRetry(ctx, func() error {
+ retry += 1
+ if retry != 0 {
+ client, errReset = resetFn()
+ if errReset != nil {
+ return errors.Annotatef(errReset, "failed to reset backup connection on store:%d "+
+ "please check the tikv status", storeID)
+ }
+ }
+ logutil.CL(ctx).Info("try backup", zap.Int("retry time", retry))
errBackup = doSendBackup(ctx, client, req, respFn)
if errBackup != nil {
- if isRetryableError(errBackup) {
- time.Sleep(3 * time.Second)
- client, errReset = resetFn()
- if errReset != nil {
- return errors.Annotatef(errReset, "failed to reset backup connection on store:%d "+
- "please check the tikv status", storeID)
- }
- continue
- }
- logutil.CL(ctx).Error("fail to backup", zap.Uint64("StoreID", storeID), zap.Int("retry", retry))
return berrors.ErrFailedToConnect.Wrap(errBackup).GenWithStack("failed to create backup stream to store %d", storeID)
}
- // finish backup
- break
- }
- return nil
-}
-
-// gRPC communication cancelled with connection closing
-const (
- gRPC_Cancel = "the client connection is closing"
-)
-
-// isRetryableError represents whether we should retry reset grpc connection.
-func isRetryableError(err error) bool {
- // some errors can be retried
- // https://github.com/pingcap/tidb/issues/34350
- switch status.Code(err) {
- case codes.Unavailable, codes.DeadlineExceeded,
- codes.ResourceExhausted, codes.Aborted, codes.Internal:
- {
- log.Warn("backup met some errors, these errors can be retry 5 times", zap.Error(err))
- return true
- }
- }
- // At least, there are two possible cancel() call,
- // one from backup range, another from gRPC, here we retry when gRPC cancel with connection closing
- if status.Code(err) == codes.Canceled {
- if s, ok := status.FromError(err); ok {
- if strings.Contains(s.Message(), gRPC_Cancel) {
- log.Warn("backup met grpc cancel error, this errors can be retry 5 times", zap.Error(err))
- return true
- }
- }
- }
- return false
+ return nil
+ }, utils.NewBackupSSTBackoffer())
}
diff --git a/br/pkg/storage/s3.go b/br/pkg/storage/s3.go
index 22c3250de9bf0..0abb60838fbf4 100644
--- a/br/pkg/storage/s3.go
+++ b/br/pkg/storage/s3.go
@@ -559,22 +559,41 @@ func (rs *S3Storage) WriteFile(ctx context.Context, file string, data []byte) er
// ReadFile reads the file from the storage and returns the contents.
func (rs *S3Storage) ReadFile(ctx context.Context, file string) ([]byte, error) {
- input := &s3.GetObjectInput{
- Bucket: aws.String(rs.options.Bucket),
- Key: aws.String(rs.options.Prefix + file),
- }
- result, err := rs.svc.GetObjectWithContext(ctx, input)
- if err != nil {
- return nil, errors.Annotatef(err,
- "failed to read s3 file, file info: input.bucket='%s', input.key='%s'",
- *input.Bucket, *input.Key)
- }
- defer result.Body.Close()
- data, err := io.ReadAll(result.Body)
- if err != nil {
- return nil, errors.Trace(err)
+ var (
+ data []byte
+ readErr error
+ )
+ for retryCnt := 0; retryCnt < maxErrorRetries; retryCnt += 1 {
+ input := &s3.GetObjectInput{
+ Bucket: aws.String(rs.options.Bucket),
+ Key: aws.String(rs.options.Prefix + file),
+ }
+ result, err := rs.svc.GetObjectWithContext(ctx, input)
+ if err != nil {
+ return nil, errors.Annotatef(err,
+ "failed to read s3 file, file info: input.bucket='%s', input.key='%s'",
+ *input.Bucket, *input.Key)
+ }
+ data, readErr = io.ReadAll(result.Body)
+ // close the body of response since data has been already read out
+ result.Body.Close()
+ // for unit test
+ failpoint.Inject("read-s3-body-failed", func(_ failpoint.Value) {
+ log.Info("original error", zap.Error(readErr))
+ readErr = errors.Errorf("read: connection reset by peer")
+ })
+ if readErr != nil {
+ if isDeadlineExceedError(readErr) || isCancelError(readErr) {
+ return nil, errors.Annotatef(readErr, "failed to read body from get object result, file info: input.bucket='%s', input.key='%s', retryCnt='%d'",
+ *input.Bucket, *input.Key, retryCnt)
+ }
+ continue
+ }
+ return data, nil
}
- return data, nil
+ // retry too much, should be failed
+ return nil, errors.Annotatef(readErr, "failed to read body from get object result (retry too much), file info: input.bucket='%s', input.key='%s'",
+ rs.options.Bucket, rs.options.Prefix+file)
}
// DeleteFile delete the file in s3 storage
@@ -1104,6 +1123,10 @@ type retryerWithLog struct {
client.DefaultRetryer
}
+func isCancelError(err error) bool {
+ return strings.Contains(err.Error(), "context canceled")
+}
+
func isDeadlineExceedError(err error) bool {
// TODO find a better way.
// Known challenges:
diff --git a/br/pkg/storage/s3_test.go b/br/pkg/storage/s3_test.go
index 829e2049bdbcd..1fa5ae3a32a82 100644
--- a/br/pkg/storage/s3_test.go
+++ b/br/pkg/storage/s3_test.go
@@ -12,6 +12,7 @@ import (
"net/http"
"net/http/httptest"
"os"
+ "strings"
"sync"
"testing"
@@ -1383,3 +1384,40 @@ func TestRetryError(t *testing.T) {
require.NoError(t, err)
require.Equal(t, count, int32(2))
}
+
+func TestS3ReadFileRetryable(t *testing.T) {
+ s := createS3Suite(t)
+ ctx := aws.BackgroundContext()
+ errMsg := "just some unrelated error"
+ expectedErr := errors.New(errMsg)
+
+ s.s3.EXPECT().
+ GetObjectWithContext(ctx, gomock.Any()).
+ DoAndReturn(func(_ context.Context, input *s3.GetObjectInput, opt ...request.Option) (*s3.GetObjectOutput, error) {
+ require.Equal(t, "bucket", aws.StringValue(input.Bucket))
+ require.Equal(t, "prefix/file", aws.StringValue(input.Key))
+ return &s3.GetObjectOutput{
+ Body: io.NopCloser(bytes.NewReader([]byte("test"))),
+ }, nil
+ })
+ s.s3.EXPECT().
+ GetObjectWithContext(ctx, gomock.Any()).
+ DoAndReturn(func(_ context.Context, input *s3.GetObjectInput, opt ...request.Option) (*s3.GetObjectOutput, error) {
+ require.Equal(t, "bucket", aws.StringValue(input.Bucket))
+ require.Equal(t, "prefix/file", aws.StringValue(input.Key))
+ return &s3.GetObjectOutput{
+ Body: io.NopCloser(bytes.NewReader([]byte("test"))),
+ }, nil
+ })
+ s.s3.EXPECT().
+ GetObjectWithContext(ctx, gomock.Any()).
+ Return(nil, expectedErr)
+
+ require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/storage/read-s3-body-failed", "2*return(true)"))
+ defer func() {
+ failpoint.Disable("github.com/pingcap/tidb/br/pkg/storage/read-s3-body-failed")
+ }()
+ _, err := s.storage.ReadFile(ctx, "file")
+ require.Error(t, err)
+ require.True(t, strings.Contains(err.Error(), errMsg))
+}
diff --git a/br/pkg/utils/BUILD.bazel b/br/pkg/utils/BUILD.bazel
index 6dc2f2a7420f8..b9db66c4e18cd 100644
--- a/br/pkg/utils/BUILD.bazel
+++ b/br/pkg/utils/BUILD.bazel
@@ -87,7 +87,7 @@ go_test(
],
embed = [":utils"],
flaky = True,
- shard_count = 33,
+ shard_count = 34,
deps = [
"//br/pkg/errors",
"//pkg/kv",
diff --git a/br/pkg/utils/backoff.go b/br/pkg/utils/backoff.go
index 7f2f04cca9db5..6b7aa7a127863 100644
--- a/br/pkg/utils/backoff.go
+++ b/br/pkg/utils/backoff.go
@@ -6,6 +6,7 @@ import (
"context"
"database/sql"
"io"
+ "strings"
"time"
"github.com/pingcap/errors"
@@ -26,6 +27,10 @@ const (
downloadSSTWaitInterval = 1 * time.Second
downloadSSTMaxWaitInterval = 4 * time.Second
+ backupSSTRetryTimes = 5
+ backupSSTWaitInterval = 2 * time.Second
+ backupSSTMaxWaitInterval = 3 * time.Second
+
resetTSRetryTime = 16
resetTSWaitInterval = 50 * time.Millisecond
resetTSMaxWaitInterval = 500 * time.Millisecond
@@ -42,8 +47,21 @@ const (
ChecksumRetryTime = 8
ChecksumWaitInterval = 1 * time.Second
ChecksumMaxWaitInterval = 30 * time.Second
+
+ gRPC_Cancel = "the client connection is closing"
)
+// At least, there are two possible cancel() call,
+// one from go context, another from gRPC, here we retry when gRPC cancel with connection closing
+func isGRPCCancel(err error) bool {
+ if s, ok := status.FromError(err); ok {
+ if strings.Contains(s.Message(), gRPC_Cancel) {
+ return true
+ }
+ }
+ return false
+}
+
// RetryState is the mutable state needed for retrying.
// It likes the `utils.Backoffer`, but more fundamental:
// this only control the backoff time and knows nothing about what error happens.
@@ -143,6 +161,11 @@ func NewDownloadSSTBackoffer() Backoffer {
return NewBackoffer(downloadSSTRetryTimes, downloadSSTWaitInterval, downloadSSTMaxWaitInterval, errContext)
}
+func NewBackupSSTBackoffer() Backoffer {
+ errContext := NewErrorContext("backup sst", 3)
+ return NewBackoffer(backupSSTRetryTimes, backupSSTWaitInterval, backupSSTMaxWaitInterval, errContext)
+}
+
func (bo *importerBackoffer) NextBackoff(err error) time.Duration {
log.Warn("retry to import ssts", zap.Int("attempt", bo.attempt), zap.Error(err))
// we don't care storeID here.
@@ -162,9 +185,17 @@ func (bo *importerBackoffer) NextBackoff(err error) time.Duration {
bo.attempt = 0
default:
switch status.Code(e) {
- case codes.Unavailable, codes.Aborted, codes.DeadlineExceeded:
+ case codes.Unavailable, codes.Aborted, codes.DeadlineExceeded, codes.ResourceExhausted, codes.Internal:
bo.delayTime = 2 * bo.delayTime
bo.attempt--
+ case codes.Canceled:
+ if isGRPCCancel(err) {
+ bo.delayTime = 2 * bo.delayTime
+ bo.attempt--
+ } else {
+ bo.delayTime = 0
+ bo.attempt = 0
+ }
default:
// Unexpected error
bo.delayTime = 0
diff --git a/br/pkg/utils/backoff_test.go b/br/pkg/utils/backoff_test.go
index 857010bfc871a..316896bde3f0d 100644
--- a/br/pkg/utils/backoff_test.go
+++ b/br/pkg/utils/backoff_test.go
@@ -178,3 +178,22 @@ func TestNewDownloadSSTBackofferWithCancel(t *testing.T) {
context.Canceled,
}, multierr.Errors(err))
}
+
+func TestNewBackupSSTBackofferWithCancel(t *testing.T) {
+ var counter int
+ backoffer := utils.NewBackupSSTBackoffer()
+ err := utils.WithRetry(context.Background(), func() error {
+ defer func() { counter++ }()
+ if counter == 3 {
+ return context.Canceled
+ }
+ return berrors.ErrKVIngestFailed
+ }, backoffer)
+ require.Equal(t, 4, counter)
+ require.Equal(t, []error{
+ berrors.ErrKVIngestFailed,
+ berrors.ErrKVIngestFailed,
+ berrors.ErrKVIngestFailed,
+ context.Canceled,
+ }, multierr.Errors(err))
+}
diff --git a/br/pkg/utils/retry.go b/br/pkg/utils/retry.go
index 559a11d978274..d754070ff80d1 100644
--- a/br/pkg/utils/retry.go
+++ b/br/pkg/utils/retry.go
@@ -31,6 +31,7 @@ var retryableServerError = []string{
"body write aborted",
"error during dispatch",
"put object timeout",
+ "timeout after",
"internalerror",
"not read from or written to within the timeout period",
"requesttimeout
",
diff --git a/br/tests/br_full/run.sh b/br/tests/br_full/run.sh
index dcfb236a85fe0..3752ce40995b9 100755
--- a/br/tests/br_full/run.sh
+++ b/br/tests/br_full/run.sh
@@ -53,7 +53,7 @@ test_log="${TEST_DIR}/${DB}_test.log"
error_str="not read from or written to within the timeout period"
unset BR_LOG_TO_TERM
-export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/backup/backup-storage-error=1*return(\"connection refused\")->1*return(\"InternalError\");github.com/pingcap/tidb/br/pkg/backup/backup-timeout-error=1*return(\"RequestTimeout
\")->1*return(\"not read from or written to within the timeout period\")->1*return(\"InvalidPart
\")->1*return(\"end of file before message length reached\")"
+export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/backup/backup-storage-error=1*return(\"connection refused\")->1*return(\"InternalError\");github.com/pingcap/tidb/br/pkg/backup/backup-timeout-error=1*return(\"RequestTimeout
\")->1*return(\"not read from or written to within the timeout period\")->1*return(\"InvalidPart
\")->1*return(\"end of file before message length reached\")->1*return(\"timeout after\")"
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB-lz4" --concurrency 4 --compression lz4 --log-file $test_log
export GO_FAILPOINTS=""
size_lz4=$(du -d 0 $TEST_DIR/$DB-lz4 | awk '{print $1}')