Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Component Status Reporting #8169

Merged
merged 40 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
1115e9d
Introduce component status reporting
tigrannajaryan Nov 15, 2022
897ebf8
Get things working after rebase
mwear Aug 1, 2023
a98e35b
Add changelog
mwear Aug 2, 2023
1e77eb5
Lint
mwear Aug 2, 2023
8cf3ece
Implement and use component.GlobalID in place of component.StatusSource
mwear Aug 2, 2023
35e703f
Replace GlobalID with InstanceID
mwear Aug 15, 2023
124d20e
Status implemented as a finite state machine
mwear Aug 22, 2023
dd19267
Add ReportFatalError behavior to ReportComponentStatus
mwear Aug 24, 2023
0d954fd
Improved testing; cleanup
mwear Aug 24, 2023
b45540e
Move ReportComponentStatus from Host to TelemetrySettings
mwear Aug 30, 2023
2f0f54e
Share state machines between component and service versionss of Repor…
mwear Sep 8, 2023
e690b51
Automatically report status during startup/shutdown
mwear Sep 9, 2023
9bba9be
StatusFunc improvements
mwear Sep 10, 2023
7d08aa5
Cleanup
mwear Sep 10, 2023
7f35ad5
Do not automatically report StatusOK during startup
mwear Sep 11, 2023
26e59f1
Refactor instanceID creation
mwear Sep 14, 2023
67676a4
More accurate comments
mwear Sep 14, 2023
e0a891e
Fix state transitions to StatusStopped
mwear Sep 14, 2023
a0c106c
Remove functional options; replace with per-error type event construc…
mwear Sep 15, 2023
ec257c6
Do not return errors from StatusWatchers
mwear Sep 15, 2023
2754546
Rename servicetelemetry.Settings to servicetelemetry.TelemetrySettings
mwear Sep 15, 2023
f1c678f
Fix typo in component/status.go
mwear Sep 15, 2023
ed88afd
Handle ReportComponentStatus for SharedComponents
mwear Sep 15, 2023
22f2c40
Implement AggregateStatus to compute an effective status from a map o…
mwear Sep 16, 2023
362e1ab
Add additional utility methods for component status and events
mwear Sep 17, 2023
f821467
Automatically report status for extensions during startup/shutdown
mwear Sep 17, 2023
178f885
Replace LastErrorEvent with more flexible LastEventByStatus
mwear Sep 17, 2023
6f265c1
Add component.EffectiveStatus method
mwear Sep 18, 2023
20bd41d
Fix out of date comment
mwear Sep 20, 2023
bed7e50
Rename EffectiveStatus to AggregateStatusEvent; add more comments
mwear Sep 20, 2023
5cb9626
Fix flaky test
mwear Sep 20, 2023
e4fb9ac
Correct comments in component/telemetry.go
mwear Sep 27, 2023
59ad517
Improve comments
mwear Sep 29, 2023
0a646ad
Move StatusWatcher interface to extension package
mwear Sep 29, 2023
4ada43f
Reduce public API for event utility methods
mwear Sep 29, 2023
9f0811f
Update comment in component/status.go
mwear Oct 3, 2023
9455b7f
Fix automatic status reporting for SharedComponents
mwear Oct 5, 2023
5361925
Update version in deprecation comments
mwear Oct 5, 2023
711fb73
Streamline error assertions
mwear Oct 5, 2023
6e09fb0
Fix test post rebase
mwear Oct 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .chloggen/component-status.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: core

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adds the ability for components to report status and for extensions to subscribe to status events by implementing an optional StatusWatcher interface.

# One or more tracking issues or pull requests related to the change
issues: [7682]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
7 changes: 7 additions & 0 deletions component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,10 @@ type CreateDefaultConfigFunc func() Config
func (f CreateDefaultConfigFunc) CreateDefaultConfig() Config {
return f()
}

// InstanceID uniquely identifies a component instance
type InstanceID struct {
ID ID
Kind Kind
PipelineIDs map[ID]struct{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does an InstanceID need a map of its pipelines?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so. This is what differentiates instances of a component. For example:

receivers:
  foo: ...

exporters:
  bar: ...

pipelines:
  logs:
    receivers: foo
    exporters: bar
  logs/2:
    receivers: foo
    exporters: bar
  traces:
    receivers: foo
    exporters: bar

The configuration defines two components, but the collector will instantiate four instances:

{
	ID: "foo",
	Kind: "receiver",
	PipelineIDs: { "logs": {}, "logs/2": {} }
},
{
	ID: "foo",
	Kind: "receiver",
	PipelineIDs: { "traces": {}, }
},
{
	ID: "bar",
	Kind: "exporter",
	PipelineIDs: { "logs": {}, "logs/2": {} }
},
{
	ID: "bar",
	Kind: "exporter",
	PipelineIDs: { "traces": {}, }
},

This differentiation isn't utilized in this PR but it will be necessary in order to produce an accurate & aggregated health for a component or pipeline. @mwear, do you think this logic should be part of this PR too?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was leaving this for our first StatusWatchers, but I think it does make sense to have a shared method to compute an aggregated status. Would a function, component.AggregateStatus([]*StatusEvent) *StatusEvent make sense? It would return the highest priority event from the slice (based on the status value). If there is a tie, it would use the earliest timestamp? Is this something along the lines of what you had in mind?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think an aggregation function definitely makes sense.

It would return the highest priority event from the slice (based on the status value).

Based on the analysis I did earlier, I think this logic would fail to account for some notable cases.

For example, say we're in the process of Stopping a pipeline. We could in theory have four instance with the following statuses: [Stopped, OK, Recoverable, Starting]. The first instance has presumably been issued a Stop command and immediately complied. The others either haven't yet been issued the command, haven't acked it, or for whatever reason their statuses haven't been updated yet. I think the correct aggregated status here is Stopping, even though none of the instances show that status. We have to infer that because one instance is Stopped, we must be in the process of stopping the component or pipeline as a whole.

I had proposed specific logic for aggregation here. However, if you disagree and think we should stick with a simple highest priority, then I think the following is the closest order:

	StatusNone Status = iota
	StatusOK
	StatusStarting // Aggregate is not OK until all instances are done starting
	StatusRecoverableError
	StatusStopped
	StatusStopping // Aggregate is not Stopped until all instances are done stopping
	StatusPermanentError
	StatusFatalError

My proposed logic didn't account for timestamps, but I think there are two aspects to this which we need to handle.

  1. Ensure that we only consider the latest event from each instance. I think you could solve this by changing the signature to accept a map: AggregateStatus(map[InstanceID]*StatusEvent) *StatusEvent
  2. Assuming (1), what is the appropriate timestamp to apply to an aggregated status? We could probably make a case for any of the following:
    • Earliest, because it indicates the "most stale" instance status.
    • Latest, because it indicates the most recently reported instance status.
    • Average, median, or some other statistical representation e.g. excluding outliers
    • Some combination of the above. This would require a slightly different structured though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After thinking about this a little more, I don't think it's necessary to return a StatusEvent for an aggregate status. An effective status is most likely all that we're going to need. I went ahead and implemented component.AggregateStatus based on your rules from #8169 (comment). I think it's good that we codified the rules for computing an effective status and ff it turns out we need more later, we can build on this initial implementation. Does that sound reasonable to you?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wrote up a proposal for how we can compute effective status for pipelines by aggregating component status, and how we can compute effective status for the collector overall by aggregating pipeline status. The proposal is here: open-telemetry/opamp-spec#165 (comment). I added two more utility methods so that we can find the most recent status event (component.LastStatusEvent) and find the most recent event with a specific status (component.LastEventByStatus). Along with component.AggregateStatus, I believe we have all the utility methods a StatusWatcher would need to report status according to my proposal. Since we need to be able to group by InstanceID and pipelineID (which is of type component.ID), I implemented the functions with a type parameter for the key of the map of status events passed in. They will work with any comparable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I've come full circle and have decided that it would be useful to have a method that returns an effective status (as a status event). I called the method component.EffectiveStatus. Based on the OpAMP proposal, the effective status should have:

  • a status equal to the aggregated status
  • the timestamp of the most recent status event
  • for an error status, the error of the most recent event with the same error status

The status is an aggregation of all the current states, the timestamp represents the most recent state change, and in the case of an error status, the error on the event will match the most recent occurrence of the same error state.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the aggregate collector status makes sense too.

One thought on this regarding recursion - While I think it is ok, it's likely not necessary in this case. What I mean is that the following should all be true:

  1. A component status is an aggregate of its instance statuses.
  2. A pipeline status is an aggregate of its instance statuses.
  3. Any of the following should be equivalent:
  • A collector status is an aggregate of its pipeline statuses.
  • A collector status is an aggregate of its component statuses.
  • A collector status is an aggregate of its instance statuses.

Notably here, I think we could directly aggregate all instance statuses across the collector. However, we should get the same result if we derive the collector status from either component or pipeline statuses.

}
3 changes: 3 additions & 0 deletions component/componenttest/nop_telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,8 @@ func NewNopTelemetrySettings() component.TelemetrySettings {
MeterProvider: noop.NewMeterProvider(),
MetricsLevel: configtelemetry.LevelNone,
Resource: pcommon.NewResource(),
ReportComponentStatus: func(*component.StatusEvent) error {
return nil
},
}
}
2 changes: 2 additions & 0 deletions component/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ type Host interface {
//
// ReportFatalError should be called by the component anytime after Component.Start() ends and
// before Component.Shutdown() begins.
// Deprecated: [0.87.0] Use TelemetrySettings.ReportComponentStatus instead (with an event
// component.StatusFatalError)
ReportFatalError(err error)

// GetFactory of the specified kind. Returns the factory for a component type.
Expand Down
193 changes: 193 additions & 0 deletions component/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package component // import "go.opentelemetry.io/collector/component"

import (
"time"
)

type Status int32

// Enumeration of possible component statuses
const (
StatusNone Status = iota
StatusStarting
StatusOK
StatusRecoverableError
StatusPermanentError
StatusFatalError
StatusStopping
StatusStopped
)

// String returns a string representation of a Status
func (s Status) String() string {
switch s {
case StatusStarting:
return "StatusStarting"
case StatusOK:
return "StatusOK"
case StatusRecoverableError:
return "StatusRecoverableError"
case StatusPermanentError:
return "StatusPermanentError"
case StatusFatalError:
return "StatusFatalError"
case StatusStopping:
return "StatusStopping"
case StatusStopped:
return "StatusStopped"
}
return "StatusNone"
}

// StatusEvent contains a status and timestamp, and can contain an error
type StatusEvent struct {
status Status
err error
timestamp time.Time
}

// Status returns the Status (enum) associated with the StatusEvent
func (ev *StatusEvent) Status() Status {
return ev.status
}

// Err returns the error associated with the StatusEvent.
func (ev *StatusEvent) Err() error {
return ev.err
}

// Timestamp returns the timestamp associated with the StatusEvent
func (ev *StatusEvent) Timestamp() time.Time {
return ev.timestamp
}

// NewStatusEvent creates and returns a StatusEvent with the specified status and sets the timestamp
// time.Now(). To set an error on the event for an error status use one of the dedicated
// constructors (e.g. NewRecoverableErrorEvent, NewPermanentErrorEvent, NewFatalErrorEvent)
func NewStatusEvent(status Status) *StatusEvent {
return &StatusEvent{
status: status,
timestamp: time.Now(),
}
}

// NewRecoverableErrorEvent creates and returns a StatusEvent with StatusRecoverableError, the
// specified error, and a timestamp set to time.Now().
func NewRecoverableErrorEvent(err error) *StatusEvent {
ev := NewStatusEvent(StatusRecoverableError)
ev.err = err
return ev
}

// NewPermanentErrorEvent creates and returns a StatusEvent with StatusPermanentError, the
// specified error, and a timestamp set to time.Now().
func NewPermanentErrorEvent(err error) *StatusEvent {
ev := NewStatusEvent(StatusPermanentError)
ev.err = err
return ev
}

// NewFatalErrorEvent creates and returns a StatusEvent with StatusFatalError, the
// specified error, and a timestamp set to time.Now().
func NewFatalErrorEvent(err error) *StatusEvent {
ev := NewStatusEvent(StatusFatalError)
ev.err = err
return ev
}

// StatusFunc is the expected type of ReportComponentStatus for component.TelemetrySettings
type StatusFunc func(*StatusEvent) error

// AggregateStatus will derive a status for the given input using the following rules in order:
// 1. If all instances have the same status, there is nothing to aggregate, return it.
// 2. If any instance encounters a fatal error, the component is in a Fatal Error state.
// 3. If any instance is in a Permanent Error state, the component status is Permanent Error.
// 4. If any instance is Stopping, the component is in a Stopping state.
// 5. An instance is Stopped, but not all instances are Stopped, we must be in the process of Stopping the component.
// 6. If any instance is in a Recoverable Error state, the component status is Recoverable Error.
// 7. By process of elimination, the only remaining state is starting.
func AggregateStatus[K comparable](eventMap map[K]*StatusEvent) Status {
seen := make(map[Status]struct{})
for _, ev := range eventMap {
seen[ev.Status()] = struct{}{}
}

// All statuses are the same. Note, this will handle StatusOK and StatusStopped as these two
// cases require all components be in the same state.
if len(seen) == 1 {
for st := range seen {
return st
}
}

// Handle mixed status cases
if _, isFatal := seen[StatusFatalError]; isFatal {
return StatusFatalError
}

if _, isPermanent := seen[StatusPermanentError]; isPermanent {
return StatusPermanentError
}
Comment on lines +131 to +133
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means that if any component declares they are permanently in an error state then we are no longer able to track stopping/stopped/recoverableerror/starting aggregate statuses, right? Are we OK with this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it depends on whether we expose multiple levels of granularity, or just one. As I understand it, the intention here is that one could look at each component independently, or each pipeline independently, or the collector as a whole.

So if for example you have a component in a permanent error state, the collector would have an aggregate status of permanent error, but one could still see each component status independently.

Copy link
Member Author

@mwear mwear Oct 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confirming what @djaglowski is saying, an aggregate status is useful for the seeing the overall health of a collector, or the health of a pipeline, but we intend to expose the individual component statuses in addition to the aggregate statuses. A user can look at the aggregate statuses to quickly assess the health of a collector, but then drill down to the component level for more details.


if _, isStopping := seen[StatusStopping]; isStopping {
return StatusStopping
}

if _, isStopped := seen[StatusStopped]; isStopped {
return StatusStopping
}

if _, isRecoverable := seen[StatusRecoverableError]; isRecoverable {
return StatusRecoverableError
}

// By process of elimination, this is the last possible status; no check necessary.
return StatusStarting
mwear marked this conversation as resolved.
Show resolved Hide resolved
}

// StatusIsError returns true for error statuses (e.g. StatusRecoverableError,
// StatusPermanentError, or StatusFatalError)
func StatusIsError(status Status) bool {
return status == StatusRecoverableError ||
status == StatusPermanentError ||
status == StatusFatalError
}

// AggregateStatusEvent returns a status event where:
// - The status is set to the aggregate status of the events in the eventMap
// - The timestamp is set to the latest timestamp of the events in the eventMap
// - For an error status, the event will have same error as the most current event of the same
// error type from the eventMap
func AggregateStatusEvent[K comparable](eventMap map[K]*StatusEvent) *StatusEvent {
var lastEvent, lastMatchingEvent *StatusEvent
aggregateStatus := AggregateStatus[K](eventMap)

for _, ev := range eventMap {
if lastEvent == nil || lastEvent.timestamp.Before(ev.timestamp) {
lastEvent = ev
}
if aggregateStatus == ev.Status() &&
(lastMatchingEvent == nil || lastMatchingEvent.timestamp.Before(ev.timestamp)) {
lastMatchingEvent = ev
}
}

// the effective status matches an existing event
if lastEvent.Status() == aggregateStatus {
return lastEvent
}

// the effective status requires a synthetic event
aggregateEvent := &StatusEvent{
status: aggregateStatus,
timestamp: lastEvent.timestamp,
}
if StatusIsError(aggregateStatus) {
aggregateEvent.err = lastMatchingEvent.err
}

return aggregateEvent
}
Loading