Skip to content

Commit

Permalink
Merge a1cdb54 into 98f1bfc
Browse files Browse the repository at this point in the history
  • Loading branch information
aavdonkin authored Jan 16, 2025
2 parents 98f1bfc + a1cdb54 commit c9e07c9
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 47 deletions.
3 changes: 3 additions & 0 deletions ydb/tests/olap/scenario/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ def teardown_class(cls):
cls._ydb_instance.stop()

def test(self, ctx: TestContext):
test_path = ctx.test + get_external_param("table_suffix", "")
ScenarioTestHelper(None).remove_path(test_path, ctx.suite)
start_time = time.time()
try:
ctx.executable(self, ctx)
Expand All @@ -103,6 +105,7 @@ def test(self, ctx: TestContext):
allure_test_description(ctx.suite, ctx.test, start_time=start_time, end_time=time.time())
raise
allure_test_description(ctx.suite, ctx.test, start_time=start_time, end_time=time.time())
ScenarioTestHelper(None).remove_path(test_path, ctx.suite)


def pytest_generate_tests(metafunc):
Expand Down
21 changes: 11 additions & 10 deletions ydb/tests/olap/scenario/helpers/scenario_tests_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from abc import abstractmethod, ABC
from typing import Set, List, Dict, Any, Callable, Optional
from time import sleep
from ydb.tests.olap.lib.utils import get_external_param


class TestContext:
Expand Down Expand Up @@ -316,7 +317,7 @@ def _add_not_empty(p: str, dir: str):
result = os.path.join('/', YdbCluster.ydb_database, YdbCluster.tables_path)
if self.test_context is not None:
result = _add_not_empty(result, self.test_context.suite)
result = _add_not_empty(result, self.test_context.test)
result = _add_not_empty(result, self.test_context.test) + get_external_param("table_suffix", "")
result = _add_not_empty(result, path)
return result

Expand Down Expand Up @@ -463,7 +464,7 @@ def execute_scan_query(

@allure.step('Execute query')
def execute_query(
self, yql: str, expected_status: ydb.StatusCode | Set[ydb.StatusCode] = ydb.StatusCode.SUCCESS
self, yql: str, expected_status: ydb.StatusCode | Set[ydb.StatusCode] = ydb.StatusCode.SUCCESS, retries=0
):
"""Run a query on the tested database.
Expand All @@ -479,7 +480,7 @@ def execute_query(

allure.attach(yql, 'request', allure.attachment_type.TEXT)
with ydb.QuerySessionPool(YdbCluster.get_ydb_driver()) as pool:
self._run_with_expected_status(lambda: pool.execute_with_retries(yql), expected_status)
self._run_with_expected_status(lambda: pool.execute_with_retries(yql, None, ydb.RetrySettings(max_retries=retries)), expected_status)

def drop_if_exist(self, names: List[str], operation) -> None:
"""Erase entities in the tested database, if it exists.
Expand Down Expand Up @@ -653,7 +654,7 @@ def describe_table(self, path: str, settings: ydb.DescribeTableSettings = None)
)

@allure.step('List path {path}')
def list_path(self, path: str) -> List[ydb.SchemeEntry]:
def list_path(self, path: str, folder: str) -> List[ydb.SchemeEntry]:
"""Recursively describe the path in the database under test.
If the path is a directory or TableStore, then all subpaths are included in the description.
Expand All @@ -666,7 +667,7 @@ def list_path(self, path: str) -> List[ydb.SchemeEntry]:
If the path does not exist, an empty list is returned.
"""

root_path = self.get_full_path('')
root_path = self.get_full_path(folder)
try:
self_descr = YdbCluster._describe_path_impl(os.path.join(root_path, path))
except ydb.issues.SchemeError:
Expand All @@ -681,7 +682,7 @@ def list_path(self, path: str) -> List[ydb.SchemeEntry]:
return self_descr

@allure.step('Remove path {path}')
def remove_path(self, path: str) -> None:
def remove_path(self, path: str, folder: str = '') -> None:
"""Recursively delete a path in the tested database.
If the path is a directory or TableStore, then all nested paths are removed.
Expand All @@ -696,12 +697,12 @@ def remove_path(self, path: str) -> None:

import ydb.tests.olap.scenario.helpers.drop_helper as dh

root_path = self.get_full_path('')
for e in self.list_path(path):
root_path = self.get_full_path(folder)
for e in self.list_path(path, folder):
if e.is_any_table():
self.execute_scheme_query(dh.DropTable(e.name))
self.execute_scheme_query(dh.DropTable(os.path.join(folder, e.name)))
elif e.is_column_store():
self.execute_scheme_query(dh.DropTableStore(e.name))
self.execute_scheme_query(dh.DropTableStore(os.path.join(folder, e.name)))
elif e.is_directory():
self._run_with_expected_status(
lambda: YdbCluster.get_ydb_driver().scheme_client.remove_directory(os.path.join(root_path, e.name)),
Expand Down
2 changes: 2 additions & 0 deletions ydb/tests/olap/scenario/multitest.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#!/bin/sh -e
make S3_ACCESS_KEY=$1 S3_SECRET_KEY=$2 YDB_ENDPOINT=$3 YDB_DB=$4 -rkj -f test.mk all.test.dst && echo OK || echo Error
6 changes: 6 additions & 0 deletions ydb/tests/olap/scenario/test.mk
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
suffixes:=$(shell jot 20)

$(suffixes:=.test.dst): %.test.dst:
../../../../ya test --build=relwithdebinfo --test-disable-timeout --test-param ydb-endpoint=$(YDB_ENDPOINT) --test-param ydb-db=$(YDB_DB) --test-param tables-path=scenario --test-param s3-endpoint=http://storage.yandexcloud.net --test-param s3-access-key=$(S3_ACCESS_KEY) --test-param s3-secret-key=$(S3_SECRET_KEY) --test-param s3-buckets=ydb-test-test,ydb-test-test-2 --test-param test-duration-seconds=2400 --test-param table_suffix=$* --test-param rows_count=100 --test-param batches_count=1000 --test-param reuse-tables=True --test-param keep-tables=True --test-param tables_count=10 --test-param ignore_read_errors=True -F test_insert.py::TestInsert::test[read_data_during_bulk_upsert]

all.test.dst: $(suffixes:=.test.dst)
99 changes: 62 additions & 37 deletions ydb/tests/olap/scenario/test_insert.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
from conftest import BaseTestSet
from ydb.tests.olap.scenario.helpers import (
ScenarioTestHelper,
Expand All @@ -7,7 +8,7 @@
from helpers.thread_helper import TestThread
from ydb import PrimitiveType
from typing import List, Dict, Any
from ydb.tests.olap.lib.utils import get_external_param
from ydb.tests.olap.lib.utils import get_external_param, external_param_is_true


class TestInsert(BaseTestSet):
Expand All @@ -24,19 +25,26 @@ class TestInsert(BaseTestSet):
.with_key_columns("key")
)

def _loop_upsert(self, ctx: TestContext, data: list):
def _loop_upsert(self, ctx: TestContext, data: list, table: str):
sth = ScenarioTestHelper(ctx)
table_name = "log" + table
for batch in data:
sth.bulk_upsert_data("log", self.schema_log, batch)
sth.bulk_upsert_data(table_name, self.schema_log, batch)

def _loop_insert(self, ctx: TestContext, rows_count: int):
def _loop_insert(self, ctx: TestContext, rows_count: int, table: str, ignore_read_errors: bool):
sth = ScenarioTestHelper(ctx)
log: str = sth.get_full_path("log")
cnt: str = sth.get_full_path("cnt")
log: str = sth.get_full_path("log" + table)
cnt: str = sth.get_full_path("cnt" + table)
for i in range(rows_count):
sth.execute_query(
f'$cnt = SELECT CAST(COUNT(*) AS INT64) from `{log}`; INSERT INTO `{cnt}` (key, c) values({i}, $cnt)'
)
try:
sth.execute_query(
yql=f'$cnt = SELECT CAST(COUNT(*) AS INT64) from `{log}`; INSERT INTO `{cnt}` (key, c) values({i}, $cnt)', retries=10
)
except Exception:
if ignore_read_errors:
pass
else:
raise

def scenario_read_data_during_bulk_upsert(self, ctx: TestContext):
sth = ScenarioTestHelper(ctx)
Expand All @@ -45,42 +53,59 @@ def scenario_read_data_during_bulk_upsert(self, ctx: TestContext):
batches_count = int(get_external_param("batches_count", "10"))
rows_count = int(get_external_param("rows_count", "1000"))
inserts_count = int(get_external_param("inserts_count", "200"))
sth.execute_scheme_query(
CreateTable(cnt_table_name).with_schema(self.schema_cnt)
)
sth.execute_scheme_query(
CreateTable(log_table_name).with_schema(self.schema_log)
)
tables_count = int(get_external_param("tables_count", "1"))
ignore_read_errors = external_param_is_true("ignore_read_errors")
for table in range(tables_count):
sth.execute_scheme_query(
CreateTable(cnt_table_name + str(table)).with_schema(self.schema_cnt)
)
for table in range(tables_count):
sth.execute_scheme_query(
CreateTable(log_table_name + str(table)).with_schema(self.schema_log)
)
data: List = []
for i in range(batches_count):
batch: List[Dict[str, Any]] = []
for j in range(rows_count):
batch.append({"key": j + rows_count * i})
data.append(batch)

thread1 = TestThread(target=self._loop_upsert, args=[ctx, data])
thread2 = TestThread(target=self._loop_insert, args=[ctx, inserts_count])
thread1 = []
thread2 = []
for table in range(tables_count):
thread1.append(TestThread(target=self._loop_upsert, args=[ctx, data, str(table)]))
for table in range(tables_count):
thread2.append(TestThread(target=self._loop_insert, args=[ctx, inserts_count, str(table), ignore_read_errors]))

for thread in thread1:
thread.start()

thread1.start()
thread2.start()
for thread in thread2:
thread.start()

thread2.join()
thread1.join()
for thread in thread2:
thread.join()

rows: int = sth.get_table_rows_count(cnt_table_name)
assert rows == inserts_count
scan_result = sth.execute_scan_query(
f"SELECT key, c FROM `{sth.get_full_path(cnt_table_name)}` ORDER BY key"
)
for i in range(rows):
if scan_result.result_set.rows[i]["key"] != i:
assert False, f"{i} ?= {scan_result.result_set.rows[i]['key']}"
for thread in thread1:
thread.join()

rows: int = sth.get_table_rows_count(log_table_name)
assert rows == rows_count * batches_count
scan_result = sth.execute_scan_query(
f"SELECT key FROM `{sth.get_full_path(log_table_name)}` ORDER BY key"
)
for i in range(rows):
if scan_result.result_set.rows[i]["key"] != i:
assert False, f"{i} ?= {scan_result.result_set.rows[i]['key']}"
for table in range(tables_count):
cnt_table_name0 = cnt_table_name + str(table)
rows: int = sth.get_table_rows_count(cnt_table_name0)
assert rows == inserts_count
scan_result = sth.execute_scan_query(
f"SELECT key, c FROM `{sth.get_full_path(cnt_table_name0)}` ORDER BY key"
)
for i in range(rows):
if scan_result.result_set.rows[i]["key"] != i:
assert False, f"{i} ?= {scan_result.result_set.rows[i]['key']}"

log_table_name0 = log_table_name + str(table)
rows: int = sth.get_table_rows_count(log_table_name0)
assert rows == rows_count * batches_count
scan_result = sth.execute_scan_query(
f"SELECT key FROM `{sth.get_full_path(log_table_name0)}` ORDER BY key"
)
for i in range(rows):
if scan_result.result_set.rows[i]["key"] != i:
assert False, f"{i} ?= {scan_result.result_set.rows[i]['key']}"

0 comments on commit c9e07c9

Please sign in to comment.