diff --git a/ydb/core/base/appdata.cpp b/ydb/core/base/appdata.cpp index 8fb299a65dc3..ebda1942c34d 100644 --- a/ydb/core/base/appdata.cpp +++ b/ydb/core/base/appdata.cpp @@ -75,6 +75,7 @@ TAppData::TAppData( , BootstrapConfigPtr(new NKikimrConfig::TBootstrap()) , AwsCompatibilityConfigPtr(new NKikimrConfig::TAwsCompatibilityConfig()) , S3ProxyResolverConfigPtr(new NKikimrConfig::TS3ProxyResolverConfig()) + , BackgroundCleaningConfigPtr(new NKikimrConfig::TBackgroundCleaningConfig()) , StreamingConfig(*StreamingConfigPtr.get()) , PQConfig(*PQConfigPtr.get()) , PQClusterDiscoveryConfig(*PQClusterDiscoveryConfigPtr.get()) @@ -96,8 +97,8 @@ TAppData::TAppData( , BootstrapConfig(*BootstrapConfigPtr.get()) , AwsCompatibilityConfig(*AwsCompatibilityConfigPtr.get()) , S3ProxyResolverConfig(*S3ProxyResolverConfigPtr.get()) + , BackgroundCleaningConfig(*BackgroundCleaningConfigPtr.get()) , KikimrShouldContinue(kikimrShouldContinue) - {} TIntrusivePtr TAppData::RandomProvider = CreateDefaultRandomProvider(); diff --git a/ydb/core/base/appdata_fwd.h b/ydb/core/base/appdata_fwd.h index 329183c5afce..c1f49337d067 100644 --- a/ydb/core/base/appdata_fwd.h +++ b/ydb/core/base/appdata_fwd.h @@ -31,7 +31,7 @@ namespace NKikimrSharedCache { namespace NKikimrProto { class TKeyConfig; - class TAuthConfig; + class TAuthConfig; namespace NFolderService { class TFolderServiceConfig; @@ -60,6 +60,7 @@ namespace NKikimrConfig { class TBootstrap; class TAwsCompatibilityConfig; class TS3ProxyResolverConfig; + class TBackgroundCleaningConfig; } namespace NKikimrNetClassifier { @@ -204,6 +205,7 @@ struct TAppData { std::unique_ptr AwsCompatibilityConfigPtr; std::unique_ptr S3ProxyResolverConfigPtr; std::unique_ptr SharedCacheConfigPtr; + std::unique_ptr BackgroundCleaningConfigPtr; NKikimrStream::TStreamingConfig& StreamingConfig; NKikimrPQ::TPQConfig& PQConfig; @@ -226,6 +228,7 @@ struct TAppData { NKikimrConfig::TBootstrap& BootstrapConfig; NKikimrConfig::TAwsCompatibilityConfig& 
AwsCompatibilityConfig; NKikimrConfig::TS3ProxyResolverConfig& S3ProxyResolverConfig; + NKikimrConfig::TBackgroundCleaningConfig& BackgroundCleaningConfig; bool EnforceUserTokenRequirement = false; bool AllowHugeKeyValueDeletes = true; // delete when all clients limit deletes per request bool EnableKqpSpilling = false; diff --git a/ydb/core/cms/console/configs_dispatcher.cpp b/ydb/core/cms/console/configs_dispatcher.cpp index 9879c65ba8d8..bd6b40d2edf8 100644 --- a/ydb/core/cms/console/configs_dispatcher.cpp +++ b/ydb/core/cms/console/configs_dispatcher.cpp @@ -57,6 +57,7 @@ const THashSet DYNAMIC_KINDS({ (ui32)NKikimrConsole::TConfigItem::TenantPoolConfigItem, (ui32)NKikimrConsole::TConfigItem::TenantSlotBrokerConfigItem, (ui32)NKikimrConsole::TConfigItem::AllowEditYamlInUiItem, + (ui32)NKikimrConsole::TConfigItem::BackgroundCleaningConfigItem }); const THashSet NON_YAML_KINDS({ @@ -161,7 +162,7 @@ class TConfigsDispatcher : public TActorBootstrapped { TCheckKindsResult CheckKinds(const TVector& kinds, const char* errorContext) const; NKikimrConfig::TAppConfig ParseYamlProtoConfig(); - + void Handle(NMon::TEvHttpInfo::TPtr &ev); void Handle(TEvInterconnect::TEvNodesInfo::TPtr &ev); void Handle(TEvConsole::TEvConfigSubscriptionNotification::TPtr &ev); @@ -783,7 +784,7 @@ void TConfigsDispatcher::Handle(TEvConsole::TEvConfigSubscriptionNotification::T subscription->YamlVersion = std::nullopt; } } - + if (CurrentStateFunc() == &TThis::StateInit) { Become(&TThis::StateWork); ProcessEnqueuedEvents(); @@ -995,7 +996,7 @@ void TConfigsDispatcher::Handle(TEvConsole::TEvGetNodeLabelsRequest::TPtr &ev) { Send(ev->Sender, Response.Release()); } - + IActor *CreateConfigsDispatcher( const NKikimrConfig::TAppConfig &config, const TMap &labels, diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index 6f087ff0046d..9691bc7f542f 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -228,6 +228,7 @@ class 
TDomainsInitializer : public IAppDataInitializer { appData->EnableKqpSpilling = Config.GetTableServiceConfig().GetSpillingServiceConfig().GetLocalFileConfig().GetEnable(); appData->CompactionConfig = Config.GetCompactionConfig(); + appData->BackgroundCleaningConfig = Config.GetBackgroundCleaningConfig(); } }; diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h index 6d4cef595b03..e74d2c06cce1 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer.h +++ b/ydb/core/kqp/executer_actor/kqp_executer.h @@ -93,10 +93,12 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr& userRequestContext); -IActor* CreateKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target, +IActor* CreateKqpSchemeExecuter( + TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target, const TMaybe& requestType, const TString& database, TIntrusiveConstPtr userToken, - bool temporary, TString SessionId, TIntrusivePtr ctx); + bool temporary, TString SessionId, TIntrusivePtr ctx, + const TActorId& kqpTempTablesAgentActor = TActorId()); std::unique_ptr ExecuteLiteral( IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters, TActorId owner, const TIntrusivePtr& userRequestContext); diff --git a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp index aa5028518bed..780bc7b85b0b 100644 --- a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp @@ -50,9 +50,11 @@ class TKqpSchemeExecuter : public TActorBootstrapped { return NKikimrServices::TActivity::KQP_EXECUTER_ACTOR; } - TKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, 
NKikimrKqp::EQueryType queryType, const TActorId& target, const TMaybe& requestType, + TKqpSchemeExecuter( + TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target, const TMaybe& requestType, const TString& database, TIntrusiveConstPtr userToken, - bool temporary, TString sessionId, TIntrusivePtr ctx) + bool temporary, TString sessionId, TIntrusivePtr ctx, + const TActorId& kqpTempTablesAgentActor) : PhyTx(phyTx) , QueryType(queryType) , Target(target) @@ -62,6 +64,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped { , SessionId(sessionId) , RequestContext(std::move(ctx)) , RequestType(requestType) + , KqpTempTablesAgentActor(kqpTempTablesAgentActor) { YQL_ENSURE(PhyTx); YQL_ENSURE(PhyTx->GetType() == NKqpProto::TKqpPhyTx::TYPE_SCHEME); @@ -107,6 +110,9 @@ class TKqpSchemeExecuter : public TActorBootstrapped { } tableDesc->SetName(tableDesc->GetName() + SessionId); tableDesc->SetPath(tableDesc->GetPath() + SessionId); + YQL_ENSURE(KqpTempTablesAgentActor != TActorId(), + "Create temp table with empty KqpTempTablesAgentActor"); + ActorIdToProto(KqpTempTablesAgentActor, modifyScheme.MutableTempTableOwnerActorId()); } ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme); break; @@ -576,16 +582,20 @@ class TKqpSchemeExecuter : public TActorBootstrapped { ui64 SchemeShardTabletId = 0; TIntrusivePtr RequestContext; const TMaybe RequestType; + const TActorId KqpTempTablesAgentActor; }; } // namespace -IActor* CreateKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target, +IActor* CreateKqpSchemeExecuter( + TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target, const TMaybe& requestType, const TString& database, TIntrusiveConstPtr userToken, bool temporary, TString sessionId, - TIntrusivePtr ctx) + TIntrusivePtr ctx, const TActorId& kqpTempTablesAgentActor) { - return new TKqpSchemeExecuter(phyTx, queryType, target, 
requestType, database, userToken, temporary, sessionId, std::move(ctx)); + return new TKqpSchemeExecuter( + phyTx, queryType, target, requestType, database, userToken, + temporary, sessionId, std::move(ctx), kqpTempTablesAgentActor); } } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 157c5a6dfc73..f19e2e74e82d 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -109,6 +110,32 @@ TString EncodeSessionId(ui32 nodeId, const TString& id) { return NOperationId::ProtoToString(opId); } +class TKqpTempTablesAgentActor: public TActorBootstrapped { +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::KQP_PROXY_ACTOR; + } + + explicit TKqpTempTablesAgentActor() + {} + + void Bootstrap() { + Become(&TKqpTempTablesAgentActor::StateWork); + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(NSchemeShard::TEvSchemeShard::TEvOwnerActorAck, HandleNoop) + sFunc(TEvents::TEvPoison, PassAway); + } + } + +private: + template + void HandleNoop(T&) { + } +}; + class TKqpProxyService : public TActorBootstrapped { struct TEvPrivate { enum EEv { @@ -263,6 +290,8 @@ class TKqpProxyService : public TActorBootstrapped { KqpRmServiceActor = MakeKqpRmServiceID(SelfId().NodeId()); + KqpTempTablesAgentActor = Register(new TKqpTempTablesAgentActor()); + Become(&TKqpProxyService::MainState); StartCollectPeerProxyData(); PublishResourceUsage(); @@ -434,6 +463,8 @@ class TKqpProxyService : public TActorBootstrapped { void PassAway() override { Send(CompileService, new TEvents::TEvPoisonPill()); + Send(KqpTempTablesAgentActor, new TEvents::TEvPoisonPill()); + if (TableServiceConfig.GetEnableAsyncComputationPatternCompilation()) { Send(CompileComputationPatternService, new 
TEvents::TEvPoisonPill()); } @@ -657,8 +688,8 @@ class TKqpProxyService : public TActorBootstrapped { if (cancelAfter) { timerDuration = Min(timerDuration, cancelAfter); } - KQP_PROXY_LOG_D("Ctx: " << *ev->Get()->GetUserRequestContext() << ". TEvQueryRequest, set timer for: " << timerDuration - << " timeout: " << timeout << " cancelAfter: " << cancelAfter + KQP_PROXY_LOG_D("Ctx: " << *ev->Get()->GetUserRequestContext() << ". TEvQueryRequest, set timer for: " << timerDuration + << " timeout: " << timeout << " cancelAfter: " << cancelAfter << ". " << "Send request to target, requestId: " << requestId << ", targetId: " << targetId); auto status = timerDuration == cancelAfter ? NYql::NDqProto::StatusIds::CANCELLED : NYql::NDqProto::StatusIds::TIMEOUT; StartQueryTimeout(requestId, timerDuration, status); @@ -1419,7 +1450,9 @@ class TKqpProxyService : public TActorBootstrapped { auto config = CreateConfig(KqpSettings, workerSettings); - IActor* sessionActor = CreateKqpSessionActor(SelfId(), sessionId, KqpSettings, workerSettings, FederatedQuerySetup, AsyncIoFactory, ModuleResolverState, Counters, QueryServiceConfig, MetadataProviderConfig); + IActor* sessionActor = CreateKqpSessionActor(SelfId(), sessionId, KqpSettings, workerSettings, + FederatedQuerySetup, AsyncIoFactory, ModuleResolverState, Counters, + QueryServiceConfig, MetadataProviderConfig, KqpTempTablesAgentActor); auto workerId = TlsActivationContext->ExecutorThread.RegisterActor(sessionActor, TMailboxType::HTSwap, AppData()->UserPoolId); TKqpSessionInfo* sessionInfo = LocalSessions->Create( sessionId, workerId, database, dbCounters, supportsBalancing, GetSessionIdleDuration(), pgWire); @@ -1651,6 +1684,7 @@ class TKqpProxyService : public TActorBootstrapped { EScriptExecutionsCreationStatus ScriptExecutionsCreationStatus = EScriptExecutionsCreationStatus::NotStarted; std::deque> DelayedEventsQueue; bool IsLookupByRmScheduled = false; + TActorId KqpTempTablesAgentActor; }; } // namespace diff --git 
a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 1ff33679fd32..f8bfb307c8b8 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -155,7 +156,8 @@ class TKqpSessionActor : public TActorBootstrapped { NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TIntrusivePtr moduleResolverState, TIntrusivePtr counters, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, - const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig) + const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, + const TActorId& kqpTempTablesAgentActor) : Owner(owner) , SessionId(sessionId) , Counters(counters) @@ -168,6 +170,7 @@ class TKqpSessionActor : public TActorBootstrapped { , Transactions(*Config->_KqpMaxActiveTxPerSession.Get(), TDuration::Seconds(*Config->_KqpTxIdleTimeoutSec.Get())) , QueryServiceConfig(queryServiceConfig) , MetadataProviderConfig(metadataProviderConfig) + , KqpTempTablesAgentActor(kqpTempTablesAgentActor) { RequestCounters = MakeIntrusive(); RequestCounters->Counters = Counters; @@ -1066,7 +1069,7 @@ class TKqpSessionActor : public TActorBootstrapped { bool temporary = GetTemporaryTableInfo(tx).has_value(); auto executerActor = CreateKqpSchemeExecuter(tx, QueryState->GetType(), SelfId(), requestType, Settings.Database, userToken, - temporary, TempTablesState.SessionId, QueryState->UserRequestContext); + temporary, TempTablesState.SessionId, QueryState->UserRequestContext, KqpTempTablesAgentActor); ExecuterId = RegisterWithSameMailbox(executerActor); } @@ -1168,6 +1171,7 @@ class TKqpSessionActor : public TActorBootstrapped { if (!tx) { return std::nullopt; } + auto optPath = tx->GetSchemeOpTempTablePath(); if (!optPath) { return std::nullopt; @@ -1194,6 +1198,7 @@ class TKqpSessionActor : public TActorBootstrapped { if (!tx) { return; } + 
auto optInfo = GetTemporaryTableInfo(tx); if (optInfo) { auto [isCreate, info] = *optInfo; @@ -1881,6 +1886,7 @@ class TKqpSessionActor : public TActorBootstrapped { LOG_D("Cleanup temp tables: " << TempTablesState.TempTables.size()); auto tempTablesManager = CreateKqpTempTablesManager( std::move(TempTablesState), SelfId(), Settings.Database); + RegisterWithSameMailbox(tempTablesManager); return; } else { @@ -2214,6 +2220,7 @@ class TKqpSessionActor : public TActorBootstrapped { NKikimrConfig::TQueryServiceConfig QueryServiceConfig; NKikimrConfig::TMetadataProviderConfig MetadataProviderConfig; + TActorId KqpTempTablesAgentActor; std::shared_ptr> CompilationCookie; }; @@ -2225,11 +2232,12 @@ IActor* CreateKqpSessionActor(const TActorId& owner, const TString& sessionId, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TIntrusivePtr moduleResolverState, TIntrusivePtr counters, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, - const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig) + const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, + const TActorId& kqpTempTablesAgentActor) { return new TKqpSessionActor(owner, sessionId, kqpSettings, workerSettings, federatedQuerySetup, std::move(asyncIoFactory), std::move(moduleResolverState), counters, - queryServiceConfig, metadataProviderConfig + queryServiceConfig, metadataProviderConfig, kqpTempTablesAgentActor ); } diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.h b/ydb/core/kqp/session_actor/kqp_session_actor.h index 68214d534856..763e43de343d 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.h +++ b/ydb/core/kqp/session_actor/kqp_session_actor.h @@ -38,8 +38,8 @@ IActor* CreateKqpSessionActor(const TActorId& owner, const TString& sessionId, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TIntrusivePtr moduleResolverState, TIntrusivePtr counters, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, - const NKikimrConfig::TMetadataProviderConfig& 
metadataProviderConfig - ); + const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, + const TActorId& kqpTempTablesAgentActor); IActor* CreateKqpTempTablesManager( TKqpTempTablesState tempTablesState, const TActorId& target, const TString& database); diff --git a/ydb/core/kqp/session_actor/kqp_temp_tables_manager.cpp b/ydb/core/kqp/session_actor/kqp_temp_tables_manager.cpp index d7d13ad49828..d279f703d0f2 100644 --- a/ydb/core/kqp/session_actor/kqp_temp_tables_manager.cpp +++ b/ydb/core/kqp/session_actor/kqp_temp_tables_manager.cpp @@ -39,7 +39,8 @@ class TKqpTempTablesManager : public TActorBootstrapped { return NKikimrServices::TActivity::KQP_SESSION_ACTOR; } - TKqpTempTablesManager(TKqpTempTablesState tempTablesState, const TActorId& target, const TString& database) + TKqpTempTablesManager(TKqpTempTablesState tempTablesState, const TActorId& target, + const TString& database) : TempTablesState(std::move(tempTablesState)) , Target(target) , Database(database) @@ -112,7 +113,8 @@ class TKqpTempTablesManager : public TActorBootstrapped { } // namespace -IActor* CreateKqpTempTablesManager(TKqpTempTablesState tempTablesState, const TActorId& target, const TString& database) +IActor* CreateKqpTempTablesManager(TKqpTempTablesState tempTablesState, const TActorId& target, + const TString& database) { return new TKqpTempTablesManager(tempTablesState, target, database); } diff --git a/ydb/core/kqp/session_actor/ya.make b/ydb/core/kqp/session_actor/ya.make index 1dcdd665be4d..d728d0c2517f 100644 --- a/ydb/core/kqp/session_actor/ya.make +++ b/ydb/core/kqp/session_actor/ya.make @@ -16,6 +16,7 @@ PEERDIR( ydb/core/kqp/common ydb/core/kqp/federated_query ydb/public/lib/operation_id + ydb/core/tx/schemeshard ) YQL_LAST_ABI_VERSION() diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 46aa0dacfc34..cf31b90b8c4a 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1580,6 +1580,28 @@ message TCompactionConfig { 
optional TBorrowedCompactionConfig BorrowedCompactionConfig = 2; } +message TBackgroundCleaningConfig { + optional double MaxRate = 1 [default = 0]; // unlimited + optional uint64 InflightLimit = 2 [default = 10]; // TODO: consider more? + + // After this interval we will try to restart + optional uint64 TimeoutSeconds = 3 [default = 15]; + + // Do not wake up earlier than this interval + optional uint64 MinWakeupIntervalMs = 4 [default = 10]; + + message TRetrySettings { + optional uint32 StartDelayMs = 1 [default = 1000]; + optional uint32 MaxDelayMs = 2 [default = 256000]; + optional uint32 MaxRetryNumber = 3 [default = 8]; + } + + // After MaxRetryNumber retries to connect to the node, the table is sent to the BackgroundCleaningQueue, + // that is, after 2^(MaxRetryNumber + 1) - 1 = 511 seconds with the defaults above + + optional TRetrySettings RetrySettings = 5; +} + message TTracingConfig { message TAuthConfig { message TTvm { @@ -1758,6 +1780,7 @@ message TAppConfig { optional TConveyorConfig InsertConveyorConfig = 74; optional bool AllowEditYamlInUi = 75; optional TS3ProxyResolverConfig S3ProxyResolverConfig = 76; + optional TBackgroundCleaningConfig BackgroundCleaningConfig = 77; repeated TNamedConfig NamedConfigs = 100; optional string ClusterYamlConfig = 101; diff --git a/ydb/core/protos/console_config.proto b/ydb/core/protos/console_config.proto index 4e03297c7a9f..b3bddf6a1c9c 100644 --- a/ydb/core/protos/console_config.proto +++ b/ydb/core/protos/console_config.proto @@ -136,6 +136,7 @@ message TConfigItem { InsertConveyorConfigItem = 74; AllowEditYamlInUiItem = 75; S3ProxyResolverConfigItem = 76; + BackgroundCleaningConfigItem = 77; NamedConfigsItem = 100; ClusterYamlConfigItem = 101; diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 83e8533ba54b..3707875239e9 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -14,6 +14,7 @@ import "ydb/core/protos/blob_depot_config.proto"; 
import "ydb/public/api/protos/ydb_coordination.proto"; import "ydb/public/api/protos/ydb_export.proto"; import "ydb/public/api/protos/ydb_value.proto"; +import "ydb/library/actors/protos/actors.proto"; import "ydb/library/mkql_proto/protos/minikql.proto"; import "ydb/core/protos/index_builder.proto"; @@ -387,6 +388,7 @@ message TTableDescription { repeated TSequenceDescription Sequences = 39; optional TTableReplicationConfig ReplicationConfig = 40; + optional bool Temporary = 41; } @@ -1553,6 +1555,8 @@ message TModifyScheme { optional bool FailedOnAlreadyExists = 63 [default = true]; optional TViewDescription CreateView = 64; + + optional NActorsProto.TActorId TempTableOwnerActorId = 65; } // "Script", used by client to parse text files with multiple DDL commands diff --git a/ydb/core/protos/flat_tx_scheme.proto b/ydb/core/protos/flat_tx_scheme.proto index 598bf412cfa7..6972e9f44cae 100644 --- a/ydb/core/protos/flat_tx_scheme.proto +++ b/ydb/core/protos/flat_tx_scheme.proto @@ -408,3 +408,6 @@ message TEvFindTabletSubDomainPathIdResult { optional uint64 SchemeShardId = 3; // OwnerId of the subdomain optional uint64 SubDomainPathId = 4; // LocalPathId of the subdomain } + +message TEvOwnerActorAck { +} diff --git a/ydb/core/tx/schemeshard/schemeshard.h b/ydb/core/tx/schemeshard/schemeshard.h index e5f8cfe07d42..d86d17ffe9ad 100644 --- a/ydb/core/tx/schemeshard/schemeshard.h +++ b/ydb/core/tx/schemeshard/schemeshard.h @@ -93,6 +93,8 @@ struct TEvSchemeShard { EvProcessingRequest, EvProcessingResponse, + EvOwnerActorAck, + EvEnd }; @@ -642,6 +644,10 @@ struct TEvSchemeShard { struct TEvLoginResult : TEventPB { TEvLoginResult() = default; }; + + struct TEvOwnerActorAck : TEventPB { + TEvOwnerActorAck() = default; + }; }; } diff --git a/ydb/core/tx/schemeshard/schemeshard__background_cleaning.cpp b/ydb/core/tx/schemeshard/schemeshard__background_cleaning.cpp new file mode 100644 index 000000000000..2aa13e88a452 --- /dev/null +++ 
b/ydb/core/tx/schemeshard/schemeshard__background_cleaning.cpp @@ -0,0 +1,299 @@ +#include "schemeshard_impl.h" + +#include + +namespace NKikimr::NSchemeShard { + +// Starts background removal of one temp table by proposing an internal DropTable transaction to this schemeshard. +// Returns EOperationRemove when the table can no longer be resolved or is not tracked under its owner. +// NOTE(review): RetryNodeSubscribe and CheckOwnerUndelivered erase the owner's TempTablesByOwner entry right after +// enqueueing; if the queue starts this operation later, the owner lookup below fails - confirm this is intended. +NOperationQueue::EStartStatus TSchemeShard::StartBackgroundCleaning(const TPathId& pathId) { + auto info = ResolveTempTableInfo(pathId); + if (!info) { + return NOperationQueue::EStartStatus::EOperationRemove; + } + + auto& tempTablesByOwner = TempTablesState.TempTablesByOwner; + + auto it = tempTablesByOwner.find(info->OwnerActorId); + if (it == tempTablesByOwner.end()) { + return NOperationQueue::EStartStatus::EOperationRemove; + } + + auto tempTableIt = it->second.find(pathId); + if (tempTableIt == it->second.end()) { + return NOperationQueue::EStartStatus::EOperationRemove; + } + + auto ctx = ActorContext(); + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "RunBackgroundCleaning " + "for temp table# " << JoinPath({info->WorkingDir, info->Name}) + << ", ownerId# " << info->OwnerActorId + << ", next wakeup# " << BackgroundCleaningQueue->GetWakeupDelta() + << ", rate# " << BackgroundCleaningQueue->GetRate() + << ", in queue# " << BackgroundCleaningQueue->Size() << " cleaning events" + << ", running# " << BackgroundCleaningQueue->RunningSize() << " cleaning events" + << " at schemeshard " << TabletID()); + + auto txId = GetCachedTxId(ctx); + + auto propose = MakeHolder(ui64(txId), TabletID()); + auto& record = propose->Record; + + auto& modifyScheme = *record.AddTransaction(); + modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpDropTable); + modifyScheme.SetWorkingDir(info->WorkingDir); + modifyScheme.SetInternal(true); + + auto& drop = *modifyScheme.MutableDrop(); + drop.SetName(info->Name); + + BackgroundCleaningTxs[txId] = pathId; + + Send(SelfId(), std::move(propose)); + + return NOperationQueue::EStartStatus::EOperationRunning; +} + +// Logs completion of the cleaning transaction txId and reports the corresponding path to the queue as done. +// txId must be present in BackgroundCleaningTxs (it is looked up with at()). +void TSchemeShard::HandleBackgroundCleaningCompletionResult(const TTxId& txId) { + const auto& pathId = BackgroundCleaningTxs.at(txId); + + auto ctx = 
ActorContext(); + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Get BackgroundCleaning CompletionResult " + "for txId# " << txId + << ", next wakeup# " << BackgroundCleaningQueue->GetWakeupDelta() + << ", in queue# " << BackgroundCleaningQueue->Size() << " cleaning events" + << ", running# " << BackgroundCleaningQueue->RunningSize() << " cleaning events" + << " at schemeshard " << TabletID()); + + BackgroundCleaningQueue->OnDone(pathId); +} + +// Logs that background cleaning of pathId timed out; only logs, the queue is not modified here. +void TSchemeShard::OnBackgroundCleaningTimeout(const TPathId& pathId) { + auto info = ResolveTempTableInfo(pathId); + + auto ctx = ActorContext(); + if (!info) { + // The table may already be gone when the timeout fires; never dereference an empty result. + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "BackgroundCleaning timeout " + "for unresolved temp table, pathId# " << pathId + << " at schemeshard " << TabletID()); + return; + } + + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "BackgroundCleaning timeout " + "for temp table# " << JoinPath({info->WorkingDir, info->Name}) + << ", ownerId# " << info->OwnerActorId + << ", next wakeup# " << BackgroundCleaningQueue->GetWakeupDelta() + << ", in queue# " << BackgroundCleaningQueue->Size() << " cleaning events" + << ", running# " << BackgroundCleaningQueue->RunningSize() << " cleaning events" + << " at schemeshard " << TabletID()); +} + +// Scheduled retry wakeup: clears the IsScheduled flag of a still-tracked node and retries the subscription. +void TSchemeShard::Handle(TEvPrivate::TEvRetryNodeSubscribe::TPtr& ev, const TActorContext&) { + auto& nodeStates = TempTablesState.NodeStates; + auto nodeId = ev->Get()->NodeId; + + auto it = nodeStates.find(nodeId); + if (it == nodeStates.end()) { + return; + } + + auto& nodeState = it->second; + auto& retryState = nodeState.RetryState; + retryState.IsScheduled = false; + RetryNodeSubscribe(nodeId); +} + +// A node disconnect triggers the retry logic for that node. +void TSchemeShard::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev, const TActorContext&) { + RetryNodeSubscribe(ev->Get()->NodeId); +} + +// Returns the current retry delay, initializing it from StartDelayMs while it is still zero. +const TDuration& GetCurrentDelay( + const NKikimrConfig::TBackgroundCleaningConfig::TRetrySettings& backgroundCleaningRetrySettings, + TTempTablesState::TRetryState& state) { + if (state.CurrentDelay == TDuration::Zero()) { + state.CurrentDelay = + TDuration::MilliSeconds(backgroundCleaningRetrySettings.GetStartDelayMs()); + } + return state.CurrentDelay; +} + 
+TDuration GetDelay( + const NKikimrConfig::TBackgroundCleaningConfig::TRetrySettings& backgroundCleaningRetrySettings, + TTempTablesState::TRetryState& state +) { + auto newDelay = state.CurrentDelay; + newDelay *= 2; // exponential backoff: double the previous delay + auto maxDelay = + TDuration::MilliSeconds(backgroundCleaningRetrySettings.GetMaxDelayMs()); + if (newDelay > maxDelay) { + newDelay = maxDelay; + } + newDelay *= AppData()->RandomProvider->Uniform(50, 200); // random percentage; applied after the MaxDelayMs clamp + newDelay /= 100; + state.CurrentDelay = newDelay; // the randomized value becomes the base for the next call + return state.CurrentDelay; +} + +void TSchemeShard::RetryNodeSubscribe(ui32 nodeId) { + auto& nodeStates = TempTablesState.NodeStates; + auto it = nodeStates.find(nodeId); + if (it == nodeStates.end()) { + return; + } + + auto& nodeState = it->second; + auto& retryState = nodeState.RetryState; + + if (retryState.IsScheduled) { + return; + } + + retryState.RetryNumber++; + + if (retryState.RetryNumber > BackgroundCleaningRetrySettings.GetMaxRetryNumber()) { // retries exhausted: enqueue every temp table of this node's owners and forget the node + for (const auto& ownerActorId: nodeState.Owners) { + auto& tempTablesByOwner = TempTablesState.TempTablesByOwner; + + auto itTempTables = tempTablesByOwner.find(ownerActorId); + if (itTempTables == tempTablesByOwner.end()) { + continue; + } + + auto& currentTempTables = itTempTables->second; + for (auto& pathId: currentTempTables) { + EnqueueBackgroundCleaning(pathId); + } + tempTablesByOwner.erase(itTempTables); + } + nodeStates.erase(it); + + Send(new IEventHandle(TActivationContext::InterconnectProxy(nodeId), SelfId(), + new TEvents::TEvUnsubscribe, 0)); + return; + } + + auto now = TlsActivationContext->Monotonic(); + if (now - retryState.LastRetryAt < GetCurrentDelay(BackgroundCleaningRetrySettings, retryState)) { // too early: schedule a delayed retry instead of sending now + auto at = retryState.LastRetryAt + GetDelay(BackgroundCleaningRetrySettings, retryState); + retryState.IsScheduled = true; + Schedule(at - now, new TEvPrivate::TEvRetryNodeSubscribe(nodeId)); + return; + } + + for (const auto& ownerActorId: nodeState.Owners) { + Send(new IEventHandle(ownerActorId, SelfId(), + new 
TEvSchemeShard::TEvOwnerActorAck(), + IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession)); + } + retryState.LastRetryAt = now; + return; +} + +bool TSchemeShard::CheckOwnerUndelivered(TEvents::TEvUndelivered::TPtr& ev) { + auto& tempTablesByOwner = TempTablesState.TempTablesByOwner; + + auto ownerActorId = ev->Sender; + auto it = tempTablesByOwner.find(ownerActorId); + if (it == tempTablesByOwner.end()) { + return false; + } + + auto& currentTempTables = it->second; + + for (auto& pathId: currentTempTables) { + EnqueueBackgroundCleaning(pathId); + } + tempTablesByOwner.erase(it); + + auto& nodeStates = TempTablesState.NodeStates; + auto itNodeStates = nodeStates.find(ownerActorId.NodeId()); + if (itNodeStates == nodeStates.end()) { + return true; + } + auto itOwner = itNodeStates->second.Owners.find(ownerActorId); + if (itOwner == itNodeStates->second.Owners.end()) { + return true; + } + itNodeStates->second.Owners.erase(itOwner); + if (itNodeStates->second.Owners.empty()) { + nodeStates.erase(itNodeStates); + Send(new IEventHandle(TActivationContext::InterconnectProxy(ownerActorId.NodeId()), SelfId(), + new TEvents::TEvUnsubscribe, 0)); + } + + return true; +} + +void TSchemeShard::EnqueueBackgroundCleaning(const TPathId& pathId) { + if (BackgroundCleaningQueue) { + BackgroundCleaningQueue->Enqueue(pathId); // pathId is a const reference, std::move could not move from it + } +} + +void TSchemeShard::RemoveBackgroundCleaning(const TPathId& pathId) { + if (BackgroundCleaningQueue) { + BackgroundCleaningQueue->Remove(pathId); // pathId is a const reference, std::move could not move from it + } +} + +void TSchemeShard::HandleBackgroundCleaningTransactionResult( + TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& result) { + const auto txId = TTxId(result->Get()->Record.GetTxId()); + + auto ctx = ActorContext(); + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Get BackgroundCleaning TransactionResult " + "for txId# " << txId + << ", next wakeup# " << BackgroundCleaningQueue->GetWakeupDelta() + << ", in queue# " << 
BackgroundCleaningQueue->Size() << " cleaning events" + << ", running# " << BackgroundCleaningQueue->RunningSize() << " cleaning events" + << " at schemeshard " << TabletID()); + + const auto& pathId = BackgroundCleaningTxs.at(txId); + + const NKikimrScheme::TEvModifySchemeTransactionResult &record = result->Get()->Record; + + switch (record.GetStatus()) { + case NKikimrScheme::EStatus::StatusPathDoesNotExist: + case NKikimrScheme::EStatus::StatusSuccess: { + BackgroundCleaningQueue->OnDone(pathId); + break; + } + case NKikimrScheme::EStatus::StatusAccepted: + Send(SelfId(), new TEvSchemeShard::TEvNotifyTxCompletion(record.GetTxId())); + break; + default: { // any other status: mark the operation done, then enqueue the table again + BackgroundCleaningQueue->OnDone(pathId); + EnqueueBackgroundCleaning(pathId); + break; + } + } +} + +void TSchemeShard::ClearTempTablesState() { + auto ctx = ActorContext(); + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "Clear TempTablesState with owners number: " + << TempTablesState.TempTablesByOwner.size()); + + if (BackgroundCleaningQueue) { + auto& nodeStates = TempTablesState.NodeStates; + for (const auto& [nodeId, nodeState] : nodeStates) { + Send(new IEventHandle(TActivationContext::InterconnectProxy(nodeId), SelfId(), + new TEvents::TEvUnsubscribe, 0)); + } + BackgroundCleaningQueue->Clear(); + } + TempTablesState.TempTablesByOwner.clear(); + TempTablesState.NodeStates.clear(); +} + +} // NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index 5592f5587819..df556221b43d 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -302,7 +302,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase { return true; } - typedef std::tuple TTableRec; + typedef std::tuple TTableRec; typedef TDeque TTableRows; template @@ -316,7 +316,9 @@ struct TSchemeShard::TTxInit : public TTransactionBase { rowSet.template GetValueOrDefault(0), rowSet.template 
GetValueOrDefault(), rowSet.template GetValueOrDefault(false), - rowSet.template GetValueOrDefault() + rowSet.template GetValueOrDefault(), + rowSet.template GetValueOrDefault(false), + rowSet.template GetValueOrDefault("") ); } @@ -1791,6 +1793,39 @@ struct TSchemeShard::TTxInit : public TTransactionBase { } tableInfo->IsBackup = std::get<8>(rec); + tableInfo->IsTemporary = std::get<10>(rec); + + auto ownerActorIdStr = std::get<11>(rec); + tableInfo->OwnerActorId.Parse(ownerActorIdStr.c_str(), ownerActorIdStr.size()); + + if (tableInfo->IsTemporary) { + Y_VERIFY_S(tableInfo->OwnerActorId, "Empty OwnerActorId for temp table"); + + TActorId ownerActorId = tableInfo->OwnerActorId; + + auto& tempTablesByOwner = Self->TempTablesState.TempTablesByOwner; + auto& nodeStates = Self->TempTablesState.NodeStates; + + auto it = tempTablesByOwner.find(ownerActorId); + auto nodeId = ownerActorId.NodeId(); + + auto itNodeStates = nodeStates.find(nodeId); + if (itNodeStates == nodeStates.end()) { + auto& nodeState = nodeStates[nodeId]; + nodeState.Owners.insert(ownerActorId); + nodeState.RetryState.CurrentDelay = + TDuration::MilliSeconds(Self->BackgroundCleaningRetrySettings.GetStartDelayMs()); + } else { + itNodeStates->second.Owners.insert(ownerActorId); + } + + if (it == tempTablesByOwner.end()) { + auto& currentTempTables = tempTablesByOwner[ownerActorId]; + currentTempTables.insert(pathId); + } else { + it->second.insert(pathId); + } + } Self->Tables[pathId] = tableInfo; Self->IncrementPathDbRefCount(pathId); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp index bb549e76de4a..265c11c0140c 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp @@ -537,7 +537,7 @@ class TCreateTable: public TSubOperation { TString errStr; - if ((schema.HasTemporary() && schema.GetTemporary()) && 
!AppData()->FeatureFlags.GetEnableTempTables()) { + if ((schema.HasTemporary() && schema.GetTemporary()) && !context.SS->EnableTempTables) { result->SetError(NKikimrScheme::StatusPreconditionFailed, TStringBuilder() << "It is not allowed to create temp table: " << schema.GetName()); return result; @@ -644,6 +644,11 @@ class TCreateTable: public TSubOperation { Y_ABORT_UNLESS(tableInfo->GetPartitions().back().EndOfRange.empty(), "End of last range must be +INF"); + if (schema.HasTemporary() && schema.GetTemporary()) { + tableInfo->IsTemporary = true; + tableInfo->OwnerActorId = ActorIdFromProto(Transaction.GetTempTableOwnerActorId()); + } + context.SS->Tables[newTable->PathId] = tableInfo; context.SS->TabletCounters->Simple()[COUNTER_TABLE_COUNT].Add(1); context.SS->IncrementPathDbRefCount(newTable->PathId, "new path created"); @@ -692,6 +697,17 @@ class TCreateTable: public TSubOperation { context.SS->ClearDescribePathCaches(dstPath.Base()); context.OnComplete.PublishToSchemeBoard(OperationId, dstPath.Base()->PathId); + if (schema.HasTemporary() && schema.GetTemporary()) { + const auto& ownerActorId = tableInfo->OwnerActorId; + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "Processing create temp table with Name: " << name + << ", WorkingDir: " << parentPathStr + << ", OwnerActorId: " << ownerActorId + << ", PathId: " << newTable->PathId); + context.OnComplete.UpdateTempTablesToCreateState( + ownerActorId, newTable->PathId); + } + Y_ABORT_UNLESS(shardsToCreate == txState.Shards.size()); dstPath.DomainInfo()->IncPathsInside(); dstPath.DomainInfo()->AddInternalShards(txState); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_table.cpp index 9a97bb9f5a0e..d823886a892d 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_table.cpp @@ -597,6 +597,15 @@ class TDropTable: public TSubOperation { 
context.OnComplete.Dependence(splitTx.GetTxId(), OperationId.GetTxId()); } + if (table->IsTemporary) { + const auto& ownerActorId = table->OwnerActorId; + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "Processing drop temp table with Name: " << name + << ", WorkingDir: " << parentPathStr + << ", OwnerActorId: " << ownerActorId); + context.OnComplete.UpdateTempTablesToDropState(ownerActorId, path.Base()->PathId); + } + context.OnComplete.ActivateTx(OperationId); SetState(NextState()); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp index 843fe9805593..b62f21cc66a3 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp @@ -53,6 +53,24 @@ void TSideEffects::UnbindMsgFromPipe(TOperationId opId, TTabletId dst, TPipeMess BindedMessageAcks.push_back(TBindMsgAck(opId, dst, cookie)); } +void TSideEffects::UpdateTempTablesToCreateState(const TActorId& ownerActorId, const TPathId& pathId) { + auto it = TempTablesToCreateState.find(ownerActorId); + if (it == TempTablesToCreateState.end()) { + TempTablesToCreateState[ownerActorId] = { pathId }; + } else { + it->second.push_back(pathId); + } +} + +void TSideEffects::UpdateTempTablesToDropState(const TActorId& ownerActorId, const TPathId& pathId) { + auto it = TempTablesToDropState.find(ownerActorId); + if (it == TempTablesToDropState.end()) { + TempTablesToDropState[ownerActorId] = { pathId }; + } else { + it->second.push_back(pathId); + } +} + void TSideEffects::RouteByTabletsFromOperation(TOperationId opId) { RelationsByTabletsFromOperation.push_back(opId); } @@ -193,6 +211,9 @@ void TSideEffects::ApplyOnComplete(TSchemeShard* ss, const TActorContext& ctx) { DoWaitPublication(ss, ctx); DoPublishToSchemeBoard(ss, ctx); + DoUpdateTempTablesToCreateState(ss, ctx); + DoUpdateTempTablesToDropState(ss, ctx); + DoSend(ss, ctx); 
DoBindMsg(ss, ctx); @@ -755,6 +776,86 @@ void TSideEffects::DoPersistDeleteShards(TSchemeShard *ss, NTabletFlatExecutor:: ss->PersistShardsToDelete(db, ToDeleteShards); } +void TSideEffects::DoUpdateTempTablesToCreateState(TSchemeShard* ss, const TActorContext &ctx) { + for (auto& [ownerActorId, tempTables]: TempTablesToCreateState) { + + auto& tempTablesByOwner = ss->TempTablesState.TempTablesByOwner; + auto& nodeStates = ss->TempTablesState.NodeStates; + + auto it = tempTablesByOwner.find(ownerActorId); + + auto nodeId = ownerActorId.NodeId(); + + auto itNodeStates = nodeStates.find(nodeId); + if (itNodeStates == nodeStates.end()) { + auto& nodeState = nodeStates[nodeId]; + nodeState.Owners.insert(ownerActorId); + nodeState.RetryState.CurrentDelay = + TDuration::MilliSeconds(ss->BackgroundCleaningRetrySettings.GetStartDelayMs()); + } else { + itNodeStates->second.Owners.insert(ownerActorId); + } + + if (it == tempTablesByOwner.end()) { + ctx.Send(new IEventHandle(ownerActorId, ss->SelfId(), + new TEvSchemeShard::TEvOwnerActorAck(), + IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession)); + + auto& currentTempTables = tempTablesByOwner[ownerActorId]; + + for (auto& pathId : tempTables) { + currentTempTables.insert(std::move(pathId)); + } + continue; + } + + for (auto& pathId : tempTables) { + it->second.insert(std::move(pathId)); + } + } +} + +void TSideEffects::DoUpdateTempTablesToDropState(TSchemeShard* ss, const TActorContext& ctx) { + for (auto& [ownerActorId, tempTables]: TempTablesToDropState) { + auto& tempTablesByOwner = ss->TempTablesState.TempTablesByOwner; + + auto it = tempTablesByOwner.find(ownerActorId); + if (it == tempTablesByOwner.end()) { + continue; + } + + for (auto& pathId : tempTables) { + auto tempTableIt = it->second.find(std::move(pathId)); + if (tempTableIt == it->second.end()) { + continue; + } + + it->second.erase(tempTableIt); + ss->RemoveBackgroundCleaning(pathId); + } + + if (it->second.empty()) { + 
tempTablesByOwner.erase(it); + + auto& nodeStates = ss->TempTablesState.NodeStates; + + auto nodeId = ownerActorId.NodeId(); + auto itStates = nodeStates.find(nodeId); + if (itStates != nodeStates.end()) { + auto itOwner = itStates->second.Owners.find(ownerActorId); + if (itOwner != itStates->second.Owners.end()) { + itStates->second.Owners.erase(itOwner); + } + if (itStates->second.Owners.empty()) { + nodeStates.erase(itStates); + ctx.Send(new IEventHandle(TActivationContext::InterconnectProxy(nodeId), ss->SelfId(), + new TEvents::TEvUnsubscribe, 0)); + } + } + } + } +} + void TSideEffects::ResumeLongOps(TSchemeShard *ss, const TActorContext &ctx) { ss->Resume(IndexToProgress, ctx); } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.h b/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.h index 3a1587a9d71f..aa7a68f28185 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.h @@ -1,6 +1,7 @@ #pragma once #include "schemeshard_identificators.h" +#include "schemeshard_types.h" #include #include @@ -63,6 +64,8 @@ class TSideEffects: public TSimpleRefCount { TVector PendingActivateShardCreated; TDeque WaitPublications; TDeque Barriers; + THashMap> TempTablesToCreateState; + THashMap> TempTablesToDropState; public: using TPtr = TIntrusivePtr; @@ -99,6 +102,9 @@ class TSideEffects: public TSimpleRefCount { void UnbindMsgFromPipe(TOperationId opId, TTabletId dst, TShardIdx shardIdx); void UnbindMsgFromPipe(TOperationId opId, TTabletId dst, TPipeMessageId cookie); + void UpdateTempTablesToCreateState(const TActorId& ownerActorId, const TPathId& pathId); + void UpdateTempTablesToDropState(const TActorId& ownerActorId, const TPathId& pathId); + void RouteByTabletsFromOperation(TOperationId opId); void RouteByTablet(TOperationId opId, TTabletId dst); void RouteByShardIdx(TOperationId opId, TShardIdx shardIdx); @@ -158,6 +164,9 @@ class TSideEffects: public 
TSimpleRefCount { void DoPersistDeleteShards(TSchemeShard* ss, NTabletFlatExecutor::TTransactionContext &txc, const TActorContext &ctx); + void DoUpdateTempTablesToCreateState(TSchemeShard* ss, const TActorContext &ctx); + void DoUpdateTempTablesToDropState(TSchemeShard* ss, const TActorContext &ctx); + void ResumeLongOps(TSchemeShard* ss, const TActorContext& ctx); void SetupRoutingLongOps(TSchemeShard* ss, const TActorContext& ctx); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index fa2662f423bb..2d396a1528a2 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -61,6 +61,16 @@ bool ResolvePoolNames( const TSchemeLimits TSchemeShard::DefaultLimits = {}; +void TSchemeShard::SubscribeToTempTableOwners() { + auto ctx = ActorContext(); + auto& tempTablesByOwner = TempTablesState.TempTablesByOwner; + for (const auto& [ownerActorId, tempTables] : tempTablesByOwner) { + ctx.Send(new IEventHandle(ownerActorId, SelfId(), + new TEvSchemeShard::TEvOwnerActorAck(), + IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession)); + } +} + void TSchemeShard::ActivateAfterInitialization(const TActorContext& ctx, TActivationOpts&& opts) { TPathId subDomainPathId = GetCurrentSubDomainPathId(); TSubDomainInfo::TPtr domainPtr = ResolveDomainInfo(subDomainPathId); @@ -127,11 +137,14 @@ void TSchemeShard::ActivateAfterInitialization(const TActorContext& ctx, TActiva ScheduleCleanDroppedSubDomains(); StartStopCompactionQueues(); + BackgroundCleaningQueue->Start(); ctx.Send(TxAllocatorClient, MakeHolder(InitiateCachedTxIdsCount)); InitializeStatistics(ctx); + SubscribeToTempTableOwners(); + Become(&TThis::StateWork); } @@ -384,6 +397,8 @@ void TSchemeShard::Clear() { UpdateBorrowedCompactionQueueMetrics(); } + ClearTempTablesState(); + ShardsWithBorrowed.clear(); ShardsWithLoaned.clear(); @@ -2487,7 +2502,8 @@ void 
TSchemeShard::PersistTableAltered(NIceDb::TNiceDb& db, const TPathId pathId NIceDb::TUpdate(TString()), NIceDb::TUpdate(ttlSettings), NIceDb::TUpdate(tableInfo->IsBackup), - NIceDb::TUpdate(replicationConfig)); + NIceDb::TUpdate(tableInfo->IsTemporary), + NIceDb::TUpdate(tableInfo->OwnerActorId.ToString())); } else { db.Table().Key(pathId.OwnerId, pathId.LocalPathId).Update( NIceDb::TUpdate(tableInfo->NextColumnId), @@ -2497,7 +2513,9 @@ void TSchemeShard::PersistTableAltered(NIceDb::TNiceDb& db, const TPathId pathId NIceDb::TUpdate(TString()), NIceDb::TUpdate(ttlSettings), NIceDb::TUpdate(tableInfo->IsBackup), - NIceDb::TUpdate(replicationConfig)); + NIceDb::TUpdate(replicationConfig), + NIceDb::TUpdate(tableInfo->IsTemporary), + NIceDb::TUpdate(tableInfo->OwnerActorId.ToString())); } for (auto col : tableInfo->Columns) { @@ -4131,6 +4149,7 @@ TSchemeShard::TSchemeShard(const TActorId &tablet, TTabletStorageInfo *info) , PipeTracker(*PipeClientCache) , CompactionStarter(this) , BorrowedCompactionStarter(this) + , BackgroundCleaningStarter(this) , ShardDeleter(info->TabletID) , TableStatsQueue(this, COUNTER_STATS_QUEUE_SIZE, @@ -4226,8 +4245,14 @@ void TSchemeShard::Die(const TActorContext &ctx) { if (CompactionQueue) CompactionQueue->Shutdown(ctx); - if (BorrowedCompactionQueue) + if (BorrowedCompactionQueue) { BorrowedCompactionQueue->Shutdown(ctx); + } + + ClearTempTablesState(); + if (BackgroundCleaningQueue) { + BackgroundCleaningQueue->Shutdown(ctx); + } return IActor::Die(ctx); } @@ -4272,11 +4297,14 @@ void TSchemeShard::OnActivateExecutor(const TActorContext &ctx) { EnableTablePgTypes = appData->FeatureFlags.GetEnableTablePgTypes(); EnableServerlessExclusiveDynamicNodes = appData->FeatureFlags.GetEnableServerlessExclusiveDynamicNodes(); EnableAddColumsWithDefaults = appData->FeatureFlags.GetEnableAddColumsWithDefaults(); + EnableTempTables = appData->FeatureFlags.GetEnableTempTables(); ConfigureCompactionQueues(appData->CompactionConfig, ctx); 
ConfigureStatsBatching(appData->SchemeShardConfig, ctx); ConfigureStatsOperations(appData->SchemeShardConfig, ctx); + ConfigureBackgroundCleaningQueue(appData->BackgroundCleaningConfig, ctx); + if (appData->ChannelProfiles) { ChannelProfiles = appData->ChannelProfiles; } @@ -4546,6 +4574,10 @@ void TSchemeShard::StateWork(STFUNC_SIG) { HFuncTraced(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); HFuncTraced(TEvPrivate::TEvSendBaseStatsToSA, Handle); + // for subscriptions on owners + HFuncTraced(TEvInterconnect::TEvNodeDisconnected, Handle); + HFuncTraced(TEvPrivate::TEvRetryNodeSubscribe, Handle); + default: if (!HandleDefaultEvents(ev, SelfId())) { ALOG_WARN(NKikimrServices::FLAT_TX_SCHEMESHARD, @@ -5458,7 +5490,7 @@ void TSchemeShard::Handle(TEvHive::TEvUpdateDomainReply::TPtr &ev, const TActorC << ", at schemeshard: " << TabletID()); return; } - + Execute(CreateTxOperationReply(TOperationId(txId, partId), ev), ctx); } @@ -6162,6 +6194,8 @@ void TSchemeShard::Handle(TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr return Execute(CreateTxProgressImport(ev), ctx); } else if (TxIdToIndexBuilds.contains(txId)) { return Execute(CreateTxReply(ev), ctx); + } else if (BackgroundCleaningTxs.contains(txId)) { + return HandleBackgroundCleaningTransactionResult(ev); } LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, @@ -6216,6 +6250,10 @@ void TSchemeShard::Handle(TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev, Execute(CreateTxReply(txId), ctx); executed = true; } + if (BackgroundCleaningTxs.contains(txId)) { + HandleBackgroundCleaningCompletionResult(txId); + executed = true; + } if (executed) { return; @@ -6386,6 +6424,27 @@ void TSchemeShard::ApplyPartitionConfigStoragePatch( } } +std::optional TSchemeShard::ResolveTempTableInfo(const TPathId& pathId) { + auto path = TPath::Init(pathId, this); + if (!path) { + return std::nullopt; + } + TTempTableInfo info; + info.Name = path.LeafName(); + info.WorkingDir = path.Parent().PathString(); + + 
TTableInfo::TPtr table = Tables.at(path.Base()->PathId); + if (!table) { + return std::nullopt; + } + if (!table->IsTemporary) { + return std::nullopt; + } + + info.OwnerActorId = table->OwnerActorId; + return info; +} + // Fills CreateTable transaction for datashard with the specified range void TSchemeShard::FillTableDescriptionForShardIdx( TPathId tableId, TShardIdx shardIdx, NKikimrSchemeOp::TTableDescription* tableDescr, @@ -6693,7 +6752,10 @@ void TSchemeShard::Handle(TEvPrivate::TEvConsoleConfigsTimeout::TPtr&, const TAc LoadTableProfiles(nullptr, ctx); } -void TSchemeShard::Handle(TEvents::TEvUndelivered::TPtr&, const TActorContext& ctx) { +void TSchemeShard::Handle(TEvents::TEvUndelivered::TPtr& ev, const TActorContext& ctx) { + if (CheckOwnerUndelivered(ev)) { + return; + } LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Cannot subscribe to console configs"); LoadTableProfiles(nullptr, ctx); } @@ -6708,6 +6770,11 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TAppConfig& appConfi ConfigureCompactionQueues(compactionConfig, ctx); } + if (appConfig.HasBackgroundCleaningConfig()) { + const auto& backgroundCleaningConfig = appConfig.GetBackgroundCleaningConfig(); + ConfigureBackgroundCleaningQueue(backgroundCleaningConfig, ctx); + } + if (appConfig.HasSchemeShardConfig()) { const auto& schemeShardConfig = appConfig.GetSchemeShardConfig(); ConfigureStatsBatching(schemeShardConfig, ctx); @@ -6727,6 +6794,9 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TAppConfig& appConfi if (IsSchemeShardConfigured()) { StartStopCompactionQueues(); + if (BackgroundCleaningQueue) { + BackgroundCleaningQueue->Start(); + } } } @@ -6750,6 +6820,7 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TFeatureFlags& featu EnableTablePgTypes = featureFlags.GetEnableTablePgTypes(); EnableServerlessExclusiveDynamicNodes = featureFlags.GetEnableServerlessExclusiveDynamicNodes(); EnableAddColumsWithDefaults = 
featureFlags.GetEnableAddColumsWithDefaults(); + EnableTempTables = featureFlags.GetEnableTempTables(); } void TSchemeShard::ConfigureStatsBatching(const NKikimrConfig::TSchemeShardConfig& config, const TActorContext& ctx) { @@ -6876,6 +6947,38 @@ void TSchemeShard::ConfigureBorrowedCompactionQueue( << ", InflightLimit# " << compactionConfig.InflightLimit); } +void TSchemeShard::ConfigureBackgroundCleaningQueue( + const NKikimrConfig::TBackgroundCleaningConfig& config, + const TActorContext &ctx) +{ + TBackgroundCleaningQueue::TConfig cleaningConfig; + + cleaningConfig.IsCircular = false; + cleaningConfig.Timeout = TDuration::Seconds(config.GetTimeoutSeconds()); + cleaningConfig.MinWakeupInterval = TDuration::MilliSeconds(config.GetMinWakeupIntervalMs()); + cleaningConfig.InflightLimit = config.GetInflightLimit(); + cleaningConfig.MaxRate = config.GetMaxRate(); + + if (config.HasRetrySettings()) { + BackgroundCleaningRetrySettings = config.GetRetrySettings(); + } + + if (BackgroundCleaningQueue) { + BackgroundCleaningQueue->UpdateConfig(cleaningConfig); + } else { + BackgroundCleaningQueue = new TBackgroundCleaningQueue( + cleaningConfig, + BackgroundCleaningStarter); + ctx.RegisterWithSameMailbox(BackgroundCleaningQueue); + } + + LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "BackgroundCleaningQueue configured: Timeout# " << cleaningConfig.Timeout + << ", Rate# " << BackgroundCleaningQueue->GetRate() + << ", WakeupInterval# " << cleaningConfig.WakeupInterval + << ", InflightLimit# " << cleaningConfig.InflightLimit); +} + void TSchemeShard::StartStopCompactionQueues() { // note, that we don't need to check current state of compaction queue if (IsServerlessDomain(TPath::Init(RootPathId(), this))) { diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index 05c3f4cb7abd..3cb64a12d0ee 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -134,6 +134,31 @@ 
class TSchemeShard TSchemeShard* Self; }; + using TBackgroundCleaningQueue = NOperationQueue::TOperationQueueWithTimer< + TPathId, + TFifoQueue, + TEvPrivate::EvRunBackgroundCleaning, + NKikimrServices::FLAT_TX_SCHEMESHARD, + NKikimrServices::TActivity::SCHEMESHARD_BACKGROUND_CLEANING>; + + class TBackgroundCleaningStarter : public TBackgroundCleaningQueue::IStarter { + public: + TBackgroundCleaningStarter(TSchemeShard* self) + : Self(self) + { } + + NOperationQueue::EStartStatus StartOperation(const TPathId& pathId) override { + return Self->StartBackgroundCleaning(pathId); + } + + void OnTimeout(const TPathId& pathId) override { + Self->OnBackgroundCleaningTimeout(pathId); + } + + private: + TSchemeShard* Self; + }; + public: static constexpr ui32 DefaultPQTabletPartitionsCount = 1; static constexpr ui32 MaxPQTabletPartitionsCount = 1000; @@ -219,6 +244,8 @@ class TSchemeShard THashMap ExternalDataSources; THashMap Views; + TTempTablesState TempTablesState; + TTablesStorage ColumnTables; // it is only because we need to manage undo of upgrade subdomain, finally remove it @@ -257,6 +284,11 @@ class TSchemeShard TBorrowedCompactionStarter BorrowedCompactionStarter; TBorrowedCompactionQueue* BorrowedCompactionQueue = nullptr; + TBackgroundCleaningStarter BackgroundCleaningStarter; + TBackgroundCleaningQueue* BackgroundCleaningQueue = nullptr; + THashMap BackgroundCleaningTxs; + NKikimrConfig::TBackgroundCleaningConfig::TRetrySettings BackgroundCleaningRetrySettings; + // shardIdx -> clientId THashMap RunningBorrowedCompactions; @@ -272,6 +304,7 @@ class TSchemeShard bool EnableTablePgTypes = false; bool EnableServerlessExclusiveDynamicNodes = false; bool EnableAddColumsWithDefaults = false; + bool EnableTempTables = false; TShardDeleter ShardDeleter; @@ -412,6 +445,10 @@ class TSchemeShard const NKikimrConfig::TCompactionConfig::TBorrowedCompactionConfig& config, const TActorContext &ctx); + void ConfigureBackgroundCleaningQueue( + const 
NKikimrConfig::TBackgroundCleaningConfig& config, + const TActorContext &ctx); + void StartStopCompactionQueues(); void WaitForTableProfiles(ui64 importId, ui32 itemIdx); @@ -784,6 +821,9 @@ class TSchemeShard TVector TablesToClean; TDeque BlockStoreVolumesToClean; }; + + void SubscribeToTempTableOwners(); + void ActivateAfterInitialization(const TActorContext& ctx, TActivationOpts&& opts); struct TTxInitPopulator; @@ -814,6 +854,10 @@ class TSchemeShard void EnqueueBorrowedCompaction(const TShardIdx& shardIdx); void RemoveBorrowedCompaction(const TShardIdx& shardIdx); + void EnqueueBackgroundCleaning(const TPathId& pathId); + void RemoveBackgroundCleaning(const TPathId& pathId); + std::optional ResolveTempTableInfo(const TPathId& pathId); + void UpdateShardMetrics(const TShardIdx& shardIdx, const TPartitionStats& newStats); void RemoveShardMetrics(const TShardIdx& shardIdx); @@ -828,6 +872,17 @@ class TSchemeShard void BorrowedCompactionHandleDisconnect(TTabletId tabletId, const TActorId& clientId); void UpdateBorrowedCompactionQueueMetrics(); + NOperationQueue::EStartStatus StartBackgroundCleaning(const TPathId& pathId); + void OnBackgroundCleaningTimeout(const TPathId& pathId); + void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev, const TActorContext& ctx); + bool CheckOwnerUndelivered(TEvents::TEvUndelivered::TPtr& ev); + void RetryNodeSubscribe(ui32 nodeId); + void Handle(TEvPrivate::TEvRetryNodeSubscribe::TPtr& ev, const TActorContext& ctx); + void HandleBackgroundCleaningTransactionResult( + TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& result); + void HandleBackgroundCleaningCompletionResult(const TTxId& txId); + void ClearTempTablesState(); + struct TTxCleanDroppedSubDomains; NTabletFlatExecutor::ITransaction* CreateTxCleanDroppedSubDomains(); @@ -1284,6 +1339,8 @@ class TSchemeShard void ConnectToSA(); void SendBaseStatsToSA(); + + public: void ChangeStreamShardsCount(i64 delta) override; void ChangeStreamShardsQuota(i64 delta) override; 
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 8ae3ac154926..265d620a8201 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -403,6 +403,8 @@ struct TTableInfo : public TSimpleRefCount { THashMap Columns; TVector KeyColumnIds; bool IsBackup = false; + bool IsTemporary = false; + TActorId OwnerActorId; TAlterTableInfo::TPtr AlterData; @@ -2194,7 +2196,7 @@ struct TSubDomainInfo: TSimpleRefCount { TPathId ResourcesDomainId; TTabletId SharedHive = InvalidTabletId; TMaybeServerlessComputeResourcesMode ServerlessComputeResourcesMode; - + NLoginProto::TSecurityState SecurityState; ui64 SecurityStateVersion = 0; diff --git a/ydb/core/tx/schemeshard/schemeshard_private.h b/ydb/core/tx/schemeshard/schemeshard_private.h index aed184ba4ba3..40129c234fe7 100644 --- a/ydb/core/tx/schemeshard/schemeshard_private.h +++ b/ydb/core/tx/schemeshard/schemeshard_private.h @@ -2,6 +2,7 @@ #include "defs.h" #include "schemeshard_identificators.h" +#include namespace NKikimr { namespace NSchemeShard { @@ -29,6 +30,8 @@ struct TEvPrivate { EvRunCdcStreamScan, EvPersistTopicStats, EvSendBaseStatsToSA, + EvRunBackgroundCleaning, + EvRetryNodeSubscribe, EvEnd }; @@ -183,6 +186,13 @@ struct TEvPrivate { struct TEvSendBaseStatsToSA: public TEventLocal { }; + struct TEvRetryNodeSubscribe : public TEventLocal { + ui32 NodeId; + + explicit TEvRetryNodeSubscribe(ui32 nodeId) + : NodeId(nodeId) + { } + }; }; // TEvPrivate } // NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h index 3651c4e37d7f..929f8a4e07ae 100644 --- a/ydb/core/tx/schemeshard/schemeshard_schema.h +++ b/ydb/core/tx/schemeshard/schemeshard_schema.h @@ -118,6 +118,8 @@ struct Schema : NIceDb::Schema { struct TTLSettings : Column<8, NScheme::NTypeIds::String> {}; struct IsBackup : Column<9, NScheme::NTypeIds::Bool> {}; struct 
ReplicationConfig : Column<10, NScheme::NTypeIds::String> {}; + struct IsTemporary : Column<11, NScheme::NTypeIds::Bool> {}; + struct OwnerActorId : Column<12, NScheme::NTypeIds::String> {}; using TKey = TableKey; using TColumns = TableColumns< @@ -130,7 +132,9 @@ struct Schema : NIceDb::Schema { PartitioningVersion, TTLSettings, IsBackup, - ReplicationConfig + ReplicationConfig, + IsTemporary, + OwnerActorId >; }; @@ -147,6 +151,8 @@ struct Schema : NIceDb::Schema { struct TTLSettings : Column<9, NScheme::NTypeIds::String> {}; struct IsBackup : Column<10, NScheme::NTypeIds::Bool> {}; struct ReplicationConfig : Column<11, NScheme::NTypeIds::String> {}; + struct IsTemporary : Column<12, NScheme::NTypeIds::Bool> {}; + struct OwnerActorId : Column<13, NScheme::NTypeIds::String> {}; using TKey = TableKey; using TColumns = TableColumns< @@ -160,7 +166,9 @@ struct Schema : NIceDb::Schema { PartitioningVersion, TTLSettings, IsBackup, - ReplicationConfig + ReplicationConfig, + IsTemporary, + OwnerActorId >; }; diff --git a/ydb/core/tx/schemeshard/schemeshard_types.h b/ydb/core/tx/schemeshard/schemeshard_types.h index e59d0482791a..6d1f2389fe09 100644 --- a/ydb/core/tx/schemeshard/schemeshard_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_types.h @@ -138,4 +138,28 @@ enum class EAttachChildResult : ui32 { using EServerlessComputeResourcesMode = NKikimrSubDomains::EServerlessComputeResourcesMode; +struct TTempTablesState { + + struct TRetryState { + bool IsScheduled = false; + NMonotonic::TMonotonic LastRetryAt = TMonotonic::Zero(); + TDuration CurrentDelay = TDuration::Zero(); + ui32 RetryNumber = 0; + }; + + struct TNodeState { + THashSet Owners; + TRetryState RetryState; + }; + + THashMap> TempTablesByOwner; // OwnerActorId -> [ TPathId ] + THashMap NodeStates; // NodeId -> TNodeState +}; + +struct TTempTableInfo { + TString WorkingDir; + TString Name; + TActorId OwnerActorId; +}; + } diff --git 
a/ydb/core/tx/schemeshard/ut_background_cleaning/ut_background_cleaning.cpp b/ydb/core/tx/schemeshard/ut_background_cleaning/ut_background_cleaning.cpp new file mode 100644 index 000000000000..4f2dbb773d20 --- /dev/null +++ b/ydb/core/tx/schemeshard/ut_background_cleaning/ut_background_cleaning.cpp @@ -0,0 +1,324 @@ +#include +#include +#include +#include +#include +#include + +#include +#include + +using namespace NKikimr; +using namespace NSchemeShardUT_Private; + +namespace { + +THolder GetTestBackgroundCleaningConfig(bool withRetries = false) { + auto request = MakeHolder(); + + auto* backgroundCleaningConfig = request->Record.MutableConfig()->MutableBackgroundCleaningConfig(); + backgroundCleaningConfig->SetMaxRate(10); + backgroundCleaningConfig->SetMinWakeupIntervalMs(500); + backgroundCleaningConfig->SetInflightLimit(10); + + auto* retrySettings = backgroundCleaningConfig->MutableRetrySettings(); + + if (!withRetries) { + retrySettings->SetMaxRetryNumber(0); + } + + return request; +} + +void SetFeatures( + TTestActorRuntime &runtime, + TTestEnv&, + ui64 schemeShard, + const NKikimrConfig::TFeatureFlags& features, + bool withRetries = false) +{ + auto request = GetTestBackgroundCleaningConfig(withRetries); + *request->Record.MutableConfig()->MutableFeatureFlags() = features; + SetConfig(runtime, schemeShard, std::move(request)); +} + +void SetBackgroundCleaning(TTestActorRuntime &runtime, TTestEnv& env, ui64 schemeShard, bool withRetries = false) { + NKikimrConfig::TFeatureFlags features; + features.SetEnableTempTables(true); + SetFeatures(runtime, env, schemeShard, features, withRetries); +} + +void AsyncCreateTempTable(TTestActorRuntime& runtime, ui64 schemeShardId, ui64 txId, const TString& workingDir, const TString& scheme, const TActorId& ownerActorId, ui32 nodeIdx) { + auto ev = CreateTableRequest(txId, workingDir, scheme); + auto* tx = ev->Record.MutableTransaction(0); + auto* desc = tx->MutableCreateTable(); + desc->SetTemporary(true); + 
ActorIdToProto(ownerActorId, tx->MutableTempTableOwnerActorId()); + + AsyncSend(runtime, schemeShardId, ev, nodeIdx, ownerActorId); +} + +void TestCreateTempTable(TTestActorRuntime& runtime, ui64 txId, const TString& workingDir, const TString& scheme, const TActorId& ownerActorId, const TVector& expectedResults, ui32 ownerNodeIdx) { + AsyncCreateTempTable(runtime, TTestTxConfig::SchemeShard, txId, workingDir, scheme, ownerActorId, ownerNodeIdx); + TestModificationResults(runtime, txId, expectedResults); +} + +void TestDropTempTable(TTestActorRuntime& runtime, ui64 txId, const TString& workingDir, + const TString& scheme, bool checkUnsubscribe) { + TestDropTable(runtime, ++txId, workingDir, scheme); + if (checkUnsubscribe) { + TDispatchOptions options; + options.FinalEvents.emplace_back(NActors::TEvents::TSystem::Unsubscribe); + runtime.DispatchEvents(options); + } +} + +THashSet GetTables( + TTestActorRuntime &runtime, + ui64 tabletId) +{ + auto sender = runtime.AllocateEdgeActor(); + auto request = MakeHolder(); + runtime.SendToPipe(tabletId, sender, request.Release(), 0, GetPipeConfigWithRetries()); + + THashSet result; + + TAutoPtr handle; + auto response = runtime.GrabEdgeEventRethrow(handle); + for (auto& table: response->Record.GetUserTables()) { + result.insert(table.GetName()); + } + + return result; +} + +void CheckTable( + TTestActorRuntime &runtime, + const char* fullPath, + ui64 schemeshardId = TTestTxConfig::SchemeShard, + bool checkExists = true) +{ + TVector shards; + auto description = DescribePrivatePath(runtime, schemeshardId, fullPath, true, true); + if (!checkExists) { + UNIT_ASSERT(description.GetStatus() == NKikimrScheme::EStatus::StatusPathDoesNotExist); + return; + } + for (auto &part : description.GetPathDescription().GetTablePartitions()) + shards.push_back(part.GetDatashardId()); + + auto tables = GetTables(runtime, shards.at(0)); + auto userTableName = TStringBuf(fullPath).RNextTok('/'); + + UNIT_ASSERT(tables.contains(userTableName)); 
+} + +} // namespace + +Y_UNIT_TEST_SUITE(TSchemeshardBackgroundCleaningTest) { + Y_UNIT_TEST(SchemeshardBackgroundCleaningTestSimpleCreateClean) { + TTestBasicRuntime runtime(3); + TTestEnv env(runtime); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); + + SetBackgroundCleaning(runtime, env, TTestTxConfig::SchemeShard); + env.SimulateSleep(runtime, TDuration::Seconds(30)); + + auto ownerActorId = runtime.AllocateEdgeActor(1); + + ui64 txId = 100; + TestCreateTempTable(runtime, txId, "/MyRoot", R"( + Name: "TempTable" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )", ownerActorId, { NKikimrScheme::StatusAccepted }, 1); + + env.TestWaitNotification(runtime, txId); + + CheckTable(runtime, "/MyRoot/TempTable"); + + const TActorId proxy = runtime.GetInterconnectProxy(1, 0); + runtime.Send(new IEventHandle(proxy, TActorId(), new TEvInterconnect::TEvDisconnect(), 0, 0), 1, true); + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvInterconnect::EvNodeDisconnected); + runtime.DispatchEvents(options); + + env.SimulateSleep(runtime, TDuration::Seconds(50)); + CheckTable(runtime, "/MyRoot/TempTable", TTestTxConfig::SchemeShard, false); + } + + Y_UNIT_TEST(SchemeshardBackgroundCleaningTestCreateCleanWithRetry) { + TTestBasicRuntime runtime(3); + TTestEnv env(runtime); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); + + SetBackgroundCleaning(runtime, env, TTestTxConfig::SchemeShard, true); + env.SimulateSleep(runtime, TDuration::Seconds(30)); + + auto ownerActorId = runtime.AllocateEdgeActor(1); + + ui64 txId = 100; + 
TestCreateTempTable(runtime, txId, "/MyRoot", R"( + Name: "TempTable" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )", ownerActorId, { NKikimrScheme::StatusAccepted }, 1); + + env.TestWaitNotification(runtime, txId); + + CheckTable(runtime, "/MyRoot/TempTable"); + + const TActorId proxy = runtime.GetInterconnectProxy(1, 0); + + runtime.Send(new IEventHandle(proxy, TActorId(), new TEvInterconnect::TEvDisconnect(), 0, 0), 1, true); + + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvInterconnect::EvNodeDisconnected); + runtime.DispatchEvents(options); + + env.SimulateSleep(runtime, TDuration::Seconds(50)); + + CheckTable(runtime, "/MyRoot/TempTable", TTestTxConfig::SchemeShard, true); + } + + Y_UNIT_TEST(SchemeshardBackgroundCleaningTestCreateCleanManyTables) { + TTestBasicRuntime runtime(3); + TTestEnv env(runtime); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); + + SetBackgroundCleaning(runtime, env, TTestTxConfig::SchemeShard); + env.SimulateSleep(runtime, TDuration::Seconds(30)); + + auto ownerActorId = runtime.AllocateEdgeActor(1); + ui64 txId = 100; + TestCreateTempTable(runtime, txId, "/MyRoot", R"( + Name: "TempTable1" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )", ownerActorId, { NKikimrScheme::StatusAccepted }, 1); + env.TestWaitNotification(runtime, txId); + + ++txId; + TestCreateTempTable(runtime, txId, "/MyRoot", R"( + Name: "TempTable2" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )", ownerActorId, { NKikimrScheme::StatusAccepted }, 1); + env.TestWaitNotification(runtime, txId); + + CheckTable(runtime, "/MyRoot/TempTable1"); + CheckTable(runtime, 
"/MyRoot/TempTable2"); + + const TActorId proxy = runtime.GetInterconnectProxy(1, 0); + runtime.Send(new IEventHandle(proxy, TActorId(), new TEvInterconnect::TEvDisconnect(), 0, 0), 1, true); + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvInterconnect::EvNodeDisconnected); + runtime.DispatchEvents(options); + + { + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvInterconnect::EvNodeConnected); + runtime.DispatchEvents(options); + } + + CheckTable(runtime, "/MyRoot/TempTable1", TTestTxConfig::SchemeShard, false); + CheckTable(runtime, "/MyRoot/TempTable2", TTestTxConfig::SchemeShard, false); + } + + Y_UNIT_TEST(SchemeshardBackgroundCleaningTestReboot) { + TTestBasicRuntime runtime(3); + TTestEnv env(runtime); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); + + SetBackgroundCleaning(runtime, env, TTestTxConfig::SchemeShard); + env.SimulateSleep(runtime, TDuration::Seconds(30)); + + auto ownerActorId1 = runtime.AllocateEdgeActor(1); + ui64 txId1 = 100; + TestCreateTempTable(runtime, txId1, "/MyRoot", R"( + Name: "TempTable1" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )", ownerActorId1, { NKikimrScheme::StatusAccepted }, 1); + env.TestWaitNotification(runtime, txId1); + + auto ownerActorId2 = runtime.AllocateEdgeActor(2); + ui64 txId2 = ++txId1; + TestCreateTempTable(runtime, txId2, "/MyRoot", R"( + Name: "TempTable2" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )", ownerActorId2, { NKikimrScheme::StatusAccepted }, 2); + env.TestWaitNotification(runtime, txId2); + + CheckTable(runtime, "/MyRoot/TempTable1"); + CheckTable(runtime, "/MyRoot/TempTable2"); + + TActorId sender = runtime.AllocateEdgeActor(); + 
RebootTablet(runtime, TTestTxConfig::SchemeShard, sender); + + env.SimulateSleep(runtime, TDuration::Seconds(30)); + SetBackgroundCleaning(runtime, env, TTestTxConfig::SchemeShard); + env.SimulateSleep(runtime, TDuration::Seconds(30)); + + const TActorId proxy = runtime.GetInterconnectProxy(1, 0); + runtime.Send(new IEventHandle(proxy, TActorId(), new TEvInterconnect::TEvDisconnect(), 0, 0), 1, true); + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvInterconnect::EvNodeDisconnected); + runtime.DispatchEvents(options); + + env.SimulateSleep(runtime, TDuration::Seconds(50)); + CheckTable(runtime, "/MyRoot/TempTable1", TTestTxConfig::SchemeShard, false); + CheckTable(runtime, "/MyRoot/TempTable2", TTestTxConfig::SchemeShard); + } + + Y_UNIT_TEST(SchemeshardBackgroundCleaningTestSimpleDrop) { + TTestBasicRuntime runtime(3); + TTestEnv env(runtime); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); + + SetBackgroundCleaning(runtime, env, TTestTxConfig::SchemeShard); + env.SimulateSleep(runtime, TDuration::Seconds(30)); + + auto ownerActorId = runtime.AllocateEdgeActor(1); + + ui64 txId = 100; + TestCreateTempTable(runtime, txId, "/MyRoot", R"( + Name: "TempTable" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )", ownerActorId, { NKikimrScheme::StatusAccepted }, 1); + + env.TestWaitNotification(runtime, txId); + + CheckTable(runtime, "/MyRoot/TempTable"); + + ++txId; + TestDropTempTable(runtime, txId, "/MyRoot", "TempTable", true); + + env.SimulateSleep(runtime, TDuration::Seconds(50)); + CheckTable(runtime, "/MyRoot/TempTable", TTestTxConfig::SchemeShard, false); + } +}; diff --git a/ydb/core/tx/schemeshard/ut_background_cleaning/ya.make b/ydb/core/tx/schemeshard/ut_background_cleaning/ya.make new file mode 
100644 index 000000000000..efa9259979cd --- /dev/null +++ b/ydb/core/tx/schemeshard/ut_background_cleaning/ya.make @@ -0,0 +1,32 @@ +UNITTEST_FOR(ydb/core/tx/schemeshard) + +FORK_SUBTESTS() + +SPLIT_FACTOR(3) + +IF (SANITIZER_TYPE == "thread" OR WITH_VALGRIND) + TIMEOUT(3600) + SIZE(LARGE) + TAG(ya:fat) +ELSE() + TIMEOUT(600) + SIZE(MEDIUM) +ENDIF() + +PEERDIR( + library/cpp/getopt + library/cpp/regex/pcre + ydb/core/testlib/default + ydb/core/tx + ydb/core/tx/schemeshard/ut_helpers + ydb/core/wrappers/ut_helpers + ydb/core/base +) + +SRCS( + ut_background_cleaning.cpp +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp index aa302b2715a8..db27c43c06fc 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp @@ -1706,7 +1706,7 @@ namespace NSchemeShardUT_Private { } void TestBuildColumn(TTestActorRuntime& runtime, ui64 id, ui64 schemeShard, const TString &dbName, - const TString &src, const TString& columnName, const Ydb::TypedValue& literal, Ydb::StatusIds::StatusCode expectedStatus) + const TString &src, const TString& columnName, const Ydb::TypedValue& literal, Ydb::StatusIds::StatusCode expectedStatus) { AsyncBuildColumn(runtime, id, schemeShard, dbName, src, columnName, literal); @@ -2181,8 +2181,13 @@ namespace NSchemeShardUT_Private { return combination; } - void AsyncSend(TTestActorRuntime &runtime, ui64 targetTabletId, IEventBase *ev) { - ForwardToTablet(runtime, targetTabletId, runtime.AllocateEdgeActor(), ev); + void AsyncSend(TTestActorRuntime &runtime, ui64 targetTabletId, IEventBase *ev, + ui32 nodeIndex, TActorId sender) { + if (sender == TActorId()) { + ForwardToTablet(runtime, targetTabletId, runtime.AllocateEdgeActor(nodeIndex), ev); + } else { + ForwardToTablet(runtime, targetTabletId, sender, ev, nodeIndex); + } } TTestActorRuntimeBase::TEventObserver SetSuppressObserver(TTestActorRuntime 
&runtime, TVector > &suppressed, ui32 type) { diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.h b/ydb/core/tx/schemeshard/ut_helpers/helpers.h index 6dd806cfd179..ddd1153e6c3f 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/helpers.h +++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.h @@ -117,7 +117,8 @@ namespace NSchemeShardUT_Private { void SkipModificationReply(TTestActorRuntime& runtime, ui32 num = 1); TEvTx* CombineSchemeTransactions(const TVector& transactions); - void AsyncSend(TTestActorRuntime &runtime, ui64 targetTabletId, IEventBase *ev); + void AsyncSend(TTestActorRuntime &runtime, ui64 targetTabletId, + IEventBase *ev, ui32 nodeIndex = 0, TActorId sender = TActorId()); ////////// generic diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make index d1e8f5599223..10cb0c14cae5 100644 --- a/ydb/core/tx/schemeshard/ya.make +++ b/ydb/core/tx/schemeshard/ya.make @@ -1,5 +1,6 @@ RECURSE_FOR_TESTS( ut_auditsettings + ut_background_cleaning ut_backup ut_base ut_base_reboots @@ -55,6 +56,7 @@ LIBRARY() SRCS( defs.h schemeshard.cpp + schemeshard__background_cleaning.cpp schemeshard__borrowed_compaction.cpp schemeshard__compaction.cpp schemeshard__clean_pathes.cpp diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto index 49d3bcf55c95..fe0f4b71f6ae 100644 --- a/ydb/library/services/services.proto +++ b/ydb/library/services/services.proto @@ -1016,5 +1016,6 @@ message TActivity { KAFKA_READ_SESSION_ACTOR = 623; GRAPH_SERVICE = 624; REPLICATION_WORKER = 625; + SCHEMESHARD_BACKGROUND_CLEANING = 626; }; }; diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema index 937270748b93..ac60dcdd47f0 100644 --- 
a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema @@ -346,6 +346,16 @@ "ColumnId": 10, "ColumnName": "ReplicationConfig", "ColumnType": "String" + }, + { + "ColumnId": 11, + "ColumnName": "IsTemporary", + "ColumnType": "Bool" + }, + { + "ColumnId": 12, + "ColumnName": "OwnerActorId", + "ColumnType": "String" } ], "ColumnsDropped": [], @@ -361,7 +371,9 @@ 7, 8, 9, - 10 + 10, + 11, + 12 ], "RoomID": 0, "Codec": 0, @@ -3533,6 +3545,16 @@ "ColumnId": 11, "ColumnName": "ReplicationConfig", "ColumnType": "String" + }, + { + "ColumnId": 12, + "ColumnName": "IsTemporary", + "ColumnType": "Bool" + }, + { + "ColumnId": 13, + "ColumnName": "OwnerActorId", + "ColumnType": "String" } ], "ColumnsDropped": [], @@ -3549,7 +3571,9 @@ 8, 9, 10, - 11 + 11, + 12, + 13 ], "RoomID": 0, "Codec": 0,