Skip to content

Commit

Permalink
Review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
SammyVimes committed Dec 20, 2023
1 parent 6d5c9c8 commit cff4055
Show file tree
Hide file tree
Showing 15 changed files with 82 additions and 72 deletions.
7 changes: 4 additions & 3 deletions ydb/core/keyvalue/keyvalue_flat_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,9 @@ class TKeyValueFlat : public TActor<TKeyValueFlat>, public NTabletFlatExecutor::
TKeyValueFlat *Self;
TVector<TLogoBlobID> TrashBeingCommitted;

TTxRequest(THolder<TIntermediate> intermediate, TKeyValueFlat *keyValueFlat)
: Intermediate(std::move(intermediate))
TTxRequest(THolder<TIntermediate> intermediate, TKeyValueFlat *keyValueFlat, NWilson::TTraceId &&traceId)
: NTabletFlatExecutor::ITransaction(std::move(traceId))
, Intermediate(std::move(intermediate))
, Self(keyValueFlat)
{
Intermediate->Response.SetStatus(NMsgBusProxy::MSTATUS_UNKNOWN);
Expand Down Expand Up @@ -390,7 +391,7 @@ class TKeyValueFlat : public TActor<TKeyValueFlat>, public NTabletFlatExecutor::

State.OnEvIntermediate(*(ev->Get()->Intermediate), ctx);
auto traceId = ev->Get()->Intermediate->Span.GetTraceId();
Execute(new TTxRequest(std::move(ev->Get()->Intermediate), this), ctx, std::move(traceId));
Execute(new TTxRequest(std::move(ev->Get()->Intermediate), this, std::move(traceId)), ctx);
}

void Handle(TEvKeyValue::TEvNotify::TPtr &ev, const TActorContext &ctx) {
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tablet_flat/flat_exec_seat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ namespace NTabletFlatExecutor {
}
Self->Complete(ctx);

TxSpan.Attribute("rw", isRW);
TxSpan.EndOk();
Self->TxSpan.Attribute("rw", isRW);
Self->TxSpan.EndOk();
}

void TSeat::Terminate(ETerminationReason reason, const TActorContext& ctx) noexcept {
Self->Terminate(reason, ctx);

TxSpan.EndError("Terminated");
Self->TxSpan.EndError("Terminated");
}

} // namespace NTabletFlatExecutor
Expand Down
19 changes: 5 additions & 14 deletions ydb/core/tablet_flat/flat_exec_seat.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,10 @@ namespace NTabletFlatExecutor {

TSeat(const TSeat&) = delete;

TSeat(ui32 uniqId, TAutoPtr<ITransaction> self, NWilson::TTraceId txTraceId)
TSeat(ui32 uniqId, TAutoPtr<ITransaction> self)
: UniqID(uniqId)
, Self(self)
{
if (txTraceId) {
SetupTxSpan(std::move(txTraceId));
}
}

void Describe(IOutputStream &out) const noexcept
Expand All @@ -37,38 +34,32 @@ namespace NTabletFlatExecutor {

void Terminate(ETerminationReason reason, const TActorContext& ctx) noexcept;

void SetupTxSpan(NWilson::TTraceId txTraceId) noexcept {
TxSpan = NWilson::TSpan(TWilsonTablet::Tablet, std::move(txTraceId), "Tablet.Transaction");
TxSpan.Attribute("Type", TypeName(*Self));
}

NWilson::TSpan CreateExecutionSpan() noexcept {
return NWilson::TSpan(TWilsonTablet::Tablet, TxSpan.GetTraceId(), "Tablet.Transaction.Execute");
return NWilson::TSpan(TWilsonTablet::Tablet, Self->TxSpan.GetTraceId(), "Tablet.Transaction.Execute");
}

void StartEnqueuedSpan() noexcept {
WaitingSpan = NWilson::TSpan(TWilsonTablet::Tablet, TxSpan.GetTraceId(), "Tablet.Transaction.Enqueued");
WaitingSpan = NWilson::TSpan(TWilsonTablet::Tablet, Self->TxSpan.GetTraceId(), "Tablet.Transaction.Enqueued");
}

void FinishEnqueuedSpan() noexcept {
WaitingSpan.EndOk();
}

void CreatePendingSpan() noexcept {
WaitingSpan = NWilson::TSpan(TWilsonTablet::Tablet, TxSpan.GetTraceId(), "Tablet.Transaction.Pending");
WaitingSpan = NWilson::TSpan(TWilsonTablet::Tablet, Self->TxSpan.GetTraceId(), "Tablet.Transaction.Pending");
}

void FinishPendingSpan() noexcept {
WaitingSpan.EndOk();
}

NWilson::TTraceId GetTxTraceId() const noexcept {
return TxSpan.GetTraceId();
return Self->TxSpan.GetTraceId();
}

const ui64 UniqID = Max<ui64>();
const TAutoPtr<ITransaction> Self;
NWilson::TSpan TxSpan;
NWilson::TSpan WaitingSpan;
ui64 Retries = 0;
TPinned Pinned;
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/tablet_flat/flat_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1576,10 +1576,10 @@ bool TExecutor::CanExecuteTransaction() const {
return Stats->IsActive && (Stats->IsFollower || PendingPartSwitches.empty()) && !BrokenTransaction;
}

void TExecutor::DoExecute(TAutoPtr<ITransaction> self, bool allowImmediate, const TActorContext &ctx, NWilson::TTraceId traceId) {
void TExecutor::DoExecute(TAutoPtr<ITransaction> self, bool allowImmediate, const TActorContext &ctx) {
Y_ABORT_UNLESS(ActivationQueue, "attempt to execute transaction before activation");

TAutoPtr<TSeat> seat = new TSeat(++TransactionUniqCounter, self, std::move(traceId));
TAutoPtr<TSeat> seat = new TSeat(++TransactionUniqCounter, self);

LWTRACK(TransactionBegin, seat->Self->Orbit, seat->UniqID, Owner->TabletID(), TypeName(*seat->Self));

Expand Down Expand Up @@ -1634,12 +1634,12 @@ void TExecutor::DoExecute(TAutoPtr<ITransaction> self, bool allowImmediate, cons
ExecuteTransaction(seat, ctx);
}

void TExecutor::Execute(TAutoPtr<ITransaction> self, const TActorContext &ctx, NWilson::TTraceId traceId) {
DoExecute(self, true, ctx, std::move(traceId));
void TExecutor::Execute(TAutoPtr<ITransaction> self, const TActorContext &ctx) {
DoExecute(self, true, ctx);
}

void TExecutor::Enqueue(TAutoPtr<ITransaction> self, const TActorContext &ctx, NWilson::TTraceId traceId) {
DoExecute(self, false, ctx, std::move(traceId));
void TExecutor::Enqueue(TAutoPtr<ITransaction> self, const TActorContext &ctx) {
DoExecute(self, false, ctx);
}

void TExecutor::ExecuteTransaction(TAutoPtr<TSeat> seat, const TActorContext &ctx) {
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tablet_flat/flat_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -629,9 +629,9 @@ class TExecutor
void Boot(TEvTablet::TEvBoot::TPtr &ev, const TActorContext &ctx) override;
void Restored(TEvTablet::TEvRestored::TPtr &ev, const TActorContext &ctx) override;
void DetachTablet(const TActorContext &ctx) override;
void DoExecute(TAutoPtr<ITransaction> transaction, bool allowImmediate, const TActorContext &ctx, NWilson::TTraceId traceId);
void Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx, NWilson::TTraceId traceId = {}) override;
void Enqueue(TAutoPtr<ITransaction> transaction, const TActorContext &ctx, NWilson::TTraceId traceId = {}) override;
void DoExecute(TAutoPtr<ITransaction> transaction, bool allowImmediate, const TActorContext &ctx);
void Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx) override;
void Enqueue(TAutoPtr<ITransaction> transaction, const TActorContext &ctx) override;

TLeaseCommit* AttachLeaseCommit(TLogCommit* commit, bool force = false);
TLeaseCommit* EnsureReadOnlyLease(TMonotonic at);
Expand Down
17 changes: 10 additions & 7 deletions ydb/core/tablet_flat/flat_executor_txloglogic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,26 +184,29 @@ TLogicRedo::TCommitRWTransactionResult TLogicRedo::CommitRWTransaction(
if (!Batch->Commit) {
Batch->Commit = CommitManager->Begin(false, ECommit::Redo, seat->GetTxTraceId());
} else {
const TAutoPtr<ITransaction> &tx = seat->Self;
// Batch commit's TraceId will be used for all blobstorage requests of the batch.
if (!Batch->Commit->TraceId && seat->TxSpan) {
if (!Batch->Commit->TraceId && tx->TxSpan) {
// It is possible that the original or consequent transactions didn't have a TraceId,
// but if a new transaction of a batch has TraceId, use it for the whole batch
// (and consequent traced transactions).
Batch->Commit->TraceId = seat->GetTxTraceId();
} else {
seat->TxSpan.Link(Batch->Commit->TraceId, {});
tx->TxSpan.Link(Batch->Commit->TraceId, {});
}

for (TSeat* tx = Batch->Commit->FirstTx; tx != nullptr; tx = tx->NextCommitTx) {
i64 batchSize = Batch->Bodies.size() + 1;

for (TSeat* curSeat = Batch->Commit->FirstTx; curSeat != nullptr; curSeat = curSeat->NextCommitTx) {
// Update batch size of the transaction, whose TraceId the commit uses (first transaction in batch, that has TraceId).
if (tx->TxSpan) {
i64 batchSize = Batch->Bodies.size() + 1;
tx->TxSpan.Attribute("BatchSize", batchSize);
if (curSeat->Self->TxSpan) {
curSeat->Self->TxSpan.Attribute("BatchSize", batchSize);
break;
}
}

seat->TxSpan.Attribute("Batched", true);
tx->TxSpan.Attribute("Batched", true);
tx->TxSpan.Attribute("BatchSize", batchSize);
}

Batch->Commit->PushTx(seat.Get());
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/tablet_flat/tablet_flat_executed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,19 @@ IExecutor* TTabletExecutedFlat::CreateExecutor(const TActorContext &ctx) {
return Executor();
}

void TTabletExecutedFlat::Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx, NWilson::TTraceId traceId) {
void TTabletExecutedFlat::Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx) {
Y_UNUSED(ctx);
Execute(transaction, std::move(traceId));
Execute(transaction);
}

void TTabletExecutedFlat::Execute(TAutoPtr<ITransaction> transaction, NWilson::TTraceId traceId) {
void TTabletExecutedFlat::Execute(TAutoPtr<ITransaction> transaction) {
if (transaction)
static_cast<TExecutor*>(Executor())->Execute(transaction, ExecutorCtx(*TlsActivationContext), std::move(traceId));
static_cast<TExecutor*>(Executor())->Execute(transaction, ExecutorCtx(*TlsActivationContext));
}

void TTabletExecutedFlat::EnqueueExecute(TAutoPtr<ITransaction> transaction, NWilson::TTraceId traceId) {
void TTabletExecutedFlat::EnqueueExecute(TAutoPtr<ITransaction> transaction) {
if (transaction)
static_cast<TExecutor*>(Executor())->Enqueue(transaction, ExecutorCtx(*TlsActivationContext), std::move(traceId));
static_cast<TExecutor*>(Executor())->Enqueue(transaction, ExecutorCtx(*TlsActivationContext));
}

const NTable::TScheme& TTabletExecutedFlat::Scheme() const noexcept {
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tablet_flat/tablet_flat_executed.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ class TTabletExecutedFlat : public NFlatExecutorSetup::ITablet {
IExecutor* Executor() const { return Executor0; }
const TInstant StartTime() const { return StartTime0; }

void Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx, NWilson::TTraceId traceId = {});
void Execute(TAutoPtr<ITransaction> transaction, NWilson::TTraceId traceId = {});
void EnqueueExecute(TAutoPtr<ITransaction> transaction, NWilson::TTraceId traceId = {});
void Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx);
void Execute(TAutoPtr<ITransaction> transaction);
void EnqueueExecute(TAutoPtr<ITransaction> transaction);

const NTable::TScheme& Scheme() const noexcept;

Expand Down
23 changes: 21 additions & 2 deletions ydb/core/tablet_flat/tablet_flat_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <ydb/core/base/tablet.h>
#include <ydb/core/base/blobstorage.h>
#include <ydb/library/actors/wilson/wilson_span.h>
#include <ydb/library/wilson_ids/wilson.h>
#include <library/cpp/lwtrace/shuttle.h>
#include <util/generic/maybe.h>
#include <util/system/type_name.h>
Expand Down Expand Up @@ -278,6 +279,12 @@ class ITransaction : TNonCopyable {
: Orbit(std::move(orbit))
{ }

ITransaction(NWilson::TTraceId &&traceId)
: TxSpan(NWilson::TSpan(TWilsonTablet::Tablet, std::move(traceId), "Tablet.Transaction"))
{
TxSpan.Attribute("Type", TypeName(*this));
}

virtual ~ITransaction() = default;
/// @return true if execution complete and transaction is ready for commit
virtual bool Execute(TTransactionContext &txc, const TActorContext &ctx) = 0;
Expand All @@ -293,8 +300,15 @@ class ITransaction : TNonCopyable {
out << TypeName(*this);
}

void SetupTxSpan(NWilson::TTraceId traceId) noexcept {
TxSpan = NWilson::TSpan(TWilsonTablet::Tablet, std::move(traceId), "Tablet.Transaction");
TxSpan.Attribute("Type", TypeName(*this));
}

public:
NLWTrace::TOrbit Orbit;

NWilson::TSpan TxSpan;
};

template<typename T>
Expand All @@ -313,6 +327,11 @@ class TTransactionBase : public ITransaction {
: ITransaction(std::move(orbit))
, Self(self)
{ }

TTransactionBase(T *self, NWilson::TTraceId &&traceId)
: ITransaction(std::move(traceId))
, Self(self)
{ }
};

struct TExecutorStats {
Expand Down Expand Up @@ -518,8 +537,8 @@ namespace NFlatExecutorSetup {
// all followers had completed log with requested gc-barrier
virtual void FollowerGcApplied(ui32 step, TDuration followerSyncDelay) = 0;

virtual void Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx, NWilson::TTraceId traceId = {}) = 0;
virtual void Enqueue(TAutoPtr<ITransaction> transaction, const TActorContext &ctx, NWilson::TTraceId traceId = {}) = 0;
virtual void Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx) = 0;
virtual void Enqueue(TAutoPtr<ITransaction> transaction, const TActorContext &ctx) = 0;

virtual void ConfirmReadOnlyLease(TMonotonic at) = 0;
virtual void ConfirmReadOnlyLease(TMonotonic at, std::function<void()> callback) = 0;
Expand Down
16 changes: 6 additions & 10 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2818,8 +2818,7 @@ void TDataShard::ProposeTransaction(TEvDataShard::TEvProposeTransaction::TPtr &&
UpdateProposeQueueSize();
} else {
// Prepare planned transactions as soon as possible
TTxProposeTransactionBase *tx = new TTxProposeTransactionBase(this, std::move(ev), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, /* delayed */ false);
Execute(tx, ctx, tx->GetTraceId());
Execute(new TTxProposeTransactionBase(this, std::move(ev), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, /* delayed */ false), ctx);
}
}

Expand All @@ -2837,8 +2836,7 @@ void TDataShard::ProposeTransaction(NEvents::TDataEvents::TEvWrite::TPtr&& ev, c
UpdateProposeQueueSize();
} else {
// Prepare planned transactions as soon as possible
TTxWrite *tx = new TTxWrite(this, std::move(ev), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, /* delayed */ false);
Execute(tx, ctx, tx->GetTraceId());
Execute(new TTxWrite(this, std::move(ev), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, /* delayed */ false), ctx);
}
}

Expand Down Expand Up @@ -2903,14 +2901,12 @@ void TDataShard::Handle(TEvPrivate::TEvDelayedProposeTransaction::TPtr &ev, cons
switch (item.Event->GetTypeRewrite()) {
case TEvDataShard::TEvProposeTransaction::EventType: {
auto event = IEventHandle::Downcast<TEvDataShard::TEvProposeTransaction>(std::move(item.Event));
TTxProposeTransactionBase *tx = new TTxProposeTransactionBase(this, std::move(event), item.ReceivedAt, item.TieBreakerIndex, /* delayed */ true);
Execute(tx, ctx, tx->GetTraceId());
Execute(new TTxProposeTransactionBase(this, std::move(event), item.ReceivedAt, item.TieBreakerIndex, /* delayed */ true), ctx);
return;
}
case NEvents::TDataEvents::TEvWrite::EventType: {
auto event = IEventHandle::Downcast<NEvents::TDataEvents::TEvWrite>(std::move(item.Event));
TTxWrite *tx = new TTxWrite(this, std::move(event), item.ReceivedAt, item.TieBreakerIndex, /* delayed */ true);
Execute(tx, ctx, tx->GetTraceId());
Execute(new TTxWrite(this, std::move(event), item.ReceivedAt, item.TieBreakerIndex, /* delayed */ true), ctx);
return;
}
default:
Expand Down Expand Up @@ -4121,13 +4117,13 @@ bool TDataShard::ReassignChannelsEnabled() const {
}

void TDataShard::ExecuteProgressTx(const TActorContext& ctx) {
Execute(new TTxProgressTransaction(this, {}), ctx);
Execute(new TTxProgressTransaction(this, {}, {}), ctx);
}

void TDataShard::ExecuteProgressTx(TOperation::TPtr op, const TActorContext& ctx) {
Y_ABORT_UNLESS(op->IsInProgress());
NWilson::TTraceId traceId = op->GetTraceId();
Execute(new TTxProgressTransaction(this, std::move(op)), ctx, std::move(traceId));
Execute(new TTxProgressTransaction(this, std::move(op), std::move(traceId)), ctx);
}

TDuration TDataShard::CleanupTimeout() const {
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/tx/datashard/datashard__progress_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
namespace NKikimr {
namespace NDataShard {

TDataShard::TTxProgressTransaction::TTxProgressTransaction(TDataShard *self, TOperation::TPtr op)
: TBase(self)
TDataShard::TTxProgressTransaction::TTxProgressTransaction(TDataShard *self, TOperation::TPtr op, NWilson::TTraceId &&traceId)
: TBase(self, std::move(traceId))
, ActiveOp(std::move(op))
{}

Expand Down Expand Up @@ -61,11 +61,11 @@ bool TDataShard::TTxProgressTransaction::Execute(TTransactionContext &txc, const
ActiveOp->IncrementInProgress();

if (ActiveOp->OperationSpan) {
if (!txc.Seat.TxSpan) {
if (!TxSpan) {
// If Progress Tx for this operation is being executed the first time,
// it won't have a span, because we choose what operation to run in the transaction itself.
// We create transaction span and transaction execution spans here instead.
txc.Seat.SetupTxSpan(ActiveOp->GetTraceId());
SetupTxSpan(ActiveOp->GetTraceId());
auxExecuteSpan = txc.Seat.CreateExecutionSpan();
}
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/datashard/datashard__propose_tx_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ TDataShard::TTxProposeTransactionBase::TTxProposeTransactionBase(TDataShard *sel
TEvDataShard::TEvProposeTransaction::TPtr &&ev,
TInstant receivedAt, ui64 tieBreakerIndex,
bool delayed)
: TBase(self)
: TBase(self, std::move(ev->TraceId))
, Ev(std::move(ev))
, ReceivedAt(receivedAt)
, TieBreakerIndex(tieBreakerIndex)
, Kind(static_cast<EOperationKind>(Ev->Get()->GetTxKind()))
, TxId(Ev->Get()->GetTxId())
, Acked(!delayed)
, ProposeTransactionSpan(TWilsonKqp::ProposeTransaction, std::move(Ev->TraceId), "ProposeTransaction", NWilson::EFlags::AUTO_END)
, ProposeTransactionSpan(TWilsonKqp::ProposeTransaction, TxSpan.GetTraceId(), "ProposeTransaction", NWilson::EFlags::AUTO_END)
{
ProposeTransactionSpan.Attribute("Shard", std::to_string(self->TabletID()));
}
Expand Down
Loading

0 comments on commit cff4055

Please sign in to comment.