From 2e94f582c0a6febafa580b76e9c14df8cdbe23ff Mon Sep 17 00:00:00 2001 From: Nikolay Shumkov Date: Thu, 29 Feb 2024 13:50:45 +0300 Subject: [PATCH 01/14] Fix copy table --- ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp index 25bc09da6648..366539ff6530 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp @@ -280,6 +280,10 @@ class TCopyTable: public TSubOperation { return AppData()->AllowShadowDataInSchemeShardForTests; } + void SetLocalSequences(const THashSet& localSequences) { + LocalSequences = localSequences; + } + THolder Propose(const TString& owner, TOperationContext& context) override { const TTabletId ssId = context.SS->SelfTabletId(); From 8d81570006b08aeb1b43664a0438b577fe29d27f Mon Sep 17 00:00:00 2001 From: Nikolay Shumkov Date: Fri, 1 Mar 2024 15:38:50 +0300 Subject: [PATCH 02/14] Fixes --- ydb/core/ydb_convert/table_description.h | 4 ++++ ydb/public/api/protos/ydb_table.proto | 4 +++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/ydb/core/ydb_convert/table_description.h b/ydb/core/ydb_convert/table_description.h index 28f45b421993..6b6cc7c0079b 100644 --- a/ydb/core/ydb_convert/table_description.h +++ b/ydb/core/ydb_convert/table_description.h @@ -132,4 +132,8 @@ bool FillTableDescription(NKikimrSchemeOp::TModifyScheme& out, void FillSequenceDescription(Ydb::Table::CreateTableRequest& out, const NKikimrSchemeOp::TTableDescription& in); +// out +void FillSequenceDescription(Ydb::Table::CreateTableRequest& out, + const NKikimrSchemeOp::TTableDescription& in); + } // namespace NKikimr diff --git a/ydb/public/api/protos/ydb_table.proto b/ydb/public/api/protos/ydb_table.proto index 295b67c12777..b3db22626ac2 100644 --- a/ydb/public/api/protos/ydb_table.proto +++ b/ydb/public/api/protos/ydb_table.proto @@ -355,7 +355,7 @@ message ColumnMeta { // Column default value option oneof default_value { TypedValue from_literal = 5; - SequenceDescription from_sequence = 6; + string from_sequence = 6; // path to sequence } } @@ -552,6 +552,8 @@ message CreateTableRequest { bool temporary = 19; // Is table column or row oriented StoreType store_type = 20; + // List of sequence descriptions + repeated SequenceDescription sequence_descriptions = 21; } message CreateTableResponse { From 45c05e08753fb48ea4a26934a621a1ca7f3ce4c5 Mon Sep 17 00:00:00 2001 From: Nikolay Shumkov Date: Mon, 4 Mar 2024 12:39:54 +0300 Subject: [PATCH 03/14] Fixes --- .../tx/schemeshard/ut_export/ut_export.cpp | 48 ++++++++----------- .../tx/schemeshard/ut_restore/ut_restore.cpp | 1 + ydb/public/api/protos/ydb_table.proto | 4 +- 3 files changed, 22 insertions(+), 31 deletions(-) diff --git a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp index 0408d35f5141..e12e62b3dae0 100644 --- a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp +++ b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp @@ -303,20 +303,7 @@ Y_UNIT_TEST_SUITE(TExportToS3Tests) { Columns { Name: "value" Type: "Utf8" - DefaultFromLiteral { - type { - optional_type { - item { - type_id: UTF8 - } - } - } - value { - items { - text_value: "a" - } - } - } + DefaultFromSequence: "myseq" } KeyColumnNames: ["key"] PartitionConfig { @@ -338,6 +325,15 @@ Y_UNIT_TEST_SUITE(TExportToS3Tests) { } } } + Sequences { + Name: "myseq" + MinValue: 1 + MaxValue: 20 + StartValue: 1 + Cache: 1 + Increment: 1 + Cycle: false + } )"}; Run(runtime, env, tables, Sprintf(R"( @@ -391,20 +387,7 @@ columns { } } not_null: false - from_literal { - type { - optional_type { - item { - type_id: UTF8 - } - } - } - value { - items { - text_value: "a" - } - } - } + from_sequence: "myseq" } primary_key: "key" storage_settings { @@ -419,6 +402,15 @@ partitioning_settings { partitioning_by_load: DISABLED min_partitions_count: 1 } +sequence_descriptions { + name: "myseq" + min_value: 1 + max_value: 3 + start_value: 1 + cache: 1 + increment: 1 + cycle: true +} )"); } diff --git a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp index 8028f2c0bad3..b906c5b0dbb1 100644 --- a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp +++ b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp @@ -920,6 +920,7 @@ value { Name: "myseq" } )"); + env.TestWaitNotification(runtime, txId); TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"( ExportToS3Settings { diff --git a/ydb/public/api/protos/ydb_table.proto b/ydb/public/api/protos/ydb_table.proto index b3db22626ac2..295b67c12777 100644 --- a/ydb/public/api/protos/ydb_table.proto +++ b/ydb/public/api/protos/ydb_table.proto @@ -355,7 +355,7 @@ message ColumnMeta { // Column default value option oneof default_value { TypedValue from_literal = 5; - string from_sequence = 6; // path to sequence + SequenceDescription from_sequence = 6; } } @@ -552,8 +552,6 @@ message CreateTableRequest { bool temporary = 19; // Is table column or row oriented StoreType store_type = 20; - // List of sequence descriptions - repeated SequenceDescription sequence_descriptions = 21; } message CreateTableResponse { From 1d0dc64aa1b8c427707b748055b311822394914b Mon Sep 17 00:00:00 2001 From: Nikolay Shumkov Date: Mon, 4 Mar 2024 14:08:12 +0300 Subject: [PATCH 04/14] Fixes --- ydb/core/ydb_convert/table_description.h | 1 + 1 file changed, 1 insertion(+) diff --git a/ydb/core/ydb_convert/table_description.h b/ydb/core/ydb_convert/table_description.h index 6b6cc7c0079b..1c1a3b761f0a 100644 --- a/ydb/core/ydb_convert/table_description.h +++ b/ydb/core/ydb_convert/table_description.h @@ -128,6 +128,7 @@ bool FillTableDescription(NKikimrSchemeOp::TModifyScheme& out, const Ydb::Table::CreateTableRequest& in, const TTableProfiles& profiles, Ydb::StatusIds::StatusCode& status, TString& error, bool indexedTable = false); + // out void FillSequenceDescription(Ydb::Table::CreateTableRequest& out, const NKikimrSchemeOp::TTableDescription& in); From 189064cf9539f5674a34e39f00a80477f37504cc Mon Sep 17 00:00:00 2001 From: Nikolay Shumkov Date: Mon, 4 Mar 2024 15:47:10 +0300 Subject: [PATCH 05/14] Fixes --- .../tx/schemeshard/ut_export/ut_export.cpp | 48 +++++++++++-------- 1 file changed, 28 insertions(+), 20 deletions(-) diff --git a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp index e12e62b3dae0..0408d35f5141 100644 --- a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp +++ b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp @@ -303,7 +303,20 @@ Y_UNIT_TEST_SUITE(TExportToS3Tests) { Columns { Name: "value" Type: "Utf8" - DefaultFromSequence: "myseq" + DefaultFromLiteral { + type { + optional_type { + item { + type_id: UTF8 + } + } + } + value { + items { + text_value: "a" + } + } + } } KeyColumnNames: ["key"] PartitionConfig { @@ -325,15 +338,6 @@ Y_UNIT_TEST_SUITE(TExportToS3Tests) { } } } - Sequences { - Name: "myseq" - MinValue: 1 - MaxValue: 20 - StartValue: 1 - Cache: 1 - Increment: 1 - Cycle: false - } )"}; Run(runtime, env, tables, Sprintf(R"( @@ -387,7 +391,20 @@ columns { } } not_null: false - from_sequence: "myseq" + from_literal { + type { + optional_type { + item { + type_id: UTF8 + } + } + } + value { + items { + text_value: "a" + } + } + } } primary_key: "key" storage_settings { @@ -402,15 +419,6 @@ partitioning_settings { partitioning_by_load: DISABLED min_partitions_count: 1 } -sequence_descriptions { - name: "myseq" - min_value: 1 - max_value: 3 - start_value: 1 - cache: 1 - increment: 1 - cycle: true -} )"); } From 1b4bab9eed2c923ff0ab5daedcb2e2d4b8ae20d1 Mon Sep 17 00:00:00 2001 From: Nikolay Shumkov Date: Tue, 5 Mar 2024 13:06:09 +0300 Subject: [PATCH 06/14] Fixes --- ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp index b906c5b0dbb1..8028f2c0bad3 100644 --- a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp +++ b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp @@ -920,7 +920,6 @@ value { Name: "myseq" } )"); - env.TestWaitNotification(runtime, txId); TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"( ExportToS3Settings { From 996d85e8d057683b73341ca49c6887661e049a55 Mon Sep 17 00:00:00 2001 From: Nikolay Shumkov Date: Wed, 13 Mar 2024 13:19:29 +0300 Subject: [PATCH 07/14] Initial commit --- ydb/core/protos/counters_schemeshard.proto | 3 +++ ydb/core/protos/flat_scheme_op.proto | 1 + ydb/core/tx/schemeshard/schemeshard__operation.cpp | 2 ++ ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp | 3 ++- ydb/core/tx/schemeshard/schemeshard__operation_part.h | 2 ++ ydb/core/tx/schemeshard/schemeshard_impl.cpp | 1 + ydb/core/tx/schemeshard/schemeshard_tx_infly.h | 4 ++++ ydb/core/tx/schemeshard/ut_sequence/ut_sequence.cpp | 4 ++++ ydb/core/tx/schemeshard/ut_sequence/ya.make | 1 + ydb/core/tx/schemeshard/ya.make | 1 + 10 files changed, 21 insertions(+), 1 deletion(-) diff --git a/ydb/core/protos/counters_schemeshard.proto b/ydb/core/protos/counters_schemeshard.proto index ca2a07a1d508..70319340c6ac 100644 --- a/ydb/core/protos/counters_schemeshard.proto +++ b/ydb/core/protos/counters_schemeshard.proto @@ -195,6 +195,8 @@ enum ESimpleCounters { COUNTER_IN_FLIGHT_OPS_TxDropView = 157 [(CounterOpts) = {Name: "InFlightOps/DropView"}]; COUNTER_GRAPHSHARD_COUNT = 158 [(CounterOpts) = {Name: "GraphShards"}]; + + COUNTER_IN_FLIGHT_OPS_TxCopySequence = 159 [(CounterOpts) = {Name: "InFlightOps/CopySequence"}]; } enum ECumulativeCounters { @@ -315,6 +317,7 @@ enum ECumulativeCounters { COUNTER_FINISHED_OPS_TxCreateView = 95 [(CounterOpts) = {Name: "FinishedOps/CreateView"}]; COUNTER_FINISHED_OPS_TxDropView = 96 [(CounterOpts) = {Name: "FinishedOps/DropView"}]; COUNTER_FINISHED_OPS_TxAlterView = 97 [(CounterOpts) = {Name: "FinishedOps/AlterView"}]; + COUNTER_FINISHED_OPS_TxCopySequence = 98 [(CounterOpts) = {Name: "FinishedOps/TxCopySequence"}]; } enum EPercentileCounters { diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index ee3f4460437a..a244fd449888 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -1295,6 +1295,7 @@ message TSequenceDescription { optional uint64 Cache = 8; // number of items to cache, defaults to 1 optional sint64 Increment = 9; // increment at each call, defaults to 1 optional bool Cycle = 10; // true when cycle on overflow is allowed + optional string CopyFromSequence = 11; // for copy sequence } message TSequenceSharding { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation.cpp b/ydb/core/tx/schemeshard/schemeshard__operation.cpp index 676771621098..4af3fd8a6f47 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation.cpp @@ -1033,6 +1033,8 @@ ISubOperation::TPtr TOperation::RestorePart(TTxState::ETxType txType, TTxState:: Y_ABORT("TODO: implement"); case TTxState::ETxType::TxDropSequence: return CreateDropSequence(NextPartId(), txState); + case TTxState::ETxType::TxCopySequence: + return CreateCopySequence(NextPartId(), txState); case TTxState::ETxType::TxFillIndex: Y_ABORT("deprecated"); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp index 366539ff6530..cb4b6c1e1a69 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp @@ -779,9 +779,10 @@ TVector CreateCopyTable(TOperationId nextId, const TTxTrans NKikimrSchemeOp::EOperationType::ESchemeOpCreateSequence); scheme.SetFailOnExist(tx.GetFailOnExist()); + sequenceDescription.SetCopyFromSequence(copying.GetCopyFromTable() + "/" + sequenceDescription.GetName()); *scheme.MutableSequence() = std::move(sequenceDescription); - result.push_back(CreateNewSequence(NextPartId(nextId, result), scheme)); + result.push_back(CreateCopySequence(NextPartId(nextId, result), scheme)); } return result; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_part.h b/ydb/core/tx/schemeshard/schemeshard__operation_part.h index 76baccd4dabf..4b420825810b 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_part.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_part.h @@ -584,6 +584,8 @@ ISubOperation::TPtr CreateNewSequence(TOperationId id, const TTxTransaction& tx) ISubOperation::TPtr CreateNewSequence(TOperationId id, TTxState::ETxState state); ISubOperation::TPtr CreateDropSequence(TOperationId id, const TTxTransaction& tx); ISubOperation::TPtr CreateDropSequence(TOperationId id, TTxState::ETxState state); +ISubOperation::TPtr CreateCopySequence(TOperationId id, const TTxTransaction& tx); +ISubOperation::TPtr CreateCopySequence(TOperationId id, TTxState::ETxState state); ISubOperation::TPtr CreateNewReplication(TOperationId id, const TTxTransaction& tx); ISubOperation::TPtr CreateNewReplication(TOperationId id, TTxState::ETxState state); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 4b2aa7eea12d..c4fa7c7a9635 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -1433,6 +1433,7 @@ TPathElement::EPathState TSchemeShard::CalcPathState(TTxState::ETxType txType, T case TTxState::TxCreateColumnTable: case TTxState::TxCreateCdcStream: case TTxState::TxCreateSequence: + case TTxState::TxCopySequence: case TTxState::TxCreateReplication: case TTxState::TxCreateBlobDepot: case TTxState::TxCreateExternalTable: diff --git a/ydb/core/tx/schemeshard/schemeshard_tx_infly.h b/ydb/core/tx/schemeshard/schemeshard_tx_infly.h index 849f91a9dee0..dc89583282d8 100644 --- a/ydb/core/tx/schemeshard/schemeshard_tx_infly.h +++ b/ydb/core/tx/schemeshard/schemeshard_tx_infly.h @@ -129,6 +129,7 @@ struct TTxState { item(TxCreateView, 83) \ item(TxAlterView, 84) \ item(TxDropView, 85) \ + item(TxCopySequence, 86) \ // TX_STATE_TYPE_ENUM @@ -342,6 +343,7 @@ struct TTxState { case TxCreateExternalTable: case TxCreateExternalDataSource: case TxCreateView: + case TxCopySequence: return true; case TxInitializeBuildIndex: //this is more like alter case TxCreateCdcStreamAtTable: @@ -472,6 +474,7 @@ struct TTxState { case TxCreateExternalTable: case TxCreateExternalDataSource: case TxCreateView: + case TxCopySequence: return false; case TxAlterPQGroup: case TxAlterTable: @@ -559,6 +562,7 @@ struct TTxState { case TxCreateCdcStreamAtTable: case TxCreateCdcStreamAtTableWithInitialScan: case TxCreateSequence: + case TxCopySequence: case TxCreateReplication: case TxCreateBlobDepot: case TxInitializeBuildIndex: diff --git a/ydb/core/tx/schemeshard/ut_sequence/ut_sequence.cpp b/ydb/core/tx/schemeshard/ut_sequence/ut_sequence.cpp index 9ecada64c2eb..0c6a292132b0 100644 --- a/ydb/core/tx/schemeshard/ut_sequence/ut_sequence.cpp +++ b/ydb/core/tx/schemeshard/ut_sequence/ut_sequence.cpp @@ -1,3 +1,4 @@ +#include #include using namespace NKikimr::NSchemeShard; @@ -362,6 +363,9 @@ Y_UNIT_TEST_SUITE(TSequence) { env.TestWaitNotification(runtime, txId); TestCopyTable(runtime, ++txId, "/MyRoot", "copy", "/MyRoot/Table"); + + TestLs(runtime, "/MyRoot/copy/myseq", TDescribeOptionsBuilder().SetShowPrivateTable(true), NLs::PathExist); + env.TestWaitNotification(runtime, txId); } diff --git a/ydb/core/tx/schemeshard/ut_sequence/ya.make b/ydb/core/tx/schemeshard/ut_sequence/ya.make index d6ba601923dd..d48d9e399ec9 100644 --- a/ydb/core/tx/schemeshard/ut_sequence/ya.make +++ b/ydb/core/tx/schemeshard/ut_sequence/ya.make @@ -20,6 +20,7 @@ PEERDIR( ydb/core/testlib/default ydb/core/tx ydb/core/tx/columnshard + ydb/core/tx/datashard ydb/core/tx/schemeshard/ut_helpers ydb/library/yql/public/udf/service/exception_policy ) diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make index 7dac75c00b8c..a8c455c13c2d 100644 --- a/ydb/core/tx/schemeshard/ya.make +++ b/ydb/core/tx/schemeshard/ya.make @@ -102,6 +102,7 @@ SRCS( schemeshard__operation_common_external_table.cpp schemeshard__operation_common_subdomain.h schemeshard__operation_consistent_copy_tables.cpp + schemeshard__operation_copy_sequence.cpp schemeshard__operation_copy_table.cpp schemeshard__operation_create_backup.cpp schemeshard__operation_create_bsv.cpp From c8c8682f0b9457bbb6ac77b62598532d2efe3c99 Mon Sep 17 00:00:00 2001 From: Nikolay Shumkov Date: Wed, 13 Mar 2024 13:19:57 +0300 Subject: [PATCH 08/14] Add copy --- .../schemeshard__operation_copy_sequence.cpp | 626 ++++++++++++++++++ 1 file changed, 626 insertions(+) create mode 100644 ydb/core/tx/schemeshard/schemeshard__operation_copy_sequence.cpp diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_copy_sequence.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_copy_sequence.cpp new file mode 100644 index 000000000000..bde05ea47437 --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard__operation_copy_sequence.cpp @@ -0,0 +1,626 @@ +#include "schemeshard__operation_part.h" +#include "schemeshard__operation_common.h" +#include "schemeshard_impl.h" + +#include +#include +#include + + +namespace { + +using namespace NKikimr; +using namespace NSchemeShard; + +class TConfigureParts : public TSubOperationState { +private: + TOperationId OperationId; + ui32 RestoredSequences = 0; + + TString DebugHint() const override { + return TStringBuilder() + << "TCopySequence TConfigureParts" + << " operationId#" << OperationId; + } + +public: + TConfigureParts(TOperationId id) + : OperationId(id) + { + IgnoreMessages(DebugHint(), { + TEvHive::TEvCreateTabletReply::EventType, + }); + } + + bool HandleReply(NSequenceShard::TEvSequenceShard::TEvRestoreSequenceResult::TPtr& ev, TOperationContext& context) override { + auto ssId = context.SS->SelfTabletId(); + auto tabletId = TTabletId(ev->Get()->Record.GetOrigin()); + auto status = ev->Get()->Record.GetStatus(); + + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence TConfigureParts HandleReply TEvRestoreSequenceResult" + << " shardId# " << tabletId + << " status# " << status + << " operationId# " << OperationId + << " at tablet " << ssId); + + switch (status) { + case NKikimrTxSequenceShard::TEvRestoreSequenceResult::SUCCESS: break; + default: + // Treat all other replies as unexpected and spurious + LOG_WARN_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence TConfigureParts HandleReply ignoring unexpected TEvRestoreSequenceResult" + << " shardId# " << tabletId + << " status# " << status + << " operationId# " << OperationId + << " at tablet " << ssId); + return false; + } + + TTxState* txState = context.SS->FindTx(OperationId); + Y_ABORT_UNLESS(txState); + Y_ABORT_UNLESS(txState->TxType == TTxState::TxCopySequence); + Y_ABORT_UNLESS(txState->State == TTxState::ConfigureParts); + + auto shardIdx = context.SS->MustGetShardIdx(tabletId); + if (!txState->ShardsInProgress.erase(shardIdx)) { + LOG_WARN_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence TConfigureParts HandleReply ignoring duplicate TEvRestoreSequenceResult" + << " shardId# " << tabletId + << " status# " << status + << " operationId# " << OperationId + << " at tablet " << ssId); + return false; + } + + if (!txState->ShardsInProgress.empty()) { + return false; + } + + RestoredSequences++; + + if (RestoredSequences > 1 && txState->ShardsInProgress.empty()) { + NIceDb::TNiceDb db(context.GetDB()); + context.SS->ChangeTxState(db, OperationId, TTxState::Propose); + context.OnComplete.ActivateTx(OperationId); + return true; + } + + TSequenceInfo::TPtr sequenceInfo = context.SS->Sequences.at(txState->SourcePathId); + Y_ABORT_UNLESS(sequenceInfo); + Y_ABORT_UNLESS(!sequenceInfo->AlterData); + + for (const auto& shardIdxProto : sequenceInfo->Sharding.GetSequenceShards()) { + TShardIdx shardIdx = FromProto(shardIdxProto); + auto currentTabletId = context.SS->ShardInfos.at(shardIdx).TabletID; + + auto event = MakeHolder(txState->SourcePathId); + event->Record.SetTxId(ui64(OperationId.GetTxId())); + event->Record.SetTxPartId(OperationId.GetSubTxId()); + + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence TConfigureParts ProgressState" + << " sending TEvRestoreSequence to tablet " << currentTabletId + << " operationId# " << OperationId + << " at tablet " << ssId); + + context.OnComplete.BindMsgToPipe(OperationId, currentTabletId, txState->SourcePathId, event.Release()); + + // Wait for results from this shard + txState->ShardsInProgress.insert(shardIdx); + } + + return false; + } + + bool HandleReply(NSequenceShard::TEvSequenceShard::TEvFreezeSequenceResult::TPtr& ev, TOperationContext& context) override { + auto ssId = context.SS->SelfTabletId(); + auto tabletId = TTabletId(ev->Get()->Record.GetOrigin()); + auto status = ev->Get()->Record.GetStatus(); + + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence TConfigureParts HandleReply TEvFreezeSequenceResult" + << " shardId# " << tabletId + << " status# " << status + << " operationId# " << OperationId + << " at tablet " << ssId); + + switch (status) { + case NKikimrTxSequenceShard::TEvFreezeSequenceResult::SUCCESS: break; + default: + // Treat all other replies as unexpected and spurious + LOG_WARN_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence TConfigureParts HandleReply ignoring unexpected TEvFreezeSequenceResult" + << " shardId# " << tabletId + << " status# " << status + << " operationId# " << OperationId + << " at tablet " << ssId); + return false; + } + + TTxState* txState = context.SS->FindTx(OperationId); + Y_ABORT_UNLESS(txState); + Y_ABORT_UNLESS(txState->TxType == TTxState::TxCopySequence); + Y_ABORT_UNLESS(txState->State == TTxState::ConfigureParts); + + auto shardIdx = context.SS->MustGetShardIdx(tabletId); + if (!txState->ShardsInProgress.erase(shardIdx)) { + LOG_WARN_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence TConfigureParts HandleReply ignoring duplicate TEvFreezeSequenceResult" + << " shardId# " << tabletId + << " status# " << status + << " operationId# " << OperationId + << " at tablet " << ssId); + return false; + } + + if (!txState->ShardsInProgress.empty()) { + return false; + } + + for (auto shard : txState->Shards) { + auto shardIdx = shard.Idx; + auto currentTabletId = context.SS->ShardInfos.at(shardIdx).TabletID; + Y_ABORT_UNLESS(shard.TabletType == ETabletType::SequenceShard); + + if (currentTabletId == InvalidTabletId) { + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence TConfigureParts ProgressState" + << " shard " << shardIdx << " is not created yet, waiting" + << " operationId# " << OperationId + << " at tablet " << ssId); + context.OnComplete.WaitShardCreated(shardIdx, OperationId); + txState->ShardsInProgress.insert(shardIdx); + return false; + } + + auto event = MakeHolder( + txState->TargetPathId, ev->Get()->Record); + event->Record.SetTxId(ui64(OperationId.GetTxId())); + event->Record.SetTxPartId(OperationId.GetSubTxId()); + + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence TConfigureParts ProgressState" + << " sending TEvRestoreSequence to tablet " << currentTabletId + << " operationId# " << OperationId + << " at tablet " << ssId); + + context.OnComplete.BindMsgToPipe(OperationId, currentTabletId, txState->TargetPathId, event.Release()); + + // Wait for results from this shard + txState->ShardsInProgress.insert(shardIdx); + } + + return false; + } + + bool ProgressState(TOperationContext& context) override { + auto ssId = context.SS->SelfTabletId(); + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence TConfigureParts ProgressState" + << " operationId# " << OperationId + << " at tablet " << ssId); + + TTxState* txState = context.SS->FindTx(OperationId); + Y_ABORT_UNLESS(txState); + Y_ABORT_UNLESS(txState->TxType == TTxState::TxCopySequence); + Y_ABORT_UNLESS(!txState->Shards.empty()); + Y_ABORT_UNLESS(txState->SourcePathId != InvalidPathId); + + TSequenceInfo::TPtr sequenceInfo = context.SS->Sequences.at(txState->SourcePathId); + Y_ABORT_UNLESS(sequenceInfo); + Y_ABORT_UNLESS(!sequenceInfo->AlterData); + + for (const auto& shardIdxProto : sequenceInfo->Sharding.GetSequenceShards()) { + TShardIdx shardIdx = FromProto(shardIdxProto); + auto tabletId = context.SS->ShardInfos.at(shardIdx).TabletID; + + auto event = MakeHolder(txState->SourcePathId); + event->Record.SetTxId(ui64(OperationId.GetTxId())); + event->Record.SetTxPartId(OperationId.GetSubTxId()); + + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence TConfigureParts ProgressState" + << " sending TEvFreezeSequence to tablet " << tabletId + << " operationId# " << OperationId + << " at tablet " << ssId); + + context.OnComplete.BindMsgToPipe(OperationId, tabletId, txState->SourcePathId, event.Release()); + + txState->ShardsInProgress.insert(shardIdx); + } + + return false; + } +}; + +class TPropose: public TSubOperationState { +private: + TOperationId OperationId; + + TString DebugHint() const override { + return TStringBuilder() + << "TCopySequence TPropose" + << " operationId#" << OperationId; + } + +public: + TPropose(TOperationId id) + : OperationId(id) + { + IgnoreMessages(DebugHint(), { + TEvHive::TEvCreateTabletReply::EventType, + NSequenceShard::TEvSequenceShard::TEvFreezeSequenceResult::EventType, + NSequenceShard::TEvSequenceShard::TEvRestoreSequenceResult::EventType, + }); + } + + bool HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOperationContext& context) override { + auto step = TStepId(ev->Get()->StepId); + auto ssId = context.SS->SelfTabletId(); + + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " HandleReply TEvOperationPlan" + << ", at schemeshard: " << ssId); + + TTxState* txState = context.SS->FindTx(OperationId); + if (!txState) { + return false; + } + Y_ABORT_UNLESS(txState->TxType == TTxState::TxCopySequence); + + TPathId pathId = txState->TargetPathId; + TPathElement::TPtr path = context.SS->PathsById.at(pathId); + + Y_VERIFY_S(context.SS->Sequences.contains(pathId), "Sequence not found. PathId: " << pathId); + TSequenceInfo::TPtr sequenceInfo = context.SS->Sequences.at(pathId); + Y_ABORT_UNLESS(sequenceInfo); + TSequenceInfo::TPtr alterData = sequenceInfo->AlterData; + Y_ABORT_UNLESS(alterData); + + NIceDb::TNiceDb db(context.GetDB()); + + path->StepCreated = step; + context.SS->PersistCreateStep(db, pathId, step); + + context.SS->Sequences[pathId] = alterData; + context.SS->PersistSequenceAlterRemove(db, pathId); + context.SS->PersistSequence(db, pathId, *alterData); + + auto parentDir = context.SS->PathsById.at(path->ParentPathId); + if (parentDir->IsLikeDirectory()) { + ++parentDir->DirAlterVersion; + context.SS->PersistPathDirAlterVersion(db, parentDir); + } + context.SS->ClearDescribePathCaches(parentDir); + context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId); + + context.SS->ClearDescribePathCaches(path); + context.OnComplete.PublishToSchemeBoard(OperationId, pathId); + + context.SS->ChangeTxState(db, OperationId, TTxState::Done); + return true; + } + + bool ProgressState(TOperationContext& context) override { + TTabletId ssId = context.SS->SelfTabletId(); + + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " ProgressState" + << ", at schemeshard: " << ssId); + + TTxState* txState = context.SS->FindTx(OperationId); + Y_ABORT_UNLESS(txState); + Y_ABORT_UNLESS(txState->TxType == TTxState::TxCopySequence); + + context.OnComplete.ProposeToCoordinator(OperationId, txState->TargetPathId, TStepId(0)); + return false; + } +}; + +class TCopySequence: public TSubOperation { + + static TTxState::ETxState NextState() { + return TTxState::CreateParts; + } + + TTxState::ETxState NextState(TTxState::ETxState state) const override { + switch (state) { + case TTxState::Waiting: + case TTxState::CreateParts: + return TTxState::ConfigureParts; + case TTxState::ConfigureParts: + return TTxState::Propose; + case TTxState::Propose: + return TTxState::Done; + default: + return TTxState::Invalid; + } + return TTxState::Invalid; + } + + TSubOperationState::TPtr SelectStateFunc(TTxState::ETxState state) override { + using TPtr = TSubOperationState::TPtr; + + switch (state) { + case TTxState::Waiting: + case TTxState::CreateParts: + return TPtr(new TCreateParts(OperationId)); + case TTxState::ConfigureParts: + return TPtr(new TConfigureParts(OperationId)); + case TTxState::Propose: + return TPtr(new TPropose(OperationId)); + case TTxState::Done: + return TPtr(new TDone(OperationId)); + default: + return nullptr; + } + } + +public: + using TSubOperation::TSubOperation; + + THolder Propose(const TString& owner, TOperationContext& context) override { + const TTabletId ssId = context.SS->SelfTabletId(); + + const auto acceptExisted = !Transaction.GetFailOnExist(); + const TString& parentPathStr = Transaction.GetWorkingDir(); + auto& descr = Transaction.GetSequence(); + const TString& name = descr.GetName(); + + LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence Propose" + << ", path: " << parentPathStr << "/" << name + << ", opId: " << OperationId + << ", at schemeshard: " << ssId); + + TEvSchemeShard::EStatus status = NKikimrScheme::StatusAccepted; + auto result = MakeHolder(status, ui64(OperationId.GetTxId()), ui64(ssId)); + + NSchemeShard::TPath parentPath = NSchemeShard::TPath::Resolve(parentPathStr, context.SS); + { + NSchemeShard::TPath::TChecker checks = parentPath.Check(); + checks + .NotUnderDomainUpgrade() + .IsAtLocalSchemeShard() + .IsResolved() + .NotDeleted() + .NotUnderDeleting() + .IsCommonSensePath(); + + if (checks) { + if (parentPath->IsTable()) { + // allow immediately inside a normal table + if (parentPath.IsUnderOperation()) { + checks.IsUnderTheSameOperation(OperationId.GetTxId()); // allowed only as part of consistent operations + } + } else { + // otherwise don't allow unexpected object types + checks.IsLikeDirectory(); + } + } + + if (!checks) { + result->SetError(checks.GetStatus(), checks.GetError()); + return result; + } + } + + TPath srcPath = TPath::Resolve(descr.GetCopyFromSequence(), context.SS); + { + TPath::TChecker checks = srcPath.Check(); + checks + .NotEmpty() + .IsResolved() + .NotDeleted() + .NotUnderDeleting() + .IsSequence() + .NotUnderTheSameOperation(OperationId.GetTxId()) + .NotUnderOperation(); + + if (checks) { + if (!parentPath->IsTable()) { + // otherwise don't allow unexpected object types + checks.IsLikeDirectory(); + } + } + + if (!checks) { + result->SetError(checks.GetStatus(), checks.GetError()); + return result; + } + } + + auto domainPathId = parentPath.GetPathIdForDomain(); + auto domainInfo = parentPath.DomainInfo(); + + // TODO: maybe select from several shards + ui64 shardsToCreate = 0; + TShardIdx sequenceShard; + if (domainInfo->GetSequenceShards().empty()) { + ++shardsToCreate; + } else { + sequenceShard = *domainInfo->GetSequenceShards().begin(); + } + + const TString acl = Transaction.GetModifyACL().GetDiffACL(); + + NSchemeShard::TPath dstPath = parentPath.Child(name); + { + NSchemeShard::TPath::TChecker checks = dstPath.Check(); + checks.IsAtLocalSchemeShard(); + if (dstPath.IsResolved()) { + checks + .IsResolved() + .NotUnderDeleting() + .FailOnExist(TPathElement::EPathType::EPathTypeSequence, acceptExisted); + } else { + checks + .NotEmpty() + .NotResolved(); + } + + if (checks) { + checks.IsValidLeafName(); + + if (!parentPath->IsTable()) { + checks.DepthLimit(); + } + + checks + .PathsLimit() + .DirChildrenLimit() + .ShardsLimit(shardsToCreate) + .IsTheSameDomain(srcPath) + //.PathShardsLimit(shardsToCreate) + .IsValidACL(acl); + } + + if (!checks) { + result->SetError(checks.GetStatus(), checks.GetError()); + if (dstPath.IsResolved()) { + result->SetPathCreateTxId(ui64(dstPath->CreateTxId)); + result->SetPathId(dstPath->PathId.LocalPathId); + } + return result; + } + } + + TString errStr; + + if (!TSequenceInfo::ValidateCreate(descr, errStr)) { + result->SetError(NKikimrScheme::StatusSchemeError, errStr); + return result; + } + + if (!context.SS->CheckApplyIf(Transaction, errStr)) { + result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr); + return result; + } + + Y_ABORT_UNLESS(context.SS->Sequences.contains(srcPath.Base()->PathId)); + + const ui32 profileId = 0; + TChannelsBindings channelsBindings; + if (shardsToCreate) { + if (!context.SS->ResolveTabletChannels(profileId, dstPath.GetPathIdForDomain(), channelsBindings)) { + result->SetError(NKikimrScheme::StatusInvalidParameter, + "Unable to construct channel binding for sequence shard with the storage pool"); + return result; + } + } + + dstPath.MaterializeLeaf(owner); + result->SetPathId(dstPath->PathId.LocalPathId); + context.SS->TabletCounters->Simple()[COUNTER_SEQUENCE_COUNT].Add(1); + + TPathId pathId = dstPath->PathId; + dstPath->CreateTxId = OperationId.GetTxId(); + dstPath->LastTxId = OperationId.GetTxId(); + dstPath->PathState = TPathElement::EPathState::EPathStateCreate; + dstPath->PathType = TPathElement::EPathType::EPathTypeSequence; + + if (parentPath->HasActiveChanges()) { + TTxId parentTxId = parentPath->PlannedToCreate() ? parentPath->CreateTxId : parentPath->LastTxId; + context.OnComplete.Dependence(parentTxId, OperationId.GetTxId()); + } + + TTxState& txState = + context.SS->CreateTx(OperationId, TTxState::TxCopySequence, pathId, srcPath.Base()->PathId); + txState.State = TTxState::ConfigureParts; + + TSequenceInfo::TPtr sequenceInfo = new TSequenceInfo(0); + TSequenceInfo::TPtr alterData = sequenceInfo->CreateNextVersion(); + alterData->Description = descr; + + if (shardsToCreate) { + sequenceShard = context.SS->RegisterShardInfo( + TShardInfo::SequenceShardInfo(OperationId.GetTxId(), domainPathId) + .WithBindedChannels(channelsBindings)); + context.SS->TabletCounters->Simple()[COUNTER_SEQUENCESHARD_COUNT].Add(1); + txState.Shards.emplace_back(sequenceShard, ETabletType::SequenceShard, TTxState::CreateParts); + txState.State = TTxState::CreateParts; + context.SS->PathsById.at(domainPathId)->IncShardsInside(); + domainInfo->AddInternalShard(sequenceShard); + domainInfo->AddSequenceShard(sequenceShard); + } else { + txState.Shards.emplace_back(sequenceShard, ETabletType::SequenceShard, TTxState::ConfigureParts); + auto& shardInfo = context.SS->ShardInfos.at(sequenceShard); + if (shardInfo.CurrentTxId != OperationId.GetTxId()) { + context.OnComplete.Dependence(shardInfo.CurrentTxId, OperationId.GetTxId()); + } + } + + { + auto* p = alterData->Sharding.AddSequenceShards(); + p->SetOwnerId(sequenceShard.GetOwnerId()); + p->SetLocalId(ui64(sequenceShard.GetLocalId())); + } + + NIceDb::TNiceDb db(context.GetDB()); + + context.SS->ChangeTxState(db, OperationId, txState.State); + context.OnComplete.ActivateTx(OperationId); + + context.SS->PersistPath(db, dstPath->PathId); + if (!acl.empty()) { + dstPath->ApplyACL(acl); + context.SS->PersistACL(db, dstPath.Base()); + } + + context.SS->Sequences[pathId] = sequenceInfo; + context.SS->PersistSequence(db, pathId, *sequenceInfo); + context.SS->PersistSequenceAlter(db, pathId, *alterData); + context.SS->IncrementPathDbRefCount(pathId); + + context.SS->PersistTxState(db, OperationId); + context.SS->PersistUpdateNextPathId(db); + if (shardsToCreate) { + context.SS->PersistUpdateNextShardIdx(db); + } + + for (auto shard : txState.Shards) { + if (shard.Operation == TTxState::CreateParts) { + context.SS->PersistChannelsBinding(db, shard.Idx, context.SS->ShardInfos.at(shard.Idx).BindedChannels); + context.SS->PersistShardMapping(db, shard.Idx, InvalidTabletId, domainPathId, OperationId.GetTxId(), shard.TabletType); + } + } + + ++parentPath->DirAlterVersion; + context.SS->PersistPathDirAlterVersion(db, parentPath.Base()); + context.SS->ClearDescribePathCaches(parentPath.Base()); + context.OnComplete.PublishToSchemeBoard(OperationId, parentPath->PathId); + + context.SS->ClearDescribePathCaches(dstPath.Base()); + context.OnComplete.PublishToSchemeBoard(OperationId, dstPath->PathId); + + domainInfo->IncPathsInside(); + parentPath->IncAliveChildren(); + + SetState(NextState()); + return result; + } + + void AbortPropose(TOperationContext&) override { + Y_ABORT("no AbortPropose for TCopySequence"); + } + + void AbortUnsafe(TTxId, TOperationContext&) override { + Y_ABORT("no AbortUnsafe for TCopySequence"); + } +}; + +} + +namespace NKikimr::NSchemeShard { + +ISubOperation::TPtr CreateCopySequence(TOperationId id, const TTxTransaction& tx) +{ + return MakeSubOperation(id, tx); +} + +ISubOperation::TPtr CreateCopySequence(TOperationId id, TTxState::ETxState state) { + return MakeSubOperation(id, state); +} + +} From 6dca0dd5226063bcdc7861b19c2408a0a7fc3da8 Mon Sep 17 00:00:00 2001 From: Nikolay Shumkov Date: Wed, 13 Mar 2024 14:26:56 +0300 Subject: [PATCH 09/14] Fixes --- .../schemeshard/schemeshard__operation_copy_sequence.cpp | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_copy_sequence.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_copy_sequence.cpp index bde05ea47437..99869ed7f2eb 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_copy_sequence.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_copy_sequence.cpp @@ -16,6 +16,7 @@ class TConfigureParts : public TSubOperationState { private: TOperationId OperationId; ui32 RestoredSequences = 0; + NKikimrTxSequenceShard::TEvFreezeSequenceResult FreezeSequenceResult; TString DebugHint() const override { return TStringBuilder() @@ -94,7 +95,8 @@ class TConfigureParts : public TSubOperationState { TShardIdx shardIdx = FromProto(shardIdxProto); auto currentTabletId = context.SS->ShardInfos.at(shardIdx).TabletID; - auto event = MakeHolder(txState->SourcePathId); + auto event = MakeHolder( + txState->SourcePathId, FreezeSequenceResult); event->Record.SetTxId(ui64(OperationId.GetTxId())); event->Record.SetTxPartId(OperationId.GetSubTxId()); @@ -158,6 +160,8 @@ class TConfigureParts : public TSubOperationState { return false; } + FreezeSequenceResult = ev->Get()->Record; + for (auto shard : txState->Shards) { auto shardIdx = shard.Idx; auto currentTabletId = context.SS->ShardInfos.at(shardIdx).TabletID; @@ -175,7 +179,7 @@ class TConfigureParts : public TSubOperationState { } auto event = MakeHolder( - txState->TargetPathId, ev->Get()->Record); + txState->TargetPathId, FreezeSequenceResult); event->Record.SetTxId(ui64(OperationId.GetTxId())); event->Record.SetTxPartId(OperationId.GetSubTxId()); From f592eb979b08d47ec3b2389e71f7b19d12aacfb4 Mon Sep 17 00:00:00 2001 From: Nikolay Shumkov Date: Wed, 13 Mar 2024 14:27:17 +0300 Subject: [PATCH 10/14] Fixes --- ydb/core/kqp/ut/pg/kqp_pg_ut.cpp | 130 +++++++++++++++++++++++++++++++ 1 file changed, 130 insertions(+) diff --git a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp index 7aac95eca00c..58824e917d23 100644 --- a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp +++ b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp @@ -1698,6 +1698,136 @@ Y_UNIT_TEST_SUITE(KqpPg) { } } + Y_UNIT_TEST(CopyTableSerialColumns) { + TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false).SetEnableNotNullDataColumns(true)); + auto client = kikimr.GetTableClient(); + auto session = client.CreateSession().GetValueSync().GetSession(); + { + const auto query = Q_(R"( + --!syntax_pg + CREATE TABLE PgSerial ( + key serial PRIMARY KEY, + value int2 + ))"); + + auto result = session.ExecuteSchemeQuery(query).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + { + const auto query = Q_(R"( + --!syntax_pg + INSERT INTO PgSerial (value) values (1); + )"); + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + { + const auto result = session.CopyTable("/Root/PgSerial", "/Root/copy").GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto desc = session.DescribeTable("/Root/copy").ExtractValueSync(); + UNIT_ASSERT_C(desc.IsSuccess(), desc.GetIssues().ToString()); + } + + { + const auto query = Q_(R"( + --!syntax_pg + SELECT * FROM copy; + )"); + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty"); + CompareYson(R"( + [["1";"1"]] + )", FormatResultSetYson(result.GetResultSet(0))); + } + + { + const auto query = Q_(R"( + --!syntax_pg + INSERT INTO copy (value) values (1); + )"); + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + { + const auto query = Q_(R"( + --!syntax_pg + SELECT * FROM copy; + )"); + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty"); + CompareYson(R"( + [["1";"1"];["2";"1"]] + )", FormatResultSetYson(result.GetResultSet(0))); + } + + { + const auto query = Q_(R"( + --!syntax_pg + SELECT * FROM PgSerial; + )"); + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty"); + CompareYson(R"( + [["1";"1"]] + )", FormatResultSetYson(result.GetResultSet(0))); + } + + { + const auto query = Q_(R"( + --!syntax_pg + INSERT INTO PgSerial (value) values (1); + )"); + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + { + const auto query = Q_(R"( + --!syntax_pg + SELECT * FROM copy; + )"); + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty"); + CompareYson(R"( + [["1";"1"];["2";"1"]] + )", FormatResultSetYson(result.GetResultSet(0))); + } + + { + const auto query = Q_(R"( + --!syntax_pg + SELECT * FROM PgSerial; + )"); + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty"); + CompareYson(R"( + [["1";"1"];["2";"1"]] + )", FormatResultSetYson(result.GetResultSet(0))); + } + } + Y_UNIT_TEST(CreateIndex) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);; From 5c3b8a81bd631a0aa99e7be79580da9fd1b81a6d Mon Sep 17 00:00:00 2001 From: Nikolay Shumkov Date: Wed, 13 Mar 2024 14:41:11 +0300 Subject: [PATCH 11/14] Fixes --- ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp | 4 ---- 1 file changed, 4 deletions(-) diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp index cb4b6c1e1a69..db2212d3f627 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp @@ -280,10 +280,6 @@ class TCopyTable: public TSubOperation { return AppData()->AllowShadowDataInSchemeShardForTests; } - void SetLocalSequences(const THashSet& localSequences) { - LocalSequences = localSequences; - } - THolder Propose(const TString& owner, TOperationContext& context) override { const TTabletId ssId = context.SS->SelfTabletId(); From 363d6a3884b2afc312067b28bab4eb0060a2fefa Mon Sep 17 00:00:00 2001 From: Nikolay Shumkov Date: Wed, 13 Mar 2024 14:47:01 +0300 Subject: [PATCH 12/14] Fixes --- .../tx/schemeshard/schemeshard__operation_copy_sequence.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_copy_sequence.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_copy_sequence.cpp index 99869ed7f2eb..9bb94878f971 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_copy_sequence.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_copy_sequence.cpp @@ -518,6 +518,9 @@ class TCopySequence: public TSubOperation { result->SetPathId(dstPath->PathId.LocalPathId); context.SS->TabletCounters->Simple()[COUNTER_SEQUENCE_COUNT].Add(1); + srcPath.Base()->PathState = TPathElement::EPathState::EPathStateCopying; + srcPath.Base()->LastTxId = OperationId.GetTxId(); + TPathId pathId = dstPath->PathId; dstPath->CreateTxId = OperationId.GetTxId(); dstPath->LastTxId = OperationId.GetTxId(); From 1befb13fbb34d4351130a08d986be9639760792f Mon Sep 17 00:00:00 2001 From: Nikolay Shumkov Date: Mon, 18 Mar 2024 15:14:14 +0300 Subject: [PATCH 13/14] Fixes --- ydb/core/protos/counters_sequenceshard.proto | 1 + ydb/core/protos/out/out_sequenceshard.cpp | 4 + ydb/core/protos/tx_sequenceshard.proto | 30 ++++ .../schemeshard__operation_copy_sequence.cpp | 113 +++++++------ .../schemeshard__operation_copy_table.cpp | 150 +++++++++++++++++- .../tx/schemeshard/schemeshard_tx_infly.h | 1 + ydb/core/tx/schemeshard/ut_sequence/ya.make | 2 +- ydb/core/tx/sequenceshard/public/events.h | 52 ++++++ .../tx/sequenceshard/sequenceshard_impl.cpp | 1 + .../tx/sequenceshard/sequenceshard_impl.h | 2 + ydb/core/tx/sequenceshard/tx_get_sequence.cpp | 93 +++++++++++ ydb/core/tx/sequenceshard/ut_helpers.cpp | 25 +++ ydb/core/tx/sequenceshard/ut_helpers.h | 4 + .../tx/sequenceshard/ut_sequenceshard.cpp | 9 ++ ydb/core/tx/sequenceshard/ya.make | 1 + 15 files changed, 439 insertions(+), 49 deletions(-) create mode 100644 ydb/core/tx/sequenceshard/tx_get_sequence.cpp diff --git a/ydb/core/protos/counters_sequenceshard.proto b/ydb/core/protos/counters_sequenceshard.proto index 6ace98084695..ec396a082dac 100644 --- a/ydb/core/protos/counters_sequenceshard.proto +++ b/ydb/core/protos/counters_sequenceshard.proto @@ -34,4 +34,5 @@ enum ETxTypes { TXTYPE_FREEZE_SEQUENCE = 7 [(TxTypeOpts) = {Name: "TxFreezeSequence"}]; TXTYPE_RESTORE_SEQUENCE = 8 [(TxTypeOpts) = {Name: "TxRestoreSequence"}]; TXTYPE_REDIRECT_SEQUENCE = 9 [(TxTypeOpts) = {Name: "TxRedirectSequence"}]; + TXTYPE_GET_SEQUENCE = 10 [(TxTypeOpts) = {Name: "TxGetSequence"}]; } diff --git a/ydb/core/protos/out/out_sequenceshard.cpp b/ydb/core/protos/out/out_sequenceshard.cpp index 066fa1309119..ce193e08a826 100644 --- a/ydb/core/protos/out/out_sequenceshard.cpp +++ b/ydb/core/protos/out/out_sequenceshard.cpp @@ -29,3 +29,7 @@ Y_DECLARE_OUT_SPEC(, NKikimrTxSequenceShard::TEvRestoreSequenceResult::EStatus, Y_DECLARE_OUT_SPEC(, NKikimrTxSequenceShard::TEvRedirectSequenceResult::EStatus, stream, value) { stream << NKikimrTxSequenceShard::TEvRedirectSequenceResult::EStatus_Name(value); } + +Y_DECLARE_OUT_SPEC(, NKikimrTxSequenceShard::TEvGetSequenceResult::EStatus, stream, value) { + stream << NKikimrTxSequenceShard::TEvGetSequenceResult::EStatus_Name(value); +} diff --git a/ydb/core/protos/tx_sequenceshard.proto b/ydb/core/protos/tx_sequenceshard.proto index 9a034f314fe9..69202bcf60fe 100644 --- a/ydb/core/protos/tx_sequenceshard.proto +++ b/ydb/core/protos/tx_sequenceshard.proto @@ -224,3 +224,33 @@ message TEvRedirectSequenceResult { uint64 TxId = 3; uint64 TxPartId = 4; } + +message TEvGetSequence { + NKikimrProto.TPathID PathId = 1; + uint64 TxId = 2; + uint64 TxPartId = 3; +} + +message TEvGetSequenceResult { + enum EStatus { + UNKNOWN = 0; + SUCCESS = 1; + PIPE_OUTDATED = 2; + SEQUENCE_NOT_FOUND = 3; + SEQUENCE_MOVED = 4; + } + + EStatus Status = 1; + uint64 Origin = 2; + uint64 TxId = 3; + uint64 TxPartId = 4; + uint64 MovedTo = 5; // moved to a different sequence shard + sint64 MinValue = 6; + sint64 MaxValue = 7; + sint64 StartValue = 8; + sint64 NextValue = 9; + bool NextUsed = 10; + uint64 Cache = 11; + sint64 Increment = 12; + bool Cycle = 13; +} diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_copy_sequence.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_copy_sequence.cpp index 9bb94878f971..47d0447e1c2f 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_copy_sequence.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_copy_sequence.cpp @@ -12,11 +12,57 @@ namespace { using namespace NKikimr; using namespace NSchemeShard; +class TCopyTableBarrier: public TSubOperationState { +private: + TOperationId OperationId; + + TString DebugHint() const override { + return TStringBuilder() + << "TCopySequence TCopyTableBarrier" + << " operationId: " << OperationId; + } + +public: + TCopyTableBarrier(TOperationId id) + : OperationId(id) + { + IgnoreMessages(DebugHint(), {}); + } + + bool HandleReply(TEvPrivate::TEvCompleteBarrier::TPtr& ev, TOperationContext& context) override { + TTabletId ssId = context.SS->SelfTabletId(); + + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " HandleReply TEvPrivate::TEvCompleteBarrier" + << ", msg: " << ev->Get()->ToString() + << ", at tablet" << ssId); + + NIceDb::TNiceDb db(context.GetDB()); + + TTxState* txState = context.SS->FindTx(OperationId); + Y_ABORT_UNLESS(txState); + + context.SS->ChangeTxState(db, OperationId, TTxState::CreateParts); + return true; + } + + bool ProgressState(TOperationContext& context) override { + TTxState* txState = context.SS->FindTx(OperationId); + Y_ABORT_UNLESS(txState); + + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << "ProgressState, operation type " + << TTxState::TypeName(txState->TxType)); + + context.OnComplete.Barrier(OperationId, "CopyTableBarrier"); + return false; + } +}; + class TConfigureParts : public TSubOperationState { private: TOperationId OperationId; - ui32 RestoredSequences = 0; - NKikimrTxSequenceShard::TEvFreezeSequenceResult FreezeSequenceResult; + NKikimrTxSequenceShard::TEvGetSequenceResult GetSequenceResult; TString DebugHint() const override { return TStringBuilder() @@ -78,61 +124,32 @@ class TConfigureParts : public TSubOperationState { return false; } - RestoredSequences++; - - if (RestoredSequences > 1 && txState->ShardsInProgress.empty()) { - NIceDb::TNiceDb db(context.GetDB()); - context.SS->ChangeTxState(db, OperationId, TTxState::Propose); - context.OnComplete.ActivateTx(OperationId); - return true; - } - - TSequenceInfo::TPtr sequenceInfo = context.SS->Sequences.at(txState->SourcePathId); - Y_ABORT_UNLESS(sequenceInfo); - Y_ABORT_UNLESS(!sequenceInfo->AlterData); - - for (const auto& shardIdxProto : sequenceInfo->Sharding.GetSequenceShards()) { - TShardIdx shardIdx = FromProto(shardIdxProto); - auto currentTabletId = context.SS->ShardInfos.at(shardIdx).TabletID; - - auto event = MakeHolder( - txState->SourcePathId, FreezeSequenceResult); - event->Record.SetTxId(ui64(OperationId.GetTxId())); - event->Record.SetTxPartId(OperationId.GetSubTxId()); - - LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TCopySequence TConfigureParts ProgressState" - << " sending TEvRestoreSequence to tablet " << currentTabletId - << " operationId# " << OperationId - << " at tablet " << ssId); - - context.OnComplete.BindMsgToPipe(OperationId, currentTabletId, txState->SourcePathId, event.Release()); - - // Wait for results from this shard - txState->ShardsInProgress.insert(shardIdx); - } + NIceDb::TNiceDb db(context.GetDB()); + context.SS->ChangeTxState(db, OperationId, TTxState::Propose); + context.OnComplete.ActivateTx(OperationId); + return true; return false; } - bool HandleReply(NSequenceShard::TEvSequenceShard::TEvFreezeSequenceResult::TPtr& ev, TOperationContext& context) override { + bool HandleReply(NSequenceShard::TEvSequenceShard::TEvGetSequenceResult::TPtr& ev, TOperationContext& context) override { auto ssId = context.SS->SelfTabletId(); auto tabletId = TTabletId(ev->Get()->Record.GetOrigin()); auto status = ev->Get()->Record.GetStatus(); LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TCopySequence TConfigureParts HandleReply TEvFreezeSequenceResult" + "TCopySequence TConfigureParts HandleReply TEvGetSequenceResult" << " shardId# " << tabletId << " status# " << status << " operationId# " << OperationId << " at tablet " << ssId); switch (status) { - case NKikimrTxSequenceShard::TEvFreezeSequenceResult::SUCCESS: break; + case NKikimrTxSequenceShard::TEvGetSequenceResult::SUCCESS: break; default: // Treat all other replies as unexpected and spurious LOG_WARN_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TCopySequence TConfigureParts HandleReply ignoring unexpected TEvFreezeSequenceResult" + "TCopySequence TConfigureParts HandleReply ignoring unexpected TEvGetSequenceResult" << " shardId# " << tabletId << " status# " << status << " operationId# " << OperationId @@ -148,7 +165,7 @@ class TConfigureParts : public TSubOperationState { auto shardIdx = context.SS->MustGetShardIdx(tabletId); if (!txState->ShardsInProgress.erase(shardIdx)) { LOG_WARN_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TCopySequence TConfigureParts HandleReply ignoring duplicate TEvFreezeSequenceResult" + "TCopySequence TConfigureParts HandleReply ignoring duplicate TEvGetSequenceResult" << " shardId# " << tabletId << " status# " << status << " operationId# " << OperationId @@ -160,7 +177,7 @@ class TConfigureParts : public TSubOperationState { return false; } - FreezeSequenceResult = ev->Get()->Record; + GetSequenceResult = ev->Get()->Record; for (auto shard : txState->Shards) { auto shardIdx = shard.Idx; @@ -179,7 +196,7 @@ class TConfigureParts : public TSubOperationState { } auto event = MakeHolder( - txState->TargetPathId, FreezeSequenceResult); + txState->TargetPathId, GetSequenceResult); event->Record.SetTxId(ui64(OperationId.GetTxId())); event->Record.SetTxPartId(OperationId.GetSubTxId()); @@ -219,13 +236,13 @@ class TConfigureParts : public TSubOperationState { TShardIdx shardIdx = FromProto(shardIdxProto); auto tabletId = context.SS->ShardInfos.at(shardIdx).TabletID; - auto event = MakeHolder(txState->SourcePathId); + auto event = MakeHolder(txState->SourcePathId); event->Record.SetTxId(ui64(OperationId.GetTxId())); event->Record.SetTxPartId(OperationId.GetSubTxId()); LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TCopySequence TConfigureParts ProgressState" - << " sending TEvFreezeSequence to tablet " << tabletId + << " sending TEvGetSequence to tablet " << tabletId << " operationId# " << OperationId << " at tablet " << ssId); @@ -325,12 +342,13 @@ class TPropose: public TSubOperationState { class TCopySequence: public TSubOperation { static TTxState::ETxState NextState() { - return TTxState::CreateParts; + return TTxState::CopyTableBarrier; } TTxState::ETxState NextState(TTxState::ETxState state) const override { switch (state) { - case TTxState::Waiting: + case TTxState::CopyTableBarrier: + return TTxState::CreateParts; case TTxState::CreateParts: return TTxState::ConfigureParts; case TTxState::ConfigureParts: @@ -347,7 +365,8 @@ class TCopySequence: public TSubOperation { using TPtr = TSubOperationState::TPtr; switch (state) { - case TTxState::Waiting: + case TTxState::CopyTableBarrier: + return TPtr(new TCopyTableBarrier(OperationId)); case TTxState::CreateParts: return TPtr(new TCreateParts(OperationId)); case TTxState::ConfigureParts: diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp index db2212d3f627..289b0989cdd9 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp @@ -1,6 +1,7 @@ #include "schemeshard__operation_part.h" #include "schemeshard__operation_common.h" #include "schemeshard_impl.h" +#include "schemeshard_tx_infly.h" #include @@ -225,6 +226,149 @@ class TPropose: public TSubOperationState { } }; +class TProposedWaitParts: public TSubOperationState { +private: + TOperationId OperationId; + + TString DebugHint() const override { + return TStringBuilder() + << "TCopyTable TProposedWaitParts" + << " operationId#" << OperationId; + } + +public: + TProposedWaitParts(TOperationId id) + : OperationId(id) + { + IgnoreMessages(DebugHint(), + {TEvHive::TEvCreateTabletReply::EventType, + TEvColumnShard::TEvProposeTransactionResult::EventType, + TEvPrivate::TEvOperationPlan::EventType}); + } + + bool HandleReply(TEvDataShard::TEvSchemaChanged::TPtr& ev, TOperationContext& context) override { + TTabletId ssId = context.SS->SelfTabletId(); + const auto& evRecord = ev->Get()->Record; + + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " HandleReply TEvSchemaChanged" + << " at tablet: " << ssId); + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " HandleReply TEvSchemaChanged" + << " at tablet: " << ssId + << " message: " << evRecord.ShortDebugString()); + + if (!NTableState::CollectSchemaChanged(OperationId, ev, context)) { + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " HandleReply TEvSchemaChanged" + << " CollectSchemaChanged: false"); + return false; + } + + Y_ABORT_UNLESS(context.SS->FindTx(OperationId)); + TTxState& txState = *context.SS->FindTx(OperationId); + + if (!txState.ReadyForNotifications) { + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " HandleReply TEvSchemaChanged" + << " ReadyForNotifications: false"); + return false; + } + + return true; + } + + bool ProgressState(TOperationContext& context) override { + TTabletId ssId = context.SS->SelfTabletId(); + + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " ProgressState" + << " at tablet: " << ssId); + + TTxState* txState = context.SS->FindTx(OperationId); + Y_ABORT_UNLESS(txState); + Y_ABORT_UNLESS(txState->TxType == TTxState::TxCopyTable); + + NIceDb::TNiceDb db(context.GetDB()); + + txState->ClearShardsInProgress(); + + for (auto& shard : txState->Shards) { + if (shard.Operation < TTxState::ProposedWaitParts) { + shard.Operation = TTxState::ProposedWaitParts; + context.SS->PersistUpdateTxShard(db, OperationId, shard.Idx, shard.Operation); + } + + Y_ABORT_UNLESS(context.SS->ShardInfos.contains(shard.Idx)); + context.OnComplete.RouteByTablet(OperationId, context.SS->ShardInfos.at(shard.Idx).TabletID); + } + + txState->UpdateShardsInProgress(TTxState::ProposedWaitParts); + + txState->AcceptPendingSchemeNotification(); + + if (txState->ShardsInProgress.empty()) { + NTableState::AckAllSchemaChanges(OperationId, *txState, context); + context.SS->ChangeTxState(db, OperationId, TTxState::CopyTableBarrier); + return true; + } + + return false; + } +}; + +class TCopyTableBarrier: public TSubOperationState { +private: + TOperationId OperationId; + + TString DebugHint() const override { + return TStringBuilder() + << "TCopyTable TCopyTableBarrier" + << " operationId: " << OperationId; + } + +public: + TCopyTableBarrier(TOperationId id) + : OperationId(id) + { + IgnoreMessages(DebugHint(), + { TEvHive::TEvCreateTabletReply::EventType + , TEvDataShard::TEvProposeTransactionResult::EventType + , TEvPrivate::TEvOperationPlan::EventType + , TEvDataShard::TEvSchemaChanged::EventType } + ); + } + + bool HandleReply(TEvPrivate::TEvCompleteBarrier::TPtr& ev, TOperationContext& context) override { + TTabletId ssId = context.SS->SelfTabletId(); + + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " HandleReply TEvPrivate::TEvCompleteBarrier" + << ", msg: " << ev->Get()->ToString() + << ", at tablet" << ssId); + + NIceDb::TNiceDb db(context.GetDB()); + + TTxState* txState = context.SS->FindTx(OperationId); + Y_ABORT_UNLESS(txState); + + context.SS->ChangeTxState(db, OperationId, TTxState::Done); + return true; + } + + bool ProgressState(TOperationContext& context) override { + TTxState* txState = context.SS->FindTx(OperationId); + Y_ABORT_UNLESS(txState); + + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << "ProgressState, operation type " + << TTxState::TypeName(txState->TxType)); + + context.OnComplete.Barrier(OperationId, "CopyTableBarrier"); + return false; + } +}; + class TCopyTable: public TSubOperation { THashSet LocalSequences; @@ -243,6 +387,8 @@ class TCopyTable: public TSubOperation { case TTxState::Propose: return TTxState::ProposedWaitParts; case TTxState::ProposedWaitParts: + return TTxState::CopyTableBarrier; + case TTxState::CopyTableBarrier: return TTxState::Done; default: return TTxState::Invalid; @@ -259,7 +405,9 @@ class TCopyTable: public TSubOperation { case TTxState::Propose: return MakeHolder(OperationId); case TTxState::ProposedWaitParts: - return MakeHolder(OperationId); + return MakeHolder(OperationId); + case TTxState::CopyTableBarrier: + return MakeHolder(OperationId); case TTxState::Done: return MakeHolder(OperationId); default: diff --git a/ydb/core/tx/schemeshard/schemeshard_tx_infly.h b/ydb/core/tx/schemeshard/schemeshard_tx_infly.h index dc89583282d8..1cd1e7a7a066 100644 --- a/ydb/core/tx/schemeshard/schemeshard_tx_infly.h +++ b/ydb/core/tx/schemeshard/schemeshard_tx_infly.h @@ -194,6 +194,7 @@ struct TTxState { item(WaitShadowPathPublication, 136, "") \ item(DeletePathBarrier, 137, "") \ item(SyncHive, 138, "") \ + item(CopyTableBarrier, 139, "") \ item(Done, 240, "") \ item(Aborted, 250, "") diff --git a/ydb/core/tx/schemeshard/ut_sequence/ya.make b/ydb/core/tx/schemeshard/ut_sequence/ya.make index d48d9e399ec9..3c1a2d0bbcf3 100644 --- a/ydb/core/tx/schemeshard/ut_sequence/ya.make +++ b/ydb/core/tx/schemeshard/ut_sequence/ya.make @@ -9,7 +9,7 @@ IF (SANITIZER_TYPE == "thread" OR WITH_VALGRIND) SIZE(LARGE) TAG(ya:fat) ELSE() - TIMEOUT(600) + TIMEOUT(50) SIZE(MEDIUM) ENDIF() diff --git a/ydb/core/tx/sequenceshard/public/events.h b/ydb/core/tx/sequenceshard/public/events.h index 599bd718c180..10acabd2f942 100644 --- a/ydb/core/tx/sequenceshard/public/events.h +++ b/ydb/core/tx/sequenceshard/public/events.h @@ -24,6 +24,8 @@ namespace NSequenceShard { EvRestoreSequenceResult, EvRedirectSequence, EvRedirectSequenceResult, + EvGetSequence, + EvGetSequenceResult, EvEnd, }; @@ -323,6 +325,11 @@ namespace NSequenceShard { InitFrom(record); } + TEvRestoreSequence(const TPathId& pathId, const NKikimrTxSequenceShard::TEvGetSequenceResult& record) { + SetPathId(pathId); + InitFrom(record); + } + void SetPathId(const TPathId& pathId) { auto* p = Record.MutablePathId(); p->SetOwnerId(pathId.OwnerId); @@ -344,6 +351,17 @@ namespace NSequenceShard { Record.SetIncrement(record.GetIncrement()); Record.SetCycle(record.GetCycle()); } + + void InitFrom(const NKikimrTxSequenceShard::TEvGetSequenceResult& record) { + Record.SetMinValue(record.GetMinValue()); + Record.SetMaxValue(record.GetMaxValue()); + Record.SetStartValue(record.GetStartValue()); + Record.SetNextValue(record.GetNextValue()); + Record.SetNextUsed(record.GetNextUsed()); + Record.SetCache(record.GetCache()); + Record.SetIncrement(record.GetIncrement()); + Record.SetCycle(record.GetCycle()); + } }; struct TEvRestoreSequenceResult @@ -394,6 +412,40 @@ namespace NSequenceShard { } }; + struct TEvGetSequence + : public TEventPB + { + TEvGetSequence() = default; + + explicit TEvGetSequence(const TPathId& pathId) { + SetPathId(pathId); + } + + void SetPathId(const TPathId& pathId) { + auto* p = Record.MutablePathId(); + p->SetOwnerId(pathId.OwnerId); + p->SetLocalId(pathId.LocalPathId); + } + + TPathId GetPathId() const { + const auto& p = Record.GetPathId(); + return TPathId(p.GetOwnerId(), p.GetLocalId()); + } + }; + + struct TEvGetSequenceResult + : public TEventPB + { + using EStatus = NKikimrTxSequenceShard::TEvGetSequenceResult::EStatus; + + TEvGetSequenceResult() = default; + + TEvGetSequenceResult(EStatus status, ui64 origin) { + Record.SetStatus(status); + Record.SetOrigin(origin); + } + }; + }; } // namespace NSequenceShard diff --git a/ydb/core/tx/sequenceshard/sequenceshard_impl.cpp b/ydb/core/tx/sequenceshard/sequenceshard_impl.cpp index c23602e643c8..ed88fcbd4233 100644 --- a/ydb/core/tx/sequenceshard/sequenceshard_impl.cpp +++ b/ydb/core/tx/sequenceshard/sequenceshard_impl.cpp @@ -91,6 +91,7 @@ namespace NSequenceShard { HFunc(TEvSequenceShard::TEvFreezeSequence, Handle); HFunc(TEvSequenceShard::TEvRestoreSequence, Handle); HFunc(TEvSequenceShard::TEvRedirectSequence, Handle); + HFunc(TEvSequenceShard::TEvGetSequence, Handle); default: if (!HandleDefaultEvents(ev, SelfId())) { diff --git a/ydb/core/tx/sequenceshard/sequenceshard_impl.h b/ydb/core/tx/sequenceshard/sequenceshard_impl.h index 0a36fee2b17b..e3e39b6c867c 100644 --- a/ydb/core/tx/sequenceshard/sequenceshard_impl.h +++ b/ydb/core/tx/sequenceshard/sequenceshard_impl.h @@ -74,6 +74,7 @@ namespace NSequenceShard { struct TTxFreezeSequence; struct TTxRestoreSequence; struct TTxRedirectSequence; + struct TTxGetSequence; void RunTxInitSchema(const TActorContext& ctx); void RunTxInit(const TActorContext& ctx); @@ -100,6 +101,7 @@ namespace NSequenceShard { void Handle(TEvSequenceShard::TEvFreezeSequence::TPtr& ev, const TActorContext& ctx); void Handle(TEvSequenceShard::TEvRestoreSequence::TPtr& ev, const TActorContext& ctx); void Handle(TEvSequenceShard::TEvRedirectSequence::TPtr& ev, const TActorContext& ctx); + void Handle(TEvSequenceShard::TEvGetSequence::TPtr& ev, const TActorContext& ctx); private: struct TPipeInfo { diff --git a/ydb/core/tx/sequenceshard/tx_get_sequence.cpp b/ydb/core/tx/sequenceshard/tx_get_sequence.cpp new file mode 100644 index 000000000000..53a95cb8b017 --- /dev/null +++ b/ydb/core/tx/sequenceshard/tx_get_sequence.cpp @@ -0,0 +1,93 @@ +#include "sequenceshard_impl.h" + +namespace NKikimr { +namespace NSequenceShard { + + struct TSequenceShard::TTxGetSequence : public TTxBase { + explicit TTxGetSequence(TSelf* self, TEvSequenceShard::TEvGetSequence::TPtr&& ev) + : TTxBase(self) + , Ev(std::move(ev)) + { } + + TTxType GetTxType() const override { return TXTYPE_GET_SEQUENCE; } + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + Y_UNUSED(txc); + + const auto* msg = Ev->Get(); + + auto pathId = msg->GetPathId(); + + SLOG_T("TTxGetSequence.Execute" + << " PathId# " << pathId); + + if (!Self->CheckPipeRequest(Ev->Recipient)) { + SetResult(NKikimrTxSequenceShard::TEvGetSequenceResult::PIPE_OUTDATED); + SLOG_T("TTxGetSequence.Execute PIPE_OUTDATED" + << " PathId# " << pathId); + return true; + } + + auto it = Self->Sequences.find(pathId); + if (it == Self->Sequences.end()) { + SetResult(NKikimrTxSequenceShard::TEvGetSequenceResult::SEQUENCE_NOT_FOUND); + SLOG_T("TTxGetSequence.Execute SEQUENCE_NOT_FOUND" + << " PathId# " << pathId); + return true; + } + + auto& sequence = it->second; + switch (sequence.State) { + case Schema::ESequenceState::Active: + break; + case Schema::ESequenceState::Frozen: + break; + case Schema::ESequenceState::Moved: { + SetResult(NKikimrTxSequenceShard::TEvGetSequenceResult::SEQUENCE_MOVED); + Result->Record.SetMovedTo(sequence.MovedTo); + SLOG_T("TTxGetSequence.Execute SEQUENCE_MOVED" + << " PathId# " << pathId + << " MovedTo# " << sequence.MovedTo); + return true; + } + } + + SetResult(NKikimrTxSequenceShard::TEvGetSequenceResult::SUCCESS); + Result->Record.SetMinValue(sequence.MinValue); + Result->Record.SetMaxValue(sequence.MaxValue); + Result->Record.SetStartValue(sequence.StartValue); + Result->Record.SetNextValue(sequence.NextValue); + Result->Record.SetNextUsed(sequence.NextUsed); + Result->Record.SetCache(sequence.Cache); + Result->Record.SetIncrement(sequence.Increment); + Result->Record.SetCycle(sequence.Cycle); + SLOG_N("TTxGetSequence.Execute SUCCESS" + << " PathId# " << pathId); + return true; + } + + void Complete(const TActorContext& ctx) override { + SLOG_T("TTxGetSequence.Complete"); + + if (Result) { + ctx.Send(Ev->Sender, Result.Release(), 0, Ev->Cookie); + } + } + + void SetResult(NKikimrTxSequenceShard::TEvGetSequenceResult::EStatus status) { + Result.Reset(new TEvSequenceShard::TEvGetSequenceResult(status, Self->TabletID())); + Result->Record.SetTxId(Ev->Get()->Record.GetTxId()); + Result->Record.SetTxPartId(Ev->Get()->Record.GetTxPartId()); + } + + TEvSequenceShard::TEvGetSequence::TPtr Ev; + THolder Result; + }; + + + void TSequenceShard::Handle(TEvSequenceShard::TEvGetSequence::TPtr& ev, const TActorContext& ctx) { + Execute(new TTxGetSequence(this, std::move(ev)), ctx); + } + +} // namespace NSequenceShard +} // namespace NKikimr diff --git a/ydb/core/tx/sequenceshard/ut_helpers.cpp b/ydb/core/tx/sequenceshard/ut_helpers.cpp index 8736616f2704..a73622975575 100644 --- a/ydb/core/tx/sequenceshard/ut_helpers.cpp +++ b/ydb/core/tx/sequenceshard/ut_helpers.cpp @@ -268,5 +268,30 @@ namespace NSequenceShard { return NextRedirectSequenceResult(cookie, edge); } + void TTestContext::SendGetSequence(ui64 cookie, const TActorId& edge, const TPathId& pathId) + { + SendFromEdge( + edge, + new TEvSequenceShard::TEvGetSequence(pathId), + cookie); + } + + THolder TTestContext::NextGetSequenceResult( + ui64 cookie, const TActorId& edge) + { + auto result = ExpectEdgeEvent(edge, cookie); + UNIT_ASSERT_VALUES_EQUAL(result->Record.GetOrigin(), TabletId); + return result; + } + + THolder TTestContext::GetSequence( + const TPathId& pathId) + { + ui64 cookie = RandomNumber(); + auto edge = Runtime->AllocateEdgeActor(); + SendGetSequence(cookie, edge, pathId); + return NextGetSequenceResult(cookie, edge); + } + } // namespace NSequenceShard } // namespace NKikimr diff --git a/ydb/core/tx/sequenceshard/ut_helpers.h b/ydb/core/tx/sequenceshard/ut_helpers.h index 396e963894fe..9ad22c03fce3 100644 --- a/ydb/core/tx/sequenceshard/ut_helpers.h +++ b/ydb/core/tx/sequenceshard/ut_helpers.h @@ -101,6 +101,10 @@ namespace NSequenceShard { ui64 cookie, const TActorId& edge); THolder RedirectSequence( const TPathId& pathId, ui64 redirectTo); + + void SendGetSequence(ui64 cookie, const TActorId& edge, const TPathId& pathId); + THolder NextGetSequenceResult(ui64 cookie, const TActorId& edge); + THolder GetSequence(const TPathId& pathId); }; } // namespace NSequenceShard diff --git a/ydb/core/tx/sequenceshard/ut_sequenceshard.cpp b/ydb/core/tx/sequenceshard/ut_sequenceshard.cpp index 9e082db7c6dc..1aa67fb78f4a 100644 --- a/ydb/core/tx/sequenceshard/ut_sequenceshard.cpp +++ b/ydb/core/tx/sequenceshard/ut_sequenceshard.cpp @@ -165,6 +165,15 @@ namespace NSequenceShard { UNIT_ASSERT_VALUES_EQUAL(allocateResult->Record.GetAllocationStart(), 200011); UNIT_ASSERT_VALUES_EQUAL(allocateResult->Record.GetAllocationCount(), 5u); } + + // get sequence + { + auto getResult = ctx.GetSequence(TPathId(123, 51)); + UNIT_ASSERT_VALUES_EQUAL(getResult->Record.GetStatus(), + NKikimrTxSequenceShard::TEvGetSequenceResult::SUCCESS); + UNIT_ASSERT_VALUES_EQUAL(getResult->Record.GetNextValue(), 200016); + UNIT_ASSERT_VALUES_EQUAL(getResult->Record.GetCache(), 5u); + } } Y_UNIT_TEST(MarkedPipeRetries) { diff --git a/ydb/core/tx/sequenceshard/ya.make b/ydb/core/tx/sequenceshard/ya.make index b47e4534140e..b4befe185615 100644 --- a/ydb/core/tx/sequenceshard/ya.make +++ b/ydb/core/tx/sequenceshard/ya.make @@ -17,6 +17,7 @@ SRCS( tx_create_sequence.cpp tx_drop_sequence.cpp tx_freeze_sequence.cpp + tx_get_sequence.cpp tx_init.cpp tx_init_schema.cpp tx_mark_schemeshard_pipe.cpp From c9899ddf07c01fbfb014b078289562299bdc5260 Mon Sep 17 00:00:00 2001 From: Nikolay Shumkov Date: Mon, 18 Mar 2024 15:27:26 +0300 Subject: [PATCH 14/14] Fixes --- ydb/core/protos/counters_schemeshard.proto | 2 ++ ydb/core/tx/schemeshard/schemeshard__operation_part.h | 1 + 2 files changed, 3 insertions(+) diff --git a/ydb/core/protos/counters_schemeshard.proto b/ydb/core/protos/counters_schemeshard.proto index 70319340c6ac..9ac6d2341844 100644 --- a/ydb/core/protos/counters_schemeshard.proto +++ b/ydb/core/protos/counters_schemeshard.proto @@ -539,4 +539,6 @@ enum ETxTypes { TXTYPE_PERSQUEUE_PROPOSE_RESULT = 83 [(TxTypeOpts) = {Name: "TxPersQueueProposeResult"}]; TXTYPE_PERSQUEUE_PROPOSE_ATTACH_RESULT = 84 [(TxTypeOpts) = {Name: "TxProposeTransactionAttachResult"}]; TXTYPE_UPDATE_DOMAIN_REPLY = 85 [(TxTypeOpts) = {Name: "TxUpdateDomainReply"}]; + + TXTYPE_SEQUENCESHARD_GET_SEQUENCE_RESULT = 86 [(TxTypeOpts) = {Name: "TxSequenceShardGetSequenceResult"}]; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_part.h b/ydb/core/tx/schemeshard/schemeshard__operation_part.h index 4b420825810b..1dadd2059f44 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_part.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_part.h @@ -50,6 +50,7 @@ action(NSequenceShard::TEvSequenceShard::TEvFreezeSequenceResult, NSchemeShard::TXTYPE_SEQUENCESHARD_FREEZE_SEQUENCE_RESULT) \ action(NSequenceShard::TEvSequenceShard::TEvRestoreSequenceResult, NSchemeShard::TXTYPE_SEQUENCESHARD_RESTORE_SEQUENCE_RESULT) \ action(NSequenceShard::TEvSequenceShard::TEvRedirectSequenceResult, NSchemeShard::TXTYPE_SEQUENCESHARD_REDIRECT_SEQUENCE_RESULT) \ + action(NSequenceShard::TEvSequenceShard::TEvGetSequenceResult, NSchemeShard::TXTYPE_SEQUENCESHARD_GET_SEQUENCE_RESULT) \ \ action(NReplication::TEvController::TEvCreateReplicationResult, NSchemeShard::TXTYPE_CREATE_REPLICATION_RESULT) \ action(NReplication::TEvController::TEvDropReplicationResult, NSchemeShard::TXTYPE_DROP_REPLICATION_RESULT) \