From 43b0914783e968d1a0a6ef62cbb843725208c392 Mon Sep 17 00:00:00 2001 From: glorv Date: Mon, 28 Sep 2020 15:40:19 +0800 Subject: [PATCH] backend: always retry ingest and get region if it's retryable (#405) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * retry get region if no region leader is available * always retry if get region return nil * always retry if get region returns nil Co-authored-by: 山岚 <36239017+YuJuncen@users.noreply.github.com> Co-authored-by: kennytm --- lightning/backend/local.go | 32 +++++++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/lightning/backend/local.go b/lightning/backend/local.go index ba15ca382..8f55be935 100644 --- a/lightning/backend/local.go +++ b/lightning/backend/local.go @@ -878,13 +878,15 @@ func (local *local) WriteAndIngestPairs( for _, meta := range metas { var err error - for i := 0; i < maxRetryTimes; i++ { + errCnt := 0 + for errCnt < maxRetryTimes { log.L().Debug("ingest meta", zap.Reflect("meta", meta)) var resp *sst.IngestResponse resp, err = local.Ingest(ctx, meta, region) if err != nil { log.L().Warn("ingest failed", zap.Error(err), zap.Reflect("meta", meta), zap.Reflect("region", region)) + errCnt++ continue } failpoint.Inject("FailIngestMeta", func(val failpoint.Value) { @@ -899,7 +901,7 @@ func (local *local) WriteAndIngestPairs( }) var needRetry bool var newRegion *split.RegionInfo - needRetry, newRegion, err = isIngestRetryable(resp, region, meta) + needRetry, newRegion, err = local.isIngestRetryable(ctx, resp, region, meta) if err == nil { // ingest next meta break @@ -921,7 +923,7 @@ func (local *local) WriteAndIngestPairs( } } if err != nil { - log.L().Error("all retry ingest failed", zap.Reflect("ingest meta", meta), zap.Error(err)) + log.L().Warn("all retry ingest failed", zap.Reflect("ingest meta", meta), zap.Error(err)) return remainRange, errors.Trace(err) } } @@ -1114,7 +1116,12 @@ func (local *local) NewEncoder(tbl 
table.Table, options *SessionOptions) Encoder return NewTableKVEncoder(tbl, options) } -func isIngestRetryable(resp *sst.IngestResponse, region *split.RegionInfo, meta *sst.SSTMeta) (bool, *split.RegionInfo, error) { +func (local *local) isIngestRetryable( + ctx context.Context, + resp *sst.IngestResponse, + region *split.RegionInfo, + meta *sst.SSTMeta, +) (bool, *split.RegionInfo, error) { if resp.GetError() == nil { return false, nil, nil } @@ -1127,8 +1134,23 @@ func isIngestRetryable(resp *sst.IngestResponse, region *split.RegionInfo, meta Leader: newLeader, Region: region.Region, } - return true, newRegion, errors.Errorf("not leader: %s", errPb.GetMessage()) + } else { + var err error + for i := 0; ; i++ { + newRegion, err = local.splitCli.GetRegion(ctx, region.Region.GetStartKey()) + if err != nil { + return false, nil, errors.Trace(err) + } + if newRegion == nil { + log.L().Warn("get region by key return nil, will retry", zap.Reflect("region", region), + zap.Int("retry", i)) + time.Sleep(time.Second) + continue + } + } } + canRetry := newRegion != nil + return canRetry, newRegion, errors.Errorf("not leader: %s", errPb.GetMessage()) case errPb.EpochNotMatch != nil: if currentRegions := errPb.GetEpochNotMatch().GetCurrentRegions(); currentRegions != nil { var currentRegion *metapb.Region