From 36fc0ee4aa8fefc4ea866f1b28c705fc2a78277f Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Sun, 21 Apr 2024 16:15:58 +0300 Subject: [PATCH] Reserve change queue capacity in order not to overflow the queue (#3841) (#3917) --- ydb/core/tx/datashard/cdc_stream_scan.cpp | 24 +++++++-- ydb/core/tx/datashard/datashard.cpp | 63 ++++++++++++++++++++--- ydb/core/tx/datashard/datashard_impl.h | 21 +++++--- 3 files changed, 93 insertions(+), 15 deletions(-) diff --git a/ydb/core/tx/datashard/cdc_stream_scan.cpp b/ydb/core/tx/datashard/cdc_stream_scan.cpp index 09bed6daf3b1..4bae27518d4f 100644 --- a/ydb/core/tx/datashard/cdc_stream_scan.cpp +++ b/ydb/core/tx/datashard/cdc_stream_scan.cpp @@ -213,7 +213,7 @@ class TDataShard::TTxCdcStreamScanProgress TTxType GetTxType() const override { return TXTYPE_CDC_STREAM_SCAN_PROGRESS; } bool Execute(TTransactionContext& txc, const TActorContext& ctx) override { - const auto& ev = *Request->Get(); + auto& ev = *Request->Get(); const auto& tablePathId = ev.TablePathId; const auto& streamPathId = ev.StreamPathId; const auto& readVersion = ev.ReadVersion; @@ -238,7 +238,25 @@ class TDataShard::TTxCdcStreamScanProgress } ChangeRecords.clear(); - if (Self->CheckChangesQueueOverflow()) { + + if (!ev.ReservationCookie) { + ev.ReservationCookie = Self->ReserveChangeQueueCapacity(ev.Rows.size()); + } + + if (!ev.ReservationCookie) { + LOG_I("Cannot reserve change queue capacity"); + Reschedule = true; + return true; + } + + if (Self->GetFreeChangeQueueCapacity(ev.ReservationCookie) < ev.Rows.size()) { + LOG_I("Not enough change queue capacity"); + Reschedule = true; + return true; + } + + if (Self->CheckChangesQueueOverflow(ev.ReservationCookie)) { + LOG_I("Change queue overflow"); Reschedule = true; return true; } @@ -335,7 +353,7 @@ class TDataShard::TTxCdcStreamScanProgress LOG_I("Enqueue " << ChangeRecords.size() << " change record(s)" << ": streamPathId# " << Request->Get()->StreamPathId); - Self->EnqueueChangeRecords(std::move(ChangeRecords)); + Self->EnqueueChangeRecords(std::move(ChangeRecords), Request->Get()->ReservationCookie); ctx.Send(Request->Sender, Response.Release()); } else if (Reschedule) { LOG_I("Re-schedule progress tx" diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 12bc7e28e7cb..1bf0be413b01 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -899,6 +899,13 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) { } } + if (auto rIt = ChangeQueueReservations.find(record.ReservationCookie); rIt != ChangeQueueReservations.end()) { + --ChangeQueueReservedCapacity; + if (!--rIt->second) { + ChangeQueueReservations.erase(rIt); + } + } + UpdateChangeExchangeLag(AppData()->TimeProvider->Now()); ChangesQueue.erase(it); @@ -908,7 +915,7 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) { CheckChangesQueueNoOverflow(); } -void TDataShard::EnqueueChangeRecords(TVector&& records) { +void TDataShard::EnqueueChangeRecords(TVector&& records, ui64 cookie) { if (!records) { return; } @@ -933,7 +940,7 @@ void TDataShard::EnqueueChangeRecords(TVectorsecond); @@ -956,6 +963,38 @@ void TDataShard::EnqueueChangeRecords(TVectorDataShardConfig.GetChangesQueueItemsLimit(); + if (sizeLimit < ChangesQueue.size()) { + return 0; + } + + const auto free = Min(sizeLimit - ChangesQueue.size(), Max(sizeLimit / 2, 1ul)); + + ui32 reserved = ChangeQueueReservedCapacity; + if (auto it = ChangeQueueReservations.find(cookie); it != ChangeQueueReservations.end()) { + reserved -= it->second; + } + + if (free < reserved) { + return 0; + } + + return free - reserved; +} + +ui64 TDataShard::ReserveChangeQueueCapacity(ui32 capacity) { + const auto sizeLimit = AppData()->DataShardConfig.GetChangesQueueItemsLimit(); + if (Max(sizeLimit / 2, 1ul) < ChangeQueueReservedCapacity) { + return 0; + } + + const auto cookie = NextChangeQueueReservationCookie++; + ChangeQueueReservations.emplace(cookie, capacity); + ChangeQueueReservedCapacity += capacity; + return cookie; +} + void TDataShard::UpdateChangeExchangeLag(TInstant now) { if (!ChangesList.Empty()) { const auto* front = ChangesList.Front(); @@ -3391,19 +3430,31 @@ bool TDataShard::CheckTxNeedWait(const TEvDataShard::TEvProposeTransaction::TPtr return false; } -bool TDataShard::CheckChangesQueueOverflow() const { +bool TDataShard::CheckChangesQueueOverflow(ui64 cookie) const { const auto* appData = AppData(); const auto sizeLimit = appData->DataShardConfig.GetChangesQueueItemsLimit(); const auto bytesLimit = appData->DataShardConfig.GetChangesQueueBytesLimit(); - return ChangesQueue.size() >= sizeLimit || ChangesQueueBytes >= bytesLimit; + + ui32 reserved = ChangeQueueReservedCapacity; + if (auto it = ChangeQueueReservations.find(cookie); it != ChangeQueueReservations.end()) { + reserved -= it->second; + } + + return (ChangesQueue.size() + reserved) >= sizeLimit || ChangesQueueBytes >= bytesLimit; } -void TDataShard::CheckChangesQueueNoOverflow() { +void TDataShard::CheckChangesQueueNoOverflow(ui64 cookie) { if (OverloadSubscribersByReason[RejectReasonIndex(ERejectReason::ChangesQueueOverflow)]) { const auto* appData = AppData(); const auto sizeLimit = appData->DataShardConfig.GetChangesQueueItemsLimit(); const auto bytesLimit = appData->DataShardConfig.GetChangesQueueBytesLimit(); - if (ChangesQueue.size() < sizeLimit && ChangesQueueBytes < bytesLimit) { + + ui32 reserved = ChangeQueueReservedCapacity; + if (auto it = ChangeQueueReservations.find(cookie); it != ChangeQueueReservations.end()) { + reserved -= it->second; + } + + if ((ChangesQueue.size() + reserved) < sizeLimit && ChangesQueueBytes < bytesLimit) { NotifyOverloadSubscribers(ERejectReason::ChangesQueueOverflow); } } diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index b277cfe1ef43..71847b2ef22c 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -531,6 +531,7 @@ class TDataShard const TRowVersion ReadVersion; const TVector ValueTags; TVector> Rows; + ui64 ReservationCookie = 0; const TCdcStreamScanManager::TStats Stats; }; @@ -1837,7 +1838,9 @@ class TDataShard void MoveChangeRecord(NIceDb::TNiceDb& db, ui64 order, const TPathId& pathId); void MoveChangeRecord(NIceDb::TNiceDb& db, ui64 lockId, ui64 lockOffset, const TPathId& pathId); void RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order); - void EnqueueChangeRecords(TVector&& records); + void EnqueueChangeRecords(TVector&& records, ui64 cookie = 0); + ui32 GetFreeChangeQueueCapacity(ui64 cookie); + ui64 ReserveChangeQueueCapacity(ui32 capacity); void UpdateChangeExchangeLag(TInstant now); void CreateChangeSender(const TActorContext& ctx); void KillChangeSender(const TActorContext& ctx); @@ -1976,8 +1979,8 @@ class TDataShard void WaitPredictedPlanStep(ui64 step); void SchedulePlanPredictedTxs(); - bool CheckChangesQueueOverflow() const; - void CheckChangesQueueNoOverflow(); + bool CheckChangesQueueOverflow(ui64 cookie = 0) const; + void CheckChangesQueueNoOverflow(ui64 cookie = 0); void DeleteReadIterator(TReadIteratorsMap::iterator it); void CancelReadIterators(Ydb::StatusIds::StatusCode code, const TString& issue, const TActorContext& ctx); @@ -2709,9 +2712,11 @@ class TDataShard TInstant EnqueuedAt; ui64 LockId; ui64 LockOffset; + ui64 ReservationCookie; explicit TEnqueuedRecord(ui64 bodySize, const TPathId& tableId, - ui64 schemaVersion, TInstant created, TInstant enqueued, ui64 lockId = 0, ui64 lockOffset = 0) + ui64 schemaVersion, TInstant created, TInstant enqueued, + ui64 lockId = 0, ui64 lockOffset = 0, ui64 cookie = 0) : BodySize(bodySize) , TableId(tableId) , SchemaVersion(schemaVersion) @@ -2720,12 +2725,13 @@ class TDataShard , EnqueuedAt(enqueued) , LockId(lockId) , LockOffset(lockOffset) + , ReservationCookie(cookie) { } - explicit TEnqueuedRecord(const IDataShardChangeCollector::TChange& record, TInstant now) + explicit TEnqueuedRecord(const IDataShardChangeCollector::TChange& record, TInstant now, ui64 cookie) : TEnqueuedRecord(record.BodySize, record.TableId, record.SchemaVersion, record.CreatedAt(), now, - record.LockId, record.LockOffset) + record.LockId, record.LockOffset, cookie) { } }; @@ -2745,6 +2751,9 @@ class TDataShard THashMap ChangesQueue; // ui64 is order TIntrusiveList ChangesList; ui64 ChangesQueueBytes = 0; + THashMap ChangeQueueReservations; + ui64 NextChangeQueueReservationCookie = 1; + ui32 ChangeQueueReservedCapacity = 0; TActorId OutChangeSender; bool OutChangeSenderSuspended = false;