Skip to content

Commit

Permalink
YDB FQ: Support MySQL as an external data source (ydb-platform#4951)
Browse files Browse the repository at this point in the history
  • Loading branch information
mkls6 authored and skywalker-jpg committed Jun 25, 2024
1 parent 48f5f33 commit b8f6256
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 1 deletion.
4 changes: 4 additions & 0 deletions ydb/core/external_sources/external_source_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TStri
ToString(NYql::EDatabaseType::PostgreSQL),
CreateExternalDataSource(TString{NYql::GenericProviderName}, {"MDB_BASIC", "BASIC"}, {"database_name", "protocol", "mdb_cluster_id", "use_tls", "schema"}, hostnamePatternsRegEx)
},
{
ToString(NYql::EDatabaseType::MySQL),
CreateExternalDataSource(TString{NYql::GenericProviderName}, {"MDB_BASIC", "BASIC"}, {"database_name", "mdb_cluster_id", "use_tls"}, hostnamePatternsRegEx)
},
{
ToString(NYql::EDatabaseType::Ydb),
CreateExternalDataSource(TString{NYql::GenericProviderName}, {"BASIC", "SERVICE_ACCOUNT"}, {"database_name", "use_tls", "database_id"}, hostnamePatternsRegEx)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ enum class EDatabaseType {
ClickHouse,
DataStreams,
ObjectStorage,
PostgreSQL
PostgreSQL,
YT,
MySQL
};

inline EDatabaseType DatabaseTypeFromDataSourceKind(NConnector::NApi::EDataSourceKind dataSourceKind) {
Expand All @@ -24,6 +26,8 @@ inline EDatabaseType DatabaseTypeFromDataSourceKind(NConnector::NApi::EDataSourc
return EDatabaseType::ClickHouse;
case NConnector::NApi::EDataSourceKind::YDB:
return EDatabaseType::Ydb;
case NConnector::NApi::EDataSourceKind::MYSQL:
return EDatabaseType::MySQL;
default:
ythrow yexception() << "Unknown data source kind: " << NConnector::NApi::EDataSourceKind_Name(dataSourceKind);
}
Expand All @@ -37,6 +41,8 @@ inline NConnector::NApi::EDataSourceKind DatabaseTypeToDataSourceKind(EDatabaseT
return NConnector::NApi::EDataSourceKind::CLICKHOUSE;
case EDatabaseType::Ydb:
return NConnector::NApi::EDataSourceKind::YDB;
case EDatabaseType::MySQL:
return NConnector::NApi::EDataSourceKind::MYSQL;
default:
ythrow yexception() << "Unknown database type: " << ToString(databaseType);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#include "yql_generic_provider_factories.h"

#include "yql_generic_read_actor.h"
#include "yql_generic_lookup_actor.h"

#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>

namespace NYql::NDq {

void RegisterGenericProviderFactories(TDqAsyncIoFactory& factory,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
NYql::NConnector::IClient::TPtr genericClient) {
auto readActorFactory = [credentialsFactory, genericClient](
Generic::TSource&& settings,
IDqAsyncIoFactory::TSourceArguments&& args) {
return CreateGenericReadActor(genericClient, std::move(settings), args.InputIndex, args.StatsLevel,
args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory, args.HolderFactory);
};

auto lookupActorFactory = [credentialsFactory, genericClient](NYql::Generic::TLookupSource&& lookupSource, IDqAsyncIoFactory::TLookupSourceArguments&& args) {
return CreateGenericLookupActor(
genericClient,
credentialsFactory,
std::move(args.ParentId),
args.Alloc,
std::move(lookupSource),
args.KeyType,
args.PayloadType,
args.TypeEnv,
args.HolderFactory,
args.MaxKeysInRequest);
};

for (auto& name : {"ClickHouseGeneric", "PostgreSqlGeneric", "YdbGeneric", "MySqlGeneric"}) {
factory.RegisterSource<Generic::TSource>(name, readActorFactory);
factory.RegisterLookupSource<Generic::TLookupSource>(name, lookupActorFactory);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,11 @@ namespace NYql {
return;
}

if (clusterConfig.GetKind() == EDataSourceKind::MYSQL) {
clusterConfig.SetProtocol(EProtocol::NATIVE);
return;
}

auto it = properties.find("protocol");
if (it == properties.cend()) {
ythrow yexception() << "missing 'PROTOCOL' value";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ namespace NYql {
case NYql::NConnector::NApi::POSTGRESQL:
sourceType = "PostgreSqlGeneric";
break;
case NYql::NConnector::NApi::MYSQL:
sourceType = "MySqlGeneric";
break;
case NYql::NConnector::NApi::YDB:
sourceType = "YdbGeneric";
break;
Expand Down Expand Up @@ -204,6 +207,9 @@ namespace NYql {
case NConnector::NApi::POSTGRESQL:
properties["SourceType"] = "PostgreSql";
break;
case NConnector::NApi::MYSQL:
properties["SourceType"] = "MySql";
break;
case NConnector::NApi::YDB:
properties["SourceType"] = "Ydb";
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,8 @@ namespace NYql {
break;
case NYql::NConnector::NApi::YDB:
break;
case NYql::NConnector::NApi::MYSQL:
break;
case NYql::NConnector::NApi::POSTGRESQL: {
// for backward compability set schema "public" by default
// TODO: simplify during https://st.yandex-team.ru/YQ-2494
Expand Down

0 comments on commit b8f6256

Please sign in to comment.