DAG API Enhancements: Introducing Downstream Task Parsing and Explicit Flow Definition #4067

Merged: 53 commits into skypilot-org:advanced-dag on Oct 21, 2024

Collaborator

@andylizf andylizf commented Oct 10, 2024

Resolve #4054

This PR introduces the dependencies feature to SkyPilot, enabling users to specify task dependencies in their YAML configurations. This enhancement allows for more complex workflow definitions and ensures proper execution order of interdependent tasks.

Key changes:

  • Extended YAML parsing to support dependencies section
  • Implemented task graph construction based on dependencies
  • Provided a much more powerful interface for graph construction
  • Updated YAML generation to include dependency information
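
As a rough illustration of how a dependencies section can be turned into an execution order, here is a minimal sketch using only the Python standard library. The `dependencies` mapping, the task names, and the `execution_order` helper are hypothetical; they do not reflect SkyPilot's actual YAML schema or its `Dag` class.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical input: each task maps to the tasks it depends on (its upstream tasks).
dependencies = {
    'preprocess': [],
    'train': ['preprocess'],
    'evaluate': ['train'],
    'report': ['train', 'evaluate'],
}


def execution_order(deps):
    """Return one valid execution order, or raise ValueError on a cycle."""
    try:
        # TopologicalSorter expects a mapping of node -> predecessors.
        return list(TopologicalSorter(deps).static_order())
    except CycleError as e:
        # e.args[1] holds the list of nodes that form the cycle.
        raise ValueError(f'Dependencies contain a cycle: {e.args[1]}') from e


order = execution_order(dependencies)
print(order)  # ['preprocess', 'train', 'evaluate', 'report']
```

Every task appears after all of its upstream tasks, which is the ordering guarantee the description refers to.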

Tested (run the relevant ones):

  • Code formatting: bash format.sh
  • Any manual or new tests for this PR (please specify below)
  • All smoke tests: pytest tests/test_smoke.py
  • Relevant individual smoke tests: pytest tests/test_smoke.py::test_fill_in_the_name
  • Backward compatibility tests: conda deactivate; bash -i tests/backward_compatibility_tests.sh

@andylizf andylizf marked this pull request as ready for review October 11, 2024 07:34
@andylizf
Collaborator Author

@cblmemo Hi, could you kindly review this PR when you have a moment? Thanks!

Collaborator

@cblmemo cblmemo left a comment

Thanks for adding this feature @andylizf ! This would be really helpful for specifying complex workflows. Left some comments. It mostly looks good to me ;)

I created a branch for our DAG project; let's merge all features into that branch first. We might want to do some final checks before merging into master, and the advanced-dag branch can serve as a buffer :))

@andylizf andylizf changed the title from "Add depends_on feature for task dependency management" to "Add dependencies feature for task dependency management" Oct 14, 2024
@cblmemo
Collaborator

cblmemo commented Oct 14, 2024

Also, as mentioned here, let's change the target branch to advanced-dag.

@andylizf andylizf changed the base branch from master to advanced-dag October 14, 2024 22:25
Michaelvll and others added 3 commits October 14, 2024 22:48
skypilot-org#4084)

* fix narrow window and show log path during exception

* format

* format
* refactor

* lint

* refactor, dataclass

* refactor, dataclass

* refactor

* lint

Collaborator

@cblmemo cblmemo left a comment

Thanks @andylizf for the awesome work! Left some discussion here. I tested and the basic functionality works fine!

Comment on lines 105 to 107
if multi_tasks:
    raise ValueError('Multiple task definitions without a valid '
                     'header.')
Collaborator

Let's default to chain YAML for backward compatibility?

Collaborator Author

Is it also a chain yaml only if there is a name section?

Collaborator

Could you elaborate?

Collaborator Author

I see your initial point now. We can default to chain YAML without header, add a generate_dag_name function, and use the YAML filename as the default name. Sound good?
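
A minimal sketch of the filename-based default proposed above. The helper name `generate_dag_name` comes from the comment; its body and the example path are illustrative assumptions, not code from this PR.

```python
import os


def generate_dag_name(yaml_path: str) -> str:
    """Hypothetical helper: derive a default DAG name from the YAML filename."""
    base = os.path.basename(os.path.expanduser(yaml_path))
    # Drop the extension, e.g. 'pipeline.yaml' -> 'pipeline'.
    name, _ = os.path.splitext(base)
    return name


print(generate_dag_name('~/workflows/pipeline.yaml'))  # pipeline
```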

Collaborator

Mostly sounds good to me. For the name of the DAG, let's keep it aligned with the current master branch?

configs = common_utils.read_yaml_all(path)
dag_name = None
if set(configs[0].keys()) == {'name'}:
    dag_name = configs[0]['name']
    configs = configs[1:]
elif len(configs) == 1:
    dag_name = configs[0].get('name')
if len(configs) == 0:
    # YAML has only `name: xxx`. Still instantiate a task.
    configs = [{'name': dag_name}]

Collaborator Author

@andylizf andylizf Oct 17, 2024

IIUC, on the current master, if the header is missing and the YAML is a multi-task pipeline, dag_name stays None. Is this the expected behavior?

Collaborator Author

Allowing dag.name to be None creates a problematic cycle. If a user loads a DAG without a name, dag.name becomes None. When this DAG is dumped to YAML, the result is name: null. Later, when the controller loads this YAML, it parses null back to None. This triggers an AssertionError in the _get_dag_and_name function, which assumes dag.name is not None. This None -> null -> None round trip causes unexpected runtime errors in the existing codebase and should be fixed right away to keep the system stable.

def _get_dag_and_name(dag_yaml: str) -> Tuple['sky.Dag', str]:
    dag = dag_utils.load_chain_dag_from_yaml(dag_yaml)
    dag_name = dag.name
    assert dag_name is not None, dag
    return dag, dag_name
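
The case discussed in this thread can be reproduced in isolation. The sketch below copies the header-detection logic quoted earlier into a hypothetical helper, `resolve_dag_name`, that works on already-parsed YAML documents (plain dictionaries) instead of calling `common_utils.read_yaml_all`. The sample configs are made up for illustration.

```python
from typing import Any, Dict, List, Optional, Tuple

Config = Dict[str, Any]


def resolve_dag_name(configs: List[Config]) -> Tuple[Optional[str], List[Config]]:
    """Hypothetical helper mirroring the quoted header-detection logic."""
    dag_name = None
    if set(configs[0].keys()) == {'name'}:
        # The first document is a header that contains only `name`.
        dag_name = configs[0]['name']
        configs = configs[1:]
    elif len(configs) == 1:
        # A single task: its own name doubles as the DAG name.
        dag_name = configs[0].get('name')
    if len(configs) == 0:
        # YAML has only `name: xxx`. Still instantiate a task.
        configs = [{'name': dag_name}]
    return dag_name, configs


with_header = [{'name': 'pipeline'},
               {'name': 'train', 'run': 'echo train'},
               {'name': 'eval', 'run': 'echo eval'}]
no_header = with_header[1:]  # Same tasks, but no header document.

name_a, tasks_a = resolve_dag_name(with_header)
name_b, tasks_b = resolve_dag_name(no_header)
print(name_a, len(tasks_a))  # pipeline 2
print(name_b, len(tasks_b))  # None 2
```

With the header missing, neither branch assigns a name, so the DAG name stays `None`; a later check such as `assert dag_name is not None` would then fail.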

Collaborator

Please check this:

dag_utils.maybe_infer_and_fill_dag_and_task_names(dag)

andylizf and others added 6 commits October 15, 2024 12:20
* [Performance] Use new custom image to create GCP GPU VMs

* update image tags for both CPU and GPU

* always generate .sky/python_path

---------

Co-authored-by: Yika Luo <yikaluo@Yikas-MacBook-Pro.local>
* Add H100 mega support on GCP

* fix for some other regions

* format

* fix resource type

* fix catalog fetching
andylizf and others added 6 commits October 20, 2024 15:05
Co-authored-by: Tian Xia <cblmemo@gmail.com>
…or output (skypilot-org#4111)

* add `ux_utils.print_exception_no_traceback()` for cleaner error output

* Empty commit

* remove unnecessary with block
…ceback()` wrappers (skypilot-org#4130)

fix unnecessary with block for returning
…amed"

Otherwise, users can not refer to the task by name in the DAG.

This reverts commit 8486352.
@andylizf andylizf changed the base branch from advanced-dag to master October 20, 2024 23:56
romilbhardwaj and others added 3 commits October 21, 2024 01:26
* Bug fix for sky config file path resolution.

* format

* [OCI] Bug fix for image_id in Task YAML

* [OCI]: Support more OS types (esp. oraclelinux) in addition to ubuntu.

* format

* Disable system firewall

* Bug fix for validation of the Marketplace images

* Update sky/clouds/oci.py

Co-authored-by: Zhanghao Wu <zhanghao.wu@outlook.com>

* Update sky/clouds/oci.py

Co-authored-by: Zhanghao Wu <zhanghao.wu@outlook.com>

* variable/function naming

* address review comments: not to change the service_catalog api. call oci_catalog directly for get os type for a image.

* Update sky/clouds/oci.py

Co-authored-by: Zhanghao Wu <zhanghao.wu@outlook.com>

* Update sky/clouds/oci.py

Co-authored-by: Zhanghao Wu <zhanghao.wu@outlook.com>

* Update sky/clouds/oci.py

Co-authored-by: Zhanghao Wu <zhanghao.wu@outlook.com>

* address review comments

---------

Co-authored-by: Zhanghao Wu <zhanghao.wu@outlook.com>
@cblmemo
Collaborator

cblmemo commented Oct 21, 2024

@andylizf Why did we change the base branch back?

Collaborator

@cblmemo cblmemo left a comment

Thanks @andylizf ! Final nits. Please update the base branch and it should be ready to go!

@andylizf
Collaborator Author

Because only the master branch has GitHub Actions. 😂 I'll switch it back now.

@andylizf andylizf changed the base branch from master to advanced-dag October 21, 2024 18:49
@cblmemo
Collaborator

cblmemo commented Oct 21, 2024

Actually, considering this, should we create a branch on your repo and run those actions?

@andylizf
Collaborator Author

Good idea. You mean creating a branch like advanced-dag in my repo?

@cblmemo
Collaborator

cblmemo commented Oct 21, 2024

Yes, and deploy any necessary actions in that branch.

@andylizf andylizf changed the title from "Add dependencies feature for task dependency management" to "DAG API Enhancements: Introducing Downstream Task Parsing and Explicit Flow Definition" Oct 21, 2024
@andylizf
Collaborator Author

That reminds me: GitHub handles Actions in a slightly odd way. Even if I change the GitHub Actions config directly in my PR, the workflows still run as expected, just as they are running now. So there's no need to create a PR in my repo. I can simply update the GitHub Actions in this PR to also run for merges into advanced-dag. Once the PR targets advanced-dag, the actions will trigger automatically.

@cblmemo
Collaborator

cblmemo commented Oct 21, 2024

Sounds great. Let's fix the CI error and it should be ready to go ;)

@cblmemo cblmemo merged commit 7d93b75 into skypilot-org:advanced-dag Oct 21, 2024
18 checks passed
cblmemo added a commit that referenced this pull request Nov 9, 2024
* provide an example, edited from pipeline.yml

* more focus on dependencies for user dag lib

* more powerful user interface

* load and dump new yaml format

* fix

* fix: reversed logic in add_edge

* rename

* refactor due to reviewer's comments

* generate task.name if not given

* add comments for add_edge

* add `print_exception_no_traceback` when raise

* make `Dag.tasks` a property

* print dependencies for `__repr__`

* move `get_unique_task_name` to common_utils

* rename methods to use downstream/edge terminology

* Add dependencies feature for task dependency management (#4067)

provide an example, edited from pipeline.yml

more focus on dependencies for user dag lib

more powerful user interface

load and dump new yaml format

fix

fix: reversed logic in add_edge

rename

refactor due to reviewer's comments

generate task.name if not given

add comments for add_edge

add `print_exception_no_traceback` when raise

make `Dag.tasks` a property

print dependencies for `__repr__`

move `get_unique_task_name` to common_utils

rename methods to use downstream/edge terminology

* fix(jobs): type errors

* refactor: `_update_failed_task_state` for unified error handling

* refactor: separate finally block for a meaningful name

* feat: simple parallel execution support

* Apply suggestions from code review

Co-authored-by: Tian Xia <cblmemo@gmail.com>

* change wording all to up/downstream style

* Add unique suffix to task names, fallback to timestamp if unnamed

* Unify handling of single and multiple tasks without dependencies

* Refactor tasks initialization: use list comprehension and fail fast

* Fix remove task dependency description: upstream, not downstream

Co-authored-by: Tian Xia <cblmemo@gmail.com>

* Remove duplicated `self.edges`, use nx api instead

* Revert "Add unique suffix to task names, fallback to timestamp if unnamed"

Otherwise, users can not refer to the task by name in the DAG.

This reverts commit 8486352.

* comment the checking used as upstream logic

* remove is_chain restriction in jobs launch

* Change Static layered parallelism to dynamic queue parallelism

* fix: add blocked_tasks set

* refactor and update canceled tasks in database

* format: some types are unsubscriptable

* add some logging

* Fooled again by the silly finally block, mistakenly thought it only runs on errors!

* feat: redirect logging for each thread to a separate file to prevent interleaving output

* fix due to reviewer's suggestions and some nits

* chore: remove some debugging info

* partially revert "update canceled tasks in database", view a task as a whole

* add some comments to inform the future of thread-level redirector

* cancel all tasks when a task failed

* combine 3 sets to 1 dict

* add some comments to illustrate `_try_add_successors_to_queue` only queue each node once

* Cancel all non-running tasks when cancelling a job. Left a TODO for future
cancellation policy discussion.

* make those logging files and dir

* refactor: stream_logs_by_id

* refactor and format

* provide a cli to print run.log of a certain subtask

* format

* add some comments and checks

* clearer log

* hint users to see the log

* add some comments explaining why skip conventional code path

* add with exception_no_traceback

* fix os.makedirs, add expanduser first

* fix: diamond named diamond

* fix: use managed_job_id as dirname instead of runtimestamp

* fix: make strategy_executor local to prevent race conditions

* feat: implement stream logs for a task command

* fix: forgot to add parentheses around the addition

* chore: log indent

* fix: early checking and better comments

* refactor: reuse `stream_logs_by_id` for 2nd part run log tailing

* fix: deal with tasks already finished

* refactor: a generalized `follow_logs` and implement output checking on its basis

* format

* add return type annotations

Co-authored-by: Tian Xia <cblmemo@gmail.com>

* add all annotations and rename `task_queue` to `ready_queue`

* Apply suggestions from code review

Co-authored-by: Tian Xia <cblmemo@gmail.com>

* revert one-liner conditional for better readability

* use tuple unpacking instead of `[1]`

Co-authored-by: Tian Xia <cblmemo@gmail.com>

* Apply suggestions from code review

Co-authored-by: Tian Xia <cblmemo@gmail.com>

* chore: fix more type annotations

* refactor: clearer responsibility split between `_ThreadAwareOutput` and `RedirectOutputForThread`

* feat: ensures `open` returns a `TextIO`

* fix: unbounded `is_dag_chain`

* fix: typing

* revert type subscription, stuck by pylint

* fix: use default false `cancel_all` argument to avoid breaking existing calls

* fix: add explicit write/flush methods to _ThreadAwareOutput for logging redirection

---------

Co-authored-by: Tian Xia <cblmemo@gmail.com>
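
Several commits above mention a move from static layered parallelism to a dynamic ready queue in which each node is queued only once. A minimal sketch of that general idea follows, assuming a plain mapping from each task to its upstream tasks. The function `run_with_ready_queue` and the diamond example are hypothetical, and the loop runs tasks sequentially for clarity; it is not the implementation from these commits.

```python
from collections import deque
from typing import Callable, Dict, List


def run_with_ready_queue(upstream: Dict[str, List[str]],
                         run_task: Callable[[str], None]) -> List[str]:
    """Run each task once all of its upstream tasks have finished.

    Assumes every upstream name also appears as a key in `upstream`.
    Sequential sketch: a real executor would run ready tasks concurrently.
    """
    # Number of unfinished upstream tasks per task.
    remaining = {task: len(ups) for task, ups in upstream.items()}
    # Reverse edges: task -> tasks that depend on it.
    downstream: Dict[str, List[str]] = {task: [] for task in upstream}
    for task, ups in upstream.items():
        for up in ups:
            downstream[up].append(task)

    ready_queue = deque(t for t, n in remaining.items() if n == 0)
    finished: List[str] = []
    while ready_queue:
        task = ready_queue.popleft()
        run_task(task)
        finished.append(task)
        for succ in downstream[task]:
            remaining[succ] -= 1
            # Queued exactly once: when its last upstream task finishes.
            if remaining[succ] == 0:
                ready_queue.append(succ)
    if len(finished) != len(upstream):
        raise ValueError('Cycle detected: some tasks never became ready.')
    return finished


diamond = {'a': [], 'b': ['a'], 'c': ['a'], 'd': ['b', 'c']}
order = run_with_ready_queue(diamond, lambda name: None)
print(order)  # ['a', 'b', 'c', 'd']
```

Unlike a layered schedule, a task here becomes eligible as soon as its own upstream tasks are done, without waiting for unrelated tasks in the same layer.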

Successfully merging this pull request may close these issues.

[Jobs] API Design for DAG execution
8 participants