From 037661cd1d79b59c114999a79cb1b3d291c9684d Mon Sep 17 00:00:00 2001 From: amyangfei Date: Wed, 13 May 2020 12:13:24 +0800 Subject: [PATCH 1/2] notify: fix data race and send on close chan in receiveCh --- pkg/notify/notify.go | 54 ++++++++++++++++++++++++++------------- pkg/notify/notify_test.go | 15 +++++++++++ 2 files changed, 51 insertions(+), 18 deletions(-) diff --git a/pkg/notify/notify.go b/pkg/notify/notify.go index 64acff30d18..08363fa04d0 100644 --- a/pkg/notify/notify.go +++ b/pkg/notify/notify.go @@ -20,23 +20,41 @@ func (n *Notifier) Notify() { n.mu.RLock() defer n.mu.RUnlock() for _, receiver := range n.receivers { - signalNonBlocking(receiver.rec.c) + receiver.rec.signalNonBlocking() } } -func signalNonBlocking(ch chan struct{}) { +// Receiver is a receiver of notifier, including the receiver channel and stop receiver function. +type Receiver struct { + C <-chan struct{} + c chan struct{} + Stop func() + ticker *time.Ticker + closeCh chan struct{} +} + +// returns true if the receiverCh should be closed +func (r *Receiver) signalNonBlocking() bool { select { - case ch <- struct{}{}: + case <-r.closeCh: + return true + case r.c <- struct{}{}: default: } + return false } -// Receiver is a receiver of notifier, including the receiver channel and stop receiver function. -type Receiver struct { - C <-chan struct{} - Stop func() - ticker *time.Ticker - c chan struct{} +func (r *Receiver) signalTickLoop() { + go func() { + loop: + for range r.ticker.C { + exit := r.signalNonBlocking() + if exit { + break loop + } + } + close(r.c) + }() } // NewReceiver creates a receiver @@ -47,22 +65,22 @@ func (n *Notifier) NewReceiver(tickTime time.Duration) *Receiver { currentIndex := n.maxIndex n.maxIndex++ receiverCh := make(chan struct{}, 1) + closeCh := make(chan struct{}) var ticker *time.Ticker if tickTime > 0 { ticker = time.NewTicker(tickTime) - go func() { - for range ticker.C { - signalNonBlocking(receiverCh) - } - }() } rec := &Receiver{ C: receiverCh, + c: receiverCh, Stop: func() { n.remove(currentIndex) }, - ticker: ticker, - c: receiverCh, + ticker: ticker, + closeCh: closeCh, + } + if tickTime > 0 { + rec.signalTickLoop() } n.receivers = append(n.receivers, struct { rec *Receiver @@ -77,10 +95,10 @@ func (n *Notifier) remove(index int) { for i, receiver := range n.receivers { if receiver.index == index { n.receivers = append(n.receivers[:i], n.receivers[i+1:]...) + close(receiver.rec.closeCh) if receiver.rec.ticker != nil { receiver.rec.ticker.Stop() } - close(receiver.rec.c) break } } @@ -94,7 +112,7 @@ func (n *Notifier) Close() { if receiver.rec.ticker != nil { receiver.rec.ticker.Stop() } - close(receiver.rec.c) + close(receiver.rec.closeCh) } n.receivers = nil } diff --git a/pkg/notify/notify_test.go b/pkg/notify/notify_test.go index 3cd2021fbeb..27e95405748 100644 --- a/pkg/notify/notify_test.go +++ b/pkg/notify/notify_test.go @@ -42,3 +42,18 @@ func (s *notifySuite) TestNotifyHub(c *check.C) { <-r5.C r5.Stop() } + +func (s *notifySuite) TestContinusStop(c *check.C) { + notifier := new(Notifier) + n := 20000 + receivers := make([]*Receiver, n) + for i := 0; i < n; i++ { + receivers[i] = notifier.NewReceiver(10 * time.Millisecond) + } + for i := 0; i < n; i++ { + <-receivers[i].C + } + for i := 0; i < n; i++ { + receivers[i].Stop() + } +} From 22ae2a7787e1220439041d1a2a4bf3e17bd6251e Mon Sep 17 00:00:00 2001 From: amyangfei Date: Wed, 13 May 2020 12:42:49 +0800 Subject: [PATCH 2/2] reduce goroutine count to avoid "race: limit on 8128 simultaneously alive goroutines is exceeded, dying" --- pkg/notify/notify_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/notify/notify_test.go b/pkg/notify/notify_test.go index 27e95405748..269f7b69fb8 100644 --- a/pkg/notify/notify_test.go +++ b/pkg/notify/notify_test.go @@ -45,7 +45,7 @@ func (s *notifySuite) TestNotifyHub(c *check.C) { func (s *notifySuite) TestContinusStop(c *check.C) { notifier := new(Notifier) - n := 20000 + n := 5000 receivers := make([]*Receiver, n) for i := 0; i < n; i++ { receivers[i] = notifier.NewReceiver(10 * time.Millisecond)