Skip to content

Commit

Permalink
Statistic: Delete analyze after deadline (ydb-platform#8214)
Browse files Browse the repository at this point in the history
  • Loading branch information
azevaykin authored and stanislav-shchetinin committed Aug 30, 2024
1 parent dba2bab commit 7b765a8
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 2 deletions.
1 change: 1 addition & 0 deletions ydb/core/protos/counters_statistics_aggregator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ enum ETxTypes {
TXTYPE_ANALYZE_TABLE_REQUEST = 13 [(TxTypeOpts) = {Name: "TxAnalyzeTableRequest"}];
TXTYPE_ANALYZE_TABLE_RESPONSE = 14 [(TxTypeOpts) = {Name: "TxAnalyzeTableResponse"}];
TXTYPE_ANALYZE_TABLE_DELIVERY_PROBLEM = 15 [(TxTypeOpts) = {Name: "TTxAnalyzeTableDeliveryProblem"}];
TXTYPE_ANALYZE_DEADLINE = 16 [(TxTypeOpts) = {Name: "TTxAnalyzeDeadline"}];
}
7 changes: 7 additions & 0 deletions ydb/core/statistics/aggregator/aggregator_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
struct TTxAnalyzeTableRequest;
struct TTxAnalyzeTableResponse;
struct TTxAnalyzeTableDeliveryProblem;
struct TTxAnalyzeDeadline;
struct TTxNavigate;
struct TTxResolve;
struct TTxDatashardScanResponse;
Expand All @@ -70,6 +71,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
EvAckTimeout,
EvSendAnalyze,
EvAnalyzeDeliveryProblem,
EvAnalyzeDeadline,

EvEnd
};
Expand All @@ -83,6 +85,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
struct TEvResolve : public TEventLocal<TEvResolve, EvResolve> {};
struct TEvSendAnalyze : public TEventLocal<TEvSendAnalyze, EvSendAnalyze> {};
struct TEvAnalyzeDeliveryProblem : public TEventLocal<TEvAnalyzeDeliveryProblem, EvAnalyzeDeliveryProblem> {};
struct TEvAnalyzeDeadline : public TEventLocal<TEvAnalyzeDeadline, EvAnalyzeDeadline> {};

struct TEvAckTimeout : public TEventLocal<TEvAckTimeout, EvAckTimeout> {
size_t SeqNo = 0;
Expand Down Expand Up @@ -146,6 +149,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
void Handle(TEvPrivate::TEvAckTimeout::TPtr& ev);
void Handle(TEvPrivate::TEvSendAnalyze::TPtr& ev);
void Handle(TEvPrivate::TEvAnalyzeDeliveryProblem::TPtr& ev);
void Handle(TEvPrivate::TEvAnalyzeDeadline::TPtr& ev);

void InitializeStatisticsTable();
void Navigate();
Expand Down Expand Up @@ -209,6 +213,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
hFunc(TEvPrivate::TEvAckTimeout, Handle);
hFunc(TEvPrivate::TEvSendAnalyze, Handle);
hFunc(TEvPrivate::TEvAnalyzeDeliveryProblem, Handle);
hFunc(TEvPrivate::TEvAnalyzeDeadline, Handle);

default:
if (!HandleDefaultEvents(ev, SelfId())) {
Expand Down Expand Up @@ -316,6 +321,8 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
static constexpr size_t SendAnalyzeCount = 100;
static constexpr TDuration SendAnalyzePeriod = TDuration::Seconds(1);
static constexpr TDuration AnalyzeDeliveryProblemPeriod = TDuration::Seconds(1);
// Maximum age of a force traversal (analyze) operation: TTxAnalyzeDeadline deletes
// operations whose CreatedAt plus this duration is earlier than the current time.
static constexpr TDuration AnalyzeDeadline = TDuration::Days(1);
// Delay used when scheduling the next TEvAnalyzeDeadline check.
static constexpr TDuration AnalyzeDeadlinePeriod = TDuration::Seconds(1);

enum ENavigateType {
Analyze,
Expand Down
65 changes: 65 additions & 0 deletions ydb/core/statistics/aggregator/tx_analyze_deadline.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#include "aggregator_impl.h"

#include <ydb/core/protos/hive.pb.h>
#include <ydb/core/statistics/service/service.h>

#include <util/string/vector.h>

namespace NKikimr::NStat {

// Transaction that enforces the analyze deadline.
//
// Execute() scans Self->ForceTraversals for the first operation whose CreatedAt is
// older than AnalyzeDeadline, remembers its id and reply actor, and deletes it.
// Complete() then answers the waiting actor (if any) and either immediately re-runs
// the check (an operation was deleted, more may be expired) or schedules the next
// check after AnalyzeDeadlinePeriod. At most one operation is deleted per transaction.
struct TStatisticsAggregator::TTxAnalyzeDeadline : public TTxBase {
    // Id of the operation deleted by Execute(); stays empty when nothing has expired.
    TString OperationId;
    // Reply actor recorded on the deleted operation; may be unset.
    TActorId ReplyToActorId;

    TTxAnalyzeDeadline(TSelf* self)
        : TTxBase(self)
    {}

    TTxType GetTxType() const override { return TXTYPE_ANALYZE_DEADLINE; }

    bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
        SA_LOG_T("[" << Self->TabletID() << "] TTxAnalyzeDeadline::Execute");

        NIceDb::TNiceDb db(txc.DB);
        const auto now = ctx.Now();

        for (const TForceTraversalOperation& operation : Self->ForceTraversals) {
            if (operation.CreatedAt + Self->AnalyzeDeadline < now) {
                SA_LOG_E("[" << Self->TabletID() << "] Delete long analyze operation, OperationId=" << operation.OperationId);

                // Copy everything needed out of the element BEFORE deleting it:
                // `operation` refers into Self->ForceTraversals and must not be
                // touched once the delete call has run.
                OperationId = operation.OperationId;
                ReplyToActorId = operation.ReplyToActorId;

                // Pass our own copy of the id rather than a reference into the
                // element that is being removed.
                Self->DeleteForceTraversalOperation(OperationId, db);

                // The container was modified; stop iterating. Remaining expired
                // operations are handled by follow-up transactions (see Complete()).
                break;
            }
        }

        return true;
    }

    void Complete(const TActorContext& ctx) override {
        SA_LOG_T("[" << Self->TabletID() << "] TTxAnalyzeDeadline::Complete");

        if (OperationId) {
            if (ReplyToActorId) {
                SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeDeadline::Complete. " <<
                    "Send TEvAnalyzeResponse for deleted operation, OperationId=" << OperationId << ", ActorId=" << ReplyToActorId);
                auto response = std::make_unique<TEvStatistics::TEvAnalyzeResponse>();
                response->Record.SetOperationId(OperationId);
                ctx.Send(ReplyToActorId, response.release());
            } else {
                SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeDeadline::Complete. No ActorId to send reply. OperationId=" << OperationId);
            }
            // Something was deleted: re-check right away, without the periodic delay,
            // because only one operation is removed per transaction.
            ctx.Send(Self->SelfId(), new TEvPrivate::TEvAnalyzeDeadline());
        } else {
            // Nothing expired: fall back to the periodic check.
            ctx.Schedule(Self->AnalyzeDeadlinePeriod, new TEvPrivate::TEvAnalyzeDeadline());
        }
    }
};

// Handles the TEvAnalyzeDeadline event by running a TTxAnalyzeDeadline transaction.
// The event itself carries no payload, so the argument is ignored.
void TStatisticsAggregator::Handle(TEvPrivate::TEvAnalyzeDeadline::TPtr&) {
    const auto& actorCtx = TActivationContext::AsActorContext();
    Execute(new TTxAnalyzeDeadline(this), actorCtx);
}

} // NKikimr::NStat
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
namespace NKikimr::NStat {

struct TStatisticsAggregator::TTxAnalyzeTableDeliveryProblem : public TTxBase {
std::vector<std::unique_ptr<IEventBase>> Events;

TTxAnalyzeTableDeliveryProblem(TSelf* self)
: TTxBase(self)
{}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/statistics/aggregator/tx_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ struct TStatisticsAggregator::TTxInit : public TTxBase {
Self->Schedule(Self->TraversalPeriod, new TEvPrivate::TEvScheduleTraversal());
Self->Schedule(Self->SendAnalyzePeriod, new TEvPrivate::TEvSendAnalyze());
Self->Schedule(Self->AnalyzeDeliveryProblemPeriod, new TEvPrivate::TEvAnalyzeDeliveryProblem());
Self->Schedule(Self->AnalyzeDeadlinePeriod, new TEvPrivate::TEvAnalyzeDeadline());
} else {
SA_LOG_W("[" << Self->TabletID() << "] TTxInit::Complete. EnableColumnStatistics=false");
}
Expand Down
17 changes: 17 additions & 0 deletions ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,23 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) {

runtime.GrabEdgeEventRethrow<TEvStatistics::TEvAnalyzeResponse>(sender);
}

// Checks that an analyze request still gets a TEvAnalyzeResponse after the
// current time is advanced by two days, i.e. past the one-day AnalyzeDeadline.
Y_UNIT_TEST(AnalyzeDeadline) {
    TTestEnv env(1, 1);
    auto& runtime = *env.GetServer().GetRuntime();
    auto tableInfo = CreateDatabaseColumnTables(env, 1, 1)[0];
    auto edgeActor = runtime.AllocateEdgeActor();

    // Must be set up before the request is sent.
    // NOTE(review): presumably this holds back TEvAnalyzeTableResponse events so
    // the operation cannot finish normally — confirm against TBlockEvents.
    TBlockEvents<TEvStatistics::TEvAnalyzeTableResponse> blockedResponses(runtime);

    auto request = MakeAnalyzeRequest({tableInfo.PathId});
    runtime.SendToPipe(tableInfo.SaTabletId, edgeActor, request.release());

    // Wait until at least one table response has been captured, then move the
    // clock beyond the deadline.
    runtime.WaitFor("TEvAnalyzeTableResponse", [&]{ return blockedResponses.size(); });
    runtime.AdvanceCurrentTime(TDuration::Days(2));

    runtime.GrabEdgeEventRethrow<TEvStatistics::TEvAnalyzeResponse>(edgeActor);
}
}

} // NStat
Expand Down
1 change: 1 addition & 0 deletions ydb/core/statistics/aggregator/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ SRCS(
tx_ack_timeout.cpp
tx_aggr_stat_response.cpp
tx_analyze.cpp
tx_analyze_deadline.cpp
tx_analyze_table_delivery_problem.cpp
tx_analyze_table_request.cpp
tx_analyze_table_response.cpp
Expand Down

0 comments on commit 7b765a8

Please sign in to comment.