Skip to content

Commit

Permalink
changefeed: fix goroutine leak when changefeed is paused or stopped (#…
Browse files Browse the repository at this point in the history
…1066) (#1075)

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
ti-srebot authored Nov 13, 2020
1 parent 25cca48 commit ceab68d
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 7 deletions.
10 changes: 7 additions & 3 deletions cdc/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ type changeFeed struct {
lastRebalanceTime time.Time

etcdCli kv.CDCEtcdClient

// context cancel function for all internal goroutines
cancel context.CancelFunc
}

// String implements fmt.Stringer interface.
Expand Down Expand Up @@ -943,18 +946,19 @@ func (c *changeFeed) startSyncPointTicker(ctx context.Context, interval time.Dur

func (c *changeFeed) Close() {
err := c.ddlHandler.Close()
if err != nil {
if err != nil && errors.Cause(err) != context.Canceled {
log.Warn("failed to close ddl handler", zap.Error(err))
}
err = c.sink.Close()
if err != nil {
if err != nil && errors.Cause(err) != context.Canceled {
log.Warn("failed to close owner sink", zap.Error(err))
}
if c.syncpointStore != nil {
err = c.syncpointStore.Close()
if err != nil {
if err != nil && errors.Cause(err) != context.Canceled {
log.Warn("failed to close owner sink", zap.Error(err))
}
}
c.cancel()
log.Info("changefeed closed", zap.String("id", c.id))
}
7 changes: 4 additions & 3 deletions cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,13 +342,13 @@ func (o *Owner) newChangeFeed(
select {
case <-ctx.Done():
case err = <-errCh:
cancel()
}
if err != nil && errors.Cause(err) != context.Canceled {
log.Error("error on running owner", zap.Error(err))
log.Error("error on running changefeed", zap.Error(err), zap.String("changefeed", id))
} else {
log.Info("owner exited")
log.Info("changefeed exited", zap.String("changfeed", id))
}
cancel()
}()

err = primarySink.Initialize(ctx, sinkTableInfo)
Expand Down Expand Up @@ -394,6 +394,7 @@ func (o *Owner) newChangeFeed(
sink: primarySink,
cyclicEnabled: info.Config.Cyclic.IsEnabled(),
lastRebalanceTime: time.Now(),
cancel: cancel,
}
return cf, nil
}
Expand Down
16 changes: 16 additions & 0 deletions cdc/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ func (s *ownerSuite) TestHandleAdmin(c *check.C) {
cancel: cancel,
wg: errg,
},
cancel: cancel,
}
errCh := make(chan error, 1)
sink, err := sink.NewSink(ctx, cfID, "blackhole://", f, replicaConf, map[string]string{}, errCh)
Expand Down Expand Up @@ -537,6 +538,15 @@ func (s *ownerSuite) TestHandleAdmin(c *check.C) {
st, _, err := owner.etcdClient.GetChangeFeedStatus(ctx, cfID)
c.Assert(err, check.IsNil)
c.Assert(st.AdminJobType, check.Equals, model.AdminStop)
// check changefeed context is canceled
select {
case <-cctx.Done():
default:
c.Fatal("changefeed context is expected canceled")
}

cctx, cancel = context.WithCancel(ctx)
sampleCF.cancel = cancel

c.Assert(owner.EnqueueJob(model.AdminJob{CfID: cfID, Type: model.AdminResume}), check.IsNil)
c.Assert(owner.handleAdminJob(ctx), check.IsNil)
Expand Down Expand Up @@ -568,6 +578,12 @@ func (s *ownerSuite) TestHandleAdmin(c *check.C) {
st, _, err = owner.etcdClient.GetChangeFeedStatus(ctx, cfID)
c.Assert(err, check.IsNil)
c.Assert(st.AdminJobType, check.Equals, model.AdminRemove)
// check changefeed context is canceled
select {
case <-cctx.Done():
default:
c.Fatal("changefeed context is expected canceled")
}
}

func (s *ownerSuite) TestChangefeedApplyDDLJob(c *check.C) {
Expand Down
4 changes: 3 additions & 1 deletion cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,12 +329,14 @@ func (p *processor) positionWorker(ctx context.Context) error {
if inErr != nil {
if errors.Cause(inErr) != context.Canceled {
logError := log.Error
errField := zap.Error(inErr)
if cerror.ErrAdminStopProcessor.Equal(inErr) {
logError = log.Warn
errField = zap.String("error", inErr.Error())
}
logError(
"update info failed",
zap.String("changefeed", p.changefeedID), zap.Error(inErr),
zap.String("changefeed", p.changefeedID), errField,
)
}
if p.isStopped() || cerror.ErrAdminStopProcessor.Equal(inErr) {
Expand Down

0 comments on commit ceab68d

Please sign in to comment.