diff --git a/ddl/ddl.go b/ddl/ddl.go index 21edcd8590d8e..d5336e360957d 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -476,6 +476,14 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error { // If RunWorker is true, we need campaign owner and do DDL job. // Otherwise, we needn't do that. if RunWorker { + d.ownerManager.SetBeOwnerHook(func() { + var err error + d.ddlSeqNumMu.seqNum, err = d.GetNextDDLSeqNum() + if err != nil { + logutil.BgLogger().Error("error when getting the ddl history count", zap.Error(err)) + } + }) + err := d.ownerManager.CampaignOwner() if err != nil { return errors.Trace(err) @@ -497,11 +505,6 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error { asyncNotify(worker.ddlJobCh) } - d.ddlSeqNumMu.seqNum, err = d.GetNextDDLSeqNum() - if err != nil { - return err - } - go d.schemaSyncer.StartCleanWork() if config.TableLockEnabled() { d.wg.Add(1) diff --git a/owner/manager.go b/owner/manager.go index f90dd4cebdd2d..1f4eae6c11786 100644 --- a/owner/manager.go +++ b/owner/manager.go @@ -54,6 +54,9 @@ type Manager interface { Cancel() // RequireOwner requires the ownerManager is owner. RequireOwner(ctx context.Context) error + + // SetBeOwnerHook sets a hook. The hook is called before becoming an owner. + SetBeOwnerHook(hook func()) } const ( @@ -68,16 +71,17 @@ type DDLOwnerChecker interface { // ownerManager represents the structure which is used for electing owner. type ownerManager struct { - id string // id is the ID of the manager. - key string - ctx context.Context - prompt string - logPrefix string - logCtx context.Context - etcdCli *clientv3.Client - cancel context.CancelFunc - elec unsafe.Pointer - wg sync.WaitGroup + id string // id is the ID of the manager. + key string + ctx context.Context + prompt string + logPrefix string + logCtx context.Context + etcdCli *clientv3.Client + cancel context.CancelFunc + elec unsafe.Pointer + wg sync.WaitGroup + beOwnerHook func() } // NewOwnerManager creates a new Manager. @@ -117,6 +121,10 @@ func (m *ownerManager) RequireOwner(ctx context.Context) error { return nil } +func (m *ownerManager) SetBeOwnerHook(hook func()) { + m.beOwnerHook = hook +} + // ManagerSessionTTL is the etcd session's TTL in seconds. It's exported for testing. var ManagerSessionTTL = 60 @@ -166,6 +174,9 @@ func (m *ownerManager) ResignOwner(ctx context.Context) error { } func (m *ownerManager) toBeOwner(elec *concurrency.Election) { + if m.beOwnerHook != nil { + m.beOwnerHook() + } atomic.StorePointer(&m.elec, unsafe.Pointer(elec)) } diff --git a/owner/mock.go b/owner/mock.go index c13ff88f3fdf6..559c46650d080 100644 --- a/owner/mock.go +++ b/owner/mock.go @@ -27,9 +27,10 @@ var _ Manager = &mockManager{} // It's used for local store and testing. // So this worker will always be the owner. type mockManager struct { - owner int32 - id string // id is the ID of manager. - cancel context.CancelFunc + owner int32 + id string // id is the ID of manager. + cancel context.CancelFunc + beOwnerHook func() } // NewMockManager creates a new mock Manager. @@ -52,6 +53,9 @@ func (m *mockManager) IsOwner() bool { } func (m *mockManager) toBeOwner() { + if m.beOwnerHook != nil { + m.beOwnerHook() + } atomic.StoreInt32(&m.owner, 1) } @@ -91,3 +95,7 @@ func (m *mockManager) ResignOwner(ctx context.Context) error { func (m *mockManager) RequireOwner(context.Context) error { return nil } + +func (m *mockManager) SetBeOwnerHook(hook func()) { + m.beOwnerHook = hook +} diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index 2832abe60ecb1..0b217583bb9b3 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -860,7 +860,9 @@ func (worker *copIteratorWorker) handleCopPagingResult(bo *Backoffer, rpcCtx *ti pagingRange := resp.pbResp.Range // only paging requests need to calculate the next ranges if pagingRange == nil { - return nil, errors.New("lastRange in paging should not be nil") + // If the storage engine doesn't support paging protocol, it should have return all the region data. + // So we finish here. + return nil, nil } // calculate next ranges and grow the paging size task.ranges = worker.calculateRemain(task.ranges, pagingRange, worker.req.Desc)