Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cmd/opampsupervisor]: Supervisor waits for configurable healthchecks to report remote config status #34907

Merged
merged 29 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
697f96e
[cmd/opampsupervisor]: Supervisor waits for configurable healthchecks…
srikanthccv Aug 28, 2024
9259bf6
Merge branch 'main' into issue_21079
srikanthccv Aug 28, 2024
a74ef23
Update agent config validation
srikanthccv Aug 30, 2024
9f83104
Merge branch 'issue_21079' of github.com:srikanthccv/opentelemetry-co…
srikanthccv Aug 30, 2024
136b2b6
Review comments
srikanthccv Sep 6, 2024
8e85f7b
Frequent checks for subsequent asserts
srikanthccv Sep 6, 2024
3cabc23
Resolve conflicts
srikanthccv Sep 6, 2024
8208ca2
Merge branch 'main' into issue_21079
srikanthccv Sep 11, 2024
fd57ea9
resolve conflicts
srikanthccv Oct 8, 2024
88da95f
resolve conflicts again
srikanthccv Oct 8, 2024
04261f3
Fix tests
srikanthccv Oct 8, 2024
6c7f617
Merge branch 'issue_21079' of github.com:srikanthccv/opentelemetry-co…
srikanthccv Oct 8, 2024
dedd6a0
Merge branch 'main' into issue_21079
srikanthccv Oct 8, 2024
169d25f
Merge branch 'main' into issue_21079
srikanthccv Oct 16, 2024
8d56b88
Fix tests
srikanthccv Oct 16, 2024
6752571
Merge branch 'issue_21079' of github.com:srikanthccv/opentelemetry-co…
srikanthccv Oct 16, 2024
a149e5f
Remove unnecessary check
srikanthccv Oct 16, 2024
873c072
Add CHANGELOG entry
srikanthccv Oct 16, 2024
b9b4d20
Merge branch 'main' into issue_21079
srikanthccv Oct 17, 2024
38016ff
Merge branch 'main' into issue_21079
srikanthccv Oct 20, 2024
e317745
Use agent health from opamp extension for config status report
srikanthccv Oct 20, 2024
fde058d
Merge branch 'issue_21079' of github.com:srikanthccv/opentelemetry-co…
srikanthccv Oct 20, 2024
165f1a2
go mod tidy
srikanthccv Oct 20, 2024
b7dbd63
Merge branch 'main' into issue_21079
srikanthccv Oct 30, 2024
2e0908d
Remove health check interval option
srikanthccv Oct 30, 2024
3a61771
Merge branch 'issue_21079' of github.com:srikanthccv/opentelemetry-co…
srikanthccv Oct 30, 2024
4c43fed
Update config_test
srikanthccv Oct 30, 2024
89f143e
Remove removed interval refs
srikanthccv Oct 30, 2024
6e3f678
Resolve conflicts
srikanthccv Oct 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 124 additions & 0 deletions cmd/opampsupervisor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1441,6 +1441,130 @@ func TestSupervisorLogging(t *testing.T) {
require.True(t, seenCollectorLog)
require.NoError(t, logFile.Close())
}
func TestSupervisorRemoteConfigApplyStatus(t *testing.T) {
srikanthccv marked this conversation as resolved.
Show resolved Hide resolved
now := time.Now()
srikanthccv marked this conversation as resolved.
Show resolved Hide resolved
var agentConfig atomic.Value
var healthReport atomic.Value
var remoteConfigStatus atomic.Value
server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if message.EffectiveConfig != nil {
config := message.EffectiveConfig.ConfigMap.ConfigMap[""]
if config != nil {
agentConfig.Store(string(config.Body))
}
}
if message.Health != nil {
healthReport.Store(message.Health)
}
if message.RemoteConfigStatus != nil {
remoteConfigStatus.Store(message.RemoteConfigStatus)
}

return &protobufs.ServerToAgent{}
},
})

s := newSupervisor(t, "report_status", map[string]string{
"url": server.addr,
"successful_health_checks": "2",
"config_apply_timeout": "3s",
"health_check_interval": "1s",
})
require.Nil(t, s.Start())
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)

cfg, hash, inputFile, outputFile := createSimplePipelineCollectorConf(t)

server.sendToSupervisor(&protobufs.ServerToAgent{
RemoteConfig: &protobufs.AgentRemoteConfig{
Config: &protobufs.AgentConfigMap{
ConfigMap: map[string]*protobufs.AgentConfigFile{
"": {Body: cfg.Bytes()},
},
},
ConfigHash: hash,
},
})

// Check that the status is set to APPLYING
require.Eventually(t, func() bool {
status, ok := remoteConfigStatus.Load().(*protobufs.RemoteConfigStatus)
return ok && status.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLYING
}, 5*time.Second, 100*time.Millisecond, "Remote config status was not set to APPLYING")

// Wait for the required number of successful health checks
require.Eventually(t, func() bool {
health, ok := healthReport.Load().(*protobufs.ComponentHealth)
return ok && health.Healthy
}, 10*time.Second, 10*time.Millisecond, "Collector did not become healthy")

// Check that the status is set to APPLIED after successful health checks
require.Eventually(t, func() bool {
status, ok := remoteConfigStatus.Load().(*protobufs.RemoteConfigStatus)
return ok && status.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED
}, 5*time.Second, 10*time.Millisecond, "Remote config status was not set to APPLIED")

require.Eventually(t, func() bool {
cfg, ok := agentConfig.Load().(string)
if ok {
// The effective config may be structurally different compared to what was sent,
// and will also have some data redacted,
// so just check that it includes the filelog receiver
return strings.Contains(cfg, "filelog")
}

return false
}, 5*time.Second, 10*time.Millisecond, "Collector was not started with remote config")

n, err := inputFile.WriteString("{\"body\":\"hello, world\"}\n")
require.NotZero(t, n, "Could not write to input file")
require.NoError(t, err)

require.Eventually(t, func() bool {
logRecord := make([]byte, 1024)
n, _ := outputFile.Read(logRecord)

return n != 0
}, 10*time.Second, 100*time.Millisecond, "Log never appeared in output")

// Test with bad configuration
badCfg, badHash := createBadCollectorConf(t)

server.sendToSupervisor(&protobufs.ServerToAgent{
RemoteConfig: &protobufs.AgentRemoteConfig{
Config: &protobufs.AgentConfigMap{
ConfigMap: map[string]*protobufs.AgentConfigFile{
"": {Body: badCfg.Bytes()},
},
},
ConfigHash: badHash,
},
})

// Check that the status is set to APPLYING
require.Eventually(t, func() bool {
status, ok := remoteConfigStatus.Load().(*protobufs.RemoteConfigStatus)
return ok && status.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLYING
}, 5*time.Second, 200*time.Millisecond, "Remote config status was not set to APPLYING for bad config")

// Wait for the health checks to fail
require.Eventually(t, func() bool {
health, ok := healthReport.Load().(*protobufs.ComponentHealth)
return ok && !health.Healthy
}, 30*time.Second, 100*time.Millisecond, "Collector did not become unhealthy with bad config")

// Check that the status is set to FAILED after failed health checks
require.Eventually(t, func() bool {
status, ok := remoteConfigStatus.Load().(*protobufs.RemoteConfigStatus)
return ok && status.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED
}, 15*time.Second, 100*time.Millisecond, "Remote config status was not set to FAILED for bad config")
}

func findRandomPort() (int, error) {
l, err := net.Listen("tcp", "localhost:0")
Expand Down
18 changes: 18 additions & 0 deletions cmd/opampsupervisor/supervisor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,9 @@ type Agent struct {
Executable string
OrphanDetectionInterval time.Duration `mapstructure:"orphan_detection_interval"`
Description AgentDescription `mapstructure:"description"`
SuccessfulHealthChecks int32 `mapstructure:"successful_health_checks"`
ConfigApplyTimeout time.Duration `mapstructure:"config_apply_timeout"`
srikanthccv marked this conversation as resolved.
Show resolved Hide resolved
srikanthccv marked this conversation as resolved.
Show resolved Hide resolved
HealthCheckInterval time.Duration `mapstructure:"health_check_interval"`
djaglowski marked this conversation as resolved.
Show resolved Hide resolved
BootstrapTimeout time.Duration `mapstructure:"bootstrap_timeout"`
HealthCheckPort int `mapstructure:"health_check_port"`
PassthroughLogs bool `mapstructure:"passthrough_logs"`
Expand Down Expand Up @@ -180,6 +183,18 @@ func (a Agent) Validate() error {
return fmt.Errorf("could not stat agent::executable path: %w", err)
}

if a.SuccessfulHealthChecks <= 0 {
return errors.New("agent::successful_health_checks must be positive")
}

if a.ConfigApplyTimeout <= 0 {
return errors.New("agent::config_apply_timeout must be valid duration")
}

if a.HealthCheckInterval <= 0 {
return errors.New("agent::health_check_interval must be valid duration")
}

return nil
}

Expand Down Expand Up @@ -229,6 +244,9 @@ func DefaultSupervisor() Supervisor {
},
Agent: Agent{
OrphanDetectionInterval: 5 * time.Second,
SuccessfulHealthChecks: 3,
ConfigApplyTimeout: 30 * time.Second,
HealthCheckInterval: 10 * time.Second,
srikanthccv marked this conversation as resolved.
Show resolved Hide resolved
BootstrapTimeout: 3 * time.Second,
PassthroughLogs: false,
},
Expand Down
2 changes: 2 additions & 0 deletions cmd/opampsupervisor/supervisor/config/config_test.go
srikanthccv marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ func TestValidate(t *testing.T) {
Agent: Agent{
Executable: "${file_path}",
OrphanDetectionInterval: 5 * time.Second,
SuccessfulHealthChecks: 1,
ConfigApplyTimeout: 1 * time.Second,
BootstrapTimeout: 5 * time.Second,
},
Capabilities: Capabilities{
Expand Down
Loading
Loading