Skip to content

Commit

Permalink
Fix bug with determining if a job was preempted, minor improvements t…
Browse files Browse the repository at this point in the history
…o exception handling, data model, logging, and client (#202)

* Minor improvements to exc handling and logging

* Remove internal usage of `Job.is_done`

* make version test flaky

* Add `Beaker.job.preempt()` method

* Fix bug in `experiment.wait_for()` w/ preempted jobs
  • Loading branch information
epwalsh authored Jan 26, 2023
1 parent 105bd4d commit d5894a9
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 23 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,20 @@ use patch releases for compatibility fixes instead.
### Added

- Added missing `user_restrictions` field to `Cluster` data model.
- Added `Beaker.job.preempt()` method.
- Added `Job.was_preempted` property.
- Added `job` attribute to `JobFailedError` and `task` attribute to `TaskStoppedError`.
- Added DEBUG logging statements for every request and response to/from the Beaker server. To see these, just enable logging at the DEBUG level (though you may want to disable DEBUG logging from the "urllib3" logger, as that will create a lot of noise). For example:
```python
import logging

logging.basicConfig(level=logging.DEBUG)
logging.getLogger("urllib3").setLevel(logging.INFO)
```

### Fixed

- Fixed bug where `Beaker.experiment.wait_for()` would fail if a job was preempted.

## [v1.15.0](https://github.com/allenai/beaker-py/releases/tag/v1.15.0) - 2023-01-19

Expand Down
7 changes: 6 additions & 1 deletion beaker/data_model/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
logger = logging.getLogger("beaker")


__all__ = ["BaseModel", "MappedSequence", "StrEnum", "BasePage"]
__all__ = ["BaseModel", "MappedSequence", "StrEnum", "IntEnum", "BasePage"]


BUG_REPORT_URL = (
Expand Down Expand Up @@ -156,6 +156,11 @@ def __str__(self) -> str:
return self.value


class IntEnum(int, Enum):
def __str__(self) -> str:
return str(self.value)


class BasePage(BaseModel, Generic[T]):
data: Tuple[T, ...]
next_cursor: Optional[str] = None
Expand Down
22 changes: 15 additions & 7 deletions beaker/data_model/job.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
from datetime import datetime
from enum import Enum
from typing import Dict, List, Optional, Tuple

from pydantic import Field, validator

from .account import Account
from .base import BaseModel, StrEnum
from .base import BaseModel, IntEnum, StrEnum
from .experiment_spec import DataMount, EnvVar, ImageSource, Priority, TaskSpec

__all__ = [
Expand Down Expand Up @@ -41,7 +40,7 @@ class CurrentJobStatus(StrEnum):
preempted = "preempted"


class CanceledCode(Enum):
class CanceledCode(IntEnum):
not_set = 0
system_preemption = 1
user_preemption = 2
Expand Down Expand Up @@ -186,6 +185,13 @@ def is_done(self) -> bool:
"""
return self.status.current == CurrentJobStatus.finalized

@property
def was_preempted(self) -> bool:
return self.status.canceled is not None and self.status.canceled_code in {
CanceledCode.system_preemption,
CanceledCode.user_preemption,
}

def check(self):
"""
:raises JobFailedError: If the job failed or was canceled.
Expand All @@ -194,12 +200,13 @@ def check(self):

if self.status.exit_code is not None and self.status.exit_code > 0:
raise JobFailedError(
f"Job '{self.id}' exited with non-zero exit code ({self.status.exit_code})"
f"Job '{self.id}' exited with non-zero exit code ({self.status.exit_code})",
job=self,
)
elif self.status.canceled is not None:
raise JobFailedError(f"Job '{self.id}' was canceled")
raise JobFailedError(f"Job '{self.id}' was canceled", job=self)
elif self.status.failed is not None:
raise JobFailedError(f"Job '{self.id}' failed")
raise JobFailedError(f"Job '{self.id}' failed", job=self)


class Jobs(BaseModel):
Expand All @@ -215,7 +222,8 @@ class JobStatusUpdate(BaseModel):
failed: Optional[bool] = None
finalized: Optional[bool] = None
canceled: Optional[bool] = None
canceled_for: Optional[CanceledCode] = None
canceled_for: Optional[str] = None
canceled_code: Optional[CanceledCode] = None
idle: Optional[bool] = None
message: Optional[str] = None

Expand Down
16 changes: 14 additions & 2 deletions beaker/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,20 @@
and :exc:`ValidationError`, which is re-exported from `pydantic <https://pydantic-docs.helpmanual.io/>`_.
"""

from __future__ import annotations

from typing import TYPE_CHECKING, Optional

from pydantic import ValidationError # noqa: F401, re-imported here for convenience
from requests.exceptions import ( # noqa: F401, re-imported here for convenience
HTTPError,
RequestException,
)

if TYPE_CHECKING:
from .data_model.experiment import Task
from .data_model.job import Job

ValidationError.__doc__ = """
Raised when data passed into a :mod:`DataModel <beaker.data_model>` is invalid.
"""
Expand Down Expand Up @@ -222,11 +230,15 @@ class ChecksumFailedError(BeakerError):


class TaskStoppedError(BeakerError):
pass
def __init__(self, msg: Optional[str] = None, task: Optional[Task] = None):
super().__init__(msg)
self.task = task


class JobFailedError(BeakerError):
pass
def __init__(self, msg: Optional[str] = None, job: Optional[Job] = None):
super().__init__(msg)
self.job = job


class JobTimeoutError(BeakerError, TimeoutError):
Expand Down
18 changes: 8 additions & 10 deletions beaker/services/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -573,10 +573,7 @@ def complete_experiment(exp_id: str) -> Experiment:
exp_id = job.execution.experiment
task_id = job.execution.task

if job.status.canceled_code in {
CanceledCode.system_preemption,
CanceledCode.user_preemption,
}:
if job.was_preempted:
# Job was preempted. Another job will start soon so we just stop
# tracking this one.
exp_to_task_to_job[exp_id][task_id] = None
Expand Down Expand Up @@ -612,11 +609,10 @@ def complete_experiment(exp_id: str) -> Experiment:
if task_to_job[task.id] is not None:
continue

if not task.jobs:
if not task.schedulable:
# Task was stopped before a job was created.
stopped_tasks.add(task.id)
else:
if not task.jobs and not task.schedulable:
# Task was stopped before a job was created.
stopped_tasks.add(task.id)
elif task.jobs:
latest_job = self._latest_job(task.jobs)
assert latest_job is not None
task_to_job[task.id] = latest_job.id
Expand Down Expand Up @@ -693,7 +689,9 @@ def follow(
job = self.latest_job(experiment, task=actual_task)
elif not actual_task.schedulable:
if strict:
raise TaskStoppedError(task)
raise TaskStoppedError(
task.id if isinstance(task, Task) else task, task=actual_task
)
else:
return self.get(
experiment.id if isinstance(experiment, Experiment) else experiment
Expand Down
37 changes: 34 additions & 3 deletions beaker/services/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,33 @@ def finalize(self, job: Union[str, Job]) -> Job:
).json()
)

def preempt(self, job: Union[str, Job]) -> Job:
"""
Preempt a job.
:param job: The Beaker job ID or object.
:raises JobNotFound: If the job can't be found.
:raises BeakerError: Any other :class:`~beaker.exceptions.BeakerError` type that can occur.
:raises RequestException: Any other exception that can occur when contacting the
Beaker server.
"""
job_id = job.id if isinstance(job, Job) else job
return Job.from_json(
self.request(
f"jobs/{job_id}",
method="PATCH",
exceptions_for_status={404: JobNotFound(job_id)},
data=JobPatch(
status=JobStatusUpdate(
canceled=True,
canceled_code=CanceledCode.user_preemption,
canceled_for=f"Preempted by user '{self.beaker.account.name}'",
)
),
).json()
)

def stop(self, job: Union[str, Job]) -> Job:
"""
Stop a job.
Expand All @@ -247,7 +274,11 @@ def stop(self, job: Union[str, Job]) -> Job:
f"jobs/{job_id}",
method="PATCH",
exceptions_for_status={404: JobNotFound(job_id)},
data=JobPatch(status=JobStatusUpdate(canceled=True)),
data=JobPatch(
status=JobStatusUpdate(
canceled=True, canceled_for=f"Stopped by user '{self.beaker.account.name}'"
)
),
).json()
)

Expand Down Expand Up @@ -470,7 +501,7 @@ def pull_logs_since(updated_job: Job, final: bool = False):
yield line

# Check status of job, finish if job is no-longer running.
if updated_job.is_done:
if updated_job.is_finalized:
break

# Check timeout if we're still waiting for job to complete.
Expand Down Expand Up @@ -549,7 +580,7 @@ def display_name(j: Job) -> str:
for job_id in list(job_id_to_progress_task):
task_id = job_id_to_progress_task[job_id]
job = self.get(job_id)
if not job.is_done:
if not job.is_finalized:
progress.update(task_id, total=polls + 1, advance=1)
else:
# Ensure job was successful if `strict==True`.
Expand Down
21 changes: 21 additions & 0 deletions beaker/services/service_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import io
import json
import logging
import urllib.parse
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union

Expand Down Expand Up @@ -28,6 +29,10 @@ def config(self) -> Config:
def docker(self) -> docker.DockerClient:
return self.beaker.docker

@property
def logger(self) -> logging.Logger:
return self.beaker.logger

def request(
self,
resource: str,
Expand Down Expand Up @@ -69,6 +74,16 @@ def make_request(session: requests.Session) -> requests.Response:
f"Unexpected type for 'data'. Expected 'dict' or 'BaseModel', got {type(data)}"
)

# Log request at DEBUG.
if isinstance(request_data, str):
self.logger.debug("SEND %s %s - %s", method, url, request_data)
elif isinstance(request_data, bytes):
self.logger.debug("SEND %s %s - %d bytes", method, url, len(request_data))
elif request_data is not None:
self.logger.debug("SEND %s %s - ? bytes", method, url)
else:
self.logger.debug("SEND %s %s", method, url)

# Make request.
response = getattr(session, method.lower())(
url,
Expand All @@ -78,6 +93,12 @@ def make_request(session: requests.Session) -> requests.Response:
timeout=timeout or self.beaker._timeout,
)

# Log response at DEBUG.
if response.text:
self.logger.debug("RECV %s %s %s - %s", method, url, response, response.text)
else:
self.logger.debug("RECV %s %s %s", method, url, response)

if exceptions_for_status is not None and response.status_code in exceptions_for_status:
raise exceptions_for_status[response.status_code]

Expand Down
1 change: 1 addition & 0 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ isort==5.11.4

# Running tests
pytest
flaky

# Allows generation of coverage reports with pytest.
pytest-cov
Expand Down
2 changes: 2 additions & 0 deletions tests/client_test.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import pytest
from flaky import flaky

from beaker import Beaker


@flaky # this can fail if the request to GitHub fails
def test_warn_for_newer_version(monkeypatch):
import beaker.client
import beaker.version
Expand Down

0 comments on commit d5894a9

Please sign in to comment.