Skip to content

Commit

Permalink
Automate status reporting on start (open-telemetry#8836)
Browse files Browse the repository at this point in the history
This is part of the continued component status reporting effort.
Currently we have automated status reporting for the following component
lifecycle events: `Starting`, `Stopping`, `Stopped` as well as
definitive errors that occur in the starting or stopping process (e.g.
as determined by an error return value). This leaves the responsibility
to the component to report runtime status after start and before stop.
We'd like to be able to extend the automatic status reporting to report
`StatusOK` if `Start` completes without an error. One complication with
this approach is that some components spawn async work (via goroutines)
that, depending on the Go scheduler, can report status before `Start`
returns. As such, we cannot assume a nil return value from `Start` means
the component has started properly. The solution is to detect if the
component has already reported status when start returns, if it has, we
will use the component-reported status and will not automatically report
status. If it hasn't, and `Start` returns without an error, we can
report `StatusOK`. Any subsequent reports from the component (async or
otherwise) will transition the component status accordingly.

The tl;dr is that we cannot control the execution of async code, that's
up to the Go scheduler, but we can handle the race, report the status
based on the execution, and not clobber status reported from within the
component during the startup process. That said, for components with
async starts, you may see a `StatusOK` before the component-reported
status, or just the component-reported status depending on the actual
execution of the code. In both cases, the end status will be same.

The work in this PR will allow us to simplify open-telemetry#8684 and open-telemetry#8788 and
ultimately choose which direction we want to go for runtime status
reporting.

**Link to tracking Issue:** open-telemetry#7682

**Testing:** units / manual

---------

Co-authored-by: Alex Boten <aboten@lightstep.com>
  • Loading branch information
2 people authored and pantuza committed Dec 8, 2023
1 parent 671265d commit fa319ca
Show file tree
Hide file tree
Showing 15 changed files with 346 additions and 112 deletions.
25 changes: 25 additions & 0 deletions .chloggen/automated-status-on-start.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: statusreporting

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Automates status reporting upon the completion of component.Start().

# One or more tracking issues or pull requests related to the change
issues: [7682]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
41 changes: 37 additions & 4 deletions component/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,43 @@ import (
"go.opentelemetry.io/collector/pdata/pcommon"
)

// TelemetrySettings provides components with APIs to report telemetry.
//
// Note: there is a service version of this struct, servicetelemetry.TelemetrySettings, that mirrors
// this struct with the exception of ReportComponentStatus. When adding or removing anything from
// this struct consider whether or not the same should be done for the service version.
type TelemetrySettings struct {
// Logger that the factory can use during creation and can pass to the created
// component to be used later as well.
Logger *zap.Logger

// TracerProvider that the factory can pass to other instrumented third-party libraries.
TracerProvider trace.TracerProvider

// MeterProvider that the factory can pass to other instrumented third-party libraries.
MeterProvider metric.MeterProvider

// MetricsLevel controls the level of detail for metrics emitted by the collector.
// Experimental: *NOTE* this field is experimental and may be changed or removed.
MetricsLevel configtelemetry.Level

// Resource contains the resource attributes for the collector's telemetry.
Resource pcommon.Resource

// ReportComponentStatus allows a component to report runtime changes in status. The service
// will automatically report status for a component during startup and shutdown. Components can
// use this method to report status after start and before shutdown. ReportComponentStatus
// will only return errors if the API used incorrectly. The two scenarios where an error will
// be returned are:
//
// - An illegal state transition
// - Calling this method before component startup
//
// If the API is being used properly, these errors are safe to ignore.
ReportComponentStatus StatusFunc
}

// Deprecated: [0.91.0] Use TelemetrySettings directly
type TelemetrySettingsBase[T any] struct {
// Logger that the factory can use during creation and can pass to the created
// component to be used later as well.
Expand Down Expand Up @@ -42,7 +79,3 @@ type TelemetrySettingsBase[T any] struct {
// If the API is being used properly, these errors are safe to ignore.
ReportComponentStatus T
}

// TelemetrySettings and servicetelemetry.Settings differ in the method signature for
// ReportComponentStatus
type TelemetrySettings TelemetrySettingsBase[StatusFunc]
7 changes: 4 additions & 3 deletions internal/sharedcomponent/sharedcomponent.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ func (scs *SharedComponents[K, V]) GetOrAdd(key K, create func() (V, error), tel
c.seenSettings[telemetrySettings] = struct{}{}
prev := c.telemetry.ReportComponentStatus
c.telemetry.ReportComponentStatus = func(ev *component.StatusEvent) error {
if err := telemetrySettings.ReportComponentStatus(ev); err != nil {
return err
err := telemetrySettings.ReportComponentStatus(ev)
if prevErr := prev(ev); prevErr != nil {
err = prevErr
}
return prev(ev)
return err
}
}
return c, nil
Expand Down
24 changes: 19 additions & 5 deletions otelcol/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,24 @@ func TestComponentStatusWatcher(t *testing.T) {
// Start the newly created collector.
wg := startCollector(context.Background(), t, col)

// An unhealthy processor asynchronously reports a recoverable error.
expectedStatuses := []component.Status{
// An unhealthy processor asynchronously reports a recoverable error. Depending on the Go
// Scheduler the statuses reported at startup will be one of the two valid sequnces below.
startupStatuses1 := []component.Status{
component.StatusStarting,
component.StatusOK,
component.StatusRecoverableError,
}
startupStatuses2 := []component.Status{
component.StatusStarting,
component.StatusRecoverableError,
}
// the modulus of the actual statuses will match the modulus of the startup statuses
startupStatuses := func(actualStatuses []component.Status) []component.Status {
if len(actualStatuses)%2 == 1 {
return startupStatuses1
}
return startupStatuses2
}

// The "unhealthy" processors will now begin to asynchronously report StatusRecoverableError.
// We expect to see these reports.
Expand All @@ -197,8 +210,8 @@ func TestComponentStatusWatcher(t *testing.T) {
for k, v := range changedComponents {
// All processors must report a status change with the same ID
assert.EqualValues(t, component.NewID(unhealthyProcessorFactory.Type()), k.ID)
// And all must have the expected statuses
assert.Equal(t, expectedStatuses, v)
// And all must have a valid startup sequence
assert.Equal(t, startupStatuses(v), v)
}
// We have 3 processors with exactly the same ID in otelcol-statuswatcher.yaml
// We must have exactly 3 items in our map. This ensures that the "source" argument
Expand All @@ -212,8 +225,9 @@ func TestComponentStatusWatcher(t *testing.T) {
wg.Wait()

// Check for additional statuses after Shutdown.
expectedStatuses = append(expectedStatuses, component.StatusStopping, component.StatusStopped)
for _, v := range changedComponents {
expectedStatuses := append([]component.Status{}, startupStatuses(v)...)
expectedStatuses = append(expectedStatuses, component.StatusStopping, component.StatusStopped)
assert.Equal(t, expectedStatuses, v)
}

Expand Down
26 changes: 21 additions & 5 deletions service/extensions/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,18 @@ func (bes *Extensions) Start(ctx context.Context, host component.Host) error {
extLogger.Info("Extension is starting...")
instanceID := bes.instanceIDs[extID]
ext := bes.extMap[extID]
_ = bes.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStarting))
_ = bes.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewStatusEvent(component.StatusStarting),
)
if err := ext.Start(ctx, components.NewHostWrapper(host, extLogger)); err != nil {
_ = bes.telemetry.ReportComponentStatus(instanceID, component.NewPermanentErrorEvent(err))
_ = bes.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewPermanentErrorEvent(err),
)
return err
}
_ = bes.telemetry.Status.ReportComponentOKIfStarting(instanceID)
extLogger.Info("Extension started.")
}
return nil
Expand All @@ -55,13 +62,22 @@ func (bes *Extensions) Shutdown(ctx context.Context) error {
extID := bes.extensionIDs[i]
instanceID := bes.instanceIDs[extID]
ext := bes.extMap[extID]
_ = bes.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStopping))
_ = bes.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewStatusEvent(component.StatusStopping),
)
if err := ext.Shutdown(ctx); err != nil {
_ = bes.telemetry.ReportComponentStatus(instanceID, component.NewPermanentErrorEvent(err))
_ = bes.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewPermanentErrorEvent(err),
)
errs = multierr.Append(errs, err)
continue
}
_ = bes.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStopped))
_ = bes.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewStatusEvent(component.StatusStopped),
)
}

return errs
Expand Down
8 changes: 5 additions & 3 deletions service/extensions/extensions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
name: "successful startup/shutdown",
expectedStatuses: []*component.StatusEvent{
component.NewStatusEvent(component.StatusStarting),
component.NewStatusEvent(component.StatusOK),
component.NewStatusEvent(component.StatusStopping),
component.NewStatusEvent(component.StatusStopped),
},
Expand All @@ -400,6 +401,7 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
name: "shutdown error",
expectedStatuses: []*component.StatusEvent{
component.NewStatusEvent(component.StatusStarting),
component.NewStatusEvent(component.StatusOK),
component.NewStatusEvent(component.StatusStopping),
component.NewPermanentErrorEvent(assert.AnError),
},
Expand Down Expand Up @@ -430,11 +432,11 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
assert.NoError(t, err)

var actualStatuses []*component.StatusEvent
init, statusFunc := status.NewServiceStatusFunc(func(id *component.InstanceID, ev *component.StatusEvent) {
rep := status.NewReporter(func(id *component.InstanceID, ev *component.StatusEvent) {
actualStatuses = append(actualStatuses, ev)
})
extensions.telemetry.ReportComponentStatus = statusFunc
init()
extensions.telemetry.Status = rep
rep.Ready()

assert.Equal(t, tc.startErr, extensions.Start(context.Background(), componenttest.NewNopHost()))
if tc.startErr == nil {
Expand Down
27 changes: 22 additions & 5 deletions service/internal/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,12 +386,20 @@ func (g *Graph) StartAll(ctx context.Context, host component.Host) error {
}

instanceID := g.instanceIDs[node.ID()]
_ = g.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStarting))
_ = g.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewStatusEvent(component.StatusStarting),
)

if compErr := comp.Start(ctx, host); compErr != nil {
_ = g.telemetry.ReportComponentStatus(instanceID, component.NewPermanentErrorEvent(compErr))
_ = g.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewPermanentErrorEvent(compErr),
)
return compErr
}

_ = g.telemetry.Status.ReportComponentOKIfStarting(instanceID)
}
return nil
}
Expand All @@ -417,15 +425,24 @@ func (g *Graph) ShutdownAll(ctx context.Context) error {
}

instanceID := g.instanceIDs[node.ID()]
_ = g.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStopping))
_ = g.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewStatusEvent(component.StatusStopping),
)

if compErr := comp.Shutdown(ctx); compErr != nil {
errs = multierr.Append(errs, compErr)
_ = g.telemetry.ReportComponentStatus(instanceID, component.NewPermanentErrorEvent(compErr))
_ = g.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewPermanentErrorEvent(compErr),
)
continue
}

_ = g.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStopped))
_ = g.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewStatusEvent(component.StatusStopped),
)
}
return errs
}
Expand Down
13 changes: 10 additions & 3 deletions service/internal/graph/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2163,11 +2163,13 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
expectedStatuses: map[*component.InstanceID][]*component.StatusEvent{
instanceIDs[rNoErr]: {
component.NewStatusEvent(component.StatusStarting),
component.NewStatusEvent(component.StatusOK),
component.NewStatusEvent(component.StatusStopping),
component.NewStatusEvent(component.StatusStopped),
},
instanceIDs[eNoErr]: {
component.NewStatusEvent(component.StatusStarting),
component.NewStatusEvent(component.StatusOK),
component.NewStatusEvent(component.StatusStopping),
component.NewStatusEvent(component.StatusStopped),
},
Expand All @@ -2194,6 +2196,7 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
},
instanceIDs[eNoErr]: {
component.NewStatusEvent(component.StatusStarting),
component.NewStatusEvent(component.StatusOK),
component.NewStatusEvent(component.StatusStopping),
component.NewStatusEvent(component.StatusStopped),
},
Expand All @@ -2206,11 +2209,13 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
expectedStatuses: map[*component.InstanceID][]*component.StatusEvent{
instanceIDs[rSdErr]: {
component.NewStatusEvent(component.StatusStarting),
component.NewStatusEvent(component.StatusOK),
component.NewStatusEvent(component.StatusStopping),
component.NewPermanentErrorEvent(assert.AnError),
},
instanceIDs[eNoErr]: {
component.NewStatusEvent(component.StatusStarting),
component.NewStatusEvent(component.StatusOK),
component.NewStatusEvent(component.StatusStopping),
component.NewStatusEvent(component.StatusStopped),
},
Expand All @@ -2223,11 +2228,13 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
expectedStatuses: map[*component.InstanceID][]*component.StatusEvent{
instanceIDs[rNoErr]: {
component.NewStatusEvent(component.StatusStarting),
component.NewStatusEvent(component.StatusOK),
component.NewStatusEvent(component.StatusStopping),
component.NewStatusEvent(component.StatusStopped),
},
instanceIDs[eSdErr]: {
component.NewStatusEvent(component.StatusStarting),
component.NewStatusEvent(component.StatusOK),
component.NewStatusEvent(component.StatusStopping),
component.NewPermanentErrorEvent(assert.AnError),
},
Expand All @@ -2240,12 +2247,12 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
pg.telemetry = servicetelemetry.NewNopTelemetrySettings()

actualStatuses := make(map[*component.InstanceID][]*component.StatusEvent)
init, statusFunc := status.NewServiceStatusFunc(func(id *component.InstanceID, ev *component.StatusEvent) {
rep := status.NewReporter(func(id *component.InstanceID, ev *component.StatusEvent) {
actualStatuses[id] = append(actualStatuses[id], ev)
})

pg.telemetry.ReportComponentStatus = statusFunc
init()
pg.telemetry.Status = rep
rep.Ready()

e0, e1 := tc.edge[0], tc.edge[1]
pg.instanceIDs = map[int64]*component.InstanceID{
Expand Down
5 changes: 2 additions & 3 deletions service/internal/servicetelemetry/nop_telemetry_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/service/internal/status"
)

// NewNopTelemetrySettings returns a new nop settings for Create* functions.
Expand All @@ -21,8 +22,6 @@ func NewNopTelemetrySettings() TelemetrySettings {
MeterProvider: noopmetric.NewMeterProvider(),
MetricsLevel: configtelemetry.LevelNone,
Resource: pcommon.NewResource(),
ReportComponentStatus: func(*component.InstanceID, *component.StatusEvent) error {
return nil
},
Status: status.NewReporter(func(*component.InstanceID, *component.StatusEvent) {}),
}
}
10 changes: 8 additions & 2 deletions service/internal/servicetelemetry/nop_telemetry_settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@ import (

func TestNewNopSettings(t *testing.T) {
set := NewNopTelemetrySettings()

set.Status.Ready()
require.NotNil(t, set)
require.IsType(t, TelemetrySettings{}, set)
require.Equal(t, zap.NewNop(), set.Logger)
require.Equal(t, nooptrace.NewTracerProvider(), set.TracerProvider)
require.Equal(t, noopmetric.NewMeterProvider(), set.MeterProvider)
require.Equal(t, configtelemetry.LevelNone, set.MetricsLevel)
require.Equal(t, pcommon.NewResource(), set.Resource)
require.NoError(t, set.ReportComponentStatus(&component.InstanceID{}, component.NewStatusEvent(component.StatusStarting)))
require.NoError(t,
set.Status.ReportComponentStatus(
&component.InstanceID{},
component.NewStatusEvent(component.StatusStarting),
),
)
require.NoError(t, set.Status.ReportComponentOKIfStarting(&component.InstanceID{}))
}
Loading

0 comments on commit fa319ca

Please sign in to comment.