Skip to content

Commit

Permalink
pkg/chann(ticdc): prevent unbounded channel may causing cpu spin (#5749)
Browse files Browse the repository at this point in the history
close #5748
  • Loading branch information
Rustin170506 authored Jun 8, 2022
1 parent 8a18620 commit 94bd690
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 74 deletions.
143 changes: 85 additions & 58 deletions pkg/chann/chann.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@
// Furthermore, all channels provide methods to send (In()),
// receive (Out()), and close (Close()).
//
// An unbounded channel is not a buffered channel with infinite capacity,
// and they have different memory model semantics in terms of receiving
// a value: The recipient of a buffered channel is immediately available
// after a send is complete. However, the recipient of an unbounded channel
// may be available within a bounded time frame after a send is complete.
//
// Note that to close a channel, one must use the Close() method instead of
// the language's built-in close function.
// Two additional methods: ApproxLen and Cap returns the current status
Expand Down Expand Up @@ -93,6 +99,7 @@ func Cap(n int) Opt {
// unbuffered, or unbounded. To create a new channel, use New to allocate
// one, and use Cap to configure the capacity of the channel.
type Chann[T any] struct {
q []T
in, out chan T
close chan struct{}
cfg *config
Expand Down Expand Up @@ -127,7 +134,7 @@ func New[T any](opts ...Opt) *Chann[T] {
for _, o := range opts {
o(cfg)
}
ch := &Chann[T]{cfg: cfg}
ch := &Chann[T]{cfg: cfg, close: make(chan struct{})}
switch ch.cfg.typ {
case unbuffered:
ch.in = make(chan T)
Expand All @@ -138,63 +145,7 @@ func New[T any](opts ...Opt) *Chann[T] {
case unbounded:
ch.in = make(chan T, 16)
ch.out = make(chan T, 16)
ch.close = make(chan struct{})
ready := make(chan struct{})
var nilT T

go func() {
q := make([]T, 0, 1<<10)
ready <- struct{}{}
for {
select {
case e, ok := <-ch.in:
if !ok {
panic("chann: send-only channel ch.In() closed unexpectedly")
}
atomic.AddInt64(&ch.cfg.len, 1)
q = append(q, e)
case <-ch.close:
goto closed
}

for len(q) > 0 {
select {
case ch.out <- q[0]:
atomic.AddInt64(&ch.cfg.len, -1)
q[0] = nilT
q = q[1:]
case e, ok := <-ch.in:
if !ok {
panic("chann: send-only channel ch.In() closed unexpectedly")
}
atomic.AddInt64(&ch.cfg.len, 1)
q = append(q, e)
case <-ch.close:
goto closed
}
}
if cap(q) < 1<<5 {
q = make([]T, 0, 1<<10)
}
}

closed:
close(ch.in)
for e := range ch.in {
q = append(q, e)
}
for len(q) > 0 {
select {
case ch.out <- q[0]:
q[0] = nilT // de-reference earlier to help GC
q = q[1:]
default:
}
}
close(ch.out)
close(ch.close)
}()
<-ready
go ch.unboundedProcessing()
}
return ch
}
Expand All @@ -213,11 +164,87 @@ func (ch *Chann[T]) Close() {
switch ch.cfg.typ {
case buffered, unbuffered:
close(ch.in)
close(ch.close)
default:
ch.close <- struct{}{}
}
}

// unboundedProcessing is the forwarding loop behind an unbounded channel.
//
// It moves elements from ch.in into the internal queue ch.q and from the
// head of that queue into ch.out, keeping ch.cfg.len equal to the number of
// queued elements. The loop ends when a value arrives on ch.close, at which
// point unboundedTerminate takes over. It panics if ch.in is found closed,
// because only the termination path is expected to close it.
func (ch *Chann[T]) unboundedProcessing() {
	const (
		initialQueueCap = 1 << 10 // capacity of a freshly allocated queue
		shrunkQueueCap  = 1 << 5  // below this, a drained queue is reallocated
	)
	var zero T

	// enqueue buffers one element received from ch.in and accounts for it.
	enqueue := func(v T, open bool) {
		if !open {
			panic("chann: send-only channel ch.In() closed unexpectedly")
		}
		atomic.AddInt64(&ch.cfg.len, 1)
		ch.q = append(ch.q, v)
	}

	ch.q = make([]T, 0, initialQueueCap)
	for {
		// The queue is empty here, so there is nothing to offer on ch.out:
		// block until either an element or the close signal arrives.
		select {
		case v, open := <-ch.in:
			enqueue(v, open)
		case <-ch.close:
			ch.unboundedTerminate()
			return
		}

		// While elements are queued, offer the head to the receiver and
		// keep accepting new elements at the same time.
		for len(ch.q) != 0 {
			select {
			case ch.out <- ch.q[0]:
				atomic.AddInt64(&ch.cfg.len, -1)
				ch.q[0] = zero // drop the reference held by the sent slot
				ch.q = ch.q[1:]
			case v, open := <-ch.in:
				enqueue(v, open)
			case <-ch.close:
				ch.unboundedTerminate()
				return
			}
		}

		// Re-slicing from the front shrinks the remaining capacity; once
		// the drained queue has become too small, start over with a fresh
		// one.
		if cap(ch.q) < shrunkQueueCap {
			ch.q = make([]T, 0, initialQueueCap)
		}
	}
}

// unboundedTerminate shuts down the processing loop of an unbounded channel.
//
// It closes ch.in, moves everything still buffered there into the internal
// queue, hands every queued element to ch.out in order, and finally closes
// ch.out and ch.close. The ch.cfg.len counter is kept in step with the queue
// throughout, the same way the processing loop maintains it.
//
// NOTICE: every hand-off is a blocking send. When ch.out cannot accept an
// element immediately, this function waits for a receiver, so the consumer
// has to drain all remaining elements for the termination to complete.
func (ch *Chann[T]) unboundedTerminate() {
	var zeroT T

	close(ch.in)
	for e := range ch.in {
		atomic.AddInt64(&ch.cfg.len, 1)
		ch.q = append(ch.q, e)
	}
	for len(ch.q) > 0 {
		// A plain send: it blocks until the element has been accepted.
		ch.out <- ch.q[0]
		atomic.AddInt64(&ch.cfg.len, -1)
		ch.q[0] = zeroT // de-reference earlier to help GC
		ch.q = ch.q[1:]
	}
	close(ch.out)
	close(ch.close)
}

// isClosed reports whether a receive from ch.close can proceed right now,
// without ever blocking the caller.
//
// NOTE(review): Close signals an unbounded channel by sending a value on
// ch.close rather than closing it, so a call racing with Close could consume
// that value here — confirm callers only use this after Close has returned.
func (ch *Chann[T]) isClosed() bool {
	closed := false
	select {
	case <-ch.close:
		closed = true
	default:
		// Nothing to receive: the channel is still open.
	}
	return closed
}

// Len returns an approximation of the length of the channel.
//
// Note that in a concurrent scenario, the returned length of a channel
Expand Down
70 changes: 54 additions & 16 deletions pkg/chann/chann_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ func TestNonblockRecvRace(t *testing.T) {
}
}

// internalCacheSize is the number of sends the select-race tests use to fill
// an unbounded channel before the send they actually care about.
// NOTE(review): presumably 16 is the buffer size of the unbounded channel's
// input channel and 1<<10 the initial capacity of its internal queue — keep
// in sync with the implementation.
const internalCacheSize = 16 + 1<<10

// This test checks that select acts on the state of the channels at one
// moment in the execution, not over a smeared time window.
// In the test, one goroutine does:
Expand All @@ -237,15 +239,19 @@ func TestNonblockRecvRace(t *testing.T) {
// always receive from one or the other. It must never execute the default case.
func TestNonblockSelectRace(t *testing.T) {
n := 1000
if testing.Short() {
n = 1000
}
done := New[bool](Cap(1))
for i := 0; i < n; i++ {
c1 := New[int]()
c2 := New[int]()
// The input channel of an unbounded buffer have an internal
// cache queue. When the input channel and the internal cache
// queue both gets full, we are certain that once the next send
// is complete, the out will be available for sure hence the
// waiting time of a receive is bounded.
for i := 0; i < internalCacheSize; i++ {
c1.In() <- 1
}
c1.In() <- 1
time.Sleep(time.Millisecond)
go func() {
runtime.Gosched()
select {
Expand All @@ -257,44 +263,58 @@ func TestNonblockSelectRace(t *testing.T) {
}
done.In() <- true
}()
// Same for c2
for i := 0; i < internalCacheSize; i++ {
c2.In() <- 1
}
c2.In() <- 1
select {
case <-c2.Out():
case <-c1.Out():
default:
}
require.Truef(t, <-done.Out(), "no chan is ready")
c1.Close()
// Drop all events.
for range c1.Out() {
}
c2.Close()
for range c2.Out() {
}
}
}

// Same as TestNonblockSelectRace, but close(c2) replaces c2 <- 1.
func TestNonblockSelectRace2(t *testing.T) {
n := 100000
if testing.Short() {
n = 1000
}
n := 1000
done := make(chan bool, 1)
for i := 0; i < n; i++ {
c1 := make(chan int, 1)
c2 := make(chan int)
c1 <- 1
c1 := New[int]()
c2 := New[int]()
// See TestNonblockSelectRace.
for i := 0; i < internalCacheSize; i++ {
c1.In() <- 1
}
c1.In() <- 1
go func() {
select {
case <-c1:
case <-c2:
case <-c1.Out():
case <-c2.Out():
default:
done <- false
return
}
done <- true
}()
close(c2)
c2.Close()
select {
case <-c1:
case <-c1.Out():
default:
}
require.Truef(t, <-done, "no chan is ready")
c1.Close()
// Drop all events.
for range c1.Out() {
}
}
}

Expand Down Expand Up @@ -409,6 +429,24 @@ func TestUnboundedChann(t *testing.T) {
}

func TestUnboundedChannClose(t *testing.T) {
t.Run("close-status", func(t *testing.T) {
ch := New[any]()
for i := 0; i < 100; i++ {
ch.In() <- 0
}
ch.Close()
go func() {
for range ch.Out() {
}
}()

// Theoretically, this is not a dead loop. If the channel
// is closed, then this loop must terminate at somepoint.
// If not, we will meet timeout in the test.
for !ch.isClosed() {
t.Log("unbounded channel is still not entirely closed")
}
})
t.Run("struct{}", func(t *testing.T) {
grs := runtime.NumGoroutine()
N := 10
Expand Down

0 comments on commit 94bd690

Please sign in to comment.