Skip to content

Commit

Permalink
sensor for protobuf parsing errors
Browse files Browse the repository at this point in the history
  • Loading branch information
dorooleg committed Nov 30, 2023
1 parent 84f2d6a commit ae76055
Show file tree
Hide file tree
Showing 14 changed files with 132 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ void TRequestCommonCounters::Register(const ::NMonitoring::TDynamicCounterPtr& c
InFly = requestCounters->GetCounter("InFly", false);
Ok = requestCounters->GetCounter("Ok", true);
Error = requestCounters->GetCounter("Error", true);
ParseProtobufError = requestCounters->GetCounter("ParseProtobufError", true);
Retry = requestCounters->GetCounter("Retry", true);
RequestBytes = requestCounters->GetCounter("RequestBytes", true);
ResponseBytes = requestCounters->GetCounter("ResponseBytes", true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class TRequestCommonCounters: public virtual TThrRefBase {
::NMonitoring::TDynamicCounters::TCounterPtr InFly;
::NMonitoring::TDynamicCounters::TCounterPtr Ok;
::NMonitoring::TDynamicCounters::TCounterPtr Error;
::NMonitoring::TDynamicCounters::TCounterPtr ParseProtobufError;
::NMonitoring::TDynamicCounters::TCounterPtr Retry;
::NMonitoring::TDynamicCounters::TCounterPtr RequestBytes;
::NMonitoring::TDynamicCounters::TCounterPtr ResponseBytes;
Expand Down
9 changes: 7 additions & 2 deletions ydb/core/fq/libs/control_plane_storage/extractors.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@

#include "validators.h"

#include <library/cpp/monlib/dynamic_counters/counters.h>

#include <ydb/core/fq/libs/db_schema/db_schema.h>


namespace NFq {

template<typename T, typename A>
Expand All @@ -13,7 +16,8 @@ TValidationQuery CreateEntityExtractor(const TString& scope,
const TString& idColumnName,
const TString& tableName,
std::shared_ptr<std::pair<T, A>> response,
const TString& tablePathPrefix) {
const TString& tablePathPrefix,
const ::NMonitoring::TDynamicCounters::TCounterPtr& parseProtobufError) {
TSqlQueryBuilder queryBuilder(tablePathPrefix);
queryBuilder.AddString("scope", scope);
queryBuilder.AddString("id", id);
Expand All @@ -22,7 +26,7 @@ TValidationQuery CreateEntityExtractor(const TString& scope,
"WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" + idColumnName + "` = $id;\n"
);

auto validator = [response, entityColumnName](NYdb::NTable::TDataQueryResult result) {
auto validator = [response, entityColumnName, parseProtobufError](NYdb::NTable::TDataQueryResult result) {
const auto& resultSets = result.GetResultSets();
if (resultSets.size() != 1) {
ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "internal error, result set size is not equal to 1 but equal " << resultSets.size();
Expand All @@ -34,6 +38,7 @@ TValidationQuery CreateEntityExtractor(const TString& scope,
}

if (!response->second.Before.ConstructInPlace().ParseFromString(*parser.ColumnParser(entityColumnName).GetOptionalString())) {
parseProtobufError->Inc();
ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message. Please contact internal support";
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,21 @@ class TRateLimiterRequestActor : public TControlPlaneRequestActor<TRequest, TRes

FederatedQuery::Internal::QueryInternal internal;
if (!internal.ParseFromString(*parser.ColumnParser(INTERNAL_COLUMN_NAME).GetOptionalString())) {
ReplyWithError(TStringBuilder() << "Error parsing proto message for query internal. Please contact internal support");
this->RequestCounters.Common->ParseProtobufError->Inc();
const TString error{"Error parsing proto message for query internal. Please contact internal support"};
CPS_LOG_E(error);
ReplyWithError(error);
return;
}
CloudId = internal.cloud_id();

if constexpr (TDerived::IsCreateRequest) {
FederatedQuery::Query query;
if (!query.ParseFromString(*parser.ColumnParser(QUERY_COLUMN_NAME).GetOptionalString())) {
ReplyWithError(TStringBuilder() << "Error parsing proto message for query. Please contact internal support");
this->RequestCounters.Common->ParseProtobufError->Inc();
const TString error{"Error parsing proto message for query. Please contact internal support"};
CPS_LOG_E(error);
ReplyWithError(error);
return;
}
if (i64 limit = query.content().limits().vcpu_rate_limit()) {
Expand Down
9 changes: 6 additions & 3 deletions ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ std::tuple<TString, NYdb::TParams, std::function<std::pair<TString, NYdb::TParam
bool disableCurrentIam,
const TDuration& automaticQueriesTtl,
const TDuration& resultSetsTtl,
std::shared_ptr<TTenantInfo> tenantInfo)
std::shared_ptr<TTenantInfo> tenantInfo,
const TRequestCommonCountersPtr& commonCounters)
{
const auto& task = taskInternal.Task;

Expand Down Expand Up @@ -158,11 +159,13 @@ std::tuple<TString, NYdb::TParams, std::function<std::pair<TString, NYdb::TParam
task.Generation = parser.ColumnParser(GENERATION_COLUMN_NAME).GetOptionalUint64().GetOrElse(0) + 1;

if (!task.Query.ParseFromString(*parser.ColumnParser(QUERY_COLUMN_NAME).GetOptionalString())) {
commonCounters->ParseProtobufError->Inc();
throw TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for query. Please contact internal support";
}
const TInstant deadline = TInstant::Now() + (task.Query.content().automatic() ? std::min(automaticQueriesTtl, resultSetsTtl) : resultSetsTtl);
task.Deadline = deadline;
if (!task.Internal.ParseFromString(*parser.ColumnParser(INTERNAL_COLUMN_NAME).GetOptionalString())) {
commonCounters->ParseProtobufError->Inc();
throw TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for query internal. Please contact internal support";
}

Expand Down Expand Up @@ -290,7 +293,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ

auto responseTasks = std::make_shared<TResponseTasks>();

auto prepareParams = [=, rootCounters=Counters.Counters,
auto prepareParams = [=, commonCounters=requestCounters.Common,
actorSystem=NActors::TActivationContext::ActorSystem(),
responseTasks=responseTasks,
tenantInfo=ev->Get()->TenantInfo
Expand Down Expand Up @@ -341,7 +344,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ
for (size_t i = 0; i < numTasks; ++i) {
auto tupleParams = MakeGetTaskUpdateQuery(tasks[i],
responseTasks, now, now + Config->TaskLeaseTtl, Config->Proto.GetDisableCurrentIam(),
Config->AutomaticQueriesTtl, Config->ResultSetsTtl, tenantInfo); // using for win32 build
Config->AutomaticQueriesTtl, Config->ResultSetsTtl, tenantInfo, commonCounters); // using for win32 build
auto readQuery = std::get<0>(tupleParams);
auto readParams = std::get<1>(tupleParams);
auto prepareParams = std::get<2>(tupleParams);
Expand Down
14 changes: 10 additions & 4 deletions ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ TPingTaskParams ConstructHardPingTask(
const Fq::Private::PingTaskRequest& request, std::shared_ptr<Fq::Private::PingTaskResult> response,
const TString& tablePathPrefix, const TDuration& automaticQueriesTtl, const TDuration& taskLeaseTtl,
const THashMap<ui64, TRetryPolicyItem>& retryPolicies, ::NMonitoring::TDynamicCounterPtr rootCounters,
uint64_t maxRequestSize, bool dumpRawStatistics, const std::shared_ptr<TFinalStatus>& finalStatus) {
uint64_t maxRequestSize, bool dumpRawStatistics, const std::shared_ptr<TFinalStatus>& finalStatus,
const TRequestCommonCountersPtr& commonCounters) {

auto scope = request.scope();
auto query_id = request.query_id().value();
Expand Down Expand Up @@ -90,9 +91,11 @@ TPingTaskParams ConstructHardPingTask(
ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " QUERIES_TABLE_NAME " where " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\"";
}
if (!query.ParseFromString(*parser.ColumnParser(QUERY_COLUMN_NAME).GetOptionalString())) {
commonCounters->ParseProtobufError->Inc();
ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " QUERIES_TABLE_NAME "." QUERY_COLUMN_NAME " where " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\" and " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\"";
}
if (!internal.ParseFromString(*parser.ColumnParser(INTERNAL_COLUMN_NAME).GetOptionalString())) {
commonCounters->ParseProtobufError->Inc();
ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " QUERIES_TABLE_NAME "." INTERNAL_COLUMN_NAME " where " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\" and " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\"";
}
}
Expand All @@ -103,6 +106,7 @@ TPingTaskParams ConstructHardPingTask(
ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " JOBS_TABLE_NAME " where " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\"";
}
if (!job.ParseFromString(*parser.ColumnParser(JOB_COLUMN_NAME).GetOptionalString())) {
commonCounters->ParseProtobufError->Inc();
ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " JOBS_TABLE_NAME "." JOB_COLUMN_NAME " where " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\"";
}
jobId = *parser.ColumnParser(JOB_ID_COLUMN_NAME).GetOptionalString();
Expand Down Expand Up @@ -494,7 +498,7 @@ TPingTaskParams ConstructHardPingTask(

TPingTaskParams ConstructSoftPingTask(
const Fq::Private::PingTaskRequest& request, std::shared_ptr<Fq::Private::PingTaskResult> response,
const TString& tablePathPrefix, const TDuration& taskLeaseTtl) {
const TString& tablePathPrefix, const TDuration& taskLeaseTtl, const TRequestCommonCountersPtr& commonCounters) {
TSqlQueryBuilder readQueryBuilder(tablePathPrefix, "SoftPingTask(read)");
readQueryBuilder.AddString("tenant", request.tenant());
readQueryBuilder.AddString("scope", request.scope());
Expand All @@ -520,13 +524,15 @@ TPingTaskParams ConstructSoftPingTask(
ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " QUERIES_TABLE_NAME " where " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\"" ;
}
if (!internal.ParseFromString(*parser.ColumnParser(INTERNAL_COLUMN_NAME).GetOptionalString())) {
commonCounters->ParseProtobufError->Inc();
ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " QUERIES_TABLE_NAME "." INTERNAL_COLUMN_NAME " where " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\" and " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\"";
}
}

{
TResultSetParser parser(resultSets[1]);
// A missing row in the pending table is a lookup failure, not a protobuf
// decoding failure: nothing has been parsed at this point. The
// ParseProtobufError sensor is therefore intentionally NOT incremented here,
// matching the other "NOT FOUND" branches, so that the sensor only counts
// failed ParseFromString() calls.
if (!parser.TryNextRow()) {
    ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " PENDING_SMALL_TABLE_NAME " where " TENANT_COLUMN_NAME " = \"" << request.tenant() << "\" and " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\"" ;
}
owner = *parser.ColumnParser(OWNER_COLUMN_NAME).GetOptionalString();
Expand Down Expand Up @@ -595,8 +601,8 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvPingTaskReq
auto pingTaskParams = DoesPingTaskUpdateQueriesTable(request) ?
ConstructHardPingTask(request, response, YdbConnection->TablePathPrefix, Config->AutomaticQueriesTtl,
Config->TaskLeaseTtl, Config->RetryPolicies, Counters.Counters, Config->Proto.GetMaxRequestSize(),
Config->Proto.GetDumpRawStatistics(), finalStatus) :
ConstructSoftPingTask(request, response, YdbConnection->TablePathPrefix, Config->TaskLeaseTtl);
Config->Proto.GetDumpRawStatistics(), finalStatus, requestCounters.Common) :
ConstructSoftPingTask(request, response, YdbConnection->TablePathPrefix, Config->TaskLeaseTtl, requestCounters.Common);
auto debugInfo = Config->Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
auto result = ReadModifyWrite(pingTaskParams.Query, pingTaskParams.Params, pingTaskParams.Prepare, requestCounters, debugInfo);
auto prepare = [response] { return *response; };
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/fq/libs/control_plane_storage/validators.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,8 @@ TValidationQuery CreateQueryComputeStatusValidator(const std::vector<FederatedQu
const TString& scope,
const TString& id,
const TString& error,
const TString& tablePathPrefix) {
const TString& tablePathPrefix,
const ::NMonitoring::TDynamicCounters::TCounterPtr& parseProtobufError) {
TSqlQueryBuilder queryBuilder(tablePathPrefix, "ComputeStatusValidator");
queryBuilder.AddString("scope", scope);
queryBuilder.AddString("query_id", id);
Expand All @@ -461,7 +462,7 @@ TValidationQuery CreateQueryComputeStatusValidator(const std::vector<FederatedQu
"WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
);

auto validator = [error, computeStatuses](NYdb::NTable::TDataQueryResult result) {
auto validator = [error, computeStatuses, parseProtobufError](NYdb::NTable::TDataQueryResult result) {
const auto& resultSets = result.GetResultSets();
if (resultSets.size() != 1) {
ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 1 but equal " << resultSets.size() << ". Please contact internal support";
Expand All @@ -474,6 +475,7 @@ TValidationQuery CreateQueryComputeStatusValidator(const std::vector<FederatedQu

FederatedQuery::Query query;
if (!query.ParseFromString(*parser.ColumnParser(QUERY_COLUMN_NAME).GetOptionalString())) {
parseProtobufError->Inc();
ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for query. Please contact internal support";
}

Expand Down
15 changes: 10 additions & 5 deletions ydb/core/fq/libs/control_plane_storage/validators.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,15 @@ TValidationQuery CreateQueryComputeStatusValidator(const std::vector<FederatedQu
const TString& scope,
const TString& id,
const TString& error,
const TString& tablePathPrefix);
const TString& tablePathPrefix,
const ::NMonitoring::TDynamicCounters::TCounterPtr& parseProtobufError);

template<typename T>
TValidationQuery CreateIdempotencyKeyValidator(const TString& scope,
const TString& idempotencyKey,
std::shared_ptr<T> response,
const TString& tablePathPrefix) {
const TString& tablePathPrefix,
const ::NMonitoring::TDynamicCounters::TCounterPtr& parseProtobufError) {
TSqlQueryBuilder queryBuilder(tablePathPrefix);
queryBuilder.AddString("idempotency_key", idempotencyKey);
queryBuilder.AddString("scope", scope);
Expand All @@ -125,7 +127,7 @@ TValidationQuery CreateIdempotencyKeyValidator(const TString& scope,
"WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" IDEMPOTENCY_KEY_COLUMN_NAME "` = $idempotency_key;\n"
);

auto validator = [response](NYdb::NTable::TDataQueryResult result) {
auto validator = [response, parseProtobufError](NYdb::NTable::TDataQueryResult result) {
const auto& resultSets = result.GetResultSets();
if (resultSets.size() != 1) {
ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "internal error, result set size is not equal to 1 but equal " << resultSets.size();
Expand All @@ -137,6 +139,7 @@ TValidationQuery CreateIdempotencyKeyValidator(const TString& scope,
}

if (!response->first.ParseFromString(*parser.ColumnParser(RESPONSE_COLUMN_NAME).GetOptionalString())) {
parseProtobufError->Inc();
ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for response. Please contact internal support";
}

Expand All @@ -150,7 +153,8 @@ template<typename T, typename A>
TValidationQuery CreateIdempotencyKeyValidator(const TString& scope,
const TString& idempotencyKey,
std::shared_ptr<std::pair<T, A>> response,
const TString& tablePathPrefix) {
const TString& tablePathPrefix,
const ::NMonitoring::TDynamicCounters::TCounterPtr& parseProtobufError) {
TSqlQueryBuilder queryBuilder(tablePathPrefix);
queryBuilder.AddString("idempotency_key", idempotencyKey);
queryBuilder.AddString("scope", scope);
Expand All @@ -159,7 +163,7 @@ TValidationQuery CreateIdempotencyKeyValidator(const TString& scope,
"WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" IDEMPOTENCY_KEY_COLUMN_NAME "` = $idempotency_key;\n"
);

auto validator = [response](NYdb::NTable::TDataQueryResult result) {
auto validator = [response, parseProtobufError](NYdb::NTable::TDataQueryResult result) {
const auto& resultSets = result.GetResultSets();
if (resultSets.size() != 1) {
ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "internal error, result set size is not equal to 1 but equal " << resultSets.size();
Expand All @@ -171,6 +175,7 @@ TValidationQuery CreateIdempotencyKeyValidator(const TString& scope,
}

if (!response->first.ParseFromString(*parser.ColumnParser(RESPONSE_COLUMN_NAME).GetOptionalString())) {
parseProtobufError->Inc();
ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for response. Please contact internal support";
}

Expand Down
Loading

0 comments on commit ae76055

Please sign in to comment.