Skip to content

Commit

Permalink
YQ-3006 fix drop table for external entities (#4064)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Apr 25, 2024
1 parent 049f8e0 commit 7999411
Show file tree
Hide file tree
Showing 14 changed files with 276 additions and 68 deletions.
2 changes: 1 addition & 1 deletion ydb/core/kqp/gateway/kqp_metadata_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
switch (entry.Kind) {
case EKind::KindExternalDataSource: {
auto externalDataSourceMetadata = GetLoadTableMetadataResult(entry, cluster, mainCluster, table);
if (!externalDataSourceMetadata.Success()) {
if (!externalDataSourceMetadata.Success() || !settings.RequestAuthInfo_) {
promise.SetValue(externalDataSourceMetadata);
return;
}
Expand Down
24 changes: 17 additions & 7 deletions ydb/core/kqp/provider/yql_kikimr_datasink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ class TKiSinkIntentDeterminationTransformer: public TKiSinkVisitorTransformer {
auto tableType = settings.TableType.IsValid()
? GetTableTypeFromString(settings.TableType.Cast())
: ETableType::Table; // v0, pg support
ctx->Tables().GetOrAddTable(TString(cluster), ctx->GetDatabase(), key.GetTablePath(), tableType);
ctx->Tables().GetOrAddTable(TString(cluster), ctx->GetDatabase(), key.GetTablePath(), tableType).DisableAuthInfo();
}

TStatus HandleWrite(TExprBase node, TExprContext& ctx) override {
Expand Down Expand Up @@ -557,18 +557,28 @@ class TKikimrDataSink : public TDataProviderBase
return true;
}

if (tableDesc.Metadata->ExternalSource.SourceType == ESourceType::ExternalDataSource && tableDesc.Metadata->TableType == NYql::ETableType::Unknown) {
ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Attempt to write to external data source \"" << key.GetTablePath() << "\" without table. Please specify table to write to"));
return false;
}

if (tableDesc.Metadata->ExternalSource.SourceType != ESourceType::ExternalDataSource && tableDesc.Metadata->ExternalSource.SourceType != ESourceType::ExternalTable) {
YQL_CVLOG(NLog::ELevel::ERROR, NLog::EComponent::ProviderKikimr) << "Skip RewriteIO for external entity: unknown entity type: " << (int)tableDesc.Metadata->ExternalSource.SourceType;
return true;
}

if (mode != "insert_abort") {
ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Write mode '" << static_cast<TStringBuf>(mode) << "' is not supported for external entities"));
if (mode == "drop" || mode == "drop_if_exists") {
TString dropHint;
if (tableDesc.Metadata->ExternalSource.SourceType == ESourceType::ExternalDataSource) {
dropHint = "DROP EXTERNAL DATA SOURCE";
} else if (tableDesc.Metadata->ExternalSource.SourceType == ESourceType::ExternalTable) {
dropHint = "DROP EXTERNAL TABLE";
}
ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Cannot drop external entity by using DROP TABLE" << (dropHint ? ". Please use " : "") << dropHint));
} else {
ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Write mode '" << static_cast<TStringBuf>(mode) << "' is not supported for external entities"));
}
return false;
}

if (tableDesc.Metadata->ExternalSource.SourceType == ESourceType::ExternalDataSource && tableDesc.Metadata->TableType == NYql::ETableType::Unknown) {
ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Attempt to write to external data source \"" << key.GetTablePath() << "\" without table. Please specify table to write to"));
return false;
}

Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/provider/yql_kikimr_datasource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ class TKiSourceLoadTableMetadataTransformer : public TGraphTransformerBase {
.WithTableStats(table.GetNeedsStats())
.WithPrivateTables(IsInternalCall)
.WithExternalDatasources(SessionCtx->Config().FeatureFlags.GetEnableExternalDataSources())
.WithAuthInfo(table.GetNeedAuthInfo())
);

futures.push_back(future.Apply([result, queryType]
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -765,9 +765,15 @@ class IKikimrGateway : public TThrRefBase {
return *this;
}

TLoadTableMetadataSettings& WithAuthInfo(bool enable) {
RequestAuthInfo_ = enable;
return *this;
}

bool RequestStats_ = false;
bool WithPrivateTables_ = false;
bool WithExternalDatasources_ = false;
bool RequestAuthInfo_ = true;
};

class IKqpTableMetadataLoader : public std::enable_shared_from_this<IKqpTableMetadataLoader> {
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,15 @@ class TKikimrTableDescription {

void RequireStats() { NeedsStats = true; }
bool GetNeedsStats() const { return NeedsStats; }
void DisableAuthInfo() { NeedAuthInfo = false; }
bool GetNeedAuthInfo() const { return NeedAuthInfo; }
ETableType GetTableType() const { return TableType; }
void SetTableType(ETableType tableType) { TableType = tableType; }

private:
THashMap<TString, const TTypeAnnotationNode*> ColumnTypes;
bool NeedsStats = false;
bool NeedAuthInfo = true;
ETableType TableType;
};

Expand Down
71 changes: 71 additions & 0 deletions ydb/core/kqp/ut/federated_query/s3/kqp_federated_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,77 @@ Y_UNIT_TEST_SUITE(KqpFederatedSchemeTest) {
checkTableExists(false, 1);
checkDrop(true, EEx::IfExists, 1);
}

void TestInvalidDropForExternalTableWithAuth(std::function<std::pair<bool, TString>(const TString&)> queryExecuter, TString tableSuffix) {
const TString externalDataSourceName = "test_data_source_" + tableSuffix;
const TString externalTableName = "test_table_" + tableSuffix;

// Create external table
{
const TString sql = TStringBuilder() << R"(
UPSERT OBJECT mysasignature (TYPE SECRET) WITH (value = "mysasignaturevalue");
CREATE EXTERNAL DATA SOURCE `)" << externalDataSourceName << R"(` WITH (
SOURCE_TYPE="ObjectStorage",
LOCATION="my-bucket",
AUTH_METHOD="SERVICE_ACCOUNT",
SERVICE_ACCOUNT_ID="mysa",
SERVICE_ACCOUNT_SECRET_NAME="mysasignature"
);
CREATE EXTERNAL TABLE `)" << externalTableName << R"(` (
Key Uint64
) WITH (
DATA_SOURCE=")" << externalDataSourceName << R"(",
LOCATION="/",
FORMAT="json_each_row"
);)";
const auto& [success, issues] = queryExecuter(sql);
UNIT_ASSERT_C(success, issues);
}

// Drop secret object
{
const TString sql = "DROP OBJECT mysasignature (TYPE SECRET)";
const auto& [success, issues] = queryExecuter(sql);
UNIT_ASSERT_C(success, issues);
}

// Drop external table
{
const TString sql = TStringBuilder() << "DROP TABLE `" << externalTableName << "`";
const auto& [success, issues] = queryExecuter(sql);
UNIT_ASSERT(!success);
UNIT_ASSERT_STRING_CONTAINS(issues, "Cannot drop external entity by using DROP TABLE. Please use DROP EXTERNAL TABLE");
}

// Drop external data source
{
const TString sql = TStringBuilder() << "DROP TABLE `" << externalDataSourceName << "`";
const auto& [success, issues] = queryExecuter(sql);
UNIT_ASSERT(!success);
UNIT_ASSERT_STRING_CONTAINS(issues, "Cannot drop external entity by using DROP TABLE. Please use DROP EXTERNAL DATA SOURCE");
}
}

Y_UNIT_TEST(InvalidDropForExternalTableWithAuth) {
auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make());

auto driver = kikimr->GetDriver();
NScripting::TScriptingClient yqlScriptClient(driver);
auto yqlScriptClientExecutor = [&](const TString& sql) {
Cerr << "Execute sql by yql script client:\n" << sql << Endl;
auto result = yqlScriptClient.ExecuteYqlScript(sql).GetValueSync();
return std::make_pair(result.IsSuccess(), result.GetIssues().ToString());
};
TestInvalidDropForExternalTableWithAuth(yqlScriptClientExecutor, "yql_script");

auto queryClient = kikimr->GetQueryClient();
auto queryClientExecutor = [&](const TString& sql) {
Cerr << "Execute sql by query client:\n" << sql << Endl;
auto result = queryClient.ExecuteQuery(sql, TTxControl::NoTx()).GetValueSync();
return std::make_pair(result.IsSuccess(), result.GetIssues().ToString());
};
TestInvalidDropForExternalTableWithAuth(queryClientExecutor, "generic_query");
}
}

} // namespace NKikimr::NKqp
84 changes: 69 additions & 15 deletions ydb/tests/tools/kqprun/kqprun.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,25 @@
#include <util/system/env.h>

#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
#include <ydb/library/yql/public/udf/udf_static_registry.h>
#include <ydb/library/yql/utils/backtrace/backtrace.h>


struct TExecutionOptions {
enum class EClearExecutionCase {
Disabled,
GenericQuery,
YqlScript
};

TString ScriptQuery;
TString SchemeQuery;

bool ClearExecution = false;
bool ForgetExecution = false;
EClearExecutionCase ClearExecution = EClearExecutionCase::Disabled;
NKikimrKqp::EQueryAction ScriptQueryAction = NKikimrKqp::QUERY_ACTION_EXECUTE;

TString ScriptTraceId = "kqprun";
TString TraceId = "kqprun";

bool HasResults() const {
return ScriptQuery && ScriptQueryAction == NKikimrKqp::QUERY_ACTION_EXECUTE;
Expand All @@ -38,15 +45,16 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner

if (executionOptions.SchemeQuery) {
Cout << colors.Yellow() << "Executing scheme query..." << colors.Default() << Endl;
if (!runner.ExecuteSchemeQuery(executionOptions.SchemeQuery)) {
if (!runner.ExecuteSchemeQuery(executionOptions.SchemeQuery, executionOptions.TraceId)) {
ythrow yexception() << "Scheme query execution failed";
}
}

if (executionOptions.ScriptQuery) {
Cout << colors.Yellow() << "Executing script..." << colors.Default() << Endl;
if (!executionOptions.ClearExecution) {
if (!runner.ExecuteScript(executionOptions.ScriptQuery, executionOptions.ScriptQueryAction, executionOptions.ScriptTraceId)) {
switch (executionOptions.ClearExecution) {
case TExecutionOptions::EClearExecutionCase::Disabled:
if (!runner.ExecuteScript(executionOptions.ScriptQuery, executionOptions.ScriptQueryAction, executionOptions.TraceId)) {
ythrow yexception() << "Script execution failed";
}
Cout << colors.Yellow() << "Fetching script results..." << colors.Default() << Endl;
Expand All @@ -59,10 +67,19 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner
ythrow yexception() << "Forget script execution operation failed";
}
}
} else {
if (!runner.ExecuteQuery(executionOptions.ScriptQuery, executionOptions.ScriptQueryAction, executionOptions.ScriptTraceId)) {
break;

case TExecutionOptions::EClearExecutionCase::GenericQuery:
if (!runner.ExecuteQuery(executionOptions.ScriptQuery, executionOptions.ScriptQueryAction, executionOptions.TraceId)) {
ythrow yexception() << "Query execution failed";
}
break;

case TExecutionOptions::EClearExecutionCase::YqlScript:
if (!runner.ExecuteYqlScript(executionOptions.ScriptQuery, executionOptions.ScriptQueryAction, executionOptions.TraceId)) {
ythrow yexception() << "Yql script execution failed";
}
break;
}
}

Expand All @@ -86,15 +103,34 @@ THolder<TFileOutput> SetupDefaultFileOutput(const TString& filePath, IOutputStre
}


TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry> CreateFunctionRegistry(const TString& udfsDirectory, TVector<TString> udfsPaths) {
void ReplaceTemplate(const TString& variableName, const TString& variableValue, TString& query) {
TString variableTemplate = TStringBuilder() << "${" << variableName << "}";
for (size_t position = query.find(variableTemplate); position != TString::npos; position = query.find(variableTemplate, position)) {
query.replace(position, variableTemplate.size(), variableValue);
position += variableValue.size();
}
}


TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry> CreateFunctionRegistry(const TString& udfsDirectory, TVector<TString> udfsPaths, bool excludeLinkedUdfs) {
if (!udfsDirectory.empty() || !udfsPaths.empty()) {
NColorizer::TColors colors = NColorizer::AutoColors(Cout);
Cout << colors.Yellow() << "Fetching udfs..." << colors.Default() << Endl;
}

NKikimr::NMiniKQL::FindUdfsInDir(udfsDirectory, &udfsPaths);
auto functionRegistry = NKikimr::NMiniKQL::CreateFunctionRegistry(&NYql::NBacktrace::KikimrBackTrace, NKikimr::NMiniKQL::CreateBuiltinRegistry(), false, udfsPaths)->Clone();
NKikimr::NMiniKQL::FillStaticModules(*functionRegistry);

if (excludeLinkedUdfs) {
for (const auto& wrapper : NYql::NUdf::GetStaticUdfModuleWrapperList()) {
auto [name, ptr] = wrapper();
if (!functionRegistry->IsLoadedUdfModule(name)) {
functionRegistry->AddModule(TString(NKikimr::NMiniKQL::StaticModulePrefix) + name, name, std::move(ptr));
}
}
} else {
NKikimr::NMiniKQL::FillStaticModules(*functionRegistry);
}

return functionRegistry;
}
Expand All @@ -113,6 +149,7 @@ void RunMain(int argc, const char* argv[]) {
TString logFile = "-";
TString appConfigFile = "./configuration/app_config.conf";

TString clearExecutionType = "disabled";
TString traceOptType = "disabled";
TString scriptQueryAction = "execute";
TString planOutputFormat = "pretty";
Expand All @@ -121,6 +158,7 @@ void RunMain(int argc, const char* argv[]) {

TVector<TString> udfsPaths;
TString udfsDirectory;
bool excludeLinkedUdfs = false;

NLastGetopt::TOpts options = NLastGetopt::TOpts::Default();
options.AddLongOption('p', "script-query", "Script query to execute")
Expand Down Expand Up @@ -158,11 +196,11 @@ void RunMain(int argc, const char* argv[]) {
.RequiredArgument("FILE")
.StoreResult(&scriptQueryPlanFile);

options.AddLongOption('C', "clear-execution", "Execute script query without RunScriptActor in one query request")
options.AddLongOption('C', "clear-execution", "Execute script query without creating additional tables, one of { query | yql-script }")
.Optional()
.NoArgument()
.DefaultValue(executionOptions.ClearExecution)
.SetFlag(&executionOptions.ClearExecution);
.RequiredArgument("STR")
.DefaultValue(clearExecutionType)
.StoreResult(&clearExecutionType);
options.AddLongOption('F', "forget", "Forget script execution operation after fetching results, cannot be used with -C")
.Optional()
.NoArgument()
Expand Down Expand Up @@ -202,21 +240,37 @@ void RunMain(int argc, const char* argv[]) {
.Optional()
.RequiredArgument("PATH")
.StoreResult(&udfsDirectory);
options.AddLongOption("exclude-linked-udfs", "Exclude linked udfs when same udf passed from -u or --udfs-dir")
.Optional()
.NoArgument()
.DefaultValue(excludeLinkedUdfs)
.SetFlag(&excludeLinkedUdfs);

NLastGetopt::TOptsParseResult parsedOptions(&options, argc, argv);

// Environment variables

const TString& yqlToken = GetEnv(NKqpRun::YQL_TOKEN_VARIABLE);

// Execution options

if (!schemeQueryFile && !scriptQueryFile) {
ythrow yexception() << "Nothing to execute";
}
if (schemeQueryFile) {
executionOptions.SchemeQuery = TFileInput(schemeQueryFile).ReadAll();
ReplaceTemplate(NKqpRun::YQL_TOKEN_VARIABLE, yqlToken, executionOptions.SchemeQuery);
}
if (scriptQueryFile) {
executionOptions.ScriptQuery = TFileInput(scriptQueryFile).ReadAll();
}

executionOptions.ClearExecution =
(clearExecutionType == TStringBuf("query")) ? TExecutionOptions::EClearExecutionCase::GenericQuery
: (clearExecutionType == TStringBuf("yql-script")) ? TExecutionOptions::EClearExecutionCase::YqlScript
: (clearExecutionType == TStringBuf("disabled")) ? TExecutionOptions::EClearExecutionCase::Disabled
: TExecutionOptions::EClearExecutionCase::Disabled;

executionOptions.ScriptQueryAction =
(scriptQueryAction == TStringBuf("execute")) ? NKikimrKqp::QUERY_ACTION_EXECUTE
: (scriptQueryAction == TStringBuf("explain")) ? NKikimrKqp::QUERY_ACTION_EXPLAIN
Expand Down Expand Up @@ -255,8 +309,8 @@ void RunMain(int argc, const char* argv[]) {
std::remove(logFile.c_str());
}

runnerOptions.YdbSettings.YqlToken = GetEnv("YQL_TOKEN");
runnerOptions.YdbSettings.FunctionRegistry = CreateFunctionRegistry(udfsDirectory, udfsPaths).Get();
runnerOptions.YdbSettings.YqlToken = yqlToken;
runnerOptions.YdbSettings.FunctionRegistry = CreateFunctionRegistry(udfsDirectory, udfsPaths, excludeLinkedUdfs).Get();

TString appConfigData = TFileInput(appConfigFile).ReadAll();
if (!google::protobuf::TextFormat::ParseFromString(appConfigData, &runnerOptions.YdbSettings.AppConfig)) {
Expand Down
4 changes: 3 additions & 1 deletion ydb/tests/tools/kqprun/src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@

namespace NKqpRun {

constexpr char YQL_TOKEN_VARIABLE[] = "YQL_TOKEN";

struct TYdbSetupSettings {
TString DomainName = "Root";

bool TraceOptEnabled = false;
TMaybe<TString> LogOutputFile;

TString YqlToken;
TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry> FunctionRegistry = nullptr;
TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry> FunctionRegistry;
NKikimrConfig::TAppConfig AppConfig;
};

Expand Down
Loading

0 comments on commit 7999411

Please sign in to comment.