Skip to content

Commit

Permalink
Diff-checks (#9926)
Browse files Browse the repository at this point in the history
  • Loading branch information
iddqdex authored Oct 2, 2024
1 parent a12e17b commit 1a36438
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 140 deletions.
255 changes: 142 additions & 113 deletions ydb/tests/olap/lib/ydb_cli.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from __future__ import annotations
from typing import Optional
from typing import Any, Optional
import yatest.common
import json
import os
Expand Down Expand Up @@ -36,54 +36,79 @@ def __init__(self, plan: dict | None = None, table: str | None = None, ast: str
self.svg = svg

class WorkloadRunResult:
def __init__(
self, stats: dict[str, dict[str, any]] = {}, query_out: Optional[str] = None, stdout: Optional[str] = None, stderr: Optional[str] = None,
error_message: Optional[str] = None, plans: Optional[list[YdbCliHelper.QueryPlan]] = None,
errors_by_iter: Optional[dict[int, str]] = None, explain_plan: Optional[YdbCliHelper.QueryPlan] = None, traceback: Optional[TracebackType] = None
) -> None:
self.stats = stats
self.query_out = query_out if str != '' else None
self.stdout = stdout if stdout != '' else None
self.stderr = stderr if stderr != '' else None
self.success = error_message is None
self.error_message = '' if self.success else error_message
self.plans = plans
self.explain_plan = explain_plan
self.errors_by_iter = errors_by_iter
self.traceback = traceback
def __init__(self):
self.stats: dict[str, dict[str, Any]] = {}
self.query_out: Optional[str] = None
self.stdout: Optional[str] = None
self.stderr: Optional[str] = None
self.error_message: str = ''
self.plans: Optional[list[YdbCliHelper.QueryPlan]] = None
self.explain_plan: Optional[YdbCliHelper.QueryPlan] = None
self.errors_by_iter: dict[int, str] = {}
self.traceback: Optional[TracebackType] = None

@property
def success(self) -> bool:
return len(self.error_message) == 0

class WorkloadProcessor:
def __init__(self,
workload_type: WorkloadType,
db_path: str,
query_num: int,
iterations: int,
timeout: float,
check_canonical: bool):
def _get_output_path(ext: str) -> str:
return yatest.common.work_path(f'q{query_num}.{ext}')

self.result = YdbCliHelper.WorkloadRunResult()
self.workload_type = workload_type
self.db_path = db_path
self.query_num = query_num
self.iterations = iterations
self.timeout = timeout
self.check_canonical = check_canonical
self._nodes_info: dict[str, dict[str, int]] = {}
self._plan_path = _get_output_path('plan')
self._query_output_path = _get_output_path('out')
self._json_path = _get_output_path('json')

def _add_error(self, msg: Optional[str]):
if msg is not None and len(msg) > 0:
if len(self.result.error_message) > 0:
self.result.error_message += f'\n\n{msg}'
else:
self.result.error_message = msg

@staticmethod
def workload_run(type: WorkloadType, path: str, query_num: int, iterations: int = 5,
timeout: float = 100.) -> YdbCliHelper.WorkloadRunResult:
def _try_extract_error_message(stderr: str) -> str:
result = {}
begin_str = f'{query_num}:'
def _process_returncode(self, returncode, stderr: str) -> None:
begin_str = f'{self.query_num}:'
end_str = 'Query text:'
iter_str = 'iteration '
begin_pos = stderr.find(begin_str)
if begin_pos < 0:
return result
while True:
begin_pos = stderr.find(iter_str, begin_pos)
if begin_pos < 0:
return result
begin_pos += len(iter_str)
end_pos = stderr.find('\n', begin_pos)
if end_pos < 0:
iter = int(stderr[begin_pos:])
begin_pos = len(stderr) - 1
else:
iter = int(stderr[begin_pos:end_pos])
begin_pos = end_pos + 1
end_pos = stderr.find(end_str, begin_pos)
if end_pos < 0:
result[iter] = stderr[begin_pos:].strip()
else:
result[iter] = stderr[begin_pos:end_pos].strip()

def _load_plans(plan_path: str, name: str) -> YdbCliHelper.QueryPlan:
if begin_pos >= 0:
while True:
begin_pos = stderr.find(iter_str, begin_pos)
if begin_pos < 0:
break
begin_pos += len(iter_str)
end_pos = stderr.find('\n', begin_pos)
if end_pos < 0:
iter = int(stderr[begin_pos:])
begin_pos = len(stderr) - 1
else:
iter = int(stderr[begin_pos:end_pos])
begin_pos = end_pos + 1
end_pos = stderr.find(end_str, begin_pos)
msg = (stderr[begin_pos:] if end_pos < 0 else stderr[begin_pos:end_pos]).strip()
self.result.errors_by_iter[iter] = msg
self._add_error(f'Iteration {iter}: {msg}')
if returncode != 0 and len(self.result.errors_by_iter) == 0:
self._add_error(f'Invalid return code: {returncode} instead 0.')

def _load_plan(self, name: str) -> YdbCliHelper.QueryPlan:
result = YdbCliHelper.QueryPlan()
pp = f'{plan_path}.{query_num}.{name}'
pp = f'{self._plan_path}.{self.query_num}.{name}'
if (os.path.exists(f'{pp}.json')):
with open(f'{pp}.json') as f:
result.plan = json.load(f)
Expand All @@ -98,6 +123,29 @@ def _load_plans(plan_path: str, name: str) -> YdbCliHelper.QueryPlan:
result.svg = f.read()
return result

def _load_plans(self) -> None:
self.result.plans = [self._load_plan(str(i)) for i in range(self.iterations)]
self.result.explain_plan = self._load_plan('explain')

def _load_stats(self):
if not os.path.exists(self._json_path):
return
with open(self._json_path, 'r') as r:
json_data = r.read()
for signal in json.loads(json_data):
q = signal['labels']['query']
if q not in self.result.stats:
self.result.stats[q] = {}
self.result.stats[q][signal['sensor']] = signal['value']
if self.result.stats.get(f'Query{self.query_num:02d}', {}).get("DiffsCount", 0) > 0:
self._add_error('There is diff in query results')

def _load_query_out(self) -> None:
if (os.path.exists(self._query_output_path)):
with open(self._query_output_path, 'r') as r:
self.result.query_out = r.read()

@staticmethod
def _get_nodes_info() -> dict[str, dict[str, int]]:
nodes, _ = YdbCluster.get_cluster_nodes()
return {
Expand All @@ -107,86 +155,67 @@ def _get_nodes_info() -> dict[str, dict[str, int]]:
for n in nodes
}

errors_by_iter = {}
try:
wait_error = YdbCluster.wait_ydb_alive(300, path)
if wait_error is not None:
return YdbCliHelper.WorkloadRunResult(error_message=f'Ydb cluster is dead: {wait_error}')

nodes_info = _get_nodes_info()
def _check_nodes(self):
node_errors = []
for node, info in self._get_nodes_info().items():
if node in self._nodes_info:
if info['start_time'] > self._nodes_info[node]['start_time']:
node_errors.append(f'Node {node} was restarted')
self._nodes_info[node]['processed'] = True
for node, info in self._nodes_info.items():
if not info.get('processed', False):
node_errors.append(f'Node {node} is down')
self._add_error('\n'.join(node_errors))

json_path = yatest.common.work_path(f'q{query_num}.json')
qout_path = yatest.common.work_path(f'q{query_num}.out')
plan_path = yatest.common.work_path(f'q{query_num}.plan')
def _get_cmd(self) -> list[str]:
cmd = YdbCliHelper.get_cli_command() + [
'-e', YdbCluster.ydb_endpoint,
'-d', f'/{YdbCluster.ydb_database}',
'workload', str(type), '--path', path, 'run',
'--json', json_path,
'--output', qout_path,
'workload', str(self.workload_type), '--path', self.db_path, 'run',
'--json', self._json_path,
'--output', self._query_output_path,
'--executer', 'generic',
'--include', str(query_num),
'--iterations', str(iterations),
'--plan', plan_path,
'--include', str(self.query_num),
'--iterations', str(self.iterations),
'--plan', self._plan_path,
'--verbose'
]
query_preffix = get_external_param('query-prefix', '')
if query_preffix:
cmd += ['--query-settings', query_preffix]
err = None
if self.check_canonical:
cmd.append('--check-cannonical')
return cmd

def _exec_cli(self) -> None:
try:
exec: yatest.common.process._Execution = yatest.common.process.execute(cmd, wait=False, check_exit_code=False)
exec.wait(check_exit_code=False, timeout=timeout)
if exec.returncode != 0:
errors_by_iter = _try_extract_error_message(exec.stderr.decode('utf-8'))
err = '\n\n'.join([f'Iteration {i}: {e}' for i, e in errors_by_iter.items()])
if not err:
err = f'Invalid return code: {exec.returncode} instesd 0.'
process = yatest.common.process.execute(self._get_cmd(), wait=False, check_exit_code=False)
process.wait(check_exit_code=False, timeout=self.timeout)
self._process_returncode(process.returncode, process.stderr.decode('utf-8', 'replace'))
except (yatest.common.process.TimeoutError, yatest.common.process.ExecutionTimeoutError):
err = f'Timeout {timeout}s expeared.'
self._process_returncode(0, process.stderr.decode('utf-8', 'replace'))
self._add_error(f'Timeout {self.timeout}s expeared.')
self.result.stdout = process.stdout.decode('utf-8', 'replace')
self.result.stderr = process.stderr.decode('utf-8', 'replace')

node_errors = []
for node, info in _get_nodes_info().items():
if node in nodes_info:
if info['start_time'] > nodes_info[node]['start_time']:
node_errors.append(f'Node {node} was restarted')
nodes_info[node]['processed'] = True
for node, info in nodes_info.items():
if not info.get('processed', False):
node_errors.append(f'Node {node} is down')
if len(node_errors) > 0:
if err is None:
err = ''
def process(self) -> YdbCliHelper.WorkloadRunResult:
try:
wait_error = YdbCluster.wait_ydb_alive(300, self.db_path)
if wait_error is not None:
self.result.error_message = wait_error
else:
err += '\n\n'
err += '\n'.join(node_errors)

stats = {}
if (os.path.exists(json_path)):
with open(json_path, 'r') as r:
json_data = r.read()
if json_data:
for signal in json.loads(json_data):
q = signal['labels']['query']
if q not in stats:
stats[q] = {}
stats[q][signal['sensor']] = signal['value']

if (os.path.exists(qout_path)):
with open(qout_path, 'r') as r:
qout = r.read()
plans = [_load_plans(plan_path, str(i)) for i in range(iterations)]
explain_plan = _load_plans(plan_path, 'explain')

return YdbCliHelper.WorkloadRunResult(
stats=stats,
query_out=qout,
plans=plans,
explain_plan=explain_plan,
stdout=exec.stdout.decode('utf-8', 'ignore'),
stderr=exec.stderr.decode('utf-8', 'ignore'),
error_message=err,
errors_by_iter=errors_by_iter
)
except BaseException as e:
return YdbCliHelper.WorkloadRunResult(error_message=str(e), traceback=e.__traceback__)
self._nodes_info = self._get_nodes_info()
self._exec_cli()
self._check_nodes()
self._load_stats()
self._load_query_out()
self._load_plans()
except BaseException as e:
self._add_error(str(e))
self.result.traceback = e.__traceback__
return self.result

@staticmethod
def workload_run(workload_type: WorkloadType, path: str, query_num: int, iterations: int = 5,
timeout: float = 100., check_canonical: bool = False) -> YdbCliHelper.WorkloadRunResult:
return YdbCliHelper.WorkloadProcessor(workload_type, path, query_num, iterations, timeout, check_canonical).process()
64 changes: 39 additions & 25 deletions ydb/tests/olap/load/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class LoadSuiteBase:
workload_type: WorkloadType = None
timeout: float = 1800.
refference: str = ''
check_canonical = False

@property
def suite(self) -> str:
Expand All @@ -22,7 +23,11 @@ def suite(self) -> str:
return result[4:]
return result

def run_workload_test(self, path: str, query_num: int) -> None:
@classmethod
def _test_name(cls, query_num: int) -> str:
return f'Query{query_num:02d}'

def process_query_result(self, result: YdbCliHelper.WorkloadRunResult, query_num: int, iterations: int, upload: bool):
def _get_duraton(stats, field):
if stats is None:
return None
Expand All @@ -39,17 +44,7 @@ def _attach_plans(plan: YdbCliHelper.QueryPlan) -> None:
if plan.svg is not None:
allure.attach(plan.svg, 'Plan svg', attachment_type=allure.attachment_type.SVG)

test = f'Query{query_num:02d}'
allure_listener = next(filter(lambda x: isinstance(x, AllureListener), plugin_manager.get_plugin_manager().get_plugins()))
allure_test_result = allure_listener.allure_logger.get_test(None)
query_num_param = next(filter(lambda x: x.name == 'query_num', allure_test_result.parameters), None)
if query_num_param:
query_num_param.mode = allure.parameter_mode.HIDDEN.value
start_time = time()
result = YdbCliHelper.workload_run(
path=path, query_num=query_num, iterations=self.iterations, type=self.workload_type, timeout=self.timeout
)
allure_test_description(self.suite, test, refference_set=self.refference, start_time=start_time, end_time=time())
test = self._test_name(query_num)
stats = result.stats.get(test)
if stats is not None:
allure.attach(json.dumps(stats, indent=2), 'Stats', attachment_type=allure.attachment_type.JSON)
Expand All @@ -63,7 +58,7 @@ def _attach_plans(plan: YdbCliHelper.QueryPlan) -> None:
_attach_plans(result.explain_plan)

if result.plans is not None:
for i in range(self.iterations):
for i in range(iterations):
try:
with allure.step(f'Iteration {i}'):
_attach_plans(result.plans[i])
Expand Down Expand Up @@ -99,20 +94,39 @@ def _attach_plans(plan: YdbCliHelper.QueryPlan) -> None:
elif stats.get('FailsCount', 0) != 0:
success = False
error_message = 'There are fail attemps'
ResultsProcessor.upload_results(
kind='Load',
suite=self.suite,
test=test,
timestamp=start_time,
is_successful=success,
min_duration=_get_duraton(stats, 'Min'),
max_duration=_get_duraton(stats, 'Max'),
mean_duration=_get_duraton(stats, 'Mean'),
median_duration=_get_duraton(stats, 'Median'),
statistics=stats,
)
if upload:
ResultsProcessor.upload_results(
kind='Load',
suite=self.suite,
test=test,
timestamp=time(),
is_successful=success,
min_duration=_get_duraton(stats, 'Min'),
max_duration=_get_duraton(stats, 'Max'),
mean_duration=_get_duraton(stats, 'Mean'),
median_duration=_get_duraton(stats, 'Median'),
statistics=stats,
)
if not success:
exc = pytest.fail.Exception(error_message)
if result.traceback is not None:
exc = exc.with_traceback(result.traceback)
raise exc

def run_workload_test(self, path: str, query_num: int) -> None:
allure_listener = next(filter(lambda x: isinstance(x, AllureListener), plugin_manager.get_plugin_manager().get_plugins()))
allure_test_result = allure_listener.allure_logger.get_test(None)
query_num_param = next(filter(lambda x: x.name == 'query_num', allure_test_result.parameters), None)
if query_num_param:
query_num_param.mode = allure.parameter_mode.HIDDEN.value
start_time = time()
result = YdbCliHelper.workload_run(
path=path,
query_num=query_num,
iterations=self.iterations,
workload_type=self.workload_type,
timeout=self.timeout,
check_canonical=self.check_canonical
)
allure_test_description(self.suite, self._test_name(query_num), refference_set=self.refference, start_time=start_time, end_time=time())
self.process_query_result(result, query_num, self.iterations, True)
Loading

0 comments on commit 1a36438

Please sign in to comment.