Skip to content

Commit

Permalink
Sink metrics & trace (#10397)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored Oct 14, 2024
1 parent c73f760 commit 881e575
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 5 deletions.
12 changes: 12 additions & 0 deletions ydb/core/kqp/counters/kqp_counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,18 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co
DataShardIteratorMessages = KqpGroup->GetCounter("IteratorReads/DatashardMessages", true);
IteratorDeliveryProblems = KqpGroup->GetCounter("IteratorReads/DeliveryProblems", true);

/* sink writes */
WriteActorsShardResolve = KqpGroup->GetCounter("SinkWrites/WriteActorShardResolve", true);
WriteActorsCount = KqpGroup->GetCounter("SinkWrites/WriteActorsCount", false);
WriteActorImmediateWrites = KqpGroup->GetCounter("SinkWrites/WriteActorImmediateWrites", true);
WriteActorImmediateWritesRetries = KqpGroup->GetCounter("SinkWrites/WriteActorImmediateWritesRetries", true);
WriteActorWritesSizeHistogram =
KqpGroup->GetHistogram("SinkWrites/WriteActorWritesSize", NMonitoring::ExponentialHistogram(28, 2, 1));
WriteActorWritesOperationsHistogram =
KqpGroup->GetHistogram("SinkWrites/WriteActorWritesOperations", NMonitoring::ExponentialHistogram(20, 2, 1));
WriteActorWritesLatencyHistogram =
KqpGroup->GetHistogram("SinkWrites/WriteActorWritesLatencyMs", NMonitoring::ExponentialHistogram(20, 2, 1));

/* sequencers */

SequencerActorsCount = KqpGroup->GetCounter("Sequencer/ActorCount", false);
Expand Down
9 changes: 9 additions & 0 deletions ydb/core/kqp/counters/kqp_counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,15 @@ class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounter
::NMonitoring::TDynamicCounters::TCounterPtr DataShardIteratorMessages;
::NMonitoring::TDynamicCounters::TCounterPtr IteratorDeliveryProblems;

// Sink write counters
::NMonitoring::TDynamicCounters::TCounterPtr WriteActorsShardResolve;
::NMonitoring::TDynamicCounters::TCounterPtr WriteActorsCount;
::NMonitoring::TDynamicCounters::TCounterPtr WriteActorImmediateWrites;
::NMonitoring::TDynamicCounters::TCounterPtr WriteActorImmediateWritesRetries;
NMonitoring::THistogramPtr WriteActorWritesSizeHistogram;
NMonitoring::THistogramPtr WriteActorWritesOperationsHistogram;
NMonitoring::THistogramPtr WriteActorWritesLatencyHistogram;

// Scheduler signals
::NMonitoring::TDynamicCounters::TCounterPtr SchedulerThrottled;
::NMonitoring::TDynamicCounters::TCounterPtr SchedulerCapacity;
Expand Down
45 changes: 41 additions & 4 deletions ydb/core/kqp/runtime/kqp_write_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <ydb/core/tx/tx.h>
#include <ydb/library/actors/core/actorsystem.h>
#include <ydb/library/actors/core/interconnect.h>
#include <ydb/library/wilson_ids/wilson.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h>
#include <ydb/library/yql/public/issue/yql_issue_message.h>

Expand Down Expand Up @@ -134,10 +135,13 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
, InconsistentTx(
Settings.GetInconsistentTx())
, MemoryLimit(MessageSettings.InFlightMemoryLimitPerActorBytes)
, WriteActorSpan(TWilsonKqp::WriteActor, NWilson::TTraceId(args.TraceId), "WriteActor")
{
YQL_ENSURE(std::holds_alternative<ui64>(TxId));
YQL_ENSURE(!ImmediateTx);
EgressStats.Level = args.StatsLevel;

Counters->WriteActorsCount->Inc();
}

void Bootstrap() {
Expand Down Expand Up @@ -244,6 +248,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
}

void ResolveTable() {
Counters->WriteActorsShardResolve->Inc();
SchemeEntry.reset();
SchemeRequest.reset();

Expand All @@ -267,8 +272,11 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
entry.ShowPrivatePath = true;
request->ResultSet.emplace_back(entry);

Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {}));
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request));
WriteActorStateSpan = NWilson::TSpan(TWilsonKqp::WriteActorTableNavigate, WriteActorSpan.GetTraceId(),
"WaitForShardsResolve", NWilson::EFlags::AUTO_END);

Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {}), 0, 0, WriteActorSpan.GetTraceId());
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request), 0, 0, WriteActorSpan.GetTraceId());
}

void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
Expand Down Expand Up @@ -327,7 +335,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
request->ResultSet.emplace_back(std::move(keyRange));

TAutoPtr<TEvTxProxySchemeCache::TEvResolveKeySet> resolveReq(new TEvTxProxySchemeCache::TEvResolveKeySet(request));
Send(MakeSchemeCacheID(), resolveReq.Release(), 0, 0);
Send(MakeSchemeCacheID(), resolveReq.Release(), 0, 0, WriteActorSpan.GetTraceId());
}

void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) {
Expand Down Expand Up @@ -368,6 +376,8 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
}()
<< ", Cookie=" << ev->Cookie);



switch (ev->Get()->GetStatus()) {
case NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED: {
CA_LOG_E("Got UNSPECIFIED for table `"
Expand Down Expand Up @@ -557,6 +567,11 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
EgressStats.Chunks++;
EgressStats.Splits++;
EgressStats.Resume();

if (auto it = SendTime.find(shardId); it != std::end(SendTime)) {
Counters->WriteActorWritesLatencyHistogram->Collect((TInstant::Now() - it->second).MilliSeconds());
SendTime.erase(it);
}
}
resumeNotificator.CheckMemory();
}
Expand Down Expand Up @@ -594,7 +609,6 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
NYql::NDqProto::StatusIds::UNAVAILABLE);
return;
}

auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(
NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);

Expand Down Expand Up @@ -628,6 +642,16 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
ShardedWriteController->GetDataFormat());
}

if (metadata->SendAttempts == 0) {
Counters->WriteActorImmediateWrites->Inc();
Counters->WriteActorWritesSizeHistogram->Collect(serializationResult.TotalDataSize);
Counters->WriteActorWritesOperationsHistogram->Collect(metadata->OperationsCount);

SendTime[shardId] = TInstant::Now();
} else {
Counters->WriteActorImmediateWritesRetries->Inc();
}

CA_LOG_D("Send EvWrite to ShardID=" << shardId << ", TxId=" << evWrite->Record.GetTxId()
<< ", TxMode=" << evWrite->Record.GetTxMode()
<< ", LockTxId=" << evWrite->Record.GetLockTxId() << ", LockNodeId=" << evWrite->Record.GetLockNodeId()
Expand Down Expand Up @@ -723,6 +747,13 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
NYql::TIssues issues;
issues.AddIssue(std::move(issue));

if (WriteActorStateSpan) {
WriteActorStateSpan.EndError(issues.ToOneLineString());
}
if (WriteActorSpan) {
WriteActorSpan.EndError(issues.ToOneLineString());
}

Callbacks->OnAsyncOutputError(OutputIndex, std::move(issues), statusCode);
}

Expand All @@ -732,6 +763,8 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
}

void Prepare() {
WriteActorStateSpan.EndOk();

YQL_ENSURE(SchemeEntry);
ResolveAttempts = 0;

Expand Down Expand Up @@ -803,12 +836,16 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
std::optional<NSchemeCache::TSchemeCacheRequest::TEntry> SchemeRequest;
ui64 ResolveAttempts = 0;

THashMap<ui64, TInstant> SendTime;
THashMap<ui64, TLockInfo> LocksInfo;
bool Finished = false;

const i64 MemoryLimit;

IShardedWriteControllerPtr ShardedWriteController = nullptr;

NWilson::TSpan WriteActorSpan;
NWilson::TSpan WriteActorStateSpan;
};

void RegisterKqpWriteActor(NYql::NDq::TDqAsyncIoFactory& factory, TIntrusivePtr<TKqpCounters> counters) {
Expand Down
3 changes: 3 additions & 0 deletions ydb/library/wilson_ids/wilson.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ namespace NKikimr {
LookupActor = TComponentTracingLevels::TQueryProcessor::Basic,
LookupActorShardsResolve = TComponentTracingLevels::TQueryProcessor::Detailed,

WriteActor = TComponentTracingLevels::TQueryProcessor::Basic,
WriteActorTableNavigate = TComponentTracingLevels::TQueryProcessor::Detailed,

BulkUpsertActor = TComponentTracingLevels::TQueryProcessor::TopLevel,
};
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ struct IDqAsyncIoFactory : public TThrRefBase {
const NKikimr::NMiniKQL::THolderFactory& HolderFactory;
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
IRandomProvider *const RandomProvider;
NWilson::TTraceId TraceId;
};

struct TInputTransformArguments {
Expand Down
3 changes: 2 additions & 1 deletion ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1394,7 +1394,8 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
.TypeEnv = typeEnv,
.HolderFactory = holderFactory,
.Alloc = Alloc,
.RandomProvider = randomProvider
.RandomProvider = randomProvider,
.TraceId = ComputeActorSpan.GetTraceId(),
});
} catch (const std::exception& ex) {
throw yexception() << "Failed to create sink " << outputDesc.GetSink().GetType() << ": " << ex.what();
Expand Down

0 comments on commit 881e575

Please sign in to comment.