diff --git a/ydb/core/protos/counters_statistics_aggregator.proto b/ydb/core/protos/counters_statistics_aggregator.proto index 0947ca795180..80cf9cbcbb4e 100644 --- a/ydb/core/protos/counters_statistics_aggregator.proto +++ b/ydb/core/protos/counters_statistics_aggregator.proto @@ -23,4 +23,5 @@ enum ETxTypes { TXTYPE_ANALYZE_TABLE_REQUEST = 13 [(TxTypeOpts) = {Name: "TxAnalyzeTableRequest"}]; TXTYPE_ANALYZE_TABLE_RESPONSE = 14 [(TxTypeOpts) = {Name: "TxAnalyzeTableResponse"}]; TXTYPE_ANALYZE_TABLE_DELIVERY_PROBLEM = 15 [(TxTypeOpts) = {Name: "TTxAnalyzeTableDeliveryProblem"}]; + TXTYPE_ANALYZE_DEADLINE = 16 [(TxTypeOpts) = {Name: "TTxAnalyzeDeadline"}]; } diff --git a/ydb/core/statistics/aggregator/aggregator_impl.h b/ydb/core/statistics/aggregator/aggregator_impl.h index 5b0e0715fa4b..3c24c27462b9 100644 --- a/ydb/core/statistics/aggregator/aggregator_impl.h +++ b/ydb/core/statistics/aggregator/aggregator_impl.h @@ -49,6 +49,7 @@ class TStatisticsAggregator : public TActor, public NTabl struct TTxAnalyzeTableRequest; struct TTxAnalyzeTableResponse; struct TTxAnalyzeTableDeliveryProblem; + struct TTxAnalyzeDeadline; struct TTxNavigate; struct TTxResolve; struct TTxDatashardScanResponse; @@ -70,6 +71,7 @@ class TStatisticsAggregator : public TActor, public NTabl EvAckTimeout, EvSendAnalyze, EvAnalyzeDeliveryProblem, + EvAnalyzeDeadline, EvEnd }; @@ -83,6 +85,7 @@ class TStatisticsAggregator : public TActor, public NTabl struct TEvResolve : public TEventLocal {}; struct TEvSendAnalyze : public TEventLocal {}; struct TEvAnalyzeDeliveryProblem : public TEventLocal {}; + struct TEvAnalyzeDeadline : public TEventLocal {}; struct TEvAckTimeout : public TEventLocal { size_t SeqNo = 0; @@ -146,6 +149,7 @@ class TStatisticsAggregator : public TActor, public NTabl void Handle(TEvPrivate::TEvAckTimeout::TPtr& ev); void Handle(TEvPrivate::TEvSendAnalyze::TPtr& ev); void Handle(TEvPrivate::TEvAnalyzeDeliveryProblem::TPtr& ev); + void Handle(TEvPrivate::TEvAnalyzeDeadline::TPtr& ev); void InitializeStatisticsTable(); void Navigate(); @@ -209,6 +213,7 @@ class TStatisticsAggregator : public TActor, public NTabl hFunc(TEvPrivate::TEvAckTimeout, Handle); hFunc(TEvPrivate::TEvSendAnalyze, Handle); hFunc(TEvPrivate::TEvAnalyzeDeliveryProblem, Handle); + hFunc(TEvPrivate::TEvAnalyzeDeadline, Handle); default: if (!HandleDefaultEvents(ev, SelfId())) { @@ -316,6 +321,8 @@ class TStatisticsAggregator : public TActor, public NTabl static constexpr size_t SendAnalyzeCount = 100; static constexpr TDuration SendAnalyzePeriod = TDuration::Seconds(1); static constexpr TDuration AnalyzeDeliveryProblemPeriod = TDuration::Seconds(1); + static constexpr TDuration AnalyzeDeadline = TDuration::Days(1); + static constexpr TDuration AnalyzeDeadlinePeriod = TDuration::Seconds(1); enum ENavigateType { Analyze, diff --git a/ydb/core/statistics/aggregator/tx_analyze_deadline.cpp b/ydb/core/statistics/aggregator/tx_analyze_deadline.cpp new file mode 100644 index 000000000000..67730beadb57 --- /dev/null +++ b/ydb/core/statistics/aggregator/tx_analyze_deadline.cpp @@ -0,0 +1,65 @@ +#include "aggregator_impl.h" + +#include +#include + +#include + +namespace NKikimr::NStat { + +struct TStatisticsAggregator::TTxAnalyzeDeadline : public TTxBase { + TString OperationId; + TActorId ReplyToActorId; + + TTxAnalyzeDeadline(TSelf* self) + : TTxBase(self) + {} + + TTxType GetTxType() const override { return TXTYPE_ANALYZE_DEADLINE; } + + bool Execute(TTransactionContext& txc, const TActorContext& ctx) override { + SA_LOG_T("[" << Self->TabletID() << "] TTxAnalyzeDeadline::Execute"); + + NIceDb::TNiceDb db(txc.DB); + auto now = ctx.Now(); + + for (TForceTraversalOperation& operation : Self->ForceTraversals) { + if (operation.CreatedAt + Self->AnalyzeDeadline < now) { + SA_LOG_E("[" << Self->TabletID() << "] Delete long analyze operation, OperationId=" << operation.OperationId); + + OperationId = operation.OperationId; + ReplyToActorId = operation.ReplyToActorId; + Self->DeleteForceTraversalOperation(operation.OperationId, db); + break; + } + } + + return true; + } + + void Complete(const TActorContext& ctx) override { + SA_LOG_T("[" << Self->TabletID() << "] TTxAnalyzeDeadline::Complete"); + + if (OperationId) { + if (ReplyToActorId) { + SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeDeadline::Complete. " << + "Send TEvAnalyzeResponse for deleted operation, OperationId=" << OperationId << ", ActorId=" << ReplyToActorId); + auto response = std::make_unique(); + response->Record.SetOperationId(OperationId); + ctx.Send(ReplyToActorId, response.release()); + } else { + SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeDeadline::Complete. No ActorId to send reply. OperationId=" << OperationId); + } + ctx.Send(Self->SelfId(), new TEvPrivate::TEvAnalyzeDeadline()); + } else { + ctx.Schedule(AnalyzeDeadlinePeriod, new TEvPrivate::TEvAnalyzeDeadline()); + } + } +}; + +void TStatisticsAggregator::Handle(TEvPrivate::TEvAnalyzeDeadline::TPtr&) { + Execute(new TTxAnalyzeDeadline(this), + TActivationContext::AsActorContext()); +} + +} // NKikimr::NStat diff --git a/ydb/core/statistics/aggregator/tx_analyze_table_delivery_problem.cpp b/ydb/core/statistics/aggregator/tx_analyze_table_delivery_problem.cpp index b975069797e6..6e04a23e339e 100644 --- a/ydb/core/statistics/aggregator/tx_analyze_table_delivery_problem.cpp +++ b/ydb/core/statistics/aggregator/tx_analyze_table_delivery_problem.cpp @@ -8,8 +8,6 @@ namespace NKikimr::NStat { struct TStatisticsAggregator::TTxAnalyzeTableDeliveryProblem : public TTxBase { - std::vector> Events; - TTxAnalyzeTableDeliveryProblem(TSelf* self) : TTxBase(self) {} diff --git a/ydb/core/statistics/aggregator/tx_init.cpp b/ydb/core/statistics/aggregator/tx_init.cpp index 1fc622511061..55eaa4a13148 100644 --- a/ydb/core/statistics/aggregator/tx_init.cpp +++ b/ydb/core/statistics/aggregator/tx_init.cpp @@ -277,6 +277,7 @@ struct TStatisticsAggregator::TTxInit : public TTxBase { Self->Schedule(Self->TraversalPeriod, new TEvPrivate::TEvScheduleTraversal()); Self->Schedule(Self->SendAnalyzePeriod, new TEvPrivate::TEvSendAnalyze()); Self->Schedule(Self->AnalyzeDeliveryProblemPeriod, new TEvPrivate::TEvAnalyzeDeliveryProblem()); + Self->Schedule(Self->AnalyzeDeadlinePeriod, new TEvPrivate::TEvAnalyzeDeadline()); } else { SA_LOG_W("[" << Self->TabletID() << "] TTxInit::Complete. EnableColumnStatistics=false"); } diff --git a/ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp b/ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp index 361c7992c6a6..def906babfe6 100644 --- a/ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp +++ b/ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp @@ -258,6 +258,22 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) { runtime.GrabEdgeEventRethrow(sender); } + + Y_UNIT_TEST(AnalyzeDeadline) { + TTestEnv env(1, 1); + auto& runtime = *env.GetServer().GetRuntime(); + auto tableInfo = CreateDatabaseColumnTables(env, 1, 1)[0]; + auto sender = runtime.AllocateEdgeActor(); + + TBlockEvents block(runtime); + + auto analyzeRequest = MakeAnalyzeRequest({tableInfo.PathId}); + runtime.SendToPipe(tableInfo.SaTabletId, sender, analyzeRequest.release()); + + runtime.SimulateSleep(TDuration::Hours(25)); + + runtime.GrabEdgeEventRethrow(sender); + } } } // NStat diff --git a/ydb/core/statistics/aggregator/ya.make b/ydb/core/statistics/aggregator/ya.make index e63230eebcc1..bab460ef0055 100644 --- a/ydb/core/statistics/aggregator/ya.make +++ b/ydb/core/statistics/aggregator/ya.make @@ -10,6 +10,7 @@ SRCS( tx_ack_timeout.cpp tx_aggr_stat_response.cpp tx_analyze.cpp + tx_analyze_deadline.cpp tx_analyze_table_delivery_problem.cpp tx_analyze_table_request.cpp tx_analyze_table_response.cpp