Skip to content

Commit

Permalink
Merge af17d86 into f3c6975
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored Jun 10, 2024
2 parents f3c6975 + af17d86 commit d552190
Show file tree
Hide file tree
Showing 15 changed files with 246 additions and 76 deletions.
9 changes: 9 additions & 0 deletions ydb/core/kqp/common/kqp_yql.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,9 @@ TKqpUpsertRowsSettings TKqpUpsertRowsSettings::Parse(const TCoNameValueTupleList
} else if (name == TKqpUpsertRowsSettings::IsUpdateSettingName) {
YQL_ENSURE(tuple.Ref().ChildrenSize() == 1);
settings.IsUpdate = true;
} else if (name == TKqpUpsertRowsSettings::AllowInconsistentWritesSettingName) {
YQL_ENSURE(tuple.Ref().ChildrenSize() == 1);
settings.AllowInconsistentWrites = true;
} else {
YQL_ENSURE(false, "Unknown KqpUpsertRows setting name '" << name << "'");
}
Expand Down Expand Up @@ -310,6 +313,12 @@ NNodes::TCoNameValueTupleList TKqpUpsertRowsSettings::BuildNode(TExprContext& ct
.Name().Build(IsUpdateSettingName)
.Done());
}
if (AllowInconsistentWrites) {
settings.emplace_back(
Build<TCoNameValueTuple>(ctx, pos)
.Name().Build(AllowInconsistentWritesSettingName)
.Done());
}

return Build<TCoNameValueTupleList>(ctx, pos)
.Add(settings)
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/common/kqp_yql.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,15 @@ struct TKqpReadTableSettings {
struct TKqpUpsertRowsSettings {
static constexpr TStringBuf InplaceSettingName = "Inplace";
static constexpr TStringBuf IsUpdateSettingName = "IsUpdate";
static constexpr TStringBuf AllowInconsistentWritesSettingName = "AllowInconsistentWrites";

bool Inplace = false;
bool IsUpdate = false;
bool AllowInconsistentWrites = false;

void SetInplace() { Inplace = true; }
void SetIsUpdate() { IsUpdate = true; }
void SetAllowInconsistentWrites() { AllowInconsistentWrites = true; }

static TKqpUpsertRowsSettings Parse(const NNodes::TCoNameValueTupleList& settingsList);
static TKqpUpsertRowsSettings Parse(const NNodes::TKqpUpsertRows& node);
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,8 @@
"Children": [
{"Index": 0, "Name": "Table", "Type": "TKqpTable"},
{"Index": 1, "Name": "Columns", "Type": "TCoAtomList"},
{"Index": 2, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true}
{"Index": 2, "Name": "InconsistentWrite", "Type": "TCoAtom"},
{"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true}
]
},
{
Expand Down
26 changes: 18 additions & 8 deletions ydb/core/kqp/host/kqp_statement_rewrite.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ namespace {
NYql::TExprNode::TPtr ReplaceInto = nullptr;
};

bool IsColumnTable(const NYql::NNodes::TMaybeNode<NYql::NNodes::TCoNameValueTupleList>& tableSettings) {
bool IsOlap(const NYql::NNodes::TMaybeNode<NYql::NNodes::TCoNameValueTupleList>& tableSettings) {
if (!tableSettings) {
return false;
}
Expand Down Expand Up @@ -85,6 +85,8 @@ namespace {
return std::nullopt;
}

const bool isOlap = IsOlap(settings.TableSettings);

const auto& insertData = writeArgs.Get(3);
if (insertData.Ptr()->Content() == "Void") {
return std::nullopt;
Expand Down Expand Up @@ -136,7 +138,7 @@ namespace {
const auto name = item->GetName();
auto currentType = item->GetItemType();

const bool notNull = primariKeyColumns.contains(name) && IsColumnTable(settings.TableSettings);
const bool notNull = primariKeyColumns.contains(name) && isOlap;

if (notNull && currentType->GetKind() == NYql::ETypeAnnotationKind::Optional) {
currentType = currentType->Cast<NYql::TOptionalExprType>()->GetItemType();
Expand Down Expand Up @@ -169,6 +171,19 @@ namespace {

const auto topLevelRead = NYql::FindTopLevelRead(insertData.Ptr());

NYql::TExprNode::TListType insertSettings;
insertSettings.push_back(
exprCtx.NewList(pos, {
exprCtx.NewAtom(pos, "mode"),
exprCtx.NewAtom(pos, "replace"),
}));
if (!isOlap) {
insertSettings.push_back(
exprCtx.NewList(pos, {
exprCtx.NewAtom(pos, "AllowInconsistentWrites"),
}));
}

const auto insert = exprCtx.NewCallable(pos, "Write!", {
topLevelRead == nullptr ? exprCtx.NewWorld(pos) : exprCtx.NewCallable(pos, "Left!", {topLevelRead.Get()}),
exprCtx.NewCallable(pos, "DataSink", {
Expand All @@ -184,12 +199,7 @@ namespace {
}),
}),
insertData.Ptr(),
exprCtx.NewList(pos, {
exprCtx.NewList(pos, {
exprCtx.NewAtom(pos, "mode"),
exprCtx.NewAtom(pos, "replace"),
}),
}),
exprCtx.NewList(pos, std::move(insertSettings)),
});

TCreateTableAsResult result;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/host/kqp_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1840,7 +1840,7 @@ TStatus AnnotateKqpSinkEffect(const TExprNode::TPtr& node, TExprContext& ctx) {
}

TStatus AnnotateTableSinkSettings(const TExprNode::TPtr& input, TExprContext& ctx) {
if (!EnsureMinMaxArgsCount(*input, 2, 3, ctx)) {
if (!EnsureMinMaxArgsCount(*input, 2, 4, ctx)) {
return TStatus::Error;
}
input->SetTypeAnn(ctx.MakeType<TVoidExprType>());
Expand Down
13 changes: 10 additions & 3 deletions ydb/core/kqp/opt/kqp_opt_effects.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ bool IsMapWrite(const TKikimrTableDescription& table, TExprBase input, TExprCont
#undef DBG
}

TDqStage RebuildPureStageWithSink(TExprBase expr, const TKqpTable& table, const TCoAtomList& columns, TExprContext& ctx) {
TDqStage RebuildPureStageWithSink(TExprBase expr, const TKqpTable& table, const TCoAtomList& columns,
const TKqpUpsertRowsSettings& settings, TExprContext& ctx) {
Y_DEBUG_ABORT_UNLESS(IsDqPureExpr(expr));

return Build<TDqStage>(ctx, expr.Pos())
Expand All @@ -239,6 +240,9 @@ TDqStage RebuildPureStageWithSink(TExprBase expr, const TKqpTable& table, const
.Settings<TKqpTableSinkSettings>()
.Table(table)
.Columns(columns)
.InconsistentWrite(settings.AllowInconsistentWrites
? ctx.NewAtom(expr.Pos(), "true")
: ctx.NewAtom(expr.Pos(), "false"))
.Settings()
.Build()
.Build()
Expand Down Expand Up @@ -292,7 +296,7 @@ bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const
}
if (IsDqPureExpr(node.Input())) {
if (sinkEffect) {
stageInput = RebuildPureStageWithSink(node.Input(), node.Table(), node.Columns(), ctx);
stageInput = RebuildPureStageWithSink(node.Input(), node.Table(), node.Columns(), settings, ctx);
effect = Build<TKqpSinkEffect>(ctx, node.Pos())
.Stage(stageInput.Cast().Ptr())
.SinkIndex().Build("0")
Expand Down Expand Up @@ -330,6 +334,9 @@ bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const
.Settings<TKqpTableSinkSettings>()
.Table(node.Table())
.Columns(node.Columns())
.InconsistentWrite(settings.AllowInconsistentWrites
? ctx.NewAtom(node.Pos(), "true")
: ctx.NewAtom(node.Pos(), "false"))
.Settings()
.Build()
.Build()
Expand All @@ -339,7 +346,7 @@ bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const
.Name("row")
.Done();

if (table.Metadata->Kind == EKikimrTableKind::Olap) {
if (table.Metadata->Kind == EKikimrTableKind::Olap || settings.AllowInconsistentWrites) {
// OLAP is expected to write into all shards (hash partitioning),
// so we use serveral sinks for this without union all.
// (TODO: shuffle by shard instead of DqCnMap)
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/kqp/opt/kqp_opt_kql.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ TExprBase BuildUpsertTable(const TKiWriteTable& write, const TCoAtomList& inputC
YQL_ENSURE(generateColumnsIfInsertNode);
TCoAtomList generateColumnsIfInsert = TCoNameValueTuple(generateColumnsIfInsertNode).Value().Cast<TCoAtomList>();

auto settings = FilterSettings(write.Settings().Ref(), {"AllowInconsistentWrites"}, ctx);
const auto [input, columns] = BuildWriteInput(write, table, inputColumns, autoincrement, write.Pos(), ctx);
if (generateColumnsIfInsert.Ref().ChildrenSize() > 0) {
return Build<TKqlInsertOnConflictUpdateRows>(ctx, write.Pos())
Expand All @@ -243,6 +244,7 @@ TExprBase BuildUpsertTable(const TKiWriteTable& write, const TCoAtomList& inputC
.Input(input.Ptr())
.Columns(columns.Ptr())
.ReturningColumns(write.ReturningColumns())
.Settings(settings)
.Done();

return effect;
Expand All @@ -252,6 +254,7 @@ TExprBase BuildUpsertTableWithIndex(const TKiWriteTable& write, const TCoAtomLis
const TCoAtomList& autoincrement,
const TKikimrTableDescription& table, TExprContext& ctx)
{
auto settings = FilterSettings(write.Settings().Ref(), {"AllowInconsistentWrites"}, ctx);
const auto [input, columns] = BuildWriteInput(write, table, inputColumns, autoincrement, write.Pos(), ctx);
auto generateColumnsIfInsertNode = GetSetting(write.Settings().Ref(), "generate_columns_if_insert");
YQL_ENSURE(generateColumnsIfInsertNode);
Expand All @@ -263,6 +266,7 @@ TExprBase BuildUpsertTableWithIndex(const TKiWriteTable& write, const TCoAtomLis
.Columns(columns.Ptr())
.ReturningColumns(write.ReturningColumns())
.GenerateColumnsIfInsert(generateColumnsIfInsert)
.Settings(settings)
.Done();

return effect;
Expand All @@ -272,12 +276,14 @@ TExprBase BuildReplaceTable(const TKiWriteTable& write, const TCoAtomList& input
const TCoAtomList& autoincrement,
const TKikimrTableDescription& table, TExprContext& ctx)
{
auto settings = FilterSettings(write.Settings().Ref(), {"AllowInconsistentWrites"}, ctx);
const auto [input, columns] = BuildWriteInput(write, table, inputColumns, autoincrement, write.Pos(), ctx);
auto effect = Build<TKqlUpsertRows>(ctx, write.Pos())
.Table(BuildTableMeta(table, write.Pos(), ctx))
.Input(input.Ptr())
.Columns(columns)
.ReturningColumns(write.ReturningColumns())
.Settings(settings)
.Done();

return effect;
Expand All @@ -287,13 +293,15 @@ TExprBase BuildReplaceTableWithIndex(const TKiWriteTable& write, const TCoAtomLi
const TCoAtomList& autoincrement,
const TKikimrTableDescription& table, TExprContext& ctx)
{
auto settings = FilterSettings(write.Settings().Ref(), {"AllowInconsistentWrites"}, ctx);
const auto [input, columns] = BuildWriteInput(write, table, inputColumns, autoincrement, write.Pos(), ctx);
auto effect = Build<TKqlUpsertRowsIndex>(ctx, write.Pos())
.Table(BuildTableMeta(table, write.Pos(), ctx))
.Input(input.Ptr())
.Columns(columns.Ptr())
.ReturningColumns(write.ReturningColumns())
.GenerateColumnsIfInsert<TCoAtomList>().Build()
.Settings(settings)
.Done();

return effect;
Expand Down
24 changes: 16 additions & 8 deletions ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -615,26 +615,34 @@ class TKiSinkTypeAnnotationTransformer : public TKiSinkVisitorTransformer
generateColumnsIfInsert.push_back(ctx.NewAtom(node.Pos(), generatedColumn));
}

node.Ptr()->ChildRef(TKiWriteTable::idx_Settings) = Build<TCoNameValueTupleList>(ctx, node.Pos())
.Add(node.Settings())
.Add()
TVector<TCoNameValueTuple> settings;
for (const auto& setting : node.Settings()) {
settings.push_back(setting);
}
settings.push_back(
Build<TCoNameValueTuple>(ctx, node.Pos())
.Name().Build("input_columns")
.Value<TCoAtomList>()
.Add(columns)
.Build()
.Build()
.Add()
.Done());
settings.push_back(
Build<TCoNameValueTuple>(ctx, node.Pos())
.Name().Build("default_constraint_columns")
.Value<TCoAtomList>()
.Add(defaultConstraintColumns)
.Build()
.Build()
.Add()
.Done());
settings.push_back(
Build<TCoNameValueTuple>(ctx, node.Pos())
.Name().Build("generate_columns_if_insert")
.Value<TCoAtomList>()
.Add(generateColumnsIfInsert)
.Build()
.Build()
.Done());

node.Ptr()->ChildRef(TKiWriteTable::idx_Settings) = Build<TCoNameValueTupleList>(ctx, node.Pos())
.Add(settings)
.Done()
.Ptr();

Expand Down
5 changes: 5 additions & 0 deletions ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,7 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
for (const auto& columnName : tableMeta->KeyColumnNames) {
const auto columnMeta = tableMeta->Columns.FindPtr(columnName);
YQL_ENSURE(columnMeta != nullptr, "Unknown column in sink: \"" + columnName + "\"");

auto keyColumnProto = settingsProto.AddKeyColumns();
keyColumnProto->SetId(columnMeta->Id);
keyColumnProto->SetName(columnName);
Expand Down Expand Up @@ -1062,6 +1063,10 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
}
}

if (const auto inconsistentWrite = settings.InconsistentWrite().Cast(); inconsistentWrite.StringValue() == "true") {
settingsProto.SetInconsistentTx(true);
}

internalSinkProto.MutableSettings()->PackFrom(settingsProto);
} else {
YQL_ENSURE(false, "Unsupported sink type");
Expand Down
Loading

0 comments on commit d552190

Please sign in to comment.