Skip to content

Commit

Permalink
#361 Code review, docstrings and organize imports
Browse files Browse the repository at this point in the history
  • Loading branch information
JohanKJSchreurs committed Jan 17, 2023
1 parent 9f4f427 commit 94983d9
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 14 deletions.
40 changes: 33 additions & 7 deletions openeo/extra/job_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
import logging
import time
from pathlib import Path
from typing import Callable, Union, Dict, Optional
from typing import Callable, Dict, Optional, Union

import geopandas as gpd
import pandas as pd
import requests
from openeo import Connection, BatchJob

from openeo import BatchJob, Connection
from openeo.rest import OpenEoApiError
from openeo.util import deep_get

Expand Down Expand Up @@ -42,7 +43,7 @@ def start_job(row, connection, **kwargs):
"""

def __init__(self, poll_sleep: int = 60):
"""Create a MultiBackendJobManager
"""Create a MultiBackendJobManager.
:param poll_sleep: How many seconds to sleep between polls.
"""
Expand All @@ -55,7 +56,16 @@ def add_backend(
connection: Union[Connection, Callable[[], Connection]],
parallel_jobs: int = 2,
):
"""Register a backend with a name and a Connection getter"""
"""
Register a backend with a name and a Connection getter.
:param name:
Name of the backend.
:param connection:
Either a Connection to the backend, or a callable to create a backend connection.
:param parallel_jobs:
Maximum number of jobs to allow in parallel on a backend.
"""
if isinstance(connection, Connection):
c = connection
connection = lambda: c
Expand All @@ -65,6 +75,12 @@ def add_backend(
)

def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame:
"""Ensure we have the required columns and the expected type for the geometry column.
:param df: The dataframe to normalize.
:return: a new dataframe that is normalized.
"""

# check for some required columns.
required_with_default = [
("status", "not_started"),
Expand Down Expand Up @@ -192,7 +208,7 @@ def on_job_done(self, job: BatchJob, row):
Default implementation downloads the results into a folder containing the title.
:param job: the job that has finished.
:param job: The job that has finished.
:param row: DataFrame row containing the job's metadata.
"""
# TODO: param `row` is never accessed in this method. Remove it? Is this intended for future use?
Expand All @@ -203,16 +219,26 @@ def on_job_done(self, job: BatchJob, row):
json.dump(job_metadata, f, ensure_ascii=False)

def on_job_error(self, job: "BatchJob", row):
    """
    Handles jobs that stopped with errors. Can be overridden to provide custom behaviour.

    Default implementation writes the job's error logs to a JSON file in the
    current working directory, named after the job title.

    :param job: The job that has finished (in error state).
    :param row: DataFrame row containing the job's metadata.
    """
    # TODO: param `row` is never accessed in this method. Remove it? Is this intended for future use?

    logs = job.logs()
    error_logs = [l for l in logs if l.level.lower() == "error"]
    job_metadata = job.describe_job()

    title = job_metadata["title"]
    if error_logs:
        # Bug fix: a plain `str` has no `write_text` method (that is a
        # `pathlib.Path` method), so the previous code raised AttributeError
        # whenever there were error logs. Wrap the filename in `Path`.
        Path(f"job_{title}_errors.json").write_text(json.dumps(error_logs, indent=2))

def _update_statuses(self, df: pd.DataFrame):
"""Update status (and stats) of running jobs (in place)"""
"""Update status (and stats) of running jobs (in place)."""
active = df.loc[
(df.status == "created")
| (df.status == "queued")
Expand Down
13 changes: 6 additions & 7 deletions tests/extra/test_job_management.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import openeo
import pandas as pd
from openeo.rest.job import RESTJob

from openeo.extra.job_management import MultiBackendJobManager
import os
from pathlib import Path

import pandas as pd
import pytest
import os

from pathlib import Path
import openeo
from openeo.extra.job_management import MultiBackendJobManager
from openeo.rest.job import RESTJob


@pytest.fixture
Expand Down

0 comments on commit 94983d9

Please sign in to comment.