From 97aa3cb17d14b78d0861ff1731f1328b81d6395c Mon Sep 17 00:00:00 2001 From: Matthew Wear Date: Sun, 10 Sep 2023 16:08:04 -0700 Subject: [PATCH] Cleanup --- component/componenttest/nop_host.go | 2 -- component/componenttest/nop_host_test.go | 1 - component/host.go | 2 +- component/status.go | 1 + component/telemetry.go | 2 ++ otelcol/collector_test.go | 26 ++++++++++---- .../processortest/unhealthy_processor.go | 2 +- service/extensions/extensions.go | 6 +--- service/internal/components/host_wrapper.go | 2 +- .../internal/components/host_wrapper_test.go | 3 +- .../servicetelemetry/nop_settings_test.go | 4 +-- service/internal/servicetelemetry/settings.go | 7 +++- .../servicetelemetry/settings_test.go | 34 +++++++++++++++++++ service/internal/status/status.go | 20 ++++++++--- service/internal/status/status_test.go | 6 ++-- service/service.go | 1 + service/service_test.go | 31 ++++++++--------- 17 files changed, 105 insertions(+), 45 deletions(-) create mode 100644 service/internal/servicetelemetry/settings_test.go diff --git a/component/componenttest/nop_host.go b/component/componenttest/nop_host.go index 3518d584ac0..4accfab0d8c 100644 --- a/component/componenttest/nop_host.go +++ b/component/componenttest/nop_host.go @@ -17,8 +17,6 @@ func NewNopHost() component.Host { func (nh *nopHost) ReportFatalError(_ error) {} -func (nh *nopHost) ReportComponentStatus(_ component.Status, _ ...component.StatusEventOption) {} - func (nh *nopHost) GetFactory(_ component.Kind, _ component.Type) component.Factory { return nil } diff --git a/component/componenttest/nop_host_test.go b/component/componenttest/nop_host_test.go index f6ca2a0b5f8..1bcb92d1744 100644 --- a/component/componenttest/nop_host_test.go +++ b/component/componenttest/nop_host_test.go @@ -19,7 +19,6 @@ func TestNewNopHost(t *testing.T) { require.IsType(t, &nopHost{}, nh) nh.ReportFatalError(errors.New("TestError")) - assert.Nil(t, nh.GetExporters()) // nolint: staticcheck assert.Nil(t, nh.GetExtensions()) assert.Nil(t, nh.GetFactory(component.KindReceiver, "test")) diff --git a/component/host.go b/component/host.go index 44526b52d46..9b963727d1f 100644 --- a/component/host.go +++ b/component/host.go @@ -12,7 +12,7 @@ type Host interface { // // ReportFatalError should be called by the component anytime after Component.Start() ends and // before Component.Shutdown() begins. - // Deprecated: [0.65.0] Use ReportComponentStatus instead (with an event component.StatusFatalError) + // Deprecated: [x.x.x] Use ReportComponentStatus instead (with an event component.StatusFatalError) ReportFatalError(err error) // GetFactory of the specified kind. Returns the factory for a component type. diff --git a/component/status.go b/component/status.go index f6399a0dea5..e09ecbf90f1 100644 --- a/component/status.go +++ b/component/status.go @@ -137,4 +137,5 @@ type StatusWatcher interface { ComponentStatusChanged(source *InstanceID, event *StatusEvent) } +// StatusFunc is the expected type of ReportComponentStatus for compoment.TelemetrySettings type StatusFunc func(Status, ...StatusEventOption) error diff --git a/component/telemetry.go b/component/telemetry.go index eb700a19851..7c47bf2af2e 100644 --- a/component/telemetry.go +++ b/component/telemetry.go @@ -44,4 +44,6 @@ type TelemetrySettingsBase[T any] struct { ReportComponentStatus T } +// TelemetrySettings and servicetelemetry.Settings differ in the method signature for +// ReportComponentStatus type TelemetrySettings TelemetrySettingsBase[StatusFunc] diff --git a/otelcol/collector_test.go b/otelcol/collector_test.go index c8a0b10accd..1d3b8e698ac 100644 --- a/otelcol/collector_test.go +++ b/otelcol/collector_test.go @@ -158,20 +158,20 @@ func TestComponentStatusWatcher(t *testing.T) { assert.NoError(t, err) // Use a processor factory that creates "unhealthy" processor: one that - // always reports StatusError after successful Start. + // always reports StatusRecoverableError after successful Start. unhealthyProcessorFactory := processortest.NewUnhealthyProcessorFactory() factories.Processors[unhealthyProcessorFactory.Type()] = unhealthyProcessorFactory // Keep track of all status changes in a map. - changedComponents := map[*component.InstanceID]component.Status{} + changedComponents := map[*component.InstanceID][]component.Status{} var mux sync.Mutex onStatusChanged := func(source *component.InstanceID, event *component.StatusEvent) { - if event.Status() != component.StatusRecoverableError { + if source.ID.Type() != unhealthyProcessorFactory.Type() { return } mux.Lock() defer mux.Unlock() - changedComponents[source] = event.Status() + changedComponents[source] = append(changedComponents[source], event.Status()) } // Add a "statuswatcher" extension that will receive notifications when processor @@ -194,6 +194,13 @@ func TestComponentStatusWatcher(t *testing.T) { // Start the newly created collector. wg := startCollector(context.Background(), t, col) + // An unhealthy processor will successfully start, then report a recoverable error. + expectedStatuses := []component.Status{ + component.StatusStarting, + component.StatusOK, + component.StatusRecoverableError, + } + // The "unhealthy" processors will now begin to asynchronously report StatusError. // We expect to see these reports. assert.Eventually(t, func() bool { @@ -203,8 +210,8 @@ func TestComponentStatusWatcher(t *testing.T) { for k, v := range changedComponents { // All processors must report a status change with the same ID assert.EqualValues(t, component.NewID(unhealthyProcessorFactory.Type()), k.ID) - // And all must be in StatusError - assert.EqualValues(t, component.StatusRecoverableError, v) + // And all must have the expected statuses + assert.Equal(t, expectedStatuses, v) } // We have 3 processors with exactly the same ID in otelcol-statuswatcher.yaml // We must have exactly 3 items in our map. This ensures that the "source" argument @@ -216,6 +223,13 @@ func TestComponentStatusWatcher(t *testing.T) { col.Shutdown() wg.Wait() + + // Check for additional statuses after Shutdown. + expectedStatuses = append(expectedStatuses, component.StatusStopping, component.StatusStopped) + for _, v := range changedComponents { + assert.Equal(t, expectedStatuses, v) + } + assert.Equal(t, StateClosed, col.GetState()) } diff --git a/processor/processortest/unhealthy_processor.go b/processor/processortest/unhealthy_processor.go index 979bfbdd0fd..0ae30e50ec8 100644 --- a/processor/processortest/unhealthy_processor.go +++ b/processor/processortest/unhealthy_processor.go @@ -62,7 +62,7 @@ type unhealthyProcessor struct { telemetry component.TelemetrySettings } -func (p unhealthyProcessor) Start(_ context.Context, host component.Host) error { +func (p unhealthyProcessor) Start(_ context.Context, _ component.Host) error { go func() { _ = p.telemetry.ReportComponentStatus(component.StatusRecoverableError) }() diff --git a/service/extensions/extensions.go b/service/extensions/extensions.go index e576fef5ace..0686574ec18 100644 --- a/service/extensions/extensions.go +++ b/service/extensions/extensions.go @@ -154,17 +154,13 @@ func New(ctx context.Context, set Settings, cfg Config) (*Extensions, error) { extMap: make(map[component.ID]extension.Extension), } for _, extID := range cfg { - instanceID := &component.InstanceID{ ID: extID, Kind: component.KindExtension, } - - telSet := set.Telemetry.ToComponentTelemetrySettings(instanceID) - extSet := extension.CreateSettings{ ID: extID, - TelemetrySettings: telSet, + TelemetrySettings: set.Telemetry.ToComponentTelemetrySettings(instanceID), BuildInfo: set.BuildInfo, } extSet.TelemetrySettings.Logger = components.ExtensionLogger(set.Telemetry.Logger, extID) diff --git a/service/internal/components/host_wrapper.go b/service/internal/components/host_wrapper.go index 07ff2a741d7..2d386ddad67 100644 --- a/service/internal/components/host_wrapper.go +++ b/service/internal/components/host_wrapper.go @@ -27,7 +27,7 @@ func NewHostWrapper(host component.Host, logger *zap.Logger) component.Host { func (hw *hostWrapper) ReportFatalError(err error) { // The logger from the built component already identifies the component. hw.Logger.Error("Component fatal error", zap.Error(err)) - hw.Host.ReportFatalError(err) // nolint:staticcheck + hw.Host.ReportFatalError(err) } // RegisterZPages is used by zpages extension to register handles from service. diff --git a/service/internal/components/host_wrapper_test.go b/service/internal/components/host_wrapper_test.go index 25567810fff..62b7a744681 100644 --- a/service/internal/components/host_wrapper_test.go +++ b/service/internal/components/host_wrapper_test.go @@ -7,8 +7,9 @@ import ( "errors" "testing" - "go.opentelemetry.io/collector/component/componenttest" "go.uber.org/zap" + + "go.opentelemetry.io/collector/component/componenttest" ) func Test_newHostWrapper(_ *testing.T) { diff --git a/service/internal/servicetelemetry/nop_settings_test.go b/service/internal/servicetelemetry/nop_settings_test.go index d2463f5d1f0..90cdec4a9b6 100644 --- a/service/internal/servicetelemetry/nop_settings_test.go +++ b/service/internal/servicetelemetry/nop_settings_test.go @@ -7,13 +7,13 @@ import ( "testing" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/otel/metric/noop" - "go.opentelemetry.io/otel/trace" ) func TestNewNopSettings(t *testing.T) { diff --git a/service/internal/servicetelemetry/settings.go b/service/internal/servicetelemetry/settings.go index 43e18d972ca..c665bf27bb5 100644 --- a/service/internal/servicetelemetry/settings.go +++ b/service/internal/servicetelemetry/settings.go @@ -1,15 +1,20 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package servicetelemetry // import "go.opentelemetry.io/collector/internal/servicetelemetry" +package servicetelemetry // import "go.opentelemetry.io/collector/service/internal/servicetelemetry" import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/service/internal/status" ) +// Settings mirrors component.TelemetrySettings except for the method signature of +// ReportComponentStatus. The service level Settings is not bound a specific component, and +// therefore takes a component.InstanceID as an argument. type Settings component.TelemetrySettingsBase[status.ServiceStatusFunc] +// ToComponentTelemetrySettings returns a TelemetrySettings for a specific component derived from +// this service level Settings object. func (s Settings) ToComponentTelemetrySettings(id *component.InstanceID) component.TelemetrySettings { return component.TelemetrySettings{ Logger: s.Logger, diff --git a/service/internal/servicetelemetry/settings_test.go b/service/internal/servicetelemetry/settings_test.go new file mode 100644 index 00000000000..d5b3ee6a1e1 --- /dev/null +++ b/service/internal/servicetelemetry/settings_test.go @@ -0,0 +1,34 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package servicetelemetry + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/pdata/pcommon" +) + +func TestSettings(t *testing.T) { + set := Settings{ + Logger: zap.NewNop(), + TracerProvider: trace.NewNoopTracerProvider(), + MeterProvider: noop.NewMeterProvider(), + MetricsLevel: configtelemetry.LevelNone, + Resource: pcommon.NewResource(), + ReportComponentStatus: func(*component.InstanceID, component.Status, ...component.StatusEventOption) error { + return nil + }, + } + require.NoError(t, set.ReportComponentStatus(&component.InstanceID{}, component.StatusOK)) + + compSet := set.ToComponentTelemetrySettings(&component.InstanceID{}) + require.NoError(t, compSet.ReportComponentStatus(component.StatusOK)) +} diff --git a/service/internal/status/status.go b/service/internal/status/status.go index 38d91f5ea39..908e847e527 100644 --- a/service/internal/status/status.go +++ b/service/internal/status/status.go @@ -14,6 +14,7 @@ import ( // onTransitionFunc receives a component.StatusEvent on a successful state transition type onTransitionFunc func(*component.StatusEvent) +// errInvalidStateTransition is returned for invalid state transitions var errInvalidStateTransition = errors.New("invalid state transition") // fsm is a finite state machine that models transitions for component status @@ -23,10 +24,10 @@ type fsm struct { onTransition onTransitionFunc } -// Transition will attempt to execute a state transition. If successful, it calls the onTransitionFunc -// with a StatusEvent representing the new state. Returns an error if the arguments result in an -// invalid status, or if the state transition is not valid. -func (m *fsm) Transition(status component.Status, options ...component.StatusEventOption) error { +// transition will attempt to execute a state transition. If it's successful, it calls the +// onTransitionFunc with a StatusEvent representing the new state. Returns an error if the arguments +// result in an invalid status, or if the state transition is not valid. +func (m *fsm) transition(status component.Status, options ...component.StatusEventOption) error { if _, ok := m.transitions[m.current.Status()][status]; !ok { return fmt.Errorf( "cannot transition from %s to %s: %w", @@ -93,9 +94,13 @@ func newFSM(onTransition onTransitionFunc) *fsm { } } +// InitFunc can be used to toggle a ready flag to true type InitFunc func() + +// readFunc can be used to check the value of a ready flag type readyFunc func() bool +// initAndReadyFuncs returns a pair of functions to set and check a boolean ready flag func initAndReadyFuncs() (InitFunc, readyFunc) { mu := sync.RWMutex{} isReady := false @@ -115,9 +120,13 @@ func initAndReadyFuncs() (InitFunc, readyFunc) { return init, ready } +// NotifyStatusFunc is the receiver of status events after successful state transitions type NotifyStatusFunc func(*component.InstanceID, *component.StatusEvent) + +// ServiceStatusFunc is the expected type of ReportComponentStatus for servicetelemetry.Settings type ServiceStatusFunc func(id *component.InstanceID, status component.Status, opts ...component.StatusEventOption) error +// errStatusNotReady is returned when trying to report status before service start var errStatusNotReady = errors.New("report component status is not ready until service start") // NewServiceStatusFunc returns a function to be used as ReportComponentStatus for @@ -126,6 +135,7 @@ var errStatusNotReady = errors.New("report component status is not ready until s // the a component.InstanceID as a parameter. func NewServiceStatusFunc(notifyStatusChange NotifyStatusFunc) (InitFunc, ServiceStatusFunc) { init, isReady := initAndReadyFuncs() + // mu synchronizes access to the fsmMap and the underlying fsm during a state transition mu := sync.Mutex{} fsmMap := make(map[*component.InstanceID]*fsm) return init, @@ -142,7 +152,7 @@ func NewServiceStatusFunc(notifyStatusChange NotifyStatusFunc) (InitFunc, Servic }) fsmMap[id] = fsm } - return fsm.Transition(status, opts...) + return fsm.transition(status, opts...) } } diff --git a/service/internal/status/status_test.go b/service/internal/status/status_test.go index 884ac056161..a51890e2d9c 100644 --- a/service/internal/status/status_test.go +++ b/service/internal/status/status_test.go @@ -132,7 +132,7 @@ func TestStatusFSM(t *testing.T) { errorCount := 0 for _, status := range tc.reportedStatuses { - if err := fsm.Transition(status); err != nil { + if err := fsm.transition(status); err != nil { errorCount++ require.ErrorIs(t, err, errInvalidStateTransition) } @@ -146,11 +146,11 @@ func TestStatusFSM(t *testing.T) { func TestStatusEventError(t *testing.T) { fsm := newFSM(func(*component.StatusEvent) {}) - err := fsm.Transition(component.StatusStarting) + err := fsm.transition(component.StatusStarting) require.NoError(t, err) // the combination of StatusOK with an error is invalid - err = fsm.Transition(component.StatusOK, component.WithError(assert.AnError)) + err = fsm.transition(component.StatusOK, component.WithError(assert.AnError)) require.Error(t, err) require.ErrorIs(t, err, component.ErrStatusEventInvalidArgument) diff --git a/service/service.go b/service/service.go index bc3339df673..3ef1f6197b1 100644 --- a/service/service.go +++ b/service/service.go @@ -145,6 +145,7 @@ func (srv *Service) Start(ctx context.Context) error { zap.Int("NumCPU", runtime.NumCPU()), ) + // enable status reporting srv.statusInit() if err := srv.host.serviceExtensions.Start(ctx, srv.host); err != nil { diff --git a/service/service_test.go b/service/service_test.go index 5d68f247309..a0608fa6a2c 100644 --- a/service/service_test.go +++ b/service/service_test.go @@ -404,27 +404,26 @@ func TestNilCollectorEffectiveConfig(t *testing.T) { } func TestServiceFatalError(t *testing.T) { - //TODO: restore this test - // set := newNopSettings() - // set.AsyncErrorChannel = make(chan error) + set := newNopSettings() + set.AsyncErrorChannel = make(chan error) - // srv, err := New(context.Background(), set, newNopConfig()) - // require.NoError(t, err) + srv, err := New(context.Background(), set, newNopConfig()) + require.NoError(t, err) - // assert.NoError(t, srv.Start(context.Background())) - // t.Cleanup(func() { - // assert.NoError(t, srv.Shutdown(context.Background())) - // }) + assert.NoError(t, srv.Start(context.Background())) + t.Cleanup(func() { + assert.NoError(t, srv.Shutdown(context.Background())) + }) - // go func() { - // ev, _ := component.NewStatusEvent(component.StatusFatalError, component.WithError(assert.AnError)) - // srv.host.ReportComponentStatus(&component.InstanceID{}, ev) - // }() + go func() { + ev, _ := component.NewStatusEvent(component.StatusFatalError, component.WithError(assert.AnError)) + srv.host.notifyComponentStatusChange(&component.InstanceID{}, ev) + }() - // err = <-srv.host.asyncErrorChannel + err = <-srv.host.asyncErrorChannel - // require.Error(t, err) - // require.ErrorIs(t, err, assert.AnError) + require.Error(t, err) + require.ErrorIs(t, err, assert.AnError) } func assertResourceLabels(t *testing.T, res pcommon.Resource, expectedLabels map[string]labelValue) {