From 0aedc7a4f3addd3701cc49d2882adc49b4b7dc3b Mon Sep 17 00:00:00 2001 From: azevaykin <145343289+azevaykin@users.noreply.github.com> Date: Thu, 21 Dec 2023 19:29:07 +0300 Subject: [PATCH] CheckWriteUnit & FinishProposeWriteUnit (#631) --- ydb/core/tx/data_events/events.h | 10 + ydb/core/tx/datashard/check_write_unit.cpp | 220 ++-------------- ydb/core/tx/datashard/datashard.h | 10 +- ydb/core/tx/datashard/datashard_impl.h | 1 + ydb/core/tx/datashard/datashard_ut_write.cpp | 39 ++- .../datashard/datashard_write_operation.cpp | 14 +- ydb/core/tx/datashard/execution_unit.cpp | 2 + ydb/core/tx/datashard/execution_unit_ctors.h | 3 +- ydb/core/tx/datashard/execution_unit_kind.h | 1 + ydb/core/tx/datashard/finish_propose_unit.cpp | 16 +- .../datashard/finish_propose_write_unit.cpp | 239 ++++++++++++++++++ ydb/core/tx/datashard/operation.h | 2 +- .../ut_common/datashard_ut_common.cpp | 32 +-- .../datashard/ut_common/datashard_ut_common.h | 2 +- ydb/core/tx/datashard/write_unit.cpp | 43 ++-- ydb/core/tx/datashard/ya.make | 1 + 16 files changed, 367 insertions(+), 268 deletions(-) create mode 100644 ydb/core/tx/datashard/finish_propose_write_unit.cpp diff --git a/ydb/core/tx/data_events/events.h b/ydb/core/tx/data_events/events.h index 442f9a19f8cb..e18a33ffba0d 100644 --- a/ydb/core/tx/data_events/events.h +++ b/ydb/core/tx/data_events/events.h @@ -106,6 +106,16 @@ struct TDataEvents { return result; } + TString GetError() const { + return TStringBuilder() << "Status: " << Record.GetStatus() << " Issues: " << Record.GetIssues(); + } + + NKikimrDataEvents::TEvWriteResult::EStatus GetStatus() const { return Record.GetStatus(); } + + bool IsPrepared() const { return GetStatus() == NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED; } + bool IsComplete() const { return GetStatus() == NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED; } + bool IsError() const { return !IsPrepared() && !IsComplete(); } + void SetOrbit(NLWTrace::TOrbit&& orbit) { Orbit = std::move(orbit); } NLWTrace::TOrbit& GetOrbit() { return Orbit; } NLWTrace::TOrbit&& MoveOrbit() { return std::move(Orbit); } diff --git a/ydb/core/tx/datashard/check_write_unit.cpp b/ydb/core/tx/datashard/check_write_unit.cpp index 32c9bd7fc1ce..8a45a5ef9ee8 100644 --- a/ydb/core/tx/datashard/check_write_unit.cpp +++ b/ydb/core/tx/datashard/check_write_unit.cpp @@ -1,7 +1,7 @@ #include "datashard_impl.h" #include "datashard_pipeline.h" -#include "execution_unit_ctors.h" +#include "ydb/core/tx/datashard/datashard_write_operation.h" #include namespace NKikimr { @@ -24,7 +24,7 @@ class TCheckWriteUnit: public TExecutionUnit { TCheckWriteUnit::TCheckWriteUnit(TDataShard &dataShard, TPipeline &pipeline) - : TExecutionUnit(EExecutionUnitKind::CheckDataTx, false, dataShard, pipeline) + : TExecutionUnit(EExecutionUnitKind::CheckWrite, false, dataShard, pipeline) { } @@ -45,126 +45,37 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op, Y_ABORT_UNLESS(!op->IsAborted()); if (CheckRejectDataTx(op, ctx)) { - op->Abort(EExecutionUnitKind::FinishPropose); + op->Abort(EExecutionUnitKind::FinishProposeWrite); return EExecutionStatus::Executed; } - //TODO: remove this return - return EExecutionStatus::Executed; - - TActiveTransaction *tx = dynamic_cast(op.Get()); - Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind()); - auto dataTx = tx->GetDataTx(); - Y_ABORT_UNLESS(dataTx); - Y_ABORT_UNLESS(dataTx->Ready() || dataTx->RequirePrepare()); - - if (dataTx->Ready()) { - DataShard.IncCounter(COUNTER_MINIKQL_PROGRAM_SIZE, dataTx->ProgramSize()); - } else { - Y_ABORT_UNLESS(dataTx->RequirePrepare()); - LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, - "Require prepare Tx " << op->GetTxId() << " at " << DataShard.TabletID() - << ": " << dataTx->GetErrors()); - } + TWriteOperation* writeOp = dynamic_cast(op.Get()); + Y_VERIFY_S(writeOp, "cannot cast operation of kind " << op->GetKind()); + auto writeTx = writeOp->WriteTx(); + Y_ABORT_UNLESS(writeTx); + Y_ABORT_UNLESS(writeTx->Ready() || writeTx->RequirePrepare()); // Check if we are out of space and tx wants to update user // or system table. if (DataShard.IsAnyChannelYellowStop() - && (dataTx->HasWrites() || !op->IsImmediate())) { + && (writeTx->HasWrites() || !op->IsImmediate())) { TString err = TStringBuilder() << "Cannot perform transaction: out of disk space at tablet " << DataShard.TabletID() << " txId " << op->GetTxId(); DataShard.IncCounter(COUNTER_PREPARE_OUT_OF_SPACE); - BuildResult(op)->AddError(NKikimrTxDataShard::TError::OUT_OF_SPACE, err); - op->Abort(EExecutionUnitKind::FinishPropose); + writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err); + op->Abort(EExecutionUnitKind::FinishProposeWrite); - LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckDataTxUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err); + LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckWriteUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err); return EExecutionStatus::Executed; } - if (tx->IsMvccSnapshotRead()) { - auto snapshot = tx->GetMvccSnapshot(); - if (DataShard.IsFollower()) { - TString err = TStringBuilder() - << "Operation " << *op << " cannot read from snapshot " << snapshot - << " using data tx on a follower " << DataShard.TabletID(); - - BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST) - ->AddError(NKikimrTxDataShard::TError::BAD_ARGUMENT, err); - op->Abort(EExecutionUnitKind::FinishPropose); - - LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err); - - return EExecutionStatus::Executed; - } else if (!DataShard.IsMvccEnabled()) { - TString err = TStringBuilder() - << "Operation " << *op << " reads from snapshot " << snapshot - << " with MVCC feature disabled at " << DataShard.TabletID(); - - BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST) - ->AddError(NKikimrTxDataShard::TError::BAD_ARGUMENT, err); - op->Abort(EExecutionUnitKind::FinishPropose); - - LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err); - - return EExecutionStatus::Executed; - } else if (snapshot < DataShard.GetSnapshotManager().GetLowWatermark()) { - TString err = TStringBuilder() - << "Operation " << *op << " reads from stale snapshot " << snapshot - << " at " << DataShard.TabletID(); - - BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST) - ->AddError(NKikimrTxDataShard::TError::SNAPSHOT_NOT_EXIST, err); - op->Abort(EExecutionUnitKind::FinishPropose); - - LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err); - - ; - } - } - - TEngineBay::TSizes txReads; - - if (op->IsDataTx()) { - bool hasTotalKeysSizeLimit = !!dataTx->PerShardKeysSizeLimitBytes(); - txReads = dataTx->CalcReadSizes(hasTotalKeysSizeLimit); - - if (txReads.ReadSize > DataShard.GetTxReadSizeLimit()) { - TString err = TStringBuilder() - << "Transaction read size " << txReads.ReadSize << " exceeds limit " - << DataShard.GetTxReadSizeLimit() << " at tablet " << DataShard.TabletID() - << " txId " << op->GetTxId(); - - BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST) - ->AddError(NKikimrTxDataShard::TError::READ_SIZE_EXECEEDED, err); - op->Abort(EExecutionUnitKind::FinishPropose); - - LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err); - - return EExecutionStatus::Executed; - } - - if (hasTotalKeysSizeLimit - && txReads.TotalKeysSize > *dataTx->PerShardKeysSizeLimitBytes()) { - TString err = TStringBuilder() - << "Transaction total keys size " << txReads.TotalKeysSize - << " exceeds limit " << *dataTx->PerShardKeysSizeLimitBytes() - << " at tablet " << DataShard.TabletID() << " txId " << op->GetTxId(); - - BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST) - ->AddError(NKikimrTxDataShard::TError::READ_SIZE_EXECEEDED, err); - op->Abort(EExecutionUnitKind::FinishPropose); - - LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err); - - return EExecutionStatus::Executed; - } - - for (const auto& key : dataTx->TxInfo().Keys) { + { + for (const auto& key : writeTx->TxInfo().Keys) { if (key.IsWrite && DataShard.IsUserTable(key.Key->TableId)) { ui64 keySize = 0; for (const auto& cell : key.Key->Range.From) { @@ -176,9 +87,8 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op, << " bytes which exceeds limit " << NLimits::MaxWriteKeySize << " bytes at " << DataShard.TabletID(); - BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST) - ->AddError(NKikimrTxDataShard::TError::BAD_ARGUMENT, err); - op->Abort(EExecutionUnitKind::FinishPropose); + writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, err); + op->Abort(EExecutionUnitKind::FinishProposeWrite); LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err); @@ -193,8 +103,8 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op, << "Transaction write column value of " << col.ImmediateUpdateSize << " bytes is larger than the allowed threshold"; - BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::EXEC_ERROR)->AddError(NKikimrTxDataShard::TError::BAD_ARGUMENT, err); - op->Abort(EExecutionUnitKind::FinishPropose); + writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, err); + op->Abort(EExecutionUnitKind::FinishProposeWrite); LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err); @@ -216,10 +126,10 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op, DataShard.IncCounter(COUNTER_PREPARE_OUT_OF_SPACE); - BuildResult(op)->AddError(NKikimrTxDataShard::TError::OUT_OF_SPACE, err); - op->Abort(EExecutionUnitKind::FinishPropose); + writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err); + op->Abort(EExecutionUnitKind::FinishProposeWrite); - LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckDataTxUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err); + LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckWriteUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err); return EExecutionStatus::Executed; } @@ -229,96 +139,20 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op, } } - if (op->IsReadTable()) { - const auto& record = dataTx->GetReadTableTransaction(); - const auto& userTables = DataShard.GetUserTables(); - - TMaybe schemaChangedError; - if (auto it = userTables.find(record.GetTableId().GetTableId()); it != userTables.end()) { - const auto& tableInfo = *it->second; - for (const auto& columnRecord : record.GetColumns()) { - if (auto* columnInfo = tableInfo.Columns.FindPtr(columnRecord.GetId())) { - // TODO: column types don't change when bound by id, but we may want to check anyway - } else { - schemaChangedError = TStringBuilder() << "ReadTable cannot find column " - << columnRecord.GetName() << " (" << columnRecord.GetId() << ")"; - break; - } - } - // TODO: validate key ranges? - } else { - schemaChangedError = TStringBuilder() << "ReadTable cannot find table " - << record.GetTableId().GetOwnerId() << ":" << record.GetTableId().GetTableId(); - } - - if (schemaChangedError) { - BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::ERROR) - ->AddError(NKikimrTxDataShard::TError::SCHEME_CHANGED, *schemaChangedError); - op->Abort(EExecutionUnitKind::FinishPropose); - return EExecutionStatus::Executed; - } - - if (record.HasSnapshotStep() && record.HasSnapshotTxId()) { - if (!op->IsImmediate()) { - BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)->AddError( - NKikimrTxDataShard::TError::BAD_ARGUMENT, - "ReadTable from snapshot must be an immediate transaction"); - op->Abort(EExecutionUnitKind::FinishPropose); - return EExecutionStatus::Executed; - } - - const TSnapshotKey key( - record.GetTableId().GetOwnerId(), - record.GetTableId().GetTableId(), - record.GetSnapshotStep(), - record.GetSnapshotTxId()); - - if (!DataShard.GetSnapshotManager().AcquireReference(key)) { - // TODO: try upgrading to mvcc snapshot when available - BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)->AddError( - NKikimrTxDataShard::TError::SNAPSHOT_NOT_EXIST, - TStringBuilder() - << "Shard " << DataShard.TabletID() - << " has no snapshot " << key); - op->Abort(EExecutionUnitKind::FinishPropose); - return EExecutionStatus::Executed; - } - - op->SetAcquiredSnapshotKey(key); - op->SetUsingSnapshotFlag(); - } - } - if (!op->IsImmediate()) { if (!Pipeline.AssignPlanInterval(op)) { - TString err = TStringBuilder() - << "Can't propose tx " << op->GetTxId() << " at blocked shard " - << DataShard.TabletID(); - BuildResult(op)->AddError(NKikimrTxDataShard::TError::SHARD_IS_BLOCKED, err); - op->Abort(EExecutionUnitKind::FinishPropose); + TString err = TStringBuilder() << "Can't propose tx " << op->GetTxId() << " at blocked shard " << DataShard.TabletID(); + + writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err); + op->Abort(EExecutionUnitKind::FinishProposeWrite); LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, err); return EExecutionStatus::Executed; } - auto &res = BuildResult(op); - res->SetPrepared(op->GetMinStep(), op->GetMaxStep(), op->GetReceivedAt()); - - if (op->IsDataTx()) { - res->Record.SetReadSize(txReads.ReadSize); - res->Record.SetReplySize(txReads.ReplySize); - - for (const auto& rs : txReads.OutReadSetSize) { - auto entry = res->Record.AddOutgoingReadSetInfo(); - entry->SetShardId(rs.first); - entry->SetSize(rs.second); - } - } - - LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, - "Prepared " << op->GetKind() << " transaction txId " << op->GetTxId() - << " at tablet " << DataShard.TabletID()); + writeOp->SetWriteResult(NEvents::TDataEvents::TEvWriteResult::BuildPrepared(writeOp->WriteTx()->TabletId(), op->GetTxId(), {op->GetMinStep(), op->GetMaxStep(), {}})); + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Prepared " << *op << " at " << DataShard.TabletID()); } return EExecutionStatus::Executed; diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h index f4fd851519eb..6c0df19ac099 100644 --- a/ydb/core/tx/datashard/datashard.h +++ b/ydb/core/tx/datashard/datashard.h @@ -641,12 +641,11 @@ struct TEvDataShard { TString GetError() const { if (Record.ErrorSize() > 0) { TString result; + TStringOutput out(result); for (ui32 i = 0; i < Record.ErrorSize(); ++i) { - if (Record.GetError(i).HasReason()) { - result += Record.GetError(i).GetReason() + "|"; - } else { - result += "no reason|"; - } + out << Record.GetError(i).GetKind() << " (" + << (Record.GetError(i).HasReason() ? Record.GetError(i).GetReason() : "no reason") + << ") |"; } return result; } else { @@ -665,7 +664,6 @@ struct TEvDataShard { error->SetKey(keyBuffer.data(), keyBuffer.size()); } } - private: bool ForceOnline = false; bool ForceDirty = false; diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index b49d871c8f28..a3384c1fce49 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -2018,6 +2018,7 @@ class TDataShard enum ELogThrottlerType { CheckDataTxUnit_Execute = 0, + CheckWriteUnit_Execute = 0, TxProposeTransactionBase_Execute, FinishProposeUnit_CompleteRequest, FinishProposeUnit_UpdateCounters, diff --git a/ydb/core/tx/datashard/datashard_ut_write.cpp b/ydb/core/tx/datashard/datashard_ut_write.cpp index 508dbc2062d0..9e95182b4f24 100644 --- a/ydb/core/tx/datashard/datashard_ut_write.cpp +++ b/ydb/core/tx/datashard/datashard_ut_write.cpp @@ -1,5 +1,6 @@ #include "datashard_active_transaction.h" #include "datashard_ut_read_table.h" +#include #include namespace NKikimr { @@ -10,7 +11,7 @@ using namespace Tests; using namespace NDataShardReadTableTest; Y_UNIT_TEST_SUITE(DataShardWrite) { - std::tuple TestCreateServer() { + std::tuple TestCreateServer() { TPortManager pm; TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root").SetUseRealThreads(false); @@ -25,18 +26,18 @@ Y_UNIT_TEST_SUITE(DataShardWrite) { InitRoot(server, sender); - return {server, sender}; + return {runtime, server, sender}; } Y_UNIT_TEST(WriteImmediateOnShard) { - auto [server, sender] = TestCreateServer(); + auto [runtime, server, sender] = TestCreateServer(); auto opts = TShardedTableOptions().Columns({{"key", "Uint32", true, false}, {"value", "Uint32", false, false}}); auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", opts); const ui32 rowCount = 3; ui64 txId = 100; - Write(server, shards[0], tableId, opts.Columns_, rowCount, sender, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE); + Write(runtime, shards[0], tableId, opts.Columns_, rowCount, sender, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE); auto table1state = TReadTableState(server, MakeReadTableSettings("/Root/table-1")).All(); @@ -46,7 +47,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) { } Y_UNIT_TEST(WriteImmediateOnShardManyColumns) { - auto [server, sender] = TestCreateServer(); + auto [runtime, server, sender] = TestCreateServer(); auto opts = TShardedTableOptions().Columns({{"key64", "Uint64", true, false}, {"key32", "Uint32", true, false}, {"value64", "Uint64", false, false}, {"value32", "Uint32", false, false}, {"valueUtf8", "Utf8", false, false}}); @@ -54,7 +55,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) { const ui32 rowCount = 3; ui64 txId = 100; - Write(server, shards[0], tableId, opts.Columns_, rowCount, sender, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE); + Write(runtime, shards[0], tableId, opts.Columns_, rowCount, sender, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE); auto table1state = TReadTableState(server, MakeReadTableSettings("/Root/table-1")).All(); @@ -63,15 +64,37 @@ Y_UNIT_TEST_SUITE(DataShardWrite) { "key64 = 10, key32 = 11, value64 = 12, value32 = 13, valueUtf8 = String_14\n"); } + Y_UNIT_TEST(WriteImmediateHugeKey) { + auto [runtime, server, sender] = TestCreateServer(); + + auto opts = TShardedTableOptions().Columns({{"key", "Utf8", true, false}}); + auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", opts); + + TString hugeStringValue(NLimits::MaxWriteKeySize + 1, 'X'); + TSerializedCellMatrix matrix({TCell(hugeStringValue.c_str(), hugeStringValue.size())}, 1, 1); + + auto evWrite = std::make_unique(100, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE); + ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper(*evWrite).AddDataToPayload(matrix.ReleaseBuffer()); + evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, 1, {1}, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC); + + runtime.SendToPipe(shards[0], sender, evWrite.release(), 0, GetPipeConfigWithRetries()); + auto ev = runtime.GrabEdgeEventRethrow(sender); + + const auto& record = ev->Get()->Record; + UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST); + UNIT_ASSERT_VALUES_EQUAL(record.GetIssues().size(), 1); + UNIT_ASSERT(record.GetIssues(0).message().Contains("Operation [0:100] writes key of 1049601 bytes which exceeds limit 1049600 bytes")); + } + Y_UNIT_TEST(WriteOnShard) { - auto [server, sender] = TestCreateServer(); + auto [runtime, server, sender] = TestCreateServer(); TShardedTableOptions opts; auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", opts); const ui32 rowCount = 3; ui64 txId = 100; - Write(server, shards[0], tableId, opts.Columns_, rowCount, sender, txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE); + Write(runtime, shards[0], tableId, opts.Columns_, rowCount, sender, txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE); auto table1state = TReadTableState(server, MakeReadTableSettings("/Root/table-1")).All(); diff --git a/ydb/core/tx/datashard/datashard_write_operation.cpp b/ydb/core/tx/datashard/datashard_write_operation.cpp index f98d915754eb..0b0d9a5b8804 100644 --- a/ydb/core/tx/datashard/datashard_write_operation.cpp +++ b/ydb/core/tx/datashard/datashard_write_operation.cpp @@ -43,7 +43,7 @@ TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, TTransactionContext& txc, NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta meta; - LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "TxId: " << StepTxId_.TxId << ", shard " << TabletId() << ", meta: " << Record().ShortDebugString()); + LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Parsing write transaction for " << StepTxId_ << " at " << TabletId() << ", record: " << Record().ShortDebugString()); if (!ParseRecord(self)) return; @@ -464,10 +464,10 @@ class TFinalizeWriteTxPlanUnit: public TExecutionUnit { Y_UNUSED(txc); Y_UNUSED(ctx); - TWriteOperation* tx = dynamic_cast(op.Get()); - Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind()); + TWriteOperation* writeOp = dynamic_cast(op.Get()); + Y_VERIFY_S(writeOp, "cannot cast operation of kind " << op->GetKind()); - tx->FinalizeWriteTxPlan(); + writeOp->FinalizeWriteTxPlan(); return EExecutionStatus::Executed; } @@ -513,13 +513,13 @@ void TWriteOperation::BuildExecutionPlan(bool loaded) plan.push_back(EExecutionUnitKind::CheckWrite); plan.push_back(EExecutionUnitKind::BuildAndWaitDependencies); plan.push_back(EExecutionUnitKind::ExecuteWrite); - plan.push_back(EExecutionUnitKind::FinishPropose); + plan.push_back(EExecutionUnitKind::FinishProposeWrite); plan.push_back(EExecutionUnitKind::CompletedOperations); } /* else if (HasVolatilePrepareFlag()) { plan.push_back(EExecutionUnitKind::StoreDataTx); // note: stores in memory - plan.push_back(EExecutionUnitKind::FinishPropose); + plan.push_back(EExecutionUnitKind::FinishProposeWrite); Y_ABORT_UNLESS(!GetStep()); plan.push_back(EExecutionUnitKind::WaitForPlan); plan.push_back(EExecutionUnitKind::PlanQueue); @@ -532,7 +532,7 @@ void TWriteOperation::BuildExecutionPlan(bool loaded) if (!loaded) { plan.push_back(EExecutionUnitKind::CheckWrite); plan.push_back(EExecutionUnitKind::StoreDataTx); - plan.push_back(EExecutionUnitKind::FinishPropose); + plan.push_back(EExecutionUnitKind::FinishProposeWrite); } if (!GetStep()) plan.push_back(EExecutionUnitKind::WaitForPlan); diff --git a/ydb/core/tx/datashard/execution_unit.cpp b/ydb/core/tx/datashard/execution_unit.cpp index 51ce8cf8ecc8..2d76a6ed186e 100644 --- a/ydb/core/tx/datashard/execution_unit.cpp +++ b/ydb/core/tx/datashard/execution_unit.cpp @@ -36,6 +36,8 @@ THolder CreateExecutionUnit(EExecutionUnitKind kind, return CreateBuildAndWaitDependenciesUnit(dataShard, pipeline); case EExecutionUnitKind::FinishPropose: return CreateFinishProposeUnit(dataShard, pipeline); + case EExecutionUnitKind::FinishProposeWrite: + return CreateFinishProposeWriteUnit(dataShard, pipeline); case EExecutionUnitKind::CompletedOperations: return CreateCompletedOperationsUnit(dataShard, pipeline); case EExecutionUnitKind::WaitForPlan: diff --git a/ydb/core/tx/datashard/execution_unit_ctors.h b/ydb/core/tx/datashard/execution_unit_ctors.h index 1a30d6f4dc9c..1689889a4b5b 100644 --- a/ydb/core/tx/datashard/execution_unit_ctors.h +++ b/ydb/core/tx/datashard/execution_unit_ctors.h @@ -18,7 +18,8 @@ THolder CreateStoreDistributedEraseTxUnit(TDataShard &dataShard, THolder CreateStoreCommitWritesTxUnit(TDataShard &dataShard, TPipeline &pipeline); THolder CreateBuildAndWaitDependenciesUnit(TDataShard &dataShard, TPipeline &pipeline); THolder CreateFinishProposeUnit(TDataShard &dataShard, TPipeline &pipeline); -THolder CreateCompletedOperationsUnit(TDataShard &dataShard, TPipeline &pipeline); +THolder CreateFinishProposeWriteUnit(TDataShard& dataShard, TPipeline& pipeline); +THolder CreateCompletedOperationsUnit(TDataShard& dataShard, TPipeline& pipeline); THolder CreateWaitForPlanUnit(TDataShard &dataShard, TPipeline &pipeline); THolder CreatePlanQueueUnit(TDataShard &dataShard, TPipeline &pipeline); THolder CreateLoadTxDetailsUnit(TDataShard &dataShard, TPipeline &pipeline); diff --git a/ydb/core/tx/datashard/execution_unit_kind.h b/ydb/core/tx/datashard/execution_unit_kind.h index a62a8718aa14..135dac076c8d 100644 --- a/ydb/core/tx/datashard/execution_unit_kind.h +++ b/ydb/core/tx/datashard/execution_unit_kind.h @@ -19,6 +19,7 @@ enum class EExecutionUnitKind: ui32 { StoreCommitWritesTx, BuildAndWaitDependencies, FinishPropose, + FinishProposeWrite, CompletedOperations, WaitForPlan, PlanQueue, diff --git a/ydb/core/tx/datashard/finish_propose_unit.cpp b/ydb/core/tx/datashard/finish_propose_unit.cpp index 1d5218698225..2360bdc40040 100644 --- a/ydb/core/tx/datashard/finish_propose_unit.cpp +++ b/ydb/core/tx/datashard/finish_propose_unit.cpp @@ -29,7 +29,6 @@ class TFinishProposeUnit : public TExecutionUnit { void AddDiagnosticsResult(TOutputOpData::TResultPtr &res); void UpdateCounters(TOperation::TPtr op, const TActorContext &ctx); - TString PrintErrors(const NKikimrTxDataShard::TEvProposeTransactionResult &rec); }; TFinishProposeUnit::TFinishProposeUnit(TDataShard &dataShard, @@ -236,25 +235,14 @@ void TFinishProposeUnit::UpdateCounters(TOperation::TPtr op, DataShard.IncCounter(COUNTER_PREPARE_ERROR); LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::FinishProposeUnit_UpdateCounters), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, "Prepare transaction failed. txid " << op->GetTxId() - << " at tablet " << DataShard.TabletID() << " errors: " - << PrintErrors(res->Record)); + << " at tablet " << DataShard.TabletID() << " errors: " << res->GetError()); } else { DataShard.IncCounter(COUNTER_PREPARE_IMMEDIATE); } } } -TString TFinishProposeUnit::PrintErrors(const NKikimrTxDataShard::TEvProposeTransactionResult &rec) -{ - TString s; - TStringOutput str(s); - str << "[ "; - for (size_t i = 0; i < rec.ErrorSize(); ++i) { - str << rec.GetError(i).GetKind() << "(" << rec.GetError(i).GetReason() << ") "; - } - str << "]"; - return s; -} + THolder CreateFinishProposeUnit(TDataShard &dataShard, TPipeline &pipeline) diff --git a/ydb/core/tx/datashard/finish_propose_write_unit.cpp b/ydb/core/tx/datashard/finish_propose_write_unit.cpp new file mode 100644 index 000000000000..e6d6cef31559 --- /dev/null +++ b/ydb/core/tx/datashard/finish_propose_write_unit.cpp @@ -0,0 +1,239 @@ +#include "datashard_failpoints.h" +#include "datashard_impl.h" +#include "datashard_pipeline.h" +#include "datashard_write_operation.h" +#include "execution_unit_ctors.h" +#include "probes.h" + +LWTRACE_USING(DATASHARD_PROVIDER) + +namespace NKikimr { +namespace NDataShard { + +class TFinishProposeWriteUnit : public TExecutionUnit { +public: + TFinishProposeWriteUnit(TDataShard &dataShard, TPipeline &pipeline); + ~TFinishProposeWriteUnit() override; + + bool IsReadyToExecute(TOperation::TPtr op) const override; + EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext &txc, const TActorContext &ctx) override; + void Complete(TOperation::TPtr op, const TActorContext &ctx) override; + +private: + TDataShard::TPromotePostExecuteEdges PromoteImmediatePostExecuteEdges(const TOperation* op, TTransactionContext& txc); + void CompleteRequest(TOperation::TPtr op, const TActorContext &ctx); + void AddDiagnosticsResult(NEvents::TDataEvents::TEvWriteResult& res); + void UpdateCounters(const TWriteOperation* writeOp, const TActorContext& ctx); + + static TWriteOperation* CastWriteOperation(TOperation::TPtr op); +}; + +TFinishProposeWriteUnit::TFinishProposeWriteUnit(TDataShard &dataShard, + TPipeline &pipeline) + : TExecutionUnit(EExecutionUnitKind::FinishProposeWrite, false, dataShard, pipeline) +{ +} + +TFinishProposeWriteUnit::~TFinishProposeWriteUnit() +{ +} + +TWriteOperation* TFinishProposeWriteUnit::CastWriteOperation(TOperation::TPtr op) +{ + TWriteOperation* writeOp = dynamic_cast(op.Get()); + Y_ABORT_UNLESS(writeOp); + return writeOp; +} + +bool TFinishProposeWriteUnit::IsReadyToExecute(TOperation::TPtr) const +{ + return true; +} + +TDataShard::TPromotePostExecuteEdges TFinishProposeWriteUnit::PromoteImmediatePostExecuteEdges( + const TOperation* op, + TTransactionContext& txc) +{ + if (op->IsMvccSnapshotRead()) { + if (op->IsMvccSnapshotRepeatable()) { + return DataShard.PromoteImmediatePostExecuteEdges(op->GetMvccSnapshot(), TDataShard::EPromotePostExecuteEdges::RepeatableRead, txc); + } else { + return DataShard.PromoteImmediatePostExecuteEdges(op->GetMvccSnapshot(), TDataShard::EPromotePostExecuteEdges::ReadOnly, txc); + } + } else if (op->MvccReadWriteVersion) { + if (op->IsReadOnly()) { + return DataShard.PromoteImmediatePostExecuteEdges(*op->MvccReadWriteVersion, TDataShard::EPromotePostExecuteEdges::ReadOnly, txc); + } else { + return DataShard.PromoteImmediatePostExecuteEdges(*op->MvccReadWriteVersion, TDataShard::EPromotePostExecuteEdges::ReadWrite, txc); + } + } else { + return { }; + } +} + +EExecutionStatus TFinishProposeWriteUnit::Execute(TOperation::TPtr op, + TTransactionContext &txc, + const TActorContext &ctx) +{ + TWriteOperation* writeOp = CastWriteOperation(op); + if (writeOp->WriteResult()) + UpdateCounters(writeOp, ctx); + + bool hadWrites = false; + + // When mvcc is enabled we perform marking after transaction is executed + if (op->IsAborted()) { + // Make sure we confirm aborts with a commit + op->SetWaitCompletionFlag(true); + } else if (DataShard.IsFollower()) { + // It doesn't matter whether we wait or not + } else if (DataShard.IsMvccEnabled() && op->IsImmediate()) { + auto res = PromoteImmediatePostExecuteEdges(op.Get(), txc); + + if (res.HadWrites) { + hadWrites = true; + res.WaitCompletion = true; + } + + if (res.WaitCompletion) { + op->SetWaitCompletionFlag(true); + } + } + + if (op->HasVolatilePrepareFlag() && !op->HasResultSentFlag() && !op->IsDirty()) { + op->SetFinishProposeTs(DataShard.ConfirmReadOnlyLease()); + } + + if (!op->HasResultSentFlag() && (op->IsDirty() || op->HasVolatilePrepareFlag() || !Pipeline.WaitCompletion(op))) + CompleteRequest(op, ctx); + + if (!DataShard.IsFollower()) + DataShard.PlanCleanup(ctx); + + // Release acquired snapshot for immediate and aborted operations + // N.B. currently only immediate operations may acquire snapshots, but in + // the future it may be possible for read/write operations to read and write + // at different points in time. Those snapshots would need to stay acquired + // until the operation is complete. + auto status = EExecutionStatus::DelayComplete; + if (hadWrites) { + status = EExecutionStatus::DelayCompleteNoMoreRestarts; + } + if (op->HasAcquiredSnapshotKey() && (op->IsImmediate() || op->IsAborted())) { + if (DataShard.GetSnapshotManager().ReleaseReference(op->GetAcquiredSnapshotKey(), txc.DB, ctx.Now())) { + status = EExecutionStatus::DelayCompleteNoMoreRestarts; + } + + op->ResetAcquiredSnapshotKey(); + } + + return status; +} + +void TFinishProposeWriteUnit::Complete(TOperation::TPtr op, const TActorContext &ctx) +{ + TWriteOperation* writeOp = CastWriteOperation(op); + + if (!op->HasResultSentFlag()) { + DataShard.IncCounter(COUNTER_PREPARE_COMPLETE); + + if (writeOp->WriteResult()) + CompleteRequest(op, ctx); + } + + Pipeline.ForgetUnproposedTx(op->GetTxId()); + if (op->IsImmediate()) { + Pipeline.RemoveCommittingOp(op); + Pipeline.RemoveActiveOp(op); + + DataShard.EnqueueChangeRecords(std::move(op->ChangeRecords())); + DataShard.EmitHeartbeats(ctx); + } + + DataShard.SendRegistrationRequestTimeCast(ctx); +} + +void TFinishProposeWriteUnit::CompleteRequest(TOperation::TPtr op, const TActorContext &ctx) +{ + TWriteOperation* writeOp = CastWriteOperation(op); + auto res = writeOp->WriteResult(); + + TDuration duration = TAppData::TimeProvider->Now() - op->GetReceivedAt(); + + LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, + "Propose transaction complete txid " << op->GetTxId() << " at tablet " + << DataShard.TabletID() << " send to client, propose latency: " + << duration.MilliSeconds() << " ms, status: " << res->GetStatus()); + + TString errors = res->GetError(); + if (errors.size()) { + LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::FinishProposeUnit_CompleteRequest), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, + "Errors while proposing transaction txid " << op->GetTxId() + << " at tablet " << DataShard.TabletID() << " status: " + << res->GetStatus() << " errors: " << errors); + } + + if (res->IsPrepared()) { + DataShard.IncCounter(COUNTER_PREPARE_SUCCESS_COMPLETE_LATENCY, duration); + } else { + DataShard.CheckSplitCanStart(ctx); + DataShard.CheckMvccStateChangeCanStart(ctx); + } + + if (op->HasNeedDiagnosticsFlag()) + AddDiagnosticsResult(*res); + + if (!gSkipRepliesFailPoint.Check(DataShard.TabletID(), op->GetTxId())) { + if (res->IsPrepared()) { + LWTRACK(ProposeTransactionSendPrepared, op->Orbit); + } else { + LWTRACK(ProposeTransactionSendResult, op->Orbit); + res->SetOrbit(std::move(op->Orbit)); + } + + ctx.Send(writeOp->GetEv()->Sender, res.release(), 0, writeOp->GetEv()->Cookie); + } +} + +void TFinishProposeWriteUnit::AddDiagnosticsResult(NEvents::TDataEvents::TEvWriteResult& res) +{ + auto &tabletInfo = *res.Record.MutableTabletInfo(); + ActorIdToProto(DataShard.SelfId(), tabletInfo.MutableActorId()); + + tabletInfo.SetTabletId(DataShard.TabletID()); + tabletInfo.SetGeneration(DataShard.Generation()); + tabletInfo.SetStep(DataShard.GetExecutorStep()); + tabletInfo.SetIsFollower(DataShard.IsFollower()); +} + +void TFinishProposeWriteUnit::UpdateCounters(const TWriteOperation* writeOp, const TActorContext& ctx) +{ + const auto& res = writeOp->WriteResult(); + auto execLatency = TAppData::TimeProvider->Now() - writeOp->GetReceivedAt(); + DataShard.IncCounter(COUNTER_PREPARE_EXEC_LATENCY, execLatency); + if (res->IsPrepared()) { + DataShard.IncCounter(COUNTER_PREPARE_SUCCESS); + } else { + if (writeOp->IsDirty()) + DataShard.IncCounter(COUNTER_PREPARE_DIRTY); + + if (res->IsError()) { + DataShard.IncCounter(COUNTER_PREPARE_ERROR); + LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::FinishProposeUnit_UpdateCounters), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, + "Prepare transaction failed. txid " << writeOp->GetTxId() + << " at tablet " << DataShard.TabletID() << " errors: " << res->GetError()); + } else { + DataShard.IncCounter(COUNTER_PREPARE_IMMEDIATE); + } + } +} + + + +THolder CreateFinishProposeWriteUnit(TDataShard &dataShard, TPipeline &pipeline) +{ + return THolder(new TFinishProposeWriteUnit(dataShard, pipeline)); +} + +} // namespace NDataShard +} // namespace NKikimr diff --git a/ydb/core/tx/datashard/operation.h b/ydb/core/tx/datashard/operation.h index 6545ab282891..16d5d867a2ef 100644 --- a/ydb/core/tx/datashard/operation.h +++ b/ydb/core/tx/datashard/operation.h @@ -398,7 +398,7 @@ class TBasicOpInfo { ResetFlag(TTxFlags::AcquiredSnapshotReference); } - bool IsMvccSnapshotRead() { return !MvccSnapshot.IsMax(); } + bool IsMvccSnapshotRead() const { return !MvccSnapshot.IsMax(); } const TRowVersion& GetMvccSnapshot() const { return MvccSnapshot; } bool IsMvccSnapshotRepeatable() const { return MvccSnapshotRepeatable; } void SetMvccSnapshot(const TRowVersion& snapshot, bool isRepeatable = true) { diff --git a/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp b/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp index 739032c9ab45..190a197a3ef6 100644 --- a/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp +++ b/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp @@ -1832,7 +1832,7 @@ std::unique_ptr MakeWriteRequest(ui64 txId, NKik } TSerializedCellMatrix matrix(cells, rowCount, columns.size()); - TString blobData = matrix.GetBuffer(); + TString blobData = matrix.ReleaseBuffer(); UNIT_ASSERT(blobData.size() < 8_MB); @@ -1843,28 +1843,28 @@ std::unique_ptr MakeWriteRequest(ui64 txId, NKik return evWrite; } -NKikimrDataEvents::TEvWriteResult Write(Tests::TServer::TPtr server, ui64 shardId, ui64 tableId, const TVector& columns, ui32 rowCount, TActorId sender, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode) +NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, ui64 shardId, ui64 tableId, const TVector& columns, ui32 rowCount, TActorId sender, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus) { - auto& runtime = *server->GetRuntime(); auto request = MakeWriteRequest(txId, txMode, tableId, columns, rowCount); runtime.SendToPipe(shardId, sender, request.release(), 0, GetPipeConfigWithRetries()); auto ev = runtime.GrabEdgeEventRethrow(sender); auto status = ev->Get()->Record.GetStatus(); - NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus; - switch (txMode) { - case NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE: - expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED; - break; - case NKikimrDataEvents::TEvWrite::MODE_PREPARE: - case NKikimrDataEvents::TEvWrite::MODE_VOLATILE_PREPARE: - expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED; - break; - default: - expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED; - UNIT_ASSERT_C(false, "Unexpected txMode: " << txMode); - break; + if (expectedStatus == NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED) { + switch (txMode) { + case NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE: + expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED; + break; + case NKikimrDataEvents::TEvWrite::MODE_PREPARE: + case NKikimrDataEvents::TEvWrite::MODE_VOLATILE_PREPARE: + expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED; + break; + default: + expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED; + UNIT_ASSERT_C(false, "Unexpected txMode: " << txMode); + break; + } } UNIT_ASSERT_C(status == expectedStatus, "Status: " << ev->Get()->Record.GetStatus() << " Issues: " << ev->Get()->Record.GetIssues()); diff --git a/ydb/core/tx/datashard/ut_common/datashard_ut_common.h b/ydb/core/tx/datashard/ut_common/datashard_ut_common.h index 64a7ef814eaa..613b7a86e480 100644 --- a/ydb/core/tx/datashard/ut_common/datashard_ut_common.h +++ b/ydb/core/tx/datashard/ut_common/datashard_ut_common.h @@ -710,7 +710,7 @@ void ExecSQL(Tests::TServer::TPtr server, bool dml = true, Ydb::StatusIds::StatusCode code = Ydb::StatusIds::SUCCESS); -NKikimrDataEvents::TEvWriteResult Write(Tests::TServer::TPtr server, ui64 shardId, ui64 tableId, const TVector& columns, ui32 rowCount, TActorId sender, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode); +NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, ui64 shardId, ui64 tableId, const TVector& columns, ui32 rowCount, TActorId sender, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED); void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, const TVector>& types, const TVector& keys, const TVector& values); diff --git a/ydb/core/tx/datashard/write_unit.cpp b/ydb/core/tx/datashard/write_unit.cpp index 2e09da95fff1..f088a52420ce 100644 --- a/ydb/core/tx/datashard/write_unit.cpp +++ b/ydb/core/tx/datashard/write_unit.cpp @@ -35,14 +35,14 @@ class TWriteUnit : public TExecutionUnit { return !op->HasRuntimeConflicts(); } - void DoExecute(TDataShard* self, TWriteOperation* tx, TTransactionContext& txc, const TActorContext& ctx) { - const TValidatedWriteTx::TPtr& writeTx = tx->WriteTx(); + void DoExecute(TDataShard* self, TWriteOperation* writeOp, TTransactionContext& txc, const TActorContext& ctx) { + const TValidatedWriteTx::TPtr& writeTx = writeOp->WriteTx(); const ui64 tableId = writeTx->TableId().PathId.LocalPathId; const TTableId fullTableId(self->GetPathOwnerId(), tableId); const ui64 localTableId = self->GetLocalTableId(fullTableId); if (localTableId == 0) { - tx->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, TStringBuilder() << "Unknown table id " << tableId); + writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, TStringBuilder() << "Unknown table id " << tableId); return; } const ui64 shadowTableId = self->GetShadowTableId(fullTableId); @@ -52,7 +52,7 @@ class TWriteUnit : public TExecutionUnit { Y_ABORT_UNLESS(TableInfo_.ShadowTid == shadowTableId); const ui32 writeTableId = localTableId; - auto [readVersion, writeVersion] = self->GetReadWriteVersions(tx); + auto [readVersion, writeVersion] = self->GetReadWriteVersions(writeOp); TDataShardUserDb userDb(*self, txc.DB, readVersion); TDataShardChangeGroupProvider groupProvider(*self, txc.DB); @@ -73,7 +73,7 @@ class TWriteUnit : public TExecutionUnit { const auto& cellType = TableInfo_.KeyColumnTypes[keyColIdx]; const TCell& cell = matrix.GetCell(rowIdx, keyColIdx); if (cellType.GetTypeId() == NScheme::NTypeIds::Uint8 && !cell.IsNull() && cell.AsValue() > 127) { - tx->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "Keys with Uint8 column values >127 are currently prohibited"); + writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "Keys with Uint8 column values >127 are currently prohibited"); return; } @@ -83,7 +83,7 @@ class TWriteUnit : public TExecutionUnit { } if (keyBytes > NLimits::MaxWriteKeySize) { - tx->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << "Row key size of " << keyBytes << " bytes is larger than the allowed threshold " << NLimits::MaxWriteKeySize); + writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << "Row key size of " << keyBytes << " bytes is larger than the allowed threshold " << NLimits::MaxWriteKeySize); return; } @@ -92,7 +92,7 @@ class TWriteUnit : public TExecutionUnit { ui32 columnTag = writeTx->RecordOperation().GetColumnIds(valueColIdx); const TCell& cell = matrix.GetCell(rowIdx, valueColIdx); if (cell.Size() > NLimits::MaxWriteValueSize) { - tx->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << "Row cell size of " << cell.Size() << " bytes is larger than the allowed threshold " << NLimits::MaxWriteValueSize); + writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << "Row cell size of " << cell.Size() << " bytes is larger than the allowed threshold " << NLimits::MaxWriteValueSize); return; } @@ -111,16 +111,16 @@ class TWriteUnit : public TExecutionUnit { TableInfo_.Stats.UpdateTime = TAppData::TimeProvider->Now(); - tx->SetWriteResult(NEvents::TDataEvents::TEvWriteResult::BuildCommited(writeTx->TabletId(), tx->GetTxId())); + writeOp->SetWriteResult(NEvents::TDataEvents::TEvWriteResult::BuildCommited(writeTx->TabletId(), writeOp->GetTxId())); - LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "tx " << tx->GetTxId() << " at " << self->TabletID() << " write operation is executed"); + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Executed write operation for " << *writeOp << " at " << writeOp->WriteTx()->TabletId()); } EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) override { - TWriteOperation* tx = dynamic_cast(op.Get()); - Y_ABORT_UNLESS(tx != nullptr); + TWriteOperation* writeOp = dynamic_cast(op.Get()); + Y_ABORT_UNLESS(writeOp != nullptr); - LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "tx " << op->GetTxId() << " at " << tx->WriteTx()->TabletId() << " is executing write operation"); + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Executing write operation for " << *op << " at " << writeOp->WriteTx()->TabletId()); if (op->Result() || op->HasResultSentFlag() || op->IsImmediate() && CheckRejectDataTx(op, ctx)) { return EExecutionStatus::Executed; @@ -136,7 +136,7 @@ class TWriteUnit : public TExecutionUnit { } else { //TODO: Prepared - tx->SetWriteResult(NEvents::TDataEvents::TEvWriteResult::BuildPrepared(tx->WriteTx()->TabletId(), op->GetTxId(), {0, 0, {}})); + writeOp->SetWriteResult(NEvents::TDataEvents::TEvWriteResult::BuildPrepared(writeOp->WriteTx()->TabletId(), op->GetTxId(), {0, 0, {}})); return EExecutionStatus::DelayCompleteNoMoreRestarts; } @@ -144,12 +144,12 @@ class TWriteUnit : public TExecutionUnit { TSetupSysLocks guardLocks(op, DataShard, &locksDb); try { - DoExecute(&DataShard, tx, txc, ctx); + DoExecute(&DataShard, writeOp, txc, ctx); } catch (const TNeedGlobalTxId&) { Y_VERIFY_S(op->GetGlobalTxId() == 0, - "Unexpected TNeedGlobalTxId exception for direct operation with TxId# " << op->GetGlobalTxId()); + "Unexpected TNeedGlobalTxId exception for write operation with TxId# " << op->GetGlobalTxId()); Y_VERIFY_S(op->IsImmediate(), - "Unexpected TNeedGlobalTxId exception for a non-immediate operation with TxId# " << op->GetTxId()); + "Unexpected TNeedGlobalTxId exception for a non-immediate write operation with TxId# " << op->GetTxId()); ctx.Send(MakeTxProxyID(), new TEvTxUserProxy::TEvAllocateTxId(), @@ -169,7 +169,7 @@ class TWriteUnit : public TExecutionUnit { return EExecutionStatus::Continue; } - op->ChangeRecords() = std::move(tx->WriteTx()->GetCollectedChanges()); + op->ChangeRecords() = std::move(writeOp->WriteTx()->GetCollectedChanges()); DataShard.SysLocksTable().ApplyLocks(); DataShard.SubscribeNewLocks(ctx); @@ -183,10 +183,11 @@ class TWriteUnit : public TExecutionUnit { DataShard.EnqueueChangeRecords(std::move(op->ChangeRecords())); DataShard.EmitHeartbeats(ctx); - TWriteOperation* tx = dynamic_cast(op.Get()); - Y_ABORT_UNLESS(tx != nullptr); + TWriteOperation* writeOp = dynamic_cast(op.Get()); + Y_ABORT_UNLESS(writeOp != nullptr); - LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "tx " << op->GetTxId() << " at " << tx->WriteTx()->TabletId() << " complete write operation"); + const auto& status = writeOp->WriteResult()->Record.status(); + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Completed write operation for " << *op << " at " << writeOp->WriteTx()->TabletId() << ", status " << status); //TODO: Counters // if (WriteResult->Record.status() == NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED || WriteResult->Record.status() == NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED) { @@ -195,7 +196,7 @@ class TWriteUnit : public TExecutionUnit { // self->IncCounter(COUNTER_WRITE_ERROR); // } - ctx.Send(tx->GetEv()->Sender, tx->WriteResult().release(), 0, tx->GetEv()->Cookie); + ctx.Send(writeOp->GetEv()->Sender, writeOp->WriteResult().release(), 0, writeOp->GetEv()->Cookie); } }; // TWriteUnit diff --git a/ydb/core/tx/datashard/ya.make b/ydb/core/tx/datashard/ya.make index c7db84b49e8a..a2dfd13c428d 100644 --- a/ydb/core/tx/datashard/ya.make +++ b/ydb/core/tx/datashard/ya.make @@ -156,6 +156,7 @@ SRCS( export_scan.cpp finalize_build_index_unit.cpp finish_propose_unit.cpp + finish_propose_write_unit.cpp follower_edge.cpp initiate_build_index_unit.cpp key_conflicts.cpp