Skip to content

Commit

Permalink
Atomic CTAS (for datashard) (ydb-platform#5912)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored Jun 26, 2024
1 parent 09a6870 commit 35e8bdd
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 18 deletions.
45 changes: 44 additions & 1 deletion ydb/core/kqp/host/kqp_gateway_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -821,7 +821,50 @@ class TKqpGatewayProxy : public IKikimrGateway {
}

TFuture<TGenericResult> RenameTable(const TString& src, const TString& dst, const TString& cluster) override {
FORWARD_ENSURE_NO_PREPARE(RenameTable, src, dst, cluster);
CHECK_PREPARED_DDL(RenameTable);

auto metadata = SessionCtx->Tables().GetTable(cluster, src).Metadata;

std::pair<TString, TString> pathPair;
TString error;
if (!NSchemeHelpers::SplitTablePath(metadata->Name, GetDatabase(), pathPair, error, false)) {
return MakeFuture(ResultFromError<TGenericResult>(error));
}

auto temporary = metadata->Temporary;
auto renameTablePromise = NewPromise<TGenericResult>();

NKikimrSchemeOp::TModifyScheme schemeTx;
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpMoveTable);
schemeTx.SetWorkingDir(pathPair.first);

auto* renameTable = schemeTx.MutableMoveTable();
renameTable->SetSrcPath(src);
renameTable->SetDstPath(dst);

if (IsPrepare()) {
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
auto& phyTx = *phyQuery.AddTransactions();
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);


phyTx.MutableSchemeOperation()->MutableAlterTable()->Swap(&schemeTx);
TGenericResult result;
result.SetSuccess();
renameTablePromise.SetValue(result);
} else {
if (temporary) {
auto code = Ydb::StatusIds::BAD_REQUEST;
auto error = TStringBuilder() << "Not allowed to rename temp table";
IKqpGateway::TGenericResult errResult;
errResult.AddIssue(NYql::TIssue(error));
errResult.SetStatus(NYql::YqlStatusFromYdbStatus(code));
renameTablePromise.SetValue(errResult);
}
return Gateway->RenameTable(src, dst, cluster);
}

return renameTablePromise.GetFuture();
}

TFuture<TGenericResult> DropTable(const TString& cluster, const TDropTableSettings& settings) override {
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1061,7 +1061,10 @@ class TKqpHost : public IKqpHost {

SessionCtx->SetDatabase(database);
SessionCtx->SetCluster(cluster);
SessionCtx->SetTempTables(std::move(tempTablesState));
if (tempTablesState) {
SessionCtx->SetSessionId(tempTablesState->SessionId);
SessionCtx->SetTempTables(std::move(tempTablesState));
}

if (FederatedQuerySetup) {
ExternalSourceFactory = NExternalSource::CreateExternalSourceFactory({},
Expand Down
74 changes: 73 additions & 1 deletion ydb/core/kqp/host/kqp_statement_rewrite.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ namespace {
struct TCreateTableAsResult {
NYql::TExprNode::TPtr CreateTable = nullptr;
NYql::TExprNode::TPtr ReplaceInto = nullptr;
NYql::TExprNode::TPtr MoveTable = nullptr;
};

bool IsOlap(const NYql::NNodes::TMaybeNode<NYql::NNodes::TCoNameValueTupleList>& tableSettings) {
Expand Down Expand Up @@ -71,6 +72,9 @@ namespace {
return std::nullopt;
}

auto tableNameNode = key.Ptr()->Child(0)->Child(1)->Child(0);
const TString tableName(tableNameNode->Content());

auto maybeList = writeArgs.Get(4).Maybe<NYql::NNodes::TExprList>();
if (!maybeList) {
return std::nullopt;
Expand Down Expand Up @@ -167,8 +171,39 @@ namespace {
}));
}

const bool isTemporary = settings.Temporary.IsValid() && settings.Temporary.Cast().Value() == "true";
if (isTemporary) {
exprCtx.AddError(NYql::TIssue(exprCtx.GetPosition(pos), "CREATE TEMPORARY TABLE AS is not supported at current time"));
return std::nullopt;
}

const bool isAtomicOperation = !isOlap;

const TString tmpTableName = TStringBuilder()
<< tableName
<< "_cas_"
<< TAppData::RandomProvider->GenRand();

const TString createTableName = !isAtomicOperation
? tableName
: (TStringBuilder()
<< "/Root/.tmp/sessions/"
<< sessionCtx->GetSessionId()
<< tmpTableName);

create = exprCtx.ReplaceNode(std::move(create), *columns, exprCtx.NewList(pos, std::move(columnNodes)));

if (isAtomicOperation) {
std::vector<NYql::TExprNodePtr> settingsNodes;
for (size_t index = 0; index < create->Child(4)->ChildrenSize(); ++index) {
settingsNodes.push_back(create->Child(4)->ChildPtr(index));
}
settingsNodes.push_back(
exprCtx.NewList(pos, {exprCtx.NewAtom(pos, "temporary")}));
create = exprCtx.ReplaceNode(std::move(create), *create->Child(4), exprCtx.NewList(pos, std::move(settingsNodes)));
create = exprCtx.ReplaceNode(std::move(create), *tableNameNode, exprCtx.NewAtom(pos, tmpTableName));
}

const auto topLevelRead = NYql::FindTopLevelRead(insertData.Ptr());

NYql::TExprNode::TListType insertSettings;
Expand All @@ -194,7 +229,7 @@ namespace {
exprCtx.NewList(pos, {
exprCtx.NewAtom(pos, "table"),
exprCtx.NewCallable(pos, "String", {
exprCtx.NewAtom(pos, key.Ptr()->Child(0)->Child(1)->Child(0)->Content()),
exprCtx.NewAtom(pos, createTableName),
}),
}),
}),
Expand All @@ -218,6 +253,40 @@ namespace {
}),
});

if (isAtomicOperation) {
result.MoveTable = exprCtx.NewCallable(pos, "Write!", {
exprCtx.NewWorld(pos),
exprCtx.NewCallable(pos, "DataSink", {
exprCtx.NewAtom(pos, "kikimr"),
exprCtx.NewAtom(pos, "db"),
}),
exprCtx.NewCallable(pos, "Key", {
exprCtx.NewList(pos, {
exprCtx.NewAtom(pos, "tablescheme"),
exprCtx.NewCallable(pos, "String", {
exprCtx.NewAtom(pos, createTableName),
}),
}),
}),
exprCtx.NewCallable(pos, "Void", {}),
exprCtx.NewList(pos, {
exprCtx.NewList(pos, {
exprCtx.NewAtom(pos, "mode"),
exprCtx.NewAtom(pos, "alter"),
}),
exprCtx.NewList(pos, {
exprCtx.NewAtom(pos, "actions"),
exprCtx.NewList(pos, {
exprCtx.NewList(pos, {
exprCtx.NewAtom(pos, "renameTo"),
exprCtx.NewAtom(pos, tableName),
}),
}),
}),
}),
});
}

return result;
}
}
Expand All @@ -238,6 +307,9 @@ TVector<NYql::TExprNode::TPtr> RewriteExpression(
YQL_ENSURE(result.empty());
result.push_back(rewriteResult->CreateTable);
result.push_back(rewriteResult->ReplaceInto);
if (rewriteResult->MoveTable) {
result.push_back(rewriteResult->MoveTable);
}
}
}
return true;
Expand Down
9 changes: 9 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,10 @@ class TKikimrSessionContext : public TThrRefBase {
return Database;
}

const TString& GetSessionId() const {
return SessionId;
}

void SetCluster(const TString& cluster) {
Cluster = cluster;
}
Expand All @@ -514,6 +518,10 @@ class TKikimrSessionContext : public TThrRefBase {
Database = database;
}

void SetSessionId(const TString& sessionId) {
SessionId = sessionId;
}

NKikimr::NKqp::TKqpTempTablesState::TConstPtr GetTempTablesState() const {
return TempTablesState;
}
Expand Down Expand Up @@ -544,6 +552,7 @@ class TKikimrSessionContext : public TThrRefBase {
TString UserName;
TString Cluster;
TString Database;
TString SessionId;
TKikimrConfiguration::TPtr Configuration;
TIntrusivePtr<TKikimrTablesData> TablesData;
TIntrusivePtr<TKikimrQueryContext> QueryCtx;
Expand Down
51 changes: 41 additions & 10 deletions ydb/core/kqp/session_actor/kqp_query_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,14 @@ class TKqpQueryState : public TNonCopyable {
return RequestEv->GetDatabase();
}

bool IsSplitted() const {
return !SplittedExprs.empty();
}

bool IsCreateTableAs() const {
return IsSplitted();
}

// todo: gvit
// fill this hash set only once on query compilation.
void FillTables(const NKqpProto::TKqpPhyTx& phyTx) {
Expand Down Expand Up @@ -321,7 +329,6 @@ class TKqpQueryState : public TNonCopyable {

bool ShouldCommitWithCurrentTx(const TKqpPhyTxHolder::TConstPtr& tx) {
const auto& phyQuery = PreparedQuery->GetPhysicalQuery();

if (!Commit) {
return false;
}
Expand Down Expand Up @@ -357,11 +364,16 @@ class TKqpQueryState : public TNonCopyable {
return !TxCtx->TxHasEffects();
}

bool ShouldAcquireLocks() {
bool ShouldAcquireLocks(const TKqpPhyTxHolder::TConstPtr& tx) {
if (*TxCtx->EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE) {
return false;
}

// Inconsistent writes (CTAS) don't require locks.
if (IsSplitted() && !HasTxSinkInTx(tx)) {
return false;
}

if (TxCtx->Locks.GetLockTxId() && !TxCtx->Locks.Broken()) {
return true; // Continue to acquire locks
}
Expand Down Expand Up @@ -416,16 +428,35 @@ class TKqpQueryState : public TNonCopyable {
return tx;
}

bool HasTxSinkInStage(const ::NKqpProto::TKqpPhyStage& stage) const {
for (const auto& sink : stage.GetSinks()) {
if (sink.GetTypeCase() == NKqpProto::TKqpSink::kInternalSink && sink.GetInternalSink().GetSettings().Is<NKikimrKqp::TKqpTableSinkSettings>()) {
NKikimrKqp::TKqpTableSinkSettings settings;
YQL_ENSURE(sink.GetInternalSink().GetSettings().UnpackTo(&settings), "Failed to unpack settings");
if (!settings.GetInconsistentTx()) {
return true;
}
}
}
return false;
}

bool HasTxSink() const {
const auto& query = PreparedQuery->GetPhysicalQuery();
for (auto& tx : query.GetTransactions()) {
for (const auto& stage : tx.GetStages()) {
if (HasTxSinkInStage(stage)) {
return true;
}
}
}
return false;
}

bool HasTxSinkInTx(const TKqpPhyTxHolder::TConstPtr& tx) const {
for (const auto& stage : tx->GetStages()) {
for (const auto& sink : stage.GetSinks()) {
if (sink.GetTypeCase() == NKqpProto::TKqpSink::kInternalSink && sink.GetInternalSink().GetSettings().Is<NKikimrKqp::TKqpTableSinkSettings>()) {
NKikimrKqp::TKqpTableSinkSettings settings;
YQL_ENSURE(sink.GetInternalSink().GetSettings().UnpackTo(&settings), "Failed to unpack settings");
if (!settings.GetInconsistentTx()) {
return true;
}
}
if (HasTxSinkInStage(stage)) {
return true;
}
}
return false;
Expand Down
8 changes: 5 additions & 3 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1086,7 +1086,6 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
switch (tx->GetType()) {
case NKqpProto::TKqpPhyTx::TYPE_SCHEME:
YQL_ENSURE(tx->StagesSize() == 0);

if (QueryState->HasTxControl() && QueryState->TxCtx->EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_UNDEFINED) {
ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED,
"Scheme operations cannot be executed inside transaction");
Expand Down Expand Up @@ -1197,7 +1196,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
}

request.TopicOperations = std::move(txCtx.TopicOperations);
} else if (QueryState->ShouldAcquireLocks()) {
} else if (QueryState->ShouldAcquireLocks(tx)) {
request.AcquireLocksTxId = txCtx.Locks.GetLockTxId();

if (txCtx.HasUncommittedChangesRead || Config->FeatureFlags.GetEnableForceImmediateEffectsExecution()) {
Expand Down Expand Up @@ -1232,7 +1231,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {

auto userToken = QueryState->UserToken;
const TString requestType = QueryState->GetRequestType();
bool temporary = GetTemporaryTableInfo(tx).has_value();
const bool temporary = GetTemporaryTableInfo(tx).has_value();

auto executerActor = CreateKqpSchemeExecuter(tx, QueryState->GetType(), SelfId(), requestType, Settings.Database, userToken,
temporary, TempTablesState.SessionId, QueryState->UserRequestContext, KqpTempTablesAgentActor);
Expand Down Expand Up @@ -1375,6 +1374,9 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
if (!tx) {
return;
}
if (QueryState->IsCreateTableAs()) {
return;
}

auto optInfo = GetTemporaryTableInfo(tx);
if (optInfo) {
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/kqp/ut/query/kqp_query_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1607,7 +1607,8 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
appConfig.MutableTableServiceConfig()->SetEnablePerStatementQueryExecution(true);
auto settings = TKikimrSettings()
.SetAppConfig(appConfig)
.SetWithSampleTables(false);
.SetWithSampleTables(false)
.SetEnableTempTables(true);
TKikimrRunner kikimr(settings);

const TString query = R"(
Expand Down Expand Up @@ -1659,7 +1660,8 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
appConfig.MutableTableServiceConfig()->SetEnablePerStatementQueryExecution(true);
auto settings = TKikimrSettings()
.SetAppConfig(appConfig)
.SetWithSampleTables(false);
.SetWithSampleTables(false)
.SetEnableTempTables(true);
TKikimrRunner kikimr(settings);

const TString query = R"(
Expand Down

0 comments on commit 35e8bdd

Please sign in to comment.