From 1fb7894421c3b6175a17e4507c3d3d4c2164e876 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Sat, 27 Apr 2024 10:36:55 +0000 Subject: [PATCH] Fixed yt over kqp tests --- .../external_source_factory.cpp | 2 +- ydb/core/kqp/common/kqp_yql.cpp | 17 ++ ydb/core/kqp/common/kqp_yql.h | 3 + ydb/core/kqp/common/ya.make | 1 + ydb/core/kqp/host/kqp_runner.cpp | 6 + .../kqp/provider/yql_kikimr_opt_build.cpp | 34 ++- .../kqp/query_compiler/kqp_mkql_compiler.cpp | 14 +- .../connector/tests/utils/run/kqprun.py | 2 +- .../yt/provider/yql_yt_dq_integration.cpp | 31 ++- .../yt/provider/yql_yt_physical_optimize.cpp | 2 +- ydb/tests/fq/yt/cfg/kqprun_scheme.sql | 5 +- ydb/tests/fq/yt/kqp_yt_file.make | 1 + ydb/tests/fq/yt/kqp_yt_file.py | 258 +++++++----------- ydb/tests/fq/yt/kqprun.py | 4 + ydb/tests/tools/kqprun/kqprun.cpp | 9 +- ydb/tests/tools/kqprun/src/common.h | 1 + ydb/tests/tools/kqprun/src/kqp_runner.cpp | 34 ++- ydb/tests/tools/kqprun/src/kqp_runner.h | 2 +- 18 files changed, 216 insertions(+), 210 deletions(-) diff --git a/ydb/core/external_sources/external_source_factory.cpp b/ydb/core/external_sources/external_source_factory.cpp index 98e4514f0242..cda064ff5654 100644 --- a/ydb/core/external_sources/external_source_factory.cpp +++ b/ydb/core/external_sources/external_source_factory.cpp @@ -53,7 +53,7 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector GetUniqueIntegrations(TTypeAnnotationContext& typesCtx) { + std::unordered_set uniqueIntegrations; + for (const auto& provider : typesCtx.DataSources) { + if (auto* dqIntegration = provider->GetDqIntegration()) { + uniqueIntegrations.emplace(dqIntegration); + } + } + + for (const auto& provider : typesCtx.DataSinks) { + if (auto* dqIntegration = provider->GetDqIntegration()) { + uniqueIntegrations.emplace(dqIntegration); + } + } + + return uniqueIntegrations; +} + } // namespace NYql diff --git a/ydb/core/kqp/common/kqp_yql.h b/ydb/core/kqp/common/kqp_yql.h index fea469d2ac9a..ecf79cc75833 100644 --- a/ydb/core/kqp/common/kqp_yql.h +++ b/ydb/core/kqp/common/kqp_yql.h @@ -3,6 +3,7 @@ #include #include #include +#include namespace NYql { @@ -124,4 +125,6 @@ TString KqpExprToPrettyString(const NNodes::TExprBase& expr, TExprContext& ctx); TString PrintKqpStageOnly(const NNodes::TDqStageBase& stage, TExprContext& ctx); +std::unordered_set GetUniqueIntegrations(TTypeAnnotationContext& typesCtx); + } // namespace NYql diff --git a/ydb/core/kqp/common/ya.make b/ydb/core/kqp/common/ya.make index bd5d07580c24..4cd5a8b9becc 100644 --- a/ydb/core/kqp/common/ya.make +++ b/ydb/core/kqp/common/ya.make @@ -38,6 +38,7 @@ PEERDIR( ydb/library/yql/core/issue ydb/library/yql/dq/actors ydb/library/yql/dq/common + ydb/library/yql/dq/integration ydb/library/yql/parser/pg_wrapper/interface ydb/public/lib/operation_id ydb/public/lib/operation_id/protos diff --git a/ydb/core/kqp/host/kqp_runner.cpp b/ydb/core/kqp/host/kqp_runner.cpp index 2126799d691f..52b85126900a 100644 --- a/ydb/core/kqp/host/kqp_runner.cpp +++ b/ydb/core/kqp/host/kqp_runner.cpp @@ -307,11 +307,17 @@ class TKqpRunner : public IKqpRunner { .Add(CreateKqpStatisticsTransformer(OptimizeCtx, *typesCtx, Config, Pctx), "Statistics") .Build(false); + auto dqIntegrationPeephole = TTransformationPipeline(typesCtx); + for (auto* dqIntegration : GetUniqueIntegrations(*typesCtx)) { + dqIntegration->ConfigurePeepholePipeline(true, {}, &dqIntegrationPeephole); + } + auto physicalPeepholeTransformer = TTransformationPipeline(typesCtx) .AddServiceTransformers() .Add(Log("PhysicalPeephole"), "LogPhysicalPeephole") .AddTypeAnnotationTransformer(CreateKqpTypeAnnotationTransformer(Cluster, sessionCtx->TablesPtr(), *typesCtx, Config)) .AddPostTypeAnnotation() + .Add(dqIntegrationPeephole.Build(), "DqIntegrationPeephole") .Add( CreateKqpTxsPeepholeTransformer( CreateTypeAnnotationTransformer( diff --git a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp index 77270799f7ae..2d2f40082d6a 100644 --- a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp @@ -91,6 +91,7 @@ struct TKiExploreTxResults { TVector Sync; TVector QueryBlocks; bool HasExecute; + bool HasErrors; THashSet GetSyncSet() const { THashSet syncSet; @@ -280,10 +281,11 @@ struct TKiExploreTxResults { } TKiExploreTxResults() - : HasExecute(false) {} + : HasExecute(false) + , HasErrors(false) {} }; -bool IsDqRead(const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& types, bool estimateReadSize = true) { +bool IsDqRead(const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& types, bool estimateReadSize, bool* hasErrors = nullptr) { if (node.Ref().ChildrenSize() <= 1) { return false; } @@ -294,13 +296,21 @@ bool IsDqRead(const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& auto dataSourceProviderIt = types.DataSourceMap.find(dataSourceCategory); if (dataSourceProviderIt != types.DataSourceMap.end()) { if (auto* dqIntegration = dataSourceProviderIt->second->GetDqIntegration()) { - if (dqIntegration->CanRead(*node.Ptr(), ctx) && - (!estimateReadSize || dqIntegration->EstimateReadSize( - TDqSettings::TDefault::DataSizePerJob, - TDqSettings::TDefault::MaxTasksPerStage, - {node.Raw()}, - ctx))) { - return true; + try { + if (dqIntegration->CanRead(*node.Ptr(), ctx) && + (!estimateReadSize || dqIntegration->EstimateReadSize( + TDqSettings::TDefault::DataSizePerJob, + TDqSettings::TDefault::MaxTasksPerStage, + {node.Raw()}, + ctx))) { + return true; + } + } catch (const TFallbackError& error) { + TString message = TStringBuilder() << "Cannot execute read through DQ integration. Cause: " << error.what(); + ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), message)); + if (hasErrors) { + *hasErrors = true; + } } } } @@ -388,7 +398,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T return result; } - if (IsDqRead(node, ctx, types)) { + if (IsDqRead(node, ctx, types, true, &txRes.HasErrors)) { txRes.Ops.insert(node.Raw()); TExprNode::TPtr worldChild = node.Raw()->ChildPtr(0); return ExploreTx(TExprBase(worldChild), ctx, dataSink, txRes, tablesData, types); @@ -849,8 +859,8 @@ TExprNode::TPtr KiBuildQuery(TExprBase node, TExprContext& ctx, TIntrusivePtr CreateKqlCompiler(const TKqlCompileContext& return TRuntimeNode(); }); - std::unordered_set usedProviders; - for (const auto& provider : typesCtx.DataSources) { - if (auto* dqIntegration = provider->GetDqIntegration()) { - dqIntegration->RegisterMkqlCompiler(*compiler); - usedProviders.emplace(provider->GetName()); - } - } - - for (const auto& provider : typesCtx.DataSinks) { - if (auto* dqIntegration = provider->GetDqIntegration(); dqIntegration && !usedProviders.contains(TString(provider->GetName()))) { - dqIntegration->RegisterMkqlCompiler(*compiler); - } + for (auto* dqIntegration : GetUniqueIntegrations(typesCtx)) { + dqIntegration->RegisterMkqlCompiler(*compiler); } compiler->AddCallable(TKqpWideReadTable::CallableName(), diff --git a/ydb/library/yql/providers/generic/connector/tests/utils/run/kqprun.py b/ydb/library/yql/providers/generic/connector/tests/utils/run/kqprun.py index 2b6d7a47d91c..7ebbb90307d6 100644 --- a/ydb/library/yql/providers/generic/connector/tests/utils/run/kqprun.py +++ b/ydb/library/yql/providers/generic/connector/tests/utils/run/kqprun.py @@ -191,7 +191,7 @@ def run(self, test_name: str, script: str, generic_settings: GenericSettings) -> result_path = artifacts.make_path(test_name=test_name, artifact_name='result.json') # For debug add option --trace-opt to args - cmd = f'{self.kqprun_path} -s {scheme_path} -p {script_path} --app-config={app_conf_path} --result-file={result_path} --result-format=full' + cmd = f'{self.kqprun_path} -s {scheme_path} -p {script_path} --app-config={app_conf_path} --result-file={result_path} --result-format=full-json' output = None data_out = None diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp index 87cde0e90a34..2ba007caa2f8 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp @@ -242,13 +242,16 @@ class TYtDqIntegration: public TDqIntegrationBase { return maxDataSizePerJob; } - void AddInfo(TExprContext& ctx, const TString& message, bool skipIssues) { + void AddInfo(TExprContext& ctx, const TString& message, bool skipIssues, bool throwException = false) { if (!skipIssues) { YQL_CLOG(INFO, ProviderDq) << message; TIssue info("DQ cannot execute the query. Cause: " + message); info.Severity = TSeverityIds::S_INFO; ctx.IssueManager.RaiseIssue(info); } + if (throwException) { + throw TFallbackError() << message; + } } bool CheckPragmas(const TExprNode& node, TExprContext& ctx, bool skipIssues) override { @@ -293,7 +296,7 @@ class TYtDqIntegration: public TDqIntegrationBase { } else if (auto maybeRead = TMaybeNode(&node)) { auto cluster = maybeRead.Cast().DataSource().Cluster().StringValue(); if (!State_->Configuration->_EnableDq.Get(cluster).GetOrElse(true)) { - AddInfo(ctx, TStringBuilder() << "disabled for cluster " << cluster, skipIssues); + AddInfo(ctx, TStringBuilder() << "disabled for cluster " << cluster, skipIssues, State_->PassiveExecution); return false; } const auto canUseYtPartitioningApi = State_->Configuration->_EnableYtPartitioning.Get(cluster).GetOrElse(false); @@ -309,45 +312,45 @@ class TYtDqIntegration: public TDqIntegrationBase { } } } - AddInfo(ctx, info, skipIssues); + AddInfo(ctx, info, skipIssues, State_->PassiveExecution); return false; } auto sampleSetting = GetSetting(section.Settings().Ref(), EYtSettingType::Sample); if (sampleSetting && sampleSetting->Child(1)->Child(0)->Content() == "system") { - AddInfo(ctx, "system sampling", skipIssues); + AddInfo(ctx, "system sampling", skipIssues, State_->PassiveExecution); return false; } for (auto path: section.Paths()) { if (!path.Table().Maybe()) { - AddInfo(ctx, "non-table path", skipIssues); + AddInfo(ctx, "non-table path", skipIssues, State_->PassiveExecution); return false; } else { auto pathInfo = TYtPathInfo(path); auto tableInfo = pathInfo.Table; auto epoch = TEpochInfo::Parse(path.Table().Maybe().CommitEpoch().Ref()); if (!tableInfo->Stat) { - AddInfo(ctx, "table without statistics", skipIssues); + AddInfo(ctx, "table without statistics", skipIssues, State_->PassiveExecution); return false; } else if (!tableInfo->RowSpec) { - AddInfo(ctx, "table without row spec", skipIssues); + AddInfo(ctx, "table without row spec", skipIssues, State_->PassiveExecution); return false; } else if (!tableInfo->Meta) { - AddInfo(ctx, "table without meta", skipIssues); + AddInfo(ctx, "table without meta", skipIssues, State_->PassiveExecution); return false; } else if (tableInfo->IsAnonymous) { - AddInfo(ctx, "anonymous table", skipIssues); + AddInfo(ctx, "anonymous table", skipIssues, State_->PassiveExecution); return false; } else if ((!epoch.Empty() && *epoch.Get() > 0)) { - AddInfo(ctx, "table with non-empty epoch", skipIssues); + AddInfo(ctx, "table with non-empty epoch", skipIssues, State_->PassiveExecution); return false; } else if (NYql::HasSetting(tableInfo->Settings.Ref(), EYtSettingType::WithQB)) { - AddInfo(ctx, "table with QB2 premapper", skipIssues); + AddInfo(ctx, "table with QB2 premapper", skipIssues, State_->PassiveExecution); return false; } else if (pathInfo.Ranges && !canUseYtPartitioningApi) { - AddInfo(ctx, "table with ranges", skipIssues); + AddInfo(ctx, "table with ranges", skipIssues, State_->PassiveExecution); return false; } else if (tableInfo->Meta->IsDynamic && !canUseYtPartitioningApi) { - AddInfo(ctx, "dynamic table", skipIssues); + AddInfo(ctx, "dynamic table", skipIssues, State_->PassiveExecution); return false; } @@ -356,7 +359,7 @@ class TYtDqIntegration: public TDqIntegrationBase { } } if (auto maxChunks = State_->Configuration->MaxChunksForDqRead.Get().GetOrElse(DEFAULT_MAX_CHUNKS_FOR_DQ_READ); chunksCount > maxChunks) { - AddInfo(ctx, "table with too many chunks", skipIssues); + AddInfo(ctx, "table with too many chunks", skipIssues, State_->PassiveExecution); return false; } return true; diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_physical_optimize.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_physical_optimize.cpp index f3c46b908097..117bbf315e72 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_physical_optimize.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_physical_optimize.cpp @@ -3487,7 +3487,7 @@ class TYtPhysicalOptProposalTransformer : public TOptimizeTransformerBase { return node; } - auto count = CleanupWorld(countBase.Count(), ctx); + auto count = State_->PassiveExecution ? countBase.Count() : CleanupWorld(countBase.Count(), ctx); if (!count) { return {}; } diff --git a/ydb/tests/fq/yt/cfg/kqprun_scheme.sql b/ydb/tests/fq/yt/cfg/kqprun_scheme.sql index 730292a3ab8e..7d14adc72d36 100644 --- a/ydb/tests/fq/yt/cfg/kqprun_scheme.sql +++ b/ydb/tests/fq/yt/cfg/kqprun_scheme.sql @@ -1,8 +1,5 @@ -CREATE OBJECT yt_token (TYPE SECRET) WITH (value = "token"); - CREATE EXTERNAL DATA SOURCE plato WITH ( SOURCE_TYPE="YT", LOCATION="localhost", - AUTH_METHOD="TOKEN", - TOKEN_SECRET_NAME="yt_token" + AUTH_METHOD="NONE" ); diff --git a/ydb/tests/fq/yt/kqp_yt_file.make b/ydb/tests/fq/yt/kqp_yt_file.make index d8ba0b7bddb4..349139fe0446 100644 --- a/ydb/tests/fq/yt/kqp_yt_file.make +++ b/ydb/tests/fq/yt/kqp_yt_file.make @@ -16,6 +16,7 @@ ENDIF() DEPENDS( ydb/library/yql/tests/common/test_framework/udfs_deps + ydb/library/yql/udfs/test/test_import ydb/tests/tools/kqprun ) diff --git a/ydb/tests/fq/yt/kqp_yt_file.py b/ydb/tests/fq/yt/kqp_yt_file.py index f6af177c44e6..04fd73c23da0 100644 --- a/ydb/tests/fq/yt/kqp_yt_file.py +++ b/ydb/tests/fq/yt/kqp_yt_file.py @@ -1,99 +1,67 @@ -import codecs -import os - import pytest -import yatest.common from file_common import check_provider, get_sql_query from kqprun import KqpRun -from utils import DATA_PATH, get_config -from yql_utils import KSV_ATTR, do_custom_query_check, get_tables, is_xfail, yql_binary_path +from utils import DATA_PATH, get_config, get_parameters_files +from yql_utils import KSV_ATTR, get_files, get_http_files, get_tables, is_xfail, yql_binary_path EXCLUDED_SUITES = [ - 'bigdate', # Many unsupported types - 'match_recognize', # MATCH_RECOGNIZE is disabled - 'params', # Params is not supported in KqpRun - 'pg', # Not fully supported - 'pg-tpcds', # Not fully supported - 'pg-tpch', # Not fully supported - 'pg_catalog', # Not fully supported - 'produce', # Variant cast errors - 'schema', # Not fully supported - 'simple_columns', # Peephole optimization failed for KQP transaction - 'type_literal', # Not supported - 'view', # Not fully supported + 'match_recognize', # MATCH_RECOGNIZE is disabled in KQP ] EXCLUDED_TESTS = [ - 'action/eval_asatom', # ATOM evaluation is not supported in YDB queries - 'action/eval_astagged', # ATOM evaluation is not supported in YDB queries - 'action/eval_capture', # ATOM evaluation is not supported in YDB queries - 'action/eval_ensuretype', # ATOM evaluation is not supported in YDB queries - 'action/eval_extract', # ATOM evaluation is not supported in YDB queries - 'action/eval_pragma', # ATOM evaluation is not supported in YDB queries - 'action/eval_resourcetype', # ATOM evaluation is not supported in YDB queries - 'action/eval_result_label', # ATOM evaluation is not supported in YDB queries - 'action/eval_taggedtype', # ATOM evaluation is not supported in YDB queries - 'action/runtime_for_select', # INTERNAL_ERROR - - 'blocks/pg_from_dates', # Unsupported primitive type: Date32 - - 'expr/as_dict_implicit_cast', # Unsupported type kind: Void - 'expr/as_table_emptylist2', # Expected list type, but got: EmptyList - 'expr/cast_variant', # Unsupported type kind: Variant - 'expr/dict_common_type', # Unsupported type kind: Void - 'expr/lds_literal', # Unsupported type kind: Void - 'expr/replace_member', # ATOM evaluation is not supported in YDB queries - 'expr/struct_builtins', # ATOM evaluation is not supported in YDB queries - 'expr/struct_literal', # ATOM evaluation is not supported in YDB queries - - 'flatten_by/flatten_mode', # INTERNAL_ERROR - - 'flexible_types/struct_literals_vs_columns', # ATOM evaluation is not supported in YDB queries - - 'hor_join/group_yamr', # INTERNAL_ERROR - 'hor_join/max_outtables', # Failed to build query results - 'hor_join/skip_yamr', # INTERNAL_ERROR - 'hor_join/sorted_out_mix', # Failed to build query results - 'hor_join/less_outs', # Failed to build query results - - 'in/in_ansi_join', # INTERNAL_ERROR - 'in/in_scalar_vector_subquery', # Peephole optimization failed for KQP transaction - 'in/in_tablesource_on_raw_list', # Failed to build query results - 'in/in_tuple_table', # Cannot find table 'Root/Plato.[InputWithTuples]' - - 'join/convert_check_key_mem', # FAULT - 'join/inmem_by_uncomparable_structs', # Peephole optimization failed for KQP transaction - 'join/inmem_by_uncomparable_tuples', # Peephole optimization failed for KQP transaction - 'join/inmem_with_null_key', # Error: Not comparable keys: a.a and b.a, Null != Null - 'join/inmem_with_set_key', # FAULT - 'join/inmem_with_set_key_any', # FAULT - 'join/join_comp_inmem', # Peephole optimization failed for KQP transaction - 'join/nopushdown_filter_with_depends_on', # Invalid YSON - 'join/mapjoin_sharded', # Test uses variables to set different pragma values for yt_local run in arcadia - - 'limit/dynamic_limit', # Missed callable: YtTableContent - 'limit/empty_read_after_limit', # INTERNAL_ERROR - - 'optimizers/flatmap_with_non_struct_out', # Failed to build query results - 'optimizers/yt_shuffle_by_keys', # Failed to build query results - - 'select/tablepathprefix', # ATOM evaluation is not supported in YDB queries - - 'ypath/direct_read_from_dynamic', # INTERNAL_ERROR -] - -EXCLUDED_CANONIZATION = [ - 'datetime/current_date', - 'expr/common_type_for_resource_and_data', - 'expr/current_tz', - 'optimizers/yql-10042_disable_flow_fuse_depends_on', - 'optimizers/yql-10042_disable_fuse_depends_on', - 'optimizers/yql-10074_dont_inline_lists_depends_on', - 'union/union_column_extention', - 'union/union_mix', - 'union_all/union_all_with_limits', - 'weak_field/weak_field', + # FAULT, CalcHash(): requirement false failed, YQ-3139 + 'pg/pg_types_dict', + + # FAULT, requirement !HasNullInKey(key1) failed, YQ-3141 + 'join/convert_check_key_mem', + 'join/inmem_with_set_key', + + # INTERNAL_ERROR, Visit(): requirement stagePlanNode.StageProto failed, YQ-3137 + 'action/runtime_for_select', + 'pg_catalog/pg_set_config', + + # INTERNAL_ERROR, Peephole optimization failed, YQ-3138 + 'flatten_by/flatten_mode', + 'in/in_scalar_vector_subquery', + 'join/inmem_with_set_key_any', + + # INTERNAL_ERROR, Cannot cast type Variant to Struct, YQ-3173 + 'produce/reduce_multi_in', + 'produce/reduce_multi_in_difftype', + 'produce/reduce_multi_in_keytuple', + 'produce/reduce_multi_in_keytuple_difftype', + 'produce/reduce_multi_in_presort', + 'produce/reduce_multi_in_ref', + + # GENERIC_ERROR, Expected set on right side but got Dict, YQ-3140 + 'in/in_tuple_table', + 'join/inmem_by_uncomparable_structs', + 'join/inmem_by_uncomparable_tuples', + 'join/join_comp_inmem', + 'pg/sublink_having_in', + 'pg/sublink_columns_in_test_expr_columns', + + # GENERIC_ERROR, JOIN with null type key, YQ-3142 + 'join/inmem_with_null_key', + + # GENERIC_ERROR, Failed to build query results, YQ-3149 + 'hor_join/max_outtables', + 'hor_join/sorted_out_mix', + 'hor_join/less_outs', + 'in/in_tablesource_on_raw_list', + 'optimizers/flatmap_with_non_struct_out', + 'optimizers/yt_shuffle_by_keys', + + # GENERIC_ERROR, Mismatch dict key types: Int64 and Optional, YQ-3164 + 'simple_columns/simple_columns_join_coalesce_all_1', + 'simple_columns/simple_columns_join_coalesce_all_2', + 'simple_columns/simple_columns_join_coalesce_bug8923', + 'simple_columns/simple_columns_join_coalesce_qualified_all_disable', + 'simple_columns/simple_columns_join_coalesce_qualified_all_enable', + + # PRECONDITION_FAILED, Unexpected flow status, YQ-3174 + 'produce/reduce_typeinfo', ] @@ -105,60 +73,45 @@ def contains_insert(sql_query): def validate_sql(sql_query): # Unsupported constructions if 'define subquery' in sql_query.lower(): - pytest.skip('SUBQUERY is not supported in KQP') - - if contains_insert(sql_query): - pytest.skip('INSERT is not supported in KQP') - - if 'discard' in sql_query.lower(): - pytest.skip('DISCARD is not supported in KQP') + pytest.skip('Using of system \'kikimr\' is not allowed in SUBQUERY') - if 'evaluate' in sql_query.lower(): - pytest.skip('EVALUATE is not supported in KQP') + if 'concat(' in sql_query.lower() or 'each(' in sql_query.lower(): + pytest.skip('CONCAT is not supported on Kikimr clusters') - if 'concat(' in sql_query.lower(): - pytest.skip('CONCAT is not supported in KQP') + if 'range(' in sql_query.lower() or 'regexp(' in sql_query.lower() or 'filter(' in sql_query.lower() or 'like(' in sql_query.lower() or '_strict(' in sql_query.lower(): + pytest.skip('RANGE is not supported on Kikimr clusters') - if '.range(' in sql_query.lower() or ' range(' in sql_query.lower(): - pytest.skip('RANGE is not supported in KQP') + if 'discard' in sql_query.lower(): + pytest.skip('DISCARD not supported in YDB queries') - if ' each(' in sql_query.lower(): - pytest.skip('EACH is not supported in KQP') + if 'commit' in sql_query.lower(): + pytest.skip('COMMIT not supported inside YDB query') if 'drop table' in sql_query.lower(): - pytest.skip('DROP TABLE is not supported in KQP for extarnal entities') + pytest.skip('DROP TABLE is not supported for extarnal entities') - if 'sample ' in sql_query.lower() or 'sample(' in sql_query.lower(): - pytest.skip('SAMPLE is not supported in KQP') + if 'yt:' in sql_query.lower(): + pytest.skip('Explicit data source declaration is not supported for external entities') - if 'count(' in sql_query.lower(): - pytest.skip('COUNT is not supported in KQP') + if contains_insert(sql_query): + pytest.skip('INSERT is not supported for external data source YT') # Unsupported functions if 'TableName(' in sql_query: pytest.skip('TableName is not supported in KQP') - if 'QuoteCode(' in sql_query: - pytest.skip('QuoteCode is not supported in KQP') - - if 'RangeComputeFor(' in sql_query: + if 'RangeComputeFor(' in sql_query: # INTERNAL_ERROR if used, YQ-3134 pytest.skip('RangeComputeFor is not supported in KQP') - if 'FromBytes(' in sql_query: - pytest.skip('FromBytes is not supported in KQP') - if 'folder(' in sql_query.lower(): pytest.skip('Folder is not supported in KQP') - if 'file(' in sql_query.lower() or 'FileContent(' in sql_query: - pytest.skip('Files is not supported in KQP') + if 'library(' in sql_query.lower() or 'file(' in sql_query.lower() or 'filecontent(' in sql_query.lower(): + pytest.skip('Attaching files and libraries is not supported in KQP') # Unsupported pragmas - if 'library(' in sql_query.lower(): - pytest.skip('Pragma Library is not supported in KQP') - if 'refselect' in sql_query.lower(): - pytest.skip('Pragma RefSelect is not supported in KQP') + pytest.skip('RefSelect mode isn\'t supported by provider: kikimr') if 'optimizerflags' in sql_query.lower(): pytest.skip('Pragma OptimizerFlags is not supported in KQP') @@ -169,30 +122,34 @@ def validate_sql(sql_query): if 'costbasedoptimizer' in sql_query.lower(): pytest.skip('Pragma CostBasedOptimizer is not supported in KQP') + if 'emitaggapply' in sql_query.lower(): + pytest.skip('Pragma EmitAggApply is not supported in KQP') + + if 'validateudf' in sql_query.lower(): + pytest.skip('Pragma ValidateUdf is not supported in KQP') + if 'pragma dq.' in sql_query.lower(): pytest.skip('DQ pragmas is not supported in KQP') - # Unsupported types - if 'date32' in sql_query.lower(): - pytest.skip('Type Date32 is not supported in KQP') + if 'direct_read' in sql_query.lower() or 'directread' in sql_query.lower(): + pytest.skip('Pragma DirectRead is not supported for external data source YT') - if 'datetime64' in sql_query.lower(): - pytest.skip('Type Datetime64 is not supported in KQP') + # Unsupported cases + if 'as_table([])' in sql_query.lower(): + pytest.skip('AS_TABLE from empty list is not supported in KQP') - if 'timestamp64' in sql_query.lower(): - pytest.skip('Type Timestamp64 is not supported in KQP') + if '--!syntax_pg' in sql_query and 'plato.' in sql_query.lower(): + pytest.skip('Dynamic cluster declaration is not supported in pg syntax') - if 'interval64' in sql_query.lower(): - pytest.skip('Type Interval64 is not supported in KQP') + if '--!syntax_pg' not in sql_query and 'pg_catalog.' in sql_query.lower(): + pytest.skip('Cluster pg_catalog supported only in pg syntax due to common KQP table path prefix') - if 'interval64' in sql_query.lower(): - pytest.skip('Type Interval64 is not supported in KQP') + if sql_query.count('plato') != sql_query.lower().count('plato'): + pytest.skip('External data source name are case sensitive') - if 'void(' in sql_query.lower(): - pytest.skip('Type Void is not supported in KQP') - - if 'variant(' in sql_query.lower(): - pytest.skip('Type Variant is not supported in KQP') + # Unsupported test constructions + if 'custom check:' in sql_query: + pytest.skip('custom checks is not supported for KqpRun output format') def run_test(suite, case, cfg): @@ -201,33 +158,28 @@ def run_test(suite, case, cfg): full_test_name = suite + '/' + case if full_test_name in EXCLUDED_TESTS: - pytest.skip('skip case ' + suite + '/' + suite) + pytest.skip('skip case ' + full_test_name) - program_sql = os.path.join(DATA_PATH, suite, '%s.sql' % case) - with codecs.open(program_sql, encoding='utf-8') as program_file_descr: - sql_query = program_file_descr.read() - validate_sql(sql_query) + run_file_kqp(suite, case, cfg) - result = run_file_kqp(suite, case, cfg) - if do_custom_query_check(result, sql_query): - return None +def run_file_kqp_no_cache(suite, case, cfg): + config = get_config(suite, case, cfg) + sql_query = get_sql_query('yt', suite, case, config) + in_tables = get_tables(suite, config, DATA_PATH, def_attr=KSV_ATTR)[0] - #if os.path.exists(result.results_file) and full_test_name not in EXCLUDED_CANONIZATION: - # return yatest.common.canonical_file(result.results_file) + check_provider('yt', config) + validate_sql(sql_query) + if get_parameters_files(suite, config): + pytest.skip('params is not supported in KqpRun') -def run_file_kqp_no_cache(suite, case, cfg): - config = get_config(suite, case, cfg) + if get_files(suite, config, DATA_PATH) or get_http_files(suite, config, DATA_PATH): + pytest.skip('file attachment is not supported in KQP') if is_xfail(config): pytest.skip('skip fail tests') - check_provider('yt', config) - - sql_query = get_sql_query('yt', suite, case, config) - in_tables = get_tables(suite, config, DATA_PATH, def_attr=KSV_ATTR)[0] - kqprun = KqpRun( udfs_dir=yql_binary_path('ydb/library/yql/tests/common/test_framework/udfs_deps') ) diff --git a/ydb/tests/fq/yt/kqprun.py b/ydb/tests/fq/yt/kqprun.py index a9564e07743a..3179fee6bee7 100644 --- a/ydb/tests/fq/yt/kqprun.py +++ b/ydb/tests/fq/yt/kqprun.py @@ -1,5 +1,6 @@ import os +import pytest import yatest.common import yql_utils @@ -43,10 +44,13 @@ def yql_exec(self, program=None, program_file=None, verbose=False, check_error=T '--result-file=%(results_file)s ' \ '--log-file=%(log_file)s ' \ '--udfs-dir=%(udfs_dir)s ' \ + '--result-format full-proto ' \ '--result-rows-limit 0 ' % locals() if tables is not None: for table in tables: + if table.format != 'yson': + pytest.skip('skip tests containing tables with a non-yson attribute format') cmd += '--table=yt.Root/%s@%s ' % (table.full_name, table.yqlrun_file) proc_result = yatest.common.process.execute(cmd.strip().split(), check_exit_code=False, cwd=self.res_dir) diff --git a/ydb/tests/tools/kqprun/kqprun.cpp b/ydb/tests/tools/kqprun/kqprun.cpp index b419ee2214f2..99c88dcbe79b 100644 --- a/ydb/tests/tools/kqprun/kqprun.cpp +++ b/ydb/tests/tools/kqprun/kqprun.cpp @@ -87,7 +87,9 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner } if (executionOptions.HasResults()) { - runner.PrintScriptResults(); + if (!runner.PrintScriptResults()) { + ythrow yexception() << "Failed to print script results"; + } } Cout << colors.Yellow() << "Finalization of kqp runner..." << colors.Default() << Endl; @@ -230,7 +232,7 @@ void RunMain(int argc, const char* argv[]) { .RequiredArgument("STR") .DefaultValue(planOutputFormat) .StoreResult(&planOutputFormat); - options.AddLongOption('R', "result-format", "Script query result format, one of { rows | full }") + options.AddLongOption('R', "result-format", "Script query result format, one of { rows | full-json | full-proto }") .Optional() .RequiredArgument("STR") .DefaultValue(resultOutputFormat) @@ -307,7 +309,8 @@ void RunMain(int argc, const char* argv[]) { runnerOptions.ResultOutputFormat = (resultOutputFormat == TStringBuf("rows")) ? NKqpRun::TRunnerOptions::EResultOutputFormat::RowsJson - : (resultOutputFormat == TStringBuf("full")) ? NKqpRun::TRunnerOptions::EResultOutputFormat::FullJson + : (resultOutputFormat == TStringBuf("full-json")) ? NKqpRun::TRunnerOptions::EResultOutputFormat::FullJson + : (resultOutputFormat == TStringBuf("full-proto")) ? NKqpRun::TRunnerOptions::EResultOutputFormat::FullProto : NKqpRun::TRunnerOptions::EResultOutputFormat::RowsJson; runnerOptions.PlanOutputFormat = diff --git a/ydb/tests/tools/kqprun/src/common.h b/ydb/tests/tools/kqprun/src/common.h index 5d833e670150..06c163c36789 100644 --- a/ydb/tests/tools/kqprun/src/common.h +++ b/ydb/tests/tools/kqprun/src/common.h @@ -38,6 +38,7 @@ struct TRunnerOptions { enum class EResultOutputFormat { RowsJson, // Rows in json format FullJson, // Columns, rows and types in json format + FullProto, // Columns, rows and types in proto string format }; IOutputStream* ResultOutput = &Cout; diff --git a/ydb/tests/tools/kqprun/src/kqp_runner.cpp b/ydb/tests/tools/kqprun/src/kqp_runner.cpp index 21c31f74ade2..effbc3491d1f 100644 --- a/ydb/tests/tools/kqprun/src/kqp_runner.cpp +++ b/ydb/tests/tools/kqprun/src/kqp_runner.cpp @@ -118,14 +118,17 @@ class TKqpRunner::TImpl { return true; } - void PrintScriptResults() const { + bool PrintScriptResults() const { Cout << CoutColors_.Cyan() << "Writing script query results" << CoutColors_.Default() << Endl; for (size_t i = 0; i < ResultSets_.size(); ++i) { if (ResultSets_.size() > 1) { *Options_.ResultOutput << CoutColors_.Cyan() << "Result set " << i + 1 << ":" << CoutColors_.Default() << Endl; } - PrintScriptResult(ResultSets_[i]); + if (!PrintScriptResult(ResultSets_[i])) { + return false; + } } + return true; } private: @@ -197,7 +200,7 @@ class TKqpRunner::TImpl { } } - void PrintScriptResult(const Ydb::ResultSet& resultSet) const { + bool PrintScriptResult(const Ydb::ResultSet& resultSet) const { switch (Options_.ResultOutputFormat) { case TRunnerOptions::EResultOutputFormat::RowsJson: { NYdb::TResultSet result(resultSet); @@ -205,15 +208,30 @@ class TKqpRunner::TImpl { while (parser.TryNextRow()) { NJsonWriter::TBuf writer(NJsonWriter::HEM_UNSAFE, Options_.ResultOutput); writer.SetWriteNanAsString(true); - NYdb::FormatResultRowJson(parser, result.GetColumnsMeta(), writer, NYdb::EBinaryStringEncoding::Unicode); + try { + NYdb::FormatResultRowJson(parser, result.GetColumnsMeta(), writer, NYdb::EBinaryStringEncoding::Unicode); + } catch (...) { + Cerr << CerrColors_.Red() << "Failed to convert query result rows to JSON, reason:\n" << CurrentExceptionMessage() << "\nTry to use --result-format full-proto" << CerrColors_.Default() << Endl; + return false; + } *Options_.ResultOutput << Endl; } - break; + return true; } case TRunnerOptions::EResultOutputFormat::FullJson: resultSet.PrintJSON(*Options_.ResultOutput); - break; + *Options_.ResultOutput << Endl; + return true; + + case TRunnerOptions::EResultOutputFormat::FullProto: + TString resultSetString; + google::protobuf::TextFormat::Printer printer; + printer.SetSingleLineMode(false); + printer.SetUseUtf8StringEscaping(true); + printer.PrintToString(resultSet, &resultSetString); + *Options_.ResultOutput << resultSetString; + return true; } } @@ -260,8 +278,8 @@ bool TKqpRunner::ForgetExecutionOperation() { return Impl_->ForgetExecutionOperation(); } -void TKqpRunner::PrintScriptResults() const { - Impl_->PrintScriptResults(); +bool TKqpRunner::PrintScriptResults() const { + return Impl_->PrintScriptResults(); } } // namespace NKqpRun diff --git a/ydb/tests/tools/kqprun/src/kqp_runner.h b/ydb/tests/tools/kqprun/src/kqp_runner.h index b263bbedbf55..edd4c76e809e 100644 --- a/ydb/tests/tools/kqprun/src/kqp_runner.h +++ b/ydb/tests/tools/kqprun/src/kqp_runner.h @@ -22,7 +22,7 @@ class TKqpRunner { bool ForgetExecutionOperation(); - void PrintScriptResults() const; + bool PrintScriptResults() const; private: class TImpl;