Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ-2824: add basis for type inference in KQP metadata loader #4273

Merged
merged 5 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions ydb/core/external_sources/external_data_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ struct TExternalDataSource : public IExternalSource {
ValidateHostname(HostnamePatterns, proto.GetLocation());
}

virtual NThreading::TFuture<std::shared_ptr<TMetadata>> LoadDynamicMetadata(std::shared_ptr<TMetadata> meta) override {
EgorkaZ marked this conversation as resolved.
Show resolved Hide resolved
return NThreading::MakeFuture(std::move(meta));
}

virtual bool CanLoadDynamicMetadata() const override {
return false;
}

private:
const TString Name;
const TVector<TString> AuthMethods;
Expand Down
78 changes: 78 additions & 0 deletions ydb/core/external_sources/external_source.h
Original file line number Diff line number Diff line change
@@ -1,16 +1,83 @@
#pragma once

#include <library/cpp/threading/future/core/future.h>
#include <util/generic/map.h>
#include <util/generic/string.h>

#include <ydb/core/protos/external_sources.pb.h>
#include <ydb/library/actors/core/actorsystem.h>
#include <ydb/library/yql/public/issue/yql_issue.h>

namespace NKikimr::NExternalSource {

struct TExternalSourceException: public yexception {
};

namespace NAuth {

struct TNone {
static constexpr std::string_view Method = "NONE";
};

struct TAws {
static constexpr std::string_view Method = "AWS";

TAws(const TString& accessKey, const TString& secretAccessKey, const TString& region)
: AccessKey{accessKey}
, SecretAccessKey{secretAccessKey}
, Region{region}
{}

TString AccessKey;
TString SecretAccessKey;
TString Region;
};

struct TServiceAccount {
static constexpr std::string_view Method = "SERVICE_ACCOUNT";

TServiceAccount(TString serviceAccountId, TString serviceAccountIdSignature)
: ServiceAccountId{std::move(serviceAccountId)}
, ServiceAccountIdSignature{std::move(serviceAccountIdSignature)}
{}

TString ServiceAccountId;
TString ServiceAccountIdSignature;
};

using TAuth = std::variant<TNone, TServiceAccount, TAws>;

std::string_view GetMethod(const TAuth& auth);

inline TAuth MakeNone() {
return TAuth{std::in_place_type_t<TNone>{}};
}

inline TAuth MakeServiceAccount(const TString& serviceAccountId, const TString& serviceAccountIdSignature) {
return TAuth{std::in_place_type_t<TServiceAccount>{}, serviceAccountId, serviceAccountIdSignature};
}

inline TAuth MakeAws(const TString& accessKey, const TString& secretAccessKey, const TString& region) {
return TAuth{std::in_place_type_t<TAws>{}, accessKey, secretAccessKey, region};
}
}

using TAuth = NAuth::TAuth;

struct TMetadata {
bool Changed = false;
TString TableLocation;
TString DataSourceLocation;
TString DataSourcePath;
TString Type;
EgorkaZ marked this conversation as resolved.
Show resolved Hide resolved

THashMap<TString, TString> Attributes;

TAuth Auth;

NKikimrExternalSources::TSchema Schema;
};

struct IExternalSource : public TThrRefBase {
using TPtr = TIntrusivePtr<IExternalSource>;

Expand Down Expand Up @@ -54,6 +121,17 @@ struct IExternalSource : public TThrRefBase {
If an error occurs, an exception is thrown.
*/
virtual void ValidateExternalDataSource(const TString& externalDataSourceDescription) const = 0;

/*
Retrieve additional metadata from runtime data, enrich provided metadata
*/
virtual NThreading::TFuture<std::shared_ptr<TMetadata>> LoadDynamicMetadata(std::shared_ptr<TMetadata> meta) = 0;

/*
A method that should tell whether there is an implementation
of the previous method.
*/
virtual bool CanLoadDynamicMetadata() const = 0;
};

}
4 changes: 2 additions & 2 deletions ydb/core/external_sources/external_source_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ struct TExternalSourceFactory : public IExternalSourceFactory {

}

IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TString>& hostnamePatterns, size_t pathsLimit) {
IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TString>& hostnamePatterns, NActors::TActorSystem* actorSystem, size_t pathsLimit) {
std::vector<TRegExMatch> hostnamePatternsRegEx(hostnamePatterns.begin(), hostnamePatterns.end());
return MakeIntrusive<TExternalSourceFactory>(TMap<TString, IExternalSource::TPtr>{
{
ToString(NYql::EDatabaseType::ObjectStorage),
CreateObjectStorageExternalSource(hostnamePatternsRegEx, pathsLimit)
CreateObjectStorageExternalSource(hostnamePatternsRegEx, actorSystem, pathsLimit)
},
{
ToString(NYql::EDatabaseType::ClickHouse),
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/external_sources/external_source_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ struct IExternalSourceFactory : public TThrRefBase {
virtual IExternalSource::TPtr GetOrCreate(const TString& type) const = 0;
};

IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TString>& hostnamePatterns, size_t pathsLimit = 50000);
IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TString>& hostnamePatterns, NActors::TActorSystem* actorSystem = nullptr, size_t pathsLimit = 50000);

}
26 changes: 23 additions & 3 deletions ydb/core/external_sources/object_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@
#include "object_storage.h"
#include "validation_functions.h"

#include <ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h>
EgorkaZ marked this conversation as resolved.
Show resolved Hide resolved
#include <ydb/core/protos/external_sources.pb.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
#include <ydb/library/yql/providers/s3/credentials/credentials.h>
#include <ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h>
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
#include <ydb/public/sdk/cpp/client/ydb_value/value.h>
Expand All @@ -20,9 +23,10 @@ namespace NKikimr::NExternalSource {
namespace {

struct TObjectStorageExternalSource : public IExternalSource {
explicit TObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns, size_t pathsLimit)
explicit TObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns, NActors::TActorSystem* actorSystem, size_t pathsLimit)
: HostnamePatterns(hostnamePatterns)
, PathsLimit(pathsLimit)
, ActorSystem(actorSystem)
{}

virtual TString Pack(const NKikimrExternalSources::TSchema& schema,
Expand Down Expand Up @@ -255,6 +259,20 @@ struct TObjectStorageExternalSource : public IExternalSource {
return issues;
}

struct TMetadataResult : NYql::NCommon::TOperationResult {
std::shared_ptr<TMetadata> Metadata;
};

virtual NThreading::TFuture<std::shared_ptr<TMetadata>> LoadDynamicMetadata(std::shared_ptr<TMetadata> meta) override {
Y_UNUSED(ActorSystem);
// TODO: implement
return NThreading::MakeFuture(std::move(meta));
}

virtual bool CanLoadDynamicMetadata() const override {
return false;
}

private:
static bool IsValidIntervalUnit(const TString& unit) {
static constexpr std::array<std::string_view, 7> IntervalUnits = {
Expand Down Expand Up @@ -474,12 +492,14 @@ struct TObjectStorageExternalSource : public IExternalSource {
private:
const std::vector<TRegExMatch> HostnamePatterns;
const size_t PathsLimit;
NActors::TActorSystem* ActorSystem = nullptr;
};

}

IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns, size_t pathsLimit) {
return MakeIntrusive<TObjectStorageExternalSource>(hostnamePatterns, pathsLimit);

IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns, NActors::TActorSystem* actorSystem, size_t pathsLimit) {
return MakeIntrusive<TObjectStorageExternalSource>(hostnamePatterns, actorSystem, pathsLimit);
}

NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/external_sources/object_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

namespace NKikimr::NExternalSource {

IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns, size_t pathsLimit);
IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns, NActors::TActorSystem* actorSystem, size_t pathsLimit);

NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit);

Expand Down
6 changes: 3 additions & 3 deletions ydb/core/external_sources/object_storage_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,22 @@ namespace NKikimr {

Y_UNIT_TEST_SUITE(ObjectStorageTest) {
Y_UNIT_TEST(SuccessValidation) {
auto source = NExternalSource::CreateObjectStorageExternalSource({}, 1000);
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000);
NKikimrExternalSources::TSchema schema;
NKikimrExternalSources::TGeneral general;
UNIT_ASSERT_NO_EXCEPTION(source->Pack(schema, general));
}

Y_UNIT_TEST(FailedCreate) {
auto source = NExternalSource::CreateObjectStorageExternalSource({}, 1000);
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000);
NKikimrExternalSources::TSchema schema;
NKikimrExternalSources::TGeneral general;
general.mutable_attributes()->insert({"a", "b"});
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Unknown attribute a");
}

Y_UNIT_TEST(FailedValidation) {
auto source = NExternalSource::CreateObjectStorageExternalSource({}, 1000);
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000);
NKikimrExternalSources::TSchema schema;
NKikimrExternalSources::TGeneral general;
general.mutable_attributes()->insert({"projection.h", "b"});
Expand Down
1 change: 1 addition & 0 deletions ydb/core/grpc_services/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ PEERDIR(
ydb/core/io_formats/ydb_dump
ydb/core/kesus/tablet
ydb/core/kqp/common
ydb/core/kqp/session_actor
ydb/core/protos
ydb/core/scheme
ydb/core/sys_view
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ PEERDIR(
ydb/services/metadata/abstract
ydb/services/metadata/secret
ydb/core/kqp/gateway/actors
ydb/core/kqp/federated_query
ydb/core/kqp/gateway/utils
ydb/core/kqp/gateway/behaviour/tablestore/operations
)
Expand Down
Loading
Loading