Skip to content

Commit

Permalink
Add support for log tags in newest CRI spec (elastic#8265) (elastic#8272)
Browse files Browse the repository at this point in the history

* Add support for log tags in newest CRI spec

CRI spec has changed to include tags (to tell if the line is full or
partial):

```
2016-10-06T00:17:09.669794202Z stdout F The content of the log entry 1
2016-10-06T00:17:09.669794202Z stdout P First line of log entry 2
2016-10-06T00:17:09.669794202Z stdout P Second line of the log entry 2
2016-10-06T00:17:10.113242941Z stderr F Last line of the log entry 2
```

I still have to determine how this affects the older format. This change
obviously breaks it, but it remains to be seen whether CRI still supports
it.

For more details you can check:
https://github.com/kubernetes/community/blob/master/contributors/design-proposals/node/kubelet-cri-logging.md

(cherry picked from commit c06e91b)
  • Loading branch information
exekias authored Sep 11, 2018
1 parent f2ad8ef commit 97c3477
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 76 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ https://github.com/elastic/beats/compare/v6.4.0...6.4[Check the HEAD diff]

- Fixed a memory leak when harvesters are closed. {pull}7820[7820]
- Fixed a docker input error due to the offset update bug in partial log join.{pull}8177[8177]
- Update CRI format to support partial/full tags. {pull}8265[8265]

*Heartbeat*

Expand Down
6 changes: 6 additions & 0 deletions filebeat/docs/inputs/input-docker.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ Enable partial messages joining. Docker `json-file` driver splits log lines larg
end of line (`\n`) is present for common lines in the resulting file, while it's not for the lines
that have been split. `combine_partial` joins them back together when enabled. It is enabled by default.

===== `cri.parse_flags`

Enable parsing of CRI flags from the log file. CRI uses flags to signal a partial line; enabling this
option ensures that partial lines are rejoined. It is disabled by default.


The following input configures {beatname_uc} to read the `stdout` stream from
all containers under the default Docker containers path:

Expand Down
5 changes: 4 additions & 1 deletion filebeat/input/docker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@ type config struct {

// Partial configures the prospector to join partial lines
Partial bool `config:"combine_partials"`

// Enable CRI flags parsing (to be switched to default in 7.0)
CRIFlags bool `config:"cri.parse_flags"`
}

type containers struct {
IDs []string `config:"ids"`
Path string `config:"path"`

// Stream can be all,stdout or stderr
// Stream can be all, stdout or stderr
Stream string `config:"stream"`
}
4 changes: 4 additions & 0 deletions filebeat/input/docker/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ func NewInput(
return nil, errors.Wrap(err, "update input config")
}

if err := cfg.SetBool("docker-json.cri_flags", -1, config.Partial); err != nil {
return nil, errors.Wrap(err, "update input config")
}

// Add stream to meta to ensure different state per stream
if config.Containers.Stream != "all" {
if context.Meta == nil {
Expand Down
7 changes: 3 additions & 4 deletions filebeat/input/log/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,9 @@ type config struct {

// Hidden on purpose, used by the docker input:
DockerJSON *struct {
Stream string `config:"stream"`

// TODO move this to true by default
Partial bool `config:"partial"`
Stream string `config:"stream"`
Partial bool `config:"partial"`
CRIFlags bool `config:"cri_flags"`
} `config:"docker-json"`
}

Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {

if h.config.DockerJSON != nil {
// Docker json-file format, add custom parsing to the pipeline
r = docker_json.New(r, h.config.DockerJSON.Stream, h.config.DockerJSON.Partial)
r = docker_json.New(r, h.config.DockerJSON.Stream, h.config.DockerJSON.Partial, h.config.DockerJSON.CRIFlags)
}

if h.config.JSON != nil {
Expand Down
135 changes: 86 additions & 49 deletions filebeat/reader/docker_json/docker_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,76 +37,118 @@ type Reader struct {

// join partial lines
partial bool
}

type dockerLog struct {
Timestamp string `json:"time"`
Log string `json:"log"`
Stream string `json:"stream"`
// parse CRI flags
criflags bool
}

type crioLog struct {
Timestamp time.Time
Stream string
Log []byte
type logLine struct {
Partial bool `json:"-"`
Timestamp time.Time `json:"-"`
Time string `json:"time"`
Stream string `json:"stream"`
Log string `json:"log"`
}

// New creates a new Reader that parses Docker JSON and CRI log lines read from r
func New(r reader.Reader, stream string, partial bool) *Reader {
func New(r reader.Reader, stream string, partial bool, CRIFlags bool) *Reader {
return &Reader{
stream: stream,
partial: partial,
reader: r,
stream: stream,
partial: partial,
reader: r,
criflags: CRIFlags,
}
}

// parseCRILog parses logs in CRI log format.
// CRI log format example :
// 2017-09-12T22:32:21.212861448Z stdout 2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache
func parseCRILog(message reader.Message, msg *crioLog) (reader.Message, error) {
log := strings.SplitN(string(message.Content), " ", 3)
if len(log) < 3 {
return message, errors.New("invalid CRI log")
func (p *Reader) parseCRILog(message *reader.Message, msg *logLine) error {
split := 3
// read line tags if split is enabled:
if p.criflags {
split = 4
}
ts, err := time.Parse(time.RFC3339, log[0])

// current field
i := 0

// timestamp
log := strings.SplitN(string(message.Content), " ", split)
if len(log) < split {
return errors.New("invalid CRI log format")
}
ts, err := time.Parse(time.RFC3339, log[i])
if err != nil {
return message, errors.Wrap(err, "parsing CRI timestamp")
return errors.Wrap(err, "parsing CRI timestamp")
}
message.Ts = ts
i++

// stream
msg.Stream = log[i]
i++

// tags
partial := false
if p.criflags {
// currently only P(artial) or F(ull) are available
tags := strings.Split(log[i], ":")
for _, tag := range tags {
if tag == "P" {
partial = true
}
}
i++
}

msg.Timestamp = ts
msg.Stream = log[1]
msg.Log = []byte(log[2])
msg.Partial = partial
message.AddFields(common.MapStr{
"stream": msg.Stream,
})
message.Content = msg.Log
message.Ts = ts
// Remove ending \n for partial messages
message.Content = []byte(log[i])
if partial {
message.Content = bytes.TrimRightFunc(message.Content, func(r rune) bool {
return r == '\n' || r == '\r'
})
}

return message, nil
return nil
}

// parseDockerJSONLog parses logs in Docker JSON log format.
// Docker JSON log format example:
// {"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}
func parseDockerJSONLog(message reader.Message, msg *dockerLog) (reader.Message, error) {
func (p *Reader) parseDockerJSONLog(message *reader.Message, msg *logLine) error {
dec := json.NewDecoder(bytes.NewReader(message.Content))

if err := dec.Decode(&msg); err != nil {
return message, errors.Wrap(err, "decoding docker JSON")
return errors.Wrap(err, "decoding docker JSON")
}

// Parse timestamp
ts, err := time.Parse(time.RFC3339, msg.Timestamp)
ts, err := time.Parse(time.RFC3339, msg.Time)
if err != nil {
return message, errors.Wrap(err, "parsing docker timestamp")
return errors.Wrap(err, "parsing docker timestamp")
}

message.AddFields(common.MapStr{
"stream": msg.Stream,
})
message.Content = []byte(msg.Log)
message.Ts = ts
msg.Partial = message.Content[len(message.Content)-1] != byte('\n')

return nil
}

func (p *Reader) parseLine(message *reader.Message, msg *logLine) error {
if strings.HasPrefix(string(message.Content), "{") {
return p.parseDockerJSONLog(message, msg)
}

return message, nil
return p.parseCRILog(message, msg)
}

// Next returns the next line.
Expand All @@ -117,32 +159,27 @@ func (p *Reader) Next() (reader.Message, error) {
return message, err
}

var dockerLine dockerLog
var crioLine crioLog
var logLine logLine
err = p.parseLine(&message, &logLine)
if err != nil {
return message, err
}

if strings.HasPrefix(string(message.Content), "{") {
message, err = parseDockerJSONLog(message, &dockerLine)
// Handle multiline messages, join partial lines
for p.partial && logLine.Partial {
next, err := p.reader.Next()
if err != nil {
return message, err
}
// Handle multiline messages, join lines that don't end with \n
for p.partial && message.Content[len(message.Content)-1] != byte('\n') {
next, err := p.reader.Next()
if err != nil {
return message, err
}
next, err = parseDockerJSONLog(next, &dockerLine)
if err != nil {
return message, err
}
message.Content = append(message.Content, next.Content...)
message.Bytes += next.Bytes
err = p.parseLine(&next, &logLine)
if err != nil {
return message, err
}
} else {
message, err = parseCRILog(message, &crioLine)
message.Content = append(message.Content, next.Content...)
message.Bytes += next.Bytes
}

if p.stream != "all" && p.stream != dockerLine.Stream && p.stream != crioLine.Stream {
if p.stream != "all" && p.stream != logLine.Stream {
continue
}

Expand Down
Loading

0 comments on commit 97c3477

Please sign in to comment.