Skip to content

Commit

Permalink
owner: fix cleanup stale tasks before capture info is constructed (#1269) (#1280)
Browse files Browse the repository at this point in the history

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
ti-srebot authored Jan 6, 2021
1 parent 2f9c29d commit c1c91a1
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 2 deletions.
19 changes: 18 additions & 1 deletion cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"math"
"os"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -68,7 +69,8 @@ type Owner struct {
pdClient pd.Client
etcdClient kv.CDCEtcdClient

captures map[model.CaptureID]*model.CaptureInfo
captureLoaded int32
captures map[model.CaptureID]*model.CaptureInfo

adminJobs []model.AdminJob
adminJobsLock sync.Mutex
Expand Down Expand Up @@ -1116,6 +1118,12 @@ func (o *Owner) watchFeedChange(ctx context.Context) {
}

func (o *Owner) run(ctx context.Context) error {
// captureLoaded == 0 means capture information is not built, owner can't
// run normal jobs now.
if atomic.LoadInt32(&o.captureLoaded) == int32(0) {
return nil
}

o.l.Lock()
defer o.l.Unlock()

Expand Down Expand Up @@ -1373,6 +1381,15 @@ func (o *Owner) rebuildCaptureEvents(ctx context.Context, captures map[model.Cap
o.removeCapture(c)
}
}
// captureLoaded indicates whether the owner can safely run the stale-task cleanup job.
// A newly elected owner has no capture information in memory at first, so cleaning up
// stale tasks at that point could produce a false positive (where positive means the
// owner should clean up the stale task of a specific capture). After the first capture
// rebuild, even if etcd compaction happens and the capture watch is rerun, we don't
// need to check captureLoaded anymore because existing tasks must belong to a capture
// that is still maintained in the owner's memory.
atomic.StoreInt32(&o.captureLoaded, 1)

// clean up stale tasks each time before watch capture event starts,
// for two reasons:
// 1. when a new owner is elected, it must clean up stale task status and positions.
Expand Down
14 changes: 13 additions & 1 deletion cdc/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"net/url"
"sync"
"sync/atomic"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -906,13 +907,24 @@ func (s *ownerSuite) TestCleanUpStaleTasks(c *check.C) {
orphanTables: make(map[model.TableID]model.Ts),
},
}

// capture information is not built, owner.run does nothing
err = owner.run(ctx)
c.Assert(err, check.IsNil)
statuses, err := s.client.GetAllTaskStatus(ctx, changefeed)
c.Assert(err, check.IsNil)
// stale tasks are not cleaned up, since `cleanUpStaleTasks` does not run
c.Assert(len(statuses), check.Equals, 2)
c.Assert(len(owner.captures), check.Equals, 0)

err = owner.rebuildCaptureEvents(ctx, captures)
c.Assert(err, check.IsNil)
c.Assert(len(owner.captures), check.Equals, 1)
c.Assert(owner.captures, check.HasKey, capture.info.ID)
c.Assert(atomic.LoadInt32(&owner.captureLoaded), check.Equals, int32(1))
c.Assert(owner.changeFeeds[changefeed].orphanTables, check.DeepEquals, map[model.TableID]model.Ts{51: 100})
// check stale tasks are cleaned up
statuses, err := s.client.GetAllTaskStatus(ctx, changefeed)
statuses, err = s.client.GetAllTaskStatus(ctx, changefeed)
c.Assert(err, check.IsNil)
c.Assert(len(statuses), check.Equals, 1)
c.Assert(statuses, check.HasKey, capture.info.ID)
Expand Down

0 comments on commit c1c91a1

Please sign in to comment.