Skip to content

Commit

Permalink
Add infrastructure for testing, conda, etc.
Browse files Browse the repository at this point in the history
  • Loading branch information
jigold committed Jan 30, 2019
1 parent f3f294b commit 1ceeae9
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 48 deletions.
8 changes: 8 additions & 0 deletions pipeline/environment.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
name: hail-pipeline
dependencies:
- python=3.6
- pip
- pip:
- pylint
- flake8
- pytest
6 changes: 6 additions & 0 deletions pipeline/hail-ci-build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/bin/bash
set -ex

flake8 pipeline
pylint pipeline --rcfile pipeline/pylintrc --score=n
pytest test
23 changes: 11 additions & 12 deletions pipeline/pipeline/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from .utils import get_sha, escape_string


class Backend(object):
class Backend:
@abc.abstractmethod
def tmp_dir(self):
return
Expand All @@ -16,7 +16,7 @@ def run(self, pipeline, dry_run, verbose, bg, delete_on_exit):
return

@abc.abstractmethod
def cp(self, src, dest):
def copy(self, src, dest):
return


Expand Down Expand Up @@ -50,15 +50,15 @@ def define_resource(r):
script.append(f"# {task._uid} {task._label if task._label else ''}")

resource_defs = [define_resource(r) for _, r in task._resources.items()]
if task._settings.docker:
if task._docker:
defs = '; '.join(resource_defs) + '; ' if resource_defs else ''
cmd = "&& ".join(task._command)
image = task._settings.docker
image = task._docker
script += [f"docker run "
f"-v {tmpdir}:{tmpdir} "
f"-w {tmpdir} "
f"{image} /bin/bash "
f"-c {escape_string(defs + cmd)}",
f"-v {tmpdir}:{tmpdir} "
f"-w {tmpdir} "
f"{image} /bin/bash "
f"-c {escape_string(defs + cmd)}",
'\n']
else:
script += resource_defs
Expand All @@ -70,7 +70,7 @@ def define_resource(r):
print(script)
else:
try:
sp.check_output(script, shell=True) # FIXME: implement non-blocking (bg = True)
sp.check_output(script, shell=True) # FIXME: implement non-blocking (bg = True)
except sp.CalledProcessError as e:
print(e.output)
raise e
Expand All @@ -83,13 +83,12 @@ def _get_random_name():
directory = self._tmp_dir + '/pipeline.{}/'.format(get_sha(8))

if os.path.isdir(directory):
_get_random_name()
return _get_random_name()
else:
os.makedirs(directory, exist_ok=True)
return directory

return _get_random_name()

def cp(self, src, dest): # FIXME: symbolic links? support gsutil?
def copy(self, src, dest): # FIXME: symbolic links? support gsutil?
return f"cp {src} {dest}"

24 changes: 12 additions & 12 deletions pipeline/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from .utils import get_sha


class Pipeline(object):
class Pipeline:
_counter = 0
_uid_prefix = "__PIPELINE__"
_regex_pattern = r"(?P<PIPELINE>{}\d+)".format(_uid_prefix)
Expand Down Expand Up @@ -38,7 +38,7 @@ def _get_random_file():
self._allocated_files.add(file)
return file
else:
_get_random_file()
return _get_random_file()

return _get_random_file()

Expand All @@ -55,7 +55,7 @@ def _new_resource_group(self, source, mappings):
for name, code in mappings.items():
if not isinstance(code, str):
raise ValueError(f"value for name '{name}' is not a string. Found '{type(code)}' instead.")
r = self._new_resource(source=source, value=eval(f'f"""{code}"""'))
r = self._new_resource(source=source, value=eval(f'f"""{code}"""')) # pylint: disable=W0123
d[name] = r
new_resource_map[r._uid] = r

Expand All @@ -68,15 +68,15 @@ def _read_input(self, source, dest=None):
dest = dest if dest else self._tmp_file()
cp_task = (self.new_task()
.label('read_input')
.command(self._backend.cp(source, dest)))
.command(self._backend.copy(source, dest)))
return self._new_resource(source=cp_task, value=dest)

def read_input(self, source):
return str(self._read_input(source))

def read_input_group(self, **kwargs):
root = self._tmp_file()
added_resources = {name:self._read_input(file, root + '.' + name) for name, file in kwargs.items()}
added_resources = {name: self._read_input(file, root + '.' + name) for name, file in kwargs.items()}
rg = ResourceGroup(root, **added_resources)
self._resource_map.update({rg._uid: rg})
return rg
Expand All @@ -87,20 +87,20 @@ def write_output(self, resource, dest):
else:
assert isinstance(resource, Resource)
assert resource._uid in self._resource_map
cp_task = (self.new_task()
.label('write_output')
.command(self._backend.cp(resource, dest)))
(self.new_task()
.label('write_output')
.command(self._backend.copy(resource, dest)))

def select_tasks(self, pattern):
return [task for task in self._tasks if task._label is not None and re.match(pattern, task._label) is not None]

def run(self, dry_run=False, verbose=True, delete_on_exit=True):
dependencies = {task:task._dependencies for task in self._tasks}
def run(self, dry_run=False, verbose=False, delete_on_exit=True):
dependencies = {task: task._dependencies for task in self._tasks}
ordered_tasks = []
niter = 0
while dependencies:
for task, deps in dependencies.items():
if len(deps) == 0:
if not deps:
ordered_tasks.append(task)
niter = 0
for task, _ in dependencies.items():
Expand All @@ -114,7 +114,7 @@ def run(self, dry_run=False, verbose=True, delete_on_exit=True):
raise ValueError("cycle detected in dependency graph")

self._tasks = ordered_tasks
self._backend.run(self, dry_run, verbose, False, delete_on_exit) # FIXME: expose bg option when implemented!
self._backend.run(self, dry_run, verbose, False, delete_on_exit) # FIXME: expose bg option when implemented!

def __str__(self):
return self._uid
22 changes: 22 additions & 0 deletions pipeline/pipeline/pylintrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[MESSAGES CONTROL]
# C0111 Missing docstring, W1203 logging fstring interpolation, C0111 missing
# doc string, R0913 too many arguments, W0622 redefining built in, W0212
# protected member access, W0621 redefining name from outer scope, R0914 too
# many local variables, W0603 using the global statement, R0902 too many
# instance attributes, R1705 unnecessary "else" after "return", W0511 fixme,
# R0903 Too few public methods, R0401 Cyclic import, R0801 Similar lines

disable=C0111,W1203,W1202,C0111,R0913,W0622,W0212,W0621,R0914,W0603,R0902,R1705,W0511,R0903,R0401,R0801

[FORMAT]
# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1
# tab).
indent-string=' '

attr-rgx=[a-z_][a-z0-9_]{0,30}|id$
argument-rgx=[a-z_][a-z0-9_]{0,30}|id$
variable-rgx=[a-z_][a-z0-9_]{0,30}|id|f$
const-rgx=(([a-zA-Z_][a-zA-Z0-9_]*)|(__.*__))$

# Maximum number of characters on a single line.
max-line-length=120
8 changes: 4 additions & 4 deletions pipeline/pipeline/resource.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
class Resource(object):
class Resource:
_counter = 0
_uid_prefix = "__RESOURCE__"
_regex_pattern = r"(?P<RESOURCE>{}\d+)".format(_uid_prefix)
Expand All @@ -21,7 +21,7 @@ def __str__(self):
return self._uid


class ResourceGroup(object):
class ResourceGroup:
_counter = 0
_uid_prefix = "__RESOURCE_GROUP__"
_regex_pattern = r"(?P<RESOURCE_GROUP>{}\d+)".format(_uid_prefix)
Expand All @@ -43,8 +43,8 @@ def __init__(self, root, **values):

def _get_resource(self, item):
if item not in self._namespace:
raise KeyError(f"'{item}' not found in the resource group. " \
"Hint: you must declare each attribute when constructing the resource group.")
raise KeyError(f"'{item}' not found in the resource group. "
f"Hint: you must declare each attribute when constructing the resource group.")
r = self._namespace[item]
return r._uid

Expand Down
38 changes: 20 additions & 18 deletions pipeline/pipeline/task.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,9 @@
import re
from .resource import Resource, ResourceGroup

class TaskSettings(object):
def __init__(self, cpu=None, memory=None, docker=None, env=None):
self.cpu = cpu
self.memory = memory
self.docker = docker
self.env = env
from .resource import Resource, ResourceGroup


class Task(object):
class Task:
_counter = 0
_uid_prefix = "__TASK__"
_regex_pattern = r"(?P<TASK>{}\d+)".format(_uid_prefix)
Expand All @@ -21,9 +15,11 @@ def _new_uid(cls):
return uid

def __init__(self, pipeline, label=None):
self._settings = TaskSettings(cpu=None, memory=None, docker=None, env=None)
self._pipeline = pipeline
self._label = label
self._cpu = None
self._memory = None
self._docker = None
self._command = []
self._namespace = {}
self._resources = {}
Expand Down Expand Up @@ -55,12 +51,17 @@ def declare_resource_group(self, **mappings):
self._namespace[name] = self._pipeline._new_resource_group(self, d)
return self

def depends_on(self, *tasks):
for t in tasks:
self._dependencies.add(t)

def command(self, command):
from .pipeline import Pipeline

def add_dependencies(r):
if isinstance(r, ResourceGroup):
[add_dependencies(resource) for _, resource in r._namespace.items()]
for _, resource in r._namespace.items():
add_dependencies(resource)
else:
assert isinstance(r, Resource)
if r._source is not None and r._source != self:
Expand All @@ -77,15 +78,16 @@ def handler(match_obj):
r_uid = match_obj.group()
r = self._pipeline._resource_map.get(r_uid)
if r is None:
raise KeyError(f"undefined resource '{r_uid}' in command '{command}'. Hint: resources must be from the same pipeline as the current task.")
raise KeyError(f"undefined resource '{r_uid}' in command '{command}'. "
f"Hint: resources must be from the same pipeline as the current task.")
add_dependencies(r)
self._resources[r._uid] = r
return f"${{{r_uid}}}"

subst_command = re.sub(f"({Resource._regex_pattern})|({ResourceGroup._regex_pattern})" \
f"|({Task._regex_pattern})|({Pipeline._regex_pattern})",
handler,
command)
subst_command = re.sub(f"({Resource._regex_pattern})|({ResourceGroup._regex_pattern})"
f"|({Task._regex_pattern})|({Pipeline._regex_pattern})",
handler,
command)
self._command.append(subst_command)
return self

Expand All @@ -94,15 +96,15 @@ def label(self, label):
return self

def memory(self, memory):
self._settings.memory = memory
self._memory = memory
return self

def cpu(self, cpu):
self._settings.cpu = cpu
self._cpu = cpu
return self

def docker(self, docker):
self._settings.docker = docker
self._docker = docker
return self

def __str__(self):
Expand Down
4 changes: 3 additions & 1 deletion pipeline/pipeline/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import random, string, shlex
import random
import string
import shlex


def get_sha(k):
Expand Down
2 changes: 2 additions & 0 deletions pipeline/setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[flake8]
max-line-length=120
2 changes: 1 addition & 1 deletion pipeline/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
setup(
name = 'pipeline',
version = '0.0.1',
url = 'https://github.com/hail-is/pipeline.git',
url = 'https://github.com/hail-is/hail/tree/master/pipeline',
author = 'Hail Team',
author_email = 'hail@broadinstitute.org',
description = 'Pipeline builder',
Expand Down
Empty file added pipeline/test/__init__.py
Empty file.

0 comments on commit 1ceeae9

Please sign in to comment.