Multiprocess scenario test #13371

Merged · 6 commits · Jan 16, 2025
Changes from 4 commits
3 changes: 3 additions & 0 deletions ydb/tests/olap/scenario/conftest.py
@@ -77,6 +77,8 @@ def teardown_class(cls):
cls._ydb_instance.stop()

def test(self, ctx: TestContext):
test_path = ctx.test + get_external_param("table_suffix", "")
ScenarioTestHelper(None).remove_path(test_path, ctx.suite)
start_time = time.time()
try:
ctx.executable(self, ctx)
@@ -103,6 +105,7 @@ def test(self, ctx: TestContext):
allure_test_description(ctx.suite, ctx.test, start_time=start_time, end_time=time.time())
raise
allure_test_description(ctx.suite, ctx.test, start_time=start_time, end_time=time.time())
ScenarioTestHelper(None).remove_path(test_path, ctx.suite)


def pytest_generate_tests(metafunc):
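
The two lines added here scope each run's working path by the external table_suffix parameter and drop that path both before and after the scenario, so several concurrently launched test processes can share one database without touching each other's tables. A minimal standalone sketch of the same idea (cleanup_suffixed_path is an illustrative name, not part of the PR; get_external_param and ScenarioTestHelper.remove_path are the calls used in the hunk above):

from ydb.tests.olap.lib.utils import get_external_param
from ydb.tests.olap.scenario.helpers import ScenarioTestHelper


def cleanup_suffixed_path(test_name: str, suite: str) -> None:
    # Each process is launched with a distinct --test-param table_suffix=<N>,
    # so test_name + suffix is unique per process and safe to remove.
    suffix = get_external_param("table_suffix", "")
    ScenarioTestHelper(None).remove_path(test_name + suffix, suite)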
17 changes: 9 additions & 8 deletions ydb/tests/olap/scenario/helpers/scenario_tests_helper.py
@@ -10,6 +10,7 @@
from abc import abstractmethod, ABC
from typing import Set, List, Dict, Any, Callable, Optional
from time import sleep
from ydb.tests.olap.lib.utils import get_external_param


class TestContext:
@@ -316,7 +317,7 @@ def _add_not_empty(p: str, dir: str):
result = os.path.join('/', YdbCluster.ydb_database, YdbCluster.tables_path)
if self.test_context is not None:
result = _add_not_empty(result, self.test_context.suite)
result = _add_not_empty(result, self.test_context.test)
result = _add_not_empty(result, self.test_context.test) + get_external_param("table_suffix", "")
result = _add_not_empty(result, path)
return result

@@ -653,7 +654,7 @@ def describe_table(self, path: str, settings: ydb.DescribeTableSettings = None)
)

@allure.step('List path {path}')
def list_path(self, path: str) -> List[ydb.SchemeEntry]:
def list_path(self, path: str, folder: str) -> List[ydb.SchemeEntry]:
"""Recursively describe the path in the database under test.

If the path is a directory or TableStore, then all subpaths are included in the description.
@@ -666,7 +667,7 @@ def list_path(self, path: str) -> List[ydb.SchemeEntry]:
If the path does not exist, an empty list is returned.
"""

root_path = self.get_full_path('')
root_path = self.get_full_path(folder)
try:
self_descr = YdbCluster._describe_path_impl(os.path.join(root_path, path))
except ydb.issues.SchemeError:
@@ -681,7 +682,7 @@ def list_path(self, path: str) -> List[ydb.SchemeEntry]:
return self_descr

@allure.step('Remove path {path}')
def remove_path(self, path: str) -> None:
def remove_path(self, path: str, folder: str = '') -> None:
"""Recursively delete a path in the tested database.

If the path is a directory or TableStore, then all nested paths are removed.
@@ -696,12 +697,12 @@ def remove_path(self, path: str) -> None:

import ydb.tests.olap.scenario.helpers.drop_helper as dh

root_path = self.get_full_path('')
for e in self.list_path(path):
root_path = self.get_full_path(folder)
for e in self.list_path(path, folder):
if e.is_any_table():
self.execute_scheme_query(dh.DropTable(e.name))
self.execute_scheme_query(dh.DropTable(os.path.join(folder, e.name)))
elif e.is_column_store():
self.execute_scheme_query(dh.DropTableStore(e.name))
self.execute_scheme_query(dh.DropTableStore(os.path.join(folder, e.name)))
elif e.is_directory():
self._run_with_expected_status(
lambda: YdbCluster.get_ydb_driver().scheme_client.remove_directory(os.path.join(root_path, e.name)),
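
Within the helper, get_full_path now appends the external table_suffix to the test component of the path, and list_path/remove_path take a folder argument so nested entries are dropped with their folder prefix (DropTable(os.path.join(folder, e.name))). A brief usage sketch with illustrative names; with a None test context the folder and path resolve directly under the cluster's tables path, which is exactly how conftest.py calls it:

sth = ScenarioTestHelper(None)
# Enumerate the scheme entries under <tables_path>/my_suite/my_test_1 ...
for entry in sth.list_path('my_test_1', 'my_suite'):
    print(entry.name, entry.is_any_table())
# ... then drop the whole subtree, tables and table stores included.
sth.remove_path('my_test_1', folder='my_suite')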
2 changes: 2 additions & 0 deletions ydb/tests/olap/scenario/multitest.sh
@@ -0,0 +1,2 @@
#!/bin/sh -e
make S3_ACCESS_KEY=$1 S3_SECRET_KEY=$2 -rkj -f test.mk all.test.dst && echo OK || echo Error
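
multitest.sh is a thin launcher: it forwards its two positional arguments to make as the S3 credentials and builds all.test.dst from test.mk with -r (no built-in rules), -k (keep going if one run fails) and -j (run the per-suffix targets in parallel), printing OK or Error at the end. A typical invocation, with placeholders for the credentials:

./multitest.sh <s3-access-key> <s3-secret-key>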
6 changes: 6 additions & 0 deletions ydb/tests/olap/scenario/test.mk
@@ -0,0 +1,6 @@
suffixes:=$(shell jot 20)

$(suffixes:=.test.dst): %.test.dst:
../../../../ya test --build=relwithdebinfo --test-disable-timeout --test-param ydb-endpoint="grpc://ydb-olap-testing-vla-0000.search.yandex.net:2135" --test-param ydb-db="/olap-testing/kikimr/testing/testdb" --test-param tables-path=scenario --test-param s3-endpoint=http://storage.yandexcloud.net --test-param s3-access-key=$(S3_ACCESS_KEY) --test-param s3-secret-key=$(S3_SECRET_KEY) --test-param s3-buckets=ydb-test-test,ydb-test-test-2 --test-param test-duration-seconds=2400 --test-param table_suffix=$* --test-param rows_count=100 --test-param batches_count=1000 --test-param reuse-tables=True --test-param keep-tables=True --test-param table_count=10 -F test_insert.py::TestInsert::test[read_data_during_bulk_upsert]

all.test.dst: $(suffixes:=.test.dst)
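
test.mk generates the suffixes 1..20 with jot, declares one <N>.test.dst target per suffix that runs ya test with table_suffix=$* (the matched stem), and makes all.test.dst depend on all of them, so the parallel make invocation from multitest.sh fans the same scenario out into up to 20 concurrent processes against the shared cluster. A rough Python equivalent of that fan-out, for illustration only (the command line is abbreviated from the rule above, and the launcher itself is hypothetical, not part of the PR):

import subprocess
from concurrent.futures import ThreadPoolExecutor

# Abbreviated from the make rule: only the parameters relevant to the fan-out
# are kept; the real rule also passes endpoints, S3 settings, durations, etc.
YA_TEST = (
    "../../../../ya test --build=relwithdebinfo --test-disable-timeout "
    "--test-param table_suffix={suffix} --test-param table_count=10 "
    "-F 'test_insert.py::TestInsert::test[read_data_during_bulk_upsert]'"
)


def run_one(suffix: int) -> int:
    # Each worker gets its own table_suffix, so the processes create and drop
    # disjoint table paths in the shared database.
    return subprocess.run(YA_TEST.format(suffix=suffix), shell=True).returncode


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=20) as pool:
        codes = list(pool.map(run_one, range(1, 21)))
    print("OK" if all(code == 0 for code in codes) else "Error")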
71 changes: 47 additions & 24 deletions ydb/tests/olap/scenario/test_insert.py
@@ -1,3 +1,4 @@
import time
from conftest import BaseTestSet
from ydb.tests.olap.scenario.helpers import (
ScenarioTestHelper,
@@ -24,19 +25,26 @@ class TestInsert(BaseTestSet):
.with_key_columns("key")
)

def _loop_upsert(self, ctx: TestContext, data: list):
def _loop_upsert(self, ctx: TestContext, data: list, table: str):
sth = ScenarioTestHelper(ctx)
table_name = "log" + table
for batch in data:
sth.bulk_upsert_data("log", self.schema_log, batch)
sth.bulk_upsert_data(table_name, self.schema_log, batch)

def _loop_insert(self, ctx: TestContext, rows_count: int):
def _loop_insert(self, ctx: TestContext, rows_count: int, table: str):
sth = ScenarioTestHelper(ctx)
log: str = sth.get_full_path("log")
cnt: str = sth.get_full_path("cnt")
log: str = sth.get_full_path("log" + table)
cnt: str = sth.get_full_path("cnt" + table)
for i in range(rows_count):
sth.execute_query(
f'$cnt = SELECT CAST(COUNT(*) AS INT64) from `{log}`; INSERT INTO `{cnt}` (key, c) values({i}, $cnt)'
)
for j in range(10):
try:
sth.execute_query(
f'$cnt = SELECT CAST(COUNT(*) AS INT64) from `{log}`; INSERT INTO `{cnt}` (key, c) values({i}, $cnt)'
)
break
except Exception:
pass
time.sleep(1)

def scenario_read_data_during_bulk_upsert(self, ctx: TestContext):
sth = ScenarioTestHelper(ctx)
@@ -45,41 +53,56 @@ def scenario_read_data_during_bulk_upsert(self, ctx: TestContext):
batches_count = int(get_external_param("batches_count", "10"))
rows_count = int(get_external_param("rows_count", "1000"))
inserts_count = int(get_external_param("inserts_count", "200"))
sth.execute_scheme_query(
CreateTable(cnt_table_name).with_schema(self.schema_cnt)
)
sth.execute_scheme_query(
CreateTable(log_table_name).with_schema(self.schema_log)
)
table_count = int(get_external_param("table_count", "1"))
for table in range(table_count):
sth.execute_scheme_query(
CreateTable(cnt_table_name + str(table)).with_schema(self.schema_cnt)
)
for table in range(table_count):
sth.execute_scheme_query(
CreateTable(log_table_name + str(table)).with_schema(self.schema_log)
)
data: List = []
for i in range(batches_count):
batch: List[Dict[str, Any]] = []
for j in range(rows_count):
batch.append({"key": j + rows_count * i})
data.append(batch)

thread1 = TestThread(target=self._loop_upsert, args=[ctx, data])
thread2 = TestThread(target=self._loop_insert, args=[ctx, inserts_count])
thread1 = []
thread2 = []
for table in range(table_count):
thread1.append(TestThread(target=self._loop_upsert, args=[ctx, data, str(table)]))
for table in range(table_count):
thread2.append(TestThread(target=self._loop_insert, args=[ctx, inserts_count, str(table)]))

for thread in thread1:
thread.start()

for thread in thread2:
thread.start()

thread1.start()
thread2.start()
for thread in thread2:
thread.join()

thread2.join()
thread1.join()
for thread in thread1:
thread.join()

rows: int = sth.get_table_rows_count(cnt_table_name)
cnt_table_name0 = cnt_table_name + "0"
rows: int = sth.get_table_rows_count(cnt_table_name0)
assert rows == inserts_count
scan_result = sth.execute_scan_query(
f"SELECT key, c FROM `{sth.get_full_path(cnt_table_name)}` ORDER BY key"
f"SELECT key, c FROM `{sth.get_full_path(cnt_table_name0)}` ORDER BY key"
)
for i in range(rows):
if scan_result.result_set.rows[i]["key"] != i:
assert False, f"{i} ?= {scan_result.result_set.rows[i]['key']}"

rows: int = sth.get_table_rows_count(log_table_name)
log_table_name0 = log_table_name + "0"
rows: int = sth.get_table_rows_count(log_table_name0)
assert rows == rows_count * batches_count
scan_result = sth.execute_scan_query(
f"SELECT key FROM `{sth.get_full_path(log_table_name)}` ORDER BY key"
f"SELECT key FROM `{sth.get_full_path(log_table_name0)}` ORDER BY key"
)
for i in range(rows):
if scan_result.result_set.rows[i]["key"] != i:
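
Overall, the test now creates table_count pairs of cnt<N>/log<N> tables, runs one bulk-upsert thread and one INSERT thread per pair, and checks only the "0" pair at the end; _loop_insert additionally retries each INSERT up to 10 times with a one-second pause, since under heavy concurrent load individual queries can fail transiently. A condensed, standalone sketch of that retry pattern (run_with_retries is an illustrative name; unlike the PR's loop, this variant re-raises the last error instead of giving up silently):

import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def run_with_retries(action: Callable[[], T], attempts: int = 10, delay_seconds: float = 1.0) -> T:
    # Call the action until it succeeds, sleeping between failed attempts.
    last_error: Optional[Exception] = None
    for _ in range(attempts):
        try:
            return action()
        except Exception as error:
            last_error = error
            time.sleep(delay_seconds)
    assert last_error is not None  # attempts must be >= 1
    raise last_error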