Skip to content

Commit

Permalink
Reserve change queue capacity in order not to overflow the queue (#3841
Browse files Browse the repository at this point in the history
…) (#3917)
  • Loading branch information
CyberROFL authored Apr 21, 2024
1 parent 1520164 commit 36fc0ee
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 15 deletions.
24 changes: 21 additions & 3 deletions ydb/core/tx/datashard/cdc_stream_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ class TDataShard::TTxCdcStreamScanProgress
TTxType GetTxType() const override { return TXTYPE_CDC_STREAM_SCAN_PROGRESS; }

bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
const auto& ev = *Request->Get();
auto& ev = *Request->Get();
const auto& tablePathId = ev.TablePathId;
const auto& streamPathId = ev.StreamPathId;
const auto& readVersion = ev.ReadVersion;
Expand All @@ -238,7 +238,25 @@ class TDataShard::TTxCdcStreamScanProgress
}

ChangeRecords.clear();
if (Self->CheckChangesQueueOverflow()) {

if (!ev.ReservationCookie) {
ev.ReservationCookie = Self->ReserveChangeQueueCapacity(ev.Rows.size());
}

if (!ev.ReservationCookie) {
LOG_I("Cannot reserve change queue capacity");
Reschedule = true;
return true;
}

if (Self->GetFreeChangeQueueCapacity(ev.ReservationCookie) < ev.Rows.size()) {
LOG_I("Not enough change queue capacity");
Reschedule = true;
return true;
}

if (Self->CheckChangesQueueOverflow(ev.ReservationCookie)) {
LOG_I("Change queue overflow");
Reschedule = true;
return true;
}
Expand Down Expand Up @@ -335,7 +353,7 @@ class TDataShard::TTxCdcStreamScanProgress
LOG_I("Enqueue " << ChangeRecords.size() << " change record(s)"
<< ": streamPathId# " << Request->Get()->StreamPathId);

Self->EnqueueChangeRecords(std::move(ChangeRecords));
Self->EnqueueChangeRecords(std::move(ChangeRecords), Request->Get()->ReservationCookie);
ctx.Send(Request->Sender, Response.Release());
} else if (Reschedule) {
LOG_I("Re-schedule progress tx"
Expand Down
63 changes: 57 additions & 6 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,13 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
}
}

if (auto rIt = ChangeQueueReservations.find(record.ReservationCookie); rIt != ChangeQueueReservations.end()) {
--ChangeQueueReservedCapacity;
if (!--rIt->second) {
ChangeQueueReservations.erase(rIt);
}
}

UpdateChangeExchangeLag(AppData()->TimeProvider->Now());
ChangesQueue.erase(it);

Expand All @@ -908,7 +915,7 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
CheckChangesQueueNoOverflow();
}

void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records) {
void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records, ui64 cookie) {
if (!records) {
return;
}
Expand All @@ -933,7 +940,7 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange
auto res = ChangesQueue.emplace(
std::piecewise_construct,
std::forward_as_tuple(record.Order),
std::forward_as_tuple(record, now)
std::forward_as_tuple(record, now, cookie)
);
if (res.second) {
ChangesList.PushBack(&res.first->second);
Expand All @@ -956,6 +963,38 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange
Send(OutChangeSender, new NChangeExchange::TEvChangeExchange::TEvEnqueueRecords(std::move(forward)));
}

// Returns how many more records the holder of reservation `cookie` may put
// into the change queue right now, or 0 when there is no room.
//
// The available room is the unused part of the items limit, capped at
// max(limit / 2, 1), minus the capacity reserved under all other cookies.
// The caller's own reservation (if `cookie` is registered) is not counted
// against it.
ui32 TDataShard::GetFreeChangeQueueCapacity(ui64 cookie) {
    const auto itemsLimit = AppData()->DataShardConfig.GetChangesQueueItemsLimit();
    const auto queued = ChangesQueue.size();
    if (queued > itemsLimit) {
        // The queue already holds more items than the limit allows.
        return 0;
    }

    const auto available = Min(itemsLimit - queued, Max(itemsLimit / 2, 1ul));

    // Start from the total reserved capacity and exclude the caller's share.
    ui32 reservedByOthers = ChangeQueueReservedCapacity;
    const auto own = ChangeQueueReservations.find(cookie);
    if (own != ChangeQueueReservations.end()) {
        reservedByOthers -= own->second;
    }

    return available < reservedByOthers ? 0 : available - reservedByOthers;
}

// Registers a reservation of `capacity` change queue slots and returns the
// cookie identifying it. Returns 0 when no reservation was made.
//
// A request is refused only when the total already reserved exceeds
// max(limit / 2, 1); the requested `capacity` itself is not part of that
// check, so an accepted request may push the reserved total above it.
ui64 TDataShard::ReserveChangeQueueCapacity(ui32 capacity) {
    const auto itemsLimit = AppData()->DataShardConfig.GetChangesQueueItemsLimit();
    const auto reservedCeiling = Max(itemsLimit / 2, 1ul);
    if (ChangeQueueReservedCapacity > reservedCeiling) {
        return 0;
    }

    // Hand out the current cookie value and advance the counter for the next caller.
    const ui64 cookie = NextChangeQueueReservationCookie;
    ++NextChangeQueueReservationCookie;

    ChangeQueueReservations.emplace(cookie, capacity);
    ChangeQueueReservedCapacity += capacity;
    return cookie;
}

void TDataShard::UpdateChangeExchangeLag(TInstant now) {
if (!ChangesList.Empty()) {
const auto* front = ChangesList.Front();
Expand Down Expand Up @@ -3391,19 +3430,31 @@ bool TDataShard::CheckTxNeedWait(const TEvDataShard::TEvProposeTransaction::TPtr
return false;
}

// Reports whether the change queue should be treated as overflowed from the
// point of view of the holder of reservation `cookie`.
// NOTE(review): this span is a rendered diff; the first signature line and the
// first `return` line appear to be the pre-change version, and the lines that
// take `cookie` the post-change version — confirm against the real file.
bool TDataShard::CheckChangesQueueOverflow() const {
bool TDataShard::CheckChangesQueueOverflow(ui64 cookie) const {
const auto* appData = AppData();
const auto sizeLimit = appData->DataShardConfig.GetChangesQueueItemsLimit();
const auto bytesLimit = appData->DataShardConfig.GetChangesQueueBytesLimit();
return ChangesQueue.size() >= sizeLimit || ChangesQueueBytes >= bytesLimit;

// Count capacity reserved under every cookie except the caller's own.
ui32 reserved = ChangeQueueReservedCapacity;
if (auto it = ChangeQueueReservations.find(cookie); it != ChangeQueueReservations.end()) {
reserved -= it->second;
}

// Overflow: queued items plus others' reservations reach the items limit,
// or the queued bytes reach the bytes limit.
return (ChangesQueue.size() + reserved) >= sizeLimit || ChangesQueueBytes >= bytesLimit;
}

// Calls NotifyOverloadSubscribers(ERejectReason::ChangesQueueOverflow) once the
// change queue is below both its items limit and its bytes limit. Does nothing
// unless the OverloadSubscribersByReason entry for ChangesQueueOverflow is set.
// NOTE(review): this span is a rendered diff; the first signature line and the
// first `if (ChangesQueue.size() < sizeLimit ...` line appear to be the
// pre-change version — confirm against the real file.
void TDataShard::CheckChangesQueueNoOverflow() {
void TDataShard::CheckChangesQueueNoOverflow(ui64 cookie) {
if (OverloadSubscribersByReason[RejectReasonIndex(ERejectReason::ChangesQueueOverflow)]) {
const auto* appData = AppData();
const auto sizeLimit = appData->DataShardConfig.GetChangesQueueItemsLimit();
const auto bytesLimit = appData->DataShardConfig.GetChangesQueueBytesLimit();
if (ChangesQueue.size() < sizeLimit && ChangesQueueBytes < bytesLimit) {

// Count capacity reserved under every cookie except the one passed in.
ui32 reserved = ChangeQueueReservedCapacity;
if (auto it = ChangeQueueReservations.find(cookie); it != ChangeQueueReservations.end()) {
reserved -= it->second;
}

// Same quantities as CheckChangesQueueOverflow(cookie), with the condition negated.
if ((ChangesQueue.size() + reserved) < sizeLimit && ChangesQueueBytes < bytesLimit) {
NotifyOverloadSubscribers(ERejectReason::ChangesQueueOverflow);
}
}
}
Expand Down
21 changes: 15 additions & 6 deletions ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ class TDataShard
const TRowVersion ReadVersion;
const TVector<ui32> ValueTags;
TVector<std::pair<TSerializedCellVec, TSerializedCellVec>> Rows;
ui64 ReservationCookie = 0;
const TCdcStreamScanManager::TStats Stats;
};

Expand Down Expand Up @@ -1837,7 +1838,9 @@ class TDataShard
void MoveChangeRecord(NIceDb::TNiceDb& db, ui64 order, const TPathId& pathId);
void MoveChangeRecord(NIceDb::TNiceDb& db, ui64 lockId, ui64 lockOffset, const TPathId& pathId);
void RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order);
void EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records);
void EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records, ui64 cookie = 0);
ui32 GetFreeChangeQueueCapacity(ui64 cookie);
ui64 ReserveChangeQueueCapacity(ui32 capacity);
void UpdateChangeExchangeLag(TInstant now);
void CreateChangeSender(const TActorContext& ctx);
void KillChangeSender(const TActorContext& ctx);
Expand Down Expand Up @@ -1976,8 +1979,8 @@ class TDataShard
void WaitPredictedPlanStep(ui64 step);
void SchedulePlanPredictedTxs();

bool CheckChangesQueueOverflow() const;
void CheckChangesQueueNoOverflow();
bool CheckChangesQueueOverflow(ui64 cookie = 0) const;
void CheckChangesQueueNoOverflow(ui64 cookie = 0);

void DeleteReadIterator(TReadIteratorsMap::iterator it);
void CancelReadIterators(Ydb::StatusIds::StatusCode code, const TString& issue, const TActorContext& ctx);
Expand Down Expand Up @@ -2709,9 +2712,11 @@ class TDataShard
TInstant EnqueuedAt;
ui64 LockId;
ui64 LockOffset;
ui64 ReservationCookie;

explicit TEnqueuedRecord(ui64 bodySize, const TPathId& tableId,
ui64 schemaVersion, TInstant created, TInstant enqueued, ui64 lockId = 0, ui64 lockOffset = 0)
ui64 schemaVersion, TInstant created, TInstant enqueued,
ui64 lockId = 0, ui64 lockOffset = 0, ui64 cookie = 0)
: BodySize(bodySize)
, TableId(tableId)
, SchemaVersion(schemaVersion)
Expand All @@ -2720,12 +2725,13 @@ class TDataShard
, EnqueuedAt(enqueued)
, LockId(lockId)
, LockOffset(lockOffset)
, ReservationCookie(cookie)
{
}

explicit TEnqueuedRecord(const IDataShardChangeCollector::TChange& record, TInstant now)
explicit TEnqueuedRecord(const IDataShardChangeCollector::TChange& record, TInstant now, ui64 cookie)
: TEnqueuedRecord(record.BodySize, record.TableId, record.SchemaVersion, record.CreatedAt(), now,
record.LockId, record.LockOffset)
record.LockId, record.LockOffset, cookie)
{
}
};
Expand All @@ -2745,6 +2751,9 @@ class TDataShard
THashMap<ui64, TEnqueuedRecord> ChangesQueue; // ui64 is order
TIntrusiveList<TEnqueuedRecord, TEnqueuedRecordTag> ChangesList;
ui64 ChangesQueueBytes = 0;
THashMap<ui64, ui32> ChangeQueueReservations;
ui64 NextChangeQueueReservationCookie = 1;
ui32 ChangeQueueReservedCapacity = 0;
TActorId OutChangeSender;
bool OutChangeSenderSuspended = false;

Expand Down

0 comments on commit 36fc0ee

Please sign in to comment.