From 03a998f4b6220c37d01dc46ff76384daadc722e6 Mon Sep 17 00:00:00 2001
From: Stefaan Lippens
Date: Fri, 27 Sep 2024 13:50:25 +0200
Subject: [PATCH] Issue #635: cleaner job manager API for initializing the job database

---
 openeo/extra/job_management.py     |  38 +++++++--
 tests/extra/test_job_management.py | 121 ++++++++++++++++++++++-------
 2 files changed, 126 insertions(+), 33 deletions(-)

diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py
index 84a347e6b..6e12a4357 100644
--- a/openeo/extra/job_management.py
+++ b/openeo/extra/job_management.py
@@ -90,6 +90,11 @@ def get_by_status(self, statuses: List[str], max=None) -> pd.DataFrame:
         """
         ...
 
+
+def _start_job_default(*args, **kwargs):
+    raise NotImplementedError
+
+
 class MultiBackendJobManager:
     """
     Tracker for multiple jobs on multiple backends.
@@ -253,8 +258,11 @@ def _make_resilient(connection):
         connection.session.mount("https://", HTTPAdapter(max_retries=retries))
         connection.session.mount("http://", HTTPAdapter(max_retries=retries))
 
-    def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame:
-        """Ensure we have the required columns and the expected type for the geometry column.
+    @staticmethod
+    def _normalize_df(df: pd.DataFrame) -> pd.DataFrame:
+        """
+        Normalize given pandas dataframe (creating a new one):
+        ensure we have the required columns.
 
         :param df: The dataframe to normalize.
         :return: a new dataframe that is normalized.
@@ -366,8 +374,8 @@ def stop_job_thread(self, timeout_seconds: Optional[float] = _UNSET):
 
     def run_jobs(
         self,
-        df: Optional[pd.DataFrame],
-        start_job: Callable[[], BatchJob],
+        df: Optional[pd.DataFrame] = None,
+        start_job: Callable[[], BatchJob] = _start_job_default,
         job_db: Union[str, Path, JobDatabaseInterface, None] = None,
         **kwargs,
     ):
@@ -450,8 +458,7 @@ def run_jobs(
             # Resume from existing db
             _log.info(f"Resuming `run_jobs` from existing {job_db}")
         elif df is not None:
-            df = self._normalize_df(df)
-            job_db.persist(df)
+            job_db.initialize_from_df(df)
 
         while sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0:
             self._job_update_loop(df, job_db, start_job)
@@ -691,6 +698,25 @@ def __init__(self):
         super().__init__()
         self._df = None
 
+    def initialize_from_df(self, df: pd.DataFrame):
+        """
+        Initialize the job database from a given dataframe,
+        which will be first normalized to be compatible
+        with :py:class:`MultiBackendJobManager` usage.
+
+        :param df: data frame with some columns that your ``start_job`` callable expects
+        :return: initialized job database.
+
+        .. versionadded:: 0.33.0
+        """
+        # TODO: option to provide custom MultiBackendJobManager subclass with custom normalize?
+        if self.exists():
+            raise RuntimeError(f"Job database {self!r} already exists.")
+        df = MultiBackendJobManager._normalize_df(df)
+        self.persist(df)
+        # Return self to allow chaining with constructor.
+        return self
+
     @property
     def df(self) -> pd.DataFrame:
         if self._df is None:
diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py
index 42908508a..39e9d112d 100644
--- a/tests/extra/test_job_management.py
+++ b/tests/extra/test_job_management.py
@@ -79,7 +79,10 @@ def sleep_mock(self):
        with mock.patch("time.sleep") as sleep:
            yield sleep
 
-    def test_basic(self, tmp_path, requests_mock, sleep_mock):
+    def test_basic_legacy(self, tmp_path, requests_mock, sleep_mock):
+        """
+        Legacy `run_jobs()` usage with explicit dataframe and output file
+        """
         manager = self.create_basic_mocked_manager(requests_mock, tmp_path)
 
         df = pd.DataFrame(
             {
                 "year": [2018, 2019, 2020, 2021, 2022],
@@ -108,6 +111,42 @@ def start_job(row, connection, **kwargs):
         metadata_path = manager.get_job_metadata_path(job_id="job-2022")
         assert metadata_path.exists()
 
+    def test_basic(self, tmp_path, requests_mock, sleep_mock):
+        """
+        `run_jobs()` usage with a `CsvJobDatabase`
+        (and no explicit dataframe or output file)
+        """
+        manager = self.create_basic_mocked_manager(requests_mock, tmp_path)
+
+        df = pd.DataFrame(
+            {
+                "year": [2018, 2019, 2020, 2021, 2022],
+                # Use simple points in WKT format to test conversion to the geometry dtype
+                "geometry": ["POINT (1 2)"] * 5,
+            }
+        )
+        output_file = tmp_path / "jobs.csv"
+
+        def start_job(row, connection, **kwargs):
+            year = int(row["year"])
+            return BatchJob(job_id=f"job-{year}", connection=connection)
+
+        job_db = CsvJobDatabase(output_file).initialize_from_df(df)
+
+        manager.run_jobs(job_db=job_db, start_job=start_job)
+        assert sleep_mock.call_count > 10
+
+        result = pd.read_csv(output_file)
+        assert len(result) == 5
+        assert set(result.status) == {"finished"}
+        assert set(result.backend_name) == {"foo", "bar"}
+
+        # We expect that the job metadata was saved, so verify that it exists.
+        # Checking for one of the jobs is enough.
+        metadata_path = manager.get_job_metadata_path(job_id="job-2022")
+        assert metadata_path.exists()
+
     def test_basic_threading(self, tmp_path, requests_mock, sleep_mock):
         manager = self.create_basic_mocked_manager(requests_mock, tmp_path)
@@ -124,9 +163,7 @@ def start_job(row, connection, **kwargs):
             year = int(row["year"])
             return BatchJob(job_id=f"job-{year}", connection=connection)
 
-        job_db = CsvJobDatabase(output_file)
-        # TODO: avoid private _normalize_df API
-        job_db.persist(manager._normalize_df(df))
+        job_db = CsvJobDatabase(output_file).initialize_from_df(df)
 
         manager.start_job_thread(start_job=start_job, job_db=job_db)
         # Trigger context switch to job thread
@@ -202,28 +239,6 @@ def mock_job_status(job_id, queued=1, running=2):
         manager.add_backend("bar", connection=openeo.connect("http://bar.test"))
         return manager
 
-    def test_normalize_df(self):
-        df = pd.DataFrame(
-            {
-                "some_number": [3, 2, 1],
-            }
-        )
-
-        df_normalized = MultiBackendJobManager()._normalize_df(df)
-
-        assert set(df_normalized.columns) == set(
-            [
-                "some_number",
-                "status",
-                "id",
-                "start_time",
-                "running_start_time",
-                "cpu",
-                "memory",
-                "duration",
-                "backend_name",
-            ]
-        )
-
     def test_manager_must_exit_when_all_jobs_done(self, tmp_path, requests_mock, sleep_mock):
         """Make sure the MultiBackendJobManager does not hang after all processes finish.
@@ -654,7 +669,7 @@ def test_persist_and_read(self, tmp_path, orig: pandas.DataFrame):
         ],
     )
     def test_partial_read_write(self, tmp_path, orig: pandas.DataFrame):
-        path = tmp_path / "jobs.parquet"
+        path = tmp_path / "jobs.csv"
 
         required_with_default = [
             ("status", "not_started"),
@@ -685,6 +700,34 @@ def test_partial_read_write(self, tmp_path, orig: pandas.DataFrame):
         assert all.loc[2,"status"] == "not_started"
         print(loaded.index)
 
+    def test_initialize_from_df(self, tmp_path):
+        orig_df = pd.DataFrame({"some_number": [3, 2, 1]})
+        path = tmp_path / "jobs.csv"
+
+        # Initialize the CSV from the dataframe
+        _ = CsvJobDatabase(path).initialize_from_df(orig_df)
+
+        # Check persisted CSV
+        assert path.exists()
+        expected_columns = {
+            "some_number",
+            "status",
+            "id",
+            "start_time",
+            "running_start_time",
+            "cpu",
+            "memory",
+            "duration",
+            "backend_name",
+        }
+
+        # Raw file content check
+        raw_columns = set(path.read_text().split("\n")[0].split(","))
+        # Higher level read
+        read_columns = set(CsvJobDatabase(path).read().columns)
+
+        assert raw_columns == expected_columns
+        assert read_columns == expected_columns
 
 
 class TestParquetJobDatabase:
@@ -712,3 +755,27 @@ def test_persist_and_read(self, tmp_path, orig: pandas.DataFrame):
         assert loaded.dtypes.to_dict() == orig.dtypes.to_dict()
         assert loaded.equals(orig)
         assert type(orig) is type(loaded)
+
+    def test_initialize_from_df(self, tmp_path):
+        orig_df = pd.DataFrame({"some_number": [3, 2, 1]})
+        path = tmp_path / "jobs.parquet"
+
+        # Initialize the Parquet job database from the dataframe
+        _ = ParquetJobDatabase(path).initialize_from_df(orig_df)
+
+        # Check persisted Parquet file
+        assert path.exists()
+        expected_columns = {
+            "some_number",
+            "status",
+            "id",
+            "start_time",
+            "running_start_time",
+            "cpu",
+            "memory",
+            "duration",
+            "backend_name",
+        }
+
+        df_from_disk = ParquetJobDatabase(path).read()
+        assert set(df_from_disk.columns) == expected_columns
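
For reference, a minimal usage sketch of the API introduced by this patch, adapted from the tests above. The backend URL, the dataframe columns and the `start_job` body are illustrative assumptions, not part of the patch:

    from pathlib import Path

    import pandas as pd

    import openeo
    from openeo.extra.job_management import CsvJobDatabase, MultiBackendJobManager
    from openeo.rest.job import BatchJob

    # One row per job to run; the columns are free-form, as long as the
    # `start_job` callable below knows how to interpret them.
    df = pd.DataFrame({"year": [2021, 2022, 2023]})

    def start_job(row, connection, **kwargs):
        # Illustrative stub: a real implementation would create and start
        # an actual batch job on the backend, based on the row contents.
        return BatchJob(job_id=f"job-{row['year']}", connection=connection)

    manager = MultiBackendJobManager()
    # Illustrative backend URL (the tests above use mocked "foo"/"bar" backends).
    manager.add_backend("foo", connection=openeo.connect("http://foo.test"))

    # New in this patch: initialize the on-disk job database directly from the
    # dataframe (normalization happens internally). This raises a RuntimeError
    # if the database already exists, and returns the database itself so it
    # can be chained off the constructor.
    job_db = CsvJobDatabase(Path("jobs.csv")).initialize_from_df(df)

    # No explicit `df` argument needed anymore: the job database drives the run.
    manager.run_jobs(job_db=job_db, start_job=start_job)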