Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perform graceful termination of process compose #5

Merged
merged 1 commit into from
Jun 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions process-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ processes:
# condition: process_completed_successfully
environment:
- 'EXIT_CODE=0'
shutdown:
command: "sleep 2 && pkill -f process1"
signal: 15
timeout_seconds: 4

_process2:
command: "./test_loop.bash process2"
Expand All @@ -33,9 +37,9 @@ processes:
- 'PRINT_ERR=111'
- 'EXIT_CODE=2'
shutdown:
command: "pkill -f process2"
command: "sleep 2 && pkill -f process2"
signal: 15
timeout_seconds: 2
timeout_seconds: 4

process3:
command: "./test_loop.bash process3"
Expand Down
11 changes: 6 additions & 5 deletions src/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,12 @@ const (
)

const (
ProcessStateDisabled = "Disabled"
ProcessStatePending = "Pending"
ProcessStateRunning = "Running"
ProcessStateRestarting = "Restarting"
ProcessStateCompleted = "Completed"
ProcessStateDisabled = "Disabled"
ProcessStatePending = "Pending"
ProcessStateRunning = "Running"
ProcessStateRestarting = "Restarting"
ProcessStateTerminating = "Terminating"
ProcessStateCompleted = "Completed"
)

type RestartPolicyConfig struct {
Expand Down
18 changes: 17 additions & 1 deletion src/app/pc_string.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,23 @@
package app

import "strings"
import (
"fmt"
"strings"
"time"
)

func isStringDefined(str string) bool {
return len(strings.TrimSpace(str)) > 0
}

func durationToString(dur time.Duration) string {
if dur.Minutes() < 3 {
return dur.Round(time.Second).String()
} else if dur.Minutes() < 60 {
return fmt.Sprintf("%.0fm", dur.Minutes())
} else if dur.Hours() < 24 {
return fmt.Sprintf("%dh%dm", int(dur.Hours()), int(dur.Minutes())%60)
} else {
return fmt.Sprintf("%dh", int(dur.Hours()))
}
}
101 changes: 63 additions & 38 deletions src/app/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Process struct {
procConf ProcessConfig
procState *ProcessState
sync.Mutex
stateMtx sync.Mutex
procCond sync.Cond
procColor func(a ...interface{}) string
noColor func(a ...interface{}) string
Expand Down Expand Up @@ -67,18 +68,24 @@ func NewProcess(
return proc
}

func (p *Process) Run() error {
func (p *Process) run() error {
if p.isState(ProcessStateTerminating) {
return nil
}
for {
p.cmd = exec.Command(getRunnerShell(), getRunnerArg(), p.getCommand())
p.cmd.Env = p.getProcessEnvironment()
p.setProcArgs()
stdout, _ := p.cmd.StdoutPipe()
stderr, _ := p.cmd.StderrPipe()
go p.handleOutput(stdout, p.handleInfo)
go p.handleOutput(stderr, p.handleError)
p.cmd.Start()
starter := func() error {
p.cmd = exec.Command(getRunnerShell(), getRunnerArg(), p.getCommand())
p.cmd.Env = p.getProcessEnvironment()
p.setProcArgs()
stdout, _ := p.cmd.StdoutPipe()
stderr, _ := p.cmd.StderrPipe()
go p.handleOutput(stdout, p.handleInfo)
go p.handleOutput(stderr, p.handleError)
return p.cmd.Start()
}
p.setStateAndRun(ProcessStateRunning, starter)

p.startTime = time.Now()
p.procState.Status = ProcessStateRunning
p.procState.Pid = p.cmd.Process.Pid

//Wait should wait for I/O consumption, but if the execution is too fast
Expand All @@ -94,7 +101,7 @@ func (p *Process) Run() error {
if !p.isRestartable(p.procState.ExitCode) {
break
}
p.procState.Status = ProcessStateRestarting
p.setState(ProcessStateRestarting)
p.procState.Restarts += 1
log.Info().Msgf("Restarting %s in %v second(s)... Restarts: %d",
p.procConf.Name, p.getBackoff().Seconds(), p.procState.Restarts)
Expand All @@ -115,7 +122,7 @@ func (p *Process) getBackoff() time.Duration {

func (p *Process) getProcessEnvironment() []string {
env := []string{
"PC_PROC_NAME=" + p.GetName(),
"PC_PROC_NAME=" + p.getName(),
"PC_REPLICA_NUM=" + strconv.Itoa(p.replica),
}
env = append(env, os.Environ()...)
Expand Down Expand Up @@ -147,7 +154,7 @@ func (p *Process) isRestartable(exitCode int) bool {
return false
}

func (p *Process) WaitForCompletion(waitee string) int {
func (p *Process) waitForCompletion(waitee string) int {
p.Lock()
defer p.Unlock()

Expand All @@ -157,12 +164,20 @@ func (p *Process) WaitForCompletion(waitee string) int {
return p.procState.ExitCode
}

func (p *Process) WontRun() {
func (p *Process) wontRun() {
p.onProcessEnd()

}

// perform gracefull process shutdown if defined in configuration
func (p *Process) shutDown() error {
if !p.isState(ProcessStateRunning) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check is messing my use-case up.
With the pg_ctl start example, the process goes to completed state, since it exits with 0 status code.
In this case the shutdown command not run.
For the sake of simplicity, I think we can ask users to provide an idempotently runnable / non-failing command like:

pg_ctl stop || true

and run it regardless of process state.

log.Debug().Msgf("process %s is in state %s not shutting down", p.getName(), p.procState.Status)
// prevent pending process from running
p.setState(ProcessStateTerminating)
return nil
}
p.setState(ProcessStateTerminating)
if isStringDefined(p.procConf.ShutDownParams.ShutDownCommand) {
return p.doConfiguredStop(p.procConf.ShutDownParams)
}
Expand All @@ -174,7 +189,7 @@ func (p *Process) doConfiguredStop(params ShutDownParams) error {
if timeout == 0 {
timeout = DEFAULT_SHUTDOWN_TIMEOUT_SEC
}
log.Debug().Msgf("killing %s with timeout %d ...", p.GetName(), timeout)
log.Debug().Msgf("terminating %s with timeout %d ...", p.getName(), timeout)
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
defer cancel()

Expand All @@ -183,29 +198,34 @@ func (p *Process) doConfiguredStop(params ShutDownParams) error {

if err := cmd.Run(); err != nil {
// the process termination timedout and it will be killed
log.Error().Msgf("killing %s with timeout %d failed", p.GetName(), timeout)
log.Error().Msgf("terminating %s with timeout %d failed - %s", p.getName(), timeout, err.Error())
return p.stop(int(syscall.SIGKILL))
}
return nil
}

func (p *Process) prepareForShutDown() {
// prevent restart during global shutdown
p.procConf.RestartPolicy.Restart = RestartPolicyNo
}

func (p *Process) onProcessEnd() {
if isStringDefined(p.procConf.LogLocation) {
p.logger.Close()
}
p.procState.Status = ProcessStateCompleted
p.setState(ProcessStateCompleted)

p.Lock()
p.done = true
p.Unlock()
p.procCond.Broadcast()
}

func (p *Process) GetName() string {
func (p *Process) getName() string {
return p.procConf.Name
}

func (p *Process) GetNameWithReplica() string {
func (p *Process) getNameWithReplica() string {
return fmt.Sprintf("%s_%d", p.procConf.Name, p.replica)
}

Expand All @@ -214,24 +234,12 @@ func (p *Process) getCommand() string {
}

func (p *Process) updateProcState() {
if p.procState.Status == ProcessStateRunning {
if p.isState(ProcessStateRunning) {
dur := time.Since(p.startTime)
p.procState.SystemTime = durationToString(dur)
}
}

func durationToString(dur time.Duration) string {
if dur.Minutes() < 3 {
return dur.Round(time.Second).String()
} else if dur.Minutes() < 60 {
return fmt.Sprintf("%.0fm", dur.Minutes())
} else if dur.Hours() < 24 {
return fmt.Sprintf("%dh%dm", int(dur.Hours()), int(dur.Minutes())%60)
} else {
return fmt.Sprintf("%dh", int(dur.Hours()))
}
}

func (p *Process) handleOutput(pipe io.ReadCloser,
handler func(message string)) {
outscanner := bufio.NewScanner(pipe)
Expand All @@ -242,17 +250,36 @@ func (p *Process) handleOutput(pipe io.ReadCloser,
}

func (p *Process) handleInfo(message string) {
p.logger.Info(message, p.GetName(), p.replica)
fmt.Printf("[%s]\t%s\n", p.procColor(p.GetNameWithReplica()), message)
p.logger.Info(message, p.getName(), p.replica)
fmt.Printf("[%s]\t%s\n", p.procColor(p.getNameWithReplica()), message)
p.logBuffer.Write(message)
}

func (p *Process) handleError(message string) {
p.logger.Error(message, p.GetName(), p.replica)
fmt.Printf("[%s]\t%s\n", p.procColor(p.GetNameWithReplica()), p.redColor(message))
p.logger.Error(message, p.getName(), p.replica)
fmt.Printf("[%s]\t%s\n", p.procColor(p.getNameWithReplica()), p.redColor(message))
p.logBuffer.Write(message)
}

func (p *Process) isState(state string) bool {
p.stateMtx.Lock()
defer p.stateMtx.Unlock()
return p.procState.Status == state
}

func (p *Process) setState(state string) {
p.stateMtx.Lock()
defer p.stateMtx.Unlock()
p.procState.Status = state
}

func (p *Process) setStateAndRun(state string, runnable func() error) error {
p.stateMtx.Lock()
defer p.stateMtx.Unlock()
p.procState.Status = state
return runnable()
}

func getRunnerShell() string {
shell, ok := os.LookupEnv("SHELL")
if !ok {
Expand All @@ -261,8 +288,6 @@ func getRunnerShell() string {
} else {
shell = "bash"
}
} else {
return shell
}
return shell
}
Expand Down
3 changes: 3 additions & 0 deletions src/app/process_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ const (
)

func (p *Process) stop(sig int) error {
if p.cmd == nil {
return nil
}
if sig < min_sig || sig > max_sig {
sig = int(syscall.SIGTERM)
}
Expand Down
26 changes: 19 additions & 7 deletions src/app/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,14 @@ func (p *Project) runProcess(proc ProcessConfig) {
p.addRunningProcess(process)
p.wg.Add(1)
go func() {
defer p.removeRunningProcess(process.GetName())
defer p.removeRunningProcess(process.getName())
defer p.wg.Done()
if err := p.waitIfNeeded(process.procConf); err != nil {
log.Error().Msgf("Error: %s", err.Error())
log.Error().Msgf("Error: process %s won't run", process.GetName())
process.WontRun()
log.Error().Msgf("Error: process %s won't run", process.getName())
process.wontRun()
} else {
process.Run()
process.run()
}
}()
}
Expand All @@ -85,10 +85,10 @@ func (p *Project) waitIfNeeded(process ProcessConfig) error {

switch process.DependsOn[k].Condition {
case ProcessConditionCompleted:
runningProc.WaitForCompletion(process.Name)
runningProc.waitForCompletion(process.Name)
case ProcessConditionCompletedSuccessfully:
log.Info().Msgf("%s is waiting for %s to complete successfully", process.Name, k)
exitCode := runningProc.WaitForCompletion(process.Name)
exitCode := runningProc.waitForCompletion(process.Name)
if exitCode != 0 {
return fmt.Errorf("process %s depended on %s to complete successfully, but it exited with status %d",
process.Name, k, exitCode)
Expand Down Expand Up @@ -141,7 +141,7 @@ func (p *Project) GetProcessState(name string) *ProcessState {

func (p *Project) addRunningProcess(process *Process) {
p.mapMutex.Lock()
p.runningProcesses[process.GetName()] = process
p.runningProcesses[process.getName()] = process
p.mapMutex.Unlock()
}

Expand Down Expand Up @@ -186,6 +186,18 @@ func (p *Project) StopProcess(name string) error {
return nil
}

func (p *Project) ShutDownProject() {
p.mapMutex.Lock()
runProc := p.runningProcesses
p.mapMutex.Unlock()
for _, proc := range runProc {
proc.prepareForShutDown()
}
for _, proc := range runProc {
proc.shutDown()
}
}

func (p *Project) getProcessLog(name string) (*pclog.ProcessLogBuffer, error) {
if procLogs, ok := p.processLogs[name]; ok {
return procLogs, nil
Expand Down
18 changes: 17 additions & 1 deletion src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"flag"
"fmt"
"net/http"
"os/signal"
"syscall"
"time"

"os"
Expand Down Expand Up @@ -58,6 +60,18 @@ func quiet() func() {
}
}

func runHeadless(project *app.Project) {
cancelChan := make(chan os.Signal, 1)
// catch SIGETRM or SIGINTERRUPT
signal.Notify(cancelChan, syscall.SIGTERM, syscall.SIGINT)
go func() {
project.Run()
}()
sig := <-cancelChan
log.Info().Msgf("Caught %v - Shutting down the running processes...", sig)
project.ShutDownProject()
}

func main() {
fileName := ""
port := 8080
Expand Down Expand Up @@ -107,7 +121,9 @@ func main() {
go project.Run()
tui.SetupTui(version, project.LogLength)
} else {
project.Run()
runHeadless(project)
}

log.Info().Msg("Thank you for using proccess-compose")

}
Loading