Skip to content

Commit

Permalink
#125: support more process start and stop in the same time
Browse files Browse the repository at this point in the history
  • Loading branch information
stou committed Jan 26, 2019
1 parent c613a3f commit 707bf5d
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 42 deletions.
134 changes: 127 additions & 7 deletions ctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"fmt"
"github.com/ochinchina/supervisord/config"
"github.com/ochinchina/supervisord/types"
"github.com/ochinchina/supervisord/xmlrpcclient"
"os"
"strings"
Expand All @@ -15,7 +16,35 @@ type CtlCommand struct {
Verbose bool `short:"v" long:"verbose" description:"Show verbose debug information"`
}

type StatusCommand struct {
}

type StartCommand struct {
}

type StopCommand struct {
}

type ShutdownCommand struct {
}

type ReloadCommand struct {
}

type PidCommand struct {
}

type SignalCommand struct {
}

var ctlCommand CtlCommand
var statusCommand StatusCommand
var startCommand StartCommand
var stopCommand StopCommand
var shutdownCommand ShutdownCommand
var reloadCommand ReloadCommand
var pidCommand PidCommand
var signalCommand SignalCommand

func (x *CtlCommand) getServerUrl() string {
options.Configuration, _ = findSupervisordConf()
Expand Down Expand Up @@ -67,14 +96,19 @@ func (x *CtlCommand) getPassword() string {
return ""
}

func (x *CtlCommand) createRpcClient() *xmlrpcclient.XmlRPCClient {
rpcc := xmlrpcclient.NewXmlRPCClient(x.getServerUrl(), x.Verbose)
rpcc.SetUser(x.getUser())
rpcc.SetPassword(x.getPassword())
return rpcc
}

func (x *CtlCommand) Execute(args []string) error {
if len(args) == 0 {
return nil
}

rpcc := xmlrpcclient.NewXmlRPCClient(x.getServerUrl(), x.Verbose)
rpcc.SetUser(x.getUser())
rpcc.SetPassword(x.getPassword())
rpcc := x.createRpcClient()
verb := args[0]

switch verb {
Expand Down Expand Up @@ -223,15 +257,36 @@ func (x *CtlCommand) getPid(rpcc *xmlrpcclient.XmlRPCClient, process string) {

func (x *CtlCommand) showProcessInfo(reply *xmlrpcclient.AllProcessInfoReply, processesMap map[string]bool) {
for _, pinfo := range reply.Value {
name := pinfo.Name
description := pinfo.Description
if strings.ToLower(description) == "<string></string>" {
description = ""
}
if len(processesMap) <= 0 || processesMap[name] {
fmt.Printf("%s%-33s%-10s%s%s\n", x.getANSIColor(pinfo.Statename), name, pinfo.Statename, description, "\x1b[0m")
if x.inProcessMap(&pinfo, processesMap) {
fmt.Printf("%s%-33s%-10s%s%s\n", x.getANSIColor(pinfo.Statename), pinfo.GetFullName(), pinfo.Statename, description, "\x1b[0m")
}
}
}

func (x *CtlCommand) inProcessMap(procInfo *types.ProcessInfo, processesMap map[string]bool) bool {
if len(processesMap) <= 0 {
return true
}
for procName, _ := range processesMap {
if procName == procInfo.Name || procName == procInfo.GetFullName() {
return true
}

// check the wildcast '*'
pos := strings.Index(procName, ":")
if pos != -1 {
groupName := procName[0:pos]
programName := procName[pos+1:]
if programName == "*" && groupName == procInfo.Group {
return true
}
}
}
return false
}

func (x *CtlCommand) getANSIColor(statename string) string {
Expand All @@ -247,9 +302,74 @@ func (x *CtlCommand) getANSIColor(statename string) string {
}
}

func (sc *StatusCommand) Execute(args []string) error {
ctlCommand.status(ctlCommand.createRpcClient(), args)
return nil
}

func (sc *StartCommand) Execute(args []string) error {
ctlCommand.startStopProcesses(ctlCommand.createRpcClient(), "start", args)
return nil
}

func (sc *StopCommand) Execute(args []string) error {
ctlCommand.startStopProcesses(ctlCommand.createRpcClient(), "stop", args)
return nil
}

func (sc *ShutdownCommand) Execute(args []string) error {
ctlCommand.shutdown(ctlCommand.createRpcClient())
return nil
}

func (rc *ReloadCommand) Execute(args []string) error {
ctlCommand.reload(ctlCommand.createRpcClient())
return nil
}

func (rc *SignalCommand) Execute(args []string) error {
sig_name, processes := args[0], args[1:]
ctlCommand.signal(ctlCommand.createRpcClient(), sig_name, processes)
return nil
}

func (pc *PidCommand) Execute(args []string) error {
ctlCommand.getPid(ctlCommand.createRpcClient(), args[0])
return nil
}

func init() {
parser.AddCommand("ctl",
ctlCmd, _ := parser.AddCommand("ctl",
"Control a running daemon",
"The ctl subcommand resembles supervisorctl command of original daemon.",
&ctlCommand)
ctlCmd.AddCommand("status",
"show program status",
"show all or some program status",
&statusCommand)
ctlCmd.AddCommand("start",
"start programs",
"start one or more programs",
&startCommand)
ctlCmd.AddCommand("stop",
"stop programs",
"stop one or more programs",
&stopCommand)
ctlCmd.AddCommand("shutdown",
"shutdown supervisord",
"shutdown supervisord",
&shutdownCommand)
ctlCmd.AddCommand("reload",
"reload the programs",
"reload the programs",
&reloadCommand)
ctlCmd.AddCommand("signal",
"send signal to program",
"send signal to program",
&signalCommand)
ctlCmd.AddCommand("pid",
"get the pid of specified program",
"get the pid of specified program",
&pidCommand)

}
43 changes: 31 additions & 12 deletions process/process_manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package process

import (
"fmt"
"strings"
"sync"

Expand Down Expand Up @@ -90,21 +91,39 @@ func (pm *ProcessManager) Remove(name string) *Process {

// return process if found or nil if not found
func (pm *ProcessManager) Find(name string) *Process {
pm.lock.Lock()
defer pm.lock.Unlock()
proc, ok := pm.procs[name]
if ok {
log.Debug("succeed to find process:", name)
} else {
//remove group field if it is included
if pos := strings.Index(name, ":"); pos != -1 {
proc, ok = pm.procs[name[pos+1:]]
procs := pm.FindMatch(name)
if len(procs) == 1 {
if procs[0].GetName() == name || name == fmt.Sprintf("%s:%s", procs[0].GetGroup(), procs[0].GetName()) {
return procs[0]
}
if !ok {
log.Info("fail to find process:", name)
}
return nil
}

func (pm *ProcessManager) FindMatch(name string) []*Process {
result := make([]*Process, 0)
if pos := strings.Index(name, ":"); pos != -1 {
groupName := name[0:pos]
programName := name[pos+1:]
pm.ForEachProcess(func(p *Process) {
if p.GetGroup() == groupName {
if programName == "*" || programName == p.GetName() {
result = append(result, p)
}
}
})
} else {
pm.lock.Lock()
defer pm.lock.Unlock()
proc, ok := pm.procs[name]
if ok {
result = append(result, proc)
}
}
return proc
if len(result) <= 0 {
log.Info("fail to find process:", name)
}
return result
}

// clear all the processes
Expand Down
24 changes: 15 additions & 9 deletions supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,14 @@ func (s *Supervisor) GetProcessInfo(r *http.Request, args *struct{ Name string }
}

func (s *Supervisor) StartProcess(r *http.Request, args *StartProcessArgs, reply *struct{ Success bool }) error {
proc := s.procMgr.Find(args.Name)
procs := s.procMgr.FindMatch(args.Name)

if proc == nil {
if len(procs) <= 0 {
return fmt.Errorf("fail to find process %s", args.Name)
}
proc.Start(args.Wait)
for _, proc := range procs {
proc.Start(args.Wait)
}
reply.Success = true
return nil
}
Expand Down Expand Up @@ -264,11 +266,13 @@ func (s *Supervisor) StartProcessGroup(r *http.Request, args *StartProcessArgs,

func (s *Supervisor) StopProcess(r *http.Request, args *StartProcessArgs, reply *struct{ Success bool }) error {
log.WithFields(log.Fields{"program": args.Name}).Info("stop process")
proc := s.procMgr.Find(args.Name)
if proc == nil {
procs := s.procMgr.FindMatch(args.Name)
if len(procs) <= 0 {
return fmt.Errorf("fail to find process %s", args.Name)
}
proc.Stop(args.Wait)
for _, proc := range procs {
proc.Stop(args.Wait)
}
reply.Success = true
return nil
}
Expand Down Expand Up @@ -316,14 +320,16 @@ func (s *Supervisor) StopAllProcesses(r *http.Request, args *struct {
}

func (s *Supervisor) SignalProcess(r *http.Request, args *types.ProcessSignal, reply *struct{ Success bool }) error {
proc := s.procMgr.Find(args.Name)
if proc == nil {
procs := s.procMgr.FindMatch(args.Name)
if len(procs) <= 0 {
reply.Success = false
return fmt.Errorf("No process named %s", args.Name)
}
sig, err := signals.ToSignal(args.Signal)
if err == nil {
proc.Signal(sig, false)
for _, proc := range procs {
proc.Signal(sig, false)
}
}
reply.Success = true
return nil
Expand Down
40 changes: 26 additions & 14 deletions types/comm-types.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
package types

import (
"fmt"
)

type ProcessInfo struct {
Name string `xml:"name" json:"name"`
Group string `xml:"group" json:"group"`
Description string `xml:"description" json:"description"`
Start int `xml:"start" json:"start"`
Stop int `xml:"stop" json:"stop"`
Now int `xml:"now" json:"now"`
State int `xml:"state" json:"state"`
Statename string `xml:"statename" json:"statename"`
Spawnerr string `xml:"spawnerr" json:"spawnerr"`
Exitstatus int `xml:"exitstatus" json:"exitstatus"`
Logfile string `xml:"logfile" json:"logfile"`
Stdout_logfile string `xml:"stdout_logfile" json:"stdout_logfile"`
Stderr_logfile string `xml:"stderr_logfile" json:"stderr_logfile"`
Pid int `xml:"pid" json:"pid"`
Name string `xml:"name" json:"name"`
Group string `xml:"group" json:"group"`
Description string `xml:"description" json:"description"`
Start int `xml:"start" json:"start"`
Stop int `xml:"stop" json:"stop"`
Now int `xml:"now" json:"now"`
State int `xml:"state" json:"state"`
Statename string `xml:"statename" json:"statename"`
Spawnerr string `xml:"spawnerr" json:"spawnerr"`
Exitstatus int `xml:"exitstatus" json:"exitstatus"`
Logfile string `xml:"logfile" json:"logfile"`
Stdout_logfile string `xml:"stdout_logfile" json:"stdout_logfile"`
Stderr_logfile string `xml:"stderr_logfile" json:"stderr_logfile"`
Pid int `xml:"pid" json:"pid"`
}

type ReloadConfigResult struct {
Expand All @@ -31,3 +35,11 @@ type ProcessSignal struct {
type BooleanReply struct {
Success bool
}

func (pi ProcessInfo) GetFullName() string {
if len(pi.Group) > 0 {
return fmt.Sprintf("%s:%s", pi.Group, pi.Name)
} else {
return pi.Name
}
}

0 comments on commit 707bf5d

Please sign in to comment.