Skip to content

Commit

Permalink
ALTER INDEX to change partitioning settings of indexImplTable (#4354)
Browse files Browse the repository at this point in the history
  • Loading branch information
jepett0 authored May 23, 2024
1 parent 6ea02c5 commit f702cec
Show file tree
Hide file tree
Showing 15 changed files with 608 additions and 396 deletions.
203 changes: 138 additions & 65 deletions ydb/core/kqp/provider/yql_kikimr_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <ydb/library/yql/minikql/mkql_program_builder.h>

#include <ydb/core/kqp/provider/yql_kikimr_results.h>
#include <ydb/core/kqp/gateway/utils/scheme_helpers.h>

namespace NYql {
namespace {
Expand Down Expand Up @@ -505,6 +506,96 @@ namespace {
}
}
}

bool IsPartitioningSetting(TStringBuf name) {
return name == "autoPartitioningBySize"
|| name == "partitionSizeMb"
|| name == "autoPartitioningByLoad"
|| name == "minPartitions"
|| name == "maxPartitions";
}

[[nodiscard]] bool ParsePartitioningSettings(
Ydb::Table::PartitioningSettings& partitioningSettings,
const TCoNameValueTuple& setting,
TExprContext& ctx
) {
auto name = setting.Name().Value();
if (name == "autoPartitioningBySize") {
auto val = to_lower(setting.Value().Cast<TCoAtom>().StringValue());
if (val == "enabled") {
partitioningSettings.set_partitioning_by_size(Ydb::FeatureFlag::ENABLED);
} else if (val == "disabled") {
partitioningSettings.set_partitioning_by_size(Ydb::FeatureFlag::DISABLED);
} else {
ctx.AddError(
TIssue(ctx.GetPosition(setting.Value().Cast<TCoAtom>().Pos()),
TStringBuilder() << "Unknown feature flag '" << val << "' for auto partitioning by size"
)
);
return false;
}
} else if (name == "partitionSizeMb") {
ui64 value = FromString<ui64>(
setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value()
);
if (value) {
partitioningSettings.set_partition_size_mb(value);
} else {
ctx.AddError(
TIssue(ctx.GetPosition(setting.Name().Pos()),
"Can't set preferred partition size to 0. "
"To disable auto partitioning by size use 'SET AUTO_PARTITIONING_BY_SIZE DISABLED'"
)
);
return false;
}
} else if (name == "autoPartitioningByLoad") {
auto val = to_lower(setting.Value().Cast<TCoAtom>().StringValue());
if (val == "enabled") {
partitioningSettings.set_partitioning_by_load(Ydb::FeatureFlag::ENABLED);
} else if (val == "disabled") {
partitioningSettings.set_partitioning_by_load(Ydb::FeatureFlag::DISABLED);
} else {
ctx.AddError(
TIssue(ctx.GetPosition(setting.Value().Cast<TCoAtom>().Pos()),
TStringBuilder() << "Unknown feature flag '" << val << "' for auto partitioning by load"
)
);
return false;
}
} else if (name == "minPartitions") {
ui64 value = FromString<ui64>(
setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value()
);
if (value) {
partitioningSettings.set_min_partitions_count(value);
} else {
ctx.AddError(
TIssue(ctx.GetPosition(setting.Name().Pos()),
"Can't set min partition count to 0"
)
);
return false;
}
} else if (name == "maxPartitions") {
ui64 value = FromString<ui64>(
setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value()
);
if (value) {
partitioningSettings.set_max_partitions_count(value);
} else {
ctx.AddError(
TIssue(ctx.GetPosition(setting.Name().Pos()),
"Can't set max partition count to 0"
)
);
return false;
}
}

return true;
}
}

class TKiSinkPlanInfoTransformer : public TGraphTransformerBase {
Expand Down Expand Up @@ -1290,71 +1381,10 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
alterTableRequest.set_set_compaction_policy(TString(
setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value()
));
} else if (name == "autoPartitioningBySize") {
auto partitioningSettings = alterTableRequest.mutable_alter_partitioning_settings();
auto val = to_lower(TString(setting.Value().Cast<TCoAtom>().Value()));
if (val == "enabled") {
partitioningSettings->set_partitioning_by_size(Ydb::FeatureFlag::ENABLED);
} else if (val == "disabled") {
partitioningSettings->set_partitioning_by_size(Ydb::FeatureFlag::DISABLED);
} else {
auto errText = TStringBuilder() << "Unknown feature flag '"
<< val
<< "' for auto partitioning by size";
ctx.AddError(TIssue(ctx.GetPosition(setting.Value().Cast<TCoAtom>().Pos()),
errText));
return SyncError();
}
} else if (name == "partitionSizeMb") {
ui64 value = FromString<ui64>(
setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value()
);
if (value) {
auto partitioningSettings = alterTableRequest.mutable_alter_partitioning_settings();
partitioningSettings->set_partition_size_mb(value);
} else {
ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()),
"Can't set preferred partition size to 0. "
"To disable auto partitioning by size use 'SET AUTO_PARTITIONING_BY_SIZE DISABLED'"));
return SyncError();
}
} else if (name == "autoPartitioningByLoad") {
auto partitioningSettings = alterTableRequest.mutable_alter_partitioning_settings();
TString val = to_lower(TString(setting.Value().Cast<TCoAtom>().Value()));
if (val == "enabled") {
partitioningSettings->set_partitioning_by_load(Ydb::FeatureFlag::ENABLED);
} else if (val == "disabled") {
partitioningSettings->set_partitioning_by_load(Ydb::FeatureFlag::DISABLED);
} else {
auto errText = TStringBuilder() << "Unknown feature flag '"
<< val
<< "' for auto partitioning by load";
ctx.AddError(TIssue(ctx.GetPosition(setting.Value().Cast<TCoAtom>().Pos()),
errText));
return SyncError();
}
} else if (name == "minPartitions") {
ui64 value = FromString<ui64>(
setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value()
);
if (value) {
auto partitioningSettings = alterTableRequest.mutable_alter_partitioning_settings();
partitioningSettings->set_min_partitions_count(value);
} else {
ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()),
"Can't set min partition count to 0"));
return SyncError();
}
} else if (name == "maxPartitions") {
ui64 value = FromString<ui64>(
setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value()
);
if (value) {
auto partitioningSettings = alterTableRequest.mutable_alter_partitioning_settings();
partitioningSettings->set_max_partitions_count(value);
} else {
ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()),
"Can't set max partition count to 0"));
} else if (IsPartitioningSetting(name)) {
if (!ParsePartitioningSettings(
*alterTableRequest.mutable_alter_partitioning_settings(), setting, ctx
)) {
return SyncError();
}
} else if (name == "keyBloomFilter") {
Expand Down Expand Up @@ -1476,6 +1506,49 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
default:
YQL_ENSURE(false, "Unknown index type: " << (ui32)add_index->type_case());
}
} else if (name == "alterIndex") {
if (maybeAlter.Cast().Actions().Size() > 1) {
ctx.AddError(
TIssue(ctx.GetPosition(action.Name().Pos()),
"ALTER INDEX action cannot be combined with any other ALTER TABLE action"
)
);
return SyncError();
}
auto listNode = action.Value().Cast<TCoNameValueTupleList>();
for (const auto& indexSetting : listNode) {
auto settingName = indexSetting.Name().Value();
if (settingName == "indexName") {
auto indexName = indexSetting.Value().Cast<TCoAtom>().StringValue();
auto indexTablePath = NKikimr::NKqp::NSchemeHelpers::CreateIndexTablePath(table.Metadata->Name, indexName);
alterTableRequest.set_path(std::move(indexTablePath));
} else if (settingName == "tableSettings") {
auto tableSettings = indexSetting.Value().Cast<TCoNameValueTupleList>();
for (const auto& tableSetting : tableSettings) {
if (IsPartitioningSetting(tableSetting.Name().Value())) {
if (!ParsePartitioningSettings(
*alterTableRequest.mutable_alter_partitioning_settings(), tableSetting, ctx
)) {
return SyncError();
}
} else {
ctx.AddError(
TIssue(ctx.GetPosition(tableSetting.Name().Pos()),
TStringBuilder() << "Unknown index table setting: " << name
)
);
return SyncError();
}
}
} else {
ctx.AddError(
TIssue(ctx.GetPosition(indexSetting.Name().Pos()),
TStringBuilder() << "Unknown alter index setting: " << settingName
)
);
return SyncError();
}
}
} else if (name == "dropIndex") {
auto nameNode = action.Value().Cast<TCoAtom>();
alterTableRequest.add_drop_indexes(TString(nameNode.Value()));
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1411,7 +1411,8 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over
&& name != "setTableSettings"
&& name != "addChangefeed"
&& name != "dropChangefeed"
&& name != "renameIndexTo")
&& name != "renameIndexTo"
&& name != "alterIndex")
{
ctx.AddError(TIssue(ctx.GetPosition(action.Name().Pos()),
TStringBuilder() << "Unknown alter table action: " << name));
Expand Down
73 changes: 11 additions & 62 deletions ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1605,29 +1605,7 @@ Y_UNIT_TEST_SUITE(KqpMultishardIndex) {

FillTable(session);

kikimr.GetTestServer().GetRuntime()->GetAppData().AdministrationAllowedSIDs.push_back("root@builtin");

{ // without token request is forbidded
Tests::TClient& client = kikimr.GetTestClient();
const TString scheme = R"(
Name: "indexImplTable"
PartitionConfig {
PartitioningPolicy {
MinPartitionsCount: 1
SizeToSplit: 100500
FastSplitSettings {
SizeThreshold: 100500
RowCountThreshold: 100500
}
}
}
)";
auto result = client.AlterTable("/Root/MultiShardIndexed/index", scheme, "user@builtin");
UNIT_ASSERT_VALUES_EQUAL_C(result->Record.GetStatus(), NMsgBusProxy::MSTATUS_ERROR, "User must not be able to alter index impl table");
UNIT_ASSERT_VALUES_EQUAL(result->Record.GetErrorReason(), "Administrative access denied");
}

{ // with root token request is accepted
{ // regular users should be able to alter indexImplTable's PartitionConfig
Tests::TClient& client = kikimr.GetTestClient();
const TString scheme = R"(
Name: "indexImplTable"
Expand All @@ -1642,53 +1620,24 @@ Y_UNIT_TEST_SUITE(KqpMultishardIndex) {
}
}
)";
auto result = client.AlterTable("/Root/MultiShardIndexed/index", scheme, "root@builtin");
UNIT_ASSERT_VALUES_EQUAL_C(result->Record.GetStatus(), NMsgBusProxy::MSTATUS_OK, "Super user must be able to alter partition config");
auto result = client.AlterTable("/Root/MultiShardIndexed/index", scheme, {});
UNIT_ASSERT_VALUES_EQUAL_C(result->Record.GetStatus(), NMsgBusProxy::MSTATUS_OK,
result->Record.ShortDebugString()
);
}

{ // after alter yql works fine
const TString query(R"(
{ // yql works fine after alter
const TString query = R"(
SELECT * FROM `/Root/MultiShardIndexed` VIEW index ORDER BY fk DESC LIMIT 1;
)");
)";

auto result = session.ExecuteDataQuery(
query,
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx())
.ExtractValueSync();
query,
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()
).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(NYdb::FormatResultSetYson(result.GetResultSet(0)), "[[[4294967295u];[4u];[\"v4\"]]]");
}

FillTable(session);

{ // just for sure, public api got error when alter index
auto settings = NYdb::NTable::TAlterTableSettings()
.BeginAlterPartitioningSettings()
.SetPartitionSizeMb(50)
.SetMinPartitionsCount(4)
.SetMaxPartitionsCount(5)
.EndAlterPartitioningSettings();

auto result = session.AlterTable("/Root/MultiShardIndexed/index/indexImplTable", settings).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString());
}

{ // however public api is able to perform alter index if user has AlterSchema right and user is a member of the list AdministrationAllowedSIDs
auto clSettings = NYdb::NTable::TClientSettings().AuthToken("root@builtin").UseQueryCache(false);
auto client = NYdb::NTable::TTableClient(kikimr.GetDriver(), clSettings);
auto session = client.CreateSession().GetValueSync().GetSession();

auto settings = NYdb::NTable::TAlterTableSettings()
.BeginAlterPartitioningSettings()
.SetPartitionSizeMb(50)
.SetMinPartitionsCount(4)
.SetMaxPartitionsCount(5)
.EndAlterPartitioningSettings();

auto result = session.AlterTable("/Root/MultiShardIndexed/index/indexImplTable", settings).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

}

Y_UNIT_TEST_TWIN(DataColumnUpsertMixedSemantic, StreamLookup) {
Expand Down
Loading

0 comments on commit f702cec

Please sign in to comment.