-
Notifications
You must be signed in to change notification settings - Fork 606
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
36 changed files
with
249,810 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
#include "events.h" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
#pragma once | ||
|
||
#include <ydb/core/external_sources/hive_metastore/hive_metastore_native/gen-cpp/ThriftHiveMetastore.h> | ||
#include <ydb/library/actors/core/event_local.h> | ||
#include <ydb/library/actors/core/events.h> | ||
#include <ydb/library/yql/providers/generic/connector/api/service/protos/connector.pb.h> | ||
#include <ydb/library/yql/public/issue/yql_issue.h> | ||
#include <ydb/public/api/protos/ydb_value.pb.h> | ||
|
||
#include <library/cpp/threading/future/core/future.h> | ||
|
||
namespace NKikimr::NExternalSource { | ||
|
||
struct TEvHiveMetastore { | ||
// Event ids. | ||
enum EEv : ui32 { | ||
EvGetTable = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), | ||
EvGetStatistics, | ||
EvGetPartitions, | ||
|
||
EvHiveGetTableResult, | ||
EvHiveGetTableStatisticsResult, | ||
EvHiveGetPartitionsResult, | ||
EvEnd | ||
}; | ||
|
||
static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents:ES_PRIVATE)"); | ||
|
||
struct TTable { | ||
std::vector<Ydb::Column> Columns; | ||
TString Location; | ||
TString Format; | ||
TString Compression; | ||
std::vector<TString> PartitionedBy; | ||
}; | ||
|
||
struct TEvGetTable: NActors::TEventLocal<TEvGetTable, EvGetTable> { | ||
TString DatbaseName; | ||
TString TableName; | ||
NThreading::TPromise<TTable> Promise; | ||
}; | ||
|
||
struct TStatistics { | ||
TMaybe<int64_t> Rows; | ||
TMaybe<int64_t> Size; | ||
}; | ||
|
||
struct TEvGetStatistics: NActors::TEventLocal<TEvGetStatistics, EvGetStatistics> { | ||
TString DatbaseName; | ||
TString TableName; | ||
std::vector<std::string> Columns; | ||
NThreading::TPromise<TStatistics> Promise; | ||
}; | ||
|
||
struct TPartitions { | ||
struct TPartition { | ||
TString Location; | ||
std::vector<TString> Values; | ||
}; | ||
std::vector<TPartition> Partitions; | ||
}; | ||
|
||
struct TEvGetPartitions: NActors::TEventLocal<TEvGetPartitions, EvGetPartitions> { | ||
TString DatbaseName; | ||
TString TableName; | ||
NYql::NConnector::NApi::TPredicate Predicate; | ||
NThreading::TPromise<TPartitions> Promise; | ||
}; | ||
|
||
struct TEvHiveGetTableResult: NActors::TEventLocal<TEvHiveGetTableResult, EvHiveGetTableResult> { | ||
Apache::Hadoop::Hive::Table Table; | ||
NYql::TIssues Issues; | ||
}; | ||
|
||
struct TEvHiveGetTableStatisticsResult: NActors::TEventLocal<TEvHiveGetTableStatisticsResult, EvHiveGetTableStatisticsResult> { | ||
Apache::Hadoop::Hive::TableStatsResult Statistics; | ||
NYql::TIssues Issues; | ||
}; | ||
|
||
struct TEvHiveGetPartitionsResult: NActors::TEventLocal<TEvHiveGetPartitionsResult, EvHiveGetPartitionsResult> { | ||
std::vector<Apache::Hadoop::Hive::Partition> Partitions; | ||
NYql::TIssues Issues; | ||
}; | ||
}; | ||
|
||
} |
130 changes: 130 additions & 0 deletions
130
ydb/core/external_sources/hive_metastore/hive_metastore_client.cpp
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,130 @@ | ||
#include "hive_metastore_client.h" | ||
|
||
namespace NKikimr::NExternalSource { | ||
|
||
THiveMetastoreClient::THiveMetastoreClient(const TString& host, int32_t port) | ||
: Socket(new apache::thrift::transport::TSocket(host, port)) | ||
, Transport(new apache::thrift::transport::TBufferedTransport(Socket)) | ||
, Protocol(new apache::thrift::protocol::TBinaryProtocol(Transport)) | ||
, Client(new Apache::Hadoop::Hive::ThriftHiveMetastoreClient(Protocol)) { | ||
Transport->open(); | ||
// It's very important to keep here the only 1 thread | ||
ThreadPool.Start(1); | ||
} | ||
|
||
template<typename TResultValue> | ||
NThreading::TFuture<TResultValue> THiveMetastoreClient::RunOperation(const std::function<TResultValue()>& function) { | ||
NThreading::TPromise<TResultValue> promise = NThreading::NewPromise<TResultValue>(); | ||
Y_ABORT_UNLESS(ThreadPool.Add(MakeThrFuncObj([promise, function, transport=Transport]() mutable { | ||
try { | ||
if constexpr (std::is_void_v<TResultValue>) { | ||
function(); | ||
promise.SetValue(); | ||
} else { | ||
promise.SetValue(function()); | ||
} | ||
} catch (const apache::thrift::transport::TTransportException&) { | ||
transport->close(); | ||
transport->open(); | ||
promise.SetException(std::current_exception()); | ||
} catch (...) { | ||
promise.SetException(std::current_exception()); | ||
} | ||
}))); | ||
return promise.GetFuture(); | ||
} | ||
|
||
NThreading::TFuture<void> THiveMetastoreClient::CreateDatabase(const Apache::Hadoop::Hive::Database& database) { | ||
return RunOperation<void>([client=Client, database]() { | ||
client->create_database(database); | ||
}); | ||
} | ||
|
||
NThreading::TFuture<Apache::Hadoop::Hive::Database> THiveMetastoreClient::GetDatabase(const TString& name) { | ||
return RunOperation<Apache::Hadoop::Hive::Database>([client=Client, name]() { | ||
Apache::Hadoop::Hive::Database database; | ||
client->get_database(database, name); | ||
return database; | ||
}); | ||
} | ||
|
||
NThreading::TFuture<void> THiveMetastoreClient::CreateTable(const Apache::Hadoop::Hive::Table& table) { | ||
return RunOperation<void>([client=Client, table]() { | ||
client->create_table(table); | ||
}); | ||
} | ||
|
||
NThreading::TFuture<std::vector<std::string>> THiveMetastoreClient::GetAllDatabases() { | ||
return RunOperation<std::vector<std::string>>([client=Client]() { | ||
std::vector<std::string> databases; | ||
client->get_all_databases(databases); | ||
return databases; | ||
}); | ||
} | ||
|
||
NThreading::TFuture<Apache::Hadoop::Hive::Table> THiveMetastoreClient::GetTable(const TString& databaseName, const TString& tableName) { | ||
return RunOperation<Apache::Hadoop::Hive::Table>([client=Client, databaseName, tableName]() { | ||
Apache::Hadoop::Hive::Table table; | ||
client->get_table(table, databaseName, tableName); | ||
return table; | ||
}); | ||
} | ||
|
||
NThreading::TFuture<std::vector<std::string>> THiveMetastoreClient::GetAllTables(const TString& databaseName) { | ||
return RunOperation<std::vector<std::string>>([client=Client, databaseName]() { | ||
std::vector<std::string> tables; | ||
client->get_all_tables(tables, databaseName); | ||
return tables; | ||
}); | ||
} | ||
|
||
NThreading::TFuture<void> THiveMetastoreClient::UpdateTableColumnStatistics(const Apache::Hadoop::Hive::ColumnStatistics& columnStatistics) { | ||
return RunOperation<void>([client=Client, columnStatistics]() { | ||
client->update_table_column_statistics(columnStatistics); | ||
}); | ||
} | ||
|
||
NThreading::TFuture<Apache::Hadoop::Hive::TableStatsResult> THiveMetastoreClient::GetTableStatistics(const Apache::Hadoop::Hive::TableStatsRequest& request) { | ||
return RunOperation<Apache::Hadoop::Hive::TableStatsResult>([client=Client, request]() { | ||
Apache::Hadoop::Hive::TableStatsResult result; | ||
client->get_table_statistics_req(result, request); | ||
return result; | ||
}); | ||
} | ||
|
||
NThreading::TFuture<Apache::Hadoop::Hive::Partition> THiveMetastoreClient::AddPartition(const Apache::Hadoop::Hive::Partition& partition) { | ||
return RunOperation<Apache::Hadoop::Hive::Partition>([client=Client, partition]() { | ||
Apache::Hadoop::Hive::Partition result; | ||
client->add_partition(result, partition); | ||
return result; | ||
}); | ||
} | ||
|
||
NThreading::TFuture<void> THiveMetastoreClient::DropPartition(const TString& databaseName, const TString& tableName, const std::vector<std::string>& partitionValues, bool deleteData) { | ||
return RunOperation<void>([client=Client, databaseName, tableName, partitionValues, deleteData]() { | ||
client->drop_partition(databaseName, tableName, partitionValues, deleteData); | ||
}); | ||
} | ||
|
||
NThreading::TFuture<std::vector<Apache::Hadoop::Hive::Partition>> THiveMetastoreClient::GetPartitionsByFilter(const TString& databaseName, const TString& tableName, const TString& filter, int16_t maxPartitions) { | ||
return RunOperation<std::vector<Apache::Hadoop::Hive::Partition>>([client=Client, databaseName, tableName, filter, maxPartitions]() { | ||
std::vector<Apache::Hadoop::Hive::Partition> partitions; | ||
client->get_partitions_by_filter(partitions, databaseName, tableName, filter, maxPartitions); | ||
return partitions; | ||
}); | ||
} | ||
|
||
NThreading::TFuture<std::string> THiveMetastoreClient::GetConfigValue(const std::string& name, const std::string& defaultValue) { | ||
return RunOperation<std::string>([client=Client, name, defaultValue]() { | ||
std::string result; | ||
client->get_config_value(result, name, defaultValue); | ||
return result; | ||
}); | ||
} | ||
|
||
THiveMetastoreClient::~THiveMetastoreClient() { | ||
ThreadPool.Stop(); | ||
Transport->close(); | ||
} | ||
|
||
} |
49 changes: 49 additions & 0 deletions
49
ydb/core/external_sources/hive_metastore/hive_metastore_client.h
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
#include <ydb/core/external_sources/hive_metastore/hive_metastore_native/gen-cpp/ThriftHiveMetastore.h> | ||
|
||
#include <contrib/restricted/thrift/thrift/protocol/TBinaryProtocol.h> | ||
#include <contrib/restricted/thrift/thrift/transport/TSocket.h> | ||
#include <contrib/restricted/thrift/thrift/transport/TTransportUtils.h> | ||
|
||
#include <library/cpp/threading/future/core/future.h> | ||
|
||
#include <util/generic/string.h> | ||
#include <util/thread/pool.h> | ||
|
||
namespace NKikimr::NExternalSource { | ||
|
||
struct THiveMetastoreClient : public TThrRefBase { | ||
public: | ||
THiveMetastoreClient(const TString& host, int32_t port); | ||
|
||
NThreading::TFuture<void> CreateDatabase(const Apache::Hadoop::Hive::Database& database); | ||
NThreading::TFuture<Apache::Hadoop::Hive::Database> GetDatabase(const TString& name); | ||
NThreading::TFuture<std::vector<std::string>> GetAllDatabases(); | ||
|
||
NThreading::TFuture<void> CreateTable(const Apache::Hadoop::Hive::Table& table); | ||
NThreading::TFuture<Apache::Hadoop::Hive::Table> GetTable(const TString& databaseName, const TString& tableName); | ||
NThreading::TFuture<std::vector<std::string>> GetAllTables(const TString& databaseName); | ||
|
||
NThreading::TFuture<void> UpdateTableColumnStatistics(const Apache::Hadoop::Hive::ColumnStatistics& columnStatistics); | ||
NThreading::TFuture<Apache::Hadoop::Hive::TableStatsResult> GetTableStatistics(const Apache::Hadoop::Hive::TableStatsRequest& request); | ||
|
||
NThreading::TFuture<Apache::Hadoop::Hive::Partition> AddPartition(const Apache::Hadoop::Hive::Partition& partition); | ||
NThreading::TFuture<void> DropPartition(const TString& databaseName, const TString& tableName, const std::vector<std::string>& partitionValues, bool deleteData = false); | ||
NThreading::TFuture<std::vector<Apache::Hadoop::Hive::Partition>> GetPartitionsByFilter(const TString& databaseName, const TString& tableName, const TString& filter, int16_t maxPartitions = -1); | ||
|
||
NThreading::TFuture<std::string> GetConfigValue(const std::string& name, const std::string& defaultValue = {}); | ||
|
||
~THiveMetastoreClient(); | ||
|
||
private: | ||
template<typename TResultValue> | ||
NThreading::TFuture<TResultValue> RunOperation(const std::function<TResultValue()>& function); | ||
|
||
private: | ||
std::shared_ptr<apache::thrift::protocol::TTransport> Socket; | ||
std::shared_ptr<apache::thrift::protocol::TTransport> Transport; | ||
std::shared_ptr<apache::thrift::protocol::TProtocol> Protocol; | ||
std::shared_ptr<Apache::Hadoop::Hive::ThriftHiveMetastoreClient> Client; | ||
TThreadPool ThreadPool; | ||
}; | ||
|
||
} |
Oops, something went wrong.