From 8166e43b0d27bb504d944403bf6537c3fea3c364 Mon Sep 17 00:00:00 2001 From: Dan King Date: Fri, 25 Aug 2023 11:20:42 -0400 Subject: [PATCH 01/11] [qob][batch] do not list all jobs on failure (plus: types!) CHANGELOG: In Query-on-Batch, the client-side Python code will not try to list every job when a QoB batch fails. This could take hours for long-running pipelines or pipelines with many partitions. I also added API types to the list jobs end point because I have to go hunting for this every time anyway. Seems better to have this information at our digital fingertips. --- batch/batch/batch.py | 8 ++--- batch/batch/batch_format_version.py | 3 +- batch/batch/front_end/front_end.py | 11 ++++-- hail/python/hail/backend/service_backend.py | 10 ++++-- hail/python/hailtop/batch_client/__init__.py | 3 +- hail/python/hailtop/batch_client/aioclient.py | 36 ++++++++++++++----- hail/python/hailtop/batch_client/types.py | 20 +++++++++++ 7 files changed, 72 insertions(+), 19 deletions(-) create mode 100644 hail/python/hailtop/batch_client/types.py diff --git a/batch/batch/batch.py b/batch/batch/batch.py index 19b7f092b64..1a9bf70c0cb 100644 --- a/batch/batch/batch.py +++ b/batch/batch/batch.py @@ -1,8 +1,10 @@ +from typing import Any, Dict import json import logging from gear import transaction from hailtop.utils import humanize_timedelta_msecs, time_msecs_str +from hailtop.batch_client.types import JobListEntry from .batch_format_version import BatchFormatVersion from .exceptions import NonExistentBatchError, OpenBatchError @@ -66,7 +68,7 @@ def _time_msecs_str(t): return d -def job_record_to_dict(record, name): +def job_record_to_dict(record: Dict[str, Any], name: str) -> JobListEntry: format_version = BatchFormatVersion(record['format_version']) db_status = record['status'] @@ -77,7 +79,7 @@ def job_record_to_dict(record, name): exit_code = None duration = None - result = { + return { 'batch_id': record['batch_id'], 'job_id': record['job_id'], 'name': name, @@ -90,8 +92,6 @@ 
def job_record_to_dict(record, name): 'msec_mcpu': record['msec_mcpu'], } - return result - async def cancel_batch_in_db(db, batch_id): @transaction(db) diff --git a/batch/batch/batch_format_version.py b/batch/batch/batch_format_version.py index 088885ef312..683faa1eb39 100644 --- a/batch/batch/batch_format_version.py +++ b/batch/batch/batch_format_version.py @@ -1,3 +1,4 @@ +from typing import Tuple, Optional from hailtop.batch_client.aioclient import Job @@ -117,7 +118,7 @@ def db_status(self, status): return [ec, duration] - def get_status_exit_code_duration(self, status): + def get_status_exit_code_duration(self, status) -> Tuple[Optional[int], Optional[int]]: if self.format_version == 1: job_status = {'status': status} return (Job.exit_code(job_status), Job.total_duration_msecs(job_status)) diff --git a/batch/batch/front_end/front_end.py b/batch/batch/front_end/front_end.py index 4badd5dea48..81be9c49961 100644 --- a/batch/batch/front_end/front_end.py +++ b/batch/batch/front_end/front_end.py @@ -44,6 +44,7 @@ from gear.database import CallError from gear.profiling import install_profiler_if_requested from hailtop import aiotools, dictfix, httpx, version +from hailtop.batch_client.types import JobListEntry, GetJobsResponse from hailtop.batch_client.parse import parse_cpu_in_mcpu, parse_memory_in_bytes, parse_storage_in_bytes from hailtop.config import get_deploy_config from hailtop.hail_logging import AccessLogger @@ -252,7 +253,9 @@ async def _handle_api_error(f: Callable[P, Awaitable[T]], *args: P.args, **kwarg raise e.http_response() -async def _query_batch_jobs(request: web.Request, batch_id: int, version: int, q: str, last_job_id: Optional[int]): +async def _query_batch_jobs( + request: web.Request, batch_id: int, version: int, q: str, last_job_id: Optional[int] +) -> Tuple[List[JobListEntry], Optional[int]]: db: Database = request.app['db'] if version == 1: sql, sql_args = parse_batch_jobs_query_v1(batch_id, q, last_job_id) @@ -270,7 +273,9 @@ async def 
_query_batch_jobs(request: web.Request, batch_id: int, version: int, q return (jobs, last_job_id) -async def _get_jobs(request, batch_id: int, version: int, q: str, last_job_id: Optional[int]): +async def _get_jobs( + request: web.Request, batch_id: int, version: int, q: str, last_job_id: Optional[int] +) -> GetJobsResponse: db = request.app['db'] record = await db.select_and_fetchone( @@ -285,7 +290,7 @@ async def _get_jobs(request, batch_id: int, version: int, q: str, last_job_id: O jobs, last_job_id = await _query_batch_jobs(request, batch_id, version, q, last_job_id) - resp = {'jobs': jobs} + resp: GetJobsResponse = {'jobs': jobs} if last_job_id is not None: resp['last_job_id'] = last_job_id return resp diff --git a/hail/python/hail/backend/service_backend.py b/hail/python/hail/backend/service_backend.py index 3a82a0df61c..3c75b1cb140 100644 --- a/hail/python/hail/backend/service_backend.py +++ b/hail/python/hail/backend/service_backend.py @@ -493,7 +493,10 @@ async def _read_output(self, ir: Optional[BaseIR], output_uri: str, input_uri: s except FileNotFoundError as exc: raise FatalError('Hail internal error. Please contact the Hail team and provide the following information.\n\n' + yamlx.dump({ 'service_backend_debug_info': self.debug_info(), - 'batch_debug_info': await self._batch.debug_info(), + 'batch_debug_info': await self._batch.debug_info( + _job_filter=lambda x: x['state'] == 'Failed', + _max_jobs=10 + ), 'input_uri': await self._async_fs.read(input_uri), })) from exc @@ -514,7 +517,10 @@ async def _read_output(self, ir: Optional[BaseIR], output_uri: str, input_uri: s except UnexpectedEOFError as exc: raise FatalError('Hail internal error. 
Please contact the Hail team and provide the following information.\n\n' + yamlx.dump({ 'service_backend_debug_info': self.debug_info(), - 'batch_debug_info': await self._batch.debug_info(), + 'batch_debug_info': await self._batch.debug_info( + _job_filter=lambda x: x['state'] == 'Failed', + _max_jobs=10 + ), 'in': await self._async_fs.read(input_uri), 'out': await self._async_fs.read(output_uri), })) from exc diff --git a/hail/python/hailtop/batch_client/__init__.py b/hail/python/hailtop/batch_client/__init__.py index 60de0b330b8..f9219abf05d 100644 --- a/hail/python/hailtop/batch_client/__init__.py +++ b/hail/python/hailtop/batch_client/__init__.py @@ -1,4 +1,4 @@ -from . import client, aioclient, parse +from . import client, aioclient, parse, types from .aioclient import BatchAlreadyCreatedError, BatchNotCreatedError, JobAlreadySubmittedError, JobNotSubmittedError __all__ = [ @@ -9,4 +9,5 @@ 'client', 'aioclient', 'parse', + 'types', ] diff --git a/hail/python/hailtop/batch_client/aioclient.py b/hail/python/hailtop/batch_client/aioclient.py index 9fcda0fc8f7..02214c4ab00 100644 --- a/hail/python/hailtop/batch_client/aioclient.py +++ b/hail/python/hailtop/batch_client/aioclient.py @@ -1,4 +1,5 @@ -from typing import Optional, Dict, Any, List, Tuple, Union +from typing import Optional, Dict, Any, List, Tuple, Union, AsyncIterable, TypedDict, Callable, cast +from enum import Enum import math import random import logging @@ -17,6 +18,7 @@ from hailtop.utils.rich_progress_bar import is_notebook, BatchProgressBar, BatchProgressBarTask from hailtop import httpx +from .types import GetJobsResponse, JobListEntry from .globals import tasks, complete_states log = logging.getLogger('batch_client.aioclient') @@ -306,6 +308,10 @@ class BatchAlreadyCreatedError(Exception): pass +class BatchDebugInfo(TypedDict): + status: Dict[str, Any] + jobs: List[JobListEntry] + class Batch: def __init__(self, client: 'BatchClient', @@ -352,11 +358,15 @@ def is_created(self): return self._id 
is not None async def cancel(self): - self._raise_if_not_created() +v self._raise_if_not_created() await self._client._patch(f'/api/v1alpha/batches/{self.id}/cancel') - async def jobs(self, q: Optional[str] = None, version: Optional[int] = None): + async def jobs(self, + q: Optional[str] = None, + version: Optional[int] = None + ) -> AsyncIterable[JobListEntry]: self._raise_if_not_created() +>>>>>>> Stashed changes if version is None: version = 1 last_job_id = None @@ -367,7 +377,10 @@ async def jobs(self, q: Optional[str] = None, version: Optional[int] = None): if last_job_id is not None: params['last_job_id'] = last_job_id resp = await self._client._get(f'/api/v{version}alpha/batches/{self.id}/jobs', params=params) - body = await resp.json() + body = cast( + GetJobsResponse, + await resp.json() + ) for job in body['jobs']: yield job last_job_id = body.get('last_job_id') @@ -462,14 +475,21 @@ async def wait(self, with BatchProgressBar(disable=disable_progress_bar) as progress2: return await self._wait(description, progress2, disable_progress_bar, starting_job) - async def debug_info(self): + async def debug_info(self, + _job_filter: Optional[Callable[[JobListEntry], bool]] = None, + _max_jobs: Optional[int] = None + ) -> BatchDebugInfo: self._raise_if_not_created() batch_status = await self.status() jobs = [] async for j_status in self.jobs(): - id = j_status['job_id'] - log, job = await asyncio.gather(self.get_job_log(id), self.get_job(id)) - jobs.append({'log': log, 'status': job._status}) + if _max_jobs and len(jobs) == _max_jobs: + break + + if _job_filter is None or _job_filter(j_status): + id = j_status['job_id'] + log, job = await asyncio.gather(self.get_job_log(id), self.get_job(id)) + jobs.append({'log': log, 'status': job._status}) return {'status': batch_status, 'jobs': jobs} async def delete(self): diff --git a/hail/python/hailtop/batch_client/types.py b/hail/python/hailtop/batch_client/types.py new file mode 100644 index 00000000000..5dccb061563 --- 
/dev/null +++ b/hail/python/hailtop/batch_client/types.py @@ -0,0 +1,20 @@ +from typing import TypedDict, Literal, Optional, List +from typing_extensions import NotRequired + + +class JobListEntry(TypedDict): + batch_id: int + job_id: int + name: str + user: str + billing_project: str + state: Literal['Pending', 'Ready', 'Creating', 'Running', 'Failed', 'Cancelled', 'Error', 'Success'] + exit_code: Optional[int] + duration: Optional[int] + cost: float + msec_mcpu: int + + +class GetJobsResponse(TypedDict): + jobs: List[JobListEntry] + last_job_id: NotRequired[int] From 8b1ff3b52e99768431bff7f93971abe22f50208f Mon Sep 17 00:00:00 2001 From: Dan King Date: Fri, 25 Aug 2023 11:22:25 -0400 Subject: [PATCH 02/11] rebase cruft --- hail/python/hailtop/batch_client/aioclient.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/hail/python/hailtop/batch_client/aioclient.py b/hail/python/hailtop/batch_client/aioclient.py index 02214c4ab00..73cfa80273c 100644 --- a/hail/python/hailtop/batch_client/aioclient.py +++ b/hail/python/hailtop/batch_client/aioclient.py @@ -358,7 +358,7 @@ def is_created(self): return self._id is not None async def cancel(self): -v self._raise_if_not_created() + self._raise_if_not_created() await self._client._patch(f'/api/v1alpha/batches/{self.id}/cancel') async def jobs(self, @@ -366,7 +366,6 @@ async def jobs(self, version: Optional[int] = None ) -> AsyncIterable[JobListEntry]: self._raise_if_not_created() ->>>>>>> Stashed changes if version is None: version = 1 last_job_id = None From 476559817150752cbfaaa3cf79f9bfeb8601f1bb Mon Sep 17 00:00:00 2001 From: Dan King Date: Fri, 25 Aug 2023 11:47:35 -0400 Subject: [PATCH 03/11] add v1 --- hail/python/hailtop/batch_client/aioclient.py | 10 +++++----- hail/python/hailtop/batch_client/types.py | 6 +++--- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/hail/python/hailtop/batch_client/aioclient.py b/hail/python/hailtop/batch_client/aioclient.py index 
73cfa80273c..8aa2e8d403b 100644 --- a/hail/python/hailtop/batch_client/aioclient.py +++ b/hail/python/hailtop/batch_client/aioclient.py @@ -18,7 +18,7 @@ from hailtop.utils.rich_progress_bar import is_notebook, BatchProgressBar, BatchProgressBarTask from hailtop import httpx -from .types import GetJobsResponse, JobListEntry +from .types import GetJobsResponsv1, JobListEntryv1 from .globals import tasks, complete_states log = logging.getLogger('batch_client.aioclient') @@ -310,7 +310,7 @@ class BatchAlreadyCreatedError(Exception): class BatchDebugInfo(TypedDict): status: Dict[str, Any] - jobs: List[JobListEntry] + jobs: List[JobListEntryv1] class Batch: def __init__(self, @@ -364,7 +364,7 @@ async def cancel(self): async def jobs(self, q: Optional[str] = None, version: Optional[int] = None - ) -> AsyncIterable[JobListEntry]: + ) -> AsyncIterable[JobListEntryv1]: self._raise_if_not_created() if version is None: version = 1 @@ -377,7 +377,7 @@ async def jobs(self, params['last_job_id'] = last_job_id resp = await self._client._get(f'/api/v{version}alpha/batches/{self.id}/jobs', params=params) body = cast( - GetJobsResponse, + GetJobsResponsev1, await resp.json() ) for job in body['jobs']: @@ -475,7 +475,7 @@ async def wait(self, return await self._wait(description, progress2, disable_progress_bar, starting_job) async def debug_info(self, - _job_filter: Optional[Callable[[JobListEntry], bool]] = None, + _job_filter: Optional[Callable[[JobListEntryv1], bool]] = None, _max_jobs: Optional[int] = None ) -> BatchDebugInfo: self._raise_if_not_created() diff --git a/hail/python/hailtop/batch_client/types.py b/hail/python/hailtop/batch_client/types.py index 5dccb061563..443031dec52 100644 --- a/hail/python/hailtop/batch_client/types.py +++ b/hail/python/hailtop/batch_client/types.py @@ -2,7 +2,7 @@ from typing_extensions import NotRequired -class JobListEntry(TypedDict): +class JobListEntryv1(TypedDict): batch_id: int job_id: int name: str @@ -15,6 +15,6 @@ class 
JobListEntry(TypedDict): msec_mcpu: int -class GetJobsResponse(TypedDict): - jobs: List[JobListEntry] +class GetJobsResponsev1(TypedDict): + jobs: List[JobListEntryv1] last_job_id: NotRequired[int] From b0bd3b568cdb7b04b06531985cd34e6da800f712 Mon Sep 17 00:00:00 2001 From: Dan King Date: Fri, 25 Aug 2023 11:50:06 -0400 Subject: [PATCH 04/11] misspelling --- hail/python/hailtop/batch_client/aioclient.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hail/python/hailtop/batch_client/aioclient.py b/hail/python/hailtop/batch_client/aioclient.py index 8aa2e8d403b..3f42eb10586 100644 --- a/hail/python/hailtop/batch_client/aioclient.py +++ b/hail/python/hailtop/batch_client/aioclient.py @@ -18,7 +18,7 @@ from hailtop.utils.rich_progress_bar import is_notebook, BatchProgressBar, BatchProgressBarTask from hailtop import httpx -from .types import GetJobsResponsv1, JobListEntryv1 +from .types import GetJobsResponsev1, JobListEntryv1 from .globals import tasks, complete_states log = logging.getLogger('batch_client.aioclient') From 16649c7ceb06fdaebdb850f79d6d8fede0e4a1ec Mon Sep 17 00:00:00 2001 From: Dan King Date: Fri, 25 Aug 2023 11:54:09 -0400 Subject: [PATCH 05/11] use query string instead of client-side filtering --- hail/python/hail/backend/service_backend.py | 4 ++-- hail/python/hailtop/batch_client/aioclient.py | 13 ++++++------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/hail/python/hail/backend/service_backend.py b/hail/python/hail/backend/service_backend.py index 3c75b1cb140..4478e2efe85 100644 --- a/hail/python/hail/backend/service_backend.py +++ b/hail/python/hail/backend/service_backend.py @@ -494,7 +494,7 @@ async def _read_output(self, ir: Optional[BaseIR], output_uri: str, input_uri: s raise FatalError('Hail internal error.
Please contact the Hail team and provide the following information.\n\n' + yamlx.dump({ 'service_backend_debug_info': self.debug_info(), 'batch_debug_info': await self._batch.debug_info( - _job_filter=lambda x: x['state'] == 'Failed', + _jobs_query_string='failed', _max_jobs=10 ), 'input_uri': await self._async_fs.read(input_uri), @@ -518,7 +518,7 @@ async def _read_output(self, ir: Optional[BaseIR], output_uri: str, input_uri: s raise FatalError('Hail internal error. Please contact the Hail team and provide the following information.\n\n' + yamlx.dump({ 'service_backend_debug_info': self.debug_info(), 'batch_debug_info': await self._batch.debug_info( - _job_filter=lambda x: x['state'] == 'Failed', + _jobs_query_string='failed', _max_jobs=10 ), 'in': await self._async_fs.read(input_uri), diff --git a/hail/python/hailtop/batch_client/aioclient.py b/hail/python/hailtop/batch_client/aioclient.py index 3f42eb10586..c2b3a377f22 100644 --- a/hail/python/hailtop/batch_client/aioclient.py +++ b/hail/python/hailtop/batch_client/aioclient.py @@ -475,20 +475,19 @@ async def wait(self, return await self._wait(description, progress2, disable_progress_bar, starting_job) async def debug_info(self, - _job_filter: Optional[Callable[[JobListEntryv1], bool]] = None, - _max_jobs: Optional[int] = None + _jobs_query_string: Optional[str] = None, + _max_jobs: Optional[int] = None, ) -> BatchDebugInfo: self._raise_if_not_created() batch_status = await self.status() jobs = [] - async for j_status in self.jobs(): + async for j_status in self.jobs(q=_jobs_query_string): if _max_jobs and len(jobs) == _max_jobs: break - if _job_filter is None or _job_filter(j_status): - id = j_status['job_id'] - log, job = await asyncio.gather(self.get_job_log(id), self.get_job(id)) - jobs.append({'log': log, 'status': job._status}) + id = j_status['job_id'] + log, job = await asyncio.gather(self.get_job_log(id), self.get_job(id)) + jobs.append({'log': log, 'status': job._status}) return {'status': 
batch_status, 'jobs': jobs} async def delete(self): From 902ac1339dbd94834d49d9fe5140f27f4e6a11d1 Mon Sep 17 00:00:00 2001 From: Dan King Date: Fri, 25 Aug 2023 12:38:13 -0400 Subject: [PATCH 06/11] match versioning in URLs --- hail/python/hailtop/batch_client/aioclient.py | 8 ++++---- hail/python/hailtop/batch_client/types.py | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/hail/python/hailtop/batch_client/aioclient.py b/hail/python/hailtop/batch_client/aioclient.py index c2b3a377f22..787eee006a0 100644 --- a/hail/python/hailtop/batch_client/aioclient.py +++ b/hail/python/hailtop/batch_client/aioclient.py @@ -18,7 +18,7 @@ from hailtop.utils.rich_progress_bar import is_notebook, BatchProgressBar, BatchProgressBarTask from hailtop import httpx -from .types import GetJobsResponsev1, JobListEntryv1 +from .types import GetJobsResponseV1Alpha, JobListEntryV1Alpha from .globals import tasks, complete_states log = logging.getLogger('batch_client.aioclient') @@ -310,7 +310,7 @@ class BatchAlreadyCreatedError(Exception): class BatchDebugInfo(TypedDict): status: Dict[str, Any] - jobs: List[JobListEntryv1] + jobs: List[JobListEntryV1Alpha] class Batch: def __init__(self, @@ -364,7 +364,7 @@ async def cancel(self): async def jobs(self, q: Optional[str] = None, version: Optional[int] = None - ) -> AsyncIterable[JobListEntryv1]: + ) -> AsyncIterable[JobListEntryV1Alpha]: self._raise_if_not_created() if version is None: version = 1 @@ -377,7 +377,7 @@ async def jobs(self, params['last_job_id'] = last_job_id resp = await self._client._get(f'/api/v{version}alpha/batches/{self.id}/jobs', params=params) body = cast( - GetJobsResponsev1, + GetJobsResponseV1Alpha, await resp.json() ) for job in body['jobs']: diff --git a/hail/python/hailtop/batch_client/types.py b/hail/python/hailtop/batch_client/types.py index 443031dec52..8539bb7a935 100644 --- a/hail/python/hailtop/batch_client/types.py +++ b/hail/python/hailtop/batch_client/types.py @@ -2,7 +2,7 @@ from 
typing_extensions import NotRequired -class JobListEntryv1(TypedDict): +class JobListEntryV1Alpha(TypedDict): batch_id: int job_id: int name: str @@ -15,6 +15,6 @@ class JobListEntryv1(TypedDict): msec_mcpu: int -class GetJobsResponsev1(TypedDict): - jobs: List[JobListEntryv1] +class GetJobsResponseV1Alpha(TypedDict): + jobs: List[JobListEntryV1Alpha] last_job_id: NotRequired[int] From c86c4b95024a9daa43fda0c5405c18332cb85d49 Mon Sep 17 00:00:00 2001 From: Dan King Date: Thu, 31 Aug 2023 14:30:48 -0400 Subject: [PATCH 07/11] also fix front_end.py --- batch/batch/front_end/front_end.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/batch/batch/front_end/front_end.py b/batch/batch/front_end/front_end.py index 81be9c49961..35921fd59f9 100644 --- a/batch/batch/front_end/front_end.py +++ b/batch/batch/front_end/front_end.py @@ -44,7 +44,7 @@ from gear.database import CallError from gear.profiling import install_profiler_if_requested from hailtop import aiotools, dictfix, httpx, version -from hailtop.batch_client.types import JobListEntry, GetJobsResponse +from hailtop.batch_client.types import JobListEntryV1Alpha, GetJobsResponseV1Alpha from hailtop.batch_client.parse import parse_cpu_in_mcpu, parse_memory_in_bytes, parse_storage_in_bytes from hailtop.config import get_deploy_config from hailtop.hail_logging import AccessLogger @@ -255,7 +255,7 @@ async def _handle_api_error(f: Callable[P, Awaitable[T]], *args: P.args, **kwarg async def _query_batch_jobs( request: web.Request, batch_id: int, version: int, q: str, last_job_id: Optional[int] -) -> Tuple[List[JobListEntry], Optional[int]]: +) -> Tuple[List[JobListEntryV1Alpha], Optional[int]]: db: Database = request.app['db'] if version == 1: sql, sql_args = parse_batch_jobs_query_v1(batch_id, q, last_job_id) @@ -275,7 +275,7 @@ async def _query_batch_jobs( async def _get_jobs( request: web.Request, batch_id: int, version: int, q: str, last_job_id: Optional[int] -) -> GetJobsResponse: +) -> 
GetJobsResponseV1Alpha: db = request.app['db'] record = await db.select_and_fetchone( @@ -290,7 +290,7 @@ async def _get_jobs( jobs, last_job_id = await _query_batch_jobs(request, batch_id, version, q, last_job_id) - resp: GetJobsResponse = {'jobs': jobs} + resp: GetJobsResponseV1Alpha = {'jobs': jobs} if last_job_id is not None: resp['last_job_id'] = last_job_id return resp From eec7cb930f5ce1e2439a009674b951cdd051b83b Mon Sep 17 00:00:00 2001 From: Dan King Date: Thu, 31 Aug 2023 18:39:56 -0400 Subject: [PATCH 08/11] sort import blocks --- batch/batch/batch.py | 3 +-- batch/batch/batch_format_version.py | 3 ++- batch/batch/front_end/front_end.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/batch/batch/batch.py b/batch/batch/batch.py index 4227084f5f3..0798252a4a9 100644 --- a/batch/batch/batch.py +++ b/batch/batch/batch.py @@ -1,11 +1,10 @@ -from typing import Any, Dict import json import logging from typing import Any, Dict, Optional from gear import transaction -from hailtop.utils import humanize_timedelta_msecs, time_msecs_str from hailtop.batch_client.types import JobListEntryV1Alpha +from hailtop.utils import humanize_timedelta_msecs, time_msecs_str from .batch_format_version import BatchFormatVersion from .exceptions import NonExistentBatchError, OpenBatchError diff --git a/batch/batch/batch_format_version.py b/batch/batch/batch_format_version.py index 683faa1eb39..06c8dc07f2a 100644 --- a/batch/batch/batch_format_version.py +++ b/batch/batch/batch_format_version.py @@ -1,4 +1,5 @@ -from typing import Tuple, Optional +from typing import Optional, Tuple + from hailtop.batch_client.aioclient import Job diff --git a/batch/batch/front_end/front_end.py b/batch/batch/front_end/front_end.py index 80ff8d1c8ad..1918ee6ff48 100644 --- a/batch/batch/front_end/front_end.py +++ b/batch/batch/front_end/front_end.py @@ -44,8 +44,8 @@ from gear.database import CallError from gear.profiling import install_profiler_if_requested from hailtop import 
aiotools, dictfix, httpx, version -from hailtop.batch_client.types import JobListEntryV1Alpha, GetJobsResponseV1Alpha, GetJobResponseV1Alpha from hailtop.batch_client.parse import parse_cpu_in_mcpu, parse_memory_in_bytes, parse_storage_in_bytes +from hailtop.batch_client.types import GetJobResponseV1Alpha, GetJobsResponseV1Alpha, JobListEntryV1Alpha from hailtop.config import get_deploy_config from hailtop.hail_logging import AccessLogger from hailtop.tls import internal_server_ssl_context From 02295e6f8c93ff5c9bf644de267403bea07a4365 Mon Sep 17 00:00:00 2001 From: Dan King Date: Fri, 1 Sep 2023 11:26:29 -0400 Subject: [PATCH 09/11] cleanup merge conflict issues --- batch/batch/batch.py | 6 +++--- hail/python/hailtop/batch_client/types.py | 7 +++++++ 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/batch/batch/batch.py b/batch/batch/batch.py index ad182b39df9..355fa063314 100644 --- a/batch/batch/batch.py +++ b/batch/batch/batch.py @@ -1,9 +1,9 @@ import json import logging -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, List from gear import transaction -from hailtop.batch_client.types import JobListEntryV1Alpha +from hailtop.batch_client.types import JobListEntryV1Alpha, CostBreakdownEntry from hailtop.utils import humanize_timedelta_msecs, time_msecs_str from .batch_format_version import BatchFormatVersion @@ -13,7 +13,7 @@ log = logging.getLogger('batch') -def cost_breakdown_to_dict(cost_breakdown: dict): +def cost_breakdown_to_dict(cost_breakdown: Dict[str, float]) -> List[CostBreakdownEntry]: return [{'resource': resource, 'cost': cost} for resource, cost in cost_breakdown.items()] diff --git a/hail/python/hailtop/batch_client/types.py b/hail/python/hailtop/batch_client/types.py index 64189726aa5..2697cb34ee6 100644 --- a/hail/python/hailtop/batch_client/types.py +++ b/hail/python/hailtop/batch_client/types.py @@ -2,6 +2,11 @@ from typing_extensions import NotRequired +class CostBreakdownEntry(TypedDict): + 
resource: str + cost: float + + class GetJobResponseV1Alpha(TypedDict): batch_id: int job_id: int @@ -13,6 +18,7 @@ class GetJobResponseV1Alpha(TypedDict): duration: Optional[int] cost: Optional[float] msec_mcpu: int + cost_breakdown: List[CostBreakdownEntry] status: Optional[Dict[str, Any]] spec: Optional[Dict[str, Any]] attributes: NotRequired[Dict[str, str]] @@ -29,6 +35,7 @@ class JobListEntryV1Alpha(TypedDict): duration: Optional[int] cost: Optional[float] msec_mcpu: int + cost_breakdown: List[CostBreakdownEntry] class GetJobsResponseV1Alpha(TypedDict): From ffdcfad5389a65bc227c0f1b5c1dfb5b01d0e278 Mon Sep 17 00:00:00 2001 From: Dan King Date: Fri, 1 Sep 2023 11:39:58 -0400 Subject: [PATCH 10/11] cast away types when creating UI elements --- batch/batch/front_end/front_end.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/batch/batch/front_end/front_end.py b/batch/batch/front_end/front_end.py index a9f96ba175c..ed111e36213 100644 --- a/batch/batch/front_end/front_end.py +++ b/batch/batch/front_end/front_end.py @@ -11,7 +11,7 @@ import traceback from functools import wraps from numbers import Number -from typing import Any, Awaitable, Callable, Dict, List, NoReturn, Optional, Tuple, TypeVar, Union +from typing import Any, Awaitable, Callable, Dict, List, NoReturn, Optional, Tuple, TypeVar, Union, cast import aiohttp import aiohttp.web_exceptions @@ -1744,7 +1744,7 @@ async def ui_batches(request: web.Request, userdata: UserData) -> web.Response: return await render_template('batch', request, userdata, 'batches.html', page_context) -async def _get_job(app, batch_id, job_id): +async def _get_job(app, batch_id, job_id) -> GetJobResponseV1Alpha: db: Database = app['db'] record = await db.select_and_fetchone( @@ -2079,6 +2079,8 @@ async def ui_get_job(request, userdata, batch_id): _get_job_resource_usage(app, batch_id, job_id), ) + job = cast(Dict[str, Any], job) + job['duration'] = humanize_timedelta_msecs(job['duration']) job['cost'] = 
cost_str(job['cost']) From 629c3becff4bbf25a3facefe0c173440233c1667 Mon Sep 17 00:00:00 2001 From: Dan King Date: Fri, 1 Sep 2023 14:01:41 -0400 Subject: [PATCH 11/11] sort --- batch/batch/batch.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/batch/batch/batch.py b/batch/batch/batch.py index 355fa063314..8eb5209a1e6 100644 --- a/batch/batch/batch.py +++ b/batch/batch/batch.py @@ -1,9 +1,9 @@ import json import logging -from typing import Any, Dict, Optional, List +from typing import Any, Dict, List, Optional from gear import transaction -from hailtop.batch_client.types import JobListEntryV1Alpha, CostBreakdownEntry +from hailtop.batch_client.types import CostBreakdownEntry, JobListEntryV1Alpha from hailtop.utils import humanize_timedelta_msecs, time_msecs_str from .batch_format_version import BatchFormatVersion