From 279136577b538377777e9838ca22454088fffd4f Mon Sep 17 00:00:00 2001 From: amyangfei Date: Wed, 24 Jun 2020 16:15:48 +0800 Subject: [PATCH] processor: fix processor doesn't fully exit after `p.stop` is called --- cdc/processor.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/cdc/processor.go b/cdc/processor.go index c978b668ab6..78a1719eecc 100644 --- a/cdc/processor.go +++ b/cdc/processor.go @@ -87,8 +87,9 @@ type processor struct { globalResolvedTs uint64 checkpointTs uint64 - ddlPuller puller.Puller - schemaStorage *entry.SchemaStorage + ddlPuller puller.Puller + ddlPullerCancel context.CancelFunc + schemaStorage *entry.SchemaStorage tsRWriter storage.ProcessorTsRWriter output chan *model.PolymorphicEvent @@ -212,7 +213,8 @@ func newProcessor( func (p *processor) Run(ctx context.Context) { wg, cctx := errgroup.WithContext(ctx) p.wg = wg - ddlPullerCtx := util.PutTableIDInCtx(cctx, 0) + ddlPullerCtx, ddlPullerCancel := context.WithCancel(util.PutTableIDInCtx(cctx, 0)) + p.ddlPullerCancel = ddlPullerCancel wg.Go(func() error { return p.positionWorker(cctx) @@ -856,6 +858,7 @@ func (p *processor) stop(ctx context.Context) error { for _, tbl := range p.tables { tbl.cancel() } + p.ddlPullerCancel() // mark tables share the same context with its original table, don't need to cancel p.stateMu.Unlock() atomic.StoreInt32(&p.stopped, 1)