diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index 23e93e18f..405fc1e52 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -5,12 +5,13 @@ import logging import time from pathlib import Path -from typing import Callable, Union, Dict, Optional +from typing import Callable, Dict, Optional, Union import geopandas as gpd import pandas as pd import requests -from openeo import Connection, BatchJob + +from openeo import BatchJob, Connection from openeo.rest import OpenEoApiError from openeo.util import deep_get @@ -42,7 +43,7 @@ def start_job(row, connection, **kwargs): """ def __init__(self, poll_sleep: int = 60): - """Create a MultiBackendJobManager + """Create a MultiBackendJobManager. :param poll_sleep: How many seconds to sleep between polls. """ @@ -55,7 +56,16 @@ def add_backend( connection: Union[Connection, Callable[[], Connection]], parallel_jobs: int = 2, ): - """Register a backend with a name and a Connection getter""" + """ + Register a backend with a name and a Connection getter. + + :param name: + Name of the backend. + :param connection: + Either a Connection to the backend, or a callable to create a backend connection. + :param parallel_jobs: + Maximum number of jobs to allow in parallel on a backend. + """ if isinstance(connection, Connection): c = connection connection = lambda: c @@ -65,6 +75,12 @@ def add_backend( ) def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame: + """Ensure we have the required columns and the expected type for the geometry column. + + :param df: The dataframe to normalize. + :return: a new dataframe that is normalized. + """ + # check for some required columns. required_with_default = [ ("status", "not_started"), @@ -192,7 +208,7 @@ def on_job_done(self, job: BatchJob, row): Default implementation downloads the results into a folder containing the title. - :param job: the job that has finished. + :param job: The job that has finished. :param row: DataFrame row containing the job's metadata. """ # TODO: param `row` is never accessed in this method. Remove it? Is this intended for future use? @@ -203,16 +219,26 @@ def on_job_done(self, job: BatchJob, row): json.dump(job_metadata, f, ensure_ascii=False) def on_job_error(self, job: BatchJob, row): + """ + Handles jobs that stopped with errors. Can be overridden to provide custom behaviour. + + Default implementation writes the error logs to a JSON file. + + :param job: The job that has finished. + :param row: DataFrame row containing the job's metadata. + """ + # TODO: param `row` is never accessed in this method. Remove it? Is this intended for future use? + logs = job.logs() error_logs = [l for l in logs if l.level.lower() == "error"] job_metadata = job.describe_job() title = job_metadata["title"] if len(error_logs) > 0: - (f"job_{title}_errors.json").write_text(json.dumps(error_logs, indent=2)) + f"job_{title}_errors.json".write_text(json.dumps(error_logs, indent=2)) def _update_statuses(self, df: pd.DataFrame): - """Update status (and stats) of running jobs (in place)""" + """Update status (and stats) of running jobs (in place).""" active = df.loc[ (df.status == "created") | (df.status == "queued") diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py index 7219d14f8..83861a1c7 100644 --- a/tests/extra/test_job_management.py +++ b/tests/extra/test_job_management.py @@ -1,13 +1,12 @@ -import openeo -import pandas as pd -from openeo.rest.job import RESTJob - -from openeo.extra.job_management import MultiBackendJobManager +import os +from pathlib import Path +import pandas as pd import pytest -import os -from pathlib import Path +import openeo +from openeo.extra.job_management import MultiBackendJobManager +from openeo.rest.job import RESTJob @pytest.fixture