DAG API Enhancements: Introducing Downstream Task Parsing and Explicit Flow Definition #4067
Conversation
@cblmemo Hi, could you kindly review this PR when you have a moment? Thanks!
Thanks for adding this feature @andylizf! This would be really helpful for specifying complex workflows. Left some comments. It mostly looks good to me ;)

I created a branch for our DAG project; let's merge all features into that branch first. We might want to do some final checks before we merge into our master branch, and this advanced-dag branch can serve as a buffer :))
Commits pushed:
* unroll load balancer docs
* `depends_on` feature for task dependency management
* `dependencies` feature for task dependency management
* observability docs; comments
Also, as mentioned here, let's change the target branch to merge into.
Thanks @andylizf for the awesome work! Left some discussion here. I tested and the basic functionality works fine!
sky/utils/dag_utils.py (Outdated)

```python
if multi_tasks:
    raise ValueError('Multiple task definitions without a valid '
                     'header.')
```
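A related pitfall worth noting here: Python concatenates adjacent string literals with no separator, so if the first literal lacks a trailing space, the two words fuse in the final message. A quick demonstration:

```python
# Adjacent string literals are joined verbatim -- no separator is inserted,
# so a missing trailing space in the first literal fuses the two words.
fused = ('Multiple task definitions without a valid'
         'header.')
fixed = ('Multiple task definitions without a valid '
         'header.')
```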
Let's default to chain YAML for backward compatibility?
Is it treated as a chain YAML only if there is a `name` section?
Could you elaborate?
I see your initial point now. We can default to chain YAML when there is no header, add a `generate_dag_name` function, and use the YAML filename as the default name. Sound good?
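A minimal sketch of what such a fallback could look like. The `generate_dag_name` name and its behavior are assumptions drawn from this discussion, not the merged implementation:

```python
import os


def generate_dag_name(yaml_path: str) -> str:
    # Hypothetical helper: derive a default DAG name from the YAML
    # filename (sans extension) when no `name:` header is present.
    base = os.path.basename(yaml_path)
    name, _ = os.path.splitext(base)
    return name or 'dag'
```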
Mostly sounds good to me. For the name of the DAG, let's keep it aligned with the current master branch:
skypilot/sky/utils/dag_utils.py
Lines 84 to 94 in 3e98afe
```python
configs = common_utils.read_yaml_all(path)
dag_name = None
if set(configs[0].keys()) == {'name'}:
    dag_name = configs[0]['name']
    configs = configs[1:]
elif len(configs) == 1:
    dag_name = configs[0].get('name')
if len(configs) == 0:
    # YAML has only `name: xxx`. Still instantiate a task.
    configs = [{'name': dag_name}]
```
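For illustration, the header detection behaves like this on a multi-document chain YAML. This is a sketch using PyYAML's `safe_load_all` in place of SkyPilot's `read_yaml_all`; the task contents are made up:

```python
import yaml

chain_yaml = """\
name: my-pipeline
---
name: train
run: python train.py
---
name: eval
run: python eval.py
"""
configs = list(yaml.safe_load_all(chain_yaml))
# A first document containing only `name` is treated as the DAG header.
if set(configs[0].keys()) == {'name'}:
    dag_name = configs[0]['name']
    configs = configs[1:]
```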
IIUC, the current master's behavior is that if the header is missing and it is a multi-task pipeline, then `dag_name` will remain `None`. Is this the expected behavior?
Allowing `dag.name` to be `None` creates a problematic cycle. If a user loads a DAG without a name, `dag.name` becomes `None`. When dumping this DAG to YAML, it results in `name: null`. Later, when the controller loads this YAML, it parses `null` back to `None`. This trips an `AssertionError` in the `_get_dag_and_name` function, which assumes `dag.name` is not `None`. This `None -> null -> None` cycle causes unexpected runtime errors in the existing codebase and should be addressed now to ensure system stability.
skypilot/sky/jobs/controller.py
Lines 39 to 43 in 71a95f4
```python
def _get_dag_and_name(dag_yaml: str) -> Tuple['sky.Dag', str]:
    dag = dag_utils.load_chain_dag_from_yaml(dag_yaml)
    dag_name = dag.name
    assert dag_name is not None, dag
    return dag, dag_name
```
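The `None -> null -> None` round trip can be reproduced in isolation; a sketch with PyYAML:

```python
import yaml

# Dumping a DAG config whose name is None writes `name: null` ...
dumped = yaml.safe_dump({'name': None})
# ... and loading it back yields None again, which would trip the
# `assert dag_name is not None` in the controller.
reloaded = yaml.safe_load(dumped)
```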
Please check this (line 67 in c6ae536):

`dag_utils.maybe_infer_and_fill_dag_and_task_names(dag)`
@andylizf Why did we change the base branch back?
Thanks @andylizf! Final nits. Please update the base branch and it should be ready to go!
Because only the master branch has GitHub Actions. 😂 I'll switch it back now.
Actually, considering this, should we create a branch on your repo and run those actions?
Good idea. You mean creating a branch like …?
Yes, and deploy any necessary actions in the branch.
What this reminds me of, though, is that GitHub handles Actions in a slightly odd way. Even if I add the GitHub Actions config directly to my PR, the workflows still run as expected, just like they're running now. So there's no need to create a PR in my repo. I can simply update the GitHub Actions config in this PR to run when merging into advanced-dag. Once the PR detects it's targeting advanced-dag, the actions will trigger automatically.
Sounds great. Let's fix the error in the CI and it should be ready to go ;)
Commit history:
* provide an example, edited from pipeline.yml
* more focus on dependencies for user dag lib
* more powerful user interface
* load and dump new yaml format
* fix
* fix: reversed logic in add_edge
* rename
* refactor due to reviewer's comments
* generate task.name if not given
* add comments for add_edge
* add `print_exception_no_traceback` when raise
* make `Dag.tasks` a property
* print dependencies for `__repr__`
* move `get_unique_task_name` to common_utils
* rename methods to use downstream/edge terminology
* Add dependencies feature for task dependency management (#4067)
* fix(jobs): type errors
* refactor: `_update_failed_task_state` for unified error handling
* refactor: separate finally block for a meaningful name
* feat: simple parallel execution support
* Apply suggestions from code review (Co-authored-by: Tian Xia <cblmemo@gmail.com>)
* change wording all to up/downstream style
* Add unique suffix to task names, fall back to timestamp if unnamed
* Unify handling of single and multiple tasks without dependencies
* Refactor tasks initialization: use list comprehension and fail fast
* Fix remove-task dependency description: upstream, not downstream
* Remove duplicated `self.edges`, use nx api instead
* Revert "Add unique suffix to task names, fall back to timestamp if unnamed" — otherwise, users cannot refer to the task by name in the DAG. This reverts commit 8486352.
* comment the checking-used-as-upstream logic
* remove is_chain restriction in jobs launch
* Change static layered parallelism to dynamic queue parallelism
* fix: add blocked_tasks set
* refactor and update canceled tasks in database
* format: some types are unsubscriptable
* add some logging
* Fooled again by the silly finally block, mistakenly thought it only runs on errors!
* feat: redirect logging for each thread to a separate file to prevent interleaving output
* fix due to reviewer's suggestions and some nits
* chore: remove some debugging info
* partially revert "update canceled tasks in database", view a task as a whole
* add some comments to inform the future of the thread-level redirector
* cancel all tasks when a task failed
* combine 3 sets into 1 dict
* add some comments to illustrate that `_try_add_successors_to_queue` only queues each node once
* Cancel all non-running tasks when cancelling a job; left a TODO for future cancellation policy discussion
* make those logging files and dirs
* refactor: stream_logs_by_id
* refactor and format
* provide a cli to print run.log of a certain subtask
* format
* add some comments and checks
* clearer log
* hint users to see the log
* add some comments explaining why we skip the conventional code path
* add with exception_no_traceback
* fix os.makedirs, add expanduser first
* fix: diamond named diamond
* fix: use managed_job_id as dirname instead of run timestamp
* fix: make strategy_executor local to prevent race conditions
* feat: implement stream logs for a task command
* fix: forgot to add parentheses around the addition
* chore: log indent
* fix: early checking and better comments
* refactor: reuse `stream_logs_by_id` for 2nd-part run log tailing
* fix: deal with tasks already finished
* refactor: a generalized `follow_logs` and implement output checking on its basis
* format
* add return type annotations
* add all annotations and rename `task_queue` to `ready_queue`
* Apply suggestions from code review (Co-authored-by: Tian Xia <cblmemo@gmail.com>)
* revert one-liner conditional for better readability
* use tuple unpacking instead of `[1]`
* chore: fix more type annotations
* refactor: clearer responsibility split between `_ThreadAwareOutput` and `RedirectOutputForThread`
* feat: ensure `open` returns a `TextIO`
* fix: unbound `is_dag_chain`
* fix: typing
* revert type subscription, stuck by pylint
* fix: use default-false `cancel_all` argument to avoid breaking existing calls
* fix: add explicit write/flush methods to `_ThreadAwareOutput` for logging redirection
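The "static layered parallelism to dynamic queue parallelism" change can be sketched as follows: a task enters the ready queue as soon as all of its upstream tasks have finished, instead of waiting for a whole layer. Names like `run_dag` and the `downstream` mapping are illustrative, not the actual SkyPilot API:

```python
import collections
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def run_dag(tasks, downstream, run_fn, max_workers=4):
    """Dynamic-queue parallel execution sketch.

    `downstream` maps a task to the tasks that depend on it. A task is
    submitted once its indegree (count of unfinished upstreams) hits zero.
    """
    indegree = {t: 0 for t in tasks}
    for downs in downstream.values():
        for d in downs:
            indegree[d] += 1
    ready = collections.deque(t for t in tasks if indegree[t] == 0)
    finished = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {}
        while ready or futures:
            # Submit everything that is currently ready.
            while ready:
                t = ready.popleft()
                futures[pool.submit(run_fn, t)] = t
            done, _ = wait(futures, return_when=FIRST_COMPLETED)
            for fut in done:
                t = futures.pop(fut)
                fut.result()  # Re-raise failures; real code would cancel the rest.
                finished.append(t)
                # Unblock downstream tasks whose upstreams are all done.
                for d in downstream.get(t, []):
                    indegree[d] -= 1
                    if indegree[d] == 0:
                        ready.append(d)
    return finished
```

A diamond-shaped DAG (a -> {b, c} -> d) runs `b` and `c` concurrently while still finishing `a` first and `d` last.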
Resolves #4054

This PR introduces the `dependencies` feature to SkyPilot, enabling users to specify task dependencies in their YAML configurations. This enhancement allows for more complex workflow definitions and ensures proper execution order of interdependent tasks.

Key changes:
* New `dependencies` section

Tested (run the relevant ones):
* `bash format.sh`
* `pytest tests/test_smoke.py`
* `pytest tests/test_smoke.py::test_fill_in_the_name`
* `conda deactivate; bash -i tests/backward_compatibility_tests.sh`
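To give a feel for what a `dependencies` section enables, here is a minimal, hypothetical illustration (the exact YAML shape in this PR may differ): given a mapping from each task to its upstream tasks, any topological order is a valid execution order, which the standard library's `graphlib` can compute directly.

```python
from graphlib import TopologicalSorter

# Hypothetical parsed `dependencies` section: task -> upstream tasks.
dependencies = {
    'preprocess': [],
    'train': ['preprocess'],
    'eval': ['train'],
}
order = list(TopologicalSorter(dependencies).static_order())
# A chain like this has exactly one valid order:
# ['preprocess', 'train', 'eval']
```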