Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schemeshard: reject operations with too big local tx commit #6760

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 110 additions & 15 deletions ydb/core/tx/schemeshard/schemeshard__operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,12 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request
}

TOperation::TPtr operation = new TOperation(txId);
Operations[txId] = operation; //record is erased at ApplyOnExecute if all parts are done at propose

for (const auto& transaction : record.GetTransaction()) {
auto quotaResult = operation->ConsumeQuota(transaction, context);
if (quotaResult.Status != NKikimrScheme::StatusSuccess) {
response.Reset(new TProposeResponse(quotaResult.Status, ui64(txId), ui64(selfId)));
response->SetError(quotaResult.Status, quotaResult.Reason);
Operations.erase(txId);
return std::move(response);
}
}
Expand All @@ -131,7 +129,6 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request
if (splitResult.Status != NKikimrScheme::StatusSuccess) {
response.Reset(new TProposeResponse(splitResult.Status, ui64(txId), ui64(selfId)));
response->SetError(splitResult.Status, splitResult.Reason);
Operations.erase(txId);
return std::move(response);
}

Expand All @@ -140,11 +137,15 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request

const TString owner = record.HasOwner() ? record.GetOwner() : BUILTIN_ACL_ROOT;

bool prevProposeUndoSafe = true;

Operations[txId] = operation; //record is erased at ApplyOnExecute if all parts are done at propose

for (const auto& transaction : transactions) {
auto parts = operation->ConstructParts(transaction, context);

if (parts.size() > 1) {
// les't allow altering impl index tables as part of consistent operation
// allow altering impl index tables as part of consistent operation
context.IsAllowedPrivateTables = true;
}

Expand Down Expand Up @@ -198,25 +199,77 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request
<< ", with reason: " << response->Record.GetReason()
<< ", tx message: " << SecureDebugString(record));

context.OnComplete = {}; // recreate
context.DbChanges = {};
AbortOperationPropose(txId, context);

for (auto& toAbort : operation->Parts) {
toAbort->AbortPropose(context);
}
return std::move(response);
}

context.MemChanges.UnDo(context.SS);
context.OnComplete.ApplyOnExecute(context.SS, context.GetTxc(), context.Ctx);
Operations.erase(txId);
// Check suboperations for undo safety. Log first unsafe suboperation in the schema transaction.
if (prevProposeUndoSafe && !context.IsUndoChangesSafe()) {
prevProposeUndoSafe = false;

return std::move(response);
LOG_WARN_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"Operation part proposed ok, but propose itself is undo unsafe"
<< ", suboperation type: " << NKikimrSchemeOp::EOperationType_Name(part->GetTransaction().GetOperationType())
<< ", opId: " << part->GetOperationId()
<< ", at schemeshard: " << selfId
);
}
}
}

return std::move(response);
}

void TSchemeShard::AbortOperationPropose(const TTxId txId, TOperationContext& context) {
Y_ABORT_UNLESS(Operations.contains(txId));
TOperation::TPtr operation = Operations.at(txId);

// Drop operation side effects, undo memory changes
// (Local db changes were already applied)
context.OnComplete = {};
context.DbChanges = {};

for (auto& i : operation->Parts) {
i->AbortPropose(context);
}

context.MemChanges.UnDo(context.SS);

// And remove aborted operation from existence
Operations.erase(txId);
}

void AbortOperation(TOperationContext& context, const TTxId txId, const TString& reason) {
LOG_ERROR_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TTxOperationPropose Execute"
<< ", txId: " << txId
<< ", operation is rejected and all changes reverted"
<< ", " << reason
<< ", at schemeshard: " << context.SS->SelfTabletId()
);

context.GetTxc().DB.RollbackChanges();
context.SS->AbortOperationPropose(txId, context);
}

bool IsCommitRedoSizeOverLimit(TString* reason, TOperationContext& context) {
// MaxCommitRedoMB is the ICB control shared with NTabletFlatExecutor::TExecutor.
// We subtract from MaxCommitRedoMB additional 1MB for anything extra
// that executor/tablet may (or may not) add under the hood
const ui64 limitBytes = (context.SS->MaxCommitRedoMB - 1) << 20; // MB to bytes
const ui64 commitRedoBytes = context.GetTxc().DB.GetCommitRedoBytes();
if (commitRedoBytes >= limitBytes) {
*reason = TStringBuilder()
<< "local tx commit redo size generated by IgniteOperation() is more than allowed limit: "
<< "commit redo size " << commitRedoBytes
<< ", limit " << limitBytes
<< ", excess " << (commitRedoBytes - limitBytes)
;
return true;
}
return false;
}

struct TSchemeShard::TTxOperationPropose: public NTabletFlatExecutor::TTransactionBase<TSchemeShard> {
using TBase = NTabletFlatExecutor::TTransactionBase<TSchemeShard>;

Expand All @@ -236,6 +289,7 @@ struct TSchemeShard::TTxOperationPropose: public NTabletFlatExecutor::TTransacti

bool Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) override {
TTabletId selfId = Self->SelfTabletId();
auto txId = TTxId(Request->Get()->Record.GetTxId());

LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"TTxOperationPropose Execute"
Expand All @@ -246,7 +300,6 @@ struct TSchemeShard::TTxOperationPropose: public NTabletFlatExecutor::TTransacti

auto [userToken, tokenParseError] = ParseUserToken(Request->Get()->Record.GetUserToken());
if (tokenParseError) {
auto txId = Request->Get()->Record.GetTxId();
Response = MakeHolder<TProposeResponse>(NKikimrScheme::StatusInvalidParameter, ui64(txId), ui64(selfId), "Failed to parse user token");
return true;
}
Expand All @@ -258,10 +311,52 @@ struct TSchemeShard::TTxOperationPropose: public NTabletFlatExecutor::TTransacti
TStorageChanges dbChanges;
TOperationContext context{Self, txc, ctx, OnComplete, memChanges, dbChanges, std::move(userToken)};

//NOTE: Successful IgniteOperation will leave created operation in Self->Operations and accumulated changes in the context.
// Unsuccessful IgniteOperation will leave no operation and context will also be clean.
Response = Self->IgniteOperation(*Request->Get(), context);

OnComplete.ApplyOnExecute(Self, txc, ctx);
//NOTE: Successfully created operation also must be checked for the size of this local tx.
//
// Limitation on a commit redo size of local transactions is imposed at the tablet executor level
// (See ydb/core/tablet_flat/flat_executor.cpp, NTabletFlatExecutor::TExecutor::ExecuteTransaction()).
// And a tablet violating that limit is considered broken and will be stopped unconditionally and immediately.
//
// So even if operation was ignited successfully, it's local tx size still must be checked
// as a precaution measure to avoid infinite loop of schemeshard restarting, attempting to propose
// persisted operation again, hitting commit redo size limit and restarting again.
//
// On unsuccessful check, local tx should be rolled back, operation should be rejected and
// all accumulated changes dropped or reverted.
//

// Actually build commit redo (dbChanges could be empty)
dbChanges.Apply(Self, txc, ctx);

if (Self->Operations.contains(txId)) {
Y_ABORT_UNLESS(Response->IsDone() || Response->IsAccepted() || Response->IsConditionalAccepted());

// Check local tx commit redo size
TString reason;
if (IsCommitRedoSizeOverLimit(&reason, context)) {
Response = MakeHolder<TProposeResponse>(NKikimrScheme::StatusSchemeError, ui64(txId), ui64(selfId), reason);

AbortOperation(context, txId, reason);

if (!context.IsUndoChangesSafe()) {
LOG_ERROR_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TTxOperationPropose Execute"
<< ", opId: " << txId
<< ", operation should be rejected and all changes be reverted"
<< ", but context.IsUndoChangesSafe is false, which means some direct writes have been done"
<< ", message: " << SecureDebugString(Request->Get()->Record)
<< ", at schemeshard: " << context.SS->SelfTabletId()
);
}
}
}

// Apply accumulated changes (changes could be empty)
OnComplete.ApplyOnExecute(Self, txc, ctx);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Смущает, что в ApplyOnExecute передаётся txc. Я вроде бы перепроверил, и там в базу пишется только в некоторых случаях, и по идее в dependencies мы много не должны сохранить, в будущем надеюсь мы сделаем чтобы у нас всё это оптимистично в базу добавлялось и разночтений не возникало...


return true;
}

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4462,6 +4462,8 @@ void TSchemeShard::OnActivateExecutor(const TActorContext &ctx) {
appData->Icb->RegisterSharedControl(DisablePublicationsOfDropping, "SchemeShard_DisablePublicationsOfDropping");
appData->Icb->RegisterSharedControl(FillAllocatePQ, "SchemeShard_FillAllocatePQ");

appData->Icb->RegisterSharedControl(MaxCommitRedoMB, "TabletControls.MaxCommitRedoMB");

AllowDataColumnForIndexTable = appData->FeatureFlags.GetEnableDataColumnForIndexTable();
appData->Icb->RegisterSharedControl(AllowDataColumnForIndexTable, "SchemeShard_AllowDataColumnForIndexTable");

Expand Down
7 changes: 6 additions & 1 deletion ydb/core/tx/schemeshard/schemeshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ class TSchemeShard
TControlWrapper DisablePublicationsOfDropping;
TControlWrapper FillAllocatePQ;

// Shared with NTabletFlatExecutor::TExecutor
TControlWrapper MaxCommitRedoMB;

TSplitSettings SplitSettings;

struct TTenantInitState {
Expand Down Expand Up @@ -370,6 +373,8 @@ class TSchemeShard
NExternalSource::IExternalSourceFactory::TPtr ExternalSourceFactory{NExternalSource::CreateExternalSourceFactory({})};

THolder<TProposeResponse> IgniteOperation(TProposeRequest& request, TOperationContext& context);
void AbortOperationPropose(const TTxId txId, TOperationContext& context);

THolder<TEvDataShard::TEvProposeTransaction> MakeDataShardProposal(const TPathId& pathId, const TOperationId& opId,
const TString& body, const TActorContext& ctx) const;

Expand Down Expand Up @@ -419,7 +424,7 @@ class TSchemeShard
return MakeLocalId(NextLocalPathId);
}

TPathId AllocatePathId () {
TPathId AllocatePathId() {
TPathId next = PeekNextPathId();
++NextLocalPathId;
return next;
Expand Down
113 changes: 113 additions & 0 deletions ydb/core/tx/schemeshard/ut_base/ut_commit_redo_limit.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>

using namespace NKikimr;
using namespace NSchemeShard;
using namespace NSchemeShardUT_Private;

Y_UNIT_TEST_SUITE(TSchemeShardCheckProposeSize) {

//TODO: can't check all operations as many of them do not implement
// TSubOperation::AbortPropose() properly and will abort.

Y_UNIT_TEST(CopyTable) {
TTestBasicRuntime runtime;
TTestEnv env(runtime);

// Take control over MaxCommitRedoMB ICB setting.
// Drop down its min-value limit to be able to set it as low as test needs.
TControlWrapper MaxCommitRedoMB;
{
runtime.GetAppData().Icb->RegisterSharedControl(MaxCommitRedoMB, "TabletControls.MaxCommitRedoMB");
MaxCommitRedoMB.Reset(200, 1, 4096);
}

ui64 txId = 100;

TestCreateTable(runtime, ++txId, "/MyRoot", R"(
Name: "table"
Columns { Name: "key" Type: "Uint64"}
Columns { Name: "value" Type: "Utf8"}
KeyColumnNames: ["key"]
)");
env.TestWaitNotification(runtime, txId);

// 1. Set MaxCommitRedoMB to 1 and try to create table.
//
// (Check at the operation's Propose tests commit redo size against (MaxCommitRedoMB - 1)
// to give 1MB leeway to executer/tablet inner stuff to may be do "something extra".
// So MaxCommitRedoMB = 1 means effective 0 for the size of operation's commit.)
{
MaxCommitRedoMB = 1;
AsyncCopyTable(runtime, ++txId, "/MyRoot", "table-copy", "/MyRoot/table");
TestModificationResults(runtime, txId,
{{NKikimrScheme::StatusSchemeError, "local tx commit redo size generated by IgniteOperation() is more than allowed limit"}}
);
env.TestWaitNotification(runtime, txId);
}

// 2. Set MaxCommitRedoMB back to high value and try again.
{
MaxCommitRedoMB = 200;
AsyncCopyTable(runtime, ++txId, "/MyRoot", "table-copy", "/MyRoot/table");
env.TestWaitNotification(runtime, txId);
}
}

Y_UNIT_TEST(CopyTables) {
TTestBasicRuntime runtime;
TTestEnv env(runtime);

// Take control over MaxCommitRedoMB ICB setting.
// Drop down its min-value limit to be able to set it as low as test needs.
TControlWrapper MaxCommitRedoMB;
{
runtime.GetAppData().Icb->RegisterSharedControl(MaxCommitRedoMB, "TabletControls.MaxCommitRedoMB");
MaxCommitRedoMB.Reset(200, 1, 4096);
}

const ui64 tables = 100;
const ui64 shardsPerTable = 1;

ui64 txId = 100;

for (ui64 i : xrange(tables)) {
TestCreateTable(runtime, ++txId, "/MyRoot", Sprintf(
R"(
Name: "table-%lu"
Columns { Name: "key" Type: "Uint64"}
Columns { Name: "value" Type: "Utf8"}
KeyColumnNames: ["key"]
UniformPartitionsCount: %lu
)",
i,
shardsPerTable
));
env.TestWaitNotification(runtime, txId);
}

auto testCopyTables = [](auto& runtime, ui64 txId, ui64 tables) {
TVector<TEvTx*> schemeTxs;
for (ui64 i : xrange(tables)) {
schemeTxs.push_back(CopyTableRequest(txId, "/MyRoot", Sprintf("table-%lu-copy", i), Sprintf("/MyRoot/table-%lu", i)));
}
AsyncSend(runtime, TTestTxConfig::SchemeShard, CombineSchemeTransactions(schemeTxs));
};

// 1. Set MaxCommitRedoMB to 1 and try to copy tables.
{
MaxCommitRedoMB = 1;
testCopyTables(runtime, ++txId, tables);
TestModificationResults(runtime, txId,
{{NKikimrScheme::StatusSchemeError, "local tx commit redo size generated by IgniteOperation() is more than allowed limit"}}
);
}

// 2. Set MaxCommitRedoMB back to high value and try again.
{
MaxCommitRedoMB = 200;
testCopyTables(runtime, ++txId, tables);
TestModificationResults(runtime, txId, {{NKikimrScheme::StatusAccepted}});
}
}

}
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/ut_base/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ SRCS(
ut_base.cpp
ut_info_types.cpp
ut_table_pg_types.cpp
ut_commit_redo_limit.cpp
)

END()
Loading