Skip to content

Commit

Permalink
Stop replication workers (ydb-platform#5126)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL committed Jun 10, 2024
1 parent dbe5fd3 commit df0ce0f
Show file tree
Hide file tree
Showing 13 changed files with 335 additions and 76 deletions.
11 changes: 11 additions & 0 deletions ydb/core/protos/replication.proto
Original file line number Diff line number Diff line change
Expand Up @@ -209,3 +209,14 @@ message TEvStopWorker {
optional TControllerIdentity Controller = 1;
optional TWorkerIdentity Worker = 2;
}

message TEvWorkerStatus {
enum EStatus {
UNKNOWN = 0;
RUNNING = 1;
STOPPED = 2;
}

optional TWorkerIdentity Worker = 1;
optional EStatus Status = 2;
}
276 changes: 215 additions & 61 deletions ydb/core/tx/replication/controller/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,12 @@ STFUNC(TController::StateWork) {
HFunc(TEvPrivate::TEvResolveSecretResult, Handle);
HFunc(TEvPrivate::TEvResolveTenantResult, Handle);
HFunc(TEvPrivate::TEvUpdateTenantNodes, Handle);
HFunc(TEvPrivate::TEvRunWorkers, Handle);
HFunc(TEvPrivate::TEvProcessQueues, Handle);
HFunc(TEvPrivate::TEvRemoveWorker, Handle);
HFunc(TEvDiscovery::TEvDiscoveryData, Handle);
HFunc(TEvDiscovery::TEvError, Handle);
HFunc(TEvService::TEvStatus, Handle);
HFunc(TEvService::TEvWorkerStatus, Handle);
HFunc(TEvService::TEvRunWorker, Handle);
HFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
default:
Expand Down Expand Up @@ -271,13 +273,13 @@ void TController::DeleteSession(ui32 nodeId, const TActorContext& ctx) {
worker.ClearSession();

if (worker.HasCommand()) {
WorkersToRun.insert(id);
BootQueue.insert(id);
}
}

Sessions.erase(nodeId);
CloseSession(nodeId, ctx);
ScheduleRunWorkers();
ScheduleProcessQueues();
}

void TController::CloseSession(ui32 nodeId, const TActorContext& ctx) {
Expand All @@ -300,42 +302,57 @@ void TController::Handle(TEvService::TEvStatus::TPtr& ev, const TActorContext& c
auto& session = Sessions[nodeId];
session.SetReady();

for (const auto& workerIdentity : ev->Get()->Record.GetWorkers()) {
const auto id = TWorkerId::Parse(workerIdentity);

auto it = Workers.find(id);
if (it == Workers.end()) {
it = Workers.emplace(id, TWorkerInfo()).first;
for (const auto& protoId : ev->Get()->Record.GetWorkers()) {
const auto id = TWorkerId::Parse(protoId);
if (!IsValidWorker(id)) {
StopQueue.emplace(id, nodeId);
continue;
}

auto& worker = it->second;
if (worker.HasSession() && Sessions.contains(worker.GetSession())) {
StopWorker(worker.GetSession(), id);
auto* worker = GetOrCreateWorker(id);
if (worker->HasSession()) {
if (const auto sessionId = worker->GetSession(); sessionId != nodeId) {
Y_ABORT_UNLESS(Sessions.contains(sessionId));
Sessions[sessionId].DetachWorker(id);
StopQueue.emplace(id, sessionId);
}
}

session.AttachWorker(id);
worker.AttachSession(nodeId);
worker->AttachSession(nodeId);
}

ScheduleProcessQueues();
}

void TController::StopWorker(ui32 nodeId, const TWorkerId& id) {
LOG_D("Stop worker"
<< ": nodeId# " << nodeId
<< ", workerId# " << id);
void TController::Handle(TEvService::TEvWorkerStatus::TPtr& ev, const TActorContext& ctx) {
CLOG_T(ctx, "Handle " << ev->Get()->ToString());

Y_ABORT_UNLESS(Sessions.contains(nodeId));
auto& session = Sessions[nodeId];
const auto nodeId = ev->Sender.NodeId();
if (!Sessions.contains(nodeId)) {
return;
}

auto ev = MakeHolder<TEvService::TEvStopWorker>();
auto& record = ev->Record;
const auto& session = Sessions[nodeId];
const auto& record = ev->Get()->Record;
const auto id = TWorkerId::Parse(record.GetWorker());

auto& controller = *record.MutableController();
controller.SetTabletId(TabletID());
controller.SetGeneration(Executor()->Generation());
id.Serialize(*record.MutableWorker());
switch (record.GetStatus()) {
case NKikimrReplication::TEvWorkerStatus::RUNNING:
if (!session.HasWorker(id)) {
StopQueue.emplace(id, nodeId);
}
break;
case NKikimrReplication::TEvWorkerStatus::STOPPED:
MaybeRemoveWorker(id, ctx);
break;
default:
CLOG_W(ctx, "Unknown worker status"
<< ": value# " << static_cast<int>(record.GetStatus()));
break;
}

Send(MakeReplicationServiceId(nodeId), std::move(ev));
session.DetachWorker(id);
ScheduleProcessQueues();
}

void TController::Handle(TEvService::TEvRunWorker::TPtr& ev, const TActorContext& ctx) {
Expand All @@ -345,56 +362,108 @@ void TController::Handle(TEvService::TEvRunWorker::TPtr& ev, const TActorContext
const auto id = TWorkerId::Parse(record.GetWorker());
auto* cmd = record.MutableCommand();

auto it = Workers.find(id);
if (it == Workers.end()) {
it = Workers.emplace(id, TWorkerInfo(cmd)).first;
if (!IsValidWorker(id)) {
return;
}

auto& worker = it->second;
if (!worker.HasCommand()) {
worker.SetCommand(cmd);
auto* worker = GetOrCreateWorker(id, cmd);
if (!worker->HasCommand()) {
worker->SetCommand(cmd);
}

if (!worker.HasSession()) {
WorkersToRun.insert(id);
if (!worker->HasSession()) {
BootQueue.insert(id);
}

ScheduleProcessQueues();
}

bool TController::IsValidWorker(const TWorkerId& id) const {
auto replication = Find(id.ReplicationId());
if (!replication) {
return false;
}

if (replication->GetState() != TReplication::EState::Ready) {
return false;
}

auto* target = replication->FindTarget(id.TargetId());
if (!target) {
return false;
}

if (target->GetDstState() != TReplication::EDstState::Ready) {
return false;
}

if (target->GetStreamState() != TReplication::EStreamState::Ready) {
return false;
}

ScheduleRunWorkers();
return true;
}

void TController::ScheduleRunWorkers() {
if (RunWorkersScheduled || !WorkersToRun) {
TWorkerInfo* TController::GetOrCreateWorker(const TWorkerId& id, NKikimrReplication::TRunWorkerCommand* cmd) {
auto it = Workers.find(id);
if (it == Workers.end()) {
it = Workers.emplace(id, cmd).first;
}

auto replication = Find(id.ReplicationId());
Y_ABORT_UNLESS(replication);

auto* target = replication->FindTarget(id.TargetId());
Y_ABORT_UNLESS(target);

target->AddWorker(id.WorkerId());
return &it->second;
}

void TController::ScheduleProcessQueues() {
if (ProcessQueuesScheduled || (!BootQueue && !StopQueue)) {
return;
}

Schedule(TDuration::MilliSeconds(100), new TEvPrivate::TEvRunWorkers());
RunWorkersScheduled = true;
Schedule(TDuration::MilliSeconds(100), new TEvPrivate::TEvProcessQueues());
ProcessQueuesScheduled = true;
}

void TController::Handle(TEvPrivate::TEvRunWorkers::TPtr&, const TActorContext& ctx) {
CLOG_D(ctx, "Run workers"
<< ": queue# " << WorkersToRun.size());
void TController::Handle(TEvPrivate::TEvProcessQueues::TPtr&, const TActorContext& ctx) {
CLOG_D(ctx, "Process queues"
<< ": boot# " << BootQueue.size()
<< ": stop# " << StopQueue.size());

static constexpr ui32 limit = 100;
ui32 i = 0;
ProcessBootQueue(ctx);
ProcessStopQueue(ctx);

ProcessQueuesScheduled = false;
ScheduleProcessQueues();
}

for (auto iter = WorkersToRun.begin(); iter != WorkersToRun.end() && i < limit;) {
void TController::ProcessBootQueue(const TActorContext&) {
ui32 i = 0;
for (auto iter = BootQueue.begin(); iter != BootQueue.end() && i < ProcessBatchLimit;) {
const auto id = *iter;
if (!IsValidWorker(id)) {
BootQueue.erase(iter++);
continue;
}

auto it = Workers.find(id);
Y_ABORT_UNLESS(it != Workers.end());
if (it == Workers.end()) {
BootQueue.erase(iter++);
continue;
}

auto& worker = it->second;
if (worker.HasSession()) {
WorkersToRun.erase(iter++);
BootQueue.erase(iter++);
continue;
}

auto replication = Find(id.ReplicationId());
if (!replication) {
WorkersToRun.erase(iter++);
continue;
}
Y_ABORT_UNLESS(replication);

const auto& tenant = replication->GetTenant();
if (!tenant || !NodesManager.HasTenant(tenant) || !NodesManager.HasNodes(tenant)) {
Expand All @@ -409,19 +478,16 @@ void TController::Handle(TEvPrivate::TEvRunWorkers::TPtr&, const TActorContext&
}

Y_ABORT_UNLESS(worker.HasCommand());
RunWorker(nodeId, id, *worker.GetCommand());
BootWorker(nodeId, id, *worker.GetCommand());
worker.AttachSession(nodeId);

WorkersToRun.erase(iter++);
BootQueue.erase(iter++);
++i;
}

RunWorkersScheduled = false;
ScheduleRunWorkers();
}

void TController::RunWorker(ui32 nodeId, const TWorkerId& id, const NKikimrReplication::TRunWorkerCommand& cmd) {
LOG_D("Run worker"
void TController::BootWorker(ui32 nodeId, const TWorkerId& id, const NKikimrReplication::TRunWorkerCommand& cmd) {
LOG_D("Boot worker"
<< ": nodeId# " << nodeId
<< ", workerId# " << id);

Expand All @@ -441,6 +507,94 @@ void TController::RunWorker(ui32 nodeId, const TWorkerId& id, const NKikimrRepli
session.AttachWorker(id);
}

void TController::ProcessStopQueue(const TActorContext& ctx) {
ui32 i = 0;
for (auto iter = StopQueue.begin(); iter != StopQueue.end() && i < ProcessBatchLimit;) {
const auto& id = iter->first;
auto sessionId = iter->second;

if (!Sessions.contains(sessionId) || !Sessions[sessionId].IsReady()) {
MaybeRemoveWorker(id, ctx);
StopQueue.erase(iter++);
continue;
}

StopWorker(sessionId, id);

StopQueue.erase(iter++);
++i;
}
}

void TController::StopWorker(ui32 nodeId, const TWorkerId& id) {
LOG_D("Stop worker"
<< ": nodeId# " << nodeId
<< ", workerId# " << id);

Y_ABORT_UNLESS(Sessions.contains(nodeId));
auto& session = Sessions[nodeId];

auto ev = MakeHolder<TEvService::TEvStopWorker>();
auto& record = ev->Record;

auto& controller = *record.MutableController();
controller.SetTabletId(TabletID());
controller.SetGeneration(Executor()->Generation());
id.Serialize(*record.MutableWorker());

Send(MakeReplicationServiceId(nodeId), std::move(ev));
session.DetachWorker(id);
}

void TController::Handle(TEvPrivate::TEvRemoveWorker::TPtr& ev, const TActorContext& ctx) {
CLOG_T(ctx, "Handle " << ev->Get()->ToString());

const auto& id = ev->Get()->Id;
RemoveQueue.insert(id);

auto it = Workers.find(id);
if (it == Workers.end()) {
return RemoveWorker(id, ctx);
}

auto& worker = it->second;
if (!worker.HasSession()) {
return RemoveWorker(id, ctx);
}

StopQueue.emplace(id, worker.GetSession());
ScheduleProcessQueues();
}

void TController::RemoveWorker(const TWorkerId& id, const TActorContext& ctx) {
LOG_D("Remove worker"
<< ", workerId# " << id);

Y_ABORT_UNLESS(RemoveQueue.contains(id));

RemoveQueue.erase(id);
Workers.erase(id);

auto replication = Find(id.ReplicationId());
if (!replication) {
return;
}

auto* target = replication->FindTarget(id.TargetId());
if (!target) {
return;
}

target->RemoveWorker(id.WorkerId());
target->Progress(ctx);
}

void TController::MaybeRemoveWorker(const TWorkerId& id, const TActorContext& ctx) {
if (RemoveQueue.contains(id)) {
RemoveWorker(id, ctx);
}
}

void TController::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev, const TActorContext& ctx) {
const ui32 nodeId = ev->Get()->NodeId;

Expand All @@ -452,7 +606,7 @@ void TController::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev, const T
}
}

TReplication::TPtr TController::Find(ui64 id) {
TReplication::TPtr TController::Find(ui64 id) const {
auto it = Replications.find(id);
if (it == Replications.end()) {
return nullptr;
Expand All @@ -461,7 +615,7 @@ TReplication::TPtr TController::Find(ui64 id) {
return it->second;
}

TReplication::TPtr TController::Find(const TPathId& pathId) {
TReplication::TPtr TController::Find(const TPathId& pathId) const {
auto it = ReplicationsByPathId.find(pathId);
if (it == ReplicationsByPathId.end()) {
return nullptr;
Expand Down
Loading

0 comments on commit df0ce0f

Please sign in to comment.