Skip to content

Commit

Permalink
Merge e3b3419 into ca6b15f
Browse files Browse the repository at this point in the history
  • Loading branch information
kardymonds authored Apr 22, 2024
2 parents ca6b15f + e3b3419 commit afcaca2
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 11 deletions.
8 changes: 7 additions & 1 deletion ydb/library/yql/dq/actors/compute/dq_compute_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <ydb/library/yql/dq/runtime/dq_tasks_runner.h>
#include <ydb/library/yql/dq/runtime/dq_transport.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
#include <ydb/library/yql/public/issue/yql_issue_message.h>

#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/hfunc.h>
Expand Down Expand Up @@ -165,10 +166,15 @@ struct TEvDqCompute {

using TBaseEventPB::TBaseEventPB;

TEvRestoreFromCheckpointResult(const NDqProto::TCheckpoint& checkpoint, ui64 taskId, NDqProto::TEvRestoreFromCheckpointResult::ERestoreStatus status) {
TEvRestoreFromCheckpointResult(
const NDqProto::TCheckpoint& checkpoint,
ui64 taskId,
NDqProto::TEvRestoreFromCheckpointResult::ERestoreStatus status,
const NYql::TIssues& issues) {
Record.MutableCheckpoint()->CopyFrom(checkpoint);
Record.SetTaskId(taskId);
Record.SetStatus(status);
NYql::IssuesToMessage(issues, Record.MutableIssues());
}
};

Expand Down
30 changes: 20 additions & 10 deletions ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvRestoreFromCheckpoint::
switch (StateLoadPlan.GetStateType()) {
case NDqProto::NDqStateLoadPlan::STATE_TYPE_EMPTY:
{
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::OK));
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::OK, NYql::TIssues{}));
break;
}
case NDqProto::NDqStateLoadPlan::STATE_TYPE_OWN:
Expand All @@ -301,9 +301,12 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvRestoreFromCheckpoint::
}
default:
{
LOG_CP_E(checkpoint, "Unsupported state type: "
<< NDqProto::NDqStateLoadPlan::EStateType_Name(StateLoadPlan.GetStateType()) << " (" << static_cast<int>(StateLoadPlan.GetStateType()) << ")");
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::INTERNAL_ERROR));
auto message = TStringBuilder() << "Unsupported state type: "
<< NDqProto::NDqStateLoadPlan::EStateType_Name(StateLoadPlan.GetStateType()) << " (" << static_cast<int>(StateLoadPlan.GetStateType()) << ")";
LOG_CP_E(checkpoint, message);
NYql::TIssues issues;
issues.AddIssue(message);
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::INTERNAL_ERROR, issues));
break;
}
}
Expand All @@ -324,13 +327,17 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvGetTaskStateResult::TPt

if (!ev->Get()->Issues.Empty()) {
LOG_CP_E(checkpoint, "TEvGetTaskStateResult error: " << ev->Get()->Issues.ToOneLineString());
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::STORAGE_ERROR), ev->Cookie);
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::STORAGE_ERROR, ev->Get()->Issues), ev->Cookie);
return;
}

if (ev->Get()->States.size() != taskIdsSize) {
LOG_CP_E(checkpoint, "TEvGetTaskStateResult unexpected states count: " << ev->Get()->States.size() << ", expected: " << taskIdsSize);
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::STORAGE_ERROR), ev->Cookie);

auto message = TStringBuilder() << "TEvGetTaskStateResult unexpected states count: " << ev->Get()->States.size() << ", expected: " << taskIdsSize;
LOG_CP_E(checkpoint, message);
NYql::TIssues issues;
issues.AddIssue(message);
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::STORAGE_ERROR, issues), ev->Cookie);
return;
}

Expand All @@ -352,11 +359,14 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvGetTaskStateResult::TPt
void TDqComputeActorCheckpoints::AfterStateLoading(const TMaybe<TString>& error) {
auto& checkpoint = RestoringTaskRunnerForCheckpoint;
if (error.Defined()) {
LOG_CP_E(checkpoint, "Failed to load state: " << error << ", ABORTED");
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::INTERNAL_ERROR), RestoringTaskRunnerForEvent);
auto message = TStringBuilder() << "Failed to load state: " << error << ", ABORTED";
LOG_CP_E(checkpoint, message);
NYql::TIssues issues;
issues.AddIssue(message);
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::INTERNAL_ERROR, issues), RestoringTaskRunnerForEvent);
return;
}
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::OK), RestoringTaskRunnerForEvent);
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::OK, NYql::TIssues{}), RestoringTaskRunnerForEvent);
LOG_CP_D(checkpoint, "Checkpoint state restored");
}

Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/dq/actors/protos/dq_events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ message TEvRestoreFromCheckpointResult {
optional TCheckpoint Checkpoint = 1;
optional uint64 TaskId = 2;
optional ERestoreStatus Status = 3;
repeated Ydb.Issue.IssueMessage Issues = 4;

optional TMessageTransportMeta TransportMeta = 100;
}
Expand Down
42 changes: 42 additions & 0 deletions ydb/tests/fq/yds/test_recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import ydb.tests.library.common.yatest_common as yatest_common
from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimr
from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimrConfig
from ydb.tests.tools.fq_runner.fq_client import StreamingDisposition
from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
from ydb.tests.tools.datastreams_helpers.control_plane import create_stream
Expand Down Expand Up @@ -391,3 +392,44 @@ def test_ic_disconnection(self, client):
time.sleep(yatest_common.plain_or_under_sanitizer(0.5, 2))

close_ic_sessions_future.wait()

@yq_v1
def test_program_state_recovery_error_if_no_states(self, client, kikimr):
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
kikimr.control_plane.wait_bootstrap(node_index)
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
kikimr.control_plane.wait_discovery(node_index)
self.init_topics("error_if_no_states", partitions_count=1)

sql = R'''
INSERT INTO myyds.`{output_topic}`
SELECT STREAM * FROM myyds.`{input_topic}`;'''\
.format(
input_topic=self.input_topic,
output_topic=self.output_topic,
)
client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))

query_id = client.create_query("error_if_no_states", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
kikimr.compute_plane.wait_zero_checkpoint(query_id)
kikimr.compute_plane.wait_completed_checkpoints(query_id, self.kikimr.compute_plane.get_completed_checkpoints(query_id) + 1)

for node_index in kikimr.control_plane.kikimr_cluster.nodes:
kikimr.control_plane.kikimr_cluster.nodes[node_index].stop()

session = kikimr.driver.table_client.session().create()
checkpoint_table_prefix = "/local/CheckpointCoordinatorStorage_" + kikimr.uuid + '/states'
session.transaction().execute(f"DELETE FROM `{checkpoint_table_prefix}`",commit_tx=True)

for node_index in kikimr.control_plane.kikimr_cluster.nodes:
kikimr.control_plane.kikimr_cluster.nodes[node_index].start()
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
kikimr.control_plane.wait_bootstrap(node_index)

client.wait_query_status(query_id, fq.QueryMeta.FAILED)
describe_result = client.describe_query(query_id).result
logging.debug("Describe result: {}".format(describe_result))
describe_string = "{}".format(describe_result)
assert r"Can\'t restore: STORAGE_ERROR" in describe_string
assert r"Checkpoint is not found" in describe_string

0 comments on commit afcaca2

Please sign in to comment.