diff --git a/pkg/election/leadership.go b/pkg/election/leadership.go index 04b06e1aa30..06aa598ca68 100644 --- a/pkg/election/leadership.go +++ b/pkg/election/leadership.go @@ -188,6 +188,10 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) { return } + unhealthyTimeout := watchLoopUnhealthyTimeout + failpoint.Inject("fastTick", func() { + unhealthyTimeout = 5 * time.Second + }) ticker := time.NewTicker(etcdutil.RequestProgressInterval) defer ticker.Stop() lastReceivedResponseTime := time.Now() @@ -213,7 +217,7 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) { // When etcd is not available, the watcher.Watch will block, // so we check the etcd availability first. if !etcdutil.IsHealthy(serverCtx, ls.client) { - if time.Since(lastReceivedResponseTime) > watchLoopUnhealthyTimeout { + if time.Since(lastReceivedResponseTime) > unhealthyTimeout { log.Error("the connect of leadership watcher is unhealthy", zap.Int64("revision", revision), zap.String("leader-key", ls.leaderKey), diff --git a/pkg/election/leadership_test.go b/pkg/election/leadership_test.go index 1021462a902..6825c1602f3 100644 --- a/pkg/election/leadership_test.go +++ b/pkg/election/leadership_test.go @@ -16,6 +16,7 @@ package election import ( "context" + "fmt" "testing" "time" @@ -156,6 +157,29 @@ func TestExitWatch(t *testing.T) { time.Sleep(500 * time.Millisecond) server.Close() }) + // Case6: transfer leader without client reconnection. + checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) { + cfg1 := server.Config() + cfg2 := etcdutil.NewTestSingleConfig(t) + cfg2.InitialCluster = cfg1.InitialCluster + fmt.Sprintf(",%s=%s", cfg2.Name, &cfg2.LPUrls[0]) + cfg2.ClusterState = embed.ClusterStateFlagExisting + peerURL := cfg2.LPUrls[0].String() + addResp, err := etcdutil.AddEtcdMember(client, []string{peerURL}) + re.NoError(err) + etcd2, err := embed.StartEtcd(cfg2) + re.NoError(err) + re.Equal(uint64(etcd2.Server.ID()), addResp.Member.ID) + <-etcd2.Server.ReadyNotify() + ep := cfg2.LCUrls[0].String() + client1, err := clientv3.New(clientv3.Config{ + Endpoints: []string{ep}, + }) + re.NoError(err) + + server.Server.HardStop() + client1.Delete(context.Background(), leaderKey) + }) + // TODO: add test to simulate the case that pd leader is io hang. re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/election/fastTick")) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick")) }