diff --git a/README.md b/README.md index a380e291..a4715c9d 100644 --- a/README.md +++ b/README.md @@ -416,11 +416,17 @@ The source can be used with: - [Raspberry Pi Cameras](https://www.raspberrypi.com/documentation/computers/camera_software.html) - any your own software +Pipe commands support two parameters (format: `exec:{command}#{param1}#{param2}`): + +- `killsignal` - signal which will be send to stop the process (numeric form) +- `killtimeout` - time in seconds for forced termination with sigkill + ```yaml streams: stream: exec:ffmpeg -re -i /media/BigBuckBunny.mp4 -c copy -rtsp_transport tcp -f rtsp {output} picam_h264: exec:libcamera-vid -t 0 --inline -o - picam_mjpeg: exec:libcamera-vid -t 0 --codec mjpeg -o - + canon: exec:gphoto2 --capture-movie --stdout#killsignal=2#killtimeout=5 ``` #### Source: Echo diff --git a/internal/exec/exec.go b/internal/exec/exec.go index 6bc5698a..aefab201 100644 --- a/internal/exec/exec.go +++ b/internal/exec/exec.go @@ -5,6 +5,7 @@ import ( "encoding/hex" "errors" "fmt" + "net/url" "os" "os/exec" "strings" @@ -45,17 +46,19 @@ func Init() { log = app.GetLogger("exec") } -func execHandle(url string) (core.Producer, error) { +func execHandle(rawURL string) (core.Producer, error) { var path string - args := shell.QuoteSplit(url[5:]) // remove `exec:` + rawURL, rawQuery, _ := strings.Cut(rawURL, "#") + + args := shell.QuoteSplit(rawURL[5:]) // remove `exec:` for i, arg := range args { if arg == "{output}" { if rtsp.Port == "" { return nil, errors.New("rtsp module disabled") } - sum := md5.Sum([]byte(url)) + sum := md5.Sum([]byte(rawURL)) path = "/" + hex.EncodeToString(sum[:]) args[i] = "rtsp://127.0.0.1:" + rtsp.Port + path break @@ -68,14 +71,15 @@ func execHandle(url string) (core.Producer, error) { } if path == "" { - return handlePipe(url, cmd) + query := streams.ParseQuery(rawQuery) + return handlePipe(rawURL, cmd, query) } - return handleRTSP(url, path, cmd) + return handleRTSP(rawURL, path, cmd) } -func handlePipe(url string, cmd *exec.Cmd) (core.Producer, error) { - r, err := PipeCloser(cmd) +func handlePipe(_ string, cmd *exec.Cmd, query url.Values) (core.Producer, error) { + r, err := PipeCloser(cmd, query) if err != nil { return nil, err } @@ -145,6 +149,8 @@ func handleRTSP(url, path string, cmd *exec.Cmd) (core.Producer, error) { // internal -var log zerolog.Logger -var waiters = map[string]chan core.Producer{} -var waitersMu sync.Mutex +var ( + log zerolog.Logger + waiters = map[string]chan core.Producer{} + waitersMu sync.Mutex +) diff --git a/internal/exec/pipe.go b/internal/exec/pipe.go index a76f381c..12ea136b 100644 --- a/internal/exec/pipe.go +++ b/internal/exec/pipe.go @@ -2,29 +2,55 @@ package exec import ( "bufio" + "errors" "io" + "net/url" "os/exec" + "syscall" + "time" "github.com/AlexxIT/go2rtc/pkg/core" ) // PipeCloser - return StdoutPipe that Kill cmd on Close call -func PipeCloser(cmd *exec.Cmd) (io.ReadCloser, error) { +func PipeCloser(cmd *exec.Cmd, query url.Values) (io.ReadCloser, error) { stdout, err := cmd.StdoutPipe() if err != nil { return nil, err } // add buffer for pipe reader to reduce syscall - return pipeCloser{bufio.NewReaderSize(stdout, core.BufferSize), stdout, cmd}, nil + return &pipeCloser{bufio.NewReaderSize(stdout, core.BufferSize), stdout, cmd, query}, nil } type pipeCloser struct { io.Reader io.Closer - cmd *exec.Cmd + cmd *exec.Cmd + query url.Values } -func (p pipeCloser) Close() error { - return core.Any(p.Closer.Close(), p.cmd.Process.Kill(), p.cmd.Wait()) +func (p *pipeCloser) Close() error { + return errors.Join(p.Closer.Close(), p.Kill(), p.Wait()) +} + +func (p *pipeCloser) Kill() error { + if s := p.query.Get("killsignal"); s != "" { + log.Trace().Msgf("[exec] kill with custom sig=%s", s) + sig := syscall.Signal(core.Atoi(s)) + return p.cmd.Process.Signal(sig) + } + return p.cmd.Process.Kill() +} + +func (p *pipeCloser) Wait() error { + if s := p.query.Get("killtimeout"); s != "" { + timeout := time.Duration(core.Atoi(s)) * time.Second + timer := time.AfterFunc(timeout, func() { + log.Trace().Msgf("[exec] kill after timeout=%s", s) + _ = p.cmd.Process.Kill() + }) + defer timer.Stop() // stop timer if Wait ends before timeout + } + return p.cmd.Wait() } diff --git a/internal/streams/helpers.go b/internal/streams/helpers.go index e59dab77..2ead1aa3 100644 --- a/internal/streams/helpers.go +++ b/internal/streams/helpers.go @@ -6,6 +6,9 @@ import ( ) func ParseQuery(s string) url.Values { + if len(s) == 0 { + return nil + } params := url.Values{} for _, key := range strings.Split(s, "#") { var value string