system/socket: propagate backpressure from output #32192
Conversation
This is my initial look, I will take another look later.
I'd just like to confirm/fix my understanding of the approach here. ISTM that this decouples publishing the events from holding the list of captured events once the timeouts have happened, so that event capture doesn't get bound by the publication rate. Is this correct?
I think it would be helpful to have a description of the approach in the PR description with a view to including it in the commit message when the squash happens.
```go
func (s *state) ExpireFlows() {
	start := s.clock()
	toReport := s.expireFlows()
	if sent := s.reportFlows(&toReport); sent > 0 {
```
`sent` cannot be negative, so `sent != 0`?
👍
```go
	return toReport
}

func (l *linkedList) append(b linkedList) {
```
Just for clarity, the caller to `l.append` never uses `b` again, true? Otherwise this is not safe since their `b` will continue to be a tail of its previous head. This is always what is happening here since `b` is always a newly constructed value, but it is probably worth documenting that this is the case.
Makes sense. I've updated the function to take a pointer for the second argument and zero it before returning.
```go
var errs multierror.Errors
rootPut := func(key string, value interface{}) {
	if _, err := root.Put(key, value); err != nil {
		errs = append(errs, err)
	}
}
```
😄
```go
	}
}

func (s *state) expireFlows() (toReport linkedList) {
	s.Lock()
	defer s.Unlock()
	deadline := s.clock().Add(-s.inactiveTimeout)
```
This is a minor point, but I'm wondering if `s.clock()` should be pulled out into `now` and have that used in the places here.
done
```go
var toReport linkedList
// Send flows to the output as a deferred function to avoid
// holding on s mutex when there's backpressure from the output.
defer s.reportFlows(&toReport)
```
👍
```go
	}
}

func (s *state) expireFlows() (toReport linkedList) {
```
I'm wondering at this point, when it is now a matter of collecting items, if it wouldn't be simpler to have a `[]linkedElement` for `reportFlows` to work on; the allocations would likely be reduced (particularly if the size hints were used to preallocate) and the code would be simpler. This would involve changes in `onSockTerminated` and `onFlowTerminated` as well.
I'm not sure I follow. Removing elements from linkedLists and creating a new linked list from those existing elements is a zero-alloc operation. I'll add a test to ensure this is always the case.
I think checking the escape analysis would be worthwhile. The head that is being obtained for adding to `toReport` is not the same memory that is in the source since we are not using the pointer, so it must be allocated. Since it is being used after the function returns, it cannot be on the stack, so I think the allocation needs to be on the heap. The tail of that head will all be already allocated and so they are certainly zero-alloc.
You are right; I've looked at the escape analysis and it doesn't escape.
I've gone over this more carefully because I was unable to find a fault in my logic (the `-m` report is not in source order so I missed it previously and accepted the output I saw) and I can see that the `b` parameter to `append` does leak:

```
./state.go:901:29: leaking param content: b
```
```go
	return true
}

func (ts *testingState) Error(_ error) bool {
```
s/_ //g
👍
```go
}

func (ts *testingState) feedEvents(evs []event) {
	for idx, ev := range evs {
```
Add `t.Helper()` above this?
👍
@efd6 please re-review. I've updated the description with an explanation.
OK, I have the logic now and it looks OK to me. Thanks for the clarification above. I think it would be helpful to also say that the back pressure is exerted through the `*state` mutex.
Assuming my interpretation of the back pressure being exerted via the `*state` mutex is correct, this LGTM.
```go
	s.reportFlow(f)
	count++
```
If `*state.reportFlow` returns a conditional 1 for success and 0 for failure then the count would more accurately reflect what was reported (depending on whether you are interested in the attempts or the succeeds).
makes sense, done.
```go
var toReport linkedList
// Send flows to the output as a deferred function to avoid
// holding on s mutex when there's backpressure from the output.
defer s.reportFlows(&toReport)
```
Is it worth logging the count here?
I didn't want to log in the main thread as this will report a flow every time a socket is closed.
The logging in ExpireFlows is there to have a way to tell when backpressure is happening without causing too much impact.
We have users testing this change. Waiting for feedback before moving out of the draft state.
Co-authored-by: Dan Kortschak <90160302+efd6@users.noreply.github.com>
Force-pushed from 8845b36 to ef5c302 (compare).
Pinging @elastic/security-external-integrations (Team:Security-External Integrations)
This PR modifies the socket dataset so that it is affected by output backpressure.

Originally, it operated as two separate goroutines. One would read kernel tracing events as fast as possible and maintain a view of the open sockets/flows on the system. Once a flow is deemed terminated, it would be stored on a linked list (`done`) for it to be published to the output. A separate goroutine consumes that linked list and publishes the flows.

Under heavy network load, it can generate flows faster than the output is able to accept. Due to the linked list being unbounded, it would store more and more flows, causing Auditbeat's memory usage to grow over time.

This PR simplifies flow publication in such a way that if the output blocks, the main reader goroutine will block too, preventing the consumption of new kernel events and the generation of additional flows until the output is able to accept more documents.

Also it contains a necessary refactor: moving the linked list implementation to a helper package. This adds a bit of noise.

(cherry picked from commit e362ca7)

Conflicts: x-pack/auditbeat/module/system/socket/state.go, x-pack/auditbeat/module/system/socket/state_test.go
What does this PR do?
This PR modifies the socket dataset so that it is affected by output backpressure.
Originally, it operated as two separate goroutines. One would read kernel tracing events as fast as possible and maintain a view of the open sockets/flows on the system. Once a flow is deemed terminated, it would be stored on a linked list (`done`) for it to be published to the output. A separate goroutine consumes that linked list and publishes the flows.
Under heavy network load, it can generate flows faster than the output is able to accept. Due to the linked list being unbounded, it would store more and more flows, causing Auditbeat's memory usage to grow over time.
This PR simplifies flow publication in such a way that if the output blocks, the main reader goroutine will block too, preventing the consumption of new kernel events and the generation of additional flows until the output is able to accept more documents.
Also it contains a necessary refactor: Moving the linked list implementation to a helper package. This adds a bit of noise.
Why is it important?
To prevent high memory usage of Auditbeat under heavy network I/O.
Checklist
- [ ] I have made corresponding changes to the documentation
- [ ] I have made corresponding changes to the default configuration files
- [ ] I have added an entry in `CHANGELOG.next.asciidoc` or `CHANGELOG-developer.next.asciidoc`

Author's Checklist
How to test this PR locally
With the below code it's easy to increase the memory usage of Auditbeat dramatically. With this patch, it will stabilize.
Flow flooder script
Related issues
Use cases
Screenshots
Logs