Skip to content

Commit

Permalink
CheckWriteUnit & FinishProposeWriteUnit (#631)
Browse files Browse the repository at this point in the history
  • Loading branch information
azevaykin authored Dec 21, 2023
1 parent 67afd6e commit 0aedc7a
Show file tree
Hide file tree
Showing 16 changed files with 367 additions and 268 deletions.
10 changes: 10 additions & 0 deletions ydb/core/tx/data_events/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,16 @@ struct TDataEvents {
return result;
}

TString GetError() const {
return TStringBuilder() << "Status: " << Record.GetStatus() << " Issues: " << Record.GetIssues();
}

NKikimrDataEvents::TEvWriteResult::EStatus GetStatus() const { return Record.GetStatus(); }

bool IsPrepared() const { return GetStatus() == NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED; }
bool IsComplete() const { return GetStatus() == NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED; }
bool IsError() const { return !IsPrepared() && !IsComplete(); }

void SetOrbit(NLWTrace::TOrbit&& orbit) { Orbit = std::move(orbit); }
NLWTrace::TOrbit& GetOrbit() { return Orbit; }
NLWTrace::TOrbit&& MoveOrbit() { return std::move(Orbit); }
Expand Down
220 changes: 27 additions & 193 deletions ydb/core/tx/datashard/check_write_unit.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#include "datashard_impl.h"
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"

#include "ydb/core/tx/datashard/datashard_write_operation.h"
#include <ydb/core/tablet/tablet_exception.h>

namespace NKikimr {
Expand All @@ -24,7 +24,7 @@ class TCheckWriteUnit: public TExecutionUnit {

TCheckWriteUnit::TCheckWriteUnit(TDataShard &dataShard,
TPipeline &pipeline)
: TExecutionUnit(EExecutionUnitKind::CheckDataTx, false, dataShard, pipeline)
: TExecutionUnit(EExecutionUnitKind::CheckWrite, false, dataShard, pipeline)
{
}

Expand All @@ -45,126 +45,37 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
Y_ABORT_UNLESS(!op->IsAborted());

if (CheckRejectDataTx(op, ctx)) {
op->Abort(EExecutionUnitKind::FinishPropose);
op->Abort(EExecutionUnitKind::FinishProposeWrite);

return EExecutionStatus::Executed;
}

//TODO: remove this return
return EExecutionStatus::Executed;

TActiveTransaction *tx = dynamic_cast<TActiveTransaction*>(op.Get());
Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind());
auto dataTx = tx->GetDataTx();
Y_ABORT_UNLESS(dataTx);
Y_ABORT_UNLESS(dataTx->Ready() || dataTx->RequirePrepare());

if (dataTx->Ready()) {
DataShard.IncCounter(COUNTER_MINIKQL_PROGRAM_SIZE, dataTx->ProgramSize());
} else {
Y_ABORT_UNLESS(dataTx->RequirePrepare());
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
"Require prepare Tx " << op->GetTxId() << " at " << DataShard.TabletID()
<< ": " << dataTx->GetErrors());
}
TWriteOperation* writeOp = dynamic_cast<TWriteOperation*>(op.Get());
Y_VERIFY_S(writeOp, "cannot cast operation of kind " << op->GetKind());
auto writeTx = writeOp->WriteTx();
Y_ABORT_UNLESS(writeTx);
Y_ABORT_UNLESS(writeTx->Ready() || writeTx->RequirePrepare());

// Check if we are out of space and tx wants to update user
// or system table.
if (DataShard.IsAnyChannelYellowStop()
&& (dataTx->HasWrites() || !op->IsImmediate())) {
&& (writeTx->HasWrites() || !op->IsImmediate())) {
TString err = TStringBuilder()
<< "Cannot perform transaction: out of disk space at tablet "
<< DataShard.TabletID() << " txId " << op->GetTxId();

DataShard.IncCounter(COUNTER_PREPARE_OUT_OF_SPACE);

BuildResult(op)->AddError(NKikimrTxDataShard::TError::OUT_OF_SPACE, err);
op->Abort(EExecutionUnitKind::FinishPropose);
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err);
op->Abort(EExecutionUnitKind::FinishProposeWrite);

LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckDataTxUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err);
LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckWriteUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err);

return EExecutionStatus::Executed;
}

if (tx->IsMvccSnapshotRead()) {
auto snapshot = tx->GetMvccSnapshot();
if (DataShard.IsFollower()) {
TString err = TStringBuilder()
<< "Operation " << *op << " cannot read from snapshot " << snapshot
<< " using data tx on a follower " << DataShard.TabletID();

BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)
->AddError(NKikimrTxDataShard::TError::BAD_ARGUMENT, err);
op->Abort(EExecutionUnitKind::FinishPropose);

LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);

return EExecutionStatus::Executed;
} else if (!DataShard.IsMvccEnabled()) {
TString err = TStringBuilder()
<< "Operation " << *op << " reads from snapshot " << snapshot
<< " with MVCC feature disabled at " << DataShard.TabletID();

BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)
->AddError(NKikimrTxDataShard::TError::BAD_ARGUMENT, err);
op->Abort(EExecutionUnitKind::FinishPropose);

LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);

return EExecutionStatus::Executed;
} else if (snapshot < DataShard.GetSnapshotManager().GetLowWatermark()) {
TString err = TStringBuilder()
<< "Operation " << *op << " reads from stale snapshot " << snapshot
<< " at " << DataShard.TabletID();

BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)
->AddError(NKikimrTxDataShard::TError::SNAPSHOT_NOT_EXIST, err);
op->Abort(EExecutionUnitKind::FinishPropose);

LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);

;
}
}

TEngineBay::TSizes txReads;

if (op->IsDataTx()) {
bool hasTotalKeysSizeLimit = !!dataTx->PerShardKeysSizeLimitBytes();
txReads = dataTx->CalcReadSizes(hasTotalKeysSizeLimit);

if (txReads.ReadSize > DataShard.GetTxReadSizeLimit()) {
TString err = TStringBuilder()
<< "Transaction read size " << txReads.ReadSize << " exceeds limit "
<< DataShard.GetTxReadSizeLimit() << " at tablet " << DataShard.TabletID()
<< " txId " << op->GetTxId();

BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)
->AddError(NKikimrTxDataShard::TError::READ_SIZE_EXECEEDED, err);
op->Abort(EExecutionUnitKind::FinishPropose);

LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);

return EExecutionStatus::Executed;
}

if (hasTotalKeysSizeLimit
&& txReads.TotalKeysSize > *dataTx->PerShardKeysSizeLimitBytes()) {
TString err = TStringBuilder()
<< "Transaction total keys size " << txReads.TotalKeysSize
<< " exceeds limit " << *dataTx->PerShardKeysSizeLimitBytes()
<< " at tablet " << DataShard.TabletID() << " txId " << op->GetTxId();

BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)
->AddError(NKikimrTxDataShard::TError::READ_SIZE_EXECEEDED, err);
op->Abort(EExecutionUnitKind::FinishPropose);

LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);

return EExecutionStatus::Executed;
}

for (const auto& key : dataTx->TxInfo().Keys) {
{
for (const auto& key : writeTx->TxInfo().Keys) {
if (key.IsWrite && DataShard.IsUserTable(key.Key->TableId)) {
ui64 keySize = 0;
for (const auto& cell : key.Key->Range.From) {
Expand All @@ -176,9 +87,8 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
<< " bytes which exceeds limit " << NLimits::MaxWriteKeySize
<< " bytes at " << DataShard.TabletID();

BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)
->AddError(NKikimrTxDataShard::TError::BAD_ARGUMENT, err);
op->Abort(EExecutionUnitKind::FinishPropose);
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, err);
op->Abort(EExecutionUnitKind::FinishProposeWrite);

LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);

Expand All @@ -193,8 +103,8 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
<< "Transaction write column value of " << col.ImmediateUpdateSize
<< " bytes is larger than the allowed threshold";

BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::EXEC_ERROR)->AddError(NKikimrTxDataShard::TError::BAD_ARGUMENT, err);
op->Abort(EExecutionUnitKind::FinishPropose);
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, err);
op->Abort(EExecutionUnitKind::FinishProposeWrite);

LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);

Expand All @@ -216,10 +126,10 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,

DataShard.IncCounter(COUNTER_PREPARE_OUT_OF_SPACE);

BuildResult(op)->AddError(NKikimrTxDataShard::TError::OUT_OF_SPACE, err);
op->Abort(EExecutionUnitKind::FinishPropose);
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err);
op->Abort(EExecutionUnitKind::FinishProposeWrite);

LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckDataTxUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err);
LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckWriteUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err);

return EExecutionStatus::Executed;
}
Expand All @@ -229,96 +139,20 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
}
}

if (op->IsReadTable()) {
const auto& record = dataTx->GetReadTableTransaction();
const auto& userTables = DataShard.GetUserTables();

TMaybe<TString> schemaChangedError;
if (auto it = userTables.find(record.GetTableId().GetTableId()); it != userTables.end()) {
const auto& tableInfo = *it->second;
for (const auto& columnRecord : record.GetColumns()) {
if (auto* columnInfo = tableInfo.Columns.FindPtr(columnRecord.GetId())) {
// TODO: column types don't change when bound by id, but we may want to check anyway
} else {
schemaChangedError = TStringBuilder() << "ReadTable cannot find column "
<< columnRecord.GetName() << " (" << columnRecord.GetId() << ")";
break;
}
}
// TODO: validate key ranges?
} else {
schemaChangedError = TStringBuilder() << "ReadTable cannot find table "
<< record.GetTableId().GetOwnerId() << ":" << record.GetTableId().GetTableId();
}

if (schemaChangedError) {
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::ERROR)
->AddError(NKikimrTxDataShard::TError::SCHEME_CHANGED, *schemaChangedError);
op->Abort(EExecutionUnitKind::FinishPropose);
return EExecutionStatus::Executed;
}

if (record.HasSnapshotStep() && record.HasSnapshotTxId()) {
if (!op->IsImmediate()) {
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)->AddError(
NKikimrTxDataShard::TError::BAD_ARGUMENT,
"ReadTable from snapshot must be an immediate transaction");
op->Abort(EExecutionUnitKind::FinishPropose);
return EExecutionStatus::Executed;
}

const TSnapshotKey key(
record.GetTableId().GetOwnerId(),
record.GetTableId().GetTableId(),
record.GetSnapshotStep(),
record.GetSnapshotTxId());

if (!DataShard.GetSnapshotManager().AcquireReference(key)) {
// TODO: try upgrading to mvcc snapshot when available
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)->AddError(
NKikimrTxDataShard::TError::SNAPSHOT_NOT_EXIST,
TStringBuilder()
<< "Shard " << DataShard.TabletID()
<< " has no snapshot " << key);
op->Abort(EExecutionUnitKind::FinishPropose);
return EExecutionStatus::Executed;
}

op->SetAcquiredSnapshotKey(key);
op->SetUsingSnapshotFlag();
}
}

if (!op->IsImmediate()) {
if (!Pipeline.AssignPlanInterval(op)) {
TString err = TStringBuilder()
<< "Can't propose tx " << op->GetTxId() << " at blocked shard "
<< DataShard.TabletID();
BuildResult(op)->AddError(NKikimrTxDataShard::TError::SHARD_IS_BLOCKED, err);
op->Abort(EExecutionUnitKind::FinishPropose);
TString err = TStringBuilder() << "Can't propose tx " << op->GetTxId() << " at blocked shard " << DataShard.TabletID();

writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err);
op->Abort(EExecutionUnitKind::FinishProposeWrite);

LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, err);

return EExecutionStatus::Executed;
}

auto &res = BuildResult(op);
res->SetPrepared(op->GetMinStep(), op->GetMaxStep(), op->GetReceivedAt());

if (op->IsDataTx()) {
res->Record.SetReadSize(txReads.ReadSize);
res->Record.SetReplySize(txReads.ReplySize);

for (const auto& rs : txReads.OutReadSetSize) {
auto entry = res->Record.AddOutgoingReadSetInfo();
entry->SetShardId(rs.first);
entry->SetSize(rs.second);
}
}

LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
"Prepared " << op->GetKind() << " transaction txId " << op->GetTxId()
<< " at tablet " << DataShard.TabletID());
writeOp->SetWriteResult(NEvents::TDataEvents::TEvWriteResult::BuildPrepared(writeOp->WriteTx()->TabletId(), op->GetTxId(), {op->GetMinStep(), op->GetMaxStep(), {}}));
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Prepared " << *op << " at " << DataShard.TabletID());
}

return EExecutionStatus::Executed;
Expand Down
10 changes: 4 additions & 6 deletions ydb/core/tx/datashard/datashard.h
Original file line number Diff line number Diff line change
Expand Up @@ -641,12 +641,11 @@ struct TEvDataShard {
TString GetError() const {
if (Record.ErrorSize() > 0) {
TString result;
TStringOutput out(result);
for (ui32 i = 0; i < Record.ErrorSize(); ++i) {
if (Record.GetError(i).HasReason()) {
result += Record.GetError(i).GetReason() + "|";
} else {
result += "no reason|";
}
out << Record.GetError(i).GetKind() << " ("
<< (Record.GetError(i).HasReason() ? Record.GetError(i).GetReason() : "no reason")
<< ") |";
}
return result;
} else {
Expand All @@ -665,7 +664,6 @@ struct TEvDataShard {
error->SetKey(keyBuffer.data(), keyBuffer.size());
}
}

private:
bool ForceOnline = false;
bool ForceDirty = false;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2018,6 +2018,7 @@ class TDataShard

enum ELogThrottlerType {
CheckDataTxUnit_Execute = 0,
CheckWriteUnit_Execute = 0,
TxProposeTransactionBase_Execute,
FinishProposeUnit_CompleteRequest,
FinishProposeUnit_UpdateCounters,
Expand Down
Loading

0 comments on commit 0aedc7a

Please sign in to comment.