Skip to content

Commit

Permalink
Run & stop workers (ydb-platform#2860)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL committed Jun 10, 2024
1 parent a0c15cf commit a5c5c40
Show file tree
Hide file tree
Showing 23 changed files with 744 additions and 22 deletions.
48 changes: 46 additions & 2 deletions ydb/core/protos/replication.proto
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,59 @@ message TEvDropReplicationResult {
SUCCESS = 1;
NOT_FOUND = 2;
}

optional TOperationId OperationId = 1;
optional uint64 Origin = 2;
optional EStatus Status = 3;
}

message TEvHandshake {
optional uint64 ControllerId = 1;
// Identifies a particular incarnation of a replication controller.
// The controller fills TabletId with its tablet id and Generation with
// the generation of its executor when it addresses a node.
message TControllerIdentity {
optional uint64 TabletId = 1;
optional uint64 Generation = 2;
}

// Identifies a worker by the (ReplicationId, TargetId, WorkerId) triple.
// The C++ counterpart is NKikimr::NReplication::TWorkerId, which is parsed
// from and serialized into this message field by field.
message TWorkerIdentity {
optional uint64 ReplicationId = 1;
optional uint64 TargetId = 2;
optional uint64 WorkerId = 3;
}

// Handshake carrying the identity of a controller.
// The controller builds it from its tablet id and executor generation
// when it creates a session for a node.
message TEvHandshake {
optional TControllerIdentity Controller = 1;
}

// Status report received by the controller from a node.
// Workers lists worker identities; the controller attaches every listed
// worker to the session of the node that sent the report.
message TEvStatus {
repeated TWorkerIdentity Workers = 1;
}

// Settings of the "remote topic reader" alternative of TRunWorkerCommand.Reader.
// NOTE(review): presumably ConnectionParams locate the remote database, and
// TopicPath/TopicPartitionId/ConsumerName select the topic partition to read
// and the consumer to read it with -- confirm against the reader implementation.
message TRemoteTopicReaderSettings {
optional TConnectionParams ConnectionParams = 1;
optional string TopicPath = 2;
optional uint64 TopicPartitionId = 3;
optional string ConsumerName = 4;
}

// Settings of the "local table writer" alternative of TRunWorkerCommand.Writer.
// NOTE(review): PathId presumably identifies the local table to write to -- confirm.
message TLocalTableWriterSettings {
optional NKikimrProto.TPathID PathId = 1;
}

// Describes what a worker has to run: one reader and one writer.
// Both parts are declared as oneofs; each currently has a single alternative.
message TRunWorkerCommand {
// Source side of the worker.
oneof Reader {
TRemoteTopicReaderSettings RemoteTopicReader = 1;
}

// Destination side of the worker.
oneof Writer {
TLocalTableWriterSettings LocalTableWriter = 2;
}
}

// Request to run a worker.
// When the controller sends it to a node's replication service it fills in
// its own identity, the worker identity and a copy of the worker's command.
message TEvRunWorker {
optional TControllerIdentity Controller = 1;
optional TWorkerIdentity Worker = 2;
optional TRunWorkerCommand Command = 3;
}

// Request to stop a worker.
// The controller sends it to a node's replication service with its own
// identity and the identity of the worker to stop.
message TEvStopWorker {
optional TControllerIdentity Controller = 1;
optional TWorkerIdentity Worker = 2;
}
34 changes: 34 additions & 0 deletions ydb/core/tx/replication/common/worker_id.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#include "worker_id.h"

#include <ydb/core/protos/replication.pb.h>

namespace NKikimr::NReplication {

TWorkerId::TWorkerId(ui64 rid, ui64 tid, ui64 wid)
: std::tuple<ui64, ui64, ui64>(rid, tid, wid)
{
}

// Returns the replication id (tuple element 0).
ui64 TWorkerId::ReplicationId() const {
    const std::tuple<ui64, ui64, ui64>& fields = *this;
    return std::get<0>(fields);
}

// Returns the target id (tuple element 1).
ui64 TWorkerId::TargetId() const {
    const std::tuple<ui64, ui64, ui64>& fields = *this;
    return std::get<1>(fields);
}

// Returns the worker id (tuple element 2).
ui64 TWorkerId::WorkerId() const {
    const std::tuple<ui64, ui64, ui64>& fields = *this;
    return std::get<2>(fields);
}

// Builds a TWorkerId from the ReplicationId, TargetId and WorkerId fields of the proto message.
TWorkerId TWorkerId::Parse(const NKikimrReplication::TWorkerIdentity& proto) {
    const ui64 replicationId = proto.GetReplicationId();
    const ui64 targetId = proto.GetTargetId();
    const ui64 workerId = proto.GetWorkerId();

    return TWorkerId(replicationId, targetId, workerId);
}

// Writes the three ids of this worker id into the corresponding fields of the proto message.
void TWorkerId::Serialize(NKikimrReplication::TWorkerIdentity& proto) const {
    const auto& [replicationId, targetId, workerId] =
        static_cast<const std::tuple<ui64, ui64, ui64>&>(*this);

    proto.SetReplicationId(replicationId);
    proto.SetTargetId(targetId);
    proto.SetWorkerId(workerId);
}

}
27 changes: 27 additions & 0 deletions ydb/core/tx/replication/common/worker_id.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#pragma once

#include <util/generic/hash.h>

#include <tuple>

namespace NKikimrReplication {
class TWorkerIdentity;
}

namespace NKikimr::NReplication {

// Identifier of a replication worker: a (replication id, target id, worker id) triple.
// It derives from std::tuple so that the triple can be used as a hash container key
// (see the THash specialization for this type).
struct TWorkerId: std::tuple<ui64, ui64, ui64> {
// Builds an id from the replication id (rid), target id (tid) and worker id (wid),
// stored in the tuple in that order.
explicit TWorkerId(ui64 rid, ui64 tid, ui64 wid);

// Accessors for tuple elements 0, 1 and 2 respectively.
ui64 ReplicationId() const;
ui64 TargetId() const;
ui64 WorkerId() const;

// Conversion from/to the NKikimrReplication::TWorkerIdentity protobuf message.
static TWorkerId Parse(const NKikimrReplication::TWorkerIdentity& proto);
void Serialize(NKikimrReplication::TWorkerIdentity& proto) const;
};

}

// Makes TWorkerId usable as a key of THashMap/THashSet by reusing the hash
// of its underlying std::tuple<ui64, ui64, ui64>.
template <>
struct THash<NKikimr::NReplication::TWorkerId> : THash<std::tuple<ui64, ui64, ui64>> {};
13 changes: 13 additions & 0 deletions ydb/core/tx/replication/common/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Library with code shared by replication components; currently it contains
# only worker_id.cpp.
LIBRARY()

# worker_id.cpp includes the generated ydb/core/protos/replication.pb.h.
PEERDIR(
ydb/core/protos
)

SRCS(
worker_id.cpp
)

YQL_LAST_ABI_VERSION()

END()
164 changes: 159 additions & 5 deletions ydb/core/tx/replication/controller/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,11 @@ STFUNC(TController::StateWork) {
HFunc(TEvPrivate::TEvDropDstResult, Handle);
HFunc(TEvPrivate::TEvResolveTenantResult, Handle);
HFunc(TEvPrivate::TEvUpdateTenantNodes, Handle);
HFunc(TEvPrivate::TEvRunWorkers, Handle);
HFunc(TEvDiscovery::TEvDiscoveryData, Handle);
HFunc(TEvDiscovery::TEvError, Handle);
HFunc(TEvService::TEvStatus, Handle);
HFunc(TEvService::TEvRunWorker, Handle);
HFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
default:
HandleDefaultEvents(ev, SelfId());
Expand Down Expand Up @@ -213,7 +215,7 @@ void TController::CreateSession(ui32 nodeId, const TActorContext& ctx) {
<< ": nodeId# " << nodeId);

Y_ABORT_UNLESS(!Sessions.contains(nodeId));
Sessions.emplace(nodeId, TSessionInfo{});
Sessions.emplace(nodeId, TSessionInfo());

auto ev = MakeHolder<TEvService::TEvHandshake>(TabletID(), Executor()->Generation());
ui32 flags = 0;
Expand All @@ -228,18 +230,170 @@ void TController::DeleteSession(ui32 nodeId, const TActorContext& ctx) {
CLOG_D(ctx, "Delete session"
<< ": nodeId# " << nodeId);

auto it = Sessions.find(nodeId);
Y_ABORT_UNLESS(it != Sessions.end());
Sessions.erase(it);
Y_ABORT_UNLESS(Sessions.contains(nodeId));
auto& session = Sessions[nodeId];

for (const auto& id : session.GetWorkers()) {
auto it = Workers.find(id);
if (it == Workers.end()) {
continue;
}

auto& worker = it->second;
worker.ClearSession();

if (worker.HasCommand()) {
WorkersToRun.insert(id);
}
}

Sessions.erase(nodeId);
if (SelfId().NodeId() != nodeId) {
Send(ctx.InterconnectProxy(nodeId), new TEvents::TEvUnsubscribe());
}

ScheduleRunWorkers();
}

// Handles a status report from the replication service of a node.
//
// Reports from nodes without a session are ignored. Otherwise the session is
// marked ready and every worker listed in the report is attached to it. A worker
// unknown to the controller is registered without a command.
//
// If a reported worker is already attached to a session of ANOTHER node, that
// node is asked to stop it first. A worker already attached to the reporting
// node is left running: stopping it there and then recording it as attached to
// the same node would leave the controller believing the worker runs on a node
// that has just been told to stop it.
void TController::Handle(TEvService::TEvStatus::TPtr& ev, const TActorContext& ctx) {
    CLOG_T(ctx, "Handle " << ev->Get()->ToString());

    const auto nodeId = ev->Sender.NodeId();
    auto sessionIt = Sessions.find(nodeId);
    if (sessionIt == Sessions.end()) {
        return;
    }

    // Nothing below inserts into Sessions, so this reference stays valid.
    auto& session = sessionIt->second;
    session.SetReady();

    for (const auto& workerIdentity : ev->Get()->Record.GetWorkers()) {
        const auto id = TWorkerId::Parse(workerIdentity);

        auto it = Workers.find(id);
        if (it == Workers.end()) {
            it = Workers.emplace(id, TWorkerInfo()).first;
        }

        auto& worker = it->second;
        if (worker.HasSession()) {
            const ui32 prevNodeId = worker.GetSession();
            // Only a duplicate running on a different node has to be stopped.
            if (prevNodeId != nodeId && Sessions.contains(prevNodeId)) {
                StopWorker(prevNodeId, id);
            }
        }

        session.AttachWorker(id);
        worker.AttachSession(nodeId);
    }
}

// Asks the replication service of the given node to stop the worker and
// detaches the worker from that node's session.
// The node must have a session; otherwise the process is aborted.
void TController::StopWorker(ui32 nodeId, const TWorkerId& id) {
    Y_ABORT_UNLESS(Sessions.contains(nodeId));

    auto request = MakeHolder<TEvService::TEvStopWorker>();

    // Identify this controller incarnation to the service.
    auto* self = request->Record.MutableController();
    self->SetTabletId(TabletID());
    self->SetGeneration(Executor()->Generation());

    id.Serialize(*request->Record.MutableWorker());

    Send(MakeReplicationServiceId(nodeId), std::move(request));
    Sessions[nodeId].DetachWorker(id);
}

// Handles a request to run a worker.
// Registers the worker with the received command if it is not known yet, sets
// the command on a known worker that has none, queues the worker for running
// unless it is already attached to a session, and schedules a run pass.
void TController::Handle(TEvService::TEvRunWorker::TPtr& ev, const TActorContext& ctx) {
    CLOG_T(ctx, "Handle " << ev->Get()->ToString());

    auto& request = ev->Get()->Record;
    const auto workerId = TWorkerId::Parse(request.GetWorker());
    auto* command = request.MutableCommand();

    TWorkerInfo* info = nullptr;
    if (auto found = Workers.find(workerId); found != Workers.end()) {
        info = &found->second;
    } else {
        info = &Workers.emplace(workerId, TWorkerInfo(command)).first->second;
    }

    if (!info->HasCommand()) {
        info->SetCommand(command);
    }

    if (!info->HasSession()) {
        WorkersToRun.insert(workerId);
    }

    ScheduleRunWorkers();
}

// Schedules a TEvRunWorkers event 100 ms from now, unless one is already
// scheduled or there are no workers waiting to be run.
void TController::ScheduleRunWorkers() {
    if (RunWorkersScheduled) {
        return;
    }

    if (WorkersToRun.empty()) {
        return;
    }

    Schedule(TDuration::MilliSeconds(100), new TEvPrivate::TEvRunWorkers());
    RunWorkersScheduled = true;
}

// Runs queued workers; at most `limit` workers are started per pass.
//
// For every queued worker:
//  - it is dropped from the queue if it already has a session or its
//    replication can no longer be found;
//  - it stays queued if its tenant is not set, is not known to the nodes
//    manager, has no nodes yet, or the randomly chosen node has no ready
//    session;
//  - otherwise it is started on the chosen node and dropped from the queue.
//
// At the end the pass is rescheduled if the queue is still non-empty.
void TController::Handle(TEvPrivate::TEvRunWorkers::TPtr&, const TActorContext&) {
    static constexpr ui32 limit = 100;
    ui32 i = 0;

    for (auto iter = WorkersToRun.begin(); iter != WorkersToRun.end() && i < limit;) {
        const auto id = *iter;

        auto it = Workers.find(id);
        Y_ABORT_UNLESS(it != Workers.end());

        auto& worker = it->second;
        if (worker.HasSession()) {
            WorkersToRun.erase(iter++);
            continue;
        }

        auto replication = Find(id.ReplicationId());
        if (!replication) {
            WorkersToRun.erase(iter++);
            continue;
        }

        const auto& tenant = replication->GetTenant();
        if (!tenant || !NodesManager.HasTenant(tenant)) {
            ++iter;
            continue;
        }

        // A tenant is registered with an empty node set before its nodes are
        // discovered; picking a random node from an empty set is invalid, so
        // wait until at least one node is known.
        if (NodesManager.GetNodes(tenant).empty()) {
            ++iter;
            continue;
        }

        const auto nodeId = NodesManager.GetRandomNode(tenant);
        const auto sessionIt = Sessions.find(nodeId);
        if (sessionIt == Sessions.end() || !sessionIt->second.IsReady()) {
            ++iter;
            continue;
        }

        Y_ABORT_UNLESS(worker.HasCommand());
        RunWorker(nodeId, id, *worker.GetCommand());
        worker.AttachSession(nodeId);

        WorkersToRun.erase(iter++);
        ++i;
    }

    // Allow the next pass to be scheduled for workers that remain queued.
    RunWorkersScheduled = false;
    ScheduleRunWorkers();
}

// Asks the replication service of the given node to run the worker with a copy
// of the given command and attaches the worker to that node's session.
// The node must have a session; otherwise the process is aborted.
void TController::RunWorker(ui32 nodeId, const TWorkerId& id, const NKikimrReplication::TRunWorkerCommand& cmd) {
    Y_ABORT_UNLESS(Sessions.contains(nodeId));

    auto request = MakeHolder<TEvService::TEvRunWorker>();

    // Identify this controller incarnation to the service.
    auto* self = request->Record.MutableController();
    self->SetTabletId(TabletID());
    self->SetGeneration(Executor()->Generation());

    id.Serialize(*request->Record.MutableWorker());
    request->Record.MutableCommand()->CopyFrom(cmd);

    Send(MakeReplicationServiceId(nodeId), std::move(request));
    Sessions[nodeId].AttachWorker(id);
}

void TController::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev, const TActorContext& ctx) {
Expand Down
12 changes: 11 additions & 1 deletion ydb/core/tx/replication/controller/controller_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <ydb/library/yverify_stream/yverify_stream.h>

#include <util/generic/hash.h>
#include <util/generic/hash_set.h>

namespace NKikimr::NReplication::NController {

Expand Down Expand Up @@ -75,13 +76,18 @@ class TController
void Handle(TEvPrivate::TEvDropDstResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvResolveTenantResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvUpdateTenantNodes::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvRunWorkers::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx);
void Handle(TEvService::TEvStatus::TPtr& ev, const TActorContext& ctx);
void Handle(TEvService::TEvRunWorker::TPtr& ev, const TActorContext& ctx);
void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev, const TActorContext& ctx);

void CreateSession(ui32 nodeId, const TActorContext& ctx);
void DeleteSession(ui32 nodeId, const TActorContext& ctx);
void ScheduleRunWorkers();
void RunWorker(ui32 nodeId, const TWorkerId& id, const NKikimrReplication::TRunWorkerCommand& cmd);
void StopWorker(ui32 nodeId, const TWorkerId& id);

// local transactions
class TTxInitSchema;
Expand Down Expand Up @@ -137,7 +143,11 @@ class TController

TActorId DiscoveryCache;
TNodesManager NodesManager;
THashMap<ui32, TSessionInfo> Sessions; // node id to session info
THashMap<ui32, TSessionInfo> Sessions;
THashMap<TWorkerId, TWorkerInfo> Workers;
THashSet<TWorkerId> WorkersToRun;

bool RunWorkersScheduled = false;

}; // TController

Expand Down
9 changes: 9 additions & 0 deletions ydb/core/tx/replication/controller/nodes_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@

#include <ydb/core/tx/replication/service/service.h>

#include <util/generic/vector.h>
#include <util/random/random.h>

namespace NKikimr::NReplication::NController {

bool TNodesManager::HasTenant(const TString& tenant) const {
Expand All @@ -14,6 +17,12 @@ const THashSet<ui32>& TNodesManager::GetNodes(const TString& tenant) const {
return TenantNodes.at(tenant);
}

// Returns a randomly chosen node id of the given tenant.
// NOTE(review): the tenant's node set must be non-empty; with an empty set the
// code below indexes an empty vector -- callers should check GetNodes(tenant)
// first.
ui32 TNodesManager::GetRandomNode(const TString& tenant) const {
    const auto& tenantNodes = GetNodes(tenant);

    // The hash set is not indexable, so copy it into a vector first.
    const TVector<ui32> candidates(tenantNodes.begin(), tenantNodes.end());
    const auto index = RandomNumber(candidates.size());

    return candidates[index];
}

void TNodesManager::DiscoverNodes(const TString& tenant, const TActorId& cache, const TActorContext& ctx) {
TenantNodes.emplace(tenant, THashSet<ui32>());
NodeDiscoverers.emplace(
Expand Down
Loading

0 comments on commit a5c5c40

Please sign in to comment.