Skip to content

Commit

Permalink
Merge pull request docker-archive#98 from andrewhsu/fix-log-readers
Browse files Browse the repository at this point in the history
[17.06] Fix log readers can block writes indefinitely
  • Loading branch information
andrewhsu authored Jul 12, 2017
2 parents 6fa91f3 + 27813df commit 849fcd5
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 34 deletions.
55 changes: 35 additions & 20 deletions daemon/logger/jsonfilelog/jsonfilelog.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"io"
"strconv"
"sync"

Expand All @@ -15,19 +16,21 @@ import (
"github.com/docker/docker/daemon/logger/loggerutils"
"github.com/docker/docker/pkg/jsonlog"
"github.com/docker/go-units"
"github.com/pkg/errors"
)

// Name is the name of the file that the jsonlogger logs to.
const Name = "json-file"

// JSONFileLogger is Logger implementation for default Docker logging.
type JSONFileLogger struct {
buf *bytes.Buffer
extra []byte // json-encoded extra attributes

mu sync.RWMutex
buf *bytes.Buffer // avoids allocating a new buffer on each call to `Log()`
closed bool
writer *loggerutils.RotateFileWriter
mu sync.Mutex
readers map[*logger.LogWatcher]struct{} // stores the active log followers
extra []byte // json-encoded extra attributes
closed bool
}

func init() {
Expand Down Expand Up @@ -90,33 +93,45 @@ func New(info logger.Info) (logger.Logger, error) {

// Log converts logger.Message to jsonlog.JSONLog and serializes it to file.
func (l *JSONFileLogger) Log(msg *logger.Message) error {
l.mu.Lock()
err := writeMessageBuf(l.writer, msg, l.extra, l.buf)
l.buf.Reset()
l.mu.Unlock()
return err
}

func writeMessageBuf(w io.Writer, m *logger.Message, extra json.RawMessage, buf *bytes.Buffer) error {
if err := marshalMessage(m, extra, buf); err != nil {
logger.PutMessage(m)
return err
}
logger.PutMessage(m)
if _, err := w.Write(buf.Bytes()); err != nil {
return errors.Wrap(err, "error writing log entry")
}
return nil
}

func marshalMessage(msg *logger.Message, extra json.RawMessage, buf *bytes.Buffer) error {
timestamp, err := jsonlog.FastTimeMarshalJSON(msg.Timestamp)
if err != nil {
return err
}
l.mu.Lock()
logline := msg.Line
logLine := msg.Line
if !msg.Partial {
logline = append(msg.Line, '\n')
logLine = append(msg.Line, '\n')
}
err = (&jsonlog.JSONLogs{
Log: logline,
Log: logLine,
Stream: msg.Source,
Created: timestamp,
RawAttrs: l.extra,
}).MarshalJSONBuf(l.buf)
logger.PutMessage(msg)
RawAttrs: extra,
}).MarshalJSONBuf(buf)
if err != nil {
l.mu.Unlock()
return err
return errors.Wrap(err, "error writing log message to buffer")
}

l.buf.WriteByte('\n')
_, err = l.writer.Write(l.buf.Bytes())
l.buf.Reset()
l.mu.Unlock()

return err
err = buf.WriteByte('\n')
return errors.Wrap(err, "error finalizing log buffer")
}

// ValidateLogOpt looks for json specific log options max-file & max-size.
Expand Down
45 changes: 31 additions & 14 deletions daemon/logger/jsonfilelog/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package jsonfilelog
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"os"
Expand All @@ -18,6 +17,7 @@ import (
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/jsonlog"
"github.com/docker/docker/pkg/tailfile"
"github.com/pkg/errors"
)

const maxJSONDecodeRetry = 20000
Expand Down Expand Up @@ -48,36 +48,48 @@ func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
defer close(logWatcher.Msg)

// lock so the read stream doesn't get corrupted due to rotations or other log data written while we read
// lock so the read stream doesn't get corrupted due to rotations or other log data written while we open these files
// This will block writes!!!
l.mu.Lock()
l.mu.RLock()

// TODO it would be nice to move a lot of this reader implementation to the rotate logger object
pth := l.writer.LogPath()
var files []io.ReadSeeker
for i := l.writer.MaxFiles(); i > 1; i-- {
f, err := os.Open(fmt.Sprintf("%s.%d", pth, i-1))
if err != nil {
if !os.IsNotExist(err) {
logWatcher.Err <- err
break
l.mu.RUnlock()
return
}
continue
}
defer f.Close()

files = append(files, f)
}

latestFile, err := os.Open(pth)
if err != nil {
logWatcher.Err <- err
l.mu.Unlock()
logWatcher.Err <- errors.Wrap(err, "error opening latest log file")
l.mu.RUnlock()
return
}
defer latestFile.Close()

latestChunk, err := newSectionReader(latestFile)

// Now we have the reader sectioned, all fd's opened, we can unlock.
// New writes/rotates will not affect seeking through these files
l.mu.RUnlock()

if err != nil {
logWatcher.Err <- err
return
}

if config.Tail != 0 {
tailer := ioutils.MultiReadSeeker(append(files, latestFile)...)
tailer := ioutils.MultiReadSeeker(append(files, latestChunk)...)
tailFile(tailer, logWatcher, config.Tail, config.Since)
}

Expand All @@ -89,19 +101,14 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
}

if !config.Follow || l.closed {
l.mu.Unlock()
return
}

if config.Tail >= 0 {
latestFile.Seek(0, os.SEEK_END)
}

notifyRotate := l.writer.NotifyRotate()
defer l.writer.NotifyRotateEvict(notifyRotate)

l.mu.Lock()
l.readers[logWatcher] = struct{}{}

l.mu.Unlock()

followLogs(latestFile, logWatcher, notifyRotate, config.Since)
Expand All @@ -111,6 +118,16 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
l.mu.Unlock()
}

func newSectionReader(f *os.File) (*io.SectionReader, error) {
// seek to the end to get the size
// we'll leave this at the end of the file since section reader does not advance the reader
size, err := f.Seek(0, os.SEEK_END)
if err != nil {
return nil, errors.Wrap(err, "error getting current file size")
}
return io.NewSectionReader(f, 0, size), nil
}

func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) {
var rdr io.Reader
rdr = f
Expand Down

0 comments on commit 849fcd5

Please sign in to comment.