Skip to content

Commit

Permalink
Merge c34380a into 45c8bad
Browse files Browse the repository at this point in the history
  • Loading branch information
VPolka authored May 27, 2024
2 parents 45c8bad + c34380a commit 843db90
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 36 deletions.
5 changes: 4 additions & 1 deletion ydb/core/kqp/common/compilation/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::
const TMaybe<TKqpQueryId>& query, bool isQueryActionPrepare, TInstant deadline,
TKqpDbCountersPtr dbCounters, const TGUCSettings::TPtr& gUCSettings, const TMaybe<TString>& applicationName,
std::shared_ptr<std::atomic<bool>> intrestedInResult, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
NLWTrace::TOrbit orbit = {}, TKqpTempTablesState::TConstPtr tempTablesState = nullptr)
NLWTrace::TOrbit orbit = {}, TKqpTempTablesState::TConstPtr tempTablesState = nullptr, TMaybe<TQueryAst> queryAst = Nothing())
: UserToken(userToken)
, Uid(uid)
, Query(query)
Expand All @@ -89,6 +89,7 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::
, Orbit(std::move(orbit))
, TempTablesState(std::move(tempTablesState))
, IntrestedInResult(std::move(intrestedInResult))
, QueryAst(queryAst)
{
}

Expand All @@ -107,6 +108,8 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::

TKqpTempTablesState::TConstPtr TempTablesState;
std::shared_ptr<std::atomic<bool>> IntrestedInResult;

TMaybe<TQueryAst> QueryAst;
};

struct TEvCompileResponse: public TEventLocal<TEvCompileResponse, TKqpEvents::EvCompileResponse> {
Expand Down
10 changes: 7 additions & 3 deletions ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -759,13 +759,17 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
NWilson::TSpan compileServiceSpan(TWilsonKqp::CompileService, ev->Get() ? std::move(ev->TraceId) : NWilson::TTraceId(), "CompileService");

TKqpCompileSettings compileSettings(true, request.IsQueryActionPrepare, false, request.Deadline, TableServiceConfig.GetEnableAstCache() ? ECompileActorAction::PARSE : ECompileActorAction::COMPILE);
TKqpCompileRequest compileRequest(ev->Sender, request.Uid, compileResult ? *compileResult->Query : *request.Query,
TKqpCompileRequest compileRequest(ev->Sender, request.Uid, request.Query ? *request.Query : *compileResult->Query,
compileSettings, request.UserToken, dbCounters, request.GUCSettings, request.ApplicationName,
ev->Cookie, std::move(ev->Get()->IntrestedInResult),
ev->Get()->UserRequestContext,
ev->Get() ? std::move(ev->Get()->Orbit) : NLWTrace::TOrbit(),
std::move(compileServiceSpan), std::move(ev->Get()->TempTablesState));

if (TableServiceConfig.GetEnableAstCache() && request.QueryAst) {
return CompileByAst(*request.QueryAst, compileRequest, ctx);
}

if (!RequestsQueue.Enqueue(std::move(compileRequest))) {
Counters->ReportCompileRequestRejected(dbCounters);

Expand Down Expand Up @@ -1138,7 +1142,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
<< ", message: " << e.what());
}

void Reply(const TActorId& sender, const TVector<TQueryAst> astStatements, const TKqpQueryId query,
void Reply(const TActorId& sender, const TVector<TQueryAst>& astStatements, const TKqpQueryId query,
const TActorContext& ctx, ui64 cookie, NLWTrace::TOrbit orbit, NWilson::TSpan span)
{
LWTRACK(KqpCompileServiceReply,
Expand All @@ -1157,7 +1161,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
ctx.Send(sender, responseEv.Release(), 0, cookie);
}

void ReplyQueryStatements(const TActorId& sender, const TVector<TQueryAst> astStatements,
void ReplyQueryStatements(const TActorId& sender, const TVector<TQueryAst>& astStatements,
const TKqpQueryId query, const TActorContext& ctx, ui64 cookie, NLWTrace::TOrbit orbit, NWilson::TSpan span)
{
LWTRACK(KqpCompileServiceReplyStatements, orbit);
Expand Down
37 changes: 34 additions & 3 deletions ydb/core/kqp/session_actor/kqp_query_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,31 @@ using namespace NSchemeCache;
#define LOG_D(msg) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg)
#define LOG_T(msg) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg)


TKqpQueryState::TQueryTxId::TQueryTxId(const TQueryTxId& other) {
YQL_ENSURE(!Id);
Id = other.Id;
}

TKqpQueryState::TQueryTxId& TKqpQueryState::TQueryTxId::operator=(const TQueryTxId& id) {
YQL_ENSURE(!Id);
Id = id.Id;
return *this;
}

void TKqpQueryState::TQueryTxId::SetValue(const TTxId& id) {
YQL_ENSURE(!Id);
Id = id.Id;
}

TTxId TKqpQueryState::TQueryTxId::GetValue() {
return Id ? *Id : TTxId();
}

void TKqpQueryState::TQueryTxId::Reset() {
Id = TTxId();
}

bool TKqpQueryState::EnsureTableVersions(const TEvTxProxySchemeCache::TEvNavigateKeySetResult& response) {
Y_ENSURE(response.Request);
const auto& navigate = *response.Request;
Expand Down Expand Up @@ -244,8 +269,15 @@ std::unique_ptr<TEvKqp::TEvRecompileRequest> TKqpQueryState::BuildReCompileReque
compileDeadline = Min(compileDeadline, QueryDeadlines.CancelAt);
}

return std::make_unique<TEvKqp::TEvRecompileRequest>(UserToken, CompileResult->Uid, CompileResult->Query, isQueryActionPrepare,
compileDeadline, DbCounters, gUCSettingsPtr, ApplicationName, std::move(cookie), UserRequestContext, std::move(Orbit), TempTablesState);
TMaybe<TQueryAst> statementAst;
if (!Statements.empty()) {
YQL_ENSURE(CurrentStatementId < Statements.size());
statementAst = Statements[CurrentStatementId];
}

return std::make_unique<TEvKqp::TEvRecompileRequest>(UserToken, CompileResult->Uid, query, isQueryActionPrepare,
compileDeadline, DbCounters, gUCSettingsPtr, ApplicationName, std::move(cookie), UserRequestContext, std::move(Orbit), TempTablesState,
statementAst);
}

std::unique_ptr<TEvKqp::TEvCompileRequest> TKqpQueryState::BuildSplitRequest(std::shared_ptr<std::atomic<bool>> cookie, const TGUCSettings::TPtr& gUCSettingsPtr) {
Expand Down Expand Up @@ -295,7 +327,6 @@ bool TKqpQueryState::PrepareNextStatementPart() {
QueryData = {};
PreparedQuery = {};
CompileResult = {};
TxCtx = {};
CurrentTx = 0;
TableVersions = {};
MaxReadType = ETableReadType::Other;
Expand Down
24 changes: 18 additions & 6 deletions ydb/core/kqp/session_actor/kqp_query_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,21 @@ namespace NKikimr::NKqp {
// common case).
class TKqpQueryState : public TNonCopyable {
public:
class TQueryTxId {
public:
TQueryTxId() = default;
TQueryTxId(const TQueryTxId& other);
TQueryTxId& operator=(const TQueryTxId& id);

void SetValue(const TTxId& id);
TTxId GetValue();

void Reset();

private:
TMaybe<TTxId> Id;
};

TKqpQueryState(TEvKqp::TEvQueryRequest::TPtr& ev, ui64 queryId, const TString& database, const TMaybe<TString>& applicationName,
const TString& cluster, TKqpDbCountersPtr dbCounters, bool longSession, const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const TString& sessionId, TMonotonic startedAt)
Expand Down Expand Up @@ -110,7 +125,7 @@ class TKqpQueryState : public TNonCopyable {
NWilson::TSpan KqpSessionSpan;
ETableReadType MaxReadType = ETableReadType::Other;

TTxId TxId; // User tx
TQueryTxId TxId; // User tx
bool Commit = false;
bool Commited = false;

Expand All @@ -132,6 +147,7 @@ class TKqpQueryState : public TNonCopyable {
NYql::TIssues Issues;

TVector<TQueryAst> Statements;
TMaybe<TQueryTxId> ImplicitTxId = {}; // Implicit tx for all statements
ui32 CurrentStatementId = 0;
ui32 StatementResultIndex = 0;
ui32 StatementResultSize = 0;
Expand Down Expand Up @@ -417,15 +433,12 @@ class TKqpQueryState : public TNonCopyable {
}

void PrepareCurrentStatement() {
QueryData = {};
QueryData = std::make_shared<TQueryData>(TxCtx->TxAlloc);
PreparedQuery = {};
CompileResult = {};
TxCtx = {};
CurrentTx = 0;
TableVersions = {};
MaxReadType = ETableReadType::Other;
Commit = false;
Commited = false;
TopicOperations = {};
ReplayMessage = {};
}
Expand All @@ -444,7 +457,6 @@ class TKqpQueryState : public TNonCopyable {
TxCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_UNDEFINED;
break;
default:
Commit = true;
TxCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE;
}
}
Expand Down
41 changes: 24 additions & 17 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
}
QueryState->TxCtx = std::move(txCtx);
QueryState->QueryData = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc);
QueryState->TxId = txId;
QueryState->TxId.SetValue(txId);
if (!CheckTransactionLocks(/*tx*/ nullptr)) {
return;
}
Expand Down Expand Up @@ -540,6 +540,10 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {

void Handle(TEvKqp::TEvParseResponse::TPtr& ev) {
QueryState->SaveAndCheckParseResult(std::move(*ev->Get()));
Ydb::Table::TransactionSettings settings;
settings.mutable_serializable_read_write();
BeginTx(settings);
QueryState->ImplicitTxId = QueryState->TxId;
CompileStatement();
}

Expand Down Expand Up @@ -680,7 +684,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
}

void BeginTx(const Ydb::Table::TransactionSettings& settings) {
QueryState->TxId = UlidGen.Next();
QueryState->TxId.SetValue(UlidGen.Next());
QueryState->TxCtx = MakeIntrusive<TKqpTransactionContext>(false, AppData()->FunctionRegistry,
AppData()->TimeProvider, AppData()->RandomProvider, Config->EnableKqpImmediateEffects);

Expand All @@ -704,7 +708,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
QueryState->TxCtx->SetIsolationLevel(settings);
QueryState->TxCtx->OnBeginQuery();

if (!Transactions.CreateNew(QueryState->TxId, QueryState->TxCtx)) {
if (!Transactions.CreateNew(QueryState->TxId.GetValue(), QueryState->TxCtx)) {
std::vector<TIssue> issues{
YqlIssue({}, TIssuesIds::KIKIMR_TOO_MANY_TRANSACTIONS)};
ythrow TRequestFail(Ydb::StatusIds::BAD_SESSION,
Expand All @@ -717,14 +721,14 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
Counters->ReportBeginTransaction(Settings.DbCounters, Transactions.EvictedTx, Transactions.Size(), Transactions.ToBeAbortedSize());
}

static const Ydb::Table::TransactionControl& GetImpliedTxControl() {
auto create = []() -> Ydb::Table::TransactionControl {
Ydb::Table::TransactionControl control;
Ydb::Table::TransactionControl GetImpliedTxControl() {
Ydb::Table::TransactionControl control;
control.set_commit_tx(QueryState->ProcessingLastStatement());
if (QueryState->ImplicitTxId) {
control.set_tx_id(QueryState->ImplicitTxId->GetValue().GetHumanStr());
} else {
control.mutable_begin_tx()->mutable_serializable_read_write();
control.set_commit_tx(true);
return control;
};
static const Ydb::Table::TransactionControl control = create();
}
return control;
}

Expand All @@ -744,14 +748,17 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
}
QueryState->TxCtx = txCtx;
QueryState->QueryData = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc);
QueryState->TxId = txId;
if (hasTxControl) {
LOG_E("AAA");
QueryState->TxId.SetValue(txId);
}
break;
}
case Ydb::Table::TransactionControl::kBeginTx: {
BeginTx(txControl.begin_tx());
break;
}
case Ydb::Table::TransactionControl::TX_SELECTOR_NOT_SET:
}
case Ydb::Table::TransactionControl::TX_SELECTOR_NOT_SET:
ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST)
<< "wrong TxControl: tx_selector must be set";
break;
Expand Down Expand Up @@ -1539,7 +1546,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {

void FillTxInfo(NKikimrKqp::TQueryResponse* response) {
YQL_ENSURE(QueryState);
response->MutableTxMeta()->set_id(QueryState->TxId.GetHumanStr());
response->MutableTxMeta()->set_id(QueryState->TxId.GetValue().GetHumanStr());

if (QueryState->TxCtx) {
auto txInfo = QueryState->TxCtx->GetInfo();
Expand Down Expand Up @@ -1612,8 +1619,8 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {

if (QueryState->Commit) {
ResetTxState();
Transactions.ReleaseTransaction(QueryState->TxId);
QueryState->TxId = TTxId();
Transactions.ReleaseTransaction(QueryState->TxId.GetValue());
QueryState->TxId.Reset();
}

FillTxInfo(response);
Expand Down Expand Up @@ -1958,7 +1965,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
auto& txCtx = QueryState->TxCtx;
if (txCtx->IsInvalidated()) {
Transactions.AddToBeAborted(txCtx);
Transactions.ReleaseTransaction(QueryState->TxId);
Transactions.ReleaseTransaction(QueryState->TxId.GetValue());
}
DiscardPersistentSnapshot(txCtx->SnapshotHandle);
}
Expand Down
Loading

0 comments on commit 843db90

Please sign in to comment.