From e7871a6f8c8d9cc0b1a6989e7613a3bf074946ec Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Thu, 18 Jul 2024 13:40:02 -0400 Subject: [PATCH 01/12] remove waiting for opamp server connection --- cmd/opampsupervisor/supervisor/supervisor.go | 20 +------------------- 1 file changed, 1 insertion(+), 19 deletions(-) diff --git a/cmd/opampsupervisor/supervisor/supervisor.go b/cmd/opampsupervisor/supervisor/supervisor.go index 3ef418a68f14..3099a3417ece 100644 --- a/cmd/opampsupervisor/supervisor/supervisor.go +++ b/cmd/opampsupervisor/supervisor/supervisor.go @@ -131,8 +131,6 @@ type Supervisor struct { agentStartHealthCheckAttempts int agentRestarting atomic.Bool - connectedToOpAMPServer chan struct{} - // The OpAMP server to communicate with the Collector's OpAMP extension opampServer server.OpAMPServer opampServerPort int @@ -145,7 +143,6 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { hasNewConfig: make(chan struct{}, 1), agentConfigOwnMetricsSection: &atomic.Value{}, mergedConfig: &atomic.Value{}, - connectedToOpAMPServer: make(chan struct{}), effectiveConfig: &atomic.Value{}, agentDescription: &atomic.Value{}, doneChan: make(chan struct{}), @@ -198,10 +195,6 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { return nil, fmt.Errorf("cannot start OpAMP client: %w", err) } - if connErr := s.waitForOpAMPConnection(); connErr != nil { - return nil, fmt.Errorf("failed to connect to the OpAMP server: %w", connErr) - } - s.commander, err = commander.NewCommander( s.logger, s.config.Agent, @@ -403,7 +396,6 @@ func (s *Supervisor) startOpAMPClient() error { InstanceUid: types.InstanceUid(s.persistentState.InstanceID), Callbacks: types.CallbacksStruct{ OnConnectFunc: func(_ context.Context) { - s.connectedToOpAMPServer <- struct{}{} s.logger.Debug("Connected to the server.") }, OnConnectFailedFunc: func(_ context.Context, err error) { @@ -679,17 +671,7 @@ func (s *Supervisor) 
onOpampConnectionSettings(_ context.Context, settings *prot return err } } - return s.waitForOpAMPConnection() -} - -func (s *Supervisor) waitForOpAMPConnection() error { - // wait for the OpAMP client to connect to the server or timeout - select { - case <-s.connectedToOpAMPServer: - return nil - case <-time.After(10 * time.Second): - return errors.New("timed out waiting for the server to connect") - } + return nil } func (s *Supervisor) composeBootstrapConfig() ([]byte, error) { From 90b5d22a922e68fc50634172ab89a6b55c277ff1 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Thu, 18 Jul 2024 15:49:02 -0400 Subject: [PATCH 02/12] Write test for starting without a running opamp server --- cmd/opampsupervisor/e2e_test.go | 108 +++++++++++++++++- cmd/opampsupervisor/supervisor/supervisor.go | 10 +- .../supervisor/supervisor_test.go | 2 +- .../collector/healthcheck_config.yaml | 16 +++ 4 files changed, 132 insertions(+), 4 deletions(-) create mode 100644 cmd/opampsupervisor/testdata/collector/healthcheck_config.yaml diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go index b5faabe7183a..170bdd0f221f 100644 --- a/cmd/opampsupervisor/e2e_test.go +++ b/cmd/opampsupervisor/e2e_test.go @@ -9,13 +9,16 @@ import ( "bytes" "context" "crypto/sha256" + "fmt" "io" "log" + "net" "net/http" "net/http/httptest" "os" "os/exec" "path" + "path/filepath" "runtime" "strings" "sync/atomic" @@ -36,6 +39,7 @@ import ( "github.com/stretchr/testify/require" semconv "go.opentelemetry.io/collector/semconv/v1.21.0" "go.uber.org/zap" + "google.golang.org/protobuf/proto" "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor" "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor/config" @@ -74,10 +78,17 @@ type testingOpAMPServer struct { addr string supervisorConnected chan bool sendToSupervisor func(*protobufs.ServerToAgent) + start func() shutdown func() } func newOpAMPServer(t *testing.T, 
connectingCallback onConnectingFuncFactory, callbacks server.ConnectionCallbacksStruct) *testingOpAMPServer { + s := newUnstartedOpAMPServer(t, connectingCallback, callbacks) + s.start() + return s +} + +func newUnstartedOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, callbacks server.ConnectionCallbacksStruct) *testingOpAMPServer { var agentConn atomic.Value var isAgentConnected atomic.Bool var didShutdown atomic.Bool @@ -108,7 +119,7 @@ func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, ca require.NoError(t, err) mux := http.NewServeMux() mux.HandleFunc("/v1/opamp", handler) - httpSrv := httptest.NewServer(mux) + httpSrv := httptest.NewUnstartedServer(mux) shutdown := func() { if !didShutdown.Load() { @@ -135,6 +146,7 @@ func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, ca addr: httpSrv.Listener.Addr().String(), supervisorConnected: connectedChan, sendToSupervisor: send, + start: httpSrv.Start, shutdown: shutdown, } } @@ -238,6 +250,56 @@ func TestSupervisorStartsCollectorWithRemoteConfig(t *testing.T) { }, 10*time.Second, 500*time.Millisecond, "Log never appeared in output") } +func TestSupervisorStartsCollectorWithNoOpAMPServer(t *testing.T) { + storageDir := t.TempDir() + remoteConfigFilePath := filepath.Join(storageDir, "last_recv_remote_config.dat") + + cfg, hash, healthcheckPort := createHealthCheckCollectorConf(t) + remoteConfigProto := &protobufs.AgentRemoteConfig{ + Config: &protobufs.AgentConfigMap{ + ConfigMap: map[string]*protobufs.AgentConfigFile{ + "": {Body: cfg.Bytes()}, + }, + }, + ConfigHash: hash, + } + marshalledRemoteConfig, err := proto.Marshal(remoteConfigProto) + require.NoError(t, err) + + require.NoError(t, os.WriteFile(remoteConfigFilePath, marshalledRemoteConfig, 0600)) + + server := newUnstartedOpAMPServer(t, defaultConnectingHandler, server.ConnectionCallbacksStruct{}) + defer server.shutdown() + + // TODO: figure out URL ahead of time + s := 
newSupervisor(t, "basic", map[string]string{ + "url": server.addr, + "storage_dir": storageDir, + }) + defer s.Shutdown() + + // Verify the collector runs eventually by pinging the healthcheck extension + require.Eventually(t, func() bool { + resp, err := http.DefaultClient.Get(fmt.Sprintf("http://localhost:%d", healthcheckPort)) + if err != nil { + t.Logf("Failed healthcheck: %s", err) + return false + } + require.NoError(t, resp.Body.Close()) + if resp.StatusCode >= 300 || resp.StatusCode < 200 { + t.Logf("Got non-200 status code: %d", resp.StatusCode) + return false + } + return true + }, 3*time.Second, 100*time.Millisecond) + + // Start the server and wait for the supervisor to connect + server.start() + + // Verify supervisor connects to server + waitForSupervisorConnection(server.supervisorConnected, true) +} + func TestSupervisorRestartsCollectorAfterBadConfig(t *testing.T) { var healthReport atomic.Value var agentConfig atomic.Value @@ -639,6 +701,32 @@ func createBadCollectorConf(t *testing.T) (*bytes.Buffer, []byte) { return bytes.NewBuffer(colCfg), h.Sum(nil) } +func createHealthCheckCollectorConf(t *testing.T) (cfg *bytes.Buffer, hash []byte, remotePort int) { + colCfgTpl, err := os.ReadFile(path.Join("testdata", "collector", "healthcheck_config.yaml")) + require.NoError(t, err) + + templ, err := template.New("").Parse(string(colCfgTpl)) + require.NoError(t, err) + + port, err := findRandomPort() + + var confmapBuf bytes.Buffer + err = templ.Execute( + &confmapBuf, + map[string]string{ + "HealthCheckEndpoint": fmt.Sprintf("localhost:%d", port), + }, + ) + require.NoError(t, err) + + h := sha256.New() + if _, err := io.Copy(h, bytes.NewBuffer(confmapBuf.Bytes())); err != nil { + t.Fatalf("Failed to compute hash: %s", err) + } + + return &confmapBuf, h.Sum(nil), port +} + // Wait for the Supervisor to connect to or disconnect from the OpAMP server func waitForSupervisorConnection(connection chan bool, connected bool) { select { @@ -1012,3 +1100,21 @@ func 
TestSupervisorPersistsNewInstanceID(t *testing.T) { require.Equal(t, newID, uuid.UUID(newRecievedAgentID)) } + +func findRandomPort() (int, error) { + l, err := net.Listen("tcp", "localhost:0") + + if err != nil { + return 0, err + } + + port := l.Addr().(*net.TCPAddr).Port + + err = l.Close() + + if err != nil { + return 0, err + } + + return port, nil +} diff --git a/cmd/opampsupervisor/supervisor/supervisor.go b/cmd/opampsupervisor/supervisor/supervisor.go index 3099a3417ece..61b656dda852 100644 --- a/cmd/opampsupervisor/supervisor/supervisor.go +++ b/cmd/opampsupervisor/supervisor/supervisor.go @@ -186,7 +186,7 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { logger.Debug("Supervisor starting", zap.String("id", s.persistentState.InstanceID.String())) - err = s.loadInitialMergedConfig() + err = s.loadAndWriteInitialMergedConfig() if err != nil { return nil, fmt.Errorf("failed loading initial config: %w", err) } @@ -748,7 +748,7 @@ func (s *Supervisor) composeOpAMPExtensionConfig() []byte { return cfg.Bytes() } -func (s *Supervisor) loadInitialMergedConfig() error { +func (s *Supervisor) loadAndWriteInitialMergedConfig() error { var lastRecvRemoteConfig, lastRecvOwnMetricsConfig []byte var err error @@ -791,6 +791,12 @@ func (s *Supervisor) loadInitialMergedConfig() error { return fmt.Errorf("could not compose initial merged config: %w", err) } + // write the initial merged config to disk + cfg := s.mergedConfig.Load().(string) + if err := os.WriteFile(agentConfigFilePath, []byte(cfg), 0600); err != nil { + s.logger.Error("Failed to write agent config.", zap.Error(err)) + } + return nil } diff --git a/cmd/opampsupervisor/supervisor/supervisor_test.go b/cmd/opampsupervisor/supervisor/supervisor_test.go index 1af3653e05cc..df946175a003 100644 --- a/cmd/opampsupervisor/supervisor/supervisor_test.go +++ b/cmd/opampsupervisor/supervisor/supervisor_test.go @@ -69,7 +69,7 @@ service: exporters: [file]` require.NoError(t, 
s.createTemplates()) - require.NoError(t, s.loadInitialMergedConfig()) + require.NoError(t, s.loadAndWriteInitialMergedConfig()) configChanged, err := s.composeMergedConfig(&protobufs.AgentRemoteConfig{ Config: &protobufs.AgentConfigMap{ diff --git a/cmd/opampsupervisor/testdata/collector/healthcheck_config.yaml b/cmd/opampsupervisor/testdata/collector/healthcheck_config.yaml new file mode 100644 index 000000000000..e6baee1be269 --- /dev/null +++ b/cmd/opampsupervisor/testdata/collector/healthcheck_config.yaml @@ -0,0 +1,16 @@ +receivers: + nop: + +exporters: + nop: + +extensions: + health_check/livenesscheck: + endpoint: "{{ .HealthCheckEndpoint }}" + +service: + extensions: [health_check/livenesscheck] + pipelines: + logs: + receivers: [nop] + exporters: [nop] From 277d990731e46f4a5d88ed09941d5d2968e97a95 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Thu, 18 Jul 2024 16:00:01 -0400 Subject: [PATCH 03/12] add chlog --- ...x_supervisor-dont-require-server-connection.yaml | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100644 .chloggen/fix_supervisor-dont-require-server-connection.yaml diff --git a/.chloggen/fix_supervisor-dont-require-server-connection.yaml b/.chloggen/fix_supervisor-dont-require-server-connection.yaml new file mode 100644 index 000000000000..930693f63ac3 --- /dev/null +++ b/.chloggen/fix_supervisor-dont-require-server-connection.yaml @@ -0,0 +1,13 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: cmd/opampsupervisor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Start even if the OpAMP server cannot be contacted, and continually retry connecting. + +# Mandatory: One or more tracking issues related to the change. 
You can use the PR number here if no issue exists. +issues: [33408, 33799] From fe11945fefd3053de499cf7993434fe13e3c2333 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Thu, 18 Jul 2024 16:05:10 -0400 Subject: [PATCH 04/12] remove fixed TODO --- cmd/opampsupervisor/e2e_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go index 170bdd0f221f..855f1c2e09ff 100644 --- a/cmd/opampsupervisor/e2e_test.go +++ b/cmd/opampsupervisor/e2e_test.go @@ -271,7 +271,6 @@ func TestSupervisorStartsCollectorWithNoOpAMPServer(t *testing.T) { server := newUnstartedOpAMPServer(t, defaultConnectingHandler, server.ConnectionCallbacksStruct{}) defer server.shutdown() - // TODO: figure out URL ahead of time s := newSupervisor(t, "basic", map[string]string{ "url": server.addr, "storage_dir": storageDir, From d276bdfd5dd84731b0e1e2c124bbcfaf1544a0c0 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Thu, 18 Jul 2024 16:07:44 -0400 Subject: [PATCH 05/12] Ensure supervisor connects and does not timeout in test --- cmd/opampsupervisor/e2e_test.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go index 855f1c2e09ff..da15940c4667 100644 --- a/cmd/opampsupervisor/e2e_test.go +++ b/cmd/opampsupervisor/e2e_test.go @@ -268,7 +268,12 @@ func TestSupervisorStartsCollectorWithNoOpAMPServer(t *testing.T) { require.NoError(t, os.WriteFile(remoteConfigFilePath, marshalledRemoteConfig, 0600)) - server := newUnstartedOpAMPServer(t, defaultConnectingHandler, server.ConnectionCallbacksStruct{}) + connected := atomic.Bool{} + server := newUnstartedOpAMPServer(t, defaultConnectingHandler, server.ConnectionCallbacksStruct{ + OnConnectedFunc: func(ctx context.Context, conn types.Connection) { + connected.Store(true) + }, + }) defer server.shutdown() s := newSupervisor(t, "basic", map[string]string{ @@ -297,6 +302,8 @@ func 
TestSupervisorStartsCollectorWithNoOpAMPServer(t *testing.T) { // Verify supervisor connects to server waitForSupervisorConnection(server.supervisorConnected, true) + + require.True(t, connected.Load(), "Supervisor failed to connect") } func TestSupervisorRestartsCollectorAfterBadConfig(t *testing.T) { From 83be33c2f265f7e1ca6f79250cd3b02c2d423531 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Thu, 18 Jul 2024 16:08:38 -0400 Subject: [PATCH 06/12] use sum256 over hasher --- cmd/opampsupervisor/e2e_test.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go index da15940c4667..2c282ffaee55 100644 --- a/cmd/opampsupervisor/e2e_test.go +++ b/cmd/opampsupervisor/e2e_test.go @@ -725,12 +725,9 @@ func createHealthCheckCollectorConf(t *testing.T) (cfg *bytes.Buffer, hash []byt ) require.NoError(t, err) - h := sha256.New() - if _, err := io.Copy(h, bytes.NewBuffer(confmapBuf.Bytes())); err != nil { - t.Fatalf("Failed to compute hash: %s", err) - } + h := sha256.Sum256(confmapBuf.Bytes()) - return &confmapBuf, h.Sum(nil), port + return &confmapBuf, h[:], port } // Wait for the Supervisor to connect to or disconnect from the OpAMP server From 385025e8b31c32abe2f29dfa8a917836adf2a5ff Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Mon, 22 Jul 2024 09:43:06 -0400 Subject: [PATCH 07/12] 200 -> 2xx --- cmd/opampsupervisor/e2e_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go index 2c282ffaee55..4cbbd70eadea 100644 --- a/cmd/opampsupervisor/e2e_test.go +++ b/cmd/opampsupervisor/e2e_test.go @@ -291,7 +291,7 @@ func TestSupervisorStartsCollectorWithNoOpAMPServer(t *testing.T) { } require.NoError(t, resp.Body.Close()) if resp.StatusCode >= 300 || resp.StatusCode < 200 { - t.Logf("Got non-200 status code: %d", resp.StatusCode) + t.Logf("Got non-2xx status code: %d", resp.StatusCode) return false } 
return true From ec2660fcc3750e51d8573d1160467f9bb7e095d1 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Mon, 22 Jul 2024 09:59:28 -0400 Subject: [PATCH 08/12] Add test for collector with no existing config --- cmd/opampsupervisor/e2e_test.go | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go index 4cbbd70eadea..3f6002d87e76 100644 --- a/cmd/opampsupervisor/e2e_test.go +++ b/cmd/opampsupervisor/e2e_test.go @@ -306,6 +306,32 @@ func TestSupervisorStartsCollectorWithNoOpAMPServer(t *testing.T) { require.True(t, connected.Load(), "Supervisor failed to connect") } +func TestSupervisorStartsWithNoOpAMPServer(t *testing.T) { + connected := atomic.Bool{} + server := newUnstartedOpAMPServer(t, defaultConnectingHandler, server.ConnectionCallbacksStruct{ + OnConnectedFunc: func(ctx context.Context, conn types.Connection) { + connected.Store(true) + }, + }) + defer server.shutdown() + + // The supervisor is started without a running OpAMP server. + // The supervisor should start successfully, even if the OpAMP server is stopped. 
+ s := newSupervisor(t, "basic", map[string]string{ + "url": server.addr, + }) + defer s.Shutdown() + + // Start the server and wait for the supervisor to connect + server.start() + + // Verify supervisor connects to server + waitForSupervisorConnection(server.supervisorConnected, true) + + require.True(t, connected.Load(), "Supervisor failed to connect") + +} + func TestSupervisorRestartsCollectorAfterBadConfig(t *testing.T) { var healthReport atomic.Value var agentConfig atomic.Value From e15abbe70fa77190c7e268ee9177897a08daac8d Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Mon, 29 Jul 2024 13:12:42 -0400 Subject: [PATCH 09/12] endure collector starts with noop config --- cmd/opampsupervisor/e2e_test.go | 15 +++++++++++ cmd/opampsupervisor/supervisor/supervisor.go | 27 +++++++++++++++++--- 2 files changed, 38 insertions(+), 4 deletions(-) diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go index 3f6002d87e76..8c9c5b36df79 100644 --- a/cmd/opampsupervisor/e2e_test.go +++ b/cmd/opampsupervisor/e2e_test.go @@ -322,6 +322,21 @@ func TestSupervisorStartsWithNoOpAMPServer(t *testing.T) { }) defer s.Shutdown() + // Verify the collector is running by checking the metrics endpoint + require.Eventually(t, func() bool { + resp, err := http.DefaultClient.Get("http://localhost:8888/metrics") + if err != nil { + t.Logf("Failed check for prometheus metrics: %s", err) + return false + } + require.NoError(t, resp.Body.Close()) + if resp.StatusCode >= 300 || resp.StatusCode < 200 { + t.Logf("Got non-2xx status code: %d", resp.StatusCode) + return false + } + return true + }, 3*time.Second, 100*time.Millisecond) + // Start the server and wait for the supervisor to connect server.start() diff --git a/cmd/opampsupervisor/supervisor/supervisor.go b/cmd/opampsupervisor/supervisor/supervisor.go index 61b656dda852..1cfb6cc1108d 100644 --- a/cmd/opampsupervisor/supervisor/supervisor.go +++ b/cmd/opampsupervisor/supervisor/supervisor.go @@ -674,9 +674,7 @@ 
func (s *Supervisor) onOpampConnectionSettings(_ context.Context, settings *prot return nil } -func (s *Supervisor) composeBootstrapConfig() ([]byte, error) { - var k = koanf.New("::") - +func (s *Supervisor) composeNoopPipeline() ([]byte, error) { var cfg bytes.Buffer err := s.bootstrapTemplate.Execute(&cfg, map[string]any{ "InstanceUid": s.persistentState.InstanceID.String(), @@ -686,7 +684,17 @@ func (s *Supervisor) composeBootstrapConfig() ([]byte, error) { return nil, err } - if err = k.Load(rawbytes.Provider(cfg.Bytes()), yaml.Parser(), koanf.WithMergeFunc(configMergeFunc)); err != nil { + return cfg.Bytes(), nil +} + +func (s *Supervisor) composeBootstrapConfig() ([]byte, error) { + var k = koanf.New("::") + + cfg, err := s.composeNoopPipeline() + if err != nil { + return nil, err + } + if err = k.Load(rawbytes.Provider(cfg), yaml.Parser(), koanf.WithMergeFunc(configMergeFunc)); err != nil { return nil, err } if err = k.Load(rawbytes.Provider(s.composeOpAMPExtensionConfig()), yaml.Parser(), koanf.WithMergeFunc(configMergeFunc)); err != nil { @@ -901,6 +909,17 @@ func (s *Supervisor) composeMergedConfig(config *protobufs.AgentRemoteConfig) (c return false, fmt.Errorf("cannot merge config named %s: %w", name, err) } } + } else { + // Add noop pipeline + var noopConfig []byte + noopConfig, err = s.composeNoopPipeline() + if err != nil { + return false, fmt.Errorf("could not compose noop pipeline: %w", err) + } + + if err = k.Load(rawbytes.Provider(noopConfig), yaml.Parser(), koanf.WithMergeFunc(configMergeFunc)); err != nil { + return false, fmt.Errorf("could not merge noop pipeline: %w", err) + } } // Merge own metrics config. 
From 68beddb318023a02612961e6b2fd9f89b49cf9a8 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Mon, 29 Jul 2024 13:36:34 -0400 Subject: [PATCH 10/12] test that supervisor gets new config --- cmd/opampsupervisor/e2e_test.go | 46 +++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go index 8c9c5b36df79..39635add3e82 100644 --- a/cmd/opampsupervisor/e2e_test.go +++ b/cmd/opampsupervisor/e2e_test.go @@ -9,6 +9,7 @@ import ( "bytes" "context" "crypto/sha256" + "errors" "fmt" "io" "log" @@ -307,11 +308,22 @@ func TestSupervisorStartsCollectorWithNoOpAMPServer(t *testing.T) { } func TestSupervisorStartsWithNoOpAMPServer(t *testing.T) { + cfg, hash, inputFile, outputFile := createSimplePipelineCollectorConf(t) + + configuredChan := make(chan struct{}) connected := atomic.Bool{} server := newUnstartedOpAMPServer(t, defaultConnectingHandler, server.ConnectionCallbacksStruct{ OnConnectedFunc: func(ctx context.Context, conn types.Connection) { connected.Store(true) }, + OnMessageFunc: func(ctx context.Context, conn types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + lastCfgHash := message.GetRemoteConfigStatus().GetLastRemoteConfigHash() + if bytes.Equal(lastCfgHash, hash) { + close(configuredChan) + } + + return &protobufs.ServerToAgent{} + }, }) defer server.shutdown() @@ -345,6 +357,40 @@ func TestSupervisorStartsWithNoOpAMPServer(t *testing.T) { require.True(t, connected.Load(), "Supervisor failed to connect") + // Verify that the collector can run a new config sent to it + server.sendToSupervisor(&protobufs.ServerToAgent{ + RemoteConfig: &protobufs.AgentRemoteConfig{ + Config: &protobufs.AgentConfigMap{ + ConfigMap: map[string]*protobufs.AgentConfigFile{ + "": {Body: cfg.Bytes()}, + }, + }, + ConfigHash: hash, + }, + }) + + select { + case <-configuredChan: + case <-time.After(2 * time.Second): + require.FailNow(t, "timed out waiting for collector to 
reconfigure") + } + + sampleLog := `{"body":"hello, world"}` + n, err := inputFile.WriteString(sampleLog + "\n") + require.NotZero(t, n, "Could not write to input file") + require.NoError(t, err) + + require.Eventually(t, func() bool { + logRecord := make([]byte, 1024) + + n, err = outputFile.Read(logRecord) + if !errors.Is(err, io.EOF) { + require.NoError(t, err) + } + + return n != 0 + }, 10*time.Second, 500*time.Millisecond, "Log never appeared in output") + } func TestSupervisorRestartsCollectorAfterBadConfig(t *testing.T) { From 128af7f9d19c3032392fa649f1c3b64251d41bc4 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Thu, 1 Aug 2024 10:22:29 -0400 Subject: [PATCH 11/12] rename bootstrap template to noop pipeline template --- cmd/opampsupervisor/supervisor/supervisor.go | 14 +++++++------- .../{bootstrap_pipeline.yaml => nooppipeline.yaml} | 0 2 files changed, 7 insertions(+), 7 deletions(-) rename cmd/opampsupervisor/supervisor/templates/{bootstrap_pipeline.yaml => nooppipeline.yaml} (100%) diff --git a/cmd/opampsupervisor/supervisor/supervisor.go b/cmd/opampsupervisor/supervisor/supervisor.go index 1cfb6cc1108d..0f6cbcf6b740 100644 --- a/cmd/opampsupervisor/supervisor/supervisor.go +++ b/cmd/opampsupervisor/supervisor/supervisor.go @@ -43,8 +43,8 @@ import ( ) var ( - //go:embed templates/bootstrap_pipeline.yaml - bootstrapConfTpl string + //go:embed templates/nooppipeline.yaml + noopPipelineTpl string //go:embed templates/extraconfig.yaml extraConfigTpl string @@ -89,7 +89,7 @@ type Supervisor struct { // Supervisor's persistent state persistentState *persistentState - bootstrapTemplate *template.Template + noopPipelineTemplate *template.Template opampextensionTemplate *template.Template extraConfigTemplate *template.Template ownTelemetryTemplate *template.Template @@ -224,7 +224,7 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { func (s *Supervisor) createTemplates() error { var err error - if s.bootstrapTemplate, err 
= template.New("bootstrap").Parse(bootstrapConfTpl); err != nil { + if s.noopPipelineTemplate, err = template.New("nooppipeline").Parse(noopPipelineTpl); err != nil { return err } if s.extraConfigTemplate, err = template.New("extraconfig").Parse(extraConfigTpl); err != nil { @@ -273,7 +273,7 @@ func (s *Supervisor) getBootstrapInfo() (err error) { return err } - bootstrapConfig, err := s.composeBootstrapConfig() + bootstrapConfig, err := s.composeNoopConfig() if err != nil { return err } @@ -676,7 +676,7 @@ func (s *Supervisor) onOpampConnectionSettings(_ context.Context, settings *prot func (s *Supervisor) composeNoopPipeline() ([]byte, error) { var cfg bytes.Buffer - err := s.bootstrapTemplate.Execute(&cfg, map[string]any{ + err := s.noopPipelineTemplate.Execute(&cfg, map[string]any{ "InstanceUid": s.persistentState.InstanceID.String(), "SupervisorPort": s.opampServerPort, }) @@ -687,7 +687,7 @@ func (s *Supervisor) composeNoopPipeline() ([]byte, error) { return cfg.Bytes(), nil } -func (s *Supervisor) composeBootstrapConfig() ([]byte, error) { +func (s *Supervisor) composeNoopConfig() ([]byte, error) { var k = koanf.New("::") cfg, err := s.composeNoopPipeline() diff --git a/cmd/opampsupervisor/supervisor/templates/bootstrap_pipeline.yaml b/cmd/opampsupervisor/supervisor/templates/nooppipeline.yaml similarity index 100% rename from cmd/opampsupervisor/supervisor/templates/bootstrap_pipeline.yaml rename to cmd/opampsupervisor/supervisor/templates/nooppipeline.yaml From 9f00540216b5baaec31da737a029b832ef3791d0 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Thu, 1 Aug 2024 10:33:13 -0400 Subject: [PATCH 12/12] add section to spec describing behavior when OpAMP server is unavailable --- cmd/opampsupervisor/specification/README.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/cmd/opampsupervisor/specification/README.md b/cmd/opampsupervisor/specification/README.md index 411f7ce3de10..10df4925e0ca 100644 --- 
a/cmd/opampsupervisor/specification/README.md +++ b/cmd/opampsupervisor/specification/README.md @@ -160,6 +160,13 @@ agent: ``` +### Operation When OpAMP Server is Unavailable + +When the Supervisor cannot connect to the OpAMP server, the Collector will +be run with the last known configuration, or with a "noop" configuration +if no previous configuration is persisted. The Supervisor will continually +attempt to reconnect to the OpAMP server with exponential backoff. + ### Executing Collector The Supervisor starts and stops the Collector process as necessary. When