-
Notifications
You must be signed in to change notification settings - Fork 25
/
proc.go
126 lines (98 loc) · 2.47 KB
/
proc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// Copyright 2021 The KCL Authors. All rights reserved.
package kclvm_runtime
import (
"bytes"
"errors"
"io"
"net/rpc"
"os/exec"
"github.com/chai2010/protorpc"
)
// _Process is one child process together with the RPC client that talks to
// it over the child's stdin/stdout pipes (see createProcess).
type _Process struct {
// busy is the in-use flag: set by SetBusy, cleared by SetFree, read by IsFree.
busy bool
// cmd is the child command built and started by createProcess.
cmd *exec.Cmd
// stdin is the pipe obtained from cmd.StdinPipe; the RPC codec writes to it.
stdin io.WriteCloser
// stdout is the pipe obtained from cmd.StdoutPipe; the RPC codec reads from it.
stdout io.ReadCloser
// stderr is the size-limited buffer installed as cmd.Stderr.
stderr *limitBuffer
// c is the RPC client layered on top of stdout/stdin.
c *rpc.Client
// done is a buffered channel (capacity 2) that receives the result of
// cmd.Wait and the result of Kill. IsExited treats any queued value as
// "the process is gone".
done chan error
}
// createProcess launches exe with the given arguments and attaches an RPC
// client to the child's stdin/stdout. It can fail: when a pipe cannot be
// created or the command does not start, it returns a nil process and the
// error.
func createProcess(exe string, arg ...string) (p *_Process, err error) {
	cmd := exec.Command(exe, arg...)

	in, err := cmd.StdinPipe()
	if err != nil {
		return nil, err
	}
	out, err := cmd.StdoutPipe()
	if err != nil {
		return nil, err
	}

	// The child's stderr is collected into a buffer limited to 10 KiB.
	errBuf := newLimitBuffer(10 * 1024)
	cmd.Stderr = errBuf

	// Start the process.
	if err = cmd.Start(); err != nil {
		return nil, err
	}

	// Wait for the exit result. The channel has two slots: one for the
	// value returned by Wait and one for the value returned by Kill.
	proc := &_Process{
		cmd:    cmd,
		stdin:  in,
		stdout: out,
		stderr: errBuf,
		done:   make(chan error, 2),
	}
	go func() {
		proc.done <- proc.cmd.Wait()
	}()

	// A generated NewXxxServiceClient would take exclusive ownership of the
	// connection (only one can be chosen), so the client is built by hand to
	// allow several service clients to share it.
	rwc := &procReadWriteCloser{proc: proc, r: proc.stdout, w: proc.stdin}
	codec := protorpc.NewClientCodec(rwc)
	proc.c = rpc.NewClientWithCodec(codec)

	return proc, nil
}
// IsExited reports whether a Wait or Kill result has been queued on p.done.
func (p *_Process) IsExited() bool {
	return len(p.done) != 0
}
// IsFree reports whether the process has not exited and is not marked busy.
func (p *_Process) IsFree() bool {
	if p.IsExited() {
		return false
	}
	return !p.busy
}
// SetFree clears the busy flag.
func (p *_Process) SetFree() {
	p.busy = false
}
// SetBusy sets the busy flag.
func (p *_Process) SetBusy() {
	p.busy = true
}
// GetClient returns the RPC client attached to the process's stdin/stdout.
func (p *_Process) GetClient() *rpc.Client {
	client := p.c
	return client
}
// GetStderr returns a reader over the captured stderr buffer, limited to the
// buffer's configured capacity in bytes. Reading from it consumes data from
// the underlying buffer.
func (p *_Process) GetStderr() io.Reader {
	src := p.stderr
	limit := int64(src.cap)
	return io.LimitReader(src, limit)
}
// Kill terminates the child process unless an exit result is already queued,
// in which case it does nothing and returns nil. The result of the kill
// attempt is both queued on p.done (so IsExited becomes true) and returned.
func (p *_Process) Kill() error {
	if !p.IsExited() {
		killErr := p.cmd.Process.Kill()
		p.done <- killErr
		return killErr
	}
	return nil
}
// procReadWriteCloser bundles a process's output and input pipes into one
// read/write/close value that can serve as an RPC connection. Closing it
// kills the owning process.
type procReadWriteCloser struct {
	proc *_Process      // process killed by Close
	r    io.ReadCloser  // source for Read
	w    io.WriteCloser // destination for Write
}

// Read reads from the wrapped reader.
func (p *procReadWriteCloser) Read(data []byte) (n int, err error) {
	n, err = p.r.Read(data)
	return n, err
}

// Write writes to the wrapped writer.
func (p *procReadWriteCloser) Write(data []byte) (n int, err error) {
	n, err = p.w.Write(data)
	return n, err
}

// Close kills the owning process and returns the result of that call. It
// does not close r or w directly.
func (p *procReadWriteCloser) Close() error {
	owner := p.proc
	return owner.Kill()
}
// limitBuffer is a byte buffer that refuses to hold more than cap unread
// bytes. Data beyond the limit is dropped and reported as an overflow error.
type limitBuffer struct {
	buf bytes.Buffer // stored bytes
	cap int          // maximum number of unread bytes kept in buf
}

// newLimitBuffer returns an empty limitBuffer that keeps at most cap bytes.
func newLimitBuffer(cap int) *limitBuffer {
	return &limitBuffer{cap: cap}
}

// Write appends as much of p as still fits under the limit.
//
// It returns the number of bytes actually stored, which is always within
// [0, len(p)]. When only part of p (or none of it) fits, the remainder is
// discarded and an overflow error is returned alongside the short count.
//
// NOTE(review): when this buffer is installed as exec.Cmd.Stderr, a non-nil
// error presumably stops the stderr copy for that command; confirm that this
// is the intended behavior on overflow.
func (b *limitBuffer) Write(p []byte) (n int, err error) {
	// Room left under the limit, clamped so we never slice past the end of p
	// and never report a negative or oversized count.
	n = b.cap - b.buf.Len()
	if n > len(p) {
		n = len(p)
	}
	if n < 0 {
		n = 0
	}
	if n > 0 {
		b.buf.Write(p[:n])
	}
	if n < len(p) {
		err = errors.New("limitBuffer: overflow")
	}
	return n, err
}

// Read drains stored bytes into p, following bytes.Buffer.Read semantics.
func (b *limitBuffer) Read(p []byte) (n int, err error) {
	return b.buf.Read(p)
}

// String returns the unread contents of the buffer as a string.
func (b *limitBuffer) String() string {
	return b.buf.String()
}