From f74a3029e1967190f61b37e408d1ef2605bbb984 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Sun, 20 Jun 2021 18:56:52 +0800 Subject: [PATCH] kv_client: fix unit tests deadlocked when panicking (#2050) --- cdc/kv/client_test.go | 45 ++++++++++++++++++++++--------------------- 1 file changed, 23 insertions(+), 22 deletions(-) diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index 9fe034b71f3..fb58e847084 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -282,9 +282,9 @@ func newMockServiceSpecificAddr( cdcpb.RegisterChangeDataServer(grpcServer, srv) wg.Add(1) go func() { + defer wg.Done() err := grpcServer.Serve(lis) c.Assert(err, check.IsNil) - wg.Done() }() return } @@ -336,9 +336,9 @@ func (s *etcdSuite) TestConnectOfflineTiKV(c *check.C) { eventCh := make(chan *model.RegionFeedEvent, 10) wg.Add(1) go func() { + defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 1, false, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) - wg.Done() }() // new session, request to store 1, request to store 2 @@ -425,10 +425,10 @@ func (s *etcdSuite) TestRecvLargeMessageSize(c *check.C) { eventCh := make(chan *model.RegionFeedEvent, 10) wg.Add(1) go func() { + defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 1, false, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) cdcClient.Close() //nolint:errcheck - wg.Done() }() // new session, new request @@ -514,10 +514,10 @@ func (s *etcdSuite) TestHandleError(c *check.C) { eventCh := make(chan *model.RegionFeedEvent, 10) wg.Add(1) go func() { + defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) cdcClient.Close() //nolint:errcheck - wg.Done() }() // wait request id allocated with: new session, new request @@ -670,10 +670,10 @@ func (s *etcdSuite) TestCompatibilityWithSameConn(c *check.C) { var wg2 sync.WaitGroup wg2.Add(1) go func() { + defer wg2.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) c.Assert(cerror.ErrVersionIncompatible.Equal(err), check.IsTrue) cdcClient.Close() //nolint:errcheck - wg2.Done() }() // wait request id allocated with: new session, new request @@ -729,10 +729,10 @@ func (s *etcdSuite) testHandleFeedEvent(c *check.C) { eventCh := make(chan *model.RegionFeedEvent, 10) wg.Add(1) go func() { + defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) cdcClient.Close() //nolint:errcheck - wg.Done() }() // wait request id allocated with: new session, new request @@ -1136,6 +1136,7 @@ func (s *etcdSuite) TestStreamSendWithError(c *check.C) { ctx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} defer wg.Wait() + defer cancel() server1Stopped := make(chan struct{}) ch1 := make(chan *cdcpb.ChangeDataEvent, 10) @@ -1175,10 +1176,10 @@ func (s *etcdSuite) TestStreamSendWithError(c *check.C) { eventCh := make(chan *model.RegionFeedEvent, 10) wg.Add(1) go func() { + defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) cdcClient.Close() //nolint:errcheck - wg.Done() }() var requestIds sync.Map @@ -1284,10 +1285,10 @@ func (s *etcdSuite) testStreamRecvWithError(c *check.C, failpointStr string) { eventCh := make(chan *model.RegionFeedEvent, 40) wg.Add(1) go func() { + defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) cdcClient.Close() //nolint:errcheck - wg.Done() }() // wait request id allocated with: new session, new request @@ -1633,10 +1634,10 @@ func (s *etcdSuite) TestIncompatibleTiKV(c *check.C) { eventCh := make(chan *model.RegionFeedEvent, 10) wg.Add(1) go func() { + defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) cdcClient.Close() //nolint:errcheck - wg.Done() }() err = retry.Run(time.Millisecond*500, 20, func() error { @@ -1726,10 +1727,10 @@ func (s *etcdSuite) TestNoPendingRegionError(c *check.C) { var wg2 sync.WaitGroup wg2.Add(1) go func() { + defer wg2.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) c.Assert(cerror.ErrNoPendingRegion.Equal(err), check.IsTrue) cdcClient.Close() //nolint:errcheck - wg2.Done() }() // wait request id allocated with: new session, new request @@ -1782,10 +1783,10 @@ func (s *etcdSuite) TestDropStaleRequest(c *check.C) { eventCh := make(chan *model.RegionFeedEvent, 10) wg.Add(1) go func() { + defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) cdcClient.Close() //nolint:errcheck - wg.Done() }() // wait request id allocated with: new session, new request @@ -1889,10 +1890,10 @@ func (s *etcdSuite) TestResolveLock(c *check.C) { eventCh := make(chan *model.RegionFeedEvent, 10) wg.Add(1) go func() { + defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) cdcClient.Close() //nolint:errcheck - wg.Done() }() // wait request id allocated with: new session, new request @@ -1987,10 +1988,10 @@ func (s *etcdSuite) testEventCommitTsFallback(c *check.C, events []*cdcpb.Change var clientWg sync.WaitGroup clientWg.Add(1) go func() { + defer clientWg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) c.Assert(err, check.Equals, errUnreachable) cdcClient.Close() //nolint:errcheck - clientWg.Done() }() // wait request id allocated with: new session, new request @@ -2121,10 +2122,10 @@ func (s *etcdSuite) TestEventAfterFeedStop(c *check.C) { eventCh := make(chan *model.RegionFeedEvent, 10) wg.Add(1) go func() { + defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) cdcClient.Close() //nolint:errcheck - wg.Done() }() // wait request id allocated with: new session, new request @@ -2265,10 +2266,10 @@ func (s *etcdSuite) TestOutOfRegionRangeEvent(c *check.C) { eventCh := make(chan *model.RegionFeedEvent, 10) wg.Add(1) go func() { + defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) cdcClient.Close() //nolint:errcheck - wg.Done() }() // wait request id allocated with: new session, new request @@ -2493,10 +2494,10 @@ func (s *etcdSuite) TestResolveLockNoCandidate(c *check.C) { eventCh := make(chan *model.RegionFeedEvent, 10) wg.Add(1) go func() { + defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) cdcClient.Close() //nolint:errcheck - wg.Done() }() // wait request id allocated with: new session, new request @@ -2585,10 +2586,10 @@ func (s *etcdSuite) TestFailRegionReentrant(c *check.C) { eventCh := make(chan *model.RegionFeedEvent, 10) wg.Add(1) go func() { + defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) cdcClient.Close() //nolint:errcheck - wg.Done() }() // wait request id allocated with: new session, new request @@ -2670,10 +2671,10 @@ func (s *etcdSuite) TestClientV1UnlockRangeReentrant(c *check.C) { eventCh := make(chan *model.RegionFeedEvent, 10) wg.Add(1) go func() { + defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) cdcClient.Close() //nolint:errcheck - wg.Done() }() // wait the second region is scheduled @@ -2744,10 +2745,10 @@ func (s *etcdSuite) testClientErrNoPendingRegion(c *check.C) { eventCh := make(chan *model.RegionFeedEvent, 10) wg.Add(1) go func() { + defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) cdcClient.Close() //nolint:errcheck - wg.Done() }() baseAllocatedID := currentRequestID() @@ -2818,10 +2819,10 @@ func (s *etcdSuite) testKVClientForceReconnect(c *check.C) { eventCh := make(chan *model.RegionFeedEvent, 10) wg.Add(1) go func() { + defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) cdcClient.Close() //nolint:errcheck - wg.Done() }() baseAllocatedID := currentRequestID() @@ -2974,10 +2975,10 @@ func (s *etcdSuite) TestConcurrentProcessRangeRequest(c *check.C) { eventCh := make(chan *model.RegionFeedEvent, 100) wg.Add(1) go func() { + defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("z")}, 100, false, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) cdcClient.Close() //nolint:errcheck - wg.Done() }() // the kv client is blocked by failpoint injection, and after region has split @@ -3087,10 +3088,10 @@ func (s *etcdSuite) TestEvTimeUpdate(c *check.C) { eventCh := make(chan *model.RegionFeedEvent, 10) wg.Add(1) go func() { + defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) cdcClient.Close() //nolint:errcheck - wg.Done() }() // wait request id allocated with: new session, new request