Skip to content

Commit

Permalink
code EES_WRITTEN_IN_TX (#7766)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alek5andr-Kotov authored Aug 14, 2024
1 parent 49fe58f commit ac2e53a
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 49 deletions.
2 changes: 2 additions & 0 deletions ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ void TPartition::ReplyWrite(
write->SetTotalTimeInPartitionQueueMs(queueTime.MilliSeconds());
write->SetWriteTimeMs(writeTime.MilliSeconds());

write->SetWrittenInTx(IsSupportive());

ctx.Send(Tablet, response.Release());
}

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/protos/msgbus_pq.proto
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,8 @@ message TPersQueuePartitionResponse {
optional uint32 TopicQuotedTimeMs = 11;
optional uint32 TotalTimeInPartitionQueueMs = 9;
optional uint32 WriteTimeMs = 10;

optional bool WrittenInTx = 12;
}

message TCmdGetMaxSeqNoResult {
Expand Down
4 changes: 4 additions & 0 deletions ydb/public/api/protos/ydb_topic.proto
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ message StreamWriteMessage {
oneof message_write_status {
Written written = 2;
Skipped skipped = 3;
WrittenInTx written_in_tx = 4;
}

message Written {
Expand All @@ -213,6 +214,9 @@ message StreamWriteMessage {
REASON_ALREADY_WRITTEN = 1;
}
}

message WrittenInTx {
}
}

// Message with write statistics.
Expand Down
22 changes: 14 additions & 8 deletions ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -982,17 +982,23 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
writeStat->PartitionQuotedTime = durationConv(stat.partition_quota_wait_time());
writeStat->TopicQuotedTime = durationConv(stat.topic_quota_wait_time());

for (size_t messageIndex = 0, endIndex = batchWriteResponse.acks_size(); messageIndex != endIndex; ++messageIndex) {
for (const auto& ack : batchWriteResponse.acks()) {
// TODO: Fill writer statistics
auto ack = batchWriteResponse.acks(messageIndex);
ui64 sequenceNumber = ack.seq_no();

Y_ABORT_UNLESS(ack.has_written() || ack.has_skipped());
auto msgWriteStatus = ack.has_written()
? TWriteSessionEvent::TWriteAck::EES_WRITTEN
: (ack.skipped().reason() == Ydb::Topic::StreamWriteMessage_WriteResponse_WriteAck_Skipped_Reason::StreamWriteMessage_WriteResponse_WriteAck_Skipped_Reason_REASON_ALREADY_WRITTEN
? TWriteSessionEvent::TWriteAck::EES_ALREADY_WRITTEN
: TWriteSessionEvent::TWriteAck::EES_DISCARDED);
Y_ABORT_UNLESS(ack.has_written() || ack.has_skipped() || ack.has_written_in_tx());

TWriteSessionEvent::TWriteAck::EEventState msgWriteStatus;
if (ack.has_written_in_tx()) {
msgWriteStatus = TWriteSessionEvent::TWriteAck::EES_WRITTEN_IN_TX;
} else if (ack.has_written()) {
msgWriteStatus = TWriteSessionEvent::TWriteAck::EES_WRITTEN;
} else {
msgWriteStatus =
(ack.skipped().reason() == Ydb::Topic::StreamWriteMessage_WriteResponse_WriteAck_Skipped_Reason::StreamWriteMessage_WriteResponse_WriteAck_Skipped_Reason_REASON_ALREADY_WRITTEN)
? TWriteSessionEvent::TWriteAck::EES_ALREADY_WRITTEN
: TWriteSessionEvent::TWriteAck::EES_DISCARDED;
}

ui64 offset = ack.has_written() ? ack.written().offset() : 0;

Expand Down
3 changes: 2 additions & 1 deletion ydb/public/sdk/cpp/client/ydb_topic/include/write_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ struct TWriteSessionEvent {
enum EEventState {
EES_WRITTEN, //! Successfully written.
EES_ALREADY_WRITTEN, //! Skipped on SeqNo deduplication.
EES_DISCARDED //! In case of destruction of writer or retry policy discarded future retries in this writer.
EES_DISCARDED, //! In case of destruction of writer or retry policy discarded future retries in this writer.
EES_WRITTEN_IN_TX, //! Successfully written in tx.
};
//! Details of successfully written message.
struct TWrittenMessageDetails {
Expand Down
94 changes: 56 additions & 38 deletions ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,15 @@ class TFixture : public NUnitTest::TBaseFixture {
TTopicWriteSessionPtr Session;
TMaybe<NTopic::TContinuationToken> ContinuationToken;
size_t WriteCount = 0;
size_t AckCount = 0;
size_t WrittenAckCount = 0;
size_t WrittenInTxAckCount = 0;

void WaitForContinuationToken();
void Write(const TString& message, NTable::TTransaction* tx = nullptr);

size_t AckCount() const { return WrittenAckCount + WrittenInTxAckCount; }

void WaitForEvent();
};

void SetUp(NUnitTest::TTestContext&) override;
Expand Down Expand Up @@ -88,7 +93,8 @@ class TFixture : public NUnitTest::TBaseFixture {
NTable::TTransaction* tx = nullptr,
TMaybe<ui32> partitionId = Nothing());
void WaitForAcks(const TString& topicPath,
const TString& messageGroupId);
const TString& messageGroupId,
size_t writtenInTxCount = Max<size_t>());
void WaitForSessionClose(const TString& topicPath,
const TString& messageGroupId,
NYdb::EStatus status);
Expand Down Expand Up @@ -591,19 +597,31 @@ auto TFixture::GetTopicReadSession(const TString& topicPath,
void TFixture::TTopicWriteSessionContext::WaitForContinuationToken()
{
while (!ContinuationToken.Defined()) {
Session->WaitEvent().Wait();
for (auto& event : Session->GetEvents()) {
if (auto* e = std::get_if<NTopic::TWriteSessionEvent::TReadyToAcceptEvent>(&event)) {
ContinuationToken = std::move(e->ContinuationToken);
} else if (auto* e = std::get_if<NTopic::TWriteSessionEvent::TAcksEvent>(&event)) {
for (auto& ack : e->Acks) {
if (ack.State == NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN) {
++AckCount;
}
WaitForEvent();
}
}

void TFixture::TTopicWriteSessionContext::WaitForEvent()
{
Session->WaitEvent().Wait();
for (auto& event : Session->GetEvents()) {
if (auto* e = std::get_if<NTopic::TWriteSessionEvent::TReadyToAcceptEvent>(&event)) {
ContinuationToken = std::move(e->ContinuationToken);
} else if (auto* e = std::get_if<NTopic::TWriteSessionEvent::TAcksEvent>(&event)) {
for (auto& ack : e->Acks) {
switch (ack.State) {
case NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN:
++WrittenAckCount;
break;
case NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN_IN_TX:
++WrittenInTxAckCount;
break;
default:
break;
}
} else if (auto* e = std::get_if<NTopic::TSessionClosedEvent>(&event)) {
UNIT_FAIL("");
}
} else if (auto* e = std::get_if<NTopic::TSessionClosedEvent>(&event)) {
UNIT_FAIL("");
}
}
}
Expand Down Expand Up @@ -691,34 +709,25 @@ TVector<TString> TFixture::ReadFromTopic(const TString& topicPath,
return messages;
}

void TFixture::WaitForAcks(const TString& topicPath, const TString& messageGroupId)
void TFixture::WaitForAcks(const TString& topicPath, const TString& messageGroupId, size_t writtenInTxCount)
{
std::pair<TString, TString> key(topicPath, messageGroupId);
auto i = TopicWriteSessions.find(key);
UNIT_ASSERT(i != TopicWriteSessions.end());

auto& context = i->second;

UNIT_ASSERT(context.AckCount <= context.WriteCount);
UNIT_ASSERT(context.AckCount() <= context.WriteCount);

while (context.AckCount < context.WriteCount) {
context.Session->WaitEvent().Wait();
for (auto& event : context.Session->GetEvents()) {
if (auto* e = std::get_if<NTopic::TWriteSessionEvent::TReadyToAcceptEvent>(&event)) {
context.ContinuationToken = std::move(e->ContinuationToken);
} else if (auto* e = std::get_if<NTopic::TWriteSessionEvent::TAcksEvent>(&event)) {
for (auto& ack : e->Acks) {
if (ack.State == NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN) {
++context.AckCount;
}
}
} else if (auto* e = std::get_if<NTopic::TSessionClosedEvent>(&event)) {
UNIT_FAIL("");
}
}
while (context.AckCount() < context.WriteCount) {
context.WaitForEvent();
}

UNIT_ASSERT(context.AckCount == context.WriteCount);
UNIT_ASSERT((context.WrittenAckCount + context.WrittenInTxAckCount) == context.WriteCount);

if (writtenInTxCount != Max<size_t>()) {
UNIT_ASSERT_VALUES_EQUAL(context.WrittenInTxAckCount, writtenInTxCount);
}
}

void TFixture::WaitForSessionClose(const TString& topicPath,
Expand All @@ -731,7 +740,7 @@ void TFixture::WaitForSessionClose(const TString& topicPath,

auto& context = i->second;

UNIT_ASSERT(context.AckCount <= context.WriteCount);
UNIT_ASSERT(context.AckCount() <= context.WriteCount);

for(bool stop = false; !stop; ) {
context.Session->WaitEvent().Wait();
Expand All @@ -740,8 +749,15 @@ void TFixture::WaitForSessionClose(const TString& topicPath,
context.ContinuationToken = std::move(e->ContinuationToken);
} else if (auto* e = std::get_if<NTopic::TWriteSessionEvent::TAcksEvent>(&event)) {
for (auto& ack : e->Acks) {
if (ack.State == NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN) {
++context.AckCount;
switch (ack.State) {
case NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN:
++context.WrittenAckCount;
break;
case NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN_IN_TX:
++context.WrittenInTxAckCount;
break;
default:
break;
}
}
} else if (auto* e = std::get_if<NTopic::TSessionClosedEvent>(&event)) {
Expand All @@ -752,7 +768,7 @@ void TFixture::WaitForSessionClose(const TString& topicPath,
}
}

UNIT_ASSERT(context.AckCount <= context.WriteCount);
UNIT_ASSERT(context.AckCount() <= context.WriteCount);
}

ui64 TFixture::GetTopicTabletId(const TActorId& actorId, const TString& topicPath, ui32 partition)
Expand Down Expand Up @@ -1852,7 +1868,7 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_27, TFixture)
CreateTopic("topic_B", TEST_CONSUMER);
CreateTopic("topic_C", TEST_CONSUMER);

for (size_t i = 0; i < 2; ++i) {
for (size_t i = 0, writtenInTx = 0; i < 2; ++i) {
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", nullptr, 0);
WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #2", nullptr, 0);

Expand All @@ -1862,12 +1878,14 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_27, TFixture)
auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2), &tx, 0);
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1);
WriteToTopic("topic_C", TEST_MESSAGE_GROUP_ID, messages[0], &tx, 0);
WaitForAcks("topic_C", TEST_MESSAGE_GROUP_ID);
++writtenInTx;
WaitForAcks("topic_C", TEST_MESSAGE_GROUP_ID, writtenInTx);

messages = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2), &tx, 0);
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1);
WriteToTopic("topic_C", TEST_MESSAGE_GROUP_ID, messages[0], &tx, 0);
WaitForAcks("topic_C", TEST_MESSAGE_GROUP_ID);
++writtenInTx;
WaitForAcks("topic_C", TEST_MESSAGE_GROUP_ID, writtenInTx);

CommitTx(tx, EStatus::SUCCESS);

Expand Down
6 changes: 4 additions & 2 deletions ydb/services/persqueue_v1/actors/write_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -928,14 +928,16 @@ void TWriteSessionActor<UseMigrationProtocol>::ProcessWriteResponse(
};

auto addAck = [this](const TPersQueuePartitionResponse::TCmdWriteResult& res,
Topic::StreamWriteMessage::WriteResponse* writeResponse,
Topic::StreamWriteMessage::WriteResponse::WriteStatistics* stat) {
Topic::StreamWriteMessage::WriteResponse* writeResponse,
Topic::StreamWriteMessage::WriteResponse::WriteStatistics* stat) {
auto ack = writeResponse->add_acks();
// TODO (ildar-khisam@): validate res before filling ack fields
ack->set_seq_no(res.GetSeqNo());
if (res.GetAlreadyWritten()) {
Y_ABORT_UNLESS(UseDeduplication);
ack->mutable_skipped()->set_reason(Topic::StreamWriteMessage::WriteResponse::WriteAck::Skipped::REASON_ALREADY_WRITTEN);
} else if (res.HasWrittenInTx() && res.GetWrittenInTx()) {
ack->mutable_written_in_tx();
} else {
ack->mutable_written()->set_offset(res.GetOffset());
}
Expand Down

0 comments on commit ac2e53a

Please sign in to comment.