sink: fix issue, Fail-fast for unrecoverable DML errors (#1928) (#2316)
ti-chi-bot authored Jul 21, 2021
1 parent 47d9bff commit 26adda2
Showing 24 changed files with 476 additions and 255 deletions.
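The visible hunks (cdc/entry/schema_storage.go, cdc/kv/client.go, and the kv client benchmarks and tests) migrate callers from the old positional-argument helpers retry.Run and retry.RunWithInfiniteRetry to retry.Do(ctx, fn, opts...), which is configured through functional options such as retry.WithBackoffBaseDelay, retry.WithMaxTries, and retry.WithIsRetryableErr. The behavioral point behind the commit title is fail-fast: when the supplied classifier reports an error as non-retryable (for example an unrecoverable DML error hitting the sink), the loop returns immediately instead of exhausting its backoff budget. The sketch below is not the ticdc retry package; it is a minimal, self-contained illustration of that fail-fast shape, and every name in it (doWithRetry, errUnrecoverable, maxTries, baseDelay) is invented for the example.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errUnrecoverable stands in for an error class retrying cannot fix, e.g. a
// DML statement the downstream database will always reject.
var errUnrecoverable = errors.New("unrecoverable DML error")

// doWithRetry is a hypothetical, simplified analogue of retry.Do: it retries
// fn with a fixed delay, but returns as soon as isRetryable reports the error
// as permanent (fail fast) or the context is cancelled.
func doWithRetry(ctx context.Context, fn func() error, maxTries int, baseDelay time.Duration, isRetryable func(error) bool) error {
	var err error
	for i := 0; i < maxTries; i++ {
		if err = fn(); err == nil {
			return nil
		}
		if !isRetryable(err) {
			return err // fail fast: an unrecoverable error will not get better
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(baseDelay):
		}
	}
	return err
}

func main() {
	attempts := 0
	err := doWithRetry(context.Background(), func() error {
		attempts++
		return errUnrecoverable
	}, 100, 10*time.Millisecond, func(err error) bool {
		return !errors.Is(err, errUnrecoverable)
	})
	fmt.Println(attempts, err) // prints 1: the loop stopped after a single attempt
}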
31 changes: 10 additions & 21 deletions cdc/entry/schema_storage.go
@@ -20,7 +20,6 @@ import (
"sync/atomic"
"time"

"github.com/cenkalti/backoff"
"github.com/pingcap/errors"
"github.com/pingcap/log"
timodel "github.com/pingcap/parser/model"
@@ -719,31 +718,21 @@ func (s *schemaStorageImpl) GetSnapshot(ctx context.Context, ts uint64) (*schema

	// The infinite retry here is a temporary solution to the `ErrSchemaStorageUnresolved` caused by
	// DDL puller lagging too much.
-	err := retry.RunWithInfiniteRetry(10*time.Millisecond, func() error {
-		select {
-		case <-ctx.Done():
-			return errors.Trace(ctx.Err())
-		default:
-		}
+	startTime := time.Now()
+	err := retry.Do(ctx, func() error {
		var err error
		snap, err = s.getSnapshot(ts)
-		if cerror.ErrSchemaStorageUnresolved.NotEqual(err) {
-			return backoff.Permanent(err)
-		}
-
-		return err
-	}, func(elapsed time.Duration) {
-		if elapsed >= 5*time.Minute {
+		if time.Since(startTime) >= 5*time.Minute && isRetryable(err) {
			log.Warn("GetSnapshot is taking too long, DDL puller stuck?", zap.Uint64("ts", ts))
		}
-	})
+		return err
+	}, retry.WithBackoffBaseDelay(10), retry.WithInfiniteTries(), retry.WithIsRetryableErr(isRetryable))

-	switch e := err.(type) {
-	case *backoff.PermanentError:
-		return nil, e.Err
-	default:
-		return snap, err
-	}
+	return snap, err
+}
+
+func isRetryable(err error) bool {
+	return cerror.IsRetryableError(err) && cerror.ErrSchemaStorageUnresolved.Equal(err)
}

// GetLastSnapshot returns the last snapshot
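The net effect of the GetSnapshot change: the infinite retry is kept, but only for the one error it was meant to paper over. isRetryable returns true only for ErrSchemaStorageUnresolved (the DDL-puller-lagging case), so any other error now surfaces to the caller immediately, and the "taking too long" warning is driven by a wall-clock check against startTime rather than by a callback from the retry helper. A rough standalone sketch of that shape follows; errUnresolved, getSnapshot, and waitForSnapshot are stand-in names, and the 10ms poll interval simply mirrors the base delay in the diff.

package sketch

import (
	"context"
	"errors"
	"log"
	"time"
)

// errUnresolved stands in for cerror.ErrSchemaStorageUnresolved.
var errUnresolved = errors.New("schema storage unresolved")

// waitForSnapshot keeps polling getSnapshot for as long as the only problem is
// the "not yet resolved" case; any other error is returned to the caller at
// once, and a warning is logged if the wait exceeds five minutes.
func waitForSnapshot(ctx context.Context, getSnapshot func() (string, error)) (string, error) {
	start := time.Now()
	for {
		snap, err := getSnapshot()
		if err == nil {
			return snap, nil
		}
		if !errors.Is(err, errUnresolved) {
			return "", err // fail fast on anything retrying cannot fix
		}
		if time.Since(start) >= 5*time.Minute {
			log.Println("GetSnapshot is taking too long, DDL puller stuck?")
		}
		select {
		case <-ctx.Done():
			return "", ctx.Err()
		case <-time.After(10 * time.Millisecond): // mirrors the 10ms base delay in the diff
		}
	}
}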
58 changes: 25 additions & 33 deletions cdc/kv/client.go
@@ -55,7 +55,6 @@ import (

const (
dialTimeout = 10 * time.Second
- maxRetry = 100
tikvRequestMaxBackoff = 20000 // Maximum total sleep time(in ms)
// TODO find optimal values and test extensively before releasing
// The old values cause the gRPC stream to stall for some unknown reason.
@@ -447,7 +446,7 @@ func (c *CDCClient) getRegionLimiter(regionID uint64) *rate.Limiter {
}

func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64) (stream cdcpb.ChangeData_EventFeedClient, err error) {
- err = retry.Run(50*time.Millisecond, 3, func() error {
+ err = retry.Do(ctx, func() error {
conn, err := c.getConn(ctx, addr)
if err != nil {
log.Info("get connection to store failed, retry later", zap.String("addr", addr), zap.Error(err))
@@ -475,7 +474,7 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64)
}
log.Debug("created stream to store", zap.String("addr", addr))
return nil
- })
+ }, retry.WithBackoffBaseDelay(50), retry.WithMaxTries(3), retry.WithIsRetryableErr(cerror.IsRetryableError))
return
}

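Judging from the replacements above, the new options appear to take their delays as plain integers in milliseconds (retry.WithBackoffBaseDelay(50) stands where 50*time.Millisecond used to be), and the attempt limit moves from a positional argument to retry.WithMaxTries. The sketch below shows the functional-options pattern that style implies, with defaults applied before the loop runs. It is a generic illustration, not the ticdc retry package; the option names, the millisecond convention, and the default values here are assumptions.

package sketch

import (
	"context"
	"time"
)

type options struct {
	baseDelayMs int64
	maxTries    int64
	isRetryable func(error) bool
}

// Option mutates the retry configuration; callers pass any subset and the rest
// keep their defaults.
type Option func(*options)

func WithBackoffBaseDelay(ms int64) Option { return func(o *options) { o.baseDelayMs = ms } }
func WithMaxTries(n int64) Option          { return func(o *options) { o.maxTries = n } }
func WithIsRetryableErr(f func(error) bool) Option {
	return func(o *options) { o.isRetryable = f }
}

// Do runs fn until it succeeds, the error is classified as non-retryable, the
// context is cancelled, or maxTries is exhausted.
func Do(ctx context.Context, fn func() error, opts ...Option) error {
	o := &options{baseDelayMs: 500, maxTries: 8, isRetryable: func(error) bool { return true }}
	for _, opt := range opts {
		opt(o)
	}
	var err error
	for i := int64(0); i < o.maxTries; i++ {
		if err = fn(); err == nil || !o.isRetryable(err) {
			return err
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(time.Duration(o.baseDelayMs) * time.Millisecond):
		}
	}
	return err
}

With this shape, a call site like the one above reads Do(ctx, op, WithBackoffBaseDelay(50), WithMaxTries(3), WithIsRetryableErr(cerror.IsRetryableError)), and anything not specified keeps its default.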
@@ -1100,38 +1099,31 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions(
		regions []*tikv.Region
		err     error
	)
-	retryErr := retry.Run(50*time.Millisecond, maxRetry,
-		func() error {
-			select {
-			case <-ctx.Done():
-				return ctx.Err()
-			default:
-			}
-			scanT0 := time.Now()
-			bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
-			regions, err = s.regionCache.BatchLoadRegionsWithKeyRange(bo, nextSpan.Start, nextSpan.End, limit)
-			scanRegionsDuration.WithLabelValues(captureAddr).Observe(time.Since(scanT0).Seconds())
-			if err != nil {
-				return cerror.WrapError(cerror.ErrPDBatchLoadRegions, err)
-			}
-			metas := make([]*metapb.Region, 0, len(regions))
-			for _, region := range regions {
-				if region.GetMeta() == nil {
-					err = cerror.ErrMetaNotInRegion.GenWithStackByArgs()
-					log.Warn("batch load region", zap.Stringer("span", nextSpan), zap.Error(err))
-					return err
-				}
-				metas = append(metas, region.GetMeta())
-			}
-			if !regionspan.CheckRegionsLeftCover(metas, nextSpan) {
-				err = cerror.ErrRegionsNotCoverSpan.GenWithStackByArgs(nextSpan, metas)
-				log.Warn("ScanRegions", zap.Stringer("span", nextSpan), zap.Reflect("regions", metas), zap.Error(err))
-				return err
-			}
-			log.Debug("ScanRegions", zap.Stringer("span", nextSpan), zap.Reflect("regions", metas))
-			return nil
-		})
+	retryErr := retry.Do(ctx, func() error {
+		scanT0 := time.Now()
+		bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
+		regions, err = s.regionCache.BatchLoadRegionsWithKeyRange(bo, nextSpan.Start, nextSpan.End, limit)
+		scanRegionsDuration.WithLabelValues(captureAddr).Observe(time.Since(scanT0).Seconds())
+		if err != nil {
+			return cerror.WrapError(cerror.ErrPDBatchLoadRegions, err)
+		}
+		metas := make([]*metapb.Region, 0, len(regions))
+		for _, region := range regions {
+			if region.GetMeta() == nil {
+				err = cerror.ErrMetaNotInRegion.GenWithStackByArgs()
+				log.Warn("batch load region", zap.Stringer("span", nextSpan), zap.Error(err))
+				return err
+			}
+			metas = append(metas, region.GetMeta())
+		}
+		if !regionspan.CheckRegionsLeftCover(metas, nextSpan) {
+			err = cerror.ErrRegionsNotCoverSpan.GenWithStackByArgs(nextSpan, metas)
+			log.Warn("ScanRegions", zap.Stringer("span", nextSpan), zap.Reflect("regions", metas), zap.Error(err))
+			return err
+		}
+		log.Debug("ScanRegions", zap.Stringer("span", nextSpan), zap.Reflect("regions", metas))
+		return nil
+	}, retry.WithBackoffMaxDelay(50), retry.WithMaxTries(100), retry.WithIsRetryableErr(cerror.IsRetryableError))
	if retryErr != nil {
		return retryErr
	}
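The region-scan loop above also loses its hand-rolled ctx.Done() check (retry.Do now takes the context directly), and its attempt limit moves from the deleted maxRetry constant to retry.WithMaxTries(100), with sleeps capped by retry.WithBackoffMaxDelay(50) and retries attempted only when cerror.IsRetryableError accepts the error. As a back-of-envelope bound on what those numbers mean, the helper below sums a capped exponential backoff; the doubling curve and the 5ms starting delay are assumptions, since the diff only shows the cap and the try count.

package sketch

import "time"

// worstCaseSleep sums the sleeps of a capped exponential backoff: each delay
// doubles from base but is clamped to maxDelay, and no sleep follows the final
// attempt.
func worstCaseSleep(base, maxDelay time.Duration, maxTries int) time.Duration {
	var total time.Duration
	d := base
	for i := 0; i < maxTries-1; i++ {
		if d > maxDelay {
			d = maxDelay
		}
		total += d
		d *= 2
	}
	return total
}

// Example: worstCaseSleep(5*time.Millisecond, 50*time.Millisecond, 100) is
// roughly 4.8s (5+10+20+40 ms, then 95 sleeps capped at 50 ms), so the outer
// retry loop adds at most a few seconds on top of the 20s TiKV backoff budget
// (tikvRequestMaxBackoff) spent inside each attempt.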
17 changes: 8 additions & 9 deletions cdc/kv/client_bench_test.go
@@ -200,7 +200,7 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) (
}()

// wait all regions requested from cdc kv client
- err = retry.Run(time.Millisecond*500, 20, func() error {
+ err = retry.Do(context.Background(), func() error {
count := 0
requestIDs.Range(func(_, _ interface{}) bool {
count++
@@ -210,7 +210,7 @@
return nil
}
return errors.Errorf("region number %d is not as expected %d", count, regionNum)
- })
+ }, retry.WithBackoffBaseDelay(500), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(20))
if err != nil {
b.Error(err)
}
@@ -288,7 +288,7 @@ func prepareBench(b *testing.B, regionNum int) (
}()

// wait all regions requested from cdc kv client
- err = retry.Run(time.Millisecond*500, 20, func() error {
+ err = retry.Do(context.Background(), func() error {
count := 0
requestIDs.Range(func(_, _ interface{}) bool {
count++
@@ -298,7 +298,7 @@
return nil
}
return errors.Errorf("region number %d is not as expected %d", count, regionNum)
- })
+ }, retry.WithBackoffBaseDelay(500), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(20))
if err != nil {
b.Error(err)
}
@@ -373,13 +373,12 @@ func benchmarkSingleWorkerResolvedTs(b *testing.B, clientV2 bool) {
}
}
})

- err := retry.Run(time.Millisecond*500, 20, func() error {
+ err := retry.Do(context.Background(), func() error {
if len(mockSrvCh) == 0 {
return nil
}
return errors.New("not all events are sent yet")
- })
+ }, retry.WithBackoffBaseDelay(500), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(20))
if err != nil {
b.Error(err)
}
@@ -494,14 +493,14 @@ func benchmarkMultipleStoreResolvedTs(b *testing.B, clientV2 bool) {
}
})

- err := retry.Run(time.Millisecond*500, 1000, func() error {
+ err := retry.Do(context.Background(), func() error {
for _, input := range inputs {
if len(input) != 0 {
return errors.New("not all events are sent yet")
}
}
return nil
- })
+ }, retry.WithBackoffBaseDelay(500), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(1000))
if err != nil {
b.Error(err)
}
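The benchmark changes are mechanical: each busy-wait that previously used retry.Run now calls retry.Do with an explicit context, a base delay of 500 and a delay cap of 60*1000 (apparently milliseconds), and the same try count, polling until every region has issued a request or until all mock channels drain. A standalone sketch of that wait-until-condition pattern follows; waitUntil and countRequestedRegions are invented names, not ticdc helpers.

package sketch

import (
	"context"
	"fmt"
	"time"
)

// waitUntil polls cond every interval until it reports success, the context is
// cancelled, or maxTries attempts have been used up; the last failure reason is
// carried into the returned error.
func waitUntil(ctx context.Context, interval time.Duration, maxTries int, cond func() (bool, string)) error {
	var why string
	for i := 0; i < maxTries; i++ {
		var ok bool
		if ok, why = cond(); ok {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(interval):
		}
	}
	return fmt.Errorf("condition not met after %d tries: %s", maxTries, why)
}

// Usage mirroring the benchmark wait (countRequestedRegions and regionNum are
// hypothetical):
//
//	err := waitUntil(context.Background(), 500*time.Millisecond, 20, func() (bool, string) {
//		count := countRequestedRegions()
//		return count == regionNum, fmt.Sprintf("region number %d is not as expected %d", count, regionNum)
//	})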
60 changes: 36 additions & 24 deletions cdc/kv/client_test.go
@@ -288,12 +288,13 @@ func newMockServiceSpecificAddr(

// waitRequestID waits request ID larger than the given allocated ID
func waitRequestID(c *check.C, allocatedID uint64) {
- err := retry.Run(time.Millisecond*10, 20, func() error {
+ err := retry.Do(context.Background(), func() error {
if currentRequestID() > allocatedID {
return nil
}
return errors.Errorf("request id %d is not larger than %d", currentRequestID(), allocatedID)
- })
+ }, retry.WithBackoffBaseDelay(10), retry.WithMaxTries(20))

c.Assert(err, check.IsNil)
}

@@ -1203,14 +1204,15 @@ func (s *etcdSuite) TestStreamSendWithError(c *check.C) {
// The expected request ids are agnostic because the kv client could retry
// for more than one time, so we wait until the newly started server receives
// requests for both two regions.
- err = retry.Run(time.Millisecond*200, 10, func() error {
+ err = retry.Do(context.Background(), func() error {
_, ok1 := requestIds.Load(regionID3)
_, ok2 := requestIds.Load(regionID4)
if ok1 && ok2 {
return nil
}
return errors.New("waiting for kv client requests received by server")
- })
+ }, retry.WithBackoffBaseDelay(200), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(10))

c.Assert(err, check.IsNil)
reqID1, _ := requestIds.Load(regionID3)
reqID2, _ := requestIds.Load(regionID4)
@@ -1285,12 +1287,13 @@ func (s *etcdSuite) testStreamRecvWithError(c *check.C, failpointStr string) {
waitRequestID(c, baseAllocatedID+1)
initialized1 := mockInitializedEvent(regionID, currentRequestID())
ch1 <- initialized1
- err = retry.Run(time.Millisecond*200, 10, func() error {
+ err = retry.Do(context.Background(), func() error {
if len(ch1) == 0 {
return nil
}
return errors.New("message is not sent")
- })
+ }, retry.WithBackoffBaseDelay(200), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(10))

c.Assert(err, check.IsNil)

// another stream will be established, so we notify and wait the first
@@ -1410,22 +1413,24 @@ func (s *etcdSuite) TestStreamRecvWithErrorAndResolvedGoBack(c *check.C) {

// wait request id allocated with: new session, new request
waitRequestID(c, baseAllocatedID+1)
- err = retry.Run(time.Millisecond*50, 10, func() error {
+ err = retry.Do(context.Background(), func() error {
if atomic.LoadUint64(&requestID) == currentRequestID() {
return nil
}
return errors.Errorf("request is not received, requestID: %d, expected: %d",
atomic.LoadUint64(&requestID), currentRequestID())
- })
+ }, retry.WithBackoffBaseDelay(50), retry.WithMaxTries(10))

c.Assert(err, check.IsNil)
initialized1 := mockInitializedEvent(regionID, currentRequestID())
ch1 <- initialized1
- err = retry.Run(time.Millisecond*200, 10, func() error {
+ err = retry.Do(context.Background(), func() error {
if len(ch1) == 0 {
return nil
}
return errors.New("message is not sent")
- })
+ }, retry.WithBackoffBaseDelay(200), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(10))

c.Assert(err, check.IsNil)

resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{
@@ -1436,12 +1441,13 @@ func (s *etcdSuite) TestStreamRecvWithErrorAndResolvedGoBack(c *check.C) {
},
}}
ch1 <- resolved
- err = retry.Run(time.Millisecond*200, 10, func() error {
+ err = retry.Do(context.Background(), func() error {
if len(ch1) == 0 {
return nil
}
return errors.New("message is not sent")
- })
+ }, retry.WithBackoffBaseDelay(200), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(10))

c.Assert(err, check.IsNil)
err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError", "1*return(\"\")")
c.Assert(err, check.IsNil)
@@ -1461,22 +1467,24 @@ func (s *etcdSuite) TestStreamRecvWithErrorAndResolvedGoBack(c *check.C) {

// wait request id allocated with: new session, new request*2
waitRequestID(c, baseAllocatedID+2)
- err = retry.Run(time.Millisecond*50, 10, func() error {
+ err = retry.Do(context.Background(), func() error {
if atomic.LoadUint64(&requestID) == currentRequestID() {
return nil
}
return errors.Errorf("request is not received, requestID: %d, expected: %d",
atomic.LoadUint64(&requestID), currentRequestID())
- })
+ }, retry.WithBackoffBaseDelay(50), retry.WithMaxTries(10))

c.Assert(err, check.IsNil)
initialized2 := mockInitializedEvent(regionID, currentRequestID())
ch1 <- initialized2
- err = retry.Run(time.Millisecond*200, 10, func() error {
+ err = retry.Do(context.Background(), func() error {
if len(ch1) == 0 {
return nil
}
return errors.New("message is not sent")
- })
+ }, retry.WithBackoffBaseDelay(200), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(10))

c.Assert(err, check.IsNil)

resolved = &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{
@@ -1622,20 +1630,22 @@ func (s *etcdSuite) TestIncompatibleTiKV(c *check.C) {
cdcClient.Close() //nolint:errcheck
}()

- err = retry.Run(time.Millisecond*500, 20, func() error {
+ err = retry.Do(context.Background(), func() error {
if atomic.LoadInt32(&call) >= versionGenCallBoundary {
return nil
}
return errors.Errorf("version generator is not updated in time, call time %d", atomic.LoadInt32(&call))
- })
+ }, retry.WithBackoffBaseDelay(500), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(20))

c.Assert(err, check.IsNil)
- err = retry.Run(time.Millisecond*200, 10, func() error {
+ err = retry.Do(context.Background(), func() error {
_, ok := requestIds.Load(regionID)
if ok {
return nil
}
return errors.New("waiting for kv client requests received by server")
- })
+ }, retry.WithBackoffBaseDelay(200), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(10))

c.Assert(err, check.IsNil)
reqID, _ := requestIds.Load(regionID)
initialized := mockInitializedEvent(regionID, reqID.(uint64))
@@ -2830,13 +2840,14 @@ func (s *etcdSuite) testKVClientForceReconnect(c *check.C) {
// The second TiKV could start up slowly, which causes the kv client retries
// to TiKV for more than one time, so we can't determine the correct requestID
// here, we must use the real request ID received by TiKV server
- err = retry.Run(time.Millisecond*300, 10, func() error {
+ err = retry.Do(context.Background(), func() error {
_, ok := requestIds.Load(regionID3)
if ok {
return nil
}
return errors.New("waiting for kv client requests received by server")
- })
+ }, retry.WithBackoffBaseDelay(300), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(10))

c.Assert(err, check.IsNil)
requestID, _ := requestIds.Load(regionID3)

@@ -2949,7 +2960,7 @@ func (s *etcdSuite) TestConcurrentProcessRangeRequest(c *check.C) {
}

// wait for all regions requested from cdc kv client
- err = retry.Run(time.Millisecond*200, 20, func() error {
+ err = retry.Do(context.Background(), func() error {
count := 0
requestIDs.Range(func(_, _ interface{}) bool {
count++
Expand All @@ -2959,7 +2970,8 @@ func (s *etcdSuite) TestConcurrentProcessRangeRequest(c *check.C) {
return nil
}
return errors.Errorf("region number %d is not as expected %d", count, regionNum)
- })
+ }, retry.WithBackoffBaseDelay(200), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(20))

c.Assert(err, check.IsNil)

// send initialized event and a resolved ts event to each region
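The test changes follow the same recipe: bounded polling via retry.Do for "request ID advanced", "server saw requests for both regions", and "mock channel drained" conditions, each followed by c.Assert(err, check.IsNil) so a stuck stream fails the test with the last polling error instead of hanging. The helper below is a self-contained sketch of that bounded-wait assertion using the standard testing package rather than gocheck; waitDrained is a made-up name, not ticdc test code.

package sketch

import (
	"testing"
	"time"
)

// waitDrained is a bounded-wait assertion: poll until the buffered channel the
// mock server writes to has been drained, and fail the test if it never is.
func waitDrained(tb testing.TB, ch chan struct{}, tries int, interval time.Duration) {
	tb.Helper()
	for i := 0; i < tries; i++ {
		if len(ch) == 0 {
			return
		}
		time.Sleep(interval)
	}
	tb.Fatal("message is not sent") // same failure wording as the tests above
}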