Skip to content

Commit

Permalink
Consolidates discovery of Prefect server Services to the base class
Browse files Browse the repository at this point in the history
Following on the work in #16898 to unify Prefect's `Service`
implementations, this moves the discovery routine from the CLI to the
base `Service` implementation.  This also includes a couple of
additional niceties:

- Each service is now responsible for knowing which `Setting` enables
  it.  This eventually means that when adding a new service, we won't
  need to modify the `api/server.py` lifespan or the CLI.  The
  conditional toggling of services on and off will be handled by asking
  the service if it should be enabled.

- The way service names are handles is more uniform across all the
  subclasses.  I also cleaned up the descriptions of all services to
  make them more punchy and imperative.

- The table of services uses the setting name as part of how it shows
  whether the service is enabled, which means folks shouldn't have to
  hunt through the code or docs to find the setting.

We do still have an explicit listing of all the known service modules,
because I wasn't too keen to go down the rabbithole of automatic
discovery from entrypoints or anything like that.  Definitely open to
ideas on a better place for that.

This version doesn't unify how the `api/server.py` lifespan starts
services, but that will be coming in a future update.
  • Loading branch information
chrisguidry committed Jan 30, 2025
1 parent 0b7c511 commit 72599c9
Show file tree
Hide file tree
Showing 16 changed files with 434 additions and 379 deletions.
105 changes: 13 additions & 92 deletions src/prefect/cli/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
import sys
import textwrap
from pathlib import Path
from types import ModuleType
from typing import TYPE_CHECKING

import typer
import uvicorn
from rich.table import Table
from rich.text import Text

import prefect
import prefect.settings
Expand All @@ -29,6 +29,7 @@
from prefect.cli.cloud import prompt_select_from_list
from prefect.cli.root import app, is_interactive
from prefect.logging import get_logger
from prefect.server.services.base import Service
from prefect.settings import (
PREFECT_API_SERVICES_LATE_RUNS_ENABLED,
PREFECT_API_SERVICES_SCHEDULER_ENABLED,
Expand Down Expand Up @@ -522,90 +523,12 @@ async def stamp(revision: str):
exit_with_success("Stamping database with revision succeeded!")


def _get_service_settings() -> dict[str, "prefect.settings.Setting"]:
"""Get mapping of service names to their enabled/disabled settings."""
return {
"Telemetry": prefect.settings.PREFECT_SERVER_ANALYTICS_ENABLED,
"Scheduler": prefect.settings.PREFECT_API_SERVICES_SCHEDULER_ENABLED,
"RecentDeploymentsScheduler": prefect.settings.PREFECT_API_SERVICES_SCHEDULER_ENABLED,
"MarkLateRuns": prefect.settings.PREFECT_API_SERVICES_LATE_RUNS_ENABLED,
"FailExpiredPauses": prefect.settings.PREFECT_API_SERVICES_PAUSE_EXPIRATIONS_ENABLED,
"CancellationCleanup": prefect.settings.PREFECT_API_SERVICES_CANCELLATION_CLEANUP_ENABLED,
"FlowRunNotifications": prefect.settings.PREFECT_API_SERVICES_FLOW_RUN_NOTIFICATIONS_ENABLED,
"Foreman": prefect.settings.PREFECT_API_SERVICES_FOREMAN_ENABLED,
"ReactiveTriggers": prefect.settings.PREFECT_API_SERVICES_TRIGGERS_ENABLED,
"ProactiveTriggers": prefect.settings.PREFECT_API_SERVICES_TRIGGERS_ENABLED,
"Actions": prefect.settings.PREFECT_API_SERVICES_TRIGGERS_ENABLED,
}


def _get_service_modules() -> list[ModuleType]:
"""Get list of modules containing service implementations."""
from prefect.server.events.services import triggers
from prefect.server.services import (
cancellation_cleanup,
flow_run_notifications,
foreman,
late_runs,
pause_expirations,
scheduler,
task_run_recorder,
telemetry,
)

return [
cancellation_cleanup,
flow_run_notifications,
foreman,
late_runs,
pause_expirations,
scheduler,
task_run_recorder,
telemetry,
triggers,
]


def _discover_service_classes() -> (
list[type["prefect.server.services.loop_service.LoopService"]]
):
"""Discover all available service classes."""
from prefect.server.services.loop_service import LoopService

discovered: list[type[LoopService]] = []
for module in _get_service_modules():
for _, obj in inspect.getmembers(module):
if (
inspect.isclass(obj)
and issubclass(obj, LoopService)
and obj != LoopService
):
discovered.append(obj)
return discovered


def _get_enabled_services() -> (
list[type["prefect.server.services.loop_service.LoopService"]]
):
"""Get list of enabled service classes."""
service_settings = _get_service_settings()
return [
svc
for svc in _discover_service_classes()
if service_settings.get(svc.__name__, False).value() # type: ignore
]


async def _run_services(
service_classes: list[type["prefect.server.services.loop_service.LoopService"]],
service_classes: list[type[Service]],
):
"""Run the given service classes until cancelled."""
services = [cls() for cls in service_classes]
tasks: list[
tuple[
asyncio.Task[None], type["prefect.server.services.loop_service.LoopService"]
]
] = []
tasks: list[tuple[asyncio.Task[None], type[Service]]] = []

for service in services:
task = asyncio.create_task(service.start())
Expand Down Expand Up @@ -663,7 +586,7 @@ def run_manager_process():
We do everything in sync so that the child won't exit until the user kills it.
"""
if not (enabled_services := _get_enabled_services()):
if not (enabled_services := Service.enabled_services()):
logger.error("No services are enabled! Exiting manager.")
sys.exit(1)

Expand All @@ -680,21 +603,19 @@ def run_manager_process():
@services_app.command(aliases=["ls"])
def list_services():
"""List all available services and their status."""
service_settings = _get_service_settings()
table = Table(title="Available Services", expand=True)
table.add_column("Name", style="blue", no_wrap=True)
table.add_column("Enabled?", style="green", no_wrap=True)
table.add_column("Name", no_wrap=True)
table.add_column("Enabled?", no_wrap=True)
table.add_column("Description", style="cyan", no_wrap=False)

for svc in _discover_service_classes():
for svc in Service.all_services():
name = svc.__name__
setting = service_settings.get(name, False)
is_enabled = setting.value() if setting else False # type: ignore
assert isinstance(is_enabled, bool), "Setting value is not a boolean"

doc = inspect.getdoc(svc) or ""
description = doc.split("\n", 1)[0].strip()
table.add_row(name, str(is_enabled), description)
setting_text = Text(f"✓ {svc.enabled_setting().name}", style="green")
if not svc.enabled():
setting_text = Text(f"x {svc.enabled_setting().name}", style="gray50")
table.add_row(name, setting_text, description)

app.console.print(table)

Expand All @@ -720,7 +641,7 @@ def start_services(
# Stale or invalid file
_cleanup_pid_file(SERVICES_PID_FILE)

if not (enabled_services := _get_enabled_services()):
if not (enabled_services := Service.enabled_services()):
app.console.print("[red]No services are enabled![/]")
raise typer.Exit(code=1)

Expand Down
9 changes: 6 additions & 3 deletions src/prefect/server/events/services/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import asyncio
from typing import TYPE_CHECKING, NoReturn

import prefect.settings
from prefect.logging import get_logger
from prefect.server.events import actions
from prefect.server.services.base import Service
Expand All @@ -15,12 +16,14 @@


class Actions(Service):
"""Runs actions triggered by Automatinos"""

name: str = "Actions"
"""Runs the actions triggered by automations"""

consumer_task: asyncio.Task[None] | None = None

@classmethod
def enabled_setting(cls) -> prefect.settings.Setting:
return prefect.settings.PREFECT_API_SERVICES_TRIGGERS_ENABLED

async def start(self) -> NoReturn:
assert self.consumer_task is None, "Actions already started"
self.consumer: Consumer = create_consumer("actions")
Expand Down
7 changes: 5 additions & 2 deletions src/prefect/server/events/services/event_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import pendulum
import rich

import prefect.settings
from prefect.logging import get_logger
from prefect.server.events.schemas.events import ReceivedEvent
from prefect.server.services.base import Service
Expand All @@ -20,10 +21,12 @@
class EventLogger(Service):
"""A debugging service that logs events to the console as they arrive."""

name: str = "EventLogger"

consumer_task: asyncio.Task[None] | None = None

@classmethod
def enabled_setting(cls) -> prefect.settings.Setting:
return prefect.settings.PREFECT_API_SERVICES_EVENT_LOGGER_ENABLED

async def start(self) -> NoReturn:
assert self.consumer_task is None, "Logger already started"
self.consumer: Consumer = create_consumer("events")
Expand Down
8 changes: 6 additions & 2 deletions src/prefect/server/events/services/event_persister.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import pendulum
import sqlalchemy as sa

import prefect.settings
from prefect.logging import get_logger
from prefect.server.database import provide_database_interface
from prefect.server.events.schemas.events import ReceivedEvent
Expand All @@ -39,11 +40,14 @@
class EventPersister(Service):
"""A service that persists events to the database as they arrive."""

name: str = "EventLogger"

consumer_task: asyncio.Task[None] | None = None

@classmethod
def enabled_setting(cls) -> prefect.settings.Setting:
return prefect.settings.PREFECT_API_SERVICES_EVENT_PERSISTER_ENABLED

def __init__(self):
super().__init__()
self._started_event: asyncio.Event | None = None

@property
Expand Down
19 changes: 13 additions & 6 deletions src/prefect/server/events/services/triggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,29 @@
import asyncio
from typing import TYPE_CHECKING, Any, NoReturn, Optional

import prefect.settings
from prefect.logging import get_logger
from prefect.server.events import triggers
from prefect.server.services.base import Service
from prefect.server.services.loop_service import LoopService
from prefect.server.services.base import LoopService, Service
from prefect.server.utilities.messaging import Consumer, create_consumer
from prefect.settings import PREFECT_EVENTS_PROACTIVE_GRANULARITY

if TYPE_CHECKING:
import logging


logger: "logging.Logger" = get_logger(__name__)


class ReactiveTriggers(Service):
"""Runs the reactive triggers consumer"""

name: str = "ReactiveTriggers"
"""Evaluates reactive automation triggers"""

consumer_task: asyncio.Task[None] | None = None

@classmethod
def enabled_setting(cls) -> prefect.settings.Setting:
return prefect.settings.PREFECT_API_SERVICES_TRIGGERS_ENABLED

async def start(self) -> NoReturn:
assert self.consumer_task is None, "Reactive triggers already started"
self.consumer: Consumer = create_consumer("events")
Expand All @@ -49,7 +52,11 @@ async def stop(self) -> None:


class ProactiveTriggers(LoopService):
"""A loop service that runs the proactive triggers consumer"""
"""Evaluates proactive automation triggers"""

@classmethod
def enabled_setting(cls) -> prefect.settings.Setting:
return prefect.settings.PREFECT_API_SERVICES_TRIGGERS_ENABLED

def __init__(self, loop_seconds: Optional[float] = None, **kwargs: Any):
super().__init__(
Expand Down
Loading

0 comments on commit 72599c9

Please sign in to comment.