Skip to content

Commit

Permalink
KIKIMR-19452: Pg insert from selection by column order
Browse files Browse the repository at this point in the history
  • Loading branch information
qrort committed Dec 15, 2023
1 parent 2fbf87b commit 4362647
Show file tree
Hide file tree
Showing 14 changed files with 395 additions and 241 deletions.
11 changes: 8 additions & 3 deletions ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -380,11 +380,16 @@ class TKiSinkTypeAnnotationTransformer : public TKiSinkVisitorTransformer
}

auto op = GetTableOp(node);
if (NPgTypeAnn::NeedsValuesRename(node, op)) {
if (!NPgTypeAnn::RewriteValuesColumnNames(node, table, ctx, Types)) {
if (NPgTypeAnn::IsPgInsert(node, op)) {
TExprNode::TPtr newInput;
auto ok = NCommon::RenamePgSelectColumns(node.Input().Cast<TCoPgSelect>(), newInput, table->Metadata->ColumnOrder, ctx, Types);
if (!ok) {
return TStatus::Error;
}
return TStatus::Repeat;
if (newInput != node.Input().Ptr()) {
node.Ptr()->ChildRef(TKiWriteTable::idx_Input) = newInput;
return TStatus::Repeat;
}
}

if (!rowType) {
Expand Down
130 changes: 8 additions & 122 deletions ydb/core/kqp/provider/yql_kikimr_type_ann_pg.cpp
Original file line number Diff line number Diff line change
@@ -1,134 +1,19 @@
#include "yql_kikimr_type_ann_pg.h"

#include <ydb/library/yql/providers/common/provider/yql_provider.h>
#include <util/string/join.h>

namespace NYql {
namespace NPgTypeAnn {

using namespace NNodes;

namespace {
bool MatchesSetItemOption(const TExprBase& setItemOption, TStringBuf name) {
if (setItemOption.Ref().IsList() && setItemOption.Ref().ChildrenSize() > 0) {
if (setItemOption.Ref().ChildPtr(0)->Content() == name) {
return true;
}
}
return false;
bool IsPgInsert(const TKiWriteTable &node, TYdbOperation op) {
if (auto pgSelect = node.Input().Maybe<TCoPgSelect>()) {
return op == TYdbOperation::InsertAbort
&& NCommon::NeedToRenamePgSelectColumns(pgSelect.Cast());
}

TExprNode::TPtr GetSetItemOptionValue(const TExprBase& setItemOption) {
if (setItemOption.Ref().IsList() && setItemOption.Ref().ChildrenSize() > 1) {
return setItemOption.Ref().ChildPtr(1);
}
return nullptr;
}

bool TransformPgSetItemOption(
const TKiWriteTable& node,
TStringBuf optionName,
std::function<void(const TExprBase&)> lambda
) {
bool applied = false;
if (auto pgSelect = node.Input().Maybe<TCoPgSelect>()) {
for (const auto& option : pgSelect.Cast().SelectOptions()) {
if (option.Name() == "set_items") {
auto pgSetItems = option.Value().Cast<TExprList>();
for (const auto& setItem : pgSetItems) {
auto setItemNode = setItem.Cast<TCoPgSetItem>();
for (const auto& setItemOption : setItemNode.SetItemOptions()) {
if (MatchesSetItemOption(setItemOption, optionName)) {
applied = true;
lambda(setItemOption);
}
}
}
}
}
}
return applied;
}

TExprNode::TPtr GetSetItemOption(const TKiWriteTable& node, TStringBuf optionName) {
TExprNode::TPtr nodePtr = nullptr;
TransformPgSetItemOption(node, optionName, [&nodePtr](const TExprBase& option) {
nodePtr = option.Ptr();
});
return nodePtr;
}
} //namespace

bool NeedsValuesRename(const NNodes::TKiWriteTable &node, TYdbOperation op) {
auto fill = GetSetItemOption(node, "fill_target_columns");

return op == TYdbOperation::InsertAbort
&& fill
&& !GetSetItemOptionValue(TExprBase(fill));
}

bool RewriteValuesColumnNames(
const TKiWriteTable& node,
const TKikimrTableDescription* table,
TExprContext& ctx,
TTypeAnnotationContext& types
) {
bool ok = true;
TransformPgSetItemOption(node, "values", [&ok, &node, &table, &ctx, &types](const TExprBase &setItemOption) {
auto values = GetSetItemOptionValue(setItemOption);
if (values->ChildrenSize() > table->Metadata->Columns.size()) {
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << Sprintf(
"VALUES have %zu columns, INSERT INTO expects: %zu",
values->ChildrenSize(),
table->Metadata->Columns.size()
)));
ok = false;
return;
}
TExprNode::TListType columns;
THashMap<TString, TString> valueColumnName;
columns.reserve(values->ChildrenSize());
for (ui32 index = 0; index < values->ChildrenSize(); ++index) {
valueColumnName[values->Child(index)->Content()] = table->Metadata->ColumnOrder.at(index);
columns.push_back(Build<TCoAtom>(ctx, node.Pos())
.Value(table->Metadata->ColumnOrder.at(index))
.Done().Ptr());
columns.back()->SetTypeAnn(values->Child(index)->GetTypeAnn());
}
values->ChangeChildrenInplace(std::move(columns));
auto input = node.Ptr()->ChildRef(TKiWriteTable::idx_Input);
const TTypeAnnotationNode* inputType;
switch (input->GetTypeAnn()->GetKind()) {
case ETypeAnnotationKind::List:
inputType = input->GetTypeAnn()->Cast<TListExprType>()->GetItemType();
break;
default:
inputType = input->GetTypeAnn();
break;
}
Y_ENSURE(inputType->GetKind() == ETypeAnnotationKind::Struct);
TVector<const TItemExprType*> rowTypeItems;
for (const auto& item : inputType->Cast<TStructExprType>()->GetItems()) {
rowTypeItems.emplace_back(ctx.MakeType<TItemExprType>(valueColumnName[item->GetName()], item->GetItemType()));
}
const TStructExprType* rowStructType = ctx.MakeType<TStructExprType>(rowTypeItems);
if (input->GetTypeAnn()->GetKind() == ETypeAnnotationKind::List) {
input->SetTypeAnn(ctx.MakeType<TListExprType>(rowStructType));
} else {
input->SetTypeAnn(rowStructType);
}
types.SetColumnOrder(*input, TVector<TString>(table->Metadata->ColumnOrder.begin(), table->Metadata->ColumnOrder.begin() + values->ChildrenSize()), ctx, /*overwrite=*/true);
});
if (ok) {
auto fill = GetSetItemOption(node, "fill_target_columns");
fill->ChangeChildrenInplace({
fill->Child(0),
Build<TCoAtom>(ctx, node.Pos())
.Value("done")
.Done().Ptr()
});
fill->ChildPtr(1)->SetTypeAnn(ctx.MakeType<TUnitExprType>());
}
return ok;
return false;
}

bool ValidatePgUpdateKeys(const TKiWriteTable& node, const TKikimrTableDescription* table, TExprContext& ctx) {
Expand All @@ -140,7 +25,8 @@ bool ValidatePgUpdateKeys(const TKiWriteTable& node, const TKikimrTableDescripti
ok = false;
}
};
TransformPgSetItemOption(node, "result", [&updateKeyCheck](const TExprBase& setItemOptionNode) {
auto pgSelect = node.Input().Cast<TCoPgSelect>();
NCommon::TransformPgSetItemOption(pgSelect, "result", [&updateKeyCheck](const TExprBase& setItemOptionNode) {
auto setItemOption = setItemOptionNode.Cast<TCoNameValueTuple>();
auto resultList = setItemOption.Value().Cast<TExprList>();
for (size_t id = 1; id < resultList.Size(); id++) {
Expand Down
8 changes: 1 addition & 7 deletions ydb/core/kqp/provider/yql_kikimr_type_ann_pg.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,10 @@
namespace NYql {
namespace NPgTypeAnn {

bool NeedsValuesRename(
bool IsPgInsert(
const NNodes::TKiWriteTable& node,
TYdbOperation op);

bool RewriteValuesColumnNames(
const NNodes::TKiWriteTable& node,
const TKikimrTableDescription* table,
TExprContext& ctx,
TTypeAnnotationContext& types);

bool ValidatePgUpdateKeys(
const NNodes::TKiWriteTable& node,
const TKikimrTableDescription* table,
Expand Down
8 changes: 5 additions & 3 deletions ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,10 @@ Y_UNIT_TEST_SUITE(KqpNotNullColumns) {
NULL::int2
))");
auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT(!result.IsSuccess());
UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::DEFAULT_ERROR), result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST);
UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_BAD_COLUMN_TYPE), result.GetIssues().ToString());
UNIT_ASSERT_NO_DIFF(result.GetIssues().ToString(), "<main>: Error: Execution, code: 1060\n"
" <main>: Error: Tried to insert NULL value into NOT NULL column: key, code: 2031\n");
}

{ /* set NULL to not null pk column */
Expand All @@ -202,7 +204,7 @@ Y_UNIT_TEST_SUITE(KqpNotNullColumns) {
NULL::int2, 123::int2
))");
auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT(!result.IsSuccess());
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST);
UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_BAD_COLUMN_TYPE), result.GetIssues().ToString());
UNIT_ASSERT_NO_DIFF(result.GetIssues().ToString(), "<main>: Error: Execution, code: 1060\n"
" <main>: Error: Tried to insert NULL value into NOT NULL column: key, code: 2031\n");
Expand Down
Loading

0 comments on commit 4362647

Please sign in to comment.