diff --git a/ydb/core/tx/schemeshard/schemeshard__operation.cpp b/ydb/core/tx/schemeshard/schemeshard__operation.cpp index 4c650607cbde..88b7197d72f2 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation.cpp @@ -112,14 +112,12 @@ THolder TSchemeShard::IgniteOperation(TProposeRequest& request } TOperation::TPtr operation = new TOperation(txId); - Operations[txId] = operation; //record is erased at ApplyOnExecute if all parts are done at propose for (const auto& transaction : record.GetTransaction()) { auto quotaResult = operation->ConsumeQuota(transaction, context); if (quotaResult.Status != NKikimrScheme::StatusSuccess) { response.Reset(new TProposeResponse(quotaResult.Status, ui64(txId), ui64(selfId))); response->SetError(quotaResult.Status, quotaResult.Reason); - Operations.erase(txId); return std::move(response); } } @@ -139,7 +137,6 @@ THolder TSchemeShard::IgniteOperation(TProposeRequest& request if (splitResult.Status != NKikimrScheme::StatusSuccess) { response.Reset(new TProposeResponse(splitResult.Status, ui64(txId), ui64(selfId))); response->SetError(splitResult.Status, splitResult.Reason); - Operations.erase(txId); return std::move(response); } @@ -148,11 +145,15 @@ THolder TSchemeShard::IgniteOperation(TProposeRequest& request const TString owner = record.HasOwner() ? 
record.GetOwner() : BUILTIN_ACL_ROOT; + bool prevProposeUndoSafe = true; + + Operations[txId] = operation; //record is erased at ApplyOnExecute if all parts are done at propose + for (const auto& transaction : transactions) { auto parts = operation->ConstructParts(transaction, context); if (parts.size() > 1) { - // les't allow altering impl index tables as part of consistent operation + // allow altering impl index tables as part of consistent operation context.IsAllowedPrivateTables = true; } @@ -206,18 +207,21 @@ THolder TSchemeShard::IgniteOperation(TProposeRequest& request << ", with reason: " << response->Record.GetReason() << ", tx message: " << PrintSecurely(record)); - context.OnComplete = {}; // recreate - context.DbChanges = {}; + AbortOperationPropose(txId, context); - for (auto& toAbort : operation->Parts) { - toAbort->AbortPropose(context); - } + return std::move(response); + } - context.MemChanges.UnDo(context.SS); - context.OnComplete.ApplyOnExecute(context.SS, context.GetTxc(), context.Ctx); - Operations.erase(txId); + // Check suboperations for undo safety. Log first unsafe suboperation in the schema transaction. 
+ if (prevProposeUndoSafe && !context.IsUndoChangesSafe()) { + prevProposeUndoSafe = false; - return std::move(response); + LOG_WARN_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "Operation part proposed ok, but propose itself is undo unsafe" + << ", suboperation type: " << NKikimrSchemeOp::EOperationType_Name(part->GetTransaction().GetOperationType()) + << ", opId: " << part->GetOperationId() + << ", at schemeshard: " << selfId + ); } } } @@ -225,6 +229,55 @@ THolder TSchemeShard::IgniteOperation(TProposeRequest& request return std::move(response); } +void TSchemeShard::AbortOperationPropose(const TTxId txId, TOperationContext& context) { + Y_ABORT_UNLESS(Operations.contains(txId)); + TOperation::TPtr operation = Operations.at(txId); + + // Drop operation side effects, undo memory changes + // (Local db changes were already applied) + context.OnComplete = {}; + context.DbChanges = {}; + + for (auto& i : operation->Parts) { + i->AbortPropose(context); + } + + context.MemChanges.UnDo(context.SS); + + // And remove aborted operation from existence + Operations.erase(txId); +} + +void AbortOperation(TOperationContext& context, const TTxId txId, const TString& reason) { + LOG_ERROR_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TTxOperationPropose Execute" + << ", txId: " << txId + << ", operation is rejected and all changes reverted" + << ", " << reason + << ", at schemeshard: " << context.SS->SelfTabletId() + ); + + context.GetTxc().DB.RollbackChanges(); + context.SS->AbortOperationPropose(txId, context); +} + +bool IsCommitRedoSizeOverLimit(TString* reason, TOperationContext& context) { + // MaxCommitRedoMB is the ICB control shared with NTabletFlatExecutor::TExecutor. 
+ // We subtract from MaxCommitRedoMB additional 1MB for anything extra + // that executor/tablet may (or may not) add under the hood + const ui64 limitBytes = (context.SS->MaxCommitRedoMB - 1) << 20; // MB to bytes + const ui64 commitRedoBytes = context.GetTxc().DB.GetCommitRedoBytes(); + if (commitRedoBytes >= limitBytes) { + *reason = TStringBuilder() + << "local tx commit redo size generated by IgniteOperation() is more than allowed limit: " + << "commit redo size " << commitRedoBytes + << ", limit " << limitBytes + << ", excess " << (commitRedoBytes - limitBytes) + ; + return true; + } + return false; +} + struct TSchemeShard::TTxOperationPropose: public NTabletFlatExecutor::TTransactionBase { using TBase = NTabletFlatExecutor::TTransactionBase; @@ -244,6 +297,7 @@ struct TSchemeShard::TTxOperationPropose: public NTabletFlatExecutor::TTransacti bool Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) override { TTabletId selfId = Self->SelfTabletId(); + auto txId = TTxId(Request->Get()->Record.GetTxId()); LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TTxOperationPropose Execute" @@ -254,7 +308,6 @@ struct TSchemeShard::TTxOperationPropose: public NTabletFlatExecutor::TTransacti auto [userToken, tokenParseError] = ParseUserToken(Request->Get()->Record.GetUserToken()); if (tokenParseError) { - auto txId = Request->Get()->Record.GetTxId(); Response = MakeHolder(NKikimrScheme::StatusInvalidParameter, ui64(txId), ui64(selfId), "Failed to parse user token"); return true; } @@ -266,10 +319,52 @@ struct TSchemeShard::TTxOperationPropose: public NTabletFlatExecutor::TTransacti TStorageChanges dbChanges; TOperationContext context{Self, txc, ctx, OnComplete, memChanges, dbChanges, std::move(userToken)}; + //NOTE: Successful IgniteOperation will leave created operation in Self->Operations and accumulated changes in the context. + // Unsuccessful IgniteOperation will leave no operation and context will also be clean. 
Response = Self->IgniteOperation(*Request->Get(), context); - OnComplete.ApplyOnExecute(Self, txc, ctx); + //NOTE: Successfully created operation also must be checked for the size of this local tx. + // + // Limitation on a commit redo size of local transactions is imposed at the tablet executor level + // (See ydb/core/tablet_flat/flat_executor.cpp, NTabletFlatExecutor::TExecutor::ExecuteTransaction()). + // And a tablet violating that limit is considered broken and will be stopped unconditionally and immediately. + // + // So even if operation was ignited successfully, its local tx size still must be checked + // as a precautionary measure to avoid an infinite loop of schemeshard restarting, attempting to propose + // persisted operation again, hitting commit redo size limit and restarting again. + // + // On unsuccessful check, local tx should be rolled back, operation should be rejected and + // all accumulated changes dropped or reverted. + // + + // Actually build commit redo (dbChanges could be empty) dbChanges.Apply(Self, txc, ctx); + + if (Self->Operations.contains(txId)) { + Y_ABORT_UNLESS(Response->IsDone() || Response->IsAccepted() || Response->IsConditionalAccepted()); + + // Check local tx commit redo size + TString reason; + if (IsCommitRedoSizeOverLimit(&reason, context)) { + Response = MakeHolder(NKikimrScheme::StatusSchemeError, ui64(txId), ui64(selfId), reason); + + AbortOperation(context, txId, reason); + + if (!context.IsUndoChangesSafe()) { + LOG_ERROR_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TTxOperationPropose Execute" + << ", opId: " << txId + << ", operation should be rejected and all changes be reverted" + << ", but context.IsUndoChangesSafe is false, which means some direct writes have been done" + << ", message: " << SecureDebugString(Request->Get()->Record) + << ", at schemeshard: " << context.SS->SelfTabletId() + ); + } + } + } + + // Apply accumulated changes (changes could be empty) + OnComplete.ApplyOnExecute(Self, txc,
ctx); + return true; } diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 71633c3a992b..99587f3defa5 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -4358,6 +4358,8 @@ void TSchemeShard::OnActivateExecutor(const TActorContext &ctx) { appData->Icb->RegisterSharedControl(DisablePublicationsOfDropping, "SchemeShard_DisablePublicationsOfDropping"); appData->Icb->RegisterSharedControl(FillAllocatePQ, "SchemeShard_FillAllocatePQ"); + appData->Icb->RegisterSharedControl(MaxCommitRedoMB, "TabletControls.MaxCommitRedoMB"); + AllowDataColumnForIndexTable = appData->FeatureFlags.GetEnableDataColumnForIndexTable(); appData->Icb->RegisterSharedControl(AllowDataColumnForIndexTable, "SchemeShard_AllowDataColumnForIndexTable"); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index d24671b5524d..ad5d206ce9e6 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -181,6 +181,9 @@ class TSchemeShard TControlWrapper DisablePublicationsOfDropping; TControlWrapper FillAllocatePQ; + // Shared with NTabletFlatExecutor::TExecutor + TControlWrapper MaxCommitRedoMB; + TSplitSettings SplitSettings; struct TTenantInitState { @@ -350,6 +353,8 @@ class TSchemeShard NExternalSource::IExternalSourceFactory::TPtr ExternalSourceFactory{NExternalSource::CreateExternalSourceFactory({})}; THolder IgniteOperation(TProposeRequest& request, TOperationContext& context); + void AbortOperationPropose(const TTxId txId, TOperationContext& context); + THolder MakeDataShardProposal(const TPathId& pathId, const TOperationId& opId, const TString& body, const TActorContext& ctx) const; @@ -399,7 +404,7 @@ class TSchemeShard return MakeLocalId(NextLocalPathId); } - TPathId AllocatePathId () { + TPathId AllocatePathId() { TPathId next = PeekNextPathId(); ++NextLocalPathId; return next; diff 
--git a/ydb/core/tx/schemeshard/ut_base/ut_commit_redo_limit.cpp b/ydb/core/tx/schemeshard/ut_base/ut_commit_redo_limit.cpp new file mode 100644 index 000000000000..260b4765d771 --- /dev/null +++ b/ydb/core/tx/schemeshard/ut_base/ut_commit_redo_limit.cpp @@ -0,0 +1,113 @@ +#include + +using namespace NKikimr; +using namespace NSchemeShard; +using namespace NSchemeShardUT_Private; + +Y_UNIT_TEST_SUITE(TSchemeShardCheckProposeSize) { + + //TODO: can't check all operations as many of them do not implement + // TSubOperation::AbortPropose() properly and will abort. + + Y_UNIT_TEST(CopyTable) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + + // Take control over MaxCommitRedoMB ICB setting. + // Drop down its min-value limit to be able to set it as low as test needs. + TControlWrapper MaxCommitRedoMB; + { + runtime.GetAppData().Icb->RegisterSharedControl(MaxCommitRedoMB, "TabletControls.MaxCommitRedoMB"); + MaxCommitRedoMB.Reset(200, 1, 4096); + } + + ui64 txId = 100; + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "table" + Columns { Name: "key" Type: "Uint64"} + Columns { Name: "value" Type: "Utf8"} + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + // 1. Set MaxCommitRedoMB to 1 and try to copy the table. + // + // (Check at the operation's Propose tests commit redo size against (MaxCommitRedoMB - 1) + // to give 1MB of leeway to executor/tablet internals to maybe do "something extra". + // So MaxCommitRedoMB = 1 means an effective limit of 0 for the size of operation's commit.) + { + MaxCommitRedoMB = 1; + AsyncCopyTable(runtime, ++txId, "/MyRoot", "table-copy", "/MyRoot/table"); + TestModificationResults(runtime, txId, + {{NKikimrScheme::StatusSchemeError, "local tx commit redo size generated by IgniteOperation() is more than allowed limit"}} + ); + env.TestWaitNotification(runtime, txId); + } + + // 2. Set MaxCommitRedoMB back to high value and try again.
+ { + MaxCommitRedoMB = 200; + AsyncCopyTable(runtime, ++txId, "/MyRoot", "table-copy", "/MyRoot/table"); + env.TestWaitNotification(runtime, txId); + } + } + + Y_UNIT_TEST(CopyTables) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + + // Take control over MaxCommitRedoMB ICB setting. + // Drop down its min-value limit to be able to set it as low as test needs. + TControlWrapper MaxCommitRedoMB; + { + runtime.GetAppData().Icb->RegisterSharedControl(MaxCommitRedoMB, "TabletControls.MaxCommitRedoMB"); + MaxCommitRedoMB.Reset(200, 1, 4096); + } + + const ui64 tables = 100; + const ui64 shardsPerTable = 1; + + ui64 txId = 100; + + for (ui64 i : xrange(tables)) { + TestCreateTable(runtime, ++txId, "/MyRoot", Sprintf( + R"( + Name: "table-%lu" + Columns { Name: "key" Type: "Uint64"} + Columns { Name: "value" Type: "Utf8"} + KeyColumnNames: ["key"] + UniformPartitionsCount: %lu + )", + i, + shardsPerTable + )); + env.TestWaitNotification(runtime, txId); + } + + auto testCopyTables = [](auto& runtime, ui64 txId, ui64 tables) { + TVector schemeTxs; + for (ui64 i : xrange(tables)) { + schemeTxs.push_back(CopyTableRequest(txId, "/MyRoot", Sprintf("table-%lu-copy", i), Sprintf("/MyRoot/table-%lu", i))); + } + AsyncSend(runtime, TTestTxConfig::SchemeShard, CombineSchemeTransactions(schemeTxs)); + }; + + // 1. Set MaxCommitRedoMB to 1 and try to copy tables. + { + MaxCommitRedoMB = 1; + testCopyTables(runtime, ++txId, tables); + TestModificationResults(runtime, txId, + {{NKikimrScheme::StatusSchemeError, "local tx commit redo size generated by IgniteOperation() is more than allowed limit"}} + ); + } + + // 2. Set MaxCommitRedoMB back to high value and try again. 
+ { + MaxCommitRedoMB = 200; + testCopyTables(runtime, ++txId, tables); + TestModificationResults(runtime, txId, {{NKikimrScheme::StatusAccepted}}); + } + } + +} diff --git a/ydb/core/tx/schemeshard/ut_base/ya.make b/ydb/core/tx/schemeshard/ut_base/ya.make index d6716aa43a33..b48df0c52912 100644 --- a/ydb/core/tx/schemeshard/ut_base/ya.make +++ b/ydb/core/tx/schemeshard/ut_base/ya.make @@ -28,6 +28,7 @@ SRCS( ut_info_types.cpp ut_allocate_pq.cpp ut_table_pg_types.cpp + ut_commit_redo_limit.cpp ) END()