Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ-3116 Add restoring issues to query status #3977

Merged
merged 6 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,8 @@ void TCheckpointCoordinator::Handle(const NYql::NDq::TEvDqCompute::TEvRestoreFro
const TString& statusName = NYql::NDqProto::TEvRestoreFromCheckpointResult_ERestoreStatus_Name(status);
CC_LOG_D("[" << checkpoint << "] Got TEvRestoreFromCheckpointResult; taskId: "<< record.GetTaskId()
<< ", checkpoint: " << checkpoint
<< ", status: " << statusName);
<< ", status: " << statusName
<< ", issues: " << record.GetIssues());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not good in common to have multiline string with issues in log. There is special logging method in TIssues, that is better to use here: https://github.com/ydb-platform/ydb/blob/main/ydb/library/yql/public/issue/yql_issue.h#L320


if (!PendingRestoreCheckpoint) {
CC_LOG_E("[" << checkpoint << "] Got TEvRestoreFromCheckpointResult but has no PendingRestoreCheckpoint");
Expand All @@ -301,9 +302,10 @@ void TCheckpointCoordinator::Handle(const NYql::NDq::TEvDqCompute::TEvRestoreFro
}

if (status != NYql::NDqProto::TEvRestoreFromCheckpointResult_ERestoreStatus_OK) {
CC_LOG_E("[" << checkpoint << "] Can't restore: " << statusName);
auto msg = TStringBuilder() << "Can't restore: " << statusName << ", " << record.GetIssues();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not good in common to have multiline string with issues in log. There is special logging method in TIssues, that is better to use here: https://github.com/ydb-platform/ydb/blob/main/ydb/library/yql/public/issue/yql_issue.h#L320

But these issues should be subissues in checkpoint coordinator error message and TEvDqFailure

CC_LOG_E("[" << checkpoint << "] " << msg);
++*Metrics.RestoringError;
NYql::TTaskControllerImpl<TCheckpointCoordinator>::OnError(NYql::NDqProto::StatusIds::ABORTED, "Can't restore: " + statusName, {});
NYql::TTaskControllerImpl<TCheckpointCoordinator>::OnError(NYql::NDqProto::StatusIds::ABORTED, msg, {});
return;
}

Expand Down
8 changes: 7 additions & 1 deletion ydb/library/yql/dq/actors/compute/dq_compute_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <ydb/library/yql/dq/runtime/dq_tasks_runner.h>
#include <ydb/library/yql/dq/runtime/dq_transport.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
#include <ydb/library/yql/public/issue/yql_issue_message.h>

#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/hfunc.h>
Expand Down Expand Up @@ -165,10 +166,15 @@ struct TEvDqCompute {

using TBaseEventPB::TBaseEventPB;

TEvRestoreFromCheckpointResult(const NDqProto::TCheckpoint& checkpoint, ui64 taskId, NDqProto::TEvRestoreFromCheckpointResult::ERestoreStatus status) {
TEvRestoreFromCheckpointResult(
const NDqProto::TCheckpoint& checkpoint,
ui64 taskId,
NDqProto::TEvRestoreFromCheckpointResult::ERestoreStatus status,
const NYql::TIssues& issues) {
Record.MutableCheckpoint()->CopyFrom(checkpoint);
Record.SetTaskId(taskId);
Record.SetStatus(status);
NYql::IssuesToMessage(issues, Record.MutableIssues());
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvRestoreFromCheckpoint::
switch (StateLoadPlan.GetStateType()) {
case NDqProto::NDqStateLoadPlan::STATE_TYPE_EMPTY:
{
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::OK));
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::OK, NYql::TIssues{}));
break;
}
case NDqProto::NDqStateLoadPlan::STATE_TYPE_OWN:
Expand All @@ -301,9 +301,12 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvRestoreFromCheckpoint::
}
default:
{
LOG_CP_E(checkpoint, "Unsupported state type: "
<< NDqProto::NDqStateLoadPlan::EStateType_Name(StateLoadPlan.GetStateType()) << " (" << static_cast<int>(StateLoadPlan.GetStateType()) << ")");
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::INTERNAL_ERROR));
auto message = TStringBuilder() << "Unsupported state type: "
<< NDqProto::NDqStateLoadPlan::EStateType_Name(StateLoadPlan.GetStateType()) << " (" << static_cast<int>(StateLoadPlan.GetStateType()) << ")";
LOG_CP_E(checkpoint, message);
NYql::TIssues issues;
issues.AddIssue(message);
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::INTERNAL_ERROR, issues));
break;
}
}
Expand All @@ -324,13 +327,17 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvGetTaskStateResult::TPt

if (!ev->Get()->Issues.Empty()) {
LOG_CP_E(checkpoint, "TEvGetTaskStateResult error: " << ev->Get()->Issues.ToOneLineString());
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::STORAGE_ERROR), ev->Cookie);
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::STORAGE_ERROR, ev->Get()->Issues), ev->Cookie);
return;
}

if (ev->Get()->States.size() != taskIdsSize) {
LOG_CP_E(checkpoint, "TEvGetTaskStateResult unexpected states count: " << ev->Get()->States.size() << ", expected: " << taskIdsSize);
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::STORAGE_ERROR), ev->Cookie);

auto message = TStringBuilder() << "TEvGetTaskStateResult unexpected states count: " << ev->Get()->States.size() << ", expected: " << taskIdsSize;
LOG_CP_E(checkpoint, message);
NYql::TIssues issues;
issues.AddIssue(message);
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::STORAGE_ERROR, issues), ev->Cookie);
return;
}

Expand All @@ -352,11 +359,14 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvGetTaskStateResult::TPt
void TDqComputeActorCheckpoints::AfterStateLoading(const TMaybe<TString>& error) {
auto& checkpoint = RestoringTaskRunnerForCheckpoint;
if (error.Defined()) {
LOG_CP_E(checkpoint, "Failed to load state: " << error << ", ABORTED");
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::INTERNAL_ERROR), RestoringTaskRunnerForEvent);
auto message = TStringBuilder() << "Failed to load state: " << error << ", ABORTED";
LOG_CP_E(checkpoint, message);
NYql::TIssues issues;
issues.AddIssue(message);
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::INTERNAL_ERROR, issues), RestoringTaskRunnerForEvent);
return;
}
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::OK), RestoringTaskRunnerForEvent);
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::OK, NYql::TIssues{}), RestoringTaskRunnerForEvent);
LOG_CP_D(checkpoint, "Checkpoint state restored");
}

Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/dq/actors/protos/dq_events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ message TEvRestoreFromCheckpointResult {
optional TCheckpoint Checkpoint = 1;
optional uint64 TaskId = 2;
optional ERestoreStatus Status = 3;
repeated Ydb.Issue.IssueMessage Issues = 4;

optional TMessageTransportMeta TransportMeta = 100;
}
Expand Down
42 changes: 42 additions & 0 deletions ydb/tests/fq/yds/test_recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import ydb.tests.library.common.yatest_common as yatest_common
from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimr
from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimrConfig
from ydb.tests.tools.fq_runner.fq_client import StreamingDisposition
from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
from ydb.tests.tools.datastreams_helpers.control_plane import create_stream
Expand Down Expand Up @@ -391,3 +392,44 @@ def test_ic_disconnection(self, client):
time.sleep(yatest_common.plain_or_under_sanitizer(0.5, 2))

close_ic_sessions_future.wait()

@yq_v1
def test_program_state_recovery_error_if_no_states(self, client, kikimr):
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
kikimr.control_plane.wait_bootstrap(node_index)
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
kikimr.control_plane.wait_discovery(node_index)
self.init_topics("error_if_no_states", partitions_count=1)

sql = R'''
INSERT INTO myyds.`{output_topic}`
SELECT STREAM * FROM myyds.`{input_topic}`;'''\
.format(
input_topic=self.input_topic,
output_topic=self.output_topic,
)
client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))

query_id = client.create_query("error_if_no_states", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
kikimr.compute_plane.wait_zero_checkpoint(query_id)
kikimr.compute_plane.wait_completed_checkpoints(query_id, self.kikimr.compute_plane.get_completed_checkpoints(query_id) + 1)

for node_index in kikimr.control_plane.kikimr_cluster.nodes:
kikimr.control_plane.kikimr_cluster.nodes[node_index].stop()

session = kikimr.driver.table_client.session().create()
checkpoint_table_prefix = "/local/CheckpointCoordinatorStorage_" + kikimr.uuid + '/states'
session.transaction().execute(f"DELETE FROM `{checkpoint_table_prefix}`",commit_tx=True)

for node_index in kikimr.control_plane.kikimr_cluster.nodes:
kikimr.control_plane.kikimr_cluster.nodes[node_index].start()
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
kikimr.control_plane.wait_bootstrap(node_index)

client.wait_query_status(query_id, fq.QueryMeta.FAILED)
describe_result = client.describe_query(query_id).result
logging.debug("Describe result: {}".format(describe_result))
describe_string = "{}".format(describe_result)
assert r"Can\'t restore: STORAGE_ERROR" in describe_string
assert r"Checkpoint is not found" in describe_string
Loading