Skip to content

Commit

Permalink
fix cs latency spikes (ydb-platform#6158) (ydb-platform#6205)
Browse files Browse the repository at this point in the history
Co-authored-by: ivanmorozov333 <ivanmorozov@ydb.tech>
  • Loading branch information
2 people authored and uzhastik committed Jul 6, 2024
1 parent f790bd1 commit bdda5b1
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 22 deletions.
1 change: 1 addition & 0 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1516,6 +1516,7 @@ message TColumnShardConfig {

optional TIndexMetadataMemoryLimit IndexMetadataMemoryLimit = 12;
optional bool CleanupEnabled = 13 [default = true];
optional uint32 MaxInFlightIntervalsOnRequest = 16;
}

message TSchemeShardConfig {
Expand Down
13 changes: 13 additions & 0 deletions ydb/core/tx/columnshard/background_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class TBackgroundController {

bool ActiveCleanupPortions = false;
bool ActiveCleanupTables = false;
bool ActiveCleanupInsertTable = false;
YDB_READONLY(TMonotonic, LastIndexationInstant, TMonotonic::Zero());
public:
THashSet<NOlap::TPortionAddress> GetConflictTTLPortions() const;
Expand Down Expand Up @@ -66,6 +67,18 @@ class TBackgroundController {
// Returns the current value of the ActiveCleanupTables flag.
bool IsCleanupTablesActive() const {
return ActiveCleanupTables;
}

// Marks the insert-table cleanup as active.
// Y_ABORT_UNLESS enforces that the flag is not already set, i.e. a second
// Start without an intervening Finish is treated as a programming error.
void StartCleanupInsertTable() {
Y_ABORT_UNLESS(!ActiveCleanupInsertTable);
ActiveCleanupInsertTable = true;
}
// Clears the insert-table cleanup flag.
// Y_ABORT_UNLESS enforces that the flag is currently set, i.e. Finish must
// be paired with a preceding StartCleanupInsertTable().
void FinishCleanupInsertTable() {
Y_ABORT_UNLESS(ActiveCleanupInsertTable);
ActiveCleanupInsertTable = false;
}
// Returns the current value of the ActiveCleanupInsertTable flag
// (set by StartCleanupInsertTable, cleared by FinishCleanupInsertTable).
bool IsCleanupInsertTableActive() const {
return ActiveCleanupInsertTable;
}
};

}
13 changes: 12 additions & 1 deletion ydb/core/tx/columnshard/blobs_action/abstract/blob_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,18 @@ class TTabletsByBlob {
// Returns true when the underlying Data container holds no entries.
bool IsEmpty() const {
return Data.empty();
}

using TGenStep = std::tuple<ui32, ui32>;
std::deque<TUnifiedBlobId> GroupByGenStep() const {
std::deque<TUnifiedBlobId> result;
for (const auto& i : Data) {
result.emplace_back(i.first);
}
const auto pred = [](const TUnifiedBlobId& l, const TUnifiedBlobId& r) {
return TGenStep(l.GetLogoBlobId().Generation(), l.GetLogoBlobId().Step()) < TGenStep(r.GetLogoBlobId().Generation(), r.GetLogoBlobId().Step());
};
std::sort(result.begin(), result.end(), pred);
return result;
}
template <class TFilter>
TTabletsByBlob ExtractBlobs(const TFilter& filter, const std::optional<ui32> countLimit = {}) {
TTabletsByBlob result;
Expand Down
51 changes: 37 additions & 14 deletions ydb/core/tx/columnshard/blobs_action/bs/blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ std::vector<TGenStep> TBlobManager::FindNewGCBarriers() {
return result;
}

std::shared_ptr<NBlobOperations::NBlobStorage::TGCTask> TBlobManager::BuildGCTask(const TString& storageId,
std::shared_ptr<NBlobOperations::NBlobStorage::TGCTask> TBlobManager::BuildGCTask(const TString& storageId,
const std::shared_ptr<TBlobManager>& manager, const std::shared_ptr<NDataSharing::TStorageSharedBlobsManager>& sharedBlobsInfo,
const std::shared_ptr<NBlobOperations::TRemoveGCCounters>& counters) noexcept {
AFL_VERIFY(!CollectGenStepInFlight);
Expand Down Expand Up @@ -244,7 +244,7 @@ std::shared_ptr<NBlobOperations::NBlobStorage::TGCTask> TBlobManager::BuildGCTas
}
}

static const ui32 blobsGCCountLimit = 50000;
static const ui32 blobsGCCountLimit = 500000;

const auto predShared = [&](const TUnifiedBlobId& id, const THashSet<TTabletId>& /*tabletIds*/) {
return id.GetLogoBlobId().TabletID() != (ui64)SelfTabletId;
Expand All @@ -262,9 +262,8 @@ std::shared_ptr<NBlobOperations::NBlobStorage::TGCTask> TBlobManager::BuildGCTas

TTabletsByBlob extractedOld = BlobsToDelete.ExtractBlobs(predRemoveOld, blobsGCCountLimit - extractedToRemoveFromDB.GetSize());
extractedToRemoveFromDB.Add(extractedOld);
TTabletId tabletId;
TUnifiedBlobId unifiedBlobId;
while (extractedOld.ExtractFront(tabletId, unifiedBlobId)) {
for (TTabletsByBlob::TIterator itExtractedOld(extractedOld); itExtractedOld.IsValid(); ++itExtractedOld) {
const TUnifiedBlobId unifiedBlobId = itExtractedOld.GetBlobId();
auto logoBlobId = unifiedBlobId.GetLogoBlobId();
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_delete_gc", logoBlobId);
NBlobOperations::NBlobStorage::TGCTask::TGCLists& gl = perGroupGCListsInFlight[unifiedBlobId.GetDsGroup()];
Expand All @@ -278,6 +277,10 @@ std::shared_ptr<NBlobOperations::NBlobStorage::TGCTask> TBlobManager::BuildGCTas


std::deque<TUnifiedBlobId> keepsToErase;
std::deque<TUnifiedBlobId> deleteIndex;
if (extractedToRemoveFromDB.GetSize() + keepsToErase.size() < blobsGCCountLimit) {
deleteIndex = BlobsToDelete.GroupByGenStep();
}
for (auto&& newCollectGenStep : newCollectGenSteps) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "PreparePerGroupGCRequests")("gen", std::get<0>(newCollectGenStep))("step", std::get<1>(newCollectGenStep));
BlobsManagerCounters.OnNewCollectStep(std::get<0>(newCollectGenStep), std::get<1>(newCollectGenStep));
Expand All @@ -297,20 +300,38 @@ std::shared_ptr<NBlobOperations::NBlobStorage::TGCTask> TBlobManager::BuildGCTas
perGroupGCListsInFlight[blobGroup].KeepList.insert(*keepBlobIt);
keepsToErase.emplace_back(TUnifiedBlobId(blobGroup, *keepBlobIt));
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_keep_gc", *keepBlobIt);
if (extractedToRemoveFromDB.GetSize() + keepsToErase.size() > blobsGCCountLimit) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "a lot of blobs to gc")("to_remove", extractedToRemoveFromDB.GetSize())("keeps_to_erase", keepsToErase.size())("limit", blobsGCCountLimit);
break;
}
}
if (extractedToRemoveFromDB.GetSize() + keepsToErase.size() > blobsGCCountLimit) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "a lot of blobs to gc")("to_remove", extractedToRemoveFromDB.GetSize())("keeps_to_erase", keepsToErase.size())("limit", blobsGCCountLimit);
break;
}
BlobsToKeep.erase(BlobsToKeep.begin(), keepBlobIt);
BlobsManagerCounters.OnBlobsKeep(BlobsToKeep);

const auto predSelf = [&](const TUnifiedBlobId& id, const THashSet<TTabletId>& /*tabletIds*/) {
auto logoBlobId = id.GetLogoBlobId();
TGenStep genStep{logoBlobId.Generation(), logoBlobId.Step()};
return genStep <= newCollectGenStep && id.GetLogoBlobId().TabletID() == (ui64)SelfTabletId;
};
TTabletsByBlob extractedSelf = BlobsToDelete.ExtractBlobs(predSelf);
TTabletsByBlob extractedSelf;
{
while (deleteIndex.size()) {
const auto& blobId = deleteIndex.front().GetLogoBlobId();
if (newCollectGenStep < TGenStep(blobId.Generation(), blobId.Step())) {
break;
}
BlobsToDelete.ExtractBlobTo(deleteIndex.front(), extractedSelf);
if (extractedToRemoveFromDB.GetSize() + extractedSelf.GetSize() + keepsToErase.size() > blobsGCCountLimit) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "a lot of blobs to gc")("to_remove", extractedToRemoveFromDB.GetSize())("keeps_to_erase", keepsToErase.size())("limit", blobsGCCountLimit);
break;
}
deleteIndex.pop_front();
}

}

extractedToRemoveFromDB.Add(extractedSelf);
TTabletId tabletId;
TUnifiedBlobId unifiedBlobId;
while (extractedSelf.ExtractFront(tabletId, unifiedBlobId)) {
for (TTabletsByBlob::TIterator itExtractedSelf(extractedSelf); itExtractedSelf.IsValid(); ++itExtractedSelf) {
const TUnifiedBlobId unifiedBlobId = itExtractedSelf.GetBlobId();
auto logoBlobId = unifiedBlobId.GetLogoBlobId();
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_delete_gc", logoBlobId);
NBlobOperations::NBlobStorage::TGCTask::TGCLists& gl = perGroupGCListsInFlight[unifiedBlobId.GetDsGroup()];
Expand Down Expand Up @@ -359,6 +380,8 @@ std::shared_ptr<NBlobOperations::NBlobStorage::TGCTask> TBlobManager::BuildGCTas
return result;
}



TBlobBatch TBlobManager::StartBlobBatch(ui32 channel) {
++CountersUpdate.BatchesStarted;
Y_ABORT_UNLESS(channel == BLOB_CHANNEL, "Support for mutiple blob channels is not implemented yet");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ void TTxInsertTableCleanup::Complete(const TActorContext& /*ctx*/) {

Y_ABORT_UNLESS(BlobsAction);
BlobsAction->OnCompleteTxAfterRemoving(true);
Self->EnqueueBackgroundActivities();
Self->BackgroundController.FinishCleanupInsertTable();
Self->SetupCleanupInsertTable();
}

}
5 changes: 5 additions & 0 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -845,13 +845,18 @@ void TColumnShard::Handle(TEvPrivate::TEvGarbageCollectionFinished::TPtr& ev, co
}

// Schedules a TTxInsertTableCleanup transaction when there is something to clean.
//
// Does nothing (apart from a debug log) while BackgroundController reports a
// cleanup as already active. Otherwise asks the insert table for the writes
// returned by OldWritesToAbort(now); if neither those nor already-aborted
// entries exist, returns without scheduling. When work is found, the cleanup
// is marked active on the controller before the transaction is executed.
void TColumnShard::SetupCleanupInsertTable() {
    if (BackgroundController.IsCleanupInsertTableActive()) {
        ACFL_DEBUG("background", "cleanup_insert_table")("skip_reason", "in_progress");
        return;
    }

    auto staleWriteIds = InsertTable->OldWritesToAbort(AppData()->TimeProvider->Now());
    const auto abortedCount = InsertTable->GetAborted().size();
    const auto staleCount = staleWriteIds.size();
    if (abortedCount == 0 && staleCount == 0) {
        // Nothing aborted and nothing to abort: no transaction needed.
        return;
    }

    AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "cleanup_started")("aborted", abortedCount)("to_cleanup", staleCount);

    // Mark the cleanup active before launching the transaction.
    BackgroundController.StartCleanupInsertTable();
    Execute(new TTxInsertTableCleanup(this, std::move(staleWriteIds)), TActorContext::AsActorContext());
}

Expand Down
9 changes: 5 additions & 4 deletions ydb/core/tx/columnshard/engines/column_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@ const std::shared_ptr<arrow::Schema>& IColumnEngine::GetReplaceKey() const {
}

// Returns the memory limit (in the units reported by NSystemInfo::TotalMemorySize())
// for index metadata.
//
// Resolution order:
//   1. No app data available      -> 0.3 of total system memory.
//   2. Config has an absolute value -> that value as-is.
//   3. Otherwise                   -> total system memory * configured total ratio.
//
// The floating-point products are implicitly converted to ui64 on return.
ui64 IColumnEngine::GetMetadataLimit() {
    // Total memory is queried once and reused by every subsequent call
    // instead of calling NSystemInfo::TotalMemorySize() on each invocation.
    static const auto MemoryTotal = NSystemInfo::TotalMemorySize();
    if (!HasAppData()) {
        AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("total", MemoryTotal);
        return MemoryTotal * 0.3;
    } else if (AppDataVerified().ColumnShardConfig.GetIndexMetadataMemoryLimit().HasAbsoluteValue()) {
        AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("value", AppDataVerified().ColumnShardConfig.GetIndexMetadataMemoryLimit().GetAbsoluteValue());
        return AppDataVerified().ColumnShardConfig.GetIndexMetadataMemoryLimit().GetAbsoluteValue();
    } else {
        AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("total", MemoryTotal)("kff", AppDataVerified().ColumnShardConfig.GetIndexMetadataMemoryLimit().GetTotalRatio());
        return MemoryTotal * AppDataVerified().ColumnShardConfig.GetIndexMetadataMemoryLimit().GetTotalRatio();
    }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ void TScanHead::OnIntervalResult(const std::optional<NArrow::TShardedRecordBatch
std::unique_ptr<NArrow::NMerger::TMergePartialStream>&& merger, const ui32 intervalIdx, TPlainReadData& reader) {
if (Context->GetReadMetadata()->Limit && (!newBatch || newBatch->GetRecordsCount() == 0) && InFlightLimit < 1000) {
if (++ZeroCount == std::max<ui64>(16, InFlightLimit)) {
InFlightLimit *= 2;
InFlightLimit = std::max<ui32>(MaxInFlight, InFlightLimit * 2);
ZeroCount = 0;
}
} else {
Expand Down Expand Up @@ -96,7 +96,17 @@ TConclusionStatus TScanHead::Start() {
TScanHead::TScanHead(std::deque<std::shared_ptr<IDataSource>>&& sources, const std::shared_ptr<TSpecialReadContext>& context)
: Context(context)
{
InFlightLimit = Context->GetReadMetadata()->Limit ? 1 : Max<ui32>();
if (!HasAppData() || !AppDataVerified().ColumnShardConfig.HasMaxInFlightIntervalsOnRequest()) {
MaxInFlight = 256;
} else {
MaxInFlight = AppDataVerified().ColumnShardConfig.GetMaxInFlightIntervalsOnRequest();
}

if (Context->GetReadMetadata()->Limit) {
InFlightLimit = 1;
} else {
InFlightLimit = MaxInFlight;
}
while (sources.size()) {
auto source = sources.front();
BorderPoints[source->GetStart()].AddStart(source);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class TScanHead {
ui32 SegmentIdxCounter = 0;
std::vector<TIntervalStat> IntervalStats;
ui64 InFlightLimit = 1;
ui64 MaxInFlight = 256;
ui64 ZeroCount = 0;
bool AbortFlag = false;
void DrainSources();
Expand Down

0 comments on commit bdda5b1

Please sign in to comment.