From cff405577622ac52e34042634d8dcefa74859e14 Mon Sep 17 00:00:00 2001 From: Semyon Danilov Date: Wed, 20 Dec 2023 09:23:14 +0000 Subject: [PATCH] Review fixes --- ydb/core/keyvalue/keyvalue_flat_impl.h | 7 +++--- ydb/core/tablet_flat/flat_exec_seat.cpp | 6 ++--- ydb/core/tablet_flat/flat_exec_seat.h | 19 ++++----------- ydb/core/tablet_flat/flat_executor.cpp | 12 +++++----- ydb/core/tablet_flat/flat_executor.h | 6 ++--- .../tablet_flat/flat_executor_txloglogic.cpp | 17 ++++++++------ ydb/core/tablet_flat/tablet_flat_executed.cpp | 12 +++++----- ydb/core/tablet_flat/tablet_flat_executed.h | 6 ++--- ydb/core/tablet_flat/tablet_flat_executor.h | 23 +++++++++++++++++-- ydb/core/tx/datashard/datashard.cpp | 16 +++++-------- .../tx/datashard/datashard__progress_tx.cpp | 8 +++---- .../datashard/datashard__propose_tx_base.cpp | 4 ++-- .../tx/datashard/datashard__read_iterator.cpp | 12 +++++----- ydb/core/tx/datashard/datashard__write.cpp | 4 ++-- ydb/core/tx/datashard/datashard_txs.h | 2 +- 15 files changed, 82 insertions(+), 72 deletions(-) diff --git a/ydb/core/keyvalue/keyvalue_flat_impl.h b/ydb/core/keyvalue/keyvalue_flat_impl.h index 2d6c0a1aea16..0e200ad08412 100644 --- a/ydb/core/keyvalue/keyvalue_flat_impl.h +++ b/ydb/core/keyvalue/keyvalue_flat_impl.h @@ -122,8 +122,9 @@ class TKeyValueFlat : public TActor, public NTabletFlatExecutor:: TKeyValueFlat *Self; TVector TrashBeingCommitted; - TTxRequest(THolder intermediate, TKeyValueFlat *keyValueFlat) - : Intermediate(std::move(intermediate)) + TTxRequest(THolder intermediate, TKeyValueFlat *keyValueFlat, NWilson::TTraceId &&traceId) + : NTabletFlatExecutor::ITransaction(std::move(traceId)) + , Intermediate(std::move(intermediate)) , Self(keyValueFlat) { Intermediate->Response.SetStatus(NMsgBusProxy::MSTATUS_UNKNOWN); @@ -390,7 +391,7 @@ class TKeyValueFlat : public TActor, public NTabletFlatExecutor:: State.OnEvIntermediate(*(ev->Get()->Intermediate), ctx); auto traceId = ev->Get()->Intermediate->Span.GetTraceId(); - Execute(new TTxRequest(std::move(ev->Get()->Intermediate), this), ctx, std::move(traceId)); + Execute(new TTxRequest(std::move(ev->Get()->Intermediate), this, std::move(traceId)), ctx); } void Handle(TEvKeyValue::TEvNotify::TPtr &ev, const TActorContext &ctx) { diff --git a/ydb/core/tablet_flat/flat_exec_seat.cpp b/ydb/core/tablet_flat/flat_exec_seat.cpp index 1af617a924c8..028f4f1eebde 100644 --- a/ydb/core/tablet_flat/flat_exec_seat.cpp +++ b/ydb/core/tablet_flat/flat_exec_seat.cpp @@ -9,14 +9,14 @@ namespace NTabletFlatExecutor { } Self->Complete(ctx); - TxSpan.Attribute("rw", isRW); - TxSpan.EndOk(); + Self->TxSpan.Attribute("rw", isRW); + Self->TxSpan.EndOk(); } void TSeat::Terminate(ETerminationReason reason, const TActorContext& ctx) noexcept { Self->Terminate(reason, ctx); - TxSpan.EndError("Terminated"); + Self->TxSpan.EndError("Terminated"); } } // namespace NTabletFlatExecutor diff --git a/ydb/core/tablet_flat/flat_exec_seat.h b/ydb/core/tablet_flat/flat_exec_seat.h index 83234d434523..0e501da2405d 100644 --- a/ydb/core/tablet_flat/flat_exec_seat.h +++ b/ydb/core/tablet_flat/flat_exec_seat.h @@ -17,13 +17,10 @@ namespace NTabletFlatExecutor { TSeat(const TSeat&) = delete; - TSeat(ui32 uniqId, TAutoPtr self, NWilson::TTraceId txTraceId) + TSeat(ui32 uniqId, TAutoPtr self) : UniqID(uniqId) , Self(self) { - if (txTraceId) { - SetupTxSpan(std::move(txTraceId)); - } } void Describe(IOutputStream &out) const noexcept @@ -37,17 +34,12 @@ namespace NTabletFlatExecutor { void Terminate(ETerminationReason reason, const TActorContext& ctx) noexcept; - void SetupTxSpan(NWilson::TTraceId txTraceId) noexcept { - TxSpan = NWilson::TSpan(TWilsonTablet::Tablet, std::move(txTraceId), "Tablet.Transaction"); - TxSpan.Attribute("Type", TypeName(*Self)); - } - NWilson::TSpan CreateExecutionSpan() noexcept { - return NWilson::TSpan(TWilsonTablet::Tablet, TxSpan.GetTraceId(), "Tablet.Transaction.Execute"); + return NWilson::TSpan(TWilsonTablet::Tablet, Self->TxSpan.GetTraceId(), "Tablet.Transaction.Execute"); } void StartEnqueuedSpan() noexcept { - WaitingSpan = NWilson::TSpan(TWilsonTablet::Tablet, TxSpan.GetTraceId(), "Tablet.Transaction.Enqueued"); + WaitingSpan = NWilson::TSpan(TWilsonTablet::Tablet, Self->TxSpan.GetTraceId(), "Tablet.Transaction.Enqueued"); } void FinishEnqueuedSpan() noexcept { @@ -55,7 +47,7 @@ namespace NTabletFlatExecutor { } void CreatePendingSpan() noexcept { - WaitingSpan = NWilson::TSpan(TWilsonTablet::Tablet, TxSpan.GetTraceId(), "Tablet.Transaction.Pending"); + WaitingSpan = NWilson::TSpan(TWilsonTablet::Tablet, Self->TxSpan.GetTraceId(), "Tablet.Transaction.Pending"); } void FinishPendingSpan() noexcept { @@ -63,12 +55,11 @@ namespace NTabletFlatExecutor { } NWilson::TTraceId GetTxTraceId() const noexcept { - return TxSpan.GetTraceId(); + return Self->TxSpan.GetTraceId(); } const ui64 UniqID = Max(); const TAutoPtr Self; - NWilson::TSpan TxSpan; NWilson::TSpan WaitingSpan; ui64 Retries = 0; TPinned Pinned; diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp index 9967b930abf8..645e5c2774e5 100644 --- a/ydb/core/tablet_flat/flat_executor.cpp +++ b/ydb/core/tablet_flat/flat_executor.cpp @@ -1576,10 +1576,10 @@ bool TExecutor::CanExecuteTransaction() const { return Stats->IsActive && (Stats->IsFollower || PendingPartSwitches.empty()) && !BrokenTransaction; } -void TExecutor::DoExecute(TAutoPtr self, bool allowImmediate, const TActorContext &ctx, NWilson::TTraceId traceId) { +void TExecutor::DoExecute(TAutoPtr self, bool allowImmediate, const TActorContext &ctx) { Y_ABORT_UNLESS(ActivationQueue, "attempt to execute transaction before activation"); - TAutoPtr seat = new TSeat(++TransactionUniqCounter, self, std::move(traceId)); + TAutoPtr seat = new TSeat(++TransactionUniqCounter, self); LWTRACK(TransactionBegin, seat->Self->Orbit, seat->UniqID, Owner->TabletID(), TypeName(*seat->Self)); @@ -1634,12 +1634,12 @@ void TExecutor::DoExecute(TAutoPtr self, bool allowImmediate, cons ExecuteTransaction(seat, ctx); } -void TExecutor::Execute(TAutoPtr self, const TActorContext &ctx, NWilson::TTraceId traceId) { - DoExecute(self, true, ctx, std::move(traceId)); +void TExecutor::Execute(TAutoPtr self, const TActorContext &ctx) { + DoExecute(self, true, ctx); } -void TExecutor::Enqueue(TAutoPtr self, const TActorContext &ctx, NWilson::TTraceId traceId) { - DoExecute(self, false, ctx, std::move(traceId)); +void TExecutor::Enqueue(TAutoPtr self, const TActorContext &ctx) { + DoExecute(self, false, ctx); } void TExecutor::ExecuteTransaction(TAutoPtr seat, const TActorContext &ctx) { diff --git a/ydb/core/tablet_flat/flat_executor.h b/ydb/core/tablet_flat/flat_executor.h index 2d136d7bd957..594ccb0dbecd 100644 --- a/ydb/core/tablet_flat/flat_executor.h +++ b/ydb/core/tablet_flat/flat_executor.h @@ -629,9 +629,9 @@ class TExecutor void Boot(TEvTablet::TEvBoot::TPtr &ev, const TActorContext &ctx) override; void Restored(TEvTablet::TEvRestored::TPtr &ev, const TActorContext &ctx) override; void DetachTablet(const TActorContext &ctx) override; - void DoExecute(TAutoPtr transaction, bool allowImmediate, const TActorContext &ctx, NWilson::TTraceId traceId); - void Execute(TAutoPtr transaction, const TActorContext &ctx, NWilson::TTraceId traceId = {}) override; - void Enqueue(TAutoPtr transaction, const TActorContext &ctx, NWilson::TTraceId traceId = {}) override; + void DoExecute(TAutoPtr transaction, bool allowImmediate, const TActorContext &ctx); + void Execute(TAutoPtr transaction, const TActorContext &ctx) override; + void Enqueue(TAutoPtr transaction, const TActorContext &ctx) override; TLeaseCommit* AttachLeaseCommit(TLogCommit* commit, bool force = false); TLeaseCommit* EnsureReadOnlyLease(TMonotonic at); diff --git a/ydb/core/tablet_flat/flat_executor_txloglogic.cpp b/ydb/core/tablet_flat/flat_executor_txloglogic.cpp index 481940bd4bfd..34add0bbd1ae 100644 --- a/ydb/core/tablet_flat/flat_executor_txloglogic.cpp +++ b/ydb/core/tablet_flat/flat_executor_txloglogic.cpp @@ -184,26 +184,29 @@ TLogicRedo::TCommitRWTransactionResult TLogicRedo::CommitRWTransaction( if (!Batch->Commit) { Batch->Commit = CommitManager->Begin(false, ECommit::Redo, seat->GetTxTraceId()); } else { + const TAutoPtr &tx = seat->Self; // Batch commit's TraceId will be used for all blobstorage requests of the batch. - if (!Batch->Commit->TraceId && seat->TxSpan) { + if (!Batch->Commit->TraceId && tx->TxSpan) { // It is possible that the original or consequent transactions didn't have a TraceId, // but if a new transaction of a batch has TraceId, use it for the whole batch // (and consequent traced transactions). Batch->Commit->TraceId = seat->GetTxTraceId(); } else { - seat->TxSpan.Link(Batch->Commit->TraceId, {}); + tx->TxSpan.Link(Batch->Commit->TraceId, {}); } - for (TSeat* tx = Batch->Commit->FirstTx; tx != nullptr; tx = tx->NextCommitTx) { + i64 batchSize = Batch->Bodies.size() + 1; + + for (TSeat* curSeat = Batch->Commit->FirstTx; curSeat != nullptr; curSeat = curSeat->NextCommitTx) { // Update batch size of the transaction, whose TraceId the commit uses (first transaction in batch, that has TraceId). - if (tx->TxSpan) { - i64 batchSize = Batch->Bodies.size() + 1; - tx->TxSpan.Attribute("BatchSize", batchSize); + if (curSeat->Self->TxSpan) { + curSeat->Self->TxSpan.Attribute("BatchSize", batchSize); break; } } - seat->TxSpan.Attribute("Batched", true); + tx->TxSpan.Attribute("Batched", true); + tx->TxSpan.Attribute("BatchSize", batchSize); } Batch->Commit->PushTx(seat.Get()); diff --git a/ydb/core/tablet_flat/tablet_flat_executed.cpp b/ydb/core/tablet_flat/tablet_flat_executed.cpp index f96a73a6d88a..de09bd2f6bac 100644 --- a/ydb/core/tablet_flat/tablet_flat_executed.cpp +++ b/ydb/core/tablet_flat/tablet_flat_executed.cpp @@ -29,19 +29,19 @@ IExecutor* TTabletExecutedFlat::CreateExecutor(const TActorContext &ctx) { return Executor(); } -void TTabletExecutedFlat::Execute(TAutoPtr transaction, const TActorContext &ctx, NWilson::TTraceId traceId) { +void TTabletExecutedFlat::Execute(TAutoPtr transaction, const TActorContext &ctx) { Y_UNUSED(ctx); - Execute(transaction, std::move(traceId)); + Execute(transaction); } -void TTabletExecutedFlat::Execute(TAutoPtr transaction, NWilson::TTraceId traceId) { +void TTabletExecutedFlat::Execute(TAutoPtr transaction) { if (transaction) - static_cast(Executor())->Execute(transaction, ExecutorCtx(*TlsActivationContext), std::move(traceId)); + static_cast(Executor())->Execute(transaction, ExecutorCtx(*TlsActivationContext)); } -void TTabletExecutedFlat::EnqueueExecute(TAutoPtr transaction, NWilson::TTraceId traceId) { +void TTabletExecutedFlat::EnqueueExecute(TAutoPtr transaction) { if (transaction) - static_cast(Executor())->Enqueue(transaction, ExecutorCtx(*TlsActivationContext), std::move(traceId)); + static_cast(Executor())->Enqueue(transaction, ExecutorCtx(*TlsActivationContext)); } const NTable::TScheme& TTabletExecutedFlat::Scheme() const noexcept { diff --git a/ydb/core/tablet_flat/tablet_flat_executed.h b/ydb/core/tablet_flat/tablet_flat_executed.h index 546f97e4f730..e1a61e40393e 100644 --- a/ydb/core/tablet_flat/tablet_flat_executed.h +++ b/ydb/core/tablet_flat/tablet_flat_executed.h @@ -23,9 +23,9 @@ class TTabletExecutedFlat : public NFlatExecutorSetup::ITablet { IExecutor* Executor() const { return Executor0; } const TInstant StartTime() const { return StartTime0; } - void Execute(TAutoPtr transaction, const TActorContext &ctx, NWilson::TTraceId traceId = {}); - void Execute(TAutoPtr transaction, NWilson::TTraceId traceId = {}); - void EnqueueExecute(TAutoPtr transaction, NWilson::TTraceId traceId = {}); + void Execute(TAutoPtr transaction, const TActorContext &ctx); + void Execute(TAutoPtr transaction); + void EnqueueExecute(TAutoPtr transaction); const NTable::TScheme& Scheme() const noexcept; diff --git a/ydb/core/tablet_flat/tablet_flat_executor.h b/ydb/core/tablet_flat/tablet_flat_executor.h index a596fbb7cf63..fa14d8bb7d88 100644 --- a/ydb/core/tablet_flat/tablet_flat_executor.h +++ b/ydb/core/tablet_flat/tablet_flat_executor.h @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -278,6 +279,12 @@ class ITransaction : TNonCopyable { : Orbit(std::move(orbit)) { } + ITransaction(NWilson::TTraceId &&traceId) + : TxSpan(NWilson::TSpan(TWilsonTablet::Tablet, std::move(traceId), "Tablet.Transaction")) + { + TxSpan.Attribute("Type", TypeName(*this)); + } + virtual ~ITransaction() = default; /// @return true if execution complete and transaction is ready for commit virtual bool Execute(TTransactionContext &txc, const TActorContext &ctx) = 0; @@ -293,8 +300,15 @@ class ITransaction : TNonCopyable { out << TypeName(*this); } + void SetupTxSpan(NWilson::TTraceId traceId) noexcept { + TxSpan = NWilson::TSpan(TWilsonTablet::Tablet, std::move(traceId), "Tablet.Transaction"); + TxSpan.Attribute("Type", TypeName(*this)); + } + public: NLWTrace::TOrbit Orbit; + + NWilson::TSpan TxSpan; }; template @@ -313,6 +327,11 @@ class TTransactionBase : public ITransaction { : ITransaction(std::move(orbit)) , Self(self) { } + + TTransactionBase(T *self, NWilson::TTraceId &&traceId) + : ITransaction(std::move(traceId)) + , Self(self) + { } }; struct TExecutorStats { @@ -518,8 +537,8 @@ namespace NFlatExecutorSetup { // all followers had completed log with requested gc-barrier virtual void FollowerGcApplied(ui32 step, TDuration followerSyncDelay) = 0; - virtual void Execute(TAutoPtr transaction, const TActorContext &ctx, NWilson::TTraceId traceId = {}) = 0; - virtual void Enqueue(TAutoPtr transaction, const TActorContext &ctx, NWilson::TTraceId traceId = {}) = 0; + virtual void Execute(TAutoPtr transaction, const TActorContext &ctx) = 0; + virtual void Enqueue(TAutoPtr transaction, const TActorContext &ctx) = 0; virtual void ConfirmReadOnlyLease(TMonotonic at) = 0; virtual void ConfirmReadOnlyLease(TMonotonic at, std::function callback) = 0; diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 879a61750b52..d537fce3f583 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -2818,8 +2818,7 @@ void TDataShard::ProposeTransaction(TEvDataShard::TEvProposeTransaction::TPtr && UpdateProposeQueueSize(); } else { // Prepare planned transactions as soon as possible - TTxProposeTransactionBase *tx = new TTxProposeTransactionBase(this, std::move(ev), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, /* delayed */ false); - Execute(tx, ctx, tx->GetTraceId()); + Execute(new TTxProposeTransactionBase(this, std::move(ev), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, /* delayed */ false), ctx); } } @@ -2837,8 +2836,7 @@ void TDataShard::ProposeTransaction(NEvents::TDataEvents::TEvWrite::TPtr&& ev, c UpdateProposeQueueSize(); } else { // Prepare planned transactions as soon as possible - TTxWrite *tx = new TTxWrite(this, std::move(ev), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, /* delayed */ false); - Execute(tx, ctx, tx->GetTraceId()); + Execute(new TTxWrite(this, std::move(ev), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, /* delayed */ false), ctx); } } @@ -2903,14 +2901,12 @@ void TDataShard::Handle(TEvPrivate::TEvDelayedProposeTransaction::TPtr &ev, cons switch (item.Event->GetTypeRewrite()) { case TEvDataShard::TEvProposeTransaction::EventType: { auto event = IEventHandle::Downcast(std::move(item.Event)); - TTxProposeTransactionBase *tx = new TTxProposeTransactionBase(this, std::move(event), item.ReceivedAt, item.TieBreakerIndex, /* delayed */ true); - Execute(tx, ctx, tx->GetTraceId()); + Execute(new TTxProposeTransactionBase(this, std::move(event), item.ReceivedAt, item.TieBreakerIndex, /* delayed */ true), ctx); return; } case NEvents::TDataEvents::TEvWrite::EventType: { auto event = IEventHandle::Downcast(std::move(item.Event)); - TTxWrite *tx = new TTxWrite(this, std::move(event), item.ReceivedAt, item.TieBreakerIndex, /* delayed */ true); - Execute(tx, ctx, tx->GetTraceId()); + Execute(new TTxWrite(this, std::move(event), item.ReceivedAt, item.TieBreakerIndex, /* delayed */ true), ctx); return; } default: @@ -4121,13 +4117,13 @@ bool TDataShard::ReassignChannelsEnabled() const { } void TDataShard::ExecuteProgressTx(const TActorContext& ctx) { - Execute(new TTxProgressTransaction(this, {}), ctx); + Execute(new TTxProgressTransaction(this, {}, {}), ctx); } void TDataShard::ExecuteProgressTx(TOperation::TPtr op, const TActorContext& ctx) { Y_ABORT_UNLESS(op->IsInProgress()); NWilson::TTraceId traceId = op->GetTraceId(); - Execute(new TTxProgressTransaction(this, std::move(op)), ctx, std::move(traceId)); + Execute(new TTxProgressTransaction(this, std::move(op), std::move(traceId)), ctx); } TDuration TDataShard::CleanupTimeout() const { diff --git a/ydb/core/tx/datashard/datashard__progress_tx.cpp b/ydb/core/tx/datashard/datashard__progress_tx.cpp index 0181855a764b..bc71c887af47 100644 --- a/ydb/core/tx/datashard/datashard__progress_tx.cpp +++ b/ydb/core/tx/datashard/datashard__progress_tx.cpp @@ -6,8 +6,8 @@ namespace NKikimr { namespace NDataShard { -TDataShard::TTxProgressTransaction::TTxProgressTransaction(TDataShard *self, TOperation::TPtr op) - : TBase(self) +TDataShard::TTxProgressTransaction::TTxProgressTransaction(TDataShard *self, TOperation::TPtr op, NWilson::TTraceId &&traceId) + : TBase(self, std::move(traceId)) , ActiveOp(std::move(op)) {} @@ -61,11 +61,11 @@ bool TDataShard::TTxProgressTransaction::Execute(TTransactionContext &txc, const ActiveOp->IncrementInProgress(); if (ActiveOp->OperationSpan) { - if (!txc.Seat.TxSpan) { + if (!TxSpan) { // If Progress Tx for this operation is being executed the first time, // it won't have a span, because we choose what operation to run in the transaction itself. // We create transaction span and transaction execution spans here instead. - txc.Seat.SetupTxSpan(ActiveOp->GetTraceId()); + SetupTxSpan(ActiveOp->GetTraceId()); auxExecuteSpan = txc.Seat.CreateExecutionSpan(); } } diff --git a/ydb/core/tx/datashard/datashard__propose_tx_base.cpp b/ydb/core/tx/datashard/datashard__propose_tx_base.cpp index fa0e73e1013d..51fcf70f0827 100644 --- a/ydb/core/tx/datashard/datashard__propose_tx_base.cpp +++ b/ydb/core/tx/datashard/datashard__propose_tx_base.cpp @@ -15,14 +15,14 @@ TDataShard::TTxProposeTransactionBase::TTxProposeTransactionBase(TDataShard *sel TEvDataShard::TEvProposeTransaction::TPtr &&ev, TInstant receivedAt, ui64 tieBreakerIndex, bool delayed) - : TBase(self) + : TBase(self, std::move(ev->TraceId)) , Ev(std::move(ev)) , ReceivedAt(receivedAt) , TieBreakerIndex(tieBreakerIndex) , Kind(static_cast(Ev->Get()->GetTxKind())) , TxId(Ev->Get()->GetTxId()) , Acked(!delayed) - , ProposeTransactionSpan(TWilsonKqp::ProposeTransaction, std::move(Ev->TraceId), "ProposeTransaction", NWilson::EFlags::AUTO_END) + , ProposeTransactionSpan(TWilsonKqp::ProposeTransaction, TxSpan.GetTraceId(), "ProposeTransaction", NWilson::EFlags::AUTO_END) { ProposeTransactionSpan.Attribute("Shard", std::to_string(self->TabletID())); } diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index 50e0b774e82d..6552f276d954 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -1871,8 +1871,8 @@ class TDataShard::TTxReadViaPipeline : public NTabletFlatExecutor::TTransactionB bool WaitComplete = false; public: - TTxReadViaPipeline(TDataShard* ds, TEvDataShard::TEvRead::TPtr ev) - : TBase(ds) + TTxReadViaPipeline(TDataShard* ds, TEvDataShard::TEvRead::TPtr ev, NWilson::TTraceId &&traceId) + : TBase(ds, std::move(traceId)) , Ev(std::move(ev)) , ReadId(Ev->Sender, Ev->Get()->Record.GetReadId()) {} @@ -2235,8 +2235,8 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase bool DelayedResult = false; public: - TTxReadContinue(TDataShard* ds, TEvDataShard::TEvReadContinue::TPtr ev) - : TBase(ds) + TTxReadContinue(TDataShard* ds, TEvDataShard::TEvReadContinue::TPtr ev, NWilson::TTraceId &&traceId) + : TBase(ds, std::move(traceId)) , Ev(ev) {} @@ -2694,7 +2694,7 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct SetCounter(COUNTER_READ_ITERATORS_COUNT, ReadIterators.size()); - Executor()->Execute(new TTxReadViaPipeline(this, ev), ctx, request->ReadSpan.GetTraceId()); + Executor()->Execute(new TTxReadViaPipeline(this, ev, request->ReadSpan.GetTraceId()), ctx); } void TDataShard::Handle(TEvDataShard::TEvReadContinue::TPtr& ev, const TActorContext& ctx) { @@ -2704,7 +2704,7 @@ void TDataShard::Handle(TEvDataShard::TEvReadContinue::TPtr& ev, const TActorCon return; } - Executor()->Execute(new TTxReadContinue(this, ev), ctx, it->second->Request->ReadSpan.GetTraceId()); + Executor()->Execute(new TTxReadContinue(this, ev, it->second->Request->ReadSpan.GetTraceId()), ctx); } void TDataShard::Handle(TEvDataShard::TEvReadAck::TPtr& ev, const TActorContext& ctx) { diff --git a/ydb/core/tx/datashard/datashard__write.cpp b/ydb/core/tx/datashard/datashard__write.cpp index ced4bfba4d8d..581ee461bdb5 100644 --- a/ydb/core/tx/datashard/datashard__write.cpp +++ b/ydb/core/tx/datashard/datashard__write.cpp @@ -10,13 +10,13 @@ LWTRACE_USING(DATASHARD_PROVIDER) namespace NKikimr::NDataShard { TDataShard::TTxWrite::TTxWrite(TDataShard* self, NEvents::TDataEvents::TEvWrite::TPtr ev, TInstant receivedAt, ui64 tieBreakerIndex, bool delayed) - : TBase(self) + : TBase(self, std::move(ev->TraceId)) , Ev(std::move(ev)) , ReceivedAt(receivedAt) , TieBreakerIndex(tieBreakerIndex) , TxId(Ev->Get()->GetTxId()) , Acked(!delayed) - , ProposeTransactionSpan(TWilsonKqp::ProposeTransaction, std::move(Ev->TraceId), "ProposeTransaction", NWilson::EFlags::AUTO_END) + , ProposeTransactionSpan(TWilsonKqp::ProposeTransaction, TxSpan.GetTraceId(), "ProposeTransaction", NWilson::EFlags::AUTO_END) { ProposeTransactionSpan.Attribute("Shard", std::to_string(self->TabletID())); } diff --git a/ydb/core/tx/datashard/datashard_txs.h b/ydb/core/tx/datashard/datashard_txs.h index 7e46b97885bb..ae862657b5f5 100644 --- a/ydb/core/tx/datashard/datashard_txs.h +++ b/ydb/core/tx/datashard/datashard_txs.h @@ -67,7 +67,7 @@ class TDataShard::TTxPlanStep : public NTabletFlatExecutor::TTransactionBase { public: - explicit TTxProgressTransaction(TDataShard *self, TOperation::TPtr op = {}); + explicit TTxProgressTransaction(TDataShard *self, TOperation::TPtr op, NWilson::TTraceId &&traceId); bool Execute(TTransactionContext &txc, const TActorContext &ctx) override; void Complete(const TActorContext &ctx) override; TTxType GetTxType() const override { return TXTYPE_PROGRESS_START; }