Skip to content

Commit

Permalink
correct insert table cleaning (#6379)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Jul 8, 2024
1 parent 1d57390 commit 1f88b11
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ bool TTxInsertTableCleanup::Execute(TTransactionContext& txc, const TActorContex
NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
NIceDb::TNiceDb db(txc.DB);

Self->TryAbortWrites(db, dbTable, std::move(WriteIdsToAbort));

NOlap::TBlobManagerDb blobManagerDb(txc.DB);
auto allAborted = Self->InsertTable->GetAborted();
auto storage = Self->StoragesManager->GetInsertOperator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
namespace NKikimr::NColumnShard {
class TTxInsertTableCleanup: public TTransactionBase<TColumnShard> {
private:
THashSet<TWriteId> WriteIdsToAbort;
std::shared_ptr<NOlap::IBlobsDeclareRemovingAction> BlobsAction;
public:
TTxInsertTableCleanup(TColumnShard* self)
: TBase(self) {
Y_ABORT_UNLESS(self->InsertTable->GetAborted().size());
TTxInsertTableCleanup(TColumnShard* self, THashSet<TWriteId>&& writeIdsToAbort)
: TBase(self)
, WriteIdsToAbort(std::move(writeIdsToAbort)) {
Y_ABORT_UNLESS(WriteIdsToAbort.size() || self->InsertTable->GetAborted().size());
}

~TTxInsertTableCleanup() {
Expand Down
57 changes: 40 additions & 17 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,18 +259,38 @@ void TColumnShard::LoadLongTxWrite(TWriteId writeId, const ui32 writePartId, con
}

bool TColumnShard::RemoveLongTxWrite(NIceDb::TNiceDb& db, const TWriteId writeId, const ui64 txId) {
auto* lw = LongTxWrites.FindPtr(writeId);
AFL_VERIFY(lw)("write_id", (ui64)writeId)("tx_id", txId);
const ui64 prepared = lw->PreparedTxId;
AFL_VERIFY(!prepared || txId == prepared)("tx", txId)("prepared", prepared);
Schema::EraseLongTxWrite(db, writeId);
auto& ltxParts = LongTxWritesByUniqueId[lw->LongTxId.UniqueId];
ltxParts.erase(lw->WritePartId);
if (ltxParts.empty()) {
AFL_VERIFY(LongTxWritesByUniqueId.erase(lw->LongTxId.UniqueId));
}
LongTxWrites.erase(writeId);
return true;
if (auto* lw = LongTxWrites.FindPtr(writeId)) {
ui64 prepared = lw->PreparedTxId;
if (!prepared || txId == prepared) {
Schema::EraseLongTxWrite(db, writeId);
auto& ltxParts = LongTxWritesByUniqueId[lw->LongTxId.UniqueId];
ltxParts.erase(lw->WritePartId);
if (ltxParts.empty()) {
LongTxWritesByUniqueId.erase(lw->LongTxId.UniqueId);
}
LongTxWrites.erase(writeId);
return true;
}
}
return false;
}

void TColumnShard::TryAbortWrites(NIceDb::TNiceDb& db, NOlap::TDbWrapper& dbTable, THashSet<TWriteId>&& writesToAbort) {
std::vector<TWriteId> failedAborts;
for (auto& writeId : writesToAbort) {
if (!RemoveLongTxWrite(db, writeId, 0)) {
failedAborts.push_back(writeId);
}
}
if (failedAborts.size()) {
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "failed_aborts")("count", failedAborts.size())("writes_count", writesToAbort.size());
}
for (auto& writeId : failedAborts) {
writesToAbort.erase(writeId);
}
if (!writesToAbort.empty()) {
InsertTable->Abort(dbTable, writesToAbort);
}
}

void TColumnShard::UpdateSchemaSeqNo(const TMessageSeqNo& seqNo, NTabletFlatExecutor::TTransactionContext& txc) {
Expand Down Expand Up @@ -455,7 +475,9 @@ void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProt
// TODO: Allow to read old snapshots after DROP
TBlobGroupSelector dsGroupSelector(Info());
NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
InsertTable->DropPath(dbTable, pathId);
THashSet<TWriteId> writesToAbort = InsertTable->DropPath(dbTable, pathId);

TryAbortWrites(db, dbTable, std::move(writesToAbort));
}

void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto, const NOlap::TSnapshot& version,
Expand Down Expand Up @@ -827,21 +849,22 @@ void TColumnShard::Handle(TEvPrivate::TEvGarbageCollectionFinished::TPtr& ev, co
}

void TColumnShard::SetupCleanupInsertTable() {
auto writeIdsToCleanup = InsertTable->OldWritesToAbort(AppData()->TimeProvider->Now());

if (BackgroundController.IsCleanupInsertTableActive()) {
ACFL_DEBUG("background", "cleanup_insert_table")("skip_reason", "in_progress");
return;
}

if (!InsertTable->GetAborted().size()) {
if (!InsertTable->GetAborted().size() && !writeIdsToCleanup.size()) {
return;
}
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "cleanup_started")("aborted", InsertTable->GetAborted().size());
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "cleanup_started")("aborted", InsertTable->GetAborted().size())("to_cleanup", writeIdsToCleanup.size());
BackgroundController.StartCleanupInsertTable();
Execute(new TTxInsertTableCleanup(this), TActorContext::AsActorContext());
Execute(new TTxInsertTableCleanup(this, std::move(writeIdsToCleanup)), TActorContext::AsActorContext());
}

void TColumnShard::Die(const TActorContext& ctx) {
// TODO
CleanupActors(ctx);
NTabletPipe::CloseAndForgetClient(SelfId(), StatsReportPipe);
UnregisterMediatorTimeCast();
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,7 @@ class TColumnShard

void TryRegisterMediatorTimeCast();
void UnregisterMediatorTimeCast();
void TryAbortWrites(NIceDb::TNiceDb& db, NOlap::TDbWrapper& dbTable, THashSet<TWriteId>&& writesToAbort);

bool WaitPlanStep(ui64 step);
void SendWaitPlanStep(ui64 step);
Expand Down

0 comments on commit 1f88b11

Please sign in to comment.