Skip to content

Commit

Permalink
YQ-3116 Add restoring issues to query status (ydb-platform#3977)
Browse files Browse the repository at this point in the history
  • Loading branch information
kardymonds committed May 17, 2024
1 parent 3b35c2f commit d58f69e
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 14 deletions.
8 changes: 5 additions & 3 deletions ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,8 @@ void TCheckpointCoordinator::Handle(const NYql::NDq::TEvDqCompute::TEvRestoreFro
const TString& statusName = NYql::NDqProto::TEvRestoreFromCheckpointResult_ERestoreStatus_Name(status);
CC_LOG_D("[" << checkpoint << "] Got TEvRestoreFromCheckpointResult; taskId: "<< record.GetTaskId()
<< ", checkpoint: " << checkpoint
<< ", status: " << statusName);
<< ", status: " << statusName
<< ", issues: " << NYql::IssuesFromMessageAsString(record.GetIssues()));

if (!PendingRestoreCheckpoint) {
CC_LOG_E("[" << checkpoint << "] Got TEvRestoreFromCheckpointResult but has no PendingRestoreCheckpoint");
Expand All @@ -299,9 +300,10 @@ void TCheckpointCoordinator::Handle(const NYql::NDq::TEvDqCompute::TEvRestoreFro
}

if (status != NYql::NDqProto::TEvRestoreFromCheckpointResult_ERestoreStatus_OK) {
CC_LOG_E("[" << checkpoint << "] Can't restore: " << statusName);
auto msg = TStringBuilder() << "Can't restore: " << statusName << ", " << NYql::IssuesFromMessageAsString(record.GetIssues());
CC_LOG_E("[" << checkpoint << "] " << msg);
++*Metrics.RestoringError;
NYql::TTaskControllerImpl<TCheckpointCoordinator>::OnError(NYql::NDqProto::StatusIds::ABORTED, "Can't restore: " + statusName, {});
NYql::TTaskControllerImpl<TCheckpointCoordinator>::OnError(NYql::NDqProto::StatusIds::ABORTED, msg, {});
return;
}

Expand Down
8 changes: 7 additions & 1 deletion ydb/library/yql/dq/actors/compute/dq_compute_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <ydb/library/yql/dq/runtime/dq_tasks_runner.h>
#include <ydb/library/yql/dq/runtime/dq_transport.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
#include <ydb/library/yql/public/issue/yql_issue_message.h>

#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/hfunc.h>
Expand Down Expand Up @@ -164,10 +165,15 @@ struct TEvDqCompute {

using TBaseEventPB::TBaseEventPB;

TEvRestoreFromCheckpointResult(const NDqProto::TCheckpoint& checkpoint, ui64 taskId, NDqProto::TEvRestoreFromCheckpointResult::ERestoreStatus status) {
TEvRestoreFromCheckpointResult(
const NDqProto::TCheckpoint& checkpoint,
ui64 taskId,
NDqProto::TEvRestoreFromCheckpointResult::ERestoreStatus status,
const NYql::TIssues& issues) {
Record.MutableCheckpoint()->CopyFrom(checkpoint);
Record.SetTaskId(taskId);
Record.SetStatus(status);
NYql::IssuesToMessage(issues, Record.MutableIssues());
}
};

Expand Down
30 changes: 20 additions & 10 deletions ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvRestoreFromCheckpoint::
switch (StateLoadPlan.GetStateType()) {
case NDqProto::NDqStateLoadPlan::STATE_TYPE_EMPTY:
{
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::OK));
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::OK, NYql::TIssues{}));
break;
}
case NDqProto::NDqStateLoadPlan::STATE_TYPE_OWN:
Expand All @@ -301,9 +301,12 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvRestoreFromCheckpoint::
}
default:
{
LOG_CP_E(checkpoint, "Unsupported state type: "
<< NDqProto::NDqStateLoadPlan::EStateType_Name(StateLoadPlan.GetStateType()) << " (" << static_cast<int>(StateLoadPlan.GetStateType()) << ")");
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::INTERNAL_ERROR));
auto message = TStringBuilder() << "Unsupported state type: "
<< NDqProto::NDqStateLoadPlan::EStateType_Name(StateLoadPlan.GetStateType()) << " (" << static_cast<int>(StateLoadPlan.GetStateType()) << ")";
LOG_CP_E(checkpoint, message);
NYql::TIssues issues;
issues.AddIssue(message);
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::INTERNAL_ERROR, issues));
break;
}
}
Expand All @@ -324,13 +327,17 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvGetTaskStateResult::TPt

if (!ev->Get()->Issues.Empty()) {
LOG_CP_E(checkpoint, "TEvGetTaskStateResult error: " << ev->Get()->Issues.ToOneLineString());
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::STORAGE_ERROR), ev->Cookie);
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::STORAGE_ERROR, ev->Get()->Issues), ev->Cookie);
return;
}

if (ev->Get()->States.size() != taskIdsSize) {
LOG_CP_E(checkpoint, "TEvGetTaskStateResult unexpected states count: " << ev->Get()->States.size() << ", expected: " << taskIdsSize);
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::STORAGE_ERROR), ev->Cookie);

auto message = TStringBuilder() << "TEvGetTaskStateResult unexpected states count: " << ev->Get()->States.size() << ", expected: " << taskIdsSize;
LOG_CP_E(checkpoint, message);
NYql::TIssues issues;
issues.AddIssue(message);
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::STORAGE_ERROR, issues), ev->Cookie);
return;
}

Expand All @@ -352,11 +359,14 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvGetTaskStateResult::TPt
void TDqComputeActorCheckpoints::AfterStateLoading(const TMaybe<TString>& error) {
auto& checkpoint = RestoringTaskRunnerForCheckpoint;
if (error.Defined()) {
LOG_CP_E(checkpoint, "Failed to load state: " << error << "ABORTED");
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::INTERNAL_ERROR), RestoringTaskRunnerForEvent);
auto message = TStringBuilder() << "Failed to load state: " << error << ", ABORTED";
LOG_CP_E(checkpoint, message);
NYql::TIssues issues;
issues.AddIssue(message);
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::INTERNAL_ERROR, issues), RestoringTaskRunnerForEvent);
return;
}
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::OK), RestoringTaskRunnerForEvent);
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::OK, NYql::TIssues{}), RestoringTaskRunnerForEvent);
LOG_CP_D(checkpoint, "Checkpoint state restored");
}

Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/dq/actors/protos/dq_events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ message TEvRestoreFromCheckpointResult {
optional TCheckpoint Checkpoint = 1;
optional uint64 TaskId = 2;
optional ERestoreStatus Status = 3;
repeated Ydb.Issue.IssueMessage Issues = 4;

optional TMessageTransportMeta TransportMeta = 100;
}
Expand Down
41 changes: 41 additions & 0 deletions ydb/tests/fq/yds/test_recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,3 +387,44 @@ def test_ic_disconnection(self, client):
time.sleep(yatest_common.plain_or_under_sanitizer(0.5, 2))

close_ic_sessions_future.wait()

@yq_v1
def test_program_state_recovery_error_if_no_states(self, client, kikimr):
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
kikimr.control_plane.wait_bootstrap(node_index)
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
kikimr.control_plane.wait_discovery(node_index)
self.init_topics("error_if_no_states", partitions_count=1)

sql = R'''
INSERT INTO myyds.`{output_topic}`
SELECT STREAM * FROM myyds.`{input_topic}`;'''\
.format(
input_topic=self.input_topic,
output_topic=self.output_topic,
)
client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))

query_id = client.create_query("error_if_no_states", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
kikimr.compute_plane.wait_zero_checkpoint(query_id)
kikimr.compute_plane.wait_completed_checkpoints(query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 1)

for node_index in kikimr.control_plane.kikimr_cluster.nodes:
kikimr.control_plane.kikimr_cluster.nodes[node_index].stop()

session = kikimr.driver.table_client.session().create()
checkpoint_table_prefix = "/local/CheckpointCoordinatorStorage_" + kikimr.uuid + '/states'
session.transaction().execute(f"DELETE FROM `{checkpoint_table_prefix}`", commit_tx=True)

for node_index in kikimr.control_plane.kikimr_cluster.nodes:
kikimr.control_plane.kikimr_cluster.nodes[node_index].start()
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
kikimr.control_plane.wait_bootstrap(node_index)

client.wait_query_status(query_id, fq.QueryMeta.FAILED)
describe_result = client.describe_query(query_id).result
logging.debug("Describe result: {}".format(describe_result))
describe_string = "{}".format(describe_result)
assert r"Can\'t restore: STORAGE_ERROR" in describe_string
assert r"Checkpoint is not found" in describe_string

0 comments on commit d58f69e

Please sign in to comment.