diff --git a/ydb/core/kqp/session_actor/kqp_query_state.cpp b/ydb/core/kqp/session_actor/kqp_query_state.cpp index 80d1cc9ee093..ba87dc66bd9c 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.cpp +++ b/ydb/core/kqp/session_actor/kqp_query_state.cpp @@ -323,8 +323,14 @@ void TKqpQueryState::AddOffsetsToTransaction() { const auto& operations = GetTopicOperations(); TMaybe consumer; - if (operations.HasConsumer()) + if (operations.HasConsumer()) { consumer = operations.GetConsumer(); + } + + TMaybe supportivePartition; + if (operations.HasSupportivePartition()) { + supportivePartition = operations.GetSupportivePartition(); + } TopicOperations = NTopic::TTopicOperations(); @@ -334,7 +340,7 @@ void TKqpQueryState::AddOffsetsToTransaction() { for (auto& partition : topic.partitions()) { if (partition.partition_offsets().empty()) { - TopicOperations.AddOperation(path, partition.partition_id()); + TopicOperations.AddOperation(path, partition.partition_id(), supportivePartition); } else { for (auto& range : partition.partition_offsets()) { YQL_ENSURE(consumer.Defined()); diff --git a/ydb/core/kqp/topics/kqp_topics.cpp b/ydb/core/kqp/topics/kqp_topics.cpp index a591b864fd95..bb190899dfad 100644 --- a/ydb/core/kqp/topics/kqp_topics.cpp +++ b/ydb/core/kqp/topics/kqp_topics.cpp @@ -8,6 +8,17 @@ namespace NKikimr::NKqp::NTopic { +static void UpdateSupportivePartition(TMaybe& lhs, const TMaybe& rhs) +{ + if (lhs) { + if ((rhs != Nothing()) && (rhs != lhs)) { + lhs = Max(); + } + } else { + lhs = rhs; + } +} + // // TConsumerOperations // @@ -78,7 +89,8 @@ void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partitio Operations_[consumer].AddOperation(consumer, range); } -void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partition) +void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partition, + TMaybe supportivePartition) { Y_ABORT_UNLESS(Topic_.Empty() || Topic_ == topic); Y_ABORT_UNLESS(Partition_.Empty() || Partition_ == partition); @@ -88,6 +100,8 @@ void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partitio Partition_ = partition; } + UpdateSupportivePartition(SupportivePartition_, supportivePartition); + HasWriteOperations_ = true; } @@ -112,6 +126,9 @@ void TTopicPartitionOperations::BuildTopicTxs(THashMapAdd(); o->SetPartitionId(*Partition_); o->SetPath(*Topic_); + if (SupportivePartition_.Defined()) { + o->SetSupportivePartition(*SupportivePartition_); + } } } @@ -127,6 +144,8 @@ void TTopicPartitionOperations::Merge(const TTopicPartitionOperations& rhs) TabletId_ = rhs.TabletId_; } + UpdateSupportivePartition(SupportivePartition_, rhs.SupportivePartition_); + for (auto& [key, value] : rhs.Operations_) { Operations_[key].Merge(value); } @@ -240,10 +259,11 @@ void TTopicOperations::AddOperation(const TString& topic, ui32 partition, HasReadOperations_ = true; } -void TTopicOperations::AddOperation(const TString& topic, ui32 partition) +void TTopicOperations::AddOperation(const TString& topic, ui32 partition, + TMaybe supportivePartition) { TTopicPartition key{topic, partition}; - Operations_[key].AddOperation(topic, partition); + Operations_[key].AddOperation(topic, partition, supportivePartition); HasWriteOperations_ = true; } diff --git a/ydb/core/kqp/topics/kqp_topics.h b/ydb/core/kqp/topics/kqp_topics.h index a9909824430b..e0e425c3c1f7 100644 --- a/ydb/core/kqp/topics/kqp_topics.h +++ b/ydb/core/kqp/topics/kqp_topics.h @@ -49,7 +49,8 @@ class TTopicPartitionOperations { void AddOperation(const TString& topic, ui32 partition, const TString& consumer, const Ydb::Topic::OffsetsRange& range); - void AddOperation(const TString& topic, ui32 partition); + void AddOperation(const TString& topic, ui32 partition, + TMaybe supportivePartition); void BuildTopicTxs(THashMap &txs); @@ -67,6 +68,7 @@ class TTopicPartitionOperations { THashMap Operations_; bool HasWriteOperations_ = false; TMaybe TabletId_; + TMaybe SupportivePartition_; }; struct TTopicPartition { @@ -98,7 +100,8 @@ class TTopicOperations { void AddOperation(const TString& topic, ui32 partition, const TString& consumer, const Ydb::Topic::OffsetsRange& range); - void AddOperation(const TString& topic, ui32 partition); + void AddOperation(const TString& topic, ui32 partition, + TMaybe supportivePartition); void FillSchemeCacheNavigate(NSchemeCache::TSchemeCacheNavigate& navigate, TMaybe consumer); diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 07494f713837..f12db51db5ba 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -3042,6 +3042,8 @@ void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const T void TPartition::HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext&) { + Y_ABORT_UNLESS(IsSupportive()); + PendingEvents.emplace_back(ev->ReleaseBase().Release()); } diff --git a/ydb/core/persqueue/partition_id.h b/ydb/core/persqueue/partition_id.h index ce3d53dc6c8b..7367e66af1a1 100644 --- a/ydb/core/persqueue/partition_id.h +++ b/ydb/core/persqueue/partition_id.h @@ -29,14 +29,15 @@ class TPartitionId { size_t GetHash() const { - return MultiHash(OriginalPartitionId, WriteId); + return MultiHash(MultiHash(OriginalPartitionId, WriteId), InternalPartitionId); } bool IsEqual(const TPartitionId& rhs) const { return (OriginalPartitionId == rhs.OriginalPartitionId) && - (WriteId == rhs.WriteId); + (WriteId == rhs.WriteId) && + (InternalPartitionId == rhs.InternalPartitionId); } void ToStream(IOutputStream& s) const diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index 04dbf33f6c1b..1295038e1e51 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -41,6 +41,9 @@ void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TS r->SetOwnerCookie(cookie); r->SetStatus(PartitionConfig ? PartitionConfig->GetStatus() : NKikimrPQ::ETopicPartitionStatus::Active); r->SetSeqNo(seqNo); + if (IsSupportive()) { + r->SetSupportivePartition(Partition.InternalPartitionId); + } ctx.Send(Tablet, response.Release()); } diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 60e50fb463d9..dc68d7a42524 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -2001,7 +2001,8 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, const TActorId& p "Tablet " << TabletID() << " Write in transaction." << " Partition: " << req.GetPartition() << - ", WriteId: " << req.GetWriteId()); + ", WriteId: " << req.GetWriteId() << + ", NeedSupportivePartition: " << req.GetNeedSupportivePartition()); } for (ui32 i = 0; i < req.CmdWriteSize(); ++i) { @@ -2198,7 +2199,8 @@ void TPersQueue::HandleReserveBytesRequest(const ui64 responseCookie, const TAct "Tablet " << TabletID() << " Reserve bytes in transaction." << " Partition: " << req.GetPartition() << - ", WriteId: " << req.GetWriteId()); + ", WriteId: " << req.GetWriteId() << + ", NeedSupportivePartition: " << req.GetNeedSupportivePartition()); } InitResponseBuilder(responseCookie, 1, COUNTER_LATENCY_PQ_RESERVE_BYTES); @@ -2574,6 +2576,8 @@ void TPersQueue::HandleWriteRequestForSupportivePartition(const ui64 responseCoo const NKikimrClient::TPersQueuePartitionRequest& req, const TActorContext& ctx) { + Y_ABORT_UNLESS(req.HasWriteId()); + const TPartitionInfo& partition = GetPartitionInfo(req); const TActorId& actorId = partition.Actor; @@ -2648,6 +2652,14 @@ void TPersQueue::HandleEventForSupportivePartition(const ui64 responseCookie, sender); } } else { + if (!req.GetNeedSupportivePartition()) { + ReplyError(ctx, + responseCookie, + NPersQueue::NErrorCode::PRECONDITION_FAILED, + "lost messages"); + return; + } + // // этап 1: // - создать запись в TxWrites @@ -2665,6 +2677,7 @@ void TPersQueue::HandleEventForSupportivePartition(const ui64 responseCookie, TPartitionId partitionId(originalPartitionId, writeId, NextSupportivePartitionId++); writeInfo.Partitions.emplace(originalPartitionId, partitionId); + TxWritesChanged = true; AddSupportivePartition(partitionId); Y_ABORT_UNLESS(Partitions.contains(partitionId)); @@ -3115,6 +3128,38 @@ void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc } +bool TPersQueue::CheckTxWriteOperation(const NKikimrPQ::TPartitionOperation& operation, + ui64 writeId) const +{ + TPartitionId partitionId(operation.GetPartitionId(), + writeId, + operation.GetSupportivePartition()); + return Partitions.contains(partitionId); +} + +bool TPersQueue::CheckTxWriteOperations(const NKikimrPQ::TDataTransaction& txBody) const +{ + if (!txBody.HasWriteId()) { + return true; + } + + ui64 writeId = txBody.GetWriteId(); + + for (auto& operation : txBody.GetOperations()) { + auto isWrite = [](const NKikimrPQ::TPartitionOperation& o) { + return !o.HasBegin(); + }; + + if (isWrite(operation)) { + if (!CheckTxWriteOperation(operation, writeId)) { + return false; + } + } + } + + return true; +} + void TPersQueue::HandleDataTransaction(TAutoPtr ev, const TActorContext& ctx) { @@ -3123,23 +3168,6 @@ void TPersQueue::HandleDataTransaction(TAutoPtr(); TString TEvPartitionWriter::TEvInitResult::TSuccess::ToString() const { auto out = TStringBuilder() << "Success {" @@ -288,20 +289,31 @@ class TPartitionWriter: public TActorBootstrapped, private TRl ev->Record.MutableRequest()->MutableTxControl()->set_tx_id(Opts.TxId); - auto* topics = ev->Record.MutableRequest()->MutableTopicOperations()->AddTopics(); + auto* operations = ev->Record.MutableRequest()->MutableTopicOperations(); + auto* topics = operations->AddTopics(); topics->set_path(Opts.TopicPath); auto* partitions = topics->add_partitions(); partitions->set_partition_id(PartitionId); + if (HasSupportivePartitionId()) { + operations->SetSupportivePartition(SupportivePartitionId); + } + return ev; } void SetWriteId(NKikimrClient::TPersQueuePartitionRequest& request) { - if (WriteId != INVALID_WRITE_ID) { + if (HasWriteId()) { request.SetWriteId(WriteId); } } + void SetNeedSupportivePartition(NKikimrClient::TPersQueuePartitionRequest& request, bool value) { + if (HasWriteId()) { + request.SetNeedSupportivePartition(value); + } + } + /// GetOwnership void GetOwnership() { @@ -313,6 +325,7 @@ class TPartitionWriter: public TActorBootstrapped, private TRl cmd.SetForce(true); SetWriteId(request); + SetNeedSupportivePartition(request, true); NTabletPipe::SendData(SelfId(), PipeClient, ev.Release()); Become(&TThis::StateGetOwnership); @@ -322,6 +335,8 @@ class TPartitionWriter: public TActorBootstrapped, private TRl switch (ev->GetTypeRewrite()) { hFunc(TEvPersQueue::TEvResponse, HandleOwnership); hFunc(TEvPartitionWriter::TEvWriteRequest, HoldPending); + HFunc(NKqp::TEvKqp::TEvQueryResponse, HandlePartitionIdSaved); + SFunc(TEvents::TEvWakeup, SavePartitionId); default: return StateBase(ev); } @@ -344,7 +359,39 @@ class TPartitionWriter: public TActorBootstrapped, private TRl return InitResult("Partition is inactive", std::move(record)); } - OwnerCookie = response.GetCmdGetOwnershipResult().GetOwnerCookie(); + auto& reply = response.GetCmdGetOwnershipResult(); + OwnerCookie = reply.GetOwnerCookie(); + if (reply.HasSupportivePartition()) { + SupportivePartitionId = reply.GetSupportivePartition(); + } + + if (HasWriteId()) { + SavePartitionId(ActorContext()); + } else { + GetMaxSeqNo(); + } + } + + void SavePartitionId(const TActorContext& ctx) { + Y_ABORT_UNLESS(HasWriteId()); + Y_ABORT_UNLESS(HasSupportivePartitionId()); + + auto ev = MakeWriteIdRequest(); + ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); + } + + void HandlePartitionIdSaved(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext&) { + auto& record = ev->Get()->Record.GetRef(); + switch (record.GetYdbStatus()) { + case Ydb::StatusIds::SUCCESS: + break; + case Ydb::StatusIds::SESSION_BUSY: + case Ydb::StatusIds::PRECONDITION_FAILED: // see TKqpSessionActor::ReplyBusy + return Retry(record.GetYdbStatus()); + default: + return InitResult("Invalid KQP session", record); + } + GetMaxSeqNo(); } @@ -489,7 +536,11 @@ class TPartitionWriter: public TActorBootstrapped, private TRl return false; } - Pending.emplace(cookie, std::move(ev->Get()->Record)); + auto& request = *record.MutablePartitionRequest(); + SetWriteId(request); + + Pending.emplace(cookie, std::move(record)); + return true; } @@ -829,6 +880,13 @@ class TPartitionWriter: public TActorBootstrapped, private TRl } private: + bool HasWriteId() const { + return WriteId != INVALID_WRITE_ID; + } + + bool HasSupportivePartitionId() const { + return SupportivePartitionId != INVALID_PARTITION_ID; + } const TActorId Client; const ui64 TabletId; @@ -865,6 +923,7 @@ class TPartitionWriter: public TActorBootstrapped, private TRl EErrorCode ErrorCode = EErrorCode::InternalError; ui64 WriteId = INVALID_WRITE_ID; + ui32 SupportivePartitionId = INVALID_PARTITION_ID; using IRetryPolicy = IRetryPolicy; using IRetryState = IRetryPolicy::IRetryState; diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index ce192c3a0366..9f64cd9d3bf5 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -75,6 +75,7 @@ enum EQueryReplyFlags { message TTopicOperationsRequest { optional string Consumer = 1; repeated Ydb.Topic.UpdateOffsetsInTransactionRequest.TopicOffsets Topics = 2; + optional uint32 SupportivePartition = 3; } message TTopicOperationsResponse { diff --git a/ydb/core/protos/msgbus_pq.proto b/ydb/core/protos/msgbus_pq.proto index 6e4c186869dd..f6034600b8ee 100644 --- a/ydb/core/protos/msgbus_pq.proto +++ b/ydb/core/protos/msgbus_pq.proto @@ -145,6 +145,7 @@ message TPersQueuePartitionRequest { optional int64 MessageNo = 12; //mandatory for write optional int64 CmdWriteOffset = 13; //optional optional int64 WriteId = 23; + optional bool NeedSupportivePartition = 28; repeated TCmdWrite CmdWrite = 4; optional TCmdGetMaxSeqNo CmdGetMaxSeqNo = 5; @@ -473,6 +474,9 @@ message TPersQueuePartitionResponse { optional string OwnerCookie = 1; optional NKikimrPQ.ETopicPartitionStatus Status = 2; optional int64 SeqNo = 3; + + // transactions + optional int32 SupportivePartition = 4; } message TCmdPrepareDirectReadResult { diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index cd41f66153c4..d1c75795706e 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -930,6 +930,7 @@ message TPartitionOperation { optional uint64 End = 3; optional string Consumer = 4; optional string Path = 5; // topic path + optional uint32 SupportivePartition = 6; }; message TDataTransaction { diff --git a/ydb/public/api/protos/draft/persqueue_error_codes.proto b/ydb/public/api/protos/draft/persqueue_error_codes.proto index ceb8655fc84a..7bc20d53d0f0 100644 --- a/ydb/public/api/protos/draft/persqueue_error_codes.proto +++ b/ydb/public/api/protos/draft/persqueue_error_codes.proto @@ -50,5 +50,7 @@ enum EErrorCode { UNKNOWN_TXID = 30; + PRECONDITION_FAILED = 31; + ERROR = 100; } diff --git a/ydb/public/api/protos/persqueue_error_codes_v1.proto b/ydb/public/api/protos/persqueue_error_codes_v1.proto index 69a11fbd23d2..c11c3fad8daf 100644 --- a/ydb/public/api/protos/persqueue_error_codes_v1.proto +++ b/ydb/public/api/protos/persqueue_error_codes_v1.proto @@ -53,6 +53,8 @@ enum ErrorCode { UNKNOWN_TXID = 500030; + PRECONDITION_FAILED = 500031; + ERROR = 500100; INVALID_ARGUMENT = 500040; diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp index 0d4b3c752e2d..18662535e1f3 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp @@ -6,6 +6,8 @@ #include #include +#include + #include #include @@ -79,6 +81,11 @@ class TFixture : public NUnitTest::TBaseFixture { const TDuration& duration); void WaitForAcks(const TString& topicPath, const TString& messageGroupId); + void WaitForSessionClose(const TString& topicPath, + const TString& messageGroupId, + NYdb::EStatus status); + void CloseTopicWriteSession(const TString& topicPath, + const TString& messageGroupId); enum EEndOfTransaction { Commit, @@ -92,6 +99,10 @@ class TFixture : public NUnitTest::TBaseFixture { }; void TestTheCompletionOfATransaction(const TTransactionCompletionTestDescription& d); + void RestartLongTxService(); + + void DeleteSupportivePartition(const TString& topicName, + ui32 partition); protected: const TDriver& GetDriver() const; @@ -107,6 +118,15 @@ class TFixture : public NUnitTest::TBaseFixture { ui32 partition); THashSet GetTabletKeys(const TActorId& actorId, ui64 tabletId); + ui64 GetTransactionWriteId(const TActorId& actorId, + ui64 tabletId); + void SendLongTxLockStatus(const TActorId& actorId, + ui64 tabletId, + ui64 writeId, + NKikimrLongTxService::TEvLockStatus::EStatus status); + void WaitForTheTabletToDeleteTheWriteInfo(const TActorId& actorId, + ui64 tabletId, + ui64 writeId); void CheckTabletKeys(const TString& topicName); @@ -526,6 +546,20 @@ void TFixture::TTopicWriteSessionContext::Write(const TString& message, NTable:: ContinuationToken = Nothing(); } +void TFixture::CloseTopicWriteSession(const TString& topicPath, + const TString& messageGroupId) +{ + std::pair key(topicPath, messageGroupId); + auto i = TopicWriteSessions.find(key); + + UNIT_ASSERT(i != TopicWriteSessions.end()); + + TTopicWriteSessionContext& context = i->second; + + context.Session->Close(); + TopicWriteSessions.erase(key); +} + void TFixture::WriteToTopic(const TString& topicPath, const TString& messageGroupId, const TString& message, @@ -599,6 +633,40 @@ void TFixture::WaitForAcks(const TString& topicPath, const TString& messageGroup UNIT_ASSERT(context.AckCount == context.WriteCount); } +void TFixture::WaitForSessionClose(const TString& topicPath, + const TString& messageGroupId, + NYdb::EStatus status) +{ + std::pair key(topicPath, messageGroupId); + auto i = TopicWriteSessions.find(key); + UNIT_ASSERT(i != TopicWriteSessions.end()); + + auto& context = i->second; + + UNIT_ASSERT(context.AckCount <= context.WriteCount); + + for(bool stop = false; !stop; ) { + context.Session->WaitEvent().Wait(); + for (auto& event : context.Session->GetEvents()) { + if (auto* e = std::get_if(&event)) { + context.ContinuationToken = std::move(e->ContinuationToken); + } else if (auto* e = std::get_if(&event)) { + for (auto& ack : e->Acks) { + if (ack.State == NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN) { + ++context.AckCount; + } + } + } else if (auto* e = std::get_if(&event)) { + UNIT_ASSERT_VALUES_EQUAL(e->GetStatus(), status); + UNIT_ASSERT_GT(e->GetIssues().Size(), 0); + stop = true; + } + } + } + + UNIT_ASSERT(context.AckCount <= context.WriteCount); +} + ui64 TFixture::GetTopicTabletId(const TActorId& actorId, const TString& topicPath, ui32 partition) { auto navigate = std::make_unique(); @@ -678,6 +746,19 @@ THashSet TFixture::GetTabletKeys(const TActorId& actorId, ui64 tabletId return keys; } +void TFixture::RestartLongTxService() +{ + auto& runtime = Setup->GetRuntime(); + TActorId edge = runtime.AllocateEdgeActor(); + + for (ui32 node = 0; node < runtime.GetNodeCount(); ++node) { + runtime.Send(NKikimr::NLongTxService::MakeLongTxServiceID(runtime.GetNodeId(node)), edge, + new TEvents::TEvPoison(), + 0, + true); + } +} + Y_UNIT_TEST_F(WriteToTopic_Demo_1, TFixture) { CreateTopic("topic_A"); @@ -1075,6 +1156,103 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_10, TFixture) } } +ui64 TFixture::GetTransactionWriteId(const TActorId& actorId, + ui64 tabletId) +{ + using TEvKeyValue = NKikimr::TEvKeyValue; + + auto request = std::make_unique(); + request->Record.SetCookie(12345); + request->Record.AddCmdRead()->SetKey("_txinfo"); + + auto& runtime = Setup->GetRuntime(); + + runtime.SendToPipe(tabletId, actorId, request.release()); + auto response = runtime.GrabEdgeEvent(); + + UNIT_ASSERT(response->Record.HasCookie()); + UNIT_ASSERT_VALUES_EQUAL(response->Record.GetCookie(), 12345); + UNIT_ASSERT_VALUES_EQUAL(response->Record.ReadResultSize(), 1); + + auto& read = response->Record.GetReadResult(0); + + NKikimrPQ::TTabletTxInfo info; + UNIT_ASSERT(info.ParseFromString(read.GetValue())); + + UNIT_ASSERT_VALUES_EQUAL(info.TxWritesSize(), 1); + + auto& writeInfo = info.GetTxWrites(0); + UNIT_ASSERT(writeInfo.HasWriteId()); + + return writeInfo.GetWriteId(); +} + +void TFixture::SendLongTxLockStatus(const TActorId& actorId, + ui64 tabletId, + ui64 writeId, + NKikimrLongTxService::TEvLockStatus::EStatus status) +{ + auto event = std::make_unique(writeId, 0, status); + auto& runtime = Setup->GetRuntime(); + runtime.SendToPipe(tabletId, actorId, event.release()); +} + +void TFixture::WaitForTheTabletToDeleteTheWriteInfo(const TActorId& actorId, + ui64 tabletId, + ui64 writeId) +{ + while (true) { + using TEvKeyValue = NKikimr::TEvKeyValue; + + auto request = std::make_unique(); + request->Record.SetCookie(12345); + request->Record.AddCmdRead()->SetKey("_txinfo"); + + auto& runtime = Setup->GetRuntime(); + + runtime.SendToPipe(tabletId, actorId, request.release()); + auto response = runtime.GrabEdgeEvent(); + + UNIT_ASSERT(response->Record.HasCookie()); + UNIT_ASSERT_VALUES_EQUAL(response->Record.GetCookie(), 12345); + UNIT_ASSERT_VALUES_EQUAL(response->Record.ReadResultSize(), 1); + + auto& read = response->Record.GetReadResult(0); + + NKikimrPQ::TTabletTxInfo info; + UNIT_ASSERT(info.ParseFromString(read.GetValue())); + + bool found = false; + + for (size_t i = 0; i < info.TxWritesSize(); ++i) { + auto& writeInfo = info.GetTxWrites(i); + UNIT_ASSERT(writeInfo.HasWriteId()); + if (writeInfo.GetWriteId() == writeId) { + found = true; + break; + } + } + + if (!found) { + break; + } + + Sleep(TDuration::MilliSeconds(100)); + } +} + +void TFixture::DeleteSupportivePartition(const TString& topicName, ui32 partition) +{ + auto& runtime = Setup->GetRuntime(); + TActorId edge = runtime.AllocateEdgeActor(); + ui64 tabletId = GetTopicTabletId(edge, "/Root/" + topicName, partition); + ui64 writeId = GetTransactionWriteId(edge, tabletId); + + SendLongTxLockStatus(edge, tabletId, writeId, NKikimrLongTxService::TEvLockStatus::STATUS_NOT_FOUND); + + WaitForTheTabletToDeleteTheWriteInfo(edge, tabletId, writeId); +} + void TFixture::CheckTabletKeys(const TString& topicName) { auto& runtime = Setup->GetRuntime(); @@ -1141,6 +1319,75 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_11, TFixture) } } +Y_UNIT_TEST_F(WriteToTopic_Demo_12, TFixture) +{ + CreateTopic("topic_A"); + + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx = BeginTx(tableSession); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx); + WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); + + DeleteSupportivePartition("topic_A", 0); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", &tx); + WaitForSessionClose("topic_A", TEST_MESSAGE_GROUP_ID, NYdb::EStatus::PRECONDITION_FAILED); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_13, TFixture) +{ + CreateTopic("topic_A"); + + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx = BeginTx(tableSession); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message", &tx); + WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); + + DeleteSupportivePartition("topic_A", 0); + + CommitTx(tx, EStatus::ABORTED); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_14, TFixture) +{ + CreateTopic("topic_A"); + + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx = BeginTx(tableSession); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx); + WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); + + DeleteSupportivePartition("topic_A", 0); + + CloseTopicWriteSession("topic_A", TEST_MESSAGE_GROUP_ID); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", &tx); + WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); + + CommitTx(tx, EStatus::ABORTED); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_15, TFixture) +{ + CreateTopic("topic_A"); + + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx = BeginTx(tableSession); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, "message #1", &tx); + WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_1); + CloseTopicWriteSession("topic_A", TEST_MESSAGE_GROUP_ID_1); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_2, "message #2", &tx); + WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_2); + CloseTopicWriteSession("topic_A", TEST_MESSAGE_GROUP_ID_2); + + CommitTx(tx, EStatus::SUCCESS); +} + } } diff --git a/ydb/services/persqueue_v1/actors/persqueue_utils.cpp b/ydb/services/persqueue_v1/actors/persqueue_utils.cpp index 37b23507821e..5a2cdc0a6d73 100644 --- a/ydb/services/persqueue_v1/actors/persqueue_utils.cpp +++ b/ydb/services/persqueue_v1/actors/persqueue_utils.cpp @@ -129,7 +129,8 @@ Ydb::StatusIds::StatusCode ConvertPersQueueInternalCodeToStatus(const Ydb::PersQ return Ydb::StatusIds::UNAVAILABLE; case UNKNOWN_TXID: return Ydb::StatusIds::NOT_FOUND; - + case PRECONDITION_FAILED: + return Ydb::StatusIds::PRECONDITION_FAILED; default: return Ydb::StatusIds::STATUS_CODE_UNSPECIFIED; } diff --git a/ydb/services/persqueue_v1/ut/pqtablet_mock.cpp b/ydb/services/persqueue_v1/ut/pqtablet_mock.cpp index 8591b8245fdb..5e5a3c740bae 100644 --- a/ydb/services/persqueue_v1/ut/pqtablet_mock.cpp +++ b/ydb/services/persqueue_v1/ut/pqtablet_mock.cpp @@ -28,7 +28,9 @@ void TPQTabletMock::PrepareGetOwnershipResponse() partition->SetCookie(*cookie); } - partition->MutableCmdGetOwnershipResult()->SetOwnerCookie(OwnerCookie); + auto* result = partition->MutableCmdGetOwnershipResult(); + result->SetOwnerCookie(OwnerCookie); + result->SetSupportivePartition(1'000'000); // fictitious number of the supportive partition } void TPQTabletMock::PrepareGetMaxSeqNoResponse()