YQ-2704 TCheckpointStorage failed on log (#594)
* use context for log

* remove artifacts after other branch

kardymonds authored Dec 26, 2023
1 parent fd9b5f5 commit 46ac4ea
Showing 3 changed files with 32 additions and 33 deletions.
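In short: the plain LOG_STREAMS_STORAGE_SERVICE_* macros resolve the logger through the thread-local activation context, which is only set on actor threads, so calling them from a NThreading::TFuture continuation (for example a TableClient retry callback) is presumably what made TCheckpointStorage fail on log. The fix captures the actor context while still on the actor thread and passes it to the new *_AS_* macro variants. A minimal sketch of the pattern, assuming an illustrative actor and event type (TExampleActor, TEvPrivate, and StartAsyncWork are not from this commit):

    // Hypothetical actor showing the capture pattern this commit applies.
    void TExampleActor::Handle(TEvPrivate::TEvRequest::TPtr& ev) {
        StartAsyncWork().Apply(
            // Capture the context here, while TlsActivationContext is still valid.
            [context = NActors::TActivationContext::AsActorContext(),
             sender = ev->Sender, cookie = ev->Cookie](const NThreading::TFuture<void>& future) {
                Y_UNUSED(future);
                // Safe on any thread: the macro logs via the captured context
                // instead of the thread-local one.
                LOG_STREAMS_STORAGE_SERVICE_AS_INFO(context, "async step finished");
                context.Send(sender, new TEvPrivate::TEvResponse(), 0, cookie);
            });
    }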
3 changes: 3 additions & 0 deletions ydb/core/fq/libs/actors/logging/log.h
@@ -43,6 +43,9 @@
 #define LOG_STREAMS_STORAGE_SERVICE_TRACE(logRecordStream) LOG_STREAMS_IMPL(TRACE, STREAMS_STORAGE_SERVICE, logRecordStream)
 
 #define LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(actorSystem, DEBUG, STREAMS_STORAGE_SERVICE, logRecordStream)
+#define LOG_STREAMS_STORAGE_SERVICE_AS_INFO(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(actorSystem, INFO, STREAMS_STORAGE_SERVICE, logRecordStream)
+#define LOG_STREAMS_STORAGE_SERVICE_AS_WARN(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(actorSystem, WARN, STREAMS_STORAGE_SERVICE, logRecordStream)
+#define LOG_STREAMS_STORAGE_SERVICE_AS_ERROR(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(actorSystem, ERROR, STREAMS_STORAGE_SERVICE, logRecordStream)
 
 // Component: STREAMS_SCHEDULER_SERVICE.
 #define LOG_STREAMS_SCHEDULER_SERVICE_EMERG(logRecordStream) LOG_STREAMS_IMPL(EMERG, STREAMS_SCHEDULER_SERVICE, logRecordStream)
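For context, the new *_AS_* macros take an explicit actor system or actor context as their first argument. Judging from the local LOG_STORAGE_ASYNC_* helpers they replace (deleted from storage_proxy.cpp below), LOG_STREAMS_IMPL_AS presumably reduces to LOG_LOG_S with the supplied context, roughly:

    // Inferred equivalence; the exact LOG_STREAMS_IMPL_AS expansion is not shown in this diff.
    LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "message")
    // ~> LOG_LOG_S(context, ::NActors::NLog::PRI_WARN, ::NKikimrServices::STREAMS_STORAGE_SERVICE, "message")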
54 changes: 25 additions & 29 deletions ydb/core/fq/libs/checkpoint_storage/storage_proxy.cpp
@@ -21,10 +21,6 @@
 #include <util/string/join.h>
 #include <util/string/strip.h>
 
-#define LOG_STORAGE_ASYNC_DEBUG(actorContext, stream) LOG_LOG_S(actorContext, ::NActors::NLog::PRI_DEBUG, ::NKikimrServices::STREAMS_STORAGE_SERVICE, stream);
-#define LOG_STORAGE_ASYNC_INFO(actorContext, stream) LOG_LOG_S(actorContext, ::NActors::NLog::PRI_INFO, ::NKikimrServices::STREAMS_STORAGE_SERVICE, stream);
-#define LOG_STORAGE_ASYNC_WARN(actorContext, stream) LOG_LOG_S(actorContext, ::NActors::NLog::PRI_WARN, ::NKikimrServices::STREAMS_STORAGE_SERVICE, stream);
-
 namespace NFq {
 
 using namespace NActors;
@@ -149,11 +145,11 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvRegisterCoordinatorRequest::
         auto response = std::make_unique<TEvCheckpointStorage::TEvRegisterCoordinatorResponse>();
         response->Issues = issuesFuture.GetValue();
         if (response->Issues) {
-            LOG_STORAGE_ASYNC_WARN(context, "[" << coordinatorId << "] Failed to register graph: " << response->Issues.ToString())
+            LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << coordinatorId << "] Failed to register graph: " << response->Issues.ToString())
         } else {
-            LOG_STORAGE_ASYNC_INFO(context, "[" << coordinatorId << "] Graph registered")
+            LOG_STREAMS_STORAGE_SERVICE_AS_INFO(context, "[" << coordinatorId << "] Graph registered")
         }
-        LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] Send TEvRegisterCoordinatorResponse")
+        LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] Send TEvRegisterCoordinatorResponse")
         context.Send(sender, response.release(), 0, cookie);
     });
 }
@@ -174,7 +170,7 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCreateCheckpointRequest::TPt
         auto issues = result.second;
 
         if (issues) {
-            LOG_STORAGE_ASYNC_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to fetch total graph checkpoints size: " << issues.ToString());
+            LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to fetch total graph checkpoints size: " << issues.ToString());
             context.Send(sender, new TEvCheckpointStorage::TEvCreateCheckpointResponse(checkpointId, std::move(issues), TString()), 0, cookie);
             return false;
         }
@@ -185,9 +181,9 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCreateCheckpointRequest::TPt
             TStringStream ss;
             ss << "[" << coordinatorId << "] [" << checkpointId << "] Graph checkpoints size limit exceeded: limit " << totalGraphCheckpointsSizeLimit << ", current checkpoints size: " << totalGraphCheckpointsSize;
             auto message = ss.Str();
-            LOG_STORAGE_ASYNC_WARN(context, message)
+            LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, message)
             issues.AddIssue(message);
-            LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCreateCheckpointResponse");
+            LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCreateCheckpointResponse");
             context.Send(sender, new TEvCheckpointStorage::TEvCreateCheckpointResponse(checkpointId, std::move(issues), TString()), 0, cookie);
             return false;
         }
@@ -222,11 +218,11 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCreateCheckpointRequest::TPt
         auto issues = result.second;
         auto response = std::make_unique<TEvCheckpointStorage::TEvCreateCheckpointResponse>(checkpointId, std::move(issues), result.first);
         if (response->Issues) {
-            LOG_STORAGE_ASYNC_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to create checkpoint: " << response->Issues.ToString());
+            LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to create checkpoint: " << response->Issues.ToString());
         } else {
-            LOG_STORAGE_ASYNC_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Checkpoint created");
+            LOG_STREAMS_STORAGE_SERVICE_AS_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Checkpoint created");
         }
-        LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCreateCheckpointResponse");
+        LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCreateCheckpointResponse");
         context.Send(sender, response.release(), 0, cookie);
     });
 }
@@ -244,11 +240,11 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvSetCheckpointPendingCommitSt
         auto issues = issuesFuture.GetValue();
         auto response = std::make_unique<TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusResponse>(checkpointId, std::move(issues));
         if (response->Issues) {
-            LOG_STORAGE_ASYNC_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to set 'PendingCommit' status: " << response->Issues.ToString())
+            LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to set 'PendingCommit' status: " << response->Issues.ToString())
         } else {
-            LOG_STORAGE_ASYNC_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Status updated to 'PendingCommit'")
+            LOG_STREAMS_STORAGE_SERVICE_AS_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Status updated to 'PendingCommit'")
         }
-        LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvSetCheckpointPendingCommitStatusResponse")
+        LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvSetCheckpointPendingCommitStatusResponse")
         context.Send(sender, response.release(), 0, cookie);
     });
 }
@@ -268,16 +264,16 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCompleteCheckpointRequest::T
         auto issues = issuesFuture.GetValue();
         auto response = std::make_unique<TEvCheckpointStorage::TEvCompleteCheckpointResponse>(checkpointId, std::move(issues));
         if (response->Issues) {
-            LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to set 'Completed' status: " << response->Issues.ToString())
+            LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to set 'Completed' status: " << response->Issues.ToString())
         } else {
-            LOG_STORAGE_ASYNC_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Status updated to 'Completed'")
+            LOG_STREAMS_STORAGE_SERVICE_AS_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Status updated to 'Completed'")
             if (gcEnabled) {
                 auto request = std::make_unique<TEvCheckpointStorage::TEvNewCheckpointSucceeded>(coordinatorId, checkpointId);
-                LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvNewCheckpointSucceeded")
+                LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvNewCheckpointSucceeded")
                 context.Send(actorGC, request.release(), 0);
             }
         }
-        LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCompleteCheckpointResponse")
+        LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCompleteCheckpointResponse")
         context.Send(sender, response.release(), 0, cookie);
     });
 }
@@ -294,11 +290,11 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvAbortCheckpointRequest::TPtr
         auto issues = issuesFuture.GetValue();
         auto response = std::make_unique<TEvCheckpointStorage::TEvAbortCheckpointResponse>(checkpointId, std::move(issues));
         if (response->Issues) {
-            LOG_STORAGE_ASYNC_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to abort checkpoint: " << response->Issues.ToString())
+            LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to abort checkpoint: " << response->Issues.ToString())
         } else {
-            LOG_STORAGE_ASYNC_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Checkpoint aborted")
+            LOG_STREAMS_STORAGE_SERVICE_AS_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Checkpoint aborted")
         }
-        LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvAbortCheckpointResponse")
+        LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvAbortCheckpointResponse")
         context.Send(sender, response.release(), 0, cookie);
     });
 }
@@ -314,9 +310,9 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvGetCheckpointsMetadataReques
         auto result = futureResult.GetValue();
         auto response = std::make_unique<TEvCheckpointStorage::TEvGetCheckpointsMetadataResponse>(result.first, result.second);
         if (response->Issues) {
-            LOG_STORAGE_ASYNC_WARN(context, "[" << graphId << "] Failed to get checkpoints: " << response->Issues.ToString())
+            LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << graphId << "] Failed to get checkpoints: " << response->Issues.ToString())
         }
-        LOG_STORAGE_ASYNC_DEBUG(context, "[" << graphId << "] Send TEvGetCheckpointsMetadataResponse")
+        LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << graphId << "] Send TEvGetCheckpointsMetadataResponse")
         context.Send(sender, response.release(), 0, cookie);
     });
 }
@@ -356,12 +352,12 @@ void TStorageProxy::Handle(NYql::NDq::TEvDqCompute::TEvSaveTaskState::TPtr& ev)
         response->Record.SetTaskId(taskId);
 
         if (issues) {
-            LOG_STORAGE_ASYNC_WARN(context, "[" << checkpointId << "] Failed to save task state: task: " << taskId << ", issues: " << issues.ToString())
+            LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << checkpointId << "] Failed to save task state: task: " << taskId << ", issues: " << issues.ToString())
             response->Record.SetStatus(NYql::NDqProto::TEvSaveTaskStateResult::STORAGE_ERROR);
         } else {
             response->Record.SetStatus(NYql::NDqProto::TEvSaveTaskStateResult::OK);
         }
-        LOG_STORAGE_ASYNC_DEBUG(context, "[" << checkpointId << "] Send TEvSaveTaskStateResult")
+        LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << checkpointId << "] Send TEvSaveTaskStateResult")
         context.Send(sender, response.release(), 0, cookie);
     });
 }
@@ -383,9 +379,9 @@ void TStorageProxy::Handle(NYql::NDq::TEvDqCompute::TEvGetTaskState::TPtr& ev) {
         auto response = std::make_unique<NYql::NDq::TEvDqCompute::TEvGetTaskStateResult>(checkpointId, result.second, generation);
         std::swap(response->States, result.first);
         if (response->Issues) {
-            LOG_STORAGE_ASYNC_WARN(context, "[" << checkpointId << "] Failed to get task state: taskIds: {" << JoinSeq(", ", taskIds) << "}, issues: " << response->Issues.ToString());
+            LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << checkpointId << "] Failed to get task state: taskIds: {" << JoinSeq(", ", taskIds) << "}, issues: " << response->Issues.ToString());
         }
-        LOG_STORAGE_ASYNC_DEBUG(context, "[" << checkpointId << "] Send TEvGetTaskStateResult");
+        LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << checkpointId << "] Send TEvGetTaskStateResult");
         context.Send(sender, response.release(), 0, cookie);
     });
 }
8 changes: 4 additions & 4 deletions ydb/core/fq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp
@@ -1057,7 +1057,8 @@ TFuture<TIssues> TCheckpointStorage::DeleteMarkedCheckpoints(
 TFuture<ICheckpointStorage::TGetTotalCheckpointsStateSizeResult> TCheckpointStorage::GetTotalCheckpointsStateSize(const TString& graphId) {
     auto result = MakeIntrusive<TGetTotalCheckpointsStateSizeContext>();
     auto future = YdbConnection->TableClient.RetryOperation(
-        [prefix = YdbConnection->TablePathPrefix, graphId, thisPtr = TIntrusivePtr(this), result](TSession session) {
+        [prefix = YdbConnection->TablePathPrefix, graphId, thisPtr = TIntrusivePtr(this), result,
+         context = NActors::TActivationContext::AsActorContext()](TSession session) {
             NYdb::TParamsBuilder paramsBuilder;
             paramsBuilder.AddParam("$graph_id").String(graphId).Build();
             auto params = paramsBuilder.Build();
@@ -1079,13 +1080,12 @@ TFuture<ICheckpointStorage::TGetTotalCheckpointsStateSizeResult> TCheckpointStor
             params,
             thisPtr->DefaultExecDataQuerySettings())
             .Apply(
-                [graphId, result](const TFuture<TDataQueryResult>& future) {
+                [graphId, result, context](const TFuture<TDataQueryResult>& future) {
                     const auto& queryResult = future.GetValue();
                     auto status = TStatus(queryResult);
 
                     if (!queryResult.IsSuccess()) {
-                        LOG_STREAMS_STORAGE_SERVICE_ERROR(TStringBuilder() << "GetTotalCheckpointsStateSize: can't get total graph's checkpoints size [" << graphId << "] " << queryResult.GetIssues().ToString());
-                        return status;
+                        LOG_STREAMS_STORAGE_SERVICE_AS_ERROR(context, TStringBuilder() << "GetTotalCheckpointsStateSize: can't get total graph's checkpoints size [" << graphId << "] " << queryResult.GetIssues().ToString()); return status;
                     }
 
                     TResultSetParser parser = queryResult.GetResultSetParser(0);
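Note that in this last file the context must be captured in the outer RetryOperation lambda: TableClient continuations run on the YDB SDK's own threads, where the old LOG_STREAMS_STORAGE_SERVICE_ERROR macro would presumably have gone through an unset thread-local activation context. Capturing NActors::TActivationContext::AsActorContext() once, on the actor thread that starts the operation, lets every nested callback log through the *_AS_* variants safely.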
