diff --git a/cdc/sink/black_hole.go b/cdc/sink/black_hole.go index 85712f256d8..0379acf8e08 100644 --- a/cdc/sink/black_hole.go +++ b/cdc/sink/black_hole.go @@ -35,6 +35,7 @@ type blackHoleSink struct { statistics *metrics.Statistics accumulated uint64 lastAccumulated uint64 + flushing uint64 // 1 means flushing, 0 means not flushing } var _ Sink = (*blackHoleSink)(nil) @@ -56,6 +57,15 @@ func (b *blackHoleSink) EmitRowChangedEvents(ctx context.Context, rows ...*model func (b *blackHoleSink) FlushRowChangedEvents( ctx context.Context, _ model.TableID, resolved model.ResolvedTs, ) (model.ResolvedTs, error) { + // In black hole sink use a atomic lock to prevent + // concurrent flush to avoid statistics error. + if !atomic.CompareAndSwapUint64(&b.flushing, 0, 1) { + return model.ResolvedTs{ + Mode: resolved.Mode, + Ts: 0, + BatchID: resolved.BatchID, + }, nil + } log.Debug("BlockHoleSink: FlushRowChangedEvents", zap.Uint64("resolvedTs", resolved.Ts)) err := b.statistics.RecordBatchExecution(func() (int, error) { // TODO: add some random replication latency @@ -65,6 +75,7 @@ func (b *blackHoleSink) FlushRowChangedEvents( return int(batchSize), nil }) b.statistics.PrintStatus(ctx) + atomic.StoreUint64(&b.flushing, 0) return resolved, err }