diff --git a/.chloggen/healthcheck-v2-ext.yaml b/.chloggen/healthcheck-v2-ext.yaml new file mode 100644 index 000000000000..a504cad4e971 --- /dev/null +++ b/.chloggen/healthcheck-v2-ext.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: extension/healthcheckv2 + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add extension/subcomponent management logic. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [26661] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/extension/healthcheckv2extension/extension.go b/extension/healthcheckv2extension/extension.go index 033a481f00dd..6cecded478c4 100644 --- a/extension/healthcheckv2extension/extension.go +++ b/extension/healthcheckv2extension/extension.go @@ -7,36 +7,198 @@ import ( "context" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/extension" + "go.uber.org/multierr" "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/grpc" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/http" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/status" ) +type eventSourcePair struct { + source *component.InstanceID + event *component.StatusEvent +} + type healthCheckExtension struct { - config Config - telemetry component.TelemetrySettings + config Config + telemetry component.TelemetrySettings + aggregator *status.Aggregator + subcomponents []component.Component + eventCh chan *eventSourcePair + readyCh chan struct{} } var _ component.Component = (*healthCheckExtension)(nil) +var _ extension.ConfigWatcher = (*healthCheckExtension)(nil) +var _ extension.PipelineWatcher = (*healthCheckExtension)(nil) func newExtension( - _ context.Context, + ctx context.Context, config Config, set extension.Settings, ) *healthCheckExtension { - return &healthCheckExtension{ - config: config, - telemetry: set.TelemetrySettings, + var comps []component.Component + + errPriority := status.PriorityPermanent + if config.ComponentHealthConfig != nil && + config.ComponentHealthConfig.IncludeRecoverable && + !config.ComponentHealthConfig.IncludePermanent { + errPriority = status.PriorityRecoverable + } + + aggregator := status.NewAggregator(errPriority) + + if config.UseV2 && config.GRPCConfig != nil { + grpcServer := grpc.NewServer( + config.GRPCConfig, + config.ComponentHealthConfig, + set.TelemetrySettings, + aggregator, + ) + comps = append(comps, grpcServer) + } + + if !config.UseV2 || config.UseV2 && config.HTTPConfig != nil { + httpServer := http.NewServer( + config.HTTPConfig, + config.LegacyConfig, + config.ComponentHealthConfig, + set.TelemetrySettings, + aggregator, + ) + comps = append(comps, httpServer) } + + hc := &healthCheckExtension{ + config: config, + subcomponents: comps, + telemetry: set.TelemetrySettings, + aggregator: aggregator, + eventCh: make(chan *eventSourcePair), + readyCh: make(chan struct{}), + } + + // Start processing events in the background so that our status watcher doesn't + // block others before the extension starts. + go hc.eventLoop(ctx) + + return hc } // Start implements the component.Component interface. -func (hc *healthCheckExtension) Start(context.Context, component.Host) error { +func (hc *healthCheckExtension) Start(ctx context.Context, host component.Host) error { hc.telemetry.Logger.Debug("Starting health check extension V2", zap.Any("config", hc.config)) + for _, comp := range hc.subcomponents { + if err := comp.Start(ctx, host); err != nil { + return err + } + } + return nil } // Shutdown implements the component.Component interface. -func (hc *healthCheckExtension) Shutdown(context.Context) error { +func (hc *healthCheckExtension) Shutdown(ctx context.Context) error { + // Preemptively send the stopped event, so it can be exported before shutdown + hc.telemetry.ReportStatus(component.NewStatusEvent(component.StatusStopped)) + + close(hc.eventCh) + hc.aggregator.Close() + + var err error + for _, comp := range hc.subcomponents { + err = multierr.Append(err, comp.Shutdown(ctx)) + } + + return err +} + +// ComponentStatusChanged implements the extension.StatusWatcher interface. +func (hc *healthCheckExtension) ComponentStatusChanged( + source *component.InstanceID, + event *component.StatusEvent, +) { + // There can be late arriving events after shutdown. We need to close + // the event channel so that this function doesn't block and we release all + // goroutines, but attempting to write to a closed channel will panic; log + // and recover. + defer func() { + if r := recover(); r != nil { + hc.telemetry.Logger.Info( + "discarding event received after shutdown", + zap.Any("source", source), + zap.Any("event", event), + ) + } + }() + hc.eventCh <- &eventSourcePair{source: source, event: event} +} + +// NotifyConfig implements the extension.ConfigWatcher interface. +func (hc *healthCheckExtension) NotifyConfig(ctx context.Context, conf *confmap.Conf) error { + var err error + for _, comp := range hc.subcomponents { + if cw, ok := comp.(extension.ConfigWatcher); ok { + err = multierr.Append(err, cw.NotifyConfig(ctx, conf)) + } + } + return err +} + +// Ready implements the extension.PipelineWatcher interface. +func (hc *healthCheckExtension) Ready() error { + close(hc.readyCh) return nil } + +// NotReady implements the extension.PipelineWatcher interface. +func (hc *healthCheckExtension) NotReady() error { + return nil +} + +func (hc *healthCheckExtension) eventLoop(ctx context.Context) { + // Record events with component.StatusStarting, but queue other events until + // PipelineWatcher.Ready is called. This prevents aggregate statuses from + // flapping between StatusStarting and StatusOK as components are started + // individually by the service. + var eventQueue []*eventSourcePair + + for loop := true; loop; { + select { + case esp, ok := <-hc.eventCh: + if !ok { + return + } + if esp.event.Status() != component.StatusStarting { + eventQueue = append(eventQueue, esp) + continue + } + hc.aggregator.RecordStatus(esp.source, esp.event) + case <-hc.readyCh: + for _, esp := range eventQueue { + hc.aggregator.RecordStatus(esp.source, esp.event) + } + eventQueue = nil + loop = false + case <-ctx.Done(): + return + } + } + + // After PipelineWatcher.Ready, record statuses as they are received. + for { + select { + case esp, ok := <-hc.eventCh: + if !ok { + return + } + hc.aggregator.RecordStatus(esp.source, esp.event) + case <-ctx.Done(): + return + } + } +} diff --git a/extension/healthcheckv2extension/extension_test.go b/extension/healthcheckv2extension/extension_test.go new file mode 100644 index 000000000000..b988172afcd9 --- /dev/null +++ b/extension/healthcheckv2extension/extension_test.go @@ -0,0 +1,135 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package healthcheckv2extension + +import ( + "context" + "fmt" + "io" + "net/http" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/confmap/confmaptest" + "go.opentelemetry.io/collector/extension/extensiontest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/status" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/testhelpers" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" +) + +func TestComponentStatus(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.HTTPConfig.Endpoint = testutil.GetAvailableLocalAddress(t) + cfg.GRPCConfig.NetAddr.Endpoint = testutil.GetAvailableLocalAddress(t) + cfg.UseV2 = true + ext := newExtension(context.Background(), *cfg, extensiontest.NewNopSettings()) + + // Status before Start will be StatusNone + st, ok := ext.aggregator.AggregateStatus(status.ScopeAll, status.Concise) + require.True(t, ok) + assert.Equal(t, st.Status(), component.StatusNone) + + require.NoError(t, ext.Start(context.Background(), componenttest.NewNopHost())) + + traces := testhelpers.NewPipelineMetadata("traces") + + // StatusStarting will be sent immediately. + for _, id := range traces.InstanceIDs() { + ext.ComponentStatusChanged(id, component.NewStatusEvent(component.StatusStarting)) + } + + // StatusOK will be queued until the PipelineWatcher Ready method is called. + for _, id := range traces.InstanceIDs() { + ext.ComponentStatusChanged(id, component.NewStatusEvent(component.StatusOK)) + } + + // Note the use of assert.Eventually here and throughout this test is because + // status events are processed asynchronously in the background. + assert.Eventually(t, func() bool { + st, ok = ext.aggregator.AggregateStatus(status.ScopeAll, status.Concise) + require.True(t, ok) + return st.Status() == component.StatusStarting + }, time.Second, 10*time.Millisecond) + + require.NoError(t, ext.Ready()) + + assert.Eventually(t, func() bool { + st, ok = ext.aggregator.AggregateStatus(status.ScopeAll, status.Concise) + require.True(t, ok) + return st.Status() == component.StatusOK + }, time.Second, 10*time.Millisecond) + + // StatusStopping will be sent immediately. + for _, id := range traces.InstanceIDs() { + ext.ComponentStatusChanged(id, component.NewStatusEvent(component.StatusStopping)) + } + + assert.Eventually(t, func() bool { + st, ok = ext.aggregator.AggregateStatus(status.ScopeAll, status.Concise) + require.True(t, ok) + return st.Status() == component.StatusStopping + }, time.Second, 10*time.Millisecond) + + require.NoError(t, ext.NotReady()) + require.NoError(t, ext.Shutdown(context.Background())) + + // Events sent after shutdown will be discarded + for _, id := range traces.InstanceIDs() { + ext.ComponentStatusChanged(id, component.NewStatusEvent(component.StatusStopped)) + } + + st, ok = ext.aggregator.AggregateStatus(status.ScopeAll, status.Concise) + require.True(t, ok) + assert.Equal(t, component.StatusStopping, st.Status()) +} + +func TestNotifyConfig(t *testing.T) { + confMap, err := confmaptest.LoadConf( + filepath.Join("internal", "http", "testdata", "config.yaml"), + ) + require.NoError(t, err) + confJSON, err := os.ReadFile( + filepath.Clean(filepath.Join("internal", "http", "testdata", "config.json")), + ) + require.NoError(t, err) + + endpoint := testutil.GetAvailableLocalAddress(t) + + cfg := createDefaultConfig().(*Config) + cfg.UseV2 = true + cfg.HTTPConfig.Endpoint = endpoint + cfg.HTTPConfig.Config.Enabled = true + cfg.HTTPConfig.Config.Path = "/config" + + ext := newExtension(context.Background(), *cfg, extensiontest.NewNopSettings()) + + require.NoError(t, ext.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { require.NoError(t, ext.Shutdown(context.Background())) }) + + client := &http.Client{} + url := fmt.Sprintf("http://%s/config", endpoint) + + var resp *http.Response + + resp, err = client.Get(url) + require.NoError(t, err) + assert.Equal(t, http.StatusServiceUnavailable, resp.StatusCode) + + require.NoError(t, ext.NotifyConfig(context.Background(), confMap)) + + resp, err = client.Get(url) + require.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + assert.Equal(t, confJSON, body) +} diff --git a/extension/healthcheckv2extension/go.mod b/extension/healthcheckv2extension/go.mod index 7065e6499325..4a89b3c1c53b 100644 --- a/extension/healthcheckv2extension/go.mod +++ b/extension/healthcheckv2extension/go.mod @@ -15,6 +15,7 @@ require ( go.opentelemetry.io/otel/metric v1.28.0 go.opentelemetry.io/otel/trace v1.28.0 go.uber.org/goleak v1.3.0 + go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 google.golang.org/grpc v1.65.0 ) @@ -62,7 +63,6 @@ require ( go.opentelemetry.io/otel/exporters/prometheus v0.50.0 // indirect go.opentelemetry.io/otel/sdk v1.28.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.28.0 // indirect - go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.27.0 // indirect golang.org/x/sys v0.22.0 // indirect golang.org/x/text v0.16.0 // indirect