From 77324dfa2d30d5b47e7d905d796df8ba388e7df9 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Wed, 25 Dec 2024 18:18:05 +0300 Subject: [PATCH 1/5] fix --- ydb/core/kqp/session_actor/kqp_session_actor.cpp | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 5a1b29a1ac2e..a14a9418cf92 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -795,6 +795,12 @@ class TKqpSessionActor : public TActorBootstrapped { void BeginTx(const Ydb::Table::TransactionSettings& settings) { QueryState->TxId.SetValue(UlidGen.Next()); + if (QueryState->TxCtx) { + if (QueryState->TxCtx->BufferActorId) { + Send(QueryState->TxCtx->BufferActorId, new TEvKqpBuffer::TEvTerminate{}); + QueryState->TxCtx->BufferActorId = {}; + } + } QueryState->TxCtx = MakeIntrusive(false, AppData()->FunctionRegistry, AppData()->TimeProvider, AppData()->RandomProvider); @@ -881,6 +887,12 @@ class TKqpSessionActor : public TActorBootstrapped { break; } } else { + if (QueryState->TxCtx) { + if (QueryState->TxCtx->BufferActorId) { + Send(QueryState->TxCtx->BufferActorId, new TEvKqpBuffer::TEvTerminate{}); + QueryState->TxCtx->BufferActorId = {}; + } + } QueryState->TxCtx = MakeIntrusive(false, AppData()->FunctionRegistry, AppData()->TimeProvider, AppData()->RandomProvider); QueryState->QueryData = std::make_shared(QueryState->TxCtx->TxAlloc); @@ -2226,6 +2238,10 @@ class TKqpSessionActor : public TActorBootstrapped { Transactions.AddToBeAborted(txCtx); Transactions.ReleaseTransaction(QueryState->TxId.GetValue()); } + if (txCtx->BufferActorId) { + Send(txCtx->BufferActorId, new TEvKqpBuffer::TEvTerminate{}); + txCtx->BufferActorId = {}; + } DiscardPersistentSnapshot(txCtx->SnapshotHandle); } From 5b8b8a50903393fc49da1e7c0b7d2bdbbc95a251 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Wed, 25 Dec 2024 22:34:29 +0300 Subject: [PATCH 2/5] fix --- .../kqp/session_actor/kqp_session_actor.cpp | 33 +++++++------------ 1 file changed, 11 insertions(+), 22 deletions(-) diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index a14a9418cf92..6404e5484648 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -795,12 +795,7 @@ class TKqpSessionActor : public TActorBootstrapped { void BeginTx(const Ydb::Table::TransactionSettings& settings) { QueryState->TxId.SetValue(UlidGen.Next()); - if (QueryState->TxCtx) { - if (QueryState->TxCtx->BufferActorId) { - Send(QueryState->TxCtx->BufferActorId, new TEvKqpBuffer::TEvTerminate{}); - QueryState->TxCtx->BufferActorId = {}; - } - } + TerminateBufferActor() QueryState->TxCtx = MakeIntrusive(false, AppData()->FunctionRegistry, AppData()->TimeProvider, AppData()->RandomProvider); @@ -887,12 +882,7 @@ class TKqpSessionActor : public TActorBootstrapped { break; } } else { - if (QueryState->TxCtx) { - if (QueryState->TxCtx->BufferActorId) { - Send(QueryState->TxCtx->BufferActorId, new TEvKqpBuffer::TEvTerminate{}); - QueryState->TxCtx->BufferActorId = {}; - } - } + TerminateBufferActor() QueryState->TxCtx = MakeIntrusive(false, AppData()->FunctionRegistry, AppData()->TimeProvider, AppData()->RandomProvider); QueryState->QueryData = std::make_shared(QueryState->TxCtx->TxAlloc); @@ -2215,12 +2205,7 @@ class TKqpSessionActor : public TActorBootstrapped { QueryState->TxCtx->ClearDeferredEffects(); QueryState->TxCtx->Locks.Clear(); QueryState->TxCtx->TxManager.reset(); - - if (QueryState->TxCtx->BufferActorId) { - Send(QueryState->TxCtx->BufferActorId, new TEvKqpBuffer::TEvTerminate{}); - QueryState->TxCtx->BufferActorId = {}; - } - + TerminateBufferActor() QueryState->TxCtx->Finish(); } } @@ -2238,10 +2223,7 @@ class TKqpSessionActor : public TActorBootstrapped { Transactions.AddToBeAborted(txCtx); Transactions.ReleaseTransaction(QueryState->TxId.GetValue()); } - if (txCtx->BufferActorId) { - Send(txCtx->BufferActorId, new TEvKqpBuffer::TEvTerminate{}); - txCtx->BufferActorId = {}; - } + TerminateBufferActor(); DiscardPersistentSnapshot(txCtx->SnapshotHandle); } @@ -2714,6 +2696,13 @@ class TKqpSessionActor : public TActorBootstrapped { QueryState->TxCtx->TopicOperations.SetWriteId(std::move(handle)); } + void TerminateBufferActor() { + if (QueryState && QueryState->TxCtx && QueryState->TxCtx->BufferActorId) { + Send(QueryState->TxCtx->BufferActorId, new TEvKqpBuffer::TEvTerminate{}); + QueryState->TxCtx->BufferActorId = {}; + } + } + private: TActorId Owner; TKqpQueryCachePtr QueryCache; From 3ae5370f8ac96136a8f7a3ab3b0b74a293d6810b Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Thu, 26 Dec 2024 12:14:37 +0300 Subject: [PATCH 3/5] fix --- ydb/core/kqp/session_actor/kqp_session_actor.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 6404e5484648..526ace4de237 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -795,7 +795,7 @@ class TKqpSessionActor : public TActorBootstrapped { void BeginTx(const Ydb::Table::TransactionSettings& settings) { QueryState->TxId.SetValue(UlidGen.Next()); - TerminateBufferActor() + TerminateBufferActor(); QueryState->TxCtx = MakeIntrusive(false, AppData()->FunctionRegistry, AppData()->TimeProvider, AppData()->RandomProvider); @@ -882,7 +882,7 @@ class TKqpSessionActor : public TActorBootstrapped { break; } } else { - TerminateBufferActor() + TerminateBufferActor(); QueryState->TxCtx = MakeIntrusive(false, AppData()->FunctionRegistry, AppData()->TimeProvider, AppData()->RandomProvider); QueryState->QueryData = std::make_shared(QueryState->TxCtx->TxAlloc); @@ -2205,7 +2205,7 @@ class TKqpSessionActor : public TActorBootstrapped { QueryState->TxCtx->ClearDeferredEffects(); QueryState->TxCtx->Locks.Clear(); QueryState->TxCtx->TxManager.reset(); - TerminateBufferActor() + TerminateBufferActor(); QueryState->TxCtx->Finish(); } } From a131f66bf52bc4cadcd5caba72ac31de468fed1f Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Thu, 26 Dec 2024 13:14:43 +0300 Subject: [PATCH 4/5] fix --- ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 6 +++--- ydb/core/kqp/runtime/kqp_write_actor.cpp | 12 ++++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 30854284d07b..5b48f609e877 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -307,7 +307,7 @@ class TKqpDataExecuter : public TKqpExecuterBase(); event->ExecuterActorId = SelfId(); event->TxId = TxId; - Send(BufferActorId, event.release()); + Send(BufferActorId, event.release()); return; } else if (Request.LocksOp == ELocksOp::Rollback) { Become(&TKqpDataExecuter::FinalizeState); @@ -315,7 +315,7 @@ class TKqpDataExecuter : public TKqpExecuterBase(); event->ExecuterActorId = SelfId(); - Send(BufferActorId, event.release()); + Send(BufferActorId, event.release()); MakeResponseAndPassAway(); return; } else if (Request.UseImmediateEffects) { @@ -324,7 +324,7 @@ class TKqpDataExecuter : public TKqpExecuterBase(); event->ExecuterActorId = SelfId(); - Send(BufferActorId, event.release()); + Send(BufferActorId, event.release()); return; } else { Become(&TKqpDataExecuter::FinalizeState); diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index a5244a8550e2..b125bf431757 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -1572,7 +1572,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub if (GetTotalFreeSpace() >= item.DataSize) { auto result = std::make_unique(); result->Token = AckQueue.front().Token; - Send(AckQueue.front().ForwardActorId, result.release()); + Send(AckQueue.front().ForwardActorId, result.release()); AckQueue.pop(); } else { YQL_ENSURE(false); @@ -1989,7 +1989,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub if (!TxManager->NeedCommit()) { Rollback(); State = EState::FINISHED; - Send(ExecuterActorId, new TEvKqpBuffer::TEvResult{}); + Send(ExecuterActorId, new TEvKqpBuffer::TEvResult{}); } else if (TxManager->IsSingleShard() && !TxManager->HasOlapTable() && (!WriteInfos.empty() || TxManager->HasTopics())) { TxManager->StartExecute(); ImmediateCommit(); @@ -2003,7 +2003,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub ExecuterActorId = ev->Get()->ExecuterActorId; Rollback(); State = EState::FINISHED; - Send(ExecuterActorId, new TEvKqpBuffer::TEvResult{}); + Send(ExecuterActorId, new TEvKqpBuffer::TEvResult{}); } void Handle(NKikimr::NEvents::TDataEvents::TEvWriteResult::TPtr& ev) { @@ -2282,7 +2282,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub if (TxManager->ConsumeCommitResult(shardId)) { CA_LOG_D("Committed"); State = EState::FINISHED; - Send(ExecuterActorId, new TEvKqpBuffer::TEvResult{ + Send(ExecuterActorId, new TEvKqpBuffer::TEvResult{ BuildStats() }); ExecuterActorId = {}; @@ -2301,7 +2301,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub "BufferWriteActorState::Writing", NWilson::EFlags::AUTO_END); CA_LOG_D("Flushed"); State = EState::WRITING; - Send(ExecuterActorId, new TEvKqpBuffer::TEvResult{ + Send(ExecuterActorId, new TEvKqpBuffer::TEvResult{ BuildStats() }); ExecuterActorId = {}; @@ -2504,7 +2504,7 @@ class TKqpForwardWriteActor : public TActorBootstrapped, SendTime = TInstant::Now(); CA_LOG_D("Send data=" << DataSize << ", closed=" << Closed << ", bufferActorId=" << BufferActorId); - AFL_ENSURE(Send(BufferActorId, ev.release())); + AFL_ENSURE(Send(BufferActorId, ev.release())); } void CommitState(const NYql::NDqProto::TCheckpoint&) final {}; From 527525ecf1112e3f4ee3a804f57cc6b09a0b12bb Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Fri, 27 Dec 2024 14:28:01 +0300 Subject: [PATCH 5/5] rollback-buffer --- ydb/core/kqp/runtime/kqp_write_actor.cpp | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index b125bf431757..3ef60a971959 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -1987,9 +1987,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub } if (!TxManager->NeedCommit()) { - Rollback(); - State = EState::FINISHED; - Send(ExecuterActorId, new TEvKqpBuffer::TEvResult{}); + RollbackAndDie(); } else if (TxManager->IsSingleShard() && !TxManager->HasOlapTable() && (!WriteInfos.empty() || TxManager->HasTopics())) { TxManager->StartExecute(); ImmediateCommit(); @@ -2001,9 +1999,14 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub void Handle(TEvKqpBuffer::TEvRollback::TPtr& ev) { ExecuterActorId = ev->Get()->ExecuterActorId; + RollbackAndDie(); + } + + void RollbackAndDie() { Rollback(); State = EState::FINISHED; Send(ExecuterActorId, new TEvKqpBuffer::TEvResult{}); + PassAway(); } void Handle(NKikimr::NEvents::TDataEvents::TEvWriteResult::TPtr& ev) { @@ -2287,6 +2290,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub }); ExecuterActorId = {}; Y_ABORT_UNLESS(GetTotalMemory() == 0); + PassAway(); return; } }