Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tiering waiting index policy simplification to use instants directly #5123

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ TTieringProcessContext::TTieringProcessContext(const ui64 memoryUsageLimit, cons
, SaverContext(saverContext)
, Counters(counters)
, Controller(controller)
, ActualInstant(TlsActivationContext ? AppData()->TimeProvider->Now() : TInstant::Now())
, DataLocksManager(dataLocksManager)
, Now(TlsActivationContext ? AppData()->TimeProvider->Now() : TInstant::Now())
{

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,17 @@ class TTieringProcessContext {
THashMap<TRWAddress, std::vector<TTaskConstructor>> Tasks;
const NColumnShard::TEngineLogsCounters Counters;
std::shared_ptr<NActualizer::TController> Controller;
TInstant ActualInstant = AppData()->TimeProvider->Now();
public:
const std::shared_ptr<NDataLocks::TManager> DataLocksManager;
const TInstant Now = AppDataVerified().TimeProvider->Now();

TInstant GetActualInstant() const {
return ActualInstant;
}

void ResetActualInstantForTest() {
ActualInstant = TlsActivationContext ? AppData()->TimeProvider->Now() : TInstant::Now();
}

const NColumnShard::TEngineLogsCounters GetCounters() const {
return Counters;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ std::vector<std::shared_ptr<TTTLColumnEngineChanges>> TColumnEngineForLogs::Star
g->StartActualizationIndex();
}
g->RefreshTiering(i.second);
context.ResetActualInstantForTest();
g->BuildActualizationTasks(context, actualizationLag);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ void TTieringActualizer::DoAddPortion(const TPortionInfo& portion, const TAddExt
if (!info) {
return;
}
AFL_VERIFY(PortionIdByWaitDuration[info->GetAddress()].AddPortion(*info, portion.GetPortionId(), addContext.GetNow() - StartInstant));
AFL_VERIFY(PortionIdByWaitDuration[info->GetAddress()].AddPortion(*info, portion.GetPortionId(), addContext.GetNow()));
ivanmorozov333 marked this conversation as resolved.
Show resolved Hide resolved
auto address = info->GetAddress();
TFindActualizationInfo findId(std::move(address), info->GetWaitDuration() + (addContext.GetNow() - StartInstant));
TFindActualizationInfo findId(std::move(address), info->GetWaitInstant(addContext.GetNow()));
AFL_VERIFY(PortionsInfo.emplace(portion.GetPortionId(), std::move(findId)).second);
}

Expand All @@ -105,16 +105,16 @@ void TTieringActualizer::DoRemovePortion(const ui64 portionId) {
void TTieringActualizer::DoExtractTasks(TTieringProcessContext& tasksContext, const TExternalTasksContext& externalContext, TInternalTasksContext& /*internalContext*/) {
THashSet<ui64> portionIds;
for (auto&& [address, addressPortions] : PortionIdByWaitDuration) {
if (addressPortions.GetPortions().size() && tasksContext.Now - StartInstant < addressPortions.GetPortions().begin()->first) {
if (addressPortions.GetPortions().size() && tasksContext.GetActualInstant() < addressPortions.GetPortions().begin()->first) {
Counters.SkipEvictionForLimit->Add(1);
continue;
}
if (!tasksContext.IsRWAddressAvailable(address)) {
Counters.SkipEvictionForLimit->Add(1);
continue;
}
for (auto&& [duration, portions] : addressPortions.GetPortions()) {
if (tasksContext.Now - StartInstant < duration) {
for (auto&& [wInstant, portions] : addressPortions.GetPortions()) {
if (tasksContext.GetActualInstant() < wInstant) {
break;
}
bool limitEnriched = false;
Expand All @@ -126,7 +126,7 @@ void TTieringActualizer::DoExtractTasks(TTieringProcessContext& tasksContext, co
continue;
}
}
auto info = BuildActualizationInfo(*portion, tasksContext.Now);
auto info = BuildActualizationInfo(*portion, tasksContext.GetActualInstant());
AFL_VERIFY(info);
auto portionScheme = portion->GetSchema(VersionedIndex);
TPortionEvictionFeatures features(portionScheme, info->GetTargetScheme(), portion->GetTierNameDef(IStoragesManager::DefaultStorageId));
Expand All @@ -153,9 +153,9 @@ void TTieringActualizer::DoExtractTasks(TTieringProcessContext& tasksContext, co
std::shared_ptr<NColumnShard::TValueAggregationClient> waitDurationSignal;
std::shared_ptr<NColumnShard::TValueAggregationClient> queueSizeSignal;
if (i.first.WriteIs(NTiering::NCommon::DeleteTierName)) {
i.second.CorrectSignals(waitQueueDelete, waitDurationDelete, tasksContext.Now - StartInstant);
i.second.CorrectSignals(waitQueueDelete, waitDurationDelete, tasksContext.GetActualInstant());
} else {
i.second.CorrectSignals(waitQueueEvict, waitDurationEvict, tasksContext.Now - StartInstant);
i.second.CorrectSignals(waitQueueEvict, waitDurationEvict, tasksContext.GetActualInstant());
}
}
Counters.DifferenceWaitToDelete->SetValue(waitDurationDelete);
Expand All @@ -170,7 +170,6 @@ void TTieringActualizer::DoExtractTasks(TTieringProcessContext& tasksContext, co
}

void TTieringActualizer::Refresh(const std::optional<TTiering>& info, const TAddExternalContext& externalContext) {
StartInstant = externalContext.GetNow();
Tiering = info;
if (Tiering) {
TieringColumnId = VersionedIndex.GetLastSchema()->GetColumnId(Tiering->GetTtlColumn());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ class TTieringActualizer: public IActualizer {

}

TDuration GetWaitDuration() const {
TInstant GetWaitInstant(const TInstant now) const {
ivanmorozov333 marked this conversation as resolved.
Show resolved Hide resolved
if (WaitDurationValue >= 0) {
return TDuration::FromValue(WaitDurationValue);
return now + TDuration::FromValue(WaitDurationValue);
} else {
return TDuration::Zero();
return now;
}
}

Expand All @@ -58,52 +58,52 @@ class TTieringActualizer: public IActualizer {
class TFindActualizationInfo {
private:
TRWAddress RWAddress;
YDB_READONLY_DEF(TDuration, WaitDuration);
YDB_READONLY_DEF(TInstant, WaitInstant);
public:
const TRWAddress& GetRWAddress() const {
return RWAddress;
}

TFindActualizationInfo(TRWAddress&& rwAddress, const TDuration waitDuration)
TFindActualizationInfo(TRWAddress&& rwAddress, const TInstant waitInstant)
: RWAddress(std::move(rwAddress))
, WaitDuration(waitDuration) {
, WaitInstant(waitInstant) {

}
};

class TRWAddressPortionsInfo {
private:
std::map<TDuration, THashSet<ui64>> Portions;
std::map<TInstant, THashSet<ui64>> Portions;
public:
const std::map<TDuration, THashSet<ui64>>& GetPortions() const {
const std::map<TInstant, THashSet<ui64>>& GetPortions() const {
return Portions;
}

void CorrectSignals(ui64& queueSize, ui64& waitSeconds, const TDuration dCorrect) const {
void CorrectSignals(ui64& queueSize, ui64& waitSeconds, const TInstant now) const {
if (Portions.empty()) {
return;
}
for (auto&& i : Portions) {
if (i.first > dCorrect) {
if (i.first > now) {
break;
}
queueSize += i.second.size();
}
if (Portions.begin()->first < dCorrect) {
waitSeconds = std::max(waitSeconds, (dCorrect - Portions.begin()->first).Seconds());
if (Portions.begin()->first < now) {
waitSeconds = std::max(waitSeconds, (now - Portions.begin()->first).Seconds());
}
}

[[nodiscard]] bool AddPortion(const TFullActualizationInfo& info, const ui64 portionId, const TDuration dCorrection) {
return Portions[info.GetWaitDuration() + dCorrection].emplace(portionId).second;
[[nodiscard]] bool AddPortion(const TFullActualizationInfo& info, const ui64 portionId, const TInstant now) {
return Portions[info.GetWaitInstant(now)].emplace(portionId).second;
}

bool RemovePortion(const TFindActualizationInfo& info, const ui64 portionId) {
auto itDuration = Portions.find(info.GetWaitDuration());
AFL_VERIFY(itDuration != Portions.end());
AFL_VERIFY(itDuration->second.erase(portionId));
if (itDuration->second.empty()) {
Portions.erase(itDuration);
auto itInstant = Portions.find(info.GetWaitInstant());
AFL_VERIFY(itInstant != Portions.end());
AFL_VERIFY(itInstant->second.erase(portionId));
if (itInstant->second.empty()) {
Portions.erase(itInstant);
}
return Portions.empty();
}
Expand All @@ -116,7 +116,6 @@ class TTieringActualizer: public IActualizer {
const ui64 PathId;
const TVersionedIndex& VersionedIndex;

TInstant StartInstant = TInstant::Zero();
THashMap<TRWAddress, TRWAddressPortionsInfo> PortionIdByWaitDuration;
ivanmorozov333 marked this conversation as resolved.
Show resolved Hide resolved
THashMap<ui64, TFindActualizationInfo> PortionsInfo;

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/engines/storage/granule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,12 @@ std::shared_ptr<TPortionInfo> TGranuleMeta::UpsertPortionOnLoad(TPortionInfo&& p
}

void TGranuleMeta::BuildActualizationTasks(NActualizer::TTieringProcessContext& context, const TDuration actualizationLag) const {
if (context.Now - LastActualizations < actualizationLag) {
if (context.GetActualInstant() - LastActualizations < actualizationLag) {
return;
}
NActualizer::TExternalTasksContext extTasks(Portions);
ActualizationIndex->ExtractActualizationTasks(context, extTasks);
LastActualizations = context.Now;
LastActualizations = context.GetActualInstant();
}

} // namespace NKikimr::NOlap
2 changes: 2 additions & 0 deletions ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
Y_UNIT_TEST(IndexTtl) {
TTestDbWrapper db;
TIndexInfo tableInfo = NColumnShard::BuildTableInfo(testColumns, testKey);
auto csDefaultControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<TDefaultTestsController>();
csDefaultControllerGuard->SetTasksActualizationLag(TDuration::Zero());

ui64 pathId = 1;
ui32 step = 1000;
Expand Down
Loading