Skip to content

Commit

Permalink
[YQ-1997] Support OR REPLACE syntax in fq_proxy (ydb-platform#1891)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alnen committed Feb 16, 2024
1 parent e2b5982 commit 0be8b51
Show file tree
Hide file tree
Showing 11 changed files with 158 additions and 80 deletions.
4 changes: 4 additions & 0 deletions ydb/core/fq/libs/compute/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ class TComputeConfig {
}
}

[[nodiscard]] bool IsReplaceIfExistsSyntaxSupported() const {
return ComputeConfig.GetSupportedComputeYdbFeatures().GetReplaceIfExists();
}

private:
NFq::NConfig::TComputeConfig ComputeConfig;
NFq::NConfig::EComputeType DefaultCompute;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ class TSynchronizeScopeActor : public NActors::TActorBootstrapped<TSynchronizeSc
Counters,
TPermissions{},
CommonConfig,
ComputeConfig,
Signer,
true,
connection.first
Expand All @@ -424,7 +425,7 @@ class TSynchronizeScopeActor : public NActors::TActorBootstrapped<TSynchronizeSc
request.Get()->Get()->YDBClient = Client;
auto it = Connections.find(binding.second.content().connection_id());
if (it == Connections.end()) {
NYql::TIssue issue {TStringBuilder {}
NYql::TIssue issue {TStringBuilder {}
<< "While synchronizing tables for binding with id '" << binding.first << "'"};
issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(TStringBuilder{}
<< "Can't find connection with id '" << binding.second.content().connection_id() << "'"));
Expand All @@ -439,6 +440,7 @@ class TSynchronizeScopeActor : public NActors::TActorBootstrapped<TSynchronizeSc
TDuration::Seconds(30),
Counters,
TPermissions{},
ComputeConfig,
true,
binding.first
));
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/fq/libs/config/protos/compute.proto
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,14 @@ message TComputeMapping {
TActivationPercentage Activation = 3;
}

message TSupportedComputeYdbFeatures {
bool ReplaceIfExists = 1;
}

message TComputeConfig {
TInPlaceCompute InPlace = 1;
TYdbCompute Ydb = 2;
EComputeType DefaultCompute = 3;
repeated TComputeMapping ComputeMapping = 4;
TSupportedComputeYdbFeatures SupportedComputeYdbFeatures = 5;
}
28 changes: 15 additions & 13 deletions ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ namespace NFq {
namespace NPrivate {

TString MakeCreateExternalDataTableQuery(const FederatedQuery::BindingContent& content,
const TString& connectionName) {
const TString& connectionName,
bool replaceIfExists) {
using namespace fmt::literals;

auto bindingName = content.name();
Expand Down Expand Up @@ -69,23 +70,22 @@ TString MakeCreateExternalDataTableQuery(const FederatedQuery::BindingContent& c

return fmt::format(
R"(
CREATE EXTERNAL TABLE {externalTableName} (
CREATE {replaceIfSupported} EXTERNAL TABLE {externalTableName} (
{columns}
) WITH (
{withOptions}
);)",
"replaceIfSupported"_a = replaceIfExists ? "OR REPLACE" : "",
"externalTableName"_a = EncloseAndEscapeString(bindingName, '`'),
"columns"_a = JoinMapRange(",\n",
subset.schema().column().begin(),
"columns"_a = JoinMapRange(",\n", subset.schema().column().begin(),
subset.schema().column().end(),
columnsTransformFunction),
"withOptions"_a = JoinMapRange(",\n",
withOptions.begin(),
withOptions.end(),
[](const std::pair<TString, TString>& kv) -> TString {
return TStringBuilder{} << " " << kv.first
<< " = " << kv.second;
}));
"withOptions"_a =
JoinMapRange(",\n", withOptions.begin(), withOptions.end(),
[](const std::pair<TString, TString> &kv) -> TString {
return TStringBuilder{} << " " << kv.first << " = "
<< kv.second;
}));
}

TString SignAccountId(const TString& id, const TSigner::TPtr& signer) {
Expand Down Expand Up @@ -170,7 +170,8 @@ TString CreateAuthParamsQuery(const FederatedQuery::ConnectionSetting& setting,
TString MakeCreateExternalDataSourceQuery(
const FederatedQuery::ConnectionContent& connectionContent,
const TSigner::TPtr& signer,
const NConfig::TCommonConfig& common) {
const NConfig::TCommonConfig& common,
bool replaceIfExists) {
using namespace fmt::literals;

TString properties;
Expand Down Expand Up @@ -225,11 +226,12 @@ TString MakeCreateExternalDataSourceQuery(
auto sourceName = connectionContent.name();
return fmt::format(
R"(
CREATE EXTERNAL DATA SOURCE {external_source} WITH (
CREATE {replaceIfSupported} EXTERNAL DATA SOURCE {external_source} WITH (
{properties}
{auth_params}
);
)",
"replaceIfSupported"_a = replaceIfExists ? "OR REPLACE" : "",
"external_source"_a = EncloseAndEscapeString(sourceName, '`'),
"properties"_a = properties,
"auth_params"_a =
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ TMaybe<TString> DropSecretObjectQuery(const TString& name);
TString MakeCreateExternalDataSourceQuery(
const FederatedQuery::ConnectionContent& connectionContent,
const TSigner::TPtr& signer,
const NConfig::TCommonConfig& common);
const NConfig::TCommonConfig& common,
bool replaceIfExists);

TString MakeDeleteExternalDataSourceQuery(const TString& sourceName);

TString MakeCreateExternalDataTableQuery(const FederatedQuery::BindingContent& content,
const TString& connectionName);
const TString& connectionName,
bool replaceIfExists);

TString MakeDeleteExternalDataTableQuery(const TString& tableName);

Expand Down
128 changes: 85 additions & 43 deletions ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -521,20 +521,25 @@ bool IsPathExistsIssue(const TStatus& status) {
}

/// Connection actors
NActors::IActor* MakeCreateConnectionActor(
IActor* MakeCreateConnectionActor(
const TActorId& proxyActorId,
TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr request,
TDuration requestTimeout,
TCounters& counters,
TPermissions permissions,
const NConfig::TCommonConfig& commonConfig,
const TCommonConfig& commonConfig,
const NFq::TComputeConfig& computeConfig,
TSigner::TPtr signer,
bool withoutRollback,
TMaybe<TString> connectionId) {
auto queryFactoryMethod =
[signer = std::move(signer),
requestTimeout,
&counters, permissions, withoutRollback, commonConfig](const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& request)
&counters,
permissions,
withoutRollback,
commonConfig,
computeConfig](const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& request)
-> std::vector<TSchemaQueryTask> {
auto& connectionContent = request->Get()->Request.content();

Expand All @@ -551,7 +556,7 @@ NActors::IActor* MakeCreateConnectionActor(
}

TScheduleErrorRecoverySQLGeneration alreadyExistRecoveryActorFactoryMethod =
[&request, requestTimeout, &counters, permissions](NActors::TActorId sender,
[&request, requestTimeout, &counters, permissions](TActorId sender,
const TStatus& status) {
if (status.GetStatus() == NYdb::EStatus::ALREADY_EXISTS ||
status.GetIssues().ToOneLineString().Contains("error: path exist")) {
Expand All @@ -567,14 +572,15 @@ NActors::IActor* MakeCreateConnectionActor(
}
return false;
};
statements.push_back(
TSchemaQueryTask{.SQL = TString{MakeCreateExternalDataSourceQuery(
connectionContent, signer, commonConfig)},
.ScheduleErrorRecoverySQLGeneration =
withoutRollback ? NoRecoverySQLGeneration()
: alreadyExistRecoveryActorFactoryMethod,
.ShouldSkipStepOnError =
withoutRollback ? IsPathExistsIssue : NoSkipOnError()});
statements.push_back(TSchemaQueryTask{
.SQL = MakeCreateExternalDataSourceQuery(
connectionContent, signer, commonConfig,
computeConfig.IsReplaceIfExistsSyntaxSupported()),
.ScheduleErrorRecoverySQLGeneration =
withoutRollback ? NoRecoverySQLGeneration()
: alreadyExistRecoveryActorFactoryMethod,
.ShouldSkipStepOnError =
withoutRollback ? IsPathExistsIssue : NoSkipOnError()});
return statements;
};

Expand Down Expand Up @@ -610,10 +616,11 @@ NActors::IActor* MakeModifyConnectionActor(
TDuration requestTimeout,
TCounters& counters,
const NConfig::TCommonConfig& commonConfig,
const NFq::TComputeConfig& computeConfig,
TSigner::TPtr signer) {
auto queryFactoryMethod =
[signer = std::move(signer),
commonConfig](
commonConfig, computeConfig](
const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& request)
-> std::vector<TSchemaQueryTask> {
using namespace fmt::literals;
Expand All @@ -628,8 +635,29 @@ NActors::IActor* MakeModifyConnectionActor(
CreateSecretObjectQuery(newConnectionContent.setting(),
newConnectionContent.name(),
signer);
std::vector<TSchemaQueryTask> statements;

bool replaceSupported = computeConfig.IsReplaceIfExistsSyntaxSupported();
if (replaceSupported &&
oldConnectionContent.name() == newConnectionContent.name()) {
// CREATE OR REPLACE
auto createSecretStatement =
CreateSecretObjectQuery(newConnectionContent.setting(),
newConnectionContent.name(), signer);

std::vector<TSchemaQueryTask> statements;
if (createSecretStatement) {
statements.push_back(
TSchemaQueryTask{.SQL = *createSecretStatement});
}

statements.push_back(TSchemaQueryTask{
.SQL = MakeCreateExternalDataSourceQuery(
newConnectionContent, signer, commonConfig, replaceSupported)});
return statements;
}

std::vector<TSchemaQueryTask> statements;
// remove and create new version
if (!oldBindings.empty()) {
statements.push_back(TSchemaQueryTask{
.SQL = JoinMapRange("\n",
Expand All @@ -644,16 +672,16 @@ NActors::IActor* MakeModifyConnectionActor(
oldBindings.begin(),
oldBindings.end(),
[&oldConnectionContent](const FederatedQuery::BindingContent& binding) {
return MakeCreateExternalDataTableQuery(binding,
oldConnectionContent.name());
return MakeCreateExternalDataTableQuery(
binding, oldConnectionContent.name(), false);
}),
.ShouldSkipStepOnError = IsPathDoesNotExistIssue});
};
}

statements.push_back(TSchemaQueryTask{
.SQL = TString{MakeDeleteExternalDataSourceQuery(oldConnectionContent.name())},
.RollbackSQL = TString{MakeCreateExternalDataSourceQuery(
oldConnectionContent, signer, commonConfig)},
oldConnectionContent, signer, commonConfig, false)},
.ShouldSkipStepOnError = IsPathDoesNotExistIssue});

if (dropOldSecret) {
Expand All @@ -672,7 +700,7 @@ NActors::IActor* MakeModifyConnectionActor(

statements.push_back(
TSchemaQueryTask{.SQL = TString{MakeCreateExternalDataSourceQuery(
newConnectionContent, signer, commonConfig)},
newConnectionContent, signer, commonConfig, false)},
.RollbackSQL = TString{MakeDeleteExternalDataSourceQuery(
newConnectionContent.name())}});

Expand All @@ -684,7 +712,7 @@ NActors::IActor* MakeModifyConnectionActor(
[&newConnectionContent](
const FederatedQuery::BindingContent& binding) {
return MakeCreateExternalDataTableQuery(
binding, newConnectionContent.name());
binding, newConnectionContent.name(), false);
}),
.RollbackSQL =
JoinMapRange("\n",
Expand Down Expand Up @@ -732,12 +760,12 @@ NActors::IActor* MakeDeleteConnectionActor(
auto dropSecret =
DropSecretObjectQuery(connectionContent.name());

std::vector<TSchemaQueryTask> statements = {TSchemaQueryTask{
.SQL = TString{MakeDeleteExternalDataSourceQuery(connectionContent.name())},
.RollbackSQL = MakeCreateExternalDataSourceQuery(connectionContent,
signer,
commonConfig),
.ShouldSkipStepOnError = IsPathDoesNotExistIssue}};
std::vector<TSchemaQueryTask> statements = {
TSchemaQueryTask{.SQL = TString{MakeDeleteExternalDataSourceQuery(
connectionContent.name())},
.RollbackSQL = MakeCreateExternalDataSourceQuery(
connectionContent, signer, commonConfig, false),
.ShouldSkipStepOnError = IsPathDoesNotExistIssue}};
if (dropSecret) {
statements.push_back(
TSchemaQueryTask{.SQL = *dropSecret,
Expand Down Expand Up @@ -774,11 +802,12 @@ NActors::IActor* MakeCreateBindingActor(
TDuration requestTimeout,
TCounters& counters,
TPermissions permissions,
const NFq::TComputeConfig& computeConfig,
bool withoutRollback,
TMaybe<TString> bindingId) {
auto queryFactoryMethod =
[requestTimeout,
&counters, permissions, withoutRollback](const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& request)
&counters, permissions, withoutRollback, computeConfig](const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& request)
-> std::vector<TSchemaQueryTask> {
auto& bindingContent = request->Get()->Request.content();
auto& externalSourceName = request->Get()->ConnectionContent->name();
Expand All @@ -787,7 +816,7 @@ NActors::IActor* MakeCreateBindingActor(
TScheduleErrorRecoverySQLGeneration alreadyExistRecoveryActorFactoryMethod =
[&request, requestTimeout, &counters, permissions](NActors::TActorId sender,
const TStatus& status) {
if (status.GetStatus() == NYdb::EStatus::ALREADY_EXISTS ||
if (status.GetStatus() == EStatus::ALREADY_EXISTS ||
status.GetIssues().ToOneLineString().Contains("error: path exist")) {
TActivationContext::ActorSystem()->Register(
new TGenerateRecoverySQLIfExternalDataTableAlreadyExistsActor(
Expand All @@ -802,12 +831,14 @@ NActors::IActor* MakeCreateBindingActor(
return false;
};
statements.push_back(TSchemaQueryTask{
.SQL = TString{MakeCreateExternalDataTableQuery(bindingContent,
externalSourceName)},
.SQL = TString{MakeCreateExternalDataTableQuery(
bindingContent, externalSourceName,
computeConfig.IsReplaceIfExistsSyntaxSupported())},
.ScheduleErrorRecoverySQLGeneration =
withoutRollback ? NoRecoverySQLGeneration()
: alreadyExistRecoveryActorFactoryMethod,
.ShouldSkipStepOnError = withoutRollback ? IsPathExistsIssue : NoSkipOnError()});
.ShouldSkipStepOnError =
withoutRollback ? IsPathExistsIssue : NoSkipOnError()});
return statements;
};

Expand Down Expand Up @@ -842,24 +873,35 @@ NActors::IActor* MakeModifyBindingActor(
const TActorId& proxyActorId,
TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr request,
TDuration requestTimeout,
TCounters& counters) {
TCounters& counters,
const NFq::TComputeConfig& computeConfig) {
auto queryFactoryMethod =
[](const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& request)
[computeConfig](const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& request)
-> std::vector<TSchemaQueryTask> {
auto sourceName = request->Get()->ConnectionContent->name();
auto oldTableName = request->Get()->OldBindingContent->name();

bool replaceSupported = computeConfig.IsReplaceIfExistsSyntaxSupported();
if (replaceSupported &&
oldTableName == request->Get()->Request.content().name()) {
// CREATE OR REPLACE
return {TSchemaQueryTask{.SQL = MakeCreateExternalDataTableQuery(
request->Get()->Request.content(),
sourceName, replaceSupported)}};
}

// remove and create new version
auto deleteOldEntities = MakeDeleteExternalDataTableQuery(oldTableName);
auto createOldEntities =
MakeCreateExternalDataTableQuery(*request->Get()->OldBindingContent,
sourceName);
auto createNewEntities =
MakeCreateExternalDataTableQuery(request->Get()->Request.content(), sourceName);

return {TSchemaQueryTask{.SQL = deleteOldEntities,
.RollbackSQL = createOldEntities,
.ShouldSkipStepOnError = IsPathDoesNotExistIssue},
TSchemaQueryTask{.SQL = createNewEntities}};
auto createOldEntities = MakeCreateExternalDataTableQuery(
*request->Get()->OldBindingContent, sourceName, false);
auto createNewEntities = MakeCreateExternalDataTableQuery(
request->Get()->Request.content(), sourceName, false);

return {
TSchemaQueryTask{.SQL = deleteOldEntities,
.RollbackSQL = createOldEntities,
.ShouldSkipStepOnError = IsPathDoesNotExistIssue},
TSchemaQueryTask{.SQL = createNewEntities}};
};

auto errorMessageFactoryMethod = [](const EStatus status,
Expand Down
Loading

0 comments on commit 0be8b51

Please sign in to comment.