diff --git a/cdc/owner.go b/cdc/owner.go index 2d4e98e5d90..a68591cbe09 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -20,6 +20,7 @@ import ( "math" "os" "sync" + "sync/atomic" "time" "github.com/pingcap/errors" @@ -68,7 +69,8 @@ type Owner struct { pdClient pd.Client etcdClient kv.CDCEtcdClient - captures map[model.CaptureID]*model.CaptureInfo + captureLoaded int32 + captures map[model.CaptureID]*model.CaptureInfo adminJobs []model.AdminJob adminJobsLock sync.Mutex @@ -1116,6 +1118,12 @@ func (o *Owner) watchFeedChange(ctx context.Context) { } func (o *Owner) run(ctx context.Context) error { + // captureLoaded == 0 means capture information is not built, owner can't + // run normal jobs now. + if atomic.LoadInt32(&o.captureLoaded) == int32(0) { + return nil + } + o.l.Lock() defer o.l.Unlock() @@ -1373,6 +1381,15 @@ func (o *Owner) rebuildCaptureEvents(ctx context.Context, captures map[model.Cap o.removeCapture(c) } } + // captureLoaded is used to check whether the owner can execute cleanup stale tasks job. + // Because at the very beginning of a new owner, it doesn't have capture information in + // memory, cleanup stale tasks could have a false positive (where positive means owner + // should cleanup the stale task of a specific capture). After the first time of capture + // rebuild, even the etcd compaction and watch capture is rerun, we don't need to check + // captureLoaded anymore because existing tasks must belong to a capture which is still + // maintained in owner's memory. + atomic.StoreInt32(&o.captureLoaded, 1) + // clean up stale tasks each time before watch capture event starts, // for two reasons: // 1. when a new owner is elected, it must clean up stale task status and positions. diff --git a/cdc/owner_test.go b/cdc/owner_test.go index cc8dd4eeefe..1002d781592 100644 --- a/cdc/owner_test.go +++ b/cdc/owner_test.go @@ -18,6 +18,7 @@ import ( "fmt" "net/url" "sync" + "sync/atomic" "time" "github.com/google/uuid" @@ -906,13 +907,24 @@ func (s *ownerSuite) TestCleanUpStaleTasks(c *check.C) { orphanTables: make(map[model.TableID]model.Ts), }, } + + // capture information is not built, owner.run does nothing + err = owner.run(ctx) + c.Assert(err, check.IsNil) + statuses, err := s.client.GetAllTaskStatus(ctx, changefeed) + c.Assert(err, check.IsNil) + // stale tasks are not cleaned up, since `cleanUpStaleTasks` does not run + c.Assert(len(statuses), check.Equals, 2) + c.Assert(len(owner.captures), check.Equals, 0) + err = owner.rebuildCaptureEvents(ctx, captures) c.Assert(err, check.IsNil) c.Assert(len(owner.captures), check.Equals, 1) c.Assert(owner.captures, check.HasKey, capture.info.ID) + c.Assert(atomic.LoadInt32(&owner.captureLoaded), check.Equals, int32(1)) c.Assert(owner.changeFeeds[changefeed].orphanTables, check.DeepEquals, map[model.TableID]model.Ts{51: 100}) // check stale tasks are cleaned up - statuses, err := s.client.GetAllTaskStatus(ctx, changefeed) + statuses, err = s.client.GetAllTaskStatus(ctx, changefeed) c.Assert(err, check.IsNil) c.Assert(len(statuses), check.Equals, 1) c.Assert(statuses, check.HasKey, capture.info.ID)