Skip to content

Commit

Permalink
more pendulum consolidation (#16911)
Browse files Browse the repository at this point in the history
  • Loading branch information
zzstoatzz authored Jan 30, 2025
1 parent 3ac3d54 commit c590d61
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 34 deletions.
3 changes: 1 addition & 2 deletions src/prefect/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
)
from uuid import UUID

import pendulum
from cachetools import LRUCache
from pydantic import (
BaseModel,
Expand Down Expand Up @@ -503,7 +502,7 @@ async def _exists(self, key: str) -> bool:
if metadata.expiration:
# if the result has an expiration,
# check if it is still in the future
exists = metadata.expiration > pendulum.now("utc")
exists = metadata.expiration > DateTime.now("utc")
else:
exists = True
return exists
Expand Down
14 changes: 7 additions & 7 deletions src/prefect/server/orchestration/core_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from typing import Any, Union, cast
from uuid import uuid4

import pendulum
import sqlalchemy as sa
from packaging.version import Version
from sqlalchemy import select
Expand Down Expand Up @@ -45,6 +44,7 @@
PREFECT_DEPLOYMENT_CONCURRENCY_SLOT_WAIT_SECONDS,
PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS,
)
from prefect.types._datetime import DateTime
from prefect.utilities.math import clamped_poisson_interval

from .instrumentation_policies import InstrumentFlowRunStateTransitions
Expand Down Expand Up @@ -478,7 +478,7 @@ async def before_transition( # type: ignore
await self.reject_transition(
state=states.Scheduled(
name="AwaitingConcurrencySlot",
scheduled_time=pendulum.now("UTC").add(
scheduled_time=DateTime.now("UTC").add(
seconds=PREFECT_DEPLOYMENT_CONCURRENCY_SLOT_WAIT_SECONDS.value()
),
),
Expand Down Expand Up @@ -643,7 +643,7 @@ async def before_transition(
db.TaskRunStateCache.cache_key == cache_key,
sa.or_(
db.TaskRunStateCache.cache_expiration.is_(None),
db.TaskRunStateCache.cache_expiration > pendulum.now("utc"),
db.TaskRunStateCache.cache_expiration > DateTime.now("utc"),
),
),
)
Expand Down Expand Up @@ -687,7 +687,7 @@ async def before_transition(
if run_settings.retries is None or run_count > run_settings.retries:
return # Retry count exceeded, allow transition to failed

scheduled_start_time = pendulum.now("UTC").add(
scheduled_start_time = DateTime.now("UTC").add(
seconds=run_settings.retry_delay or 0
)

Expand Down Expand Up @@ -781,7 +781,7 @@ async def before_transition(

if run_settings.retries is not None and run_count <= run_settings.retries:
retry_state = states.AwaitingRetry(
scheduled_time=pendulum.now("UTC").add(seconds=delay),
scheduled_time=DateTime.now("UTC").add(seconds=delay),
message=proposed_state.message,
data=proposed_state.data,
)
Expand Down Expand Up @@ -912,7 +912,7 @@ async def before_transition(

# At this moment, we round delay to the nearest second as the API schema
# specifies an integer return value.
delay = scheduled_time - pendulum.now("UTC")
delay = scheduled_time - DateTime.now("UTC")
delay_seconds = delay.in_seconds()
delay_seconds += round(delay.microseconds / 1e6)
if delay_seconds > 0:
Expand Down Expand Up @@ -1067,7 +1067,7 @@ async def before_transition(
)
return
pause_timeout = initial_state.state_details.pause_timeout
if pause_timeout and pause_timeout < pendulum.now("UTC"):
if pause_timeout and pause_timeout < DateTime.now("UTC"):
pause_timeout_failure = states.Failed(
message=(f"The flow was {display_state_name} and never resumed."),
)
Expand Down
10 changes: 6 additions & 4 deletions src/prefect/states.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

import anyio
import httpx
import pendulum
from opentelemetry import propagate
from typing_extensions import TypeGuard

Expand All @@ -30,6 +29,7 @@
UnfinishedRun,
)
from prefect.logging.loggers import get_logger, get_run_logger
from prefect.types._datetime import DateTime, PendulumDuration
from prefect.utilities.annotations import BaseAnnotation
from prefect.utilities.asyncutils import in_async_main_thread, sync_compatible
from prefect.utilities.collections import ensure_iterable
Expand Down Expand Up @@ -631,7 +631,7 @@ def Scheduled(
"""
state_details = StateDetails.model_validate(kwargs.pop("state_details", {}))
if scheduled_time is None:
scheduled_time = pendulum.now("UTC")
scheduled_time = DateTime.now("UTC")
elif state_details.scheduled_time:
raise ValueError("An extra scheduled_time was provided in state_details")
state_details.scheduled_time = scheduled_time
Expand Down Expand Up @@ -729,8 +729,10 @@ def Paused(
if pause_expiration_time is None and timeout_seconds is None:
pass
else:
state_details.pause_timeout = pause_expiration_time or (
pendulum.now("UTC") + pendulum.Duration(seconds=timeout_seconds)
state_details.pause_timeout = (
DateTime.instance(pause_expiration_time)
if pause_expiration_time
else DateTime.now("UTC") + PendulumDuration(seconds=timeout_seconds or 0)
)

state_details.pause_reschedule = reschedule
Expand Down
18 changes: 9 additions & 9 deletions src/prefect/task_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from uuid import UUID

import anyio
import pendulum
from opentelemetry import trace
from typing_extensions import ParamSpec, Self

Expand Down Expand Up @@ -80,6 +79,7 @@
)
from prefect.telemetry.run_telemetry import RunTelemetry
from prefect.transactions import IsolationLevel, Transaction, transaction
from prefect.types._datetime import DateTime, PendulumDuration
from prefect.utilities._engine import get_hook_name
from prefect.utilities.annotations import NotSet
from prefect.utilities.asyncutils import run_coro_as_sync
Expand Down Expand Up @@ -437,7 +437,7 @@ def set_state(self, state: State[R], force: bool = False) -> State[R]:
if last_state.timestamp == new_state.timestamp:
# Ensure that the state timestamp is unique, or at least not equal to the last state.
# This might occur especially on Windows where the timestamp resolution is limited.
new_state.timestamp += pendulum.duration(microseconds=1)
new_state.timestamp += PendulumDuration(microseconds=1)

# Ensure that the state_details are populated with the current run IDs
new_state.state_details.task_run_id = self.task_run.id
Expand Down Expand Up @@ -486,7 +486,7 @@ def result(self, raise_on_failure: bool = True) -> "Union[R, State, None]":

def handle_success(self, result: R, transaction: Transaction) -> R:
if self.task.cache_expiration is not None:
expiration = pendulum.now("utc") + self.task.cache_expiration
expiration = DateTime.now("utc") + self.task.cache_expiration
else:
expiration = None

Expand Down Expand Up @@ -535,7 +535,7 @@ def handle_retry(self, exc: Exception) -> bool:
else self.task.retry_delay_seconds
)
new_state = AwaitingRetry(
scheduled_time=pendulum.now("utc").add(seconds=delay)
scheduled_time=DateTime.now("utc").add(seconds=delay)
)
else:
delay = None
Expand Down Expand Up @@ -728,7 +728,7 @@ def initialize_run(
async def wait_until_ready(self) -> None:
"""Waits until the scheduled time (if its the future), then enters Running."""
if scheduled_time := self.state.state_details.scheduled_time:
sleep_time = (scheduled_time - pendulum.now("utc")).total_seconds()
sleep_time = (scheduled_time - DateTime.now("utc")).total_seconds()
await anyio.sleep(sleep_time if sleep_time > 0 else 0)
new_state = Retrying() if self.state.name == "AwaitingRetry" else Running()
self.set_state(
Expand Down Expand Up @@ -970,7 +970,7 @@ async def set_state(self, state: State, force: bool = False) -> State:
if last_state.timestamp == new_state.timestamp:
# Ensure that the state timestamp is unique, or at least not equal to the last state.
# This might occur especially on Windows where the timestamp resolution is limited.
new_state.timestamp += pendulum.duration(microseconds=1)
new_state.timestamp += PendulumDuration(microseconds=1)

# Ensure that the state_details are populated with the current run IDs
new_state.state_details.task_run_id = self.task_run.id
Expand Down Expand Up @@ -1020,7 +1020,7 @@ async def result(self, raise_on_failure: bool = True) -> "Union[R, State, None]"

async def handle_success(self, result: R, transaction: Transaction) -> R:
if self.task.cache_expiration is not None:
expiration = pendulum.now("utc") + self.task.cache_expiration
expiration = DateTime.now("utc") + self.task.cache_expiration
else:
expiration = None

Expand Down Expand Up @@ -1068,7 +1068,7 @@ async def handle_retry(self, exc: Exception) -> bool:
else self.task.retry_delay_seconds
)
new_state = AwaitingRetry(
scheduled_time=pendulum.now("utc").add(seconds=delay)
scheduled_time=DateTime.now("utc").add(seconds=delay)
)
else:
delay = None
Expand Down Expand Up @@ -1259,7 +1259,7 @@ async def initialize_run(
async def wait_until_ready(self) -> None:
"""Waits until the scheduled time (if its the future), then enters Running."""
if scheduled_time := self.state.state_details.scheduled_time:
sleep_time = (scheduled_time - pendulum.now("utc")).total_seconds()
sleep_time = (scheduled_time - DateTime.now("utc")).total_seconds()
await anyio.sleep(sleep_time if sleep_time > 0 else 0)
new_state = Retrying() if self.state.name == "AwaitingRetry" else Running()
await self.set_state(
Expand Down
12 changes: 6 additions & 6 deletions src/prefect/task_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

import anyio
import anyio.abc
import pendulum
import uvicorn
from exceptiongroup import BaseExceptionGroup # novermin
from fastapi import FastAPI
Expand All @@ -35,6 +34,7 @@
)
from prefect.states import Pending
from prefect.task_engine import run_task_async, run_task_sync
from prefect.types._datetime import DateTime
from prefect.utilities.annotations import NotSet
from prefect.utilities.asyncutils import asyncnullcontext, sync_compatible
from prefect.utilities.engine import emit_task_run_state_change_event
Expand Down Expand Up @@ -107,7 +107,7 @@ def __init__(

self.task_keys: set[str] = set(t.task_key for t in tasks if isinstance(t, Task)) # pyright: ignore[reportUnnecessaryIsInstance]

self._started_at: Optional[pendulum.DateTime] = None
self._started_at: Optional[DateTime] = None
self.stopping: bool = False

self._client = get_client()
Expand All @@ -124,7 +124,7 @@ def __init__(
self._executor = ThreadPoolExecutor(max_workers=limit if limit else None)
self._limiter = anyio.CapacityLimiter(limit) if limit else None

self.in_flight_task_runs: dict[str, dict[UUID, pendulum.DateTime]] = {
self.in_flight_task_runs: dict[str, dict[UUID, DateTime]] = {
task_key: {} for task_key in self.task_keys
}
self.finished_task_runs: dict[str, int] = {
Expand All @@ -136,7 +136,7 @@ def client_id(self) -> str:
return f"{socket.gethostname()}-{os.getpid()}"

@property
def started_at(self) -> Optional[pendulum.DateTime]:
def started_at(self) -> Optional[DateTime]:
return self._started_at

@property
Expand Down Expand Up @@ -256,7 +256,7 @@ async def _subscribe_to_task_scheduling(self):
)

async def _safe_submit_scheduled_task_run(self, task_run: TaskRun):
self.in_flight_task_runs[task_run.task_key][task_run.id] = pendulum.now()
self.in_flight_task_runs[task_run.task_key][task_run.id] = DateTime.now()
try:
await self._submit_scheduled_task_run(task_run)
except BaseException as exc:
Expand Down Expand Up @@ -379,7 +379,7 @@ async def __aenter__(self) -> Self:
await self._exit_stack.enter_async_context(self._runs_task_group)
self._exit_stack.enter_context(self._executor)

self._started_at = pendulum.now()
self._started_at = DateTime.now()
return self

async def __aexit__(self, *exc_info: Any) -> None:
Expand Down
4 changes: 2 additions & 2 deletions tests/fixtures/time.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def frozen_time(tz: Optional[Union[str, Timezone]] = None):

@pytest.fixture
def advance_time(monkeypatch: pytest.MonkeyPatch) -> Callable[[timedelta], DateTime]:
clock = pendulum.now("UTC")
clock = DateTime.now("UTC")

def advance(amount: timedelta):
nonlocal clock
Expand All @@ -39,6 +39,6 @@ def nowish(tz: Optional[Union[str, Timezone]] = None):

return clock.in_timezone(tz)

monkeypatch.setattr(pendulum, "now", nowish)
monkeypatch.setattr(DateTime, "now", nowish)

return advance
8 changes: 4 additions & 4 deletions tests/server/orchestration/test_core_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -3587,7 +3587,7 @@ async def test_secure_concurrency_slots(
)

with mock.patch(
"prefect.server.orchestration.core_policy.pendulum.now"
"prefect.server.orchestration.core_policy.DateTime.now"
) as mock_pendulum_now:
expected_now: pendulum.DateTime = pendulum.parse("2024-01-01T00:00:00Z") # type: ignore
mock_pendulum_now.return_value = expected_now
Expand Down Expand Up @@ -3646,7 +3646,7 @@ async def test_release_concurrency_slots(
)

with mock.patch(
"prefect.server.orchestration.core_policy.pendulum.now"
"prefect.server.orchestration.core_policy.DateTime.now"
) as mock_pendulum_now:
expected_now: pendulum.DateTime = pendulum.parse("2024-01-01T00:00:00Z") # type: ignore
mock_pendulum_now.return_value = expected_now
Expand Down Expand Up @@ -3779,7 +3779,7 @@ async def test_enqueue_collision_strategy(
)

with mock.patch(
"prefect.server.orchestration.core_policy.pendulum.now"
"prefect.server.orchestration.core_policy.DateTime.now"
) as mock_pendulum_now:
expected_now: pendulum.DateTime = pendulum.parse("2024-01-01T00:00:00Z") # type: ignore
mock_pendulum_now.return_value = expected_now
Expand Down Expand Up @@ -3864,7 +3864,7 @@ async def test_uses_enqueue_collision_strategy_by_default(
)

with mock.patch(
"prefect.server.orchestration.core_policy.pendulum.now"
"prefect.server.orchestration.core_policy.DateTime.now"
) as mock_pendulum_now:
expected_now: pendulum.DateTime = pendulum.parse("2024-01-01T00:00:00Z") # type: ignore
mock_pendulum_now.return_value = expected_now
Expand Down

0 comments on commit c590d61

Please sign in to comment.