-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathjobsession.go
229 lines (210 loc) · 8.57 KB
/
jobsession.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
package drmaa2os
import (
"fmt"
"time"
"github.com/dgruber/drmaa2interface"
"github.com/dgruber/drmaa2os/pkg/d2hlp"
"github.com/dgruber/drmaa2os/pkg/jobtracker"
"github.com/mitchellh/copystructure"
)
// JobSession instance acts as container for job instances controlled
// through the DRMAA API. The session methods support the submission
// of new jobs and the monitoring of existing jobs.
type JobSession struct {
	// name is the session name reported by GetSessionName. Close resets
	// it to the empty string.
	name string
	// tracker holds the job trackers backing this session. RunJob and
	// RunBulkJobs submit through tracker[0], while GetJobs, GetJobArray
	// and GetJobCategories query every tracker in order. Close sets it
	// to nil.
	tracker []jobtracker.JobTracker
	// libdrmaa stores newly created internal job session name
	// inside contact string
	// NOTE(review): not read by the methods visible in this file
	// (GetContact asks tracker[0] instead); confirm where it is consumed.
	contact string
}
// Close MUST perform the necessary action to disengage from the DRM system.
// It SHOULD be callable only once, by only one of the application threads.
// This SHOULD be ensured by the library implementation. Additional calls
// beyond the first one SHOULD lead to an InvalidSessionException error
// notification (ErrorInvalidSession).
// The corresponding state information MUST be saved to some stable storage
// before the method returns. This method SHALL NOT affect any jobs or
// reservations in the session (e.g., queued and running jobs remain queued
// and running). (TODO)
//
// Only the first tracker is closed, and only if it implements
// jobtracker.Closer; its error (if any) is returned. Afterwards the
// session name is cleared and the trackers are dropped, so a second call
// reports ErrorInvalidSession. A named session without any tracker is
// cleared without panicking.
func (js *JobSession) Close() error {
	// An already closed (or never initialized) session has neither a name
	// nor trackers. len() also covers a non-nil but empty tracker slice.
	if js.name == "" && len(js.tracker) == 0 {
		return ErrorInvalidSession
	}
	var err error
	// Guard the index: the name may be set while no tracker is attached.
	if len(js.tracker) > 0 {
		if closer, ok := js.tracker[0].(jobtracker.Closer); ok {
			// disengage from storage and DRM system
			err = closer.Close()
		}
	}
	js.name = ""
	js.tracker = nil
	return err
}
// GetContact method reports the contact value that was used in the
// SessionManager::createJobSession call for this instance. If no
// value was originally provided, the default contact string from the
// implementation MUST be returned.
//
// The contact string is delegated to the first tracker when that tracker
// implements jobtracker.ContactStringer. Otherwise, including when the
// session has no trackers, the placeholder "not implemented" is returned
// together with a nil error.
func (js *JobSession) GetContact() (string, error) {
	if len(js.tracker) == 0 {
		return "not implemented", nil
	}
	contactStringer, supported := js.tracker[0].(jobtracker.ContactStringer)
	if !supported {
		return "not implemented", nil
	}
	return contactStringer.Contact()
}
// GetSessionName returns the name of this job session, i.e. the value
// that resulted from the SessionManager::createJobSession or
// SessionManager::openJobSession call for this instance. The returned
// error is always nil.
func (js *JobSession) GetSessionName() (string, error) {
	sessionName := js.name
	return sessionName, nil
}
// GetJobCategories returns the job category names that are valid for the
// jobCategory attribute of a JobTemplate instance.
//
// The categories of all trackers are concatenated in tracker order. A
// tracker that fails to list its categories is skipped. In that case the
// error of the last failing tracker is returned alongside the categories
// collected from the remaining trackers.
func (js *JobSession) GetJobCategories() ([]string, error) {
	categories := make([]string, 0, 16)
	var lastErr error
	for i := range js.tracker {
		trackerCategories, err := js.tracker[i].ListJobCategories()
		if err == nil {
			categories = append(categories, trackerCategories...)
		} else {
			lastErr = err
		}
	}
	return categories, lastErr
}
// GetJobs returns the set of jobs that belong to the job session. The
// filter parameter allows to choose a subset of the session jobs as
// return value. If no job matches or the session has no jobs attached,
// the method MUST return an empty set. If filter is UNSET, all session
// jobs MUST be returned.
// Time-dependent effects of this method, such as jobs no longer matching
// to filter criteria on evaluation time, are implementation-specific.
// The purpose of the filter parameter is to keep scalability with a
// large number of jobs per session. Applications therefore must consider
// the possibly changed state of jobs during their evaluation of the method
// result.
//
// Jobs are collected from every tracker in order. If any tracker fails to
// list its jobs, that error is returned and no jobs are reported. When a
// filter is set, jobs whose JobInfo cannot be retrieved are skipped. The
// result is a nil (empty) slice when nothing matches.
func (js *JobSession) GetJobs(filter drmaa2interface.JobInfo) ([]drmaa2interface.Job, error) {
	var joblist []drmaa2interface.Job
	hasFilter := !d2hlp.JobInfoIsUnset(filter)
	for _, tracker := range js.tracker {
		jobs, err := tracker.ListJobs()
		if err != nil {
			// Hand the error to the caller only; a library must not also
			// print it to stdout.
			return nil, err
		}
		// Check once per tracker whether it can reconstruct job templates.
		jobTemplater, canTemplate := tracker.(jobtracker.JobTemplater)
		for _, jobid := range jobs {
			if hasFilter {
				jinfo, err := tracker.JobInfo(jobid)
				if err != nil {
					// TODO add as exited with info from jobsession DB
					continue
				}
				if !d2hlp.JobInfoMatches(jinfo, filter) {
					// this job is not allowed by the filter
					continue
				}
			}
			jobtemplate := drmaa2interface.JobTemplate{}
			if canTemplate {
				// Best effort: the error is deliberately ignored so the job is
				// still listed, using whatever template value the tracker
				// returned.
				jobtemplate, _ = jobTemplater.JobTemplate(jobid)
			}
			job := newJob(jobid, js.name, jobtemplate, tracker)
			joblist = append(joblist, drmaa2interface.Job(job))
		}
	}
	return joblist, nil
}
// GetJobArray method returns the JobArray instance by the given ID.
// If the session does not / no longer contain the according job array,
// InvalidArgumentException SHALL be thrown.
//
// Every tracker is asked for the tasks of array job id, and the first
// listing error aborts the call. Each task gets its own job template when
// the tracker implements jobtracker.JobTemplater (template errors are
// ignored). The array job's template is requested from the trackers, by
// the array job ID, until one yields a template with a RemoteCommand.
func (js *JobSession) GetJobArray(id string) (drmaa2interface.ArrayJob, error) {
	var tasks []drmaa2interface.Job
	var arrayTemplate drmaa2interface.JobTemplate
	for _, t := range js.tracker {
		taskIDs, err := t.ListArrayJobs(id)
		if err != nil {
			return nil, err
		}
		templater, hasTemplates := t.(jobtracker.JobTemplater)
		for _, taskID := range taskIDs {
			var taskTemplate drmaa2interface.JobTemplate
			if hasTemplates {
				taskTemplate, _ = templater.JobTemplate(taskID)
			}
			tasks = append(tasks, drmaa2interface.Job(newJob(taskID, js.name, taskTemplate, t)))
		}
		// Note: this looks up the template by the array job ID, not a task ID.
		if hasTemplates && arrayTemplate.RemoteCommand == "" {
			arrayTemplate, _ = templater.JobTemplate(id)
		}
	}
	return newArrayJob(id, js.name, arrayTemplate, tasks), nil
}
// RunJob method submits a job with the attributes defined in the given job template
// instance. The method returns a Job object that represents the job in the underlying
// DRM system. Depending on the job template settings, submission attempts may be
// rejected with an InvalidArgumentException. The error details SHOULD provide further
// information about the attribute(s) responsible for the rejection. When this method
// returns a valid Job instance, the following conditions SHOULD be fulfilled:
//   - The job is part of the persistent state of the job session.
//   - All non-DRMAA and DRMAA interfaces to the DRM system report the job as
//     being submitted to the DRM system.
//   - The job has one of the DRMAA job states.
//
// The template is deep-copied before submission, so later changes by the
// caller do not affect the submitted job. The job is always submitted
// through the first tracker. ErrorInvalidSession is returned when the
// session has no tracker (e.g. after Close).
func (js *JobSession) RunJob(jt drmaa2interface.JobTemplate) (drmaa2interface.Job, error) {
	if len(js.tracker) == 0 {
		// Close sets tracker to nil; indexing tracker[0] would panic.
		return nil, ErrorInvalidSession
	}
	copied, err := copystructure.Copy(jt)
	if err != nil {
		return nil, fmt.Errorf("failed to copy job template: %w", err)
	}
	jtCopy := copied.(drmaa2interface.JobTemplate)
	id, err := js.tracker[0].AddJob(jtCopy)
	if err != nil {
		return nil, err
	}
	return newJob(id, js.name, jtCopy, js.tracker[0]), nil
}
// RunBulkJobs method creates a set of parametric jobs, each with attributes as defined
// in the given job template instance.
//
// The template is deep-copied and submitted as an array job (begin, end,
// step, maxParallel) through the first tracker. The resulting ArrayJob is
// then looked up via GetJobArray. ErrorInvalidSession is returned when the
// session has no tracker (e.g. after Close).
func (js *JobSession) RunBulkJobs(jt drmaa2interface.JobTemplate, begin, end, step, maxParallel int) (drmaa2interface.ArrayJob, error) {
	if len(js.tracker) == 0 {
		// Close sets tracker to nil; indexing tracker[0] would panic.
		return nil, ErrorInvalidSession
	}
	copied, err := copystructure.Copy(jt)
	if err != nil {
		return nil, fmt.Errorf("failed to copy job template: %w", err)
	}
	id, err := js.tracker[0].AddArrayJob(copied.(drmaa2interface.JobTemplate),
		begin, end, step, maxParallel)
	if err != nil {
		return nil, err
	}
	return js.GetJobArray(id)
}
// WaitAnyStarted method blocks until any of the jobs referenced in the jobs
// parameter entered one of the "Started" states.
//
// The timeout argument specifies the desired waiting time for the state change.
// The constant value drmaa2interface.InfiniteTime MUST be supported to get an
// indefinite waiting time. The constant value drmaa2interface.ZeroTime MUST be
// supported to express that the method call SHALL return immediately.
// A time.Duration can be specified to indicate the maximum waiting time.
// If the method call returns because of timeout, a TimeoutException SHALL be
// raised.
func (js *JobSession) WaitAnyStarted(jobs []drmaa2interface.Job, timeout time.Duration) (drmaa2interface.Job, error) {
	// waitAny's first argument selects between "started" (true) and
	// "terminated" (false), as the two Wait* methods in this file show.
	const waitForStart = true
	return waitAny(waitForStart, jobs, timeout)
}
// WaitAnyTerminated method blocks until any of the jobs referenced in the
// jobs parameter entered one of the "Terminated" states.
//
// The timeout argument specifies the desired waiting time for the state change.
// The constant value drmaa2interface.InfiniteTime MUST be supported to get an
// indefinite waiting time. The constant value drmaa2interface.ZeroTime MUST be
// supported to express that the method call SHALL return immediately.
// A time.Duration can be specified to indicate the maximum waiting time.
// If the method call returns because of timeout, a TimeoutException SHALL be
// raised.
func (js *JobSession) WaitAnyTerminated(jobs []drmaa2interface.Job, timeout time.Duration) (drmaa2interface.Job, error) {
	// Passing false asks waitAny for a terminal state instead of a
	// started state.
	const waitForStart = false
	return waitAny(waitForStart, jobs, timeout)
}