Skip to content

Commit

Permalink
Add extension/component management logic
Browse files Browse the repository at this point in the history
  • Loading branch information
mwear committed Jul 17, 2024
1 parent 2997a90 commit f4d8810
Show file tree
Hide file tree
Showing 4 changed files with 333 additions and 9 deletions.
27 changes: 27 additions & 0 deletions .chloggen/healthcheck-v2-ext.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: extension/healthcheckv2

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add extension/subcomponent management logic.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [26661]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
178 changes: 170 additions & 8 deletions extension/healthcheckv2extension/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,36 +7,198 @@ import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/extension"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/grpc"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/http"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/status"
)

type eventSourcePair struct {
source *component.InstanceID
event *component.StatusEvent
}

type healthCheckExtension struct {
config Config
telemetry component.TelemetrySettings
config Config
telemetry component.TelemetrySettings
aggregator *status.Aggregator
subcomponents []component.Component
eventCh chan *eventSourcePair
readyCh chan struct{}
}

var _ component.Component = (*healthCheckExtension)(nil)
var _ extension.ConfigWatcher = (*healthCheckExtension)(nil)
var _ extension.PipelineWatcher = (*healthCheckExtension)(nil)

func newExtension(
_ context.Context,
ctx context.Context,
config Config,
set extension.Settings,
) *healthCheckExtension {
return &healthCheckExtension{
config: config,
telemetry: set.TelemetrySettings,
var comps []component.Component

errPriority := status.PriorityPermanent
if config.ComponentHealthConfig != nil &&
config.ComponentHealthConfig.IncludeRecoverable &&
!config.ComponentHealthConfig.IncludePermanent {
errPriority = status.PriorityRecoverable
}

aggregator := status.NewAggregator(errPriority)

if config.UseV2 && config.GRPCConfig != nil {
grpcServer := grpc.NewServer(
config.GRPCConfig,
config.ComponentHealthConfig,
set.TelemetrySettings,
aggregator,
)
comps = append(comps, grpcServer)
}

if !config.UseV2 || config.UseV2 && config.HTTPConfig != nil {
httpServer := http.NewServer(
config.HTTPConfig,
config.LegacyConfig,
config.ComponentHealthConfig,
set.TelemetrySettings,
aggregator,
)
comps = append(comps, httpServer)
}

hc := &healthCheckExtension{
config: config,
subcomponents: comps,
telemetry: set.TelemetrySettings,
aggregator: aggregator,
eventCh: make(chan *eventSourcePair),
readyCh: make(chan struct{}),
}

// Start processing events in the background so that our status watcher doesn't
// block others before the extension starts.
go hc.eventLoop(ctx)

return hc
}

// Start implements the component.Component interface.
func (hc *healthCheckExtension) Start(context.Context, component.Host) error {
func (hc *healthCheckExtension) Start(ctx context.Context, host component.Host) error {
hc.telemetry.Logger.Debug("Starting health check extension V2", zap.Any("config", hc.config))

for _, comp := range hc.subcomponents {
if err := comp.Start(ctx, host); err != nil {
return err
}
}

return nil
}

// Shutdown implements the component.Component interface.
func (hc *healthCheckExtension) Shutdown(context.Context) error {
func (hc *healthCheckExtension) Shutdown(ctx context.Context) error {
// Preemptively send the stopped event, so it can be exported before shutdown
hc.telemetry.ReportStatus(component.NewStatusEvent(component.StatusStopped))

close(hc.eventCh)
hc.aggregator.Close()

var err error
for _, comp := range hc.subcomponents {
err = multierr.Append(err, comp.Shutdown(ctx))
}

return err
}

// ComponentStatusChanged implements the extension.StatusWatcher interface.
func (hc *healthCheckExtension) ComponentStatusChanged(
source *component.InstanceID,
event *component.StatusEvent,
) {
// There can be late arriving events after shutdown. We need to close
// the event channel so that this function doesn't block and we release all
// goroutines, but attempting to write to a closed channel will panic; log
// and recover.
defer func() {
if r := recover(); r != nil {
hc.telemetry.Logger.Info(
"discarding event received after shutdown",
zap.Any("source", source),
zap.Any("event", event),
)
}
}()
hc.eventCh <- &eventSourcePair{source: source, event: event}
}

// NotifyConfig implements the extension.ConfigWatcher interface.
func (hc *healthCheckExtension) NotifyConfig(ctx context.Context, conf *confmap.Conf) error {
var err error
for _, comp := range hc.subcomponents {
if cw, ok := comp.(extension.ConfigWatcher); ok {
err = multierr.Append(err, cw.NotifyConfig(ctx, conf))
}
}
return err
}

// Ready implements the extension.PipelineWatcher interface.
func (hc *healthCheckExtension) Ready() error {
close(hc.readyCh)
return nil
}

// NotReady implements the extension.PipelineWatcher interface.
func (hc *healthCheckExtension) NotReady() error {
return nil
}

func (hc *healthCheckExtension) eventLoop(ctx context.Context) {
// Record events with component.StatusStarting, but queue other events until
// PipelineWatcher.Ready is called. This prevents aggregate statuses from
// flapping between StatusStarting and StatusOK as components are started
// individually by the service.
var eventQueue []*eventSourcePair

for loop := true; loop; {
select {
case esp, ok := <-hc.eventCh:
if !ok {
return
}
if esp.event.Status() != component.StatusStarting {
eventQueue = append(eventQueue, esp)
continue
}
hc.aggregator.RecordStatus(esp.source, esp.event)
case <-hc.readyCh:
for _, esp := range eventQueue {
hc.aggregator.RecordStatus(esp.source, esp.event)
}
eventQueue = nil
loop = false
case <-ctx.Done():
return
}
}

// After PipelineWatcher.Ready, record statuses as they are received.
for {
select {
case esp, ok := <-hc.eventCh:
if !ok {
return
}
hc.aggregator.RecordStatus(esp.source, esp.event)
case <-ctx.Done():
return
}
}
}
135 changes: 135 additions & 0 deletions extension/healthcheckv2extension/extension_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package healthcheckv2extension

import (
"context"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/confmap/confmaptest"
"go.opentelemetry.io/collector/extension/extensiontest"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/status"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/testhelpers"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
)

func TestComponentStatus(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.HTTPConfig.Endpoint = testutil.GetAvailableLocalAddress(t)
cfg.GRPCConfig.NetAddr.Endpoint = testutil.GetAvailableLocalAddress(t)
cfg.UseV2 = true
ext := newExtension(context.Background(), *cfg, extensiontest.NewNopSettings())

// Status before Start will be StatusNone
st, ok := ext.aggregator.AggregateStatus(status.ScopeAll, status.Concise)
require.True(t, ok)
assert.Equal(t, st.Status(), component.StatusNone)

require.NoError(t, ext.Start(context.Background(), componenttest.NewNopHost()))

traces := testhelpers.NewPipelineMetadata("traces")

// StatusStarting will be sent immediately.
for _, id := range traces.InstanceIDs() {
ext.ComponentStatusChanged(id, component.NewStatusEvent(component.StatusStarting))
}

// StatusOK will be queued until the PipelineWatcher Ready method is called.
for _, id := range traces.InstanceIDs() {
ext.ComponentStatusChanged(id, component.NewStatusEvent(component.StatusOK))
}

// Note the use of assert.Eventually here and throughout this test is because
// status events are processed asynchronously in the background.
assert.Eventually(t, func() bool {
st, ok = ext.aggregator.AggregateStatus(status.ScopeAll, status.Concise)
require.True(t, ok)
return st.Status() == component.StatusStarting
}, time.Second, 10*time.Millisecond)

require.NoError(t, ext.Ready())

assert.Eventually(t, func() bool {
st, ok = ext.aggregator.AggregateStatus(status.ScopeAll, status.Concise)
require.True(t, ok)
return st.Status() == component.StatusOK
}, time.Second, 10*time.Millisecond)

// StatusStopping will be sent immediately.
for _, id := range traces.InstanceIDs() {
ext.ComponentStatusChanged(id, component.NewStatusEvent(component.StatusStopping))
}

assert.Eventually(t, func() bool {
st, ok = ext.aggregator.AggregateStatus(status.ScopeAll, status.Concise)
require.True(t, ok)
return st.Status() == component.StatusStopping
}, time.Second, 10*time.Millisecond)

require.NoError(t, ext.NotReady())
require.NoError(t, ext.Shutdown(context.Background()))

// Events sent after shutdown will be discarded
for _, id := range traces.InstanceIDs() {
ext.ComponentStatusChanged(id, component.NewStatusEvent(component.StatusStopped))
}

st, ok = ext.aggregator.AggregateStatus(status.ScopeAll, status.Concise)
require.True(t, ok)
assert.Equal(t, component.StatusStopping, st.Status())
}

func TestNotifyConfig(t *testing.T) {
confMap, err := confmaptest.LoadConf(
filepath.Join("internal", "http", "testdata", "config.yaml"),
)
require.NoError(t, err)
confJSON, err := os.ReadFile(
filepath.Clean(filepath.Join("internal", "http", "testdata", "config.json")),
)
require.NoError(t, err)

endpoint := testutil.GetAvailableLocalAddress(t)

cfg := createDefaultConfig().(*Config)
cfg.UseV2 = true
cfg.HTTPConfig.Endpoint = endpoint
cfg.HTTPConfig.Config.Enabled = true
cfg.HTTPConfig.Config.Path = "/config"

ext := newExtension(context.Background(), *cfg, extensiontest.NewNopSettings())

require.NoError(t, ext.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() { require.NoError(t, ext.Shutdown(context.Background())) })

client := &http.Client{}
url := fmt.Sprintf("http://%s/config", endpoint)

var resp *http.Response

resp, err = client.Get(url)
require.NoError(t, err)
assert.Equal(t, http.StatusServiceUnavailable, resp.StatusCode)

require.NoError(t, ext.NotifyConfig(context.Background(), confMap))

resp, err = client.Get(url)
require.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)

body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
assert.Equal(t, confJSON, body)
}
2 changes: 1 addition & 1 deletion extension/healthcheckv2extension/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
go.opentelemetry.io/otel/metric v1.28.0
go.opentelemetry.io/otel/trace v1.28.0
go.uber.org/goleak v1.3.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
google.golang.org/grpc v1.65.0
)
Expand Down Expand Up @@ -62,7 +63,6 @@ require (
go.opentelemetry.io/otel/exporters/prometheus v0.50.0 // indirect
go.opentelemetry.io/otel/sdk v1.28.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.28.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect
Expand Down

0 comments on commit f4d8810

Please sign in to comment.