From ea2c989b3c532b20ddc7a76b3c39249468d8a3af Mon Sep 17 00:00:00 2001 From: Oleg Doronin Date: Mon, 20 May 2024 21:43:42 +0000 Subject: [PATCH] metastore fetcher has been added --- .../hive_metastore/events.cpp | 1 + .../external_sources/hive_metastore/events.h | 86 +++++ .../hive_metastore/hive_metastore_client.cpp | 41 ++- .../hive_metastore/hive_metastore_client.h | 4 +- .../hive_metastore_converters.cpp | 294 +++++++++++++++++ .../hive_metastore_converters.h | 26 ++ .../hive_metastore/hive_metastore_fetcher.cpp | 208 ++++++++++++ .../hive_metastore/hive_metastore_fetcher.h | 8 + .../hive_metastore/ut/common.cpp | 46 +++ .../hive_metastore/ut/common.h | 9 + .../hive_metastore/ut/docker-compose.yml | 13 + .../ut/hive_metastore_client_ut.cpp | 305 +++++++++++++++--- .../ut/hive_metastore_fetcher_ut.cpp | 157 +++++++++ .../hive_metastore/ut/scripts/bootstrap.bash | 10 + .../ut/scripts/create_table.sql | 59 ++++ .../ut/trino/catalog/datalake.properties | 9 + .../ut/trino/catalog/hive.properties | 9 + .../hive_metastore/ut/ya.make | 10 +- .../external_sources/hive_metastore/ya.make | 6 + 19 files changed, 1236 insertions(+), 65 deletions(-) create mode 100644 ydb/core/external_sources/hive_metastore/events.cpp create mode 100644 ydb/core/external_sources/hive_metastore/events.h create mode 100644 ydb/core/external_sources/hive_metastore/hive_metastore_converters.cpp create mode 100644 ydb/core/external_sources/hive_metastore/hive_metastore_converters.h create mode 100644 ydb/core/external_sources/hive_metastore/hive_metastore_fetcher.cpp create mode 100644 ydb/core/external_sources/hive_metastore/hive_metastore_fetcher.h create mode 100644 ydb/core/external_sources/hive_metastore/ut/common.cpp create mode 100644 ydb/core/external_sources/hive_metastore/ut/common.h create mode 100644 ydb/core/external_sources/hive_metastore/ut/hive_metastore_fetcher_ut.cpp create mode 100644 ydb/core/external_sources/hive_metastore/ut/scripts/bootstrap.bash create mode 100644 ydb/core/external_sources/hive_metastore/ut/scripts/create_table.sql create mode 100644 ydb/core/external_sources/hive_metastore/ut/trino/catalog/datalake.properties create mode 100644 ydb/core/external_sources/hive_metastore/ut/trino/catalog/hive.properties diff --git a/ydb/core/external_sources/hive_metastore/events.cpp b/ydb/core/external_sources/hive_metastore/events.cpp new file mode 100644 index 000000000000..6c3d2603e7e3 --- /dev/null +++ b/ydb/core/external_sources/hive_metastore/events.cpp @@ -0,0 +1 @@ +#include "events.h" diff --git a/ydb/core/external_sources/hive_metastore/events.h b/ydb/core/external_sources/hive_metastore/events.h new file mode 100644 index 000000000000..c7328becbcf4 --- /dev/null +++ b/ydb/core/external_sources/hive_metastore/events.h @@ -0,0 +1,86 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include + +namespace NKikimr::NExternalSource { + +struct TEvHiveMetastore { + // Event ids. + enum EEv : ui32 { + EvGetTable = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), + EvGetStatistics, + EvGetPartitions, + + EvHiveGetTableResult, + EvHiveGetTableStatisticsResult, + EvHiveGetPartitionsResult, + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents:ES_PRIVATE)"); + + struct TTable { + std::vector Columns; + TString Location; + TString Format; + TString Compression; + std::vector PartitionedBy; + }; + + struct TEvGetTable: NActors::TEventLocal { + TString DatbaseName; + TString TableName; + NThreading::TPromise Promise; + }; + + struct TStatistics { + TMaybe Rows; + TMaybe Size; + }; + + struct TEvGetStatistics: NActors::TEventLocal { + TString DatbaseName; + TString TableName; + std::vector Columns; + NThreading::TPromise Promise; + }; + + struct TPartitions { + struct TPartition { + TString Location; + std::vector Values; + }; + std::vector Partitions; + }; + + struct TEvGetPartitions: NActors::TEventLocal { + TString DatbaseName; + TString TableName; + NYql::NConnector::NApi::TPredicate Predicate; + NThreading::TPromise Promise; + }; + + struct TEvHiveGetTableResult: NActors::TEventLocal { + Apache::Hadoop::Hive::Table Table; + NYql::TIssues Issues; + }; + + struct TEvHiveGetTableStatisticsResult: NActors::TEventLocal { + Apache::Hadoop::Hive::TableStatsResult Statistics; + NYql::TIssues Issues; + }; + + struct TEvHiveGetPartitionsResult: NActors::TEventLocal { + std::vector Partitions; + NYql::TIssues Issues; + }; +}; + +} diff --git a/ydb/core/external_sources/hive_metastore/hive_metastore_client.cpp b/ydb/core/external_sources/hive_metastore/hive_metastore_client.cpp index 09847a55328f..ff5348b89c9e 100644 --- a/ydb/core/external_sources/hive_metastore/hive_metastore_client.cpp +++ b/ydb/core/external_sources/hive_metastore/hive_metastore_client.cpp @@ -2,25 +2,6 @@ namespace NKikimr::NExternalSource { -namespace { - -template -class TFuncTask : public IObjectInQueue { -private: - TFunc Func; - -public: - TFuncTask(TFunc&& func) - : Func(std::move(func)) { - } - - void Process(void*) override { - Func(); - } -}; - -} - THiveMetastoreClient::THiveMetastoreClient(const TString& host, int32_t port) : Socket(new apache::thrift::transport::TSocket(host, port)) , Transport(new apache::thrift::transport::TBufferedTransport(Socket)) @@ -34,9 +15,9 @@ THiveMetastoreClient::THiveMetastoreClient(const TString& host, int32_t port) template NThreading::TFuture THiveMetastoreClient::RunOperation(const std::function& function) { NThreading::TPromise promise = NThreading::NewPromise(); - Y_ABORT_UNLESS(ThreadPool.AddAndOwn(THolder(new TFuncTask([promise, function, transport=Transport]() mutable { + Y_ABORT_UNLESS(ThreadPool.Add(MakeThrFuncObj([promise, function, transport=Transport]() mutable { try { - if constexpr (std::is_same::value) { + if constexpr (std::is_void_v) { function(); promise.SetValue(); } else { @@ -49,7 +30,7 @@ NThreading::TFuture THiveMetastoreClient::RunOperation(const std:: } catch (...) { promise.SetException(std::current_exception()); } - })))); + }))); return promise.GetFuture(); } @@ -73,6 +54,14 @@ NThreading::TFuture THiveMetastoreClient::CreateTable(const Apache::Hadoop }); } +NThreading::TFuture> THiveMetastoreClient::GetAllDatabases() { + return RunOperation>([client=Client]() { + std::vector databases; + client->get_all_databases(databases); + return databases; + }); +} + NThreading::TFuture THiveMetastoreClient::GetTable(const TString& databaseName, const TString& tableName) { return RunOperation([client=Client, databaseName, tableName]() { Apache::Hadoop::Hive::Table table; @@ -81,6 +70,14 @@ NThreading::TFuture THiveMetastoreClient::GetTable( }); } +NThreading::TFuture> THiveMetastoreClient::GetAllTables(const TString& databaseName) { + return RunOperation>([client=Client, databaseName]() { + std::vector tables; + client->get_all_tables(tables, databaseName); + return tables; + }); +} + NThreading::TFuture THiveMetastoreClient::UpdateTableColumnStatistics(const Apache::Hadoop::Hive::ColumnStatistics& columnStatistics) { return RunOperation([client=Client, columnStatistics]() { client->update_table_column_statistics(columnStatistics); diff --git a/ydb/core/external_sources/hive_metastore/hive_metastore_client.h b/ydb/core/external_sources/hive_metastore/hive_metastore_client.h index b4fd1bb77ac3..e32e20841d6b 100644 --- a/ydb/core/external_sources/hive_metastore/hive_metastore_client.h +++ b/ydb/core/external_sources/hive_metastore/hive_metastore_client.h @@ -17,9 +17,11 @@ struct THiveMetastoreClient : public TThrRefBase { NThreading::TFuture CreateDatabase(const Apache::Hadoop::Hive::Database& database); NThreading::TFuture GetDatabase(const TString& name); + NThreading::TFuture> GetAllDatabases(); NThreading::TFuture CreateTable(const Apache::Hadoop::Hive::Table& table); NThreading::TFuture GetTable(const TString& databaseName, const TString& tableName); + NThreading::TFuture> GetAllTables(const TString& databaseName); NThreading::TFuture UpdateTableColumnStatistics(const Apache::Hadoop::Hive::ColumnStatistics& columnStatistics); NThreading::TFuture GetTableStatistics(const Apache::Hadoop::Hive::TableStatsRequest& request); @@ -44,4 +46,4 @@ struct THiveMetastoreClient : public TThrRefBase { TThreadPool ThreadPool; }; -} \ No newline at end of file +} diff --git a/ydb/core/external_sources/hive_metastore/hive_metastore_converters.cpp b/ydb/core/external_sources/hive_metastore/hive_metastore_converters.cpp new file mode 100644 index 000000000000..5a656b3a10d4 --- /dev/null +++ b/ydb/core/external_sources/hive_metastore/hive_metastore_converters.cpp @@ -0,0 +1,294 @@ +#include "hive_metastore_converters.h" +#include + +namespace NKikimr::NExternalSource { + +namespace { + +Ydb::Type CreatePrimitiveType(::Ydb::Type_PrimitiveTypeId value) { + Ydb::Type type; + type.set_type_id(value); + return type; +} + +Ydb::Type GetType(const std::string& typeName) { + if (typeName == "boolean") { + return CreatePrimitiveType(Ydb::Type::BOOL); + } + + if (typeName == "int") { + return CreatePrimitiveType(Ydb::Type::INT32); + } + + if (typeName == "long") { + return CreatePrimitiveType(Ydb::Type::INT64); + } + + if (typeName == "float") { + return CreatePrimitiveType(Ydb::Type::FLOAT); + } + + if (typeName == "double") { + return CreatePrimitiveType(Ydb::Type::DOUBLE); + } + + if (typeName == "date") { + return CreatePrimitiveType(Ydb::Type::DATE); + } + + if (typeName == "string") { + return CreatePrimitiveType(Ydb::Type::UTF8); + } + + if (typeName == "timestamp") { + return CreatePrimitiveType(Ydb::Type::TIMESTAMP); + } + + if (typeName == "binary") { + return CreatePrimitiveType(Ydb::Type::STRING); + } + + throw yexception() << "Unsupported type: " << typeName; +} + +} + +TString THiveMetastoreConverters::GetFormat(const Apache::Hadoop::Hive::Table& table) { + if (!table.sd.__isset.inputFormat) { + throw yexception() << "Input format wasn't specified for table " << table.tableName << " in database " << table.dbName; + } + + if (!table.sd.__isset.outputFormat) { + throw yexception() << "Output format wasn't specified for table " << table.tableName << " in database " << table.dbName; + } + + if (table.sd.inputFormat == "org.apache.hadoop.hive.serde2.OpenCSVSerde") { + if (table.sd.outputFormat != "org.apache.hadoop.hive.serde2.OpenCSVSerde") { + throw yexception() << "Invalid combination of input and output formats: " << table.sd.inputFormat << " " << table.sd.outputFormat; + } + return "csv_with_names"; + } + + if (table.sd.inputFormat == "org.apache.hadoop.mapred.FileInputFormat") { + if (table.sd.outputFormat != "org.apache.hadoop.mapred.FileOutputFormat") { + throw yexception() << "Invalid combination of input and output formats: " << table.sd.inputFormat << " " << table.sd.outputFormat; + } + return "raw"; + } + + if (table.sd.inputFormat == "org.apache.hudi.hadoop.HoodieParquetInputFormat") { + if (table.sd.outputFormat != "org.apache.hudi.hadoop.HoodieParquetOutputFormat") { + throw yexception() << "Invalid combination of input and output formats: " << table.sd.inputFormat << " " << table.sd.outputFormat; + } + return "parquet"; + } + + if (table.sd.inputFormat == "org.apache.hive.hcatalog.data.JsonSerDe") { + if (table.sd.outputFormat != "org.apache.hive.hcatalog.data.JsonSerDe") { + throw yexception() << "Invalid combination of input and output formats: " << table.sd.inputFormat << " " << table.sd.outputFormat; + } + return "json_each_row"; + } + + if (table.sd.inputFormat == "org.apache.hadoop.hive.serde2.JsonSerDe") { + if (table.sd.outputFormat != "org.apache.hadoop.hive.serde2.JsonSerDe") { + throw yexception() << "Invalid combination of input and output formats: " << table.sd.inputFormat << " " << table.sd.outputFormat; + } + return "json_each_row"; + } + + if (table.sd.inputFormat == "org.openx.data.jsonserde.JsonSerDe") { + if (table.sd.outputFormat != "org.openx.data.jsonserde.JsonSerDe") { + throw yexception() << "Invalid combination of input and output formats: " << table.sd.inputFormat << " " << table.sd.outputFormat; + } + return "json_each_row"; + } + + if (table.sd.inputFormat == "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat") { + if (table.sd.outputFormat != "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat") { + throw yexception() << "Invalid combination of input and output formats: " << table.sd.inputFormat << " " << table.sd.outputFormat; + } + return "parquet"; + } + + if (table.sd.inputFormat == "org.apache.hadoop.mapred.TextInputFormat") { + if (table.sd.outputFormat != "org.apache.hadoop.mapred.TextOutputFormat") { + throw yexception() << "Invalid combination of input and output formats: " << table.sd.inputFormat << " " << table.sd.outputFormat; + } + return "raw"; + } + + if (table.sd.inputFormat == "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe") { + if (table.sd.outputFormat != "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe") { + throw yexception() << "Invalid combination of input and output formats: " << table.sd.inputFormat << " " << table.sd.outputFormat; + } + return "parquet"; + } + + throw yexception() << "Input format with name " << table.sd.inputFormat << " isn't supported"; +} + +TString THiveMetastoreConverters::GetCompression(const Apache::Hadoop::Hive::Table&) { + return TString{}; +} + +TString THiveMetastoreConverters::GetLocation(const Apache::Hadoop::Hive::Table& table) { + return TString{table.sd.location}; +} + +std::vector THiveMetastoreConverters::GetColumns(const Apache::Hadoop::Hive::Table& table) { + std::vector columns; + TSet columnNames; + for (const auto& hiveColumn: table.sd.cols) { + if (columnNames.contains(hiveColumn.name)) { + continue; + } + columnNames.insert(TString{hiveColumn.name}); + Ydb::Column ydbColumn; + ydbColumn.set_name(TString{hiveColumn.name}); + *ydbColumn.mutable_type() = GetType(hiveColumn.type); + columns.push_back(ydbColumn); + } + for (const auto& hiveColumn: table.partitionKeys) { + if (columnNames.contains(hiveColumn.name)) { + continue; + } + columnNames.insert(TString{hiveColumn.name}); + Ydb::Column ydbColumn; + ydbColumn.set_name(TString{hiveColumn.name}); + *ydbColumn.mutable_type() = GetType(hiveColumn.type); + columns.push_back(ydbColumn); + } + return columns; +} + +std::vector THiveMetastoreConverters::GetPartitionedColumns(const Apache::Hadoop::Hive::Table& table) { + std::vector columns; + TSet columnNames; + for (const auto& hiveColumn: table.partitionKeys) { + if (columnNames.contains(hiveColumn.name)) { + continue; + } + columnNames.insert(TString{hiveColumn.name}); + columns.push_back(TString{hiveColumn.name}); + } + return columns; +} + +TString THiveMetastoreConverters::GetPartitionsFilter(const NYql::NConnector::NApi::TPredicate&) { + return TString{}; // TODO: push down filter +} + +THiveMetastoreConverters::TStatistics THiveMetastoreConverters::GetStatistics(const Apache::Hadoop::Hive::TableStatsResult& statistics) { + if (statistics.tableStats.empty()) { + return TStatistics{}; + } + + int64_t rowsCount = 0; + for (const auto& columnStatistics : statistics.tableStats) { + if (columnStatistics.statsData.__isset.binaryStats) { + auto stats = columnStatistics.statsData.binaryStats; + rowsCount = std::max(rowsCount, stats.numNulls); + continue; + } + if (columnStatistics.statsData.__isset.booleanStats) { + auto stats = columnStatistics.statsData.booleanStats; + rowsCount = std::max(rowsCount, stats.numTrues + stats.numFalses + stats.numNulls); + continue; + } + if (columnStatistics.statsData.__isset.dateStats) { + auto stats = columnStatistics.statsData.dateStats; + rowsCount = std::max(rowsCount, stats.numDVs + stats.numNulls); + continue; + } + if (columnStatistics.statsData.__isset.decimalStats) { + auto stats = columnStatistics.statsData.decimalStats; + rowsCount = std::max(rowsCount, stats.numDVs + stats.numNulls); + continue; + } + if (columnStatistics.statsData.__isset.doubleStats) { + auto stats = columnStatistics.statsData.doubleStats; + rowsCount = std::max(rowsCount, stats.numDVs + stats.numNulls); + continue; + } + if (columnStatistics.statsData.__isset.longStats) { + auto stats = columnStatistics.statsData.longStats; + rowsCount = std::max(rowsCount, stats.numDVs + stats.numNulls); + continue; + } + if (columnStatistics.statsData.__isset.stringStats) { + auto stats = columnStatistics.statsData.stringStats; + rowsCount = std::max(rowsCount, stats.numDVs + stats.numNulls); + continue; + } + if (columnStatistics.statsData.__isset.timestampStats) { + auto stats = columnStatistics.statsData.timestampStats; + rowsCount = std::max(rowsCount, stats.numDVs + stats.numNulls); + continue; + } + } + + int64_t sizeCount = 0; + for (const auto& columnStatistics : statistics.tableStats) { + if (columnStatistics.statsData.__isset.binaryStats) { + auto stats = columnStatistics.statsData.binaryStats; + sizeCount += (rowsCount - stats.numNulls) * stats.avgColLen; + continue; + } + if (columnStatistics.statsData.__isset.booleanStats) { + auto stats = columnStatistics.statsData.booleanStats; + sizeCount += (rowsCount - stats.numNulls); + continue; + } + if (columnStatistics.statsData.__isset.dateStats) { + auto stats = columnStatistics.statsData.dateStats; + sizeCount += (rowsCount - stats.numNulls) * 4; + continue; + } + if (columnStatistics.statsData.__isset.decimalStats) { + auto stats = columnStatistics.statsData.decimalStats; + sizeCount += (rowsCount - stats.numNulls) * 8; + continue; + } + if (columnStatistics.statsData.__isset.doubleStats) { + auto stats = columnStatistics.statsData.doubleStats; + sizeCount += (rowsCount - stats.numNulls) * 8; + continue; + } + if (columnStatistics.statsData.__isset.longStats) { + auto stats = columnStatistics.statsData.longStats; + sizeCount += (rowsCount - stats.numNulls) * 8; + continue; + } + if (columnStatistics.statsData.__isset.stringStats) { + auto stats = columnStatistics.statsData.stringStats; + sizeCount += (rowsCount - stats.numNulls) * stats.avgColLen; + continue; + } + if (columnStatistics.statsData.__isset.timestampStats) { + auto stats = columnStatistics.statsData.timestampStats; + sizeCount += (rowsCount - stats.numNulls) * 8; + continue; + } + } + + return TStatistics{rowsCount, sizeCount}; +} + +THiveMetastoreConverters::TStatistics THiveMetastoreConverters::GetStatistics(const std::vector& partitions) { + THiveMetastoreConverters::TStatistics statistics; + for (const auto& partition: partitions) { + auto it = partition.parameters.find("numRows"); + if (it != partition.parameters.end()) { + statistics.Rows = statistics.Rows.GetOrElse(0) + std::stoi(it->second); + } + + it = partition.parameters.find("totalSize"); + if (it != partition.parameters.end()) { + statistics.Size = statistics.Size.GetOrElse(0) + std::stoi(it->second); + } + } + return statistics; +} + +} diff --git a/ydb/core/external_sources/hive_metastore/hive_metastore_converters.h b/ydb/core/external_sources/hive_metastore/hive_metastore_converters.h new file mode 100644 index 000000000000..97f2bc8d5a86 --- /dev/null +++ b/ydb/core/external_sources/hive_metastore/hive_metastore_converters.h @@ -0,0 +1,26 @@ +#include +#include +#include + +#include +#include + +namespace NKikimr::NExternalSource { + +struct THiveMetastoreConverters { + struct TStatistics { + TMaybe Rows; + TMaybe Size; + }; + + static TString GetFormat(const Apache::Hadoop::Hive::Table& table); + static TString GetCompression(const Apache::Hadoop::Hive::Table& table); + static TString GetLocation(const Apache::Hadoop::Hive::Table& table); + static std::vector GetColumns(const Apache::Hadoop::Hive::Table& table); + static std::vector GetPartitionedColumns(const Apache::Hadoop::Hive::Table& table); + static TString GetPartitionsFilter(const NYql::NConnector::NApi::TPredicate& predicate); + static TStatistics GetStatistics(const Apache::Hadoop::Hive::TableStatsResult& statistics); + static TStatistics GetStatistics(const std::vector& partitions); +}; + +} \ No newline at end of file diff --git a/ydb/core/external_sources/hive_metastore/hive_metastore_fetcher.cpp b/ydb/core/external_sources/hive_metastore/hive_metastore_fetcher.cpp new file mode 100644 index 000000000000..137f23a385c9 --- /dev/null +++ b/ydb/core/external_sources/hive_metastore/hive_metastore_fetcher.cpp @@ -0,0 +1,208 @@ +#include +#include +#include + +#include +#include +#include +#include + +#define LOG_E(stream) LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_GATEWAY, "[HiveMetastoreFetcher]: " << stream) +#define LOG_W(stream) LOG_WARN_S( *NActors::TlsActivationContext, NKikimrServices::KQP_GATEWAY, "[HiveMetastoreFetcher]: " << stream) +#define LOG_I(stream) LOG_INFO_S( *NActors::TlsActivationContext, NKikimrServices::KQP_GATEWAY, "[HiveMetastoreFetcher]: " << stream) +#define LOG_D(stream) LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_GATEWAY, "[HiveMetastoreFetcher]: " << stream) +#define LOG_T(stream) LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_GATEWAY, "[HiveMetastoreFetcher]: " << stream) + +namespace NKikimr::NExternalSource { + +using namespace NKikimr::NExternalSource; + +namespace { + +class THiveMetastoreFetcherActor : public NActors::TActor { +public: + using TBase = NActors::TActor; + THiveMetastoreFetcherActor(const TString& host, int32_t port) + : TBase(&THiveMetastoreFetcherActor::StateFunc) + , Client(host, port) + {} + + STRICT_STFUNC(StateFunc, + hFunc(TEvHiveMetastore::TEvGetTable, Handle); + hFunc(TEvHiveMetastore::TEvGetStatistics, Handle); + hFunc(TEvHiveMetastore::TEvGetPartitions, Handle); + hFunc(TEvHiveMetastore::TEvHiveGetTableResult, Handle); + hFunc(TEvHiveMetastore::TEvHiveGetTableStatisticsResult, Handle); + hFunc(TEvHiveMetastore::TEvHiveGetPartitionsResult, Handle); + ) + + void Handle(TEvHiveMetastore::TEvGetTable::TPtr& ev) { + Client.GetTable(ev->Get()->DatbaseName, ev->Get()->TableName).Apply([actorSystem = NActors::TActivationContext::ActorSystem(), self = SelfId()](const auto& future) { + auto result = std::make_unique(); + try { + auto table = future.GetValue(); + result->Table = table; + actorSystem->Send(self, result.release(), 0, 0); + } catch (...) { + result->Issues.AddIssue(CurrentExceptionMessage()); + actorSystem->Send(self, result.release(), 0, 0); + } + }); + Requests.emplace<0>(ev); + } + + void Handle(TEvHiveMetastore::TEvGetStatistics::TPtr& ev) { + Apache::Hadoop::Hive::TableStatsRequest request; + request.__set_dbName(ev->Get()->DatbaseName); + request.__set_tblName(ev->Get()->TableName); + request.__set_colNames(ev->Get()->Columns); + + Client.GetTableStatistics(request).Apply([actorSystem = NActors::TActivationContext::ActorSystem(), self = SelfId()](const auto& future) { + auto result = std::make_unique(); + try { + auto statistics = future.GetValue(); + result->Statistics = statistics; + actorSystem->Send(self, result.release(), 0, 0); + } catch (...) { + result->Issues.AddIssue(CurrentExceptionMessage()); + actorSystem->Send(self, result.release(), 0, 0); + } + }); + Requests.emplace<1>(ev); + } + + void Handle(TEvHiveMetastore::TEvGetPartitions::TPtr& ev) { + SendGetPartitions(ev.Get()->Get()->DatbaseName, ev.Get()->Get()->TableName, THiveMetastoreConverters::GetPartitionsFilter(ev.Get()->Get()->Predicate)); + Requests.emplace<2>(ev); + } + + void Handle(TEvHiveMetastore::TEvHiveGetTableResult::TPtr& ev) { + auto request = std::get<0>(Requests); + const auto& issues = ev.Get()->Get()->Issues; + if (issues) { + LOG_E(issues.ToString(true)); + request->Get()->Promise.SetException(std::make_exception_ptr(yexception() << issues.ToString(true))); + PassAway(); + return; + } + + try { + auto hiveMetastoreTable = ev.Get()->Get()->Table; + TEvHiveMetastore::TTable table; + table.Columns = THiveMetastoreConverters::GetColumns(hiveMetastoreTable); + table.Compression = THiveMetastoreConverters::GetCompression(hiveMetastoreTable); + table.Format = THiveMetastoreConverters::GetFormat(hiveMetastoreTable); + table.Location = THiveMetastoreConverters::GetLocation(hiveMetastoreTable); + table.PartitionedBy = THiveMetastoreConverters::GetPartitionedColumns(hiveMetastoreTable); + request->Get()->Promise.SetValue(table); + } catch (...) { + request->Get()->Promise.SetException(std::current_exception()); + } + + PassAway(); + } + + void Handle(TEvHiveMetastore::TEvHiveGetTableStatisticsResult::TPtr& ev) { + const auto& request = std::get<1>(Requests); + const auto& issues = ev.Get()->Get()->Issues; + if (issues) { + LOG_E(issues.ToString(true)); + request->Get()->Promise.SetException(std::make_exception_ptr(yexception() << issues.ToString(true))); + PassAway(); + return; + } + + auto statistics = THiveMetastoreConverters::GetStatistics(ev.Get()->Get()->Statistics); + if (!statistics.Rows && !statistics.Size) { + SendGetPartitions(request->Get()->DatbaseName, request->Get()->TableName, {}); + return; + } + + request->Get()->Promise.SetValue(TEvHiveMetastore::TStatistics{statistics.Rows, statistics.Size}); + PassAway(); + } + + void Handle(TEvHiveMetastore::TEvHiveGetPartitionsResult::TPtr& ev) { + if (Requests.index() == 1) { + ProcessGetTableStatistics(ev); + return; + } + + ProcessGetPartitions(ev); + } + + void ProcessGetPartitions(TEvHiveMetastore::TEvHiveGetPartitionsResult::TPtr& ev) { + auto request = std::get<2>(Requests); + const auto& issues = ev.Get()->Get()->Issues; + if (issues) { + LOG_E(issues.ToString(true)); + request->Get()->Promise.SetException(std::make_exception_ptr(yexception() << issues.ToString(true))); + PassAway(); + return; + } + + try { + TEvHiveMetastore::TPartitions partitions; + const auto& hiveMetastorePartitions = ev.Get()->Get()->Partitions; + for (const auto& partition: hiveMetastorePartitions) { + std::vector values; + for (const auto& value: partition.values) { + values.push_back(TString{value}); + } + partitions.Partitions.push_back(TEvHiveMetastore::TPartitions::TPartition{TString{partition.sd.location}, values}); + } + request->Get()->Promise.SetValue(partitions); + } catch (...) { + request->Get()->Promise.SetException(std::current_exception()); + } + + PassAway(); + } + + void ProcessGetTableStatistics(TEvHiveMetastore::TEvHiveGetPartitionsResult::TPtr& ev) { + auto request = std::get<1>(Requests); + const auto& issues = ev.Get()->Get()->Issues; + if (issues) { + LOG_E(issues.ToString(true)); + request->Get()->Promise.SetException(std::make_exception_ptr(yexception() << issues.ToString(true))); + PassAway(); + return; + } + + auto statistics = THiveMetastoreConverters::GetStatistics(ev.Get()->Get()->Partitions); + if (!statistics.Rows && !statistics.Size) { + SendGetPartitions(request->Get()->DatbaseName, request->Get()->TableName, {}); + return; + } + + request->Get()->Promise.SetValue(TEvHiveMetastore::TStatistics{statistics.Rows, statistics.Size}); + PassAway(); + } + +private: + void SendGetPartitions(const TString& databaseName, const TString& tableName, const TString& filter) { + Client.GetPartitionsByFilter(databaseName, tableName, filter).Apply([actorSystem = NActors::TActivationContext::ActorSystem(), self = SelfId()](const auto& future) { + auto result = std::make_unique(); + try { + auto partitions = future.GetValue(); + result->Partitions = partitions; + actorSystem->Send(self, result.release(), 0, 0); + } catch (...) { + result->Issues.AddIssue(CurrentExceptionMessage()); + actorSystem->Send(self, result.release(), 0, 0); + } + }); + } + +private: + NKikimr::NExternalSource::THiveMetastoreClient Client; + std::variant Requests; +}; + +} + +std::unique_ptr CreateHiveMetastoreFetcherActor(const TString& host, int32_t port) { + return std::make_unique(host, port); +} + +} diff --git a/ydb/core/external_sources/hive_metastore/hive_metastore_fetcher.h b/ydb/core/external_sources/hive_metastore/hive_metastore_fetcher.h new file mode 100644 index 000000000000..26c8a71c59c8 --- /dev/null +++ b/ydb/core/external_sources/hive_metastore/hive_metastore_fetcher.h @@ -0,0 +1,8 @@ +#include +#include + +namespace NKikimr::NExternalSource { + +std::unique_ptr CreateHiveMetastoreFetcherActor(const TString& host, int32_t port); + +} diff --git a/ydb/core/external_sources/hive_metastore/ut/common.cpp b/ydb/core/external_sources/hive_metastore/ut/common.cpp new file mode 100644 index 000000000000..95036143b37c --- /dev/null +++ b/ydb/core/external_sources/hive_metastore/ut/common.cpp @@ -0,0 +1,46 @@ +#include + +#include +#include +#include + +namespace NKikimr::NExternalSource { + +namespace { + +TString Exec(const TString& cmd) { + std::array buffer; + TString result; + std::unique_ptr pipe(popen(cmd.c_str(), "r"), pclose); + if (!pipe) { + throw std::runtime_error("popen() failed!"); + } + while (fgets(buffer.data(), static_cast(buffer.size()), pipe.get()) != nullptr) { + result += buffer.data(); + } + return result; +} + +} + +TString GetExternalPort(const TString& service, const TString& port) { + auto dockerComposeBin = BinaryPath("library/recipes/docker_compose/bin/docker-compose"); + auto composeFileYml = ArcadiaSourceRoot() + "/ydb/core/external_sources/hive_metastore/ut/docker-compose.yml"; + auto result = StringSplitter(Exec(dockerComposeBin + " -f " + composeFileYml + " port " + service + " " + port)).Split(':').ToList(); + return result ? Strip(result.back()) : TString{}; +} + +void WaitHiveMetastore(const TString& host, int32_t port, const TString& database) { + NKikimr::NExternalSource::THiveMetastoreClient client(host, port); + for (int i = 0; i < 60; i++) { + try { + client.GetDatabase(database).GetValue(TDuration::Seconds(10)); + return; + } catch (...) { + Sleep(TDuration::Seconds(1)); + } + } + ythrow yexception() << "Hive metastore isn't ready, host: " << host << " port: " << port; +} + +} \ No newline at end of file diff --git a/ydb/core/external_sources/hive_metastore/ut/common.h b/ydb/core/external_sources/hive_metastore/ut/common.h new file mode 100644 index 000000000000..9ce114e7d14b --- /dev/null +++ b/ydb/core/external_sources/hive_metastore/ut/common.h @@ -0,0 +1,9 @@ +#include + +namespace NKikimr::NExternalSource { + +TString GetExternalPort(const TString& service, const TString& port); + +void WaitHiveMetastore(const TString& host, int32_t port, const TString& database); + +} \ No newline at end of file diff --git a/ydb/core/external_sources/hive_metastore/ut/docker-compose.yml b/ydb/core/external_sources/hive_metastore/ut/docker-compose.yml index 6f406223ca53..5b018869ab3b 100644 --- a/ydb/core/external_sources/hive_metastore/ut/docker-compose.yml +++ b/ydb/core/external_sources/hive_metastore/ut/docker-compose.yml @@ -3,9 +3,22 @@ services: trino: hostname: trino image: 'trinodb/trino@sha256:49aa7230aaa00b92d54842b2f0cafc1f5ec5f228e77b4bc45b3d65f9fa94c45f' + volumes: + - ./trino/catalog:/etc/trino/catalog:ro ports: - '8080' + # This job creates the "datalake" bucket on Minio + trino-job: + image: 'trinodb/trino@sha256:49aa7230aaa00b92d54842b2f0cafc1f5ec5f228e77b4bc45b3d65f9fa94c45f' + container_name: trino-job + volumes: + - ./scripts:/scripts + command: | + /bin/bash /scripts/bootstrap.bash + depends_on: + - trino + # HMS backend database metastore_db: image: postgres@sha256:00e6ed9967881099ce9e552be567537d0bb47c990dacb43229cc9494bfddd8a0 diff --git a/ydb/core/external_sources/hive_metastore/ut/hive_metastore_client_ut.cpp b/ydb/core/external_sources/hive_metastore/ut/hive_metastore_client_ut.cpp index f6f85d3c13d3..3c8e8f152fcb 100644 --- a/ydb/core/external_sources/hive_metastore/ut/hive_metastore_client_ut.cpp +++ b/ydb/core/external_sources/hive_metastore/ut/hive_metastore_client_ut.cpp @@ -1,53 +1,20 @@ -#include "../hive_metastore_client.h" +#include +#include #include #include #include +#include -namespace { - -TString Exec(const TString& cmd) { - std::array buffer; - TString result; - std::unique_ptr pipe(popen(cmd.c_str(), "r"), pclose); - if (!pipe) { - throw std::runtime_error("popen() failed!"); - } - while (fgets(buffer.data(), static_cast(buffer.size()), pipe.get()) != nullptr) { - result += buffer.data(); - } - return result; -} - -TString GetExternalHiveMetastorePort(const TString& service, const TString& port) { - auto dockerComposeBin = BinaryPath("library/recipes/docker_compose/bin/docker-compose"); - auto composeFileYml = ArcadiaSourceRoot() + "/ydb/core/external_sources/hive_metastore/ut/docker-compose.yml"; - auto result = StringSplitter(Exec(dockerComposeBin + " -f " + composeFileYml + " port " + service + " " + port)).Split(':').ToList(); - return result ? result.back() : TString{}; -} - -void WaitHiveMetastore(const TString& host, int32_t port) { - NKikimr::NExternalSource::THiveMetastoreClient client(host, port); - for (int i = 0; i < 30; i++) { - try { - client.GetDatabase("default").GetValue(TDuration::Seconds(10)); - return; - } catch (...) { - Sleep(TDuration::Seconds(1)); - } - } - ythrow yexception() << "Hive metastore isn't ready, host: " << host << " port: " << port; -} - -} +namespace NKikimr::NExternalSource { Y_UNIT_TEST_SUITE(HiveMetastoreClient) { Y_UNIT_TEST(SuccessRequest) { const TString host = "0.0.0.0"; - const int32_t port = stoi(GetExternalHiveMetastorePort("hive-metastore", "9083")); - WaitHiveMetastore(host, port); - - NKikimr::NExternalSource::THiveMetastoreClient client("0.0.0.0", stoi(GetExternalHiveMetastorePort("hive-metastore", "9083"))); + const int32_t port = stoi(GetExternalPort("hive-metastore", "9083")); + WaitHiveMetastore(host, port, "default"); + + NKikimr::NExternalSource::THiveMetastoreClient client("0.0.0.0", port); { auto future = client.GetDatabase("default"); UNIT_ASSERT_VALUES_EQUAL(future.GetValue(TDuration::Seconds(10)).name, "default"); @@ -240,4 +207,260 @@ Y_UNIT_TEST_SUITE(HiveMetastoreClient) { UNIT_ASSERT_VALUES_EQUAL(configValue, "9083"); } } + + Y_UNIT_TEST(SuccessTrinoRequest) { + const TString host = "0.0.0.0"; + const int32_t port = stoi(GetExternalPort("hive-metastore", "9083")); + WaitHiveMetastore(host, port, "final"); + + NKikimr::NExternalSource::THiveMetastoreClient client("0.0.0.0", port); + + // iceberg + { + auto tables = client.GetAllTables("iceberg").GetValue(TDuration::Seconds(10)); + UNIT_ASSERT_VALUES_EQUAL(tables.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(tables[0], "request_logs"); + + auto table = client.GetTable("iceberg", "request_logs").GetValue(TDuration::Seconds(10)); + UNIT_ASSERT_VALUES_EQUAL(table.tableName, "request_logs"); + UNIT_ASSERT_VALUES_EQUAL(table.dbName, "iceberg"); + UNIT_ASSERT_VALUES_EQUAL(table.owner, "trino"); + UNIT_ASSERT_VALUES_UNEQUAL(table.createTime, 0); + UNIT_ASSERT_VALUES_EQUAL(table.lastAccessTime, 0); + UNIT_ASSERT_VALUES_EQUAL(table.retention, 0); + UNIT_ASSERT_VALUES_EQUAL(table.sd.cols.size(), 7); + UNIT_ASSERT_VALUES_EQUAL(table.sd.cols[0].name, "request_time"); + UNIT_ASSERT_VALUES_EQUAL(table.sd.cols[0].type, "timestamp"); + UNIT_ASSERT_VALUES_EQUAL(table.sd.cols[1].name, "url"); + UNIT_ASSERT_VALUES_EQUAL(table.sd.cols[1].type, "string"); + UNIT_ASSERT_VALUES_EQUAL(table.sd.cols[2].name, "ip"); + UNIT_ASSERT_VALUES_EQUAL(table.sd.cols[2].type, "string"); + UNIT_ASSERT_VALUES_EQUAL(table.sd.cols[3].name, "user_agent"); + UNIT_ASSERT_VALUES_EQUAL(table.sd.cols[3].type, "string"); + UNIT_ASSERT_VALUES_EQUAL(table.sd.cols[4].name, "year"); + UNIT_ASSERT_VALUES_EQUAL(table.sd.cols[4].type, "string"); + UNIT_ASSERT_VALUES_EQUAL(table.sd.cols[5].name, "month"); + UNIT_ASSERT_VALUES_EQUAL(table.sd.cols[5].type, "string"); + UNIT_ASSERT_VALUES_EQUAL(table.sd.cols[6].name, "day"); + UNIT_ASSERT_VALUES_EQUAL(table.sd.cols[6].type, "string"); + UNIT_ASSERT_VALUES_EQUAL(table.sd.location, "s3://datalake/data/logs"); + UNIT_ASSERT_VALUES_EQUAL(table.sd.inputFormat, "org.apache.hadoop.mapred.FileInputFormat"); + UNIT_ASSERT_VALUES_EQUAL(table.sd.outputFormat, "org.apache.hadoop.mapred.FileOutputFormat"); + UNIT_ASSERT_VALUES_EQUAL(table.sd.compressed, 0); + UNIT_ASSERT_VALUES_EQUAL(table.sd.numBuckets, 0); + UNIT_ASSERT_VALUES_EQUAL(table.sd.serdeInfo.name, "request_logs"); + UNIT_ASSERT_VALUES_EQUAL(table.sd.serdeInfo.serializationLib, "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"); + UNIT_ASSERT_VALUES_EQUAL(table.sd.serdeInfo.parameters.size(), 0); + UNIT_ASSERT(!table.sd.serdeInfo.__isset.description); + UNIT_ASSERT(!table.sd.serdeInfo.__isset.serializerClass); + UNIT_ASSERT(!table.sd.serdeInfo.__isset.deserializerClass); + UNIT_ASSERT(!table.sd.serdeInfo.__isset.serdeType); + UNIT_ASSERT_VALUES_EQUAL(table.sd.bucketCols.size(), 0); + UNIT_ASSERT_VALUES_EQUAL(table.sd.sortCols.size(), 0); + UNIT_ASSERT_VALUES_EQUAL(table.sd.parameters.size(), 0); + UNIT_ASSERT_VALUES_EQUAL(table.sd.skewedInfo.skewedColNames.size(), 0); + UNIT_ASSERT_VALUES_EQUAL(table.sd.skewedInfo.skewedColValues.size(), 0); + UNIT_ASSERT_VALUES_EQUAL(table.sd.skewedInfo.skewedColValueLocationMaps.size(), 0); + UNIT_ASSERT_VALUES_EQUAL(table.sd.storedAsSubDirectories, 0); + UNIT_ASSERT_VALUES_EQUAL(table.partitionKeys.size(), 0); + UNIT_ASSERT_VALUES_EQUAL(table.parameters.size(), 7); + UNIT_ASSERT(table.parameters.contains("EXTERNAL")); + UNIT_ASSERT(table.parameters.contains("metadata_location")); + UNIT_ASSERT(table.parameters.contains("numFiles")); + UNIT_ASSERT(table.parameters.contains("previous_metadata_location")); + UNIT_ASSERT(table.parameters.contains("table_type")); + UNIT_ASSERT(table.parameters.contains("totalSize")); + UNIT_ASSERT(table.parameters.contains("transient_lastDdlTime")); + UNIT_ASSERT_VALUES_EQUAL(table.parameters["EXTERNAL"], "TRUE"); + UNIT_ASSERT_STRING_CONTAINS(table.parameters["metadata_location"], "s3://datalake/data/logs/metadata/"); + UNIT_ASSERT_VALUES_EQUAL(table.parameters["numFiles"], "2"); + UNIT_ASSERT_STRING_CONTAINS(table.parameters["previous_metadata_location"], "s3://datalake/data/logs/metadata/"); + UNIT_ASSERT_VALUES_EQUAL(table.parameters["table_type"], "ICEBERG"); + UNIT_ASSERT_C(table.parameters["totalSize"] == "6760" || table.parameters["totalSize"] == "6754", table.parameters["totalSize"]); + UNIT_ASSERT_VALUES_UNEQUAL(table.parameters["transient_lastDdlTime"], TString()); + UNIT_ASSERT_VALUES_EQUAL(table.viewOriginalText, TString()); + UNIT_ASSERT_VALUES_EQUAL(table.viewExpandedText, TString()); + UNIT_ASSERT_VALUES_EQUAL(table.tableType, "EXTERNAL_TABLE"); + UNIT_ASSERT(!table.__isset.privileges); + UNIT_ASSERT_VALUES_EQUAL(table.temporary, 0); + UNIT_ASSERT_VALUES_EQUAL(table.rewriteEnabled, 0); + UNIT_ASSERT(!table.__isset.creationMetadata); + UNIT_ASSERT_VALUES_EQUAL(table.catName, "hive"); + UNIT_ASSERT_EQUAL(table.ownerType, Apache::Hadoop::Hive::PrincipalType::USER); + UNIT_ASSERT_VALUES_EQUAL(table.writeId, -1); + UNIT_ASSERT(!table.__isset.isStatsCompliant); + UNIT_ASSERT(!table.__isset.colStats); + UNIT_ASSERT(!table.__isset.accessType); + UNIT_ASSERT(!table.__isset.requiredReadCapabilities); + UNIT_ASSERT(!table.__isset.requiredWriteCapabilities); + UNIT_ASSERT(!table.__isset.id); + UNIT_ASSERT(!table.__isset.fileMetadata); + UNIT_ASSERT(!table.__isset.dictionary); + UNIT_ASSERT(!table.__isset.txnId); + + auto partitions = client.GetPartitionsByFilter("iceberg", "request_logs", "").GetValue(TDuration::Seconds(10)); + UNIT_ASSERT_VALUES_EQUAL(partitions.size(), 0); + + Apache::Hadoop::Hive::TableStatsRequest request; + request.__set_dbName("iceberg"); + request.__set_tblName("request_logs"); + request.__set_colNames({"request_time", "url", "ip", "user_agent"}); + auto statisctics = client.GetTableStatistics(request).GetValue(TDuration::Seconds(10)); + UNIT_ASSERT_VALUES_EQUAL(statisctics.tableStats.size(), 0); + UNIT_ASSERT(!statisctics.__isset.isStatsCompliant); + } + + // hive + { + auto tables = client.GetAllTables("hive").GetValue(TDuration::Seconds(10)); + UNIT_ASSERT_VALUES_EQUAL(tables.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(tables[0], "request_logs"); + + auto table = client.GetTable("hive", "request_logs").GetValue(TDuration::Seconds(10)); + UNIT_ASSERT_VALUES_EQUAL(table.tableName, "request_logs"); + UNIT_ASSERT_VALUES_EQUAL(table.dbName, "hive"); + UNIT_ASSERT_VALUES_EQUAL(table.owner, "trino"); + UNIT_ASSERT_VALUES_UNEQUAL(table.createTime, 0); + UNIT_ASSERT_VALUES_EQUAL(table.lastAccessTime, 0); + UNIT_ASSERT_VALUES_EQUAL(table.retention, 0); + UNIT_ASSERT_VALUES_EQUAL(table.sd.cols.size(), 4); + UNIT_ASSERT_VALUES_EQUAL(table.sd.cols[0].name, "request_time"); + UNIT_ASSERT_VALUES_EQUAL(table.sd.cols[0].type, "timestamp"); + UNIT_ASSERT_VALUES_EQUAL(table.sd.cols[1].name, "url"); + UNIT_ASSERT_VALUES_EQUAL(table.sd.cols[1].type, "string"); + UNIT_ASSERT_VALUES_EQUAL(table.sd.cols[2].name, "ip"); + UNIT_ASSERT_VALUES_EQUAL(table.sd.cols[2].type, "string"); + UNIT_ASSERT_VALUES_EQUAL(table.sd.cols[3].name, "user_agent"); + UNIT_ASSERT_VALUES_EQUAL(table.sd.cols[3].type, "string"); + UNIT_ASSERT_VALUES_EQUAL(table.sd.location, "s3://datalake/data/logs/hive/request_logs"); + UNIT_ASSERT_VALUES_EQUAL(table.sd.inputFormat, "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"); + UNIT_ASSERT_VALUES_EQUAL(table.sd.outputFormat, "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"); + UNIT_ASSERT_VALUES_EQUAL(table.sd.compressed, 0); + UNIT_ASSERT_VALUES_EQUAL(table.sd.numBuckets, 0); + UNIT_ASSERT_VALUES_EQUAL(table.sd.serdeInfo.name, "request_logs"); + UNIT_ASSERT_VALUES_EQUAL(table.sd.serdeInfo.serializationLib, "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"); + UNIT_ASSERT_VALUES_EQUAL(table.sd.serdeInfo.parameters.size(), 0); + UNIT_ASSERT(!table.sd.serdeInfo.__isset.description); + UNIT_ASSERT(!table.sd.serdeInfo.__isset.serializerClass); + UNIT_ASSERT(!table.sd.serdeInfo.__isset.deserializerClass); + UNIT_ASSERT(!table.sd.serdeInfo.__isset.serdeType); + UNIT_ASSERT_VALUES_EQUAL(table.sd.bucketCols.size(), 0); + UNIT_ASSERT_VALUES_EQUAL(table.sd.sortCols.size(), 0); + UNIT_ASSERT_VALUES_EQUAL(table.sd.parameters.size(), 0); + UNIT_ASSERT_VALUES_EQUAL(table.sd.skewedInfo.skewedColNames.size(), 0); + UNIT_ASSERT_VALUES_EQUAL(table.sd.skewedInfo.skewedColValues.size(), 0); + UNIT_ASSERT_VALUES_EQUAL(table.sd.skewedInfo.skewedColValueLocationMaps.size(), 0); + UNIT_ASSERT_VALUES_EQUAL(table.sd.storedAsSubDirectories, 0); + UNIT_ASSERT_VALUES_EQUAL(table.partitionKeys.size(), 3); + UNIT_ASSERT_VALUES_EQUAL(table.partitionKeys[0].name, "year"); + UNIT_ASSERT_VALUES_EQUAL(table.partitionKeys[0].type, "string"); + UNIT_ASSERT_VALUES_EQUAL(table.partitionKeys[1].name, "month"); + UNIT_ASSERT_VALUES_EQUAL(table.partitionKeys[1].type, "string"); + UNIT_ASSERT_VALUES_EQUAL(table.partitionKeys[2].name, "day"); + UNIT_ASSERT_VALUES_EQUAL(table.partitionKeys[2].type, "string"); + UNIT_ASSERT_VALUES_EQUAL(table.parameters.size(), 5); + UNIT_ASSERT(table.parameters.contains("STATS_GENERATED_VIA_STATS_TASK")); + UNIT_ASSERT(table.parameters.contains("auto.purge")); + UNIT_ASSERT(table.parameters.contains("trino_query_id")); + UNIT_ASSERT(table.parameters.contains("trino_version")); + UNIT_ASSERT(table.parameters.contains("transient_lastDdlTime")); + UNIT_ASSERT_VALUES_EQUAL(table.parameters["STATS_GENERATED_VIA_STATS_TASK"], "workaround for potential lack of HIVE-12730"); + UNIT_ASSERT_STRING_CONTAINS(table.parameters["auto.purge"], "false"); + UNIT_ASSERT_VALUES_UNEQUAL(table.parameters["trino_query_id"], TString()); + UNIT_ASSERT_STRING_CONTAINS(table.parameters["trino_version"], "447"); + UNIT_ASSERT_VALUES_UNEQUAL(table.parameters["transient_lastDdlTime"], TString()); + UNIT_ASSERT_VALUES_EQUAL(table.viewOriginalText, TString()); + UNIT_ASSERT_VALUES_EQUAL(table.viewExpandedText, TString()); + UNIT_ASSERT_VALUES_EQUAL(table.tableType, "MANAGED_TABLE"); + UNIT_ASSERT(!table.__isset.privileges); + UNIT_ASSERT_VALUES_EQUAL(table.temporary, 0); + UNIT_ASSERT_VALUES_EQUAL(table.rewriteEnabled, 0); + UNIT_ASSERT(!table.__isset.creationMetadata); + UNIT_ASSERT_VALUES_EQUAL(table.catName, "hive"); + UNIT_ASSERT_EQUAL(table.ownerType, Apache::Hadoop::Hive::PrincipalType::USER); + UNIT_ASSERT_VALUES_EQUAL(table.writeId, -1); + UNIT_ASSERT(!table.__isset.isStatsCompliant); + UNIT_ASSERT(!table.__isset.colStats); + UNIT_ASSERT(!table.__isset.accessType); + UNIT_ASSERT(!table.__isset.requiredReadCapabilities); + UNIT_ASSERT(!table.__isset.requiredWriteCapabilities); + UNIT_ASSERT(!table.__isset.id); + UNIT_ASSERT(!table.__isset.fileMetadata); + UNIT_ASSERT(!table.__isset.dictionary); + UNIT_ASSERT(!table.__isset.txnId); + + auto partitions = client.GetPartitionsByFilter("hive", "request_logs", "").GetValue(TDuration::Seconds(10)); + UNIT_ASSERT_VALUES_EQUAL(partitions.size(), 4); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].values.size(), 3); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].values[0], "2024"); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].values[1], "05"); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].values[2], "01"); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].dbName, "hive"); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].tableName, "request_logs"); + UNIT_ASSERT_VALUES_UNEQUAL(partitions[0].createTime, 0); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].lastAccessTime, 0); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].sd.cols.size(), 4); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].sd.cols[0].name, "request_time"); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].sd.cols[0].type, "timestamp"); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].sd.cols[1].name, "url"); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].sd.cols[1].type, "string"); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].sd.cols[2].name, "ip"); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].sd.cols[2].type, "string"); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].sd.cols[3].name, "user_agent"); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].sd.cols[3].type, "string"); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].sd.location, "s3://datalake/data/logs/hive/request_logs/year=2024/month=05/day=01"); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].sd.inputFormat, "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].sd.outputFormat, "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].sd.compressed, 0); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].sd.numBuckets, 0); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].sd.serdeInfo.name, "request_logs"); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].sd.serdeInfo.serializationLib, "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].sd.serdeInfo.parameters.size(), 0); + UNIT_ASSERT(!partitions[0].sd.serdeInfo.__isset.description); + UNIT_ASSERT(!partitions[0].sd.serdeInfo.__isset.serializerClass); + UNIT_ASSERT(!partitions[0].sd.serdeInfo.__isset.deserializerClass); + UNIT_ASSERT(!partitions[0].sd.serdeInfo.__isset.serdeType); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].sd.bucketCols.size(), 0); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].sd.sortCols.size(), 0); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].sd.parameters.size(), 0); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].sd.skewedInfo.skewedColNames.size(), 0); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].sd.skewedInfo.skewedColValues.size(), 0); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].sd.skewedInfo.skewedColValueLocationMaps.size(), 0); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].sd.storedAsSubDirectories, 0); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].parameters.size(), 8); + UNIT_ASSERT(partitions[0].parameters.contains("COLUMN_STATS_ACCURATE")); + UNIT_ASSERT(partitions[0].parameters.contains("STATS_GENERATED_VIA_STATS_TASK")); + UNIT_ASSERT(partitions[0].parameters.contains("numFiles")); + UNIT_ASSERT(partitions[0].parameters.contains("numRows")); + UNIT_ASSERT(partitions[0].parameters.contains("totalSize")); + UNIT_ASSERT(partitions[0].parameters.contains("transient_lastDdlTime")); + UNIT_ASSERT(partitions[0].parameters.contains("trino_query_id")); + UNIT_ASSERT(partitions[0].parameters.contains("trino_version")); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].parameters["COLUMN_STATS_ACCURATE"], R"({"COLUMN_STATS":{"ip":"true","request_time":"true","url":"true","user_agent":"true"}})"); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].parameters["STATS_GENERATED_VIA_STATS_TASK"], "workaround for potential lack of HIVE-12730"); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].parameters["numFiles"], "3"); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].parameters["numRows"], "3"); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].parameters["totalSize"], "2367"); + UNIT_ASSERT_VALUES_UNEQUAL(partitions[0].parameters["transient_lastDdlTime"], TString()); + UNIT_ASSERT_VALUES_UNEQUAL(partitions[0].parameters["trino_query_id"], TString()); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].parameters["trino_version"], "447"); + UNIT_ASSERT(!partitions[0].__isset.privileges); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].catName, "hive"); + UNIT_ASSERT_VALUES_EQUAL(partitions[0].writeId, -1); + UNIT_ASSERT(!partitions[0].__isset.isStatsCompliant); + UNIT_ASSERT(!partitions[0].__isset.colStats); + UNIT_ASSERT(!partitions[0].__isset.fileMetadata); + + Apache::Hadoop::Hive::TableStatsRequest request; + request.__set_dbName("hive"); + request.__set_tblName("request_logs"); + request.__set_colNames({"request_time", "url", "ip", "user_agent"}); + + auto statisctics = client.GetTableStatistics(request).GetValue(TDuration::Seconds(10)); + UNIT_ASSERT_VALUES_EQUAL(statisctics.tableStats.size(), 0); + UNIT_ASSERT(!statisctics.__isset.isStatsCompliant); + } + } +} + } \ No newline at end of file diff --git a/ydb/core/external_sources/hive_metastore/ut/hive_metastore_fetcher_ut.cpp b/ydb/core/external_sources/hive_metastore/ut/hive_metastore_fetcher_ut.cpp new file mode 100644 index 000000000000..66e7676964cd --- /dev/null +++ b/ydb/core/external_sources/hive_metastore/ut/hive_metastore_fetcher_ut.cpp @@ -0,0 +1,157 @@ +#include +#include +#include +#include +#include + +#include + +namespace NKikimr::NExternalSource { + +using namespace NActors; +using namespace NKikimr; +using namespace NKikimr::NExternalSource; + +namespace { + +////////////////////////////////////////////////////// + +using TRuntimePtr = std::shared_ptr; + +struct TTestBootstrap { + TRuntimePtr Runtime; + + TTestBootstrap() + : Runtime(PrepareTestActorRuntime()) { + } + +private: + TRuntimePtr PrepareTestActorRuntime() + { + TRuntimePtr runtime(new TTestBasicRuntime()); + SetupTabletServices(*runtime); + return runtime; + } +}; + +} // namespace + +////////////////////////////////////////////////////// + +Y_UNIT_TEST_SUITE(HiveMetastoreFetcher) { + Y_UNIT_TEST(SuccessTrinoRequest) { + const TString host = "0.0.0.0"; + const int32_t port = stoi(GetExternalPort("hive-metastore", "9083")); + WaitHiveMetastore(host, port, "final"); + + TTestBootstrap bootstrap; + { + auto fetcher = bootstrap.Runtime->Register(CreateHiveMetastoreFetcherActor(host, port).release()); + + auto request = std::make_unique(); + request->DatbaseName = "hive"; + request->TableName = "request_logs"; + NThreading::TPromise promise = NThreading::NewPromise(); + request->Promise = promise; + + bootstrap.Runtime->Send(new IEventHandle(fetcher, TActorId{}, request.release())); + auto future = promise.GetFuture(); + while (!future.HasException() && !future.HasValue()) { + bootstrap.Runtime->DispatchEvents({}, TDuration::Seconds(1)); + } + + auto table = future.GetValue(); + UNIT_ASSERT_VALUES_EQUAL(table.Columns.size(), 7); + UNIT_ASSERT_VALUES_EQUAL(table.Columns[0].name(), "request_time"); + UNIT_ASSERT_EQUAL(table.Columns[0].type().type_id(), Ydb::Type::TIMESTAMP); + UNIT_ASSERT_VALUES_EQUAL(table.Columns[1].name(), "url"); + UNIT_ASSERT_EQUAL(table.Columns[1].type().type_id(), Ydb::Type::UTF8); + UNIT_ASSERT_VALUES_EQUAL(table.Columns[2].name(), "ip"); + UNIT_ASSERT_EQUAL(table.Columns[2].type().type_id(), Ydb::Type::UTF8); + UNIT_ASSERT_VALUES_EQUAL(table.Columns[3].name(), "user_agent"); + UNIT_ASSERT_EQUAL(table.Columns[3].type().type_id(), Ydb::Type::UTF8); + UNIT_ASSERT_VALUES_EQUAL(table.Columns[4].name(), "year"); + UNIT_ASSERT_EQUAL(table.Columns[4].type().type_id(), Ydb::Type::UTF8); + UNIT_ASSERT_VALUES_EQUAL(table.Columns[5].name(), "month"); + UNIT_ASSERT_EQUAL(table.Columns[5].type().type_id(), Ydb::Type::UTF8); + UNIT_ASSERT_VALUES_EQUAL(table.Columns[6].name(), "day"); + UNIT_ASSERT_EQUAL(table.Columns[6].type().type_id(), Ydb::Type::UTF8); + UNIT_ASSERT_VALUES_EQUAL(table.Compression, ""); + UNIT_ASSERT_VALUES_EQUAL(table.Format, "parquet"); + UNIT_ASSERT_VALUES_EQUAL(table.Location, "s3://datalake/data/logs/hive/request_logs"); + UNIT_ASSERT_VALUES_EQUAL(table.PartitionedBy.size(), 3); + UNIT_ASSERT_VALUES_EQUAL(table.PartitionedBy[0], "year"); + UNIT_ASSERT_VALUES_EQUAL(table.PartitionedBy[1], "month"); + UNIT_ASSERT_VALUES_EQUAL(table.PartitionedBy[2], "day"); + } + { + auto fetcher = bootstrap.Runtime->Register(CreateHiveMetastoreFetcherActor(host, port).release()); + + auto request = std::make_unique(); + request->DatbaseName = "hive"; + request->TableName = "request_logs"; + NThreading::TPromise promise = NThreading::NewPromise(); + request->Promise = promise; + + auto now = TInstant::Now(); + bootstrap.Runtime->Send(new IEventHandle(fetcher, TActorId{}, request.release())); + auto future = promise.GetFuture(); + while (!future.HasException() && !future.HasValue()) { + bootstrap.Runtime->DispatchEvents({}, TDuration::Seconds(1)); + if (TInstant::Now() - now > TDuration::Seconds(1)) { + UNIT_ASSERT(false); + } + } + + auto table = future.GetValue(); + UNIT_ASSERT(table.Rows); + UNIT_ASSERT(table.Size); + UNIT_ASSERT_VALUES_EQUAL(*table.Rows, 6); + UNIT_ASSERT_VALUES_EQUAL(*table.Size, 4734); + } + { + auto fetcher = bootstrap.Runtime->Register(CreateHiveMetastoreFetcherActor(host, port).release()); + + auto request = std::make_unique(); + request->DatbaseName = "hive"; + request->TableName = "request_logs"; + NThreading::TPromise promise = NThreading::NewPromise(); + request->Promise = promise; + + auto now = TInstant::Now(); + bootstrap.Runtime->Send(new IEventHandle(fetcher, TActorId{}, request.release())); + auto future = promise.GetFuture(); + while (!future.HasException() && !future.HasValue()) { + bootstrap.Runtime->DispatchEvents({}, TDuration::Seconds(1)); + if (TInstant::Now() - now > TDuration::Seconds(1)) { + UNIT_ASSERT(false); + } + } + + auto partitions = future.GetValue(); + UNIT_ASSERT_VALUES_EQUAL(partitions.Partitions.size(), 4); + UNIT_ASSERT_VALUES_EQUAL(partitions.Partitions[0].Location, "s3://datalake/data/logs/hive/request_logs/year=2024/month=05/day=01"); + UNIT_ASSERT_VALUES_EQUAL(partitions.Partitions[0].Values.size(), 3); + UNIT_ASSERT_VALUES_EQUAL(partitions.Partitions[0].Values[0], "2024"); + UNIT_ASSERT_VALUES_EQUAL(partitions.Partitions[0].Values[1], "05"); + UNIT_ASSERT_VALUES_EQUAL(partitions.Partitions[0].Values[2], "01"); + UNIT_ASSERT_VALUES_EQUAL(partitions.Partitions[1].Location, "s3://datalake/data/logs/hive/request_logs/year=2024/month=05/day=02"); + UNIT_ASSERT_VALUES_EQUAL(partitions.Partitions[1].Values.size(), 3); + UNIT_ASSERT_VALUES_EQUAL(partitions.Partitions[1].Values[0], "2024"); + UNIT_ASSERT_VALUES_EQUAL(partitions.Partitions[1].Values[1], "05"); + UNIT_ASSERT_VALUES_EQUAL(partitions.Partitions[1].Values[2], "02"); + UNIT_ASSERT_VALUES_EQUAL(partitions.Partitions[2].Location, "s3://datalake/data/logs/hive/request_logs/year=2024/month=05/day=03"); + UNIT_ASSERT_VALUES_EQUAL(partitions.Partitions[2].Values.size(), 3); + UNIT_ASSERT_VALUES_EQUAL(partitions.Partitions[2].Values[0], "2024"); + UNIT_ASSERT_VALUES_EQUAL(partitions.Partitions[2].Values[1], "05"); + UNIT_ASSERT_VALUES_EQUAL(partitions.Partitions[2].Values[2], "03"); + UNIT_ASSERT_VALUES_EQUAL(partitions.Partitions[3].Location, "s3://datalake/data/logs/hive/request_logs/year=2024/month=05/day=04"); + UNIT_ASSERT_VALUES_EQUAL(partitions.Partitions[3].Values.size(), 3); + UNIT_ASSERT_VALUES_EQUAL(partitions.Partitions[3].Values[0], "2024"); + UNIT_ASSERT_VALUES_EQUAL(partitions.Partitions[3].Values[1], "05"); + UNIT_ASSERT_VALUES_EQUAL(partitions.Partitions[3].Values[2], "04"); + } + } +} + +} // namespace NKikimr::NExternalSource diff --git a/ydb/core/external_sources/hive_metastore/ut/scripts/bootstrap.bash b/ydb/core/external_sources/hive_metastore/ut/scripts/bootstrap.bash new file mode 100644 index 000000000000..bc53a9816c7e --- /dev/null +++ b/ydb/core/external_sources/hive_metastore/ut/scripts/bootstrap.bash @@ -0,0 +1,10 @@ +set -e + +for i in $(seq 0 80); do + if trino --execute 'select 1' http://trino:8080; then + break + fi + sleep 1 +done + +trino --file /scripts/create_table.sql http://trino:8080 \ No newline at end of file diff --git a/ydb/core/external_sources/hive_metastore/ut/scripts/create_table.sql b/ydb/core/external_sources/hive_metastore/ut/scripts/create_table.sql new file mode 100644 index 000000000000..410bfed85ac8 --- /dev/null +++ b/ydb/core/external_sources/hive_metastore/ut/scripts/create_table.sql @@ -0,0 +1,59 @@ +SHOW CATALOGS; + +CREATE SCHEMA datalake.iceberg; +CREATE SCHEMA hive.hive +WITH (location = 's3://datalake/data/logs/hive/'); + +SHOW tables FROM datalake.iceberg; +SHOW tables FROM hive.hive; + +CREATE TABLE datalake.iceberg.request_logs ( + request_time TIMESTAMP, + Url VARCHAR, + ip VARCHAR, + user_agent VARCHAR, + year VARCHAR, + month VARCHAR, + day VARCHAR +) +WITH ( + format = 'PARQUET', + location = 's3://datalake/data/logs/', + partitioning = ARRAY['year','month', 'day'] +); + +CREATE TABLE hive.hive.request_logs ( + request_time TIMESTAMP, + Url VARCHAR, + ip VARCHAR, + user_agent VARCHAR, + year VARCHAR, + month VARCHAR, + day VARCHAR +) +WITH ( + format = 'PARQUET', + partitioned_by = ARRAY['year','month', 'day'] +); + +INSERT INTO datalake.iceberg.request_logs VALUES (timestamp '2024-05-01 01:00 UTC', 'http://test1', '0.0.0.0', 'YQ', '2024', '05', '01'); +INSERT INTO datalake.iceberg.request_logs VALUES (timestamp '2024-05-01 02:00 UTC', 'http://test2', '0.0.0.0', 'YQ', '2024', '05', '01'); +INSERT INTO datalake.iceberg.request_logs VALUES (timestamp '2024-05-01 03:00 UTC', 'http://test3', '0.0.0.0', 'YQ', '2024', '05', '01'); +INSERT INTO datalake.iceberg.request_logs VALUES (timestamp '2024-05-02 01:00 UTC', 'http://test1', '0.0.0.0', 'YQ', '2024', '05', '02'); +INSERT INTO datalake.iceberg.request_logs VALUES (timestamp '2024-05-03 01:00 UTC', 'http://test2', '0.0.0.0', 'YQ', '2024', '05', '03'); +INSERT INTO datalake.iceberg.request_logs VALUES (timestamp '2024-05-04 01:00 UTC', 'http://test1', '0.0.0.0', 'YQ', '2024', '05', '04'); + +INSERT INTO hive.hive.request_logs VALUES (timestamp '2024-05-01 01:00 UTC', 'http://test1', '0.0.0.0', 'YQ', '2024', '05', '01'); +INSERT INTO hive.hive.request_logs VALUES (timestamp '2024-05-01 02:00 UTC', 'http://test2', '0.0.0.0', 'YQ', '2024', '05', '01'); +INSERT INTO hive.hive.request_logs VALUES (timestamp '2024-05-01 03:00 UTC', 'http://test3', '0.0.0.0', 'YQ', '2024', '05', '01'); +INSERT INTO hive.hive.request_logs VALUES (timestamp '2024-05-02 01:00 UTC', 'http://test1', '0.0.0.0', 'YQ', '2024', '05', '02'); +INSERT INTO hive.hive.request_logs VALUES (timestamp '2024-05-03 01:00 UTC', 'http://test2', '0.0.0.0', 'YQ', '2024', '05', '03'); +INSERT INTO hive.hive.request_logs VALUES (timestamp '2024-05-04 01:00 UTC', 'http://test1', '0.0.0.0', 'YQ', '2024', '05', '04'); + +SELECT * FROM datalake.iceberg.request_logs; +SELECT * FROM hive.hive.request_logs; + +ANALYZE datalake.iceberg.request_logs; +ANALYZE hive.hive.request_logs; + +CREATE SCHEMA datalake.final; \ No newline at end of file diff --git a/ydb/core/external_sources/hive_metastore/ut/trino/catalog/datalake.properties b/ydb/core/external_sources/hive_metastore/ut/trino/catalog/datalake.properties new file mode 100644 index 000000000000..8c517f57ceda --- /dev/null +++ b/ydb/core/external_sources/hive_metastore/ut/trino/catalog/datalake.properties @@ -0,0 +1,9 @@ +connector.name=iceberg +hive.metastore.uri=thrift://hive-metastore:9083 +hive.s3.endpoint=http://minio:9000 +hive.s3.path-style-access=true +hive.s3.aws-access-key=minio +hive.s3.aws-secret-key=minio123 +hive.metastore-cache-ttl=0s +hive.metastore-refresh-interval=5s +hive.metastore-timeout=10s \ No newline at end of file diff --git a/ydb/core/external_sources/hive_metastore/ut/trino/catalog/hive.properties b/ydb/core/external_sources/hive_metastore/ut/trino/catalog/hive.properties new file mode 100644 index 000000000000..6783012a162b --- /dev/null +++ b/ydb/core/external_sources/hive_metastore/ut/trino/catalog/hive.properties @@ -0,0 +1,9 @@ +connector.name=hive +hive.metastore.uri=thrift://hive-metastore:9083 +hive.s3.endpoint=http://minio:9000 +hive.s3.path-style-access=true +hive.s3.aws-access-key=minio +hive.s3.aws-secret-key=minio123 +hive.metastore-cache-ttl=0s +hive.metastore-refresh-interval=5s +hive.metastore-timeout=10s \ No newline at end of file diff --git a/ydb/core/external_sources/hive_metastore/ut/ya.make b/ydb/core/external_sources/hive_metastore/ut/ya.make index 1d6e32c6ce7c..51a650145d32 100644 --- a/ydb/core/external_sources/hive_metastore/ut/ya.make +++ b/ydb/core/external_sources/hive_metastore/ut/ya.make @@ -46,12 +46,20 @@ IF (OPENSOURCE) ENDIF() SRCS( + common.cpp hive_metastore_client_ut.cpp + hive_metastore_fetcher_ut.cpp ) PEERDIR( - library/cpp/testing/unittest library/cpp/testing/common + library/cpp/testing/unittest + ydb/core/testlib + ydb/core/testlib/actors + ydb/core/testlib/basics + ydb/library/yql/minikql/comp_nodes + ydb/library/yql/minikql/comp_nodes/llvm14 + ydb/library/yql/sql/pg_dummy ) DEPENDS( diff --git a/ydb/core/external_sources/hive_metastore/ya.make b/ydb/core/external_sources/hive_metastore/ya.make index 7ab6ca86a649..ed1f56dba75f 100644 --- a/ydb/core/external_sources/hive_metastore/ya.make +++ b/ydb/core/external_sources/hive_metastore/ya.make @@ -1,12 +1,18 @@ LIBRARY() SRCS( + events.cpp hive_metastore_client.cpp + hive_metastore_converters.cpp + hive_metastore_fetcher.cpp ) PEERDIR( library/cpp/threading/future ydb/core/external_sources/hive_metastore/hive_metastore_native + ydb/library/actors/core + ydb/library/yql/providers/generic/connector/api/service/protos + ydb/library/yql/public/issue/protos ) END()