Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[YQL-16903] Introduce BlockEngine pragma (parser/config provider part) #616

Merged
merged 2 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1558,6 +1558,7 @@ class TKqpHost : public IKqpHost {
|| settingName == "DisableOrderedColumns"
|| settingName == "Warning"
|| settingName == "UseBlocks"
|| settingName == "BlockEngine"
;
};
auto configProvider = CreateConfigProvider(*TypesCtx, gatewaysConfig, {}, allowSettings);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/opt/peephole/kqp_opt_peephole_wide_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ TExprBase KqpBuildWideReadTable(const TExprBase& node, TExprContext& ctx, TTypeA
} else if (auto maybeRead = node.Maybe<TKqpReadOlapTableRanges>()) {
auto read = maybeRead.Cast();

if (typesCtx.UseBlocks) {
if (typesCtx.IsBlockEngineEnabled()) {
wideRead = Build<TCoWideFromBlocks>(ctx, node.Pos())
.Input<TKqpBlockReadOlapTableRanges>()
.Table(read.Table())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7774,7 +7774,7 @@ THolder<IGraphTransformer> CreatePeepHoleFinalStageTransformer(TTypeAnnotationCo
pipeline.Add(
CreateFunctorTransformer(
[&types](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) -> IGraphTransformer::TStatus {
if (types.UseBlocks) {
if (types.IsBlockEngineEnabled()) {
const auto& extStageRules = TPeepHoleRules::Instance().BlockStageExtRules;
return PeepHoleBlockStage(input, output, ctx, types, extStageRules);
} else {
Expand All @@ -7789,7 +7789,7 @@ THolder<IGraphTransformer> CreatePeepHoleFinalStageTransformer(TTypeAnnotationCo
pipeline.Add(
CreateFunctorTransformer(
[&types](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) -> IGraphTransformer::TStatus {
if (types.UseBlocks) {
if (types.IsBlockEngineEnabled()) {
const auto& extStageRules = TPeepHoleRules::Instance().BlockStageExtFinalRules;
return PeepHoleBlockStage(input, output, ctx, types, extStageRules);
} else {
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/core/type_ann/type_ann_list.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4689,7 +4689,7 @@ namespace {
return IGraphTransformer::TStatus::Repeat;
}

if (isMany && ctx.Types.UseBlocks) {
if (isMany && ctx.Types.IsBlockEngineEnabled()) {
auto streamIndex = inputStructType->FindItem("_yql_group_stream_index");
if (streamIndex) {
const TTypeAnnotationNode* streamIndexType = inputStructType->GetItems()[*streamIndex]->GetItemType();
Expand Down
14 changes: 7 additions & 7 deletions ydb/library/yql/core/yql_aggregate_expander.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ TExprNode::TPtr TAggregateExpander::ExpandAggregate()

HaveDistinct = AnyOf(AggregatedColumns->ChildrenList(),
[](const auto& child) { return child->ChildrenSize() == 3; });
EffectiveCompact = (HaveDistinct && CompactForDistinct && !TypesCtx.UseBlocks) || ForceCompact || HasSetting(*settings, "compact");
EffectiveCompact = (HaveDistinct && CompactForDistinct && !TypesCtx.IsBlockEngineEnabled()) || ForceCompact || HasSetting(*settings, "compact");
for (const auto& trait : Traits) {
auto mergeLambda = trait->Child(5);
if (mergeLambda->Tail().IsCallable("Void")) {
Expand Down Expand Up @@ -56,7 +56,7 @@ TExprNode::TPtr TAggregateExpander::ExpandAggregate()
return GeneratePhases();
}

if (TypesCtx.UseBlocks) {
if (TypesCtx.IsBlockEngineEnabled()) {
if (Suffix == "Combine") {
auto ret = TryGenerateBlockCombine();
if (ret) {
Expand Down Expand Up @@ -2776,7 +2776,7 @@ TExprNode::TPtr TAggregateExpander::GeneratePhases() {
streams.push_back(SerializeIdxSet(indicies));
}

if (TypesCtx.UseBlocks) {
if (TypesCtx.IsBlockEngineEnabled()) {
for (ui32 i = 0; i < unionAllInputs.size(); ++i) {
unionAllInputs[i] = Ctx.Builder(Node->Pos())
.Callable("Map")
Expand All @@ -2797,7 +2797,7 @@ TExprNode::TPtr TAggregateExpander::GeneratePhases() {
}

auto settings = Node->ChildPtr(3);
if (TypesCtx.UseBlocks) {
if (TypesCtx.IsBlockEngineEnabled()) {
settings = AddSetting(*settings, Node->Pos(), "many_streams", Ctx.NewList(Node->Pos(), std::move(streams)), Ctx);
}

Expand Down Expand Up @@ -2830,7 +2830,7 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombine() {
}

TExprNode::TPtr TAggregateExpander::TryGenerateBlockMergeFinalize() {
if (UsePartitionsByKeys || !TypesCtx.UseBlocks) {
if (UsePartitionsByKeys || !TypesCtx.IsBlockEngineEnabled()) {
return nullptr;
}

Expand Down Expand Up @@ -2919,13 +2919,13 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockMergeFinalizeHashed() {
TExprNode::TPtr ExpandAggregatePeephole(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx) {
if (NNodes::TCoAggregate::Match(node.Get())) {
NNodes::TCoAggregate self(node);
auto ret = TAggregateExpander::CountAggregateRewrite(self, ctx, typesCtx.UseBlocks);
auto ret = TAggregateExpander::CountAggregateRewrite(self, ctx, typesCtx.IsBlockEngineEnabled());
if (ret != node) {
YQL_CLOG(DEBUG, Core) << "CountAggregateRewrite on peephole";
return ret;
}
}
return ExpandAggregatePeepholeImpl(node, ctx, typesCtx, false, typesCtx.UseBlocks);
return ExpandAggregatePeepholeImpl(node, ctx, typesCtx, false, typesCtx.IsBlockEngineEnabled());
}

} // namespace NYql
12 changes: 11 additions & 1 deletion ydb/library/yql/core/yql_type_annotation.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,12 @@ enum class EMatchRecognizeStreamingMode {
Force,
};

enum class EBlockEngineMode {
Disable /* "disable" */,
Auto /* "auto" */,
Force /* "force" */,
};

struct TUdfCachedInfo {
const TTypeAnnotationNode* FunctionType = nullptr;
const TTypeAnnotationNode* RunConfigType = nullptr;
Expand Down Expand Up @@ -251,6 +257,7 @@ struct TTypeAnnotationContext: public TThrRefBase {
bool YsonCastToString = true;
ui32 FolderSubDirsLimit = 1000;
bool UseBlocks = false;
EBlockEngineMode BlockEngineMode = EBlockEngineMode::Disable;
bool PgEmitAggApply = false;
IArrowResolver::TPtr ArrowResolver;
ECostBasedOptimizerType CostBasedOptimizer = ECostBasedOptimizerType::Disable;
Expand Down Expand Up @@ -350,7 +357,10 @@ struct TTypeAnnotationContext: public TThrRefBase {
void SetStats(const TExprNode* input, std::shared_ptr<TOptimizerStatistics> stats) {
StatisticsMap[input] = stats;
}


bool IsBlockEngineEnabled() const {
return BlockEngineMode != EBlockEngineMode::Disable || UseBlocks;
}
};

template <> inline
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/dq/opt/dq_opt_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ TExprBase DqRewriteAggregate(TExprBase node, TExprContext& ctx, TTypeAnnotationC
if (!node.Maybe<TCoAggregateBase>()) {
return node;
}
TAggregateExpander aggExpander(true, !typesCtx.UseBlocks && !useFinalizeByKey, useFinalizeByKey, node.Ptr(), ctx, typesCtx, false, compactForDistinct, usePhases);
TAggregateExpander aggExpander(true, !typesCtx.IsBlockEngineEnabled() && !useFinalizeByKey, useFinalizeByKey, node.Ptr(), ctx, typesCtx, false, compactForDistinct, usePhases);
auto result = aggExpander.ExpandAggregate();
YQL_ENSURE(result);

Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/dq/opt/dq_opt_peephole.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ NNodes::TExprBase DqPeepholeRewriteLength(const NNodes::TExprBase& node, TExprCo
}

auto dqPhyLength = node.Cast<TDqPhyLength>();
if (typesCtx.UseBlocks) {
if (typesCtx.IsBlockEngineEnabled()) {
return NNodes::TExprBase(ctx.Builder(node.Pos())
.Callable("NarrowMap")
.Callable(0, "BlockCombineAll")
Expand Down
12 changes: 12 additions & 0 deletions ydb/library/yql/providers/config/yql_config_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,18 @@ namespace {
return false;
}
}
else if (name == "BlockEngine") {
if (args.size() != 1) {
ctx.AddError(TIssue(pos, TStringBuilder() << "Expected at most 1 argument, but got " << args.size()));
return false;
}

auto arg = TString{args[0]};
if (!TryFromString(arg, Types.BlockEngineMode)) {
ctx.AddError(TIssue(pos, TStringBuilder() << "Expected `disable|auto|force', but got: " << args[0]));
return false;
}
}
else {
ctx.AddError(TIssue(pos, TStringBuilder() << "Unsupported command: " << name));
return false;
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/providers/dq/opt/logical_optimize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ class TDqsLogicalOptProposalTransformer : public TOptimizeTransformerBase {
auto input = aggregate.Input().Maybe<TDqConnection>();

if (input) {
auto newNode = TAggregateExpander::CountAggregateRewrite(aggregate, ctx, TypesCtx.UseBlocks);
auto newNode = TAggregateExpander::CountAggregateRewrite(aggregate, ctx, TypesCtx.IsBlockEngineEnabled());
if (node.Ptr() != newNode) {
return TExprBase(newNode);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2512,7 +2512,7 @@ class TYtLogicalOptProposalTransformer : public TOptimizeTransformerBase {
return node;
}

return TAggregateExpander::CountAggregateRewrite(aggregate, ctx, State_->Types->UseBlocks);
return TAggregateExpander::CountAggregateRewrite(aggregate, ctx, State_->Types->IsBlockEngineEnabled());
}

TMaybeNode<TExprBase> ZeroSampleToZeroLimit(TExprBase node, TExprContext& ctx) const {
Expand Down
35 changes: 27 additions & 8 deletions ydb/library/yql/sql/pg/pg_sql.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ class TConverter : public IPGParseEvents {
: AstParseResult(astParseResult)
, Settings(settings)
, DqEngineEnabled(Settings.DqDefaultAuto->Allow())
, BlockEngineEnabled(Settings.BlockDefaultAuto->Allow())
{
Positions.push_back({});
ScanRows(query);
Expand All @@ -281,6 +282,10 @@ class TConverter : public IPGParseEvents {
DqEngineEnabled = true;
} else if (flag == "DqEngineForce") {
DqEngineForce = true;
} else if (flag == "BlockEngineEnable") {
BlockEngineEnabled = true;
} else if (flag == "BlockEngineForce") {
BlockEngineForce = true;
}
}

Expand Down Expand Up @@ -320,6 +325,8 @@ class TConverter : public IPGParseEvents {
Statements.push_back(L(A("let"), A("world"), L(A(TString(NYql::ConfigureName)), A("world"), configSource,
QA("OrderedColumns"))));

ui32 blockEnginePgmPos = Statements.size();
Statements.push_back(configSource);
ui32 costBasedOptimizerPos = Statements.size();
Statements.push_back(configSource);
ui32 dqEnginePgmPos = Statements.size();
Expand Down Expand Up @@ -359,6 +366,13 @@ class TConverter : public IPGParseEvents {
Statements.erase(Statements.begin() + costBasedOptimizerPos);
}

if (BlockEngineEnabled) {
Statements[blockEnginePgmPos] = L(A("let"), A("world"), L(A(TString(NYql::ConfigureName)), A("world"), configSource,
QA("BlockEngine"), QA(BlockEngineForce ? "force" : "auto")));
} else {
Statements.erase(Statements.begin() + blockEnginePgmPos);
}

return VL(Statements.data(), Statements.size());
}

Expand Down Expand Up @@ -2001,7 +2015,7 @@ class TConverter : public IPGParseEvents {
AddError(TStringBuilder() << "VariableSetStmt, expected string literal for " << value->name << " option");
return nullptr;
}
} else if (name == "dqengine") {
} else if (name == "dqengine" || name == "blockengine") {
if (ListLength(value->args) != 1) {
AddError(TStringBuilder() << "VariableSetStmt, expected 1 arg, but got: " << ListLength(value->args));
return nullptr;
Expand All @@ -2011,17 +2025,20 @@ class TConverter : public IPGParseEvents {
if (NodeTag(arg) == T_A_Const && (NodeTag(CAST_NODE(A_Const, arg)->val) == T_String)) {
auto rawStr = StrVal(CAST_NODE(A_Const, arg)->val);
auto str = to_lower(TString(rawStr));
const bool isDqEngine = name == "dqengine";
auto& enable = isDqEngine ? DqEngineEnabled : BlockEngineEnabled;
auto& force = isDqEngine ? DqEngineForce : BlockEngineForce;
if (str == "auto") {
DqEngineEnabled = true;
DqEngineForce = false;
enable = true;
force = false;
} else if (str == "force") {
DqEngineEnabled = true;
DqEngineForce = true;
enable = true;
force = true;
} else if (str == "disable") {
DqEngineEnabled = false;
DqEngineForce = false;
enable = false;
force = false;
} else {
AddError(TStringBuilder() << "VariableSetStmt, not supported DqEngine option value: " << rawStr);
AddError(TStringBuilder() << "VariableSetStmt, not supported " << value->name << " option value: " << rawStr);
return nullptr;
}
} else {
Expand Down Expand Up @@ -4247,6 +4264,8 @@ class TConverter : public IPGParseEvents {
bool DqEngineEnabled = false;
bool DqEngineForce = false;
TString CostBasedOptimizer;
bool BlockEngineEnabled = false;
bool BlockEngineForce = false;
TVector<TAstNode*> Statements;
ui32 ReadIndex = 0;
TViews Views;
Expand Down
21 changes: 21 additions & 0 deletions ydb/library/yql/sql/pg/pg_sql_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -435,4 +435,25 @@ SELECT COUNT(*) FROM public.t;");
UNIT_ASSERT_C(res.Root, "Failed to parse statement, root is nullptr");
UNIT_ASSERT_STRINGS_EQUAL(res.Root->ToString(), expectedAst.Root->ToString());
}

Y_UNIT_TEST(BlockEngine) {
auto res = PgSqlToYql("set blockEngine='auto'; select 1;");
UNIT_ASSERT(res.Root);
UNIT_ASSERT_STRING_CONTAINS(res.Root->ToString(), "(let world (Configure! world (DataSource 'config) 'BlockEngine 'auto))");

res = PgSqlToYql("set Blockengine='force'; select 1;");
UNIT_ASSERT(res.Root);
UNIT_ASSERT_STRING_CONTAINS(res.Root->ToString(), "(let world (Configure! world (DataSource 'config) 'BlockEngine 'force))");

res = PgSqlToYql("set BlockEngine='disable'; select 1;");
UNIT_ASSERT(res.Root);
UNIT_ASSERT(!res.Root->ToString().Contains("BlockEngine"));

res = PgSqlToYql("set BlockEngine='foo'; select 1;");
UNIT_ASSERT(!res.Root);
UNIT_ASSERT_EQUAL(res.Issues.Size(), 1);

auto issue = *(res.Issues.begin());
UNIT_ASSERT(issue.GetMessage().Contains("VariableSetStmt, not supported BlockEngine option value: foo"));
}
}
1 change: 1 addition & 0 deletions ydb/library/yql/sql/settings/translation_settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ namespace NSQLTranslation {
, WarnOnV0(true)
, V0WarnAsError(ISqlFeaturePolicy::MakeAlwaysDisallow())
, DqDefaultAuto(ISqlFeaturePolicy::MakeAlwaysDisallow())
, BlockDefaultAuto(ISqlFeaturePolicy::MakeAlwaysDisallow())
, AssumeYdbOnClusterWithSlash(false)
{}

Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/sql/settings/translation_settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ namespace NSQLTranslation {
bool WarnOnV0;
ISqlFeaturePolicy::TPtr V0WarnAsError;
ISqlFeaturePolicy::TPtr DqDefaultAuto;
ISqlFeaturePolicy::TPtr BlockDefaultAuto;
bool AssumeYdbOnClusterWithSlash;
TString DynamicClusterProvider;
TString FileAliasPrefix;
Expand Down
3 changes: 3 additions & 0 deletions ydb/library/yql/sql/v1/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ THashMap<TStringBuf, TPragmaField> CTX_PRAGMA_FIELDS = {
{"EmitAggApply", &TContext::EmitAggApply},
{"AnsiLike", &TContext::AnsiLike},
{"UseBlocks", &TContext::UseBlocks},
{"BlockEngineEnable", &TContext::BlockEngineEnable},
{"BlockEngineForce", &TContext::BlockEngineForce},
};

typedef TMaybe<bool> TContext::*TPragmaMaybeField;
Expand All @@ -84,6 +86,7 @@ TContext::TContext(const NSQLTranslation::TTranslationSettings& settings,
, HasPendingErrors(false)
, DqEngineEnable(Settings.DqDefaultAuto->Allow())
, AnsiQuotedIdentifiers(settings.AnsiLexer)
, BlockEngineEnable(Settings.BlockDefaultAuto->Allow())
{
for (auto lib : settings.Libraries) {
Libraries.emplace(lib, TLibraryStuff());
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/yql/sql/v1/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,8 @@ namespace NSQLTranslationV1 {
bool AnsiLike = false;
bool FeatureR010 = false; //Row pattern recognition: FROM clause
TMaybe<bool> CompactGroupBy;
bool BlockEngineEnable = false;
bool BlockEngineForce = false;
};

class TColumnRefScope {
Expand Down
6 changes: 6 additions & 0 deletions ydb/library/yql/sql/v1/query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2691,6 +2691,12 @@ class TYqlProgramNode: public TAstListNode {
if (ctx.UseBlocks) {
Add(Y("let", "world", Y(TString(ConfigureName), "world", configSource, BuildQuotedAtom(Pos, "UseBlocks"))));
}

if (ctx.BlockEngineEnable) {
TString mode = ctx.BlockEngineForce ? "force" : "auto";
Add(Y("let", "world", Y(TString(ConfigureName), "world", configSource,
BuildQuotedAtom(Pos, "BlockEngine"), BuildQuotedAtom(Pos, mode))));
}
}
}

Expand Down
17 changes: 10 additions & 7 deletions ydb/library/yql/sql/v1/sql_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1847,7 +1847,7 @@ TNodePtr TSqlQuery::PragmaStatement(const TRule_pragma_stmt& stmt, bool& success
} else if (normalizedPragma == "disableansiinforemptyornullableitemscollections") {
Ctx.AnsiInForEmptyOrNullableItemsCollections = false;
Ctx.IncrementMonCounter("sql_pragma", "DisableAnsiInForEmptyOrNullableItemsCollections");
} else if (normalizedPragma == "dqengine") {
} else if (normalizedPragma == "dqengine" || normalizedPragma == "blockengine") {
Ctx.IncrementMonCounter("sql_pragma", "DqEngine");
if (values.size() != 1 || !values[0].GetLiteral()
|| ! (*values[0].GetLiteral() == "disable" || *values[0].GetLiteral() == "auto" || *values[0].GetLiteral() == "force"))
Expand All @@ -1856,15 +1856,18 @@ TNodePtr TSqlQuery::PragmaStatement(const TRule_pragma_stmt& stmt, bool& success
Ctx.IncrementMonCounter("sql_errors", "BadPragmaValue");
return {};
}
const bool isDqEngine = normalizedPragma == "dqengine";
auto& enable = isDqEngine ? Ctx.DqEngineEnable : Ctx.BlockEngineEnable;
auto& force = isDqEngine ? Ctx.DqEngineForce : Ctx.BlockEngineForce;
if (*values[0].GetLiteral() == "disable") {
Ctx.DqEngineEnable = false;
Ctx.DqEngineForce = false;
enable = false;
force = false;
} else if (*values[0].GetLiteral() == "force") {
Ctx.DqEngineEnable = true;
Ctx.DqEngineForce = true;
enable = true;
force = true;
} else if (*values[0].GetLiteral() == "auto") {
Ctx.DqEngineEnable = true;
Ctx.DqEngineForce = false;
enable = true;
force = false;
}
} else if (normalizedPragma == "ansirankfornullablekeys") {
Ctx.AnsiRankForNullableKeys = true;
Expand Down
Loading
Loading