Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…orm#6298 ydb-platform#6471 ydb-platform#6484 ydb-platform#6703 (ydb-platform#6721)

Co-authored-by: Timur Sufiyanov <fa-luke16@mail.ru>
  • Loading branch information
2 people authored and uzhastik committed Sep 11, 2024
1 parent ed2488b commit 5137d7f
Show file tree
Hide file tree
Showing 14 changed files with 306 additions and 13 deletions.
8 changes: 8 additions & 0 deletions ydb/core/external_sources/external_data_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ struct TExternalDataSource : public IExternalSource {
ythrow TExternalSourceException() << "Only external table supports parameters";
}

// Returns true when sourceType is one of the source type names listed below
// (Greenplum, PostgreSQL, MySQL, MsSQLServer, Clickhouse). The comparison is
// delegated to IsIn, so the spelling must match one of the literals.
bool IsRDBMSDataSource(const TProtoStringType& sourceType) const {
    const bool isListedSource = IsIn(
        {"Greenplum", "PostgreSQL", "MySQL", "MsSQLServer", "Clickhouse"},
        sourceType);
    return isListedSource;
}

virtual void ValidateExternalDataSource(const TString& externalDataSourceDescription) const override {
NKikimrSchemeOp::TExternalDataSourceDescription proto;
if (!proto.ParseFromString(externalDataSourceDescription)) {
Expand All @@ -49,6 +53,10 @@ struct TExternalDataSource : public IExternalSource {
ythrow TExternalSourceException() << "Unsupported property: " << key;
}

if (IsRDBMSDataSource(proto.GetSourceType()) && !proto.GetProperties().GetProperties().contains("database_name")){
ythrow TExternalSourceException() << proto.GetSourceType() << " source must provide database_name";
}

ValidateHostname(HostnamePatterns, proto.GetLocation());
}

Expand Down
24 changes: 24 additions & 0 deletions ydb/core/fq/libs/actors/clusters_from_connections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,30 @@ void AddClustersFromConnections(
clusters.emplace(connectionName, GenericProviderName);
break;
}
case FederatedQuery::ConnectionSetting::kGreenplumCluster: {
// Greenplum connection: append a new cluster mapping entry to the generic
// gateway config and pass it to FillGenericClusterConfig together with the
// connection's greenplum_cluster settings and the GREENPLUM data source kind.
FillGenericClusterConfig(
common,
*gatewaysConfig.MutableGeneric()->AddClusterMapping(),
conn.content().setting().greenplum_cluster(),
connectionName,
NYql::NConnector::NApi::EDataSourceKind::GREENPLUM,
authToken,
accountIdSignatures);
// Record the connection name in `clusters` under GenericProviderName.
clusters.emplace(connectionName, GenericProviderName);
break;
}
case FederatedQuery::ConnectionSetting::kMysqlCluster: {
// MySQL connection: append a new cluster mapping entry to the generic
// gateway config and pass it to FillGenericClusterConfig together with the
// connection's mysql_cluster settings and the MYSQL data source kind.
FillGenericClusterConfig(
common,
*gatewaysConfig.MutableGeneric()->AddClusterMapping(),
conn.content().setting().mysql_cluster(),
connectionName,
NYql::NConnector::NApi::EDataSourceKind::MYSQL,
authToken,
accountIdSignatures);
// Record the connection name in `clusters` under GenericProviderName.
clusters.emplace(connectionName, GenericProviderName);
break;
}

// Do not replace with default. Adding a new connection should cause a compilation error
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
Expand Down
56 changes: 53 additions & 3 deletions ydb/core/fq/libs/actors/database_resolver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
// There are two kinds of managed YDBs: serverless and dedicated.
// While working with dedicated databases, we have to use underlay network.
// That's why we add `u-` prefix to database fqdn.
if (databaseInfo.GetMap().contains("dedicatedDatabase")) {
if (databaseInfo.GetMap().contains("storageConfig")) {
endpoint = "u-" + endpoint;
host = "u-" + host;
}
Expand All @@ -335,7 +335,7 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
{
auto ret = ydbParser(databaseInfo, mdbEndpointGenerator, useTls, protocol);
// TODO: Take explicit field from MVP
bool isDedicatedDb = databaseInfo.GetMap().contains("dedicatedDatabase");
bool isDedicatedDb = databaseInfo.GetMap().contains("storageConfig");
if (!isDedicatedDb && ret.Endpoint.StartsWith("ydb.")) {
// Replace "ydb." -> "yds."
ret.Endpoint[2] = 's';
Expand Down Expand Up @@ -457,6 +457,56 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>

endpoint = mdbEndpointGenerator->ToEndpoint(params);

return TDatabaseDescription{"", endpoint.first, endpoint.second, "", useTls};
};
Parsers[NYql::EDatabaseType::MySQL] = [](
    NJson::TJsonValue& databaseInfo,
    const NYql::IMdbEndpointGenerator::TPtr& mdbEndpointGenerator,
    bool useTls,
    NConnector::NApi::EProtocol protocol
) {
    // Parser for the "hosts" JSON of a MySQL cluster: collects the names of
    // hosts whose services all report health "ALIVE", picks one of them with
    // std::rand() and converts it to an endpoint via mdbEndpointGenerator.
    // Throws TCodeLineException(INTERNAL_ERROR) when no host qualifies.
    TVector<TString> healthyHosts;

    const auto& hostEntries = databaseInfo.GetMap().at("hosts").GetArraySafe();

    for (const auto& hostEntry : hostEntries) {
        const auto& hostFields = hostEntry.GetMap();

        // A host without a "services" key indicates that the cluster is down.
        if (!hostFields.contains("services")) {
            continue;
        }

        const auto& hostServices = hostFields.at("services").GetArraySafe();

        // The host qualifies only if every listed service is ALIVE;
        // an empty service list therefore qualifies as well.
        bool everyServiceAlive = true;
        for (const auto& service : hostServices) {
            if (service["health"].GetString() == "ALIVE") {
                continue;
            }
            everyServiceAlive = false;
            break;
        }

        if (everyServiceAlive) {
            healthyHosts.push_back(hostEntry["name"].GetString());
        }
    }

    if (healthyHosts.empty()) {
        ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "No ALIVE MySQL hosts found";
    }

    // Choose one of the qualifying hosts at random.
    const auto pickedIndex = std::rand() % static_cast<int>(healthyHosts.size());

    NYql::IMdbEndpointGenerator::TParams params = {
        .DatabaseType = NYql::EDatabaseType::MySQL,
        .MdbHost = healthyHosts[pickedIndex],
        .UseTls = useTls,
        .Protocol = protocol,
    };

    const NYql::IMdbEndpointGenerator::TEndpoint endpoint = mdbEndpointGenerator->ToEndpoint(params);

    return TDatabaseDescription{"", endpoint.first, endpoint.second, "", useTls};
};
}
Expand Down Expand Up @@ -538,7 +588,7 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
url = TUrlBuilder(ev->Get()->YdbMvpEndpoint + "/database")
.AddUrlParam("databaseId", databaseId)
.Build();
} else if (IsIn({NYql::EDatabaseType::ClickHouse, NYql::EDatabaseType::PostgreSQL}, databaseType)) {
} else if (IsIn({NYql::EDatabaseType::ClickHouse, NYql::EDatabaseType::PostgreSQL, NYql::EDatabaseType::MySQL}, databaseType)) {
YQL_ENSURE(ev->Get()->MdbGateway, "empty MDB Gateway");
url = TUrlBuilder(
ev->Get()->MdbGateway + "/managed-" + NYql::DatabaseTypeLowercase(databaseType) + "/v1/clusters/")
Expand Down
81 changes: 77 additions & 4 deletions ydb/core/fq/libs/actors/ut/database_resolver_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
R"(
{
"endpoint":"grpcs://lb.etnbrtlini51k7cinbdr.ydb.mdb.yandexcloud.net:2135/?database=/ru-central1/b1gtl2kg13him37quoo6/etn021us5r9rhld1vgbh",
"dedicatedDatabase":{"resuorcePresetId": "medium"}
"storageConfig":{"storageSizeLimit":107374182400}
})",
NYql::TDatabaseResolverResponse::TDatabaseDescription{
TString{"u-lb.etnbrtlini51k7cinbdr.ydb.mdb.yandexcloud.net:2135"},
Expand Down Expand Up @@ -285,7 +285,7 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
R"(
{
"endpoint":"grpcs://lb.etn021us5r9rhld1vgbh.ydb.mdb.yandexcloud.net:2135/?database=/ru-central1/b1g7jdjqd07qg43c4fmp/etn021us5r9rhld1vgbh",
"dedicatedDatabase":{"resourcePresetId": "medium"}
"storageConfig":{"storageSizeLimit":107374182400}
})",
NYql::TDatabaseResolverResponse::TDatabaseDescription{
TString{"u-lb.etn021us5r9rhld1vgbh.ydb.mdb.yandexcloud.net:2135"},
Expand Down Expand Up @@ -473,6 +473,7 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
issues
);
}

Y_UNIT_TEST(Greenplum_MasterNode) {
Test(
NYql::EDatabaseType::Greenplum,
Expand Down Expand Up @@ -504,7 +505,7 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
TString(""),
true},
{});
}
}

Y_UNIT_TEST(Greenplum_PermissionDenied) {
NYql::TIssues issues{
Expand Down Expand Up @@ -535,7 +536,79 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
)",
NYql::TDatabaseResolverResponse::TDatabaseDescription{},
issues);
}
}

Y_UNIT_TEST(MySQL) {
// Happy path for MySQL: the hosts response below lists one host whose
// services (POOLER and MYSQL) are both ALIVE. The expected description carries
// the host "rc1b-eyt6dtobu96rwydq.db.yandex.net", the value 3306 and `true` as
// the last field, with the two remaining string fields empty.
// NOTE(review): "200" is presumably the mocked HTTP status of the hosts
// request, and the trailing {} presumably means "no issues expected" --
// confirm against the Test helper.
Test(
NYql::EDatabaseType::MySQL,
NYql::NConnector::NApi::EProtocol::NATIVE,
"https://mdb.api.cloud.yandex.net:443/managed-mysql/v1/clusters/etn021us5r9rhld1vgbh/hosts",
"200",
R"({
"hosts": [
{
"services": [
{
"type": "POOLER",
"health": "ALIVE"
},
{
"type": "MYSQL",
"health": "ALIVE"
}
],
"name": "rc1b-eyt6dtobu96rwydq.mdb.yandexcloud.net",
"clusterId": "c9qb2bjghs8onbncpamk",
"zoneId": "ru-central1-b",
"role": "MASTER",
"health": "ALIVE"
}
]
})",
NYql::TDatabaseResolverResponse::TDatabaseDescription{
TString{""},
TString{"rc1b-eyt6dtobu96rwydq.db.yandex.net"},
3306,
TString(""),
true
},
{});
}

Y_UNIT_TEST(MySQL_PermissionDenied) {
// Expected outcome: a single issue whose text is the MakeErrorPrefix(...)
// output for the MySQL hosts URL and cluster id, followed by NoPermissionStr.
NYql::TIssues issues{
NYql::TIssue(
TStringBuilder{} << MakeErrorPrefix(
"mdb.api.cloud.yandex.net:443",
"/managed-mysql/v1/clusters/etn021us5r9rhld1vgbh/hosts",
"etn021us5r9rhld1vgbh",
NYql::EDatabaseType::MySQL
) << NoPermissionStr
)
};

// The hosts request is answered with "403" and a "Permission denied" body;
// the test expects a default-constructed (empty) database description
// together with the issue list built above.
Test(
NYql::EDatabaseType::MySQL,
NYql::NConnector::NApi::EProtocol::NATIVE,
"https://mdb.api.cloud.yandex.net:443/managed-mysql/v1/clusters/etn021us5r9rhld1vgbh/hosts",
"403",
R"(
{
"code": 7,
"message": "Permission denied",
"details": [
{
"@type": "type.googleapis.com/google.rpc.RequestInfo",
"requestId": "a943c092-d596-4e0e-ae7b-1f67f9d8164e"
}
]
}
)",
NYql::TDatabaseResolverResponse::TDatabaseDescription{},
issues
);
}


Y_UNIT_TEST(DataStreams_PermissionDenied) {
NYql::TIssues issues{
Expand Down
22 changes: 22 additions & 0 deletions ydb/core/fq/libs/common/util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ TString ExtractServiceAccountId(const FederatedQuery::ConnectionSetting& setting
case FederatedQuery::ConnectionSetting::kPostgresqlCluster: {
return GetServiceAccountId(setting.postgresql_cluster().auth());
}
case FederatedQuery::ConnectionSetting::kGreenplumCluster: {
return GetServiceAccountId(setting.greenplum_cluster().auth());
}
case FederatedQuery::ConnectionSetting::kMysqlCluster: {
return GetServiceAccountId(setting.mysql_cluster().auth());
}
// Do not replace with default. Adding a new connection should cause a compilation error
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
break;
Expand Down Expand Up @@ -157,6 +163,10 @@ TMaybe<TString> GetLogin(const FederatedQuery::ConnectionSetting& setting) {
return {};
case FederatedQuery::ConnectionSetting::kPostgresqlCluster:
return setting.postgresql_cluster().login();
case FederatedQuery::ConnectionSetting::kGreenplumCluster:
return setting.greenplum_cluster().login();
case FederatedQuery::ConnectionSetting::kMysqlCluster:
return setting.mysql_cluster().login();
}
}

Expand All @@ -176,6 +186,10 @@ TMaybe<TString> GetPassword(const FederatedQuery::ConnectionSetting& setting) {
return {};
case FederatedQuery::ConnectionSetting::kPostgresqlCluster:
return setting.postgresql_cluster().password();
case FederatedQuery::ConnectionSetting::kGreenplumCluster:
return setting.greenplum_cluster().password();
case FederatedQuery::ConnectionSetting::kMysqlCluster:
return setting.mysql_cluster().password();
}
}

Expand All @@ -195,6 +209,10 @@ EYdbComputeAuth GetYdbComputeAuthMethod(const FederatedQuery::ConnectionSetting&
return GetIamAuthMethod(setting.monitoring().auth());
case FederatedQuery::ConnectionSetting::kPostgresqlCluster:
return GetBasicAuthMethod(setting.postgresql_cluster().auth());
case FederatedQuery::ConnectionSetting::kGreenplumCluster:
return GetBasicAuthMethod(setting.greenplum_cluster().auth());
case FederatedQuery::ConnectionSetting::kMysqlCluster:
return GetBasicAuthMethod(setting.mysql_cluster().auth());
}
}

Expand All @@ -212,6 +230,10 @@ FederatedQuery::IamAuth GetAuth(const FederatedQuery::Connection& connection) {
return connection.content().setting().monitoring().auth();
case FederatedQuery::ConnectionSetting::kPostgresqlCluster:
return connection.content().setting().postgresql_cluster().auth();
case FederatedQuery::ConnectionSetting::kGreenplumCluster:
return connection.content().setting().greenplum_cluster().auth();
case FederatedQuery::ConnectionSetting::kMysqlCluster:
return connection.content().setting().mysql_cluster().auth();
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
return FederatedQuery::IamAuth{};
}
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/fq/libs/compute/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ class TComputeConfig {
case FederatedQuery::ConnectionSetting::kObjectStorage:
case FederatedQuery::ConnectionSetting::kClickhouseCluster:
case FederatedQuery::ConnectionSetting::kPostgresqlCluster:
case FederatedQuery::ConnectionSetting::kGreenplumCluster:
case FederatedQuery::ConnectionSetting::kMysqlCluster:
case FederatedQuery::ConnectionSetting::kYdbDatabase:
return true;
case FederatedQuery::ConnectionSetting::kDataStreams:
Expand Down
37 changes: 34 additions & 3 deletions ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,8 @@ TString MakeCreateExternalDataSourceQuery(
}
case FederatedQuery::ConnectionSetting::kMonitoring:
break;
case FederatedQuery::ConnectionSetting::kPostgresqlCluster:
const auto schema = connectionContent.setting().postgresql_cluster().schema();
case FederatedQuery::ConnectionSetting::kPostgresqlCluster: {
const auto pgschema = connectionContent.setting().postgresql_cluster().schema();
properties = fmt::format(
R"(
SOURCE_TYPE="PostgreSQL",
Expand All @@ -228,7 +228,38 @@ TString MakeCreateExternalDataSourceQuery(
"mdb_cluster_id"_a = EncloseAndEscapeString(connectionContent.setting().postgresql_cluster().database_id(), '"'),
"database_name"_a = EncloseAndEscapeString(connectionContent.setting().postgresql_cluster().database_name(), '"'),
"use_tls"_a = common.GetDisableSslForGenericDataSources() ? "false" : "true",
"schema"_a = schema ? ", SCHEMA=" + EncloseAndEscapeString(schema, '"') : TString{});
"schema"_a = pgschema ? ", SCHEMA=" + EncloseAndEscapeString(pgschema, '"') : TString{});
}
break;
case FederatedQuery::ConnectionSetting::kGreenplumCluster: {
const auto gpschema = connectionContent.setting().greenplum_cluster().schema();
properties = fmt::format(
R"(
SOURCE_TYPE="Greenplum",
MDB_CLUSTER_ID={mdb_cluster_id},
DATABASE_NAME={database_name},
USE_TLS="{use_tls}"
{schema}
)",
"mdb_cluster_id"_a = EncloseAndEscapeString(connectionContent.setting().greenplum_cluster().database_id(), '"'),
"database_name"_a = EncloseAndEscapeString(connectionContent.setting().greenplum_cluster().database_name(), '"'),
"use_tls"_a = common.GetDisableSslForGenericDataSources() ? "false" : "true",
"schema"_a = gpschema ? ", SCHEMA=" + EncloseAndEscapeString(gpschema, '"') : TString{});

}
case FederatedQuery::ConnectionSetting::kMysqlCluster: {
    // Builds the external data source properties for a MySQL connection:
    // SOURCE_TYPE "MySQL", the MDB cluster id, the database name and the TLS
    // flag. No SCHEMA clause is emitted for this source type.
    const auto& mysqlCluster = connectionContent.setting().mysql_cluster();
    properties = fmt::format(
        R"(
SOURCE_TYPE="MySQL",
MDB_CLUSTER_ID={mdb_cluster_id},
DATABASE_NAME={database_name},
USE_TLS="{use_tls}"
)",
        "mdb_cluster_id"_a = EncloseAndEscapeString(mysqlCluster.database_id(), '"'),
        "database_name"_a = EncloseAndEscapeString(mysqlCluster.database_name(), '"'),
        "use_tls"_a = common.GetDisableSslForGenericDataSources() ? "false" : "true");
}
break;
}

Expand Down
6 changes: 6 additions & 0 deletions ydb/core/fq/libs/control_plane_proxy/utils/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ TString ExtractServiceAccountIdWithConnection(const T& setting) {
case FederatedQuery::ConnectionSetting::kPostgresqlCluster: {
return GetServiceAccountId(setting.postgresql_cluster().auth());
}
case FederatedQuery::ConnectionSetting::kGreenplumCluster: {
return GetServiceAccountId(setting.greenplum_cluster().auth());
}
case FederatedQuery::ConnectionSetting::kMysqlCluster: {
return GetServiceAccountId(setting.mysql_cluster().auth());
}
// Do not replace with default. Adding a new connection should cause a compilation error
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
break;
Expand Down
Loading

0 comments on commit 5137d7f

Please sign in to comment.