Skip to content

Commit

Permalink
Merge 9e2333f into 68c8f3f
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Sep 9, 2024
2 parents 68c8f3f + 9e2333f commit 3b789dc
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 5 deletions.
8 changes: 7 additions & 1 deletion ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -909,7 +909,13 @@ void TColumnShard::Handle(TEvTxProcessing::TEvReadSet::TPtr& ev, const TActorCon
}

void TColumnShard::Handle(TEvTxProcessing::TEvReadSetAck::TPtr& ev, const TActorContext& ctx) {
auto op = GetProgressTxController().GetTxOperatorVerifiedAs<TEvWriteCommitSyncTransactionOperator>(ev->Get()->Record.GetTxId());
auto opPtr = GetProgressTxController().GetTxOperatorOptional(ev->Get()->Record.GetTxId());
if (!opPtr) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "missed_read_set_ack")("proto", ev->Get()->Record.DebugString())(
"tx_id", ev->Get()->Record.GetTxId());
continue;
}
auto op = TValidator::CheckNotNull(dynamic_pointer_cast<TEvWriteCommitSyncTransactionOperator>(opPtr));
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "read_set_ack")("proto", ev->Get()->Record.DebugString())("lock_id", op->GetLockId());
auto tx = op->CreateReceiveResultAckTx(*this, ev->Get()->Record.GetTabletConsumer());
Execute(tx.release(), ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac
if (tabletId && *tabletId != i) {
continue;
}
NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(false),
NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent),
new TEvPipeCache::TEvForward(
new TEvTxProcessing::TEvReadSetAck(0, GetTxId(), owner.TabletID(), i, owner.TabletID(), 0), i, true),
IEventHandle::FlagTrackDelivery, GetTxId());
Expand All @@ -202,7 +202,7 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac
readSetData.SetDecision(*TxBroken ? NKikimrTx::TReadSetData::DECISION_ABORT : NKikimrTx::TReadSetData::DECISION_COMMIT);
for (auto&& i : ReceivingShards) {
if (WaitShardsResultAck.contains(i)) {
NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(false),
NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent),
new TEvPipeCache::TEvForward(
new TEvTxProcessing::TEvReadSet(0, GetTxId(), owner.TabletID(), i, owner.TabletID(), readSetData.SerializeAsString()), i,
true),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class TEvWriteCommitSecondaryTransactionOperator: public TEvWriteCommitSyncTrans
}

void SendBrokenFlagAck(TColumnShard& owner) {
NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(false),
NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent),
new TEvPipeCache::TEvForward(
new TEvTxProcessing::TEvReadSetAck(0, GetTxId(), owner.TabletID(), ArbiterTabletId, owner.TabletID(), 0), ArbiterTabletId, true),
IEventHandle::FlagTrackDelivery, GetTxId());
Expand All @@ -145,7 +145,7 @@ class TEvWriteCommitSecondaryTransactionOperator: public TEvWriteCommitSyncTrans
void SendResult(TColumnShard& owner) {
NKikimrTx::TReadSetData readSetData;
readSetData.SetDecision(SelfBroken ? NKikimrTx::TReadSetData::DECISION_ABORT : NKikimrTx::TReadSetData::DECISION_COMMIT);
NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(false),
NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent),
new TEvPipeCache::TEvForward(new TEvTxProcessing::TEvReadSet(
0, GetTxId(), owner.TabletID(), ArbiterTabletId, owner.TabletID(), readSetData.SerializeAsString()),
ArbiterTabletId, true),
Expand Down

0 comments on commit 3b789dc

Please sign in to comment.