Skip to content

Commit

Permalink
Support CopySequence in schemeshard (#2693)
Browse files Browse the repository at this point in the history
  • Loading branch information
shnikd authored Mar 22, 2024
1 parent 431b71b commit 4cd5e13
Show file tree
Hide file tree
Showing 32 changed files with 1,325 additions and 21 deletions.
130 changes: 130 additions & 0 deletions ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1698,6 +1698,136 @@ Y_UNIT_TEST_SUITE(KqpPg) {
}
}

Y_UNIT_TEST(CopyTableSerialColumns) {
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false).SetEnableNotNullDataColumns(true));
auto client = kikimr.GetTableClient();
auto session = client.CreateSession().GetValueSync().GetSession();
{
const auto query = Q_(R"(
--!syntax_pg
CREATE TABLE PgSerial (
key serial PRIMARY KEY,
value int2
))");

auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

{
const auto query = Q_(R"(
--!syntax_pg
INSERT INTO PgSerial (value) values (1);
)");

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

{
const auto result = session.CopyTable("/Root/PgSerial", "/Root/copy").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

auto desc = session.DescribeTable("/Root/copy").ExtractValueSync();
UNIT_ASSERT_C(desc.IsSuccess(), desc.GetIssues().ToString());
}

{
const auto query = Q_(R"(
--!syntax_pg
SELECT * FROM copy;
)");

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty");
CompareYson(R"(
[["1";"1"]]
)", FormatResultSetYson(result.GetResultSet(0)));
}

{
const auto query = Q_(R"(
--!syntax_pg
INSERT INTO copy (value) values (1);
)");

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

{
const auto query = Q_(R"(
--!syntax_pg
SELECT * FROM copy;
)");

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty");
CompareYson(R"(
[["1";"1"];["2";"1"]]
)", FormatResultSetYson(result.GetResultSet(0)));
}

{
const auto query = Q_(R"(
--!syntax_pg
SELECT * FROM PgSerial;
)");

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty");
CompareYson(R"(
[["1";"1"]]
)", FormatResultSetYson(result.GetResultSet(0)));
}

{
const auto query = Q_(R"(
--!syntax_pg
INSERT INTO PgSerial (value) values (1);
)");

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

{
const auto query = Q_(R"(
--!syntax_pg
SELECT * FROM copy;
)");

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty");
CompareYson(R"(
[["1";"1"];["2";"1"]]
)", FormatResultSetYson(result.GetResultSet(0)));
}

{
const auto query = Q_(R"(
--!syntax_pg
SELECT * FROM PgSerial;
)");

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty");
CompareYson(R"(
[["1";"1"];["2";"1"]]
)", FormatResultSetYson(result.GetResultSet(0)));
}
}

Y_UNIT_TEST(CreateIndex) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);;
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/protos/counters_schemeshard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ enum ESimpleCounters {
COUNTER_IN_FLIGHT_OPS_TxDropView = 157 [(CounterOpts) = {Name: "InFlightOps/DropView"}];

COUNTER_GRAPHSHARD_COUNT = 158 [(CounterOpts) = {Name: "GraphShards"}];

COUNTER_IN_FLIGHT_OPS_TxCopySequence = 159 [(CounterOpts) = {Name: "InFlightOps/CopySequence"}];
}

enum ECumulativeCounters {
Expand Down Expand Up @@ -315,6 +317,7 @@ enum ECumulativeCounters {
COUNTER_FINISHED_OPS_TxCreateView = 95 [(CounterOpts) = {Name: "FinishedOps/CreateView"}];
COUNTER_FINISHED_OPS_TxDropView = 96 [(CounterOpts) = {Name: "FinishedOps/DropView"}];
COUNTER_FINISHED_OPS_TxAlterView = 97 [(CounterOpts) = {Name: "FinishedOps/AlterView"}];
COUNTER_FINISHED_OPS_TxCopySequence = 98 [(CounterOpts) = {Name: "FinishedOps/TxCopySequence"}];
}

enum EPercentileCounters {
Expand Down Expand Up @@ -536,4 +539,6 @@ enum ETxTypes {
TXTYPE_PERSQUEUE_PROPOSE_RESULT = 83 [(TxTypeOpts) = {Name: "TxPersQueueProposeResult"}];
TXTYPE_PERSQUEUE_PROPOSE_ATTACH_RESULT = 84 [(TxTypeOpts) = {Name: "TxProposeTransactionAttachResult"}];
TXTYPE_UPDATE_DOMAIN_REPLY = 85 [(TxTypeOpts) = {Name: "TxUpdateDomainReply"}];

TXTYPE_SEQUENCESHARD_GET_SEQUENCE_RESULT = 86 [(TxTypeOpts) = {Name: "TxSequenceShardGetSequenceResult"}];
}
1 change: 1 addition & 0 deletions ydb/core/protos/counters_sequenceshard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ enum ETxTypes {
TXTYPE_FREEZE_SEQUENCE = 7 [(TxTypeOpts) = {Name: "TxFreezeSequence"}];
TXTYPE_RESTORE_SEQUENCE = 8 [(TxTypeOpts) = {Name: "TxRestoreSequence"}];
TXTYPE_REDIRECT_SEQUENCE = 9 [(TxTypeOpts) = {Name: "TxRedirectSequence"}];
TXTYPE_GET_SEQUENCE = 10 [(TxTypeOpts) = {Name: "TxGetSequence"}];
}
6 changes: 6 additions & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1590,6 +1590,12 @@ message TModifyScheme {
optional TViewDescription CreateView = 64;

optional NActorsProto.TActorId TempTableOwnerActorId = 65;

optional TCopySequence CopySequence = 66;
}

message TCopySequence {
optional string CopyFrom = 1;
}

// "Script", used by client to parse text files with multiple DDL commands
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/protos/out/out_sequenceshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,7 @@ Y_DECLARE_OUT_SPEC(, NKikimrTxSequenceShard::TEvRestoreSequenceResult::EStatus,
Y_DECLARE_OUT_SPEC(, NKikimrTxSequenceShard::TEvRedirectSequenceResult::EStatus, stream, value) {
stream << NKikimrTxSequenceShard::TEvRedirectSequenceResult::EStatus_Name(value);
}

Y_DECLARE_OUT_SPEC(, NKikimrTxSequenceShard::TEvGetSequenceResult::EStatus, stream, value) {
stream << NKikimrTxSequenceShard::TEvGetSequenceResult::EStatus_Name(value);
}
31 changes: 31 additions & 0 deletions ydb/core/protos/tx_sequenceshard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ message TEvCreateSequence {
oneof OptionalCycle {
bool Cycle = 9;
}
bool Frozen = 10; // defaults to false
}

message TEvCreateSequenceResult {
Expand Down Expand Up @@ -224,3 +225,33 @@ message TEvRedirectSequenceResult {
uint64 TxId = 3;
uint64 TxPartId = 4;
}

message TEvGetSequence {
NKikimrProto.TPathID PathId = 1;
uint64 TxId = 2;
uint64 TxPartId = 3;
}

message TEvGetSequenceResult {
enum EStatus {
UNKNOWN = 0;
SUCCESS = 1;
PIPE_OUTDATED = 2;
SEQUENCE_NOT_FOUND = 3;
SEQUENCE_MOVED = 4;
}

EStatus Status = 1;
uint64 Origin = 2;
uint64 TxId = 3;
uint64 TxPartId = 4;
uint64 MovedTo = 5; // moved to a different sequence shard
sint64 MinValue = 6;
sint64 MaxValue = 7;
sint64 StartValue = 8;
sint64 NextValue = 9;
bool NextUsed = 10;
uint64 Cache = 11;
sint64 Increment = 12;
bool Cycle = 13;
}
2 changes: 2 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard__operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,8 @@ ISubOperation::TPtr TOperation::RestorePart(TTxState::ETxType txType, TTxState::
Y_ABORT("TODO: implement");
case TTxState::ETxType::TxDropSequence:
return CreateDropSequence(NextPartId(), txState);
case TTxState::ETxType::TxCopySequence:
return CreateCopySequence(NextPartId(), txState);

case TTxState::ETxType::TxFillIndex:
Y_ABORT("deprecated");
Expand Down
8 changes: 5 additions & 3 deletions ydb/core/tx/schemeshard/schemeshard__operation_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ void UpdatePartitioningForCopyTable(TOperationId operationId, TTxState& txState,
class TProposedWaitParts: public TSubOperationState {
private:
TOperationId OperationId;
const TTxState::ETxState NextState;

TString DebugHint() const override {
return TStringBuilder()
Expand All @@ -51,8 +52,9 @@ class TProposedWaitParts: public TSubOperationState {
}

public:
TProposedWaitParts(TOperationId id)
TProposedWaitParts(TOperationId id, TTxState::ETxState nextState = TTxState::Done)
: OperationId(id)
, NextState(nextState)
{
IgnoreMessages(DebugHint(),
{ TEvHive::TEvCreateTabletReply::EventType
Expand Down Expand Up @@ -124,7 +126,7 @@ class TProposedWaitParts: public TSubOperationState {
// Got notifications from all datashards?
if (txState->ShardsInProgress.empty()) {
NTableState::AckAllSchemaChanges(OperationId, *txState, context);
context.SS->ChangeTxState(db, OperationId, TTxState::Done);
context.SS->ChangeTxState(db, OperationId, NextState);
return true;
}

Expand Down Expand Up @@ -968,7 +970,7 @@ class TPropose: public TSubOperationState {
TTxState* txState = context.SS->FindTx(OperationId);
Y_ABORT_UNLESS(txState);
Y_ABORT_UNLESS(txState->TxType == TTxState::TxCreatePQGroup || txState->TxType == TTxState::TxAlterPQGroup || txState->TxType == TTxState::TxAllocatePQ);

TPathId pathId = txState->TargetPathId;
TPathElement::TPtr path = context.SS->PathsById.at(pathId);

Expand Down
Loading

0 comments on commit 4cd5e13

Please sign in to comment.