Skip to content

Commit

Permalink
Merge 00ad227 into 3a7aa37
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored May 2, 2024
2 parents 3a7aa37 + 00ad227 commit 13d4667
Show file tree
Hide file tree
Showing 17 changed files with 203 additions and 201 deletions.
2 changes: 1 addition & 1 deletion ydb/core/external_sources/external_source_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TStri
},
{
ToString(NYql::EDatabaseType::YT),
CreateExternalDataSource(TString{NYql::YtProviderName}, {"TOKEN"}, {}, hostnamePatternsRegEx)
CreateExternalDataSource(TString{NYql::YtProviderName}, {"NONE", "TOKEN"}, {}, hostnamePatternsRegEx)
}
});
}
Expand Down
17 changes: 17 additions & 0 deletions ydb/core/kqp/common/kqp_yql.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -454,4 +454,21 @@ TString PrintKqpStageOnly(const TDqStageBase& stage, TExprContext& ctx) {
return KqpExprToPrettyString(TExprBase(newStage), ctx);
}

// Gathers every non-null DQ integration exposed by the data sources and data
// sinks registered in `typesCtx`, with duplicates removed.
//
// A provider whose GetDqIntegration() returns null contributes nothing. The
// same integration pointer reachable through several providers is stored once.
// The result is a std::unordered_set, so its iteration order is unspecified.
std::unordered_set<IDqIntegration*> GetUniqueIntegrations(TTypeAnnotationContext& typesCtx) {
    std::unordered_set<IDqIntegration*> result;

    // Shared collection step: the same logic is applied to sources and sinks.
    const auto collectFrom = [&result](const auto& providers) {
        for (const auto& item : providers) {
            auto* integration = item->GetDqIntegration();
            if (integration) {
                result.insert(integration);
            }
        }
    };

    collectFrom(typesCtx.DataSources);
    collectFrom(typesCtx.DataSinks);

    return result;
}

} // namespace NYql
3 changes: 3 additions & 0 deletions ydb/core/kqp/common/kqp_yql.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <ydb/core/kqp/expr_nodes/kqp_expr_nodes.h>
#include <ydb/library/yql/ast/yql_pos_handle.h>
#include <ydb/library/yql/ast/yql_expr.h>
#include <ydb/library/yql/dq/integration/yql_dq_integration.h>

namespace NYql {

Expand Down Expand Up @@ -124,4 +125,6 @@ TString KqpExprToPrettyString(const NNodes::TExprBase& expr, TExprContext& ctx);

TString PrintKqpStageOnly(const NNodes::TDqStageBase& stage, TExprContext& ctx);

std::unordered_set<IDqIntegration*> GetUniqueIntegrations(TTypeAnnotationContext& typesCtx);

} // namespace NYql
1 change: 1 addition & 0 deletions ydb/core/kqp/common/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ PEERDIR(
ydb/library/yql/core/issue
ydb/library/yql/dq/actors
ydb/library/yql/dq/common
ydb/library/yql/dq/integration
ydb/library/yql/parser/pg_wrapper/interface
ydb/public/lib/operation_id
ydb/public/lib/operation_id/protos
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/kqp/host/kqp_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,11 +307,17 @@ class TKqpRunner : public IKqpRunner {
.Add(CreateKqpStatisticsTransformer(OptimizeCtx, *typesCtx, Config, Pctx), "Statistics")
.Build(false);

auto dqIntegrationPeephole = TTransformationPipeline(typesCtx);
for (auto* dqIntegration : GetUniqueIntegrations(*typesCtx)) {
dqIntegration->ConfigurePeepholePipeline(true, {}, &dqIntegrationPeephole);
}

auto physicalPeepholeTransformer = TTransformationPipeline(typesCtx)
.AddServiceTransformers()
.Add(Log("PhysicalPeephole"), "LogPhysicalPeephole")
.AddTypeAnnotationTransformer(CreateKqpTypeAnnotationTransformer(Cluster, sessionCtx->TablesPtr(), *typesCtx, Config))
.AddPostTypeAnnotation()
.Add(dqIntegrationPeephole.Build(), "DqIntegrationPeephole")
.Add(
CreateKqpTxsPeepholeTransformer(
CreateTypeAnnotationTransformer(
Expand Down
34 changes: 22 additions & 12 deletions ydb/core/kqp/provider/yql_kikimr_opt_build.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ struct TKiExploreTxResults {
TVector<TExprBase> Sync;
TVector<TKiQueryBlock> QueryBlocks;
bool HasExecute;
bool HasErrors;

THashSet<const TExprNode*> GetSyncSet() const {
THashSet<const TExprNode*> syncSet;
Expand Down Expand Up @@ -280,10 +281,11 @@ struct TKiExploreTxResults {
}

// Default constructor: starts with the HasExecute flag cleared.
// NOTE(review): this diff hunk shows both the pre-change initializer list
// (HasExecute only) and the post-change one, which additionally clears
// HasErrors.
TKiExploreTxResults()
: HasExecute(false) {}
: HasExecute(false)
, HasErrors(false) {}
};

bool IsDqRead(const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& types, bool estimateReadSize = true) {
bool IsDqRead(const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& types, bool estimateReadSize, bool* hasErrors = nullptr) {
if (node.Ref().ChildrenSize() <= 1) {
return false;
}
Expand All @@ -294,13 +296,21 @@ bool IsDqRead(const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext&
auto dataSourceProviderIt = types.DataSourceMap.find(dataSourceCategory);
if (dataSourceProviderIt != types.DataSourceMap.end()) {
if (auto* dqIntegration = dataSourceProviderIt->second->GetDqIntegration()) {
if (dqIntegration->CanRead(*node.Ptr(), ctx) &&
(!estimateReadSize || dqIntegration->EstimateReadSize(
TDqSettings::TDefault::DataSizePerJob,
TDqSettings::TDefault::MaxTasksPerStage,
{node.Raw()},
ctx))) {
return true;
try {
if (dqIntegration->CanRead(*node.Ptr(), ctx) &&
(!estimateReadSize || dqIntegration->EstimateReadSize(
TDqSettings::TDefault::DataSizePerJob,
TDqSettings::TDefault::MaxTasksPerStage,
{node.Raw()},
ctx))) {
return true;
}
} catch (const TFallbackError& error) {
TString message = TStringBuilder() << "Cannot execute read through DQ integration. Cause: " << error.what();
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), message));
if (hasErrors) {
*hasErrors = true;
}
}
}
}
Expand Down Expand Up @@ -388,7 +398,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
return result;
}

if (IsDqRead(node, ctx, types)) {
if (IsDqRead(node, ctx, types, true, &txRes.HasErrors)) {
txRes.Ops.insert(node.Raw());
TExprNode::TPtr worldChild = node.Raw()->ChildPtr(0);
return ExploreTx(TExprBase(worldChild), ctx, dataSink, txRes, tablesData, types);
Expand Down Expand Up @@ -849,8 +859,8 @@ TExprNode::TPtr KiBuildQuery(TExprBase node, TExprContext& ctx, TIntrusivePtr<TK

TKiExploreTxResults txExplore;
txExplore.ConcurrentResults = concurrentResults;
if (!ExploreTx(commit.World(), ctx, kiDataSink, txExplore, tablesData, types)) {
return node.Ptr();
if (!ExploreTx(commit.World(), ctx, kiDataSink, txExplore, tablesData, types) || txExplore.HasErrors) {
return txExplore.HasErrors ? nullptr : node.Ptr();
}

if (txExplore.HasExecute) {
Expand Down
14 changes: 2 additions & 12 deletions ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,18 +231,8 @@ TIntrusivePtr<IMkqlCallableCompiler> CreateKqlCompiler(const TKqlCompileContext&
return TRuntimeNode();
});

std::unordered_set<TString> usedProviders;
for (const auto& provider : typesCtx.DataSources) {
if (auto* dqIntegration = provider->GetDqIntegration()) {
dqIntegration->RegisterMkqlCompiler(*compiler);
usedProviders.emplace(provider->GetName());
}
}

for (const auto& provider : typesCtx.DataSinks) {
if (auto* dqIntegration = provider->GetDqIntegration(); dqIntegration && !usedProviders.contains(TString(provider->GetName()))) {
dqIntegration->RegisterMkqlCompiler(*compiler);
}
for (auto* dqIntegration : GetUniqueIntegrations(typesCtx)) {
dqIntegration->RegisterMkqlCompiler(*compiler);
}

compiler->AddCallable(TKqpWideReadTable::CallableName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ def run(self, test_name: str, script: str, generic_settings: GenericSettings) ->
result_path = artifacts.make_path(test_name=test_name, artifact_name='result.json')

# For debug add option --trace-opt to args
cmd = f'{self.kqprun_path} -s {scheme_path} -p {script_path} --app-config={app_conf_path} --result-file={result_path} --result-format=full'
cmd = f'{self.kqprun_path} -s {scheme_path} -p {script_path} --app-config={app_conf_path} --result-file={result_path} --result-format=full-json'

output = None
data_out = None
Expand Down
31 changes: 17 additions & 14 deletions ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,13 +242,16 @@ class TYtDqIntegration: public TDqIntegrationBase {
return maxDataSizePerJob;
}

// Reports why a node cannot be handled here and optionally aborts by throwing.
//
// ctx            - expression context whose IssueManager receives the issue.
// message        - the cause text; used for the log line, the issue and the exception.
// skipIssues     - when true, nothing is logged and no issue is raised.
// throwException - when true, throws TFallbackError carrying `message`
//                  (this happens regardless of `skipIssues`).
//
// NOTE(review): this diff hunk shows both the pre-change signature (without
// `throwException`) and the post-change signature on consecutive lines.
void AddInfo(TExprContext& ctx, const TString& message, bool skipIssues) {
void AddInfo(TExprContext& ctx, const TString& message, bool skipIssues, bool throwException = false) {
if (!skipIssues) {
YQL_CLOG(INFO, ProviderDq) << message;
TIssue info("DQ cannot execute the query. Cause: " + message);
// Raised at info severity rather than as an error.
info.Severity = TSeverityIds::S_INFO;
ctx.IssueManager.RaiseIssue(info);
}
// Thrown after the optional reporting above, so the issue (if any) is already recorded.
if (throwException) {
throw TFallbackError() << message;
}
}

bool CheckPragmas(const TExprNode& node, TExprContext& ctx, bool skipIssues) override {
Expand Down Expand Up @@ -293,7 +296,7 @@ class TYtDqIntegration: public TDqIntegrationBase {
} else if (auto maybeRead = TMaybeNode<TYtReadTable>(&node)) {
auto cluster = maybeRead.Cast().DataSource().Cluster().StringValue();
if (!State_->Configuration->_EnableDq.Get(cluster).GetOrElse(true)) {
AddInfo(ctx, TStringBuilder() << "disabled for cluster " << cluster, skipIssues);
AddInfo(ctx, TStringBuilder() << "disabled for cluster " << cluster, skipIssues, State_->PassiveExecution);
return false;
}
const auto canUseYtPartitioningApi = State_->Configuration->_EnableYtPartitioning.Get(cluster).GetOrElse(false);
Expand All @@ -309,45 +312,45 @@ class TYtDqIntegration: public TDqIntegrationBase {
}
}
}
AddInfo(ctx, info, skipIssues);
AddInfo(ctx, info, skipIssues, State_->PassiveExecution);
return false;
}
auto sampleSetting = GetSetting(section.Settings().Ref(), EYtSettingType::Sample);
if (sampleSetting && sampleSetting->Child(1)->Child(0)->Content() == "system") {
AddInfo(ctx, "system sampling", skipIssues);
AddInfo(ctx, "system sampling", skipIssues, State_->PassiveExecution);
return false;
}
for (auto path: section.Paths()) {
if (!path.Table().Maybe<TYtTable>()) {
AddInfo(ctx, "non-table path", skipIssues);
AddInfo(ctx, "non-table path", skipIssues, State_->PassiveExecution);
return false;
} else {
auto pathInfo = TYtPathInfo(path);
auto tableInfo = pathInfo.Table;
auto epoch = TEpochInfo::Parse(path.Table().Maybe<TYtTable>().CommitEpoch().Ref());
if (!tableInfo->Stat) {
AddInfo(ctx, "table without statistics", skipIssues);
AddInfo(ctx, "table without statistics", skipIssues, State_->PassiveExecution);
return false;
} else if (!tableInfo->RowSpec) {
AddInfo(ctx, "table without row spec", skipIssues);
AddInfo(ctx, "table without row spec", skipIssues, State_->PassiveExecution);
return false;
} else if (!tableInfo->Meta) {
AddInfo(ctx, "table without meta", skipIssues);
AddInfo(ctx, "table without meta", skipIssues, State_->PassiveExecution);
return false;
} else if (tableInfo->IsAnonymous) {
AddInfo(ctx, "anonymous table", skipIssues);
AddInfo(ctx, "anonymous table", skipIssues, State_->PassiveExecution);
return false;
} else if ((!epoch.Empty() && *epoch.Get() > 0)) {
AddInfo(ctx, "table with non-empty epoch", skipIssues);
AddInfo(ctx, "table with non-empty epoch", skipIssues, State_->PassiveExecution);
return false;
} else if (NYql::HasSetting(tableInfo->Settings.Ref(), EYtSettingType::WithQB)) {
AddInfo(ctx, "table with QB2 premapper", skipIssues);
AddInfo(ctx, "table with QB2 premapper", skipIssues, State_->PassiveExecution);
return false;
} else if (pathInfo.Ranges && !canUseYtPartitioningApi) {
AddInfo(ctx, "table with ranges", skipIssues);
AddInfo(ctx, "table with ranges", skipIssues, State_->PassiveExecution);
return false;
} else if (tableInfo->Meta->IsDynamic && !canUseYtPartitioningApi) {
AddInfo(ctx, "dynamic table", skipIssues);
AddInfo(ctx, "dynamic table", skipIssues, State_->PassiveExecution);
return false;
}

Expand All @@ -356,7 +359,7 @@ class TYtDqIntegration: public TDqIntegrationBase {
}
}
if (auto maxChunks = State_->Configuration->MaxChunksForDqRead.Get().GetOrElse(DEFAULT_MAX_CHUNKS_FOR_DQ_READ); chunksCount > maxChunks) {
AddInfo(ctx, "table with too many chunks", skipIssues);
AddInfo(ctx, "table with too many chunks", skipIssues, State_->PassiveExecution);
return false;
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3487,7 +3487,7 @@ class TYtPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
return node;
}

auto count = CleanupWorld(countBase.Count(), ctx);
auto count = State_->PassiveExecution ? countBase.Count() : CleanupWorld(countBase.Count(), ctx);
if (!count) {
return {};
}
Expand Down
5 changes: 1 addition & 4 deletions ydb/tests/fq/yt/cfg/kqprun_scheme.sql
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
CREATE OBJECT yt_token (TYPE SECRET) WITH (value = "token");

CREATE EXTERNAL DATA SOURCE plato WITH (
SOURCE_TYPE="YT",
LOCATION="localhost",
AUTH_METHOD="TOKEN",
TOKEN_SECRET_NAME="yt_token"
AUTH_METHOD="NONE"
);
1 change: 1 addition & 0 deletions ydb/tests/fq/yt/kqp_yt_file.make
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ ENDIF()

DEPENDS(
ydb/library/yql/tests/common/test_framework/udfs_deps
ydb/library/yql/udfs/test/test_import
ydb/tests/tools/kqprun
)

Expand Down
Loading

0 comments on commit 13d4667

Please sign in to comment.