Skip to content

Commit

Permalink
KIKIMR-20217 Initial datashard tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
SammyVimes committed Dec 19, 2023
1 parent 377bc60 commit 6d5c9c8
Show file tree
Hide file tree
Showing 20 changed files with 186 additions and 94 deletions.
5 changes: 4 additions & 1 deletion ydb/core/kqp/runtime/kqp_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -895,8 +895,11 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq

Counters->CreatedIterators->Inc();
ReadIdByTabletId[state->TabletId].push_back(id);

NWilson::TTraceId traceId; // TODO: get traceId from kqp.

Send(PipeCacheId, new TEvPipeCache::TEvForward(ev.Release(), state->TabletId, true),
IEventHandle::FlagTrackDelivery);
IEventHandle::FlagTrackDelivery, 0, std::move(traceId));

if (!FirstShardStarted) {
state->IsFirst = true;
Expand Down
16 changes: 13 additions & 3 deletions ydb/core/tablet_flat/flat_exec_seat.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ namespace NTabletFlatExecutor {
TSeat(ui32 uniqId, TAutoPtr<ITransaction> self, NWilson::TTraceId txTraceId)
: UniqID(uniqId)
, Self(self)
, TxSpan(NWilson::TSpan(TWilsonTablet::Tablet, std::move(txTraceId), "Tablet.Transaction"))
{

if (txTraceId) {
SetupTxSpan(std::move(txTraceId));
}
}

void Describe(IOutputStream &out) const noexcept
Expand All @@ -36,7 +37,16 @@ namespace NTabletFlatExecutor {

void Terminate(ETerminationReason reason, const TActorContext& ctx) noexcept;

void CreateEnqueuedSpan() noexcept {
// Replaces TxSpan with a freshly constructed "Tablet.Transaction" span built
// from the supplied trace id (passed by value and moved into the span), then
// tags that span with a "Type" attribute set to TypeName(*Self).
// NOTE(review): any span previously held in TxSpan is overwritten by the
// assignment; whether that ends the old span depends on TSpan's assignment
// semantics, which are not visible here — confirm.
void SetupTxSpan(NWilson::TTraceId txTraceId) noexcept {
TxSpan = NWilson::TSpan(TWilsonTablet::Tablet, std::move(txTraceId), "Tablet.Transaction");
TxSpan.Attribute("Type", TypeName(*Self));
}

// Builds and returns a new "Tablet.Transaction.Execute" span constructed from
// TxSpan.GetTraceId(). The span is handed to the caller rather than stored in
// the seat, so the caller is responsible for ending it.
// NOTE(review): presumably yields an inactive span when TxSpan itself is
// unset — confirm against NWilson::TSpan.
NWilson::TSpan CreateExecutionSpan() noexcept {
return NWilson::TSpan(TWilsonTablet::Tablet, TxSpan.GetTraceId(), "Tablet.Transaction.Execute");
}

// Assigns WaitingSpan a new "Tablet.Transaction.Enqueued" span constructed
// from TxSpan.GetTraceId(). Unlike CreateExecutionSpan, the span is kept in
// the seat (WaitingSpan) instead of being returned.
void StartEnqueuedSpan() noexcept {
WaitingSpan = NWilson::TSpan(TWilsonTablet::Tablet, TxSpan.GetTraceId(), "Tablet.Transaction.Enqueued");
}

Expand Down
18 changes: 9 additions & 9 deletions ydb/core/tablet_flat/flat_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ void TExecutor::RecreatePageCollectionsCache() noexcept
auto &seat = xpair.second->Seat;
xpair.second->WaitingSpan.EndOk();
LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID);
seat->CreateEnqueuedSpan();
seat->StartEnqueuedSpan();
ActivationQueue->Push(seat.Release());
ActivateTransactionWaiting++;
}
Expand Down Expand Up @@ -520,7 +520,7 @@ void TExecutor::PlanTransactionActivation() {
TAutoPtr<TSeat> seat = PendingQueue->Pop();
seat->FinishPendingSpan();
LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID);
seat->CreateEnqueuedSpan();
seat->StartEnqueuedSpan();
ActivationQueue->Push(seat.Release());
ActivateTransactionWaiting++;
--Stats->TxPending;
Expand All @@ -541,7 +541,7 @@ void TExecutor::ActivateWaitingTransactions(TPrivatePageCache::TPage::TWaitQueue
it->second->WaitingSpan.EndOk();
auto &seat = it->second->Seat;
LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID);
seat->CreateEnqueuedSpan();
seat->StartEnqueuedSpan();
ActivationQueue->Push(seat.Release());
ActivateTransactionWaiting++;
TransactionWaitPads.erase(waitPad);
Expand Down Expand Up @@ -1624,7 +1624,7 @@ void TExecutor::DoExecute(TAutoPtr<ITransaction> self, bool allowImmediate, cons

if (ActiveTransaction || ActivateTransactionWaiting || !allowImmediate) {
LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID);
seat->CreateEnqueuedSpan();
seat->StartEnqueuedSpan();
ActivationQueue->Push(seat.Release());
ActivateTransactionWaiting++;
PlanTransactionActivation();
Expand Down Expand Up @@ -1653,14 +1653,14 @@ void TExecutor::ExecuteTransaction(TAutoPtr<TSeat> seat, const TActorContext &ct
PrivatePageCache->ResetTouchesAndToLoad(true);
TPageCollectionTxEnv env(*Database, *PrivatePageCache);

TTransactionContext txc(Owner->TabletID(), Generation(), Step(), *Database, env, seat->CurrentTxDataLimit, seat->TaskId);
TTransactionContext txc(*seat, Owner->TabletID(), Generation(), Step(), *Database, env, seat->CurrentTxDataLimit, seat->TaskId);
txc.NotEnoughMemory(seat->NotEnoughMemoryCount);

Database->Begin(Stamp(), env);

LWTRACK(TransactionExecuteBegin, seat->Self->Orbit, seat->UniqID);

NWilson::TSpan txExecuteSpan(TWilsonTablet::Tablet, seat->GetTxTraceId(), "Tablet.Transaction.Execute");
NWilson::TSpan txExecuteSpan = seat->CreateExecutionSpan();
const bool done = seat->Self->Execute(txc, ctx.MakeFor(OwnerActorId));
txExecuteSpan.EndOk();

Expand Down Expand Up @@ -1857,7 +1857,7 @@ void TExecutor::PostponeTransaction(TAutoPtr<TSeat> seat, TPageCollectionTxEnv &
// then tx may be re-activated.
if (!PrivatePageCache->GetStats().CurrentCacheMisses) {
LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID);
seat->CreateEnqueuedSpan();
seat->StartEnqueuedSpan();
ActivationQueue->Push(seat.Release());
ActivateTransactionWaiting++;
PlanTransactionActivation();
Expand Down Expand Up @@ -2945,7 +2945,7 @@ void TExecutor::StartSeat(ui64 task, TResource *cookie_) noexcept
PostponedTransactions.erase(it);
Memory->AcquiredMemory(*seat, task);
LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID);
seat->CreateEnqueuedSpan();
seat->StartEnqueuedSpan();
ActivationQueue->Push(seat.Release());
ActivateTransactionWaiting++;
PlanTransactionActivation();
Expand Down
20 changes: 17 additions & 3 deletions ydb/core/tablet_flat/flat_executor_txloglogic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,26 @@ TLogicRedo::TCommitRWTransactionResult TLogicRedo::CommitRWTransaction(
if (!Batch->Commit) {
Batch->Commit = CommitManager->Begin(false, ECommit::Redo, seat->GetTxTraceId());
} else {
i64 batchSize = Batch->Bodies.size() + 1;
// Batch commit's TraceId will be used for all blobstorage requests of the batch.
if (!Batch->Commit->TraceId && seat->TxSpan) {
// It is possible that the original or subsequent transactions didn't have a TraceId,
// but if a new transaction of the batch has a TraceId, use it for the whole batch
// (and for subsequent traced transactions).
Batch->Commit->TraceId = seat->GetTxTraceId();
} else {
seat->TxSpan.Link(Batch->Commit->TraceId, {});
}

Batch->Commit->FirstTx->TxSpan.Attribute("BatchSize", batchSize);
for (TSeat* tx = Batch->Commit->FirstTx; tx != nullptr; tx = tx->NextCommitTx) {
// Update the batch size on the transaction whose TraceId the commit uses (the first transaction in the batch that has a TraceId).
if (tx->TxSpan) {
i64 batchSize = Batch->Bodies.size() + 1;
tx->TxSpan.Attribute("BatchSize", batchSize);
break;
}
}

seat->TxSpan.Attribute("Batched", true);
seat->TxSpan.Link(Batch->Commit->FirstTx->GetTxTraceId(), {});
}

Batch->Commit->PushTx(seat.Get());
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tablet_flat/tablet_flat_executed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ void TTabletExecutedFlat::Execute(TAutoPtr<ITransaction> transaction, NWilson::T
static_cast<TExecutor*>(Executor())->Execute(transaction, ExecutorCtx(*TlsActivationContext), std::move(traceId));
}

void TTabletExecutedFlat::EnqueueExecute(TAutoPtr<ITransaction> transaction) {
void TTabletExecutedFlat::EnqueueExecute(TAutoPtr<ITransaction> transaction, NWilson::TTraceId traceId) {
if (transaction)
static_cast<TExecutor*>(Executor())->Enqueue(transaction, ExecutorCtx(*TlsActivationContext));
static_cast<TExecutor*>(Executor())->Enqueue(transaction, ExecutorCtx(*TlsActivationContext), std::move(traceId));
}

const NTable::TScheme& TTabletExecutedFlat::Scheme() const noexcept {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tablet_flat/tablet_flat_executed.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class TTabletExecutedFlat : public NFlatExecutorSetup::ITablet {

void Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx, NWilson::TTraceId traceId = {});
void Execute(TAutoPtr<ITransaction> transaction, NWilson::TTraceId traceId = {});
void EnqueueExecute(TAutoPtr<ITransaction> transaction);
void EnqueueExecute(TAutoPtr<ITransaction> transaction, NWilson::TTraceId traceId = {});

const NTable::TScheme& Scheme() const noexcept;

Expand Down
5 changes: 4 additions & 1 deletion ydb/core/tablet_flat/tablet_flat_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ namespace NTabletFlatExecutor {
class TTransactionContext;
class TExecutor;
struct TPageCollectionTxEnv;
struct TSeat;

class TTableSnapshotContext : public TThrRefBase, TNonCopyable {
friend class TExecutor;
Expand Down Expand Up @@ -200,9 +201,10 @@ class TTransactionContext : public TTxMemoryProviderBase {
friend class TExecutor;

public:
TTransactionContext(ui64 tablet, ui32 gen, ui32 step, NTable::TDatabase &db, IExecuting &env,
TTransactionContext(TSeat &seat, ui64 tablet, ui32 gen, ui32 step, NTable::TDatabase &db, IExecuting &env,
ui64 memoryLimit, ui64 taskId)
: TTxMemoryProviderBase(memoryLimit, taskId)
, Seat(seat)
, Tablet(tablet)
, Generation(gen)
, Step(step)
Expand All @@ -226,6 +228,7 @@ class TTransactionContext : public TTxMemoryProviderBase {
}

public:
TSeat& Seat;
const ui64 Tablet = Max<ui32>();
const ui32 Generation = Max<ui32>();
const ui32 Step = Max<ui32>();
Expand Down
18 changes: 12 additions & 6 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2818,9 +2818,11 @@ void TDataShard::ProposeTransaction(TEvDataShard::TEvProposeTransaction::TPtr &&
UpdateProposeQueueSize();
} else {
// Prepare planned transactions as soon as possible
Execute(new TTxProposeTransactionBase(this, std::move(ev), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, /* delayed */ false), ctx);
TTxProposeTransactionBase *tx = new TTxProposeTransactionBase(this, std::move(ev), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, /* delayed */ false);
Execute(tx, ctx, tx->GetTraceId());
}
}

void TDataShard::ProposeTransaction(NEvents::TDataEvents::TEvWrite::TPtr&& ev, const TActorContext& ctx) {
auto* msg = ev->Get();
const auto& record = msg->Record;
Expand All @@ -2835,7 +2837,8 @@ void TDataShard::ProposeTransaction(NEvents::TDataEvents::TEvWrite::TPtr&& ev, c
UpdateProposeQueueSize();
} else {
// Prepare planned transactions as soon as possible
Execute(new TTxWrite(this, std::move(ev), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, /* delayed */ false), ctx);
TTxWrite *tx = new TTxWrite(this, std::move(ev), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, /* delayed */ false);
Execute(tx, ctx, tx->GetTraceId());
}
}

Expand Down Expand Up @@ -2900,12 +2903,14 @@ void TDataShard::Handle(TEvPrivate::TEvDelayedProposeTransaction::TPtr &ev, cons
switch (item.Event->GetTypeRewrite()) {
case TEvDataShard::TEvProposeTransaction::EventType: {
auto event = IEventHandle::Downcast<TEvDataShard::TEvProposeTransaction>(std::move(item.Event));
Execute(new TTxProposeTransactionBase(this, std::move(event), item.ReceivedAt, item.TieBreakerIndex, /* delayed */ true), ctx);
TTxProposeTransactionBase *tx = new TTxProposeTransactionBase(this, std::move(event), item.ReceivedAt, item.TieBreakerIndex, /* delayed */ true);
Execute(tx, ctx, tx->GetTraceId());
return;
}
case NEvents::TDataEvents::TEvWrite::EventType: {
auto event = IEventHandle::Downcast<NEvents::TDataEvents::TEvWrite>(std::move(item.Event));
Execute(new TTxWrite(this, std::move(event), item.ReceivedAt, item.TieBreakerIndex, /* delayed */ true), ctx);
TTxWrite *tx = new TTxWrite(this, std::move(event), item.ReceivedAt, item.TieBreakerIndex, /* delayed */ true);
Execute(tx, ctx, tx->GetTraceId());
return;
}
default:
Expand Down Expand Up @@ -4116,12 +4121,13 @@ bool TDataShard::ReassignChannelsEnabled() const {
}

void TDataShard::ExecuteProgressTx(const TActorContext& ctx) {
Execute(new TTxProgressTransaction(this), ctx);
Execute(new TTxProgressTransaction(this, {}), ctx);
}

void TDataShard::ExecuteProgressTx(TOperation::TPtr op, const TActorContext& ctx) {
Y_ABORT_UNLESS(op->IsInProgress());
Execute(new TTxProgressTransaction(this, std::move(op)), ctx);
NWilson::TTraceId traceId = op->GetTraceId();
Execute(new TTxProgressTransaction(this, std::move(op)), ctx, std::move(traceId));
}

TDuration TDataShard::CleanupTimeout() const {
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/datashard/datashard.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <ydb/core/scheme/scheme_type_registry.h>
#include <ydb/core/protos/tx_datashard.pb.h>
#include <ydb/core/tablet_flat/flat_row_versions.h>
#include <ydb/library/actors/wilson/wilson_span.h>

#include <library/cpp/lwtrace/shuttle.h>
#include <library/cpp/time_provider/time_provider.h>
Expand Down Expand Up @@ -952,6 +953,9 @@ struct TEvDataShard {

// Orbit used for tracking request events
NLWTrace::TOrbit Orbit;

// Wilson span for this request.
NWilson::TSpan ReadSpan;
};

struct TEvReadResult : public TEventPB<TEvReadResult,
Expand Down
17 changes: 16 additions & 1 deletion ydb/core/tx/datashard/datashard__progress_tx.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#include "datashard_txs.h"
#include "datashard_failpoints.h"

#include <ydb/core/tablet_flat/flat_exec_seat.h>

namespace NKikimr {
namespace NDataShard {

Expand All @@ -23,7 +25,7 @@ bool TDataShard::TTxProgressTransaction::Execute(TTransactionContext &txc, const
return true;
}

NIceDb::TNiceDb db(txc.DB);
NWilson::TSpan auxExecuteSpan;

if (!ActiveOp) {
const bool expireSnapshotsAllowed = (
Expand All @@ -44,6 +46,7 @@ bool TDataShard::TTxProgressTransaction::Execute(TTransactionContext &txc, const
Self->Pipeline.ActivateWaitingTxOps(ctx);

ActiveOp = Self->Pipeline.GetNextActiveOp(false);

if (!ActiveOp) {
Self->IncCounter(COUNTER_TX_PROGRESS_IDLE);
LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD,
Expand All @@ -56,6 +59,16 @@ bool TDataShard::TTxProgressTransaction::Execute(TTransactionContext &txc, const
<< ActiveOp->GetKind() << " " << *ActiveOp << " (unit "
<< ActiveOp->GetCurrentUnit() << ") at " << Self->TabletID());
ActiveOp->IncrementInProgress();

if (ActiveOp->OperationSpan) {
if (!txc.Seat.TxSpan) {
// If the Progress Tx for this operation is being executed for the first time,
// it won't have a span, because we choose which operation to run inside the transaction itself.
// We create the transaction span and the transaction execution span here instead.
txc.Seat.SetupTxSpan(ActiveOp->GetTraceId());
auxExecuteSpan = txc.Seat.CreateExecutionSpan();
}
}
}

Y_ABORT_UNLESS(ActiveOp && ActiveOp->IsInProgress());
Expand All @@ -68,6 +81,7 @@ bool TDataShard::TTxProgressTransaction::Execute(TTransactionContext &txc, const
case EExecutionStatus::Restart:
// Restart even if current CompleteList is not empty
// It will be extended in subsequent iterations
auxExecuteSpan.EndOk();
return false;

case EExecutionStatus::Reschedule:
Expand Down Expand Up @@ -103,6 +117,7 @@ bool TDataShard::TTxProgressTransaction::Execute(TTransactionContext &txc, const
}

// Commit all side effects
auxExecuteSpan.EndOk();
return true;
} catch (...) {
Y_ABORT("there must be no leaked exceptions");
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/datashard/datashard__propose_tx_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ TDataShard::TTxProposeTransactionBase::TTxProposeTransactionBase(TDataShard *sel
, Acked(!delayed)
, ProposeTransactionSpan(TWilsonKqp::ProposeTransaction, std::move(Ev->TraceId), "ProposeTransaction", NWilson::EFlags::AUTO_END)
{
ProposeTransactionSpan.Attribute("Shard", std::to_string(self->TabletID()));
}

bool TDataShard::TTxProposeTransactionBase::Execute(NTabletFlatExecutor::TTransactionContext &txc,
Expand Down Expand Up @@ -76,7 +77,7 @@ bool TDataShard::TTxProposeTransactionBase::Execute(NTabletFlatExecutor::TTransa
return true;
}

TOperation::TPtr op = Self->Pipeline.BuildOperation(Ev, ReceivedAt, TieBreakerIndex, txc, ctx);
TOperation::TPtr op = Self->Pipeline.BuildOperation(Ev, ReceivedAt, TieBreakerIndex, txc, ctx, ProposeTransactionSpan.GetTraceId());

// Unsuccessful operation parse.
if (op->IsAborted()) {
Expand Down
Loading

0 comments on commit 6d5c9c8

Please sign in to comment.