Skip to content

Commit

Permalink
update tiering configuration in scenario tests (#11709)
Browse files Browse the repository at this point in the history
  • Loading branch information
swalrus1 authored Feb 7, 2025
1 parent 02b921d commit 2cd8bae
Show file tree
Hide file tree
Showing 9 changed files with 412 additions and 380 deletions.
1 change: 0 additions & 1 deletion .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ ydb/tests/functional/tpc/large test_tpcds.py.TestTpcdsS1.test_tpcds[36]
ydb/tests/functional/tpc/large test_tpcds.py.TestTpcdsS1.test_tpcds[67]
ydb/tests/functional/tpc/large test_tpcds.py.TestTpcdsS1.test_tpcds[86]
ydb/tests/olap/scenario sole chunk chunk
ydb/tests/olap/scenario test_alter_tiering.py.TestAlterTiering.test[many_tables]
ydb/tests/olap/scenario test_insert.py.TestInsert.test[read_data_during_bulk_upsert]
ydb/tests/olap/ttl_tiering data_migration_when_alter_ttl.py.TestDataMigrationWhenAlterTtl.test
ydb/tests/olap/ttl_tiering sole chunk chunk
Expand Down
3 changes: 3 additions & 0 deletions ydb/tests/library/harness/kikimr_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ def __init__(
extra_grpc_services=None, # list[str]
hive_config=None,
datashard_config=None,
columnshard_config=None,
enforce_user_token_requirement=False,
default_user_sid=None,
pg_compatible_expirement=False,
Expand Down Expand Up @@ -352,6 +353,8 @@ def __init__(

if datashard_config:
self.yaml_config["data_shard_config"] = datashard_config
if columnshard_config:
self.yaml_config["column_shard_config"] = columnshard_config

self.__build()

Expand Down
9 changes: 6 additions & 3 deletions ydb/tests/olap/scenario/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,12 @@ class YdbClusterInstance():
_endpoint = None
_database = None

def __init__(self, endpoint, database):
def __init__(self, endpoint, database, config):
if endpoint is not None:
self._endpoint = endpoint
self._database = database
self._mon_port = 8765
else:
config = KikimrConfigGenerator(extra_feature_flags=["enable_column_store"])
cluster = KiKiMR(configurator=config)
cluster.start()
node = cluster.nodes[1]
Expand Down Expand Up @@ -65,7 +64,7 @@ def get_suite_name(cls):
def setup_class(cls):
ydb_endpoint = get_external_param('ydb-endpoint', None)
ydb_database = get_external_param('ydb-db', "").lstrip('/')
cls._ydb_instance = YdbClusterInstance(ydb_endpoint, ydb_database)
cls._ydb_instance = YdbClusterInstance(ydb_endpoint, ydb_database, cls._get_cluster_config())
YdbCluster.reset(cls._ydb_instance.endpoint(), cls._ydb_instance.database(), cls._ydb_instance.mon_port())
if not external_param_is_true('reuse-tables'):
ScenarioTestHelper(None).remove_path(cls.get_suite_name())
Expand Down Expand Up @@ -107,6 +106,10 @@ def test(self, ctx: TestContext):
allure_test_description(ctx.suite, ctx.test, start_time=start_time, end_time=time.time())
ScenarioTestHelper(None).remove_path(test_path, ctx.suite)

@classmethod
def _get_cluster_config(cls):
return KikimrConfigGenerator(extra_feature_flags=["enable_column_store"])


def pytest_generate_tests(metafunc):
idlist = []
Expand Down
2 changes: 2 additions & 0 deletions ydb/tests/olap/scenario/helpers/data_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ def generate_value(self, column: ScenarioTestHelper.Column) -> Any:
return str(self._value)
elif column.type in {PrimitiveType.Json, PrimitiveType.JsonDocument}:
return f'"{str(self._value)}"'
elif column.type == PrimitiveType.Timestamp:
return self._value
raise TypeError(f'Unsupported type {column.type}')


Expand Down
15 changes: 15 additions & 0 deletions ydb/tests/olap/scenario/helpers/drop_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,18 @@ class DropTableStore(DropObject):
@override
def _type(self) -> str:
return 'tablestore'


class DropExternalDataSource(DropObject):
"""Class of requests to delete an external data source.
See {ScenarioTestHelper.execute_scheme_query}.
Example:
sth = ScenarioTestHelper(ctx)
sth.execute_scheme_query(DropExternalDataSource('eds'))
"""

@override
def _type(self) -> str:
return 'external data source'
53 changes: 26 additions & 27 deletions ydb/tests/olap/scenario/helpers/table_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
TestContext,
)
from abc import abstractmethod, ABC
from typing import override, Dict
from typing import override, Dict, Iterable, Optional
from datetime import timedelta


class CreateTableLikeObject(ScenarioTestHelper.IYqlble):
Expand All @@ -22,6 +23,7 @@ def __init__(self, name: str) -> None:
super().__init__(name)
self._schema = None
self._partitions = 64
self._existing_ok = False

def with_schema(self, schema: ScenarioTestHelper.Schema) -> CreateTableLikeObject:
"""Specify the schema of the created object.
Expand All @@ -47,6 +49,18 @@ def with_partitions_count(self, partitions: int) -> CreateTableLikeObject:
self._partitions = partitions
return self

def existing_ok(self, value: bool = True) -> CreateTableLikeObject:
"""Set existing_ok value.
Args:
value: existing_ok.
Returns:
self."""

self._existing_ok = value
return self

@override
def params(self) -> Dict[str, str]:
return {self._type(): self._name}
Expand All @@ -60,7 +74,7 @@ def to_yql(self, ctx: TestContext) -> str:
schema_str = ',\n '.join([c.to_yql() for c in self._schema.columns])
column_families_str = ',\n'.join([c.to_yql() for c in self._schema.column_families])
keys = ', '.join(self._schema.key_columns)
return f'''CREATE {self._type().upper()} `{ScenarioTestHelper(ctx).get_full_path(self._name)}` (
return f'''CREATE {self._type().upper()}{' IF NOT EXISTS' if self.existing_ok else ''} `{ScenarioTestHelper(ctx).get_full_path(self._name)}` (
{schema_str},
PRIMARY KEY({keys})
{"" if not column_families_str else f", {column_families_str}"}
Expand Down Expand Up @@ -537,30 +551,7 @@ def drop_column(self, column: str) -> AlterTableLikeObject:

return self(DropColumn(column))

def set_tiering(self, tiering_rule: str) -> AlterTableLikeObject:
"""Set a tiering policy.
The method is similar to calling {AlterTableLikeObject.action} with a {SetSetting} instance.
Args:
tiering_rule: Name of a TIERING_RULE object.
Returns:
self."""

return self(SetSetting('TIERING', f'"{tiering_rule}"'))

def reset_tiering(self) -> AlterTableLikeObject:
"""Remove a tiering policy.
The method is similar to calling {AlterTableLikeObject.action} with a {SetSetting} instance.
Returns:
self."""

return self(ResetSetting('TIERING'))

def set_ttl(self, interval: str, column: str) -> AlterTableLikeObject:
def set_ttl(self, tiers: Iterable[(timedelta, Optional[str])], column: str) -> AlterTableLikeObject:
"""Set TTL for rows.
The method is similar to calling {AlterTableLikeObject.action} with a {SetSetting} instance.
Expand All @@ -571,7 +562,15 @@ def set_ttl(self, interval: str, column: str) -> AlterTableLikeObject:
Returns:
self."""

return self(SetSetting('TTL', f'Interval("{interval}") ON `{column}`'))
def make_tier_literal(delay: timedelta, storage_path: Optional[str]):
delay_literal = f'Interval("PT{delay.total_seconds()}S")'
if storage_path:
return delay_literal + ' TO EXTERNAL DATA SOURCE `' + storage_path + '`'
else:
return delay_literal + ' DELETE'

tiers_literal = ', '.join(map(lambda x: make_tier_literal(*x), tiers))
return self(SetSetting('TTL', f'{tiers_literal} ON {column}'))

def add_column_family(self, column_family: ScenarioTestHelper.ColumnFamily) -> AlterTableLikeObject:
"""Add a column_family.
Expand Down
Loading

0 comments on commit 2cd8bae

Please sign in to comment.