Skip to content

Commit

Permalink
Support default from sequence (#4826)
Browse files Browse the repository at this point in the history
  • Loading branch information
shnikd authored May 27, 2024
1 parent d699d50 commit cb83f09
Show file tree
Hide file tree
Showing 10 changed files with 363 additions and 44 deletions.
2 changes: 1 addition & 1 deletion ydb/core/kqp/common/kqp_resolve.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ struct TTableConstInfo : public TAtomicRefCount<TTableConstInfo> {
Columns.emplace(phyColumn.GetId().GetName(), std::move(column));
if (!phyColumn.GetDefaultFromSequence().empty()) {
TString seq = phyColumn.GetDefaultFromSequence();
if (!seq.StartsWith(Path)) {
if (!seq.StartsWith("/")) {
seq = Path + "/" + seq;
}

Expand Down
34 changes: 26 additions & 8 deletions ydb/core/kqp/provider/yql_kikimr_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1318,16 +1318,34 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
auto columnTuple = item.Cast<TExprList>();
auto columnName = columnTuple.Item(0).Cast<TCoAtom>();
alter_columns->set_name(TString(columnName));

auto families = columnTuple.Item(1).Cast<TCoAtomList>();
if (families.Size() > 1) {
ctx.AddError(TIssue(ctx.GetPosition(families.Pos()),
"Unsupported number of families"));
auto alterColumnList = columnTuple.Item(1).Cast<TExprList>();
auto alterColumnAction = TString(alterColumnList.Item(0).Cast<TCoAtom>());
if (alterColumnAction == "setDefault") {
auto setDefault = alterColumnList.Item(1).Cast<TCoAtomList>();
auto func = TString(setDefault.Item(0).Cast<TCoAtom>());
auto arg = TString(setDefault.Item(1).Cast<TCoAtom>());
if (func != "nextval") {
ctx.AddError(TIssue(ctx.GetPosition(setDefault.Pos()),
TStringBuilder() << "Unsupported function to set default: " << func));
return SyncError();
}
auto fromSequence = alter_columns->mutable_from_sequence();
fromSequence->set_name(arg);
} else if (alterColumnAction == "setFamily") {
auto families = alterColumnList.Item(1).Cast<TCoAtomList>();
if (families.Size() > 1) {
ctx.AddError(TIssue(ctx.GetPosition(families.Pos()),
"Unsupported number of families"));
return SyncError();
}
for (auto family : families) {
alter_columns->set_family(TString(family.Value()));
}
} else {
ctx.AddError(TIssue(ctx.GetPosition(alterColumnList.Pos()),
TStringBuilder() << "Unsupported action to alter column"));
return SyncError();
}
for (auto family : families) {
alter_columns->set_family(TString(family.Value()));
}
}
} else if (name == "addColumnFamilies" || name == "alterColumnFamilies") {
auto listNode = action.Value().Cast<TExprList>();
Expand Down
34 changes: 28 additions & 6 deletions ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1361,12 +1361,34 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over
<< " Column: \"" << name << "\" does not exist"));
return TStatus::Error;
}
auto families = columnTuple.Item(1);
if (families.Cast<TCoAtomList>().Size() > 1) {
ctx.AddError(TIssue(ctx.GetPosition(nameNode.Pos()), TStringBuilder()
<< "AlterTable : " << NCommon::FullTableName(table->Metadata->Cluster, table->Metadata->Name)
<< " Column: \"" << name
<< "\". Several column families for a single column are not yet supported"));
auto alterColumnList = columnTuple.Item(1).Cast<TExprList>();
auto alterColumnAction = TString(alterColumnList.Item(0).Cast<TCoAtom>());
if (alterColumnAction == "setDefault") {
auto setDefault = alterColumnList.Item(1).Cast<TCoAtomList>();
auto func = TString(setDefault.Item(0).Cast<TCoAtom>());
auto arg = TString(setDefault.Item(1).Cast<TCoAtom>());
if (func != "nextval") {
ctx.AddError(TIssue(ctx.GetPosition(nameNode.Pos()),
TStringBuilder() << "Unsupported function to set default: " << func));
return TStatus::Error;
}
if (setDefault.Size() > 2) {
ctx.AddError(TIssue(ctx.GetPosition(nameNode.Pos()),
TStringBuilder() << "Function nextval has exactly one argument"));
return TStatus::Error;
}
} else if (alterColumnAction == "setFamily") {
auto families = alterColumnList.Item(1).Cast<TCoAtomList>();
if (families.Size() > 1) {
ctx.AddError(TIssue(ctx.GetPosition(nameNode.Pos()), TStringBuilder()
<< "AlterTable : " << NCommon::FullTableName(table->Metadata->Cluster, table->Metadata->Name)
<< " Column: \"" << name
<< "\". Several column families for a single column are not yet supported"));
return TStatus::Error;
}
} else {
ctx.AddError(TIssue(ctx.GetPosition(nameNode.Pos()),
TStringBuilder() << "Unsupported action to alter column"));
return TStatus::Error;
}
}
Expand Down
174 changes: 174 additions & 0 deletions ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2548,6 +2548,180 @@ Y_UNIT_TEST_SUITE(KqpPg) {
}
}

Y_UNIT_TEST(AlterColumnSetDefaultFromSequence) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);;
auto setting = NKikimrKqp::TKqpSetting();
auto serverSettings = TKikimrSettings()
.SetAppConfig(appConfig)
.SetKqpSettings({setting});
TKikimrRunner kikimr(
serverSettings.SetWithSampleTables(false));
auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr.GetEndpoint());
auto client = kikimr.GetQueryClient();

auto session = client.GetSession().GetValueSync().GetSession();

auto tableClient = kikimr.GetTableClient();
auto tableClientSession = tableClient.CreateSession().GetValueSync().GetSession();

{
auto result = session.ExecuteQuery(R"(
--!syntax_pg
CREATE TABLE Pg (
key int4 PRIMARY KEY,
value int8
);
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

{
const auto query = Q_(R"(
--!syntax_pg
INSERT INTO Pg (key, value) values (1, 1);
)");

auto result = tableClientSession.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

{
const auto query = Q_(R"(
--!syntax_pg
SELECT * FROM Pg;
)");

auto result = tableClientSession.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty");
CompareYson(R"(
[["1";"1"]]
)", FormatResultSetYson(result.GetResultSet(0)));
}

{
auto result = session.ExecuteQuery(R"(
--!syntax_pg
ALTER TABLE Pg ALTER COLUMN value SET DEFAULT nextval('seq');
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT(!result.IsSuccess());
}

{
const auto queryCreate = R"(
--!syntax_pg
CREATE SEQUENCE IF NOT EXISTS seq1
START WITH 10
INCREMENT BY 2
MINVALUE 1
CACHE 3
CYCLE;
)";

auto resultCreate = session.ExecuteQuery(queryCreate, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_C(resultCreate.IsSuccess(), resultCreate.GetIssues().ToString());
}

{
auto result = session.ExecuteQuery(R"(
--!syntax_pg
ALTER TABLE Pg ALTER COLUMN value SET DEFAULT nextval('seq1');
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

{
auto runtime = kikimr.GetTestServer().GetRuntime();
TActorId sender = runtime->AllocateEdgeActor();
auto describeResult = DescribeTable(&kikimr.GetTestServer(), sender, "/Root/Pg");
UNIT_ASSERT_VALUES_EQUAL(describeResult.GetStatus(), NKikimrScheme::StatusSuccess);
const auto& tableDescription = describeResult.GetPathDescription().GetTable();

for (const auto& column: tableDescription.GetColumns()) {
if (column.GetName() == "value") {
UNIT_ASSERT(column.HasDefaultFromSequence());
UNIT_ASSERT(column.GetDefaultFromSequence() == "/Root/seq1");
break;
}
}
}

{
const auto query = Q_(R"(
--!syntax_pg
INSERT INTO Pg (key) values (2), (3);
)");

auto result = tableClientSession.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

{
const auto query = Q_(R"(
--!syntax_pg
SELECT * FROM Pg;
)");

auto result = tableClientSession.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty");
CompareYson(R"(
[["1";"1"];["2";"10"];["3";"12"]]
)", FormatResultSetYson(result.GetResultSet(0)));
}

{
const auto queryCreate = R"(
--!syntax_pg
CREATE SEQUENCE IF NOT EXISTS seq2
START WITH 5
INCREMENT BY 3
MINVALUE 1
CACHE 3
CYCLE;
)";

auto resultCreate = session.ExecuteQuery(queryCreate, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_C(resultCreate.IsSuccess(), resultCreate.GetIssues().ToString());
}

{
auto result = session.ExecuteQuery(R"(
--!syntax_pg
ALTER TABLE Pg ALTER COLUMN value SET DEFAULT nextval('seq2');
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

{
const auto query = Q_(R"(
--!syntax_pg
INSERT INTO Pg (key) values (4), (5);
)");

auto result = tableClientSession.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

{
const auto query = Q_(R"(
--!syntax_pg
SELECT * FROM Pg;
)");

auto result = tableClientSession.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty");
CompareYson(R"(
[["1";"1"];["2";"10"];["3";"12"];["4";"5"];["5";"8"]]
)", FormatResultSetYson(result.GetResultSet(0)));
}
}

Y_UNIT_TEST(TempTablesSessionsIsolation) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
Expand Down
47 changes: 21 additions & 26 deletions ydb/core/tx/schemeshard/schemeshard__operation_alter_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -535,36 +535,31 @@ class TAlterTable: public TSubOperation {

THashSet<TString> localSequences;

std::optional<TString> defaultFromSequence;
for (const auto& column: alter.GetColumns()) {
if (column.HasDefaultFromSequence()) {
defaultFromSequence = column.GetDefaultFromSequence();
}
}

if (defaultFromSequence.has_value()) {
Y_ABORT_UNLESS(alter.GetColumns().size() == 1);

const auto sequencePath = TPath::Resolve(*defaultFromSequence, context.SS);
{
const auto checks = sequencePath.Check();
checks
.NotEmpty()
.NotUnderDomainUpgrade()
.IsAtLocalSchemeShard()
.IsResolved()
.NotDeleted()
.IsSequence()
.NotUnderDeleting()
.NotUnderOperation();

if (!checks) {
result->SetError(checks.GetStatus(), checks.GetError());
return result;
TString defaultFromSequence = column.GetDefaultFromSequence();

const auto sequencePath = TPath::Resolve(defaultFromSequence, context.SS);
{
const auto checks = sequencePath.Check();
checks
.NotEmpty()
.NotUnderDomainUpgrade()
.IsAtLocalSchemeShard()
.IsResolved()
.NotDeleted()
.IsSequence()
.NotUnderDeleting()
.NotUnderOperation();

if (!checks) {
result->SetError(checks.GetStatus(), checks.GetError());
return result;
}
}
}

localSequences.insert(sequencePath.PathString());
localSequences.insert(sequencePath.PathString());
}
}

TString errStr;
Expand Down
8 changes: 6 additions & 2 deletions ydb/core/tx/schemeshard/schemeshard_info_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,9 +290,13 @@ TTableInfo::TAlterDataPtr TTableInfo::CreateAlterData(
const TTableInfo::TColumn& sourceColumn = source->Columns[colId];

if (col.HasDefaultFromSequence()) {
if (sourceColumn.PType.GetTypeId() != NScheme::NTypeIds::Int64) {
if (sourceColumn.PType.GetTypeId() != NScheme::NTypeIds::Int64
&& NPg::PgTypeIdFromTypeDesc(sourceColumn.PType.GetTypeDesc()) != INT8OID) {
TString sequenceType = sourceColumn.PType.GetTypeId() == NScheme::NTypeIds::Pg
? NPg::PgTypeNameFromTypeDesc(NPg::TypeDescFromPgTypeId(INT8OID))
: NScheme::TypeName(NScheme::NTypeIds::Int64);
errStr = Sprintf(
"Sequence value type '%s' must be equal to the column type '%s'", "Int64",
"Sequence value type '%s' must be equal to the column type '%s'", sequenceType.c_str(),
NScheme::TypeName(sourceColumn.PType, sourceColumn.PTypeMod).c_str());
return nullptr;
}
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/ydb_convert/table_description.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,17 @@ bool BuildAlterTableModifyScheme(const Ydb::Table::AlterTableRequest* req, NKiki
if (!alter.family().empty()) {
column->SetFamilyName(alter.family());
}
switch (alter.default_value_case()) {
case Ydb::Table::ColumnMeta::kFromSequence: {
auto fromSequence = column->MutableDefaultFromSequence();
TString sequenceName = alter.from_sequence().name();
if (!IsStartWithSlash(sequenceName)) {
*fromSequence = JoinPath({workingDir, sequenceName});
}
break;
}
default: break;
}
}

bool hadPartitionConfig = desc->HasPartitionConfig();
Expand Down
Loading

0 comments on commit cb83f09

Please sign in to comment.