diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h index 5265663f8cbc..7476cca03108 100644 --- a/ydb/core/base/events.h +++ b/ydb/core/base/events.h @@ -177,6 +177,7 @@ struct TKikimrEvents : TEvents { ES_NEBIUS_ACCESS_SERVICE, ES_REPLICATION_SERVICE, ES_BACKUP_SERVICE, + ES_TX_BACKGROUND, }; }; diff --git a/ydb/core/protos/counters_columnshard.proto b/ydb/core/protos/counters_columnshard.proto index b2002ee2cca8..9d2a212bb47a 100644 --- a/ydb/core/protos/counters_columnshard.proto +++ b/ydb/core/protos/counters_columnshard.proto @@ -190,4 +190,8 @@ enum ETxTypes { TXTYPE_DATA_SHARING_APPLY_LINKS_MODIFICATION = 27 [(TxTypeOpts) = {Name: "TxDataSharingApplyLinksModification"}]; TXTYPE_DATA_SHARING_WRITE_SOURCE_CURSOR = 28 [(TxTypeOpts) = {Name: "TxDataSharingWriteSourceCursor"}]; TXTYPE_EXPORT_SAVE_CURSOR = 29 [(TxTypeOpts) = {Name: "TxExportSaveCursor"}]; + TXTYPE_REMOVE_BACKGROUND_SESSION = 30 [(TxTypeOpts) = {Name: "TxRemoveBackgroundSession"}]; + TXTYPE_ADD_BACKGROUND_SESSION = 31 [(TxTypeOpts) = {Name: "TxAddBackgroundSession"}]; + TXTYPE_SAVE_BACKGROUND_SESSION_PROGRESS = 32 [(TxTypeOpts) = {Name: "TxSaveBackgroundSessionProgress"}]; + TXTYPE_SAVE_BACKGROUND_SESSION_STATE = 33 [(TxTypeOpts) = {Name: "TxSaveBackgroundSessionState"}]; } diff --git a/ydb/core/tx/columnshard/bg_tasks/abstract/adapter.cpp b/ydb/core/tx/columnshard/bg_tasks/abstract/adapter.cpp new file mode 100644 index 000000000000..d7419850c8c2 --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/abstract/adapter.cpp @@ -0,0 +1,5 @@ +#include "adapter.h" + +namespace NKikimr::NOlap::NBackground { + +} \ No newline at end of file diff --git a/ydb/core/tx/columnshard/bg_tasks/abstract/adapter.h b/ydb/core/tx/columnshard/bg_tasks/abstract/adapter.h new file mode 100644 index 000000000000..99f7c0d2e39d --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/abstract/adapter.h @@ -0,0 +1,41 @@ +#pragma once +#include "session.h" +#include +#include +#include +#include + +namespace NKikimr::NOlap::NBackground { + +class ITabletAdapter { +private: + virtual bool DoLoadSessionsFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, std::deque& records) = 0; + virtual void DoSaveProgressToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& container) = 0; + virtual void DoSaveStateToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& container) = 0; + virtual void DoSaveSessionToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& container) = 0; + virtual void DoRemoveSessionFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TString& className, const TString& identifier) = 0; +public: + virtual ~ITabletAdapter() = default; + + void RemoveSessionFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TString& className, const TString& identifier) { + return DoRemoveSessionFromLocalDatabase(txc, className, identifier); + } + + [[nodiscard]] bool LoadSessionsFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, std::deque& records) { + return DoLoadSessionsFromLocalDatabase(txc, records); + } + + void SaveSessionToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& session) { + return DoSaveSessionToLocalDatabase(txc, session); + } + + void SaveStateToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& session) { + return DoSaveStateToLocalDatabase(txc, session); + } + + void SaveProgressToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& session) { + return DoSaveProgressToLocalDatabase(txc, session); + } +}; + +} \ No newline at end of file diff --git a/ydb/core/tx/columnshard/bg_tasks/abstract/control.cpp b/ydb/core/tx/columnshard/bg_tasks/abstract/control.cpp new file mode 100644 index 000000000000..4ac837de8a4b --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/abstract/control.cpp @@ -0,0 +1,33 @@ +#include "control.h" +#include + +namespace NKikimr::NOlap::NBackground { + +NKikimrTxBackgroundProto::TSessionControlContainer TSessionControlContainer::SerializeToProto() const { + NKikimrTxBackgroundProto::TSessionControlContainer result; + result.SetSessionClassName(SessionClassName); + result.SetSessionIdentifier(SessionIdentifier); + result.SetStatusChannelContainer(ChannelContainer.SerializeToString()); + result.SetLogicControlContainer(LogicControlContainer.SerializeToString()); + return result; +} + +NKikimr::TConclusionStatus TSessionControlContainer::DeserializeFromProto(const NKikimrTxBackgroundProto::TSessionControlContainer& proto) { + SessionClassName = proto.GetSessionClassName(); + SessionIdentifier = proto.GetSessionIdentifier(); + if (!SessionClassName) { + return TConclusionStatus::Fail("incorrect session class name for bg_task"); + } + if (!SessionIdentifier) { + return TConclusionStatus::Fail("incorrect session id for bg_task"); + } + if (!ChannelContainer.DeserializeFromString(proto.GetStatusChannelContainer())) { + return TConclusionStatus::Fail("cannot parse channel from proto"); + } + if (!LogicControlContainer.DeserializeFromString(proto.GetLogicControlContainer())) { + return TConclusionStatus::Fail("cannot parse logic from proto"); + } + return TConclusionStatus::Success(); +} + +} \ No newline at end of file diff --git a/ydb/core/tx/columnshard/bg_tasks/abstract/control.h b/ydb/core/tx/columnshard/bg_tasks/abstract/control.h new file mode 100644 index 000000000000..43e53729de76 --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/abstract/control.h @@ -0,0 +1,69 @@ +#pragma once +#include "session.h" +#include "status_channel.h" +#include +#include + +namespace NKikimrTxBackgroundProto { +class TSessionControlContainer; +} + +namespace NKikimr::NOlap::NBackground { + +class ISessionLogicControl { +private: + YDB_READONLY_DEF(TString, Identifier); + virtual TConclusionStatus DoApply(const std::shared_ptr& session) const = 0; + virtual TConclusionStatus DoDeserializeFromString(const TString& data) = 0; + virtual TString DoSerializeToString() const = 0; +public: + using TFactory = NObjectFactory::TObjectFactory; + + virtual ~ISessionLogicControl() = default; + + TConclusionStatus DeserializeFromString(const TString& data) { + return DoDeserializeFromString(data); + } + + TString SerializeToString() const { + return DoSerializeToString(); + } + + TConclusionStatus Apply(const std::shared_ptr& session) const { + session->CheckStatusCorrect(); + auto result = DoApply(session); + session->CheckStatusCorrect(); + return result; + } + + virtual TString GetClassName() const = 0; +}; + +class TSessionLogicControlContainer: public NBackgroundTasks::TInterfaceStringContainer { +private: + using TBase = NBackgroundTasks::TInterfaceStringContainer; +public: + using TBase::TBase; +}; + +class TSessionControlContainer { +private: + YDB_READONLY_DEF(TString, SessionClassName); + YDB_READONLY_DEF(TString, SessionIdentifier); + YDB_READONLY_DEF(TStatusChannelContainer, ChannelContainer); + YDB_READONLY_DEF(TSessionLogicControlContainer, LogicControlContainer); +public: + NKikimrTxBackgroundProto::TSessionControlContainer SerializeToProto() const; + TConclusionStatus DeserializeFromProto(const NKikimrTxBackgroundProto::TSessionControlContainer& proto); + + TSessionControlContainer() = default; + + TSessionControlContainer(const TStatusChannelContainer& channel, const TSessionLogicControlContainer& logic) + : ChannelContainer(channel) + , LogicControlContainer(logic) { + AFL_VERIFY(!!ChannelContainer); + AFL_VERIFY(!!LogicControlContainer); + } +}; + +} \ No newline at end of file diff --git a/ydb/core/tx/columnshard/bg_tasks/abstract/session.cpp b/ydb/core/tx/columnshard/bg_tasks/abstract/session.cpp new file mode 100644 index 000000000000..cdc3393c5854 --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/abstract/session.cpp @@ -0,0 +1,5 @@ +#include "session.h" + +namespace NKikimr::NOlap::NBackground { + +} \ No newline at end of file diff --git a/ydb/core/tx/columnshard/bg_tasks/abstract/session.h b/ydb/core/tx/columnshard/bg_tasks/abstract/session.h new file mode 100644 index 000000000000..55317ff6665d --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/abstract/session.h @@ -0,0 +1,148 @@ +#pragma once +#include "status_channel.h" + +#include +#include + +#include +#include +#include +#include + +namespace NKikimr::NOlap::NBackground { + +class TSession; +class ITabletAdapter; + +class TStartContext { +private: + YDB_READONLY_DEF(TTabletId, TabletId); + YDB_READONLY_DEF(NActors::TActorId, TabletActorId); + YDB_READONLY_DEF(TStatusChannelContainer, Channel); + YDB_READONLY_DEF(std::shared_ptr, SessionSelfPtr); + YDB_READONLY_DEF(std::shared_ptr, Adapter); +public: + TStartContext(const TTabletId tabletId, const NActors::TActorId& tabletActorId, const TStatusChannelContainer channel, const std::shared_ptr& sessionSelfPtr, + const std::shared_ptr& adapter) + : TabletId(tabletId) + , TabletActorId(tabletActorId) + , Channel(channel) + , SessionSelfPtr(sessionSelfPtr) + , Adapter(adapter) + { + + } +}; + +class ISessionLogic { +private: + mutable bool ActorConstructed = false; + virtual TConclusionStatus DoDeserializeProgressFromString(const TString& data) = 0; + virtual TConclusionStatus DoDeserializeStateFromString(const TString& data) = 0; + virtual TConclusionStatus DoDeserializeFromString(const TString& data) = 0; + virtual TString DoSerializeProgressToString() const = 0; + virtual TString DoSerializeStateToString() const = 0; + virtual TString DoSerializeToString() const = 0; + virtual TConclusion> DoCreateActor(const TStartContext& context) const = 0; +public: + using TFactory = NObjectFactory::TObjectFactory; + + virtual ~ISessionLogic() = default; + + virtual TString GetClassName() const = 0; + + void CheckStatusCorrect() const { + } + + TConclusionStatus DeserializeProgressFromString(const TString& data) { + return DoDeserializeProgressFromString(data); + } + TString SerializeProgressToString() const { + CheckStatusCorrect(); + return DoSerializeProgressToString(); + } + TConclusionStatus DeserializeFromString(const TString& data) { + return DoDeserializeFromString(data); + } + TString SerializeToString() const { + CheckStatusCorrect(); + return DoSerializeToString(); + } + TConclusionStatus DeserializeStateFromString(const TString& data) { + return DoDeserializeStateFromString(data); + } + TString SerializeStateToString() const { + CheckStatusCorrect(); + return DoSerializeStateToString(); + } + + std::unique_ptr CreateActor(const TStartContext& context) const { + AFL_VERIFY(IsReadyForStart()); + AFL_VERIFY(!IsFinished()); + AFL_VERIFY(!ActorConstructed); + ActorConstructed = true; + std::unique_ptr result = DoCreateActor(context).DetachResult(); + AFL_VERIFY(!!result); + return result; + } + + virtual bool IsReadyForStart() const = 0; + virtual bool IsFinished() const = 0; + virtual bool IsReadyForRemove() const = 0; +}; + +template +class TSessionProtoAdapter: public NBackgroundTasks::TInterfaceProtoAdapter { +protected: + using TProtoProgress = TProtoProgressExt; + using TProtoState = TProtoStateExt; + using TProtoLogic = TProtoLogicExt; +private: + virtual TConclusionStatus DoDeserializeProgressFromProto(const TProtoProgress& proto) = 0; + virtual TProtoProgress DoSerializeProgressToProto() const = 0; + virtual TConclusionStatus DoDeserializeStateFromProto(const TProtoState& proto) = 0; + virtual TProtoState DoSerializeStateToProto() const = 0; +protected: + virtual TConclusionStatus DoDeserializeProgressFromString(const TString& data) override final { + TProtoProgress proto; + if (!proto.ParseFromArray(data.data(), data.size())) { + return TConclusionStatus::Fail("cannot parse proto string as " + TypeName()); + } + return DoDeserializeProgressFromProto(proto); + } + virtual TString DoSerializeProgressToString() const override final { + TProtoProgress proto = DoSerializeProgressToProto(); + return proto.SerializeAsString(); + } + virtual TConclusionStatus DoDeserializeStateFromString(const TString& data) override final { + TProtoState proto; + if (!proto.ParseFromArray(data.data(), data.size())) { + return TConclusionStatus::Fail("cannot parse proto string as " + TypeName()); + } + return DoDeserializeStateFromProto(proto); + } + virtual TString DoSerializeStateToString() const override final { + TProtoState proto = DoSerializeStateToProto(); + return proto.SerializeAsString(); + } +}; + +class TSessionLogicContainer: public NBackgroundTasks::TInterfaceStringContainer { +private: + using TBase = NBackgroundTasks::TInterfaceStringContainer; +public: + using TBase::TBase; +}; + +class TSessionRecord { +private: + YDB_ACCESSOR_DEF(TString, Identifier); + YDB_ACCESSOR_DEF(TString, ClassName); + YDB_ACCESSOR_DEF(TString, LogicDescription); + YDB_ACCESSOR_DEF(TString, StatusChannel); + YDB_ACCESSOR_DEF(TString, Progress); + YDB_ACCESSOR_DEF(TString, State); +public: +}; + +} \ No newline at end of file diff --git a/ydb/core/tx/columnshard/bg_tasks/abstract/status_channel.cpp b/ydb/core/tx/columnshard/bg_tasks/abstract/status_channel.cpp new file mode 100644 index 000000000000..3e259053d4ac --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/abstract/status_channel.cpp @@ -0,0 +1,5 @@ +#include "status_channel.h" + +namespace NKikimr::NOlap::NBackground { + +} \ No newline at end of file diff --git a/ydb/core/tx/columnshard/bg_tasks/abstract/status_channel.h b/ydb/core/tx/columnshard/bg_tasks/abstract/status_channel.h new file mode 100644 index 000000000000..2ab8c1f4180b --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/abstract/status_channel.h @@ -0,0 +1,88 @@ +#pragma once +#include + +#include +#include +#include +#include + +namespace NKikimr::NOlap::NBackground { + +class IStatusChannel { +private: + virtual void DoOnFail(const TString& errorMessage) const = 0; + virtual void DoOnAdded() const = 0; + virtual void DoOnFinished() const = 0; + virtual TString DoSerializeToString() const = 0; + virtual TConclusionStatus DoDeserializeFromString(const TString& data) = 0; +public: + virtual ~IStatusChannel() = default; + + using TFactory = NObjectFactory::TObjectFactory; + + virtual TString GetClassName() const = 0; + + TString SerializeToString() const { + return DoSerializeToString(); + } + TConclusionStatus DeserializeFromString(const TString& data) { + return DoDeserializeFromString(data); + } + + void OnFail(const TString& errorMessage) const { + AFL_ERROR(NKikimrServices::TX_BACKGROUND)("problem", "fail_on_background_task")("reason", errorMessage); + DoOnFail(errorMessage); + } + void OnAdded() const { + AFL_INFO(NKikimrServices::TX_BACKGROUND)("info", "background task added"); + DoOnAdded(); + } + void OnFinished() const { + AFL_INFO(NKikimrServices::TX_BACKGROUND)("info", "background task finished"); + DoOnFinished(); + } +}; + +class TFakeStatusChannel: public IStatusChannel { +public: + static TString GetClassNameStatic() { + return "FAKE"; + } +private: + static const inline TFactory::TRegistrator Registrator = TFactory::TRegistrator(GetClassNameStatic()); + virtual void DoOnFail(const TString& /*errorMessage*/) const override { + + } + virtual void DoOnAdded() const override { + + } + virtual void DoOnFinished() const override { + + } + virtual TString DoSerializeToString() const override { + return ""; + } + virtual TConclusionStatus DoDeserializeFromString(const TString& /*data*/) override { + return TConclusionStatus::Success(); + } +public: + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } +}; + +class TStatusChannelContainer: public NBackgroundTasks::TInterfaceStringContainer { +private: + using TBase = NBackgroundTasks::TInterfaceStringContainer; +public: + using TBase::TBase; + bool DeserializeFromString(const TString& data) { + if (!TBase::DeserializeFromString(data)) { + Initialize(TFakeStatusChannel::GetClassNameStatic()); + return false; + } + return true; + } +}; + +} \ No newline at end of file diff --git a/ydb/core/tx/columnshard/bg_tasks/abstract/task.cpp b/ydb/core/tx/columnshard/bg_tasks/abstract/task.cpp new file mode 100644 index 000000000000..0fde956575f7 --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/abstract/task.cpp @@ -0,0 +1,28 @@ +#include "task.h" +#include + +namespace NKikimr::NOlap::NBackground { + +NKikimrTxBackgroundProto::TTaskContainer TTask::SerializeToProto() const { + NKikimrTxBackgroundProto::TTaskContainer result; + result.SetIdentifier(Identifier); + result.SetStatusChannelContainer(ChannelContainer.SerializeToString()); + result.SetTaskDescriptionContainer(DescriptionContainer.SerializeToString()); + return result; +} + +NKikimr::TConclusionStatus TTask::DeserializeFromProto(const NKikimrTxBackgroundProto::TTaskContainer& proto) { + Identifier = proto.GetIdentifier(); + if (!Identifier) { + return TConclusionStatus::Fail("empty identifier is not correct for bg_task"); + } + if (!ChannelContainer.DeserializeFromString(proto.GetStatusChannelContainer())) { + return TConclusionStatus::Fail("cannot parse status channel from proto"); + } + if (!DescriptionContainer.DeserializeFromString(proto.GetTaskDescriptionContainer())) { + return TConclusionStatus::Fail("cannot parse task description from proto"); + } + return TConclusionStatus::Success(); +} + +} \ No newline at end of file diff --git a/ydb/core/tx/columnshard/bg_tasks/abstract/task.h b/ydb/core/tx/columnshard/bg_tasks/abstract/task.h new file mode 100644 index 000000000000..217fbea65f03 --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/abstract/task.h @@ -0,0 +1,64 @@ +#pragma once +#include "session.h" +#include +#include + +namespace NKikimrTxBackgroundProto { +class TTaskContainer; +} + +namespace NKikimr::NOlap::NBackground { + +class ITaskDescription { +private: + virtual TConclusionStatus DoDeserializeFromString(const TString& data) = 0; + virtual TString DoSerializeToString() const = 0; + virtual std::shared_ptr DoBuildSession() const = 0; +public: + using TFactory = NObjectFactory::TObjectFactory; + + virtual ~ITaskDescription() = default; + + virtual TString GetClassName() const = 0; + + TConclusionStatus DeserializeFromString(const TString& data) { + return DoDeserializeFromString(data); + } + + TString SerializeToString() const { + return DoSerializeToString(); + } + + TConclusion> BuildSessionLogic() const { + return DoBuildSession(); + } +}; + +class TTaskDescriptionContainer: public NBackgroundTasks::TInterfaceStringContainer { +private: + using TBase = NBackgroundTasks::TInterfaceStringContainer; +public: + using TBase::TBase; +}; + +class TTask { +private: + YDB_READONLY_DEF(TString, Identifier); + YDB_READONLY_DEF(TStatusChannelContainer, ChannelContainer); + YDB_READONLY_DEF(TTaskDescriptionContainer, DescriptionContainer); +public: + TTask() = default; + TTask(const TString& identifier, const TStatusChannelContainer& channelContainer, const TTaskDescriptionContainer& descriptionContainer) + : Identifier(identifier) + , ChannelContainer(channelContainer) + , DescriptionContainer(descriptionContainer) + { + AFL_VERIFY(!!Identifier); + AFL_VERIFY(!!ChannelContainer); + AFL_VERIFY(!!DescriptionContainer); + } + NKikimrTxBackgroundProto::TTaskContainer SerializeToProto() const; + TConclusionStatus DeserializeFromProto(const NKikimrTxBackgroundProto::TTaskContainer& proto); +}; + +} \ No newline at end of file diff --git a/ydb/core/tx/columnshard/bg_tasks/abstract/ya.make b/ydb/core/tx/columnshard/bg_tasks/abstract/ya.make new file mode 100644 index 000000000000..23d533c4af5e --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/abstract/ya.make @@ -0,0 +1,18 @@ +LIBRARY() + +SRCS( + task.cpp + status_channel.cpp + session.cpp + control.cpp + adapter.cpp +) + +PEERDIR( + ydb/core/tablet_flat + ydb/library/accessor + ydb/library/services + ydb/core/tx/columnshard/bg_tasks/protos +) + +END() diff --git a/ydb/core/tx/columnshard/bg_tasks/events/events.cpp b/ydb/core/tx/columnshard/bg_tasks/events/events.cpp new file mode 100644 index 000000000000..1227faa871bb --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/events/events.cpp @@ -0,0 +1,5 @@ +#include "events.h" + +namespace NKikimr::NOlap::NExport::NEvents { + +} \ No newline at end of file diff --git a/ydb/core/tx/columnshard/bg_tasks/events/events.h b/ydb/core/tx/columnshard/bg_tasks/events/events.h new file mode 100644 index 000000000000..23e7ca49947d --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/events/events.h @@ -0,0 +1,73 @@ +#pragma once +#include +#include +#include +#include + +namespace NKikimr::NOlap::NBackground { + +class TEvents { +public: + enum EEv { + EvExecuteGeneralTransaction = EventSpaceBegin(TKikimrEvents::ES_TX_BACKGROUND), + EvTransactionComplete, + EvSessionControl, + EvRemoveSession, + + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_TX_BACKGROUND), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_TX_BACKGROUND)"); +}; + +class TEvExecuteGeneralTransaction: public TEventLocal { +private: + std::unique_ptr Transaction; +public: + TEvExecuteGeneralTransaction(std::unique_ptr&& transaction) + : Transaction(std::move(transaction)) + { + AFL_VERIFY(!!Transaction); + } + + std::unique_ptr ExtractTransaction() { + AFL_VERIFY(!!Transaction); + return std::move(Transaction); + } +}; + +class TEvTransactionCompleted: public TEventLocal { +private: + const ui64 InternalTxId; +public: + TEvTransactionCompleted(const ui64 internalTxId) + : InternalTxId(internalTxId) { + } + + ui64 GetInternalTxId() const { + return InternalTxId; + } +}; + +class TEvRemoveSession: public TEventLocal { +private: + YDB_READONLY_DEF(TString, ClassName); + YDB_READONLY_DEF(TString, Identifier); +public: + TEvRemoveSession(const TString& className, const TString& identifier) + : ClassName(className) + , Identifier(identifier) + { + } + +}; + +struct TEvSessionControl: public TEventPB { + TEvSessionControl() = default; + + TEvSessionControl(const TSessionControlContainer& container) { + Record = container.SerializeToProto(); + } +}; + +} \ No newline at end of file diff --git a/ydb/core/tx/columnshard/bg_tasks/events/ya.make b/ydb/core/tx/columnshard/bg_tasks/events/ya.make new file mode 100644 index 000000000000..07e0da0f237b --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/events/ya.make @@ -0,0 +1,12 @@ +LIBRARY() + +SRCS( + events.cpp +) + +PEERDIR( + ydb/core/base + ydb/core/tx/columnshard/bg_tasks/protos +) + +END() diff --git a/ydb/core/tx/columnshard/bg_tasks/manager/actor.cpp b/ydb/core/tx/columnshard/bg_tasks/manager/actor.cpp new file mode 100644 index 000000000000..8eeffa84b000 --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/manager/actor.cpp @@ -0,0 +1,54 @@ +#include "actor.h" +#include +#include + +namespace NKikimr::NOlap::NBackground { + +void TSessionActor::SaveSessionProgress() { + AFL_VERIFY(!SaveSessionProgressTx); + const ui64 txId = GetNextTxId(); + SaveSessionProgressTx.emplace(txId); + auto tx = std::make_unique(Session, SelfId(), Adapter, txId); + AFL_VERIFY(Send(TabletActorId, std::move(tx))); +} + +void TSessionActor::SaveSessionState() { + AFL_VERIFY(!SaveSessionStateTx); + const ui64 txId = GetNextTxId(); + SaveSessionStateTx.emplace(txId); + auto tx = std::make_unique(Session, SelfId(), Adapter, txId); + AFL_VERIFY(Send(TabletActorId, std::move(tx))); +} + +void TSessionActor::Handle(TEvTransactionCompleted::TPtr& ev) { + if (SaveSessionProgressTx && *SaveSessionProgressTx == ev->Get()->GetInternalTxId()) { + SaveSessionProgressTx.reset(); + OnSessionProgressSaved(); + } else if (SaveSessionStateTx && *SaveSessionStateTx == ev->Get()->GetInternalTxId()) { + SaveSessionStateTx.reset(); + OnSessionStateSaved(); + } else { + OnTxCompleted(ev->Get()->GetInternalTxId()); + } +} + +void TSessionActor::Handle(TEvSessionControl::TPtr& ev) { + TSessionControlContainer control; + { + auto conclusion = control.DeserializeFromProto(ev->Get()->Record); + if (conclusion.IsFail()) { + control.GetChannelContainer()->OnFail(conclusion.GetErrorMessage()); + return; + } + } + { + auto conclusion = control.GetLogicControlContainer()->Apply(Session->GetLogicContainer().GetObjectPtrVerified()); + if (conclusion.IsFail()) { + control.GetChannelContainer()->OnFail(conclusion.GetErrorMessage()); + return; + } + } + SaveSessionState(); +} + +} \ No newline at end of file diff --git a/ydb/core/tx/columnshard/bg_tasks/manager/actor.h b/ydb/core/tx/columnshard/bg_tasks/manager/actor.h new file mode 100644 index 000000000000..e111251c28af --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/manager/actor.h @@ -0,0 +1,69 @@ +#pragma once +#include +#include +#include +#include + +#include +#include + +namespace NKikimr::NOlap::NBackground { + +class TSessionActor: public NActors::TActorBootstrapped { +private: + ui64 TxCounter = 0; + std::optional SaveSessionProgressTx; + std::optional SaveSessionStateTx; + virtual void OnTxCompleted(const ui64 txInternalId) = 0; + virtual void OnSessionProgressSaved() = 0; + virtual void OnSessionStateSaved() = 0; + virtual void OnBootstrap(const TActorContext& ctx) = 0; +protected: + const TTabletId TabletId; + const NActors::TActorId TabletActorId; + std::shared_ptr Adapter; + std::shared_ptr Session; + ui64 GetNextTxId() { + return ++TxCounter; + } +protected: + void ExecuteTransaction(std::unique_ptr&& tx) { + AFL_VERIFY(Send(TabletActorId, std::move(tx))); + } + + void SaveSessionProgress(); + + void SaveSessionState(); +public: + TSessionActor(const TTabletId tabletId, const NActors::TActorId tabletActorId, const std::shared_ptr& session, const std::shared_ptr& adapter) + : TabletId(tabletId) + , TabletActorId(tabletActorId) + , Adapter(adapter) + , Session(session) + { + AFL_VERIFY(!!Session); + AFL_VERIFY(!!Adapter); + } + + void Handle(TEvTransactionCompleted::TPtr& ev); + + void Handle(TEvSessionControl::TPtr& ev); + + STATEFN(StateInProgress) { + const NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_BACKGROUND)("SelfId", SelfId())("TabletId", TabletId); + switch (ev->GetTypeRewrite()) { + hFunc(TEvTransactionCompleted, Handle); + hFunc(TEvSessionControl, Handle); + cFunc(NActors::TEvents::TEvPoisonPill::EventType, PassAway); + default: + AFL_VERIFY(false)("unexpected_event", ev->GetTypeName()); + } + } + + void Bootstrap(const TActorContext& ctx) { + OnBootstrap(ctx); + } + +}; + +} \ No newline at end of file diff --git a/ydb/core/tx/columnshard/bg_tasks/manager/manager.cpp b/ydb/core/tx/columnshard/bg_tasks/manager/manager.cpp new file mode 100644 index 000000000000..c07390f2e2e1 --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/manager/manager.cpp @@ -0,0 +1,81 @@ +#include "manager.h" +#include +#include +#include +#include +#include + +namespace NKikimr::NOlap::NBackground { + +std::unique_ptr TSessionsManager::ApplyControlFromProto(const NKikimrTxBackgroundProto::TSessionControlContainer& controlProto) { + TSessionControlContainer control; + auto conclusion = control.DeserializeFromProto(controlProto); + if (conclusion.IsFail()) { + control.GetChannelContainer()->OnFail(conclusion.GetErrorMessage()); + return nullptr; + } + auto session = Storage->GetSession(control.GetSessionClassName(), control.GetSessionIdentifier()); + if (!session) { + control.GetChannelContainer()->OnFail("session not exists"); + return nullptr; + } + if (!session->IsRunning()) { + auto conclusion = control.GetLogicControlContainer()->Apply(session->GetLogicContainer().GetObjectPtr()); + if (conclusion.IsFail()) { + control.GetChannelContainer()->OnFail(conclusion.GetErrorMessage()); + return nullptr; + } else { + return std::make_unique(session, std::nullopt, Adapter, 0); + } + } else { + NActors::TActivationContext::AsActorContext().Send(session->GetActorIdVerified(), new TEvSessionControl(control)); + return nullptr; + } +} + +std::unique_ptr TSessionsManager::AddTaskFromProto(const NKikimrTxBackgroundProto::TTaskContainer& taskProto) { + TTask task; + auto conclusion = task.DeserializeFromProto(taskProto); + if (conclusion.IsFail()) { + task.GetChannelContainer()->OnFail(conclusion.GetErrorMessage()); + return nullptr; + } + auto session = Storage->GetSession(task.GetDescriptionContainer().GetClassName(), task.GetIdentifier()); + if (!!session) { + task.GetChannelContainer()->OnFail("session exists already"); + return nullptr; + } + + if (!task.GetDescriptionContainer()) { + task.GetChannelContainer()->OnFail("task description is empty"); + return nullptr; + } + TConclusion> sessionLogic = task.GetDescriptionContainer()->BuildSessionLogic(); + if (sessionLogic.IsFail()) { + task.GetChannelContainer()->OnFail(sessionLogic.GetErrorMessage()); + return nullptr; + } + std::shared_ptr newSession(new TSession(task.GetIdentifier(), task.GetChannelContainer(), sessionLogic.DetachResult())); + return std::make_unique(Adapter, Storage, std::move(newSession)); +} + +bool TSessionsManager::LoadIdempotency(NTabletFlatExecutor::TTransactionContext& txc) { + std::deque records; + if (!Adapter->LoadSessionsFromLocalDatabase(txc, records)) { + return false; + } + auto storage = std::make_shared(); + while (records.size()) { + std::shared_ptr session = std::make_shared(); + session->DeserializeFromLocalDatabase(std::move(records.front())).Validate("on load from local database"); + storage->AddSession(session); + records.pop_front(); + } + return true; +} + +std::unique_ptr TSessionsManager::Remove(const TString& className, const TString& identifier) { + return std::make_unique(className, identifier, Adapter, Storage); +} + +} \ No newline at end of file diff --git a/ydb/core/tx/columnshard/bg_tasks/manager/manager.h b/ydb/core/tx/columnshard/bg_tasks/manager/manager.h new file mode 100644 index 000000000000..f6c269407f81 --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/manager/manager.h @@ -0,0 +1,48 @@ +#pragma once +#include +#include +#include +#include +#include +#include +#include + +namespace NKikimrTxBackgroundProto { +class TTaskContainer; +class TSessionControlContainer; +} + +namespace NKikimr::NOlap::NBackground { + +class TSessionsManager { +private: + bool Started = false; + std::shared_ptr Storage; + std::shared_ptr Adapter; +public: + TSessionsManager(const std::shared_ptr& adapter) + : Adapter(adapter) + { + Storage = std::make_shared(); + } + + bool LoadIdempotency(NTabletFlatExecutor::TTransactionContext& txc); + + [[nodiscard]] std::unique_ptr AddTaskFromProto(const NKikimrTxBackgroundProto::TTaskContainer& taskProto); + [[nodiscard]] std::unique_ptr Remove(const TString& className, const TString& identifier); + [[nodiscard]] std::unique_ptr ApplyControlFromProto(const NKikimrTxBackgroundProto::TSessionControlContainer& controlProto); + + void Start(const TStartContext& context) { + AFL_VERIFY(!Started); + Storage->Start(context); + Started = true; + } + + void Finish() { + AFL_VERIFY(Started); + Storage->Finish(); + Started = false; + } +}; + +} \ No newline at end of file diff --git a/ydb/core/tx/columnshard/bg_tasks/manager/ya.make b/ydb/core/tx/columnshard/bg_tasks/manager/ya.make new file mode 100644 index 000000000000..0e5308ee6634 --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/manager/ya.make @@ -0,0 +1,14 @@ +LIBRARY() + +SRCS( + manager.cpp + actor.cpp +) + +PEERDIR( + ydb/core/tx/columnshard/bg_tasks/abstract + ydb/core/tx/columnshard/bg_tasks/protos + ydb/core/tablet_flat +) + +END() diff --git a/ydb/core/tx/columnshard/bg_tasks/protos/data.proto b/ydb/core/tx/columnshard/bg_tasks/protos/data.proto new file mode 100644 index 000000000000..93be1aae96bd --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/protos/data.proto @@ -0,0 +1,20 @@ +package NKikimrTxBackgroundProto; + +import "ydb/services/bg_tasks/protos/container.proto"; + +message TSessionControlContainer { + optional string SessionClassName = 1; + optional string SessionIdentifier = 2; + optional string StatusChannelContainer = 3; + optional string LogicControlContainer = 4; +} + +message TTaskContainer { + optional string Identifier= 1; + optional string StatusChannelContainer = 2; + optional string TaskDescriptionContainer = 3; +} + +message TEvSessionControl { + optional string ControlInterfaceContainer = 1; +} diff --git a/ydb/core/tx/columnshard/bg_tasks/protos/ya.make b/ydb/core/tx/columnshard/bg_tasks/protos/ya.make new file mode 100644 index 000000000000..5a8eea941bff --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/protos/ya.make @@ -0,0 +1,12 @@ +PROTO_LIBRARY() + +SRCS( + data.proto +) + +PEERDIR( + ydb/core/tx/columnshard/common/protos + ydb/services/bg_tasks/protos +) + +END() diff --git a/ydb/core/tx/columnshard/bg_tasks/session/session.cpp b/ydb/core/tx/columnshard/bg_tasks/session/session.cpp new file mode 100644 index 000000000000..cdc3393c5854 --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/session/session.cpp @@ -0,0 +1,5 @@ +#include "session.h" + +namespace NKikimr::NOlap::NBackground { + +} \ No newline at end of file diff --git a/ydb/core/tx/columnshard/bg_tasks/session/session.h b/ydb/core/tx/columnshard/bg_tasks/session/session.h new file mode 100644 index 000000000000..387e536ea6e8 --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/session/session.h @@ -0,0 +1,84 @@ +#pragma once +#include +#include +#include + +namespace NKikimr::NOlap::NBackground { + +class TSession { +private: + YDB_READONLY_DEF(TString, Identifier); + YDB_READONLY_DEF(std::optional, ActorId); + YDB_READONLY_DEF(TStatusChannelContainer, ChannelContainer); + YDB_READONLY_DEF(TSessionLogicContainer, LogicContainer); +public: + TSessionRecord SerializeToLocalDatabaseRecord() const { + TSessionRecord result; + result.SetIdentifier(Identifier); + result.SetClassName(LogicContainer.GetClassName()); + result.SetLogicDescription(LogicContainer.SerializeToString()); + result.SetProgress(LogicContainer->SerializeProgressToString()); + result.SetState(LogicContainer->SerializeStateToString()); + result.SetStatusChannel(ChannelContainer.SerializeToString()); + return result; + } + + template + std::shared_ptr GetLogicAsVerifiedPtr() const { + return LogicContainer.GetObjectPtrVerifiedAs(); + } + + [[nodiscard]] TConclusionStatus DeserializeFromLocalDatabase(TSessionRecord&& record) { + Identifier = record.GetIdentifier(); + ChannelContainer.DeserializeFromString(record.GetStatusChannel()); + if (!LogicContainer.DeserializeFromString(record.GetLogicDescription())) { + return TConclusionStatus::Fail("cannot parse logic description"); + } + if (!LogicContainer->DeserializeProgressFromString(record.GetProgress())) { + return TConclusionStatus::Fail("cannot parse progress"); + } + if (!LogicContainer->DeserializeStateFromString(record.GetState())) { + return TConclusionStatus::Fail("cannot parse state"); + } + return TConclusionStatus::Success(); + } + + TString GetLogicClassName() const { + return LogicContainer.GetClassName(); + } + + bool IsRunning() const { + return !!ActorId; + } + + const NActors::TActorId& GetActorIdVerified() const { + AFL_VERIFY(!!ActorId); + return *ActorId; + } + + void StartActor(const TStartContext& startContext) { + AFL_VERIFY(!ActorId); + std::unique_ptr actor = LogicContainer->CreateActor(startContext); + ActorId = NActors::TActivationContext::AsActorContext().Register(actor.release()); + } + + void FinishActor() { + AFL_VERIFY(!!ActorId); + NActors::TActivationContext::AsActorContext().Send(*ActorId, new NActors::TEvents::TEvPoisonPill); + ActorId.reset(); + } + + TSession() = default; + + TSession(const TString& identifier, const TStatusChannelContainer& channel, const TSessionLogicContainer& logic) + : Identifier(identifier) + , ChannelContainer(channel) + , LogicContainer(logic) + { + AFL_VERIFY(!!Identifier); + AFL_VERIFY(!!ChannelContainer); + AFL_VERIFY(!!LogicContainer); + } +}; + +} \ No newline at end of file diff --git a/ydb/core/tx/columnshard/bg_tasks/session/storage.cpp b/ydb/core/tx/columnshard/bg_tasks/session/storage.cpp new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/ydb/core/tx/columnshard/bg_tasks/session/storage.h b/ydb/core/tx/columnshard/bg_tasks/session/storage.h new file mode 100644 index 000000000000..35faa7e14b6d --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/session/storage.h @@ -0,0 +1,103 @@ +#pragma once +#include "session.h" + +namespace NKikimr::NOlap::NBackground { + +class TSessionsContainer { +private: + THashMap> Sessions; +public: + TConclusionStatus AddSession(const std::shared_ptr& session) { + AFL_VERIFY(!!session); + if (!Sessions.emplace(session->GetIdentifier(), session).second) { + return TConclusionStatus::Fail("task emplaced already"); + } + return TConclusionStatus::Success(); + } + bool RemoveSession(const TString& identifier) { + AFL_VERIFY(!!identifier); + auto it = Sessions.find(identifier); + if (it == Sessions.end()) { + return false; + } + Sessions.erase(it); + return true; + } + void Start(const TStartContext& context) { + for (auto&& [_, i] : Sessions) { + i->StartActor(context); + } + } + + void Finish() { + for (auto&& [_, i] : Sessions) { + if (i->IsRunning()) { + i->FinishActor(); + } + } + } + + std::shared_ptr GetSession(const TString& identifier) { + auto it = Sessions.find(identifier); + if (it == Sessions.end()) { + return {}; + } + return it->second; + } + +}; + +class TSessionsStorage { +private: + NActors::TActorId ProcessActorId; + THashMap> ContainersByClass; + + TSessionsContainer& GetContainerOrCreateNew(const TString& className) { + auto it = ContainersByClass.find(className); + if (it == ContainersByClass.end()) { + it = ContainersByClass.emplace(className, std::make_shared()).first; + } + return *it->second; + } +public: + void Start(const TStartContext& context) { + for (auto&& i : ContainersByClass) { + i.second->Start(context); + } + } + + void Finish() { + for (auto&& i : ContainersByClass) { + i.second->Finish(); + } + } + + bool RemoveSession(const TString& className, const TString& identifier) { + auto container = GetSessionsContainer(className); + if (!container) { + return false; + } + return container->RemoveSession(identifier); + } + + TConclusionStatus AddSession(const std::shared_ptr& session) { + AFL_VERIFY(!!session); + TSessionsContainer& sessions = GetContainerOrCreateNew(session->GetLogicContainer().GetClassName()); + return sessions.AddSession(session); + } + + std::shared_ptr GetSession(const TString& className, const TString& identifier) { + TSessionsContainer& sessions = GetContainerOrCreateNew(className); + return sessions.GetSession(identifier); + } + + std::shared_ptr GetSessionsContainer(const TString& className) { + auto it = ContainersByClass.find(className); + if (it == ContainersByClass.end()) { + return nullptr; + } + return it->second; + } +}; + +} \ No newline at end of file diff --git a/ydb/core/tx/columnshard/bg_tasks/session/ya.make b/ydb/core/tx/columnshard/bg_tasks/session/ya.make new file mode 100644 index 000000000000..4dd46e14ab27 --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/session/ya.make @@ -0,0 +1,13 @@ +LIBRARY() + +SRCS( + session.cpp + storage.cpp +) + +PEERDIR( + ydb/core/tx/columnshard/bg_tasks/abstract + ydb/core/tx/columnshard/bg_tasks/protos +) + +END() diff --git a/ydb/core/tx/columnshard/bg_tasks/transactions/tx_add.cpp b/ydb/core/tx/columnshard/bg_tasks/transactions/tx_add.cpp new file mode 100644 index 000000000000..65527076ae60 --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/transactions/tx_add.cpp @@ -0,0 +1,20 @@ +#include "tx_add.h" +#include + +namespace NKikimr::NOlap::NBackground { + +bool TTxAddSession::Execute(TTransactionContext& txc, const TActorContext& /*ctx*/) { + Adapter->SaveSessionToLocalDatabase(txc, Session->SerializeToLocalDatabaseRecord()).Validate("on AddSession"); + return true; +} + +void TTxAddSession::Complete(const TActorContext& ctx) { + Sessions->AddSession(Session).Validate("on add background session"); + Session->GetChannelContainer()->OnAdded(); + if (Session->GetLogicContainer()->IsReadyForStart()) { + TStartContext context(ctx.SelfID, Session->GetChannelContainer()); + Session->StartActor(context); + } +} + +} diff --git a/ydb/core/tx/columnshard/bg_tasks/transactions/tx_add.h b/ydb/core/tx/columnshard/bg_tasks/transactions/tx_add.h new file mode 100644 index 000000000000..eb623172aa0d --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/transactions/tx_add.h @@ -0,0 +1,29 @@ +#pragma once +#include +#include +#include +#include + +namespace NKikimr::NOlap::NBackground { +class TSessionsStorage; +class TTxAddSession: public NTabletFlatExecutor::ITransaction { +private: + std::shared_ptr Sessions; + std::shared_ptr Adapter; + std::shared_ptr Session; +public: + TTxAddSession(const std::shared_ptr& adapter, const std::shared_ptr& sessions, const std::shared_ptr& session) + : Sessions(sessions) + , Adapter(adapter) + , Session(session) { + AFL_VERIFY(!!Session); + } + + virtual bool Execute(TTransactionContext& txc, const TActorContext& ctx) override; + virtual void Complete(const TActorContext& ctx) override; + virtual TTxType GetTxType() const override { + return NColumnShard::TXTYPE_ADD_BACKGROUND_SESSION; + }; +}; + +} diff --git a/ydb/core/tx/columnshard/bg_tasks/transactions/tx_general.cpp b/ydb/core/tx/columnshard/bg_tasks/transactions/tx_general.cpp new file mode 100644 index 000000000000..36f5fabd3678 --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/transactions/tx_general.cpp @@ -0,0 +1,13 @@ +#include "tx_general.h" +#include + +namespace NKikimr::NOlap::NBackground { + +void TTxGeneral::Complete(const TActorContext& ctx) { + DoComplete(ctx); + if (!!ProgressActorId) { + ctx.Send(*ProgressActorId, new TEvTransactionCompleted(TxInternalId)); + } +} + +} diff --git a/ydb/core/tx/columnshard/bg_tasks/transactions/tx_general.h b/ydb/core/tx/columnshard/bg_tasks/transactions/tx_general.h new file mode 100644 index 000000000000..1dc335776e24 --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/transactions/tx_general.h @@ -0,0 +1,24 @@ +#pragma once +#include +#include +#include +#include + +namespace NKikimr::NOlap::NBackground { +class TTxGeneral: public NTabletFlatExecutor::ITransaction { +private: + using TBase = NTabletFlatExecutor::ITransaction; + const std::optional ProgressActorId; + const ui64 TxInternalId; + virtual void DoComplete(const TActorContext& ctx) = 0; +public: + TTxGeneral(const std::optional progressActorId, const ui64 txInternalId) + : ProgressActorId(progressActorId) + , TxInternalId(txInternalId) + { + } + + void Complete(const TActorContext& ctx) override final; +}; + +} diff --git a/ydb/core/tx/columnshard/bg_tasks/transactions/tx_remove.cpp b/ydb/core/tx/columnshard/bg_tasks/transactions/tx_remove.cpp new file mode 100644 index 000000000000..492c2daa6e21 --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/transactions/tx_remove.cpp @@ -0,0 +1,15 @@ +#include "tx_remove.h" +#include + +namespace NKikimr::NOlap::NBackground { + +bool TTxRemoveSession::Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) { + Adapter->RemoveSessionFromLocalDatabase(txc, ClassName, Identifier).Validate(); + return true; +} + +void TTxRemoveSession::Complete(const TActorContext& /*ctx*/) { + AFL_VERIFY(Sessions->RemoveSession(ClassName, Identifier)); +} + +} diff --git a/ydb/core/tx/columnshard/bg_tasks/transactions/tx_remove.h b/ydb/core/tx/columnshard/bg_tasks/transactions/tx_remove.h new file mode 100644 index 000000000000..09d2d85ce7a7 --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/transactions/tx_remove.h @@ -0,0 +1,34 @@ +#pragma once +#include +#include +#include +#include + +namespace NKikimr::NOlap::NBackground { +class TSessionsContainer; +class TTxRemoveSession: public NTabletFlatExecutor::ITransaction { +private: + using TBase = NTabletFlatExecutor::ITransaction; + const TString ClassName; + const TString Identifier; + std::shared_ptr Sessions; + const std::shared_ptr Adapter; +public: + TTxRemoveSession(const TString& className, const TString& identifier, const std::shared_ptr& adapter, const std::shared_ptr& sessions) + : ClassName(className) + , Identifier(identifier) + , Sessions(sessions) + , Adapter(adapter) + { + AFL_VERIFY(!!Adapter); + AFL_VERIFY(!!Sessions); + AFL_VERIFY(!!ClassName); + AFL_VERIFY(!!Identifier); + } + + bool Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) override; + void Complete(const TActorContext& ctx) override; + TTxType GetTxType() const override { return NColumnShard::TXTYPE_REMOVE_BACKGROUND_SESSION; } +}; + +} diff --git a/ydb/core/tx/columnshard/bg_tasks/transactions/tx_save_progress.cpp b/ydb/core/tx/columnshard/bg_tasks/transactions/tx_save_progress.cpp new file mode 100644 index 000000000000..f829142d4edd --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/transactions/tx_save_progress.cpp @@ -0,0 +1,14 @@ +#include "tx_save_progress.h" +#include + +namespace NKikimr::NOlap::NBackground { + +bool TTxSaveSessionProgress::Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) { + Adapter->SaveProgressToLocalDatabase(txc, Session->SerializeToLocalDatabaseRecord()).Validate(); + return true; +} + +void TTxSaveSessionProgress::DoComplete(const TActorContext& /*ctx*/) { +} + +} diff --git a/ydb/core/tx/columnshard/bg_tasks/transactions/tx_save_progress.h b/ydb/core/tx/columnshard/bg_tasks/transactions/tx_save_progress.h new file mode 100644 index 000000000000..1c72c7e0ccb8 --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/transactions/tx_save_progress.h @@ -0,0 +1,29 @@ +#pragma once +#include "tx_general.h" +#include +#include +#include +#include + +namespace NKikimr::NOlap::NBackground { +class TTxSaveSessionProgress: public TTxGeneral { +private: + using TBase = TTxGeneral; + const std::shared_ptr Session; + const std::shared_ptr Adapter; + virtual void DoComplete(const TActorContext& ctx) override; +public: + TTxSaveSessionProgress(const std::shared_ptr& session, const NActors::TActorId& progressActorId, const std::shared_ptr& adapter, const ui64 txInternalId) + : TBase(progressActorId, txInternalId) + , Session(session) + , Adapter(adapter) + { + AFL_VERIFY(!!Adapter); + AFL_VERIFY(!!Session); + } + + bool Execute(NTabletFlatExecutor::TTransactionContext& txc, const NActors::TActorContext& /*ctx*/) override; + TTxType GetTxType() const override { return NColumnShard::TXTYPE_SAVE_BACKGROUND_SESSION_PROGRESS; } +}; + +} diff --git a/ydb/core/tx/columnshard/bg_tasks/transactions/tx_save_state.cpp b/ydb/core/tx/columnshard/bg_tasks/transactions/tx_save_state.cpp new file mode 100644 index 000000000000..84ebda8713e5 --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/transactions/tx_save_state.cpp @@ -0,0 +1,20 @@ +#include "tx_save_state.h" +#include + +namespace NKikimr::NOlap::NBackground { + +bool TTxSaveSessionState::Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) { + Adapter->SaveStateToLocalDatabase(txc, Session->SerializeToLocalDatabaseRecord()).Validate(); + return true; +} + +void TTxSaveSessionState::DoComplete(const TActorContext& ctx) { + if (Session->GetLogicContainer()->IsReadyForRemove()) { + ctx.Send(Adapter->GetTabletActorId(), new TEvRemoveSession(Session->GetLogicClassName(), Session->GetIdentifier())); + } else if (Session->GetLogicContainer()->IsReadyForStart()) { + TStartContext context(ctx.SelfID, Session->GetChannelContainer()); + Session->StartActor(context); + } +} + +} diff --git a/ydb/core/tx/columnshard/bg_tasks/transactions/tx_save_state.h b/ydb/core/tx/columnshard/bg_tasks/transactions/tx_save_state.h new file mode 100644 index 000000000000..0894aa9b76a8 --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/transactions/tx_save_state.h @@ -0,0 +1,29 @@ +#pragma once +#include "tx_general.h" +#include +#include +#include +#include + +namespace NKikimr::NOlap::NBackground { +class TTxSaveSessionState: public TTxGeneral { +private: + using TBase = TTxGeneral; + const std::shared_ptr Session; + const std::shared_ptr Adapter; + virtual void DoComplete(const TActorContext& ctx) override; +public: + TTxSaveSessionState(const std::shared_ptr& session, const std::optional progressActorId, const std::shared_ptr& adapter, const ui64 txInternalId) + : TBase(progressActorId, txInternalId) + , Session(session) + , Adapter(adapter) + { + AFL_VERIFY(!!Session); + AFL_VERIFY(!!Adapter); + } + + bool Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) override; + TTxType GetTxType() const override { return NColumnShard::TXTYPE_SAVE_BACKGROUND_SESSION_STATE; } +}; + +} diff --git a/ydb/core/tx/columnshard/bg_tasks/transactions/ya.make b/ydb/core/tx/columnshard/bg_tasks/transactions/ya.make new file mode 100644 index 000000000000..301b5858eac8 --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/transactions/ya.make @@ -0,0 +1,18 @@ +LIBRARY() + +SRCS( + tx_save_progress.cpp + tx_save_state.cpp + tx_remove.cpp + tx_add.cpp +) + +PEERDIR( + ydb/core/tablet_flat + ydb/core/tx/columnshard/bg_tasks/abstract + ydb/core/tx/columnshard/bg_tasks/events + ydb/core/tx/columnshard/bg_tasks/session + ydb/core/protos +) + +END() diff --git a/ydb/core/tx/columnshard/bg_tasks/ya.make b/ydb/core/tx/columnshard/bg_tasks/ya.make new file mode 100644 index 000000000000..2f352dc06d71 --- /dev/null +++ b/ydb/core/tx/columnshard/bg_tasks/ya.make @@ -0,0 +1,12 @@ +LIBRARY() + +PEERDIR( + ydb/core/tx/columnshard/bg_tasks/abstract + ydb/core/tx/columnshard/bg_tasks/events + ydb/core/tx/columnshard/bg_tasks/manager + ydb/core/tx/columnshard/bg_tasks/protos + ydb/core/tx/columnshard/bg_tasks/transactions + ydb/core/tx/columnshard/bg_tasks/session +) + +END() diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/adapter/adapter.cpp b/ydb/core/tx/schemeshard/olap/bg_tasks/adapter/adapter.cpp new file mode 100644 index 000000000000..ffa34560383d --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/adapter/adapter.cpp @@ -0,0 +1,65 @@ +#include "adapter.h" +#include + +namespace NKikimr::NSchemeShard::NBackground { + +bool TAdapter::DoLoadSessionsFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, std::deque& records) { + NIceDb::TNiceDb db(txc.DB); + using BackgroundSessions = NSchemeShard::Schema::BackgroundSessions; + auto rowset = db.Table().Select(); + if (!rowset.IsReady()) { + return false; + } + + std::deque result; + while (!rowset.EndOfSet()) { + NOlap::NBackground::TSessionRecord sRecord; + sRecord.SetClassName(rowset.GetValue()); + sRecord.SetIdentifier(rowset.GetValue()); + sRecord.SetLogicDescription(rowset.GetValue()); + sRecord.SetStatusChannel(rowset.GetValue()); + sRecord.SetProgress(rowset.GetValue()); + sRecord.SetState(rowset.GetValue()); + result.emplace_back(std::move(sRecord)); + if (!rowset.Next()) { + return false; + } + } + std::swap(result, records); + return true; +} + +void TAdapter::DoSaveProgressToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const NKikimr::NOlap::NBackground::TSessionRecord& container) { + NIceDb::TNiceDb db(txc.DB); + using BackgroundSessions = NSchemeShard::Schema::BackgroundSessions; + db.Table().Key(container.GetClassName(), container.GetIdentifier()).Update( + NIceDb::TUpdate(container.GetProgress()) + ); +} + +void TAdapter::DoSaveStateToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const NKikimr::NOlap::NBackground::TSessionRecord& container) { + NIceDb::TNiceDb db(txc.DB); + using BackgroundSessions = NSchemeShard::Schema::BackgroundSessions; + db.Table().Key(container.GetClassName(), container.GetIdentifier()).Update( + NIceDb::TUpdate(container.GetState()) + ); +} + +void TAdapter::DoSaveSessionToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const NKikimr::NOlap::NBackground::TSessionRecord& container) { + NIceDb::TNiceDb db(txc.DB); + using BackgroundSessions = NSchemeShard::Schema::BackgroundSessions; + db.Table().Key(container.GetClassName(), container.GetIdentifier()).Update( + NIceDb::TUpdate(container.GetLogicDescription()), + NIceDb::TUpdate(container.GetStatusChannel()), + NIceDb::TUpdate(container.GetProgress()), + NIceDb::TUpdate(container.GetState()) + ); +} + +void TAdapter::DoRemoveSessionFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TString& className, const TString& identifier) { + NIceDb::TNiceDb db(txc.DB); + using BackgroundSessions = NSchemeShard::Schema::BackgroundSessions; + db.Table().Key(className, identifier).Delete(); +} + +} \ No newline at end of file diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/adapter/adapter.h b/ydb/core/tx/schemeshard/olap/bg_tasks/adapter/adapter.h new file mode 100644 index 000000000000..ec50c4ea1eef --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/adapter/adapter.h @@ -0,0 +1,14 @@ +#pragma once +#include + +namespace NKikimr::NSchemeShard::NBackground { + +class TAdapter: public NKikimr::NOlap::NBackground::ITabletAdapter { +protected: + virtual bool DoLoadSessionsFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, std::deque& records) override; + virtual void DoSaveProgressToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const NKikimr::NOlap::NBackground::TSessionRecord& container) override; + virtual void DoSaveStateToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const NKikimr::NOlap::NBackground::TSessionRecord& container) override; + virtual void DoSaveSessionToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const NKikimr::NOlap::NBackground::TSessionRecord& container) override; + virtual void DoRemoveSessionFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TString& className, const TString& identifier) override;; +}; +} \ No newline at end of file diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/adapter/ya.make b/ydb/core/tx/schemeshard/olap/bg_tasks/adapter/ya.make new file mode 100644 index 000000000000..4b5a0b0f87a0 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/adapter/ya.make @@ -0,0 +1,11 @@ +LIBRARY() + +SRCS( + adapter.cpp +) + +PEERDIR( + ydb/core/tx/columnshard/bg_tasks/abstract +) + +END() diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/protos/data.proto b/ydb/core/tx/schemeshard/olap/bg_tasks/protos/data.proto new file mode 100644 index 000000000000..6b862855b56b --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/protos/data.proto @@ -0,0 +1,33 @@ +package NKikimrSchemeShardTxBackgroundProto; + +import "ydb/core/protos/flat_scheme_op.proto"; + +message TTxModification { + optional NKikimrSchemeOp.TModifyScheme Transaction = 1; +} + +message TTxChainCommonData { + optional string TablePath = 1; + repeated TTxModification Modification = 2; +} + +message TTxChainTask { + optional TTxChainCommonData CommonData = 1; +} + +message TTxChainSessionLogic { + optional TTxChainCommonData CommonData = 1; +} + +message TTxChainSessionProgress { + optional uint32 StepForExecute = 1; +} + +message TTxChainSessionState { +} + +message TStatusChannel { + optional uint64 SSTabletId = 1; + optional string TaskClassName = 2; + optional string TaskIdentifier = 3; +} diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/protos/ya.make b/ydb/core/tx/schemeshard/olap/bg_tasks/protos/ya.make new file mode 100644 index 000000000000..da3a31187631 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/protos/ya.make @@ -0,0 +1,13 @@ +PROTO_LIBRARY() + +SRCS( + data.proto +) + +PEERDIR( + ydb/core/protos +) + +EXCLUDE_TAGS(GO_PROTO) + +END() diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/resharding/task.cpp b/ydb/core/tx/schemeshard/olap/bg_tasks/resharding/task.cpp new file mode 100644 index 000000000000..5d6c6f5798d7 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/resharding/task.cpp @@ -0,0 +1,5 @@ +#include "task.h" + +namespace NKikimr::NOlap::NBackground { + +} \ No newline at end of file diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/resharding/task.h b/ydb/core/tx/schemeshard/olap/bg_tasks/resharding/task.h new file mode 100644 index 000000000000..c72f2a9e5fc1 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/resharding/task.h @@ -0,0 +1,182 @@ +#pragma once +#include "session.h" +#include +#include +#include + +namespace NKikimr::NSchemeShard::NOlap::NBackground { + +class TStatusChannel: public NBackgroundTasks::TInterfaceProtoAdapter { +private: + TString TaskClassName; + TString TaskIdentifier; + ui64 SSTabletId; +public: + TStatusChannel(const TString taskClassName, const TString& taskId, const ui64 ssTabletId) + : TaskClassName(taskClassName) + , TaskIdentifier(taskId) + , SSTabletId(ssTabletId) + { + AFL_VERIFY(!!TaskClassName); + AFL_VERIFY(!!TaskIdentifier); + AFL_VERIFY(SSTabletId); + } + virtual void DoOnFail(const TString& errorMessage) const override { + + } + virtual void DoOnAdded() const override { + + } + virtual void DoOnFinished() const override { + + } + virtual TProtoStorage DoSerializeToProto() const override { + TProtoStorage result; + result.SetTaskClassName(TaskClassName); + result.SetTaskIdentifier(TaskIdentifier); + result.SetSSTabletId(SSTabletId); + return result; + } + virtual TConclusionStatus DoDeserializeFromProto(const TProtoStorage& proto) override { + TaskClassName = proto.GetTaskClassName(); + TaskIdentifier = result.GetTaskIdentifier(); + SSTabletId = result.GetSSTabletId(); + return TConclusionStatus::Success(); + } +}; + +class TTransactionsActor: public NOlap::NBackground::TSessionActor { +private: + std::shared_ptr SessionLogic; +protected: + virtual void OnTxCompleted(const ui64 txInternalId) override { + + } + virtual void OnSessionProgressSaved() override { + + } + virtual void OnSessionStateSaved() override { + if (SessionLogic->GetStepForExecute() < SessionLogic->GetTransactions().size()) { + SendTransactionForExecute(SessionLogic->GetTransactions()[SessionLogic->GetStepForExecute()]); + } + } + virtual void OnBootstrap(const TActorContext& ctx) override { + Become(&TTransactionsActor::StateInProgress); + SessionLogic = Session.GetLogicAsVerifiedPtr(); + AFL_VERIFY(SessionLogic->GetStepForExecute() < SessionLogic->GetTransactions().size()); + SendTransactionForExecute(SessionLogic->GetTransactions()[SessionLogic->GetStepForExecute()]); + } + + void SendTransactionForExecute(const NKikimrSchemeOp::TModifyScheme& modification) const { + auto ev = std::make_unique(txId, SSTabletId); + *ev->Record.AddTransaction() = modification; + NActors::TActivationContext::AsActorContext().Send(MakePipePeNodeCacheID(false), + new TEvPipeCache::TEvForward(ev.release(), SSTabletId, true), IEventHandle::FlagTrackDelivery, SessionLogic->GetStepForExecute()); + } + + void Handle(TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& ev) { + SessionLogic->NextStep(); + SaveSessionProgress(); + } +public: + STATEFN(StateInProgress) { + const NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_BACKGROUND)("SelfId", SelfId())("TabletId", TabletId); + switch (ev->GetTypeRewrite()) { + hFunc(TEvSchemeShard::TEvModifySchemeTransactionResult, Handle); + default: + TBase::StateInProgress(ev); + } + } + +}; + +class TSessionTransactions: public NOlap::NBackground::TSessionProtoAdapter< + NKikimrSchemeShardTxBackgroundProto::TSessionTransactionsLogic, + NKikimrSchemeShardTxBackgroundProto::TSessionTransactionsProgress, + NKikimrSchemeShardTxBackgroundProto::TSessionTransactionsState> { +public: + static TString GetStaticClassName() { + return "SS::BG::TRANSACTIONS"; + } +private: + using TBase = NBackgroundTasks::TInterfaceProtoAdapter; + YDB_READONLY_DEF(TString, TablePath); + YDB_READONLY_DEF(std::vector, Transactions); + YDB_READONLY(ui32, StepForExecute, 0); +protected: + virtual TConclusion> DoCreateActor(const TStartContext& context) const override { + + } + virtual TConclusionStatus DoDeserializeFromProto(const TProtoLogic& proto) override { + TablePath = proto.GetTablePath(); + for (auto&& i : proto.GetModifications()) { + Transactions.emplace_back(i.GetTransaction()); + } + return TConclusionStatus::Success(); + } + virtual TProtoLogic DoSerializeToProto() const override { + TProtoLogic result; + result.SetTablePath(TablePath); + for (auto&& i : Transactions) { + *result.AddModification()->MutableTransaction() = i; + } + return result; + } + virtual TConclusionStatus DoDeserializeProgressFromProto(const TProtoProgress& proto) override { + StepForExecute = proto.GetStepForExecute(); + return TConclusionStatus::Success(); + } + virtual TProtoProgress DoSerializeProgressToProto() const override { + TProtoProgress proto; + proto.SetStepForExecute(StepForExecute); + return proto; + } + virtual TConclusionStatus DoDeserializeStateFromProto(const TProtoState& proto) override { + return TConclusionStatus::Success(); + } + virtual TProtoState DoSerializeStateToProto() const override { + + } +public: + virtual TString GetClassName() const override { + return GetStaticClassName(); + } + virtual bool IsReadyForStart() const override { + return true; + } + virtual bool IsFinished() const override { + AFL_VERIFY(StepForExecute <= Transactions.size()) + return StepForExecute == Transactions.size(); + } + virtual bool IsReadyForRemove() const override { + return true; + } +}; + +class TTaskResharding: public NBackgroundTasks::TInterfaceProtoAdapter { +private: + using TBase = NBackgroundTasks::TInterfaceProtoAdapter; +public: + static TString GetStaticClassName() { + return "SS::OLAP::RESHARDING"; + } +private: + virtual TConclusionStatus DoDeserializeFromProto(const TProtoStorage& proto) override { + TablePath = proto.GetTablePath(); + } + virtual TProtoStorage DoSerializeToProto() const { + TProtoStorage result; + result.SetTablePath(TablePath); + return result; + } + virtual std::shared_ptr DoBuildSession() const override { + return std::make_shared(TablePath); + } + static const inline TFactory::TRegistrator Registrator = TFactory::TRegistrator(GetStaticClassName()); +public: + virtual TString GetClassName() const override { + return GetStaticClassName(); + } +}; + +} \ No newline at end of file diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/actor.cpp b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/actor.cpp new file mode 100644 index 000000000000..4cbf2a7f6ac4 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/actor.cpp @@ -0,0 +1,38 @@ +#include "actor.h" +#include +#include + +namespace NKikimr::NSchemeShard::NOlap::NBackground { + +void TTxChainActor::SendTransactionForExecute(const ui64 txId, const NKikimrSchemeOp::TModifyScheme& modification) const { + auto ev = std::make_unique(txId, (ui64)TabletId); + *ev->Record.AddTransaction() = modification; + NActors::TActivationContext::AsActorContext().Send(MakePipePeNodeCacheID(false), + new TEvPipeCache::TEvForward(ev.release(), (ui64)TabletId, true), IEventHandle::FlagTrackDelivery, SessionLogic->GetStepForExecute()); +} + +void TTxChainActor::OnBootstrap(const TActorContext& ctx) { + Become(&TTxChainActor::StateInProgress); + SessionLogic = Session->GetLogicAsVerifiedPtr(); + AFL_VERIFY(SessionLogic->GetStepForExecute() < SessionLogic->GetTxData().GetTransactions().size()); + TxAllocatorClient = RegisterWithSameMailbox(CreateTxAllocatorClient(&AppDataVerified())); + ctx.Send(TxAllocatorClient, MakeHolder(1)); +} + +void TTxChainActor::Handle(TEvTxAllocatorClient::TEvAllocateResult::TPtr& ev) { + AFL_VERIFY(ev->Get()->TxIds.size() == 1); + SendTransactionForExecute(ev->Get()->TxIds.front(), SessionLogic->GetTxData().GetTransactions()[SessionLogic->GetStepForExecute()]); +} + +void TTxChainActor::Handle(TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& /*ev*/) { + SessionLogic->NextStep(); + SaveSessionProgress(); +} + +void TTxChainActor::OnSessionStateSaved() { + if (SessionLogic->GetStepForExecute() < SessionLogic->GetTxData().GetTransactions().size()) { + NActors::TActivationContext::AsActorContext().Send(TxAllocatorClient, MakeHolder(1)); + } +} + +} \ No newline at end of file diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/actor.h b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/actor.h new file mode 100644 index 000000000000..add6ac97d267 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/actor.h @@ -0,0 +1,51 @@ +#pragma once +#include "session.h" +#include +#include +#include +#include +#include +#include +#include + +namespace NKikimr::NSchemeShard::NOlap::NBackground { + +class TTxChainActor: public NKikimr::NOlap::NBackground::TSessionActor { +private: + using TBase = NKikimr::NOlap::NBackground::TSessionActor; + std::shared_ptr SessionLogic; + NActors::TActorId TxAllocatorClient; +protected: + virtual void OnTxCompleted(const ui64 /*txInternalId*/) override { + + } + virtual void OnSessionProgressSaved() override { + + } + virtual void OnSessionStateSaved() override; + virtual void OnBootstrap(const TActorContext& /*ctx*/) override; + + void SendTransactionForExecute(const ui64 txId, const NKikimrSchemeOp::TModifyScheme& modification) const; + + void Handle(TEvTxAllocatorClient::TEvAllocateResult::TPtr& ev); + void Handle(TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& /*ev*/); +public: + TTxChainActor(const NKikimr::NOlap::TTabletId tabletId, const NActors::TActorId tabletActorId, const std::shared_ptr& session, const std::shared_ptr& adapter) + : TBase(tabletId, tabletActorId, session, adapter) + { + AFL_VERIFY(!!Session); + AFL_VERIFY(!!Adapter); + } + + STATEFN(StateInProgress) { + const NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_BACKGROUND)("SelfId", SelfId())("TabletId", TabletId); + switch (ev->GetTypeRewrite()) { + hFunc(TEvSchemeShard::TEvModifySchemeTransactionResult, Handle); + hFunc(TEvTxAllocatorClient::TEvAllocateResult, Handle); + default: + TBase::StateInProgress(ev); + } + } + +}; +} \ No newline at end of file diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/common.cpp b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/common.cpp new file mode 100644 index 000000000000..b4d7c94df9d6 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/common.cpp @@ -0,0 +1,22 @@ +#include "common.h" + +namespace NKikimr::NSchemeShard::NOlap::NBackground { + +TConclusionStatus TTxChainData::DeserializeFromProto(const TProtoStorage& proto) { + TablePath = proto.GetTablePath(); + for (auto&& i : proto.GetModification()) { + Transactions.emplace_back(i.GetTransaction()); + } + return TConclusionStatus::Success(); +} + +TTxChainData::TProtoStorage TTxChainData::SerializeToProto() const { + TProtoStorage result; + result.SetTablePath(TablePath); + for (auto&& i : Transactions) { + *result.AddModification()->MutableTransaction() = i; + } + return result; +} + +} \ No newline at end of file diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/common.h b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/common.h new file mode 100644 index 000000000000..bce627d7a668 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/common.h @@ -0,0 +1,18 @@ +#pragma once +#include +#include +#include + +namespace NKikimr::NSchemeShard::NOlap::NBackground { + +class TTxChainData { +private: + YDB_READONLY_DEF(TString, TablePath); + YDB_READONLY_DEF(std::vector, Transactions); +public: + using TProtoStorage = NKikimrSchemeShardTxBackgroundProto::TTxChainCommonData; + TConclusionStatus DeserializeFromProto(const TProtoStorage& proto); + TProtoStorage SerializeToProto() const; +}; + +} \ No newline at end of file diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/session.cpp b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/session.cpp new file mode 100644 index 000000000000..e916ae0434f2 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/session.cpp @@ -0,0 +1,10 @@ +#include "session.h" +#include "actor.h" + +namespace NKikimr::NSchemeShard::NOlap::NBackground { + +TConclusion> TTxChainSession::DoCreateActor(const NKikimr::NOlap::NBackground::TStartContext& context) const { + return std::make_unique(context.GetTabletId(), context.GetTabletActorId(), context.GetSessionSelfPtr(), context.GetAdapter()); +} + +} \ No newline at end of file diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/session.h b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/session.h new file mode 100644 index 000000000000..8c3cbf9e7ead --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/session.h @@ -0,0 +1,75 @@ +#pragma once +#include "common.h" +#include +#include + +#include +#include + +namespace NKikimr::NSchemeShard::NOlap::NBackground { + +class TTxChainSession: public NKikimr::NOlap::NBackground::TSessionProtoAdapter< + NKikimrSchemeShardTxBackgroundProto::TTxChainSessionLogic, + NKikimrSchemeShardTxBackgroundProto::TTxChainSessionProgress, + NKikimrSchemeShardTxBackgroundProto::TTxChainSessionState> { +public: + static TString GetStaticClassName() { + return "SS::BG::TX_CHAIN"; + } +private: + using TBase = NKikimr::NOlap::NBackground::TSessionProtoAdapter; + YDB_READONLY_DEF(TTxChainData, TxData); + YDB_READONLY(ui32, StepForExecute, 0); +protected: + virtual TConclusion> DoCreateActor(const NKikimr::NOlap::NBackground::TStartContext& context) const override; + virtual TConclusionStatus DoDeserializeFromProto(const TProtoLogic& proto) override { + return TxData.DeserializeFromProto(proto.GetCommonData()); + } + virtual TProtoLogic DoSerializeToProto() const override { + TProtoLogic result; + *result.MutableCommonData() = TxData.SerializeToProto(); + return result; + } + virtual TConclusionStatus DoDeserializeProgressFromProto(const TProtoProgress& proto) override { + StepForExecute = proto.GetStepForExecute(); + return TConclusionStatus::Success(); + } + virtual TProtoProgress DoSerializeProgressToProto() const override { + TProtoProgress proto; + proto.SetStepForExecute(StepForExecute); + return proto; + } + virtual TConclusionStatus DoDeserializeStateFromProto(const TProtoState& /*proto*/) override { + return TConclusionStatus::Success(); + } + virtual TProtoState DoSerializeStateToProto() const override { + TProtoState result; + return result; + } +public: + TTxChainSession(const TTxChainData& data) + : TxData(data) + { + + } + + void NextStep() { + AFL_VERIFY(++StepForExecute <= GetTxData().GetTransactions().size()); + } + + virtual TString GetClassName() const override { + return GetStaticClassName(); + } + virtual bool IsReadyForStart() const override { + return true; + } + virtual bool IsFinished() const override { + AFL_VERIFY(StepForExecute <= GetTxData().GetTransactions().size()); + return StepForExecute == GetTxData().GetTransactions().size(); + } + virtual bool IsReadyForRemove() const override { + return true; + } +}; + +} \ No newline at end of file diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/status_channel.cpp b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/status_channel.cpp new file mode 100644 index 000000000000..135bac68d2f3 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/status_channel.cpp @@ -0,0 +1,5 @@ +#include "status_channel.h" + +namespace NKikimr::NSchemeShard::NOlap::NBackground { + +} \ No newline at end of file diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/status_channel.h b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/status_channel.h new file mode 100644 index 000000000000..b42cc11a8db7 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/status_channel.h @@ -0,0 +1,50 @@ +#pragma once +#include +#include +#include +#include +#include + +namespace NKikimr::NSchemeShard::NOlap::NBackground { + +class TStatusChannel: public NBackgroundTasks::TInterfaceProtoAdapter { +private: + using TBase = NBackgroundTasks::TInterfaceProtoAdapter; + TString TaskClassName; + TString TaskIdentifier; + ui64 SSTabletId; +public: + TStatusChannel(const TString taskClassName, const TString& taskId, const ui64 ssTabletId) + : TaskClassName(taskClassName) + , TaskIdentifier(taskId) + , SSTabletId(ssTabletId) + { + AFL_VERIFY(!!TaskClassName); + AFL_VERIFY(!!TaskIdentifier); + AFL_VERIFY(SSTabletId); + } + virtual void DoOnFail(const TString& /*errorMessage*/) const override { + + } + virtual void DoOnAdded() const override { + + } + virtual void DoOnFinished() const override { + + } + virtual TProtoStorage DoSerializeToProto() const override { + TProtoStorage result; + result.SetTaskClassName(TaskClassName); + result.SetTaskIdentifier(TaskIdentifier); + result.SetSSTabletId(SSTabletId); + return result; + } + virtual TConclusionStatus DoDeserializeFromProto(const TProtoStorage& proto) override { + TaskClassName = proto.GetTaskClassName(); + TaskIdentifier = proto.GetTaskIdentifier(); + SSTabletId = proto.GetSSTabletId(); + return TConclusionStatus::Success(); + } +}; + +} \ No newline at end of file diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/task.cpp b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/task.cpp new file mode 100644 index 000000000000..945814b1a2eb --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/task.cpp @@ -0,0 +1,10 @@ +#include "task.h" +#include "session.h" + +namespace NKikimr::NSchemeShard::NOlap::NBackground { + +std::shared_ptr TTxChainTask::DoBuildSession() const { + return std::make_shared(TxData); +} + +} \ No newline at end of file diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/task.h b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/task.h new file mode 100644 index 000000000000..e8ba8ead96fa --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/task.h @@ -0,0 +1,37 @@ +#pragma once +#include "common.h" +#include +#include +#include +#include +#include +#include + +namespace NKikimr::NSchemeShard::NOlap::NBackground { + +class TTxChainTask: public NBackgroundTasks::TInterfaceProtoAdapter { +private: + TTxChainData TxData; + using TBase = NBackgroundTasks::TInterfaceProtoAdapter; +public: + static TString GetStaticClassName() { + return "SS::BG::TX_CHAIN"; + } +private: + virtual TConclusionStatus DoDeserializeFromProto(const TProtoStorage& proto) override { + return TxData.DeserializeFromProto(proto.GetCommonData()); + } + virtual TProtoStorage DoSerializeToProto() const override { + TProtoStorage result; + *result.MutableCommonData() = TxData.SerializeToProto(); + return result; + } + virtual std::shared_ptr DoBuildSession() const override; + static const inline TFactory::TRegistrator Registrator = TFactory::TRegistrator(GetStaticClassName()); +public: + virtual TString GetClassName() const override { + return GetStaticClassName(); + } +}; + +} \ No newline at end of file diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/ya.make b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/ya.make new file mode 100644 index 000000000000..edaaef5b8527 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain/ya.make @@ -0,0 +1,16 @@ +LIBRARY() + +SRCS( + status_channel.cpp + task.cpp + session.cpp + actor.cpp + common.cpp +) + +PEERDIR( + ydb/core/tx/schemeshard/olap/bg_tasks/protos + ydb/core/tx/columnshard/bg_tasks/abstract +) + +END() diff --git a/ydb/core/tx/schemeshard/olap/bg_tasks/ya.make b/ydb/core/tx/schemeshard/olap/bg_tasks/ya.make new file mode 100644 index 000000000000..f83b5e3e843d --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/bg_tasks/ya.make @@ -0,0 +1,9 @@ +LIBRARY() + +PEERDIR( + ydb/core/tx/schemeshard/olap/bg_tasks/adapter + ydb/core/tx/schemeshard/olap/bg_tasks/protos + ydb/core/tx/schemeshard/olap/bg_tasks/tx_chain +) + +END() diff --git a/ydb/core/tx/schemeshard/olap/ya.make b/ydb/core/tx/schemeshard/olap/ya.make index fbb487ef0b5a..d41824702a58 100644 --- a/ydb/core/tx/schemeshard/olap/ya.make +++ b/ydb/core/tx/schemeshard/olap/ya.make @@ -2,6 +2,7 @@ LIBRARY() PEERDIR( ydb/core/tx/schemeshard/olap/columns + ydb/core/tx/schemeshard/olap/bg_tasks ydb/core/tx/schemeshard/olap/indexes ydb/core/tx/schemeshard/olap/schema ydb/core/tx/schemeshard/olap/common diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 2c5c92617953..4961c21f98a2 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -1,6 +1,7 @@ #include "schemeshard.h" #include "schemeshard_impl.h" #include "schemeshard_svp_migration.h" +#include "olap/bg_tasks/adapter/adapter.h" #include #include @@ -407,6 +408,7 @@ void TSchemeShard::Clear() { Views.clear(); ColumnTables = { }; + BackgroundSessionsManager = std::make_shared(std::make_shared()); RevertedMigrations.clear(); @@ -4230,6 +4232,7 @@ TSchemeShard::TSchemeShard(const TActorId &tablet, TTabletStorageInfo *info) TabletCounters = TabletCountersPtr.Get(); SelfPinger = new TSelfPinger(SelfTabletId(), TabletCounters); + BackgroundSessionsManager = std::make_shared(std::make_shared()); } const TDomainsInfo::TDomain& TSchemeShard::GetDomainDescription(const TActorContext &ctx) const { @@ -4319,16 +4322,6 @@ void TSchemeShard::OnTabletDead(TEvTablet::TEvTabletDead::TPtr &ev, const TActor Die(ctx); } -static TVector CollectTxAllocators(const TAppData *appData) { - TVector allocators; - if (const auto& domain = appData->DomainsInfo->Domain) { - for (auto tabletId: domain->TxAllocators) { - allocators.push_back(tabletId); - } - } - return allocators; -} - void TSchemeShard::OnActivateExecutor(const TActorContext &ctx) { const TTabletId selfTabletId = SelfTabletId(); @@ -4376,7 +4369,7 @@ void TSchemeShard::OnActivateExecutor(const TActorContext &ctx) { AllowServerlessStorageBilling = appData->FeatureFlags.GetAllowServerlessStorageBillingForSchemeShard(); appData->Icb->RegisterSharedControl(AllowServerlessStorageBilling, "SchemeShard_AllowServerlessStorageBilling"); - TxAllocatorClient = RegisterWithSameMailbox(CreateTxAllocatorClient(CollectTxAllocators(appData))); + TxAllocatorClient = RegisterWithSameMailbox(CreateTxAllocatorClient(appData)); SysPartitionStatsCollector = Register(NSysView::CreatePartitionStatsCollector().Release()); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index c00aef33df61..7fedcfed1a49 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -49,6 +49,7 @@ #include #include #include +#include #include #include #include @@ -249,6 +250,7 @@ class TSchemeShard TTempTablesState TempTablesState; TTablesStorage ColumnTables; + std::shared_ptr BackgroundSessionsManager; // it is only because we need to manage undo of upgrade subdomain, finally remove it THashMap> RevertedMigrations; diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h index 4bf23b3691cb..1a7abb8950ed 100644 --- a/ydb/core/tx/schemeshard/schemeshard_schema.h +++ b/ydb/core/tx/schemeshard/schemeshard_schema.h @@ -1758,15 +1758,27 @@ struct Schema : NIceDb::Schema { >; }; - struct View : Table<108> { - struct PathId : Column<1, NScheme::NTypeIds::Uint64> { using Type = TLocalPathId; }; - struct AlterVersion : Column<2, NScheme::NTypeIds::Uint64> {}; - struct QueryText : Column<3, NScheme::NTypeIds::String> {}; + struct View: Table<108> { + struct PathId: Column<1, NScheme::NTypeIds::Uint64> { using Type = TLocalPathId; }; + struct AlterVersion: Column<2, NScheme::NTypeIds::Uint64> {}; + struct QueryText: Column<3, NScheme::NTypeIds::String> {}; using TKey = TableKey; using TColumns = TableColumns; }; + struct BackgroundSessions: Table<109> { + struct ClassName: Column<1, NScheme::NTypeIds::String> {}; + struct Identifier: Column<2, NScheme::NTypeIds::String> {}; + struct StatusChannel: Column<3, NScheme::NTypeIds::String> {}; + struct LogicDescription: Column<4, NScheme::NTypeIds::String> {}; + struct Progress: Column<5, NScheme::NTypeIds::String> {}; + struct State: Column<6, NScheme::NTypeIds::String> {}; + + using TKey = TableKey; + using TColumns = TableColumns; + }; + using TTables = SchemaTables< Paths, TxInFlight, @@ -1874,7 +1886,8 @@ struct Schema : NIceDb::Schema { ExternalDataSource, PersQueueGroupStats, BuildColumnOperationSettings, - View + View, + BackgroundSessions >; static constexpr ui64 SysParam_NextPathId = 1; diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make index ac8e6d436cb0..310f550bdd79 100644 --- a/ydb/core/tx/schemeshard/ya.make +++ b/ydb/core/tx/schemeshard/ya.make @@ -276,6 +276,7 @@ PEERDIR( ydb/library/yql/minikql ydb/library/yql/providers/common/proto ydb/services/bg_tasks + ydb/core/tx/columnshard/bg_tasks/manager ) YQL_LAST_ABI_VERSION() diff --git a/ydb/core/tx/tx_allocator_client/actor_client.cpp b/ydb/core/tx/tx_allocator_client/actor_client.cpp index 00e47c440ec7..7ee371522588 100644 --- a/ydb/core/tx/tx_allocator_client/actor_client.cpp +++ b/ydb/core/tx/tx_allocator_client/actor_client.cpp @@ -2,6 +2,7 @@ #include "client.h" #include +#include #include #include @@ -125,4 +126,22 @@ IActor* CreateTxAllocatorClient(TVector txAllocators) { return new TTxAllocatorClientActor(std::move(txAllocators)); } +namespace { + +TVector CollectTxAllocators(const TAppData* appData) { + TVector allocators; + if (const auto& domain = appData->DomainsInfo->Domain) { + for (auto tabletId : domain->TxAllocators) { + allocators.push_back(tabletId); + } + } + return allocators; +} + +} + +IActor* CreateTxAllocatorClient(const TAppData* appData) { + return CreateTxAllocatorClient(CollectTxAllocators(appData)); +} + } // NKikimr diff --git a/ydb/core/tx/tx_allocator_client/actor_client.h b/ydb/core/tx/tx_allocator_client/actor_client.h index 3f1f608928ca..5d5d97f33358 100644 --- a/ydb/core/tx/tx_allocator_client/actor_client.h +++ b/ydb/core/tx/tx_allocator_client/actor_client.h @@ -4,7 +4,9 @@ #include + namespace NKikimr { +struct TAppData; struct TEvTxAllocatorClient { enum EEv { @@ -49,5 +51,6 @@ struct TEvTxAllocatorClient { }; // TTxAllocatorClientEvents IActor* CreateTxAllocatorClient(TVector txAllocators); +IActor* CreateTxAllocatorClient(const TAppData* appData); } // NKikimr diff --git a/ydb/services/bg_tasks/abstract/interface.h b/ydb/services/bg_tasks/abstract/interface.h index fd556a2e60b0..cc7a512ca8b0 100644 --- a/ydb/services/bg_tasks/abstract/interface.h +++ b/ydb/services/bg_tasks/abstract/interface.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include @@ -137,16 +138,12 @@ class TCommonInterfaceContainer { template const T& GetAsSafe() const { - auto result = std::dynamic_pointer_cast(Object); - Y_ABORT_UNLESS(!!result); - return *result; + return *GetObjectPtrVerifiedAs(); } template T& GetAsSafe() { - auto result = std::dynamic_pointer_cast(Object); - Y_ABORT_UNLESS(!!result); - return *result; + return *GetObjectPtrVerifiedAs(); } std::shared_ptr GetObjectPtr() const { @@ -158,6 +155,13 @@ class TCommonInterfaceContainer { return Object; } + template + std::shared_ptr GetObjectPtrVerifiedAs() const { + auto result = std::dynamic_pointer_cast(Object); + Y_ABORT_UNLESS(!!result); + return result; + } + const IInterface& GetObjectVerified() const { AFL_VERIFY(Object); return *Object; @@ -239,6 +243,27 @@ class TInterfaceStringContainer: public TCommonInterfaceContainer { } }; +template +class TInterfaceProtoAdapter: public IBaseInterface { +private: + virtual TConclusionStatus DoDeserializeFromProto(const TProto& proto) = 0; + virtual TProto DoSerializeToProto() const = 0; +protected: + using TProtoStorage = TProto; + virtual TConclusionStatus DoDeserializeFromString(const TString& data) override final { + TProto proto; + if (!proto.ParseFromArray(data.data(), data.size())) { + return TConclusionStatus::Fail("cannot parse proto string as " + TypeName()); + } + return DoDeserializeFromProto(proto); + } + virtual TString DoSerializeToString() const override final { + TProto proto = DoSerializeToProto(); + return proto.SerializeAsString(); + } +}; + + class TDefaultJsonContainerPolicy { public: static TString GetClassName(const NJson::TJsonValue& jsonInfo) { diff --git a/ydb/services/bg_tasks/abstract/ya.make b/ydb/services/bg_tasks/abstract/ya.make index d6ea911e3ad0..4e973f9674d9 100644 --- a/ydb/services/bg_tasks/abstract/ya.make +++ b/ydb/services/bg_tasks/abstract/ya.make @@ -14,6 +14,7 @@ PEERDIR( ydb/library/actors/core ydb/public/api/protos ydb/services/bg_tasks/protos + ydb/library/conclusion ydb/core/base )