Skip to content

Commit

Permalink
[YQL-16903] Introduce BlockEngine pragma (parser/config provider part) (
Browse files Browse the repository at this point in the history
#616)

* initial

* add tests
  • Loading branch information
nepal authored Dec 21, 2023
1 parent 4009b81 commit 089fa57
Show file tree
Hide file tree
Showing 20 changed files with 135 additions and 31 deletions.
1 change: 1 addition & 0 deletions ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1558,6 +1558,7 @@ class TKqpHost : public IKqpHost {
|| settingName == "DisableOrderedColumns"
|| settingName == "Warning"
|| settingName == "UseBlocks"
|| settingName == "BlockEngine"
;
};
auto configProvider = CreateConfigProvider(*TypesCtx, gatewaysConfig, {}, allowSettings);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/opt/peephole/kqp_opt_peephole_wide_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ TExprBase KqpBuildWideReadTable(const TExprBase& node, TExprContext& ctx, TTypeA
} else if (auto maybeRead = node.Maybe<TKqpReadOlapTableRanges>()) {
auto read = maybeRead.Cast();

if (typesCtx.UseBlocks) {
if (typesCtx.IsBlockEngineEnabled()) {
wideRead = Build<TCoWideFromBlocks>(ctx, node.Pos())
.Input<TKqpBlockReadOlapTableRanges>()
.Table(read.Table())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7774,7 +7774,7 @@ THolder<IGraphTransformer> CreatePeepHoleFinalStageTransformer(TTypeAnnotationCo
pipeline.Add(
CreateFunctorTransformer(
[&types](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) -> IGraphTransformer::TStatus {
if (types.UseBlocks) {
if (types.IsBlockEngineEnabled()) {
const auto& extStageRules = TPeepHoleRules::Instance().BlockStageExtRules;
return PeepHoleBlockStage(input, output, ctx, types, extStageRules);
} else {
Expand All @@ -7789,7 +7789,7 @@ THolder<IGraphTransformer> CreatePeepHoleFinalStageTransformer(TTypeAnnotationCo
pipeline.Add(
CreateFunctorTransformer(
[&types](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) -> IGraphTransformer::TStatus {
if (types.UseBlocks) {
if (types.IsBlockEngineEnabled()) {
const auto& extStageRules = TPeepHoleRules::Instance().BlockStageExtFinalRules;
return PeepHoleBlockStage(input, output, ctx, types, extStageRules);
} else {
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/core/type_ann/type_ann_list.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4689,7 +4689,7 @@ namespace {
return IGraphTransformer::TStatus::Repeat;
}

if (isMany && ctx.Types.UseBlocks) {
if (isMany && ctx.Types.IsBlockEngineEnabled()) {
auto streamIndex = inputStructType->FindItem("_yql_group_stream_index");
if (streamIndex) {
const TTypeAnnotationNode* streamIndexType = inputStructType->GetItems()[*streamIndex]->GetItemType();
Expand Down
14 changes: 7 additions & 7 deletions ydb/library/yql/core/yql_aggregate_expander.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ TExprNode::TPtr TAggregateExpander::ExpandAggregate()

HaveDistinct = AnyOf(AggregatedColumns->ChildrenList(),
[](const auto& child) { return child->ChildrenSize() == 3; });
EffectiveCompact = (HaveDistinct && CompactForDistinct && !TypesCtx.UseBlocks) || ForceCompact || HasSetting(*settings, "compact");
EffectiveCompact = (HaveDistinct && CompactForDistinct && !TypesCtx.IsBlockEngineEnabled()) || ForceCompact || HasSetting(*settings, "compact");
for (const auto& trait : Traits) {
auto mergeLambda = trait->Child(5);
if (mergeLambda->Tail().IsCallable("Void")) {
Expand Down Expand Up @@ -56,7 +56,7 @@ TExprNode::TPtr TAggregateExpander::ExpandAggregate()
return GeneratePhases();
}

if (TypesCtx.UseBlocks) {
if (TypesCtx.IsBlockEngineEnabled()) {
if (Suffix == "Combine") {
auto ret = TryGenerateBlockCombine();
if (ret) {
Expand Down Expand Up @@ -2776,7 +2776,7 @@ TExprNode::TPtr TAggregateExpander::GeneratePhases() {
streams.push_back(SerializeIdxSet(indicies));
}

if (TypesCtx.UseBlocks) {
if (TypesCtx.IsBlockEngineEnabled()) {
for (ui32 i = 0; i < unionAllInputs.size(); ++i) {
unionAllInputs[i] = Ctx.Builder(Node->Pos())
.Callable("Map")
Expand All @@ -2797,7 +2797,7 @@ TExprNode::TPtr TAggregateExpander::GeneratePhases() {
}

auto settings = Node->ChildPtr(3);
if (TypesCtx.UseBlocks) {
if (TypesCtx.IsBlockEngineEnabled()) {
settings = AddSetting(*settings, Node->Pos(), "many_streams", Ctx.NewList(Node->Pos(), std::move(streams)), Ctx);
}

Expand Down Expand Up @@ -2830,7 +2830,7 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombine() {
}

TExprNode::TPtr TAggregateExpander::TryGenerateBlockMergeFinalize() {
if (UsePartitionsByKeys || !TypesCtx.UseBlocks) {
if (UsePartitionsByKeys || !TypesCtx.IsBlockEngineEnabled()) {
return nullptr;
}

Expand Down Expand Up @@ -2919,13 +2919,13 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockMergeFinalizeHashed() {
TExprNode::TPtr ExpandAggregatePeephole(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx) {
if (NNodes::TCoAggregate::Match(node.Get())) {
NNodes::TCoAggregate self(node);
auto ret = TAggregateExpander::CountAggregateRewrite(self, ctx, typesCtx.UseBlocks);
auto ret = TAggregateExpander::CountAggregateRewrite(self, ctx, typesCtx.IsBlockEngineEnabled());
if (ret != node) {
YQL_CLOG(DEBUG, Core) << "CountAggregateRewrite on peephole";
return ret;
}
}
return ExpandAggregatePeepholeImpl(node, ctx, typesCtx, false, typesCtx.UseBlocks);
return ExpandAggregatePeepholeImpl(node, ctx, typesCtx, false, typesCtx.IsBlockEngineEnabled());
}

} // namespace NYql
12 changes: 11 additions & 1 deletion ydb/library/yql/core/yql_type_annotation.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,12 @@ enum class EMatchRecognizeStreamingMode {
Force,
};

// Operating mode of the block engine; held in TTypeAnnotationContext::BlockEngineMode,
// which defaults to Disable. Any value other than Disable makes
// TTypeAnnotationContext::IsBlockEngineEnabled() return true.
// NOTE(review): the trailing comments appear to give the textual spelling of each value
// (the config provider parses this enum from `disable|auto|force`) - confirm they are
// consumed by the enum string conversion before editing or removing them.
enum class EBlockEngineMode {
Disable /* "disable" */,
Auto /* "auto" */,
Force /* "force" */,
};

struct TUdfCachedInfo {
const TTypeAnnotationNode* FunctionType = nullptr;
const TTypeAnnotationNode* RunConfigType = nullptr;
Expand Down Expand Up @@ -251,6 +257,7 @@ struct TTypeAnnotationContext: public TThrRefBase {
bool YsonCastToString = true;
ui32 FolderSubDirsLimit = 1000;
bool UseBlocks = false;
EBlockEngineMode BlockEngineMode = EBlockEngineMode::Disable;
bool PgEmitAggApply = false;
IArrowResolver::TPtr ArrowResolver;
ECostBasedOptimizerType CostBasedOptimizer = ECostBasedOptimizerType::Disable;
Expand Down Expand Up @@ -350,7 +357,10 @@ struct TTypeAnnotationContext: public TThrRefBase {
void SetStats(const TExprNode* input, std::shared_ptr<TOptimizerStatistics> stats) {
StatisticsMap[input] = stats;
}


// Tells whether block processing is switched on, either through the boolean
// UseBlocks flag or through a BlockEngineMode other than Disable.
bool IsBlockEngineEnabled() const {
    if (UseBlocks) {
        return true;
    }
    return BlockEngineMode != EBlockEngineMode::Disable;
}
};

template <> inline
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/dq/opt/dq_opt_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ TExprBase DqRewriteAggregate(TExprBase node, TExprContext& ctx, TTypeAnnotationC
if (!node.Maybe<TCoAggregateBase>()) {
return node;
}
TAggregateExpander aggExpander(true, !typesCtx.UseBlocks && !useFinalizeByKey, useFinalizeByKey, node.Ptr(), ctx, typesCtx, false, compactForDistinct, usePhases);
TAggregateExpander aggExpander(true, !typesCtx.IsBlockEngineEnabled() && !useFinalizeByKey, useFinalizeByKey, node.Ptr(), ctx, typesCtx, false, compactForDistinct, usePhases);
auto result = aggExpander.ExpandAggregate();
YQL_ENSURE(result);

Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/dq/opt/dq_opt_peephole.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ NNodes::TExprBase DqPeepholeRewriteLength(const NNodes::TExprBase& node, TExprCo
}

auto dqPhyLength = node.Cast<TDqPhyLength>();
if (typesCtx.UseBlocks) {
if (typesCtx.IsBlockEngineEnabled()) {
return NNodes::TExprBase(ctx.Builder(node.Pos())
.Callable("NarrowMap")
.Callable(0, "BlockCombineAll")
Expand Down
12 changes: 12 additions & 0 deletions ydb/library/yql/providers/config/yql_config_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,18 @@ namespace {
return false;
}
}
else if (name == "BlockEngine") {
    // The pragma requires exactly one argument (the mode); zero arguments is
    // rejected as well, so the message must not say "at most".
    if (args.size() != 1) {
        ctx.AddError(TIssue(pos, TStringBuilder() << "Expected exactly 1 argument, but got " << args.size()));
        return false;
    }

    // Convert the textual mode directly into the type annotation context field;
    // an unrecognized spelling is reported together with the offending value.
    auto arg = TString{args[0]};
    if (!TryFromString(arg, Types.BlockEngineMode)) {
        ctx.AddError(TIssue(pos, TStringBuilder() << "Expected `disable|auto|force', but got: " << args[0]));
        return false;
    }
}
else {
ctx.AddError(TIssue(pos, TStringBuilder() << "Unsupported command: " << name));
return false;
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/providers/dq/opt/logical_optimize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ class TDqsLogicalOptProposalTransformer : public TOptimizeTransformerBase {
auto input = aggregate.Input().Maybe<TDqConnection>();

if (input) {
auto newNode = TAggregateExpander::CountAggregateRewrite(aggregate, ctx, TypesCtx.UseBlocks);
auto newNode = TAggregateExpander::CountAggregateRewrite(aggregate, ctx, TypesCtx.IsBlockEngineEnabled());
if (node.Ptr() != newNode) {
return TExprBase(newNode);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2512,7 +2512,7 @@ class TYtLogicalOptProposalTransformer : public TOptimizeTransformerBase {
return node;
}

return TAggregateExpander::CountAggregateRewrite(aggregate, ctx, State_->Types->UseBlocks);
return TAggregateExpander::CountAggregateRewrite(aggregate, ctx, State_->Types->IsBlockEngineEnabled());
}

TMaybeNode<TExprBase> ZeroSampleToZeroLimit(TExprBase node, TExprContext& ctx) const {
Expand Down
35 changes: 27 additions & 8 deletions ydb/library/yql/sql/pg/pg_sql.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ class TConverter : public IPGParseEvents {
: AstParseResult(astParseResult)
, Settings(settings)
, DqEngineEnabled(Settings.DqDefaultAuto->Allow())
, BlockEngineEnabled(Settings.BlockDefaultAuto->Allow())
{
Positions.push_back({});
ScanRows(query);
Expand All @@ -281,6 +282,10 @@ class TConverter : public IPGParseEvents {
DqEngineEnabled = true;
} else if (flag == "DqEngineForce") {
DqEngineForce = true;
} else if (flag == "BlockEngineEnable") {
BlockEngineEnabled = true;
} else if (flag == "BlockEngineForce") {
BlockEngineForce = true;
}
}

Expand Down Expand Up @@ -320,6 +325,8 @@ class TConverter : public IPGParseEvents {
Statements.push_back(L(A("let"), A("world"), L(A(TString(NYql::ConfigureName)), A("world"), configSource,
QA("OrderedColumns"))));

ui32 blockEnginePgmPos = Statements.size();
Statements.push_back(configSource);
ui32 costBasedOptimizerPos = Statements.size();
Statements.push_back(configSource);
ui32 dqEnginePgmPos = Statements.size();
Expand Down Expand Up @@ -359,6 +366,13 @@ class TConverter : public IPGParseEvents {
Statements.erase(Statements.begin() + costBasedOptimizerPos);
}

if (BlockEngineEnabled) {
Statements[blockEnginePgmPos] = L(A("let"), A("world"), L(A(TString(NYql::ConfigureName)), A("world"), configSource,
QA("BlockEngine"), QA(BlockEngineForce ? "force" : "auto")));
} else {
Statements.erase(Statements.begin() + blockEnginePgmPos);
}

return VL(Statements.data(), Statements.size());
}

Expand Down Expand Up @@ -2001,7 +2015,7 @@ class TConverter : public IPGParseEvents {
AddError(TStringBuilder() << "VariableSetStmt, expected string literal for " << value->name << " option");
return nullptr;
}
} else if (name == "dqengine") {
} else if (name == "dqengine" || name == "blockengine") {
if (ListLength(value->args) != 1) {
AddError(TStringBuilder() << "VariableSetStmt, expected 1 arg, but got: " << ListLength(value->args));
return nullptr;
Expand All @@ -2011,17 +2025,20 @@ class TConverter : public IPGParseEvents {
if (NodeTag(arg) == T_A_Const && (NodeTag(CAST_NODE(A_Const, arg)->val) == T_String)) {
auto rawStr = StrVal(CAST_NODE(A_Const, arg)->val);
auto str = to_lower(TString(rawStr));
const bool isDqEngine = name == "dqengine";
auto& enable = isDqEngine ? DqEngineEnabled : BlockEngineEnabled;
auto& force = isDqEngine ? DqEngineForce : BlockEngineForce;
if (str == "auto") {
DqEngineEnabled = true;
DqEngineForce = false;
enable = true;
force = false;
} else if (str == "force") {
DqEngineEnabled = true;
DqEngineForce = true;
enable = true;
force = true;
} else if (str == "disable") {
DqEngineEnabled = false;
DqEngineForce = false;
enable = false;
force = false;
} else {
AddError(TStringBuilder() << "VariableSetStmt, not supported DqEngine option value: " << rawStr);
AddError(TStringBuilder() << "VariableSetStmt, not supported " << value->name << " option value: " << rawStr);
return nullptr;
}
} else {
Expand Down Expand Up @@ -4247,6 +4264,8 @@ class TConverter : public IPGParseEvents {
bool DqEngineEnabled = false;
bool DqEngineForce = false;
TString CostBasedOptimizer;
bool BlockEngineEnabled = false;
bool BlockEngineForce = false;
TVector<TAstNode*> Statements;
ui32 ReadIndex = 0;
TViews Views;
Expand Down
21 changes: 21 additions & 0 deletions ydb/library/yql/sql/pg/pg_sql_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -435,4 +435,25 @@ SELECT COUNT(*) FROM public.t;");
UNIT_ASSERT_C(res.Root, "Failed to parse statement, root is nullptr");
UNIT_ASSERT_STRINGS_EQUAL(res.Root->ToString(), expectedAst.Root->ToString());
}

// Exercises translation of the PG `set blockengine=...` statement: each sub-case
// converts one query with PgSqlToYql and inspects the resulting AST text or issues.
// The setting name is written in varying letter case across the sub-cases.
Y_UNIT_TEST(BlockEngine) {
    // 'auto' -> a Configure! call carrying 'BlockEngine 'auto.
    {
        auto parsed = PgSqlToYql("set blockEngine='auto'; select 1;");
        UNIT_ASSERT(parsed.Root);
        UNIT_ASSERT_STRING_CONTAINS(parsed.Root->ToString(), "(let world (Configure! world (DataSource 'config) 'BlockEngine 'auto))");
    }

    // 'force' -> a Configure! call carrying 'BlockEngine 'force.
    {
        auto parsed = PgSqlToYql("set Blockengine='force'; select 1;");
        UNIT_ASSERT(parsed.Root);
        UNIT_ASSERT_STRING_CONTAINS(parsed.Root->ToString(), "(let world (Configure! world (DataSource 'config) 'BlockEngine 'force))");
    }

    // 'disable' -> the program parses and mentions BlockEngine nowhere.
    {
        auto parsed = PgSqlToYql("set BlockEngine='disable'; select 1;");
        UNIT_ASSERT(parsed.Root);
        UNIT_ASSERT(!parsed.Root->ToString().Contains("BlockEngine"));
    }

    // An unknown value -> no AST and exactly one issue naming the bad value.
    {
        auto rejected = PgSqlToYql("set BlockEngine='foo'; select 1;");
        UNIT_ASSERT(!rejected.Root);
        UNIT_ASSERT_EQUAL(rejected.Issues.Size(), 1);

        auto firstIssue = *(rejected.Issues.begin());
        UNIT_ASSERT(firstIssue.GetMessage().Contains("VariableSetStmt, not supported BlockEngine option value: foo"));
    }
}
}
1 change: 1 addition & 0 deletions ydb/library/yql/sql/settings/translation_settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ namespace NSQLTranslation {
, WarnOnV0(true)
, V0WarnAsError(ISqlFeaturePolicy::MakeAlwaysDisallow())
, DqDefaultAuto(ISqlFeaturePolicy::MakeAlwaysDisallow())
, BlockDefaultAuto(ISqlFeaturePolicy::MakeAlwaysDisallow())
, AssumeYdbOnClusterWithSlash(false)
{}

Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/sql/settings/translation_settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ namespace NSQLTranslation {
bool WarnOnV0;
ISqlFeaturePolicy::TPtr V0WarnAsError;
ISqlFeaturePolicy::TPtr DqDefaultAuto;
ISqlFeaturePolicy::TPtr BlockDefaultAuto;
bool AssumeYdbOnClusterWithSlash;
TString DynamicClusterProvider;
TString FileAliasPrefix;
Expand Down
3 changes: 3 additions & 0 deletions ydb/library/yql/sql/v1/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ THashMap<TStringBuf, TPragmaField> CTX_PRAGMA_FIELDS = {
{"EmitAggApply", &TContext::EmitAggApply},
{"AnsiLike", &TContext::AnsiLike},
{"UseBlocks", &TContext::UseBlocks},
{"BlockEngineEnable", &TContext::BlockEngineEnable},
{"BlockEngineForce", &TContext::BlockEngineForce},
};

typedef TMaybe<bool> TContext::*TPragmaMaybeField;
Expand All @@ -84,6 +86,7 @@ TContext::TContext(const NSQLTranslation::TTranslationSettings& settings,
, HasPendingErrors(false)
, DqEngineEnable(Settings.DqDefaultAuto->Allow())
, AnsiQuotedIdentifiers(settings.AnsiLexer)
, BlockEngineEnable(Settings.BlockDefaultAuto->Allow())
{
for (auto lib : settings.Libraries) {
Libraries.emplace(lib, TLibraryStuff());
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/yql/sql/v1/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,8 @@ namespace NSQLTranslationV1 {
bool AnsiLike = false;
bool FeatureR010 = false; //Row pattern recognition: FROM clause
TMaybe<bool> CompactGroupBy;
bool BlockEngineEnable = false;
bool BlockEngineForce = false;
};

class TColumnRefScope {
Expand Down
6 changes: 6 additions & 0 deletions ydb/library/yql/sql/v1/query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2691,6 +2691,12 @@ class TYqlProgramNode: public TAstListNode {
if (ctx.UseBlocks) {
Add(Y("let", "world", Y(TString(ConfigureName), "world", configSource, BuildQuotedAtom(Pos, "UseBlocks"))));
}

if (ctx.BlockEngineEnable) {
TString mode = ctx.BlockEngineForce ? "force" : "auto";
Add(Y("let", "world", Y(TString(ConfigureName), "world", configSource,
BuildQuotedAtom(Pos, "BlockEngine"), BuildQuotedAtom(Pos, mode))));
}
}
}

Expand Down
17 changes: 10 additions & 7 deletions ydb/library/yql/sql/v1/sql_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1847,7 +1847,7 @@ TNodePtr TSqlQuery::PragmaStatement(const TRule_pragma_stmt& stmt, bool& success
} else if (normalizedPragma == "disableansiinforemptyornullableitemscollections") {
Ctx.AnsiInForEmptyOrNullableItemsCollections = false;
Ctx.IncrementMonCounter("sql_pragma", "DisableAnsiInForEmptyOrNullableItemsCollections");
} else if (normalizedPragma == "dqengine") {
} else if (normalizedPragma == "dqengine" || normalizedPragma == "blockengine") {
Ctx.IncrementMonCounter("sql_pragma", "DqEngine");
if (values.size() != 1 || !values[0].GetLiteral()
|| ! (*values[0].GetLiteral() == "disable" || *values[0].GetLiteral() == "auto" || *values[0].GetLiteral() == "force"))
Expand All @@ -1856,15 +1856,18 @@ TNodePtr TSqlQuery::PragmaStatement(const TRule_pragma_stmt& stmt, bool& success
Ctx.IncrementMonCounter("sql_errors", "BadPragmaValue");
return {};
}
const bool isDqEngine = normalizedPragma == "dqengine";
auto& enable = isDqEngine ? Ctx.DqEngineEnable : Ctx.BlockEngineEnable;
auto& force = isDqEngine ? Ctx.DqEngineForce : Ctx.BlockEngineForce;
if (*values[0].GetLiteral() == "disable") {
Ctx.DqEngineEnable = false;
Ctx.DqEngineForce = false;
enable = false;
force = false;
} else if (*values[0].GetLiteral() == "force") {
Ctx.DqEngineEnable = true;
Ctx.DqEngineForce = true;
enable = true;
force = true;
} else if (*values[0].GetLiteral() == "auto") {
Ctx.DqEngineEnable = true;
Ctx.DqEngineForce = false;
enable = true;
force = false;
}
} else if (normalizedPragma == "ansirankfornullablekeys") {
Ctx.AnsiRankForNullableKeys = true;
Expand Down
Loading

0 comments on commit 089fa57

Please sign in to comment.