Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cmd/opampsupervisor]: Supervisor waits for configurable healthchecks to report remote config status #34907

Merged
merged 29 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
697f96e
[cmd/opampsupervisor]: Supervisor waits for configurable healthchecks…
srikanthccv Aug 28, 2024
9259bf6
Merge branch 'main' into issue_21079
srikanthccv Aug 28, 2024
a74ef23
Update agent config validation
srikanthccv Aug 30, 2024
9f83104
Merge branch 'issue_21079' of github.com:srikanthccv/opentelemetry-co…
srikanthccv Aug 30, 2024
136b2b6
Review comments
srikanthccv Sep 6, 2024
8e85f7b
Frequent checks for subsequent asserts
srikanthccv Sep 6, 2024
3cabc23
Resolve conflicts
srikanthccv Sep 6, 2024
8208ca2
Merge branch 'main' into issue_21079
srikanthccv Sep 11, 2024
fd57ea9
resolve conflicts
srikanthccv Oct 8, 2024
88da95f
resolve conflicts again
srikanthccv Oct 8, 2024
04261f3
Fix tests
srikanthccv Oct 8, 2024
6c7f617
Merge branch 'issue_21079' of github.com:srikanthccv/opentelemetry-co…
srikanthccv Oct 8, 2024
dedd6a0
Merge branch 'main' into issue_21079
srikanthccv Oct 8, 2024
169d25f
Merge branch 'main' into issue_21079
srikanthccv Oct 16, 2024
8d56b88
Fix tests
srikanthccv Oct 16, 2024
6752571
Merge branch 'issue_21079' of github.com:srikanthccv/opentelemetry-co…
srikanthccv Oct 16, 2024
a149e5f
Remove unnecessary check
srikanthccv Oct 16, 2024
873c072
Add CHANGELOG entry
srikanthccv Oct 16, 2024
b9b4d20
Merge branch 'main' into issue_21079
srikanthccv Oct 17, 2024
38016ff
Merge branch 'main' into issue_21079
srikanthccv Oct 20, 2024
e317745
Use agent health from opamp extension for config status report
srikanthccv Oct 20, 2024
fde058d
Merge branch 'issue_21079' of github.com:srikanthccv/opentelemetry-co…
srikanthccv Oct 20, 2024
165f1a2
go mod tidy
srikanthccv Oct 20, 2024
b7dbd63
Merge branch 'main' into issue_21079
srikanthccv Oct 30, 2024
2e0908d
Remove health check interval option
srikanthccv Oct 30, 2024
3a61771
Merge branch 'issue_21079' of github.com:srikanthccv/opentelemetry-co…
srikanthccv Oct 30, 2024
4c43fed
Update config_test
srikanthccv Oct 30, 2024
89f143e
Remove removed interval refs
srikanthccv Oct 30, 2024
6e3f678
Resolve conflicts
srikanthccv Oct 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/opamp_21079_configurable_health_checks.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: opampsupervisor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Supervisor waits for configurable healthchecks to report remote config status.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [21079]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
127 changes: 126 additions & 1 deletion cmd/opampsupervisor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,10 @@ func newSupervisor(t *testing.T, configType string, extraConfigData map[string]s
cfg, err := config.Load(cfgFile.Name())
require.NoError(t, err)

s, err := supervisor.NewSupervisor(zap.NewNop(), cfg)
logger, err := zap.NewDevelopment()
require.NoError(t, err)

s, err := supervisor.NewSupervisor(logger, cfg)
require.NoError(t, err)

return s
Expand Down Expand Up @@ -1443,6 +1446,128 @@ func TestSupervisorLogging(t *testing.T) {
require.NoError(t, logFile.Close())
}

func TestSupervisorRemoteConfigApplyStatus(t *testing.T) {
srikanthccv marked this conversation as resolved.
Show resolved Hide resolved
var agentConfig atomic.Value
var healthReport atomic.Value
var remoteConfigStatus atomic.Value
server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if message.EffectiveConfig != nil {
config := message.EffectiveConfig.ConfigMap.ConfigMap[""]
if config != nil {
agentConfig.Store(string(config.Body))
}
}
if message.Health != nil {
healthReport.Store(message.Health)
}
if message.RemoteConfigStatus != nil {
remoteConfigStatus.Store(message.RemoteConfigStatus)
}

return &protobufs.ServerToAgent{}
},
})

s := newSupervisor(t, "report_status", map[string]string{
"url": server.addr,
"config_apply_timeout": "3s",
})
require.Nil(t, s.Start())
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)

cfg, hash, inputFile, outputFile := createSimplePipelineCollectorConf(t)

server.sendToSupervisor(&protobufs.ServerToAgent{
RemoteConfig: &protobufs.AgentRemoteConfig{
Config: &protobufs.AgentConfigMap{
ConfigMap: map[string]*protobufs.AgentConfigFile{
"": {Body: cfg.Bytes()},
},
},
ConfigHash: hash,
},
})

// Check that the status is set to APPLYING
require.Eventually(t, func() bool {
status, ok := remoteConfigStatus.Load().(*protobufs.RemoteConfigStatus)
return ok && status.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLYING
}, 5*time.Second, 100*time.Millisecond, "Remote config status was not set to APPLYING")

// Wait for collector to become healthy
require.Eventually(t, func() bool {
health, ok := healthReport.Load().(*protobufs.ComponentHealth)
return ok && health.Healthy
}, 10*time.Second, 10*time.Millisecond, "Collector did not become healthy")

// Check that the status is set to APPLIED
require.Eventually(t, func() bool {
status, ok := remoteConfigStatus.Load().(*protobufs.RemoteConfigStatus)
return ok && status.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED
}, 5*time.Second, 10*time.Millisecond, "Remote config status was not set to APPLIED")

require.Eventually(t, func() bool {
cfg, ok := agentConfig.Load().(string)
if ok {
// The effective config may be structurally different compared to what was sent,
// and will also have some data redacted,
// so just check that it includes the filelog receiver
return strings.Contains(cfg, "filelog")
}

return false
}, 5*time.Second, 10*time.Millisecond, "Collector was not started with remote config")

n, err := inputFile.WriteString("{\"body\":\"hello, world\"}\n")
require.NotZero(t, n, "Could not write to input file")
require.NoError(t, err)

require.Eventually(t, func() bool {
logRecord := make([]byte, 1024)
n, _ := outputFile.Read(logRecord)

return n != 0
}, 10*time.Second, 100*time.Millisecond, "Log never appeared in output")

// Test with bad configuration
badCfg, badHash := createBadCollectorConf(t)

server.sendToSupervisor(&protobufs.ServerToAgent{
RemoteConfig: &protobufs.AgentRemoteConfig{
Config: &protobufs.AgentConfigMap{
ConfigMap: map[string]*protobufs.AgentConfigFile{
"": {Body: badCfg.Bytes()},
},
},
ConfigHash: badHash,
},
})

// Check that the status is set to APPLYING
require.Eventually(t, func() bool {
status, ok := remoteConfigStatus.Load().(*protobufs.RemoteConfigStatus)
return ok && status.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLYING
}, 5*time.Second, 200*time.Millisecond, "Remote config status was not set to APPLYING for bad config")

// Wait for the health checks to fail
require.Eventually(t, func() bool {
health, ok := healthReport.Load().(*protobufs.ComponentHealth)
return ok && !health.Healthy
}, 30*time.Second, 100*time.Millisecond, "Collector did not become unhealthy with bad config")

// Check that the status is set to FAILED after failed health checks
require.Eventually(t, func() bool {
status, ok := remoteConfigStatus.Load().(*protobufs.RemoteConfigStatus)
return ok && status.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED
}, 15*time.Second, 100*time.Millisecond, "Remote config status was not set to FAILED for bad config")
}

func TestSupervisorOpAmpServerPort(t *testing.T) {
var agentConfig atomic.Value
server := newOpAMPServer(
Expand Down
6 changes: 6 additions & 0 deletions cmd/opampsupervisor/supervisor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ type Agent struct {
Executable string
OrphanDetectionInterval time.Duration `mapstructure:"orphan_detection_interval"`
Description AgentDescription `mapstructure:"description"`
ConfigApplyTimeout time.Duration `mapstructure:"config_apply_timeout"`
BootstrapTimeout time.Duration `mapstructure:"bootstrap_timeout"`
HealthCheckPort int `mapstructure:"health_check_port"`
OpAMPServerPort int `mapstructure:"opamp_server_port"`
Expand Down Expand Up @@ -185,6 +186,10 @@ func (a Agent) Validate() error {
return fmt.Errorf("could not stat agent::executable path: %w", err)
}

if a.ConfigApplyTimeout <= 0 {
return errors.New("agent::config_apply_timeout must be valid duration")
}

return nil
}

Expand Down Expand Up @@ -234,6 +239,7 @@ func DefaultSupervisor() Supervisor {
},
Agent: Agent{
OrphanDetectionInterval: 5 * time.Second,
ConfigApplyTimeout: 5 * time.Second,
BootstrapTimeout: 3 * time.Second,
PassthroughLogs: false,
},
Expand Down
41 changes: 41 additions & 0 deletions cmd/opampsupervisor/supervisor/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func TestValidate(t *testing.T) {
Agent: Agent{
Executable: "${file_path}",
OrphanDetectionInterval: 5 * time.Second,
ConfigApplyTimeout: 2 * time.Second,
BootstrapTimeout: 5 * time.Second,
},
Capabilities: Capabilities{
Expand All @@ -59,6 +60,7 @@ func TestValidate(t *testing.T) {
},
Agent: Agent{
Executable: "${file_path}",
ConfigApplyTimeout: 2 * time.Second,
OrphanDetectionInterval: 5 * time.Second,
},
Capabilities: Capabilities{
Expand All @@ -84,6 +86,7 @@ func TestValidate(t *testing.T) {
},
Agent: Agent{
Executable: "${file_path}",
ConfigApplyTimeout: 2 * time.Second,
OrphanDetectionInterval: 5 * time.Second,
},
Capabilities: Capabilities{
Expand All @@ -109,6 +112,7 @@ func TestValidate(t *testing.T) {
},
Agent: Agent{
Executable: "${file_path}",
ConfigApplyTimeout: 2 * time.Second,
OrphanDetectionInterval: 5 * time.Second,
},
Capabilities: Capabilities{
Expand Down Expand Up @@ -138,6 +142,7 @@ func TestValidate(t *testing.T) {
},
Agent: Agent{
Executable: "${file_path}",
ConfigApplyTimeout: 2 * time.Second,
OrphanDetectionInterval: 5 * time.Second,
},
Capabilities: Capabilities{
Expand All @@ -164,6 +169,7 @@ func TestValidate(t *testing.T) {
Agent: Agent{
Executable: "",
OrphanDetectionInterval: 5 * time.Second,
ConfigApplyTimeout: 2 * time.Second,
BootstrapTimeout: 5 * time.Second,
},
Capabilities: Capabilities{
Expand All @@ -190,6 +196,7 @@ func TestValidate(t *testing.T) {
Agent: Agent{
Executable: "./path/does/not/exist",
OrphanDetectionInterval: 5 * time.Second,
ConfigApplyTimeout: 2 * time.Second,
BootstrapTimeout: 5 * time.Second,
},
Capabilities: Capabilities{
Expand All @@ -215,6 +222,7 @@ func TestValidate(t *testing.T) {
},
Agent: Agent{
Executable: "${file_path}",
ConfigApplyTimeout: 2 * time.Second,
OrphanDetectionInterval: -1,
},
Capabilities: Capabilities{
Expand Down Expand Up @@ -242,6 +250,7 @@ func TestValidate(t *testing.T) {
Executable: "${file_path}",
OrphanDetectionInterval: 5 * time.Second,
HealthCheckPort: 65536,
ConfigApplyTimeout: 2 * time.Second,
BootstrapTimeout: 5 * time.Second,
},
Capabilities: Capabilities{
Expand Down Expand Up @@ -269,6 +278,7 @@ func TestValidate(t *testing.T) {
Executable: "${file_path}",
OrphanDetectionInterval: 5 * time.Second,
HealthCheckPort: 0,
ConfigApplyTimeout: 2 * time.Second,
BootstrapTimeout: 5 * time.Second,
},
Capabilities: Capabilities{
Expand All @@ -295,6 +305,7 @@ func TestValidate(t *testing.T) {
Executable: "${file_path}",
OrphanDetectionInterval: 5 * time.Second,
HealthCheckPort: 29848,
ConfigApplyTimeout: 2 * time.Second,
BootstrapTimeout: 5 * time.Second,
},
Capabilities: Capabilities{
Expand All @@ -320,6 +331,7 @@ func TestValidate(t *testing.T) {
Agent: Agent{
Executable: "${file_path}",
OrphanDetectionInterval: 5 * time.Second,
ConfigApplyTimeout: 2 * time.Second,
BootstrapTimeout: -5 * time.Second,
},
Capabilities: Capabilities{
Expand All @@ -343,6 +355,7 @@ func TestValidate(t *testing.T) {
Agent: Agent{
Executable: "${file_path}",
OrphanDetectionInterval: 5 * time.Second,
ConfigApplyTimeout: 2 * time.Second,
OpAMPServerPort: 65536,
BootstrapTimeout: 5 * time.Second,
},
Expand All @@ -367,6 +380,7 @@ func TestValidate(t *testing.T) {
Agent: Agent{
Executable: "${file_path}",
OrphanDetectionInterval: 5 * time.Second,
ConfigApplyTimeout: 2 * time.Second,
OpAMPServerPort: 0,
BootstrapTimeout: 5 * time.Second,
},
Expand All @@ -378,6 +392,33 @@ func TestValidate(t *testing.T) {
},
},
},
{
name: "Invalid config apply timeout",
config: Supervisor{
Server: OpAMPServer{
Endpoint: "wss://localhost:9090/opamp",
Headers: http.Header{
"Header1": []string{"HeaderValue"},
},
TLSSetting: configtls.ClientConfig{
Insecure: true,
},
},
Agent: Agent{
Executable: "${file_path}",
OrphanDetectionInterval: 5 * time.Second,
OpAMPServerPort: 8080,
BootstrapTimeout: 5 * time.Second,
},
Capabilities: Capabilities{
AcceptsRemoteConfig: true,
},
Storage: Storage{
Directory: "/etc/opamp-supervisor/storage",
},
},
expectedError: "agent::config_apply_timeout must be valid duration",
},
}

// create some fake files for validating agent config
Expand Down
Loading