Skip to content

Commit

Permalink
schemeshard: reject operations with too big local tx commit (ydb-plat…
Browse files Browse the repository at this point in the history
…form#6760)

Add commit redo size check for successfully ignited operations as a precaution measure to avoid infinite loop of schemeshard hitting local tx commit redo size limit, restarting, attempting to propose persisted operation again, hitting commit redo size limit again, restarting and so on.

This could happen with inherently massive operations such as copy-tables used as a starting step of database export/backup.

Coping large number of tables with huge number of partitions can result in so large TTxOperationPropose local transaction that its size would hit the limit imposed by the tablet executor. Tablet violating that limit is considered broken and will be immediately stopped.
See ydb/core/tablet_flat/flat_executor.cpp, NTabletFlatExecutor::TExecutor::ExecuteTransaction().

KIKIMR-21751
  • Loading branch information
ijon authored Jul 18, 2024
1 parent ab98fb1 commit ebee36a
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 16 deletions.
125 changes: 110 additions & 15 deletions ydb/core/tx/schemeshard/schemeshard__operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,12 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request
}

TOperation::TPtr operation = new TOperation(txId);
Operations[txId] = operation; //record is erased at ApplyOnExecute if all parts are done at propose

for (const auto& transaction : record.GetTransaction()) {
auto quotaResult = operation->ConsumeQuota(transaction, context);
if (quotaResult.Status != NKikimrScheme::StatusSuccess) {
response.Reset(new TProposeResponse(quotaResult.Status, ui64(txId), ui64(selfId)));
response->SetError(quotaResult.Status, quotaResult.Reason);
Operations.erase(txId);
return std::move(response);
}
}
Expand All @@ -131,7 +129,6 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request
if (splitResult.Status != NKikimrScheme::StatusSuccess) {
response.Reset(new TProposeResponse(splitResult.Status, ui64(txId), ui64(selfId)));
response->SetError(splitResult.Status, splitResult.Reason);
Operations.erase(txId);
return std::move(response);
}

Expand All @@ -140,11 +137,15 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request

const TString owner = record.HasOwner() ? record.GetOwner() : BUILTIN_ACL_ROOT;

bool prevProposeUndoSafe = true;

Operations[txId] = operation; //record is erased at ApplyOnExecute if all parts are done at propose

for (const auto& transaction : transactions) {
auto parts = operation->ConstructParts(transaction, context);

if (parts.size() > 1) {
// les't allow altering impl index tables as part of consistent operation
// allow altering impl index tables as part of consistent operation
context.IsAllowedPrivateTables = true;
}

Expand Down Expand Up @@ -198,25 +199,77 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request
<< ", with reason: " << response->Record.GetReason()
<< ", tx message: " << SecureDebugString(record));

context.OnComplete = {}; // recreate
context.DbChanges = {};
AbortOperationPropose(txId, context);

for (auto& toAbort : operation->Parts) {
toAbort->AbortPropose(context);
}
return std::move(response);
}

context.MemChanges.UnDo(context.SS);
context.OnComplete.ApplyOnExecute(context.SS, context.GetTxc(), context.Ctx);
Operations.erase(txId);
// Check suboperations for undo safety. Log first unsafe suboperation in the schema transaction.
if (prevProposeUndoSafe && !context.IsUndoChangesSafe()) {
prevProposeUndoSafe = false;

return std::move(response);
LOG_WARN_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"Operation part proposed ok, but propose itself is undo unsafe"
<< ", suboperation type: " << NKikimrSchemeOp::EOperationType_Name(part->GetTransaction().GetOperationType())
<< ", opId: " << part->GetOperationId()
<< ", at schemeshard: " << selfId
);
}
}
}

return std::move(response);
}

void TSchemeShard::AbortOperationPropose(const TTxId txId, TOperationContext& context) {
Y_ABORT_UNLESS(Operations.contains(txId));
TOperation::TPtr operation = Operations.at(txId);

// Drop operation side effects, undo memory changes
// (Local db changes were already applied)
context.OnComplete = {};
context.DbChanges = {};

for (auto& i : operation->Parts) {
i->AbortPropose(context);
}

context.MemChanges.UnDo(context.SS);

// And remove aborted operation from existence
Operations.erase(txId);
}

void AbortOperation(TOperationContext& context, const TTxId txId, const TString& reason) {
LOG_ERROR_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TTxOperationPropose Execute"
<< ", txId: " << txId
<< ", operation is rejected and all changes reverted"
<< ", " << reason
<< ", at schemeshard: " << context.SS->SelfTabletId()
);

context.GetTxc().DB.RollbackChanges();
context.SS->AbortOperationPropose(txId, context);
}

bool IsCommitRedoSizeOverLimit(TString* reason, TOperationContext& context) {
// MaxCommitRedoMB is the ICB control shared with NTabletFlatExecutor::TExecutor.
// We subtract from MaxCommitRedoMB additional 1MB for anything extra
// that executor/tablet may (or may not) add under the hood
const ui64 limitBytes = (context.SS->MaxCommitRedoMB - 1) << 20; // MB to bytes
const ui64 commitRedoBytes = context.GetTxc().DB.GetCommitRedoBytes();
if (commitRedoBytes >= limitBytes) {
*reason = TStringBuilder()
<< "local tx commit redo size generated by IgniteOperation() is more than allowed limit: "
<< "commit redo size " << commitRedoBytes
<< ", limit " << limitBytes
<< ", excess " << (commitRedoBytes - limitBytes)
;
return true;
}
return false;
}

struct TSchemeShard::TTxOperationPropose: public NTabletFlatExecutor::TTransactionBase<TSchemeShard> {
using TBase = NTabletFlatExecutor::TTransactionBase<TSchemeShard>;

Expand All @@ -236,6 +289,7 @@ struct TSchemeShard::TTxOperationPropose: public NTabletFlatExecutor::TTransacti

bool Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) override {
TTabletId selfId = Self->SelfTabletId();
auto txId = TTxId(Request->Get()->Record.GetTxId());

LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"TTxOperationPropose Execute"
Expand All @@ -246,7 +300,6 @@ struct TSchemeShard::TTxOperationPropose: public NTabletFlatExecutor::TTransacti

auto [userToken, tokenParseError] = ParseUserToken(Request->Get()->Record.GetUserToken());
if (tokenParseError) {
auto txId = Request->Get()->Record.GetTxId();
Response = MakeHolder<TProposeResponse>(NKikimrScheme::StatusInvalidParameter, ui64(txId), ui64(selfId), "Failed to parse user token");
return true;
}
Expand All @@ -258,10 +311,52 @@ struct TSchemeShard::TTxOperationPropose: public NTabletFlatExecutor::TTransacti
TStorageChanges dbChanges;
TOperationContext context{Self, txc, ctx, OnComplete, memChanges, dbChanges, std::move(userToken)};

//NOTE: Successful IgniteOperation will leave created operation in Self->Operations and accumulated changes in the context.
// Unsuccessful IgniteOperation will leave no operation and context will also be clean.
Response = Self->IgniteOperation(*Request->Get(), context);

OnComplete.ApplyOnExecute(Self, txc, ctx);
//NOTE: Successfully created operation also must be checked for the size of this local tx.
//
// Limitation on a commit redo size of local transactions is imposed at the tablet executor level
// (See ydb/core/tablet_flat/flat_executor.cpp, NTabletFlatExecutor::TExecutor::ExecuteTransaction()).
// And a tablet violating that limit is considered broken and will be stopped unconditionally and immediately.
//
// So even if operation was ignited successfully, it's local tx size still must be checked
// as a precaution measure to avoid infinite loop of schemeshard restarting, attempting to propose
// persisted operation again, hitting commit redo size limit and restarting again.
//
// On unsuccessful check, local tx should be rolled back, operation should be rejected and
// all accumulated changes dropped or reverted.
//

// Actually build commit redo (dbChanges could be empty)
dbChanges.Apply(Self, txc, ctx);

if (Self->Operations.contains(txId)) {
Y_ABORT_UNLESS(Response->IsDone() || Response->IsAccepted() || Response->IsConditionalAccepted());

// Check local tx commit redo size
TString reason;
if (IsCommitRedoSizeOverLimit(&reason, context)) {
Response = MakeHolder<TProposeResponse>(NKikimrScheme::StatusSchemeError, ui64(txId), ui64(selfId), reason);

AbortOperation(context, txId, reason);

if (!context.IsUndoChangesSafe()) {
LOG_ERROR_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TTxOperationPropose Execute"
<< ", opId: " << txId
<< ", operation should be rejected and all changes be reverted"
<< ", but context.IsUndoChangesSafe is false, which means some direct writes have been done"
<< ", message: " << SecureDebugString(Request->Get()->Record)
<< ", at schemeshard: " << context.SS->SelfTabletId()
);
}
}
}

// Apply accumulated changes (changes could be empty)
OnComplete.ApplyOnExecute(Self, txc, ctx);

return true;
}

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4463,6 +4463,8 @@ void TSchemeShard::OnActivateExecutor(const TActorContext &ctx) {
appData->Icb->RegisterSharedControl(DisablePublicationsOfDropping, "SchemeShard_DisablePublicationsOfDropping");
appData->Icb->RegisterSharedControl(FillAllocatePQ, "SchemeShard_FillAllocatePQ");

appData->Icb->RegisterSharedControl(MaxCommitRedoMB, "TabletControls.MaxCommitRedoMB");

AllowDataColumnForIndexTable = appData->FeatureFlags.GetEnableDataColumnForIndexTable();
appData->Icb->RegisterSharedControl(AllowDataColumnForIndexTable, "SchemeShard_AllowDataColumnForIndexTable");

Expand Down
7 changes: 6 additions & 1 deletion ydb/core/tx/schemeshard/schemeshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ class TSchemeShard
TControlWrapper DisablePublicationsOfDropping;
TControlWrapper FillAllocatePQ;

// Shared with NTabletFlatExecutor::TExecutor
TControlWrapper MaxCommitRedoMB;

TSplitSettings SplitSettings;

struct TTenantInitState {
Expand Down Expand Up @@ -370,6 +373,8 @@ class TSchemeShard
NExternalSource::IExternalSourceFactory::TPtr ExternalSourceFactory{NExternalSource::CreateExternalSourceFactory({})};

THolder<TProposeResponse> IgniteOperation(TProposeRequest& request, TOperationContext& context);
void AbortOperationPropose(const TTxId txId, TOperationContext& context);

THolder<TEvDataShard::TEvProposeTransaction> MakeDataShardProposal(const TPathId& pathId, const TOperationId& opId,
const TString& body, const TActorContext& ctx) const;

Expand Down Expand Up @@ -419,7 +424,7 @@ class TSchemeShard
return MakeLocalId(NextLocalPathId);
}

TPathId AllocatePathId () {
TPathId AllocatePathId() {
TPathId next = PeekNextPathId();
++NextLocalPathId;
return next;
Expand Down
113 changes: 113 additions & 0 deletions ydb/core/tx/schemeshard/ut_base/ut_commit_redo_limit.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>

using namespace NKikimr;
using namespace NSchemeShard;
using namespace NSchemeShardUT_Private;

Y_UNIT_TEST_SUITE(TSchemeShardCheckProposeSize) {

//TODO: can't check all operations as many of them do not implement
// TSubOperation::AbortPropose() properly and will abort.

Y_UNIT_TEST(CopyTable) {
TTestBasicRuntime runtime;
TTestEnv env(runtime);

// Take control over MaxCommitRedoMB ICB setting.
// Drop down its min-value limit to be able to set it as low as test needs.
TControlWrapper MaxCommitRedoMB;
{
runtime.GetAppData().Icb->RegisterSharedControl(MaxCommitRedoMB, "TabletControls.MaxCommitRedoMB");
MaxCommitRedoMB.Reset(200, 1, 4096);
}

ui64 txId = 100;

TestCreateTable(runtime, ++txId, "/MyRoot", R"(
Name: "table"
Columns { Name: "key" Type: "Uint64"}
Columns { Name: "value" Type: "Utf8"}
KeyColumnNames: ["key"]
)");
env.TestWaitNotification(runtime, txId);

// 1. Set MaxCommitRedoMB to 1 and try to create table.
//
// (Check at the operation's Propose tests commit redo size against (MaxCommitRedoMB - 1)
// to give 1MB leeway to executer/tablet inner stuff to may be do "something extra".
// So MaxCommitRedoMB = 1 means effective 0 for the size of operation's commit.)
{
MaxCommitRedoMB = 1;
AsyncCopyTable(runtime, ++txId, "/MyRoot", "table-copy", "/MyRoot/table");
TestModificationResults(runtime, txId,
{{NKikimrScheme::StatusSchemeError, "local tx commit redo size generated by IgniteOperation() is more than allowed limit"}}
);
env.TestWaitNotification(runtime, txId);
}

// 2. Set MaxCommitRedoMB back to high value and try again.
{
MaxCommitRedoMB = 200;
AsyncCopyTable(runtime, ++txId, "/MyRoot", "table-copy", "/MyRoot/table");
env.TestWaitNotification(runtime, txId);
}
}

Y_UNIT_TEST(CopyTables) {
TTestBasicRuntime runtime;
TTestEnv env(runtime);

// Take control over MaxCommitRedoMB ICB setting.
// Drop down its min-value limit to be able to set it as low as test needs.
TControlWrapper MaxCommitRedoMB;
{
runtime.GetAppData().Icb->RegisterSharedControl(MaxCommitRedoMB, "TabletControls.MaxCommitRedoMB");
MaxCommitRedoMB.Reset(200, 1, 4096);
}

const ui64 tables = 100;
const ui64 shardsPerTable = 1;

ui64 txId = 100;

for (ui64 i : xrange(tables)) {
TestCreateTable(runtime, ++txId, "/MyRoot", Sprintf(
R"(
Name: "table-%lu"
Columns { Name: "key" Type: "Uint64"}
Columns { Name: "value" Type: "Utf8"}
KeyColumnNames: ["key"]
UniformPartitionsCount: %lu
)",
i,
shardsPerTable
));
env.TestWaitNotification(runtime, txId);
}

auto testCopyTables = [](auto& runtime, ui64 txId, ui64 tables) {
TVector<TEvTx*> schemeTxs;
for (ui64 i : xrange(tables)) {
schemeTxs.push_back(CopyTableRequest(txId, "/MyRoot", Sprintf("table-%lu-copy", i), Sprintf("/MyRoot/table-%lu", i)));
}
AsyncSend(runtime, TTestTxConfig::SchemeShard, CombineSchemeTransactions(schemeTxs));
};

// 1. Set MaxCommitRedoMB to 1 and try to copy tables.
{
MaxCommitRedoMB = 1;
testCopyTables(runtime, ++txId, tables);
TestModificationResults(runtime, txId,
{{NKikimrScheme::StatusSchemeError, "local tx commit redo size generated by IgniteOperation() is more than allowed limit"}}
);
}

// 2. Set MaxCommitRedoMB back to high value and try again.
{
MaxCommitRedoMB = 200;
testCopyTables(runtime, ++txId, tables);
TestModificationResults(runtime, txId, {{NKikimrScheme::StatusAccepted}});
}
}

}
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/ut_base/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ SRCS(
ut_base.cpp
ut_info_types.cpp
ut_table_pg_types.cpp
ut_commit_redo_limit.cpp
)

END()

0 comments on commit ebee36a

Please sign in to comment.