Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[extension/healthcheckv2] Add event aggregation logic (#32695)
**Description:** This PR is the second in a series to decompose #30673 into more manageable pieces for review. **Aggregator** This PR introduces an aggregator data structure and event aggregation logic for status events. The extension implements the `StatusWatcher` optional interface, which the collector will call with a `component.StatusEvent` for each change in component status. These events will be aggregated by an aggregation function, and stored in the aggregator. The aggregator is a recursive data structure. At the top it contains the overall status of the collector. At the next level, it contains the statuses for each pipeline, and at the level below that, it contains the statuses for each component in a pipeline. Each node in the data structure is an aggregation the status events in the level below. The overall collector status is the aggregation of the pipeline statuses, and at the next level, the pipeline statuses are the aggregations of the component statuses. The data structure allows you to query the status of the collector overall, or for individual pipelines by name. There is also a pub/sub mechanism used for streaming aggregated statuses. **Aggregation Function** The purpose of the aggregator is to aggregate events so that the most relevant status event bubbles to the top. This allows us to get the status of the collector overall or a pipeline through a simple lookup. There is an aggregation function that determines the priority of events and how they should be aggregated. In many cases, the result will be an existing status event. In some cases a new event will be synthesized. In order to match the behavior existing healthcheck extension, lifecycle events (e.g. starting, stopping, etc) are prioritized over runtime events. Next, error statuses are prioritized with PermanentErrors as higher priority than RecoverableErrors, but this can vary based on user provided configuration. If PermanentErrors are ignored by configuration, but RecoverableErrors are included, then RecoverableErrors will take priority over PermanentErrors. The StatusWatcher interface receives immutable events of type `component.StatusEvent`. Since we sometimes need to synthesize new events during aggregation, an `Event` interface was introduced so that the aggregator can use `component.StatusEvent` instances or instances of events synthesized by the status package. It's worth mentioning that there is [existing status event aggregation logic](https://github.com/open-telemetry/opentelemetry-collector/blob/main/component/status.go#L101-L190) in collector core, but it did not meet the needs of this extension. It does not prioritize lifecycle events over error events, and it will always prioritize permanent errors over recoverable. By prioritizing lifecycle events over error events we can return a 503 when restarting a collector rather than a 500 when a collector in a final state, such as PermanentError. This is necessary to match the behavior of the existing extension. Since users have the option to include or ignore recoverable and permanent errors, we need the ability to prioritize them accordingly. We can discuss what the fate of the aggregation code in core should be. **Examples** Below are examples of overall collector and pipeline status that are based on the aggregator data structure. The rendering of the examples will come in a later PR. You can also look at the parent PR to see how all of this fits together. Note that the pipeline status example is a subtree of the overall collector status. Overall collector status: ```json { "start_time": "2024-01-18T17:27:12.570394-08:00", "healthy": true, "status": "StatusRecoverableError", "error": "rpc error: code = ResourceExhausted desc = resource exhausted", "status_time": "2024-01-18T17:27:32.572301-08:00", "components": { "extensions": { "healthy": true, "status": "StatusOK", "status_time": "2024-01-18T17:27:12.570428-08:00", "components": { "extension:healthcheckv2": { "healthy": true, "status": "StatusOK", "status_time": "2024-01-18T17:27:12.570428-08:00" } } }, "pipeline:metrics/grpc": { "healthy": true, "status": "StatusRecoverableError", "error": "rpc error: code = ResourceExhausted desc = resource exhausted", "status_time": "2024-01-18T17:27:32.572301-08:00", "components": { "exporter:otlp/staging": { "healthy": true, "status": "StatusRecoverableError", "error": "rpc error: code = ResourceExhausted desc = resource exhausted", "status_time": "2024-01-18T17:27:32.572301-08:00" }, "processor:batch": { "healthy": true, "status": "StatusOK", "status_time": "2024-01-18T17:27:12.571132-08:00" }, "receiver:otlp": { "healthy": true, "status": "StatusOK", "status_time": "2024-01-18T17:27:12.571576-08:00" } } }, "pipeline:traces/http": { "healthy": true, "status": "StatusOK", "status_time": "2024-01-18T17:27:12.571625-08:00", "components": { "exporter:otlphttp/staging": { "healthy": true, "status": "StatusOK", "status_time": "2024-01-18T17:27:12.571615-08:00" }, "processor:batch": { "healthy": true, "status": "StatusOK", "status_time": "2024-01-18T17:27:12.571621-08:00" }, "receiver:otlp": { "healthy": true, "status": "StatusOK", "status_time": "2024-01-18T17:27:12.571625-08:00" } } } } } ``` Status for pipeline `traces/http`: ```json { "start_time": "2024-01-18T17:27:12.570394-08:00", "healthy": true, "status": "StatusOK", "status_time": "2024-01-18T17:27:12.571625-08:00", "components": { "exporter:otlphttp/staging": { "healthy": true, "status": "StatusOK", "status_time": "2024-01-18T17:27:12.571615-08:00" }, "processor:batch": { "healthy": true, "status": "StatusOK", "status_time": "2024-01-18T17:27:12.571621-08:00" }, "receiver:otlp": { "healthy": true, "status": "StatusOK", "status_time": "2024-01-18T17:27:12.571625-08:00" } } } ``` **Link to tracking Issue:** #26661 **Testing:** Units / manual **Documentation:** Comments, etc --------- Co-authored-by: Evan Bradley <11745660+evan-bradley@users.noreply.github.com>
- Loading branch information