diff --git a/README.md b/README.md index 8ea872428fc..12407210dc9 100644 --- a/README.md +++ b/README.md @@ -91,4 +91,4 @@ open-source by providing free hosting, and YourKit, LLC for generously providing free licenses for YourKit Java Profiler for open-source development. - \ No newline at end of file + diff --git a/batch/.dockerignore b/batch/.dockerignore new file mode 100644 index 00000000000..fc9e6ee0cfc --- /dev/null +++ b/batch/.dockerignore @@ -0,0 +1,5 @@ +batch.log +logs +*~ +*.pyc +**/__pycache__ diff --git a/batch/.gitignore b/batch/.gitignore new file mode 100644 index 00000000000..fc9e6ee0cfc --- /dev/null +++ b/batch/.gitignore @@ -0,0 +1,5 @@ +batch.log +logs +*~ +*.pyc +**/__pycache__ diff --git a/batch/Dockerfile b/batch/Dockerfile new file mode 100644 index 00000000000..ee230fa0f7c --- /dev/null +++ b/batch/Dockerfile @@ -0,0 +1,14 @@ +FROM alpine:3.8 + +RUN apk update +RUN apk add python3 py3-cffi py3-cryptography +RUN pip3 install -U pip +RUN pip install flask +RUN pip install kubernetes +RUN pip install cerberus + +COPY batch /batch + +EXPOSE 5000 + +CMD ["python3", "/batch/server.py"] diff --git a/batch/Dockerfile.test b/batch/Dockerfile.test new file mode 100644 index 00000000000..75248202684 --- /dev/null +++ b/batch/Dockerfile.test @@ -0,0 +1,13 @@ +FROM alpine:3.8 + +RUN apk update +RUN apk add python3 # python3=3.6.4-r1 +RUN pip3 install -U pip +RUN pip install flask +RUN pip install kubernetes +RUN pip install cerberus + +COPY batch /batch +COPY test /test + +CMD ["python3", "-m", "unittest", "/test/test_batch.py"] diff --git a/batch/Makefile b/batch/Makefile new file mode 100644 index 00000000000..eafa6bbbfc5 --- /dev/null +++ b/batch/Makefile @@ -0,0 +1,40 @@ +.PHONY: hail-ci-build-image push-hail-ci-build-image + +hail-ci-build-image: + docker build -t batch-pr-builder -f Dockerfile.pr-builder . 
+ echo "gcr.io/broad-ctsa/batch-pr-builder:`docker images -q --no-trunc batch-pr-builder | sed -e 's,[^:]*:,,'`" > ../hail-ci-build-image + docker tag batch-pr-builder `cat ../hail-ci-build-image` + +push-hail-ci-build-image: hail-ci-build-image + docker push `cat ../hail-ci-build-image` + +build: build-batch build-batch-test + +build-batch: + docker build -t batch . + +build-batch-test: + docker build -t batch-test -f Dockerfile.test . + +push: push-batch push-batch-test + +push-batch: IMAGE="gcr.io/broad-ctsa/batch:$(shell docker images -q --no-trunc batch | sed -e 's,[^:]*:,,')" +push-batch: build-batch + echo $(IMAGE) > batch-image + docker tag batch $(IMAGE) + docker push $(IMAGE) + +push-batch-test: IMAGE="gcr.io/broad-ctsa/batch-test:$(shell docker images -q --no-trunc batch-test | sed -e 's,[^:]*:,,')" +push-batch-test: build-batch-test + echo $(IMAGE) > batch-test-image + docker tag batch $(IMAGE) + docker push $(IMAGE) + +run-docker: + docker run -e BATCH_USE_KUBE_CONFIG=1 -i -v $(HOME)/.kube:/root/.kube -p 5000:5000 -t batch + +run: + BATCH_USE_KUBE_CONFIG=1 python batch/server.py + +test-local: + POD_IP='127.0.0.1' BATCH_URL='http://127.0.0.1:5000' python -m unittest -v test/test_batch.py diff --git a/batch/README.md b/batch/README.md new file mode 100644 index 00000000000..5877a69567e --- /dev/null +++ b/batch/README.md @@ -0,0 +1,196 @@ +Getting Started +--- + +Start a `minikube` k8s cluster and configure your `kubectl` to point at that k8s +cluster: + +``` +minikube start +``` + +If you get a weird minikube error, try + +``` +minikube delete +rm -rf ~/.minikube +brew cask reinstall minikube # or equivalent on your OS +minikube start +``` + +When you want to return to using a google k8s cluster, you can run this: + +``` +gcloud container clusters get-credentials CLUSTER_NAME +``` + +Set some environment variables so that docker images are placed in the +`minikube` cluster's docker registry: + +``` +eval $(minikube docker-env) +``` + +Build the batch 
and test image + +``` +make build-batch build-test +``` + +edit the `deployment.yaml` so that the container named `batch` has +`imagePullPolicy: Never`. This ensures that k8s does not go look for the image +in the Google Container Registry and instead uses the local image cache (which +you just updated when you ran `make build-batch build-test`). + +Give way too many privileges to the default service account so that `batch` can +start new pods: + +``` +kubectl create clusterrolebinding \ + cluster-admin-default \ + --clusterrole cluster-admin \ + --serviceaccount=default:default +``` + +Create a batch service: + +``` +kubectl create -f deployment.yaml +``` + +If you ever need to shutdown the service, execute: + +``` +kubectl delete -f deployment.yaml +``` + +Look for the newly created batch pod: + +``` +kubectl get pods +``` + +And create a port forward from the k8s cluster to your local machine (this works +for clusters in GKE too): + +``` +kubectl port-forward POD_NAME 5000:5000 +``` + +The former port is the local one and the latter port is the remote one (i.e. in +the k8s pod). 
Now you can load the conda environment for testing and run the +tests against this deployment: + +``` +conda env create -f environment.yaml +conda activate hail-batch +make test-local +``` + + + +--- + +Kubernetes [Python client](https://github.com/kubernetes-client/python/blob/master/kubernetes/README.md) + - [V1Pod](https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1Pod.md) + - [create_namespaced_pod](https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/CoreV1Api.md#create_namespaced_pod) + - [delete_namespaced_pod](https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/CoreV1Api.md#delete_namespaced_pod) + - + +To get kubectl credentials for a GKE cluster: + +``` +$ gcloud container clusters get-credentials +``` + +To authorize docker to push to GCR: + +``` +$ gcloud auth configure-docker +``` + +To run batch locally, using the local kube credentials: + +``` +$ docker run -i -v $HOME/.kube:/root/.kube -p 5000:5000 -t batch +``` + +On OSX, the port will be accessible on the docker-machine: + +``` +$(docker-machine ip default):5000 +``` + +Get a shell in a running pod: + +``` +$ kubectl exec -it -- /bin/sh +``` + +Hit a Flask REST endpoint with Curl: + +``` +$ curl -X POST -H "Content-Type: application/json" -d +$ curl -X POST -H "Content-Type: application/json" -d '{"name": "batchtest", "image": "gcr.io/broad-ctsa/true"}' batch/jobs/create +``` + +Give default:default serviceaccount cluster-admin privileges: + +``` +$ kubectl create clusterrolebinding cluster-admin-default --clusterrole cluster-admin --serviceaccount=default:default +``` + +Run an image in a new pod: + +``` +$ kubectl run --restart=Never --image -- +``` + +For example, run a shell in an new pod: + +``` +$ kubectl run -i --tty apline --image=alpine --restart=Never -- sh +``` + +Forward from a local port to a port on pod: + +``` +$ kubectl port-forward jupyter-deployment-5f54cff675-msr85 8888:8888 # : +``` + +Run container with a given 
hostname: + +$ docker run -d --rm --name spark-m -h spark-m -p 8080:8080 -p 7077:7077 spark-m + +List all containers, included stopped containers: + +$ docker ps -a + +Remove all stopped containers: + +$ docker ps -aq --no-trunc -f status=exited | xargs docker rm + +Run a docker container linked to another: + +$ docker run -d --rm --cpus 0.5 --name spark-w-0 --link spark-m spark-w -c 1 -m 2g + +Get IP of container: + +$ docker inspect | grep IPAddress + +--- + +The following will set some environment variables so that future invocations of +`docker build` will make images available to the minikube cluster. This allows +you to test images without pushing them to a remote container registry. + +``` +eval $(minikube docker-env) +make build-batch build-test +``` + +NB: you must also set the `imagePullPolicy` of any `container` you `kubectl +create` to `Never` if you're using the `:latest` image tag (which is implicitly +used if no tag is specified on the image name). Otherwise, k8s will always try +to check if there is a newer version of the image. Even if `imagePullPolicy` +is set to `NotIfPresent`, k8s will still check for a newer image if you use the +`:latest` tag. 
# NOTE(review): this region of the whitespace-collapsed diff adds four new
# Python files. They are reconstructed below with conventional formatting;
# file boundaries are marked and concrete fixes are tagged "FIX:".

# --- batch/batch/__init__.py ---
import batch.client
import batch.api

# --- batch/batch/api.py ---
# Thin HTTP wrappers around the batch server's REST endpoints.
import json
import time
import random
import requests


def create_job(url, spec, attributes, batch_id, callback):
    """POST a new job; spec is a k8s V1PodSpec as a JSON-able dict."""
    doc = {'spec': spec}
    if attributes:
        doc['attributes'] = attributes
    if batch_id:
        doc['batch_id'] = batch_id
    if callback:
        doc['callback'] = callback
    r = requests.post(url + '/jobs/create', json=doc)
    r.raise_for_status()
    return r.json()


def list_jobs(url):
    r = requests.get(url + '/jobs')
    r.raise_for_status()
    return r.json()


def get_job(url, job_id):
    r = requests.get(url + '/jobs/{}'.format(job_id))
    r.raise_for_status()
    return r.json()


def get_job_log(url, job_id):
    # returns raw text, not JSON: the log is an arbitrary string
    r = requests.get(url + '/jobs/{}/log'.format(job_id))
    r.raise_for_status()
    return r.text


def delete_job(url, job_id):
    r = requests.delete(url + '/jobs/{}/delete'.format(job_id))
    r.raise_for_status()
    return r.json()


def cancel_job(url, job_id):
    r = requests.post(url + '/jobs/{}/cancel'.format(job_id))
    r.raise_for_status()
    return r.json()


def create_batch(url, attributes):
    doc = {}
    if attributes:
        doc['attributes'] = attributes
    r = requests.post(url + '/batches/create', json=doc)
    r.raise_for_status()
    return r.json()


def get_batch(url, batch_id):
    r = requests.get(url + '/batches/{}'.format(batch_id))
    r.raise_for_status()
    return r.json()


def delete_batch(url, batch_id):
    r = requests.delete(url + '/batches/{}'.format(batch_id))
    r.raise_for_status()
    return r.json()


def refresh_k8s_state(url):
    r = requests.post(url + '/refresh_k8s_state')
    r.raise_for_status()

# --- batch/batch/client.py ---
# Object-oriented client over batch.api.
import json
import time
import random
import requests
import batch.api as api


class Job(object):
    def __init__(self, client, id, attributes=None, _status=None):
        if attributes is None:
            attributes = {}
        self.client = client
        self.id = id
        self.attributes = attributes
        self._status = _status

    def is_complete(self):
        """True iff the last-seen status is terminal (Complete/Cancelled)."""
        if self._status:
            state = self._status['state']
            if state == 'Complete' or state == 'Cancelled':
                return True
        return False

    def cached_status(self):
        # FIX: `!= None` -> `is not None` (identity comparison for None)
        assert self._status is not None
        return self._status

    def status(self):
        """Refresh and return the job status from the server."""
        self._status = self.client._get_job(self.id)
        return self._status

    def wait(self):
        """Poll with randomized exponential backoff until terminal."""
        i = 0
        while True:
            self.status()  # update
            if self.is_complete():
                return self._status
            j = random.randrange(2 ** i)
            time.sleep(0.100 * j)
            # max 5.12s
            if i < 9:
                i = i + 1

    def cancel(self):
        self.client._cancel_job(self.id)

    def delete(self):
        self.client._delete_job(self.id)
        # invalidate: this object must not be reused after deletion
        self.id = None
        self.attributes = None
        self._status = None

    def log(self):
        return self.client._get_job_log(self.id)


class Batch(object):
    def __init__(self, client, id):
        self.client = client
        self.id = id

    def create_job(self, image, command=None, args=None, env=None, ports=None,
                   resources=None, tolerations=None, volumes=None,
                   attributes=None, callback=None):
        return self.client._create_job(
            image, command, args, env, ports, resources, tolerations,
            volumes, attributes, self.id, callback)

    def status(self):
        return self.client._get_batch(self.id)

    def wait(self):
        """Poll with randomized exponential backoff until no job is Created."""
        i = 0
        while True:
            status = self.status()
            if status['jobs']['Created'] == 0:
                return status
            j = random.randrange(2 ** i)
            time.sleep(0.100 * j)
            # max 5.12s
            if i < 9:
                i = i + 1


class BatchClient(object):
    def __init__(self, url=None):
        if not url:
            url = 'http://batch'
        self.url = url

    def _create_job(self, image, command, args, env, ports, resources,
                    tolerations, volumes, attributes, batch_id, callback):
        # normalize env to k8s EnvVar dicts, then add downward-API vars
        if env:
            env = [{'name': k, 'value': v} for (k, v) in env.items()]
        else:
            env = []
        env.extend([{
            'name': 'POD_IP',
            'valueFrom': {
                'fieldRef': {'fieldPath': 'status.podIP'}
            }
        }, {
            'name': 'POD_NAME',
            'valueFrom': {
                'fieldRef': {'fieldPath': 'metadata.name'}
            }
        }])

        container = {
            'image': image,
            'name': 'default'
        }
        if command:
            container['command'] = command
        if args:
            container['args'] = args
        if env:
            container['env'] = env
        if ports:
            container['ports'] = [{
                'containerPort': p,
                'protocol': 'TCP'
            } for p in ports]
        if resources:
            container['resources'] = resources
        if volumes:
            container['volumeMounts'] = [v['volume_mount'] for v in volumes]
        spec = {
            'containers': [container],
            'restartPolicy': 'Never'
        }
        if volumes:
            spec['volumes'] = [v['volume'] for v in volumes]
        if tolerations:
            spec['tolerations'] = tolerations

        j = api.create_job(self.url, spec, attributes, batch_id, callback)
        return Job(self, j['id'], j.get('attributes'))

    def _get_job(self, id):
        return api.get_job(self.url, id)

    def _get_job_log(self, id):
        return api.get_job_log(self.url, id)

    def _delete_job(self, id):
        api.delete_job(self.url, id)

    def _cancel_job(self, id):
        api.cancel_job(self.url, id)

    def _get_batch(self, batch_id):
        return api.get_batch(self.url, batch_id)

    def _refresh_k8s_state(self):
        api.refresh_k8s_state(self.url)

    def list_jobs(self):
        jobs = api.list_jobs(self.url)
        return [Job(self, j['id'], j.get('attributes'), j) for j in jobs]

    def get_job(self, id):
        # make sure job exists
        j = api.get_job(self.url, id)
        return Job(self, j['id'], j.get('attributes'), j)

    def create_job(self,
                   image,
                   command=None,
                   args=None,
                   env=None,
                   ports=None,
                   resources=None,
                   tolerations=None,
                   volumes=None,
                   attributes=None,
                   callback=None):
        return self._create_job(
            image, command, args, env, ports, resources, tolerations,
            volumes, attributes, None, callback)

    def create_batch(self, attributes=None):
        b = api.create_batch(self.url, attributes)
        return Batch(self, b['id'])

# --- batch/batch/server.py ---
# Flask service that schedules jobs as k8s pods in namespace 'default'.
import sys
import os
import time
import random
import uuid
from collections import Counter
import logging
import threading
from flask import Flask, request, jsonify, abort, url_for
import kubernetes as kube
import cerberus
import requests

if not os.path.exists('logs'):
    os.mkdir('logs')
else:
    if not os.path.isdir('logs'):
        raise OSError('logs exists but is not a directory')

fmt = logging.Formatter(
    # NB: no space after levelname because WARNING is so long
    '%(levelname)s\t| %(asctime)s \t| %(filename)s \t| %(funcName)s:%(lineno)d | '
    '%(message)s')

fh = logging.FileHandler('batch.log')
fh.setLevel(logging.INFO)
fh.setFormatter(fmt)

ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
ch.setFormatter(fmt)

log = logging.getLogger('batch')
log.setLevel(logging.INFO)

logging.basicConfig(
    handlers=[fh, ch],
    level=logging.INFO)

REFRESH_INTERVAL_IN_SECONDS = int(
    os.environ.get('REFRESH_INTERVAL_IN_SECONDS', 5 * 60))

log.info(f'REFRESH_INTERVAL_IN_SECONDS {REFRESH_INTERVAL_IN_SECONDS}')

if 'BATCH_USE_KUBE_CONFIG' in os.environ:
    kube.config.load_kube_config()
else:
    kube.config.load_incluster_config()
v1 = kube.client.CoreV1Api()

# unique per server process; used to label (and later find) our pods
instance_id = uuid.uuid4().hex
log.info(f'instance_id = {instance_id}')

counter = 0


def next_id():
    """Monotonically increasing id shared by jobs and batches."""
    global counter
    counter = counter + 1
    return counter


pod_name_job = {}
job_id_job = {}
batch_id_batch = {}


def _log_path(id):
    return f'logs/job-{id}.log'


def _read_file(p):
    with open(p, 'r') as f:
        return f.read()


class Job(object):
    def _create_pod(self):
        assert not self._pod_name

        pod = v1.create_namespaced_pod('default', self.pod_template)
        self._pod_name = pod.metadata.name
        pod_name_job[self._pod_name] = self

        log.info('created pod name: {} for job {}'.format(
            self._pod_name, self.id))

    def _delete_pod(self):
        if self._pod_name:
            try:
                v1.delete_namespaced_pod(
                    self._pod_name, 'default', kube.client.V1DeleteOptions())
            except kube.client.rest.ApiException as e:
                if e.status == 404:
                    pass  # already gone
                else:
                    raise
            del pod_name_job[self._pod_name]
            self._pod_name = None

    def _read_log(self):
        """Best-effort log fetch; None if Cancelled or not yet available."""
        if self._state == 'Created':
            if self._pod_name:
                # FIX: bare `except:` swallowed everything (incl.
                # KeyboardInterrupt); narrow to the k8s API error raised
                # while the pod has no log yet
                try:
                    return v1.read_namespaced_pod_log(self._pod_name, 'default')
                except kube.client.rest.ApiException:
                    pass
        elif self._state == 'Complete':
            p = _log_path(self.id)
            return _read_file(p)
        else:
            assert self._state == 'Cancelled'
            return None

    def __init__(self, pod_spec, batch_id, attributes, callback):
        self.id = next_id()
        job_id_job[self.id] = self

        self.batch_id = batch_id
        if batch_id:
            batch = batch_id_batch[batch_id]
            batch.jobs.append(self)

        self.attributes = attributes
        self.callback = callback

        self.pod_template = kube.client.V1Pod(
            metadata=kube.client.V1ObjectMeta(
                generate_name='job-{}-'.format(self.id),
                labels={
                    'app': 'batch-job',
                    'hail.is/batch-instance': instance_id
                }),
            spec=pod_spec)

        self._pod_name = None

        self._state = 'Created'
        log.info('created job {}'.format(self.id))

        self._create_pod()

    def set_state(self, new_state):
        if self._state != new_state:
            log.info('job {} changed state: {} -> {}'.format(
                self.id,
                self._state,
                new_state))
            self._state = new_state

    def cancel(self):
        if self.is_complete():
            return
        self._delete_pod()
        self.set_state('Cancelled')

    def delete(self):
        # remove from structures
        del job_id_job[self.id]
        if self.batch_id:
            # FIX: referenced undefined `batch_id` (NameError) and called
            # Batch.remove which does not exist; detach from the batch's
            # job list instead
            batch = batch_id_batch[self.batch_id]
            batch.jobs.remove(self)

        self._delete_pod()

    def is_complete(self):
        return self._state == 'Complete' or self._state == 'Cancelled'

    def mark_unscheduled(self):
        """Pod vanished before terminating: forget it and create a new one."""
        if self._pod_name:
            del pod_name_job[self._pod_name]
            self._pod_name = None
        self._create_pod()

    def mark_complete(self, pod):
        self.exit_code = (
            pod.status.container_statuses[0].state.terminated.exit_code)

        # persist the log before the pod is garbage-collected
        pod_log = v1.read_namespaced_pod_log(pod.metadata.name, 'default')
        p = _log_path(self.id)
        with open(p, 'w') as f:
            f.write(pod_log)
        log.info(f'wrote log for job {self.id} to {p}')

        if self._pod_name:
            del pod_name_job[self._pod_name]
            self._pod_name = None

        self.set_state('Complete')

        log.info('job {} complete, exit_code {}'.format(
            self.id, self.exit_code))

        if self.callback:
            # fire-and-forget callback; never retried
            def f(id, callback, json):
                try:
                    requests.post(callback, json=json, timeout=120)
                except requests.exceptions.RequestException as re:
                    # FIX: log.warn is a deprecated alias of log.warning
                    log.warning(
                        f'callback for job {id} failed due to an error, '
                        f'I will not retry. Error: {re}')

            threading.Thread(
                target=f, args=(self.id, self.callback, self.to_json())).start()

    def to_json(self):
        result = {
            'id': self.id,
            'state': self._state
        }
        if self._state == 'Complete':
            result['exit_code'] = self.exit_code
        pod_log = self._read_log()
        if pod_log:
            result['log'] = pod_log
        if self.attributes:
            result['attributes'] = self.attributes
        return result


app = Flask('batch')


# FIX (all routes below): the `<int:job_id>` / `<int:batch_id>` URL
# converters were lost in the collapsed diff text; without them Flask
# cannot bind the view arguments. Reconstructed.
@app.route('/jobs/create', methods=['POST'])
def create_job():
    parameters = request.json

    schema = {
        # will be validated when creating pod
        'spec': {'type': 'dict',
                 'required': True,
                 'allow_unknown': True,
                 'schema': {}},
        'batch_id': {'type': 'integer'},
        'attributes': {
            'type': 'dict',
            'keyschema': {'type': 'string'},
            'valueschema': {'type': 'string'}
        },
        'callback': {'type': 'string'}
    }
    validator = cerberus.Validator(schema)
    if not validator.validate(parameters):
        abort(404, 'invalid request: {}'.format(validator.errors))

    # NOTE(review): relies on the kubernetes client's private deserializer;
    # there is no public equivalent in this client version
    pod_spec = v1.api_client._ApiClient__deserialize(
        parameters['spec'], kube.client.V1PodSpec)

    batch_id = parameters.get('batch_id')
    if batch_id:
        if batch_id not in batch_id_batch:
            # FIX: message said 'valid request'; it reports an INVALID request
            abort(404, 'invalid request: batch_id {} not found'.format(batch_id))

    job = Job(
        pod_spec, batch_id, parameters.get('attributes'),
        parameters.get('callback'))
    return jsonify(job.to_json())


@app.route('/jobs', methods=['GET'])
def get_job_list():
    return jsonify([job.to_json() for job in job_id_job.values()])


@app.route('/jobs/<int:job_id>', methods=['GET'])
def get_job(job_id):
    job = job_id_job.get(job_id)
    if not job:
        abort(404)
    return jsonify(job.to_json())


@app.route('/jobs/<int:job_id>/log', methods=['GET'])
def get_job_log(job_id):
    if job_id > counter:
        abort(404)

    job = job_id_job.get(job_id)
    if job:
        job_log = job._read_log()
        if job_log:
            return job_log
    else:
        # deleted jobs may still have a log file on disk
        p = _log_path(job_id)
        if os.path.exists(p):
            return _read_file(p)

    abort(404)


@app.route('/jobs/<int:job_id>/delete', methods=['DELETE'])
def delete_job(job_id):
    job = job_id_job.get(job_id)
    if not job:
        abort(404)
    job.delete()
    return jsonify({})


@app.route('/jobs/<int:job_id>/cancel', methods=['POST'])
def cancel_job(job_id):
    job = job_id_job.get(job_id)
    if not job:
        abort(404)
    job.cancel()
    return jsonify({})


class Batch(object):
    def __init__(self, attributes):
        self.attributes = attributes
        self.id = next_id()
        batch_id_batch[self.id] = self
        self.jobs = []

    def delete(self):
        del batch_id_batch[self.id]
        for j in self.jobs:
            assert j.batch_id == self.id
            j.batch_id = None

    def to_json(self):
        state_count = Counter([j._state for j in self.jobs])
        return {
            'id': self.id,
            'jobs': {
                'Created': state_count.get('Created', 0),
                'Complete': state_count.get('Complete', 0),
                'Cancelled': state_count.get('Cancelled', 0)
            },
            'attributes': self.attributes
        }


@app.route('/batches/create', methods=['POST'])
def create_batch():
    parameters = request.json

    schema = {
        'attributes': {
            'type': 'dict',
            'keyschema': {'type': 'string'},
            'valueschema': {'type': 'string'}
        }
    }
    validator = cerberus.Validator(schema)
    if not validator.validate(parameters):
        abort(404, 'invalid request: {}'.format(validator.errors))

    batch = Batch(parameters.get('attributes'))
    return jsonify(batch.to_json())


@app.route('/batches/<int:batch_id>', methods=['GET'])
def get_batch(batch_id):
    batch = batch_id_batch.get(batch_id)
    if not batch:
        abort(404)
    return jsonify(batch.to_json())


@app.route('/batches/<int:batch_id>/delete', methods=['DELETE'])
def delete_batch(batch_id):
    batch = batch_id_batch.get(batch_id)
    if not batch:
        abort(404)
    batch.delete()
    return jsonify({})


def update_job_with_pod(job, pod):
    # pod is None when it disappeared without terminating -> reschedule.
    # NOTE(review): the collapsed source makes the if/else nesting ambiguous;
    # pairing the else with `if pod:` is the only reading under which
    # mark_unscheduled (recreate pod) makes sense — confirm against upstream.
    if pod:
        if pod.status.container_statuses:
            assert len(pod.status.container_statuses) == 1
            container_status = pod.status.container_statuses[0]
            assert container_status.name == 'default'

            if container_status.state and container_status.state.terminated:
                job.mark_complete(pod)
    else:
        job.mark_unscheduled()


@app.route('/pod_changed', methods=['POST'])
def pod_changed():
    parameters = request.json

    pod_name = parameters['pod_name']

    job = pod_name_job.get(pod_name)
    if job and not job.is_complete():
        try:
            pod = v1.read_namespaced_pod(pod_name, 'default')
        except kube.client.rest.ApiException as e:
            if e.status == 404:
                pod = None
            else:
                raise

        update_job_with_pod(job, pod)

    return '', 204


@app.route('/refresh_k8s_state', methods=['POST'])
def refresh_k8s_state():
    log.info('started k8s state refresh')

    pods = v1.list_namespaced_pod(
        'default',
        label_selector=f'app=batch-job,hail.is/batch-instance={instance_id}')

    seen_pods = set()
    for pod in pods.items:
        pod_name = pod.metadata.name
        seen_pods.add(pod_name)

        job = pod_name_job.get(pod_name)
        if job and not job.is_complete():
            update_job_with_pod(job, pod)

    # FIX: update_job_with_pod mutates pod_name_job (mark_unscheduled /
    # mark_complete delete and re-add entries), so iterate over a snapshot
    # to avoid "dictionary changed size during iteration"
    for pod_name, job in list(pod_name_job.items()):
        if pod_name not in seen_pods:
            update_job_with_pod(job, None)

    log.info('k8s state refresh complete')

    return '', 204


def run_forever(target, *args, **kwargs):
    """Run target in a loop, logging exceptions, with randomized backoff."""
    # target should be a function
    target_name = target.__name__

    expected_retry_interval_ms = 15 * 1000  # 15s
    while True:
        start = time.time()
        try:
            log.info(f'run_forever: run target {target_name}')
            target(*args, **kwargs)
            log.info(f'run_forever: target {target_name} returned')
        except Exception:
            log.error(f'run_forever: target {target_name} threw exception',
                      exc_info=sys.exc_info())
        end = time.time()

        run_time_ms = int((end - start) * 1000 + 0.5)
        t = random.randrange(expected_retry_interval_ms * 2) - run_time_ms
        if t > 0:
            log.debug(f'run_forever: {target_name}: sleep {t}ms')
            time.sleep(t / 1000.0)


def flask_event_loop():
    app.run(threaded=False, host='0.0.0.0')


def kube_event_loop():
    # forward k8s watch events to our own /pod_changed endpoint
    w = kube.watch.Watch()
    stream = w.stream(
        v1.list_namespaced_pod,
        'default',
        label_selector=f'app=batch-job,hail.is/batch-instance={instance_id}')
    for event in stream:
        pod = event['object']
        name = pod.metadata.name
        requests.post(
            'http://127.0.0.1:5000/pod_changed',
            json={'pod_name': name}, timeout=120)


def polling_event_loop():
    # periodic full reconciliation in case watch events were missed
    time.sleep(1)
    while True:
        try:
            r = requests.post(
                'http://127.0.0.1:5000/refresh_k8s_state', timeout=120)
            r.raise_for_status()
        except requests.HTTPError as e:
            log.error(f'Could not poll due to exception: {e}, '
                      f'text: {e.response.text}')
        except Exception as e:
            log.error(f'Could not poll due to exception: {e}')
        time.sleep(REFRESH_INTERVAL_IN_SECONDS)


kube_thread = threading.Thread(target=run_forever, args=(kube_event_loop,))
kube_thread.start()

polling_thread = threading.Thread(
    target=run_forever, args=(polling_event_loop,))
polling_thread.start()

# debug/reloader must run in main thread
# see: https://stackoverflow.com/questions/31264826/start-a-flask-application-in-separate-thread
# flask_thread = threading.Thread(target=flask_event_loop)
# flask_thread.start()
run_forever(flask_event_loop)

kube_thread.join()
b/batch/deployment.yaml.in new file mode 100644 index 00000000000..3ea4aef7004 --- /dev/null +++ b/batch/deployment.yaml.in @@ -0,0 +1,36 @@ +apiVersion: apps/v1beta2 +kind: Deployment +metadata: + name: batch-deployment + labels: + hail.is/sha: @sha@ +spec: + selector: + matchLabels: + app: batch + replicas: 1 + template: + metadata: + labels: + app: batch + hail.is/sha: @sha@ + spec: + containers: + - name: batch + image: @image@ + ports: + - containerPort: 5000 +--- +apiVersion: v1 +kind: Service +metadata: + name: batch + labels: + app: batch +spec: + ports: + - port: 80 + protocol: TCP + targetPort: 5000 + selector: + app: batch diff --git a/batch/hail-ci-build.sh b/batch/hail-ci-build.sh new file mode 100644 index 00000000000..367868d33ec --- /dev/null +++ b/batch/hail-ci-build.sh @@ -0,0 +1,11 @@ +#!/bin/bash +set -ex + +source activate hail-batch + +# run the server in the background with in-cluster config +python batch/server.py & + +sleep 5 + +POD_IP='127.0.0.1' BATCH_URL='http://127.0.0.1:5000' python -m unittest test/test_batch.py diff --git a/batch/hail-ci-deploy.sh b/batch/hail-ci-deploy.sh new file mode 100644 index 00000000000..512ab98b96a --- /dev/null +++ b/batch/hail-ci-deploy.sh @@ -0,0 +1,36 @@ +set -ex + +cd batch + +gcloud -q auth activate-service-account \ + --key-file=/secrets/gcr-push-service-account-key.json + +gcloud -q auth configure-docker + +SHA=$(git rev-parse --short=12 HEAD) + +# get sha label of batch deployment +DEPLOYED_SHA=$(kubectl get --selector=app=batch deployments -o "jsonpath={.items[*].metadata.labels.hail\.is/sha}") + +if [[ $(git cat-file -t "$DEPLOYED_SHA" 2>/dev/null || true) == commit ]]; then + if [[ "$SHA" == "$DEPLOYED_SHA" ]]; then + exit 0 + fi + + NEEDS_REDEPLOY=$(cd $ROOT && python3 project-changed.py $DEPLOYED_SHA batch) + if [[ $NEEDS_REDEPLOY = no ]]; then + exit 0 + fi +fi + +# requires docker +make push-batch + +sed -e "s,@sha@,$SHA," \ + -e "s,@image@,$(cat batch-image)," \ + < deployment.yaml.in > 
# NOTE(review): these lines of the collapsed diff also carry the tail of
# batch/hail-ci-deploy.sh (kubectl apply -f deployment.yaml; kubectl delete
# pods -l app=hail-ci) and the hail-ci-build.sh / project-changed.py hunks;
# only the Python files (batch/setup.py, batch/test/test_batch.py) are
# reconstructed here, with fixes tagged "FIX:".

# --- batch/setup.py ---
from setuptools import setup, find_packages

setup(
    name='batch',
    version='0.0.1',
    url='https://github.com/hail-is/batch.git',
    author='Hail Team',
    author_email='hail@broadinstitute.org',
    description='Job manager for k8s',
    packages=find_packages()
)

# --- batch/test/test_batch.py ---
import threading
import time
import os
import unittest
import batch
import requests
from werkzeug.serving import make_server
from flask import Flask, request, jsonify, url_for, Response


class ServerThread(threading.Thread):
    """Run a Flask app on a background thread; adds a /ping health route."""

    def __init__(self, app, host='127.0.0.1', port=5000):
        super().__init__()

        @app.route('/ping', methods=['GET'])
        def ping():
            return Response(status=200)

        self.host = host
        self.port = port
        self.app = app
        self.server = make_server(self.host, self.port, app)
        self.context = app.app_context()
        self.context.push()

    def ping(self):
        # block until the server answers, so tests can't race startup
        ping_url = 'http://{}:{}/ping'.format(self.host, self.port)

        up = False
        while not up:
            try:
                requests.get(ping_url)
                up = True
            except requests.exceptions.ConnectionError:
                time.sleep(0.01)

    def start(self):
        super().start()
        self.ping()

    def run(self):
        self.server.serve_forever()

    def shutdown(self):
        self.server.shutdown()


class Test(unittest.TestCase):
    def setUp(self):
        self.batch = batch.client.BatchClient(
            url=os.environ.get('BATCH_URL'))
        # POD_IP may be unset or empty; fall back to loopback either way
        self.ip = os.environ.get('POD_IP') or '127.0.0.1'

    def test_job(self):
        j = self.batch.create_job('alpine', ['echo', 'test'])
        status = j.wait()
        self.assertTrue('attributes' not in status)
        self.assertEqual(status['state'], 'Complete')
        self.assertEqual(status['exit_code'], 0)

        self.assertEqual(status['log'], 'test\n')
        self.assertEqual(j.log(), 'test\n')

        self.assertTrue(j.is_complete())

    def test_attributes(self):
        a = {
            'name': 'test_attributes',
            'foo': 'bar'
        }
        j = self.batch.create_job(
            'alpine', ['true'],
            attributes=a)
        status = j.status()
        # FIX: bare assert gives no diff on failure; use assertEqual
        self.assertEqual(status['attributes'], a)

    def test_fail(self):
        j = self.batch.create_job('alpine', ['false'])
        status = j.wait()
        self.assertEqual(status['exit_code'], 1)

    def test_deleted_job_log(self):
        j = self.batch.create_job('alpine', ['echo', 'test'])
        id = j.id
        j.wait()
        j.delete()
        self.assertEqual(self.batch._get_job_log(id), 'test\n')

    def test_delete_job(self):
        j = self.batch.create_job('alpine', ['sleep', '30'])
        id = j.id
        j.delete()

        # verify doesn't exist
        try:
            self.batch._get_job(id)
        except requests.HTTPError as e:
            if e.response.status_code == 404:
                pass
            else:
                raise

    def test_cancel_job(self):
        j = self.batch.create_job('alpine', ['sleep', '30'])
        status = j.status()
        # FIX: assertTrue(x, msg) never compared x to 'Created' — the second
        # argument of assertTrue is the failure message, so the original
        # assertions were vacuous; assertEqual performs the real check
        self.assertEqual(status['state'], 'Created')

        j.cancel()

        status = j.status()
        self.assertEqual(status['state'], 'Cancelled')
        self.assertTrue('log' not in status)

        # cancelled job has no log
        try:
            j.log()
        except requests.HTTPError as e:
            if e.response.status_code == 404:
                pass
            else:
                raise

    def test_get_nonexistent_job(self):
        try:
            self.batch._get_job(666)
        except requests.HTTPError as e:
            if e.response.status_code == 404:
                pass
            else:
                raise

    def test_api_cancel_nonexistent_job(self):
        try:
            self.batch._cancel_job(666)
        except requests.HTTPError as e:
            if e.response.status_code == 404:
                pass
            else:
                raise

    def test_get_job(self):
        j = self.batch.create_job('alpine', ['true'])
        j2 = self.batch.get_job(j.id)
        status2 = j2.status()
        # FIX: bare assert -> assertEqual for a useful failure message
        self.assertEqual(status2['id'], j.id)

    def test_batch(self):
        b = self.batch.create_batch()
        j1 = b.create_job('alpine', ['false'])
        j2 = b.create_job('alpine', ['sleep', '1'])
        j3 = b.create_job('alpine', ['sleep', '30'])

        # test list_jobs
        jobs = self.batch.list_jobs()
        self.assertTrue(
            set([j.id for j in jobs]).issuperset([j1.id, j2.id, j3.id]))

        # test refresh_k8s_state
        self.batch._refresh_k8s_state()

        j2.wait()
        j3.cancel()
        bstatus = b.wait()

        n_cancelled = bstatus['jobs']['Cancelled']
        n_complete = bstatus['jobs']['Complete']
        self.assertTrue(n_cancelled <= 1)
        self.assertTrue(n_cancelled + n_complete == 3)

    def test_callback(self):
        app = Flask('test-client')

        d = {}

        @app.route('/test', methods=['POST'])
        def test():
            d['status'] = request.get_json()
            return Response(status=200)

        port = 5869
        server = ServerThread(app, host=self.ip, port=port)
        server.start()

        j = self.batch.create_job(
            'alpine', ['echo', 'test'],
            attributes={'foo': 'bar'},
            callback='http://{}:{}/test'.format(self.ip, port))
        j.wait()

        status = d['status']
        self.assertEqual(status['state'], 'Complete')
        self.assertEqual(status['attributes'], {'foo': 'bar'})

        server.shutdown()
        server.join()
b/hail-ci-deploy.sh index 1a3cb5ebb89..846fb1126e6 100644 --- a/hail-ci-deploy.sh +++ b/hail-ci-deploy.sh @@ -2,3 +2,4 @@ set -ex (cd hail && /bin/bash hail-ci-deploy.sh) +(cd batch && /bin/bash hail-ci-deploy.sh) diff --git a/project-changed.py b/project-changed.py index 2552f382207..6823832c785 100644 --- a/project-changed.py +++ b/project-changed.py @@ -14,7 +14,8 @@ 'batch': 'batch/', 'ci': 'ci/', 'site': 'site/', - 'scorecard': 'scorecard/' + 'scorecard': 'scorecard/', + 'cloudtools': 'cloudtools/' } orig_hash = sys.argv[1]