Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

restore: speed up retry on not leader #179

Merged
merged 7 commits into from
Mar 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
codecov:
require_ci_to_pass: yes

coverage:
status:
project:
default:
# Allow the coverage to drop by 3%
threshold: 3%
patch: off
3 changes: 1 addition & 2 deletions pkg/restore/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
)

var (
errNotLeader = errors.NewNoStackError("not leader")
errEpochNotMatch = errors.NewNoStackError("epoch not match")
errKeyNotInRegion = errors.NewNoStackError("key not in region")
errRegionNotFound = errors.NewNoStackError("region not found")
Expand Down Expand Up @@ -66,7 +65,7 @@ func newDownloadSSTBackoffer() utils.Backoffer {

func (bo *importerBackoffer) NextBackoff(err error) time.Duration {
switch errors.Cause(err) {
case errResp, errGrpc, errEpochNotMatch, errNotLeader:
case errResp, errGrpc, errEpochNotMatch:
bo.delayTime = 2 * bo.delayTime
bo.attempt--
case errRangeIsEmpty, errRewriteRuleNotFound:
Expand Down
108 changes: 65 additions & 43 deletions pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,23 +171,23 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRul
ctx, cancel := context.WithTimeout(importer.ctx, importScanRegionTime)
defer cancel()
// Scan regions covered by the file range
regionInfos, err1 := importer.metaClient.ScanRegions(ctx, startKey, endKey, 0)
if err1 != nil {
return errors.Trace(err1)
regionInfos, errScanRegion := importer.metaClient.ScanRegions(ctx, startKey, endKey, 0)
if errScanRegion != nil {
return errors.Trace(errScanRegion)
}
log.Debug("scan regions", zap.Stringer("file", file), zap.Int("count", len(regionInfos)))
// Try to download and ingest the file in every region
for _, regionInfo := range regionInfos {
info := regionInfo
// Try to download file.
var downloadMeta *import_sstpb.SSTMeta
err1 = utils.WithRetry(importer.ctx, func() error {
errDownload := utils.WithRetry(importer.ctx, func() error {
var e error
downloadMeta, e = importer.downloadSST(info, file, rewriteRules)
return e
}, newDownloadSSTBackoffer())
if err1 != nil {
if err1 == errRewriteRuleNotFound || err1 == errRangeIsEmpty {
if errDownload != nil {
if errDownload == errRewriteRuleNotFound || errDownload == errRangeIsEmpty {
// Skip this region
continue
}
Expand All @@ -196,32 +196,68 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRul
zap.Stringer("region", info.Region),
zap.Binary("startKey", startKey),
zap.Binary("endKey", endKey),
zap.Error(err1))
return err1
zap.Error(errDownload))
return errDownload
}
err1 = importer.ingestSST(downloadMeta, info)
// If error is `NotLeader`, update the region info and retry
for errors.Cause(err1) == errNotLeader {
log.Debug("ingest sst returns not leader error, retry it",
zap.Stringer("region", info.Region))
var newInfo *RegionInfo
newInfo, err1 = importer.metaClient.GetRegion(importer.ctx, info.Region.GetStartKey())
if err1 != nil {
break
ingestResp, errIngest := importer.ingestSST(downloadMeta, info)
ingestRetry:
for errIngest == nil {
errPb := ingestResp.GetError()
if errPb == nil {
// Ingest success
break ingestRetry
}
if !checkRegionEpoch(newInfo, info) {
err1 = errEpochNotMatch
break
switch {
case errPb.NotLeader != nil:
// If error is `NotLeader`, update the region info and retry
var newInfo *RegionInfo
if newLeader := errPb.GetNotLeader().GetLeader(); newLeader != nil {
newInfo = &RegionInfo{
Leader: newLeader,
Region: info.Region,
}
} else {
// Slow path, get region from PD
newInfo, errIngest = importer.metaClient.GetRegion(
importer.ctx, info.Region.GetStartKey())
if errIngest != nil {
break ingestRetry
}
}
log.Debug("ingest sst returns not leader error, retry it",
zap.Stringer("region", info.Region),
zap.Stringer("newLeader", newInfo.Leader))

if !checkRegionEpoch(newInfo, info) {
errIngest = errors.AddStack(errEpochNotMatch)
break ingestRetry
}
ingestResp, errIngest = importer.ingestSST(downloadMeta, newInfo)
case errPb.EpochNotMatch != nil:
// TODO handle epoch not match error
// 1. retry download if needed
// 2. retry ingest
errIngest = errors.AddStack(errEpochNotMatch)
break ingestRetry
case errPb.RegionNotFound != nil:
errIngest = errors.AddStack(errRegionNotFound)
break ingestRetry
case errPb.KeyNotInRegion != nil:
errIngest = errors.AddStack(errKeyNotInRegion)
break ingestRetry
default:
errIngest = errors.Errorf("ingest error %s", errPb)
break ingestRetry
}
err1 = importer.ingestSST(downloadMeta, newInfo)
}
if err1 != nil {

if errIngest != nil {
log.Error("ingest file failed",
zap.Stringer("file", file),
zap.Stringer("range", downloadMeta.GetRange()),
zap.Stringer("region", info.Region),
zap.Error(err1))
return err1
zap.Error(errIngest))
return errIngest
}
summary.CollectSuccessUnit(summary.TotalKV, file.TotalKvs)
summary.CollectSuccessUnit(summary.TotalBytes, file.TotalBytes)
Expand Down Expand Up @@ -290,7 +326,7 @@ func (importer *FileImporter) downloadSST(
func (importer *FileImporter) ingestSST(
sstMeta *import_sstpb.SSTMeta,
regionInfo *RegionInfo,
) error {
) (*import_sstpb.IngestResponse, error) {
leader := regionInfo.Leader
if leader == nil {
leader = regionInfo.Region.GetPeers()[0]
Expand All @@ -304,26 +340,12 @@ func (importer *FileImporter) ingestSST(
Context: reqCtx,
Sst: sstMeta,
}
log.Debug("download SST", zap.Stringer("sstMeta", sstMeta))
log.Debug("ingest SST", zap.Stringer("sstMeta", sstMeta), zap.Reflect("leader", leader))
resp, err := importer.importClient.IngestSST(importer.ctx, leader.GetStoreId(), req)
if err != nil {
if strings.Contains(err.Error(), "RegionNotFound") {
return errors.Trace(errRegionNotFound)
}
return errors.Trace(err)
}
respErr := resp.GetError()
if respErr != nil {
log.Debug("ingest sst resp error", zap.Stringer("error", respErr))
if respErr.GetKeyNotInRegion() != nil {
return errors.Trace(errKeyNotInRegion)
}
if respErr.GetNotLeader() != nil {
return errors.Trace(errNotLeader)
}
return errors.Wrap(errResp, respErr.String())
return nil, errors.Trace(err)
}
return nil
return resp, nil
}

func checkRegionEpoch(new, old *RegionInfo) bool {
Expand All @@ -347,5 +369,5 @@ func extractDownloadSSTError(e error) error {
case strings.Contains(e.Error(), "Cannot read"):
err = errCannotRead
}
return errors.Trace(err)
return errors.Annotatef(err, "%s", e)
}
36 changes: 31 additions & 5 deletions tests/_utils/run_services
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ start_services() {
stop_services

echo "Starting PD..."
mkdir -p "$TEST_DIR/pd"
bin/pd-server \
--client-urls "http://$PD_ADDR" \
--log-file "$TEST_DIR/pd.log" \
Expand All @@ -56,6 +57,7 @@ start_services() {

echo "Starting TiKV..."
for i in $(seq $TIKV_COUNT); do
mkdir -p "$TEST_DIR/tikv${i}"
bin/tikv-server \
--pd "$PD_ADDR" \
-A "$TIKV_ADDR$i" \
Expand All @@ -64,7 +66,16 @@ start_services() {
-C "tests/config/tikv.toml" \
-s "$TEST_DIR/tikv${i}" &
done
sleep 1

echo "Waiting initializing TiKV..."
while ! curl -sf "http://$PD_ADDR/pd/api/v1/cluster/status" | grep '"is_initialized": true'; do
i=$((i+1))
if [ "$i" -gt 10 ]; then
echo 'Failed to initialize TiKV cluster'
exit 1
fi
sleep 3
done

echo "Starting TiDB..."
bin/tidb-server \
Expand Down Expand Up @@ -116,7 +127,8 @@ start_services_withTLS() {
--data-dir "$TEST_DIR/pd" &
# wait until PD is online...
i=0
while ! curl -k --cert $1/certificates/client.pem \
while ! curl --cacert $1/certificates/ca.pem \
--cert $1/certificates/client.pem \
--key $1/certificates/client-key.pem \
-o /dev/null -sf "https://$PD_ADDR/pd/api/v1/version"; do
i=$((i+1))
Expand All @@ -136,7 +148,19 @@ start_services_withTLS() {
-C "$TIKV_CONFIG" \
-s "$TEST_DIR/tikv${i}" &
done
sleep 1

echo "Waiting initializing TiKV..."
while ! curl --cacert $1/certificates/ca.pem \
--cert $1/certificates/client.pem \
--key $1/certificates/client-key.pem \
-sf "https://$PD_ADDR/pd/api/v1/cluster/status" | grep '"is_initialized": true'; do
i=$((i+1))
if [ "$i" -gt 10 ]; then
echo 'Failed to initialize TiKV cluster'
exit 1
fi
sleep 3
done

echo "Starting TiDB..."
bin/tidb-server \
Expand All @@ -149,7 +173,8 @@ start_services_withTLS() {

echo "Verifying TiDB is started..."
i=0
while ! curl -k --cert $1/certificates/client.pem \
while ! curl --cacert $1/certificates/ca.pem \
--cert $1/certificates/client.pem \
--key $1/certificates/client-key.pem \
-o /dev/null -sf "https://$TIDB_IP:10080/status"; do
i=$((i+1))
Expand All @@ -161,7 +186,8 @@ start_services_withTLS() {
done

i=0
while ! curl -k --cert $1/certificates/client.pem \
while ! curl --cacert $1/certificates/ca.pem \
--cert $1/certificates/client.pem \
--key $1/certificates/client-key.pem \
"https://$PD_ADDR/pd/api/v1/cluster/status" -sf | grep -q "\"is_initialized\": true"; do
i=$((i+1))
Expand Down
12 changes: 5 additions & 7 deletions tests/br_table_partition/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,23 @@ done
echo "backup start..."
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --ratelimit 5 --concurrency 4

for i in $(seq $DB_COUNT); do
run_sql "DROP DATABASE $DB${i};"
done
run_sql "DROP DATABASE $DB;"

# restore full
echo "restore start..."
run_br restore full -s "local://$TEST_DIR/$DB" --pd $PD_ADDR

for i in $(seq $DB_COUNT); do
for i in $(seq $TABLE_COUNT); do
row_count_new[${i}]=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE${i};" | awk '/COUNT/{print $2}')
done

fail=false
for i in $(seq $DB_COUNT); do
for i in $(seq $TABLE_COUNT); do
if [ "${row_count_ori[i]}" != "${row_count_new[i]}" ];then
fail=true
echo "TEST: [$TEST_NAME] fail on database $DB${i}"
echo "TEST: [$TEST_NAME] fail on table $DB.$TABLE${i}"
fi
echo "database $DB${i} [original] row count: ${row_count_ori[i]}, [after br] row count: ${row_count_new[i]}"
echo "table $DB.$TABLE${i} [original] row count: ${row_count_ori[i]}, [after br] row count: ${row_count_new[i]}"
done

if $fail; then
Expand Down