-
Notifications
You must be signed in to change notification settings - Fork 0
/
pool.go
142 lines (127 loc) · 3.44 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package main
import (
"context"
"fmt"
"io"
"math"
"os"
"os/exec"
)
var (
executeCmd = initExecuteCmd()
)
func initExecuteCmd() func(context.Context, string) *exec.Cmd {
shellPath := os.Getenv("SHELL")
// If we can't find the current shell, we'll try to lookup the shell paths
supportedShells := []string{"bash", "sh", "zsh"}
if shellPath == "" {
for _, sh := range supportedShells {
path, err := exec.LookPath(sh)
if err == nil {
shellPath = path
}
}
}
if shellPath == "" {
panic("failed to find a shell")
}
return func(ctx context.Context, cmd string) *exec.Cmd {
return exec.CommandContext(ctx, shellPath, "-c", cmd)
}
}
// Worker executes given node in a separate goroutine.
type Worker struct {
ctx context.Context
id uint64
// Stdout is used to redirect the output from the shell command stdout
Stdout io.Writer
// Stderr is used to redirect the output from the shell command stderr.
// If nil, Stdout will be instead.
Stderr io.Writer
Env []string
}
// Execute executes given job from n. Worker will execute steps from the given job
// in sequential order. If any of the steps fails, Execute will return early
// Environment variables will be set appropriate before the shell command runs.
// There are 2 kinds of environment variables: builtin and user-space.
// Following are available builtin environment variables:
// - GOTOPUS_JOB_ID
// - GOTOPUS_JOB_NAME
// - GOTOPUS_STEP_NAME
// - GOTOPUS_WORKER_ID
//
// User-space environment variables are given from the config
func (w *Worker) Execute(n *Node) error {
if w.Stdout == nil {
return fmt.Errorf("Stdout is required to be not nil")
}
if w.Stderr == nil {
w.Stderr = w.Stdout
}
jobEnv := make(Env)
jobEnv.SetBuiltin("JOB_ID", n.ID)
jobEnv.SetBuiltin("JOB_NAME", n.Job.Name)
jobEnvEncoded := append(w.Env, jobEnv.Encode()...)
for _, step := range n.Job.Steps {
stepEnv := make(Env)
stepEnv.SetBuiltin("WORKER_ID", w.id)
stepEnv.SetBuiltin("STEP_NAME", step.Name)
for k, v := range step.Env {
stepEnv.Set(k, v)
}
cmd := executeCmd(w.ctx, step.Run)
cmd.Env = append(jobEnvEncoded, stepEnv.Encode()...)
cmd.Stdout = w.Stdout
cmd.Stderr = w.Stderr
if err := cmd.Run(); err != nil {
return err
}
}
return nil
}
// PoolJob represents a job unit that can be submitted to a Pool.
type PoolJob func(Worker)
// PoolStart starts a pool of workers in different goroutines lazily with
// maxWorkers as the limit. The caller can submit jobs by using the function
// from the return value.
// For example:
// submit := PoolStart(ctx, 0)
// submit(func(w Worker){
// // do work here. This work will be done concurrently
// })
//
// If ctx gets cancelled, all of the workers will exit and all resources will be freed.
//
// If maxWorkers is 0, the pool can grow infinitely until it runs out of memory
// to spawn more workers.
func PoolStart(ctx context.Context, maxWorkers uint64) func(PoolJob) {
env := os.Environ()
jobChan := make(chan PoolJob)
createWorker := func(id uint64) {
worker := Worker{ctx: ctx, id: id, Env: env}
for {
select {
case job := <-jobChan:
job(worker)
case <-ctx.Done():
return
}
}
}
if maxWorkers == 0 {
maxWorkers = math.MaxUint64
}
var numWorkers uint64
return func(job PoolJob) {
select {
case jobChan <- job:
default:
// If the pool still can grow, we'll spawn another worker
if numWorkers < maxWorkers {
go createWorker(numWorkers)
numWorkers++
}
jobChan <- job
}
}
}