From 4bd5a10371fb345eb08fc58d907a562540a3aa8f Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Tue, 13 Aug 2024 18:43:54 +0300 Subject: [PATCH 1/7] [*] proto-files --- ydb/core/protos/msgbus_pq.proto | 2 ++ ydb/public/api/protos/ydb_topic.proto | 4 ++++ 2 files changed, 6 insertions(+) diff --git a/ydb/core/protos/msgbus_pq.proto b/ydb/core/protos/msgbus_pq.proto index 88112500d1e0..8a2a07fac398 100644 --- a/ydb/core/protos/msgbus_pq.proto +++ b/ydb/core/protos/msgbus_pq.proto @@ -446,6 +446,8 @@ message TPersQueuePartitionResponse { optional uint32 TopicQuotedTimeMs = 11; optional uint32 TotalTimeInPartitionQueueMs = 9; optional uint32 WriteTimeMs = 10; + + optional bool WrittenInTx = 12; } message TCmdGetMaxSeqNoResult { diff --git a/ydb/public/api/protos/ydb_topic.proto b/ydb/public/api/protos/ydb_topic.proto index b335c158a133..a46af6cdf2bd 100644 --- a/ydb/public/api/protos/ydb_topic.proto +++ b/ydb/public/api/protos/ydb_topic.proto @@ -198,6 +198,7 @@ message StreamWriteMessage { oneof message_write_status { Written written = 2; Skipped skipped = 3; + WrittenInTx written_in_tx = 4; } message Written { @@ -213,6 +214,9 @@ message StreamWriteMessage { REASON_ALREADY_WRITTEN = 1; } } + + message WrittenInTx { + } } // Message with write statistics. From 312006e3073ead8b31d8581cfc158817d4a4a20e Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Tue, 13 Aug 2024 18:45:00 +0300 Subject: [PATCH 2/7] [*] WriteSessionActor --- ydb/services/persqueue_v1/actors/write_session_actor.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.cpp b/ydb/services/persqueue_v1/actors/write_session_actor.cpp index 28c33183ccf2..3d3aaf05f475 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.cpp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.cpp @@ -928,14 +928,16 @@ void TWriteSessionActor::ProcessWriteResponse( }; auto addAck = [this](const TPersQueuePartitionResponse::TCmdWriteResult& res, - Topic::StreamWriteMessage::WriteResponse* writeResponse, - Topic::StreamWriteMessage::WriteResponse::WriteStatistics* stat) { + Topic::StreamWriteMessage::WriteResponse* writeResponse, + Topic::StreamWriteMessage::WriteResponse::WriteStatistics* stat) { auto ack = writeResponse->add_acks(); // TODO (ildar-khisam@): validate res before filling ack fields ack->set_seq_no(res.GetSeqNo()); if (res.GetAlreadyWritten()) { Y_ABORT_UNLESS(UseDeduplication); ack->mutable_skipped()->set_reason(Topic::StreamWriteMessage::WriteResponse::WriteAck::Skipped::REASON_ALREADY_WRITTEN); + } else if (res.GetWrittenInTx()) { + ack->mutable_written_in_tx(); } else { ack->mutable_written()->set_offset(res.GetOffset()); } From ba162862d4ad9e54e046a7e7447486d384b287ac Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Tue, 13 Aug 2024 18:45:56 +0300 Subject: [PATCH 3/7] [*] refactoring --- .../sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp index e6f616bb08fe..da21ad95d437 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp @@ -981,9 +981,8 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess writeStat->PartitionQuotedTime = durationConv(stat.partition_quota_wait_time()); writeStat->TopicQuotedTime = durationConv(stat.topic_quota_wait_time()); - for (size_t messageIndex = 0, endIndex = batchWriteResponse.acks_size(); messageIndex != endIndex; ++messageIndex) { + for (const auto& ack : batchWriteResponse.acks()) { // TODO: Fill writer statistics - auto ack = batchWriteResponse.acks(messageIndex); ui64 sequenceNumber = ack.seq_no(); Y_ABORT_UNLESS(ack.has_written() || ack.has_skipped()); From 6e64f5e90d033c75f600afbefde3d4780975db87 Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Tue, 13 Aug 2024 19:19:03 +0300 Subject: [PATCH 4/7] [+] the actor of the partition sets `WrittenInTx` --- ydb/core/persqueue/partition_write.cpp | 2 ++ ydb/services/persqueue_v1/actors/write_session_actor.cpp | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index 36505f232e3f..0afb95c2fc42 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -78,6 +78,8 @@ void TPartition::ReplyWrite( write->SetTotalTimeInPartitionQueueMs(queueTime.MilliSeconds()); write->SetWriteTimeMs(writeTime.MilliSeconds()); + write->SetWrittenInTx(IsSupportive()); + ctx.Send(Tablet, response.Release()); } diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.cpp b/ydb/services/persqueue_v1/actors/write_session_actor.cpp index 3d3aaf05f475..0dd836b9646d 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.cpp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.cpp @@ -936,7 +936,7 @@ void TWriteSessionActor::ProcessWriteResponse( if (res.GetAlreadyWritten()) { Y_ABORT_UNLESS(UseDeduplication); ack->mutable_skipped()->set_reason(Topic::StreamWriteMessage::WriteResponse::WriteAck::Skipped::REASON_ALREADY_WRITTEN); - } else if (res.GetWrittenInTx()) { + } else if (res.HasWrittenInTx() && res.GetWrittenInTx()) { ack->mutable_written_in_tx(); } else { ack->mutable_written()->set_offset(res.GetOffset()); From b1166971321856fa780b4776c7db4f85a5bcaa91 Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Wed, 14 Aug 2024 10:47:34 +0300 Subject: [PATCH 5/7] [+] code EES_WRITTEN_IN_TX --- .../ydb_topic/impl/write_session_impl.cpp | 19 +++++++++++++------ .../client/ydb_topic/include/write_events.h | 3 ++- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp index da21ad95d437..d41d689acc4b 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp @@ -985,12 +985,19 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess // TODO: Fill writer statistics ui64 sequenceNumber = ack.seq_no(); - Y_ABORT_UNLESS(ack.has_written() || ack.has_skipped()); - auto msgWriteStatus = ack.has_written() - ? TWriteSessionEvent::TWriteAck::EES_WRITTEN - : (ack.skipped().reason() == Ydb::Topic::StreamWriteMessage_WriteResponse_WriteAck_Skipped_Reason::StreamWriteMessage_WriteResponse_WriteAck_Skipped_Reason_REASON_ALREADY_WRITTEN - ? TWriteSessionEvent::TWriteAck::EES_ALREADY_WRITTEN - : TWriteSessionEvent::TWriteAck::EES_DISCARDED); + Y_ABORT_UNLESS(ack.has_written() || ack.has_skipped() || ack.has_written_in_tx()); + + TWriteSessionEvent::TWriteAck::EEventState msgWriteStatus; + if (ack.has_written_in_tx()) { + msgWriteStatus = TWriteSessionEvent::TWriteAck::EES_WRITTEN_IN_TX; + } else if (ack.has_written()) { + msgWriteStatus = TWriteSessionEvent::TWriteAck::EES_WRITTEN; + } else { + msgWriteStatus = + (ack.skipped().reason() == Ydb::Topic::StreamWriteMessage_WriteResponse_WriteAck_Skipped_Reason::StreamWriteMessage_WriteResponse_WriteAck_Skipped_Reason_REASON_ALREADY_WRITTEN) + ? TWriteSessionEvent::TWriteAck::EES_ALREADY_WRITTEN + : TWriteSessionEvent::TWriteAck::EES_DISCARDED; + } ui64 offset = ack.has_written() ? ack.written().offset() : 0; diff --git a/ydb/public/sdk/cpp/client/ydb_topic/include/write_events.h b/ydb/public/sdk/cpp/client/ydb_topic/include/write_events.h index a2ed9fb8d07f..9ac070beeb98 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/include/write_events.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/include/write_events.h @@ -55,7 +55,8 @@ struct TWriteSessionEvent { enum EEventState { EES_WRITTEN, //! Successfully written. EES_ALREADY_WRITTEN, //! Skipped on SeqNo deduplication. - EES_DISCARDED //! In case of destruction of writer or retry policy discarded future retries in this writer. + EES_DISCARDED, //! In case of destruction of writer or retry policy discarded future retries in this writer. + EES_WRITTEN_IN_TX, //! Successfully written in tx. }; //! Details of successfully written message. struct TWrittenMessageDetails { From e9ccd4068d102945808525300a1ebf2281d76a2f Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Wed, 14 Aug 2024 11:27:28 +0300 Subject: [PATCH 6/7] [+] the tests check the number of EES_WRITTEN_IN_TX --- .../client/ydb_topic/ut/topic_to_table_ut.cpp | 65 ++++++++++++++----- 1 file changed, 48 insertions(+), 17 deletions(-) diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp index 6b27c53c2cc8..696ac0e409b6 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp @@ -31,10 +31,13 @@ class TFixture : public NUnitTest::TBaseFixture { TTopicWriteSessionPtr Session; TMaybe ContinuationToken; size_t WriteCount = 0; - size_t AckCount = 0; + size_t WrittenAckCount = 0; + size_t WrittenInTxAckCount = 0; void WaitForContinuationToken(); void Write(const TString& message, NTable::TTransaction* tx = nullptr); + + size_t AckCount() const { return WrittenAckCount + WrittenInTxAckCount; } }; void SetUp(NUnitTest::TTestContext&) override; @@ -88,7 +91,8 @@ class TFixture : public NUnitTest::TBaseFixture { NTable::TTransaction* tx = nullptr, TMaybe partitionId = Nothing()); void WaitForAcks(const TString& topicPath, - const TString& messageGroupId); + const TString& messageGroupId, + size_t writtenInTxCount = Max()); void WaitForSessionClose(const TString& topicPath, const TString& messageGroupId, NYdb::EStatus status); @@ -597,8 +601,15 @@ void TFixture::TTopicWriteSessionContext::WaitForContinuationToken() ContinuationToken = std::move(e->ContinuationToken); } else if (auto* e = std::get_if(&event)) { for (auto& ack : e->Acks) { - if (ack.State == NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN) { - ++AckCount; + switch (ack.State) { + case NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN: + ++WrittenAckCount; + break; + case NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN_IN_TX: + ++WrittenInTxAckCount; + break; + default: + break; } } } else if (auto* e = std::get_if(&event)) { @@ -691,7 +702,7 @@ TVector TFixture::ReadFromTopic(const TString& topicPath, return messages; } -void TFixture::WaitForAcks(const TString& topicPath, const TString& messageGroupId) +void TFixture::WaitForAcks(const TString& topicPath, const TString& messageGroupId, size_t writtenInTxCount) { std::pair key(topicPath, messageGroupId); auto i = TopicWriteSessions.find(key); @@ -699,17 +710,24 @@ void TFixture::WaitForAcks(const TString& topicPath, const TString& messageGroup auto& context = i->second; - UNIT_ASSERT(context.AckCount <= context.WriteCount); + UNIT_ASSERT(context.AckCount() <= context.WriteCount); - while (context.AckCount < context.WriteCount) { + while (context.AckCount() < context.WriteCount) { context.Session->WaitEvent().Wait(); for (auto& event : context.Session->GetEvents()) { if (auto* e = std::get_if(&event)) { context.ContinuationToken = std::move(e->ContinuationToken); } else if (auto* e = std::get_if(&event)) { for (auto& ack : e->Acks) { - if (ack.State == NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN) { - ++context.AckCount; + switch (ack.State) { + case NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN: + ++context.WrittenAckCount; + break; + case NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN_IN_TX: + ++context.WrittenInTxAckCount; + break; + default: + break; } } } else if (auto* e = std::get_if(&event)) { @@ -718,7 +736,11 @@ void TFixture::WaitForAcks(const TString& topicPath, const TString& messageGroup } } - UNIT_ASSERT(context.AckCount == context.WriteCount); + UNIT_ASSERT((context.WrittenAckCount + context.WrittenInTxAckCount) == context.WriteCount); + + if (writtenInTxCount != Max()) { + UNIT_ASSERT_VALUES_EQUAL(context.WrittenInTxAckCount, writtenInTxCount); + } } void TFixture::WaitForSessionClose(const TString& topicPath, @@ -731,7 +753,7 @@ void TFixture::WaitForSessionClose(const TString& topicPath, auto& context = i->second; - UNIT_ASSERT(context.AckCount <= context.WriteCount); + UNIT_ASSERT(context.AckCount() <= context.WriteCount); for(bool stop = false; !stop; ) { context.Session->WaitEvent().Wait(); @@ -740,8 +762,15 @@ void TFixture::WaitForSessionClose(const TString& topicPath, context.ContinuationToken = std::move(e->ContinuationToken); } else if (auto* e = std::get_if(&event)) { for (auto& ack : e->Acks) { - if (ack.State == NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN) { - ++context.AckCount; + switch (ack.State) { + case NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN: + ++context.WrittenAckCount; + break; + case NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN_IN_TX: + ++context.WrittenInTxAckCount; + break; + default: + break; } } } else if (auto* e = std::get_if(&event)) { @@ -752,7 +781,7 @@ void TFixture::WaitForSessionClose(const TString& topicPath, } } - UNIT_ASSERT(context.AckCount <= context.WriteCount); + UNIT_ASSERT(context.AckCount() <= context.WriteCount); } ui64 TFixture::GetTopicTabletId(const TActorId& actorId, const TString& topicPath, ui32 partition) @@ -1852,7 +1881,7 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_27, TFixture) CreateTopic("topic_B", TEST_CONSUMER); CreateTopic("topic_C", TEST_CONSUMER); - for (size_t i = 0; i < 2; ++i) { + for (size_t i = 0, writtenInTx = 0; i < 2; ++i) { WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", nullptr, 0); WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #2", nullptr, 0); @@ -1862,12 +1891,14 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_27, TFixture) auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2), &tx, 0); UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); WriteToTopic("topic_C", TEST_MESSAGE_GROUP_ID, messages[0], &tx, 0); - WaitForAcks("topic_C", TEST_MESSAGE_GROUP_ID); + ++writtenInTx; + WaitForAcks("topic_C", TEST_MESSAGE_GROUP_ID, writtenInTx); messages = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2), &tx, 0); UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); WriteToTopic("topic_C", TEST_MESSAGE_GROUP_ID, messages[0], &tx, 0); - WaitForAcks("topic_C", TEST_MESSAGE_GROUP_ID); + ++writtenInTx; + WaitForAcks("topic_C", TEST_MESSAGE_GROUP_ID, writtenInTx); CommitTx(tx, EStatus::SUCCESS); From dc06f8a1e84bcd0e6a2aa913b317018656646a63 Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Wed, 14 Aug 2024 11:37:07 +0300 Subject: [PATCH 7/7] [*] refactoring --- .../client/ydb_topic/ut/topic_to_table_ut.cpp | 65 ++++++++----------- 1 file changed, 26 insertions(+), 39 deletions(-) diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp index 696ac0e409b6..010c1d6e5849 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp @@ -38,6 +38,8 @@ class TFixture : public NUnitTest::TBaseFixture { void Write(const TString& message, NTable::TTransaction* tx = nullptr); size_t AckCount() const { return WrittenAckCount + WrittenInTxAckCount; } + + void WaitForEvent(); }; void SetUp(NUnitTest::TTestContext&) override; @@ -595,26 +597,31 @@ auto TFixture::GetTopicReadSession(const TString& topicPath, void TFixture::TTopicWriteSessionContext::WaitForContinuationToken() { while (!ContinuationToken.Defined()) { - Session->WaitEvent().Wait(); - for (auto& event : Session->GetEvents()) { - if (auto* e = std::get_if(&event)) { - ContinuationToken = std::move(e->ContinuationToken); - } else if (auto* e = std::get_if(&event)) { - for (auto& ack : e->Acks) { - switch (ack.State) { - case NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN: - ++WrittenAckCount; - break; - case NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN_IN_TX: - ++WrittenInTxAckCount; - break; - default: - break; - } + WaitForEvent(); + } +} + +void TFixture::TTopicWriteSessionContext::WaitForEvent() +{ + Session->WaitEvent().Wait(); + for (auto& event : Session->GetEvents()) { + if (auto* e = std::get_if(&event)) { + ContinuationToken = std::move(e->ContinuationToken); + } else if (auto* e = std::get_if(&event)) { + for (auto& ack : e->Acks) { + switch (ack.State) { + case NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN: + ++WrittenAckCount; + break; + case NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN_IN_TX: + ++WrittenInTxAckCount; + break; + default: + break; } - } else if (auto* e = std::get_if(&event)) { - UNIT_FAIL(""); } + } else if (auto* e = std::get_if(&event)) { + UNIT_FAIL(""); } } } @@ -713,27 +720,7 @@ void TFixture::WaitForAcks(const TString& topicPath, const TString& messageGroup UNIT_ASSERT(context.AckCount() <= context.WriteCount); while (context.AckCount() < context.WriteCount) { - context.Session->WaitEvent().Wait(); - for (auto& event : context.Session->GetEvents()) { - if (auto* e = std::get_if(&event)) { - context.ContinuationToken = std::move(e->ContinuationToken); - } else if (auto* e = std::get_if(&event)) { - for (auto& ack : e->Acks) { - switch (ack.State) { - case NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN: - ++context.WrittenAckCount; - break; - case NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN_IN_TX: - ++context.WrittenInTxAckCount; - break; - default: - break; - } - } - } else if (auto* e = std::get_if(&event)) { - UNIT_FAIL(""); - } - } + context.WaitForEvent(); } UNIT_ASSERT((context.WrittenAckCount + context.WrittenInTxAckCount) == context.WriteCount);