
Commit 906a40a

added remote submitting capability to Job.
renamed DAG to Workflow

1 parent: 4862e8d

11 files changed (+139 -63 lines)

.activate (+2)

@@ -0,0 +1,2 @@
+#!/bin/bash
+. /Users/sdc50/Documents/_MyDocuments/CI-Water/code/venvs/condorpy/bin/activate

.gitignore (+1 -2)

@@ -3,6 +3,5 @@
 *.pyo
 .pydevproject
 test/.coverage
-references
-activate
+.activate
 

condorpy/__init__.py (+2 -1)

@@ -7,7 +7,8 @@
 # should have be distributed with this file.
 
 from job import Job
-from dag import DAG, Node
+from workflow import Workflow, DAG
+from node import Node
 from templates import Templates
 Templates = Templates()
 Templates.load()
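
For reference, a minimal sketch (not part of this diff) of how the reshuffled imports behave, assuming the package is installed: both the new and the old top-level names resolve, because workflow.py keeps DAG bound to Workflow.

    # both names import from the top-level package; DAG is an alias of Workflow
    from condorpy import Job, Workflow, DAG, Node, Templates

    assert DAG is Workflow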

condorpy/job.py (+81 -20)

@@ -6,11 +6,10 @@
 # the terms of the BSD 2-Clause License. A copy of the BSD 2-Clause License
 # should have be distributed with this file.
 
-#TODO: add ability to get stats about the job (i.e. number of jobs, run time, etc.)
-#TODO: add ability to submit to remote schedulers
-
-import os, subprocess, re
+import os, subprocess, re, uuid
 from collections import OrderedDict
+from tethyscluster.sshutils import SSHClient
+from tethyscluster.exception import RemoteCommandFailed, SSHError
 
 class Job(object):
     """classdocs
@@ -20,7 +19,17 @@ class Job(object):
     """
 
 
-    def __init__(self, name, attributes=None, executable=None, arguments=None, num_jobs=1):
+    def __init__(self,
+                 name,
+                 attributes=None,
+                 executable=None,
+                 arguments=None,
+                 num_jobs=1,
+                 host=None,
+                 username=None,
+                 password=None,
+                 private_key=None,
+                 private_key_pass=None):
         """Constructor
 
         """
@@ -31,11 +40,17 @@ def __init__(self, name, attributes=None, executable=None, arguments=None, num_j
         object.__setattr__(self, '_num_jobs', int(num_jobs))
         object.__setattr__(self, '_cluster_id', 0)
         object.__setattr__(self, '_job_file', '')
+        object.__setattr__(self, '_remote', None)
+        object.__setattr__(self, '_remote_input_files', None)
+        if host:
+            object.__setattr__(self, '_remote', SSHClient(host, username, password, private_key, private_key_pass))
+            object.__setattr__(self, '_remote_id', uuid.uuid4().hex)
         self.job_name = name
         self.executable = executable
         self.arguments = arguments
 
 
+
     def __str__(self):
         """docstring
 
@@ -145,7 +160,6 @@ def job_file(self):
 
         :return:
         """
-        #TODO: should the job file be just the name or the name and initdir?
         job_file_name = '%s.job' % (self.name)
         job_file_path = os.path.join(self.initial_dir, job_file_name)
         self._job_file = job_file_path
@@ -157,12 +171,11 @@ def log_file(self):
 
         :return:
         """
-        #TODO: should the log file be just the name or the name and initdir?
         log_file = self.get('log')
         if not log_file:
             log_file = '%s.log' % (self.name)
            self.set('log', log_file)
-        return self._resolve_attribute('log')
+        return os.path.join(self.initial_dir, self._resolve_attribute('log'))
 
     @property
     def initial_dir(self):
@@ -172,9 +185,19 @@ def initial_dir(self):
         """
         initial_dir = self._resolve_attribute('initialdir')
         if not initial_dir:
-            initial_dir = os.getcwd()
+            initial_dir = os.path.relpath(os.getcwd())
+        if self._remote and os.path.isabs(initial_dir):
+            raise Exception('Cannot define an absolute path as an initial_dir on a remote scheduler')
         return initial_dir
 
+    @property
+    def remote_input_files(self):
+        return self._remote_input_files
+
+    @remote_input_files.setter
+    def remote_input_files(self, files):
+        self._remote_input_files = files
+
     def submit(self, queue=None, options=[]):
         """docstring
 
@@ -191,9 +214,7 @@ def submit(self, queue=None, options=[]):
         args.extend(options)
         args.append(self.job_file)
 
-        process = subprocess.Popen(args, stdout = subprocess.PIPE, stderr=subprocess.PIPE)
-        out,err = process.communicate()
-
+        out, err = self._execute(args)
         if err:
             if re.match('WARNING',err):
                 print(err)
@@ -214,8 +235,7 @@ def remove(self, options=[], job_num=None):
         args.extend(options)
         job_id = '%s.%s' % (self.cluster_id, job_num) if job_num else self.cluster_id
         args.append(job_id)
-        process = subprocess.Popen(args, stdout = subprocess.PIPE, stderr=subprocess.PIPE)
-        out,err = process.communicate()
+        out, err = self._execute(args)
         print(out,err)
 
     def edit(self):
@@ -238,11 +258,9 @@ def wait(self, options=[], job_num=None):
         args = ['condor_wait']
         args.extend(options)
         job_id = '%s.%s' % (self.cluster_id, job_num) if job_num else str(self.cluster_id)
-        abs_log_file = os.path.join(self.initial_dir, self.log_file)
+        abs_log_file = os.path.abspath(self.log_file)
         args.extend([abs_log_file, job_id])
-        print args
-        process = subprocess.Popen(args, stdout = subprocess.PIPE, stderr=subprocess.PIPE)
-        process.communicate()
+        out, err = self._execute(args)
 
     def get(self, attr, value=None):
         """get attribute from job file
@@ -267,12 +285,40 @@ def delete(self, attr):
         """
         self.attributes.pop(attr)
 
+    def sync_remote_output(self):
+        self._copy_output_from_remote()
+
+    def _execute(self, args):
+        out = None
+        err = None
+        if self._remote:
+            cmd = ' '.join(args)
+            try:
+                cmd = 'cd %s && %s' % (self._remote_id, cmd)
+                out = '\n'.join(self._remote.execute(cmd))
+            except RemoteCommandFailed as e:
+                err = e.output
+            except SSHError as e:
+                err = e.msg
+        else:
+            process = subprocess.Popen(args, stdout = subprocess.PIPE, stderr=subprocess.PIPE)
+            out,err = process.communicate()
+
+        return out, err
+
+    def _copy_input_files_to_remote(self):
+        self._remote.put(self.remote_input_files, self._remote_id)
+
+    def _copy_output_from_remote(self):
+        self._remote.get(os.path.join(self._remote_id, self.initial_dir))
 
     def _write_job_file(self):
         self._make_job_dirs()
-        job_file = open(self.job_file, 'w')
+        job_file = self._open(self.job_file, 'w')
         job_file.write(self.__str__())
         job_file.close()
+        if self._remote:
+            self._copy_input_files_to_remote()
 
     def _list_attributes(self):
         list = []
@@ -281,12 +327,21 @@ def _list_attributes(self):
             list.append(k + ' = ' + str(v))
         return list
 
+    def _open(self, file_name, mode='w'):
+        if self._remote:
+            return self._remote.remote_file(os.path.join(self._remote_id,file_name), mode)
+        else:
+            return open(file_name, mode)
+
     def _make_dir(self, dir_name):
         """docstring
 
         """
         try:
-            os.makedirs(dir_name)
+            if self._remote:
+                self._remote.makedirs(os.path.join(self._remote_id,dir_name))
+            else:
+                os.makedirs(dir_name)
         except OSError:
             pass
 
@@ -321,6 +376,12 @@ def _resolve_attribute_match(self, match):
 
         return self.get(match.group(1), match.group(0))
 
+    def __del__(self):
+        if self._remote:
+            self._remote.execute('rm -rf %s' % (self._remote_id,))
+            self._remote.close()
+            del self._remote
+
 
 
 class NoExecutable(Exception):
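
For context, a minimal usage sketch of the new remote-submission path (not part of this diff), assuming a reachable HTCondor scheduler and valid SSH credentials; the host, key path, and script names below are placeholders:

    from condorpy import Job, Templates

    # host/username/private_key are the new constructor parameters; when host is
    # set, the job file and input files are written into a uuid-named workspace
    # on the remote machine and the condor commands run there over SSH (_execute).
    job = Job('remote_test',
              Templates.vanilla_transfer_files,
              executable='copy_test.py',        # placeholder script
              host='scheduler.example.com',     # placeholder host
              username='root',
              private_key='~/.ssh/id_rsa')      # placeholder key path
    job.arguments = 'input.txt'
    job.remote_input_files = ['copy_test.py', 'input.txt']  # copied to the remote workspace
    job.submit()               # condor_submit, executed over SSH
    job.sync_remote_output()   # copies results back from the remote workspace
    del job                    # __del__ deletes the remote workspace and closes the SSH session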

condorpy/node.py (-3)

@@ -8,7 +8,6 @@
 
 from job import Job
 
-#TODO: add node type (DATA, JOB)
 class Node(object):
     """
 
@@ -242,7 +241,6 @@ def _get_all_ancestors(self):
 
         if self in ancestors:
            raise
-            #TODO: make loop exception
         return ancestors
 
     def _get_all_descendants(self):
@@ -258,7 +256,6 @@ def _get_all_descendants(self):
 
         if self in descendants:
            raise
-            #TODO: make loop exception
         return descendants
 
     def _link_parent_nodes(self):

condorpy/templates.py (-1)

@@ -81,5 +81,4 @@ def vanilla_transfer_files(self):
     @property
     def vanilla_nfs(self):
        vanilla_nfs = self.vanilla_base
-        #TODO: test nfs jobs
         return vanilla_nfs

condorpy/dag.py → condorpy/workflow.py (+5 -3)

@@ -9,9 +9,7 @@
 from node import Node
 import subprocess, re
 
-#TODO: set initialdir that overrides jobs' initaildir?
-
-class DAG(object):
+class Workflow(object):
     """
 
     """
@@ -135,3 +133,7 @@ def _write_dag_file(self):
         dag_file = open(self.dag_file, 'w')
         dag_file.write(self.__str__())
         dag_file.close()
+
+
+# For backwards compatibility
+DAG = Workflow
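
Because existing code imports DAG directly, the module keeps the old name bound to the new class. A short sketch (not part of this diff), assuming the constructor still takes the workflow name as its first argument:

    from condorpy import Workflow, DAG

    wf = Workflow('example_workflow')   # preferred name going forward
    legacy = DAG('legacy_workflow')     # still works: DAG is Workflow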

setup.py (+1 -1)

@@ -23,7 +23,7 @@
 
     # Project uses reStructuredText, so ensure that the docutils get
    # installed or upgraded on the target machine
-    #install_requires = ['docutils>=0.3'],
+    install_requires = ['tethyscluster'],
 
 
    # package_data = {

tests/test_job.py (+21 -7)

@@ -33,7 +33,9 @@ def test__init__(self):
                     '_attributes': attributes,
                     '_num_jobs': 1,
                     '_cluster_id': 0,
-                    '_job_file': ''}
+                    '_job_file': '',
+                    '_remote': None,
+                    '_remote_input_files': None}
         actual = self.job.__dict__
         msg = 'testing initialization with default values'
         self.assertDictEqual(expected, actual, '%s\nExpected: %s\nActual: %s\n' % (msg, expected, actual))
@@ -45,11 +47,9 @@ def test__init__(self):
         attributes['executable'] = exe
         attributes['arguments'] = args
 
-        expected = {'_name': self.job_name,
+        expected.update({'_name': self.job_name,
                     '_attributes': attributes,
-                    '_num_jobs': int(num_jobs),
-                    '_cluster_id': 0,
-                    '_job_file': ''}
+                    '_num_jobs': int(num_jobs)})
         actual = self.job.__dict__
         msg = 'testing initialization with all values supplied'
         self.assertDictEqual(expected, actual, '%s\nExpected: %s\nActual: %s\n' % (msg, expected, actual))
@@ -157,7 +157,7 @@ def test_job_file(self):
 
     def test_log_file(self):
         self.job = Job(self.job_name, Templates.base)
-        log_file = '%s/%s.%s.log' % (self.job.logdir, self.job_name, self.job.cluster_id)
+        log_file = '%s/%s/%s.%s.log' % (self.job.initial_dir, self.job.logdir, self.job_name, self.job.cluster_id)
         expected = log_file
         actual = self.job.log_file
         msg = 'checking resolving attribute function for log file'
@@ -260,4 +260,18 @@ def test_resolve_attribute_match(self):
 
 if __name__ == "__main__":
     #import sys;sys.argv = ['', 'Test.testName']
-    unittest.main()
+    unittest.main()
+
+
+'''
+import os
+os.chdir('sandbox')
+from condorpy import Job, Templates
+j = Job('remote_test', Templates.vanilla_transfer_files, 'copy_test.py', host='54.173.151.202', username='root', private_key='~/.tethyscluster/starcluster-east.pem')
+j.arguments = 'input.txt'
+j.remote_input_files = ['copy_test.py', 'input.txt']
+j.transfer_input_files = '../input.txt'
+j.submit()
+j.sync_remote_output()
+del j
+'''
