Skip to content

Commit

Permalink
[qob][batch] do not list all jobs on failure (plus: types!) (#13500)
Browse files Browse the repository at this point in the history
CHANGELOG: In Query-on-Batch, the client-side Python code will not try
to list every job when a QoB batch fails. This could take hours for
long-running pipelines or pipelines with many partitions.

I also added API types to the list jobs endpoint because I have to go
hunting for this every time anyway. Seems better to have this
information at our digital fingertips.
  • Loading branch information
danking authored Sep 6, 2023
1 parent 4d0379b commit 4fb659c
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 46 deletions.
11 changes: 5 additions & 6 deletions batch/batch/batch.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import json
import logging
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional

from gear import transaction
from hailtop.batch_client.types import CostBreakdownEntry, JobListEntryV1Alpha
from hailtop.utils import humanize_timedelta_msecs, time_msecs_str

from .batch_format_version import BatchFormatVersion
Expand All @@ -12,7 +13,7 @@
log = logging.getLogger('batch')


def cost_breakdown_to_dict(cost_breakdown: dict):
def cost_breakdown_to_dict(cost_breakdown: Dict[str, float]) -> List[CostBreakdownEntry]:
return [{'resource': resource, 'cost': cost} for resource, cost in cost_breakdown.items()]


Expand Down Expand Up @@ -75,7 +76,7 @@ def _time_msecs_str(t):
return d


def job_record_to_dict(record: Dict[str, Any], name: Optional[str]) -> Dict[str, Any]:
def job_record_to_dict(record: Dict[str, Any], name: Optional[str]) -> JobListEntryV1Alpha:
format_version = BatchFormatVersion(record['format_version'])

db_status = record['status']
Expand All @@ -89,7 +90,7 @@ def job_record_to_dict(record: Dict[str, Any], name: Optional[str]) -> Dict[str,
if record['cost_breakdown'] is not None:
record['cost_breakdown'] = cost_breakdown_to_dict(json.loads(record['cost_breakdown']))

result = {
return {
'batch_id': record['batch_id'],
'job_id': record['job_id'],
'name': name,
Expand All @@ -103,8 +104,6 @@ def job_record_to_dict(record: Dict[str, Any], name: Optional[str]) -> Dict[str,
'cost_breakdown': record['cost_breakdown'],
}

return result


async def cancel_batch_in_db(db, batch_id):
@transaction(db)
Expand Down
4 changes: 3 additions & 1 deletion batch/batch/batch_format_version.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Optional, Tuple

from hailtop.batch_client.aioclient import Job


Expand Down Expand Up @@ -117,7 +119,7 @@ def db_status(self, status):

return [ec, duration]

def get_status_exit_code_duration(self, status):
def get_status_exit_code_duration(self, status) -> Tuple[Optional[int], Optional[int]]:
if self.format_version == 1:
job_status = {'status': status}
return (Job.exit_code(job_status), Job.total_duration_msecs(job_status))
Expand Down
54 changes: 32 additions & 22 deletions batch/batch/front_end/front_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import traceback
from functools import wraps
from numbers import Number
from typing import Any, Awaitable, Callable, Dict, List, NoReturn, Optional, Tuple, TypeVar, Union
from typing import Any, Awaitable, Callable, Dict, List, NoReturn, Optional, Tuple, TypeVar, Union, cast

import aiohttp
import aiohttp.web_exceptions
Expand Down Expand Up @@ -45,6 +45,7 @@
from gear.profiling import install_profiler_if_requested
from hailtop import aiotools, dictfix, httpx, version
from hailtop.batch_client.parse import parse_cpu_in_mcpu, parse_memory_in_bytes, parse_storage_in_bytes
from hailtop.batch_client.types import GetJobResponseV1Alpha, GetJobsResponseV1Alpha, JobListEntryV1Alpha
from hailtop.config import get_deploy_config
from hailtop.hail_logging import AccessLogger
from hailtop.tls import internal_server_ssl_context
Expand Down Expand Up @@ -252,7 +253,9 @@ async def _handle_api_error(f: Callable[P, Awaitable[T]], *args: P.args, **kwarg
raise e.http_response()


async def _query_batch_jobs(request: web.Request, batch_id: int, version: int, q: str, last_job_id: Optional[int]):
async def _query_batch_jobs(
request: web.Request, batch_id: int, version: int, q: str, last_job_id: Optional[int]
) -> Tuple[List[JobListEntryV1Alpha], Optional[int]]:
db: Database = request.app['db']
if version == 1:
sql, sql_args = parse_batch_jobs_query_v1(batch_id, q, last_job_id)
Expand All @@ -270,7 +273,9 @@ async def _query_batch_jobs(request: web.Request, batch_id: int, version: int, q
return (jobs, last_job_id)


async def _get_jobs(request, batch_id: int, version: int, q: str, last_job_id: Optional[int]):
async def _get_jobs(
request: web.Request, batch_id: int, version: int, q: str, last_job_id: Optional[int]
) -> GetJobsResponseV1Alpha:
db = request.app['db']

record = await db.select_and_fetchone(
Expand Down Expand Up @@ -1739,7 +1744,7 @@ async def ui_batches(request: web.Request, userdata: UserData) -> web.Response:
return await render_template('batch', request, userdata, 'batches.html', page_context)


async def _get_job(app, batch_id, job_id):
async def _get_job(app, batch_id, job_id) -> GetJobResponseV1Alpha:
db: Database = app['db']

record = await db.select_and_fetchone(
Expand Down Expand Up @@ -1786,9 +1791,11 @@ async def _get_job(app, batch_id, job_id):
_get_full_job_status(app, record), _get_full_job_spec(app, record), _get_attributes(app, record)
)

job = job_record_to_dict(record, attributes.get('name'))
job['status'] = full_status
job['spec'] = full_spec
job: GetJobResponseV1Alpha = {
**job_record_to_dict(record, attributes.get('name')),
'status': full_status,
'spec': full_spec,
}
if attributes:
job['attributes'] = attributes
return job
Expand Down Expand Up @@ -2072,6 +2079,8 @@ async def ui_get_job(request, userdata, batch_id):
_get_job_resource_usage(app, batch_id, job_id),
)

job = cast(Dict[str, Any], job)

job['duration'] = humanize_timedelta_msecs(job['duration'])
job['cost'] = cost_str(job['cost'])

Expand Down Expand Up @@ -2131,21 +2140,22 @@ async def ui_get_job(request, userdata, batch_id):
non_io_storage_limit_bytes = None
memory_limit_bytes = None

resources = job_specification['resources']
if 'memory_bytes' in resources:
memory_limit_bytes = resources['memory_bytes']
resources['actual_memory'] = humanize.naturalsize(memory_limit_bytes, binary=True)
del resources['memory_bytes']
if 'storage_gib' in resources:
io_storage_limit_bytes = resources['storage_gib'] * 1024**3
resources['actual_storage'] = humanize.naturalsize(io_storage_limit_bytes, binary=True)
del resources['storage_gib']
if 'cores_mcpu' in resources:
cores = resources['cores_mcpu'] / 1000
non_io_storage_limit_gb = min(cores * RESERVED_STORAGE_GB_PER_CORE, RESERVED_STORAGE_GB_PER_CORE)
non_io_storage_limit_bytes = int(non_io_storage_limit_gb * 1024**3 + 1)
resources['actual_cpu'] = cores
del resources['cores_mcpu']
if job_specification is not None:
resources = job_specification['resources']
if 'memory_bytes' in resources:
memory_limit_bytes = resources['memory_bytes']
resources['actual_memory'] = humanize.naturalsize(memory_limit_bytes, binary=True)
del resources['memory_bytes']
if 'storage_gib' in resources:
io_storage_limit_bytes = resources['storage_gib'] * 1024**3
resources['actual_storage'] = humanize.naturalsize(io_storage_limit_bytes, binary=True)
del resources['storage_gib']
if 'cores_mcpu' in resources:
cores = resources['cores_mcpu'] / 1000
non_io_storage_limit_gb = min(cores * RESERVED_STORAGE_GB_PER_CORE, RESERVED_STORAGE_GB_PER_CORE)
non_io_storage_limit_bytes = int(non_io_storage_limit_gb * 1024**3 + 1)
resources['actual_cpu'] = cores
del resources['cores_mcpu']

# Not all logs will be proper utf-8 but we attempt to show them as
# str or else Jinja will present them surrounded by b''
Expand Down
10 changes: 8 additions & 2 deletions hail/python/hail/backend/service_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,10 @@ async def _read_output(self, ir: Optional[BaseIR], output_uri: str, input_uri: s
except FileNotFoundError as exc:
raise FatalError('Hail internal error. Please contact the Hail team and provide the following information.\n\n' + yamlx.dump({
'service_backend_debug_info': self.debug_info(),
'batch_debug_info': await self._batch.debug_info(),
'batch_debug_info': await self._batch.debug_info(
_jobs_query_string='failed',
_max_jobs=10
),
'input_uri': await self._async_fs.read(input_uri),
})) from exc

Expand All @@ -514,7 +517,10 @@ async def _read_output(self, ir: Optional[BaseIR], output_uri: str, input_uri: s
except UnexpectedEOFError as exc:
raise FatalError('Hail internal error. Please contact the Hail team and provide the following information.\n\n' + yamlx.dump({
'service_backend_debug_info': self.debug_info(),
'batch_debug_info': await self._batch.debug_info(),
'batch_debug_info': await self._batch.debug_info(
_jobs_query_string='failed',
_max_jobs=10
),
'in': await self._async_fs.read(input_uri),
'out': await self._async_fs.read(output_uri),
})) from exc
Expand Down
3 changes: 2 additions & 1 deletion hail/python/hailtop/batch_client/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from . import client, aioclient, parse
from . import client, aioclient, parse, types
from .aioclient import BatchAlreadyCreatedError, BatchNotCreatedError, JobAlreadySubmittedError, JobNotSubmittedError

__all__ = [
Expand All @@ -9,4 +9,5 @@
'client',
'aioclient',
'parse',
'types',
]
47 changes: 36 additions & 11 deletions hail/python/hailtop/batch_client/aioclient.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import AsyncIterator, Optional, Dict, Any, List, Tuple, Union
from typing import Optional, Dict, Any, List, Tuple, Union, AsyncIterator, TypedDict, cast
import math
import random
import logging
Expand All @@ -17,6 +17,7 @@
from hailtop.utils.rich_progress_bar import is_notebook, BatchProgressBar, BatchProgressBarTask
from hailtop import httpx

from .types import GetJobsResponseV1Alpha, JobListEntryV1Alpha, GetJobResponseV1Alpha
from .globals import tasks, complete_states

log = logging.getLogger('batch_client.aioclient')
Expand Down Expand Up @@ -173,7 +174,7 @@ def _get_duration(container_status):
return sum(durations) # type: ignore

@staticmethod
def submitted_job(batch: 'Batch', job_id: int, _status: Optional[dict] = None):
def submitted_job(batch: 'Batch', job_id: int, _status: Optional[GetJobResponseV1Alpha] = None):
return Job(batch, AbsoluteJobId(job_id), _status=_status)

@staticmethod
Expand All @@ -184,7 +185,7 @@ def __init__(self,
batch: 'Batch',
job_id: Union[AbsoluteJobId, InUpdateJobId],
*,
_status: Optional[dict] = None):
_status: Optional[GetJobResponseV1Alpha] = None):
self._batch = batch
self._job_id = job_id
self._status = _status
Expand Down Expand Up @@ -227,7 +228,9 @@ async def attributes(self):
if not self._status:
await self.status()
assert self._status is not None
return self._status['attributes']
if 'attributes' in self._status:
return self._status['attributes']
return {}

async def _is_job_in_state(self, states):
await self.status()
Expand Down Expand Up @@ -267,9 +270,12 @@ async def status(self) -> Dict[str, Any]:
return self._status

async def wait(self) -> Dict[str, Any]:
return await self._wait_for_states(*complete_states)
return cast(
Dict[str, Any], # https://stackoverflow.com/a/76515675/6823256
await self._wait_for_states(*complete_states)
)

async def _wait_for_states(self, *states: str) -> Dict[str, Any]:
async def _wait_for_states(self, *states: str) -> GetJobResponseV1Alpha:
tries = 0
while True:
if await self._is_job_in_state(states) or await self.is_complete():
Expand Down Expand Up @@ -307,6 +313,10 @@ class BatchAlreadyCreatedError(Exception):
pass


class BatchDebugInfo(TypedDict):
    """Return type of ``Batch.debug_info``: overall batch status plus per-job details.

    ``debug_info`` builds each ``jobs`` entry as ``{'log': <job log>, 'status':
    <job status>}``, not as a plain job-list entry, so the element type here is
    a loose dict rather than ``JobListEntryV1Alpha`` (the previous annotation
    did not match what ``debug_info`` actually appends).
    """

    # Raw batch status document as returned by Batch.status().
    status: Dict[str, Any]
    # One entry per inspected job: {'log': ..., 'status': ...}.
    jobs: List[Dict[str, Any]]

class Batch:
def __init__(self,
client: 'BatchClient',
Expand Down Expand Up @@ -356,7 +366,10 @@ async def cancel(self):
self._raise_if_not_created()
await self._client._patch(f'/api/v1alpha/batches/{self.id}/cancel')

async def jobs(self, q: Optional[str] = None, version: Optional[int] = None) -> AsyncIterator[Dict[str, Any]]:
async def jobs(self,
q: Optional[str] = None,
version: Optional[int] = None
) -> AsyncIterator[JobListEntryV1Alpha]:
self._raise_if_not_created()
if version is None:
version = 1
Expand All @@ -368,7 +381,10 @@ async def jobs(self, q: Optional[str] = None, version: Optional[int] = None) ->
if last_job_id is not None:
params['last_job_id'] = last_job_id
resp = await self._client._get(f'/api/v{version}alpha/batches/{self.id}/jobs', params=params)
body = await resp.json()
body = cast(
GetJobsResponseV1Alpha,
await resp.json()
)
for job in body['jobs']:
yield job
last_job_id = body.get('last_job_id')
Expand Down Expand Up @@ -463,11 +479,17 @@ async def wait(self,
with BatchProgressBar(disable=disable_progress_bar) as progress2:
return await self._wait(description, progress2, disable_progress_bar, starting_job)

async def debug_info(self):
async def debug_info(self,
_jobs_query_string: Optional[str] = None,
_max_jobs: Optional[int] = None,
) -> BatchDebugInfo:
self._raise_if_not_created()
batch_status = await self.status()
jobs = []
async for j_status in self.jobs():
async for j_status in self.jobs(q=_jobs_query_string):
if _max_jobs and len(jobs) == _max_jobs:
break

id = j_status['job_id']
log, job = await asyncio.gather(self.get_job_log(id), self.get_job(id))
jobs.append({'log': log, 'status': job._status})
Expand Down Expand Up @@ -926,7 +948,10 @@ async def list_batches(self, q=None, last_batch_id=None, limit=2 ** 64, version=
async def get_job(self, batch_id, job_id):
b = await self.get_batch(batch_id)
j_resp = await self._get(f'/api/v1alpha/batches/{batch_id}/jobs/{job_id}')
j = await j_resp.json()
j = cast(
GetJobResponseV1Alpha,
await j_resp.json()
)
return Job.submitted_job(b, j['job_id'], _status=j)

async def get_job_log(self, batch_id, job_id) -> Dict[str, Any]:
Expand Down
43 changes: 43 additions & 0 deletions hail/python/hailtop/batch_client/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from typing import TypedDict, Literal, Optional, List, Any, Dict
from typing_extensions import NotRequired


# One line item of a cost breakdown: a billed resource name paired with the
# cost accrued for it. Declared via the functional TypedDict form; it is
# interchangeable with the class-based declaration for both type checkers
# and runtime construction.
CostBreakdownEntry = TypedDict(
    'CostBreakdownEntry',
    {
        'resource': str,
        'cost': float,
    },
)


class GetJobResponseV1Alpha(TypedDict):
    """Shape of a single job returned by the v1alpha "get job" endpoint.

    Superset of :class:`JobListEntryV1Alpha`: adds the full ``status`` and
    ``spec`` payloads, plus ``attributes`` when the job has any.
    """

    batch_id: int
    job_id: int  # job id within the batch
    name: Optional[str]
    user: str
    billing_project: str
    state: Literal['Pending', 'Ready', 'Creating', 'Running', 'Failed', 'Cancelled', 'Error', 'Success']
    exit_code: Optional[int]  # None until the job has exited
    duration: Optional[int]  # presumably milliseconds (rendered with humanize_timedelta_msecs) — confirm
    cost: Optional[float]
    msec_mcpu: int
    cost_breakdown: List[CostBreakdownEntry]
    status: Optional[Dict[str, Any]]  # full status document; schema varies by format version
    spec: Optional[Dict[str, Any]]  # full job spec as submitted
    attributes: NotRequired[Dict[str, str]]  # omitted entirely when the job has no attributes


# One row of the v1alpha "list jobs" response: the summary view of a job,
# without the full status/spec documents carried by GetJobResponseV1Alpha.
# Declared via the functional TypedDict form; equivalent to the class-based
# declaration for type checkers and at runtime.
JobListEntryV1Alpha = TypedDict(
    'JobListEntryV1Alpha',
    {
        'batch_id': int,
        'job_id': int,
        'name': Optional[str],
        'user': str,
        'billing_project': str,
        'state': Literal['Pending', 'Ready', 'Creating', 'Running', 'Failed', 'Cancelled', 'Error', 'Success'],
        'exit_code': Optional[int],
        'duration': Optional[int],
        'cost': Optional[float],
        'msec_mcpu': int,
        'cost_breakdown': List[CostBreakdownEntry],
    },
)


class GetJobsResponseV1Alpha(TypedDict):
    """Shape of the paginated v1alpha "list jobs" endpoint response."""

    jobs: List[JobListEntryV1Alpha]
    # Pagination cursor: pass back as the ``last_job_id`` query parameter to
    # fetch the next page; absent on the final page.
    last_job_id: NotRequired[int]
9 changes: 7 additions & 2 deletions hail/python/hailtop/hailctl/batch/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typer import Option as Opt, Argument as Arg
import json

from typing import Optional, List, Annotated as Ann
from typing import Optional, List, Annotated as Ann, cast, Dict, Any

from . import list_batches
from . import billing
Expand Down Expand Up @@ -146,7 +146,12 @@ def job(batch_id: int, job_id: int, output: StructuredFormatOption = StructuredF

if job is not None:
assert job._status
print(make_formatter(output)([job._status]))
print(make_formatter(output)([
cast(
Dict[str, Any], # https://stackoverflow.com/q/71986632/6823256
job._status
)
]))
else:
print(f"Job with ID {job_id} on batch {batch_id} not found")

Expand Down
Loading

0 comments on commit 4fb659c

Please sign in to comment.