Skip to content

Commit

Permalink
Specify source of changes (ydb-platform#4680)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL committed Jun 6, 2024
1 parent 060f4ed commit b2d3c2b
Show file tree
Hide file tree
Showing 9 changed files with 46 additions and 17 deletions.
4 changes: 4 additions & 0 deletions ydb/core/tx/replication/service/json_change_record.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ NChangeExchange::IChangeRecord::EKind TChangeRecord::GetKind() const {
: EKind::CdcDataChange;
}

TString TChangeRecord::GetSourceId() const {
return SourceId;
}

static bool ParseKey(TVector<TCell>& cells,
const NJson::TJsonValue::TArray& key, TLightweightSchema::TCPtr schema, TMemoryPool& pool, TString& error)
{
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/tx/replication/service/json_change_record.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase {
ui64 GetStep() const override;
ui64 GetTxId() const override;
EKind GetKind() const override;
TString GetSourceId() const;

void Serialize(NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& record, TMemoryPool& pool) const;
void Serialize(NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& record) const;
Expand All @@ -48,6 +49,7 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase {
TConstArrayRef<TCell> GetKey() const;

private:
TString SourceId;
NJson::TJsonValue JsonBody;
TLightweightSchema::TCPtr Schema;

Expand All @@ -59,6 +61,11 @@ class TChangeRecordBuilder: public NChangeExchange::TChangeRecordBuilder<TChange
public:
using TBase::TBase;

TSelf& WithSourceId(const TString& sourceId) {
GetRecord()->SourceId = sourceId;
return static_cast<TSelf&>(*this);
}

template <typename T>
TSelf& WithBody(T&& body) {
auto res = NJson::ReadJsonTree(body, &GetRecord()->JsonBody);
Expand Down
13 changes: 12 additions & 1 deletion ydb/core/tx/replication/service/table_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,21 @@ class TTablePartitionWriter: public TActorBootstrapped<TTablePartitionWriter> {
tableId.SetTableId(TableId.PathId.LocalPathId);
tableId.SetSchemaVersion(TableId.SchemaVersion);

TString source;
for (auto recordPtr : ev->Get()->Records) {
MemoryPool.Clear();
const auto& record = *recordPtr->Get<TChangeRecord>();
record.Serialize(*event->Record.AddChanges(), MemoryPool);
// TODO: set WriteTxId, Source

if (!source) {
source = record.GetSourceId();
} else {
Y_ABORT_UNLESS(source == record.GetSourceId());
}
}

if (source) {
event->Record.SetSource(source);
}

Send(LeaderPipeCache, new TEvPipeCache::TEvForward(event.Release(), TabletId, false));
Expand Down Expand Up @@ -427,6 +437,7 @@ class TLocalTableWriter
for (auto& record : ev->Get()->Records) {
records.emplace_back(record.Offset, PathId, record.Data.size());
auto res = PendingRecords.emplace(record.Offset, TChangeRecordBuilder()
.WithSourceId(ev->Get()->Source)
.WithOrder(record.Offset)
.WithBody(std::move(record.Data))
.WithSchema(Schema)
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/replication/service/table_writer_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
env.Send<TEvWorker::TEvHandshake>(writer, new TEvWorker::TEvHandshake());

using TRecord = TEvWorker::TEvData::TRecord;
env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData({
env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData("TestSource", {
TRecord(1, R"({"key":[1], "update":{"value":"10"}})"),
TRecord(2, R"({"key":[2], "update":{"value":"20"}})"),
TRecord(3, R"({"key":[3], "update":{"value":"30"}})"),
Expand Down Expand Up @@ -72,7 +72,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
env.Send<TEvWorker::TEvHandshake>(writer, new TEvWorker::TEvHandshake());

using TRecord = TEvWorker::TEvData::TRecord;
env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData({
env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData("TestSource", {
TRecord(1, R"({"key":[1], "update":{"int32_value":-100500}})"),
TRecord(2, R"({"key":[2], "update":{"uint32_value":100500}})"),
TRecord(3, R"({"key":[3], "update":{"int64_value":-200500}})"),
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/replication/service/topic_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> {
records.emplace_back(msg.GetOffset(), std::move(msg.GetData()));
}

Send(Worker, new TEvWorker::TEvData(std::move(records)));
Send(Worker, new TEvWorker::TEvData(ToString(result.PartitionId), std::move(records)));
}

void Handle(TEvYdbProxy::TEvTopicReaderGone::TPtr& ev) {
Expand Down
25 changes: 14 additions & 11 deletions ydb/core/tx/replication/service/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ TEvWorker::TEvData::TRecord::TRecord(ui64 offset, TString&& data)
{
}

TEvWorker::TEvData::TEvData(const TVector<TRecord>& records)
: Records(records)
TEvWorker::TEvData::TEvData(const TString& source, const TVector<TRecord>& records)
: Source(source)
, Records(records)
{
}

TEvWorker::TEvData::TEvData(TVector<TRecord>&& records)
: Records(std::move(records))
TEvWorker::TEvData::TEvData(const TString& source, TVector<TRecord>&& records)
: Source(source)
, Records(std::move(records))
{
}

Expand All @@ -42,6 +44,7 @@ void TEvWorker::TEvData::TRecord::Out(IOutputStream& out) const {

TString TEvWorker::TEvData::ToString() const {
return TStringBuilder() << ToStringHeader() << " {"
<< " Source: " << Source
<< " Records [" << JoinSeq(",", Records) << "]"
<< " }";
}
Expand Down Expand Up @@ -115,16 +118,16 @@ class TWorker: public TActorBootstrapped<TWorker> {
<< ": sender# " << ev->Sender);

Reader.Registered();
if (!InFlightRecords) {
if (!InFlightData) {
Send(Reader, new TEvWorker::TEvPoll());
}
} else if (ev->Sender == Writer) {
LOG_I("Handshake with writer"
<< ": sender# " << ev->Sender);

Writer.Registered();
if (InFlightRecords) {
Send(Writer, new TEvWorker::TEvData(InFlightRecords));
if (InFlightData) {
Send(Writer, new TEvWorker::TEvData(InFlightData->Source, InFlightData->Records));
}
} else {
LOG_W("Handshake from unknown actor"
Expand All @@ -142,7 +145,7 @@ class TWorker: public TActorBootstrapped<TWorker> {
return;
}

InFlightRecords.clear();
InFlightData.Reset();
if (Reader) {
Send(ev->Forward(Reader));
}
Expand All @@ -157,8 +160,8 @@ class TWorker: public TActorBootstrapped<TWorker> {
return;
}

Y_ABORT_UNLESS(InFlightRecords.empty());
InFlightRecords = ev->Get()->Records;
Y_ABORT_UNLESS(!InFlightData);
InFlightData = MakeHolder<TEvWorker::TEvData>(ev->Get()->Source, ev->Get()->Records);

if (Writer) {
Send(ev->Forward(Writer));
Expand Down Expand Up @@ -239,7 +242,7 @@ class TWorker: public TActorBootstrapped<TWorker> {
mutable TMaybe<TString> LogPrefix;
TActorInfo Reader;
TActorInfo Writer;
TVector<TEvWorker::TEvData::TRecord> InFlightRecords;
THolder<TEvWorker::TEvData> InFlightData;
};

IActor* CreateWorker(std::function<IActor*(void)>&& createReaderFn, std::function<IActor*(void)>&& createWriterFn) {
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/tx/replication/service/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ struct TEvWorker {
void Out(IOutputStream& out) const;
};

TString Source;
TVector<TRecord> Records;

explicit TEvData(const TVector<TRecord>& records);
explicit TEvData(TVector<TRecord>&& records);
explicit TEvData(const TString& source, const TVector<TRecord>& records);
explicit TEvData(const TString& source, TVector<TRecord>&& records);
TString ToString() const override;
};

Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ void TEvYdbProxy::TReadTopicResult::TMessage::Out(IOutputStream& out) const {

void TEvYdbProxy::TReadTopicResult::Out(IOutputStream& out) const {
out << "{"
<< " PartitionId: " << PartitionId
<< " Messages [" << JoinSeq(",", Messages) << "]"
<< " }";
}
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/replication/ydb_proxy/ydb_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ struct TEvYdbProxy {
};

explicit TReadTopicResult(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent& event) {
PartitionId = event.GetPartitionSession()->GetPartitionId();
Messages.reserve(event.GetMessagesCount());
if (event.HasCompressedMessages()) {
for (const auto& msg : event.GetCompressedMessages()) {
Expand All @@ -200,6 +201,7 @@ struct TEvYdbProxy {

void Out(IOutputStream& out) const;

ui64 PartitionId;
TVector<TMessage> Messages;
};

Expand Down

0 comments on commit b2d3c2b

Please sign in to comment.