Skip to content

Commit

Permalink
Merge 162cb5b into 3033104
Browse files Browse the repository at this point in the history
  • Loading branch information
Alek5andr-Kotov authored Sep 13, 2024
2 parents 3033104 + 162cb5b commit d5a5108
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 18 deletions.
4 changes: 2 additions & 2 deletions ydb/core/persqueue/events/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -822,7 +822,7 @@ struct TEvPQ {
};

struct TEvTxCalcPredicateResult : public TEventLocal<TEvTxCalcPredicateResult, EvTxCalcPredicateResult> {
TEvTxCalcPredicateResult(ui64 step, ui64 txId, const NPQ::TPartitionId& partition, bool predicate) :
TEvTxCalcPredicateResult(ui64 step, ui64 txId, const NPQ::TPartitionId& partition, TMaybe<bool> predicate) :
Step(step),
TxId(txId),
Partition(partition),
Expand All @@ -833,7 +833,7 @@ struct TEvPQ {
ui64 Step;
ui64 TxId;
NPQ::TPartitionId Partition;
bool Predicate = false;
TMaybe<bool> Predicate;
};

struct TEvProposePartitionConfig : public TEventLocal<TEvProposePartitionConfig, EvProposePartitionConfig> {
Expand Down
15 changes: 15 additions & 0 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -966,6 +966,21 @@ void TPartition::HandleOnInit(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const

void TPartition::Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext& ctx)
{
PQ_LOG_D("Handle TEvPQ::TEvTxCalcPredicate" <<
" Step " << ev->Get()->Step <<
", TxId " << ev->Get()->TxId);

if (PlanStep.Defined() && TxId.Defined()) {
if (GetStepAndTxId(*ev->Get()) < GetStepAndTxId(*PlanStep, *TxId)) {
Send(Tablet,
MakeHolder<TEvPQ::TEvTxCalcPredicateResult>(ev->Get()->Step,
ev->Get()->TxId,
Partition,
Nothing()).Release());
return;
}
}

PushBackDistrTx(ev->Release());

ProcessTxsAndUserActs(ctx);
Expand Down
18 changes: 7 additions & 11 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3506,7 +3506,7 @@ void TPersQueue::Handle(TEvPQ::TEvTxCalcPredicateResult::TPtr& ev, const TActorC
" Step " << event.Step <<
", TxId " << event.TxId <<
", Partition " << event.Partition <<
", Predicate " << (event.Predicate ? "true" : "false"));
", Predicate " << event.Predicate);

auto tx = GetTransaction(ctx, event.TxId);
if (!tx) {
Expand Down Expand Up @@ -4212,9 +4212,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,

tx.WriteInProgress = false;

//
// запланированные события будут отправлены в EndWriteTxs
//
// scheduled events will be sent to EndWriteTxs

tx.State = NKikimrPQ::TTransaction::PREPARED;
PQ_LOG_D("TxId " << tx.TxId <<
Expand Down Expand Up @@ -4242,9 +4240,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,

tx.WriteInProgress = false;

//
// запланированные события будут отправлены в EndWriteTxs
//
// scheduled events will be sent to EndWriteTxs

tx.State = NKikimrPQ::TTransaction::PLANNED;
PQ_LOG_D("TxId " << tx.TxId <<
Expand Down Expand Up @@ -4274,6 +4270,8 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
switch (tx.Kind) {
case NKikimrPQ::TTransaction::KIND_DATA:
case NKikimrPQ::TTransaction::KIND_CONFIG:
WriteTx(tx, NKikimrPQ::TTransaction::CALCULATED);

tx.State = NKikimrPQ::TTransaction::CALCULATED;
PQ_LOG_D("TxId " << tx.TxId <<
", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State));
Expand All @@ -4283,14 +4281,12 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
case NKikimrPQ::TTransaction::KIND_UNKNOWN:
Y_ABORT_UNLESS(false);
}
} else {
break;
}

[[fallthrough]];
break;

case NKikimrPQ::TTransaction::CALCULATED:
Y_ABORT_UNLESS(!tx.WriteInProgress,
Y_ABORT_UNLESS(tx.WriteInProgress,
"PQ %" PRIu64 ", TxId %" PRIu64,
TabletID(), tx.TxId);

Expand Down
15 changes: 11 additions & 4 deletions ydb/core/persqueue/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,13 @@ void TDistributedTransaction::OnTxCalcPredicateResult(const TEvPQ::TEvTxCalcPred
{
PQ_LOG_D("Handle TEvTxCalcPredicateResult");

OnPartitionResult(event,
event.Predicate ? NKikimrTx::TReadSetData::DECISION_COMMIT : NKikimrTx::TReadSetData::DECISION_ABORT);
TMaybe<EDecision> decision;

if (event.Predicate.Defined()) {
decision = *event.Predicate ? NKikimrTx::TReadSetData::DECISION_COMMIT : NKikimrTx::TReadSetData::DECISION_ABORT;
}

OnPartitionResult(event, decision);
}

void TDistributedTransaction::OnProposePartitionConfigResult(const TEvPQ::TEvProposePartitionConfigResult& event)
Expand All @@ -228,14 +233,16 @@ void TDistributedTransaction::OnProposePartitionConfigResult(const TEvPQ::TEvPro
}

template<class E>
void TDistributedTransaction::OnPartitionResult(const E& event, EDecision decision)
void TDistributedTransaction::OnPartitionResult(const E& event, TMaybe<EDecision> decision)
{
Y_ABORT_UNLESS(Step == event.Step);
Y_ABORT_UNLESS(TxId == event.TxId);

Y_ABORT_UNLESS(Partitions.contains(event.Partition.OriginalPartitionId));

SetDecision(SelfDecision, decision);
if (decision.Defined()) {
SetDecision(SelfDecision, *decision);
}

++PartitionRepliesCount;

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ struct TDistributedTransaction {
void InitPartitions();

template<class E>
void OnPartitionResult(const E& event, EDecision decision);
void OnPartitionResult(const E& event, TMaybe<EDecision> decision);

TString LogPrefix() const;

Expand Down

0 comments on commit d5a5108

Please sign in to comment.