From 94bd690aeef033045db9c6eaaa336cb809f20fd3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BA=8C=E6=89=8B=E6=8E=89=E5=8C=85=E5=B7=A5=E7=A8=8B?= =?UTF-8?q?=E5=B8=88?= Date: Wed, 8 Jun 2022 20:12:30 +0800 Subject: [PATCH] pkg/chann(ticdc): prevent unbounded channel may causing cpu spin (#5749) close pingcap/tiflow#5748 --- pkg/chann/chann.go | 143 ++++++++++++++++++++++++---------------- pkg/chann/chann_test.go | 70 +++++++++++++++----- 2 files changed, 139 insertions(+), 74 deletions(-) diff --git a/pkg/chann/chann.go b/pkg/chann/chann.go index d4720d45eb2..2d39eeeda86 100644 --- a/pkg/chann/chann.go +++ b/pkg/chann/chann.go @@ -48,6 +48,12 @@ // Furthermore, all channels provides methods to send (In()), // receive (Out()), and close (Close()). // +// An unbounded channel is not a buffered channel with infinite capacity, +// and they have different memory model semantics in terms of receiving +// a value: The recipient of a buffered channel is immediately available +// after a send is complete. However, the recipient of an unbounded channel +// may be available within a bounded time frame after a send is complete. +// // Note that to close a channel, must use Close() method instead of the // language built-in method // Two additional methods: ApproxLen and Cap returns the current status @@ -93,6 +99,7 @@ func Cap(n int) Opt { // unbuffered, or unbounded. To create a new channel, use New to allocate // one, and use Cap to configure the capacity of the channel. type Chann[T any] struct { + q []T in, out chan T close chan struct{} cfg *config @@ -127,7 +134,7 @@ func New[T any](opts ...Opt) *Chann[T] { for _, o := range opts { o(cfg) } - ch := &Chann[T]{cfg: cfg} + ch := &Chann[T]{cfg: cfg, close: make(chan struct{})} switch ch.cfg.typ { case unbuffered: ch.in = make(chan T) @@ -138,63 +145,7 @@ func New[T any](opts ...Opt) *Chann[T] { case unbounded: ch.in = make(chan T, 16) ch.out = make(chan T, 16) - ch.close = make(chan struct{}) - ready := make(chan struct{}) - var nilT T - - go func() { - q := make([]T, 0, 1<<10) - ready <- struct{}{} - for { - select { - case e, ok := <-ch.in: - if !ok { - panic("chann: send-only channel ch.In() closed unexpectedly") - } - atomic.AddInt64(&ch.cfg.len, 1) - q = append(q, e) - case <-ch.close: - goto closed - } - - for len(q) > 0 { - select { - case ch.out <- q[0]: - atomic.AddInt64(&ch.cfg.len, -1) - q[0] = nilT - q = q[1:] - case e, ok := <-ch.in: - if !ok { - panic("chann: send-only channel ch.In() closed unexpectedly") - } - atomic.AddInt64(&ch.cfg.len, 1) - q = append(q, e) - case <-ch.close: - goto closed - } - } - if cap(q) < 1<<5 { - q = make([]T, 0, 1<<10) - } - } - - closed: - close(ch.in) - for e := range ch.in { - q = append(q, e) - } - for len(q) > 0 { - select { - case ch.out <- q[0]: - q[0] = nilT // de-reference earlier to help GC - q = q[1:] - default: - } - } - close(ch.out) - close(ch.close) - }() - <-ready + go ch.unboundedProcessing() } return ch } @@ -213,11 +164,87 @@ func (ch *Chann[T]) Close() { switch ch.cfg.typ { case buffered, unbuffered: close(ch.in) + close(ch.close) default: ch.close <- struct{}{} } } +// unboundedProcessing is a processing loop that implements unbounded +// channel semantics. +func (ch *Chann[T]) unboundedProcessing() { + var nilT T + + ch.q = make([]T, 0, 1<<10) + for { + select { + case e, ok := <-ch.in: + if !ok { + panic("chann: send-only channel ch.In() closed unexpectedly") + } + atomic.AddInt64(&ch.cfg.len, 1) + ch.q = append(ch.q, e) + case <-ch.close: + ch.unboundedTerminate() + return + } + + for len(ch.q) > 0 { + select { + case ch.out <- ch.q[0]: + atomic.AddInt64(&ch.cfg.len, -1) + ch.q[0] = nilT + ch.q = ch.q[1:] + case e, ok := <-ch.in: + if !ok { + panic("chann: send-only channel ch.In() closed unexpectedly") + } + atomic.AddInt64(&ch.cfg.len, 1) + ch.q = append(ch.q, e) + case <-ch.close: + ch.unboundedTerminate() + return + } + } + if cap(ch.q) < 1<<5 { + ch.q = make([]T, 0, 1<<10) + } + } +} + +// unboundedTerminate terminates the unbounde channel's processing loop +// and make sure all unprocessed elements be consumed if there is +// a pending receiver. +func (ch *Chann[T]) unboundedTerminate() { + var zeroT T + + close(ch.in) + for e := range ch.in { + ch.q = append(ch.q, e) + } + for len(ch.q) > 0 { + select { + // NOTICE: If no receiver is receiving the element, it will be blocked. + // So the consumer have to deal with all the elements in the queue. + case ch.out <- ch.q[0]: + } + ch.q[0] = zeroT // de-reference earlier to help GC + ch.q = ch.q[1:] + } + close(ch.out) + close(ch.close) +} + +// isClose reports the close status of a channel. +func (ch *Chann[T]) isClosed() bool { + select { + case <-ch.close: + return true + default: + return false + } +} + // Len returns an approximation of the length of the channel. // // Note that in a concurrent scenario, the returned length of a channel diff --git a/pkg/chann/chann_test.go b/pkg/chann/chann_test.go index e9fe8127a00..a127f1d10d3 100644 --- a/pkg/chann/chann_test.go +++ b/pkg/chann/chann_test.go @@ -224,6 +224,8 @@ func TestNonblockRecvRace(t *testing.T) { } } +const internalCacheSize = 16 + 1<<10 + // This test checks that select acts on the state of the channels at one // moment in the execution, not over a smeared time window. // In the test, one goroutine does: @@ -237,15 +239,19 @@ func TestNonblockRecvRace(t *testing.T) { // always receive from one or the other. It must never execute the default case. func TestNonblockSelectRace(t *testing.T) { n := 1000 - if testing.Short() { - n = 1000 - } done := New[bool](Cap(1)) for i := 0; i < n; i++ { c1 := New[int]() c2 := New[int]() + // The input channel of an unbounded buffer have an internal + // cache queue. When the input channel and the internal cache + // queue both gets full, we are certain that once the next send + // is complete, the out will be available for sure hence the + // waiting time of a receive is bounded. + for i := 0; i < internalCacheSize; i++ { + c1.In() <- 1 + } c1.In() <- 1 - time.Sleep(time.Millisecond) go func() { runtime.Gosched() select { @@ -257,44 +263,58 @@ func TestNonblockSelectRace(t *testing.T) { } done.In() <- true }() + // Same for c2 + for i := 0; i < internalCacheSize; i++ { + c2.In() <- 1 + } c2.In() <- 1 select { - case <-c2.Out(): + case <-c1.Out(): default: } require.Truef(t, <-done.Out(), "no chan is ready") c1.Close() + // Drop all events. + for range c1.Out() { + } c2.Close() + for range c2.Out() { + } } } // Same as TestNonblockSelectRace, but close(c2) replaces c2 <- 1. func TestNonblockSelectRace2(t *testing.T) { - n := 100000 - if testing.Short() { - n = 1000 - } + n := 1000 done := make(chan bool, 1) for i := 0; i < n; i++ { - c1 := make(chan int, 1) - c2 := make(chan int) - c1 <- 1 + c1 := New[int]() + c2 := New[int]() + // See TestNonblockSelectRace. + for i := 0; i < internalCacheSize; i++ { + c1.In() <- 1 + } + c1.In() <- 1 go func() { select { - case <-c1: - case <-c2: + case <-c1.Out(): + case <-c2.Out(): default: done <- false return } done <- true }() - close(c2) + c2.Close() select { - case <-c1: + case <-c1.Out(): default: } require.Truef(t, <-done, "no chan is ready") + c1.Close() + // Drop all events. + for range c1.Out() { + } } } @@ -409,6 +429,24 @@ func TestUnboundedChann(t *testing.T) { } func TestUnboundedChannClose(t *testing.T) { + t.Run("close-status", func(t *testing.T) { + ch := New[any]() + for i := 0; i < 100; i++ { + ch.In() <- 0 + } + ch.Close() + go func() { + for range ch.Out() { + } + }() + + // Theoretically, this is not a dead loop. If the channel + // is closed, then this loop must terminate at somepoint. + // If not, we will meet timeout in the test. + for !ch.isClosed() { + t.Log("unbounded channel is still not entirely closed") + } + }) t.Run("struct{}", func(t *testing.T) { grs := runtime.NumGoroutine() N := 10