Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge #5932 #5984 #6257 #6298 #6471 #6484 #6703 #6721

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions ydb/core/external_sources/external_data_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ struct TExternalDataSource : public IExternalSource {
ythrow TExternalSourceException() << "Only external table supports parameters";
}

bool IsRDBMSDataSource(const TProtoStringType& sourceType) const {
return IsIn({"Greenplum", "PostgreSQL", "MySQL", "MsSQLServer", "Clickhouse"}, sourceType);
}

virtual void ValidateExternalDataSource(const TString& externalDataSourceDescription) const override {
NKikimrSchemeOp::TExternalDataSourceDescription proto;
if (!proto.ParseFromString(externalDataSourceDescription)) {
Expand All @@ -49,6 +53,10 @@ struct TExternalDataSource : public IExternalSource {
ythrow TExternalSourceException() << "Unsupported property: " << key;
}

if (IsRDBMSDataSource(proto.GetSourceType()) && !proto.GetProperties().GetProperties().contains("database_name")){
ythrow TExternalSourceException() << proto.GetSourceType() << " source must provide database_name";
}

ValidateHostname(HostnamePatterns, proto.GetLocation());
}

Expand Down
24 changes: 24 additions & 0 deletions ydb/core/fq/libs/actors/clusters_from_connections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,30 @@ void AddClustersFromConnections(
clusters.emplace(connectionName, GenericProviderName);
break;
}
case FederatedQuery::ConnectionSetting::kGreenplumCluster: {
FillGenericClusterConfig(
common,
*gatewaysConfig.MutableGeneric()->AddClusterMapping(),
conn.content().setting().greenplum_cluster(),
connectionName,
NYql::NConnector::NApi::EDataSourceKind::GREENPLUM,
authToken,
accountIdSignatures);
clusters.emplace(connectionName, GenericProviderName);
break;
}
case FederatedQuery::ConnectionSetting::kMysqlCluster: {
FillGenericClusterConfig(
common,
*gatewaysConfig.MutableGeneric()->AddClusterMapping(),
conn.content().setting().mysql_cluster(),
connectionName,
NYql::NConnector::NApi::EDataSourceKind::MYSQL,
authToken,
accountIdSignatures);
clusters.emplace(connectionName, GenericProviderName);
break;
}

// Do not replace with default. Adding a new connection should cause a compilation error
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
Expand Down
56 changes: 53 additions & 3 deletions ydb/core/fq/libs/actors/database_resolver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
// There are two kinds of managed YDBs: serverless and dedicated.
// While working with dedicated databases, we have to use underlay network.
// That's why we add `u-` prefix to database fqdn.
if (databaseInfo.GetMap().contains("dedicatedDatabase")) {
if (databaseInfo.GetMap().contains("storageConfig")) {
endpoint = "u-" + endpoint;
host = "u-" + host;
}
Expand All @@ -335,7 +335,7 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
{
auto ret = ydbParser(databaseInfo, mdbEndpointGenerator, useTls, protocol);
// TODO: Take explicit field from MVP
bool isDedicatedDb = databaseInfo.GetMap().contains("dedicatedDatabase");
bool isDedicatedDb = databaseInfo.GetMap().contains("storageConfig");
if (!isDedicatedDb && ret.Endpoint.StartsWith("ydb.")) {
// Replace "ydb." -> "yds."
ret.Endpoint[2] = 's';
Expand Down Expand Up @@ -457,6 +457,56 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>

endpoint = mdbEndpointGenerator->ToEndpoint(params);

return TDatabaseDescription{"", endpoint.first, endpoint.second, "", useTls};
};
// MySQL parser: reads the "hosts" array of the response, keeps the hosts whose
// listed services all report health "ALIVE", picks one of them with std::rand()
// and asks the MDB endpoint generator to turn it into a host/port pair.
// Throws TCodeLineException(INTERNAL_ERROR) when no host qualifies.
Parsers[NYql::EDatabaseType::MySQL] = [](
    NJson::TJsonValue& databaseInfo,
    const NYql::IMdbEndpointGenerator::TPtr& mdbEndpointGenerator,
    bool useTls,
    NConnector::NApi::EProtocol protocol
) {
    // True when no service in the array reports a health other than "ALIVE".
    // An empty array therefore also yields true.
    const auto everyServiceAlive = [](const auto& services) {
        for (const auto& service : services) {
            if (!(service["health"].GetString() == "ALIVE")) {
                return false;
            }
        }
        return true;
    };

    TVector<TString> candidateHosts;

    for (const auto& hostEntry : databaseInfo.GetMap().at("hosts").GetArraySafe()) {
        const auto& hostFields = hostEntry.GetMap();

        // a host without the "services" key indicates that cluster is down
        if (hostFields.contains("services") && everyServiceAlive(hostFields.at("services").GetArraySafe())) {
            candidateHosts.push_back(hostEntry["name"].GetString());
        }
    }

    if (candidateHosts.empty()) {
        ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "No ALIVE MySQL hosts found";
    }

    // Choose one of the surviving hosts.
    const auto chosenIndex = std::rand() % static_cast<int>(candidateHosts.size());

    NYql::IMdbEndpointGenerator::TParams params = {
        .DatabaseType = NYql::EDatabaseType::MySQL,
        .MdbHost = candidateHosts[chosenIndex],
        .UseTls = useTls,
        .Protocol = protocol,
    };

    const NYql::IMdbEndpointGenerator::TEndpoint endpoint = mdbEndpointGenerator->ToEndpoint(params);

    return TDatabaseDescription{"", endpoint.first, endpoint.second, "", useTls};
};
}
Expand Down Expand Up @@ -538,7 +588,7 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
url = TUrlBuilder(ev->Get()->YdbMvpEndpoint + "/database")
.AddUrlParam("databaseId", databaseId)
.Build();
} else if (IsIn({NYql::EDatabaseType::ClickHouse, NYql::EDatabaseType::PostgreSQL}, databaseType)) {
} else if (IsIn({NYql::EDatabaseType::ClickHouse, NYql::EDatabaseType::PostgreSQL, NYql::EDatabaseType::MySQL}, databaseType)) {
YQL_ENSURE(ev->Get()->MdbGateway, "empty MDB Gateway");
url = TUrlBuilder(
ev->Get()->MdbGateway + "/managed-" + NYql::DatabaseTypeLowercase(databaseType) + "/v1/clusters/")
Expand Down
81 changes: 77 additions & 4 deletions ydb/core/fq/libs/actors/ut/database_resolver_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
R"(
{
"endpoint":"grpcs://lb.etnbrtlini51k7cinbdr.ydb.mdb.yandexcloud.net:2135/?database=/ru-central1/b1gtl2kg13him37quoo6/etn021us5r9rhld1vgbh",
"dedicatedDatabase":{"resuorcePresetId": "medium"}
"storageConfig":{"storageSizeLimit":107374182400}
})",
NYql::TDatabaseResolverResponse::TDatabaseDescription{
TString{"u-lb.etnbrtlini51k7cinbdr.ydb.mdb.yandexcloud.net:2135"},
Expand Down Expand Up @@ -285,7 +285,7 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
R"(
{
"endpoint":"grpcs://lb.etn021us5r9rhld1vgbh.ydb.mdb.yandexcloud.net:2135/?database=/ru-central1/b1g7jdjqd07qg43c4fmp/etn021us5r9rhld1vgbh",
"dedicatedDatabase":{"resourcePresetId": "medium"}
"storageConfig":{"storageSizeLimit":107374182400}
})",
NYql::TDatabaseResolverResponse::TDatabaseDescription{
TString{"u-lb.etn021us5r9rhld1vgbh.ydb.mdb.yandexcloud.net:2135"},
Expand Down Expand Up @@ -473,6 +473,7 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
issues
);
}

Y_UNIT_TEST(Greenplum_MasterNode) {
Test(
NYql::EDatabaseType::Greenplum,
Expand Down Expand Up @@ -504,7 +505,7 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
TString(""),
true},
{});
}
}

Y_UNIT_TEST(Greenplum_PermissionDenied) {
NYql::TIssues issues{
Expand Down Expand Up @@ -535,7 +536,79 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
)",
NYql::TDatabaseResolverResponse::TDatabaseDescription{},
issues);
}
}

// Happy-path case for the MySQL database type: a "200" reply whose body lists a
// single host with two services, both reporting health "ALIVE".
// NOTE(review): the meaning of each positional argument is defined by the Test
// helper, which is not visible here — presumably (database type, protocol,
// request URL, HTTP status, response body, expected description, expected
// issues); confirm against the helper.
Y_UNIT_TEST(MySQL) {
Test(
NYql::EDatabaseType::MySQL,
NYql::NConnector::NApi::EProtocol::NATIVE,
"https://mdb.api.cloud.yandex.net:443/managed-mysql/v1/clusters/etn021us5r9rhld1vgbh/hosts",
"200",
R"({
"hosts": [
{
"services": [
{
"type": "POOLER",
"health": "ALIVE"
},
{
"type": "MYSQL",
"health": "ALIVE"
}
],
"name": "rc1b-eyt6dtobu96rwydq.mdb.yandexcloud.net",
"clusterId": "c9qb2bjghs8onbncpamk",
"zoneId": "ru-central1-b",
"role": "MASTER",
"health": "ALIVE"
}
]
})",
// The description passed here uses the ".db.yandex.net" form of the host name
// and port 3306, with the last field set to true and the other strings empty.
NYql::TDatabaseResolverResponse::TDatabaseDescription{
TString{""},
TString{"rc1b-eyt6dtobu96rwydq.db.yandex.net"},
3306,
TString(""),
true
},
{});
}

// Error-path case for the MySQL database type: a "403" reply carrying a
// "Permission denied" body. The issue list passed to Test holds one issue whose
// text is MakeErrorPrefix(...) for this host, path, cluster id and database
// type, followed by NoPermissionStr; the description passed is default-constructed.
// NOTE(review): how Test compares these values with the resolver's output is
// defined by the helper, which is not visible here — confirm against it.
Y_UNIT_TEST(MySQL_PermissionDenied) {
NYql::TIssues issues{
NYql::TIssue(
TStringBuilder{} << MakeErrorPrefix(
"mdb.api.cloud.yandex.net:443",
"/managed-mysql/v1/clusters/etn021us5r9rhld1vgbh/hosts",
"etn021us5r9rhld1vgbh",
NYql::EDatabaseType::MySQL
) << NoPermissionStr
)
};

Test(
NYql::EDatabaseType::MySQL,
NYql::NConnector::NApi::EProtocol::NATIVE,
"https://mdb.api.cloud.yandex.net:443/managed-mysql/v1/clusters/etn021us5r9rhld1vgbh/hosts",
"403",
R"(
{
"code": 7,
"message": "Permission denied",
"details": [
{
"@type": "type.googleapis.com/google.rpc.RequestInfo",
"requestId": "a943c092-d596-4e0e-ae7b-1f67f9d8164e"
}
]
}
)",
NYql::TDatabaseResolverResponse::TDatabaseDescription{},
issues
);
}


Y_UNIT_TEST(DataStreams_PermissionDenied) {
NYql::TIssues issues{
Expand Down
22 changes: 22 additions & 0 deletions ydb/core/fq/libs/common/util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ TString ExtractServiceAccountId(const FederatedQuery::ConnectionSetting& setting
case FederatedQuery::ConnectionSetting::kPostgresqlCluster: {
return GetServiceAccountId(setting.postgresql_cluster().auth());
}
case FederatedQuery::ConnectionSetting::kGreenplumCluster: {
return GetServiceAccountId(setting.greenplum_cluster().auth());
}
case FederatedQuery::ConnectionSetting::kMysqlCluster: {
return GetServiceAccountId(setting.mysql_cluster().auth());
}
// Do not replace with default. Adding a new connection should cause a compilation error
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
break;
Expand Down Expand Up @@ -157,6 +163,10 @@ TMaybe<TString> GetLogin(const FederatedQuery::ConnectionSetting& setting) {
return {};
case FederatedQuery::ConnectionSetting::kPostgresqlCluster:
return setting.postgresql_cluster().login();
case FederatedQuery::ConnectionSetting::kGreenplumCluster:
return setting.greenplum_cluster().login();
case FederatedQuery::ConnectionSetting::kMysqlCluster:
return setting.mysql_cluster().login();
}
}

Expand All @@ -176,6 +186,10 @@ TMaybe<TString> GetPassword(const FederatedQuery::ConnectionSetting& setting) {
return {};
case FederatedQuery::ConnectionSetting::kPostgresqlCluster:
return setting.postgresql_cluster().password();
case FederatedQuery::ConnectionSetting::kGreenplumCluster:
return setting.greenplum_cluster().password();
case FederatedQuery::ConnectionSetting::kMysqlCluster:
return setting.mysql_cluster().password();
}
}

Expand All @@ -195,6 +209,10 @@ EYdbComputeAuth GetYdbComputeAuthMethod(const FederatedQuery::ConnectionSetting&
return GetIamAuthMethod(setting.monitoring().auth());
case FederatedQuery::ConnectionSetting::kPostgresqlCluster:
return GetBasicAuthMethod(setting.postgresql_cluster().auth());
case FederatedQuery::ConnectionSetting::kGreenplumCluster:
return GetBasicAuthMethod(setting.greenplum_cluster().auth());
case FederatedQuery::ConnectionSetting::kMysqlCluster:
return GetBasicAuthMethod(setting.mysql_cluster().auth());
}
}

Expand All @@ -212,6 +230,10 @@ FederatedQuery::IamAuth GetAuth(const FederatedQuery::Connection& connection) {
return connection.content().setting().monitoring().auth();
case FederatedQuery::ConnectionSetting::kPostgresqlCluster:
return connection.content().setting().postgresql_cluster().auth();
case FederatedQuery::ConnectionSetting::kGreenplumCluster:
return connection.content().setting().greenplum_cluster().auth();
case FederatedQuery::ConnectionSetting::kMysqlCluster:
return connection.content().setting().mysql_cluster().auth();
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
return FederatedQuery::IamAuth{};
}
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/fq/libs/compute/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ class TComputeConfig {
case FederatedQuery::ConnectionSetting::kObjectStorage:
case FederatedQuery::ConnectionSetting::kClickhouseCluster:
case FederatedQuery::ConnectionSetting::kPostgresqlCluster:
case FederatedQuery::ConnectionSetting::kGreenplumCluster:
case FederatedQuery::ConnectionSetting::kMysqlCluster:
case FederatedQuery::ConnectionSetting::kYdbDatabase:
return true;
case FederatedQuery::ConnectionSetting::kDataStreams:
Expand Down
37 changes: 34 additions & 3 deletions ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,8 @@ TString MakeCreateExternalDataSourceQuery(
}
case FederatedQuery::ConnectionSetting::kMonitoring:
break;
case FederatedQuery::ConnectionSetting::kPostgresqlCluster:
const auto schema = connectionContent.setting().postgresql_cluster().schema();
case FederatedQuery::ConnectionSetting::kPostgresqlCluster: {
const auto pgschema = connectionContent.setting().postgresql_cluster().schema();
properties = fmt::format(
R"(
SOURCE_TYPE="PostgreSQL",
Expand All @@ -228,7 +228,38 @@ TString MakeCreateExternalDataSourceQuery(
"mdb_cluster_id"_a = EncloseAndEscapeString(connectionContent.setting().postgresql_cluster().database_id(), '"'),
"database_name"_a = EncloseAndEscapeString(connectionContent.setting().postgresql_cluster().database_name(), '"'),
"use_tls"_a = common.GetDisableSslForGenericDataSources() ? "false" : "true",
"schema"_a = schema ? ", SCHEMA=" + EncloseAndEscapeString(schema, '"') : TString{});
"schema"_a = pgschema ? ", SCHEMA=" + EncloseAndEscapeString(pgschema, '"') : TString{});
}
break;
case FederatedQuery::ConnectionSetting::kGreenplumCluster: {
const auto gpschema = connectionContent.setting().greenplum_cluster().schema();
properties = fmt::format(
R"(
SOURCE_TYPE="Greenplum",
MDB_CLUSTER_ID={mdb_cluster_id},
DATABASE_NAME={database_name},
USE_TLS="{use_tls}"
{schema}
)",
"mdb_cluster_id"_a = EncloseAndEscapeString(connectionContent.setting().greenplum_cluster().database_id(), '"'),
"database_name"_a = EncloseAndEscapeString(connectionContent.setting().greenplum_cluster().database_name(), '"'),
"use_tls"_a = common.GetDisableSslForGenericDataSources() ? "false" : "true",
"schema"_a = gpschema ? ", SCHEMA=" + EncloseAndEscapeString(gpschema, '"') : TString{});

}
case FederatedQuery::ConnectionSetting::kMysqlCluster: {
properties = fmt::format(
R"(
SOURCE_TYPE="MySQL",
MDB_CLUSTER_ID={mdb_cluster_id},
DATABASE_NAME={database_name},
USE_TLS="{use_tls}"
)",
"mdb_cluster_id"_a = EncloseAndEscapeString(connectionContent.setting().mysql_cluster().database_id(), '"'),
"database_name"_a = EncloseAndEscapeString(connectionContent.setting().mysql_cluster().database_name(), '"'),
"use_tls"_a = common.GetDisableSslForGenericDataSources() ? "false" : "true");

}
break;
}

Expand Down
6 changes: 6 additions & 0 deletions ydb/core/fq/libs/control_plane_proxy/utils/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ TString ExtractServiceAccountIdWithConnection(const T& setting) {
case FederatedQuery::ConnectionSetting::kPostgresqlCluster: {
return GetServiceAccountId(setting.postgresql_cluster().auth());
}
case FederatedQuery::ConnectionSetting::kGreenplumCluster: {
return GetServiceAccountId(setting.greenplum_cluster().auth());
}
case FederatedQuery::ConnectionSetting::kMysqlCluster: {
return GetServiceAccountId(setting.mysql_cluster().auth());
}
// Do not replace with default. Adding a new connection should cause a compilation error
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
break;
Expand Down
Loading
Loading