From 4b91be069acd470ab6b4384d0c5f055797162614 Mon Sep 17 00:00:00 2001 From: Daniel Jaglowski Date: Fri, 27 Oct 2023 11:04:54 -0600 Subject: [PATCH] [chore][pkg/stanaza] Remove currentFps from fileconsumer.Manager (#28493) --- pkg/stanza/fileconsumer/file.go | 69 +++++++++++++-------------------- 1 file changed, 26 insertions(+), 43 deletions(-) diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go index 68e9ea64b042..6cb08e7cea55 100644 --- a/pkg/stanza/fileconsumer/file.go +++ b/pkg/stanza/fileconsumer/file.go @@ -34,8 +34,6 @@ type Manager struct { previousPollFiles []*reader.Reader knownFiles []*reader.Metadata - - currentFps []*fingerprint.Fingerprint } func (m *Manager) Start(persister operator.Persister) error { @@ -146,14 +144,8 @@ func (m *Manager) poll(ctx context.Context) { } func (m *Manager) consume(ctx context.Context, paths []string) { - m.Debug("Consuming files") - readers := make([]*reader.Reader, 0, len(paths)) - for _, path := range paths { - r := m.makeReader(path) - if r != nil { - readers = append(readers, r) - } - } + m.Debug("Consuming files", zap.Strings("paths", paths)) + readers := m.makeReaders(paths) // take care of files which disappeared from the pattern since the last poll cycle // this can mean either files which were removed, or rotated into a name not matching the pattern @@ -173,7 +165,6 @@ func (m *Manager) consume(ctx context.Context, paths []string) { wg.Wait() m.previousPollFiles = readers - m.clearCurrentFingerprints() } func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.File) { @@ -201,45 +192,37 @@ func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.Fi return fp, file } -func (m *Manager) checkDuplicates(fp *fingerprint.Fingerprint) bool { - for i := 0; i < len(m.currentFps); i++ { - if fp.Equal(m.currentFps[i]) { - return true - } - } - return false -} - // makeReader take a file path, then creates reader, // discarding any that have a duplicate fingerprint to other files that have already // been read this polling interval -func (m *Manager) makeReader(path string) *reader.Reader { - // Open the files first to minimize the time between listing and opening - fp, file := m.makeFingerprint(path) - if fp == nil { - return nil - } - - // Exclude any empty fingerprints or duplicate fingerprints to avoid doubling up on copy-truncate files - if m.checkDuplicates(fp) { - if err := file.Close(); err != nil { - m.Debugw("problem closing file", zap.Error(err)) +func (m *Manager) makeReaders(paths []string) []*reader.Reader { + readers := make([]*reader.Reader, 0, len(paths)) + for _, path := range paths { + fp, file := m.makeFingerprint(path) + if fp == nil { + continue } - return nil - } - m.currentFps = append(m.currentFps, fp) - reader, err := m.newReader(file, fp) - if err != nil { - m.Errorw("Failed to create reader", zap.Error(err)) - return nil - } + // Exclude duplicate paths with the same content. This can happen when files are + // being rotated with copy/truncate strategy. (After copy, prior to truncate.) + for _, r := range readers { + if fp.Equal(r.Fingerprint) { + if err := file.Close(); err != nil { + m.Debugw("problem closing file", zap.Error(err)) + } + continue + } + } - return reader -} + r, err := m.newReader(file, fp) + if err != nil { + m.Errorw("Failed to create reader", zap.Error(err)) + continue + } -func (m *Manager) clearCurrentFingerprints() { - m.currentFps = make([]*fingerprint.Fingerprint, 0) + readers = append(readers, r) + } + return readers } func (m *Manager) newReader(file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, error) {