diff --git a/process-compose.yaml b/process-compose.yaml index 81c103c..83d75f2 100644 --- a/process-compose.yaml +++ b/process-compose.yaml @@ -19,6 +19,10 @@ processes: # condition: process_completed_successfully environment: - 'EXIT_CODE=0' + shutdown: + command: "sleep 2 && pkill -f process1" + signal: 15 + timeout_seconds: 4 _process2: command: "./test_loop.bash process2" @@ -33,9 +37,9 @@ processes: - 'PRINT_ERR=111' - 'EXIT_CODE=2' shutdown: - command: "pkill -f process2" + command: "sleep 2 && pkill -f process2" signal: 15 - timeout_seconds: 2 + timeout_seconds: 4 process3: command: "./test_loop.bash process3" diff --git a/src/app/config.go b/src/app/config.go index c30806f..c7ebbf8 100644 --- a/src/app/config.go +++ b/src/app/config.go @@ -62,11 +62,12 @@ const ( ) const ( - ProcessStateDisabled = "Disabled" - ProcessStatePending = "Pending" - ProcessStateRunning = "Running" - ProcessStateRestarting = "Restarting" - ProcessStateCompleted = "Completed" + ProcessStateDisabled = "Disabled" + ProcessStatePending = "Pending" + ProcessStateRunning = "Running" + ProcessStateRestarting = "Restarting" + ProcessStateTerminating = "Terminating" + ProcessStateCompleted = "Completed" ) type RestartPolicyConfig struct { diff --git a/src/app/pc_string.go b/src/app/pc_string.go index 28d486b..a7735a5 100644 --- a/src/app/pc_string.go +++ b/src/app/pc_string.go @@ -1,7 +1,23 @@ package app -import "strings" +import ( + "fmt" + "strings" + "time" +) func isStringDefined(str string) bool { return len(strings.TrimSpace(str)) > 0 } + +func durationToString(dur time.Duration) string { + if dur.Minutes() < 3 { + return dur.Round(time.Second).String() + } else if dur.Minutes() < 60 { + return fmt.Sprintf("%.0fm", dur.Minutes()) + } else if dur.Hours() < 24 { + return fmt.Sprintf("%dh%dm", int(dur.Hours()), int(dur.Minutes())%60) + } else { + return fmt.Sprintf("%dh", int(dur.Hours())) + } +} diff --git a/src/app/process.go b/src/app/process.go index d793a1f..852830a 100644 --- a/src/app/process.go +++ b/src/app/process.go @@ -29,6 +29,7 @@ type Process struct { procConf ProcessConfig procState *ProcessState sync.Mutex + stateMtx sync.Mutex procCond sync.Cond procColor func(a ...interface{}) string noColor func(a ...interface{}) string @@ -67,18 +68,24 @@ func NewProcess( return proc } -func (p *Process) Run() error { +func (p *Process) run() error { + if p.isState(ProcessStateTerminating) { + return nil + } for { - p.cmd = exec.Command(getRunnerShell(), getRunnerArg(), p.getCommand()) - p.cmd.Env = p.getProcessEnvironment() - p.setProcArgs() - stdout, _ := p.cmd.StdoutPipe() - stderr, _ := p.cmd.StderrPipe() - go p.handleOutput(stdout, p.handleInfo) - go p.handleOutput(stderr, p.handleError) - p.cmd.Start() + starter := func() error { + p.cmd = exec.Command(getRunnerShell(), getRunnerArg(), p.getCommand()) + p.cmd.Env = p.getProcessEnvironment() + p.setProcArgs() + stdout, _ := p.cmd.StdoutPipe() + stderr, _ := p.cmd.StderrPipe() + go p.handleOutput(stdout, p.handleInfo) + go p.handleOutput(stderr, p.handleError) + return p.cmd.Start() + } + p.setStateAndRun(ProcessStateRunning, starter) + p.startTime = time.Now() - p.procState.Status = ProcessStateRunning p.procState.Pid = p.cmd.Process.Pid //Wait should wait for I/O consumption, but if the execution is too fast @@ -94,7 +101,7 @@ func (p *Process) Run() error { if !p.isRestartable(p.procState.ExitCode) { break } - p.procState.Status = ProcessStateRestarting + p.setState(ProcessStateRestarting) p.procState.Restarts += 1 log.Info().Msgf("Restarting %s in %v second(s)... Restarts: %d", p.procConf.Name, p.getBackoff().Seconds(), p.procState.Restarts) @@ -115,7 +122,7 @@ func (p *Process) getBackoff() time.Duration { func (p *Process) getProcessEnvironment() []string { env := []string{ - "PC_PROC_NAME=" + p.GetName(), + "PC_PROC_NAME=" + p.getName(), "PC_REPLICA_NUM=" + strconv.Itoa(p.replica), } env = append(env, os.Environ()...) @@ -147,7 +154,7 @@ func (p *Process) isRestartable(exitCode int) bool { return false } -func (p *Process) WaitForCompletion(waitee string) int { +func (p *Process) waitForCompletion(waitee string) int { p.Lock() defer p.Unlock() @@ -157,12 +164,20 @@ func (p *Process) WaitForCompletion(waitee string) int { return p.procState.ExitCode } -func (p *Process) WontRun() { +func (p *Process) wontRun() { p.onProcessEnd() } +// perform gracefull process shutdown if defined in configuration func (p *Process) shutDown() error { + if !p.isState(ProcessStateRunning) { + log.Debug().Msgf("process %s is in state %s not shutting down", p.getName(), p.procState.Status) + // prevent pending process from running + p.setState(ProcessStateTerminating) + return nil + } + p.setState(ProcessStateTerminating) if isStringDefined(p.procConf.ShutDownParams.ShutDownCommand) { return p.doConfiguredStop(p.procConf.ShutDownParams) } @@ -174,7 +189,7 @@ func (p *Process) doConfiguredStop(params ShutDownParams) error { if timeout == 0 { timeout = DEFAULT_SHUTDOWN_TIMEOUT_SEC } - log.Debug().Msgf("killing %s with timeout %d ...", p.GetName(), timeout) + log.Debug().Msgf("terminating %s with timeout %d ...", p.getName(), timeout) ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second) defer cancel() @@ -183,17 +198,22 @@ func (p *Process) doConfiguredStop(params ShutDownParams) error { if err := cmd.Run(); err != nil { // the process termination timedout and it will be killed - log.Error().Msgf("killing %s with timeout %d failed", p.GetName(), timeout) + log.Error().Msgf("terminating %s with timeout %d failed - %s", p.getName(), timeout, err.Error()) return p.stop(int(syscall.SIGKILL)) } return nil } +func (p *Process) prepareForShutDown() { + // prevent restart during global shutdown + p.procConf.RestartPolicy.Restart = RestartPolicyNo +} + func (p *Process) onProcessEnd() { if isStringDefined(p.procConf.LogLocation) { p.logger.Close() } - p.procState.Status = ProcessStateCompleted + p.setState(ProcessStateCompleted) p.Lock() p.done = true @@ -201,11 +221,11 @@ func (p *Process) onProcessEnd() { p.procCond.Broadcast() } -func (p *Process) GetName() string { +func (p *Process) getName() string { return p.procConf.Name } -func (p *Process) GetNameWithReplica() string { +func (p *Process) getNameWithReplica() string { return fmt.Sprintf("%s_%d", p.procConf.Name, p.replica) } @@ -214,24 +234,12 @@ func (p *Process) getCommand() string { } func (p *Process) updateProcState() { - if p.procState.Status == ProcessStateRunning { + if p.isState(ProcessStateRunning) { dur := time.Since(p.startTime) p.procState.SystemTime = durationToString(dur) } } -func durationToString(dur time.Duration) string { - if dur.Minutes() < 3 { - return dur.Round(time.Second).String() - } else if dur.Minutes() < 60 { - return fmt.Sprintf("%.0fm", dur.Minutes()) - } else if dur.Hours() < 24 { - return fmt.Sprintf("%dh%dm", int(dur.Hours()), int(dur.Minutes())%60) - } else { - return fmt.Sprintf("%dh", int(dur.Hours())) - } -} - func (p *Process) handleOutput(pipe io.ReadCloser, handler func(message string)) { outscanner := bufio.NewScanner(pipe) @@ -242,17 +250,36 @@ func (p *Process) handleOutput(pipe io.ReadCloser, } func (p *Process) handleInfo(message string) { - p.logger.Info(message, p.GetName(), p.replica) - fmt.Printf("[%s]\t%s\n", p.procColor(p.GetNameWithReplica()), message) + p.logger.Info(message, p.getName(), p.replica) + fmt.Printf("[%s]\t%s\n", p.procColor(p.getNameWithReplica()), message) p.logBuffer.Write(message) } func (p *Process) handleError(message string) { - p.logger.Error(message, p.GetName(), p.replica) - fmt.Printf("[%s]\t%s\n", p.procColor(p.GetNameWithReplica()), p.redColor(message)) + p.logger.Error(message, p.getName(), p.replica) + fmt.Printf("[%s]\t%s\n", p.procColor(p.getNameWithReplica()), p.redColor(message)) p.logBuffer.Write(message) } +func (p *Process) isState(state string) bool { + p.stateMtx.Lock() + defer p.stateMtx.Unlock() + return p.procState.Status == state +} + +func (p *Process) setState(state string) { + p.stateMtx.Lock() + defer p.stateMtx.Unlock() + p.procState.Status = state +} + +func (p *Process) setStateAndRun(state string, runnable func() error) error { + p.stateMtx.Lock() + defer p.stateMtx.Unlock() + p.procState.Status = state + return runnable() +} + func getRunnerShell() string { shell, ok := os.LookupEnv("SHELL") if !ok { @@ -261,8 +288,6 @@ func getRunnerShell() string { } else { shell = "bash" } - } else { - return shell } return shell } diff --git a/src/app/process_unix.go b/src/app/process_unix.go index 71fe9fe..b209595 100644 --- a/src/app/process_unix.go +++ b/src/app/process_unix.go @@ -12,6 +12,9 @@ const ( ) func (p *Process) stop(sig int) error { + if p.cmd == nil { + return nil + } if sig < min_sig || sig > max_sig { sig = int(syscall.SIGTERM) } diff --git a/src/app/project.go b/src/app/project.go index 8a9a73b..1a09aa3 100644 --- a/src/app/project.go +++ b/src/app/project.go @@ -67,14 +67,14 @@ func (p *Project) runProcess(proc ProcessConfig) { p.addRunningProcess(process) p.wg.Add(1) go func() { - defer p.removeRunningProcess(process.GetName()) + defer p.removeRunningProcess(process.getName()) defer p.wg.Done() if err := p.waitIfNeeded(process.procConf); err != nil { log.Error().Msgf("Error: %s", err.Error()) - log.Error().Msgf("Error: process %s won't run", process.GetName()) - process.WontRun() + log.Error().Msgf("Error: process %s won't run", process.getName()) + process.wontRun() } else { - process.Run() + process.run() } }() } @@ -85,10 +85,10 @@ func (p *Project) waitIfNeeded(process ProcessConfig) error { switch process.DependsOn[k].Condition { case ProcessConditionCompleted: - runningProc.WaitForCompletion(process.Name) + runningProc.waitForCompletion(process.Name) case ProcessConditionCompletedSuccessfully: log.Info().Msgf("%s is waiting for %s to complete successfully", process.Name, k) - exitCode := runningProc.WaitForCompletion(process.Name) + exitCode := runningProc.waitForCompletion(process.Name) if exitCode != 0 { return fmt.Errorf("process %s depended on %s to complete successfully, but it exited with status %d", process.Name, k, exitCode) @@ -141,7 +141,7 @@ func (p *Project) GetProcessState(name string) *ProcessState { func (p *Project) addRunningProcess(process *Process) { p.mapMutex.Lock() - p.runningProcesses[process.GetName()] = process + p.runningProcesses[process.getName()] = process p.mapMutex.Unlock() } @@ -186,6 +186,18 @@ func (p *Project) StopProcess(name string) error { return nil } +func (p *Project) ShutDownProject() { + p.mapMutex.Lock() + runProc := p.runningProcesses + p.mapMutex.Unlock() + for _, proc := range runProc { + proc.prepareForShutDown() + } + for _, proc := range runProc { + proc.shutDown() + } +} + func (p *Project) getProcessLog(name string) (*pclog.ProcessLogBuffer, error) { if procLogs, ok := p.processLogs[name]; ok { return procLogs, nil diff --git a/src/main.go b/src/main.go index 9ca9459..71beef4 100644 --- a/src/main.go +++ b/src/main.go @@ -4,6 +4,8 @@ import ( "flag" "fmt" "net/http" + "os/signal" + "syscall" "time" "os" @@ -58,6 +60,18 @@ func quiet() func() { } } +func runHeadless(project *app.Project) { + cancelChan := make(chan os.Signal, 1) + // catch SIGETRM or SIGINTERRUPT + signal.Notify(cancelChan, syscall.SIGTERM, syscall.SIGINT) + go func() { + project.Run() + }() + sig := <-cancelChan + log.Info().Msgf("Caught %v - Shutting down the running processes...", sig) + project.ShutDownProject() +} + func main() { fileName := "" port := 8080 @@ -107,7 +121,9 @@ func main() { go project.Run() tui.SetupTui(version, project.LogLength) } else { - project.Run() + runHeadless(project) } + log.Info().Msg("Thank you for using proccess-compose") + } diff --git a/src/tui/view.go b/src/tui/view.go index 2e887ff..6b938f1 100644 --- a/src/tui/view.go +++ b/src/tui/view.go @@ -38,40 +38,7 @@ func newPcView(version string, logLength int) *pcView { pv.procTable = pv.createProcTable() pv.statTable = pv.createStatTable() pv.updateHelpTextView() - pv.appView.SetRoot(pv.createGrid(), true).EnableMouse(true). - SetInputCapture(func(event *tcell.EventKey) *tcell.EventKey { - switch event.Key() { - case tcell.KeyF10: - - m := tview.NewModal(). - SetText("Are you sure you want to quit?"). - AddButtons([]string{"Quit", "Cancel"}). - SetDoneFunc(func(buttonIndex int, buttonLabel string) { - if buttonLabel == "Quit" { - pv.appView.Stop() - } else { - pv.appView.SetRoot(pv.createGrid(), true) - } - }) - // Display and focus the dialog - pv.appView.SetRoot(m, false) - - case tcell.KeyF5: - pv.logFollow = !pv.logFollow - name := pv.getSelectedProcName() - if pv.logFollow { - pv.followLog(name) - go pv.updateLogs() - } else { - pv.unFollowLog() - } - pv.updateHelpTextView() - case tcell.KeyF6: - pv.logsText.ToggleWrap() - pv.updateHelpTextView() - } - return event - }) + pv.appView.SetRoot(pv.createGrid(), true).EnableMouse(true).SetInputCapture(pv.onAppKey) if len(pv.procNames) > 0 { name := pv.procNames[0] pv.logsText.SetTitle(name) @@ -80,6 +47,59 @@ func newPcView(version string, logLength int) *pcView { return pv } +func (pv *pcView) onAppKey(event *tcell.EventKey) *tcell.EventKey { + switch event.Key() { + case tcell.KeyF10: + pv.terminateAppView() + case tcell.KeyF5: + pv.logFollow = !pv.logFollow + name := pv.getSelectedProcName() + if pv.logFollow { + pv.followLog(name) + go pv.updateLogs() + } else { + pv.unFollowLog() + } + pv.updateHelpTextView() + case tcell.KeyF6: + pv.logsText.ToggleWrap() + pv.updateHelpTextView() + case tcell.KeyCtrlC: + pv.terminateAppView() + default: + return event + } + return nil +} + +func (pv *pcView) terminateAppView() { + + m := tview.NewModal(). + SetText("Are you sure you want to quit?\nThis will terminate all the running processes."). + AddButtons([]string{"Quit", "Cancel"}). + SetDoneFunc(func(buttonIndex int, buttonLabel string) { + if buttonLabel == "Quit" { + go pv.handleShutDown() + } + pv.appView.SetRoot(pv.createGrid(), true) + + }) + // Display and focus the dialog + pv.appView.SetRoot(m, false) +} + +func (pv *pcView) handleShutDown() { + pv.statTable.SetCell(0, 2, tview.NewTableCell("Shutting Down..."). + SetSelectable(false). + SetAlign(tview.AlignRight). + SetExpansion(0). + SetTextColor(tcell.ColorWhite). + SetBackgroundColor(tcell.ColorRed)) + app.PROJ.ShutDownProject() + pv.appView.Stop() + +} + func (pv *pcView) fillTableData() { if app.PROJ == nil { return @@ -176,19 +196,20 @@ func (pv *pcView) createStatTable() *tview.Table { table := tview.NewTable().SetBorders(false).SetSelectable(false, false) table.SetCell(0, 0, tview.NewTableCell("Version:").SetSelectable(false).SetTextColor(tcell.ColorYellow)) - table.SetCell(0, 1, tview.NewTableCell(pv.version).SetSelectable(false)) + table.SetCell(0, 1, tview.NewTableCell(pv.version).SetSelectable(false).SetExpansion(1)) table.SetCell(1, 0, tview.NewTableCell("Hostname:").SetSelectable(false).SetTextColor(tcell.ColorYellow)) hostname, err := os.Hostname() if err != nil { hostname = err.Error() } - table.SetCell(1, 1, tview.NewTableCell(hostname).SetSelectable(false)) + table.SetCell(1, 1, tview.NewTableCell(hostname).SetSelectable(false).SetExpansion(1)) table.SetCell(2, 0, tview.NewTableCell("Processes:").SetSelectable(false).SetTextColor(tcell.ColorYellow)) - table.SetCell(2, 1, tview.NewTableCell(strconv.Itoa(len(pv.procNames))).SetSelectable(false)) + table.SetCell(2, 1, tview.NewTableCell(strconv.Itoa(len(pv.procNames))).SetSelectable(false).SetExpansion(1)) + table.SetCell(0, 2, tview.NewTableCell("").SetSelectable(false)) - table.SetCell(0, 2, tview.NewTableCell("🔥 Process Compose"). + table.SetCell(0, 3, tview.NewTableCell("🔥 Process Compose"). SetSelectable(false). SetAlign(tview.AlignRight). SetExpansion(1).