From 46bc25577fd5d6737417b71ee706fbfc22b8ceb6 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Sat, 4 May 2024 18:36:26 +0200 Subject: [PATCH 1/9] Implement a pending status for tasks. --- docs/source/changes.md | 7 ++++--- docs/source/coiled.md | 2 +- src/pytask_parallel/execute.py | 37 +++++++++++++++++++++------------- 3 files changed, 28 insertions(+), 18 deletions(-) diff --git a/docs/source/changes.md b/docs/source/changes.md index bcaa1a6..0c2da68 100644 --- a/docs/source/changes.md +++ b/docs/source/changes.md @@ -19,12 +19,13 @@ releases are available on [PyPI](https://pypi.org/project/pytask-parallel) and or processes automatically. - {pull}`96` handles local paths with remote executors. `PathNode`s are not supported as dependencies or products (except for return annotations). -- {pull}`99` changes that all tasks that are ready are being scheduled. It improves - interactions with adaptive scaling. {issue}`98` does handle the resulting issues: no - strong adherence to priorities, no pending status. +- {pull}`99` changes that all ready tasks are being scheduled. It improves interactions + with adaptive scaling. {issue}`98` does handle the resulting issues: no strong + adherence to priorities, no pending status. - {pull}`100` adds project management with rye. - {pull}`101` adds syncing for local paths as dependencies or products in remote environments with the same OS. +- {pull}`102` implements a pending status for scheduled but not started tasks. ## 0.4.1 - 2024-01-12 diff --git a/docs/source/coiled.md b/docs/source/coiled.md index 5463f7b..f53912e 100644 --- a/docs/source/coiled.md +++ b/docs/source/coiled.md @@ -1,7 +1,7 @@ # coiled ```{caution} -Currently, the coiled backend can only be used if your workflow code is organized in a +Currently, the coiled backend can only be used if your workflow code is organized as a package due to how pytask imports your code and dask serializes task functions ([issue](https://github.com/dask/distributed/issues/8607)). 
``` diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index 1ee0d71..afb9572 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -16,6 +16,7 @@ from pytask import PTask from pytask import PythonNode from pytask import Session +from pytask import TaskExecutionStatus from pytask import console from pytask import get_marks from pytask import hookimpl @@ -53,6 +54,9 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 reports = session.execution_reports running_tasks: dict[str, Future[Any]] = {} + # Get the live execution manager from the registry if it exists. + live_execution = session.config["pm"].get_plugin("live_execution") + # The executor can only be created after the collection to give users the # possibility to inject their own executors. session.config["_parallel_executor"] = registry.get_parallel_backend( @@ -68,17 +72,17 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 newly_collected_reports = [] ready_tasks = list(session.scheduler.get_ready(10_000)) - for task_name in ready_tasks: - task = session.dag.nodes[task_name]["task"] + for task_signature in ready_tasks: + task = session.dag.nodes[task_signature]["task"] session.hook.pytask_execute_task_log_start( - session=session, task=task + session=session, task=task, status=TaskExecutionStatus.PENDING ) try: session.hook.pytask_execute_task_setup( session=session, task=task ) - running_tasks[task_name] = session.hook.pytask_execute_task( - session=session, task=task + running_tasks[task_signature] = ( + session.hook.pytask_execute_task(session=session, task=task) ) sleeper.reset() except Exception: # noqa: BLE001 @@ -86,13 +90,13 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 task, sys.exc_info() ) newly_collected_reports.append(report) - session.scheduler.done(task_name) + session.scheduler.done(task_signature) if not ready_tasks: sleeper.increment() 
- for task_name in list(running_tasks): - future = running_tasks[task_name] + for task_signature in list(running_tasks): + future = running_tasks[task_signature] if future.done(): wrapper_result = parse_future_result(future) @@ -108,17 +112,17 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 ) if wrapper_result.exc_info is not None: - task = session.dag.nodes[task_name]["task"] + task = session.dag.nodes[task_signature]["task"] newly_collected_reports.append( ExecutionReport.from_task_and_exception( task, wrapper_result.exc_info, # type: ignore[arg-type] ) ) - running_tasks.pop(task_name) - session.scheduler.done(task_name) + running_tasks.pop(task_signature) + session.scheduler.done(task_signature) else: - task = session.dag.nodes[task_name]["task"] + task = session.dag.nodes[task_signature]["task"] _update_carry_over_products( task, wrapper_result.carry_over_products ) @@ -134,9 +138,14 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 else: report = ExecutionReport.from_task(task) - running_tasks.pop(task_name) + running_tasks.pop(task_signature) newly_collected_reports.append(report) - session.scheduler.done(task_name) + session.scheduler.done(task_signature) + + elif live_execution and future.running(): + live_execution.update_task( + task_signature, status=TaskExecutionStatus.RUNNING + ) for report in newly_collected_reports: session.hook.pytask_execute_task_process_report( From 7131a1f8c44f0449e46fe9d7048b2089c3590aa4 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Tue, 7 May 2024 21:00:42 +0200 Subject: [PATCH 2/9] Leftover commit. 
--- src/pytask_parallel/execute.py | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index afb9572..292aae2 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -62,7 +62,6 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 session.config["_parallel_executor"] = registry.get_parallel_backend( session.config["parallel_backend"], n_workers=session.config["n_workers"] ) - with session.config["_parallel_executor"]: sleeper = _Sleeper() @@ -142,10 +141,14 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 newly_collected_reports.append(report) session.scheduler.done(task_signature) - elif live_execution and future.running(): - live_execution.update_task( - task_signature, status=TaskExecutionStatus.RUNNING + elif not future.done(): + pass + elif live_execution: + status = _get_status_from_undone_task( + task_signature, future, session.config["_parallel_executor"] ) + if status == TaskExecutionStatus.RUNNING: + live_execution.update_task(task_signature, status=status) for report in newly_collected_reports: session.hook.pytask_execute_task_process_report( @@ -299,3 +302,15 @@ def increment(self) -> None: def sleep(self) -> None: time.sleep(self.timings[self.timing_idx]) + + +def _get_status_from_undone_task( + task_signature: str, future: Future, executor: Any +) -> TaskExecutionStatus: + """Get the status of a task that is undone.""" + if hasattr(future, "_state"): + status = future._state + if status == "RUNNING": + breakpoint() + return TaskExecutionStatus.RUNNING + return TaskExecutionStatus.PENDING From b73ed65382710c719575296571be2d345ca4fa47 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Tue, 7 May 2024 21:32:45 +0200 Subject: [PATCH 3/9] Add correct pending status for cf and loky. 
--- src/pytask_parallel/execute.py | 44 ++++++++++++++++----------------- src/pytask_parallel/wrappers.py | 23 ++++++++++++++++- 2 files changed, 44 insertions(+), 23 deletions(-) diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index 292aae2..e471c5c 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -2,6 +2,7 @@ from __future__ import annotations +import multiprocessing import sys import time from typing import TYPE_CHECKING @@ -24,6 +25,7 @@ from pytask.tree_util import tree_map from pytask.tree_util import tree_structure +from pytask_parallel.backends import ParallelBackend from pytask_parallel.backends import WorkerType from pytask_parallel.backends import registry from pytask_parallel.typing import CarryOverPath @@ -53,6 +55,7 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 __tracebackhide__ = True reports = session.execution_reports running_tasks: dict[str, Future[Any]] = {} + sleeper = _Sleeper() # Get the live execution manager from the registry if it exists. live_execution = session.config["pm"].get_plugin("live_execution") @@ -63,7 +66,14 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 session.config["parallel_backend"], n_workers=session.config["n_workers"] ) with session.config["_parallel_executor"]: - sleeper = _Sleeper() + # Create a shared memory object to differentiate between running and pending + # tasks. 
+ if session.config["parallel_backend"] in ( + ParallelBackend.PROCESSES, + ParallelBackend.THREADS, + ParallelBackend.LOKY, + ): + session.config["_shared_memory"] = multiprocessing.Manager().dict() i = 0 while session.scheduler.is_active(): @@ -141,14 +151,11 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 newly_collected_reports.append(report) session.scheduler.done(task_signature) - elif not future.done(): - pass - elif live_execution: - status = _get_status_from_undone_task( - task_signature, future, session.config["_parallel_executor"] - ) - if status == TaskExecutionStatus.RUNNING: - live_execution.update_task(task_signature, status=status) + elif live_execution and "_shared_memory" in session.config: + if task_signature in session.config["_shared_memory"]: + live_execution.update_task( + task_signature, status=TaskExecutionStatus.RUNNING + ) for report in newly_collected_reports: session.hook.pytask_execute_task_process_report( @@ -228,6 +235,7 @@ def pytask_execute_task(session: Session, task: PTask) -> Future[WrapperResult]: kwargs=kwargs, remote=remote, session_filterwarnings=session.config["filterwarnings"], + shared_memory=session.config["_shared_memory"], show_locals=session.config["show_locals"], task_filterwarnings=get_marks(task, "filterwarnings"), ) @@ -236,7 +244,11 @@ def pytask_execute_task(session: Session, task: PTask) -> Future[WrapperResult]: from pytask_parallel.wrappers import wrap_task_in_thread return session.config["_parallel_executor"].submit( - wrap_task_in_thread, task=task, remote=False, **kwargs + wrap_task_in_thread, + task=task, + remote=False, + shared_memory=session.config["_shared_memory"], + **kwargs, ) msg = f"Unknown worker type {worker_type}" raise ValueError(msg) @@ -302,15 +314,3 @@ def increment(self) -> None: def sleep(self) -> None: time.sleep(self.timings[self.timing_idx]) - - -def _get_status_from_undone_task( - task_signature: str, future: Future, executor: Any -) -> 
TaskExecutionStatus: - """Get the status of a task that is undone.""" - if hasattr(future, "_state"): - status = future._state - if status == "RUNNING": - breakpoint() - return TaskExecutionStatus.RUNNING - return TaskExecutionStatus.PENDING diff --git a/src/pytask_parallel/wrappers.py b/src/pytask_parallel/wrappers.py index 6157873..ad8c2ed 100644 --- a/src/pytask_parallel/wrappers.py +++ b/src/pytask_parallel/wrappers.py @@ -55,7 +55,9 @@ class WrapperResult: stderr: str -def wrap_task_in_thread(task: PTask, *, remote: bool, **kwargs: Any) -> WrapperResult: +def wrap_task_in_thread( + task: PTask, *, remote: bool, shared_memory: dict[str, bool] | None, **kwargs: Any +) -> WrapperResult: """Mock execution function such that it returns the same as for processes. The function for processes returns ``warning_reports`` and an ``exception``. With @@ -64,6 +66,11 @@ def wrap_task_in_thread(task: PTask, *, remote: bool, **kwargs: Any) -> WrapperR """ __tracebackhide__ = True + + # Add task to shared memory to indicate that it is currently being executed. + if shared_memory is not None: + shared_memory[task.signature] = True + try: out = task.function(**kwargs) except Exception: # noqa: BLE001 @@ -71,6 +78,11 @@ def wrap_task_in_thread(task: PTask, *, remote: bool, **kwargs: Any) -> WrapperR else: _handle_function_products(task, out, remote=remote) exc_info = None # type: ignore[assignment] + + # Remove task from shared memory to indicate that it is no longer being executed. 
+ if shared_memory is not None: + shared_memory.pop(task.signature) + return WrapperResult( carry_over_products=None, # type: ignore[arg-type] warning_reports=[], @@ -87,6 +99,7 @@ def wrap_task_in_process( # noqa: PLR0913 kwargs: dict[str, Any], remote: bool, session_filterwarnings: tuple[str, ...], + shared_memory: dict[str, bool] | None, show_locals: bool, task_filterwarnings: tuple[Mark, ...], ) -> WrapperResult: @@ -99,6 +112,10 @@ def wrap_task_in_process( # noqa: PLR0913 # Hide this function from tracebacks. __tracebackhide__ = True + # Add task to shared memory to indicate that it is currently being executed. + if shared_memory is not None: + shared_memory[task.signature] = True + # Patch set_trace and breakpoint to show a better error message. _patch_set_trace_and_breakpoint() @@ -156,6 +173,10 @@ def wrap_task_in_process( # noqa: PLR0913 captured_stdout_buffer.close() captured_stderr_buffer.close() + # Remove task from shared memory to indicate that it is no longer being executed. + if shared_memory is not None: + shared_memory.pop(task.signature) + return WrapperResult( carry_over_products=products, # type: ignore[arg-type] warning_reports=warning_reports, From 90e05a2d5b1eb67d21f1163f70e357c512f720fe Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Tue, 7 May 2024 21:38:05 +0200 Subject: [PATCH 4/9] Fix tests. 
--- src/pytask_parallel/execute.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index e471c5c..363b48c 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -235,7 +235,7 @@ def pytask_execute_task(session: Session, task: PTask) -> Future[WrapperResult]: kwargs=kwargs, remote=remote, session_filterwarnings=session.config["filterwarnings"], - shared_memory=session.config["_shared_memory"], + shared_memory=session.config.get("_shared_memory"), show_locals=session.config["show_locals"], task_filterwarnings=get_marks(task, "filterwarnings"), ) @@ -247,7 +247,7 @@ def pytask_execute_task(session: Session, task: PTask) -> Future[WrapperResult]: wrap_task_in_thread, task=task, remote=False, - shared_memory=session.config["_shared_memory"], + shared_memory=session.config.get("_shared_memory"), **kwargs, ) msg = f"Unknown worker type {worker_type}" raise ValueError(msg) From 1e2006c04c194934cded892c86ea17817544c734 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Tue, 7 May 2024 21:47:43 +0200 Subject: [PATCH 5/9] Fix. --- src/pytask_parallel/execute.py | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index 363b48c..c789b16 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -57,6 +57,18 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 running_tasks: dict[str, Future[Any]] = {} sleeper = _Sleeper() + # Create a shared memory object to differentiate between running and pending + # tasks for some parallel backends. 
+ if session.config["parallel_backend"] in ( + ParallelBackend.PROCESSES, + ParallelBackend.THREADS, + ParallelBackend.LOKY, + ): + session.config["_shared_memory"] = multiprocessing.Manager().dict() + start_execution_state = TaskExecutionStatus.PENDING + else: + start_execution_state = TaskExecutionStatus.RUNNING + # Get the live execution manager from the registry if it exists. live_execution = session.config["pm"].get_plugin("live_execution") @@ -66,15 +78,6 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 session.config["parallel_backend"], n_workers=session.config["n_workers"] ) with session.config["_parallel_executor"]: - # Create a shared memory object to differentiate between running and pending - # tasks. - if session.config["parallel_backend"] in ( - ParallelBackend.PROCESSES, - ParallelBackend.THREADS, - ParallelBackend.LOKY, - ): - session.config["_shared_memory"] = multiprocessing.Manager().dict() - i = 0 while session.scheduler.is_active(): try: @@ -84,7 +87,7 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 for task_signature in ready_tasks: task = session.dag.nodes[task_signature]["task"] session.hook.pytask_execute_task_log_start( - session=session, task=task, status=TaskExecutionStatus.PENDING + session=session, task=task, status=start_execution_state ) try: session.hook.pytask_execute_task_setup( From 89ff770f05d1889a2a12688a9b1ff6712bfd2065 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Tue, 7 May 2024 23:02:50 +0200 Subject: [PATCH 6/9] fix. 
--- src/pytask_parallel/execute.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index c789b16..79ba551 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -5,6 +5,7 @@ import multiprocessing import sys import time +from contextlib import ExitStack from typing import TYPE_CHECKING from typing import Any @@ -64,9 +65,10 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 ParallelBackend.THREADS, ParallelBackend.LOKY, ): - session.config["_shared_memory"] = multiprocessing.Manager().dict() + manager_cls = multiprocessing.Manager start_execution_state = TaskExecutionStatus.PENDING else: + manager_cls = ExitStack start_execution_state = TaskExecutionStatus.RUNNING # Get the live execution manager from the registry if it exists. @@ -77,7 +79,14 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 session.config["_parallel_executor"] = registry.get_parallel_backend( session.config["parallel_backend"], n_workers=session.config["n_workers"] ) - with session.config["_parallel_executor"]: + with session.config["_parallel_executor"], manager_cls() as manager: + if session.config["parallel_backend"] in ( + ParallelBackend.PROCESSES, + ParallelBackend.THREADS, + ParallelBackend.LOKY, + ): + session.config["_shared_memory"] = manager.dict() + i = 0 while session.scheduler.is_active(): try: From ba20e229aa49062a4d59f9e383427ca490dc683f Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Tue, 7 May 2024 23:24:53 +0200 Subject: [PATCH 7/9] Fix types. 
--- src/pytask_parallel/execute.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index 79ba551..135d31e 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -2,12 +2,13 @@ from __future__ import annotations -import multiprocessing import sys import time from contextlib import ExitStack +from multiprocessing import Manager from typing import TYPE_CHECKING from typing import Any +from typing import Callable import cloudpickle from _pytask.node_protocols import PPathNode @@ -37,6 +38,7 @@ if TYPE_CHECKING: from concurrent.futures import Future + from multiprocessing.managers import SyncManager from pytask_parallel.wrappers import WrapperResult @@ -65,7 +67,7 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 ParallelBackend.THREADS, ParallelBackend.LOKY, ): - manager_cls = multiprocessing.Manager + manager_cls: Callable[[], SyncManager] | type[ExitStack] = Manager start_execution_state = TaskExecutionStatus.PENDING else: manager_cls = ExitStack @@ -85,7 +87,7 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 ParallelBackend.THREADS, ParallelBackend.LOKY, ): - session.config["_shared_memory"] = manager.dict() + session.config["_shared_memory"] = manager.dict() # type: ignore[union-attr] i = 0 while session.scheduler.is_active(): From 1e2c9d21f32a079479a1929c7b096d56e3ce141d Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Sun, 27 Jul 2025 16:22:12 +0200 Subject: [PATCH 8/9] Finalize pending status. 
--- pyproject.toml | 2 +- src/pytask_parallel/execute.py | 9 ++++++--- src/pytask_parallel/wrappers.py | 4 ++-- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 1b3452d..8c6d984 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,7 +15,7 @@ dependencies = [ "cloudpickle", "loky", "pluggy>=1.0.0", - "pytask>=0.5.2", + "git+https://github.com/pytask-dev/pytask@allow-setting-task-status", "rich", ] dynamic = ["version"] diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index c38e026..cc406eb 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -100,14 +100,15 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 # Unfortunately, all submitted tasks are shown as running although some # are pending. # - # Without coiled functions, we submit as many tasks as there are - # available workers since we cannot reliably detect a pending status. + # For all other backends, at least four more tasks are submitted and + # otherwise 10% more. This is a heuristic to avoid submitting too few + # tasks. # # See #98 for more information. if any_coiled_task: n_new_tasks = 10_000 else: - n_new_tasks = session.config["n_workers"] - len(running_tasks) + n_new_tasks = max(4, int(session.config["n_workers"] * 0.1)) ready_tasks = ( list(session.scheduler.get_ready(n_new_tasks)) @@ -185,6 +186,8 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 newly_collected_reports.append(report) session.scheduler.done(task_signature) + # Check if tasks are not pending but running and update the live + # status. 
elif live_execution and "_shared_memory" in session.config: if task_signature in session.config["_shared_memory"]: live_execution.update_task( task_signature, status=TaskExecutionStatus.RUNNING ) diff --git a/src/pytask_parallel/wrappers.py b/src/pytask_parallel/wrappers.py index e989b2f..689dba2 100644 --- a/src/pytask_parallel/wrappers.py +++ b/src/pytask_parallel/wrappers.py @@ -81,7 +81,7 @@ def wrap_task_in_thread( # Remove task from shared memory to indicate that it is no longer being executed. if shared_memory is not None: - shared_memory.pop(task.signature) + shared_memory.pop(task.signature, None) return WrapperResult( carry_over_products=None, # type: ignore[arg-type] @@ -177,7 +177,7 @@ def wrap_task_in_process( # noqa: PLR0913 # Remove task from shared memory to indicate that it is no longer being executed. if shared_memory is not None: - shared_memory.pop(task.signature) + shared_memory.pop(task.signature, None) return WrapperResult( carry_over_products=products, # type: ignore[arg-type] From a14289d14c1ec8cd79ec5170a91070c3c4d59f6e Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Sun, 27 Jul 2025 16:25:15 +0200 Subject: [PATCH 9/9] Install pytask from github. 
--- pyproject.toml | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 8c6d984..4ea9440 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,7 +15,7 @@ dependencies = [ "cloudpickle", "loky", "pluggy>=1.0.0", - "git+https://github.com/pytask-dev/pytask@allow-setting-task-status", + "pytask>=0.5.2", "rich", ] dynamic = ["version"] @@ -45,14 +45,14 @@ test = [ "nbmake", "pytest>=8.4.0", "pytest-cov>=5.0.0", - {include-group = "coiled"}, - {include-group = "dask"}, + { include-group = "coiled" }, + { include-group = "dask" }, ] typing = [ "pytask-parallel", "ty", - {include-group = "coiled"}, - {include-group = "dask"}, + { include-group = "coiled" }, + { include-group = "dask" }, ] [project.readme] @@ -76,6 +76,9 @@ pytask_parallel = "pytask_parallel.plugin" requires = ["hatchling", "hatch_vcs"] build-backend = "hatchling.build" +[tool.uv.sources] +pytask = { git = "https://github.com/pytask-dev/pytask", rev = "allow-setting-task-status" } [tool.hatch.build.hooks.vcs] version-file = "src/pytask_parallel/_version.py"