diff --git a/ydb/core/fq/libs/actors/logging/log.h b/ydb/core/fq/libs/actors/logging/log.h index b7ecaf35c5d9..d5513bc49b78 100644 --- a/ydb/core/fq/libs/actors/logging/log.h +++ b/ydb/core/fq/libs/actors/logging/log.h @@ -43,6 +43,9 @@ #define LOG_STREAMS_STORAGE_SERVICE_TRACE(logRecordStream) LOG_STREAMS_IMPL(TRACE, STREAMS_STORAGE_SERVICE, logRecordStream) #define LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(actorSystem, DEBUG, STREAMS_STORAGE_SERVICE, logRecordStream) +#define LOG_STREAMS_STORAGE_SERVICE_AS_INFO(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(actorSystem, INFO, STREAMS_STORAGE_SERVICE, logRecordStream) +#define LOG_STREAMS_STORAGE_SERVICE_AS_WARN(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(actorSystem, WARN, STREAMS_STORAGE_SERVICE, logRecordStream) +#define LOG_STREAMS_STORAGE_SERVICE_AS_ERROR(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(actorSystem, ERROR, STREAMS_STORAGE_SERVICE, logRecordStream) // Component: STREAMS_SCHEDULER_SERVICE. #define LOG_STREAMS_SCHEDULER_SERVICE_EMERG(logRecordStream) LOG_STREAMS_IMPL(EMERG, STREAMS_SCHEDULER_SERVICE, logRecordStream) diff --git a/ydb/core/fq/libs/checkpoint_storage/storage_proxy.cpp b/ydb/core/fq/libs/checkpoint_storage/storage_proxy.cpp index d27c4e8f7c76..d44002580ba8 100644 --- a/ydb/core/fq/libs/checkpoint_storage/storage_proxy.cpp +++ b/ydb/core/fq/libs/checkpoint_storage/storage_proxy.cpp @@ -21,10 +21,6 @@ #include #include -#define LOG_STORAGE_ASYNC_DEBUG(actorContext, stream) LOG_LOG_S(actorContext, ::NActors::NLog::PRI_DEBUG, ::NKikimrServices::STREAMS_STORAGE_SERVICE, stream); -#define LOG_STORAGE_ASYNC_INFO(actorContext, stream) LOG_LOG_S(actorContext, ::NActors::NLog::PRI_INFO, ::NKikimrServices::STREAMS_STORAGE_SERVICE, stream); -#define LOG_STORAGE_ASYNC_WARN(actorContext, stream) LOG_LOG_S(actorContext, ::NActors::NLog::PRI_WARN, ::NKikimrServices::STREAMS_STORAGE_SERVICE, stream); - namespace NFq { using namespace NActors; @@ -90,6 +86,10 @@ static void FillDefaultParameters(NConfig::TCheckpointCoordinatorConfig& checkpo limits.SetMaxTaskStateSizeBytes(1099511627776); } + if (!limits.GetMaxRowSizeBytes()) { + limits.SetMaxRowSizeBytes(8000000); + } + if (!checkpointCoordinatorConfig.GetStorage().GetToken() && checkpointCoordinatorConfig.GetStorage().GetOAuthFile()) { checkpointCoordinatorConfig.MutableStorage()->SetToken(StripString(TFileInput(checkpointCoordinatorConfig.GetStorage().GetOAuthFile()).ReadAll())); } @@ -119,7 +119,7 @@ void TStorageProxy::Bootstrap() { LOG_STREAMS_STORAGE_SERVICE_ERROR("Failed to init checkpoint storage: " << issues.ToOneLineString()); } - StateStorage = NewYdbStateStorage(StorageConfig, CredentialsProviderFactory, YqSharedResources); + StateStorage = NewYdbStateStorage(Config, CredentialsProviderFactory, YqSharedResources); issues = StateStorage->Init().GetValueSync(); if (!issues.Empty()) { LOG_STREAMS_STORAGE_SERVICE_ERROR("Failed to init checkpoint state storage: " << issues.ToOneLineString()); @@ -149,11 +149,11 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvRegisterCoordinatorRequest:: auto response = std::make_unique(); response->Issues = issuesFuture.GetValue(); if (response->Issues) { - LOG_STORAGE_ASYNC_WARN(context, "[" << coordinatorId << "] Failed to register graph: " << response->Issues.ToString()) + LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << coordinatorId << "] Failed to register graph: " << response->Issues.ToString()) } else { - LOG_STORAGE_ASYNC_INFO(context, "[" << coordinatorId << "] Graph registered") + LOG_STREAMS_STORAGE_SERVICE_AS_INFO(context, "[" << coordinatorId << "] Graph registered") } - LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] Send TEvRegisterCoordinatorResponse") + LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] Send TEvRegisterCoordinatorResponse") context.Send(sender, response.release(), 0, cookie); }); } @@ -174,7 +174,7 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCreateCheckpointRequest::TPt auto issues = result.second; if (issues) { - LOG_STORAGE_ASYNC_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to fetch total graph checkpoints size: " << issues.ToString()); + LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to fetch total graph checkpoints size: " << issues.ToString()); context.Send(sender, new TEvCheckpointStorage::TEvCreateCheckpointResponse(checkpointId, std::move(issues), TString()), 0, cookie); return false; } @@ -185,9 +185,9 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCreateCheckpointRequest::TPt TStringStream ss; ss << "[" << coordinatorId << "] [" << checkpointId << "] Graph checkpoints size limit exceeded: limit " << totalGraphCheckpointsSizeLimit << ", current checkpoints size: " << totalGraphCheckpointsSize; auto message = ss.Str(); - LOG_STORAGE_ASYNC_WARN(context, message) + LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, message) issues.AddIssue(message); - LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCreateCheckpointResponse"); + LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCreateCheckpointResponse"); context.Send(sender, new TEvCheckpointStorage::TEvCreateCheckpointResponse(checkpointId, std::move(issues), TString()), 0, cookie); return false; } @@ -222,11 +222,11 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCreateCheckpointRequest::TPt auto issues = result.second; auto response = std::make_unique(checkpointId, std::move(issues), result.first); if (response->Issues) { - LOG_STORAGE_ASYNC_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to create checkpoint: " << response->Issues.ToString()); + LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to create checkpoint: " << response->Issues.ToString()); } else { - LOG_STORAGE_ASYNC_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Checkpoint created"); + LOG_STREAMS_STORAGE_SERVICE_AS_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Checkpoint created"); } - LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCreateCheckpointResponse"); + LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCreateCheckpointResponse"); context.Send(sender, response.release(), 0, cookie); }); } @@ -244,11 +244,11 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvSetCheckpointPendingCommitSt auto issues = issuesFuture.GetValue(); auto response = std::make_unique(checkpointId, std::move(issues)); if (response->Issues) { - LOG_STORAGE_ASYNC_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to set 'PendingCommit' status: " << response->Issues.ToString()) + LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to set 'PendingCommit' status: " << response->Issues.ToString()) } else { - LOG_STORAGE_ASYNC_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Status updated to 'PendingCommit'") + LOG_STREAMS_STORAGE_SERVICE_AS_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Status updated to 'PendingCommit'") } - LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvSetCheckpointPendingCommitStatusResponse") + LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvSetCheckpointPendingCommitStatusResponse") context.Send(sender, response.release(), 0, cookie); }); } @@ -268,16 +268,16 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCompleteCheckpointRequest::T auto issues = issuesFuture.GetValue(); auto response = std::make_unique(checkpointId, std::move(issues)); if (response->Issues) { - LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to set 'Completed' status: " << response->Issues.ToString()) + LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to set 'Completed' status: " << response->Issues.ToString()) } else { - LOG_STORAGE_ASYNC_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Status updated to 'Completed'") + LOG_STREAMS_STORAGE_SERVICE_AS_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Status updated to 'Completed'") if (gcEnabled) { auto request = std::make_unique(coordinatorId, checkpointId); - LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvNewCheckpointSucceeded") + LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvNewCheckpointSucceeded") context.Send(actorGC, request.release(), 0); } } - LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCompleteCheckpointResponse") + LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCompleteCheckpointResponse") context.Send(sender, response.release(), 0, cookie); }); } @@ -294,11 +294,11 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvAbortCheckpointRequest::TPtr auto issues = issuesFuture.GetValue(); auto response = std::make_unique(checkpointId, std::move(issues)); if (response->Issues) { - LOG_STORAGE_ASYNC_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to abort checkpoint: " << response->Issues.ToString()) + LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to abort checkpoint: " << response->Issues.ToString()) } else { - LOG_STORAGE_ASYNC_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Checkpoint aborted") + LOG_STREAMS_STORAGE_SERVICE_AS_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Checkpoint aborted") } - LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvAbortCheckpointResponse") + LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvAbortCheckpointResponse") context.Send(sender, response.release(), 0, cookie); }); } @@ -314,9 +314,9 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvGetCheckpointsMetadataReques auto result = futureResult.GetValue(); auto response = std::make_unique(result.first, result.second); if (response->Issues) { - LOG_STORAGE_ASYNC_WARN(context, "[" << graphId << "] Failed to get checkpoints: " << response->Issues.ToString()) + LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << graphId << "] Failed to get checkpoints: " << response->Issues.ToString()) } - LOG_STORAGE_ASYNC_DEBUG(context, "[" << graphId << "] Send TEvGetCheckpointsMetadataResponse") + LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << graphId << "] Send TEvGetCheckpointsMetadataResponse") context.Send(sender, response.release(), 0, cookie); }); } @@ -356,12 +356,12 @@ void TStorageProxy::Handle(NYql::NDq::TEvDqCompute::TEvSaveTaskState::TPtr& ev) response->Record.SetTaskId(taskId); if (issues) { - LOG_STORAGE_ASYNC_WARN(context, "[" << checkpointId << "] Failed to save task state: task: " << taskId << ", issues: " << issues.ToString()) + LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << checkpointId << "] Failed to save task state: task: " << taskId << ", issues: " << issues.ToString()) response->Record.SetStatus(NYql::NDqProto::TEvSaveTaskStateResult::STORAGE_ERROR); } else { response->Record.SetStatus(NYql::NDqProto::TEvSaveTaskStateResult::OK); } - LOG_STORAGE_ASYNC_DEBUG(context, "[" << checkpointId << "] Send TEvSaveTaskStateResult") + LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << checkpointId << "] Send TEvSaveTaskStateResult") context.Send(sender, response.release(), 0, cookie); }); } @@ -383,9 +383,9 @@ void TStorageProxy::Handle(NYql::NDq::TEvDqCompute::TEvGetTaskState::TPtr& ev) { auto response = std::make_unique(checkpointId, result.second, generation); std::swap(response->States, result.first); if (response->Issues) { - LOG_STORAGE_ASYNC_WARN(context, "[" << checkpointId << "] Failed to get task state: taskIds: {" << JoinSeq(", ", taskIds) << "}, issues: " << response->Issues.ToString()); + LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << checkpointId << "] Failed to get task state: taskIds: {" << JoinSeq(", ", taskIds) << "}, issues: " << response->Issues.ToString()); } - LOG_STORAGE_ASYNC_DEBUG(context, "[" << checkpointId << "] Send TEvGetTaskStateResult"); + LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << checkpointId << "] Send TEvGetTaskStateResult"); context.Send(sender, response.release(), 0, cookie); }); } diff --git a/ydb/core/fq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp b/ydb/core/fq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp index 638c47c395e3..3f4a228b93f2 100644 --- a/ydb/core/fq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp +++ b/ydb/core/fq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp @@ -1057,7 +1057,8 @@ TFuture TCheckpointStorage::DeleteMarkedCheckpoints( TFuture TCheckpointStorage::GetTotalCheckpointsStateSize(const TString& graphId) { auto result = MakeIntrusive(); auto future = YdbConnection->TableClient.RetryOperation( - [prefix = YdbConnection->TablePathPrefix, graphId, thisPtr = TIntrusivePtr(this), result](TSession session) { + [prefix = YdbConnection->TablePathPrefix, graphId, thisPtr = TIntrusivePtr(this), result, + context = NActors::TActivationContext::AsActorContext()](TSession session) { NYdb::TParamsBuilder paramsBuilder; paramsBuilder.AddParam("$graph_id").String(graphId).Build(); auto params = paramsBuilder.Build(); @@ -1079,13 +1080,12 @@ TFuture TCheckpointStor params, thisPtr->DefaultExecDataQuerySettings()) .Apply( - [graphId, result](const TFuture& future) { + [graphId, result, context](const TFuture& future) { const auto& queryResult = future.GetValue(); auto status = TStatus(queryResult); if (!queryResult.IsSuccess()) { - LOG_STREAMS_STORAGE_SERVICE_ERROR(TStringBuilder() << "GetTotalCheckpointsStateSize: can't get total graph's checkpoints size [" << graphId << "] " << queryResult.GetIssues().ToString()); - return status; + LOG_STREAMS_STORAGE_SERVICE_AS_ERROR(context, TStringBuilder() << "GetTotalCheckpointsStateSize: can't get total graph's checkpoints size [" << graphId << "] " << queryResult.GetIssues().ToString()); return status; } TResultSetParser parser = queryResult.GetResultSetParser(0);