Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor lazy table creation for WM service #4900

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions ydb/core/kqp/common/events/script_executions.h
Original file line number Diff line number Diff line change
Expand Up @@ -355,4 +355,14 @@ struct TEvDescribeSecretsResponse : public NActors::TEventLocal<TEvDescribeSecre
TDescription Description;
};

struct TEvScriptExecutionsTablesCreationFinished : public NActors::TEventLocal<TEvScriptExecutionsTablesCreationFinished, TKqpScriptExecutionEvents::EvScriptExecutionsTableCreationFinished> {
TEvScriptExecutionsTablesCreationFinished(bool success, NYql::TIssues issues)
: Success(success)
, Issues(std::move(issues))
{}

const bool Success;
const NYql::TIssues Issues;
};

} // namespace NKikimr::NKqp
1 change: 1 addition & 0 deletions ydb/core/kqp/common/simple/kqp_event_ids.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ struct TKqpScriptExecutionEvents {
EvGetScriptExecutionOperationQueryResponse,
EvDescribeSecretsResponse,
EvSaveScriptResultPartFinished,
EvScriptExecutionsTableCreationFinished,
};
};

Expand Down
41 changes: 30 additions & 11 deletions ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
EvOnRequestTimeout,
EvCloseIdleSessions,
EvResourcesSnapshot,
EvScriptExecutionsTableCreationFinished,
};

struct TEvReadyToPublishResources : public TEventLocal<TEvReadyToPublishResources, EEv::EvReadyToPublishResources> {};
Expand Down Expand Up @@ -169,10 +168,6 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
TEvResourcesSnapshot(TVector<NKikimrKqp::TKqpNodeResources>&& snapshot)
: Snapshot(std::move(snapshot)) {}
};

struct TEvScriptExecutionsTablesCreationFinished : public NActors::TEventLocal<TEvScriptExecutionsTablesCreationFinished, EvScriptExecutionsTableCreationFinished> {
TEvScriptExecutionsTablesCreationFinished() = default;
};
};

public:
Expand Down Expand Up @@ -1320,7 +1315,7 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
hFunc(NNodeWhiteboard::TEvWhiteboard::TEvSystemStateResponse, Handle);
hFunc(TEvKqp::TEvCreateSessionResponse, ForwardEvent);
hFunc(TEvPrivate::TEvCloseIdleSessions, Handle);
hFunc(TEvPrivate::TEvScriptExecutionsTablesCreationFinished, Handle);
hFunc(TEvScriptExecutionsTablesCreationFinished, Handle);
hFunc(NKqp::TEvForgetScriptExecutionOperation, Handle);
hFunc(NKqp::TEvGetScriptExecutionOperation, Handle);
hFunc(NKqp::TEvListScriptExecutionOperations, Handle);
Expand Down Expand Up @@ -1582,11 +1577,16 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
switch (ScriptExecutionsCreationStatus) {
case EScriptExecutionsCreationStatus::NotStarted:
ScriptExecutionsCreationStatus = EScriptExecutionsCreationStatus::Pending;
Register(CreateScriptExecutionsTablesCreator(MakeHolder<TEvPrivate::TEvScriptExecutionsTablesCreationFinished>()), TMailboxType::HTSwap, AppData()->SystemPoolId);
Register(CreateScriptExecutionsTablesCreator(), TMailboxType::HTSwap, AppData()->SystemPoolId);
[[fallthrough]];
case EScriptExecutionsCreationStatus::Pending:
if (DelayedEventsQueue.size() < 10000) {
DelayedEventsQueue.emplace_back(std::move(ev));
DelayedEventsQueue.push_back({
.Event = std::move(ev),
.ResponseBuilder = [](Ydb::StatusIds::StatusCode status, NYql::TIssues issues) {
return new TResponse(status, std::move(issues));
}
});
} else {
NYql::TIssues issues;
issues.AddIssue("Too many queued requests");
Expand All @@ -1598,10 +1598,25 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
}
}

void Handle(TEvPrivate::TEvScriptExecutionsTablesCreationFinished::TPtr&) {
void Handle(TEvScriptExecutionsTablesCreationFinished::TPtr& ev) {
ScriptExecutionsCreationStatus = EScriptExecutionsCreationStatus::Finished;

NYql::TIssue rootIssue;
if (!ev->Get()->Success) {
ScriptExecutionsCreationStatus = EScriptExecutionsCreationStatus::NotStarted;
rootIssue.SetMessage("Failed to create script execution tables");
for (const NYql::TIssue& issue : ev->Get()->Issues) {
rootIssue.AddSubIssue(MakeIntrusive<NYql::TIssue>(issue));
}
}

while (!DelayedEventsQueue.empty()) {
Send(std::move(DelayedEventsQueue.front()));
auto delayedEvent = std::move(DelayedEventsQueue.front());
if (ev->Get()->Success) {
Send(std::move(delayedEvent.Event));
} else {
Send(delayedEvent.Event->Sender, delayedEvent.ResponseBuilder(Ydb::StatusIds::INTERNAL_ERROR, {rootIssue}));
}
DelayedEventsQueue.pop_front();
}
}
Expand Down Expand Up @@ -1765,8 +1780,12 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
Pending,
Finished,
};
struct TDelayedEvent {
THolder<IEventHandle> Event;
std::function<IEventBase*(Ydb::StatusIds::StatusCode, NYql::TIssues)> ResponseBuilder;
};
EScriptExecutionsCreationStatus ScriptExecutionsCreationStatus = EScriptExecutionsCreationStatus::NotStarted;
std::deque<THolder<IEventHandle>> DelayedEventsQueue;
std::deque<TDelayedEvent> DelayedEventsQueue;
bool IsLookupByRmScheduled = false;
TActorId KqpTempTablesAgentActor;
};
Expand Down
198 changes: 75 additions & 123 deletions ydb/core/kqp/proxy_service/kqp_script_executions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,140 +79,92 @@ class TQueryBase : public NKikimr::TQueryBase {
};


class TScriptExecutionsTablesCreator : public TActorBootstrapped<TScriptExecutionsTablesCreator> {
public:
explicit TScriptExecutionsTablesCreator(THolder<NActors::IEventBase> resultEvent)
: ResultEvent(std::move(resultEvent))
{
}

void Registered(NActors::TActorSystem* sys, const NActors::TActorId& owner) override {
NActors::TActorBootstrapped<TScriptExecutionsTablesCreator>::Registered(sys, owner);
Owner = owner;
}
class TScriptExecutionsTablesCreator : public NTableCreator::TMultiTableCreator {
using TBase = NTableCreator::TMultiTableCreator;

void Bootstrap() {
Become(&TScriptExecutionsTablesCreator::StateFunc);
RunCreateScriptExecutions();
RunCreateScriptExecutionLeases();
RunCreateScriptResultSets();
}
public:
explicit TScriptExecutionsTablesCreator()
: TBase({
GetScriptExecutionsCreator(),
GetScriptExecutionLeasesCreator(),
GetScriptResultSetsCreator()
})
{}

private:
static NKikimrSchemeOp::TColumnDescription Col(const TString& columnName, const char* columnType) {
NKikimrSchemeOp::TColumnDescription desc;
desc.SetName(columnName);
desc.SetType(columnType);
return desc;
}

static NKikimrSchemeOp::TColumnDescription Col(const TString& columnName, NScheme::TTypeId columnType) {
return Col(columnName, NScheme::TypeName(columnType));
}

static NKikimrSchemeOp::TTTLSettings TtlCol(const TString& columnName) {
NKikimrSchemeOp::TTTLSettings settings;
settings.MutableEnabled()->SetExpireAfterSeconds(DEADLINE_OFFSET.Seconds());
settings.MutableEnabled()->SetColumnName(columnName);
settings.MutableEnabled()->MutableSysSettings()->SetRunInterval(BRO_RUN_INTERVAL.MicroSeconds());
return settings;
}

void RunCreateScriptExecutions() {
TablesCreating++;
Register(
CreateTableCreator(
{ ".metadata", "script_executions" },
{
Col("database", NScheme::NTypeIds::Text),
Col("execution_id", NScheme::NTypeIds::Text),
Col("run_script_actor_id", NScheme::NTypeIds::Text),
Col("operation_status", NScheme::NTypeIds::Int32),
Col("execution_status", NScheme::NTypeIds::Int32),
Col("finalization_status", NScheme::NTypeIds::Int32),
Col("execution_mode", NScheme::NTypeIds::Int32),
Col("start_ts", NScheme::NTypeIds::Timestamp),
Col("end_ts", NScheme::NTypeIds::Timestamp),
Col("query_text", NScheme::NTypeIds::Text),
Col("syntax", NScheme::NTypeIds::Int32),
Col("ast", NScheme::NTypeIds::Text),
Col("ast_compressed", NScheme::NTypeIds::String),
Col("ast_compression_method", NScheme::NTypeIds::Text),
Col("issues", NScheme::NTypeIds::JsonDocument),
Col("plan", NScheme::NTypeIds::JsonDocument),
Col("meta", NScheme::NTypeIds::JsonDocument),
Col("parameters", NScheme::NTypeIds::String), // TODO: store aparameters separately to support bigger storage.
Col("result_set_metas", NScheme::NTypeIds::JsonDocument),
Col("stats", NScheme::NTypeIds::JsonDocument),
Col("expire_at", NScheme::NTypeIds::Timestamp), // Will be deleted from database after this deadline.
Col("customer_supplied_id", NScheme::NTypeIds::Text),
Col("user_token", NScheme::NTypeIds::Text),
Col("script_sinks", NScheme::NTypeIds::JsonDocument),
Col("script_secret_names", NScheme::NTypeIds::JsonDocument),
},
{ "database", "execution_id" },
NKikimrServices::KQP_PROXY,
TtlCol("expire_at")
)
static IActor* GetScriptExecutionsCreator() {
return CreateTableCreator(
{ ".metadata", "script_executions" },
{
Col("database", NScheme::NTypeIds::Text),
Col("execution_id", NScheme::NTypeIds::Text),
Col("run_script_actor_id", NScheme::NTypeIds::Text),
Col("operation_status", NScheme::NTypeIds::Int32),
Col("execution_status", NScheme::NTypeIds::Int32),
Col("finalization_status", NScheme::NTypeIds::Int32),
Col("execution_mode", NScheme::NTypeIds::Int32),
Col("start_ts", NScheme::NTypeIds::Timestamp),
Col("end_ts", NScheme::NTypeIds::Timestamp),
Col("query_text", NScheme::NTypeIds::Text),
Col("syntax", NScheme::NTypeIds::Int32),
Col("ast", NScheme::NTypeIds::Text),
Col("ast_compressed", NScheme::NTypeIds::String),
Col("ast_compression_method", NScheme::NTypeIds::Text),
Col("issues", NScheme::NTypeIds::JsonDocument),
Col("plan", NScheme::NTypeIds::JsonDocument),
Col("meta", NScheme::NTypeIds::JsonDocument),
Col("parameters", NScheme::NTypeIds::String), // TODO: store aparameters separately to support bigger storage.
Col("result_set_metas", NScheme::NTypeIds::JsonDocument),
Col("stats", NScheme::NTypeIds::JsonDocument),
Col("expire_at", NScheme::NTypeIds::Timestamp), // Will be deleted from database after this deadline.
Col("customer_supplied_id", NScheme::NTypeIds::Text),
Col("user_token", NScheme::NTypeIds::Text),
Col("script_sinks", NScheme::NTypeIds::JsonDocument),
Col("script_secret_names", NScheme::NTypeIds::JsonDocument),
},
{ "database", "execution_id" },
NKikimrServices::KQP_PROXY,
TtlCol("expire_at", DEADLINE_OFFSET, BRO_RUN_INTERVAL)
);
}

void RunCreateScriptExecutionLeases() {
TablesCreating++;
Register(
CreateTableCreator(
{ ".metadata", "script_execution_leases" },
{
Col("database", NScheme::NTypeIds::Text),
Col("execution_id", NScheme::NTypeIds::Text),
Col("lease_deadline", NScheme::NTypeIds::Timestamp),
Col("lease_generation", NScheme::NTypeIds::Int64),
Col("expire_at", NScheme::NTypeIds::Timestamp), // Will be deleted from database after this deadline.
},
{ "database", "execution_id" },
NKikimrServices::KQP_PROXY,
TtlCol("expire_at")
)
static IActor* GetScriptExecutionLeasesCreator() {
return CreateTableCreator(
{ ".metadata", "script_execution_leases" },
{
Col("database", NScheme::NTypeIds::Text),
Col("execution_id", NScheme::NTypeIds::Text),
Col("lease_deadline", NScheme::NTypeIds::Timestamp),
Col("lease_generation", NScheme::NTypeIds::Int64),
Col("expire_at", NScheme::NTypeIds::Timestamp), // Will be deleted from database after this deadline.
},
{ "database", "execution_id" },
NKikimrServices::KQP_PROXY,
TtlCol("expire_at", DEADLINE_OFFSET, BRO_RUN_INTERVAL)
);
}

void RunCreateScriptResultSets() {
TablesCreating++;
Register(
CreateTableCreator(
{ ".metadata", "result_sets" },
{
Col("database", NScheme::NTypeIds::Text),
Col("execution_id", NScheme::NTypeIds::Text),
Col("result_set_id", NScheme::NTypeIds::Int32),
Col("row_id", NScheme::NTypeIds::Int64),
Col("expire_at", NScheme::NTypeIds::Timestamp),
Col("result_set", NScheme::NTypeIds::String),
Col("accumulated_size", NScheme::NTypeIds::Int64),
},
{ "database", "execution_id", "result_set_id", "row_id" },
NKikimrServices::KQP_PROXY,
TtlCol("expire_at")
)
static IActor* GetScriptResultSetsCreator() {
return CreateTableCreator(
{ ".metadata", "result_sets" },
{
Col("database", NScheme::NTypeIds::Text),
Col("execution_id", NScheme::NTypeIds::Text),
Col("result_set_id", NScheme::NTypeIds::Int32),
Col("row_id", NScheme::NTypeIds::Int64),
Col("expire_at", NScheme::NTypeIds::Timestamp),
Col("result_set", NScheme::NTypeIds::String),
Col("accumulated_size", NScheme::NTypeIds::Int64),
},
{ "database", "execution_id", "result_set_id", "row_id" },
NKikimrServices::KQP_PROXY,
TtlCol("expire_at", DEADLINE_OFFSET, BRO_RUN_INTERVAL)
);
}

void Handle(TEvTableCreator::TEvCreateTableResponse::TPtr&) {
Y_ABORT_UNLESS(TablesCreating > 0);
if (--TablesCreating == 0) {
Send(Owner, std::move(ResultEvent));
PassAway();
}
void OnTablesCreated(bool success, NYql::TIssues issues) override {
Send(Owner, new TEvScriptExecutionsTablesCreationFinished(success, std::move(issues)));
}

STRICT_STFUNC(StateFunc,
hFunc(TEvTableCreator::TEvCreateTableResponse, Handle);
)

private:
THolder<NActors::IEventBase> ResultEvent;
NActors::TActorId Owner;
size_t TablesCreating = 0;
};

Ydb::Query::ExecMode GetExecModeFromAction(NKikimrKqp::EQueryAction action) {
Expand Down Expand Up @@ -2867,8 +2819,8 @@ NActors::IActor* CreateScriptExecutionCreatorActor(TEvKqp::TEvScriptRequest::TPt
return new TCreateScriptExecutionActor(std::move(ev), queryServiceConfig, counters, maxRunTime);
}

NActors::IActor* CreateScriptExecutionsTablesCreator(THolder<NActors::IEventBase> resultEvent) {
return new TScriptExecutionsTablesCreator(std::move(resultEvent));
NActors::IActor* CreateScriptExecutionsTablesCreator() {
return new TScriptExecutionsTablesCreator();
}

NActors::IActor* CreateForgetScriptExecutionOperationActor(TEvForgetScriptExecutionOperation::TPtr ev) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/proxy_service/kqp_script_executions.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace NKikimr::NKqp {

// Creates all needed tables.
// Sends result event back when the work is done.
NActors::IActor* CreateScriptExecutionsTablesCreator(THolder<NActors::IEventBase> resultEvent);
NActors::IActor* CreateScriptExecutionsTablesCreator();

// Create script execution and run it.
NActors::IActor* CreateScriptExecutionCreatorActor(TEvKqp::TEvScriptRequest::TPtr&& ev, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, TIntrusivePtr<TKqpCounters> counters, TDuration maxRunTime = SCRIPT_TIMEOUT_LIMIT);
Expand Down
Loading
Loading