Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 281f5b9

Browse files
author
Chodor Marek
committedMay 27, 2019
Skipping unparsable lines in docker input
1 parent f841521 commit 281f5b9

File tree

5 files changed

+38
-11
lines changed

5 files changed

+38
-11
lines changed
 

‎CHANGELOG.next.asciidoc

+1
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
100100
- Fix memory leak in Filebeat pipeline acker. {pull}12063[12063]
101101
- Fix goroutine leak caused on initialization failures of log input. {pull}12125[12125]
102102
- Fix goroutine leak on non-explicit finalization of log input. {pull}12164[12164]
103+
- Skipping unparsable log entries from docker json reader {pull}12268[12268]
103104

104105
*Heartbeat*
105106

‎filebeat/input/log/harvester.go

+4
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,10 @@ func (h *Harvester) Run() error {
276276
logp.Info("End of file reached: %s. Closing because close_eof is enabled.", h.state.Source)
277277
case ErrInactive:
278278
logp.Info("File is inactive: %s. Closing because close_inactive of %v reached.", h.state.Source, h.config.CloseInactive)
279+
case reader.ErrLineUnparsable:
280+
logp.Info("Skipping unparsable line in file: %v", h.state.Source)
281+
//line unparsable, go to next line
282+
continue
279283
default:
280284
logp.Err("Read line error: %v; File: %v", err, h.state.Source)
281285
}

‎libbeat/reader/reader.go

+9
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,19 @@
1717

1818
package reader
1919

20+
import (
21+
"errors"
22+
)
23+
2024
// Reader is the interface that wraps the basic Next method for
2125
// getting a new message.
2226
// Next returns the message being read or and error. EOF is returned
2327
// if reader will not return any new message on subsequent calls.
2428
type Reader interface {
2529
Next() (Message, error)
2630
}
31+
32+
var (
33+
//ErrLineUnparsable is error thrown when Next() element from input is corrupted and can not be parsed
34+
ErrLineUnparsable = errors.New("line is unparsable")
35+
)

‎libbeat/reader/readjson/docker_json.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/pkg/errors"
2828

2929
"github.com/elastic/beats/libbeat/common"
30+
"github.com/elastic/beats/libbeat/logp"
3031
"github.com/elastic/beats/libbeat/reader"
3132
)
3233

@@ -188,7 +189,8 @@ func (p *DockerJSONReader) Next() (reader.Message, error) {
188189
var logLine logLine
189190
err = p.parseLine(&message, &logLine)
190191
if err != nil {
191-
return message, err
192+
logp.Err("Parse line error: %v", err)
193+
return message, reader.ErrLineUnparsable
192194
}
193195

194196
// Handle multiline messages, join partial lines
@@ -204,7 +206,8 @@ func (p *DockerJSONReader) Next() (reader.Message, error) {
204206
}
205207
err = p.parseLine(&next, &logLine)
206208
if err != nil {
207-
return message, err
209+
logp.Err("Parse line error: %v", err)
210+
return message, reader.ErrLineUnparsable
208211
}
209212
message.Content = append(message.Content, next.Content...)
210213
}

‎libbeat/reader/readjson/docker_json_test.go

+19-9
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func TestDockerJSON(t *testing.T) {
3535
partial bool
3636
format string
3737
criflags bool
38-
expectedError bool
38+
expectedError error
3939
expectedMessage reader.Message
4040
}{
4141
{
@@ -53,7 +53,7 @@ func TestDockerJSON(t *testing.T) {
5353
name: "Wrong JSON",
5454
input: [][]byte{[]byte(`this is not JSON`)},
5555
stream: "all",
56-
expectedError: true,
56+
expectedError: reader.ErrLineUnparsable,
5757
expectedMessage: reader.Message{
5858
Bytes: 16,
5959
},
@@ -62,7 +62,7 @@ func TestDockerJSON(t *testing.T) {
6262
name: "Wrong CRI",
6363
input: [][]byte{[]byte(`2017-09-12T22:32:21.212861448Z stdout`)},
6464
stream: "all",
65-
expectedError: true,
65+
expectedError: reader.ErrLineUnparsable,
6666
expectedMessage: reader.Message{
6767
Bytes: 37,
6868
},
@@ -71,7 +71,7 @@ func TestDockerJSON(t *testing.T) {
7171
name: "Wrong CRI",
7272
input: [][]byte{[]byte(`{this is not JSON nor CRI`)},
7373
stream: "all",
74-
expectedError: true,
74+
expectedError: reader.ErrLineUnparsable,
7575
expectedMessage: reader.Message{
7676
Bytes: 25,
7777
},
@@ -80,7 +80,7 @@ func TestDockerJSON(t *testing.T) {
8080
name: "Missing time",
8181
input: [][]byte{[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}`)},
8282
stream: "all",
83-
expectedError: true,
83+
expectedError: reader.ErrLineUnparsable,
8484
expectedMessage: reader.Message{
8585
Bytes: 82,
8686
},
@@ -207,7 +207,7 @@ func TestDockerJSON(t *testing.T) {
207207
input: [][]byte{[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}`)},
208208
stream: "all",
209209
format: "cri",
210-
expectedError: true,
210+
expectedError: reader.ErrLineUnparsable,
211211
expectedMessage: reader.Message{
212212
Bytes: 82,
213213
},
@@ -217,7 +217,7 @@ func TestDockerJSON(t *testing.T) {
217217
input: [][]byte{[]byte(`2017-09-12T22:32:21.212861448Z stdout 2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache`)},
218218
stream: "all",
219219
format: "docker",
220-
expectedError: true,
220+
expectedError: reader.ErrLineUnparsable,
221221
expectedMessage: reader.Message{
222222
Bytes: 115,
223223
},
@@ -289,12 +289,21 @@ func TestDockerJSON(t *testing.T) {
289289
[]byte(`{"log":"shutdown...\n","stream`),
290290
},
291291
stream: "stdout",
292-
expectedError: true,
292+
expectedError: reader.ErrLineUnparsable,
293293
expectedMessage: reader.Message{
294294
Bytes: 139,
295295
},
296296
partial: true,
297297
},
298+
{
299+
name: "Corrupted log message line",
300+
input: [][]byte{[]byte(`36.276 # User requested shutdown...\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`)},
301+
stream: "all",
302+
expectedError: reader.ErrLineUnparsable,
303+
expectedMessage: reader.Message{
304+
Bytes: 97,
305+
},
306+
},
298307
}
299308

300309
for _, test := range tests {
@@ -303,8 +312,9 @@ func TestDockerJSON(t *testing.T) {
303312
json := New(r, test.stream, test.partial, test.format, test.criflags)
304313
message, err := json.Next()
305314

306-
if test.expectedError {
315+
if test.expectedError != nil {
307316
assert.Error(t, err)
317+
assert.Equal(t, test.expectedError, err)
308318
} else {
309319
assert.NoError(t, err)
310320
}

0 commit comments

Comments
 (0)
Please sign in to comment.