Skip to content

Commit

Permalink
Merge dff52e8 into 0bce610
Browse files Browse the repository at this point in the history
  • Loading branch information
EgorkaZ authored May 29, 2024
2 parents 0bce610 + dff52e8 commit 9d18c74
Show file tree
Hide file tree
Showing 23 changed files with 785 additions and 50 deletions.
8 changes: 8 additions & 0 deletions ydb/core/external_sources/external_data_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ struct TExternalDataSource : public IExternalSource {
ValidateHostname(HostnamePatterns, proto.GetLocation());
}

virtual NThreading::TFuture<std::shared_ptr<TMetadata>> LoadDynamicMetadata(std::shared_ptr<TMetadata> meta) override {
return NThreading::MakeFuture(std::move(meta));
}

virtual bool CanLoadDynamicMetadata() const override {
return false;
}

private:
const TString Name;
const TVector<TString> AuthMethods;
Expand Down
78 changes: 78 additions & 0 deletions ydb/core/external_sources/external_source.h
Original file line number Diff line number Diff line change
@@ -1,16 +1,83 @@
#pragma once

#include <library/cpp/threading/future/core/future.h>
#include <util/generic/map.h>
#include <util/generic/string.h>

#include <ydb/core/protos/external_sources.pb.h>
#include <ydb/library/actors/core/actorsystem.h>
#include <ydb/library/yql/public/issue/yql_issue.h>

namespace NKikimr::NExternalSource {

struct TExternalSourceException: public yexception {
};

namespace NAuth {

struct TNone {
static constexpr std::string_view Method = "NONE";
};

struct TAws {
static constexpr std::string_view Method = "AWS";

TAws(const TString& accessKey, const TString& secretAccessKey, const TString& region)
: AccessKey{accessKey}
, SecretAccessKey{secretAccessKey}
, Region{region}
{}

TString AccessKey;
TString SecretAccessKey;
TString Region;
};

struct TServiceAccount {
static constexpr std::string_view Method = "SERVICE_ACCOUNT";

TServiceAccount(TString serviceAccountId, TString serviceAccountIdSignature)
: ServiceAccountId{std::move(serviceAccountId)}
, ServiceAccountIdSignature{std::move(serviceAccountIdSignature)}
{}

TString ServiceAccountId;
TString ServiceAccountIdSignature;
};

using TAuth = std::variant<TNone, TServiceAccount, TAws>;

std::string_view GetMethod(const TAuth& auth);

inline TAuth MakeNone() {
return TAuth{std::in_place_type_t<TNone>{}};
}

inline TAuth MakeServiceAccount(const TString& serviceAccountId, const TString& serviceAccountIdSignature) {
return TAuth{std::in_place_type_t<TServiceAccount>{}, serviceAccountId, serviceAccountIdSignature};
}

inline TAuth MakeAws(const TString& accessKey, const TString& secretAccessKey, const TString& region) {
return TAuth{std::in_place_type_t<TAws>{}, accessKey, secretAccessKey, region};
}
}

using TAuth = NAuth::TAuth;

struct TMetadata {
bool Changed = false;
TString TableLocation;
TString DataSourceLocation;
TString DataSourcePath;
TString Type;

THashMap<TString, TString> Attributes;

TAuth Auth;

NKikimrExternalSources::TSchema Schema;
};

struct IExternalSource : public TThrRefBase {
using TPtr = TIntrusivePtr<IExternalSource>;

Expand Down Expand Up @@ -54,6 +121,17 @@ struct IExternalSource : public TThrRefBase {
If an error occurs, an exception is thrown.
*/
virtual void ValidateExternalDataSource(const TString& externalDataSourceDescription) const = 0;

/*
Retrieve additional metadata from runtime data, enrich provided metadata
*/
virtual NThreading::TFuture<std::shared_ptr<TMetadata>> LoadDynamicMetadata(std::shared_ptr<TMetadata> meta) = 0;

/*
A method that should tell whether there is an implementation
of the previous method.
*/
virtual bool CanLoadDynamicMetadata() const = 0;
};

}
4 changes: 2 additions & 2 deletions ydb/core/external_sources/external_source_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ struct TExternalSourceFactory : public IExternalSourceFactory {

}

IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TString>& hostnamePatterns, size_t pathsLimit) {
IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TString>& hostnamePatterns, NActors::TActorSystem* actorSystem, size_t pathsLimit) {
std::vector<TRegExMatch> hostnamePatternsRegEx(hostnamePatterns.begin(), hostnamePatterns.end());
return MakeIntrusive<TExternalSourceFactory>(TMap<TString, IExternalSource::TPtr>{
{
ToString(NYql::EDatabaseType::ObjectStorage),
CreateObjectStorageExternalSource(hostnamePatternsRegEx, pathsLimit)
CreateObjectStorageExternalSource(hostnamePatternsRegEx, actorSystem, pathsLimit)
},
{
ToString(NYql::EDatabaseType::ClickHouse),
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/external_sources/external_source_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ struct IExternalSourceFactory : public TThrRefBase {
virtual IExternalSource::TPtr GetOrCreate(const TString& type) const = 0;
};

IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TString>& hostnamePatterns, size_t pathsLimit = 50000);
IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TString>& hostnamePatterns, NActors::TActorSystem* actorSystem = nullptr, size_t pathsLimit = 50000);

}
26 changes: 23 additions & 3 deletions ydb/core/external_sources/object_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@
#include "object_storage.h"
#include "validation_functions.h"

#include <ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h>
#include <ydb/core/protos/external_sources.pb.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
#include <ydb/library/yql/providers/s3/credentials/credentials.h>
#include <ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h>
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
#include <ydb/public/sdk/cpp/client/ydb_value/value.h>
Expand All @@ -20,9 +23,10 @@ namespace NKikimr::NExternalSource {
namespace {

struct TObjectStorageExternalSource : public IExternalSource {
explicit TObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns, size_t pathsLimit)
explicit TObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns, NActors::TActorSystem* actorSystem, size_t pathsLimit)
: HostnamePatterns(hostnamePatterns)
, PathsLimit(pathsLimit)
, ActorSystem(actorSystem)
{}

virtual TString Pack(const NKikimrExternalSources::TSchema& schema,
Expand Down Expand Up @@ -255,6 +259,20 @@ struct TObjectStorageExternalSource : public IExternalSource {
return issues;
}

struct TMetadataResult : NYql::NCommon::TOperationResult {
std::shared_ptr<TMetadata> Metadata;
};

virtual NThreading::TFuture<std::shared_ptr<TMetadata>> LoadDynamicMetadata(std::shared_ptr<TMetadata> meta) override {
Y_UNUSED(ActorSystem);
// TODO: implement
return NThreading::MakeFuture(std::move(meta));
}

virtual bool CanLoadDynamicMetadata() const override {
return false;
}

private:
static bool IsValidIntervalUnit(const TString& unit) {
static constexpr std::array<std::string_view, 7> IntervalUnits = {
Expand Down Expand Up @@ -474,12 +492,14 @@ struct TObjectStorageExternalSource : public IExternalSource {
private:
const std::vector<TRegExMatch> HostnamePatterns;
const size_t PathsLimit;
NActors::TActorSystem* ActorSystem = nullptr;
};

}

IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns, size_t pathsLimit) {
return MakeIntrusive<TObjectStorageExternalSource>(hostnamePatterns, pathsLimit);

IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns, NActors::TActorSystem* actorSystem, size_t pathsLimit) {
return MakeIntrusive<TObjectStorageExternalSource>(hostnamePatterns, actorSystem, pathsLimit);
}

NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/external_sources/object_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

namespace NKikimr::NExternalSource {

IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns, size_t pathsLimit);
IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns, NActors::TActorSystem* actorSystem, size_t pathsLimit);

NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit);

Expand Down
6 changes: 3 additions & 3 deletions ydb/core/external_sources/object_storage_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,22 @@ namespace NKikimr {

Y_UNIT_TEST_SUITE(ObjectStorageTest) {
Y_UNIT_TEST(SuccessValidation) {
auto source = NExternalSource::CreateObjectStorageExternalSource({}, 1000);
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000);
NKikimrExternalSources::TSchema schema;
NKikimrExternalSources::TGeneral general;
UNIT_ASSERT_NO_EXCEPTION(source->Pack(schema, general));
}

Y_UNIT_TEST(FailedCreate) {
auto source = NExternalSource::CreateObjectStorageExternalSource({}, 1000);
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000);
NKikimrExternalSources::TSchema schema;
NKikimrExternalSources::TGeneral general;
general.mutable_attributes()->insert({"a", "b"});
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Unknown attribute a");
}

Y_UNIT_TEST(FailedValidation) {
auto source = NExternalSource::CreateObjectStorageExternalSource({}, 1000);
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000);
NKikimrExternalSources::TSchema schema;
NKikimrExternalSources::TGeneral general;
general.mutable_attributes()->insert({"projection.h", "b"});
Expand Down
1 change: 1 addition & 0 deletions ydb/core/grpc_services/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ PEERDIR(
ydb/core/io_formats/ydb_dump
ydb/core/kesus/tablet
ydb/core/kqp/common
ydb/core/kqp/session_actor
ydb/core/protos
ydb/core/scheme
ydb/core/sys_view
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ PEERDIR(
ydb/services/metadata/abstract
ydb/services/metadata/secret
ydb/core/kqp/gateway/actors
ydb/core/kqp/federated_query
ydb/core/kqp/gateway/utils
ydb/core/kqp/gateway/behaviour/tablestore/operations
)
Expand Down
Loading

0 comments on commit 9d18c74

Please sign in to comment.