Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce sessions between controller & service #2744

Merged
merged 1 commit into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ydb/core/base/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,11 @@ struct TKikimrEvents : TEvents {
ES_TABLE_CREATOR,
ES_PQ_PARTITION_CHOOSER,
ES_GRAPH,
ES_REPLICATION_SERVICE,
ES_REPLICATION_WORKER,
ES_CHANGE_EXCHANGE,
ES_S3_FILE_QUEUE,
ES_NEBIUS_ACCESS_SERVICE,
ES_REPLICATION_SERVICE,
};
};

Expand Down
8 changes: 8 additions & 0 deletions ydb/core/protos/replication.proto
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,11 @@ message TEvDropReplicationResult {
optional uint64 Origin = 2;
optional EStatus Status = 3;
}

// Handshake a replication controller sends to a node's replication service
// to open (or refresh) a session with it.
message TEvHandshake {
// Tablet id of the controller that sends the handshake.
optional uint64 ControllerId = 1;
// Generation of the controller tablet. The service ignores a handshake whose
// generation is lower than the one it has already recorded for this controller.
optional uint64 Generation = 2;
}

// Reply the replication service sends to the controller after accepting a handshake.
// Carries no fields yet.
message TEvStatus {
}
62 changes: 60 additions & 2 deletions ydb/core/tx/replication/controller/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ STFUNC(TController::StateWork) {
HFunc(TEvPrivate::TEvUpdateTenantNodes, Handle);
HFunc(TEvDiscovery::TEvDiscoveryData, Handle);
HFunc(TEvDiscovery::TEvError, Handle);
HFunc(TEvService::TEvStatus, Handle);
HFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
default:
HandleDefaultEvents(ev, SelfId());
}
Expand Down Expand Up @@ -184,17 +186,73 @@ void TController::Handle(TEvPrivate::TEvUpdateTenantNodes::TPtr& ev, const TActo

// Handles a successful discovery response: lets the nodes manager refresh the
// tenant's node set, then reconciles sessions with the reported changes by
// opening a session for every newly appeared node and dropping the session of
// every node that went away.
void TController::Handle(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx) {
    Y_ABORT_UNLESS(ev->Get()->CachedMessageData);

    CLOG_T(ctx, "Handle " << ev->Get()->ToString());

    // The response must be processed exactly once: processing forgets the
    // discoverer that sent the event, so a repeated call yields an empty result
    // and the node changes would be lost.
    const auto result = NodesManager.ProcessResponse(ev, ctx);

    for (const auto nodeId : result.NewNodes) {
        // A session may already exist for this node, so create only when absent.
        if (!Sessions.contains(nodeId)) {
            CreateSession(nodeId, ctx);
        }
    }

    for (const auto nodeId : result.RemovedNodes) {
        // The session may have been dropped already (e.g. on node disconnect).
        if (Sessions.contains(nodeId)) {
            DeleteSession(nodeId, ctx);
        }
    }
}

// Handles a failed discovery attempt: traces the event and hands it over to
// the nodes manager.
void TController::Handle(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx) {
    auto* const error = ev->Get();
    CLOG_T(ctx, "Handle " << error->ToString());

    NodesManager.ProcessResponse(ev, ctx);
}

// Registers a session for the given node and sends a handshake carrying this
// controller's tablet id and generation to the node's replication service.
// The node must not have a session yet.
void TController::CreateSession(ui32 nodeId, const TActorContext& ctx) {
    CLOG_D(ctx, "Create session"
        << ": nodeId# " << nodeId);

    Y_ABORT_UNLESS(!Sessions.contains(nodeId));
    Sessions.emplace(nodeId, TSessionInfo{});

    auto handshake = MakeHolder<TEvService::TEvHandshake>(TabletID(), Executor()->Generation());

    // Subscribe on the interconnect session only when the target is another node.
    const bool isRemote = SelfId().NodeId() != nodeId;
    const ui32 flags = isRemote ? IEventHandle::FlagSubscribeOnSession : 0;

    Send(MakeReplicationServiceId(nodeId), std::move(handshake), flags);
}

// Drops the session of the given node, which must exist, and unsubscribes from
// the interconnect proxy of that node when it is not the local one.
void TController::DeleteSession(ui32 nodeId, const TActorContext& ctx) {
    CLOG_D(ctx, "Delete session"
        << ": nodeId# " << nodeId);

    auto sessionIt = Sessions.find(nodeId);
    Y_ABORT_UNLESS(sessionIt != Sessions.end());
    Sessions.erase(sessionIt);

    const bool isLocal = SelfId().NodeId() == nodeId;
    if (isLocal) {
        // Nothing to unsubscribe from: the local node is never subscribed to.
        return;
    }

    Send(ctx.InterconnectProxy(nodeId), new TEvents::TEvUnsubscribe());
}

// Handles a status message from a replication service. Only traces it for now.
void TController::Handle(TEvService::TEvStatus::TPtr& ev, const TActorContext& ctx) {
    auto* const status = ev->Get();
    CLOG_T(ctx, "Handle " << status->ToString());
    // TODO
}

// Handles loss of the interconnect connection to a node: logs it and drops the
// node's session if one exists.
void TController::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev, const TActorContext& ctx) {
    const ui32 nodeId = ev->Get()->NodeId;

    CLOG_I(ctx, "Node disconnected"
        << ": nodeId# " << nodeId);

    if (!Sessions.contains(nodeId)) {
        return;
    }

    DeleteSession(nodeId, ctx);
}

TReplication::TPtr TController::Find(ui64 id) {
auto it = Replications.find(id);
if (it == Replications.end()) {
Expand Down
10 changes: 9 additions & 1 deletion ydb/core/tx/replication/controller/controller_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@
#include "public_events.h"
#include "replication.h"
#include "schema.h"
#include "session_info.h"
#include "sys_params.h"

#include <ydb/core/base/blobstorage.h>
#include <ydb/core/base/defs.h>
#include <ydb/core/protos/counters_replication.pb.h>
#include <ydb/core/tablet_flat/tablet_flat_executed.h>
#include <ydb/core/tx/replication/service/service.h>
#include <ydb/library/actors/core/interconnect.h>
#include <ydb/library/yverify_stream/yverify_stream.h>

#include <util/generic/hash.h>
Expand Down Expand Up @@ -74,6 +77,11 @@ class TController
void Handle(TEvPrivate::TEvUpdateTenantNodes::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx);
void Handle(TEvService::TEvStatus::TPtr& ev, const TActorContext& ctx);
void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev, const TActorContext& ctx);

void CreateSession(ui32 nodeId, const TActorContext& ctx);
void DeleteSession(ui32 nodeId, const TActorContext& ctx);

// local transactions
class TTxInitSchema;
Expand Down Expand Up @@ -127,9 +135,9 @@ class TController
THashMap<ui64, TReplication::TPtr> Replications;
THashMap<TPathId, TReplication::TPtr> ReplicationsByPathId;

// discovery
TActorId DiscoveryCache;
TNodesManager NodesManager;
THashMap<ui32, TSessionInfo> Sessions; // node id to session info

}; // TController

Expand Down
24 changes: 19 additions & 5 deletions ydb/core/tx/replication/controller/nodes_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,39 @@ void TNodesManager::DiscoverNodes(const TString& tenant, const TActorId& cache,
);
}

void TNodesManager::ProcessResponse(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx) {
TNodesManager::TProcessResult TNodesManager::ProcessResponse(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx) {
Y_ABORT_UNLESS(ev->Get()->CachedMessageData);
Y_ABORT_UNLESS(!ev->Get()->CachedMessageData->InfoEntries.empty());
Y_ABORT_UNLESS(ev->Get()->CachedMessageData->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok);

TProcessResult result;

auto it = NodeDiscoverers.find(ev->Sender);
if (it == NodeDiscoverers.end()) {
return;
return result;
}

auto& nodes = TenantNodes[it->second];
nodes.clear();
THashSet<ui32> newNodes;
auto& curNodes = TenantNodes[it->second];

for (const auto& [actorId, _] : ev->Get()->CachedMessageData->InfoEntries) {
nodes.insert(actorId.NodeId());
const ui32 nodeId = actorId.NodeId();
newNodes.insert(nodeId);
auto it = curNodes.find(nodeId);
if (it != curNodes.end()) {
curNodes.erase(it);
} else {
result.NewNodes.insert(nodeId);
}
}

result.RemovedNodes = std::move(curNodes);
curNodes = std::move(newNodes);

ctx.Schedule(UpdateInternal, new TEvPrivate::TEvUpdateTenantNodes(it->second));
NodeDiscoverers.erase(it);

return result;
}

void TNodesManager::ProcessResponse(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx) {
Expand Down
8 changes: 7 additions & 1 deletion ydb/core/tx/replication/controller/nodes_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,18 @@ class TNodesManager {
static constexpr TDuration UpdateInternal = TDuration::Minutes(5);
static constexpr TDuration RetryInternal = TDuration::Seconds(10);

public:
// Outcome of processing a discovery response: how the tenant's node set
// changed compared to the previously known one.
struct TProcessResult {
// Node ids reported by the response that were not known before.
THashSet<ui32> NewNodes;
// Previously known node ids that the response no longer reports.
THashSet<ui32> RemovedNodes;
};

public:
bool HasTenant(const TString& tenant) const;
const THashSet<ui32>& GetNodes(const TString& tenant) const;

void DiscoverNodes(const TString& tenant, const TActorId& cache, const TActorContext& ctx);
void ProcessResponse(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx);
TProcessResult ProcessResponse(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx);
void ProcessResponse(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx);

void Shutdown(const TActorContext& ctx);
Expand Down
9 changes: 9 additions & 0 deletions ydb/core/tx/replication/controller/session_info.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#pragma once

namespace NKikimr::NReplication::NController {

// Controller-side state of a session with one node's replication service.
// Placeholder: holds no data yet.
struct TSessionInfo {
// TODO
};

}
49 changes: 49 additions & 0 deletions ydb/core/tx/replication/service/service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,34 @@ namespace NKikimr::NReplication {

namespace NService {

// Service-side record of a session with one controller: the actor to reply to
// and the highest controller generation seen so far (0 until the first update).
class TSessionInfo {
    TActorId ActorId;
    ui64 Generation = 0;

public:
    explicit TSessionInfo(const TActorId& actorId)
        : ActorId(actorId)
    {
    }

    // Generation recorded by the latest update.
    ui64 GetGeneration() const {
        return Generation;
    }

    // Lets the session be used directly where a recipient actor id is expected.
    operator TActorId() const {
        return ActorId;
    }

    // Rebinds the session to a new actor and generation; the generation must
    // never go backwards.
    void Update(const TActorId& actorId, ui64 generation) {
        Y_ABORT_UNLESS(generation >= Generation);
        Generation = generation;
        ActorId = actorId;
    }

};

class TReplicationService: public TActorBootstrapped<TReplicationService> {
void RunBoardPublisher() {
const auto& tenant = AppData()->TenantName;
Expand All @@ -25,6 +53,25 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
BoardPublisher = Register(CreateBoardPublishActor(boardPath, TString(), SelfId(), 0, true));
}

// Handles a controller handshake: finds or creates the session keyed by the
// controller id, drops the handshake if it comes from an older generation than
// the recorded one, otherwise rebinds the session to the sender and replies
// with a status message.
void Handle(TEvService::TEvHandshake::TPtr& ev) {
    const auto& handshake = ev->Get()->Record;
    const auto controllerId = handshake.GetControllerId();
    const auto generation = handshake.GetGeneration();

    auto sessionIt = Sessions.find(controllerId);
    if (sessionIt == Sessions.end()) {
        sessionIt = Sessions.emplace(controllerId, ev->Sender).first;
    }

    auto& session = sessionIt->second;

    const bool isStale = session.GetGeneration() > generation;
    if (isStale) {
        // ignore stale controller
        return;
    }

    session.Update(ev->Sender, generation);
    Send(session, new TEvService::TEvStatus()); // TODO
}

void PassAway() override {
if (auto actorId = std::exchange(BoardPublisher, {})) {
Send(actorId, new TEvents::TEvPoison());
Expand All @@ -45,12 +92,14 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {

// Working state of the service: dispatches incoming events by their type.
STATEFN(StateWork) {
switch (ev->GetTypeRewrite()) {
// Controller handshake: open or refresh the session.
hFunc(TEvService::TEvHandshake, Handle);
// Poison: shut the service down via PassAway.
sFunc(TEvents::TEvPoison, PassAway);
}
}

private:
TActorId BoardPublisher;
THashMap<ui64, TSessionInfo> Sessions;

}; // TReplicationService

Expand Down
28 changes: 28 additions & 0 deletions ydb/core/tx/replication/service/service.h
Original file line number Diff line number Diff line change
@@ -1,9 +1,37 @@
#pragma once

#include <ydb/core/base/defs.h>
#include <ydb/core/base/events.h>
#include <ydb/core/protos/replication.pb.h>

namespace NKikimr::NReplication {

// Events exchanged between a replication controller and the replication service.
struct TEvService {
// Event ids are allocated from the ES_REPLICATION_SERVICE event space;
// the enumerator order defines the ids.
enum EEv {
EvBegin = EventSpaceBegin(TKikimrEvents::ES_REPLICATION_SERVICE),

EvHandshake,
EvStatus,

EvEnd,
};

// Guards against the ids running past the end of the event space.
static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_REPLICATION_SERVICE));

// Handshake event backed by the NKikimrReplication::TEvHandshake protobuf record.
struct TEvHandshake: public TEventPB<TEvHandshake, NKikimrReplication::TEvHandshake, EvHandshake> {
TEvHandshake() = default;

// Fills the record with the controller's tablet id and generation.
explicit TEvHandshake(ui64 tabletId, ui64 generation) {
Record.SetControllerId(tabletId);
Record.SetGeneration(generation);
}
};

// Status event backed by the (currently empty) NKikimrReplication::TEvStatus protobuf record.
struct TEvStatus: public TEventPB<TEvStatus, NKikimrReplication::TEvStatus, EvStatus> {
TEvStatus() = default;
};
};

namespace NService {

inline TString MakeDiscoveryPath(const TString& tenant) {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/replication/service/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace NKikimr::NReplication::NService {

struct TEvWorker {
enum EEv {
EvBegin = EventSpaceBegin(TKikimrEvents::ES_REPLICATION_SERVICE),
EvBegin = EventSpaceBegin(TKikimrEvents::ES_REPLICATION_WORKER),

EvHandshake,
EvPoll,
Expand All @@ -21,7 +21,7 @@ struct TEvWorker {
EvEnd,
};

static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_REPLICATION_SERVICE));
static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_REPLICATION_WORKER));

struct TEvHandshake: public TEventLocal<TEvHandshake, EvHandshake> {};
struct TEvPoll: public TEventLocal<TEvPoll, EvPoll> {};
Expand Down
Loading