From 903a699a05cedade820b39905169b620aa06a362 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Mon, 6 May 2024 19:34:33 +0300 Subject: [PATCH] provide bg tasks flow into schemeshard --- .../columnshard/bg_tasks/abstract/adapter.h | 23 +-- .../columnshard/bg_tasks/abstract/control.cpp | 28 +++ .../columnshard/bg_tasks/abstract/control.h | 32 +-- .../columnshard/bg_tasks/abstract/session.h | 18 +- .../tx/columnshard/bg_tasks/abstract/task.cpp | 23 +++ .../tx/columnshard/bg_tasks/abstract/task.h | 27 +-- .../tx/columnshard/bg_tasks/manager/actor.h | 9 +- .../columnshard/bg_tasks/manager/manager.cpp | 1 + .../tx/columnshard/bg_tasks/manager/manager.h | 6 +- .../tx/columnshard/bg_tasks/session/session.h | 5 + .../olap/bg_tasks/adapter/adapter.cpp | 65 +++++++ .../olap/bg_tasks/adapter/adapter.h | 14 ++ .../schemeshard/olap/bg_tasks/adapter/ya.make | 11 ++ .../olap/bg_tasks/protos/data.proto | 33 ++++ .../schemeshard/olap/bg_tasks/protos/ya.make | 13 ++ .../olap/bg_tasks/resharding/task.cpp | 5 + .../olap/bg_tasks/resharding/task.h | 182 ++++++++++++++++++ .../olap/bg_tasks/tx_chain/actor.cpp | 38 ++++ .../olap/bg_tasks/tx_chain/actor.h | 51 +++++ .../olap/bg_tasks/tx_chain/common.cpp | 22 +++ .../olap/bg_tasks/tx_chain/common.h | 18 ++ .../olap/bg_tasks/tx_chain/session.cpp | 10 + .../olap/bg_tasks/tx_chain/session.h | 75 ++++++++ .../olap/bg_tasks/tx_chain/status_channel.cpp | 5 + .../olap/bg_tasks/tx_chain/status_channel.h | 50 +++++ .../olap/bg_tasks/tx_chain/task.cpp | 10 + .../schemeshard/olap/bg_tasks/tx_chain/task.h | 37 ++++ .../olap/bg_tasks/tx_chain/ya.make | 16 ++ ydb/core/tx/schemeshard/olap/bg_tasks/ya.make | 9 + ydb/core/tx/schemeshard/olap/ya.make | 1 + ydb/core/tx/schemeshard/schemeshard_impl.cpp | 15 +- ydb/core/tx/schemeshard/schemeshard_impl.h | 2 + ydb/core/tx/schemeshard/schemeshard_schema.h | 23 ++- ydb/core/tx/schemeshard/ya.make | 1 + .../tx/tx_allocator_client/actor_client.cpp | 19 ++ .../tx/tx_allocator_client/actor_client.h | 3 + ydb/services/bg_tasks/abstract/interface.h | 16 +- ydb/services/bg_tasks/abstract/ya.make | 1 + 38 files changed, 825 insertions(+), 92 deletions(-) create mode 100644 ydb/core/tx/schemeshard/olap/bg_tasks/adapter/adapter.cpp create mode 100644 ydb/core/tx/schemeshard/olap/bg_tasks/adapter/adapter.h create mode 100644 ydb/core/tx/schemeshard/olap/bg_tasks/adapter/ya.make create mode 100644 ydb/core/tx/schemeshard/olap/bg_tasks/protos/data.proto create mode 100644 ydb/core/tx/schemeshard/olap/bg_tasks/protos/ya.make create mode 100644 ydb/core/tx/schemeshard/olap/bg_tasks/resharding/task.cpp create mode 100644 ydb/core/tx/schemeshard/olap/bg_tasks/resharding/task.h create mode 100644 ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/actor.cpp create mode 100644 ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/actor.h create mode 100644 ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/common.cpp create mode 100644 ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/common.h create mode 100644 ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/session.cpp create mode 100644 ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/session.h create mode 100644 ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/status_channel.cpp create mode 100644 ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/status_channel.h create mode 100644 ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/task.cpp create mode 100644 ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/task.h create mode 100644 ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/ya.make create mode 100644 ydb/core/tx/schemeshard/olap/bg_tasks/ya.make diff --git a/ydb/core/tx/columnshard/bg_tasks/abstract/adapter.h b/ydb/core/tx/columnshard/bg_tasks/abstract/adapter.h index b768e4bff029..99f7c0d2e39d 100644 --- a/ydb/core/tx/columnshard/bg_tasks/abstract/adapter.h +++ b/ydb/core/tx/columnshard/bg_tasks/abstract/adapter.h @@ -9,20 +9,15 @@ namespace NKikimr::NOlap::NBackground { class ITabletAdapter { private: - YDB_READONLY_DEF(NActors::TActorId, TabletActorId); virtual bool DoLoadSessionsFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, std::deque& records) = 0; - virtual TConclusionStatus DoSaveProgressToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& container) = 0; - virtual TConclusionStatus DoSaveStateToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& container) = 0; - virtual TConclusionStatus DoSaveSessionToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& container) = 0; - virtual TConclusionStatus DoRemoveSessionFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TString& className, const TString& identifier) = 0; + virtual void DoSaveProgressToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& container) = 0; + virtual void DoSaveStateToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& container) = 0; + virtual void DoSaveSessionToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& container) = 0; + virtual void DoRemoveSessionFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TString& className, const TString& identifier) = 0; public: - ITabletAdapter(const NActors::TActorId& actorId) - : TabletActorId(actorId) - { + virtual ~ITabletAdapter() = default; - } - - [[nodiscard]] TConclusionStatus RemoveSessionFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TString& className, const TString& identifier) { + void RemoveSessionFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TString& className, const TString& identifier) { return DoRemoveSessionFromLocalDatabase(txc, className, identifier); } @@ -30,15 +25,15 @@ class ITabletAdapter { return DoLoadSessionsFromLocalDatabase(txc, records); } - [[nodiscard]] TConclusionStatus SaveSessionToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& session) { + void SaveSessionToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& session) { return DoSaveSessionToLocalDatabase(txc, session); } - [[nodiscard]] TConclusionStatus SaveStateToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& session) { + void SaveStateToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& session) { return DoSaveStateToLocalDatabase(txc, session); } - [[nodiscard]] TConclusionStatus SaveProgressToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& session) { + void SaveProgressToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& session) { return DoSaveProgressToLocalDatabase(txc, session); } }; diff --git a/ydb/core/tx/columnshard/bg_tasks/abstract/control.cpp b/ydb/core/tx/columnshard/bg_tasks/abstract/control.cpp index 94181a4b5898..4ac837de8a4b 100644 --- a/ydb/core/tx/columnshard/bg_tasks/abstract/control.cpp +++ b/ydb/core/tx/columnshard/bg_tasks/abstract/control.cpp @@ -1,5 +1,33 @@ #include "control.h" +#include namespace NKikimr::NOlap::NBackground { +NKikimrTxBackgroundProto::TSessionControlContainer TSessionControlContainer::SerializeToProto() const { + NKikimrTxBackgroundProto::TSessionControlContainer result; + result.SetSessionClassName(SessionClassName); + result.SetSessionIdentifier(SessionIdentifier); + result.SetStatusChannelContainer(ChannelContainer.SerializeToString()); + result.SetLogicControlContainer(LogicControlContainer.SerializeToString()); + return result; +} + +NKikimr::TConclusionStatus TSessionControlContainer::DeserializeFromProto(const NKikimrTxBackgroundProto::TSessionControlContainer& proto) { + SessionClassName = proto.GetSessionClassName(); + SessionIdentifier = proto.GetSessionIdentifier(); + if (!SessionClassName) { + return TConclusionStatus::Fail("incorrect session class name for bg_task"); + } + if (!SessionIdentifier) { + return TConclusionStatus::Fail("incorrect session id for bg_task"); + } + if (!ChannelContainer.DeserializeFromString(proto.GetStatusChannelContainer())) { + return TConclusionStatus::Fail("cannot parse channel from proto"); + } + if (!LogicControlContainer.DeserializeFromString(proto.GetLogicControlContainer())) { + return TConclusionStatus::Fail("cannot parse logic from proto"); + } + return TConclusionStatus::Success(); +} + } \ No newline at end of file diff --git a/ydb/core/tx/columnshard/bg_tasks/abstract/control.h b/ydb/core/tx/columnshard/bg_tasks/abstract/control.h index f4158009bca3..43e53729de76 100644 --- a/ydb/core/tx/columnshard/bg_tasks/abstract/control.h +++ b/ydb/core/tx/columnshard/bg_tasks/abstract/control.h @@ -1,10 +1,13 @@ #pragma once #include "session.h" #include "status_channel.h" -#include #include #include +namespace NKikimrTxBackgroundProto { +class TSessionControlContainer; +} + namespace NKikimr::NOlap::NBackground { class ISessionLogicControl { @@ -50,31 +53,8 @@ class TSessionControlContainer { YDB_READONLY_DEF(TStatusChannelContainer, ChannelContainer); YDB_READONLY_DEF(TSessionLogicControlContainer, LogicControlContainer); public: - NKikimrTxBackgroundProto::TSessionControlContainer SerializeToProto() const { - NKikimrTxBackgroundProto::TSessionControlContainer result; - result.SetSessionClassName(SessionClassName); - result.SetSessionIdentifier(SessionIdentifier); - result.SetStatusChannelContainer(ChannelContainer.SerializeToString()); - result.SetLogicControlContainer(LogicControlContainer.SerializeToString()); - return result; - } - TConclusionStatus DeserializeFromProto(const NKikimrTxBackgroundProto::TSessionControlContainer& proto) { - SessionClassName = proto.GetSessionClassName(); - SessionIdentifier = proto.GetSessionIdentifier(); - if (!SessionClassName) { - return TConclusionStatus::Fail("incorrect session class name for bg_task"); - } - if (!SessionIdentifier) { - return TConclusionStatus::Fail("incorrect session id for bg_task"); - } - if (!ChannelContainer.DeserializeFromString(proto.GetStatusChannelContainer())) { - return TConclusionStatus::Fail("cannot parse channel from proto"); - } - if (!LogicControlContainer.DeserializeFromString(proto.GetLogicControlContainer())) { - return TConclusionStatus::Fail("cannot parse logic from proto"); - } - return TConclusionStatus::Success(); - } + NKikimrTxBackgroundProto::TSessionControlContainer SerializeToProto() const; + TConclusionStatus DeserializeFromProto(const NKikimrTxBackgroundProto::TSessionControlContainer& proto); TSessionControlContainer() = default; diff --git a/ydb/core/tx/columnshard/bg_tasks/abstract/session.h b/ydb/core/tx/columnshard/bg_tasks/abstract/session.h index 63402f93e686..55317ff6665d 100644 --- a/ydb/core/tx/columnshard/bg_tasks/abstract/session.h +++ b/ydb/core/tx/columnshard/bg_tasks/abstract/session.h @@ -11,14 +11,24 @@ namespace NKikimr::NOlap::NBackground { +class TSession; +class ITabletAdapter; + class TStartContext { private: + YDB_READONLY_DEF(TTabletId, TabletId); YDB_READONLY_DEF(NActors::TActorId, TabletActorId); YDB_READONLY_DEF(TStatusChannelContainer, Channel); + YDB_READONLY_DEF(std::shared_ptr, SessionSelfPtr); + YDB_READONLY_DEF(std::shared_ptr, Adapter); public: - TStartContext(const NActors::TActorId& tabletActorId, const TStatusChannelContainer channel) - : TabletActorId(tabletActorId) + TStartContext(const TTabletId tabletId, const NActors::TActorId& tabletActorId, const TStatusChannelContainer channel, const std::shared_ptr& sessionSelfPtr, + const std::shared_ptr& adapter) + : TabletId(tabletId) + , TabletActorId(tabletActorId) , Channel(channel) + , SessionSelfPtr(sessionSelfPtr) + , Adapter(adapter) { } @@ -82,7 +92,7 @@ class ISessionLogic { }; template -class TSessionProtoAdapter: public TInterfaceProtoAdapter { +class TSessionProtoAdapter: public NBackgroundTasks::TInterfaceProtoAdapter { protected: using TProtoProgress = TProtoProgressExt; using TProtoState = TProtoStateExt; @@ -101,7 +111,7 @@ class TSessionProtoAdapter: public TInterfaceProtoAdapter namespace NKikimr::NOlap::NBackground { +NKikimrTxBackgroundProto::TTaskContainer TTask::SerializeToProto() const { + NKikimrTxBackgroundProto::TTaskContainer result; + result.SetIdentifier(Identifier); + result.SetStatusChannelContainer(ChannelContainer.SerializeToString()); + result.SetTaskDescriptionContainer(DescriptionContainer.SerializeToString()); + return result; +} + +NKikimr::TConclusionStatus TTask::DeserializeFromProto(const NKikimrTxBackgroundProto::TTaskContainer& proto) { + Identifier = proto.GetIdentifier(); + if (!Identifier) { + return TConclusionStatus::Fail("empty identifier is not correct for bg_task"); + } + if (!ChannelContainer.DeserializeFromString(proto.GetStatusChannelContainer())) { + return TConclusionStatus::Fail("cannot parse status channel from proto"); + } + if (!DescriptionContainer.DeserializeFromString(proto.GetTaskDescriptionContainer())) { + return TConclusionStatus::Fail("cannot parse task description from proto"); + } + return TConclusionStatus::Success(); +} + } \ No newline at end of file diff --git a/ydb/core/tx/columnshard/bg_tasks/abstract/task.h b/ydb/core/tx/columnshard/bg_tasks/abstract/task.h index 99877abe784d..217fbea65f03 100644 --- a/ydb/core/tx/columnshard/bg_tasks/abstract/task.h +++ b/ydb/core/tx/columnshard/bg_tasks/abstract/task.h @@ -1,9 +1,12 @@ #pragma once #include "session.h" -#include #include #include +namespace NKikimrTxBackgroundProto { +class TTaskContainer; +} + namespace NKikimr::NOlap::NBackground { class ITaskDescription { @@ -54,26 +57,8 @@ class TTask { AFL_VERIFY(!!ChannelContainer); AFL_VERIFY(!!DescriptionContainer); } - NKikimrTxBackgroundProto::TTaskContainer SerializeToProto() const { - NKikimrTxBackgroundProto::TTaskContainer result; - result.SetIdentifier(Identifier); - result.SetStatusChannelContainer(ChannelContainer.SerializeToString()); - result.SetTaskDescriptionContainer(DescriptionContainer.SerializeToString()); - return result; - } - TConclusionStatus DeserializeFromProto(const NKikimrTxBackgroundProto::TTaskContainer& proto) { - Identifier = proto.GetIdentifier(); - if (!Identifier) { - return TConclusionStatus::Fail("empty identifier is not correct for bg_task"); - } - if (!ChannelContainer.DeserializeFromString(proto.GetStatusChannelContainer())) { - return TConclusionStatus::Fail("cannot parse status channel from proto"); - } - if (!DescriptionContainer.DeserializeFromString(proto.GetTaskDescriptionContainer())) { - return TConclusionStatus::Fail("cannot parse task description from proto"); - } - return TConclusionStatus::Success(); - } + NKikimrTxBackgroundProto::TTaskContainer SerializeToProto() const; + TConclusionStatus DeserializeFromProto(const NKikimrTxBackgroundProto::TTaskContainer& proto); }; } \ No newline at end of file diff --git a/ydb/core/tx/columnshard/bg_tasks/manager/actor.h b/ydb/core/tx/columnshard/bg_tasks/manager/actor.h index 6b5fbcc9e6f9..e111251c28af 100644 --- a/ydb/core/tx/columnshard/bg_tasks/manager/actor.h +++ b/ydb/core/tx/columnshard/bg_tasks/manager/actor.h @@ -11,17 +11,17 @@ namespace NKikimr::NOlap::NBackground { class TSessionActor: public NActors::TActorBootstrapped { private: - const TTabletId TabletId; - const NActors::TActorId TabletActorId; ui64 TxCounter = 0; std::optional SaveSessionProgressTx; std::optional SaveSessionStateTx; - std::shared_ptr Adapter; virtual void OnTxCompleted(const ui64 txInternalId) = 0; virtual void OnSessionProgressSaved() = 0; virtual void OnSessionStateSaved() = 0; virtual void OnBootstrap(const TActorContext& ctx) = 0; protected: + const TTabletId TabletId; + const NActors::TActorId TabletActorId; + std::shared_ptr Adapter; std::shared_ptr Session; ui64 GetNextTxId() { return ++TxCounter; @@ -35,8 +35,9 @@ class TSessionActor: public NActors::TActorBootstrapped { void SaveSessionState(); public: - TSessionActor(const TTabletId tabletId, const std::shared_ptr& session, const std::shared_ptr& adapter) + TSessionActor(const TTabletId tabletId, const NActors::TActorId tabletActorId, const std::shared_ptr& session, const std::shared_ptr& adapter) : TabletId(tabletId) + , TabletActorId(tabletActorId) , Adapter(adapter) , Session(session) { diff --git a/ydb/core/tx/columnshard/bg_tasks/manager/manager.cpp b/ydb/core/tx/columnshard/bg_tasks/manager/manager.cpp index b9c8cd3ea650..c07390f2e2e1 100644 --- a/ydb/core/tx/columnshard/bg_tasks/manager/manager.cpp +++ b/ydb/core/tx/columnshard/bg_tasks/manager/manager.cpp @@ -3,6 +3,7 @@ #include #include #include +#include namespace NKikimr::NOlap::NBackground { diff --git a/ydb/core/tx/columnshard/bg_tasks/manager/manager.h b/ydb/core/tx/columnshard/bg_tasks/manager/manager.h index fc405f67b799..f6c269407f81 100644 --- a/ydb/core/tx/columnshard/bg_tasks/manager/manager.h +++ b/ydb/core/tx/columnshard/bg_tasks/manager/manager.h @@ -5,9 +5,13 @@ #include #include #include -#include #include +namespace NKikimrTxBackgroundProto { +class TTaskContainer; +class TSessionControlContainer; +} + namespace NKikimr::NOlap::NBackground { class TSessionsManager { diff --git a/ydb/core/tx/columnshard/bg_tasks/session/session.h b/ydb/core/tx/columnshard/bg_tasks/session/session.h index 2f9ed198280a..387e536ea6e8 100644 --- a/ydb/core/tx/columnshard/bg_tasks/session/session.h +++ b/ydb/core/tx/columnshard/bg_tasks/session/session.h @@ -23,6 +23,11 @@ class TSession { return result; } + template + std::shared_ptr GetLogicAsVerifiedPtr() const { + return LogicContainer.GetObjectPtrVerifiedAs(); + } + [[nodiscard]] TConclusionStatus DeserializeFromLocalDatabase(TSessionRecord&& record) { Identifier = record.GetIdentifier(); ChannelContainer.DeserializeFromString(record.GetStatusChannel()); diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/adapter/adapter.cpp b/ydb/core/tx/schemeshard/olap/bg_tasks/adapter/adapter.cpp new file mode 100644 index 000000000000..ffa34560383d --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/adapter/adapter.cpp @@ -0,0 +1,65 @@ +#include "adapter.h" +#include + +namespace NKikimr::NSchemeShard::NBackground { + +bool TAdapter::DoLoadSessionsFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, std::deque& records) { + NIceDb::TNiceDb db(txc.DB); + using BackgroundSessions = NSchemeShard::Schema::BackgroundSessions; + auto rowset = db.Table().Select(); + if (!rowset.IsReady()) { + return false; + } + + std::deque result; + while (!rowset.EndOfSet()) { + NOlap::NBackground::TSessionRecord sRecord; + sRecord.SetClassName(rowset.GetValue()); + sRecord.SetIdentifier(rowset.GetValue()); + sRecord.SetLogicDescription(rowset.GetValue()); + sRecord.SetStatusChannel(rowset.GetValue()); + sRecord.SetProgress(rowset.GetValue()); + sRecord.SetState(rowset.GetValue()); + result.emplace_back(std::move(sRecord)); + if (!rowset.Next()) { + return false; + } + } + std::swap(result, records); + return true; +} + +void TAdapter::DoSaveProgressToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const NKikimr::NOlap::NBackground::TSessionRecord& container) { + NIceDb::TNiceDb db(txc.DB); + using BackgroundSessions = NSchemeShard::Schema::BackgroundSessions; + db.Table().Key(container.GetClassName(), container.GetIdentifier()).Update( + NIceDb::TUpdate(container.GetProgress()) + ); +} + +void TAdapter::DoSaveStateToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const NKikimr::NOlap::NBackground::TSessionRecord& container) { + NIceDb::TNiceDb db(txc.DB); + using BackgroundSessions = NSchemeShard::Schema::BackgroundSessions; + db.Table().Key(container.GetClassName(), container.GetIdentifier()).Update( + NIceDb::TUpdate(container.GetState()) + ); +} + +void TAdapter::DoSaveSessionToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const NKikimr::NOlap::NBackground::TSessionRecord& container) { + NIceDb::TNiceDb db(txc.DB); + using BackgroundSessions = NSchemeShard::Schema::BackgroundSessions; + db.Table().Key(container.GetClassName(), container.GetIdentifier()).Update( + NIceDb::TUpdate(container.GetLogicDescription()), + NIceDb::TUpdate(container.GetStatusChannel()), + NIceDb::TUpdate(container.GetProgress()), + NIceDb::TUpdate(container.GetState()) + ); +} + +void TAdapter::DoRemoveSessionFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TString& className, const TString& identifier) { + NIceDb::TNiceDb db(txc.DB); + using BackgroundSessions = NSchemeShard::Schema::BackgroundSessions; + db.Table().Key(className, identifier).Delete(); +} + +} \ No newline at end of file diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/adapter/adapter.h b/ydb/core/tx/schemeshard/olap/bg_tasks/adapter/adapter.h new file mode 100644 index 000000000000..ec50c4ea1eef --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/adapter/adapter.h @@ -0,0 +1,14 @@ +#pragma once +#include + +namespace NKikimr::NSchemeShard::NBackground { + +class TAdapter: public NKikimr::NOlap::NBackground::ITabletAdapter { +protected: + virtual bool DoLoadSessionsFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, std::deque& records) override; + virtual void DoSaveProgressToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const NKikimr::NOlap::NBackground::TSessionRecord& container) override; + virtual void DoSaveStateToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const NKikimr::NOlap::NBackground::TSessionRecord& container) override; + virtual void DoSaveSessionToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const NKikimr::NOlap::NBackground::TSessionRecord& container) override; + virtual void DoRemoveSessionFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TString& className, const TString& identifier) override;; +}; +} \ No newline at end of file diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/adapter/ya.make b/ydb/core/tx/schemeshard/olap/bg_tasks/adapter/ya.make new file mode 100644 index 000000000000..4b5a0b0f87a0 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/adapter/ya.make @@ -0,0 +1,11 @@ +LIBRARY() + +SRCS( + adapter.cpp +) + +PEERDIR( + ydb/core/tx/columnshard/bg_tasks/abstract +) + +END() diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/protos/data.proto b/ydb/core/tx/schemeshard/olap/bg_tasks/protos/data.proto new file mode 100644 index 000000000000..6b862855b56b --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/protos/data.proto @@ -0,0 +1,33 @@ +package NKikimrSchemeShardTxBackgroundProto; + +import "ydb/core/protos/flat_scheme_op.proto"; + +message TTxModification { + optional NKikimrSchemeOp.TModifyScheme Transaction = 1; +} + +message TTxChainCommonData { + optional string TablePath = 1; + repeated TTxModification Modification = 2; +} + +message TTxChainTask { + optional TTxChainCommonData CommonData = 1; +} + +message TTxChainSessionLogic { + optional TTxChainCommonData CommonData = 1; +} + +message TTxChainSessionProgress { + optional uint32 StepForExecute = 1; +} + +message TTxChainSessionState { +} + +message TStatusChannel { + optional uint64 SSTabletId = 1; + optional string TaskClassName = 2; + optional string TaskIdentifier = 3; +} diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/protos/ya.make b/ydb/core/tx/schemeshard/olap/bg_tasks/protos/ya.make new file mode 100644 index 000000000000..da3a31187631 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/protos/ya.make @@ -0,0 +1,13 @@ +PROTO_LIBRARY() + +SRCS( + data.proto +) + +PEERDIR( + ydb/core/protos +) + +EXCLUDE_TAGS(GO_PROTO) + +END() diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/resharding/task.cpp b/ydb/core/tx/schemeshard/olap/bg_tasks/resharding/task.cpp new file mode 100644 index 000000000000..5d6c6f5798d7 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/resharding/task.cpp @@ -0,0 +1,5 @@ +#include "task.h" + +namespace NKikimr::NOlap::NBackground { + +} \ No newline at end of file diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/resharding/task.h b/ydb/core/tx/schemeshard/olap/bg_tasks/resharding/task.h new file mode 100644 index 000000000000..c72f2a9e5fc1 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/resharding/task.h @@ -0,0 +1,182 @@ +#pragma once +#include "session.h" +#include +#include +#include + +namespace NKikimr::NSchemeShard::NOlap::NBackground { + +class TStatusChannel: public NBackgroundTasks::TInterfaceProtoAdapter { +private: + TString TaskClassName; + TString TaskIdentifier; + ui64 SSTabletId; +public: + TStatusChannel(const TString taskClassName, const TString& taskId, const ui64 ssTabletId) + : TaskClassName(taskClassName) + , TaskIdentifier(taskId) + , SSTabletId(ssTabletId) + { + AFL_VERIFY(!!TaskClassName); + AFL_VERIFY(!!TaskIdentifier); + AFL_VERIFY(SSTabletId); + } + virtual void DoOnFail(const TString& errorMessage) const override { + + } + virtual void DoOnAdded() const override { + + } + virtual void DoOnFinished() const override { + + } + virtual TProtoStorage DoSerializeToProto() const override { + TProtoStorage result; + result.SetTaskClassName(TaskClassName); + result.SetTaskIdentifier(TaskIdentifier); + result.SetSSTabletId(SSTabletId); + return result; + } + virtual TConclusionStatus DoDeserializeFromProto(const TProtoStorage& proto) override { + TaskClassName = proto.GetTaskClassName(); + TaskIdentifier = result.GetTaskIdentifier(); + SSTabletId = result.GetSSTabletId(); + return TConclusionStatus::Success(); + } +}; + +class TTransactionsActor: public NOlap::NBackground::TSessionActor { +private: + std::shared_ptr SessionLogic; +protected: + virtual void OnTxCompleted(const ui64 txInternalId) override { + + } + virtual void OnSessionProgressSaved() override { + + } + virtual void OnSessionStateSaved() override { + if (SessionLogic->GetStepForExecute() < SessionLogic->GetTransactions().size()) { + SendTransactionForExecute(SessionLogic->GetTransactions()[SessionLogic->GetStepForExecute()]); + } + } + virtual void OnBootstrap(const TActorContext& ctx) override { + Become(&TTransactionsActor::StateInProgress); + SessionLogic = Session.GetLogicAsVerifiedPtr(); + AFL_VERIFY(SessionLogic->GetStepForExecute() < SessionLogic->GetTransactions().size()); + SendTransactionForExecute(SessionLogic->GetTransactions()[SessionLogic->GetStepForExecute()]); + } + + void SendTransactionForExecute(const NKikimrSchemeOp::TModifyScheme& modification) const { + auto ev = std::make_unique(txId, SSTabletId); + *ev->Record.AddTransaction() = modification; + NActors::TActivationContext::AsActorContext().Send(MakePipePeNodeCacheID(false), + new TEvPipeCache::TEvForward(ev.release(), SSTabletId, true), IEventHandle::FlagTrackDelivery, SessionLogic->GetStepForExecute()); + } + + void Handle(TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& ev) { + SessionLogic->NextStep(); + SaveSessionProgress(); + } +public: + STATEFN(StateInProgress) { + const NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_BACKGROUND)("SelfId", SelfId())("TabletId", TabletId); + switch (ev->GetTypeRewrite()) { + hFunc(TEvSchemeShard::TEvModifySchemeTransactionResult, Handle); + default: + TBase::StateInProgress(ev); + } + } + +}; + +class TSessionTransactions: public NOlap::NBackground::TSessionProtoAdapter< + NKikimrSchemeShardTxBackgroundProto::TSessionTransactionsLogic, + NKikimrSchemeShardTxBackgroundProto::TSessionTransactionsProgress, + NKikimrSchemeShardTxBackgroundProto::TSessionTransactionsState> { +public: + static TString GetStaticClassName() { + return "SS::BG::TRANSACTIONS"; + } +private: + using TBase = NBackgroundTasks::TInterfaceProtoAdapter; + YDB_READONLY_DEF(TString, TablePath); + YDB_READONLY_DEF(std::vector, Transactions); + YDB_READONLY(ui32, StepForExecute, 0); +protected: + virtual TConclusion> DoCreateActor(const TStartContext& context) const override { + + } + virtual TConclusionStatus DoDeserializeFromProto(const TProtoLogic& proto) override { + TablePath = proto.GetTablePath(); + for (auto&& i : proto.GetModifications()) { + Transactions.emplace_back(i.GetTransaction()); + } + return TConclusionStatus::Success(); + } + virtual TProtoLogic DoSerializeToProto() const override { + TProtoLogic result; + result.SetTablePath(TablePath); + for (auto&& i : Transactions) { + *result.AddModification()->MutableTransaction() = i; + } + return result; + } + virtual TConclusionStatus DoDeserializeProgressFromProto(const TProtoProgress& proto) override { + StepForExecute = proto.GetStepForExecute(); + return TConclusionStatus::Success(); + } + virtual TProtoProgress DoSerializeProgressToProto() const override { + TProtoProgress proto; + proto.SetStepForExecute(StepForExecute); + return proto; + } + virtual TConclusionStatus DoDeserializeStateFromProto(const TProtoState& proto) override { + return TConclusionStatus::Success(); + } + virtual TProtoState DoSerializeStateToProto() const override { + + } +public: + virtual TString GetClassName() const override { + return GetStaticClassName(); + } + virtual bool IsReadyForStart() const override { + return true; + } + virtual bool IsFinished() const override { + AFL_VERIFY(StepForExecute <= Transactions.size()) + return StepForExecute == Transactions.size(); + } + virtual bool IsReadyForRemove() const override { + return true; + } +}; + +class TTaskResharding: public NBackgroundTasks::TInterfaceProtoAdapter { +private: + using TBase = NBackgroundTasks::TInterfaceProtoAdapter; +public: + static TString GetStaticClassName() { + return "SS::OLAP::RESHARDING"; + } +private: + virtual TConclusionStatus DoDeserializeFromProto(const TProtoStorage& proto) override { + TablePath = proto.GetTablePath(); + } + virtual TProtoStorage DoSerializeToProto() const { + TProtoStorage result; + result.SetTablePath(TablePath); + return result; + } + virtual std::shared_ptr DoBuildSession() const override { + return std::make_shared(TablePath); + } + static const inline TFactory::TRegistrator Registrator = TFactory::TRegistrator(GetStaticClassName()); +public: + virtual TString GetClassName() const override { + return GetStaticClassName(); + } +}; + +} \ No newline at end of file diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/actor.cpp b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/actor.cpp new file mode 100644 index 000000000000..4cbf2a7f6ac4 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/actor.cpp @@ -0,0 +1,38 @@ +#include "actor.h" +#include +#include + +namespace NKikimr::NSchemeShard::NOlap::NBackground { + +void TTxChainActor::SendTransactionForExecute(const ui64 txId, const NKikimrSchemeOp::TModifyScheme& modification) const { + auto ev = std::make_unique(txId, (ui64)TabletId); + *ev->Record.AddTransaction() = modification; + NActors::TActivationContext::AsActorContext().Send(MakePipePeNodeCacheID(false), + new TEvPipeCache::TEvForward(ev.release(), (ui64)TabletId, true), IEventHandle::FlagTrackDelivery, SessionLogic->GetStepForExecute()); +} + +void TTxChainActor::OnBootstrap(const TActorContext& ctx) { + Become(&TTxChainActor::StateInProgress); + SessionLogic = Session->GetLogicAsVerifiedPtr(); + AFL_VERIFY(SessionLogic->GetStepForExecute() < SessionLogic->GetTxData().GetTransactions().size()); + TxAllocatorClient = RegisterWithSameMailbox(CreateTxAllocatorClient(&AppDataVerified())); + ctx.Send(TxAllocatorClient, MakeHolder(1)); +} + +void TTxChainActor::Handle(TEvTxAllocatorClient::TEvAllocateResult::TPtr& ev) { + AFL_VERIFY(ev->Get()->TxIds.size() == 1); + SendTransactionForExecute(ev->Get()->TxIds.front(), SessionLogic->GetTxData().GetTransactions()[SessionLogic->GetStepForExecute()]); +} + +void TTxChainActor::Handle(TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& /*ev*/) { + SessionLogic->NextStep(); + SaveSessionProgress(); +} + +void TTxChainActor::OnSessionStateSaved() { + if (SessionLogic->GetStepForExecute() < SessionLogic->GetTxData().GetTransactions().size()) { + NActors::TActivationContext::AsActorContext().Send(TxAllocatorClient, MakeHolder(1)); + } +} + +} \ No newline at end of file diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/actor.h b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/actor.h new file mode 100644 index 000000000000..add6ac97d267 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/actor.h @@ -0,0 +1,51 @@ +#pragma once +#include "session.h" +#include +#include +#include +#include +#include +#include +#include + +namespace NKikimr::NSchemeShard::NOlap::NBackground { + +class TTxChainActor: public NKikimr::NOlap::NBackground::TSessionActor { +private: + using TBase = NKikimr::NOlap::NBackground::TSessionActor; + std::shared_ptr SessionLogic; + NActors::TActorId TxAllocatorClient; +protected: + virtual void OnTxCompleted(const ui64 /*txInternalId*/) override { + + } + virtual void OnSessionProgressSaved() override { + + } + virtual void OnSessionStateSaved() override; + virtual void OnBootstrap(const TActorContext& /*ctx*/) override; + + void SendTransactionForExecute(const ui64 txId, const NKikimrSchemeOp::TModifyScheme& modification) const; + + void Handle(TEvTxAllocatorClient::TEvAllocateResult::TPtr& ev); + void Handle(TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& /*ev*/); +public: + TTxChainActor(const NKikimr::NOlap::TTabletId tabletId, const NActors::TActorId tabletActorId, const std::shared_ptr& session, const std::shared_ptr& adapter) + : TBase(tabletId, tabletActorId, session, adapter) + { + AFL_VERIFY(!!Session); + AFL_VERIFY(!!Adapter); + } + + STATEFN(StateInProgress) { + const NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_BACKGROUND)("SelfId", SelfId())("TabletId", TabletId); + switch (ev->GetTypeRewrite()) { + hFunc(TEvSchemeShard::TEvModifySchemeTransactionResult, Handle); + hFunc(TEvTxAllocatorClient::TEvAllocateResult, Handle); + default: + TBase::StateInProgress(ev); + } + } + +}; +} \ No newline at end of file diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/common.cpp b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/common.cpp new file mode 100644 index 000000000000..b4d7c94df9d6 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/common.cpp @@ -0,0 +1,22 @@ +#include "common.h" + +namespace NKikimr::NSchemeShard::NOlap::NBackground { + +TConclusionStatus TTxChainData::DeserializeFromProto(const TProtoStorage& proto) { + TablePath = proto.GetTablePath(); + for (auto&& i : proto.GetModification()) { + Transactions.emplace_back(i.GetTransaction()); + } + return TConclusionStatus::Success(); +} + +TTxChainData::TProtoStorage TTxChainData::SerializeToProto() const { + TProtoStorage result; + result.SetTablePath(TablePath); + for (auto&& i : Transactions) { + *result.AddModification()->MutableTransaction() = i; + } + return result; +} + +} \ No newline at end of file diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/common.h b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/common.h new file mode 100644 index 000000000000..bce627d7a668 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/common.h @@ -0,0 +1,18 @@ +#pragma once +#include +#include +#include + +namespace NKikimr::NSchemeShard::NOlap::NBackground { + +class TTxChainData { +private: + YDB_READONLY_DEF(TString, TablePath); + YDB_READONLY_DEF(std::vector, Transactions); +public: + using TProtoStorage = NKikimrSchemeShardTxBackgroundProto::TTxChainCommonData; + TConclusionStatus DeserializeFromProto(const TProtoStorage& proto); + TProtoStorage SerializeToProto() const; +}; + +} \ No newline at end of file diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/session.cpp b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/session.cpp new file mode 100644 index 000000000000..e916ae0434f2 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/session.cpp @@ -0,0 +1,10 @@ +#include "session.h" +#include "actor.h" + +namespace NKikimr::NSchemeShard::NOlap::NBackground { + +TConclusion> TTxChainSession::DoCreateActor(const NKikimr::NOlap::NBackground::TStartContext& context) const { + return std::make_unique(context.GetTabletId(), context.GetTabletActorId(), context.GetSessionSelfPtr(), context.GetAdapter()); +} + +} \ No newline at end of file diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/session.h b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/session.h new file mode 100644 index 000000000000..8c3cbf9e7ead --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/session.h @@ -0,0 +1,75 @@ +#pragma once +#include "common.h" +#include +#include + +#include +#include + +namespace NKikimr::NSchemeShard::NOlap::NBackground { + +class TTxChainSession: public NKikimr::NOlap::NBackground::TSessionProtoAdapter< + NKikimrSchemeShardTxBackgroundProto::TTxChainSessionLogic, + NKikimrSchemeShardTxBackgroundProto::TTxChainSessionProgress, + NKikimrSchemeShardTxBackgroundProto::TTxChainSessionState> { +public: + static TString GetStaticClassName() { + return "SS::BG::TX_CHAIN"; + } +private: + using TBase = NKikimr::NOlap::NBackground::TSessionProtoAdapter; + YDB_READONLY_DEF(TTxChainData, TxData); + YDB_READONLY(ui32, StepForExecute, 0); +protected: + virtual TConclusion> DoCreateActor(const NKikimr::NOlap::NBackground::TStartContext& context) const override; + virtual TConclusionStatus DoDeserializeFromProto(const TProtoLogic& proto) override { + return TxData.DeserializeFromProto(proto.GetCommonData()); + } + virtual TProtoLogic DoSerializeToProto() const override { + TProtoLogic result; + *result.MutableCommonData() = TxData.SerializeToProto(); + return result; + } + virtual TConclusionStatus DoDeserializeProgressFromProto(const TProtoProgress& proto) override { + StepForExecute = proto.GetStepForExecute(); + return TConclusionStatus::Success(); + } + virtual TProtoProgress DoSerializeProgressToProto() const override { + TProtoProgress proto; + proto.SetStepForExecute(StepForExecute); + return proto; + } + virtual TConclusionStatus DoDeserializeStateFromProto(const TProtoState& /*proto*/) override { + return TConclusionStatus::Success(); + } + virtual TProtoState DoSerializeStateToProto() const override { + TProtoState result; + return result; + } +public: + TTxChainSession(const TTxChainData& data) + : TxData(data) + { + + } + + void NextStep() { + AFL_VERIFY(++StepForExecute <= GetTxData().GetTransactions().size()); + } + + virtual TString GetClassName() const override { + return GetStaticClassName(); + } + virtual bool IsReadyForStart() const override { + return true; + } + virtual bool IsFinished() const override { + AFL_VERIFY(StepForExecute <= GetTxData().GetTransactions().size()); + return StepForExecute == GetTxData().GetTransactions().size(); + } + virtual bool IsReadyForRemove() const override { + return true; + } +}; + +} \ No newline at end of file diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/status_channel.cpp b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/status_channel.cpp new file mode 100644 index 000000000000..135bac68d2f3 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/status_channel.cpp @@ -0,0 +1,5 @@ +#include "status_channel.h" + +namespace NKikimr::NSchemeShard::NOlap::NBackground { + +} \ No newline at end of file diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/status_channel.h b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/status_channel.h new file mode 100644 index 000000000000..b42cc11a8db7 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/status_channel.h @@ -0,0 +1,50 @@ +#pragma once +#include +#include +#include +#include +#include + +namespace NKikimr::NSchemeShard::NOlap::NBackground { + +class TStatusChannel: public NBackgroundTasks::TInterfaceProtoAdapter { +private: + using TBase = NBackgroundTasks::TInterfaceProtoAdapter; + TString TaskClassName; + TString TaskIdentifier; + ui64 SSTabletId; +public: + TStatusChannel(const TString taskClassName, const TString& taskId, const ui64 ssTabletId) + : TaskClassName(taskClassName) + , TaskIdentifier(taskId) + , SSTabletId(ssTabletId) + { + AFL_VERIFY(!!TaskClassName); + AFL_VERIFY(!!TaskIdentifier); + AFL_VERIFY(SSTabletId); + } + virtual void DoOnFail(const TString& /*errorMessage*/) const override { + + } + virtual void DoOnAdded() const override { + + } + virtual void DoOnFinished() const override { + + } + virtual TProtoStorage DoSerializeToProto() const override { + TProtoStorage result; + result.SetTaskClassName(TaskClassName); + result.SetTaskIdentifier(TaskIdentifier); + result.SetSSTabletId(SSTabletId); + return result; + } + virtual TConclusionStatus DoDeserializeFromProto(const TProtoStorage& proto) override { + TaskClassName = proto.GetTaskClassName(); + TaskIdentifier = proto.GetTaskIdentifier(); + SSTabletId = proto.GetSSTabletId(); + return TConclusionStatus::Success(); + } +}; + +} \ No newline at end of file diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/task.cpp b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/task.cpp new file mode 100644 index 000000000000..945814b1a2eb --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/task.cpp @@ -0,0 +1,10 @@ +#include "task.h" +#include "session.h" + +namespace NKikimr::NSchemeShard::NOlap::NBackground { + +std::shared_ptr TTxChainTask::DoBuildSession() const { + return std::make_shared(TxData); +} + +} \ No newline at end of file diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/task.h b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/task.h new file mode 100644 index 000000000000..e8ba8ead96fa --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/task.h @@ -0,0 +1,37 @@ +#pragma once +#include "common.h" +#include +#include +#include +#include +#include +#include + +namespace NKikimr::NSchemeShard::NOlap::NBackground { + +class TTxChainTask: public NBackgroundTasks::TInterfaceProtoAdapter { +private: + TTxChainData TxData; + using TBase = NBackgroundTasks::TInterfaceProtoAdapter; +public: + static TString GetStaticClassName() { + return "SS::BG::TX_CHAIN"; + } +private: + virtual TConclusionStatus DoDeserializeFromProto(const TProtoStorage& proto) override { + return TxData.DeserializeFromProto(proto.GetCommonData()); + } + virtual TProtoStorage DoSerializeToProto() const override { + TProtoStorage result; + *result.MutableCommonData() = TxData.SerializeToProto(); + return result; + } + virtual std::shared_ptr DoBuildSession() const override; + static const inline TFactory::TRegistrator Registrator = TFactory::TRegistrator(GetStaticClassName()); +public: + virtual TString GetClassName() const override { + return GetStaticClassName(); + } +}; + +} \ No newline at end of file diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/ya.make b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/ya.make new file mode 100644 index 000000000000..edaaef5b8527 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/ya.make @@ -0,0 +1,16 @@ +LIBRARY() + +SRCS( + status_channel.cpp + task.cpp + session.cpp + actor.cpp + common.cpp +) + +PEERDIR( + ydb/core/tx/schemeshard/olap/bg_tasks/protos + ydb/core/tx/columnshard/bg_tasks/abstract +) + +END() diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/ya.make b/ydb/core/tx/schemeshard/olap/bg_tasks/ya.make new file mode 100644 index 000000000000..f83b5e3e843d --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/ya.make @@ -0,0 +1,9 @@ +LIBRARY() + +PEERDIR( + ydb/core/tx/schemeshard/olap/bg_tasks/adapter + ydb/core/tx/schemeshard/olap/bg_tasks/protos + ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain +) + +END() diff --git a/ydb/core/tx/schemeshard/olap/ya.make b/ydb/core/tx/schemeshard/olap/ya.make index fbb487ef0b5a..d41824702a58 100644 --- a/ydb/core/tx/schemeshard/olap/ya.make +++ b/ydb/core/tx/schemeshard/olap/ya.make @@ -2,6 +2,7 @@ LIBRARY() PEERDIR( ydb/core/tx/schemeshard/olap/columns + ydb/core/tx/schemeshard/olap/bg_tasks ydb/core/tx/schemeshard/olap/indexes ydb/core/tx/schemeshard/olap/schema ydb/core/tx/schemeshard/olap/common diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 2c5c92617953..4961c21f98a2 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -1,6 +1,7 @@ #include "schemeshard.h" #include "schemeshard_impl.h" #include "schemeshard_svp_migration.h" +#include "olap/bg_tasks/adapter/adapter.h" #include #include @@ -407,6 +408,7 @@ void TSchemeShard::Clear() { Views.clear(); ColumnTables = { }; + BackgroundSessionsManager = std::make_shared(std::make_shared()); RevertedMigrations.clear(); @@ -4230,6 +4232,7 @@ TSchemeShard::TSchemeShard(const TActorId &tablet, TTabletStorageInfo *info) TabletCounters = TabletCountersPtr.Get(); SelfPinger = new TSelfPinger(SelfTabletId(), TabletCounters); + BackgroundSessionsManager = std::make_shared(std::make_shared()); } const TDomainsInfo::TDomain& TSchemeShard::GetDomainDescription(const TActorContext &ctx) const { @@ -4319,16 +4322,6 @@ void TSchemeShard::OnTabletDead(TEvTablet::TEvTabletDead::TPtr &ev, const TActor Die(ctx); } -static TVector CollectTxAllocators(const TAppData *appData) { - TVector allocators; - if (const auto& domain = appData->DomainsInfo->Domain) { - for (auto tabletId: domain->TxAllocators) { - allocators.push_back(tabletId); - } - } - return allocators; -} - void TSchemeShard::OnActivateExecutor(const TActorContext &ctx) { const TTabletId selfTabletId = SelfTabletId(); @@ -4376,7 +4369,7 @@ void TSchemeShard::OnActivateExecutor(const TActorContext &ctx) { AllowServerlessStorageBilling = appData->FeatureFlags.GetAllowServerlessStorageBillingForSchemeShard(); appData->Icb->RegisterSharedControl(AllowServerlessStorageBilling, "SchemeShard_AllowServerlessStorageBilling"); - TxAllocatorClient = RegisterWithSameMailbox(CreateTxAllocatorClient(CollectTxAllocators(appData))); + TxAllocatorClient = RegisterWithSameMailbox(CreateTxAllocatorClient(appData)); SysPartitionStatsCollector = Register(NSysView::CreatePartitionStatsCollector().Release()); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index c00aef33df61..7fedcfed1a49 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -49,6 +49,7 @@ #include #include #include +#include #include #include #include @@ -249,6 +250,7 @@ class TSchemeShard TTempTablesState TempTablesState; TTablesStorage ColumnTables; + std::shared_ptr BackgroundSessionsManager; // it is only because we need to manage undo of upgrade subdomain, finally remove it THashMap> RevertedMigrations; diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h index 4bf23b3691cb..1a7abb8950ed 100644 --- a/ydb/core/tx/schemeshard/schemeshard_schema.h +++ b/ydb/core/tx/schemeshard/schemeshard_schema.h @@ -1758,15 +1758,27 @@ struct Schema : NIceDb::Schema { >; }; - struct View : Table<108> { - struct PathId : Column<1, NScheme::NTypeIds::Uint64> { using Type = TLocalPathId; }; - struct AlterVersion : Column<2, NScheme::NTypeIds::Uint64> {}; - struct QueryText : Column<3, NScheme::NTypeIds::String> {}; + struct View: Table<108> { + struct PathId: Column<1, NScheme::NTypeIds::Uint64> { using Type = TLocalPathId; }; + struct AlterVersion: Column<2, NScheme::NTypeIds::Uint64> {}; + struct QueryText: Column<3, NScheme::NTypeIds::String> {}; using TKey = TableKey; using TColumns = TableColumns; }; + struct BackgroundSessions: Table<109> { + struct ClassName: Column<1, NScheme::NTypeIds::String> {}; + struct Identifier: Column<2, NScheme::NTypeIds::String> {}; + struct StatusChannel: Column<3, NScheme::NTypeIds::String> {}; + struct LogicDescription: Column<4, NScheme::NTypeIds::String> {}; + struct Progress: Column<5, NScheme::NTypeIds::String> {}; + struct State: Column<6, NScheme::NTypeIds::String> {}; + + using TKey = TableKey; + using TColumns = TableColumns; + }; + using TTables = SchemaTables< Paths, TxInFlight, @@ -1874,7 +1886,8 @@ struct Schema : NIceDb::Schema { ExternalDataSource, PersQueueGroupStats, BuildColumnOperationSettings, - View + View, + BackgroundSessions >; static constexpr ui64 SysParam_NextPathId = 1; diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make index ac8e6d436cb0..310f550bdd79 100644 --- a/ydb/core/tx/schemeshard/ya.make +++ b/ydb/core/tx/schemeshard/ya.make @@ -276,6 +276,7 @@ PEERDIR( ydb/library/yql/minikql ydb/library/yql/providers/common/proto ydb/services/bg_tasks + ydb/core/tx/columnshard/bg_tasks/manager ) YQL_LAST_ABI_VERSION() diff --git a/ydb/core/tx/tx_allocator_client/actor_client.cpp b/ydb/core/tx/tx_allocator_client/actor_client.cpp index 00e47c440ec7..7ee371522588 100644 --- a/ydb/core/tx/tx_allocator_client/actor_client.cpp +++ b/ydb/core/tx/tx_allocator_client/actor_client.cpp @@ -2,6 +2,7 @@ #include "client.h" #include +#include #include #include @@ -125,4 +126,22 @@ IActor* CreateTxAllocatorClient(TVector txAllocators) { return new TTxAllocatorClientActor(std::move(txAllocators)); } +namespace { + +TVector CollectTxAllocators(const TAppData* appData) { + TVector allocators; + if (const auto& domain = appData->DomainsInfo->Domain) { + for (auto tabletId : domain->TxAllocators) { + allocators.push_back(tabletId); + } + } + return allocators; +} + +} + +IActor* CreateTxAllocatorClient(const TAppData* appData) { + return CreateTxAllocatorClient(CollectTxAllocators(appData)); +} + } // NKikimr diff --git a/ydb/core/tx/tx_allocator_client/actor_client.h b/ydb/core/tx/tx_allocator_client/actor_client.h index 3f1f608928ca..5d5d97f33358 100644 --- a/ydb/core/tx/tx_allocator_client/actor_client.h +++ b/ydb/core/tx/tx_allocator_client/actor_client.h @@ -4,7 +4,9 @@ #include + namespace NKikimr { +struct TAppData; struct TEvTxAllocatorClient { enum EEv { @@ -49,5 +51,6 @@ struct TEvTxAllocatorClient { }; // TTxAllocatorClientEvents IActor* CreateTxAllocatorClient(TVector txAllocators); +IActor* CreateTxAllocatorClient(const TAppData* appData); } // NKikimr diff --git a/ydb/services/bg_tasks/abstract/interface.h b/ydb/services/bg_tasks/abstract/interface.h index 4f5ec86063eb..cc7a512ca8b0 100644 --- a/ydb/services/bg_tasks/abstract/interface.h +++ b/ydb/services/bg_tasks/abstract/interface.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include @@ -137,16 +138,12 @@ class TCommonInterfaceContainer { template const T& GetAsSafe() const { - auto result = std::dynamic_pointer_cast(Object); - Y_ABORT_UNLESS(!!result); - return *result; + return *GetObjectPtrVerifiedAs(); } template T& GetAsSafe() { - auto result = std::dynamic_pointer_cast(Object); - Y_ABORT_UNLESS(!!result); - return *result; + return *GetObjectPtrVerifiedAs(); } std::shared_ptr GetObjectPtr() const { @@ -158,6 +155,13 @@ class TCommonInterfaceContainer { return Object; } + template + std::shared_ptr GetObjectPtrVerifiedAs() const { + auto result = std::dynamic_pointer_cast(Object); + Y_ABORT_UNLESS(!!result); + return result; + } + const IInterface& GetObjectVerified() const { AFL_VERIFY(Object); return *Object; diff --git a/ydb/services/bg_tasks/abstract/ya.make b/ydb/services/bg_tasks/abstract/ya.make index d6ea911e3ad0..4e973f9674d9 100644 --- a/ydb/services/bg_tasks/abstract/ya.make +++ b/ydb/services/bg_tasks/abstract/ya.make @@ -14,6 +14,7 @@ PEERDIR( ydb/library/actors/core ydb/public/api/protos ydb/services/bg_tasks/protos + ydb/library/conclusion ydb/core/base )