-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathsessionmanager.go
384 lines (342 loc) · 14 KB
/
sessionmanager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
package drmaa2os
import (
"errors"
"fmt"
"log"
"code.cloudfoundry.org/lager"
"github.com/dgruber/drmaa2interface"
"github.com/dgruber/drmaa2os/pkg/jobtracker"
"github.com/dgruber/drmaa2os/pkg/storage"
)
// SessionType represents the selected resource manager. It is the value a
// SessionManager is created with and the key under which a JobTracker
// allocator is registered through RegisterJobTracker.
type SessionType int
// The session types are numbered with iota: the numeric value of each
// constant follows from its position in this list, so inserting or
// reordering entries changes the value of every constant after it.
const (
// DefaultSession handles jobs as processes.
DefaultSession SessionType = iota
// DockerSession manages Docker containers.
DockerSession
// CloudFoundrySession manages Cloud Foundry application tasks.
CloudFoundrySession
// KubernetesSession creates Kubernetes jobs.
KubernetesSession
// SingularitySession manages Singularity containers.
SingularitySession
// SlurmSession manages slurm jobs.
SlurmSession
// LibDRMAASession manages jobs through libdrmaa.so.
LibDRMAASession
// PodmanSession manages jobs as podman containers either locally or remote.
PodmanSession
// RemoteSession manages jobs over the network through a remote server.
RemoteSession
// ExternalSession can be used by external JobTracker implementations
// during development time before they get added here.
ExternalSession
// GoogleBatchSession manages Google Cloud Batch jobs.
GoogleBatchSession
// MPIOperatorSession manages jobs as MPI operator jobs on Kubernetes.
MPIOperatorSession
// ContainerdSession manages jobs as containerd containers.
ContainerdSession
)
func init() {
// initialize job tracker registration map
atomicTrackers.Store(make(map[SessionType]jobtracker.Allocator))
}
// RegisterJobTracker registers a JobTracker implementation (its allocator)
// at the session manager under the given session type so that it can be
// used. This is done in the init() function of the JobTracker implementation
// package. That means an application which wants to use a specific
// JobTracker needs to import the implementation package with _.
//
// For example, when Docker is to be used as job management backend:
//
//	import _ "github.com/dgruber/drmaa2os/pkg/jobtracker/dockertracker"
//
// When multiple backends are to be used, all of them need to be imported so
// that they are registered in the main application.
//
// Registering a second allocator for the same session type replaces the
// first one.
//
// The registry is updated copy-on-write: the currently published map is
// never modified; a copy containing the new entry is stored instead. Code
// that obtained the map through atomicTrackers.Load() can therefore keep
// reading it without holding trackerMutex. The mutex only serializes
// concurrent registrations so that none of them gets lost.
func RegisterJobTracker(sessionType SessionType, tracker jobtracker.Allocator) {
	trackerMutex.Lock()
	defer trackerMutex.Unlock()

	// The comma-ok form yields a nil map (instead of panicking) when nothing
	// has been stored yet; ranging over a nil map is a no-op.
	current, _ := atomicTrackers.Load().(map[SessionType]jobtracker.Allocator)

	updated := make(map[SessionType]jobtracker.Allocator, len(current)+1)
	for registeredType, allocator := range current {
		updated[registeredType] = allocator
	}
	updated[sessionType] = tracker

	atomicTrackers.Store(updated)
}
// SessionManager allows to create, list, and destroy job, reservation,
// and monitoring sessions. It also holds basic information about
// the resource manager and its capabilities.
type SessionManager struct {
// store persists the job sessions: it is queried for existence, listed,
// and read/written with the contact string stored per session name.
store storage.Storer
// log is a lager logger.
// NOTE(review): it is not referenced by the methods in this part of the
// file - confirm where it is set and used.
log lager.Logger
// sessionType is the backend type of this manager. The LibDRMAASession
// type gets special contact string handling when job sessions are
// created and opened.
sessionType SessionType
// jobTrackerCreateParams holds optional backend specific parameters
// (nil when unset) which are handed over when a job tracker is allocated
// for a job session or a monitoring session.
jobTrackerCreateParams interface{}
}
// NewDefaultSessionManager returns a SessionManager of type DefaultSession,
// i.e. one which starts jobs as processes. dbpath is forwarded to
// makeSessionManager, whose result and error are returned unchanged.
func NewDefaultSessionManager(dbpath string) (*SessionManager, error) {
	sm, err := makeSessionManager(dbpath, DefaultSession)
	return sm, err
}
// NewDefaultSessionManagerWithParams returns a SessionManager of type
// DefaultSession (jobs are started as processes) with ds set as job tracker
// create parameters. ds is meant to be a
// simpletracker.SimpleTrackerInitParams data structure through which
// specific behaviour of the JobSession can be triggered; currently it
// provides additional support for keeping job IDs persistent in a file
// based DB across DRMAA2 application runs.
//
// When makeSessionManager fails, its result and error are returned as they
// are and ds is not applied.
func NewDefaultSessionManagerWithParams(ds interface{}, dbpath string) (*SessionManager, error) {
	sm, err := makeSessionManager(dbpath, DefaultSession)
	if err == nil {
		sm.jobTrackerCreateParams = ds
	}
	return sm, err
}
// NewSingularitySessionManager returns a SessionManager of type
// SingularitySession, which creates and maintains jobs as Singularity
// containers. dbpath is forwarded to makeSessionManager.
func NewSingularitySessionManager(dbpath string) (*SessionManager, error) {
	sm, err := makeSessionManager(dbpath, SingularitySession)
	return sm, err
}
// NewDockerSessionManager returns a SessionManager of type DockerSession,
// which maintains jobs as Docker containers. The application must have the
// following import so that the Docker job tracker is registered:
//
//	import _ "github.com/dgruber/drmaa2os/pkg/jobtracker/dockertracker"
func NewDockerSessionManager(dbpath string) (*SessionManager, error) {
	sm, err := makeSessionManager(dbpath, DockerSession)
	return sm, err
}
// NewCloudFoundrySessionManager returns a SessionManager of type
// CloudFoundrySession, which maintains jobs as Cloud Foundry tasks.
// addr needs to point to the cloud controller API; username and password
// need to be set as well. The three values are kept, in this order, as a
// []string in the job tracker create parameters.
//
// When makeSessionManager fails, its result and error are returned as they
// are.
func NewCloudFoundrySessionManager(addr, username, password, dbpath string) (*SessionManager, error) {
	sm, err := makeSessionManager(dbpath, CloudFoundrySession)
	if err == nil {
		// Cloud Foundry specific parameters
		sm.jobTrackerCreateParams = []string{addr, username, password}
	}
	return sm, err
}
// NewKubernetesSessionManager returns a SessionManager of type
// KubernetesSession, which uses Kubernetes as execution backend for jobs.
// The first parameter must be either a *kubernetes.Clientset or nil to
// have a new one allocated.
//
// When makeSessionManager fails, its result and error are returned as they
// are.
func NewKubernetesSessionManager(cs interface{}, dbpath string) (*SessionManager, error) {
	sm, err := makeSessionManager(dbpath, KubernetesSession)
	if err == nil {
		// the clientset is needed later when a job session is created
		sm.jobTrackerCreateParams = cs
	}
	return sm, err
}
// NewSlurmSessionManager returns a SessionManager of type SlurmSession,
// which wraps the slurm command line for managing jobs. dbpath is forwarded
// to makeSessionManager.
func NewSlurmSessionManager(dbpath string) (*SessionManager, error) {
	sm, err := makeSessionManager(dbpath, SlurmSession)
	return sm, err
}
// NewLibDRMAASessionManager returns a SessionManager of type
// LibDRMAASession, which wraps libdrmaa.so (DRMAA v1) through the Go DRMAA
// library. Please check out the details of github.com/dgruber/drmaa before
// using it, and make sure all necessary paths are set (C header files,
// LD_LIBRARY_PATH).
func NewLibDRMAASessionManager(dbpath string) (*SessionManager, error) {
	sm, err := makeSessionManager(dbpath, LibDRMAASession)
	return sm, err
}
// NewLibDRMAASessionManagerWithParams returns a Go DRMAA session manager
// like NewLibDRMAASessionManager does, but with ds set as job tracker
// create parameters. ds must be of type _libdrmaa.LibDRMAASessionParams_.
//
// When makeSessionManager fails, its result and error are returned as they
// are.
func NewLibDRMAASessionManagerWithParams(ds interface{}, dbpath string) (*SessionManager, error) {
	sm, err := makeSessionManager(dbpath, LibDRMAASession)
	if err == nil {
		sm.jobTrackerCreateParams = ds
	}
	return sm, err
}
// NewPodmanSessionManager returns a SessionManager of type PodmanSession.
// The first parameter is either nil for using defaults or must be of type
// _podmantracker.PodmanTrackerParams_.
//
// When makeSessionManager fails, its result and error are returned as they
// are.
func NewPodmanSessionManager(ps interface{}, dbpath string) (*SessionManager, error) {
	sm, err := makeSessionManager(dbpath, PodmanSession)
	if err == nil {
		// Podman specific parameters
		sm.jobTrackerCreateParams = ps
	}
	return sm, err
}
// NewRemoteSessionManager returns a SessionManager of type RemoteSession
// for accessing a remote jobtracker server implementation, which can be of
// any backend type. rs carries the remote specific parameters (like the
// server address).
//
// When makeSessionManager fails, its result and error are returned as they
// are.
func NewRemoteSessionManager(rs interface{}, dbpath string) (*SessionManager, error) {
	sm, err := makeSessionManager(dbpath, RemoteSession)
	if err == nil {
		sm.jobTrackerCreateParams = rs
	}
	return sm, err
}
// NexExternalSessionManager returns a SessionManager of type
// ExternalSession. It can be used when a JobTracker is implemented outside
// of the repository. Note that only one ExternalSession type is available,
// so for a permanent integration it makes sense to add a dedicated
// SessionType constant to this package.
//
// The function name is spelled "Nex" (not "New"); it is part of the
// exported API.
func NexExternalSessionManager(dbpath string) (*SessionManager, error) {
	sm, err := makeSessionManager(dbpath, ExternalSession)
	return sm, err
}
// NewGoogleBatchSessionManager returns a SessionManager of type
// GoogleBatchSession; see https://github.com/dgruber/gcpbatchtracker.
// parameters carries the Google Batch specific settings (like project ID
// and region).
//
// When makeSessionManager fails, its result and error are returned as they
// are.
func NewGoogleBatchSessionManager(parameters interface{}, dbpath string) (*SessionManager, error) {
	sm, err := makeSessionManager(dbpath, GoogleBatchSession)
	if err == nil {
		sm.jobTrackerCreateParams = parameters
	}
	return sm, err
}
// NewMPIOperatorSessionManager (TODO) returns a SessionManager of type
// MPIOperatorSession with parameters set as job tracker create parameters;
// see https://github.com/dgruber/mpioperatortracker.
//
// When makeSessionManager fails, its result and error are returned as they
// are.
func NewMPIOperatorSessionManager(parameters interface{}, dbpath string) (*SessionManager, error) {
	sm, err := makeSessionManager(dbpath, MPIOperatorSession)
	if err == nil {
		sm.jobTrackerCreateParams = parameters
	}
	return sm, err
}
// NewContainerdSessionManager returns a SessionManager of type
// ContainerdSession. The first parameter is either nil for using defaults
// or must be of type _containerdtracker.ContainerdTrackerParams_.
//
// When makeSessionManager fails, its result and error are returned as they
// are.
func NewContainerdSessionManager(cs interface{}, dbpath string) (*SessionManager, error) {
	sm, err := makeSessionManager(dbpath, ContainerdSession)
	if err == nil {
		// containerd specific parameters
		sm.jobTrackerCreateParams = cs
	}
	return sm, err
}
// CreateJobSession creates a new JobSession for managing jobs.
//
// The session is first recorded through sm.create with the given name and
// contact. Then a registered job tracker is allocated with the session
// manager's create parameters and wrapped in a new job session.
// Registration of job trackers happens in the init function of the
// JobTracker implementation package when that package is imported.
//
// For a LibDRMAASession manager with create parameters set, a job tracker
// which implements jobtracker.ContactStringer is asked for its contact
// string, and that string is written to the store under the session name so
// that OpenJobSession can read it back later.
//
// An error is returned when the session cannot be recorded, when no job
// tracker can be allocated, or when the contact string cannot be retrieved
// or stored. Errors of the last two kinds wrap the underlying error.
func (sm *SessionManager) CreateJobSession(name, contact string) (drmaa2interface.JobSession, error) {
	if err := sm.create(storage.JobSessionType, name, contact); err != nil {
		return nil, err
	}
	// NOTE(review): the record created above is not removed again when one
	// of the following steps fails - confirm whether a rollback is wanted.
	jt, err := sm.newRegisteredJobTracker(name, sm.jobTrackerCreateParams)
	if err != nil {
		return nil, err
	}
	js := newJobSession(name, []jobtracker.JobTracker{jt})

	// Only libdrmaa sessions with parameters need their contact string
	// persisted for a later OpenJobSession.
	if sm.sessionType != LibDRMAASession || sm.jobTrackerCreateParams == nil {
		return js, nil
	}
	contactStringer, ok := jt.(jobtracker.ContactStringer)
	if !ok {
		return js, nil
	}

	trackerContact, err := contactStringer.Contact()
	if err != nil {
		return nil, fmt.Errorf("Failed to get contact string after session creation: %w", err)
	}
	// Use the log package like OpenJobSession does; a library must not
	// write diagnostics to the application's stdout.
	log.Printf("saving contact string %s\n", trackerContact)
	if err := sm.store.Put(storage.JobSessionType, name, trackerContact); err != nil {
		return nil, fmt.Errorf("Failed to store contact string after session creation: %w", err)
	}
	return js, nil
}
// CreateReservationSession is meant to create a new ReservationSession.
// It is not implemented: both arguments are ignored and the call always
// returns a nil session together with ErrorUnsupportedOperation.
func (sm *SessionManager) CreateReservationSession(name, contact string) (drmaa2interface.ReservationSession, error) {
return nil, ErrorUnsupportedOperation
}
// OpenMonitoringSession opens a session for monitoring jobs. It allocates
// the registered monitoring job tracker (and its monitorer) for sessionName
// with the session manager's create parameters and returns a
// MonitoringSession built from them. If the allocation fails, that error is
// returned and no session is created.
func (sm *SessionManager) OpenMonitoringSession(sessionName string) (drmaa2interface.MonitoringSession, error) {
	tracker, monitorer, err := sm.newRegisteredMonitoringSessionJobTracker(sessionName, sm.jobTrackerCreateParams)
	if err != nil {
		return nil, err
	}
	session := &MonitoringSession{
		name:       sessionName,
		jobtracker: tracker,
		monitorer:  monitorer,
	}
	return session, nil
}
// OpenJobSession opens an existing session for managing jobs. The semantic
// of a job session and the job session name depends on the resource manager.
//
// The session must be known to the store, otherwise an error is returned.
// For a LibDRMAASession manager with create parameters set, the contact
// string saved for the session is read from the store and applied to a
// local copy of the create parameters via TryToSetContactString before the
// job tracker is allocated.
//
// Errors from reading or applying the contact string wrap the underlying
// error; an error from allocating the job tracker is returned as it is.
func (sm *SessionManager) OpenJobSession(name string) (drmaa2interface.JobSession, error) {
	if !sm.store.Exists(storage.JobSessionType, name) {
		return nil, errors.New("JobSession does not exist")
	}

	// TryToSetContactString receives a pointer to this local variable, so
	// the field of the session manager itself is not reassigned.
	createParams := sm.jobTrackerCreateParams

	// restore the contact string from storage and set it in the job tracker
	// create params
	if sm.sessionType == LibDRMAASession && createParams != nil {
		contact, err := sm.store.Get(storage.JobSessionType, name)
		if err != nil {
			return nil, fmt.Errorf("could not get contact string for job session: %s: %w",
				name, err)
		}
		log.Printf("using internal DRMAA job session %s with contact string %s from DB\n", name, contact)
		if err := TryToSetContactString(&createParams, contact); err != nil {
			return nil, fmt.Errorf("could not set new contact string for opening job session %s: %w",
				name, err)
		}
	}

	jt, err := sm.newRegisteredJobTracker(name, createParams)
	if err != nil {
		return nil, err
	}
	return &JobSession{
		name:    name,
		tracker: []jobtracker.JobTracker{jt},
	}, nil
}
// OpenReservationSession is meant to open a reservation session. It is not
// implemented: name is ignored and the call always returns a nil session
// together with ErrorUnsupportedOperation.
func (sm *SessionManager) OpenReservationSession(name string) (drmaa2interface.ReservationSession, error) {
return nil, ErrorUnsupportedOperation
}
// DestroyJobSession destroys a job session by name.
//
// A job session must be closed before destroying it. The only way to test
// whether it is closed without having a handle is trying to open it (and
// close it again). If opening or closing fails, an error wrapping the cause
// is returned and the session is not deleted, to prevent consumer issues
// later on. Otherwise the result of deleting the session record is
// returned.
func (sm *SessionManager) DestroyJobSession(name string) error {
	js, err := sm.OpenJobSession(name)
	if err != nil {
		return fmt.Errorf("job session must be closed before destroying it. could not open job session during destruction: %w", err)
	}
	if err := js.Close(); err != nil {
		return fmt.Errorf("could not close job session during destruction: %w", err)
	}
	return sm.delete(storage.JobSessionType, name)
}
// DestroyReservationSession is meant to remove a reservation session. It is
// not implemented: name is ignored and the call always returns
// ErrorUnsupportedOperation.
func (sm *SessionManager) DestroyReservationSession(name string) error {
return ErrorUnsupportedOperation
}
// GetJobSessionNames returns the names of all job sessions, as listed by
// the store for the job session type. The store's error, if any, is passed
// through.
func (sm *SessionManager) GetJobSessionNames() ([]string, error) {
	names, err := sm.store.List(storage.JobSessionType)
	return names, err
}
// GetReservationSessionNames is meant to return a list of all reservation
// sessions. It is not implemented and always returns a nil list together
// with ErrorUnsupportedOperation.
func (sm *SessionManager) GetReservationSessionNames() ([]string, error) {
return nil, ErrorUnsupportedOperation
}
// GetDrmsName returns the name of the distributed resource manager, which
// is the fixed string "drmaa2os" regardless of the session type. The
// returned error is always nil.
func (sm *SessionManager) GetDrmsName() (string, error) {
	const drmsName = "drmaa2os"
	return drmsName, nil
}
// GetDrmsVersion returns the version of the distributed resource manager.
// The reported version is fixed: major "1", minor "0". The returned error
// is always nil.
func (sm *SessionManager) GetDrmsVersion() (drmaa2interface.Version, error) {
	version := drmaa2interface.Version{
		Major: "1",
		Minor: "0",
	}
	return version, nil
}
// Supports returns true or false depending on whether the given Capability
// is supported by DRMAA2OS. The argument is currently not inspected: false
// is returned for every capability.
func (sm *SessionManager) Supports(capability drmaa2interface.Capability) bool {
return false
}
// RegisterEventNotification is meant to create an event channel which emits
// events when the conditions described in a notification specification are
// met. It is not implemented: the call always returns a nil channel
// together with ErrorUnsupportedOperation.
func (sm *SessionManager) RegisterEventNotification() (drmaa2interface.EventChannel, error) {
return nil, ErrorUnsupportedOperation
}