Skip to content

Commit

Permalink
Merge 903a699 into 77a85af
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored May 6, 2024
2 parents 77a85af + 903a699 commit f6687cd
Show file tree
Hide file tree
Showing 68 changed files with 2,103 additions and 22 deletions.
1 change: 1 addition & 0 deletions ydb/core/base/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ struct TKikimrEvents : TEvents {
ES_NEBIUS_ACCESS_SERVICE,
ES_REPLICATION_SERVICE,
ES_BACKUP_SERVICE,
ES_TX_BACKGROUND,
};
};

Expand Down
4 changes: 4 additions & 0 deletions ydb/core/protos/counters_columnshard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -190,4 +190,8 @@ enum ETxTypes {
TXTYPE_DATA_SHARING_APPLY_LINKS_MODIFICATION = 27 [(TxTypeOpts) = {Name: "TxDataSharingApplyLinksModification"}];
TXTYPE_DATA_SHARING_WRITE_SOURCE_CURSOR = 28 [(TxTypeOpts) = {Name: "TxDataSharingWriteSourceCursor"}];
TXTYPE_EXPORT_SAVE_CURSOR = 29 [(TxTypeOpts) = {Name: "TxExportSaveCursor"}];
TXTYPE_REMOVE_BACKGROUND_SESSION = 30 [(TxTypeOpts) = {Name: "TxRemoveBackgroundSession"}];
TXTYPE_ADD_BACKGROUND_SESSION = 31 [(TxTypeOpts) = {Name: "TxAddBackgroundSession"}];
TXTYPE_SAVE_BACKGROUND_SESSION_PROGRESS = 32 [(TxTypeOpts) = {Name: "TxSaveBackgroundSessionProgress"}];
TXTYPE_SAVE_BACKGROUND_SESSION_STATE = 33 [(TxTypeOpts) = {Name: "TxSaveBackgroundSessionState"}];
}
5 changes: 5 additions & 0 deletions ydb/core/tx/columnshard/bg_tasks/abstract/adapter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#include "adapter.h"

namespace NKikimr::NOlap::NBackground {

}
41 changes: 41 additions & 0 deletions ydb/core/tx/columnshard/bg_tasks/abstract/adapter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#pragma once
#include "session.h"
#include <ydb/core/tablet_flat/tablet_flat_executor.h>
#include <ydb/library/actors/core/actorid.h>
#include <ydb/library/accessor/accessor.h>
#include <ydb/library/conclusion/status.h>

namespace NKikimr::NOlap::NBackground {

class ITabletAdapter {
private:
virtual bool DoLoadSessionsFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, std::deque<TSessionRecord>& records) = 0;
virtual void DoSaveProgressToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& container) = 0;
virtual void DoSaveStateToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& container) = 0;
virtual void DoSaveSessionToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& container) = 0;
virtual void DoRemoveSessionFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TString& className, const TString& identifier) = 0;
public:
virtual ~ITabletAdapter() = default;

void RemoveSessionFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TString& className, const TString& identifier) {
return DoRemoveSessionFromLocalDatabase(txc, className, identifier);
}

[[nodiscard]] bool LoadSessionsFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, std::deque<TSessionRecord>& records) {
return DoLoadSessionsFromLocalDatabase(txc, records);
}

void SaveSessionToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& session) {
return DoSaveSessionToLocalDatabase(txc, session);
}

void SaveStateToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& session) {
return DoSaveStateToLocalDatabase(txc, session);
}

void SaveProgressToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& session) {
return DoSaveProgressToLocalDatabase(txc, session);
}
};

}
33 changes: 33 additions & 0 deletions ydb/core/tx/columnshard/bg_tasks/abstract/control.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#include "control.h"
#include <ydb/core/tx/columnshard/bg_tasks/protos/data.pb.h>

namespace NKikimr::NOlap::NBackground {

NKikimrTxBackgroundProto::TSessionControlContainer TSessionControlContainer::SerializeToProto() const {
NKikimrTxBackgroundProto::TSessionControlContainer result;
result.SetSessionClassName(SessionClassName);
result.SetSessionIdentifier(SessionIdentifier);
result.SetStatusChannelContainer(ChannelContainer.SerializeToString());
result.SetLogicControlContainer(LogicControlContainer.SerializeToString());
return result;
}

NKikimr::TConclusionStatus TSessionControlContainer::DeserializeFromProto(const NKikimrTxBackgroundProto::TSessionControlContainer& proto) {
SessionClassName = proto.GetSessionClassName();
SessionIdentifier = proto.GetSessionIdentifier();
if (!SessionClassName) {
return TConclusionStatus::Fail("incorrect session class name for bg_task");
}
if (!SessionIdentifier) {
return TConclusionStatus::Fail("incorrect session id for bg_task");
}
if (!ChannelContainer.DeserializeFromString(proto.GetStatusChannelContainer())) {
return TConclusionStatus::Fail("cannot parse channel from proto");
}
if (!LogicControlContainer.DeserializeFromString(proto.GetLogicControlContainer())) {
return TConclusionStatus::Fail("cannot parse logic from proto");
}
return TConclusionStatus::Success();
}

}
69 changes: 69 additions & 0 deletions ydb/core/tx/columnshard/bg_tasks/abstract/control.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#pragma once
#include "session.h"
#include "status_channel.h"
#include <ydb/services/bg_tasks/abstract/interface.h>
#include <ydb/library/accessor/accessor.h>

namespace NKikimrTxBackgroundProto {
class TSessionControlContainer;
}

namespace NKikimr::NOlap::NBackground {

class ISessionLogicControl {
private:
YDB_READONLY_DEF(TString, Identifier);
virtual TConclusionStatus DoApply(const std::shared_ptr<ISessionLogic>& session) const = 0;
virtual TConclusionStatus DoDeserializeFromString(const TString& data) = 0;
virtual TString DoSerializeToString() const = 0;
public:
using TFactory = NObjectFactory::TObjectFactory<ISessionLogicControl, TString>;

virtual ~ISessionLogicControl() = default;

TConclusionStatus DeserializeFromString(const TString& data) {
return DoDeserializeFromString(data);
}

TString SerializeToString() const {
return DoSerializeToString();
}

TConclusionStatus Apply(const std::shared_ptr<ISessionLogic>& session) const {
session->CheckStatusCorrect();
auto result = DoApply(session);
session->CheckStatusCorrect();
return result;
}

virtual TString GetClassName() const = 0;
};

class TSessionLogicControlContainer: public NBackgroundTasks::TInterfaceStringContainer<ISessionLogicControl> {
private:
using TBase = NBackgroundTasks::TInterfaceStringContainer<ISessionLogicControl>;
public:
using TBase::TBase;
};

class TSessionControlContainer {
private:
YDB_READONLY_DEF(TString, SessionClassName);
YDB_READONLY_DEF(TString, SessionIdentifier);
YDB_READONLY_DEF(TStatusChannelContainer, ChannelContainer);
YDB_READONLY_DEF(TSessionLogicControlContainer, LogicControlContainer);
public:
NKikimrTxBackgroundProto::TSessionControlContainer SerializeToProto() const;
TConclusionStatus DeserializeFromProto(const NKikimrTxBackgroundProto::TSessionControlContainer& proto);

TSessionControlContainer() = default;

TSessionControlContainer(const TStatusChannelContainer& channel, const TSessionLogicControlContainer& logic)
: ChannelContainer(channel)
, LogicControlContainer(logic) {
AFL_VERIFY(!!ChannelContainer);
AFL_VERIFY(!!LogicControlContainer);
}
};

}
5 changes: 5 additions & 0 deletions ydb/core/tx/columnshard/bg_tasks/abstract/session.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#include "session.h"

namespace NKikimr::NOlap::NBackground {

}
148 changes: 148 additions & 0 deletions ydb/core/tx/columnshard/bg_tasks/abstract/session.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
#pragma once
#include "status_channel.h"

#include <ydb/core/tx/columnshard/common/tablet_id.h>
#include <ydb/services/bg_tasks/abstract/interface.h>

#include <ydb/library/accessor/accessor.h>
#include <ydb/library/actors/core/actorid.h>
#include <ydb/library/conclusion/status.h>
#include <ydb/library/conclusion/result.h>

namespace NKikimr::NOlap::NBackground {

class TSession;
class ITabletAdapter;

class TStartContext {
private:
YDB_READONLY_DEF(TTabletId, TabletId);
YDB_READONLY_DEF(NActors::TActorId, TabletActorId);
YDB_READONLY_DEF(TStatusChannelContainer, Channel);
YDB_READONLY_DEF(std::shared_ptr<TSession>, SessionSelfPtr);
YDB_READONLY_DEF(std::shared_ptr<ITabletAdapter>, Adapter);
public:
TStartContext(const TTabletId tabletId, const NActors::TActorId& tabletActorId, const TStatusChannelContainer channel, const std::shared_ptr<TSession>& sessionSelfPtr,
const std::shared_ptr<ITabletAdapter>& adapter)
: TabletId(tabletId)
, TabletActorId(tabletActorId)
, Channel(channel)
, SessionSelfPtr(sessionSelfPtr)
, Adapter(adapter)
{

}
};

class ISessionLogic {
private:
mutable bool ActorConstructed = false;
virtual TConclusionStatus DoDeserializeProgressFromString(const TString& data) = 0;
virtual TConclusionStatus DoDeserializeStateFromString(const TString& data) = 0;
virtual TConclusionStatus DoDeserializeFromString(const TString& data) = 0;
virtual TString DoSerializeProgressToString() const = 0;
virtual TString DoSerializeStateToString() const = 0;
virtual TString DoSerializeToString() const = 0;
virtual TConclusion<std::unique_ptr<NActors::IActor>> DoCreateActor(const TStartContext& context) const = 0;
public:
using TFactory = NObjectFactory::TObjectFactory<ISessionLogic, TString>;

virtual ~ISessionLogic() = default;

virtual TString GetClassName() const = 0;

void CheckStatusCorrect() const {
}

TConclusionStatus DeserializeProgressFromString(const TString& data) {
return DoDeserializeProgressFromString(data);
}
TString SerializeProgressToString() const {
CheckStatusCorrect();
return DoSerializeProgressToString();
}
TConclusionStatus DeserializeFromString(const TString& data) {
return DoDeserializeFromString(data);
}
TString SerializeToString() const {
CheckStatusCorrect();
return DoSerializeToString();
}
TConclusionStatus DeserializeStateFromString(const TString& data) {
return DoDeserializeStateFromString(data);
}
TString SerializeStateToString() const {
CheckStatusCorrect();
return DoSerializeStateToString();
}

std::unique_ptr<NActors::IActor> CreateActor(const TStartContext& context) const {
AFL_VERIFY(IsReadyForStart());
AFL_VERIFY(!IsFinished());
AFL_VERIFY(!ActorConstructed);
ActorConstructed = true;
std::unique_ptr<NActors::IActor> result = DoCreateActor(context).DetachResult();
AFL_VERIFY(!!result);
return result;
}

virtual bool IsReadyForStart() const = 0;
virtual bool IsFinished() const = 0;
virtual bool IsReadyForRemove() const = 0;
};

template <class TProtoLogicExt, class TProtoProgressExt, class TProtoStateExt>
class TSessionProtoAdapter: public NBackgroundTasks::TInterfaceProtoAdapter<TProtoLogicExt, ISessionLogic> {
protected:
using TProtoProgress = TProtoProgressExt;
using TProtoState = TProtoStateExt;
using TProtoLogic = TProtoLogicExt;
private:
virtual TConclusionStatus DoDeserializeProgressFromProto(const TProtoProgress& proto) = 0;
virtual TProtoProgress DoSerializeProgressToProto() const = 0;
virtual TConclusionStatus DoDeserializeStateFromProto(const TProtoState& proto) = 0;
virtual TProtoState DoSerializeStateToProto() const = 0;
protected:
virtual TConclusionStatus DoDeserializeProgressFromString(const TString& data) override final {
TProtoProgress proto;
if (!proto.ParseFromArray(data.data(), data.size())) {
return TConclusionStatus::Fail("cannot parse proto string as " + TypeName<TProtoProgress>());
}
return DoDeserializeProgressFromProto(proto);
}
virtual TString DoSerializeProgressToString() const override final {
TProtoProgress proto = DoSerializeProgressToProto();
return proto.SerializeAsString();
}
virtual TConclusionStatus DoDeserializeStateFromString(const TString& data) override final {
TProtoState proto;
if (!proto.ParseFromArray(data.data(), data.size())) {
return TConclusionStatus::Fail("cannot parse proto string as " + TypeName<TProtoState>());
}
return DoDeserializeStateFromProto(proto);
}
virtual TString DoSerializeStateToString() const override final {
TProtoState proto = DoSerializeStateToProto();
return proto.SerializeAsString();
}
};

class TSessionLogicContainer: public NBackgroundTasks::TInterfaceStringContainer<ISessionLogic> {
private:
using TBase = NBackgroundTasks::TInterfaceStringContainer<ISessionLogic>;
public:
using TBase::TBase;
};

class TSessionRecord {
private:
YDB_ACCESSOR_DEF(TString, Identifier);
YDB_ACCESSOR_DEF(TString, ClassName);
YDB_ACCESSOR_DEF(TString, LogicDescription);
YDB_ACCESSOR_DEF(TString, StatusChannel);
YDB_ACCESSOR_DEF(TString, Progress);
YDB_ACCESSOR_DEF(TString, State);
public:
};

}
5 changes: 5 additions & 0 deletions ydb/core/tx/columnshard/bg_tasks/abstract/status_channel.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#include "status_channel.h"

namespace NKikimr::NOlap::NBackground {

}
Loading

0 comments on commit f6687cd

Please sign in to comment.