Skip to content

Commit

Permalink
add feature flag to enable spilling nodes (#6895)
Browse files Browse the repository at this point in the history
  • Loading branch information
gridnevvvit authored Jul 26, 2024
1 parent 6064125 commit 7cf9db7
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 20 deletions.
1 change: 1 addition & 0 deletions ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
kqpConfig.EnableSpillingGenericQuery = serviceConfig.GetEnableQueryServiceSpilling();
kqpConfig.DefaultCostBasedOptimizationLevel = serviceConfig.GetDefaultCostBasedOptimizationLevel();
kqpConfig.EnableConstantFolding = serviceConfig.GetEnableConstantFolding();
kqpConfig.SetDefaultEnabledSpillingNodes(serviceConfig.GetEnableSpillingNodes());

if (const auto limit = serviceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit()) {
kqpConfig._KqpYqlCombinerMemoryLimit = std::max(1_GB, limit - (limit >> 2U));
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,8 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
ui64 defaultCostBasedOptimizationLevel = TableServiceConfig.GetDefaultCostBasedOptimizationLevel();
bool enableConstantFolding = TableServiceConfig.GetEnableConstantFolding();

TString enableSpillingNodes = TableServiceConfig.GetEnableSpillingNodes();

TableServiceConfig.Swap(event.MutableConfig()->MutableTableServiceConfig());
LOG_INFO(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE, "Updated config");

Expand All @@ -556,6 +558,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
TableServiceConfig.GetExtractPredicateRangesLimit() != rangesLimit ||
TableServiceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit() != mkqlHeavyLimit ||
TableServiceConfig.GetIdxLookupJoinPointsLimit() != idxLookupPointsLimit ||
TableServiceConfig.GetEnableSpillingNodes() != enableSpillingNodes ||
TableServiceConfig.GetEnableQueryServiceSpilling() != enableQueryServiceSpilling ||
TableServiceConfig.GetDefaultCostBasedOptimizationLevel() != defaultCostBasedOptimizationLevel ||
TableServiceConfig.GetEnableConstantFolding() != enableConstantFolding ||
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
runtimeSettings.UseSpilling = args.WithSpilling;
runtimeSettings.StatsMode = args.StatsMode;

if (runtimeSettings.UseSpilling) {
args.Task->SetEnableSpilling(runtimeSettings.UseSpilling);
}

if (args.Deadline) {
runtimeSettings.Timeout = args.Deadline - TAppData::TimeProvider->Now();
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/executer_actor/kqp_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::SerializeReque
request.SetStartAllOrFail(true);
if (UseDataQueryPool) {
request.MutableRuntimeSettings()->SetExecType(NYql::NDqProto::TComputeRuntimeSettings::DATA);
request.MutableRuntimeSettings()->SetUseSpilling(WithSpilling);
} else {
request.MutableRuntimeSettings()->SetExecType(NYql::NDqProto::TComputeRuntimeSettings::SCAN);
request.MutableRuntimeSettings()->SetUseSpilling(WithSpilling);
Expand Down
44 changes: 25 additions & 19 deletions ydb/core/kqp/provider/yql_kikimr_settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,22 @@ EOptionalFlag GetOptionalFlagValue(const TMaybe<TType>& flag) {
return EOptionalFlag::Disabled;
}


// Converts a delimiter-separated list of spilling node names into a bitmask.
//
// `v` is split on any of the characters ',', ';', '|' or ' '. Every item is
// converted to NYql::TDqSettings::EEnabledSpillingNodes via FromStringWithDefault
// (with EEnabledSpillingNodes::None supplied as the default) and OR-ed into the
// returned value.
//
// Throws yexception("Empty value item") if the split produces an empty item.
ui64 ParseEnableSpillingNodes(const TString &v) {
    using ENodes = NYql::TDqSettings::EEnabledSpillingNodes;

    TVector<TString> items;
    StringSplitter(v).SplitBySet(",;| ").AddTo(&items);

    ui64 mask = 0;
    for (const auto& item : items) {
        if (item.empty()) {
            throw yexception() << "Empty value item";
        }
        const ENodes node = FromStringWithDefault<ENodes>(item, ENodes::None);
        mask |= static_cast<ui64>(node);
    }
    return mask;
}

// Unwraps an optional boolean flag: returns the stored value, or false when unset.
static inline bool GetFlagValue(const TMaybe<bool>& flag) {
    if (!flag) {
        return false;
    }
    return flag.GetRef();
}
Expand Down Expand Up @@ -73,20 +89,7 @@ TKikimrConfiguration::TKikimrConfiguration() {
REGISTER_SETTING(*this, OptUseFinalizeByKey);
REGISTER_SETTING(*this, CostBasedOptimizationLevel);
REGISTER_SETTING(*this, EnableSpillingNodes)
.Parser([](const TString& v) {
ui64 res = 0;
TVector<TString> vec;
StringSplitter(v).SplitBySet(",;| ").AddTo(&vec);
for (auto& s: vec) {
if (s.empty()) {
throw yexception() << "Empty value item";
}
auto value = FromStringWithDefault<NYql::TDqSettings::EEnabledSpillingNodes>(
s, NYql::TDqSettings::EEnabledSpillingNodes::None);
res |= ui64(value);
}
return res;
});
.Parser([](const TString& v) { return ParseEnableSpillingNodes(v); });

REGISTER_SETTING(*this, MaxDPccpDPTableSize);

Expand Down Expand Up @@ -143,11 +146,6 @@ bool TKikimrSettings::HasOptUseFinalizeByKey() const {
return GetOptionalFlagValue(OptUseFinalizeByKey.Get()) != EOptionalFlag::Disabled;
}

// Returns the value of the EnableSpillingNodes setting, or 0 when it is not set.
ui64 TKikimrSettings::GetEnabledSpillingNodes() const {
    const auto configuredMask = EnableSpillingNodes.Get();
    return configuredMask.GetOrElse(0);
}


// Maps the OptEnablePredicateExtract setting to an EOptionalFlag via GetOptionalFlagValue.
EOptionalFlag TKikimrSettings::GetOptPredicateExtract() const {
    const auto predicateExtract = OptEnablePredicateExtract.Get();
    return GetOptionalFlagValue(predicateExtract);
}
Expand All @@ -169,4 +167,12 @@ TKikimrSettings::TConstPtr TKikimrConfiguration::Snapshot() const {
return std::make_shared<const TKikimrSettings>(*this);
}

void TKikimrConfiguration::SetDefaultEnabledSpillingNodes(const TString& node) {
DefaultEnableSpillingNodes = ParseEnableSpillingNodes(node);
}

// Returns the value of the EnableSpillingNodes setting when it is set, otherwise
// falls back to DefaultEnableSpillingNodes.
ui64 TKikimrConfiguration::GetEnabledSpillingNodes() const {
    const auto configuredMask = EnableSpillingNodes.Get();
    return configuredMask.GetOrElse(DefaultEnableSpillingNodes);
}

}
5 changes: 4 additions & 1 deletion ydb/core/kqp/provider/yql_kikimr_settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ struct TKikimrSettings {
bool HasOptEnableOlapPushdown() const;
bool HasOptEnableOlapProvideComputeSharding() const;
bool HasOptUseFinalizeByKey() const;
ui64 GetEnabledSpillingNodes() const;

EOptionalFlag GetOptPredicateExtract() const;
EOptionalFlag GetUseLlvm() const;
Expand Down Expand Up @@ -167,6 +166,10 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi
bool EnableSpillingGenericQuery = false;
ui32 DefaultCostBasedOptimizationLevel = 3;
bool EnableConstantFolding = true;
ui64 DefaultEnableSpillingNodes = 0;

void SetDefaultEnabledSpillingNodes(const TString& node);
ui64 GetEnabledSpillingNodes() const;
};

}
60 changes: 60 additions & 0 deletions ydb/core/kqp/ut/spilling/kqp_scan_spilling_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,70 @@ NKikimrConfig::TAppConfig AppCfg() {
return appCfg;
}

// Builds an app config for spilling tests with very small compute memory limits.
//
// The table service config gets:
//   - query service spilling enabled;
//   - resource manager limits: light program = 100, heavy program = 300, and
//     ReasonableSpillingTreshold = `reasonableTreshold`;
//   - the local-file spilling service enabled with root "./spilling/".
NKikimrConfig::TAppConfig AppCfgLowComputeLimits(ui64 reasonableTreshold) {
    NKikimrConfig::TAppConfig appCfg;
    auto& tableService = *appCfg.MutableTableServiceConfig();

    auto& resourceManager = *tableService.MutableResourceManager();
    resourceManager.SetMkqlLightProgramMemoryLimit(100);
    resourceManager.SetMkqlHeavyProgramMemoryLimit(300);
    resourceManager.SetReasonableSpillingTreshold(reasonableTreshold);

    tableService.SetEnableQueryServiceSpilling(true);

    auto& localFileSpilling = *tableService.MutableSpillingServiceConfig()->MutableLocalFileConfig();
    localFileSpilling.SetEnable(true);
    localFileSpilling.SetRoot("./spilling/");

    return appCfg;
}


} // anonymous namespace

Y_UNIT_TEST_SUITE(KqpScanSpilling) {

// Checks that the spilling blob counters react to the configured spilling threshold.
//
// The twin flag selects the ReasonableSpillingTreshold handed to
// AppCfgLowComputeLimits: 100 when EnabledSpilling is true, 200_MB otherwise.
// The test fills `/Root/KeyValue` with 300 rows whose values are 200000 + i
// characters long, explains and then executes a full self-join with
// `PRAGMA ydb.EnableSpillingNodes="GraceJoin"`, and finally expects the
// SpillingWriteBlobs / SpillingReadBlobs counters to be positive (spilling
// enabled) or zero (spilling disabled).
Y_UNIT_TEST_TWIN(SpillingInRuntimeNodes, EnabledSpilling) {
    ui64 reasonableTreshold = 200_MB;
    if (EnabledSpilling) {
        reasonableTreshold = 100;
    }
    Cerr << "cwd: " << NFs::CurrentWorkingDirectory() << Endl;
    TKikimrRunner kikimr(AppCfgLowComputeLimits(reasonableTreshold));

    auto db = kikimr.GetQueryClient();

    // Populate the table with large values so the join has enough data to work on.
    for (ui32 rowIdx = 0; rowIdx < 300; ++rowIdx) {
        const TString payload(200000 + rowIdx, 'a' + (rowIdx % 26));
        const TString upsertSql = Sprintf(R"(
--!syntax_v1
REPLACE INTO `/Root/KeyValue` (Key, Value) VALUES (%d, "%s")
)", rowIdx, payload.c_str());
        auto upsertResult = db.ExecuteQuery(upsertSql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
        UNIT_ASSERT_C(upsertResult.IsSuccess(), upsertResult.GetIssues().ToString());
    }

    auto joinQuery = R"(
--!syntax_v1
PRAGMA ydb.EnableSpillingNodes="GraceJoin";
select t1.Key, t1.Value, t2.Key, t2.Value
from `/Root/KeyValue` as t1 full join `/Root/KeyValue` as t2 on t1.Value = t2.Value
order by t1.Value
)";

    // First explain the query and dump its AST for diagnostics.
    auto explainSettings = NYdb::NQuery::TExecuteQuerySettings().ExecMode(NYdb::NQuery::EExecMode::Explain);
    auto explainResult = db.ExecuteQuery(joinQuery, NYdb::NQuery::TTxControl::NoTx(), explainSettings).ExtractValueSync();
    UNIT_ASSERT_VALUES_EQUAL_C(explainResult.GetStatus(), EStatus::SUCCESS, explainResult.GetIssues().ToString());

    Cerr << explainResult.GetStats()->GetAst() << Endl;

    // Then actually run it with default execution settings.
    auto execResult = db.ExecuteQuery(joinQuery, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), NYdb::NQuery::TExecuteQuerySettings()).ExtractValueSync();
    UNIT_ASSERT_VALUES_EQUAL_C(execResult.GetStatus(), EStatus::SUCCESS, execResult.GetIssues().ToString());

    TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
    if (!EnabledSpilling) {
        UNIT_ASSERT(counters.SpillingWriteBlobs->Val() == 0);
        UNIT_ASSERT(counters.SpillingReadBlobs->Val() == 0);
    } else {
        UNIT_ASSERT(counters.SpillingWriteBlobs->Val() > 0);
        UNIT_ASSERT(counters.SpillingReadBlobs->Val() > 0);
    }
}

Y_UNIT_TEST(SelfJoinQueryService) {
Cerr << "cwd: " << NFs::CurrentWorkingDirectory() << Endl;

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/protos/table_service_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -300,4 +300,6 @@ message TTableServiceConfig {
optional bool EnableConstantFolding = 65 [ default = true ];

optional bool EnableImplicitQueryParameterTypes = 66 [ default = true ];

optional string EnableSpillingNodes = 67 [ default = "All" ];
};

0 comments on commit 7cf9db7

Please sign in to comment.