Skip to content

Commit

Permalink
Prototype for batch Python interface (hail-is#4937)
Browse files Browse the repository at this point in the history
* Prototype for batch Python interface

* wip

* Attempt #2

* change how temp dir is formatted

* change directory structure

* Add tests and actually execute pipeline

* added project

* better error msg

* added docker support

* Add infrastructure for testing, conda, etc.

* support for new pipeline env

* comment out docker test
  • Loading branch information
jigold authored and danking committed Jan 30, 2019
1 parent d31cc85 commit c883545
Show file tree
Hide file tree
Showing 17 changed files with 643 additions and 1 deletion.
2 changes: 2 additions & 0 deletions Dockerfile.pr-builder
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ COPY hail/gradle gradle
COPY hail/python/hail/dev-environment.yml python/hail/dev-environment.yml
COPY batch/environment.yml batch/environment.yml
COPY ci/environment.yml ci/environment.yml
COPY pipeline/environment.yml pipeline/environment.yml

RUN useradd --create-home --shell /bin/bash hail && \
usermod -a -G docker hail && \
Expand All @@ -101,6 +102,7 @@ RUN ./gradlew downloadDependencies --gradle-user-home /gradle-cache
RUN conda env create -f ./python/hail/dev-environment.yml && \
conda env create -f batch/environment.yml && \
conda env create -f ci/environment.yml && \
conda env create -f pipeline/environment.yml && \
rm -rf /home/hail/.conda/pkgs/*

# gcloud iam key files will be stored here
Expand Down
2 changes: 1 addition & 1 deletion hail-ci-build-image
Original file line number Diff line number Diff line change
@@ -1 +1 @@
gcr.io/hail-vdc/hail-pr-builder:124133f80af4373efd3ea198c63c503821ee965dc40f23041312286a09350571
gcr.io/hail-vdc/hail-pr-builder:5aae9c62fdc50e2ecf53b30346fe0aa902613822efdc984bb28e009d55a05cc9
2 changes: 2 additions & 0 deletions pipeline/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
*.pyc
**/__pycache__
8 changes: 8 additions & 0 deletions pipeline/environment.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
name: hail-pipeline
dependencies:
- python=3.6
- pip
- pip:
- pylint
- flake8
- pytest
9 changes: 9 additions & 0 deletions pipeline/hail-ci-build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/bin/bash
# CI entry point for the pipeline project: lint the package, then run its tests.
# -e aborts on the first failing command; -x echoes each command before it runs.
set -ex

# Source the loadconda script from the parent directory (presumably it makes
# the `conda` command available in this shell -- confirm), then switch to the
# hail-pipeline environment.
. ../loadconda
conda activate hail-pipeline

# Static checks over the `pipeline` package, using the package's own pylintrc
# and suppressing pylint's score report, followed by the tests under `test`.
flake8 pipeline
pylint pipeline --rcfile pipeline/pylintrc --score=n
pytest test
6 changes: 6 additions & 0 deletions pipeline/pipeline/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from .pipeline import Pipeline
from .backend import LocalBackend

__all__ = ['Pipeline',
'LocalBackend',
]
94 changes: 94 additions & 0 deletions pipeline/pipeline/backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import abc
import os
import subprocess as sp

from .resource import Resource, ResourceGroup
from .utils import get_sha, escape_string


class Backend(abc.ABC):
    """Abstract interface every pipeline execution backend must implement.

    Deriving from ``abc.ABC`` is what makes the ``@abc.abstractmethod``
    markers effective: without it the decorators are inert and an incomplete
    backend could be instantiated and silently return ``None`` everywhere.
    """

    @abc.abstractmethod
    def tmp_dir(self):
        """Return the working directory the backend will run a pipeline in."""
        return

    @abc.abstractmethod
    def run(self, pipeline, dry_run, verbose, bg, delete_on_exit):
        """Execute ``pipeline``.

        Parameters are passed through from ``Pipeline.run``: ``dry_run``,
        ``verbose``, ``bg`` and ``delete_on_exit`` are flags whose exact
        handling is up to the concrete backend.
        """
        return

    @abc.abstractmethod
    def copy(self, src, dest):
        """Return the command that copies ``src`` to ``dest`` on this backend."""
        return


class LocalBackend(Backend):
    """Backend that renders a pipeline as one shell script and runs it locally.

    Each run gets its own ``pipeline.<id>/`` working directory underneath
    ``tmp_dir`` (default ``/tmp/``).
    """

    def __init__(self, tmp_dir='/tmp/'):
        self._tmp_dir = tmp_dir

    def run(self, pipeline, dry_run, verbose, bg, delete_on_exit):
        """Build the script for ``pipeline`` and print or execute it.

        :param pipeline: object exposing ``_tasks`` and ``_resource_map``.
        :param dry_run: if true, print the script instead of executing it.
        :param verbose: if true, the script runs with ``set -ex`` rather than
            ``set -e``.
        :param bg: currently ignored (non-blocking execution is a FIXME).
        :param delete_on_exit: if true, remove the working directory after an
            executed (non dry-run) script, whether it succeeded or failed.
        :raises subprocess.CalledProcessError: if the script exits non-zero;
            its captured output is printed first.
        """
        tmpdir = self.tmp_dir()

        # The conditional expression binds looser than '+', so it must be
        # parenthesised; otherwise a non-verbose run drops 'set -e' entirely
        # and the script keeps going after a failed command.
        script = ['#!/bin/bash',
                  'set -e' + ('x' if verbose else ''),
                  '\n',
                  '# change cd to tmp directory',
                  f"cd {tmpdir}",
                  '\n']

        def define_resource(r):
            """Return the shell assignment ``<uid>=<value>`` for a resource."""
            if isinstance(r, str):
                # Tasks may hold resource uids; resolve them to objects.
                r = pipeline._resource_map[r]

            if isinstance(r, Resource):
                assert r._value is not None
                init = f"{r._uid}={escape_string(r._value)}"
            else:
                assert isinstance(r, ResourceGroup)
                init = f"{r._uid}={escape_string(r._root)}"
            return init

        for task in pipeline._tasks:
            script.append(f"# {task._uid} {task._label if task._label else ''}")

            resource_defs = [define_resource(r) for r in task._resources.values()]
            if task._docker:
                # Inside a container the variable definitions have to be part
                # of the command string handed to the container's bash.
                defs = '; '.join(resource_defs) + '; ' if resource_defs else ''
                cmd = "&& ".join(task._command)
                image = task._docker
                script += [f"docker run "
                           f"-v {tmpdir}:{tmpdir} "
                           f"-w {tmpdir} "
                           f"{image} /bin/bash "
                           f"-c {escape_string(defs + cmd)}",
                           '\n']
            else:
                script += resource_defs
                script += task._command + ['\n']

        script = "\n".join(script)

        if dry_run:
            print(script)
        else:
            try:
                sp.check_output(script, shell=True)  # FIXME: implement non-blocking (bg = True)
            except sp.CalledProcessError as e:
                print(e.output)
                raise
            finally:
                if delete_on_exit:
                    # Argument list instead of a shell string: the path is
                    # never re-parsed by a shell.
                    sp.run(['rm', '-r', tmpdir], check=False)

    def tmp_dir(self):
        """Create and return a fresh ``<tmp_dir>/pipeline.<id>/`` directory.

        The id comes from ``get_sha(8)``. Creation itself is the uniqueness
        check: if the directory already exists a new id is drawn, so two
        callers can never be handed the same directory.
        """
        while True:
            directory = self._tmp_dir + '/pipeline.{}/'.format(get_sha(8))
            try:
                os.makedirs(directory)
            except FileExistsError:
                continue
            return directory

    def copy(self, src, dest):  # FIXME: symbolic links? support gsutil?
        """Return the ``cp`` command line copying ``src`` to ``dest``."""
        return f"cp {src} {dest}"
120 changes: 120 additions & 0 deletions pipeline/pipeline/pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import re

from .backend import LocalBackend
from .task import Task
from .resource import Resource, ResourceGroup
from .utils import get_sha


class Pipeline:
    """A collection of tasks and the resources that flow between them.

    Tasks are created with :meth:`new_task`, ordered by their dependencies in
    :meth:`run`, and handed to the configured backend for execution.
    """

    _counter = 0
    _uid_prefix = "__PIPELINE__"
    _regex_pattern = r"(?P<PIPELINE>{}\d+)".format(_uid_prefix)

    @classmethod
    def _get_uid(cls):
        """Return the next ``__PIPELINE__<n>`` identifier."""
        uid = "{}{}".format(cls._uid_prefix, cls._counter)
        cls._counter += 1
        return uid

    def __init__(self, backend=None):
        """Create an empty pipeline; ``backend`` defaults to ``LocalBackend()``."""
        self._tasks = []
        self._resource_map = {}
        self._allocated_files = set()
        self._backend = backend if backend else LocalBackend()
        self._uid = Pipeline._get_uid()

    def new_task(self):
        """Create a task bound to this pipeline, register it and return it."""
        t = Task(pipeline=self)
        self._tasks.append(t)
        return t

    def _tmp_file(self, prefix=None, suffix=None):
        """Return a file name not yet handed out by this pipeline.

        The name is ``prefix + get_sha(8) + suffix``; names already present in
        ``_allocated_files`` are rejected and a new one is drawn.
        """
        while True:
            name = '{}{}{}'.format(prefix if prefix else '',
                                   get_sha(8),
                                   suffix if suffix else '')
            if name not in self._allocated_files:
                self._allocated_files.add(name)
                return name

    def _new_resource(self, source=None, value=None):
        """Create and register a resource; ``value`` defaults to a temp name."""
        r = Resource(source, value if value else self._tmp_file())
        self._resource_map[r._uid] = r
        return r

    def _new_resource_group(self, source, mappings):
        """Create a resource group from ``{name: template}`` mappings.

        Each template is evaluated as an f-string in this method's scope, so
        it can refer to the local ``root`` (the group's temp-file root).

        :raises ValueError: if a mapping value is not a string.
        """
        assert isinstance(mappings, dict)
        root = self._tmp_file()
        d = {}
        new_resource_map = {}
        for name, code in mappings.items():
            if not isinstance(code, str):
                raise ValueError(f"value for name '{name}' is not a string. Found '{type(code)}' instead.")
            # SECURITY: eval() executes whatever expressions the template
            # contains; templates must only ever come from trusted code.
            r = self._new_resource(source=source, value=eval(f'f"""{code}"""'))  # pylint: disable=W0123
            d[name] = r
            new_resource_map[r._uid] = r

        self._resource_map.update(new_resource_map)
        rg = ResourceGroup(root, **d)
        self._resource_map.update({rg._uid: rg})
        return rg

    def _read_input(self, source, dest=None):
        """Add a 'read_input' copy task and return the resource it produces."""
        dest = dest if dest else self._tmp_file()
        cp_task = (self.new_task()
                   .label('read_input')
                   .command(self._backend.copy(source, dest)))
        return self._new_resource(source=cp_task, value=dest)

    def read_input(self, source):
        """Register ``source`` as an input and return the new resource's uid."""
        return str(self._read_input(source))

    def read_input_group(self, **kwargs):
        """Register several inputs as one group.

        Each keyword ``name=file`` is copied to ``<root>.<name>``, where
        ``root`` is a fresh temp-file name shared by the whole group.
        """
        root = self._tmp_file()
        added_resources = {name: self._read_input(file, root + '.' + name) for name, file in kwargs.items()}
        rg = ResourceGroup(root, **added_resources)
        self._resource_map.update({rg._uid: rg})
        return rg

    def write_output(self, resource, dest):
        """Add a 'write_output' task copying ``resource`` to ``dest``.

        ``resource`` is either a uid string known to this pipeline or a
        ``Resource`` registered with it.

        :raises KeyError: if a uid string is not in the resource map.
        """
        if isinstance(resource, str):
            resource = self._resource_map[resource]
        else:
            assert isinstance(resource, Resource)
            assert resource._uid in self._resource_map
        (self.new_task()
         .label('write_output')
         .command(self._backend.copy(resource, dest)))

    def select_tasks(self, pattern):
        """Return the labelled tasks whose label matches ``pattern``.

        Matching uses ``re.match``, i.e. it is anchored at the start of the
        label.
        """
        return [task for task in self._tasks if task._label is not None and re.match(pattern, task._label) is not None]

    def run(self, dry_run=False, verbose=False, delete_on_exit=True):
        """Order the tasks by dependency and execute them on the backend.

        Tasks are released in waves: every task whose dependencies have all
        been released joins the next wave, keeping the original task order
        within a wave. The tasks' own ``_dependencies`` are not modified.

        :raises ValueError: if a pass over the remaining tasks releases
            nothing, which means they depend on each other in a cycle (or on
            a task that is not part of this pipeline).
        """
        # Private copies so the bookkeeping below never touches task state.
        pending = {task: set(task._dependencies) for task in self._tasks}
        ordered_tasks = []
        while pending:
            ready = [task for task, deps in pending.items() if not deps]
            if not ready:
                # No progress now means no progress ever.
                raise ValueError("cycle detected in dependency graph")
            ordered_tasks.extend(ready)
            for task in ready:
                del pending[task]
            for deps in pending.values():
                deps.difference_update(ready)

        self._tasks = ordered_tasks
        self._backend.run(self, dry_run, verbose, False, delete_on_exit)  # FIXME: expose bg option when implemented!

    def __str__(self):
        return self._uid
22 changes: 22 additions & 0 deletions pipeline/pipeline/pylintrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[MESSAGES CONTROL]
# C0111 missing docstring, W1203 logging f-string interpolation, W1202 logging
# format interpolation, R0913 too many arguments, W0622 redefining built-in,
# W0212 protected member access, W0621 redefining name from outer scope, R0914
# too many local variables, W0603 using the global statement, R0902 too many
# instance attributes, R1705 unnecessary "else" after "return", W0511 fixme,
# R0903 too few public methods, R0401 cyclic import, R0801 similar lines

disable=C0111,W1203,W1202,C0111,R0913,W0622,W0212,W0621,R0914,W0603,R0902,R1705,W0511,R0903,R0401,R0801

[FORMAT]
# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1
# tab).
indent-string=' '

attr-rgx=[a-z_][a-z0-9_]{0,30}|id$
argument-rgx=[a-z_][a-z0-9_]{0,30}|id$
variable-rgx=[a-z_][a-z0-9_]{0,30}|id|f$
const-rgx=(([a-zA-Z_][a-zA-Z0-9_]*)|(__.*__))$

# Maximum number of characters on a single line.
max-line-length=120
66 changes: 66 additions & 0 deletions pipeline/pipeline/resource.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
class Resource:
    """A single value tracked by a pipeline, identified by a generated uid.

    Converting a resource to ``str`` yields its uid (``__RESOURCE__<n>``).
    """

    _counter = 0
    _uid_prefix = "__RESOURCE__"
    _regex_pattern = r"(?P<RESOURCE>{}\d+)".format(_uid_prefix)

    @classmethod
    def _new_uid(cls):
        """Hand out the next uid and advance the class-wide counter."""
        index = cls._counter
        cls._counter = index + 1
        return cls._uid_prefix + str(index)

    def __init__(self, source=None, value=None):
        """Create a resource.

        ``source`` must be a ``Task`` or ``None``; ``value`` must be a string
        or ``None``. Both are checked with assertions.
        """
        # Imported here rather than at module level -- presumably to avoid a
        # circular import between the resource and task modules.
        from .task import Task
        assert source is None or isinstance(source, Task)
        assert isinstance(value, str) or value is None
        self._uid = Resource._new_uid()
        self._source = source
        self._value = value

    def __str__(self):
        return self._uid


class ResourceGroup:
    """A named bundle of resources sharing one root, identified by a uid.

    Members are declared as keyword arguments at construction time and can be
    looked up afterwards either as ``group['name']`` or ``group.name``; both
    return the member's uid. The group itself converts to its own uid and can
    be concatenated with strings.
    """

    _counter = 0
    _uid_prefix = "__RESOURCE_GROUP__"
    _regex_pattern = r"(?P<RESOURCE_GROUP>{}\d+)".format(_uid_prefix)

    @classmethod
    def _new_uid(cls):
        """Return the next ``__RESOURCE_GROUP__<n>`` identifier."""
        uid = "{}{}".format(cls._uid_prefix, cls._counter)
        cls._counter += 1
        return uid

    def __init__(self, root, **values):
        """Create a group rooted at ``root`` holding the given resources.

        Every keyword value must be a ``Resource`` (checked by assertion).
        """
        self._namespace = {}
        self._root = root
        self._uid = ResourceGroup._new_uid()

        for name, resource in values.items():
            assert isinstance(resource, Resource)
            self._namespace[name] = resource

    def _get_resource(self, item):
        """Return the uid of member ``item``.

        :raises KeyError: if ``item`` was not declared for this group.
        """
        if item not in self._namespace:
            raise KeyError(f"'{item}' not found in the resource group. "
                           f"Hint: you must declare each attribute when constructing the resource group.")
        r = self._namespace[item]
        return r._uid

    def __getitem__(self, item):
        return self._get_resource(item)

    def __getattr__(self, item):
        """Resolve ``group.name`` to the uid of member ``name``.

        Python only calls this after normal attribute lookup has failed, so
        two situations must be answered with ``AttributeError`` rather than a
        member lookup:

        * ``_namespace`` itself is missing (an instance created without
          ``__init__``, as ``copy`` and ``pickle`` do) -- looking it up here
          again would recurse without end;
        * an undeclared ``__dunder__`` name, which is a protocol probe
          (``hasattr``, ``copy``, ``pickle``) and not a resource request.

        Any other undeclared name still raises ``KeyError`` as before.
        """
        if item == '_namespace':
            raise AttributeError(item)
        if item.startswith('__') and item.endswith('__') and item not in self._namespace:
            raise AttributeError(item)
        return self._get_resource(item)

    def __add__(self, other):
        assert isinstance(other, str)
        return str(self._uid) + other

    def __radd__(self, other):
        assert isinstance(other, str)
        return other + str(self._uid)

    def __str__(self):
        return self._uid
Loading

0 comments on commit c883545

Please sign in to comment.