-
Notifications
You must be signed in to change notification settings - Fork 283
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
sink/kafka: fix send on closed channel panic #912
Conversation
/run-all-tests |
1 similar comment
/run-all-tests |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
Could you add a unit test where you send data asynchronously while closing the Producer? This is a scenario that might produce a data race (This might have been fixed by the adjustment to the select statement, but just to be sure). |
Yes, you are right, there still exists data race and send on closed chan risk, using following code @liuzix package main
import (
"context"
"fmt"
"sync"
"time"
)
type worker struct {
inputCh chan struct{}
closeCh chan struct{}
count int64
l sync.RWMutex
}
func (w *worker) send(ctx context.Context) bool {
w.l.Lock()
defer w.l.Unlock()
select {
case <-ctx.Done():
return false
case <-w.closeCh:
return true
default:
w.inputCh <- struct{}{}
}
return false
}
func (w *worker) produce(ctx context.Context) {
finish := false
for !finish {
finish = w.send(ctx)
}
}
func (w *worker) consume(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-w.inputCh:
w.count++
}
}
}
func (w *worker) stop(lock bool) {
if lock {
w.l.Lock()
}
close(w.closeCh)
if lock {
w.l.Unlock()
}
close(w.inputCh)
}
func test(i int, enableLock bool) {
w := &worker{
inputCh: make(chan struct{}, 1),
closeCh: make(chan struct{}),
}
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
w.produce(ctx)
}()
go func() {
defer wg.Done()
w.consume(ctx)
}()
time.Sleep(time.Millisecond * 1)
w.stop(enableLock)
cancel()
wg.Wait()
fmt.Printf("idx: %d count:%d\n", i, w.count)
}
func main() {
enableLock := true
// enableLock := false
for i := 0; i < 200; i++ {
test(i, enableLock)
}
} If we run with
But with lock enabled, everything goes well |
/run-kafka-tests |
/run-all-tests |
Codecov Report
@@ Coverage Diff @@
## master #912 +/- ##
===========================================
Coverage 32.6893% 32.6893%
===========================================
Files 99 99
Lines 11698 11698
===========================================
Hits 3824 3824
Misses 7491 7491
Partials 383 383 |
/lgtm |
/merge |
/run-all-tests |
@amyangfei merge failed. |
/run-kafka-tests |
/merge |
/run-all-tests |
@amyangfei merge failed. |
/merge |
/run-all-tests |
What problem does this PR solve?
Fix #908
What is changed and how it works?
It is safe to close chan from the sender routine generally, we put the
Close
of the producer in the later processor stop procedure.Besides there exists a bug in select usage. In Golang select model, if one or more of the communications can proceed, a single one that can proceed is chosen via a uniform pseudo-random selection, so we put message sender to the default branch
Check List
Tests
Release note