
system/socket: propagate backpressure from output #32192

Merged

merged 20 commits into elastic:main from ab_socket_backpressure on Jul 25, 2022

Conversation

adriansr
Contributor

@adriansr adriansr commented Jul 4, 2022

What does this PR do?

This PR modifies the socket dataset so that it is affected by output backpressure.

Originally, it operated as two separate goroutines. One would read kernel tracing events as fast as possible and maintain a view of the open sockets/flows on the system. Once a flow was deemed terminated, it was appended to a linked list (done) to be published to the output later.

A separate goroutine consumes that linked list and publishes the flows.

Under heavy network load, the dataset can generate flows faster than the output is able to accept them. Because the linked list is unbounded, it would accumulate more and more flows, causing Auditbeat's memory usage to grow over time.

This PR simplifies flow publication in such a way that if the output blocks, the main reader goroutine will block too, preventing the consumption of new kernel events and the generation of additional flows until the output is able to accept more documents.

This PR also includes a necessary refactor: moving the linked list implementation to a helper package, which adds some noise to the diff.

Why is it important?

To prevent high memory usage of Auditbeat under heavy network I/O.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • [ ] I have made corresponding changes to the documentation
  • [ ] I have made corresponding changes to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.


How to test this PR locally

With the code below it's easy to increase Auditbeat's memory usage dramatically. With this patch applied, memory usage stabilizes.

Flow flooder script
package main

import (
	"fmt"
	"math/rand"
	"net"
	"os"
)

const (
	NSOCKS     = 8000
	ADDR       = "127.0.0.1"
	ADDRLISTEN = ADDR + ":0"
)

func main() {
	addr, err := net.ResolveUDPAddr("udp4", ADDRLISTEN)
	if err != nil {
		panic(fmt.Errorf("ResolveUDPAddr(%s): %w", ADDRLISTEN, err))
	}
	var sock [NSOCKS]*net.UDPConn
	for idx := range sock {
		c, err := net.ListenUDP("udp4", addr)
		if err != nil {
			panic(fmt.Errorf("ListenUDP(%s): %w", addr, err))
		}
		sock[idx] = c
		laddr := c.LocalAddr().(*net.UDPAddr)
		fmt.Fprintf(os.Stderr, "sock[%d] is at %d\n", idx, laddr.Port)
	}
	data := []byte("Some random data")
	for {
		for _, src := range sock {
			dst := sock[rand.Intn(NSOCKS)]
			if _, err = src.WriteTo(data, dst.LocalAddr()); err != nil {
				panic(fmt.Errorf("WriteTo(%v -> %v): %w", src.LocalAddr(), dst.LocalAddr(), err))
			}
		}
	}
}


@botelastic botelastic bot added the needs_team Indicates that the issue/PR needs a Team:* label label Jul 4, 2022
@botelastic botelastic bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Jul 4, 2022
@adriansr adriansr added enhancement in progress Pull request is currently in progress. needs_team Indicates that the issue/PR needs a Team:* label labels Jul 4, 2022
@botelastic botelastic bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Jul 4, 2022
@botelastic

botelastic bot commented Jul 4, 2022

This pull request doesn't have a Team:<team> label.

@elasticmachine
Collaborator

elasticmachine commented Jul 4, 2022

💚 Build Succeeded


Build stats

  • Start Time: 2022-07-22T14:58:51.626+0000

  • Duration: 50 min 44 sec

Test stats 🧪

Test Results

  • Failed: 0
  • Passed: 300
  • Skipped: 49
  • Total: 349

💚 Flaky test report

Tests succeeded.

🤖 GitHub comments

To re-run your PR in the CI, just comment with:

  • /test : Re-trigger the build.

  • /package : Generate the packages and run the E2E tests.

  • /beats-tester : Run the installation tests with beats-tester.

  • run elasticsearch-ci/docs : Re-trigger the docs validation. (use unformatted text in the comment!)

@adriansr adriansr requested a review from efd6 July 5, 2022 20:02
Contributor

@efd6 efd6 left a comment

This is my initial look, I will take another look later.

I'd just like to confirm/fix my understanding of the approach here. ISTM that this decouples publishing events from holding the list of captured events once the timeouts have happened, so that holding the list is not bound by the publication rate. Is this correct?

I think it would be helpful to have a description of the approach in the PR description with a view to including it in the commit message when the squash happens.

func (s *state) ExpireFlows() {
start := s.clock()
toReport := s.expireFlows()
if sent := s.reportFlows(&toReport); sent > 0 {
Contributor

sent cannot be negative, so sent != 0?

Contributor Author

👍

x-pack/auditbeat/module/system/socket/state.go (outdated, resolved)
return toReport
}

func (l *linkedList) append(b linkedList) {
Contributor

Just for clarity, the caller to l.append never uses b again, true? Otherwise this is not safe since their b will continue to be a tail of its previous head. This is always what is happening here since b is always a newly constructed value, but it is probably worth documenting that this is the case.

Contributor Author

Makes sense. I've updated the function to take a pointer for the second argument and zero it before returning.

Comment on lines +1038 to +968
var errs multierror.Errors
rootPut := func(key string, value interface{}) {
if _, err := root.Put(key, value); err != nil {
errs = append(errs, err)
}
}
Contributor

😄

}
}

func (s *state) expireFlows() (toReport linkedList) {
s.Lock()
defer s.Unlock()
deadline := s.clock().Add(-s.inactiveTimeout)
Contributor

This is a minor point, but I'm wondering if s.clock() should be pulled out into now and have that used in the places here.

Contributor Author

done

Comment on lines 652 to 641
var toReport linkedList
// Send flows to the output as a deferred function to avoid
// holding on s mutex when there's backpressure from the output.
defer s.reportFlows(&toReport)
Contributor

👍

}
}

func (s *state) expireFlows() (toReport linkedList) {
Contributor

I'm wondering at this point, when it is now a matter of collecting items, if it wouldn't be simpler to have a []linkedElement for reportFlows to work on; the allocations would likely be reduced (particularly if the size hints were used to preallocate) and the code would be simpler. This would involve changes in onSockTerminated and onFlowTerminated as well.

Contributor Author

I'm not sure I follow. Removing elements from linkedLists and creating a new linked list from those existing elements is a zero-alloc operation. I'll add a test to ensure this is always the case.

Contributor

I think checking the escape analysis would be worthwhile. The head that is being obtained for adding to toReport is not the same memory that is in the source since we are not using the pointer, so it must be allocated. Since it is being used after the function returns, it cannot be on the stack, so I think the allocation needs to be on the heap. The tail of that head will all be already allocated and so they are certainly zero-alloc.

Contributor

You are right; I've looked at the escape analysis and it doesn't escape.

Contributor

I've gone over this more carefully because I was unable to find a fault in my logic (the -m report is not in source order, so I missed it previously and accepted the output I saw), and I can see that the b parameter to append does leak.

./state.go:901:29: leaking param content: b

return true
}

func (ts *testingState) Error(_ error) bool {
Contributor

s/_ //g

Contributor Author

👍

}

func (ts *testingState) feedEvents(evs []event) {
for idx, ev := range evs {
Contributor

Add t.Helper() above this?

Contributor Author

👍

@adriansr adriansr requested a review from efd6 July 6, 2022 09:59
@adriansr
Contributor Author

adriansr commented Jul 6, 2022

@efd6 please re-review. I've updated the description with an explanation.

@efd6
Contributor

efd6 commented Jul 6, 2022

OK, I have the logic now and it looks OK to me. Thanks for the clarification above. I think it would be helpful to also say that the back pressure is exerted through *state.Mutex. I'll take another look tomorrow.

Contributor

@efd6 efd6 left a comment

Assuming my interpretation of the back pressure being exerted via the *state mutex is correct, this LGTM.

Comment on lines 879 to 880
s.reportFlow(f)
count++
Contributor

If *state.reportFlow returns a conditional 1 for success and 0 for failure then the count would more accurately reflect what was reported (depending on whether you are interested in the attempts or the succeeds).

Contributor Author

makes sense, done.

var toReport linkedList
// Send flows to the output as a deferred function to avoid
// holding on s mutex when there's backpressure from the output.
defer s.reportFlows(&toReport)
Contributor

Is it worth logging the count here?

Contributor Author

I didn't want to log in the main thread as this will report a flow every time a socket is closed.

The logging in ExpireFlows is there to have a way to tell when backpressure is happening without causing too much impact.

@adriansr
Contributor Author

We have users testing this change. Waiting for feedback before moving out of the draft state.

@adriansr adriansr force-pushed the ab_socket_backpressure branch from 8845b36 to ef5c302 Compare July 22, 2022 10:16
@adriansr adriansr marked this pull request as ready for review July 22, 2022 10:18
@adriansr adriansr requested a review from a team as a code owner July 22, 2022 10:18
@elasticmachine
Collaborator

Pinging @elastic/security-external-integrations (Team:Security-External Integrations)

@adriansr adriansr added backport-v8.3.0 Automated backport with mergify backport-7.17 Automated backport to the 7.17 branch with mergify bug and removed enhancement labels Jul 22, 2022
@adriansr adriansr removed the in progress Pull request is currently in progress. label Jul 25, 2022
@adriansr adriansr merged commit e362ca7 into elastic:main Jul 25, 2022
mergify bot pushed a commit that referenced this pull request Jul 25, 2022
This PR modifies the socket dataset so that it is affected by output
backpressure.

Originally, it operated as two separate goroutines. One would read kernel
tracing events as fast as possible and maintain a view of the open
sockets/flows on the system. Once a flow is deemed terminated, it would
be stored on a linked list (done) for it to be published to the output.

A separate goroutine consumes that linked list and publishes the flows.

Under heavy network load, it can generate flows faster than the output is
able to accept. Due to the linked list being unbounded, it would store
more and more flows, causing Auditbeat's memory usage to grow over time.

This PR simplifies flow publication in such a way that if the output
blocks, the main reader goroutine will block too, preventing the
consumption of new kernel events and the generation of additional flows
until the output is able to accept more documents.

Also it contains a necessary refactor: Moving the linked list
implementation to a helper package. This adds a bit of noise.

(cherry picked from commit e362ca7)

# Conflicts:
#	x-pack/auditbeat/module/system/socket/state.go
#	x-pack/auditbeat/module/system/socket/state_test.go
mergify bot pushed a commit that referenced this pull request Jul 25, 2022
(cherry picked from commit e362ca7)
adriansr added a commit that referenced this pull request Jul 25, 2022
…put (#32480)

(cherry picked from commit e362ca7)

Co-authored-by: Adrian Serrano <adrisr83@gmail.com>
adriansr added a commit that referenced this pull request Jul 25, 2022
(cherry picked from commit e362ca7)
adriansr added a commit that referenced this pull request Jul 26, 2022
(cherry picked from commit e362ca7)

Co-authored-by: Adrian Serrano <adrisr83@gmail.com>
chrisberkhout pushed a commit that referenced this pull request Jun 1, 2023
Labels
backport-7.17 Automated backport to the 7.17 branch with mergify backport-v8.3.0 Automated backport with mergify bug
Successfully merging this pull request may close these issues.

Auditbeat: Propagate output backpressure in socket dataset
3 participants