Skip to content

Commit

Permalink
Use server-side filtering when retrieving Cloud Foundry logs (#33456)
Browse files Browse the repository at this point in the history
It reduces CPU usage of Filebeat significatively.

Co-authored-by: MichaelKatsoulis <michaelkatsoulis88@gmail.com>
(cherry picked from commit 5cd8b09)
  • Loading branch information
jsoriano authored and mergify[bot] committed Nov 10, 2022
1 parent cd8534c commit 421874f
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 20 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- httpjson input: Add request tracing logger. {issue}32402[32402] {pull}32412[32412]
- Add cloudflare R2 to provider list in AWS S3 input. {pull}32620[32620]
- Add support for single string containing multiple relation-types in getRFC5988Link. {pull}32811[32811]
- Fix handling of invalid UserIP and LocalIP values. {pull}32896[32896]
- Allow http_endpoint instances to share ports. {issue}32578[32578] {pull}33377[33377]
- Improve httpjson documentation for split processor. {pull}33473[33473]
- Added separation of transform context object inside httpjson. Introduced new clause `.parent_last_response.*` {pull}33499[33499]
- Cloud Foundry input uses server-side filtering when retrieving logs. {pull}33456[33456]

*Auditbeat*

Expand Down
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/cloudfoundry/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
package cloudfoundry

import (
"github.com/pkg/errors"
"fmt"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
Expand Down Expand Up @@ -48,7 +48,7 @@ func (i *inputV1) Run(ctx v2.Context, publisher stateless.Publisher) error {

consumer, err := hub.DopplerConsumer(callbacks)
if err != nil {
return errors.Wrapf(err, "initializing doppler consumer")
return fmt.Errorf("initializing doppler consumer: %w", err)
}

stopCtx, cancel := ctxtool.WithFunc(ctx.Cancelation, func() {
Expand Down
24 changes: 6 additions & 18 deletions x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,27 +80,15 @@ func (c *DopplerConsumer) Run() {
}

func (c *DopplerConsumer) logsFirehose() {
c.firehose(c.callbacks.Log, consumer.LogMessages)
c.firehose(c.callbacks.Log, filterLogs, consumer.LogMessages)
}

func (c *DopplerConsumer) metricsFirehose() {
c.firehose(c.callbacks.Metric, consumer.Metrics)
}

func (c *DopplerConsumer) firehose(cb func(evt Event), filter consumer.EnvelopeFilter) {
var msgChan <-chan *events.Envelope
var errChan <-chan error
filterFn := filterNoFilter
if filter == consumer.LogMessages {
// We are interested in more envelopes than the ones obtained when filtering
// by log messages, retrieve them all and filter later.
// If this causes performance or other problems, we will have to investigate
// if it is possible to pass different filters to the firehose url.
filterFn = filterLogs
msgChan, errChan = c.consumer.Firehose(c.subscriptionID, "")
} else {
msgChan, errChan = c.consumer.FilteredFirehose(c.subscriptionID, "", filter)
}
c.firehose(c.callbacks.Metric, filterNoFilter, consumer.Metrics)
}

func (c *DopplerConsumer) firehose(cb func(evt Event), filterFn func(*events.Envelope) bool, filter consumer.EnvelopeFilter) {
msgChan, errChan := c.consumer.FilteredFirehose(c.subscriptionID, "", filter)
for {
select {
case env := <-msgChan:
Expand Down

0 comments on commit 421874f

Please sign in to comment.