Skip to content

Commit

Permalink
Merge a3702b7 into a8fb87e
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Sep 11, 2024
2 parents a8fb87e + a3702b7 commit 89fe93e
Show file tree
Hide file tree
Showing 8 changed files with 290 additions and 45 deletions.
58 changes: 30 additions & 28 deletions ydb/core/testlib/test_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,7 @@ namespace Tests {
NKikimr::NConfig::TConfigsDispatcherInitInfo {
.InitialConfig = initial,
});
auto aid = Runtime->Register(dispatcher, nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0);
auto aid = Runtime->Register(dispatcher, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0);
Runtime->RegisterService(NConsole::MakeConfigsDispatcherID(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx);
}
if (Settings->IsEnableMetadataProvider()) {
Expand All @@ -812,7 +812,7 @@ namespace Tests {
}
if (Settings->IsEnableExternalIndex()) {
auto* actor = NCSIndex::CreateService(NCSIndex::TConfig());
const auto aid = Runtime->Register(actor, nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0);
const auto aid = Runtime->Register(actor, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0);
Runtime->RegisterService(NCSIndex::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx);
}
{
Expand All @@ -838,7 +838,7 @@ namespace Tests {
Runtime->Register(CreateLabelsMaintainer({}), nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0);

auto sysViewService = NSysView::CreateSysViewServiceForTests();
TActorId sysViewServiceId = Runtime->Register(sysViewService.Release(), nodeIdx);
TActorId sysViewServiceId = Runtime->Register(sysViewService.Release(), nodeIdx, appData.UserPoolId);
Runtime->RegisterService(NSysView::MakeSysViewServiceID(Runtime->GetNodeId(nodeIdx)), sysViewServiceId, nodeIdx);

auto tenantPublisher = CreateTenantNodeEnumerationPublisher();
Expand All @@ -856,11 +856,13 @@ namespace Tests {
}

void TServer::SetupProxies(ui32 nodeIdx) {
const ui32 userPoolId = Runtime->GetAppData(nodeIdx).UserPoolId;

Runtime->SetTxAllocatorTabletIds({ChangeStateStorage(TxAllocator, Settings->Domain)});
{
if (Settings->AuthConfig.HasLdapAuthentication()) {
IActor* ldapAuthProvider = NKikimr::CreateLdapAuthProvider(Settings->AuthConfig.GetLdapAuthentication());
TActorId ldapAuthProviderId = Runtime->Register(ldapAuthProvider, nodeIdx);
TActorId ldapAuthProviderId = Runtime->Register(ldapAuthProvider, nodeIdx, userPoolId);
Runtime->RegisterService(MakeLdapAuthProviderID(), ldapAuthProviderId, nodeIdx);
}
TTicketParserSettings ticketParserSettings {
Expand All @@ -872,27 +874,27 @@ namespace Tests {
}
};
IActor* ticketParser = Settings->CreateTicketParser(ticketParserSettings);
TActorId ticketParserId = Runtime->Register(ticketParser, nodeIdx);
TActorId ticketParserId = Runtime->Register(ticketParser, nodeIdx, userPoolId);
Runtime->RegisterService(MakeTicketParserID(), ticketParserId, nodeIdx);
}

{
IActor* healthCheck = NHealthCheck::CreateHealthCheckService();
TActorId healthCheckId = Runtime->Register(healthCheck, nodeIdx);
TActorId healthCheckId = Runtime->Register(healthCheck, nodeIdx, userPoolId);
Runtime->RegisterService(NHealthCheck::MakeHealthCheckID(), healthCheckId, nodeIdx);
}
{
const auto& appData = Runtime->GetAppData(nodeIdx);
IActor* metadataCache = CreateDatabaseMetadataCache(appData.TenantName, appData.Counters).release();
TActorId metadataCacheId = Runtime->Register(metadataCache, nodeIdx);
TActorId metadataCacheId = Runtime->Register(metadataCache, nodeIdx, userPoolId);
Runtime->RegisterService(MakeDatabaseMetadataCacheId(Runtime->GetNodeId(nodeIdx)), metadataCacheId, nodeIdx);
}
{
auto kqpProxySharedResources = std::make_shared<NKqp::TKqpProxySharedResources>();

IActor* kqpRmService = NKqp::CreateKqpResourceManagerActor(
Settings->AppConfig->GetTableServiceConfig().GetResourceManager(), nullptr, {}, kqpProxySharedResources, Runtime->GetNodeId(nodeIdx));
TActorId kqpRmServiceId = Runtime->Register(kqpRmService, nodeIdx);
TActorId kqpRmServiceId = Runtime->Register(kqpRmService, nodeIdx, userPoolId);
Runtime->RegisterService(NKqp::MakeKqpRmServiceID(Runtime->GetNodeId(nodeIdx)), kqpRmServiceId, nodeIdx);

if (!KqpLoggerScope) {
Expand All @@ -915,14 +917,14 @@ namespace Tests {
auto httpProxyActorId = NFq::MakeYqlAnalyticsHttpProxyId();
Runtime->RegisterService(
httpProxyActorId,
Runtime->Register(NHttp::CreateHttpProxy(), nodeIdx),
Runtime->Register(NHttp::CreateHttpProxy(), nodeIdx, userPoolId),
nodeIdx
);

auto databaseResolverActorId = NFq::MakeDatabaseResolverActorId();
Runtime->RegisterService(
databaseResolverActorId,
Runtime->Register(NFq::CreateDatabaseResolver(httpProxyActorId, Settings->CredentialsFactory), nodeIdx),
Runtime->Register(NFq::CreateDatabaseResolver(httpProxyActorId, Settings->CredentialsFactory), nodeIdx, userPoolId),
nodeIdx
);

Expand Down Expand Up @@ -956,50 +958,50 @@ namespace Tests {
TVector<NKikimrKqp::TKqpSetting>(Settings->KqpSettings),
nullptr, std::move(kqpProxySharedResources),
federatedQuerySetupFactory, Settings->S3ActorsFactory);
TActorId kqpProxyServiceId = Runtime->Register(kqpProxyService, nodeIdx);
TActorId kqpProxyServiceId = Runtime->Register(kqpProxyService, nodeIdx, userPoolId);
Runtime->RegisterService(NKqp::MakeKqpProxyID(Runtime->GetNodeId(nodeIdx)), kqpProxyServiceId, nodeIdx);

IActor* scriptFinalizeService = NKqp::CreateKqpFinalizeScriptService(
Settings->AppConfig->GetQueryServiceConfig(), federatedQuerySetupFactory, Settings->S3ActorsFactory
);
TActorId scriptFinalizeServiceId = Runtime->Register(scriptFinalizeService, nodeIdx);
TActorId scriptFinalizeServiceId = Runtime->Register(scriptFinalizeService, nodeIdx, userPoolId);
Runtime->RegisterService(NKqp::MakeKqpFinalizeScriptServiceId(Runtime->GetNodeId(nodeIdx)), scriptFinalizeServiceId, nodeIdx);
}

{
IActor* txProxy = CreateTxProxy(Runtime->GetTxAllocatorTabletIds());
TActorId txProxyId = Runtime->Register(txProxy, nodeIdx);
TActorId txProxyId = Runtime->Register(txProxy, nodeIdx, userPoolId);
Runtime->RegisterService(MakeTxProxyID(), txProxyId, nodeIdx);
}

{
IActor* compileService = CreateMiniKQLCompileService(100000);
TActorId compileServiceId = Runtime->Register(compileService, nodeIdx, Runtime->GetAppData(nodeIdx).SystemPoolId, TMailboxType::Revolving, 0);
TActorId compileServiceId = Runtime->Register(compileService, nodeIdx, userPoolId, TMailboxType::Revolving, 0);
Runtime->RegisterService(MakeMiniKQLCompileServiceID(), compileServiceId, nodeIdx);
}

{
IActor* longTxService = NLongTxService::CreateLongTxService();
TActorId longTxServiceId = Runtime->Register(longTxService, nodeIdx);
TActorId longTxServiceId = Runtime->Register(longTxService, nodeIdx, userPoolId);
Runtime->RegisterService(NLongTxService::MakeLongTxServiceID(Runtime->GetNodeId(nodeIdx)), longTxServiceId, nodeIdx);
}

{
IActor* sequenceProxy = NSequenceProxy::CreateSequenceProxy();
TActorId sequenceProxyId = Runtime->Register(sequenceProxy, nodeIdx);
TActorId sequenceProxyId = Runtime->Register(sequenceProxy, nodeIdx, userPoolId);
Runtime->RegisterService(NSequenceProxy::MakeSequenceProxyServiceID(), sequenceProxyId, nodeIdx);
}

if (BusServer && nodeIdx == 0) { // MsgBus and GRPC are run now only on first node
{
IActor* proxy = BusServer->CreateProxy();
TActorId proxyId = Runtime->Register(proxy, nodeIdx, Runtime->GetAppData(nodeIdx).SystemPoolId, TMailboxType::Revolving, 0);
TActorId proxyId = Runtime->Register(proxy, nodeIdx, userPoolId, TMailboxType::Revolving, 0);
Runtime->RegisterService(NMsgBusProxy::CreateMsgBusProxyId(), proxyId, nodeIdx);
}
}
{
IActor* icNodeCache = NIcNodeCache::CreateICNodesInfoCacheService(Runtime->GetDynamicCounters());
TActorId icCacheId = Runtime->Register(icNodeCache, nodeIdx);
TActorId icCacheId = Runtime->Register(icNodeCache, nodeIdx, userPoolId);
Runtime->RegisterService(NIcNodeCache::CreateICNodesInfoCacheServiceId(), icCacheId, nodeIdx);
}
{
Expand All @@ -1012,12 +1014,12 @@ namespace Tests {

{
IActor* pqClusterTracker = NPQ::NClusterTracker::CreateClusterTracker();
TActorId pqClusterTrackerId = Runtime->Register(pqClusterTracker, nodeIdx);
TActorId pqClusterTrackerId = Runtime->Register(pqClusterTracker, nodeIdx, userPoolId);
Runtime->RegisterService(NPQ::NClusterTracker::MakeClusterTrackerID(), pqClusterTrackerId, nodeIdx);
}
{
IActor* pqReadCacheService = NPQ::CreatePQDReadCacheService(Runtime->GetDynamicCounters());
TActorId readCacheId = Runtime->Register(pqReadCacheService, nodeIdx);
TActorId readCacheId = Runtime->Register(pqReadCacheService, nodeIdx, userPoolId);
Runtime->RegisterService(NPQ::MakePQDReadCacheServiceActorId(), readCacheId, nodeIdx);
}

Expand All @@ -1027,7 +1029,7 @@ namespace Tests {
new ::NMonitoring::TDynamicCounters(), TDuration::Seconds(1)
);

TActorId pqMetaCacheId = Runtime->Register(pqMetaCache, nodeIdx);
TActorId pqMetaCacheId = Runtime->Register(pqMetaCache, nodeIdx, userPoolId);
Runtime->RegisterService(NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), pqMetaCacheId, nodeIdx);
}
}
Expand All @@ -1038,7 +1040,7 @@ namespace Tests {
try {
fileBackend = MakeHolder<TFileLogBackend>(Settings->MeteringFilePath);
auto meteringActor = NMetering::CreateMeteringWriter(std::move(fileBackend));
TActorId meteringId = Runtime->Register(meteringActor.Release(), nodeIdx);
TActorId meteringId = Runtime->Register(meteringActor.Release(), nodeIdx, Runtime->GetAppData(nodeIdx).IOPoolId);
Runtime->RegisterService(NMetering::MakeMeteringServiceID(), meteringId, nodeIdx);

} catch (const TFileError &ex) {
Expand All @@ -1050,19 +1052,19 @@ namespace Tests {

{
IActor* kesusService = NKesus::CreateKesusProxyService();
TActorId kesusServiceId = Runtime->Register(kesusService, nodeIdx);
TActorId kesusServiceId = Runtime->Register(kesusService, nodeIdx, userPoolId);
Runtime->RegisterService(NKesus::MakeKesusProxyServiceId(), kesusServiceId, nodeIdx);
}

{
IActor* netClassifier = NNetClassifier::CreateNetClassifier();
TActorId netClassifierId = Runtime->Register(netClassifier, nodeIdx);
TActorId netClassifierId = Runtime->Register(netClassifier, nodeIdx, userPoolId);
Runtime->RegisterService(NNetClassifier::MakeNetClassifierID(), netClassifierId, nodeIdx);
}

{
IActor* actor = CreatePollerActor();
TActorId actorId = Runtime->Register(actor, nodeIdx);
TActorId actorId = Runtime->Register(actor, nodeIdx, Runtime->GetAppData(nodeIdx).SystemPoolId);
Runtime->RegisterService(MakePollerActorId(), actorId, nodeIdx);
}

Expand All @@ -1074,11 +1076,11 @@ namespace Tests {
}

IActor* actor = NKafka::CreateKafkaListener(MakePollerActorId(), settings, Settings->AppConfig->GetKafkaProxyConfig());
TActorId actorId = Runtime->Register(actor, nodeIdx);
TActorId actorId = Runtime->Register(actor, nodeIdx, userPoolId);
Runtime->RegisterService(TActorId{}, actorId, nodeIdx);

IActor* metricsActor = CreateKafkaMetricsActor(NKafka::TKafkaMetricsSettings{Runtime->GetAppData().Counters->GetSubgroup("counters", "kafka_proxy")});
TActorId metricsActorId = Runtime->Register(metricsActor, nodeIdx);
TActorId metricsActorId = Runtime->Register(metricsActor, nodeIdx, userPoolId);
Runtime->RegisterService(NKafka::MakeKafkaMetricsServiceID(), metricsActorId, nodeIdx);
}

Expand Down Expand Up @@ -1205,7 +1207,7 @@ namespace Tests {
IActor* viewer = CreateViewer(*Settings->KikimrRunConfig);
SetupPQVirtualHandlers(dynamic_cast<IViewer*>(viewer));
SetupDBVirtualHandlers(dynamic_cast<IViewer*>(viewer));
TActorId viewerId = Runtime->Register(viewer, nodeIdx);
TActorId viewerId = Runtime->Register(viewer, nodeIdx, Runtime->GetAppData(nodeIdx).BatchPoolId);
Runtime->RegisterService(MakeViewerID(nodeIdx), viewerId, nodeIdx);
}
}
Expand Down
2 changes: 2 additions & 0 deletions ydb/tests/tools/kqprun/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@ udfs
*.sql
*.bin
*.txt
*.svg
*.old
46 changes: 46 additions & 0 deletions ydb/tests/tools/kqprun/configuration/app_config.conf
Original file line number Diff line number Diff line change
@@ -1,3 +1,49 @@
ActorSystemConfig {
Executor {
Type: BASIC
Threads: 1
SpinThreshold: 10
Name: "System"
}
Executor {
Type: BASIC
Threads: 6
SpinThreshold: 1
Name: "User"
}
Executor {
Type: BASIC
Threads: 1
SpinThreshold: 1
Name: "Batch"
}
Executor {
Type: IO
Threads: 1
Name: "IO"
}
Executor {
Type: BASIC
Threads: 2
SpinThreshold: 10
Name: "IC"
TimePerMailboxMicroSecs: 100
}
Scheduler {
Resolution: 64
SpinThreshold: 0
ProgressThreshold: 10000
}
SysExecutor: 0
UserExecutor: 1
IoExecutor: 3
BatchExecutor: 2
ServiceExecutor {
ServiceName: "Interconnect"
ExecutorId: 4
}
}

ColumnShardConfig {
DisabledOnSchemeShard: false
}
Expand Down
11 changes: 11 additions & 0 deletions ydb/tests/tools/kqprun/flame_graph.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#!/usr/bin/env bash

# For svg graph download https://github.com/brendangregg/FlameGraph
# and run `FlameGraph/stackcollapse-perf.pl profdata.txt | FlameGraph/flamegraph.pl > profdata.svg`

pid=$(pgrep -u $USER kqprun)

echo "Target process id: ${pid}"

sudo perf record -F 50 --call-graph dwarf -g --proc-map-timeout=10000 --pid $pid -v -o profdata -- sleep 30
sudo perf script -i profdata > profdata.txt
Loading

0 comments on commit 89fe93e

Please sign in to comment.