From 2e94f582c0a6febafa580b76e9c14df8cdbe23ff Mon Sep 17 00:00:00 2001 From: Nikolay Shumkov Date: Thu, 29 Feb 2024 13:50:45 +0300 Subject: [PATCH 01/11] Fix copy table --- ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp index 25bc09da6648..366539ff6530 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp @@ -280,6 +280,10 @@ class TCopyTable: public TSubOperation { return AppData()->AllowShadowDataInSchemeShardForTests; } + void SetLocalSequences(const THashSet& localSequences) { + LocalSequences = localSequences; + } + THolder Propose(const TString& owner, TOperationContext& context) override { const TTabletId ssId = context.SS->SelfTabletId(); From 8d81570006b08aeb1b43664a0438b577fe29d27f Mon Sep 17 00:00:00 2001 From: Nikolay Shumkov Date: Fri, 1 Mar 2024 15:38:50 +0300 Subject: [PATCH 02/11] Fixes --- ydb/core/ydb_convert/table_description.h | 4 ++++ ydb/public/api/protos/ydb_table.proto | 4 +++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/ydb/core/ydb_convert/table_description.h b/ydb/core/ydb_convert/table_description.h index 28f45b421993..6b6cc7c0079b 100644 --- a/ydb/core/ydb_convert/table_description.h +++ b/ydb/core/ydb_convert/table_description.h @@ -132,4 +132,8 @@ bool FillTableDescription(NKikimrSchemeOp::TModifyScheme& out, void FillSequenceDescription(Ydb::Table::CreateTableRequest& out, const NKikimrSchemeOp::TTableDescription& in); +// out +void FillSequenceDescription(Ydb::Table::CreateTableRequest& out, + const NKikimrSchemeOp::TTableDescription& in); + } // namespace NKikimr diff --git a/ydb/public/api/protos/ydb_table.proto b/ydb/public/api/protos/ydb_table.proto index 295b67c12777..b3db22626ac2 100644 --- a/ydb/public/api/protos/ydb_table.proto +++ b/ydb/public/api/protos/ydb_table.proto @@ -355,7 +355,7 @@ message ColumnMeta { // Column default value option oneof default_value { TypedValue from_literal = 5; - SequenceDescription from_sequence = 6; + string from_sequence = 6; // path to sequence } } @@ -552,6 +552,8 @@ message CreateTableRequest { bool temporary = 19; // Is table column or row oriented StoreType store_type = 20; + // List of sequence descriptions + repeated SequenceDescription sequence_descriptions = 21; } message CreateTableResponse { From 45c05e08753fb48ea4a26934a621a1ca7f3ce4c5 Mon Sep 17 00:00:00 2001 From: Nikolay Shumkov Date: Mon, 4 Mar 2024 12:39:54 +0300 Subject: [PATCH 03/11] Fixes --- .../tx/schemeshard/ut_export/ut_export.cpp | 48 ++++++++----------- .../tx/schemeshard/ut_restore/ut_restore.cpp | 1 + ydb/public/api/protos/ydb_table.proto | 4 +- 3 files changed, 22 insertions(+), 31 deletions(-) diff --git a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp index 0408d35f5141..e12e62b3dae0 100644 --- a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp +++ b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp @@ -303,20 +303,7 @@ Y_UNIT_TEST_SUITE(TExportToS3Tests) { Columns { Name: "value" Type: "Utf8" - DefaultFromLiteral { - type { - optional_type { - item { - type_id: UTF8 - } - } - } - value { - items { - text_value: "a" - } - } - } + DefaultFromSequence: "myseq" } KeyColumnNames: ["key"] PartitionConfig { @@ -338,6 +325,15 @@ Y_UNIT_TEST_SUITE(TExportToS3Tests) { } } } + Sequences { + Name: "myseq" + MinValue: 1 + MaxValue: 20 + StartValue: 1 + Cache: 1 + Increment: 1 + Cycle: false + } )"}; Run(runtime, env, tables, Sprintf(R"( @@ -391,20 +387,7 @@ columns { } } not_null: false - from_literal { - type { - optional_type { - item { - type_id: UTF8 - } - } - } - value { - items { - text_value: "a" - } - } - } + from_sequence: "myseq" } primary_key: "key" storage_settings { @@ -419,6 +402,15 @@ partitioning_settings { partitioning_by_load: DISABLED min_partitions_count: 1 } +sequence_descriptions { + name: "myseq" + min_value: 1 + max_value: 3 + start_value: 1 + cache: 1 + increment: 1 + cycle: true +} )"); } diff --git a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp index 8028f2c0bad3..b906c5b0dbb1 100644 --- a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp +++ b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp @@ -920,6 +920,7 @@ value { Name: "myseq" } )"); + env.TestWaitNotification(runtime, txId); TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"( ExportToS3Settings { diff --git a/ydb/public/api/protos/ydb_table.proto b/ydb/public/api/protos/ydb_table.proto index b3db22626ac2..295b67c12777 100644 --- a/ydb/public/api/protos/ydb_table.proto +++ b/ydb/public/api/protos/ydb_table.proto @@ -355,7 +355,7 @@ message ColumnMeta { // Column default value option oneof default_value { TypedValue from_literal = 5; - string from_sequence = 6; // path to sequence + SequenceDescription from_sequence = 6; } } @@ -552,8 +552,6 @@ message CreateTableRequest { bool temporary = 19; // Is table column or row oriented StoreType store_type = 20; - // List of sequence descriptions - repeated SequenceDescription sequence_descriptions = 21; } message CreateTableResponse { From 1d0dc64aa1b8c427707b748055b311822394914b Mon Sep 17 00:00:00 2001 From: Nikolay Shumkov Date: Mon, 4 Mar 2024 14:08:12 +0300 Subject: [PATCH 04/11] Fixes --- ydb/core/ydb_convert/table_description.h | 1 + 1 file changed, 1 insertion(+) diff --git a/ydb/core/ydb_convert/table_description.h b/ydb/core/ydb_convert/table_description.h index 6b6cc7c0079b..1c1a3b761f0a 100644 --- a/ydb/core/ydb_convert/table_description.h +++ b/ydb/core/ydb_convert/table_description.h @@ -128,6 +128,7 @@ bool FillTableDescription(NKikimrSchemeOp::TModifyScheme& out, const Ydb::Table::CreateTableRequest& in, const TTableProfiles& profiles, Ydb::StatusIds::StatusCode& status, TString& error, bool indexedTable = false); + // out void FillSequenceDescription(Ydb::Table::CreateTableRequest& out, const NKikimrSchemeOp::TTableDescription& in); From 189064cf9539f5674a34e39f00a80477f37504cc Mon Sep 17 00:00:00 2001 From: Nikolay Shumkov Date: Mon, 4 Mar 2024 15:47:10 +0300 Subject: [PATCH 05/11] Fixes --- .../tx/schemeshard/ut_export/ut_export.cpp | 48 +++++++++++-------- 1 file changed, 28 insertions(+), 20 deletions(-) diff --git a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp index e12e62b3dae0..0408d35f5141 100644 --- a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp +++ b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp @@ -303,7 +303,20 @@ Y_UNIT_TEST_SUITE(TExportToS3Tests) { Columns { Name: "value" Type: "Utf8" - DefaultFromSequence: "myseq" + DefaultFromLiteral { + type { + optional_type { + item { + type_id: UTF8 + } + } + } + value { + items { + text_value: "a" + } + } + } } KeyColumnNames: ["key"] PartitionConfig { @@ -325,15 +338,6 @@ Y_UNIT_TEST_SUITE(TExportToS3Tests) { } } } - Sequences { - Name: "myseq" - MinValue: 1 - MaxValue: 20 - StartValue: 1 - Cache: 1 - Increment: 1 - Cycle: false - } )"}; Run(runtime, env, tables, Sprintf(R"( @@ -387,7 +391,20 @@ columns { } } not_null: false - from_sequence: "myseq" + from_literal { + type { + optional_type { + item { + type_id: UTF8 + } + } + } + value { + items { + text_value: "a" + } + } + } } primary_key: "key" storage_settings { @@ -402,15 +419,6 @@ partitioning_settings { partitioning_by_load: DISABLED min_partitions_count: 1 } -sequence_descriptions { - name: "myseq" - min_value: 1 - max_value: 3 - start_value: 1 - cache: 1 - increment: 1 - cycle: true -} )"); } From 1b4bab9eed2c923ff0ab5daedcb2e2d4b8ae20d1 Mon Sep 17 00:00:00 2001 From: Nikolay Shumkov Date: Tue, 5 Mar 2024 13:06:09 +0300 Subject: [PATCH 06/11] Fixes --- ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp index b906c5b0dbb1..8028f2c0bad3 100644 --- a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp +++ b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp @@ -920,7 +920,6 @@ value { Name: "myseq" } )"); - env.TestWaitNotification(runtime, txId); TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"( ExportToS3Settings { From 996d85e8d057683b73341ca49c6887661e049a55 Mon Sep 17 00:00:00 2001 From: Nikolay Shumkov Date: Wed, 13 Mar 2024 13:19:29 +0300 Subject: [PATCH 07/11] Initial commit --- ydb/core/protos/counters_schemeshard.proto | 3 +++ ydb/core/protos/flat_scheme_op.proto | 1 + ydb/core/tx/schemeshard/schemeshard__operation.cpp | 2 ++ ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp | 3 ++- ydb/core/tx/schemeshard/schemeshard__operation_part.h | 2 ++ ydb/core/tx/schemeshard/schemeshard_impl.cpp | 1 + ydb/core/tx/schemeshard/schemeshard_tx_infly.h | 4 ++++ ydb/core/tx/schemeshard/ut_sequence/ut_sequence.cpp | 4 ++++ ydb/core/tx/schemeshard/ut_sequence/ya.make | 1 + ydb/core/tx/schemeshard/ya.make | 1 + 10 files changed, 21 insertions(+), 1 deletion(-) diff --git a/ydb/core/protos/counters_schemeshard.proto b/ydb/core/protos/counters_schemeshard.proto index ca2a07a1d508..70319340c6ac 100644 --- a/ydb/core/protos/counters_schemeshard.proto +++ b/ydb/core/protos/counters_schemeshard.proto @@ -195,6 +195,8 @@ enum ESimpleCounters { COUNTER_IN_FLIGHT_OPS_TxDropView = 157 [(CounterOpts) = {Name: "InFlightOps/DropView"}]; COUNTER_GRAPHSHARD_COUNT = 158 [(CounterOpts) = {Name: "GraphShards"}]; + + COUNTER_IN_FLIGHT_OPS_TxCopySequence = 159 [(CounterOpts) = {Name: "InFlightOps/CopySequence"}]; } enum ECumulativeCounters { @@ -315,6 +317,7 @@ enum ECumulativeCounters { COUNTER_FINISHED_OPS_TxCreateView = 95 [(CounterOpts) = {Name: "FinishedOps/CreateView"}]; COUNTER_FINISHED_OPS_TxDropView = 96 [(CounterOpts) = {Name: "FinishedOps/DropView"}]; COUNTER_FINISHED_OPS_TxAlterView = 97 [(CounterOpts) = {Name: "FinishedOps/AlterView"}]; + COUNTER_FINISHED_OPS_TxCopySequence = 98 [(CounterOpts) = {Name: "FinishedOps/TxCopySequence"}]; } enum EPercentileCounters { diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index ee3f4460437a..a244fd449888 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -1295,6 +1295,7 @@ message TSequenceDescription { optional uint64 Cache = 8; // number of items to cache, defaults to 1 optional sint64 Increment = 9; // increment at each call, defaults to 1 optional bool Cycle = 10; // true when cycle on overflow is allowed + optional string CopyFromSequence = 11; // for copy sequence } message TSequenceSharding { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation.cpp b/ydb/core/tx/schemeshard/schemeshard__operation.cpp index 676771621098..4af3fd8a6f47 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation.cpp @@ -1033,6 +1033,8 @@ ISubOperation::TPtr TOperation::RestorePart(TTxState::ETxType txType, TTxState:: Y_ABORT("TODO: implement"); case TTxState::ETxType::TxDropSequence: return CreateDropSequence(NextPartId(), txState); + case TTxState::ETxType::TxCopySequence: + return CreateCopySequence(NextPartId(), txState); case TTxState::ETxType::TxFillIndex: Y_ABORT("deprecated"); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp index 366539ff6530..cb4b6c1e1a69 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp @@ -779,9 +779,10 @@ TVector CreateCopyTable(TOperationId nextId, const TTxTrans NKikimrSchemeOp::EOperationType::ESchemeOpCreateSequence); scheme.SetFailOnExist(tx.GetFailOnExist()); + sequenceDescription.SetCopyFromSequence(copying.GetCopyFromTable() + "/" + sequenceDescription.GetName()); *scheme.MutableSequence() = std::move(sequenceDescription); - result.push_back(CreateNewSequence(NextPartId(nextId, result), scheme)); + result.push_back(CreateCopySequence(NextPartId(nextId, result), scheme)); } return result; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_part.h b/ydb/core/tx/schemeshard/schemeshard__operation_part.h index 76baccd4dabf..4b420825810b 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_part.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_part.h @@ -584,6 +584,8 @@ ISubOperation::TPtr CreateNewSequence(TOperationId id, const TTxTransaction& tx) ISubOperation::TPtr CreateNewSequence(TOperationId id, TTxState::ETxState state); ISubOperation::TPtr CreateDropSequence(TOperationId id, const TTxTransaction& tx); ISubOperation::TPtr CreateDropSequence(TOperationId id, TTxState::ETxState state); +ISubOperation::TPtr CreateCopySequence(TOperationId id, const TTxTransaction& tx); +ISubOperation::TPtr CreateCopySequence(TOperationId id, TTxState::ETxState state); ISubOperation::TPtr CreateNewReplication(TOperationId id, const TTxTransaction& tx); ISubOperation::TPtr CreateNewReplication(TOperationId id, TTxState::ETxState state); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 4b2aa7eea12d..c4fa7c7a9635 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -1433,6 +1433,7 @@ TPathElement::EPathState TSchemeShard::CalcPathState(TTxState::ETxType txType, T case TTxState::TxCreateColumnTable: case TTxState::TxCreateCdcStream: case TTxState::TxCreateSequence: + case TTxState::TxCopySequence: case TTxState::TxCreateReplication: case TTxState::TxCreateBlobDepot: case TTxState::TxCreateExternalTable: diff --git a/ydb/core/tx/schemeshard/schemeshard_tx_infly.h b/ydb/core/tx/schemeshard/schemeshard_tx_infly.h index 849f91a9dee0..dc89583282d8 100644 --- a/ydb/core/tx/schemeshard/schemeshard_tx_infly.h +++ b/ydb/core/tx/schemeshard/schemeshard_tx_infly.h @@ -129,6 +129,7 @@ struct TTxState { item(TxCreateView, 83) \ item(TxAlterView, 84) \ item(TxDropView, 85) \ + item(TxCopySequence, 86) \ // TX_STATE_TYPE_ENUM @@ -342,6 +343,7 @@ struct TTxState { case TxCreateExternalTable: case TxCreateExternalDataSource: case TxCreateView: + case TxCopySequence: return true; case TxInitializeBuildIndex: //this is more like alter case TxCreateCdcStreamAtTable: @@ -472,6 +474,7 @@ struct TTxState { case TxCreateExternalTable: case TxCreateExternalDataSource: case TxCreateView: + case TxCopySequence: return false; case TxAlterPQGroup: case TxAlterTable: @@ -559,6 +562,7 @@ struct TTxState { case TxCreateCdcStreamAtTable: case TxCreateCdcStreamAtTableWithInitialScan: case TxCreateSequence: + case TxCopySequence: case TxCreateReplication: case TxCreateBlobDepot: case TxInitializeBuildIndex: diff --git a/ydb/core/tx/schemeshard/ut_sequence/ut_sequence.cpp b/ydb/core/tx/schemeshard/ut_sequence/ut_sequence.cpp index 9ecada64c2eb..0c6a292132b0 100644 --- a/ydb/core/tx/schemeshard/ut_sequence/ut_sequence.cpp +++ b/ydb/core/tx/schemeshard/ut_sequence/ut_sequence.cpp @@ -1,3 +1,4 @@ +#include #include using namespace NKikimr::NSchemeShard; @@ -362,6 +363,9 @@ Y_UNIT_TEST_SUITE(TSequence) { env.TestWaitNotification(runtime, txId); TestCopyTable(runtime, ++txId, "/MyRoot", "copy", "/MyRoot/Table"); + + TestLs(runtime, "/MyRoot/copy/myseq", TDescribeOptionsBuilder().SetShowPrivateTable(true), NLs::PathExist); + env.TestWaitNotification(runtime, txId); } diff --git a/ydb/core/tx/schemeshard/ut_sequence/ya.make b/ydb/core/tx/schemeshard/ut_sequence/ya.make index d6ba601923dd..d48d9e399ec9 100644 --- a/ydb/core/tx/schemeshard/ut_sequence/ya.make +++ b/ydb/core/tx/schemeshard/ut_sequence/ya.make @@ -20,6 +20,7 @@ PEERDIR( ydb/core/testlib/default ydb/core/tx ydb/core/tx/columnshard + ydb/core/tx/datashard ydb/core/tx/schemeshard/ut_helpers ydb/library/yql/public/udf/service/exception_policy ) diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make index 7dac75c00b8c..a8c455c13c2d 100644 --- a/ydb/core/tx/schemeshard/ya.make +++ b/ydb/core/tx/schemeshard/ya.make @@ -102,6 +102,7 @@ SRCS( schemeshard__operation_common_external_table.cpp schemeshard__operation_common_subdomain.h schemeshard__operation_consistent_copy_tables.cpp + schemeshard__operation_copy_sequence.cpp schemeshard__operation_copy_table.cpp schemeshard__operation_create_backup.cpp schemeshard__operation_create_bsv.cpp From c8c8682f0b9457bbb6ac77b62598532d2efe3c99 Mon Sep 17 00:00:00 2001 From: Nikolay Shumkov Date: Wed, 13 Mar 2024 13:19:57 +0300 Subject: [PATCH 08/11] Add copy --- .../schemeshard__operation_copy_sequence.cpp | 626 ++++++++++++++++++ 1 file changed, 626 insertions(+) create mode 100644 ydb/core/tx/schemeshard/schemeshard__operation_copy_sequence.cpp diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_copy_sequence.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_copy_sequence.cpp new file mode 100644 index 000000000000..bde05ea47437 --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard__operation_copy_sequence.cpp @@ -0,0 +1,626 @@ +#include "schemeshard__operation_part.h" +#include "schemeshard__operation_common.h" +#include "schemeshard_impl.h" + +#include +#include +#include + + +namespace { + +using namespace NKikimr; +using namespace NSchemeShard; + +class TConfigureParts : public TSubOperationState { +private: + TOperationId OperationId; + ui32 RestoredSequences = 0; + + TString DebugHint() const override { + return TStringBuilder() + << "TCopySequence TConfigureParts" + << " operationId#" << OperationId; + } + +public: + TConfigureParts(TOperationId id) + : OperationId(id) + { + IgnoreMessages(DebugHint(), { + TEvHive::TEvCreateTabletReply::EventType, + }); + } + + bool HandleReply(NSequenceShard::TEvSequenceShard::TEvRestoreSequenceResult::TPtr& ev, TOperationContext& context) override { + auto ssId = context.SS->SelfTabletId(); + auto tabletId = TTabletId(ev->Get()->Record.GetOrigin()); + auto status = ev->Get()->Record.GetStatus(); + + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence TConfigureParts HandleReply TEvRestoreSequenceResult" + << " shardId# " << tabletId + << " status# " << status + << " operationId# " << OperationId + << " at tablet " << ssId); + + switch (status) { + case NKikimrTxSequenceShard::TEvRestoreSequenceResult::SUCCESS: break; + default: + // Treat all other replies as unexpected and spurious + LOG_WARN_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence TConfigureParts HandleReply ignoring unexpected TEvRestoreSequenceResult" + << " shardId# " << tabletId + << " status# " << status + << " operationId# " << OperationId + << " at tablet " << ssId); + return false; + } + + TTxState* txState = context.SS->FindTx(OperationId); + Y_ABORT_UNLESS(txState); + Y_ABORT_UNLESS(txState->TxType == TTxState::TxCopySequence); + Y_ABORT_UNLESS(txState->State == TTxState::ConfigureParts); + + auto shardIdx = context.SS->MustGetShardIdx(tabletId); + if (!txState->ShardsInProgress.erase(shardIdx)) { + LOG_WARN_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence TConfigureParts HandleReply ignoring duplicate TEvRestoreSequenceResult" + << " shardId# " << tabletId + << " status# " << status + << " operationId# " << OperationId + << " at tablet " << ssId); + return false; + } + + if (!txState->ShardsInProgress.empty()) { + return false; + } + + RestoredSequences++; + + if (RestoredSequences > 1 && txState->ShardsInProgress.empty()) { + NIceDb::TNiceDb db(context.GetDB()); + context.SS->ChangeTxState(db, OperationId, TTxState::Propose); + context.OnComplete.ActivateTx(OperationId); + return true; + } + + TSequenceInfo::TPtr sequenceInfo = context.SS->Sequences.at(txState->SourcePathId); + Y_ABORT_UNLESS(sequenceInfo); + Y_ABORT_UNLESS(!sequenceInfo->AlterData); + + for (const auto& shardIdxProto : sequenceInfo->Sharding.GetSequenceShards()) { + TShardIdx shardIdx = FromProto(shardIdxProto); + auto currentTabletId = context.SS->ShardInfos.at(shardIdx).TabletID; + + auto event = MakeHolder(txState->SourcePathId); + event->Record.SetTxId(ui64(OperationId.GetTxId())); + event->Record.SetTxPartId(OperationId.GetSubTxId()); + + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence TConfigureParts ProgressState" + << " sending TEvRestoreSequence to tablet " << currentTabletId + << " operationId# " << OperationId + << " at tablet " << ssId); + + context.OnComplete.BindMsgToPipe(OperationId, currentTabletId, txState->SourcePathId, event.Release()); + + // Wait for results from this shard + txState->ShardsInProgress.insert(shardIdx); + } + + return false; + } + + bool HandleReply(NSequenceShard::TEvSequenceShard::TEvFreezeSequenceResult::TPtr& ev, TOperationContext& context) override { + auto ssId = context.SS->SelfTabletId(); + auto tabletId = TTabletId(ev->Get()->Record.GetOrigin()); + auto status = ev->Get()->Record.GetStatus(); + + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence TConfigureParts HandleReply TEvFreezeSequenceResult" + << " shardId# " << tabletId + << " status# " << status + << " operationId# " << OperationId + << " at tablet " << ssId); + + switch (status) { + case NKikimrTxSequenceShard::TEvFreezeSequenceResult::SUCCESS: break; + default: + // Treat all other replies as unexpected and spurious + LOG_WARN_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence TConfigureParts HandleReply ignoring unexpected TEvFreezeSequenceResult" + << " shardId# " << tabletId + << " status# " << status + << " operationId# " << OperationId + << " at tablet " << ssId); + return false; + } + + TTxState* txState = context.SS->FindTx(OperationId); + Y_ABORT_UNLESS(txState); + Y_ABORT_UNLESS(txState->TxType == TTxState::TxCopySequence); + Y_ABORT_UNLESS(txState->State == TTxState::ConfigureParts); + + auto shardIdx = context.SS->MustGetShardIdx(tabletId); + if (!txState->ShardsInProgress.erase(shardIdx)) { + LOG_WARN_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence TConfigureParts HandleReply ignoring duplicate TEvFreezeSequenceResult" + << " shardId# " << tabletId + << " status# " << status + << " operationId# " << OperationId + << " at tablet " << ssId); + return false; + } + + if (!txState->ShardsInProgress.empty()) { + return false; + } + + for (auto shard : txState->Shards) { + auto shardIdx = shard.Idx; + auto currentTabletId = context.SS->ShardInfos.at(shardIdx).TabletID; + Y_ABORT_UNLESS(shard.TabletType == ETabletType::SequenceShard); + + if (currentTabletId == InvalidTabletId) { + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence TConfigureParts ProgressState" + << " shard " << shardIdx << " is not created yet, waiting" + << " operationId# " << OperationId + << " at tablet " << ssId); + context.OnComplete.WaitShardCreated(shardIdx, OperationId); + txState->ShardsInProgress.insert(shardIdx); + return false; + } + + auto event = MakeHolder( + txState->TargetPathId, ev->Get()->Record); + event->Record.SetTxId(ui64(OperationId.GetTxId())); + event->Record.SetTxPartId(OperationId.GetSubTxId()); + + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence TConfigureParts ProgressState" + << " sending TEvRestoreSequence to tablet " << currentTabletId + << " operationId# " << OperationId + << " at tablet " << ssId); + + context.OnComplete.BindMsgToPipe(OperationId, currentTabletId, txState->TargetPathId, event.Release()); + + // Wait for results from this shard + txState->ShardsInProgress.insert(shardIdx); + } + + return false; + } + + bool ProgressState(TOperationContext& context) override { + auto ssId = context.SS->SelfTabletId(); + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence TConfigureParts ProgressState" + << " operationId# " << OperationId + << " at tablet " << ssId); + + TTxState* txState = context.SS->FindTx(OperationId); + Y_ABORT_UNLESS(txState); + Y_ABORT_UNLESS(txState->TxType == TTxState::TxCopySequence); + Y_ABORT_UNLESS(!txState->Shards.empty()); + Y_ABORT_UNLESS(txState->SourcePathId != InvalidPathId); + + TSequenceInfo::TPtr sequenceInfo = context.SS->Sequences.at(txState->SourcePathId); + Y_ABORT_UNLESS(sequenceInfo); + Y_ABORT_UNLESS(!sequenceInfo->AlterData); + + for (const auto& shardIdxProto : sequenceInfo->Sharding.GetSequenceShards()) { + TShardIdx shardIdx = FromProto(shardIdxProto); + auto tabletId = context.SS->ShardInfos.at(shardIdx).TabletID; + + auto event = MakeHolder(txState->SourcePathId); + event->Record.SetTxId(ui64(OperationId.GetTxId())); + event->Record.SetTxPartId(OperationId.GetSubTxId()); + + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence TConfigureParts ProgressState" + << " sending TEvFreezeSequence to tablet " << tabletId + << " operationId# " << OperationId + << " at tablet " << ssId); + + context.OnComplete.BindMsgToPipe(OperationId, tabletId, txState->SourcePathId, event.Release()); + + txState->ShardsInProgress.insert(shardIdx); + } + + return false; + } +}; + +class TPropose: public TSubOperationState { +private: + TOperationId OperationId; + + TString DebugHint() const override { + return TStringBuilder() + << "TCopySequence TPropose" + << " operationId#" << OperationId; + } + +public: + TPropose(TOperationId id) + : OperationId(id) + { + IgnoreMessages(DebugHint(), { + TEvHive::TEvCreateTabletReply::EventType, + NSequenceShard::TEvSequenceShard::TEvFreezeSequenceResult::EventType, + NSequenceShard::TEvSequenceShard::TEvRestoreSequenceResult::EventType, + }); + } + + bool HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOperationContext& context) override { + auto step = TStepId(ev->Get()->StepId); + auto ssId = context.SS->SelfTabletId(); + + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " HandleReply TEvOperationPlan" + << ", at schemeshard: " << ssId); + + TTxState* txState = context.SS->FindTx(OperationId); + if (!txState) { + return false; + } + Y_ABORT_UNLESS(txState->TxType == TTxState::TxCopySequence); + + TPathId pathId = txState->TargetPathId; + TPathElement::TPtr path = context.SS->PathsById.at(pathId); + + Y_VERIFY_S(context.SS->Sequences.contains(pathId), "Sequence not found. PathId: " << pathId); + TSequenceInfo::TPtr sequenceInfo = context.SS->Sequences.at(pathId); + Y_ABORT_UNLESS(sequenceInfo); + TSequenceInfo::TPtr alterData = sequenceInfo->AlterData; + Y_ABORT_UNLESS(alterData); + + NIceDb::TNiceDb db(context.GetDB()); + + path->StepCreated = step; + context.SS->PersistCreateStep(db, pathId, step); + + context.SS->Sequences[pathId] = alterData; + context.SS->PersistSequenceAlterRemove(db, pathId); + context.SS->PersistSequence(db, pathId, *alterData); + + auto parentDir = context.SS->PathsById.at(path->ParentPathId); + if (parentDir->IsLikeDirectory()) { + ++parentDir->DirAlterVersion; + context.SS->PersistPathDirAlterVersion(db, parentDir); + } + context.SS->ClearDescribePathCaches(parentDir); + context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId); + + context.SS->ClearDescribePathCaches(path); + context.OnComplete.PublishToSchemeBoard(OperationId, pathId); + + context.SS->ChangeTxState(db, OperationId, TTxState::Done); + return true; + } + + bool ProgressState(TOperationContext& context) override { + TTabletId ssId = context.SS->SelfTabletId(); + + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " ProgressState" + << ", at schemeshard: " << ssId); + + TTxState* txState = context.SS->FindTx(OperationId); + Y_ABORT_UNLESS(txState); + Y_ABORT_UNLESS(txState->TxType == TTxState::TxCopySequence); + + context.OnComplete.ProposeToCoordinator(OperationId, txState->TargetPathId, TStepId(0)); + return false; + } +}; + +class TCopySequence: public TSubOperation { + + static TTxState::ETxState NextState() { + return TTxState::CreateParts; + } + + TTxState::ETxState NextState(TTxState::ETxState state) const override { + switch (state) { + case TTxState::Waiting: + case TTxState::CreateParts: + return TTxState::ConfigureParts; + case TTxState::ConfigureParts: + return TTxState::Propose; + case TTxState::Propose: + return TTxState::Done; + default: + return TTxState::Invalid; + } + return TTxState::Invalid; + } + + TSubOperationState::TPtr SelectStateFunc(TTxState::ETxState state) override { + using TPtr = TSubOperationState::TPtr; + + switch (state) { + case TTxState::Waiting: + case TTxState::CreateParts: + return TPtr(new TCreateParts(OperationId)); + case TTxState::ConfigureParts: + return TPtr(new TConfigureParts(OperationId)); + case TTxState::Propose: + return TPtr(new TPropose(OperationId)); + case TTxState::Done: + return TPtr(new TDone(OperationId)); + default: + return nullptr; + } + } + +public: + using TSubOperation::TSubOperation; + + THolder Propose(const TString& owner, TOperationContext& context) override { + const TTabletId ssId = context.SS->SelfTabletId(); + + const auto acceptExisted = !Transaction.GetFailOnExist(); + const TString& parentPathStr = Transaction.GetWorkingDir(); + auto& descr = Transaction.GetSequence(); + const TString& name = descr.GetName(); + + LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence Propose" + << ", path: " << parentPathStr << "/" << name + << ", opId: " << OperationId + << ", at schemeshard: " << ssId); + + TEvSchemeShard::EStatus status = NKikimrScheme::StatusAccepted; + auto result = MakeHolder(status, ui64(OperationId.GetTxId()), ui64(ssId)); + + NSchemeShard::TPath parentPath = NSchemeShard::TPath::Resolve(parentPathStr, context.SS); + { + NSchemeShard::TPath::TChecker checks = parentPath.Check(); + checks + .NotUnderDomainUpgrade() + .IsAtLocalSchemeShard() + .IsResolved() + .NotDeleted() + .NotUnderDeleting() + .IsCommonSensePath(); + + if (checks) { + if (parentPath->IsTable()) { + // allow immediately inside a normal table + if (parentPath.IsUnderOperation()) { + checks.IsUnderTheSameOperation(OperationId.GetTxId()); // allowed only as part of consistent operations + } + } else { + // otherwise don't allow unexpected object types + checks.IsLikeDirectory(); + } + } + + if (!checks) { + result->SetError(checks.GetStatus(), checks.GetError()); + return result; + } + } + + TPath srcPath = TPath::Resolve(descr.GetCopyFromSequence(), context.SS); + { + TPath::TChecker checks = srcPath.Check(); + checks + .NotEmpty() + .IsResolved() + .NotDeleted() + .NotUnderDeleting() + .IsSequence() + .NotUnderTheSameOperation(OperationId.GetTxId()) + .NotUnderOperation(); + + if (checks) { + if (!parentPath->IsTable()) { + // otherwise don't allow unexpected object types + checks.IsLikeDirectory(); + } + } + + if (!checks) { + result->SetError(checks.GetStatus(), checks.GetError()); + return result; + } + } + + auto domainPathId = parentPath.GetPathIdForDomain(); + auto domainInfo = parentPath.DomainInfo(); + + // TODO: maybe select from several shards + ui64 shardsToCreate = 0; + TShardIdx sequenceShard; + if (domainInfo->GetSequenceShards().empty()) { + ++shardsToCreate; + } else { + sequenceShard = *domainInfo->GetSequenceShards().begin(); + } + + const TString acl = Transaction.GetModifyACL().GetDiffACL(); + + NSchemeShard::TPath dstPath = parentPath.Child(name); + { + NSchemeShard::TPath::TChecker checks = dstPath.Check(); + checks.IsAtLocalSchemeShard(); + if (dstPath.IsResolved()) { + checks + .IsResolved() + .NotUnderDeleting() + .FailOnExist(TPathElement::EPathType::EPathTypeSequence, acceptExisted); + } else { + checks + .NotEmpty() + .NotResolved(); + } + + if (checks) { + checks.IsValidLeafName(); + + if (!parentPath->IsTable()) { + checks.DepthLimit(); + } + + checks + .PathsLimit() + .DirChildrenLimit() + .ShardsLimit(shardsToCreate) + .IsTheSameDomain(srcPath) + //.PathShardsLimit(shardsToCreate) + .IsValidACL(acl); + } + + if (!checks) { + result->SetError(checks.GetStatus(), checks.GetError()); + if (dstPath.IsResolved()) { + result->SetPathCreateTxId(ui64(dstPath->CreateTxId)); + result->SetPathId(dstPath->PathId.LocalPathId); + } + return result; + } + } + + TString errStr; + + if (!TSequenceInfo::ValidateCreate(descr, errStr)) { + result->SetError(NKikimrScheme::StatusSchemeError, errStr); + return result; + } + + if (!context.SS->CheckApplyIf(Transaction, errStr)) { + result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr); + return result; + } + + Y_ABORT_UNLESS(context.SS->Sequences.contains(srcPath.Base()->PathId)); + + const ui32 profileId = 0; + TChannelsBindings channelsBindings; + if (shardsToCreate) { + if (!context.SS->ResolveTabletChannels(profileId, dstPath.GetPathIdForDomain(), channelsBindings)) { + result->SetError(NKikimrScheme::StatusInvalidParameter, + "Unable to construct channel binding for sequence shard with the storage pool"); + return result; + } + } + + dstPath.MaterializeLeaf(owner); + result->SetPathId(dstPath->PathId.LocalPathId); + context.SS->TabletCounters->Simple()[COUNTER_SEQUENCE_COUNT].Add(1); + + TPathId pathId = dstPath->PathId; + dstPath->CreateTxId = OperationId.GetTxId(); + dstPath->LastTxId = OperationId.GetTxId(); + dstPath->PathState = TPathElement::EPathState::EPathStateCreate; + dstPath->PathType = TPathElement::EPathType::EPathTypeSequence; + + if (parentPath->HasActiveChanges()) { + TTxId parentTxId = parentPath->PlannedToCreate() ? parentPath->CreateTxId : parentPath->LastTxId; + context.OnComplete.Dependence(parentTxId, OperationId.GetTxId()); + } + + TTxState& txState = + context.SS->CreateTx(OperationId, TTxState::TxCopySequence, pathId, srcPath.Base()->PathId); + txState.State = TTxState::ConfigureParts; + + TSequenceInfo::TPtr sequenceInfo = new TSequenceInfo(0); + TSequenceInfo::TPtr alterData = sequenceInfo->CreateNextVersion(); + alterData->Description = descr; + + if (shardsToCreate) { + sequenceShard = context.SS->RegisterShardInfo( + TShardInfo::SequenceShardInfo(OperationId.GetTxId(), domainPathId) + .WithBindedChannels(channelsBindings)); + context.SS->TabletCounters->Simple()[COUNTER_SEQUENCESHARD_COUNT].Add(1); + txState.Shards.emplace_back(sequenceShard, ETabletType::SequenceShard, TTxState::CreateParts); + txState.State = TTxState::CreateParts; + context.SS->PathsById.at(domainPathId)->IncShardsInside(); + domainInfo->AddInternalShard(sequenceShard); + domainInfo->AddSequenceShard(sequenceShard); + } else { + txState.Shards.emplace_back(sequenceShard, ETabletType::SequenceShard, TTxState::ConfigureParts); + auto& shardInfo = context.SS->ShardInfos.at(sequenceShard); + if (shardInfo.CurrentTxId != OperationId.GetTxId()) { + context.OnComplete.Dependence(shardInfo.CurrentTxId, OperationId.GetTxId()); + } + } + + { + auto* p = alterData->Sharding.AddSequenceShards(); + p->SetOwnerId(sequenceShard.GetOwnerId()); + p->SetLocalId(ui64(sequenceShard.GetLocalId())); + } + + NIceDb::TNiceDb db(context.GetDB()); + + context.SS->ChangeTxState(db, OperationId, txState.State); + context.OnComplete.ActivateTx(OperationId); + + context.SS->PersistPath(db, dstPath->PathId); + if (!acl.empty()) { + dstPath->ApplyACL(acl); + context.SS->PersistACL(db, dstPath.Base()); + } + + context.SS->Sequences[pathId] = sequenceInfo; + context.SS->PersistSequence(db, pathId, *sequenceInfo); + context.SS->PersistSequenceAlter(db, pathId, *alterData); + context.SS->IncrementPathDbRefCount(pathId); + + context.SS->PersistTxState(db, OperationId); + context.SS->PersistUpdateNextPathId(db); + if (shardsToCreate) { + context.SS->PersistUpdateNextShardIdx(db); + } + + for (auto shard : txState.Shards) { + if (shard.Operation == TTxState::CreateParts) { + context.SS->PersistChannelsBinding(db, shard.Idx, context.SS->ShardInfos.at(shard.Idx).BindedChannels); + context.SS->PersistShardMapping(db, shard.Idx, InvalidTabletId, domainPathId, OperationId.GetTxId(), shard.TabletType); + } + } + + ++parentPath->DirAlterVersion; + context.SS->PersistPathDirAlterVersion(db, parentPath.Base()); + context.SS->ClearDescribePathCaches(parentPath.Base()); + context.OnComplete.PublishToSchemeBoard(OperationId, parentPath->PathId); + + context.SS->ClearDescribePathCaches(dstPath.Base()); + context.OnComplete.PublishToSchemeBoard(OperationId, dstPath->PathId); + + domainInfo->IncPathsInside(); + parentPath->IncAliveChildren(); + + SetState(NextState()); + return result; + } + + void AbortPropose(TOperationContext&) override { + Y_ABORT("no AbortPropose for TCopySequence"); + } + + void AbortUnsafe(TTxId, TOperationContext&) override { + Y_ABORT("no AbortUnsafe for TCopySequence"); + } +}; + +} + +namespace NKikimr::NSchemeShard { + +ISubOperation::TPtr CreateCopySequence(TOperationId id, const TTxTransaction& tx) +{ + return MakeSubOperation(id, tx); +} + +ISubOperation::TPtr CreateCopySequence(TOperationId id, TTxState::ETxState state) { + return MakeSubOperation(id, state); +} + +} From 6dca0dd5226063bcdc7861b19c2408a0a7fc3da8 Mon Sep 17 00:00:00 2001 From: Nikolay Shumkov Date: Wed, 13 Mar 2024 14:26:56 +0300 Subject: [PATCH 09/11] Fixes --- .../schemeshard/schemeshard__operation_copy_sequence.cpp | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_copy_sequence.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_copy_sequence.cpp index bde05ea47437..99869ed7f2eb 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_copy_sequence.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_copy_sequence.cpp @@ -16,6 +16,7 @@ class TConfigureParts : public TSubOperationState { private: TOperationId OperationId; ui32 RestoredSequences = 0; + NKikimrTxSequenceShard::TEvFreezeSequenceResult FreezeSequenceResult; TString DebugHint() const override { return TStringBuilder() @@ -94,7 +95,8 @@ class TConfigureParts : public TSubOperationState { TShardIdx shardIdx = FromProto(shardIdxProto); auto currentTabletId = context.SS->ShardInfos.at(shardIdx).TabletID; - auto event = MakeHolder(txState->SourcePathId); + auto event = MakeHolder( + txState->SourcePathId, FreezeSequenceResult); event->Record.SetTxId(ui64(OperationId.GetTxId())); event->Record.SetTxPartId(OperationId.GetSubTxId()); @@ -158,6 +160,8 @@ class TConfigureParts : public TSubOperationState { return false; } + FreezeSequenceResult = ev->Get()->Record; + for (auto shard : txState->Shards) { auto shardIdx = shard.Idx; auto currentTabletId = context.SS->ShardInfos.at(shardIdx).TabletID; @@ -175,7 +179,7 @@ class TConfigureParts : public TSubOperationState { } auto event = MakeHolder( - txState->TargetPathId, ev->Get()->Record); + txState->TargetPathId, FreezeSequenceResult); event->Record.SetTxId(ui64(OperationId.GetTxId())); event->Record.SetTxPartId(OperationId.GetSubTxId()); From f592eb979b08d47ec3b2389e71f7b19d12aacfb4 Mon Sep 17 00:00:00 2001 From: Nikolay Shumkov Date: Wed, 13 Mar 2024 14:27:17 +0300 Subject: [PATCH 10/11] Fixes --- ydb/core/kqp/ut/pg/kqp_pg_ut.cpp | 130 +++++++++++++++++++++++++++++++ 1 file changed, 130 insertions(+) diff --git a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp index 7aac95eca00c..58824e917d23 100644 --- a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp +++ b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp @@ -1698,6 +1698,136 @@ Y_UNIT_TEST_SUITE(KqpPg) { } } + Y_UNIT_TEST(CopyTableSerialColumns) { + TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false).SetEnableNotNullDataColumns(true)); + auto client = kikimr.GetTableClient(); + auto session = client.CreateSession().GetValueSync().GetSession(); + { + const auto query = Q_(R"( + --!syntax_pg + CREATE TABLE PgSerial ( + key serial PRIMARY KEY, + value int2 + ))"); + + auto result = session.ExecuteSchemeQuery(query).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + { + const auto query = Q_(R"( + --!syntax_pg + INSERT INTO PgSerial (value) values (1); + )"); + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + { + const auto result = session.CopyTable("/Root/PgSerial", "/Root/copy").GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto desc = session.DescribeTable("/Root/copy").ExtractValueSync(); + UNIT_ASSERT_C(desc.IsSuccess(), desc.GetIssues().ToString()); + } + + { + const auto query = Q_(R"( + --!syntax_pg + SELECT * FROM copy; + )"); + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty"); + CompareYson(R"( + [["1";"1"]] + )", FormatResultSetYson(result.GetResultSet(0))); + } + + { + const auto query = Q_(R"( + --!syntax_pg + INSERT INTO copy (value) values (1); + )"); + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + { + const auto query = Q_(R"( + --!syntax_pg + SELECT * FROM copy; + )"); + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty"); + CompareYson(R"( + [["1";"1"];["2";"1"]] + )", FormatResultSetYson(result.GetResultSet(0))); + } + + { + const auto query = Q_(R"( + --!syntax_pg + SELECT * FROM PgSerial; + )"); + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty"); + CompareYson(R"( + [["1";"1"]] + )", FormatResultSetYson(result.GetResultSet(0))); + } + + { + const auto query = Q_(R"( + --!syntax_pg + INSERT INTO PgSerial (value) values (1); + )"); + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + { + const auto query = Q_(R"( + --!syntax_pg + SELECT * FROM copy; + )"); + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty"); + CompareYson(R"( + [["1";"1"];["2";"1"]] + )", FormatResultSetYson(result.GetResultSet(0))); + } + + { + const auto query = Q_(R"( + --!syntax_pg + SELECT * FROM PgSerial; + )"); + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty"); + CompareYson(R"( + [["1";"1"];["2";"1"]] + )", FormatResultSetYson(result.GetResultSet(0))); + } + } + Y_UNIT_TEST(CreateIndex) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);; From 5c3b8a81bd631a0aa99e7be79580da9fd1b81a6d Mon Sep 17 00:00:00 2001 From: Nikolay Shumkov Date: Wed, 13 Mar 2024 14:41:11 +0300 Subject: [PATCH 11/11] Fixes --- ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp | 4 ---- 1 file changed, 4 deletions(-) diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp index cb4b6c1e1a69..db2212d3f627 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp @@ -280,10 +280,6 @@ class TCopyTable: public TSubOperation { return AppData()->AllowShadowDataInSchemeShardForTests; } - void SetLocalSequences(const THashSet& localSequences) { - LocalSequences = localSequences; - } - THolder Propose(const TString& owner, TOperationContext& context) override { const TTabletId ssId = context.SS->SelfTabletId();