From a0c201c0d8d27a840c515d3e480e6da3cfbea37e Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Thu, 19 Dec 2024 10:59:39 -0800 Subject: [PATCH 1/6] Add inline metadata to allow for local execution --- .github/ci-scripts/job_runner.py | 2 +- benchmarking/tpch/ray_job_runner.py | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/.github/ci-scripts/job_runner.py b/.github/ci-scripts/job_runner.py index 12c949136f..35ff4d8d17 100644 --- a/.github/ci-scripts/job_runner.py +++ b/.github/ci-scripts/job_runner.py @@ -1,6 +1,6 @@ # /// script # requires-python = ">=3.12" -# dependencies = [] +# dependencies = ["ray[default]"] # /// import argparse diff --git a/benchmarking/tpch/ray_job_runner.py b/benchmarking/tpch/ray_job_runner.py index 89301cf647..dc6141ad18 100644 --- a/benchmarking/tpch/ray_job_runner.py +++ b/benchmarking/tpch/ray_job_runner.py @@ -1,3 +1,11 @@ +# /// script +# requires-python = ">=3.12" +# dependencies = [ +# "ray[default]", +# "getdaft", +# ] +# /// + from __future__ import annotations import argparse From 643068445ec74c84e7c397ba883e7341b42583f9 Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Thu, 19 Dec 2024 11:16:35 -0800 Subject: [PATCH 2/6] Add a tpch wrapper script --- tools/tpch.py | 101 +++++++++++++++++++++++++++++++++++++++++++++ tools/utils.py | 108 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 209 insertions(+) create mode 100644 tools/tpch.py create mode 100644 tools/utils.py diff --git a/tools/tpch.py b/tools/tpch.py new file mode 100644 index 0000000000..ffec5a5123 --- /dev/null +++ b/tools/tpch.py @@ -0,0 +1,101 @@ +# /// script +# requires-python = ">=3.12" +# dependencies = [ +# "PyGithub", +# "boto3", +# "inquirer", +# ] +# /// + +import argparse +import json +from typing import Optional + +import github +import inquirer +import utils + +WHEEL_NAME = "getdaft-0.3.0.dev0-cp38-abi3-manylinux_2_31_x86_64.whl" + + +def run( + branch_name: str, + commit_hash: str, + questions: str, + scale_factor: int, + cluster_profile: str, + env_vars: str, +): + user_wants_to_run_tpcds_benchmarking = inquirer.confirm( + message=f"Going to run the 'run-cluster' workflow on the branch '{branch_name}' (commit-hash: {commit_hash}); proceed?" + ) + + if not user_wants_to_run_tpcds_benchmarking: + print("Workflow aborted") + exit(1) + + expanded_questions = utils.parse_questions(questions) + print(f"Running scale-factor of {scale_factor}GB on questions: {', '.join(map(str, expanded_questions))}") + args_as_list = [f"--question={q} --scale-factor={scale_factor}" for q in expanded_questions] + entrypoint_args = json.dumps(args_as_list) + + workflow = utils.repo.get_workflow("run-cluster.yaml") + utils.dispatch( + workflow=workflow, + branch_name=branch_name, + inputs={ + "cluster_profile": cluster_profile, + "working_dir": "benchmarking/tpcds", + "entrypoint_script": "ray_entrypoint.py", + "entrypoint_args": entrypoint_args, + "env_vars": env_vars, + }, + ) + + +def main( + branch_name: Optional[str], + questions: str, + scale_factor: int, + cluster_profile: str, + env_vars: str, +): + branch_name, commit_hash = utils.get_name_and_commit_hash(branch_name) + run( + branch_name=branch_name, + commit_hash=commit_hash, + questions=questions, + scale_factor=scale_factor, + cluster_profile=cluster_profile, + env_vars=env_vars, + ) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--ref", type=str, required=False, help="The branch name to run on") + parser.add_argument( + "--questions", type=str, required=False, default="*", help="A comma separated list of questions to run" + ) + parser.add_argument("--scale-factor", type=int, required=False, default=2, help="The scale factor to run on") + parser.add_argument("--num-partitions", type=int, required=False, default=2, help="The number of partitions") + parser.add_argument("--cluster-profile", type=str, required=False, help="The ray cluster configuration to run on") + parser.add_argument( + "--env-vars", + type=str, + required=False, + help="A comma separated list of environment variables to pass to ray job", + ) + parser.add_argument("--verbose", action="store_true", help="Verbose debugging") + args = parser.parse_args() + + if args.verbose: + github.enable_console_debug_logging() + + main( + branch_name=args.ref, + questions=args.questions, + scale_factor=args.scale_factor, + cluster_profile=args.cluster_profile, + env_vars=args.env_vars, + ) diff --git a/tools/utils.py b/tools/utils.py new file mode 100644 index 0000000000..e389fd8931 --- /dev/null +++ b/tools/utils.py @@ -0,0 +1,108 @@ +import subprocess +import time +import typing +from typing import Optional + +import gha_run_cluster_job +from github import Auth, Github +from github.Workflow import Workflow +from github.WorkflowRun import WorkflowRun + +RETRY_ATTEMPTS = 5 + +auth = Auth.Token(gha_run_cluster_job.get_oauth_token()) +g = Github(auth=auth) +repo = g.get_repo("Eventual-Inc/Daft") + + +def dispatch(workflow: Workflow, branch_name: str, inputs: dict) -> WorkflowRun: + pre_creation_latest_run = get_latest_run(workflow) + + print(f"Launching workflow '{workflow.name}' on the branch '{branch_name}'") + created = workflow.create_dispatch( + ref=branch_name, + inputs=inputs, + ) + if not created: + raise RuntimeError("Could not create workflow, suggestion: run again with --verbose") + + post_creation_latest_run = None + for _ in range(RETRY_ATTEMPTS): + post_creation_latest_run = get_latest_run(workflow) + if pre_creation_latest_run.run_number == post_creation_latest_run.run_number: + sleep_and_then_retry() + elif pre_creation_latest_run.run_number < post_creation_latest_run.run_number: + break + else: + typing.assert_never( + "Run numbers are always returned in sorted order and are always monotonically increasing" + ) + if not post_creation_latest_run: + raise RuntimeError(f"Unable to locate the new run request for the '{workflow.name}' workflow") + + print(f"Launched new '{workflow.name}' workflow with id: {post_creation_latest_run.id}") + print(f"View the workflow run at: {post_creation_latest_run.html_url}") + + return post_creation_latest_run + + +def sleep_and_then_retry(sleep_amount_sec: int = 3): + time.sleep(sleep_amount_sec) + + +def get_latest_run(workflow: Workflow) -> WorkflowRun: + for _ in range(RETRY_ATTEMPTS): + runs = workflow.get_runs() + + if runs.totalCount > 0: + return runs[0] + + sleep_and_then_retry() + + raise RuntimeError("Unable to list all workflow invocations") + + +def get_name_and_commit_hash(branch_name: Optional[str]) -> tuple[str, str]: + branch_name = branch_name or "HEAD" + name = ( + subprocess.check_output(["git", "rev-parse", "--abbrev-ref", branch_name], stderr=subprocess.STDOUT) + .strip() + .decode("utf-8") + ) + commit_hash = ( + subprocess.check_output(["git", "rev-parse", branch_name], stderr=subprocess.STDOUT).strip().decode("utf-8") + ) + return name, commit_hash + + +def parse_questions(questions: str) -> list[int]: + if not questions: + return [] + + if questions == "*": + return list(range(1, 100)) + + items = questions.split(",") + nums = [] + for item in items: + try: + num = int(item) + nums.append(str(num)) + continue + except ValueError: + ... + + if "-" not in item: + raise ValueError("...") + + try: + lower, upper = item.split("-") + lower_int = int(lower) + upper_int = int(upper) + if lower_int > upper_int: + raise ValueError + nums.extend(range(lower_int, upper_int + 1)) + except ValueError: + raise ValueError(f"Invalid question item; expected a number or a range, instead got {item}") + + return nums From 971ef5bb93f3ab0d2050ce1c8ef59410f920fd38 Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Thu, 19 Dec 2024 11:37:55 -0800 Subject: [PATCH 3/6] Finish tpch script --- tools/tpch.py | 43 ++++++++++++++----------------------------- 1 file changed, 14 insertions(+), 29 deletions(-) diff --git a/tools/tpch.py b/tools/tpch.py index ffec5a5123..d6467d92ea 100644 --- a/tools/tpch.py +++ b/tools/tpch.py @@ -9,34 +9,36 @@ import argparse import json -from typing import Optional import github import inquirer import utils -WHEEL_NAME = "getdaft-0.3.0.dev0-cp38-abi3-manylinux_2_31_x86_64.whl" - def run( branch_name: str, - commit_hash: str, questions: str, scale_factor: int, + num_partitions: int, cluster_profile: str, env_vars: str, ): - user_wants_to_run_tpcds_benchmarking = inquirer.confirm( - message=f"Going to run the 'run-cluster' workflow on the branch '{branch_name}' (commit-hash: {commit_hash}); proceed?" + branch_name, commit_hash = utils.get_name_and_commit_hash(branch_name) + user_wants_to_run_tpch_benchmarking = inquirer.confirm( + message=f"Going to run TPCH benchmarking with the 'run-cluster' workflow on the branch '{branch_name}' (commit-hash: {commit_hash}); proceed?" ) - - if not user_wants_to_run_tpcds_benchmarking: + if not user_wants_to_run_tpch_benchmarking: print("Workflow aborted") exit(1) expanded_questions = utils.parse_questions(questions) - print(f"Running scale-factor of {scale_factor}GB on questions: {', '.join(map(str, expanded_questions))}") - args_as_list = [f"--question={q} --scale-factor={scale_factor}" for q in expanded_questions] + print( + f"Running scale-factor of {scale_factor}GB with {num_partitions} partitions on questions: {', '.join(map(str, expanded_questions))}" + ) + args_as_list = [ + f"--question-number={q} --parquet-folder=s3://eventual-dev-benchmarking-fixtures/uncompressed/tpch-dbgen/{scale_factor}_0/{num_partitions}/parquet/" + for q in expanded_questions + ] entrypoint_args = json.dumps(args_as_list) workflow = utils.repo.get_workflow("run-cluster.yaml") @@ -53,24 +55,6 @@ def run( ) -def main( - branch_name: Optional[str], - questions: str, - scale_factor: int, - cluster_profile: str, - env_vars: str, -): - branch_name, commit_hash = utils.get_name_and_commit_hash(branch_name) - run( - branch_name=branch_name, - commit_hash=commit_hash, - questions=questions, - scale_factor=scale_factor, - cluster_profile=cluster_profile, - env_vars=env_vars, - ) - - if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--ref", type=str, required=False, help="The branch name to run on") @@ -92,10 +76,11 @@ def main( if args.verbose: github.enable_console_debug_logging() - main( + run( branch_name=args.ref, questions=args.questions, scale_factor=args.scale_factor, + num_partitions=args.num_partitions, cluster_profile=args.cluster_profile, env_vars=args.env_vars, ) From 7e8dfe2420dfe493ece87c2d31b9c514992162c1 Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Thu, 19 Dec 2024 11:52:57 -0800 Subject: [PATCH 4/6] Add upper limit checking on questions; clean up logic --- tools/tpch.py | 14 ++++---------- tools/utils.py | 12 ++++++++++-- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/tools/tpch.py b/tools/tpch.py index d6467d92ea..805fb05adc 100644 --- a/tools/tpch.py +++ b/tools/tpch.py @@ -3,7 +3,6 @@ # dependencies = [ # "PyGithub", # "boto3", -# "inquirer", # ] # /// @@ -11,9 +10,10 @@ import json import github -import inquirer import utils +TOTAL_NUMBER_OF_QUESTIONS = 22 + def run( branch_name: str, @@ -23,15 +23,9 @@ def run( cluster_profile: str, env_vars: str, ): - branch_name, commit_hash = utils.get_name_and_commit_hash(branch_name) - user_wants_to_run_tpch_benchmarking = inquirer.confirm( - message=f"Going to run TPCH benchmarking with the 'run-cluster' workflow on the branch '{branch_name}' (commit-hash: {commit_hash}); proceed?" - ) - if not user_wants_to_run_tpch_benchmarking: - print("Workflow aborted") - exit(1) + branch_name, _ = utils.get_name_and_commit_hash(branch_name) - expanded_questions = utils.parse_questions(questions) + expanded_questions = utils.parse_questions(questions, TOTAL_NUMBER_OF_QUESTIONS) print( f"Running scale-factor of {scale_factor}GB with {num_partitions} partitions on questions: {', '.join(map(str, expanded_questions))}" ) diff --git a/tools/utils.py b/tools/utils.py index e389fd8931..97953ebd47 100644 --- a/tools/utils.py +++ b/tools/utils.py @@ -75,18 +75,22 @@ def get_name_and_commit_hash(branch_name: Optional[str]) -> tuple[str, str]: return name, commit_hash -def parse_questions(questions: str) -> list[int]: +def parse_questions(questions: str, total_number_of_questions: int) -> list[int]: if not questions: return [] if questions == "*": - return list(range(1, 100)) + return list(range(1, total_number_of_questions + 1)) items = questions.split(",") nums = [] for item in items: try: num = int(item) + if num > total_number_of_questions: + raise RuntimeError( + f"Requested question number ({num}) is greater than the total number of questions available ({total_number_of_questions})" + ) nums.append(str(num)) continue except ValueError: @@ -101,6 +105,10 @@ def parse_questions(questions: str) -> list[int]: upper_int = int(upper) if lower_int > upper_int: raise ValueError + if upper_int > total_number_of_questions: + raise RuntimeError( + f"Requested question number ({upper_int}) is greater than the total number of questions available ({total_number_of_questions})" + ) nums.extend(range(lower_int, upper_int + 1)) except ValueError: raise ValueError(f"Invalid question item; expected a number or a range, instead got {item}") From 5502a2ca2f81c73bd515c1cbaf3c0cf320a48d27 Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Thu, 19 Dec 2024 11:57:14 -0800 Subject: [PATCH 5/6] Add tpcds launcher as well --- tools/tpcds.py | 70 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 tools/tpcds.py diff --git a/tools/tpcds.py b/tools/tpcds.py new file mode 100644 index 0000000000..8db38a7798 --- /dev/null +++ b/tools/tpcds.py @@ -0,0 +1,70 @@ +# /// script +# requires-python = ">=3.12" +# dependencies = [ +# "PyGithub", +# "boto3", +# ] +# /// + +import argparse +import json + +import github +import utils + + +def run( + branch_name: str, + questions: str, + scale_factor: int, + cluster_profile: str, + env_vars: str, +): + branch_name, _ = utils.get_name_and_commit_hash(branch_name) + + expanded_questions = utils.parse_questions(questions, 99) + print(f"Running scale-factor of {scale_factor}GB on questions: {', '.join(map(str, expanded_questions))}") + args_as_list = [f"--question={q} --scale-factor={scale_factor}" for q in expanded_questions] + entrypoint_args = json.dumps(args_as_list) + + workflow = utils.repo.get_workflow("run-cluster.yaml") + utils.dispatch( + workflow=workflow, + branch_name=branch_name, + inputs={ + "cluster_profile": cluster_profile, + "working_dir": "benchmarking/tpcds", + "entrypoint_script": "ray_entrypoint.py", + "entrypoint_args": entrypoint_args, + "env_vars": env_vars, + }, + ) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--ref", type=str, required=False, help="The branch name to run on") + parser.add_argument( + "--questions", type=str, required=False, default="*", help="A comma separated list of questions to run" + ) + parser.add_argument("--scale-factor", type=int, required=False, default=2, help="The scale factor to run on") + parser.add_argument("--cluster-profile", type=str, required=False, help="The ray cluster configuration to run on") + parser.add_argument( + "--env-vars", + type=str, + required=False, + help="A comma separated list of environment variables to pass to ray job", + ) + parser.add_argument("--verbose", action="store_true", help="Verbose debugging") + args = parser.parse_args() + + if args.verbose: + github.enable_console_debug_logging() + + run( + branch_name=args.ref, + questions=args.questions, + scale_factor=args.scale_factor, + cluster_profile=args.cluster_profile, + env_vars=args.env_vars, + ) From 2378e68b08a8174d17c80046660af7337d75b807 Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Thu, 19 Dec 2024 12:05:02 -0800 Subject: [PATCH 6/6] Change the args for tpch launcher --- tools/tpch.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/tpch.py b/tools/tpch.py index 805fb05adc..097f9e1f51 100644 --- a/tools/tpch.py +++ b/tools/tpch.py @@ -41,8 +41,8 @@ def run( branch_name=branch_name, inputs={ "cluster_profile": cluster_profile, - "working_dir": "benchmarking/tpcds", - "entrypoint_script": "ray_entrypoint.py", + "working_dir": "benchmarking/tpch", + "entrypoint_script": "ray_job_runner.py", "entrypoint_args": entrypoint_args, "env_vars": env_vars, },