From 6026e60e879d202b15a29eba3e5647a724e2e04e Mon Sep 17 00:00:00 2001 From: Konstantin Melekhov Date: Wed, 7 Feb 2024 09:12:48 +0000 Subject: [PATCH] LOGBROKER-8894 --- .../persqueue/partition_sourcemanager.cpp | 7 ++- ydb/core/persqueue/partition_sourcemanager.h | 3 +- ydb/core/persqueue/partition_write.cpp | 16 ++--- ydb/core/persqueue/pq_impl.cpp | 2 +- ydb/core/persqueue/sourceid.cpp | 20 +++++-- ydb/core/persqueue/sourceid.h | 11 ++-- ydb/core/persqueue/ut/sourceid_ut.cpp | 59 ++++++++++++++----- ydb/core/protos/pqconfig.proto | 1 + 8 files changed, 79 insertions(+), 40 deletions(-) diff --git a/ydb/core/persqueue/partition_sourcemanager.cpp b/ydb/core/persqueue/partition_sourcemanager.cpp index f73e21ec38ad..905b0608502e 100644 --- a/ydb/core/persqueue/partition_sourcemanager.cpp +++ b/ydb/core/persqueue/partition_sourcemanager.cpp @@ -104,6 +104,7 @@ TPartitionSourceManager& TPartitionSourceManager::TModificationBatch::GetManager TPartitionSourceManager::TSourceInfo Convert(TSourceIdInfo value) { TPartitionSourceManager::TSourceInfo result(value.State); result.SeqNo = value.SeqNo; + result.MinSeqNo = value.MinSeqNo; result.Offset = value.Offset; result.Explicit = value.Explicit; result.WriteTimestamp = value.WriteTimestamp; @@ -145,11 +146,11 @@ std::optional TPartitionSourceManager::TSourceManager::UpdatedSeqNo() cons return InWriter == WriteStorage().end() ? std::nullopt : std::optional(InWriter->second.SeqNo); } -void TPartitionSourceManager::TSourceManager::Update(ui64 seqNo, ui64 offset, TInstant timestamp) { +void TPartitionSourceManager::TSourceManager::Update(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant timestamp) { if (InMemory == MemoryStorage().end()) { - Batch.SourceIdWriter.RegisterSourceId(SourceId, seqNo, offset, timestamp); + Batch.SourceIdWriter.RegisterSourceId(SourceId, seqNo, minSeqNo, offset, timestamp); } else { - Batch.SourceIdWriter.RegisterSourceId(SourceId, InMemory->second.Updated(seqNo, offset, timestamp)); + Batch.SourceIdWriter.RegisterSourceId(SourceId, InMemory->second.Updated(seqNo, minSeqNo, offset, timestamp)); } } diff --git a/ydb/core/persqueue/partition_sourcemanager.h b/ydb/core/persqueue/partition_sourcemanager.h index e825eedbd77a..a0d2e72d30cc 100644 --- a/ydb/core/persqueue/partition_sourcemanager.h +++ b/ydb/core/persqueue/partition_sourcemanager.h @@ -25,6 +25,7 @@ class TPartitionSourceManager { TSourceIdInfo::EState State; ui64 SeqNo = 0; + ui64 MinSeqNo = 0; ui64 Offset = 0; bool Explicit = false; TInstant WriteTimestamp; @@ -45,7 +46,7 @@ class TPartitionSourceManager { std::optional CommittedSeqNo() const; std::optional UpdatedSeqNo() const; - void Update(ui64 seqNo, ui64 offset, TInstant timestamp); + void Update(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant timestamp); void Update(THeartbeat&& heartbeat); private: diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index 786639ecba8c..6288a5289135 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -327,13 +327,13 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) { if (it == SourceIdStorage.GetInMemorySourceIds().end()) { Y_ABORT_UNLESS(!writeResponse.Msg.HeartbeatVersion); TabletCounters.Cumulative()[COUNTER_PQ_SID_CREATED].Increment(1); - SourceIdStorage.RegisterSourceId(s, seqNo, offset, CurrentTimestamp); + SourceIdStorage.RegisterSourceId(s, seqNo, 0, offset, CurrentTimestamp); } else if (const auto& hbVersion = writeResponse.Msg.HeartbeatVersion) { SourceIdStorage.RegisterSourceId(s, it->second.Updated( - seqNo, offset, CurrentTimestamp, THeartbeat{*hbVersion, writeResponse.Msg.Data} + seqNo, seqNo, offset, CurrentTimestamp, THeartbeat{*hbVersion, writeResponse.Msg.Data} )); } else { - SourceIdStorage.RegisterSourceId(s, it->second.Updated(seqNo, offset, CurrentTimestamp)); + SourceIdStorage.RegisterSourceId(s, it->second.Updated(seqNo, seqNo, offset, CurrentTimestamp)); } TabletCounters.Cumulative()[COUNTER_PQ_WRITE_OK].Increment(1); @@ -378,7 +378,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) { } Y_ABORT_UNLESS(body.AssignedOffset); - SourceIdStorage.RegisterSourceId(body.SourceId, body.SeqNo, *body.AssignedOffset, CurrentTimestamp, std::move(keyRange)); + SourceIdStorage.RegisterSourceId(body.SourceId, body.SeqNo, 0, *body.AssignedOffset, CurrentTimestamp, std::move(keyRange)); ReplyOk(ctx, response.GetCookie()); } else if (response.IsDeregisterMessageGroup()) { const auto& body = response.GetDeregisterMessageGroup().Body; @@ -399,7 +399,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) { } Y_ABORT_UNLESS(body.AssignedOffset); - SourceIdStorage.RegisterSourceId(body.SourceId, body.SeqNo, *body.AssignedOffset, CurrentTimestamp, std::move(keyRange), true); + SourceIdStorage.RegisterSourceId(body.SourceId, body.SeqNo, 0, *body.AssignedOffset, CurrentTimestamp, std::move(keyRange), true); } ReplyOk(ctx, response.GetCookie()); @@ -836,7 +836,7 @@ TPartition::ProcessResult TPartition::ProcessRequest(TRegisterMessageGroupMsg& m } body.AssignedOffset = parameters.CurOffset; - parameters.SourceIdBatch.RegisterSourceId(body.SourceId, body.SeqNo, parameters.CurOffset, CurrentTimestamp, std::move(keyRange)); + parameters.SourceIdBatch.RegisterSourceId(body.SourceId, body.SeqNo, 0, parameters.CurOffset, CurrentTimestamp, std::move(keyRange)); return ProcessResult::Continue; } @@ -859,7 +859,7 @@ TPartition::ProcessResult TPartition::ProcessRequest(TSplitMessageGroupMsg& msg, } body.AssignedOffset = parameters.CurOffset; - parameters.SourceIdBatch.RegisterSourceId(body.SourceId, body.SeqNo, parameters.CurOffset, CurrentTimestamp, std::move(keyRange), true); + parameters.SourceIdBatch.RegisterSourceId(body.SourceId, body.SeqNo, 0, parameters.CurOffset, CurrentTimestamp, std::move(keyRange), true); } return ProcessResult::Continue; @@ -1132,7 +1132,7 @@ TPartition::ProcessResult TPartition::ProcessRequest(TWriteMsg& p, ProcessParame << " NewHead: " << NewHead ); - sourceId.Update(p.Msg.SeqNo, curOffset, CurrentTimestamp); + sourceId.Update(p.Msg.SeqNo, p.Msg.SeqNo, curOffset, CurrentTimestamp); ++curOffset; PartitionedBlob = TPartitionedBlob(Partition, 0, "", 0, 0, 0, Head, NewHead, true, false, MaxBlobSize); diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index af6459c15e4c..731fbfc67b90 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -1500,7 +1500,7 @@ void TPersQueue::AddCmdWriteConfig(TEvKeyValue::TEvRequest* request, keyRange = TPartitionKeyRange::Parse(mg.GetKeyRange()); } - sourceIdWriter.RegisterSourceId(mg.GetId(), 0, 0, ctx.Now(), std::move(keyRange)); + sourceIdWriter.RegisterSourceId(mg.GetId(), 0, 0, 0, ctx.Now(), std::move(keyRange)); } for (const auto& partition : cfg.GetPartitions()) { diff --git a/ydb/core/persqueue/sourceid.cpp b/ydb/core/persqueue/sourceid.cpp index b47dd70c7945..76a19197ba55 100644 --- a/ydb/core/persqueue/sourceid.cpp +++ b/ydb/core/persqueue/sourceid.cpp @@ -101,16 +101,18 @@ void THeartbeatProcessor::ForgetSourceId(const TString& sourceId) { SourceIdsWithHeartbeat.erase(sourceId); } -TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs) +TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant createTs) : SeqNo(seqNo) + , MinSeqNo(minSeqNo) , Offset(offset) , WriteTimestamp(createTs) , CreateTimestamp(createTs) { } -TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, THeartbeat&& heartbeat) +TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant createTs, THeartbeat&& heartbeat) : SeqNo(seqNo) + , MinSeqNo(minSeqNo) , Offset(offset) , WriteTimestamp(createTs) , CreateTimestamp(createTs) @@ -118,8 +120,9 @@ TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, THeartb { } -TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, TMaybe&& keyRange, bool isInSplit) +TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant createTs, TMaybe&& keyRange, bool isInSplit) : SeqNo(seqNo) + , MinSeqNo(minSeqNo) , Offset(offset) , CreateTimestamp(createTs) , Explicit(true) @@ -130,17 +133,20 @@ TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, TMaybe< } } -TSourceIdInfo TSourceIdInfo::Updated(ui64 seqNo, ui64 offset, TInstant writeTs) const { +TSourceIdInfo TSourceIdInfo::Updated(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant writeTs) const { auto copy = *this; copy.SeqNo = seqNo; + if (minSeqNo) { + copy.MinSeqNo = minSeqNo; + } copy.Offset = offset; copy.WriteTimestamp = writeTs; return copy; } -TSourceIdInfo TSourceIdInfo::Updated(ui64 seqNo, ui64 offset, TInstant writeTs, THeartbeat&& heartbeat) const { - auto copy = Updated(seqNo, offset, writeTs); +TSourceIdInfo TSourceIdInfo::Updated(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant writeTs, THeartbeat&& heartbeat) const { + auto copy = Updated(seqNo, minSeqNo, offset, writeTs); copy.LastHeartbeat = std::move(heartbeat); return copy; @@ -178,6 +184,7 @@ void TSourceIdInfo::Serialize(TBuffer& data) const { TSourceIdInfo TSourceIdInfo::Parse(const NKikimrPQ::TMessageGroupInfo& proto) { TSourceIdInfo result; result.SeqNo = proto.GetSeqNo(); + result.MinSeqNo = proto.GetMinSeqNo(); result.Offset = proto.GetOffset(); result.WriteTimestamp = TInstant::FromValue(proto.GetWriteTimestamp()); result.CreateTimestamp = TInstant::FromValue(proto.GetCreateTimestamp()); @@ -197,6 +204,7 @@ TSourceIdInfo TSourceIdInfo::Parse(const NKikimrPQ::TMessageGroupInfo& proto) { void TSourceIdInfo::Serialize(NKikimrPQ::TMessageGroupInfo& proto) const { proto.SetSeqNo(SeqNo); + proto.SetMinSeqNo(MinSeqNo); proto.SetOffset(Offset); proto.SetWriteTimestamp(WriteTimestamp.GetValue()); proto.SetCreateTimestamp(CreateTimestamp.GetValue()); diff --git a/ydb/core/persqueue/sourceid.h b/ydb/core/persqueue/sourceid.h index 8566810c40ce..ddd9acced371 100644 --- a/ydb/core/persqueue/sourceid.h +++ b/ydb/core/persqueue/sourceid.h @@ -24,6 +24,7 @@ struct TSourceIdInfo { }; ui64 SeqNo = 0; + ui64 MinSeqNo = 0; ui64 Offset = 0; TInstant WriteTimestamp; TInstant CreateTimestamp; @@ -33,12 +34,12 @@ struct TSourceIdInfo { EState State = EState::Registered; TSourceIdInfo() = default; - TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs); - TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, THeartbeat&& heartbeat); - TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, TMaybe&& keyRange, bool isInSplit = false); + TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant createTs); + TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant createTs, THeartbeat&& heartbeat); + TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant createTs, TMaybe&& keyRange, bool isInSplit = false); - TSourceIdInfo Updated(ui64 seqNo, ui64 offset, TInstant writeTs) const; - TSourceIdInfo Updated(ui64 seqNo, ui64 offset, TInstant writeTs, THeartbeat&& heartbeat) const; + TSourceIdInfo Updated(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant writeTs) const; + TSourceIdInfo Updated(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant writeTs, THeartbeat&& heartbeat) const; static EState ConvertState(NKikimrPQ::TMessageGroupInfo::EState value); static NKikimrPQ::TMessageGroupInfo::EState ConvertState(EState value); diff --git a/ydb/core/persqueue/ut/sourceid_ut.cpp b/ydb/core/persqueue/ut/sourceid_ut.cpp index 8cb27cdc2b81..edc20142dcd0 100644 --- a/ydb/core/persqueue/ut/sourceid_ut.cpp +++ b/ydb/core/persqueue/ut/sourceid_ut.cpp @@ -23,7 +23,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { TSourceIdWriter writer(ESourceIdFormat::Raw); const auto sourceId = TestSourceId(1); - const auto sourceIdInfo = TSourceIdInfo(1, 10, TInstant::Seconds(100)); + const auto sourceIdInfo = TSourceIdInfo(1, 0, 10, TInstant::Seconds(100)); writer.RegisterSourceId(sourceId, sourceIdInfo); UNIT_ASSERT_VALUES_EQUAL(writer.GetSourceIdsToWrite().size(), 1); @@ -35,7 +35,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { } const auto anotherSourceId = TestSourceId(2); - const auto anotherSourceIdInfo = TSourceIdInfo(2, 20, TInstant::Seconds(200)); + const auto anotherSourceIdInfo = TSourceIdInfo(2, 0, 20, TInstant::Seconds(200)); UNIT_ASSERT_VALUES_UNEQUAL(sourceIdInfo, anotherSourceIdInfo); { @@ -47,7 +47,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { Y_UNIT_TEST(SourceIdWriterClean) { TSourceIdWriter writer(ESourceIdFormat::Raw); - writer.RegisterSourceId(TestSourceId(), 1, 10, TInstant::Seconds(100)); + writer.RegisterSourceId(TestSourceId(), 1, 0, 10, TInstant::Seconds(100)); UNIT_ASSERT_VALUES_EQUAL(writer.GetSourceIdsToWrite().size(), 1); writer.Clear(); @@ -60,7 +60,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { auto expectedRequest = MakeHolder(); const auto sourceId = TestSourceId(1); - const auto sourceIdInfo = TSourceIdInfo(1, 10, TInstant::Seconds(100)); + const auto sourceIdInfo = TSourceIdInfo(1, 1, 10, TInstant::Seconds(100)); writer.RegisterSourceId(sourceId, sourceIdInfo); UNIT_ASSERT_VALUES_EQUAL(writer.GetSourceIdsToWrite().size(), 1); { @@ -78,7 +78,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { } const auto anotherSourceId = TestSourceId(2); - const auto anotherSourceIdInfo = TSourceIdInfo(2, 20, TInstant::Seconds(200)); + const auto anotherSourceIdInfo = TSourceIdInfo(2, 0, 20, TInstant::Seconds(200)); writer.RegisterSourceId(anotherSourceId, anotherSourceIdInfo); UNIT_ASSERT_VALUES_EQUAL(writer.GetSourceIdsToWrite().size(), 2); { @@ -102,9 +102,9 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { TSourceIdStorage storage; const auto sourceId = TestSourceId(1); - const auto sourceIdInfo = TSourceIdInfo(1, 10, TInstant::Seconds(100)); + const auto sourceIdInfo = TSourceIdInfo(1, 0, 10, TInstant::Seconds(100)); const auto anotherSourceId = TestSourceId(2); - const auto anotherSourceIdInfo = TSourceIdInfo(2, 20, TInstant::Seconds(200)); + const auto anotherSourceIdInfo = TSourceIdInfo(2, 0, 20, TInstant::Seconds(200)); storage.RegisterSourceId(sourceId, sourceIdInfo); UNIT_ASSERT_VALUES_EQUAL(storage.GetInMemorySourceIds().size(), 1); @@ -130,7 +130,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { void SourceIdStorageParseAndAdd(TKeyPrefix::EMark mark, ESourceIdFormat format) { const auto sourceId = TestSourceId(); - const auto sourceIdInfo = TSourceIdInfo(1, 10, TInstant::Seconds(100)); + const auto sourceIdInfo = TSourceIdInfo(1, 1, 10, TInstant::Seconds(100)); TKeyPrefix ikey(TKeyPrefix::TypeInfo, TPartitionId(TestPartition), mark); TBuffer idata; @@ -162,20 +162,20 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { TSourceIdStorage storage; const auto sourceId = TestSourceId(1); - storage.RegisterSourceId(sourceId, 1, 10, TInstant::Seconds(100)); + storage.RegisterSourceId(sourceId, 1, 0, 10, TInstant::Seconds(100)); { auto ds = storage.MinAvailableTimestamp(now); UNIT_ASSERT_VALUES_EQUAL(ds, TInstant::Seconds(100)); } const auto anotherSourceId = TestSourceId(2); - storage.RegisterSourceId(anotherSourceId, 2, 20, TInstant::Seconds(200)); + storage.RegisterSourceId(anotherSourceId, 2, 0, 20, TInstant::Seconds(200)); { auto ds = storage.MinAvailableTimestamp(now); UNIT_ASSERT_VALUES_EQUAL(ds, TInstant::Seconds(100)); } - storage.RegisterSourceId(sourceId, 3, 30, TInstant::Seconds(300)); + storage.RegisterSourceId(sourceId, 3, 0, 30, TInstant::Seconds(300)); { auto ds = storage.MinAvailableTimestamp(now); UNIT_ASSERT_VALUES_EQUAL(ds, TInstant::Seconds(200)); @@ -185,7 +185,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { Y_UNIT_TEST(SourceIdStorageTestClean) { TSourceIdStorage storage; for (ui64 i = 1; i <= 10000; ++i) { - storage.RegisterSourceId(TestSourceId(i), i, i, TInstant::Seconds(10 * i)); + storage.RegisterSourceId(TestSourceId(i), i, 0, i, TInstant::Seconds(10 * i)); } NKikimrPQ::TPartitionConfig config; @@ -226,7 +226,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { Y_UNIT_TEST(SourceIdStorageDeleteByMaxCount) { TSourceIdStorage storage; for (ui64 i = 1; i <= 10000; ++i) { - storage.RegisterSourceId(TestSourceId(i), i, i, TInstant::Seconds(10 * i)); + storage.RegisterSourceId(TestSourceId(i), i, 0, i, TInstant::Seconds(10 * i)); } NKikimrPQ::TPartitionConfig config; @@ -258,7 +258,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { Y_UNIT_TEST(SourceIdStorageComplexDelete) { TSourceIdStorage storage; for (ui64 i = 1; i <= 10000 + 1; ++i) { // add 10000 + one extra sources - storage.RegisterSourceId(TestSourceId(i), i, i, TInstant::Seconds(10 * i)); + storage.RegisterSourceId(TestSourceId(i), i, 1, i , TInstant::Seconds(10 * i)); } NKikimrPQ::TPartitionConfig config; @@ -297,7 +297,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { const auto sourceId = TestSourceId(i); const auto owner = TestOwner(sourceId); - storage.RegisterSourceId(sourceId, i, i, TInstant::Hours(i)); + storage.RegisterSourceId(sourceId, i, 0, i, TInstant::Hours(i)); storage.RegisterSourceIdOwner(sourceId, owner); owners[owner]; } @@ -324,7 +324,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { } inline static TSourceIdInfo MakeExplicitSourceIdInfo(ui64 offset, const TMaybe& heartbeat = Nothing()) { - auto info = TSourceIdInfo(0, offset, TInstant::Now()); + auto info = TSourceIdInfo(0, 0, offset, TInstant::Now()); info.Explicit = true; if (heartbeat) { @@ -460,6 +460,33 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { } } + Y_UNIT_TEST(SourceIdMinSeqNo) { + TSourceIdStorage storage; + + const auto sourceId = TestSourceId(1); + const auto sourceIdInfo = TSourceIdInfo(1, 0, 10, TInstant::Seconds(100)); + const auto anotherSourceId = TestSourceId(2); + const auto anotherSourceIdInfo = TSourceIdInfo(2, 1, 20, TInstant::Seconds(200)); + + storage.RegisterSourceId(sourceId, sourceIdInfo); + storage.RegisterSourceId(sourceId, anotherSourceIdInfo); + storage.RegisterSourceId(sourceId, sourceIdInfo.Updated(2, 2, 11, TInstant::Seconds(100))); + { + auto it = storage.GetInMemorySourceIds().find(sourceId); + UNIT_ASSERT_VALUES_EQUAL(it->second.MinSeqNo, 2); + } + storage.RegisterSourceId(sourceId, sourceIdInfo.Updated(2, 1, 12, TInstant::Seconds(100))); + { + auto it = storage.GetInMemorySourceIds().find(sourceId); + UNIT_ASSERT_VALUES_EQUAL(it->second.MinSeqNo, 1); + } + storage.RegisterSourceId(anotherSourceId, anotherSourceIdInfo.Updated(2, 0, 12, TInstant::Seconds(100))); + { + auto it = storage.GetInMemorySourceIds().find(anotherSourceId); + UNIT_ASSERT_VALUES_EQUAL(it->second.MinSeqNo, 1); + } + } + } // TSourceIdTests } // namespace NKikimr::NPQ diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index bb70645ab640..e17dd24f53d0 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -377,6 +377,7 @@ message TMessageGroupInfo { } optional uint64 SeqNo = 1; + optional uint64 MinSeqNo = 9; optional uint64 Offset = 2; optional uint64 WriteTimestamp = 3; // TInstant::TValue optional uint64 CreateTimestamp = 4; // TInstant::TValue