Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hive metastore client has been added #4593

Merged
merged 9 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/external_sources/hive_metastore/events.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#include "events.h"
86 changes: 86 additions & 0 deletions ydb/core/external_sources/hive_metastore/events.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
#pragma once

#include <ydb/core/external_sources/hive_metastore/hive_metastore_native/gen-cpp/ThriftHiveMetastore.h>
#include <ydb/library/actors/core/event_local.h>
#include <ydb/library/actors/core/events.h>
#include <ydb/library/yql/providers/generic/connector/api/service/protos/connector.pb.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
#include <ydb/public/api/protos/ydb_value.pb.h>

#include <library/cpp/threading/future/core/future.h>

namespace NKikimr::NExternalSource {

// Groups the event ids, events and plain payload types used to talk to the Hive Metastore.
//
// Two families of events are declared here:
//  * TEvGet*         - requests; each carries the lookup arguments and a promise of the answer type;
//  * TEvHiveGet*Result - results; each carries raw Hive Metastore Thrift objects plus a list of issues.
struct TEvHiveMetastore {
    // Event ids, allocated from the ES_PRIVATE event space.
    enum EEv : ui32 {
        // Ids of the request events.
        EvGetTable = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
        EvGetStatistics,
        EvGetPartitions,

        // Ids of the result events.
        EvHiveGetTableResult,
        EvHiveGetTableStatisticsResult,
        EvHiveGetPartitionsResult,

        // Sentinel: one past the last id; checked against the end of the event space below.
        EvEnd
    };

    static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)");

    // Table description handed back through TEvGetTable::Promise.
    struct TTable {
        std::vector<Ydb::Column> Columns;
        TString Location;
        TString Format;
        TString Compression;
        std::vector<TString> PartitionedBy; // presumably the names of the partitioning columns - TODO confirm
    };

    // Request for a table description; the answer is delivered through Promise.
    struct TEvGetTable: NActors::TEventLocal<TEvGetTable, EvGetTable> {
        TString DatbaseName; // (sic) misspelled field name is part of the interface, kept for compatibility
        TString TableName;
        NThreading::TPromise<TTable> Promise;
    };

    // Table statistics handed back through TEvGetStatistics::Promise.
    // Both values are optional (TMaybe), so either may be absent.
    struct TStatistics {
        TMaybe<int64_t> Rows;
        TMaybe<int64_t> Size; // unit is not visible here; presumably bytes - TODO confirm
    };

    // Request for table statistics over the listed columns; the answer is delivered through Promise.
    struct TEvGetStatistics: NActors::TEventLocal<TEvGetStatistics, EvGetStatistics> {
        TString DatbaseName; // (sic) misspelled field name is part of the interface, kept for compatibility
        TString TableName;
        std::vector<std::string> Columns;
        NThreading::TPromise<TStatistics> Promise;
    };

    // Partition list handed back through TEvGetPartitions::Promise.
    struct TPartitions {
        // A single partition: where it lives and its partition values.
        struct TPartition {
            TString Location;
            std::vector<TString> Values;
        };
        std::vector<TPartition> Partitions;
    };

    // Request for the partitions of a table, narrowed by Predicate; the answer is delivered through Promise.
    struct TEvGetPartitions: NActors::TEventLocal<TEvGetPartitions, EvGetPartitions> {
        TString DatbaseName; // (sic) misspelled field name is part of the interface, kept for compatibility
        TString TableName;
        NYql::NConnector::NApi::TPredicate Predicate;
        NThreading::TPromise<TPartitions> Promise;
    };

    // Result event: the Thrift table object together with any issues.
    struct TEvHiveGetTableResult: NActors::TEventLocal<TEvHiveGetTableResult, EvHiveGetTableResult> {
        Apache::Hadoop::Hive::Table Table;
        NYql::TIssues Issues;
    };

    // Result event: the Thrift table statistics object together with any issues.
    struct TEvHiveGetTableStatisticsResult: NActors::TEventLocal<TEvHiveGetTableStatisticsResult, EvHiveGetTableStatisticsResult> {
        Apache::Hadoop::Hive::TableStatsResult Statistics;
        NYql::TIssues Issues;
    };

    // Result event: the Thrift partition objects together with any issues.
    struct TEvHiveGetPartitionsResult: NActors::TEventLocal<TEvHiveGetPartitionsResult, EvHiveGetPartitionsResult> {
        std::vector<Apache::Hadoop::Hive::Partition> Partitions;
        NYql::TIssues Issues;
    };
};

}
130 changes: 130 additions & 0 deletions ydb/core/external_sources/hive_metastore/hive_metastore_client.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
#include "hive_metastore_client.h"

namespace NKikimr::NExternalSource {

// Builds the Thrift stack for host:port (socket -> buffered transport -> binary protocol ->
// metastore client), opens the transport and starts the worker pool.
// Anything thrown by Transport->open() propagates out of the constructor.
THiveMetastoreClient::THiveMetastoreClient(const TString& host, int32_t port)
    : Socket(new apache::thrift::transport::TSocket(host, port))
    , Transport(new apache::thrift::transport::TBufferedTransport(Socket))
    , Protocol(new apache::thrift::protocol::TBinaryProtocol(Transport))
    , Client(new Apache::Hadoop::Hive::ThriftHiveMetastoreClient(Protocol))
{
    // Exactly one worker thread is essential: every operation is queued onto this pool
    // and uses the one shared Client/Transport, so a single worker makes them run one at a time.
    constexpr size_t workerThreadCount = 1;

    Transport->open();
    ThreadPool.Start(workerThreadCount);
}

// Queues `function` onto the worker pool and returns a future for its outcome.
//
// * On success the future receives the value returned by `function`
//   (or is simply signalled when TResultValue is void).
// * Any exception thrown by `function` is stored in the future.
// * A Thrift TTransportException additionally triggers a reconnect attempt
//   (close + open on the shared transport) before the failure is reported.
//
// The task must be accepted by the pool; this is enforced with Y_ABORT_UNLESS.
template<typename TResultValue>
NThreading::TFuture<TResultValue> THiveMetastoreClient::RunOperation(const std::function<TResultValue()>& function) {
    NThreading::TPromise<TResultValue> promise = NThreading::NewPromise<TResultValue>();
    Y_ABORT_UNLESS(ThreadPool.Add(MakeThrFuncObj([promise, function, transport=Transport]() mutable {
        try {
            if constexpr (std::is_void_v<TResultValue>) {
                function();
                promise.SetValue();
            } else {
                promise.SetValue(function());
            }
        } catch (const apache::thrift::transport::TTransportException&) {
            // Remember the original failure first: the reconnect below may throw as well.
            const std::exception_ptr failure = std::current_exception();
            try {
                transport->close();
                transport->open();
            } catch (...) {
                // The reconnect attempt failed too. This must not escape the pool task and must not
                // leave the promise unset (the caller's future would never complete), so it is
                // swallowed here. Presumably the next operation hits a transport error again and
                // retries the reconnect - TODO confirm.
            }
            promise.SetException(failure);
        } catch (...) {
            promise.SetException(std::current_exception());
        }
    })));
    return promise.GetFuture();
}

// Asynchronously issues create_database for `database`.
// The call is queued through RunOperation; the future is signalled on success
// and carries the thrown exception on failure.
NThreading::TFuture<void> THiveMetastoreClient::CreateDatabase(const Apache::Hadoop::Hive::Database& database) {
    auto done = RunOperation<void>([metastore = Client, database]() -> void {
        metastore->create_database(database);
    });
    return done;
}

// Asynchronously issues get_database for `name`.
// The call is queued through RunOperation; the future yields the filled-in Database object
// or carries the thrown exception on failure.
NThreading::TFuture<Apache::Hadoop::Hive::Database> THiveMetastoreClient::GetDatabase(const TString& name) {
    using TResult = Apache::Hadoop::Hive::Database;

    return RunOperation<TResult>([metastore = Client, name]() -> TResult {
        TResult found;
        metastore->get_database(found, name);
        return found;
    });
}

// Asynchronously issues create_table for `table`.
// The call is queued through RunOperation; the future is signalled on success
// and carries the thrown exception on failure.
NThreading::TFuture<void> THiveMetastoreClient::CreateTable(const Apache::Hadoop::Hive::Table& table) {
    auto done = RunOperation<void>([metastore = Client, table]() -> void {
        metastore->create_table(table);
    });
    return done;
}

// Asynchronously issues get_all_databases.
// The call is queued through RunOperation; the future yields the list filled in by the
// metastore client or carries the thrown exception on failure.
NThreading::TFuture<std::vector<std::string>> THiveMetastoreClient::GetAllDatabases() {
    using TResult = std::vector<std::string>;

    return RunOperation<TResult>([metastore = Client]() -> TResult {
        TResult names;
        metastore->get_all_databases(names);
        return names;
    });
}

// Asynchronously issues get_table for `tableName` in `databaseName`.
// The call is queued through RunOperation; the future yields the filled-in Table object
// or carries the thrown exception on failure.
NThreading::TFuture<Apache::Hadoop::Hive::Table> THiveMetastoreClient::GetTable(const TString& databaseName, const TString& tableName) {
    using TResult = Apache::Hadoop::Hive::Table;

    return RunOperation<TResult>([metastore = Client, databaseName, tableName]() -> TResult {
        TResult found;
        metastore->get_table(found, databaseName, tableName);
        return found;
    });
}

// Asynchronously issues get_all_tables for `databaseName`.
// The call is queued through RunOperation; the future yields the list filled in by the
// metastore client or carries the thrown exception on failure.
NThreading::TFuture<std::vector<std::string>> THiveMetastoreClient::GetAllTables(const TString& databaseName) {
    using TResult = std::vector<std::string>;

    return RunOperation<TResult>([metastore = Client, databaseName]() -> TResult {
        TResult names;
        metastore->get_all_tables(names, databaseName);
        return names;
    });
}

// Asynchronously issues update_table_column_statistics with `columnStatistics`.
// The call is queued through RunOperation; the future is signalled on success
// and carries the thrown exception on failure. The value returned by the Thrift call is ignored.
NThreading::TFuture<void> THiveMetastoreClient::UpdateTableColumnStatistics(const Apache::Hadoop::Hive::ColumnStatistics& columnStatistics) {
    auto done = RunOperation<void>([metastore = Client, columnStatistics]() -> void {
        metastore->update_table_column_statistics(columnStatistics);
    });
    return done;
}

// Asynchronously issues get_table_statistics_req with `request`.
// The call is queued through RunOperation; the future yields the filled-in TableStatsResult
// or carries the thrown exception on failure.
NThreading::TFuture<Apache::Hadoop::Hive::TableStatsResult> THiveMetastoreClient::GetTableStatistics(const Apache::Hadoop::Hive::TableStatsRequest& request) {
    using TResult = Apache::Hadoop::Hive::TableStatsResult;

    return RunOperation<TResult>([metastore = Client, request]() -> TResult {
        TResult stats;
        metastore->get_table_statistics_req(stats, request);
        return stats;
    });
}

// Asynchronously issues add_partition for `partition`.
// The call is queued through RunOperation; the future yields the Partition object filled in
// by the metastore client or carries the thrown exception on failure.
NThreading::TFuture<Apache::Hadoop::Hive::Partition> THiveMetastoreClient::AddPartition(const Apache::Hadoop::Hive::Partition& partition) {
    using TResult = Apache::Hadoop::Hive::Partition;

    return RunOperation<TResult>([metastore = Client, partition]() -> TResult {
        TResult added;
        metastore->add_partition(added, partition);
        return added;
    });
}

// Asynchronously issues drop_partition for the partition of `databaseName`.`tableName`
// identified by `partitionValues`; `deleteData` is forwarded to the Thrift call unchanged.
// The call is queued through RunOperation; the future is signalled on success and carries
// the thrown exception on failure. The value returned by the Thrift call is ignored.
NThreading::TFuture<void> THiveMetastoreClient::DropPartition(const TString& databaseName, const TString& tableName, const std::vector<std::string>& partitionValues, bool deleteData) {
    auto done = RunOperation<void>([metastore = Client, databaseName, tableName, partitionValues, deleteData]() -> void {
        metastore->drop_partition(databaseName, tableName, partitionValues, deleteData);
    });
    return done;
}

// Asynchronously issues get_partitions_by_filter for `databaseName`.`tableName`;
// `filter` and `maxPartitions` are forwarded to the Thrift call unchanged.
// The call is queued through RunOperation; the future yields the partitions filled in by the
// metastore client or carries the thrown exception on failure.
NThreading::TFuture<std::vector<Apache::Hadoop::Hive::Partition>> THiveMetastoreClient::GetPartitionsByFilter(const TString& databaseName, const TString& tableName, const TString& filter, int16_t maxPartitions) {
    using TResult = std::vector<Apache::Hadoop::Hive::Partition>;

    return RunOperation<TResult>([metastore = Client, databaseName, tableName, filter, maxPartitions]() -> TResult {
        TResult matched;
        metastore->get_partitions_by_filter(matched, databaseName, tableName, filter, maxPartitions);
        return matched;
    });
}

// Asynchronously issues get_config_value for `name`, forwarding `defaultValue` to the Thrift call.
// The call is queued through RunOperation; the future yields the string filled in by the
// metastore client or carries the thrown exception on failure.
NThreading::TFuture<std::string> THiveMetastoreClient::GetConfigValue(const std::string& name, const std::string& defaultValue) {
    using TResult = std::string;

    return RunOperation<TResult>([metastore = Client, name, defaultValue]() -> TResult {
        TResult value;
        metastore->get_config_value(value, name, defaultValue);
        return value;
    });
}

// Shuts the client down: stops the worker pool first, then closes the transport.
// The pool goes first so that the transport is not closed underneath an operation
// (presumably Stop() lets queued work finish before returning - TODO confirm).
THiveMetastoreClient::~THiveMetastoreClient() {
    ThreadPool.Stop();
    try {
        Transport->close();
    } catch (...) {
        // close() may throw (e.g. a Thrift TTransportException). A destructor is implicitly
        // noexcept, so letting it escape would terminate the process; there is nothing useful
        // to do with a close failure during teardown, so it is deliberately dropped.
    }
}

}
49 changes: 49 additions & 0 deletions ydb/core/external_sources/hive_metastore/hive_metastore_client.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#pragma once

#include <ydb/core/external_sources/hive_metastore/hive_metastore_native/gen-cpp/ThriftHiveMetastore.h>

#include <contrib/restricted/thrift/thrift/protocol/TBinaryProtocol.h>
#include <contrib/restricted/thrift/thrift/transport/TSocket.h>
#include <contrib/restricted/thrift/thrift/transport/TTransportUtils.h>

#include <library/cpp/threading/future/core/future.h>

#include <util/generic/ptr.h>
#include <util/generic/string.h>
#include <util/thread/pool.h>

#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <vector>

namespace NKikimr::NExternalSource {

// Asynchronous client for the Hive Metastore Thrift service.
//
// Every public operation is queued onto the internal thread pool and returns a future:
// the future receives the result of the underlying Thrift call, or the exception it threw.
struct THiveMetastoreClient : public TThrRefBase {
public:
    // Creates the Thrift connection objects for host:port and opens the transport.
    THiveMetastoreClient(const TString& host, int32_t port);

    // Database operations.
    NThreading::TFuture<void> CreateDatabase(const Apache::Hadoop::Hive::Database& database);
    NThreading::TFuture<Apache::Hadoop::Hive::Database> GetDatabase(const TString& name);
    NThreading::TFuture<std::vector<std::string>> GetAllDatabases();

    // Table operations.
    NThreading::TFuture<void> CreateTable(const Apache::Hadoop::Hive::Table& table);
    NThreading::TFuture<Apache::Hadoop::Hive::Table> GetTable(const TString& databaseName, const TString& tableName);
    NThreading::TFuture<std::vector<std::string>> GetAllTables(const TString& databaseName);

    // Statistics operations.
    NThreading::TFuture<void> UpdateTableColumnStatistics(const Apache::Hadoop::Hive::ColumnStatistics& columnStatistics);
    NThreading::TFuture<Apache::Hadoop::Hive::TableStatsResult> GetTableStatistics(const Apache::Hadoop::Hive::TableStatsRequest& request);

    // Partition operations.
    // deleteData and maxPartitions are forwarded to the Thrift calls unchanged
    // (maxPartitions = -1 presumably means "no limit" - TODO confirm against the metastore API).
    NThreading::TFuture<Apache::Hadoop::Hive::Partition> AddPartition(const Apache::Hadoop::Hive::Partition& partition);
    NThreading::TFuture<void> DropPartition(const TString& databaseName, const TString& tableName, const std::vector<std::string>& partitionValues, bool deleteData = false);
    NThreading::TFuture<std::vector<Apache::Hadoop::Hive::Partition>> GetPartitionsByFilter(const TString& databaseName, const TString& tableName, const TString& filter, int16_t maxPartitions = -1);

    // Reads a metastore configuration value; defaultValue is forwarded to the Thrift call.
    NThreading::TFuture<std::string> GetConfigValue(const std::string& name, const std::string& defaultValue = {});

    // Stops the thread pool and closes the transport.
    ~THiveMetastoreClient();

private:
    // Queues `function` onto ThreadPool and returns a future for its result or exception.
    template<typename TResultValue>
    NThreading::TFuture<TResultValue> RunOperation(const std::function<TResultValue()>& function);

private:
    // Thrift connection stack; each layer wraps the one declared before it.
    // TTransport is named through its own namespace (apache::thrift::transport) rather than
    // through the using-declaration that happens to be visible in apache::thrift::protocol.
    std::shared_ptr<apache::thrift::transport::TTransport> Socket;
    std::shared_ptr<apache::thrift::transport::TTransport> Transport;
    std::shared_ptr<apache::thrift::protocol::TProtocol> Protocol;
    std::shared_ptr<Apache::Hadoop::Hive::ThriftHiveMetastoreClient> Client;
    // Declared last, so it is destroyed before the connection objects above.
    TThreadPool ThreadPool;
};

}
Loading
Loading