Skip to content
This repository has been archived by the owner on Oct 13, 2023. It is now read-only.

Commit

Permalink
Fix log readers can block writes indefinitely
Browse files Browse the repository at this point in the history
Before this patch, a log reader is able to block all log writes
indefinitely (and other operations) by simply opening the log stream and
not consuming all the messages.

The reason for this is we protect the read stream from corruption by
ensuring there are no new writes while the log stream is consumed (and
caught up with the live entries).

We can get around this issue because log files are append only, so we
can limit reads to only the section of the file that was written to when
the log stream was first requested.

Now logs are only blocked until all files are opened, rather than
streamed to the client.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
(cherry picked from commit e220918)

Conflicts:
components/engine/daemon/logger/jsonfilelog/read.go
Signed-off-by: Andrew Hsu <andrewhsu@docker.com>
  • Loading branch information
cpuguy83 authored and andrewhsu committed Jul 6, 2017
1 parent 02c1d87 commit 7bd7bde
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 33 deletions.
55 changes: 35 additions & 20 deletions components/engine/daemon/logger/jsonfilelog/jsonfilelog.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"io"
"strconv"
"sync"

Expand All @@ -15,19 +16,21 @@ import (
"github.com/docker/docker/daemon/logger/loggerutils"
"github.com/docker/docker/pkg/jsonlog"
"github.com/docker/go-units"
"github.com/pkg/errors"
)

// Name is the name of the file that the jsonlogger logs to.
const Name = "json-file"

// JSONFileLogger is Logger implementation for default Docker logging.
type JSONFileLogger struct {
buf *bytes.Buffer
extra []byte // json-encoded extra attributes

mu sync.RWMutex
buf *bytes.Buffer // avoids allocating a new buffer on each call to `Log()`
closed bool
writer *loggerutils.RotateFileWriter
mu sync.Mutex
readers map[*logger.LogWatcher]struct{} // stores the active log followers
extra []byte // json-encoded extra attributes
closed bool
}

func init() {
Expand Down Expand Up @@ -90,33 +93,45 @@ func New(info logger.Info) (logger.Logger, error) {

// Log converts logger.Message to jsonlog.JSONLog and serializes it to file.
func (l *JSONFileLogger) Log(msg *logger.Message) error {
l.mu.Lock()
err := writeMessageBuf(l.writer, msg, l.extra, l.buf)
l.buf.Reset()
l.mu.Unlock()
return err
}

func writeMessageBuf(w io.Writer, m *logger.Message, extra json.RawMessage, buf *bytes.Buffer) error {
if err := marshalMessage(m, extra, buf); err != nil {
logger.PutMessage(m)
return err
}
logger.PutMessage(m)
if _, err := w.Write(buf.Bytes()); err != nil {
return errors.Wrap(err, "error writing log entry")
}
return nil
}

func marshalMessage(msg *logger.Message, extra json.RawMessage, buf *bytes.Buffer) error {
timestamp, err := jsonlog.FastTimeMarshalJSON(msg.Timestamp)
if err != nil {
return err
}
l.mu.Lock()
logline := msg.Line
logLine := msg.Line
if !msg.Partial {
logline = append(msg.Line, '\n')
logLine = append(msg.Line, '\n')
}
err = (&jsonlog.JSONLogs{
Log: logline,
Log: logLine,
Stream: msg.Source,
Created: timestamp,
RawAttrs: l.extra,
}).MarshalJSONBuf(l.buf)
logger.PutMessage(msg)
RawAttrs: extra,
}).MarshalJSONBuf(buf)
if err != nil {
l.mu.Unlock()
return err
return errors.Wrap(err, "error writing log message to buffer")
}

l.buf.WriteByte('\n')
_, err = l.writer.Write(l.buf.Bytes())
l.buf.Reset()
l.mu.Unlock()

return err
err = buf.WriteByte('\n')
return errors.Wrap(err, "error finalizing log buffer")
}

// ValidateLogOpt looks for json specific log options max-file & max-size.
Expand Down
43 changes: 30 additions & 13 deletions components/engine/daemon/logger/jsonfilelog/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package jsonfilelog
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"os"
Expand All @@ -18,6 +17,7 @@ import (
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/jsonlog"
"github.com/docker/docker/pkg/tailfile"
"github.com/pkg/errors"
)

const maxJSONDecodeRetry = 20000
Expand Down Expand Up @@ -48,34 +48,46 @@ func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
defer close(logWatcher.Msg)

// lock so the read stream doesn't get corrupted due to rotations or other log data written while we read
// lock so the read stream doesn't get corrupted due to rotations or other log data written while we open these files
// This will block writes!!!
l.mu.Lock()
l.mu.RLock()

// TODO it would be nice to move a lot of this reader implementation to the rotate logger object
pth := l.writer.LogPath()
var files []io.ReadSeeker
for i := l.writer.MaxFiles(); i > 1; i-- {
f, err := os.Open(fmt.Sprintf("%s.%d", pth, i-1))
if err != nil {
if !os.IsNotExist(err) {
logWatcher.Err <- err
break
l.mu.RUnlock()
return
}
continue
}
defer f.Close()

files = append(files, f)
}

latestFile, err := os.Open(pth)
if err != nil {
logWatcher.Err <- err
l.mu.Unlock()
logWatcher.Err <- errors.Wrap(err, "error opening latest log file")
l.mu.RUnlock()
return
}
defer latestFile.Close()

latestChunk, err := newSectionReader(latestFile)

// Now we have the reader sectioned, all fd's opened, we can unlock.
// New writes/rotates will not affect seeking through these files
l.mu.RUnlock()

if err != nil {
logWatcher.Err <- err
return
}

if config.Tail != 0 {
tailer := ioutils.MultiReadSeeker(append(files, latestFile)...)
tailFile(tailer, logWatcher, config.Tail, config.Since)
Expand All @@ -89,19 +101,14 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
}

if !config.Follow || l.closed {
l.mu.Unlock()
return
}

if config.Tail >= 0 {
latestFile.Seek(0, os.SEEK_END)
}

notifyRotate := l.writer.NotifyRotate()
defer l.writer.NotifyRotateEvict(notifyRotate)

l.mu.Lock()
l.readers[logWatcher] = struct{}{}

l.mu.Unlock()

followLogs(latestFile, logWatcher, notifyRotate, config.Since)
Expand All @@ -111,6 +118,16 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
l.mu.Unlock()
}

func newSectionReader(f *os.File) (*io.SectionReader, error) {
// seek to the end to get the size
// we'll leave this at the end of the file since section reader does not advance the reader
size, err := f.Seek(0, os.SEEK_END)
if err != nil {
return nil, errors.Wrap(err, "error getting current file size")
}
return io.NewSectionReader(f, 0, size), nil
}

func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) {
var rdr io.Reader
rdr = f
Expand Down

0 comments on commit 7bd7bde

Please sign in to comment.