diff --git a/benchmarking/tpch/ray_job_runner.py b/benchmarking/tpch/ray_job_runner.py
index 89301cf647..dc6141ad18 100644
--- a/benchmarking/tpch/ray_job_runner.py
+++ b/benchmarking/tpch/ray_job_runner.py
@@ -1,3 +1,11 @@
+# /// script
+# requires-python = ">=3.12"
+# dependencies = [
+#     "ray[default]",
+#     "getdaft",
+# ]
+# ///
+
 from __future__ import annotations
 
 import argparse
diff --git a/tools/tpcds.py b/tools/tpcds.py
new file mode 100644
index 0000000000..8db38a7798
--- /dev/null
+++ b/tools/tpcds.py
@@ -0,0 +1,70 @@
+# /// script
+# requires-python = ">=3.12"
+# dependencies = [
+#     "PyGithub",
+#     "boto3",
+# ]
+# ///
+
+import argparse
+import json
+
+import github
+import utils
+
+
+def run(
+    branch_name: str,
+    questions: str,
+    scale_factor: int,
+    cluster_profile: str,
+    env_vars: str,
+):
+    branch_name, _ = utils.get_name_and_commit_hash(branch_name)
+
+    expanded_questions = utils.parse_questions(questions, 99)
+    print(f"Running scale-factor of {scale_factor}GB on questions: {', '.join(map(str, expanded_questions))}")
+    args_as_list = [f"--question={q} --scale-factor={scale_factor}" for q in expanded_questions]
+    entrypoint_args = json.dumps(args_as_list)
+
+    workflow = utils.repo.get_workflow("run-cluster.yaml")
+    utils.dispatch(
+        workflow=workflow,
+        branch_name=branch_name,
+        inputs={
+            "cluster_profile": cluster_profile,
+            "working_dir": "benchmarking/tpcds",
+            "entrypoint_script": "ray_entrypoint.py",
+            "entrypoint_args": entrypoint_args,
+            "env_vars": env_vars,
+        },
+    )
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--ref", type=str, required=False, help="The branch name to run on")
+    parser.add_argument(
+        "--questions", type=str, required=False, default="*", help="A comma separated list of questions to run"
+    )
+    parser.add_argument("--scale-factor", type=int, required=False, default=2, help="The scale factor to run on")
+    parser.add_argument("--cluster-profile", type=str, required=False, help="The ray cluster configuration to run on")
+    parser.add_argument(
+        "--env-vars",
+        type=str,
+        required=False,
+        help="A comma separated list of environment variables to pass to ray job",
+    )
+    parser.add_argument("--verbose", action="store_true", help="Verbose debugging")
+    args = parser.parse_args()
+
+    if args.verbose:
+        github.enable_console_debug_logging()
+
+    run(
+        branch_name=args.ref,
+        questions=args.questions,
+        scale_factor=args.scale_factor,
+        cluster_profile=args.cluster_profile,
+        env_vars=args.env_vars,
+    )
diff --git a/tools/tpch.py b/tools/tpch.py
new file mode 100644
index 0000000000..097f9e1f51
--- /dev/null
+++ b/tools/tpch.py
@@ -0,0 +1,80 @@
+# /// script
+# requires-python = ">=3.12"
+# dependencies = [
+#     "PyGithub",
+#     "boto3",
+# ]
+# ///
+
+import argparse
+import json
+
+import github
+import utils
+
+TOTAL_NUMBER_OF_QUESTIONS = 22
+
+
+def run(
+    branch_name: str,
+    questions: str,
+    scale_factor: int,
+    num_partitions: int,
+    cluster_profile: str,
+    env_vars: str,
+):
+    branch_name, _ = utils.get_name_and_commit_hash(branch_name)
+
+    expanded_questions = utils.parse_questions(questions, TOTAL_NUMBER_OF_QUESTIONS)
+    print(
+        f"Running scale-factor of {scale_factor}GB with {num_partitions} partitions on questions: {', '.join(map(str, expanded_questions))}"
+    )
+    args_as_list = [
+        f"--question-number={q} --parquet-folder=s3://eventual-dev-benchmarking-fixtures/uncompressed/tpch-dbgen/{scale_factor}_0/{num_partitions}/parquet/"
+        for q in expanded_questions
+    ]
+    entrypoint_args = json.dumps(args_as_list)
+
+    workflow = utils.repo.get_workflow("run-cluster.yaml")
+    utils.dispatch(
+        workflow=workflow,
+        branch_name=branch_name,
+        inputs={
+            "cluster_profile": cluster_profile,
+            "working_dir": "benchmarking/tpch",
+            "entrypoint_script": "ray_job_runner.py",
+            "entrypoint_args": entrypoint_args,
+            "env_vars": env_vars,
+        },
+    )
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--ref", type=str, required=False, help="The branch name to run on")
+    parser.add_argument(
+        "--questions", type=str, required=False, default="*", help="A comma separated list of questions to run"
+    )
+    parser.add_argument("--scale-factor", type=int, required=False, default=2, help="The scale factor to run on")
+    parser.add_argument("--num-partitions", type=int, required=False, default=2, help="The number of partitions")
+    parser.add_argument("--cluster-profile", type=str, required=False, help="The ray cluster configuration to run on")
+    parser.add_argument(
+        "--env-vars",
+        type=str,
+        required=False,
+        help="A comma separated list of environment variables to pass to ray job",
+    )
+    parser.add_argument("--verbose", action="store_true", help="Verbose debugging")
+    args = parser.parse_args()
+
+    if args.verbose:
+        github.enable_console_debug_logging()
+
+    run(
+        branch_name=args.ref,
+        questions=args.questions,
+        scale_factor=args.scale_factor,
+        num_partitions=args.num_partitions,
+        cluster_profile=args.cluster_profile,
+        env_vars=args.env_vars,
+    )
diff --git a/tools/utils.py b/tools/utils.py
new file mode 100644
index 0000000000..97953ebd47
--- /dev/null
+++ b/tools/utils.py
@@ -0,0 +1,116 @@
+import subprocess
+import time
+import typing
+from typing import Optional
+
+import gha_run_cluster_job
+from github import Auth, Github
+from github.Workflow import Workflow
+from github.WorkflowRun import WorkflowRun
+
+RETRY_ATTEMPTS = 5
+
+auth = Auth.Token(gha_run_cluster_job.get_oauth_token())
+g = Github(auth=auth)
+repo = g.get_repo("Eventual-Inc/Daft")
+
+
+def dispatch(workflow: Workflow, branch_name: str, inputs: dict) -> WorkflowRun:
+    pre_creation_latest_run = get_latest_run(workflow)
+
+    print(f"Launching workflow '{workflow.name}' on the branch '{branch_name}'")
+    created = workflow.create_dispatch(
+        ref=branch_name,
+        inputs=inputs,
+    )
+    if not created:
+        raise RuntimeError("Could not create workflow, suggestion: run again with --verbose")
+
+    post_creation_latest_run = None
+    for _ in range(RETRY_ATTEMPTS):
+        post_creation_latest_run = get_latest_run(workflow)
+        if pre_creation_latest_run.run_number == post_creation_latest_run.run_number:
+            sleep_and_then_retry()
+        elif pre_creation_latest_run.run_number < post_creation_latest_run.run_number:
+            break
+        else:
+            typing.assert_never(
+                "Run numbers are always returned in sorted order and are always monotonically increasing"
+            )
+    if not post_creation_latest_run:
+        raise RuntimeError(f"Unable to locate the new run request for the '{workflow.name}' workflow")
+
+    print(f"Launched new '{workflow.name}' workflow with id: {post_creation_latest_run.id}")
+    print(f"View the workflow run at: {post_creation_latest_run.html_url}")
+
+    return post_creation_latest_run
+
+
+def sleep_and_then_retry(sleep_amount_sec: int = 3):
+    time.sleep(sleep_amount_sec)
+
+
+def get_latest_run(workflow: Workflow) -> WorkflowRun:
+    for _ in range(RETRY_ATTEMPTS):
+        runs = workflow.get_runs()
+
+        if runs.totalCount > 0:
+            return runs[0]
+
+        sleep_and_then_retry()
+
+    raise RuntimeError("Unable to list all workflow invocations")
invocations") + + +def get_name_and_commit_hash(branch_name: Optional[str]) -> tuple[str, str]: + branch_name = branch_name or "HEAD" + name = ( + subprocess.check_output(["git", "rev-parse", "--abbrev-ref", branch_name], stderr=subprocess.STDOUT) + .strip() + .decode("utf-8") + ) + commit_hash = ( + subprocess.check_output(["git", "rev-parse", branch_name], stderr=subprocess.STDOUT).strip().decode("utf-8") + ) + return name, commit_hash + + +def parse_questions(questions: str, total_number_of_questions: int) -> list[int]: + if not questions: + return [] + + if questions == "*": + return list(range(1, total_number_of_questions + 1)) + + items = questions.split(",") + nums = [] + for item in items: + try: + num = int(item) + if num > total_number_of_questions: + raise RuntimeError( + f"Requested question number ({num}) is greater than the total number of questions available ({total_number_of_questions})" + ) + nums.append(str(num)) + continue + except ValueError: + ... + + if "-" not in item: + raise ValueError("...") + + try: + lower, upper = item.split("-") + lower_int = int(lower) + upper_int = int(upper) + if lower_int > upper_int: + raise ValueError + if upper_int > total_number_of_questions: + raise RuntimeError( + f"Requested question number ({upper_int}) is greater than the total number of questions available ({total_number_of_questions})" + ) + nums.extend(range(lower_int, upper_int + 1)) + except ValueError: + raise ValueError(f"Invalid question item; expected a number or a range, instead got {item}") + + return nums