From 975049ffa0860e6c0bdf72de1c8473e873fe4061 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Mon, 27 May 2024 15:43:13 +0000 Subject: [PATCH 1/2] Refactored TMultiTableCreator --- .../proxy_service/kqp_script_executions.cpp | 190 +++++++----------- ydb/library/table_creator/table_creator.cpp | 53 +++++ ydb/library/table_creator/table_creator.h | 39 ++++ 3 files changed, 165 insertions(+), 117 deletions(-) diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp index ba38ca37d0d6..00cc765ddb03 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp +++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp @@ -79,140 +79,96 @@ class TQueryBase : public NKikimr::TQueryBase { }; -class TScriptExecutionsTablesCreator : public TActorBootstrapped { +class TScriptExecutionsTablesCreator : public NTableCreator::TMultiTableCreator { + using TBase = NTableCreator::TMultiTableCreator; + public: explicit TScriptExecutionsTablesCreator(THolder resultEvent) - : ResultEvent(std::move(resultEvent)) - { - } - - void Registered(NActors::TActorSystem* sys, const NActors::TActorId& owner) override { - NActors::TActorBootstrapped::Registered(sys, owner); - Owner = owner; - } - - void Bootstrap() { - Become(&TScriptExecutionsTablesCreator::StateFunc); - RunCreateScriptExecutions(); - RunCreateScriptExecutionLeases(); - RunCreateScriptResultSets(); - } + : TBase({ + GetScriptExecutionsCreator(), + GetScriptExecutionLeasesCreator(), + GetScriptResultSetsCreator() + }) + , ResultEvent(std::move(resultEvent)) + {} private: - static NKikimrSchemeOp::TColumnDescription Col(const TString& columnName, const char* columnType) { - NKikimrSchemeOp::TColumnDescription desc; - desc.SetName(columnName); - desc.SetType(columnType); - return desc; - } - - static NKikimrSchemeOp::TColumnDescription Col(const TString& columnName, NScheme::TTypeId columnType) { - return Col(columnName, NScheme::TypeName(columnType)); - } - - static NKikimrSchemeOp::TTTLSettings TtlCol(const TString& columnName) { - NKikimrSchemeOp::TTTLSettings settings; - settings.MutableEnabled()->SetExpireAfterSeconds(DEADLINE_OFFSET.Seconds()); - settings.MutableEnabled()->SetColumnName(columnName); - settings.MutableEnabled()->MutableSysSettings()->SetRunInterval(BRO_RUN_INTERVAL.MicroSeconds()); - return settings; - } - - void RunCreateScriptExecutions() { - TablesCreating++; - Register( - CreateTableCreator( - { ".metadata", "script_executions" }, - { - Col("database", NScheme::NTypeIds::Text), - Col("execution_id", NScheme::NTypeIds::Text), - Col("run_script_actor_id", NScheme::NTypeIds::Text), - Col("operation_status", NScheme::NTypeIds::Int32), - Col("execution_status", NScheme::NTypeIds::Int32), - Col("finalization_status", NScheme::NTypeIds::Int32), - Col("execution_mode", NScheme::NTypeIds::Int32), - Col("start_ts", NScheme::NTypeIds::Timestamp), - Col("end_ts", NScheme::NTypeIds::Timestamp), - Col("query_text", NScheme::NTypeIds::Text), - Col("syntax", NScheme::NTypeIds::Int32), - Col("ast", NScheme::NTypeIds::Text), - Col("ast_compressed", NScheme::NTypeIds::String), - Col("ast_compression_method", NScheme::NTypeIds::Text), - Col("issues", NScheme::NTypeIds::JsonDocument), - Col("plan", NScheme::NTypeIds::JsonDocument), - Col("meta", NScheme::NTypeIds::JsonDocument), - Col("parameters", NScheme::NTypeIds::String), // TODO: store aparameters separately to support bigger storage. - Col("result_set_metas", NScheme::NTypeIds::JsonDocument), - Col("stats", NScheme::NTypeIds::JsonDocument), - Col("expire_at", NScheme::NTypeIds::Timestamp), // Will be deleted from database after this deadline. - Col("customer_supplied_id", NScheme::NTypeIds::Text), - Col("user_token", NScheme::NTypeIds::Text), - Col("script_sinks", NScheme::NTypeIds::JsonDocument), - Col("script_secret_names", NScheme::NTypeIds::JsonDocument), - }, - { "database", "execution_id" }, - NKikimrServices::KQP_PROXY, - TtlCol("expire_at") - ) + static IActor* GetScriptExecutionsCreator() { + return CreateTableCreator( + { ".metadata", "script_executions" }, + { + Col("database", NScheme::NTypeIds::Text), + Col("execution_id", NScheme::NTypeIds::Text), + Col("run_script_actor_id", NScheme::NTypeIds::Text), + Col("operation_status", NScheme::NTypeIds::Int32), + Col("execution_status", NScheme::NTypeIds::Int32), + Col("finalization_status", NScheme::NTypeIds::Int32), + Col("execution_mode", NScheme::NTypeIds::Int32), + Col("start_ts", NScheme::NTypeIds::Timestamp), + Col("end_ts", NScheme::NTypeIds::Timestamp), + Col("query_text", NScheme::NTypeIds::Text), + Col("syntax", NScheme::NTypeIds::Int32), + Col("ast", NScheme::NTypeIds::Text), + Col("ast_compressed", NScheme::NTypeIds::String), + Col("ast_compression_method", NScheme::NTypeIds::Text), + Col("issues", NScheme::NTypeIds::JsonDocument), + Col("plan", NScheme::NTypeIds::JsonDocument), + Col("meta", NScheme::NTypeIds::JsonDocument), + Col("parameters", NScheme::NTypeIds::String), // TODO: store aparameters separately to support bigger storage. + Col("result_set_metas", NScheme::NTypeIds::JsonDocument), + Col("stats", NScheme::NTypeIds::JsonDocument), + Col("expire_at", NScheme::NTypeIds::Timestamp), // Will be deleted from database after this deadline. + Col("customer_supplied_id", NScheme::NTypeIds::Text), + Col("user_token", NScheme::NTypeIds::Text), + Col("script_sinks", NScheme::NTypeIds::JsonDocument), + Col("script_secret_names", NScheme::NTypeIds::JsonDocument), + }, + { "database", "execution_id" }, + NKikimrServices::KQP_PROXY, + TtlCol("expire_at", DEADLINE_OFFSET, BRO_RUN_INTERVAL) ); } - void RunCreateScriptExecutionLeases() { - TablesCreating++; - Register( - CreateTableCreator( - { ".metadata", "script_execution_leases" }, - { - Col("database", NScheme::NTypeIds::Text), - Col("execution_id", NScheme::NTypeIds::Text), - Col("lease_deadline", NScheme::NTypeIds::Timestamp), - Col("lease_generation", NScheme::NTypeIds::Int64), - Col("expire_at", NScheme::NTypeIds::Timestamp), // Will be deleted from database after this deadline. - }, - { "database", "execution_id" }, - NKikimrServices::KQP_PROXY, - TtlCol("expire_at") - ) + static IActor* GetScriptExecutionLeasesCreator() { + return CreateTableCreator( + { ".metadata", "script_execution_leases" }, + { + Col("database", NScheme::NTypeIds::Text), + Col("execution_id", NScheme::NTypeIds::Text), + Col("lease_deadline", NScheme::NTypeIds::Timestamp), + Col("lease_generation", NScheme::NTypeIds::Int64), + Col("expire_at", NScheme::NTypeIds::Timestamp), // Will be deleted from database after this deadline. + }, + { "database", "execution_id" }, + NKikimrServices::KQP_PROXY, + TtlCol("expire_at", DEADLINE_OFFSET, BRO_RUN_INTERVAL) ); } - void RunCreateScriptResultSets() { - TablesCreating++; - Register( - CreateTableCreator( - { ".metadata", "result_sets" }, - { - Col("database", NScheme::NTypeIds::Text), - Col("execution_id", NScheme::NTypeIds::Text), - Col("result_set_id", NScheme::NTypeIds::Int32), - Col("row_id", NScheme::NTypeIds::Int64), - Col("expire_at", NScheme::NTypeIds::Timestamp), - Col("result_set", NScheme::NTypeIds::String), - Col("accumulated_size", NScheme::NTypeIds::Int64), - }, - { "database", "execution_id", "result_set_id", "row_id" }, - NKikimrServices::KQP_PROXY, - TtlCol("expire_at") - ) + static IActor* GetScriptResultSetsCreator() { + return CreateTableCreator( + { ".metadata", "result_sets" }, + { + Col("database", NScheme::NTypeIds::Text), + Col("execution_id", NScheme::NTypeIds::Text), + Col("result_set_id", NScheme::NTypeIds::Int32), + Col("row_id", NScheme::NTypeIds::Int64), + Col("expire_at", NScheme::NTypeIds::Timestamp), + Col("result_set", NScheme::NTypeIds::String), + Col("accumulated_size", NScheme::NTypeIds::Int64), + }, + { "database", "execution_id", "result_set_id", "row_id" }, + NKikimrServices::KQP_PROXY, + TtlCol("expire_at", DEADLINE_OFFSET, BRO_RUN_INTERVAL) ); } - void Handle(TEvTableCreator::TEvCreateTableResponse::TPtr&) { - Y_ABORT_UNLESS(TablesCreating > 0); - if (--TablesCreating == 0) { - Send(Owner, std::move(ResultEvent)); - PassAway(); - } + void OnTablesCreated() override { + Send(Owner, std::move(ResultEvent)); } - STRICT_STFUNC(StateFunc, - hFunc(TEvTableCreator::TEvCreateTableResponse, Handle); - ) - private: THolder ResultEvent; - NActors::TActorId Owner; - size_t TablesCreating = 0; }; Ydb::Query::ExecMode GetExecModeFromAction(NKikimrKqp::EQueryAction action) { diff --git a/ydb/library/table_creator/table_creator.cpp b/ydb/library/table_creator/table_creator.cpp index 788b24406d35..9c0014565704 100644 --- a/ydb/library/table_creator/table_creator.cpp +++ b/ydb/library/table_creator/table_creator.cpp @@ -380,6 +380,59 @@ using TTableCreatorRetryPolicy = IRetryPolicy; } // namespace +namespace NTableCreator { + +NKikimrSchemeOp::TColumnDescription TMultiTableCreator::Col(const TString& columnName, const char* columnType) { + NKikimrSchemeOp::TColumnDescription desc; + desc.SetName(columnName); + desc.SetType(columnType); + return desc; +} + +NKikimrSchemeOp::TColumnDescription TMultiTableCreator::Col(const TString& columnName, NScheme::TTypeId columnType) { + return Col(columnName, NScheme::TypeName(columnType)); +} + +NKikimrSchemeOp::TTTLSettings TMultiTableCreator::TtlCol(const TString& columnName, TDuration expireAfter, TDuration runInterval) { + NKikimrSchemeOp::TTTLSettings settings; + settings.MutableEnabled()->SetExpireAfterSeconds(expireAfter.Seconds()); + settings.MutableEnabled()->SetColumnName(columnName); + settings.MutableEnabled()->MutableSysSettings()->SetRunInterval(runInterval.MicroSeconds()); + return settings; +} + +TMultiTableCreator::TMultiTableCreator(std::vector tableCreators) + : TableCreators(tableCreators) +{} + +void TMultiTableCreator::Registered(NActors::TActorSystem* sys, const NActors::TActorId& owner) { + TBase::Registered(sys, owner); + Owner = owner; +} + +void TMultiTableCreator::Bootstrap() { + Become(&TMultiTableCreator::StateFunc); + + TablesCreating = TableCreators.size(); + for (const auto creator : TableCreators) { + Register(creator); + } +} + +void TMultiTableCreator::Handle(TEvTableCreator::TEvCreateTableResponse::TPtr&) { + Y_ABORT_UNLESS(TablesCreating > 0); + if (--TablesCreating == 0) { + OnTablesCreated(); + PassAway(); + } +} + +STRICT_STFUNC(TMultiTableCreator::StateFunc, + hFunc(TEvTableCreator::TEvCreateTableResponse, Handle); +); + +} // namespace NTableCreator + NActors::IActor* CreateTableCreator( TVector pathComponents, TVector columns, diff --git a/ydb/library/table_creator/table_creator.h b/ydb/library/table_creator/table_creator.h index 1a60ef865a35..2a92da654c9a 100644 --- a/ydb/library/table_creator/table_creator.h +++ b/ydb/library/table_creator/table_creator.h @@ -5,6 +5,9 @@ #include #include +#include + +#include namespace NKikimr { @@ -18,6 +21,42 @@ struct TEvTableCreator { }; }; +namespace NTableCreator { + +class TMultiTableCreator : public NActors::TActorBootstrapped { + using TBase = NActors::TActorBootstrapped; + +public: + explicit TMultiTableCreator(std::vector tableCreators); + + void Bootstrap(); + +protected: + virtual void OnTablesCreated() = 0; + + static NKikimrSchemeOp::TColumnDescription Col(const TString& columnName, const char* columnType); + + static NKikimrSchemeOp::TColumnDescription Col(const TString& columnName, NScheme::TTypeId columnType); + + static NKikimrSchemeOp::TTTLSettings TtlCol(const TString& columnName, TDuration expireAfter, TDuration runInterval); + +private: + void Registered(NActors::TActorSystem* sys, const NActors::TActorId& owner) override; + + void Handle(TEvTableCreator::TEvCreateTableResponse::TPtr&); + + STFUNC(StateFunc); + +protected: + NActors::TActorId Owner; + +private: + std::vector TableCreators; + size_t TablesCreating = 0; +}; + +} // namespace NTableCreator + NActors::IActor* CreateTableCreator( TVector pathComponents, TVector columns, From f2e0dad4dfcddc577b3676e2cd7d5697cd11847d Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Mon, 27 May 2024 15:46:18 +0000 Subject: [PATCH 2/2] Added unit test for TMultiTableCreator --- .../table_creator/table_creator_ut.cpp | 101 +++++++++++++----- 1 file changed, 75 insertions(+), 26 deletions(-) diff --git a/ydb/library/table_creator/table_creator_ut.cpp b/ydb/library/table_creator/table_creator_ut.cpp index 40a28c2cb6f4..76d0bf58d62c 100644 --- a/ydb/library/table_creator/table_creator_ut.cpp +++ b/ydb/library/table_creator/table_creator_ut.cpp @@ -8,8 +8,58 @@ namespace NKikimr { +namespace { + +class TTestTablesCreator : public NTableCreator::TMultiTableCreator { + using TBase = NTableCreator::TMultiTableCreator; + +public: + explicit TTestTablesCreator(NThreading::TPromise promise) + : TBase({ + GetFirstCreator(), + GetSecondCreator() + }) + , Promise(promise) + {} + +private: + static IActor* GetFirstCreator() { + return CreateTableCreator( + { "path", "to", "table" }, + { + Col("key", "Uint32"), + Col("value", "String"), + }, + { "key" }, + NKikimrServices::STATISTICS + ); + } + + static IActor* GetSecondCreator() { + return CreateTableCreator( + { "path", "to", "other", "table" }, + { + Col("key", NScheme::NTypeIds::Uint32), + Col("expire_at", NScheme::NTypeIds::Timestamp), + }, + { "key" }, + NKikimrServices::STATISTICS, + TtlCol("expire_at", TDuration::Zero(), TDuration::Minutes(60)) + ); + } + + void OnTablesCreated() override { + Promise.SetValue(); + } + +private: + NThreading::TPromise Promise; +}; + +} // namespace + Y_UNIT_TEST_SUITE(TableCreator) { - Y_UNIT_TEST(CreateTable) { + Y_UNIT_TEST(CreateTables) { TPortManager tp; ui16 mbusPort = tp.GetPort(); ui16 grpcPort = tp.GetPort(); @@ -23,26 +73,11 @@ Y_UNIT_TEST_SUITE(TableCreator) { client.InitRootScheme(); auto runtime = server.GetRuntime(); - TVector pathComponents = {"path", "to", "table"}; - - TVector columns; - NKikimrSchemeOp::TColumnDescription descKey; - descKey.SetName("key"); - descKey.SetType("Uint32"); - columns.push_back(descKey); - NKikimrSchemeOp::TColumnDescription descValue; - descValue.SetName("value"); - descValue.SetType("String"); - columns.push_back(descValue); - - TVector keyColumns = {"key"}; - + auto promise = NThreading::NewPromise(); TActorId edgeActor = server.GetRuntime()->AllocateEdgeActor(0); - runtime->Register(CreateTableCreator( - std::move(pathComponents), std::move(columns), std::move(keyColumns), NKikimrServices::STATISTICS), + runtime->Register(new TTestTablesCreator(promise), 0, 0, TMailboxType::Simple, 0, edgeActor); - - runtime->GrabEdgeEvent(edgeActor); + promise.GetFuture().GetValueSync(); NYdb::TDriverConfig cfg; cfg.SetEndpoint(TStringBuilder() << "localhost:" << grpcPort).SetDatabase(Tests::TestDomainName); @@ -52,13 +87,27 @@ Y_UNIT_TEST_SUITE(TableCreator) { UNIT_ASSERT_C(createSessionResult.IsSuccess(), createSessionResult.GetIssues().ToString()); NYdb::NTable::TSession session(createSessionResult.GetSession()); - TString path = TString("/") + Tests::TestDomainName + "/path/to/table"; - auto result = session.DescribeTable(path).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - const auto& createdColumns = result.GetTableDescription().GetColumns(); - UNIT_ASSERT_C(createdColumns.size() == 2, "expected 2 columns"); - UNIT_ASSERT_C(createdColumns[0].Name == "key", "expected key column"); - UNIT_ASSERT_C(createdColumns[1].Name == "value", "expected value column"); + { // First table + auto path = TStringBuilder() << "/" << Tests::TestDomainName << "/path/to/table"; + auto result = session.DescribeTable(path).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + const auto& createdColumns = result.GetTableDescription().GetColumns(); + UNIT_ASSERT_VALUES_EQUAL_C(createdColumns.size(), 2, "expected 2 columns"); + UNIT_ASSERT_VALUES_EQUAL_C(createdColumns[0].Name, "key", "expected key column"); + UNIT_ASSERT_VALUES_EQUAL_C(createdColumns[1].Name, "value", "expected value column"); + UNIT_ASSERT_VALUES_EQUAL_C(createdColumns[1].Type.ToString(), "String?", "expected type string"); + } + + { // Second table + auto path = TStringBuilder() << "/" << Tests::TestDomainName << "/path/to/other/table"; + auto result = session.DescribeTable(path).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + const auto& createdColumns = result.GetTableDescription().GetColumns(); + UNIT_ASSERT_VALUES_EQUAL_C(createdColumns.size(), 2, "expected 2 columns"); + UNIT_ASSERT_VALUES_EQUAL_C(createdColumns[0].Name, "key", "expected key column"); + UNIT_ASSERT_VALUES_EQUAL_C(createdColumns[1].Name, "expire_at", "expected expire_at column"); + UNIT_ASSERT_VALUES_EQUAL_C(createdColumns[1].Type.ToString(), "Timestamp?", "expected type timestamp"); + } } }