Skip to content

Commit

Permalink
refactor: remove wakeup loop by real lock
Browse files Browse the repository at this point in the history
  • Loading branch information
rueian committed Feb 27, 2022
1 parent 4d57cd8 commit c13154e
Showing 1 changed file with 13 additions and 13 deletions.
26 changes: 13 additions & 13 deletions pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ var _ wire = (*pipe)(nil)
type pipe struct {
waits int32
state int32
sleep int32
slept int32

once sync.Once
cond sync.Cond
Expand Down Expand Up @@ -58,7 +58,7 @@ func newPipe(conn net.Conn, option *ClientOption) (p *pipe, err error) {

p = &pipe{
conn: conn,
cond: sync.Cond{L: noLock{}},
cond: sync.Cond{L: &sync.Mutex{}},
queue: newRing(),
cache: newLRU(option.CacheSizeEachConn),
r: bufio.NewReader(conn),
Expand Down Expand Up @@ -96,18 +96,24 @@ func newPipe(conn net.Conn, option *ClientOption) (p *pipe, err error) {
}

func (p *pipe) _sleep() (slept bool) {
atomic.AddInt32(&p.sleep, 1) // create barrier
if slept = atomic.LoadInt32(&p.waits) == 0 && atomic.LoadInt32(&p.state) == 1; slept {
p.cond.Wait()
p.cond.L.Lock()
if slept = atomic.LoadInt32(&p.waits) == 0 && atomic.LoadInt32(&p.state) == 1; slept {
p.slept = 1
p.cond.Wait()
p.slept = 0
}
p.cond.L.Unlock()
}
atomic.AddInt32(&p.sleep, -1)
return slept
}

func (p *pipe) _awake() {
for atomic.LoadInt32(&p.sleep) != 0 {
p.cond.L.Lock()
slept := p.slept
p.cond.L.Unlock()
if slept == 1 {
p.cond.Broadcast()
runtime.Gosched()
}
}

Expand Down Expand Up @@ -622,12 +628,6 @@ var errClosing = &errs{error: ErrClosing}

type errs struct{ error }

type noLock struct{}

func (n noLock) Lock() {}

func (n noLock) Unlock() {}

type writeTimeoutConn struct {
net.Conn
current time.Time
Expand Down

0 comments on commit c13154e

Please sign in to comment.