-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathrunner.go
121 lines (104 loc) · 2 KB
/
runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package main
import (
"bufio"
"bytes"
"io"
"os/exec"
"strings"
"sync"
)
func cmdWithInput(template []string, placeholder, input string) *exec.Cmd {
argv := make([]string, len(template))
for i := range argv {
argv[i] = strings.Replace(template[i], placeholder, input, -1)
}
return exec.Command(argv[0], argv[1:]...)
}
type Runner struct {
cmd *exec.Cmd
stdinbuf *bytes.Buffer
stdoutbuf *bytes.Buffer
wg *sync.WaitGroup
}
func NewRunner(template []string, placeholder, input string, stdin *bytes.Buffer) *Runner {
cmd := cmdWithInput(template, placeholder, input)
return &Runner{
cmd: cmd,
stdinbuf: stdin,
stdoutbuf: &bytes.Buffer{},
wg: &sync.WaitGroup{},
}
}
func (r *Runner) Run() (<-chan string, error) {
stdout, err := r.cmd.StdoutPipe()
if err != nil {
return nil, err
}
stderr, err := r.cmd.StderrPipe()
if err != nil {
stdout.Close()
return nil, err
}
r.wg.Add(2)
outch := r.streamOutput(stdout, r.wg)
errch := r.streamOutput(stderr, r.wg)
ch := make(chan string)
go func() {
for {
select {
case stdoutline, ok := <-outch:
if !ok {
outch = nil
break
}
r.stdoutbuf.WriteString(stdoutline)
ch <- stdoutline
case stderrline, ok := <-errch:
if !ok {
errch = nil
break
}
ch <- stderrline
}
if outch == nil && errch == nil {
close(ch)
return
}
}
}()
if r.stdinbuf.Len() != 0 {
r.cmd.Stdin = bytes.NewBuffer(r.stdinbuf.Bytes())
}
err = r.cmd.Start()
if err != nil {
return nil, err
}
return ch, nil
}
func (r *Runner) streamOutput(rc io.ReadCloser, wg *sync.WaitGroup) <-chan string {
ch := make(chan string)
reader := bufio.NewReader(rc)
go func() {
for {
line, err := reader.ReadBytes('\n')
if s := string(line); s != "" {
ch <- s
}
if err != nil {
break
}
}
rc.Close()
close(ch)
wg.Done()
}()
return ch
}
func (r *Runner) KillWait() {
r.cmd.Process.Kill()
r.Wait()
}
func (r *Runner) Wait() {
r.wg.Wait()
r.cmd.Wait()
}