Skip to content

Commit

Permalink
backend: always retry ingest and get region if it's retryble (pingcap…
Browse files Browse the repository at this point in the history
…#405)

* retry get region if not region leader available

* alway retry if get region return nil

* always retry if get region returns nil

Co-authored-by: 山岚 <36239017+YuJuncen@users.noreply.github.com>
Co-authored-by: kennytm <kennytm@gmail.com>
  • Loading branch information
3 people authored Sep 28, 2020
1 parent bce9977 commit 43b0914
Showing 1 changed file with 27 additions and 5 deletions.
32 changes: 27 additions & 5 deletions lightning/backend/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,13 +878,15 @@ func (local *local) WriteAndIngestPairs(

for _, meta := range metas {
var err error
for i := 0; i < maxRetryTimes; i++ {
errCnt := 0
for errCnt < maxRetryTimes {
log.L().Debug("ingest meta", zap.Reflect("meta", meta))
var resp *sst.IngestResponse
resp, err = local.Ingest(ctx, meta, region)
if err != nil {
log.L().Warn("ingest failed", zap.Error(err), zap.Reflect("meta", meta),
zap.Reflect("region", region))
errCnt++
continue
}
failpoint.Inject("FailIngestMeta", func(val failpoint.Value) {
Expand All @@ -899,7 +901,7 @@ func (local *local) WriteAndIngestPairs(
})
var needRetry bool
var newRegion *split.RegionInfo
needRetry, newRegion, err = isIngestRetryable(resp, region, meta)
needRetry, newRegion, err = local.isIngestRetryable(ctx, resp, region, meta)
if err == nil {
// ingest next meta
break
Expand All @@ -921,7 +923,7 @@ func (local *local) WriteAndIngestPairs(
}
}
if err != nil {
log.L().Error("all retry ingest failed", zap.Reflect("ingest meta", meta), zap.Error(err))
log.L().Warn("all retry ingest failed", zap.Reflect("ingest meta", meta), zap.Error(err))
return remainRange, errors.Trace(err)
}
}
Expand Down Expand Up @@ -1114,7 +1116,12 @@ func (local *local) NewEncoder(tbl table.Table, options *SessionOptions) Encoder
return NewTableKVEncoder(tbl, options)
}

func isIngestRetryable(resp *sst.IngestResponse, region *split.RegionInfo, meta *sst.SSTMeta) (bool, *split.RegionInfo, error) {
func (local *local) isIngestRetryable(
ctx context.Context,
resp *sst.IngestResponse,
region *split.RegionInfo,
meta *sst.SSTMeta,
) (bool, *split.RegionInfo, error) {
if resp.GetError() == nil {
return false, nil, nil
}
Expand All @@ -1127,8 +1134,23 @@ func isIngestRetryable(resp *sst.IngestResponse, region *split.RegionInfo, meta
Leader: newLeader,
Region: region.Region,
}
return true, newRegion, errors.Errorf("not leader: %s", errPb.GetMessage())
} else {
var err error
for i := 0; ; i++ {
newRegion, err = local.splitCli.GetRegion(ctx, region.Region.GetStartKey())
if err != nil {
return false, nil, errors.Trace(err)
}
if newRegion == nil {
log.L().Warn("get region by key return nil, will retry", zap.Reflect("region", region),
zap.Int("retry", i))
time.Sleep(time.Second)
continue
}
}
}
canRetry := newRegion != nil
return canRetry, newRegion, errors.Errorf("not leader: %s", errPb.GetMessage())
case errPb.EpochNotMatch != nil:
if currentRegions := errPb.GetEpochNotMatch().GetCurrentRegions(); currentRegions != nil {
var currentRegion *metapb.Region
Expand Down

0 comments on commit 43b0914

Please sign in to comment.