Skip to content

Commit

Permalink
24-3: Datashard + Columnshard Reads (#6858)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored Aug 8, 2024
1 parent 68915a1 commit 72e974f
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 26 deletions.
2 changes: 0 additions & 2 deletions .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ ydb/core/quoter/ut QuoterWithKesusTest.PrefetchCoefficient
ydb/core/keyvalue/ut_trace TKeyValueTracingTest.*
ydb/core/kqp/provider/ut KikimrIcGateway.TestLoadBasicSecretValueFromExternalDataSourceMetadata
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.*
ydb/core/kqp/ut/olap KqpOlap.ScanQueryOltpAndOlap
ydb/core/kqp/ut/olap KqpOlapStatistics.StatsUsageWithTTL
ydb/core/kqp/ut/olap KqpOlap.YqlScriptOltpAndOlap
ydb/core/kqp/ut/olap KqpOlapAggregations.Aggregation_ResultCountAll_FilterL
ydb/core/kqp/ut/olap KqpOlapWrite.WriteDeleteCleanGC
ydb/core/kqp/ut/pg KqpPg.CreateIndex
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/kqp/common/kqp_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,13 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
for (const auto &input : stage.GetInputs()) {
hasStreamLookup |= input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup;
}

for (const auto &tableOp : stage.GetTableOps()) {
if (tableOp.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange) {
// always need snapshot for OLAP reads
return true;
}
}
}
}

Expand Down
4 changes: 0 additions & 4 deletions ydb/core/kqp/session_actor/kqp_query_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -314,10 +314,6 @@ class TKqpQueryState : public TNonCopyable {

bool NeedPersistentSnapshot() const {
auto type = GetType();
if (type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY ||
type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY) {
return ::NKikimr::NKqp::HasOlapTableReadInTx(PreparedQuery->GetPhysicalQuery());
}
return (
type == NKikimrKqp::QUERY_TYPE_SQL_SCAN ||
type == NKikimrKqp::QUERY_TYPE_AST_SCAN
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -844,9 +844,10 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
const NKqpProto::TKqpPhyQuery& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
HasOlapTable |= ::NKikimr::NKqp::HasOlapTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery);
HasOltpTable |= ::NKikimr::NKqp::HasOltpTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery);
if (HasOlapTable && HasOltpTable) {
HasTableWrite |= ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery);
if (HasOlapTable && HasOltpTable && HasTableWrite) {
ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED,
"Transactions between column and row tables are disabled at current time.");
"Write transactions between column and row tables are disabled at current time.");
return false;
}
QueryState->TxCtx->SetTempTables(QueryState->TempTablesState);
Expand Down Expand Up @@ -2551,6 +2552,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {

bool HasOlapTable = false;
bool HasOltpTable = false;
bool HasTableWrite = false;

TGUCSettings::TPtr GUCSettings;
};
Expand Down
113 changes: 95 additions & 18 deletions ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2790,7 +2790,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT(!insertResult.IsSuccess());
UNIT_ASSERT_C(
insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"),
insertResult.GetIssues().ToString());
}

Expand All @@ -2803,20 +2803,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT(!insertResult.IsSuccess());
UNIT_ASSERT_C(
insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
insertResult.GetIssues().ToString());
}

{
// column & row read
const TString sql = R"(
SELECT * FROM `/Root/DataShard`;
SELECT * FROM `/Root/ColumnShard`;
)";
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT(!insertResult.IsSuccess());
UNIT_ASSERT_C(
insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"),
insertResult.GetIssues().ToString());
}

Expand All @@ -2831,7 +2818,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT(!insertResult.IsSuccess());
UNIT_ASSERT_C(
insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"),
insertResult.GetIssues().ToString());
}

Expand All @@ -2845,7 +2832,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT(!insertResult.IsSuccess());
UNIT_ASSERT_C(
insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"),
insertResult.GetIssues().ToString());
}

Expand All @@ -2859,7 +2846,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT(!insertResult.IsSuccess());
UNIT_ASSERT_C(
insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"),
insertResult.GetIssues().ToString());
}
}
Expand Down Expand Up @@ -3541,6 +3528,96 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
}
}

Y_UNIT_TEST(ReadDatashardAndColumnshard) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
auto settings = TKikimrSettings()
.SetAppConfig(appConfig)
.SetWithSampleTables(false);

TKikimrRunner kikimr(settings);
Tests::NCommon::TLoggerInit(kikimr).Initialize();

auto client = kikimr.GetQueryClient();

{
auto createTable = client.ExecuteQuery(R"sql(
CREATE TABLE `/Root/DataShard` (
Col1 Uint64 NOT NULL,
Col2 Int32,
Col3 String,
PRIMARY KEY (Col1)
) WITH (
STORE = ROW,
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10
);
CREATE TABLE `/Root/ColumnShard` (
Col1 Uint64 NOT NULL,
Col2 Int32,
Col3 String,
PRIMARY KEY (Col1)
) WITH (
STORE = COLUMN,
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10
);
)sql", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_C(createTable.IsSuccess(), createTable.GetIssues().ToString());
}

{
auto replaceValues = client.ExecuteQuery(R"sql(
REPLACE INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES
(1u, 1, "row");
)sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(replaceValues.IsSuccess(), replaceValues.GetIssues().ToString());
}

{
auto replaceValues = client.ExecuteQuery(R"sql(
REPLACE INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES
(2u, 2, "column");
)sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(replaceValues.IsSuccess(), replaceValues.GetIssues().ToString());
}

{
auto it = client.StreamExecuteQuery(R"sql(
SELECT * FROM `/Root/ColumnShard`;
)sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
TString output = StreamResultToYson(it);
CompareYson(
output,
R"([[2u;[2];["column"]]])");
}

{
auto it = client.StreamExecuteQuery(R"sql(
SELECT * FROM `/Root/DataShard`
UNION ALL
SELECT * FROM `/Root/ColumnShard`;
)sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
TString output = StreamResultToYson(it);
CompareYson(
output,
R"([[1u;[1];["row"]];[2u;[2];["column"]]])");
}

{
auto it = client.StreamExecuteQuery(R"sql(
SELECT r.Col3, c.Col3 FROM `/Root/DataShard` AS r
JOIN `/Root/ColumnShard` AS c ON r.Col1 + 1 = c.Col1;
)sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
TString output = StreamResultToYson(it);
CompareYson(
output,
R"([[["row"];["column"]]])");
}
}

Y_UNIT_TEST(ReplaceIntoWithDefaultValue) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(false);
Expand Down

0 comments on commit 72e974f

Please sign in to comment.