Skip to content

Commit

Permalink
feat: command to split traffic among revisions
Browse files Browse the repository at this point in the history
  • Loading branch information
jjleng committed May 16, 2024
1 parent dc0a77d commit 0690bc1
Show file tree
Hide file tree
Showing 5 changed files with 243 additions and 14 deletions.
3 changes: 1 addition & 2 deletions paka/cli/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import time
from typing import List

import click
import typer

from paka.cli.utils import load_cluster_manager, load_kubeconfig
Expand Down Expand Up @@ -65,7 +64,7 @@ def down(
"""
Tears down the Kubernetes cluster, removing all associated resources and data.
"""
if yes or click.confirm(
if yes or typer.confirm(
f"Are you sure you want to proceed with the operation? Please note that "
"all resources and data will be permanently deleted.",
default=False,
Expand Down
143 changes: 134 additions & 9 deletions paka/cli/function.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from __future__ import annotations

import os
import re
from datetime import datetime, timezone
from typing import Literal, Optional
from typing import List, Literal, Optional, Tuple

import click
import typer
Expand All @@ -20,12 +21,63 @@
delete_knative_service,
list_knative_revisions,
list_knative_services,
split_traffic_among_revisions,
)
from paka.logger import logger
from paka.utils import kubify_name

function_app = typer.Typer()

VALID_PERCENTAGE_RANGE = (0, 100)


def validate_traffic_split(split: str) -> Tuple[str, int]:
"""
Validate a single traffic split string and return the revision and percentage.
Args:
split (str): The traffic split string in the format 'revision=percentage'.
Returns:
Tuple[str, int]: A tuple containing the revision and percentage.
Raises:
ValueError: If the input string is not in the expected format or the percentage is out of range.
"""
if "=" not in split:
raise ValueError(f"Invalid format, missing '=': {split}")

revision, percent_str = split.split("=")
if not percent_str.strip().isdigit():
raise ValueError(f"Invalid format or non-numeric percentage: {split}")

percent = int(percent_str.strip())
if percent not in range(*VALID_PERCENTAGE_RANGE):
raise ValueError(
f"Traffic percentage out of valid range ({VALID_PERCENTAGE_RANGE[0]}-{VALID_PERCENTAGE_RANGE[1]}): {percent}"
)

return revision, percent


def process_traffic_splits(
traffic_splits: List[str],
) -> Tuple[List[Tuple[str, int]], int]:
total_traffic_percent = 0
splits = []
revisions = set()
for split_str in traffic_splits:
for split in re.split(r",\s*", split_str):
revision, percent = validate_traffic_split(split)
if revision in revisions:
logger.error(f"Error: Duplicate revision '{revision}' provided.")
raise typer.Exit(1)
revisions.add(revision)
splits.append((revision, percent))
total_traffic_percent += percent

return splits, total_traffic_percent


@function_app.command()
def deploy(
Expand Down Expand Up @@ -192,11 +244,11 @@ def list_revisions(
"-c",
help="The name of the cluster.",
),
service_name: Optional[str] = typer.Option(
name: Optional[str] = typer.Option(
None,
"--service",
"-s",
help="The name of the service to list revisions for. If not provided, list revisions for all services.",
"--name",
"-n",
help="The name of the function to list revisions for. If not provided, list revisions for all services.",
),
) -> None:
"""
Expand All @@ -206,9 +258,7 @@ def list_revisions(
None
"""
load_kubeconfig(cluster_name)
revisions = list_knative_revisions(
get_cluster_namespace(cluster_name), service_name
)
revisions = list_knative_revisions(get_cluster_namespace(cluster_name), name)

if not revisions:
logger.info("No revisions found.")
Expand Down Expand Up @@ -272,6 +322,81 @@ def list_revisions(
)


@function_app.command()
def update_traffic(
cluster_name: Optional[str] = typer.Option(
os.getenv("PAKA_CURRENT_CLUSTER"),
"--cluster",
"-c",
help="The name of the cluster.",
),
name: str = typer.Argument(
...,
help="The name of the function to update traffic for.",
),
traffic_splits: List[str] = typer.Option(
[],
"--traffic",
help="Specify traffic splits for each revision in the format 'revision=percentage'. "
"Multiple splits can be provided.",
show_default=False,
),
latest_revision_traffic: int = typer.Option(
0,
"--latest-revision-traffic",
help="The percentage of traffic to send to the latest revision.",
),
yes: bool = typer.Option(
False,
"--yes",
"-y",
help="Automatic yes to prompts. Use this option to bypass the confirmation "
"prompt and directly proceed with the operation.",
),
) -> None:
"""
Update the traffic distribution among the revisions of a function.
You can provide the traffic splits in two ways:
1. Using multiple `--traffic` options, e.g.:
`update_traffic --traffic revision1=50 --traffic revision2=30 --traffic revision3=20`
2. Using a comma-separated list within a single `--traffic` option, e.g.:
`update_traffic --traffic revision1=50,revision2=30,revision3=20`
If the total traffic percentage is less than 100% and `--latest-revision-traffic` is not provided,
the user will be prompted to confirm whether the remaining traffic should be assigned to the latest revision.
"""
splits, total_traffic_percent = process_traffic_splits(traffic_splits)

if total_traffic_percent + latest_revision_traffic > 100:
logger.error("Total traffic percent should not exceed 100%")
raise typer.Exit(1)

if total_traffic_percent < 100 and latest_revision_traffic == 0:
remaining_traffic = 100 - total_traffic_percent
confirm = yes or typer.confirm(
f"Assign remaining {remaining_traffic}% traffic to the latest revision?",
default=True,
)
if confirm:
latest_revision_traffic = remaining_traffic
else:
logger.error("Traffic distribution aborted by user.")
raise typer.Abort()

load_kubeconfig(cluster_name)
logger.info(f"Updating traffic for function {name}")
split_traffic_among_revisions(
get_cluster_namespace(cluster_name),
name,
splits,
latest_revision_traffic,
)
logger.info(f"Successfully updated traffic for service {name}")


@function_app.command()
def delete(
name: str,
Expand Down Expand Up @@ -300,7 +425,7 @@ def delete(
Returns:
None
"""
if yes or click.confirm(
if yes or typer.confirm(
f"Are you sure you want to delete the function {name}?", default=False
):
load_kubeconfig(cluster_name)
Expand Down
3 changes: 1 addition & 2 deletions paka/cli/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import os
from typing import Optional

import click
import typer
from kubernetes import client

Expand Down Expand Up @@ -150,7 +149,7 @@ def delete(
Returns:
None
"""
if yes or click.confirm(
if yes or typer.confirm(
f"Are you sure you want to delete the job {name}?", default=False
):
load_kubeconfig(cluster_name)
Expand Down
72 changes: 71 additions & 1 deletion paka/k8s/function/service.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

import shlex
from typing import Any, Literal, Optional, Tuple
from typing import Any, List, Literal, Optional, Tuple

from kubernetes import client
from kubernetes.dynamic import DynamicClient # type: ignore
Expand Down Expand Up @@ -233,3 +233,73 @@ def list_knative_revisions(namespace: str, service_name: Optional[str] = None) -
),
reverse=True,
)


def split_traffic_among_revisions(
namespace: str,
service_name: str,
traffic_splits: List[Tuple[str, int]],
latest_revision_traffic: int,
) -> None:
"""
Split traffic among the specified revisions of a service.
Args:
service_name (str): The name of the service.
namespace (str): The namespace of the service.
traffic_splits (List[Tuple[str, int]]): A list of tuples, where each tuple contains a revision name and a traffic percent.
latest_revision_traffic (int): The traffic percent to assign to the latest revision.
Raises:
ValueError: If the traffic percent is not valid.
Returns:
None
"""
total_traffic_percent = (
sum(percent for _, percent in traffic_splits) + latest_revision_traffic
)

if not all(0 <= percent <= 100 for _, percent in traffic_splits) or not (
0 <= latest_revision_traffic <= 100
):
raise ValueError("All traffic percents must be between 0 and 100")

if total_traffic_percent != 100:
raise ValueError("Total traffic percent should be 100%")

k8s_client = client.ApiClient()
dyn_client = DynamicClient(k8s_client)

service_resource = dyn_client.resources.get(
api_version="serving.knative.dev/v1", kind="Service"
)

service = service_resource.get(name=service_name, namespace=namespace)

traffic = [
{
"revisionName": revision_name,
"percent": percent,
}
for revision_name, percent in traffic_splits
]

if latest_revision_traffic > 0:
traffic.append(
{
"latestRevision": True,
"percent": latest_revision_traffic,
}
)

service_spec = service.to_dict()

service_spec["spec"]["traffic"] = traffic

service_resource.patch(
body=service_spec,
namespace=namespace,
name=service_name,
content_type="application/merge-patch+json",
)
36 changes: 36 additions & 0 deletions tests/cli/test_function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import pytest
import typer

from paka.cli.function import process_traffic_splits, validate_traffic_split


def test_validate_traffic_split() -> None:
# Test valid input
assert validate_traffic_split("rev1=20") == ("rev1", 20)

# Test missing '='
with pytest.raises(ValueError):
validate_traffic_split("rev120")

# Test non-numeric percentage
with pytest.raises(ValueError):
validate_traffic_split("rev1=twenty")

# Test percentage out of range
with pytest.raises(ValueError):
validate_traffic_split("rev1=101")


def test_process_traffic_splits() -> None:
# Test valid input
splits, total = process_traffic_splits(["rev1=20", "rev2=30"])
assert splits == [("rev1", 20), ("rev2", 30)]
assert total == 50

# Test duplicate revisions
with pytest.raises(typer.Exit):
process_traffic_splits(["rev1=20", "rev1=30"])

# Test invalid split
with pytest.raises(ValueError):
process_traffic_splits(["rev1=20", "rev2=thirty"])

0 comments on commit 0690bc1

Please sign in to comment.