Skip to content

Commit

Permalink
provide bg tasks flow into schemeshard
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 committed May 6, 2024
1 parent 04c59b3 commit 903a699
Show file tree
Hide file tree
Showing 38 changed files with 825 additions and 92 deletions.
23 changes: 9 additions & 14 deletions ydb/core/tx/columnshard/bg_tasks/abstract/adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,36 +9,31 @@ namespace NKikimr::NOlap::NBackground {

class ITabletAdapter {
private:
YDB_READONLY_DEF(NActors::TActorId, TabletActorId);
virtual bool DoLoadSessionsFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, std::deque<TSessionRecord>& records) = 0;
virtual TConclusionStatus DoSaveProgressToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& container) = 0;
virtual TConclusionStatus DoSaveStateToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& container) = 0;
virtual TConclusionStatus DoSaveSessionToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& container) = 0;
virtual TConclusionStatus DoRemoveSessionFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TString& className, const TString& identifier) = 0;
virtual void DoSaveProgressToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& container) = 0;
virtual void DoSaveStateToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& container) = 0;
virtual void DoSaveSessionToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& container) = 0;
virtual void DoRemoveSessionFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TString& className, const TString& identifier) = 0;
public:
ITabletAdapter(const NActors::TActorId& actorId)
: TabletActorId(actorId)
{
virtual ~ITabletAdapter() = default;

}

[[nodiscard]] TConclusionStatus RemoveSessionFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TString& className, const TString& identifier) {
void RemoveSessionFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TString& className, const TString& identifier) {
return DoRemoveSessionFromLocalDatabase(txc, className, identifier);
}

[[nodiscard]] bool LoadSessionsFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, std::deque<TSessionRecord>& records) {
return DoLoadSessionsFromLocalDatabase(txc, records);
}

[[nodiscard]] TConclusionStatus SaveSessionToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& session) {
void SaveSessionToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& session) {
return DoSaveSessionToLocalDatabase(txc, session);
}

[[nodiscard]] TConclusionStatus SaveStateToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& session) {
void SaveStateToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& session) {
return DoSaveStateToLocalDatabase(txc, session);
}

[[nodiscard]] TConclusionStatus SaveProgressToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& session) {
void SaveProgressToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& session) {
return DoSaveProgressToLocalDatabase(txc, session);
}
};
Expand Down
28 changes: 28 additions & 0 deletions ydb/core/tx/columnshard/bg_tasks/abstract/control.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,33 @@
#include "control.h"
#include <ydb/core/tx/columnshard/bg_tasks/protos/data.pb.h>

namespace NKikimr::NOlap::NBackground {

NKikimrTxBackgroundProto::TSessionControlContainer TSessionControlContainer::SerializeToProto() const {
NKikimrTxBackgroundProto::TSessionControlContainer result;
result.SetSessionClassName(SessionClassName);
result.SetSessionIdentifier(SessionIdentifier);
result.SetStatusChannelContainer(ChannelContainer.SerializeToString());
result.SetLogicControlContainer(LogicControlContainer.SerializeToString());
return result;
}

NKikimr::TConclusionStatus TSessionControlContainer::DeserializeFromProto(const NKikimrTxBackgroundProto::TSessionControlContainer& proto) {
SessionClassName = proto.GetSessionClassName();
SessionIdentifier = proto.GetSessionIdentifier();
if (!SessionClassName) {
return TConclusionStatus::Fail("incorrect session class name for bg_task");
}
if (!SessionIdentifier) {
return TConclusionStatus::Fail("incorrect session id for bg_task");
}
if (!ChannelContainer.DeserializeFromString(proto.GetStatusChannelContainer())) {
return TConclusionStatus::Fail("cannot parse channel from proto");
}
if (!LogicControlContainer.DeserializeFromString(proto.GetLogicControlContainer())) {
return TConclusionStatus::Fail("cannot parse logic from proto");
}
return TConclusionStatus::Success();
}

}
32 changes: 6 additions & 26 deletions ydb/core/tx/columnshard/bg_tasks/abstract/control.h
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
#pragma once
#include "session.h"
#include "status_channel.h"
#include <ydb/core/tx/columnshard/bg_tasks/protos/data.pb.h>
#include <ydb/services/bg_tasks/abstract/interface.h>
#include <ydb/library/accessor/accessor.h>

namespace NKikimrTxBackgroundProto {
class TSessionControlContainer;
}

namespace NKikimr::NOlap::NBackground {

class ISessionLogicControl {
Expand Down Expand Up @@ -50,31 +53,8 @@ class TSessionControlContainer {
YDB_READONLY_DEF(TStatusChannelContainer, ChannelContainer);
YDB_READONLY_DEF(TSessionLogicControlContainer, LogicControlContainer);
public:
NKikimrTxBackgroundProto::TSessionControlContainer SerializeToProto() const {
NKikimrTxBackgroundProto::TSessionControlContainer result;
result.SetSessionClassName(SessionClassName);
result.SetSessionIdentifier(SessionIdentifier);
result.SetStatusChannelContainer(ChannelContainer.SerializeToString());
result.SetLogicControlContainer(LogicControlContainer.SerializeToString());
return result;
}
TConclusionStatus DeserializeFromProto(const NKikimrTxBackgroundProto::TSessionControlContainer& proto) {
SessionClassName = proto.GetSessionClassName();
SessionIdentifier = proto.GetSessionIdentifier();
if (!SessionClassName) {
return TConclusionStatus::Fail("incorrect session class name for bg_task");
}
if (!SessionIdentifier) {
return TConclusionStatus::Fail("incorrect session id for bg_task");
}
if (!ChannelContainer.DeserializeFromString(proto.GetStatusChannelContainer())) {
return TConclusionStatus::Fail("cannot parse channel from proto");
}
if (!LogicControlContainer.DeserializeFromString(proto.GetLogicControlContainer())) {
return TConclusionStatus::Fail("cannot parse logic from proto");
}
return TConclusionStatus::Success();
}
NKikimrTxBackgroundProto::TSessionControlContainer SerializeToProto() const;
TConclusionStatus DeserializeFromProto(const NKikimrTxBackgroundProto::TSessionControlContainer& proto);

TSessionControlContainer() = default;

Expand Down
18 changes: 14 additions & 4 deletions ydb/core/tx/columnshard/bg_tasks/abstract/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,24 @@

namespace NKikimr::NOlap::NBackground {

class TSession;
class ITabletAdapter;

class TStartContext {
private:
YDB_READONLY_DEF(TTabletId, TabletId);
YDB_READONLY_DEF(NActors::TActorId, TabletActorId);
YDB_READONLY_DEF(TStatusChannelContainer, Channel);
YDB_READONLY_DEF(std::shared_ptr<TSession>, SessionSelfPtr);
YDB_READONLY_DEF(std::shared_ptr<ITabletAdapter>, Adapter);
public:
TStartContext(const NActors::TActorId& tabletActorId, const TStatusChannelContainer channel)
: TabletActorId(tabletActorId)
TStartContext(const TTabletId tabletId, const NActors::TActorId& tabletActorId, const TStatusChannelContainer channel, const std::shared_ptr<TSession>& sessionSelfPtr,
const std::shared_ptr<ITabletAdapter>& adapter)
: TabletId(tabletId)
, TabletActorId(tabletActorId)
, Channel(channel)
, SessionSelfPtr(sessionSelfPtr)
, Adapter(adapter)
{

}
Expand Down Expand Up @@ -82,7 +92,7 @@ class ISessionLogic {
};

template <class TProtoLogicExt, class TProtoProgressExt, class TProtoStateExt>
class TSessionProtoAdapter: public TInterfaceProtoAdapter<TProtoLogicExt, ISessionLogic> {
class TSessionProtoAdapter: public NBackgroundTasks::TInterfaceProtoAdapter<TProtoLogicExt, ISessionLogic> {
protected:
using TProtoProgress = TProtoProgressExt;
using TProtoState = TProtoStateExt;
Expand All @@ -101,7 +111,7 @@ class TSessionProtoAdapter: public TInterfaceProtoAdapter<TProtoLogicExt, ISessi
return DoDeserializeProgressFromProto(proto);
}
virtual TString DoSerializeProgressToString() const override final {
TProtoProgress proto = DoSerializeToProto();
TProtoProgress proto = DoSerializeProgressToProto();
return proto.SerializeAsString();
}
virtual TConclusionStatus DoDeserializeStateFromString(const TString& data) override final {
Expand Down
23 changes: 23 additions & 0 deletions ydb/core/tx/columnshard/bg_tasks/abstract/task.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,28 @@
#include "task.h"
#include <ydb/core/tx/columnshard/bg_tasks/protos/data.pb.h>

namespace NKikimr::NOlap::NBackground {

NKikimrTxBackgroundProto::TTaskContainer TTask::SerializeToProto() const {
NKikimrTxBackgroundProto::TTaskContainer result;
result.SetIdentifier(Identifier);
result.SetStatusChannelContainer(ChannelContainer.SerializeToString());
result.SetTaskDescriptionContainer(DescriptionContainer.SerializeToString());
return result;
}

NKikimr::TConclusionStatus TTask::DeserializeFromProto(const NKikimrTxBackgroundProto::TTaskContainer& proto) {
Identifier = proto.GetIdentifier();
if (!Identifier) {
return TConclusionStatus::Fail("empty identifier is not correct for bg_task");
}
if (!ChannelContainer.DeserializeFromString(proto.GetStatusChannelContainer())) {
return TConclusionStatus::Fail("cannot parse status channel from proto");
}
if (!DescriptionContainer.DeserializeFromString(proto.GetTaskDescriptionContainer())) {
return TConclusionStatus::Fail("cannot parse task description from proto");
}
return TConclusionStatus::Success();
}

}
27 changes: 6 additions & 21 deletions ydb/core/tx/columnshard/bg_tasks/abstract/task.h
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
#pragma once
#include "session.h"
#include <ydb/core/tx/columnshard/bg_tasks/protos/data.pb.h>
#include <ydb/services/bg_tasks/abstract/interface.h>
#include <ydb/library/accessor/accessor.h>

namespace NKikimrTxBackgroundProto {
class TTaskContainer;
}

namespace NKikimr::NOlap::NBackground {

class ITaskDescription {
Expand Down Expand Up @@ -54,26 +57,8 @@ class TTask {
AFL_VERIFY(!!ChannelContainer);
AFL_VERIFY(!!DescriptionContainer);
}
NKikimrTxBackgroundProto::TTaskContainer SerializeToProto() const {
NKikimrTxBackgroundProto::TTaskContainer result;
result.SetIdentifier(Identifier);
result.SetStatusChannelContainer(ChannelContainer.SerializeToString());
result.SetTaskDescriptionContainer(DescriptionContainer.SerializeToString());
return result;
}
TConclusionStatus DeserializeFromProto(const NKikimrTxBackgroundProto::TTaskContainer& proto) {
Identifier = proto.GetIdentifier();
if (!Identifier) {
return TConclusionStatus::Fail("empty identifier is not correct for bg_task");
}
if (!ChannelContainer.DeserializeFromString(proto.GetStatusChannelContainer())) {
return TConclusionStatus::Fail("cannot parse status channel from proto");
}
if (!DescriptionContainer.DeserializeFromString(proto.GetTaskDescriptionContainer())) {
return TConclusionStatus::Fail("cannot parse task description from proto");
}
return TConclusionStatus::Success();
}
NKikimrTxBackgroundProto::TTaskContainer SerializeToProto() const;
TConclusionStatus DeserializeFromProto(const NKikimrTxBackgroundProto::TTaskContainer& proto);
};

}
9 changes: 5 additions & 4 deletions ydb/core/tx/columnshard/bg_tasks/manager/actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@ namespace NKikimr::NOlap::NBackground {

class TSessionActor: public NActors::TActorBootstrapped<TSessionActor> {
private:
const TTabletId TabletId;
const NActors::TActorId TabletActorId;
ui64 TxCounter = 0;
std::optional<ui64> SaveSessionProgressTx;
std::optional<ui64> SaveSessionStateTx;
std::shared_ptr<ITabletAdapter> Adapter;
virtual void OnTxCompleted(const ui64 txInternalId) = 0;
virtual void OnSessionProgressSaved() = 0;
virtual void OnSessionStateSaved() = 0;
virtual void OnBootstrap(const TActorContext& ctx) = 0;
protected:
const TTabletId TabletId;
const NActors::TActorId TabletActorId;
std::shared_ptr<ITabletAdapter> Adapter;
std::shared_ptr<TSession> Session;
ui64 GetNextTxId() {
return ++TxCounter;
Expand All @@ -35,8 +35,9 @@ class TSessionActor: public NActors::TActorBootstrapped<TSessionActor> {

void SaveSessionState();
public:
TSessionActor(const TTabletId tabletId, const std::shared_ptr<TSession>& session, const std::shared_ptr<ITabletAdapter>& adapter)
TSessionActor(const TTabletId tabletId, const NActors::TActorId tabletActorId, const std::shared_ptr<TSession>& session, const std::shared_ptr<ITabletAdapter>& adapter)
: TabletId(tabletId)
, TabletActorId(tabletActorId)
, Adapter(adapter)
, Session(session)
{
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/bg_tasks/manager/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <ydb/core/tx/columnshard/bg_tasks/transactions/tx_save_state.h>
#include <ydb/core/tx/columnshard/bg_tasks/transactions/tx_add.h>
#include <ydb/core/tx/columnshard/bg_tasks/transactions/tx_remove.h>
#include <ydb/core/tx/columnshard/bg_tasks/protos/data.pb.h>

namespace NKikimr::NOlap::NBackground {

Expand Down
6 changes: 5 additions & 1 deletion ydb/core/tx/columnshard/bg_tasks/manager/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@
#include <ydb/core/tx/columnshard/bg_tasks/abstract/session.h>
#include <ydb/core/tx/columnshard/bg_tasks/session/session.h>
#include <ydb/core/tx/columnshard/bg_tasks/session/storage.h>
#include <ydb/core/tx/columnshard/bg_tasks/protos/data.pb.h>
#include <ydb/core/tablet_flat/tablet_flat_executor.h>

namespace NKikimrTxBackgroundProto {
class TTaskContainer;
class TSessionControlContainer;
}

namespace NKikimr::NOlap::NBackground {

class TSessionsManager {
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/columnshard/bg_tasks/session/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ class TSession {
return result;
}

template <class T>
std::shared_ptr<T> GetLogicAsVerifiedPtr() const {
return LogicContainer.GetObjectPtrVerifiedAs<T>();
}

[[nodiscard]] TConclusionStatus DeserializeFromLocalDatabase(TSessionRecord&& record) {
Identifier = record.GetIdentifier();
ChannelContainer.DeserializeFromString(record.GetStatusChannel());
Expand Down
65 changes: 65 additions & 0 deletions ydb/core/tx/schemeshard/olap/bg_tasks/adapter/adapter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#include "adapter.h"
#include <ydb/core/tx/schemeshard/schemeshard_schema.h>

namespace NKikimr::NSchemeShard::NBackground {

bool TAdapter::DoLoadSessionsFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, std::deque<NKikimr::NOlap::NBackground::TSessionRecord>& records) {
NIceDb::TNiceDb db(txc.DB);
using BackgroundSessions = NSchemeShard::Schema::BackgroundSessions;
auto rowset = db.Table<BackgroundSessions>().Select();
if (!rowset.IsReady()) {
return false;
}

std::deque<NOlap::NBackground::TSessionRecord> result;
while (!rowset.EndOfSet()) {
NOlap::NBackground::TSessionRecord sRecord;
sRecord.SetClassName(rowset.GetValue<BackgroundSessions::ClassName>());
sRecord.SetIdentifier(rowset.GetValue<BackgroundSessions::Identifier>());
sRecord.SetLogicDescription(rowset.GetValue<BackgroundSessions::LogicDescription>());
sRecord.SetStatusChannel(rowset.GetValue<BackgroundSessions::StatusChannel>());
sRecord.SetProgress(rowset.GetValue<BackgroundSessions::Progress>());
sRecord.SetState(rowset.GetValue<BackgroundSessions::State>());
result.emplace_back(std::move(sRecord));
if (!rowset.Next()) {
return false;
}
}
std::swap(result, records);
return true;
}

void TAdapter::DoSaveProgressToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const NKikimr::NOlap::NBackground::TSessionRecord& container) {
NIceDb::TNiceDb db(txc.DB);
using BackgroundSessions = NSchemeShard::Schema::BackgroundSessions;
db.Table<BackgroundSessions>().Key(container.GetClassName(), container.GetIdentifier()).Update(
NIceDb::TUpdate<BackgroundSessions::Progress>(container.GetProgress())
);
}

void TAdapter::DoSaveStateToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const NKikimr::NOlap::NBackground::TSessionRecord& container) {
NIceDb::TNiceDb db(txc.DB);
using BackgroundSessions = NSchemeShard::Schema::BackgroundSessions;
db.Table<BackgroundSessions>().Key(container.GetClassName(), container.GetIdentifier()).Update(
NIceDb::TUpdate<BackgroundSessions::State>(container.GetState())
);
}

void TAdapter::DoSaveSessionToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const NKikimr::NOlap::NBackground::TSessionRecord& container) {
NIceDb::TNiceDb db(txc.DB);
using BackgroundSessions = NSchemeShard::Schema::BackgroundSessions;
db.Table<BackgroundSessions>().Key(container.GetClassName(), container.GetIdentifier()).Update(
NIceDb::TUpdate<BackgroundSessions::LogicDescription>(container.GetLogicDescription()),
NIceDb::TUpdate<BackgroundSessions::StatusChannel>(container.GetStatusChannel()),
NIceDb::TUpdate<BackgroundSessions::Progress>(container.GetProgress()),
NIceDb::TUpdate<BackgroundSessions::State>(container.GetState())
);
}

void TAdapter::DoRemoveSessionFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TString& className, const TString& identifier) {
NIceDb::TNiceDb db(txc.DB);
using BackgroundSessions = NSchemeShard::Schema::BackgroundSessions;
db.Table<BackgroundSessions>().Key(className, identifier).Delete();
}

}
14 changes: 14 additions & 0 deletions ydb/core/tx/schemeshard/olap/bg_tasks/adapter/adapter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#pragma once
#include <ydb/core/tx/columnshard/bg_tasks/abstract/adapter.h>

namespace NKikimr::NSchemeShard::NBackground {

class TAdapter: public NKikimr::NOlap::NBackground::ITabletAdapter {
protected:
virtual bool DoLoadSessionsFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, std::deque<NKikimr::NOlap::NBackground::TSessionRecord>& records) override;
virtual void DoSaveProgressToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const NKikimr::NOlap::NBackground::TSessionRecord& container) override;
virtual void DoSaveStateToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const NKikimr::NOlap::NBackground::TSessionRecord& container) override;
virtual void DoSaveSessionToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const NKikimr::NOlap::NBackground::TSessionRecord& container) override;
virtual void DoRemoveSessionFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TString& className, const TString& identifier) override;;
};
}
Loading

0 comments on commit 903a699

Please sign in to comment.