Merge pull request #570 from materialsproject/response-job-dir-attr
Add `job_dir` attribute to `Response` class to record where a job ran
janosh authored Mar 29, 2024
2 parents 9d9fe24 + 227071e commit e88c571
Showing 12 changed files with 60 additions and 21 deletions.
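
As context for the diff below, here is a minimal sketch of how the new attribute can be inspected after a local run. The `add` job, the use of the default job store, and `create_folders=True` are illustrative assumptions, not part of this PR:

```python
# Minimal sketch, assuming a jobflow version that includes this PR.
# The `add` job is a made-up example; any job works the same way.
from jobflow import job, run_locally


@job
def add(a, b):
    return a + b


add_job = add(1, 2)
# create_folders=True makes run_locally create a per-job directory.
responses = run_locally(add_job, create_folders=True)

# Response.job_dir now records the directory the job ran in.
print(responses[add_job.uuid][1].job_dir)
```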
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
@@ -3,7 +3,7 @@ default_language_version:
exclude: "^src/atomate2/vasp/schemas/calc_types/"
repos:
- repo: https://github.com/charliermarsh/ruff-pre-commit
rev: v0.1.11
rev: v0.3.4
hooks:
- id: ruff
args: [--fix]
@@ -43,7 +43,7 @@ repos:
- id: rst-directive-colons
- id: rst-inline-touching-normal
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.8.0
rev: v1.9.0
hooks:
- id: mypy
files: ^src/
1 change: 1 addition & 0 deletions examples/encode_decode.py
@@ -1,4 +1,5 @@
"""A simple example to show message passing between jobs."""

from jobflow import Flow, job, run_locally


1 change: 1 addition & 0 deletions examples/fibonacci.py
@@ -1,4 +1,5 @@
"""A dynamic workflow that calculates the Fibonacci sequence."""

from jobflow import Response, job, run_locally


1 change: 0 additions & 1 deletion examples/replace.py
@@ -1,6 +1,5 @@
"""A demonstration of how to use dynamic workflows with replace."""


from jobflow import Flow, job, run_locally


11 changes: 7 additions & 4 deletions pyproject.toml
@@ -46,7 +46,7 @@ docs = [
"sphinx-copybutton==0.5.2",
"sphinx==7.2.6",
]
dev = ["pre-commit>=2.12.1"]
dev = ["pre-commit>=2.12.1", "typing_extensions; python_version < '3.11'"]
tests = ["moto==4.2.13", "pytest-cov==5.0.0", "pytest==8.1.1"]
vis = ["matplotlib", "pydot"]
fireworks = ["FireWorks"]
@@ -62,8 +62,8 @@ strict = [
"pydantic==2.6.4",
"pydash==7.0.7",
"pydot==2.0.0",
"typing-extensions==4.10.0",
"python-ulid==2.2.0",
"typing-extensions==4.10.0",
]

[project.urls]
@@ -121,7 +121,8 @@ exclude_lines = [

[tool.ruff]
target-version = "py39"
ignore-init-module-imports = true

[tool.ruff.lint]
select = [
"B", # flake8-bugbear
"C4", # flake8-comprehensions
@@ -159,6 +160,7 @@ ignore = [
"DTZ005",
"FBT001",
"FBT002",
"ISC001",
"PLR0911", # too-many-return-statements
"PLR0912", # too-many-branches
"PLR0913", # too-many-arguments
@@ -169,8 +171,9 @@ ignore = [
]
pydocstyle.convention = "numpy"
isort.known-first-party = ["jobflow"]
ignore-init-module-imports = true

[tool.ruff.per-file-ignores]
[tool.ruff.lint.per-file-ignores]
# F401: unused import
"__init__.py" = ["F401"]
# D: pydocstyle
26 changes: 19 additions & 7 deletions src/jobflow/core/job.py
@@ -6,14 +6,17 @@
import typing
import warnings
from dataclasses import dataclass, field
from typing import cast

from monty.json import MSONable, jsanitize
from typing_extensions import Self

from jobflow.core.reference import OnMissing, OutputReference
from jobflow.utils.uid import suid

if typing.TYPE_CHECKING:
from collections.abc import Hashable, Sequence
from pathlib import Path
from typing import Any, Callable

from networkx import DiGraph
@@ -526,7 +529,7 @@ def set_uuid(self, uuid: str) -> None:
self.uuid = uuid
self.output = self.output.set_uuid(uuid)

def run(self, store: jobflow.JobStore) -> Response:
def run(self, store: jobflow.JobStore, job_dir: Path = None) -> Response:
"""
Run the job.
@@ -581,7 +584,9 @@ def run(self, store: jobflow.JobStore) -> Response:
function = types.MethodType(function, bound)

response = function(*self.function_args, **self.function_kwargs)
response = Response.from_job_returns(response, self.output_schema)
response = Response.from_job_returns(
response, self.output_schema, job_dir=job_dir
)

if response.replace is not None:
response.replace = prepare_replace(response.replace, self)
@@ -1170,6 +1175,8 @@ class Response(typing.Generic[T]):
Stop any children of the current flow.
stop_jobflow
Stop executing all remaining jobs.
job_dir
The directory where the job was run.
"""

output: T = None
@@ -1179,13 +1186,15 @@
stored_data: dict[Hashable, Any] = None
stop_children: bool = False
stop_jobflow: bool = False
job_dir: str | Path = None

@classmethod
def from_job_returns(
cls,
job_returns: Any | None,
output_schema: type[BaseModel] = None,
) -> Response:
job_dir: str | Path = None,
) -> Self:
"""
Generate a :obj:`Response` from the outputs of a :obj:`Job`.
@@ -1199,6 +1208,8 @@ def from_job_returns(
output_schema
A pydantic model associated with the job. Used to enforce a schema for the
outputs.
job_dir
The directory where the job was run.
Raises
------
@@ -1215,17 +1226,18 @@ def from_job_returns(
# only apply output schema if there is no replace.
job_returns.output = apply_schema(job_returns.output, output_schema)

return job_returns
job_returns.job_dir = job_dir
return cast(Self, job_returns)

if isinstance(job_returns, (list, tuple)):
# check that a Response object is not given as one of many outputs
for r in job_returns:
if isinstance(r, Response):
for resp in job_returns:
if isinstance(resp, Response):
raise ValueError(
"Response cannot be returned in combination with other outputs."
)

return cls(output=apply_schema(job_returns, output_schema))
return cls(output=apply_schema(job_returns, output_schema), job_dir=job_dir)


def apply_schema(output: Any, schema: type[BaseModel] | None):
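A hedged sketch of the extended `Response.from_job_returns` signature shown above, wrapping a plain return value and attaching the directory it ran in. The literal output value and the use of `Path.cwd()` are illustrative only:

```python
# Sketch only: mirrors what Job.run now does internally after calling the
# wrapped function. Values here are placeholders, not project defaults.
from pathlib import Path

from jobflow import Response

resp = Response.from_job_returns("12345_end", output_schema=None, job_dir=Path.cwd())
assert resp.output == "12345_end"
assert resp.job_dir == Path.cwd()
```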
5 changes: 3 additions & 2 deletions src/jobflow/core/store.py
@@ -17,6 +17,7 @@
from typing import Any, Optional, Union

from maggma.core import Sort
from typing_extensions import Self

from jobflow.core.schemas import JobStoreDocument

@@ -545,7 +546,7 @@ def get_output(
)

@classmethod
def from_file(cls: type[T], db_file: str | Path, **kwargs) -> T:
def from_file(cls, db_file: str | Path, **kwargs) -> Self:
"""
Create a JobStore from a database file.
@@ -605,7 +606,7 @@ def from_file(cls: type[T], db_file: str | Path, **kwargs) -> T:
return cls.from_dict_spec(store_info, **kwargs)

@classmethod
def from_dict_spec(cls: type[T], spec: dict, **kwargs) -> T:
def from_dict_spec(cls, spec: dict, **kwargs) -> Self:
"""
Create an JobStore from a dict specification.
6 changes: 4 additions & 2 deletions src/jobflow/managers/local.py
@@ -58,7 +58,7 @@ def run_locally(
The responses of the jobs, as a dict of ``{uuid: {index: response}}``.
"""
from collections import defaultdict
from datetime import datetime
from datetime import datetime, timezone
from pathlib import Path
from random import randint

@@ -152,7 +152,7 @@ def _run_job(job: jobflow.Job, parents):

def _get_job_dir():
if create_folders:
time_now = datetime.utcnow().strftime(SETTINGS.DIRECTORY_FORMAT)
time_now = datetime.now(tz=timezone.utc).strftime(SETTINGS.DIRECTORY_FORMAT)
job_dir = root_dir / f"job_{time_now}-{randint(10000, 99999)}"
job_dir.mkdir()
return job_dir
@@ -165,6 +165,8 @@ def _run(root_flow):
with cd(job_dir):
response, jobflow_stopped = _run_job(job, parents)

if response is not None:
response.job_dir = job_dir
encountered_bad_response = encountered_bad_response or response is None
if jobflow_stopped:
return False
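For reference, a small illustration of the timezone-aware timestamp pattern that replaces the deprecated `datetime.utcnow()` in `_get_job_dir` above; the strftime format is a stand-in and may differ from the actual `SETTINGS.DIRECTORY_FORMAT` default:

```python
# Illustrative only: timezone-aware replacement for datetime.utcnow(),
# feeding the job directory name. The format string is assumed, not
# necessarily what SETTINGS.DIRECTORY_FORMAT resolves to.
from datetime import datetime, timezone
from random import randint

directory_format = "%Y-%m-%d-%H-%M-%S-%f"  # assumed format
time_now = datetime.now(tz=timezone.utc).strftime(directory_format)
print(f"job_{time_now}-{randint(10000, 99999)}")
```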
1 change: 1 addition & 0 deletions src/jobflow/utils/__init__.py
@@ -1,4 +1,5 @@
"""Utility functions for logging, enumerations and manipulating dictionaries."""

from jobflow.utils.enum import ValueEnum
from jobflow.utils.find import (
contains_flow_or_job,
1 change: 1 addition & 0 deletions src/jobflow/utils/uid.py
@@ -1,4 +1,5 @@
"""Tools for generating UUIDs."""

from __future__ import annotations

from uuid import UUID
1 change: 1 addition & 0 deletions src/jobflow/utils/uuid.py
@@ -1,4 +1,5 @@
"""Tools for generating UUIDs."""

from monty.dev import deprecated


23 changes: 20 additions & 3 deletions tests/managers/test_local.py
@@ -1,3 +1,6 @@
import os
from pathlib import Path

import pytest


@@ -10,7 +13,11 @@ def test_simple_job(memory_jobstore, clean_dir, simple_job):
responses = run_locally(job, store=memory_jobstore)

# check responses has been filled
assert responses[uuid][1].output == "12345_end"
response1 = responses[uuid][1]
assert response1.output == "12345_end"
# check job_dir
assert isinstance(response1.job_dir, Path)
assert os.path.isdir(response1.job_dir)

# check store has the activity output
result = memory_jobstore.query_one({"uuid": uuid})
@@ -63,7 +70,8 @@ def test_simple_flow(memory_jobstore, clean_dir, simple_flow, capsys):
assert len(folders) == 1

# run with folders and root_dir
assert Path(root_dir := "test").exists() is False
root_dir = "test"
assert Path(root_dir).exists() is False
responses = run_locally(
flow, store=memory_jobstore, create_folders=True, root_dir=root_dir
)
@@ -159,12 +167,14 @@ def test_detour_flow(memory_jobstore, clean_dir, detour_flow):

# run with log
responses = run_locally(flow, store=memory_jobstore)
uuid2 = next(u for u in responses if u not in {uuid1, uuid3})
uuid2 = next(uuid for uuid in responses if uuid not in {uuid1, uuid3})

# check responses has been filled
assert len(responses) == 3
assert responses[uuid1][1].output == 11
assert responses[uuid1][1].detour is not None
assert isinstance(responses[uuid1][1].job_dir, Path)
assert os.path.isdir(responses[uuid1][1].job_dir)
assert responses[uuid2][1].output == "11_end"
assert responses[uuid3][1].output == "12345_end"

@@ -196,6 +206,8 @@ def test_replace_flow(memory_jobstore, clean_dir, replace_flow):
assert len(responses[uuid1]) == 2
assert responses[uuid1][1].output == 11
assert responses[uuid1][1].replace is not None
assert isinstance(responses[uuid1][1].job_dir, Path)
assert os.path.isdir(responses[uuid1][1].job_dir)
assert responses[uuid1][2].output == "11_end"
assert responses[uuid2][1].output == "12345_end"

@@ -416,7 +428,10 @@ def test_detour_stop_flow(memory_jobstore, clean_dir, detour_stop_flow):
assert len(responses) == 2
assert responses[uuid1][1].output == 11
assert responses[uuid1][1].detour is not None
assert responses[uuid1][1].job_dir is None
assert responses[uuid2][1].output == "1234"
# TODO maybe find a way to set an artificial job_dir and test it is not None
assert responses[uuid2][1].job_dir is None

# check store has the activity output
result1 = memory_jobstore.query_one({"uuid": uuid1})
@@ -444,3 +459,5 @@ def test_external_reference(memory_jobstore, clean_dir, simple_job):
uuid2 = job2.uuid
responses = run_locally(job2, store=memory_jobstore, allow_external_references=True)
assert responses[uuid2][1].output == "12345_end_end"
assert isinstance(responses[uuid2][1].job_dir, Path)
assert os.path.isdir(responses[uuid2][1].job_dir)
