From 814bf88cf225cd422a50435865fb5b9f55b7e59e Mon Sep 17 00:00:00 2001
From: Nicolas Hillegeer
Date: Thu, 22 Feb 2024 13:03:07 -0800
Subject: [PATCH] trace: make trace byte receiving synchronous

This avoids the asynchrony related to the pipe+buffer+goroutine. Using
this approach, we can guarantee that once the `Write` call completes,
the data is stored in flightrecorder buffers. This is related to
https://go.dev/cl/562616.

There is a behavioral change: previously, in case of error the recorder
goroutine would just stop, which would essentially block the tracing
infra by not accepting the `Write`. Now, we actively stop tracing on any
error.

I did not do real performance tests, but it can't be very different:

Before:

    running go [test . -count 10]
    ok golang.org/x/exp/trace 60.423s

After:

    running go [test . -count 10]
    ok golang.org/x/exp/trace 60.394s

Change-Id: Ie900fec2b45f1c227c82e68f4f7af1902de3582b
Reviewed-on: https://go-review.googlesource.com/c/exp/+/566255
Reviewed-by: Michael Knyszek
Auto-Submit: Michael Knyszek
LUCI-TryBot-Result: Go LUCI
---
 trace/flightrecorder.go | 235 ++++++++++++++++++++--------------------
 1 file changed, 123 insertions(+), 112 deletions(-)

diff --git a/trace/flightrecorder.go b/trace/flightrecorder.go
index e189b8052..95ae8c627 100644
--- a/trace/flightrecorder.go
+++ b/trace/flightrecorder.go
@@ -7,8 +7,9 @@
 package trace
 
 import (
-	"bufio"
+	"bytes"
 	"encoding/binary"
+	"errors"
 	"fmt"
 	"io"
 	"math/bits"
@@ -28,13 +29,9 @@ import (
 //
 // Only one flight recording may be active at any given time.
 type FlightRecorder struct {
-	// State for coordinating with the recorder goroutine.
-	fromTracer   *io.PipeReader
-	toRecorder   *io.PipeWriter
-	recorderWait sync.WaitGroup
-	err          error
+	err error
 
-	// State specific to the recorder goroutine.
+	// State specific to the recorder.
 	header [16]byte
 	active rawGeneration
 	ringMu sync.Mutex
@@ -100,6 +97,122 @@ func (r *FlightRecorder) SetSize(bytes int) {
 	r.targetSize = bytes
 }
 
+// A recorder receives bytes from the runtime tracer and processes them
+// synchronously, storing the parsed batches in its FlightRecorder.
+type recorder struct {
+	r *FlightRecorder
+
+	headerReceived bool
+}
+
+// Write consumes one write from the runtime tracer. The first write must begin
+// with the trace header, which is saved in r.header; the remaining bytes are
+// parsed as a single batch and appended to the active generation. On error, the
+// first error is recorded in the FlightRecorder and tracing is stopped.
+func (w *recorder) Write(p []byte) (n int, err error) {
+	r := w.r
+
+	defer func() {
+		if err != nil {
+			// Propagate errors to the flightrecorder.
+			if r.err == nil {
+				r.err = err
+			}
+			trace.Stop() // Stop the tracer, preventing further writes.
+		}
+	}()
+
+	rd := bytes.NewReader(p)
+
+	if !w.headerReceived {
+		if len(p) < len(r.header) {
+			return 0, fmt.Errorf("expected at least %d bytes in the first write", len(r.header))
+		}
+		rd.Read(r.header[:])
+		w.headerReceived = true
+	}
+
+	b, gen, err := readBatch(rd) // Every write from the runtime is guaranteed to be a complete batch.
+	if err == io.EOF {
+		if rd.Len() > 0 {
+			return len(p) - rd.Len(), errors.New("short read")
+		}
+		return len(p), nil
+	}
+	if err != nil {
+		return len(p) - rd.Len(), err
+	}
+
+	// Check if we're entering a new generation.
+	if r.active.gen != 0 && r.active.gen+1 == gen {
+		r.ringMu.Lock()
+
+		// Validate r.active.freq before we use it. It's required for a generation
+		// to not be considered broken, and without it, we can't correctly handle
+		// SetPeriod.
+		if r.active.freq == 0 {
+			// Release the ring lock before bailing out; otherwise it stays held
+			// and every later r.ringMu.Lock() call would block forever.
+			r.ringMu.Unlock()
+			return len(p) - rd.Len(), fmt.Errorf("broken trace: failed to find frequency event in generation %d", r.active.gen)
+		}
+
+		// Get the current trace clock time.
+		now := traceTimeNow(r.active.freq)
+
+		// Add the current generation to the ring. Make sure we always have at least one
+		// complete generation by putting the active generation onto the new list, regardless
+		// of whatever our settings are.
+		//
+		// N.B. Let's completely replace the ring here, so that WriteTo can just make a copy
+		// and not worry about aliasing. This creates allocations, but at a very low rate.
+		newRing := []rawGeneration{r.active}
+		size := r.active.size
+		for i := len(r.ring) - 1; i >= 0; i-- {
+			// Stop adding older generations if the new ring already exceeds the thresholds.
+			// This ensures we keep generations that cross a threshold, but not any that lie
+			// entirely outside it.
+			if size > r.wantSize || now.Sub(newRing[len(newRing)-1].minTraceTime()) > r.wantDur {
+				break
+			}
+			size += r.ring[i].size
+			newRing = append(newRing, r.ring[i])
+		}
+		slices.Reverse(newRing)
+		r.ring = newRing
+		r.ringMu.Unlock()
+
+		// Start a new active generation.
+		r.active = rawGeneration{}
+	}
+
+	// Obtain the frequency if this is a frequency batch.
+	if b.isFreqBatch() {
+		freq, err := parseFreq(b)
+		if err != nil {
+			return len(p) - rd.Len(), err
+		}
+		r.active.freq = freq
+	}
+
+	// Append the batch to the current generation.
+	if r.active.gen == 0 {
+		r.active.gen = gen
+	}
+	if r.active.minTime == 0 || r.active.minTime > b.time {
+		r.active.minTime = b.time
+	}
+	r.active.size += 1
+	r.active.size += uvarintSize(gen)
+	r.active.size += uvarintSize(uint64(b.m))
+	r.active.size += uvarintSize(uint64(b.time))
+	r.active.size += uvarintSize(uint64(len(b.data)))
+	r.active.size += len(b.data)
+	r.active.batches = append(r.active.batches, b)
+
+	return len(p) - rd.Len(), nil
+}
+
 // Start begins flight recording. Only one flight recorder or one call to [runtime/trace.Start]
 // may be active at any given time. Returns an error if starting the flight recorder would
 // violate this rule.
@@ -111,106 +224,13 @@ func (r *FlightRecorder) Start() error {
 	r.wantSize = r.targetSize
 	r.wantDur = r.targetPeriod
 	r.err = nil
-	r.fromTracer, r.toRecorder = io.Pipe()
 
-	// Start tracing, sending data to the recorder goroutine (not yet started) via an io.Pipe.
-	if err := trace.Start(r.toRecorder); err != nil {
+	// Start tracing, data is sent to a recorder which forwards it to our own
+	// storage.
+	if err := trace.Start(&recorder{r: r}); err != nil {
 		return err
 	}
 
-	// Start recorder goroutine.
-	r.recorderWait.Add(1)
-	go func() {
-		defer r.recorderWait.Done()
-
-		// Read in the header so we can tack it on to the front
-		// of whatever WriteTo emits later.
-		_, err := io.ReadFull(r.fromTracer, r.header[:])
-		if err != nil {
-			r.err = err
-			return
-		}
-
-		// Process the rest of the trace.
-		rd := bufio.NewReader(r.fromTracer)
-		for {
-			b, gen, err := readBatch(rd)
-			if err == io.EOF || err == io.ErrClosedPipe {
-				break
-			}
-			if err != nil {
-				r.err = err
-				return
-			}
-
-			// Check if we're entering a new generation.
-			if r.active.gen != 0 && r.active.gen+1 == gen {
-				r.ringMu.Lock()
-
-				// Validate r.active.freq before we use it. It's required for a generation
-				// to not be considered broken, and without it, we can't correctly handle
-				// SetPeriod.
-				if r.active.freq == 0 {
-					r.err = fmt.Errorf("broken trace: failed to find frequency event in generation %d", r.active.gen)
-					return
-				}
-
-				// Get the current trace clock time.
-				now := traceTimeNow(r.active.freq)
-
-				// Add the current generation to the ring. Make sure we always have at least one
-				// complete generation by putting the active generation onto the new list, regardless
-				// of whatever our settings are.
-				//
-				// N.B. Let's completely replace the ring here, so that WriteTo can just make a copy
-				// and not worry about aliasing. This creates allocations, but at a very low rate.
-				newRing := []rawGeneration{r.active}
-				size := r.active.size
-				for i := len(r.ring) - 1; i >= 0; i-- {
-					// Stop adding older generations if the new ring already exceeds the thresholds.
-					// This ensures we keep generations that cross a threshold, but not any that lie
-					// entirely outside it.
-					if size > r.wantSize || now.Sub(newRing[len(newRing)-1].minTraceTime()) > r.wantDur {
-						break
-					}
-					size += r.ring[i].size
-					newRing = append(newRing, r.ring[i])
-				}
-				slices.Reverse(newRing)
-				r.ring = newRing
-				r.ringMu.Unlock()
-
-				// Start a new active generation.
-				r.active = rawGeneration{}
-			}
-
-			// Obtain the frequency if this is a frequency batch.
-			if b.isFreqBatch() {
-				freq, err := parseFreq(b)
-				if err != nil {
-					r.err = err
-					return
-				}
-				r.active.freq = freq
-			}
-
-			// Append the batch to the current generation.
-			if r.active.gen == 0 {
-				r.active.gen = gen
-			}
-			if r.active.minTime == 0 || r.active.minTime > b.time {
-				r.active.minTime = b.time
-			}
-			r.active.size += 1
-			r.active.size += uvarintSize(gen)
-			r.active.size += uvarintSize(uint64(b.m))
-			r.active.size += uvarintSize(uint64(b.time))
-			r.active.size += uvarintSize(uint64(len(b.data)))
-			r.active.size += len(b.data)
-			r.active.batches = append(r.active.batches, b)
-		}
-	}()
-
 	r.enabled = true
 	return nil
 }
@@ -224,18 +244,9 @@ func (r *FlightRecorder) Stop() error {
 	r.enabled = false
 	trace.Stop()
 
-	// Close the write side of the pipe. This is safe because tracing has stopped, so no more will
-	// be written to the pipe.
-	r.fromTracer.Close()
-
-	// Wait for the reader to exit.
-	r.recorderWait.Wait()
-
 	// Reset all state. No need to lock because the reader has already exited.
 	r.active = rawGeneration{}
 	r.ring = nil
-	r.toRecorder.Close()
-	r.fromTracer.Close()
 	return r.err
 }
 