Skip to content

Commit

Permalink
Merge branch 'main' into kevin/null-join-keys
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinzwang committed Dec 17, 2024
2 parents a5e9e4a + 5165e5e commit b9865b8
Show file tree
Hide file tree
Showing 38 changed files with 1,448 additions and 732 deletions.
114 changes: 114 additions & 0 deletions .github/ci-scripts/job_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# /// script
# requires-python = ">=3.12"
# dependencies = []
# ///

import argparse
import asyncio
import json
from dataclasses import dataclass
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional

from ray.job_submission import JobStatus, JobSubmissionClient


def parse_env_var_str(env_var_str: str) -> dict:
iter = map(
lambda s: s.strip().split("="),
filter(lambda s: s, env_var_str.split(",")),
)
return {k: v for k, v in iter}


async def print_logs(logs):
async for lines in logs:
print(lines, end="")


async def wait_on_job(logs, timeout_s):
await asyncio.wait_for(print_logs(logs), timeout=timeout_s)


@dataclass
class Result:
query: int
duration: timedelta
error_msg: Optional[str]


def submit_job(
working_dir: Path,
entrypoint_script: str,
entrypoint_args: str,
env_vars: str,
enable_ray_tracing: bool,
):
env_vars_dict = parse_env_var_str(env_vars)
if enable_ray_tracing:
env_vars_dict["DAFT_ENABLE_RAY_TRACING"] = "1"

client = JobSubmissionClient(address="http://localhost:8265")

if entrypoint_args.startswith("[") and entrypoint_args.endswith("]"):
# this is a json-encoded list of strings; parse accordingly
list_of_entrypoint_args: list[str] = json.loads(entrypoint_args)
else:
list_of_entrypoint_args: list[str] = [entrypoint_args]

results = []

for index, args in enumerate(list_of_entrypoint_args):
entrypoint = f"DAFT_RUNNER=ray python {entrypoint_script} {args}"
print(f"{entrypoint=}")
start = datetime.now()
job_id = client.submit_job(
entrypoint=entrypoint,
runtime_env={
"working_dir": working_dir,
"env_vars": env_vars_dict,
},
)

asyncio.run(wait_on_job(client.tail_job_logs(job_id), timeout_s=60 * 30))

status = client.get_job_status(job_id)
assert status.is_terminal(), "Job should have terminated"
end = datetime.now()
duration = end - start
error_msg = None
if status != JobStatus.SUCCEEDED:
job_info = client.get_job_info(job_id)
error_msg = job_info.message

result = Result(query=index, duration=duration, error_msg=error_msg)
results.append(result)

print(f"{results=}")


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--working-dir", type=Path, required=True)
parser.add_argument("--entrypoint-script", type=str, required=True)
parser.add_argument("--entrypoint-args", type=str, required=True)
parser.add_argument("--env-vars", type=str, required=True)
parser.add_argument("--enable-ray-tracing", action="store_true")

args = parser.parse_args()

if not (args.working_dir.exists() and args.working_dir.is_dir()):
raise ValueError("The working-dir must exist and be a directory")

entrypoint: Path = args.working_dir / args.entrypoint_script
if not (entrypoint.exists() and entrypoint.is_file()):
raise ValueError("The entrypoint script must exist and be a file")

submit_job(
working_dir=args.working_dir,
entrypoint_script=args.entrypoint_script,
entrypoint_args=args.entrypoint_args,
env_vars=args.env_vars,
enable_ray_tracing=args.enable_ray_tracing,
)
2 changes: 2 additions & 0 deletions .github/ci-scripts/templatize_ray_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,5 +110,7 @@ class Metadata(BaseModel, extra="allow"):
if metadata:
metadata = Metadata(**metadata)
content = content.replace(OTHER_INSTALL_PLACEHOLDER, " ".join(metadata.dependencies))
else:
content = content.replace(OTHER_INSTALL_PLACEHOLDER, "")

print(content)
36 changes: 14 additions & 22 deletions .github/workflows/run-cluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ on:
type: string
required: true
entrypoint_args:
description: Entry-point arguments
description: Entry-point arguments (either a simple string or a JSON list)
type: string
required: false
default: ""
Expand Down Expand Up @@ -79,24 +79,15 @@ jobs:
uv run \
--python 3.12 \
.github/ci-scripts/templatize_ray_config.py \
--cluster-name "ray-ci-run-${{ github.run_id }}_${{ github.run_attempt }}" \
--daft-wheel-url '${{ inputs.daft_wheel_url }}' \
--daft-version '${{ inputs.daft_version }}' \
--python-version '${{ inputs.python_version }}' \
--cluster-profile '${{ inputs.cluster_profile }}' \
--working-dir '${{ inputs.working_dir }}' \
--entrypoint-script '${{ inputs.entrypoint_script }}'
--cluster-name="ray-ci-run-${{ github.run_id }}_${{ github.run_attempt }}" \
--daft-wheel-url='${{ inputs.daft_wheel_url }}' \
--daft-version='${{ inputs.daft_version }}' \
--python-version='${{ inputs.python_version }}' \
--cluster-profile='${{ inputs.cluster_profile }}' \
--working-dir='${{ inputs.working_dir }}' \
--entrypoint-script='${{ inputs.entrypoint_script }}'
) >> .github/assets/ray.yaml
cat .github/assets/ray.yaml
- name: Setup ray env vars
run: |
source .venv/bin/activate
ray_env_var=$(python .github/ci-scripts/format_env_vars.py \
--env-vars '${{ inputs.env_vars }}' \
--enable-ray-tracing \
)
echo $ray_env_var
echo "ray_env_var=$ray_env_var" >> $GITHUB_ENV
- name: Download private ssh key
run: |
KEY=$(aws secretsmanager get-secret-value --secret-id ci-github-actions-ray-cluster-key-3 --query SecretString --output text)
Expand All @@ -117,11 +108,12 @@ jobs:
echo 'Invalid command submitted; command cannot be empty'
exit 1
fi
ray job submit \
--working-dir ${{ inputs.working_dir }} \
--address http://localhost:8265 \
--runtime-env-json "$ray_env_var" \
-- python ${{ inputs.entrypoint_script }} ${{ inputs.entrypoint_args }}
python .github/ci-scripts/job_runner.py \
--working-dir='${{ inputs.working_dir }}' \
--entrypoint-script='${{ inputs.entrypoint_script }}' \
--entrypoint-args='${{ inputs.entrypoint_args }}' \
--env-vars='${{ inputs.env_vars }}' \
--enable-ray-tracing
- name: Download log files from ray cluster
run: |
source .venv/bin/activate
Expand Down
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ common-scan-info = {path = "src/common/scan-info", default-features = false}
common-system-info = {path = "src/common/system-info", default-features = false}
common-tracing = {path = "src/common/tracing", default-features = false}
common-version = {path = "src/common/version", default-features = false}
daft-algebra = {path = "src/daft-algebra", default-features = false}
daft-catalog = {path = "src/daft-catalog", default-features = false}
daft-catalog-python-catalog = {path = "src/daft-catalog/python-catalog", optional = true}
daft-compression = {path = "src/daft-compression", default-features = false}
Expand Down Expand Up @@ -149,6 +150,7 @@ members = [
"src/common/scan-info",
"src/common/system-info",
"src/common/treenode",
"src/daft-algebra",
"src/daft-catalog",
"src/daft-core",
"src/daft-csv",
Expand Down Expand Up @@ -204,6 +206,7 @@ daft-logical-plan = {path = "src/daft-logical-plan"}
daft-micropartition = {path = "src/daft-micropartition"}
daft-scan = {path = "src/daft-scan"}
daft-schema = {path = "src/daft-schema"}
daft-sql = {path = "src/daft-sql"}
daft-table = {path = "src/daft-table"}
derivative = "2.2.0"
derive_builder = "0.20.2"
Expand Down
64 changes: 50 additions & 14 deletions benchmarking/tpcds/ray_entrypoint.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,54 @@
import argparse
from pathlib import Path

import helpers

import daft
from daft.sql.sql import SQLCatalog

TABLE_NAMES = [
"call_center",
"catalog_page",
"catalog_returns",
"catalog_sales",
"customer",
"customer_address",
"customer_demographics",
"date_dim",
"household_demographics",
"income_band",
"inventory",
"item",
"promotion",
"reason",
"ship_mode",
"store",
"store_returns",
"store_sales",
"time_dim",
"warehouse",
"web_page",
"web_returns",
"web_sales",
"web_site",
]


def register_catalog(scale_factor: int) -> SQLCatalog:
return SQLCatalog(
tables={
table: daft.read_parquet(
f"s3://eventual-dev-benchmarking-fixtures/uncompressed/tpcds-dbgen/{scale_factor}/{table}.parquet"
)
for table in TABLE_NAMES
}
)


def run(
parquet_folder: Path,
question: int,
dry_run: bool,
scale_factor: int,
):
catalog = helpers.generate_catalog(parquet_folder)
catalog = register_catalog(scale_factor)
query_file = Path(__file__).parent / "queries" / f"{question:02}.sql"
with open(query_file) as f:
query = f.read()
Expand All @@ -23,27 +60,26 @@ def run(

if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--tpcds-gen-folder",
required=True,
type=Path,
help="Path to the TPC-DS data generation folder",
)
parser.add_argument(
"--question",
required=True,
type=int,
help="The TPC-DS question index to run",
required=True,
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Whether or not to run the query in dry-run mode; if true, only the plan will be printed out",
)
parser.add_argument(
"--scale-factor",
type=int,
help="Which scale factor to run this data at",
required=False,
default=2,
)
args = parser.parse_args()

tpcds_gen_folder: Path = args.tpcds_gen_folder
assert tpcds_gen_folder.exists()
assert args.question in range(1, 100)

run(args.tpcds_gen_folder, args.question, args.dry_run)
run(question=args.question, dry_run=args.dry_run, scale_factor=args.scale_factor)
8 changes: 7 additions & 1 deletion daft/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,8 @@ def set_execution_config(
csv_target_filesize: int | None = None,
csv_inflation_factor: float | None = None,
shuffle_aggregation_default_partitions: int | None = None,
partial_aggregation_threshold: int | None = None,
high_cardinality_aggregation_threshold: float | None = None,
read_sql_partition_size_bytes: int | None = None,
enable_aqe: bool | None = None,
enable_native_executor: bool | None = None,
Expand Down Expand Up @@ -384,7 +386,9 @@ def set_execution_config(
parquet_inflation_factor: Inflation Factor of parquet files (In-Memory-Size / File-Size) ratio. Defaults to 3.0
csv_target_filesize: Target File Size when writing out CSV Files. Defaults to 512MB
csv_inflation_factor: Inflation Factor of CSV files (In-Memory-Size / File-Size) ratio. Defaults to 0.5
shuffle_aggregation_default_partitions: Maximum number of partitions to create when performing aggregations. Defaults to 200, unless the number of input partitions is less than 200.
shuffle_aggregation_default_partitions: Maximum number of partitions to create when performing aggregations on the Ray Runner. Defaults to 200, unless the number of input partitions is less than 200.
partial_aggregation_threshold: Threshold for performing partial aggregations on the Native Runner. Defaults to 10000 rows.
high_cardinality_aggregation_threshold: Threshold selectivity for performing high cardinality aggregations on the Native Runner. Defaults to 0.8.
read_sql_partition_size_bytes: Target size of partition when reading from SQL databases. Defaults to 512MB
enable_aqe: Enables Adaptive Query Execution, Defaults to False
enable_native_executor: Enables the native executor, Defaults to False
Expand Down Expand Up @@ -413,6 +417,8 @@ def set_execution_config(
csv_target_filesize=csv_target_filesize,
csv_inflation_factor=csv_inflation_factor,
shuffle_aggregation_default_partitions=shuffle_aggregation_default_partitions,
partial_aggregation_threshold=partial_aggregation_threshold,
high_cardinality_aggregation_threshold=high_cardinality_aggregation_threshold,
read_sql_partition_size_bytes=read_sql_partition_size_bytes,
enable_aqe=enable_aqe,
enable_native_executor=enable_native_executor,
Expand Down
Loading

0 comments on commit b9865b8

Please sign in to comment.