resolve conflicts
Signed-off-by: Leavrth <jianjun.liao@outlook.com>
Leavrth committed Jan 24, 2024
1 parent 2a90692 commit 8573aae
Showing 5 changed files with 49 additions and 79 deletions.
61 changes: 48 additions & 13 deletions br/pkg/backup/client.go
@@ -1337,22 +1337,57 @@ func SendBackup(
var errReset error
var errBackup error

retry := -1
return utils.WithRetry(ctx, func() error {
retry += 1
if retry != 0 {
client, errReset = resetFn()
if errReset != nil {
return errors.Annotatef(errReset, "failed to reset backup connection on store:%d "+
"please check the tikv status", storeID)
}
}
logutil.CL(ctx).Info("try backup", zap.Int("retry time", retry))
for retry := 0; retry < backupRetryTimes; retry++ {
logutil.CL(ctx).Info("try backup",
zap.Int("retry time", retry),
)
errBackup = doSendBackup(ctx, client, req, respFn)
if errBackup != nil {
if isRetryableError(errBackup) {
time.Sleep(3 * time.Second)
client, errReset = resetFn()
if errReset != nil {
return errors.Annotatef(errReset, "failed to reset backup connection on store:%d "+
"please check the tikv status", storeID)
}
continue
}
logutil.CL(ctx).Error("fail to backup", zap.Uint64("StoreID", storeID), zap.Int("retry", retry))
return berrors.ErrFailedToConnect.Wrap(errBackup).GenWithStack("failed to create backup stream to store %d", storeID)
}
// finish backup
break
}
return nil
}

return nil
}, utils.NewBackupSSTBackoffer())
// gRPC communication cancelled with connection closing
const (
gRPC_Cancel = "the client connection is closing"
)

// isRetryableError reports whether we should reset the gRPC connection and retry.
func isRetryableError(err error) bool {
// some errors can be retried
// https://github.com/pingcap/tidb/issues/34350
switch status.Code(err) {
case codes.Unavailable, codes.DeadlineExceeded,
codes.ResourceExhausted, codes.Aborted, codes.Internal:
{
log.Warn("backup met some errors, these errors can be retry 5 times", zap.Error(err))
return true
}
}

// There are at least two possible cancel() calls: one from the backup range, another from gRPC.
// Here we retry only when gRPC cancels with the connection closing.
if status.Code(err) == codes.Canceled {
if s, ok := status.FromError(err); ok {
if strings.Contains(s.Message(), gRPC_Cancel) {
log.Warn("backup met grpc cancel error, this errors can be retry 5 times", zap.Error(err))
return true
}
}
}
return false
}
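
To make the new control flow concrete: `SendBackup` now drives its own retry loop instead of delegating to `utils.NewBackupSSTBackoffer`; on each retryable failure it sleeps, resets the gRPC connection, and tries again, with `isRetryableError` deciding which statuses count as transient. The sketch below is a minimal standalone illustration of that pattern, not the actual BR code: `sendWithRetry`, `doCall`, `resetConn`, and the demo in `main` are hypothetical stand-ins.

```go
package main

import (
	"context"
	"fmt"
	"strings"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// retryable mirrors the classification above: a handful of transient gRPC
// codes, plus Canceled when the message says the client connection is closing.
func retryable(err error) bool {
	switch status.Code(err) {
	case codes.Unavailable, codes.DeadlineExceeded,
		codes.ResourceExhausted, codes.Aborted, codes.Internal:
		return true
	case codes.Canceled:
		if s, ok := status.FromError(err); ok {
			return strings.Contains(s.Message(), "the client connection is closing")
		}
	}
	return false
}

// sendWithRetry is a hypothetical stand-in for the SendBackup loop: try the
// call, and on a retryable error wait, reset the connection, and try again.
func sendWithRetry(ctx context.Context, maxRetries int, wait time.Duration,
	doCall func(ctx context.Context) error, resetConn func() error) error {
	var err error
	for attempt := 0; attempt < maxRetries; attempt++ {
		if err = doCall(ctx); err == nil {
			return nil // backup finished
		}
		if !retryable(err) {
			return fmt.Errorf("non-retryable backup error: %w", err)
		}
		time.Sleep(wait) // the loop in the diff pauses 3s between attempts
		if resetErr := resetConn(); resetErr != nil {
			return fmt.Errorf("failed to reset backup connection: %w", resetErr)
		}
	}
	return err
}

func main() {
	calls := 0
	err := sendWithRetry(context.Background(), 5, 100*time.Millisecond,
		func(ctx context.Context) error {
			calls++
			if calls < 3 {
				return status.Error(codes.Unavailable, "tikv not ready")
			}
			return nil
		},
		func() error { return nil }, // pretend the connection was rebuilt
	)
	fmt.Println("calls:", calls, "err:", err) // calls: 3 err: <nil>
}
```

The point the classifier preserves is that a Canceled status is retried only when its message shows the client connection was closing; any other cancellation still aborts the backup immediately.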
45 changes: 0 additions & 45 deletions br/pkg/utils/backoff.go
@@ -6,7 +6,6 @@ import (
"context"
"database/sql"
"io"
"strings"
"time"

"github.com/pingcap/errors"
@@ -27,10 +26,6 @@ const (
downloadSSTWaitInterval = 1 * time.Second
downloadSSTMaxWaitInterval = 4 * time.Second

backupSSTRetryTimes = 5
backupSSTWaitInterval = 2 * time.Second
backupSSTMaxWaitInterval = 3 * time.Second

resetTSRetryTime = 16
resetTSWaitInterval = 50 * time.Millisecond
resetTSMaxWaitInterval = 500 * time.Millisecond
@@ -40,34 +35,11 @@ const (
resetTSMaxWaitIntervalExt = 300 * time.Second

// region heartbeat are 10 seconds by default, if some region has 2 heartbeat missing (15 seconds), it appear to be a network issue between PD and TiKV.
<<<<<<< HEAD
flashbackRetryTime = 3
flashbackWaitInterval = 3000 * time.Millisecond
flashbackMaxWaitInterval = 15 * time.Second
=======
FlashbackRetryTime = 3
FlashbackWaitInterval = 3 * time.Second
FlashbackMaxWaitInterval = 15 * time.Second

ChecksumRetryTime = 8
ChecksumWaitInterval = 1 * time.Second
ChecksumMaxWaitInterval = 30 * time.Second

gRPC_Cancel = "the client connection is closing"
>>>>>>> d6ef1c722a9 (br: add more retry strategy (s3.ReadFile: body reader / pushBackup: backoffer) (#50541))
)

// At least, there are two possible cancel() call,
// one from go context, another from gRPC, here we retry when gRPC cancel with connection closing
func isGRPCCancel(err error) bool {
if s, ok := status.FromError(err); ok {
if strings.Contains(s.Message(), gRPC_Cancel) {
return true
}
}
return false
}

// RetryState is the mutable state needed for retrying.
// It likes the `utils.Backoffer`, but more fundamental:
// this only control the backoff time and knows nothing about what error happens.
@@ -158,11 +130,6 @@ func NewDownloadSSTBackoffer() Backoffer {
return NewBackoffer(downloadSSTRetryTimes, downloadSSTWaitInterval, downloadSSTMaxWaitInterval)
}

func NewBackupSSTBackoffer() Backoffer {
errContext := NewErrorContext("backup sst", 3)
return NewBackoffer(backupSSTRetryTimes, backupSSTWaitInterval, backupSSTMaxWaitInterval, errContext)
}

func (bo *importerBackoffer) NextBackoff(err error) time.Duration {
log.Warn("retry to import ssts", zap.Int("attempt", bo.attempt), zap.Error(err))
if MessageIsRetryableStorageError(err.Error()) {
@@ -180,21 +147,9 @@ func (bo *importerBackoffer) NextBackoff(err error) time.Duration {
bo.attempt = 0
default:
switch status.Code(e) {
<<<<<<< HEAD
case codes.Unavailable, codes.Aborted:
=======
case codes.Unavailable, codes.Aborted, codes.DeadlineExceeded, codes.ResourceExhausted, codes.Internal:
>>>>>>> d6ef1c722a9 (br: add more retry strategy (s3.ReadFile: body reader / pushBackup: backoffer) (#50541))
bo.delayTime = 2 * bo.delayTime
bo.attempt--
case codes.Canceled:
if isGRPCCancel(err) {
bo.delayTime = 2 * bo.delayTime
bo.attempt--
} else {
bo.delayTime = 0
bo.attempt = 0
}
default:
// Unexcepted error
bo.delayTime = 0
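
The part of `importerBackoffer.NextBackoff` that survives the conflict keeps the usual exponential strategy: a retryable error doubles the delay and consumes one attempt, while an unexpected error zeroes both so the caller gives up at once. Below is a small illustrative backoffer in that shape; `simpleBackoffer` and `errTransient` are made-up names, and the cap on the delay is an assumption standing in for the max-wait constants defined earlier in the file.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errTransient = errors.New("transient error")

// simpleBackoffer doubles its delay (up to a cap) on each retryable error and
// drains all remaining attempts on anything unexpected.
type simpleBackoffer struct {
	attempt  int
	delay    time.Duration
	maxDelay time.Duration
}

func (bo *simpleBackoffer) NextBackoff(err error) time.Duration {
	if errors.Is(err, errTransient) {
		bo.delay *= 2
		if bo.delay > bo.maxDelay {
			bo.delay = bo.maxDelay
		}
		bo.attempt--
	} else {
		// Unexpected error: stop retrying, mirroring the default branch above.
		bo.delay = 0
		bo.attempt = 0
	}
	return bo.delay
}

func (bo *simpleBackoffer) Attempt() int { return bo.attempt }

func main() {
	bo := &simpleBackoffer{attempt: 5, delay: 2 * time.Second, maxDelay: 3 * time.Second}
	for bo.Attempt() > 0 {
		fmt.Println("would sleep", bo.NextBackoff(errTransient))
	}
}
```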
19 changes: 0 additions & 19 deletions br/pkg/utils/backoff_test.go
@@ -160,22 +160,3 @@ func TestNewDownloadSSTBackofferWithCancel(t *testing.T) {
context.Canceled,
}, multierr.Errors(err))
}

func TestNewBackupSSTBackofferWithCancel(t *testing.T) {
var counter int
backoffer := utils.NewBackupSSTBackoffer()
err := utils.WithRetry(context.Background(), func() error {
defer func() { counter++ }()
if counter == 3 {
return context.Canceled
}
return berrors.ErrKVIngestFailed
}, backoffer)
require.Equal(t, 4, counter)
require.Equal(t, []error{
berrors.ErrKVIngestFailed,
berrors.ErrKVIngestFailed,
berrors.ErrKVIngestFailed,
context.Canceled,
}, multierr.Errors(err))
}
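
The deleted `TestNewBackupSSTBackofferWithCancel` exercised a retry helper that accumulates the error from every attempt and stops as soon as it sees `context.Canceled`, then inspects the pile with `multierr.Errors`. The toy sketch below reproduces that behaviour (four calls: three ingest failures, then cancellation); `withRetry` and `errIngest` are simplified stand-ins, not the actual `utils.WithRetry` or BR error values.

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"go.uber.org/multierr"
)

var errIngest = errors.New("kv ingest failed")

// withRetry keeps calling fn until the attempts run out or cancellation shows
// up, collecting every error it saw along the way.
func withRetry(ctx context.Context, attempts int, fn func() error) error {
	var all error
	for i := 0; i < attempts; i++ {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return multierr.Append(all, ctxErr)
		}
		err := fn()
		if err == nil {
			return nil
		}
		all = multierr.Append(all, err)
		if errors.Is(err, context.Canceled) {
			break // cancellation is terminal: stop retrying
		}
	}
	return all
}

func main() {
	counter := 0
	err := withRetry(context.Background(), 5, func() error {
		defer func() { counter++ }()
		if counter == 3 {
			return context.Canceled
		}
		return errIngest
	})
	// Four calls were made: three ingest failures, then Canceled ends the loop.
	fmt.Println("calls:", counter, "errors:", multierr.Errors(err))
}
```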
1 change: 0 additions & 1 deletion br/pkg/utils/retry.go
@@ -30,7 +30,6 @@ var retryableServerError = []string{
"body write aborted",
"error during dispatch",
"put object timeout",
"timeout after",
"internalerror",
"not read from or written to within the timeout period",
"<code>requesttimeout</code>",
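
For context on the retry.go change: `retryableServerError` is a list of message substrings, and dropping "timeout after" means an error whose text matches only that phrase is no longer treated as a retryable storage error. The sketch below shows the kind of case-insensitive substring check a helper like `MessageIsRetryableStorageError` performs; `messageIsRetryable` and the trimmed-down list are illustrative, not the actual BR implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// A subset of the retryable markers visible in the diff above; the real list
// in br/pkg/utils/retry.go is longer.
var retryableServerError = []string{
	"body write aborted",
	"error during dispatch",
	"put object timeout",
	"internalerror",
	"not read from or written to within the timeout period",
}

// messageIsRetryable lower-cases the message and reports whether any known
// transient marker appears in it.
func messageIsRetryable(msg string) bool {
	msg = strings.ToLower(msg)
	for _, marker := range retryableServerError {
		if strings.Contains(msg, marker) {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(messageIsRetryable("InternalError: please retry"))          // true
	fmt.Println(messageIsRetryable("request timeout after 30s"))            // false once "timeout after" is gone
	fmt.Println(messageIsRetryable("io: body write aborted by the server")) // true
}
```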
2 changes: 1 addition & 1 deletion br/tests/br_full/run.sh
@@ -52,7 +52,7 @@ test_log="${TEST_DIR}/${DB}_test.log"
error_str="not read from or written to within the timeout period"
unset BR_LOG_TO_TERM

export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/backup/backup-storage-error=1*return(\"connection refused\")->1*return(\"InternalError\");github.com/pingcap/tidb/br/pkg/backup/backup-timeout-error=1*return(\"<Code>RequestTimeout</Code>\")->1*return(\"not read from or written to within the timeout period\")->1*return(\"<Code>InvalidPart</Code>\")->1*return(\"end of file before message length reached\")->1*return(\"timeout after\")"
export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/backup/backup-storage-error=1*return(\"connection refused\")->1*return(\"InternalError\");github.com/pingcap/tidb/br/pkg/backup/backup-timeout-error=1*return(\"<Code>RequestTimeout</Code>\")->1*return(\"not read from or written to within the timeout period\")->1*return(\"<Code>InvalidPart</Code>\")->1*return(\"end of file before message length reached\")"
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB-lz4" --concurrency 4 --compression lz4 --log-file $test_log
export GO_FAILPOINTS=""
size_lz4=$(du -d 0 $TEST_DIR/$DB-lz4 | awk '{print $1}')
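
The `GO_FAILPOINTS` value in `br_full/run.sh` uses pingcap/failpoint term syntax: `1*return("X")->1*return("Y")` makes the failpoint yield `X` once, then `Y` once, then stop firing, and this change drops the `1*return("timeout after")` term, which lines up with the retry.go edit above. The standalone sketch below shows how such a term drives error injection; `storageWrite` and the failpoint name `example/backup-timeout-error` are hypothetical, and it calls `failpoint.Eval` directly so it runs without the `failpoint-ctl` rewrite that real `failpoint.Inject` markers rely on.

```go
package main

import (
	"errors"
	"fmt"

	"github.com/pingcap/failpoint"
)

// storageWrite is a hypothetical stand-in for a storage call with a failpoint
// hook. When the failpoint is enabled and a term fires, Eval returns the
// string from return(...) and we surface it as an injected error.
func storageWrite() error {
	if val, err := failpoint.Eval("example/backup-timeout-error"); err == nil {
		return errors.New(val.(string))
	}
	return nil
}

func main() {
	// The test script sets the same kind of expression through GO_FAILPOINTS;
	// Enable does it programmatically for this standalone sketch.
	terms := `1*return("<Code>RequestTimeout</Code>")->1*return("end of file before message length reached")`
	if err := failpoint.Enable("example/backup-timeout-error", terms); err != nil {
		panic(err)
	}
	defer failpoint.Disable("example/backup-timeout-error")

	// The first two calls return the injected errors in order; the third sees
	// a spent failpoint and succeeds.
	for i := 0; i < 3; i++ {
		fmt.Println(storageWrite())
	}
}
```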
