diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go index e1f7a539ae6a..885afc5ed8aa 100644 --- a/pkg/utils/etcdutil/etcdutil.go +++ b/pkg/utils/etcdutil/etcdutil.go @@ -394,6 +394,9 @@ type LoopWatcher struct { loadBatchSize int64 // watchChangeRetryInterval is used to set the retry interval for watching etcd change. watchChangeRetryInterval time.Duration + // updateClientCh is used to update the etcd client. + // It's only used for testing. + updateClientCh chan *clientv3.Client } // NewLoopWatcher creates a new LoopWatcher. @@ -407,6 +410,7 @@ func NewLoopWatcher(ctx context.Context, wg *sync.WaitGroup, client *clientv3.Cl wg: wg, forceLoadCh: make(chan struct{}, 1), isLoadedCh: make(chan error, 1), + updateClientCh: make(chan *clientv3.Client, 1), putFn: putFn, deleteFn: deleteFn, postEventFn: postEventFn, @@ -445,6 +449,9 @@ func (lw *LoopWatcher) StartWatchLoop() { zap.Error(err)) watchStartRevision = nextRevision time.Sleep(lw.watchChangeRetryInterval) + failpoint.Inject("updateClient", func(val failpoint.Value) { + lw.client = <-lw.updateClientCh + }) } } } diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go index 0ab5d40b6243..4258dc60a2ca 100644 --- a/pkg/utils/etcdutil/etcdutil_test.go +++ b/pkg/utils/etcdutil/etcdutil_test.go @@ -318,6 +318,9 @@ func TestEtcdScaleInAndOutWithoutMultiPoint(t *testing.T) { // Add a new member and check members etcd2 := checkAddEtcdMember(t, cfg1, client1) + defer func() { + etcd2.Close() + }() checkMembers(re, client2, []*embed.Etcd{etcd1, etcd2}) // scale in etcd1 @@ -435,6 +438,7 @@ func ioCopy(dst io.Writer, src io.Reader, enableDiscard *atomic.Bool) (err error for { if enableDiscard.Load() { io.Copy(io.Discard, src) + continue } readNum, errRead := src.Read(buffer) if readNum > 0 { @@ -671,6 +675,9 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() { suite.NoError(err) checkCache("") + // we use close client and update client in failpoint to simulate 
the network error and recovery + failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/updateClient", "return(true)") + // Case1: restart the etcd server suite.etcd.Close() suite.startEtcd() @@ -681,7 +688,7 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() { suite.client.Close() suite.client, err = createEtcdClient(nil, suite.config.LCUrls[0]) suite.NoError(err) - watcher.client = suite.client + watcher.updateClientCh <- suite.client suite.put("TestWatcherBreak", "2") checkCache("2") @@ -690,7 +697,7 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() { suite.client, err = createEtcdClient(nil, suite.config.LCUrls[0]) suite.NoError(err) suite.put("TestWatcherBreak", "3") - watcher.client = suite.client + watcher.updateClientCh <- suite.client checkCache("3") // Case4: close the etcd client and put a new value with compact @@ -704,7 +711,7 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() { resp2, err := suite.etcd.Server.Compact(suite.ctx, &etcdserverpb.CompactionRequest{Revision: revision}) suite.NoError(err) suite.Equal(revision, resp2.Header.Revision) - watcher.client = suite.client + watcher.updateClientCh <- suite.client checkCache("4") // Case5: there is an error data in cache @@ -713,6 +720,8 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() { cache.Unlock() watcher.ForceLoad() checkCache("4") + + failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/updateClient") } func (suite *loopWatcherTestSuite) startEtcd() {