DAG API Enhancements: Introducing Downstream Task Parsing and Explicit Flow Definition #4067

Merged
merged 53 commits into from
Oct 21, 2024
Changes from 5 commits
53 commits
7c0965a
provide an example, edited from pipeline.yml
andylizf Oct 10, 2024
6949ac1
more focus on dependencies for user dag lib
andylizf Oct 10, 2024
55db40b
more powerful user interface
andylizf Oct 11, 2024
db7ff9f
load and dump new yaml format
andylizf Oct 11, 2024
054cc26
fix
andylizf Oct 11, 2024
24ef94e
fix: reversed logic in add_edge
andylizf Oct 11, 2024
1ff843f
[docs] Unroll k8s internal load balancer docs (#4083)
romilbhardwaj Oct 14, 2024
129bdbf
rename
andylizf Oct 14, 2024
12ec5a4
refactor due to reviewer's comments
andylizf Oct 14, 2024
9497a3e
generate task.name if not given
andylizf Oct 14, 2024
a0243e5
[docs] `sky status --kubernetes` docs (#4064)
romilbhardwaj Oct 14, 2024
9243113
[UX] Show log after failure and fix the color issue with narrow windo…
Michaelvll Oct 14, 2024
a4e2fcd
[k8s] `sky status --k8s` refactor (#4079)
romilbhardwaj Oct 15, 2024
ff528a5
add comments for add_edge
andylizf Oct 15, 2024
04c6f9d
add `print_exception_no_traceback` when raise
andylizf Oct 15, 2024
4985813
make `Dag.tasks` a property
andylizf Oct 15, 2024
48a2826
print dependencies for `__repr__`
andylizf Oct 15, 2024
78d826d
move `get_unique_task_name` to common_utils
andylizf Oct 15, 2024
53380e2
[Performance] Use new GCP custom images (#4027)
yika-luo Oct 15, 2024
724e97e
[GCP] Add H100 mega (#4099)
Michaelvll Oct 17, 2024
3e98afe
[GCP] Add gVNIC support (#4095)
romilbhardwaj Oct 17, 2024
c2e12af
[Lambda] Lambda Cloud SkyPilot provisioner (#3865)
kmushegi Oct 17, 2024
93e2b56
[Docs] GKE Nvidia Driver installation instructions update (#4106)
romilbhardwaj Oct 17, 2024
5dc70e8
[Performance] Use new AWS custom images (#4091)
yika-luo Oct 17, 2024
e88acc1
rename methods to use downstream/edge terminology
andylizf Oct 17, 2024
92fd109
[Performance] Add Packer image generation scripts for GCP and AWS (#4…
yika-luo Oct 18, 2024
3042a27
Disable AWS images.csv refreshing (#4116)
yika-luo Oct 18, 2024
a34ccb7
[Docs] .skyignore doc (#4114)
yika-luo Oct 18, 2024
71a95f4
[Core] Raise error for none existing cluster when endpoint is called …
Michaelvll Oct 18, 2024
7971aa2
Refresh local aws images.csv when image not found (#4127)
yika-luo Oct 18, 2024
9201def
[Docs] News revamps. (#4126)
concretevitamin Oct 19, 2024
c6ae536
[Serve] Support manually terminating a replica and with purge option …
andylizf Oct 19, 2024
63e96f4
[Provisioner] Support docker in Lambda Cloud and TPU (#4115)
cblmemo Oct 20, 2024
e4fbb28
Apply suggestions from code review
andylizf Oct 20, 2024
a27969b
change wording all to up/downstream style
andylizf Oct 20, 2024
8486352
Add unique suffix to task names, fallback to timestamp if unnamed
andylizf Oct 20, 2024
c14980e
Unify handling of single and multiple tasks without dependencies
andylizf Oct 20, 2024
66fc864
Refactor tasks initialization: use list comprehension and fail fast
andylizf Oct 20, 2024
65d0bdd
Fix remove task dependency description: upstream, not downstream
andylizf Oct 20, 2024
28b6482
Remove duplicated `self.edges`, use nx api instead
andylizf Oct 20, 2024
03c2adb
[Serve] Add `ux_utils.print_exception_no_traceback()` for cleaner err…
andylizf Oct 20, 2024
985df83
Partially revert: Remove unnecessary `ux_utils.print_exception_no_tra…
andylizf Oct 20, 2024
1792ba6
Revert "Add unique suffix to task names, fallback to timestamp if unn…
andylizf Oct 20, 2024
f600d16
comment the checking used as upstream logic
andylizf Oct 20, 2024
067a0a3
[examples] Deepspeed fixes + k8s support (#4124)
romilbhardwaj Oct 21, 2024
635c564
Empty commit
andylizf Oct 21, 2024
3c3bcee
[OCI] Support more OS types in addition to ubuntu (#4080)
HysunHe Oct 21, 2024
b63eb09
Apply suggestions from code review
andylizf Oct 21, 2024
95b0936
fix: typing.cast
andylizf Oct 21, 2024
32a7cc6
add TODOs for future function migration
andylizf Oct 21, 2024
b7a8948
remove dependencies wording to reduce ambiguity
andylizf Oct 21, 2024
686bba0
temporarily add github actions
andylizf Oct 21, 2024
8992f23
Merge remote-tracking branch 'upstream/master' into job-dag
andylizf Oct 21, 2024
54 changes: 28 additions & 26 deletions sky/dag.py
@@ -74,13 +74,12 @@ def add(self, task: 'task.Task') -> None:
ValueError: If the task already exists in the DAG or if its name
is already used.
"""
if task.name is None:
task.name = common_utils.get_unique_task_name(task)
task.name = common_utils.get_unique_task_name(task)
if task.name in self._task_name_lookup:
with ux_utils.print_exception_no_traceback():
raise ValueError(
f'Task {task.name!r} already exists in the DAG.'
f' Or the task name is already used by another task.')
f'Task {task.name!r} already exists in the DAG, '
f'or the task name is already used by another task.')
self.graph.add_node(task)
self._task_name_lookup[task.name] = task

@@ -91,22 +90,24 @@ def remove(self, task: Union['task.Task', str]) -> None:
task: The Task object or name of the task to remove.

Raises:
ValueError: If the task is still being depended on by other tasks.
ValueError: If the task is still being used as a downstream task by
other tasks.
"""
task = self._get_task(task)

# TODO(andy): Stuck by optimizer's wrong way to remove dummy sources
# and sink nodes.
# if dependents:
# dependent_names = ', '.join([dep.name for dep in dependents])
# with ux_utils.print_exception_no_traceback():
# raise ValueError(f'Task {task.name} is still being depended '
# f'by tasks {dependent_names!r}. Try to '
# 'remove the dependencies first.')
downstreams = self.get_downstream(task)
if downstreams:
downstream_names = ', '.join(
cast(str, downstream.name) for downstream in downstreams)
with ux_utils.print_exception_no_traceback():
raise ValueError(f'Task {task.name} is still being used as a '
f'downstream task by {downstream_names!r}. '
'Try to remove the downstream tasks first.')
# Here's a workaround, proactively remove all downstream edges.
dependents = self.get_downstream(task)
for dependent in dependents:
self.remove_edge(task, dependent)
for downstream in downstreams:
self.remove_edge(task, downstream)

self.edges.pop(task, None)
self.graph.remove_node(task)
@@ -117,19 +118,20 @@ def add_edge(self, source: TaskOrName, target: TaskOrName) -> None:
"""Add an edge from source task to target task.

Args:
source: The task that the target task depends on.
target: The task that depends on the source task.
source: The upstream task.
target: The downstream task to be added.

Raises:
ValueError: If a task tries to depend on itself or if the
target task is not in the DAG.
ValueError: If a task is set as its own downstream task or if the
downstream task is not in the DAG.
"""
source = self._get_task(source)
target = self._get_task(target)

if source.name == target.name:
with ux_utils.print_exception_no_traceback():
raise ValueError(f'Task {source.name} cannot depend on itself.')
raise ValueError(f'Task {source.name} should not be its own '
'downstream task.')
assert target.name is not None
if target.name not in self._task_name_lookup:
with ux_utils.print_exception_no_traceback():
@@ -145,8 +147,8 @@ def add_downstream(self, source: TaskOrName, target: TaskOrName) -> None:
"""Add downstream tasks for a source task.

Args:
source: The task that the downstream tasks depend on.
target: The task(s) to add as downstream tasks.
source: The upstream task.
target: The downstream task to be added.
"""
return self.add_edge(source, target)

@@ -157,8 +159,8 @@ def set_downstream(self, source: TaskOrName,
This replaces any existing downstream tasks for the given source.

Args:
source: The task that the downstream tasks depend on.
targets: The task(s) that depend on the source task.
source: The upstream task.
targets: The downstream task(s) to be added.
"""
source = self._get_task(source)
if not isinstance(targets, list):
@@ -171,8 +173,8 @@ def remove_edge(self, source: TaskOrName, target: TaskOrName) -> None:
"""Remove an edge between two tasks.

Args:
source: The source task to remove the edge from.
target: The target task to remove the edge to.
source: The upstream task.
target: The downstream task to remove the edge to.
"""
source = self._get_task(source)
target = self._get_task(target)
@@ -200,7 +202,7 @@ def get_downstream(self, task: TaskOrName) -> Set['task.Task']:
task: The task to get downstream tasks for.

Returns:
A set of tasks that depend on the given task.
A set of downstream tasks.
"""
task = self._get_task(task)
return self.edges.get(task, set())
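
The edge semantics described in the docstrings above can be illustrated without SkyPilot itself. This is a minimal standalone sketch, not the code in `sky/dag.py`: `MiniDag` and its string-named tasks are hypothetical stand-ins. It models only what the docstrings state: an edge runs from the upstream task to the downstream task, a task cannot be its own downstream task, the downstream task must already be in the DAG, and `set_downstream` replaces any existing downstream tasks.

```python
from typing import Dict, List, Set, Union


class MiniDag:
    """Hypothetical stand-in for a DAG keyed by task name (not sky.Dag)."""

    def __init__(self) -> None:
        # Maps each task name to the names of its downstream tasks.
        self._downstream: Dict[str, Set[str]] = {}

    def add(self, name: str) -> None:
        if name in self._downstream:
            raise ValueError(f'Task {name!r} already exists in the DAG.')
        self._downstream[name] = set()

    def add_edge(self, source: str, target: str) -> None:
        """Add an edge from the upstream task to the downstream task."""
        if source == target:
            raise ValueError(
                f'Task {source} should not be its own downstream task.')
        for name in (source, target):
            if name not in self._downstream:
                raise ValueError(f'Task {name} is not in the DAG.')
        self._downstream[source].add(target)

    def set_downstream(self, source: str,
                       targets: Union[str, List[str]]) -> None:
        """Replace all downstream tasks of `source` with `targets`."""
        if not isinstance(targets, list):
            targets = [targets]
        if source not in self._downstream:
            raise ValueError(f'Task {source} is not in the DAG.')
        self._downstream[source] = set()
        for target in targets:
            self.add_edge(source, target)

    def get_downstream(self, name: str) -> Set[str]:
        # Return a copy so callers cannot mutate the internal state.
        return set(self._downstream.get(name, set()))


dag = MiniDag()
for task_name in ('train', 'eval', 'report'):
    dag.add(task_name)
dag.add_edge('train', 'eval')
dag.set_downstream('train', ['report'])  # Replaces the train -> eval edge.
print(sorted(dag.get_downstream('train')))  # ['report']
```

Storing the edges as a name-to-set mapping keeps the sketch short. The PR itself moves toward the networkx API instead (see commit 28b6482).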
1 change: 0 additions & 1 deletion sky/serve/serve_utils.py
@@ -37,7 +37,6 @@
if typing.TYPE_CHECKING:
import fastapi

from sky import task as task_lib
from sky.serve import replica_managers

SKY_SERVE_CONTROLLER_NAME: str = (
2 changes: 1 addition & 1 deletion sky/task.py
@@ -1161,7 +1161,7 @@ def __rshift__(self, b):
if current_dag is None:
with ux_utils.print_exception_no_traceback():
raise ValueError('No current DAG context found. '
'Use `with sky.dag() as dag: ...` '
'Use `with sky.Dag() as dag: ...` '
'to define a DAG.')
current_dag.add_edge(self, b)
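
The error message above points users to `with sky.Dag() as dag: ...`. As a rough illustration of how a `>>` operator can register an edge on whichever DAG is active in the enclosing `with` block, here is a standalone sketch. `ContextDag`, `ContextTask` and the class-level `_stack` are hypothetical names, not SkyPilot's implementation. Returning the right-hand task from `__rshift__` is also an assumption of this sketch, made so that `a >> b >> c` chains; the diff does not show what the real method returns.

```python
from typing import List, Optional, Tuple


class ContextDag:
    """Hypothetical DAG that tracks the active `with` block (not sky.Dag)."""

    # Stack of DAGs whose `with` blocks are currently open.
    _stack: List['ContextDag'] = []

    def __init__(self) -> None:
        self.edges: List[Tuple[str, str]] = []

    def __enter__(self) -> 'ContextDag':
        ContextDag._stack.append(self)
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        ContextDag._stack.pop()

    @classmethod
    def current(cls) -> Optional['ContextDag']:
        return cls._stack[-1] if cls._stack else None

    def add_edge(self, source: 'ContextTask', target: 'ContextTask') -> None:
        self.edges.append((source.name, target.name))


class ContextTask:
    """Hypothetical task that supports `upstream >> downstream`."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __rshift__(self, other: 'ContextTask') -> 'ContextTask':
        current_dag = ContextDag.current()
        if current_dag is None:
            raise ValueError('No current DAG context found. '
                             'Use `with ContextDag() as dag: ...` '
                             'to define a DAG.')
        current_dag.add_edge(self, other)
        # Assumption of this sketch: return the downstream task so that
        # `a >> b >> c` adds the edges a -> b and b -> c.
        return other


with ContextDag() as dag:
    a, b, c = ContextTask('a'), ContextTask('b'), ContextTask('c')
    a >> b >> c

print(dag.edges)  # [('a', 'b'), ('b', 'c')]
```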

7 changes: 3 additions & 4 deletions sky/utils/common_utils.py
@@ -232,11 +232,10 @@ def get_global_job_id(job_timestamp: str,
return global_job_id


def get_unique_task_name(_: 'task_lib.Task') -> str:
timestamp = int(time.time())
def get_unique_task_name(task: 'task_lib.Task') -> str:
name = task.name or f'task_{int(time.time())}'
unique_suffix = uuid.uuid4().hex[:6]
name = f'task_{timestamp}_{unique_suffix}'
return name
return f'{name}_{unique_suffix}'
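
To make the naming scheme in this hunk concrete, here is a standalone sketch of the revised logic. It assumes a plain optional string in place of a `task_lib.Task` object, and `unique_task_name` is a hypothetical name used only for illustration. A user-supplied name is kept as the prefix, an unnamed task falls back to `task_<unix timestamp>`, and in both cases a six-character hex suffix is appended.

```python
import time
import uuid
from typing import Optional


def unique_task_name(name: Optional[str]) -> str:
    """Return `name` (or a timestamp-based fallback) plus a short suffix."""
    # Fall back to a timestamp-based base name when none is given.
    base = name or f'task_{int(time.time())}'
    # The first six hex characters of a random UUID serve as the suffix.
    unique_suffix = uuid.uuid4().hex[:6]
    return f'{base}_{unique_suffix}'


print(unique_task_name('train'))  # For example: train_3f9a1c
print(unique_task_name(None))     # For example: task_1729500000_a81b2e
```

Because the suffix is random, two calls with the same name are very likely, but not guaranteed, to produce different results.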


class Backoff:
38 changes: 15 additions & 23 deletions sky/utils/dag_utils.py
@@ -71,9 +71,10 @@ def load_dag_from_yaml(
"""Loads a DAG from a YAML file.

Supports various formats:
1. Single task without separators
2. Multiple tasks with separators (linear dependency by default)
3. DAG with explicit 'dependencies' field
1. Tasks without explicit dependencies:
- Single task
- Multiple tasks separated, with implicit linear dependency
2. DAG with explicit 'dependencies' field

Has special handling for an initial section in YAML that contains only the
'name' field, which is the DAG name.
@@ -102,29 +103,24 @@ def load_dag_from_yaml(
dag.name = header.get('name')
downstream = header.get('downstream', {})
else:
multi_tasks = len(configs) > 1
if multi_tasks:
with ux_utils.print_exception_no_traceback():
raise ValueError('Multiple task definitions without a valid'
'header.')
# Single task without header
dag.name = configs[0].get('name')
downstream = {}

# Handle YAML with only 'name'
if not configs:
configs = [{'name': dag.name}]

tasks = []

# Create tasks
for config in configs:
if not isinstance(config, dict):
with ux_utils.print_exception_no_traceback():
raise ValueError(f'Invalid task configuration: {config}')
tasks = [
task_lib.Task.from_yaml_config(config, env_overrides)
for config in configs
]

if not tasks:
with ux_utils.print_exception_no_traceback():
raise ValueError('No tasks defined in the YAML file')

task = task_lib.Task.from_yaml_config(config, env_overrides)
tasks.append(task)
for task in tasks:
dag.add(task)

# Handle dependencies
@@ -134,15 +130,11 @@ def load_dag_from_yaml(
downs = [downs] if not isinstance(downs, list) else downs
for down in downs:
dag.add_edge(upstream, down)
elif len(tasks) > 1:
# Implicit linear dependency
else:
# Implicit dependency
for i in range(len(tasks) - 1):
dag.add_edge(tasks[i], tasks[i + 1])

if not tasks:
with ux_utils.print_exception_no_traceback():
raise ValueError('No tasks defined in the YAML file')

return dag
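
The two ways of deriving edges in `load_dag_from_yaml` (an explicit mapping from the header, or an implicit linear chain) can be sketched on plain Python data. This is an illustration under assumptions, not the function itself: `resolve_edges` is a hypothetical helper, tasks are represented by their names, and the `if downstream:` condition is inferred because the branching line falls outside the hunks shown here.

```python
from typing import Dict, List, Tuple, Union


def resolve_edges(
    task_names: List[str],
    downstream: Dict[str, Union[str, List[str]]],
) -> List[Tuple[str, str]]:
    """Return (upstream, downstream) pairs for the given tasks."""
    edges: List[Tuple[str, str]] = []
    if downstream:
        # Explicit flow: each key is an upstream task, and each value is
        # one downstream task or a list of them.
        for upstream, downs in downstream.items():
            downs = [downs] if not isinstance(downs, list) else downs
            for down in downs:
                edges.append((upstream, down))
    else:
        # Implicit flow: chain the tasks in the order they were defined.
        # A single task yields no edges because the range is empty.
        for i in range(len(task_names) - 1):
            edges.append((task_names[i], task_names[i + 1]))
    return edges


print(resolve_edges(['a', 'b', 'c'], {}))
# [('a', 'b'), ('b', 'c')]
print(resolve_edges(['a', 'b', 'c'], {'a': ['b', 'c']}))
# [('a', 'b'), ('a', 'c')]
```

Since the loop over `range(len(task_names) - 1)` is empty for a single task, one `else` branch covers both the single-task and the multi-task case, which appears to be why the hunk can drop the separate `len(tasks) > 1` check.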

