Skip to content

Commit

Permalink
[extension/healthcheckv2] Manage extension and subcomponents (#34144)
Browse files Browse the repository at this point in the history
**Description:** <Describe what has changed.>
The PR is the sixth and final in a series to decompose #30673 into more
manageable pieces for review. This PR adds code to wire up the extension
as a status watcher and to manage the HTTP and gRPC services as
independent subcomponents. After this merges the extension will be
usable for evaluation purposes and we will likely make some refinements
based on feedback from early adopters.

**Link to tracking Issue:** #26661

**Testing:** Units / manual

**Documentation:** Comments, etc.
  • Loading branch information
mwear authored Jul 18, 2024
1 parent 2997a90 commit 54af4bd
Show file tree
Hide file tree
Showing 4 changed files with 333 additions and 9 deletions.
27 changes: 27 additions & 0 deletions .chloggen/healthcheck-v2-ext.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: extension/healthcheckv2

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add extension/subcomponent management logic.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [26661]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
178 changes: 170 additions & 8 deletions extension/healthcheckv2extension/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,36 +7,198 @@ import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/extension"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/grpc"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/http"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/status"
)

type eventSourcePair struct {
source *component.InstanceID
event *component.StatusEvent
}

type healthCheckExtension struct {
config Config
telemetry component.TelemetrySettings
config Config
telemetry component.TelemetrySettings
aggregator *status.Aggregator
subcomponents []component.Component
eventCh chan *eventSourcePair
readyCh chan struct{}
}

var _ component.Component = (*healthCheckExtension)(nil)
var _ extension.ConfigWatcher = (*healthCheckExtension)(nil)
var _ extension.PipelineWatcher = (*healthCheckExtension)(nil)

func newExtension(
_ context.Context,
ctx context.Context,
config Config,
set extension.Settings,
) *healthCheckExtension {
return &healthCheckExtension{
config: config,
telemetry: set.TelemetrySettings,
var comps []component.Component

errPriority := status.PriorityPermanent
if config.ComponentHealthConfig != nil &&
config.ComponentHealthConfig.IncludeRecoverable &&
!config.ComponentHealthConfig.IncludePermanent {
errPriority = status.PriorityRecoverable
}

aggregator := status.NewAggregator(errPriority)

if config.UseV2 && config.GRPCConfig != nil {
grpcServer := grpc.NewServer(
config.GRPCConfig,
config.ComponentHealthConfig,
set.TelemetrySettings,
aggregator,
)
comps = append(comps, grpcServer)
}

if !config.UseV2 || config.UseV2 && config.HTTPConfig != nil {
httpServer := http.NewServer(
config.HTTPConfig,
config.LegacyConfig,
config.ComponentHealthConfig,
set.TelemetrySettings,
aggregator,
)
comps = append(comps, httpServer)
}

hc := &healthCheckExtension{
config: config,
subcomponents: comps,
telemetry: set.TelemetrySettings,
aggregator: aggregator,
eventCh: make(chan *eventSourcePair),
readyCh: make(chan struct{}),
}

// Start processing events in the background so that our status watcher doesn't
// block others before the extension starts.
go hc.eventLoop(ctx)

return hc
}

// Start implements the component.Component interface.
func (hc *healthCheckExtension) Start(context.Context, component.Host) error {
func (hc *healthCheckExtension) Start(ctx context.Context, host component.Host) error {
hc.telemetry.Logger.Debug("Starting health check extension V2", zap.Any("config", hc.config))

for _, comp := range hc.subcomponents {
if err := comp.Start(ctx, host); err != nil {
return err
}
}

return nil
}

// Shutdown implements the component.Component interface.
func (hc *healthCheckExtension) Shutdown(context.Context) error {
func (hc *healthCheckExtension) Shutdown(ctx context.Context) error {
// Preemptively send the stopped event, so it can be exported before shutdown
hc.telemetry.ReportStatus(component.NewStatusEvent(component.StatusStopped))

close(hc.eventCh)
hc.aggregator.Close()

var err error
for _, comp := range hc.subcomponents {
err = multierr.Append(err, comp.Shutdown(ctx))
}

return err
}

// ComponentStatusChanged implements the extension.StatusWatcher interface.
func (hc *healthCheckExtension) ComponentStatusChanged(
source *component.InstanceID,
event *component.StatusEvent,
) {
// There can be late arriving events after shutdown. We need to close
// the event channel so that this function doesn't block and we release all
// goroutines, but attempting to write to a closed channel will panic; log
// and recover.
defer func() {
if r := recover(); r != nil {
hc.telemetry.Logger.Info(
"discarding event received after shutdown",
zap.Any("source", source),
zap.Any("event", event),
)
}
}()
hc.eventCh <- &eventSourcePair{source: source, event: event}
}

// NotifyConfig implements the extension.ConfigWatcher interface.
func (hc *healthCheckExtension) NotifyConfig(ctx context.Context, conf *confmap.Conf) error {
var err error
for _, comp := range hc.subcomponents {
if cw, ok := comp.(extension.ConfigWatcher); ok {
err = multierr.Append(err, cw.NotifyConfig(ctx, conf))
}
}
return err
}

// Ready implements the extension.PipelineWatcher interface.
func (hc *healthCheckExtension) Ready() error {
close(hc.readyCh)
return nil
}

// NotReady implements the extension.PipelineWatcher interface.
func (hc *healthCheckExtension) NotReady() error {
return nil
}

func (hc *healthCheckExtension) eventLoop(ctx context.Context) {
// Record events with component.StatusStarting, but queue other events until
// PipelineWatcher.Ready is called. This prevents aggregate statuses from
// flapping between StatusStarting and StatusOK as components are started
// individually by the service.
var eventQueue []*eventSourcePair

for loop := true; loop; {
select {
case esp, ok := <-hc.eventCh:
if !ok {
return
}
if esp.event.Status() != component.StatusStarting {
eventQueue = append(eventQueue, esp)
continue
}
hc.aggregator.RecordStatus(esp.source, esp.event)
case <-hc.readyCh:
for _, esp := range eventQueue {
hc.aggregator.RecordStatus(esp.source, esp.event)
}
eventQueue = nil
loop = false
case <-ctx.Done():
return
}
}

// After PipelineWatcher.Ready, record statuses as they are received.
for {
select {
case esp, ok := <-hc.eventCh:
if !ok {
return
}
hc.aggregator.RecordStatus(esp.source, esp.event)
case <-ctx.Done():
return
}
}
}
135 changes: 135 additions & 0 deletions extension/healthcheckv2extension/extension_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package healthcheckv2extension

import (
"context"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/confmap/confmaptest"
"go.opentelemetry.io/collector/extension/extensiontest"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/status"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/testhelpers"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
)

func TestComponentStatus(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.HTTPConfig.Endpoint = testutil.GetAvailableLocalAddress(t)
cfg.GRPCConfig.NetAddr.Endpoint = testutil.GetAvailableLocalAddress(t)
cfg.UseV2 = true
ext := newExtension(context.Background(), *cfg, extensiontest.NewNopSettings())

// Status before Start will be StatusNone
st, ok := ext.aggregator.AggregateStatus(status.ScopeAll, status.Concise)
require.True(t, ok)
assert.Equal(t, st.Status(), component.StatusNone)

require.NoError(t, ext.Start(context.Background(), componenttest.NewNopHost()))

traces := testhelpers.NewPipelineMetadata("traces")

// StatusStarting will be sent immediately.
for _, id := range traces.InstanceIDs() {
ext.ComponentStatusChanged(id, component.NewStatusEvent(component.StatusStarting))
}

// StatusOK will be queued until the PipelineWatcher Ready method is called.
for _, id := range traces.InstanceIDs() {
ext.ComponentStatusChanged(id, component.NewStatusEvent(component.StatusOK))
}

// Note the use of assert.Eventually here and throughout this test is because
// status events are processed asynchronously in the background.
assert.Eventually(t, func() bool {
st, ok = ext.aggregator.AggregateStatus(status.ScopeAll, status.Concise)
require.True(t, ok)
return st.Status() == component.StatusStarting
}, time.Second, 10*time.Millisecond)

require.NoError(t, ext.Ready())

assert.Eventually(t, func() bool {
st, ok = ext.aggregator.AggregateStatus(status.ScopeAll, status.Concise)
require.True(t, ok)
return st.Status() == component.StatusOK
}, time.Second, 10*time.Millisecond)

// StatusStopping will be sent immediately.
for _, id := range traces.InstanceIDs() {
ext.ComponentStatusChanged(id, component.NewStatusEvent(component.StatusStopping))
}

assert.Eventually(t, func() bool {
st, ok = ext.aggregator.AggregateStatus(status.ScopeAll, status.Concise)
require.True(t, ok)
return st.Status() == component.StatusStopping
}, time.Second, 10*time.Millisecond)

require.NoError(t, ext.NotReady())
require.NoError(t, ext.Shutdown(context.Background()))

// Events sent after shutdown will be discarded
for _, id := range traces.InstanceIDs() {
ext.ComponentStatusChanged(id, component.NewStatusEvent(component.StatusStopped))
}

st, ok = ext.aggregator.AggregateStatus(status.ScopeAll, status.Concise)
require.True(t, ok)
assert.Equal(t, component.StatusStopping, st.Status())
}

func TestNotifyConfig(t *testing.T) {
confMap, err := confmaptest.LoadConf(
filepath.Join("internal", "http", "testdata", "config.yaml"),
)
require.NoError(t, err)
confJSON, err := os.ReadFile(
filepath.Clean(filepath.Join("internal", "http", "testdata", "config.json")),
)
require.NoError(t, err)

endpoint := testutil.GetAvailableLocalAddress(t)

cfg := createDefaultConfig().(*Config)
cfg.UseV2 = true
cfg.HTTPConfig.Endpoint = endpoint
cfg.HTTPConfig.Config.Enabled = true
cfg.HTTPConfig.Config.Path = "/config"

ext := newExtension(context.Background(), *cfg, extensiontest.NewNopSettings())

require.NoError(t, ext.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() { require.NoError(t, ext.Shutdown(context.Background())) })

client := &http.Client{}
url := fmt.Sprintf("http://%s/config", endpoint)

var resp *http.Response

resp, err = client.Get(url)
require.NoError(t, err)
assert.Equal(t, http.StatusServiceUnavailable, resp.StatusCode)

require.NoError(t, ext.NotifyConfig(context.Background(), confMap))

resp, err = client.Get(url)
require.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)

body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
assert.Equal(t, confJSON, body)
}
2 changes: 1 addition & 1 deletion extension/healthcheckv2extension/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
go.opentelemetry.io/otel/metric v1.28.0
go.opentelemetry.io/otel/trace v1.28.0
go.uber.org/goleak v1.3.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
google.golang.org/grpc v1.65.0
)
Expand Down Expand Up @@ -62,7 +63,6 @@ require (
go.opentelemetry.io/otel/exporters/prometheus v0.50.0 // indirect
go.opentelemetry.io/otel/sdk v1.28.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.28.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect
Expand Down

0 comments on commit 54af4bd

Please sign in to comment.