Skip to content
This repository has been archived by the owner on May 25, 2022. It is now read-only.

File Input Cleanup - Step 1 #155

Merged
merged 3 commits into from
May 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 28 additions & 30 deletions operator/builtin/input/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,26 +136,7 @@ func (f *InputOperator) poll(ctx context.Context) {
}
}

// Open the files first to minimize the time between listing and opening
files := make([]*os.File, 0, len(matches))
for _, path := range matches {
if _, ok := f.SeenPaths[path]; !ok {
if f.startAtBeginning {
f.Infow("Started watching file", "path", path)
} else {
f.Infow("Started watching file from end. To read preexisting logs, configure the argument 'start_at' to 'beginning'", "path", path)
}
f.SeenPaths[path] = struct{}{}
}
file, err := os.Open(path)
if err != nil {
f.Errorw("Failed to open file", zap.Error(err))
continue
}
files = append(files, file)
}

readers := f.makeReaders(files)
readers := f.makeReaders(matches)
f.firstCheck = false

var wg sync.WaitGroup
Expand All @@ -171,8 +152,8 @@ func (f *InputOperator) poll(ctx context.Context) {
wg.Wait()

// Close all files
for _, file := range files {
file.Close()
for _, reader := range readers {
reader.Close()
}

f.saveCurrent(readers)
Expand Down Expand Up @@ -208,7 +189,26 @@ func getMatches(includes, excludes []string) []string {
// makeReaders takes a list of file paths and creates a reader for each one,
// discarding any file whose fingerprint duplicates that of another file
// already seen during this polling interval
func (f *InputOperator) makeReaders(files []*os.File) []*Reader {
func (f *InputOperator) makeReaders(filesPaths []string) []*Reader {
// Open the files first to minimize the time between listing and opening
files := make([]*os.File, 0, len(filesPaths))
for _, path := range filesPaths {
if _, ok := f.SeenPaths[path]; !ok {
if f.startAtBeginning {
f.Infow("Started watching file", "path", path)
} else {
f.Infow("Started watching file from end. To read preexisting logs, configure the argument 'start_at' to 'beginning'", "path", path)
}
f.SeenPaths[path] = struct{}{}
}
file, err := os.Open(path)
if err != nil {
f.Errorw("Failed to open file", zap.Error(err))
continue
}
files = append(files, file)
}

// Get fingerprints for each file
fps := make([]*Fingerprint, 0, len(files))
for _, file := range files {
Expand All @@ -220,18 +220,15 @@ func (f *InputOperator) makeReaders(files []*os.File) []*Reader {
fps = append(fps, fp)
}

// Make a copy of the files so we don't modify the original
filesCopy := make([]*os.File, len(files))
copy(filesCopy, files)

// Exclude any empty fingerprints or duplicate fingerprints to avoid doubling up on copy-truncate files
OUTER:
for i := 0; i < len(fps); {
fp := fps[i]
if len(fp.FirstBytes) == 0 {
files[i].Close()
// Empty file, don't read it until we can compare its fingerprint
fps = append(fps[:i], fps[i+1:]...)
filesCopy = append(filesCopy[:i], filesCopy[i+1:]...)
files = append(files[:i], files[i+1:]...)
}

for j := 0; j < len(fps); j++ {
Expand All @@ -243,8 +240,9 @@ OUTER:
fp2 := fps[j]
if fp.StartsWith(fp2) || fp2.StartsWith(fp) {
// Exclude
files[i].Close()
fps = append(fps[:i], fps[i+1:]...)
filesCopy = append(filesCopy[:i], filesCopy[i+1:]...)
files = append(files[:i], files[i+1:]...)
continue OUTER
}
}
Expand All @@ -253,7 +251,7 @@ OUTER:

readers := make([]*Reader, 0, len(fps))
for i := 0; i < len(fps); i++ {
reader, err := f.newReader(filesCopy[i], fps[i], f.firstCheck)
reader, err := f.newReader(files[i], fps[i], f.firstCheck)
if err != nil {
f.Errorw("Failed to create reader", zap.Error(err))
continue
Expand Down
5 changes: 5 additions & 0 deletions operator/builtin/input/file/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ func (f *Reader) ReadToEnd(ctx context.Context) {
}
}

// Close closes the file held by this Reader and returns whatever error
// that close operation reports.
func (f *Reader) Close() error {
	err := f.file.Close()
	return err
}

// Emit creates an entry with the decoded message and sends it to the next
// operator in the pipeline
func (f *Reader) emit(ctx context.Context, msgBuf []byte) error {
Expand Down