Skip to content

Commit

Permalink
Merge 5b8b8a5 into c895fcc
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored Dec 25, 2024
2 parents c895fcc + 5b8b8a5 commit b4fe9c1
Showing 1 changed file with 11 additions and 6 deletions.
17 changes: 11 additions & 6 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {

void BeginTx(const Ydb::Table::TransactionSettings& settings) {
QueryState->TxId.SetValue(UlidGen.Next());
TerminateBufferActor()
QueryState->TxCtx = MakeIntrusive<TKqpTransactionContext>(false, AppData()->FunctionRegistry,
AppData()->TimeProvider, AppData()->RandomProvider);

Expand Down Expand Up @@ -881,6 +882,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
break;
}
} else {
TerminateBufferActor()
QueryState->TxCtx = MakeIntrusive<TKqpTransactionContext>(false, AppData()->FunctionRegistry,
AppData()->TimeProvider, AppData()->RandomProvider);
QueryState->QueryData = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc);
Expand Down Expand Up @@ -2207,12 +2209,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
QueryState->TxCtx->ClearDeferredEffects();
QueryState->TxCtx->Locks.Clear();
QueryState->TxCtx->TxManager.reset();

if (QueryState->TxCtx->BufferActorId) {
Send(QueryState->TxCtx->BufferActorId, new TEvKqpBuffer::TEvTerminate{});
QueryState->TxCtx->BufferActorId = {};
}

TerminateBufferActor()
QueryState->TxCtx->Finish();
}
}
Expand All @@ -2230,6 +2227,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
Transactions.AddToBeAborted(txCtx);
Transactions.ReleaseTransaction(QueryState->TxId.GetValue());
}
TerminateBufferActor();
DiscardPersistentSnapshot(txCtx->SnapshotHandle);
}

Expand Down Expand Up @@ -2702,6 +2700,13 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
QueryState->TxCtx->TopicOperations.SetWriteId(std::move(handle));
}

void TerminateBufferActor() {
if (QueryState && QueryState->TxCtx && QueryState->TxCtx->BufferActorId) {
Send(QueryState->TxCtx->BufferActorId, new TEvKqpBuffer::TEvTerminate{});
QueryState->TxCtx->BufferActorId = {};
}
}

private:
TActorId Owner;
TKqpQueryCachePtr QueryCache;
Expand Down

0 comments on commit b4fe9c1

Please sign in to comment.