Skip to content

Commit

Permalink
Re-request tx ids during handshake (#13059)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL authored Dec 27, 2024
1 parent d996c04 commit d61c12b
Showing 1 changed file with 23 additions and 15 deletions.
38 changes: 23 additions & 15 deletions ydb/core/tx/replication/service/service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,30 @@ class TSessionInfo {
return Generation;
}

void Update(const TActorId& actorId, ui64 generation) {
void Handle(IActorOps* ops, TEvService::TEvHandshake::TPtr& ev) {
const ui64 generation = ev->Get()->Record.GetController().GetGeneration();
Y_ABORT_UNLESS(Generation <= generation);
ActorId = actorId;

ActorId = ev->Sender;
Generation = generation;

auto status = MakeHolder<TEvService::TEvStatus>();
auto& record = status->Record;

for (const auto& [id, _] : Workers) {
id.Serialize(*record.AddWorkers());
}

ops->Send(ActorId, status.Release());

TVector<TRowVersion> versionsWithoutTxId;
for (const auto& [version, _] : PendingTxId) {
versionsWithoutTxId.push_back(version);
}

if (versionsWithoutTxId) {
ops->Send(ActorId, new TEvService::TEvGetTxId(versionsWithoutTxId));
}
}

bool HasWorker(const TWorkerId& id) const {
Expand Down Expand Up @@ -116,17 +136,6 @@ class TSessionInfo {
ops->Send(ActorId, new TEvService::TEvWorkerStatus(id, std::forward<Args>(args)...));
}

void SendStatus(IActorOps* ops) const {
auto ev = MakeHolder<TEvService::TEvStatus>();
auto& record = ev->Record;

for (const auto& [id, _] : Workers) {
id.Serialize(*record.AddWorkers());
}

ops->Send(ActorId, ev.Release());
}

void SendWorkerDataEnd(IActorOps* ops, const TWorkerId& id, ui64 partitionId,
const TVector<ui64>&& adjacentPartitionsIds, const TVector<ui64>&& childPartitionsIds)
{
Expand Down Expand Up @@ -350,8 +359,7 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
return;
}

session.Update(ev->Sender, controller.GetGeneration());
session.SendStatus(this);
session.Handle(this, ev);
}

template <typename... Args>
Expand Down

0 comments on commit d61c12b

Please sign in to comment.