From 0c8c8699145966194ce148c128027f2c5c969f26 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Sun, 28 Jun 2020 11:00:04 +0800 Subject: [PATCH] processor: fix processor doesn't fully exit after `p.stop` is called (#693) --- cdc/processor.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/cdc/processor.go b/cdc/processor.go index e61c6ab5702..9acbd3e2573 100644 --- a/cdc/processor.go +++ b/cdc/processor.go @@ -87,8 +87,9 @@ type processor struct { globalResolvedTs uint64 checkpointTs uint64 - ddlPuller puller.Puller - schemaStorage *entry.SchemaStorage + ddlPuller puller.Puller + ddlPullerCancel context.CancelFunc + schemaStorage *entry.SchemaStorage tsRWriter storage.ProcessorTsRWriter output chan *model.PolymorphicEvent @@ -213,7 +214,8 @@ func newProcessor( func (p *processor) Run(ctx context.Context) { wg, cctx := errgroup.WithContext(ctx) p.wg = wg - ddlPullerCtx := util.PutTableIDInCtx(cctx, 0) + ddlPullerCtx, ddlPullerCancel := context.WithCancel(util.PutTableIDInCtx(cctx, 0)) + p.ddlPullerCancel = ddlPullerCancel wg.Go(func() error { return p.positionWorker(cctx) @@ -862,6 +864,7 @@ func (p *processor) stop(ctx context.Context) error { for _, tbl := range p.tables { tbl.cancel() } + p.ddlPullerCancel() // mark tables share the same context with its original table, don't need to cancel p.stateMu.Unlock() atomic.StoreInt32(&p.stopped, 1)