diff --git a/e2core/backend/satbackend/exec/exec.go b/e2core/backend/satbackend/exec/exec.go
index c34fd0a4..a303d2c1 100644
--- a/e2core/backend/satbackend/exec/exec.go
+++ b/e2core/backend/satbackend/exec/exec.go
@@ -11,9 +11,13 @@ import (
 	"github.com/pkg/errors"
 )
 
+// WaitFunc blocks until the command started by Run has finished; Run hands back command.Wait as this value.
+type WaitFunc func() error
+
 // Run runs a command, outputting to terminal and returning the full output and/or error
 // a channel is returned which, when sent on, will terminate the process that was started
-func Run(cmd []string, env ...string) (string, context.CancelCauseFunc, error) {
+// the returned WaitFunc blocks until that process has exited
+func Run(cmd []string, env ...string) (string, context.CancelCauseFunc, WaitFunc, error) {
 	procUUID := uuid.New().String()
 	uuidEnv := fmt.Sprintf("%s_UUID=%s", strings.ToUpper(cmd[0]), procUUID)
 	env = append(env, uuidEnv)
@@ -31,10 +35,10 @@ func Run(cmd []string, env ...string) (string, context.CancelCauseFunc, error) {
 
 	err := command.Start()
 	if err != nil {
-		return "", nil, errors.Wrap(err, "command.Start()")
+		return "", nil, nil, errors.Wrap(err, "command.Start()")
 	}
 
-	return procUUID, cxl, nil
+	return procUUID, cxl, command.Wait, nil
 }
 
 // this is unused but we may want to do logging-to-speficig-directory some time in the
diff --git a/e2core/backend/satbackend/orchestrator.go b/e2core/backend/satbackend/orchestrator.go
index ddfc5eac..938c95e8 100644
--- a/e2core/backend/satbackend/orchestrator.go
+++ b/e2core/backend/satbackend/orchestrator.go
@@ -126,18 +126,25 @@ func (o *Orchestrator) reconcileConstellation(syncer *syncer.Syncer) {
 
 			satWatcher := o.sats[module.FQMN]
 
-			launch := func() {
-				ll.Debug().Str("moduleFQMN", module.FQMN).Msg("launching sat")
+			satWatcher.deadListLock.Lock()
+			for deadPort := range satWatcher.deadList {
+				_ = satWatcher.terminateInstance(deadPort)
+			}
+			satWatcher.deadList = make(map[string]struct{})
+			satWatcher.deadListLock.Unlock()
 
+			launch := func() {
 				cmd, port := modStartCommand(module)
 
+				ll.Debug().Str("moduleFQMN", module.FQMN).Str("port", port).Msg("launching sat")
+
 				connectionsEnv := ""
 				if module.Namespace == "default" {
 					connectionsEnv = string(defaultConnectionsJSON)
 				}
 
 				// repeat forever in case the command does error out
-				processUUID, cxl, err := exec.Run(
+				processUUID, cxl, wait, err := exec.Run(
 					cmd,
 					"SAT_HTTP_PORT="+port,
 					"SAT_CONTROL_PLANE="+o.opts.ControlPlane,
@@ -148,6 +155,21 @@ func (o *Orchestrator) reconcileConstellation(syncer *syncer.Syncer) {
 					return
 				}
 
+				go func() {
+					err := wait()
+					if err != nil {
+						ll.Err(err).Str("moduleFQMN", module.FQMN).Str("port", port).Msg("calling waitfunc for the module failed")
+					}
+
+					err = satWatcher.addToDead(port)
+					if err != nil {
+						ll.Err(err).Str("moduleFQMN", module.FQMN).Str("port", port).Msg("adding the port to the dead list")
+						return
+					}
+
+					ll.Info().Str("moduleFQMN", module.FQMN).Str("port", port).Msg("added port to dead list")
+				}()
+
 				satWatcher.add(module.FQMN, port, processUUID, cxl)
 
 				ll.Debug().Str("moduleFQMN", module.FQMN).Str("port", port).Msg("successfully started sat")
diff --git a/e2core/backend/satbackend/watcher.go b/e2core/backend/satbackend/watcher.go
index ac6dc991..10293ebb 100644
--- a/e2core/backend/satbackend/watcher.go
+++ b/e2core/backend/satbackend/watcher.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"io"
 	"net/http"
+	"sync"
 	"time"
 
 	"github.com/pkg/errors"
@@ -28,9 +29,12 @@ type MetricsResponse struct {
 
 // watcher watches a "replicaSet" of Sats for a single FQMN
 type watcher struct {
-	fqmn      string
-	instances map[string]*instance
-	log       zerolog.Logger
+	fqmn             string
+	instances        map[string]*instance
+	log              zerolog.Logger
+	deadList         map[string]struct{}
+	deadListLock     sync.RWMutex
+	instancesRunning sync.WaitGroup
 }
 
 type instance struct {
@@ -49,14 +53,39 @@ type watcherReport struct {
 // newWatcher creates a new watcher instance for the given fqmn
 func newWatcher(fqmn string, log zerolog.Logger) *watcher {
 	return &watcher{
-		fqmn:      fqmn,
-		instances: map[string]*instance{},
-		log:       log.With().Str("module", "watcher").Logger(),
+		fqmn:             fqmn,
+		instances:        map[string]*instance{},
+		log:              log.With().Str("module", "watcher").Logger(),
+		deadList:         make(map[string]struct{}),
+		deadListLock:     sync.RWMutex{},
+		instancesRunning: sync.WaitGroup{},
 	}
 }
 
+// addToDead marks the instance on the given port as dead. It returns an error if the port is already in the dead list.
+func (w *watcher) addToDead(port string) error {
+	w.deadListLock.Lock()
+	defer w.deadListLock.Unlock()
+
+	_, ok := w.deadList[port]
+	if ok {
+		return errors.New("port is already in the dead list")
+	}
+
+	w.deadList[port] = struct{}{}
+	return nil
+}
+
 // add inserts a new instance to the watched pool.
 func (w *watcher) add(fqmn, port, uuid string, cxl context.CancelCauseFunc) {
+	w.log.Info().Str("port", port).Str("fqmn", fqmn).Msg("adding one to the waitgroup port")
+	w.instancesRunning.Add(1)
+
+	i, ok := w.instances[port]
+	if ok {
+		w.log.Error().Str("port", port).Str("fqmn", fqmn).Str("uuid", uuid).Any("existing", i).Msg("something already exists on this port")
+	}
+
 	w.instances[port] = &instance{
 		fqmn: fqmn,
 		uuid: uuid,
@@ -73,6 +102,9 @@ func (w *watcher) scaleDown() {
 		ll.Debug().Str("fqmn", w.instances[p].fqmn).Str("port", p).Msg("scaling down, terminating instance")
 
 		w.instances[p].cxl(errScaleDown)
+		delete(w.instances, p)
+		// same as wg.Done()
+		w.instancesRunning.Add(-1)
 
 		break
 	}
@@ -86,7 +118,10 @@ func (w *watcher) terminate() {
 		instance.cxl(errTerminateAll)
 
 		delete(w.instances, p)
+		w.instancesRunning.Add(-1)
 	}
+
+	w.instancesRunning.Wait()
 }
 
 // terminateInstance terminates the instance from the given port
@@ -99,6 +134,8 @@ func (w *watcher) terminateInstance(p string) error {
 	inst.cxl(errTerminateOne)
 
 	delete(w.instances, p)
+	w.log.Info().Str("port", p).Msg("removing one delta from the wait group")
+	w.instancesRunning.Add(-1)
 
 	return nil
 }