diff --git a/ydb/library/benchmarks/runner/run_tests/run_tests.py b/ydb/library/benchmarks/runner/run_tests/run_tests.py index 9e6a8a8837e6..704d27369577 100644 --- a/ydb/library/benchmarks/runner/run_tests/run_tests.py +++ b/ydb/library/benchmarks/runner/run_tests/run_tests.py @@ -23,6 +23,7 @@ def parse_args(): subparser.add_argument('--datasize', type=int, default=1) subparser.add_argument('--variant', type=variant, default='h') subparser.add_argument('--tasks', type=int, default=1) + subparser.add_argument('--perf', action="store_true", default=False) subparser.add_argument('-o', '--output', default="./results") subparser.add_argument('--clean-old', action="store_true", default=False) @@ -49,17 +50,25 @@ def parse_args(): parser.add_argument('--ydb-root', type=lambda path: pathlib.Path(path).resolve(), default="../../../../") - args = parser.parse_args(argv, namespace=args) + args, argv = parser.parse_known_args(argv, namespace=args) - args.dqrun = args.ydb_root / "ydb" / "library" / "yql" / "tools" / "dqrun" / "dqrun" args.gen_queries = args.ydb_root / "ydb" / "library" / "benchmarks" / "gen_queries" / "gen_queries" args.downloaders_dir = args.ydb_root / "ydb" / "library" / "benchmarks" / "runner" - args.fs_cfg = args.ydb_root / "ydb" / "library" / "yql" / "tools" / "dqrun" / "examples" / "fs.conf" args.flame_graph = args.ydb_root / "contrib" / "tools" / "flame-graph" args.result_compare = args.ydb_root / "ydb" / "library" / "benchmarks" / "runner" / "result_compare" / "result_compare" - args.gateways_cfg = args.ydb_root / "ydb" / "library" / "benchmarks" / "runner" / "runner" / "test-gateways.conf" args.runner_path = args.ydb_root / "ydb" / "library" / "benchmarks" / "runner" / "runner" / "runner" + def_dqrun = args.ydb_root / "ydb" / "library" / "yql" / "tools" / "dqrun" / "dqrun" + def_fs_cfg = args.ydb_root / "ydb" / "library" / "yql" / "tools" / "dqrun" / "examples" / "fs.conf" + def_gateways_cfg = args.ydb_root / "ydb" / "library" / "benchmarks" / 
"runner" / "runner" / "test-gateways.conf" + + override_parser = argparse.ArgumentParser() + override_parser.add_argument('--dqrun', type=pathlib.Path, default=def_dqrun) + override_parser.add_argument('--fs-cfg', type=pathlib.Path, default=def_fs_cfg) + override_parser.add_argument('--gateways-cfg', type=pathlib.Path, default=def_gateways_cfg) + + args = override_parser.parse_args(argv, namespace=args) + udfs_prefix = args.ydb_root / "ydb" / "library" / "yql" / "udfs" / "common" args.udfs_dir = [udfs_prefix / name for name in ["set", "url_base", "datetime2", "re2", "math", "unicode_base"]] @@ -102,7 +111,7 @@ def __init__(self, args, enable_spilling): "dq.EnableSpillingNodes=All", ] if self.enable_spilling else []) self.tpc_dir = pathlib.Path(f"{self.args.downloaders_dir}/tpc/{self.args.variant}/{self.args.datasize}") - if self.args.clean_old or not self.tpc_dir.exists(): + if not self.tpc_dir.exists(): self.prepare_tpc_dir() if not pathlib.Path("./tpc").exists(): os.symlink(f"{self.args.downloaders_dir}/tpc", f"{pathlib.Path("./tpc")}", target_is_directory=True) @@ -112,7 +121,7 @@ def __init__(self, args, enable_spilling): def run(self): cmd = ["/usr/bin/time", f"{str(self.args.runner_path)}"] - # cmd += ["--perf"] + cmd += ["--perf"] if self.args.perf else [] for it in self.args.query_filter: cmd += ["--include-q", it] cmd += ["--query-dir", f"{str(self.queries_dir)}/{self.args.variant}"] diff --git a/ydb/library/benchmarks/runner/tpc_tests.py b/ydb/library/benchmarks/runner/tpc_tests.py index 8943479d96eb..2d4711e3cefe 100644 --- a/ydb/library/benchmarks/runner/tpc_tests.py +++ b/ydb/library/benchmarks/runner/tpc_tests.py @@ -58,16 +58,17 @@ def wrapped_run(self, variant, datasize, tasks, query_filter): yatest.common.execute(cmd, stdout=sys.stdout, stderr=sys.stderr) -def upload(result_path, s3_folder): +def upload(result_path, s3_folder, try_num): uploader = pathlib.Path(yatest.common.source_path("ydb/library/benchmarks/runner/upload_results.py")).resolve() 
def upload_file_to_s3(s3_folder: pathlib.Path, result_path: pathlib.Path, file: pathlib.Path) -> pathlib.Path:
    """Copy ``file`` into the local folder that will be synced with S3.

    The file keeps its location relative to ``result_path``: it is copied to
    ``s3_folder / file.relative_to(result_path)`` and any missing parent
    directories of the destination are created first.

    Args:
        s3_folder: Root of the local folder that will be synced with S3.
        result_path: Root of the results tree; ``file`` must be located under it.
        file: Path of the file to copy.

    Returns:
        The path of ``file`` relative to ``result_path``, which is also the
        path of the copy relative to ``s3_folder``.

    Raises:
        ValueError: If ``file`` is not located under ``result_path``.
    """
    relative = file.relative_to(result_path)
    target = (s3_folder / relative).resolve()
    target.parent.mkdir(parents=True, exist_ok=True)
    # copy2 accepts path-like objects, so no str() round-trip is needed, and
    # its return value (the destination path) is already known here.
    shutil.copy2(file.resolve(), target)
    return relative
results.read_bytes, "UserTime" : results.user_time, - "SystemTime" : results.system_time + "SystemTime" : results.system_time, + "StdoutFileLink" : results.stdout_file_path, + "StderrFileLink" : results.stderr_file_path, + "PerfFileLink" : results.perf_file_path } sql = 'UPSERT INTO `perfomance/olap/dq_spilling_nightly_runs`\n\t({columns})\nVALUES\n\t({values})'.format( columns=", ".join(map(str, mapping.keys())), @@ -164,6 +189,7 @@ def main(): parser.add_argument("--result-path", type=pathlib.Path) parser.add_argument("--s3-folder", type=pathlib.Path) + parser.add_argument("--try-num", default=None) args = parser.parse_args() @@ -171,7 +197,7 @@ def main(): raise AttributeError("Env variable CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS is missing, skipping uploading") os.environ["YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"] = os.environ["CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"] - upload_results(args.result_path, args.s3_folder, upload_time) + upload_results(args.result_path, args.s3_folder, upload_time, args.try_num) if __name__ == "__main__":