Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

profiler: start collecting profiles immediately #1417

Merged
merged 4 commits into from
Aug 8, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 34 additions & 34 deletions profiler/profiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,42 +289,42 @@ func (p *profiler) collect(ticker <-chan time.Time) {
wg sync.WaitGroup
)
for {
now := now()
bat := batch{
seq: p.seq,
host: p.cfg.hostname,
start: now,
// NB: while this is technically wrong in that it does not
// record the actual start and end timestamps for the batch,
// it is how the backend understands the client-side
// configured CPU profile duration: (start-end).
end: now.Add(p.cfg.cpuDuration),
}
p.seq++

completed = completed[:0]
for _, t := range p.enabledProfileTypes() {
wg.Add(1)
go func(t ProfileType) {
defer wg.Done()
profs, err := p.runProfile(t)
if err != nil {
log.Error("Error getting %s profile: %v; skipping.", t, err)
tags := append(p.cfg.tags.Slice(), t.Tag())
p.cfg.statsd.Count("datadog.profiling.go.collect_error", 1, tags, 1)
}
mu.Lock()
defer mu.Unlock()
completed = append(completed, profs...)
}(t)
}
wg.Wait()
for _, prof := range completed {
bat.addProfile(prof)
}
p.enqueueUpload(bat)
select {
case <-ticker:
now := now()
bat := batch{
seq: p.seq,
host: p.cfg.hostname,
start: now,
// NB: while this is technically wrong in that it does not
// record the actual start and end timestamps for the batch,
// it is how the backend understands the client-side
// configured CPU profile duration: (start-end).
end: now.Add(p.cfg.cpuDuration),
}
p.seq++

completed = completed[:0]
for _, t := range p.enabledProfileTypes() {
wg.Add(1)
go func(t ProfileType) {
defer wg.Done()
profs, err := p.runProfile(t)
if err != nil {
log.Error("Error getting %s profile: %v; skipping.", t, err)
tags := append(p.cfg.tags.Slice(), t.Tag())
p.cfg.statsd.Count("datadog.profiling.go.collect_error", 1, tags, 1)
}
mu.Lock()
defer mu.Unlock()
completed = append(completed, profs...)
}(t)
}
wg.Wait()
for _, prof := range completed {
bat.addProfile(prof)
}
p.enqueueUpload(bat)
case <-p.exit:
return
}
Expand Down
91 changes: 35 additions & 56 deletions profiler/profiler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -167,17 +166,20 @@ func TestStartStopIdempotency(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 1000; j++ {
for j := 0; j < 20; j++ {
// startup logging makes this test very slow
Start(WithLogStartup(false))
//
// Also, the CPU profile is really slow to stop (200ms/iter)
// so we just do the heap profile
Start(WithLogStartup(false), WithProfileTypes(HeapProfile))
}
}()
}
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 1000; j++ {
for j := 0; j < 20; j++ {
Stop()
}
}()
Expand Down Expand Up @@ -241,58 +243,6 @@ func TestStopLatency(t *testing.T) {
}
}

func TestProfilerInternal(t *testing.T) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: We could "fix" this test by removing the tick <- time.Now() line and changing p.exit <- struct{}{} to close(p.exit). That would make the test pass. However I personally think this test should just be removed.

t.Run("collect", func(t *testing.T) {
p, err := unstartedProfiler(
WithPeriod(1*time.Millisecond),
CPUDuration(1*time.Millisecond),
WithProfileTypes(HeapProfile, CPUProfile),
)
require.NoError(t, err)
var startCPU, stopCPU, writeHeap uint64
p.testHooks.startCPUProfile = func(_ io.Writer) error {
atomic.AddUint64(&startCPU, 1)
return nil
}
p.testHooks.stopCPUProfile = func() { atomic.AddUint64(&stopCPU, 1) }
p.testHooks.lookupProfile = func(name string, w io.Writer, _ int) error {
if name == "heap" {
atomic.AddUint64(&writeHeap, 1)
}
_, err := w.Write(textProfile{Text: "main 5\n"}.Protobuf())
return err
}

tick := make(chan time.Time)
wait := make(chan struct{})

go func() {
p.collect(tick)
close(wait)
}()

tick <- time.Now()

var bat batch
select {
case bat = <-p.out:
case <-time.After(200 * time.Millisecond):
t.Fatalf("missing batch")
}

assert := assert.New(t)
assert.EqualValues(1, writeHeap)
assert.EqualValues(1, startCPU)
assert.EqualValues(1, stopCPU)

// should contain cpu.pprof, metrics.json, delta-heap.pprof
assert.Equal(3, len(bat.profiles))

p.exit <- struct{}{}
<-wait
})
}

func TestSetProfileFraction(t *testing.T) {
t.Run("on", func(t *testing.T) {
start := runtime.SetMutexProfileFraction(0)
Expand Down Expand Up @@ -568,3 +518,32 @@ func TestTelemetryEnabled(t *testing.T) {
check("profile_period", time.Duration(10*time.Millisecond).String())
check("cpu_duration", time.Duration(1*time.Millisecond).String())
}

func TestImmediateProfile(t *testing.T) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what I've come up with so far for testing this change. Time-based tests aren't my favorite but I think this should be okay & not flaky. I'm open to other ideas!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm okay with this test. The alternative would be a test that uses an unstartedProfiler and calls the collect() method with a ticker that never fires and still expects a profile.

received := make(chan struct{}, 1)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
select {
case received <- struct{}{}:
default:
}
}))
defer server.Close()

err := Start(
WithAgentAddr(server.Listener.Addr().String()),
WithProfileTypes(HeapProfile),
WithPeriod(3*time.Second),
)
require.NoError(t, err)
defer Stop()

// Wait a little less than 2 profile periods. We should start profiling
// immediately. If it significantly longer than 1 profile period to get
// a profile, consider the test failed
timeout := time.After(5 * time.Second)
select {
case <-timeout:
t.Fatal("should have ")
case <-received:
}
}