Skip to content

Commit

Permalink
runtime: change mutex profile to count every blocked goroutine
Browse files Browse the repository at this point in the history
The pprof mutex profile was meant to match the Google C++ (now Abseil)
mutex profiler, originally designed and implemented by Mike Burrows.
When we worked on the Go version, pjw and I missed that C++ counts the
time each thread is blocked, even if multiple threads are blocked on a
mutex. That is, if 100 threads are blocked on the same mutex for the
same 10ms, that still counts as 1000ms of contention in C++. In Go, to
date, /debug/pprof/mutex has counted that as only 10ms of contention.
If 100 goroutines are blocked on one mutex and only 1 goroutine is
blocked on another mutex, we probably do want to see the first mutex
as being more contended, so the Abseil approach is the more useful one.

This CL adopts "contention scales with number of goroutines blocked",
to better match Abseil [1]. However, it still makes sure to attribute the
time to the unlock that caused the backup, not subsequent innocent
unlocks that were affected by the congestion. In this way it still gives
more accurate profiles than Abseil does.

[1] https://github.com/abseil/abseil-cpp/blob/lts_2023_01_25/absl/synchronization/mutex.cc#L2390

Fixes #61015.

Change-Id: I7eb9e706867ffa8c0abb5b26a1b448f6eba49331
Reviewed-on: https://go-review.googlesource.com/c/go/+/506415
Run-TryBot: Russ Cox <rsc@golang.org>
Auto-Submit: Russ Cox <rsc@golang.org>
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Michael Knyszek <mknyszek@google.com>
  • Loading branch information
rsc authored and gopherbot committed Aug 17, 2023
1 parent 14a3ffc commit 1c00354
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 26 deletions.
2 changes: 1 addition & 1 deletion src/runtime/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1330,7 +1330,7 @@ func (t *SemTable) Enqueue(addr *uint32) {
//
// Returns true if there actually was a waiter to be dequeued.
func (t *SemTable) Dequeue(addr *uint32) bool {
s, _ := t.semTable.rootFor(addr).dequeue(addr)
s, _, _ := t.semTable.rootFor(addr).dequeue(addr)
if s != nil {
releaseSudog(s)
return true
Expand Down
2 changes: 0 additions & 2 deletions src/runtime/mprof.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,8 +553,6 @@ func mutexevent(cycles int64, skip int) {
cycles = 0
}
rate := int64(atomic.Load64(&mutexprofilerate))
// TODO(pjw): measure impact of always calling fastrand vs using something
// like malloc.go:nextSample()
if rate > 0 && int64(fastrand())%rate == 0 {
saveblockevent(cycles, rate, skip+1, mutexProfile)
}
Expand Down
74 changes: 58 additions & 16 deletions src/runtime/pprof/pprof_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1023,7 +1023,7 @@ func containsStack(got [][]string, want []string) bool {
// awaitBlockedGoroutine spins on runtime.Gosched until a runtime stack dump
// shows a goroutine in the given state with a stack frame in
// runtime/pprof.<fName>.
func awaitBlockedGoroutine(t *testing.T, state, fName string) {
func awaitBlockedGoroutine(t *testing.T, state, fName string, count int) {
re := fmt.Sprintf(`(?m)^goroutine \d+ \[%s\]:\n(?:.+\n\t.+\n)*runtime/pprof\.%s`, regexp.QuoteMeta(state), fName)
r := regexp.MustCompile(re)

Expand All @@ -1047,7 +1047,7 @@ func awaitBlockedGoroutine(t *testing.T, state, fName string) {
buf = make([]byte, 2*len(buf))
continue
}
if r.Match(buf[:n]) {
if len(r.FindAll(buf[:n], -1)) >= count {
return
}
}
Expand All @@ -1056,7 +1056,7 @@ func awaitBlockedGoroutine(t *testing.T, state, fName string) {
func blockChanRecv(t *testing.T) {
c := make(chan bool)
go func() {
awaitBlockedGoroutine(t, "chan receive", "blockChanRecv")
awaitBlockedGoroutine(t, "chan receive", "blockChanRecv", 1)
c <- true
}()
<-c
Expand All @@ -1065,7 +1065,7 @@ func blockChanRecv(t *testing.T) {
func blockChanSend(t *testing.T) {
c := make(chan bool)
go func() {
awaitBlockedGoroutine(t, "chan send", "blockChanSend")
awaitBlockedGoroutine(t, "chan send", "blockChanSend", 1)
<-c
}()
c <- true
Expand All @@ -1074,7 +1074,7 @@ func blockChanSend(t *testing.T) {
func blockChanClose(t *testing.T) {
c := make(chan bool)
go func() {
awaitBlockedGoroutine(t, "chan receive", "blockChanClose")
awaitBlockedGoroutine(t, "chan receive", "blockChanClose", 1)
close(c)
}()
<-c
Expand All @@ -1086,7 +1086,7 @@ func blockSelectRecvAsync(t *testing.T) {
c2 := make(chan bool, 1)
go func() {
for i := 0; i < numTries; i++ {
awaitBlockedGoroutine(t, "select", "blockSelectRecvAsync")
awaitBlockedGoroutine(t, "select", "blockSelectRecvAsync", 1)
c <- true
}
}()
Expand All @@ -1102,7 +1102,7 @@ func blockSelectSendSync(t *testing.T) {
c := make(chan bool)
c2 := make(chan bool)
go func() {
awaitBlockedGoroutine(t, "select", "blockSelectSendSync")
awaitBlockedGoroutine(t, "select", "blockSelectSendSync", 1)
<-c
}()
select {
Expand All @@ -1115,7 +1115,7 @@ func blockMutex(t *testing.T) {
var mu sync.Mutex
mu.Lock()
go func() {
awaitBlockedGoroutine(t, "sync.Mutex.Lock", "blockMutex")
awaitBlockedGoroutine(t, "sync.Mutex.Lock", "blockMutex", 1)
mu.Unlock()
}()
// Note: Unlock releases mu before recording the mutex event,
Expand All @@ -1125,12 +1125,36 @@ func blockMutex(t *testing.T) {
mu.Lock()
}

// blockMutexN creates contention on a single mutex for the mutex profile
// test: it holds mu while n goroutines block in mu.Lock, keeps them
// blocked for a further d, and returns once every one of those goroutines
// has acquired and released mu.
func blockMutexN(t *testing.T, n int, d time.Duration) {
var wg sync.WaitGroup
var mu sync.Mutex
mu.Lock()
go func() {
// Wait until n goroutines are reported as blocked in sync.Mutex.Lock.
// The function-name pattern is not anchored at its end, so
// "blockMutex" also matches stack frames in blockMutexN.
awaitBlockedGoroutine(t, "sync.Mutex.Lock", "blockMutex", n)
// Keep all n waiters blocked for an additional d before releasing mu.
time.Sleep(d)
mu.Unlock()
}()
// Note: Unlock releases mu before recording the mutex event,
// so it's theoretically possible for this to proceed and
// capture the profile before the event is recorded. As long
// as this is blocked before the unlock happens, it's okay.
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// Block on mu until the holder above releases it, then pass it on.
mu.Lock()
mu.Unlock()
}()
}
// Return only after all n contending goroutines have finished.
wg.Wait()
}

func blockCond(t *testing.T) {
var mu sync.Mutex
c := sync.NewCond(&mu)
mu.Lock()
go func() {
awaitBlockedGoroutine(t, "sync.Cond.Wait", "blockCond")
awaitBlockedGoroutine(t, "sync.Cond.Wait", "blockCond", 1)
mu.Lock()
c.Signal()
mu.Unlock()
Expand Down Expand Up @@ -1217,7 +1241,11 @@ func TestMutexProfile(t *testing.T) {
t.Fatalf("need MutexProfileRate 0, got %d", old)
}

blockMutex(t)
const (
N = 100
D = 100 * time.Millisecond
)
blockMutexN(t, N, D)

t.Run("debug=1", func(t *testing.T) {
var w strings.Builder
Expand All @@ -1230,15 +1258,11 @@ func TestMutexProfile(t *testing.T) {
}
prof = strings.Trim(prof, "\n")
lines := strings.Split(prof, "\n")
if len(lines) != 6 {
t.Errorf("expected 6 lines, got %d %q\n%s", len(lines), prof, prof)
}
if len(lines) < 6 {
return
t.Fatalf("expected >=6 lines, got %d %q\n%s", len(lines), prof, prof)
}
// checking that the line is like "35258904 1 @ 0x48288d 0x47cd28 0x458931"
r2 := `^\d+ \d+ @(?: 0x[[:xdigit:]]+)+`
//r2 := "^[0-9]+ 1 @ 0x[0-9a-f x]+$"
if ok, err := regexp.MatchString(r2, lines[3]); err != nil || !ok {
t.Errorf("%q didn't match %q", lines[3], r2)
}
Expand All @@ -1263,12 +1287,30 @@ func TestMutexProfile(t *testing.T) {

stks := stacks(p)
for _, want := range [][]string{
{"sync.(*Mutex).Unlock", "runtime/pprof.blockMutex.func1"},
{"sync.(*Mutex).Unlock", "runtime/pprof.blockMutexN.func1"},
} {
if !containsStack(stks, want) {
t.Errorf("No matching stack entry for %+v", want)
}
}

i := 0
for ; i < len(p.SampleType); i++ {
if p.SampleType[i].Unit == "nanoseconds" {
break
}
}
if i >= len(p.SampleType) {
t.Fatalf("profile did not contain nanoseconds sample")
}
total := int64(0)
for _, s := range p.Sample {
total += s.Value[i]
}
d := time.Duration(total)
if d < N*D*9/10 || d > N*D*2 { // want N*D but allow [0.9,2.0]*that.
t.Fatalf("profile samples total %v, want %v", d, N*D)
}
})
}

Expand Down
9 changes: 8 additions & 1 deletion src/runtime/runtime2.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ type gobuf struct {
bp uintptr // for framepointer-enabled architectures
}

// sudog represents a g in a wait list, such as for sending/receiving
// sudog (pseudo-g) represents a g in a wait list, such as for sending/receiving
// on a channel.
//
// sudog is necessary because the g ↔ synchronization object relation
Expand Down Expand Up @@ -382,6 +382,13 @@ type sudog struct {
// because c was closed.
success bool

// waiters is a count of the entries on the semaRoot waiting list other than the head of the list,
// clamped to a uint16 to fit in unused space.
// Only meaningful at the head of the list.
// (If we wanted to be overly clever, we could store a high 16 bits
// in the second entry in the list.)
waiters uint16

parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
Expand Down
54 changes: 48 additions & 6 deletions src/runtime/sema.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,15 +191,36 @@ func semrelease1(addr *uint32, handoff bool, skipframes int) {
unlock(&root.lock)
return
}
s, t0 := root.dequeue(addr)
s, t0, tailtime := root.dequeue(addr)
if s != nil {
root.nwait.Add(-1)
}
unlock(&root.lock)
if s != nil { // May be slow or even yield, so unlock first
acquiretime := s.acquiretime
if acquiretime != 0 {
mutexevent(t0-acquiretime, 3+skipframes)
// Charge contention that this (delayed) unlock caused.
// If there are N more goroutines waiting beyond the
// one that's waking up, charge their delay as well, so that
// contention holding up many goroutines shows up as
// more costly than contention holding up a single goroutine.
// It would take O(N) time to calculate how long each goroutine
// has been waiting, so instead we charge avg(head-wait, tail-wait)*N.
// head-wait is the longest wait and tail-wait is the shortest.
// (When we do a lifo insertion, we preserve this property by
// copying the old head's acquiretime into the inserted new head.
// In that case the overall average may be slightly high, but that's fine:
// the average of the ends is only an approximation to the actual
// average anyway.)
// The root.dequeue above changed the head and tail acquiretime
// to the current time, so the next unlock will not re-count this contention.
dt0 := t0 - acquiretime
dt := dt0
if s.waiters != 0 {
dtail := t0 - tailtime
dt += (dtail + dt0) / 2 * int64(s.waiters)
}
mutexevent(dt, 3+skipframes)
}
if s.ticket != 0 {
throw("corrupted semaphore ticket")
Expand Down Expand Up @@ -248,6 +269,7 @@ func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) {
s.elem = unsafe.Pointer(addr)
s.next = nil
s.prev = nil
s.waiters = 0

var last *sudog
pt := &root.treap
Expand All @@ -258,7 +280,7 @@ func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) {
// Substitute s in t's place in treap.
*pt = s
s.ticket = t.ticket
s.acquiretime = t.acquiretime
s.acquiretime = t.acquiretime // preserve head acquiretime as oldest time
s.parent = t.parent
s.prev = t.prev
s.next = t.next
Expand All @@ -274,6 +296,10 @@ func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) {
if s.waittail == nil {
s.waittail = t
}
s.waiters = t.waiters
if s.waiters+1 != 0 {
s.waiters++
}
t.parent = nil
t.prev = nil
t.next = nil
Expand All @@ -287,6 +313,9 @@ func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) {
}
t.waittail = s
s.waitlink = nil
if t.waiters+1 != 0 {
t.waiters++
}
}
return
}
Expand Down Expand Up @@ -330,7 +359,10 @@ func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) {
// in semaRoot blocked on addr.
// If the sudog was being profiled, dequeue returns the time
// at which it was woken up as now. Otherwise now is 0.
func (root *semaRoot) dequeue(addr *uint32) (found *sudog, now int64) {
// If there are additional entries in the wait list, dequeue
// returns tailtime set to the last entry's acquiretime.
// Otherwise tailtime is found.acquiretime.
func (root *semaRoot) dequeue(addr *uint32) (found *sudog, now, tailtime int64) {
ps := &root.treap
s := *ps
for ; s != nil; s = *ps {
Expand All @@ -343,7 +375,7 @@ func (root *semaRoot) dequeue(addr *uint32) (found *sudog, now int64) {
ps = &s.next
}
}
return nil, 0
return nil, 0, 0

Found:
now = int64(0)
Expand All @@ -368,7 +400,16 @@ Found:
} else {
t.waittail = nil
}
t.waiters = s.waiters
if t.waiters > 1 {
t.waiters--
}
// Set head and tail acquire time to 'now',
// because the caller will take care of charging
// the delays before now for all entries in the list.
t.acquiretime = now
tailtime = s.waittail.acquiretime
s.waittail.acquiretime = now
s.waitlink = nil
s.waittail = nil
} else {
Expand All @@ -390,13 +431,14 @@ Found:
} else {
root.treap = nil
}
tailtime = s.acquiretime
}
s.parent = nil
s.elem = nil
s.next = nil
s.prev = nil
s.ticket = 0
return s, now
return s, now, tailtime
}

// rotateLeft rotates the tree rooted at node x.
Expand Down

0 comments on commit 1c00354

Please sign in to comment.