From c58ccd99d6a0166883fd12ce3345ee2bdb25524f Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Tue, 8 Feb 2022 17:01:57 -0500 Subject: [PATCH 1/2] WIP - filelog receiver should not attempt to track lost files on windows --- operator/builtin/input/file/file.go | 69 +++++++++++++++++------------ 1 file changed, 41 insertions(+), 28 deletions(-) diff --git a/operator/builtin/input/file/file.go b/operator/builtin/input/file/file.go index f13d2f15..8e7b663d 100644 --- a/operator/builtin/input/file/file.go +++ b/operator/builtin/input/file/file.go @@ -20,6 +20,7 @@ import ( "encoding/json" "fmt" "os" + "runtime" "sync" "time" @@ -85,11 +86,14 @@ func (f *InputOperator) Start(persister operator.Persister) error { func (f *InputOperator) Stop() error { f.cancel() f.wg.Wait() - for _, reader := range f.lastPollReaders { - reader.Close() - } - for _, reader := range f.knownFiles { - reader.Close() + + if runtime.GOOS != "windows" { + for _, reader := range f.lastPollReaders { + reader.Close() + } + for _, reader := range f.knownFiles { + reader.Close() + } } f.knownFiles = nil f.cancel = nil @@ -148,27 +152,32 @@ func (f *InputOperator) poll(ctx context.Context) { readers := f.makeReaders(matches) f.firstCheck = false - // Detect files that have been rotated out of matching pattern - lostReaders := make([]*Reader, 0, len(f.lastPollReaders)) -OUTER: - for _, oldReader := range f.lastPollReaders { - for _, reader := range readers { - if reader.Fingerprint.StartsWith(oldReader.Fingerprint) { - continue OUTER + if runtime.GOOS != "windows" { + + // Detect files that have been rotated out of matching pattern + lostReaders := make([]*Reader, 0, len(f.lastPollReaders)) + OUTER: + for _, oldReader := range f.lastPollReaders { + for _, reader := range readers { + if reader.Fingerprint.StartsWith(oldReader.Fingerprint) { + continue OUTER + } } + lostReaders = append(lostReaders, oldReader) } - lostReaders = append(lostReaders, oldReader) - } - var wg sync.WaitGroup - for _, reader := range lostReaders { - wg.Add(1) - go func(r *Reader) { - defer wg.Done() - r.ReadToEnd(ctx) - }(reader) + var lostWG sync.WaitGroup + for _, reader := range lostReaders { + lostWG.Add(1) + go func(r *Reader) { + defer lostWG.Done() + r.ReadToEnd(ctx) + }(reader) + } + lostWG.Wait() } + var wg sync.WaitGroup for _, reader := range readers { wg.Add(1) go func(r *Reader) { @@ -176,16 +185,20 @@ OUTER: r.ReadToEnd(ctx) }(reader) } - - // Wait until all the reader goroutines are finished wg.Wait() - // Close all files - for _, reader := range f.lastPollReaders { - reader.Close() - } + if runtime.GOOS != "windows" { + // Close all files + for _, reader := range f.lastPollReaders { + reader.Close() + } - f.lastPollReaders = readers + f.lastPollReaders = readers + } else { + for _, reader := range readers { + reader.Close() + } + } f.saveCurrent(readers) f.syncLastPollFiles(ctx) From 7b5858775ecc00f78340e455dec5b30597fa0ed7 Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Tue, 8 Feb 2022 17:57:28 -0500 Subject: [PATCH 2/2] Pull os-specific file handle management into separate struct --- operator/builtin/input/file/config.go | 1 + operator/builtin/input/file/file.go | 60 +++-------------- operator/builtin/input/file/roller.go | 22 ++++++ operator/builtin/input/file/roller_other.go | 67 +++++++++++++++++++ operator/builtin/input/file/roller_windows.go | 36 ++++++++++ 5 files changed, 135 insertions(+), 51 deletions(-) create mode 100644 operator/builtin/input/file/roller.go create mode 100644 operator/builtin/input/file/roller_other.go create mode 100644 operator/builtin/input/file/roller_windows.go diff --git a/operator/builtin/input/file/config.go b/operator/builtin/input/file/config.go index 4d8e1f31..0a69b4c0 100644 --- a/operator/builtin/input/file/config.go +++ b/operator/builtin/input/file/config.go @@ -167,6 +167,7 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator, firstCheck: true, cancel: func() {}, knownFiles: make([]*Reader, 0, 10), + roller: newRoller(), fingerprintSize: int(c.FingerprintSize), MaxLogSize: int(c.MaxLogSize), MaxConcurrentFiles: c.MaxConcurrentFiles, diff --git a/operator/builtin/input/file/file.go b/operator/builtin/input/file/file.go index 8e7b663d..346583ee 100644 --- a/operator/builtin/input/file/file.go +++ b/operator/builtin/input/file/file.go @@ -20,7 +20,6 @@ import ( "encoding/json" "fmt" "os" - "runtime" "sync" "time" @@ -48,10 +47,10 @@ type InputOperator struct { persister operator.Persister - knownFiles []*Reader - queuedMatches []string - maxBatchFiles int - lastPollReaders []*Reader + knownFiles []*Reader + queuedMatches []string + maxBatchFiles int + roller roller startAtBeginning bool @@ -71,6 +70,7 @@ func (f *InputOperator) Start(persister operator.Persister) error { f.firstCheck = true f.persister = persister + // Load offsets from disk if err := f.loadLastPollFiles(ctx); err != nil { return fmt.Errorf("read known files from database: %s", err) @@ -86,14 +86,9 @@ func (f *InputOperator) Start(persister operator.Persister) error { func (f *InputOperator) Stop() error { f.cancel() f.wg.Wait() - - if runtime.GOOS != "windows" { - for _, reader := range f.lastPollReaders { - reader.Close() - } - for _, reader := range f.knownFiles { - reader.Close() - } + f.roller.cleanup() + for _, reader := range f.knownFiles { + reader.Close() } f.knownFiles = nil f.cancel = nil @@ -152,31 +147,6 @@ func (f *InputOperator) poll(ctx context.Context) { readers := f.makeReaders(matches) f.firstCheck = false - if runtime.GOOS != "windows" { - - // Detect files that have been rotated out of matching pattern - lostReaders := make([]*Reader, 0, len(f.lastPollReaders)) - OUTER: - for _, oldReader := range f.lastPollReaders { - for _, reader := range readers { - if reader.Fingerprint.StartsWith(oldReader.Fingerprint) { - continue OUTER - } - } - lostReaders = append(lostReaders, oldReader) - } - - var lostWG sync.WaitGroup - for _, reader := range lostReaders { - lostWG.Add(1) - go func(r *Reader) { - defer lostWG.Done() - r.ReadToEnd(ctx) - }(reader) - } - lostWG.Wait() - } - var wg sync.WaitGroup for _, reader := range readers { wg.Add(1) @@ -187,19 +157,7 @@ func (f *InputOperator) poll(ctx context.Context) { } wg.Wait() - if runtime.GOOS != "windows" { - // Close all files - for _, reader := range f.lastPollReaders { - reader.Close() - } - - f.lastPollReaders = readers - } else { - for _, reader := range readers { - reader.Close() - } - } - + f.roller.roll(ctx, readers) f.saveCurrent(readers) f.syncLastPollFiles(ctx) } diff --git a/operator/builtin/input/file/roller.go b/operator/builtin/input/file/roller.go new file mode 100644 index 00000000..21498fdb --- /dev/null +++ b/operator/builtin/input/file/roller.go @@ -0,0 +1,22 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package file + +import "context" + +type roller interface { + roll(context.Context, []*Reader) + cleanup() +} diff --git a/operator/builtin/input/file/roller_other.go b/operator/builtin/input/file/roller_other.go new file mode 100644 index 00000000..2dc80f87 --- /dev/null +++ b/operator/builtin/input/file/roller_other.go @@ -0,0 +1,67 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !windows +// +build !windows + +package file + +import ( + "context" + "sync" +) + +type detectLostFiles struct { + oldReaders []*Reader +} + +func newRoller() roller { + return &detectLostFiles{[]*Reader{}} +} + +func (r *detectLostFiles) roll(ctx context.Context, readers []*Reader) { + // Detect files that have been rotated out of matching pattern + lostReaders := make([]*Reader, 0, len(r.oldReaders)) +OUTER: + for _, oldReader := range r.oldReaders { + for _, reader := range readers { + if reader.Fingerprint.StartsWith(oldReader.Fingerprint) { + continue OUTER + } + } + lostReaders = append(lostReaders, oldReader) + } + + var lostWG sync.WaitGroup + for _, reader := range lostReaders { + lostWG.Add(1) + go func(r *Reader) { + defer lostWG.Done() + r.ReadToEnd(ctx) + }(reader) + } + lostWG.Wait() + + for _, reader := range r.oldReaders { + reader.Close() + } + + r.oldReaders = readers +} + +func (r *detectLostFiles) cleanup() { + for _, reader := range r.oldReaders { + reader.Close() + } +} diff --git a/operator/builtin/input/file/roller_windows.go b/operator/builtin/input/file/roller_windows.go new file mode 100644 index 00000000..fbf44fe9 --- /dev/null +++ b/operator/builtin/input/file/roller_windows.go @@ -0,0 +1,36 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build windows +// +build windows + +package file + +import "context" + +type closeImmediately struct{} + +func newRoller() roller { + return &closeImmediately{} +} + +func (r *closeImmediately) roll(_ context.Context, readers []*Reader) { + for _, reader := range readers { + reader.Close() + } +} + +func (r *closeImmediately) cleanup() { + return +}