Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Used anyio.abc full namespace instead of from imports #6784

Merged
merged 2 commits into from
Sep 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/prefect/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
from uuid import UUID

import anyio
import anyio.abc
import anyio.to_process
import pendulum
from anyio.abc import TaskGroup, TaskStatus

from prefect.blocks.core import Block
from prefect.client import OrionClient, get_client
Expand Down Expand Up @@ -41,7 +41,7 @@ def __init__(
self.submitting_flow_run_ids = set()
self.started = False
self.logger = get_logger("agent")
self.task_group: Optional[TaskGroup] = None
self.task_group: Optional[anyio.abc.TaskGroup] = None
self.client: Optional[OrionClient] = None

self._work_queue_cache_expiration: pendulum.DateTime = None
Expand Down Expand Up @@ -214,7 +214,7 @@ async def _submit_run_and_capture_errors(
self,
flow_run: FlowRun,
infrastructure: Infrastructure,
task_status: TaskStatus = None,
task_status: anyio.abc.TaskStatus = None,
) -> Union[InfrastructureResult, Exception]:

# Note: There is not a clear way to determine if task_status.started() has been
Expand Down
8 changes: 4 additions & 4 deletions src/prefect/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
Union,
)

import anyio.abc
import pendulum
from anyio.abc import BlockingPortal, CancelScope, TaskGroup
from pydantic import BaseModel, Field, PrivateAttr

import prefect.logging
Expand Down Expand Up @@ -222,11 +222,11 @@ class FlowRunContext(RunContext):

# The synchronous portal is only created for async flows for creating engine calls
# from synchronous task and subflow calls
sync_portal: Optional[BlockingPortal] = None
timeout_scope: Optional[CancelScope] = None
sync_portal: Optional[anyio.abc.BlockingPortal] = None
timeout_scope: Optional[anyio.abc.CancelScope] = None

# Task group that can be used for background tasks during the flow run
background_tasks: TaskGroup
background_tasks: anyio.abc.TaskGroup

__var__ = ContextVar("flow_run")

Expand Down
4 changes: 2 additions & 2 deletions src/prefect/infrastructure/base.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import abc
from typing import TYPE_CHECKING, Dict, List, Optional

import anyio.abc
import pydantic
from anyio.abc import TaskStatus
from typing_extensions import Self

import prefect
Expand Down Expand Up @@ -50,7 +50,7 @@ class Infrastructure(Block, abc.ABC):
@abc.abstractmethod
async def run(
self,
task_status: TaskStatus = None,
task_status: anyio.abc.TaskStatus = None,
) -> InfrastructureResult:
"""
Run the infrastructure.
Expand Down
4 changes: 2 additions & 2 deletions src/prefect/infrastructure/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Dict, Generator, List, Optional, Tuple

import anyio.abc
import packaging.version
from anyio.abc import TaskStatus
from pydantic import Field, validator
from slugify import slugify
from typing_extensions import Literal
Expand Down Expand Up @@ -232,7 +232,7 @@ def check_volume_format(cls, volumes):
@sync_compatible
async def run(
self,
task_status: Optional[TaskStatus] = None,
task_status: Optional[anyio.abc.TaskStatus] = None,
) -> Optional[bool]:
if not self.command:
raise ValueError("Docker container cannot be run with empty command.")
Expand Down
4 changes: 2 additions & 2 deletions src/prefect/infrastructure/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional, Union

import anyio.abc
import yaml
from anyio.abc import TaskStatus
from pydantic import Field, validator
from slugify import slugify
from typing_extensions import Literal
Expand Down Expand Up @@ -209,7 +209,7 @@ def customize_from_file(cls, filename: str) -> JsonPatch:
@sync_compatible
async def run(
self,
task_status: Optional[TaskStatus] = None,
task_status: Optional[anyio.abc.TaskStatus] = None,
) -> Optional[bool]:
if not self.command:
raise ValueError("Kubernetes job cannot be run with empty command.")
Expand Down
4 changes: 2 additions & 2 deletions src/prefect/infrastructure/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import tempfile
from typing import Optional

import anyio.abc
import sniffio
from anyio.abc import TaskStatus
from pydantic import Field
from typing_extensions import Literal

Expand Down Expand Up @@ -56,7 +56,7 @@ class Process(Infrastructure):
@sync_compatible
async def run(
self,
task_status: TaskStatus = None,
task_status: anyio.abc.TaskStatus = None,
) -> Optional[bool]:
if not self.command:
raise ValueError("Process cannot be run with empty command.")
Expand Down
4 changes: 2 additions & 2 deletions src/prefect/task_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
from prefect.utilities.collections import AutoEnum

if TYPE_CHECKING:
from anyio.abc import TaskGroup
import anyio.abc

from prefect.logging import get_logger
from prefect.orion.schemas.states import State
Expand Down Expand Up @@ -229,7 +229,7 @@ def __init__(self):
# TODO: Consider adding `max_workers` support using anyio capacity limiters

# Runtime attributes
self._task_group: TaskGroup = None
self._task_group: anyio.abc.TaskGroup = None
self._results: Dict[UUID, Any] = {}
self._keys: Set[UUID] = set()
super().__init__()
Expand Down