From f5755b81188391dc042d0dcaf1300892e33e2d6e Mon Sep 17 00:00:00 2001 From: Dan King Date: Fri, 25 Aug 2023 13:35:14 -0400 Subject: [PATCH 1/5] [batch] re-allow sending kwargs to python jobs CHANGELOG: Fix bug introduced in 0.2.117 by commit `c9de81108` which prevented the passing of keyword arguments to Python jobs. This manifested as "ValueError: too many values to unpack". --- hail/python/hailtop/batch/job.py | 51 ++++++++++++++------ hail/python/hailtop/batch/resource.py | 3 ++ hail/python/test/hailtop/batch/test_batch.py | 44 ++++++++++++++++- 3 files changed, 81 insertions(+), 17 deletions(-) diff --git a/hail/python/hailtop/batch/job.py b/hail/python/hailtop/batch/job.py index ce5e914db47..619d6e60285 100644 --- a/hail/python/hailtop/batch/job.py +++ b/hail/python/hailtop/batch/job.py @@ -5,7 +5,7 @@ import textwrap import warnings from shlex import quote as shq -from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union, cast +from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union, cast, Literal import hailtop.batch_client.client as bc @@ -877,6 +877,19 @@ async def _compile(self, local_tmpdir, remote_tmpdir, *, dry_run=False): return True +UnpreparedArg = '_resource.ResourceType' | List['UnpreparedArg'] | Tuple['UnpreparedArg', ...] | Dict[str, 'UnpreparedArg'] | Any + +PreparedArg = ( + Tuple[Literal['py_path'], str] | + Tuple[Literal['path'], str] | + Tuple[Literal['dict_path'], Dict[str, str]] | + Tuple[Literal['list'], List['PreparedArg']] | + Tuple[Literal['dict'], Dict[str, 'PreparedArg']] | + Tuple[Literal['tuple'], Tuple['PreparedArg', ...]] | + Tuple[Literal['value'], Any] +) + + class PythonJob(Job): """ Object representing a single Python job to execute. @@ -923,7 +936,7 @@ def __init__(self, super().__init__(batch, token, name=name, attributes=attributes, shell=None) self._resources: Dict[str, _resource.Resource] = {} self._resources_inverse: Dict[_resource.Resource, str] = {} - self._function_calls: List[Tuple[_resource.PythonResult, int, Tuple[Any, ...], Dict[str, Any]]] = [] + self._function_calls: List[Tuple[_resource.PythonResult, int, Tuple[UnpreparedArg, ...], Dict[str, UnpreparedArg]]] = [] self.n_results = 0 def _get_python_resource(self, item: str) -> '_resource.PythonResult': @@ -969,7 +982,7 @@ def image(self, image: str) -> 'PythonJob': self._image = image return self - def call(self, unapplied: Callable, *args, **kwargs) -> '_resource.PythonResult': + def call(self, unapplied: Callable, *args: UnpreparedArg, **kwargs: UnpreparedArg) -> '_resource.PythonResult': """Execute a Python function. Examples @@ -1147,7 +1160,7 @@ def handle_args(r): return result async def _compile(self, local_tmpdir, remote_tmpdir, *, dry_run=False): - def prepare_argument_for_serialization(arg): + def preserialize(arg: UnpreparedArg) -> PreparedArg: if isinstance(arg, _resource.PythonResult): return ('py_path', arg._get_path(local_tmpdir)) if isinstance(arg, _resource.ResourceFile): @@ -1155,20 +1168,24 @@ def prepare_argument_for_serialization(arg): if isinstance(arg, _resource.ResourceGroup): return ('dict_path', {name: resource._get_path(local_tmpdir) for name, resource in arg._resources.items()}) - if isinstance(arg, (list, tuple)): - return ('value', [prepare_argument_for_serialization(elt) for elt in arg]) + if isinstance(arg, list): + return ('list', [preserialize(elt) for elt in arg]) + if isinstance(arg, tuple): + return ('tuple', tuple((preserialize(elt) for elt in arg))) if isinstance(arg, dict): - return ('value', {k: prepare_argument_for_serialization(v) for k, v in arg.items()}) + return ('dict', {k: preserialize(v) for k, v in arg.items()}) return ('value', arg) for i, (result, unapplied_id, args, kwargs) in enumerate(self._function_calls): func_file = self._batch._python_function_files[unapplied_id] - prepared_args = prepare_argument_for_serialization(args)[1] - prepared_kwargs = prepare_argument_for_serialization(kwargs)[1] + preserialized_args = [preserialize(arg) for arg in args] + del args + preserialized_kwargs = {keyword: preserialize(arg) for keyword, arg in kwargs.items()} + del kwargs args_file = await self._batch._serialize_python_to_input_file( - os.path.dirname(result._get_path(remote_tmpdir)), "args", i, (prepared_args, prepared_kwargs), dry_run + os.path.dirname(result._get_path(remote_tmpdir)), "args", i, (preserialized_args, preserialized_kwargs), dry_run ) json_write, str_write, repr_write = [ @@ -1190,14 +1207,16 @@ def prepare_argument_for_serialization(arg): def deserialize_argument(arg): typ, val = arg - if typ == 'value' and isinstance(val, dict): - return {{k: deserialize_argument(v) for k, v in val.items()}} - if typ == 'value' and isinstance(val, (list, tuple)): - return [deserialize_argument(elt) for elt in val] if typ == 'py_path': return dill.load(open(val, 'rb')) if typ in ('path', 'dict_path'): return val + if typ == 'list': + return [deserialize_argument(elt) for elt in val] + if typ == 'tuple': + return tuple((deserialize_argument(elt) for elt in val)) + if typ == 'dict': + return {{k: deserialize_argument(v) for k, v in val.items()}} assert typ == 'value' return val @@ -1225,8 +1244,8 @@ def deserialize_argument(arg): unapplied = self._batch._python_function_defs[unapplied_id] self._user_code.append(textwrap.dedent(inspect.getsource(unapplied))) - args_str = ', '.join([f'{arg!r}' for _, arg in prepared_args]) - kwargs_str = ', '.join([f'{k}={v!r}' for k, (_, v) in kwargs.items()]) + args_str = ', '.join([f'{arg!r}' for _, arg in preserialized_args]) + kwargs_str = ', '.join([f'{k}={v!r}' for k, (_, v) in preserialized_kwargs.items()]) separator = ', ' if args_str and kwargs_str else '' func_call = f'{unapplied.__name__}({args_str}{separator}{kwargs_str})' self._user_code.append(self._interpolate_command(func_call, allow_python_results=True)) diff --git a/hail/python/hailtop/batch/resource.py b/hail/python/hailtop/batch/resource.py index b8df03ddca0..d0bbec47962 100644 --- a/hail/python/hailtop/batch/resource.py +++ b/hail/python/hailtop/batch/resource.py @@ -448,3 +448,6 @@ def __str__(self): def __repr__(self): return self._uid # pylint: disable=no-member + + +ResourceType = PythonResult | ResourceFile | ResourceGroup diff --git a/hail/python/test/hailtop/batch/test_batch.py b/hail/python/test/hailtop/batch/test_batch.py index 9f5fc08d166..06e544d3061 100644 --- a/hail/python/test/hailtop/batch/test_batch.py +++ b/hail/python/test/hailtop/batch/test_batch.py @@ -10,7 +10,10 @@ from shlex import quote as shq import uuid import re +import orjson +import hailtop.fs as hfs +import hailtop.batch_client.client as bc from hailtop import pip_version from hailtop.batch import Batch, ServiceBackend, LocalBackend from hailtop.batch.exceptions import BatchException @@ -1270,7 +1273,46 @@ def test_update_batch_from_batch_id(self): res_status = res.status() assert res_status['state'] == 'success', str((res_status, res.debug_info())) - def test_list_recursive_resource_extraction_in_python_jobs(self): + def test_python_job_with_kwarg(self): + def foo(*, kwarg): + return kwarg + + b = self.batch(default_python_image=PYTHON_DILL_IMAGE) + j = b.new_python_job() + r = j.call(foo, kwarg='hello world') + + output_path = f'{self.cloud_output_dir}/test_python_job_with_kwarg' + b.write_output(r.as_json(), output_path) + res = b.run() + assert isinstance(res, bc.Batch) + + assert res.status()['state'] == 'success', str((res, res.debug_info())) + with hfs.open(output_path) as f: + assert orjson.loads(f.read()) == 'hello world' + + def test_tuple_recursive_resource_extraction_in_python_jobs(self): + b = self.batch(default_python_image=PYTHON_DILL_IMAGE) + + def write(paths): + assert isinstance(paths, tuple) + for i, path in enumerate(paths): + with open(path, 'w') as f: + f.write(f'{i}') + + head = b.new_python_job() + head.call(write, (head.ofile1, head.ofile2)) + + tail = b.new_bash_job() + tail.command(f'cat {head.ofile1}') + tail.command(f'cat {head.ofile2}') + + res = b.run() + assert res + res_status = res.status() + assert res_status['state'] == 'success', str((res_status, res.debug_info())) + assert res.get_job_log(tail._job_id)['main'] == '01', str(res.debug_info()) + + def test_list_recursive_resource_extraction_in_python_jobs(self): b = self.batch(default_python_image=PYTHON_DILL_IMAGE) def write(paths): From 19807dab625f542dc59e9f5dc32052e24fdb4968 Mon Sep 17 00:00:00 2001 From: Dan King Date: Mon, 28 Aug 2023 12:10:18 -0400 Subject: [PATCH 2/5] remove types --- hail/python/hailtop/batch/job.py | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/hail/python/hailtop/batch/job.py b/hail/python/hailtop/batch/job.py index 619d6e60285..598ae63db44 100644 --- a/hail/python/hailtop/batch/job.py +++ b/hail/python/hailtop/batch/job.py @@ -877,17 +877,19 @@ async def _compile(self, local_tmpdir, remote_tmpdir, *, dry_run=False): return True -UnpreparedArg = '_resource.ResourceType' | List['UnpreparedArg'] | Tuple['UnpreparedArg', ...] | Dict[str, 'UnpreparedArg'] | Any +# Needs Python 3.10 +# UnpreparedArg = '_resource.ResourceType' | List['UnpreparedArg'] | Tuple['UnpreparedArg', ...] | Dict[str, 'UnpreparedArg'] | Any -PreparedArg = ( - Tuple[Literal['py_path'], str] | - Tuple[Literal['path'], str] | - Tuple[Literal['dict_path'], Dict[str, str]] | - Tuple[Literal['list'], List['PreparedArg']] | - Tuple[Literal['dict'], Dict[str, 'PreparedArg']] | - Tuple[Literal['tuple'], Tuple['PreparedArg', ...]] | - Tuple[Literal['value'], Any] -) +# Needs Python 3.10 +# PreparedArg = ( +# Tuple[Literal['py_path'], str] | +# Tuple[Literal['path'], str] | +# Tuple[Literal['dict_path'], Dict[str, str]] | +# Tuple[Literal['list'], List['PreparedArg']] | +# Tuple[Literal['dict'], Dict[str, 'PreparedArg']] | +# Tuple[Literal['tuple'], Tuple['PreparedArg', ...]] | +# Tuple[Literal['value'], Any] +# ) class PythonJob(Job): @@ -936,7 +938,7 @@ def __init__(self, super().__init__(batch, token, name=name, attributes=attributes, shell=None) self._resources: Dict[str, _resource.Resource] = {} self._resources_inverse: Dict[_resource.Resource, str] = {} - self._function_calls: List[Tuple[_resource.PythonResult, int, Tuple[UnpreparedArg, ...], Dict[str, UnpreparedArg]]] = [] + self._function_calls: List[Tuple[_resource.PythonResult, int, Tuple[Any, ...], Dict[str, Any]]] = [] self.n_results = 0 def _get_python_resource(self, item: str) -> '_resource.PythonResult': @@ -982,7 +984,7 @@ def image(self, image: str) -> 'PythonJob': self._image = image return self - def call(self, unapplied: Callable, *args: UnpreparedArg, **kwargs: UnpreparedArg) -> '_resource.PythonResult': + def call(self, unapplied: Callable, *args: Any, **kwargs: Any) -> '_resource.PythonResult': """Execute a Python function. Examples @@ -1160,7 +1162,7 @@ def handle_args(r): return result async def _compile(self, local_tmpdir, remote_tmpdir, *, dry_run=False): - def preserialize(arg: UnpreparedArg) -> PreparedArg: + def preserialize(arg: Any) -> Any: if isinstance(arg, _resource.PythonResult): return ('py_path', arg._get_path(local_tmpdir)) if isinstance(arg, _resource.ResourceFile): From 6224e672dd81ad4c4d90ea7acf5fdb7e17a2a04c Mon Sep 17 00:00:00 2001 From: Dan King Date: Thu, 31 Aug 2023 15:01:00 -0400 Subject: [PATCH 3/5] Revert "remove types" This reverts commit 19807dab625f542dc59e9f5dc32052e24fdb4968. --- hail/python/hailtop/batch/job.py | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/hail/python/hailtop/batch/job.py b/hail/python/hailtop/batch/job.py index 598ae63db44..619d6e60285 100644 --- a/hail/python/hailtop/batch/job.py +++ b/hail/python/hailtop/batch/job.py @@ -877,19 +877,17 @@ async def _compile(self, local_tmpdir, remote_tmpdir, *, dry_run=False): return True -# Needs Python 3.10 -# UnpreparedArg = '_resource.ResourceType' | List['UnpreparedArg'] | Tuple['UnpreparedArg', ...] | Dict[str, 'UnpreparedArg'] | Any +UnpreparedArg = '_resource.ResourceType' | List['UnpreparedArg'] | Tuple['UnpreparedArg', ...] | Dict[str, 'UnpreparedArg'] | Any -# Needs Python 3.10 -# PreparedArg = ( -# Tuple[Literal['py_path'], str] | -# Tuple[Literal['path'], str] | -# Tuple[Literal['dict_path'], Dict[str, str]] | -# Tuple[Literal['list'], List['PreparedArg']] | -# Tuple[Literal['dict'], Dict[str, 'PreparedArg']] | -# Tuple[Literal['tuple'], Tuple['PreparedArg', ...]] | -# Tuple[Literal['value'], Any] -# ) +PreparedArg = ( + Tuple[Literal['py_path'], str] | + Tuple[Literal['path'], str] | + Tuple[Literal['dict_path'], Dict[str, str]] | + Tuple[Literal['list'], List['PreparedArg']] | + Tuple[Literal['dict'], Dict[str, 'PreparedArg']] | + Tuple[Literal['tuple'], Tuple['PreparedArg', ...]] | + Tuple[Literal['value'], Any] +) class PythonJob(Job): @@ -938,7 +936,7 @@ def __init__(self, super().__init__(batch, token, name=name, attributes=attributes, shell=None) self._resources: Dict[str, _resource.Resource] = {} self._resources_inverse: Dict[_resource.Resource, str] = {} - self._function_calls: List[Tuple[_resource.PythonResult, int, Tuple[Any, ...], Dict[str, Any]]] = [] + self._function_calls: List[Tuple[_resource.PythonResult, int, Tuple[UnpreparedArg, ...], Dict[str, UnpreparedArg]]] = [] self.n_results = 0 def _get_python_resource(self, item: str) -> '_resource.PythonResult': @@ -984,7 +982,7 @@ def image(self, image: str) -> 'PythonJob': self._image = image return self - def call(self, unapplied: Callable, *args: Any, **kwargs: Any) -> '_resource.PythonResult': + def call(self, unapplied: Callable, *args: UnpreparedArg, **kwargs: UnpreparedArg) -> '_resource.PythonResult': """Execute a Python function. Examples @@ -1162,7 +1160,7 @@ def handle_args(r): return result async def _compile(self, local_tmpdir, remote_tmpdir, *, dry_run=False): - def preserialize(arg: Any) -> Any: + def preserialize(arg: UnpreparedArg) -> PreparedArg: if isinstance(arg, _resource.PythonResult): return ('py_path', arg._get_path(local_tmpdir)) if isinstance(arg, _resource.ResourceFile): From 9b87c935185a6499f7d263d219284ec86ae28ce1 Mon Sep 17 00:00:00 2001 From: Dan King Date: Thu, 31 Aug 2023 15:29:22 -0400 Subject: [PATCH 4/5] actually address the comment about typing --- hail/python/hailtop/batch/job.py | 20 ++++++++++---------- hail/python/hailtop/batch/resource.py | 2 +- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/hail/python/hailtop/batch/job.py b/hail/python/hailtop/batch/job.py index 619d6e60285..93ef8a5d296 100644 --- a/hail/python/hailtop/batch/job.py +++ b/hail/python/hailtop/batch/job.py @@ -877,17 +877,17 @@ async def _compile(self, local_tmpdir, remote_tmpdir, *, dry_run=False): return True -UnpreparedArg = '_resource.ResourceType' | List['UnpreparedArg'] | Tuple['UnpreparedArg', ...] | Dict[str, 'UnpreparedArg'] | Any - -PreparedArg = ( - Tuple[Literal['py_path'], str] | - Tuple[Literal['path'], str] | - Tuple[Literal['dict_path'], Dict[str, str]] | - Tuple[Literal['list'], List['PreparedArg']] | - Tuple[Literal['dict'], Dict[str, 'PreparedArg']] | - Tuple[Literal['tuple'], Tuple['PreparedArg', ...]] | +UnpreparedArg = Union['_resource.ResourceType', List['UnpreparedArg'], Tuple['UnpreparedArg', ...], Dict[str, 'UnpreparedArg'], Any] + +PreparedArg = Union[ + Tuple[Literal['py_path'], str], + Tuple[Literal['path'], str], + Tuple[Literal['dict_path'], Dict[str, str]], + Tuple[Literal['list'], List['PreparedArg']], + Tuple[Literal['dict'], Dict[str, 'PreparedArg']], + Tuple[Literal['tuple'], Tuple['PreparedArg', ...]], Tuple[Literal['value'], Any] -) +] class PythonJob(Job): diff --git a/hail/python/hailtop/batch/resource.py b/hail/python/hailtop/batch/resource.py index d0bbec47962..3ebd1577dc5 100644 --- a/hail/python/hailtop/batch/resource.py +++ b/hail/python/hailtop/batch/resource.py @@ -450,4 +450,4 @@ def __repr__(self): return self._uid # pylint: disable=no-member -ResourceType = PythonResult | ResourceFile | ResourceGroup +ResourceType = Union[PythonResult, ResourceFile, ResourceGroup] From 42dfef518a6764c0739774b4c4f17d002223f116 Mon Sep 17 00:00:00 2001 From: Dan King Date: Fri, 1 Sep 2023 11:18:19 -0400 Subject: [PATCH 5/5] avoid assert, which tries to serialize pytest --- hail/python/test/hailtop/batch/test_batch.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/hail/python/test/hailtop/batch/test_batch.py b/hail/python/test/hailtop/batch/test_batch.py index 0c5d0db81a6..efcc88d9d97 100644 --- a/hail/python/test/hailtop/batch/test_batch.py +++ b/hail/python/test/hailtop/batch/test_batch.py @@ -1315,7 +1315,8 @@ def test_tuple_recursive_resource_extraction_in_python_jobs(self): b = self.batch(default_python_image=PYTHON_DILL_IMAGE) def write(paths): - assert isinstance(paths, tuple) + if not isinstance(paths, tuple): + raise ValueError('paths must be a tuple') for i, path in enumerate(paths): with open(path, 'w') as f: f.write(f'{i}') @@ -1329,9 +1330,9 @@ def write(paths): res = b.run() assert res + assert tail._job_id res_status = res.status() assert res_status['state'] == 'success', str((res_status, res.debug_info())) - assert tail._job_id is not None assert res.get_job_log(tail._job_id)['main'] == '01', str(res.debug_info()) def test_list_recursive_resource_extraction_in_python_jobs(self):