diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h index 31c153de8824..d91ab6de1fda 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h @@ -9,6 +9,7 @@ #include #include #include +#include #include #include @@ -165,10 +166,15 @@ struct TEvDqCompute { using TBaseEventPB::TBaseEventPB; - TEvRestoreFromCheckpointResult(const NDqProto::TCheckpoint& checkpoint, ui64 taskId, NDqProto::TEvRestoreFromCheckpointResult::ERestoreStatus status) { + TEvRestoreFromCheckpointResult( + const NDqProto::TCheckpoint& checkpoint, + ui64 taskId, + NDqProto::TEvRestoreFromCheckpointResult::ERestoreStatus status, + const NYql::TIssues& issues) { Record.MutableCheckpoint()->CopyFrom(checkpoint); Record.SetTaskId(taskId); Record.SetStatus(status); + NYql::IssuesToMessage(issues, Record.MutableIssues()); } }; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp index b3a8579a3d8d..a44f58756aa2 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp @@ -274,7 +274,7 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvRestoreFromCheckpoint:: switch (StateLoadPlan.GetStateType()) { case NDqProto::NDqStateLoadPlan::STATE_TYPE_EMPTY: { - EventsQueue.Send(MakeHolder(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::OK)); + EventsQueue.Send(MakeHolder(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::OK, NYql::TIssues{})); break; } case NDqProto::NDqStateLoadPlan::STATE_TYPE_OWN: @@ -301,9 +301,12 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvRestoreFromCheckpoint:: } default: { - LOG_CP_E(checkpoint, "Unsupported state type: " - << NDqProto::NDqStateLoadPlan::EStateType_Name(StateLoadPlan.GetStateType()) << " (" << static_cast(StateLoadPlan.GetStateType()) << ")"); - EventsQueue.Send(MakeHolder(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::INTERNAL_ERROR)); + auto message = TStringBuilder() << "Unsupported state type: " + << NDqProto::NDqStateLoadPlan::EStateType_Name(StateLoadPlan.GetStateType()) << " (" << static_cast(StateLoadPlan.GetStateType()) << ")"; + LOG_CP_E(checkpoint, message); + NYql::TIssues issues; + issues.AddIssue(message); + EventsQueue.Send(MakeHolder(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::INTERNAL_ERROR, issues)); break; } } @@ -324,13 +327,17 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvGetTaskStateResult::TPt if (!ev->Get()->Issues.Empty()) { LOG_CP_E(checkpoint, "TEvGetTaskStateResult error: " << ev->Get()->Issues.ToOneLineString()); - EventsQueue.Send(MakeHolder(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::STORAGE_ERROR), ev->Cookie); + EventsQueue.Send(MakeHolder(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::STORAGE_ERROR, ev->Get()->Issues), ev->Cookie); return; } if (ev->Get()->States.size() != taskIdsSize) { - LOG_CP_E(checkpoint, "TEvGetTaskStateResult unexpected states count: " << ev->Get()->States.size() << ", expected: " << taskIdsSize); - EventsQueue.Send(MakeHolder(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::STORAGE_ERROR), ev->Cookie); + + auto message = TStringBuilder() << "TEvGetTaskStateResult unexpected states count: " << ev->Get()->States.size() << ", expected: " << taskIdsSize; + LOG_CP_E(checkpoint, message); + NYql::TIssues issues; + issues.AddIssue(message); + EventsQueue.Send(MakeHolder(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::STORAGE_ERROR, issues), ev->Cookie); return; } @@ -352,11 +359,14 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvGetTaskStateResult::TPt void TDqComputeActorCheckpoints::AfterStateLoading(const TMaybe& error) { auto& checkpoint = RestoringTaskRunnerForCheckpoint; if (error.Defined()) { - LOG_CP_E(checkpoint, "Failed to load state: " << error << ", ABORTED"); - EventsQueue.Send(MakeHolder(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::INTERNAL_ERROR), RestoringTaskRunnerForEvent); + auto message = TStringBuilder() << "Failed to load state: " << error << ", ABORTED"; + LOG_CP_E(checkpoint, message); + NYql::TIssues issues; + issues.AddIssue(message); + EventsQueue.Send(MakeHolder(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::INTERNAL_ERROR, issues), RestoringTaskRunnerForEvent); return; } - EventsQueue.Send(MakeHolder(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::OK), RestoringTaskRunnerForEvent); + EventsQueue.Send(MakeHolder(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::OK, NYql::TIssues{}), RestoringTaskRunnerForEvent); LOG_CP_D(checkpoint, "Checkpoint state restored"); } diff --git a/ydb/library/yql/dq/actors/protos/dq_events.proto b/ydb/library/yql/dq/actors/protos/dq_events.proto index 1a61ff2358c6..22d49898ba7b 100644 --- a/ydb/library/yql/dq/actors/protos/dq_events.proto +++ b/ydb/library/yql/dq/actors/protos/dq_events.proto @@ -204,6 +204,7 @@ message TEvRestoreFromCheckpointResult { optional TCheckpoint Checkpoint = 1; optional uint64 TaskId = 2; optional ERestoreStatus Status = 3; + repeated Ydb.Issue.IssueMessage Issues = 4; optional TMessageTransportMeta TransportMeta = 100; } diff --git a/ydb/tests/fq/yds/test_recovery.py b/ydb/tests/fq/yds/test_recovery.py index c9f0a4649136..f413b78bc6e6 100644 --- a/ydb/tests/fq/yds/test_recovery.py +++ b/ydb/tests/fq/yds/test_recovery.py @@ -14,6 +14,7 @@ import ydb.tests.library.common.yatest_common as yatest_common from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimr from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimrConfig +from ydb.tests.tools.fq_runner.fq_client import StreamingDisposition from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1 from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase from ydb.tests.tools.datastreams_helpers.control_plane import create_stream @@ -391,3 +392,44 @@ def test_ic_disconnection(self, client): time.sleep(yatest_common.plain_or_under_sanitizer(0.5, 2)) close_ic_sessions_future.wait() + + @yq_v1 + def test_program_state_recovery_error_if_no_states(self, client, kikimr): + for node_index in kikimr.control_plane.kikimr_cluster.nodes: + kikimr.control_plane.wait_bootstrap(node_index) + for node_index in kikimr.control_plane.kikimr_cluster.nodes: + kikimr.control_plane.wait_discovery(node_index) + self.init_topics("error_if_no_states", partitions_count=1) + + sql = R''' + INSERT INTO myyds.`{output_topic}` + SELECT STREAM * FROM myyds.`{input_topic}`;'''\ + .format( + input_topic=self.input_topic, + output_topic=self.output_topic, + ) + client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT")) + + query_id = client.create_query("error_if_no_states", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.RUNNING) + kikimr.compute_plane.wait_zero_checkpoint(query_id) + kikimr.compute_plane.wait_completed_checkpoints(query_id, self.kikimr.compute_plane.get_completed_checkpoints(query_id) + 1) + + for node_index in kikimr.control_plane.kikimr_cluster.nodes: + kikimr.control_plane.kikimr_cluster.nodes[node_index].stop() + + session = kikimr.driver.table_client.session().create() + checkpoint_table_prefix = "/local/CheckpointCoordinatorStorage_" + kikimr.uuid + '/states' + session.transaction().execute(f"DELETE FROM `{checkpoint_table_prefix}`",commit_tx=True) + + for node_index in kikimr.control_plane.kikimr_cluster.nodes: + kikimr.control_plane.kikimr_cluster.nodes[node_index].start() + for node_index in kikimr.control_plane.kikimr_cluster.nodes: + kikimr.control_plane.wait_bootstrap(node_index) + + client.wait_query_status(query_id, fq.QueryMeta.FAILED) + describe_result = client.describe_query(query_id).result + logging.debug("Describe result: {}".format(describe_result)) + describe_string = "{}".format(describe_result) + assert r"Can\'t restore: STORAGE_ERROR" in describe_string + assert r"Checkpoint is not found" in describe_string