From 1f88b11c2cb56fe35a57e2884f771cca6d4b1f3d Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Mon, 8 Jul 2024 10:40:10 +0300 Subject: [PATCH] correct insert table cleaning (#6379) --- .../transaction/tx_gc_insert_table.cpp | 2 + .../transaction/tx_gc_insert_table.h | 8 ++- ydb/core/tx/columnshard/columnshard_impl.cpp | 57 +++++++++++++------ ydb/core/tx/columnshard/columnshard_impl.h | 1 + 4 files changed, 48 insertions(+), 20 deletions(-) diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp index bda89c9c9daf..15a05e7108a7 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp @@ -9,6 +9,8 @@ bool TTxInsertTableCleanup::Execute(TTransactionContext& txc, const TActorContex NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector); NIceDb::TNiceDb db(txc.DB); + Self->TryAbortWrites(db, dbTable, std::move(WriteIdsToAbort)); + NOlap::TBlobManagerDb blobManagerDb(txc.DB); auto allAborted = Self->InsertTable->GetAborted(); auto storage = Self->StoragesManager->GetInsertOperator(); diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.h b/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.h index 6996333a0bd3..5ca66fe90a34 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.h +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.h @@ -5,11 +5,13 @@ namespace NKikimr::NColumnShard { class TTxInsertTableCleanup: public TTransactionBase { private: + THashSet WriteIdsToAbort; std::shared_ptr BlobsAction; public: - TTxInsertTableCleanup(TColumnShard* self) - : TBase(self) { - Y_ABORT_UNLESS(self->InsertTable->GetAborted().size()); + TTxInsertTableCleanup(TColumnShard* self, THashSet&& writeIdsToAbort) + : TBase(self) + , WriteIdsToAbort(std::move(writeIdsToAbort)) { + Y_ABORT_UNLESS(WriteIdsToAbort.size() || self->InsertTable->GetAborted().size()); } ~TTxInsertTableCleanup() { diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 79deb83616a9..ca68ab4c87cf 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -259,18 +259,38 @@ void TColumnShard::LoadLongTxWrite(TWriteId writeId, const ui32 writePartId, con } bool TColumnShard::RemoveLongTxWrite(NIceDb::TNiceDb& db, const TWriteId writeId, const ui64 txId) { - auto* lw = LongTxWrites.FindPtr(writeId); - AFL_VERIFY(lw)("write_id", (ui64)writeId)("tx_id", txId); - const ui64 prepared = lw->PreparedTxId; - AFL_VERIFY(!prepared || txId == prepared)("tx", txId)("prepared", prepared); - Schema::EraseLongTxWrite(db, writeId); - auto& ltxParts = LongTxWritesByUniqueId[lw->LongTxId.UniqueId]; - ltxParts.erase(lw->WritePartId); - if (ltxParts.empty()) { - AFL_VERIFY(LongTxWritesByUniqueId.erase(lw->LongTxId.UniqueId)); - } - LongTxWrites.erase(writeId); - return true; + if (auto* lw = LongTxWrites.FindPtr(writeId)) { + ui64 prepared = lw->PreparedTxId; + if (!prepared || txId == prepared) { + Schema::EraseLongTxWrite(db, writeId); + auto& ltxParts = LongTxWritesByUniqueId[lw->LongTxId.UniqueId]; + ltxParts.erase(lw->WritePartId); + if (ltxParts.empty()) { + LongTxWritesByUniqueId.erase(lw->LongTxId.UniqueId); + } + LongTxWrites.erase(writeId); + return true; + } + } + return false; +} + +void TColumnShard::TryAbortWrites(NIceDb::TNiceDb& db, NOlap::TDbWrapper& dbTable, THashSet&& writesToAbort) { + std::vector failedAborts; + for (auto& writeId : writesToAbort) { + if (!RemoveLongTxWrite(db, writeId, 0)) { + failedAborts.push_back(writeId); + } + } + if (failedAborts.size()) { + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "failed_aborts")("count", failedAborts.size())("writes_count", writesToAbort.size()); + } + for (auto& writeId : failedAborts) { + writesToAbort.erase(writeId); + } + if (!writesToAbort.empty()) { + InsertTable->Abort(dbTable, writesToAbort); + } } void TColumnShard::UpdateSchemaSeqNo(const TMessageSeqNo& seqNo, NTabletFlatExecutor::TTransactionContext& txc) { @@ -455,7 +475,9 @@ void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProt // TODO: Allow to read old snapshots after DROP TBlobGroupSelector dsGroupSelector(Info()); NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector); - InsertTable->DropPath(dbTable, pathId); + THashSet writesToAbort = InsertTable->DropPath(dbTable, pathId); + + TryAbortWrites(db, dbTable, std::move(writesToAbort)); } void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto, const NOlap::TSnapshot& version, @@ -827,21 +849,22 @@ void TColumnShard::Handle(TEvPrivate::TEvGarbageCollectionFinished::TPtr& ev, co } void TColumnShard::SetupCleanupInsertTable() { + auto writeIdsToCleanup = InsertTable->OldWritesToAbort(AppData()->TimeProvider->Now()); + if (BackgroundController.IsCleanupInsertTableActive()) { ACFL_DEBUG("background", "cleanup_insert_table")("skip_reason", "in_progress"); return; } - if (!InsertTable->GetAborted().size()) { + if (!InsertTable->GetAborted().size() && !writeIdsToCleanup.size()) { return; } - AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "cleanup_started")("aborted", InsertTable->GetAborted().size()); + AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "cleanup_started")("aborted", InsertTable->GetAborted().size())("to_cleanup", writeIdsToCleanup.size()); BackgroundController.StartCleanupInsertTable(); - Execute(new TTxInsertTableCleanup(this), TActorContext::AsActorContext()); + Execute(new TTxInsertTableCleanup(this, std::move(writeIdsToCleanup)), TActorContext::AsActorContext()); } void TColumnShard::Die(const TActorContext& ctx) { - // TODO CleanupActors(ctx); NTabletPipe::CloseAndForgetClient(SelfId(), StatsReportPipe); UnregisterMediatorTimeCast(); diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index ea6ca9b3164c..2fefad7e9e1a 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -535,6 +535,7 @@ class TColumnShard void TryRegisterMediatorTimeCast(); void UnregisterMediatorTimeCast(); + void TryAbortWrites(NIceDb::TNiceDb& db, NOlap::TDbWrapper& dbTable, THashSet&& writesToAbort); bool WaitPlanStep(ui64 step); void SendWaitPlanStep(ui64 step);