diff --git a/README.md b/README.md
index 8ea872428fc..12407210dc9 100644
--- a/README.md
+++ b/README.md
@@ -91,4 +91,4 @@ open-source by providing free hosting, and YourKit, LLC for generously providing
free licenses for YourKit Java
Profiler for open-source development.
-
\ No newline at end of file
+
diff --git a/batch/.dockerignore b/batch/.dockerignore
new file mode 100644
index 00000000000..fc9e6ee0cfc
--- /dev/null
+++ b/batch/.dockerignore
@@ -0,0 +1,5 @@
+batch.log
+logs
+*~
+*.pyc
+**/__pycache__
diff --git a/batch/.gitignore b/batch/.gitignore
new file mode 100644
index 00000000000..fc9e6ee0cfc
--- /dev/null
+++ b/batch/.gitignore
@@ -0,0 +1,5 @@
+batch.log
+logs
+*~
+*.pyc
+**/__pycache__
diff --git a/batch/Dockerfile b/batch/Dockerfile
new file mode 100644
index 00000000000..ee230fa0f7c
--- /dev/null
+++ b/batch/Dockerfile
@@ -0,0 +1,14 @@
+FROM alpine:3.8
+
+RUN apk update
+RUN apk add python3 py3-cffi py3-cryptography
+RUN pip3 install -U pip
+RUN pip install flask
+RUN pip install kubernetes
+RUN pip install cerberus
+
+COPY batch /batch
+
+EXPOSE 5000
+
+CMD ["python3", "/batch/server.py"]
diff --git a/batch/Dockerfile.test b/batch/Dockerfile.test
new file mode 100644
index 00000000000..75248202684
--- /dev/null
+++ b/batch/Dockerfile.test
@@ -0,0 +1,13 @@
+FROM alpine:3.8
+
+RUN apk update
+RUN apk add python3 # python3=3.6.4-r1
+RUN pip3 install -U pip
+RUN pip install flask
+RUN pip install kubernetes
+RUN pip install cerberus
+
+COPY batch /batch
+COPY test /test
+
+CMD ["python3", "-m", "unittest", "/test/test_batch.py"]
diff --git a/batch/Makefile b/batch/Makefile
new file mode 100644
index 00000000000..eafa6bbbfc5
--- /dev/null
+++ b/batch/Makefile
@@ -0,0 +1,40 @@
+.PHONY: hail-ci-build-image push-hail-ci-build-image build build-batch build-batch-test push push-batch push-batch-test run-docker run test-local
+
+hail-ci-build-image:
+ docker build -t batch-pr-builder -f Dockerfile.pr-builder .
+ echo "gcr.io/broad-ctsa/batch-pr-builder:`docker images -q --no-trunc batch-pr-builder | sed -e 's,[^:]*:,,'`" > ../hail-ci-build-image
+ docker tag batch-pr-builder `cat ../hail-ci-build-image`
+
+push-hail-ci-build-image: hail-ci-build-image
+ docker push `cat ../hail-ci-build-image`
+
+build: build-batch build-batch-test
+
+build-batch:
+ docker build -t batch .
+
+build-batch-test:
+ docker build -t batch-test -f Dockerfile.test .
+
+push: push-batch push-batch-test
+
+push-batch: IMAGE="gcr.io/broad-ctsa/batch:$(shell docker images -q --no-trunc batch | sed -e 's,[^:]*:,,')"
+push-batch: build-batch
+ echo $(IMAGE) > batch-image
+ docker tag batch $(IMAGE)
+ docker push $(IMAGE)
+
+push-batch-test: IMAGE="gcr.io/broad-ctsa/batch-test:$(shell docker images -q --no-trunc batch-test | sed -e 's,[^:]*:,,')"
+push-batch-test: build-batch-test
+ echo $(IMAGE) > batch-test-image
+	docker tag batch-test $(IMAGE)
+ docker push $(IMAGE)
+
+run-docker:
+ docker run -e BATCH_USE_KUBE_CONFIG=1 -i -v $(HOME)/.kube:/root/.kube -p 5000:5000 -t batch
+
+run:
+ BATCH_USE_KUBE_CONFIG=1 python batch/server.py
+
+test-local:
+ POD_IP='127.0.0.1' BATCH_URL='http://127.0.0.1:5000' python -m unittest -v test/test_batch.py
diff --git a/batch/README.md b/batch/README.md
new file mode 100644
index 00000000000..5877a69567e
--- /dev/null
+++ b/batch/README.md
@@ -0,0 +1,196 @@
+Getting Started
+---
+
+Start a `minikube` k8s cluster and configure your `kubectl` to point at that k8s
+cluster:
+
+```
+minikube start
+```
+
+If you get a weird minikube error, try
+
+```
+minikube delete
+rm -rf ~/.minikube
+brew cask reinstall minikube # or equivalent on your OS
+minikube start
+```
+
+When you want to return to using a google k8s cluster, you can run this:
+
+```
+gcloud container clusters get-credentials CLUSTER_NAME
+```
+
+Set some environment variables so that docker images are placed in the
+`minikube` cluster's docker registry:
+
+```
+eval $(minikube docker-env)
+```
+
+Build the batch and test image
+
+```
+make build-batch build-batch-test
+```
+
+edit the `deployment.yaml` so that the container named `batch` has
+`imagePullPolicy: Never`. This ensures that k8s does not go look for the image
+in the Google Container Registry and instead uses the local image cache (which
+you just updated when you ran `make build-batch build-batch-test`).
+
+Give way too many privileges to the default service account so that `batch` can
+start new pods:
+
+```
+kubectl create clusterrolebinding \
+ cluster-admin-default \
+ --clusterrole cluster-admin \
+ --serviceaccount=default:default
+```
+
+Create a batch service:
+
+```
+kubectl create -f deployment.yaml
+```
+
+If you ever need to shutdown the service, execute:
+
+```
+kubectl delete -f deployment.yaml
+```
+
+Look for the newly created batch pod:
+
+```
+kubectl get pods
+```
+
+And create a port forward from the k8s cluster to your local machine (this works
+for clusters in GKE too):
+
+```
+kubectl port-forward POD_NAME 5000:5000
+```
+
+The former port is the local one and the latter port is the remote one (i.e. in
+the k8s pod). Now you can load the conda environment for testing and run the
+tests against this deployment:
+
+```
+conda env create -f environment.yaml
+conda activate hail-batch
+make test-local
+```
+
+
+
+---
+
+Kubernetes [Python client](https://github.com/kubernetes-client/python/blob/master/kubernetes/README.md)
+ - [V1Pod](https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1Pod.md)
+ - [create_namespaced_pod](https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/CoreV1Api.md#create_namespaced_pod)
+ - [delete_namespaced_pod](https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/CoreV1Api.md#delete_namespaced_pod)
+ -
+
+To get kubectl credentials for a GKE cluster:
+
+```
+$ gcloud container clusters get-credentials
+```
+
+To authorize docker to push to GCR:
+
+```
+$ gcloud auth configure-docker
+```
+
+To run batch locally, using the local kube credentials:
+
+```
+$ docker run -i -v $HOME/.kube:/root/.kube -p 5000:5000 -t batch
+```
+
+On OSX, the port will be accessible on the docker-machine:
+
+```
+$(docker-machine ip default):5000
+```
+
+Get a shell in a running pod:
+
+```
+$ kubectl exec -it <pod-name> -- /bin/sh
+```
+
+Hit a Flask REST endpoint with Curl:
+
+```
+$ curl -X POST -H "Content-Type: application/json" -d
+$ curl -X POST -H "Content-Type: application/json" -d '{"name": "batchtest", "image": "gcr.io/broad-ctsa/true"}' batch/jobs/create
+```
+
+Give default:default serviceaccount cluster-admin privileges:
+
+```
+$ kubectl create clusterrolebinding cluster-admin-default --clusterrole cluster-admin --serviceaccount=default:default
+```
+
+Run an image in a new pod:
+
+```
+$ kubectl run <name> --restart=Never --image=<image> -- <args>
+```
+
+For example, run a shell in an new pod:
+
+```
+$ kubectl run -i --tty alpine --image=alpine --restart=Never -- sh
+```
+
+Forward from a local port to a port on pod:
+
+```
+$ kubectl port-forward jupyter-deployment-5f54cff675-msr85 8888:8888 # <local-port>:<pod-port>
+```
+
+Run container with a given hostname:
+
+$ docker run -d --rm --name spark-m -h spark-m -p 8080:8080 -p 7077:7077 spark-m
+
+List all containers, included stopped containers:
+
+$ docker ps -a
+
+Remove all stopped containers:
+
+$ docker ps -aq --no-trunc -f status=exited | xargs docker rm
+
+Run a docker container linked to another:
+
+$ docker run -d --rm --cpus 0.5 --name spark-w-0 --link spark-m spark-w -c 1 -m 2g
+
+Get IP of container:
+
+$ docker inspect <container-id> | grep IPAddress
+
+---
+
+The following will set some environment variables so that future invocations of
+`docker build` will make images available to the minikube cluster. This allows
+you to test images without pushing them to a remote container registry.
+
+```
+eval $(minikube docker-env)
+make build-batch build-batch-test
+```
+
+NB: you must also set the `imagePullPolicy` of any `container` you `kubectl
+create` to `Never` if you're using the `:latest` image tag (which is implicitly
+used if no tag is specified on the image name). Otherwise, k8s will always try
+to check if there is a newer version of the image. Even if `imagePullPolicy`
+is set to `IfNotPresent`, k8s will still check for a newer image if you use the
+`:latest` tag.
diff --git a/batch/batch/__init__.py b/batch/batch/__init__.py
new file mode 100644
index 00000000000..770048111af
--- /dev/null
+++ b/batch/batch/__init__.py
@@ -0,0 +1,2 @@
+import batch.client
+import batch.api
diff --git a/batch/batch/api.py b/batch/batch/api.py
new file mode 100644
index 00000000000..4f118e31338
--- /dev/null
+++ b/batch/batch/api.py
@@ -0,0 +1,64 @@
+import json
+import time
+import random
+import requests
+
+def create_job(url, spec, attributes, batch_id, callback):
+ d = {'spec': spec}
+ if attributes:
+ d['attributes'] = attributes
+ if batch_id:
+ d['batch_id'] = batch_id
+ if callback:
+ d['callback'] = callback
+
+ r = requests.post(url + '/jobs/create', json = d)
+ r.raise_for_status()
+ return r.json()
+
+def list_jobs(url):
+ r = requests.get(url + '/jobs')
+ r.raise_for_status()
+ return r.json()
+
+def get_job(url, job_id):
+ r = requests.get(url + '/jobs/{}'.format(job_id))
+ r.raise_for_status()
+ return r.json()
+
+def get_job_log(url, job_id):
+ r = requests.get(url + '/jobs/{}/log'.format(job_id))
+ r.raise_for_status()
+ return r.text
+
+def delete_job(url, job_id):
+ r = requests.delete(url + '/jobs/{}/delete'.format(job_id))
+ r.raise_for_status()
+ return r.json()
+
+def cancel_job(url, job_id):
+ r = requests.post(url + '/jobs/{}/cancel'.format(job_id))
+ r.raise_for_status()
+ return r.json()
+
+def create_batch(url, attributes):
+ d = {}
+ if attributes:
+ d['attributes'] = attributes
+ r = requests.post(url + '/batches/create', json = d)
+ r.raise_for_status()
+ return r.json()
+
+def get_batch(url, batch_id):
+ r = requests.get(url + '/batches/{}'.format(batch_id))
+ r.raise_for_status()
+ return r.json()
+
+def delete_batch(url, batch_id):
+ r = requests.delete(url + '/batches/{}'.format(batch_id))
+ r.raise_for_status()
+ return r.json()
+
+def refresh_k8s_state(url):
+ r = requests.post(url + '/refresh_k8s_state')
+ r.raise_for_status()
diff --git a/batch/batch/client.py b/batch/batch/client.py
new file mode 100644
index 00000000000..3dfcc30babf
--- /dev/null
+++ b/batch/batch/client.py
@@ -0,0 +1,177 @@
+import json
+import time
+import random
+import requests
+import batch.api as api
+
+class Job(object):
+ def __init__(self, client, id, attributes=None, _status = None):
+ if attributes is None:
+ attributes = {}
+
+ self.client = client
+ self.id = id
+ self.attributes = attributes
+ self._status = _status
+
+ def is_complete(self):
+ if self._status:
+ state = self._status['state']
+ if state == 'Complete' or state == 'Cancelled':
+ return True
+ return False
+
+ def cached_status(self):
+ assert self._status != None
+ return self._status
+
+ def status(self):
+ self._status = self.client._get_job(self.id)
+ return self._status
+
+ def wait(self):
+ i = 0
+ while True:
+ self.status() # update
+ if self.is_complete():
+ return self._status
+ j = random.randrange(2 ** i)
+ time.sleep(0.100 * j)
+ # max 5.12s
+ if i < 9:
+ i = i + 1
+
+ def cancel(self):
+ self.client._cancel_job(self.id)
+
+ def delete(self):
+ self.client._delete_job(self.id)
+
+ self.id = None
+ self.attributes = None
+ self._status = None
+
+ def log(self):
+ return self.client._get_job_log(self.id)
+
+class Batch(object):
+ def __init__(self, client, id):
+ self.client = client
+ self.id = id
+
+ def create_job(self, image, command=None, args=None, env=None, ports=None,
+ resources=None, tolerations=None, volumes=None, attributes=None, callback=None):
+ return self.client._create_job(image, command, args, env, ports, resources, tolerations, volumes, attributes, self.id, callback)
+
+ def status(self):
+ return self.client._get_batch(self.id)
+
+ def wait(self):
+ i = 0
+ while True:
+ status = self.status()
+ if status['jobs']['Created'] == 0:
+ return status
+ j = random.randrange(2 ** i)
+ time.sleep(0.100 * j)
+ # max 5.12s
+ if i < 9:
+ i = i + 1
+
+class BatchClient(object):
+ def __init__(self, url=None):
+ if not url:
+ url = 'http://batch'
+ self.url = url
+
+ def _create_job(self, image, command, args, env, ports, resources, tolerations, volumes, attributes, batch_id, callback):
+ if env:
+ env = [{'name': k, 'value': v} for (k, v) in env.items()]
+ else:
+ env = []
+ env.extend([{
+ 'name': 'POD_IP',
+ 'valueFrom': {
+ 'fieldRef': {'fieldPath': 'status.podIP'}
+ }
+ }, {
+ 'name': 'POD_NAME',
+ 'valueFrom': {
+ 'fieldRef': {'fieldPath': 'metadata.name'}
+ }
+ }])
+
+ container = {
+ 'image': image,
+ 'name': 'default'
+ }
+ if command:
+ container['command'] = command
+ if args:
+ container['args'] = args
+ if env:
+ container['env'] = env
+ if ports:
+ container['ports'] = [{
+ 'containerPort': p,
+ 'protocol': 'TCP'
+ } for p in ports]
+ if resources:
+ container['resources'] = resources
+ if volumes:
+ container['volumeMounts'] = [v['volume_mount'] for v in volumes]
+ spec = {
+ 'containers': [container],
+ 'restartPolicy': 'Never'
+ }
+ if volumes:
+ spec['volumes'] = [v['volume'] for v in volumes]
+ if tolerations:
+ spec['tolerations'] = tolerations
+
+ j = api.create_job(self.url, spec, attributes, batch_id, callback)
+ return Job(self, j['id'], j.get('attributes'))
+
+ def _get_job(self, id):
+ return api.get_job(self.url, id)
+
+ def _get_job_log(self, id):
+ return api.get_job_log(self.url, id)
+
+ def _delete_job(self, id):
+ api.delete_job(self.url, id)
+
+ def _cancel_job(self, id):
+ api.cancel_job(self.url, id)
+
+ def _get_batch(self, batch_id):
+ return api.get_batch(self.url, batch_id)
+
+ def _refresh_k8s_state(self):
+ api.refresh_k8s_state(self.url)
+
+ def list_jobs(self):
+ jobs = api.list_jobs(self.url)
+ return [Job(self, j['id'], j.get('attributes'), j) for j in jobs]
+
+ def get_job(self, id):
+ # make sure job exists
+ j = api.get_job(self.url, id)
+ return Job(self, j['id'], j.get('attributes'), j)
+
+ def create_job(self,
+ image,
+ command=None,
+ args=None,
+ env=None,
+ ports=None,
+ resources=None,
+ tolerations=None,
+ volumes=None,
+ attributes=None,
+ callback=None):
+ return self._create_job(image, command, args, env, ports, resources, tolerations, volumes, attributes, None, callback)
+
+ def create_batch(self, attributes=None):
+ b = api.create_batch(self.url, attributes)
+ return Batch(self, b['id'])
diff --git a/batch/batch/server.py b/batch/batch/server.py
new file mode 100644
index 00000000000..e0fe31440b2
--- /dev/null
+++ b/batch/batch/server.py
@@ -0,0 +1,465 @@
+import sys
+import os
+import time
+import random
+import uuid
+from collections import Counter
+import logging
+import threading
+from flask import Flask, request, jsonify, abort, url_for
+import kubernetes as kube
+import cerberus
+import requests
+
+if not os.path.exists('logs'):
+ os.mkdir('logs')
+else:
+ if not os.path.isdir('logs'):
+ raise OSError('logs exists but is not a directory')
+
+fmt = logging.Formatter(
+    # NB: no space after levelname because WARNING is so long
+ '%(levelname)s\t| %(asctime)s \t| %(filename)s \t| %(funcName)s:%(lineno)d | '
+ '%(message)s')
+
+fh = logging.FileHandler('batch.log')
+fh.setLevel(logging.INFO)
+fh.setFormatter(fmt)
+
+ch = logging.StreamHandler()
+ch.setLevel(logging.INFO)
+ch.setFormatter(fmt)
+
+log = logging.getLogger('batch')
+log.setLevel(logging.INFO)
+
+logging.basicConfig(
+ handlers=[fh, ch],
+ level=logging.INFO)
+
+REFRESH_INTERVAL_IN_SECONDS = int(os.environ.get('REFRESH_INTERVAL_IN_SECONDS', 5 * 60))
+
+log.info(f'REFRESH_INTERVAL_IN_SECONDS {REFRESH_INTERVAL_IN_SECONDS}')
+
+if 'BATCH_USE_KUBE_CONFIG' in os.environ:
+ kube.config.load_kube_config()
+else:
+ kube.config.load_incluster_config()
+v1 = kube.client.CoreV1Api()
+
+instance_id = uuid.uuid4().hex
+log.info(f'instance_id = {instance_id}')
+
+counter = 0
+def next_id():
+ global counter
+
+ counter = counter + 1
+ return counter
+
+pod_name_job = {}
+job_id_job = {}
+
+def _log_path(id):
+ return f'logs/job-{id}.log'
+
+def _read_file(p):
+ with open(p, 'r') as f:
+ return f.read()
+
+class Job(object):
+ def _create_pod(self):
+ assert not self._pod_name
+
+ pod = v1.create_namespaced_pod('default', self.pod_template)
+ self._pod_name = pod.metadata.name
+ pod_name_job[self._pod_name] = self
+
+ log.info('created pod name: {} for job {}'.format(self._pod_name, self.id))
+
+ def _delete_pod(self):
+ if self._pod_name:
+ try:
+ v1.delete_namespaced_pod(self._pod_name, 'default', kube.client.V1DeleteOptions())
+ except kube.client.rest.ApiException as e:
+ if e.status == 404:
+ pass
+ else:
+ raise
+ del pod_name_job[self._pod_name]
+ self._pod_name = None
+
+ def _read_log(self):
+ if self._state == 'Created':
+ if self._pod_name:
+ try:
+ return v1.read_namespaced_pod_log(self._pod_name, 'default')
+ except:
+ pass
+ elif self._state == 'Complete':
+ p = _log_path(self.id)
+ return _read_file(p)
+ else:
+ assert self._state == 'Cancelled'
+ return None
+
+ def __init__(self, pod_spec, batch_id, attributes, callback):
+ self.id = next_id()
+ job_id_job[self.id] = self
+
+ self.batch_id = batch_id
+ if batch_id:
+ batch = batch_id_batch[batch_id]
+ batch.jobs.append(self)
+
+ self.attributes = attributes
+ self.callback = callback
+
+ self.pod_template = kube.client.V1Pod(
+ metadata = kube.client.V1ObjectMeta(generate_name = 'job-{}-'.format(self.id),
+ labels = {
+ 'app': 'batch-job',
+ 'hail.is/batch-instance': instance_id
+ }),
+ spec = pod_spec)
+
+ self._pod_name = None
+
+ self._state = 'Created'
+ log.info('created job {}'.format(self.id))
+
+ self._create_pod()
+
+ def set_state(self, new_state):
+ if self._state != new_state:
+ log.info('job {} changed state: {} -> {}'.format(
+ self.id,
+ self._state,
+ new_state))
+ self._state = new_state
+
+ def cancel(self):
+ if self.is_complete():
+ return
+ self._delete_pod()
+ self.set_state('Cancelled')
+
+ def delete(self):
+ # remove from structures
+ del job_id_job[self.id]
+ if self.batch_id:
+            batch = batch_id_batch[self.batch_id]
+            batch.jobs.remove(self)
+
+ self._delete_pod()
+
+ def is_complete(self):
+ return self._state == 'Complete' or self._state == 'Cancelled'
+
+ def mark_unscheduled(self):
+ if self._pod_name:
+ del pod_name_job[self._pod_name]
+ self._pod_name = None
+ self._create_pod()
+
+ def mark_complete(self, pod):
+ self.exit_code = pod.status.container_statuses[0].state.terminated.exit_code
+
+ pod_log = v1.read_namespaced_pod_log(pod.metadata.name, 'default')
+ p = _log_path(self.id)
+ with open(p, 'w') as f:
+ f.write(pod_log)
+ log.info(f'wrote log for job {self.id} to {p}')
+
+ if self._pod_name:
+ del pod_name_job[self._pod_name]
+ self._pod_name = None
+
+ self.set_state('Complete')
+
+ log.info('job {} complete, exit_code {}'.format(
+ self.id, self.exit_code))
+
+ if self.callback:
+ def f(id, callback, json):
+ try:
+ requests.post(callback, json = json, timeout=120)
+ except requests.exceptions.RequestException as re:
+ log.warn(f'callback for job {id} failed due to an error, I will not retry. Error: {re}')
+
+ threading.Thread(target=f, args=(self.id, self.callback, self.to_json())).start()
+
+ def to_json(self):
+ result = {
+ 'id': self.id,
+ 'state': self._state
+ }
+ if self._state == 'Complete':
+ result['exit_code'] = self.exit_code
+ pod_log = self._read_log()
+ if pod_log:
+ result['log'] = pod_log
+ if self.attributes:
+ result['attributes'] = self.attributes
+ return result
+
+app = Flask('batch')
+
+@app.route('/jobs/create', methods=['POST'])
+def create_job():
+ parameters = request.json
+
+ schema = {
+ # will be validated when creating pod
+ 'spec': {'type': 'dict',
+ 'required': True,
+ 'allow_unknown': True,
+ 'schema': {}
+ },
+ 'batch_id': {'type': 'integer'},
+ 'attributes': {
+ 'type': 'dict',
+ 'keyschema': {'type': 'string'},
+ 'valueschema': {'type': 'string'}
+ },
+ 'callback': {'type': 'string'}
+ }
+ v = cerberus.Validator(schema)
+ if (not v.validate(parameters)):
+ # print(v.errors)
+ abort(404, 'invalid request: {}'.format(v.errors))
+
+ pod_spec = v1.api_client._ApiClient__deserialize(
+ parameters['spec'], kube.client.V1PodSpec)
+
+ batch_id = parameters.get('batch_id')
+ if batch_id:
+ if batch_id not in batch_id_batch:
+ abort(404, 'valid request: batch_id {} not found'.format(batch_id))
+
+ job = Job(
+ pod_spec, batch_id, parameters.get('attributes'), parameters.get('callback'))
+ return jsonify(job.to_json())
+
+@app.route('/jobs', methods=['GET'])
+def get_job_list():
+ return jsonify([job.to_json() for _, job in job_id_job.items()])
+
+@app.route('/jobs/<int:job_id>', methods=['GET'])
+def get_job(job_id):
+ job = job_id_job.get(job_id)
+ if not job:
+ abort(404)
+ return jsonify(job.to_json())
+
+@app.route('/jobs/<int:job_id>/log', methods=['GET'])
+def get_job_log(job_id):
+ if job_id > counter:
+ abort(404)
+
+ job = job_id_job.get(job_id)
+ if job:
+ job_log = job._read_log()
+ if job_log:
+ return job_log
+ else:
+ p = _log_path(job_id)
+ if os.path.exists(p):
+ return _read_file(p)
+
+ abort(404)
+
+@app.route('/jobs/<int:job_id>/delete', methods=['DELETE'])
+def delete_job(job_id):
+ job = job_id_job.get(job_id)
+ if not job:
+ abort(404)
+ job.delete()
+ return jsonify({})
+
+@app.route('/jobs/<int:job_id>/cancel', methods=['POST'])
+def cancel_job(job_id):
+ job = job_id_job.get(job_id)
+ if not job:
+ abort(404)
+ job.cancel()
+ return jsonify({})
+
+batch_id_batch = {}
+
+class Batch(object):
+ def __init__(self, attributes):
+ self.attributes = attributes
+ self.id = next_id()
+ batch_id_batch[self.id] = self
+ self.jobs = []
+
+ def delete(self):
+ del batch_id_batch[self.id]
+ for j in self.jobs:
+ assert j.batch_id == self.id
+ j.batch_id = None
+
+ def to_json(self):
+ state_count = Counter([j._state for j in self.jobs])
+ return {
+ 'id': self.id,
+ 'jobs': {
+ 'Created': state_count.get('Created', 0),
+ 'Complete': state_count.get('Complete', 0),
+ 'Cancelled': state_count.get('Cancelled', 0)
+ },
+ 'attributes': self.attributes
+ }
+
+@app.route('/batches/create', methods=['POST'])
+def create_batch():
+ parameters = request.json
+
+ schema = {
+ 'attributes': {
+ 'type': 'dict',
+ 'keyschema': {'type': 'string'},
+ 'valueschema': {'type': 'string'}
+ }
+ }
+ v = cerberus.Validator(schema)
+ if (not v.validate(parameters)):
+ abort(404, 'invalid request: {}'.format(v.errors))
+
+ batch = Batch(parameters.get('attributes'))
+ return jsonify(batch.to_json())
+
+@app.route('/batches/<int:batch_id>', methods=['GET'])
+def get_batch(batch_id):
+ batch = batch_id_batch.get(batch_id)
+ if not batch:
+ abort(404)
+ return jsonify(batch.to_json())
+
+@app.route('/batches/<int:batch_id>/delete', methods=['DELETE'])
+def delete_batch(batch_id):
+ batch = batch_id_batch.get(batch_id)
+ if not batch:
+ abort(404)
+ batch.delete()
+ return jsonify({})
+
+def update_job_with_pod(job, pod):
+ if pod:
+ if pod.status.container_statuses:
+ assert len(pod.status.container_statuses) == 1
+ container_status = pod.status.container_statuses[0]
+ assert container_status.name == 'default'
+
+ if container_status.state and container_status.state.terminated:
+ job.mark_complete(pod)
+ else:
+ job.mark_unscheduled()
+
+@app.route('/pod_changed', methods=['POST'])
+def pod_changed():
+ parameters = request.json
+
+ pod_name = parameters['pod_name']
+
+ job = pod_name_job.get(pod_name)
+ if job and not job.is_complete():
+ try:
+ pod = v1.read_namespaced_pod(pod_name, 'default')
+ except kube.client.rest.ApiException as e:
+ if e.status == 404:
+ pod = None
+ else:
+ raise
+
+ update_job_with_pod(job, pod)
+
+ return '', 204
+
+@app.route('/refresh_k8s_state', methods=['POST'])
+def refresh_k8s_state():
+ log.info('started k8s state refresh')
+
+ pods = v1.list_namespaced_pod(
+ 'default',
+ label_selector=f'app=batch-job,hail.is/batch-instance={instance_id}')
+
+ seen_pods = set()
+ for pod in pods.items:
+ pod_name = pod.metadata.name
+ seen_pods.add(pod_name)
+
+ job = pod_name_job.get(pod_name)
+ if job and not job.is_complete():
+ update_job_with_pod(job, pod)
+
+ for pod_name, job in pod_name_job.items():
+ if pod_name not in seen_pods:
+ update_job_with_pod(job, None)
+
+ log.info('k8s state refresh complete')
+
+ return '', 204
+
+def run_forever(target, *args, **kwargs):
+ # target should be a function
+ target_name = target.__name__
+
+ expected_retry_interval_ms = 15 * 1000 # 15s
+ while True:
+ start = time.time()
+ try:
+ log.info(f'run_forever: run target {target_name}')
+ target(*args, **kwargs)
+ log.info(f'run_forever: target {target_name} returned')
+ except:
+ log.error(f'run_forever: target {target_name} threw exception', exc_info=sys.exc_info())
+ end = time.time()
+
+ run_time_ms = int((end - start) * 1000 + 0.5)
+ t = random.randrange(expected_retry_interval_ms * 2) - run_time_ms
+ if t > 0:
+ log.debug(f'run_forever: {target_name}: sleep {t}ms')
+ time.sleep(t / 1000.0)
+
+def flask_event_loop():
+ app.run(threaded=False, host='0.0.0.0')
+
+def kube_event_loop():
+ w = kube.watch.Watch()
+ stream = w.stream(
+ v1.list_namespaced_pod,
+ 'default',
+ label_selector=f'app=batch-job,hail.is/batch-instance={instance_id}')
+ for event in stream:
+ pod = event['object']
+ name = pod.metadata.name
+ requests.post('http://127.0.0.1:5000/pod_changed', json={'pod_name': name}, timeout=120)
+
+def polling_event_loop():
+ time.sleep(1)
+ while True:
+ try:
+ r = requests.post('http://127.0.0.1:5000/refresh_k8s_state', timeout=120)
+ r.raise_for_status()
+ except requests.HTTPError as e:
+ log.error(f'Could not poll due to exception: {e}, text: {e.response.text}')
+ except Exception as e:
+ log.error(f'Could not poll due to exception: {e}')
+ pass
+ time.sleep(REFRESH_INTERVAL_IN_SECONDS)
+
+kube_thread = threading.Thread(target=run_forever, args=(kube_event_loop,))
+kube_thread.start()
+
+polling_thread = threading.Thread(target=run_forever, args=(polling_event_loop,))
+polling_thread.start()
+
+# debug/reloader must run in main thread
+# see: https://stackoverflow.com/questions/31264826/start-a-flask-application-in-separate-thread
+# flask_thread = threading.Thread(target=flask_event_loop)
+# flask_thread.start()
+run_forever(flask_event_loop)
+
+kube_thread.join()
diff --git a/batch/deployment.yaml.in b/batch/deployment.yaml.in
new file mode 100644
index 00000000000..3ea4aef7004
--- /dev/null
+++ b/batch/deployment.yaml.in
@@ -0,0 +1,36 @@
+apiVersion: apps/v1beta2
+kind: Deployment
+metadata:
+ name: batch-deployment
+ labels:
+ hail.is/sha: @sha@
+spec:
+ selector:
+ matchLabels:
+ app: batch
+ replicas: 1
+ template:
+ metadata:
+ labels:
+ app: batch
+ hail.is/sha: @sha@
+ spec:
+ containers:
+ - name: batch
+ image: @image@
+ ports:
+ - containerPort: 5000
+---
+apiVersion: v1
+kind: Service
+metadata:
+ name: batch
+ labels:
+ app: batch
+spec:
+ ports:
+ - port: 80
+ protocol: TCP
+ targetPort: 5000
+ selector:
+ app: batch
diff --git a/batch/hail-ci-build.sh b/batch/hail-ci-build.sh
new file mode 100644
index 00000000000..367868d33ec
--- /dev/null
+++ b/batch/hail-ci-build.sh
@@ -0,0 +1,11 @@
+#!/bin/bash
+set -ex
+
+source activate hail-batch
+
+# run the server in the background with in-cluster config
+python batch/server.py &
+
+sleep 5
+
+POD_IP='127.0.0.1' BATCH_URL='http://127.0.0.1:5000' python -m unittest test/test_batch.py
diff --git a/batch/hail-ci-deploy.sh b/batch/hail-ci-deploy.sh
new file mode 100644
index 00000000000..512ab98b96a
--- /dev/null
+++ b/batch/hail-ci-deploy.sh
@@ -0,0 +1,36 @@
+set -ex
+
+cd batch
+
+gcloud -q auth activate-service-account \
+ --key-file=/secrets/gcr-push-service-account-key.json
+
+gcloud -q auth configure-docker
+
+SHA=$(git rev-parse --short=12 HEAD)
+
+# get sha label of batch deployment
+DEPLOYED_SHA=$(kubectl get --selector=app=batch deployments -o "jsonpath={.items[*].metadata.labels.hail\.is/sha}")
+
+if [[ $(git cat-file -t "$DEPLOYED_SHA" 2>/dev/null || true) == commit ]]; then
+ if [[ "$SHA" == "$DEPLOYED_SHA" ]]; then
+ exit 0
+ fi
+
+ NEEDS_REDEPLOY=$(cd $ROOT && python3 project-changed.py $DEPLOYED_SHA batch)
+ if [[ $NEEDS_REDEPLOY = no ]]; then
+ exit 0
+ fi
+fi
+
+# requires docker
+make push-batch
+
+sed -e "s,@sha@,$SHA," \
+ -e "s,@image@,$(cat batch-image)," \
+ < deployment.yaml.in > deployment.yaml
+
+kubectl apply -f deployment.yaml
+
+# ci can't recover from batch restart yet
+kubectl delete pods -l app=hail-ci
diff --git a/batch/setup.py b/batch/setup.py
new file mode 100644
index 00000000000..6efa2acbb36
--- /dev/null
+++ b/batch/setup.py
@@ -0,0 +1,11 @@
+from setuptools import setup, find_packages
+
+setup(
+ name = 'batch',
+ version = '0.0.1',
+ url = 'https://github.com/hail-is/batch.git',
+ author = 'Hail Team',
+ author_email = 'hail@broadinstitute.org',
+ description = 'Job manager for k8s',
+ packages = find_packages()
+)
diff --git a/batch/test/__init__.py b/batch/test/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/batch/test/test_batch.py b/batch/test/test_batch.py
new file mode 100644
index 00000000000..2868f24c542
--- /dev/null
+++ b/batch/test/test_batch.py
@@ -0,0 +1,195 @@
+import threading
+import time
+import os
+import unittest
+import batch
+import requests
+from werkzeug.serving import make_server
+from flask import Flask, request, jsonify, url_for, Response
+
+class ServerThread(threading.Thread):
+ def __init__(self, app, host='127.0.0.1', port=5000):
+ super().__init__()
+
+ @app.route('/ping', methods=['GET'])
+ def ping():
+ return Response(status=200)
+
+ self.host = host
+ self.port = port
+ self.app = app
+ self.server = make_server(self.host, self.port, app)
+ self.context = app.app_context()
+ self.context.push()
+
+ def ping(self):
+ ping_url = 'http://{}:{}/ping'.format(self.host, self.port)
+
+ up = False
+ while not up:
+ try:
+ requests.get(ping_url)
+ up = True
+ except requests.exceptions.ConnectionError:
+ time.sleep(0.01)
+
+ def start(self):
+ super().start()
+ self.ping()
+
+ def run(self):
+ self.server.serve_forever()
+
+ def shutdown(self):
+ self.server.shutdown()
+
+class Test(unittest.TestCase):
+ def setUp(self):
+ self.batch = batch.client.BatchClient(
+ url = os.environ.get('BATCH_URL'))
+
+ self.ip = os.environ.get('POD_IP')
+ if not self.ip:
+ self.ip = '127.0.0.1'
+
+ def test_job(self):
+ j = self.batch.create_job('alpine', ['echo', 'test'])
+ status = j.wait()
+ self.assertTrue('attributes' not in status)
+ self.assertEqual(status['state'], 'Complete')
+ self.assertEqual(status['exit_code'], 0)
+
+ self.assertEqual(status['log'], 'test\n')
+ self.assertEqual(j.log(), 'test\n')
+
+ self.assertTrue(j.is_complete())
+
+ def test_attributes(self):
+ a = {
+ 'name': 'test_attributes',
+ 'foo': 'bar'
+ }
+ j = self.batch.create_job(
+ 'alpine', ['true'],
+ attributes = a)
+ status = j.status()
+ assert(status['attributes'] == a)
+
+ def test_fail(self):
+ j = self.batch.create_job('alpine', ['false'])
+ status = j.wait()
+ self.assertEqual(status['exit_code'], 1)
+
+ def test_deleted_job_log(self):
+ j = self.batch.create_job('alpine', ['echo', 'test'])
+ id = j.id
+ j.wait()
+ j.delete()
+ self.assertEqual(self.batch._get_job_log(id), 'test\n')
+
+ def test_delete_job(self):
+ j = self.batch.create_job('alpine', ['sleep', '30'])
+ id = j.id
+ j.delete()
+
+ # verify doesn't exist
+ try:
+ self.batch._get_job(id)
+ except requests.HTTPError as e:
+ if e.response.status_code == 404:
+ pass
+ else:
+ raise
+
+ def test_cancel_job(self):
+ j = self.batch.create_job('alpine', ['sleep', '30'])
+ status = j.status()
+ self.assertTrue(status['state'], 'Created')
+
+ j.cancel()
+
+ status = j.status()
+ self.assertTrue(status['state'], 'Cancelled')
+ self.assertTrue('log' not in status)
+
+ # cancelled job has no log
+ try:
+ j.log()
+ except requests.HTTPError as e:
+ if e.response.status_code == 404:
+ pass
+ else:
+ raise
+
+ def test_get_nonexistent_job(self):
+ try:
+ self.batch._get_job(666)
+ except requests.HTTPError as e:
+ if e.response.status_code == 404:
+ pass
+ else:
+ raise
+
+ def test_api_cancel_nonexistent_job(self):
+ try:
+ self.batch._cancel_job(666)
+ except requests.HTTPError as e:
+ if e.response.status_code == 404:
+ pass
+ else:
+ raise
+
+ def test_get_job(self):
+ j = self.batch.create_job('alpine', ['true'])
+ j2 = self.batch.get_job(j.id)
+ status2 = j2.status()
+ assert(status2['id'] == j.id)
+
+ def test_batch(self):
+ b = self.batch.create_batch()
+ j1 = b.create_job('alpine', ['false'])
+ j2 = b.create_job('alpine', ['sleep', '1'])
+ j3 = b.create_job('alpine', ['sleep', '30'])
+
+ # test list_jobs
+ jobs = self.batch.list_jobs()
+ self.assertTrue(
+ set([j.id for j in jobs]).issuperset([j1.id, j2.id, j3.id]))
+
+ # test refresh_k8s_state
+ self.batch._refresh_k8s_state()
+
+ j2.wait()
+ j3.cancel()
+ bstatus = b.wait()
+
+ n_cancelled = bstatus['jobs']['Cancelled']
+ n_complete = bstatus['jobs']['Complete']
+ self.assertTrue(n_cancelled <= 1)
+ self.assertTrue(n_cancelled + n_complete == 3)
+
+ def test_callback(self):
+ app = Flask('test-client')
+
+ d = {}
+
+ @app.route('/test', methods=['POST'])
+ def test():
+ d['status'] = request.get_json()
+ return Response(status=200)
+
+ port = 5869
+ server = ServerThread(app, host=self.ip, port=port)
+ server.start()
+
+ j = self.batch.create_job('alpine', ['echo', 'test'],
+ attributes={'foo': 'bar'},
+ callback='http://{}:{}/test'.format(self.ip, port))
+ j.wait()
+
+ status = d['status']
+ self.assertEqual(status['state'], 'Complete')
+ self.assertEqual(status['attributes'], {'foo': 'bar'})
+
+ server.shutdown()
+ server.join()
diff --git a/hail-ci-build.sh b/hail-ci-build.sh
index df5d33b239c..2d49ca1aeed 100755
--- a/hail-ci-build.sh
+++ b/hail-ci-build.sh
@@ -1,7 +1,15 @@
#!/bin/bash
set -ex
-HAIL_CHANGED=$(python3 project-changed.py target/$TARGET_BRANCH hail)
-if [[ $HAIL_CHANGED != no ]]; then
- (cd hail && /bin/bash hail-ci-build.sh)
-fi
+PROJECTS='hail batch ci site scorecard cloudtools'
+
+for project in $PROJECTS; do
+ CHANGED=$(python3 project-changed.py target/$TARGET_BRANCH $project)
+ if [[ $CHANGED != no ]]; then
+ if [[ -e $project/hail-ci-build.sh ]]; then
+ (cd $project && /bin/bash hail-ci-build.sh)
+ fi
+ fi
+done
+
+
diff --git a/hail-ci-deploy.sh b/hail-ci-deploy.sh
index 1a3cb5ebb89..846fb1126e6 100644
--- a/hail-ci-deploy.sh
+++ b/hail-ci-deploy.sh
@@ -2,3 +2,4 @@
set -ex
(cd hail && /bin/bash hail-ci-deploy.sh)
+(cd batch && /bin/bash hail-ci-deploy.sh)
diff --git a/project-changed.py b/project-changed.py
index 2552f382207..6823832c785 100644
--- a/project-changed.py
+++ b/project-changed.py
@@ -14,7 +14,8 @@
'batch': 'batch/',
'ci': 'ci/',
'site': 'site/',
- 'scorecard': 'scorecard/'
+ 'scorecard': 'scorecard/',
+ 'cloudtools': 'cloudtools/'
}
orig_hash = sys.argv[1]