diff --git a/cdc/processor.go b/cdc/processor.go index e61c6ab5702..9acbd3e2573 100644 --- a/cdc/processor.go +++ b/cdc/processor.go @@ -87,8 +87,9 @@ type processor struct { globalResolvedTs uint64 checkpointTs uint64 - ddlPuller puller.Puller - schemaStorage *entry.SchemaStorage + ddlPuller puller.Puller + ddlPullerCancel context.CancelFunc + schemaStorage *entry.SchemaStorage tsRWriter storage.ProcessorTsRWriter output chan *model.PolymorphicEvent @@ -213,7 +214,8 @@ func newProcessor( func (p *processor) Run(ctx context.Context) { wg, cctx := errgroup.WithContext(ctx) p.wg = wg - ddlPullerCtx := util.PutTableIDInCtx(cctx, 0) + ddlPullerCtx, ddlPullerCancel := context.WithCancel(util.PutTableIDInCtx(cctx, 0)) + p.ddlPullerCancel = ddlPullerCancel wg.Go(func() error { return p.positionWorker(cctx) @@ -862,6 +864,7 @@ func (p *processor) stop(ctx context.Context) error { for _, tbl := range p.tables { tbl.cancel() } + p.ddlPullerCancel() // mark tables share the same context with its original table, don't need to cancel p.stateMu.Unlock() atomic.StoreInt32(&p.stopped, 1)