Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify pipestream by correctly interrupting a read on context cancellation. #497

Merged
merged 1 commit into from
Apr 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion internal/mtail/read_pipe_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func TestReadFromPipe(t *testing.T) {

testutil.FatalIfErr(t, unix.Mkfifo(logFile, 0600))

// TODO: race if this openfile happens after teststartserver.
f, err := os.OpenFile(logFile, os.O_RDWR|syscall.O_NONBLOCK, 0600)
testutil.FatalIfErr(t, err)
defer func() {
Expand All @@ -40,7 +41,7 @@ func TestReadFromPipe(t *testing.T) {
lineCountCheck := m.ExpectExpvarDeltaWithDeadline("lines_total", 3)

testutil.WriteString(t, f, "1\n2\n3\n")
m.PollWatched(1)
m.PollWatched(0)

lineCountCheck()
}
37 changes: 37 additions & 0 deletions internal/tailer/logstream/cancel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package logstream

import (
"context"
"io"
"os"
"strings"
"time"

"github.com/golang/glog"
)

type ReadDeadliner interface {
SetReadDeadline(t time.Time) error
}

func SetReadDeadlineOnDone(ctx context.Context, d ReadDeadliner) {
go func() {
<-ctx.Done()
glog.Info("cancelled, setting read deadline to interrupt read")
d.SetReadDeadline(time.Now())
}()
}

func IsEndOrCancel(err error) bool {
if err == io.EOF {
return true
}
if os.IsTimeout(err) {
return true
}
// https://github.com/golang/go/issues/4373
if strings.Contains(err.Error(), "use of closed network connection") {
return true
}
return false
}
70 changes: 13 additions & 57 deletions internal/tailer/logstream/pipestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ package logstream
import (
"bytes"
"context"
"errors"
"io"
"os"
"sync"
"syscall"
Expand Down Expand Up @@ -51,6 +49,7 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
return err
}
glog.V(2).Infof("opened new pipe %v", fd)

var total int
wg.Add(1)
go func() {
Expand All @@ -68,16 +67,14 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
ps.completed = true
ps.mu.Unlock()
}()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
SetReadDeadlineOnDone(ctx, fd)

b := make([]byte, 0, defaultReadBufferSize)
capB := cap(b)
partial := bytes.NewBufferString("")
var timedout bool
for {
// Set idle timeout
if err := fd.SetReadDeadline(time.Now().Add(defaultReadTimeout)); err != nil {
logErrors.Add(ps.pathname, 1)
glog.V(2).Infof("%s: %s", ps.pathname, err)
}
n, err := fd.Read(b[:capB])
glog.V(2).Infof("%v: read %d bytes, err is %v", fd, n, err)

Expand All @@ -90,66 +87,25 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
ps.mu.Unlock()
}

var perr *os.PathError
if errors.As(err, &perr) && perr.Timeout() && n == 0 {
glog.V(2).Info("timed out")
timedout = true
// Named Pipes EOF only when the writer has closed, so we look
// for a timeout on read to detect a writer stall and thus let
// us check below for cancellation.
goto Sleep
}
// Per pipe(7): If all file descriptors referring to the write end
// of a pipe have been closed, then an attempt to read(2) from the
// pipe will see end-of-file (read(2) will return 0).
// All other errors also finish the stream and are counted.
// However when the pipe is freshly opened
if err != nil {
if err != io.EOF {
glog.Info(err)
logErrors.Add(ps.pathname, 1)
if err != nil && IsEndOrCancel(err) {
glog.V(2).Infof("%v: %s: exiting", fd, err)
if partial.Len() > 0 {
sendLine(ctx, ps.pathname, partial, ps.lines)
}
glog.V(2).Infof("%v: stream has errored, exiting", fd)
ps.mu.Lock()
ps.completed = true
ps.mu.Unlock()
return
}

// No error implies there's more to read, unless it looks like
// context is Done.
if err == nil && ctx.Err() == nil {
continue
}

Sleep:
// If we've stalled or it looks like the context is done, then test to see if it's time to exit.
if timedout || ctx.Err() != nil {
timedout = false
// Test to see if it's time to exit.
select {
case <-ctx.Done():
glog.V(2).Infof("%v: context has been cancelled, exiting", fd)
if partial.Len() > 0 {
sendLine(ctx, ps.pathname, partial, ps.lines)
}
ps.mu.Lock()
ps.completed = true
ps.mu.Unlock()
return
default:
// keep going
}
}
// Yield and wait
glog.V(2).Infof("%v: waiting", fd)
select {
case <-ctx.Done():
// Same for cancellation; this makes tests stable, but
// could argue exiting immediately is less surprising.
// Assumption is that this doesn't make a difference in
// production.
glog.V(2).Infof("%v: Cancelled after next read", fd)
// Exit immediately; cancelled context is going to cause the
// next read to be interrupted and exit, so don't bother going
// around the loop again.
return
case <-waker.Wake():
// sleep until next Wake()
glog.V(2).Infof("%v: Wake received", fd)
Expand Down
10 changes: 4 additions & 6 deletions internal/tailer/logstream/pipestream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,10 @@ func TestPipeStreamReadCompletedBecauseClosed(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
waker := waker.NewTestAlways()

// TODO(#486): Open the file WRONLY after the logstream starts.
f, err := os.OpenFile(name, os.O_RDWR, os.ModeNamedPipe)
ps, err := logstream.New(ctx, &wg, waker, name, lines, false)
testutil.FatalIfErr(t, err)

ps, err := logstream.New(ctx, &wg, waker, name, lines, false)
f, err := os.OpenFile(name, os.O_WRONLY, os.ModeNamedPipe)
testutil.FatalIfErr(t, err)

testutil.WriteString(t, f, "1\n")
Expand Down Expand Up @@ -70,11 +69,10 @@ func TestPipeStreamReadCompletedBecauseCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
waker := waker.NewTestAlways()

// TODO(#486): Open the file WRONLY after the logstream starts.
f, err := os.OpenFile(name, os.O_RDWR, os.ModeNamedPipe)
ps, err := logstream.New(ctx, &wg, waker, name, lines, false)
testutil.FatalIfErr(t, err)

ps, err := logstream.New(ctx, &wg, waker, name, lines, false)
f, err := os.OpenFile(name, os.O_WRONLY, os.ModeNamedPipe)
testutil.FatalIfErr(t, err)

testutil.WriteString(t, f, "1\n")
Expand Down