Skip to content

Commit

Permalink
metastore fetcher has been added
Browse files Browse the repository at this point in the history
  • Loading branch information
dorooleg committed May 20, 2024
1 parent 66d99f4 commit cf54814
Show file tree
Hide file tree
Showing 19 changed files with 1,236 additions and 65 deletions.
1 change: 1 addition & 0 deletions ydb/core/external_sources/hive_metastore/events.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#include "events.h"
86 changes: 86 additions & 0 deletions ydb/core/external_sources/hive_metastore/events.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
#pragma once

#include <ydb/core/external_sources/hive_metastore/hive_metastore_native/gen-cpp/ThriftHiveMetastore.h>
#include <ydb/library/actors/core/event_local.h>
#include <ydb/library/actors/core/events.h>
#include <ydb/library/yql/providers/generic/connector/api/service/protos/connector.pb.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
#include <ydb/public/api/protos/ydb_value.pb.h>

#include <library/cpp/threading/future/core/future.h>

namespace NKikimr::NExternalSource {

// Groups the event ids, request/result events and plain payload structs used
// for Hive Metastore interaction. All events are local (TEventLocal) events.
struct TEvHiveMetastore {
    // Event ids.
    // Ids are allocated sequentially starting at the beginning of the
    // ES_PRIVATE event space; the declaration order therefore fixes the values.
    enum EEv : ui32 {
        EvGetTable = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
        EvGetStatistics,
        EvGetPartitions,

        EvHiveGetTableResult,
        EvHiveGetTableStatisticsResult,
        EvHiveGetPartitionsResult,
        EvEnd
    };

    // Guards against the id range running past the end of the ES_PRIVATE space.
    static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)");

    // Table description delivered through TEvGetTable::Promise.
    struct TTable {
        std::vector<Ydb::Column> Columns;
        TString Location;
        TString Format;
        TString Compression;
        std::vector<TString> PartitionedBy; // presumably partitioning column names -- TODO confirm
    };

    // Request for a table description; the answer is delivered via Promise.
    // NOTE(review): `DatbaseName` looks like a typo of `DatabaseName`. It is a
    // public field, so it is left unchanged here to avoid breaking its users.
    struct TEvGetTable: NActors::TEventLocal<TEvGetTable, EvGetTable> {
        TString DatbaseName;
        TString TableName;
        NThreading::TPromise<TTable> Promise;
    };

    // Table statistics delivered through TEvGetStatistics::Promise.
    // Both values are optional (TMaybe).
    struct TStatistics {
        TMaybe<int64_t> Rows;
        TMaybe<int64_t> Size; // unit is not visible here -- TODO confirm (bytes?)
    };

    // Request for table statistics; the answer is delivered via Promise.
    // NOTE(review): `DatbaseName` -- same field-name typo as in TEvGetTable.
    struct TEvGetStatistics: NActors::TEventLocal<TEvGetStatistics, EvGetStatistics> {
        TString DatbaseName;
        TString TableName;
        std::vector<std::string> Columns; // presumably the columns to collect statistics for -- TODO confirm
        NThreading::TPromise<TStatistics> Promise;
    };

    // Partition list delivered through TEvGetPartitions::Promise.
    struct TPartitions {
        struct TPartition {
            TString Location;
            std::vector<TString> Values;
        };
        std::vector<TPartition> Partitions;
    };

    // Request for table partitions; the answer is delivered via Promise.
    // NOTE(review): `DatbaseName` -- same field-name typo as in TEvGetTable.
    struct TEvGetPartitions: NActors::TEventLocal<TEvGetPartitions, EvGetPartitions> {
        TString DatbaseName;
        TString TableName;
        NYql::NConnector::NApi::TPredicate Predicate; // presumably used to filter partitions -- TODO confirm
        NThreading::TPromise<TPartitions> Promise;
    };

    // Result events below pair a Hive Metastore (Thrift-generated) response
    // object with a list of issues.

    struct TEvHiveGetTableResult: NActors::TEventLocal<TEvHiveGetTableResult, EvHiveGetTableResult> {
        Apache::Hadoop::Hive::Table Table;
        NYql::TIssues Issues;
    };

    struct TEvHiveGetTableStatisticsResult: NActors::TEventLocal<TEvHiveGetTableStatisticsResult, EvHiveGetTableStatisticsResult> {
        Apache::Hadoop::Hive::TableStatsResult Statistics;
        NYql::TIssues Issues;
    };

    struct TEvHiveGetPartitionsResult: NActors::TEventLocal<TEvHiveGetPartitionsResult, EvHiveGetPartitionsResult> {
        std::vector<Apache::Hadoop::Hive::Partition> Partitions;
        NYql::TIssues Issues;
    };
};

}
41 changes: 19 additions & 22 deletions ydb/core/external_sources/hive_metastore/hive_metastore_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,6 @@

namespace NKikimr::NExternalSource {

namespace {

// Adapts a callable to the IObjectInQueue interface: Process() simply invokes
// the stored callable.
template<typename TFunc>
class TFuncTask : public IObjectInQueue {
private:
    TFunc Func;

public:
    // Moves the callable into the task. `explicit` keeps a callable from being
    // converted to a task implicitly; direct construction is unaffected.
    explicit TFuncTask(TFunc&& func)
        : Func(std::move(func)) {
    }

    // Runs the stored callable; the pointer argument is ignored.
    void Process(void*) override {
        Func();
    }
};

}

THiveMetastoreClient::THiveMetastoreClient(const TString& host, int32_t port)
: Socket(new apache::thrift::transport::TSocket(host, port))
, Transport(new apache::thrift::transport::TBufferedTransport(Socket))
Expand All @@ -34,9 +15,9 @@ THiveMetastoreClient::THiveMetastoreClient(const TString& host, int32_t port)
template<typename TResultValue>
NThreading::TFuture<TResultValue> THiveMetastoreClient::RunOperation(const std::function<TResultValue()>& function) {
NThreading::TPromise<TResultValue> promise = NThreading::NewPromise<TResultValue>();
Y_ABORT_UNLESS(ThreadPool.AddAndOwn(THolder(new TFuncTask([promise, function, transport=Transport]() mutable {
Y_ABORT_UNLESS(ThreadPool.Add(MakeThrFuncObj([promise, function, transport=Transport]() mutable {
try {
if constexpr (std::is_same<TResultValue, void>::value) {
if constexpr (std::is_void_v<TResultValue>) {
function();
promise.SetValue();
} else {
Expand All @@ -49,7 +30,7 @@ NThreading::TFuture<TResultValue> THiveMetastoreClient::RunOperation(const std::
} catch (...) {
promise.SetException(std::current_exception());
}
}))));
})));
return promise.GetFuture();
}

Expand All @@ -73,6 +54,14 @@ NThreading::TFuture<void> THiveMetastoreClient::CreateTable(const Apache::Hadoop
});
}

// Requests the names of all databases known to the metastore.
// The get_all_databases call is submitted through RunOperation, and the
// returned future yields the vector that call filled in.
NThreading::TFuture<std::vector<std::string>> THiveMetastoreClient::GetAllDatabases() {
    using TNames = std::vector<std::string>;

    auto fetchNames = [client=Client]() -> TNames {
        TNames names;
        client->get_all_databases(names);
        return names;
    };
    return RunOperation<TNames>(fetchNames);
}

NThreading::TFuture<Apache::Hadoop::Hive::Table> THiveMetastoreClient::GetTable(const TString& databaseName, const TString& tableName) {
return RunOperation<Apache::Hadoop::Hive::Table>([client=Client, databaseName, tableName]() {
Apache::Hadoop::Hive::Table table;
Expand All @@ -81,6 +70,14 @@ NThreading::TFuture<Apache::Hadoop::Hive::Table> THiveMetastoreClient::GetTable(
});
}

// Requests the names of all tables in the given database.
// The get_all_tables call is submitted through RunOperation, and the returned
// future yields the vector that call filled in. The database name is captured
// by value, so the caller's string need not outlive this call.
NThreading::TFuture<std::vector<std::string>> THiveMetastoreClient::GetAllTables(const TString& databaseName) {
    using TNames = std::vector<std::string>;

    auto fetchNames = [client=Client, databaseName]() -> TNames {
        TNames names;
        client->get_all_tables(names, databaseName);
        return names;
    };
    return RunOperation<TNames>(fetchNames);
}

NThreading::TFuture<void> THiveMetastoreClient::UpdateTableColumnStatistics(const Apache::Hadoop::Hive::ColumnStatistics& columnStatistics) {
return RunOperation<void>([client=Client, columnStatistics]() {
client->update_table_column_statistics(columnStatistics);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ struct THiveMetastoreClient : public TThrRefBase {

NThreading::TFuture<void> CreateDatabase(const Apache::Hadoop::Hive::Database& database);
NThreading::TFuture<Apache::Hadoop::Hive::Database> GetDatabase(const TString& name);
NThreading::TFuture<std::vector<std::string>> GetAllDatabases();

NThreading::TFuture<void> CreateTable(const Apache::Hadoop::Hive::Table& table);
NThreading::TFuture<Apache::Hadoop::Hive::Table> GetTable(const TString& databaseName, const TString& tableName);
NThreading::TFuture<std::vector<std::string>> GetAllTables(const TString& databaseName);

NThreading::TFuture<void> UpdateTableColumnStatistics(const Apache::Hadoop::Hive::ColumnStatistics& columnStatistics);
NThreading::TFuture<Apache::Hadoop::Hive::TableStatsResult> GetTableStatistics(const Apache::Hadoop::Hive::TableStatsRequest& request);
Expand All @@ -44,4 +46,4 @@ struct THiveMetastoreClient : public TThrRefBase {
TThreadPool ThreadPool;
};

}
}
Loading

0 comments on commit cf54814

Please sign in to comment.