Skip to content

Commit

Permalink
Transaction state after PQ tablet restart (#7015)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alek5andr-Kotov authored Jul 24, 2024
1 parent 55903d1 commit bf2dd5d
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 54 deletions.
81 changes: 40 additions & 41 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -923,11 +923,9 @@ void TPersQueue::MoveTopTxToCalculating(TDistributedTransaction& tx,
converterFactory,
tx.TopicConverter,
ctx);
if (InitCompleted) {
CreateNewPartitions(tx.TabletConfig,
tx.TopicConverter,
ctx);
}
CreateNewPartitions(tx.TabletConfig,
tx.TopicConverter,
ctx);
SendEvProposePartitionConfig(ctx, tx);
break;
}
Expand All @@ -940,24 +938,6 @@ void TPersQueue::MoveTopTxToCalculating(TDistributedTransaction& tx,
", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State));
}

void TPersQueue::UpdateTopTxState(const TActorContext& ctx)
{
Y_ABORT_UNLESS(!InitCompleted);

if (TxQueue.empty()) {
return;
}

Y_ABORT_UNLESS(Txs.contains(TxQueue.front().second));
auto& tx = Txs.at(TxQueue.front().second);

if (tx.State <= NKikimrPQ::TTransaction::PLANNED) {
return;
}

MoveTopTxToCalculating(tx, ctx);
}

void TPersQueue::AddSupportivePartition(const TPartitionId& partitionId)
{
Partitions.emplace(partitionId,
Expand Down Expand Up @@ -1097,8 +1077,6 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult&
ctx);
}

UpdateTopTxState(ctx);

ConfigInited = true;

InitializeMeteringSink(ctx);
Expand Down Expand Up @@ -3961,7 +3939,9 @@ void TPersQueue::SendEvTxCommitToPartitions(const TActorContext& ctx,
auto event = std::make_unique<TEvPQ::TEvTxCommit>(tx.Step, tx.TxId);

auto p = Partitions.find(TPartitionId(partitionId));
Y_ABORT_UNLESS(p != Partitions.end());
Y_ABORT_UNLESS(p != Partitions.end(),
"Tablet %" PRIu64 ", Partition %" PRIu32 ", TxId %" PRIu64,
TabletID(), partitionId, tx.TxId);

ctx.Send(p->second.Actor, event.release());
}
Expand Down Expand Up @@ -4061,7 +4041,9 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,

switch (tx.State) {
case NKikimrPQ::TTransaction::UNKNOWN:
Y_ABORT_UNLESS(tx.TxId != Max<ui64>());
Y_ABORT_UNLESS(tx.TxId != Max<ui64>(),
"PQ %" PRIu64 ", TxId %" PRIu64,
TabletID(), tx.TxId);

WriteTx(tx, NKikimrPQ::TTransaction::PREPARED);
ScheduleProposeTransactionResult(tx);
Expand All @@ -4073,7 +4055,9 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
break;

case NKikimrPQ::TTransaction::PREPARING:
Y_ABORT_UNLESS(tx.WriteInProgress);
Y_ABORT_UNLESS(tx.WriteInProgress,
"PQ %" PRIu64 ", TxId %" PRIu64,
TabletID(), tx.TxId);

tx.WriteInProgress = false;

Expand All @@ -4088,7 +4072,9 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
break;

case NKikimrPQ::TTransaction::PREPARED:
Y_ABORT_UNLESS(tx.Step != Max<ui64>());
Y_ABORT_UNLESS(tx.Step != Max<ui64>(),
"PQ %" PRIu64 ", TxId %" PRIu64,
TabletID(), tx.TxId);

WriteTx(tx, NKikimrPQ::TTransaction::PLANNED);

Expand All @@ -4099,7 +4085,9 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
break;

case NKikimrPQ::TTransaction::PLANNING:
Y_ABORT_UNLESS(tx.WriteInProgress);
Y_ABORT_UNLESS(tx.WriteInProgress,
"PQ %" PRIu64 ", TxId %" PRIu64,
TabletID(), tx.TxId);

tx.WriteInProgress = false;

Expand All @@ -4123,7 +4111,10 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
break;

case NKikimrPQ::TTransaction::CALCULATING:
Y_ABORT_UNLESS(tx.PartitionRepliesCount <= tx.PartitionRepliesExpected);
Y_ABORT_UNLESS(tx.PartitionRepliesCount <= tx.PartitionRepliesExpected,
"PQ %" PRIu64 ", TxId %" PRIu64 ", PartitionRepliesCount %" PRISZT ", PartitionRepliesExpected %" PRISZT,
TabletID(), tx.TxId,
tx.PartitionRepliesCount, tx.PartitionRepliesExpected);

PQ_LOG_D("Received " << tx.PartitionRepliesCount <<
", Expected " << tx.PartitionRepliesExpected);
Expand All @@ -4132,8 +4123,6 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
switch (tx.Kind) {
case NKikimrPQ::TTransaction::KIND_DATA:
case NKikimrPQ::TTransaction::KIND_CONFIG:
WriteTx(tx, NKikimrPQ::TTransaction::WAIT_RS);

tx.State = NKikimrPQ::TTransaction::CALCULATED;
PQ_LOG_D("TxId " << tx.TxId <<
", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State));
Expand All @@ -4143,14 +4132,16 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
case NKikimrPQ::TTransaction::KIND_UNKNOWN:
Y_ABORT_UNLESS(false);
}
} else {
break;
}

break;
[[fallthrough]];

case NKikimrPQ::TTransaction::CALCULATED:
Y_ABORT_UNLESS(tx.WriteInProgress);

tx.WriteInProgress = false;
Y_ABORT_UNLESS(!tx.WriteInProgress,
"PQ %" PRIu64 ", TxId %" PRIu64,
TabletID(), tx.TxId);

tx.State = NKikimrPQ::TTransaction::WAIT_RS;
PQ_LOG_D("TxId " << tx.TxId <<
Expand All @@ -4164,7 +4155,8 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
// from TEvProposeTransaction
//
Y_ABORT_UNLESS(tx.ReadSetAcks.size() <= tx.PredicatesReceived.size(),
"tx.ReadSetAcks.size=%" PRISZT ", tx.PredicatesReceived.size=%" PRISZT,
"PQ %" PRIu64 ", TxId %" PRIu64 ", ReadSetAcks.size %" PRISZT ", PredicatesReceived.size %" PRISZT,
TabletID(), tx.TxId,
tx.ReadSetAcks.size(), tx.PredicatesReceived.size());

SendEvReadSetToReceivers(ctx, tx);
Expand All @@ -4188,14 +4180,21 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
[[fallthrough]];

case NKikimrPQ::TTransaction::EXECUTING:
Y_ABORT_UNLESS(tx.PartitionRepliesCount <= tx.PartitionRepliesExpected);
Y_ABORT_UNLESS(tx.PartitionRepliesCount <= tx.PartitionRepliesExpected,
"PQ %" PRIu64 ", TxId %" PRIu64 ", PartitionRepliesCount %" PRISZT ", PartitionRepliesExpected %" PRISZT,
TabletID(), tx.TxId,
tx.PartitionRepliesCount, tx.PartitionRepliesExpected);

PQ_LOG_D("Received " << tx.PartitionRepliesCount <<
", Expected " << tx.PartitionRepliesExpected);

if (tx.PartitionRepliesCount == tx.PartitionRepliesExpected) {
Y_ABORT_UNLESS(!TxQueue.empty());
Y_ABORT_UNLESS(TxQueue.front().second == tx.TxId);
Y_ABORT_UNLESS(!TxQueue.empty(),
"PQ %" PRIu64 ", TxId %" PRIu64,
TabletID(), tx.TxId);
Y_ABORT_UNLESS(TxQueue.front().second == tx.TxId,
"PQ %" PRIu64 ", TxId %" PRIu64,
TabletID(), tx.TxId);

SendEvProposeTransactionResult(ctx, tx);

Expand Down
1 change: 0 additions & 1 deletion ydb/core/persqueue/pq_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,6 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
const TWriteId& writeId) const;
bool CheckTxWriteOperations(const NKikimrPQ::TDataTransaction& txBody) const;

void UpdateTopTxState(const TActorContext& ctx);
void MoveTopTxToCalculating(TDistributedTransaction& tx, const TActorContext& ctx);
};

Expand Down
164 changes: 152 additions & 12 deletions ydb/core/persqueue/ut/pqtablet_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,9 @@ class TPQTabletFixture : public NUnitTest::TBaseFixture {
void StartPQWriteTxsObserver();
void WaitForPQWriteTxs();

template <class T> void WaitForEvent();
void WaitForCalcPredicateResult();
void WaitForProposePartitionConfigResult();
template <class T> void WaitForEvent(size_t count);
void WaitForCalcPredicateResult(size_t count = 1);
void WaitForProposePartitionConfigResult(size_t count = 1);

void TestWaitingForTEvReadSet(size_t senders, size_t receivers);

Expand Down Expand Up @@ -524,14 +524,16 @@ void TPQTabletFixture::WaitDropTabletReply(const TDropTabletReplyMatcher& matche
}

template <class T>
void TPQTabletFixture::WaitForEvent()
void TPQTabletFixture::WaitForEvent(size_t count)
{
bool found = false;
size_t received = 0;

TTestActorRuntimeBase::TEventObserver prev;
auto observer = [&found, &prev](TAutoPtr<IEventHandle>& event) {
auto observer = [&found, &prev, &received, count](TAutoPtr<IEventHandle>& event) {
if (auto* msg = event->CastAsLocal<T>()) {
found = true;
++received;
found = (received >= count);
}

return prev ? prev(event) : TTestActorRuntimeBase::EEventAction::PROCESS;
Expand All @@ -549,14 +551,14 @@ void TPQTabletFixture::WaitForEvent()
Ctx->Runtime->SetObserverFunc(prev);
}

void TPQTabletFixture::WaitForCalcPredicateResult()
void TPQTabletFixture::WaitForCalcPredicateResult(size_t count)
{
WaitForEvent<TEvPQ::TEvTxCalcPredicateResult>();
WaitForEvent<TEvPQ::TEvTxCalcPredicateResult>(count);
}

void TPQTabletFixture::WaitForProposePartitionConfigResult()
void TPQTabletFixture::WaitForProposePartitionConfigResult(size_t count)
{
WaitForEvent<TEvPQ::TEvProposePartitionConfigResult>();
WaitForEvent<TEvPQ::TEvProposePartitionConfigResult>(count);
}

std::unique_ptr<TEvPersQueue::TEvRequest> TPQTabletFixture::MakeGetOwnershipRequest(const TGetOwnershipRequestParams& params,
Expand Down Expand Up @@ -1370,7 +1372,7 @@ Y_UNIT_TEST_F(Read_TEvTxCommit_After_Restart, TPQTabletFixture)

WaitForCalcPredicateResult();

// the transaction is now in the WAIT_RS state on disk and in memory
// the transaction is now in the WAIT_RS state in memory and PLANNED state in disk

PQTabletRestart(*Ctx);

Expand Down Expand Up @@ -1416,7 +1418,145 @@ Y_UNIT_TEST_F(Config_TEvTxCommit_After_Restart, TPQTabletFixture)

WaitForProposePartitionConfigResult();

// the transaction is now in the WAIT_RS state on disk and in memory
// the transaction is now in the WAIT_RS state in memory and PLANNED state in disk

PQTabletRestart(*Ctx);

tablet->SendReadSet(*Ctx->Runtime, {.Step=100, .TxId=txId, .Target=Ctx->TabletId, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT});

WaitProposeTransactionResponse({.TxId=txId,
.Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE});

tablet->SendReadSetAck(*Ctx->Runtime, {.Step=100, .TxId=txId, .Source=Ctx->TabletId});
WaitReadSetAck(*tablet, {.Step=100, .TxId=txId, .Source=mockTabletId, .Target=Ctx->TabletId, .Consumer=Ctx->TabletId});
}

Y_UNIT_TEST_F(One_Tablet_For_All_Partitions, TPQTabletFixture)
{
const ui64 txId = 67890;

PQTabletPrepare({.partitions=1}, {}, *Ctx);

auto tabletConfig = NHelpers::MakeConfig({.Version=2,
.Consumers={
{.Consumer="client-1", .Generation=0},
{.Consumer="client-3", .Generation=7}
},
.Partitions={
{.Id=0},
{.Id=1},
{.Id=2}
},
.AllPartitions={
{.Id=0, .TabletId=Ctx->TabletId, .Children={1, 2}, .Parents={}},
{.Id=1, .TabletId=Ctx->TabletId, .Children={}, .Parents={0}},
{.Id=2, .TabletId=Ctx->TabletId, .Children={}, .Parents={0}}
}});

SendProposeTransactionRequest({.TxId=txId,
.Configs=NHelpers::TConfigParams{
.Tablet=tabletConfig,
.Bootstrap=NHelpers::MakeBootstrapConfig(),
}});
WaitProposeTransactionResponse({.TxId=txId,
.Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED});

SendPlanStep({.Step=100, .TxIds={txId}});

WaitForProposePartitionConfigResult(2);

// the transaction is now in the WAIT_RS state in memory and PLANNED state in disk

PQTabletRestart(*Ctx);

WaitProposeTransactionResponse({.TxId=txId,
.Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE});
}

Y_UNIT_TEST_F(One_New_Partition_In_Another_Tablet, TPQTabletFixture)
{
const ui64 txId = 67890;
const ui64 mockTabletId = 22222;

NHelpers::TPQTabletMock* tablet = CreatePQTabletMock(mockTabletId);
PQTabletPrepare({.partitions=1}, {}, *Ctx);

auto tabletConfig = NHelpers::MakeConfig({.Version=2,
.Consumers={
{.Consumer="client-1", .Generation=0},
{.Consumer="client-3", .Generation=7}
},
.Partitions={
{.Id=0},
{.Id=1},
},
.AllPartitions={
{.Id=0, .TabletId=Ctx->TabletId, .Children={1, 2}, .Parents={}},
{.Id=1, .TabletId=Ctx->TabletId, .Children={}, .Parents={0}},
{.Id=2, .TabletId=mockTabletId, .Children={}, .Parents={0}}
}});

SendProposeTransactionRequest({.TxId=txId,
.Configs=NHelpers::TConfigParams{
.Tablet=tabletConfig,
.Bootstrap=NHelpers::MakeBootstrapConfig(),
}});
WaitProposeTransactionResponse({.TxId=txId,
.Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED});

SendPlanStep({.Step=100, .TxIds={txId}});

WaitForProposePartitionConfigResult(2);

// the transaction is now in the WAIT_RS state in memory and PLANNED state in disk

PQTabletRestart(*Ctx);

tablet->SendReadSet(*Ctx->Runtime, {.Step=100, .TxId=txId, .Target=Ctx->TabletId, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT});

WaitProposeTransactionResponse({.TxId=txId,
.Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE});

tablet->SendReadSetAck(*Ctx->Runtime, {.Step=100, .TxId=txId, .Source=Ctx->TabletId});
WaitReadSetAck(*tablet, {.Step=100, .TxId=txId, .Source=mockTabletId, .Target=Ctx->TabletId, .Consumer=Ctx->TabletId});
}

Y_UNIT_TEST_F(All_New_Partitions_In_Another_Tablet, TPQTabletFixture)
{
const ui64 txId = 67890;
const ui64 mockTabletId = 22222;

NHelpers::TPQTabletMock* tablet = CreatePQTabletMock(mockTabletId);
PQTabletPrepare({.partitions=1}, {}, *Ctx);

auto tabletConfig = NHelpers::MakeConfig({.Version=2,
.Consumers={
{.Consumer="client-1", .Generation=0},
{.Consumer="client-3", .Generation=7}
},
.Partitions={
{.Id=0},
{.Id=1},
},
.AllPartitions={
{.Id=0, .TabletId=Ctx->TabletId, .Children={}, .Parents={2}},
{.Id=1, .TabletId=Ctx->TabletId, .Children={}, .Parents={2}},
{.Id=2, .TabletId=mockTabletId, .Children={0, 1}, .Parents={}}
}});

SendProposeTransactionRequest({.TxId=txId,
.Configs=NHelpers::TConfigParams{
.Tablet=tabletConfig,
.Bootstrap=NHelpers::MakeBootstrapConfig(),
}});
WaitProposeTransactionResponse({.TxId=txId,
.Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED});

SendPlanStep({.Step=100, .TxIds={txId}});

WaitForProposePartitionConfigResult(2);

// the transaction is now in the WAIT_RS state in memory and PLANNED state in disk

PQTabletRestart(*Ctx);

Expand Down

0 comments on commit bf2dd5d

Please sign in to comment.