Skip to content

Commit

Permalink
tc
Browse files Browse the repository at this point in the history
  • Loading branch information
clee2000 committed Jan 22, 2025
1 parent 2b17a1f commit 3d18cc9
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 4 deletions.
2 changes: 1 addition & 1 deletion tools/torchci/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def query_clickhouse_saved(queryName: str, inputParams: Dict[str, Any]) -> Any:
with open(path / "query.sql") as f:
queryText = f.read()
with open(path / "params.json") as f:
paramsText = json.load(f)
paramsText = json.load(f).get("params", {})

queryParams = {name: inputParams[name] for name in paramsText}
return query_clickhouse(queryText, queryParams)
Expand Down
185 changes: 185 additions & 0 deletions tools/torchci/clickhouse_query_perf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
"""
Check query results and performance. Note that query performance is not stable
and can vary significantly between runs.
"""

import argparse
import json
import subprocess
import time
from datetime import datetime, timedelta, timezone
from functools import cache
from typing import Optional

from prettytable import PrettyTable
from torchci.clickhouse import get_clickhouse_client, query_clickhouse
from torchci.utils import REPO_ROOT
from tqdm import tqdm


def parse_args() -> argparse.Namespace:
    """Parse command line options for the query perf/results checker.

    Returns:
        The parsed namespace with fields: query, perf, results, times, compare.
    """
    # Fix: the description previously said "Queue alert for torchci", a
    # copy-paste leftover from another tool.
    parser = argparse.ArgumentParser(
        description="Check ClickHouse query results and performance"
    )
    parser.add_argument("--query", type=str, help="Query name", required=True)
    parser.add_argument(
        "--perf", action="store_true", help="Run performance comparison"
    )
    parser.add_argument(
        "--results", action="store_true", help="Run results comparison"
    )
    parser.add_argument(
        "--times",
        type=int,
        help="Number of times to run the query. Only relevant if --perf is used",
        default=10,
    )
    parser.add_argument(
        "--compare",
        type=str,
        help=(
            "Either a sha or a branch name to compare against. These should be "
            "available locally. Required for --results"
        ),
    )
    return parser.parse_args()


def get_query_id(query: str, params: dict) -> Optional[str]:
    """Execute the query once and return its ClickHouse query id.

    Returns None (after printing the error) if the query fails; callers
    treat a None as a run to be skipped.
    """
    try:
        result = get_clickhouse_client().query(query, params)
    except Exception as exc:
        print(f"Error: {exc}")
        return None
    return result.query_id


@cache
def get_base_query(query: str, sha: str) -> str:
    """Return the text of `query`'s query.sql as it exists at git rev `sha`.

    Cached, so repeated lookups for the same (query, sha) hit git only once.
    """
    repo_path = f"torchci/clickhouse_queries/{query}/query.sql"
    raw = subprocess.check_output(["git", "show", f"{sha}:{repo_path}"])
    return raw.decode("utf-8")


EXECUTION_METRICS = """
SELECT
round(avg(query_duration_ms)) AS realTimeMSAvg,
avg(memory_usage) as memoryBytesAvg
FROM
clusterAllReplicas(default, system.query_log)
where
has({query_ids: Array(String)}, query_id)
and type = 'QueryFinish'
"""


def get_avg_stats(query_ids: list) -> tuple:
    """Return (avg duration in ms, avg memory in bytes) for the given query ids."""
    rows = query_clickhouse(EXECUTION_METRICS, {"query_ids": query_ids})
    first_row = rows[0]
    return first_row["realTimeMSAvg"], first_row["memoryBytesAvg"]


def get_query_ids(query: str, params: dict, times: int) -> list:
    """Run `query` with `params` `times` times and collect the query ids.

    Fix: the return annotation previously said `tuple`, but the function
    builds and returns a list.

    Failed runs (where get_query_id returns None) are skipped, so the result
    may have fewer than `times` entries.
    """
    return [
        x for _ in tqdm(range(times)) if (x := get_query_id(query, params)) is not None
    ]


# NOTE: the name keeps its historical "comparision" spelling; renaming would
# break external callers.
def format_comparision_string(new: float, old: float) -> str:
    """Format `new` vs `old` with the absolute and rounded percent change."""
    delta = new - old
    pct_change = round(100 * delta / old)
    return f"{new} vs {old} ({delta}, {pct_change}%)"


@cache
def get_query(query: str) -> tuple:
    """Load (sql text, test param dicts) for `query` from the local checkout.

    Test values written as {"from_now": d} are rewritten in place into a
    "%Y-%m-%d %H:%M:%S" UTC timestamp d days from now.
    """
    query_dir = REPO_ROOT / "torchci" / "clickhouse_queries" / query
    with open(query_dir / "params.json") as f:
        tests = json.load(f).get("tests", [])
    with open(query_dir / "query.sql") as f:
        sql_text = f.read()

    for test in tests:
        for key, value in test.items():
            if isinstance(value, dict):
                # special syntax for time values
                when = datetime.now(timezone.utc) + timedelta(days=value["from_now"])
                test[key] = when.strftime("%Y-%m-%d %H:%M:%S")
    return sql_text, tests


def perf_compare(args: argparse.Namespace) -> None:
    """Run every test for args.query args.times times and print avg perf stats.

    If args.compare is set, the query text at that sha/branch is also run for
    each test and the printed table includes time/memory deltas against it.

    Fixes: the stale comment claimed a 10 second sleep while the code sleeps
    20; the first loop used `enumerate` but never used the index.
    """
    query, tests = get_query(args.query)

    print(
        f"Gathering perf stats for: {args.query}\nNum tests: {len(tests)}\nNum times: {args.times}"
    )

    query_ids = []
    for test in tests:
        new = get_query_ids(query, test, args.times)

        base = None
        if args.compare:
            base_query = get_base_query(args.query, args.compare)
            base = get_query_ids(base_query, test, args.times)
        query_ids.append((new, base))

    # Split up the query execution and the stats collection because the stats
    # table needs time to populate. Sleep for 20 seconds to give the table
    # time to populate.
    time.sleep(20)
    table = PrettyTable()
    if args.compare:
        table.field_names = [
            "Test",
            "Avg Time",
            "Base Time",
            "Time Change",
            "% Time Change",
            "Avg Mem",
            "Base Mem",
            "Mem Change",
            "% Mem Change",
        ]
    else:
        table.field_names = ["Test", "Avg Time", "Avg Mem"]
    for i, (new, base) in enumerate(query_ids):
        avg_time, avg_bytes = get_avg_stats(new)
        if base:
            old_avg_time, old_avg_bytes = get_avg_stats(base)
            table.add_row(
                [
                    i,
                    avg_time,
                    old_avg_time,
                    avg_time - old_avg_time,
                    round(100 * (avg_time - old_avg_time) / old_avg_time),
                    avg_bytes,
                    old_avg_bytes,
                    avg_bytes - old_avg_bytes,
                    round(100 * (avg_bytes - old_avg_bytes) / old_avg_bytes),
                ]
            )
        else:
            table.add_row([i, avg_time, avg_bytes])
    print(table)


def results_compare(args: argparse.Namespace) -> None:
    """Run every test for args.query against both the current query text and
    the version at args.compare, printing any tests whose results differ.

    Fix: previously a missing --compare silently returned (after already
    loading the query files); now it is checked first and reported, since
    --compare is documented as required for --results.
    """
    if not args.compare:
        print("--compare is required for results comparison")
        return
    query, tests = get_query(args.query)
    base_query = get_base_query(args.query, args.compare)
    print(
        f"Comparing results for query: {args.query}\nNum tests: {len(tests)}\nBase: {args.compare}"
    )
    for i, test in enumerate(tests):
        new_results = query_clickhouse(query, test)
        base_results = query_clickhouse(base_query, test)
        if new_results != base_results:
            print(f"Results for test {i} differ")
            print(f"Test: {json.dumps(test, indent=2)}")
            print(f"New: {new_results}")
            print(f"Base: {base_results}")
            print()


def main() -> None:
    """Entry point: parse args and dispatch to perf and/or results comparison."""
    args = parse_args()
    if args.perf:
        perf_compare(args)
    if args.results:
        results_compare(args)


if __name__ == "__main__":
    main()
24 changes: 22 additions & 2 deletions torchci/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,9 @@ and HUD sends the entire query text to ClickHouse in the same way Rockset did
for queries not defined using a query lambda.

Each query should have a folder in `clickhouse_queries/` with two files: one
containing the query and the other containing a json dictionary mapping
parameters to their types.
containing the query and the other containing a JSON object with two fields:
a dictionary `params` mapping parameter names to their types, and a list
`tests` of sample parameter values for the query.

To edit the query, only these files need to be changed. The change will be
reflected immediately in your local development and in the Vercel preview when
Expand All @@ -96,6 +97,25 @@ query back into the file.
To get access to ClickHouse Cloud's console, please see
[here](https://github.com/pytorch/test-infra/wiki/Querying-ClickHouse-database-for-fun-and-profit#prerequisites).

### `params.json`
An example `params.json` file with params and tests:
```
{
"params": {
"A": "DateTime64(3)"
},
"tests": [
{"A": "2024-01-01 00:00:00.000"},
{"A": "2024-01-07 00:00:00.000"},
{"A": "2025-01-01 00:00:00.000"},
{"A": {"from_now": 0}}
]
}
```
A test can set a parameter to a dictionary containing the field `from_now` to
get a dynamic timestamp, where the field's value is the offset from now in
days. For example, `from_now: 0` is now and `from_now: -7` is 7 days in the
past.

## Alerts

Code is in `test-infra/tools/torchci/check_alerts.py`. It queries HUD, filters out pending jobs, and then checks to see if there are 2 consecutive
Expand Down
5 changes: 4 additions & 1 deletion torchci/lib/clickhouse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,10 @@ export async function queryClickhouseSaved(
`${process.cwd()}/clickhouse_queries/${queryName}/query.sql`,
"utf8"
);
const paramsText = require(`clickhouse_queries/${queryName}/params.json`);
let paramsText = require(`clickhouse_queries/${queryName}/params.json`).params;
if (paramsText === undefined) {
paramsText = {};
}

const queryParams = new Map(
Object.entries(paramsText).map(([key, _]) => [key, inputParams[key]])
Expand Down

0 comments on commit 3d18cc9

Please sign in to comment.