Skip to content

Commit

Permalink
[hailctl] Add hailctl batch submit (#12471)
Browse files Browse the repository at this point in the history
* [hailctl] Add hailctl batch submit

* make name an option not an argument

* lint

* use relative file paths for files

* add basic test

* use the current hailgenetics/hail image not the dockerhub one

* get quiet mode working to fully test hailctl batch submit

* fix

* cleanup

* submit sets HAIL_QUERY_BACKEND to batch

* fix

* fixes

* lint

* name the batch that the submitted job spawns
  • Loading branch information
daniel-goldstein authored Dec 1, 2022
1 parent e795238 commit 60b43d4
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 4 deletions.
57 changes: 57 additions & 0 deletions build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2638,6 +2638,63 @@ steps:
- merge_code
- service_base_image
- deploy_batch
# CI step: end-to-end test of `hailctl batch submit`, run inside the
# hailgenetics_hail_image image. The script writes a small Hail program that
# reads a side file (foo/baz.txt) and collects a range table, submits it with
# `--files=foo`, waits for the resulting batch, and passes only if the batch
# state reported by `hailctl batch wait -o json` is "success".
- kind: runImage
name: test_hailctl_batch
image:
valueFrom: hailgenetics_hail_image.image
# `-o json` on both hailctl commands keeps the output machine-readable so that
# jq can extract the batch id and the final state.
script: |
set -ex
export HAIL_GENETICS_HAIL_IMAGE="{{ hailgenetics_hail_image.image }}"
export GOOGLE_APPLICATION_CREDENTIALS=/test-gsa-key/key.json
hailctl config set batch/billing_project test
hailctl config set batch/remote_tmpdir {{ global.test_storage_uri }}/{{ token }}/hailctl-test
mkdir -p foo
echo "bar" > foo/baz.txt
cat >simple_hail.py <<EOF
import hail as hl
with open('foo/baz.txt') as f:
print(f.read())
hl.init(app_name='test-hailctl-batch-submit-query')
hl.utils.range_table(10).collect()
EOF
BATCH_ID=$(hailctl batch submit simple_hail.py --name=test-hailctl-batch-submit --files=foo -o json | jq '.id')
STATUS=$(hailctl batch wait -o json $BATCH_ID | jq -jr '.state')
if [ "$STATUS" == "success" ]; then
exit 0;
else
exit 1;
fi
# Secrets from the default namespace, each mounted at its mountPath.
secrets:
- name: worker-deploy-config
namespace:
valueFrom: default_ns.name
mountPath: /deploy-config
- name: test-tokens
namespace:
valueFrom: default_ns.name
mountPath: /user-tokens
- name: test-gsa-key
namespace:
valueFrom: default_ns.name
mountPath: /test-gsa-key
- name: ssl-config-batch-tests
namespace:
valueFrom: default_ns.name
mountPath: /ssl-config
# Steps listed here are declared as prerequisites of this one.
dependsOn:
- hailgenetics_hail_image
- create_deploy_config
- create_accounts
- default_ns
- merge_code
- deploy_batch
- kind: runImage
name: test_batch_docs
image:
Expand Down
1 change: 1 addition & 0 deletions docker/hailgenetics/hail/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ ENV MAKEFLAGS -j2
RUN hail-apt-get-install \
curl \
git \
jq \
liblapack3 \
openjdk-8-jre-headless \
rsync \
Expand Down
3 changes: 2 additions & 1 deletion hail/python/hailtop/aiotools/copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ async def copy(*,
transfers,
files_listener=make_listener(progress, file_tid),
bytes_listener=make_listener(progress, bytes_tid))
copy_report.summarize()
if verbose:
copy_report.summarize()


def make_transfer(json_object: Dict[str, str]) -> Transfer:
Expand Down
2 changes: 1 addition & 1 deletion hail/python/hailtop/batch_client/aioclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ async def wait(self,
description += ': '
if progress is not None:
return await self._wait(description, progress, disable_progress_bar, starting_job)
with BatchProgressBar() as progress2:
with BatchProgressBar(disable=disable_progress_bar) as progress2:
return await self._wait(description, progress2, disable_progress_bar, starting_job)

async def debug_info(self):
Expand Down
12 changes: 11 additions & 1 deletion hail/python/hailtop/hailctl/batch/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from . import log
from . import job
from . import billing
from . import submit


def parser():
Expand Down Expand Up @@ -39,6 +40,11 @@ def parser():
help='Delete a batch',
description='Delete a batch'
)
submit_parser = subparsers.add_parser(
'submit',
help='Submit a batch',
description='Submit a batch',
)
log_parser = subparsers.add_parser(
'log',
help='Get log for a job',
Expand Down Expand Up @@ -69,6 +75,9 @@ def parser():
delete_parser.set_defaults(module='delete')
delete.init_parser(delete_parser)

submit_parser.set_defaults(module='submit')
submit.init_parser(submit_parser)

log_parser.set_defaults(module='log')
log.init_parser(log_parser)

Expand All @@ -93,7 +102,8 @@ def main(args):
'cancel': cancel,
'log': log,
'job': job,
'wait': wait
'wait': wait,
'submit': submit,
}

args, pass_through_args = parser().parse_known_args(args=args)
Expand Down
74 changes: 74 additions & 0 deletions hail/python/hailtop/hailctl/batch/submit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import asyncio
import os
import shlex

import orjson

import hailtop.batch as hb
import hailtop.batch_client.client as bc
from hailtop import pip_version
from hailtop.aiotools.copy import copy_from_dict
from hailtop.config import get_remote_tmpdir, get_user_config_path, get_deploy_config
from hailtop.utils import secret_alnum_string, unpack_comma_delimited_inputs

HAIL_GENETICS_HAIL_IMAGE = os.environ.get('HAIL_GENETICS_HAIL_IMAGE', f'hailgenetics/hail:{pip_version()}')


def init_parser(parser):
    """Register the command-line arguments of ``hailctl batch submit``.

    Parameters
    ----------
    parser:
        The argparse (sub)parser to add the arguments to.

    Notes
    -----
    ``--files`` combines ``nargs='+'`` with ``action='append'``, so the parsed
    value is a list of lists (one inner list per occurrence of the flag).
    """
    parser.add_argument('script', type=str, help='Path to script')
    parser.add_argument('--name', type=str, default='', help='Batch name')
    parser.add_argument('--image-name', type=str,
                        help='Name for Docker image. Defaults to hailgenetics/hail')
    parser.add_argument('--files', nargs='+', action='append', default=[],
                        help='Comma-separated list of files or directories to add to the working directory of job')
    # Previously this option had no help text and showed up undocumented in --help.
    parser.add_argument('-o', type=str, default='text', choices=['text', 'json'],
                        help='Output format. json prints the batch ID as JSON and disables '
                             'the progress bar (default: text)')


async def async_main(args):
    """Upload a script and its supporting files, then submit a one-job batch that runs it.

    The script, the user's hailctl config file and every path given via
    ``--files`` are copied to ``<remote_tmpdir>/<prefix>/...`` (the prefix comes
    from ``secret_alnum_string()``). A single bash job then links each of those
    inputs back to its relative path, links the config file to
    ``$HOME/.config/hail/config.ini``, sets ``HAIL_QUERY_BACKEND=batch`` and
    runs the script with ``python3`` (for ``*.py``) or ``bash`` (anything else).

    Parameters
    ----------
    args:
        Parsed arguments providing ``script``, ``name``, ``image_name``,
        ``files`` and ``o`` (output format, ``'text'`` or ``'json'``).

    The batch is submitted without waiting for it to finish. On success the
    batch id is printed, either as a human-readable line with a URL or as a
    JSON object ``{"id": ...}``.
    """
    script = args.script
    files = unpack_comma_delimited_inputs(args.files)
    user_config = get_user_config_path()
    # Any non-text output must stay machine-readable, so suppress the progress bar.
    quiet = args.o != 'text'

    remote_tmpdir = get_remote_tmpdir('hailctl batch submit')
    tmpdir_path_prefix = secret_alnum_string()

    def cloud_prefix(path):
        return f'{remote_tmpdir}/{tmpdir_path_prefix}/{path}'

    b = hb.Batch(name=args.name, backend=hb.ServiceBackend())
    j = b.new_bash_job()
    j.image(args.image_name or HAIL_GENETICS_HAIL_IMAGE)

    rel_file_paths = [os.path.relpath(file) for file in files]
    local_files_to_cloud_files = [{'from': local, 'to': cloud_prefix(local)} for local in rel_file_paths]
    await copy_from_dict(files=[
        {'from': script, 'to': cloud_prefix(script)},
        {'from': str(user_config), 'to': cloud_prefix(user_config)},
        *local_files_to_cloud_files,
    ])
    for file in local_files_to_cloud_files:
        local_file = file['from']
        cloud_file = file['to']
        in_file = b.read_input(cloud_file)
        # `in_file` is left unquoted because it is a Batch resource reference,
        # not a literal path. The user-supplied relative path is shell-quoted so
        # spaces and metacharacters cannot break the command.
        link_command = f'ln -s {in_file} {shlex.quote(local_file)}'
        parent_dir = os.path.dirname(local_file)
        if parent_dir:
            # `ln -s` fails if the link's parent directory does not exist in the
            # job, which is the case for nested paths such as `foo/baz.txt`.
            link_command = f'mkdir -p {shlex.quote(parent_dir)} && {link_command}'
        j.command(link_command)

    script_file = b.read_input(cloud_prefix(script))
    config_file = b.read_input(cloud_prefix(user_config))
    j.command(f'mkdir -p $HOME/.config/hail && ln -s {config_file} $HOME/.config/hail/config.ini')

    j.env('HAIL_QUERY_BACKEND', 'batch')

    command = 'python3' if script.endswith('.py') else 'bash'
    j.command(f'{command} {script_file}')
    batch_handle: bc.Batch = b.run(wait=False, disable_progress_bar=quiet)  # type: ignore

    if args.o == 'text':
        deploy_config = get_deploy_config()
        url = deploy_config.external_url('batch', f'/batches/{batch_handle.id}/jobs/1')
        print(f'Submitted batch {batch_handle.id}, see {url}')
    else:
        assert args.o == 'json'
        print(orjson.dumps({'id': batch_handle.id}).decode('utf-8'))


def main(args, pass_through_args, client):  # pylint: disable=unused-argument
    """Synchronous entry point for ``hailctl batch submit``.

    Only ``args`` is used; ``pass_through_args`` and ``client`` are accepted
    but not read. Runs ``async_main(args)`` to completion on a new event loop.
    """
    submission = async_main(args)
    asyncio.run(submission)
12 changes: 11 additions & 1 deletion hail/python/hailtop/hailctl/batch/wait.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import json
import sys
from .batch_cli_utils import get_batch_if_exists


def init_parser(parser):
    """Register the command-line arguments of ``hailctl batch wait``.

    Parameters
    ----------
    parser:
        The argparse (sub)parser to add the arguments to.
    """
    # `batch_id` and `-o` previously had no help text and showed up
    # undocumented in --help.
    parser.add_argument('batch_id', type=int, help='ID of the batch to wait for')
    parser.add_argument("--quiet", "-q",
                        action="store_true",
                        help="Do not print a progress bar for the batch")
    parser.add_argument('-o', type=str, default='text', choices=['text', 'json'],
                        help='Output format. json prints the result as JSON and disables '
                             'the progress bar (default: text)')


def main(args, pass_through_args, client): # pylint: disable=unused-argument
Expand All @@ -13,4 +18,9 @@ def main(args, pass_through_args, client): # pylint: disable=unused-argument
sys.exit(1)

batch = maybe_batch
print(batch.wait())
quiet = args.quiet or args.o != 'text'
out = batch.wait(disable_progress_bar=quiet)
if args.o == 'json':
print(json.dumps(out))
else:
print(out)

0 comments on commit 60b43d4

Please sign in to comment.