Skip to content

Commit

Permalink
syncx: sync.Cond的超时等待版,Cond.WaitWithContext(ctx)
Browse files Browse the repository at this point in the history
  • Loading branch information
fifth-month committed Jun 27, 2023
1 parent 9637a9e commit 736bdb1
Show file tree
Hide file tree
Showing 2 changed files with 205 additions and 79 deletions.
224 changes: 147 additions & 77 deletions syncx/cond.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,41 +15,101 @@
package syncx

import (
"container/list"
"context"
"sync"
)

// notifyList 是一个简单的 runtime_notifyList 实现,但增加了 waitWithContext 方法
// Cond 实现了一个条件变量,是等待或宣布一个事件发生的goroutines的汇合点。
//
// 在改变条件和调用Wait方法的时候,Cond 关联的锁对象 L (*Mutex 或者 *RWMutex)必须被加锁,
//
// 在Go内存模型的术语中,Cond 保证 Broadcast或Signal的调用 同步于 因此而解除阻塞的 Wait 之前。
//
// 绝大多数简单用例, 最好使用 channels 而不是 Cond
// (Broadcast 对应于关闭一个 channel, Signal 对应于给一个 channel 发送消息).
type Cond struct {
noCopy noCopy
// L 在观察或改变条件时被加锁
L sync.Locker
notifyList *notifyList
}

// NewCond 返回 关联了 l 的新 Cond .
func NewCond(l sync.Locker) *Cond {
return &Cond{
L: l,
notifyList: newNotifyList(),
}
}

// Wait 自动解锁 c.L 并挂起当前调用的 goroutine. 在恢复执行之后 Wait 在返回前将加 c.L 锁成功.
// 和其它系统不一样, 除非调用 Broadcast 或 Signal 或者 ctx 超时了,否则 Wait 不会返回.
//
// 成功唤醒时, 返回 nil. 超时失败时, 返回ctx.Err().
// 如果 ctx 超时了, Wait 可能依旧执行成功返回 nil.
//
// 在 Wait 第一次继续执行时,因为 c.L 没有加锁, 当 Wait 返回的时候,调用者通常不能假设条件是真的
// 相反, caller 应该在循环中调用 Wait:
//
// c.L.Lock()
// for !condition() {
// if err := c.Wait(ctx); err != nil {
// // 超时唤醒了,并不是被正常唤醒的,可以做一些超时的处理
// }
// }
// ... condition 满足了,do work ...
// c.L.Unlock()
func (c *Cond) Wait(ctx context.Context) error {
t := c.notifyList.add() // 解锁前,将等待的对象放入链表中
c.L.Unlock() // 一定是在等待对象放入链表后再解锁,避免刚解锁就发生协程切换,执行了signal后,再换回来导致永远阻塞
defer c.L.Lock()
return c.notifyList.wait(ctx, t)
}

// Signal 唤醒一个等待在 c 上的goroutine.
//
// 调用时,caller 可以持有也可以不持有 c.L 锁
//
// Signal() 不影响 goroutine 调度的优先级; 如果其它的 goroutines
// 尝试着锁定 c.L, 它们可能在 "waiting" goroutine 之前被唤醒.
func (c *Cond) Signal() {
c.notifyList.notifyOne()
}

// Broadcast 唤醒所有等待在 c 上的goroutine.
//
// 调用时,caller 可以持有也可以不持有 c.L 锁
func (c *Cond) Broadcast() {
c.notifyList.notifyAll()
}

// notifyList 是一个简单的 runtime_notifyList 实现,但增强了 wait 方法
type notifyList struct {
mu sync.Mutex
list *list.List
chPool *sync.Pool
mu sync.Mutex
list *chanList
}

func newNotifyList() *notifyList {
return &notifyList{
mu: sync.Mutex{},
list: list.New(),
chPool: &sync.Pool{
New: func() any {
return make(chan struct{}, 1)
},
},
list: newChanList(),
}
}

func (l *notifyList) add() *list.Element {
func (l *notifyList) add() *node {
l.mu.Lock()
defer l.mu.Unlock()
return l.list.PushBack(l.chPool.Get())
el := l.list.alloc()
l.list.PushBack(el)
return el
}

func (l *notifyList) waitWithContext(ctx context.Context, elem *list.Element) error {
ch := elem.Value.(chan struct{})
func (l *notifyList) wait(ctx context.Context, elem *node) error {
ch := elem.Value
// 回收ch,超时时,因为没有被使用过,直接复用
// 正常唤醒时,由于被放入了一条消息,但被取出来了一次,所以可以重复使用
defer l.chPool.Put(ch)
// 正常唤醒时,由于被放入了一条消息,但被取出来了一次,所以elem中的ch可以重复使用
// 由于ch是挂在elem上的,所以elem在ch被回收之前,不可以被错误回收,所以必须在这里进行回收
defer l.list.free(elem)
select { // 由于会随机选择一条,在超时和通知同时存在的话,如果通知先行,则没有影响,如果超时的同时,又来了通知
case <-ctx.Done(): // 进了超时分支,但同时协程发生了切换进入了notifyOne的分支;这个时候,根据remove的成功与否可以知道是否是需要唤醒的
l.mu.Lock()
Expand All @@ -58,12 +118,10 @@ func (l *notifyList) waitWithContext(ctx context.Context, elem *list.Element) er
// double check: 检查是否在加锁前,刚好被正常通知了,
case <-ch: // 如果取到数据,代表收到了信号了,ch也因为被取了一次消息,意味着可以再次复用
// 转移信号到下一个
// 如果没有下一个等待的,就返回
if l.list.Len() == 0 {
return ctx.Err()
}
// 如果有下一个等待的,就唤醒它
l.notifyNext()
if l.list.Len() != 0 {
l.notifyNext()
}

Check warning on line 124 in syncx/cond.go

View check run for this annotation

Codecov / codecov/patch

syncx/cond.go#L123-L124

Added lines #L123 - L124 were not covered by tests
default: // 如果取不到数据,代表不可能被正常唤醒了,ch也意味着没有被使用
// 这种情况代表加锁成功后,没有被通知到,属于真正的超时的情况,从队列移除等待对象,避免被错误通知唤醒,返回超时错误信息
l.list.Remove(elem)
Expand All @@ -85,7 +143,7 @@ func (l *notifyList) notifyOne() {

func (l *notifyList) notifyNext() {
front := l.list.Front()
ch := front.Value.(chan struct{})
ch := front.Value
l.list.Remove(front)
ch <- struct{}{}
}
Expand All @@ -98,67 +156,79 @@ func (l *notifyList) notifyAll() {
}
}

// Cond 实现了一个条件变量,是等待或宣布一个事件发生的goroutines的汇合点。
//
// 在改变条件和调用Wait方法的时候,Cond 关联的锁对象 L (*Mutex 或者 *RWMutex)必须被加锁,
//
// # Cond 在初次使用后,不要复制对象
//
// 在Go内存模型的术语中,Cond安排对Broadcast或Signal的调用"happens before"任何解除阻塞的 Wait 调用。
//
// 绝大多数简单用例, 最好使用channels而不是 Cond
// (Broadcast 对应于关闭一个 channel, Signal 对应于给一个 channel 发送消息).
type Cond struct {
// L 在观察或改变条件时被加锁
L sync.Locker
notifyList *notifyList
// node 保存chan的链表元素
type node struct {
prev *node
next *node
Value chan struct{}
}

// NewCond 返回 关联了 l 的新 Cond .
func NewCond(l sync.Locker) *Cond {
return &Cond{
L: l,
notifyList: newNotifyList(),
// chanList 用于存放保存channel的一个双链表, 带复用元素的功能
type chanList struct {
// 哨兵元素,方便处理元素个数为0的情况
sentinel *node
size int
pool *sync.Pool
}

func newChanList() *chanList {
sentinel := &node{}
sentinel.prev = sentinel
sentinel.next = sentinel
return &chanList{
sentinel: sentinel,
size: 0,
pool: &sync.Pool{
New: func() any {
return &node{
Value: make(chan struct{}, 1),
}
},
},
}
}

// Wait 自动解锁 c.L 并挂起当前调用的 goroutine. 在恢复执行之后 Wait 在返回前将加 c.L 锁成功.
// 和其它系统不一样, 除非调用 Broadcast 或 Signal 或者 ctx 超时了,否则 Wait 不会返回.
//
// 成功唤醒时, 返回 nil. 超时失败时, 返回ctx.Err().
// 如果 ctx 超时了, Wait 可能依旧执行成功返回 nil.
//
// 在 Wait 第一次继续执行时,因为 c.L 没有加锁, 当 Wait 返回的时候,调用者通常不能假设条件是真的
// 相反, caller 应该在循环中调用 Wait:
//
// c.L.Lock()
// for !condition() {
// if err := c.Wait(ctx); err != nil {
// // 超时唤醒了,并不是被正常唤醒的,可以做一些超时的处理
// }
// }
// ... condition 满足了,do work ...
// c.L.Unlock()
func (c *Cond) Wait(ctx context.Context) error {
t := c.notifyList.add() // 解锁前,将等待的对象放入链表中
c.L.Unlock() // 一定是在等待对象放入链表后再解锁,避免刚解锁就发生协程切换,执行了signal后,再换回来导致永远阻塞
defer c.L.Lock()
return c.notifyList.waitWithContext(ctx, t)
// Len 获取链表长度
func (l *chanList) Len() int {
return l.size
}

// Signal 唤醒一个等待在 c 上的goroutine.
//
// 调用时,caller 可以持有也可以不持有 c.L 锁
//
// Signal() 不影响 goroutine 调度的优先级; 如果其它的 goroutines
// 尝试着锁定 c.L, 它们可能在 "waiting" goroutine 之前被唤醒.
func (c *Cond) Signal() {
c.notifyList.notifyOne()
// Front 获取队首元素
func (l *chanList) Front() *node {
return l.sentinel.next
}

// Broadcast 唤醒所有等待在 c 上的goroutine.
//
// 调用时,caller 可以持有也可以不持有 c.L 锁
func (c *Cond) Broadcast() {
c.notifyList.notifyAll()
// alloc 申请新的元素,包含复用的chan
func (l *chanList) alloc() *node {
elem := l.pool.Get().(*node)
return elem
}

// PushBack 追加元素到队尾
func (l *chanList) PushBack(elem *node) {
elem.next = l.sentinel
elem.prev = l.sentinel.prev
l.sentinel.prev.next = elem
l.sentinel.prev = elem
l.size++
}

// Remove 元素移除时,还不能回收该元素,避免元素上的chan被错误覆盖
func (l *chanList) Remove(elem *node) {
elem.prev.next = elem.next
elem.next.prev = elem.prev
elem.prev = nil
elem.next = nil
l.size--
}

// free 回收该元素,用于下次alloc获取时复用,避免再次分配
func (l *chanList) free(elem *node) {
l.pool.Put(elem)
}

// 用于静态代码检查复制的问题
type noCopy struct{}

func (*noCopy) Lock() {}
func (*noCopy) Unlock() {}

Check warning on line 234 in syncx/cond.go

View check run for this annotation

Codecov / codecov/patch

syncx/cond.go#L233-L234

Added lines #L233 - L234 were not covered by tests
60 changes: 58 additions & 2 deletions syncx/cond_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ package syncx
import (
"context"
"math/rand"
"reflect"
"sync"
"testing"
"time"
)

func TestCond_WaitWithContext(t *testing.T) {
func TestCond_Broadcast(t *testing.T) {

cond := NewCond(&sync.Mutex{})

Expand Down Expand Up @@ -96,7 +97,7 @@ func TestCond_WaitWithContext(t *testing.T) {
}
}

func TestCond_WakeOrder(t *testing.T) {
func TestCond_Signal(t *testing.T) {

cond := NewCond(&sync.Mutex{})

Expand Down Expand Up @@ -239,3 +240,58 @@ func Test_InOrder(t *testing.T) {
})
}
}

// TestChanList 测试有序,和清空后重复使用是否有问题
func TestChanList(t *testing.T) {

l := newChanList()

testcases := []struct {
name string
num int
}{
{"", 5},
{"", 3},
{"", 10},
}

for _, testcase := range testcases {
t.Run(testcase.name, func(tt *testing.T) {
inputNodes := make([]*node, 0, testcase.num)
inputChans := make([]chan struct{}, 0, testcase.num)
for i := 0; i < testcase.num; i++ {
ele := l.alloc()
inputNodes = append(inputNodes, ele)
inputChans = append(inputChans, ele.Value)
l.PushBack(ele)
}
if length := l.Len(); length != testcase.num {
t.Errorf("list.Len() = %v, want %v", length, testcase.num)
}
outNodes := make([]*node, 0, testcase.num)
outChans := make([]chan struct{}, 0, testcase.num)
for l.Len() != 0 {
front := l.Front()
outNodes = append(outNodes, front)
outChans = append(outChans, front.Value)
l.Remove(front)
}
if !reflect.DeepEqual(outChans, inputChans) {
t.Errorf("chan list is %v, but got %v", inputChans, outChans)
}
if !reflect.DeepEqual(outNodes, inputNodes) {
t.Errorf("element list is %v, but got %v", inputNodes, outNodes)
}
})
}
}

// BenchmarkChanList 测试有无内存分配增加的情况
func BenchmarkChanList(b *testing.B) {
l := newChanList()
for i := 0; i < b.N; i++ {
elem := l.alloc()
l.PushBack(elem)
l.Remove(elem)
}
}

0 comments on commit 736bdb1

Please sign in to comment.