diff --git a/ydb/core/persqueue/partition_sourcemanager.cpp b/ydb/core/persqueue/partition_sourcemanager.cpp index 499d82cf0fbf..f81b3a7da3b2 100644 --- a/ydb/core/persqueue/partition_sourcemanager.cpp +++ b/ydb/core/persqueue/partition_sourcemanager.cpp @@ -148,7 +148,7 @@ std::optional TPartitionSourceManager::TSourceManager::UpdatedSeqNo() cons void TPartitionSourceManager::TSourceManager::Update(ui64 seqNo, ui64 offset, TInstant timestamp) { if (InMemory == MemoryStorage().end()) { - Batch.SourceIdWriter.RegisterSourceId(SourceId, seqNo, seqNo, offset, timestamp); + Batch.SourceIdWriter.RegisterSourceId(SourceId, seqNo, offset, timestamp); } else { Batch.SourceIdWriter.RegisterSourceId(SourceId, InMemory->second.Updated(seqNo, offset, timestamp)); } diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index f307f4a70a20..ebcd3df4917f 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -327,7 +327,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) { if (it == SourceIdStorage.GetInMemorySourceIds().end()) { Y_ABORT_UNLESS(!writeResponse.Msg.HeartbeatVersion); TabletCounters.Cumulative()[COUNTER_PQ_SID_CREATED].Increment(1); - SourceIdStorage.RegisterSourceId(s, seqNo, 0, offset, CurrentTimestamp); + SourceIdStorage.RegisterSourceId(s, seqNo, offset, CurrentTimestamp); } else if (const auto& hbVersion = writeResponse.Msg.HeartbeatVersion) { SourceIdStorage.RegisterSourceId(s, it->second.Updated( seqNo, offset, CurrentTimestamp, THeartbeat{*hbVersion, writeResponse.Msg.Data} @@ -378,7 +378,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) { } Y_ABORT_UNLESS(body.AssignedOffset); - SourceIdStorage.RegisterSourceId(body.SourceId, body.SeqNo, 0, *body.AssignedOffset, CurrentTimestamp, std::move(keyRange)); + SourceIdStorage.RegisterSourceId(body.SourceId, body.SeqNo, *body.AssignedOffset, CurrentTimestamp, std::move(keyRange)); ReplyOk(ctx, response.GetCookie()); } else if (response.IsDeregisterMessageGroup()) { const auto& body = response.GetDeregisterMessageGroup().Body; @@ -399,7 +399,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) { } Y_ABORT_UNLESS(body.AssignedOffset); - SourceIdStorage.RegisterSourceId(body.SourceId, body.SeqNo, 0, *body.AssignedOffset, CurrentTimestamp, std::move(keyRange), true); + SourceIdStorage.RegisterSourceId(body.SourceId, body.SeqNo, *body.AssignedOffset, CurrentTimestamp, std::move(keyRange), true); } ReplyOk(ctx, response.GetCookie()); @@ -836,7 +836,7 @@ TPartition::ProcessResult TPartition::ProcessRequest(TRegisterMessageGroupMsg& m } body.AssignedOffset = parameters.CurOffset; - parameters.SourceIdBatch.RegisterSourceId(body.SourceId, body.SeqNo, 0, parameters.CurOffset, CurrentTimestamp, std::move(keyRange)); + parameters.SourceIdBatch.RegisterSourceId(body.SourceId, body.SeqNo, parameters.CurOffset, CurrentTimestamp, std::move(keyRange)); return ProcessResult::Continue; } @@ -859,7 +859,7 @@ TPartition::ProcessResult TPartition::ProcessRequest(TSplitMessageGroupMsg& msg, } body.AssignedOffset = parameters.CurOffset; - parameters.SourceIdBatch.RegisterSourceId(body.SourceId, body.SeqNo, 0, parameters.CurOffset, CurrentTimestamp, std::move(keyRange), true); + parameters.SourceIdBatch.RegisterSourceId(body.SourceId, body.SeqNo, parameters.CurOffset, CurrentTimestamp, std::move(keyRange), true); } return ProcessResult::Continue; diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 731fbfc67b90..15ebebb65770 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -146,11 +146,11 @@ class TReadProxy : public TActorBootstrapped { PreparedResponse = std::make_shared(); } } - + auto& responseRecord = isDirectRead ? *PreparedResponse : Response->Record; responseRecord.SetStatus(NMsgBusProxy::MSTATUS_OK); - responseRecord.SetErrorCode(NPersQueue::NErrorCode::OK); - + responseRecord.SetErrorCode(NPersQueue::NErrorCode::OK); + Y_ABORT_UNLESS(readResult.ResultSize() > 0); bool isStart = false; if (!responseRecord.HasPartitionResponse()) { @@ -191,7 +191,7 @@ class TReadProxy : public TActorBootstrapped { } if (isNewMsg) { - if (!isStart && readResult.GetResult(i).HasTotalParts() + if (!isStart && readResult.GetResult(i).HasTotalParts() && readResult.GetResult(i).GetTotalParts() + i > readResult.ResultSize()) //last blob is not full break; partResp->AddResult()->CopyFrom(readResult.GetResult(i)); @@ -292,7 +292,7 @@ class TReadProxy : public TActorBootstrapped { }; -TActorId CreateReadProxy(const TActorId& sender, const TActorId& tablet, ui32 tabletGeneration, +TActorId CreateReadProxy(const TActorId& sender, const TActorId& tablet, ui32 tabletGeneration, const TDirectReadKey& directReadKey, const NKikimrClient::TPersQueueRequest& request, const TActorContext& ctx) { @@ -304,7 +304,7 @@ class TResponseBuilder { public: TResponseBuilder(const TActorId& sender, const TActorId& tablet, const TString& topicName, const ui32 partition, const ui64 messageNo, - const TString& reqId, const TMaybe cookie, NMetrics::TResourceMetrics* resourceMetrics, + const TString& reqId, const TMaybe cookie, NMetrics::TResourceMetrics* resourceMetrics, const TActorContext& ctx) : Sender(sender) , Tablet(tablet) @@ -639,7 +639,7 @@ struct TPersQueue::TReplyToActor { Event(std::move(event)) { } - + TActorId ActorId; TEventBasePtr Event; }; @@ -840,7 +840,7 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& ctx.Send(ctx.SelfID, new TEvents::TEvPoisonPill()); return; } - + Y_ABORT_UNLESS(readRange.HasStatus()); if (readRange.GetStatus() != NKikimrProto::OK && readRange.GetStatus() != NKikimrProto::NODATA) { LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, @@ -1267,7 +1267,7 @@ void TPersQueue::FinishResponse(THashMap>::iter void TPersQueue::Handle(TEvPersQueue::TEvUpdateConfig::TPtr& ev, const TActorContext& ctx) -{ +{ if (!ConfigInited) { UpdateConfigRequests.emplace_back(ev->Release(), ev->Sender); return; @@ -1304,7 +1304,7 @@ void TPersQueue::TrySendUpdateConfigResponses(const TActorContext& ctx) ChangeConfigNotification.clear(); } - + void TPersQueue::CreateTopicConverter(const NKikimrPQ::TPQTabletConfig& config, NPersQueue::TConverterFactoryPtr& converterFactory, NPersQueue::TTopicConverterPtr& topicConverter, @@ -1500,7 +1500,7 @@ void TPersQueue::AddCmdWriteConfig(TEvKeyValue::TEvRequest* request, keyRange = TPartitionKeyRange::Parse(mg.GetKeyRange()); } - sourceIdWriter.RegisterSourceId(mg.GetId(), 0, 0, 0, ctx.Now(), std::move(keyRange)); + sourceIdWriter.RegisterSourceId(mg.GetId(), 0, 0, ctx.Now(), std::move(keyRange)); } for (const auto& partition : cfg.GetPartitions()) { @@ -2109,7 +2109,7 @@ void TPersQueue::HandleReadRequest( ReplyError(ctx, responseCookie, NPersQueue::NErrorCode::READ_ERROR_NO_SESSION, TStringBuilder() << "Read prepare request with unknown(old?) session id " << cmd.GetSessionId()); return; - } + } } THolder event = @@ -2375,7 +2375,7 @@ void TPersQueue::Handle(TEvPersQueue::TEvRequest::TPtr& ev, const TActorContext& } ResponseProxy[responseCookie] = ans; Counters->Simple()[COUNTER_PQ_TABLET_INFLIGHT].Set(ResponseProxy.size()); - + if (!ConfigInited) { ReplyError(ctx, responseCookie, NPersQueue::NErrorCode::INITIALIZING, "tablet is not ready"); return; @@ -2396,11 +2396,11 @@ void TPersQueue::Handle(TEvPersQueue::TEvRequest::TPtr& ev, const TActorContext& ReplyError(ctx, responseCookie, NPersQueue::NErrorCode::BAD_REQUEST, "no partition number"); return; } - + TPartitionId partition(req.GetPartition()); auto it = Partitions.find(partition); - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " got client message batch for topic '" + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " got client message batch for topic '" << (TopicConverter ? TopicConverter->GetClientsideName() : "Undefined") << "' partition " << partition); if (it == Partitions.end()) { @@ -2859,7 +2859,7 @@ void TPersQueue::Handle(TEvTxProcessing::TEvReadSet::TPtr& ev, const TActorConte std::unique_ptr ack; if (!(event.GetFlags() & NKikimrTx::TEvReadSet::FLAG_NO_ACK)) { - ack = std::make_unique(*ev->Get(), TabletID()); + ack = std::make_unique(*ev->Get(), TabletID()); } if (auto tx = GetTransaction(ctx, event.GetTxId()); tx && tx->Senders.contains(event.GetTabletProducer())) { @@ -2927,7 +2927,7 @@ void TPersQueue::Handle(TEvPQ::TEvTxCalcPredicateResult::TPtr& ev, const TActorC void TPersQueue::Handle(TEvPQ::TEvProposePartitionConfigResult::TPtr& ev, const TActorContext& ctx) { const TEvPQ::TEvProposePartitionConfigResult& event = *ev->Get(); - + auto tx = GetTransaction(ctx, event.TxId); if (!tx) { return; @@ -3582,7 +3582,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, Y_ABORT_UNLESS(tx.PartitionRepliesCount <= tx.PartitionRepliesExpected); PQ_LOG_T("TxId="<< tx.TxId << ", State=EXECUTING" << - ", tx.PartitionRepliesCount=" << tx.PartitionRepliesCount << + ", tx.PartitionRepliesCount=" << tx.PartitionRepliesCount << ", tx.PartitionRepliesExpected=" << tx.PartitionRepliesExpected); if (tx.PartitionRepliesCount == tx.PartitionRepliesExpected) { Y_ABORT_UNLESS(!TxQueue.empty()); @@ -3728,7 +3728,7 @@ TPartition* TPersQueue::CreatePartitionActor(const TPartitionId& partitionId, const TActorContext& ctx) { int channels = Info()->Channels.size() - NKeyValue::BLOB_CHANNEL; // channels 0,1 are reserved in tablet - Y_ABORT_UNLESS(channels > 0); + Y_ABORT_UNLESS(channels > 0); return new TPartition(TabletID(), partitionId, @@ -3793,7 +3793,7 @@ void TPersQueue::EnsurePartitionsAreNotDeleted(const NKikimrPQ::TPQTabletConfig& Y_VERIFY_S(was.contains(partition.GetPartitionId()), "New config is bad, missing partition " << partition.GetPartitionId()); } } - + void TPersQueue::InitTransactions(const NKikimrClient::TKeyValueResponse::TReadRangeResult& readRange, THashMap>& partitionTxs) { diff --git a/ydb/core/persqueue/sourceid.cpp b/ydb/core/persqueue/sourceid.cpp index 73dac7232040..86d949fa394c 100644 --- a/ydb/core/persqueue/sourceid.cpp +++ b/ydb/core/persqueue/sourceid.cpp @@ -101,18 +101,18 @@ void THeartbeatProcessor::ForgetSourceId(const TString& sourceId) { SourceIdsWithHeartbeat.erase(sourceId); } -TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant createTs) +TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs) : SeqNo(seqNo) - , MinSeqNo(minSeqNo) + , MinSeqNo(seqNo) , Offset(offset) , WriteTimestamp(createTs) , CreateTimestamp(createTs) { } -TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant createTs, THeartbeat&& heartbeat) +TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, THeartbeat&& heartbeat) : SeqNo(seqNo) - , MinSeqNo(minSeqNo) + , MinSeqNo(seqNo) , Offset(offset) , WriteTimestamp(createTs) , CreateTimestamp(createTs) @@ -120,9 +120,9 @@ TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant cr { } -TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant createTs, TMaybe&& keyRange, bool isInSplit) +TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, TMaybe&& keyRange, bool isInSplit) : SeqNo(seqNo) - , MinSeqNo(minSeqNo) + , MinSeqNo(seqNo) , Offset(offset) , CreateTimestamp(createTs) , Explicit(true) @@ -136,7 +136,7 @@ TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant cr TSourceIdInfo TSourceIdInfo::Updated(ui64 seqNo, ui64 offset, TInstant writeTs) const { auto copy = *this; copy.SeqNo = seqNo; - if (copy.MinSeqNo == 0 || copy.MinSeqNo > seqNo) { + if (copy.MinSeqNo == 0) { copy.MinSeqNo = seqNo; } copy.Offset = offset; diff --git a/ydb/core/persqueue/sourceid.h b/ydb/core/persqueue/sourceid.h index 12ba710d0f55..81c70bd4603e 100644 --- a/ydb/core/persqueue/sourceid.h +++ b/ydb/core/persqueue/sourceid.h @@ -34,9 +34,9 @@ struct TSourceIdInfo { EState State = EState::Registered; TSourceIdInfo() = default; - TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant createTs); - TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant createTs, THeartbeat&& heartbeat); - TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant createTs, TMaybe&& keyRange, bool isInSplit = false); + TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs); + TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, THeartbeat&& heartbeat); + TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, TMaybe&& keyRange, bool isInSplit = false); TSourceIdInfo Updated(ui64 seqNo, ui64 offset, TInstant writeTs) const; TSourceIdInfo Updated(ui64 seqNo, ui64 offset, TInstant writeTs, THeartbeat&& heartbeat) const; diff --git a/ydb/core/persqueue/ut/sourceid_ut.cpp b/ydb/core/persqueue/ut/sourceid_ut.cpp index f85464396a24..530f34806b46 100644 --- a/ydb/core/persqueue/ut/sourceid_ut.cpp +++ b/ydb/core/persqueue/ut/sourceid_ut.cpp @@ -23,7 +23,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { TSourceIdWriter writer(ESourceIdFormat::Raw); const auto sourceId = TestSourceId(1); - const auto sourceIdInfo = TSourceIdInfo(1, 0, 10, TInstant::Seconds(100)); + const auto sourceIdInfo = TSourceIdInfo(1, 10, TInstant::Seconds(100)); writer.RegisterSourceId(sourceId, sourceIdInfo); UNIT_ASSERT_VALUES_EQUAL(writer.GetSourceIdsToWrite().size(), 1); @@ -35,7 +35,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { } const auto anotherSourceId = TestSourceId(2); - const auto anotherSourceIdInfo = TSourceIdInfo(2, 0, 20, TInstant::Seconds(200)); + const auto anotherSourceIdInfo = TSourceIdInfo(2, 20, TInstant::Seconds(200)); UNIT_ASSERT_VALUES_UNEQUAL(sourceIdInfo, anotherSourceIdInfo); { @@ -47,7 +47,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { Y_UNIT_TEST(SourceIdWriterClean) { TSourceIdWriter writer(ESourceIdFormat::Raw); - writer.RegisterSourceId(TestSourceId(), 1, 0, 10, TInstant::Seconds(100)); + writer.RegisterSourceId(TestSourceId(), 1, 10, TInstant::Seconds(100)); UNIT_ASSERT_VALUES_EQUAL(writer.GetSourceIdsToWrite().size(), 1); writer.Clear(); @@ -60,7 +60,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { auto expectedRequest = MakeHolder(); const auto sourceId = TestSourceId(1); - const auto sourceIdInfo = TSourceIdInfo(1, 1, 10, TInstant::Seconds(100)); + const auto sourceIdInfo = TSourceIdInfo(1, 10, TInstant::Seconds(100)); writer.RegisterSourceId(sourceId, sourceIdInfo); UNIT_ASSERT_VALUES_EQUAL(writer.GetSourceIdsToWrite().size(), 1); { @@ -78,7 +78,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { } const auto anotherSourceId = TestSourceId(2); - const auto anotherSourceIdInfo = TSourceIdInfo(2, 0, 20, TInstant::Seconds(200)); + const auto anotherSourceIdInfo = TSourceIdInfo(2, 20, TInstant::Seconds(200)); writer.RegisterSourceId(anotherSourceId, anotherSourceIdInfo); UNIT_ASSERT_VALUES_EQUAL(writer.GetSourceIdsToWrite().size(), 2); { @@ -102,9 +102,9 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { TSourceIdStorage storage; const auto sourceId = TestSourceId(1); - const auto sourceIdInfo = TSourceIdInfo(1, 0, 10, TInstant::Seconds(100)); + const auto sourceIdInfo = TSourceIdInfo(1, 10, TInstant::Seconds(100)); const auto anotherSourceId = TestSourceId(2); - const auto anotherSourceIdInfo = TSourceIdInfo(2, 0, 20, TInstant::Seconds(200)); + const auto anotherSourceIdInfo = TSourceIdInfo(2, 20, TInstant::Seconds(200)); storage.RegisterSourceId(sourceId, sourceIdInfo); UNIT_ASSERT_VALUES_EQUAL(storage.GetInMemorySourceIds().size(), 1); @@ -130,7 +130,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { void SourceIdStorageParseAndAdd(TKeyPrefix::EMark mark, ESourceIdFormat format) { const auto sourceId = TestSourceId(); - const auto sourceIdInfo = TSourceIdInfo(1, 1, 10, TInstant::Seconds(100)); + const auto sourceIdInfo = TSourceIdInfo(1, 10, TInstant::Seconds(100)); TKeyPrefix ikey(TKeyPrefix::TypeInfo, TPartitionId(TestPartition), mark); TBuffer idata; @@ -162,20 +162,20 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { TSourceIdStorage storage; const auto sourceId = TestSourceId(1); - storage.RegisterSourceId(sourceId, 1, 0, 10, TInstant::Seconds(100)); + storage.RegisterSourceId(sourceId, 1, 10, TInstant::Seconds(100)); { auto ds = storage.MinAvailableTimestamp(now); UNIT_ASSERT_VALUES_EQUAL(ds, TInstant::Seconds(100)); } const auto anotherSourceId = TestSourceId(2); - storage.RegisterSourceId(anotherSourceId, 2, 0, 20, TInstant::Seconds(200)); + storage.RegisterSourceId(anotherSourceId, 2, 20, TInstant::Seconds(200)); { auto ds = storage.MinAvailableTimestamp(now); UNIT_ASSERT_VALUES_EQUAL(ds, TInstant::Seconds(100)); } - storage.RegisterSourceId(sourceId, 3, 0, 30, TInstant::Seconds(300)); + storage.RegisterSourceId(sourceId, 3, 30, TInstant::Seconds(300)); { auto ds = storage.MinAvailableTimestamp(now); UNIT_ASSERT_VALUES_EQUAL(ds, TInstant::Seconds(200)); @@ -185,7 +185,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { Y_UNIT_TEST(SourceIdStorageTestClean) { TSourceIdStorage storage; for (ui64 i = 1; i <= 10000; ++i) { - storage.RegisterSourceId(TestSourceId(i), i, 0, i, TInstant::Seconds(10 * i)); + storage.RegisterSourceId(TestSourceId(i), i, i, TInstant::Seconds(10 * i)); } NKikimrPQ::TPartitionConfig config; @@ -226,7 +226,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { Y_UNIT_TEST(SourceIdStorageDeleteByMaxCount) { TSourceIdStorage storage; for (ui64 i = 1; i <= 10000; ++i) { - storage.RegisterSourceId(TestSourceId(i), i, 0, i, TInstant::Seconds(10 * i)); + storage.RegisterSourceId(TestSourceId(i), i, i, TInstant::Seconds(10 * i)); } NKikimrPQ::TPartitionConfig config; @@ -258,7 +258,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { Y_UNIT_TEST(SourceIdStorageComplexDelete) { TSourceIdStorage storage; for (ui64 i = 1; i <= 10000 + 1; ++i) { // add 10000 + one extra sources - storage.RegisterSourceId(TestSourceId(i), i, 1, i , TInstant::Seconds(10 * i)); + storage.RegisterSourceId(TestSourceId(i), i, i , TInstant::Seconds(10 * i)); } NKikimrPQ::TPartitionConfig config; @@ -297,7 +297,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { const auto sourceId = TestSourceId(i); const auto owner = TestOwner(sourceId); - storage.RegisterSourceId(sourceId, i, 0, i, TInstant::Hours(i)); + storage.RegisterSourceId(sourceId, i, i, TInstant::Hours(i)); storage.RegisterSourceIdOwner(sourceId, owner); owners[owner]; } @@ -324,7 +324,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { } inline static TSourceIdInfo MakeExplicitSourceIdInfo(ui64 offset, const TMaybe& heartbeat = Nothing()) { - auto info = TSourceIdInfo(0, 0, offset, TInstant::Now()); + auto info = TSourceIdInfo(0, offset, TInstant::Now()); info.Explicit = true; if (heartbeat) { @@ -464,18 +464,18 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { TSourceIdStorage storage; const auto sourceId = TestSourceId(1); - const auto sourceIdInfo = TSourceIdInfo(1, 0, 10, TInstant::Seconds(100)); + const auto sourceIdInfo = TSourceIdInfo(2, 10, TInstant::Seconds(100)); const auto anotherSourceId = TestSourceId(2); - const auto anotherSourceIdInfo = TSourceIdInfo(2, 1, 20, TInstant::Seconds(200)); + const auto anotherSourceIdInfo = TSourceIdInfo(0, 20, TInstant::Seconds(200)); storage.RegisterSourceId(sourceId, sourceIdInfo); storage.RegisterSourceId(anotherSourceId, anotherSourceIdInfo); { auto it = storage.GetInMemorySourceIds().find(anotherSourceId); - UNIT_ASSERT_VALUES_EQUAL(it->second.MinSeqNo, 1); + UNIT_ASSERT_VALUES_EQUAL(it->second.MinSeqNo, 0); } - storage.RegisterSourceId(sourceId, sourceIdInfo.Updated(2, 11, TInstant::Seconds(100))); + storage.RegisterSourceId(sourceId, sourceIdInfo.Updated(3, 11, TInstant::Seconds(100))); { auto it = storage.GetInMemorySourceIds().find(sourceId); UNIT_ASSERT_VALUES_EQUAL(it->second.MinSeqNo, 2); @@ -483,12 +483,12 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { storage.RegisterSourceId(sourceId, sourceIdInfo.Updated(1, 12, TInstant::Seconds(100))); { auto it = storage.GetInMemorySourceIds().find(sourceId); - UNIT_ASSERT_VALUES_EQUAL(it->second.MinSeqNo, 1); + UNIT_ASSERT_VALUES_EQUAL(it->second.MinSeqNo, 2); } storage.RegisterSourceId(anotherSourceId, anotherSourceIdInfo.Updated(3, 12, TInstant::Seconds(100))); { auto it = storage.GetInMemorySourceIds().find(anotherSourceId); - UNIT_ASSERT_VALUES_EQUAL(it->second.MinSeqNo, 1); + UNIT_ASSERT_VALUES_EQUAL(it->second.MinSeqNo, 3); } }