From 4fb659c1fcdd5a1f86cab8954218679f84c2f07e Mon Sep 17 00:00:00 2001 From: Dan King Date: Wed, 6 Sep 2023 10:01:37 -0400 Subject: [PATCH] [qob][batch] do not list all jobs on failure (plus: types!) (#13500) CHANGELOG: In Query-on-Batch, the client-side Python code will not try to list every job when a QoB batch fails. This could take hours for long-running pipelines or pipelines with many partitions. I also added API types to the list jobs end point because I have to go hunting for this every time anyway. Seems better to have this information at our digital fingertips. --- batch/batch/batch.py | 11 ++-- batch/batch/batch_format_version.py | 4 +- batch/batch/front_end/front_end.py | 54 +++++++++++-------- hail/python/hail/backend/service_backend.py | 10 +++- hail/python/hailtop/batch_client/__init__.py | 3 +- hail/python/hailtop/batch_client/aioclient.py | 47 ++++++++++++---- hail/python/hailtop/batch_client/types.py | 43 +++++++++++++++ hail/python/hailtop/hailctl/batch/cli.py | 9 +++- hail/python/hailtop/utils/utils.py | 2 +- 9 files changed, 137 insertions(+), 46 deletions(-) create mode 100644 hail/python/hailtop/batch_client/types.py diff --git a/batch/batch/batch.py b/batch/batch/batch.py index fcfc4e41058..8eb5209a1e6 100644 --- a/batch/batch/batch.py +++ b/batch/batch/batch.py @@ -1,8 +1,9 @@ import json import logging -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional from gear import transaction +from hailtop.batch_client.types import CostBreakdownEntry, JobListEntryV1Alpha from hailtop.utils import humanize_timedelta_msecs, time_msecs_str from .batch_format_version import BatchFormatVersion @@ -12,7 +13,7 @@ log = logging.getLogger('batch') -def cost_breakdown_to_dict(cost_breakdown: dict): +def cost_breakdown_to_dict(cost_breakdown: Dict[str, float]) -> List[CostBreakdownEntry]: return [{'resource': resource, 'cost': cost} for resource, cost in cost_breakdown.items()] @@ -75,7 +76,7 @@ def _time_msecs_str(t): return d -def job_record_to_dict(record: Dict[str, Any], name: Optional[str]) -> Dict[str, Any]: +def job_record_to_dict(record: Dict[str, Any], name: Optional[str]) -> JobListEntryV1Alpha: format_version = BatchFormatVersion(record['format_version']) db_status = record['status'] @@ -89,7 +90,7 @@ def job_record_to_dict(record: Dict[str, Any], name: Optional[str]) -> Dict[str, if record['cost_breakdown'] is not None: record['cost_breakdown'] = cost_breakdown_to_dict(json.loads(record['cost_breakdown'])) - result = { + return { 'batch_id': record['batch_id'], 'job_id': record['job_id'], 'name': name, @@ -103,8 +104,6 @@ def job_record_to_dict(record: Dict[str, Any], name: Optional[str]) -> Dict[str, 'cost_breakdown': record['cost_breakdown'], } - return result - async def cancel_batch_in_db(db, batch_id): @transaction(db) diff --git a/batch/batch/batch_format_version.py b/batch/batch/batch_format_version.py index 088885ef312..06c8dc07f2a 100644 --- a/batch/batch/batch_format_version.py +++ b/batch/batch/batch_format_version.py @@ -1,3 +1,5 @@ +from typing import Optional, Tuple + from hailtop.batch_client.aioclient import Job @@ -117,7 +119,7 @@ def db_status(self, status): return [ec, duration] - def get_status_exit_code_duration(self, status): + def get_status_exit_code_duration(self, status) -> Tuple[Optional[int], Optional[int]]: if self.format_version == 1: job_status = {'status': status} return (Job.exit_code(job_status), Job.total_duration_msecs(job_status)) diff --git a/batch/batch/front_end/front_end.py b/batch/batch/front_end/front_end.py index 68fd981df8c..ed111e36213 100644 --- a/batch/batch/front_end/front_end.py +++ b/batch/batch/front_end/front_end.py @@ -11,7 +11,7 @@ import traceback from functools import wraps from numbers import Number -from typing import Any, Awaitable, Callable, Dict, List, NoReturn, Optional, Tuple, TypeVar, Union +from typing import Any, Awaitable, Callable, Dict, List, NoReturn, Optional, Tuple, TypeVar, Union, cast import aiohttp import aiohttp.web_exceptions @@ -45,6 +45,7 @@ from gear.profiling import install_profiler_if_requested from hailtop import aiotools, dictfix, httpx, version from hailtop.batch_client.parse import parse_cpu_in_mcpu, parse_memory_in_bytes, parse_storage_in_bytes +from hailtop.batch_client.types import GetJobResponseV1Alpha, GetJobsResponseV1Alpha, JobListEntryV1Alpha from hailtop.config import get_deploy_config from hailtop.hail_logging import AccessLogger from hailtop.tls import internal_server_ssl_context @@ -252,7 +253,9 @@ async def _handle_api_error(f: Callable[P, Awaitable[T]], *args: P.args, **kwarg raise e.http_response() -async def _query_batch_jobs(request: web.Request, batch_id: int, version: int, q: str, last_job_id: Optional[int]): +async def _query_batch_jobs( + request: web.Request, batch_id: int, version: int, q: str, last_job_id: Optional[int] +) -> Tuple[List[JobListEntryV1Alpha], Optional[int]]: db: Database = request.app['db'] if version == 1: sql, sql_args = parse_batch_jobs_query_v1(batch_id, q, last_job_id) @@ -270,7 +273,9 @@ async def _query_batch_jobs(request: web.Request, batch_id: int, version: int, q return (jobs, last_job_id) -async def _get_jobs(request, batch_id: int, version: int, q: str, last_job_id: Optional[int]): +async def _get_jobs( + request: web.Request, batch_id: int, version: int, q: str, last_job_id: Optional[int] +) -> GetJobsResponseV1Alpha: db = request.app['db'] record = await db.select_and_fetchone( @@ -1739,7 +1744,7 @@ async def ui_batches(request: web.Request, userdata: UserData) -> web.Response: return await render_template('batch', request, userdata, 'batches.html', page_context) -async def _get_job(app, batch_id, job_id): +async def _get_job(app, batch_id, job_id) -> GetJobResponseV1Alpha: db: Database = app['db'] record = await db.select_and_fetchone( @@ -1786,9 +1791,11 @@ async def _get_job(app, batch_id, job_id): _get_full_job_status(app, record), _get_full_job_spec(app, record), _get_attributes(app, record) ) - job = job_record_to_dict(record, attributes.get('name')) - job['status'] = full_status - job['spec'] = full_spec + job: GetJobResponseV1Alpha = { + **job_record_to_dict(record, attributes.get('name')), + 'status': full_status, + 'spec': full_spec, + } if attributes: job['attributes'] = attributes return job @@ -2072,6 +2079,8 @@ async def ui_get_job(request, userdata, batch_id): _get_job_resource_usage(app, batch_id, job_id), ) + job = cast(Dict[str, Any], job) + job['duration'] = humanize_timedelta_msecs(job['duration']) job['cost'] = cost_str(job['cost']) @@ -2131,21 +2140,22 @@ async def ui_get_job(request, userdata, batch_id): non_io_storage_limit_bytes = None memory_limit_bytes = None - resources = job_specification['resources'] - if 'memory_bytes' in resources: - memory_limit_bytes = resources['memory_bytes'] - resources['actual_memory'] = humanize.naturalsize(memory_limit_bytes, binary=True) - del resources['memory_bytes'] - if 'storage_gib' in resources: - io_storage_limit_bytes = resources['storage_gib'] * 1024**3 - resources['actual_storage'] = humanize.naturalsize(io_storage_limit_bytes, binary=True) - del resources['storage_gib'] - if 'cores_mcpu' in resources: - cores = resources['cores_mcpu'] / 1000 - non_io_storage_limit_gb = min(cores * RESERVED_STORAGE_GB_PER_CORE, RESERVED_STORAGE_GB_PER_CORE) - non_io_storage_limit_bytes = int(non_io_storage_limit_gb * 1024**3 + 1) - resources['actual_cpu'] = cores - del resources['cores_mcpu'] + if job_specification is not None: + resources = job_specification['resources'] + if 'memory_bytes' in resources: + memory_limit_bytes = resources['memory_bytes'] + resources['actual_memory'] = humanize.naturalsize(memory_limit_bytes, binary=True) + del resources['memory_bytes'] + if 'storage_gib' in resources: + io_storage_limit_bytes = resources['storage_gib'] * 1024**3 + resources['actual_storage'] = humanize.naturalsize(io_storage_limit_bytes, binary=True) + del resources['storage_gib'] + if 'cores_mcpu' in resources: + cores = resources['cores_mcpu'] / 1000 + non_io_storage_limit_gb = min(cores * RESERVED_STORAGE_GB_PER_CORE, RESERVED_STORAGE_GB_PER_CORE) + non_io_storage_limit_bytes = int(non_io_storage_limit_gb * 1024**3 + 1) + resources['actual_cpu'] = cores + del resources['cores_mcpu'] # Not all logs will be proper utf-8 but we attempt to show them as # str or else Jinja will present them surrounded by b'' diff --git a/hail/python/hail/backend/service_backend.py b/hail/python/hail/backend/service_backend.py index 3a82a0df61c..4478e2efe85 100644 --- a/hail/python/hail/backend/service_backend.py +++ b/hail/python/hail/backend/service_backend.py @@ -493,7 +493,10 @@ async def _read_output(self, ir: Optional[BaseIR], output_uri: str, input_uri: s except FileNotFoundError as exc: raise FatalError('Hail internal error. Please contact the Hail team and provide the following information.\n\n' + yamlx.dump({ 'service_backend_debug_info': self.debug_info(), - 'batch_debug_info': await self._batch.debug_info(), + 'batch_debug_info': await self._batch.debug_info( + _jobs_query_string='failed', + _max_jobs=10 + ), 'input_uri': await self._async_fs.read(input_uri), })) from exc @@ -514,7 +517,10 @@ async def _read_output(self, ir: Optional[BaseIR], output_uri: str, input_uri: s except UnexpectedEOFError as exc: raise FatalError('Hail internal error. Please contact the Hail team and provide the following information.\n\n' + yamlx.dump({ 'service_backend_debug_info': self.debug_info(), - 'batch_debug_info': await self._batch.debug_info(), + 'batch_debug_info': await self._batch.debug_info( + _jobs_query_string='failed', + _max_jobs=10 + ), 'in': await self._async_fs.read(input_uri), 'out': await self._async_fs.read(output_uri), })) from exc diff --git a/hail/python/hailtop/batch_client/__init__.py b/hail/python/hailtop/batch_client/__init__.py index 60de0b330b8..f9219abf05d 100644 --- a/hail/python/hailtop/batch_client/__init__.py +++ b/hail/python/hailtop/batch_client/__init__.py @@ -1,4 +1,4 @@ -from . import client, aioclient, parse +from . import client, aioclient, parse, types from .aioclient import BatchAlreadyCreatedError, BatchNotCreatedError, JobAlreadySubmittedError, JobNotSubmittedError __all__ = [ @@ -9,4 +9,5 @@ 'client', 'aioclient', 'parse', + 'types', ] diff --git a/hail/python/hailtop/batch_client/aioclient.py b/hail/python/hailtop/batch_client/aioclient.py index 7e607c12074..7697476b8fe 100644 --- a/hail/python/hailtop/batch_client/aioclient.py +++ b/hail/python/hailtop/batch_client/aioclient.py @@ -1,4 +1,4 @@ -from typing import AsyncIterator, Optional, Dict, Any, List, Tuple, Union +from typing import Optional, Dict, Any, List, Tuple, Union, AsyncIterator, TypedDict, cast import math import random import logging @@ -17,6 +17,7 @@ from hailtop.utils.rich_progress_bar import is_notebook, BatchProgressBar, BatchProgressBarTask from hailtop import httpx +from .types import GetJobsResponseV1Alpha, JobListEntryV1Alpha, GetJobResponseV1Alpha from .globals import tasks, complete_states log = logging.getLogger('batch_client.aioclient') @@ -173,7 +174,7 @@ def _get_duration(container_status): return sum(durations) # type: ignore @staticmethod - def submitted_job(batch: 'Batch', job_id: int, _status: Optional[dict] = None): + def submitted_job(batch: 'Batch', job_id: int, _status: Optional[GetJobResponseV1Alpha] = None): return Job(batch, AbsoluteJobId(job_id), _status=_status) @staticmethod @@ -184,7 +185,7 @@ def __init__(self, batch: 'Batch', job_id: Union[AbsoluteJobId, InUpdateJobId], *, - _status: Optional[dict] = None): + _status: Optional[GetJobResponseV1Alpha] = None): self._batch = batch self._job_id = job_id self._status = _status @@ -227,7 +228,9 @@ async def attributes(self): if not self._status: await self.status() assert self._status is not None - return self._status['attributes'] + if 'attributes' in self._status: + return self._status['attributes'] + return {} async def _is_job_in_state(self, states): await self.status() @@ -267,9 +270,12 @@ async def status(self) -> Dict[str, Any]: return self._status async def wait(self) -> Dict[str, Any]: - return await self._wait_for_states(*complete_states) + return cast( + Dict[str, Any], # https://stackoverflow.com/a/76515675/6823256 + await self._wait_for_states(*complete_states) + ) - async def _wait_for_states(self, *states: str) -> Dict[str, Any]: + async def _wait_for_states(self, *states: str) -> GetJobResponseV1Alpha: tries = 0 while True: if await self._is_job_in_state(states) or await self.is_complete(): @@ -307,6 +313,10 @@ class BatchAlreadyCreatedError(Exception): pass +class BatchDebugInfo(TypedDict): + status: Dict[str, Any] + jobs: List[JobListEntryV1Alpha] + class Batch: def __init__(self, client: 'BatchClient', @@ -356,7 +366,10 @@ async def cancel(self): self._raise_if_not_created() await self._client._patch(f'/api/v1alpha/batches/{self.id}/cancel') - async def jobs(self, q: Optional[str] = None, version: Optional[int] = None) -> AsyncIterator[Dict[str, Any]]: + async def jobs(self, + q: Optional[str] = None, + version: Optional[int] = None + ) -> AsyncIterator[JobListEntryV1Alpha]: self._raise_if_not_created() if version is None: version = 1 @@ -368,7 +381,10 @@ async def jobs(self, q: Optional[str] = None, version: Optional[int] = None) -> if last_job_id is not None: params['last_job_id'] = last_job_id resp = await self._client._get(f'/api/v{version}alpha/batches/{self.id}/jobs', params=params) - body = await resp.json() + body = cast( + GetJobsResponseV1Alpha, + await resp.json() + ) for job in body['jobs']: yield job last_job_id = body.get('last_job_id') @@ -463,11 +479,17 @@ async def wait(self, with BatchProgressBar(disable=disable_progress_bar) as progress2: return await self._wait(description, progress2, disable_progress_bar, starting_job) - async def debug_info(self): + async def debug_info(self, + _jobs_query_string: Optional[str] = None, + _max_jobs: Optional[int] = None, + ) -> BatchDebugInfo: self._raise_if_not_created() batch_status = await self.status() jobs = [] - async for j_status in self.jobs(): + async for j_status in self.jobs(q=_jobs_query_string): + if _max_jobs and len(jobs) == _max_jobs: + break + id = j_status['job_id'] log, job = await asyncio.gather(self.get_job_log(id), self.get_job(id)) jobs.append({'log': log, 'status': job._status}) @@ -926,7 +948,10 @@ async def list_batches(self, q=None, last_batch_id=None, limit=2 ** 64, version= async def get_job(self, batch_id, job_id): b = await self.get_batch(batch_id) j_resp = await self._get(f'/api/v1alpha/batches/{batch_id}/jobs/{job_id}') - j = await j_resp.json() + j = cast( + GetJobResponseV1Alpha, + await j_resp.json() + ) return Job.submitted_job(b, j['job_id'], _status=j) async def get_job_log(self, batch_id, job_id) -> Dict[str, Any]: diff --git a/hail/python/hailtop/batch_client/types.py b/hail/python/hailtop/batch_client/types.py new file mode 100644 index 00000000000..2697cb34ee6 --- /dev/null +++ b/hail/python/hailtop/batch_client/types.py @@ -0,0 +1,43 @@ +from typing import TypedDict, Literal, Optional, List, Any, Dict +from typing_extensions import NotRequired + + +class CostBreakdownEntry(TypedDict): + resource: str + cost: float + + +class GetJobResponseV1Alpha(TypedDict): + batch_id: int + job_id: int + name: Optional[str] + user: str + billing_project: str + state: Literal['Pending', 'Ready', 'Creating', 'Running', 'Failed', 'Cancelled', 'Error', 'Success'] + exit_code: Optional[int] + duration: Optional[int] + cost: Optional[float] + msec_mcpu: int + cost_breakdown: List[CostBreakdownEntry] + status: Optional[Dict[str, Any]] + spec: Optional[Dict[str, Any]] + attributes: NotRequired[Dict[str, str]] + + +class JobListEntryV1Alpha(TypedDict): + batch_id: int + job_id: int + name: Optional[str] + user: str + billing_project: str + state: Literal['Pending', 'Ready', 'Creating', 'Running', 'Failed', 'Cancelled', 'Error', 'Success'] + exit_code: Optional[int] + duration: Optional[int] + cost: Optional[float] + msec_mcpu: int + cost_breakdown: List[CostBreakdownEntry] + + +class GetJobsResponseV1Alpha(TypedDict): + jobs: List[JobListEntryV1Alpha] + last_job_id: NotRequired[int] diff --git a/hail/python/hailtop/hailctl/batch/cli.py b/hail/python/hailtop/hailctl/batch/cli.py index 09bd5a740b9..c8e0017f9f3 100644 --- a/hail/python/hailtop/hailctl/batch/cli.py +++ b/hail/python/hailtop/hailctl/batch/cli.py @@ -4,7 +4,7 @@ from typer import Option as Opt, Argument as Arg import json -from typing import Optional, List, Annotated as Ann +from typing import Optional, List, Annotated as Ann, cast, Dict, Any from . import list_batches from . import billing @@ -146,7 +146,12 @@ def job(batch_id: int, job_id: int, output: StructuredFormatOption = StructuredF if job is not None: assert job._status - print(make_formatter(output)([job._status])) + print(make_formatter(output)([ + cast( + Dict[str, Any], # https://stackoverflow.com/q/71986632/6823256 + job._status + ) + ])) else: print(f"Job with ID {job_id} on batch {batch_id} not found") diff --git a/hail/python/hailtop/utils/utils.py b/hail/python/hailtop/utils/utils.py index ba0b0eea65d..574ce4646c0 100644 --- a/hail/python/hailtop/utils/utils.py +++ b/hail/python/hailtop/utils/utils.py @@ -74,7 +74,7 @@ def first_extant_file(*files: Optional[str]) -> Optional[str]: return None -def cost_str(cost: Optional[int]) -> Optional[str]: +def cost_str(cost: Optional[float]) -> Optional[str]: if cost is None: return None if cost == 0.0: