-
Notifications
You must be signed in to change notification settings - Fork 2.5k
/
Copy pathfile_other.go
57 lines (49 loc) · 1.83 KB
/
file_other.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
//go:build !windows
package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer"
import (
"context"
"sync"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
)
// Take care of files which disappeared from the pattern since the last poll cycle
// this can mean either files which were removed, or rotated into a name not matching the pattern
// we do this before reading existing files to ensure we emit older log lines before newer ones
func (m *Manager) readLostFiles(ctx context.Context) {
if m.readerFactory.DeleteAtEOF {
// Lost files are not expected when delete_at_eof is enabled
// since we are deleting the files before they can become lost.
return
}
previousPollFiles := m.tracker.PreviousPollFiles()
lostReaders := make([]*reader.Reader, 0, len(previousPollFiles))
OUTER:
for _, oldReader := range previousPollFiles {
for _, newReader := range m.tracker.CurrentPollFiles() {
if newReader.Fingerprint.StartsWith(oldReader.Fingerprint) {
continue OUTER
}
if !newReader.NameEquals(oldReader) {
continue
}
// At this point, we know that the file has been rotated. However, we do not know
// if it was moved or truncated. If truncated, then both handles point to the same
// file, in which case we should only read from it using the new reader. We can use
// the Validate method to ensure that the file has not been truncated.
if !oldReader.Validate() {
continue OUTER
}
}
lostReaders = append(lostReaders, oldReader)
}
var lostWG sync.WaitGroup
for _, lostReader := range lostReaders {
lostWG.Add(1)
go func(r *reader.Reader) {
defer lostWG.Done()
r.ReadToEnd(ctx)
}(lostReader)
}
lostWG.Wait()
}