Skip to content

Commit

Permalink
kv_client: fix unit tests deadlocked when panicking (#2050)
Browse files Browse the repository at this point in the history
  • Loading branch information
liuzix authored Jun 20, 2021
1 parent 8d14960 commit f74a302
Showing 1 changed file with 23 additions and 22 deletions.
45 changes: 23 additions & 22 deletions cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,9 +282,9 @@ func newMockServiceSpecificAddr(
cdcpb.RegisterChangeDataServer(grpcServer, srv)
wg.Add(1)
go func() {
defer wg.Done()
err := grpcServer.Serve(lis)
c.Assert(err, check.IsNil)
wg.Done()
}()
return
}
Expand Down Expand Up @@ -336,9 +336,9 @@ func (s *etcdSuite) TestConnectOfflineTiKV(c *check.C) {
eventCh := make(chan *model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
defer wg.Done()
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 1, false, lockresolver, isPullInit, eventCh)
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
wg.Done()
}()

// new session, request to store 1, request to store 2
Expand Down Expand Up @@ -425,10 +425,10 @@ func (s *etcdSuite) TestRecvLargeMessageSize(c *check.C) {
eventCh := make(chan *model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
defer wg.Done()
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 1, false, lockresolver, isPullInit, eventCh)
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
cdcClient.Close() //nolint:errcheck
wg.Done()
}()

// new session, new request
Expand Down Expand Up @@ -514,10 +514,10 @@ func (s *etcdSuite) TestHandleError(c *check.C) {
eventCh := make(chan *model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
defer wg.Done()
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
cdcClient.Close() //nolint:errcheck
wg.Done()
}()

// wait request id allocated with: new session, new request
Expand Down Expand Up @@ -670,10 +670,10 @@ func (s *etcdSuite) TestCompatibilityWithSameConn(c *check.C) {
var wg2 sync.WaitGroup
wg2.Add(1)
go func() {
defer wg2.Done()
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
c.Assert(cerror.ErrVersionIncompatible.Equal(err), check.IsTrue)
cdcClient.Close() //nolint:errcheck
wg2.Done()
}()

// wait request id allocated with: new session, new request
Expand Down Expand Up @@ -729,10 +729,10 @@ func (s *etcdSuite) testHandleFeedEvent(c *check.C) {
eventCh := make(chan *model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
defer wg.Done()
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
cdcClient.Close() //nolint:errcheck
wg.Done()
}()

// wait request id allocated with: new session, new request
Expand Down Expand Up @@ -1136,6 +1136,7 @@ func (s *etcdSuite) TestStreamSendWithError(c *check.C) {
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
defer wg.Wait()
defer cancel()

server1Stopped := make(chan struct{})
ch1 := make(chan *cdcpb.ChangeDataEvent, 10)
Expand Down Expand Up @@ -1175,10 +1176,10 @@ func (s *etcdSuite) TestStreamSendWithError(c *check.C) {
eventCh := make(chan *model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
defer wg.Done()
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockresolver, isPullInit, eventCh)
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
cdcClient.Close() //nolint:errcheck
wg.Done()
}()

var requestIds sync.Map
Expand Down Expand Up @@ -1284,10 +1285,10 @@ func (s *etcdSuite) testStreamRecvWithError(c *check.C, failpointStr string) {
eventCh := make(chan *model.RegionFeedEvent, 40)
wg.Add(1)
go func() {
defer wg.Done()
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
cdcClient.Close() //nolint:errcheck
wg.Done()
}()

// wait request id allocated with: new session, new request
Expand Down Expand Up @@ -1633,10 +1634,10 @@ func (s *etcdSuite) TestIncompatibleTiKV(c *check.C) {
eventCh := make(chan *model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
defer wg.Done()
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
cdcClient.Close() //nolint:errcheck
wg.Done()
}()

err = retry.Run(time.Millisecond*500, 20, func() error {
Expand Down Expand Up @@ -1726,10 +1727,10 @@ func (s *etcdSuite) TestNoPendingRegionError(c *check.C) {
var wg2 sync.WaitGroup
wg2.Add(1)
go func() {
defer wg2.Done()
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
c.Assert(cerror.ErrNoPendingRegion.Equal(err), check.IsTrue)
cdcClient.Close() //nolint:errcheck
wg2.Done()
}()

// wait request id allocated with: new session, new request
Expand Down Expand Up @@ -1782,10 +1783,10 @@ func (s *etcdSuite) TestDropStaleRequest(c *check.C) {
eventCh := make(chan *model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
defer wg.Done()
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
cdcClient.Close() //nolint:errcheck
wg.Done()
}()

// wait request id allocated with: new session, new request
Expand Down Expand Up @@ -1889,10 +1890,10 @@ func (s *etcdSuite) TestResolveLock(c *check.C) {
eventCh := make(chan *model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
defer wg.Done()
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
cdcClient.Close() //nolint:errcheck
wg.Done()
}()

// wait request id allocated with: new session, new request
Expand Down Expand Up @@ -1987,10 +1988,10 @@ func (s *etcdSuite) testEventCommitTsFallback(c *check.C, events []*cdcpb.Change
var clientWg sync.WaitGroup
clientWg.Add(1)
go func() {
defer clientWg.Done()
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
c.Assert(err, check.Equals, errUnreachable)
cdcClient.Close() //nolint:errcheck
clientWg.Done()
}()

// wait request id allocated with: new session, new request
Expand Down Expand Up @@ -2121,10 +2122,10 @@ func (s *etcdSuite) TestEventAfterFeedStop(c *check.C) {
eventCh := make(chan *model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
defer wg.Done()
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
cdcClient.Close() //nolint:errcheck
wg.Done()
}()

// wait request id allocated with: new session, new request
Expand Down Expand Up @@ -2265,10 +2266,10 @@ func (s *etcdSuite) TestOutOfRegionRangeEvent(c *check.C) {
eventCh := make(chan *model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
defer wg.Done()
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
cdcClient.Close() //nolint:errcheck
wg.Done()
}()

// wait request id allocated with: new session, new request
Expand Down Expand Up @@ -2493,10 +2494,10 @@ func (s *etcdSuite) TestResolveLockNoCandidate(c *check.C) {
eventCh := make(chan *model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
defer wg.Done()
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
cdcClient.Close() //nolint:errcheck
wg.Done()
}()

// wait request id allocated with: new session, new request
Expand Down Expand Up @@ -2585,10 +2586,10 @@ func (s *etcdSuite) TestFailRegionReentrant(c *check.C) {
eventCh := make(chan *model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
defer wg.Done()
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
cdcClient.Close() //nolint:errcheck
wg.Done()
}()

// wait request id allocated with: new session, new request
Expand Down Expand Up @@ -2670,10 +2671,10 @@ func (s *etcdSuite) TestClientV1UnlockRangeReentrant(c *check.C) {
eventCh := make(chan *model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
defer wg.Done()
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockresolver, isPullInit, eventCh)
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
cdcClient.Close() //nolint:errcheck
wg.Done()
}()

// wait the second region is scheduled
Expand Down Expand Up @@ -2744,10 +2745,10 @@ func (s *etcdSuite) testClientErrNoPendingRegion(c *check.C) {
eventCh := make(chan *model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
defer wg.Done()
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockresolver, isPullInit, eventCh)
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
cdcClient.Close() //nolint:errcheck
wg.Done()
}()

baseAllocatedID := currentRequestID()
Expand Down Expand Up @@ -2818,10 +2819,10 @@ func (s *etcdSuite) testKVClientForceReconnect(c *check.C) {
eventCh := make(chan *model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
defer wg.Done()
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockresolver, isPullInit, eventCh)
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
cdcClient.Close() //nolint:errcheck
wg.Done()
}()

baseAllocatedID := currentRequestID()
Expand Down Expand Up @@ -2974,10 +2975,10 @@ func (s *etcdSuite) TestConcurrentProcessRangeRequest(c *check.C) {
eventCh := make(chan *model.RegionFeedEvent, 100)
wg.Add(1)
go func() {
defer wg.Done()
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("z")}, 100, false, lockresolver, isPullInit, eventCh)
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
cdcClient.Close() //nolint:errcheck
wg.Done()
}()

// the kv client is blocked by failpoint injection, and after region has split
Expand Down Expand Up @@ -3087,10 +3088,10 @@ func (s *etcdSuite) TestEvTimeUpdate(c *check.C) {
eventCh := make(chan *model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
defer wg.Done()
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
cdcClient.Close() //nolint:errcheck
wg.Done()
}()

// wait request id allocated with: new session, new request
Expand Down

0 comments on commit f74a302

Please sign in to comment.