Create prototype shipper-beat #35318
Conversation
Pinging @elastic/elastic-agent (Team:Elastic-Agent)
This pull request does not have a backport label. To fixup this pull request, you need to add the backport labels for the needed branches.
(To repeat some of our offline conversation:) My biggest comment is that I'd prefer to track acknowledgments with the existing client callback mechanisms rather than expanding the queue API, especially since this approach requires expanding the `Pipeline` interface.
The client callbacks are already used throughout Filebeat to track acknowledgments and metrics, so any errors they cause would also be high-priority errors in Filebeat itself. The biggest problem with them right now is that they're undocumented, so it's not obvious what invariants they satisfy or how to deploy them correctly, but I'd rather we work on improving their documentation and simplifying their use than add yet another alternate signal path inside the pipeline.
Here's how I think a callback-based implementation would go:
- Each `shipperInput` (corresponding to one component) should maintain its own atomic `persistedIndex` counter, initialized to zero. (This means different components will have incompatible/overlapping event IDs, which I think is fine, though if we really wanted a shared counter for some reason, we could give them one without breaking the rest of this summary.)
- In `shipperInput.Run`, when calling `pipeline.ConnectWith`, include the configuration `EventListener: acker.TrackingCounter(ackCallback)`
  - The callback here has type `func(acked int, total int)`. See `libbeat/common/acker/acker.go` for details on how it works. `TrackingCounter` exists to solve the issue that successful ACKs are always batched and delivered strictly in publication order, whereas failures during publication are delivered immediately (before earlier events have been acknowledged) -- but for our purposes, "successfully acked" and "filtered by processors without sending to the queue" both have the same effect on `persistedIndex`, and we want to report both of them in strictly increasing order. `TrackingCounter` handles that bookkeeping so we don't have to worry about error callbacks happening out of order.
- In the callback for `TrackingCounter`, update the input's `persistedIndex` by adding `total` to it. Despite the name, "total" here means an incremental total, i.e. the total number of events that are being acknowledged by this callback, not the total that have been sent overall. The `acked` parameter includes only those events that were successfully sent upstream rather than dropped by processors etc, but since we don't make that distinction in `persistedIndex` you can probably ignore it.
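A minimal sketch of this approach, using the `acker.TrackingCounter` and `beat.ClientConfig` APIs named above (the `shipperInput` field layout here is hypothetical, showing only the parts relevant to ACK tracking):

```go
package shipper

import (
	"sync/atomic"

	"github.com/elastic/beats/v7/libbeat/beat"
	"github.com/elastic/beats/v7/libbeat/common/acker"
)

// shipperInput is a stand-in for the real input type.
type shipperInput struct {
	persistedIndex atomic.Uint64 // strictly increasing ACK watermark
}

func (s *shipperInput) connect(pipeline beat.Pipeline) (beat.Client, error) {
	return pipeline.ConnectWith(beat.ClientConfig{
		// TrackingCounter guarantees the callback fires in strictly
		// increasing publication order, even when publication errors
		// are reported out of order.
		EventListener: acker.TrackingCounter(func(acked, total int) {
			// "total" is incremental: the number of events resolved by
			// this call, whether acked upstream or dropped by a
			// processor. Both advance persistedIndex, so "acked" can
			// be ignored here.
			s.persistedIndex.Add(uint64(total))
		}),
	})
}
```

The `PersistedIndex` response can then be served from `s.persistedIndex.Load()` rather than from a pipeline helper.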
You can otherwise leave the `shipperInput` logic unchanged, and report `persistedIndex` from your atomic counter rather than from a pipeline helper.
This will probably change the PR a lot, apologies for that, but I think it will also make it a lot smaller and more self-contained.
Thank you, this is a big step forward. The only major concern is the default processors change, and whether it would be a breaking change.
x-pack/filebeat/cmd/root.go (outdated)
{"add_cloud_metadata": nil}, | ||
{"add_docker_metadata": nil}, | ||
{"add_kubernetes_metadata": nil}, | ||
// {"add_cloud_metadata": nil}, |
I think it would be a breaking change to remove these processors from the default Processors. Is that right?
Yes, it definitely would be. We also don't want to duplicate them in the shipper Filebeat. The solution we should pursue for now is to configure the shipper instance of Filebeat without any default processors, and leave them in the inputs. This will leave the processor configuration unchanged from what it is today.
In the near future we want all the default processors to migrate to the shipper (see elastic/elastic-agent-shipper#292). There is some complexity to this however:
- If someone disables the shipper the default processors need to go back in the inputs
- The default processor configurations vary somewhat by Beat
- The shipper's default processors are true global processors and will apply to events they did not apply to before, like endpoint security.
Those are my three reasons for not wanting these enabled in the shipper Filebeat as part of this change.
Ah, sorry, I did that while I was debugging to make the output a little less cluttered, forgot to remove it, hahaha
I don't think it's enough to just put it back, I think you need to conditionally remove it when Filebeat is run as the shipper. Otherwise the default Filebeat processors will become the default output processors, which might have unexpected side effects. The default processors for each Beat are not all the same today.
Also agreed; we should probably just have the shipper disable any global processors until we figure out how to move them into the shipper properly.
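A sketch of the conditional wiring being suggested here, assuming the `ShipperMode()` helper that shows up later in this PR and the `mapstr.M` processor-list shape from `x-pack/filebeat/cmd/root.go` (treat the exact list and return type as assumptions):

```go
import "github.com/elastic/elastic-agent-libs/mapstr"

// defaultProcessors returns filebeat's global processors, or none at
// all when running as the shipper, leaving processor configuration
// attached to the inputs as it is today.
func defaultProcessors() []mapstr.M {
	if ShipperMode() {
		return nil
	}
	return []mapstr.M{
		{"add_cloud_metadata": nil},
		{"add_docker_metadata": nil},
		{"add_kubernetes_metadata": nil},
	}
}
```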
// inject index from stream config into metadata
// not sure how stock beats do this, and also not sure if it's actually needed?
metadata := helpers.AsMap(event.Metadata)
metadata["index"] = stream.index
We shouldn't need this. I think the `data_stream` is already in the Fields, but if not we should just add the `data_stream`, not `index`.
Yah, that's what I thought too, but I wasn't 100% sure. Although I'm not sure how we should attempt to add `data_stream` values if they don't already exist?
The Event struct has Datastream. https://github.com/elastic/elastic-agent-shipper-client/blob/da2edfba0431acad015a03e8f184b970ba3457b6/pkg/proto/messages/publish.pb.go#LL96C1-L111C2
Clients have to fill this in.
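For reference, client-side population would look roughly like this; the message and field names follow the linked `publish.pb.go`, but verify them against the generated code, and the values are placeholders:

```go
package main

import "github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages"

// newEvent sketches a client filling in DataStream before publishing.
func newEvent() *messages.Event {
	return &messages.Event{
		DataStream: &messages.DataStream{
			Type:      "logs",    // placeholder values: a real client
			Dataset:   "generic", // derives these from its stream
			Namespace: "default", // configuration
		},
	}
}
```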
Yah, I assume that would be client-side. Your first comment made me think that the shipper should somehow fill in the missing `Datastream` object if it can't find one.
Just realized I forgot the other half of the …
Thanks for the feedback @faec, that comment cleared up most of my confusion about how the acker worked. I also copy-pasted some of your comments into the code to hopefully make things a little more clear in the future.
Looks good! Just some minor comments
// ShipperMode checks to see if we're running filebeat as a shipper.
// This is primarily used to disable global processors.
// Handling global processors in the shipper is an ongoing issue, see https://github.com/elastic/elastic-agent-shipper/issues/292
func ShipperMode() bool {
Does this implementation mean that the shipper has to be enabled on the command-line rather than in the beat configuration? (This is ok if so, I'm just trying to understand what constraints we're working with.)
So, technically it has to be both, since the global processor setup happens so early on that we have to catch it before we get any config from fleet.
// strip the config we get from agent
config.Conn.Server = strings.TrimPrefix(config.Conn.Server, "unix://")
// following lines are helpful for debugging config,
// will be useful as we figure out how to integrate with agent
thank you 😭
Alright, had to change the shipper mode value from a command line flag to an environment variable, since the global processor injection happens before the CLI flags are initialized.
Can we just change the initialization order? With a command line flag we can use the agent spec file to set that argument when launching the shipper; with an environment variable we need to modify the agent to do this either based on new spec file syntax or just hard coding it (which we want to avoid).
@cmacknz So, the agent spec file already has an …
Yup, the specfile supports …
Ah great, I only looked at the existing spec files, not the underlying data type.
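For illustration only, the check might look like this; the actual environment variable name isn't shown in this thread, so `SHIPPER_MODE` is a placeholder:

```go
import "os"

// ShipperMode reports whether filebeat is running as the shipper.
// The variable name here is hypothetical; an env var is used because
// the global processor setup runs before CLI flags are parsed.
func ShipperMode() bool {
	return os.Getenv("SHIPPER_MODE") == "true"
}
```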
LGTM, would wait for @faec to review before merge.
* first pass at shipper beat
* finish shutdown, cleanup code
* cleanup management code
* try to make linter happy
* fix tests
* use built-in acker, clean up server code
* fix merge
* cleanup
* add check for shipper mode in processors
* add headers
* fix duration in windows
* fix linter
* clean up server
* cleanup, use env var for shipper mode
* remove debug statements
What does this PR do?
Closes #35135 (see that issue for context).
This creates a shipper filebeat input. When run, it starts a single filebeat input and a gRPC server per agent input, configures any processors, then sends the output upstream as any other filebeat input normally would.
The changes needed for this include:
- a `shipper` filebeat input
- `Publish()` returns a `queue.EntryID` value
- a `PersistedIndex` value reported by the shipper's gRPC server
- a `shipper.enabled` flag used to disable global processors when filebeat is in shipper mode

What currently works
Checklist
- I have added an entry in `CHANGELOG.next.asciidoc` or `CHANGELOG-developer.next.asciidoc`.

Author's Checklist
How to test this PR locally
- Place the filebeat binary in the agent's `components` directory.
- Update `filebeat.spec.yml`, under the `inputs` section: …
- Update the `elastic-agent.yml` config to start a shipper input. Example: … Note that this is just a generic metricbeat input config, but with the `type` changed to `shipper`, and the `server` component added.
- Modify the `filebeat.spec.yml` file to add `-E shipper.enabled=true`.