Skip to content

Commit

Permalink
processor: fix processor doesn't fully exit after p.stop is called (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei authored Jun 28, 2020
1 parent 3b4db2b commit 0c8c869
Showing 1 changed file with 6 additions and 3 deletions.
9 changes: 6 additions & 3 deletions cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,9 @@ type processor struct {
globalResolvedTs uint64
checkpointTs uint64

ddlPuller puller.Puller
schemaStorage *entry.SchemaStorage
ddlPuller puller.Puller
ddlPullerCancel context.CancelFunc
schemaStorage *entry.SchemaStorage

tsRWriter storage.ProcessorTsRWriter
output chan *model.PolymorphicEvent
Expand Down Expand Up @@ -213,7 +214,8 @@ func newProcessor(
func (p *processor) Run(ctx context.Context) {
wg, cctx := errgroup.WithContext(ctx)
p.wg = wg
ddlPullerCtx := util.PutTableIDInCtx(cctx, 0)
ddlPullerCtx, ddlPullerCancel := context.WithCancel(util.PutTableIDInCtx(cctx, 0))
p.ddlPullerCancel = ddlPullerCancel

wg.Go(func() error {
return p.positionWorker(cctx)
Expand Down Expand Up @@ -862,6 +864,7 @@ func (p *processor) stop(ctx context.Context) error {
for _, tbl := range p.tables {
tbl.cancel()
}
p.ddlPullerCancel()
// mark tables share the same context with its original table, don't need to cancel
p.stateMu.Unlock()
atomic.StoreInt32(&p.stopped, 1)
Expand Down

0 comments on commit 0c8c869

Please sign in to comment.