From 77324dfa2d30d5b47e7d905d796df8ba388e7df9 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Wed, 25 Dec 2024 18:18:05 +0300 Subject: [PATCH 1/2] fix --- ydb/core/kqp/session_actor/kqp_session_actor.cpp | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 5a1b29a1ac2e..a14a9418cf92 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -795,6 +795,12 @@ class TKqpSessionActor : public TActorBootstrapped { void BeginTx(const Ydb::Table::TransactionSettings& settings) { QueryState->TxId.SetValue(UlidGen.Next()); + if (QueryState->TxCtx) { + if (QueryState->TxCtx->BufferActorId) { + Send(QueryState->TxCtx->BufferActorId, new TEvKqpBuffer::TEvTerminate{}); + QueryState->TxCtx->BufferActorId = {}; + } + } QueryState->TxCtx = MakeIntrusive(false, AppData()->FunctionRegistry, AppData()->TimeProvider, AppData()->RandomProvider); @@ -881,6 +887,12 @@ class TKqpSessionActor : public TActorBootstrapped { break; } } else { + if (QueryState->TxCtx) { + if (QueryState->TxCtx->BufferActorId) { + Send(QueryState->TxCtx->BufferActorId, new TEvKqpBuffer::TEvTerminate{}); + QueryState->TxCtx->BufferActorId = {}; + } + } QueryState->TxCtx = MakeIntrusive(false, AppData()->FunctionRegistry, AppData()->TimeProvider, AppData()->RandomProvider); QueryState->QueryData = std::make_shared(QueryState->TxCtx->TxAlloc); @@ -2226,6 +2238,10 @@ class TKqpSessionActor : public TActorBootstrapped { Transactions.AddToBeAborted(txCtx); Transactions.ReleaseTransaction(QueryState->TxId.GetValue()); } + if (txCtx->BufferActorId) { + Send(txCtx->BufferActorId, new TEvKqpBuffer::TEvTerminate{}); + txCtx->BufferActorId = {}; + } DiscardPersistentSnapshot(txCtx->SnapshotHandle); } From 5b8b8a50903393fc49da1e7c0b7d2bdbbc95a251 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Wed, 25 Dec 2024 22:34:29 +0300 Subject: [PATCH 2/2] fix --- .../kqp/session_actor/kqp_session_actor.cpp | 33 +++++++------------ 1 file changed, 11 insertions(+), 22 deletions(-) diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index a14a9418cf92..6404e5484648 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -795,12 +795,7 @@ class TKqpSessionActor : public TActorBootstrapped { void BeginTx(const Ydb::Table::TransactionSettings& settings) { QueryState->TxId.SetValue(UlidGen.Next()); - if (QueryState->TxCtx) { - if (QueryState->TxCtx->BufferActorId) { - Send(QueryState->TxCtx->BufferActorId, new TEvKqpBuffer::TEvTerminate{}); - QueryState->TxCtx->BufferActorId = {}; - } - } + TerminateBufferActor() QueryState->TxCtx = MakeIntrusive(false, AppData()->FunctionRegistry, AppData()->TimeProvider, AppData()->RandomProvider); @@ -887,12 +882,7 @@ class TKqpSessionActor : public TActorBootstrapped { break; } } else { - if (QueryState->TxCtx) { - if (QueryState->TxCtx->BufferActorId) { - Send(QueryState->TxCtx->BufferActorId, new TEvKqpBuffer::TEvTerminate{}); - QueryState->TxCtx->BufferActorId = {}; - } - } + TerminateBufferActor() QueryState->TxCtx = MakeIntrusive(false, AppData()->FunctionRegistry, AppData()->TimeProvider, AppData()->RandomProvider); QueryState->QueryData = std::make_shared(QueryState->TxCtx->TxAlloc); @@ -2215,12 +2205,7 @@ class TKqpSessionActor : public TActorBootstrapped { QueryState->TxCtx->ClearDeferredEffects(); QueryState->TxCtx->Locks.Clear(); QueryState->TxCtx->TxManager.reset(); - - if (QueryState->TxCtx->BufferActorId) { - Send(QueryState->TxCtx->BufferActorId, new TEvKqpBuffer::TEvTerminate{}); - QueryState->TxCtx->BufferActorId = {}; - } - + TerminateBufferActor() QueryState->TxCtx->Finish(); } } @@ -2238,10 +2223,7 @@ class TKqpSessionActor : public TActorBootstrapped { Transactions.AddToBeAborted(txCtx); Transactions.ReleaseTransaction(QueryState->TxId.GetValue()); } - if (txCtx->BufferActorId) { - Send(txCtx->BufferActorId, new TEvKqpBuffer::TEvTerminate{}); - txCtx->BufferActorId = {}; - } + TerminateBufferActor(); DiscardPersistentSnapshot(txCtx->SnapshotHandle); } @@ -2714,6 +2696,13 @@ class TKqpSessionActor : public TActorBootstrapped { QueryState->TxCtx->TopicOperations.SetWriteId(std::move(handle)); } + void TerminateBufferActor() { + if (QueryState && QueryState->TxCtx && QueryState->TxCtx->BufferActorId) { + Send(QueryState->TxCtx->BufferActorId, new TEvKqpBuffer::TEvTerminate{}); + QueryState->TxCtx->BufferActorId = {}; + } + } + private: TActorId Owner; TKqpQueryCachePtr QueryCache;