Merge 8d336e8 into fbc9d17
kardymonds authored Feb 6, 2024
2 parents fbc9d17 + 8d336e8 commit 4c7139d
Showing 6 changed files with 492 additions and 0 deletions.
71 changes: 71 additions & 0 deletions ydb/tests/fq/kikimr/conftest.py
@@ -0,0 +1,71 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pytest

from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient
from ydb.tests.tools.fq_runner.custom_hooks import *  # noqa: F401,F403 Adding custom hooks for YQv2 support
from ydb.tests.tools.fq_runner.kikimr_utils import ExtensionPoint
from ydb.tests.tools.fq_runner.kikimr_utils import YQv2Extension
from ydb.tests.tools.fq_runner.kikimr_utils import ComputeExtension
from ydb.tests.tools.fq_runner.kikimr_utils import DefaultConfigExtension
from ydb.tests.tools.fq_runner.kikimr_utils import StatsModeExtension
from ydb.tests.tools.fq_runner.kikimr_utils import start_kikimr


@pytest.fixture
def stats_mode():
    return ''


@pytest.fixture
def kikimr(request: pytest.FixtureRequest, yq_version: str, stats_mode: str):
    kikimr_extensions = [DefaultConfigExtension(""),
                         YQv2Extension(yq_version),
                         ComputeExtension(),
                         StatsModeExtension(stats_mode)]
    with start_kikimr(request, kikimr_extensions) as kikimr:
        yield kikimr


class ManyRetriesConfigExtension(ExtensionPoint):
    """Configures control plane storage to retry failed tasks up to 10000 times."""

    def __init__(self):
        super().__init__()

    def is_applicable(self, request):
        return True

    def apply_to_kikimr(self, request, kikimr):
        # Map every non-success status code to a high-retry policy.
        kikimr.compute_plane.fq_config['control_plane_storage']['retry_policy_mapping'] = [
            {
                'status_code': [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13],
                'policy': {
                    'retry_count': 10000
                }
            }
        ]


@pytest.fixture
def kikimr_many_retries(request: pytest.FixtureRequest, yq_version: str):
    kikimr_extensions = [DefaultConfigExtension(""),
                         ManyRetriesConfigExtension(),
                         YQv2Extension(yq_version),
                         ComputeExtension()]
    with start_kikimr(request, kikimr_extensions) as kikimr:
        yield kikimr


def create_client(kikimr, request):
    return FederatedQueryClient(request.param["folder_id"] if request is not None else "my_folder",
                                streaming_over_kikimr=kikimr)


@pytest.fixture
def client(kikimr, request=None):
    # The default value keeps pytest from injecting the fixture request here,
    # so create_client falls back to the "my_folder" folder id.
    return create_client(kikimr, request)


@pytest.fixture
def client_many_retries(kikimr_many_retries, request=None):
    return create_client(kikimr_many_retries, request)
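
For context, a minimal usage sketch (hypothetical, not part of this commit) showing how a test might consume the client fixture above; it reuses the create_query/wait_query_status calls that appear in test_recovery_match_recognize.py below:

import ydb.public.api.protos.draft.fq_pb2 as fq
from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1


class TestClientFixtureExample:
    @yq_v1
    def test_trivial_query(self, kikimr, client):
        # Hypothetical test: submit a trivial analytics query and wait for it
        # to complete, mirroring the client calls used elsewhere in this commit.
        query_id = client.create_query(
            "example", "SELECT 1", type=fq.QueryContent.QueryType.ANALYTICS
        ).result.query_id
        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)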
26 changes: 26 additions & 0 deletions ydb/tests/fq/kikimr/test_base.py
@@ -0,0 +1,26 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimr
from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimrConfig
from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase


class TestBaseWithAbortingConfigParams(TestYdsBase):

    @classmethod
    def setup_class(cls):
        kikimr_conf = StreamingOverKikimrConfig(cloud_mode=True)
        cls.streaming_over_kikimr = StreamingOverKikimr(kikimr_conf)
        # Shorten the task lease and allow only a single retry so that a lost
        # lease terminates the query quickly.
        cls.streaming_over_kikimr.control_plane.fq_config['control_plane_storage']['task_lease_ttl'] = "2s"
        cls.streaming_over_kikimr.control_plane.fq_config['control_plane_storage']['task_lease_retry_policy'] = {}
        cls.streaming_over_kikimr.control_plane.fq_config['control_plane_storage']['task_lease_retry_policy']['retry_count'] = 1
        cls.streaming_over_kikimr.compute_plane.fq_config['pinger']['ping_period'] = "1s"
        cls.streaming_over_kikimr.start_mvp_mock_server()
        cls.streaming_over_kikimr.start()

    @classmethod
    def teardown_class(cls):
        if hasattr(cls, "streaming_over_kikimr"):
            cls.streaming_over_kikimr.stop_mvp_mock_server()
            cls.streaming_over_kikimr.stop()
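
A hypothetical sketch (not part of this commit) of a test built on this base class; names and expected statuses are illustrative only. It runs a trivial query against the cluster configured with the shortened lease settings from setup_class:

from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient
import ydb.public.api.protos.draft.fq_pb2 as fq


class TestWithAbortingConfig(TestBaseWithAbortingConfigParams):
    def test_simple_query_completes(self):
        # Hypothetical: the client is built the same way as create_client in
        # conftest.py, directly against the class-level cluster.
        client = FederatedQueryClient(
            "my_folder", streaming_over_kikimr=self.streaming_over_kikimr)
        query_id = client.create_query(
            "lease_example", "SELECT 1", type=fq.QueryContent.QueryType.STREAMING
        ).result.query_id
        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)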
160 changes: 160 additions & 0 deletions ydb/tests/fq/kikimr/test_recovery_match_recognize.py
@@ -0,0 +1,160 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pytest
import logging
import os
import time

import ydb.tests.library.common.yatest_common as yatest_common
from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimr
from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimrConfig
import library.python.retry as retry
from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
import ydb.public.api.protos.draft.fq_pb2 as fq


@pytest.fixture
def kikimr(request):
    kikimr_conf = StreamingOverKikimrConfig(cloud_mode=True, node_count=2)
    kikimr = StreamingOverKikimr(kikimr_conf)
    kikimr.start_mvp_mock_server()
    kikimr.start()
    yield kikimr
    kikimr.stop_mvp_mock_server()
    kikimr.stop()


class TestRecoveryMatchRecognize(TestYdsBase):

    @classmethod
    def setup_class(cls):
        # Retry configuration used by the @retry.retry_intrusive decorator below.
        cls.retry_conf = retry.RetryConf().upto(seconds=30).waiting(0.1)

    @retry.retry_intrusive
    def get_graph_master_node_id(self, kikimr, query_id):
        for node_index in kikimr.control_plane.kikimr_cluster.nodes:
            if kikimr.control_plane.get_task_count(node_index, query_id) > 0:
                return node_index
        assert False, "No active graphs found"

    def get_ca_count(self, kikimr, node_index):
        result = kikimr.control_plane.get_sensors(node_index, "utils").find_sensor(
            {"activity": "DQ_COMPUTE_ACTOR", "sensor": "ActorsAliveByActivity", "execpool": "User"}
        )
        return result if result is not None else 0

    def dump_workers(self, kikimr, worker_count, ca_count, wait_time=yatest_common.plain_or_under_sanitizer(30, 150)):
        deadline = time.time() + wait_time
        while True:
            wcs = 0
            ccs = 0
            node_stats = []
            for node_index in kikimr.control_plane.kikimr_cluster.nodes:
                wc = kikimr.control_plane.get_worker_count(node_index)
                cc = self.get_ca_count(kikimr, node_index)
                wcs += wc
                ccs += cc
                node_stats.append([node_index, wc, cc])
            if wcs == worker_count and ccs == ca_count:
                for [node, wc, cc] in node_stats:
                    logging.debug("Node {}, workers {}, ca {}".format(node, wc, cc))
                return
            if time.time() > deadline:
                for [node, wc, cc] in node_stats:
                    logging.debug("Node {}, workers {}, ca {}".format(node, wc, cc))
                assert False, "Workers={} and CAs={}, but {} and {} expected".format(wcs, ccs, worker_count, ca_count)

    @yq_v1
    @pytest.mark.parametrize("kikimr", [(None, None, None)], indirect=["kikimr"])
    def test_program_state_recovery(self, kikimr, client, yq_version):
        self.init_topics(f"pq_kikimr_streaming_{yq_version}")

        sql = R'''
            PRAGMA dq.MaxTasksPerStage="2";
            pragma FeatureR010="prototype";
            pragma config.flags("TimeOrderRecoverDelay", "-1000000");
            pragma config.flags("TimeOrderRecoverAhead", "1000000");

            INSERT INTO myyds.`{output_topic}`
            SELECT ToBytes(Unwrap(Json::SerializeJson(Yson::From(TableRow()))))
            FROM (SELECT * FROM myyds.`{input_topic}`
                  WITH (
                      format=json_each_row,
                      SCHEMA (
                          dt UINT64
                      )))
            MATCH_RECOGNIZE(
                ORDER BY CAST(dt as Timestamp)
                MEASURES
                    LAST(ALL_TRUE.dt) as dt
                ONE ROW PER MATCH
                PATTERN ( ALL_TRUE )
                DEFINE
                    ALL_TRUE as True)''' \
            .format(
                input_topic=self.input_topic,
                output_topic=self.output_topic,
            )

        client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
        client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
        kikimr.compute_plane.wait_zero_checkpoint(query_id)

        master_node_index = self.get_graph_master_node_id(kikimr, query_id)
        logging.debug("Master node {}".format(master_node_index))

        messages1 = ['{"dt": 1696849942400002}', '{"dt": 1696849942000001}']
        self.write_stream(messages1)

        logging.debug("get_completed_checkpoints {}".format(kikimr.compute_plane.get_completed_checkpoints(query_id)))
        kikimr.compute_plane.wait_completed_checkpoints(
            query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 1
        )

        # Restart a non-master node that hosts at least one compute actor.
        node_to_restart = None
        for node_index in kikimr.control_plane.kikimr_cluster.nodes:
            wc = kikimr.control_plane.get_worker_count(node_index)
            if wc is not None:
                if wc > 0 and node_index != master_node_index and node_to_restart is None:
                    node_to_restart = node_index
        assert node_to_restart is not None, "Can't find any task on non-master node"

        logging.debug("Restart non-master node {}".format(node_to_restart))

        kikimr.control_plane.kikimr_cluster.nodes[node_to_restart].stop()
        kikimr.control_plane.kikimr_cluster.nodes[node_to_restart].start()
        kikimr.control_plane.wait_bootstrap(node_to_restart)

        messages2 = [
            '{"dt": 1696849942800000}',
            '{"dt": 1696849943200003}',
            '{"dt": 1696849943300003}',
            '{"dt": 1696849943600003}',
            '{"dt": 1696849943900003}'
        ]
        self.write_stream(messages2)

        assert client.get_query_status(query_id) == fq.QueryMeta.RUNNING

        expected = ['{"dt":1696849942000001}', '{"dt":1696849942400002}', '{"dt":1696849942800000}']

        read_data = self.read_stream(len(expected))
        logging.info("Data was read: {}".format(read_data))

        assert read_data == expected

        client.abort_query(query_id)
        client.wait_query(query_id)

        self.dump_workers(kikimr, 0, 0)
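
The expected list above follows from the time-order recovery window: with TimeOrderRecoverAhead at 1000000 (one second, dt being in microseconds), only events whose dt is at least one second older than the newest dt seen so far are released downstream, in dt order. A standalone sketch of that arithmetic (a model of the expectation, assuming microsecond units; not the engine's implementation):

# Why only three of the seven written messages are expected back.
dts = [1696849942400002, 1696849942000001,            # messages1
       1696849942800000, 1696849943200003,            # messages2
       1696849943300003, 1696849943600003, 1696849943900003]
AHEAD_US = 1000000  # pragma config.flags("TimeOrderRecoverAhead", "1000000")
watermark = max(dts) - AHEAD_US
released = sorted(dt for dt in dts if dt <= watermark)
assert released == [1696849942000001, 1696849942400002, 1696849942800000]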
