diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 844c793c7112..54f0bf11e258 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1422,6 +1422,8 @@ message TColumnShardConfig { optional bool IndexationEnabled = 4 [default = true]; optional bool CompactionEnabled = 5 [default = true]; optional bool TTLEnabled = 6 [default = true]; + optional bool WritingEnabled = 7 [default = true]; + optional uint32 WritingBufferDurationMs = 8 [default = 0]; } message TSchemeShardConfig { diff --git a/ydb/core/tx/columnshard/blobs_action/abstract/read.cpp b/ydb/core/tx/columnshard/blobs_action/abstract/read.cpp index 1f0bc8559fb8..766dd21da79d 100644 --- a/ydb/core/tx/columnshard/blobs_action/abstract/read.cpp +++ b/ydb/core/tx/columnshard/blobs_action/abstract/read.cpp @@ -73,9 +73,9 @@ void IBlobsReadingAction::OnReadError(const TBlobRange& range, const TErrorStatu void IBlobsReadingAction::AddRange(const TBlobRange& range, const TString& result /*= Default()*/) { Y_ABORT_UNLESS(!Started); if (!result) { - Y_ABORT_UNLESS(RangesForRead[range.BlobId].emplace(range).second); + AFL_VERIFY(RangesForRead[range.BlobId].emplace(range).second)("range", range.ToString()); } else { - Y_ABORT_UNLESS(RangesForResult.emplace(range, result).second); + AFL_VERIFY(RangesForResult.emplace(range, result).second)("range", range.ToString()); } } diff --git a/ydb/core/tx/columnshard/blobs_action/counters/write.cpp b/ydb/core/tx/columnshard/blobs_action/counters/write.cpp index 5604d9d98b5a..25ef9ed6d1b2 100644 --- a/ydb/core/tx/columnshard/blobs_action/counters/write.cpp +++ b/ydb/core/tx/columnshard/blobs_action/counters/write.cpp @@ -13,6 +13,8 @@ TWriteCounters::TWriteCounters(const TConsumerCounters& owner) ReplyBytes = TBase::GetDeriviative("Replies/Bytes"); ReplyDurationBySize = TBase::GetHistogram("Replies/Duration/Bytes", NMonitoring::ExponentialHistogram(15, 2, 1)); ReplyDurationByCount = TBase::GetHistogram("Replies/Duration/Count", NMonitoring::ExponentialHistogram(15, 2, 1)); + WritesBySize = TBase::GetHistogram("Writes/Bytes", NMonitoring::ExponentialHistogram(25, 2, 1)); + VolumeByChunkSize = TBase::GetHistogram("Volume/Bytes", NMonitoring::ExponentialHistogram(25, 2, 1)); FailsCount = TBase::GetDeriviative("Fails/Count"); FailBytes = TBase::GetDeriviative("Fails/Bytes"); diff --git a/ydb/core/tx/columnshard/blobs_action/counters/write.h b/ydb/core/tx/columnshard/blobs_action/counters/write.h index 591937f0de2b..a9b7f0282817 100644 --- a/ydb/core/tx/columnshard/blobs_action/counters/write.h +++ b/ydb/core/tx/columnshard/blobs_action/counters/write.h @@ -16,6 +16,8 @@ class TWriteCounters: public NColumnShard::TCommonCountersOwner { NMonitoring::TDynamicCounters::TCounterPtr ReplyBytes; NMonitoring::THistogramPtr ReplyDurationByCount; NMonitoring::THistogramPtr ReplyDurationBySize; + NMonitoring::THistogramPtr WritesBySize; + NMonitoring::THistogramPtr VolumeByChunkSize; NMonitoring::TDynamicCounters::TCounterPtr FailsCount; NMonitoring::TDynamicCounters::TCounterPtr FailBytes; @@ -34,6 +36,8 @@ class TWriteCounters: public NColumnShard::TCommonCountersOwner { ReplyBytes->Add(bytes); ReplyDurationByCount->Collect((i64)d.MilliSeconds()); ReplyDurationBySize->Collect((i64)d.MilliSeconds(), (i64)bytes); + WritesBySize->Collect((i64)bytes); + VolumeByChunkSize->Collect((i64)bytes, (i64)bytes); } void OnFail(const ui64 bytes, const TDuration d) const { diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp index 26cedb6a87db..3b6529beed4c 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp @@ -14,9 +14,7 @@ bool TTxInsertTableCleanup::Execute(TTransactionContext& txc, const TActorContex auto storage = Self->StoragesManager->GetInsertOperator(); BlobsAction = storage->StartDeclareRemovingAction("TX_CLEANUP"); for (auto& [abortedWriteId, abortedData] : allAborted) { - Self->InsertTable->EraseAborted(dbTable, abortedData); - Y_ABORT_UNLESS(abortedData.GetBlobRange().IsFullBlob()); - BlobsAction->DeclareRemove(abortedData.GetBlobRange().GetBlobId()); + Self->InsertTable->EraseAborted(dbTable, abortedData, BlobsAction); } BlobsAction->OnExecuteTxAfterRemoving(*Self, blobManagerDb, true); return true; diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp index 16f5105e670f..6eca7b4878b0 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp @@ -1,26 +1,27 @@ #include "tx_write.h" namespace NKikimr::NColumnShard { -bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWriteBlobsResult::TPutBlobData& blobData, const TWriteId writeId, const TString& blob) { - const NKikimrTxColumnShard::TLogicalMetadata& meta = blobData.GetLogicalMeta(); - - const auto& blobRange = blobData.GetBlobRange(); +bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TWriteId writeId) { + NKikimrTxColumnShard::TLogicalMetadata meta; + meta.SetNumRows(batch->GetRowsCount()); + meta.SetRawBytes(batch->GetRawBytes()); + meta.SetDirtyWriteTimeSeconds(batch.GetStartInstant().Seconds()); + meta.SetSpecialKeysRawData(batch->GetSpecialKeysSafe().SerializeToString()); + + const auto& blobRange = batch.GetRange(); Y_ABORT_UNLESS(blobRange.GetBlobId().IsValid()); // First write wins TBlobGroupSelector dsGroupSelector(Self->Info()); NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector); - const auto& writeMeta(PutBlobResult->Get()->GetWriteMeta()); - - auto tableSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaVerified(PutBlobResult->Get()->GetSchemaVersion()); + const auto& writeMeta = batch.GetAggregation().GetWriteData()->GetWriteMeta(); + auto schemeVersion = batch.GetAggregation().GetWriteData()->GetData()->GetSchemaVersion(); + auto tableSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaVerified(schemeVersion); - NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), blobRange, meta, tableSchema->GetVersion(), blob); + NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), blobRange, meta, tableSchema->GetVersion(), batch->GetData()); bool ok = Self->InsertTable->Insert(dbTable, std::move(insertData)); if (ok) { - // Put new data into blob cache - Y_ABORT_UNLESS(blobRange.IsFullBlob()); - Self->UpdateInsertTableCounters(); return true; } @@ -31,62 +32,89 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWrit bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "execute"); ACFL_DEBUG("event", "start_execute"); - const auto& writeMeta(PutBlobResult->Get()->GetWriteMeta()); - Y_ABORT_UNLESS(Self->TablesManager.IsReadyForWrite(writeMeta.GetTableId())); - - txc.DB.NoMoreReadsForTx(); - TWriteOperation::TPtr operation; - if (writeMeta.HasLongTxId()) { - AFL_VERIFY(PutBlobResult->Get()->GetBlobData().size() == 1)("count", PutBlobResult->Get()->GetBlobData().size()); - } else { - operation = Self->OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId()); - Y_ABORT_UNLESS(operation); - Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started); - } + const NOlap::TWritingBuffer& buffer = PutBlobResult->Get()->MutableWritesBuffer(); + for (auto&& aggr : buffer.GetAggregations()) { + const auto& writeMeta = aggr->GetWriteData()->GetWriteMeta(); + Y_ABORT_UNLESS(Self->TablesManager.IsReadyForWrite(writeMeta.GetTableId())); + txc.DB.NoMoreReadsForTx(); + TWriteOperation::TPtr operation; + if (writeMeta.HasLongTxId()) { + AFL_VERIFY(aggr->GetSplittedBlobs().size() == 1)("count", aggr->GetSplittedBlobs().size()); + } else { + operation = Self->OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId()); + Y_ABORT_UNLESS(operation); + Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started); + } - TVector writeIds; - for (auto blobData : PutBlobResult->Get()->GetBlobData()) { auto writeId = TWriteId(writeMeta.GetWriteId()); - if (operation) { - writeId = Self->BuildNextWriteId(txc); - } else { + if (!operation) { NIceDb::TNiceDb db(txc.DB); writeId = Self->GetLongTxWrite(db, writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId()); + aggr->AddWriteId(writeId); } - if (!InsertOneBlob(txc, blobData, writeId, PutBlobResult->Get()->GetBlobVerified(blobData.GetBlobRange()))) { - LOG_S_DEBUG(TxPrefix() << "duplicate writeId " << (ui64)writeId << TxSuffix()); - Self->IncCounter(COUNTER_WRITE_DUPLICATE); + for (auto&& i : aggr->GetSplittedBlobs()) { + if (operation) { + writeId = Self->BuildNextWriteId(txc); + aggr->AddWriteId(writeId); + } + + if (!InsertOneBlob(txc, i, writeId)) { + LOG_S_DEBUG(TxPrefix() << "duplicate writeId " << (ui64)writeId << TxSuffix()); + Self->IncCounter(COUNTER_WRITE_DUPLICATE); + } } - writeIds.push_back(writeId); } TBlobManagerDb blobManagerDb(txc.DB); - AFL_VERIFY(PutBlobResult->Get()->GetActions().size() == 1); - AFL_VERIFY(PutBlobResult->Get()->GetActions().front()->GetBlobsCount() == PutBlobResult->Get()->GetBlobData().size()); - for (auto&& i : PutBlobResult->Get()->GetActions()) { + AFL_VERIFY(buffer.GetAddActions().size() == 1); + for (auto&& i : buffer.GetAddActions()) { i->OnExecuteTxAfterWrite(*Self, blobManagerDb, true); } - - if (operation) { - operation->OnWriteFinish(txc, writeIds); - auto txInfo = Self->ProgressTxController->RegisterTxWithDeadline(operation->GetTxId(), NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, "", writeMeta.GetSource(), 0, txc); - Y_UNUSED(txInfo); - NEvents::TDataEvents::TCoordinatorInfo tInfo = Self->ProgressTxController->GetCoordinatorInfo(operation->GetTxId()); - Result = NEvents::TDataEvents::TEvWriteResult::BuildPrepared(Self->TabletID(), operation->GetTxId(), tInfo); - } else { - Y_ABORT_UNLESS(writeIds.size() == 1); - Result = std::make_unique(Self->TabletID(), writeMeta, (ui64)writeIds.front(), NKikimrTxColumnShard::EResultStatus::SUCCESS); + for (auto&& i : buffer.GetRemoveActions()) { + i->OnExecuteTxAfterRemoving(*Self, blobManagerDb, true); + } + for (auto&& aggr : buffer.GetAggregations()) { + const auto& writeMeta = aggr->GetWriteData()->GetWriteMeta(); + std::unique_ptr result; + TWriteOperation::TPtr operation; + if (!writeMeta.HasLongTxId()) { + operation = Self->OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId()); + Y_ABORT_UNLESS(operation); + Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started); + } + if (operation) { + operation->OnWriteFinish(txc, aggr->GetWriteIds()); + auto txInfo = Self->ProgressTxController->RegisterTxWithDeadline(operation->GetTxId(), NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, "", writeMeta.GetSource(), 0, txc); + Y_UNUSED(txInfo); + NEvents::TDataEvents::TCoordinatorInfo tInfo = Self->ProgressTxController->GetCoordinatorInfo(operation->GetTxId()); + Results.emplace_back(NEvents::TDataEvents::TEvWriteResult::BuildPrepared(Self->TabletID(), operation->GetTxId(), tInfo)); + } else { + Y_ABORT_UNLESS(aggr->GetWriteIds().size() == 1); + Results.emplace_back(std::make_unique(Self->TabletID(), writeMeta, (ui64)aggr->GetWriteIds().front(), NKikimrTxColumnShard::EResultStatus::SUCCESS)); + } } return true; } void TTxWrite::Complete(const TActorContext& ctx) { NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "complete"); - AFL_VERIFY(Result); - Self->CSCounters.OnWriteTxComplete((TMonotonic::Now() - PutBlobResult->Get()->GetWriteMeta().GetWriteStartInstant()).MilliSeconds()); - Self->CSCounters.OnSuccessWriteResponse(); - ctx.Send(PutBlobResult->Get()->GetWriteMeta().GetSource(), Result.release()); + const auto now = TMonotonic::Now(); + const NOlap::TWritingBuffer& buffer = PutBlobResult->Get()->MutableWritesBuffer(); + for (auto&& i : buffer.GetAddActions()) { + i->OnCompleteTxAfterWrite(*Self); + } + for (auto&& i : buffer.GetRemoveActions()) { + i->OnCompleteTxAfterRemoving(*Self); + } + AFL_VERIFY(buffer.GetAggregations().size() == Results.size()); + for (ui32 i = 0; i < buffer.GetAggregations().size(); ++i) { + const auto& writeMeta = buffer.GetAggregations()[i]->GetWriteData()->GetWriteMeta(); + ctx.Send(writeMeta.GetSource(), Results[i].release()); + Self->CSCounters.OnWriteTxComplete((now - writeMeta.GetWriteStartInstant()).MilliSeconds()); + Self->CSCounters.OnSuccessWriteResponse(); + } + } } diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h index c0b6688c3064..6086542940f6 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h @@ -1,5 +1,6 @@ #pragma once #include +#include namespace NKikimr::NColumnShard { @@ -15,12 +16,12 @@ class TTxWrite : public TTransactionBase { void Complete(const TActorContext& ctx) override; TTxType GetTxType() const override { return TXTYPE_WRITE; } - bool InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWriteBlobsResult::TPutBlobData& blobData, const TWriteId writeId, const TString& blob); + bool InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TWriteId writeId); private: TEvPrivate::TEvWriteBlobsResult::TPtr PutBlobResult; const ui32 TabletTxNo; - std::unique_ptr Result; + std::vector> Results; TStringBuilder TxPrefix() const { return TStringBuilder() << "TxWrite[" << ToString(TabletTxNo) << "] "; diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp index 3d5746d91fdc..e163888990a0 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp @@ -64,7 +64,6 @@ void TTxWriteIndex::Complete(const TActorContext& ctx) { Self->EnqueueBackgroundActivities(false, TriggerActivity); } - Self->UpdateResourceMetrics(ctx, Ev->Get()->PutResult->GetResourceUsage()); changes->MutableBlobsAction().OnCompleteTxAfterAction(*Self); NYDBTest::TControllers::GetColumnShardController()->OnWriteIndexComplete(Self->TabletID(), changes->TypeString()); } diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp index ea375fae52fd..b4459dca0c7e 100644 --- a/ydb/core/tx/columnshard/columnshard.cpp +++ b/ydb/core/tx/columnshard/columnshard.cpp @@ -2,6 +2,7 @@ #include "blobs_reader/actor.h" #include "hooks/abstract/abstract.h" #include "resource_subscriber/actor.h" +#include "engines/writer/buffer/actor.h" namespace NKikimr { @@ -15,6 +16,8 @@ namespace NKikimr::NColumnShard { void TColumnShard::CleanupActors(const TActorContext& ctx) { ctx.Send(ResourceSubscribeActor, new TEvents::TEvPoisonPill); + ctx.Send(BufferizationWriteActorId, new TEvents::TEvPoisonPill); + StoragesManager->Stop(); if (Tiers) { Tiers->Stop(); @@ -69,6 +72,7 @@ void TColumnShard::OnActivateExecutor(const TActorContext& ctx) { CompactionLimits.RegisterControls(icb); Settings.RegisterControls(icb); ResourceSubscribeActor = ctx.Register(new NOlap::NResourceBroker::NSubscribe::TActor(TabletID(), SelfId())); + BufferizationWriteActorId = ctx.Register(new NColumnShard::NWriting::TActor(TabletID(), SelfId())); Execute(CreateTxInitSchema(), ctx); } diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 73b89494a7ad..101d3a296f96 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -56,55 +56,63 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo auto& putResult = ev->Get()->GetPutResult(); OnYellowChannels(putResult); - const auto& writeMeta = ev->Get()->GetWriteMeta(); + NOlap::TWritingBuffer& wBuffer = ev->Get()->MutableWritesBuffer(); + auto& baseAggregations = wBuffer.GetAggregations(); - if (!TablesManager.IsReadyForWrite(writeMeta.GetTableId())) { - ACFL_ERROR("event", "absent_pathId")("path_id", writeMeta.GetTableId())("has_index", TablesManager.HasPrimaryIndex()); - IncCounter(COUNTER_WRITE_FAIL); - - auto result = std::make_unique(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR); - ctx.Send(writeMeta.GetSource(), result.release()); - CSCounters.OnFailedWriteResponse(); - return; - } + auto wg = WritesMonitor.FinishWrite(wBuffer.GetSumSize(), wBuffer.GetAggregations().size()); - auto wg = WritesMonitor.FinishWrite(putResult.GetResourceUsage().SourceMemorySize); + for (auto&& aggr : baseAggregations) { + const auto& writeMeta = aggr->GetWriteData()->GetWriteMeta(); - if (putResult.GetPutStatus() != NKikimrProto::OK) { - CSCounters.OnWritePutBlobsFail((TMonotonic::Now() - writeMeta.GetWriteStartInstant()).MilliSeconds()); - IncCounter(COUNTER_WRITE_FAIL); + if (!TablesManager.IsReadyForWrite(writeMeta.GetTableId())) { + ACFL_ERROR("event", "absent_pathId")("path_id", writeMeta.GetTableId())("has_index", TablesManager.HasPrimaryIndex()); + IncCounter(COUNTER_WRITE_FAIL); - auto errCode = NKikimrTxColumnShard::EResultStatus::STORAGE_ERROR; - if (putResult.GetPutStatus() == NKikimrProto::TIMEOUT || putResult.GetPutStatus() == NKikimrProto::DEADLINE) { - errCode = NKikimrTxColumnShard::EResultStatus::TIMEOUT; - } else if (putResult.GetPutStatus() == NKikimrProto::TRYLATER || putResult.GetPutStatus() == NKikimrProto::OUT_OF_SPACE) { - errCode = NKikimrTxColumnShard::EResultStatus::OVERLOADED; - } else if (putResult.GetPutStatus() == NKikimrProto::CORRUPTED) { - errCode = NKikimrTxColumnShard::EResultStatus::ERROR; + auto result = std::make_unique(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR); + ctx.Send(writeMeta.GetSource(), result.release()); + CSCounters.OnFailedWriteResponse(); + wBuffer.RemoveData(aggr, StoragesManager->GetInsertOperator()); + continue; } - if (writeMeta.HasLongTxId()) { - auto result = std::make_unique(TabletID(), writeMeta, errCode); - ctx.Send(writeMeta.GetSource(), result.release()); + if (putResult.GetPutStatus() != NKikimrProto::OK) { + CSCounters.OnWritePutBlobsFail((TMonotonic::Now() - writeMeta.GetWriteStartInstant()).MilliSeconds()); + IncCounter(COUNTER_WRITE_FAIL); + + auto errCode = NKikimrTxColumnShard::EResultStatus::STORAGE_ERROR; + if (putResult.GetPutStatus() == NKikimrProto::TIMEOUT || putResult.GetPutStatus() == NKikimrProto::DEADLINE) { + errCode = NKikimrTxColumnShard::EResultStatus::TIMEOUT; + } else if (putResult.GetPutStatus() == NKikimrProto::TRYLATER || putResult.GetPutStatus() == NKikimrProto::OUT_OF_SPACE) { + errCode = NKikimrTxColumnShard::EResultStatus::OVERLOADED; + } else if (putResult.GetPutStatus() == NKikimrProto::CORRUPTED) { + errCode = NKikimrTxColumnShard::EResultStatus::ERROR; + } + + if (writeMeta.HasLongTxId()) { + auto result = std::make_unique(TabletID(), writeMeta, errCode); + ctx.Send(writeMeta.GetSource(), result.release()); + } else { + auto operation = OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId()); + Y_ABORT_UNLESS(operation); + auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), operation->GetTxId(), NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, "put data fails"); + ctx.Send(writeMeta.GetSource(), result.release()); + } + CSCounters.OnFailedWriteResponse(); + wBuffer.RemoveData(aggr, StoragesManager->GetInsertOperator()); } else { - auto operation = OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId()); - Y_ABORT_UNLESS(operation); - auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), operation->GetTxId(), NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, "put data fails"); - ctx.Send(writeMeta.GetSource(), result.release()); + const TMonotonic now = TMonotonic::Now(); + CSCounters.OnWritePutBlobsSuccess((now - writeMeta.GetWriteStartInstant()).MilliSeconds()); + CSCounters.OnWriteMiddle1PutBlobsSuccess((now - writeMeta.GetWriteMiddle1StartInstant()).MilliSeconds()); + CSCounters.OnWriteMiddle2PutBlobsSuccess((now - writeMeta.GetWriteMiddle2StartInstant()).MilliSeconds()); + CSCounters.OnWriteMiddle3PutBlobsSuccess((now - writeMeta.GetWriteMiddle3StartInstant()).MilliSeconds()); + CSCounters.OnWriteMiddle4PutBlobsSuccess((now - writeMeta.GetWriteMiddle4StartInstant()).MilliSeconds()); + CSCounters.OnWriteMiddle5PutBlobsSuccess((now - writeMeta.GetWriteMiddle5StartInstant()).MilliSeconds()); + LOG_S_DEBUG("Write (record) into pathId " << writeMeta.GetTableId() + << (writeMeta.GetWriteId() ? (" writeId " + ToString(writeMeta.GetWriteId())).c_str() : "") << " at tablet " << TabletID()); + } - CSCounters.OnFailedWriteResponse(); - } else { - CSCounters.OnWritePutBlobsSuccess((TMonotonic::Now() - writeMeta.GetWriteStartInstant()).MilliSeconds()); - CSCounters.OnWriteMiddle1PutBlobsSuccess((TMonotonic::Now() - writeMeta.GetWriteMiddle1StartInstant()).MilliSeconds()); - CSCounters.OnWriteMiddle2PutBlobsSuccess((TMonotonic::Now() - writeMeta.GetWriteMiddle2StartInstant()).MilliSeconds()); - CSCounters.OnWriteMiddle3PutBlobsSuccess((TMonotonic::Now() - writeMeta.GetWriteMiddle3StartInstant()).MilliSeconds()); - CSCounters.OnWriteMiddle4PutBlobsSuccess((TMonotonic::Now() - writeMeta.GetWriteMiddle4StartInstant()).MilliSeconds()); - CSCounters.OnWriteMiddle5PutBlobsSuccess((TMonotonic::Now() - writeMeta.GetWriteMiddle5StartInstant()).MilliSeconds()); - LOG_S_DEBUG("Write (record) into pathId " << writeMeta.GetTableId() - << (writeMeta.GetWriteId() ? (" writeId " + ToString(writeMeta.GetWriteId())).c_str() : "") << " at tablet " << TabletID()); - - Execute(new TTxWrite(this, ev), ctx); } + Execute(new TTxWrite(this, ev), ctx); } void TColumnShard::Handle(TEvPrivate::TEvWriteDraft::TPtr& ev, const TActorContext& ctx) { @@ -127,15 +135,23 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex writeMeta.SetLongTxId(NLongTxService::TLongTxId::FromProto(record.GetLongTxId())); writeMeta.SetWritePartId(record.GetWritePartId()); - if (!TablesManager.IsReadyForWrite(tableId)) { - LOG_S_NOTICE("Write (fail) into pathId:" << writeMeta.GetTableId() << (TablesManager.HasPrimaryIndex()? "": " no index") - << " at tablet " << TabletID()); - IncCounter(COUNTER_WRITE_FAIL); + const auto returnFail = [&](const NColumnShard::ECumulativeCounters signalIndex) { + IncCounter(signalIndex); - auto result = std::make_unique(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR); - ctx.Send(source, result.release()); + ctx.Send(source, std::make_unique(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR)); CSCounters.OnFailedWriteResponse(); return; + }; + + if (!AppDataVerified().ColumnShardConfig.GetWritingEnabled()) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_writing")("reason", "disabled"); + return returnFail(COUNTER_WRITE_FAIL); + } + + if (!TablesManager.IsReadyForWrite(tableId)) { + LOG_S_NOTICE("Write (fail) into pathId:" << writeMeta.GetTableId() << (TablesManager.HasPrimaryIndex()? "": " no index") + << " at tablet " << TabletID()); + return returnFail(COUNTER_WRITE_FAIL); } const auto& snapshotSchema = TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema(); @@ -143,21 +159,17 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex if (!arrowData->ParseFromProto(record)) { LOG_S_ERROR("Write (fail) " << record.GetData().size() << " bytes into pathId " << writeMeta.GetTableId() << " at tablet " << TabletID()); - IncCounter(COUNTER_WRITE_FAIL); - auto result = std::make_unique(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR); - ctx.Send(source, result.release()); - CSCounters.OnFailedWriteResponse(); - return; + return returnFail(COUNTER_WRITE_FAIL); } - NEvWrite::TWriteData writeData(writeMeta, arrowData); + NEvWrite::TWriteData writeData(writeMeta, arrowData, snapshotSchema->GetIndexInfo().GetReplaceKey(), StoragesManager->GetInsertOperator()->StartWritingAction("WRITING")); auto overloadStatus = CheckOverloaded(tableId); if (overloadStatus != EOverloadStatus::None) { std::unique_ptr result = std::make_unique(TabletID(), writeData.GetWriteMeta(), NKikimrTxColumnShard::EResultStatus::OVERLOADED); OverloadWriteFail(overloadStatus, writeData, std::move(result), ctx); CSCounters.OnFailedWriteResponse(); } else { - if (ui64 writeId = (ui64) HasLongTxWrite(writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId())) { + if (ui64 writeId = (ui64)HasLongTxWrite(writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId())) { LOG_S_DEBUG("Write (duplicate) into pathId " << writeMeta.GetTableId() << " longTx " << writeMeta.GetLongTxIdUnsafe().ToString() << " at tablet " << TabletID()); @@ -178,8 +190,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex << WritesMonitor.DebugString() << " at tablet " << TabletID()); writeData.MutableWriteMeta().SetWriteMiddle1StartInstant(TMonotonic::Now()); - std::shared_ptr task = std::make_shared(TabletID(), SelfId(), - StoragesManager->GetInsertOperator()->StartWritingAction("WRITING"), writeData, snapshotSchema->GetIndexInfo().GetReplaceKey()); + std::shared_ptr task = std::make_shared(TabletID(), SelfId(), BufferizationWriteActorId, std::move(writeData)); NConveyor::TInsertServiceOperator::AsyncTaskToExecute(task); } } @@ -240,7 +251,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor auto overloadStatus = CheckOverloaded(tableId); if (overloadStatus != EOverloadStatus::None) { - NEvWrite::TWriteData writeData(NEvWrite::TWriteMeta(0, tableId, source), arrowData); + NEvWrite::TWriteData writeData(NEvWrite::TWriteMeta(0, tableId, source), arrowData, nullptr, nullptr); std::unique_ptr result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), txId, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, "overload data error"); OverloadWriteFail(overloadStatus, writeData, std::move(result), ctx); return; diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 7e27ea998b73..7a39a30cc7b7 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -680,7 +680,6 @@ class TChangesTask: public NConveyor::ITask { virtual bool DoExecute() override { NActors::TLogContextGuard g(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletId)("parent_id", ParentActorId)); { - auto guard = TxEvent->PutResult->StartCpuGuard(); NOlap::TConstructionContext context(TxEvent->IndexInfo, Counters); Y_ABORT_UNLESS(TxEvent->IndexChanges->ConstructBlobs(context).Ok()); if (!TxEvent->IndexChanges->GetWritePortionsCount()) { diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 73990e2e31e2..2f728fbc01a2 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -345,10 +345,10 @@ class TColumnShard return TGuard(*this); } - TGuard FinishWrite(const ui64 dataSize) { + TGuard FinishWrite(const ui64 dataSize, const ui32 writesCount = 1) { Y_ABORT_UNLESS(WritesInFlight > 0); Y_ABORT_UNLESS(WritesSizeInFlight >= dataSize); - --WritesInFlight; + WritesInFlight -= writesCount; WritesSizeInFlight -= dataSize; return TGuard(*this); } @@ -396,6 +396,7 @@ class TColumnShard TInstant LastStatsReport; TActorId ResourceSubscribeActor; + TActorId BufferizationWriteActorId; TActorId StatsReportPipe; std::shared_ptr StoragesManager; diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h index 9161d8ca1c20..e258f724fcf3 100644 --- a/ydb/core/tx/columnshard/columnshard_private_events.h +++ b/ydb/core/tx/columnshard/columnshard_private_events.h @@ -8,6 +8,7 @@ #include #include #include +#include #include #include @@ -32,6 +33,10 @@ struct TEvPrivate { EvTieringModified, EvStartResourceUsageTask, EvNormalizerResult, + + EvWritingAddDataToBuffer, + EvWritingFlushBuffer, + EvEnd }; @@ -105,97 +110,6 @@ struct TEvPrivate { } }; - struct TEvS3Settings : public TEventLocal { - NKikimrSchemeOp::TS3Settings Settings; - - explicit TEvS3Settings(const NKikimrSchemeOp::TS3Settings& settings) - : Settings(settings) - {} - }; - - struct TEvExport : public TEventLocal { - using TBlobDataMap = THashMap; - - NKikimrProto::EReplyStatus Status = NKikimrProto::UNKNOWN; - ui64 ExportNo = 0; - TString TierName; - TActorId DstActor; - TBlobDataMap Blobs; // src: blobId -> data map; dst: exported blobIds set - THashMap SrcToDstBlobs; - TMap ErrorStrings; - - explicit TEvExport(ui64 exportNo, const TString& tierName, ui64 pathId, - const THashSet& blobIds) - : ExportNo(exportNo) - , TierName(tierName) - { - Y_ABORT_UNLESS(ExportNo); - Y_ABORT_UNLESS(!TierName.empty()); - Y_ABORT_UNLESS(pathId); - Y_ABORT_UNLESS(!blobIds.empty()); - - for (auto& blobId : blobIds) { - Blobs.emplace(blobId, TString()); - SrcToDstBlobs[blobId] = blobId.MakeS3BlobId(pathId); - } - } - - explicit TEvExport(ui64 exportNo, const TString& tierName, const THashSet& evictSet) - : ExportNo(exportNo) - , TierName(tierName) - { - Y_ABORT_UNLESS(ExportNo); - Y_ABORT_UNLESS(!TierName.empty()); - Y_ABORT_UNLESS(!evictSet.empty()); - - for (auto& evict : evictSet) { - Y_ABORT_UNLESS(evict.IsEvicting()); - Y_ABORT_UNLESS(evict.ExternBlob.IsS3Blob()); - - Blobs.emplace(evict.Blob, TString()); - SrcToDstBlobs[evict.Blob] = evict.ExternBlob; - } - } - - void AddResult(const TUnifiedBlobId& blobId, const TString& key, const bool hasError, const TString& errStr) { - if (hasError) { - Status = NKikimrProto::ERROR; - Y_ABORT_UNLESS(ErrorStrings.emplace(key, errStr).second, "%s", key.data()); - Blobs.erase(blobId); - } else if (!ErrorStrings.contains(key)) { // (OK + !OK) == !OK - Y_ABORT_UNLESS(Blobs.contains(blobId)); - if (Status == NKikimrProto::UNKNOWN) { - Status = NKikimrProto::OK; - } - } - } - - bool Finished() const { - return (Blobs.size() + ErrorStrings.size()) == SrcToDstBlobs.size(); - } - - TString SerializeErrorsToString() const { - TStringBuilder sb; - for (auto&& i : ErrorStrings) { - sb << i.first << "=" << i.second << ";"; - } - return sb; - } - }; - - struct TEvForget: public TEventLocal { - NKikimrProto::EReplyStatus Status = NKikimrProto::UNKNOWN; - std::vector Evicted; - TString ErrorStr; - }; - - struct TEvGetExported : public TEventLocal { - TActorId DstActor; // It's a BlobCache actor. S3 actor sends TEvReadBlobRangesResult to it as result - ui64 DstCookie; - NOlap::TEvictedBlob Evicted; - std::vector BlobRanges; - }; - struct TEvScanStats : public TEventLocal { TEvScanStats(ui64 rows, ui64 bytes) : Rows(rows), Bytes(bytes) {} ui64 Rows; @@ -220,89 +134,28 @@ struct TEvPrivate { }; class TEvWriteBlobsResult : public TEventLocal { + private: + NColumnShard::TBlobPutResult::TPtr PutResult; + NOlap::TWritingBuffer WritesBuffer; public: - class TPutBlobData { - YDB_READONLY_DEF(TBlobRange, BlobRange); - YDB_READONLY_DEF(NKikimrTxColumnShard::TLogicalMetadata, LogicalMeta); - YDB_ACCESSOR(ui64, RowsCount, 0); - YDB_ACCESSOR(ui64, RawBytes, 0); - public: - TPutBlobData() = default; - - TPutBlobData(const TBlobRange& blobRange, const NArrow::TFirstLastSpecialKeys& specialKeys, ui64 rowsCount, ui64 rawBytes, const TInstant dirtyTime) - : BlobRange(blobRange) - , RowsCount(rowsCount) - , RawBytes(rawBytes) - { - LogicalMeta.SetNumRows(rowsCount); - LogicalMeta.SetRawBytes(rawBytes); - LogicalMeta.SetDirtyWriteTimeSeconds(dirtyTime.Seconds()); - LogicalMeta.SetSpecialKeysRawData(specialKeys.SerializeToString()); - } - }; - - TString GetBlobVerified(const TBlobRange& bRange) const { - for (auto&& i : Actions) { - for (auto&& b : i->GetBlobsForWrite()) { - if (bRange.GetBlobId() == b.first) { - AFL_VERIFY(bRange.Size + bRange.Offset <= b.second.size()); - if (bRange.Size == b.second.size()) { - return b.second; - } else { - return b.second.substr(bRange.Offset, bRange.Size); - } - } - } - } - AFL_VERIFY(false); - return ""; - } - - TEvWriteBlobsResult(const NColumnShard::TBlobPutResult::TPtr& putResult, const NEvWrite::TWriteMeta& writeMeta) + TEvWriteBlobsResult(const NColumnShard::TBlobPutResult::TPtr& putResult, NOlap::TWritingBuffer&& writesBuffer) : PutResult(putResult) - , WriteMeta(writeMeta) + , WritesBuffer(std::move(writesBuffer)) { Y_ABORT_UNLESS(PutResult); } - TEvWriteBlobsResult(const NColumnShard::TBlobPutResult::TPtr& putResult, TVector&& blobData, const std::vector>& actions, const NEvWrite::TWriteMeta& writeMeta, const ui64 schemaVersion) - : TEvWriteBlobsResult(putResult, writeMeta) - { - Actions = actions; - BlobData = std::move(blobData); - SchemaVersion = schemaVersion; - } - - const std::vector>& GetActions() const { - return Actions; - } - - const TVector& GetBlobData() const { - return BlobData; - } - const NColumnShard::TBlobPutResult& GetPutResult() const { return *PutResult; } - const NColumnShard::TBlobPutResult::TPtr GetPutResultPtr() { - return PutResult; + const NOlap::TWritingBuffer& GetWritesBuffer() const { + return WritesBuffer; } - const NEvWrite::TWriteMeta& GetWriteMeta() const { - return WriteMeta; + NOlap::TWritingBuffer& MutableWritesBuffer() { + return WritesBuffer; } - - ui64 GetSchemaVersion() const { - return SchemaVersion; - } - - private: - NColumnShard::TBlobPutResult::TPtr PutResult; - TVector BlobData; - std::vector> Actions; - NEvWrite::TWriteMeta WriteMeta; - ui64 SchemaVersion = 0; }; }; diff --git a/ydb/core/tx/columnshard/columnshard_schema.cpp b/ydb/core/tx/columnshard/columnshard_schema.cpp index b1c173063be2..039faafa8794 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/columnshard_schema.cpp @@ -51,7 +51,18 @@ bool Schema::InsertTable_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsG if (metaStr) { Y_ABORT_UNLESS(meta.ParseFromString(metaStr)); } - TInsertedData data(planStep, writeTxId, pathId, dedupId, NOlap::TBlobRange(blobId, 0, blobId.BlobSize()), meta, schemaVersion, {}); + + std::optional rangeOffset; + if (rowset.HaveValue()) { + rangeOffset = rowset.GetValue(); + } + std::optional rangeSize; + if (rowset.HaveValue()) { + rangeSize = rowset.GetValue(); + } + + AFL_VERIFY(!!rangeOffset == !!rangeSize); + TInsertedData data(planStep, writeTxId, pathId, dedupId, NOlap::TBlobRange(blobId, rangeOffset.value_or(0), rangeSize.value_or(blobId.BlobSize())), meta, schemaVersion, {}); switch (recType) { case EInsertTableIds::Inserted: @@ -64,7 +75,6 @@ bool Schema::InsertTable_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsG insertTable.AddAborted(std::move(data), true); break; } - if (!rowset.Next()) { return false; } diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index b9116a0df772..1dc2ede7060f 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -206,14 +206,17 @@ struct Schema : NIceDb::Schema { struct WriteTxId : Column<3, NScheme::NTypeIds::Uint64> {}; struct PathId : Column<4, NScheme::NTypeIds::Uint64> {}; struct DedupId : Column<5, NScheme::NTypeIds::String> {}; - struct BlobId : Column<6, NScheme::NTypeIds::String> {}; + struct BlobId: Column<6, NScheme::NTypeIds::String> {}; struct Meta : Column<7, NScheme::NTypeIds::String> {}; struct IndexPlanStep : Column<8, NScheme::NTypeIds::Uint64> {}; struct IndexTxId : Column<9, NScheme::NTypeIds::Uint64> {}; struct SchemaVersion : Column<10, NScheme::NTypeIds::Uint64> {}; + struct BlobRangeOffset: Column<11, NScheme::NTypeIds::Uint64> {}; + struct BlobRangeSize: Column<12, NScheme::NTypeIds::Uint64> {}; + using TKey = TableKey; - using TColumns = TableColumns; + using TColumns = TableColumns; }; struct IndexGranules : NIceDb::Schema::Table { @@ -465,6 +468,8 @@ struct Schema : NIceDb::Schema { static void InsertTable_Upsert(NIceDb::TNiceDb& db, EInsertTableIds recType, const TInsertedData& data) { db.Table().Key((ui8)recType, data.PlanStep, data.WriteTxId, data.PathId, data.DedupId).Update( NIceDb::TUpdate(data.GetBlobRange().GetBlobId().ToStringLegacy()), + NIceDb::TUpdate(data.GetBlobRange().Offset), + NIceDb::TUpdate(data.GetBlobRange().Size), NIceDb::TUpdate(data.GetMeta().SerializeToProto().SerializeAsString()), NIceDb::TUpdate(data.GetSchemaVersion()) ); diff --git a/ydb/core/tx/columnshard/engines/changes/abstract/settings.h b/ydb/core/tx/columnshard/engines/changes/abstract/settings.h index e2d0e994d273..ec966965d52d 100644 --- a/ydb/core/tx/columnshard/engines/changes/abstract/settings.h +++ b/ydb/core/tx/columnshard/engines/changes/abstract/settings.h @@ -12,8 +12,8 @@ struct TCompactionLimits { static constexpr const ui64 DEFAULT_EVICTION_BYTES = 64 * 1024 * 1024; static constexpr const ui64 MAX_BLOBS_TO_DELETE = 10000; - static constexpr const ui64 OVERLOAD_INSERT_TABLE_SIZE_BY_PATH_ID = (ui64)2 << 30; - static constexpr const ui64 WARNING_INSERT_TABLE_SIZE_BY_PATH_ID = 0.3 * OVERLOAD_INSERT_TABLE_SIZE_BY_PATH_ID; + static constexpr const ui64 OVERLOAD_INSERT_TABLE_SIZE_BY_PATH_ID = (ui64)1 << 30; + static constexpr const ui64 WARNING_INSERT_TABLE_SIZE_BY_PATH_ID = 0.5 * OVERLOAD_INSERT_TABLE_SIZE_BY_PATH_ID; static constexpr const ui64 WARNING_INSERT_TABLE_COUNT_BY_PATH_ID = 100; static constexpr const i64 OVERLOAD_GRANULE_SIZE = 20 * MAX_BLOB_SIZE; diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp index 9247e37a6461..c68100010934 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp +++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp @@ -15,9 +15,9 @@ bool TInsertColumnEngineChanges::DoApplyChanges(TColumnEngineForLogs& self, TApp void TInsertColumnEngineChanges::DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context) { TBase::DoWriteIndex(self, context); + auto removing = BlobsAction.GetRemoving(IStoragesManager::DefaultStorageId); for (const auto& insertedData : DataToIndex) { - self.InsertTable->EraseCommitted(context.DBWrapper, insertedData); - Y_ABORT_UNLESS(insertedData.GetBlobRange().IsFullBlob()); + self.InsertTable->EraseCommitted(context.DBWrapper, insertedData, removing); } if (!DataToIndex.empty()) { self.UpdateInsertTableCounters(); @@ -27,13 +27,9 @@ void TInsertColumnEngineChanges::DoWriteIndex(NColumnShard::TColumnShard& self, void TInsertColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) { TBase::DoStart(self); Y_ABORT_UNLESS(DataToIndex.size()); - auto removing = BlobsAction.GetRemoving(IStoragesManager::DefaultStorageId); auto reading = BlobsAction.GetReading(IStoragesManager::DefaultStorageId); - for (size_t i = 0; i < DataToIndex.size(); ++i) { - const auto& insertedData = DataToIndex[i]; - Y_ABORT_UNLESS(insertedData.GetBlobRange().IsFullBlob()); + for (auto&& insertedData : DataToIndex) { reading->AddRange(insertedData.GetBlobRange(), insertedData.GetBlobData().value_or("")); - removing->DeclareRemove(insertedData.GetBlobRange().GetBlobId()); } self.BackgroundController.StartIndexing(*this); diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp index bdda3b38c6aa..24913b1ebbb6 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp @@ -58,13 +58,13 @@ bool TChangesWithAppend::DoApplyChanges(TColumnEngineForLogs& self, TApplyChange // Save new portions (their column records) { auto g = self.GranulesStorage->StartPackModification(); - + THashSet usedPortionIds; for (auto& [_, portionInfo] : PortionsToRemove) { Y_ABORT_UNLESS(!portionInfo.Empty()); Y_ABORT_UNLESS(portionInfo.HasRemoveSnapshot()); const TPortionInfo& oldInfo = self.GetGranuleVerified(portionInfo.GetPathId()).GetPortionVerified(portionInfo.GetPortion()); - + AFL_VERIFY(usedPortionIds.emplace(portionInfo.GetPortionId()).second)("portion_info", portionInfo.DebugString(true)); self.UpsertPortion(portionInfo, &oldInfo); for (auto& record : portionInfo.Records) { @@ -74,6 +74,7 @@ bool TChangesWithAppend::DoApplyChanges(TColumnEngineForLogs& self, TApplyChange for (auto& portionInfoWithBlobs : AppendedPortions) { auto& portionInfo = portionInfoWithBlobs.GetPortionInfo(); Y_ABORT_UNLESS(!portionInfo.Empty()); + AFL_VERIFY(usedPortionIds.emplace(portionInfo.GetPortionId()).second)("portion_info", portionInfo.DebugString(true)); self.UpsertPortion(portionInfo); for (auto& record : portionInfo.Records) { self.ColumnsTable->Write(context.DB, portionInfo, record); diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.h b/ydb/core/tx/columnshard/engines/db_wrapper.h index c44bc7130011..2767ec4f8848 100644 --- a/ydb/core/tx/columnshard/engines/db_wrapper.h +++ b/ydb/core/tx/columnshard/engines/db_wrapper.h @@ -51,8 +51,7 @@ class TDbWrapper : public IDbWrapper { void EraseCommitted(const TInsertedData& data) override; void EraseAborted(const TInsertedData& data) override; - bool Load(TInsertTableAccessor& insertTable, - const TInstant& loadTime) override; + bool Load(TInsertTableAccessor& insertTable, const TInstant& loadTime) override; void WriteColumn(ui32 index, const NOlap::TPortionInfo& portion, const TColumnRecord& row) override; void EraseColumn(ui32 index, const NOlap::TPortionInfo& portion, const TColumnRecord& row) override; diff --git a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp index 87eaa6adae93..19a5a6eae3ff 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp @@ -7,6 +7,7 @@ namespace NKikimr::NOlap { bool TInsertTable::Insert(IDbWrapper& dbTable, TInsertedData&& data) { if (auto* dataPtr = Summary.AddInserted(std::move(data))) { + AddBlobLink(dataPtr->GetBlobRange().BlobId); dbTable.Insert(*dataPtr); return true; } else { @@ -86,14 +87,16 @@ THashSet TInsertTable::DropPath(IDbWrapper& dbTable, ui64 pathId) { return Summary.GetInsertedByPathId(pathId); } -void TInsertTable::EraseCommitted(IDbWrapper& dbTable, const TInsertedData& data) { +void TInsertTable::EraseCommitted(IDbWrapper& dbTable, const TInsertedData& data, const std::shared_ptr& blobsAction) { if (Summary.EraseCommitted(data)) { + RemoveBlobLink(data.GetBlobRange().BlobId, blobsAction); dbTable.EraseCommitted(data); } } -void TInsertTable::EraseAborted(IDbWrapper& dbTable, const TInsertedData& data) { +void TInsertTable::EraseAborted(IDbWrapper& dbTable, const TInsertedData& data, const std::shared_ptr& blobsAction) { if (Summary.EraseAborted((TWriteId)data.WriteTxId)) { + RemoveBlobLink(data.GetBlobRange().BlobId, blobsAction); dbTable.EraseAborted(data); } } @@ -132,4 +135,19 @@ std::vector TInsertTable::Read(ui64 pathId, const TSnapshot& sna return result; } +bool TInsertTableAccessor::RemoveBlobLink(const TUnifiedBlobId& blobId, const std::shared_ptr& blobsAction) { + AFL_VERIFY(blobsAction); + auto itBlob = BlobLinks.find(blobId); + AFL_VERIFY(itBlob != BlobLinks.end()); + AFL_VERIFY(itBlob->second >= 1); + if (itBlob->second == 1) { + blobsAction->DeclareRemove(itBlob->first); + BlobLinks.erase(itBlob); + return true; + } else { + --itBlob->second; + return false; + } +} + } diff --git a/ydb/core/tx/columnshard/engines/insert_table/insert_table.h b/ydb/core/tx/columnshard/engines/insert_table/insert_table.h index a5dccec179d6..65b8a8ef49be 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/insert_table.h +++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.h @@ -15,18 +15,34 @@ class IDbWrapper; class TInsertTableAccessor { protected: TInsertionSummary Summary; + THashMap BlobLinks; + + void AddBlobLink(const TUnifiedBlobId& blobId) { + ++BlobLinks[blobId]; + } + + bool RemoveBlobLink(const TUnifiedBlobId& blobId, const std::shared_ptr& blobsAction); public: const std::map>& GetPathPriorities() const { return Summary.GetPathPriorities(); } bool AddInserted(TInsertedData&& data, const bool load) { + if (load) { + AddBlobLink(data.GetBlobRange().BlobId); + } return Summary.AddInserted(std::move(data), load); } bool AddAborted(TInsertedData&& data, const bool load) { + if (load) { + AddBlobLink(data.GetBlobRange().BlobId); + } return Summary.AddAborted(std::move(data), load); } bool AddCommitted(TInsertedData&& data, const bool load) { + if (load) { + AddBlobLink(data.GetBlobRange().BlobId); + } const ui64 pathId = data.PathId; return Summary.GetPathInfo(pathId).AddCommitted(std::move(data), load); } @@ -56,8 +72,8 @@ class TInsertTable: public TInsertTableAccessor { void Abort(IDbWrapper& dbTable, const THashSet& writeIds); THashSet OldWritesToAbort(const TInstant& now) const; THashSet DropPath(IDbWrapper& dbTable, ui64 pathId); - void EraseCommitted(IDbWrapper& dbTable, const TInsertedData& key); - void EraseAborted(IDbWrapper& dbTable, const TInsertedData& key); + void EraseCommitted(IDbWrapper& dbTable, const TInsertedData& key, const std::shared_ptr& blobsAction); + void EraseAborted(IDbWrapper& dbTable, const TInsertedData& key, const std::shared_ptr& blobsAction); std::vector Read(ui64 pathId, const TSnapshot& snapshot, const std::shared_ptr& pkSchema) const; bool Load(IDbWrapper& dbTable, const TInstant loadTime); private: diff --git a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp index 3672781917d9..cbe7d900356d 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp @@ -1,5 +1,6 @@ #include "rt_insertion.h" #include +#include namespace NKikimr::NOlap { diff --git a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h index edee77f8dbb2..97c2575c47a4 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h +++ b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h @@ -4,7 +4,7 @@ #include "path_info.h" namespace NKikimr::NOlap { - +class IBlobsDeclareRemovingAction; class TInsertionSummary { public: struct TCounters { diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h index d1d0ddfe1e77..9e9d1e222e3f 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h @@ -54,7 +54,7 @@ class IOptimizerPlanner { const ui64 PathId; YDB_READONLY(TInstant, ActualizationInstant, TInstant::Zero()); protected: - virtual void DoModifyPortions(const std::vector>& add, const std::vector>& remove) = 0; + virtual void DoModifyPortions(const THashMap>& add, const THashMap>& remove) = 0; virtual std::shared_ptr DoGetOptimizationTask(const TCompactionLimits& limits, std::shared_ptr granule, const THashSet& busyPortions) const = 0; virtual TOptimizationPriority DoGetUsefulMetric() const = 0; virtual void DoActualize(const TInstant currentInstant) = 0; @@ -76,15 +76,15 @@ class IOptimizerPlanner { class TModificationGuard: TNonCopyable { private: IOptimizerPlanner& Owner; - std::vector> AddPortions; - std::vector> RemovePortions; + THashMap> AddPortions; + THashMap> RemovePortions; public: TModificationGuard& AddPortion(const std::shared_ptr& portion) { if (HasAppData() && AppDataVerified().ColumnShardConfig.GetSkipOldGranules() && portion->GetDeprecatedGranuleId() > 0) { AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_granule")("granule_id", portion->GetDeprecatedGranuleId()); return *this; } - AddPortions.emplace_back(portion); + AFL_VERIFY(AddPortions.emplace(portion->GetPortionId(), portion).second); return*this; } @@ -93,7 +93,7 @@ class IOptimizerPlanner { AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_granule")("granule_id", portion->GetDeprecatedGranuleId()); return *this; } - RemovePortions.emplace_back(portion); + AFL_VERIFY(RemovePortions.emplace(portion->GetPortionId(), portion).second); return*this; } @@ -121,7 +121,7 @@ class IOptimizerPlanner { return DoSerializeToJsonVisual(); } - void ModifyPortions(const std::vector>& add, const std::vector>& remove) { + void ModifyPortions(const THashMap>& add, const THashMap>& remove) { NActors::TLogContextGuard g(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("path_id", PathId)); DoModifyPortions(add, remove); } diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.cpp index acab75cd86aa..e7dca2655155 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.cpp +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.cpp @@ -141,12 +141,12 @@ void TIntervalsOptimizerPlanner::AddPortion(const std::shared_ptr& AFL_VERIFY(RangedSegments.empty() == Positions.empty())("rs_size", RangedSegments.size())("p_size", Positions.size()); } -void TIntervalsOptimizerPlanner::DoModifyPortions(const std::vector>& add, const std::vector>& remove) { - for (auto&& i : remove) { +void TIntervalsOptimizerPlanner::DoModifyPortions(const THashMap>& add, const THashMap>& remove) { + for (auto&& [_, i] : remove) { SizeProblemBlobs.RemovePortion(i); RemovePortion(i); } - for (auto&& i : add) { + for (auto&& [_, i] : add) { SizeProblemBlobs.AddPortion(i); AddPortion(i); } diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.h index 5d7d0cd1efb0..80f3a5edc583 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.h @@ -243,7 +243,7 @@ class TIntervalsOptimizerPlanner: public IOptimizerPlanner { std::vector> GetPortionsForIntervalStartedIn(const NArrow::TReplaceKey& keyStart, const ui32 countExpectation) const; protected: - virtual void DoModifyPortions(const std::vector>& add, const std::vector>& remove) override; + virtual void DoModifyPortions(const THashMap>& add, const THashMap>& remove) override; virtual std::shared_ptr DoGetOptimizationTask(const TCompactionLimits& limits, std::shared_ptr granule, const THashSet& busyPortions) const override; virtual TOptimizationPriority DoGetUsefulMetric() const override; diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h index f33de360b57b..c286c92ef240 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h @@ -122,16 +122,6 @@ class TPortionsPool { } } - bool AddFuture(const std::shared_ptr& portion) { - auto portionMaxSnapshotInstant = TInstant::MilliSeconds(portion->RecordSnapshotMax().GetPlanStep()); - if (Futures[portionMaxSnapshotInstant].emplace(portion->GetPortionId(), portion).second) { - Counters->FuturePortions->AddPortion(portion); - return true; - } else { - return false; - } - } - bool RemoveFutures(const TInstant instant) { auto itFutures = Futures.find(instant); if (itFutures == Futures.end()) { @@ -144,26 +134,14 @@ class TPortionsPool { return true; } - bool RemoveFutures(const TInstant instant, const std::vector>& portions) { - if (portions.empty()) { + bool AddFuture(const std::shared_ptr& portion) { + auto portionMaxSnapshotInstant = TInstant::MilliSeconds(portion->RecordSnapshotMax().GetPlanStep()); + if (Futures[portionMaxSnapshotInstant].emplace(portion->GetPortionId(), portion).second) { + Counters->FuturePortions->AddPortion(portion); return true; - } - auto itFutures = Futures.find(instant); - if (itFutures == Futures.end()) { + } else { return false; } - bool hasAbsent = false; - for (auto&& i : portions) { - if (!itFutures->second.erase(i->GetPortionId())) { - hasAbsent = true; - } else { - Counters->FuturePortions->RemovePortion(i); - } - } - if (itFutures->second.empty()) { - Futures.erase(itFutures); - } - return !hasAbsent; } bool AddFutures(const TInstant instant, const THashMap>& portions) { @@ -197,6 +175,29 @@ class TPortionsPool { } return true; } + + bool RemoveFutures(const TInstant instant, const std::vector>& portions) { + if (portions.empty()) { + return true; + } + auto itFutures = Futures.find(instant); + if (itFutures == Futures.end()) { + return false; + } + bool hasAbsent = false; + for (auto&& i : portions) { + if (!itFutures->second.erase(i->GetPortionId())) { + hasAbsent = true; + } else { + Counters->FuturePortions->RemovePortion(i); + } + } + if (itFutures->second.empty()) { + Futures.erase(itFutures); + } + return !hasAbsent; + } + public: bool Validate(const std::shared_ptr& portion) const { if (portion) { @@ -387,14 +388,17 @@ class TPortionsPool { } } - void Remove(const std::shared_ptr& portion) { + bool Remove(const std::shared_ptr& portion) Y_WARN_UNUSED_RESULT { if (RemovePreActual(portion)) { - return; + return true; } if (RemoveActual(portion)) { - return; + return true; + } + if (RemoveFuture(portion)) { + return true; } - AFL_VERIFY(RemoveFuture(portion)); + return false; } void MergeFrom(TPortionsPool& source) { @@ -778,7 +782,7 @@ class TPortionsBucket: public TMoveOnly { void RemoveOther(const std::shared_ptr& portion) { auto gChartsThis = StartModificationGuard(); - Others.Remove(portion); + AFL_VERIFY(Others.Remove(portion))("portion", portion->DebugString())("bucket_start", MainPortion ? MainPortion->DebugString(true) : "-inf")("bucket_finish", NextBorder ? NextBorder->DebugString() : "undef"); } void MergeOthersFrom(TPortionsBucket& dest) { @@ -1016,25 +1020,25 @@ class TOptimizerPlanner: public IOptimizerPlanner { TPortionBuckets Buckets; const std::shared_ptr StoragesManager; protected: - virtual void DoModifyPortions(const std::vector>& add, const std::vector>& remove) override { + virtual void DoModifyPortions(const THashMap>& add, const THashMap>& remove) override { const TInstant now = TInstant::Now(); - for (auto&& i : add) { + for (auto&& [_, i] : remove) { if (i->GetMeta().GetTierName() != IStoragesManager::DefaultStorageId && i->GetMeta().GetTierName() != "") { continue; } + Buckets.RemovePortion(i); if (Buckets.IsEmpty()) { - Counters->OptimizersCount->Add(1); + Counters->OptimizersCount->Sub(1); } - Buckets.AddPortion(i, now); } - for (auto&& i : remove) { + for (auto&& [_, i] : add) { if (i->GetMeta().GetTierName() != IStoragesManager::DefaultStorageId && i->GetMeta().GetTierName() != "") { continue; } - Buckets.RemovePortion(i); if (Buckets.IsEmpty()) { - Counters->OptimizersCount->Sub(1); + Counters->OptimizersCount->Add(1); } + Buckets.AddPortion(i, now); } } virtual std::shared_ptr DoGetOptimizationTask(const TCompactionLimits& limits, std::shared_ptr granule, const THashSet& busyPortions) const override { diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.h index 46a9988a130d..c670cd67716b 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.h @@ -472,26 +472,26 @@ class TLevelsOptimizerPlanner: public IOptimizerPlanner { return {}; } - virtual void DoModifyPortions(const std::vector>& add, const std::vector>& remove) override { + virtual void DoModifyPortions(const THashMap>& add, const THashMap>& remove) override { const TInstant currentInstant = TInstant::Now(); - for (auto&& i : add) { + for (auto&& [_, i] : remove) { if (i->GetMeta().GetTierName() != IStoragesManager::DefaultStorageId && i->GetMeta().GetTierName() != "") { continue; } if (!i->GetMeta().RecordSnapshotMax) { - LMax->AddPortion(i, currentInstant); + LMax->RemovePortion(i, currentInstant); } else { - LStart->AddPortion(i, currentInstant); + LStart->RemovePortion(i, currentInstant); } } - for (auto&& i : remove) { + for (auto&& [_, i] : add) { if (i->GetMeta().GetTierName() != IStoragesManager::DefaultStorageId && i->GetMeta().GetTierName() != "") { continue; } if (!i->GetMeta().RecordSnapshotMax) { - LMax->RemovePortion(i, currentInstant); + LMax->AddPortion(i, currentInstant); } else { - LStart->RemovePortion(i, currentInstant); + LStart->AddPortion(i, currentInstant); } } } diff --git a/ydb/core/tx/columnshard/engines/writer/buffer/actor.cpp b/ydb/core/tx/columnshard/engines/writer/buffer/actor.cpp new file mode 100644 index 000000000000..28e0ab6ca203 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/writer/buffer/actor.cpp @@ -0,0 +1,57 @@ +#include "actor.h" +#include + +namespace NKikimr::NColumnShard::NWriting { + +TActor::TActor(ui64 tabletId, const TActorId& parent) + : TabletId(tabletId) + , ParentActorId(parent) +{ + +} + +void TActor::Bootstrap() { + Become(&TThis::StateWait); + Schedule(FlushDuration, new TEvFlushBuffer); + FlushDuration = TDuration::MilliSeconds(AppDataVerified().ColumnShardConfig.GetWritingBufferDurationMs()); +} + +void TActor::Flush() { + if (Aggregations.size()) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "flush_writing")("size", SumSize)("count", Aggregations.size()); + auto action = Aggregations.front()->GetWriteData()->GetBlobsAction(); + auto writeController = std::make_shared(ParentActorId, action, std::move(Aggregations)); + if (action->NeedDraftTransaction()) { + TActorContext::AsActorContext().Send(ParentActorId, std::make_unique(writeController)); + } else { + TActorContext::AsActorContext().Register(NColumnShard::CreateWriteActor(TabletId, writeController, TInstant::Max())); + } + Aggregations.clear(); + SumSize = 0; + } else { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_flush_writing"); + } +} + +void TActor::Handle(TEvFlushBuffer::TPtr& /*ev*/) { + FlushDuration = TDuration::MilliSeconds(AppDataVerified().ColumnShardConfig.GetWritingBufferDurationMs()); + Flush(); + if (!FlushDuration) { + Schedule(TDuration::MilliSeconds(500), new TEvFlushBuffer); + } else { + Schedule(FlushDuration, new TEvFlushBuffer); + } +} + +void TActor::Handle(TEvAddInsertedDataToBuffer::TPtr& ev) { + auto* evBase = ev->Get(); + AFL_VERIFY(evBase->GetWriteData()->GetBlobsAction()->GetStorageId() == NOlap::IStoragesManager::DefaultStorageId); + SumSize += evBase->GetWriteData()->GetSize(); + Aggregations.emplace_back(std::make_shared(evBase->GetWriteData(), std::move(evBase->MutableBlobsToWrite()))); + if (SumSize > 4 * 1024 * 1024 || Aggregations.size() > 750 || !FlushDuration) { + Flush(); + } +} + + +} diff --git a/ydb/core/tx/columnshard/engines/writer/buffer/actor.h b/ydb/core/tx/columnshard/engines/writer/buffer/actor.h new file mode 100644 index 000000000000..0a31e7bd6278 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/writer/buffer/actor.h @@ -0,0 +1,36 @@ +#pragma once +#include "events.h" +#include +#include + +namespace NKikimr::NColumnShard::NWriting { + +class TActor: public TActorBootstrapped { +private: + std::vector> Aggregations; + const ui64 TabletId; + NActors::TActorId ParentActorId; + TDuration FlushDuration = TDuration::Zero(); + ui64 SumSize = 0; + void Flush(); +public: + TActor(ui64 tabletId, const TActorId& parent); + ~TActor() = default; + + void Handle(TEvAddInsertedDataToBuffer::TPtr& ev); + void Handle(TEvFlushBuffer::TPtr& ev); + void Bootstrap(); + + STFUNC(StateWait) { + TLogContextGuard gLogging(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletId)("parent", ParentActorId)); + switch (ev->GetTypeRewrite()) { + cFunc(NActors::TEvents::TEvPoison::EventType, PassAway); + hFunc(TEvAddInsertedDataToBuffer, Handle); + hFunc(TEvFlushBuffer, Handle); + default: + AFL_VERIFY(false); + } + } +}; + +} diff --git a/ydb/core/tx/columnshard/engines/writer/buffer/events.cpp b/ydb/core/tx/columnshard/engines/writer/buffer/events.cpp new file mode 100644 index 000000000000..f2dd6ca1c00d --- /dev/null +++ b/ydb/core/tx/columnshard/engines/writer/buffer/events.cpp @@ -0,0 +1,5 @@ +#include "events.h" + +namespace NKikimr::NColumnShard::NWriting { + +} diff --git a/ydb/core/tx/columnshard/engines/writer/buffer/events.h b/ydb/core/tx/columnshard/engines/writer/buffer/events.h new file mode 100644 index 000000000000..ee750ad69bcf --- /dev/null +++ b/ydb/core/tx/columnshard/engines/writer/buffer/events.h @@ -0,0 +1,31 @@ +#pragma once + +#include +#include +#include +#include +#include + +namespace NKikimr::NColumnShard::NWriting { + +class TEvAddInsertedDataToBuffer: public NActors::TEventLocal { +private: + YDB_READONLY_DEF(std::shared_ptr, WriteData); + YDB_ACCESSOR_DEF(std::vector, BlobsToWrite); +public: + + explicit TEvAddInsertedDataToBuffer(const std::shared_ptr& writeData, std::vector&& blobs) + : WriteData(writeData) + , BlobsToWrite(blobs) { + } + +}; + +class TEvFlushBuffer: public NActors::TEventLocal { +private: + static inline NActors::NTests::TGlobalScheduledEvents::TRegistrator TestScheduledEventRegistrator = (ui32)NColumnShard::TEvPrivate::EEv::EvWritingFlushBuffer; +public: +}; + + +} diff --git a/ydb/core/tx/columnshard/engines/writer/buffer/ya.make b/ydb/core/tx/columnshard/engines/writer/buffer/ya.make new file mode 100644 index 000000000000..78a3ee199907 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/writer/buffer/ya.make @@ -0,0 +1,20 @@ +LIBRARY() + +OWNER( + g:kikimr +) + +SRCS( + actor.cpp + events.cpp +) + +PEERDIR( + ydb/library/actors/core + ydb/core/protos + ydb/core/tablet_flat + ydb/library/yql/core/expr_nodes + ydb/library/actors/testlib/common +) + +END() diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp index a02d99f18ba0..318e4889fe07 100644 --- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp +++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp @@ -2,37 +2,47 @@ #include #include +#include namespace NKikimr::NOlap { -TIndexedWriteController::TIndexedWriteController(const TActorId& dstActor, const NEvWrite::TWriteData& writeData, const std::shared_ptr& action, std::vector&& blobsSplitted) - : BlobsSplitted(std::move(blobsSplitted)) - , WriteData(writeData) +TIndexedWriteController::TIndexedWriteController(const TActorId& dstActor, const std::shared_ptr& action, std::vector>&& aggregations) + : Buffer(action, std::move(aggregations)) , DstActor(dstActor) - , Action(action) { - for (auto&& bInfo : BlobsSplitted) { - auto& task = AddWriteTask(TBlobWriteInfo::BuildWriteTask(bInfo.GetData(), Action)); - BlobData.emplace_back(TBlobRange::FromBlobId(task.GetBlobId()), bInfo.GetSpecialKeysSafe(), bInfo.GetRowsCount(), bInfo.GetRawBytes(), AppData()->TimeProvider->Now()); + auto blobs = Buffer.GroupIntoBlobs(); + for (auto&& b : blobs) { + auto& task = AddWriteTask(TBlobWriteInfo::BuildWriteTask(b.GetBlobData(), action)); + b.InitBlobId(task.GetBlobId()); } - ResourceUsage.SourceMemorySize = WriteData.GetSize(); } void TIndexedWriteController::DoOnReadyResult(const NActors::TActorContext& ctx, const NColumnShard::TBlobPutResult::TPtr& putResult) { - WriteData.MutableWriteMeta().SetWriteMiddle4StartInstant(TMonotonic::Now()); - if (putResult->GetPutStatus() == NKikimrProto::OK) { - std::vector> actions = {Action}; - auto result = std::make_unique(putResult, std::move(BlobData), actions, WriteData.GetWriteMeta(), WriteData.GetData().GetSchemaVersion()); - ctx.Send(DstActor, result.release()); - } else { - auto result = std::make_unique(putResult, WriteData.GetWriteMeta()); - ctx.Send(DstActor, result.release()); - } + Buffer.InitReadyInstant(TMonotonic::Now()); + auto result = std::make_unique(putResult, std::move(Buffer)); + ctx.Send(DstActor, result.release()); } void TIndexedWriteController::DoOnStartSending() { - WriteData.MutableWriteMeta().SetWriteMiddle5StartInstant(TMonotonic::Now()); + Buffer.InitStartSending(TMonotonic::Now()); +} + +void TWideSerializedBatch::InitBlobId(const TUnifiedBlobId& id) { + AFL_VERIFY(!Range.BlobId.GetTabletId()); + Range.BlobId = id; +} + +void TWritingBuffer::InitReadyInstant(const TMonotonic instant) { + for (auto&& aggr : Aggregations) { + aggr->GetWriteData()->MutableWriteMeta().SetWriteMiddle4StartInstant(instant); + } +} + +void TWritingBuffer::InitStartSending(const TMonotonic instant) { + for (auto&& aggr : Aggregations) { + aggr->GetWriteData()->MutableWriteMeta().SetWriteMiddle5StartInstant(instant); + } } } diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h index 87a165c29d4b..7f977a171597 100644 --- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h +++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h @@ -8,24 +8,187 @@ #include #include #include -#include #include namespace NKikimr::NOlap { +class TWriteAggregation; + +class TWideSerializedBatch { +private: + NArrow::TSerializedBatch SplittedBlobs; + YDB_ACCESSOR_DEF(TBlobRange, Range); + YDB_READONLY(TInstant, StartInstant, AppDataVerified().TimeProvider->Now()); + TWriteAggregation* ParentAggregation; +public: + void InitBlobId(const TUnifiedBlobId& id); + + const NArrow::TSerializedBatch& GetSplittedBlobs() const { + return SplittedBlobs; + } + + const NArrow::TSerializedBatch* operator->() const { + return &SplittedBlobs; + } + + TWriteAggregation& MutableAggregation() { + return *ParentAggregation; + } + + const TWriteAggregation& GetAggregation() const { + return *ParentAggregation; + } + + TWideSerializedBatch(NArrow::TSerializedBatch&& splitted, TWriteAggregation& parentAggregation) + : SplittedBlobs(std::move(splitted)) + , ParentAggregation(&parentAggregation) + { + + } +}; + +class TWritingBlob { +private: + std::vector Ranges; + YDB_READONLY_DEF(TString, BlobData); +public: + TWritingBlob() = default; + bool AddData(TWideSerializedBatch& batch) { + if (BlobData.size() + batch.GetSplittedBlobs().GetSize() < 8 * 1024 * 1024) { + Ranges.emplace_back(&batch); + batch.SetRange(TBlobRange(TUnifiedBlobId(0, 0, 0, 0, BlobData.size() + batch.GetSplittedBlobs().GetSize()), BlobData.size(), batch.GetSplittedBlobs().GetSize())); + BlobData += batch.GetSplittedBlobs().GetData(); + return true; + } else { + AFL_VERIFY(BlobData.size()); + return false; + } + } + + void InitBlobId(const TUnifiedBlobId& blobId) { + for (auto&& r : Ranges) { + r->InitBlobId(blobId); + } + } + + ui64 GetSize() const { + return BlobData.size(); + } +}; + +class TWriteAggregation { +private: + YDB_READONLY_DEF(std::shared_ptr, WriteData); + YDB_ACCESSOR_DEF(std::vector, SplittedBlobs); + YDB_READONLY_DEF(TVector, WriteIds); +public: + void AddWriteId(const TWriteId& id) { + WriteIds.emplace_back(id); + } + + TWriteAggregation(const std::shared_ptr& writeData, std::vector&& splittedBlobs) + : WriteData(writeData) { + for (auto&& s : splittedBlobs) { + SplittedBlobs.emplace_back(std::move(s), *this); + } + } + + TWriteAggregation(const std::shared_ptr& writeData) + : WriteData(writeData) { + } +}; + +class TWritingBuffer: public TMoveOnly { +private: + std::shared_ptr BlobsAction; + std::shared_ptr DeclareRemoveAction; + YDB_READONLY_DEF(std::vector>, Aggregations); + YDB_READONLY(ui64, SumSize, 0); +public: + TWritingBuffer() = default; + TWritingBuffer(const std::shared_ptr& action, std::vector>&& aggregations) + : BlobsAction(action) + , Aggregations(std::move(aggregations)) + { + AFL_VERIFY(BlobsAction); + for (auto&& aggr : Aggregations) { + SumSize += aggr->GetWriteData()->GetSize(); + } + } + + bool IsEmpty() const { + return Aggregations.empty(); + } + + void RemoveData(const std::shared_ptr& data, const std::shared_ptr& bOperator) { + THashMap linksCount; + for (auto&& a : Aggregations) { + for (auto&& s : a->GetSplittedBlobs()) { + ++linksCount[s.GetRange().BlobId]; + } + } + + for (ui32 i = 0; i < Aggregations.size(); ++i) { + if (Aggregations[i].get() == data.get()) { + for (auto&& s : Aggregations[i]->GetSplittedBlobs()) { + if (--linksCount[s.GetRange().BlobId] == 0) { + if (!DeclareRemoveAction) { + DeclareRemoveAction = bOperator->StartDeclareRemovingAction("WRITING_BUFFER"); + } + DeclareRemoveAction->DeclareRemove(s.GetRange().BlobId); + } + } + Aggregations.erase(Aggregations.begin() + i); + return; + } + } + AFL_VERIFY(false); + } + + std::vector> GetAddActions() const { + return {BlobsAction}; + } + + std::vector> GetRemoveActions() const { + if (DeclareRemoveAction) { + return {DeclareRemoveAction}; + } else { + return {}; + } + } + + void InitReadyInstant(const TMonotonic instant); + void InitStartSending(const TMonotonic instant); + + std::vector GroupIntoBlobs() { + std::vector result; + TWritingBlob currentBlob; + for (auto&& aggr : Aggregations) { + for (auto&& bInfo : aggr->MutableSplittedBlobs()) { + if (!currentBlob.AddData(bInfo)) { + result.emplace_back(std::move(currentBlob)); + currentBlob = TWritingBlob(); + AFL_VERIFY(currentBlob.AddData(bInfo)); + } + } + } + if (currentBlob.GetSize()) { + result.emplace_back(std::move(currentBlob)); + } + return result; + } +}; + class TIndexedWriteController : public NColumnShard::IWriteController, public NColumnShard::TMonitoringObjectsCounter { private: - std::vector BlobsSplitted; - NEvWrite::TWriteData WriteData; - TVector BlobData; + TWritingBuffer Buffer; TActorId DstActor; - std::shared_ptr Action; void DoOnReadyResult(const NActors::TActorContext& ctx, const NColumnShard::TBlobPutResult::TPtr& putResult) override; virtual void DoOnStartSending() override; public: - TIndexedWriteController(const TActorId& dstActor, const NEvWrite::TWriteData& writeData, const std::shared_ptr& action, std::vector&& blobsSplitted); + TIndexedWriteController(const TActorId& dstActor, const std::shared_ptr& action, std::vector>&& aggregations); }; diff --git a/ydb/core/tx/columnshard/engines/writer/write_controller.h b/ydb/core/tx/columnshard/engines/writer/write_controller.h index 4566a9d2e9ee..554423c88863 100644 --- a/ydb/core/tx/columnshard/engines/writer/write_controller.h +++ b/ydb/core/tx/columnshard/engines/writer/write_controller.h @@ -11,15 +11,13 @@ namespace NKikimr::NColumnShard { -class TBlobPutResult : public NColumnShard::TPutStatus { +class TBlobPutResult: public NColumnShard::TPutStatus { public: using TPtr = std::shared_ptr; TBlobPutResult(NKikimrProto::EReplyStatus status, THashSet&& yellowMoveChannels, - THashSet&& yellowStopChannels, - const NColumnShard::TUsage& resourceUsage) - : ResourceUsage(resourceUsage) + THashSet&& yellowStopChannels) { SetPutStatus(status, std::move(yellowMoveChannels), std::move(yellowStopChannels)); } @@ -27,17 +25,6 @@ class TBlobPutResult : public NColumnShard::TPutStatus { TBlobPutResult(NKikimrProto::EReplyStatus status) { SetPutStatus(status); } - - void AddResources(const NColumnShard::TUsage& usage) { - ResourceUsage.Add(usage); - } - - TAutoPtr StartCpuGuard() { - return new TCpuGuard(ResourceUsage); - } - -private: - YDB_READONLY_DEF(NColumnShard::TUsage, ResourceUsage); }; class IWriteController { @@ -46,7 +33,6 @@ class IWriteController { THashMap> WritingActions; std::deque WriteTasks; protected: - TUsage ResourceUsage; virtual void DoOnReadyResult(const NActors::TActorContext& ctx, const TBlobPutResult::TPtr& putResult) = 0; virtual void DoOnBlobWriteResult(const TEvBlobStorage::TEvPutResult& /*result*/) { @@ -75,7 +61,6 @@ class IWriteController { } void OnReadyResult(const NActors::TActorContext& ctx, const TBlobPutResult::TPtr& putResult) { - putResult->AddResources(ResourceUsage); DoOnReadyResult(ctx, putResult); } diff --git a/ydb/core/tx/columnshard/engines/writer/ya.make b/ydb/core/tx/columnshard/engines/writer/ya.make index fac3935c8827..c4f0c8d118be 100644 --- a/ydb/core/tx/columnshard/engines/writer/ya.make +++ b/ydb/core/tx/columnshard/engines/writer/ya.make @@ -14,6 +14,7 @@ PEERDIR( ydb/core/blobstorage/vdisk/protos ydb/core/tablet_flat ydb/core/formats/arrow + ydb/core/tx/columnshard/engines/writer/buffer ydb/library/actors/core diff --git a/ydb/core/tx/columnshard/operations/slice_builder.cpp b/ydb/core/tx/columnshard/operations/slice_builder.cpp index 23f5b80ba5a6..08726dcd280d 100644 --- a/ydb/core/tx/columnshard/operations/slice_builder.cpp +++ b/ydb/core/tx/columnshard/operations/slice_builder.cpp @@ -3,6 +3,7 @@ #include #include #include +#include namespace NKikimr::NOlap { @@ -12,7 +13,7 @@ std::optional> TBuildSlicesTask:: const ui64 tableId = writeMeta.GetTableId(); const ui64 writeId = writeMeta.GetWriteId(); - std::shared_ptr batch = WriteData.GetDataPtr()->ExtractBatch(); + std::shared_ptr batch = WriteData.GetData()->ExtractBatch(); if (!batch) { AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "ev_write_bad_data")("write_id", writeId)("table_id", tableId); @@ -20,7 +21,7 @@ std::optional> TBuildSlicesTask:: } NArrow::TBatchSplitttingContext context(NColumnShard::TLimits::GetMaxBlobSize()); - context.SetFieldsForSpecialKeys(PrimaryKeySchema); + context.SetFieldsForSpecialKeys(WriteData.GetPrimaryKeySchema()); auto splitResult = NArrow::SplitByBlobSize(batch, context); if (!splitResult) { AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", TStringBuilder() << "cannot split batch in according to limits: " + splitResult.GetErrorMessage()); @@ -39,15 +40,13 @@ bool TBuildSlicesTask::DoExecute() { WriteData.MutableWriteMeta().SetWriteMiddle2StartInstant(TMonotonic::Now()); auto batches = BuildSlices(); WriteData.MutableWriteMeta().SetWriteMiddle3StartInstant(TMonotonic::Now()); + auto writeDataPtr = std::make_shared(std::move(WriteData)); if (batches) { - auto writeController = std::make_shared(ParentActorId, WriteData, Action, std::move(*batches)); - if (batches && Action->NeedDraftTransaction()) { - TActorContext::AsActorContext().Send(ParentActorId, std::make_unique(writeController)); - } else { - TActorContext::AsActorContext().Register(NColumnShard::CreateWriteActor(TabletId, writeController, TInstant::Max())); - } + auto result = std::make_unique(writeDataPtr, std::move(*batches)); + TActorContext::AsActorContext().Send(BufferActorId, result.release()); } else { - auto result = std::make_unique(std::make_shared(NKikimrProto::EReplyStatus::CORRUPTED), WriteData.GetWriteMeta()); + TWritingBuffer buffer(writeDataPtr->GetBlobsAction(), {std::make_shared(writeDataPtr)}); + auto result = std::make_unique(std::make_shared(NKikimrProto::EReplyStatus::CORRUPTED), std::move(buffer)); TActorContext::AsActorContext().Send(ParentActorId, result.release()); } diff --git a/ydb/core/tx/columnshard/operations/slice_builder.h b/ydb/core/tx/columnshard/operations/slice_builder.h index 0a45ad16c99d..0b5704ed260c 100644 --- a/ydb/core/tx/columnshard/operations/slice_builder.h +++ b/ydb/core/tx/columnshard/operations/slice_builder.h @@ -8,12 +8,11 @@ namespace NKikimr::NOlap { class TBuildSlicesTask: public NConveyor::ITask { private: - std::shared_ptr Action; NEvWrite::TWriteData WriteData; const ui64 TabletId; const NActors::TActorId ParentActorId; + const NActors::TActorId BufferActorId; std::optional> BuildSlices(); - std::shared_ptr PrimaryKeySchema; protected: virtual bool DoExecute() override; public: @@ -21,15 +20,13 @@ class TBuildSlicesTask: public NConveyor::ITask { return "Write::ConstructBlobs::Slices"; } - TBuildSlicesTask(const ui64 tabletId, const NActors::TActorId parentActorId, const std::shared_ptr& action, - const NEvWrite::TWriteData& writeData, const std::shared_ptr& primaryKeySchema) - : Action(action) - , WriteData(writeData) + TBuildSlicesTask(const ui64 tabletId, const NActors::TActorId parentActorId, + const NActors::TActorId bufferActorId, NEvWrite::TWriteData&& writeData) + : WriteData(std::move(writeData)) , TabletId(tabletId) , ParentActorId(parentActorId) - , PrimaryKeySchema(primaryKeySchema) + , BufferActorId(bufferActorId) { - Y_ABORT_UNLESS(Action); } }; } diff --git a/ydb/core/tx/columnshard/operations/write.cpp b/ydb/core/tx/columnshard/operations/write.cpp index 7e023405b45c..b580b2c2f340 100644 --- a/ydb/core/tx/columnshard/operations/write.cpp +++ b/ydb/core/tx/columnshard/operations/write.cpp @@ -25,8 +25,8 @@ namespace NKikimr::NColumnShard { Y_ABORT_UNLESS(Status == EOperationStatus::Draft); NEvWrite::TWriteMeta writeMeta((ui64)WriteId, tableId, source); - std::shared_ptr task = std::make_shared(owner.TabletID(), ctx.SelfID, - owner.StoragesManager->GetInsertOperator()->StartWritingAction("WRITING_OPERATOR"), NEvWrite::TWriteData(writeMeta, data), owner.TablesManager.GetPrimaryIndex()->GetReplaceKey()); + std::shared_ptr task = std::make_shared(owner.TabletID(), ctx.SelfID, owner.BufferizationWriteActorId, + NEvWrite::TWriteData(writeMeta, data, owner.TablesManager.GetPrimaryIndex()->GetReplaceKey(), owner.StoragesManager->GetInsertOperator()->StartWritingAction("WRITING_OPERATOR"))); NConveyor::TCompServiceOperator::SendTaskToExecute(task); Status = EOperationStatus::Started; diff --git a/ydb/core/tx/columnshard/write_actor.cpp b/ydb/core/tx/columnshard/write_actor.cpp index 7037cd4217c4..5d858afe6564 100644 --- a/ydb/core/tx/columnshard/write_actor.cpp +++ b/ydb/core/tx/columnshard/write_actor.cpp @@ -8,10 +8,8 @@ namespace NKikimr::NColumnShard { namespace { -class TWriteActor : public TActorBootstrapped, public TMonitoringObjectsCounter { +class TWriteActor: public TActorBootstrapped, public TMonitoringObjectsCounter { ui64 TabletId; - TUsage ResourceUsage; - IWriteController::TPtr WriteController; THashSet YellowMoveChannels; @@ -22,8 +20,8 @@ class TWriteActor : public TActorBootstrapped, public TMonitoringOb TWriteActor(ui64 tabletId, IWriteController::TPtr writeController, const TInstant deadline) : TabletId(tabletId) , WriteController(writeController) - , Deadline(deadline) - {} + , Deadline(deadline) { + } void Handle(TEvBlobStorage::TEvPutResult::TPtr& ev, const TActorContext& ctx) { TEvBlobStorage::TEvPutResult* msg = ev->Get(); @@ -63,9 +61,8 @@ class TWriteActor : public TActorBootstrapped, public TMonitoringOb } auto putResult = std::make_shared(putStatus, - std::move(YellowMoveChannels), - std::move(YellowStopChannels), - ResourceUsage); + std::move(YellowMoveChannels), + std::move(YellowStopChannels)); WriteController->OnReadyResult(ctx, putResult); Die(ctx); @@ -84,7 +81,6 @@ class TWriteActor : public TActorBootstrapped, public TMonitoringOb } while (auto writeInfo = WriteController->Next()) { - ResourceUsage.Network += writeInfo->GetData().size(); writeInfo->GetWriteOperator()->SendWriteBlobRequest(writeInfo->GetData(), writeInfo->GetBlobId()); } diff --git a/ydb/core/tx/data_events/write_data.cpp b/ydb/core/tx/data_events/write_data.cpp index 0bd361fda227..0457f72d86a7 100644 --- a/ydb/core/tx/data_events/write_data.cpp +++ b/ydb/core/tx/data_events/write_data.cpp @@ -6,14 +6,18 @@ namespace NKikimr::NEvWrite { -TWriteData::TWriteData(const TWriteMeta& writeMeta, IDataContainer::TPtr data) +TWriteData::TWriteData(const TWriteMeta& writeMeta, IDataContainer::TPtr data, const std::shared_ptr& primaryKeySchema, const std::shared_ptr& blobsAction) : WriteMeta(writeMeta) , Data(data) + , PrimaryKeySchema(primaryKeySchema) + , BlobsAction(blobsAction) { Y_ABORT_UNLESS(Data); + Y_ABORT_UNLESS(PrimaryKeySchema); + Y_ABORT_UNLESS(BlobsAction); } -const NKikimr::NEvWrite::IDataContainer& TWriteData::GetData() const { +const NKikimr::NEvWrite::IDataContainer& TWriteData::GetDataVerified() const { AFL_VERIFY(Data); return *Data; } diff --git a/ydb/core/tx/data_events/write_data.h b/ydb/core/tx/data_events/write_data.h index 8393cb7f11dc..739d9b3e89c5 100644 --- a/ydb/core/tx/data_events/write_data.h +++ b/ydb/core/tx/data_events/write_data.h @@ -6,6 +6,10 @@ #include +namespace NKikimr::NOlap { +class IBlobsWritingAction; +} + namespace NKikimr::NEvWrite { class IDataContainer { @@ -44,15 +48,13 @@ class TWriteMeta { class TWriteData { private: TWriteMeta WriteMeta; - IDataContainer::TPtr Data; + YDB_READONLY_DEF(IDataContainer::TPtr, Data); + YDB_READONLY_DEF(std::shared_ptr, PrimaryKeySchema); + YDB_READONLY_DEF(std::shared_ptr, BlobsAction); public: - TWriteData(const TWriteMeta& writeMeta, IDataContainer::TPtr data); + TWriteData(const TWriteMeta& writeMeta, IDataContainer::TPtr data, const std::shared_ptr& primaryKeySchema, const std::shared_ptr& blobsAction); - const IDataContainer& GetData() const; - - const IDataContainer::TPtr& GetDataPtr() const { - return Data; - } + const IDataContainer& GetDataVerified() const; const TWriteMeta& GetWriteMeta() const { return WriteMeta; diff --git a/ydb/library/actors/testlib/common/events_scheduling.cpp b/ydb/library/actors/testlib/common/events_scheduling.cpp new file mode 100644 index 000000000000..7e7847929435 --- /dev/null +++ b/ydb/library/actors/testlib/common/events_scheduling.cpp @@ -0,0 +1,23 @@ +#include "events_scheduling.h" + +namespace NActors::NTests { + +void TGlobalScheduledEvents::RegisterImpl(const ui32 eventType, const TEventSchedulingFeatures features) { + TGuard g(Mutex); + EventsForScheduling[eventType] = features; +} + +bool TGlobalScheduledEvents::ContainsImpl(const ui32 eventType) const { + return !!FeaturesImpl(eventType); +} + +std::optional TGlobalScheduledEvents::FeaturesImpl(const ui32 eventType) const { + TGuard g(Mutex); + auto it = EventsForScheduling.find(eventType); + if (it != EventsForScheduling.end()) { + return it->second; + } + return {}; +} + +} diff --git a/ydb/library/actors/testlib/common/events_scheduling.h b/ydb/library/actors/testlib/common/events_scheduling.h new file mode 100644 index 000000000000..df031f33f373 --- /dev/null +++ b/ydb/library/actors/testlib/common/events_scheduling.h @@ -0,0 +1,53 @@ +#pragma once +#include +#include +#include +#include + +namespace NActors::NTests { + +class TEventSchedulingFeatures { +private: + bool UseSchedulingLimit = false; +public: + TEventSchedulingFeatures& SetUseSchedulingLimit(const bool value = true) { + UseSchedulingLimit = value; + return *this; + } + bool GetUseSchedulingLimit() const { + return UseSchedulingLimit; + } +}; + +class TGlobalScheduledEvents { +private: + TMutex Mutex; + + THashMap EventsForScheduling; + + void RegisterImpl(const ui32 eventType, const TEventSchedulingFeatures features); + bool ContainsImpl(const ui32 eventType) const; + std::optional FeaturesImpl(const ui32 eventType) const; +public: + + class TRegistrator { + public: + TRegistrator(const ui32 eventType, const TEventSchedulingFeatures features = TEventSchedulingFeatures()) { + Singleton()->RegisterImpl(eventType, features); + } + }; + + static bool Contains(const ui32 eventType) { + return Singleton()->ContainsImpl(eventType); + } + + static std::optional Features(const ui32 eventType) { + return Singleton()->FeaturesImpl(eventType); + } + + static void Register(const ui32 eventType, const TEventSchedulingFeatures features = TEventSchedulingFeatures()) { + return Singleton()->RegisterImpl(eventType, features); + } + +}; +} diff --git a/ydb/library/actors/testlib/common/ya.make b/ydb/library/actors/testlib/common/ya.make new file mode 100644 index 000000000000..cd8c0f37e55c --- /dev/null +++ b/ydb/library/actors/testlib/common/ya.make @@ -0,0 +1,15 @@ +LIBRARY() + +OWNER( + g:kikimr +) + +SRCS( + events_scheduling.cpp +) + +PEERDIR( + ydb/library/actors/core +) + +END() diff --git a/ydb/library/actors/testlib/ya.make b/ydb/library/actors/testlib/ya.make index 91321ef888a1..4db7c55b6481 100644 --- a/ydb/library/actors/testlib/ya.make +++ b/ydb/library/actors/testlib/ya.make @@ -1,5 +1,10 @@ LIBRARY() +OWNER( + g:cloud-nbs + g:kikimr +) + SRCS( test_runtime.cpp ) @@ -8,6 +13,7 @@ PEERDIR( ydb/library/actors/core ydb/library/actors/interconnect/mock ydb/library/actors/protos + ydb/library/actors/testlib/common library/cpp/random_provider library/cpp/time_provider )