Skip to content

Commit

Permalink
Merge f2e0dad into b9e6764
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored May 27, 2024
2 parents b9e6764 + f2e0dad commit 05ecdcb
Show file tree
Hide file tree
Showing 4 changed files with 240 additions and 143 deletions.
190 changes: 73 additions & 117 deletions ydb/core/kqp/proxy_service/kqp_script_executions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,140 +79,96 @@ class TQueryBase : public NKikimr::TQueryBase {
};


class TScriptExecutionsTablesCreator : public TActorBootstrapped<TScriptExecutionsTablesCreator> {
class TScriptExecutionsTablesCreator : public NTableCreator::TMultiTableCreator {
using TBase = NTableCreator::TMultiTableCreator;

public:
explicit TScriptExecutionsTablesCreator(THolder<NActors::IEventBase> resultEvent)
: ResultEvent(std::move(resultEvent))
{
}

void Registered(NActors::TActorSystem* sys, const NActors::TActorId& owner) override {
NActors::TActorBootstrapped<TScriptExecutionsTablesCreator>::Registered(sys, owner);
Owner = owner;
}

void Bootstrap() {
Become(&TScriptExecutionsTablesCreator::StateFunc);
RunCreateScriptExecutions();
RunCreateScriptExecutionLeases();
RunCreateScriptResultSets();
}
: TBase({
GetScriptExecutionsCreator(),
GetScriptExecutionLeasesCreator(),
GetScriptResultSetsCreator()
})
, ResultEvent(std::move(resultEvent))
{}

private:
static NKikimrSchemeOp::TColumnDescription Col(const TString& columnName, const char* columnType) {
NKikimrSchemeOp::TColumnDescription desc;
desc.SetName(columnName);
desc.SetType(columnType);
return desc;
}

static NKikimrSchemeOp::TColumnDescription Col(const TString& columnName, NScheme::TTypeId columnType) {
return Col(columnName, NScheme::TypeName(columnType));
}

static NKikimrSchemeOp::TTTLSettings TtlCol(const TString& columnName) {
NKikimrSchemeOp::TTTLSettings settings;
settings.MutableEnabled()->SetExpireAfterSeconds(DEADLINE_OFFSET.Seconds());
settings.MutableEnabled()->SetColumnName(columnName);
settings.MutableEnabled()->MutableSysSettings()->SetRunInterval(BRO_RUN_INTERVAL.MicroSeconds());
return settings;
}

void RunCreateScriptExecutions() {
TablesCreating++;
Register(
CreateTableCreator(
{ ".metadata", "script_executions" },
{
Col("database", NScheme::NTypeIds::Text),
Col("execution_id", NScheme::NTypeIds::Text),
Col("run_script_actor_id", NScheme::NTypeIds::Text),
Col("operation_status", NScheme::NTypeIds::Int32),
Col("execution_status", NScheme::NTypeIds::Int32),
Col("finalization_status", NScheme::NTypeIds::Int32),
Col("execution_mode", NScheme::NTypeIds::Int32),
Col("start_ts", NScheme::NTypeIds::Timestamp),
Col("end_ts", NScheme::NTypeIds::Timestamp),
Col("query_text", NScheme::NTypeIds::Text),
Col("syntax", NScheme::NTypeIds::Int32),
Col("ast", NScheme::NTypeIds::Text),
Col("ast_compressed", NScheme::NTypeIds::String),
Col("ast_compression_method", NScheme::NTypeIds::Text),
Col("issues", NScheme::NTypeIds::JsonDocument),
Col("plan", NScheme::NTypeIds::JsonDocument),
Col("meta", NScheme::NTypeIds::JsonDocument),
Col("parameters", NScheme::NTypeIds::String), // TODO: store aparameters separately to support bigger storage.
Col("result_set_metas", NScheme::NTypeIds::JsonDocument),
Col("stats", NScheme::NTypeIds::JsonDocument),
Col("expire_at", NScheme::NTypeIds::Timestamp), // Will be deleted from database after this deadline.
Col("customer_supplied_id", NScheme::NTypeIds::Text),
Col("user_token", NScheme::NTypeIds::Text),
Col("script_sinks", NScheme::NTypeIds::JsonDocument),
Col("script_secret_names", NScheme::NTypeIds::JsonDocument),
},
{ "database", "execution_id" },
NKikimrServices::KQP_PROXY,
TtlCol("expire_at")
)
static IActor* GetScriptExecutionsCreator() {
return CreateTableCreator(
{ ".metadata", "script_executions" },
{
Col("database", NScheme::NTypeIds::Text),
Col("execution_id", NScheme::NTypeIds::Text),
Col("run_script_actor_id", NScheme::NTypeIds::Text),
Col("operation_status", NScheme::NTypeIds::Int32),
Col("execution_status", NScheme::NTypeIds::Int32),
Col("finalization_status", NScheme::NTypeIds::Int32),
Col("execution_mode", NScheme::NTypeIds::Int32),
Col("start_ts", NScheme::NTypeIds::Timestamp),
Col("end_ts", NScheme::NTypeIds::Timestamp),
Col("query_text", NScheme::NTypeIds::Text),
Col("syntax", NScheme::NTypeIds::Int32),
Col("ast", NScheme::NTypeIds::Text),
Col("ast_compressed", NScheme::NTypeIds::String),
Col("ast_compression_method", NScheme::NTypeIds::Text),
Col("issues", NScheme::NTypeIds::JsonDocument),
Col("plan", NScheme::NTypeIds::JsonDocument),
Col("meta", NScheme::NTypeIds::JsonDocument),
Col("parameters", NScheme::NTypeIds::String), // TODO: store aparameters separately to support bigger storage.
Col("result_set_metas", NScheme::NTypeIds::JsonDocument),
Col("stats", NScheme::NTypeIds::JsonDocument),
Col("expire_at", NScheme::NTypeIds::Timestamp), // Will be deleted from database after this deadline.
Col("customer_supplied_id", NScheme::NTypeIds::Text),
Col("user_token", NScheme::NTypeIds::Text),
Col("script_sinks", NScheme::NTypeIds::JsonDocument),
Col("script_secret_names", NScheme::NTypeIds::JsonDocument),
},
{ "database", "execution_id" },
NKikimrServices::KQP_PROXY,
TtlCol("expire_at", DEADLINE_OFFSET, BRO_RUN_INTERVAL)
);
}

void RunCreateScriptExecutionLeases() {
TablesCreating++;
Register(
CreateTableCreator(
{ ".metadata", "script_execution_leases" },
{
Col("database", NScheme::NTypeIds::Text),
Col("execution_id", NScheme::NTypeIds::Text),
Col("lease_deadline", NScheme::NTypeIds::Timestamp),
Col("lease_generation", NScheme::NTypeIds::Int64),
Col("expire_at", NScheme::NTypeIds::Timestamp), // Will be deleted from database after this deadline.
},
{ "database", "execution_id" },
NKikimrServices::KQP_PROXY,
TtlCol("expire_at")
)
static IActor* GetScriptExecutionLeasesCreator() {
return CreateTableCreator(
{ ".metadata", "script_execution_leases" },
{
Col("database", NScheme::NTypeIds::Text),
Col("execution_id", NScheme::NTypeIds::Text),
Col("lease_deadline", NScheme::NTypeIds::Timestamp),
Col("lease_generation", NScheme::NTypeIds::Int64),
Col("expire_at", NScheme::NTypeIds::Timestamp), // Will be deleted from database after this deadline.
},
{ "database", "execution_id" },
NKikimrServices::KQP_PROXY,
TtlCol("expire_at", DEADLINE_OFFSET, BRO_RUN_INTERVAL)
);
}

void RunCreateScriptResultSets() {
TablesCreating++;
Register(
CreateTableCreator(
{ ".metadata", "result_sets" },
{
Col("database", NScheme::NTypeIds::Text),
Col("execution_id", NScheme::NTypeIds::Text),
Col("result_set_id", NScheme::NTypeIds::Int32),
Col("row_id", NScheme::NTypeIds::Int64),
Col("expire_at", NScheme::NTypeIds::Timestamp),
Col("result_set", NScheme::NTypeIds::String),
Col("accumulated_size", NScheme::NTypeIds::Int64),
},
{ "database", "execution_id", "result_set_id", "row_id" },
NKikimrServices::KQP_PROXY,
TtlCol("expire_at")
)
static IActor* GetScriptResultSetsCreator() {
return CreateTableCreator(
{ ".metadata", "result_sets" },
{
Col("database", NScheme::NTypeIds::Text),
Col("execution_id", NScheme::NTypeIds::Text),
Col("result_set_id", NScheme::NTypeIds::Int32),
Col("row_id", NScheme::NTypeIds::Int64),
Col("expire_at", NScheme::NTypeIds::Timestamp),
Col("result_set", NScheme::NTypeIds::String),
Col("accumulated_size", NScheme::NTypeIds::Int64),
},
{ "database", "execution_id", "result_set_id", "row_id" },
NKikimrServices::KQP_PROXY,
TtlCol("expire_at", DEADLINE_OFFSET, BRO_RUN_INTERVAL)
);
}

void Handle(TEvTableCreator::TEvCreateTableResponse::TPtr&) {
Y_ABORT_UNLESS(TablesCreating > 0);
if (--TablesCreating == 0) {
Send(Owner, std::move(ResultEvent));
PassAway();
}
void OnTablesCreated() override {
Send(Owner, std::move(ResultEvent));
}

STRICT_STFUNC(StateFunc,
hFunc(TEvTableCreator::TEvCreateTableResponse, Handle);
)

private:
THolder<NActors::IEventBase> ResultEvent;
NActors::TActorId Owner;
size_t TablesCreating = 0;
};

Ydb::Query::ExecMode GetExecModeFromAction(NKikimrKqp::EQueryAction action) {
Expand Down
53 changes: 53 additions & 0 deletions ydb/library/table_creator/table_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,59 @@ using TTableCreatorRetryPolicy = IRetryPolicy<bool>;

} // namespace

namespace NTableCreator {

NKikimrSchemeOp::TColumnDescription TMultiTableCreator::Col(const TString& columnName, const char* columnType) {
NKikimrSchemeOp::TColumnDescription desc;
desc.SetName(columnName);
desc.SetType(columnType);
return desc;
}

NKikimrSchemeOp::TColumnDescription TMultiTableCreator::Col(const TString& columnName, NScheme::TTypeId columnType) {
return Col(columnName, NScheme::TypeName(columnType));
}

NKikimrSchemeOp::TTTLSettings TMultiTableCreator::TtlCol(const TString& columnName, TDuration expireAfter, TDuration runInterval) {
NKikimrSchemeOp::TTTLSettings settings;
settings.MutableEnabled()->SetExpireAfterSeconds(expireAfter.Seconds());
settings.MutableEnabled()->SetColumnName(columnName);
settings.MutableEnabled()->MutableSysSettings()->SetRunInterval(runInterval.MicroSeconds());
return settings;
}

TMultiTableCreator::TMultiTableCreator(std::vector<NActors::IActor*> tableCreators)
: TableCreators(tableCreators)
{}

void TMultiTableCreator::Registered(NActors::TActorSystem* sys, const NActors::TActorId& owner) {
TBase::Registered(sys, owner);
Owner = owner;
}

void TMultiTableCreator::Bootstrap() {
Become(&TMultiTableCreator::StateFunc);

TablesCreating = TableCreators.size();
for (const auto creator : TableCreators) {
Register(creator);
}
}

void TMultiTableCreator::Handle(TEvTableCreator::TEvCreateTableResponse::TPtr&) {
Y_ABORT_UNLESS(TablesCreating > 0);
if (--TablesCreating == 0) {
OnTablesCreated();
PassAway();
}
}

STRICT_STFUNC(TMultiTableCreator::StateFunc,
hFunc(TEvTableCreator::TEvCreateTableResponse, Handle);
);

} // namespace NTableCreator

NActors::IActor* CreateTableCreator(
TVector<TString> pathComponents,
TVector<NKikimrSchemeOp::TColumnDescription> columns,
Expand Down
39 changes: 39 additions & 0 deletions ydb/library/table_creator/table_creator.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
#include <ydb/library/services/services.pb.h>

#include <ydb/library/actors/core/actor.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>

#include <ydb/public/lib/scheme_types/scheme_type_id.h>

namespace NKikimr {

Expand All @@ -18,6 +21,42 @@ struct TEvTableCreator {
};
};

namespace NTableCreator {

class TMultiTableCreator : public NActors::TActorBootstrapped<TMultiTableCreator> {
using TBase = NActors::TActorBootstrapped<TMultiTableCreator>;

public:
explicit TMultiTableCreator(std::vector<NActors::IActor*> tableCreators);

void Bootstrap();

protected:
virtual void OnTablesCreated() = 0;

static NKikimrSchemeOp::TColumnDescription Col(const TString& columnName, const char* columnType);

static NKikimrSchemeOp::TColumnDescription Col(const TString& columnName, NScheme::TTypeId columnType);

static NKikimrSchemeOp::TTTLSettings TtlCol(const TString& columnName, TDuration expireAfter, TDuration runInterval);

private:
void Registered(NActors::TActorSystem* sys, const NActors::TActorId& owner) override;

void Handle(TEvTableCreator::TEvCreateTableResponse::TPtr&);

STFUNC(StateFunc);

protected:
NActors::TActorId Owner;

private:
std::vector<NActors::IActor*> TableCreators;
size_t TablesCreating = 0;
};

} // namespace NTableCreator

NActors::IActor* CreateTableCreator(
TVector<TString> pathComponents,
TVector<NKikimrSchemeOp::TColumnDescription> columns,
Expand Down
Loading

0 comments on commit 05ecdcb

Please sign in to comment.