From e05b692e0e0171a3d6b72117e4ed9b8f8f08b448 Mon Sep 17 00:00:00 2001 From: Alexey Efimov <34044711+adameat@users.noreply.github.com> Date: Thu, 28 Dec 2023 15:11:15 +0100 Subject: [PATCH] graph backend KIKIMR-18277 (#745) --- .gitignore | 2 + ydb/core/base/events.h | 1 + ydb/core/base/pool_stats_collector.cpp | 7 + ydb/core/base/ya.make | 1 + .../cms/console/console_tenants_manager.cpp | 1 + ydb/core/driver_lib/run/config.h | 1 + .../run/kikimr_services_initializers.cpp | 16 +- .../run/kikimr_services_initializers.h | 8 + ydb/core/driver_lib/run/run.cpp | 4 + ydb/core/driver_lib/run/ya.make | 2 + ydb/core/graph/api/events.h | 48 ++++ ydb/core/graph/api/service.h | 18 ++ ydb/core/graph/api/shard.h | 14 + ydb/core/graph/api/ya.make | 18 ++ ydb/core/graph/protos/graph.proto | 31 +++ ydb/core/graph/protos/ya.make | 14 + ydb/core/graph/service/log.h | 25 ++ ydb/core/graph/service/service_impl.cpp | 234 ++++++++++++++++ ydb/core/graph/service/ya.make | 18 ++ ydb/core/graph/shard/backends.cpp | 251 ++++++++++++++++++ ydb/core/graph/shard/backends.h | 62 +++++ ydb/core/graph/shard/log.h | 42 +++ .../graph/shard/protos/counters_shard.proto | 29 ++ ydb/core/graph/shard/protos/ya.make | 18 ++ ydb/core/graph/shard/schema.h | 47 ++++ ydb/core/graph/shard/shard_impl.cpp | 111 ++++++++ ydb/core/graph/shard/shard_impl.h | 61 +++++ ydb/core/graph/shard/tx_change_backend.cpp | 37 +++ ydb/core/graph/shard/tx_clear_data.cpp | 42 +++ ydb/core/graph/shard/tx_get_metrics.cpp | 46 ++++ ydb/core/graph/shard/tx_init_schema.cpp | 36 +++ ydb/core/graph/shard/tx_monitoring.cpp | 80 ++++++ ydb/core/graph/shard/tx_startup.cpp | 68 +++++ ydb/core/graph/shard/tx_store_metrics.cpp | 45 ++++ ydb/core/graph/shard/ut/shard_ut.cpp | 185 +++++++++++++ ydb/core/graph/shard/ut/ya.make | 22 ++ ydb/core/graph/shard/ya.make | 35 +++ ydb/core/graph/ut/graph_ut.cpp | 184 +++++++++++++ ydb/core/graph/ut/ya.make | 24 ++ ydb/core/graph/ya.make | 13 + ydb/core/mind/hive/hive_statics.cpp | 2 +- 
ydb/core/mind/hive/monitoring.cpp | 7 +- ydb/core/protos/counters_schemeshard.proto | 2 + ydb/core/protos/flat_tx_scheme.proto | 2 + ydb/core/protos/subdomains.proto | 4 +- ydb/core/protos/tablet.proto | 3 +- ydb/core/tablet/tablet_counters_app.cpp | 7 + ydb/core/tablet/ya.make | 1 + ydb/core/testlib/tablet_helpers.cpp | 3 + ydb/core/testlib/tenant_runtime.cpp | 8 +- ydb/core/tx/schemeshard/schemeshard.h | 52 ++-- .../schemeshard__delete_tablet_reply.cpp | 3 + ydb/core/tx/schemeshard/schemeshard__init.cpp | 3 + .../tx/schemeshard/schemeshard__init_root.cpp | 5 +- ...emeshard__operation_alter_extsubdomain.cpp | 29 +- .../schemeshard__operation_common_subdomain.h | 10 + .../schemeshard__operation_side_effects.cpp | 5 + .../schemeshard__sync_update_tenants.cpp | 8 + .../schemeshard/schemeshard_domain_links.cpp | 27 +- .../tx/schemeshard/schemeshard_domain_links.h | 1 + .../tx/schemeshard/schemeshard_info_types.h | 14 + ydb/core/tx/schemeshard/ut_base/ut_base.cpp | 2 +- .../tx/schemeshard/ut_helpers/test_env.cpp | 2 +- ydb/core/tx/schemeshard/ut_helpers/test_env.h | 2 + ydb/core/viewer/json_graph.h | 172 ++++++++++++ ydb/core/viewer/json_handlers_viewer.cpp | 4 + ydb/core/viewer/json_render.h | 158 +++++++++++ ydb/core/viewer/viewer.cpp | 17 ++ ydb/core/viewer/viewer.h | 1 + ydb/core/viewer/ya.make | 3 + ydb/core/ya.make | 1 + ydb/library/services/services.proto | 3 + 72 files changed, 2413 insertions(+), 49 deletions(-) create mode 100644 ydb/core/graph/api/events.h create mode 100644 ydb/core/graph/api/service.h create mode 100644 ydb/core/graph/api/shard.h create mode 100644 ydb/core/graph/api/ya.make create mode 100644 ydb/core/graph/protos/graph.proto create mode 100644 ydb/core/graph/protos/ya.make create mode 100644 ydb/core/graph/service/log.h create mode 100644 ydb/core/graph/service/service_impl.cpp create mode 100644 ydb/core/graph/service/ya.make create mode 100644 ydb/core/graph/shard/backends.cpp create mode 100644 ydb/core/graph/shard/backends.h create 
mode 100644 ydb/core/graph/shard/log.h create mode 100644 ydb/core/graph/shard/protos/counters_shard.proto create mode 100644 ydb/core/graph/shard/protos/ya.make create mode 100644 ydb/core/graph/shard/schema.h create mode 100644 ydb/core/graph/shard/shard_impl.cpp create mode 100644 ydb/core/graph/shard/shard_impl.h create mode 100644 ydb/core/graph/shard/tx_change_backend.cpp create mode 100644 ydb/core/graph/shard/tx_clear_data.cpp create mode 100644 ydb/core/graph/shard/tx_get_metrics.cpp create mode 100644 ydb/core/graph/shard/tx_init_schema.cpp create mode 100644 ydb/core/graph/shard/tx_monitoring.cpp create mode 100644 ydb/core/graph/shard/tx_startup.cpp create mode 100644 ydb/core/graph/shard/tx_store_metrics.cpp create mode 100644 ydb/core/graph/shard/ut/shard_ut.cpp create mode 100644 ydb/core/graph/shard/ut/ya.make create mode 100644 ydb/core/graph/shard/ya.make create mode 100644 ydb/core/graph/ut/graph_ut.cpp create mode 100644 ydb/core/graph/ut/ya.make create mode 100644 ydb/core/graph/ya.make create mode 100644 ydb/core/viewer/json_graph.h create mode 100644 ydb/core/viewer/json_render.h diff --git a/.gitignore b/.gitignore index 9f73be657f2c..e5d1f13253e4 100644 --- a/.gitignore +++ b/.gitignore @@ -21,6 +21,8 @@ __pycache__/ *_pb2.py *_pb2_grpc.py *_pb2.pyi +*.pb.h +*.pb.cc # MacOS specific .DS_Store diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h index 62fbea255dab..d192a702ffc8 100644 --- a/ydb/core/base/events.h +++ b/ydb/core/base/events.h @@ -170,6 +170,7 @@ struct TKikimrEvents : TEvents { ES_DB_METADATA_CACHE, ES_TABLE_CREATOR, ES_PQ_PARTITION_CHOOSER, + ES_GRAPH, }; }; diff --git a/ydb/core/base/pool_stats_collector.cpp b/ydb/core/base/pool_stats_collector.cpp index e6e1b613a1f2..fe3041d40249 100644 --- a/ydb/core/base/pool_stats_collector.cpp +++ b/ydb/core/base/pool_stats_collector.cpp @@ -9,6 +9,9 @@ #include #include +#include +#include + namespace NKikimr { // Periodically collects stats from executor threads and exposes 
them as mon counters @@ -44,11 +47,15 @@ class TStatsCollectingActor : public NActors::TStatsCollectingActor { MiniKQLPoolStats.Update(); TVector> pools; + double cpuUsage = 0; for (const auto& pool : PoolCounters) { pools.emplace_back(pool.Name, pool.Usage, pool.Threads); + cpuUsage += pool.Usage; } ctx.Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(ctx.SelfID.NodeId()), new NNodeWhiteboard::TEvWhiteboard::TEvSystemStateUpdate(pools)); + + ctx.Send(NGraph::MakeGraphServiceId(), new NGraph::TEvGraph::TEvSendMetrics("cpu_usage", cpuUsage)); } private: diff --git a/ydb/core/base/ya.make b/ydb/core/base/ya.make index c4520c1375ed..085cf88f37d1 100644 --- a/ydb/core/base/ya.make +++ b/ydb/core/base/ya.make @@ -88,6 +88,7 @@ PEERDIR( ydb/core/base/services ydb/core/debug ydb/core/erasure + ydb/core/graph/api ydb/core/protos ydb/core/protos/out ydb/core/scheme diff --git a/ydb/core/cms/console/console_tenants_manager.cpp b/ydb/core/cms/console/console_tenants_manager.cpp index 955e8926f13a..e4982abd3630 100644 --- a/ydb/core/cms/console/console_tenants_manager.cpp +++ b/ydb/core/cms/console/console_tenants_manager.cpp @@ -451,6 +451,7 @@ class TSubDomainManip : public TActorBootstrapped { subdomain.SetName(Subdomain.second); if (Tenant->IsExternalSubdomain) { subdomain.SetExternalSchemeShard(true); + subdomain.SetGraphShard(true); if (Tenant->IsExternalHive) { subdomain.SetExternalHive(true); } diff --git a/ydb/core/driver_lib/run/config.h b/ydb/core/driver_lib/run/config.h index 7a5aae7eb3b1..2cd944329a68 100644 --- a/ydb/core/driver_lib/run/config.h +++ b/ydb/core/driver_lib/run/config.h @@ -83,6 +83,7 @@ union TBasicKikimrServicesMask { // next 64 flags bool EnableDatabaseMetadataCache:1; + bool EnableGraphService:1; }; struct { diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index 7c5357e91bbe..738ce4196258 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ 
b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -213,6 +213,9 @@ #include #include +#include +#include + #include #include @@ -1021,7 +1024,7 @@ void TLocalServiceInitializer::InitializeServices( addToLocalConfig(TTabletTypes::ReplicationController, &NReplication::CreateController, TMailboxType::ReadAsFilled, appData->UserPoolId); addToLocalConfig(TTabletTypes::BlobDepot, &NBlobDepot::CreateBlobDepot, TMailboxType::ReadAsFilled, appData->UserPoolId); addToLocalConfig(TTabletTypes::StatisticsAggregator, &NStat::CreateStatisticsAggregator, TMailboxType::ReadAsFilled, appData->UserPoolId); - + addToLocalConfig(TTabletTypes::GraphShard, &NGraph::CreateGraphShard, TMailboxType::ReadAsFilled, appData->UserPoolId); TTenantPoolConfig::TPtr tenantPoolConfig = new TTenantPoolConfig(Config.GetTenantPoolConfig(), localConfig); if (!tenantPoolConfig->IsEnabled && !tenantPoolConfig->StaticSlots.empty()) @@ -2666,5 +2669,16 @@ void TDatabaseMetadataCacheInitializer::InitializeServices(NActors::TActorSystem TActorSetupCmd(CreateDatabaseMetadataCache(appData->TenantName), TMailboxType::HTSwap, appData->UserPoolId)); } +TGraphServiceInitializer::TGraphServiceInitializer(const TKikimrRunConfig& runConfig) + : IKikimrServicesInitializer(runConfig) +{ +} + +void TGraphServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) { + setup->LocalServices.emplace_back( + NGraph::MakeGraphServiceId(), + TActorSetupCmd(NGraph::CreateGraphService(appData->TenantName), TMailboxType::HTSwap, appData->UserPoolId)); +} + } // namespace NKikimrServicesInitializers } // namespace NKikimr diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.h b/ydb/core/driver_lib/run/kikimr_services_initializers.h index 645423fe8e9a..a291b4383064 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.h +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.h @@ -605,5 +605,13 @@ class TDatabaseMetadataCacheInitializer : public 
IKikimrServicesInitializer { void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override; }; + +class TGraphServiceInitializer : public IKikimrServicesInitializer { +public: + TGraphServiceInitializer(const TKikimrRunConfig& runConfig); + + void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override; +}; + } // namespace NKikimrServicesInitializers } // namespace NKikimr diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index a8ac5cc73b93..b656afb5608a 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -1614,6 +1614,10 @@ TIntrusivePtr TKikimrRunner::CreateServiceInitializers sil->AddServiceInitializer(new TDatabaseMetadataCacheInitializer(runConfig)); } + if (serviceMask.EnableGraphService) { + sil->AddServiceInitializer(new TGraphServiceInitializer(runConfig)); + } + return sil; } diff --git a/ydb/core/driver_lib/run/ya.make b/ydb/core/driver_lib/run/ya.make index c7699785460b..b6025b7c4e2f 100644 --- a/ydb/core/driver_lib/run/ya.make +++ b/ydb/core/driver_lib/run/ya.make @@ -67,6 +67,8 @@ PEERDIR( ydb/core/formats ydb/core/fq/libs/init ydb/core/fq/libs/logs + ydb/core/graph/service + ydb/core/graph/shard ydb/core/grpc_services ydb/core/grpc_services/base ydb/core/grpc_services/auth_processor diff --git a/ydb/core/graph/api/events.h b/ydb/core/graph/api/events.h new file mode 100644 index 000000000000..b05350630cf2 --- /dev/null +++ b/ydb/core/graph/api/events.h @@ -0,0 +1,48 @@ +#pragma once + +#include +#include + +namespace NKikimr { +namespace NGraph { + +struct TEvGraph { + enum EEv { + // requests + EvSendMetrics = EventSpaceBegin(TKikimrEvents::ES_GRAPH), + EvGetMetrics, + EvMetricsResult, + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_GRAPH), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_GRAPH)"); + + struct TEvSendMetrics : TEventPB { + TEvSendMetrics() = default; + + 
TEvSendMetrics(const TString& name, double value) { + NKikimrGraph::TMetric* metric = Record.AddMetrics(); + metric->SetName(name); + metric->SetValue(value); + } + }; + + struct TEvGetMetrics : TEventPB { + TEvGetMetrics() = default; + + TEvGetMetrics(const NKikimrGraph::TEvGetMetrics& request) + : TEventPB(request) + {} + }; + + struct TEvMetricsResult : TEventPB { + TEvMetricsResult() = default; + + TEvMetricsResult(NKikimrGraph::TEvMetricsResult&& result) + : TEventPB(std::move(result)) + {} + }; +}; + +} // NGraph +} // NKikimr diff --git a/ydb/core/graph/api/service.h b/ydb/core/graph/api/service.h new file mode 100644 index 000000000000..88c66eaa8e50 --- /dev/null +++ b/ydb/core/graph/api/service.h @@ -0,0 +1,18 @@ +#pragma once + +#include + +namespace NKikimr { +namespace NGraph { + +using namespace NActors; + +inline TActorId MakeGraphServiceId(ui32 node = 0) { + char x[12] = {'g','r','a','p','h','s', 'v', 'c'}; + return TActorId(node, TStringBuf(x, 12)); +} + +IActor* CreateGraphService(const TString& database); + +} // NGraph +} // NKikimr diff --git a/ydb/core/graph/api/shard.h b/ydb/core/graph/api/shard.h new file mode 100644 index 000000000000..bf204ab67a1c --- /dev/null +++ b/ydb/core/graph/api/shard.h @@ -0,0 +1,14 @@ +#pragma once + +#include +#include + +namespace NKikimr { +namespace NGraph { + +using namespace NActors; + +IActor* CreateGraphShard(const TActorId& tablet, TTabletStorageInfo* info); + +} // NGraph +} // NKikimr diff --git a/ydb/core/graph/api/ya.make b/ydb/core/graph/api/ya.make new file mode 100644 index 000000000000..e986b2c92d7c --- /dev/null +++ b/ydb/core/graph/api/ya.make @@ -0,0 +1,18 @@ +LIBRARY() + +OWNER( + xenoxeno + g:kikimr +) + +SRCS( + events.h + service.h + shard.h +) + +PEERDIR( + ydb/core/graph/protos +) + +END() diff --git a/ydb/core/graph/protos/graph.proto b/ydb/core/graph/protos/graph.proto new file mode 100644 index 000000000000..279d76859249 --- /dev/null +++ b/ydb/core/graph/protos/graph.proto @@ -0,0 
+1,31 @@ +syntax = "proto3"; + +package NKikimrGraph; + +option java_package = "ru.yandex.kikimr.proto"; + +message TMetric { + string Name = 1; + double Value = 2; +} + +message TEvSendMetrics { + repeated TMetric Metrics = 1; +} + +message TEvGetMetrics { + optional uint64 TimeFrom = 1; + optional uint64 TimeTo = 2; + repeated string Metrics = 3; + optional uint32 MaxPoints = 4; +} + +message TMetricData { + repeated double Values = 1 [packed = true]; +} + +message TEvMetricsResult { + repeated uint64 Time = 1 [packed = true]; + repeated TMetricData Data = 2; + string Error = 3; +} diff --git a/ydb/core/graph/protos/ya.make b/ydb/core/graph/protos/ya.make new file mode 100644 index 000000000000..3b975a1d2390 --- /dev/null +++ b/ydb/core/graph/protos/ya.make @@ -0,0 +1,14 @@ +PROTO_LIBRARY() + +OWNER( + xenoxeno + g:kikimr +) + +SRCS( + graph.proto +) + +EXCLUDE_TAGS(GO_PROTO) + +END() diff --git a/ydb/core/graph/service/log.h b/ydb/core/graph/service/log.h new file mode 100644 index 000000000000..a987f248e2ca --- /dev/null +++ b/ydb/core/graph/service/log.h @@ -0,0 +1,25 @@ +#pragma once + +#if defined BLOG_D || defined BLOG_I || defined BLOG_ERROR || defined BLOG_TRACE +#error log macro definition clash +#endif + +#include +#include + +namespace NKikimr { +namespace NGraph { + +TString GetLogPrefix(); + +} +} + +#define BLOG_D(stream) ALOG_DEBUG(NKikimrServices::GRAPH, GetLogPrefix() << stream) +#define BLOG_I(stream) ALOG_INFO(NKikimrServices::GRAPH, GetLogPrefix() << stream) +#define BLOG_W(stream) ALOG_WARN(NKikimrServices::GRAPH, GetLogPrefix() << stream) +#define BLOG_NOTICE(stream) ALOG_NOTICE(NKikimrServices::GRAPH, GetLogPrefix() << stream) +#define BLOG_ERROR(stream) ALOG_ERROR(NKikimrServices::GRAPH, GetLogPrefix() << stream) +#define BLOG_CRIT(stream) ALOG_CRIT(NKikimrServices::GRAPH, GetLogPrefix() << stream) +#define BLOG_TRACE(stream) ALOG_TRACE(NKikimrServices::GRAPH, GetLogPrefix() << stream) +#define Y_ENSURE_LOG(cond, stream) if (!(cond)) { 
BLOG_ERROR("Failed condition \"" << #cond << "\" " << stream); }
diff --git a/ydb/core/graph/service/service_impl.cpp b/ydb/core/graph/service/service_impl.cpp
new file mode 100644
index 000000000000..5ecd8f983ddb
--- /dev/null
+++ b/ydb/core/graph/service/service_impl.cpp
@@ -0,0 +1,255 @@
+#include "log.h"
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace NKikimr {
+namespace NGraph {
+
+// Actor that relays TEvSendMetrics / TEvGetMetrics to the graph shard tablet of the given
+// database over a tablet pipe. The shard id is looked up through the scheme cache;
+// read requests are tracked in RequestsInFlight until they are answered or time out.
+class TGraphService : public TActor<TGraphService> {
+private:
+    using TBase = TActor<TGraphService>;
+    static constexpr TDuration RESOLVE_TIMEOUT = TDuration::Seconds(1); // min interval between resolve attempts
+    static constexpr TDuration CONNECT_TIMEOUT = TDuration::Seconds(1); // min interval between pipe creations
+    static constexpr TDuration GET_TIMEOUT = TDuration::Seconds(10); // deadline of a tracked TEvGetMetrics
+    static constexpr size_t MAX_INFLIGHT = 100; // cap on tracked TEvGetMetrics requests
+    TString Database;
+    TInstant ResolveTimestamp;
+    ui64 GraphShardId = 0;
+    TInstant ConnectTimestamp;
+    TActorId GraphShardPipe = {};
+
+    struct TGetMetricsRequest {
+        ui64 Id;
+        TInstant Deadline;
+        TActorId Sender;
+        ui64 Cookie;
+        NKikimrGraph::TEvGetMetrics Request;
+    };
+
+    ui64 RequestId = 0;
+    std::deque<TGetMetricsRequest> RequestsInFlight;
+
+public:
+    static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+        return NKikimrServices::TActivity::GRAPH_SERVICE;
+    }
+
+    TGraphService(const TString& database)
+        : TActor(&TGraphService::StateWork)
+        , Database(database)
+    {
+    }
+
+    TString GetLogPrefix() const {
+        return "SVC ";
+    }
+
+    // Requests a scheme cache lookup of the database path, at most once per RESOLVE_TIMEOUT.
+    void ResolveDatabase() {
+        if (ResolveTimestamp && (ResolveTimestamp + RESOLVE_TIMEOUT > TActivationContext::Now())) {
+            BLOG_TRACE("ResolveDatabase too soon");
+            return; // too soon
+        }
+
+        BLOG_D("ResolveDatabase " << Database);
+        TAutoPtr<NSchemeCache::TSchemeCacheNavigate> request(new NSchemeCache::TSchemeCacheNavigate());
+        NSchemeCache::TSchemeCacheNavigate::TEntry entry;
+        entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList;
+        entry.SyncVersion = false;
+        entry.Path = SplitPath(Database);
+        request->ResultSet.emplace_back(entry);
+        Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request));
+        ResolveTimestamp = TActivationContext::Now();
+    }
+
+    // Pipe client settings with RetryLimitCount = 3.
+    NTabletPipe::TClientConfig GetPipeClientConfig() {
+        NTabletPipe::TClientConfig clientConfig;
+        clientConfig.RetryPolicy = {.RetryLimitCount = 3};
+        return clientConfig;
+    }
+
+    // Creates a pipe client to the graph shard (at most once per CONNECT_TIMEOUT);
+    // if the shard id is not known yet, starts database resolution instead.
+    void ConnectShard() {
+        if (GraphShardId) {
+            if (ConnectTimestamp && (ConnectTimestamp + CONNECT_TIMEOUT > TActivationContext::Now())) {
+                BLOG_TRACE("ConnectShard too soon");
+                return; // too soon
+            }
+            BLOG_D("ConnectToShard " << GraphShardId);
+            IActor* pipeActor = NTabletPipe::CreateClient(TBase::SelfId(), GraphShardId, GetPipeClientConfig());
+            GraphShardPipe = TBase::RegisterWithSameMailbox(pipeActor);
+            ConnectTimestamp = TActivationContext::Now();
+        } else {
+            ResolveDatabase();
+        }
+    }
+
+    // Sends one tracked request through the pipe with its Id as the cookie; without a pipe it only triggers a connect.
+    void SendRequest(const TGetMetricsRequest& request) {
+        if (GraphShardPipe) {
+            TEvGraph::TEvGetMetrics* event = new TEvGraph::TEvGetMetrics();
+            event->Record = request.Request;
+            NTabletPipe::SendData(SelfId(), GraphShardPipe, event, request.Id);
+        } else {
+            ConnectShard();
+        }
+    }
+
+    // Registers a new read request (rejecting it when MAX_INFLIGHT is reached) and tries to send it.
+    void EnqueueRequest(TEvGraph::TEvGetMetrics::TPtr& ev) {
+        if (RequestsInFlight.size() >= MAX_INFLIGHT) {
+            TEvGraph::TEvMetricsResult* response = new TEvGraph::TEvMetricsResult();
+            response->Record.SetError("Maximum number of outstanding requests reached");
+            Send(ev->Sender, response, 0, ev->Cookie);
+            return;
+        }
+        if (RequestsInFlight.empty()) {
+            Schedule(GET_TIMEOUT, new TEvents::TEvWakeup()); // arm the timeout timer for the first tracked request
+        }
+        RequestsInFlight.push_back({
+            .Id = ++RequestId,
+            .Deadline = TActivationContext::Now() + GET_TIMEOUT,
+            .Sender = ev->Sender,
+            .Cookie = ev->Cookie,
+            .Request = std::move(ev->Get()->Record)
+        });
+        SendRequest(RequestsInFlight.back());
+    }
+
+    // Fails requests at the front of the queue whose deadline has passed with a "Request timed out" error.
+    void DiscardOldRequests(TInstant now) {
+        while (!RequestsInFlight.empty() && RequestsInFlight.front().Deadline <= now) {
+            BLOG_W("Discarding request with id " << RequestsInFlight.front().Id);
+            TEvGraph::TEvMetricsResult* response = new TEvGraph::TEvMetricsResult();
+            response->Record.SetError("Request timed out");
+            Send(RequestsInFlight.front().Sender, response, 0, RequestsInFlight.front().Cookie);
+            RequestsInFlight.pop_front();
+        }
+    }
+
+    // Re-sends every tracked request through the current pipe.
+    void ResendRequests() {
+        for (const TGetMetricsRequest& request : RequestsInFlight) {
+            BLOG_TRACE("Resending request " << request.Id);
+            NTabletPipe::SendData(SelfId(), GraphShardPipe, new TEvGraph::TEvGetMetrics(request.Request), request.Id);
+        }
+    }
+
+    // Metric writes are best effort: forwarded when a pipe exists, otherwise dropped while a connect is started.
+    void Handle(TEvGraph::TEvSendMetrics::TPtr& ev) {
+        BLOG_TRACE("TEvSendMetrics");
+        if (GraphShardPipe) {
+            NTabletPipe::SendData(SelfId(), GraphShardPipe, ev.Get()->Release());
+        } else {
+            ConnectShard();
+            BLOG_TRACE("Dropped metrics");
+        }
+    }
+
+    // Metric reads are queued and later answered by a TEvMetricsResult or by a timeout error.
+    void Handle(TEvGraph::TEvGetMetrics::TPtr& ev) {
+        BLOG_TRACE("TEvGetMetrics");
+        if (!GraphShardPipe) {
+            ConnectShard();
+        }
+        EnqueueRequest(ev);
+    }
+
+    // Scheme cache reply: takes the graph shard id from the domain's processing params and connects to it.
+    void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
+        TAutoPtr<NSchemeCache::TSchemeCacheNavigate> request = ev->Get()->Request;
+        if (!request->ResultSet.empty() && request->ResultSet.front().Status == NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) {
+            const NSchemeCache::TSchemeCacheNavigate::TEntry& response = request->ResultSet.front();
+            if (response.DomainDescription) {
+                if (response.DomainDescription->Description.GetProcessingParams().GetGraphShard() != 0) {
+                    GraphShardId = response.DomainDescription->Description.GetProcessingParams().GetGraphShard();
+                    BLOG_D("Database " << Database << " resolved to shard " << GraphShardId);
+                    ConnectShard();
+                    return;
+                }
+            }
+            BLOG_W("Error resolving database " << Database << " incomplete response / no graph shard");
+        } else {
+            if (!request->ResultSet.empty()) {
+                BLOG_W("Error resolving database " << Database << " error " << request->ResultSet.front().Status);
+            } else {
+                BLOG_W("Error resolving database " << Database << " no response");
+            }
+        }
+    }
+
+    // Pipe connect result: on success re-sends tracked requests, on failure drops the pipe so it is re-created later.
+    void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) {
+        if (ev->Get()->Status == NKikimrProto::OK) {
+            BLOG_D("Connected to shard " << GraphShardId);
+            ResendRequests();
+        } else {
+            BLOG_W("Error connecting to shard " << GraphShardId << " error " << ev->Get()->Status);
+            NTabletPipe::CloseClient(TBase::SelfId(), GraphShardPipe);
+            GraphShardPipe = {};
+        }
+    }
+
+    // Pipe was lost after connecting: forget it so that the next request opens a new one.
+    void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr&) {
+        BLOG_W("Connection to shard was destroyed");
+        NTabletPipe::CloseClient(TBase::SelfId(), GraphShardPipe);
+        GraphShardPipe = {};
+    }
+
+    // Shard reply: the cookie carries the request Id; the result goes to the original sender with its original cookie.
+    void Handle(TEvGraph::TEvMetricsResult::TPtr& ev) {
+        auto id(ev->Cookie);
+        BLOG_TRACE("TEvMetricsResult " << id);
+        for (auto it = RequestsInFlight.begin(); it != RequestsInFlight.end(); ++it) {
+            if (it->Id == id) {
+                BLOG_TRACE("TEvMetricsResult found request " << id << " resending to " << it->Sender);
+                Send(it->Sender, ev->Release().Release(), 0, it->Cookie);
+                RequestsInFlight.erase(it);
+                return;
+            }
+        }
+        BLOG_W("Couldn't find request with id " << id);
+    }
+
+    // Wakeup: expires overdue requests and re-arms the timer for the next deadline.
+    void HandleTimeout() {
+        TInstant now(TActivationContext::Now());
+        DiscardOldRequests(now);
+        if (!RequestsInFlight.empty()) {
+            Schedule(RequestsInFlight.front().Deadline - now, new TEvents::TEvWakeup());
+        }
+    }
+
+    STATEFN(StateWork) {
+        switch (ev->GetTypeRewrite()) {
+            hFunc(TEvGraph::TEvSendMetrics, Handle);
+            hFunc(TEvGraph::TEvGetMetrics, Handle);
+            hFunc(TEvGraph::TEvMetricsResult, Handle);
+            hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
+            hFunc(TEvTabletPipe::TEvClientConnected, Handle);
+            // TEvClientDestroyed has to be dispatched too: without it GraphShardPipe keeps the id of
+            // a dead pipe client after a disconnect and the service never reconnects.
+            hFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
+            cFunc(TEvents::TSystem::Wakeup, HandleTimeout);
+        }
+    }
+};
+
+
+IActor* CreateGraphService(const TString& database) {
+    return new TGraphService(database);
+}
+
+} // NGraph
+} // NKikimr
diff --git a/ydb/core/graph/service/ya.make b/ydb/core/graph/service/ya.make
new file mode 100644
index 000000000000..9a9bda397686
--- /dev/null
+++ b/ydb/core/graph/service/ya.make
@@ -0,0 +1,18 @@
+LIBRARY()
+
+OWNER(
+    xenoxeno
+    g:kikimr
+)
+
+SRCS(
+    log.h
+    service_impl.cpp
+)
+
+PEERDIR(
+    ydb/core/base
+    ydb/core/graph/api
+)
+
+END()
diff --git a/ydb/core/graph/shard/backends.cpp b/ydb/core/graph/shard/backends.cpp
new
file mode 100644
index 000000000000..7573c6e04cb0
--- /dev/null
+++ b/ydb/core/graph/shard/backends.cpp
@@ -0,0 +1,268 @@
+#include "log.h"
+#include "backends.h"
+#include "schema.h"
+
+namespace NKikimr {
+namespace NGraph {
+
+// Reduces a timestamp series to at most maxPoints entries: source index i falls into
+// bucket floor(i * maxPoints / size) and every bucket keeps its first element.
+template<>
+std::vector<TInstant> TMemoryBackend::Downsample(const std::vector<TInstant>& data, size_t maxPoints) {
+    if (data.size() <= maxPoints) {
+        return data;
+    }
+    std::vector<TInstant> result;
+    double coeff = (double)maxPoints / data.size();
+    result.resize(maxPoints);
+    size_t ltrg = maxPoints; // sentinel: no bucket has been filled yet
+    for (size_t src = 0; src < data.size(); ++src) {
+        size_t trg = floor(coeff * src);
+        if (trg != ltrg) {
+            result[trg] = data[src]; // we expect sorted data so we practically use min() here
+            ltrg = trg;
+        }
+    }
+    return result;
+}
+
+// Reduces a value series to at most maxPoints entries using the same bucketing as the
+// timestamp specialization; every bucket holds the mean of its non-NaN samples.
+// NOTE(review): a bucket without any non-NaN sample stays 0 (not NaN) - confirm this is intended.
+template<>
+std::vector<double> TMemoryBackend::Downsample(const std::vector<double>& data, size_t maxPoints) {
+    if (data.size() <= maxPoints) {
+        return data;
+    }
+    if (maxPoints == 0) { // nothing can be kept; also avoids writing result[0] of an empty vector below
+        return {};
+    }
+    std::vector<double> result;
+    double coeff = (double)maxPoints / data.size();
+    result.resize(maxPoints);
+    size_t ltrg = 0;
+    long cnt = 0;
+    for (size_t src = 0; src < data.size(); ++src) {
+        if (isnan(data[src])) {
+            continue;
+        }
+        size_t trg = floor(coeff * src);
+        if (trg != ltrg && cnt > 0) { // moved on to a new bucket: turn the previous sum into a mean
+            if (cnt > 1) {
+                result[ltrg] /= cnt;
+            }
+            cnt = 0;
+        }
+        result[trg] += data[src];
+        ++cnt;
+        ltrg = trg;
+    }
+    if (cnt > 1) {
+        result[ltrg] /= cnt;
+    }
+    return result;
+}
+
+// Appends one record; each value goes to the slot assigned to its metric name in MetricsIndex,
+// names seen for the first time get the next free slot and slots without a value are NaN.
+void TMemoryBackend::StoreMetrics(TMetricsData&& data) {
+    if (!MetricsValues.empty() && MetricsValues.back().Timestamp >= data.Timestamp) {
+        BLOG_ERROR("Invalid timestamp ordering for " << data.Timestamp << " and " << MetricsValues.back().Timestamp);
+    }
+    TMetricsRecord& record = MetricsValues.emplace_back();
+    record.Timestamp = data.Timestamp;
+    for (const auto& [name, value] : data.Values) {
+        auto itMetricsIndex = MetricsIndex.find(name);
+        if (itMetricsIndex == MetricsIndex.end()) {
+            itMetricsIndex = MetricsIndex.emplace(name, MetricsIndex.size()).first;
+        }
+        size_t idx = itMetricsIndex->second;
+        if (idx >= record.Values.size()) { // grow only: "<=" shrank the record or skipped a needed resize (map order is arbitrary)
+            record.Values.resize(idx + 1, NAN);
+        }
+        record.Values[idx] = value;
+    }
+    BLOG_TRACE("Stored metrics");
+}
+
+// Collects the requested metrics from records within [TimeFrom, TimeTo] (seconds), downsampled to MaxPoints when set.
+// Metric names that are not in MetricsIndex produce NaN series.
+void TMemoryBackend::GetMetrics(const NKikimrGraph::TEvGetMetrics& get, NKikimrGraph::TEvMetricsResult& result) const {
+    auto itLeft = get.HasTimeFrom()
+        ? std::lower_bound(MetricsValues.begin(), MetricsValues.end(), TInstant::Seconds(get.GetTimeFrom()))
+        : MetricsValues.begin();
+    auto itRight = get.HasTimeTo()
+        ? std::upper_bound(itLeft, MetricsValues.end(), TInstant::Seconds(get.GetTimeTo()))
+        : MetricsValues.end();
+    std::vector<size_t> indexes;
+    for (const TString& metric : get.GetMetrics()) {
+        auto itMetricsIndex = MetricsIndex.find(metric);
+        size_t idx;
+        if (itMetricsIndex != MetricsIndex.end()) {
+            idx = itMetricsIndex->second;
+        } else {
+            idx = MetricsIndex.size(); // non-existent index
+        }
+        indexes.push_back(idx);
+    }
+    std::vector<TInstant> timestamps;
+    std::vector<std::vector<double>> values;
+    values.resize(indexes.size());
+    for (auto it = itLeft; it != itRight; ++it) {
+        timestamps.push_back(it->Timestamp);
+        for (size_t num = 0; num < indexes.size(); ++num) {
+            size_t idx = indexes[num];
+            if (idx < it->Values.size()) {
+                values[num].push_back(it->Values[idx]);
+            } else {
+                values[num].push_back(NAN);
+            }
+        }
+    }
+    if (get.HasMaxPoints() && timestamps.size() > get.GetMaxPoints()) {
+        timestamps = Downsample(timestamps, get.GetMaxPoints());
+        BLOG_TRACE("GetMetrics timestamps=" << timestamps.size());
+        for (std::vector<double>& values : values) {
+            values = Downsample(values, get.GetMaxPoints());
+            BLOG_TRACE("GetMetrics values=" << values.size());
+        }
+    }
+    result.Clear();
+    auto time = result.MutableTime();
+    time->Reserve(timestamps.size());
+    for (const TInstant t : timestamps) {
+        time->Add(t.Seconds());
+    }
+    for (std::vector<double>& values : values) {
+        result.AddData()->MutableValues()->Add(values.begin(), values.end());
+    }
+}
+
+// Drops records older than cutline and reports the timestamp of the oldest remaining record (default TInstant if none).
+void TMemoryBackend::ClearData(TInstant cutline, TInstant& newStartTimestamp) {
+    auto itCutLine = std::lower_bound(MetricsValues.begin(), MetricsValues.end(), cutline);
+    MetricsValues.erase(MetricsValues.begin(), itCutLine);
+    if (!MetricsValues.empty()) {
+        newStartTimestamp = MetricsValues.front().Timestamp;
+    } else {
+        newStartTimestamp = {};
+    }
+}
+
+TString TMemoryBackend::GetLogPrefix() const {
+    return "MEM ";
+}
+
+// Writes every value as a (timestamp seconds, metric id) row; a name seen for the first time is also stored in MetricsIndex.
+bool TLocalBackend::StoreMetrics(NTabletFlatExecutor::TTransactionContext& txc, TMetricsData&& data) {
+    NIceDb::TNiceDb db(txc.DB);
+    for (const auto& [name, value] : data.Values) {
+        auto itId = MetricsIndex.find(name);
+        if (itId == MetricsIndex.end()) {
+            itId = MetricsIndex.emplace(name, MetricsIndex.size()).first;
+            db.Table<Schema::MetricsIndex>().Key(name).Update<Schema::MetricsIndex::Id>(itId->second);
+        }
+        ui64 id = itId->second;
+        db.Table<Schema::MetricsValues>().Key(data.Timestamp.Seconds(), id).Update<Schema::MetricsValues::Value>(value);
+    }
+    BLOG_TRACE("Stored metrics");
+    return true;
+}
+
+// Reads MetricsValues rows within [TimeFrom, TimeTo] and builds one series per requested metric (NaN where a row is missing).
+// Returns false when the rowset is not ready.
+bool TLocalBackend::GetMetrics(NTabletFlatExecutor::TTransactionContext& txc, const NKikimrGraph::TEvGetMetrics& get, NKikimrGraph::TEvMetricsResult& result) const {
+    NIceDb::TNiceDb db(txc.DB);
+    ui64 minTime = std::numeric_limits<ui64>::min();
+    ui64 maxTime = std::numeric_limits<ui64>::max();
+    std::unordered_map<ui64, ui64> metricIdx; // stored metric id -> position in the request
+    if (get.HasTimeFrom()) {
+        minTime = get.GetTimeFrom();
+    }
+    if (get.HasTimeTo()) {
+        maxTime = get.GetTimeTo();
+    }
+    for (size_t nMetric = 0; nMetric < get.MetricsSize(); ++nMetric) {
+        TString name = get.GetMetrics(nMetric);
+        auto itMetricIdx = MetricsIndex.find(name);
+        if (itMetricIdx != MetricsIndex.end()) {
+            metricIdx[itMetricIdx->second] = nMetric;
+        }
+    }
+    std::vector<TInstant> timestamps;
+    std::vector<std::vector<double>> values;
+    auto rowset = db.Table<Schema::MetricsValues>().GreaterOrEqual(minTime).LessOrEqual(maxTime).Select();
+    if (!rowset.IsReady()) {
+        return false;
+    }
+    ui64 lastTime = 0;
+    values.resize(get.MetricsSize());
+    while (!rowset.EndOfSet()) {
+        ui64 time = rowset.GetValue<Schema::MetricsValues::Timestamp>();
+        if (timestamps.empty() || time != lastTime) { // the first row always opens a point, even if its time equals the initial lastTime
+            lastTime = time;
+            timestamps.push_back(TInstant::Seconds(time));
+            for (auto& vals : values) {
+                vals.emplace_back(NAN);
+            }
+        }
+        ui64 id = rowset.GetValue<Schema::MetricsValues::Id>();
+        auto itIdx = metricIdx.find(id);
+        if (itIdx != metricIdx.end()) {
+            values[itIdx->second].back() = rowset.GetValue<Schema::MetricsValues::Value>(); // latest point of that metric's own series
+        }
+        if (!rowset.Next()) {
+            return false;
+        }
+    }
+    if (get.HasMaxPoints() && timestamps.size() > get.GetMaxPoints()) {
+        timestamps = TMemoryBackend::Downsample(timestamps, get.GetMaxPoints());
+        BLOG_TRACE("GetMetrics timestamps=" << timestamps.size());
+        for (std::vector<double>& values : values) {
+            values = TMemoryBackend::Downsample(values, get.GetMaxPoints());
+            BLOG_TRACE("GetMetrics values=" << values.size());
+        }
+    }
+    result.Clear();
+    auto time = result.MutableTime();
+    time->Reserve(timestamps.size());
+    for (const TInstant t : timestamps) {
+        time->Add(t.Seconds());
+    }
+    for (std::vector<double>& values : values) {
+        result.AddData()->MutableValues()->Add(values.begin(), values.end());
+    }
+    return true;
+}
+
+// Deletes up to MAX_ROWS_TO_DELETE rows with timestamp <= cutline; newStartTimestamp receives the timestamp of the last deleted row.
+bool TLocalBackend::ClearData(NTabletFlatExecutor::TTransactionContext& txc, TInstant cutline, TInstant& newStartTimestamp) {
+    NIceDb::TNiceDb db(txc.DB);
+    ui64 rows = 0;
+    auto rowset = db.Table<Schema::MetricsValues>().LessOrEqual(cutline.Seconds()).Select();
+    if (!rowset.IsReady()) {
+        return false;
+    }
+    while (!rowset.EndOfSet()) {
+        ui64 timestamp = rowset.GetValue<Schema::MetricsValues::Timestamp>();
+        ui64 id = rowset.GetValue<Schema::MetricsValues::Id>();
+        db.Table<Schema::MetricsValues>().Key(timestamp, id).Delete();
+        newStartTimestamp = TInstant::Seconds(timestamp);
+        if (++rows >= MAX_ROWS_TO_DELETE) {
+            break;
+        }
+        if (!rowset.Next()) {
+            return false;
+        }
+    }
+    return true;
+}
+
+TString TLocalBackend::GetLogPrefix() const {
+    return "DB ";
+}
+
+
+} // NGraph
+} // NKikimr
+
diff --git a/ydb/core/graph/shard/backends.h b/ydb/core/graph/shard/backends.h
new file mode 100644
index 000000000000..0c9e82e0ba8b
--- /dev/null
+++ b/ydb/core/graph/shard/backends.h
@@ -0,0 +1,62 @@
+#pragma once
+
+#include
+#include
+
+namespace NKikimr {
+namespace NGraph {
+
+enum class EBackendType {
+    Memory = 0,
+    Local = 1,
+    External = 2,
+};
+
+struct TMetricsData {
+    TInstant Timestamp;
+    std::unordered_map
Values; +}; + +class TMemoryBackend { +public: + void StoreMetrics(TMetricsData&& data); + void GetMetrics(const NKikimrGraph::TEvGetMetrics& get, NKikimrGraph::TEvMetricsResult& result) const; + void ClearData(TInstant cutline, TInstant& newStartTimestamp); + + template + static std::vector Downsample(const std::vector& data, size_t maxPoints); + + TString GetLogPrefix() const; + + struct TMetricsRecord { + TInstant Timestamp; + TSmallVec Values; + + std::strong_ordering operator <=>(const TMetricsRecord& rec) const { + return Timestamp.GetValue() <=> rec.Timestamp.GetValue(); + } + + std::strong_ordering operator <=>(TInstant time) const { + return Timestamp.GetValue() <=> time.GetValue(); + } + }; + + std::unordered_map MetricsIndex; // mapping name -> id + std::deque MetricsValues; +}; + +class TLocalBackend { +public: + static constexpr ui64 MAX_ROWS_TO_DELETE = 1000; + + bool StoreMetrics(NTabletFlatExecutor::TTransactionContext& txc, TMetricsData&& data); + bool GetMetrics(NTabletFlatExecutor::TTransactionContext& txc, const NKikimrGraph::TEvGetMetrics& get, NKikimrGraph::TEvMetricsResult& result) const; + bool ClearData(NTabletFlatExecutor::TTransactionContext& txc, TInstant cutline, TInstant& newStartTimestamp); + + TString GetLogPrefix() const; + + std::unordered_map MetricsIndex; // mapping name -> id +}; + +} // NGraph +} // NKikimr diff --git a/ydb/core/graph/shard/log.h b/ydb/core/graph/shard/log.h new file mode 100644 index 000000000000..c401e075ad01 --- /dev/null +++ b/ydb/core/graph/shard/log.h @@ -0,0 +1,42 @@ +#pragma once + +#if defined BLOG_D || defined BLOG_I || defined BLOG_ERROR || defined BLOG_TRACE +#error log macro definition clash +#endif + +#include +#include +#include + +namespace NKikimr { +namespace NGraph { + +TString GetLogPrefix(); + +template +class TTransactionBase : public NKikimr::NTabletFlatExecutor::TTransactionBase { +protected: + using TSelf = T; + using TBase = TTransactionBase; + +public: + TTransactionBase(T* self) + : 
NKikimr::NTabletFlatExecutor::TTransactionBase(self) + {} + + TString GetLogPrefix() const { + return NKikimr::NTabletFlatExecutor::TTransactionBase::Self->GetLogPrefix(); + } +}; + +} +} + +#define BLOG_D(stream) ALOG_DEBUG(NKikimrServices::GRAPH, GetLogPrefix() << stream) +#define BLOG_I(stream) ALOG_INFO(NKikimrServices::GRAPH, GetLogPrefix() << stream) +#define BLOG_W(stream) ALOG_WARN(NKikimrServices::GRAPH, GetLogPrefix() << stream) +#define BLOG_NOTICE(stream) ALOG_NOTICE(NKikimrServices::GRAPH, GetLogPrefix() << stream) +#define BLOG_ERROR(stream) ALOG_ERROR(NKikimrServices::GRAPH, GetLogPrefix() << stream) +#define BLOG_CRIT(stream) ALOG_CRIT(NKikimrServices::GRAPH, GetLogPrefix() << stream) +#define BLOG_TRACE(stream) ALOG_TRACE(NKikimrServices::GRAPH, GetLogPrefix() << stream) +#define Y_ENSURE_LOG(cond, stream) if (!(cond)) { BLOG_ERROR("Failed condition \"" << #cond << "\" " << stream); } diff --git a/ydb/core/graph/shard/protos/counters_shard.proto b/ydb/core/graph/shard/protos/counters_shard.proto new file mode 100644 index 000000000000..3ca00fe86588 --- /dev/null +++ b/ydb/core/graph/shard/protos/counters_shard.proto @@ -0,0 +1,29 @@ +import "ydb/core/protos/counters.proto"; + +package NKikimr.NGraphShard; + +option java_package = "ru.yandex.kikimr.proto"; + +option (TabletTypeName) = "GraphShard"; // Used as prefix for all counters + +enum ESimpleCounters { + COUNTER_SIMPLE_IGNORE = 0; +} + +enum ECumulativeCounters { + COUNTER_CUMULATIVE_IGNORE = 0; +} + +enum EPercentileCounters { + COUNTER_PERCENTILE_IGNORE = 0; +} + +enum ETxTypes { + TXTYPE_INIT_SCHEMA = 1 [(TxTypeOpts) = {Name: "TxInitSchema"}]; + TXTYPE_MONITORING = 2 [(TxTypeOpts) = {Name: "TxMonitoring"}]; + TXTYPE_STORE_METRICS = 3 [(TxTypeOpts) = {Name: "TxStoreMetrics"}]; + TXTYPE_CLEAR_DATA = 4 [(TxTypeOpts) = {Name: "TxClearData"}]; + TXTYPE_GET_METRICS = 5 [(TxTypeOpts) = {Name: "TxGetMetrics"}]; + TXTYPE_STARTUP = 6 [(TxTypeOpts) = {Name: "TxStartup"}]; + TXTYPE_CHANGE_BACKEND = 7 
[(TxTypeOpts) = {Name: "TxChangeBackend"}]; +} diff --git a/ydb/core/graph/shard/protos/ya.make b/ydb/core/graph/shard/protos/ya.make new file mode 100644 index 000000000000..13ca24b18e4b --- /dev/null +++ b/ydb/core/graph/shard/protos/ya.make @@ -0,0 +1,18 @@ +PROTO_LIBRARY() + +OWNER( + xenoxeno + g:kikimr +) + +SRCS( + counters_shard.proto +) + +PEERDIR( + ydb/core/protos +) + +EXCLUDE_TAGS(GO_PROTO) + +END() diff --git a/ydb/core/graph/shard/schema.h b/ydb/core/graph/shard/schema.h new file mode 100644 index 000000000000..85fee6ad1bbf --- /dev/null +++ b/ydb/core/graph/shard/schema.h @@ -0,0 +1,47 @@ +#pragma once +#include + +namespace NKikimr { +namespace NGraph { + +struct Schema : NIceDb::Schema { + struct State : Table<1> { + struct Name : Column<1, NScheme::NTypeIds::Text> {}; + struct ValueUI64 : Column<2, NScheme::NTypeIds::Uint64> {}; + struct ValueText : Column<3, NScheme::NTypeIds::Text> {}; + + using TKey = TableKey; + using TColumns = TableColumns; + }; + + struct MetricsIndex : Table<2> { + struct Name : Column<1, NScheme::NTypeIds::Text> {}; + struct Id : Column<2, NScheme::NTypeIds::Uint64> {}; + + using TKey = TableKey; + using TColumns = TableColumns; + }; + + struct MetricsValues : Table<3> { + struct Timestamp : Column<1, NScheme::NTypeIds::Uint64> {}; + struct Id : Column<2, NScheme::NTypeIds::Uint64> {}; + struct Value : Column<3, NScheme::NTypeIds::Double> {}; + + using TKey = TableKey; + using TColumns = TableColumns; + }; + + using TTables = SchemaTables< + State, + MetricsIndex, + MetricsValues + >; + + using TSettings = SchemaSettings< + ExecutorLogBatching, + ExecutorLogFlushPeriod + >; +}; + +} // NGraph +} // NKikimr diff --git a/ydb/core/graph/shard/shard_impl.cpp b/ydb/core/graph/shard/shard_impl.cpp new file mode 100644 index 000000000000..e5361e37ed37 --- /dev/null +++ b/ydb/core/graph/shard/shard_impl.cpp @@ -0,0 +1,111 @@ +#include "shard_impl.h" +#include "log.h" +#include +#include +#include + +namespace NKikimr { 
+namespace NGraph { + +TGraphShard::TGraphShard(TTabletStorageInfo* info, const TActorId& tablet) + : TActor(&TThis::StateWork) + , TTabletExecutedFlat(info, tablet, new NMiniKQL::TMiniKQLFactory) +{ + +} + +TString TGraphShard::GetLogPrefix() const { + return "SHARD "; +} + +void TGraphShard::OnActivateExecutor(const TActorContext&) { + BLOG_D("OnActivateExecutor"); + ExecuteTxInitSchema(); +} + +void TGraphShard::OnTabletDead(TEvTablet::TEvTabletDead::TPtr&, const TActorContext&) { + BLOG_D("OnTabletDead"); + PassAway(); +} + +void TGraphShard::OnDetach(const TActorContext&) { + BLOG_D("OnDetach"); + PassAway(); +} + +bool TGraphShard::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TActorContext&) { + if (!Executor() || !Executor()->GetStats().IsActive) + return false; + + if (!ev) + return true; + + ExecuteTxMonitoring(std::move(ev)); + return true; +} + +void TGraphShard::OnReadyToWork() { + SignalTabletActive(ActorContext()); +} + +STFUNC(TGraphShard::StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvSubDomain::TEvConfigure, Handle); + hFunc(TEvTabletPipe::TEvServerConnected, Handle); + hFunc(TEvTabletPipe::TEvServerDisconnected, Handle); + hFunc(TEvGraph::TEvSendMetrics, Handle); + hFunc(TEvGraph::TEvGetMetrics, Handle); + default: + if (!HandleDefaultEvents(ev, SelfId())) { + BLOG_W("StateWork unhandled event type: " << ev->GetTypeRewrite() << " event: " << ev->ToString()); + } + break; + } +} + +void TGraphShard::Handle(TEvTabletPipe::TEvServerConnected::TPtr& ev) { + BLOG_TRACE("Handle TEvTabletPipe::TEvServerConnected(" << ev->Get()->ClientId << ") " << ev->Get()->ServerId); +} + +void TGraphShard::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr& ev) { + BLOG_TRACE("Handle TEvTabletPipe::TEvServerDisconnected(" << ev->Get()->ClientId << ") " << ev->Get()->ServerId); +} + +void TGraphShard::Handle(TEvSubDomain::TEvConfigure::TPtr& ev) { + BLOG_D("Handle TEvSubDomain::TEvConfigure(" << ev->Get()->Record.ShortDebugString() << ")"); + 
Send(ev->Sender, new TEvSubDomain::TEvConfigureStatus(NKikimrTx::TEvSubDomainConfigurationAck::SUCCESS, TabletID())); +} + +void TGraphShard::Handle(TEvGraph::TEvSendMetrics::TPtr& ev) { + BLOG_TRACE("Handle TEvGraph::TEvSendMetrics from " << ev->Sender); + TInstant now = TInstant::Seconds(TActivationContext::Now().Seconds()); // 1 second resolution + if (StartTimestamp == TInstant()) { + StartTimestamp = now; + } + if (now != MetricsData.Timestamp) { + if (MetricsData.Timestamp != TInstant()) { + ExecuteTxStoreMetrics(std::move(MetricsData)); + } + MetricsData.Timestamp = now; + MetricsData.Values.clear(); + } + if ((now - StartTimestamp) > DURATION_CLEAR_TRIGGER && (now - ClearTimestamp) > DURATION_CLEAR_PERIOD) { // rate limit: at most one clear per DURATION_CLEAR_PERIOD + ClearTimestamp = now; + ExecuteTxClearData(); + } + for (const auto& metric : ev->Get()->Record.GetMetrics()) { + MetricsData.Values[metric.GetName()] += metric.GetValue(); // simple accumulation by name of metric + } +} + +void TGraphShard::Handle(TEvGraph::TEvGetMetrics::TPtr& ev) { + BLOG_TRACE("Handle TEvGraph::TEvGetMetrics from " << ev->Sender); + ExecuteTxGetMetrics(ev); +} + +IActor* CreateGraphShard(const TActorId& tablet, TTabletStorageInfo* info) { + return new NGraph::TGraphShard(info, tablet); +} + +} // NGraph +} // NKikimr diff --git a/ydb/core/graph/shard/shard_impl.h b/ydb/core/graph/shard/shard_impl.h new file mode 100644 index 000000000000..fca4eafbc014 --- /dev/null +++ b/ydb/core/graph/shard/shard_impl.h @@ -0,0 +1,61 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include "backends.h" + +namespace NKikimr { +namespace NGraph { + +class TGraphShard : public TActor, public NTabletFlatExecutor::TTabletExecutedFlat { +public: + TGraphShard(TTabletStorageInfo* info, const TActorId& tablet); + TString GetLogPrefix() const; + + void OnActivateExecutor(const TActorContext& ctx) override; + void DefaultSignalTabletActive(const TActorContext&) override {} + void OnDetach(const 
TActorContext&) override; + void OnTabletDead(TEvTablet::TEvTabletDead::TPtr&, const TActorContext&) override; + bool OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TActorContext&) override; + void OnReadyToWork(); + + void Handle(TEvTabletPipe::TEvServerConnected::TPtr& ev); + void Handle(TEvTabletPipe::TEvServerDisconnected::TPtr& ev); + void Handle(TEvSubDomain::TEvConfigure::TPtr& ev); + void Handle(TEvGraph::TEvSendMetrics::TPtr& ev); + void Handle(TEvGraph::TEvGetMetrics::TPtr& ev); + +//protected: + void ExecuteTxInitSchema(); + void ExecuteTxStartup(); + void ExecuteTxMonitoring(NMon::TEvRemoteHttpInfo::TPtr ev); + void ExecuteTxStoreMetrics(TMetricsData&& data); + void ExecuteTxClearData(); + void ExecuteTxGetMetrics(TEvGraph::TEvGetMetrics::TPtr ev); + void ExecuteTxChangeBackend(EBackendType backend); + + STATEFN(StateWork); + + // how often we could issue a clear operation + static constexpr TDuration DURATION_CLEAR_PERIOD = TDuration::Minutes(10); + // after what size of metrics data we issue a clear operation + static constexpr TDuration DURATION_CLEAR_TRIGGER = TDuration::Hours(25); + // the maximum size of metrics data to keep + static constexpr TDuration DURATION_TO_KEEP = TDuration::Hours(24); + + TMetricsData MetricsData; // current accumulated metrics, ready to be stored + TInstant StartTimestamp; // the earliest point of metrics + TInstant ClearTimestamp; // last time of clear operation + EBackendType BackendType = EBackendType::Memory; + TMemoryBackend MemoryBackend; + TLocalBackend LocalBackend; +}; + +} // NGraph +} // NKikimr diff --git a/ydb/core/graph/shard/tx_change_backend.cpp b/ydb/core/graph/shard/tx_change_backend.cpp new file mode 100644 index 000000000000..42a677f15036 --- /dev/null +++ b/ydb/core/graph/shard/tx_change_backend.cpp @@ -0,0 +1,37 @@ +#include "shard_impl.h" +#include "log.h" +#include "schema.h" + +namespace NKikimr { +namespace NGraph { + +class TTxChangeBackend : public TTransactionBase { +private: + 
EBackendType Backend; +public: + TTxChangeBackend(TGraphShard* shard, EBackendType backend) + : TBase(shard) + , Backend(backend) + {} + + TTxType GetTxType() const override { return NGraphShard::TXTYPE_CHANGE_BACKEND; } + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + BLOG_D("TTxChangeBackend::Execute (" << static_cast(Backend) << ")"); + NIceDb::TNiceDb db(txc.DB); + db.Table().Key(TString("backend")).Update(static_cast(Backend)); + return true; + } + + void Complete(const TActorContext&) override { + BLOG_D("TTxChangeBackend::Complete"); + } +}; + +void TGraphShard::ExecuteTxChangeBackend(EBackendType backend) { + Execute(new TTxChangeBackend(this, backend)); +} + +} // NGraph +} // NKikimr + diff --git a/ydb/core/graph/shard/tx_clear_data.cpp b/ydb/core/graph/shard/tx_clear_data.cpp new file mode 100644 index 000000000000..f5808e54f9d7 --- /dev/null +++ b/ydb/core/graph/shard/tx_clear_data.cpp @@ -0,0 +1,42 @@ +#include "shard_impl.h" +#include "log.h" +#include "schema.h" + +namespace NKikimr { +namespace NGraph { + +class TTxClearData : public TTransactionBase { +public: + TTxClearData(TGraphShard* shard) + : TBase(shard) + {} + + TTxType GetTxType() const override { return NGraphShard::TXTYPE_CLEAR_DATA; } + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + BLOG_D("TTxClearData::Execute"); + TInstant cutline = TActivationContext::Now() - TGraphShard::DURATION_TO_KEEP; + switch (Self->BackendType) { + case EBackendType::Memory: + Self->MemoryBackend.ClearData(cutline, Self->StartTimestamp); + return true; + case EBackendType::Local: + return Self->LocalBackend.ClearData(txc, cutline, Self->StartTimestamp); + case EBackendType::External: + break; + } + return true; + } + + void Complete(const TActorContext&) override { + BLOG_D("TTxClearData::Complete"); + } +}; + +void TGraphShard::ExecuteTxClearData() { + Execute(new TTxClearData(this)); +} + +} // NGraph +} // NKikimr + diff --git 
a/ydb/core/graph/shard/tx_get_metrics.cpp b/ydb/core/graph/shard/tx_get_metrics.cpp new file mode 100644 index 000000000000..1602de1b3dcf --- /dev/null +++ b/ydb/core/graph/shard/tx_get_metrics.cpp @@ -0,0 +1,46 @@ +#include "shard_impl.h" +#include "log.h" + +namespace NKikimr { +namespace NGraph { + +class TTxGetMetrics : public TTransactionBase { +private: + TEvGraph::TEvGetMetrics::TPtr Event; + NKikimrGraph::TEvMetricsResult Result; +public: + TTxGetMetrics(TGraphShard* shard, TEvGraph::TEvGetMetrics::TPtr ev) + : TBase(shard) + , Event(ev) + {} + + TTxType GetTxType() const override { return NGraphShard::TXTYPE_GET_METRICS; } + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + BLOG_D("TTxGetMetrics::Execute"); + switch (Self->BackendType) { + case EBackendType::Memory: + Self->MemoryBackend.GetMetrics(Event->Get()->Record, Result); + return true; + case EBackendType::Local: + return Self->LocalBackend.GetMetrics(txc, Event->Get()->Record, Result); + case EBackendType::External: + break; + } + return true; + } + + void Complete(const TActorContext& ctx) override { + BLOG_D("TTxGetMetrics::Complete"); + BLOG_TRACE("TxGetMetrics returned " << Result.TimeSize() << " points"); + ctx.Send(Event->Sender, new TEvGraph::TEvMetricsResult(std::move(Result)), Event->Cookie); + } +}; + +void TGraphShard::ExecuteTxGetMetrics(TEvGraph::TEvGetMetrics::TPtr ev) { + Execute(new TTxGetMetrics(this, ev)); +} + +} // NGraph +} // NKikimr + diff --git a/ydb/core/graph/shard/tx_init_schema.cpp b/ydb/core/graph/shard/tx_init_schema.cpp new file mode 100644 index 000000000000..840619ac8450 --- /dev/null +++ b/ydb/core/graph/shard/tx_init_schema.cpp @@ -0,0 +1,36 @@ +#include "shard_impl.h" +#include "log.h" +#include "schema.h" + +namespace NKikimr { +namespace NGraph { + +class TTxInitSchema : public TTransactionBase { +public: + TTxInitSchema(TGraphShard* shard) + : TBase(shard) + {} + + TTxType GetTxType() const override { return 
NGraphShard::TXTYPE_INIT_SCHEMA; } + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + BLOG_D("TTxInitSchema::Execute"); + NIceDb::TNiceDb db(txc.DB); + db.Materialize(); + db.Table().Key(TString("version")).Update(1); + return true; + } + + void Complete(const TActorContext&) override { + BLOG_D("TTxInitSchema::Complete"); + Self->ExecuteTxStartup(); + } +}; + +void TGraphShard::ExecuteTxInitSchema() { + Execute(new TTxInitSchema(this)); +} + +} // NGraph +} // NKikimr + diff --git a/ydb/core/graph/shard/tx_monitoring.cpp b/ydb/core/graph/shard/tx_monitoring.cpp new file mode 100644 index 000000000000..9629f84a60f7 --- /dev/null +++ b/ydb/core/graph/shard/tx_monitoring.cpp @@ -0,0 +1,80 @@ +#include "shard_impl.h" +#include "log.h" + +namespace NKikimr { +namespace NGraph { + +class TTxMonitoring : public TTransactionBase { +private: + NMon::TEvRemoteHttpInfo::TPtr Event; + +public: + TTxMonitoring(TGraphShard* shard, NMon::TEvRemoteHttpInfo::TPtr ev) + : TBase(shard) + , Event(std::move(ev)) + {} + + TTxType GetTxType() const override { return NGraphShard::TXTYPE_MONITORING; } + + bool Execute(TTransactionContext&, const TActorContext&) override { + BLOG_D("TTxMonitoring::Execute"); + return true; + } + + void Complete(const TActorContext& ctx) override { + BLOG_D("TTxMonitoring::Complete"); + TStringBuilder html; + html << ""; + html << ""; + + html << ""; + + html << ""; + + html << ""; + html << ""; + + html << ""; + html << ""; + + html << "
Backend"; + switch (Self->BackendType) { + case EBackendType::Memory: + html << "Memory"; + break; + case EBackendType::Local: + html << "Local"; + break; + case EBackendType::External: + html << "External"; + break; + } + html << "
Memory.MetricsSize" << Self->MemoryBackend.MetricsIndex.size() << "
Memory.RecordsSize" << Self->MemoryBackend.MetricsValues.size() << "
Local.MetricsSize" << Self->LocalBackend.MetricsIndex.size() << "
Local.StartTimestamp" << Self->StartTimestamp << "
"; + html << ""; + ctx.Send(Event->Sender, new NMon::TEvRemoteHttpInfoRes(html)); + } +}; + +void TGraphShard::ExecuteTxMonitoring(NMon::TEvRemoteHttpInfo::TPtr ev) { + if (ev->Get()->Cgi().Has("action")) { + if (ev->Get()->Cgi().Get("action") == "change_backend") { + ui64 backend = FromStringWithDefault(ev->Get()->Cgi().Get("backend"), 0); + if (backend >= 0 && backend <= 2) { + ExecuteTxChangeBackend(static_cast(backend)); + Send(ev->Sender, new NMon::TEvRemoteHttpInfoRes("

ok

")); + return; + } + } + Send(ev->Sender, new NMon::TEvRemoteHttpInfoRes("

bad parameters

")); + return; + } + Execute(new TTxMonitoring(this, std::move(ev))); +} + +} // NGraph +} // NKikimr + diff --git a/ydb/core/graph/shard/tx_startup.cpp b/ydb/core/graph/shard/tx_startup.cpp new file mode 100644 index 000000000000..dd1d5756e96f --- /dev/null +++ b/ydb/core/graph/shard/tx_startup.cpp @@ -0,0 +1,68 @@ +#include "shard_impl.h" +#include "log.h" +#include "schema.h" +#include "backends.h" + +namespace NKikimr { +namespace NGraph { + +class TTxStartup : public TTransactionBase { +public: + TTxStartup(TGraphShard* shard) + : TBase(shard) + {} + + TTxType GetTxType() const override { return NGraphShard::TXTYPE_STARTUP; } + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + BLOG_D("TTxStartup::Execute"); + NIceDb::TNiceDb db(txc.DB); + { + auto row = db.Table().Key(TString("backend")).Select(); + if (!row.IsReady()) { + return false; + } + if (!row.EndOfSet()) { + ui64 backend = row.GetValue(); + if (backend >= 0 && backend <= 2) { + Self->BackendType = static_cast(backend); + } + } + } + { + auto rowset = db.Table().Select(); + if (!rowset.IsReady()) { + return false; + } + while (!rowset.EndOfSet()) { + Self->LocalBackend.MetricsIndex[rowset.GetValue()] = rowset.GetValue(); + if (!rowset.Next()) { + return false; + } + } + } + { + auto rowset = db.Table().Select(); + if (!rowset.IsReady()) { + return false; + } + if (!rowset.EndOfSet()) { + Self->StartTimestamp = TInstant::Seconds(rowset.GetValue()); + } + } + return true; + } + + void Complete(const TActorContext&) override { + BLOG_D("TTxStartup::Complete"); + Self->OnReadyToWork(); + } +}; + +void TGraphShard::ExecuteTxStartup() { + Execute(new TTxStartup(this)); +} + +} // NGraph +} // NKikimr + diff --git a/ydb/core/graph/shard/tx_store_metrics.cpp b/ydb/core/graph/shard/tx_store_metrics.cpp new file mode 100644 index 000000000000..78456361c60b --- /dev/null +++ b/ydb/core/graph/shard/tx_store_metrics.cpp @@ -0,0 +1,45 @@ +#include "shard_impl.h" +#include "log.h" + 
+namespace NKikimr { +namespace NGraph { + +class TTxStoreMetrics : public TTransactionBase { +private: + TMetricsData Data; + +public: + TTxStoreMetrics(TGraphShard* shard, TMetricsData&& data) + : TBase(shard) + , Data(std::move(data)) + {} + + TTxType GetTxType() const override { return NGraphShard::TXTYPE_STORE_METRICS; } + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + BLOG_D("TTxStoreMetrics::Execute"); + switch (Self->BackendType) { + case EBackendType::Memory: + Self->MemoryBackend.StoreMetrics(std::move(Data)); + return true; + case EBackendType::Local: + return Self->LocalBackend.StoreMetrics(txc, std::move(Data)); + case EBackendType::External: + // TODO + break; + } + return true; + } + + void Complete(const TActorContext&) override { + BLOG_D("TTxStoreMetrics::Complete"); + } +}; + +void TGraphShard::ExecuteTxStoreMetrics(TMetricsData&& data) { + Execute(new TTxStoreMetrics(this, std::move(data))); +} + +} // NGraph +} // NKikimr + diff --git a/ydb/core/graph/shard/ut/shard_ut.cpp b/ydb/core/graph/shard/ut/shard_ut.cpp new file mode 100644 index 000000000000..c700c2f141c2 --- /dev/null +++ b/ydb/core/graph/shard/ut/shard_ut.cpp @@ -0,0 +1,185 @@ +#include +#include +#include +#include +#include +#include +#include + +#ifdef NDEBUG +#define Ctest Cnull +#else +#define Ctest Cerr +#endif + +Y_DECLARE_OUT_SPEC(, std::vector, stream, value) { + stream << '['; + for (auto it = value.begin(); it != value.end(); ++it) { + if (it != value.begin()) { + stream << ','; + } + stream << it->GetValue(); + } + stream << ']'; +} + +Y_DECLARE_OUT_SPEC(, std::vector, stream, value) { + stream << '['; + for (auto it = value.begin(); it != value.end(); ++it) { + if (it != value.begin()) { + stream << ','; + } + stream << *it; + } + stream << ']'; +} + +namespace NKikimr { + +using namespace Tests; +using namespace NSchemeShardUT_Private; + +Y_UNIT_TEST_SUITE(GraphShard) { + Y_UNIT_TEST(DownsampleFixed) { + std::vector sourceData = { + 
TInstant::FromValue( 1 ), + TInstant::FromValue( 2 ), + TInstant::FromValue( 3 ), + TInstant::FromValue( 4 ), + TInstant::FromValue( 5 ), + TInstant::FromValue( 6 ), + TInstant::FromValue( 7 ), + TInstant::FromValue( 8 ), + TInstant::FromValue( 9 ), + TInstant::FromValue( 10 ) + }; + { + std::vector targetData = NGraph::TMemoryBackend::Downsample(sourceData, 10); + Ctest << targetData << Endl; + std::vector canonData = { + TInstant::FromValue( 1 ), + TInstant::FromValue( 2 ), + TInstant::FromValue( 3 ), + TInstant::FromValue( 4 ), + TInstant::FromValue( 5 ), + TInstant::FromValue( 6 ), + TInstant::FromValue( 7 ), + TInstant::FromValue( 8 ), + TInstant::FromValue( 9 ), + TInstant::FromValue( 10 ) + }; + UNIT_ASSERT(targetData == canonData); + } + { + std::vector targetData = NGraph::TMemoryBackend::Downsample(sourceData, 5); + Ctest << targetData << Endl; + std::vector canonData = { + TInstant::FromValue( 1 ), + TInstant::FromValue( 3 ), + TInstant::FromValue( 5 ), + TInstant::FromValue( 7 ), + TInstant::FromValue( 9 ) + }; + UNIT_ASSERT(targetData == canonData); + } + { + std::vector targetData = NGraph::TMemoryBackend::Downsample(sourceData, 1); + Ctest << targetData << Endl; + std::vector canonData = { TInstant::FromValue( 1 ) }; + UNIT_ASSERT(targetData == canonData); + } + } + + Y_UNIT_TEST(DownsampleFloat) { + std::vector sourceData = {1,2,3,4,5, 6, 7, 8, 9, 10}; + { + std::vector targetData = NGraph::TMemoryBackend::Downsample(sourceData, 10); + Ctest << targetData << Endl; + std::vector canonData = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; + UNIT_ASSERT(targetData == canonData); + } + { + std::vector targetData = NGraph::TMemoryBackend::Downsample(sourceData, 5); + Ctest << targetData << Endl; + std::vector canonData = {1.5, 3.5, 5.5, 7.5, 9.5}; + UNIT_ASSERT(targetData == canonData); + } + { + std::vector targetData = NGraph::TMemoryBackend::Downsample(sourceData, 1); + Ctest << targetData << Endl; + std::vector canonData = {5.5}; + UNIT_ASSERT(targetData == 
canonData); + } + } + + TTenantTestConfig GetTenantTestConfig() { + return { + .Domains = { + { + .Name = DOMAIN1_NAME, + .SchemeShardId = SCHEME_SHARD1_ID, + .Subdomains = {TENANT1_1_NAME, TENANT1_2_NAME} + } + }, + .HiveId = HIVE_ID, + .FakeTenantSlotBroker = true, + .FakeSchemeShard = true, + .CreateConsole = false, + .Nodes = { + { + .TenantPoolConfig = { + .StaticSlots = { + { + .Tenant = DOMAIN1_NAME, + .Limit = { + .CPU = 1, + .Memory = 1, + .Network = 1 + } + } + }, + .NodeType = "node-type" + } + } + }, + .DataCenterCount = 1 + }; + } + + Y_UNIT_TEST(CreateGraphShard) { + TTestBasicRuntime runtime; + + runtime.SetLogPriority(NKikimrServices::GRAPH, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::HIVE, NLog::PRI_TRACE); + + TTestEnv env(runtime); + ui64 txId = 100; + TestCreateExtSubDomain(runtime, ++txId, "/MyRoot", R"( + Name: "db1" + )"); + + env.TestWaitNotification(runtime, txId); + + TestAlterExtSubDomain(runtime, ++txId, "/MyRoot", R"( + Name: "db1" + PlanResolution: 50 + Coordinators: 1 + Mediators: 1 + TimeCastBucketsPerMediator: 2 + StoragePools { + Name: "pool-1" + Kind: "hdd" + } + ExternalSchemeShard: true + ExternalHive: true + GraphShard: true + )"); + + env.TestWaitNotification(runtime, txId); + + auto result = DescribePath(runtime, "/MyRoot/db1"); + UNIT_ASSERT(result.GetPathDescription().GetDomainDescription().GetProcessingParams().GetGraphShard() != 0); + } +} + +} // NKikimr diff --git a/ydb/core/graph/shard/ut/ya.make b/ydb/core/graph/shard/ut/ya.make new file mode 100644 index 000000000000..9f3d2861bd14 --- /dev/null +++ b/ydb/core/graph/shard/ut/ya.make @@ -0,0 +1,22 @@ +UNITTEST_FOR(ydb/core/graph/shard) + +OWNER( + xenoxeno + g:kikimr +) + +SIZE(SMALL) + +SRC( + shard_ut.cpp +) + +PEERDIR( + ydb/library/actors/helpers + ydb/core/tx/schemeshard/ut_helpers + ydb/core/testlib/default +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/core/graph/shard/ya.make b/ydb/core/graph/shard/ya.make new file mode 100644 index 
000000000000..20feec5286cb --- /dev/null +++ b/ydb/core/graph/shard/ya.make @@ -0,0 +1,35 @@ +LIBRARY() + +OWNER( + xenoxeno + g:kikimr +) + +SRCS( + backends.cpp + backends.h + log.h + schema.h + shard_impl.cpp + shard_impl.h + tx_change_backend.cpp + tx_clear_data.cpp + tx_get_metrics.cpp + tx_init_schema.cpp + tx_monitoring.cpp + tx_startup.cpp + tx_store_metrics.cpp +) + +PEERDIR( + ydb/library/actors/core + ydb/core/base + ydb/core/tablet + ydb/core/tablet_flat + ydb/core/graph/api + ydb/core/graph/shard/protos +) + +END() + +RECURSE_FOR_TESTS(ut) diff --git a/ydb/core/graph/ut/graph_ut.cpp b/ydb/core/graph/ut/graph_ut.cpp new file mode 100644 index 000000000000..104fafc10504 --- /dev/null +++ b/ydb/core/graph/ut/graph_ut.cpp @@ -0,0 +1,184 @@ +#include +#include +#include +#include +#include +#include +#include + +#ifdef NDEBUG +#define Ctest Cnull +#else +#define Ctest Cerr +#endif + +namespace NKikimr { + +using namespace Tests; +using namespace NSchemeShardUT_Private; + +Y_UNIT_TEST_SUITE(Graph) { + TTenantTestConfig GetTenantTestConfig() { + return { + .Domains = { + { + .Name = DOMAIN1_NAME, + .SchemeShardId = SCHEME_SHARD1_ID, + .Subdomains = {TENANT1_1_NAME, TENANT1_2_NAME} + } + }, + .HiveId = HIVE_ID, + .FakeTenantSlotBroker = true, + .FakeSchemeShard = true, + .CreateConsole = false, + .Nodes = { + { + .TenantPoolConfig = { + .StaticSlots = { + { + .Tenant = DOMAIN1_NAME, + .Limit = { + .CPU = 1, + .Memory = 1, + .Network = 1 + } + } + }, + .NodeType = "node-type" + } + } + }, + .DataCenterCount = 1 + }; + } + + + + Y_UNIT_TEST(CreateGraphShard) { + TTestBasicRuntime runtime; + + runtime.SetLogPriority(NKikimrServices::GRAPH, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::HIVE, NLog::PRI_TRACE); + + TTestEnv env(runtime); + ui64 txId = 100; + TestCreateExtSubDomain(runtime, ++txId, "/MyRoot", R"( + Name: "db1" + )"); + + env.TestWaitNotification(runtime, txId); + + TestAlterExtSubDomain(runtime, ++txId, "/MyRoot", R"( + Name: "db1" + 
PlanResolution: 50 + Coordinators: 1 + Mediators: 1 + TimeCastBucketsPerMediator: 2 + StoragePools { + Name: "pool-1" + Kind: "hdd" + } + ExternalSchemeShard: true + ExternalHive: true + GraphShard: true + )"); + + env.TestWaitNotification(runtime, txId); + + auto result = DescribePath(runtime, "/MyRoot/db1"); + UNIT_ASSERT(result.GetPathDescription().GetDomainDescription().GetProcessingParams().GetGraphShard() != 0); + } + + Y_UNIT_TEST(UseGraphShard) { + TTestBasicRuntime runtime; + + runtime.SetLogPriority(NKikimrServices::GRAPH, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::HIVE, NLog::PRI_TRACE); + + TTestEnv::ENABLE_SCHEMESHARD_LOG = false; + TTestEnv env(runtime); + ui64 txId = 100; + + TestCreateExtSubDomain(runtime, ++txId, "/MyRoot", R"( + Name: "db1" + )"); + + env.TestWaitNotification(runtime, txId); + + TestAlterExtSubDomain(runtime, ++txId, "/MyRoot", R"( + Name: "db1" + PlanResolution: 50 + Coordinators: 1 + Mediators: 1 + TimeCastBucketsPerMediator: 2 + StoragePools { + Name: "pool-1" + Kind: "hdd" + } + ExternalSchemeShard: true + ExternalHive: true + GraphShard: true + )"); + + env.TestWaitNotification(runtime, txId); + + NKikimrScheme::TEvDescribeSchemeResult result = DescribePath(runtime, "/MyRoot/db1"); + UNIT_ASSERT(result.GetPathDescription().GetDomainDescription().GetProcessingParams().GetGraphShard() != 0); + + IActor* service = NGraph::CreateGraphService("/MyRoot/db1"); + TActorId serviceId = runtime.Register(service); + runtime.RegisterService(NGraph::MakeGraphServiceId(), serviceId); + TActorId sender = runtime.AllocateEdgeActor(); + + // this call is needed to wait for establishing of pipe connection + { + NGraph::TEvGraph::TEvGetMetrics* event = new NGraph::TEvGraph::TEvGetMetrics(); + event->Record.AddMetrics("test.metric1"); + runtime.Send(NGraph::MakeGraphServiceId(), sender, event); + TAutoPtr handle; + NGraph::TEvGraph::TEvMetricsResult* response = runtime.GrabEdgeEventRethrow(handle); + Ctest << "Received result: " 
<< response->Record.ShortDebugString() << Endl; + } + + { + NGraph::TEvGraph::TEvSendMetrics* event = new NGraph::TEvGraph::TEvSendMetrics(); + NKikimrGraph::TMetric* metric = event->Record.AddMetrics(); + metric->SetName("test.metric1"); + metric->SetValue(13); + runtime.Send(NGraph::MakeGraphServiceId(), sender, event); + } + + runtime.SimulateSleep(TDuration::Seconds(1)); + + { + NGraph::TEvGraph::TEvSendMetrics* event = new NGraph::TEvGraph::TEvSendMetrics(); + NKikimrGraph::TMetric* metric = event->Record.AddMetrics(); + metric->SetName("test.metric1"); + metric->SetValue(14); + runtime.Send(NGraph::MakeGraphServiceId(), sender, event); + } + + runtime.SimulateSleep(TDuration::Seconds(1)); + + { + NGraph::TEvGraph::TEvSendMetrics* event = new NGraph::TEvGraph::TEvSendMetrics(); + NKikimrGraph::TMetric* metric = event->Record.AddMetrics(); + metric->SetName("test.metric1"); + metric->SetValue(15); + runtime.Send(NGraph::MakeGraphServiceId(), sender, event); + } + + { + NGraph::TEvGraph::TEvGetMetrics* event = new NGraph::TEvGraph::TEvGetMetrics(); + event->Record.AddMetrics("test.metric1"); + runtime.Send(NGraph::MakeGraphServiceId(), sender, event); + TAutoPtr handle; + NGraph::TEvGraph::TEvMetricsResult* response = runtime.GrabEdgeEventRethrow(handle); + Ctest << "Received result: " << response->Record.ShortDebugString() << Endl; + UNIT_ASSERT(response->Record.DataSize() > 0); + UNIT_ASSERT(response->Record.GetData(0).ShortDebugString() == "Values: 13 Values: 14"); + } + } +} + +} // NKikimr diff --git a/ydb/core/graph/ut/ya.make b/ydb/core/graph/ut/ya.make new file mode 100644 index 000000000000..b69d8208f9c5 --- /dev/null +++ b/ydb/core/graph/ut/ya.make @@ -0,0 +1,24 @@ +UNITTEST_FOR(ydb/core/graph) + +OWNER( + xenoxeno + g:kikimr +) + +SIZE(SMALL) + +SRC( + graph_ut.cpp +) + +PEERDIR( + ydb/library/actors/helpers + ydb/core/tx/schemeshard/ut_helpers + ydb/core/testlib/default + ydb/core/graph/shard + ydb/core/graph/service +) + +YQL_LAST_ABI_VERSION() + 
+END() diff --git a/ydb/core/graph/ya.make b/ydb/core/graph/ya.make new file mode 100644 index 000000000000..50821e8f78a5 --- /dev/null +++ b/ydb/core/graph/ya.make @@ -0,0 +1,13 @@ +OWNER( + xenoxeno + g:kikimr +) + +RECURSE( + api + protos + service + shard +) + +RECURSE_FOR_TESTS(ut) diff --git a/ydb/core/mind/hive/hive_statics.cpp b/ydb/core/mind/hive/hive_statics.cpp index e3d6c65eb7bd..9e3f4695ba00 100644 --- a/ydb/core/mind/hive/hive_statics.cpp +++ b/ydb/core/mind/hive/hive_statics.cpp @@ -384,7 +384,7 @@ void MakeTabletTypeSet(std::vector& list) { bool IsValidTabletType(TTabletTypes::EType type) { return (type > TTabletTypes::Unknown - && type < TTabletTypes::Reserved41 + && type < TTabletTypes::EType_MAX ); } diff --git a/ydb/core/mind/hive/monitoring.cpp b/ydb/core/mind/hive/monitoring.cpp index 25c8e9c3af76..3705912d47ef 100644 --- a/ydb/core/mind/hive/monitoring.cpp +++ b/ydb/core/mind/hive/monitoring.cpp @@ -1,4 +1,4 @@ -#include +#include #include #include #include @@ -747,7 +747,8 @@ class TTxMonEvent_Settings : public TTransactionBase { TTabletTypes::NodeBroker, TTabletTypes::TestShard, TTabletTypes::BlobDepot, - TTabletTypes::ColumnShard}) { + TTabletTypes::ColumnShard, + TTabletTypes::GraphShard}) { if (shortType == LongToShortTabletName(TTabletTypes::TypeToStr(tabletType))) { return tabletType; } @@ -1280,6 +1281,8 @@ class TTxMonEvent_Landing : public TTransactionBase { return "BD"; case TTabletTypes::StatisticsAggregator: return "SA"; + case TTabletTypes::GraphShard: + return "GS"; default: return Sprintf("%d", (int)type); } diff --git a/ydb/core/protos/counters_schemeshard.proto b/ydb/core/protos/counters_schemeshard.proto index 320927d7a596..ca2a07a1d508 100644 --- a/ydb/core/protos/counters_schemeshard.proto +++ b/ydb/core/protos/counters_schemeshard.proto @@ -193,6 +193,8 @@ enum ESimpleCounters { COUNTER_IN_FLIGHT_OPS_TxCreateView = 155 [(CounterOpts) = {Name: "InFlightOps/CreateView"}]; COUNTER_IN_FLIGHT_OPS_TxAlterView = 156 
[(CounterOpts) = {Name: "InFlightOps/AlterView"}]; COUNTER_IN_FLIGHT_OPS_TxDropView = 157 [(CounterOpts) = {Name: "InFlightOps/DropView"}]; + + COUNTER_GRAPHSHARD_COUNT = 158 [(CounterOpts) = {Name: "GraphShards"}]; } enum ECumulativeCounters { diff --git a/ydb/core/protos/flat_tx_scheme.proto b/ydb/core/protos/flat_tx_scheme.proto index 2a7488687896..598bf412cfa7 100644 --- a/ydb/core/protos/flat_tx_scheme.proto +++ b/ydb/core/protos/flat_tx_scheme.proto @@ -358,6 +358,7 @@ message TEvSyncTenantSchemeShard { optional uint64 TenantHive = 8; optional uint64 TenantSysViewProcessor = 9; optional uint64 TenantStatisticsAggregator = 11; + optional fixed64 TenantGraphShard = 12; optional string TenantRootACL = 10; } @@ -379,6 +380,7 @@ message TEvUpdateTenantSchemeShard { optional uint64 TenantHive = 10; optional uint64 TenantSysViewProcessor = 11; optional uint64 TenantStatisticsAggregator = 16; + optional fixed64 TenantGraphShard = 18; optional NKikimrSubDomains.TSchemeQuotas DeclaredSchemeQuotas = 12; optional Ydb.Cms.DatabaseQuotas DatabaseQuotas = 14; diff --git a/ydb/core/protos/subdomains.proto b/ydb/core/protos/subdomains.proto index cafd60f519c6..e740544f8565 100644 --- a/ydb/core/protos/subdomains.proto +++ b/ydb/core/protos/subdomains.proto @@ -25,6 +25,7 @@ message TSubDomainSettings { optional Ydb.Cms.DatabaseQuotas DatabaseQuotas = 12; optional TAuditSettings AuditSettings = 13; optional bool ExternalStatisticsAggregator = 14 [default = false]; + optional bool GraphShard = 16 [default = false]; } message TProcessingParams { @@ -38,8 +39,7 @@ message TProcessingParams { optional fixed64 Hive = 7; optional fixed64 SysViewProcessor = 8; optional fixed64 StatisticsAggregator = 10; - - //put there SubSchemeShard and SubHive at the future + optional fixed64 GraphShard = 11; // Plan resolution for idle coordinators optional uint64 IdlePlanResolution = 9; diff --git a/ydb/core/protos/tablet.proto b/ydb/core/protos/tablet.proto index 1723064942e4..08903c7d0fbf 
100644 --- a/ydb/core/protos/tablet.proto +++ b/ydb/core/protos/tablet.proto @@ -48,14 +48,15 @@ message TTabletTypes { ReplicationController = 38; BlobDepot = 39; StatisticsAggregator = 40; + GraphShard = 41; // when adding a new tablet type and keeping parse compatibility with the old version // rename existing reserved item to desired one, and add new reserved item to // the end of reserved list - Reserved41 = 41; Reserved42 = 42; Reserved43 = 43; Reserved44 = 44; + Reserved45 = 45; UserTypeStart = 255; TypeInvalid = -1; diff --git a/ydb/core/tablet/tablet_counters_app.cpp b/ydb/core/tablet/tablet_counters_app.cpp index bfed7ffd13cb..f3c768574409 100644 --- a/ydb/core/tablet/tablet_counters_app.cpp +++ b/ydb/core/tablet/tablet_counters_app.cpp @@ -6,6 +6,7 @@ #include #include #include +#include namespace NKikimr { @@ -35,6 +36,12 @@ THolder CreateAppCountersByTabletType(TTabletTypes::EType t NKesus::ECumulativeCounters_descriptor, NKesus::EPercentileCounters_descriptor >>(); + case TTabletTypes::GraphShard: + return MakeHolder>(); default: return {}; } diff --git a/ydb/core/tablet/ya.make b/ydb/core/tablet/ya.make index 662e29c87583..7013c5cd0897 100644 --- a/ydb/core/tablet/ya.make +++ b/ydb/core/tablet/ya.make @@ -64,6 +64,7 @@ PEERDIR( library/cpp/deprecated/enum_codegen library/cpp/yson ydb/core/base + ydb/core/graph/shard/protos ydb/core/mon ydb/core/mon_alloc ydb/core/node_whiteboard diff --git a/ydb/core/testlib/tablet_helpers.cpp b/ydb/core/testlib/tablet_helpers.cpp index 57981f9880fa..5d62205c0c6f 100644 --- a/ydb/core/testlib/tablet_helpers.cpp +++ b/ydb/core/testlib/tablet_helpers.cpp @@ -44,6 +44,7 @@ #include #include #include +#include #include #include @@ -1223,6 +1224,8 @@ namespace NKikimr { bootstrapperActorId = Boot(ctx, type, &CreatePersQueue, DataGroupErasure); } else if (type == TTabletTypes::StatisticsAggregator) { bootstrapperActorId = Boot(ctx, type, &NStat::CreateStatisticsAggregator, DataGroupErasure); + } else if (type == 
TTabletTypes::GraphShard) { + bootstrapperActorId = Boot(ctx, type, &NGraph::CreateGraphShard, DataGroupErasure); } else { status = NKikimrProto::ERROR; } diff --git a/ydb/core/testlib/tenant_runtime.cpp b/ydb/core/testlib/tenant_runtime.cpp index 0f7eac140f57..88ba8b75905c 100644 --- a/ydb/core/testlib/tenant_runtime.cpp +++ b/ydb/core/testlib/tenant_runtime.cpp @@ -793,11 +793,11 @@ void TTenantTestRuntime::Setup(bool createTenantPools) if (ENABLE_DETAILED_LOG) { SetLogPriority(NKikimrServices::LOCAL, NLog::PRI_DEBUG); SetLogPriority(NKikimrServices::TENANT_POOL, NLog::PRI_DEBUG); - SetLogPriority(NKikimrServices::LABELS_MAINTAINER, NLog::PRI_DEBUG); + //SetLogPriority(NKikimrServices::LABELS_MAINTAINER, NLog::PRI_DEBUG); SetLogPriority(NKikimrServices::TENANT_SLOT_BROKER, NLog::PRI_DEBUG); - SetLogPriority(NKikimrServices::CMS, NLog::PRI_DEBUG); - SetLogPriority(NKikimrServices::CMS_CONFIGS, NLog::PRI_TRACE); - SetLogPriority(NKikimrServices::CMS_TENANTS, NLog::PRI_TRACE); + //SetLogPriority(NKikimrServices::CMS, NLog::PRI_DEBUG); + //SetLogPriority(NKikimrServices::CMS_CONFIGS, NLog::PRI_TRACE); + //SetLogPriority(NKikimrServices::CMS_TENANTS, NLog::PRI_TRACE); SetLogPriority(NKikimrServices::CONFIGS_DISPATCHER, NLog::PRI_TRACE); SetLogPriority(NKikimrServices::CONFIGS_CACHE, NLog::PRI_TRACE); SetLogPriority(NKikimrServices::HIVE, NLog::PRI_DEBUG); diff --git a/ydb/core/tx/schemeshard/schemeshard.h b/ydb/core/tx/schemeshard/schemeshard.h index be4b061765c1..e5f8cfe07d42 100644 --- a/ydb/core/tx/schemeshard/schemeshard.h +++ b/ydb/core/tx/schemeshard/schemeshard.h @@ -497,32 +497,38 @@ struct TEvSchemeShard { EvSyncTenantSchemeShard> { TEvSyncTenantSchemeShard() = default; - TEvSyncTenantSchemeShard(const TPathId& domainKey, - ui64 tabletId, - ui64 generation, - ui64 effectiveACLVersion, - ui64 subdomainVersion, - ui64 userAttrsVersion, - ui64 tenantHive, - ui64 tenantSysViewProcessor, - ui64 tenantStatisticsAggregator, - const TString& rootACL) + struct 
TEvSyncTenantSchemeShardInitializer { + TPathId DomainKey; + ui64 TabletId; + ui64 Generation; + ui64 EffectiveACLVersion; + ui64 SubdomainVersion; + ui64 UserAttrsVersion; + ui64 TenantHive; + ui64 TenantSysViewProcessor; + ui64 TenantStatisticsAggregator; + ui64 TenantGraphShard; + TString RootACL; + }; + + TEvSyncTenantSchemeShard(const TEvSyncTenantSchemeShardInitializer& _) { - Record.SetDomainSchemeShard(domainKey.OwnerId); - Record.SetDomainPathId(domainKey.LocalPathId); + Record.SetDomainSchemeShard(_.DomainKey.OwnerId); + Record.SetDomainPathId(_.DomainKey.LocalPathId); - Record.SetTabletID(tabletId); - Record.SetGeneration(generation); + Record.SetTabletID(_.TabletId); + Record.SetGeneration(_.Generation); - Record.SetEffectiveACLVersion(effectiveACLVersion); - Record.SetSubdomainVersion(subdomainVersion); - Record.SetUserAttributesVersion(userAttrsVersion); + Record.SetEffectiveACLVersion(_.EffectiveACLVersion); + Record.SetSubdomainVersion(_.SubdomainVersion); + Record.SetUserAttributesVersion(_.UserAttrsVersion); - Record.SetTenantHive(tenantHive); - Record.SetTenantSysViewProcessor(tenantSysViewProcessor); - Record.SetTenantStatisticsAggregator(tenantStatisticsAggregator); + Record.SetTenantHive(_.TenantHive); + Record.SetTenantSysViewProcessor(_.TenantSysViewProcessor); + Record.SetTenantStatisticsAggregator(_.TenantStatisticsAggregator); + Record.SetTenantGraphShard(_.TenantGraphShard); - Record.SetTenantRootACL(rootACL); + Record.SetTenantRootACL(_.RootACL); } }; @@ -575,6 +581,10 @@ struct TEvSchemeShard { void SetUpdateTenantRootACL(const TString& acl) { Record.SetUpdateTenantRootACL(acl); } + + void SetTenantGraphShard(ui64 gs) { + Record.SetTenantGraphShard(gs); + } }; struct TEvFindTabletSubDomainPathId diff --git a/ydb/core/tx/schemeshard/schemeshard__delete_tablet_reply.cpp b/ydb/core/tx/schemeshard/schemeshard__delete_tablet_reply.cpp index 83b6cd3cce54..18f1881f250f 100644 --- a/ydb/core/tx/schemeshard/schemeshard__delete_tablet_reply.cpp 
+++ b/ydb/core/tx/schemeshard/schemeshard__delete_tablet_reply.cpp @@ -107,6 +107,9 @@ struct TSchemeShard::TTxDeleteTabletReply : public TSchemeShard::TRwTxBase { case ETabletType::StatisticsAggregator: Self->TabletCounters->Simple()[COUNTER_STATISTICS_AGGREGATOR_COUNT].Sub(1); break; + case ETabletType::GraphShard: + Self->TabletCounters->Simple()[COUNTER_GRAPHSHARD_COUNT].Sub(1); + break; default: Y_FAIL_S("Unknown TabletType" << ", ShardIdx " << ShardIdx diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index 403e0f30c649..e1f75bf1b4e6 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -3924,6 +3924,9 @@ struct TSchemeShard::TTxInit : public TTransactionBase { case ETabletType::StatisticsAggregator: Self->TabletCounters->Simple()[COUNTER_STATISTICS_AGGREGATOR_COUNT].Add(1); break; + case ETabletType::GraphShard: + Self->TabletCounters->Simple()[COUNTER_GRAPHSHARD_COUNT].Add(1); + break; default: Y_FAIL_S("dont know how to interpret tablet type" << ", type id: " << (ui32)si.second.TabletType diff --git a/ydb/core/tx/schemeshard/schemeshard__init_root.cpp b/ydb/core/tx/schemeshard/schemeshard__init_root.cpp index 6b9d51a8b4a6..908784e3f2c7 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init_root.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init_root.cpp @@ -399,7 +399,7 @@ struct TSchemeShard::TTxInitTenantSchemeShard : public TSchemeShard::TRwTxBase { if (record.HasServerlessComputeResourcesMode()) { subdomain->SetServerlessComputeResourcesMode(record.GetServerlessComputeResourcesMode()); } - + RegisterShard(db, subdomain, processingParams.GetCoordinators(), TTabletTypes::Coordinator); RegisterShard(db, subdomain, processingParams.GetMediators(), TTabletTypes::Mediator); RegisterShard(db, subdomain, TVector{processingParams.GetSchemeShard()}, TTabletTypes::SchemeShard); @@ -412,6 +412,9 @@ struct TSchemeShard::TTxInitTenantSchemeShard : 
public TSchemeShard::TRwTxBase { if (processingParams.HasStatisticsAggregator()) { RegisterShard(db, subdomain, TVector{processingParams.GetStatisticsAggregator()}, TTabletTypes::StatisticsAggregator); } + if (processingParams.HasGraphShard()) { + RegisterShard(db, subdomain, TVector{processingParams.GetGraphShard()}, TTabletTypes::GraphShard); + } subdomain->Initialize(Self->ShardInfos); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_extsubdomain.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_extsubdomain.cpp index 18fd376bf6f4..ef8c34d80c3c 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_extsubdomain.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_extsubdomain.cpp @@ -37,6 +37,7 @@ struct TParamsDelta { uint8_t AddExternalHive = 0; uint8_t AddExternalSysViewProcessor = 0; uint8_t AddExternalStatisticsAggregator = 0; + uint8_t AddGraphShard = 0; bool SharedTxSupportAdded = false; TVector StoragePoolsAdded; bool ServerlessComputeResourcesModeChanged = false; @@ -189,6 +190,21 @@ VerifyParams(TParamsDelta* delta, const TPathId pathId, const TSubDomainInfo::TP } } + // GraphShard checks + uint8_t addGraphShard = 0; + if (input.GetGraphShard()) { + const bool prev = bool(current->GetTenantGraphShardID()); + const bool next = input.GetGraphShard(); + const bool changed = (prev != next); + + if (changed) { + if (next == false) { + return paramError("GraphShard could only be added, not removed"); + } + addGraphShard = 1; + } + } + // Second params check: combinations bool sharedTxSupportAdded = (coordinatorsAdded + mediatorsAdded) > 0; @@ -275,6 +291,7 @@ VerifyParams(TParamsDelta* delta, const TPathId pathId, const TSubDomainInfo::TP delta->AddExternalHive = addExternalHive; delta->AddExternalSysViewProcessor = addExternalSysViewProcessor; delta->AddExternalStatisticsAggregator = addExternalStatisticsAggregator; + delta->AddGraphShard = addGraphShard; delta->SharedTxSupportAdded = sharedTxSupportAdded; 
delta->StoragePoolsAdded = std::move(storagePoolsAdded); delta->ServerlessComputeResourcesModeChanged = serverlessComputeResourcesModeChanged; @@ -856,7 +873,7 @@ class TAlterExtSubDomain: public TSubOperation { //NOTE: ExternalHive, ExternalSysViewProcessor and ExternalStatisticsAggregator are _not_ counted against limits ui64 tabletsToCreateUnderLimit = delta.AddExternalSchemeShard + delta.CoordinatorsAdded + delta.MediatorsAdded; - ui64 tabletsToCreateOverLimit = delta.AddExternalSysViewProcessor + delta.AddExternalStatisticsAggregator; + ui64 tabletsToCreateOverLimit = delta.AddExternalSysViewProcessor + delta.AddExternalStatisticsAggregator + delta.AddGraphShard; ui64 tabletsToCreateTotal = tabletsToCreateUnderLimit + tabletsToCreateOverLimit; // Check path limits @@ -933,7 +950,8 @@ class TAlterExtSubDomain: public TSubOperation { delta.AddExternalSchemeShard || delta.AddExternalSysViewProcessor || delta.AddExternalHive || - delta.AddExternalStatisticsAggregator) + delta.AddExternalStatisticsAggregator || + delta.AddGraphShard) { if (!context.SS->ResolveSubdomainsChannels(alter->GetStoragePools(), channelsBinding)) { result->SetError(NKikimrScheme::StatusInvalidParameter, "failed to construct channels binding"); @@ -942,8 +960,8 @@ class TAlterExtSubDomain: public TSubOperation { } // Declare shards. 
- // - hive always come first (OwnerIdx 1) - // - schemeshard always come second (OwnerIdx 2) + // - hive always comes first (OwnerIdx 1) + // - schemeshard always comes second (OwnerIdx 2) // - others follow // if (delta.AddExternalHive && !context.SS->EnableAlterDatabaseCreateHiveFirst) { @@ -963,6 +981,9 @@ class TAlterExtSubDomain: public TSubOperation { if (delta.AddExternalStatisticsAggregator) { AddShardsTo(txState, OperationId.GetTxId(), basenameId, 1, TTabletTypes::StatisticsAggregator, channelsBinding, context.SS); } + if (delta.AddGraphShard) { + AddShardsTo(txState, OperationId.GetTxId(), basenameId, 1, TTabletTypes::GraphShard, channelsBinding, context.SS); + } Y_ABORT_UNLESS(txState.Shards.size() == tabletsToCreateTotal); } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common_subdomain.h b/ydb/core/tx/schemeshard/schemeshard__operation_common_subdomain.h index 294c9f77e49c..c4caed85e7dc 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common_subdomain.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common_subdomain.h @@ -258,6 +258,16 @@ class TConfigureParts: public TSubOperationState { context.OnComplete.BindMsgToPipe(OperationId, tabletID, idx, event); break; } + case ETabletType::GraphShard: { + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "Send configure request to graph shard: " << tabletID << + " opId: " << OperationId << + " schemeshard: " << ssId); + shard.Operation = TTxState::ConfigureParts; + auto event = new TEvSubDomain::TEvConfigure(processing); + context.OnComplete.BindMsgToPipe(OperationId, tabletID, idx, event); + break; + } default: Y_FAIL_S("Unexpected type, we don't create tablets with type " << ETabletType::TypeToStr(type)); } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp index 5f5022967f46..843fe9805593 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp +++ 
b/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp @@ -525,6 +525,11 @@ void TSideEffects::DoUpdateTenant(TSchemeShard* ss, NTabletFlatExecutor::TTransa } } + if (!tenantLink.TenantGraphShard && subDomain->GetTenantGraphShardID()) { + message->SetTenantGraphShard(ui64(subDomain->GetTenantGraphShardID())); + hasChanges = true; + } + if (!hasChanges) { LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "DoUpdateTenant no hasChanges" diff --git a/ydb/core/tx/schemeshard/schemeshard__sync_update_tenants.cpp b/ydb/core/tx/schemeshard/schemeshard__sync_update_tenants.cpp index 9d75aacb1f23..dc87be3b16ea 100644 --- a/ydb/core/tx/schemeshard/schemeshard__sync_update_tenants.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__sync_update_tenants.cpp @@ -192,6 +192,14 @@ struct TSchemeShard::TTxUpdateTenant : public TSchemeShard::TRwTxBase { Y_ABORT_UNLESS(tenantSA == subdomain->GetTenantStatisticsAggregatorID()); } + if (record.HasTenantGraphShard()) { + TTabletId tenantGS = TTabletId(record.GetTenantGraphShard()); + if (!subdomain->GetTenantGraphShardID()) { + addPrivateShard(tenantGS, ETabletType::GraphShard); + } + Y_ABORT_UNLESS(tenantGS == subdomain->GetTenantGraphShardID()); + } + if (record.HasUpdateTenantRootACL()) { // KIKIMR-10699: transfer tenants root ACL from GSS to the TSS // here TSS sees the ACL from GSS diff --git a/ydb/core/tx/schemeshard/schemeshard_domain_links.cpp b/ydb/core/tx/schemeshard/schemeshard_domain_links.cpp index 299ab2918594..35e50d05b282 100644 --- a/ydb/core/tx/schemeshard/schemeshard_domain_links.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_domain_links.cpp @@ -20,16 +20,20 @@ THolder TParentDomainLink::MakeSyncMsg Y_ABORT_UNLESS(Self->PathsById.contains(Self->RootPathId())); auto& rootSubdomain = Self->SubDomains.at(Self->RootPathId()); - return MakeHolder(Self->ParentDomainId, - Self->TabletID(), - Self->Generation(), - Self->ParentDomainEffectiveACLVersion, - rootSubdomain->GetVersion(), - 
rootPath->UserAttrs->AlterVersion, - ui64(rootSubdomain->GetTenantHiveID()), - ui64(rootSubdomain->GetTenantSysViewProcessorID()), - ui64(rootSubdomain->GetTenantStatisticsAggregatorID()), - rootPath->ACL); + TEvSchemeShard::TEvSyncTenantSchemeShard* ptr = new TEvSchemeShard::TEvSyncTenantSchemeShard({ + .DomainKey = Self->ParentDomainId, + .TabletId = Self->TabletID(), + .Generation = Self->Generation(), + .EffectiveACLVersion = Self->ParentDomainEffectiveACLVersion, + .SubdomainVersion = rootSubdomain->GetVersion(), + .UserAttrsVersion = rootPath->UserAttrs->AlterVersion, + .TenantHive = ui64(rootSubdomain->GetTenantHiveID()), + .TenantSysViewProcessor = ui64(rootSubdomain->GetTenantSysViewProcessorID()), + .TenantStatisticsAggregator = ui64(rootSubdomain->GetTenantStatisticsAggregatorID()), + .TenantGraphShard = ui64(rootSubdomain->GetTenantGraphShardID()), + .RootACL = rootPath->ACL + }); + return THolder(ptr); } void TParentDomainLink::SendSync(const TActorContext &ctx) { @@ -109,6 +113,7 @@ void TSubDomainsLinks::TLink::Out(IOutputStream& stream) const { << ", TenantHive: " << TenantHive << ", TenantSysViewProcessor: " << TenantSysViewProcessor << ", TenantStatisticsAggregator: " << TenantStatisticsAggregator + << ", TenantGraphShard: " << TenantGraphShard << ", TenantRootACL: " << TenantRootACL << "}"; } @@ -125,6 +130,8 @@ TSubDomainsLinks::TLink::TLink(const NKikimrScheme::TEvSyncTenantSchemeShard &re TTabletId(record.GetTenantSysViewProcessor()) : InvalidTabletId) , TenantStatisticsAggregator(record.HasTenantStatisticsAggregator() ? TTabletId(record.GetTenantStatisticsAggregator()) : InvalidTabletId) + , TenantGraphShard(record.HasTenantGraphShard() ? 
+ TTabletId(record.GetTenantGraphShard()) : InvalidTabletId) , TenantRootACL(record.GetTenantRootACL()) {} diff --git a/ydb/core/tx/schemeshard/schemeshard_domain_links.h b/ydb/core/tx/schemeshard/schemeshard_domain_links.h index fa207d179202..a7fda70dfd2b 100644 --- a/ydb/core/tx/schemeshard/schemeshard_domain_links.h +++ b/ydb/core/tx/schemeshard/schemeshard_domain_links.h @@ -42,6 +42,7 @@ class TSubDomainsLinks { TTabletId TenantHive = InvalidTabletId; TTabletId TenantSysViewProcessor = InvalidTabletId; TTabletId TenantStatisticsAggregator = InvalidTabletId; + TTabletId TenantGraphShard = InvalidTabletId; TString TenantRootACL; TLink() = default; diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index ae02b52b4634..501b8c68531d 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -1576,6 +1576,13 @@ struct TSubDomainInfo: TSimpleRefCount { return TTabletId(ProcessingParams.GetStatisticsAggregator()); } + TTabletId GetTenantGraphShardID() const { + if (!ProcessingParams.HasGraphShard()) { + return InvalidTabletId; + } + return TTabletId(ProcessingParams.GetGraphShard()); + } + ui64 GetPathsInside() const { return PathsInsideCount; } @@ -1953,6 +1960,13 @@ struct TSubDomainInfo: TSimpleRefCount { if (statisticsAggregators.size()) { ProcessingParams.SetStatisticsAggregator(ui64(statisticsAggregators.front())); } + + ProcessingParams.ClearGraphShard(); + TVector graphs = FilterPrivateTablets(ETabletType::GraphShard, allShards); + Y_VERIFY_S(graphs.size() <= 1, "size was: " << graphs.size()); + if (graphs.size()) { + ProcessingParams.SetGraphShard(ui64(graphs.front())); + } } void InitializeAsGlobal(NKikimrSubDomains::TProcessingParams&& processingParams) { diff --git a/ydb/core/tx/schemeshard/ut_base/ut_base.cpp b/ydb/core/tx/schemeshard/ut_base/ut_base.cpp index aae3cefa646d..5b58e11db1c5 100644 --- 
a/ydb/core/tx/schemeshard/ut_base/ut_base.cpp +++ b/ydb/core/tx/schemeshard/ut_base/ut_base.cpp @@ -144,7 +144,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) { ~TLocaleGuard() { std::locale::global(OriginalLocale_); } - + private: const std::locale OriginalLocale_; }; diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp index 62a1378ab1fd..dbc73641066b 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp @@ -15,7 +15,7 @@ #include -static const bool ENABLE_SCHEMESHARD_LOG = true; +bool NSchemeShardUT_Private::TTestEnv::ENABLE_SCHEMESHARD_LOG = true; static const bool ENABLE_DATASHARD_LOG = false; static const bool ENABLE_COORDINATOR_MEDIATOR_LOG = false; static const bool ENABLE_SCHEMEBOARD_LOG = false; diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.h b/ydb/core/tx/schemeshard/ut_helpers/test_env.h index 8338737b0768..50c33be744f2 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/test_env.h +++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.h @@ -77,6 +77,8 @@ namespace NSchemeShardUT_Private { THolder YdbDriver; public: + static bool ENABLE_SCHEMESHARD_LOG; + TTestEnv(TTestActorRuntime& runtime, ui32 nchannels = 4, bool enablePipeRetries = true, TSchemeShardFactory ssFactory = &CreateFlatTxSchemeShard, bool enableSystemViews = false); TTestEnv(TTestActorRuntime& runtime, const TTestEnvOptions& opts, diff --git a/ydb/core/viewer/json_graph.h b/ydb/core/viewer/json_graph.h new file mode 100644 index 000000000000..05b86088d08b --- /dev/null +++ b/ydb/core/viewer/json_graph.h @@ -0,0 +1,172 @@ +#pragma once +#include +#include +#include +#include +#include +#include "viewer.h" +#include "log.h" + +namespace NKikimr { +namespace NViewer { + +using namespace NActors; + +class TJsonGraph : public TActorBootstrapped { + IViewer* Viewer; + NMon::TEvHttpInfo::TPtr Event; + std::vector Metrics; + +public: + static constexpr 
NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::VIEWER_HANDLER; + } + + TJsonGraph(IViewer* viewer, NMon::TEvHttpInfo::TPtr &ev) + : Viewer(viewer) + , Event(ev) + {} + + void Bootstrap() { + BLOG_TRACE("Graph received request for " << Event->Get()->Request.GetUri()); + const auto& params(Event->Get()->Request.GetParams()); + NKikimrGraph::TEvGetMetrics getRequest; + if (params.Has("target")) { + StringSplitter(params.Get("target")).Split(',').SkipEmpty().Collect(&Metrics); + for (const auto& metric : Metrics) { + getRequest.AddMetrics(metric); + } + } else { + Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPBADREQUEST(Event->Get(), {}, "Bad Request"), 0, NMon::IEvHttpInfoRes::EContentType::Custom)); + return PassAway(); + } + if (params.Has("from")) { + getRequest.SetTimeFrom(FromStringWithDefault(params.Get("from"))); + } + if (params.Has("until")) { + getRequest.SetTimeTo(FromStringWithDefault(params.Get("until"))); + } + if (params.Has("maxDataPoints")) { + getRequest.SetMaxPoints(FromStringWithDefault(params.Get("maxDataPoints"), 1000)); + } + Send(NGraph::MakeGraphServiceId(), new NGraph::TEvGraph::TEvGetMetrics(std::move(getRequest))); + Schedule(TDuration::Seconds(30), new TEvents::TEvWakeup()); + Become(&TThis::StateWork); + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(NGraph::TEvGraph::TEvMetricsResult, Handle); + cFunc(TEvents::TSystem::Wakeup, Timeout); + } + } + + void Handle(NGraph::TEvGraph::TEvMetricsResult::TPtr& ev) { + const auto& params(Event->Get()->Request.GetParams()); + const auto& response(ev->Get()->Record); + NJson::TJsonValue json; + + if (response.GetError()) { + json["status"] = "error"; + json["error"] = response.GetError(); + Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPOKJSON(Event->Get()) + NJson::WriteJson(json, false), 0, NMon::IEvHttpInfoRes::EContentType::Custom)); + return PassAway(); + } + if (response.DataSize() != Metrics.size()) 
{ + json["status"] = "error"; + json["error"] = "Invalid data size received"; + Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPOKJSON(Event->Get()) + NJson::WriteJson(json, false), 0, NMon::IEvHttpInfoRes::EContentType::Custom)); + return PassAway(); + } + for (size_t nMetric = 0; nMetric < response.DataSize(); ++nMetric) { + const auto& protoMetric(response.GetData(nMetric)); + if (response.TimeSize() != protoMetric.ValuesSize()) { + json["status"] = "error"; + json["error"] = "Invalid value size received"; + Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPOKJSON(Event->Get()) + NJson::WriteJson(json, false), 0, NMon::IEvHttpInfoRes::EContentType::Custom)); + return PassAway(); + } + } + if (!params.Has("format") || params.Get("format") == "graphite") { // graphite + json.SetType(NJson::JSON_ARRAY); + for (size_t nMetric = 0; nMetric < response.DataSize(); ++nMetric) { + const auto& protoMetric(response.GetData(nMetric)); + NJson::TJsonValue& jsonMetric(json.AppendValue({})); + jsonMetric["target"] = Metrics[nMetric]; + jsonMetric["title"] = Metrics[nMetric]; + jsonMetric["tags"]["name"] = Metrics[nMetric]; + NJson::TJsonValue& jsonDataPoints(jsonMetric["datapoints"]); + jsonDataPoints.SetType(NJson::JSON_ARRAY); + for (size_t nTime = 0; nTime < response.TimeSize(); ++nTime) { + NJson::TJsonValue& jsonDataPoint(jsonDataPoints.AppendValue({})); + jsonDataPoint.AppendValue(response.GetTime(nTime)); + double value = protoMetric.GetValues(nTime); + if (isnan(value)) { + jsonDataPoint.AppendValue(NJson::TJsonValue(NJson::JSON_NULL)); + } else { + jsonDataPoint.AppendValue(value); + } + } + } + } else { // prometheus + json["status"] = "success"; + NJson::TJsonValue& jsonData(json["data"]); + jsonData["resultType"] = "matrix"; + NJson::TJsonValue& jsonResults(jsonData["result"]); + jsonResults.SetType(NJson::JSON_ARRAY); + for (size_t nMetric = 0; nMetric < response.DataSize(); ++nMetric) { + const auto& protoMetric(response.GetData(nMetric)); + 
NJson::TJsonValue& jsonResult(jsonResults.AppendValue({})); + jsonResult["metric"]["__name__"] = Metrics[nMetric]; + NJson::TJsonValue& jsonValues(jsonResult["values"]); + jsonValues.SetType(NJson::JSON_ARRAY); + for (size_t nTime = 0; nTime < response.TimeSize(); ++nTime) { + NJson::TJsonValue& jsonDataPoint(jsonValues.AppendValue({})); + jsonDataPoint.AppendValue(response.GetTime(nTime)); + double value = protoMetric.GetValues(nTime); + if (isnan(value)) { + jsonDataPoint.AppendValue(NJson::TJsonValue(NJson::JSON_NULL)); + } else { + jsonDataPoint.AppendValue(value); + } + } + } + } + + Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPOKJSON(Event->Get()) + NJson::WriteJson(json, false), 0, NMon::IEvHttpInfoRes::EContentType::Custom)); + PassAway(); + } + + void Timeout() { + Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPGATEWAYTIMEOUT(Event->Get()), 0, NMon::IEvHttpInfoRes::EContentType::Custom)); + PassAway(); + } +}; + +template <> +struct TJsonRequestParameters { + static TString GetParameters() { + return R"___([{"name":"target","in":"query","description":"metrics comma delimited","required":true,"type":"string"}, + {"name":"from","in":"query","description":"time in seconds","required":false,"type":"integer"}, + {"name":"until","in":"query","description":"time in seconds","required":false,"type":"integer"}, + {"name":"maxDataPoints","in":"query","description":"maximum number of data points","required":false,"type":"integer"}, + {"name":"format","in":"query","description":"response format, could be prometheus or graphite","required":false,"type":"string"}])___"; + } +}; + +template <> +struct TJsonRequestSummary { + static TString GetSummary() { + return "\"Graph data\""; + } +}; + +template <> +struct TJsonRequestDescription { + static TString GetDescription() { + return "\"Returns graph data\""; + } +}; + +} +} diff --git a/ydb/core/viewer/json_handlers_viewer.cpp b/ydb/core/viewer/json_handlers_viewer.cpp index 
3b5e219e4654..0ade1a646182 100644 --- a/ydb/core/viewer/json_handlers_viewer.cpp +++ b/ydb/core/viewer/json_handlers_viewer.cpp @@ -36,6 +36,8 @@ #include "json_healthcheck.h" #include "json_nodes.h" #include "json_acl.h" +#include "json_graph.h" +#include "json_render.h" namespace NKikimr::NViewer { @@ -76,4 +78,6 @@ void TViewerJsonHandlers::Init() { JsonHandlers["/json/healthcheck"] = new TJsonHandler; JsonHandlers["/json/nodes"] = new TJsonHandler; JsonHandlers["/json/acl"] = new TJsonHandler; + JsonHandlers["/json/graph"] = new TJsonHandler; + JsonHandlers["/json/render"] = new TJsonHandler; }} diff --git a/ydb/core/viewer/json_render.h b/ydb/core/viewer/json_render.h new file mode 100644 index 000000000000..83fdb356a35d --- /dev/null +++ b/ydb/core/viewer/json_render.h @@ -0,0 +1,158 @@ +#pragma once +#include +#include +#include +#include +#include +#include "viewer.h" +#include "log.h" + +namespace NKikimr { +namespace NViewer { + +using namespace NActors; + +class TJsonRender : public TActorBootstrapped { + IViewer* Viewer; + NMon::TEvHttpInfo::TPtr Event; + std::vector Metrics; + +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::VIEWER_HANDLER; + } + + TJsonRender(IViewer* viewer, NMon::TEvHttpInfo::TPtr &ev) + : Viewer(viewer) + , Event(ev) + {} + + void Bootstrap() { + auto postData = Event->Get()->Request.GetPostContent(); + BLOG_D("PostData=" << postData); + NKikimrGraph::TEvGetMetrics getRequest; + if (postData) { + TCgiParameters params(postData); + if (params.Has("target")) { + StringSplitter(params.Get("target")).Split(',').SkipEmpty().Collect(&Metrics); + for (const auto& metric : Metrics) { + getRequest.AddMetrics(metric); + } + } else { + static const TString png1x1 = "\x89\x50\x4e\x47\x0d\x0a\x1a\x0a\x00\x00\x00\x0d\x49\x48\x44\x52\x00\x00\x00\x01\x00\x00\x00\x01\x01" + "\x03\x00\x00\x00\x25\xdb\x56\xca\x00\x00\x00\x03\x50\x4c\x54\x45\x00\x00\x00\xa7\x7a\x3d\xda\x00\x00" + 
"\x00\x01\x74\x52\x4e\x53\x00\x40\xe6\xd8\x66\x00\x00\x00\x0a\x49\x44\x41\x54\x08\xd7\x63\x60\x00\x00" + "\x00\x02\x00\x01\xe2\x21\xbc\x33\x00\x00\x00\x00\x49\x45\x4e\x44\xae\x42\x60\x82"; + Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPOK(Event->Get(), "image/png", png1x1), 0, NMon::IEvHttpInfoRes::EContentType::Custom)); + return PassAway(); + } + if (params.Has("from")) { + getRequest.SetTimeFrom(FromStringWithDefault(params.Get("from"))); + } + if (params.Has("until")) { + getRequest.SetTimeTo(FromStringWithDefault(params.Get("until"))); + } + if (params.Has("maxDataPoints")) { + getRequest.SetMaxPoints(FromStringWithDefault(params.Get("maxDataPoints"), 1000)); + } + } else { + Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPBADREQUEST(Event->Get(), {}, "Bad Request"), 0, NMon::IEvHttpInfoRes::EContentType::Custom)); + return PassAway(); + } + Send(NGraph::MakeGraphServiceId(), new NGraph::TEvGraph::TEvGetMetrics(std::move(getRequest))); + Schedule(TDuration::Seconds(30), new TEvents::TEvWakeup()); + Become(&TThis::StateWork); + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(NGraph::TEvGraph::TEvMetricsResult, Handle); + cFunc(TEvents::TSystem::Wakeup, Timeout); + } + } + + void Handle(NGraph::TEvGraph::TEvMetricsResult::TPtr& ev) { + const auto& response(ev->Get()->Record); + NJson::TJsonValue json; + + if (response.GetError()) { + json["status"] = "error"; + json["error"] = response.GetError(); + Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPOKJSON(Event->Get()) + NJson::WriteJson(json, false), 0, NMon::IEvHttpInfoRes::EContentType::Custom)); + return PassAway(); + } + if (response.DataSize() != Metrics.size()) { + json["status"] = "error"; + json["error"] = "Invalid data size received"; + Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPOKJSON(Event->Get()) + NJson::WriteJson(json, false), 0, NMon::IEvHttpInfoRes::EContentType::Custom)); + return PassAway(); + } + for (size_t nMetric = 
0; nMetric < response.DataSize(); ++nMetric) { + const auto& protoMetric(response.GetData(nMetric)); + if (response.TimeSize() != protoMetric.ValuesSize()) { + json["status"] = "error"; + json["error"] = "Invalid value size received"; + Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPOKJSON(Event->Get()) + NJson::WriteJson(json, false), 0, NMon::IEvHttpInfoRes::EContentType::Custom)); + return PassAway(); + } + } + { // graphite + json.SetType(NJson::JSON_ARRAY); + for (size_t nMetric = 0; nMetric < response.DataSize(); ++nMetric) { + const auto& protoMetric(response.GetData(nMetric)); + NJson::TJsonValue& jsonMetric(json.AppendValue({})); + jsonMetric["target"] = Metrics[nMetric]; + jsonMetric["title"] = Metrics[nMetric]; + jsonMetric["tags"]["name"] = Metrics[nMetric]; + NJson::TJsonValue& jsonDataPoints(jsonMetric["datapoints"]); + jsonDataPoints.SetType(NJson::JSON_ARRAY); + for (size_t nTime = 0; nTime < response.TimeSize(); ++nTime) { + NJson::TJsonValue& jsonDataPoint(jsonDataPoints.AppendValue({})); + double value = protoMetric.GetValues(nTime); + if (isnan(value)) { + jsonDataPoint.AppendValue(NJson::TJsonValue(NJson::JSON_NULL)); + } else { + jsonDataPoint.AppendValue(value); + } + jsonDataPoint.AppendValue(response.GetTime(nTime)); + } + } + } + + Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPOKJSON(Event->Get()) + NJson::WriteJson(json, false), 0, NMon::IEvHttpInfoRes::EContentType::Custom)); + PassAway(); + } + + void Timeout() { + Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPGATEWAYTIMEOUT(Event->Get()), 0, NMon::IEvHttpInfoRes::EContentType::Custom)); + PassAway(); + } +}; + +template <> +struct TJsonRequestParameters { + static TString GetParameters() { + return R"___([{"name":"target","in":"query","description":"metrics comma delimited","required":true,"type":"string"}, + {"name":"from","in":"query","description":"time in seconds","required":false,"type":"integer"}, + 
{"name":"until","in":"query","description":"time in seconds","required":false,"type":"integer"}, + {"name":"maxDataPoints","in":"query","description":"maximum number of data points","required":false,"type":"integer"}, + {"name":"format","in":"query","description":"response format","required":false,"type":"string"}])___"; + } +}; + +template <> +struct TJsonRequestSummary { + static TString GetSummary() { + return "\"Graph data\""; + } +}; + +template <> +struct TJsonRequestDescription { + static TString GetDescription() { + return "\"Returns graph data in graphite format\""; + } +}; + +} +} diff --git a/ydb/core/viewer/viewer.cpp b/ydb/core/viewer/viewer.cpp index 643670e5ea4b..65279dd2cbeb 100644 --- a/ydb/core/viewer/viewer.cpp +++ b/ydb/core/viewer/viewer.cpp @@ -152,6 +152,7 @@ class TViewer : public TActorBootstrapped, public IViewer { TString GetCORS(const NMon::TEvHttpInfo* request) override; TString GetHTTPOKJSON(const NMon::TEvHttpInfo* request, TString response) override; + TString GetHTTPOK(const NMon::TEvHttpInfo* request, TString type, TString response) override; TString GetHTTPGATEWAYTIMEOUT(const NMon::TEvHttpInfo* request) override; TString GetHTTPBADREQUEST(const NMon::TEvHttpInfo* request, TString type, TString response) override; @@ -491,6 +492,22 @@ TString TViewer::GetHTTPBADREQUEST(const NMon::TEvHttpInfo* request, TString con return res; } +TString TViewer::GetHTTPOK(const NMon::TEvHttpInfo* request, TString contentType = {}, TString response = {}) { + TStringBuilder res; + res << "HTTP/1.1 200 Ok\r\n" + << "Content-Type: " << contentType << "\r\n" + << "X-Worker-Name: " << CurrentWorkerName << "\r\n"; + res << GetCORS(request); + if (response) { + res << "Content-Length: " << response.size() << "\r\n"; + } + res << "\r\n"; + if (response) { + res << response; + } + return res; +} + NKikimrViewer::EFlag GetFlagFromTabletState(NKikimrWhiteboard::TTabletStateInfo::ETabletState state) { NKikimrViewer::EFlag flag = NKikimrViewer::EFlag::Grey; 
switch (state) { diff --git a/ydb/core/viewer/viewer.h b/ydb/core/viewer/viewer.h index cb4642d570a1..7fdab30f8332 100644 --- a/ydb/core/viewer/viewer.h +++ b/ydb/core/viewer/viewer.h @@ -154,6 +154,7 @@ class IViewer { const TContentHandler& handler) = 0; virtual TString GetCORS(const NMon::TEvHttpInfo* request) = 0; + virtual TString GetHTTPOK(const NMon::TEvHttpInfo* request, TString contentType = {}, TString response = {}) = 0; virtual TString GetHTTPOKJSON(const NMon::TEvHttpInfo* request, TString response = {}) = 0; virtual TString GetHTTPGATEWAYTIMEOUT(const NMon::TEvHttpInfo* request) = 0; virtual TString GetHTTPBADREQUEST(const NMon::TEvHttpInfo* request, TString contentType = {}, TString response = {}) = 0; diff --git a/ydb/core/viewer/ya.make b/ydb/core/viewer/ya.make index ec0fd7a831c9..1dd3163c5a13 100644 --- a/ydb/core/viewer/ya.make +++ b/ydb/core/viewer/ya.make @@ -22,6 +22,7 @@ SRCS( json_describe.h json_local_rpc.h json_getblob.h + json_graph.h json_handlers_vdisk.cpp json_handlers_viewer.cpp json_healthcheck.h @@ -35,6 +36,7 @@ SRCS( json_nodes.h json_pdiskinfo.h json_query.h + json_render.h json_storage.h json_sysinfo.h json_tabletcounters.h @@ -259,6 +261,7 @@ PEERDIR( ydb/core/blobstorage/base ydb/core/blobstorage/vdisk/common ydb/core/client/server + ydb/core/graph/api ydb/core/grpc_services ydb/core/grpc_services/local_rpc ydb/core/health_check diff --git a/ydb/core/ya.make b/ydb/core/ya.make index 0754ebcedeb5..ddc2239a6b06 100644 --- a/ydb/core/ya.make +++ b/ydb/core/ya.make @@ -18,6 +18,7 @@ RECURSE( filestore fq formats + graph grpc_caching grpc_services grpc_streaming diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto index ebd5ed391159..525fdd251952 100644 --- a/ydb/library/services/services.proto +++ b/ydb/library/services/services.proto @@ -83,6 +83,8 @@ enum EServiceKikimr { TENANT_POOL = 303; LABELS_MAINTAINER = 305; + GRAPH = 306; + // TABLET section TABLET_EXECUTOR = 310; TABLET_MAIN = 311; @@ 
-1012,5 +1014,6 @@ message TActivity { PQ_FETCH_REQUEST = 621; STATISTICS_AGGREGATOR = 622; KAFKA_READ_SESSION_ACTOR = 623; + GRAPH_SERVICE = 624; }; };