Issue #635: cleaner job manager API for initializing the job database
soxofaan committed Sep 27, 2024
1 parent ffa7be2 commit 03a998f
Showing 2 changed files with 126 additions and 33 deletions.
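
For context, this change lets `run_jobs()` be driven purely from a job database that is initialized up front from a dataframe, instead of passing the dataframe and an output path directly. A minimal sketch of the new workflow (the backend URL, collection id, dataframe columns and `start_job` body below are illustrative, loosely following the new tests):

    from pathlib import Path
    import pandas as pd
    import openeo
    from openeo.extra.job_management import MultiBackendJobManager, CsvJobDatabase

    manager = MultiBackendJobManager()
    # Illustrative backend; any openEO backend connection works here.
    manager.add_backend("foo", connection=openeo.connect("https://openeo.example"))

    # One row per job to run; the extra columns the manager needs are added by normalization.
    df = pd.DataFrame({"year": [2021, 2022]})

    def start_job(row, connection, **kwargs):
        # Illustrative callback: build a cube for this row and return the (not yet started) batch job.
        cube = connection.load_collection(
            "SENTINEL2_L2A",
            temporal_extent=[f"{row['year']}-01-01", f"{row['year']}-12-31"],
        )
        return cube.create_job(title=f"job-{row['year']}")

    # New API introduced here: initialize (and persist) the job database from the dataframe,
    # then run the jobs from that database.
    job_db = CsvJobDatabase(Path("jobs.csv")).initialize_from_df(df)
    manager.run_jobs(job_db=job_db, start_job=start_job)

The same `job_db` object also works with `start_job_thread()` for background operation, as the updated threading test shows.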
38 changes: 32 additions & 6 deletions openeo/extra/job_management.py
@@ -90,6 +90,11 @@ def get_by_status(self, statuses: List[str], max=None) -> pd.DataFrame:
"""
...


def _start_job_default(*args, **kwargs):
raise NotImplementedError


class MultiBackendJobManager:
"""
Tracker for multiple jobs on multiple backends.
@@ -253,8 +258,11 @@ def _make_resilient(connection):
connection.session.mount("https://", HTTPAdapter(max_retries=retries))
connection.session.mount("http://", HTTPAdapter(max_retries=retries))

def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame:
"""Ensure we have the required columns and the expected type for the geometry column.
@staticmethod
def __normalize_df__(df: pd.DataFrame) -> pd.DataFrame:
"""
Normalize given pandas dataframe (creating a new one):
ensure we have the required columns.
:param df: The dataframe to normalize.
:return: a new dataframe that is normalized.
@@ -366,8 +374,8 @@ def stop_job_thread(self, timeout_seconds: Optional[float] = _UNSET):

def run_jobs(
self,
df: Optional[pd.DataFrame],
start_job: Callable[[], BatchJob],
df: Optional[pd.DataFrame] = None,
start_job: Callable[[], BatchJob] = _start_job_default,
job_db: Union[str, Path, JobDatabaseInterface, None] = None,
**kwargs,
):
@@ -450,8 +458,7 @@ def run_jobs(
# Resume from existing db
_log.info(f"Resuming `run_jobs` from existing {job_db}")
elif df is not None:
df = self._normalize_df(df)
job_db.persist(df)
job_db.initialize_from_df(df)

while sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0:
self._job_update_loop(df, job_db, start_job)
@@ -691,6 +698,25 @@ def __init__(self):
super().__init__()
self._df = None

def initialize_from_df(self, df: pd.DataFrame):
"""
Initialize the job database from a given dataframe,
which will be first normalized to be compatible
with :py:class:`MultiBackendJobManager` usage.
:param df: data frame with some columns that your ``start_job`` callable expects.
:return: initialized job database.
.. versionadded:: 0.33.0
"""
# TODO: option to provide custom MultiBackendJobManager subclass with custom normalize?
if self.exists():
raise RuntimeError(f"Job database {self!r} already exists.")
df = MultiBackendJobManager.__normalize_df__(df)
self.persist(df)
# Return self to allow chaining with constructor.
return self

@property
def df(self) -> pd.DataFrame:
if self._df is None:
121 changes: 94 additions & 27 deletions tests/extra/test_job_management.py
@@ -79,7 +79,10 @@ def sleep_mock(self):
with mock.patch("time.sleep") as sleep:
yield sleep

def test_basic(self, tmp_path, requests_mock, sleep_mock):
def test_basic_legacy(self, tmp_path, requests_mock, sleep_mock):
"""
Legacy `run_jobs()` usage with explicit dataframe and output file
"""
manager = self.create_basic_mocked_manager(requests_mock, tmp_path)

df = pd.DataFrame(
@@ -108,6 +111,42 @@ def start_job(row, connection, **kwargs):
metadata_path = manager.get_job_metadata_path(job_id="job-2022")
assert metadata_path.exists()


def test_basic(self, tmp_path, requests_mock, sleep_mock):
"""
`run_jobs()` usage with a `CsvJobDatabase`
(and no explicit dataframe or output file)
"""
manager = self.create_basic_mocked_manager(requests_mock, tmp_path)

df = pd.DataFrame(
{
"year": [2018, 2019, 2020, 2021, 2022],
# Use simple points in WKT format to test conversion to the geometry dtype
"geometry": ["POINT (1 2)"] * 5,
}
)
output_file = tmp_path / "jobs.csv"

def start_job(row, connection, **kwargs):
year = int(row["year"])
return BatchJob(job_id=f"job-{year}", connection=connection)

job_db = CsvJobDatabase(output_file).initialize_from_df(df)

manager.run_jobs(job_db=job_db, start_job=start_job)
assert sleep_mock.call_count > 10

result = pd.read_csv(output_file)
assert len(result) == 5
assert set(result.status) == {"finished"}
assert set(result.backend_name) == {"foo", "bar"}

# We expect that the job metadata was saved, so verify that it exists.
# Checking for one of the jobs is enough.
metadata_path = manager.get_job_metadata_path(job_id="job-2022")
assert metadata_path.exists()

def test_basic_threading(self, tmp_path, requests_mock, sleep_mock):
manager = self.create_basic_mocked_manager(requests_mock, tmp_path)

@@ -124,9 +163,7 @@ def start_job(row, connection, **kwargs):
year = int(row["year"])
return BatchJob(job_id=f"job-{year}", connection=connection)

job_db = CsvJobDatabase(output_file)
# TODO: avoid private _normalize_df API
job_db.persist(manager._normalize_df(df))
job_db = CsvJobDatabase(output_file).initialize_from_df(df)

manager.start_job_thread(start_job=start_job, job_db=job_db)
# Trigger context switch to job thread
@@ -202,28 +239,6 @@ def mock_job_status(job_id, queued=1, running=2):
manager.add_backend("bar", connection=openeo.connect("http://bar.test"))
return manager

def test_normalize_df(self):
df = pd.DataFrame(
{
"some_number": [3, 2, 1],
}
)

df_normalized = MultiBackendJobManager()._normalize_df(df)

assert set(df_normalized.columns) == set(
[
"some_number",
"status",
"id",
"start_time",
"running_start_time",
"cpu",
"memory",
"duration",
"backend_name",
]
)

def test_manager_must_exit_when_all_jobs_done(self, tmp_path, requests_mock, sleep_mock):
"""Make sure the MultiBackendJobManager does not hang after all processes finish.
@@ -654,7 +669,7 @@ def test_persist_and_read(self, tmp_path, orig: pandas.DataFrame):
],
)
def test_partial_read_write(self, tmp_path, orig: pandas.DataFrame):
path = tmp_path / "jobs.parquet"
path = tmp_path / "jobs.csv"

required_with_default = [
("status", "not_started"),
@@ -685,6 +700,34 @@ def test_partial_read_write(self, tmp_path, orig: pandas.DataFrame):
assert all.loc[2,"status"] == "not_started"
print(loaded.index)

def test_initialize_from_df(self, tmp_path):
orig_df = pd.DataFrame({"some_number": [3, 2, 1]})
path = tmp_path / "jobs.csv"

# Initialize the CSV from the dataframe
_ = CsvJobDatabase(path).initialize_from_df(orig_df)

# Check persisted CSV
assert path.exists()
expected_columns = {
"some_number",
"status",
"id",
"start_time",
"running_start_time",
"cpu",
"memory",
"duration",
"backend_name",
}

# Raw file content check
raw_columns = set(path.read_text().split("\n")[0].split(","))
# Higher level read
read_columns = set(CsvJobDatabase(path).read().columns)

assert raw_columns == expected_columns
assert read_columns == expected_columns


class TestParquetJobDatabase:
@@ -712,3 +755,27 @@ def test_persist_and_read(self, tmp_path, orig: pandas.DataFrame):
assert loaded.dtypes.to_dict() == orig.dtypes.to_dict()
assert loaded.equals(orig)
assert type(orig) is type(loaded)

def test_initialize_from_df(self, tmp_path):
orig_df = pd.DataFrame({"some_number": [3, 2, 1]})
path = tmp_path / "jobs.parquet"

# Initialize the Parquet job database from the dataframe
_ = ParquetJobDatabase(path).initialize_from_df(orig_df)

# Check persisted Parquet file
assert path.exists()
expected_columns = {
"some_number",
"status",
"id",
"start_time",
"running_start_time",
"cpu",
"memory",
"duration",
"backend_name",
}

df_from_disk = ParquetJobDatabase(path).read()
assert set(df_from_disk.columns) == expected_columns
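
Worth noting from the `initialize_from_df()` implementation in `openeo/extra/job_management.py` above: it refuses to overwrite a job database that already exists. A small sketch of that guard (path and dataframe are illustrative):

    from pathlib import Path
    import pandas as pd
    from openeo.extra.job_management import CsvJobDatabase

    df = pd.DataFrame({"year": [2021, 2022]})
    path = Path("jobs.csv")

    job_db = CsvJobDatabase(path).initialize_from_df(df)  # normalizes, persists and returns the database
    assert job_db.exists()

    # Initializing again against the same path is rejected:
    try:
        CsvJobDatabase(path).initialize_from_df(df)
    except RuntimeError as exc:
        print(exc)  # e.g. "Job database ... already exists."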
