Skip to content

Commit

Permalink
Merge 527525e into c278ecf
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored Dec 27, 2024
2 parents c278ecf + 527525e commit 4efbe25
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 17 deletions.
6 changes: 3 additions & 3 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,15 +307,15 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
auto event = std::make_unique<NKikimr::NKqp::TEvKqpBuffer::TEvCommit>();
event->ExecuterActorId = SelfId();
event->TxId = TxId;
Send(BufferActorId, event.release());
Send<ESendingType::Tail>(BufferActorId, event.release());
return;
} else if (Request.LocksOp == ELocksOp::Rollback) {
Become(&TKqpDataExecuter::FinalizeState);
LOG_D("Send Rollback to BufferActor=" << BufferActorId);

auto event = std::make_unique<NKikimr::NKqp::TEvKqpBuffer::TEvRollback>();
event->ExecuterActorId = SelfId();
Send(BufferActorId, event.release());
Send<ESendingType::Tail>(BufferActorId, event.release());
MakeResponseAndPassAway();
return;
} else if (Request.UseImmediateEffects) {
Expand All @@ -324,7 +324,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

auto event = std::make_unique<NKikimr::NKqp::TEvKqpBuffer::TEvFlush>();
event->ExecuterActorId = SelfId();
Send(BufferActorId, event.release());
Send<ESendingType::Tail>(BufferActorId, event.release());
return;
} else {
Become(&TKqpDataExecuter::FinalizeState);
Expand Down
20 changes: 12 additions & 8 deletions ydb/core/kqp/runtime/kqp_write_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1571,7 +1571,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
if (GetTotalFreeSpace() >= item.DataSize) {
auto result = std::make_unique<TEvBufferWriteResult>();
result->Token = AckQueue.front().Token;
Send(AckQueue.front().ForwardActorId, result.release());
Send<ESendingType::Tail>(AckQueue.front().ForwardActorId, result.release());
AckQueue.pop();
} else {
YQL_ENSURE(false);
Expand Down Expand Up @@ -1990,9 +1990,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
}

if (!TxManager->NeedCommit()) {
Rollback();
State = EState::FINISHED;
Send(ExecuterActorId, new TEvKqpBuffer::TEvResult{});
RollbackAndDie();
} else if (TxManager->IsSingleShard() && !TxManager->HasOlapTable() && (!WriteInfos.empty() || TxManager->HasTopics())) {
TxManager->StartExecute();
ImmediateCommit();
Expand All @@ -2004,9 +2002,14 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub

void Handle(TEvKqpBuffer::TEvRollback::TPtr& ev) {
ExecuterActorId = ev->Get()->ExecuterActorId;
RollbackAndDie();
}

void RollbackAndDie() {
Rollback();
State = EState::FINISHED;
Send(ExecuterActorId, new TEvKqpBuffer::TEvResult{});
Send<ESendingType::Tail>(ExecuterActorId, new TEvKqpBuffer::TEvResult{});
PassAway();
}

void Handle(NKikimr::NEvents::TDataEvents::TEvWriteResult::TPtr& ev) {
Expand Down Expand Up @@ -2285,11 +2288,12 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
if (TxManager->ConsumeCommitResult(shardId)) {
CA_LOG_D("Committed");
State = EState::FINISHED;
Send(ExecuterActorId, new TEvKqpBuffer::TEvResult{
Send<ESendingType::Tail>(ExecuterActorId, new TEvKqpBuffer::TEvResult{
BuildStats()
});
ExecuterActorId = {};
Y_ABORT_UNLESS(GetTotalMemory() == 0);
PassAway();
return;
}
}
Expand All @@ -2304,7 +2308,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
"BufferWriteActorState::Writing", NWilson::EFlags::AUTO_END);
CA_LOG_D("Flushed");
State = EState::WRITING;
Send(ExecuterActorId, new TEvKqpBuffer::TEvResult{
Send<ESendingType::Tail>(ExecuterActorId, new TEvKqpBuffer::TEvResult{
BuildStats()
});
ExecuterActorId = {};
Expand Down Expand Up @@ -2507,7 +2511,7 @@ class TKqpForwardWriteActor : public TActorBootstrapped<TKqpForwardWriteActor>,
SendTime = TInstant::Now();

CA_LOG_D("Send data=" << DataSize << ", closed=" << Closed << ", bufferActorId=" << BufferActorId);
AFL_ENSURE(Send(BufferActorId, ev.release()));
AFL_ENSURE(Send<ESendingType::Tail>(BufferActorId, ev.release()));
}

void CommitState(const NYql::NDqProto::TCheckpoint&) final {};
Expand Down
17 changes: 11 additions & 6 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {

void BeginTx(const Ydb::Table::TransactionSettings& settings) {
QueryState->TxId.SetValue(UlidGen.Next());
TerminateBufferActor();
QueryState->TxCtx = MakeIntrusive<TKqpTransactionContext>(false, AppData()->FunctionRegistry,
AppData()->TimeProvider, AppData()->RandomProvider);

Expand Down Expand Up @@ -881,6 +882,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
break;
}
} else {
TerminateBufferActor();
QueryState->TxCtx = MakeIntrusive<TKqpTransactionContext>(false, AppData()->FunctionRegistry,
AppData()->TimeProvider, AppData()->RandomProvider);
QueryState->QueryData = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc);
Expand Down Expand Up @@ -2207,12 +2209,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
QueryState->TxCtx->ClearDeferredEffects();
QueryState->TxCtx->Locks.Clear();
QueryState->TxCtx->TxManager.reset();

if (QueryState->TxCtx->BufferActorId) {
Send(QueryState->TxCtx->BufferActorId, new TEvKqpBuffer::TEvTerminate{});
QueryState->TxCtx->BufferActorId = {};
}

TerminateBufferActor();
QueryState->TxCtx->Finish();
}
}
Expand All @@ -2230,6 +2227,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
Transactions.AddToBeAborted(txCtx);
Transactions.ReleaseTransaction(QueryState->TxId.GetValue());
}
TerminateBufferActor();
DiscardPersistentSnapshot(txCtx->SnapshotHandle);
}

Expand Down Expand Up @@ -2702,6 +2700,13 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
QueryState->TxCtx->TopicOperations.SetWriteId(std::move(handle));
}

void TerminateBufferActor() {
if (QueryState && QueryState->TxCtx && QueryState->TxCtx->BufferActorId) {
Send(QueryState->TxCtx->BufferActorId, new TEvKqpBuffer::TEvTerminate{});
QueryState->TxCtx->BufferActorId = {};
}
}

private:
TActorId Owner;
TKqpQueryCachePtr QueryCache;
Expand Down

0 comments on commit 4efbe25

Please sign in to comment.