diff --git a/ydb/core/tx/replication/service/json_change_record.cpp b/ydb/core/tx/replication/service/json_change_record.cpp index d6d5808775e8..b11565aed63c 100644 --- a/ydb/core/tx/replication/service/json_change_record.cpp +++ b/ydb/core/tx/replication/service/json_change_record.cpp @@ -33,6 +33,10 @@ NChangeExchange::IChangeRecord::EKind TChangeRecord::GetKind() const { : EKind::CdcDataChange; } +TString TChangeRecord::GetSourceId() const { + return SourceId; +} + static bool ParseKey(TVector& cells, const NJson::TJsonValue::TArray& key, TLightweightSchema::TCPtr schema, TMemoryPool& pool, TString& error) { diff --git a/ydb/core/tx/replication/service/json_change_record.h b/ydb/core/tx/replication/service/json_change_record.h index e8c7f5f9c32a..89f5076fa6ca 100644 --- a/ydb/core/tx/replication/service/json_change_record.h +++ b/ydb/core/tx/replication/service/json_change_record.h @@ -40,6 +40,7 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase { ui64 GetStep() const override; ui64 GetTxId() const override; EKind GetKind() const override; + TString GetSourceId() const; void Serialize(NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& record, TMemoryPool& pool) const; void Serialize(NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& record) const; @@ -48,6 +49,7 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase { TConstArrayRef GetKey() const; private: + TString SourceId; NJson::TJsonValue JsonBody; TLightweightSchema::TCPtr Schema; @@ -59,6 +61,11 @@ class TChangeRecordBuilder: public NChangeExchange::TChangeRecordBuilderSourceId = sourceId; + return static_cast(*this); + } + template TSelf& WithBody(T&& body) { auto res = NJson::ReadJsonTree(body, &GetRecord()->JsonBody); diff --git a/ydb/core/tx/replication/service/table_writer.cpp b/ydb/core/tx/replication/service/table_writer.cpp index 92ff2650cda7..bf9858bda3c3 100644 --- a/ydb/core/tx/replication/service/table_writer.cpp +++ b/ydb/core/tx/replication/service/table_writer.cpp @@ -76,11 +76,21 @@ class TTablePartitionWriter: public TActorBootstrapped { tableId.SetTableId(TableId.PathId.LocalPathId); tableId.SetSchemaVersion(TableId.SchemaVersion); + TString source; for (auto recordPtr : ev->Get()->Records) { MemoryPool.Clear(); const auto& record = *recordPtr->Get(); record.Serialize(*event->Record.AddChanges(), MemoryPool); - // TODO: set WriteTxId, Source + + if (!source) { + source = record.GetSourceId(); + } else { + Y_ABORT_UNLESS(source == record.GetSourceId()); + } + } + + if (source) { + event->Record.SetSource(source); } Send(LeaderPipeCache, new TEvPipeCache::TEvForward(event.Release(), TabletId, false)); @@ -427,6 +437,7 @@ class TLocalTableWriter for (auto& record : ev->Get()->Records) { records.emplace_back(record.Offset, PathId, record.Data.size()); auto res = PendingRecords.emplace(record.Offset, TChangeRecordBuilder() + .WithSourceId(ev->Get()->Source) .WithOrder(record.Offset) .WithBody(std::move(record.Data)) .WithSchema(Schema) diff --git a/ydb/core/tx/replication/service/table_writer_ut.cpp b/ydb/core/tx/replication/service/table_writer_ut.cpp index ade2ae2bda81..2de0a0a7710a 100644 --- a/ydb/core/tx/replication/service/table_writer_ut.cpp +++ b/ydb/core/tx/replication/service/table_writer_ut.cpp @@ -31,7 +31,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { env.Send(writer, new TEvWorker::TEvHandshake()); using TRecord = TEvWorker::TEvData::TRecord; - env.Send(writer, new TEvWorker::TEvData({ + env.Send(writer, new TEvWorker::TEvData("TestSource", { TRecord(1, R"({"key":[1], "update":{"value":"10"}})"), TRecord(2, R"({"key":[2], "update":{"value":"20"}})"), TRecord(3, R"({"key":[3], "update":{"value":"30"}})"), @@ -72,7 +72,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { env.Send(writer, new TEvWorker::TEvHandshake()); using TRecord = TEvWorker::TEvData::TRecord; - env.Send(writer, new TEvWorker::TEvData({ + env.Send(writer, new TEvWorker::TEvData("TestSource", { TRecord(1, R"({"key":[1], "update":{"int32_value":-100500}})"), TRecord(2, R"({"key":[2], "update":{"uint32_value":100500}})"), TRecord(3, R"({"key":[3], "update":{"int64_value":-200500}})"), diff --git a/ydb/core/tx/replication/service/topic_reader.cpp b/ydb/core/tx/replication/service/topic_reader.cpp index d4d9ba4acf9c..8e3133a122c9 100644 --- a/ydb/core/tx/replication/service/topic_reader.cpp +++ b/ydb/core/tx/replication/service/topic_reader.cpp @@ -61,7 +61,7 @@ class TRemoteTopicReader: public TActor { records.emplace_back(msg.GetOffset(), std::move(msg.GetData())); } - Send(Worker, new TEvWorker::TEvData(std::move(records))); + Send(Worker, new TEvWorker::TEvData(ToString(result.PartitionId), std::move(records))); } void Handle(TEvYdbProxy::TEvTopicReaderGone::TPtr& ev) { diff --git a/ydb/core/tx/replication/service/worker.cpp b/ydb/core/tx/replication/service/worker.cpp index 8f2be72c6aa5..bef2b853480f 100644 --- a/ydb/core/tx/replication/service/worker.cpp +++ b/ydb/core/tx/replication/service/worker.cpp @@ -23,13 +23,15 @@ TEvWorker::TEvData::TRecord::TRecord(ui64 offset, TString&& data) { } -TEvWorker::TEvData::TEvData(const TVector& records) - : Records(records) +TEvWorker::TEvData::TEvData(const TString& source, const TVector& records) + : Source(source) + , Records(records) { } -TEvWorker::TEvData::TEvData(TVector&& records) - : Records(std::move(records)) +TEvWorker::TEvData::TEvData(const TString& source, TVector&& records) + : Source(source) + , Records(std::move(records)) { } @@ -42,6 +44,7 @@ void TEvWorker::TEvData::TRecord::Out(IOutputStream& out) const { TString TEvWorker::TEvData::ToString() const { return TStringBuilder() << ToStringHeader() << " {" + << " Source: " << Source << " Records [" << JoinSeq(",", Records) << "]" << " }"; } @@ -115,7 +118,7 @@ class TWorker: public TActorBootstrapped { << ": sender# " << ev->Sender); Reader.Registered(); - if (!InFlightRecords) { + if (!InFlightData) { Send(Reader, new TEvWorker::TEvPoll()); } } else if (ev->Sender == Writer) { @@ -123,8 +126,8 @@ class TWorker: public TActorBootstrapped { << ": sender# " << ev->Sender); Writer.Registered(); - if (InFlightRecords) { - Send(Writer, new TEvWorker::TEvData(InFlightRecords)); + if (InFlightData) { + Send(Writer, new TEvWorker::TEvData(InFlightData->Source, InFlightData->Records)); } } else { LOG_W("Handshake from unknown actor" @@ -142,7 +145,7 @@ class TWorker: public TActorBootstrapped { return; } - InFlightRecords.clear(); + InFlightData.Reset(); if (Reader) { Send(ev->Forward(Reader)); } @@ -157,8 +160,8 @@ class TWorker: public TActorBootstrapped { return; } - Y_ABORT_UNLESS(InFlightRecords.empty()); - InFlightRecords = ev->Get()->Records; + Y_ABORT_UNLESS(!InFlightData); + InFlightData = MakeHolder(ev->Get()->Source, ev->Get()->Records); if (Writer) { Send(ev->Forward(Writer)); @@ -239,7 +242,7 @@ class TWorker: public TActorBootstrapped { mutable TMaybe LogPrefix; TActorInfo Reader; TActorInfo Writer; - TVector InFlightRecords; + THolder InFlightData; }; IActor* CreateWorker(std::function&& createReaderFn, std::function&& createWriterFn) { diff --git a/ydb/core/tx/replication/service/worker.h b/ydb/core/tx/replication/service/worker.h index 523214190ac1..b5d43de7a7a6 100644 --- a/ydb/core/tx/replication/service/worker.h +++ b/ydb/core/tx/replication/service/worker.h @@ -36,10 +36,11 @@ struct TEvWorker { void Out(IOutputStream& out) const; }; + TString Source; TVector Records; - explicit TEvData(const TVector& records); - explicit TEvData(TVector&& records); + explicit TEvData(const TString& source, const TVector& records); + explicit TEvData(const TString& source, TVector&& records); TString ToString() const override; }; diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp index 942fa445e524..fc87485174c5 100644 --- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp +++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp @@ -33,6 +33,7 @@ void TEvYdbProxy::TReadTopicResult::TMessage::Out(IOutputStream& out) const { void TEvYdbProxy::TReadTopicResult::Out(IOutputStream& out) const { out << "{" + << " PartitionId: " << PartitionId << " Messages [" << JoinSeq(",", Messages) << "]" << " }"; } diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h index 04114b5337ae..059449e4180d 100644 --- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h +++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h @@ -186,6 +186,7 @@ struct TEvYdbProxy { }; explicit TReadTopicResult(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent& event) { + PartitionId = event.GetPartitionSession()->GetPartitionId(); Messages.reserve(event.GetMessagesCount()); if (event.HasCompressedMessages()) { for (const auto& msg : event.GetCompressedMessages()) { @@ -200,6 +201,7 @@ struct TEvYdbProxy { void Out(IOutputStream& out) const; + ui64 PartitionId; TVector Messages; };