Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIKIMR-20217 Initial datashard tracing #530

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions ydb/core/keyvalue/keyvalue_flat_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,9 @@ class TKeyValueFlat : public TActor<TKeyValueFlat>, public NTabletFlatExecutor::
TKeyValueFlat *Self;
TVector<TLogoBlobID> TrashBeingCommitted;

TTxRequest(THolder<TIntermediate> intermediate, TKeyValueFlat *keyValueFlat)
: Intermediate(std::move(intermediate))
TTxRequest(THolder<TIntermediate> intermediate, TKeyValueFlat *keyValueFlat, NWilson::TTraceId &&traceId)
: NTabletFlatExecutor::ITransaction(std::move(traceId))
, Intermediate(std::move(intermediate))
, Self(keyValueFlat)
{
Intermediate->Response.SetStatus(NMsgBusProxy::MSTATUS_UNKNOWN);
Expand Down Expand Up @@ -390,7 +391,7 @@ class TKeyValueFlat : public TActor<TKeyValueFlat>, public NTabletFlatExecutor::

State.OnEvIntermediate(*(ev->Get()->Intermediate), ctx);
auto traceId = ev->Get()->Intermediate->Span.GetTraceId();
Execute(new TTxRequest(std::move(ev->Get()->Intermediate), this), ctx, std::move(traceId));
Execute(new TTxRequest(std::move(ev->Get()->Intermediate), this, std::move(traceId)), ctx);
}

void Handle(TEvKeyValue::TEvNotify::TPtr &ev, const TActorContext &ctx) {
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/kqp/runtime/kqp_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -895,8 +895,11 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq

Counters->CreatedIterators->Inc();
ReadIdByTabletId[state->TabletId].push_back(id);

NWilson::TTraceId traceId; // TODO: get traceId from kqp.

Send(PipeCacheId, new TEvPipeCache::TEvForward(ev.Release(), state->TabletId, true),
IEventHandle::FlagTrackDelivery);
IEventHandle::FlagTrackDelivery, 0, std::move(traceId));

if (!FirstShardStarted) {
state->IsFirst = true;
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tablet_flat/flat_exec_seat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ namespace NTabletFlatExecutor {
}
Self->Complete(ctx);

TxSpan.Attribute("rw", isRW);
TxSpan.EndOk();
Self->TxSpan.Attribute("rw", isRW);
Self->TxSpan.EndOk();
}

void TSeat::Terminate(ETerminationReason reason, const TActorContext& ctx) noexcept {
Self->Terminate(reason, ctx);

TxSpan.EndError("Terminated");
Self->TxSpan.EndError("Terminated");
}

} // namespace NTabletFlatExecutor
Expand Down
17 changes: 9 additions & 8 deletions ydb/core/tablet_flat/flat_exec_seat.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@ namespace NTabletFlatExecutor {

TSeat(const TSeat&) = delete;

TSeat(ui32 uniqId, TAutoPtr<ITransaction> self, NWilson::TTraceId txTraceId)
TSeat(ui32 uniqId, TAutoPtr<ITransaction> self)
: UniqID(uniqId)
, Self(self)
, TxSpan(NWilson::TSpan(TWilsonTablet::Tablet, std::move(txTraceId), "Tablet.Transaction"))
{

}

void Describe(IOutputStream &out) const noexcept
Expand All @@ -36,29 +34,32 @@ namespace NTabletFlatExecutor {

void Terminate(ETerminationReason reason, const TActorContext& ctx) noexcept;

void CreateEnqueuedSpan() noexcept {
WaitingSpan = NWilson::TSpan(TWilsonTablet::Tablet, TxSpan.GetTraceId(), "Tablet.Transaction.Enqueued");
// Builds and returns a new "Tablet.Transaction.Execute" span that reuses the
// trace id of the transaction's own span (Self->TxSpan). The span is handed
// back to the caller; nothing is stored on the seat.
NWilson::TSpan CreateExecutionSpan() noexcept {
return NWilson::TSpan(TWilsonTablet::Tablet, Self->TxSpan.GetTraceId(), "Tablet.Transaction.Execute");
}

// Assigns WaitingSpan a new "Tablet.Transaction.Enqueued" span created from
// the trace id of Self->TxSpan. Whatever WaitingSpan held before is
// overwritten by the assignment.
void StartEnqueuedSpan() noexcept {
WaitingSpan = NWilson::TSpan(TWilsonTablet::Tablet, Self->TxSpan.GetTraceId(), "Tablet.Transaction.Enqueued");
}

// Ends the span currently held in WaitingSpan by calling EndOk() on it.
// WaitingSpan is shared with the "Pending" span helpers, so this closes
// whichever waiting span was assigned last.
void FinishEnqueuedSpan() noexcept {
WaitingSpan.EndOk();
}

void CreatePendingSpan() noexcept {
WaitingSpan = NWilson::TSpan(TWilsonTablet::Tablet, TxSpan.GetTraceId(), "Tablet.Transaction.Pending");
WaitingSpan = NWilson::TSpan(TWilsonTablet::Tablet, Self->TxSpan.GetTraceId(), "Tablet.Transaction.Pending");
}

// Ends the span currently held in WaitingSpan by calling EndOk() on it.
// Operates on the same WaitingSpan member as FinishEnqueuedSpan().
void FinishPendingSpan() noexcept {
WaitingSpan.EndOk();
}

NWilson::TTraceId GetTxTraceId() const noexcept {
return TxSpan.GetTraceId();
return Self->TxSpan.GetTraceId();
}

const ui64 UniqID = Max<ui64>();
const TAutoPtr<ITransaction> Self;
NWilson::TSpan TxSpan;
NWilson::TSpan WaitingSpan;
ui64 Retries = 0;
TPinned Pinned;
Expand Down
30 changes: 15 additions & 15 deletions ydb/core/tablet_flat/flat_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ void TExecutor::RecreatePageCollectionsCache() noexcept
auto &seat = xpair.second->Seat;
xpair.second->WaitingSpan.EndOk();
LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID);
seat->CreateEnqueuedSpan();
seat->StartEnqueuedSpan();
ActivationQueue->Push(seat.Release());
ActivateTransactionWaiting++;
}
Expand Down Expand Up @@ -520,7 +520,7 @@ void TExecutor::PlanTransactionActivation() {
TAutoPtr<TSeat> seat = PendingQueue->Pop();
seat->FinishPendingSpan();
LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID);
seat->CreateEnqueuedSpan();
seat->StartEnqueuedSpan();
ActivationQueue->Push(seat.Release());
ActivateTransactionWaiting++;
--Stats->TxPending;
Expand All @@ -541,7 +541,7 @@ void TExecutor::ActivateWaitingTransactions(TPrivatePageCache::TPage::TWaitQueue
it->second->WaitingSpan.EndOk();
auto &seat = it->second->Seat;
LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID);
seat->CreateEnqueuedSpan();
seat->StartEnqueuedSpan();
ActivationQueue->Push(seat.Release());
ActivateTransactionWaiting++;
TransactionWaitPads.erase(waitPad);
Expand Down Expand Up @@ -1576,10 +1576,10 @@ bool TExecutor::CanExecuteTransaction() const {
return Stats->IsActive && (Stats->IsFollower || PendingPartSwitches.empty()) && !BrokenTransaction;
}

void TExecutor::DoExecute(TAutoPtr<ITransaction> self, bool allowImmediate, const TActorContext &ctx, NWilson::TTraceId traceId) {
void TExecutor::DoExecute(TAutoPtr<ITransaction> self, bool allowImmediate, const TActorContext &ctx) {
Y_ABORT_UNLESS(ActivationQueue, "attempt to execute transaction before activation");

TAutoPtr<TSeat> seat = new TSeat(++TransactionUniqCounter, self, std::move(traceId));
TAutoPtr<TSeat> seat = new TSeat(++TransactionUniqCounter, self);

LWTRACK(TransactionBegin, seat->Self->Orbit, seat->UniqID, Owner->TabletID(), TypeName(*seat->Self));

Expand Down Expand Up @@ -1624,7 +1624,7 @@ void TExecutor::DoExecute(TAutoPtr<ITransaction> self, bool allowImmediate, cons

if (ActiveTransaction || ActivateTransactionWaiting || !allowImmediate) {
LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID);
seat->CreateEnqueuedSpan();
seat->StartEnqueuedSpan();
ActivationQueue->Push(seat.Release());
ActivateTransactionWaiting++;
PlanTransactionActivation();
Expand All @@ -1634,12 +1634,12 @@ void TExecutor::DoExecute(TAutoPtr<ITransaction> self, bool allowImmediate, cons
ExecuteTransaction(seat, ctx);
}

void TExecutor::Execute(TAutoPtr<ITransaction> self, const TActorContext &ctx, NWilson::TTraceId traceId) {
DoExecute(self, true, ctx, std::move(traceId));
void TExecutor::Execute(TAutoPtr<ITransaction> self, const TActorContext &ctx) {
DoExecute(self, true, ctx);
}

void TExecutor::Enqueue(TAutoPtr<ITransaction> self, const TActorContext &ctx, NWilson::TTraceId traceId) {
DoExecute(self, false, ctx, std::move(traceId));
void TExecutor::Enqueue(TAutoPtr<ITransaction> self, const TActorContext &ctx) {
DoExecute(self, false, ctx);
}

void TExecutor::ExecuteTransaction(TAutoPtr<TSeat> seat, const TActorContext &ctx) {
Expand All @@ -1653,14 +1653,14 @@ void TExecutor::ExecuteTransaction(TAutoPtr<TSeat> seat, const TActorContext &ct
PrivatePageCache->ResetTouchesAndToLoad(true);
TPageCollectionTxEnv env(*Database, *PrivatePageCache);

TTransactionContext txc(Owner->TabletID(), Generation(), Step(), *Database, env, seat->CurrentTxDataLimit, seat->TaskId);
TTransactionContext txc(*seat, Owner->TabletID(), Generation(), Step(), *Database, env, seat->CurrentTxDataLimit, seat->TaskId);
txc.NotEnoughMemory(seat->NotEnoughMemoryCount);

Database->Begin(Stamp(), env);

LWTRACK(TransactionExecuteBegin, seat->Self->Orbit, seat->UniqID);

NWilson::TSpan txExecuteSpan(TWilsonTablet::Tablet, seat->GetTxTraceId(), "Tablet.Transaction.Execute");
NWilson::TSpan txExecuteSpan = seat->CreateExecutionSpan();
const bool done = seat->Self->Execute(txc, ctx.MakeFor(OwnerActorId));
txExecuteSpan.EndOk();

Expand Down Expand Up @@ -1857,7 +1857,7 @@ void TExecutor::PostponeTransaction(TAutoPtr<TSeat> seat, TPageCollectionTxEnv &
// then tx may be re-activated.
if (!PrivatePageCache->GetStats().CurrentCacheMisses) {
LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID);
seat->CreateEnqueuedSpan();
seat->StartEnqueuedSpan();
ActivationQueue->Push(seat.Release());
ActivateTransactionWaiting++;
PlanTransactionActivation();
Expand Down Expand Up @@ -2945,7 +2945,7 @@ void TExecutor::StartSeat(ui64 task, TResource *cookie_) noexcept
PostponedTransactions.erase(it);
Memory->AcquiredMemory(*seat, task);
LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID);
seat->CreateEnqueuedSpan();
seat->StartEnqueuedSpan();
ActivationQueue->Push(seat.Release());
ActivateTransactionWaiting++;
PlanTransactionActivation();
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tablet_flat/flat_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -629,9 +629,9 @@ class TExecutor
void Boot(TEvTablet::TEvBoot::TPtr &ev, const TActorContext &ctx) override;
void Restored(TEvTablet::TEvRestored::TPtr &ev, const TActorContext &ctx) override;
void DetachTablet(const TActorContext &ctx) override;
void DoExecute(TAutoPtr<ITransaction> transaction, bool allowImmediate, const TActorContext &ctx, NWilson::TTraceId traceId);
void Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx, NWilson::TTraceId traceId = {}) override;
void Enqueue(TAutoPtr<ITransaction> transaction, const TActorContext &ctx, NWilson::TTraceId traceId = {}) override;
void DoExecute(TAutoPtr<ITransaction> transaction, bool allowImmediate, const TActorContext &ctx);
void Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx) override;
void Enqueue(TAutoPtr<ITransaction> transaction, const TActorContext &ctx) override;

TLeaseCommit* AttachLeaseCommit(TLogCommit* commit, bool force = false);
TLeaseCommit* EnsureReadOnlyLease(TMonotonic at);
Expand Down
24 changes: 20 additions & 4 deletions ydb/core/tablet_flat/flat_executor_txloglogic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,29 @@ TLogicRedo::TCommitRWTransactionResult TLogicRedo::CommitRWTransaction(
if (!Batch->Commit) {
Batch->Commit = CommitManager->Begin(false, ECommit::Redo, seat->GetTxTraceId());
} else {
const TAutoPtr<ITransaction> &tx = seat->Self;
// Batch commit's TraceId will be used for all blobstorage requests of the batch.
if (!Batch->Commit->TraceId && tx->TxSpan) {
// It is possible that the original or subsequent transactions didn't have a TraceId,
// but if a new transaction of the batch has a TraceId, use it for the whole batch
// (and for subsequent traced transactions).
Batch->Commit->TraceId = seat->GetTxTraceId();
} else {
tx->TxSpan.Link(Batch->Commit->TraceId, {});
}

i64 batchSize = Batch->Bodies.size() + 1;

Batch->Commit->FirstTx->TxSpan.Attribute("BatchSize", batchSize);
for (TSeat* curSeat = Batch->Commit->FirstTx; curSeat != nullptr; curSeat = curSeat->NextCommitTx) {
// Update the batch size on the transaction whose TraceId the commit uses (the first transaction in the batch that has a TraceId).
if (curSeat->Self->TxSpan) {
curSeat->Self->TxSpan.Attribute("BatchSize", batchSize);
break;
}
}

seat->TxSpan
.Attribute("Batched", true)
.Link(Batch->Commit->FirstTx->GetTxTraceId());
tx->TxSpan.Attribute("Batched", true)
.Attribute("BatchSize", batchSize);
}

Batch->Commit->PushTx(seat.Get());
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/tablet_flat/tablet_flat_executed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ IExecutor* TTabletExecutedFlat::CreateExecutor(const TActorContext &ctx) {
return Executor();
}

void TTabletExecutedFlat::Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx, NWilson::TTraceId traceId) {
void TTabletExecutedFlat::Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx) {
Y_UNUSED(ctx);
Execute(transaction, std::move(traceId));
Execute(transaction);
}

void TTabletExecutedFlat::Execute(TAutoPtr<ITransaction> transaction, NWilson::TTraceId traceId) {
void TTabletExecutedFlat::Execute(TAutoPtr<ITransaction> transaction) {
if (transaction)
static_cast<TExecutor*>(Executor())->Execute(transaction, ExecutorCtx(*TlsActivationContext), std::move(traceId));
static_cast<TExecutor*>(Executor())->Execute(transaction, ExecutorCtx(*TlsActivationContext));
}

void TTabletExecutedFlat::EnqueueExecute(TAutoPtr<ITransaction> transaction) {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tablet_flat/tablet_flat_executed.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ class TTabletExecutedFlat : public NFlatExecutorSetup::ITablet {
IExecutor* Executor() const { return Executor0; }
const TInstant StartTime() const { return StartTime0; }

void Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx, NWilson::TTraceId traceId = {});
void Execute(TAutoPtr<ITransaction> transaction, NWilson::TTraceId traceId = {});
void Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx);
void Execute(TAutoPtr<ITransaction> transaction);
void EnqueueExecute(TAutoPtr<ITransaction> transaction);

const NTable::TScheme& Scheme() const noexcept;
Expand Down
28 changes: 25 additions & 3 deletions ydb/core/tablet_flat/tablet_flat_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <ydb/core/base/tablet.h>
#include <ydb/core/base/blobstorage.h>
#include <ydb/library/actors/wilson/wilson_span.h>
#include <ydb/library/wilson_ids/wilson.h>
#include <library/cpp/lwtrace/shuttle.h>
#include <util/generic/maybe.h>
#include <util/system/type_name.h>
Expand All @@ -25,6 +26,7 @@ namespace NTabletFlatExecutor {
class TTransactionContext;
class TExecutor;
struct TPageCollectionTxEnv;
struct TSeat;

class TTableSnapshotContext : public TThrRefBase, TNonCopyable {
friend class TExecutor;
Expand Down Expand Up @@ -200,9 +202,10 @@ class TTransactionContext : public TTxMemoryProviderBase {
friend class TExecutor;

public:
TTransactionContext(ui64 tablet, ui32 gen, ui32 step, NTable::TDatabase &db, IExecuting &env,
TTransactionContext(TSeat &seat, ui64 tablet, ui32 gen, ui32 step, NTable::TDatabase &db, IExecuting &env,
ui64 memoryLimit, ui64 taskId)
: TTxMemoryProviderBase(memoryLimit, taskId)
, Seat(seat)
, Tablet(tablet)
, Generation(gen)
, Step(step)
Expand All @@ -226,6 +229,7 @@ class TTransactionContext : public TTxMemoryProviderBase {
}

public:
TSeat& Seat;
const ui64 Tablet = Max<ui32>();
const ui32 Generation = Max<ui32>();
const ui32 Step = Max<ui32>();
Expand Down Expand Up @@ -275,6 +279,12 @@ class ITransaction : TNonCopyable {
: Orbit(std::move(orbit))
{ }

// Tracing-aware constructor: creates TxSpan as a "Tablet.Transaction" span
// from the given trace id (consumed by move) and tags it with a "Type"
// attribute.
//
// NOTE(review): TypeName(*this) is evaluated here while the ITransaction base
// subobject is still under construction. If TypeName resolves the name via
// typeid/RTTI of its argument, C++ reports the class whose constructor is
// running (ITransaction), not the most-derived transaction class, so "Type"
// would not identify the concrete transaction. Confirm, and consider setting
// the attribute once the object is fully constructed.
ITransaction(NWilson::TTraceId &&traceId)
: TxSpan(NWilson::TSpan(TWilsonTablet::Tablet, std::move(traceId), "Tablet.Transaction"))
{
TxSpan.Attribute("Type", TypeName(*this));
}

virtual ~ITransaction() = default;
/// @return true if execution complete and transaction is ready for commit
virtual bool Execute(TTransactionContext &txc, const TActorContext &ctx) = 0;
Expand All @@ -290,8 +300,15 @@ class ITransaction : TNonCopyable {
out << TypeName(*this);
}

// Replaces TxSpan with a new "Tablet.Transaction" span built from traceId
// (taken by value and moved into the span), then sets the "Type" attribute on
// it from TypeName(*this). The previous contents of TxSpan are overwritten.
void SetupTxSpan(NWilson::TTraceId traceId) noexcept {
TxSpan = NWilson::TSpan(TWilsonTablet::Tablet, std::move(traceId), "Tablet.Transaction");
TxSpan.Attribute("Type", TypeName(*this));
}

public:
NLWTrace::TOrbit Orbit;

NWilson::TSpan TxSpan;
};

template<typename T>
Expand All @@ -310,6 +327,11 @@ class TTransactionBase : public ITransaction {
: ITransaction(std::move(orbit))
, Self(self)
{ }

// Tracing-aware constructor: forwards traceId (by move) to the ITransaction
// base, which creates the transaction span from it, and stores the `self`
// pointer in Self.
TTransactionBase(T *self, NWilson::TTraceId &&traceId)
: ITransaction(std::move(traceId))
, Self(self)
{ }
};

struct TExecutorStats {
Expand Down Expand Up @@ -515,8 +537,8 @@ namespace NFlatExecutorSetup {
// all followers had completed log with requested gc-barrier
virtual void FollowerGcApplied(ui32 step, TDuration followerSyncDelay) = 0;

virtual void Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx, NWilson::TTraceId traceId = {}) = 0;
virtual void Enqueue(TAutoPtr<ITransaction> transaction, const TActorContext &ctx, NWilson::TTraceId traceId = {}) = 0;
virtual void Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx) = 0;
virtual void Enqueue(TAutoPtr<ITransaction> transaction, const TActorContext &ctx) = 0;

virtual void ConfirmReadOnlyLease(TMonotonic at) = 0;
virtual void ConfirmReadOnlyLease(TMonotonic at, std::function<void()> callback) = 0;
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2821,6 +2821,7 @@ void TDataShard::ProposeTransaction(TEvDataShard::TEvProposeTransaction::TPtr &&
Execute(new TTxProposeTransactionBase(this, std::move(ev), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, /* delayed */ false), ctx);
}
}

void TDataShard::ProposeTransaction(NEvents::TDataEvents::TEvWrite::TPtr&& ev, const TActorContext& ctx) {
auto* msg = ev->Get();
const auto& record = msg->Record;
Expand Down Expand Up @@ -4116,12 +4117,13 @@ bool TDataShard::ReassignChannelsEnabled() const {
}

void TDataShard::ExecuteProgressTx(const TActorContext& ctx) {
Execute(new TTxProgressTransaction(this), ctx);
Execute(new TTxProgressTransaction(this, {}, {}), ctx);
}

void TDataShard::ExecuteProgressTx(TOperation::TPtr op, const TActorContext& ctx) {
Y_ABORT_UNLESS(op->IsInProgress());
Execute(new TTxProgressTransaction(this, std::move(op)), ctx);
NWilson::TTraceId traceId = op->GetTraceId();
Execute(new TTxProgressTransaction(this, std::move(op), std::move(traceId)), ctx);
}

TDuration TDataShard::CleanupTimeout() const {
Expand Down
Loading
Loading