Skip to content

Commit

Permalink
Update prefect deploy to use PATCH /deployments/{id} endpoint for…
Browse files Browse the repository at this point in the history
… existing deployments
  • Loading branch information
desertaxle committed Feb 7, 2025
1 parent 41ba1e9 commit 6467278
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 23 deletions.
14 changes: 14 additions & 0 deletions docs/v3/api-ref/rest-api/server/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -16782,6 +16782,20 @@
],
"title": "Enforce Parameter Schema",
"description": "Whether or not the deployment should enforce the parameter schema."
},
"pull_steps": {
"anyOf": [
{
"items": {
"type": "object"
},
"type": "array"
},
{
"type": "null"
}
],
"title": "Pull Steps"
}
},
"additionalProperties": false,
Expand Down
99 changes: 77 additions & 22 deletions src/prefect/cli/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
from prefect.cli.root import app, is_interactive
from prefect.client.base import ServerType
from prefect.client.orchestration import get_client
from prefect.client.schemas.actions import DeploymentScheduleCreate
from prefect.client.schemas.actions import DeploymentScheduleCreate, DeploymentUpdate
from prefect.client.schemas.filters import WorkerFilter
from prefect.client.schemas.objects import ConcurrencyLimitConfig
from prefect.client.schemas.schedules import (
Expand Down Expand Up @@ -732,27 +732,82 @@ async def _run_single_deploy(

flow_id = await client.create_flow_from_name(deploy_config["flow_name"])

deployment_id = await client.create_deployment(
flow_id=flow_id,
name=deploy_config.get("name"),
work_queue_name=get_from_dict(deploy_config, "work_pool.work_queue_name"),
work_pool_name=get_from_dict(deploy_config, "work_pool.name"),
version=deploy_config.get("version"),
schedules=deploy_config.get("schedules"),
paused=deploy_config.get("paused"),
enforce_parameter_schema=deploy_config.get("enforce_parameter_schema", True),
parameter_openapi_schema=deploy_config.get(
"parameter_openapi_schema"
).model_dump_for_openapi(),
parameters=deploy_config.get("parameters"),
description=deploy_config.get("description"),
tags=deploy_config.get("tags", []),
concurrency_limit=deploy_config.get("concurrency_limit"),
concurrency_options=deploy_config.get("concurrency_options"),
entrypoint=deploy_config.get("entrypoint"),
pull_steps=pull_steps,
job_variables=get_from_dict(deploy_config, "work_pool.job_variables"),
)
try:
existing_deployment = await client.read_deployment_by_name(
name=f"{deploy_config.get('flow_name')}/{deploy_config.get('name')}"
)
except ObjectNotFound:
existing_deployment = None

if existing_deployment:
deployment_id = existing_deployment.id
update = DeploymentUpdate(pull_steps=pull_steps)
if (
get_from_dict(deploy_config, "work_pool.work_queue_name", NotSet)
is not NotSet
):
update.work_pool_name = get_from_dict(deploy_config, "work_pool.name")
if get_from_dict(deploy_config, "work_pool.name", NotSet) is not NotSet:
update.work_queue_name = get_from_dict(
deploy_config, "work_pool.work_queue_name"
)
if deploy_config.get("version", NotSet) is not NotSet:
update.version = deploy_config.get("version")
if deploy_config.get("schedules", NotSet) is not NotSet:
update.schedules = deploy_config.get("schedules")
if deploy_config.get("paused", NotSet) is not NotSet:
update.paused = deploy_config.get("paused")
if deploy_config.get("enforce_parameter_schema", NotSet) is not NotSet:
update.enforce_parameter_schema = deploy_config.get(
"enforce_parameter_schema"
)
if deploy_config.get("parameter_openapi_schema", NotSet) is not NotSet:
update.parameter_openapi_schema = deploy_config.get(
"parameter_openapi_schema"
).model_dump_for_openapi()
if deploy_config.get("parameters", NotSet) is not NotSet:
update.parameters = deploy_config.get("parameters")
if deploy_config.get("description", NotSet) is not NotSet:
update.description = deploy_config.get("description")
if deploy_config.get("tags", NotSet) is not NotSet:
update.tags = deploy_config.get("tags")
if deploy_config.get("concurrency_limit", NotSet) is not NotSet:
update.concurrency_limit = deploy_config.get("concurrency_limit")
if deploy_config.get("concurrency_options", NotSet) is not NotSet:
update.concurrency_options = deploy_config.get("concurrency_options")
if deploy_config.get("entrypoint", NotSet) is not NotSet:
update.entrypoint = deploy_config.get("entrypoint")
if (
get_from_dict(deploy_config, "work_pool.job_variables", NotSet)
is not NotSet
):
update.job_variables = deploy_config.get("job_variables")

await client.update_deployment(deployment_id=deployment_id, deployment=update)
else:
deployment_id = await client.create_deployment(
flow_id=flow_id,
name=deploy_config.get("name"),
work_queue_name=get_from_dict(deploy_config, "work_pool.work_queue_name"),
work_pool_name=get_from_dict(deploy_config, "work_pool.name"),
version=deploy_config.get("version"),
schedules=deploy_config.get("schedules"),
paused=deploy_config.get("paused"),
enforce_parameter_schema=deploy_config.get(
"enforce_parameter_schema", True
),
parameter_openapi_schema=deploy_config.get(
"parameter_openapi_schema"
).model_dump_for_openapi(),
parameters=deploy_config.get("parameters"),
description=deploy_config.get("description"),
tags=deploy_config.get("tags", []),
concurrency_limit=deploy_config.get("concurrency_limit"),
concurrency_options=deploy_config.get("concurrency_options"),
entrypoint=deploy_config.get("entrypoint"),
pull_steps=pull_steps,
job_variables=get_from_dict(deploy_config, "work_pool.job_variables"),
)

await _create_deployment_triggers(client, deployment_id, triggers)

Expand Down
2 changes: 2 additions & 0 deletions src/prefect/client/schemas/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,8 @@ def remove_old_fields(cls, values: dict[str, Any]) -> dict[str, Any]:
"Whether or not the deployment should enforce the parameter schema."
),
)
parameter_openapi_schema: Optional[dict[str, Any]] = Field(default_factory=dict)
pull_steps: Optional[list[dict[str, Any]]] = Field(default=None)

def check_valid_configuration(self, base_job_template: dict[str, Any]) -> None:
"""Check that the combination of base_job_template defaults
Expand Down
1 change: 1 addition & 0 deletions src/prefect/server/schemas/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ def remove_old_fields(cls, values: dict[str, Any]) -> dict[str, Any]:
"Whether or not the deployment should enforce the parameter schema."
),
)
pull_steps: Optional[List[dict[str, Any]]] = Field(None)
model_config: ClassVar[ConfigDict] = ConfigDict(populate_by_name=True)

def check_valid_configuration(self, base_job_template: dict[str, Any]) -> None:
Expand Down
74 changes: 73 additions & 1 deletion tests/cli/test_deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
_initialize_deployment_triggers,
)
from prefect.client.orchestration import PrefectClient, ServerType
from prefect.client.schemas.actions import WorkPoolCreate
from prefect.client.schemas.actions import DeploymentScheduleCreate, WorkPoolCreate
from prefect.client.schemas.objects import Worker, WorkerStatus, WorkPool
from prefect.client.schemas.schedules import (
CronSchedule,
Expand Down Expand Up @@ -2913,6 +2913,78 @@ async def test_deploy_with_inactive_schedule(
assert deployment_schedule.schedule.cron == "0 4 * * *"
assert deployment_schedule.schedule.timezone == "America/Chicago"

@pytest.mark.usefixtures("project_dir")
async def test_deploy_does_not_activate_schedule_outside_of_yaml(
self, prefect_client: PrefectClient, work_pool: WorkPool
):
prefect_file = Path("prefect.yaml")
with prefect_file.open(mode="r") as f:
deploy_config = yaml.safe_load(f)

# Create a deployment with a schedule that is not active
deploy_config["deployments"][0]["name"] = "test-name"
deploy_config["deployments"][0]["schedules"] = [
{
"cron": "0 4 * * *",
"timezone": "America/Chicago",
"active": False,
"slug": "test-yaml-slug",
}
]

with prefect_file.open(mode="w") as f:
yaml.safe_dump(deploy_config, f)

result = await run_sync_in_worker_thread(
invoke_and_assert,
command=f"deploy ./flows/hello.py:my_flow -n test-name --pool {work_pool.name}",
)

assert result.exit_code == 0

deployment = await prefect_client.read_deployment_by_name(
"An important name/test-name"
)

deployment_schedule = deployment.schedules[0]
assert deployment_schedule.active is False
assert deployment_schedule.schedule.cron == "0 4 * * *"
assert deployment_schedule.schedule.timezone == "America/Chicago"

# Create another schedule outside of the yaml
# Using the https client directly because the PrefectClient does not support
# creating schedules with slugs
await prefect_client._client.post(
f"/deployments/{deployment.id}/schedules",
json=[
DeploymentScheduleCreate(
schedule=CronSchedule(cron="0 4 * * *"),
active=False,
slug="test-client-slug",
).model_dump(mode="json"),
],
)

deploy_config["deployments"][0]["schedules"][0]["active"] = True

with prefect_file.open(mode="w") as f:
yaml.safe_dump(deploy_config, f)

result = await run_sync_in_worker_thread(
invoke_and_assert,
command=f"deploy ./flows/hello.py:my_flow -n test-name --pool {work_pool.name}",
)

assert result.exit_code == 0

deployment = await prefect_client.read_deployment_by_name(
"An important name/test-name"
)

assert len(deployment.schedules) == 2
assert deployment.schedules[0].active is True
assert deployment.schedules[1].active is False

@pytest.mark.usefixtures("project_dir")
async def test_yaml_null_schedules(
self, prefect_client: PrefectClient, work_pool: WorkPool
Expand Down
2 changes: 2 additions & 0 deletions ui-v2/src/api/prefect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5782,6 +5782,8 @@ export interface components {
* @description Whether or not the deployment should enforce the parameter schema.
*/
enforce_parameter_schema?: boolean | null;
/** Pull Steps */
pull_steps?: Record<string, never>[] | null;
};
/**
* DoNothing
Expand Down

0 comments on commit 6467278

Please sign in to comment.