Skip to content

Commit

Permalink
Send unsubcribe in viewer (ydb-platform#3266)
Browse files Browse the repository at this point in the history
  • Loading branch information
kruall authored Mar 29, 2024
1 parent 874723c commit 9c0752f
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 30 deletions.
70 changes: 43 additions & 27 deletions ydb/core/viewer/counters_hosts.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ using namespace NActors;
using namespace NNodeWhiteboard;

class TCountersHostsList : public TActorBootstrapped<TCountersHostsList> {
using TBase = TActorBootstrapped<TCountersHostsList>;

IViewer* Viewer;
NMon::TEvHttpInfo::TPtr Event;
THolder<TEvInterconnect::TEvNodesInfo> NodesInfo;
TMap<TNodeId, THolder<TEvWhiteboard::TEvSystemStateResponse>> NodesResponses;
THashSet<TActorId> TcpProxies;
ui32 NodesRequested = 0;
ui32 NodesReceived = 0;
bool StaticNodesOnly = false;
Expand All @@ -35,47 +38,48 @@ class TCountersHostsList : public TActorBootstrapped<TCountersHostsList> {
, Event(ev)
{}

void Bootstrap(const TActorContext& ctx) {
void Bootstrap() {
const auto& params(Event->Get()->Request.GetParams());
StaticNodesOnly = FromStringWithDefault<bool>(params.Get("static_only"), StaticNodesOnly);
DynamicNodesOnly = FromStringWithDefault<bool>(params.Get("dynamic_only"), DynamicNodesOnly);
const TActorId nameserviceId = GetNameserviceActorId();
ctx.Send(nameserviceId, new TEvInterconnect::TEvListNodes());
ctx.Schedule(TDuration::Seconds(10), new TEvents::TEvWakeup());
Send(nameserviceId, new TEvInterconnect::TEvListNodes());
Schedule(TDuration::Seconds(10), new TEvents::TEvWakeup());
Become(&TThis::StateRequestedList);
}

STFUNC(StateRequestedList) {
switch (ev->GetTypeRewrite()) {
HFunc(TEvInterconnect::TEvNodesInfo, Handle);
CFunc(TEvents::TSystem::Wakeup, Timeout);
hFunc(TEvInterconnect::TEvNodesInfo, Handle);
cFunc(TEvents::TSystem::Wakeup, Timeout);
}
}

STFUNC(StateRequestedSysInfo) {
switch (ev->GetTypeRewrite()) {
HFunc(TEvWhiteboard::TEvSystemStateResponse, Handle);
HFunc(TEvents::TEvUndelivered, Undelivered);
HFunc(TEvInterconnect::TEvNodeDisconnected, Disconnected);
CFunc(TEvents::TSystem::Wakeup, Timeout);
hFunc(TEvWhiteboard::TEvSystemStateResponse, Handle);
hFunc(TEvents::TEvUndelivered, Undelivered);
hFunc(TEvInterconnect::TEvNodeDisconnected, Disconnected);
hFunc(TEvInterconnect::TEvNodeConnected, Connected);
cFunc(TEvents::TSystem::Wakeup, Timeout);
}
}

void SendRequest(ui32 nodeId, const TActorContext& ctx) {
void SendRequest(ui32 nodeId) {
TActorId whiteboardServiceId = MakeNodeWhiteboardServiceId(nodeId);
THolder<TEvWhiteboard::TEvSystemStateRequest> request = MakeHolder<TEvWhiteboard::TEvSystemStateRequest>();
ctx.Send(whiteboardServiceId, request.Release(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, nodeId);
++NodesRequested;
Send(whiteboardServiceId, request.Release(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, nodeId);
NodesRequested++;
}

void NodeStateInfoReceived(const TActorContext& ctx) {
void NodeStateInfoReceived() {
++NodesReceived;
if (NodesRequested == NodesReceived) {
ReplyAndDie(ctx);
ReplyAndDie();
}
}

void Handle(TEvInterconnect::TEvNodesInfo::TPtr& ev, const TActorContext& ctx) {
void Handle(TEvInterconnect::TEvNodesInfo::TPtr& ev) {
NodesInfo = ev->Release();
ui32 minAllowedNodeId = std::numeric_limits<ui32>::min();
ui32 maxAllowedNodeId = std::numeric_limits<ui32>::max();
Expand All @@ -90,33 +94,38 @@ class TCountersHostsList : public TActorBootstrapped<TCountersHostsList> {
}
for (const auto& nodeInfo : NodesInfo->Nodes) {
if (nodeInfo.NodeId >= minAllowedNodeId && nodeInfo.NodeId <= maxAllowedNodeId) {
SendRequest(nodeInfo.NodeId, ctx);
SendRequest(nodeInfo.NodeId);
}
}
Become(&TThis::StateRequestedSysInfo);
}

void Handle(TEvWhiteboard::TEvSystemStateResponse::TPtr& ev, const TActorContext& ctx) {
void Handle(TEvWhiteboard::TEvSystemStateResponse::TPtr& ev) {
ui64 nodeId = ev.Get()->Cookie;
NodesResponses[nodeId] = ev->Release();
NodeStateInfoReceived(ctx);
NodeStateInfoReceived();
}

void Undelivered(TEvents::TEvUndelivered::TPtr& ev, const TActorContext& ctx) {
void Undelivered(TEvents::TEvUndelivered::TPtr& ev) {
ui32 nodeId = ev.Get()->Cookie;
if (NodesResponses.emplace(nodeId, nullptr).second) {
NodeStateInfoReceived(ctx);
NodeStateInfoReceived();
}
}

void Disconnected(TEvInterconnect::TEvNodeDisconnected::TPtr& ev, const TActorContext& ctx) {
void Disconnected(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
ui32 nodeId = ev->Get()->NodeId;
TcpProxies.erase(ev->Sender);
if (NodesResponses.emplace(nodeId, nullptr).second) {
NodeStateInfoReceived(ctx);
NodeStateInfoReceived();
}
}

void ReplyAndDie(const TActorContext& ctx) {
void Connected(TEvInterconnect::TEvNodeConnected::TPtr& ev) {
TcpProxies.insert(ev->Sender);
}

void ReplyAndDie() {
TStringStream text;
for (const auto& [nodeId, sysInfo] : NodesResponses) {
if (sysInfo) {
Expand Down Expand Up @@ -147,12 +156,19 @@ class TCountersHostsList : public TActorBootstrapped<TCountersHostsList> {
}
}
}
ctx.Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPOKTEXT(Event->Get()) + text.Str(), 0, NMon::IEvHttpInfoRes::EContentType::Custom));
Die(ctx);
Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPOKTEXT(Event->Get()) + text.Str(), 0, NMon::IEvHttpInfoRes::EContentType::Custom));
PassAway();
}

void PassAway() {
for (auto &tcpPorxy: TcpProxies) {
Send(tcpPorxy, new TEvents::TEvUnsubscribe);
}
TBase::PassAway();
}

void Timeout(const TActorContext &ctx) {
ReplyAndDie(ctx);
void Timeout() {
ReplyAndDie();
}
};

Expand Down
19 changes: 16 additions & 3 deletions ydb/core/viewer/json_vdisk_req.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ class TJsonVDiskRequest : public TViewerPipeClient<TJsonVDiskRequest<RequestType
ui32 PDiskId = 0;
ui32 VSlotId = 0;

std::optional<TActorId> TcpProxyId;

public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::VIEWER_HANDLER;
Expand Down Expand Up @@ -111,6 +113,7 @@ class TJsonVDiskRequest : public TViewerPipeClient<TJsonVDiskRequest<RequestType
hFunc(ResponseType, Handle);
cFunc(TEvRetryNodeRequest::EventType, HandleRetry);
cFunc(TEvents::TEvUndelivered::EventType, Undelivered);
hFunc(TEvInterconnect::TEvNodeConnected, Connected);
cFunc(TEvInterconnect::TEvNodeDisconnected::EventType, Disconnected);
cFunc(TEvents::TSystem::Wakeup, HandleTimeout);
}
Expand Down Expand Up @@ -143,7 +146,12 @@ class TJsonVDiskRequest : public TViewerPipeClient<TJsonVDiskRequest<RequestType
}
}

void Connected(TEvInterconnect::TEvNodeConnected::TPtr &ev) {
TcpProxyId = ev->Sender;
}

void Disconnected() {
TcpProxyId = {};
if (!RetryRequest()) {
TBase::RequestDone();
}
Expand All @@ -170,6 +178,13 @@ class TJsonVDiskRequest : public TViewerPipeClient<TJsonVDiskRequest<RequestType
}
}

void PassAway() override {
if (TcpProxyId) {
this->Send(*TcpProxyId, new TEvents::TEvUnsubscribe);
}
TBase::PassAway();
}

void ReplyAndPassAway(const TString &error = "") {
try {
TStringStream json;
Expand All @@ -182,10 +197,8 @@ class TJsonVDiskRequest : public TViewerPipeClient<TJsonVDiskRequest<RequestType
} catch (const std::exception& e) {
TBase::Send(Initiator, new NMon::TEvHttpInfoRes(TString("HTTP/1.1 400 Bad Request\r\n\r\n") + e.what(), 0, NMon::IEvHttpInfoRes::EContentType::Custom));
}
TBase::PassAway();
PassAway();
}


};

template <typename RequestType, typename ResponseType>
Expand Down

0 comments on commit 9c0752f

Please sign in to comment.