Skip to content

Commit

Permalink
trace(kqp): add tracing to read actors
Browse files Browse the repository at this point in the history
  • Loading branch information
ulya-sidorina committed Jan 4, 2024
1 parent 8579425 commit 5650a13
Show file tree
Hide file tree
Showing 9 changed files with 141 additions and 41 deletions.
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ void TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, bool shareMailbox, bool op
limits.MemoryQuotaManager = std::make_shared<NYql::NDq::TGuaranteeQuotaManager>(limit * 2, limit);

auto computeActor = NKikimr::NKqp::CreateKqpComputeActor(ExecuterId, TxId, taskDesc, AsyncIoFactory,
AppData()->FunctionRegistry, settings, limits, NWilson::TTraceId(), TasksGraph.GetMeta().GetArenaIntrusivePtr());
AppData()->FunctionRegistry, settings, limits, ExecuterSpan.GetTraceId(), TasksGraph.GetMeta().GetArenaIntrusivePtr());

if (optimizeProtoForLocalExecution) {
TVector<google::protobuf::Message*>& taskSourceSettings = static_cast<TKqpComputeActor*>(computeActor)->MutableTaskSourceSettings();
Expand Down
23 changes: 20 additions & 3 deletions ydb/core/kqp/runtime/kqp_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <library/cpp/threading/hot_swap/hot_swap.h>
#include <ydb/library/actors/core/interconnect.h>
#include <ydb/library/actors/core/actorsystem.h>
#include <ydb/library/wilson_ids/wilson.h>

#include <util/generic/intrlist.h>

Expand Down Expand Up @@ -399,6 +400,7 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
, Counters(counters)
, UseFollowers(false)
, PipeCacheId(MainPipeCacheId)
, ReadActorSpan(TWilsonKqp::ReadActor, NWilson::TTraceId(args.TraceId), "ReadActor")
{
Y_ABORT_UNLESS(Arena);
Y_ABORT_UNLESS(settings->GetArena() == Arena->Get());
Expand Down Expand Up @@ -569,6 +571,9 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
ResolveShards[ResolveShardId] = state;
ResolveShardId += 1;

ReadActorStateSpan = NWilson::TSpan(TWilsonKqp::ReadActorShardsResolve, ReadActorSpan.GetTraceId(),
"WaitForShardsResolve", NWilson::EFlags::AUTO_END);

Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {}));
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request));
}
Expand Down Expand Up @@ -617,9 +622,13 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
}
}

ReadActorStateSpan.EndError(error);

return RuntimeError(error, statusCode);
}

ReadActorStateSpan.EndOk();

auto keyDesc = std::move(request->ResultSet[0].KeyDescription);

if (keyDesc->GetPartitions().size() == 1) {
Expand Down Expand Up @@ -896,10 +905,8 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
Counters->CreatedIterators->Inc();
ReadIdByTabletId[state->TabletId].push_back(id);

NWilson::TTraceId traceId; // TODO: get traceId from kqp.

Send(PipeCacheId, new TEvPipeCache::TEvForward(ev.Release(), state->TabletId, true),
IEventHandle::FlagTrackDelivery, 0, std::move(traceId));
IEventHandle::FlagTrackDelivery, 0, ReadActorSpan.GetTraceId());

if (!FirstShardStarted) {
state->IsFirst = true;
Expand Down Expand Up @@ -1385,6 +1392,8 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
}
}
TBase::PassAway();

ReadActorSpan.End();
}

void RuntimeError(const TString& message, NYql::NDqProto::StatusIds::StatusCode statusCode, const NYql::TIssues& subIssues = {}) {
Expand All @@ -1395,6 +1404,11 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq

NYql::TIssues issues;
issues.AddIssue(std::move(issue));

if (ReadActorSpan) {
ReadActorSpan.EndError(issues.ToOneLineString());
}

Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), statusCode));
}

Expand Down Expand Up @@ -1491,6 +1505,9 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
size_t TotalRetries = 0;

bool FirstShardStarted = false;

NWilson::TSpan ReadActorSpan;
NWilson::TSpan ReadActorStateSpan;
};


Expand Down
71 changes: 43 additions & 28 deletions ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
#include "kqp_stream_lookup_actor.h"

#include <ydb/library/actors/core/actor_bootstrapped.h>

#include <ydb/core/actorlib_impl/long_timer.h>
#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/engine/minikql/minikql_engine_host.h>
#include <ydb/core/kqp/common/kqp_resolve.h>
#include <ydb/core/kqp/common/kqp_event_ids.h>
#include <ydb/core/kqp/gateway/kqp_gateway.h>
#include <ydb/core/kqp/runtime/kqp_scan_data.h>
#include <ydb/core/kqp/runtime/kqp_stream_lookup_worker.h>
#include <ydb/core/protos/kqp_stats.pb.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/core/kqp/common/kqp_event_ids.h>

#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/yql/public/issue/yql_issue_message.h>
#include <ydb/core/kqp/runtime/kqp_scan_data.h>
#include <ydb/core/kqp/runtime/kqp_stream_lookup_worker.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h>
#include <ydb/library/wilson_ids/wilson.h>

namespace NKikimr {
namespace NKqp {
Expand All @@ -25,24 +26,22 @@ static constexpr ui64 MAX_SHARD_RETRIES = 10;

class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLookupActor>, public NYql::NDq::IDqComputeActorAsyncInput {
public:
TKqpStreamLookupActor(ui64 inputIndex, NYql::NDq::TCollectStatsLevel statsLevel, const NUdf::TUnboxedValue& input,
const NActors::TActorId& computeActorId, const NMiniKQL::TTypeEnvironment& typeEnv,
const NMiniKQL::THolderFactory& holderFactory, std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc,
const NYql::NDqProto::TTaskInput& inputDesc, NKikimrKqp::TKqpStreamLookupSettings&& settings,
TKqpStreamLookupActor(NYql::NDq::IDqAsyncIoFactory::TInputTransformArguments&& args, NKikimrKqp::TKqpStreamLookupSettings&& settings,
TIntrusivePtr<TKqpCounters> counters)
: LogPrefix(TStringBuilder() << "StreamLookupActor, inputIndex: " << inputIndex << ", CA Id " << computeActorId)
, InputIndex(inputIndex)
, Input(input)
, ComputeActorId(computeActorId)
, TypeEnv(typeEnv)
, Alloc(alloc)
: LogPrefix(TStringBuilder() << "StreamLookupActor, inputIndex: " << args.InputIndex << ", CA Id " << args.ComputeActorId)
, InputIndex(args.InputIndex)
, Input(args.TransformInput)
, ComputeActorId(args.ComputeActorId)
, TypeEnv(args.TypeEnv)
, Alloc(args.Alloc)
, Snapshot(settings.GetSnapshot().GetStep(), settings.GetSnapshot().GetTxId())
, LockTxId(settings.HasLockTxId() ? settings.GetLockTxId() : TMaybe<ui64>())
, SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT)
, StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), typeEnv, holderFactory, inputDesc))
, StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), args.TypeEnv, args.HolderFactory, args.InputDesc))
, Counters(counters)
, LookupActorSpan(TWilsonKqp::LookupActor, std::move(args.TraceId), "LookupActor")
{
IngressStats.Level = statsLevel;
IngressStats.Level = args.StatsLevel;
}

virtual ~TKqpStreamLookupActor() {
Expand Down Expand Up @@ -174,6 +173,8 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku

Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(0));
TActorBootstrapped<TKqpStreamLookupActor>::PassAway();

LookupActorSpan.End();
}

i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, TMaybe<TInstant>&, bool& finished, i64 freeSpace) final {
Expand Down Expand Up @@ -234,10 +235,15 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) {
CA_LOG_D("TEvResolveKeySetResult was received for table: " << StreamLookupWorker->GetTablePath());
if (ev->Get()->Request->ErrorCount > 0) {
return RuntimeError(TStringBuilder() << "Failed to get partitioning for table: "
<< StreamLookupWorker->GetTablePath(), NYql::NDqProto::StatusIds::SCHEME_ERROR);
TString errorMsg = TStringBuilder() << "Failed to get partitioning for table: "
<< StreamLookupWorker->GetTablePath();
LookupActorStateSpan.EndError(errorMsg);

return RuntimeError(errorMsg, NYql::NDqProto::StatusIds::SCHEME_ERROR);
}

LookupActorStateSpan.EndOk();

auto& resultSet = ev->Get()->Request->ResultSet;
YQL_ENSURE(resultSet.size() == 1, "Expected one result for range [NULL, +inf)");
Partitioning = resultSet[0].KeyDescription->Partitioning;
Expand Down Expand Up @@ -342,8 +348,11 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
<< " was resolved: " << !!Partitioning);

if (!Partitioning) {
RuntimeError(TStringBuilder() << "Failed to resolve shards for table: " << StreamLookupWorker->GetTablePath()
<< " (request timeout exceeded)", NYql::NDqProto::StatusIds::TIMEOUT);
TString errorMsg = TStringBuilder() << "Failed to resolve shards for table: " << StreamLookupWorker->GetTablePath()
<< " (request timeout exceeded)";
LookupActorStateSpan.EndError(errorMsg);

RuntimeError(errorMsg, NYql::NDqProto::StatusIds::TIMEOUT);
}
}

Expand Down Expand Up @@ -392,7 +401,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
record.SetResultFormat(NKikimrDataEvents::FORMAT_CELLVEC);

Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(request.Release(), shardId, true),
IEventHandle::FlagTrackDelivery);
IEventHandle::FlagTrackDelivery, 0, LookupActorSpan.GetTraceId());

read.State = EReadState::Running;

Expand Down Expand Up @@ -438,6 +447,9 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
keyColumnTypes, TVector<TKeyDesc::TColumnOp>{}));

Counters->IteratorsShardResolve->Inc();
LookupActorStateSpan = NWilson::TSpan(TWilsonKqp::LookupActorShardsResolve, LookupActorSpan.GetTraceId(),
"WaitForShardsResolve", NWilson::EFlags::AUTO_END);

Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(StreamLookupWorker->GetTableId(), {}));
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request));

Expand Down Expand Up @@ -467,6 +479,11 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku

NYql::TIssues issues;
issues.AddIssue(std::move(issue));

if (LookupActorSpan) {
LookupActorSpan.EndError(issues.ToOneLineString());
}

Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), statusCode));
}

Expand Down Expand Up @@ -495,17 +512,15 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
ui64 ReadBytesCount = 0;

TIntrusivePtr<TKqpCounters> Counters;
NWilson::TSpan LookupActorSpan;
NWilson::TSpan LookupActorStateSpan;
};

} // namespace

std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor(ui64 inputIndex,
NYql::NDq::TCollectStatsLevel statsLevel, const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId,
const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory,
std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, const NYql::NDqProto::TTaskInput& inputDesc,
std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor(NYql::NDq::IDqAsyncIoFactory::TInputTransformArguments&& args,
NKikimrKqp::TKqpStreamLookupSettings&& settings, TIntrusivePtr<TKqpCounters> counters) {
auto actor = new TKqpStreamLookupActor(inputIndex, statsLevel, input, computeActorId, typeEnv, holderFactory,
alloc, inputDesc, std::move(settings), counters);
auto actor = new TKqpStreamLookupActor(std::move(args), std::move(settings), counters);
return {actor, actor};
}

Expand Down
5 changes: 1 addition & 4 deletions ydb/core/kqp/runtime/kqp_stream_lookup_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@
namespace NKikimr {
namespace NKqp {

std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor(ui64 inputIndex,
NYql::NDq::TCollectStatsLevel statsLevel, const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId,
const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory,
std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, const NYql::NDqProto::TTaskInput& inputDesc,
std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor(NYql::NDq::IDqAsyncIoFactory::TInputTransformArguments&& args,
NKikimrKqp::TKqpStreamLookupSettings&& settings, TIntrusivePtr<TKqpCounters>);

} // namespace NKqp
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ namespace NKqp {
void RegisterStreamLookupActorFactory(NYql::NDq::TDqAsyncIoFactory& factory, TIntrusivePtr<TKqpCounters> counters) {
factory.RegisterInputTransform<NKikimrKqp::TKqpStreamLookupSettings>("StreamLookupInputTransformer", [counters](NKikimrKqp::TKqpStreamLookupSettings&& settings,
NYql::NDq::TDqAsyncIoFactory::TInputTransformArguments&& args) {
return CreateStreamLookupActor(args.InputIndex, args.StatsLevel, args.TransformInput, args.ComputeActorId, args.TypeEnv,
args.HolderFactory, args.Alloc, args.InputDesc, std::move(settings), counters);
return CreateStreamLookupActor(std::move(args), std::move(settings), counters);
});
}

Expand Down
64 changes: 63 additions & 1 deletion ydb/core/tx/datashard/datashard_ut_trace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
}

std::string canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) "
", (LiteralExecuter) , (DataExecuter -> [(WaitForTableResolve) , (WaitForSnapshot) , (RunTasks) , "
", (LiteralExecuter) , (DataExecuter -> [(WaitForTableResolve) , (WaitForSnapshot) , (ComputeActor) , (RunTasks) , "
"(Datashard.Transaction -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , "
"(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , "
"(Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , "
Expand All @@ -377,6 +377,68 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
UNIT_ASSERT_VALUES_EQUAL(canon, trace.ToString());
}

// Checks the trace produced by a traced SELECT over a split table:
// a "ReadActor" span must exist, it must contain exactly two "DataShard.Read"
// spans, and the whole span tree must match the canonical string below.
Y_UNIT_TEST(TestTraceDistributedSelectViaReadActors) {
auto [runtime, server, sender] = TestCreateServer();

CreateShardedTable(server, sender, "/Root", "table-1", 1, false);

// Register a fake uploader under the Wilson uploader service id so that the
// test can inspect the collected traces via uploader->Traces afterwards.
FakeWilsonUploader* uploader = new FakeWilsonUploader();
TActorId uploaderId = runtime.Register(uploader, 0);
runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0);
runtime.SimulateSleep(TDuration::Seconds(10));

// NOTE(review): presumably splits the table at key 5 into two shards, which is
// what the "2 DataShard.Read spans" assertion below relies on — confirm against SplitTable.
SplitTable(runtime, server, 5);

// Populate the table; these two upserts are executed without a trace id.
ExecSQL(
server,
sender,
"UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 100), (3, 300), (5, 500), (7, 700), (9, 900);",
true,
Ydb::StatusIds::SUCCESS
);

ExecSQL(
server,
sender,
"UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 100), (4, 300), (6, 500), (8, 700), (10, 900);",
true,
Ydb::StatusIds::SUCCESS
);

// NOTE(review): the arguments look like (verbosity, time-to-live) — confirm against NWilson::TTraceId::NewTraceId.
NWilson::TTraceId traceId = NWilson::TTraceId::NewTraceId(15, 4095);

// Only this SELECT is given a trace id, so exactly one trace is expected below.
ExecSQL(
server,
sender,
"SELECT * FROM `/Root/table-1`;",
true,
Ydb::StatusIds::SUCCESS,
std::move(traceId)
);

uploader->BuildTraceTrees();

UNIT_ASSERT_EQUAL(1, uploader->Traces.size());

FakeWilsonUploader::Trace& trace = uploader->Traces.begin()->second;

auto readActorSpan = trace.Root.BFSFindOne("ReadActor");
UNIT_ASSERT(readActorSpan);

auto dsReads = readActorSpan->get().FindAll("DataShard.Read"); // Read actor sends EvRead to each shard.
UNIT_ASSERT_EQUAL(dsReads.size(), 2);

// Exact canonical form of the whole span tree. The comparison is a plain string
// equality, so any added, removed or reordered span requires updating this value.
std::string canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) , "
"(DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , (ComputeActor) , "
"(RunTasks) , (KqpNode.SendTasks) , (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , "
"(DataShard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , "
"(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])]) , "
"(ReadIterator.ReadOperation)]) , (DataShard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> "
"[(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> "
"[(Tablet.WriteLog.LogEntry)])]) , (ReadIterator.ReadOperation)])])])])])";
UNIT_ASSERT_VALUES_EQUAL(canon, trace.ToString());
}

Y_UNIT_TEST(TestTraceWriteImmediateOnShard) {
auto [runtime, server, sender] = TestCreateServer();

Expand Down
6 changes: 6 additions & 0 deletions ydb/library/wilson_ids/wilson.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ namespace NKikimr {
ProposeTransaction = 9,

ComputeActor = 9,

ReadActor = 9,
ReadActorShardsResolve = 10,

LookupActor = 9,
LookupActorShardsResolve = 10,
};
};

Expand Down
2 changes: 2 additions & 0 deletions ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ struct IDqAsyncIoFactory : public TThrRefBase {
IMemoryQuotaManager::TPtr MemoryQuotaManager;
const google::protobuf::Message* SourceSettings = nullptr; // used only in case if we execute compute actor locally
TIntrusivePtr<NActors::TProtoArenaHolder> Arena; // Arena for SourceSettings
NWilson::TTraceId TraceId;
};

struct TSinkArguments {
Expand Down Expand Up @@ -247,6 +248,7 @@ struct IDqAsyncIoFactory : public TThrRefBase {
const NKikimr::NMiniKQL::THolderFactory& HolderFactory;
NKikimr::NMiniKQL::TProgramBuilder& ProgramBuilder;
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
NWilson::TTraceId TraceId;
};

struct TOutputTransformArguments {
Expand Down
6 changes: 4 additions & 2 deletions ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1593,7 +1593,8 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
.Alloc = TaskRunner ? TaskRunner->GetAllocatorPtr() : nullptr,
.MemoryQuotaManager = MemoryLimits.MemoryQuotaManager,
.SourceSettings = (!settings.empty() ? settings.at(inputIndex) : nullptr),
.Arena = Task.GetArena()
.Arena = Task.GetArena(),
.TraceId = ComputeActorSpan.GetTraceId()
});
} catch (const std::exception& ex) {
throw yexception() << "Failed to create source " << inputDesc.GetSource().GetType() << ": " << ex.what();
Expand Down Expand Up @@ -1623,7 +1624,8 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
.TypeEnv = typeEnv,
.HolderFactory = holderFactory,
.ProgramBuilder = *transform.ProgramBuilder,
.Alloc = TaskRunner->GetAllocatorPtr()
.Alloc = TaskRunner->GetAllocatorPtr(),
.TraceId = ComputeActorSpan.GetTraceId()
});
} catch (const std::exception& ex) {
throw yexception() << "Failed to create input transform " << inputDesc.GetTransform().GetType() << ": " << ex.what();
Expand Down

0 comments on commit 5650a13

Please sign in to comment.