Skip to content

Commit

Permalink
add a atomic value to prevant concurrent flush
Browse files Browse the repository at this point in the history
  • Loading branch information
asddongmen authored and ti-chi-bot committed Jul 8, 2022
1 parent 72260e5 commit ed49c24
Showing 1 changed file with 11 additions and 0 deletions.
11 changes: 11 additions & 0 deletions cdc/sink/black_hole.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type blackHoleSink struct {
statistics *metrics.Statistics
accumulated uint64
lastAccumulated uint64
flushing uint64 // 1 means flushing, 0 means not flushing
}

var _ Sink = (*blackHoleSink)(nil)
Expand All @@ -56,6 +57,15 @@ func (b *blackHoleSink) EmitRowChangedEvents(ctx context.Context, rows ...*model
func (b *blackHoleSink) FlushRowChangedEvents(
ctx context.Context, _ model.TableID, resolved model.ResolvedTs,
) (model.ResolvedTs, error) {
// In black hole sink use a atomic lock to prevent
// concurrent flush to avoid statistics error.
if !atomic.CompareAndSwapUint64(&b.flushing, 0, 1) {
return model.ResolvedTs{
Mode: resolved.Mode,
Ts: 0,
BatchID: resolved.BatchID,
}, nil
}
log.Debug("BlockHoleSink: FlushRowChangedEvents", zap.Uint64("resolvedTs", resolved.Ts))
err := b.statistics.RecordBatchExecution(func() (int, error) {
// TODO: add some random replication latency
Expand All @@ -65,6 +75,7 @@ func (b *blackHoleSink) FlushRowChangedEvents(
return int(batchSize), nil
})
b.statistics.PrintStatus(ctx)
atomic.StoreUint64(&b.flushing, 0)
return resolved, err
}

Expand Down

0 comments on commit ed49c24

Please sign in to comment.