From 4cd5e13005fb44cbd6ee7dbaafc29cc03897357a Mon Sep 17 00:00:00 2001 From: Nikolay Shumkov Date: Fri, 22 Mar 2024 13:44:41 +0300 Subject: [PATCH] Support CopySequence in schemeshard (#2693) --- ydb/core/kqp/ut/pg/kqp_pg_ut.cpp | 130 +++ ydb/core/protos/counters_schemeshard.proto | 5 + ydb/core/protos/counters_sequenceshard.proto | 1 + ydb/core/protos/flat_scheme_op.proto | 6 + ydb/core/protos/out/out_sequenceshard.cpp | 4 + ydb/core/protos/tx_sequenceshard.proto | 31 + .../tx/schemeshard/schemeshard__operation.cpp | 2 + .../schemeshard__operation_common.h | 8 +- .../schemeshard__operation_copy_sequence.cpp | 742 ++++++++++++++++++ .../schemeshard__operation_copy_table.cpp | 63 +- .../schemeshard/schemeshard__operation_part.h | 3 + ydb/core/tx/schemeshard/schemeshard_impl.cpp | 13 + ydb/core/tx/schemeshard/schemeshard_impl.h | 1 + .../tx/schemeshard/schemeshard_tx_infly.h | 6 + .../tx/schemeshard/ut_helpers/helpers.cpp | 19 + ydb/core/tx/schemeshard/ut_helpers/helpers.h | 4 + .../tx/schemeshard/ut_helpers/test_env.cpp | 10 +- .../schemeshard/ut_sequence/ut_sequence.cpp | 5 + ydb/core/tx/schemeshard/ut_sequence/ya.make | 1 + .../ut_sequence_reboots.cpp | 76 +- .../schemeshard/ut_sequence_reboots/ya.make | 2 +- ydb/core/tx/schemeshard/ya.make | 1 + ydb/core/tx/sequenceshard/public/events.h | 57 ++ .../tx/sequenceshard/sequenceshard_impl.cpp | 1 + .../tx/sequenceshard/sequenceshard_impl.h | 2 + .../tx/sequenceshard/tx_create_sequence.cpp | 16 +- ydb/core/tx/sequenceshard/tx_get_sequence.cpp | 93 +++ ydb/core/tx/sequenceshard/ut_helpers.cpp | 25 + ydb/core/tx/sequenceshard/ut_helpers.h | 4 + .../tx/sequenceshard/ut_sequenceshard.cpp | 9 + ydb/core/tx/sequenceshard/ya.make | 1 + ydb/core/ydb_convert/table_description.h | 5 + 32 files changed, 1325 insertions(+), 21 deletions(-) create mode 100644 ydb/core/tx/schemeshard/schemeshard__operation_copy_sequence.cpp create mode 100644 ydb/core/tx/sequenceshard/tx_get_sequence.cpp diff --git a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp index a8525d76f2b0..b6b34bc21d50 100644 --- a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp +++ b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp @@ -1698,6 +1698,136 @@ Y_UNIT_TEST_SUITE(KqpPg) { } } + Y_UNIT_TEST(CopyTableSerialColumns) { + TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false).SetEnableNotNullDataColumns(true)); + auto client = kikimr.GetTableClient(); + auto session = client.CreateSession().GetValueSync().GetSession(); + { + const auto query = Q_(R"( + --!syntax_pg + CREATE TABLE PgSerial ( + key serial PRIMARY KEY, + value int2 + ))"); + + auto result = session.ExecuteSchemeQuery(query).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + { + const auto query = Q_(R"( + --!syntax_pg + INSERT INTO PgSerial (value) values (1); + )"); + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + { + const auto result = session.CopyTable("/Root/PgSerial", "/Root/copy").GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto desc = session.DescribeTable("/Root/copy").ExtractValueSync(); + UNIT_ASSERT_C(desc.IsSuccess(), desc.GetIssues().ToString()); + } + + { + const auto query = Q_(R"( + --!syntax_pg + SELECT * FROM copy; + )"); + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty"); + CompareYson(R"( + [["1";"1"]] + )", FormatResultSetYson(result.GetResultSet(0))); + } + + { + const auto query = Q_(R"( + --!syntax_pg + INSERT INTO copy (value) values (1); + )"); + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + { + const auto query = Q_(R"( + --!syntax_pg + SELECT * FROM copy; + )"); + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty"); + CompareYson(R"( + [["1";"1"];["2";"1"]] + )", FormatResultSetYson(result.GetResultSet(0))); + } + + { + const auto query = Q_(R"( + --!syntax_pg + SELECT * FROM PgSerial; + )"); + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty"); + CompareYson(R"( + [["1";"1"]] + )", FormatResultSetYson(result.GetResultSet(0))); + } + + { + const auto query = Q_(R"( + --!syntax_pg + INSERT INTO PgSerial (value) values (1); + )"); + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + { + const auto query = Q_(R"( + --!syntax_pg + SELECT * FROM copy; + )"); + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty"); + CompareYson(R"( + [["1";"1"];["2";"1"]] + )", FormatResultSetYson(result.GetResultSet(0))); + } + + { + const auto query = Q_(R"( + --!syntax_pg + SELECT * FROM PgSerial; + )"); + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty"); + CompareYson(R"( + [["1";"1"];["2";"1"]] + )", FormatResultSetYson(result.GetResultSet(0))); + } + } + Y_UNIT_TEST(CreateIndex) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);; diff --git a/ydb/core/protos/counters_schemeshard.proto b/ydb/core/protos/counters_schemeshard.proto index ca2a07a1d508..9ac6d2341844 100644 --- a/ydb/core/protos/counters_schemeshard.proto +++ b/ydb/core/protos/counters_schemeshard.proto @@ -195,6 +195,8 @@ enum ESimpleCounters { COUNTER_IN_FLIGHT_OPS_TxDropView = 157 [(CounterOpts) = {Name: "InFlightOps/DropView"}]; COUNTER_GRAPHSHARD_COUNT = 158 [(CounterOpts) = {Name: "GraphShards"}]; + + COUNTER_IN_FLIGHT_OPS_TxCopySequence = 159 [(CounterOpts) = {Name: "InFlightOps/CopySequence"}]; } enum ECumulativeCounters { @@ -315,6 +317,7 @@ enum ECumulativeCounters { COUNTER_FINISHED_OPS_TxCreateView = 95 [(CounterOpts) = {Name: "FinishedOps/CreateView"}]; COUNTER_FINISHED_OPS_TxDropView = 96 [(CounterOpts) = {Name: "FinishedOps/DropView"}]; COUNTER_FINISHED_OPS_TxAlterView = 97 [(CounterOpts) = {Name: "FinishedOps/AlterView"}]; + COUNTER_FINISHED_OPS_TxCopySequence = 98 [(CounterOpts) = {Name: "FinishedOps/TxCopySequence"}]; } enum EPercentileCounters { @@ -536,4 +539,6 @@ enum ETxTypes { TXTYPE_PERSQUEUE_PROPOSE_RESULT = 83 [(TxTypeOpts) = {Name: "TxPersQueueProposeResult"}]; TXTYPE_PERSQUEUE_PROPOSE_ATTACH_RESULT = 84 [(TxTypeOpts) = {Name: "TxProposeTransactionAttachResult"}]; TXTYPE_UPDATE_DOMAIN_REPLY = 85 [(TxTypeOpts) = {Name: "TxUpdateDomainReply"}]; + + TXTYPE_SEQUENCESHARD_GET_SEQUENCE_RESULT = 86 [(TxTypeOpts) = {Name: "TxSequenceShardGetSequenceResult"}]; } diff --git a/ydb/core/protos/counters_sequenceshard.proto b/ydb/core/protos/counters_sequenceshard.proto index 6ace98084695..ec396a082dac 100644 --- a/ydb/core/protos/counters_sequenceshard.proto +++ b/ydb/core/protos/counters_sequenceshard.proto @@ -34,4 +34,5 @@ enum ETxTypes { TXTYPE_FREEZE_SEQUENCE = 7 [(TxTypeOpts) = {Name: "TxFreezeSequence"}]; TXTYPE_RESTORE_SEQUENCE = 8 [(TxTypeOpts) = {Name: "TxRestoreSequence"}]; TXTYPE_REDIRECT_SEQUENCE = 9 [(TxTypeOpts) = {Name: "TxRedirectSequence"}]; + TXTYPE_GET_SEQUENCE = 10 [(TxTypeOpts) = {Name: "TxGetSequence"}]; } diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index ef2f2b193f5b..165b66347f7d 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -1590,6 +1590,12 @@ message TModifyScheme { optional TViewDescription CreateView = 64; optional NActorsProto.TActorId TempTableOwnerActorId = 65; + + optional TCopySequence CopySequence = 66; +} + +message TCopySequence { + optional string CopyFrom = 1; } // "Script", used by client to parse text files with multiple DDL commands diff --git a/ydb/core/protos/out/out_sequenceshard.cpp b/ydb/core/protos/out/out_sequenceshard.cpp index 066fa1309119..ce193e08a826 100644 --- a/ydb/core/protos/out/out_sequenceshard.cpp +++ b/ydb/core/protos/out/out_sequenceshard.cpp @@ -29,3 +29,7 @@ Y_DECLARE_OUT_SPEC(, NKikimrTxSequenceShard::TEvRestoreSequenceResult::EStatus, Y_DECLARE_OUT_SPEC(, NKikimrTxSequenceShard::TEvRedirectSequenceResult::EStatus, stream, value) { stream << NKikimrTxSequenceShard::TEvRedirectSequenceResult::EStatus_Name(value); } + +Y_DECLARE_OUT_SPEC(, NKikimrTxSequenceShard::TEvGetSequenceResult::EStatus, stream, value) { + stream << NKikimrTxSequenceShard::TEvGetSequenceResult::EStatus_Name(value); +} diff --git a/ydb/core/protos/tx_sequenceshard.proto b/ydb/core/protos/tx_sequenceshard.proto index 9a034f314fe9..0f13af504618 100644 --- a/ydb/core/protos/tx_sequenceshard.proto +++ b/ydb/core/protos/tx_sequenceshard.proto @@ -39,6 +39,7 @@ message TEvCreateSequence { oneof OptionalCycle { bool Cycle = 9; } + bool Frozen = 10; // defaults to false } message TEvCreateSequenceResult { @@ -224,3 +225,33 @@ message TEvRedirectSequenceResult { uint64 TxId = 3; uint64 TxPartId = 4; } + +message TEvGetSequence { + NKikimrProto.TPathID PathId = 1; + uint64 TxId = 2; + uint64 TxPartId = 3; +} + +message TEvGetSequenceResult { + enum EStatus { + UNKNOWN = 0; + SUCCESS = 1; + PIPE_OUTDATED = 2; + SEQUENCE_NOT_FOUND = 3; + SEQUENCE_MOVED = 4; + } + + EStatus Status = 1; + uint64 Origin = 2; + uint64 TxId = 3; + uint64 TxPartId = 4; + uint64 MovedTo = 5; // moved to a different sequence shard + sint64 MinValue = 6; + sint64 MaxValue = 7; + sint64 StartValue = 8; + sint64 NextValue = 9; + bool NextUsed = 10; + uint64 Cache = 11; + sint64 Increment = 12; + bool Cycle = 13; +} diff --git a/ydb/core/tx/schemeshard/schemeshard__operation.cpp b/ydb/core/tx/schemeshard/schemeshard__operation.cpp index 676771621098..4af3fd8a6f47 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation.cpp @@ -1033,6 +1033,8 @@ ISubOperation::TPtr TOperation::RestorePart(TTxState::ETxType txType, TTxState:: Y_ABORT("TODO: implement"); case TTxState::ETxType::TxDropSequence: return CreateDropSequence(NextPartId(), txState); + case TTxState::ETxType::TxCopySequence: + return CreateCopySequence(NextPartId(), txState); case TTxState::ETxType::TxFillIndex: Y_ABORT("deprecated"); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_common.h index 1910df417b58..8816ee761c78 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.h @@ -43,6 +43,7 @@ void UpdatePartitioningForCopyTable(TOperationId operationId, TTxState& txState, class TProposedWaitParts: public TSubOperationState { private: TOperationId OperationId; + const TTxState::ETxState NextState; TString DebugHint() const override { return TStringBuilder() @@ -51,8 +52,9 @@ class TProposedWaitParts: public TSubOperationState { } public: - TProposedWaitParts(TOperationId id) + TProposedWaitParts(TOperationId id, TTxState::ETxState nextState = TTxState::Done) : OperationId(id) + , NextState(nextState) { IgnoreMessages(DebugHint(), { TEvHive::TEvCreateTabletReply::EventType @@ -124,7 +126,7 @@ class TProposedWaitParts: public TSubOperationState { // Got notifications from all datashards? if (txState->ShardsInProgress.empty()) { NTableState::AckAllSchemaChanges(OperationId, *txState, context); - context.SS->ChangeTxState(db, OperationId, TTxState::Done); + context.SS->ChangeTxState(db, OperationId, NextState); return true; } @@ -968,7 +970,7 @@ class TPropose: public TSubOperationState { TTxState* txState = context.SS->FindTx(OperationId); Y_ABORT_UNLESS(txState); Y_ABORT_UNLESS(txState->TxType == TTxState::TxCreatePQGroup || txState->TxType == TTxState::TxAlterPQGroup || txState->TxType == TTxState::TxAllocatePQ); - + TPathId pathId = txState->TargetPathId; TPathElement::TPtr path = context.SS->PathsById.at(pathId); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_copy_sequence.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_copy_sequence.cpp new file mode 100644 index 000000000000..f1f408817e99 --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard__operation_copy_sequence.cpp @@ -0,0 +1,742 @@ +#include "schemeshard__operation_part.h" +#include "schemeshard__operation_common.h" +#include "schemeshard_impl.h" + +#include +#include +#include + + +namespace { + +using namespace NKikimr; +using namespace NSchemeShard; + +class TConfigureParts : public TSubOperationState { +private: + TOperationId OperationId; + + TString DebugHint() const override { + return TStringBuilder() + << "TCopySequence TConfigureParts" + << " operationId#" << OperationId; + } + +public: + TConfigureParts(TOperationId id) + : OperationId(id) + { + IgnoreMessages(DebugHint(), { + TEvHive::TEvCreateTabletReply::EventType, + }); + } + + bool HandleReply(NSequenceShard::TEvSequenceShard::TEvCreateSequenceResult::TPtr& ev, TOperationContext& context) override { + auto ssId = context.SS->SelfTabletId(); + auto tabletId = TTabletId(ev->Get()->Record.GetOrigin()); + auto status = ev->Get()->Record.GetStatus(); + + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence TConfigureParts HandleReply TEvCreateSequenceResult" + << " shardId# " << tabletId + << " status# " << status + << " operationId# " << OperationId + << " at tablet " << ssId); + + switch (status) { + case NKikimrTxSequenceShard::TEvCreateSequenceResult::SUCCESS: + case NKikimrTxSequenceShard::TEvCreateSequenceResult::SEQUENCE_ALREADY_EXISTS: + // Treat expected status as success + break; + + default: + // Treat all other replies as unexpected and spurious + LOG_WARN_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence TConfigureParts HandleReply ignoring unexpected TEvCreateSequenceResult" + << " shardId# " << tabletId + << " status# " << status + << " operationId# " << OperationId + << " at tablet " << ssId); + return false; + } + + TTxState* txState = context.SS->FindTx(OperationId); + Y_ABORT_UNLESS(txState); + Y_ABORT_UNLESS(txState->TxType == TTxState::TxCopySequence); + Y_ABORT_UNLESS(txState->State == TTxState::ConfigureParts); + + auto shardIdx = context.SS->MustGetShardIdx(tabletId); + if (!txState->ShardsInProgress.erase(shardIdx)) { + LOG_WARN_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCreateSequence TConfigureParts HandleReply ignoring duplicate TEvCreateSequenceResult" + << " shardId# " << tabletId + << " status# " << status + << " operationId# " << OperationId + << " at tablet " << ssId); + return false; + } + + context.OnComplete.UnbindMsgFromPipe(OperationId, tabletId, txState->TargetPathId); + + if (txState->ShardsInProgress.empty()) { + NIceDb::TNiceDb db(context.GetDB()); + context.SS->ChangeTxState(db, OperationId, TTxState::Propose); + context.OnComplete.ActivateTx(OperationId); + return true; + } + + return false; + } + + bool ProgressState(TOperationContext& context) override { + auto ssId = context.SS->SelfTabletId(); + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence TConfigureParts ProgressState" + << " operationId# " << OperationId + << " at tablet " << ssId); + + TTxState* txState = context.SS->FindTx(OperationId); + Y_ABORT_UNLESS(txState); + Y_ABORT_UNLESS(txState->TxType == TTxState::TxCopySequence); + Y_ABORT_UNLESS(!txState->Shards.empty()); + + txState->ClearShardsInProgress(); + + Y_ABORT_UNLESS(txState->Shards.size() == 1); + for (auto shard : txState->Shards) { + auto shardIdx = shard.Idx; + auto tabletId = context.SS->ShardInfos.at(shardIdx).TabletID; + Y_ABORT_UNLESS(shard.TabletType == ETabletType::SequenceShard); + Y_ABORT_UNLESS(tabletId != InvalidTabletId); + + auto event = MakeHolder(txState->TargetPathId); + event->Record.SetTxId(ui64(OperationId.GetTxId())); + event->Record.SetTxPartId(OperationId.GetSubTxId()); + event->Record.SetFrozen(true); + + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCoptSequence TConfigureParts ProgressState" + << " sending TEvCreateSequence to tablet " << tabletId + << " operationId# " << OperationId + << " at tablet " << ssId); + + context.OnComplete.BindMsgToPipe(OperationId, tabletId, txState->TargetPathId, event.Release()); + + // Wait for results from this shard + txState->ShardsInProgress.insert(shardIdx); + } + + return false; + } +}; + + +class TPropose: public TSubOperationState { +private: + TOperationId OperationId; + + TString DebugHint() const override { + return TStringBuilder() + << "TCopySequence TPropose" + << " operationId#" << OperationId; + } + +public: + TPropose(TOperationId id) + : OperationId(id) + { + IgnoreMessages(DebugHint(), { + NSequenceShard::TEvSequenceShard::TEvCreateSequenceResult::EventType, + }); + } + + bool HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOperationContext& context) override { + auto step = TStepId(ev->Get()->StepId); + auto ssId = context.SS->SelfTabletId(); + + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " HandleReply TEvOperationPlan" + << ", at schemeshard: " << ssId); + + TTxState* txState = context.SS->FindTx(OperationId); + if (!txState) { + return false; + } + Y_ABORT_UNLESS(txState->TxType == TTxState::TxCopySequence); + + TPathId pathId = txState->TargetPathId; + TPathElement::TPtr path = context.SS->PathsById.at(pathId); + + Y_VERIFY_S(context.SS->Sequences.contains(pathId), "Sequence not found. PathId: " << pathId); + TSequenceInfo::TPtr sequenceInfo = context.SS->Sequences.at(pathId); + Y_ABORT_UNLESS(sequenceInfo); + TSequenceInfo::TPtr alterData = sequenceInfo->AlterData; + Y_ABORT_UNLESS(alterData); + + NIceDb::TNiceDb db(context.GetDB()); + + path->StepCreated = step; + context.SS->PersistCreateStep(db, pathId, step); + + context.SS->Sequences[pathId] = alterData; + context.SS->PersistSequenceAlterRemove(db, pathId); + context.SS->PersistSequence(db, pathId, *alterData); + + auto parentDir = context.SS->PathsById.at(path->ParentPathId); + if (parentDir->IsLikeDirectory()) { + ++parentDir->DirAlterVersion; + context.SS->PersistPathDirAlterVersion(db, parentDir); + } + context.SS->ClearDescribePathCaches(parentDir); + context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId); + + context.SS->ClearDescribePathCaches(path); + context.OnComplete.PublishToSchemeBoard(OperationId, pathId); + + context.SS->ChangeTxState(db, OperationId, TTxState::CopyTableBarrier); + return true; + } + + bool ProgressState(TOperationContext& context) override { + TTabletId ssId = context.SS->SelfTabletId(); + + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " ProgressState" + << ", at schemeshard: " << ssId); + + TTxState* txState = context.SS->FindTx(OperationId); + Y_ABORT_UNLESS(txState); + Y_ABORT_UNLESS(txState->TxType == TTxState::TxCopySequence); + + context.OnComplete.ProposeToCoordinator(OperationId, txState->TargetPathId, TStepId(0)); + return false; + } +}; + +class TCopyTableBarrier: public TSubOperationState { +private: + TOperationId OperationId; + + TString DebugHint() const override { + return TStringBuilder() + << "TCopySequence TCopyTableBarrier" + << " operationId: " << OperationId; + } + +public: + TCopyTableBarrier(TOperationId id) + : OperationId(id) + { + IgnoreMessages(DebugHint(), { + TEvPrivate::TEvOperationPlan::EventType, + NSequenceShard::TEvSequenceShard::TEvCreateSequenceResult::EventType, + }); + } + + bool HandleReply(TEvPrivate::TEvCompleteBarrier::TPtr& ev, TOperationContext& context) override { + TTabletId ssId = context.SS->SelfTabletId(); + + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " HandleReply TEvPrivate::TEvCompleteBarrier" + << ", msg: " << ev->Get()->ToString() + << ", at tablet" << ssId); + + NIceDb::TNiceDb db(context.GetDB()); + + TTxState* txState = context.SS->FindTx(OperationId); + Y_ABORT_UNLESS(txState); + + context.SS->ChangeTxState(db, OperationId, TTxState::ProposedCopySequence); + return true; + } + + bool ProgressState(TOperationContext& context) override { + TTxState* txState = context.SS->FindTx(OperationId); + Y_ABORT_UNLESS(txState); + + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << "ProgressState, operation type " + << TTxState::TypeName(txState->TxType)); + + context.OnComplete.Barrier(OperationId, "CopyTableBarrier"); + return false; + } +}; + +class TProposedCopySequence : public TSubOperationState { +private: + TOperationId OperationId; + NKikimrTxSequenceShard::TEvGetSequenceResult GetSequenceResult; + + TString DebugHint() const override { + return TStringBuilder() + << "TCopySequence TProposedCopySequence" + << " operationId#" << OperationId; + } + +public: + TProposedCopySequence(TOperationId id) + : OperationId(id) + { + IgnoreMessages(DebugHint(), { + TEvPrivate::TEvOperationPlan::EventType, + TEvPrivate::TEvCompleteBarrier::EventType, + NSequenceShard::TEvSequenceShard::TEvCreateSequenceResult::EventType, + }); + } + + bool HandleReply(NSequenceShard::TEvSequenceShard::TEvRestoreSequenceResult::TPtr& ev, TOperationContext& context) override { + auto ssId = context.SS->SelfTabletId(); + auto tabletId = TTabletId(ev->Get()->Record.GetOrigin()); + auto status = ev->Get()->Record.GetStatus(); + + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence TProposedCopySequence HandleReply TEvRestoreSequenceResult" + << " shardId# " << tabletId + << " status# " << status + << " operationId# " << OperationId + << " at tablet " << ssId); + + switch (status) { + case NKikimrTxSequenceShard::TEvRestoreSequenceResult::SUCCESS: + case NKikimrTxSequenceShard::TEvRestoreSequenceResult::SEQUENCE_ALREADY_ACTIVE: break; + default: + // Treat all other replies as unexpected and spurious + LOG_WARN_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence TProposedCopySequence HandleReply ignoring unexpected TEvRestoreSequenceResult" + << " shardId# " << tabletId + << " status# " << status + << " operationId# " << OperationId + << " at tablet " << ssId); + return false; + } + + TTxState* txState = context.SS->FindTx(OperationId); + context.OnComplete.UnbindMsgFromPipe(OperationId, tabletId, txState->TargetPathId); + + Y_ABORT_UNLESS(txState); + Y_ABORT_UNLESS(txState->TxType == TTxState::TxCopySequence); + Y_ABORT_UNLESS(txState->State == TTxState::ProposedCopySequence); + + auto shardIdx = context.SS->MustGetShardIdx(tabletId); + if (!txState->ShardsInProgress.erase(shardIdx)) { + LOG_WARN_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence TProposedCopySequence HandleReply ignoring duplicate TEvRestoreSequenceResult" + << " shardId# " << tabletId + << " status# " << status + << " operationId# " << OperationId + << " at tablet " << ssId); + return false; + } + + if (!txState->ShardsInProgress.empty()) { + return false; + } + + NIceDb::TNiceDb db(context.GetDB()); + context.SS->ChangeTxState(db, OperationId, TTxState::Done); + context.OnComplete.ActivateTx(OperationId); + return true; + } + + bool HandleReply(NSequenceShard::TEvSequenceShard::TEvGetSequenceResult::TPtr& ev, TOperationContext& context) override { + auto ssId = context.SS->SelfTabletId(); + auto tabletId = TTabletId(ev->Get()->Record.GetOrigin()); + auto status = ev->Get()->Record.GetStatus(); + + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence TProposedCopySequence HandleReply TEvGetSequenceResult" + << " shardId# " << tabletId + << " status# " << status + << " operationId# " << OperationId + << " at tablet " << ssId); + + switch (status) { + case NKikimrTxSequenceShard::TEvGetSequenceResult::SUCCESS: break; + default: + // Treat all other replies as unexpected and spurious + LOG_WARN_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence TProposedCopySequence HandleReply ignoring unexpected TEvGetSequenceResult" + << " shardId# " << tabletId + << " status# " << status + << " operationId# " << OperationId + << " at tablet " << ssId); + return false; + } + + TTxState* txState = context.SS->FindTx(OperationId); + Y_ABORT_UNLESS(txState); + Y_ABORT_UNLESS(txState->TxType == TTxState::TxCopySequence); + Y_ABORT_UNLESS(txState->State == TTxState::ProposedCopySequence); + + auto shardIdx = context.SS->MustGetShardIdx(tabletId); + + context.OnComplete.UnbindMsgFromPipe(OperationId, tabletId, txState->SourcePathId); + + if (!txState->ShardsInProgress.erase(shardIdx)) { + LOG_WARN_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence TProposedCopySequence HandleReply ignoring duplicate TEvGetSequenceResult" + << " shardId# " << tabletId + << " status# " << status + << " operationId# " << OperationId + << " at tablet " << ssId); + return false; + } + + if (!txState->ShardsInProgress.empty()) { + return false; + } + + auto getSequenceResult = ev->Get()->Record; + + Y_ABORT_UNLESS(txState->Shards.size() == 1); + for (auto shard : txState->Shards) { + auto shardIdx = shard.Idx; + auto currentTabletId = context.SS->ShardInfos.at(shardIdx).TabletID; + + Y_ABORT_UNLESS(currentTabletId != InvalidTabletId); + + auto event = MakeHolder( + txState->TargetPathId, getSequenceResult); + event->Record.SetTxId(ui64(OperationId.GetTxId())); + event->Record.SetTxPartId(OperationId.GetSubTxId()); + + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence TProposedCopySequence ProgressState" + << " sending TEvRestoreSequence to tablet " << currentTabletId + << " operationId# " << OperationId + << " at tablet " << ssId); + + context.OnComplete.BindMsgToPipe(OperationId, currentTabletId, txState->TargetPathId, event.Release()); + + // Wait for results from this shard + txState->ShardsInProgress.insert(shardIdx); + } + + return false; + } + + bool ProgressState(TOperationContext& context) override { + auto ssId = context.SS->SelfTabletId(); + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence TProposedCopySequence ProgressState" + << " operationId# " << OperationId + << " at tablet " << ssId); + + TTxState* txState = context.SS->FindTx(OperationId); + Y_ABORT_UNLESS(txState); + Y_ABORT_UNLESS(txState->TxType == TTxState::TxCopySequence); + Y_ABORT_UNLESS(!txState->Shards.empty()); + Y_ABORT_UNLESS(txState->SourcePathId != InvalidPathId); + + Y_ABORT_UNLESS(txState->Shards.size() == 1); + for (auto shard : txState->Shards) { + auto shardIdx = shard.Idx; + auto tabletId = context.SS->ShardInfos.at(shardIdx).TabletID; + + auto event = MakeHolder(txState->SourcePathId); + event->Record.SetTxId(ui64(OperationId.GetTxId())); + event->Record.SetTxPartId(OperationId.GetSubTxId()); + + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence TProposedCopySequence ProgressState" + << " sending TEvGetSequence to tablet " << tabletId + << " operationId# " << OperationId + << " at tablet " << ssId); + + context.OnComplete.BindMsgToPipe(OperationId, tabletId, txState->SourcePathId, event.Release()); + + txState->ShardsInProgress.insert(shardIdx); + } + + return false; + } +}; + +class TCopySequence: public TSubOperation { + + static TTxState::ETxState NextState() { + return TTxState::CreateParts; + } + + TTxState::ETxState NextState(TTxState::ETxState state) const override { + switch (state) { + case TTxState::Waiting: + case TTxState::CreateParts: + return TTxState::ConfigureParts; + case TTxState::ConfigureParts: + return TTxState::Propose; + case TTxState::Propose: + return TTxState::CopyTableBarrier; + case TTxState::CopyTableBarrier: + return TTxState::ProposedCopySequence; + case TTxState::ProposedCopySequence: + return TTxState::Done; + default: + return TTxState::Invalid; + } + return TTxState::Invalid; + } + + TSubOperationState::TPtr SelectStateFunc(TTxState::ETxState state) override { + using TPtr = TSubOperationState::TPtr; + + switch (state) { + case TTxState::Waiting: + case TTxState::CreateParts: + return TPtr(new TCreateParts(OperationId)); + case TTxState::ConfigureParts: + return TPtr(new TConfigureParts(OperationId)); + case TTxState::Propose: + return TPtr(new TPropose(OperationId)); + case TTxState::CopyTableBarrier: + return TPtr(new TCopyTableBarrier(OperationId)); + case TTxState::ProposedCopySequence: + return TPtr(new TProposedCopySequence(OperationId)); + case TTxState::Done: + return TPtr(new TDone(OperationId)); + default: + return nullptr; + } + } + +public: + using TSubOperation::TSubOperation; + + THolder Propose(const TString& owner, TOperationContext& context) override { + const TTabletId ssId = context.SS->SelfTabletId(); + + const auto acceptExisted = !Transaction.GetFailOnExist(); + const TString& parentPathStr = Transaction.GetWorkingDir(); + auto& copySequence = Transaction.GetCopySequence(); + auto& descr = Transaction.GetSequence(); + const TString& name = descr.GetName(); + + LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TCopySequence Propose" + << ", path: " << parentPathStr << "/" << name + << ", opId: " << OperationId + << ", at schemeshard: " << ssId); + + TEvSchemeShard::EStatus status = NKikimrScheme::StatusAccepted; + auto result = MakeHolder(status, ui64(OperationId.GetTxId()), ui64(ssId)); + + NSchemeShard::TPath parentPath = NSchemeShard::TPath::Resolve(parentPathStr, context.SS); + { + NSchemeShard::TPath::TChecker checks = parentPath.Check(); + checks + .NotUnderDomainUpgrade() + .IsAtLocalSchemeShard() + .IsResolved() + .NotDeleted() + .NotUnderDeleting() + .IsCommonSensePath(); + + if (checks) { + if (parentPath->IsTable()) { + // allow immediately inside a normal table + if (parentPath.IsUnderOperation()) { + checks.IsUnderTheSameOperation(OperationId.GetTxId()); // allowed only as part of consistent operations + } + } else { + // otherwise don't allow unexpected object types + checks.IsLikeDirectory(); + } + } + + if (!checks) { + result->SetError(checks.GetStatus(), checks.GetError()); + return result; + } + } + + TPath srcPath = TPath::Resolve(copySequence.GetCopyFrom(), context.SS); + { + TPath::TChecker checks = srcPath.Check(); + checks + .NotEmpty() + .IsResolved() + .NotDeleted() + .NotUnderDeleting() + .IsSequence() + .NotUnderTheSameOperation(OperationId.GetTxId()) + .NotUnderOperation(); + + if (checks) { + if (!parentPath->IsTable()) { + // otherwise don't allow unexpected object types + checks.IsLikeDirectory(); + } + } + + if (!checks) { + result->SetError(checks.GetStatus(), checks.GetError()); + return result; + } + } + + auto domainPathId = parentPath.GetPathIdForDomain(); + auto domainInfo = parentPath.DomainInfo(); + + Y_ABORT_UNLESS(context.SS->Sequences.contains(srcPath.Base()->PathId)); + TSequenceInfo::TPtr srcSequence = context.SS->Sequences.at(srcPath.Base()->PathId); + Y_ABORT_UNLESS(!srcSequence->Sharding.GetSequenceShards().empty()); + + const auto& protoSequenceShard = *srcSequence->Sharding.GetSequenceShards().rbegin(); + TShardIdx sequenceShard = FromProto(protoSequenceShard); + + const TString acl = Transaction.GetModifyACL().GetDiffACL(); + + NSchemeShard::TPath dstPath = parentPath.Child(name); + { + NSchemeShard::TPath::TChecker checks = dstPath.Check(); + checks.IsAtLocalSchemeShard(); + if (dstPath.IsResolved()) { + checks + .IsResolved() + .NotUnderDeleting() + .FailOnExist(TPathElement::EPathType::EPathTypeSequence, acceptExisted); + } else { + checks + .NotEmpty() + .NotResolved(); + } + + if (checks) { + checks.IsValidLeafName(); + + if (!parentPath->IsTable()) { + checks.DepthLimit(); + } + + checks + .PathsLimit() + .DirChildrenLimit() + .IsTheSameDomain(srcPath) + .IsValidACL(acl); + } + + if (!checks) { + result->SetError(checks.GetStatus(), checks.GetError()); + if (dstPath.IsResolved()) { + result->SetPathCreateTxId(ui64(dstPath->CreateTxId)); + result->SetPathId(dstPath->PathId.LocalPathId); + } + return result; + } + } + + TString errStr; + + if (!TSequenceInfo::ValidateCreate(descr, errStr)) { + result->SetError(NKikimrScheme::StatusSchemeError, errStr); + return result; + } + + if (!context.SS->CheckApplyIf(Transaction, errStr)) { + result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr); + return result; + } + + dstPath.MaterializeLeaf(owner); + result->SetPathId(dstPath->PathId.LocalPathId); + context.SS->TabletCounters->Simple()[COUNTER_SEQUENCE_COUNT].Add(1); + + srcPath.Base()->PathState = TPathElement::EPathState::EPathStateCopying; + srcPath.Base()->LastTxId = OperationId.GetTxId(); + + TPathId pathId = dstPath->PathId; + dstPath->CreateTxId = OperationId.GetTxId(); + dstPath->LastTxId = OperationId.GetTxId(); + dstPath->PathState = TPathElement::EPathState::EPathStateCreate; + dstPath->PathType = TPathElement::EPathType::EPathTypeSequence; + + if (parentPath->HasActiveChanges()) { + TTxId parentTxId = parentPath->PlannedToCreate() ? parentPath->CreateTxId : parentPath->LastTxId; + context.OnComplete.Dependence(parentTxId, OperationId.GetTxId()); + } + + TTxState& txState = + context.SS->CreateTx(OperationId, TTxState::TxCopySequence, pathId, srcPath.Base()->PathId); + txState.State = TTxState::Propose; + + TSequenceInfo::TPtr sequenceInfo = new TSequenceInfo(0); + TSequenceInfo::TPtr alterData = sequenceInfo->CreateNextVersion(); + alterData->Description = descr; + + txState.Shards.emplace_back(sequenceShard, ETabletType::SequenceShard, TTxState::ConfigureParts); + auto& shardInfo = context.SS->ShardInfos.at(sequenceShard); + if (shardInfo.CurrentTxId != OperationId.GetTxId()) { + context.OnComplete.Dependence(shardInfo.CurrentTxId, OperationId.GetTxId()); + } + + { + auto* p = alterData->Sharding.AddSequenceShards(); + p->SetOwnerId(sequenceShard.GetOwnerId()); + p->SetLocalId(ui64(sequenceShard.GetLocalId())); + } + + NIceDb::TNiceDb db(context.GetDB()); + + context.SS->ChangeTxState(db, OperationId, txState.State); + context.OnComplete.ActivateTx(OperationId); + + context.SS->PersistPath(db, dstPath->PathId); + if (!acl.empty()) { + dstPath->ApplyACL(acl); + context.SS->PersistACL(db, dstPath.Base()); + } + + context.SS->Sequences[pathId] = sequenceInfo; + context.SS->PersistSequence(db, pathId, *sequenceInfo); + context.SS->PersistSequenceAlter(db, pathId, *alterData); + context.SS->IncrementPathDbRefCount(pathId); + + context.SS->PersistTxState(db, OperationId); + context.SS->PersistUpdateNextPathId(db); + + for (auto shard : txState.Shards) { + if (shard.Operation == TTxState::CreateParts) { + context.SS->PersistChannelsBinding(db, shard.Idx, context.SS->ShardInfos.at(shard.Idx).BindedChannels); + context.SS->PersistShardMapping(db, shard.Idx, InvalidTabletId, domainPathId, OperationId.GetTxId(), shard.TabletType); + } + } + + ++parentPath->DirAlterVersion; + context.SS->PersistPathDirAlterVersion(db, parentPath.Base()); + context.SS->ClearDescribePathCaches(parentPath.Base()); + context.OnComplete.PublishToSchemeBoard(OperationId, parentPath->PathId); + + context.SS->ClearDescribePathCaches(dstPath.Base()); + context.OnComplete.PublishToSchemeBoard(OperationId, dstPath->PathId); + + domainInfo->IncPathsInside(); + parentPath->IncAliveChildren(); + + SetState(NextState()); + return result; + } + + void AbortPropose(TOperationContext&) override { + Y_ABORT("no AbortPropose for TCopySequence"); + } + + void AbortUnsafe(TTxId, TOperationContext&) override { + Y_ABORT("no AbortUnsafe for TCopySequence"); + } +}; + +} + +namespace NKikimr::NSchemeShard { + +ISubOperation::TPtr CreateCopySequence(TOperationId id, const TTxTransaction& tx) +{ + return MakeSubOperation(id, tx); +} + +ISubOperation::TPtr CreateCopySequence(TOperationId id, TTxState::ETxState state) { + return MakeSubOperation(id, state); +} + +} diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp index 25bc09da6648..a69fb2956a37 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp @@ -1,6 +1,7 @@ #include "schemeshard__operation_part.h" #include "schemeshard__operation_common.h" #include "schemeshard_impl.h" +#include "schemeshard_tx_infly.h" #include @@ -225,6 +226,58 @@ class TPropose: public TSubOperationState { } }; +class TCopyTableBarrier: public TSubOperationState { +private: + TOperationId OperationId; + + TString DebugHint() const override { + return TStringBuilder() + << "TCopyTable TCopyTableBarrier" + << " operationId: " << OperationId; + } + +public: + TCopyTableBarrier(TOperationId id) + : OperationId(id) + { + IgnoreMessages(DebugHint(), + { TEvHive::TEvCreateTabletReply::EventType + , TEvDataShard::TEvProposeTransactionResult::EventType + , TEvPrivate::TEvOperationPlan::EventType + , TEvDataShard::TEvSchemaChanged::EventType } + ); + } + + bool HandleReply(TEvPrivate::TEvCompleteBarrier::TPtr& ev, TOperationContext& context) override { + TTabletId ssId = context.SS->SelfTabletId(); + + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " HandleReply TEvPrivate::TEvCompleteBarrier" + << ", msg: " << ev->Get()->ToString() + << ", at tablet" << ssId); + + NIceDb::TNiceDb db(context.GetDB()); + + TTxState* txState = context.SS->FindTx(OperationId); + Y_ABORT_UNLESS(txState); + + context.SS->ChangeTxState(db, OperationId, TTxState::Done); + return true; + } + + bool ProgressState(TOperationContext& context) override { + TTxState* txState = context.SS->FindTx(OperationId); + Y_ABORT_UNLESS(txState); + + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << "ProgressState, operation type " + << TTxState::TypeName(txState->TxType)); + + context.OnComplete.Barrier(OperationId, "CopyTableBarrier"); + return false; + } +}; + class TCopyTable: public TSubOperation { THashSet LocalSequences; @@ -243,6 +296,8 @@ class TCopyTable: public TSubOperation { case TTxState::Propose: return TTxState::ProposedWaitParts; case TTxState::ProposedWaitParts: + return TTxState::CopyTableBarrier; + case TTxState::CopyTableBarrier: return TTxState::Done; default: return TTxState::Invalid; @@ -259,7 +314,9 @@ class TCopyTable: public TSubOperation { case TTxState::Propose: return MakeHolder(OperationId); case TTxState::ProposedWaitParts: - return MakeHolder(OperationId); + return MakeHolder(OperationId, TTxState::ETxState::CopyTableBarrier); + case TTxState::CopyTableBarrier: + return MakeHolder(OperationId); case TTxState::Done: return MakeHolder(OperationId); default: @@ -775,9 +832,11 @@ TVector CreateCopyTable(TOperationId nextId, const TTxTrans NKikimrSchemeOp::EOperationType::ESchemeOpCreateSequence); scheme.SetFailOnExist(tx.GetFailOnExist()); + auto* copySequence = scheme.MutableCopySequence(); + copySequence->SetCopyFrom(copying.GetCopyFromTable() + "/" + sequenceDescription.GetName()); *scheme.MutableSequence() = std::move(sequenceDescription); - result.push_back(CreateNewSequence(NextPartId(nextId, result), scheme)); + result.push_back(CreateCopySequence(NextPartId(nextId, result), scheme)); } return result; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_part.h b/ydb/core/tx/schemeshard/schemeshard__operation_part.h index 76baccd4dabf..1dadd2059f44 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_part.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_part.h @@ -50,6 +50,7 @@ action(NSequenceShard::TEvSequenceShard::TEvFreezeSequenceResult, NSchemeShard::TXTYPE_SEQUENCESHARD_FREEZE_SEQUENCE_RESULT) \ action(NSequenceShard::TEvSequenceShard::TEvRestoreSequenceResult, NSchemeShard::TXTYPE_SEQUENCESHARD_RESTORE_SEQUENCE_RESULT) \ action(NSequenceShard::TEvSequenceShard::TEvRedirectSequenceResult, NSchemeShard::TXTYPE_SEQUENCESHARD_REDIRECT_SEQUENCE_RESULT) \ + action(NSequenceShard::TEvSequenceShard::TEvGetSequenceResult, NSchemeShard::TXTYPE_SEQUENCESHARD_GET_SEQUENCE_RESULT) \ \ action(NReplication::TEvController::TEvCreateReplicationResult, NSchemeShard::TXTYPE_CREATE_REPLICATION_RESULT) \ action(NReplication::TEvController::TEvDropReplicationResult, NSchemeShard::TXTYPE_DROP_REPLICATION_RESULT) \ @@ -584,6 +585,8 @@ ISubOperation::TPtr CreateNewSequence(TOperationId id, const TTxTransaction& tx) ISubOperation::TPtr CreateNewSequence(TOperationId id, TTxState::ETxState state); ISubOperation::TPtr CreateDropSequence(TOperationId id, const TTxTransaction& tx); ISubOperation::TPtr CreateDropSequence(TOperationId id, TTxState::ETxState state); +ISubOperation::TPtr CreateCopySequence(TOperationId id, const TTxTransaction& tx); +ISubOperation::TPtr CreateCopySequence(TOperationId id, TTxState::ETxState state); ISubOperation::TPtr CreateNewReplication(TOperationId id, const TTxTransaction& tx); ISubOperation::TPtr CreateNewReplication(TOperationId id, TTxState::ETxState state); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 8bc501e657a5..bf604eb8767c 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -1434,6 +1434,7 @@ TPathElement::EPathState TSchemeShard::CalcPathState(TTxState::ETxType txType, T case TTxState::TxCreateColumnTable: case TTxState::TxCreateCdcStream: case TTxState::TxCreateSequence: + case TTxState::TxCopySequence: case TTxState::TxCreateReplication: case TTxState::TxCreateBlobDepot: case TTxState::TxCreateExternalTable: @@ -4491,6 +4492,7 @@ void TSchemeShard::StateWork(STFUNC_SIG) { HFuncTraced(NSequenceShard::TEvSequenceShard::TEvFreezeSequenceResult, Handle); HFuncTraced(NSequenceShard::TEvSequenceShard::TEvRestoreSequenceResult, Handle); HFuncTraced(NSequenceShard::TEvSequenceShard::TEvRedirectSequenceResult, Handle); + HFuncTraced(NSequenceShard::TEvSequenceShard::TEvGetSequenceResult, Handle); // replication HFuncTraced(NReplication::TEvController::TEvCreateReplicationResult, Handle); @@ -5772,6 +5774,17 @@ void TSchemeShard::Handle(NSequenceShard::TEvSequenceShard::TEvRedirectSequenceR Execute(CreateTxOperationReply(TOperationId(txId, partId), ev), ctx); } +void TSchemeShard::Handle(NSequenceShard::TEvSequenceShard::TEvGetSequenceResult::TPtr &ev, const TActorContext &ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "Handle TEvGetSequenceResult" + << ", at schemeshard: " << TabletID() + << ", message: " << ev->Get()->Record.ShortDebugString()); + + TTxId txId = TTxId(ev->Get()->Record.GetTxId()); + TSubTxId partId = TSubTxId(ev->Get()->Record.GetTxPartId()); + Execute(CreateTxOperationReply(TOperationId(txId, partId), ev), ctx); +} + void TSchemeShard::Handle(NReplication::TEvController::TEvCreateReplicationResult::TPtr &ev, const TActorContext &ctx) { LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Handle TEvCreateReplicationResult" diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index fff4bdfdc297..2b560c12f506 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -1034,6 +1034,7 @@ class TSchemeShard void Handle(NSequenceShard::TEvSequenceShard::TEvFreezeSequenceResult::TPtr &ev, const TActorContext &ctx); void Handle(NSequenceShard::TEvSequenceShard::TEvRestoreSequenceResult::TPtr &ev, const TActorContext &ctx); void Handle(NSequenceShard::TEvSequenceShard::TEvRedirectSequenceResult::TPtr &ev, const TActorContext &ctx); + void Handle(NSequenceShard::TEvSequenceShard::TEvGetSequenceResult::TPtr &ev, const TActorContext &ctx); void Handle(NReplication::TEvController::TEvCreateReplicationResult::TPtr &ev, const TActorContext &ctx); void Handle(NReplication::TEvController::TEvDropReplicationResult::TPtr &ev, const TActorContext &ctx); void Handle(TEvDataShard::TEvProposeTransactionResult::TPtr &ev, const TActorContext &ctx); diff --git a/ydb/core/tx/schemeshard/schemeshard_tx_infly.h b/ydb/core/tx/schemeshard/schemeshard_tx_infly.h index 849f91a9dee0..70c2728a5371 100644 --- a/ydb/core/tx/schemeshard/schemeshard_tx_infly.h +++ b/ydb/core/tx/schemeshard/schemeshard_tx_infly.h @@ -129,6 +129,7 @@ struct TTxState { item(TxCreateView, 83) \ item(TxAlterView, 84) \ item(TxDropView, 85) \ + item(TxCopySequence, 86) \ // TX_STATE_TYPE_ENUM @@ -193,6 +194,8 @@ struct TTxState { item(WaitShadowPathPublication, 136, "") \ item(DeletePathBarrier, 137, "") \ item(SyncHive, 138, "") \ + item(CopyTableBarrier, 139, "") \ + item(ProposedCopySequence, 140, "") \ item(Done, 240, "") \ item(Aborted, 250, "") @@ -342,6 +345,7 @@ struct TTxState { case TxCreateExternalTable: case TxCreateExternalDataSource: case TxCreateView: + case TxCopySequence: return true; case TxInitializeBuildIndex: //this is more like alter case TxCreateCdcStreamAtTable: @@ -472,6 +476,7 @@ struct TTxState { case TxCreateExternalTable: case TxCreateExternalDataSource: case TxCreateView: + case TxCopySequence: return false; case TxAlterPQGroup: case TxAlterTable: @@ -559,6 +564,7 @@ struct TTxState { case TxCreateCdcStreamAtTable: case TxCreateCdcStreamAtTableWithInitialScan: case TxCreateSequence: + case TxCopySequence: case TxCreateReplication: case TxCreateBlobDepot: case TxInitializeBuildIndex: diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp index d8bfa0c69d92..c3e0c5a65ae9 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -2355,4 +2356,22 @@ namespace NSchemeShardUT_Private { UNIT_ASSERT_C(successIsExpected == (status == NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED), "Status: " << ev->Get()->Record.GetStatus() << " Issues: " << ev->Get()->Record.GetIssues()); } + + void SendNextValRequest(TTestActorRuntime& runtime, const TActorId& sender, const TString& path) { + auto request = MakeHolder(path); + runtime.Send(new IEventHandle(NSequenceProxy::MakeSequenceProxyServiceID(), sender, request.Release())); + } + + i64 WaitNextValResult(TTestActorRuntime& runtime, const TActorId& sender) { + auto ev = runtime.GrabEdgeEventRethrow(sender); + auto* msg = ev->Get(); + UNIT_ASSERT_VALUES_EQUAL(msg->Status, Ydb::StatusIds::SUCCESS); + return msg->Value; + } + + i64 DoNextVal(TTestActorRuntime& runtime, const TString& path) { + auto sender = runtime.AllocateEdgeActor(0); + SendNextValRequest(runtime, sender, path); + return WaitNextValResult(runtime, sender); + } } diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.h b/ydb/core/tx/schemeshard/ut_helpers/helpers.h index ddd1153e6c3f..1c2e5df68d4b 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/helpers.h +++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.h @@ -552,4 +552,8 @@ namespace NSchemeShardUT_Private { void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, int partitionIdx, const TVector& keyTags, const TVector& valueTags, const TVector& recordIds); void WriteRow(TTestActorRuntime& runtime, const ui64 txId, const TString& tablePath, int partitionIdx, const ui32 key, const TString& value, bool successIsExpected = true); + void SendNextValRequest(TTestActorRuntime& runtime, const TActorId& sender, const TString& path); + i64 WaitNextValResult(TTestActorRuntime& runtime, const TActorId& sender); + i64 DoNextVal(TTestActorRuntime& runtime, const TString& path); + } //NSchemeShardUT_Private diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp index d4c7a1db3bea..85d8c0399302 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -569,7 +570,7 @@ NSchemeShardUT_Private::TTestEnv::TTestEnv(TTestActorRuntime& runtime, const TTe set.SetMinimumStepSeconds(5); // 5 seconds } // - + for (const auto& sid : opts.SystemBackupSIDs_) { app.AddSystemBackupSID(sid); } @@ -615,6 +616,13 @@ NSchemeShardUT_Private::TTestEnv::TTestEnv(TTestActorRuntime& runtime, const TTe runtime.RegisterService(MakeTxProxyID(), txProxyId, node); } + // Create sequence proxies + for (size_t i = 0; i < runtime.GetNodeCount(); ++i) { + IActor* sequenceProxy = NSequenceProxy::CreateSequenceProxy(); + TActorId sequenceProxyId = runtime.Register(sequenceProxy, i); + runtime.RegisterService(NSequenceProxy::MakeSequenceProxyServiceID(), sequenceProxyId, i); + } + //SetupBoxAndStoragePool(runtime, sender, TTestTxConfig::DomainUid); TxReliablePropose = runtime.Register(new TTxReliablePropose(schemeRoot)); diff --git a/ydb/core/tx/schemeshard/ut_sequence/ut_sequence.cpp b/ydb/core/tx/schemeshard/ut_sequence/ut_sequence.cpp index 9ecada64c2eb..d7ddeee9f041 100644 --- a/ydb/core/tx/schemeshard/ut_sequence/ut_sequence.cpp +++ b/ydb/core/tx/schemeshard/ut_sequence/ut_sequence.cpp @@ -1,3 +1,4 @@ +#include #include using namespace NKikimr::NSchemeShard; @@ -359,10 +360,14 @@ Y_UNIT_TEST_SUITE(TSequence) { Name: "myseq" } )"); + env.TestWaitNotification(runtime, txId); TestCopyTable(runtime, ++txId, "/MyRoot", "copy", "/MyRoot/Table"); env.TestWaitNotification(runtime, txId); + + TestLs(runtime, "/MyRoot/copy/myseq", TDescribeOptionsBuilder().SetShowPrivateTable(true), NLs::PathExist); + env.TestWaitNotification(runtime, txId); } } // Y_UNIT_TEST_SUITE(TSequence) diff --git a/ydb/core/tx/schemeshard/ut_sequence/ya.make b/ydb/core/tx/schemeshard/ut_sequence/ya.make index d6ba601923dd..d48d9e399ec9 100644 --- a/ydb/core/tx/schemeshard/ut_sequence/ya.make +++ b/ydb/core/tx/schemeshard/ut_sequence/ya.make @@ -20,6 +20,7 @@ PEERDIR( ydb/core/testlib/default ydb/core/tx ydb/core/tx/columnshard + ydb/core/tx/datashard ydb/core/tx/schemeshard/ut_helpers ydb/library/yql/public/udf/service/exception_policy ) diff --git a/ydb/core/tx/schemeshard/ut_sequence_reboots/ut_sequence_reboots.cpp b/ydb/core/tx/schemeshard/ut_sequence_reboots/ut_sequence_reboots.cpp index d0553d6ad40e..b6fa29f79618 100644 --- a/ydb/core/tx/schemeshard/ut_sequence_reboots/ut_sequence_reboots.cpp +++ b/ydb/core/tx/schemeshard/ut_sequence_reboots/ut_sequence_reboots.cpp @@ -23,11 +23,9 @@ Y_UNIT_TEST_SUITE(TSequenceReboots) { )"); t.TestEnv->TestWaitNotification(runtime, t.TxId); - TestLs(runtime, "/MyRoot/seq", false, NLs::PathExist); - { TInactiveZone inactive(activeZone); - // no inactive finalization + TestLs(runtime, "/MyRoot/seq", false, NLs::PathExist); } }); } @@ -105,14 +103,20 @@ Y_UNIT_TEST_SUITE(TSequenceReboots) { {NKikimrScheme::StatusAccepted, NKikimrScheme::StatusAlreadyExists, NKikimrScheme::StatusMultipleModifications}); t.TestEnv->TestWaitNotification(runtime, t.TxId); - TestLs(runtime, "/MyRoot/seq", false, NLs::PathExist); + { + TInactiveZone inactive(activeZone); + TestLs(runtime, "/MyRoot/seq", false, NLs::PathExist); + } t.TestEnv->ReliablePropose(runtime, DropSequenceRequest(++t.TxId, "/MyRoot", "seq"), {NKikimrScheme::StatusAccepted, NKikimrScheme::StatusMultipleModifications}); t.TestEnv->TestWaitNotification(runtime, t.TxId); - TestLs(runtime, "/MyRoot/seq", false, NLs::PathNotExist); + { + TInactiveZone inactive(activeZone); + TestLs(runtime, "/MyRoot/seq", false, NLs::PathNotExist); + } t.TestEnv->ReliablePropose(runtime, CreateSequenceRequest(++t.TxId, "/MyRoot", R"( @@ -121,11 +125,9 @@ Y_UNIT_TEST_SUITE(TSequenceReboots) { {NKikimrScheme::StatusAccepted, NKikimrScheme::StatusAlreadyExists, NKikimrScheme::StatusMultipleModifications}); t.TestEnv->TestWaitNotification(runtime, t.TxId); - TestLs(runtime, "/MyRoot/seq", false, NLs::PathExist); - { TInactiveZone inactive(activeZone); - // no inactive finalization + TestLs(runtime, "/MyRoot/seq", false, NLs::PathExist); } }); } @@ -163,19 +165,69 @@ Y_UNIT_TEST_SUITE(TSequenceReboots) { {NKikimrScheme::StatusAccepted, NKikimrScheme::StatusAlreadyExists, NKikimrScheme::StatusMultipleModifications}); t.TestEnv->TestWaitNotification(runtime, t.TxId); - TestLs(runtime, "/MyRoot/Table/seq1", TDescribeOptionsBuilder().SetShowPrivateTable(true), NLs::PathExist); - TestLs(runtime, "/MyRoot/Table/seq2", TDescribeOptionsBuilder().SetShowPrivateTable(true), NLs::PathExist); + { + TInactiveZone inactive(activeZone); + TestLs( + runtime, "/MyRoot/Table/seq1", TDescribeOptionsBuilder().SetShowPrivateTable(true), NLs::PathExist); + TestLs( + runtime, "/MyRoot/Table/seq2", TDescribeOptionsBuilder().SetShowPrivateTable(true), NLs::PathExist); + + } t.TestEnv->ReliablePropose(runtime, DropTableRequest(++t.TxId, "/MyRoot", "Table"), {NKikimrScheme::StatusAccepted, NKikimrScheme::StatusPathDoesNotExist, NKikimrScheme::StatusMultipleModifications}); t.TestEnv->TestWaitNotification(runtime, t.TxId); - TestLs(runtime, "/MyRoot/Table", false, NLs::PathNotExist); + { + TInactiveZone inactive(activeZone); + TestLs(runtime, "/MyRoot/Table", false, NLs::PathNotExist); + } + }); + } + + Y_UNIT_TEST(CopyTableWithSequence) { + TTestWithReboots t(false); + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::SEQUENCESHARD, NActors::NLog::PRI_TRACE); + + { + TInactiveZone inactive(activeZone); + TestCreateIndexedTable(runtime, ++t.TxId, "/MyRoot", R"( + TableDescription { + Name: "Table" + Columns { Name: "key" Type: "Uint64" DefaultFromSequence: "myseq" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "ValueIndex" + KeyColumnNames: ["value"] + } + SequenceDescription { + Name: "myseq" + } + )"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + + i64 value = DoNextVal(runtime, "/MyRoot/Table/myseq"); + UNIT_ASSERT_VALUES_EQUAL(value, 1); + } + + t.TestEnv->ReliablePropose(runtime, CopyTableRequest(++t.TxId, "/MyRoot", "copy", "/MyRoot/Table"), + {NKikimrScheme::StatusAccepted, NKikimrScheme::StatusAlreadyExists, + NKikimrScheme::StatusMultipleModifications}); + t.TestEnv->TestWaitNotification(runtime, t.TxId); { TInactiveZone inactive(activeZone); - // no inactive finalization + + TestLs( + runtime, "/MyRoot/copy/myseq", TDescribeOptionsBuilder().SetShowPrivateTable(true), NLs::PathExist); + + i64 value = DoNextVal(runtime, "/MyRoot/copy/myseq"); + UNIT_ASSERT_VALUES_EQUAL(value, 2); } }); } diff --git a/ydb/core/tx/schemeshard/ut_sequence_reboots/ya.make b/ydb/core/tx/schemeshard/ut_sequence_reboots/ya.make index d431a016bb5d..8e9c98748570 100644 --- a/ydb/core/tx/schemeshard/ut_sequence_reboots/ya.make +++ b/ydb/core/tx/schemeshard/ut_sequence_reboots/ya.make @@ -2,7 +2,7 @@ UNITTEST_FOR(ydb/core/tx/schemeshard) FORK_SUBTESTS() -SPLIT_FACTOR(2) +SPLIT_FACTOR(6) IF (SANITIZER_TYPE == "thread" OR WITH_VALGRIND) TIMEOUT(3600) diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make index 7dac75c00b8c..a8c455c13c2d 100644 --- a/ydb/core/tx/schemeshard/ya.make +++ b/ydb/core/tx/schemeshard/ya.make @@ -102,6 +102,7 @@ SRCS( schemeshard__operation_common_external_table.cpp schemeshard__operation_common_subdomain.h schemeshard__operation_consistent_copy_tables.cpp + schemeshard__operation_copy_sequence.cpp schemeshard__operation_copy_table.cpp schemeshard__operation_create_backup.cpp schemeshard__operation_create_bsv.cpp diff --git a/ydb/core/tx/sequenceshard/public/events.h b/ydb/core/tx/sequenceshard/public/events.h index 599bd718c180..62187515d291 100644 --- a/ydb/core/tx/sequenceshard/public/events.h +++ b/ydb/core/tx/sequenceshard/public/events.h @@ -24,6 +24,8 @@ namespace NSequenceShard { EvRestoreSequenceResult, EvRedirectSequence, EvRedirectSequenceResult, + EvGetSequence, + EvGetSequenceResult, EvEnd, }; @@ -97,6 +99,11 @@ namespace NSequenceShard { return std::move(*this); } + TBuilder&& SetFrozen(bool frozen) && { + Msg->Record.SetFrozen(frozen); + return std::move(*this); + } + THolder Done() && { return std::move(Msg); } @@ -323,6 +330,11 @@ namespace NSequenceShard { InitFrom(record); } + TEvRestoreSequence(const TPathId& pathId, const NKikimrTxSequenceShard::TEvGetSequenceResult& record) { + SetPathId(pathId); + InitFrom(record); + } + void SetPathId(const TPathId& pathId) { auto* p = Record.MutablePathId(); p->SetOwnerId(pathId.OwnerId); @@ -344,6 +356,17 @@ namespace NSequenceShard { Record.SetIncrement(record.GetIncrement()); Record.SetCycle(record.GetCycle()); } + + void InitFrom(const NKikimrTxSequenceShard::TEvGetSequenceResult& record) { + Record.SetMinValue(record.GetMinValue()); + Record.SetMaxValue(record.GetMaxValue()); + Record.SetStartValue(record.GetStartValue()); + Record.SetNextValue(record.GetNextValue()); + Record.SetNextUsed(record.GetNextUsed()); + Record.SetCache(record.GetCache()); + Record.SetIncrement(record.GetIncrement()); + Record.SetCycle(record.GetCycle()); + } }; struct TEvRestoreSequenceResult @@ -394,6 +417,40 @@ namespace NSequenceShard { } }; + struct TEvGetSequence + : public TEventPB + { + TEvGetSequence() = default; + + explicit TEvGetSequence(const TPathId& pathId) { + SetPathId(pathId); + } + + void SetPathId(const TPathId& pathId) { + auto* p = Record.MutablePathId(); + p->SetOwnerId(pathId.OwnerId); + p->SetLocalId(pathId.LocalPathId); + } + + TPathId GetPathId() const { + const auto& p = Record.GetPathId(); + return TPathId(p.GetOwnerId(), p.GetLocalId()); + } + }; + + struct TEvGetSequenceResult + : public TEventPB + { + using EStatus = NKikimrTxSequenceShard::TEvGetSequenceResult::EStatus; + + TEvGetSequenceResult() = default; + + TEvGetSequenceResult(EStatus status, ui64 origin) { + Record.SetStatus(status); + Record.SetOrigin(origin); + } + }; + }; } // namespace NSequenceShard diff --git a/ydb/core/tx/sequenceshard/sequenceshard_impl.cpp b/ydb/core/tx/sequenceshard/sequenceshard_impl.cpp index c23602e643c8..ed88fcbd4233 100644 --- a/ydb/core/tx/sequenceshard/sequenceshard_impl.cpp +++ b/ydb/core/tx/sequenceshard/sequenceshard_impl.cpp @@ -91,6 +91,7 @@ namespace NSequenceShard { HFunc(TEvSequenceShard::TEvFreezeSequence, Handle); HFunc(TEvSequenceShard::TEvRestoreSequence, Handle); HFunc(TEvSequenceShard::TEvRedirectSequence, Handle); + HFunc(TEvSequenceShard::TEvGetSequence, Handle); default: if (!HandleDefaultEvents(ev, SelfId())) { diff --git a/ydb/core/tx/sequenceshard/sequenceshard_impl.h b/ydb/core/tx/sequenceshard/sequenceshard_impl.h index 0a36fee2b17b..e3e39b6c867c 100644 --- a/ydb/core/tx/sequenceshard/sequenceshard_impl.h +++ b/ydb/core/tx/sequenceshard/sequenceshard_impl.h @@ -74,6 +74,7 @@ namespace NSequenceShard { struct TTxFreezeSequence; struct TTxRestoreSequence; struct TTxRedirectSequence; + struct TTxGetSequence; void RunTxInitSchema(const TActorContext& ctx); void RunTxInit(const TActorContext& ctx); @@ -100,6 +101,7 @@ namespace NSequenceShard { void Handle(TEvSequenceShard::TEvFreezeSequence::TPtr& ev, const TActorContext& ctx); void Handle(TEvSequenceShard::TEvRestoreSequence::TPtr& ev, const TActorContext& ctx); void Handle(TEvSequenceShard::TEvRedirectSequence::TPtr& ev, const TActorContext& ctx); + void Handle(TEvSequenceShard::TEvGetSequence::TPtr& ev, const TActorContext& ctx); private: struct TPipeInfo { diff --git a/ydb/core/tx/sequenceshard/tx_create_sequence.cpp b/ydb/core/tx/sequenceshard/tx_create_sequence.cpp index e362cda7beaa..38822c47c10e 100644 --- a/ydb/core/tx/sequenceshard/tx_create_sequence.cpp +++ b/ydb/core/tx/sequenceshard/tx_create_sequence.cpp @@ -64,6 +64,16 @@ namespace NSequenceShard { sequence.StartValue = sequence.MaxValue; } } + + bool frozen = msg->Record.GetFrozen(); + if (frozen) { + sequence.State = Schema::ESequenceState::Frozen; + } + + if (msg->Record.OptionalCycle_case() == NKikimrTxSequenceShard::TEvCreateSequence::kCycle) { + sequence.Cycle = msg->Record.GetCycle(); + } + sequence.NextValue = sequence.StartValue; sequence.NextUsed = false; if (msg->Record.OptionalCache_case() == NKikimrTxSequenceShard::TEvCreateSequence::kCache) { @@ -82,7 +92,8 @@ namespace NSequenceShard { NIceDb::TUpdate(sequence.NextUsed), NIceDb::TUpdate(sequence.Cache), NIceDb::TUpdate(sequence.Increment), - NIceDb::TUpdate(sequence.Cycle)); + NIceDb::TUpdate(sequence.Cycle), + NIceDb::TUpdate(sequence.State)); SetResult(NKikimrTxSequenceShard::TEvCreateSequenceResult::SUCCESS); SLOG_N("TTxCreateSequence.Execute SUCCESS" << " PathId# " << pathId @@ -91,7 +102,8 @@ namespace NSequenceShard { << " StartValue# " << sequence.StartValue << " Cache# " << sequence.Cache << " Increment# " << sequence.Increment - << " Cycle# " << (sequence.Cycle ? "true" : "false")); + << " Cycle# " << (sequence.Cycle ? "true" : "false") + << " State# " << (frozen ? "Frozen" : "Active")); return true; } diff --git a/ydb/core/tx/sequenceshard/tx_get_sequence.cpp b/ydb/core/tx/sequenceshard/tx_get_sequence.cpp new file mode 100644 index 000000000000..53a95cb8b017 --- /dev/null +++ b/ydb/core/tx/sequenceshard/tx_get_sequence.cpp @@ -0,0 +1,93 @@ +#include "sequenceshard_impl.h" + +namespace NKikimr { +namespace NSequenceShard { + + struct TSequenceShard::TTxGetSequence : public TTxBase { + explicit TTxGetSequence(TSelf* self, TEvSequenceShard::TEvGetSequence::TPtr&& ev) + : TTxBase(self) + , Ev(std::move(ev)) + { } + + TTxType GetTxType() const override { return TXTYPE_GET_SEQUENCE; } + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + Y_UNUSED(txc); + + const auto* msg = Ev->Get(); + + auto pathId = msg->GetPathId(); + + SLOG_T("TTxGetSequence.Execute" + << " PathId# " << pathId); + + if (!Self->CheckPipeRequest(Ev->Recipient)) { + SetResult(NKikimrTxSequenceShard::TEvGetSequenceResult::PIPE_OUTDATED); + SLOG_T("TTxGetSequence.Execute PIPE_OUTDATED" + << " PathId# " << pathId); + return true; + } + + auto it = Self->Sequences.find(pathId); + if (it == Self->Sequences.end()) { + SetResult(NKikimrTxSequenceShard::TEvGetSequenceResult::SEQUENCE_NOT_FOUND); + SLOG_T("TTxGetSequence.Execute SEQUENCE_NOT_FOUND" + << " PathId# " << pathId); + return true; + } + + auto& sequence = it->second; + switch (sequence.State) { + case Schema::ESequenceState::Active: + break; + case Schema::ESequenceState::Frozen: + break; + case Schema::ESequenceState::Moved: { + SetResult(NKikimrTxSequenceShard::TEvGetSequenceResult::SEQUENCE_MOVED); + Result->Record.SetMovedTo(sequence.MovedTo); + SLOG_T("TTxGetSequence.Execute SEQUENCE_MOVED" + << " PathId# " << pathId + << " MovedTo# " << sequence.MovedTo); + return true; + } + } + + SetResult(NKikimrTxSequenceShard::TEvGetSequenceResult::SUCCESS); + Result->Record.SetMinValue(sequence.MinValue); + Result->Record.SetMaxValue(sequence.MaxValue); + Result->Record.SetStartValue(sequence.StartValue); + Result->Record.SetNextValue(sequence.NextValue); + Result->Record.SetNextUsed(sequence.NextUsed); + Result->Record.SetCache(sequence.Cache); + Result->Record.SetIncrement(sequence.Increment); + Result->Record.SetCycle(sequence.Cycle); + SLOG_N("TTxGetSequence.Execute SUCCESS" + << " PathId# " << pathId); + return true; + } + + void Complete(const TActorContext& ctx) override { + SLOG_T("TTxGetSequence.Complete"); + + if (Result) { + ctx.Send(Ev->Sender, Result.Release(), 0, Ev->Cookie); + } + } + + void SetResult(NKikimrTxSequenceShard::TEvGetSequenceResult::EStatus status) { + Result.Reset(new TEvSequenceShard::TEvGetSequenceResult(status, Self->TabletID())); + Result->Record.SetTxId(Ev->Get()->Record.GetTxId()); + Result->Record.SetTxPartId(Ev->Get()->Record.GetTxPartId()); + } + + TEvSequenceShard::TEvGetSequence::TPtr Ev; + THolder Result; + }; + + + void TSequenceShard::Handle(TEvSequenceShard::TEvGetSequence::TPtr& ev, const TActorContext& ctx) { + Execute(new TTxGetSequence(this, std::move(ev)), ctx); + } + +} // namespace NSequenceShard +} // namespace NKikimr diff --git a/ydb/core/tx/sequenceshard/ut_helpers.cpp b/ydb/core/tx/sequenceshard/ut_helpers.cpp index 8736616f2704..a73622975575 100644 --- a/ydb/core/tx/sequenceshard/ut_helpers.cpp +++ b/ydb/core/tx/sequenceshard/ut_helpers.cpp @@ -268,5 +268,30 @@ namespace NSequenceShard { return NextRedirectSequenceResult(cookie, edge); } + void TTestContext::SendGetSequence(ui64 cookie, const TActorId& edge, const TPathId& pathId) + { + SendFromEdge( + edge, + new TEvSequenceShard::TEvGetSequence(pathId), + cookie); + } + + THolder TTestContext::NextGetSequenceResult( + ui64 cookie, const TActorId& edge) + { + auto result = ExpectEdgeEvent(edge, cookie); + UNIT_ASSERT_VALUES_EQUAL(result->Record.GetOrigin(), TabletId); + return result; + } + + THolder TTestContext::GetSequence( + const TPathId& pathId) + { + ui64 cookie = RandomNumber(); + auto edge = Runtime->AllocateEdgeActor(); + SendGetSequence(cookie, edge, pathId); + return NextGetSequenceResult(cookie, edge); + } + } // namespace NSequenceShard } // namespace NKikimr diff --git a/ydb/core/tx/sequenceshard/ut_helpers.h b/ydb/core/tx/sequenceshard/ut_helpers.h index 396e963894fe..9ad22c03fce3 100644 --- a/ydb/core/tx/sequenceshard/ut_helpers.h +++ b/ydb/core/tx/sequenceshard/ut_helpers.h @@ -101,6 +101,10 @@ namespace NSequenceShard { ui64 cookie, const TActorId& edge); THolder RedirectSequence( const TPathId& pathId, ui64 redirectTo); + + void SendGetSequence(ui64 cookie, const TActorId& edge, const TPathId& pathId); + THolder NextGetSequenceResult(ui64 cookie, const TActorId& edge); + THolder GetSequence(const TPathId& pathId); }; } // namespace NSequenceShard diff --git a/ydb/core/tx/sequenceshard/ut_sequenceshard.cpp b/ydb/core/tx/sequenceshard/ut_sequenceshard.cpp index 9e082db7c6dc..1aa67fb78f4a 100644 --- a/ydb/core/tx/sequenceshard/ut_sequenceshard.cpp +++ b/ydb/core/tx/sequenceshard/ut_sequenceshard.cpp @@ -165,6 +165,15 @@ namespace NSequenceShard { UNIT_ASSERT_VALUES_EQUAL(allocateResult->Record.GetAllocationStart(), 200011); UNIT_ASSERT_VALUES_EQUAL(allocateResult->Record.GetAllocationCount(), 5u); } + + // get sequence + { + auto getResult = ctx.GetSequence(TPathId(123, 51)); + UNIT_ASSERT_VALUES_EQUAL(getResult->Record.GetStatus(), + NKikimrTxSequenceShard::TEvGetSequenceResult::SUCCESS); + UNIT_ASSERT_VALUES_EQUAL(getResult->Record.GetNextValue(), 200016); + UNIT_ASSERT_VALUES_EQUAL(getResult->Record.GetCache(), 5u); + } } Y_UNIT_TEST(MarkedPipeRetries) { diff --git a/ydb/core/tx/sequenceshard/ya.make b/ydb/core/tx/sequenceshard/ya.make index b47e4534140e..b4befe185615 100644 --- a/ydb/core/tx/sequenceshard/ya.make +++ b/ydb/core/tx/sequenceshard/ya.make @@ -17,6 +17,7 @@ SRCS( tx_create_sequence.cpp tx_drop_sequence.cpp tx_freeze_sequence.cpp + tx_get_sequence.cpp tx_init.cpp tx_init_schema.cpp tx_mark_schemeshard_pipe.cpp diff --git a/ydb/core/ydb_convert/table_description.h b/ydb/core/ydb_convert/table_description.h index 28f45b421993..1c1a3b761f0a 100644 --- a/ydb/core/ydb_convert/table_description.h +++ b/ydb/core/ydb_convert/table_description.h @@ -128,6 +128,11 @@ bool FillTableDescription(NKikimrSchemeOp::TModifyScheme& out, const Ydb::Table::CreateTableRequest& in, const TTableProfiles& profiles, Ydb::StatusIds::StatusCode& status, TString& error, bool indexedTable = false); + +// out +void FillSequenceDescription(Ydb::Table::CreateTableRequest& out, + const NKikimrSchemeOp::TTableDescription& in); + // out void FillSequenceDescription(Ydb::Table::CreateTableRequest& out, const NKikimrSchemeOp::TTableDescription& in);