From 1b20c813983d35b8818d4c7684a6c58702bbc13d Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Sun, 24 Dec 2023 21:05:31 +0300 Subject: [PATCH] buffered writing for columnshards --- .../blobs_action/abstract/read.cpp | 4 +- .../columnshard/blobs_action/abstract/ya.make | 4 + .../tx/columnshard/blobs_action/bs/ya.make | 4 + .../blobs_action/counters/write.cpp | 2 + .../columnshard/blobs_action/counters/write.h | 4 + .../columnshard/blobs_action/counters/ya.make | 4 + .../tx/columnshard/blobs_action/tier/ya.make | 4 + .../transaction/tx_gc_insert_table.cpp | 4 +- .../blobs_action/transaction/tx_write.cpp | 126 ++++++++----- .../blobs_action/transaction/tx_write.h | 5 +- .../transaction/tx_write_index.cpp | 1 - .../blobs_action/transaction/ya.make | 4 + ydb/core/tx/columnshard/blobs_action/ya.make | 4 + .../tx/columnshard/columnshard__write.cpp | 123 ++++++------ ydb/core/tx/columnshard/columnshard_impl.h | 5 +- .../tx/columnshard/columnshard_schema.cpp | 14 +- ydb/core/tx/columnshard/columnshard_schema.h | 9 +- .../engines/changes/abstract/settings.h | 4 +- .../engines/changes/abstract/ya.make | 4 + .../engines/changes/compaction/ya.make | 4 + .../engines/changes/counters/ya.make | 4 + .../engines/changes/indexation.cpp | 10 +- .../engines/changes/with_appended.cpp | 5 +- .../tx/columnshard/engines/changes/ya.make | 4 + ydb/core/tx/columnshard/engines/db_wrapper.h | 3 +- .../engines/insert_table/insert_table.cpp | 22 ++- .../engines/insert_table/insert_table.h | 20 +- .../engines/insert_table/rt_insertion.cpp | 1 + .../engines/insert_table/rt_insertion.h | 2 +- .../columnshard/engines/insert_table/ya.make | 4 + .../tx/columnshard/engines/portions/ya.make | 4 + .../tx/columnshard/engines/predicate/ya.make | 4 + .../engines/reader/plain_reader/ya.make | 4 + .../tx/columnshard/engines/reader/ya.make | 4 + .../tx/columnshard/engines/scheme/ya.make | 4 + .../storage/optimizer/abstract/optimizer.h | 12 +- .../storage/optimizer/abstract/ya.make | 4 + .../storage/optimizer/intervals/optimizer.cpp | 6 +- .../storage/optimizer/intervals/optimizer.h | 2 +- .../storage/optimizer/intervals/ya.make | 4 + .../storage/optimizer/lbuckets/optimizer.h | 82 ++++---- .../storage/optimizer/lbuckets/ya.make | 4 + .../storage/optimizer/levels/optimizer.h | 14 +- .../engines/storage/optimizer/levels/ya.make | 4 + .../engines/storage/optimizer/ut/ya.make | 4 + .../engines/storage/optimizer/ya.make | 4 + .../tx/columnshard/engines/storage/ya.make | 4 + ydb/core/tx/columnshard/engines/ut/ya.make | 5 + .../engines/writer/buffer/actor.cpp | 46 +++++ .../columnshard/engines/writer/buffer/actor.h | 39 ++++ .../engines/writer/buffer/events.cpp | 5 + .../engines/writer/buffer/events.h | 31 ++++ .../columnshard/engines/writer/buffer/ya.make | 20 ++ .../writer/indexed_blob_constructor.cpp | 46 +++-- .../engines/writer/indexed_blob_constructor.h | 175 +++++++++++++++++- .../engines/writer/write_controller.h | 19 +- .../tx/columnshard/engines/writer/ya.make | 5 + ydb/core/tx/columnshard/engines/ya.make | 5 + 58 files changed, 734 insertions(+), 234 deletions(-) create mode 100644 ydb/core/tx/columnshard/engines/writer/buffer/actor.cpp create mode 100644 ydb/core/tx/columnshard/engines/writer/buffer/actor.h create mode 100644 ydb/core/tx/columnshard/engines/writer/buffer/events.cpp create mode 100644 ydb/core/tx/columnshard/engines/writer/buffer/events.h create mode 100644 ydb/core/tx/columnshard/engines/writer/buffer/ya.make diff --git a/ydb/core/tx/columnshard/blobs_action/abstract/read.cpp b/ydb/core/tx/columnshard/blobs_action/abstract/read.cpp index 1f0bc8559fb8..766dd21da79d 100644 --- a/ydb/core/tx/columnshard/blobs_action/abstract/read.cpp +++ b/ydb/core/tx/columnshard/blobs_action/abstract/read.cpp @@ -73,9 +73,9 @@ void IBlobsReadingAction::OnReadError(const TBlobRange& range, const TErrorStatu void IBlobsReadingAction::AddRange(const TBlobRange& range, const TString& result /*= Default()*/) { Y_ABORT_UNLESS(!Started); if (!result) { - Y_ABORT_UNLESS(RangesForRead[range.BlobId].emplace(range).second); + AFL_VERIFY(RangesForRead[range.BlobId].emplace(range).second)("range", range.ToString()); } else { - Y_ABORT_UNLESS(RangesForResult.emplace(range, result).second); + AFL_VERIFY(RangesForResult.emplace(range, result).second)("range", range.ToString()); } } diff --git a/ydb/core/tx/columnshard/blobs_action/abstract/ya.make b/ydb/core/tx/columnshard/blobs_action/abstract/ya.make index 2f0b074a602a..f27dcb2c9ace 100644 --- a/ydb/core/tx/columnshard/blobs_action/abstract/ya.make +++ b/ydb/core/tx/columnshard/blobs_action/abstract/ya.make @@ -1,5 +1,9 @@ LIBRARY() +OWNER( + g:kikimr +) + SRCS( gc.cpp common.cpp diff --git a/ydb/core/tx/columnshard/blobs_action/bs/ya.make b/ydb/core/tx/columnshard/blobs_action/bs/ya.make index 5974c3731cee..b6940aeec3f6 100644 --- a/ydb/core/tx/columnshard/blobs_action/bs/ya.make +++ b/ydb/core/tx/columnshard/blobs_action/bs/ya.make @@ -1,5 +1,9 @@ LIBRARY() +OWNER( + g:kikimr +) + SRCS( gc.cpp gc_actor.cpp diff --git a/ydb/core/tx/columnshard/blobs_action/counters/write.cpp b/ydb/core/tx/columnshard/blobs_action/counters/write.cpp index 5604d9d98b5a..25ef9ed6d1b2 100644 --- a/ydb/core/tx/columnshard/blobs_action/counters/write.cpp +++ b/ydb/core/tx/columnshard/blobs_action/counters/write.cpp @@ -13,6 +13,8 @@ TWriteCounters::TWriteCounters(const TConsumerCounters& owner) ReplyBytes = TBase::GetDeriviative("Replies/Bytes"); ReplyDurationBySize = TBase::GetHistogram("Replies/Duration/Bytes", NMonitoring::ExponentialHistogram(15, 2, 1)); ReplyDurationByCount = TBase::GetHistogram("Replies/Duration/Count", NMonitoring::ExponentialHistogram(15, 2, 1)); + WritesBySize = TBase::GetHistogram("Writes/Bytes", NMonitoring::ExponentialHistogram(25, 2, 1)); + VolumeByChunkSize = TBase::GetHistogram("Volume/Bytes", NMonitoring::ExponentialHistogram(25, 2, 1)); FailsCount = TBase::GetDeriviative("Fails/Count"); FailBytes = TBase::GetDeriviative("Fails/Bytes"); diff --git a/ydb/core/tx/columnshard/blobs_action/counters/write.h b/ydb/core/tx/columnshard/blobs_action/counters/write.h index 591937f0de2b..a9b7f0282817 100644 --- a/ydb/core/tx/columnshard/blobs_action/counters/write.h +++ b/ydb/core/tx/columnshard/blobs_action/counters/write.h @@ -16,6 +16,8 @@ class TWriteCounters: public NColumnShard::TCommonCountersOwner { NMonitoring::TDynamicCounters::TCounterPtr ReplyBytes; NMonitoring::THistogramPtr ReplyDurationByCount; NMonitoring::THistogramPtr ReplyDurationBySize; + NMonitoring::THistogramPtr WritesBySize; + NMonitoring::THistogramPtr VolumeByChunkSize; NMonitoring::TDynamicCounters::TCounterPtr FailsCount; NMonitoring::TDynamicCounters::TCounterPtr FailBytes; @@ -34,6 +36,8 @@ class TWriteCounters: public NColumnShard::TCommonCountersOwner { ReplyBytes->Add(bytes); ReplyDurationByCount->Collect((i64)d.MilliSeconds()); ReplyDurationBySize->Collect((i64)d.MilliSeconds(), (i64)bytes); + WritesBySize->Collect((i64)bytes); + VolumeByChunkSize->Collect((i64)bytes, (i64)bytes); } void OnFail(const ui64 bytes, const TDuration d) const { diff --git a/ydb/core/tx/columnshard/blobs_action/counters/ya.make b/ydb/core/tx/columnshard/blobs_action/counters/ya.make index 34b9f0747546..b2fabf7598ff 100644 --- a/ydb/core/tx/columnshard/blobs_action/counters/ya.make +++ b/ydb/core/tx/columnshard/blobs_action/counters/ya.make @@ -1,5 +1,9 @@ LIBRARY() +OWNER( + g:kikimr +) + SRCS( read.cpp storage.cpp diff --git a/ydb/core/tx/columnshard/blobs_action/tier/ya.make b/ydb/core/tx/columnshard/blobs_action/tier/ya.make index 2df622ce24e0..57bcad5afa37 100644 --- a/ydb/core/tx/columnshard/blobs_action/tier/ya.make +++ b/ydb/core/tx/columnshard/blobs_action/tier/ya.make @@ -1,5 +1,9 @@ LIBRARY() +OWNER( + g:kikimr +) + SRCS( adapter.cpp gc.cpp diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp index 26cedb6a87db..3b6529beed4c 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp @@ -14,9 +14,7 @@ bool TTxInsertTableCleanup::Execute(TTransactionContext& txc, const TActorContex auto storage = Self->StoragesManager->GetInsertOperator(); BlobsAction = storage->StartDeclareRemovingAction("TX_CLEANUP"); for (auto& [abortedWriteId, abortedData] : allAborted) { - Self->InsertTable->EraseAborted(dbTable, abortedData); - Y_ABORT_UNLESS(abortedData.GetBlobRange().IsFullBlob()); - BlobsAction->DeclareRemove(abortedData.GetBlobRange().GetBlobId()); + Self->InsertTable->EraseAborted(dbTable, abortedData, BlobsAction); } BlobsAction->OnExecuteTxAfterRemoving(*Self, blobManagerDb, true); return true; diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp index 16f5105e670f..6eca7b4878b0 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp @@ -1,26 +1,27 @@ #include "tx_write.h" namespace NKikimr::NColumnShard { -bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWriteBlobsResult::TPutBlobData& blobData, const TWriteId writeId, const TString& blob) { - const NKikimrTxColumnShard::TLogicalMetadata& meta = blobData.GetLogicalMeta(); - - const auto& blobRange = blobData.GetBlobRange(); +bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TWriteId writeId) { + NKikimrTxColumnShard::TLogicalMetadata meta; + meta.SetNumRows(batch->GetRowsCount()); + meta.SetRawBytes(batch->GetRawBytes()); + meta.SetDirtyWriteTimeSeconds(batch.GetStartInstant().Seconds()); + meta.SetSpecialKeysRawData(batch->GetSpecialKeysSafe().SerializeToString()); + + const auto& blobRange = batch.GetRange(); Y_ABORT_UNLESS(blobRange.GetBlobId().IsValid()); // First write wins TBlobGroupSelector dsGroupSelector(Self->Info()); NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector); - const auto& writeMeta(PutBlobResult->Get()->GetWriteMeta()); - - auto tableSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaVerified(PutBlobResult->Get()->GetSchemaVersion()); + const auto& writeMeta = batch.GetAggregation().GetWriteData()->GetWriteMeta(); + auto schemeVersion = batch.GetAggregation().GetWriteData()->GetData()->GetSchemaVersion(); + auto tableSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaVerified(schemeVersion); - NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), blobRange, meta, tableSchema->GetVersion(), blob); + NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), blobRange, meta, tableSchema->GetVersion(), batch->GetData()); bool ok = Self->InsertTable->Insert(dbTable, std::move(insertData)); if (ok) { - // Put new data into blob cache - Y_ABORT_UNLESS(blobRange.IsFullBlob()); - Self->UpdateInsertTableCounters(); return true; } @@ -31,62 +32,89 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWrit bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "execute"); ACFL_DEBUG("event", "start_execute"); - const auto& writeMeta(PutBlobResult->Get()->GetWriteMeta()); - Y_ABORT_UNLESS(Self->TablesManager.IsReadyForWrite(writeMeta.GetTableId())); - - txc.DB.NoMoreReadsForTx(); - TWriteOperation::TPtr operation; - if (writeMeta.HasLongTxId()) { - AFL_VERIFY(PutBlobResult->Get()->GetBlobData().size() == 1)("count", PutBlobResult->Get()->GetBlobData().size()); - } else { - operation = Self->OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId()); - Y_ABORT_UNLESS(operation); - Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started); - } + const NOlap::TWritingBuffer& buffer = PutBlobResult->Get()->MutableWritesBuffer(); + for (auto&& aggr : buffer.GetAggregations()) { + const auto& writeMeta = aggr->GetWriteData()->GetWriteMeta(); + Y_ABORT_UNLESS(Self->TablesManager.IsReadyForWrite(writeMeta.GetTableId())); + txc.DB.NoMoreReadsForTx(); + TWriteOperation::TPtr operation; + if (writeMeta.HasLongTxId()) { + AFL_VERIFY(aggr->GetSplittedBlobs().size() == 1)("count", aggr->GetSplittedBlobs().size()); + } else { + operation = Self->OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId()); + Y_ABORT_UNLESS(operation); + Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started); + } - TVector writeIds; - for (auto blobData : PutBlobResult->Get()->GetBlobData()) { auto writeId = TWriteId(writeMeta.GetWriteId()); - if (operation) { - writeId = Self->BuildNextWriteId(txc); - } else { + if (!operation) { NIceDb::TNiceDb db(txc.DB); writeId = Self->GetLongTxWrite(db, writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId()); + aggr->AddWriteId(writeId); } - if (!InsertOneBlob(txc, blobData, writeId, PutBlobResult->Get()->GetBlobVerified(blobData.GetBlobRange()))) { - LOG_S_DEBUG(TxPrefix() << "duplicate writeId " << (ui64)writeId << TxSuffix()); - Self->IncCounter(COUNTER_WRITE_DUPLICATE); + for (auto&& i : aggr->GetSplittedBlobs()) { + if (operation) { + writeId = Self->BuildNextWriteId(txc); + aggr->AddWriteId(writeId); + } + + if (!InsertOneBlob(txc, i, writeId)) { + LOG_S_DEBUG(TxPrefix() << "duplicate writeId " << (ui64)writeId << TxSuffix()); + Self->IncCounter(COUNTER_WRITE_DUPLICATE); + } } - writeIds.push_back(writeId); } TBlobManagerDb blobManagerDb(txc.DB); - AFL_VERIFY(PutBlobResult->Get()->GetActions().size() == 1); - AFL_VERIFY(PutBlobResult->Get()->GetActions().front()->GetBlobsCount() == PutBlobResult->Get()->GetBlobData().size()); - for (auto&& i : PutBlobResult->Get()->GetActions()) { + AFL_VERIFY(buffer.GetAddActions().size() == 1); + for (auto&& i : buffer.GetAddActions()) { i->OnExecuteTxAfterWrite(*Self, blobManagerDb, true); } - - if (operation) { - operation->OnWriteFinish(txc, writeIds); - auto txInfo = Self->ProgressTxController->RegisterTxWithDeadline(operation->GetTxId(), NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, "", writeMeta.GetSource(), 0, txc); - Y_UNUSED(txInfo); - NEvents::TDataEvents::TCoordinatorInfo tInfo = Self->ProgressTxController->GetCoordinatorInfo(operation->GetTxId()); - Result = NEvents::TDataEvents::TEvWriteResult::BuildPrepared(Self->TabletID(), operation->GetTxId(), tInfo); - } else { - Y_ABORT_UNLESS(writeIds.size() == 1); - Result = std::make_unique(Self->TabletID(), writeMeta, (ui64)writeIds.front(), NKikimrTxColumnShard::EResultStatus::SUCCESS); + for (auto&& i : buffer.GetRemoveActions()) { + i->OnExecuteTxAfterRemoving(*Self, blobManagerDb, true); + } + for (auto&& aggr : buffer.GetAggregations()) { + const auto& writeMeta = aggr->GetWriteData()->GetWriteMeta(); + std::unique_ptr result; + TWriteOperation::TPtr operation; + if (!writeMeta.HasLongTxId()) { + operation = Self->OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId()); + Y_ABORT_UNLESS(operation); + Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started); + } + if (operation) { + operation->OnWriteFinish(txc, aggr->GetWriteIds()); + auto txInfo = Self->ProgressTxController->RegisterTxWithDeadline(operation->GetTxId(), NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, "", writeMeta.GetSource(), 0, txc); + Y_UNUSED(txInfo); + NEvents::TDataEvents::TCoordinatorInfo tInfo = Self->ProgressTxController->GetCoordinatorInfo(operation->GetTxId()); + Results.emplace_back(NEvents::TDataEvents::TEvWriteResult::BuildPrepared(Self->TabletID(), operation->GetTxId(), tInfo)); + } else { + Y_ABORT_UNLESS(aggr->GetWriteIds().size() == 1); + Results.emplace_back(std::make_unique(Self->TabletID(), writeMeta, (ui64)aggr->GetWriteIds().front(), NKikimrTxColumnShard::EResultStatus::SUCCESS)); + } } return true; } void TTxWrite::Complete(const TActorContext& ctx) { NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "complete"); - AFL_VERIFY(Result); - Self->CSCounters.OnWriteTxComplete((TMonotonic::Now() - PutBlobResult->Get()->GetWriteMeta().GetWriteStartInstant()).MilliSeconds()); - Self->CSCounters.OnSuccessWriteResponse(); - ctx.Send(PutBlobResult->Get()->GetWriteMeta().GetSource(), Result.release()); + const auto now = TMonotonic::Now(); + const NOlap::TWritingBuffer& buffer = PutBlobResult->Get()->MutableWritesBuffer(); + for (auto&& i : buffer.GetAddActions()) { + i->OnCompleteTxAfterWrite(*Self); + } + for (auto&& i : buffer.GetRemoveActions()) { + i->OnCompleteTxAfterRemoving(*Self); + } + AFL_VERIFY(buffer.GetAggregations().size() == Results.size()); + for (ui32 i = 0; i < buffer.GetAggregations().size(); ++i) { + const auto& writeMeta = buffer.GetAggregations()[i]->GetWriteData()->GetWriteMeta(); + ctx.Send(writeMeta.GetSource(), Results[i].release()); + Self->CSCounters.OnWriteTxComplete((now - writeMeta.GetWriteStartInstant()).MilliSeconds()); + Self->CSCounters.OnSuccessWriteResponse(); + } + } } diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h index c0b6688c3064..6086542940f6 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h @@ -1,5 +1,6 @@ #pragma once #include +#include namespace NKikimr::NColumnShard { @@ -15,12 +16,12 @@ class TTxWrite : public TTransactionBase { void Complete(const TActorContext& ctx) override; TTxType GetTxType() const override { return TXTYPE_WRITE; } - bool InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWriteBlobsResult::TPutBlobData& blobData, const TWriteId writeId, const TString& blob); + bool InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TWriteId writeId); private: TEvPrivate::TEvWriteBlobsResult::TPtr PutBlobResult; const ui32 TabletTxNo; - std::unique_ptr Result; + std::vector> Results; TStringBuilder TxPrefix() const { return TStringBuilder() << "TxWrite[" << ToString(TabletTxNo) << "] "; diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp index 3d5746d91fdc..e163888990a0 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp @@ -64,7 +64,6 @@ void TTxWriteIndex::Complete(const TActorContext& ctx) { Self->EnqueueBackgroundActivities(false, TriggerActivity); } - Self->UpdateResourceMetrics(ctx, Ev->Get()->PutResult->GetResourceUsage()); changes->MutableBlobsAction().OnCompleteTxAfterAction(*Self); NYDBTest::TControllers::GetColumnShardController()->OnWriteIndexComplete(Self->TabletID(), changes->TypeString()); } diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/ya.make b/ydb/core/tx/columnshard/blobs_action/transaction/ya.make index 27268e5fd7f1..bab203fab75c 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/ya.make +++ b/ydb/core/tx/columnshard/blobs_action/transaction/ya.make @@ -1,5 +1,9 @@ LIBRARY() +OWNER( + g:kikimr +) + SRCS( tx_draft.cpp tx_write.cpp diff --git a/ydb/core/tx/columnshard/blobs_action/ya.make b/ydb/core/tx/columnshard/blobs_action/ya.make index 320cb63e4e79..fbb7ffa4b7f0 100644 --- a/ydb/core/tx/columnshard/blobs_action/ya.make +++ b/ydb/core/tx/columnshard/blobs_action/ya.make @@ -1,5 +1,9 @@ LIBRARY() +OWNER( + g:kikimr +) + SRCS( blob_manager_db.cpp memory.cpp diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 73b89494a7ad..101d3a296f96 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -56,55 +56,63 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo auto& putResult = ev->Get()->GetPutResult(); OnYellowChannels(putResult); - const auto& writeMeta = ev->Get()->GetWriteMeta(); + NOlap::TWritingBuffer& wBuffer = ev->Get()->MutableWritesBuffer(); + auto& baseAggregations = wBuffer.GetAggregations(); - if (!TablesManager.IsReadyForWrite(writeMeta.GetTableId())) { - ACFL_ERROR("event", "absent_pathId")("path_id", writeMeta.GetTableId())("has_index", TablesManager.HasPrimaryIndex()); - IncCounter(COUNTER_WRITE_FAIL); - - auto result = std::make_unique(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR); - ctx.Send(writeMeta.GetSource(), result.release()); - CSCounters.OnFailedWriteResponse(); - return; - } + auto wg = WritesMonitor.FinishWrite(wBuffer.GetSumSize(), wBuffer.GetAggregations().size()); - auto wg = WritesMonitor.FinishWrite(putResult.GetResourceUsage().SourceMemorySize); + for (auto&& aggr : baseAggregations) { + const auto& writeMeta = aggr->GetWriteData()->GetWriteMeta(); - if (putResult.GetPutStatus() != NKikimrProto::OK) { - CSCounters.OnWritePutBlobsFail((TMonotonic::Now() - writeMeta.GetWriteStartInstant()).MilliSeconds()); - IncCounter(COUNTER_WRITE_FAIL); + if (!TablesManager.IsReadyForWrite(writeMeta.GetTableId())) { + ACFL_ERROR("event", "absent_pathId")("path_id", writeMeta.GetTableId())("has_index", TablesManager.HasPrimaryIndex()); + IncCounter(COUNTER_WRITE_FAIL); - auto errCode = NKikimrTxColumnShard::EResultStatus::STORAGE_ERROR; - if (putResult.GetPutStatus() == NKikimrProto::TIMEOUT || putResult.GetPutStatus() == NKikimrProto::DEADLINE) { - errCode = NKikimrTxColumnShard::EResultStatus::TIMEOUT; - } else if (putResult.GetPutStatus() == NKikimrProto::TRYLATER || putResult.GetPutStatus() == NKikimrProto::OUT_OF_SPACE) { - errCode = NKikimrTxColumnShard::EResultStatus::OVERLOADED; - } else if (putResult.GetPutStatus() == NKikimrProto::CORRUPTED) { - errCode = NKikimrTxColumnShard::EResultStatus::ERROR; + auto result = std::make_unique(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR); + ctx.Send(writeMeta.GetSource(), result.release()); + CSCounters.OnFailedWriteResponse(); + wBuffer.RemoveData(aggr, StoragesManager->GetInsertOperator()); + continue; } - if (writeMeta.HasLongTxId()) { - auto result = std::make_unique(TabletID(), writeMeta, errCode); - ctx.Send(writeMeta.GetSource(), result.release()); + if (putResult.GetPutStatus() != NKikimrProto::OK) { + CSCounters.OnWritePutBlobsFail((TMonotonic::Now() - writeMeta.GetWriteStartInstant()).MilliSeconds()); + IncCounter(COUNTER_WRITE_FAIL); + + auto errCode = NKikimrTxColumnShard::EResultStatus::STORAGE_ERROR; + if (putResult.GetPutStatus() == NKikimrProto::TIMEOUT || putResult.GetPutStatus() == NKikimrProto::DEADLINE) { + errCode = NKikimrTxColumnShard::EResultStatus::TIMEOUT; + } else if (putResult.GetPutStatus() == NKikimrProto::TRYLATER || putResult.GetPutStatus() == NKikimrProto::OUT_OF_SPACE) { + errCode = NKikimrTxColumnShard::EResultStatus::OVERLOADED; + } else if (putResult.GetPutStatus() == NKikimrProto::CORRUPTED) { + errCode = NKikimrTxColumnShard::EResultStatus::ERROR; + } + + if (writeMeta.HasLongTxId()) { + auto result = std::make_unique(TabletID(), writeMeta, errCode); + ctx.Send(writeMeta.GetSource(), result.release()); + } else { + auto operation = OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId()); + Y_ABORT_UNLESS(operation); + auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), operation->GetTxId(), NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, "put data fails"); + ctx.Send(writeMeta.GetSource(), result.release()); + } + CSCounters.OnFailedWriteResponse(); + wBuffer.RemoveData(aggr, StoragesManager->GetInsertOperator()); } else { - auto operation = OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId()); - Y_ABORT_UNLESS(operation); - auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), operation->GetTxId(), NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, "put data fails"); - ctx.Send(writeMeta.GetSource(), result.release()); + const TMonotonic now = TMonotonic::Now(); + CSCounters.OnWritePutBlobsSuccess((now - writeMeta.GetWriteStartInstant()).MilliSeconds()); + CSCounters.OnWriteMiddle1PutBlobsSuccess((now - writeMeta.GetWriteMiddle1StartInstant()).MilliSeconds()); + CSCounters.OnWriteMiddle2PutBlobsSuccess((now - writeMeta.GetWriteMiddle2StartInstant()).MilliSeconds()); + CSCounters.OnWriteMiddle3PutBlobsSuccess((now - writeMeta.GetWriteMiddle3StartInstant()).MilliSeconds()); + CSCounters.OnWriteMiddle4PutBlobsSuccess((now - writeMeta.GetWriteMiddle4StartInstant()).MilliSeconds()); + CSCounters.OnWriteMiddle5PutBlobsSuccess((now - writeMeta.GetWriteMiddle5StartInstant()).MilliSeconds()); + LOG_S_DEBUG("Write (record) into pathId " << writeMeta.GetTableId() + << (writeMeta.GetWriteId() ? (" writeId " + ToString(writeMeta.GetWriteId())).c_str() : "") << " at tablet " << TabletID()); + } - CSCounters.OnFailedWriteResponse(); - } else { - CSCounters.OnWritePutBlobsSuccess((TMonotonic::Now() - writeMeta.GetWriteStartInstant()).MilliSeconds()); - CSCounters.OnWriteMiddle1PutBlobsSuccess((TMonotonic::Now() - writeMeta.GetWriteMiddle1StartInstant()).MilliSeconds()); - CSCounters.OnWriteMiddle2PutBlobsSuccess((TMonotonic::Now() - writeMeta.GetWriteMiddle2StartInstant()).MilliSeconds()); - CSCounters.OnWriteMiddle3PutBlobsSuccess((TMonotonic::Now() - writeMeta.GetWriteMiddle3StartInstant()).MilliSeconds()); - CSCounters.OnWriteMiddle4PutBlobsSuccess((TMonotonic::Now() - writeMeta.GetWriteMiddle4StartInstant()).MilliSeconds()); - CSCounters.OnWriteMiddle5PutBlobsSuccess((TMonotonic::Now() - writeMeta.GetWriteMiddle5StartInstant()).MilliSeconds()); - LOG_S_DEBUG("Write (record) into pathId " << writeMeta.GetTableId() - << (writeMeta.GetWriteId() ? (" writeId " + ToString(writeMeta.GetWriteId())).c_str() : "") << " at tablet " << TabletID()); - - Execute(new TTxWrite(this, ev), ctx); } + Execute(new TTxWrite(this, ev), ctx); } void TColumnShard::Handle(TEvPrivate::TEvWriteDraft::TPtr& ev, const TActorContext& ctx) { @@ -127,15 +135,23 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex writeMeta.SetLongTxId(NLongTxService::TLongTxId::FromProto(record.GetLongTxId())); writeMeta.SetWritePartId(record.GetWritePartId()); - if (!TablesManager.IsReadyForWrite(tableId)) { - LOG_S_NOTICE("Write (fail) into pathId:" << writeMeta.GetTableId() << (TablesManager.HasPrimaryIndex()? "": " no index") - << " at tablet " << TabletID()); - IncCounter(COUNTER_WRITE_FAIL); + const auto returnFail = [&](const NColumnShard::ECumulativeCounters signalIndex) { + IncCounter(signalIndex); - auto result = std::make_unique(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR); - ctx.Send(source, result.release()); + ctx.Send(source, std::make_unique(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR)); CSCounters.OnFailedWriteResponse(); return; + }; + + if (!AppDataVerified().ColumnShardConfig.GetWritingEnabled()) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_writing")("reason", "disabled"); + return returnFail(COUNTER_WRITE_FAIL); + } + + if (!TablesManager.IsReadyForWrite(tableId)) { + LOG_S_NOTICE("Write (fail) into pathId:" << writeMeta.GetTableId() << (TablesManager.HasPrimaryIndex()? "": " no index") + << " at tablet " << TabletID()); + return returnFail(COUNTER_WRITE_FAIL); } const auto& snapshotSchema = TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema(); @@ -143,21 +159,17 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex if (!arrowData->ParseFromProto(record)) { LOG_S_ERROR("Write (fail) " << record.GetData().size() << " bytes into pathId " << writeMeta.GetTableId() << " at tablet " << TabletID()); - IncCounter(COUNTER_WRITE_FAIL); - auto result = std::make_unique(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR); - ctx.Send(source, result.release()); - CSCounters.OnFailedWriteResponse(); - return; + return returnFail(COUNTER_WRITE_FAIL); } - NEvWrite::TWriteData writeData(writeMeta, arrowData); + NEvWrite::TWriteData writeData(writeMeta, arrowData, snapshotSchema->GetIndexInfo().GetReplaceKey(), StoragesManager->GetInsertOperator()->StartWritingAction("WRITING")); auto overloadStatus = CheckOverloaded(tableId); if (overloadStatus != EOverloadStatus::None) { std::unique_ptr result = std::make_unique(TabletID(), writeData.GetWriteMeta(), NKikimrTxColumnShard::EResultStatus::OVERLOADED); OverloadWriteFail(overloadStatus, writeData, std::move(result), ctx); CSCounters.OnFailedWriteResponse(); } else { - if (ui64 writeId = (ui64) HasLongTxWrite(writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId())) { + if (ui64 writeId = (ui64)HasLongTxWrite(writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId())) { LOG_S_DEBUG("Write (duplicate) into pathId " << writeMeta.GetTableId() << " longTx " << writeMeta.GetLongTxIdUnsafe().ToString() << " at tablet " << TabletID()); @@ -178,8 +190,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex << WritesMonitor.DebugString() << " at tablet " << TabletID()); writeData.MutableWriteMeta().SetWriteMiddle1StartInstant(TMonotonic::Now()); - std::shared_ptr task = std::make_shared(TabletID(), SelfId(), - StoragesManager->GetInsertOperator()->StartWritingAction("WRITING"), writeData, snapshotSchema->GetIndexInfo().GetReplaceKey()); + std::shared_ptr task = std::make_shared(TabletID(), SelfId(), BufferizationWriteActorId, std::move(writeData)); NConveyor::TInsertServiceOperator::AsyncTaskToExecute(task); } } @@ -240,7 +251,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor auto overloadStatus = CheckOverloaded(tableId); if (overloadStatus != EOverloadStatus::None) { - NEvWrite::TWriteData writeData(NEvWrite::TWriteMeta(0, tableId, source), arrowData); + NEvWrite::TWriteData writeData(NEvWrite::TWriteMeta(0, tableId, source), arrowData, nullptr, nullptr); std::unique_ptr result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), txId, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, "overload data error"); OverloadWriteFail(overloadStatus, writeData, std::move(result), ctx); return; diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 73990e2e31e2..2f728fbc01a2 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -345,10 +345,10 @@ class TColumnShard return TGuard(*this); } - TGuard FinishWrite(const ui64 dataSize) { + TGuard FinishWrite(const ui64 dataSize, const ui32 writesCount = 1) { Y_ABORT_UNLESS(WritesInFlight > 0); Y_ABORT_UNLESS(WritesSizeInFlight >= dataSize); - --WritesInFlight; + WritesInFlight -= writesCount; WritesSizeInFlight -= dataSize; return TGuard(*this); } @@ -396,6 +396,7 @@ class TColumnShard TInstant LastStatsReport; TActorId ResourceSubscribeActor; + TActorId BufferizationWriteActorId; TActorId StatsReportPipe; std::shared_ptr StoragesManager; diff --git a/ydb/core/tx/columnshard/columnshard_schema.cpp b/ydb/core/tx/columnshard/columnshard_schema.cpp index b1c173063be2..039faafa8794 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/columnshard_schema.cpp @@ -51,7 +51,18 @@ bool Schema::InsertTable_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsG if (metaStr) { Y_ABORT_UNLESS(meta.ParseFromString(metaStr)); } - TInsertedData data(planStep, writeTxId, pathId, dedupId, NOlap::TBlobRange(blobId, 0, blobId.BlobSize()), meta, schemaVersion, {}); + + std::optional rangeOffset; + if (rowset.HaveValue()) { + rangeOffset = rowset.GetValue(); + } + std::optional rangeSize; + if (rowset.HaveValue()) { + rangeSize = rowset.GetValue(); + } + + AFL_VERIFY(!!rangeOffset == !!rangeSize); + TInsertedData data(planStep, writeTxId, pathId, dedupId, NOlap::TBlobRange(blobId, rangeOffset.value_or(0), rangeSize.value_or(blobId.BlobSize())), meta, schemaVersion, {}); switch (recType) { case EInsertTableIds::Inserted: @@ -64,7 +75,6 @@ bool Schema::InsertTable_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsG insertTable.AddAborted(std::move(data), true); break; } - if (!rowset.Next()) { return false; } diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index b9116a0df772..1dc2ede7060f 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -206,14 +206,17 @@ struct Schema : NIceDb::Schema { struct WriteTxId : Column<3, NScheme::NTypeIds::Uint64> {}; struct PathId : Column<4, NScheme::NTypeIds::Uint64> {}; struct DedupId : Column<5, NScheme::NTypeIds::String> {}; - struct BlobId : Column<6, NScheme::NTypeIds::String> {}; + struct BlobId: Column<6, NScheme::NTypeIds::String> {}; struct Meta : Column<7, NScheme::NTypeIds::String> {}; struct IndexPlanStep : Column<8, NScheme::NTypeIds::Uint64> {}; struct IndexTxId : Column<9, NScheme::NTypeIds::Uint64> {}; struct SchemaVersion : Column<10, NScheme::NTypeIds::Uint64> {}; + struct BlobRangeOffset: Column<11, NScheme::NTypeIds::Uint64> {}; + struct BlobRangeSize: Column<12, NScheme::NTypeIds::Uint64> {}; + using TKey = TableKey; - using TColumns = TableColumns; + using TColumns = TableColumns; }; struct IndexGranules : NIceDb::Schema::Table { @@ -465,6 +468,8 @@ struct Schema : NIceDb::Schema { static void InsertTable_Upsert(NIceDb::TNiceDb& db, EInsertTableIds recType, const TInsertedData& data) { db.Table().Key((ui8)recType, data.PlanStep, data.WriteTxId, data.PathId, data.DedupId).Update( NIceDb::TUpdate(data.GetBlobRange().GetBlobId().ToStringLegacy()), + NIceDb::TUpdate(data.GetBlobRange().Offset), + NIceDb::TUpdate(data.GetBlobRange().Size), NIceDb::TUpdate(data.GetMeta().SerializeToProto().SerializeAsString()), NIceDb::TUpdate(data.GetSchemaVersion()) ); diff --git a/ydb/core/tx/columnshard/engines/changes/abstract/settings.h b/ydb/core/tx/columnshard/engines/changes/abstract/settings.h index e2d0e994d273..ec966965d52d 100644 --- a/ydb/core/tx/columnshard/engines/changes/abstract/settings.h +++ b/ydb/core/tx/columnshard/engines/changes/abstract/settings.h @@ -12,8 +12,8 @@ struct TCompactionLimits { static constexpr const ui64 DEFAULT_EVICTION_BYTES = 64 * 1024 * 1024; static constexpr const ui64 MAX_BLOBS_TO_DELETE = 10000; - static constexpr const ui64 OVERLOAD_INSERT_TABLE_SIZE_BY_PATH_ID = (ui64)2 << 30; - static constexpr const ui64 WARNING_INSERT_TABLE_SIZE_BY_PATH_ID = 0.3 * OVERLOAD_INSERT_TABLE_SIZE_BY_PATH_ID; + static constexpr const ui64 OVERLOAD_INSERT_TABLE_SIZE_BY_PATH_ID = (ui64)1 << 30; + static constexpr const ui64 WARNING_INSERT_TABLE_SIZE_BY_PATH_ID = 0.5 * OVERLOAD_INSERT_TABLE_SIZE_BY_PATH_ID; static constexpr const ui64 WARNING_INSERT_TABLE_COUNT_BY_PATH_ID = 100; static constexpr const i64 OVERLOAD_GRANULE_SIZE = 20 * MAX_BLOB_SIZE; diff --git a/ydb/core/tx/columnshard/engines/changes/abstract/ya.make b/ydb/core/tx/columnshard/engines/changes/abstract/ya.make index 872a05a8540a..c78a00978832 100644 --- a/ydb/core/tx/columnshard/engines/changes/abstract/ya.make +++ b/ydb/core/tx/columnshard/engines/changes/abstract/ya.make @@ -1,5 +1,9 @@ LIBRARY() +OWNER( + g:kikimr +) + SRCS( abstract.cpp compaction_info.cpp diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/ya.make b/ydb/core/tx/columnshard/engines/changes/compaction/ya.make index aa52c0f9d6a0..71f7f1745e22 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/ya.make +++ b/ydb/core/tx/columnshard/engines/changes/compaction/ya.make @@ -1,5 +1,9 @@ LIBRARY() +OWNER( + g:kikimr +) + SRCS( merge_context.cpp column_cursor.cpp diff --git a/ydb/core/tx/columnshard/engines/changes/counters/ya.make b/ydb/core/tx/columnshard/engines/changes/counters/ya.make index 4434502eefa3..5e97e4266b65 100644 --- a/ydb/core/tx/columnshard/engines/changes/counters/ya.make +++ b/ydb/core/tx/columnshard/engines/changes/counters/ya.make @@ -1,5 +1,9 @@ LIBRARY() +OWNER( + g:kikimr +) + SRCS( general.cpp ) diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp index 9247e37a6461..c68100010934 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp +++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp @@ -15,9 +15,9 @@ bool TInsertColumnEngineChanges::DoApplyChanges(TColumnEngineForLogs& self, TApp void TInsertColumnEngineChanges::DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context) { TBase::DoWriteIndex(self, context); + auto removing = BlobsAction.GetRemoving(IStoragesManager::DefaultStorageId); for (const auto& insertedData : DataToIndex) { - self.InsertTable->EraseCommitted(context.DBWrapper, insertedData); - Y_ABORT_UNLESS(insertedData.GetBlobRange().IsFullBlob()); + self.InsertTable->EraseCommitted(context.DBWrapper, insertedData, removing); } if (!DataToIndex.empty()) { self.UpdateInsertTableCounters(); @@ -27,13 +27,9 @@ void TInsertColumnEngineChanges::DoWriteIndex(NColumnShard::TColumnShard& self, void TInsertColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) { TBase::DoStart(self); Y_ABORT_UNLESS(DataToIndex.size()); - auto removing = BlobsAction.GetRemoving(IStoragesManager::DefaultStorageId); auto reading = BlobsAction.GetReading(IStoragesManager::DefaultStorageId); - for (size_t i = 0; i < DataToIndex.size(); ++i) { - const auto& insertedData = DataToIndex[i]; - Y_ABORT_UNLESS(insertedData.GetBlobRange().IsFullBlob()); + for (auto&& insertedData : DataToIndex) { reading->AddRange(insertedData.GetBlobRange(), insertedData.GetBlobData().value_or("")); - removing->DeclareRemove(insertedData.GetBlobRange().GetBlobId()); } self.BackgroundController.StartIndexing(*this); diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp index bdda3b38c6aa..8e8423ece8eb 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp @@ -58,13 +58,13 @@ bool TChangesWithAppend::DoApplyChanges(TColumnEngineForLogs& self, TApplyChange // Save new portions (their column records) { auto g = self.GranulesStorage->StartPackModification(); - + THashSet usedPortionIds; for (auto& [_, portionInfo] : PortionsToRemove) { Y_ABORT_UNLESS(!portionInfo.Empty()); Y_ABORT_UNLESS(portionInfo.HasRemoveSnapshot()); const TPortionInfo& oldInfo = self.GetGranuleVerified(portionInfo.GetPathId()).GetPortionVerified(portionInfo.GetPortion()); - + AFL_VERIFY(usedPortionIds.emplace(portionInfo->GetPortionId()))("portion_info", portionInfo->DebugString(true)); self.UpsertPortion(portionInfo, &oldInfo); for (auto& record : portionInfo.Records) { @@ -74,6 +74,7 @@ bool TChangesWithAppend::DoApplyChanges(TColumnEngineForLogs& self, TApplyChange for (auto& portionInfoWithBlobs : AppendedPortions) { auto& portionInfo = portionInfoWithBlobs.GetPortionInfo(); Y_ABORT_UNLESS(!portionInfo.Empty()); + AFL_VERIFY(usedPortionIds.emplace(portionInfo->GetPortionId()))("portion_info", portionInfo->DebugString(true)); self.UpsertPortion(portionInfo); for (auto& record : portionInfo.Records) { self.ColumnsTable->Write(context.DB, portionInfo, record); diff --git a/ydb/core/tx/columnshard/engines/changes/ya.make b/ydb/core/tx/columnshard/engines/changes/ya.make index faf74a7c05bb..8a6bc24609d8 100644 --- a/ydb/core/tx/columnshard/engines/changes/ya.make +++ b/ydb/core/tx/columnshard/engines/changes/ya.make @@ -1,5 +1,9 @@ LIBRARY() +OWNER( + g:kikimr +) + SRCS( compaction.cpp ttl.cpp diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.h b/ydb/core/tx/columnshard/engines/db_wrapper.h index c44bc7130011..2767ec4f8848 100644 --- a/ydb/core/tx/columnshard/engines/db_wrapper.h +++ b/ydb/core/tx/columnshard/engines/db_wrapper.h @@ -51,8 +51,7 @@ class TDbWrapper : public IDbWrapper { void EraseCommitted(const TInsertedData& data) override; void EraseAborted(const TInsertedData& data) override; - bool Load(TInsertTableAccessor& insertTable, - const TInstant& loadTime) override; + bool Load(TInsertTableAccessor& insertTable, const TInstant& loadTime) override; void WriteColumn(ui32 index, const NOlap::TPortionInfo& portion, const TColumnRecord& row) override; void EraseColumn(ui32 index, const NOlap::TPortionInfo& portion, const TColumnRecord& row) override; diff --git a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp index 87eaa6adae93..19a5a6eae3ff 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp @@ -7,6 +7,7 @@ namespace NKikimr::NOlap { bool TInsertTable::Insert(IDbWrapper& dbTable, TInsertedData&& data) { if (auto* dataPtr = Summary.AddInserted(std::move(data))) { + AddBlobLink(dataPtr->GetBlobRange().BlobId); dbTable.Insert(*dataPtr); return true; } else { @@ -86,14 +87,16 @@ THashSet TInsertTable::DropPath(IDbWrapper& dbTable, ui64 pathId) { return Summary.GetInsertedByPathId(pathId); } -void TInsertTable::EraseCommitted(IDbWrapper& dbTable, const TInsertedData& data) { +void TInsertTable::EraseCommitted(IDbWrapper& dbTable, const TInsertedData& data, const std::shared_ptr& blobsAction) { if (Summary.EraseCommitted(data)) { + RemoveBlobLink(data.GetBlobRange().BlobId, blobsAction); dbTable.EraseCommitted(data); } } -void TInsertTable::EraseAborted(IDbWrapper& dbTable, const TInsertedData& data) { +void TInsertTable::EraseAborted(IDbWrapper& dbTable, const TInsertedData& data, const std::shared_ptr& blobsAction) { if (Summary.EraseAborted((TWriteId)data.WriteTxId)) { + RemoveBlobLink(data.GetBlobRange().BlobId, blobsAction); dbTable.EraseAborted(data); } } @@ -132,4 +135,19 @@ std::vector TInsertTable::Read(ui64 pathId, const TSnapshot& sna return result; } +bool TInsertTableAccessor::RemoveBlobLink(const TUnifiedBlobId& blobId, const std::shared_ptr& blobsAction) { + AFL_VERIFY(blobsAction); + auto itBlob = BlobLinks.find(blobId); + AFL_VERIFY(itBlob != BlobLinks.end()); + AFL_VERIFY(itBlob->second >= 1); + if (itBlob->second == 1) { + blobsAction->DeclareRemove(itBlob->first); + BlobLinks.erase(itBlob); + return true; + } else { + --itBlob->second; + return false; + } +} + } diff --git a/ydb/core/tx/columnshard/engines/insert_table/insert_table.h b/ydb/core/tx/columnshard/engines/insert_table/insert_table.h index a5dccec179d6..65b8a8ef49be 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/insert_table.h +++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.h @@ -15,18 +15,34 @@ class IDbWrapper; class TInsertTableAccessor { protected: TInsertionSummary Summary; + THashMap BlobLinks; + + void AddBlobLink(const TUnifiedBlobId& blobId) { + ++BlobLinks[blobId]; + } + + bool RemoveBlobLink(const TUnifiedBlobId& blobId, const std::shared_ptr& blobsAction); public: const std::map>& GetPathPriorities() const { return Summary.GetPathPriorities(); } bool AddInserted(TInsertedData&& data, const bool load) { + if (load) { + AddBlobLink(data.GetBlobRange().BlobId); + } return Summary.AddInserted(std::move(data), load); } bool AddAborted(TInsertedData&& data, const bool load) { + if (load) { + AddBlobLink(data.GetBlobRange().BlobId); + } return Summary.AddAborted(std::move(data), load); } bool AddCommitted(TInsertedData&& data, const bool load) { + if (load) { + AddBlobLink(data.GetBlobRange().BlobId); + } const ui64 pathId = data.PathId; return Summary.GetPathInfo(pathId).AddCommitted(std::move(data), load); } @@ -56,8 +72,8 @@ class TInsertTable: public TInsertTableAccessor { void Abort(IDbWrapper& dbTable, const THashSet& writeIds); THashSet OldWritesToAbort(const TInstant& now) const; THashSet DropPath(IDbWrapper& dbTable, ui64 pathId); - void EraseCommitted(IDbWrapper& dbTable, const TInsertedData& key); - void EraseAborted(IDbWrapper& dbTable, const TInsertedData& key); + void EraseCommitted(IDbWrapper& dbTable, const TInsertedData& key, const std::shared_ptr& blobsAction); + void EraseAborted(IDbWrapper& dbTable, const TInsertedData& key, const std::shared_ptr& blobsAction); std::vector Read(ui64 pathId, const TSnapshot& snapshot, const std::shared_ptr& pkSchema) const; bool Load(IDbWrapper& dbTable, const TInstant loadTime); private: diff --git a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp index 3672781917d9..cbe7d900356d 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp @@ -1,5 +1,6 @@ #include "rt_insertion.h" #include +#include namespace NKikimr::NOlap { diff --git a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h index edee77f8dbb2..97c2575c47a4 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h +++ b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h @@ -4,7 +4,7 @@ #include "path_info.h" namespace NKikimr::NOlap { - +class IBlobsDeclareRemovingAction; class TInsertionSummary { public: struct TCounters { diff --git a/ydb/core/tx/columnshard/engines/insert_table/ya.make b/ydb/core/tx/columnshard/engines/insert_table/ya.make index 5f1d92bfb0ee..8ed43df7d430 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/ya.make +++ b/ydb/core/tx/columnshard/engines/insert_table/ya.make @@ -1,5 +1,9 @@ LIBRARY() +OWNER( + g:kikimr +) + SRCS( insert_table.cpp rt_insertion.cpp diff --git a/ydb/core/tx/columnshard/engines/portions/ya.make b/ydb/core/tx/columnshard/engines/portions/ya.make index 7a6c96a9a8a2..58d80bf3f47a 100644 --- a/ydb/core/tx/columnshard/engines/portions/ya.make +++ b/ydb/core/tx/columnshard/engines/portions/ya.make @@ -1,5 +1,9 @@ LIBRARY() +OWNER( + g:kikimr +) + SRCS( portion_info.cpp column_record.cpp diff --git a/ydb/core/tx/columnshard/engines/predicate/ya.make b/ydb/core/tx/columnshard/engines/predicate/ya.make index 10d5ff3b45b7..8deb9ed79b3d 100644 --- a/ydb/core/tx/columnshard/engines/predicate/ya.make +++ b/ydb/core/tx/columnshard/engines/predicate/ya.make @@ -1,5 +1,9 @@ LIBRARY() +OWNER( + g:kikimr +) + SRCS( container.cpp range.cpp diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/ya.make b/ydb/core/tx/columnshard/engines/reader/plain_reader/ya.make index c9224b8d780c..1be24a42352a 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/ya.make +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/ya.make @@ -1,5 +1,9 @@ LIBRARY() +OWNER( + g:kikimr +) + SRCS( scanner.cpp source.cpp diff --git a/ydb/core/tx/columnshard/engines/reader/ya.make b/ydb/core/tx/columnshard/engines/reader/ya.make index f673de8200e8..45c372805a21 100644 --- a/ydb/core/tx/columnshard/engines/reader/ya.make +++ b/ydb/core/tx/columnshard/engines/reader/ya.make @@ -1,5 +1,9 @@ LIBRARY() +OWNER( + g:kikimr +) + SRCS( conveyor_task.cpp description.cpp diff --git a/ydb/core/tx/columnshard/engines/scheme/ya.make b/ydb/core/tx/columnshard/engines/scheme/ya.make index bace1130d669..8cbb19be7f98 100644 --- a/ydb/core/tx/columnshard/engines/scheme/ya.make +++ b/ydb/core/tx/columnshard/engines/scheme/ya.make @@ -1,5 +1,9 @@ LIBRARY() +OWNER( + g:kikimr +) + SRCS( abstract_scheme.cpp snapshot_scheme.cpp diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h index d1d0ddfe1e77..9e9d1e222e3f 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h @@ -54,7 +54,7 @@ class IOptimizerPlanner { const ui64 PathId; YDB_READONLY(TInstant, ActualizationInstant, TInstant::Zero()); protected: - virtual void DoModifyPortions(const std::vector>& add, const std::vector>& remove) = 0; + virtual void DoModifyPortions(const THashMap>& add, const THashMap>& remove) = 0; virtual std::shared_ptr DoGetOptimizationTask(const TCompactionLimits& limits, std::shared_ptr granule, const THashSet& busyPortions) const = 0; virtual TOptimizationPriority DoGetUsefulMetric() const = 0; virtual void DoActualize(const TInstant currentInstant) = 0; @@ -76,15 +76,15 @@ class IOptimizerPlanner { class TModificationGuard: TNonCopyable { private: IOptimizerPlanner& Owner; - std::vector> AddPortions; - std::vector> RemovePortions; + THashMap> AddPortions; + THashMap> RemovePortions; public: TModificationGuard& AddPortion(const std::shared_ptr& portion) { if (HasAppData() && AppDataVerified().ColumnShardConfig.GetSkipOldGranules() && portion->GetDeprecatedGranuleId() > 0) { AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_granule")("granule_id", portion->GetDeprecatedGranuleId()); return *this; } - AddPortions.emplace_back(portion); + AFL_VERIFY(AddPortions.emplace(portion->GetPortionId(), portion).second); return*this; } @@ -93,7 +93,7 @@ class IOptimizerPlanner { AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_granule")("granule_id", portion->GetDeprecatedGranuleId()); return *this; } - RemovePortions.emplace_back(portion); + AFL_VERIFY(RemovePortions.emplace(portion->GetPortionId(), portion).second); return*this; } @@ -121,7 +121,7 @@ class IOptimizerPlanner { return DoSerializeToJsonVisual(); } - void ModifyPortions(const std::vector>& add, const std::vector>& remove) { + void ModifyPortions(const THashMap>& add, const THashMap>& remove) { NActors::TLogContextGuard g(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("path_id", PathId)); DoModifyPortions(add, remove); } diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/ya.make b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/ya.make index 140b1ed351bf..3e623be27d15 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/ya.make +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/ya.make @@ -1,5 +1,9 @@ LIBRARY() +OWNER( + g:kikimr +) + SRCS( optimizer.cpp ) diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.cpp index acab75cd86aa..e7dca2655155 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.cpp +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.cpp @@ -141,12 +141,12 @@ void TIntervalsOptimizerPlanner::AddPortion(const std::shared_ptr& AFL_VERIFY(RangedSegments.empty() == Positions.empty())("rs_size", RangedSegments.size())("p_size", Positions.size()); } -void TIntervalsOptimizerPlanner::DoModifyPortions(const std::vector>& add, const std::vector>& remove) { - for (auto&& i : remove) { +void TIntervalsOptimizerPlanner::DoModifyPortions(const THashMap>& add, const THashMap>& remove) { + for (auto&& [_, i] : remove) { SizeProblemBlobs.RemovePortion(i); RemovePortion(i); } - for (auto&& i : add) { + for (auto&& [_, i] : add) { SizeProblemBlobs.AddPortion(i); AddPortion(i); } diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.h index 5d7d0cd1efb0..80f3a5edc583 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.h @@ -243,7 +243,7 @@ class TIntervalsOptimizerPlanner: public IOptimizerPlanner { std::vector> GetPortionsForIntervalStartedIn(const NArrow::TReplaceKey& keyStart, const ui32 countExpectation) const; protected: - virtual void DoModifyPortions(const std::vector>& add, const std::vector>& remove) override; + virtual void DoModifyPortions(const THashMap>& add, const THashMap>& remove) override; virtual std::shared_ptr DoGetOptimizationTask(const TCompactionLimits& limits, std::shared_ptr granule, const THashSet& busyPortions) const override; virtual TOptimizationPriority DoGetUsefulMetric() const override; diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/ya.make b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/ya.make index f76c42447ab7..04930f12fcff 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/ya.make +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/ya.make @@ -1,5 +1,9 @@ LIBRARY() +OWNER( + g:kikimr +) + SRCS( optimizer.cpp blob_size.cpp diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h index f33de360b57b..c286c92ef240 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h @@ -122,16 +122,6 @@ class TPortionsPool { } } - bool AddFuture(const std::shared_ptr& portion) { - auto portionMaxSnapshotInstant = TInstant::MilliSeconds(portion->RecordSnapshotMax().GetPlanStep()); - if (Futures[portionMaxSnapshotInstant].emplace(portion->GetPortionId(), portion).second) { - Counters->FuturePortions->AddPortion(portion); - return true; - } else { - return false; - } - } - bool RemoveFutures(const TInstant instant) { auto itFutures = Futures.find(instant); if (itFutures == Futures.end()) { @@ -144,26 +134,14 @@ class TPortionsPool { return true; } - bool RemoveFutures(const TInstant instant, const std::vector>& portions) { - if (portions.empty()) { + bool AddFuture(const std::shared_ptr& portion) { + auto portionMaxSnapshotInstant = TInstant::MilliSeconds(portion->RecordSnapshotMax().GetPlanStep()); + if (Futures[portionMaxSnapshotInstant].emplace(portion->GetPortionId(), portion).second) { + Counters->FuturePortions->AddPortion(portion); return true; - } - auto itFutures = Futures.find(instant); - if (itFutures == Futures.end()) { + } else { return false; } - bool hasAbsent = false; - for (auto&& i : portions) { - if (!itFutures->second.erase(i->GetPortionId())) { - hasAbsent = true; - } else { - Counters->FuturePortions->RemovePortion(i); - } - } - if (itFutures->second.empty()) { - Futures.erase(itFutures); - } - return !hasAbsent; } bool AddFutures(const TInstant instant, const THashMap>& portions) { @@ -197,6 +175,29 @@ class TPortionsPool { } return true; } + + bool RemoveFutures(const TInstant instant, const std::vector>& portions) { + if (portions.empty()) { + return true; + } + auto itFutures = Futures.find(instant); + if (itFutures == Futures.end()) { + return false; + } + bool hasAbsent = false; + for (auto&& i : portions) { + if (!itFutures->second.erase(i->GetPortionId())) { + hasAbsent = true; + } else { + Counters->FuturePortions->RemovePortion(i); + } + } + if (itFutures->second.empty()) { + Futures.erase(itFutures); + } + return !hasAbsent; + } + public: bool Validate(const std::shared_ptr& portion) const { if (portion) { @@ -387,14 +388,17 @@ class TPortionsPool { } } - void Remove(const std::shared_ptr& portion) { + bool Remove(const std::shared_ptr& portion) Y_WARN_UNUSED_RESULT { if (RemovePreActual(portion)) { - return; + return true; } if (RemoveActual(portion)) { - return; + return true; + } + if (RemoveFuture(portion)) { + return true; } - AFL_VERIFY(RemoveFuture(portion)); + return false; } void MergeFrom(TPortionsPool& source) { @@ -778,7 +782,7 @@ class TPortionsBucket: public TMoveOnly { void RemoveOther(const std::shared_ptr& portion) { auto gChartsThis = StartModificationGuard(); - Others.Remove(portion); + AFL_VERIFY(Others.Remove(portion))("portion", portion->DebugString())("bucket_start", MainPortion ? MainPortion->DebugString(true) : "-inf")("bucket_finish", NextBorder ? NextBorder->DebugString() : "undef"); } void MergeOthersFrom(TPortionsBucket& dest) { @@ -1016,25 +1020,25 @@ class TOptimizerPlanner: public IOptimizerPlanner { TPortionBuckets Buckets; const std::shared_ptr StoragesManager; protected: - virtual void DoModifyPortions(const std::vector>& add, const std::vector>& remove) override { + virtual void DoModifyPortions(const THashMap>& add, const THashMap>& remove) override { const TInstant now = TInstant::Now(); - for (auto&& i : add) { + for (auto&& [_, i] : remove) { if (i->GetMeta().GetTierName() != IStoragesManager::DefaultStorageId && i->GetMeta().GetTierName() != "") { continue; } + Buckets.RemovePortion(i); if (Buckets.IsEmpty()) { - Counters->OptimizersCount->Add(1); + Counters->OptimizersCount->Sub(1); } - Buckets.AddPortion(i, now); } - for (auto&& i : remove) { + for (auto&& [_, i] : add) { if (i->GetMeta().GetTierName() != IStoragesManager::DefaultStorageId && i->GetMeta().GetTierName() != "") { continue; } - Buckets.RemovePortion(i); if (Buckets.IsEmpty()) { - Counters->OptimizersCount->Sub(1); + Counters->OptimizersCount->Add(1); } + Buckets.AddPortion(i, now); } } virtual std::shared_ptr DoGetOptimizationTask(const TCompactionLimits& limits, std::shared_ptr granule, const THashSet& busyPortions) const override { diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/ya.make b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/ya.make index 3f96a5717477..26c76d72d790 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/ya.make +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/ya.make @@ -1,5 +1,9 @@ LIBRARY() +OWNER( + g:kikimr +) + SRCS( optimizer.cpp counters.cpp diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.h index 46a9988a130d..c670cd67716b 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.h @@ -472,26 +472,26 @@ class TLevelsOptimizerPlanner: public IOptimizerPlanner { return {}; } - virtual void DoModifyPortions(const std::vector>& add, const std::vector>& remove) override { + virtual void DoModifyPortions(const THashMap>& add, const THashMap>& remove) override { const TInstant currentInstant = TInstant::Now(); - for (auto&& i : add) { + for (auto&& [_, i] : remove) { if (i->GetMeta().GetTierName() != IStoragesManager::DefaultStorageId && i->GetMeta().GetTierName() != "") { continue; } if (!i->GetMeta().RecordSnapshotMax) { - LMax->AddPortion(i, currentInstant); + LMax->RemovePortion(i, currentInstant); } else { - LStart->AddPortion(i, currentInstant); + LStart->RemovePortion(i, currentInstant); } } - for (auto&& i : remove) { + for (auto&& [_, i] : add) { if (i->GetMeta().GetTierName() != IStoragesManager::DefaultStorageId && i->GetMeta().GetTierName() != "") { continue; } if (!i->GetMeta().RecordSnapshotMax) { - LMax->RemovePortion(i, currentInstant); + LMax->AddPortion(i, currentInstant); } else { - LStart->RemovePortion(i, currentInstant); + LStart->AddPortion(i, currentInstant); } } } diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/ya.make b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/ya.make index 3f96a5717477..26c76d72d790 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/ya.make +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/ya.make @@ -1,5 +1,9 @@ LIBRARY() +OWNER( + g:kikimr +) + SRCS( optimizer.cpp counters.cpp diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/ut/ya.make b/ydb/core/tx/columnshard/engines/storage/optimizer/ut/ya.make index 9b7fcea27fe8..65adb6f4ca07 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/ut/ya.make +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/ut/ya.make @@ -1,5 +1,9 @@ UNITTEST_FOR(ydb/core/tx/columnshard/engines/storage/optimizer) +OWNER( + g:kikimr +) + SIZE(SMALL) PEERDIR( diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/ya.make b/ydb/core/tx/columnshard/engines/storage/optimizer/ya.make index e1362859aaea..d678a1dcc8f7 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/ya.make +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/ya.make @@ -1,5 +1,9 @@ LIBRARY() +OWNER( + g:kikimr +) + PEERDIR( ydb/core/tx/columnshard/engines/storage/optimizer/abstract ydb/core/tx/columnshard/engines/storage/optimizer/intervals diff --git a/ydb/core/tx/columnshard/engines/storage/ya.make b/ydb/core/tx/columnshard/engines/storage/ya.make index 811707b20d2d..3b45c823bc22 100644 --- a/ydb/core/tx/columnshard/engines/storage/ya.make +++ b/ydb/core/tx/columnshard/engines/storage/ya.make @@ -1,5 +1,9 @@ LIBRARY() +OWNER( + g:kikimr +) + SRCS( granule.cpp storage.cpp diff --git a/ydb/core/tx/columnshard/engines/ut/ya.make b/ydb/core/tx/columnshard/engines/ut/ya.make index 13fc351c5a56..ef608ef83827 100644 --- a/ydb/core/tx/columnshard/engines/ut/ya.make +++ b/ydb/core/tx/columnshard/engines/ut/ya.make @@ -1,5 +1,10 @@ UNITTEST_FOR(ydb/core/tx/columnshard/engines) +OWNER( + chertus + g:kikimr +) + FORK_SUBTESTS() SPLIT_FACTOR(60) diff --git a/ydb/core/tx/columnshard/engines/writer/buffer/actor.cpp b/ydb/core/tx/columnshard/engines/writer/buffer/actor.cpp new file mode 100644 index 000000000000..a020e149cec4 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/writer/buffer/actor.cpp @@ -0,0 +1,46 @@ +#include "actor.h" +#include + +namespace NKikimr::NColumnShard::NWriting { + +TActor::TActor(ui64 tabletId, const TActorId& parent) + : TabletId(tabletId) + , ParentActorId(parent) +{ + +} + +void TActor::Flush() { + if (Aggregations.size()) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "flush_writing")("size", SumSize)("count", Aggregations.size()); + auto action = Aggregations.front()->GetWriteData()->GetBlobsAction(); + auto writeController = std::make_shared(ParentActorId, action, std::move(Aggregations)); + if (action->NeedDraftTransaction()) { + TActorContext::AsActorContext().Send(ParentActorId, std::make_unique(writeController)); + } else { + TActorContext::AsActorContext().Register(NColumnShard::CreateWriteActor(TabletId, writeController, TInstant::Max())); + } + Aggregations.clear(); + SumSize = 0; + } else { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_flush_writing"); + } +} + +void TActor::Handle(TEvFlushBuffer::TPtr& /*ev*/) { + Flush(); + Schedule(FlushDuration, new TEvFlushBuffer); +} + +void TActor::Handle(TEvAddInsertedDataToBuffer::TPtr& ev) { + auto* evBase = ev->Get(); + AFL_VERIFY(evBase->GetWriteData()->GetBlobsAction()->GetStorageId() == NOlap::IStoragesManager::DefaultStorageId); + SumSize += evBase->GetWriteData()->GetSize(); + Aggregations.emplace_back(std::make_shared(evBase->GetWriteData(), std::move(evBase->MutableBlobsToWrite()))); + if (SumSize > 4 * 1024 * 1024 || Aggregations.size() > 750) { + Flush(); + } +} + + +} diff --git a/ydb/core/tx/columnshard/engines/writer/buffer/actor.h b/ydb/core/tx/columnshard/engines/writer/buffer/actor.h new file mode 100644 index 000000000000..246f1726eefb --- /dev/null +++ b/ydb/core/tx/columnshard/engines/writer/buffer/actor.h @@ -0,0 +1,39 @@ +#pragma once +#include "events.h" +#include +#include + +namespace NKikimr::NColumnShard::NWriting { + +class TActor: public TActorBootstrapped { +private: + std::vector> Aggregations; + const ui64 TabletId; + NActors::TActorId ParentActorId; + const TDuration FlushDuration = TDuration::MilliSeconds(300); + ui64 SumSize = 0; + void Flush(); +public: + TActor(ui64 tabletId, const TActorId& parent); + ~TActor() = default; + + void Handle(TEvAddInsertedDataToBuffer::TPtr& ev); + void Handle(TEvFlushBuffer::TPtr& ev); + void Bootstrap() { + Become(&TThis::StateWait); + Schedule(FlushDuration, new TEvFlushBuffer); + } + + STFUNC(StateWait) { + TLogContextGuard gLogging(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletId)("parent", ParentActorId)); + switch (ev->GetTypeRewrite()) { + cFunc(NActors::TEvents::TEvPoison::EventType, PassAway); + hFunc(TEvAddInsertedDataToBuffer, Handle); + hFunc(TEvFlushBuffer, Handle); + default: + AFL_VERIFY(false); + } + } +}; + +} diff --git a/ydb/core/tx/columnshard/engines/writer/buffer/events.cpp b/ydb/core/tx/columnshard/engines/writer/buffer/events.cpp new file mode 100644 index 000000000000..f2dd6ca1c00d --- /dev/null +++ b/ydb/core/tx/columnshard/engines/writer/buffer/events.cpp @@ -0,0 +1,5 @@ +#include "events.h" + +namespace NKikimr::NColumnShard::NWriting { + +} diff --git a/ydb/core/tx/columnshard/engines/writer/buffer/events.h b/ydb/core/tx/columnshard/engines/writer/buffer/events.h new file mode 100644 index 000000000000..ee750ad69bcf --- /dev/null +++ b/ydb/core/tx/columnshard/engines/writer/buffer/events.h @@ -0,0 +1,31 @@ +#pragma once + +#include +#include +#include +#include +#include + +namespace NKikimr::NColumnShard::NWriting { + +class TEvAddInsertedDataToBuffer: public NActors::TEventLocal { +private: + YDB_READONLY_DEF(std::shared_ptr, WriteData); + YDB_ACCESSOR_DEF(std::vector, BlobsToWrite); +public: + + explicit TEvAddInsertedDataToBuffer(const std::shared_ptr& writeData, std::vector&& blobs) + : WriteData(writeData) + , BlobsToWrite(blobs) { + } + +}; + +class TEvFlushBuffer: public NActors::TEventLocal { +private: + static inline NActors::NTests::TGlobalScheduledEvents::TRegistrator TestScheduledEventRegistrator = (ui32)NColumnShard::TEvPrivate::EEv::EvWritingFlushBuffer; +public: +}; + + +} diff --git a/ydb/core/tx/columnshard/engines/writer/buffer/ya.make b/ydb/core/tx/columnshard/engines/writer/buffer/ya.make new file mode 100644 index 000000000000..78a3ee199907 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/writer/buffer/ya.make @@ -0,0 +1,20 @@ +LIBRARY() + +OWNER( + g:kikimr +) + +SRCS( + actor.cpp + events.cpp +) + +PEERDIR( + ydb/library/actors/core + ydb/core/protos + ydb/core/tablet_flat + ydb/library/yql/core/expr_nodes + ydb/library/actors/testlib/common +) + +END() diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp index a02d99f18ba0..318e4889fe07 100644 --- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp +++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp @@ -2,37 +2,47 @@ #include #include +#include namespace NKikimr::NOlap { -TIndexedWriteController::TIndexedWriteController(const TActorId& dstActor, const NEvWrite::TWriteData& writeData, const std::shared_ptr& action, std::vector&& blobsSplitted) - : BlobsSplitted(std::move(blobsSplitted)) - , WriteData(writeData) +TIndexedWriteController::TIndexedWriteController(const TActorId& dstActor, const std::shared_ptr& action, std::vector>&& aggregations) + : Buffer(action, std::move(aggregations)) , DstActor(dstActor) - , Action(action) { - for (auto&& bInfo : BlobsSplitted) { - auto& task = AddWriteTask(TBlobWriteInfo::BuildWriteTask(bInfo.GetData(), Action)); - BlobData.emplace_back(TBlobRange::FromBlobId(task.GetBlobId()), bInfo.GetSpecialKeysSafe(), bInfo.GetRowsCount(), bInfo.GetRawBytes(), AppData()->TimeProvider->Now()); + auto blobs = Buffer.GroupIntoBlobs(); + for (auto&& b : blobs) { + auto& task = AddWriteTask(TBlobWriteInfo::BuildWriteTask(b.GetBlobData(), action)); + b.InitBlobId(task.GetBlobId()); } - ResourceUsage.SourceMemorySize = WriteData.GetSize(); } void TIndexedWriteController::DoOnReadyResult(const NActors::TActorContext& ctx, const NColumnShard::TBlobPutResult::TPtr& putResult) { - WriteData.MutableWriteMeta().SetWriteMiddle4StartInstant(TMonotonic::Now()); - if (putResult->GetPutStatus() == NKikimrProto::OK) { - std::vector> actions = {Action}; - auto result = std::make_unique(putResult, std::move(BlobData), actions, WriteData.GetWriteMeta(), WriteData.GetData().GetSchemaVersion()); - ctx.Send(DstActor, result.release()); - } else { - auto result = std::make_unique(putResult, WriteData.GetWriteMeta()); - ctx.Send(DstActor, result.release()); - } + Buffer.InitReadyInstant(TMonotonic::Now()); + auto result = std::make_unique(putResult, std::move(Buffer)); + ctx.Send(DstActor, result.release()); } void TIndexedWriteController::DoOnStartSending() { - WriteData.MutableWriteMeta().SetWriteMiddle5StartInstant(TMonotonic::Now()); + Buffer.InitStartSending(TMonotonic::Now()); +} + +void TWideSerializedBatch::InitBlobId(const TUnifiedBlobId& id) { + AFL_VERIFY(!Range.BlobId.GetTabletId()); + Range.BlobId = id; +} + +void TWritingBuffer::InitReadyInstant(const TMonotonic instant) { + for (auto&& aggr : Aggregations) { + aggr->GetWriteData()->MutableWriteMeta().SetWriteMiddle4StartInstant(instant); + } +} + +void TWritingBuffer::InitStartSending(const TMonotonic instant) { + for (auto&& aggr : Aggregations) { + aggr->GetWriteData()->MutableWriteMeta().SetWriteMiddle5StartInstant(instant); + } } } diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h index 87a165c29d4b..7f977a171597 100644 --- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h +++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h @@ -8,24 +8,187 @@ #include #include #include -#include #include namespace NKikimr::NOlap { +class TWriteAggregation; + +class TWideSerializedBatch { +private: + NArrow::TSerializedBatch SplittedBlobs; + YDB_ACCESSOR_DEF(TBlobRange, Range); + YDB_READONLY(TInstant, StartInstant, AppDataVerified().TimeProvider->Now()); + TWriteAggregation* ParentAggregation; +public: + void InitBlobId(const TUnifiedBlobId& id); + + const NArrow::TSerializedBatch& GetSplittedBlobs() const { + return SplittedBlobs; + } + + const NArrow::TSerializedBatch* operator->() const { + return &SplittedBlobs; + } + + TWriteAggregation& MutableAggregation() { + return *ParentAggregation; + } + + const TWriteAggregation& GetAggregation() const { + return *ParentAggregation; + } + + TWideSerializedBatch(NArrow::TSerializedBatch&& splitted, TWriteAggregation& parentAggregation) + : SplittedBlobs(std::move(splitted)) + , ParentAggregation(&parentAggregation) + { + + } +}; + +class TWritingBlob { +private: + std::vector Ranges; + YDB_READONLY_DEF(TString, BlobData); +public: + TWritingBlob() = default; + bool AddData(TWideSerializedBatch& batch) { + if (BlobData.size() + batch.GetSplittedBlobs().GetSize() < 8 * 1024 * 1024) { + Ranges.emplace_back(&batch); + batch.SetRange(TBlobRange(TUnifiedBlobId(0, 0, 0, 0, BlobData.size() + batch.GetSplittedBlobs().GetSize()), BlobData.size(), batch.GetSplittedBlobs().GetSize())); + BlobData += batch.GetSplittedBlobs().GetData(); + return true; + } else { + AFL_VERIFY(BlobData.size()); + return false; + } + } + + void InitBlobId(const TUnifiedBlobId& blobId) { + for (auto&& r : Ranges) { + r->InitBlobId(blobId); + } + } + + ui64 GetSize() const { + return BlobData.size(); + } +}; + +class TWriteAggregation { +private: + YDB_READONLY_DEF(std::shared_ptr, WriteData); + YDB_ACCESSOR_DEF(std::vector, SplittedBlobs); + YDB_READONLY_DEF(TVector, WriteIds); +public: + void AddWriteId(const TWriteId& id) { + WriteIds.emplace_back(id); + } + + TWriteAggregation(const std::shared_ptr& writeData, std::vector&& splittedBlobs) + : WriteData(writeData) { + for (auto&& s : splittedBlobs) { + SplittedBlobs.emplace_back(std::move(s), *this); + } + } + + TWriteAggregation(const std::shared_ptr& writeData) + : WriteData(writeData) { + } +}; + +class TWritingBuffer: public TMoveOnly { +private: + std::shared_ptr BlobsAction; + std::shared_ptr DeclareRemoveAction; + YDB_READONLY_DEF(std::vector>, Aggregations); + YDB_READONLY(ui64, SumSize, 0); +public: + TWritingBuffer() = default; + TWritingBuffer(const std::shared_ptr& action, std::vector>&& aggregations) + : BlobsAction(action) + , Aggregations(std::move(aggregations)) + { + AFL_VERIFY(BlobsAction); + for (auto&& aggr : Aggregations) { + SumSize += aggr->GetWriteData()->GetSize(); + } + } + + bool IsEmpty() const { + return Aggregations.empty(); + } + + void RemoveData(const std::shared_ptr& data, const std::shared_ptr& bOperator) { + THashMap linksCount; + for (auto&& a : Aggregations) { + for (auto&& s : a->GetSplittedBlobs()) { + ++linksCount[s.GetRange().BlobId]; + } + } + + for (ui32 i = 0; i < Aggregations.size(); ++i) { + if (Aggregations[i].get() == data.get()) { + for (auto&& s : Aggregations[i]->GetSplittedBlobs()) { + if (--linksCount[s.GetRange().BlobId] == 0) { + if (!DeclareRemoveAction) { + DeclareRemoveAction = bOperator->StartDeclareRemovingAction("WRITING_BUFFER"); + } + DeclareRemoveAction->DeclareRemove(s.GetRange().BlobId); + } + } + Aggregations.erase(Aggregations.begin() + i); + return; + } + } + AFL_VERIFY(false); + } + + std::vector> GetAddActions() const { + return {BlobsAction}; + } + + std::vector> GetRemoveActions() const { + if (DeclareRemoveAction) { + return {DeclareRemoveAction}; + } else { + return {}; + } + } + + void InitReadyInstant(const TMonotonic instant); + void InitStartSending(const TMonotonic instant); + + std::vector GroupIntoBlobs() { + std::vector result; + TWritingBlob currentBlob; + for (auto&& aggr : Aggregations) { + for (auto&& bInfo : aggr->MutableSplittedBlobs()) { + if (!currentBlob.AddData(bInfo)) { + result.emplace_back(std::move(currentBlob)); + currentBlob = TWritingBlob(); + AFL_VERIFY(currentBlob.AddData(bInfo)); + } + } + } + if (currentBlob.GetSize()) { + result.emplace_back(std::move(currentBlob)); + } + return result; + } +}; + class TIndexedWriteController : public NColumnShard::IWriteController, public NColumnShard::TMonitoringObjectsCounter { private: - std::vector BlobsSplitted; - NEvWrite::TWriteData WriteData; - TVector BlobData; + TWritingBuffer Buffer; TActorId DstActor; - std::shared_ptr Action; void DoOnReadyResult(const NActors::TActorContext& ctx, const NColumnShard::TBlobPutResult::TPtr& putResult) override; virtual void DoOnStartSending() override; public: - TIndexedWriteController(const TActorId& dstActor, const NEvWrite::TWriteData& writeData, const std::shared_ptr& action, std::vector&& blobsSplitted); + TIndexedWriteController(const TActorId& dstActor, const std::shared_ptr& action, std::vector>&& aggregations); }; diff --git a/ydb/core/tx/columnshard/engines/writer/write_controller.h b/ydb/core/tx/columnshard/engines/writer/write_controller.h index 4566a9d2e9ee..554423c88863 100644 --- a/ydb/core/tx/columnshard/engines/writer/write_controller.h +++ b/ydb/core/tx/columnshard/engines/writer/write_controller.h @@ -11,15 +11,13 @@ namespace NKikimr::NColumnShard { -class TBlobPutResult : public NColumnShard::TPutStatus { +class TBlobPutResult: public NColumnShard::TPutStatus { public: using TPtr = std::shared_ptr; TBlobPutResult(NKikimrProto::EReplyStatus status, THashSet&& yellowMoveChannels, - THashSet&& yellowStopChannels, - const NColumnShard::TUsage& resourceUsage) - : ResourceUsage(resourceUsage) + THashSet&& yellowStopChannels) { SetPutStatus(status, std::move(yellowMoveChannels), std::move(yellowStopChannels)); } @@ -27,17 +25,6 @@ class TBlobPutResult : public NColumnShard::TPutStatus { TBlobPutResult(NKikimrProto::EReplyStatus status) { SetPutStatus(status); } - - void AddResources(const NColumnShard::TUsage& usage) { - ResourceUsage.Add(usage); - } - - TAutoPtr StartCpuGuard() { - return new TCpuGuard(ResourceUsage); - } - -private: - YDB_READONLY_DEF(NColumnShard::TUsage, ResourceUsage); }; class IWriteController { @@ -46,7 +33,6 @@ class IWriteController { THashMap> WritingActions; std::deque WriteTasks; protected: - TUsage ResourceUsage; virtual void DoOnReadyResult(const NActors::TActorContext& ctx, const TBlobPutResult::TPtr& putResult) = 0; virtual void DoOnBlobWriteResult(const TEvBlobStorage::TEvPutResult& /*result*/) { @@ -75,7 +61,6 @@ class IWriteController { } void OnReadyResult(const NActors::TActorContext& ctx, const TBlobPutResult::TPtr& putResult) { - putResult->AddResources(ResourceUsage); DoOnReadyResult(ctx, putResult); } diff --git a/ydb/core/tx/columnshard/engines/writer/ya.make b/ydb/core/tx/columnshard/engines/writer/ya.make index fac3935c8827..756815c0d5c1 100644 --- a/ydb/core/tx/columnshard/engines/writer/ya.make +++ b/ydb/core/tx/columnshard/engines/writer/ya.make @@ -1,5 +1,9 @@ LIBRARY() +OWNER( + g:kikimr +) + SRCS( compacted_blob_constructor.cpp indexed_blob_constructor.cpp @@ -14,6 +18,7 @@ PEERDIR( ydb/core/blobstorage/vdisk/protos ydb/core/tablet_flat ydb/core/formats/arrow + ydb/core/tx/columnshard/engines/writer/buffer ydb/library/actors/core diff --git a/ydb/core/tx/columnshard/engines/ya.make b/ydb/core/tx/columnshard/engines/ya.make index 4b058ca9bbca..5b046868bde2 100644 --- a/ydb/core/tx/columnshard/engines/ya.make +++ b/ydb/core/tx/columnshard/engines/ya.make @@ -4,6 +4,11 @@ RECURSE_FOR_TESTS( LIBRARY() +OWNER( + chertus + g:kikimr +) + SRCS( column_engine_logs.cpp column_engine.cpp