diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp index e2ad5f3e0729..e057bc58b173 100644 --- a/ydb/core/tx/columnshard/columnshard.cpp +++ b/ydb/core/tx/columnshard/columnshard.cpp @@ -13,6 +13,7 @@ #include #include +#include #include #include @@ -69,8 +70,8 @@ void TColumnShard::TrySwitchToWork(const TActorContext& ctx) { } ProgressTxController->OnTabletInit(); { - const TLogContextGuard gLogging = - NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletID())("self_id", SelfId())("process", "SwitchToWork"); + const TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletID())( + "self_id", SelfId())("process", "SwitchToWork"); AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "initialize_shard")("step", "SwitchToWork"); Become(&TThis::StateWork); SignalTabletActive(ctx); @@ -250,6 +251,8 @@ void TColumnShard::Handle(NActors::TEvents::TEvWakeup::TPtr& ev, const TActorCon const TMonotonic now = TMonotonic::Now(); GetProgressTxController().PingTimeouts(now); ctx.Schedule(TDuration::Seconds(1), new NActors::TEvents::TEvWakeup(0)); + } else if (ev->Get()->Tag == 1) { + WriteTasksQueue->Drain(true, ctx); } } diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 05716eb6c571..1bf99d34aa9d 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -13,6 +13,7 @@ #include "transactions/operators/ev_write/secondary.h" #include "transactions/operators/ev_write/sync.h" +#include #include #include @@ -46,26 +47,27 @@ void TColumnShard::OverloadWriteFail(const EOverloadStatus overloadReason, const Y_ABORT("invalid function usage"); } - AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "write_overload")("size", writeSize)("path_id", writeMeta.GetTableId())( + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "write_overload")("size", writeSize)("path_id", writeMeta.GetTableId())( "reason", overloadReason); ctx.Send(writeMeta.GetSource(), event.release(), 0, cookie); } -TColumnShard::EOverloadStatus TColumnShard::CheckOverloaded(const ui64 pathId) const { - if (IsAnyChannelYellowStop()) { - return EOverloadStatus::Disk; - } - +TColumnShard::EOverloadStatus TColumnShard::CheckOverloadedWait(const ui64 pathId) const { if (InsertTable && InsertTable->IsOverloadedByCommitted(pathId)) { return EOverloadStatus::InsertTable; } - Counters.GetCSCounters().OnIndexMetadataLimit(NOlap::IColumnEngine::GetMetadataLimit()); if (TablesManager.GetPrimaryIndex() && TablesManager.GetPrimaryIndex()->IsOverloadedByMetadata(NOlap::IColumnEngine::GetMetadataLimit())) { return EOverloadStatus::OverloadMetadata; } + return EOverloadStatus::None; +} +TColumnShard::EOverloadStatus TColumnShard::CheckOverloadedImmediate(const ui64 pathId) const { + if (IsAnyChannelYellowStop()) { + return EOverloadStatus::Disk; + } ui64 txLimit = Settings.OverloadTxInFlight; ui64 writesLimit = Settings.OverloadWritesInFlight; ui64 writesSizeLimit = Settings.OverloadWritesSizeInFlight; @@ -93,7 +95,8 @@ void TColumnShard::Handle(NPrivateEvents::NWrite::TEvWritePortionResult::TPtr& e NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_WRITE)("tablet_id", TabletID())("event", "TEvWritePortionResult"); std::vector noDataWrites = ev->Get()->DetachNoDataWrites(); for (auto&& i : noDataWrites) { - AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "no_data_write_finished")("writing_size", i.GetDataSize())("writing_id", i.GetWriteMeta().GetId()); + AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "no_data_write_finished")("writing_size", i.GetDataSize())( + "writing_id", i.GetWriteMeta().GetId()); Counters.GetWritesMonitor()->OnFinishWrite(i.GetDataSize(), 1); } if (ev->Get()->GetWriteStatus() == NKikimrProto::OK) { @@ -255,7 +258,10 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex NEvWrite::TWriteData writeData(writeMeta, arrowData, snapshotSchema->GetIndexInfo().GetReplaceKey(), StoragesManager->GetInsertOperator()->StartWritingAction(NOlap::NBlobOperations::EConsumer::WRITING), false); - auto overloadStatus = CheckOverloaded(pathId); + auto overloadStatus = CheckOverloadedImmediate(pathId); + if (overloadStatus == EOverloadStatus::None) { + overloadStatus = CheckOverloadedWait(pathId); + } if (overloadStatus != EOverloadStatus::None) { std::unique_ptr result = std::make_unique( TabletID(), writeData.GetWriteMeta(), NKikimrTxColumnShard::EResultStatus::OVERLOADED); @@ -560,11 +566,17 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor return; } - auto overloadStatus = CheckOverloaded(pathId); + if (!AppDataVerified().ColumnShardConfig.GetWritingEnabled()) { + sendError("writing disabled", NKikimrDataEvents::TEvWriteResult::STATUS_CANCELLED); + return; + } + + auto overloadStatus = CheckOverloadedImmediate(pathId); if (overloadStatus != EOverloadStatus::None) { std::unique_ptr result = NEvents::TDataEvents::TEvWriteResult::BuildError( TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, "overload data error"); - OverloadWriteFail(overloadStatus, NEvWrite::TWriteMeta(0, pathId, source, {}, TGUID::CreateTimebased().AsGuidString()), arrowData->GetSize(), cookie, std::move(result), ctx); + OverloadWriteFail(overloadStatus, NEvWrite::TWriteMeta(0, pathId, source, {}, TGUID::CreateTimebased().AsGuidString()), + arrowData->GetSize(), cookie, std::move(result), ctx); return; } @@ -580,25 +592,9 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor lockId = record.GetLockTxId(); } - if (!AppDataVerified().ColumnShardConfig.GetWritingEnabled()) { - sendError("writing disabled", NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED); - return; - } - - OperationsManager->RegisterLock(lockId, Generation()); - auto writeOperation = OperationsManager->RegisterOperation( - pathId, lockId, cookie, granuleShardingVersionId, *mType, AppDataVerified().FeatureFlags.GetEnableWritePortionsOnInsert()); - - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("writing_size", arrowData->GetSize())("operation_id", writeOperation->GetIdentifier())( - "in_flight", Counters.GetWritesMonitor()->GetWritesInFlight())("size_in_flight", Counters.GetWritesMonitor()->GetWritesSizeInFlight()); Counters.GetWritesMonitor()->OnStartWrite(arrowData->GetSize()); - - Y_ABORT_UNLESS(writeOperation); - writeOperation->SetBehaviour(behaviour); - NOlap::TWritingContext wContext(TabletID(), SelfId(), schema, StoragesManager, Counters.GetIndexationCounters().SplitterCounters, - Counters.GetCSCounters().WritingCounters, NOlap::TSnapshot::Max(), writeOperation->GetActivityChecker()); - arrowData->SetSeparationPoints(GetIndexAs().GetGranulePtrVerified(pathId)->GetBucketPositions()); - writeOperation->Start(*this, arrowData, source, wContext); + WriteTasksQueue->Enqueue(TWriteTask(arrowData, schema, source, granuleShardingVersionId, pathId, cookie, lockId, *mType, behaviour)); + WriteTasksQueue->Drain(false, ctx); } } // namespace NKikimr::NColumnShard diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index fde0bba29cad..109ee620ad26 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -42,6 +42,7 @@ #include #include #include +#include #include #include #include @@ -77,6 +78,7 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet) , TabletCountersHolder(new TProtobufTabletCounters()) , Counters(*TabletCountersHolder) + , WriteTasksQueue(std::make_unique(this)) , ProgressTxController(std::make_unique(*this)) , StoragesManager(std::make_shared(*this)) , DataLocksManager(std::make_shared()) diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index c01769721a0f..89c34e6cebbe 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -90,6 +90,7 @@ class TGeneralCompactColumnEngineChanges; namespace NKikimr::NColumnShard { +class TArrowData; class TEvWriteCommitPrimaryTransactionOperator; class TEvWriteCommitSecondaryTransactionOperator; class TTxFinishAsyncTransaction; @@ -99,6 +100,8 @@ class TOperationsManager; class TWaitEraseTablesTxSubscriber; class TTxBlobsWritingFinished; class TTxBlobsWritingFailed; +class TWriteTasksQueue; +class TWriteTask; namespace NLoading { class TInsertTableInitializer; @@ -228,6 +231,8 @@ class TColumnShard: public TActor, public NTabletFlatExecutor::TTa friend class NLoading::TInFlightReadsInitializer; friend class NLoading::TSpecialValuesInitializer; friend class NLoading::TTablesManagerInitializer; + friend class TWriteTasksQueue; + friend class TWriteTask; class TTxProgressTx; class TTxProposeCancel; @@ -373,7 +378,8 @@ class TColumnShard: public TActor, public NTabletFlatExecutor::TTa private: void OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteMeta& writeMeta, const ui64 writeSize, const ui64 cookie, std::unique_ptr&& event, const TActorContext& ctx); - EOverloadStatus CheckOverloaded(const ui64 tableId) const; + EOverloadStatus CheckOverloadedImmediate(const ui64 tableId) const; + EOverloadStatus CheckOverloadedWait(const ui64 tableId) const; protected: STFUNC(StateInit) { @@ -464,6 +470,7 @@ class TColumnShard: public TActor, public NTabletFlatExecutor::TTa private: std::unique_ptr TabletCountersHolder; TCountersManager Counters; + std::unique_ptr WriteTasksQueue; std::unique_ptr ProgressTxController; std::unique_ptr OperationsManager; diff --git a/ydb/core/tx/columnshard/counters/columnshard.h b/ydb/core/tx/columnshard/counters/columnshard.h index e5b55f713690..9becbcfc8755 100644 --- a/ydb/core/tx/columnshard/counters/columnshard.h +++ b/ydb/core/tx/columnshard/counters/columnshard.h @@ -1,8 +1,9 @@ #pragma once -#include "common/owner.h" #include "initialization.h" #include "tx_progress.h" +#include "common/owner.h" + #include #include @@ -26,14 +27,23 @@ class TWriteCounters: public TCommonCountersOwner { NMonitoring::TDynamicCounters::TCounterPtr VolumeWriteData; NMonitoring::THistogramPtr HistogramBytesWriteDataCount; NMonitoring::THistogramPtr HistogramBytesWriteDataBytes; + NMonitoring::THistogramPtr HistogramDurationQueueWait; public: + const NMonitoring::TDynamicCounters::TCounterPtr QueueWaitSize; + + void OnWritingTaskDequeue(const TDuration d){ + HistogramDurationQueueWait->Collect(d.MilliSeconds()); + } + TWriteCounters(TCommonCountersOwner& owner) : TBase(owner, "activity", "writing") + , QueueWaitSize(TBase::GetValue("Write/Queue/Size")) { VolumeWriteData = TBase::GetDeriviative("Write/Incoming/Bytes"); HistogramBytesWriteDataCount = TBase::GetHistogram("Write/Incoming/ByBytes/Count", NMonitoring::ExponentialHistogram(18, 2, 100)); HistogramBytesWriteDataBytes = TBase::GetHistogram("Write/Incoming/ByBytes/Bytes", NMonitoring::ExponentialHistogram(18, 2, 100)); + HistogramDurationQueueWait = TBase::GetHistogram("Write/Queue/Waiting/DurationMs", NMonitoring::ExponentialHistogram(18, 2, 100)); } void OnIncomingData(const ui64 dataSize) const { @@ -231,4 +241,4 @@ class TCSCounters: public TCommonCountersOwner { TCSCounters(); }; -} +} // namespace NKikimr::NColumnShard diff --git a/ydb/core/tx/columnshard/tablet/write_queue.cpp b/ydb/core/tx/columnshard/tablet/write_queue.cpp new file mode 100644 index 000000000000..1e745018e259 --- /dev/null +++ b/ydb/core/tx/columnshard/tablet/write_queue.cpp @@ -0,0 +1,60 @@ +#include "write_queue.h" + +#include +#include +#include + +namespace NKikimr::NColumnShard { + +bool TWriteTask::Execute(TColumnShard* owner, const TActorContext& ctx) { + auto overloadStatus = owner->CheckOverloadedWait(PathId); + if (overloadStatus != TColumnShard::EOverloadStatus::None) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "wait_overload")("status", overloadStatus); + return false; + } + + owner->Counters.GetCSCounters().WritingCounters->OnWritingTaskDequeue(TMonotonic::Now() - Created); + owner->OperationsManager->RegisterLock(LockId, owner->Generation()); + auto writeOperation = owner->OperationsManager->RegisterOperation( + PathId, LockId, Cookie, GranuleShardingVersionId, ModificationType, AppDataVerified().FeatureFlags.GetEnableWritePortionsOnInsert()); + + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("writing_size", ArrowData->GetSize())("operation_id", writeOperation->GetIdentifier())( + "in_flight", owner->Counters.GetWritesMonitor()->GetWritesInFlight())( + "size_in_flight", owner->Counters.GetWritesMonitor()->GetWritesSizeInFlight()); + + AFL_VERIFY(writeOperation); + writeOperation->SetBehaviour(Behaviour); + NOlap::TWritingContext wContext(owner->TabletID(), owner->SelfId(), Schema, owner->StoragesManager, + owner->Counters.GetIndexationCounters().SplitterCounters, owner->Counters.GetCSCounters().WritingCounters, NOlap::TSnapshot::Max(), + writeOperation->GetActivityChecker()); + ArrowData->SetSeparationPoints(owner->GetIndexAs().GetGranulePtrVerified(PathId)->GetBucketPositions()); + writeOperation->Start(*owner, ArrowData, SourceId, wContext); + return true; +} + +bool TWriteTasksQueue::Drain(const bool onWakeup, const TActorContext& ctx) { + if (onWakeup) { + WriteTasksOverloadCheckerScheduled = false; + } + while (WriteTasks.size() && WriteTasks.front().Execute(Owner, ctx)) { + WriteTasks.pop_front(); + } + if (WriteTasks.size() && !WriteTasksOverloadCheckerScheduled) { + Owner->Schedule(TDuration::MilliSeconds(300), new NActors::TEvents::TEvWakeup(1)); + WriteTasksOverloadCheckerScheduled = true; + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "queue_on_write")("size", WriteTasks.size()); + } + Owner->Counters.GetCSCounters().WritingCounters->QueueWaitSize->Add((i64)WriteTasks.size() - PredWriteTasksSize); + PredWriteTasksSize = (i64)WriteTasks.size(); + return !WriteTasks.size(); +} + +void TWriteTasksQueue::Enqueue(TWriteTask&& task) { + WriteTasks.emplace_back(std::move(task)); +} + +TWriteTasksQueue::~TWriteTasksQueue() { + Owner->Counters.GetCSCounters().WritingCounters->QueueWaitSize->Sub(PredWriteTasksSize); +} + +} // namespace NKikimr::NColumnShard diff --git a/ydb/core/tx/columnshard/tablet/write_queue.h b/ydb/core/tx/columnshard/tablet/write_queue.h new file mode 100644 index 000000000000..191934b449a8 --- /dev/null +++ b/ydb/core/tx/columnshard/tablet/write_queue.h @@ -0,0 +1,62 @@ +#pragma once +#include +#include +#include + +namespace NKikimr::NColumnShard { +class TColumnShard; +class TArrowData; +class TWriteTask: TMoveOnly { +private: + std::shared_ptr ArrowData; + NOlap::ISnapshotSchema::TPtr Schema; + const NActors::TActorId SourceId; + const std::optional GranuleShardingVersionId; + const ui64 PathId; + const ui64 Cookie; + const ui64 LockId; + const NEvWrite::EModificationType ModificationType; + const EOperationBehaviour Behaviour; + const TMonotonic Created = TMonotonic::Now(); + +public: + TWriteTask(const std::shared_ptr& arrowData, const NOlap::ISnapshotSchema::TPtr& schema, const NActors::TActorId sourceId, + const std::optional& granuleShardingVersionId, const ui64 pathId, const ui64 cookie, const ui64 lockId, + const NEvWrite::EModificationType modificationType, const EOperationBehaviour behaviour) + : ArrowData(arrowData) + , Schema(schema) + , SourceId(sourceId) + , GranuleShardingVersionId(granuleShardingVersionId) + , PathId(pathId) + , Cookie(cookie) + , LockId(lockId) + , ModificationType(modificationType) + , Behaviour(behaviour) { + } + + const TMonotonic& GetCreatedMonotonic() const { + return Created; + } + + bool Execute(TColumnShard* owner, const TActorContext& ctx); +}; + +class TWriteTasksQueue { +private: + bool WriteTasksOverloadCheckerScheduled = false; + std::deque WriteTasks; + i64 PredWriteTasksSize = 0; + TColumnShard* Owner; + +public: + TWriteTasksQueue(TColumnShard* owner) + : Owner(owner) { + } + + ~TWriteTasksQueue(); + + void Enqueue(TWriteTask&& task); + bool Drain(const bool onWakeup, const TActorContext& ctx); +}; + +} // namespace NKikimr::NColumnShard diff --git a/ydb/core/tx/columnshard/tablet/ya.make b/ydb/core/tx/columnshard/tablet/ya.make new file mode 100644 index 000000000000..71e125cb0ca9 --- /dev/null +++ b/ydb/core/tx/columnshard/tablet/ya.make @@ -0,0 +1,11 @@ +LIBRARY() + +SRCS( + write_queue.cpp +) + +PEERDIR( + ydb/core/tx/columnshard/hooks/abstract +) + +END() diff --git a/ydb/core/tx/columnshard/ya.make b/ydb/core/tx/columnshard/ya.make index 055037e5d508..28d2f6cbe035 100644 --- a/ydb/core/tx/columnshard/ya.make +++ b/ydb/core/tx/columnshard/ya.make @@ -30,7 +30,6 @@ GENERATE_ENUM_SERIALIZATION(columnshard.h) GENERATE_ENUM_SERIALIZATION(columnshard_impl.h) PEERDIR( - ydb/library/actors/core ydb/core/actorlib_impl ydb/core/base ydb/core/control @@ -39,38 +38,40 @@ PEERDIR( ydb/core/protos ydb/core/tablet ydb/core/tablet_flat - ydb/core/tx/time_cast - ydb/core/tx/columnshard/engines - ydb/core/tx/columnshard/engines/writer - ydb/core/tx/columnshard/engines/reader/abstract - ydb/core/tx/columnshard/counters - ydb/core/tx/columnshard/common - ydb/core/tx/columnshard/splitter - ydb/core/tx/columnshard/operations - ydb/core/tx/columnshard/transactions - ydb/core/tx/columnshard/transactions/operators - ydb/core/tx/columnshard/blobs_reader ydb/core/tx/columnshard/blobs_action + ydb/core/tx/columnshard/blobs_action/storages_manager + ydb/core/tx/columnshard/blobs_reader + ydb/core/tx/columnshard/common + ydb/core/tx/columnshard/counters + ydb/core/tx/columnshard/data_accessor + ydb/core/tx/columnshard/data_accessor/in_mem ydb/core/tx/columnshard/data_locks ydb/core/tx/columnshard/data_sharing - ydb/core/tx/columnshard/subscriber + ydb/core/tx/columnshard/engines + ydb/core/tx/columnshard/engines/reader/abstract + ydb/core/tx/columnshard/engines/writer ydb/core/tx/columnshard/export - ydb/core/tx/columnshard/tx_reader ydb/core/tx/columnshard/loading - ydb/core/tx/columnshard/data_accessor - ydb/core/tx/columnshard/resource_subscriber ydb/core/tx/columnshard/normalizer - ydb/core/tx/columnshard/blobs_action/storages_manager - ydb/core/tx/columnshard/data_accessor/in_mem - ydb/core/tx/tiering + ydb/core/tx/columnshard/operations + ydb/core/tx/columnshard/resource_subscriber + ydb/core/tx/columnshard/splitter + ydb/core/tx/columnshard/subscriber + ydb/core/tx/columnshard/tablet + ydb/core/tx/columnshard/transactions + ydb/core/tx/columnshard/transactions/operators + ydb/core/tx/columnshard/tx_reader ydb/core/tx/conveyor/usage + ydb/core/tx/long_tx_service/public ydb/core/tx/priorities/service + ydb/core/tx/tiering + ydb/core/tx/time_cast ydb/core/tx/tracing - ydb/core/tx/long_tx_service/public ydb/core/util - ydb/public/api/protos - ydb/library/yql/dq/actors/compute + ydb/library/actors/core ydb/library/chunks_limiter + ydb/library/yql/dq/actors/compute + ydb/public/api/protos ) IF (OS_WINDOWS)