Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cmd/opampsupervisor]: Don't fail to start if the OpAMP server is unavailable #34159

Merged
13 changes: 13 additions & 0 deletions .chloggen/fix_supervisor-dont-require-server-connection.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: cmd/opampsupervisor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Start even if the OpAMP server cannot be contacted, and continually retry connecting.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33408, 33799]
137 changes: 136 additions & 1 deletion cmd/opampsupervisor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ import (
"bytes"
"context"
"crypto/sha256"
"fmt"
"io"
"log"
"net"
"net/http"
"net/http/httptest"
"os"
"os/exec"
"path"
"path/filepath"
"runtime"
"strings"
"sync/atomic"
Expand All @@ -36,6 +39,7 @@ import (
"github.com/stretchr/testify/require"
semconv "go.opentelemetry.io/collector/semconv/v1.21.0"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"

"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor"
"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor/config"
Expand Down Expand Up @@ -74,10 +78,17 @@ type testingOpAMPServer struct {
addr string
supervisorConnected chan bool
sendToSupervisor func(*protobufs.ServerToAgent)
start func()
shutdown func()
}

// newOpAMPServer builds a testing OpAMP server with newUnstartedOpAMPServer,
// invokes its start function, and returns it.
func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, callbacks server.ConnectionCallbacksStruct) *testingOpAMPServer {
	srv := newUnstartedOpAMPServer(t, connectingCallback, callbacks)
	srv.start()

	return srv
}

func newUnstartedOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, callbacks server.ConnectionCallbacksStruct) *testingOpAMPServer {
var agentConn atomic.Value
var isAgentConnected atomic.Bool
var didShutdown atomic.Bool
Expand Down Expand Up @@ -108,7 +119,7 @@ func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, ca
require.NoError(t, err)
mux := http.NewServeMux()
mux.HandleFunc("/v1/opamp", handler)
httpSrv := httptest.NewServer(mux)
httpSrv := httptest.NewUnstartedServer(mux)

shutdown := func() {
if !didShutdown.Load() {
Expand All @@ -135,6 +146,7 @@ func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, ca
addr: httpSrv.Listener.Addr().String(),
supervisorConnected: connectedChan,
sendToSupervisor: send,
start: httpSrv.Start,
shutdown: shutdown,
}
}
Expand Down Expand Up @@ -238,6 +250,88 @@ func TestSupervisorStartsCollectorWithRemoteConfig(t *testing.T) {
}, 10*time.Second, 500*time.Millisecond, "Log never appeared in output")
}

func TestSupervisorStartsCollectorWithNoOpAMPServer(t *testing.T) {
evan-bradley marked this conversation as resolved.
Show resolved Hide resolved
storageDir := t.TempDir()
remoteConfigFilePath := filepath.Join(storageDir, "last_recv_remote_config.dat")

cfg, hash, healthcheckPort := createHealthCheckCollectorConf(t)
remoteConfigProto := &protobufs.AgentRemoteConfig{
Config: &protobufs.AgentConfigMap{
ConfigMap: map[string]*protobufs.AgentConfigFile{
"": {Body: cfg.Bytes()},
},
},
ConfigHash: hash,
}
marshalledRemoteConfig, err := proto.Marshal(remoteConfigProto)
require.NoError(t, err)

require.NoError(t, os.WriteFile(remoteConfigFilePath, marshalledRemoteConfig, 0600))

connected := atomic.Bool{}
server := newUnstartedOpAMPServer(t, defaultConnectingHandler, server.ConnectionCallbacksStruct{
OnConnectedFunc: func(ctx context.Context, conn types.Connection) {
connected.Store(true)
},
})
defer server.shutdown()

s := newSupervisor(t, "basic", map[string]string{
"url": server.addr,
"storage_dir": storageDir,
})
defer s.Shutdown()

// Verify the collector runs eventually by pinging the healthcheck extension
require.Eventually(t, func() bool {
resp, err := http.DefaultClient.Get(fmt.Sprintf("http://localhost:%d", healthcheckPort))
if err != nil {
t.Logf("Failed healthcheck: %s", err)
return false
}
require.NoError(t, resp.Body.Close())
if resp.StatusCode >= 300 || resp.StatusCode < 200 {
t.Logf("Got non-2xx status code: %d", resp.StatusCode)
return false
}
return true
}, 3*time.Second, 100*time.Millisecond)

// Start the server and wait for the supervisor to connect
server.start()

// Verify supervisor connects to server
waitForSupervisorConnection(server.supervisorConnected, true)

require.True(t, connected.Load(), "Supervisor failed to connect")
}

func TestSupervisorStartsWithNoOpAMPServer(t *testing.T) {
connected := atomic.Bool{}
server := newUnstartedOpAMPServer(t, defaultConnectingHandler, server.ConnectionCallbacksStruct{
OnConnectedFunc: func(ctx context.Context, conn types.Connection) {
connected.Store(true)
},
})
defer server.shutdown()

// The supervisor is started without a running OpAMP server.
// The supervisor should start successfully, even if the OpAMP server is stopped.
evan-bradley marked this conversation as resolved.
Show resolved Hide resolved
s := newSupervisor(t, "basic", map[string]string{
"url": server.addr,
})
defer s.Shutdown()

// Start the server and wait for the supervisor to connect
server.start()

// Verify supervisor connects to server
waitForSupervisorConnection(server.supervisorConnected, true)

require.True(t, connected.Load(), "Supervisor failed to connect")
evan-bradley marked this conversation as resolved.
Show resolved Hide resolved

}

func TestSupervisorRestartsCollectorAfterBadConfig(t *testing.T) {
var healthReport atomic.Value
var agentConfig atomic.Value
Expand Down Expand Up @@ -639,6 +733,29 @@ func createBadCollectorConf(t *testing.T) (*bytes.Buffer, []byte) {
return bytes.NewBuffer(colCfg), h.Sum(nil)
}

// createHealthCheckCollectorConf renders testdata/collector/healthcheck_config.yaml
// with a health check endpoint on a freshly picked localhost port.
//
// It returns the rendered config, the SHA-256 hash of the rendered bytes, and
// the port that was substituted into the endpoint. Any failure along the way
// fails the test immediately.
func createHealthCheckCollectorConf(t *testing.T) (cfg *bytes.Buffer, hash []byte, remotePort int) {
	colCfgTpl, err := os.ReadFile(path.Join("testdata", "collector", "healthcheck_config.yaml"))
	require.NoError(t, err)

	templ, err := template.New("").Parse(string(colCfgTpl))
	require.NoError(t, err)

	// Check the error here: otherwise it is overwritten by the Execute call
	// below and a failed lookup would silently render port 0.
	port, err := findRandomPort()
	require.NoError(t, err)

	var confmapBuf bytes.Buffer
	err = templ.Execute(
		&confmapBuf,
		map[string]string{
			"HealthCheckEndpoint": fmt.Sprintf("localhost:%d", port),
		},
	)
	require.NoError(t, err)

	h := sha256.Sum256(confmapBuf.Bytes())

	return &confmapBuf, h[:], port
}

// Wait for the Supervisor to connect to or disconnect from the OpAMP server
func waitForSupervisorConnection(connection chan bool, connected bool) {
select {
Expand Down Expand Up @@ -1012,3 +1129,21 @@ func TestSupervisorPersistsNewInstanceID(t *testing.T) {

require.Equal(t, newID, uuid.UUID(newRecievedAgentID))
}

// findRandomPort opens a TCP listener on localhost with port 0, records the
// port the listener was bound to, closes the listener, and returns that port.
// An error from either listening or closing is returned with a zero port.
func findRandomPort() (int, error) {
	listener, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		return 0, err
	}

	tcpAddr := listener.Addr().(*net.TCPAddr)
	chosen := tcpAddr.Port

	if closeErr := listener.Close(); closeErr != nil {
		return 0, closeErr
	}

	return chosen, nil
}
30 changes: 9 additions & 21 deletions cmd/opampsupervisor/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,6 @@ type Supervisor struct {
agentStartHealthCheckAttempts int
agentRestarting atomic.Bool

connectedToOpAMPServer chan struct{}

// The OpAMP server to communicate with the Collector's OpAMP extension
opampServer server.OpAMPServer
opampServerPort int
Expand All @@ -145,7 +143,6 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) {
hasNewConfig: make(chan struct{}, 1),
agentConfigOwnMetricsSection: &atomic.Value{},
mergedConfig: &atomic.Value{},
connectedToOpAMPServer: make(chan struct{}),
effectiveConfig: &atomic.Value{},
agentDescription: &atomic.Value{},
doneChan: make(chan struct{}),
Expand Down Expand Up @@ -189,7 +186,7 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) {
logger.Debug("Supervisor starting",
zap.String("id", s.persistentState.InstanceID.String()))

err = s.loadInitialMergedConfig()
err = s.loadAndWriteInitialMergedConfig()
if err != nil {
return nil, fmt.Errorf("failed loading initial config: %w", err)
}
Expand All @@ -198,10 +195,6 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) {
return nil, fmt.Errorf("cannot start OpAMP client: %w", err)
}

if connErr := s.waitForOpAMPConnection(); connErr != nil {
return nil, fmt.Errorf("failed to connect to the OpAMP server: %w", connErr)
}

s.commander, err = commander.NewCommander(
s.logger,
s.config.Agent,
Expand Down Expand Up @@ -403,7 +396,6 @@ func (s *Supervisor) startOpAMPClient() error {
InstanceUid: types.InstanceUid(s.persistentState.InstanceID),
Callbacks: types.CallbacksStruct{
OnConnectFunc: func(_ context.Context) {
s.connectedToOpAMPServer <- struct{}{}
s.logger.Debug("Connected to the server.")
},
OnConnectFailedFunc: func(_ context.Context, err error) {
Expand Down Expand Up @@ -679,17 +671,7 @@ func (s *Supervisor) onOpampConnectionSettings(_ context.Context, settings *prot
return err
}
}
return s.waitForOpAMPConnection()
}

func (s *Supervisor) waitForOpAMPConnection() error {
// wait for the OpAMP client to connect to the server or timeout
select {
case <-s.connectedToOpAMPServer:
return nil
case <-time.After(10 * time.Second):
return errors.New("timed out waiting for the server to connect")
}
return nil
}

func (s *Supervisor) composeBootstrapConfig() ([]byte, error) {
Expand Down Expand Up @@ -766,7 +748,7 @@ func (s *Supervisor) composeOpAMPExtensionConfig() []byte {
return cfg.Bytes()
}

func (s *Supervisor) loadInitialMergedConfig() error {
func (s *Supervisor) loadAndWriteInitialMergedConfig() error {
var lastRecvRemoteConfig, lastRecvOwnMetricsConfig []byte
var err error

Expand Down Expand Up @@ -809,6 +791,12 @@ func (s *Supervisor) loadInitialMergedConfig() error {
return fmt.Errorf("could not compose initial merged config: %w", err)
}

// write the initial merged config to disk
cfg := s.mergedConfig.Load().(string)
if err := os.WriteFile(agentConfigFilePath, []byte(cfg), 0600); err != nil {
s.logger.Error("Failed to write agent config.", zap.Error(err))
}

evan-bradley marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/opampsupervisor/supervisor/supervisor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ service:
exporters: [file]`

require.NoError(t, s.createTemplates())
require.NoError(t, s.loadInitialMergedConfig())
require.NoError(t, s.loadAndWriteInitialMergedConfig())

configChanged, err := s.composeMergedConfig(&protobufs.AgentRemoteConfig{
Config: &protobufs.AgentConfigMap{
Expand Down
16 changes: 16 additions & 0 deletions cmd/opampsupervisor/testdata/collector/healthcheck_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
receivers:
nop:

exporters:
nop:

extensions:
health_check/livenesscheck:
endpoint: "{{ .HealthCheckEndpoint }}"

service:
extensions: [health_check/livenesscheck]
pipelines:
logs:
receivers: [nop]
exporters: [nop]