diff --git a/ydb/core/fq/libs/control_plane_storage/control_plane_storage_counters.cpp b/ydb/core/fq/libs/control_plane_storage/control_plane_storage_counters.cpp index a8f29302fb62..8cec85fbabca 100644 --- a/ydb/core/fq/libs/control_plane_storage/control_plane_storage_counters.cpp +++ b/ydb/core/fq/libs/control_plane_storage/control_plane_storage_counters.cpp @@ -27,6 +27,7 @@ void TRequestCommonCounters::Register(const ::NMonitoring::TDynamicCounterPtr& c InFly = requestCounters->GetCounter("InFly", false); Ok = requestCounters->GetCounter("Ok", true); Error = requestCounters->GetCounter("Error", true); + ParseProtobufError = requestCounters->GetCounter("ParseProtobufError", true); Retry = requestCounters->GetCounter("Retry", true); RequestBytes = requestCounters->GetCounter("RequestBytes", true); ResponseBytes = requestCounters->GetCounter("ResponseBytes", true); diff --git a/ydb/core/fq/libs/control_plane_storage/control_plane_storage_counters.h b/ydb/core/fq/libs/control_plane_storage/control_plane_storage_counters.h index 9d19efdfd62f..0894b54be53b 100644 --- a/ydb/core/fq/libs/control_plane_storage/control_plane_storage_counters.h +++ b/ydb/core/fq/libs/control_plane_storage/control_plane_storage_counters.h @@ -30,6 +30,7 @@ class TRequestCommonCounters: public virtual TThrRefBase { ::NMonitoring::TDynamicCounters::TCounterPtr InFly; ::NMonitoring::TDynamicCounters::TCounterPtr Ok; ::NMonitoring::TDynamicCounters::TCounterPtr Error; + ::NMonitoring::TDynamicCounters::TCounterPtr ParseProtobufError; ::NMonitoring::TDynamicCounters::TCounterPtr Retry; ::NMonitoring::TDynamicCounters::TCounterPtr RequestBytes; ::NMonitoring::TDynamicCounters::TCounterPtr ResponseBytes; diff --git a/ydb/core/fq/libs/control_plane_storage/extractors.h b/ydb/core/fq/libs/control_plane_storage/extractors.h index 725b19557927..d669825db024 100644 --- a/ydb/core/fq/libs/control_plane_storage/extractors.h +++ b/ydb/core/fq/libs/control_plane_storage/extractors.h @@ -2,8 +2,11 @@ #include "validators.h" +#include + #include + namespace NFq { template @@ -13,7 +16,8 @@ TValidationQuery CreateEntityExtractor(const TString& scope, const TString& idColumnName, const TString& tableName, std::shared_ptr> response, - const TString& tablePathPrefix) { + const TString& tablePathPrefix, + const ::NMonitoring::TDynamicCounters::TCounterPtr& parseProtobufError) { TSqlQueryBuilder queryBuilder(tablePathPrefix); queryBuilder.AddString("scope", scope); queryBuilder.AddString("id", id); @@ -22,7 +26,7 @@ TValidationQuery CreateEntityExtractor(const TString& scope, "WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" + idColumnName + "` = $id;\n" ); - auto validator = [response, entityColumnName](NYdb::NTable::TDataQueryResult result) { + auto validator = [response, entityColumnName, parseProtobufError](NYdb::NTable::TDataQueryResult result) { const auto& resultSets = result.GetResultSets(); if (resultSets.size() != 1) { ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "internal error, result set size is not equal to 1 but equal " << resultSets.size(); @@ -34,6 +38,7 @@ TValidationQuery CreateEntityExtractor(const TString& scope, } if (!response->second.Before.ConstructInPlace().ParseFromString(*parser.ColumnParser(entityColumnName).GetOptionalString())) { + parseProtobufError->Inc(); ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message. Please contact internal support"; } return false; diff --git a/ydb/core/fq/libs/control_plane_storage/internal/rate_limiter_resources.cpp b/ydb/core/fq/libs/control_plane_storage/internal/rate_limiter_resources.cpp index 86c89e9dcbce..52fb34934638 100644 --- a/ydb/core/fq/libs/control_plane_storage/internal/rate_limiter_resources.cpp +++ b/ydb/core/fq/libs/control_plane_storage/internal/rate_limiter_resources.cpp @@ -107,7 +107,10 @@ class TRateLimiterRequestActor : public TControlPlaneRequestActorRequestCounters.Common->ParseProtobufError->Inc(); + const TString error{"Error parsing proto message for query internal. Please contact internal support"}; + CPS_LOG_E(error); + ReplyWithError(error); return; } CloudId = internal.cloud_id(); @@ -115,7 +118,10 @@ class TRateLimiterRequestActor : public TControlPlaneRequestActorRequestCounters.Common->ParseProtobufError->Inc(); + const TString error{"Error parsing proto message for query. Please contact internal support"}; + CPS_LOG_E(error); + ReplyWithError(error); return; } if (i64 limit = query.content().limits().vcpu_rate_limit()) { diff --git a/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp b/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp index faf8fca794dc..44127db0165a 100644 --- a/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp +++ b/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp @@ -124,7 +124,8 @@ std::tuple tenantInfo) + std::shared_ptr tenantInfo, + const TRequestCommonCountersPtr& commonCounters) { const auto& task = taskInternal.Task; @@ -158,11 +159,13 @@ std::tupleParseProtobufError->Inc(); throw TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for query. Please contact internal support"; } const TInstant deadline = TInstant::Now() + (task.Query.content().automatic() ? std::min(automaticQueriesTtl, resultSetsTtl) : resultSetsTtl); task.Deadline = deadline; if (!task.Internal.ParseFromString(*parser.ColumnParser(INTERNAL_COLUMN_NAME).GetOptionalString())) { + commonCounters->ParseProtobufError->Inc(); throw TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for query internal. Please contact internal support"; } @@ -290,7 +293,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ auto responseTasks = std::make_shared(); - auto prepareParams = [=, rootCounters=Counters.Counters, + auto prepareParams = [=, commonCounters=requestCounters.Common, actorSystem=NActors::TActivationContext::ActorSystem(), responseTasks=responseTasks, tenantInfo=ev->Get()->TenantInfo @@ -341,7 +344,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ for (size_t i = 0; i < numTasks; ++i) { auto tupleParams = MakeGetTaskUpdateQuery(tasks[i], responseTasks, now, now + Config->TaskLeaseTtl, Config->Proto.GetDisableCurrentIam(), - Config->AutomaticQueriesTtl, Config->ResultSetsTtl, tenantInfo); // using for win32 build + Config->AutomaticQueriesTtl, Config->ResultSetsTtl, tenantInfo, commonCounters); // using for win32 build auto readQuery = std::get<0>(tupleParams); auto readParams = std::get<1>(tupleParams); auto prepareParams = std::get<2>(tupleParams); diff --git a/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp b/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp index f1e987e8f1ea..5caf18c7fab2 100644 --- a/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp +++ b/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp @@ -50,7 +50,8 @@ TPingTaskParams ConstructHardPingTask( const Fq::Private::PingTaskRequest& request, std::shared_ptr response, const TString& tablePathPrefix, const TDuration& automaticQueriesTtl, const TDuration& taskLeaseTtl, const THashMap& retryPolicies, ::NMonitoring::TDynamicCounterPtr rootCounters, - uint64_t maxRequestSize, bool dumpRawStatistics, const std::shared_ptr& finalStatus) { + uint64_t maxRequestSize, bool dumpRawStatistics, const std::shared_ptr& finalStatus, + const TRequestCommonCountersPtr& commonCounters) { auto scope = request.scope(); auto query_id = request.query_id().value(); @@ -90,9 +91,11 @@ TPingTaskParams ConstructHardPingTask( ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " QUERIES_TABLE_NAME " where " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\""; } if (!query.ParseFromString(*parser.ColumnParser(QUERY_COLUMN_NAME).GetOptionalString())) { + commonCounters->ParseProtobufError->Inc(); ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " QUERIES_TABLE_NAME "." QUERY_COLUMN_NAME " where " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\" and " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\""; } if (!internal.ParseFromString(*parser.ColumnParser(INTERNAL_COLUMN_NAME).GetOptionalString())) { + commonCounters->ParseProtobufError->Inc(); ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " QUERIES_TABLE_NAME "." INTERNAL_COLUMN_NAME " where " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\" and " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\""; } } @@ -103,6 +106,7 @@ TPingTaskParams ConstructHardPingTask( ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " JOBS_TABLE_NAME " where " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\""; } if (!job.ParseFromString(*parser.ColumnParser(JOB_COLUMN_NAME).GetOptionalString())) { + commonCounters->ParseProtobufError->Inc(); ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " JOBS_TABLE_NAME "." JOB_COLUMN_NAME " where " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\""; } jobId = *parser.ColumnParser(JOB_ID_COLUMN_NAME).GetOptionalString(); @@ -494,7 +498,7 @@ TPingTaskParams ConstructHardPingTask( TPingTaskParams ConstructSoftPingTask( const Fq::Private::PingTaskRequest& request, std::shared_ptr response, - const TString& tablePathPrefix, const TDuration& taskLeaseTtl) { + const TString& tablePathPrefix, const TDuration& taskLeaseTtl, const TRequestCommonCountersPtr& commonCounters) { TSqlQueryBuilder readQueryBuilder(tablePathPrefix, "SoftPingTask(read)"); readQueryBuilder.AddString("tenant", request.tenant()); readQueryBuilder.AddString("scope", request.scope()); @@ -520,6 +524,7 @@ TPingTaskParams ConstructSoftPingTask( ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " QUERIES_TABLE_NAME " where " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\"" ; } if (!internal.ParseFromString(*parser.ColumnParser(INTERNAL_COLUMN_NAME).GetOptionalString())) { + commonCounters->ParseProtobufError->Inc(); ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " QUERIES_TABLE_NAME "." INTERNAL_COLUMN_NAME " where " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\" and " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\""; } } @@ -527,6 +532,7 @@ TPingTaskParams ConstructSoftPingTask( { TResultSetParser parser(resultSets[1]); if (!parser.TryNextRow()) { + commonCounters->ParseProtobufError->Inc(); ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " PENDING_SMALL_TABLE_NAME " where " TENANT_COLUMN_NAME " = \"" << request.tenant() << "\" and " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\"" ; } owner = *parser.ColumnParser(OWNER_COLUMN_NAME).GetOptionalString(); @@ -595,8 +601,8 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvPingTaskReq auto pingTaskParams = DoesPingTaskUpdateQueriesTable(request) ? ConstructHardPingTask(request, response, YdbConnection->TablePathPrefix, Config->AutomaticQueriesTtl, Config->TaskLeaseTtl, Config->RetryPolicies, Counters.Counters, Config->Proto.GetMaxRequestSize(), - Config->Proto.GetDumpRawStatistics(), finalStatus) : - ConstructSoftPingTask(request, response, YdbConnection->TablePathPrefix, Config->TaskLeaseTtl); + Config->Proto.GetDumpRawStatistics(), finalStatus, requestCounters.Common) : + ConstructSoftPingTask(request, response, YdbConnection->TablePathPrefix, Config->TaskLeaseTtl, requestCounters.Common); auto debugInfo = Config->Proto.GetEnableDebugMode() ? std::make_shared() : TDebugInfoPtr{}; auto result = ReadModifyWrite(pingTaskParams.Query, pingTaskParams.Params, pingTaskParams.Prepare, requestCounters, debugInfo); auto prepare = [response] { return *response; }; diff --git a/ydb/core/fq/libs/control_plane_storage/validators.cpp b/ydb/core/fq/libs/control_plane_storage/validators.cpp index c1296ba8e034..fb85ec52c06f 100644 --- a/ydb/core/fq/libs/control_plane_storage/validators.cpp +++ b/ydb/core/fq/libs/control_plane_storage/validators.cpp @@ -451,7 +451,8 @@ TValidationQuery CreateQueryComputeStatusValidator(const std::vectorInc(); ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for query. Please contact internal support"; } diff --git a/ydb/core/fq/libs/control_plane_storage/validators.h b/ydb/core/fq/libs/control_plane_storage/validators.h index 51688affeaf0..bc563baa17a8 100644 --- a/ydb/core/fq/libs/control_plane_storage/validators.h +++ b/ydb/core/fq/libs/control_plane_storage/validators.h @@ -110,13 +110,15 @@ TValidationQuery CreateQueryComputeStatusValidator(const std::vector TValidationQuery CreateIdempotencyKeyValidator(const TString& scope, const TString& idempotencyKey, std::shared_ptr response, - const TString& tablePathPrefix) { + const TString& tablePathPrefix, + const ::NMonitoring::TDynamicCounters::TCounterPtr& parseProtobufError) { TSqlQueryBuilder queryBuilder(tablePathPrefix); queryBuilder.AddString("idempotency_key", idempotencyKey); queryBuilder.AddString("scope", scope); @@ -125,7 +127,7 @@ TValidationQuery CreateIdempotencyKeyValidator(const TString& scope, "WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" IDEMPOTENCY_KEY_COLUMN_NAME "` = $idempotency_key;\n" ); - auto validator = [response](NYdb::NTable::TDataQueryResult result) { + auto validator = [response, parseProtobufError](NYdb::NTable::TDataQueryResult result) { const auto& resultSets = result.GetResultSets(); if (resultSets.size() != 1) { ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "internal error, result set size is not equal to 1 but equal " << resultSets.size(); @@ -137,6 +139,7 @@ TValidationQuery CreateIdempotencyKeyValidator(const TString& scope, } if (!response->first.ParseFromString(*parser.ColumnParser(RESPONSE_COLUMN_NAME).GetOptionalString())) { + parseProtobufError->Inc(); ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for response. Please contact internal support"; } @@ -150,7 +153,8 @@ template TValidationQuery CreateIdempotencyKeyValidator(const TString& scope, const TString& idempotencyKey, std::shared_ptr> response, - const TString& tablePathPrefix) { + const TString& tablePathPrefix, + const ::NMonitoring::TDynamicCounters::TCounterPtr& parseProtobufError) { TSqlQueryBuilder queryBuilder(tablePathPrefix); queryBuilder.AddString("idempotency_key", idempotencyKey); queryBuilder.AddString("scope", scope); @@ -159,7 +163,7 @@ TValidationQuery CreateIdempotencyKeyValidator(const TString& scope, "WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" IDEMPOTENCY_KEY_COLUMN_NAME "` = $idempotency_key;\n" ); - auto validator = [response](NYdb::NTable::TDataQueryResult result) { + auto validator = [response, parseProtobufError](NYdb::NTable::TDataQueryResult result) { const auto& resultSets = result.GetResultSets(); if (resultSets.size() != 1) { ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "internal error, result set size is not equal to 1 but equal " << resultSets.size(); @@ -171,6 +175,7 @@ TValidationQuery CreateIdempotencyKeyValidator(const TString& scope, } if (!response->first.ParseFromString(*parser.ColumnParser(RESPONSE_COLUMN_NAME).GetOptionalString())) { + parseProtobufError->Inc(); ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for response. Please contact internal support"; } diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp index 78da9f396300..7347058203bb 100644 --- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp +++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp @@ -479,7 +479,11 @@ TAsyncStatus TDbRequester::Write( } return writeHandler(session); } catch (const TCodeLineException& exception) { - CPS_LOG_AS_D(*actorSystem, "Validation: " << CurrentExceptionMessage()); + if (exception.Code == TIssuesIds::INTERNAL_ERROR) { + CPS_LOG_AS_E(*actorSystem, "Validation: " << CurrentExceptionMessage()); + } else { + CPS_LOG_AS_D(*actorSystem, "Validation: " << CurrentExceptionMessage()); + } return MakeFuture(TStatus{EStatus::GENERIC_ERROR, NYql::TIssues{MakeErrorIssue(exception.Code, exception.GetRawMessage())}}); } catch (const std::exception& exception) { CPS_LOG_AS_D(*actorSystem, "Validation: " << CurrentExceptionMessage()); @@ -586,7 +590,11 @@ TAsyncStatus TDbRequester::ReadModifyWrite( return status; }); } catch (const TCodeLineException& exception) { - CPS_LOG_AS_D(*actorSystem, "Validation: " << CurrentExceptionMessage()); + if (exception.Code == TIssuesIds::INTERNAL_ERROR) { + CPS_LOG_AS_E(*actorSystem, "Validation: " << CurrentExceptionMessage()); + } else { + CPS_LOG_AS_D(*actorSystem, "Validation: " << CurrentExceptionMessage()); + } return MakeFuture(TStatus{EStatus::GENERIC_ERROR, NYql::TIssues{MakeErrorIssue(exception.Code, exception.GetRawMessage())}}); } catch (const std::exception& exception) { CPS_LOG_AS_D(*actorSystem, "Validation: " << CurrentExceptionMessage()); @@ -616,7 +624,11 @@ TAsyncStatus TDbRequester::ReadModifyWrite( } return readModifyWriteHandler(session); } catch (const TCodeLineException& exception) { - CPS_LOG_AS_D(*actorSystem, "Validation: " << CurrentExceptionMessage()); + if (exception.Code == TIssuesIds::INTERNAL_ERROR) { + CPS_LOG_AS_E(*actorSystem, "Validation: " << CurrentExceptionMessage()); + } else { + CPS_LOG_AS_D(*actorSystem, "Validation: " << CurrentExceptionMessage()); + } return MakeFuture(TStatus{EStatus::GENERIC_ERROR, NYql::TIssues{MakeErrorIssue(exception.Code, exception.GetRawMessage())}}); } catch (const std::exception& exception) { CPS_LOG_AS_D(*actorSystem, "Validation: " << CurrentExceptionMessage()); diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_bindings.cpp b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_bindings.cpp index 64b0d7e6e267..5bc2b76e7986 100644 --- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_bindings.cpp +++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_bindings.cpp @@ -125,7 +125,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateBindi TVector validators; if (idempotencyKey) { - validators.push_back(CreateIdempotencyKeyValidator(scope, idempotencyKey, response, YdbConnection->TablePathPrefix)); + validators.push_back(CreateIdempotencyKeyValidator(scope, idempotencyKey, response, YdbConnection->TablePathPrefix, requestCounters.Common->ParseProtobufError)); } validators.push_back(connectionNameUniqueValidator); @@ -248,7 +248,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvListBinding const auto query = queryBuilder.Build(); auto debugInfo = Config->Proto.GetEnableDebugMode() ? std::make_shared() : TDebugInfoPtr{}; auto [result, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo); - auto prepare = [resultSets=resultSets, limit] { + auto prepare = [resultSets=resultSets, limit, commonCounters=requestCounters.Common] { if (resultSets->size() != 1) { ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 1 but equal " << resultSets->size() << ". Please contact internal support"; } @@ -258,6 +258,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvListBinding while (parser.TryNextRow()) { FederatedQuery::Binding binding; if (!binding.ParseFromString(*parser.ColumnParser(BINDING_COLUMN_NAME).GetOptionalString())) { + commonCounters->ParseProtobufError->Inc(); ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for binding. Please contact internal support"; } FederatedQuery::BriefBinding& briefBinding = *result.add_binding(); @@ -352,7 +353,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDescribeBin const auto query = queryBuilder.Build(); auto debugInfo = Config->Proto.GetEnableDebugMode() ? std::make_shared() : TDebugInfoPtr{}; auto [result, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo); - auto prepare = [=, resultSets=resultSets] { + auto prepare = [=, resultSets=resultSets, commonCounters=requestCounters.Common] { if (resultSets->size() != 1) { ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 1 but equal " << resultSets->size() << ". Please contact internal support"; } @@ -364,6 +365,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDescribeBin FederatedQuery::DescribeBindingResult result; if (!result.mutable_binding()->ParseFromString(*parser.ColumnParser(BINDING_COLUMN_NAME).GetOptionalString())) { + commonCounters->ParseProtobufError->Inc(); ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for binding. Please contact internal support"; } @@ -444,7 +446,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyBindi ); std::shared_ptr>> response = std::make_shared>>(); - auto prepareParams = [=, config=Config](const TVector& resultSets) { + auto prepareParams = [=, config=Config, commonCounters=requestCounters.Common](const TVector& resultSets) { if (resultSets.size() != 2) { ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 2 but equal " << resultSets.size() << ". Please contact internal support"; } @@ -457,6 +459,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyBindi } if (!binding.ParseFromString(*parser.ColumnParser(BINDING_COLUMN_NAME).GetOptionalString())) { + commonCounters->ParseProtobufError->Inc(); ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for binding. Please contact internal support"; } } @@ -527,7 +530,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyBindi TVector validators; if (idempotencyKey) { - validators.push_back(CreateIdempotencyKeyValidator(scope, idempotencyKey, response, YdbConnection->TablePathPrefix)); + validators.push_back(CreateIdempotencyKeyValidator(scope, idempotencyKey, response, YdbConnection->TablePathPrefix, requestCounters.Common->ParseProtobufError)); } auto accessValidator = CreateManageAccessValidator( @@ -654,7 +657,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDeleteBindi TVector validators; if (idempotencyKey) { - validators.push_back(CreateIdempotencyKeyValidator(scope, idempotencyKey, response, YdbConnection->TablePathPrefix)); + validators.push_back(CreateIdempotencyKeyValidator(scope, idempotencyKey, response, YdbConnection->TablePathPrefix, requestCounters.Common->ParseProtobufError)); } auto accessValidator = CreateManageAccessValidator( @@ -687,7 +690,8 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDeleteBindi BINDING_ID_COLUMN_NAME, BINDINGS_TABLE_NAME, response, - YdbConnection->TablePathPrefix)); + YdbConnection->TablePathPrefix, + requestCounters.Common->ParseProtobufError)); const auto query = queryBuilder.Build(); auto debugInfo = Config->Proto.GetEnableDebugMode() ? std::make_shared() : TDebugInfoPtr{}; diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_compute_database.cpp b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_compute_database.cpp index dedcd2f43a4c..39791ea71397 100644 --- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_compute_database.cpp +++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_compute_database.cpp @@ -99,7 +99,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDescribeDat const auto query = queryBuilder.Build(); auto debugInfo = Config->Proto.GetEnableDebugMode() ? std::make_shared() : TDebugInfoPtr{}; auto [result, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo); - auto prepare = [=, resultSets=resultSets] { + auto prepare = [=, resultSets=resultSets, commonCounters=requestCounters.Common] { if (resultSets->size() != 1) { ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 1 but equal " << resultSets->size() << ". Please contact internal support"; } @@ -111,6 +111,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDescribeDat FederatedQuery::Internal::ComputeDatabaseInternal result; if (!result.ParseFromString(*parser.ColumnParser(INTERNAL_COLUMN_NAME).GetOptionalString())) { + commonCounters->ParseProtobufError->Inc(); ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for internal compute database. Please contact internal support"; } @@ -189,7 +190,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyDatab "WHERE `" SCOPE_COLUMN_NAME "` = $scope;" ); - auto prepareParams = [=, synchronized = ev->Get()->Synchronized](const TVector& resultSets) { + auto prepareParams = [=, synchronized = ev->Get()->Synchronized, commonCounters=requestCounters.Common](const TVector& resultSets) { if (resultSets.size() != 1) { ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 1 but equal " << resultSets.size() << ". Please contact internal support"; } @@ -201,6 +202,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyDatab FederatedQuery::Internal::ComputeDatabaseInternal result; if (!result.ParseFromString(*parser.ColumnParser(INTERNAL_COLUMN_NAME).GetOptionalString())) { + commonCounters->ParseProtobufError->Inc(); ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for internal compute database. Please contact internal support"; } diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_connections.cpp b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_connections.cpp index 30781997842f..c999bbae6177 100644 --- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_connections.cpp +++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_connections.cpp @@ -131,7 +131,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateConne TVector validators; if (idempotencyKey) { - validators.push_back(CreateIdempotencyKeyValidator(scope, idempotencyKey, response, YdbConnection->TablePathPrefix)); + validators.push_back(CreateIdempotencyKeyValidator(scope, idempotencyKey, response, YdbConnection->TablePathPrefix, requestCounters.Common->ParseProtobufError)); } validators.push_back(connectionNameUniqueValidator); validators.push_back(bindingNameUniqueValidator); @@ -264,7 +264,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvListConnect const auto query = queryBuilder.Build(); auto debugInfo = Config->Proto.GetEnableDebugMode() ? std::make_shared() : TDebugInfoPtr{}; auto [result, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo); - auto prepare = [resultSets=resultSets, limit, extractSensitiveFields] { + auto prepare = [resultSets=resultSets, limit, extractSensitiveFields, commonCounters=requestCounters.Common] { if (resultSets->size() != 1) { ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 1 but equal " << resultSets->size() << ". Please contact internal support"; } @@ -274,6 +274,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvListConnect while (parser.TryNextRow()) { auto& connection = *result.add_connection(); if (!connection.ParseFromString(*parser.ColumnParser(CONNECTION_COLUMN_NAME).GetOptionalString())) { + commonCounters->ParseProtobufError->Inc(); ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for connection. Please contact internal support"; } PrepareSensitiveFields(connection, extractSensitiveFields); @@ -355,7 +356,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDescribeCon const auto query = queryBuilder.Build(); auto debugInfo = Config->Proto.GetEnableDebugMode() ? std::make_shared() : TDebugInfoPtr{}; auto [result, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo); - auto prepare = [=, resultSets=resultSets] { + auto prepare = [=, resultSets=resultSets, commonCounters=requestCounters.Common] { if (resultSets->size() != 1) { ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 1 but equal " << resultSets->size() << ". Please contact internal support"; } @@ -367,6 +368,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDescribeCon } if (!result.mutable_connection()->ParseFromString(*parser.ColumnParser(CONNECTION_COLUMN_NAME).GetOptionalString())) { + commonCounters->ParseProtobufError->Inc(); ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for connection. Please contact internal support"; } @@ -445,7 +447,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyConne ); std::shared_ptr>> response = std::make_shared>>(); - auto prepareParams = [=, config=Config](const TVector& resultSets) { + auto prepareParams = [=, config=Config, commonCounters=requestCounters.Common](const TVector& resultSets) { if (resultSets.size() != 1) { ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 1 but equal " << resultSets.size() << ". Please contact internal support"; } @@ -457,6 +459,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyConne FederatedQuery::Connection connection; if (!connection.ParseFromString(*parser.ColumnParser(CONNECTION_COLUMN_NAME).GetOptionalString())) { + commonCounters->ParseProtobufError->Inc(); ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for connection. Please contact internal support"; } @@ -519,7 +522,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyConne TVector validators; if (idempotencyKey) { - validators.push_back(CreateIdempotencyKeyValidator(scope, idempotencyKey, response, YdbConnection->TablePathPrefix)); + validators.push_back(CreateIdempotencyKeyValidator(scope, idempotencyKey, response, YdbConnection->TablePathPrefix, requestCounters.Common->ParseProtobufError)); } auto accessValidator = CreateManageAccessValidator( @@ -646,7 +649,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDeleteConne TVector validators; if (idempotencyKey) { - validators.push_back(CreateIdempotencyKeyValidator(scope, idempotencyKey, response, YdbConnection->TablePathPrefix)); + validators.push_back(CreateIdempotencyKeyValidator(scope, idempotencyKey, response, YdbConnection->TablePathPrefix, requestCounters.Common->ParseProtobufError)); } auto accessValidator = CreateManageAccessValidator( @@ -687,7 +690,8 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDeleteConne CONNECTION_ID_COLUMN_NAME, CONNECTIONS_TABLE_NAME, response, - YdbConnection->TablePathPrefix)); + YdbConnection->TablePathPrefix, + requestCounters.Common->ParseProtobufError)); const auto query = queryBuilder.Build(); auto debugInfo = Config->Proto.GetEnableDebugMode() ? std::make_shared() : TDebugInfoPtr{}; diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_impl.h b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_impl.h index 9cb10fcdc2fc..1a6f22c6a40b 100644 --- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_impl.h +++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_impl.h @@ -176,13 +176,16 @@ struct TRequestCounters { }; template -THashMap GetEntitiesWithVisibilityPriority(const TResultSet& resultSet, const TString& columnName, bool ignorePrivateSources) +THashMap GetEntitiesWithVisibilityPriority(const TResultSet& resultSet, const TString& columnName, bool ignorePrivateSources, const TRequestCommonCountersPtr& commonCounters) { THashMap entities; TResultSetParser parser(resultSet); while (parser.TryNextRow()) { T entity; - Y_ABORT_UNLESS(entity.ParseFromString(*parser.ColumnParser(columnName).GetOptionalString())); + if (!entity.ParseFromString(*parser.ColumnParser(columnName).GetOptionalString())) { + commonCounters->ParseProtobufError->Inc(); + ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for GetEntitiesWithVisibilityPriority. Please contact internal support"; + } const auto visibility = entity.content().acl().visibility(); if (ignorePrivateSources && visibility == FederatedQuery::Acl::PRIVATE) { continue; @@ -201,13 +204,16 @@ THashMap GetEntitiesWithVisibilityPriority(const TResultSet& resultS } template -TVector GetEntities(const TResultSet& resultSet, const TString& columnName, bool ignorePrivateSources) +TVector GetEntities(const TResultSet& resultSet, const TString& columnName, bool ignorePrivateSources, const TRequestCommonCountersPtr& commonCounters) { TVector entities; TResultSetParser parser(resultSet); while (parser.TryNextRow()) { T entity; - Y_ABORT_UNLESS(entity.ParseFromString(*parser.ColumnParser(columnName).GetOptionalString())); + if (!entity.ParseFromString(*parser.ColumnParser(columnName).GetOptionalString())) { + commonCounters->ParseProtobufError->Inc(); + ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for GetEntities. Please contact internal support"; + } const auto visibility = entity.content().acl().visibility(); if (ignorePrivateSources && visibility == FederatedQuery::Acl::PRIVATE) { continue; diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp index 7b5a824879f5..19bdf5360134 100644 --- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp +++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp @@ -185,7 +185,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery ); } - auto prepareParams = [=](const TVector& resultSets) mutable { + auto prepareParams = [=, as=TActivationContext::ActorSystem(), commonCounters=requestCounters.Common](const TVector& resultSets) mutable { const size_t countSets = (idempotencyKey ? 1 : 0) + (request.execute_mode() != FederatedQuery::SAVE ? 2 : 0); if (resultSets.size() != countSets) { ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to " << countSets << " but equal " << resultSets.size() << ". Please contact internal support"; @@ -195,6 +195,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery TResultSetParser parser(resultSets.front()); if (parser.TryNextRow()) { if (!response->first.ParseFromString(*parser.ColumnParser(RESPONSE_COLUMN_NAME).GetOptionalString())) { + commonCounters->ParseProtobufError->Inc(); ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for idempotency key request. Please contact internal support"; } response->second.IdempotencyResult = true; @@ -217,7 +218,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery // TODO: move to run actor priority selection *queryInternal.mutable_compute_connection() = computeDatabase.connection(); TSet disabledConnections; - for (const auto& connection: GetEntities(resultSets[resultSets.size() - 2], CONNECTION_COLUMN_NAME, Config->Proto.GetIgnorePrivateSources())) { + for (const auto& connection: GetEntities(resultSets[resultSets.size() - 2], CONNECTION_COLUMN_NAME, Config->Proto.GetIgnorePrivateSources(), commonCounters)) { if (!Config->AvailableConnections.contains(connection.content().setting().connection_case())) { disabledConnections.insert(connection.meta().id()); continue; @@ -230,7 +231,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery } TSet connectionIds; - auto connections = GetEntitiesWithVisibilityPriority(resultSets[resultSets.size() - 2], CONNECTION_COLUMN_NAME, Config->Proto.GetIgnorePrivateSources()); + auto connections = GetEntitiesWithVisibilityPriority(resultSets[resultSets.size() - 2], CONNECTION_COLUMN_NAME, Config->Proto.GetIgnorePrivateSources(), commonCounters); for (const auto& [_, connection]: connections) { if (disabledConnections.contains(connection.meta().id())) { continue; @@ -239,7 +240,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery connectionIds.insert(connection.meta().id()); } - auto bindings = GetEntitiesWithVisibilityPriority(resultSets[resultSets.size() - 1], BINDING_COLUMN_NAME, Config->Proto.GetIgnorePrivateSources()); + auto bindings = GetEntitiesWithVisibilityPriority(resultSets[resultSets.size() - 1], BINDING_COLUMN_NAME, Config->Proto.GetIgnorePrivateSources(), commonCounters); for (const auto& [_, binding]: bindings) { if (!Config->AvailableBindings.contains(binding.content().setting().binding_case())) { continue; @@ -462,7 +463,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvListQueries const auto read = queryBuilder.Build(); auto debugInfo = Config->Proto.GetEnableDebugMode() ? std::make_shared() : TDebugInfoPtr{}; auto [result, resultSets] = Read(read.Sql, read.Params, requestCounters, debugInfo); - auto prepare = [resultSets=resultSets, limit] { + auto prepare = [resultSets=resultSets, limit, commonCounters=requestCounters.Common] { if (resultSets->size() != 1) { ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 1 but equal " << resultSets->size() << ". Please contact internal support"; } @@ -472,6 +473,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvListQueries while (parser.TryNextRow()) { FederatedQuery::Query query; if (!query.ParseFromString(*parser.ColumnParser(QUERY_COLUMN_NAME).GetOptionalString())) { + commonCounters->ParseProtobufError->Inc(); ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for query. Please contact internal support"; } FederatedQuery::BriefQuery briefQuery; @@ -552,7 +554,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDescribeQue const auto query = queryBuilder.Build(); auto debugInfo = Config->Proto.GetEnableDebugMode() ? std::make_shared() : TDebugInfoPtr{}; auto [result, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo); - auto prepare = [resultSets=resultSets, user,permissions] { + auto prepare = [resultSets=resultSets, user, permissions, commonCounters=requestCounters.Common] { if (resultSets->size() != 1) { ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 1 but equal " << resultSets->size() << ". Please contact internal support"; } @@ -564,6 +566,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDescribeQue FederatedQuery::DescribeQueryResult result; if (!result.mutable_query()->ParseFromString(*parser.ColumnParser(QUERY_COLUMN_NAME).GetOptionalString())) { + commonCounters->ParseProtobufError->Inc(); ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for query. Please contact internal support"; } @@ -579,6 +582,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDescribeQue FederatedQuery::Internal::QueryInternal internal; if (!internal.ParseFromString(*parser.ColumnParser(INTERNAL_COLUMN_NAME).GetOptionalString())) { + commonCounters->ParseProtobufError->Inc(); ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for query internal. Please contact internal support"; } @@ -801,7 +805,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyQuery "WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id AND (`" EXPIRE_AT_COLUMN_NAME "` is NULL OR `" EXPIRE_AT_COLUMN_NAME "` > $now);" ); - auto prepareParams = [=, config=Config](const TVector& resultSets) { + auto prepareParams = [=, config=Config, commonCounters=requestCounters.Common](const TVector& resultSets) { const size_t countSets = 1 + (request.execute_mode() != FederatedQuery::SAVE ? 2 : 0); if (resultSets.size() != countSets) { @@ -816,11 +820,13 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyQuery FederatedQuery::Query query; if (!query.ParseFromString(*parser.ColumnParser(QUERY_COLUMN_NAME).GetOptionalString())) { + commonCounters->ParseProtobufError->Inc(); ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for query. Please contact internal support"; } FederatedQuery::Internal::QueryInternal internal; if (!internal.ParseFromString(*parser.ColumnParser(INTERNAL_COLUMN_NAME).GetOptionalString())) { + commonCounters->ParseProtobufError->Inc(); ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for query internal. Please contact internal support"; } @@ -881,7 +887,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyQuery // TODO: move to run actor priority selection TSet disabledConnections; - for (const auto& connection: GetEntities(resultSets[resultSets.size() - 3], CONNECTION_COLUMN_NAME, Config->Proto.GetIgnorePrivateSources())) { + for (const auto& connection: GetEntities(resultSets[resultSets.size() - 3], CONNECTION_COLUMN_NAME, Config->Proto.GetIgnorePrivateSources(), commonCounters)) { if (!Config->AvailableConnections.contains(connection.content().setting().connection_case())) { disabledConnections.insert(connection.meta().id()); continue; @@ -894,7 +900,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyQuery } TSet connectionIds; - auto connections = GetEntitiesWithVisibilityPriority(resultSets[resultSets.size() - 3], CONNECTION_COLUMN_NAME, Config->Proto.GetIgnorePrivateSources()); + auto connections = GetEntitiesWithVisibilityPriority(resultSets[resultSets.size() - 3], CONNECTION_COLUMN_NAME, Config->Proto.GetIgnorePrivateSources(), commonCounters); for (const auto& [_, connection]: connections) { if (disabledConnections.contains(connection.meta().id())) { continue; @@ -903,7 +909,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyQuery connectionIds.insert(connection.meta().id()); } - auto bindings = GetEntitiesWithVisibilityPriority(resultSets[resultSets.size() - 2], BINDING_COLUMN_NAME, Config->Proto.GetIgnorePrivateSources()); + auto bindings = GetEntitiesWithVisibilityPriority(resultSets[resultSets.size() - 2], BINDING_COLUMN_NAME, Config->Proto.GetIgnorePrivateSources(), commonCounters); for (const auto& [_, binding]: bindings) { if (!Config->AvailableBindings.contains(binding.content().setting().binding_case())) { continue; @@ -1055,7 +1061,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyQuery TVector validators; if (idempotencyKey) { - validators.push_back(CreateIdempotencyKeyValidator(scope, idempotencyKey, response, YdbConnection->TablePathPrefix)); + validators.push_back(CreateIdempotencyKeyValidator(scope, idempotencyKey, response, YdbConnection->TablePathPrefix, requestCounters.Common->ParseProtobufError)); } auto accessValidator = CreateManageAccessValidator( @@ -1163,7 +1169,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDeleteQuery TVector validators; if (idempotencyKey) { - validators.push_back(CreateIdempotencyKeyValidator(scope, idempotencyKey, response, YdbConnection->TablePathPrefix)); + validators.push_back(CreateIdempotencyKeyValidator(scope, idempotencyKey, response, YdbConnection->TablePathPrefix, requestCounters.Common->ParseProtobufError)); } auto accessValidator = CreateManageAccessValidator( @@ -1196,14 +1202,16 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDeleteQuery QUERY_ID_COLUMN_NAME, QUERIES_TABLE_NAME, response, - YdbConnection->TablePathPrefix)); + YdbConnection->TablePathPrefix, + requestCounters.Common->ParseProtobufError)); validators.push_back(CreateQueryComputeStatusValidator( { FederatedQuery::QueryMeta::STARTING, FederatedQuery::QueryMeta::ABORTED_BY_USER, FederatedQuery::QueryMeta::ABORTED_BY_SYSTEM, FederatedQuery::QueryMeta::COMPLETED, FederatedQuery::QueryMeta::FAILED }, scope, queryId, "Can't delete running query", - YdbConnection->TablePathPrefix)); + YdbConnection->TablePathPrefix, + requestCounters.Common->ParseProtobufError)); const auto query = queryBuilder.Build(); auto debugInfo = Config->Proto.GetEnableDebugMode() ? std::make_shared() : TDebugInfoPtr{}; @@ -1275,7 +1283,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvControlQuer "WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id AND `" JOB_ID_COLUMN_NAME "` = $job_id;\n" ); - auto prepareParams = [=, config=Config](const TVector& resultSets) { + auto prepareParams = [=, config=Config, commonCounters=requestCounters.Common](const TVector& resultSets) { if (resultSets.size() != 2) { ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 2 but equal " << resultSets.size() << ". Please contact internal support"; } @@ -1291,10 +1299,12 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvControlQuer } if (!query.ParseFromString(*parser.ColumnParser(QUERY_COLUMN_NAME).GetOptionalString())) { + commonCounters->ParseProtobufError->Inc(); ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for query. Please contact internal support"; } if (!queryInternal.ParseFromString(*parser.ColumnParser(INTERNAL_COLUMN_NAME).GetOptionalString())) { + commonCounters->ParseProtobufError->Inc(); ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for query internal. Please contact internal support"; } tenantName = *parser.ColumnParser(TENANT_COLUMN_NAME).GetOptionalString(); @@ -1309,6 +1319,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvControlQuer } if (!job.ParseFromString(parser.ColumnParser(JOB_COLUMN_NAME).GetOptionalString().GetOrElse(""))) { + commonCounters->ParseProtobufError->Inc(); ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for job. Please contact internal support"; } @@ -1407,7 +1418,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvControlQuer TVector validators; if (idempotencyKey) { - validators.push_back(CreateIdempotencyKeyValidator(scope, idempotencyKey, response, YdbConnection->TablePathPrefix)); + validators.push_back(CreateIdempotencyKeyValidator(scope, idempotencyKey, response, YdbConnection->TablePathPrefix, requestCounters.Common->ParseProtobufError)); } auto accessValidator = CreateManageAccessValidator( @@ -1512,7 +1523,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetResultDa const auto query = queryBuilder.Build(); auto debugInfo = Config->Proto.GetEnableDebugMode() ? std::make_shared() : TDebugInfoPtr{}; auto [result, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo); - auto prepare = [resultSets=resultSets, resultSetIndex, user, permissions] { + auto prepare = [resultSets=resultSets, resultSetIndex, user, permissions, commonCounters=requestCounters.Common] { if (resultSets->size() != 2) { ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 2 but equal " << resultSets->size() << ". Please contact internal support"; } @@ -1528,6 +1539,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetResultDa FederatedQuery::Query query; if (!query.ParseFromString(*parser.ColumnParser(QUERY_COLUMN_NAME).GetOptionalString())) { + commonCounters->ParseProtobufError->Inc(); ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for query. Please contact internal support"; } @@ -1564,6 +1576,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetResultDa TResultSetParser parser(resultSet); while (parser.TryNextRow()) { if (!resultSetProto.add_rows()->ParseFromString(*parser.ColumnParser(RESULT_SET_COLUMN_NAME).GetOptionalString())) { + commonCounters->ParseProtobufError->Inc(); ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for row. Please contact internal support"; } } @@ -1670,7 +1683,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvListJobsReq const auto query = queryBuilder.Build(); auto debugInfo = Config->Proto.GetEnableDebugMode() ? std::make_shared() : TDebugInfoPtr{}; auto [result, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo); - auto prepare = [resultSets=resultSets, limit] { + auto prepare = [resultSets=resultSets, limit, commonCounters=requestCounters.Common] { if (resultSets->size() != 1) { ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 1 but equal " << resultSets->size() << ". Please contact internal support"; } @@ -1680,6 +1693,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvListJobsReq while (parser.TryNextRow()) { FederatedQuery::Job job; if (!job.ParseFromString(*parser.ColumnParser(JOB_COLUMN_NAME).GetOptionalString())) { + commonCounters->ParseProtobufError->Inc(); ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for job. Please contact internal support"; } const TString mergedId = job.meta().id() + "-" + job.query_meta().common().id(); @@ -1771,7 +1785,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDescribeJob auto debugInfo = Config->Proto.GetEnableDebugMode() ? std::make_shared() : TDebugInfoPtr{}; auto [result, resultSets] = Read(query.Sql, query.Params, requestCounters, debugInfo); - auto prepare = [=, id=request.job_id(), resultSets=resultSets] { + auto prepare = [=, id=request.job_id(), resultSets=resultSets, commonCounters=requestCounters.Common] { if (resultSets->size() != 1) { ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 1 but equal " << resultSets->size() << ". Please contact internal support"; } @@ -1782,6 +1796,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDescribeJob ythrow TCodeLineException(TIssuesIds::ACCESS_DENIED) << "Job does not exist or permission denied. Please check the job id or your access rights"; } if (!result.mutable_job()->ParseFromString(*parser.ColumnParser(JOB_COLUMN_NAME).GetOptionalString())) { + commonCounters->ParseProtobufError->Inc(); ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for job. Please contact internal support"; } auto visibility = static_cast(*parser.ColumnParser(VISIBILITY_COLUMN_NAME).GetOptionalInt64());