From 8dd02e80f9e42eebb59fee10c24c7cc686f9e481 Mon Sep 17 00:00:00 2001 From: Jamie Wilkinson Date: Fri, 2 Apr 2021 19:44:57 +1100 Subject: [PATCH] Simplify pipestream by correctly interrupting a read on context cancellation. https://github.com/golang/go/issues/20280#issuecomment-655588450 has an excellent example on how to interrupt a read on context cancellation, which means we don't need to set a read deadline on every read attempt. Then, the error handling is simplified, and as a side effect we aren't having spurious wakeups on idle fifos. This also means we no longer have a race on first read when the write end of a fifo isn't ready. Issue: #486 --- internal/mtail/read_pipe_integration_test.go | 3 +- internal/tailer/logstream/cancel.go | 37 +++++++++++ internal/tailer/logstream/pipestream.go | 70 ++++---------------- internal/tailer/logstream/pipestream_test.go | 10 ++- 4 files changed, 56 insertions(+), 64 deletions(-) create mode 100644 internal/tailer/logstream/cancel.go diff --git a/internal/mtail/read_pipe_integration_test.go b/internal/mtail/read_pipe_integration_test.go index 345a3907d..84a4fec24 100644 --- a/internal/mtail/read_pipe_integration_test.go +++ b/internal/mtail/read_pipe_integration_test.go @@ -28,6 +28,7 @@ func TestReadFromPipe(t *testing.T) { testutil.FatalIfErr(t, unix.Mkfifo(logFile, 0600)) + // TODO: race if this openfile happens after teststartserver. f, err := os.OpenFile(logFile, os.O_RDWR|syscall.O_NONBLOCK, 0600) testutil.FatalIfErr(t, err) defer func() { @@ -40,7 +41,7 @@ func TestReadFromPipe(t *testing.T) { lineCountCheck := m.ExpectExpvarDeltaWithDeadline("lines_total", 3) testutil.WriteString(t, f, "1\n2\n3\n") - m.PollWatched(1) + m.PollWatched(0) lineCountCheck() } diff --git a/internal/tailer/logstream/cancel.go b/internal/tailer/logstream/cancel.go new file mode 100644 index 000000000..ca397b629 --- /dev/null +++ b/internal/tailer/logstream/cancel.go @@ -0,0 +1,37 @@ +package logstream + +import ( + "context" + "io" + "os" + "strings" + "time" + + "github.com/golang/glog" +) + +type ReadDeadliner interface { + SetReadDeadline(t time.Time) error +} + +func SetReadDeadlineOnDone(ctx context.Context, d ReadDeadliner) { + go func() { + <-ctx.Done() + glog.Info("cancelled, setting read deadline to interrupt read") + d.SetReadDeadline(time.Now()) + }() +} + +func IsEndOrCancel(err error) bool { + if err == io.EOF { + return true + } + if os.IsTimeout(err) { + return true + } + // https://github.com/golang/go/issues/4373 + if strings.Contains(err.Error(), "use of closed network connection") { + return true + } + return false +} diff --git a/internal/tailer/logstream/pipestream.go b/internal/tailer/logstream/pipestream.go index eaeaaa98d..afa944280 100644 --- a/internal/tailer/logstream/pipestream.go +++ b/internal/tailer/logstream/pipestream.go @@ -6,8 +6,6 @@ package logstream import ( "bytes" "context" - "errors" - "io" "os" "sync" "syscall" @@ -51,6 +49,7 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake return err } glog.V(2).Infof("opened new pipe %v", fd) + var total int wg.Add(1) go func() { @@ -68,16 +67,14 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake ps.completed = true ps.mu.Unlock() }() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + SetReadDeadlineOnDone(ctx, fd) + b := make([]byte, 0, defaultReadBufferSize) capB := cap(b) partial := bytes.NewBufferString("") - var timedout bool for { - // Set idle timeout - if err := fd.SetReadDeadline(time.Now().Add(defaultReadTimeout)); err != nil { - logErrors.Add(ps.pathname, 1) - glog.V(2).Infof("%s: %s", ps.pathname, err) - } n, err := fd.Read(b[:capB]) glog.V(2).Infof("%v: read %d bytes, err is %v", fd, n, err) @@ -90,66 +87,25 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake ps.mu.Unlock() } - var perr *os.PathError - if errors.As(err, &perr) && perr.Timeout() && n == 0 { - glog.V(2).Info("timed out") - timedout = true - // Named Pipes EOF only when the writer has closed, so we look - // for a timeout on read to detect a writer stall and thus let - // us check below for cancellation. - goto Sleep - } - // Per pipe(7): If all file descriptors referring to the write end - // of a pipe have been closed, then an attempt to read(2) from the - // pipe will see end-of-file (read(2) will return 0). - // All other errors also finish the stream and are counted. - // However when the pipe is freshly opened - if err != nil { - if err != io.EOF { - glog.Info(err) - logErrors.Add(ps.pathname, 1) + if err != nil && IsEndOrCancel(err) { + glog.V(2).Infof("%v: %s: exiting", fd, err) + if partial.Len() > 0 { + sendLine(ctx, ps.pathname, partial, ps.lines) } - glog.V(2).Infof("%v: stream has errored, exiting", fd) ps.mu.Lock() ps.completed = true ps.mu.Unlock() return } - // No error implies there's more to read, unless it looks like - // context is Done. - if err == nil && ctx.Err() == nil { - continue - } - - Sleep: - // If we've stalled or it looks like the context is done, then test to see if it's time to exit. - if timedout || ctx.Err() != nil { - timedout = false - // Test to see if it's time to exit. - select { - case <-ctx.Done(): - glog.V(2).Infof("%v: context has been cancelled, exiting", fd) - if partial.Len() > 0 { - sendLine(ctx, ps.pathname, partial, ps.lines) - } - ps.mu.Lock() - ps.completed = true - ps.mu.Unlock() - return - default: - // keep going - } - } // Yield and wait glog.V(2).Infof("%v: waiting", fd) select { case <-ctx.Done(): - // Same for cancellation; this makes tests stable, but - // could argue exiting immediately is less surprising. - // Assumption is that this doesn't make a difference in - // production. - glog.V(2).Infof("%v: Cancelled after next read", fd) + // Exit immediately; cancelled context is going to cause the + // next read to be interrupted and exit, so don't bother going + // around the loop again. + return case <-waker.Wake(): // sleep until next Wake() glog.V(2).Infof("%v: Wake received", fd) diff --git a/internal/tailer/logstream/pipestream_test.go b/internal/tailer/logstream/pipestream_test.go index 690adc89e..bdfbbaba1 100644 --- a/internal/tailer/logstream/pipestream_test.go +++ b/internal/tailer/logstream/pipestream_test.go @@ -29,11 +29,10 @@ func TestPipeStreamReadCompletedBecauseClosed(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) waker := waker.NewTestAlways() - // TODO(#486): Open the file WRONLY after the logstream starts. - f, err := os.OpenFile(name, os.O_RDWR, os.ModeNamedPipe) + ps, err := logstream.New(ctx, &wg, waker, name, lines, false) testutil.FatalIfErr(t, err) - ps, err := logstream.New(ctx, &wg, waker, name, lines, false) + f, err := os.OpenFile(name, os.O_WRONLY, os.ModeNamedPipe) testutil.FatalIfErr(t, err) testutil.WriteString(t, f, "1\n") @@ -70,11 +69,10 @@ func TestPipeStreamReadCompletedBecauseCancel(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) waker := waker.NewTestAlways() - // TODO(#486): Open the file WRONLY after the logstream starts. - f, err := os.OpenFile(name, os.O_RDWR, os.ModeNamedPipe) + ps, err := logstream.New(ctx, &wg, waker, name, lines, false) testutil.FatalIfErr(t, err) - ps, err := logstream.New(ctx, &wg, waker, name, lines, false) + f, err := os.OpenFile(name, os.O_WRONLY, os.ModeNamedPipe) testutil.FatalIfErr(t, err) testutil.WriteString(t, f, "1\n")