From 504a556c1f55b9ea6a28100bdbaf4b14bb3f42a2 Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Thu, 20 May 2021 09:02:42 -0400 Subject: [PATCH 1/3] Move file opening into makeReaders method, and add Close method to Reader --- operator/builtin/input/file/file.go | 57 +++++++++++++-------------- operator/builtin/input/file/reader.go | 5 +++ 2 files changed, 32 insertions(+), 30 deletions(-) diff --git a/operator/builtin/input/file/file.go b/operator/builtin/input/file/file.go index ad1fea14..1eb2d857 100644 --- a/operator/builtin/input/file/file.go +++ b/operator/builtin/input/file/file.go @@ -136,26 +136,7 @@ func (f *InputOperator) poll(ctx context.Context) { } } - // Open the files first to minimize the time between listing and opening - files := make([]*os.File, 0, len(matches)) - for _, path := range matches { - if _, ok := f.SeenPaths[path]; !ok { - if f.startAtBeginning { - f.Infow("Started watching file", "path", path) - } else { - f.Infow("Started watching file from end. To read preexisting logs, configure the argument 'start_at' to 'beginning'", "path", path) - } - f.SeenPaths[path] = struct{}{} - } - file, err := os.Open(path) - if err != nil { - f.Errorw("Failed to open file", zap.Error(err)) - continue - } - files = append(files, file) - } - - readers := f.makeReaders(files) + readers := f.makeReaders(matches) f.firstCheck = false var wg sync.WaitGroup @@ -171,8 +152,8 @@ func (f *InputOperator) poll(ctx context.Context) { wg.Wait() // Close all files - for _, file := range files { - file.Close() + for _, reader := range readers { + reader.Close() } f.saveCurrent(readers) @@ -208,7 +189,27 @@ func getMatches(includes, excludes []string) []string { // makeReaders takes a list of paths, then creates readers from each of those paths, // discarding any that have a duplicate fingerprint to other files that have already // been read this polling interval -func (f *InputOperator) makeReaders(files []*os.File) []*Reader { +func (f *InputOperator) makeReaders(filesPaths []string) []*Reader { + + // Open the files first to minimize the time between listing and opening + files := make([]*os.File, 0, len(filesPaths)) + for _, path := range filesPaths { + if _, ok := f.SeenPaths[path]; !ok { + if f.startAtBeginning { + f.Infow("Started watching file", "path", path) + } else { + f.Infow("Started watching file from end. To read preexisting logs, configure the argument 'start_at' to 'beginning'", "path", path) + } + f.SeenPaths[path] = struct{}{} + } + file, err := os.Open(path) + if err != nil { + f.Errorw("Failed to open file", zap.Error(err)) + continue + } + files = append(files, file) + } + // Get fingerprints for each file fps := make([]*Fingerprint, 0, len(files)) for _, file := range files { @@ -220,10 +221,6 @@ func (f *InputOperator) makeReaders(files []*os.File) []*Reader { fps = append(fps, fp) } - // Make a copy of the files so we don't modify the original - filesCopy := make([]*os.File, len(files)) - copy(filesCopy, files) - // Exclude any empty fingerprints or duplicate fingerprints to avoid doubling up on copy-truncate files OUTER: for i := 0; i < len(fps); { @@ -231,7 +228,7 @@ OUTER: if len(fp.FirstBytes) == 0 { // Empty file, don't read it until we can compare its fingerprint fps = append(fps[:i], fps[i+1:]...) - filesCopy = append(filesCopy[:i], filesCopy[i+1:]...) + files = append(files[:i], files[i+1:]...) } for j := 0; j < len(fps); j++ { @@ -244,7 +241,7 @@ OUTER: if fp.StartsWith(fp2) || fp2.StartsWith(fp) { // Exclude fps = append(fps[:i], fps[i+1:]...) - filesCopy = append(filesCopy[:i], filesCopy[i+1:]...) + files = append(files[:i], files[i+1:]...) continue OUTER } } @@ -253,7 +250,7 @@ OUTER: readers := make([]*Reader, 0, len(fps)) for i := 0; i < len(fps); i++ { - reader, err := f.newReader(filesCopy[i], fps[i], f.firstCheck) + reader, err := f.newReader(files[i], fps[i], f.firstCheck) if err != nil { f.Errorw("Failed to create reader", zap.Error(err)) continue diff --git a/operator/builtin/input/file/reader.go b/operator/builtin/input/file/reader.go index 12dbf6bd..150319cd 100644 --- a/operator/builtin/input/file/reader.go +++ b/operator/builtin/input/file/reader.go @@ -117,6 +117,11 @@ func (f *Reader) ReadToEnd(ctx context.Context) { } } +// Close will close the file +func (f *Reader) Close() error { + return f.file.Close() +} + // Emit creates an entry with the decoded message and sends it to the next // operator in the pipeline func (f *Reader) emit(ctx context.Context, msgBuf []byte) error { From 3e2e8d2808344701516f6a95c050be7911cbf39d Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Thu, 20 May 2021 10:10:51 -0400 Subject: [PATCH 2/3] Fix lint --- operator/builtin/input/file/file.go | 1 - 1 file changed, 1 deletion(-) diff --git a/operator/builtin/input/file/file.go b/operator/builtin/input/file/file.go index 1eb2d857..e6883ef8 100644 --- a/operator/builtin/input/file/file.go +++ b/operator/builtin/input/file/file.go @@ -190,7 +190,6 @@ func getMatches(includes, excludes []string) []string { // discarding any that have a duplicate fingerprint to other files that have already // been read this polling interval func (f *InputOperator) makeReaders(filesPaths []string) []*Reader { - // Open the files first to minimize the time between listing and opening files := make([]*os.File, 0, len(filesPaths)) for _, path := range filesPaths { From dacced7546e696856473d24e3f81890d86784c9d Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Mon, 24 May 2021 15:15:20 -0400 Subject: [PATCH 3/3] Close files if they will not be converted to readers --- operator/builtin/input/file/file.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/operator/builtin/input/file/file.go b/operator/builtin/input/file/file.go index e6883ef8..fcfb2553 100644 --- a/operator/builtin/input/file/file.go +++ b/operator/builtin/input/file/file.go @@ -225,6 +225,7 @@ OUTER: for i := 0; i < len(fps); { fp := fps[i] if len(fp.FirstBytes) == 0 { + files[i].Close() // Empty file, don't read it until we can compare its fingerprint fps = append(fps[:i], fps[i+1:]...) files = append(files[:i], files[i+1:]...) @@ -239,6 +240,7 @@ OUTER: fp2 := fps[j] if fp.StartsWith(fp2) || fp2.StartsWith(fp) { // Exclude + files[i].Close() fps = append(fps[:i], fps[i+1:]...) files = append(files[:i], files[i+1:]...) continue OUTER