Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore][cmd/opampsupervisor]: move supervisor start logic into separate Start() function #34509

Merged
merged 1 commit into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions cmd/opampsupervisor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ func TestSupervisorStartsCollectorWithRemoteConfig(t *testing.T) {
})

s := newSupervisor(t, "basic", map[string]string{"url": server.addr})

require.Nil(t, s.Start())
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)
Expand Down Expand Up @@ -281,6 +283,8 @@ func TestSupervisorStartsCollectorWithNoOpAMPServer(t *testing.T) {
"url": server.addr,
"storage_dir": storageDir,
})

require.Nil(t, s.Start())
defer s.Shutdown()

// Verify the collector runs eventually by pinging the healthcheck extension
Expand Down Expand Up @@ -332,6 +336,8 @@ func TestSupervisorStartsWithNoOpAMPServer(t *testing.T) {
s := newSupervisor(t, "basic", map[string]string{
"url": server.addr,
})

require.Nil(t, s.Start())
defer s.Shutdown()

// Verify the collector is running by checking the metrics endpoint
Expand Down Expand Up @@ -416,6 +422,8 @@ func TestSupervisorRestartsCollectorAfterBadConfig(t *testing.T) {
})

s := newSupervisor(t, "basic", map[string]string{"url": server.addr})

require.Nil(t, s.Start())
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)
Expand Down Expand Up @@ -492,6 +500,8 @@ func TestSupervisorConfiguresCapabilities(t *testing.T) {
})

s := newSupervisor(t, "nocap", map[string]string{"url": server.addr})

require.Nil(t, s.Start())
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)
Expand Down Expand Up @@ -547,6 +557,8 @@ func TestSupervisorBootstrapsCollector(t *testing.T) {
})

s := newSupervisor(t, "nocap", map[string]string{"url": server.addr})

require.Nil(t, s.Start())
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)
Expand Down Expand Up @@ -594,6 +606,8 @@ func TestSupervisorReportsEffectiveConfig(t *testing.T) {
})

s := newSupervisor(t, "basic", map[string]string{"url": server.addr})

require.Nil(t, s.Start())
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)
Expand Down Expand Up @@ -701,6 +715,8 @@ func TestSupervisorAgentDescriptionConfigApplies(t *testing.T) {
})

s := newSupervisor(t, "agent_description", map[string]string{"url": server.addr})

require.Nil(t, s.Start())
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)
Expand Down Expand Up @@ -852,6 +868,8 @@ func TestSupervisorRestartCommand(t *testing.T) {
})

s := newSupervisor(t, "basic", map[string]string{"url": server.addr})

require.Nil(t, s.Start())
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)
Expand Down Expand Up @@ -918,6 +936,8 @@ func TestSupervisorOpAMPConnectionSettings(t *testing.T) {
server.ConnectionCallbacksStruct{})

s := newSupervisor(t, "accepts_conn", map[string]string{"url": initialServer.addr})

require.Nil(t, s.Start())
defer s.Shutdown()

waitForSupervisorConnection(initialServer.supervisorConnected, true)
Expand Down Expand Up @@ -978,6 +998,8 @@ func TestSupervisorRestartsWithLastReceivedConfig(t *testing.T) {

s := newSupervisor(t, "persistence", map[string]string{"url": initialServer.addr, "storage_dir": tempDir})

require.Nil(t, s.Start())

waitForSupervisorConnection(initialServer.supervisorConnected, true)

cfg, hash, _, _ := createSimplePipelineCollectorConf(t)
Expand Down Expand Up @@ -1020,6 +1042,8 @@ func TestSupervisorRestartsWithLastReceivedConfig(t *testing.T) {
defer newServer.shutdown()

s1 := newSupervisor(t, "persistence", map[string]string{"url": newServer.addr, "storage_dir": tempDir})

require.Nil(t, s1.Start())
defer s1.Shutdown()

waitForSupervisorConnection(newServer.supervisorConnected, true)
Expand Down Expand Up @@ -1066,6 +1090,8 @@ func TestSupervisorPersistsInstanceID(t *testing.T) {
"storage_dir": storageDir,
})

require.Nil(t, s.Start())

waitForSupervisorConnection(server.supervisorConnected, true)

t.Logf("Supervisor connected")
Expand Down Expand Up @@ -1095,6 +1121,8 @@ func TestSupervisorPersistsInstanceID(t *testing.T) {
"url": server.addr,
"storage_dir": storageDir,
})

require.Nil(t, s.Start())
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)
Expand Down Expand Up @@ -1148,6 +1176,8 @@ func TestSupervisorPersistsNewInstanceID(t *testing.T) {
"storage_dir": storageDir,
})

require.Nil(t, s.Start())

waitForSupervisorConnection(server.supervisorConnected, true)

t.Logf("Supervisor connected")
Expand Down Expand Up @@ -1175,6 +1205,8 @@ func TestSupervisorPersistsNewInstanceID(t *testing.T) {
"url": server.addr,
"storage_dir": storageDir,
})

require.Nil(t, s.Start())
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)
Expand Down Expand Up @@ -1206,6 +1238,8 @@ func TestSupervisorWritesAgentFilesToStorageDir(t *testing.T) {
"storage_dir": storageDir,
})

require.Nil(t, s.Start())

waitForSupervisorConnection(server.supervisorConnected, true)

t.Logf("Supervisor connected")
Expand Down
7 changes: 7 additions & 0 deletions cmd/opampsupervisor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ func main() {
return
}

err = supervisor.Start()
if err != nil {
logger.Error(err.Error())
os.Exit(-1)
return
}

interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
<-interrupt
Expand Down
20 changes: 12 additions & 8 deletions cmd/opampsupervisor/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,34 +165,38 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) {
return nil, fmt.Errorf("error creating storage dir: %w", err)
}

return s, nil
}

func (s *Supervisor) Start() error {
var err error
s.persistentState, err = loadOrCreatePersistentState(s.persistentStateFilePath())
if err != nil {
return nil, err
return err
}

if err = s.getBootstrapInfo(); err != nil {
return nil, fmt.Errorf("could not get bootstrap info from the Collector: %w", err)
return fmt.Errorf("could not get bootstrap info from the Collector: %w", err)
}

healthCheckPort, err := s.findRandomPort()

if err != nil {
return nil, fmt.Errorf("could not find port for health check: %w", err)
return fmt.Errorf("could not find port for health check: %w", err)
}

s.agentHealthCheckEndpoint = fmt.Sprintf("localhost:%d", healthCheckPort)

logger.Debug("Supervisor starting",
s.logger.Debug("Supervisor starting",
zap.String("id", s.persistentState.InstanceID.String()))

err = s.loadAndWriteInitialMergedConfig()
if err != nil {
return nil, fmt.Errorf("failed loading initial config: %w", err)
return fmt.Errorf("failed loading initial config: %w", err)
}

if err = s.startOpAMP(); err != nil {
return nil, fmt.Errorf("cannot start OpAMP client: %w", err)
return fmt.Errorf("cannot start OpAMP client: %w", err)
}

s.commander, err = commander.NewCommander(
Expand All @@ -202,7 +206,7 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) {
"--config", s.agentConfigFilePath(),
)
if err != nil {
return nil, err
return err
}

s.startHealthCheckTicker()
Expand All @@ -219,7 +223,7 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) {
s.forwardCustomMessagesToServerLoop()
}()

return s, nil
return nil
}

func (s *Supervisor) createTemplates() error {
Expand Down
Loading