Skip to content

Commit

Permalink
PR #596: Further finetuning and changelog
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Sep 3, 2024
1 parent b656fd7 commit d47de26
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- `load_stac`/`metadata_from_stac`: add support for extracting actual temporal dimension metadata ([#567](https://github.com/Open-EO/openeo-python-client/issues/567))
- `MultiBackendJobManager`: add `cancel_running_job_after` option to automatically cancel jobs that are running for too long ([#590](https://github.com/Open-EO/openeo-python-client/issues/590))

### Changed

Expand Down
36 changes: 20 additions & 16 deletions openeo/extra/job_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ def __init__(
self,
poll_sleep: int = 60,
root_dir: Optional[Union[str, Path]] = ".",
*,
cancel_running_job_after: Optional[int] = None,
):
"""Create a MultiBackendJobManager.
Expand All @@ -133,8 +134,11 @@ def __init__(
- get_job_metadata_path
:param cancel_running_job_after [seconds]:
A temporal limit for long running jobs to get automatically canceled.
The preset is None, which disables the feature.
Optional temporal limit (in seconds) after which running jobs should be canceled
by the job manager.
.. versionchanged:: 0.32.0
Added `cancel_running_job_after` parameter.
"""
self.backends: Dict[str, _Backend] = {}
self.poll_sleep = poll_sleep
Expand All @@ -143,7 +147,7 @@ def __init__(
# An explicit None or "" should also default to "."
self._root_dir = Path(root_dir or ".")

self.cancel_running_job_after = (
self._cancel_running_job_after = (
datetime.timedelta(seconds=cancel_running_job_after) if cancel_running_job_after is not None else None
)

Expand Down Expand Up @@ -242,7 +246,6 @@ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame:
("memory", None),
("duration", None),
("backend_name", None),
("backend_name", None),
]
new_columns = {col: val for (col, val) in required_with_default if col not in df.columns}
df = df.assign(**new_columns)
Expand Down Expand Up @@ -483,18 +486,15 @@ def on_job_cancel(self, job: BatchJob, row):
def _cancel_prolonged_job(self, job: BatchJob, row):
"""Cancel the job if it has been running for too long."""
job_running_start_time = rfc3339.parse_datetime(row["running_start_time"], with_timezone=True)
current_time = rfc3339.parse_datetime(rfc3339.utcnow(), with_timezone=True)

if current_time > job_running_start_time + self.cancel_running_job_after:
elapsed = datetime.datetime.now(tz=datetime.timezone.utc) - job_running_start_time
if elapsed > self._cancel_running_job_after:
try:
_log.info(
f"Cancelling job {job.job_id} as it has been running for more than {self.cancel_running_job_after}"
f"Cancelling long-running job {job.job_id} (after {elapsed}, running since {job_running_start_time})"
)

job.stop()

except OpenEoApiError as e:
_log.error(f"Error Cancelling long-running job {job.job_id}: {e}")
_log.error(f"Failed to cancel long-running job {job.job_id}: {e}")

def get_job_dir(self, job_id: str) -> Path:
"""Path to directory where job metadata, results and error logs are saved."""
Expand All @@ -515,21 +515,25 @@ def ensure_job_dir_exists(self, job_id: str) -> Path:
job_dir.mkdir(parents=True)

def _track_statuses(self, df: pd.DataFrame):
"""tracks status (and stats) of running jobs (in place). Optinally cancels jobs when running too long"""
"""
Tracks status (and stats) of running jobs (in place).
Optionally cancels jobs when running too long.
"""
active = df.loc[(df.status == "created") | (df.status == "queued") | (df.status == "running")]
for i in active.index:
job_id = df.loc[i, "id"]
backend_name = df.loc[i, "backend_name"]
previous_status = df.loc[i, "status"]

try:
con = self._get_connection(backend_name)
the_job = con.job(job_id)
job_metadata = the_job.describe()

previous_status = df.loc[i, "status"]
new_status = job_metadata["status"]

_log.info(f"Status of job {job_id!r} (on backend {backend_name}) is {new_status!r}")
_log.info(
f"Status of job {job_id!r} (on backend {backend_name}) is {new_status!r} (previously {previous_status!r})"
)

if new_status == "finished":
self.on_job_done(the_job, df.loc[i])
Expand All @@ -543,7 +547,7 @@ def _track_statuses(self, df: pd.DataFrame):
if new_status == "canceled":
self.on_job_cancel(the_job, df.loc[i])

if self.cancel_running_job_after and new_status == "running":
if self._cancel_running_job_after and new_status == "running":
self._cancel_prolonged_job(the_job, df.loc[i])

df.loc[i, "status"] = new_status
Expand Down

0 comments on commit d47de26

Please sign in to comment.