From f02074659a47dfcc60a1ffae49f3e62d5b7ea6f0 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Wed, 1 Jan 2025 18:34:46 +0300 Subject: [PATCH 1/7] reduce throughput on special overload status --- ydb/core/tx/columnshard/columnshard.cpp | 2 + .../tx/columnshard/columnshard__write.cpp | 30 +----- ydb/core/tx/columnshard/columnshard_impl.cpp | 1 + ydb/core/tx/columnshard/columnshard_impl.h | 101 ++++++++++++++++++ .../tx/columnshard/counters/columnshard.h | 17 ++- 5 files changed, 122 insertions(+), 29 deletions(-) diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp index e2ad5f3e0729..a07620240103 100644 --- a/ydb/core/tx/columnshard/columnshard.cpp +++ b/ydb/core/tx/columnshard/columnshard.cpp @@ -250,6 +250,8 @@ void TColumnShard::Handle(NActors::TEvents::TEvWakeup::TPtr& ev, const TActorCon const TMonotonic now = TMonotonic::Now(); GetProgressTxController().PingTimeouts(now); ctx.Schedule(TDuration::Seconds(1), new NActors::TEvents::TEvWakeup(0)); + } else if (ev->Get()->Tag == 1) { + WriteTasksQueue.Drain(this, true); } } diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 05716eb6c571..7b36e540f40f 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -560,11 +560,8 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor return; } - auto overloadStatus = CheckOverloaded(pathId); - if (overloadStatus != EOverloadStatus::None) { - std::unique_ptr result = NEvents::TDataEvents::TEvWriteResult::BuildError( - TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, "overload data error"); - OverloadWriteFail(overloadStatus, NEvWrite::TWriteMeta(0, pathId, source, {}, TGUID::CreateTimebased().AsGuidString()), arrowData->GetSize(), cookie, std::move(result), ctx); + if (!AppDataVerified().ColumnShardConfig.GetWritingEnabled()) { + sendError("writing disabled", NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED); return; } @@ -575,30 +572,13 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor ui64 lockId = 0; if (behaviour == EOperationBehaviour::NoTxWrite) { - lockId = BuildEphemeralTxId(); + lockId = owner->BuildEphemeralTxId(); } else { lockId = record.GetLockTxId(); } - if (!AppDataVerified().ColumnShardConfig.GetWritingEnabled()) { - sendError("writing disabled", NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED); - return; - } - - OperationsManager->RegisterLock(lockId, Generation()); - auto writeOperation = OperationsManager->RegisterOperation( - pathId, lockId, cookie, granuleShardingVersionId, *mType, AppDataVerified().FeatureFlags.GetEnableWritePortionsOnInsert()); - - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("writing_size", arrowData->GetSize())("operation_id", writeOperation->GetIdentifier())( - "in_flight", Counters.GetWritesMonitor()->GetWritesInFlight())("size_in_flight", Counters.GetWritesMonitor()->GetWritesSizeInFlight()); - Counters.GetWritesMonitor()->OnStartWrite(arrowData->GetSize()); - - Y_ABORT_UNLESS(writeOperation); - writeOperation->SetBehaviour(behaviour); - NOlap::TWritingContext wContext(TabletID(), SelfId(), schema, StoragesManager, Counters.GetIndexationCounters().SplitterCounters, - Counters.GetCSCounters().WritingCounters, NOlap::TSnapshot::Max(), writeOperation->GetActivityChecker()); - arrowData->SetSeparationPoints(GetIndexAs().GetGranulePtrVerified(pathId)->GetBucketPositions()); - writeOperation->Start(*this, arrowData, source, wContext); + WriteTasksQueue.Enqueue(TWriteTask(arrowData, schema, source, granuleShardingVersionId, pathId, cookie, lockId, *mType, behaviour)); + WriteTasksQueue.Drain(this, false); } } // namespace NKikimr::NColumnShard diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index fde0bba29cad..332cd7abb0bd 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -77,6 +77,7 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet) , TabletCountersHolder(new TProtobufTabletCounters()) , Counters(*TabletCountersHolder) + , WriteTasksQueue(this) , ProgressTxController(std::make_unique(*this)) , StoragesManager(std::make_shared(*this)) , DataLocksManager(std::make_shared()) diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index c01769721a0f..0f3e377459da 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -461,9 +461,110 @@ class TColumnShard: public TActor, public NTabletFlatExecutor::TTa } } + class TWriteTask: TNonCopyable { + private: + std::shared_ptr ArrowData; + ISnapshotSchema::TPtr Schema; + const NActors::TActorId SourceId; + const std::optional GranuleShardingVersionId; + const ui64 PathId; + const ui64 Cookie; + const ui64 LockId; + const NEvWrite::EModificationType ModificationType; + const EOperationBehaviour Behaviour; + const TMonotonic Created = TMonotonic::Now(); + public: + TWriteTask(const std::shared_ptr& arrowData, const ISnapshotSchema::TPtr& schema, const NActors::TActorId sourceId, + const std::optional& granuleShardingVersionId, const ui64 pathId, const ui64 cookie, const ui64 lockId, + const NEvWrite::EModificationType modificationType, const EOperationBehaviour behaviour) + : ArrowData(arrowData) + , Schema(schema) + , SourceId(sourceId) + , GranuleShardingVersionId(granuleShardingVersionId) + , PathId(pathId) + , Cookie(cookie) + , LockId(lockId) + , Behaviour(behaviour) { + } + + const TMonotonic& GetCreatedMonotonic() const { + return Created; + } + + bool Execute(TColumnShard* owner) { + auto overloadStatus = owner->CheckOverloaded(PathId); + if (overloadStatus == EOverloadStatus::OverloadMetadata) { + AFL_INFO(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "wait_overload")("status", overloadStatus); + return false; + } + Owner->Counters.GetCSCounters().WritingCounters->OnWritingTaskDequeue(TMonotonic::Now() - Created); + if (overloadStatus != EOverloadStatus::None) { + std::unique_ptr result = NEvents::TDataEvents::TEvWriteResult::BuildError( + TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, "overload data error"); + owner->OverloadWriteFail(overloadStatus, NEvWrite::TWriteMeta(0, PathId, Source, {}, TGUID::CreateTimebased().AsGuidString()), + arrowData->GetSize(), Cookie, std::move(result), ctx); + return true; + } + + owner->OperationsManager->RegisterLock(LockId, owner->Generation()); + auto writeOperation = owner->OperationsManager->RegisterOperation( + PathId, LockId, Cookie, GranuleShardingVersionId, *ModificationType, AppDataVerified().FeatureFlags.GetEnableWritePortionsOnInsert()); + + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("writing_size", ArrowData->GetSize())( + "operation_id", writeOperation->GetIdentifier())("in_flight", owner->Counters.GetWritesMonitor()->GetWritesInFlight())( + "size_in_flight", owner->Counters.GetWritesMonitor()->GetWritesSizeInFlight()); + owner->Counters.GetWritesMonitor()->OnStartWrite(ArrowData->GetSize()); + + AFL_VERIFY(writeOperation); + writeOperation->SetBehaviour(Behaviour); + NOlap::TWritingContext wContext(TabletID(), SelfId(), Schema, owner->StoragesManager, + owner->Counters.GetIndexationCounters().SplitterCounters, + owner->Counters.GetCSCounters().WritingCounters, NOlap::TSnapshot::Max(), writeOperation->GetActivityChecker()); + ArrowData->SetSeparationPoints(GetIndexAs().GetGranulePtrVerified(PathId)->GetBucketPositions()); + writeOperation->Start(*this, ArrowData, Source, wContext); + return true; + } + }; + + class TWriteTasksQueue { + private: + bool WriteTasksOverloadCheckerScheduled = false; + std::deque WriteTasks; + i64 PredWriteTasksSize = 0; + TColumnShard* Owner; + public: + TWriteTasksQueue(TColumnShard* owner) + : Owner(owner) { + } + + ~TWriteTasksQueue() { + owner->Counters.GetCSCounters().WritingCounters->QueueWaitSize->Sub(PredWriteTasksSize); + } + + void Enqueue(TWriteTask&& task) { + WriteTasks.emplace_back(std::move(task)); + } + + void Drain(const bool onWakeup) { + if (onWakeup) { + WriteTasksOverloadCheckerScheduled = false; + } + while (WriteTasks.size() && WriteTasks.front().Execute(Owner)) { + WriteTasks.pop_front(); + } + if (!WriteTasks.size() && !WriteTasksOverloadCheckerScheduled) { + Owner->Schedule(TDuration::MilliSeconds(100), NActors::TEvents::TEvWakeup(1)); + WriteTasksOverloadCheckerScheduled = true; + } + Owner->Counters.GetCSCounters().WritingCounters->QueueWaitSize->Add((i64)WriteTasks.size() - PredWriteTasksSize); + PredWriteTasksSize = (i64)WriteTasks.size(); + } + }; + private: std::unique_ptr TabletCountersHolder; TCountersManager Counters; + TWriteTasksQueue WriteTasksQueue; std::unique_ptr ProgressTxController; std::unique_ptr OperationsManager; diff --git a/ydb/core/tx/columnshard/counters/columnshard.h b/ydb/core/tx/columnshard/counters/columnshard.h index e5b55f713690..b19a8e3a1086 100644 --- a/ydb/core/tx/columnshard/counters/columnshard.h +++ b/ydb/core/tx/columnshard/counters/columnshard.h @@ -1,8 +1,9 @@ #pragma once -#include "common/owner.h" #include "initialization.h" #include "tx_progress.h" +#include "common/owner.h" + #include #include @@ -26,14 +27,22 @@ class TWriteCounters: public TCommonCountersOwner { NMonitoring::TDynamicCounters::TCounterPtr VolumeWriteData; NMonitoring::THistogramPtr HistogramBytesWriteDataCount; NMonitoring::THistogramPtr HistogramBytesWriteDataBytes; + NMonitoring::THistogramPtr HistogramDurationQueueWait; public: + const NMonitoring::TDynamicCounters::TCounterPtr QueueWaitSize; + + void OnWritingTaskDequeue(const TDuration d){ + HistogramDurationQueueWait->Collect(d.MilliSeconds()); + } + TWriteCounters(TCommonCountersOwner& owner) - : TBase(owner, "activity", "writing") - { + : TBase(owner, "activity", "writing") { VolumeWriteData = TBase::GetDeriviative("Write/Incoming/Bytes"); HistogramBytesWriteDataCount = TBase::GetHistogram("Write/Incoming/ByBytes/Count", NMonitoring::ExponentialHistogram(18, 2, 100)); HistogramBytesWriteDataBytes = TBase::GetHistogram("Write/Incoming/ByBytes/Bytes", NMonitoring::ExponentialHistogram(18, 2, 100)); + HistogramDurationQueueWait = TBase::GetHistogram("Write/Queue/Waiting/DurationMs", NMonitoring::ExponentialHistogram(18, 2, 100)); + QueueWaitSize = TBase::GetValue("Write/Queue/Size"); } void OnIncomingData(const ui64 dataSize) const { @@ -231,4 +240,4 @@ class TCSCounters: public TCommonCountersOwner { TCSCounters(); }; -} +} // namespace NKikimr::NColumnShard From 0e29fced55996560f1ebb36fa71d8584615e44c8 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Wed, 1 Jan 2025 18:54:57 +0300 Subject: [PATCH 2/7] fix build --- ydb/core/tx/columnshard/columnshard.cpp | 2 +- .../tx/columnshard/columnshard__write.cpp | 38 +++++++++++++- ydb/core/tx/columnshard/columnshard_impl.h | 50 ++++--------------- 3 files changed, 47 insertions(+), 43 deletions(-) diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp index a07620240103..626cfd6fd62d 100644 --- a/ydb/core/tx/columnshard/columnshard.cpp +++ b/ydb/core/tx/columnshard/columnshard.cpp @@ -251,7 +251,7 @@ void TColumnShard::Handle(NActors::TEvents::TEvWakeup::TPtr& ev, const TActorCon GetProgressTxController().PingTimeouts(now); ctx.Schedule(TDuration::Seconds(1), new NActors::TEvents::TEvWakeup(0)); } else if (ev->Get()->Tag == 1) { - WriteTasksQueue.Drain(this, true); + WriteTasksQueue.Drain(true, ctx); } } diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 7b36e540f40f..ef533486f03b 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -447,6 +447,40 @@ class TAbortWriteTransaction: public NTabletFlatExecutor::TTransactionBaseCheckOverloaded(PathId); + if (overloadStatus == EOverloadStatus::OverloadMetadata) { + AFL_INFO(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "wait_overload")("status", overloadStatus); + return false; + } + owner->Counters.GetCSCounters().WritingCounters->OnWritingTaskDequeue(TMonotonic::Now() - Created); + if (overloadStatus != EOverloadStatus::None) { + std::unique_ptr result = NEvents::TDataEvents::TEvWriteResult::BuildError( + owner->TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, "overload data error"); + owner->OverloadWriteFail(overloadStatus, NEvWrite::TWriteMeta(0, PathId, SourceId, {}, TGUID::CreateTimebased().AsGuidString()), + ArrowData->GetSize(), Cookie, std::move(result), ctx); + return true; + } + + owner->OperationsManager->RegisterLock(LockId, owner->Generation()); + auto writeOperation = owner->OperationsManager->RegisterOperation( + PathId, LockId, Cookie, GranuleShardingVersionId, ModificationType, AppDataVerified().FeatureFlags.GetEnableWritePortionsOnInsert()); + + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("writing_size", ArrowData->GetSize())("operation_id", writeOperation->GetIdentifier())( + "in_flight", owner->Counters.GetWritesMonitor()->GetWritesInFlight())( + "size_in_flight", owner->Counters.GetWritesMonitor()->GetWritesSizeInFlight()); + owner->Counters.GetWritesMonitor()->OnStartWrite(ArrowData->GetSize()); + + AFL_VERIFY(writeOperation); + writeOperation->SetBehaviour(Behaviour); + NOlap::TWritingContext wContext(owner->TabletID(), owner->SelfId(), Schema, owner->StoragesManager, + owner->Counters.GetIndexationCounters().SplitterCounters, owner->Counters.GetCSCounters().WritingCounters, NOlap::TSnapshot::Max(), + writeOperation->GetActivityChecker()); + ArrowData->SetSeparationPoints(owner->GetIndexAs().GetGranulePtrVerified(PathId)->GetBucketPositions()); + writeOperation->Start(*owner, ArrowData, SourceId, wContext); + return true; +} + void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActorContext& ctx) { TMemoryProfileGuard mpg("NEvents::TDataEvents::TEvWrite"); NActors::TLogContextGuard gLogging = @@ -572,13 +606,13 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor ui64 lockId = 0; if (behaviour == EOperationBehaviour::NoTxWrite) { - lockId = owner->BuildEphemeralTxId(); + lockId = BuildEphemeralTxId(); } else { lockId = record.GetLockTxId(); } WriteTasksQueue.Enqueue(TWriteTask(arrowData, schema, source, granuleShardingVersionId, pathId, cookie, lockId, *mType, behaviour)); - WriteTasksQueue.Drain(this, false); + WriteTasksQueue.Drain(false, ctx); } } // namespace NKikimr::NColumnShard diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 0f3e377459da..690877577a20 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -90,6 +90,7 @@ class TGeneralCompactColumnEngineChanges; namespace NKikimr::NColumnShard { +class TArrowData; class TEvWriteCommitPrimaryTransactionOperator; class TEvWriteCommitSecondaryTransactionOperator; class TTxFinishAsyncTransaction; @@ -461,10 +462,10 @@ class TColumnShard: public TActor, public NTabletFlatExecutor::TTa } } - class TWriteTask: TNonCopyable { + class TWriteTask: TMoveOnly { private: std::shared_ptr ArrowData; - ISnapshotSchema::TPtr Schema; + NOlap::ISnapshotSchema::TPtr Schema; const NActors::TActorId SourceId; const std::optional GranuleShardingVersionId; const ui64 PathId; @@ -474,7 +475,7 @@ class TColumnShard: public TActor, public NTabletFlatExecutor::TTa const EOperationBehaviour Behaviour; const TMonotonic Created = TMonotonic::Now(); public: - TWriteTask(const std::shared_ptr& arrowData, const ISnapshotSchema::TPtr& schema, const NActors::TActorId sourceId, + TWriteTask(const std::shared_ptr& arrowData, const NOlap::ISnapshotSchema::TPtr& schema, const NActors::TActorId sourceId, const std::optional& granuleShardingVersionId, const ui64 pathId, const ui64 cookie, const ui64 lockId, const NEvWrite::EModificationType modificationType, const EOperationBehaviour behaviour) : ArrowData(arrowData) @@ -484,6 +485,7 @@ class TColumnShard: public TActor, public NTabletFlatExecutor::TTa , PathId(pathId) , Cookie(cookie) , LockId(lockId) + , ModificationType(modificationType) , Behaviour(behaviour) { } @@ -491,39 +493,7 @@ class TColumnShard: public TActor, public NTabletFlatExecutor::TTa return Created; } - bool Execute(TColumnShard* owner) { - auto overloadStatus = owner->CheckOverloaded(PathId); - if (overloadStatus == EOverloadStatus::OverloadMetadata) { - AFL_INFO(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "wait_overload")("status", overloadStatus); - return false; - } - Owner->Counters.GetCSCounters().WritingCounters->OnWritingTaskDequeue(TMonotonic::Now() - Created); - if (overloadStatus != EOverloadStatus::None) { - std::unique_ptr result = NEvents::TDataEvents::TEvWriteResult::BuildError( - TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, "overload data error"); - owner->OverloadWriteFail(overloadStatus, NEvWrite::TWriteMeta(0, PathId, Source, {}, TGUID::CreateTimebased().AsGuidString()), - arrowData->GetSize(), Cookie, std::move(result), ctx); - return true; - } - - owner->OperationsManager->RegisterLock(LockId, owner->Generation()); - auto writeOperation = owner->OperationsManager->RegisterOperation( - PathId, LockId, Cookie, GranuleShardingVersionId, *ModificationType, AppDataVerified().FeatureFlags.GetEnableWritePortionsOnInsert()); - - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("writing_size", ArrowData->GetSize())( - "operation_id", writeOperation->GetIdentifier())("in_flight", owner->Counters.GetWritesMonitor()->GetWritesInFlight())( - "size_in_flight", owner->Counters.GetWritesMonitor()->GetWritesSizeInFlight()); - owner->Counters.GetWritesMonitor()->OnStartWrite(ArrowData->GetSize()); - - AFL_VERIFY(writeOperation); - writeOperation->SetBehaviour(Behaviour); - NOlap::TWritingContext wContext(TabletID(), SelfId(), Schema, owner->StoragesManager, - owner->Counters.GetIndexationCounters().SplitterCounters, - owner->Counters.GetCSCounters().WritingCounters, NOlap::TSnapshot::Max(), writeOperation->GetActivityChecker()); - ArrowData->SetSeparationPoints(GetIndexAs().GetGranulePtrVerified(PathId)->GetBucketPositions()); - writeOperation->Start(*this, ArrowData, Source, wContext); - return true; - } + bool Execute(TColumnShard* owner, const TActorContext& ctx); }; class TWriteTasksQueue { @@ -538,22 +508,22 @@ class TColumnShard: public TActor, public NTabletFlatExecutor::TTa } ~TWriteTasksQueue() { - owner->Counters.GetCSCounters().WritingCounters->QueueWaitSize->Sub(PredWriteTasksSize); + Owner->Counters.GetCSCounters().WritingCounters->QueueWaitSize->Sub(PredWriteTasksSize); } void Enqueue(TWriteTask&& task) { WriteTasks.emplace_back(std::move(task)); } - void Drain(const bool onWakeup) { + void Drain(const bool onWakeup, const TActorContext& ctx) { if (onWakeup) { WriteTasksOverloadCheckerScheduled = false; } - while (WriteTasks.size() && WriteTasks.front().Execute(Owner)) { + while (WriteTasks.size() && WriteTasks.front().Execute(Owner, ctx)) { WriteTasks.pop_front(); } if (!WriteTasks.size() && !WriteTasksOverloadCheckerScheduled) { - Owner->Schedule(TDuration::MilliSeconds(100), NActors::TEvents::TEvWakeup(1)); + Owner->Schedule(TDuration::MilliSeconds(100), new NActors::TEvents::TEvWakeup(1)); WriteTasksOverloadCheckerScheduled = true; } Owner->Counters.GetCSCounters().WritingCounters->QueueWaitSize->Add((i64)WriteTasks.size() - PredWriteTasksSize); From 37bd04bb7e8910ee883256dd0db43b45e9042fda Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Wed, 1 Jan 2025 19:33:44 +0300 Subject: [PATCH 3/7] fix build --- ydb/core/tx/columnshard/counters/columnshard.h | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/ydb/core/tx/columnshard/counters/columnshard.h b/ydb/core/tx/columnshard/counters/columnshard.h index b19a8e3a1086..9becbcfc8755 100644 --- a/ydb/core/tx/columnshard/counters/columnshard.h +++ b/ydb/core/tx/columnshard/counters/columnshard.h @@ -37,12 +37,13 @@ class TWriteCounters: public TCommonCountersOwner { } TWriteCounters(TCommonCountersOwner& owner) - : TBase(owner, "activity", "writing") { + : TBase(owner, "activity", "writing") + , QueueWaitSize(TBase::GetValue("Write/Queue/Size")) + { VolumeWriteData = TBase::GetDeriviative("Write/Incoming/Bytes"); HistogramBytesWriteDataCount = TBase::GetHistogram("Write/Incoming/ByBytes/Count", NMonitoring::ExponentialHistogram(18, 2, 100)); HistogramBytesWriteDataBytes = TBase::GetHistogram("Write/Incoming/ByBytes/Bytes", NMonitoring::ExponentialHistogram(18, 2, 100)); HistogramDurationQueueWait = TBase::GetHistogram("Write/Queue/Waiting/DurationMs", NMonitoring::ExponentialHistogram(18, 2, 100)); - QueueWaitSize = TBase::GetValue("Write/Queue/Size"); } void OnIncomingData(const ui64 dataSize) const { From 9ec3a745afedb0155cedf845fd0496d80e609591 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Thu, 2 Jan 2025 10:32:37 +0300 Subject: [PATCH 4/7] logs --- ydb/core/tx/columnshard/columnshard_impl.h | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 690877577a20..d3b171697b0b 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -522,9 +522,10 @@ class TColumnShard: public TActor, public NTabletFlatExecutor::TTa while (WriteTasks.size() && WriteTasks.front().Execute(Owner, ctx)) { WriteTasks.pop_front(); } - if (!WriteTasks.size() && !WriteTasksOverloadCheckerScheduled) { - Owner->Schedule(TDuration::MilliSeconds(100), new NActors::TEvents::TEvWakeup(1)); + if (WriteTasks.size() && !WriteTasksOverloadCheckerScheduled) { + Owner->Schedule(TDuration::MilliSeconds(300), new NActors::TEvents::TEvWakeup(1)); WriteTasksOverloadCheckerScheduled = true; + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "queue_on_write")("size", WriteTasks.size()); } Owner->Counters.GetCSCounters().WritingCounters->QueueWaitSize->Add((i64)WriteTasks.size() - PredWriteTasksSize); PredWriteTasksSize = (i64)WriteTasks.size(); From 862044743c40fbe324405c740bbe408453c2ec56 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Thu, 2 Jan 2025 13:34:47 +0300 Subject: [PATCH 5/7] correction --- ydb/core/tx/columnshard/columnshard.cpp | 7 +- .../tx/columnshard/columnshard__write.cpp | 69 +++++----------- ydb/core/tx/columnshard/columnshard_impl.cpp | 3 +- ydb/core/tx/columnshard/columnshard_impl.h | 76 +----------------- .../tx/columnshard/tablet/write_queue.cpp | 78 +++++++++++++++++++ ydb/core/tx/columnshard/tablet/write_queue.h | 62 +++++++++++++++ ydb/core/tx/columnshard/tablet/ya.make | 11 +++ ydb/core/tx/columnshard/ya.make | 45 +++++------ 8 files changed, 205 insertions(+), 146 deletions(-) create mode 100644 ydb/core/tx/columnshard/tablet/write_queue.cpp create mode 100644 ydb/core/tx/columnshard/tablet/write_queue.h create mode 100644 ydb/core/tx/columnshard/tablet/ya.make diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp index 626cfd6fd62d..e057bc58b173 100644 --- a/ydb/core/tx/columnshard/columnshard.cpp +++ b/ydb/core/tx/columnshard/columnshard.cpp @@ -13,6 +13,7 @@ #include #include +#include #include #include @@ -69,8 +70,8 @@ void TColumnShard::TrySwitchToWork(const TActorContext& ctx) { } ProgressTxController->OnTabletInit(); { - const TLogContextGuard gLogging = - NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletID())("self_id", SelfId())("process", "SwitchToWork"); + const TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletID())( + "self_id", SelfId())("process", "SwitchToWork"); AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "initialize_shard")("step", "SwitchToWork"); Become(&TThis::StateWork); SignalTabletActive(ctx); @@ -251,7 +252,7 @@ void TColumnShard::Handle(NActors::TEvents::TEvWakeup::TPtr& ev, const TActorCon GetProgressTxController().PingTimeouts(now); ctx.Schedule(TDuration::Seconds(1), new NActors::TEvents::TEvWakeup(0)); } else if (ev->Get()->Tag == 1) { - WriteTasksQueue.Drain(true, ctx); + WriteTasksQueue->Drain(true, ctx); } } diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index ef533486f03b..8d2236ee85de 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -13,6 +13,7 @@ #include "transactions/operators/ev_write/secondary.h" #include "transactions/operators/ev_write/sync.h" +#include #include #include @@ -46,26 +47,27 @@ void TColumnShard::OverloadWriteFail(const EOverloadStatus overloadReason, const Y_ABORT("invalid function usage"); } - AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "write_overload")("size", writeSize)("path_id", writeMeta.GetTableId())( + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "write_overload")("size", writeSize)("path_id", writeMeta.GetTableId())( "reason", overloadReason); ctx.Send(writeMeta.GetSource(), event.release(), 0, cookie); } -TColumnShard::EOverloadStatus TColumnShard::CheckOverloaded(const ui64 pathId) const { +TColumnShard::EOverloadStatus TColumnShard::CheckOverloadedWait(const ui64 pathId) const { + Counters.GetCSCounters().OnIndexMetadataLimit(NOlap::IColumnEngine::GetMetadataLimit()); + if (TablesManager.GetPrimaryIndex() && TablesManager.GetPrimaryIndex()->IsOverloadedByMetadata(NOlap::IColumnEngine::GetMetadataLimit())) { + return EOverloadStatus::OverloadMetadata; + } + return EOverloadStatus::None; +} + +TColumnShard::EOverloadStatus TColumnShard::CheckOverloadedImmediate(const ui64 pathId) const { if (IsAnyChannelYellowStop()) { return EOverloadStatus::Disk; } - if (InsertTable && InsertTable->IsOverloadedByCommitted(pathId)) { return EOverloadStatus::InsertTable; } - - Counters.GetCSCounters().OnIndexMetadataLimit(NOlap::IColumnEngine::GetMetadataLimit()); - if (TablesManager.GetPrimaryIndex() && TablesManager.GetPrimaryIndex()->IsOverloadedByMetadata(NOlap::IColumnEngine::GetMetadataLimit())) { - return EOverloadStatus::OverloadMetadata; - } - ui64 txLimit = Settings.OverloadTxInFlight; ui64 writesLimit = Settings.OverloadWritesInFlight; ui64 writesSizeLimit = Settings.OverloadWritesSizeInFlight; @@ -93,7 +95,8 @@ void TColumnShard::Handle(NPrivateEvents::NWrite::TEvWritePortionResult::TPtr& e NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_WRITE)("tablet_id", TabletID())("event", "TEvWritePortionResult"); std::vector noDataWrites = ev->Get()->DetachNoDataWrites(); for (auto&& i : noDataWrites) { - AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "no_data_write_finished")("writing_size", i.GetDataSize())("writing_id", i.GetWriteMeta().GetId()); + AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "no_data_write_finished")("writing_size", i.GetDataSize())( + "writing_id", i.GetWriteMeta().GetId()); Counters.GetWritesMonitor()->OnFinishWrite(i.GetDataSize(), 1); } if (ev->Get()->GetWriteStatus() == NKikimrProto::OK) { @@ -255,7 +258,10 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex NEvWrite::TWriteData writeData(writeMeta, arrowData, snapshotSchema->GetIndexInfo().GetReplaceKey(), StoragesManager->GetInsertOperator()->StartWritingAction(NOlap::NBlobOperations::EConsumer::WRITING), false); - auto overloadStatus = CheckOverloaded(pathId); + auto overloadStatus = CheckOverloadedImmediate(pathId); + if (overloadStatus == EOverloadStatus::None) { + overloadStatus = CheckOverloadedWait(pathId); + } if (overloadStatus != EOverloadStatus::None) { std::unique_ptr result = std::make_unique( TabletID(), writeData.GetWriteMeta(), NKikimrTxColumnShard::EResultStatus::OVERLOADED); @@ -447,40 +453,6 @@ class TAbortWriteTransaction: public NTabletFlatExecutor::TTransactionBaseCheckOverloaded(PathId); - if (overloadStatus == EOverloadStatus::OverloadMetadata) { - AFL_INFO(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "wait_overload")("status", overloadStatus); - return false; - } - owner->Counters.GetCSCounters().WritingCounters->OnWritingTaskDequeue(TMonotonic::Now() - Created); - if (overloadStatus != EOverloadStatus::None) { - std::unique_ptr result = NEvents::TDataEvents::TEvWriteResult::BuildError( - owner->TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, "overload data error"); - owner->OverloadWriteFail(overloadStatus, NEvWrite::TWriteMeta(0, PathId, SourceId, {}, TGUID::CreateTimebased().AsGuidString()), - ArrowData->GetSize(), Cookie, std::move(result), ctx); - return true; - } - - owner->OperationsManager->RegisterLock(LockId, owner->Generation()); - auto writeOperation = owner->OperationsManager->RegisterOperation( - PathId, LockId, Cookie, GranuleShardingVersionId, ModificationType, AppDataVerified().FeatureFlags.GetEnableWritePortionsOnInsert()); - - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("writing_size", ArrowData->GetSize())("operation_id", writeOperation->GetIdentifier())( - "in_flight", owner->Counters.GetWritesMonitor()->GetWritesInFlight())( - "size_in_flight", owner->Counters.GetWritesMonitor()->GetWritesSizeInFlight()); - owner->Counters.GetWritesMonitor()->OnStartWrite(ArrowData->GetSize()); - - AFL_VERIFY(writeOperation); - writeOperation->SetBehaviour(Behaviour); - NOlap::TWritingContext wContext(owner->TabletID(), owner->SelfId(), Schema, owner->StoragesManager, - owner->Counters.GetIndexationCounters().SplitterCounters, owner->Counters.GetCSCounters().WritingCounters, NOlap::TSnapshot::Max(), - writeOperation->GetActivityChecker()); - ArrowData->SetSeparationPoints(owner->GetIndexAs().GetGranulePtrVerified(PathId)->GetBucketPositions()); - writeOperation->Start(*owner, ArrowData, SourceId, wContext); - return true; -} - void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActorContext& ctx) { TMemoryProfileGuard mpg("NEvents::TDataEvents::TEvWrite"); NActors::TLogContextGuard gLogging = @@ -595,7 +567,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor } if (!AppDataVerified().ColumnShardConfig.GetWritingEnabled()) { - sendError("writing disabled", NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED); + sendError("writing disabled", NKikimrDataEvents::TEvWriteResult::STATUS_CANCELLED); return; } @@ -611,8 +583,9 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor lockId = record.GetLockTxId(); } - WriteTasksQueue.Enqueue(TWriteTask(arrowData, schema, source, granuleShardingVersionId, pathId, cookie, lockId, *mType, behaviour)); - WriteTasksQueue.Drain(false, ctx); + WriteTasksQueue->TryEnqueue( + this, ctx, TWriteTask(arrowData, schema, source, granuleShardingVersionId, pathId, cookie, lockId, *mType, behaviour)); + WriteTasksQueue->Drain(false, ctx); } } // namespace NKikimr::NColumnShard diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 332cd7abb0bd..109ee620ad26 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -42,6 +42,7 @@ #include #include #include +#include #include #include #include @@ -77,7 +78,7 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet) , TabletCountersHolder(new TProtobufTabletCounters()) , Counters(*TabletCountersHolder) - , WriteTasksQueue(this) + , WriteTasksQueue(std::make_unique(this)) , ProgressTxController(std::make_unique(*this)) , StoragesManager(std::make_shared(*this)) , DataLocksManager(std::make_shared()) diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index d3b171697b0b..537ad9128897 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -100,6 +100,7 @@ class TOperationsManager; class TWaitEraseTablesTxSubscriber; class TTxBlobsWritingFinished; class TTxBlobsWritingFailed; +class TWriteTasksQueue; namespace NLoading { class TInsertTableInitializer; @@ -374,7 +375,8 @@ class TColumnShard: public TActor, public NTabletFlatExecutor::TTa private: void OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteMeta& writeMeta, const ui64 writeSize, const ui64 cookie, std::unique_ptr&& event, const TActorContext& ctx); - EOverloadStatus CheckOverloaded(const ui64 tableId) const; + EOverloadStatus CheckOverloadedImmediate(const ui64 tableId) const; + EOverloadStatus CheckOverloadedWait(const ui64 tableId) const; protected: STFUNC(StateInit) { @@ -462,80 +464,10 @@ class TColumnShard: public TActor, public NTabletFlatExecutor::TTa } } - class TWriteTask: TMoveOnly { - private: - std::shared_ptr ArrowData; - NOlap::ISnapshotSchema::TPtr Schema; - const NActors::TActorId SourceId; - const std::optional GranuleShardingVersionId; - const ui64 PathId; - const ui64 Cookie; - const ui64 LockId; - const NEvWrite::EModificationType ModificationType; - const EOperationBehaviour Behaviour; - const TMonotonic Created = TMonotonic::Now(); - public: - TWriteTask(const std::shared_ptr& arrowData, const NOlap::ISnapshotSchema::TPtr& schema, const NActors::TActorId sourceId, - const std::optional& granuleShardingVersionId, const ui64 pathId, const ui64 cookie, const ui64 lockId, - const NEvWrite::EModificationType modificationType, const EOperationBehaviour behaviour) - : ArrowData(arrowData) - , Schema(schema) - , SourceId(sourceId) - , GranuleShardingVersionId(granuleShardingVersionId) - , PathId(pathId) - , Cookie(cookie) - , LockId(lockId) - , ModificationType(modificationType) - , Behaviour(behaviour) { - } - - const TMonotonic& GetCreatedMonotonic() const { - return Created; - } - - bool Execute(TColumnShard* owner, const TActorContext& ctx); - }; - - class TWriteTasksQueue { - private: - bool WriteTasksOverloadCheckerScheduled = false; - std::deque WriteTasks; - i64 PredWriteTasksSize = 0; - TColumnShard* Owner; - public: - TWriteTasksQueue(TColumnShard* owner) - : Owner(owner) { - } - - ~TWriteTasksQueue() { - Owner->Counters.GetCSCounters().WritingCounters->QueueWaitSize->Sub(PredWriteTasksSize); - } - - void Enqueue(TWriteTask&& task) { - WriteTasks.emplace_back(std::move(task)); - } - - void Drain(const bool onWakeup, const TActorContext& ctx) { - if (onWakeup) { - WriteTasksOverloadCheckerScheduled = false; - } - while (WriteTasks.size() && WriteTasks.front().Execute(Owner, ctx)) { - WriteTasks.pop_front(); - } - if (WriteTasks.size() && !WriteTasksOverloadCheckerScheduled) { - Owner->Schedule(TDuration::MilliSeconds(300), new NActors::TEvents::TEvWakeup(1)); - WriteTasksOverloadCheckerScheduled = true; - AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "queue_on_write")("size", WriteTasks.size()); - } - Owner->Counters.GetCSCounters().WritingCounters->QueueWaitSize->Add((i64)WriteTasks.size() - PredWriteTasksSize); - PredWriteTasksSize = (i64)WriteTasks.size(); - } - }; - private: std::unique_ptr TabletCountersHolder; TCountersManager Counters; - TWriteTasksQueue WriteTasksQueue; + std::unique_ptr WriteTasksQueue; std::unique_ptr ProgressTxController; std::unique_ptr OperationsManager; diff --git a/ydb/core/tx/columnshard/tablet/write_queue.cpp b/ydb/core/tx/columnshard/tablet/write_queue.cpp new file mode 100644 index 000000000000..c8e36146f06e --- /dev/null +++ b/ydb/core/tx/columnshard/tablet/write_queue.cpp @@ -0,0 +1,78 @@ +#include "write_queue.h" + +#include + +namespace NKikimr::NColumnShard { + +bool TWriteTask::CheckOverloadImmediate(TColumnShard* owner, const TActorContext& ctx) { + auto overloadStatus = owner->CheckOverloadedImmediate(PathId); + if (overloadStatus == EOverloadStatus::None) { + return false; + } + owner->Counters.GetWritesMonitor()->OnFinishWrite(ArrowData->GetSize()); + owner->Counters.GetCSCounters().WritingCounters->OnWritingTaskDequeue(TMonotonic::Now() - Created); + std::unique_ptr result = NEvents::TDataEvents::TEvWriteResult::BuildError( + owner->TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, "overload data error: " + ::ToString(overloadStatus)); + owner->OverloadWriteFail(overloadStatus, NEvWrite::TWriteMeta(0, PathId, SourceId, {}, TGUID::CreateTimebased().AsGuidString()), + ArrowData->GetSize(), Cookie, std::move(result), ctx); + return true; +} + +bool TWriteTask::Execute(TColumnShard* owner, const TActorContext& ctx) { + overloadStatus = owner->CheckOverloadedWait(PathId); + if (overloadStatus == EOverloadStatus::OverloadMetadata) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "wait_overload")("status", overloadStatus); + return false; + } + AFL_VERIFY(overloadStatus == EOverloadStatus::None); + + owner->OperationsManager->RegisterLock(LockId, owner->Generation()); + auto writeOperation = owner->OperationsManager->RegisterOperation( + PathId, LockId, Cookie, GranuleShardingVersionId, ModificationType, AppDataVerified().FeatureFlags.GetEnableWritePortionsOnInsert()); + + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("writing_size", ArrowData->GetSize())("operation_id", writeOperation->GetIdentifier())( + "in_flight", owner->Counters.GetWritesMonitor()->GetWritesInFlight())( + "size_in_flight", owner->Counters.GetWritesMonitor()->GetWritesSizeInFlight()); + + AFL_VERIFY(writeOperation); + writeOperation->SetBehaviour(Behaviour); + NOlap::TWritingContext wContext(owner->TabletID(), owner->SelfId(), Schema, owner->StoragesManager, + owner->Counters.GetIndexationCounters().SplitterCounters, owner->Counters.GetCSCounters().WritingCounters, NOlap::TSnapshot::Max(), + writeOperation->GetActivityChecker()); + ArrowData->SetSeparationPoints(owner->GetIndexAs().GetGranulePtrVerified(PathId)->GetBucketPositions()); + writeOperation->Start(*owner, ArrowData, SourceId, wContext); + return true; +} + +bool TWriteTasksQueue::Drain(const bool onWakeup, const TActorContext& ctx) { + if (onWakeup) { + WriteTasksOverloadCheckerScheduled = false; + } + while (WriteTasks.size() && (WriteTasks.front().CheckOverloadImmediate(Owner, ctx) || WriteTasks.front().Execute(Owner, ctx))) { + WriteTasks.pop_front(); + } + if (WriteTasks.size() && !WriteTasksOverloadCheckerScheduled) { + Owner->Schedule(TDuration::MilliSeconds(300), new NActors::TEvents::TEvWakeup(1)); + WriteTasksOverloadCheckerScheduled = true; + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "queue_on_write")("size", WriteTasks.size()); + } + Owner->Counters.GetCSCounters().WritingCounters->QueueWaitSize->Add((i64)WriteTasks.size() - PredWriteTasksSize); + PredWriteTasksSize = (i64)WriteTasks.size(); + return !WriteTasks.size(); +} + +bool TWriteTasksQueue::TryEnqueue(TColumnShard* owner, const TActorContext& ctx, TWriteTask&& task) { + owner->Counters.GetWritesMonitor()->OnStartWrite(task.GetSize()); + if (!task.CheckOverloadImmediate(owner, ctx)) { + WriteTasks.emplace_back(std::move(task)); + return true; + } else { + return false; + } +} + +TWriteTasksQueue::~TWriteTasksQueue() { + Owner->Counters.GetCSCounters().WritingCounters->QueueWaitSize->Sub(PredWriteTasksSize); +} + +} // namespace NKikimr::NColumnShard diff --git a/ydb/core/tx/columnshard/tablet/write_queue.h b/ydb/core/tx/columnshard/tablet/write_queue.h new file mode 100644 index 000000000000..81b94766114b --- /dev/null +++ b/ydb/core/tx/columnshard/tablet/write_queue.h @@ -0,0 +1,62 @@ +#pragma once +#include +#include +#include +#include + +namespace NKikimr::NColumnShard { +class TColumnShard; +class TWriteTask: TMoveOnly { +private: + std::shared_ptr ArrowData; + NOlap::ISnapshotSchema::TPtr Schema; + const NActors::TActorId SourceId; + const std::optional GranuleShardingVersionId; + const ui64 PathId; + const ui64 Cookie; + const ui64 LockId; + const NEvWrite::EModificationType ModificationType; + const EOperationBehaviour Behaviour; + const TMonotonic Created = TMonotonic::Now(); + +public: + TWriteTask(const std::shared_ptr& arrowData, const NOlap::ISnapshotSchema::TPtr& schema, const NActors::TActorId sourceId, + const std::optional& granuleShardingVersionId, const ui64 pathId, const ui64 cookie, const ui64 lockId, + const NEvWrite::EModificationType modificationType, const EOperationBehaviour behaviour) + : ArrowData(arrowData) + , Schema(schema) + , SourceId(sourceId) + , GranuleShardingVersionId(granuleShardingVersionId) + , PathId(pathId) + , Cookie(cookie) + , LockId(lockId) + , ModificationType(modificationType) + , Behaviour(behaviour) { + } + + const TMonotonic& GetCreatedMonotonic() const { + return Created; + } + + bool Execute(TColumnShard* owner, const TActorContext& ctx); +}; + +class TWriteTasksQueue { +private: + bool WriteTasksOverloadCheckerScheduled = false; + std::deque WriteTasks; + i64 PredWriteTasksSize = 0; + TColumnShard* Owner; + +public: + TWriteTasksQueue(TColumnShard* owner) + : Owner(owner) { + } + + ~TWriteTasksQueue(); + + bool TryEnqueue(TColumnShard* owner, const TActorContext& ctx, TWriteTask&& task); + bool Drain(const bool onWakeup, const TActorContext& ctx); +}; + +} // namespace NKikimr::NColumnShard diff --git a/ydb/core/tx/columnshard/tablet/ya.make b/ydb/core/tx/columnshard/tablet/ya.make new file mode 100644 index 000000000000..71e125cb0ca9 --- /dev/null +++ b/ydb/core/tx/columnshard/tablet/ya.make @@ -0,0 +1,11 @@ +LIBRARY() + +SRCS( + write_queue.cpp +) + +PEERDIR( + ydb/core/tx/columnshard/hooks/abstract +) + +END() diff --git a/ydb/core/tx/columnshard/ya.make b/ydb/core/tx/columnshard/ya.make index 055037e5d508..28d2f6cbe035 100644 --- a/ydb/core/tx/columnshard/ya.make +++ b/ydb/core/tx/columnshard/ya.make @@ -30,7 +30,6 @@ GENERATE_ENUM_SERIALIZATION(columnshard.h) GENERATE_ENUM_SERIALIZATION(columnshard_impl.h) PEERDIR( - ydb/library/actors/core ydb/core/actorlib_impl ydb/core/base ydb/core/control @@ -39,38 +38,40 @@ PEERDIR( ydb/core/protos ydb/core/tablet ydb/core/tablet_flat - ydb/core/tx/time_cast - ydb/core/tx/columnshard/engines - ydb/core/tx/columnshard/engines/writer - ydb/core/tx/columnshard/engines/reader/abstract - ydb/core/tx/columnshard/counters - ydb/core/tx/columnshard/common - ydb/core/tx/columnshard/splitter - ydb/core/tx/columnshard/operations - ydb/core/tx/columnshard/transactions - ydb/core/tx/columnshard/transactions/operators - ydb/core/tx/columnshard/blobs_reader ydb/core/tx/columnshard/blobs_action + ydb/core/tx/columnshard/blobs_action/storages_manager + ydb/core/tx/columnshard/blobs_reader + ydb/core/tx/columnshard/common + ydb/core/tx/columnshard/counters + ydb/core/tx/columnshard/data_accessor + ydb/core/tx/columnshard/data_accessor/in_mem ydb/core/tx/columnshard/data_locks ydb/core/tx/columnshard/data_sharing - ydb/core/tx/columnshard/subscriber + ydb/core/tx/columnshard/engines + ydb/core/tx/columnshard/engines/reader/abstract + ydb/core/tx/columnshard/engines/writer ydb/core/tx/columnshard/export - ydb/core/tx/columnshard/tx_reader ydb/core/tx/columnshard/loading - ydb/core/tx/columnshard/data_accessor - ydb/core/tx/columnshard/resource_subscriber ydb/core/tx/columnshard/normalizer - ydb/core/tx/columnshard/blobs_action/storages_manager - ydb/core/tx/columnshard/data_accessor/in_mem - ydb/core/tx/tiering + ydb/core/tx/columnshard/operations + ydb/core/tx/columnshard/resource_subscriber + ydb/core/tx/columnshard/splitter + ydb/core/tx/columnshard/subscriber + ydb/core/tx/columnshard/tablet + ydb/core/tx/columnshard/transactions + ydb/core/tx/columnshard/transactions/operators + ydb/core/tx/columnshard/tx_reader ydb/core/tx/conveyor/usage + ydb/core/tx/long_tx_service/public ydb/core/tx/priorities/service + ydb/core/tx/tiering + ydb/core/tx/time_cast ydb/core/tx/tracing - ydb/core/tx/long_tx_service/public ydb/core/util - ydb/public/api/protos - ydb/library/yql/dq/actors/compute + ydb/library/actors/core ydb/library/chunks_limiter + ydb/library/yql/dq/actors/compute + ydb/public/api/protos ) IF (OS_WINDOWS) From f9ccf909291c1512a74bf73be13544b4c0dfd8ef Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Thu, 2 Jan 2025 13:47:19 +0300 Subject: [PATCH 6/7] correction --- ydb/core/tx/columnshard/columnshard_impl.h | 3 +++ ydb/core/tx/columnshard/tablet/write_queue.cpp | 14 ++++++++++---- ydb/core/tx/columnshard/tablet/write_queue.h | 5 ++++- 3 files changed, 17 insertions(+), 5 deletions(-) diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 537ad9128897..89c34e6cebbe 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -101,6 +101,7 @@ class TWaitEraseTablesTxSubscriber; class TTxBlobsWritingFinished; class TTxBlobsWritingFailed; class TWriteTasksQueue; +class TWriteTask; namespace NLoading { class TInsertTableInitializer; @@ -230,6 +231,8 @@ class TColumnShard: public TActor, public NTabletFlatExecutor::TTa friend class NLoading::TInFlightReadsInitializer; friend class NLoading::TSpecialValuesInitializer; friend class NLoading::TTablesManagerInitializer; + friend class TWriteTasksQueue; + friend class TWriteTask; class TTxProgressTx; class TTxProposeCancel; diff --git a/ydb/core/tx/columnshard/tablet/write_queue.cpp b/ydb/core/tx/columnshard/tablet/write_queue.cpp index c8e36146f06e..3e8e1a22cea4 100644 --- a/ydb/core/tx/columnshard/tablet/write_queue.cpp +++ b/ydb/core/tx/columnshard/tablet/write_queue.cpp @@ -1,12 +1,14 @@ #include "write_queue.h" #include +#include +#include namespace NKikimr::NColumnShard { bool TWriteTask::CheckOverloadImmediate(TColumnShard* owner, const TActorContext& ctx) { auto overloadStatus = owner->CheckOverloadedImmediate(PathId); - if (overloadStatus == EOverloadStatus::None) { + if (overloadStatus == TColumnShard::EOverloadStatus::None) { return false; } owner->Counters.GetWritesMonitor()->OnFinishWrite(ArrowData->GetSize()); @@ -19,12 +21,12 @@ bool TWriteTask::CheckOverloadImmediate(TColumnShard* owner, const TActorContext } bool TWriteTask::Execute(TColumnShard* owner, const TActorContext& ctx) { - overloadStatus = owner->CheckOverloadedWait(PathId); - if (overloadStatus == EOverloadStatus::OverloadMetadata) { + auto overloadStatus = owner->CheckOverloadedWait(PathId); + if (overloadStatus == TColumnShard::EOverloadStatus::OverloadMetadata) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "wait_overload")("status", overloadStatus); return false; } - AFL_VERIFY(overloadStatus == EOverloadStatus::None); + AFL_VERIFY(overloadStatus == TColumnShard::EOverloadStatus::None); owner->OperationsManager->RegisterLock(LockId, owner->Generation()); auto writeOperation = owner->OperationsManager->RegisterOperation( @@ -44,6 +46,10 @@ bool TWriteTask::Execute(TColumnShard* owner, const TActorContext& ctx) { return true; } +ui64 TWriteTask::GetSize() const { + return ArrowData->GetSize(); +} + bool TWriteTasksQueue::Drain(const bool onWakeup, const TActorContext& ctx) { if (onWakeup) { WriteTasksOverloadCheckerScheduled = false; diff --git a/ydb/core/tx/columnshard/tablet/write_queue.h b/ydb/core/tx/columnshard/tablet/write_queue.h index 81b94766114b..682cdbf7e823 100644 --- a/ydb/core/tx/columnshard/tablet/write_queue.h +++ b/ydb/core/tx/columnshard/tablet/write_queue.h @@ -2,10 +2,10 @@ #include #include #include -#include namespace NKikimr::NColumnShard { class TColumnShard; +class TArrowData; class TWriteTask: TMoveOnly { private: std::shared_ptr ArrowData; @@ -34,10 +34,13 @@ class TWriteTask: TMoveOnly { , Behaviour(behaviour) { } + ui64 GetSize() const; + const TMonotonic& GetCreatedMonotonic() const { return Created; } + bool CheckOverloadImmediate(TColumnShard* owner, const TActorContext& ctx); bool Execute(TColumnShard* owner, const TActorContext& ctx); }; From 15ae8f0af08e24e05e1c94d3e24e3a6cd50ae51a Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Thu, 2 Jan 2025 16:27:41 +0300 Subject: [PATCH 7/7] correction --- .../tx/columnshard/columnshard__write.cpp | 6 ++-- .../tx/columnshard/tablet/write_queue.cpp | 33 +++---------------- ydb/core/tx/columnshard/tablet/write_queue.h | 5 +-- 3 files changed, 8 insertions(+), 36 deletions(-) diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 8d2236ee85de..8bff7e300f80 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -54,6 +54,9 @@ void TColumnShard::OverloadWriteFail(const EOverloadStatus overloadReason, const } TColumnShard::EOverloadStatus TColumnShard::CheckOverloadedWait(const ui64 pathId) const { + if (InsertTable && InsertTable->IsOverloadedByCommitted(pathId)) { + return EOverloadStatus::InsertTable; + } Counters.GetCSCounters().OnIndexMetadataLimit(NOlap::IColumnEngine::GetMetadataLimit()); if (TablesManager.GetPrimaryIndex() && TablesManager.GetPrimaryIndex()->IsOverloadedByMetadata(NOlap::IColumnEngine::GetMetadataLimit())) { return EOverloadStatus::OverloadMetadata; @@ -65,9 +68,6 @@ TColumnShard::EOverloadStatus TColumnShard::CheckOverloadedImmediate(const ui64 if (IsAnyChannelYellowStop()) { return EOverloadStatus::Disk; } - if (InsertTable && InsertTable->IsOverloadedByCommitted(pathId)) { - return EOverloadStatus::InsertTable; - } ui64 txLimit = Settings.OverloadTxInFlight; ui64 writesLimit = Settings.OverloadWritesInFlight; ui64 writesSizeLimit = Settings.OverloadWritesSizeInFlight; diff --git a/ydb/core/tx/columnshard/tablet/write_queue.cpp b/ydb/core/tx/columnshard/tablet/write_queue.cpp index 3e8e1a22cea4..edfa4003ad3c 100644 --- a/ydb/core/tx/columnshard/tablet/write_queue.cpp +++ b/ydb/core/tx/columnshard/tablet/write_queue.cpp @@ -6,27 +6,12 @@ namespace NKikimr::NColumnShard { -bool TWriteTask::CheckOverloadImmediate(TColumnShard* owner, const TActorContext& ctx) { - auto overloadStatus = owner->CheckOverloadedImmediate(PathId); - if (overloadStatus == TColumnShard::EOverloadStatus::None) { - return false; - } - owner->Counters.GetWritesMonitor()->OnFinishWrite(ArrowData->GetSize()); - owner->Counters.GetCSCounters().WritingCounters->OnWritingTaskDequeue(TMonotonic::Now() - Created); - std::unique_ptr result = NEvents::TDataEvents::TEvWriteResult::BuildError( - owner->TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, "overload data error: " + ::ToString(overloadStatus)); - owner->OverloadWriteFail(overloadStatus, NEvWrite::TWriteMeta(0, PathId, SourceId, {}, TGUID::CreateTimebased().AsGuidString()), - ArrowData->GetSize(), Cookie, std::move(result), ctx); - return true; -} - bool TWriteTask::Execute(TColumnShard* owner, const TActorContext& ctx) { auto overloadStatus = owner->CheckOverloadedWait(PathId); - if (overloadStatus == TColumnShard::EOverloadStatus::OverloadMetadata) { + if (overloadStatus != TColumnShard::EOverloadStatus::None) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "wait_overload")("status", overloadStatus); return false; } - AFL_VERIFY(overloadStatus == TColumnShard::EOverloadStatus::None); owner->OperationsManager->RegisterLock(LockId, owner->Generation()); auto writeOperation = owner->OperationsManager->RegisterOperation( @@ -46,15 +31,11 @@ bool TWriteTask::Execute(TColumnShard* owner, const TActorContext& ctx) { return true; } -ui64 TWriteTask::GetSize() const { - return ArrowData->GetSize(); -} - bool TWriteTasksQueue::Drain(const bool onWakeup, const TActorContext& ctx) { if (onWakeup) { WriteTasksOverloadCheckerScheduled = false; } - while (WriteTasks.size() && (WriteTasks.front().CheckOverloadImmediate(Owner, ctx) || WriteTasks.front().Execute(Owner, ctx))) { + while (WriteTasks.size() && WriteTasks.front().Execute(Owner, ctx)) { WriteTasks.pop_front(); } if (WriteTasks.size() && !WriteTasksOverloadCheckerScheduled) { @@ -67,14 +48,8 @@ bool TWriteTasksQueue::Drain(const bool onWakeup, const TActorContext& ctx) { return !WriteTasks.size(); } -bool TWriteTasksQueue::TryEnqueue(TColumnShard* owner, const TActorContext& ctx, TWriteTask&& task) { - owner->Counters.GetWritesMonitor()->OnStartWrite(task.GetSize()); - if (!task.CheckOverloadImmediate(owner, ctx)) { - WriteTasks.emplace_back(std::move(task)); - return true; - } else { - return false; - } +void TWriteTasksQueue::Enqueue(TWriteTask&& task) { + WriteTasks.emplace_back(std::move(task)); } TWriteTasksQueue::~TWriteTasksQueue() { diff --git a/ydb/core/tx/columnshard/tablet/write_queue.h b/ydb/core/tx/columnshard/tablet/write_queue.h index 682cdbf7e823..191934b449a8 100644 --- a/ydb/core/tx/columnshard/tablet/write_queue.h +++ b/ydb/core/tx/columnshard/tablet/write_queue.h @@ -34,13 +34,10 @@ class TWriteTask: TMoveOnly { , Behaviour(behaviour) { } - ui64 GetSize() const; - const TMonotonic& GetCreatedMonotonic() const { return Created; } - bool CheckOverloadImmediate(TColumnShard* owner, const TActorContext& ctx); bool Execute(TColumnShard* owner, const TActorContext& ctx); }; @@ -58,7 +55,7 @@ class TWriteTasksQueue { ~TWriteTasksQueue(); - bool TryEnqueue(TColumnShard* owner, const TActorContext& ctx, TWriteTask&& task); + void Enqueue(TWriteTask&& task); bool Drain(const bool onWakeup, const TActorContext& ctx); };