[receiver/filelog] Flush can send partial input #32170
Pinging code owners for receiver/filelog: @djaglowski. See Adding Labels via Comments if you do not have permissions to add labels yourself.
Thanks for investigating and writing this up @OverOrion. To summarize the expected vs. problematic behavior:

- Expected: If a file ends with an unterminated log and the flush timer expires, we should flush a token containing the content after the offset, up until either the end of file or the max log size.
- Problem: When a file ends with an unterminated log and the flush timer expires, we are flushing a token containing the content after the offset, but only up until the initial buffer size.

My understanding of the incorrect behavior does not rely on multiple sequential scanners. I would describe it as follows: a scanner is created with an initial buffer size.
The correct behavior would be the same, except the last two items should be related: 3a. The EOF is found before the flush timer has expired. No token is returned.
I think this is the solution to enable the correct behavior described above. In fact, I believe this behavior previously existed but the nuance was not tested or documented. Let's make sure to include both this time so we don't regress again in the future.
Thanks @OverOrion for digging into this and sharing the details!
I wonder if it's correct to wait for the flush timer to expire in order to return the buffer when EOF is found, instead of doing it immediately when EOF is found 🤔. If we return at EOF immediately, a bigger […] If we want to keep this in order to preserve 3b: in that case the flush timer actually has more or less no effect, right?
Another option could be to move the block at opentelemetry-collector-contrib/pkg/stanza/flush/flush.go, lines 62 to 66 (in 3ab6693), before the timeout check (the previous if block). This if-block only makes sense if the goal is to actually consume the rest of the data, but leaving it at the end does not ensure that the timeout won't be reached on the next call. So it actually has a non-deterministic impact which depends on "timing". However, this would be effectively identical to preserving the read until EOF.
This is worth a separate issue to discuss if you want. In short though, I think this makes a lot of assumptions about how files are written which I've never been comfortable making. Does every application & OS write complete logs atomically? Otherwise we're just emitting partial logs which would have been complete if we just waited a little longer. Maybe there's a case to be made but I think this could potentially create big problems.
The key concept which I maybe didn't articulate well is that flushing is never a necessity prior to EOF. It shouldn't be part of the logic at all. (This is essentially what the bug boils down to: the flush timer is being applied when it really shouldn't even be considered.) As long as we're not yet at EOF, we should just keep consuming the data rapidly and never look at the flush timer. Once at EOF, it's a relevant consideration. Typically, when flushing is necessary at all, it's not because the timer expires while reading the file. It's because the file is sitting idle and the timer expired in between polls. It can work either way, but it's not intended to be a consideration which is relevant during the normal course of consuming the file.
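To make the intended ordering concrete, here is a minimal standalone sketch of a flush-wrapped `bufio.SplitFunc` that only consults the flush timer once `atEOF` is true. This is not the actual stanza code; the names `flushState` and `withFlush` are made up for illustration.

```go
package sketch

import (
	"bufio"
	"time"
)

// flushState is a hypothetical stand-in for stanza's flush state: it tracks
// when the unterminated tail of the input last grew.
type flushState struct {
	lastDataChange time.Time
	lastDataLength int
}

// withFlush wraps a split function so that, as described above, the flush
// timer is never considered before EOF: while more data may be available we
// simply keep asking for it, and only at EOF do we decide whether to force
// out the partial token.
func (s *flushState) withFlush(split bufio.SplitFunc, period time.Duration) bufio.SplitFunc {
	return func(data []byte, atEOF bool) (int, []byte, error) {
		advance, token, err := split(data, atEOF)
		if err != nil || token != nil {
			// A complete log (or an error) was found; restart the flush clock.
			s.lastDataChange = time.Now()
			s.lastDataLength = 0
			return advance, token, err
		}
		if !atEOF {
			// Not at EOF yet: never force a flush, just ask for more data.
			return 0, nil, nil
		}
		if len(data) > s.lastDataLength {
			// The unterminated tail grew since the last call; postpone the flush.
			s.lastDataChange = time.Now()
			s.lastDataLength = len(data)
		}
		if time.Since(s.lastDataChange) > period {
			// Idle at EOF for longer than the flush period: emit the tail.
			s.lastDataChange = time.Now()
			s.lastDataLength = 0
			return len(data), data, nil
		}
		return 0, nil, nil
	}
}
```

Under this ordering a token can only be forced out when `atEOF` is true, so a regression test can simply assert that no token is returned before EOF, even if the period has already expired.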
Not sure I understand what you are suggesting but the purpose of that is to reset the timer because the timer is relative to the last time a log was emitted. In other words, when the final (partial) log in a file is reached, this is basically when the timer should start.
I see, yeah. It makes sense to not rely on such an assumption.
My point was mainly that by inverting the order of the final two if-blocks we ensure that the check for remaining data happens first, and only if we have no remaining data do we proceed to the timeout check. So the flow should be:
```go
// ...
// We're seeing new data so postpone the next flush
if len(data) > s.LastDataLength {
    s.LastDataChange = time.Now()
    s.LastDataLength = len(data)
}

// Flush timed out
if time.Since(s.LastDataChange) > period {
    s.LastDataChange = time.Now()
    s.LastDataLength = 0
    return len(data), data, nil
}

// Ask for more data
return 0, nil, nil
// ...
```
If I'm not missing anything, this looks equivalent to introducing the […]. Overall I believe we are aligned here. We can keep any remaining discussion at #32100.
That makes sense. Thanks for clarifying.
**Description:** Flush could have sent partial input before EOF was reached; this PR fixes it.
**Link to tracking Issue:** #31512, #32170
**Testing:** Added unit test `TestFlushPeriodEOF`
**Documentation:** Added a note to the `force_flush_period` option

Signed-off-by: Szilard Parrag <szilard.parrag@axoflow.com>
Co-authored-by: Daniel Jaglowski <jaglows3@gmail.com>
Since #32100 was merged I guess we can close this?
@crobert-1 since we closed #31512 I think we can close this one as well.
Component(s)
pkg/stanza
What happened?
Description
As mentioned in #31512, the filelogreceiver could "lose" some characters. That issue is a bit long now, so this one is here to summarize the problem, give minimal reproduction steps, and point to the relevant lines of code.
The problem is how the flush logic behaves together with a scanner. By default the scanner expects newline-terminated lines, but if it can't find one, it will return the current buffer once the flush timeout expires. The problem with this is that there is no communication between the flush logic and the scanner, so the following is possible:
The problem here seems to be the different lifetimes of `FlushState` and `Scanner`: there is a single `FlushState` instance for a reader, which will see different `Scanner` instances (opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/scanner/scanner.go, line 23 in d75362a). This means that these scanner instances will all share the same fate:

- read `n` bytes (`n` == initial buffer size) and try to read more, but since the newline terminator can't be found, no tokens are returned (opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader/reader.go, lines 62 to 80 in d75362a)
- the flush timer expires, so the current `Scanner` is force flushed, yielding only `n` bytes (this could be different, depending on when the flush timeout reaps it)

The reconstruction is needed because: […]
Steps to Reproduce
Input creation without newline ending
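As a stand-in for the input-creation step, here is a minimal sketch that produces such a file: one long line, longer than the scanner's initial buffer, with no trailing newline. The file name and length below are arbitrary choices, not values from the original report.

```go
package main

import (
	"os"
	"strings"
)

func main() {
	// One long line with no trailing newline. The length only needs to exceed
	// the scanner's initial buffer size for the partial flush to be visible.
	line := strings.Repeat("a", 100*1024)
	if err := os.WriteFile("input.log", []byte(line), 0o644); err != nil {
		panic(err)
	}
}
```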
Collector
opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/scanner/scanner.go, line 14 in 87630b2
Expected Result
The whole line, as-is.
Actual Result
Chunked line (the line arrives split into multiple partial tokens); see the standalone sketch below.
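The chunking can be reproduced outside the collector with a plain `bufio.Scanner` and a flush-style split function. The sketch below is a simplified imitation of the flush logic, not the stanza code, and the buffer sizes are made up; it prints several tokens of at most the initial buffer size instead of one 200-byte token.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"strings"
	"time"
)

func main() {
	// A 200-byte "log line" with no trailing newline, longer than the
	// scanner's initial buffer configured below.
	input := strings.Repeat("a", 200)

	// Pretend the flush period already expired, which is what happens when a
	// file sits idle between polls.
	lastDataChange := time.Now().Add(-time.Hour)
	period := time.Second

	splitWithFlush := func(data []byte, atEOF bool) (int, []byte, error) {
		// Look for a newline-terminated log first.
		if i := bytes.IndexByte(data, '\n'); i >= 0 {
			return i + 1, data[:i], nil
		}
		// No terminator: the problematic behavior consults the flush timer
		// here without checking atEOF, so the buffer never gets a chance to
		// grow beyond its initial size.
		if time.Since(lastDataChange) > period && len(data) > 0 {
			return len(data), data, nil
		}
		return 0, nil, nil
	}

	scanner := bufio.NewScanner(strings.NewReader(input))
	scanner.Buffer(make([]byte, 64), 1024) // initial buffer size 64, max 1024
	scanner.Split(splitWithFlush)

	// Prints tokens of 64, 64, 64 and 8 bytes instead of a single 200-byte one.
	for scanner.Scan() {
		fmt.Printf("token of %d bytes (expected %d)\n", len(scanner.Text()), len(input))
	}
}
```

With the same input, a split function that keeps returning `(0, nil, nil)` until `atEOF` is true lets the scanner grow its buffer and emit the whole 200 bytes as one token.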
Possible solutions
- Only flush once `atEOF` was `true`; but that's only the case for file-based sources, so it would not work for `TCP`, for example.
- Longer-lived `Scanner`s, so they would not have to be recreated just to read new input/lines, and the flush timeout would only send the buffer if the scanning can't advance anymore.
- `bufio.Scanner` might need to be retired in favor of something else (something based on `bufio.Reader` perhaps?), combined with keeping track of the current partial token (a rough sketch of this idea follows below). This is something that needs to be given some thought because many things rely on the scanner currently.

What do you think @djaglowski @ChrsMark?
Also huge kudos to @MrAnno for pair debugging this issue with me 🚀
Collector version
e4c5b51
Environment information
Environment
OS: Ubuntu 23.10
Compiler (if manually compiled): go 1.21.6
OpenTelemetry Collector configuration
Log output
Additional context
I also added some good ole' print statements to `Func()` (opentelemetry-collector-contrib/pkg/stanza/flush/flush.go, line 29 in d75362a) and to the reader (opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader/reader.go, line 82 in d75362a), which helped with debugging.