-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathjobsession_hlp.go
78 lines (70 loc) · 1.5 KB
/
jobsession_hlp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package drmaa2os
import (
"errors"
"fmt"
"time"
"github.com/dgruber/drmaa2interface"
"github.com/dgruber/drmaa2os/pkg/jobtracker"
)
// newJobSession returns a pointer to a new JobSession whose name and tracker
// fields are set to the given values; all other fields keep their zero value.
func newJobSession(name string, tracker []jobtracker.JobTracker) *JobSession {
	session := new(JobSession)
	session.name = name
	session.tracker = tracker
	return session
}
// waitAny blocks until the first job in jobs reports the awaited state and
// returns that job.
//
// When waitForStartedState is true every job is awaited with WaitStarted,
// otherwise with WaitTerminated. The timeout is handed to each of those calls
// and additionally bounds how long waitAny itself waits.
//
// An error is returned when jobs is empty or when the wait call failed for
// every job. ErrorInvalidState is returned when the timeout expires before
// any job reports success. A non-positive timeout lets the timer fire right
// away instead of panicking.
//
// The per-job goroutines are not cancelled when waitAny returns; each one
// ends once its WaitStarted/WaitTerminated call returns.
func waitAny(waitForStartedState bool, jobs []drmaa2interface.Job, timeout time.Duration) (drmaa2interface.Job, error) {
	if len(jobs) == 0 {
		return nil, fmt.Errorf("no job to wait for")
	}

	// Each channel has room for one result per job, so a waiter never blocks
	// on its send, even when nobody is receiving anymore.
	succeeded := make(chan int, len(jobs))
	failed := make(chan int, len(jobs))

	for i, job := range jobs {
		// Index and job are passed as arguments so every goroutine works on
		// its own copies regardless of the loop-variable semantics in use.
		go func(index int, job drmaa2interface.Job) {
			var err error
			if waitForStartedState {
				err = job.WaitStarted(timeout)
			} else {
				err = job.WaitTerminated(timeout)
			}
			if err != nil {
				failed <- index
				return
			}
			succeeded <- index
		}(i, job)
	}

	// One-shot timer: unlike a ticker it accepts non-positive durations and
	// is released as soon as the function returns.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	failures := 0
	for {
		select {
		case <-failed:
			failures++
			// Give up only once no job is left that could still succeed.
			if failures >= len(jobs) {
				return nil, errors.New("Error waiting for jobs")
			}
		case index := <-succeeded:
			return jobs[index], nil
		case <-timer.C:
			return nil, ErrorInvalidState
		}
	}
}