From 527525ecf1112e3f4ee3a804f57cc6b09a0b12bb Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Fri, 27 Dec 2024 14:28:01 +0300 Subject: [PATCH] rollback-buffer --- ydb/core/kqp/runtime/kqp_write_actor.cpp | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index b125bf431757..3ef60a971959 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -1987,9 +1987,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub } if (!TxManager->NeedCommit()) { - Rollback(); - State = EState::FINISHED; - Send(ExecuterActorId, new TEvKqpBuffer::TEvResult{}); + RollbackAndDie(); } else if (TxManager->IsSingleShard() && !TxManager->HasOlapTable() && (!WriteInfos.empty() || TxManager->HasTopics())) { TxManager->StartExecute(); ImmediateCommit(); @@ -2001,9 +1999,14 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub void Handle(TEvKqpBuffer::TEvRollback::TPtr& ev) { ExecuterActorId = ev->Get()->ExecuterActorId; + RollbackAndDie(); + } + + void RollbackAndDie() { Rollback(); State = EState::FINISHED; Send(ExecuterActorId, new TEvKqpBuffer::TEvResult{}); + PassAway(); } void Handle(NKikimr::NEvents::TDataEvents::TEvWriteResult::TPtr& ev) { @@ -2287,6 +2290,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub }); ExecuterActorId = {}; Y_ABORT_UNLESS(GetTotalMemory() == 0); + PassAway(); return; } }