diff --git a/pkg/election/leadership.go b/pkg/election/leadership.go index 0e4555040b4..e10650704e3 100644 --- a/pkg/election/leadership.go +++ b/pkg/election/leadership.go @@ -119,7 +119,7 @@ func (ls *Leadership) Campaign(leaseTimeout int64, leaderData string, cmps ...cl finalCmps = append(finalCmps, clientv3.Compare(clientv3.CreateRevision(ls.leaderKey), "=", 0)) resp, err := kv.NewSlowLogTxn(ls.client). If(finalCmps...). - Then(clientv3.OpPut(ls.leaderKey, leaderData, clientv3.WithLease(newLease.ID))). + Then(clientv3.OpPut(ls.leaderKey, leaderData, clientv3.WithLease(newLease.ID.Load().(clientv3.LeaseID)))). Commit() log.Info("check campaign resp", zap.Any("resp", resp)) if err != nil { diff --git a/pkg/election/lease.go b/pkg/election/lease.go index f7542bff042..a0db045256f 100644 --- a/pkg/election/lease.go +++ b/pkg/election/lease.go @@ -42,7 +42,7 @@ type lease struct { // etcd client and lease client *clientv3.Client lease clientv3.Lease - ID clientv3.LeaseID + ID atomic.Value // store as clientv3.LeaseID // leaseTimeout and expireTime are used to control the lease's lifetime leaseTimeout time.Duration expireTime atomic.Value @@ -64,7 +64,7 @@ func (l *lease) Grant(leaseTimeout int64) error { log.Warn("lease grants too slow", zap.Duration("cost", cost), zap.String("purpose", l.Purpose)) } log.Info("lease granted", zap.Int64("lease-id", int64(leaseResp.ID)), zap.Int64("lease-timeout", leaseTimeout), zap.String("purpose", l.Purpose)) - l.ID = leaseResp.ID + l.ID.Store(leaseResp.ID) l.leaseTimeout = time.Duration(leaseTimeout) * time.Second l.expireTime.Store(start.Add(time.Duration(leaseResp.TTL) * time.Second)) return nil @@ -80,7 +80,11 @@ func (l *lease) Close() error { // Try to revoke lease to make subsequent elections faster. ctx, cancel := context.WithTimeout(l.client.Ctx(), revokeLeaseTimeout) defer cancel() - l.lease.Revoke(ctx, l.ID) + var leaseID clientv3.LeaseID + if l.ID.Load() != nil { + leaseID = l.ID.Load().(clientv3.LeaseID) + } + l.lease.Revoke(ctx, leaseID) return l.lease.Close() } @@ -145,7 +149,11 @@ func (l *lease) keepAliveWorker(ctx context.Context, interval time.Duration) <-c start := time.Now() ctx1, cancel := context.WithTimeout(ctx, l.leaseTimeout) defer cancel() - res, err := l.lease.KeepAliveOnce(ctx1, l.ID) + var leaseID clientv3.LeaseID + if l.ID.Load() != nil { + leaseID = l.ID.Load().(clientv3.LeaseID) + } + res, err := l.lease.KeepAliveOnce(ctx1, leaseID) if err != nil { log.Warn("lease keep alive failed", zap.String("purpose", l.Purpose), errs.ZapError(err)) return diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index 7034aa31547..93babbbfb44 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -208,7 +208,7 @@ func (suite *tsoClientTestSuite) TestUpdateAfterResetTSO() { _, _, err := client.GetTS(ctx) return err == nil }) - // Transfer leader to trigger the TSO resetting. + // Resign leader to trigger the TSO resetting. re.NoError(failpoint.Enable("github.com/tikv/pd/server/updateAfterResetTSO", "return(true)")) oldLeaderName := suite.cluster.WaitLeader() err := suite.cluster.GetServer(oldLeaderName).ResignLeader()