Skip to content

Commit

Permalink
Improve in-test tpc benchs (#9124)
Browse files Browse the repository at this point in the history
  • Loading branch information
vladl2802 authored Sep 12, 2024
1 parent 5209b75 commit 1f633c9
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 24 deletions.
21 changes: 15 additions & 6 deletions ydb/library/benchmarks/runner/run_tests/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def parse_args():
subparser.add_argument('--datasize', type=int, default=1)
subparser.add_argument('--variant', type=variant, default='h')
subparser.add_argument('--tasks', type=int, default=1)
subparser.add_argument('--perf', action="store_true", default=False)

subparser.add_argument('-o', '--output', default="./results")
subparser.add_argument('--clean-old', action="store_true", default=False)
Expand All @@ -49,17 +50,25 @@ def parse_args():

parser.add_argument('--ydb-root', type=lambda path: pathlib.Path(path).resolve(), default="../../../../")

args = parser.parse_args(argv, namespace=args)
args, argv = parser.parse_known_args(argv, namespace=args)

args.dqrun = args.ydb_root / "ydb" / "library" / "yql" / "tools" / "dqrun" / "dqrun"
args.gen_queries = args.ydb_root / "ydb" / "library" / "benchmarks" / "gen_queries" / "gen_queries"
args.downloaders_dir = args.ydb_root / "ydb" / "library" / "benchmarks" / "runner"
args.fs_cfg = args.ydb_root / "ydb" / "library" / "yql" / "tools" / "dqrun" / "examples" / "fs.conf"
args.flame_graph = args.ydb_root / "contrib" / "tools" / "flame-graph"
args.result_compare = args.ydb_root / "ydb" / "library" / "benchmarks" / "runner" / "result_compare" / "result_compare"
args.gateways_cfg = args.ydb_root / "ydb" / "library" / "benchmarks" / "runner" / "runner" / "test-gateways.conf"
args.runner_path = args.ydb_root / "ydb" / "library" / "benchmarks" / "runner" / "runner" / "runner"

def_dqrun = args.ydb_root / "ydb" / "library" / "yql" / "tools" / "dqrun" / "dqrun"
def_fs_cfg = args.ydb_root / "ydb" / "library" / "yql" / "tools" / "dqrun" / "examples" / "fs.conf"
def_gateways_cfg = args.ydb_root / "ydb" / "library" / "benchmarks" / "runner" / "runner" / "test-gateways.conf"

override_parser = argparse.ArgumentParser()
override_parser.add_argument('--dqrun', type=pathlib.Path, default=def_dqrun)
override_parser.add_argument('--fs-cfg', type=pathlib.Path, default=def_fs_cfg)
override_parser.add_argument('--gateways-cfg', type=pathlib.Path, default=def_gateways_cfg)

args = override_parser.parse_args(argv, namespace=args)

udfs_prefix = args.ydb_root / "ydb" / "library" / "yql" / "udfs" / "common"
args.udfs_dir = [udfs_prefix / name for name in ["set", "url_base", "datetime2", "re2", "math", "unicode_base"]]

Expand Down Expand Up @@ -102,7 +111,7 @@ def __init__(self, args, enable_spilling):
"dq.EnableSpillingNodes=All",
] if self.enable_spilling else [])
self.tpc_dir = pathlib.Path(f"{self.args.downloaders_dir}/tpc/{self.args.variant}/{self.args.datasize}")
if self.args.clean_old or not self.tpc_dir.exists():
if not self.tpc_dir.exists():
self.prepare_tpc_dir()
if not pathlib.Path("./tpc").exists():
os.symlink(f"{self.args.downloaders_dir}/tpc", f"{pathlib.Path("./tpc")}", target_is_directory=True)
Expand All @@ -112,7 +121,7 @@ def __init__(self, args, enable_spilling):

def run(self):
cmd = ["/usr/bin/time", f"{str(self.args.runner_path)}"]
# cmd += ["--perf"]
cmd += ["--perf"] if self.args.perf else []
for it in self.args.query_filter:
cmd += ["--include-q", it]
cmd += ["--query-dir", f"{str(self.queries_dir)}/{self.args.variant}"]
Expand Down
13 changes: 9 additions & 4 deletions ydb/library/benchmarks/runner/tpc_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,23 +58,28 @@ def wrapped_run(self, variant, datasize, tasks, query_filter):
yatest.common.execute(cmd, stdout=sys.stdout, stderr=sys.stderr)


def upload(result_path, s3_folder):
def upload(result_path, s3_folder, try_num):
    """Invoke ``upload_results.py`` with ``python3`` for one set of results.

    Args:
        result_path: Passed to the script as ``--result-path``.
        s3_folder: Passed to the script as ``--s3-folder``.
        try_num: Passed as ``--try-num`` when truthy; a falsy value
            (``None``, ``0``, empty string) leaves the flag out.

    The child's stdout and stderr are forwarded to this process's streams.
    """
    script = pathlib.Path(yatest.common.source_path("ydb/library/benchmarks/runner/upload_results.py")).resolve()
    cmd = [
        "python3", str(script),
        "--result-path", str(result_path),
        "--s3-folder", str(s3_folder),
    ]
    if try_num:
        # The flag is optional for the script, so only add it when a try number is known.
        cmd.extend(["--try-num", str(try_num)])
    yatest.common.execute(cmd, stdout=sys.stdout, stderr=sys.stderr)


def test_tpc():
is_ci = os.environ.get("PUBLIC_DIR") is not None
is_ci = os.environ.get("CURRENT_PUBLIC_DIR") is not None

runner = Runner()
runner.wrapped_run("h", 1, 1, None)
result_path = runner.results_path.resolve()
print("Results path: ", result_path, file=sys.stderr)

if is_ci:
s3_folder = pathlib.Path(os.environ["PUBLIC_DIR"]).resolve()
s3_folder = pathlib.Path(os.environ["CURRENT_PUBLIC_DIR"]).resolve()
try:
try_num = int(s3_folder.name.split("try_")[-1])
except Exception:
try_num = None

upload(result_path, s3_folder)
upload(result_path, s3_folder, try_num)
54 changes: 40 additions & 14 deletions ydb/library/benchmarks/runner/upload_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ def __init__(self):
self.user_time_ms = None
self.system_time = None
self.rss = None
self.output_hash = None
self.result_hash = None
self.stdout_file_path = None
self.stderr_file_path = None
self.perf_file_path = None

def from_json(self, json):
Expand Down Expand Up @@ -64,6 +66,8 @@ def pretty_print(value):
return f"Unwrap(DateTime::FromSeconds({int(delt.total_seconds())}))"
if type(value) == datetime.timedelta:
return f"DateTime::IntervalFromMicroseconds({int(value / datetime.timedelta(microseconds=1))})"
if isinstance(value, pathlib.Path):
return f'\"{value}\"'
if type(value) == str:
return f'\"{value}\"'
if type(value) in [int, float]:
Expand All @@ -74,7 +78,21 @@ def pretty_print(value):
assert False, f"unrecognized type: {type(value)}"


def upload_results(result_path, s3_folder, test_start):
def upload_file_to_s3(s3_folder, result_path, file):
    """Copy ``file`` into ``s3_folder`` and return its path relative to ``result_path``.

    The file lands at the same relative location under ``s3_folder`` that it
    occupies under ``result_path``; missing parent directories are created.
    Nothing is uploaded here: the function only copies into a local folder
    (per the original comment, one that will later be synced with S3).

    Args:
        s3_folder: Destination root directory (``pathlib.Path``).
        result_path: Root that ``file`` lives under (``pathlib.Path``).
        file: Path of the file to copy (``pathlib.Path``).

    Returns:
        The path of ``file`` relative to ``result_path``.

    Raises:
        ValueError: From ``Path.relative_to`` when ``file`` is not under
            ``result_path``.
    """
    relative = file.relative_to(result_path)
    target = (s3_folder / relative).resolve()
    # The mirrored sub-directory may not exist yet in the destination folder.
    target.parent.mkdir(parents=True, exist_ok=True)
    source = file.resolve()
    shutil.copy2(str(source), str(target))
    return relative


def upload_results(result_path, s3_folder, test_start, try_num):
def add_try_num_to_path(path):
if try_num:
path = f"try_{try_num}" / path
return path

results_map = {}
for entry in result_path.glob("*/*"):
if not entry.is_dir():
Expand All @@ -98,17 +116,22 @@ def upload_results(result_path, s3_folder, test_start):
if query_num not in this_result:
this_result[query_num] = RunResults()

# q<num>.svg
if file.suffix == ".svg":
dst = file.relative_to(result_path)
this_result[query_num].perf_file_path = dst
# copying files to folder that will be synced with s3
dst = (s3_folder / dst).resolve()
dst.parent.mkdir(parents=True, exist_ok=True)
_ = shutil.copy2(str(file.resolve()), str(dst))
this_result[query_num].perf_file_path = add_try_num_to_path(upload_file_to_s3(s3_folder, result_path, file))

# q<num>-result.yson
if file.stem == f"q{query_num}-result":
with open(file, "r") as result:
this_result[query_num].result_hash = str(hash(result.read().strip()))

# q<num>-stdout.txt
if file.stem == f"q{query_num}-stdout":
with open(file, "r") as stdout:
this_result[query_num].output_hash = str(hash(stdout.read().strip()))
this_result[query_num].stdout_file_path = add_try_num_to_path(upload_file_to_s3(s3_folder, result_path, file))

# q<num>-stderr.txt
if file.stem == f"q{query_num}-stderr":
this_result[query_num].stderr_file_path = add_try_num_to_path(upload_file_to_s3(s3_folder, result_path, file))

summary_file = entry / "summary.json"

Expand Down Expand Up @@ -144,12 +167,14 @@ def upload_results(result_path, s3_folder, test_start):
"WasSpillingInJoin" : None,
"WasSpillingInChannels" : None,
"MaxTasksPerStage" : params.tasks,
"PerfFileLink" : results.perf_file_path,
"ExitCode" : results.exitcode,
"ResultHash" : results.output_hash,
"ResultHash" : results.result_hash,
"SpilledBytes" : results.read_bytes,
"UserTime" : results.user_time,
"SystemTime" : results.system_time
"SystemTime" : results.system_time,
"StdoutFileLink" : results.stdout_file_path,
"StderrFileLink" : results.stderr_file_path,
"PerfFileLink" : results.perf_file_path
}
sql = 'UPSERT INTO `perfomance/olap/dq_spilling_nightly_runs`\n\t({columns})\nVALUES\n\t({values})'.format(
columns=", ".join(map(str, mapping.keys())),
Expand All @@ -164,14 +189,15 @@ def main():

parser.add_argument("--result-path", type=pathlib.Path)
parser.add_argument("--s3-folder", type=pathlib.Path)
parser.add_argument("--try-num", default=None)

args = parser.parse_args()

if "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS" not in os.environ:
raise AttributeError("Env variable CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS is missing, skipping uploading")
os.environ["YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"] = os.environ["CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"]

upload_results(args.result_path, args.s3_folder, upload_time)
upload_results(args.result_path, args.s3_folder, upload_time, args.try_num)


if __name__ == "__main__":
Expand Down

0 comments on commit 1f633c9

Please sign in to comment.