diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index 10ffc773c241..99c05a742c31 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -822,7 +822,7 @@ struct TEvPQ { }; struct TEvTxCalcPredicateResult : public TEventLocal { - TEvTxCalcPredicateResult(ui64 step, ui64 txId, const NPQ::TPartitionId& partition, bool predicate) : + TEvTxCalcPredicateResult(ui64 step, ui64 txId, const NPQ::TPartitionId& partition, TMaybe predicate) : Step(step), TxId(txId), Partition(partition), @@ -833,7 +833,7 @@ struct TEvPQ { ui64 Step; ui64 TxId; NPQ::TPartitionId Partition; - bool Predicate = false; + TMaybe Predicate; }; struct TEvProposePartitionConfig : public TEventLocal { diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 70aafdc97a13..bf216b3034ca 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -966,6 +966,21 @@ void TPartition::HandleOnInit(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const void TPartition::Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext& ctx) { + PQ_LOG_D("Handle TEvPQ::TEvTxCalcPredicate" << + " Step " << ev->Get()->Step << + ", TxId " << ev->Get()->TxId); + + if (PlanStep.Defined() && TxId.Defined()) { + if (GetStepAndTxId(*ev->Get()) < GetStepAndTxId(*PlanStep, *TxId)) { + Send(Tablet, + MakeHolder(ev->Get()->Step, + ev->Get()->TxId, + Partition, + Nothing()).Release()); + return; + } + } + PushBackDistrTx(ev->Release()); ProcessTxsAndUserActs(ctx); diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 09532d2c85a3..5e625d12c2e0 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -3506,7 +3506,7 @@ void TPersQueue::Handle(TEvPQ::TEvTxCalcPredicateResult::TPtr& ev, const TActorC " Step " << event.Step << ", TxId " << event.TxId << ", Partition " << event.Partition << - ", Predicate " << (event.Predicate ? "true" : "false")); + ", Predicate " << event.Predicate); auto tx = GetTransaction(ctx, event.TxId); if (!tx) { @@ -4212,9 +4212,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, tx.WriteInProgress = false; - // - // запланированные события будут отправлены в EndWriteTxs - // + // scheduled events will be sent to EndWriteTxs tx.State = NKikimrPQ::TTransaction::PREPARED; PQ_LOG_D("TxId " << tx.TxId << @@ -4242,9 +4240,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, tx.WriteInProgress = false; - // - // запланированные события будут отправлены в EndWriteTxs - // + // scheduled events will be sent to EndWriteTxs tx.State = NKikimrPQ::TTransaction::PLANNED; PQ_LOG_D("TxId " << tx.TxId << @@ -4274,6 +4270,8 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, switch (tx.Kind) { case NKikimrPQ::TTransaction::KIND_DATA: case NKikimrPQ::TTransaction::KIND_CONFIG: + WriteTx(tx, NKikimrPQ::TTransaction::CALCULATED); + tx.State = NKikimrPQ::TTransaction::CALCULATED; PQ_LOG_D("TxId " << tx.TxId << ", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State)); @@ -4283,14 +4281,12 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, case NKikimrPQ::TTransaction::KIND_UNKNOWN: Y_ABORT_UNLESS(false); } - } else { - break; } - [[fallthrough]]; + break; case NKikimrPQ::TTransaction::CALCULATED: - Y_ABORT_UNLESS(!tx.WriteInProgress, + Y_ABORT_UNLESS(tx.WriteInProgress, "PQ %" PRIu64 ", TxId %" PRIu64, TabletID(), tx.TxId); diff --git a/ydb/core/persqueue/transaction.cpp b/ydb/core/persqueue/transaction.cpp index 405e5d67b154..8bc361b938a1 100644 --- a/ydb/core/persqueue/transaction.cpp +++ b/ydb/core/persqueue/transaction.cpp @@ -215,8 +215,13 @@ void TDistributedTransaction::OnTxCalcPredicateResult(const TEvPQ::TEvTxCalcPred { PQ_LOG_D("Handle TEvTxCalcPredicateResult"); - OnPartitionResult(event, - event.Predicate ? NKikimrTx::TReadSetData::DECISION_COMMIT : NKikimrTx::TReadSetData::DECISION_ABORT); + TMaybe decision; + + if (event.Predicate.Defined()) { + decision = *event.Predicate ? NKikimrTx::TReadSetData::DECISION_COMMIT : NKikimrTx::TReadSetData::DECISION_ABORT; + } + + OnPartitionResult(event, decision); } void TDistributedTransaction::OnProposePartitionConfigResult(const TEvPQ::TEvProposePartitionConfigResult& event) @@ -228,14 +233,16 @@ void TDistributedTransaction::OnProposePartitionConfigResult(const TEvPQ::TEvPro } template -void TDistributedTransaction::OnPartitionResult(const E& event, EDecision decision) +void TDistributedTransaction::OnPartitionResult(const E& event, TMaybe decision) { Y_ABORT_UNLESS(Step == event.Step); Y_ABORT_UNLESS(TxId == event.TxId); Y_ABORT_UNLESS(Partitions.contains(event.Partition.OriginalPartitionId)); - SetDecision(SelfDecision, decision); + if (decision.Defined()) { + SetDecision(SelfDecision, *decision); + } ++PartitionRepliesCount; diff --git a/ydb/core/persqueue/transaction.h b/ydb/core/persqueue/transaction.h index 156f10643f71..549a10252062 100644 --- a/ydb/core/persqueue/transaction.h +++ b/ydb/core/persqueue/transaction.h @@ -90,7 +90,7 @@ struct TDistributedTransaction { void InitPartitions(); template - void OnPartitionResult(const E& event, EDecision decision); + void OnPartitionResult(const E& event, TMaybe decision); TString LogPrefix() const;