Skip to content

Commit

Permalink
profiler: start collecting profiles immediately (#1417)
Browse files Browse the repository at this point in the history
Previously, it would take two minutes for the first profile to be
uploaded. One minute for the internal time.Ticker to fire, and then an
additional minute to collect the profiles for the first time. This means
we wouldn't get profiles for the first minute of activity.

Two tests had to be changed due to this:

* TestStartStopIdempotency became significantly slower, because it would
  start 5000 profiles, and stopping each one involved waiting 200 ms
  for the CPU profile to stop. Changed to just collect the heap profile
  and to do fewer iterations.
* TestProfilerInternal broke because it expected one round of profiling
  to happen after the "ticker" fired, but now one happens sooner so you
  see multiple profiles. It also broke because the test was supposed to
  close p.exit, not send to it. In general, the test is too tied to the
  specific implementation. TestAllUploaded covers all the same behavior
  and more, and didn't get broken by this implementation change. So I
  opted to just delete TestProfilerInternal.
  • Loading branch information
nsrip-dd authored Aug 8, 2022
1 parent e55bc20 commit 03a3099
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 90 deletions.
68 changes: 34 additions & 34 deletions profiler/profiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,42 +289,42 @@ func (p *profiler) collect(ticker <-chan time.Time) {
wg sync.WaitGroup
)
for {
now := now()
bat := batch{
seq: p.seq,
host: p.cfg.hostname,
start: now,
// NB: while this is technically wrong in that it does not
// record the actual start and end timestamps for the batch,
// it is how the backend understands the client-side
// configured CPU profile duration: (start-end).
end: now.Add(p.cfg.cpuDuration),
}
p.seq++

completed = completed[:0]
for _, t := range p.enabledProfileTypes() {
wg.Add(1)
go func(t ProfileType) {
defer wg.Done()
profs, err := p.runProfile(t)
if err != nil {
log.Error("Error getting %s profile: %v; skipping.", t, err)
tags := append(p.cfg.tags.Slice(), t.Tag())
p.cfg.statsd.Count("datadog.profiling.go.collect_error", 1, tags, 1)
}
mu.Lock()
defer mu.Unlock()
completed = append(completed, profs...)
}(t)
}
wg.Wait()
for _, prof := range completed {
bat.addProfile(prof)
}
p.enqueueUpload(bat)
select {
case <-ticker:
now := now()
bat := batch{
seq: p.seq,
host: p.cfg.hostname,
start: now,
// NB: while this is technically wrong in that it does not
// record the actual start and end timestamps for the batch,
// it is how the backend understands the client-side
// configured CPU profile duration: (start-end).
end: now.Add(p.cfg.cpuDuration),
}
p.seq++

completed = completed[:0]
for _, t := range p.enabledProfileTypes() {
wg.Add(1)
go func(t ProfileType) {
defer wg.Done()
profs, err := p.runProfile(t)
if err != nil {
log.Error("Error getting %s profile: %v; skipping.", t, err)
tags := append(p.cfg.tags.Slice(), t.Tag())
p.cfg.statsd.Count("datadog.profiling.go.collect_error", 1, tags, 1)
}
mu.Lock()
defer mu.Unlock()
completed = append(completed, profs...)
}(t)
}
wg.Wait()
for _, prof := range completed {
bat.addProfile(prof)
}
p.enqueueUpload(bat)
case <-p.exit:
return
}
Expand Down
91 changes: 35 additions & 56 deletions profiler/profiler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -167,17 +166,20 @@ func TestStartStopIdempotency(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 1000; j++ {
for j := 0; j < 20; j++ {
// startup logging makes this test very slow
Start(WithLogStartup(false))
//
// Also, the CPU profile is really slow to stop (200ms/iter)
// so we just do the heap profile
Start(WithLogStartup(false), WithProfileTypes(HeapProfile))
}
}()
}
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 1000; j++ {
for j := 0; j < 20; j++ {
Stop()
}
}()
Expand Down Expand Up @@ -241,58 +243,6 @@ func TestStopLatency(t *testing.T) {
}
}

// TestProfilerInternal drives a single collection round through the
// profiler's internal collect loop using test hooks, then checks that
// exactly one CPU profile start/stop and one heap profile lookup
// happened and that the resulting batch holds the expected profiles.
func TestProfilerInternal(t *testing.T) {
	t.Run("collect", func(t *testing.T) {
		p, err := unstartedProfiler(
			WithPeriod(1*time.Millisecond),
			CPUDuration(1*time.Millisecond),
			WithProfileTypes(HeapProfile, CPUProfile),
		)
		require.NoError(t, err)
		// Counters are bumped from profile goroutines, so use atomics.
		var startCPU, stopCPU, writeHeap uint64
		p.testHooks.startCPUProfile = func(_ io.Writer) error {
			atomic.AddUint64(&startCPU, 1)
			return nil
		}
		p.testHooks.stopCPUProfile = func() { atomic.AddUint64(&stopCPU, 1) }
		p.testHooks.lookupProfile = func(name string, w io.Writer, _ int) error {
			if name == "heap" {
				atomic.AddUint64(&writeHeap, 1)
			}
			_, err := w.Write(textProfile{Text: "main 5\n"}.Protobuf())
			return err
		}

		tick := make(chan time.Time)
		wait := make(chan struct{})

		go func() {
			p.collect(tick)
			close(wait)
		}()

		// Fire the fake ticker once so collect performs one round.
		tick <- time.Now()

		var bat batch
		select {
		case bat = <-p.out:
		case <-time.After(200 * time.Millisecond):
			t.Fatalf("missing batch")
		}

		assert := assert.New(t)
		assert.EqualValues(1, writeHeap)
		assert.EqualValues(1, startCPU)
		assert.EqualValues(1, stopCPU)

		// should contain cpu.pprof, metrics.json, delta-heap.pprof
		assert.Equal(3, len(bat.profiles))

		// Close the exit channel instead of sending to it: a close is
		// observed by every receive on p.exit, so collect is guaranteed
		// to see the shutdown signal no matter how many times it selects,
		// whereas a single send satisfies only one receive.
		close(p.exit)
		<-wait
	})
}

func TestSetProfileFraction(t *testing.T) {
t.Run("on", func(t *testing.T) {
start := runtime.SetMutexProfileFraction(0)
Expand Down Expand Up @@ -568,3 +518,32 @@ func TestTelemetryEnabled(t *testing.T) {
check("profile_period", time.Duration(10*time.Millisecond).String())
check("cpu_duration", time.Duration(1*time.Millisecond).String())
}

func TestImmediateProfile(t *testing.T) {
received := make(chan struct{}, 1)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
select {
case received <- struct{}{}:
default:
}
}))
defer server.Close()

err := Start(
WithAgentAddr(server.Listener.Addr().String()),
WithProfileTypes(HeapProfile),
WithPeriod(3*time.Second),
)
require.NoError(t, err)
defer Stop()

// Wait a little less than 2 profile periods. We should start profiling
// immediately. If it takes significantly longer than 1 profile period to get
// a profile, consider the test failed
timeout := time.After(5 * time.Second)
select {
case <-timeout:
t.Fatal("should have received a profile already")
case <-received:
}
}

0 comments on commit 03a3099

Please sign in to comment.