Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix scheme_tests canondata #1492

Closed
wants to merge 19 commits into from
3 changes: 2 additions & 1 deletion ydb/core/base/appdata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ TAppData::TAppData(
, BootstrapConfigPtr(new NKikimrConfig::TBootstrap())
, AwsCompatibilityConfigPtr(new NKikimrConfig::TAwsCompatibilityConfig())
, S3ProxyResolverConfigPtr(new NKikimrConfig::TS3ProxyResolverConfig())
, BackgroundCleaningConfigPtr(new NKikimrConfig::TBackgroundCleaningConfig())
, StreamingConfig(*StreamingConfigPtr.get())
, PQConfig(*PQConfigPtr.get())
, PQClusterDiscoveryConfig(*PQClusterDiscoveryConfigPtr.get())
Expand All @@ -96,8 +97,8 @@ TAppData::TAppData(
, BootstrapConfig(*BootstrapConfigPtr.get())
, AwsCompatibilityConfig(*AwsCompatibilityConfigPtr.get())
, S3ProxyResolverConfig(*S3ProxyResolverConfigPtr.get())
, BackgroundCleaningConfig(*BackgroundCleaningConfigPtr.get())
, KikimrShouldContinue(kikimrShouldContinue)

{}

TIntrusivePtr<IRandomProvider> TAppData::RandomProvider = CreateDefaultRandomProvider();
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/base/appdata_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace NKikimrSharedCache {

namespace NKikimrProto {
class TKeyConfig;
class TAuthConfig;
class TAuthConfig;

namespace NFolderService {
class TFolderServiceConfig;
Expand Down Expand Up @@ -60,6 +60,7 @@ namespace NKikimrConfig {
class TBootstrap;
class TAwsCompatibilityConfig;
class TS3ProxyResolverConfig;
class TBackgroundCleaningConfig;
}

namespace NKikimrNetClassifier {
Expand Down Expand Up @@ -204,6 +205,7 @@ struct TAppData {
std::unique_ptr<NKikimrConfig::TAwsCompatibilityConfig> AwsCompatibilityConfigPtr;
std::unique_ptr<NKikimrConfig::TS3ProxyResolverConfig> S3ProxyResolverConfigPtr;
std::unique_ptr<NKikimrSharedCache::TSharedCacheConfig> SharedCacheConfigPtr;
std::unique_ptr<NKikimrConfig::TBackgroundCleaningConfig> BackgroundCleaningConfigPtr;

NKikimrStream::TStreamingConfig& StreamingConfig;
NKikimrPQ::TPQConfig& PQConfig;
Expand All @@ -226,6 +228,7 @@ struct TAppData {
NKikimrConfig::TBootstrap& BootstrapConfig;
NKikimrConfig::TAwsCompatibilityConfig& AwsCompatibilityConfig;
NKikimrConfig::TS3ProxyResolverConfig& S3ProxyResolverConfig;
NKikimrConfig::TBackgroundCleaningConfig& BackgroundCleaningConfig;
bool EnforceUserTokenRequirement = false;
bool AllowHugeKeyValueDeletes = true; // delete when all clients limit deletes per request
bool EnableKqpSpilling = false;
Expand Down
7 changes: 4 additions & 3 deletions ydb/core/cms/console/configs_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const THashSet<ui32> DYNAMIC_KINDS({
(ui32)NKikimrConsole::TConfigItem::TenantPoolConfigItem,
(ui32)NKikimrConsole::TConfigItem::TenantSlotBrokerConfigItem,
(ui32)NKikimrConsole::TConfigItem::AllowEditYamlInUiItem,
(ui32)NKikimrConsole::TConfigItem::BackgroundCleaningConfigItem
});

const THashSet<ui32> NON_YAML_KINDS({
Expand Down Expand Up @@ -161,7 +162,7 @@ class TConfigsDispatcher : public TActorBootstrapped<TConfigsDispatcher> {
TCheckKindsResult CheckKinds(const TVector<ui32>& kinds, const char* errorContext) const;

NKikimrConfig::TAppConfig ParseYamlProtoConfig();

void Handle(NMon::TEvHttpInfo::TPtr &ev);
void Handle(TEvInterconnect::TEvNodesInfo::TPtr &ev);
void Handle(TEvConsole::TEvConfigSubscriptionNotification::TPtr &ev);
Expand Down Expand Up @@ -783,7 +784,7 @@ void TConfigsDispatcher::Handle(TEvConsole::TEvConfigSubscriptionNotification::T
subscription->YamlVersion = std::nullopt;
}
}

if (CurrentStateFunc() == &TThis::StateInit) {
Become(&TThis::StateWork);
ProcessEnqueuedEvents();
Expand Down Expand Up @@ -995,7 +996,7 @@ void TConfigsDispatcher::Handle(TEvConsole::TEvGetNodeLabelsRequest::TPtr &ev) {

Send(ev->Sender, Response.Release());
}

IActor *CreateConfigsDispatcher(
const NKikimrConfig::TAppConfig &config,
const TMap<TString, TString> &labels,
Expand Down
1 change: 1 addition & 0 deletions ydb/core/driver_lib/run/run.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ class TDomainsInitializer : public IAppDataInitializer {
appData->EnableKqpSpilling = Config.GetTableServiceConfig().GetSpillingServiceConfig().GetLocalFileConfig().GetEnable();

appData->CompactionConfig = Config.GetCompactionConfig();
appData->BackgroundCleaningConfig = Config.GetBackgroundCleaningConfig();
}
};

Expand Down
6 changes: 4 additions & 2 deletions ydb/core/kqp/executer_actor/kqp_executer.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,12 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext);

IActor* CreateKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target,
IActor* CreateKqpSchemeExecuter(
TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target,
const TMaybe<TString>& requestType, const TString& database,
TIntrusiveConstPtr<NACLib::TUserToken> userToken,
bool temporary, TString SessionId, TIntrusivePtr<TUserRequestContext> ctx);
bool temporary, TString SessionId, TIntrusivePtr<TUserRequestContext> ctx,
const TActorId& kqpTempTablesAgentActor = TActorId());

std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ExecuteLiteral(
IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters, TActorId owner, const TIntrusivePtr<TUserRequestContext>& userRequestContext);
Expand Down
20 changes: 15 additions & 5 deletions ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
return NKikimrServices::TActivity::KQP_EXECUTER_ACTOR;
}

TKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target, const TMaybe<TString>& requestType,
TKqpSchemeExecuter(
TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target, const TMaybe<TString>& requestType,
const TString& database, TIntrusiveConstPtr<NACLib::TUserToken> userToken,
bool temporary, TString sessionId, TIntrusivePtr<TUserRequestContext> ctx)
bool temporary, TString sessionId, TIntrusivePtr<TUserRequestContext> ctx,
const TActorId& kqpTempTablesAgentActor)
: PhyTx(phyTx)
, QueryType(queryType)
, Target(target)
Expand All @@ -62,6 +64,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
, SessionId(sessionId)
, RequestContext(std::move(ctx))
, RequestType(requestType)
, KqpTempTablesAgentActor(kqpTempTablesAgentActor)
{
YQL_ENSURE(PhyTx);
YQL_ENSURE(PhyTx->GetType() == NKqpProto::TKqpPhyTx::TYPE_SCHEME);
Expand Down Expand Up @@ -107,6 +110,9 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
}
tableDesc->SetName(tableDesc->GetName() + SessionId);
tableDesc->SetPath(tableDesc->GetPath() + SessionId);
YQL_ENSURE(KqpTempTablesAgentActor != TActorId(),
"Create temp table with empty KqpTempTablesAgentActor");
ActorIdToProto(KqpTempTablesAgentActor, modifyScheme.MutableTempTableOwnerActorId());
}
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
Expand Down Expand Up @@ -576,16 +582,20 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
ui64 SchemeShardTabletId = 0;
TIntrusivePtr<TUserRequestContext> RequestContext;
const TMaybe<TString> RequestType;
const TActorId KqpTempTablesAgentActor;
};

} // namespace

IActor* CreateKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target,
IActor* CreateKqpSchemeExecuter(
TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target,
const TMaybe<TString>& requestType, const TString& database,
TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool temporary, TString sessionId,
TIntrusivePtr<TUserRequestContext> ctx)
TIntrusivePtr<TUserRequestContext> ctx, const TActorId& kqpTempTablesAgentActor)
{
return new TKqpSchemeExecuter(phyTx, queryType, target, requestType, database, userToken, temporary, sessionId, std::move(ctx));
return new TKqpSchemeExecuter(
phyTx, queryType, target, requestType, database, userToken,
temporary, sessionId, std::move(ctx), kqpTempTablesAgentActor);
}

} // namespace NKikimr::NKqp
40 changes: 37 additions & 3 deletions ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <ydb/core/kqp/executer_actor/kqp_executer.h>
#include <ydb/core/kqp/session_actor/kqp_worker_common.h>
#include <ydb/core/kqp/node_service/kqp_node_service.h>
#include <ydb/core/tx/schemeshard/schemeshard.h>
#include <ydb/library/yql/dq/actors/spilling/spilling_file.h>
#include <ydb/library/yql/dq/actors/spilling/spilling.h>
#include <ydb/core/actorlib_impl/long_timer.h>
Expand Down Expand Up @@ -109,6 +110,32 @@ TString EncodeSessionId(ui32 nodeId, const TString& id) {
return NOperationId::ProtoToString(opId);
}

class TKqpTempTablesAgentActor: public TActorBootstrapped<TKqpTempTablesAgentActor> {
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::KQP_PROXY_ACTOR;
}

explicit TKqpTempTablesAgentActor()
{}

void Bootstrap() {
Become(&TKqpTempTablesAgentActor::StateWork);
}

STATEFN(StateWork) {
switch (ev->GetTypeRewrite()) {
hFunc(NSchemeShard::TEvSchemeShard::TEvOwnerActorAck, HandleNoop)
sFunc(TEvents::TEvPoison, PassAway);
}
}

private:
template<typename T>
void HandleNoop(T&) {
}
};

class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
struct TEvPrivate {
enum EEv {
Expand Down Expand Up @@ -263,6 +290,8 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {

KqpRmServiceActor = MakeKqpRmServiceID(SelfId().NodeId());

KqpTempTablesAgentActor = Register(new TKqpTempTablesAgentActor());

Become(&TKqpProxyService::MainState);
StartCollectPeerProxyData();
PublishResourceUsage();
Expand Down Expand Up @@ -434,6 +463,8 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
void PassAway() override {
Send(CompileService, new TEvents::TEvPoisonPill());

Send(KqpTempTablesAgentActor, new TEvents::TEvPoisonPill());

if (TableServiceConfig.GetEnableAsyncComputationPatternCompilation()) {
Send(CompileComputationPatternService, new TEvents::TEvPoisonPill());
}
Expand Down Expand Up @@ -657,8 +688,8 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
if (cancelAfter) {
timerDuration = Min(timerDuration, cancelAfter);
}
KQP_PROXY_LOG_D("Ctx: " << *ev->Get()->GetUserRequestContext() << ". TEvQueryRequest, set timer for: " << timerDuration
<< " timeout: " << timeout << " cancelAfter: " << cancelAfter
KQP_PROXY_LOG_D("Ctx: " << *ev->Get()->GetUserRequestContext() << ". TEvQueryRequest, set timer for: " << timerDuration
<< " timeout: " << timeout << " cancelAfter: " << cancelAfter
<< ". " << "Send request to target, requestId: " << requestId << ", targetId: " << targetId);
auto status = timerDuration == cancelAfter ? NYql::NDqProto::StatusIds::CANCELLED : NYql::NDqProto::StatusIds::TIMEOUT;
StartQueryTimeout(requestId, timerDuration, status);
Expand Down Expand Up @@ -1419,7 +1450,9 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {

auto config = CreateConfig(KqpSettings, workerSettings);

IActor* sessionActor = CreateKqpSessionActor(SelfId(), sessionId, KqpSettings, workerSettings, FederatedQuerySetup, AsyncIoFactory, ModuleResolverState, Counters, QueryServiceConfig, MetadataProviderConfig);
IActor* sessionActor = CreateKqpSessionActor(SelfId(), sessionId, KqpSettings, workerSettings,
FederatedQuerySetup, AsyncIoFactory, ModuleResolverState, Counters,
QueryServiceConfig, MetadataProviderConfig, KqpTempTablesAgentActor);
auto workerId = TlsActivationContext->ExecutorThread.RegisterActor(sessionActor, TMailboxType::HTSwap, AppData()->UserPoolId);
TKqpSessionInfo* sessionInfo = LocalSessions->Create(
sessionId, workerId, database, dbCounters, supportsBalancing, GetSessionIdleDuration(), pgWire);
Expand Down Expand Up @@ -1651,6 +1684,7 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
EScriptExecutionsCreationStatus ScriptExecutionsCreationStatus = EScriptExecutionsCreationStatus::NotStarted;
std::deque<THolder<IEventHandle>> DelayedEventsQueue;
bool IsLookupByRmScheduled = false;
TActorId KqpTempTablesAgentActor;
};

} // namespace
Expand Down
16 changes: 12 additions & 4 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <ydb/core/kqp/provider/yql_kikimr_results.h>
#include <ydb/core/kqp/rm_service/kqp_snapshot_manager.h>
#include <ydb/core/ydb_convert/ydb_convert.h>
#include <ydb/core/tx/schemeshard/schemeshard.h>
#include <ydb/public/lib/operation_id/operation_id.h>

#include <ydb/core/util/ulid.h>
Expand Down Expand Up @@ -155,7 +156,8 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig)
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
const TActorId& kqpTempTablesAgentActor)
: Owner(owner)
, SessionId(sessionId)
, Counters(counters)
Expand All @@ -168,6 +170,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
, Transactions(*Config->_KqpMaxActiveTxPerSession.Get(), TDuration::Seconds(*Config->_KqpTxIdleTimeoutSec.Get()))
, QueryServiceConfig(queryServiceConfig)
, MetadataProviderConfig(metadataProviderConfig)
, KqpTempTablesAgentActor(kqpTempTablesAgentActor)
{
RequestCounters = MakeIntrusive<TKqpRequestCounters>();
RequestCounters->Counters = Counters;
Expand Down Expand Up @@ -1066,7 +1069,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
bool temporary = GetTemporaryTableInfo(tx).has_value();

auto executerActor = CreateKqpSchemeExecuter(tx, QueryState->GetType(), SelfId(), requestType, Settings.Database, userToken,
temporary, TempTablesState.SessionId, QueryState->UserRequestContext);
temporary, TempTablesState.SessionId, QueryState->UserRequestContext, KqpTempTablesAgentActor);

ExecuterId = RegisterWithSameMailbox(executerActor);
}
Expand Down Expand Up @@ -1168,6 +1171,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
if (!tx) {
return std::nullopt;
}

auto optPath = tx->GetSchemeOpTempTablePath();
if (!optPath) {
return std::nullopt;
Expand All @@ -1194,6 +1198,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
if (!tx) {
return;
}

auto optInfo = GetTemporaryTableInfo(tx);
if (optInfo) {
auto [isCreate, info] = *optInfo;
Expand Down Expand Up @@ -1881,6 +1886,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
LOG_D("Cleanup temp tables: " << TempTablesState.TempTables.size());
auto tempTablesManager = CreateKqpTempTablesManager(
std::move(TempTablesState), SelfId(), Settings.Database);

RegisterWithSameMailbox(tempTablesManager);
return;
} else {
Expand Down Expand Up @@ -2214,6 +2220,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {

NKikimrConfig::TQueryServiceConfig QueryServiceConfig;
NKikimrConfig::TMetadataProviderConfig MetadataProviderConfig;
TActorId KqpTempTablesAgentActor;
std::shared_ptr<std::atomic<bool>> CompilationCookie;
};

Expand All @@ -2225,11 +2232,12 @@ IActor* CreateKqpSessionActor(const TActorId& owner, const TString& sessionId,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig)
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
const TActorId& kqpTempTablesAgentActor)
{
return new TKqpSessionActor(owner, sessionId, kqpSettings, workerSettings, federatedQuerySetup,
std::move(asyncIoFactory), std::move(moduleResolverState), counters,
queryServiceConfig, metadataProviderConfig
queryServiceConfig, metadataProviderConfig, kqpTempTablesAgentActor
);
}

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/session_actor/kqp_session_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ IActor* CreateKqpSessionActor(const TActorId& owner, const TString& sessionId,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig
);
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
const TActorId& kqpTempTablesAgentActor);

IActor* CreateKqpTempTablesManager(
TKqpTempTablesState tempTablesState, const TActorId& target, const TString& database);
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/kqp/session_actor/kqp_temp_tables_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ class TKqpTempTablesManager : public TActorBootstrapped<TKqpTempTablesManager> {
return NKikimrServices::TActivity::KQP_SESSION_ACTOR;
}

TKqpTempTablesManager(TKqpTempTablesState tempTablesState, const TActorId& target, const TString& database)
TKqpTempTablesManager(TKqpTempTablesState tempTablesState, const TActorId& target,
const TString& database)
: TempTablesState(std::move(tempTablesState))
, Target(target)
, Database(database)
Expand Down Expand Up @@ -112,7 +113,8 @@ class TKqpTempTablesManager : public TActorBootstrapped<TKqpTempTablesManager> {

} // namespace

IActor* CreateKqpTempTablesManager(TKqpTempTablesState tempTablesState, const TActorId& target, const TString& database)
IActor* CreateKqpTempTablesManager(TKqpTempTablesState tempTablesState, const TActorId& target,
const TString& database)
{
return new TKqpTempTablesManager(tempTablesState, target, database);
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/session_actor/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ PEERDIR(
ydb/core/kqp/common
ydb/core/kqp/federated_query
ydb/public/lib/operation_id
ydb/core/tx/schemeshard
)

YQL_LAST_ABI_VERSION()
Expand Down
Loading
Loading