Skip to content

Commit

Permalink
tiering waiting index policy simplification to use instants directly (y…
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Jun 4, 2024
1 parent c6c214c commit b0b6d2e
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ TTieringProcessContext::TTieringProcessContext(const ui64 memoryUsageLimit, cons
, SaverContext(saverContext)
, Counters(counters)
, Controller(controller)
, ActualInstant(TlsActivationContext ? AppData()->TimeProvider->Now() : TInstant::Now())
, DataLocksManager(dataLocksManager)
, Now(TlsActivationContext ? AppData()->TimeProvider->Now() : TInstant::Now())
{

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,17 @@ class TTieringProcessContext {
THashMap<TRWAddress, std::vector<TTaskConstructor>> Tasks;
const NColumnShard::TEngineLogsCounters Counters;
std::shared_ptr<NActualizer::TController> Controller;
TInstant ActualInstant = AppData()->TimeProvider->Now();
public:
const std::shared_ptr<NDataLocks::TManager> DataLocksManager;
const TInstant Now = AppDataVerified().TimeProvider->Now();

TInstant GetActualInstant() const {
return ActualInstant;
}

void ResetActualInstantForTest() {
ActualInstant = TlsActivationContext ? AppData()->TimeProvider->Now() : TInstant::Now();
}

const NColumnShard::TEngineLogsCounters GetCounters() const {
return Counters;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ std::vector<std::shared_ptr<TTTLColumnEngineChanges>> TColumnEngineForLogs::Star
g->StartActualizationIndex();
}
g->RefreshTiering(i.second);
context.ResetActualInstantForTest();
g->BuildActualizationTasks(context, actualizationLag);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ void TTieringActualizer::DoAddPortion(const TPortionInfo& portion, const TAddExt
if (!info) {
return;
}
AFL_VERIFY(PortionIdByWaitDuration[info->GetAddress()].AddPortion(*info, portion.GetPortionId(), addContext.GetNow() - StartInstant));
AFL_VERIFY(PortionIdByWaitDuration[info->GetAddress()].AddPortion(*info, portion.GetPortionId(), addContext.GetNow()));
auto address = info->GetAddress();
TFindActualizationInfo findId(std::move(address), info->GetWaitDuration() + (addContext.GetNow() - StartInstant));
TFindActualizationInfo findId(std::move(address), info->GetWaitInstant(addContext.GetNow()));
AFL_VERIFY(PortionsInfo.emplace(portion.GetPortionId(), std::move(findId)).second);
}

Expand All @@ -105,16 +105,16 @@ void TTieringActualizer::DoRemovePortion(const ui64 portionId) {
void TTieringActualizer::DoExtractTasks(TTieringProcessContext& tasksContext, const TExternalTasksContext& externalContext, TInternalTasksContext& /*internalContext*/) {
THashSet<ui64> portionIds;
for (auto&& [address, addressPortions] : PortionIdByWaitDuration) {
if (addressPortions.GetPortions().size() && tasksContext.Now - StartInstant < addressPortions.GetPortions().begin()->first) {
if (addressPortions.GetPortions().size() && tasksContext.GetActualInstant() < addressPortions.GetPortions().begin()->first) {
Counters.SkipEvictionForLimit->Add(1);
continue;
}
if (!tasksContext.IsRWAddressAvailable(address)) {
Counters.SkipEvictionForLimit->Add(1);
continue;
}
for (auto&& [duration, portions] : addressPortions.GetPortions()) {
if (tasksContext.Now - StartInstant < duration) {
for (auto&& [wInstant, portions] : addressPortions.GetPortions()) {
if (tasksContext.GetActualInstant() < wInstant) {
break;
}
bool limitEnriched = false;
Expand All @@ -126,7 +126,7 @@ void TTieringActualizer::DoExtractTasks(TTieringProcessContext& tasksContext, co
continue;
}
}
auto info = BuildActualizationInfo(*portion, tasksContext.Now);
auto info = BuildActualizationInfo(*portion, tasksContext.GetActualInstant());
AFL_VERIFY(info);
auto portionScheme = portion->GetSchema(VersionedIndex);
TPortionEvictionFeatures features(portionScheme, info->GetTargetScheme(), portion->GetTierNameDef(IStoragesManager::DefaultStorageId));
Expand All @@ -153,9 +153,9 @@ void TTieringActualizer::DoExtractTasks(TTieringProcessContext& tasksContext, co
std::shared_ptr<NColumnShard::TValueAggregationClient> waitDurationSignal;
std::shared_ptr<NColumnShard::TValueAggregationClient> queueSizeSignal;
if (i.first.WriteIs(NTiering::NCommon::DeleteTierName)) {
i.second.CorrectSignals(waitQueueDelete, waitDurationDelete, tasksContext.Now - StartInstant);
i.second.CorrectSignals(waitQueueDelete, waitDurationDelete, tasksContext.GetActualInstant());
} else {
i.second.CorrectSignals(waitQueueEvict, waitDurationEvict, tasksContext.Now - StartInstant);
i.second.CorrectSignals(waitQueueEvict, waitDurationEvict, tasksContext.GetActualInstant());
}
}
Counters.DifferenceWaitToDelete->SetValue(waitDurationDelete);
Expand All @@ -170,7 +170,6 @@ void TTieringActualizer::DoExtractTasks(TTieringProcessContext& tasksContext, co
}

void TTieringActualizer::Refresh(const std::optional<TTiering>& info, const TAddExternalContext& externalContext) {
StartInstant = externalContext.GetNow();
Tiering = info;
if (Tiering) {
TieringColumnId = VersionedIndex.GetLastSchema()->GetColumnId(Tiering->GetTtlColumn());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ class TTieringActualizer: public IActualizer {

}

TDuration GetWaitDuration() const {
TInstant GetWaitInstant(const TInstant now) const {
if (WaitDurationValue >= 0) {
return TDuration::FromValue(WaitDurationValue);
return now + TDuration::FromValue(WaitDurationValue);
} else {
return TDuration::Zero();
return now;
}
}

Expand All @@ -58,52 +58,52 @@ class TTieringActualizer: public IActualizer {
class TFindActualizationInfo {
private:
TRWAddress RWAddress;
YDB_READONLY_DEF(TDuration, WaitDuration);
YDB_READONLY_DEF(TInstant, WaitInstant);
public:
const TRWAddress& GetRWAddress() const {
return RWAddress;
}

TFindActualizationInfo(TRWAddress&& rwAddress, const TDuration waitDuration)
TFindActualizationInfo(TRWAddress&& rwAddress, const TInstant waitInstant)
: RWAddress(std::move(rwAddress))
, WaitDuration(waitDuration) {
, WaitInstant(waitInstant) {

}
};

class TRWAddressPortionsInfo {
private:
std::map<TDuration, THashSet<ui64>> Portions;
std::map<TInstant, THashSet<ui64>> Portions;
public:
const std::map<TDuration, THashSet<ui64>>& GetPortions() const {
const std::map<TInstant, THashSet<ui64>>& GetPortions() const {
return Portions;
}

void CorrectSignals(ui64& queueSize, ui64& waitSeconds, const TDuration dCorrect) const {
void CorrectSignals(ui64& queueSize, ui64& waitSeconds, const TInstant now) const {
if (Portions.empty()) {
return;
}
for (auto&& i : Portions) {
if (i.first > dCorrect) {
if (i.first > now) {
break;
}
queueSize += i.second.size();
}
if (Portions.begin()->first < dCorrect) {
waitSeconds = std::max(waitSeconds, (dCorrect - Portions.begin()->first).Seconds());
if (Portions.begin()->first < now) {
waitSeconds = std::max(waitSeconds, (now - Portions.begin()->first).Seconds());
}
}

[[nodiscard]] bool AddPortion(const TFullActualizationInfo& info, const ui64 portionId, const TDuration dCorrection) {
return Portions[info.GetWaitDuration() + dCorrection].emplace(portionId).second;
[[nodiscard]] bool AddPortion(const TFullActualizationInfo& info, const ui64 portionId, const TInstant now) {
return Portions[info.GetWaitInstant(now)].emplace(portionId).second;
}

bool RemovePortion(const TFindActualizationInfo& info, const ui64 portionId) {
auto itDuration = Portions.find(info.GetWaitDuration());
AFL_VERIFY(itDuration != Portions.end());
AFL_VERIFY(itDuration->second.erase(portionId));
if (itDuration->second.empty()) {
Portions.erase(itDuration);
auto itInstant = Portions.find(info.GetWaitInstant());
AFL_VERIFY(itInstant != Portions.end());
AFL_VERIFY(itInstant->second.erase(portionId));
if (itInstant->second.empty()) {
Portions.erase(itInstant);
}
return Portions.empty();
}
Expand All @@ -116,7 +116,6 @@ class TTieringActualizer: public IActualizer {
const ui64 PathId;
const TVersionedIndex& VersionedIndex;

TInstant StartInstant = TInstant::Zero();
THashMap<TRWAddress, TRWAddressPortionsInfo> PortionIdByWaitDuration;
THashMap<ui64, TFindActualizationInfo> PortionsInfo;

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/engines/storage/granule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,12 @@ std::shared_ptr<TPortionInfo> TGranuleMeta::UpsertPortionOnLoad(TPortionInfo&& p
}

void TGranuleMeta::BuildActualizationTasks(NActualizer::TTieringProcessContext& context, const TDuration actualizationLag) const {
if (context.Now - LastActualizations < actualizationLag) {
if (context.GetActualInstant() - LastActualizations < actualizationLag) {
return;
}
NActualizer::TExternalTasksContext extTasks(Portions);
ActualizationIndex->ExtractActualizationTasks(context, extTasks);
LastActualizations = context.Now;
LastActualizations = context.GetActualInstant();
}

} // namespace NKikimr::NOlap
2 changes: 2 additions & 0 deletions ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
Y_UNIT_TEST(IndexTtl) {
TTestDbWrapper db;
TIndexInfo tableInfo = NColumnShard::BuildTableInfo(testColumns, testKey);
auto csDefaultControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<TDefaultTestsController>();
csDefaultControllerGuard->SetTasksActualizationLag(TDuration::Zero());

ui64 pathId = 1;
ui32 step = 1000;
Expand Down

0 comments on commit b0b6d2e

Please sign in to comment.