Skip to content

Commit

Permalink
fix program startretries
Browse files Browse the repository at this point in the history
  • Loading branch information
ochinchina committed Sep 23, 2018
1 parent 248cb24 commit 026fb4b
Showing 1 changed file with 116 additions and 73 deletions.
189 changes: 116 additions & 73 deletions process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
)
Expand Down Expand Up @@ -63,7 +64,7 @@ type Process struct {
inStart bool
//true if the process is stopped by user
stopByUser bool
retryTimes int
retryTimes *int32
lock sync.RWMutex
stdin io.WriteCloser
StdoutLog logger.Logger
Expand All @@ -79,15 +80,9 @@ func NewProcess(supervisor_id string, config *config.ConfigEntry) *Process {
state: STOPPED,
inStart: false,
stopByUser: false,
retryTimes: 0}
retryTimes: new(int32)}
proc.config = config
proc.cmd = nil

//start the process if autostart is set to true
//if proc.isAutoStart() {
// proc.Start(false)
//}

return proc
}

Expand All @@ -112,7 +107,6 @@ func (p *Process) Start(wait bool) {
}

go func() {
p.retryTimes = 0

for {
if wait {
Expand All @@ -125,11 +119,6 @@ func (p *Process) Start(wait bool) {
runCond.Signal()
}
})
if (p.stopTime.Unix() - p.startTime.Unix()) < int64(p.getStartSeconds()) {
p.retryTimes++
} else {
p.retryTimes = 0
}
if p.stopByUser {
log.WithFields(log.Fields{"program": p.GetName()}).Info("Stopped by user, don't start it again")
break
Expand All @@ -138,10 +127,6 @@ func (p *Process) Start(wait bool) {
log.WithFields(log.Fields{"program": p.GetName()}).Info("Don't start the stopped program because its autorestart flag is false")
break
}
if p.retryTimes >= p.getStartRetries() {
log.WithFields(log.Fields{"program": p.GetName()}).Info("Don't start the stopped program because its retry times ", p.retryTimes, " is greater than start retries ", p.getStartRetries())
break
}
}
p.lock.Lock()
p.inStart = false
Expand Down Expand Up @@ -255,8 +240,8 @@ func (p *Process) getStartSeconds() int {
return p.config.GetInt("startsecs", 1)
}

func (p *Process) getStartRetries() int {
return p.config.GetInt("startretries", 3)
func (p *Process) getStartRetries() int32 {
return int32(p.config.GetInt("startretries", 3))
}

func (p *Process) isAutoStart() bool {
Expand Down Expand Up @@ -335,94 +320,152 @@ func (p *Process) getExitCodes() []int {
return result
}

func (p *Process) run(finishCb func()) {
args, err := parseCommand(p.config.GetStringExpression("command", ""))

if err != nil {
log.Error("the command is empty string")
finishCb()
return
}
p.lock.Lock()
// check if the process is running or not
//
func (p *Process) isRunning() bool {
if p.cmd != nil && p.cmd.ProcessState != nil {
status := p.cmd.ProcessState.Sys().(syscall.WaitStatus)
if status.Continued() {
log.WithFields(log.Fields{"program": p.GetName()}).Info("Don't start program because it is running")
p.lock.Unlock()
finishCb()
return
return true
}
}
return false
}

// create Command object for the program
func (p *Process) createProgramCommand() error {
args, err := parseCommand(p.config.GetStringExpression("command", ""))

if err != nil {
return err
}
p.cmd = exec.Command(args[0])
if len(args) > 1 {
p.cmd.Args = args
}
p.cmd.SysProcAttr = &syscall.SysProcAttr{}
if p.setUser() != nil {
log.WithFields(log.Fields{"user": p.config.GetString("user", "")}).Error("fail to run as user")
p.lock.Unlock()
finishCb()
return
return fmt.Errorf("fail to set user")
}
set_deathsig(p.cmd.SysProcAttr)
p.setEnv()
p.setDir()
p.setLog()

p.stdin, _ = p.cmd.StdinPipe()
p.startTime = time.Now()
p.changeStateTo(STARTING)
err = p.cmd.Start()
return nil

}

// waitForExit blocks until the started command exits, logs how it ended and
// then records the stop time and the resulting process state.
//
// startSecs is the minimum number of seconds the program must have stayed up:
// if it stopped sooner the state becomes BACKOFF, otherwise EXITED.
func (p *Process) waitForExit(startSecs int64) {
// Wait returns only after the command has exited.
err := p.cmd.Wait()
if err != nil {
// NOTE(review): the two log calls below look like the removed and the added
// line of a diff rendered together - confirm which one is intended.
log.WithFields(log.Fields{"program": p.GetName()}).Errorf("fail to start program with error:%v", err)
log.WithFields(log.Fields{"program": p.GetName()}).Info("fail to wait for program exit")
} else if p.cmd.ProcessState != nil {
log.WithFields(log.Fields{"program": p.GetName()}).Infof("program stopped with status:%v", p.cmd.ProcessState)
} else {
log.WithFields(log.Fields{"program": p.GetName()}).Info("program stopped")
}

// The lock is taken only after the wait so it is not held while the program runs;
// it covers the stopTime write and the state change below.
p.lock.Lock()
defer p.lock.Unlock()

p.stopTime = time.Now()
// Unix() yields whole seconds, so the uptime is compared at one-second
// granularity; sub-second differences are dropped.
if p.stopTime.Unix()-p.startTime.Unix() < startSecs {
p.changeStateTo(BACKOFF)
} else {
p.changeStateTo(EXITED)
}
}

// failToStartProgram handles a failed start attempt: it logs reason at error
// level tagged with the program name, moves the process to the FATAL state and
// then invokes finishCb.
//
// reason is an already-formatted message and is logged verbatim. It is passed
// to Error rather than Errorf because callers build it with fmt.Sprintf from
// arbitrary error text; treating it as a format string would mangle any '%'
// it contains (for example into "%!v(MISSING)").
//
// NOTE(review): no lock is taken here - presumably the caller already holds
// p.lock when the state is changed; confirm.
func (p *Process) failToStartProgram(reason string, finishCb func()) {
	log.WithFields(log.Fields{"program": p.GetName()}).Error(reason)
	p.changeStateTo(FATAL)
	finishCb()
}

// monitorProgramIsRunning polls until endTime is reached or the retry counter
// reaches the configured start retries, then settles the process state:
// FATAL if the process is in BACKOFF at that point, RUNNING otherwise.
//
// endTime is the deadline for the polling loop, finishCb is invoked on the
// BACKOFF branch, and *monitorExited is set to 1 once polling has finished.
func (p *Process) monitorProgramIsRunning(endTime time.Time, finishCb func(), monitorExited *int32) {
// Poll every 100ms; leave early once the retry budget is used up.
for time.Now().Before(endTime) {
time.Sleep(time.Duration(100) * time.Millisecond)
if atomic.LoadInt32(p.retryTimes) >= p.getStartRetries() {
break
}
}
// Publish that polling is over - presumably read by the goroutine that
// started this monitor; verify against the caller.
atomic.StoreInt32(monitorExited, 1)

p.lock.Lock()
defer p.lock.Unlock()
if p.state == BACKOFF {
p.changeStateTo(FATAL)
// NOTE(review): the explicit Unlock below, together with the deferred Unlock
// above, would release the mutex twice on this branch, which sync.RWMutex
// treats as a run-time error. These three statements may be stale diff lines -
// confirm before relying on them.
p.stopTime = time.Now()
p.lock.Unlock()
finishCb()
} else {
p.changeStateTo(RUNNING)
}
}

func (p *Process) run(finishCb func()) {
p.lock.Lock()
defer p.lock.Unlock()

// check if the program is in running state
if p.isRunning() {
log.WithFields(log.Fields{"program": p.GetName()}).Info("Don't start program because it is running")
finishCb()
return

}

p.startTime = time.Now()
p.changeStateTo(STARTING)
atomic.StoreInt32(p.retryTimes, 0)
startSecs := int64(p.config.GetInt("startsecs", 1))
endTime := time.Now().Add(time.Duration(startSecs) * time.Second)
monitorExited := int32(0)
for {
if atomic.LoadInt32(p.retryTimes) >= p.getStartRetries() {
p.failToStartProgram(fmt.Sprintf("fail to start program because retry times is greater than %d", p.getStartRetries()), finishCb)
break
}
atomic.AddInt32(p.retryTimes, 1)
err := p.createProgramCommand()
if err != nil {
p.failToStartProgram("fail to create program", finishCb)
break
}

err = p.cmd.Start()

if err != nil {
p.failToStartProgram(fmt.Sprintf("fail to start program with error:%v", err), finishCb)
break
}
if p.StdoutLog != nil {
p.StdoutLog.SetPid(p.cmd.Process.Pid)
}
if p.StderrLog != nil {
p.StderrLog.SetPid(p.cmd.Process.Pid)
}
log.WithFields(log.Fields{"program": p.GetName()}).Info("success to start program")
startSecs := p.config.GetInt("startsecs", 1)
//Set startsec to 0 to indicate that the program needn't stay
//running for any particular amount of time.
if startSecs <= 0 {
p.changeStateTo(RUNNING)

} else {
p.lock.Unlock()
time.Sleep(time.Duration(startSecs) * time.Second)
p.lock.Lock()
if tmpProc, err := os.FindProcess(p.cmd.Process.Pid); err == nil && tmpProc != nil {
p.changeStateTo(RUNNING)
}
} else if atomic.LoadInt32(p.retryTimes) == 1 { // only start monitor for first try
go p.monitorProgramIsRunning(endTime, finishCb, &monitorExited)
}
p.lock.Unlock()
log.WithFields(log.Fields{"program": p.GetName()}).Debug("wait program exit")
finishCb()
err = p.cmd.Wait()
if err == nil {
if p.cmd.ProcessState != nil {
log.WithFields(log.Fields{"program": p.GetName()}).Infof("program stopped with status:%v", p.cmd.ProcessState)
} else {
log.WithFields(log.Fields{"program": p.GetName()}).Info("program stopped")
}
} else {
log.WithFields(log.Fields{"program": p.GetName()}).Errorf("program stopped with error:%v", err)
}

p.lock.Unlock()
p.waitForExit(startSecs)
p.lock.Lock()
p.stopTime = time.Now()
if p.stopTime.Unix()-p.startTime.Unix() < int64(startSecs) {
p.changeStateTo(BACKOFF)
} else {
p.changeStateTo(EXITED)
}
// wait for monitor thread exit
for {
if atomic.LoadInt32(&monitorExited) != 0 {
break
}
p.lock.Unlock()
time.Sleep(time.Duration(100) * time.Millisecond)
}

}
Expand All @@ -432,11 +475,11 @@ func (p *Process) changeStateTo(procState ProcessState) {
progName := p.config.GetProgramName()
groupName := p.config.GetGroupName()
if procState == STARTING {
events.EmitEvent(events.CreateProcessStartingEvent(progName, groupName, p.state.String(), p.retryTimes))
events.EmitEvent(events.CreateProcessStartingEvent(progName, groupName, p.state.String(), int(atomic.LoadInt32(p.retryTimes))))
} else if procState == RUNNING {
events.EmitEvent(events.CreateProcessRunningEvent(progName, groupName, p.state.String(), p.cmd.Process.Pid))
} else if procState == BACKOFF {
events.EmitEvent(events.CreateProcessBackoffEvent(progName, groupName, p.state.String(), p.retryTimes))
events.EmitEvent(events.CreateProcessBackoffEvent(progName, groupName, p.state.String(), int(atomic.LoadInt32(p.retryTimes))))
} else if procState == STOPPING {
events.EmitEvent(events.CreateProcessStoppingEvent(progName, groupName, p.state.String(), p.cmd.Process.Pid))
} else if procState == EXITED {
Expand Down

0 comments on commit 026fb4b

Please sign in to comment.