diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h index 0cf9296b0459..8e880bfe1cb3 100644 --- a/ydb/core/base/events.h +++ b/ydb/core/base/events.h @@ -171,10 +171,11 @@ struct TKikimrEvents : TEvents { ES_TABLE_CREATOR, ES_PQ_PARTITION_CHOOSER, ES_GRAPH, - ES_REPLICATION_SERVICE, + ES_REPLICATION_WORKER, ES_CHANGE_EXCHANGE, ES_S3_FILE_QUEUE, ES_NEBIUS_ACCESS_SERVICE, + ES_REPLICATION_SERVICE, }; }; diff --git a/ydb/core/protos/replication.proto b/ydb/core/protos/replication.proto index 8d8b254d01c0..273cf25e6e8c 100644 --- a/ydb/core/protos/replication.proto +++ b/ydb/core/protos/replication.proto @@ -81,3 +81,11 @@ message TEvDropReplicationResult { optional uint64 Origin = 2; optional EStatus Status = 3; } + +message TEvHandshake { + optional uint64 ControllerId = 1; + optional uint64 Generation = 2; +} + +message TEvStatus { +} diff --git a/ydb/core/tx/replication/controller/controller.cpp b/ydb/core/tx/replication/controller/controller.cpp index 66ee2d0fa1a9..dbe0d985b3f9 100644 --- a/ydb/core/tx/replication/controller/controller.cpp +++ b/ydb/core/tx/replication/controller/controller.cpp @@ -55,6 +55,8 @@ STFUNC(TController::StateWork) { HFunc(TEvPrivate::TEvUpdateTenantNodes, Handle); HFunc(TEvDiscovery::TEvDiscoveryData, Handle); HFunc(TEvDiscovery::TEvError, Handle); + HFunc(TEvService::TEvStatus, Handle); + HFunc(TEvInterconnect::TEvNodeDisconnected, Handle); default: HandleDefaultEvents(ev, SelfId()); } @@ -184,10 +186,21 @@ void TController::Handle(TEvPrivate::TEvUpdateTenantNodes::TPtr& ev, const TActo void TController::Handle(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx) { Y_ABORT_UNLESS(ev->Get()->CachedMessageData); - CLOG_T(ctx, "Handle " << ev->Get()->ToString()); - NodesManager.ProcessResponse(ev, ctx); + auto result = NodesManager.ProcessResponse(ev, ctx); + + for (auto nodeId : result.NewNodes) { + if (!Sessions.contains(nodeId)) { + CreateSession(nodeId, ctx); + } + } + + for (auto nodeId : result.RemovedNodes) { + if (Sessions.contains(nodeId)) { + DeleteSession(nodeId, ctx); + } + } } void TController::Handle(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx) { @@ -195,6 +208,51 @@ void TController::Handle(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& NodesManager.ProcessResponse(ev, ctx); } +void TController::CreateSession(ui32 nodeId, const TActorContext& ctx) { + CLOG_D(ctx, "Create session" + << ": nodeId# " << nodeId); + + Y_ABORT_UNLESS(!Sessions.contains(nodeId)); + Sessions.emplace(nodeId, TSessionInfo{}); + + auto ev = MakeHolder(TabletID(), Executor()->Generation()); + ui32 flags = 0; + if (SelfId().NodeId() != nodeId) { + flags = IEventHandle::FlagSubscribeOnSession; + } + + Send(MakeReplicationServiceId(nodeId), std::move(ev), flags); +} + +void TController::DeleteSession(ui32 nodeId, const TActorContext& ctx) { + CLOG_D(ctx, "Delete session" + << ": nodeId# " << nodeId); + + auto it = Sessions.find(nodeId); + Y_ABORT_UNLESS(it != Sessions.end()); + Sessions.erase(it); + + if (SelfId().NodeId() != nodeId) { + Send(ctx.InterconnectProxy(nodeId), new TEvents::TEvUnsubscribe()); + } +} + +void TController::Handle(TEvService::TEvStatus::TPtr& ev, const TActorContext& ctx) { + CLOG_T(ctx, "Handle " << ev->Get()->ToString()); + // TODO +} + +void TController::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev, const TActorContext& ctx) { + const ui32 nodeId = ev->Get()->NodeId; + + CLOG_I(ctx, "Node disconnected" + << ": nodeId# " << nodeId); + + if (Sessions.contains(nodeId)) { + DeleteSession(nodeId, ctx); + } +} + TReplication::TPtr TController::Find(ui64 id) { auto it = Replications.find(id); if (it == Replications.end()) { diff --git a/ydb/core/tx/replication/controller/controller_impl.h b/ydb/core/tx/replication/controller/controller_impl.h index 74cb25a3ff10..bff9f46e8dc0 100644 --- a/ydb/core/tx/replication/controller/controller_impl.h +++ b/ydb/core/tx/replication/controller/controller_impl.h @@ -6,12 +6,15 @@ #include "public_events.h" #include "replication.h" #include "schema.h" +#include "session_info.h" #include "sys_params.h" #include #include #include #include +#include +#include #include #include @@ -74,6 +77,11 @@ class TController void Handle(TEvPrivate::TEvUpdateTenantNodes::TPtr& ev, const TActorContext& ctx); void Handle(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx); void Handle(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx); + void Handle(TEvService::TEvStatus::TPtr& ev, const TActorContext& ctx); + void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev, const TActorContext& ctx); + + void CreateSession(ui32 nodeId, const TActorContext& ctx); + void DeleteSession(ui32 nodeId, const TActorContext& ctx); // local transactions class TTxInitSchema; @@ -127,9 +135,9 @@ class TController THashMap Replications; THashMap ReplicationsByPathId; - // discovery TActorId DiscoveryCache; TNodesManager NodesManager; + THashMap Sessions; // node id to session info }; // TController diff --git a/ydb/core/tx/replication/controller/nodes_manager.cpp b/ydb/core/tx/replication/controller/nodes_manager.cpp index 290fb5c7992f..2244ccdef003 100644 --- a/ydb/core/tx/replication/controller/nodes_manager.cpp +++ b/ydb/core/tx/replication/controller/nodes_manager.cpp @@ -21,25 +21,39 @@ void TNodesManager::DiscoverNodes(const TString& tenant, const TActorId& cache, ); } -void TNodesManager::ProcessResponse(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx) { +TNodesManager::TProcessResult TNodesManager::ProcessResponse(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx) { Y_ABORT_UNLESS(ev->Get()->CachedMessageData); Y_ABORT_UNLESS(!ev->Get()->CachedMessageData->InfoEntries.empty()); Y_ABORT_UNLESS(ev->Get()->CachedMessageData->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok); + TProcessResult result; + auto it = NodeDiscoverers.find(ev->Sender); if (it == NodeDiscoverers.end()) { - return; + return result; } - auto& nodes = TenantNodes[it->second]; - nodes.clear(); + THashSet newNodes; + auto& curNodes = TenantNodes[it->second]; for (const auto& [actorId, _] : ev->Get()->CachedMessageData->InfoEntries) { - nodes.insert(actorId.NodeId()); + const ui32 nodeId = actorId.NodeId(); + newNodes.insert(nodeId); + auto it = curNodes.find(nodeId); + if (it != curNodes.end()) { + curNodes.erase(it); + } else { + result.NewNodes.insert(nodeId); + } } + result.RemovedNodes = std::move(curNodes); + curNodes = std::move(newNodes); + ctx.Schedule(UpdateInternal, new TEvPrivate::TEvUpdateTenantNodes(it->second)); NodeDiscoverers.erase(it); + + return result; } void TNodesManager::ProcessResponse(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx) { diff --git a/ydb/core/tx/replication/controller/nodes_manager.h b/ydb/core/tx/replication/controller/nodes_manager.h index 36a4598300c8..4984f3212091 100644 --- a/ydb/core/tx/replication/controller/nodes_manager.h +++ b/ydb/core/tx/replication/controller/nodes_manager.h @@ -12,12 +12,18 @@ class TNodesManager { static constexpr TDuration UpdateInternal = TDuration::Minutes(5); static constexpr TDuration RetryInternal = TDuration::Seconds(10); +public: + struct TProcessResult { + THashSet NewNodes; + THashSet RemovedNodes; + }; + public: bool HasTenant(const TString& tenant) const; const THashSet& GetNodes(const TString& tenant) const; void DiscoverNodes(const TString& tenant, const TActorId& cache, const TActorContext& ctx); - void ProcessResponse(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx); + TProcessResult ProcessResponse(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx); void ProcessResponse(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx); void Shutdown(const TActorContext& ctx); diff --git a/ydb/core/tx/replication/controller/session_info.h b/ydb/core/tx/replication/controller/session_info.h new file mode 100644 index 000000000000..c0faaf9ea0af --- /dev/null +++ b/ydb/core/tx/replication/controller/session_info.h @@ -0,0 +1,9 @@ +#pragma once + +namespace NKikimr::NReplication::NController { + +struct TSessionInfo { + // TODO +}; + +} diff --git a/ydb/core/tx/replication/service/service.cpp b/ydb/core/tx/replication/service/service.cpp index 35840dafb9c8..e28eea51cc1d 100644 --- a/ydb/core/tx/replication/service/service.cpp +++ b/ydb/core/tx/replication/service/service.cpp @@ -12,6 +12,34 @@ namespace NKikimr::NReplication { namespace NService { +class TSessionInfo { +public: + explicit TSessionInfo(const TActorId& actorId) + : ActorId(actorId) + , Generation(0) + { + } + + operator TActorId() const { + return ActorId; + } + + ui64 GetGeneration() const { + return Generation; + } + + void Update(const TActorId& actorId, ui64 generation) { + Y_ABORT_UNLESS(Generation <= generation); + ActorId = actorId; + Generation = generation; + } + +private: + TActorId ActorId; + ui64 Generation; + +}; + class TReplicationService: public TActorBootstrapped { void RunBoardPublisher() { const auto& tenant = AppData()->TenantName; @@ -25,6 +53,25 @@ class TReplicationService: public TActorBootstrapped { BoardPublisher = Register(CreateBoardPublishActor(boardPath, TString(), SelfId(), 0, true)); } + void Handle(TEvService::TEvHandshake::TPtr& ev) { + const auto& record = ev->Get()->Record; + + auto it = Sessions.find(record.GetControllerId()); + if (it == Sessions.end()) { + it = Sessions.emplace(record.GetControllerId(), ev->Sender).first; + } + + auto& session = it->second; + + if (session.GetGeneration() > record.GetGeneration()) { + // ignore stale controller + return; + } + + session.Update(ev->Sender, record.GetGeneration()); + Send(session, new TEvService::TEvStatus()); // TODO + } + void PassAway() override { if (auto actorId = std::exchange(BoardPublisher, {})) { Send(actorId, new TEvents::TEvPoison()); @@ -45,12 +92,14 @@ class TReplicationService: public TActorBootstrapped { STATEFN(StateWork) { switch (ev->GetTypeRewrite()) { + hFunc(TEvService::TEvHandshake, Handle); sFunc(TEvents::TEvPoison, PassAway); } } private: TActorId BoardPublisher; + THashMap Sessions; }; // TReplicationService diff --git a/ydb/core/tx/replication/service/service.h b/ydb/core/tx/replication/service/service.h index fb113e2d61cb..e487f7fe26e5 100644 --- a/ydb/core/tx/replication/service/service.h +++ b/ydb/core/tx/replication/service/service.h @@ -1,9 +1,37 @@ #pragma once #include +#include +#include namespace NKikimr::NReplication { +struct TEvService { + enum EEv { + EvBegin = EventSpaceBegin(TKikimrEvents::ES_REPLICATION_SERVICE), + + EvHandshake, + EvStatus, + + EvEnd, + }; + + static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_REPLICATION_SERVICE)); + + struct TEvHandshake: public TEventPB { + TEvHandshake() = default; + + explicit TEvHandshake(ui64 tabletId, ui64 generation) { + Record.SetControllerId(tabletId); + Record.SetGeneration(generation); + } + }; + + struct TEvStatus: public TEventPB { + TEvStatus() = default; + }; +}; + namespace NService { inline TString MakeDiscoveryPath(const TString& tenant) { diff --git a/ydb/core/tx/replication/service/worker.h b/ydb/core/tx/replication/service/worker.h index 54e6d351a773..523214190ac1 100644 --- a/ydb/core/tx/replication/service/worker.h +++ b/ydb/core/tx/replication/service/worker.h @@ -11,7 +11,7 @@ namespace NKikimr::NReplication::NService { struct TEvWorker { enum EEv { - EvBegin = EventSpaceBegin(TKikimrEvents::ES_REPLICATION_SERVICE), + EvBegin = EventSpaceBegin(TKikimrEvents::ES_REPLICATION_WORKER), EvHandshake, EvPoll, @@ -21,7 +21,7 @@ struct TEvWorker { EvEnd, }; - static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_REPLICATION_SERVICE)); + static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_REPLICATION_WORKER)); struct TEvHandshake: public TEventLocal {}; struct TEvPoll: public TEventLocal {};