diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go
index b7e209d92e9fb..d9e790403f89f 100644
--- a/br/pkg/backup/client.go
+++ b/br/pkg/backup/client.go
@@ -1337,57 +1337,23 @@ func SendBackup(
var errReset error
var errBackup error
- for retry := 0; retry < backupRetryTimes; retry++ {
- logutil.CL(ctx).Info("try backup",
- zap.Int("retry time", retry),
- )
+ retry := -1
+ return utils.WithRetry(ctx, func() error {
+ retry += 1
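+ // every attempt after the first one resets the backup connection before resending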
+ if retry != 0 {
+ client, errReset = resetFn()
+ if errReset != nil {
+ return errors.Annotatef(errReset, "failed to reset backup connection on store:%d "+
+ "please check the tikv status", storeID)
+ }
+ }
+ logutil.CL(ctx).Info("try backup", zap.Int("retry time", retry))
errBackup = doSendBackup(ctx, client, req, respFn)
if errBackup != nil {
- if isRetryableError(errBackup) {
- time.Sleep(3 * time.Second)
- client, errReset = resetFn()
- if errReset != nil {
- return errors.Annotatef(errReset, "failed to reset backup connection on store:%d "+
- "please check the tikv status", storeID)
- }
- continue
- }
- logutil.CL(ctx).Error("fail to backup", zap.Uint64("StoreID", storeID), zap.Int("retry", retry))
return berrors.ErrFailedToConnect.Wrap(errBackup).GenWithStack("failed to create backup stream to store %d", storeID)
}
- // finish backup
- break
- }
- return nil
-}
-// gRPC communication cancelled with connection closing
-const (
- gRPC_Cancel = "the client connection is closing"
-)
-
-// isRetryableError represents whether we should retry reset grpc connection.
-func isRetryableError(err error) bool {
- // some errors can be retried
- // https://github.com/pingcap/tidb/issues/34350
- switch status.Code(err) {
- case codes.Unavailable, codes.DeadlineExceeded,
- codes.ResourceExhausted, codes.Aborted, codes.Internal:
- {
- log.Warn("backup met some errors, these errors can be retry 5 times", zap.Error(err))
- return true
- }
- }
-
- // At least, there are two possible cancel() call,
- // one from backup range, another from gRPC, here we retry when gRPC cancel with connection closing
- if status.Code(err) == codes.Canceled {
- if s, ok := status.FromError(err); ok {
- if strings.Contains(s.Message(), gRPC_Cancel) {
- log.Warn("backup met grpc cancel error, this errors can be retry 5 times", zap.Error(err))
- return true
- }
- }
- }
- return false
+ return nil
+ }, utils.NewBackupSSTBackoffer())
}
diff --git a/br/pkg/storage/s3.go b/br/pkg/storage/s3.go
index 8cc6e1970aff9..6466ee459cedb 100644
--- a/br/pkg/storage/s3.go
+++ b/br/pkg/storage/s3.go
@@ -515,22 +515,43 @@ func (rs *S3Storage) WriteFile(ctx context.Context, file string, data []byte) er
// ReadFile reads the file from the storage and returns the contents.
func (rs *S3Storage) ReadFile(ctx context.Context, file string) ([]byte, error) {
- input := &s3.GetObjectInput{
- Bucket: aws.String(rs.options.Bucket),
- Key: aws.String(rs.options.Prefix + file),
- }
- result, err := rs.svc.GetObjectWithContext(ctx, input)
- if err != nil {
- return nil, errors.Annotatef(err,
- "failed to read s3 file, file info: input.bucket='%s', input.key='%s'",
- *input.Bucket, *input.Key)
- }
- defer result.Body.Close()
- data, err := io.ReadAll(result.Body)
- if err != nil {
- return nil, errors.Trace(err)
+ var (
+ data []byte
+ readErr error
+ )
+ for retryCnt := 0; retryCnt < maxErrorRetries; retryCnt += 1 {
+ input := &s3.GetObjectInput{
+ Bucket: aws.String(rs.options.Bucket),
+ Key: aws.String(rs.options.Prefix + file),
+ }
+ result, err := rs.svc.GetObjectWithContext(ctx, input)
+ if err != nil {
+ return nil, errors.Annotatef(err,
+ "failed to read s3 file, file info: input.bucket='%s', input.key='%s'",
+ *input.Bucket, *input.Key)
+ }
+ data, readErr = io.ReadAll(result.Body)
+ // close the response body since the data has already been read out
+ result.Body.Close()
+ // failpoint for unit tests: simulate a failure while reading the body
+ failpoint.Inject("read-s3-body-failed", func(_ failpoint.Value) {
+ log.Info("original error", zap.Error(readErr))
+ readErr = errors.Errorf("read: connection reset by peer")
+ })
+ if readErr != nil {
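+ // a canceled or deadline-exceeded read will not recover on retry, so fail immediately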
+ if isDeadlineExceedError(readErr) || isCancelError(readErr) {
+ return nil, errors.Annotatef(readErr, "failed to read body from get object result, file info: input.bucket='%s', input.key='%s', retryCnt='%d'",
+ *input.Bucket, *input.Key, retryCnt)
+ }
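+ // other read errors are treated as transient; retry the GetObject from scratch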
+ continue
+ }
+ return data, nil
}
- return data, nil
+ // retries exhausted, return the last read error
+ return nil, errors.Annotatef(readErr, "failed to read body from get object result (retry too much), file info: input.bucket='%s', input.key='%s'",
+ rs.options.Bucket, rs.options.Prefix+file)
}
// DeleteFile delete the file in s3 storage
@@ -989,6 +1008,11 @@ type retryerWithLog struct {
client.DefaultRetryer
}
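+// isCancelError reports whether the error message indicates a canceled context.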
+func isCancelError(err error) bool {
+ return strings.Contains(err.Error(), "context canceled")
+}
+
func isDeadlineExceedError(err error) bool {
// TODO find a better way.
// Known challenges:
diff --git a/br/pkg/storage/s3_test.go b/br/pkg/storage/s3_test.go
index 52468dbe32a9b..f84d6d9d32a1c 100644
--- a/br/pkg/storage/s3_test.go
+++ b/br/pkg/storage/s3_test.go
@@ -12,6 +12,7 @@ import (
"net/http"
"net/http/httptest"
"os"
+ "strings"
"sync"
"testing"
@@ -1343,3 +1344,43 @@ func TestRetryError(t *testing.T) {
require.NoError(t, err)
require.Equal(t, count, int32(2))
}
+
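+// TestS3ReadFileRetryable checks that ReadFile retries transient body-read failures and still reports GetObject errors.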
+func TestS3ReadFileRetryable(t *testing.T) {
+ s := createS3Suite(t)
+ ctx := aws.BackgroundContext()
+ errMsg := "just some unrelated error"
+ expectedErr := errors.New(errMsg)
+
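+ // the first two GetObject calls succeed; the failpoint below fails their body reads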
+ s.s3.EXPECT().
+ GetObjectWithContext(ctx, gomock.Any()).
+ DoAndReturn(func(_ context.Context, input *s3.GetObjectInput, opt ...request.Option) (*s3.GetObjectOutput, error) {
+ require.Equal(t, "bucket", aws.StringValue(input.Bucket))
+ require.Equal(t, "prefix/file", aws.StringValue(input.Key))
+ return &s3.GetObjectOutput{
+ Body: io.NopCloser(bytes.NewReader([]byte("test"))),
+ }, nil
+ })
+ s.s3.EXPECT().
+ GetObjectWithContext(ctx, gomock.Any()).
+ DoAndReturn(func(_ context.Context, input *s3.GetObjectInput, opt ...request.Option) (*s3.GetObjectOutput, error) {
+ require.Equal(t, "bucket", aws.StringValue(input.Bucket))
+ require.Equal(t, "prefix/file", aws.StringValue(input.Key))
+ return &s3.GetObjectOutput{
+ Body: io.NopCloser(bytes.NewReader([]byte("test"))),
+ }, nil
+ })
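+ // the third GetObject call fails outright with a non-retryable error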
+ s.s3.EXPECT().
+ GetObjectWithContext(ctx, gomock.Any()).
+ Return(nil, expectedErr)
+
+ require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/storage/read-s3-body-failed", "2*return(true)"))
+ defer func() {
+ failpoint.Disable("github.com/pingcap/tidb/br/pkg/storage/read-s3-body-failed")
+ }()
+ _, err := s.storage.ReadFile(ctx, "file")
+ require.Error(t, err)
+ require.True(t, strings.Contains(err.Error(), errMsg))
+}
diff --git a/br/pkg/utils/BUILD.bazel b/br/pkg/utils/BUILD.bazel
index a47c472a356d2..103dfecb592b9 100644
--- a/br/pkg/utils/BUILD.bazel
+++ b/br/pkg/utils/BUILD.bazel
@@ -89,7 +89,7 @@ go_test(
],
embed = [":utils"],
flaky = True,
- shard_count = 33,
+ shard_count = 34,
deps = [
"//br/pkg/errors",
"//br/pkg/metautil",
diff --git a/br/pkg/utils/backoff.go b/br/pkg/utils/backoff.go
index 1a9f157807f3e..11869f6752c16 100644
--- a/br/pkg/utils/backoff.go
+++ b/br/pkg/utils/backoff.go
@@ -6,6 +6,7 @@ import (
"context"
"database/sql"
"io"
+ "strings"
"time"
"github.com/pingcap/errors"
@@ -26,6 +27,10 @@ const (
downloadSSTWaitInterval = 1 * time.Second
downloadSSTMaxWaitInterval = 4 * time.Second
+ backupSSTRetryTimes = 5
+ backupSSTWaitInterval = 2 * time.Second
+ backupSSTMaxWaitInterval = 3 * time.Second
+
resetTSRetryTime = 16
resetTSWaitInterval = 50 * time.Millisecond
resetTSMaxWaitInterval = 500 * time.Millisecond
@@ -35,11 +40,24 @@ const (
resetTSMaxWaitIntervalExt = 300 * time.Second
// region heartbeat are 10 seconds by default, if some region has 2 heartbeat missing (15 seconds), it appear to be a network issue between PD and TiKV.
flashbackRetryTime = 3
flashbackWaitInterval = 3000 * time.Millisecond
flashbackMaxWaitInterval = 15 * time.Second
+
+ gRPC_Cancel = "the client connection is closing"
)
+// At least two callers can trigger a cancel(): the Go context and gRPC itself.
+// Retry only when the gRPC cancel carries the connection-closing message.
+func isGRPCCancel(err error) bool {
+ if s, ok := status.FromError(err); ok {
+ if strings.Contains(s.Message(), gRPC_Cancel) {
+ return true
+ }
+ }
+ return false
+}
+
// RetryState is the mutable state needed for retrying.
// It likes the `utils.Backoffer`, but more fundamental:
// this only control the backoff time and knows nothing about what error happens.
@@ -130,6 +148,12 @@ func NewDownloadSSTBackoffer() Backoffer {
return NewBackoffer(downloadSSTRetryTimes, downloadSSTWaitInterval, downloadSSTMaxWaitInterval)
}
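+// NewBackupSSTBackoffer creates a backoffer for retrying backup requests pushed down to TiKV stores.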
+func NewBackupSSTBackoffer() Backoffer {
+ errContext := NewErrorContext("backup sst", 3)
+ return NewBackoffer(backupSSTRetryTimes, backupSSTWaitInterval, backupSSTMaxWaitInterval, errContext)
+}
+
func (bo *importerBackoffer) NextBackoff(err error) time.Duration {
log.Warn("retry to import ssts", zap.Int("attempt", bo.attempt), zap.Error(err))
if MessageIsRetryableStorageError(err.Error()) {
@@ -147,9 +171,18 @@ func (bo *importerBackoffer) NextBackoff(err error) time.Duration {
bo.attempt = 0
default:
switch status.Code(e) {
- case codes.Unavailable, codes.Aborted:
+ case codes.Unavailable, codes.Aborted, codes.DeadlineExceeded, codes.ResourceExhausted, codes.Internal:
bo.delayTime = 2 * bo.delayTime
bo.attempt--
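+ // a gRPC cancel caused by connection closing is retried; any other cancel stops the backoff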
+ case codes.Canceled:
+ if isGRPCCancel(err) {
+ bo.delayTime = 2 * bo.delayTime
+ bo.attempt--
+ } else {
+ bo.delayTime = 0
+ bo.attempt = 0
+ }
default:
// Unexcepted error
bo.delayTime = 0
diff --git a/br/pkg/utils/backoff_test.go b/br/pkg/utils/backoff_test.go
index 31778052f77e1..3d54bfdc3a7e0 100644
--- a/br/pkg/utils/backoff_test.go
+++ b/br/pkg/utils/backoff_test.go
@@ -160,3 +160,23 @@ func TestNewDownloadSSTBackofferWithCancel(t *testing.T) {
context.Canceled,
}, multierr.Errors(err))
}
+
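+// TestNewBackupSSTBackofferWithCancel checks that retrying stops once the function returns context.Canceled.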
+func TestNewBackupSSTBackofferWithCancel(t *testing.T) {
+ var counter int
+ backoffer := utils.NewBackupSSTBackoffer()
+ err := utils.WithRetry(context.Background(), func() error {
+ defer func() { counter++ }()
+ if counter == 3 {
+ return context.Canceled
+ }
+ return berrors.ErrKVIngestFailed
+ }, backoffer)
+ require.Equal(t, 4, counter)
+ require.Equal(t, []error{
+ berrors.ErrKVIngestFailed,
+ berrors.ErrKVIngestFailed,
+ berrors.ErrKVIngestFailed,
+ context.Canceled,
+ }, multierr.Errors(err))
+}
diff --git a/br/pkg/utils/retry.go b/br/pkg/utils/retry.go
index c0476e8db3701..3c23d8e4ac07e 100644
--- a/br/pkg/utils/retry.go
+++ b/br/pkg/utils/retry.go
@@ -30,6 +30,7 @@ var retryableServerError = []string{
"body write aborted",
"error during dispatch",
"put object timeout",
+ "timeout after",
"internalerror",
"not read from or written to within the timeout period",
"requesttimeout
",
diff --git a/br/tests/br_full/run.sh b/br/tests/br_full/run.sh
index fabc7154302d0..a97fc3cd0cf68 100755
--- a/br/tests/br_full/run.sh
+++ b/br/tests/br_full/run.sh
@@ -52,7 +52,7 @@ test_log="${TEST_DIR}/${DB}_test.log"
error_str="not read from or written to within the timeout period"
unset BR_LOG_TO_TERM
-export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/backup/backup-storage-error=1*return(\"connection refused\")->1*return(\"InternalError\");github.com/pingcap/tidb/br/pkg/backup/backup-timeout-error=1*return(\"RequestTimeout\")->1*return(\"not read from or written to within the timeout period\")->1*return(\"InvalidPart\")->1*return(\"end of file before message length reached\")"
+export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/backup/backup-storage-error=1*return(\"connection refused\")->1*return(\"InternalError\");github.com/pingcap/tidb/br/pkg/backup/backup-timeout-error=1*return(\"RequestTimeout\")->1*return(\"not read from or written to within the timeout period\")->1*return(\"InvalidPart\")->1*return(\"end of file before message length reached\")->1*return(\"timeout after\")"
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB-lz4" --concurrency 4 --compression lz4 --log-file $test_log
export GO_FAILPOINTS=""
size_lz4=$(du -d 0 $TEST_DIR/$DB-lz4 | awk '{print $1}')