Skip to content

Commit

Permalink
Consolidates discovery of Prefect server Services to the base class (
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisguidry authored Jan 30, 2025
1 parent c590d61 commit ffa9951
Show file tree
Hide file tree
Showing 23 changed files with 565 additions and 396 deletions.
21 changes: 21 additions & 0 deletions docs/v3/develop/settings-ref.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -1526,6 +1526,21 @@ The cancellation cleanup service will look for non-terminal tasks and subflows t
**Supported environment variables**:
`PREFECT_SERVER_SERVICES_CANCELLATION_CLEANUP_LOOP_SECONDS`, `PREFECT_API_SERVICES_CANCELLATION_CLEANUP_LOOP_SECONDS`

---
## ServerServicesEventLoggerSettings
Settings for controlling the event logger service
### `enabled`
Whether or not to start the event logger service in the server application.

**Type**: `boolean`

**Default**: `False`

**TOML dotted key path**: `server.services.event_logger.enabled`

**Supported environment variables**:
`PREFECT_SERVER_SERVICES_EVENT_LOGGER_ENABLED`, `PREFECT_API_SERVICES_EVENT_LOGGER_ENABLED`

---
## ServerServicesEventPersisterSettings
Settings for controlling the event persister service
Expand Down Expand Up @@ -1891,6 +1906,12 @@ Settings for controlling server services

**TOML dotted key path**: `server.services.event_persister`

### `event_logger`

**Type**: [ServerServicesEventLoggerSettings](#serverserviceseventloggersettings)

**TOML dotted key path**: `server.services.event_logger`

### `flow_run_notifications`

**Type**: [ServerServicesFlowRunNotificationsSettings](#serverservicesflowrunnotificationssettings)
Expand Down
21 changes: 21 additions & 0 deletions schemas/settings.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1365,6 +1365,23 @@
"title": "ServerServicesCancellationCleanupSettings",
"type": "object"
},
"ServerServicesEventLoggerSettings": {
"description": "Settings for controlling the event logger service",
"properties": {
"enabled": {
"default": false,
"description": "Whether or not to start the event logger service in the server application.",
"supported_environment_variables": [
"PREFECT_SERVER_SERVICES_EVENT_LOGGER_ENABLED",
"PREFECT_API_SERVICES_EVENT_LOGGER_ENABLED"
],
"title": "Enabled",
"type": "boolean"
}
},
"title": "ServerServicesEventLoggerSettings",
"type": "object"
},
"ServerServicesEventPersisterSettings": {
"description": "Settings for controlling the event persister service",
"properties": {
Expand Down Expand Up @@ -1653,6 +1670,10 @@
"$ref": "#/$defs/ServerServicesEventPersisterSettings",
"supported_environment_variables": []
},
"event_logger": {
"$ref": "#/$defs/ServerServicesEventLoggerSettings",
"supported_environment_variables": []
},
"flow_run_notifications": {
"$ref": "#/$defs/ServerServicesFlowRunNotificationsSettings",
"supported_environment_variables": []
Expand Down
106 changes: 15 additions & 91 deletions src/prefect/cli/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
import sys
import textwrap
from pathlib import Path
from types import ModuleType
from typing import TYPE_CHECKING

import typer
import uvicorn
from rich.table import Table
from rich.text import Text

import prefect
import prefect.settings
Expand All @@ -29,6 +29,7 @@
from prefect.cli.cloud import prompt_select_from_list
from prefect.cli.root import app, is_interactive
from prefect.logging import get_logger
from prefect.server.services.base import Service
from prefect.settings import (
PREFECT_API_SERVICES_LATE_RUNS_ENABLED,
PREFECT_API_SERVICES_SCHEDULER_ENABLED,
Expand Down Expand Up @@ -522,90 +523,12 @@ async def stamp(revision: str):
exit_with_success("Stamping database with revision succeeded!")


def _get_service_settings() -> dict[str, "prefect.settings.Setting"]:
"""Get mapping of service names to their enabled/disabled settings."""
return {
"Telemetry": prefect.settings.PREFECT_SERVER_ANALYTICS_ENABLED,
"Scheduler": prefect.settings.PREFECT_API_SERVICES_SCHEDULER_ENABLED,
"RecentDeploymentsScheduler": prefect.settings.PREFECT_API_SERVICES_SCHEDULER_ENABLED,
"MarkLateRuns": prefect.settings.PREFECT_API_SERVICES_LATE_RUNS_ENABLED,
"FailExpiredPauses": prefect.settings.PREFECT_API_SERVICES_PAUSE_EXPIRATIONS_ENABLED,
"CancellationCleanup": prefect.settings.PREFECT_API_SERVICES_CANCELLATION_CLEANUP_ENABLED,
"FlowRunNotifications": prefect.settings.PREFECT_API_SERVICES_FLOW_RUN_NOTIFICATIONS_ENABLED,
"Foreman": prefect.settings.PREFECT_API_SERVICES_FOREMAN_ENABLED,
"ReactiveTriggers": prefect.settings.PREFECT_API_SERVICES_TRIGGERS_ENABLED,
"ProactiveTriggers": prefect.settings.PREFECT_API_SERVICES_TRIGGERS_ENABLED,
"Actions": prefect.settings.PREFECT_API_SERVICES_TRIGGERS_ENABLED,
}


def _get_service_modules() -> list[ModuleType]:
"""Get list of modules containing service implementations."""
from prefect.server.events.services import triggers
from prefect.server.services import (
cancellation_cleanup,
flow_run_notifications,
foreman,
late_runs,
pause_expirations,
scheduler,
task_run_recorder,
telemetry,
)

return [
cancellation_cleanup,
flow_run_notifications,
foreman,
late_runs,
pause_expirations,
scheduler,
task_run_recorder,
telemetry,
triggers,
]


def _discover_service_classes() -> (
list[type["prefect.server.services.loop_service.LoopService"]]
):
"""Discover all available service classes."""
from prefect.server.services.loop_service import LoopService

discovered: list[type[LoopService]] = []
for module in _get_service_modules():
for _, obj in inspect.getmembers(module):
if (
inspect.isclass(obj)
and issubclass(obj, LoopService)
and obj != LoopService
):
discovered.append(obj)
return discovered


def _get_enabled_services() -> (
list[type["prefect.server.services.loop_service.LoopService"]]
):
"""Get list of enabled service classes."""
service_settings = _get_service_settings()
return [
svc
for svc in _discover_service_classes()
if service_settings.get(svc.__name__, False).value() # type: ignore
]


async def _run_services(
service_classes: list[type["prefect.server.services.loop_service.LoopService"]],
service_classes: list[type[Service]],
):
"""Run the given service classes until cancelled."""
services = [cls() for cls in service_classes]
tasks: list[
tuple[
asyncio.Task[None], type["prefect.server.services.loop_service.LoopService"]
]
] = []
tasks: list[tuple[asyncio.Task[None], type[Service]]] = []

for service in services:
task = asyncio.create_task(service.start())
Expand Down Expand Up @@ -663,7 +586,7 @@ def run_manager_process():
We do everything in sync so that the child won't exit until the user kills it.
"""
if not (enabled_services := _get_enabled_services()):
if not (enabled_services := Service.enabled_services()):
logger.error("No services are enabled! Exiting manager.")
sys.exit(1)

Expand All @@ -680,21 +603,22 @@ def run_manager_process():
@services_app.command(aliases=["ls"])
def list_services():
"""List all available services and their status."""
service_settings = _get_service_settings()
table = Table(title="Available Services", expand=True)
table.add_column("Name", style="blue", no_wrap=True)
table.add_column("Enabled?", style="green", no_wrap=True)
table.add_column("Name", no_wrap=True)
table.add_column("Enabled?", no_wrap=True)
table.add_column("Description", style="cyan", no_wrap=False)

for svc in _discover_service_classes():
for svc in Service.all_services():
name = svc.__name__
setting = service_settings.get(name, False)
is_enabled = setting.value() if setting else False # type: ignore
assert isinstance(is_enabled, bool), "Setting value is not a boolean"

setting_text = Text(f"✓ {svc.environment_variable_name()}", style="green")
if not svc.enabled():
setting_text = Text(f"x {svc.environment_variable_name()}", style="gray50")

doc = inspect.getdoc(svc) or ""
description = doc.split("\n", 1)[0].strip()
table.add_row(name, str(is_enabled), description)

table.add_row(name, setting_text, description)

app.console.print(table)

Expand All @@ -720,7 +644,7 @@ def start_services(
# Stale or invalid file
_cleanup_pid_file(SERVICES_PID_FILE)

if not (enabled_services := _get_enabled_services()):
if not (enabled_services := Service.enabled_services()):
app.console.print("[red]No services are enabled![/]")
raise typer.Exit(code=1)

Expand Down
10 changes: 7 additions & 3 deletions src/prefect/server/events/services/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from prefect.server.events import actions
from prefect.server.services.base import Service
from prefect.server.utilities.messaging import Consumer, create_consumer
from prefect.settings.context import get_current_settings
from prefect.settings.models.server.services import ServicesBaseSetting

if TYPE_CHECKING:
import logging
Expand All @@ -15,12 +17,14 @@


class Actions(Service):
"""Runs actions triggered by Automatinos"""

name: str = "Actions"
"""Runs the actions triggered by automations"""

consumer_task: asyncio.Task[None] | None = None

@classmethod
def service_settings(cls) -> ServicesBaseSetting:
return get_current_settings().server.services.triggers

async def start(self) -> NoReturn:
assert self.consumer_task is None, "Actions already started"
self.consumer: Consumer = create_consumer("actions")
Expand Down
8 changes: 6 additions & 2 deletions src/prefect/server/events/services/event_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from prefect.server.events.schemas.events import ReceivedEvent
from prefect.server.services.base import Service
from prefect.server.utilities.messaging import Consumer, Message, create_consumer
from prefect.settings.context import get_current_settings
from prefect.settings.models.server.services import ServicesBaseSetting

if TYPE_CHECKING:
import logging
Expand All @@ -20,10 +22,12 @@
class EventLogger(Service):
"""A debugging service that logs events to the console as they arrive."""

name: str = "EventLogger"

consumer_task: asyncio.Task[None] | None = None

@classmethod
def service_settings(cls) -> ServicesBaseSetting:
return get_current_settings().server.services.event_logger

async def start(self) -> NoReturn:
assert self.consumer_task is None, "Logger already started"
self.consumer: Consumer = create_consumer("events")
Expand Down
9 changes: 7 additions & 2 deletions src/prefect/server/events/services/event_persister.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
PREFECT_API_SERVICES_EVENT_PERSISTER_FLUSH_INTERVAL,
PREFECT_EVENTS_RETENTION_PERIOD,
)
from prefect.settings.context import get_current_settings
from prefect.settings.models.server.services import ServicesBaseSetting

if TYPE_CHECKING:
import logging
Expand All @@ -39,11 +41,14 @@
class EventPersister(Service):
"""A service that persists events to the database as they arrive."""

name: str = "EventLogger"

consumer_task: asyncio.Task[None] | None = None

@classmethod
def service_settings(cls) -> ServicesBaseSetting:
return get_current_settings().server.services.event_persister

def __init__(self):
super().__init__()
self._started_event: asyncio.Event | None = None

@property
Expand Down
20 changes: 14 additions & 6 deletions src/prefect/server/events/services/triggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,28 @@

from prefect.logging import get_logger
from prefect.server.events import triggers
from prefect.server.services.base import Service
from prefect.server.services.loop_service import LoopService
from prefect.server.services.base import LoopService, Service
from prefect.server.utilities.messaging import Consumer, create_consumer
from prefect.settings import PREFECT_EVENTS_PROACTIVE_GRANULARITY
from prefect.settings.context import get_current_settings
from prefect.settings.models.server.services import ServicesBaseSetting

if TYPE_CHECKING:
import logging


logger: "logging.Logger" = get_logger(__name__)


class ReactiveTriggers(Service):
"""Runs the reactive triggers consumer"""

name: str = "ReactiveTriggers"
"""Evaluates reactive automation triggers"""

consumer_task: asyncio.Task[None] | None = None

@classmethod
def service_settings(cls) -> ServicesBaseSetting:
return get_current_settings().server.services.triggers

async def start(self) -> NoReturn:
assert self.consumer_task is None, "Reactive triggers already started"
self.consumer: Consumer = create_consumer("events")
Expand All @@ -49,7 +53,11 @@ async def stop(self) -> None:


class ProactiveTriggers(LoopService):
"""A loop service that runs the proactive triggers consumer"""
"""Evaluates proactive automation triggers"""

@classmethod
def service_settings(cls) -> ServicesBaseSetting:
return get_current_settings().server.services.triggers

def __init__(self, loop_seconds: Optional[float] = None, **kwargs: Any):
super().__init__(
Expand Down
Loading

0 comments on commit ffa9951

Please sign in to comment.