From ebee36a43aa4cc79b6e6da0e7a9f347cef5f0a09 Mon Sep 17 00:00:00 2001 From: ijon Date: Thu, 18 Jul 2024 16:51:26 +0300 Subject: [PATCH] schemeshard: reject operations with too big local tx commit (#6760) Add commit redo size check for successfully ignited operations as a precautionary measure to avoid an infinite loop of schemeshard hitting the local tx commit redo size limit, restarting, attempting to propose the persisted operation again, hitting the commit redo size limit again, restarting and so on. This could happen with inherently massive operations such as copy-tables used as a starting step of database export/backup. Copying a large number of tables with a huge number of partitions can result in a TTxOperationPropose local transaction so large that its size would hit the limit imposed by the tablet executor. A tablet violating that limit is considered broken and will be immediately stopped. See ydb/core/tablet_flat/flat_executor.cpp, NTabletFlatExecutor::TExecutor::ExecuteTransaction(). KIKIMR-21751 --- .../tx/schemeshard/schemeshard__operation.cpp | 125 +++++++++++++++--- ydb/core/tx/schemeshard/schemeshard_impl.cpp | 2 + ydb/core/tx/schemeshard/schemeshard_impl.h | 7 +- .../ut_base/ut_commit_redo_limit.cpp | 113 ++++++++++++++++ ydb/core/tx/schemeshard/ut_base/ya.make | 1 + 5 files changed, 232 insertions(+), 16 deletions(-) create mode 100644 ydb/core/tx/schemeshard/ut_base/ut_commit_redo_limit.cpp diff --git a/ydb/core/tx/schemeshard/schemeshard__operation.cpp b/ydb/core/tx/schemeshard/schemeshard__operation.cpp index ee7de50fa0a1..0343f5a82b11 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation.cpp @@ -104,14 +104,12 @@ THolder TSchemeShard::IgniteOperation(TProposeRequest& request } TOperation::TPtr operation = new TOperation(txId); - Operations[txId] = operation; //record is erased at ApplyOnExecute if all parts are done at propose for (const auto& transaction : record.GetTransaction()) { auto quotaResult = 
operation->ConsumeQuota(transaction, context); if (quotaResult.Status != NKikimrScheme::StatusSuccess) { response.Reset(new TProposeResponse(quotaResult.Status, ui64(txId), ui64(selfId))); response->SetError(quotaResult.Status, quotaResult.Reason); - Operations.erase(txId); return std::move(response); } } @@ -131,7 +129,6 @@ THolder TSchemeShard::IgniteOperation(TProposeRequest& request if (splitResult.Status != NKikimrScheme::StatusSuccess) { response.Reset(new TProposeResponse(splitResult.Status, ui64(txId), ui64(selfId))); response->SetError(splitResult.Status, splitResult.Reason); - Operations.erase(txId); return std::move(response); } @@ -140,11 +137,15 @@ THolder TSchemeShard::IgniteOperation(TProposeRequest& request const TString owner = record.HasOwner() ? record.GetOwner() : BUILTIN_ACL_ROOT; + bool prevProposeUndoSafe = true; + + Operations[txId] = operation; //record is erased at ApplyOnExecute if all parts are done at propose + for (const auto& transaction : transactions) { auto parts = operation->ConstructParts(transaction, context); if (parts.size() > 1) { - // les't allow altering impl index tables as part of consistent operation + // allow altering impl index tables as part of consistent operation context.IsAllowedPrivateTables = true; } @@ -198,18 +199,21 @@ THolder TSchemeShard::IgniteOperation(TProposeRequest& request << ", with reason: " << response->Record.GetReason() << ", tx message: " << SecureDebugString(record)); - context.OnComplete = {}; // recreate - context.DbChanges = {}; + AbortOperationPropose(txId, context); - for (auto& toAbort : operation->Parts) { - toAbort->AbortPropose(context); - } + return std::move(response); + } - context.MemChanges.UnDo(context.SS); - context.OnComplete.ApplyOnExecute(context.SS, context.GetTxc(), context.Ctx); - Operations.erase(txId); + // Check suboperations for undo safety. Log first unsafe suboperation in the schema transaction. 
+ if (prevProposeUndoSafe && !context.IsUndoChangesSafe()) { + prevProposeUndoSafe = false; - return std::move(response); + LOG_WARN_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "Operation part proposed ok, but propose itself is undo unsafe" + << ", suboperation type: " << NKikimrSchemeOp::EOperationType_Name(part->GetTransaction().GetOperationType()) + << ", opId: " << part->GetOperationId() + << ", at schemeshard: " << selfId + ); } } } @@ -217,6 +221,55 @@ THolder TSchemeShard::IgniteOperation(TProposeRequest& request return std::move(response); } +void TSchemeShard::AbortOperationPropose(const TTxId txId, TOperationContext& context) { + Y_ABORT_UNLESS(Operations.contains(txId)); + TOperation::TPtr operation = Operations.at(txId); + + // Drop operation side effects, undo memory changes + // (Local db changes were already applied) + context.OnComplete = {}; + context.DbChanges = {}; + + for (auto& i : operation->Parts) { + i->AbortPropose(context); + } + + context.MemChanges.UnDo(context.SS); + + // And remove aborted operation from existence + Operations.erase(txId); +} + +void AbortOperation(TOperationContext& context, const TTxId txId, const TString& reason) { + LOG_ERROR_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TTxOperationPropose Execute" + << ", txId: " << txId + << ", operation is rejected and all changes reverted" + << ", " << reason + << ", at schemeshard: " << context.SS->SelfTabletId() + ); + + context.GetTxc().DB.RollbackChanges(); + context.SS->AbortOperationPropose(txId, context); +} + +bool IsCommitRedoSizeOverLimit(TString* reason, TOperationContext& context) { + // MaxCommitRedoMB is the ICB control shared with NTabletFlatExecutor::TExecutor. 
+ // We subtract from MaxCommitRedoMB additional 1MB for anything extra + // that executor/tablet may (or may not) add under the hood + const ui64 limitBytes = (context.SS->MaxCommitRedoMB - 1) << 20; // MB to bytes + const ui64 commitRedoBytes = context.GetTxc().DB.GetCommitRedoBytes(); + if (commitRedoBytes >= limitBytes) { + *reason = TStringBuilder() + << "local tx commit redo size generated by IgniteOperation() is more than allowed limit: " + << "commit redo size " << commitRedoBytes + << ", limit " << limitBytes + << ", excess " << (commitRedoBytes - limitBytes) + ; + return true; + } + return false; +} + struct TSchemeShard::TTxOperationPropose: public NTabletFlatExecutor::TTransactionBase { using TBase = NTabletFlatExecutor::TTransactionBase; @@ -236,6 +289,7 @@ struct TSchemeShard::TTxOperationPropose: public NTabletFlatExecutor::TTransacti bool Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) override { TTabletId selfId = Self->SelfTabletId(); + auto txId = TTxId(Request->Get()->Record.GetTxId()); LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TTxOperationPropose Execute" @@ -246,7 +300,6 @@ struct TSchemeShard::TTxOperationPropose: public NTabletFlatExecutor::TTransacti auto [userToken, tokenParseError] = ParseUserToken(Request->Get()->Record.GetUserToken()); if (tokenParseError) { - auto txId = Request->Get()->Record.GetTxId(); Response = MakeHolder(NKikimrScheme::StatusInvalidParameter, ui64(txId), ui64(selfId), "Failed to parse user token"); return true; } @@ -258,10 +311,52 @@ struct TSchemeShard::TTxOperationPropose: public NTabletFlatExecutor::TTransacti TStorageChanges dbChanges; TOperationContext context{Self, txc, ctx, OnComplete, memChanges, dbChanges, std::move(userToken)}; + //NOTE: Successful IgniteOperation will leave created operation in Self->Operations and accumulated changes in the context. + // Unsuccessful IgniteOperation will leave no operation and context will also be clean. 
Response = Self->IgniteOperation(*Request->Get(), context); - OnComplete.ApplyOnExecute(Self, txc, ctx); + //NOTE: Successfully created operation also must be checked for the size of this local tx. + // + // Limitation on a commit redo size of local transactions is imposed at the tablet executor level + // (See ydb/core/tablet_flat/flat_executor.cpp, NTabletFlatExecutor::TExecutor::ExecuteTransaction()). + // And a tablet violating that limit is considered broken and will be stopped unconditionally and immediately. + // + // So even if the operation was ignited successfully, its local tx size still must be checked + // as a precautionary measure to avoid an infinite loop of schemeshard restarting, attempting to propose + // the persisted operation again, hitting the commit redo size limit and restarting again. + // + // On unsuccessful check, local tx should be rolled back, operation should be rejected and + // all accumulated changes dropped or reverted. + // + + // Actually build commit redo (dbChanges could be empty) dbChanges.Apply(Self, txc, ctx); + + if (Self->Operations.contains(txId)) { + Y_ABORT_UNLESS(Response->IsDone() || Response->IsAccepted() || Response->IsConditionalAccepted()); + + // Check local tx commit redo size + TString reason; + if (IsCommitRedoSizeOverLimit(&reason, context)) { + Response = MakeHolder(NKikimrScheme::StatusSchemeError, ui64(txId), ui64(selfId), reason); + + AbortOperation(context, txId, reason); + + if (!context.IsUndoChangesSafe()) { + LOG_ERROR_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TTxOperationPropose Execute" + << ", opId: " << txId + << ", operation should be rejected and all changes be reverted" + << ", but context.IsUndoChangesSafe is false, which means some direct writes have been done" + << ", message: " << SecureDebugString(Request->Get()->Record) + << ", at schemeshard: " << context.SS->SelfTabletId() + ); + } + } + } + + // Apply accumulated changes (changes could be empty) + OnComplete.ApplyOnExecute(Self, txc, 
ctx); + return true; } diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 765401edd1f7..73c90f5df789 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -4463,6 +4463,8 @@ void TSchemeShard::OnActivateExecutor(const TActorContext &ctx) { appData->Icb->RegisterSharedControl(DisablePublicationsOfDropping, "SchemeShard_DisablePublicationsOfDropping"); appData->Icb->RegisterSharedControl(FillAllocatePQ, "SchemeShard_FillAllocatePQ"); + appData->Icb->RegisterSharedControl(MaxCommitRedoMB, "TabletControls.MaxCommitRedoMB"); + AllowDataColumnForIndexTable = appData->FeatureFlags.GetEnableDataColumnForIndexTable(); appData->Icb->RegisterSharedControl(AllowDataColumnForIndexTable, "SchemeShard_AllowDataColumnForIndexTable"); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index 7a9eb9586dd6..02b10af80233 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -187,6 +187,9 @@ class TSchemeShard TControlWrapper DisablePublicationsOfDropping; TControlWrapper FillAllocatePQ; + // Shared with NTabletFlatExecutor::TExecutor + TControlWrapper MaxCommitRedoMB; + TSplitSettings SplitSettings; struct TTenantInitState { @@ -370,6 +373,8 @@ class TSchemeShard NExternalSource::IExternalSourceFactory::TPtr ExternalSourceFactory{NExternalSource::CreateExternalSourceFactory({})}; THolder IgniteOperation(TProposeRequest& request, TOperationContext& context); + void AbortOperationPropose(const TTxId txId, TOperationContext& context); + THolder MakeDataShardProposal(const TPathId& pathId, const TOperationId& opId, const TString& body, const TActorContext& ctx) const; @@ -419,7 +424,7 @@ class TSchemeShard return MakeLocalId(NextLocalPathId); } - TPathId AllocatePathId () { + TPathId AllocatePathId() { TPathId next = PeekNextPathId(); ++NextLocalPathId; return next; diff 
--git a/ydb/core/tx/schemeshard/ut_base/ut_commit_redo_limit.cpp b/ydb/core/tx/schemeshard/ut_base/ut_commit_redo_limit.cpp new file mode 100644 index 000000000000..260b4765d771 --- /dev/null +++ b/ydb/core/tx/schemeshard/ut_base/ut_commit_redo_limit.cpp @@ -0,0 +1,113 @@ +#include + +using namespace NKikimr; +using namespace NSchemeShard; +using namespace NSchemeShardUT_Private; + +Y_UNIT_TEST_SUITE(TSchemeShardCheckProposeSize) { + + //TODO: can't check all operations as many of them do not implement + // TSubOperation::AbortPropose() properly and will abort. + + Y_UNIT_TEST(CopyTable) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + + // Take control over MaxCommitRedoMB ICB setting. + // Drop down its min-value limit to be able to set it as low as test needs. + TControlWrapper MaxCommitRedoMB; + { + runtime.GetAppData().Icb->RegisterSharedControl(MaxCommitRedoMB, "TabletControls.MaxCommitRedoMB"); + MaxCommitRedoMB.Reset(200, 1, 4096); + } + + ui64 txId = 100; + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "table" + Columns { Name: "key" Type: "Uint64"} + Columns { Name: "value" Type: "Utf8"} + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + // 1. Set MaxCommitRedoMB to 1 and try to copy the table. + // + // (The check at the operation's Propose tests commit redo size against (MaxCommitRedoMB - 1) + // to give 1MB of leeway for the executor/tablet internals to maybe do "something extra". + // So MaxCommitRedoMB = 1 means an effective limit of 0 for the size of the operation's commit.) + { + MaxCommitRedoMB = 1; + AsyncCopyTable(runtime, ++txId, "/MyRoot", "table-copy", "/MyRoot/table"); + TestModificationResults(runtime, txId, + {{NKikimrScheme::StatusSchemeError, "local tx commit redo size generated by IgniteOperation() is more than allowed limit"}} + ); + env.TestWaitNotification(runtime, txId); + } + + // 2. Set MaxCommitRedoMB back to high value and try again. 
+ { + MaxCommitRedoMB = 200; + AsyncCopyTable(runtime, ++txId, "/MyRoot", "table-copy", "/MyRoot/table"); + env.TestWaitNotification(runtime, txId); + } + } + + Y_UNIT_TEST(CopyTables) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + + // Take control over MaxCommitRedoMB ICB setting. + // Drop down its min-value limit to be able to set it as low as test needs. + TControlWrapper MaxCommitRedoMB; + { + runtime.GetAppData().Icb->RegisterSharedControl(MaxCommitRedoMB, "TabletControls.MaxCommitRedoMB"); + MaxCommitRedoMB.Reset(200, 1, 4096); + } + + const ui64 tables = 100; + const ui64 shardsPerTable = 1; + + ui64 txId = 100; + + for (ui64 i : xrange(tables)) { + TestCreateTable(runtime, ++txId, "/MyRoot", Sprintf( + R"( + Name: "table-%lu" + Columns { Name: "key" Type: "Uint64"} + Columns { Name: "value" Type: "Utf8"} + KeyColumnNames: ["key"] + UniformPartitionsCount: %lu + )", + i, + shardsPerTable + )); + env.TestWaitNotification(runtime, txId); + } + + auto testCopyTables = [](auto& runtime, ui64 txId, ui64 tables) { + TVector schemeTxs; + for (ui64 i : xrange(tables)) { + schemeTxs.push_back(CopyTableRequest(txId, "/MyRoot", Sprintf("table-%lu-copy", i), Sprintf("/MyRoot/table-%lu", i))); + } + AsyncSend(runtime, TTestTxConfig::SchemeShard, CombineSchemeTransactions(schemeTxs)); + }; + + // 1. Set MaxCommitRedoMB to 1 and try to copy tables. + { + MaxCommitRedoMB = 1; + testCopyTables(runtime, ++txId, tables); + TestModificationResults(runtime, txId, + {{NKikimrScheme::StatusSchemeError, "local tx commit redo size generated by IgniteOperation() is more than allowed limit"}} + ); + } + + // 2. Set MaxCommitRedoMB back to high value and try again. 
+ { + MaxCommitRedoMB = 200; + testCopyTables(runtime, ++txId, tables); + TestModificationResults(runtime, txId, {{NKikimrScheme::StatusAccepted}}); + } + } + +} diff --git a/ydb/core/tx/schemeshard/ut_base/ya.make b/ydb/core/tx/schemeshard/ut_base/ya.make index 4699c198de39..dd0047f98d6b 100644 --- a/ydb/core/tx/schemeshard/ut_base/ya.make +++ b/ydb/core/tx/schemeshard/ut_base/ya.make @@ -27,6 +27,7 @@ SRCS( ut_base.cpp ut_info_types.cpp ut_table_pg_types.cpp + ut_commit_redo_limit.cpp ) END()