Create prototype shipper-beat #35318

Merged: 21 commits into elastic:main on May 11, 2023

@fearful-symmetry fearful-symmetry commented May 3, 2023

What does this PR do?

Closes #35135 (see that issue for context)

This creates a shipper filebeat input. When run, it starts a single filebeat input and gRPC server per-agent input, configures any processors, then sends the output upstream as any other filebeat input normally would.

The changes needed for this include:

  • A new shipper filebeat input
  • Changes to the V2 central management code so we don't start a shipper input per-datastream
  • Changes to the fileset V2 Client API, so Publish() returns a queue.EntryID value.
  • A custom acker to track the PersistedIndex value
  • Adds a shipper.enabled flag used to disable global processors when filebeat is in shipper mode

What currently works

  • The shipper will start up and configure itself, and configure any per-datastream processors based on an input config
  • It will send and receive events in a way that appears correct

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding change to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.

Author's Checklist

  • Add tests - not sure if we want to use existing filebeat testing infrastructure, or try and use some of the new integration test suite tools.

How to test this PR locally

  • So far, this has only been tested with agent in standalone mode.
  1. Pull down, build this PR
  2. Build or download the latest 8.8.0-SNAPSHOT build of elastic-agent
  3. unpack elastic-agent, place a build of filebeat from this branch into elastic-agent's components directory
  4. Add the following blurb to filebeat.spec.yml, under the inputs section:
  - name: shipper
    description: "Shipper"
    platforms: *platforms
    outputs: *outputs
    shippers: *shippers
    command: *command
  5. Modify the elastic-agent.yml config to start a shipper input. Example:

inputs:
  - type: shipper
    # Each input must have a unique ID.
    id: shipper-id-one
    data_stream:
      namespace: default
    use_output: default
    server: /tmp/test-shipper-beat.sock
    streams:
      - id: system/metrics-test
        metricsets:
        - process_summary
        data_stream:
          dataset: system.process_summary
          type: metrics
        period: 1m
        processors:
        - drop_event.when.regexp:
            system.filesystem.mount_point: ^/(sys|cgroup|proc|dev|etc|host|lib|snap)($|/)

Note that this is just a generic metricbeat input config, but with the type changed to shipper, and the server component added.

  6. Start elastic-agent, make sure the filebeat-shipper starts successfully
  7. Start another beat that's configured to use the filebeat-shipper as the output. This can be done by setting the output config as such:
output.shipper:
  server: unix:///tmp/test-shipper-beat.sock
  8. To check processor settings, modify the filebeat.spec.yml file to add -E shipper.enabled=true

@fearful-symmetry fearful-symmetry added the Team:Elastic-Agent Label for the Agent team label May 3, 2023
@fearful-symmetry fearful-symmetry self-assigned this May 3, 2023
@fearful-symmetry fearful-symmetry requested review from a team as code owners May 3, 2023 19:01
@fearful-symmetry fearful-symmetry requested review from faec and leehinman and removed request for a team May 3, 2023 19:01
@elasticmachine
Collaborator

Pinging @elastic/elastic-agent (Team:Elastic-Agent)

@botelastic botelastic bot added needs_team Indicates that the issue/PR needs a Team:* label and removed needs_team Indicates that the issue/PR needs a Team:* label labels May 3, 2023
@mergify
Contributor

mergify bot commented May 3, 2023

This pull request does not have a backport label.
If this is a bug or security fix, could you label this PR @fearful-symmetry? 🙏.
For such, you'll need to label your PR with:

  • The upcoming major version of the Elastic Stack
  • The upcoming minor version of the Elastic Stack (if you're not pushing a breaking change)

To fixup this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-v8./d.0 is the label to automatically backport to the 8./d branch. /d is the digit

@elasticmachine
Collaborator

elasticmachine commented May 3, 2023

💚 Build Succeeded

Build stats

  • Start Time: 2023-05-10T22:26:48.470+0000

  • Duration: 84 min 49 sec

Test stats 🧪

Test Results
Failed 0
Passed 26339
Skipped 1975
Total 28314

💚 Flaky test report

Tests succeeded.

🤖 GitHub comments

To re-run your PR in the CI, just comment with:

  • /test : Re-trigger the build.

  • /package : Generate the packages and run the E2E tests.

  • /beats-tester : Run the installation tests with beats-tester.

  • run elasticsearch-ci/docs : Re-trigger the docs validation. (use unformatted text in the comment!)

Contributor

@faec faec left a comment

(To repeat some of our offline conversation:) My biggest comment is that I'd prefer to track acknowledgments with the existing client callback mechanisms rather than expanding the queue API, especially since this approach requires expanding the Pipeline interface.

The client callbacks are already used throughout Filebeat to track acknowledgments and metrics, so any errors they cause would also be high-priority errors in Filebeat itself. The biggest problem with them right now is that they're undocumented, so it's not obvious what invariants they satisfy or how to deploy them correctly, but I'd rather we work on improving their documentation and simplifying their use than expand yet-another alternate signal path inside the pipeline.

Here's how I think a callback-based implementation would go:

  • Each shipperInput (corresponding to one component) should maintain its own atomic persistedIndex counter, initialized to zero. (This means different components will have incompatible/overlapping event IDs, which I think is fine, though if we really wanted a shared counter for some reason, we could give them one without breaking the rest of this summary.)
  • In shipperInput.Run, when calling pipeline.ConnectWith, include the configuration EventListener: acker.TrackingCounter(ackCallback)
    • The callback here has type func(acked int, total int). See libbeat/common/acker/acker.go for details on how it works. TrackingCounter exists to solve the issue that successful ACKs are always batched and delivered strictly in publication order, whereas failures during publication are delivered immediately (before earlier events have been acknowledged). For our purposes, however, "successfully acked" and "filtered by processors without sending to the queue" both have the same effect on persistedIndex, and we want to report both of them in strictly increasing order. TrackingCounter handles that bookkeeping so we don't have to worry about error callbacks happening out of order.
  • In the callback for TrackingCounter, update the input's persistedIndex by adding total to it. Despite the name, "total" here means an incremental total, i.e. the total number of events that are being acknowledged by this callback, not the total that have been sent overall. The acked parameter includes only those events that were successfully sent upstream rather than dropped by processors etc, but since we don't make that distinction in persistedIndex you can probably ignore it.

You can otherwise leave the shipperInput logic unchanged, and report persistedIndex from your atomic counter rather than from a pipeline helper.
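
The callback bookkeeping described above could look roughly like the following. This is a minimal, stdlib-only sketch and does not import libbeat: `shipperInput` here is a hypothetical stand-in for the real input type, and `ackCallback` only mirrors the `func(acked int, total int)` shape mentioned in the review. In the real input it would presumably be wired in through `EventListener: acker.TrackingCounter(...)` as suggested above.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// shipperInput is a hypothetical stand-in for the real input type.
// Each instance owns its own persistedIndex, starting at zero.
type shipperInput struct {
	persistedIndex atomic.Uint64
}

// ackCallback has the callback shape described in the review:
// acked counts events sent upstream, total counts every event settled
// by this call (including events dropped by processors).
func (in *shipperInput) ackCallback(acked int, total int) {
	_ = acked // unused: dropped and acked events advance the index alike
	if total > 0 {
		in.persistedIndex.Add(uint64(total))
	}
}

// PersistedIndex reports the current counter value to callers.
func (in *shipperInput) PersistedIndex() uint64 {
	return in.persistedIndex.Load()
}

func main() {
	in := &shipperInput{}
	in.ackCallback(3, 5) // 3 acked upstream, 2 filtered by processors
	in.ackCallback(2, 2)
	fmt.Println(in.PersistedIndex()) // prints 7
}
```

Because the counter only ever grows by `total`, the reported index is monotonic regardless of how many events in a batch were dropped rather than sent.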

This will probably change the PR a lot, apologies for that, but I think it will also make it a lot smaller and more self-contained.

Contributor

@leehinman leehinman left a comment

Thank you, this is a big step forward. My only major concern is the default processors change, and whether it would be a breaking change.

libbeat/outputs/elasticsearch/client.go (outdated, resolved)
{"add_cloud_metadata": nil},
{"add_docker_metadata": nil},
{"add_kubernetes_metadata": nil},
// {"add_cloud_metadata": nil},
Contributor

I think it would be a breaking change to remove these processors from the default Processors. Is that right?

Member

Yes it definitely would be. We also don't want to duplicate them in the shipper Filebeat. The solution we should pursue for now should be to configure the shipper instance of Filebeat without any default processors, and leave them in the inputs. This will leave the processor configuration unchanged from what it is today.

In the near future we want all the default processors to migrate to the shipper (see elastic/elastic-agent-shipper#292). There is some complexity to this however:

  1. If someone disables the shipper, the default processors need to go back in the inputs.
  2. The default processor configurations vary somewhat by Beat.
  3. The shipper's default processors are true global processors and will apply to events they did not apply to before, like endpoint security.

Those are my three reasons for not wanting these enabled in the shipper Filebeat as part of this change.

Contributor Author

Ah, sorry, I did that while I was debugging to make the output a little less cluttered, forgot to remove it, hahaha

Member

I don't think it's enough to just put it back, I think you need to conditionally remove it when Filebeat is run as the shipper. Otherwise the default Filebeat processors will become the default output processors, which might have unexpected side effects. The default processors for each Beat are not all the same today.

Contributor Author

Also agreed; we should probably just have the shipper disable any global processors until we figure out how to move them into the shipper properly.
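
As a rough illustration of the approach agreed on in this thread, the default processor list could be made conditional on shipper mode. This is only a sketch: `processorConfig` and `defaultProcessors` are hypothetical names, and the three processors are the ones visible in the diff context above, not necessarily the full default set.

```go
package main

import "fmt"

// processorConfig is a simplified, hypothetical stand-in for a
// processor definition (processor name mapped to its settings).
type processorConfig map[string]any

// defaultProcessors returns the global default processors, or none at all
// when the Beat runs as a shipper, so input-side defaults are not applied
// a second time on the output side.
func defaultProcessors(shipperMode bool) []processorConfig {
	if shipperMode {
		return nil
	}
	return []processorConfig{
		{"add_cloud_metadata": nil},
		{"add_docker_metadata": nil},
		{"add_kubernetes_metadata": nil},
	}
}

func main() {
	fmt.Println(len(defaultProcessors(false)), len(defaultProcessors(true))) // prints 3 0
}
```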

x-pack/filebeat/input/shipper/input.go (outdated, resolved)
x-pack/filebeat/input/shipper/input.go (outdated, resolved)
x-pack/filebeat/input/shipper/input.go (outdated, resolved)
x-pack/filebeat/input/shipper/input.go (outdated, resolved)
Comment on lines 266 to 269
// inject index from stream config into metadata
// not sure how stock beats do this, and also not sure if it's actually needed?
metadata := helpers.AsMap(event.Metadata)
metadata["index"] = stream.index
Contributor

we shouldn't need this. I think the data_stream is already in the Fields but if not we should just add the data_stream, not index.

Contributor Author

Yah, that's what I thought too, but I wasn't 100% sure. Although not sure how we should attempt to add data_stream values if they don't already exist?

Contributor

Contributor Author

Yah, I assume that would be client-side. Your first comment made me think that the shipper should somehow fill in the missing Datastream object if it can't find one.

@faec
Contributor

faec commented May 5, 2023

Just realized I forgot the other half of the persistedIndex interface, which is reporting to callers the index of the events you accepted from them. Maybe you figured this out on your own by now, but: similarly create a counter starting from zero, and increment it by one every time you call client.Publish -- that's the id of the event that you just sent. The alleged contract of TrackingCounter is that if you do that, the total values reported by the TrackingCounter callback will always eventually exactly match the number of calls to client.Publish.
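
The publish-side counter and its relationship to the ack callback might be sketched as follows. This is an assumption-laden illustration using only the standard library: `indexTracker`, `nextID`, `onACK`, and `caughtUp` are hypothetical names, and the `nextID` call stands in for the point where the real code would call `client.Publish`.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// indexTracker is a hypothetical helper pairing the two counters.
type indexTracker struct {
	published atomic.Uint64 // incremented once per publish call
	persisted atomic.Uint64 // advanced by the ack callback
}

// nextID is called when one event is handed to the pipeline and returns
// the ID assigned to that event (the first event gets ID 1).
func (t *indexTracker) nextID() uint64 {
	return t.published.Add(1)
}

// onACK has the callback shape func(acked int, total int); only total matters.
func (t *indexTracker) onACK(_ int, total int) {
	if total > 0 {
		t.persisted.Add(uint64(total))
	}
}

// caughtUp reports whether every published event has been settled.
func (t *indexTracker) caughtUp() bool {
	return t.persisted.Load() == t.published.Load()
}

func main() {
	tr := &indexTracker{}
	for i := 0; i < 4; i++ {
		id := tr.nextID() // stand-in for client.Publish(event)
		fmt.Println("accepted event", id)
	}
	tr.onACK(1, 1) // first event settles
	fmt.Println("caught up:", tr.caughtUp())
	tr.onACK(2, 3) // remaining three settle (one was dropped by a processor)
	fmt.Println("caught up:", tr.caughtUp())
}
```

If the contract described above holds, `caughtUp` eventually returns true once the pipeline has settled everything that was published.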

@fearful-symmetry fearful-symmetry requested review from leehinman and faec May 8, 2023 17:01
@fearful-symmetry
Contributor Author

Thanks for the feedback @faec, that comment cleared up most of my confusion about how the acker worked. I also copy-pasted some of your comments into the code to hopefully make things a little clearer in the future.

faec previously approved these changes May 8, 2023
Contributor

@faec faec left a comment

Looks good! Just some minor comments

libbeat/common/fleetmode/fleet_mode.go (outdated, resolved)
x-pack/libbeat/management/generate.go (outdated, resolved)
// ShipperMode checks to see if we're running filebeat as a shipper.
// This is primarily used to disable global processors.
// Handling global processors in the shipper is an ongoing issue, see https://github.com/elastic/elastic-agent-shipper/issues/292
func ShipperMode() bool {
Contributor

Does this implementation mean that the shipper has to be enabled on the command-line rather than in the beat configuration? (This is ok if so, I'm just trying to understand what constraints we're working with.)

Contributor Author

So, technically it has to be both, since the global processor setup happens so early on that we have to catch it before we get any config from fleet.

x-pack/filebeat/input/shipper/acker.go (outdated, resolved)
// strip the config we get from agent
config.Conn.Server = strings.TrimPrefix(config.Conn.Server, "unix://")
// following lines are helpful for debugging config,
// will be useful as we figure out how to integrate with agent
Contributor

thank you 😭
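
For context, the prefix stripping in the snippet above can be shown in isolation. This is a small sketch, assuming the agent hands over a `unix://` style address while the listener wants a plain filesystem path; `socketPath` is a hypothetical helper name, and the example address is the one from the test instructions in the PR description.

```go
package main

import (
	"fmt"
	"strings"
)

// socketPath strips a leading "unix://" scheme, leaving a filesystem path.
// Addresses without the prefix are returned unchanged.
func socketPath(server string) string {
	return strings.TrimPrefix(server, "unix://")
}

func main() {
	fmt.Println(socketPath("unix:///tmp/test-shipper-beat.sock"))
	fmt.Println(socketPath("/tmp/test-shipper-beat.sock"))
}
```

Both calls yield the same path, which is why the input config and the `output.shipper` config in the test instructions can use different forms of the address.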

x-pack/filebeat/input/shipper/input.go (resolved)
@fearful-symmetry fearful-symmetry requested a review from faec May 9, 2023 15:34
@fearful-symmetry
Contributor Author

Alright, had to change the shipper mode value from a command line flag to an environment variable, since the global processor injection happens before the CLI flags are initialized.

@cmacknz
Member

cmacknz commented May 9, 2023

> Alright, had to change the shipper mode value from a command line flag to an environment variable, since the global processor injection happens before the CLI flags are initialized.

Can we just change the initialization order? With a command line flag we can use the agent spec file to set that argument when launching the shipper, with an environment variable we need to modify the agent to do this either based on new spec file syntax or just hard coding it (which we want to avoid).

@fearful-symmetry
Contributor Author

> with an environment variable we need to modify the agent to do this either based on new spec file syntax or just hard coding it (which we want to avoid).

@cmacknz So, the agent spec file already has an env field if I remember, I've used it before for gRPC debugging, since those flags are usually set via env vars.

@fearful-symmetry
Contributor Author

Yup, the specfile supports Args and Env: https://github.com/elastic/elastic-agent/blob/a4234ac47f98260977309034ac6a7812963c1ab0/pkg/component/spec.go#L77

@cmacknz
Member

cmacknz commented May 9, 2023

Ah great, I only looked at the existing spec files not the underlying data type.

@pierrehilbert pierrehilbert requested review from belimawr and rdner May 10, 2023 16:05
Contributor

@leehinman leehinman left a comment

LGTM, but I would wait for @faec to review before merging.

@fearful-symmetry fearful-symmetry merged commit 327b5fe into elastic:main May 11, 2023
chrisberkhout pushed a commit that referenced this pull request Jun 1, 2023
* first pass at shipper beat

* finish shutdown, cleanup code

* cleanup management code

* try to make linter happy

* fix tests

* use built-in acker, clean up server code

* fix merge

* cleanup

* add check for shipper mode in processors

* add headers

* fix duration in windows

* fix linter

* clean up server

* cleanup, use env var for shipper mode

* remove debug statements
Linked issue: [Elastic Agent] Prototype a shipper gRPC input for Filebeat