Skip to content

Commit

Permalink
Streamlookup stable backports #7782 #9758 #9396 #10283 #10489 #10508 #…
Browse files Browse the repository at this point in the history
…7892 #10280 (#10341)

Co-authored-by: dmasloff <74042473+dmasloff@users.noreply.github.com>
Co-authored-by: Hor911 <hor911@ydb.tech>
  • Loading branch information
3 people authored Oct 18, 2024
1 parent 2e6c50e commit 9a9711d
Show file tree
Hide file tree
Showing 20 changed files with 375 additions and 144 deletions.
33 changes: 10 additions & 23 deletions ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,41 +212,26 @@ struct IDqAsyncLookupSource {
NKikimr::NMiniKQL::TMKQLAllocator<std::pair<const NUdf::TUnboxedValue, NUdf::TUnboxedValue>>
>;
struct TEvLookupRequest: NActors::TEventLocal<TEvLookupRequest, TDqComputeEvents::EvLookupRequest> {
TEvLookupRequest(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, TUnboxedValueMap&& request)
: Alloc(alloc)
, Request(std::move(request))
TEvLookupRequest(std::weak_ptr<TUnboxedValueMap> request)
: Request(std::move(request))
{
}
~TEvLookupRequest() {
auto guard = Guard(*Alloc);
TKeyTypeHelper empty;
Request = TUnboxedValueMap{0, empty.GetValueHash(), empty.GetValueEqual()};
}
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
TUnboxedValueMap Request;
std::weak_ptr<TUnboxedValueMap> Request;
};

struct TEvLookupResult: NActors::TEventLocal<TEvLookupResult, TDqComputeEvents::EvLookupResult> {
TEvLookupResult(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, TUnboxedValueMap&& result)
: Alloc(alloc)
, Result(std::move(result))
TEvLookupResult(std::weak_ptr<TUnboxedValueMap> result)
: Result(std::move(result))
{
}
~TEvLookupResult() {
auto guard = Guard(*Alloc.get());
TKeyTypeHelper empty;
Result = TUnboxedValueMap{0, empty.GetValueHash(), empty.GetValueEqual()};
}

std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
TUnboxedValueMap Result;
std::weak_ptr<TUnboxedValueMap> Result;
};

virtual size_t GetMaxSupportedKeysInRequest() const = 0;
//Initiate lookup for requested keys
//Only one request at a time is allowed. Request must contain no more than GetMaxSupportedKeysInRequest() keys
//Upon completion, results are sent in TEvLookupResult event to the preconfigured actor
virtual void AsyncLookup(TUnboxedValueMap&& request) = 0;
//Upon completion, TEvLookupResult event is sent to the preconfigured actor
virtual void AsyncLookup(std::weak_ptr<TUnboxedValueMap> request) = 0;
protected:
~IDqAsyncLookupSource() {}
};
Expand Down Expand Up @@ -280,6 +265,7 @@ struct IDqAsyncIoFactory : public TThrRefBase {
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
std::shared_ptr<IDqAsyncLookupSource::TKeyTypeHelper> KeyTypeHelper;
NActors::TActorId ParentId;
::NMonitoring::TDynamicCounterPtr TaskCounters;
google::protobuf::Any LookupSource; //provider specific data source
const NKikimr::NMiniKQL::TStructType* KeyType;
const NKikimr::NMiniKQL::TStructType* PayloadType;
Expand Down Expand Up @@ -312,6 +298,7 @@ struct IDqAsyncIoFactory : public TThrRefBase {
const THashMap<TString, TString>& SecureParams;
const THashMap<TString, TString>& TaskParams;
const NActors::TActorId& ComputeActorId;
::NMonitoring::TDynamicCounterPtr TaskCounters;
const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv;
const NKikimr::NMiniKQL::THolderFactory& HolderFactory;
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
Expand Down
45 changes: 38 additions & 7 deletions ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
MkqlMemoryQuota = taskCounters->GetCounter("MkqlMemoryQuota");
OutputChannelSize = taskCounters->GetCounter("OutputChannelSize");
SourceCpuTimeMs = taskCounters->GetCounter("SourceCpuTimeMs", true);
InputTransformCpuTimeMs = taskCounters->GetCounter("InputTransformCpuTimeMs", true);
}
}

Expand Down Expand Up @@ -1308,6 +1309,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
.SecureParams = secureParams,
.TaskParams = taskParams,
.ComputeActorId = this->SelfId(),
.TaskCounters = TaskCounters,
.TypeEnv = typeEnv,
.HolderFactory = holderFactory,
.Alloc = Alloc,
Expand Down Expand Up @@ -1397,11 +1399,20 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>

void OnNewAsyncInputDataArrived(const IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived::TPtr& ev) {
Y_ABORT_UNLESS(SourcesMap.FindPtr(ev->Get()->InputIndex) || InputTransformsMap.FindPtr(ev->Get()->InputIndex));
auto cpuTimeDelta = TakeSourceCpuTimeDelta();
if (SourceCpuTimeMs) {
SourceCpuTimeMs->Add(cpuTimeDelta.MilliSeconds());
{
auto cpuTimeDelta = TakeSourceCpuTimeDelta();
if (SourceCpuTimeMs) {
SourceCpuTimeMs->Add(cpuTimeDelta.MilliSeconds());
}
CpuTimeSpent += cpuTimeDelta;
}
{
auto cpuTimeDelta = TakeInputTransformCpuTimeDelta();
if (InputTransformCpuTimeMs) {
InputTransformCpuTimeMs->Add(cpuTimeDelta.MilliSeconds());
}
CpuTimeSpent += cpuTimeDelta;
}
CpuTimeSpent += cpuTimeDelta;
ContinueExecute(EResumeSource::CANewAsyncInput);
}

Expand Down Expand Up @@ -1596,6 +1607,21 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
return result;
}

TDuration GetInputTransformCpuTime() const {
auto result = TDuration::Zero();
for (auto& [inputIndex, sourceInfo] : InputTransformsMap) {
result += sourceInfo.AsyncInput->GetCpuTime();
}
return result;
}

TDuration TakeInputTransformCpuTimeDelta() {
auto newInputTransformCpuTime = GetInputTransformCpuTime();
auto result = newInputTransformCpuTime - InputTransformCpuTime;
InputTransformCpuTime = newInputTransformCpuTime;
return result;
}

void FillStats(NDqProto::TDqComputeActorStats* dst, bool last) {
if (RuntimeSettings.CollectNone()) {
return;
Expand All @@ -1605,7 +1631,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
ReportEventElapsedTime();
}

dst->SetCpuTimeUs(CpuTime.MicroSeconds());
dst->SetCpuTimeUs(CpuTime.MicroSeconds() + SourceCpuTime.MicroSeconds() + InputTransformCpuTime.MicroSeconds());
dst->SetMaxMemoryUsage(MemoryLimits.MemoryQuotaManager->GetMaxMemorySize());

if (auto memProfileStats = GetMemoryProfileStats(); memProfileStats) {
Expand Down Expand Up @@ -1638,10 +1664,13 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
}
FillTaskRunnerStats(Task.GetId(), Task.GetStageId(), *taskStats, protoTask, RuntimeSettings.GetCollectStatsLevel());

// More accurate cpu time counter:
auto cpuTimeUs = taskStats->ComputeCpuTime.MicroSeconds() + taskStats->BuildCpuTime.MicroSeconds();
if (TDerived::HasAsyncTaskRunner) {
protoTask->SetCpuTimeUs(CpuTime.MicroSeconds() + taskStats->ComputeCpuTime.MicroSeconds() + taskStats->BuildCpuTime.MicroSeconds());
// Async TR is another actor, summarize CPU usage
cpuTimeUs += CpuTime.MicroSeconds();
}
// CpuTimeUs does include SourceCpuTime
protoTask->SetCpuTimeUs(cpuTimeUs + SourceCpuTime.MicroSeconds() + InputTransformCpuTime.MicroSeconds());
protoTask->SetSourceCpuTimeUs(SourceCpuTime.MicroSeconds());

ui64 ingressBytes = 0;
Expand Down Expand Up @@ -1901,6 +1930,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
TDqComputeActorMetrics MetricsReporter;
NWilson::TSpan ComputeActorSpan;
TDuration SourceCpuTime;
TDuration InputTransformCpuTime;
private:
bool Running = true;
TInstant LastSendStatsTime;
Expand All @@ -1910,6 +1940,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryQuota;
::NMonitoring::TDynamicCounters::TCounterPtr OutputChannelSize;
::NMonitoring::TDynamicCounters::TCounterPtr SourceCpuTimeMs;
::NMonitoring::TDynamicCounters::TCounterPtr InputTransformCpuTimeMs;
THolder<NYql::TCounters> Stat;
TDuration CpuTimeSpent;
};
Expand Down
Loading

0 comments on commit 9a9711d

Please sign in to comment.