Skip to content

Commit

Permalink
Fix sink index column order (#9190)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored Sep 13, 2024
1 parent 7b31943 commit acee834
Show file tree
Hide file tree
Showing 10 changed files with 134 additions and 19 deletions.
4 changes: 4 additions & 0 deletions .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ ydb/core/kqp/ut/service KqpService.CloseSessionsWithLoad
ydb/core/kqp/ut/service KqpQueryService.TableSink_OlapRWQueries
ydb/core/kqp/ut/service KqpQueryService.TableSink_Htap
ydb/core/kqp/ut/tx KqpSnapshotRead.ReadOnlyTxWithIndexCommitsOnConcurrentWrite+withSink
ydb/core/kqp/ut/tx KqpSinkMvcc.OltpNamedStatement
ydb/core/kqp/ut/tx KqpSinkMvcc.OlapNamedStatement
ydb/core/kqp/ut/tx KqpSinkMvcc.OltpMultiSinks
ydb/core/kqp/ut/tx KqpSinkMvcc.OlapMultiSinks
ydb/core/persqueue/ut [*/*]*
ydb/core/persqueue/ut TPQTest.*DirectRead*
ydb/core/persqueue/ut/ut_with_sdk [*/*]*
Expand Down
10 changes: 7 additions & 3 deletions ydb/core/kqp/opt/physical/effects/kqp_opt_phy_delete_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,18 @@ TExprBase KqpBuildDeleteIndexStages(TExprBase node, TExprContext& ctx, const TKq
effects.emplace_back(tableDelete);

for (const auto& [tableNode, indexDesc] : indexes) {
THashSet<TStringBuf> indexTableColumns;
THashSet<TStringBuf> indexTableColumnsSet;
TVector<TStringBuf> indexTableColumns;

for (const auto& column : indexDesc->KeyColumns) {
YQL_ENSURE(indexTableColumns.emplace(column).second);
YQL_ENSURE(indexTableColumnsSet.emplace(column).second);
indexTableColumns.emplace_back(column);
}

for (const auto& column : pk) {
indexTableColumns.insert(column);
if (indexTableColumnsSet.insert(column).second) {
indexTableColumns.emplace_back(column);
}
}

auto deleteIndexKeys = MakeRowsFromDict(lookupDict.Cast(), pk, indexTableColumns, del.Pos(), ctx);
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ NYql::NNodes::TCoLambda MakeRowsPayloadSelector(const NYql::NNodes::TCoAtomList&
const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos, NYql::TExprContext& ctx);

NYql::NNodes::TExprBase MakeRowsFromDict(const NYql::NNodes::TDqPhyPrecompute& dict, const TVector<TString>& dictKeys,
const THashSet<TStringBuf>& columns, NYql::TPositionHandle pos, NYql::TExprContext& ctx);
const TVector<TStringBuf>& columns, NYql::TPositionHandle pos, NYql::TExprContext& ctx);

// Same as MakeRowsFromDict but skip rows which marked as non changed (true in second tuple)
NYql::NNodes::TExprBase MakeRowsFromTupleDict(const NYql::NNodes::TDqPhyPrecompute& dict, const TVector<TString>& dictKeys,
const THashSet<TStringBuf>& columns, NYql::TPositionHandle pos, NYql::TExprContext& ctx);
const TVector<TStringBuf>& columns, NYql::TPositionHandle pos, NYql::TExprContext& ctx);

NYql::NNodes::TMaybeNode<NYql::NNodes::TDqCnUnionAll> MakeConditionalInsertRows(const NYql::NNodes::TExprBase& input,
const NYql::TKikimrTableDescription& table, const TMaybe<THashSet<TStringBuf>>& inputColumn, bool abortOnError,
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/opt/physical/effects/kqp_opt_phy_indexes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ TMaybeNode<TDqPhyPrecompute> PrecomputeTableLookupDict(const TDqPhyPrecompute& l
}

TExprBase MakeRowsFromDict(const TDqPhyPrecompute& dict, const TVector<TString>& dictKeys,
const THashSet<TStringBuf>& columns, TPositionHandle pos, TExprContext& ctx)
const TVector<TStringBuf>& columns, TPositionHandle pos, TExprContext& ctx)
{
THashSet<TString> dictKeysSet(dictKeys.begin(), dictKeys.end());
auto dictTupleArg = TCoArgument(ctx.NewArgument(pos, "dict_tuple"));
Expand Down Expand Up @@ -296,7 +296,7 @@ TExprBase MakeRowsFromDict(const TDqPhyPrecompute& dict, const TVector<TString>&
}

TExprBase MakeRowsFromTupleDict(const TDqPhyPrecompute& dict, const TVector<TString>& dictKeys,
const THashSet<TStringBuf>& columns, TPositionHandle pos, TExprContext& ctx)
const TVector<TStringBuf>& columns, TPositionHandle pos, TExprContext& ctx)
{
THashSet<TString> dictKeysSet(dictKeys.begin(), dictKeys.end());
auto dictTupleArg = TCoArgument(ctx.NewArgument(pos, "dict_tuple"));
Expand Down
15 changes: 10 additions & 5 deletions ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ using namespace NYql::NNodes;
namespace {

TExprBase MakeInsertIndexRows(const TDqPhyPrecompute& inputRows, const TKikimrTableDescription& table,
const THashSet<TStringBuf>& inputColumns, const THashSet<TStringBuf>& indexColumns,
const THashSet<TStringBuf>& inputColumns, const TVector<TStringBuf>& indexColumns,
TPositionHandle pos, TExprContext& ctx)
{
auto inputRowArg = TCoArgument(ctx.NewArgument(pos, "input_row"));
Expand Down Expand Up @@ -113,19 +113,24 @@ TExprBase KqpBuildInsertIndexStages(TExprBase node, TExprContext& ctx, const TKq
effects.emplace_back(upsertTable);

for (const auto& [tableNode, indexDesc] : indexes) {
THashSet<TStringBuf> indexTableColumns;
THashSet<TStringBuf> indexTableColumnsSet;
TVector<TStringBuf> indexTableColumns;

for (const auto& column : indexDesc->KeyColumns) {
YQL_ENSURE(indexTableColumns.emplace(column).second);
YQL_ENSURE(indexTableColumnsSet.emplace(column).second);
indexTableColumns.emplace_back(column);
}

for (const auto& column : table.Metadata->KeyColumnNames) {
indexTableColumns.insert(column);
if (indexTableColumnsSet.insert(column).second) {
indexTableColumns.emplace_back(column);
}
}

for (const auto& column : indexDesc->DataColumns) {
if (inputColumnsSet.contains(column)) {
YQL_ENSURE(indexTableColumns.emplace(column).second);
YQL_ENSURE(indexTableColumnsSet.emplace(column).second);
indexTableColumns.emplace_back(column);
}
}

Expand Down
15 changes: 10 additions & 5 deletions ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ TExprBase MakeNonexistingRowsFilter(const TDqPhyPrecompute& inputRows, const TDq

TExprBase MakeUpsertIndexRows(TKqpPhyUpsertIndexMode mode, const TDqPhyPrecompute& inputRows,
const TDqPhyPrecompute& lookupDict, const THashSet<TStringBuf>& inputColumns,
const THashSet<TStringBuf>& indexColumns, const TKikimrTableDescription& table, TPositionHandle pos,
const TVector<TStringBuf>& indexColumns, const TKikimrTableDescription& table, TPositionHandle pos,
TExprContext& ctx, bool opt)
{
// Check if we can update index table from just input data
Expand Down Expand Up @@ -686,9 +686,11 @@ TMaybeNode<TExprList> KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode,

for (const auto& [tableNode, indexDesc] : indexes) {
bool indexKeyColumnsUpdated = false;
THashSet<TStringBuf> indexTableColumns;
THashSet<TStringBuf> indexTableColumnsSet;
TVector<TStringBuf> indexTableColumns;
for (const auto& column : indexDesc->KeyColumns) {
YQL_ENSURE(indexTableColumns.emplace(column).second);
YQL_ENSURE(indexTableColumnsSet.emplace(column).second);
indexTableColumns.emplace_back(column);

if (mode == TKqpPhyUpsertIndexMode::UpdateOn && table.GetKeyColumnIndex(column)) {
// Table PK cannot be updated, so don't consider PK columns update as index update
Expand All @@ -701,7 +703,9 @@ TMaybeNode<TExprList> KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode,
}

for (const auto& column : pk) {
indexTableColumns.insert(column);
if (indexTableColumnsSet.insert(column).second) {
indexTableColumns.emplace_back(column);
}
}

auto indexTableColumnsWithoutData = indexTableColumns;
Expand All @@ -710,7 +714,8 @@ TMaybeNode<TExprList> KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode,
bool optUpsert = true;
for (const auto& column : indexDesc->DataColumns) {
// TODO: Conder not fetching/updating data columns without input value.
YQL_ENSURE(indexTableColumns.emplace(column).second);
YQL_ENSURE(indexTableColumnsSet.emplace(column).second);
indexTableColumns.emplace_back(column);

if (inputColumnsSet.contains(column)) {
indexDataColumnsUpdated = true;
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ TCoAtomList BuildColumnsList(const THashSet<TStringBuf>& columns, TPositionHandl
return BuildColumnsListImpl(columns, pos, ctx);
}

TCoAtomList BuildColumnsList(const TVector<TStringBuf>& columns, NYql::TPositionHandle pos,NYql::TExprContext& ctx) {
return BuildColumnsListImpl(columns, pos, ctx);
}

TCoAtomList BuildColumnsList(const TVector<TString>& columns, TPositionHandle pos, TExprContext& ctx) {
return BuildColumnsListImpl(columns, pos, ctx);
}
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/opt/physical/kqp_opt_phy_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ NYql::NNodes::TMaybeNode<NYql::NNodes::TDqPhyPrecompute> BuildLookupKeysPrecompu
NYql::NNodes::TCoAtomList BuildColumnsList(const THashSet<TStringBuf>& columns, NYql::TPositionHandle pos,
NYql::TExprContext& ctx);

NYql::NNodes::TCoAtomList BuildColumnsList(const TVector<TStringBuf>& columns, NYql::TPositionHandle pos,
NYql::TExprContext& ctx);

NYql::NNodes::TCoAtomList BuildColumnsList(const TVector<TString>& columns, NYql::TPositionHandle pos,
NYql::TExprContext& ctx);

Expand Down
5 changes: 3 additions & 2 deletions ydb/core/kqp/ut/tx/kqp_sink_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ class TTableDataModificationTester {
std::unique_ptr<TKikimrRunner> Kikimr;
YDB_ACCESSOR(bool, IsOlap, false);
YDB_ACCESSOR(bool, FastSnapshotExpiration, false);
YDB_ACCESSOR(bool, DisableSinks, false);

virtual void DoExecute() = 0;
public:
void Execute() {
AppConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
AppConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
AppConfig.MutableTableServiceConfig()->SetEnableOlapSink(!DisableSinks);
AppConfig.MutableTableServiceConfig()->SetEnableOltpSink(!DisableSinks);
AppConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(true);
auto settings = TKikimrSettings().SetAppConfig(AppConfig).SetWithSampleTables(false);
if (FastSnapshotExpiration) {
Expand Down
89 changes: 89 additions & 0 deletions ydb/core/kqp/ut/tx/kqp_sink_mvcc_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,95 @@ Y_UNIT_TEST_SUITE(KqpSinkMvcc) {
tester.SetIsOlap(true);
tester.Execute();
}

class TNamedStatement : public TTableDataModificationTester {
protected:
void DoExecute() override {
auto client = Kikimr->GetQueryClient();

auto session1 = client.GetSession().GetValueSync().GetSession();

{
auto result = session1.ExecuteQuery(Q_(R"(
$data = SELECT * FROM `/Root/KV`;
DELETE FROM `/Root/KV` WHERE 1=1;
SELECT COUNT(*) FROM `/Root/KV`;
SELECT COUNT(*) FROM $data;
DELETE FROM `/Root/KV` ON SELECT 424242u AS Key, "One" As Value;
UPSERT INTO `/Root/KV` (Key, Value) VALUES (424242u, "One");
SELECT COUNT(*) FROM `/Root/KV`;
SELECT COUNT(*) FROM $data;
)"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CompareYson(R"([[0u]])", FormatResultSetYson(result.GetResultSet(0)));
CompareYson(R"([[0u]])", FormatResultSetYson(result.GetResultSet(1)));
CompareYson(R"([[1u]])", FormatResultSetYson(result.GetResultSet(2)));
CompareYson(R"([[1u]])", FormatResultSetYson(result.GetResultSet(3)));
}
}
};

Y_UNIT_TEST(OltpNamedStatementNoSink) {
TNamedStatement tester;
tester.SetDisableSinks(true);
tester.SetIsOlap(false);
tester.Execute();
}

Y_UNIT_TEST(OltpNamedStatement) {
TNamedStatement tester;
tester.SetIsOlap(false);
tester.Execute();
}

Y_UNIT_TEST(OlapNamedStatement) {
TNamedStatement tester;
tester.SetIsOlap(true);
tester.Execute();
}

class TMultiSinks: public TTableDataModificationTester {
protected:
void DoExecute() override {
auto client = Kikimr->GetQueryClient();

auto session1 = client.GetSession().GetValueSync().GetSession();

{
auto result = session1.ExecuteQuery(Q_(R"(
UPSERT INTO `/Root/KV` (Key, Value) VALUES (1u, "1");
UPSERT INTO `/Root/KV` (Key, Value) VALUES (1u, "2");
)"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
{
auto result = session1.ExecuteQuery(Q_(R"(
SELECT Value FROM `/Root/KV` WHERE Key = 1u;
)"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CompareYson(R"([[["2"]]])", FormatResultSetYson(result.GetResultSet(0)));
}
}
};

Y_UNIT_TEST(OltpMultiSinksNoSinks) {
TMultiSinks tester;
tester.SetDisableSinks(true);
tester.SetIsOlap(false);
tester.Execute();
}

Y_UNIT_TEST(OltpMultiSinks) {
TMultiSinks tester;
tester.SetIsOlap(false);
tester.Execute();
}

Y_UNIT_TEST(OlapMultiSinks) {
TMultiSinks tester;
tester.SetIsOlap(true);
tester.Execute();
}
}

} // namespace NKqp
Expand Down

0 comments on commit acee834

Please sign in to comment.