Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(e2core): Deal with dead processes #429

Merged
merged 4 commits into from
May 3, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions e2core/backend/satbackend/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import (
"github.com/pkg/errors"
)

type WaitFunc func() error

// Run runs a command, outputting to terminal and returning the full output and/or error
// a cancel function is returned which, when called, will terminate the process that was started
func Run(cmd []string, env ...string) (string, context.CancelCauseFunc, error) {
func Run(cmd []string, env ...string) (string, context.CancelCauseFunc, WaitFunc, error) {
procUUID := uuid.New().String()
uuidEnv := fmt.Sprintf("%s_UUID=%s", strings.ToUpper(cmd[0]), procUUID)
env = append(env, uuidEnv)
Expand All @@ -31,10 +33,10 @@ func Run(cmd []string, env ...string) (string, context.CancelCauseFunc, error) {

err := command.Start()
if err != nil {
return "", nil, errors.Wrap(err, "command.Start()")
return "", nil, nil, errors.Wrap(err, "command.Start()")
}

return procUUID, cxl, nil
return procUUID, cxl, command.Wait, nil
}

// this is unused but we may want to do logging-to-specific-directory some time in the
Expand Down
30 changes: 27 additions & 3 deletions e2core/backend/satbackend/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,18 +126,28 @@ func (o *Orchestrator) reconcileConstellation(syncer *syncer.Syncer) {

satWatcher := o.sats[module.FQMN]

launch := func() {
ll.Debug().Str("moduleFQMN", module.FQMN).Msg("launching sat")
ll.Info().Msg("starting the range over the things in the died channel")

satWatcher.diedListLock.Lock()
for diedPort := range satWatcher.diedList {
_ = satWatcher.terminateInstance(diedPort)
}
satWatcher.diedList = make(map[string]struct{})
satWatcher.diedListLock.Unlock()
ll.Info().Msg("no more things, continuing reconciliation")

launch := func() {
cmd, port := modStartCommand(module)

ll.Debug().Str("moduleFQMN", module.FQMN).Str("port", port).Msg("launching sat")

connectionsEnv := ""
if module.Namespace == "default" {
connectionsEnv = string(defaultConnectionsJSON)
}

// repeat forever in case the command does error out
processUUID, cxl, err := exec.Run(
processUUID, cxl, wait, err := exec.Run(
cmd,
"SAT_HTTP_PORT="+port,
"SAT_CONTROL_PLANE="+o.opts.ControlPlane,
Expand All @@ -148,6 +158,20 @@ func (o *Orchestrator) reconcileConstellation(syncer *syncer.Syncer) {
return
}

go func() {
err := wait()
if err != nil {
ll.Err(err).Str("moduleFQMN", module.FQMN).Str("port", port).Msg("calling waitfunc for the module failed")
}

err = satWatcher.addDied(port)
if err != nil {
ll.Err(err).Str("moduleFQMN", module.FQMN).Str("port", port).Msg("adding the port to the died list thing")
javorszky marked this conversation as resolved.
Show resolved Hide resolved
}

ll.Info().Str("moduleFQMN", module.FQMN).Str("port", port).Msg("sent died message into channel")
}()

satWatcher.add(module.FQMN, port, processUUID, cxl)

ll.Debug().Str("moduleFQMN", module.FQMN).Str("port", port).Msg("successfully started sat")
Expand Down
48 changes: 42 additions & 6 deletions e2core/backend/satbackend/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"net/http"
"sync"
"time"

"github.com/pkg/errors"
Expand All @@ -28,9 +29,12 @@ type MetricsResponse struct {

// watcher watches a "replicaSet" of Sats for a single FQMN
type watcher struct {
fqmn string
instances map[string]*instance
log zerolog.Logger
fqmn string
instances map[string]*instance
log zerolog.Logger
diedList map[string]struct{}
diedListLock sync.RWMutex
instancesRunning sync.WaitGroup
}

type instance struct {
Expand All @@ -49,14 +53,38 @@ type watcherReport struct {
// newWatcher creates a new watcher instance for the given fqmn
func newWatcher(fqmn string, log zerolog.Logger) *watcher {
return &watcher{
fqmn: fqmn,
instances: map[string]*instance{},
log: log.With().Str("module", "watcher").Logger(),
fqmn: fqmn,
instances: map[string]*instance{},
log: log.With().Str("module", "watcher").Logger(),
diedList: make(map[string]struct{}),
diedListLock: sync.RWMutex{},
instancesRunning: sync.WaitGroup{},
}
}

// addDied records port in the watcher's died list while holding
// diedListLock for writing. It returns an error, and leaves the list
// unchanged, when the port has already been recorded.
func (w *watcher) addDied(port string) error {
	w.diedListLock.Lock()
	defer w.diedListLock.Unlock()

	// A port may only be listed once; a second report is surfaced to the caller.
	if _, alreadyListed := w.diedList[port]; alreadyListed {
		return errors.New("port is already in the died list")
	}

	w.diedList[port] = struct{}{}

	return nil
}

// add inserts a new instance to the watched pool.
func (w *watcher) add(fqmn, port, uuid string, cxl context.CancelCauseFunc) {
w.log.Info().Str("port", port).Str("fqmn", fqmn).Msg("adding one to the waitgroup port")
w.instancesRunning.Add(1)

i, ok := w.instances[port]
if ok {
w.log.Error().Str("port", port).Str("fqmn", fqmn).Str("uuid", uuid).Any("exising", i).Msg("!!!! Something already exists on this port !!!!")
callahad marked this conversation as resolved.
Show resolved Hide resolved
}

w.instances[port] = &instance{
fqmn: fqmn,
uuid: uuid,
Expand All @@ -73,6 +101,9 @@ func (w *watcher) scaleDown() {
ll.Debug().Str("fqmn", w.instances[p].fqmn).Str("port", p).Msg("scaling down, terminating instance")

w.instances[p].cxl(errScaleDown)
delete(w.instances, p)
// same as wg.Done()
w.instancesRunning.Add(-1)

break
}
Expand All @@ -86,7 +117,10 @@ func (w *watcher) terminate() {
instance.cxl(errTerminateAll)

delete(w.instances, p)
w.instancesRunning.Add(-1)
}

w.instancesRunning.Wait()
cohix marked this conversation as resolved.
Show resolved Hide resolved
}

// terminateInstance terminates the instance from the given port
Expand All @@ -99,6 +133,8 @@ func (w *watcher) terminateInstance(p string) error {
inst.cxl(errTerminateOne)

delete(w.instances, p)
w.log.Info().Str("port", p).Msg("removing one delta from the wait group")
w.instancesRunning.Add(-1)

return nil
}
Expand Down